Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tools/bioproject2srr/bioproject2srr/.shed.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
categories: ['Tools']
description: Give a Bioproject accession, get a list of associated SRR accessions
name: bioproject2srr
owner: jpayne
ignore:
- CLAUDE.md
98 changes: 98 additions & 0 deletions tools/bioproject2srr/bioproject2srr/CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

This directory contains two complementary Galaxy tools for working with NCBI metadata:

### bio2srr (BioProject → SRR)
Galaxy tool wrapper for `bio2srr.py`, which retrieves SRR (Sequence Read Archive) accessions and biosample metadata from NCBI BioProjects using the Entrez E-utilities API. The tool recursively follows links to subprojects and biosamples, producing two outputs:
- `accessions.txt`: deduplicated list of SRR accessions
- `metadata.tsv`: full metadata table joining biosample attributes with SRA run information

### srr2sam (SRR → BioSample)
Galaxy tool wrapper for `srr2sam.py`, which performs the reverse operation: given a list of SRR accessions, retrieves associated BioSample metadata. Produces:
- `output.tsv`: tabular file with SRR accession as first column, followed by all BioSample metadata fields

Both tools are containerized and run via Docker in Galaxy with the image `docker.io/crashfrog/bp2srr-galaxy:latest`.

## Key Architecture

### Core Logic Flow (bio2srr.py)
1. **ID Resolution**: Convert BioProject accession → UID via esearch
2. **Recursive Link Following**: `resolve_bioproject_ids_and_links()` walks BioProject→BioProject and BioProject→BioSample links
3. **XML Parsing**: `flatten_biosample_xml()` extracts structured metadata from BioSample XML
4. **SRA Mapping**: `yield_sra_runs_from_sample()` links BioSamples to SRA runs via elink
5. **Output Generation**: Produces sorted, deduplicated TSV with custom header ordering

### E-utilities API Pattern
All NCBI API calls use `requests.get()` with:
- Rate limiting: 1s delay (no API key) or 0.1s (with `NCBI_API_KEY`)
- Batching: process IDs in groups of 200 for esummary calls
- Error handling: `raise_for_status()` on all responses

### Core Logic Flow (srr2sam.py)
1. **SRR → SRA UID**: Convert SRR accession → SRA UID via esearch
2. **SRA → BioSample Link**: Use elink to traverse from SRA to BioSample database
3. **BioSample Metadata Fetch**: Retrieve BioSample XML via esummary
4. **XML Parsing**: Reuse `flatten_biosample_xml()` from bio2srr.py
5. **Output Generation**: TSV with SRR accession as first column

### Header Ordering
Both tools use `header_sort_override()` to enforce priority fields at the left:
- **bio2srr**: `bioproject, srr_accession, biosample_accession, organism, taxid, package` then alphabetical
- **srr2sam**: `sra_run_accession, biosample_accession, organism, taxid, package` then alphabetical

## Testing

**Run unit tests:**
```bash
pytest tests.py -v
```

**Run Galaxy tool tests:**
Both tools include functional tests that are run by Galaxy's Planemo test framework. Test data is in `test-data/`.

**bio2srr.xml:**
- Input: `PRJNA681235` (BioProject accession)
- Expected outputs: `accessions.txt`, `metadata.tsv`
- Negative test: `NOTHING` (expects failure)

**srr2sam.xml:**
- Input: `srr_test_input.txt` (3 SRR accessions)
- Expected output: `biosample_metadata.tsv`

To run Galaxy tool tests locally:
```bash
planemo test --galaxy_python_version 3.11 bio2srr.xml
planemo test --galaxy_python_version 3.11 srr2sam.xml
```

## Tool Shed Publishing

This tool is published to the Galaxy ToolShed with metadata in `.shed.yml`:
- Owner: jpayne
- Category: Tools
- Name: bioproject2srr

## Dependencies

The Python script uses:
- `requests` for HTTP calls to NCBI E-utilities
- `xml.etree.ElementTree` for parsing BioSample and SRA run XML
- `itertools.batched` (or backport for Python <3.12)
- Environment variable `NCBI_API_KEY` (optional, increases rate limit)

## Important Constraints

**bio2srr:**
- Handles unreleased samples (samples without SRA runs) by including them in metadata but warning about omission from accessions list
- Duplicate SRA runs (reached via multiple projects/samples) are deduplicated in `accessions.txt` but retained in metadata
- Non-public SRA runs are skipped with a warning
- API responses contain "deliriously" embedded XML fragments that require wrapping in `<data>` root tags before parsing

**srr2sam:**
- If an SRR accession cannot be resolved to a BioSample, the row will contain "NOT_FOUND" in the biosample_accession column
- If BioSample XML parsing fails, the row will contain "PARSE_ERROR"
- Takes the first BioSample UID when an SRA record links to multiple BioSamples (rare edge case)
276 changes: 276 additions & 0 deletions tools/bioproject2srr/bioproject2srr/bio2srr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
"Grab SRR numbers from Bioprojects and sub-bioprojects via Eutils"

import requests
import sys
import csv
import os

try:
from itertools import batched
except ImportError:
from itertools import islice
def batched(iterable, n):
"Batch data into tuples of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
it = iter(iterable)
while batch := tuple(islice(it, n)):
yield batch
from functools import cmp_to_key
from time import sleep
from xml.etree import ElementTree as xml

esearch = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
esummary = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
elink = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"


import logging
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("bio2srr")

extra_params = {}

api_key = os.environ.get("NCBI_API_KEY")

if api_key:
logger.info(f"Using NCBI API key {api_key[:4]}{'*' * (len(api_key) - 8)}{api_key[-4:]}")
extra_params["api_key"] = api_key

def log(msg):
if api_key:
logger.info(msg.replace(api_key, f"{api_key[:4]}{'*' * (len(api_key) - 8)}{api_key[-4:]}")) # fix logging later
else:
logger.info(msg)

def get_tag(root, tag):
val = root.find(tag)
if val is not None:
return val.text
log(f"No result for {tag}")



def header_sort_override(a, b):
if a == b:
return 0
try:
for name in ["bioproject", "srr_accession", "biosample_accession", "organism", "taxid", "package",]:
if a == name:
return -1
if b == name:
return 1
except:
pass
if a < b:
return -1
else:
return 1

hso = cmp_to_key(header_sort_override)

def resolve_bioproject_ids_and_links(bioproject_id_list):
"Recursively follow bioproject and biosample links, yield biosample UID's and biosample XML"
for i, (bioproject, bioproject_id) in enumerate(bioproject_id_list):
log(f"Processing {bioproject} ({bioproject_id}) {i+1}/{len(bioproject_id_list)}")
#get bioproject to bioproject links
response = requests.get(elink, params=dict(db="bioproject", dbfrom="bioproject", id=bioproject_id, format="json", **extra_params))
response.raise_for_status()
reply = response.json()
linksets = reply.get("linksets", [{}])[0].get("linksetdbs", [0,0,{}])
if len(linksets) >= 3:
for id in linksets[2].get("links", []): #third index is the up to down links
response = requests.get(esummary, params=dict(id=id, db="bioproject", format="json"))
response.raise_for_status()
replyy = response.json()
biop = replyy["result"][id]["project_acc"]
if id not in bioproject_id_list:
bioproject_id_list.append((biop, id)) # recurse over bioproject links
# get bioproject to biosample links
response = requests.get(elink, params=dict(db="biosample", dbfrom="bioproject", id=bioproject_id, format="json", **extra_params))
response.raise_for_status()
reply = response.json()
links = reply.get("linksets", [{}])[0].get("linksetdbs", [{}])[0].get("links", [])
log(f"Found {len(links)} biosample links for {bioproject} ({bioproject_id})")
for ids in batched(links, 200):
response = requests.get(esummary, params=dict(id=",".join(ids), db="biosample", format="json"))
response.raise_for_status()
replyy = response.json()
for field, value in replyy.get("result", {}).items():
if "uids" not in field:
yield bioproject, field, value["sampledata"] # this is XML, deleriously
sleep(1 if not api_key else 0.1)


biosample_example = """
<BioSample access="public" publication_date="2020-12-21T00:00:00.000" last_update="2022-06-23T17:45:35.674" submission_date="2020-12-21T15:08:05.690" id="17131268" accession="SAMN17131268">
<Ids>
<Id db="BioSample" is_primary="1">SAMN17131268</Id>
<Id db_label="Sample name">CJP19-D996</Id>
</Ids>
<Description>
<Title>Pathogen: environmental/food/other sample from Campylobacter jejuni</Title>
<Organism taxonomy_id="197" taxonomy_name="Campylobacter jejuni">
<OrganismName>Campylobacter jejuni</OrganismName>
</Organism>
</Description>
<Owner>
<Name url="http://www.fda.gov/Food/FoodScienceResearch/WholeGenomeSequencingProgramWGS/default.htm" abbreviation="CFSAN">FDA Center for Food Safety and Applied Nutrition</Name>
</Owner>
<Models>
<Model>Pathogen.env</Model>
</Models>
<Package display_name="Pathogen: environmental/food/other; version 1.0">Pathogen.env.1.0</Package>
<Attributes>
<Attribute attribute_name="strain" harmonized_name="strain" display_name="strain">CJP19-D996</Attribute>
<Attribute attribute_name="collection_date" harmonized_name="collection_date" display_name="collection date">missing</Attribute>
<Attribute attribute_name="geo_loc_name" harmonized_name="geo_loc_name" display_name="geographic location">missing</Attribute>
<Attribute attribute_name="collected_by" harmonized_name="collected_by" display_name="collected by">CDC</Attribute>
<Attribute attribute_name="lat_lon" harmonized_name="lat_lon" display_name="latitude and longitude">missing</Attribute>
<Attribute attribute_name="isolation_source" harmonized_name="isolation_source" display_name="isolation source">missing</Attribute>
<Attribute attribute_name="isolate" harmonized_name="isolate" display_name="isolate">CFSAN091032</Attribute>
<Attribute attribute_name="project name" harmonized_name="project_name" display_name="project name">GenomeTrakr</Attribute>
<Attribute attribute_name="sequenced by" harmonized_name="sequenced_by" display_name="sequenced by">FDA Center for Food Safety and Applied Nutrition</Attribute>
</Attributes>
<Links>
<Link type="entrez" target="bioproject" label="PRJNA681235">681235</Link>
</Links>
<Status status="live" when="2020-12-21T15:08:05.693"/>
</BioSample>

"""

def flatten_biosample_xml(biosampxml):
root = xml.fromstring(biosampxml)
accession = get_tag(root, r'.//Id[@db="BioSample"]')
# sample_name = get_tag(root, r'.//Id[@db_label="Sample name"]')
organism = get_tag(root, r".//OrganismName")
tax_id = root.find(r".//Organism").attrib.get("taxonomy_id")
package = get_tag(root, r".//Package")
sampledict = dict(
biosample_accession=accession,
# sample_name=sample_name,
organism = organism,
taxid = tax_id,
package = package
)
for attribute in root.findall("Attributes/Attribute"):
sampledict[attribute.attrib.get("harmonized_name", attribute.attrib['attribute_name'])] = attribute.text

return sampledict


def yield_sra_runs_from_sample(biosample):
sleep(1 if not api_key else 0.1)
response = requests.get(elink, params=dict(id=biosample, dbfrom="biosample", db="sra", format="json", **extra_params))
response.raise_for_status()
reply = response.json()
for ids in batched(reply.get("linksets", [{}])[0].get("linksetdbs", [{}])[0].get("links", []), 200):
sleep(1 if not api_key else 0.1)
response = requests.get(esummary, params=dict(id=','.join(ids), db="sra", format="json", **extra_params))
response.raise_for_status()
replyy = response.json()
for field, value in replyy.get("result", {}).items():
if "uids" not in field:
yield field, value.get("runs")


runs_example = """
<Run acc="SRR13167188" total_spots="827691" total_bases="385043067" load_done="true" is_public="true" cluster_name="public" static_data_available="true"/>
<Run acc="SRR13167189" total_spots="827691" total_bases="385043067" load_done="true" is_public="true" cluster_name="public" static_data_available="true"/>
"""

def flatten_runs(runxml):
root = xml.fromstring(f"<data>{runxml}</data>") # gotta fix their garbage embedded XML since it isn't singly-rooted
for run in root.findall(".//Run"):
if run.attrib["is_public"] == "false":
logger.warning(f"Skipping non-public run {run.attrib['acc']}")
yield dict(
sra_run_accession = run.attrib["acc"],
total_spots = run.attrib["total_spots"],
total_bases = run.attrib["total_bases"],
)



def main(starting_bioproject):
rows = []
response = requests.get(esearch, params=dict(db="bioproject", term=starting_bioproject, field="PRJA", format="json"))
response.raise_for_status()
reply = response.json()
try:
bioproject_id = reply["esearchresult"]["idlist"][0]
log(f"Found UID {bioproject_id} for '{starting_bioproject}'")
except IndexError:
logger.error(f"No results found for '{starting_bioproject}'. Error was \"{reply['esearchresult']['warninglist']['outputmessages']}\"")
sys.exit(1)
sleep(1 if not api_key else 0.1)
for bioproject, biosample, biosample_xml in resolve_bioproject_ids_and_links([(starting_bioproject, bioproject_id)]):
try:
sampledict = flatten_biosample_xml(biosample_xml)
except KeyError:
log(biosample_xml)
raise
sampledict["bioproject"] = bioproject
noruns = True
for sra, runs in yield_sra_runs_from_sample(biosample):
for run in flatten_runs(runs.strip()):
noruns = False
run.update(sampledict)
rows.append(run)
if noruns:
rows.append(sampledict)

log(f"Writing {len(rows)} rows to metadata.tsv")

header = set()
for row in rows:
for key in row.keys():
header.add(key)

header = sorted(list(header), key=hso)
# logger.info(f"Header: {header}")

rows.sort(key=lambda x: x["biosample_accession"])

with open("metadata.tsv", "w") as f:
writer = csv.DictWriter(f, fieldnames=header, delimiter="\t", dialect="excel")
writer.writeheader()
writer.writerows(rows)

# check for duplicate runs and unreleased samples

accessions = [row.get("sra_run_accession") for row in rows if row.get("sra_run_accession")]

raw_length = len(accessions)

accessions = sorted(list(set(accessions)))

if raw_length < len(rows):
logger.warning(f"Bioproject {starting_bioproject} contains unreleased samples. {len(rows) - raw_length} samples will not be included in accessions.txt")

if len(accessions) < raw_length:
logger.warning(f"Some SRA runs may have been reached through multiple projects or samples. accessions.txt will be deduplicated but the metadata table is not")

log(f"Writing {len(accessions)} unique accessions to accessions.txt")

with open("accessions.txt", "w") as f:
for accession in accessions:
f.write(accession + "\n")


if __name__ == "__main__":
b = sys.argv[1].strip()
log(f"Starting with {b}")
try:
main(b)
except requests.HTTPError as e:
logger.error(e)
sys.exit(1)





Loading
Loading