Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions bolt/workflows/smlv_somatic/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,36 +115,45 @@ def entry(ctx, **kwargs):
# PCGR report
purple_data = parse_purple_purity_file(kwargs['purple_purity_fp'])

pcgr_skipped = False
if variant_counts_process['filter_pass'] <= constants.MAX_SOMATIC_VARIANTS:
pcgr_input_vcf_fp = kwargs['vcf_fp']
else:
pcgr_input_vcf_fp = select_pcgr_variants(
kwargs['vcf_fp'],
kwargs['cancer_genes_fp'],
try:
pcgr_input_vcf_fp = select_pcgr_variants(
kwargs['vcf_fp'],
kwargs['cancer_genes_fp'],
kwargs['tumor_name'],
output_dir,
)
except RuntimeError as e:
# NOTE(QC): tiered filtering could not bring PASS count below
# MAX_SOMATIC_VARIANTS (sash #52). Skip PCGR; sash marks the
# PCGR emits as optional so downstream reports still publish.
logger.warning(f'Skipping PCGR for {kwargs["tumor_name"]}: {e}')
pcgr_skipped = True

if not pcgr_skipped:
pcgr_prep_fp = pcgr.prepare_vcf_somatic(
pcgr_input_vcf_fp,
kwargs['tumor_name'],
kwargs['normal_name'],
output_dir,
)

pcgr_prep_fp = pcgr.prepare_vcf_somatic(
pcgr_input_vcf_fp,
kwargs['tumor_name'],
kwargs['normal_name'],
output_dir,
)

pcgr_output_dir = output_dir / 'pcgr'
pcgr.run_somatic(
pcgr_prep_fp,
kwargs['pcgr_data_dir'],
kwargs['vep_dir'],
pcgr_output_dir,
threads=kwargs['threads'],
pcgr_conda=kwargs['pcgr_conda'],
pcgrr_conda=kwargs['pcgrr_conda'],
purity=purple_data['purity'],
ploidy=purple_data['ploidy'],
sample_id=kwargs['tumor_name'],
)
pcgr_output_dir = output_dir / 'pcgr'
pcgr.run_somatic(
pcgr_prep_fp,
kwargs['pcgr_data_dir'],
kwargs['vep_dir'],
pcgr_output_dir,
threads=kwargs['threads'],
pcgr_conda=kwargs['pcgr_conda'],
pcgrr_conda=kwargs['pcgrr_conda'],
purity=purple_data['purity'],
ploidy=purple_data['ploidy'],
sample_id=kwargs['tumor_name'],
)


def bcftools_stats_prepare(input_fp, tumor_name, output_dir):
Expand Down Expand Up @@ -336,7 +345,8 @@ def select_pcgr_variants(vcf_fp, cancer_genes_fp, tumor_name, output_dir):
for variant_count, variant in enumerate(cyvcf2.VCF(fp_annotated_out), 1):
variant_repr = pcgr.get_variant_repr(variant)

if any(variant.INFO.get(e) for e in constants.RETAIN_FIELDS_FILTERING):
# NOTE(QC): exclude '.' — PCGR writes it as a missing-value placeholder for String fields; cyvcf2 returns it truthy (sash #52).
if any(variant.INFO.get(e) not in (None, '.') for e in constants.RETAIN_FIELDS_FILTERING):
continue

data = pcgr.get_variant_filter_data(variant)
Expand Down
137 changes: 137 additions & 0 deletions tests/test_pcgr_hypermutated.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import unittest
from unittest.mock import patch

from click.testing import CliRunner

import cyvcf2

import bolt.common.constants as constants
Expand Down Expand Up @@ -172,6 +174,35 @@ def test_retained_variants_bypass_tiered_filter(self):
count = self._run(v, limit=4, tmp=tmp)
self.assertEqual(count, 4)

def test_pcgr_mutation_hotspot_real_value_is_retained(self):
"""A real PCGR_MUTATION_HOTSPOT value (non-dot) must retain the variant."""
with tempfile.TemporaryDirectory() as tmp:
v = []
for i in range(1, 3): # 2 real hotspot variants — must survive
v.append((i*10, f'PCGR_MUTATION_HOTSPOT=GRCH38_1_{i}_A_T;PCGR_ACTIONABILITY_TIER=1;PCGR_CSQ={_csq("intron_variant")}'))
for i in range(3, 8): # 5 NONCODING — dropped
v.append((i*10, f'PCGR_ACTIONABILITY_TIER=N;PCGR_CSQ={_csq("intergenic_variant")}'))
count = self._run(v, limit=2, tmp=tmp)
self.assertEqual(count, 2)

def test_pcgr_mutation_hotspot_dot_not_treated_as_retained(self):
"""PCGR_MUTATION_HOTSPOT=. must not retain variants — '.' is a missing-value placeholder.

cyvcf2 returns the string '.' (truthy) for String INFO fields written as '=.' by PCGR on
every non-hotspot variant. Without the fix, any(variant.INFO.get(e) ...) always returns
True and ALL variants are treated as retained, so tiered filtering never drops anything and
RuntimeError fires for any sample with >450k variants (sash #52 root cause).
"""
with tempfile.TemporaryDirectory() as tmp:
v = []
for i in range(1, 3): # 2 real SAGE_HOTSPOT — must be retained
v.append((i*10, f'SAGE_HOTSPOT;PCGR_MUTATION_HOTSPOT=.;PCGR_ACTIONABILITY_TIER=1;PCGR_CSQ={_csq("intron_variant")}'))
for i in range(3, 8): # 5 NONCODING with PCGR_MUTATION_HOTSPOT=. — must be droppable
v.append((i*10, f'PCGR_MUTATION_HOTSPOT=.;PCGR_ACTIONABILITY_TIER=N;PCGR_CSQ={_csq("intergenic_variant")}'))
# limit=2: only the 2 real SAGE_HOTSPOT variants survive; the 5 dot-placeholder ones are dropped
count = self._run(v, limit=2, tmp=tmp)
self.assertEqual(count, 2)

def test_filters_set_vcf_marks_dropped_variants(self):
"""The traceability VCF marks filtered-out variants with PCGR_count_limit.

Expand Down Expand Up @@ -535,5 +566,111 @@ def test_annotation_filter_excluded_from_annotated_count(self):
self.assertEqual(counts['dragen'], 2)


class TestSelectPcgrVariantsRaisesOnUnresolvableOverflow(unittest.TestCase):
"""select_pcgr_variants raises RuntimeError when all variants are retained (hotspots)."""

def _fake_execute(self, vcf_fp):
def _run(cmd, **_):
import re
m = re.search(r'--output\s+(\S+)', cmd)
if m and 'bcftools annotate' in cmd:
shutil.copy(str(vcf_fp), m.group(1))
else:
util.execute_command(cmd)
return _run

def test_raises_when_all_variants_are_hotspots(self):
"""All SAGE_HOTSPOT variants are RETAIN_FIELDS — tiered filtering cannot drop any; RuntimeError expected."""
# RETAIN_FIELDS_FILTERING includes SAGE_HOTSPOT — use that flag, not HMF_HOTSPOT
HOTSPOT_INFO = f'SAGE_HOTSPOT;PCGR_ACTIONABILITY_TIER=1;PCGR_CSQ={_csq("intron_variant")}'
with tempfile.TemporaryDirectory() as tmp:
tmp_path = pathlib.Path(tmp)
vcf_fp = tmp_path / 'input.vcf'
variants = [(i * 10, HOTSPOT_INFO) for i in range(1, 6)]
_write_vcf(vcf_fp, variants)
cancer_genes = tmp_path / 'genes.bed'
cancer_genes.write_text('chr1\t1\t9999999\n')

with patch('bolt.common.constants.MAX_SOMATIC_VARIANTS', 3), \
patch('bolt.util.execute_command', side_effect=self._fake_execute(vcf_fp)):
with self.assertRaises(RuntimeError):
report_mod.select_pcgr_variants(vcf_fp, cancer_genes, 'TUMOR', tmp_path)


_PASS_COUNTS = {'pass': {'snps': 0, 'indels': 0, 'others': 0, 'total': 0}}


def _cli_args(dummy, output_dir):
"""Return CliRunner args list for report entry(); all file paths point to dummy."""
d = str(dummy)
return [
'--tumor_name', 'TUMOR',
'--normal_name', 'NORMAL',
'--vcf_fp', d,
'--vcf_filters_fp', d,
'--vcf_dragen_fp', d,
'--vep_dir', str(dummy.parent),
'--purple_purity_fp', d,
'--cancer_genes_fp', d,
'--giab_regions_fp', d,
'--genome_fp', d,
'--threads', '1',
'--output_dir', str(output_dir),
]


class TestEntrySkipsPcgrOnOverflow(unittest.TestCase):
"""entry() catches RuntimeError from select_pcgr_variants and skips PCGR entirely."""

def test_run_somatic_not_called_when_cap_exceeded(self):
"""When select_pcgr_variants raises RuntimeError, run_somatic must not be called."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = pathlib.Path(tmp)
dummy = tmp_path / 'dummy.vcf.gz'
dummy.touch()

with patch.object(report_mod, 'bcftools_stats_prepare', return_value=dummy), \
patch.object(report_mod, 'run_bcftools_stats'), \
patch.object(report_mod, 'allele_frequencies'), \
patch.object(report_mod, 'count_variant_types', return_value=_PASS_COUNTS), \
patch.object(report_mod, 'count_variant_process',
return_value={'filter_pass': constants.MAX_SOMATIC_VARIANTS + 1}), \
patch.object(report_mod, 'parse_purple_purity_file',
return_value={'purity': 0.8, 'ploidy': 2.0}), \
patch.object(report_mod, 'select_pcgr_variants',
side_effect=RuntimeError('595416 > 450000')), \
patch.object(pcgr, 'prepare_vcf_somatic') as mock_prep, \
patch.object(pcgr, 'run_somatic') as mock_run:
result = CliRunner().invoke(report_mod.entry, _cli_args(dummy, tmp_path / 'out'))

self.assertEqual(result.exit_code, 0, result.output)
mock_prep.assert_not_called()
mock_run.assert_not_called()

def test_run_somatic_called_when_within_limit(self):
"""When PASS count is within limit, run_somatic must be called normally."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = pathlib.Path(tmp)
dummy = tmp_path / 'dummy.vcf.gz'
dummy.touch()
fake_prep_output = tmp_path / 'prep.vcf.gz'
fake_prep_output.touch()

with patch.object(report_mod, 'bcftools_stats_prepare', return_value=dummy), \
patch.object(report_mod, 'run_bcftools_stats'), \
patch.object(report_mod, 'allele_frequencies'), \
patch.object(report_mod, 'count_variant_types', return_value=_PASS_COUNTS), \
patch.object(report_mod, 'count_variant_process',
return_value={'filter_pass': constants.MAX_SOMATIC_VARIANTS - 1}), \
patch.object(report_mod, 'parse_purple_purity_file',
return_value={'purity': 0.8, 'ploidy': 2.0}), \
patch.object(pcgr, 'prepare_vcf_somatic', return_value=fake_prep_output), \
patch.object(pcgr, 'run_somatic') as mock_run:
result = CliRunner().invoke(report_mod.entry, _cli_args(dummy, tmp_path / 'out'))

self.assertEqual(result.exit_code, 0, result.output)
mock_run.assert_called_once()


if __name__ == '__main__':
unittest.main()
Loading