diff --git a/deeprvat/annotations/annotations.py b/deeprvat/annotations/annotations.py index 4a6abec3..4008910a 100644 --- a/deeprvat/annotations/annotations.py +++ b/deeprvat/annotations/annotations.py @@ -20,6 +20,10 @@ from tqdm import tqdm, trange from fastparquet import ParquetFile import yaml +import duckdb + +import pyarrow as pa +import pyarrow.parquet as pq def precision(y_true, y_pred): @@ -2069,3 +2073,253 @@ def select_rename_fill_annotations( if __name__ == "__main__": cli() + + +@cli.command() +@click.option( + "--absplice-dir", + type=click.Path(exists=True, file_okay=False, dir_okay=True), + required=True, + help="Path to the directory containing AbSplice outputs", +) +@click.option( + "--ab-splice-agg-score-file", + type=click.Path(), + required=True, + help="Path to save the aggregated AbSplice score file", +) +def aggregate_and_concat_absplice( + absplice_dir: str, ab_splice_agg_score_file: str +) -> None: + """ + Aggregates AbSplice scores from multiple files in a directory and saves them to a single Parquet file. + This function performs the following steps: + 1. Iterates through all files in the specified directory. + 2. Reads each file and aggregates the AbSplice scores using the specified aggregation function. + 3. Saves the final aggregated scores to a specified Parquet file. + + Parameters: + - absplice_dir (str): Path to the directory containing AbSplice outputs. + - ab_splice_agg_score_file (str): Path to save the aggregated AbSplice score file. + + Returns: + None + """ + abs_splice_res_dir = Path(absplice_dir) + + tissue_agg_function = "max" + if not Path(ab_splice_agg_score_file).exists(): + # logger.info(f"removing existing file {ab_splice_agg_score_file}") + # os.remove(ab_splice_agg_score_file) + for i, chrom_file in enumerate(os.listdir(abs_splice_res_dir)): + logger.info(f"Reading file {chrom_file}") + # ab_splice_res = pd.read_parquet(abs_splice_res_dir/ chrom_file).reset_index() + ab_splice_res = pd.read_table(abs_splice_res_dir / chrom_file).reset_index() + absplice_columns = [ + j for j in ab_splice_res.columns if "AbSplice_DNA_" in j + ] + #### aggregate tissue specific ab splice scores + ab_splice_res["AbSplice_DNA"] = np.max( + ab_splice_res[absplice_columns].values, + axis=1, + ) + ab_splice_res["pos"] = ab_splice_res["pos"].astype(int) + logger.info(f"Number of rows of ab_splice df {len(ab_splice_res)}") + # merged = ab_splice_res.merge(current_annotations, how = 'left', on = ['chrom', 'pos', 'ref', 'alt']) + res = ab_splice_res[ + ["chrom", "pos", "ref", "alt", "gene_id", "AbSplice_DNA"] + ] + del ab_splice_res + table = pa.Table.from_pandas(res) + if i == 0: + logger.info(f"creating new file {ab_splice_agg_score_file}") + pqwriter = pq.ParquetWriter(ab_splice_agg_score_file, table.schema) + pqwriter.write_table(table) + del res + + if pqwriter: + pqwriter.close() + + +@cli.command() +@click.option( + "--annotations", + type=click.Path(exists=True), + required=True, + help="Path to annotations.parquet", +) +@click.option( + "--variants", + type=click.Path(exists=True), + required=True, + help="Path to variants.parquet", +) +@click.option( + "--genes", type=click.Path(exists=True), required=True, help="Path to genes.parquet" +) +@click.option( + "--ab-splice-agg-score-file", + type=click.Path(exists=True), + required=True, + help="Path to splice.parquet", +) +@click.option( + "--output", + type=click.Path(), + required=True, + help="Path to output splice_anno.parquet", +) +@click.option("--verbose", is_flag=True, help="Enable verbose logging") +@click.option("--mem-limit", type=int, default=0, help="memory limit for duck DB in GB") +@click.option("--fill-nan", is_flag=True, help="Fill AbSplice_DNA NA values with 0") +def merge_absplice_scores( + annotations, + variants, + genes, + ab_splice_agg_score_file, + output, + verbose, + mem_limit, + fill_nan, +): + """ + Processes and merges sbsplice scores to annotation parquet file + + This function performs the following steps: + 1. Loads and transforms genes.parquet by renaming columns and splitting the gene field. + 2. Loads annotations.parquet, excluding specified columns if they exist. + 3. Merges the modified genes table with the annotations table on the 'gene_id' column. + 4. Loads variants.parquet and merges it with the previous result on the 'id' column. + 5. Loads splice.parquet and merges it with the previous result on columns 'chrom', 'pos', 'ref', 'alt', and 'gene_id'. + 6. Saves the final merged table to a specified output parquet file. + + Parameters: + - annotations (str): Path to the annotations.parquet file. + - variants (str): Path to the variants.parquet file. + - genes (str): Path to the genes.parquet file. + - splice (str): Path to the splice.parquet file. + - output (str): Path to save the output splice_anno.parquet file. + - verbose (bool): If True, prints detailed logging information at each step. + + Returns: + None + """ + + con = duckdb.connect(database=":memory:") + + if mem_limit > 0: + logger.info(f"Setting memory limit to {mem_limit}GB") + # Set memory limit for DuckDB + con.execute(f"SET memory_limit = '{mem_limit}GB'") + + if verbose: + logger.info(f"Loading and transforming {genes}...") + + con.execute( + f""" + CREATE TEMP TABLE genes_mod AS + SELECT + id AS gene_id, + split_part(gene, '.', 1) AS gene, + split_part(gene, '.', 2) AS exon + FROM read_parquet('{genes}'); + """ + ) + + if verbose: + logger.info(f"Loading {annotations}...") + + all_columns = ( + con.execute(f"SELECT * FROM read_parquet('{annotations}') LIMIT 0") + .fetchdf() + .columns.tolist() + ) + exclude_columns = {"chrom", "pos", "ref", "alt", "AbSplice_DNA"} + selected_columns = [col for col in all_columns if col not in exclude_columns] + select_clause = ", ".join(selected_columns) + con.execute( + f""" + CREATE TEMP TABLE annotations AS + SELECT {select_clause} + FROM read_parquet('{annotations}'); + """ + ) + + if verbose: + logger.info("Merging genes_mod with annotations...") + con.execute( + """ + CREATE TEMP TABLE merged1 AS + SELECT a.*, g.gene, g.exon + FROM annotations a + LEFT JOIN genes_mod g + ON a.gene_id = g.gene_id; + """ + ) + + if verbose: + logger.info(f"Loading {variants}...") + con.execute( + f""" + CREATE TEMP TABLE variants AS + SELECT * FROM read_parquet('{variants}'); + """ + ) + + if verbose: + logger.info("Merging merged1 with variants...") + con.execute( + """ + CREATE TEMP TABLE merged2 AS + SELECT m1.*, v.chrom, v.pos, v.ref, v.alt + FROM merged1 m1 + LEFT JOIN variants v + ON m1.id = v.id; + """ + ) + + if verbose: + logger.info(f"Loading {ab_splice_agg_score_file}...") + con.execute( + f""" + CREATE TEMP TABLE splice AS + SELECT * FROM read_parquet('{ab_splice_agg_score_file}'); + """ + ) + + if verbose: + logger.info("Merging merged2 with absplice scores...") + con.execute( + """ + CREATE TEMP TABLE final_merge AS + SELECT m2.*, s.AbSplice_DNA + FROM merged2 m2 + LEFT JOIN splice s + ON m2.chrom = s.chrom + AND m2.pos = s.pos + AND m2.ref = s.ref + AND m2.alt = s.alt + AND m2.gene = s.gene_id; + """ + ) + + if fill_nan: + if verbose: + print(f"Filling NULL values in splice column with 0...") + con.execute( + f""" + UPDATE final_merge + SET AbSplice_DNA = 0 + WHERE AbSplice_DNA IS NULL; + """ + ) + + if verbose: + logger.info(f"Saving final merged table to {output}...") + con.execute( + f""" + COPY final_merge TO '{output}' (FORMAT 'parquet'); + """ + ) + + logger.info(f"Successfully saved final merged table to {output}") diff --git a/deeprvat_annotations.yml b/deeprvat_annotations.yml index bd297949..c7a148e6 100644 --- a/deeprvat_annotations.yml +++ b/deeprvat_annotations.yml @@ -7,6 +7,7 @@ dependencies: - click=8.0.4 - scikit-learn=1.0.2 - dask=2021.5.0 + - duckdb=1.2.1 - tqdm=4.59.0 - pybedtools=0.9.0 - keras=2.11.0 diff --git a/docs/annotations.md b/docs/annotations.md index 6ee05f1d..b1f6eac6 100644 --- a/docs/annotations.md +++ b/docs/annotations.md @@ -160,6 +160,10 @@ Alternatively, when the user want to select a specific set of genes to consider, - column `id`:`int` unique id for each gene Each row represents a gene the user want to include in the analysis. +## Manually adding precomputed and downloaded AbSplice scores + +Instead of running the absplice part of the annotation pipeline, users can include `include_absplice: false` in the annotation config file and manually add Absplice annotations after the pipeline ran through. To do this, after downloading and unpacking the precomputed absplice scores from [zenodo](https://zenodo.org/records/10781457), and running the annotation pipeline without absplice, users can run `deeprvat_annotations aggregate_and_concat_absplice` inside the `deeprvat_annotations` environment with `--absplice-dir` pointing to the unpacked directory containing the absplice scores and `--ab-splice-agg-score-file` pointing to the path where the intermediate aggregated absplice scores should be saved. +Run `deeprvat_annotations merge_absplice_scores` afterwards with the annotations, variants and `protein_coding_genes.parquet` as well as the aggregated absplice scores as input. Finally state where you want the output annotation file stored (`--output`). ## References diff --git a/tests/annotations/test_annotations.py b/tests/annotations/test_annotations.py index 068d6343..5b50417d 100644 --- a/tests/annotations/test_annotations.py +++ b/tests/annotations/test_annotations.py @@ -819,3 +819,115 @@ def test_compute_plof(test_data_name_dir, annotations_in, expected, tmp_path): assert_frame_equal( written_results, expected_results[written_results.columns], check_exact=False ) + + +@pytest.mark.parametrize( + "test_data_name_dir, absplice_dir, expected", + [ + ( + "aggregate_abplice_manually_medium", + "absplice_dir", + "aggregated_absplice_scores_expected.parquet", + ), + ], +) +def test_aggregate_absplice_manually( + test_data_name_dir, absplice_dir, expected, tmp_path +): + current_test_data_dir = ( + tests_data_dir / "aggregate_absplice_manually" / test_data_name_dir + ) + absplice_dir_path = current_test_data_dir / "input" / absplice_dir + expected_path = current_test_data_dir / "expected" / expected + output_path = tmp_path / "out.parquet" + cli_runner = CliRunner() + cli_parameters = [ + "aggregate-and-concat-absplice", + "--absplice-dir", + absplice_dir_path.as_posix(), + "--ab-splice-agg-score-file", + output_path.as_posix(), + ] + result = cli_runner.invoke(annotations_cli, cli_parameters, catch_exceptions=False) + assert result.exit_code == 0 + written_results = pd.read_parquet(output_path) + expected_results = pd.read_parquet(expected_path) + assert written_results.shape == expected_results.shape + assert_frame_equal( + written_results.sort_values( + by=["chrom", "pos", "ref", "alt", "gene_id"] + ).reset_index(drop=True), + expected_results[written_results.columns] + .sort_values(by=["chrom", "pos", "ref", "alt", "gene_id"]) + .reset_index(drop=True), + check_exact=False, + ) + + +@pytest.mark.parametrize( + "test_data_name_dir, annotations_in, variants_in, genes_in, absplice_in, expected", + [ + ( + "merge_absplice_scores_medium", + "annotations.parquet", + "variants.parquet", + "protein_coding_genes.parquet", + "aggregated_absplice_scores.parquet", + "expected.parquet", + ) + ], +) +def test_merge_absplice_scores_manually( + test_data_name_dir, + annotations_in, + variants_in, + genes_in, + absplice_in, + expected, + tmp_path, +): + current_test_data_dir = ( + tests_data_dir / "merge_absplice_scores_manually" / test_data_name_dir + ) + annotations_in_path = current_test_data_dir / "input" / annotations_in + variants_in_path = current_test_data_dir / "input" / variants_in + genes_in_path = current_test_data_dir / "input" / genes_in + splice_in_path = current_test_data_dir / "input" / absplice_in + expected_path = current_test_data_dir / "expected" / expected + output_path = tmp_path / "splice_anno.parquet" + + cli_runner = CliRunner() + cli_parameters = [ + "merge-absplice-scores", + "--annotations", + annotations_in_path.as_posix(), + "--variants", + variants_in_path.as_posix(), + "--genes", + genes_in_path.as_posix(), + "--ab-splice-agg-score-file", + splice_in_path.as_posix(), + "--output", + output_path.as_posix(), + "--verbose", + ] + + # Add '--fill-splice-nan' if testing the fill_splice_merge case + if "fill_splice" in test_data_name_dir: + cli_parameters.extend(["--fill-splice-nan", "--fill-value", "0"]) + + result = cli_runner.invoke(annotations_cli, cli_parameters, catch_exceptions=False) + + assert result.exit_code == 0, f"CLI failed: {result.output}" + + written_results = pd.read_parquet(output_path) + expected_results = pd.read_parquet(expected_path) + + assert written_results.shape == expected_results.shape, "Shapes mismatch" + assert_frame_equal( + written_results.sort_values(by=["id"]).reset_index(drop=True), + expected_results[written_results.columns] + .sort_values(by=["id"]) + .reset_index(drop=True), + check_exact=False, + ) diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/expected/aggregated_absplice_scores_expected.parquet b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/expected/aggregated_absplice_scores_expected.parquet new file mode 100644 index 00000000..a1260afe Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/expected/aggregated_absplice_scores_expected.parquet differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000132182.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000132182.tsv.gz new file mode 100644 index 00000000..5452af41 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000132182.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000134086.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000134086.tsv.gz new file mode 100644 index 00000000..976bf274 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000134086.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138688.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138688.tsv.gz new file mode 100644 index 00000000..e3741c62 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138688.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138758.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138758.tsv.gz new file mode 100644 index 00000000..4ed05415 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000138758.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000145439.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000145439.tsv.gz new file mode 100644 index 00000000..75a0f72e Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000145439.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000163629.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000163629.tsv.gz new file mode 100644 index 00000000..72d36de2 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000163629.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000171557.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000171557.tsv.gz new file mode 100644 index 00000000..96ccb522 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000171557.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000173706.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000173706.tsv.gz new file mode 100644 index 00000000..61d01563 Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000173706.tsv.gz differ diff --git a/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000175110.tsv.gz b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000175110.tsv.gz new file mode 100644 index 00000000..627dc16e Binary files /dev/null and b/tests/annotations/test_data/aggregate_absplice_manually/aggregate_abplice_manually_medium/input/absplice_dir/ENSG00000175110.tsv.gz differ diff --git a/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/expected/expected.parquet b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/expected/expected.parquet new file mode 100644 index 00000000..a79ceaf3 Binary files /dev/null and b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/expected/expected.parquet differ diff --git a/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/aggregated_absplice_scores.parquet b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/aggregated_absplice_scores.parquet new file mode 100644 index 00000000..a1260afe Binary files /dev/null and b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/aggregated_absplice_scores.parquet differ diff --git a/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/annotations.parquet b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/annotations.parquet new file mode 100644 index 00000000..cf3bad1a Binary files /dev/null and b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/annotations.parquet differ diff --git a/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/protein_coding_genes.parquet b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/protein_coding_genes.parquet new file mode 100644 index 00000000..03e864cb Binary files /dev/null and b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/protein_coding_genes.parquet differ diff --git a/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/variants.parquet b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/variants.parquet new file mode 100644 index 00000000..37d317c4 Binary files /dev/null and b/tests/annotations/test_data/merge_absplice_scores_manually/merge_absplice_scores_medium/input/variants.parquet differ