Features/ibis custom eras#37
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           develop      #37     +/-  ##
===========================================
+ Coverage    85.79%   85.82%   +0.02%
===========================================
  Files          169      170       +1
  Lines        12514    12570      +56
===========================================
+ Hits         10737    10788      +51
- Misses        1777     1782       +5
…to preserve all events
When DrugExposure(first=True) and QualifiedLimit=First, every person has exactly 1 event with event_id=1 (assigned by _assign_primary_event_ids). The CustomEra window previously grouped by event_id alone, which collapsed all rows into a single partition, so the _rn == 0 filter kept one row and dropped the other N-1. Grouping by (person_id, event_id) gives each event its own partition, preserving all events. Adds a regression test via build_cohort.
egillax left a comment
I think I found some issues here. They can be verified by comparing cohort counts on PhenotypeLibrary cohorts 1043, 1395, 1387, 1427, and 1412.
    )

    event_window = ibis.window(
        group_by=joined.event_id,
This should partition by both person_id and event_id. Otherwise, events from different people that share the same per-person event_id are ranked in the same window, and the custom era strategy can drop valid cohort rows for other persons. This matches Circe BE semantics: primary event_id is assigned as row_number() over (partition by person_id ...), so it is not globally unique.
For example, comparing against CirceR on cohort 1043 with my local data, I get:
CirceR: rows=106467 persons=106467
This PR: rows=1 persons=1
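The collapse described above can be reproduced on a toy table. This is a minimal sketch using SQLite from the Python standard library rather than ibis; the table contents and the helper name surviving_rows are made up for illustration and are not taken from the PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE joined (person_id INTEGER, event_id INTEGER, era_end_date TEXT)"
)
conn.executemany(
    "INSERT INTO joined VALUES (?, ?, ?)",
    [
        (1, 1, "2020-03-01"),
        (2, 1, "2020-04-01"),
        (3, 1, "2020-05-01"),
        (3, 1, "2020-02-01"),  # person 3's event matches two eras
    ],
)


def surviving_rows(partition_cols):
    """Keep the top-ranked row per partition (hypothetical helper).

    SQL ROW_NUMBER() starts at 1, whereas ibis.row_number() starts at 0,
    so `rn = 1` here plays the role of `_rn == 0` in the PR.
    """
    sql = f"""
        SELECT person_id, event_id, era_end_date FROM (
            SELECT *, ROW_NUMBER() OVER (
                PARTITION BY {partition_cols} ORDER BY era_end_date DESC
            ) AS rn
            FROM joined
        ) WHERE rn = 1
        ORDER BY person_id
    """
    return conn.execute(sql).fetchall()


# event_id is 1 for every person, so this is a single partition.
by_event = surviving_rows("event_id")
# One partition per person and event.
by_person_event = surviving_rows("person_id, event_id")

print(len(by_event), len(by_person_event))
```

Partitioning by event_id alone leaves one row for the whole table, while partitioning by (person_id, event_id) leaves one row per person.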
def _compute_eras(exposures, *, gap_days: int, offset: int):
    padded = exposures.mutate(_padded_end=(exposures._exposure_end + ibis.interval(days=int(gap_days))))
I think this needs to include offset in the padded end before era grouping. Circe BE groups custom eras on DRUG_EXPOSURE_END_DATE + gapDays + offset, then computes the final era end as max(padded_end) - gapDays. Applying offset only after grouping changes which exposures merge into the same custom era.
Relevant Circe BE code: https://github.com/OHDSI/circe-be/blob/498893689a9cf4f09c2a43cc893bb01116db7184/src/main/resources/resources/cohortdefinition/sql/customEraStrategy.sql#L30-L38
This can be verified on PL cohorts 1395, 1387, 1427, and 1412.
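To make the difference concrete, here is a minimal pure-Python sketch of the two orderings for a single person. It follows the description in this comment (pad by gapDays + offset, group, then subtract gapDays) and is not the Circe BE SQL or the PR's ibis code; the function name custom_eras, the offset_before_grouping flag, and the dates are assumptions made for illustration.

```python
from datetime import date, timedelta


def custom_eras(exposures, gap_days, offset, offset_before_grouping=True):
    """Collapse one person's (start, end) exposures into eras.

    offset_before_grouping=True pads each end by gap_days + offset before
    merging, as described in the review comment. False pads by gap_days
    only and adds offset afterwards, as the PR code under review does.
    """
    pad = gap_days + (offset if offset_before_grouping else 0)
    eras = []  # each entry: [era_start, max_padded_end]
    for start, end in sorted(exposures):
        padded_end = end + timedelta(days=pad)
        if eras and eras[-1][1] >= start:
            # Starts on or before the running padded end: same era.
            eras[-1][1] = max(eras[-1][1], padded_end)
        else:
            eras.append([start, padded_end])
    # Remove the gap padding; add offset now if it was not part of the pad.
    tail = timedelta(days=-gap_days + (0 if offset_before_grouping else offset))
    return [(start, padded + tail) for start, padded in eras]


exposures = [
    (date(2020, 1, 1), date(2020, 1, 10)),
    (date(2020, 2, 12), date(2020, 2, 20)),  # starts 33 days after the first ends
]

merged = custom_eras(exposures, gap_days=30, offset=5)
split = custom_eras(exposures, gap_days=30, offset=5, offset_before_grouping=False)

print(merged)
print(split)
```

With a 33-day separation, gap_days=30 alone does not bridge the exposures, but gap_days + offset = 35 does, so the two orderings produce a different number of eras.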
    )

    de = ctx.table("drug_exposure")
    filtered = de.filter(de.drug_concept_id.isin(concept_ids))
This should also include drug_source_concept_id matches. Circe BE builds the custom-era drug target from both de.drug_concept_id = cs.concept_id and de.drug_source_concept_id = cs.concept_id branches, unioned together. Filtering only drug_concept_id means source-coded drug exposures can be missed, which breaks parity for concept sets that match source concepts.
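A small illustration of what gets missed, using plain Python dictionaries in place of the drug_exposure table. The rows and concept IDs are invented, and the single OR filter is only a stand-in for the two unioned branches; it does not reproduce how Circe BE handles a row that matches on both columns.

```python
# Hypothetical drug_exposure rows; all IDs are made up for illustration.
drug_exposure = [
    {"drug_exposure_id": 1, "drug_concept_id": 111, "drug_source_concept_id": 0},
    {"drug_exposure_id": 2, "drug_concept_id": 0, "drug_source_concept_id": 111},
    {"drug_exposure_id": 3, "drug_concept_id": 222, "drug_source_concept_id": 333},
]
concept_ids = {111}

# Filter on the standard concept column only, as in the hunk above.
standard_only = [
    row["drug_exposure_id"]
    for row in drug_exposure
    if row["drug_concept_id"] in concept_ids
]

# Also accept matches on the source concept column.
standard_or_source = [
    row["drug_exposure_id"]
    for row in drug_exposure
    if row["drug_concept_id"] in concept_ids
    or row["drug_source_concept_id"] in concept_ids
]

print(standard_only, standard_or_source)
```

Exposure 2 is coded only through its source concept, so it is picked up by the second filter but not the first.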
    # --- raw SQL pipeline (Java CUSTOM_ERA_STRATEGY_TEMPLATE logic, DuckDB dialect) ---
    # Computes drug eras, then matches era end_dates to events via start_date overlap.
    sql = f"""
Minor lint issue: remove the f prefix here.
from __future__ import annotations

import ibis

from ..plan.schema import PERSON_ID, START_DATE
from .end_strategy import _replace_end_date, attach_observation_bounds


def _compute_exposure_end_date(table, *, days_supply_override: int | None):
    start = table["drug_exposure_start_date"].cast("date")

    if days_supply_override is not None:
        return start + ibis.interval(days=days_supply_override)

    raw_end = (
        table["drug_exposure_end_date"].cast("date")
        if "drug_exposure_end_date" in table.columns
        else ibis.null().cast("date")
    )
    days_supply = (
        table["days_supply"].cast("int64") if "days_supply" in table.columns else ibis.null().cast("int64")
    )
    supply_end = start + days_supply.as_interval("D")

    return ibis.coalesce(raw_end, supply_end, start + ibis.interval(days=1))


def _compute_eras(exposures, *, gap_days: int, offset: int):
    padded = exposures.mutate(_padded_end=(exposures._exposure_end + ibis.interval(days=int(gap_days))))

    ordering = [
        padded.start_date,
        padded._padded_end.desc(),
        padded._exposure_end.desc(),
    ]

    cumulative_window = ibis.cumulative_window(group_by=padded.person_id, order_by=ordering)
    ordered_window = ibis.window(group_by=padded.person_id, order_by=ordering)

    with_cummax = padded.mutate(_cummax_padded_end=padded._padded_end.max().over(cumulative_window))

    with_prev = with_cummax.mutate(_prev_max=with_cummax._cummax_padded_end.lag().over(ordered_window))

    marked = with_prev.mutate(
        _is_new=ibis.ifelse(
            with_prev._prev_max.isnull() | (with_prev._prev_max < with_prev.start_date),
            ibis.literal(1, type="int64"),
            ibis.literal(0, type="int64"),
        )
    )

    group_window = ibis.cumulative_window(
        group_by=marked.person_id,
        order_by=[
            marked.start_date,
            marked._padded_end.desc(),
            marked._exposure_end.desc(),
            marked._is_new.desc(),
        ],
    )
    era_indexed = marked.mutate(_era_id=marked._is_new.sum().over(group_window))

    collapsed = era_indexed.group_by(era_indexed.person_id, era_indexed._era_id).aggregate(
        era_start_date=era_indexed.start_date.min(),
        _max_exposure_end=era_indexed._exposure_end.max(),
    )

    return collapsed.select(
        collapsed.person_id.cast("int64").name(PERSON_ID),
        collapsed.era_start_date.cast("date").name("era_start_date"),
        (collapsed._max_exposure_end + ibis.interval(days=int(offset))).cast("date").name("era_end_date"),
    )


def compute_drug_eras(
    ctx, *, drug_codeset_id: int, gap_days: int, offset: int, days_supply_override: int | None
):
    concept_ids = ctx.concept_ids_for_codeset(drug_codeset_id)

    if not concept_ids:
        de = ctx.table("drug_exposure")
        return de.filter(ibis.literal(False)).select(
            de.person_id.cast("int64").name(PERSON_ID),
            ibis.null().cast("date").name("era_start_date"),
            ibis.null().cast("date").name("era_end_date"),
        )

    de = ctx.table("drug_exposure")
    filtered = de.filter(de.drug_concept_id.isin(concept_ids))

    prepared = filtered.select(
        filtered.person_id.cast("int64").name("person_id"),
        filtered.drug_exposure_start_date.cast("date").name("start_date"),
        _compute_exposure_end_date(filtered, days_supply_override=days_supply_override).name("_exposure_end"),
    )

    return _compute_eras(prepared, gap_days=gap_days, offset=offset)


def apply_custom_era_strategy(events, strategy, ctx):
    payload = strategy.payload
    drug_codeset_id = payload["drug_codeset_id"]
    gap_days = payload["gap_days"]
    offset = payload["offset"]
    days_supply_override = payload.get("days_supply_override")

    if drug_codeset_id is None:
        with_bounds = attach_observation_bounds(events, ctx)
        return _replace_end_date(events, with_bounds, with_bounds.op_end_date)

    eras = compute_drug_eras(
        ctx,
        drug_codeset_id=drug_codeset_id,
        gap_days=gap_days,
        offset=offset,
        days_supply_override=days_supply_override,
    )

    eras_for_join = eras.select(
        eras.person_id.name("_era_person_id"),
        eras.era_start_date,
        eras.era_end_date,
    )

    with_bounds = attach_observation_bounds(events, ctx)

    joined = with_bounds.left_join(
        eras_for_join,
        predicates=[
            with_bounds.person_id == eras_for_join._era_person_id,
            with_bounds[START_DATE] >= eras_for_join.era_start_date,
            with_bounds[START_DATE] <= eras_for_join.era_end_date,
        ],
    )

    event_window = ibis.window(
        group_by=joined.event_id,
        order_by=[joined.era_end_date.desc()],
    )
    ranked = joined.mutate(_rn=ibis.row_number().over(event_window))
    one_per_event = ranked.filter(ranked._rn == 0)

    effective_end = ibis.coalesce(
        one_per_event.era_end_date,
        one_per_event.op_end_date,
    )
    final_end = ibis.least(effective_end, one_per_event.op_end_date)

    return _replace_end_date(events, one_per_event, final_end)
We are filtering the whole drug_exposure table here for these concept_ids. Wouldn't it be better to first filter to people who have primary/included events, like Circe BE does:
    # 5 exposures for person 1, with gap_days=7, offset=3.
    # Exposure end_dates are set explicitly so COALESCE is predictable.
This fixture does not actually exercise the gapDays + offset boundary. With these dates, every pair either merges with gapDays alone or remains split even with gapDays + offset, so applying offset after grouping can still pass. Could we add a case where two exposures are separated by more than gapDays but less than or equal to gapDays + offset? That would fail if offset is not included before era grouping.
A minimal example would be gap_days=0, offset=10, exposure 1 ending 2020-01-10, exposure 2 starting 2020-01-15.
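One way to check whether a fixture actually hits that boundary is a small helper like the sketch below. The name exercises_offset_boundary is hypothetical. The start date of exposure 1 and the end date of exposure 2 are not given in the suggestion above and are assumed here, and the second fixture is invented rather than copied from the PR's test.

```python
from datetime import date


def exercises_offset_boundary(exposures, gap_days, offset):
    """Return True if some exposure starts more than gap_days, but at most
    gap_days + offset, after the latest end date seen so far.

    Only such a separation distinguishes "offset before grouping" from
    "offset after grouping". Hypothetical helper, not part of the PR.
    """
    ordered = sorted(exposures)
    if not ordered:
        return False
    running_end = ordered[0][1]
    for start, end in ordered[1:]:
        separation = (start - running_end).days
        if gap_days < separation <= gap_days + offset:
            return True
        running_end = max(running_end, end)
    return False


# Suggested minimal case: exposure 1 ends 2020-01-10, exposure 2 starts
# 2020-01-15. The other two dates are assumptions.
suggested = [
    (date(2020, 1, 1), date(2020, 1, 10)),
    (date(2020, 1, 15), date(2020, 1, 20)),
]
hits_boundary = exercises_offset_boundary(suggested, gap_days=0, offset=10)

# Invented fixture with separations of 2 and 30 days: both fall outside
# the (7, 10] window, so offset placement would not change the result.
outside = [
    (date(2020, 1, 1), date(2020, 1, 10)),
    (date(2020, 1, 12), date(2020, 1, 20)),
    (date(2020, 2, 19), date(2020, 2, 25)),
]
misses_boundary = exercises_offset_boundary(outside, gap_days=7, offset=3)

print(hits_boundary, misses_boundary)
```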
    SELECT
        ev.person_id,
        ev.start_date,
        LEAST(
This raw SQL does not quite match Circe BE’s final end-date selection. Circe BE unions the default OP-end candidate with #strategy_ends, then chooses the earliest valid end date per (person_id, event_id) using row_number() ... order by CE.end_date. Here we left join eras and take MAX(er.era_end_date), which can mask differences when multiple strategy ends are possible. Could this test mirror the Circe BE cohort_ends union / earliest-end selection instead?
Circe BE reference:
https://github.com/OHDSI/circe-be/blob/498893689a9cf4f09c2a43cc893bb01116db7184/src/main/resources/resources/cohortdefinition/sql/generateCohort.sql#L39-L52
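The two selection rules can be compared on a single event with a rough Python sketch. It follows the description in this comment rather than the linked SQL; "valid" is assumed to mean an end date on or after the event start, the function names are hypothetical, and the dates are invented.

```python
from datetime import date


def earliest_valid_end(event_start, op_end, strategy_ends):
    """Union the default OP-end candidate with every strategy end, then
    keep the earliest one that does not precede the event start.

    Assumes op_end >= event_start, so at least one candidate is valid.
    """
    candidates = [op_end, *strategy_ends]
    return min(d for d in candidates if d >= event_start)


def max_era_end_capped(op_end, strategy_ends):
    """Shape of the test SQL as described above: take the latest era end,
    fall back to op_end when there is none, and cap at op_end."""
    era_end = max(strategy_ends) if strategy_ends else op_end
    return min(era_end, op_end)


event_start = date(2020, 3, 1)
op_end = date(2020, 12, 31)
# Two candidate strategy ends for the same (person_id, event_id).
strategy_ends = [date(2020, 4, 15), date(2020, 6, 30)]

circe_style = earliest_valid_end(event_start, op_end, strategy_ends)
max_style = max_era_end_capped(op_end, strategy_ends)

print(circe_style, max_style)
```

With a single strategy end the two rules agree, which is why a test built on MAX can pass without exercising the earliest-end selection.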
ccrce/execution/engine/custom_era.py:
- Issue 2 ✓: _padded_end includes gap_days + offset; era end uses max(padded_end) - gap_days, matching Circe BE
- Issue 3 ✓: Filters both drug_concept_id and drug_source_concept_id (with column existence guard)
- Issue 5 ✓: compute_drug_eras accepts cohort_person_ids; apply_custom_era_strategy semi-joins drug_exposure to cohort persons
Already tested in the cohort definition set/benchmarking branch I'm working on, and it seems to work well.