Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ ena_upload_cli.egg-info/
__pycache__/
tests/ena_upload/
.idea/
.agents/
58 changes: 58 additions & 0 deletions ena_upload/ena_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import requests
import json
import sys

class EnaClient:
    """
    Client for the ENA Webin REST V2 (JSON) submission API.

    Talks to either the production or the development ("wwwdev") drop-box
    using HTTP basic auth with the Webin credentials.
    """
    PRODUCTION_URL = "https://www.ebi.ac.uk/ena/submit/drop-box/submit/v2"
    DEV_URL = "https://wwwdev.ebi.ac.uk/ena/submit/drop-box/submit/v2"

    CHECKLIST_URL = "https://www.ebi.ac.uk/ena/submit/report/checklists"
    DEV_CHECKLIST_URL = "https://wwwdev.ebi.ac.uk/ena/submit/report/checklists"

    def __init__(self, webin_id, password, dev=False, timeout=None):
        """
        :param webin_id: Webin account id, used for HTTP basic auth
        :param password: Webin account password
        :param dev: when True, target the wwwdev test service instead of
                    production
        :param timeout: optional per-request timeout in seconds; None keeps
                        the previous behaviour of waiting indefinitely
        """
        self.webin_id = webin_id
        self.password = password
        self.dev = dev
        self.timeout = timeout
        self.base_url = self.DEV_URL if dev else self.PRODUCTION_URL
        self.checklist_base_url = self.DEV_CHECKLIST_URL if dev else self.CHECKLIST_URL
        self.session = requests.Session()
        self.session.auth = (webin_id, password)
        # Do not pick up proxy settings / netrc from the environment.
        self.session.trust_env = False

    def _report_error(self, message, exc):
        """Print *message* plus the server response body when one exists."""
        print(f"{message}: {exc}")
        # exc.response is None for connection-level failures (no HTTP reply).
        if exc.response is not None:
            print(f"Response: {exc.response.text}")

    def get_checklist(self, checklist_id):
        """
        Retrieve a checklist definition in JSON format.

        :param checklist_id: ENA checklist accession (e.g. "ERC000011")
        :return: the parsed JSON document, or None on any request error
        """
        url = f"{self.checklist_base_url}/{checklist_id}"
        print(f"Fetching checklist {checklist_id} from {url}...")
        try:
            response = self.session.get(url, params={"format": "json"},
                                        timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._report_error(f"Error fetching checklist {checklist_id}", e)
            return None

    def submit_json(self, payload):
        """
        POST a JSON submission payload to the ENA V2 endpoint.

        :param payload: JSON-serialisable submission payload
        :return: the parsed JSON receipt, or None on any request error
        """
        print(f"Submitting JSON to {self.base_url}...")
        try:
            response = self.session.post(
                self.base_url,
                json=payload,
                headers={"Accept": "application/json"},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._report_error("Error during JSON submission", e)
            return None
226 changes: 204 additions & 22 deletions ena_upload/ena_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ena_upload._version import __version__
from ena_upload.check_remote import remote_check
from ena_upload.json_parsing.ena_submission import EnaSubmission
from ena_upload.ena_client import EnaClient


SCHEMA_TYPES = ['study', 'experiment', 'run', 'sample']
Expand Down Expand Up @@ -282,6 +283,18 @@ def construct_xml(schema, stream, xsd):
return xml_file


def construct_json(payload, output_file=None):
    """
    Serialise *payload* to a pretty-printed JSON string, optionally writing
    it to *output_file* as well.

    NOTE(review): a second ``construct_json(payload, filename)`` is defined
    later in this module and shadows this one at import time — one of the
    two should be renamed or removed.

    :param payload: JSON-serialisable submission payload
    :param output_file: optional path; when given, the JSON is also written
                        there
    :return: the JSON document as a string
    """
    json_string = json.dumps(payload, indent=4)
    if output_file:
        # Pin the encoding so output does not depend on the platform locale.
        with open(output_file, 'w', encoding='utf-8') as fw:
            fw.write(json_string)
        print(f'wrote {output_file}')
    return json_string


def actors(template_path, checklist):
''':return: the filenames of schema definitions and templates
'''
Expand Down Expand Up @@ -336,7 +349,142 @@ def run_construct(template_path, schema_targets, center, checklist, tool):
return schema_xmls


def construct_submission(template_path, action, submission_input, center, checklist, tool):
def run_construct_json(schema_targets, center, tool):
    """
    Build the ENA Webin V2 JSON submission payload from schema dataframes.

    :param schema_targets: dict mapping a schema name ('study', 'sample',
                           'experiment', 'run') to a pandas DataFrame of
                           the rows selected for submission
    :param center: center name, stamped on every object as 'centerName'
    :param tool: currently unused; kept for signature parity with
                 run_construct
    :return: dict ready to be serialised as the V2 JSON payload
    """
    payload = {}

    # Core column -> ENA JSON key mapping per schema; any other non-empty
    # column ends up in the object's generic "attributes" list.
    core_mappings = {
        'study': {
            'alias': 'alias',
            'title': 'title',
            'study_type': 'type',
            'study_abstract': 'abstract',
            'pubmed_id': 'pubmedId'
        },
        'sample': {
            'alias': 'alias',
            'title': 'title',
            'taxon_id': 'taxonId',
            'scientific_name': 'scientificName',
            'common_name': 'commonName',
            'sample_description': 'description'
        },
        'experiment': {
            'alias': 'alias',
            'title': 'title',
            'study_alias': 'studyAlias',
            'sample_alias': 'sampleAlias',
            'platform': 'platform',
            'instrument_model': 'instrumentModel',
            'library_name': 'libraryName',
            'library_strategy': 'libraryStrategy',
            'library_source': 'librarySource',
            'library_selection': 'librarySelection',
            'library_layout': 'libraryLayout',
            'insert_size': 'nominalInsertSize'
        }
    }

    # ENA V2 uses (irregular) plural root keys.
    root_keys = {'study': 'studies',
                 'sample': 'samples',
                 'experiment': 'experiments'}

    for schema, targets in schema_targets.items():
        if schema == 'run':
            run_list = []
            for alias, group in targets.groupby('alias'):
                run_entry = {
                    "alias": alias,
                    "experimentAlias": group['experiment_alias'].iloc[0],
                    "centerName": center
                }
                files = []
                for _, row in group.iterrows():
                    # The checksum column name varies between templates.
                    # pd.notna() guards against NaN cells: NaN is a truthy
                    # float and would otherwise leak into the payload as
                    # "checksum": NaN.
                    checksum = ""
                    for cs_col in ('file_checksum', 'file checksum', 'checksum'):
                        value = row.get(cs_col)
                        if value is not None and pd.notna(value) and value:
                            checksum = value
                            break

                    file_entry = {
                        "fileName": row['file_name'],
                        "fileType": row['file_type'],
                        "checksumMethod": "MD5",
                        "checksum": checksum
                    }
                    if 'read_type' in row and pd.notna(row['read_type']):
                        file_entry['readType'] = row['read_type']
                    files.append(file_entry)
                run_entry["dataFiles"] = files
                run_list.append(run_entry)
            payload["runs"] = run_list
        else:
            mappings = core_mappings.get(schema, {})
            # Extra metadata columns follow the "<schema>_attribute[tag]"
            # pattern; compile once per schema, not once per row.
            attr_pattern = re.compile(rf"{schema}_attribute\[(.*)\]")

            schema_list = []
            for _, row in targets.iterrows():
                entry = {"centerName": center}
                attributes = []

                for col in targets.columns:
                    val = row[col]
                    if pd.isna(val) or str(val).strip() == "":
                        continue

                    if col in mappings:
                        entry[mappings[col]] = val
                    elif col == 'alias':  # alias is universal
                        entry['alias'] = val
                    elif col != 'status':  # 'status' is bookkeeping, not metadata
                        match = attr_pattern.match(col)
                        tag = match.group(1) if match else col
                        attributes.append({"tag": tag, "value": str(val)})

                if attributes:
                    entry["attributes"] = attributes

                schema_list.append(entry)

            root_key = root_keys.get(
                schema, schema if schema.endswith('s') else f"{schema}s")
            payload[root_key] = schema_list

    return payload


def construct_json(payload, filename):
    """
    Write *payload* to *filename* as pretty-printed JSON.

    NOTE(review): this redefines the earlier ``construct_json(payload,
    output_file=None)`` in this module; being defined later, this is the
    definition callers actually get — the duplicate name should be resolved.

    :param payload: JSON-serialisable submission payload
    :param filename: path of the file to write
    :return: the path the payload was written to
    """
    # Pin the encoding so output does not depend on the platform locale.
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=4)
    return filename


def get_and_validate_checklist(ena_client, checklist_id):
    """
    Fetch a checklist definition (JSON) through *ena_client*.

    May later be used as the basis for local validation.

    :param ena_client: client exposing ``get_checklist(checklist_id)``
    :param checklist_id: checklist accession, or a falsy value to skip
    :return: the checklist document, or None when no id was given or the
             fetch yielded nothing
    """
    # Nothing requested: nothing to fetch.
    if not checklist_id:
        return None

    document = ena_client.get_checklist(checklist_id)
    if not document:
        return document

    print(f"Checklist {checklist_id} retrieved successfully")
    return document
'''construct XML for submission

:param action: action for submission -
Expand Down Expand Up @@ -1026,15 +1174,50 @@ def main():
# ? need to add a place holder for setting up
base_path = os.path.abspath(os.path.dirname(__file__))
template_path = os.path.join(base_path, 'templates')

# Inizializziamo il client ENA
ena_client = EnaClient(webin_id, password, dev=dev)

# Recupero checklist se specificato
if checklist:
get_and_validate_checklist(ena_client, checklist)

# Decidiamo se usare il nuovo flusso JSON (default per ADD/MODIFY in questa versione)
use_json = True

schema_update = {}

if action in ['ADD', 'MODIFY']:
# when ADD/MODIFY,
# requires source XMLs for 'run', 'experiment', 'sample', 'experiment'
# schema_xmls record XMLs for all these schema and following 'submission'
schema_xmls = run_construct(
template_path, schema_targets, center, checklist, tool)
if use_json:
print("Constructing JSON payload for submission")
json_payload = run_construct_json(schema_targets, center, tool)

if draft:
construct_json(json_payload, "submission.json")
print("Draft mode: JSON payload saved to submission.json")
return # Exit early for draft
else:
print(f"Submitting JSON to ENA server (Dev: {dev})")
response = ena_client.submit_json(json_payload)
if response:
print("Submission successful")
with open('receipt.json', 'w') as f:
json.dump(response, f, indent=4)
print("Receipt saved to receipt.json")
# TODO: Implementare process_receipt_json completa
# Per ora terminiamo qui con successo
print("\nNota: La migrazione JSON è in fase iniziale. L'aggiornamento automatico delle tabelle con gli accession number richiede l'implementazione del parser per la ricevuta JSON.")
return
else:
sys.exit("Submission failed")
else:
# Vecchio flusso XML
schema_xmls = run_construct(
template_path, schema_targets, center, checklist, tool)

submission_xml = construct_submission(template_path, action,
schema_xmls, center, checklist, tool)
submission_xml = construct_submission(template_path, action,
schema_xmls, center, checklist, tool)
schema_xmls['submission'] = submission_xml

elif action in ['CANCEL', 'RELEASE']:
# when CANCEL/RELEASE, only accessions needed
Expand All @@ -1043,14 +1226,13 @@ def main():

submission_xml = construct_submission(template_path, action,
schema_targets, center, checklist, tool)
schema_xmls['submission'] = submission_xml

else:
print(f"The action {action} is not supported.")
schema_xmls['submission'] = submission_xml
sys.exit(1)

if draft:
print("No submission will be performed, remove `--draft' argument to perform submission.")
else:
if not use_json:
if dev:
url = 'https://wwwdev.ebi.ac.uk/ena/submit/drop-box/submit/'
else:
Expand All @@ -1067,16 +1249,16 @@ def main():
print("There was an ERROR during submission:")
sys.exit(receipt)

if action in ['ADD', 'MODIFY'] and not draft:
schema_dataframe = update_table(schema_dataframe,
schema_targets,
schema_update)
else:
schema_dataframe = update_table_simple(schema_dataframe,
schema_targets,
action)
# save updates in new tables
save_update(schema_tables, schema_dataframe)
if action in ['ADD', 'MODIFY'] and not draft:
schema_dataframe = update_table(schema_dataframe,
schema_targets,
schema_update)
else:
schema_dataframe = update_table_simple(schema_dataframe,
schema_targets,
action)
# save updates in new tables
save_update(schema_tables, schema_dataframe)


if __name__ == "__main__":
Expand Down