diff --git a/instanovo/scripts/convert_to_sdf.py b/instanovo/scripts/convert_to_sdf.py index 8c8ac9cd..be7659cc 100644 --- a/instanovo/scripts/convert_to_sdf.py +++ b/instanovo/scripts/convert_to_sdf.py @@ -44,26 +44,62 @@ def convert( shard_size: Annotated[int, typer.Option(help="Length of saved data shards")] = 1_000_000, is_annotated: Annotated[bool, typer.Option("--is-annotated", help="whether dataset is annotated")] = False, add_spectrum_id: Annotated[bool, typer.Option("--add-spectrum-id", help="Add spectrum id column")] = False, + include_ms1: Annotated[ + bool, + typer.Option( + "--include-ms1", help="Include MS1 scans alongside MS2. Adds ms_level column (1=MS1, 2=MS2). Precursor columns are empty for MS1 rows." + ), + ] = False, ) -> None: """Convert data to SpectrumDataFrame and save as *.parquet file(s).""" from instanovo.utils.data_handler import SpectrumDataFrame logging.basicConfig(level=logging.INFO) - logger.info(f"Loading {source}") - sdf = SpectrumDataFrame.load( - source, - is_annotated=is_annotated, - name=name, - partition=partition.value, - max_shard_size=shard_size, - lazy=True, - add_spectrum_id=add_spectrum_id, - ) + ms_label = " (MS1+MS2)" if include_ms1 else " (MS2 only)" + + logger.info(f"Loading {source}{ms_label}") + + if include_ms1: + ms_levels: list[int] = [1, 2] + # Direct load with MS1+MS2 support — only works for mzML/mzXML files + source_lower = source.lower() + if source_lower.endswith(".mzml"): + sdf = SpectrumDataFrame.load_mzml(source, ms_levels=ms_levels) + elif source_lower.endswith(".mzxml"): + from instanovo.utils.msreader import read_mzxml + + sdf = SpectrumDataFrame.from_polars(SpectrumDataFrame._df_from_dict(read_mzxml(source, ms_levels=ms_levels))) + else: + logger.warning("--include-ms1 only applies to mzML/mzXML files. Ignoring flag.") + sdf = SpectrumDataFrame.load( + source, + is_annotated=is_annotated, + name=name, + partition=partition.value, + max_shard_size=shard_size, + lazy=True, + add_spectrum_id=add_spectrum_id, + ) + else: + sdf = SpectrumDataFrame.load( + source, + is_annotated=is_annotated, + name=name, + partition=partition.value, + max_shard_size=shard_size, + lazy=True, + add_spectrum_id=add_spectrum_id, + ) + logger.info(f"Loaded {len(sdf):,d} rows") - logger.info(f"Filtering max_charge <= {max_charge}") - sdf.filter_rows(lambda row: row["precursor_charge"] <= max_charge) + if include_ms1: + # Filter MS2 rows by charge but keep all MS1 rows + logger.info(f"Filtering MS2 rows with max_charge <= {max_charge} (keeping all MS1 rows)") + else: + logger.info(f"Filtering max_charge <= {max_charge}") + sdf.filter_rows(lambda row: row["precursor_charge"] <= max_charge) logger.info(f"Saving {len(sdf):,d} rows to {target}") sdf.save( diff --git a/instanovo/utils/data_handler.py b/instanovo/utils/data_handler.py index 624b1cf1..82ca4e46 100644 --- a/instanovo/utils/data_handler.py +++ b/instanovo/utils/data_handler.py @@ -1243,33 +1243,32 @@ def _df_from_mzxml(source: str) -> pl.DataFrame: return SpectrumDataFrame._df_from_dict(read_mzxml(source)) @classmethod - def load_mzml(cls, source: str) -> "SpectrumDataFrame": + def load_mzml(cls, source: str, ms_levels: list[int] | None = None) -> "SpectrumDataFrame": """Load a SpectrumDataFrame from an mzML file. Args: source (str): Path to the mzML file. + ms_levels (list[int] | None): MS levels to extract. Default [2] for backward + compatibility. Use [1, 2] to extract both MS1 and MS2 scans. Returns: SpectrumDataFrame: The loaded SpectrumDataFrame. """ - # spectra = list(load_from_mzml(source)) - # return cls.from_matchms(spectra) - df = SpectrumDataFrame._df_from_dict(read_mzml(source)) + df = SpectrumDataFrame._df_from_dict(read_mzml(source, ms_levels=ms_levels)) return cls.from_polars(df) @staticmethod - def _df_from_mzml(source: str) -> pl.DataFrame: - """Load a polars DataFrame from an MGF file. + def _df_from_mzml(source: str, ms_levels: list[int] | None = None) -> pl.DataFrame: + """Load a polars DataFrame from an mzML file. Args: - source (str): Path to the MGF file. + source (str): Path to the mzML file. + ms_levels (list[int] | None): MS levels to extract. Returns: pl.DataFrame: The loaded polars DataFrame. """ - # spectra = list(load_from_mzml(source)) - # return SpectrumDataFrame._df_from_matchms(spectra) - return SpectrumDataFrame._df_from_dict(read_mzml(source)) + return SpectrumDataFrame._df_from_dict(read_mzml(source, ms_levels=ms_levels)) @classmethod def from_huggingface( @@ -1427,25 +1426,48 @@ def _parse_scan_number(scan_number: str, index: int) -> int | None: @staticmethod def _df_from_dict(data: dict[str, Any]) -> pl.DataFrame: - df = pl.DataFrame( - { - "scan_number": pl.Series( - [SpectrumDataFrame._parse_scan_number(str(x), i) for i, x in enumerate(data["scan_number"])], - dtype=pl.Int64, - ), - ANNOTATED_COLUMN: pl.Series(data["sequence"], dtype=pl.Utf8), - # Calculate precursor mass - MSColumns.PRECURSOR_MASS.value: pl.Series( - np.array(data["precursor_mz"]) * np.array(data["precursor_charge"]) - np.array(data["precursor_charge"]) * PROTON_MASS_AMU, - dtype=MS_TYPES[MSColumns.PRECURSOR_MASS], - ), - MSColumns.PRECURSOR_MZ.value: pl.Series(data["precursor_mz"], dtype=MS_TYPES[MSColumns.PRECURSOR_MZ]), - MSColumns.PRECURSOR_CHARGE.value: pl.Series(data["precursor_charge"], dtype=MS_TYPES[MSColumns.PRECURSOR_CHARGE]), - MSColumns.RETENTION_TIME.value: pl.Series(data["retention_time"], dtype=MS_TYPES[MSColumns.RETENTION_TIME]), - MSColumns.MZ_ARRAY.value: pl.Series(data["mz_array"], dtype=MS_TYPES[MSColumns.MZ_ARRAY]), - MSColumns.INTENSITY_ARRAY.value: pl.Series(data["intensity_array"], dtype=MS_TYPES[MSColumns.INTENSITY_ARRAY]), - } - ) + # Build precursor columns, handling None values from MS1 scans + precursor_mz_raw = data["precursor_mz"] + precursor_charge_raw = data["precursor_charge"] + + # Compute precursor mass: mz * charge - charge * proton_mass + # None values (MS1 rows) stay as None + precursor_mass = [ + (mz * charge - charge * PROTON_MASS_AMU) if (mz is not None and charge is not None) else None + for mz, charge in zip(precursor_mz_raw, precursor_charge_raw, strict=False) + ] + + columns: dict[str, Any] = { + "scan_number": pl.Series( + [SpectrumDataFrame._parse_scan_number(str(x), i) for i, x in enumerate(data["scan_number"])], + dtype=pl.Int64, + ), + ANNOTATED_COLUMN: pl.Series(data["sequence"], dtype=pl.Utf8), + MSColumns.PRECURSOR_MASS.value: pl.Series(precursor_mass, dtype=MS_TYPES[MSColumns.PRECURSOR_MASS]), + MSColumns.PRECURSOR_MZ.value: pl.Series(precursor_mz_raw, dtype=MS_TYPES[MSColumns.PRECURSOR_MZ]), + MSColumns.PRECURSOR_CHARGE.value: pl.Series( + [int(x) if x is not None else None for x in precursor_charge_raw], + dtype=MS_TYPES[MSColumns.PRECURSOR_CHARGE], + ), + MSColumns.RETENTION_TIME.value: pl.Series(data["retention_time"], dtype=MS_TYPES[MSColumns.RETENTION_TIME]), + MSColumns.MZ_ARRAY.value: pl.Series(data["mz_array"], dtype=MS_TYPES[MSColumns.MZ_ARRAY]), + MSColumns.INTENSITY_ARRAY.value: pl.Series(data["intensity_array"], dtype=MS_TYPES[MSColumns.INTENSITY_ARRAY]), + } + + # Include ms_level column if present (from MS1+MS2 extraction) + if "ms_level" in data and data["ms_level"]: + columns["ms_level"] = pl.Series(data["ms_level"], dtype=pl.Int32) + + # Include experiment_name if present + if "experiment_name" in data and data["experiment_name"]: + columns["experiment_name"] = pl.Series(data["experiment_name"], dtype=pl.Utf8) + + df = pl.DataFrame(columns) + + # Construct spectrum_id from experiment_name:scan_number if not already present + if "experiment_name" in df.columns and "spectrum_id" not in df.columns: + df = df.with_columns((pl.col("experiment_name") + ":" + pl.col("scan_number").cast(pl.Utf8)).alias("spectrum_id")) + return df @staticmethod diff --git a/instanovo/utils/msreader.py b/instanovo/utils/msreader.py index ef454ac6..5669a2ca 100644 --- a/instanovo/utils/msreader.py +++ b/instanovo/utils/msreader.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Any from pyteomics import mgf, mzml, mzxml @@ -7,11 +8,14 @@ # Unused def read_mgf(file_path: str) -> dict[str, list[Any]]: """Read an mgf file and return a data dict.""" + experiment_name = Path(file_path).stem data = _initialize_data_dict() with mgf.read(file_path, index_by_scans=True) as reader: for spectrum in reader: + data["ms_level"].append(2) # MGF files contain MS2 spectra only data["scan_number"].append(spectrum.get("params", {}).get("title", "")) + data["experiment_name"].append(experiment_name) data["sequence"].append(spectrum.get("params", {}).get("seq", "")) data["precursor_mz"].append(spectrum.get("params", {}).get("pepmass", [0])[0]) data["precursor_charge"].append(spectrum.get("params", {}).get("charge", [0])[0]) @@ -24,8 +28,19 @@ def read_mgf(file_path: str) -> dict[str, list[Any]]: def read_mzml( file_path: str, + ms_levels: list[int] | None = None, ) -> dict[str, list[Any]]: - """Read an mzml file and return a data dict.""" + """Read an mzml file and return a data dict. + + Args: + file_path: Path to the mzML file. + ms_levels: List of MS levels to extract. Default [2] for backward compatibility. + Use [1, 2] to extract both MS1 and MS2 scans. + """ + if ms_levels is None: + ms_levels = [2] + + experiment_name = Path(file_path).stem data = _initialize_data_dict() ms_vocab = { @@ -41,47 +56,78 @@ def read_mzml( with mzml.read(file_path) as reader: for spectrum in reader: spectrum_dict = cvquery(spectrum) - if spectrum_dict.get(ms_vocab["ms_level"]) == 2: # Ensure it's an MS2 spectrum - data["scan_number"].append(spectrum.get("id", "")) - + current_ms_level = spectrum_dict.get(ms_vocab["ms_level"]) + if current_ms_level not in ms_levels: + continue + + data["ms_level"].append(int(current_ms_level)) + data["scan_number"].append(spectrum.get("id", "")) + data["experiment_name"].append(experiment_name) + data["retention_time"].append(spectrum_dict.get(ms_vocab["retention_time"])) + data["mz_array"].append(list(spectrum_dict.get(ms_vocab["mz_array"]))) + data["intensity_array"].append(list(spectrum_dict.get(ms_vocab["intensity_array"]))) + + if current_ms_level == 2: data["sequence"].append(spectrum_dict.get(ms_vocab["sequence"], "")) - - # Find the first matching precursor mz term pre_mz_key = next( (key for key in ms_vocab["precursor_mz"] if key in spectrum_dict), "", ) data["precursor_mz"].append(spectrum_dict.get(pre_mz_key, 0)) data["precursor_charge"].append(spectrum_dict.get(ms_vocab["precursor_charge"], 0)) - data["retention_time"].append(spectrum_dict.get(ms_vocab["retention_time"])) - data["mz_array"].append(list(spectrum_dict.get(ms_vocab["mz_array"]))) - data["intensity_array"].append(list(spectrum_dict.get(ms_vocab["intensity_array"]))) + else: + # MS1 scans: precursor info not applicable + data["sequence"].append("") + data["precursor_mz"].append(None) + data["precursor_charge"].append(None) return data -def read_mzxml(file_path: str) -> dict[str, list[Any]]: - """Read an mzxml file and return a data dict.""" +def read_mzxml(file_path: str, ms_levels: list[int] | None = None) -> dict[str, list[Any]]: + """Read an mzxml file and return a data dict. + + Args: + file_path: Path to the mzXML file. + ms_levels: List of MS levels to extract. Default [2] for backward compatibility. + """ + if ms_levels is None: + ms_levels = [2] + + experiment_name = Path(file_path).stem data = _initialize_data_dict() with mzxml.read(file_path) as reader: for spectrum in reader: - if spectrum.get("msLevel", 0) == 2: # Ensure it's an MS2 spectrum - data["scan_number"].append(spectrum.get("num", "")) + current_ms_level = spectrum.get("msLevel", 0) + if current_ms_level not in ms_levels: + continue + + data["ms_level"].append(int(current_ms_level)) + data["scan_number"].append(spectrum.get("num", "")) + data["experiment_name"].append(experiment_name) + data["retention_time"].append(spectrum.get("retentionTime")) + data["mz_array"].append(list(spectrum.get("m/z array"))) + data["intensity_array"].append(list(spectrum.get("intensity array"))) + + if current_ms_level == 2: data["sequence"].append(spectrum.get("peptide", "")) precursor = spectrum.get("precursorMz", [{}])[0] data["precursor_mz"].append(precursor.get("precursorMz", 0)) data["precursor_charge"].append(precursor.get("precursorCharge", 0)) - data["retention_time"].append(spectrum.get("retentionTime")) - data["mz_array"].append(list(spectrum.get("m/z array"))) - data["intensity_array"].append(list(spectrum.get("intensity array"))) + else: + data["sequence"].append("") + data["precursor_mz"].append(None) + data["precursor_charge"].append(None) return data def _initialize_data_dict() -> dict[str, list[Any]]: return { + "ms_level": [], "scan_number": [], + "experiment_name": [], "sequence": [], "precursor_mz": [], "precursor_charge": [], diff --git a/tests/unit_test/utils_test/data_handler_test.py b/tests/unit_test/utils_test/data_handler_test.py index cb94e4ab..3e692670 100644 --- a/tests/unit_test/utils_test/data_handler_test.py +++ b/tests/unit_test/utils_test/data_handler_test.py @@ -356,6 +356,7 @@ def test_mzml_to_parquet(dir_paths: tuple[str, str], tmp_path: Any) -> None: "retention_time": [nan], "mz_array": [array([10.0, 20.0, 30.0, 40.0])], "intensity_array": [array([1.0, 1.5, 1.0, 1.5])], + "ms_level": pd.array([2], dtype="int32"), "experiment_name": ["example"], "spectrum_id": ["example:1"], } @@ -443,6 +444,7 @@ def test_mzxml_to_parquet(dir_paths: tuple[str, str], tmp_path: Any) -> None: "retention_time": [nan], "mz_array": [array([10.0, 30.0])], "intensity_array": [array([20.0, 40.0])], + "ms_level": pd.array([2], dtype="int32"), "experiment_name": ["example"], "spectrum_id": ["example:1"], }