diff --git a/ami/main/management/commands/seed_synthetic_occurrences.py b/ami/main/management/commands/seed_synthetic_occurrences.py
new file mode 100644
index 000000000..aeb1b381e
--- /dev/null
+++ b/ami/main/management/commands/seed_synthetic_occurrences.py
@@ -0,0 +1,144 @@
+"""
+Seed synthetic Detections + Occurrences against an existing deployment so the
+24h-cap regrouping path can be exercised on staging without running an ML
+pipeline.
+
+Each SourceImage in the deployment gets one Detection. Each Detection gets one
+Occurrence (event=None, determination=None). After seeding, run
+``group_images_into_events`` against the deployment with
+``max_event_duration=timedelta(hours=24)`` and verify that
+``Occurrence.event_id`` is populated correctly via the realignment Subquery.
+
+Usage:
+    docker compose exec django python manage.py seed_synthetic_occurrences \
+        --deployment <deployment-pk> [--limit 5000] [--batch-size 1000] [--clean]
+"""
+
+import datetime
+
+from django.core.management.base import BaseCommand, CommandError
+from django.db import transaction
+
+from ami.main.models import Deployment, Detection, Occurrence, SourceImage
+
+
+class Command(BaseCommand):
+    help = "Create synthetic Detections + Occurrences for a deployment to test event_id realignment."
+
+    def add_arguments(self, parser):
+        parser.add_argument("--deployment", type=int, required=True, help="Deployment PK")
+        parser.add_argument(
+            "--limit",
+            type=int,
+            default=None,
+            help="Max number of SourceImages to seed against (default: all)",
+        )
+        parser.add_argument("--batch-size", type=int, default=1000)
+        parser.add_argument(
+            "--clean",
+            action="store_true",
+            help="Delete synthetic Detections + Occurrences for this deployment instead of creating",
+        )
+        parser.add_argument(
+            "--dry-run",
+            action="store_true",
+            help="Print counts without writing",
+        )
+
+    def handle(self, *args, **opts):
+        try:
+            deployment = Deployment.objects.get(pk=opts["deployment"])
+        except Deployment.DoesNotExist as exc:
+            raise CommandError(f"Deployment {opts['deployment']} not found") from exc
+
+        if opts["clean"]:
+            self._clean(deployment, dry_run=opts["dry_run"])
+            return
+
+        self._seed(
+            deployment=deployment,
+            limit=opts["limit"],
+            batch_size=opts["batch_size"],
+            dry_run=opts["dry_run"],
+        )
+
+    def _seed(self, deployment: Deployment, limit: int | None, batch_size: int, dry_run: bool) -> None:
+        qs = SourceImage.objects.filter(deployment=deployment).order_by("pk")
+        if limit:
+            qs = qs[:limit]
+
+        total = qs.count()
+        if not total:
+            self.stdout.write(self.style.WARNING(f"No SourceImages on deployment {deployment.pk}"))
+            return
+
+        self.stdout.write(
+            f"Seeding {total} SourceImages on deployment {deployment.pk} "
+            f"({deployment.project_id=}) in batches of {batch_size}"
+        )
+        if dry_run:
+            self.stdout.write(self.style.WARNING("Dry run; no writes."))
+            return
+
+        created_detections = 0
+        created_occurrences = 0
+        for offset in range(0, total, batch_size):
+            batch = list(qs[offset : offset + batch_size].values("pk", "timestamp"))
+            with transaction.atomic():
+                occurrences = Occurrence.objects.bulk_create(
+                    [
+                        Occurrence(
+                            project_id=deployment.project_id,
+                            deployment_id=deployment.pk,
+                            event=None,
+                            determination=None,
+                        )
+                        for _ in batch
+                    ]
+                )
+                detections = [
+                    Detection(
+                        source_image_id=row["pk"],
+                        timestamp=row["timestamp"],
+                        bbox=[10, 10, 20, 20],
+                        occurrence_id=occ.pk,
+                    )
+                    for row, occ in zip(batch, occurrences)
+                ]
+                Detection.objects.bulk_create(detections)
+            created_occurrences += len(occurrences)
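Illustration only, not part of the patch: a minimal sketch of the post-seed
verification the docstring above describes, assuming a Django shell on
staging and a hypothetical deployment pk of 42.

    import datetime

    from ami.main.models import Deployment, Occurrence, group_images_into_events

    deployment = Deployment.objects.get(pk=42)
    group_images_into_events(
        deployment=deployment,
        max_event_duration=datetime.timedelta(hours=24),
    )

    # After regrouping, the realignment Subquery should leave every seeded
    # occurrence pointing at the event of its detection's source image, and
    # none without an event.
    assert not Occurrence.objects.filter(deployment=deployment, event__isnull=True).exists()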
+            created_detections += len(detections)
+            self.stdout.write(f"  ...batch {offset // batch_size + 1}: {len(batch)} rows")
+
+        self.stdout.write(
+            self.style.SUCCESS(
+                f"Created {created_detections} Detections + {created_occurrences} Occurrences "
+                f"on deployment {deployment.pk}. Now run group_images_into_events with "
+                f"max_event_duration={datetime.timedelta(hours=24)} and confirm event_id realignment."
+            )
+        )
+
+    def _clean(self, deployment: Deployment, dry_run: bool) -> None:
+        synthetic_dets = Detection.objects.filter(
+            source_image__deployment=deployment,
+            bbox=[10, 10, 20, 20],
+            detection_algorithm__isnull=True,
+            path__isnull=True,
+        )
+        synthetic_occ_ids = list(
+            synthetic_dets.exclude(occurrence__isnull=True).values_list("occurrence_id", flat=True).distinct()
+        )
+
+        det_count = synthetic_dets.count()
+        occ_count = Occurrence.objects.filter(pk__in=synthetic_occ_ids).count()
+
+        self.stdout.write(
+            f"Matched {det_count} synthetic Detections + {occ_count} Occurrences on deployment {deployment.pk}"
+        )
+        if dry_run:
+            self.stdout.write(self.style.WARNING("Dry run; no deletes."))
+            return
+
+        with transaction.atomic():
+            synthetic_dets.delete()
+            Occurrence.objects.filter(pk__in=synthetic_occ_ids).delete()
+        self.stdout.write(self.style.SUCCESS("Cleaned."))
diff --git a/ami/main/models.py b/ami/main/models.py
index c604c6319..e91b395b7 100644
--- a/ami/main/models.py
+++ b/ami/main/models.py
@@ -1389,8 +1389,14 @@ def audit_event_lengths(deployment: Deployment):
         logger.error(f"Found {events_ending_before_start} event(s) with start > end in deployment {deployment}")


+DEFAULT_MAX_EVENT_DURATION = datetime.timedelta(hours=24)
+
+
 def group_images_into_events(
-    deployment: Deployment, max_time_gap=datetime.timedelta(minutes=120), delete_empty=True
+    deployment: Deployment,
+    max_time_gap=datetime.timedelta(minutes=120),
+    delete_empty=True,
+    max_event_duration: datetime.timedelta | None = DEFAULT_MAX_EVENT_DURATION,
 ) -> list[Event]:
     # Log a warning if multiple SourceImages have the same timestamp
     dupes = (
@@ -1417,11 +1423,14 @@ def group_images_into_events(
         .distinct()
     )

-    timestamp_groups = ami.utils.dates.group_datetimes_by_gap(image_timestamps, max_time_gap)
-    # @TODO this event grouping needs testing. Still getting events over 24 hours
-    # timestamp_groups = ami.utils.dates.group_datetimes_by_shifted_day(image_timestamps)
+    timestamp_groups = ami.utils.dates.group_datetimes_by_gap(
+        image_timestamps,
+        max_time_gap,
+        max_event_duration=max_event_duration,
+    )

     events = []
+    touched_event_pks: set[int] = set()
     for group in timestamp_groups:
         if not len(group):
             continue
@@ -1445,6 +1454,18 @@ def group_images_into_events(
             defaults={"start": start_date, "end": end_date},
         )
         events.append(event)
+        touched_event_pks.add(event.pk)
+
+        # Track events currently holding these captures — they'll lose captures
+        # to the UPDATE below and need their cached fields refreshed at the end.
+        touched_event_pks.update(
+            SourceImage.objects.filter(deployment=deployment, timestamp__in=group)
+            .exclude(event__isnull=True)
+            .exclude(event=event)
+            .values_list("event_id", flat=True)
+            .distinct()
+        )
+
         SourceImage.objects.filter(deployment=deployment, timestamp__in=group).update(event=event)
         event.save()  # Update start and end times and other cached fields
         logger.info(
@@ -1456,6 +1477,41 @@ def group_images_into_events(
         f"Done grouping {len(image_timestamps)} captures into {len(events)} events "
         f"for deployment {deployment}"
     )
+
+    # Realign Occurrence.event_id with each occurrence's detections' current
+    # source_image.event_id. Occurrences are bound to an event once at creation
+    # time (Detection.associate_new_occurrence and Pipeline.save_results both
+    # read source_image.event), and are never re-derived afterward. Without
+    # this refresh, a deployment regrouped under the 24h cap keeps every
+    # occurrence pointing at its original (pre-cap) event regardless of when
+    # its detections actually fired — breaking every Occurrence.event-keyed
+    # query (the occur_det_proj_evt index, Event.occurrences related-name,
+    # event_ids= filters at models.py:4232-4477). Track the events currently
+    # held by occurrences in this deployment before and after the refresh so
+    # update_calculated_fields_for_events below picks up both losers and
+    # gainers of occurrences when it recomputes occurrences_count.
+    deployment_occurrences = Occurrence.objects.filter(deployment=deployment)
+    touched_event_pks.update(
+        deployment_occurrences.exclude(event__isnull=True).values_list("event_id", flat=True).distinct()
+    )
+    deployment_occurrences.update(
+        event_id=models.Subquery(
+            Detection.objects.filter(occurrence_id=models.OuterRef("pk"))
+            .order_by("source_image__timestamp", "source_image_id", "pk")
+            .values("source_image__event_id")[:1]
+        )
+    )
+    touched_event_pks.update(
+        deployment_occurrences.exclude(event__isnull=True).values_list("event_id", flat=True).distinct()
+    )
+
+    # Refresh cached fields on every event touched by grouping. An event reused
+    # via matching group_by can lose captures to new events created by later
+    # iterations above, leaving its start/end/captures_count stale — e.g. a
+    # pre-existing multi-month event being re-grouped under a 24h cap.
+    # (#904 is expected to rework this reuse path more thoroughly.)
+    if touched_event_pks:
+        update_calculated_fields_for_events(pks=list(touched_event_pks))
+
     if delete_empty:
         logger.info("Deleting empty events for deployment")
         delete_empty_events(deployment=deployment)
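Illustration only, not part of the patch: the set-based realignment UPDATE
above, unrolled into equivalent per-row Python to make its semantics explicit.
Each occurrence takes the event of its earliest detection's source image (ties
broken by source_image_id, then detection pk); an occurrence with no
detections ends up with event_id=None. A sketch assuming a ``deployment``
instance in scope.

    for occurrence in Occurrence.objects.filter(deployment=deployment):
        first_event_id = (
            Detection.objects.filter(occurrence_id=occurrence.pk)
            .order_by("source_image__timestamp", "source_image_id", "pk")
            .values_list("source_image__event_id", flat=True)
            .first()
        )
        occurrence.event_id = first_event_id  # None when there are no detections
        occurrence.save(update_fields=["event_id"])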
@@ -1465,9 +1521,14 @@ def group_images_into_events(
             logger.info(f"Setting image dimensions for event {event}")
             set_dimensions_for_collection(event)

-    logger.info("Updating relevant cached fields on deployment")
-    deployment.events_count = len(events)
-    deployment.save(update_calculated_fields=False, update_fields=["events_count"])
+    # Refresh deployment-level cached counts. The async regroup_events task
+    # never goes through Deployment.save's calculated-fields refresh, so
+    # without this call the deployment list (occurrences_count, taxa_count,
+    # etc.) keeps showing pre-regroup numbers until the next save touches it.
+    # The save inside update_calculated_fields uses update_calculated_fields=False
+    # so it doesn't re-enter the regroup path.
+    logger.info("Updating cached fields on deployment")
+    deployment.update_calculated_fields(save=True)

     audit_event_lengths(deployment)
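For reference, the new call surface introduced above, as a short sketch
(illustration only, assuming a ``deployment`` instance in scope): the cap
defaults to DEFAULT_MAX_EVENT_DURATION, and passing None restores the old
gap-only grouping; the regression tests below use exactly that to build a
multi-day "mega-event" before regrouping.

    import datetime

    from ami.main.models import group_images_into_events

    # Default: groups are additionally split so no event exceeds 24 hours.
    events = group_images_into_events(deployment=deployment)

    # Opt out of the cap (pre-patch behavior: gap-based splitting only).
    legacy_events = group_images_into_events(
        deployment=deployment,
        max_time_gap=datetime.timedelta(minutes=120),
        max_event_duration=None,
    )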
+ logger.info("Updating cached fields on deployment") + deployment.update_calculated_fields(save=True) audit_event_lengths(deployment) diff --git a/ami/main/tests.py b/ami/main/tests.py index 95b5d980c..0e7672e1c 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -236,6 +236,204 @@ def test_grouping(self): for event in events: assert event.captures.count() == images_per_night + def _populate_continuous_captures(self, days: int = 3, interval_minutes: int = 10): + """Create ``days`` of gap-free captures (no gap > ``interval_minutes``).""" + import pathlib + import uuid + + start = datetime.datetime(2023, 4, 24, 3, 22, 38) + interval = datetime.timedelta(minutes=interval_minutes) + count = int(datetime.timedelta(days=days) / interval) + for i in range(count): + SourceImage.objects.create( + deployment=self.deployment, + timestamp=start + i * interval, + path=pathlib.Path("test") / f"{uuid.uuid4().hex[:8]}_continuous_{i}.jpg", + ) + return count + + def test_continuous_monitoring_capped_at_24_hours(self): + """ + A deployment that captures images continuously (no gap > max_time_gap) + should still be broken into daily events by the max_event_duration cap, + not coalesced into one multi-day event. + """ + self._populate_continuous_captures(days=3, interval_minutes=10) + + events = group_images_into_events( + deployment=self.deployment, + max_time_gap=datetime.timedelta(hours=2), + max_event_duration=datetime.timedelta(hours=24), + ) + + # 3 days × 24h / 10min = 432 captures; capped at 24h → exactly 3 events. + # `== 3` (not `>= 3`) guards against over-splitting regressions too. + assert len(events) == 3, f"expected exactly 3 daily events, got {len(events)}" + for event in events: + duration = event.end - event.start + assert duration <= datetime.timedelta(hours=24), f"event {event.pk} spans {duration}, exceeds 24h cap" + + def test_regrouping_existing_long_event_refreshes_cached_fields(self): + """ + Regression test for the regroup-existing-events path: a deployment + already grouped into a single multi-day event should, after re-running + grouping with the 24h cap, end up with no events exceeding 24h AND + every reused event's cached start/end/captures_count must reflect its + current captures (not its pre-regroup state). + + This is narrower than #904's refactor on purpose: it asserts the + observable cap+refresh behavior without depending on the specific + group_by reuse mechanics that #904 is expected to remove. + """ + total_captures = self._populate_continuous_captures(days=3, interval_minutes=10) + + # First pass with the cap disabled → a single multi-day "mega-event". + events_uncapped = group_images_into_events( + deployment=self.deployment, + max_time_gap=datetime.timedelta(hours=2), + max_event_duration=None, + ) + assert len(events_uncapped) == 1 + mega_event = events_uncapped[0] + assert (mega_event.end - mega_event.start) > datetime.timedelta(hours=24) + + # Second pass with the cap → must split the mega-event and refresh + # cached fields on the reused event. 
+    def test_regrouping_existing_long_event_refreshes_cached_fields(self):
+        """
+        Regression test for the regroup-existing-events path: a deployment
+        already grouped into a single multi-day event should, after re-running
+        grouping with the 24h cap, end up with no events exceeding 24h AND
+        every reused event's cached start/end/captures_count must reflect its
+        current captures (not its pre-regroup state).
+
+        This is narrower than #904's refactor on purpose: it asserts the
+        observable cap+refresh behavior without depending on the specific
+        group_by reuse mechanics that #904 is expected to remove.
+        """
+        total_captures = self._populate_continuous_captures(days=3, interval_minutes=10)
+
+        # First pass with the cap disabled → a single multi-day "mega-event".
+        events_uncapped = group_images_into_events(
+            deployment=self.deployment,
+            max_time_gap=datetime.timedelta(hours=2),
+            max_event_duration=None,
+        )
+        assert len(events_uncapped) == 1
+        mega_event = events_uncapped[0]
+        assert (mega_event.end - mega_event.start) > datetime.timedelta(hours=24)
+
+        # Second pass with the cap → must split the mega-event and refresh
+        # cached fields on the reused event.
+        group_images_into_events(
+            deployment=self.deployment,
+            max_time_gap=datetime.timedelta(hours=2),
+            max_event_duration=datetime.timedelta(hours=24),
+        )
+
+        all_events = Event.objects.filter(deployment=self.deployment)
+        assert all_events.count() == 3, f"expected exactly 3 events after regroup, got {all_events.count()}"
+
+        for event in all_events:
+            duration = event.end - event.start
+            assert duration <= datetime.timedelta(
+                hours=24
+            ), f"event {event.pk} spans {duration} after regroup; cached fields are stale"
+
+            # Per-event cached-count check: catches reused events whose captures_count
+            # was never refreshed after captures were reassigned away. A sum-only check
+            # can miss this when two events' errors offset each other.
+            actual_captures = SourceImage.objects.filter(event=event).count()
+            assert event.captures_count == actual_captures, (
+                f"event {event.pk} cached captures_count={event.captures_count} "
+                f"does not match actual related count={actual_captures}; cached counters are stale"
+            )
+
+        # Orphan check: every capture must belong to some event.
+        total_assigned = sum(e.captures_count for e in all_events)
+        assert total_assigned == total_captures, (
+            f"captures_count across events ({total_assigned}) does not match total captures ({total_captures}); "
+            f"captures were orphaned during regroup"
+        )
+
+    def test_regrouping_realigns_occurrence_event_id(self):
+        """
+        Regression test for stale ``Occurrence.event_id`` after regroup.
+
+        Occurrences are bound to an event once at creation time (from
+        ``detection.source_image.event``). When the 24h cap runs against a
+        deployment that already has detections + occurrences attached to a
+        single mega-event, the source_images are reassigned but the
+        occurrences' event_ids stay stuck at the mega-event unless we
+        explicitly realign them. This test asserts the realignment plus the
+        downstream ``occurrences_count`` consistency on the daily events.
+        """
+        self._populate_continuous_captures(days=3, interval_minutes=10)
+        captures = list(SourceImage.objects.filter(deployment=self.deployment).order_by("timestamp"))
+
+        # First pass with the cap disabled → one mega-event holding everything.
+        group_images_into_events(
+            deployment=self.deployment,
+            max_time_gap=datetime.timedelta(hours=2),
+            max_event_duration=None,
+        )
+        mega_event = Event.objects.get(deployment=self.deployment)
+
+        # One occurrence per day, picked at mid-day offsets (12h / 36h / 60h)
+        # so each target sits well inside its event's window, far from the
+        # exact 24h boundary where the cap's strict ``>`` semantics matter.
+        # Index-based selection (e.g. ``captures[len // 3]``) lands at exactly
+        # the 24h offset, where Event 2 starts at 24h+10min (one capture past),
+        # so two of the three targets would otherwise share an event.
+        start_ts = captures[0].timestamp
+        targets = [
+            next(c for c in captures if c.timestamp >= start_ts + datetime.timedelta(hours=12)),
+            next(c for c in captures if c.timestamp >= start_ts + datetime.timedelta(hours=36)),
+            next(c for c in captures if c.timestamp >= start_ts + datetime.timedelta(hours=60)),
+        ]
+        occurrences = []
+        for capture in targets:
+            detection = Detection.objects.create(
+                source_image=capture,
+                timestamp=capture.timestamp,
+                bbox=[10, 10, 20, 20],
+            )
+            occurrence = Occurrence.objects.create(
+                event=mega_event,
+                deployment=self.deployment,
+                project=self.project,
+            )
+            detection.occurrence = occurrence
+            detection.save()
+            occurrences.append(occurrence)
+
+        # Sanity: all three occurrences point at the mega-event before regroup.
+        for occurrence in occurrences:
+            occurrence.refresh_from_db()
+            assert occurrence.event_id == mega_event.pk
+
+        # Second pass: 24h cap → 3 daily events, each occurrence must follow
+        # its detection's source_image into the corresponding daily event.
+        group_images_into_events(
+            deployment=self.deployment,
+            max_time_gap=datetime.timedelta(hours=2),
+            max_event_duration=datetime.timedelta(hours=24),
+        )
+
+        for occurrence in occurrences:
+            occurrence.refresh_from_db()
+            first_detection = (
+                Detection.objects.filter(occurrence=occurrence)
+                .select_related("source_image")
+                .order_by("source_image__timestamp")
+                .first()
+            )
+            assert first_detection is not None
+            expected_event_id = first_detection.source_image.event_id
+            assert occurrence.event_id == expected_event_id, (
+                f"occurrence {occurrence.pk}: stale event_id={occurrence.event_id} "
+                f"(expected {expected_event_id} from first detection's source_image)"
+            )
+
+        # Realignment must move all three occurrences onto distinct daily
+        # events. With targets at mid-day offsets, the three occurrences land
+        # on three different events — one on each day.
+        distinct_event_ids = {occ.event_id for occ in occurrences}
+        assert (
+            len(distinct_event_ids) == 3
+        ), f"expected 3 distinct event_ids across occurrences, got {distinct_event_ids}"
+
+        # Each daily event's cached ``occurrences_count`` must match the live
+        # computation that ``update_calculated_fields`` itself uses (which
+        # applies the project's default filters). Catches the case where
+        # occurrences moved off an event but its cached counter wasn't
+        # refreshed because the event wasn't tracked as touched.
+        daily_events = Event.objects.filter(deployment=self.deployment)
+        assert daily_events.count() == 3
+        for event in daily_events:
+            expected = event.get_occurrences_count()
+            assert event.occurrences_count == expected, (
+                f"event {event.pk} cached occurrences_count={event.occurrences_count} "
+                f"!= live get_occurrences_count()={expected}; cached counter is stale"
+            )
+
+        # No occurrence should be left pointing at a deleted/missing event.
+        assert Occurrence.objects.filter(deployment=self.deployment, event__isnull=True).count() == 0
+
     def test_pruning_empty_events(self):
         from ami.main.models import delete_empty_events
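The boundary arithmetic the tests above rely on, as a standalone sketch
against ``group_datetimes_by_gap`` below (illustration only, runnable once
this patch is applied): 432 captures at 10-minute intervals split into groups
starting at 0h, 24h10m, and 48h20m, so the mid-day offsets (12h / 36h / 60h)
land in three distinct groups, while a capture at exactly +24h still belongs
to the first group because the duration split uses a strict ``>``.

    import datetime

    from ami.utils.dates import group_datetimes_by_gap

    start = datetime.datetime(2023, 4, 24)
    stamps = [start + datetime.timedelta(minutes=10 * i) for i in range(432)]
    groups = group_datetimes_by_gap(
        stamps,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=datetime.timedelta(hours=24),
    )
    assert [g[0] - start for g in groups] == [
        datetime.timedelta(0),
        datetime.timedelta(hours=24, minutes=10),
        datetime.timedelta(hours=48, minutes=20),
    ]
    # The capture at exactly +24h stays in the first group (strict ">").
    assert stamps[144] - start == datetime.timedelta(hours=24)
    assert stamps[144] in groups[0]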
diff --git a/ami/utils/dates.py b/ami/utils/dates.py
index 02dfc4350..bdc2dcf21 100644
--- a/ami/utils/dates.py
+++ b/ami/utils/dates.py
@@ -112,10 +112,15 @@ def format_timedelta(duration: datetime.timedelta | None) -> str:
 def group_datetimes_by_gap(
     timestamps: list[datetime.datetime],
     max_time_gap=datetime.timedelta(minutes=120),
+    max_event_duration: datetime.timedelta | None = None,
 ) -> list[list[datetime.datetime]]:
     """
     Divide a list of timestamps into groups based on a maximum time gap.

+    When ``max_event_duration`` is set, a group is also split once it would
+    exceed that duration. This prevents continuous-monitoring deployments
+    (no quiet gap between nights) from producing a single multi-month group.
+
     >>> timestamps = [
     ...     datetime.datetime(2021, 1, 1, 0, 10, 0),  # @TODO confirm the first gap is having an effect
     ...     datetime.datetime(2021, 1, 1, 0, 19, 0),
@@ -145,6 +150,22 @@ def group_datetimes_by_gap(
     >>> result = group_datetimes_by_gap(timestamps, max_time_gap=datetime.timedelta(minutes=1))
     >>> len(result)
     10
+
+    Continuous-monitoring case: a long gap-free stream gets capped by
+    ``max_event_duration`` even when no gap exceeds ``max_time_gap``.
+
+    >>> continuous = [datetime.datetime(2021, 1, 1) + datetime.timedelta(minutes=5 * i) for i in range(24 * 12 * 3)]
+    >>> len(group_datetimes_by_gap(continuous, max_time_gap=datetime.timedelta(minutes=120)))
+    1
+    >>> groups = group_datetimes_by_gap(
+    ...     continuous,
+    ...     max_time_gap=datetime.timedelta(minutes=120),
+    ...     max_event_duration=datetime.timedelta(hours=24),
+    ... )
+    >>> len(groups)
+    3
+    >>> all((g[-1] - g[0]) <= datetime.timedelta(hours=24) for g in groups)
+    True
     """
     timestamps.sort()
     prev_timestamp: datetime.datetime | None = None
@@ -157,7 +178,12 @@ def group_datetimes_by_gap(
         else:
             delta = datetime.timedelta(0)

-        if delta >= max_time_gap:
+        split_by_gap = delta >= max_time_gap
+        split_by_duration = (
+            max_event_duration is not None and current_group and (timestamp - current_group[0]) > max_event_duration
+        )
+
+        if split_by_gap or split_by_duration:
             groups.append(current_group)
             current_group = []