Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions instanovo/scripts/convert_to_sdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,62 @@ def convert(
shard_size: Annotated[int, typer.Option(help="Length of saved data shards")] = 1_000_000,
is_annotated: Annotated[bool, typer.Option("--is-annotated", help="whether dataset is annotated")] = False,
add_spectrum_id: Annotated[bool, typer.Option("--add-spectrum-id", help="Add spectrum id column")] = False,
include_ms1: Annotated[
bool,
typer.Option(
"--include-ms1", help="Include MS1 scans alongside MS2. Adds ms_level column (1=MS1, 2=MS2). Precursor columns are empty for MS1 rows."
),
] = False,
) -> None:
"""Convert data to SpectrumDataFrame and save as *.parquet file(s)."""
from instanovo.utils.data_handler import SpectrumDataFrame

logging.basicConfig(level=logging.INFO)

logger.info(f"Loading {source}")
sdf = SpectrumDataFrame.load(
source,
is_annotated=is_annotated,
name=name,
partition=partition.value,
max_shard_size=shard_size,
lazy=True,
add_spectrum_id=add_spectrum_id,
)
ms_label = " (MS1+MS2)" if include_ms1 else " (MS2 only)"

logger.info(f"Loading {source}{ms_label}")

if include_ms1:
ms_levels: list[int] = [1, 2]
# Direct load with MS1+MS2 support — only works for mzML/mzXML files
source_lower = source.lower()
if source_lower.endswith(".mzml"):
sdf = SpectrumDataFrame.load_mzml(source, ms_levels=ms_levels)
elif source_lower.endswith(".mzxml"):
from instanovo.utils.msreader import read_mzxml

sdf = SpectrumDataFrame.from_polars(SpectrumDataFrame._df_from_dict(read_mzxml(source, ms_levels=ms_levels)))
else:
logger.warning("--include-ms1 only applies to mzML/mzXML files. Ignoring flag.")
sdf = SpectrumDataFrame.load(
source,
is_annotated=is_annotated,
name=name,
partition=partition.value,
max_shard_size=shard_size,
lazy=True,
add_spectrum_id=add_spectrum_id,
)
else:
sdf = SpectrumDataFrame.load(
source,
is_annotated=is_annotated,
name=name,
partition=partition.value,
max_shard_size=shard_size,
lazy=True,
add_spectrum_id=add_spectrum_id,
)

logger.info(f"Loaded {len(sdf):,d} rows")

logger.info(f"Filtering max_charge <= {max_charge}")
sdf.filter_rows(lambda row: row["precursor_charge"] <= max_charge)
if include_ms1:
# Filter MS2 rows by charge but keep all MS1 rows
logger.info(f"Filtering MS2 rows with max_charge <= {max_charge} (keeping all MS1 rows)")
else:
logger.info(f"Filtering max_charge <= {max_charge}")
sdf.filter_rows(lambda row: row["precursor_charge"] <= max_charge)

logger.info(f"Saving {len(sdf):,d} rows to {target}")
sdf.save(
Expand Down
80 changes: 51 additions & 29 deletions instanovo/utils/data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,33 +1243,32 @@ def _df_from_mzxml(source: str) -> pl.DataFrame:
return SpectrumDataFrame._df_from_dict(read_mzxml(source))

@classmethod
def load_mzml(cls, source: str) -> "SpectrumDataFrame":
def load_mzml(cls, source: str, ms_levels: list[int] | None = None) -> "SpectrumDataFrame":
"""Load a SpectrumDataFrame from an mzML file.

Args:
source (str): Path to the mzML file.
ms_levels (list[int] | None): MS levels to extract. Default [2] for backward
compatibility. Use [1, 2] to extract both MS1 and MS2 scans.

Returns:
SpectrumDataFrame: The loaded SpectrumDataFrame.
"""
# spectra = list(load_from_mzml(source))
# return cls.from_matchms(spectra)
df = SpectrumDataFrame._df_from_dict(read_mzml(source))
df = SpectrumDataFrame._df_from_dict(read_mzml(source, ms_levels=ms_levels))
return cls.from_polars(df)

@staticmethod
def _df_from_mzml(source: str) -> pl.DataFrame:
"""Load a polars DataFrame from an MGF file.
def _df_from_mzml(source: str, ms_levels: list[int] | None = None) -> pl.DataFrame:
"""Load a polars DataFrame from an mzML file.

Args:
source (str): Path to the MGF file.
source (str): Path to the mzML file.
ms_levels (list[int] | None): MS levels to extract.

Returns:
pl.DataFrame: The loaded polars DataFrame.
"""
# spectra = list(load_from_mzml(source))
# return SpectrumDataFrame._df_from_matchms(spectra)
return SpectrumDataFrame._df_from_dict(read_mzml(source))
return SpectrumDataFrame._df_from_dict(read_mzml(source, ms_levels=ms_levels))

@classmethod
def from_huggingface(
Expand Down Expand Up @@ -1427,25 +1426,48 @@ def _parse_scan_number(scan_number: str, index: int) -> int | None:

@staticmethod
def _df_from_dict(data: dict[str, Any]) -> pl.DataFrame:
df = pl.DataFrame(
{
"scan_number": pl.Series(
[SpectrumDataFrame._parse_scan_number(str(x), i) for i, x in enumerate(data["scan_number"])],
dtype=pl.Int64,
),
ANNOTATED_COLUMN: pl.Series(data["sequence"], dtype=pl.Utf8),
# Calculate precursor mass
MSColumns.PRECURSOR_MASS.value: pl.Series(
np.array(data["precursor_mz"]) * np.array(data["precursor_charge"]) - np.array(data["precursor_charge"]) * PROTON_MASS_AMU,
dtype=MS_TYPES[MSColumns.PRECURSOR_MASS],
),
MSColumns.PRECURSOR_MZ.value: pl.Series(data["precursor_mz"], dtype=MS_TYPES[MSColumns.PRECURSOR_MZ]),
MSColumns.PRECURSOR_CHARGE.value: pl.Series(data["precursor_charge"], dtype=MS_TYPES[MSColumns.PRECURSOR_CHARGE]),
MSColumns.RETENTION_TIME.value: pl.Series(data["retention_time"], dtype=MS_TYPES[MSColumns.RETENTION_TIME]),
MSColumns.MZ_ARRAY.value: pl.Series(data["mz_array"], dtype=MS_TYPES[MSColumns.MZ_ARRAY]),
MSColumns.INTENSITY_ARRAY.value: pl.Series(data["intensity_array"], dtype=MS_TYPES[MSColumns.INTENSITY_ARRAY]),
}
)
# Build precursor columns, handling None values from MS1 scans
precursor_mz_raw = data["precursor_mz"]
precursor_charge_raw = data["precursor_charge"]

# Compute precursor mass: mz * charge - charge * proton_mass
# None values (MS1 rows) stay as None
precursor_mass = [
(mz * charge - charge * PROTON_MASS_AMU) if (mz is not None and charge is not None) else None
for mz, charge in zip(precursor_mz_raw, precursor_charge_raw, strict=False)
]

columns: dict[str, Any] = {
"scan_number": pl.Series(
[SpectrumDataFrame._parse_scan_number(str(x), i) for i, x in enumerate(data["scan_number"])],
dtype=pl.Int64,
),
ANNOTATED_COLUMN: pl.Series(data["sequence"], dtype=pl.Utf8),
MSColumns.PRECURSOR_MASS.value: pl.Series(precursor_mass, dtype=MS_TYPES[MSColumns.PRECURSOR_MASS]),
MSColumns.PRECURSOR_MZ.value: pl.Series(precursor_mz_raw, dtype=MS_TYPES[MSColumns.PRECURSOR_MZ]),
MSColumns.PRECURSOR_CHARGE.value: pl.Series(
[int(x) if x is not None else None for x in precursor_charge_raw],
dtype=MS_TYPES[MSColumns.PRECURSOR_CHARGE],
),
MSColumns.RETENTION_TIME.value: pl.Series(data["retention_time"], dtype=MS_TYPES[MSColumns.RETENTION_TIME]),
MSColumns.MZ_ARRAY.value: pl.Series(data["mz_array"], dtype=MS_TYPES[MSColumns.MZ_ARRAY]),
MSColumns.INTENSITY_ARRAY.value: pl.Series(data["intensity_array"], dtype=MS_TYPES[MSColumns.INTENSITY_ARRAY]),
}

# Include ms_level column if present (from MS1+MS2 extraction)
if "ms_level" in data and data["ms_level"]:
columns["ms_level"] = pl.Series(data["ms_level"], dtype=pl.Int32)

# Include experiment_name if present
if "experiment_name" in data and data["experiment_name"]:
columns["experiment_name"] = pl.Series(data["experiment_name"], dtype=pl.Utf8)

df = pl.DataFrame(columns)

# Construct spectrum_id from experiment_name:scan_number if not already present
if "experiment_name" in df.columns and "spectrum_id" not in df.columns:
df = df.with_columns((pl.col("experiment_name") + ":" + pl.col("scan_number").cast(pl.Utf8)).alias("spectrum_id"))

return df

@staticmethod
Expand Down
78 changes: 62 additions & 16 deletions instanovo/utils/msreader.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path
from typing import Any

from pyteomics import mgf, mzml, mzxml
Expand All @@ -7,11 +8,14 @@
# Unused
def read_mgf(file_path: str) -> dict[str, list[Any]]:
"""Read an mgf file and return a data dict."""
experiment_name = Path(file_path).stem
data = _initialize_data_dict()

with mgf.read(file_path, index_by_scans=True) as reader:
for spectrum in reader:
data["ms_level"].append(2) # MGF files contain MS2 spectra only
data["scan_number"].append(spectrum.get("params", {}).get("title", ""))
data["experiment_name"].append(experiment_name)
data["sequence"].append(spectrum.get("params", {}).get("seq", ""))
data["precursor_mz"].append(spectrum.get("params", {}).get("pepmass", [0])[0])
data["precursor_charge"].append(spectrum.get("params", {}).get("charge", [0])[0])
Expand All @@ -24,8 +28,19 @@ def read_mgf(file_path: str) -> dict[str, list[Any]]:

def read_mzml(
file_path: str,
ms_levels: list[int] | None = None,
) -> dict[str, list[Any]]:
"""Read an mzml file and return a data dict."""
"""Read an mzml file and return a data dict.

Args:
file_path: Path to the mzML file.
ms_levels: List of MS levels to extract. Default [2] for backward compatibility.
Use [1, 2] to extract both MS1 and MS2 scans.
"""
if ms_levels is None:
ms_levels = [2]

experiment_name = Path(file_path).stem
data = _initialize_data_dict()

ms_vocab = {
Expand All @@ -41,47 +56,78 @@ def read_mzml(
with mzml.read(file_path) as reader:
for spectrum in reader:
spectrum_dict = cvquery(spectrum)
if spectrum_dict.get(ms_vocab["ms_level"]) == 2: # Ensure it's an MS2 spectrum
data["scan_number"].append(spectrum.get("id", ""))

current_ms_level = spectrum_dict.get(ms_vocab["ms_level"])
if current_ms_level not in ms_levels:
continue

data["ms_level"].append(int(current_ms_level))
data["scan_number"].append(spectrum.get("id", ""))
data["experiment_name"].append(experiment_name)
data["retention_time"].append(spectrum_dict.get(ms_vocab["retention_time"]))
data["mz_array"].append(list(spectrum_dict.get(ms_vocab["mz_array"])))
data["intensity_array"].append(list(spectrum_dict.get(ms_vocab["intensity_array"])))

if current_ms_level == 2:
data["sequence"].append(spectrum_dict.get(ms_vocab["sequence"], ""))

# Find the first matching precursor mz term
pre_mz_key = next(
(key for key in ms_vocab["precursor_mz"] if key in spectrum_dict),
"",
)
data["precursor_mz"].append(spectrum_dict.get(pre_mz_key, 0))
data["precursor_charge"].append(spectrum_dict.get(ms_vocab["precursor_charge"], 0))
data["retention_time"].append(spectrum_dict.get(ms_vocab["retention_time"]))
data["mz_array"].append(list(spectrum_dict.get(ms_vocab["mz_array"])))
data["intensity_array"].append(list(spectrum_dict.get(ms_vocab["intensity_array"])))
else:
# MS1 scans: precursor info not applicable
data["sequence"].append("")
data["precursor_mz"].append(None)
data["precursor_charge"].append(None)

return data


def read_mzxml(file_path: str) -> dict[str, list[Any]]:
"""Read an mzxml file and return a data dict."""
def read_mzxml(file_path: str, ms_levels: list[int] | None = None) -> dict[str, list[Any]]:
"""Read an mzxml file and return a data dict.

Args:
file_path: Path to the mzXML file.
ms_levels: List of MS levels to extract. Default [2] for backward compatibility.
"""
if ms_levels is None:
ms_levels = [2]

experiment_name = Path(file_path).stem
data = _initialize_data_dict()

with mzxml.read(file_path) as reader:
for spectrum in reader:
if spectrum.get("msLevel", 0) == 2: # Ensure it's an MS2 spectrum
data["scan_number"].append(spectrum.get("num", ""))
current_ms_level = spectrum.get("msLevel", 0)
if current_ms_level not in ms_levels:
continue

data["ms_level"].append(int(current_ms_level))
data["scan_number"].append(spectrum.get("num", ""))
data["experiment_name"].append(experiment_name)
data["retention_time"].append(spectrum.get("retentionTime"))
data["mz_array"].append(list(spectrum.get("m/z array")))
data["intensity_array"].append(list(spectrum.get("intensity array")))

if current_ms_level == 2:
data["sequence"].append(spectrum.get("peptide", ""))
precursor = spectrum.get("precursorMz", [{}])[0]
data["precursor_mz"].append(precursor.get("precursorMz", 0))
data["precursor_charge"].append(precursor.get("precursorCharge", 0))
data["retention_time"].append(spectrum.get("retentionTime"))
data["mz_array"].append(list(spectrum.get("m/z array")))
data["intensity_array"].append(list(spectrum.get("intensity array")))
else:
data["sequence"].append("")
data["precursor_mz"].append(None)
data["precursor_charge"].append(None)

return data


def _initialize_data_dict() -> dict[str, list[Any]]:
return {
"ms_level": [],
"scan_number": [],
"experiment_name": [],
"sequence": [],
"precursor_mz": [],
"precursor_charge": [],
Expand Down
2 changes: 2 additions & 0 deletions tests/unit_test/utils_test/data_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ def test_mzml_to_parquet(dir_paths: tuple[str, str], tmp_path: Any) -> None:
"retention_time": [nan],
"mz_array": [array([10.0, 20.0, 30.0, 40.0])],
"intensity_array": [array([1.0, 1.5, 1.0, 1.5])],
"ms_level": pd.array([2], dtype="int32"),
"experiment_name": ["example"],
"spectrum_id": ["example:1"],
}
Expand Down Expand Up @@ -443,6 +444,7 @@ def test_mzxml_to_parquet(dir_paths: tuple[str, str], tmp_path: Any) -> None:
"retention_time": [nan],
"mz_array": [array([10.0, 30.0])],
"intensity_array": [array([20.0, 40.0])],
"ms_level": pd.array([2], dtype="int32"),
"experiment_name": ["example"],
"spectrum_id": ["example:1"],
}
Expand Down
Loading