Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ Run status is tracked in CouchDB with events including:
- Run directories are named according to sequencer-specific ID formats (defined in run classes)
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `CopyComplete.txt` for Illumina)
- Remote storage is accessible via rsync over SSH
- CouchDB is accessible and the database specified in the config exists and has a ddoc called `events` with a view called `current_status_per_runfolder` that emits a dictionary of all the statuses and their current state (true/false)
- CouchDB is accessible and the database specified in the config exists and has a ddoc called `events` with a view called `current_status_per_runfolder` that emits a dictionary of all the statuses and their current state (true/false)
- CouchDB is accessible and the database `gs_configs` contains a document called `regex_patterns` containing the regexes used to identify different run types.
- The flowcell ID is set to correspond to the ID that is scanned with a barcode scanner during sequencing setup in the lab

### Status Files
Expand Down Expand Up @@ -200,3 +201,4 @@ To add support for a new sequencer, add the following to dataflow_transfer:
2. Import the new class in `dataflow_transfer/run_classes/__init__.py`
3. Add a test fixture for the new run in `dataflow_transfer/tests/test_run_classes.py` and include it in the relevant tests
4. Add a section for the sequencer in the config file
5. Add a regular expression matching the run folder name of the sequencer in the CouchDB database `gs_configs`, in the document called `regex_patterns`.
6 changes: 2 additions & 4 deletions dataflow_transfer/run_classes/element_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
class ElementRun(Run):
"""Defines an Element sequencing run"""

run_family = "Element"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "RunUploaded.json"
Expand All @@ -18,11 +20,7 @@ class AVITIRun(ElementRun):
run_type = "AVITI"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{8}_AV\d{6}_(A|B)\d{10}$" # 20251007_AV242106_A2507535225
)
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # 2507535225


# TODO: Add Teton run class
36 changes: 33 additions & 3 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
class Run:
"""Defines a generic sequencing run"""

run_type = None
run_family = None
default_run_id_format = None
Comment thread
ssjunnebo marked this conversation as resolved.
Outdated

def __init__(self, run_dir, configuration):
self.run_dir = run_dir
self.run_id = os.path.basename(run_dir)
Expand All @@ -33,6 +37,29 @@ def __init__(self, run_dir, configuration):
)
self.remote_destination = self.sequencer_config.get("remote_destination")
self.db = StatusdbSession(self.configuration.get("statusdb"))
self.run_id_format = self._resolve_run_id_format()
self.flowcell_id = (
re.match(self.run_id_format, self.run_id).group("flowcell_id")
if self.run_id_format
else None
)

def _resolve_run_id_format(self):
"""Resolve the run ID regex from central config."""
run_id_format = None
if self.run_family and self.run_type:
try:
run_id_format = self.db.get_regex_pattern(
self.run_family, self.run_type
)
except Exception as exc:
logger.warning(
"Unable to load run_id_format for %s from regex config: %s",
self.run_type,
exc,
)

return run_id_format

def confirm_run_type(self):
"""Compare run ID with expected format for the run type."""
Expand Down Expand Up @@ -159,10 +186,13 @@ def has_status(self, status_name):

def update_statusdb(self, status, additional_info=None):
"""Update the statusdb document for this run with the given status."""
db_doc = (
self.db.get_db_doc(ddoc="lookup", view="runfolder_id", run_id=self.run_id)
or {}
doc_id = self.db.get_doc_id(
ddoc="lookup", view="runfolder_id", run_id=self.run_id
)
if doc_id:
db_doc = self.db.get_document(db=self.db.db_name, doc_id=doc_id)
else:
db_doc = {}

statuses_to_only_update_once = [
"sequencing_started",
Expand Down
15 changes: 2 additions & 13 deletions dataflow_transfer/run_classes/illumina_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
class IlluminaRun(Run):
"""Defines an Illumina sequencing run"""

run_family = "Illumina"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "CopyComplete.txt"
self.flowcell_id = self.run_id.split("_")[-1]


@register_run_class
Expand All @@ -19,11 +20,7 @@ class NovaSeqXPlusRun(IlluminaRun):
run_type = "NovaSeqXPlus"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{8}_[A-Z0-9]+_\d{4}_[A-Z0-9]+$" # 20251010_LH00202_0284_B22CVHTLT1
)
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # 22CVHTLT1


@register_run_class
Expand All @@ -33,9 +30,6 @@ class NextSeqRun(IlluminaRun):
run_type = "NextSeq"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{6}_[A-Z0-9]+_\d{3}_[A-Z0-9]+$" # 251015_VH00203_572_AAHFHCCM5
)
super().__init__(run_dir, configuration)


Expand All @@ -46,9 +40,6 @@ class MiSeqRun(IlluminaRun):
run_type = "MiSeq"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{6}_[A-Z0-9]+_\d{4}_[A-Z0-9\-]+$" # 251015_M01548_0646_000000000-M6D7K
)
super().__init__(run_dir, configuration)


Expand All @@ -59,6 +50,4 @@ class MiSeqi100Run(IlluminaRun):
run_type = "MiSeqi100"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_[A-Z0-9]+_\d{4}_[A-Z0-9]{10}-SC3$" # 20260128_SH01140_0002_ASC2150561-SC3
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # SC2150561-SC3
5 changes: 2 additions & 3 deletions dataflow_transfer/run_classes/ont_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
class ONTRun(Run):
"""Defines a ONT sequencing run"""

run_family = "ONT"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "final_summary.txt"
self.flowcell_id = self.run_id.split("_")[-2]


@register_run_class
Expand All @@ -19,7 +20,6 @@ class PromethIONRun(ONTRun):
run_type = "PromethION"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_\d{4}_[A-Z0-9]{2}_P[A-Z0-9]+_[a-f0-9]{8}$" # 20251015_1051_3B_PBG60686_0af3a2e0
super().__init__(run_dir, configuration)


Expand All @@ -30,5 +30,4 @@ class MinIONRun(ONTRun):
run_type = "MinION"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_\d{4}_MN[A-Z0-9]+_[A-Z0-9]+_[a-f0-9]{8}$" # 20240229_1404_MN19414_ASH657_7a74bf8f
super().__init__(run_dir, configuration)
26 changes: 25 additions & 1 deletion dataflow_transfer/tests/test_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ def __init__(self, config):
def get_db_doc(self, ddoc, view, run_id):
return None

def get_regex_pattern(self, run_family, run_type):
if run_family == "Illumina":
Comment thread
ssjunnebo marked this conversation as resolved.
if run_type == "NovaSeqXPlus":
return r"^(?P<date>\d{8})_(?P<instrument>[A-Z0-9]+)_\d{4}_(?P<position>(A|B))(?P<flowcell_id>[A-Z0-9]+)$"
elif run_type == "NextSeq":
return r"^(?P<date>\d{6})_(?P<instrument>[A-Z0-9]+)_\d{3}_(?P<flowcell_id>[A-Z0-9]+)$"
elif run_type == "MiSeq":
return r"^(?P<date>\d{6})_(?P<instrument>[A-Z0-9]+)_\d{4}_(?P<flowcell_id>[A-Z0-9\-]+)$"
elif run_type == "MiSeqi100":
return r"^(?P<date>\d{8})_(?P<instrument>[A-Z0-9]+)_\d{4}_A(?P<flowcell_id>[A-Z0-9]{9}-SC3)$"
elif run_family == "ONT":
if run_type == "PromethION":
return r"^(?P<date>\d{8})_(?P<time>\d{4})_(?P<position>[A-Z0-9]{2})_(?P<flowcell_id>P[A-Z0-9]+)_(?P<run_hash>[a-f0-9]{8})$"
elif run_type == "MinION":
return r"^(?P<date>\d{8})_(?P<time>\d{4})_(?P<position>MN[A-Z0-9]+)_(?P<flowcell_id>[A-Z0-9]+)_(?P<run_hash>[a-f0-9]{8})$"
elif run_family == "Element":
if run_type == "AVITI":
return r"^(?P<date>\d{8})_(?P<instrument>AV\d{6})_(?P<position>(A|B))(?P<flowcell_id>\d{10})$"
return None

def update_db_doc(self, doc):
pass

Expand Down Expand Up @@ -458,8 +478,12 @@ def test_update_statusdb(
class MockDB:
def __init__(self):
self.updated_doc = None
self.db_name = "mock_db"

def get_db_doc(self, ddoc, view, run_id):
def get_doc_id(self, ddoc, view, run_id):
return "mock_doc_id"

def get_document(self, db, doc_id):
return {"events": existing_statuses, "files": {}}

def update_db_doc(self, doc):
Expand Down
35 changes: 25 additions & 10 deletions dataflow_transfer/utils/statusdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,31 @@ def _retry_call(self, func):
# re-raise last exception for caller to handle
raise last_exception

def get_db_doc(self, ddoc, view, run_id):
"""Retrieve a document from the database via retried call."""
doc_id = self.get_doc_id(ddoc, view, run_id)
if doc_id:
return self._retry_call(
lambda: self.connection.get_document(
db=self.db_name, doc_id=doc_id
).get_result()
)
return None
def get_document(self, db, doc_id):
"""Retrieve a document from any database via retried call."""
return self._retry_call(
lambda: self.connection.get_document(db=db, doc_id=doc_id).get_result()
)

def get_regex_pattern(
self,
run_family,
run_type,
regex_db="gs_configs",
regex_doc_id="regex_patterns",
):
"""Lookup the python regex pattern for a run type from the central regex config document."""
regex_doc = self.get_document(db=regex_db, doc_id=regex_doc_id)
if not regex_doc:
return None

flowcell_patterns = regex_doc.get("flowcell_patterns", {})
family_patterns = flowcell_patterns.get(run_family, {})
if not family_patterns:
return None

pattern = family_patterns.get(run_type)
return pattern

def get_doc_id(self, ddoc, view, run_id):
"""Retrieve a document ID from the database via retried call."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ignore = [

[project]
name = "dataflow_transfer"
version = "1.1.3"
version = "1.1.4"
description = "Script for transferring sequencing data from sequencers to storage"
authors = [
{ name = "Sara Sjunnebo", email = "sara.sjunnebo@scilifelab.se" },
Expand Down
Loading