Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions ami/main/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1389,8 +1389,14 @@ def audit_event_lengths(deployment: Deployment):
logger.error(f"Found {events_ending_before_start} event(s) with start > end in deployment {deployment}")


DEFAULT_MAX_EVENT_DURATION = datetime.timedelta(hours=24)


def group_images_into_events(
deployment: Deployment, max_time_gap=datetime.timedelta(minutes=120), delete_empty=True
deployment: Deployment,
max_time_gap=datetime.timedelta(minutes=120),
delete_empty=True,
max_event_duration: datetime.timedelta | None = DEFAULT_MAX_EVENT_DURATION,
) -> list[Event]:
# Log a warning if multiple SourceImages have the same timestamp
dupes = (
Expand All @@ -1417,11 +1423,14 @@ def group_images_into_events(
.distinct()
)

timestamp_groups = ami.utils.dates.group_datetimes_by_gap(image_timestamps, max_time_gap)
# @TODO this event grouping needs testing. Still getting events over 24 hours
# timestamp_groups = ami.utils.dates.group_datetimes_by_shifted_day(image_timestamps)
timestamp_groups = ami.utils.dates.group_datetimes_by_gap(
image_timestamps,
max_time_gap,
max_event_duration=max_event_duration,
)
Comment thread
mihow marked this conversation as resolved.

events = []
touched_event_pks: set[int] = set()
for group in timestamp_groups:
if not len(group):
continue
Expand All @@ -1445,6 +1454,18 @@ def group_images_into_events(
defaults={"start": start_date, "end": end_date},
)
events.append(event)
touched_event_pks.add(event.pk)

# Track events currently holding these captures — they'll lose captures
# to the UPDATE below and need their cached fields refreshed at the end.
touched_event_pks.update(
SourceImage.objects.filter(deployment=deployment, timestamp__in=group)
.exclude(event__isnull=True)
.exclude(event=event)
.values_list("event_id", flat=True)
.distinct()
)

SourceImage.objects.filter(deployment=deployment, timestamp__in=group).update(event=event)
event.save() # Update start and end times and other cached fields
logger.info(
Expand All @@ -1456,6 +1477,41 @@ def group_images_into_events(
f"Done grouping {len(image_timestamps)} captures into {len(events)} events " f"for deployment {deployment}"
)

# Realign Occurrence.event_id with each occurrence's detections' current
# source_image.event_id. Occurrences are bound to an event once at creation
# time (Detection.associate_new_occurrence and Pipeline.save_results both
# read source_image.event), and are never re-derived afterward. Without
# this refresh, a deployment regrouped under the 24h cap keeps every
# occurrence pointing at its original (pre-cap) event regardless of when
# its detections actually fired — breaking every Occurrence.event-keyed
# query (the occur_det_proj_evt index, Event.occurrences related-name,
# event_ids= filters at models.py:4232-4477). Track the events currently
# held by occurrences in this deployment before and after the refresh so
# update_calculated_fields_for_events below picks up both losers and
# gainers of occurrences when it recomputes occurrences_count.
deployment_occurrences = Occurrence.objects.filter(deployment=deployment)
touched_event_pks.update(
deployment_occurrences.exclude(event__isnull=True).values_list("event_id", flat=True).distinct()
)
deployment_occurrences.update(
event_id=models.Subquery(
Detection.objects.filter(occurrence_id=models.OuterRef("pk"))
.order_by("source_image__timestamp")
.values("source_image__event_id")[:1]
)
)
touched_event_pks.update(
deployment_occurrences.exclude(event__isnull=True).values_list("event_id", flat=True).distinct()
)

# Refresh cached fields on every event touched by grouping. An event reused
# via matching group_by can lose captures to new events created by later
# iterations above, leaving its start/end/captures_count stale — e.g. a
# pre-existing multi-month event being re-grouped under a 24h cap.
# (#904 is expected to rework this reuse path more thoroughly.)
if touched_event_pks:
update_calculated_fields_for_events(pks=list(touched_event_pks))

if delete_empty:
logger.info("Deleting empty events for deployment")
delete_empty_events(deployment=deployment)
Expand All @@ -1465,9 +1521,14 @@ def group_images_into_events(
logger.info(f"Setting image dimensions for event {event}")
set_dimensions_for_collection(event)

logger.info("Updating relevant cached fields on deployment")
deployment.events_count = len(events)
deployment.save(update_calculated_fields=False, update_fields=["events_count"])
# Refresh deployment-level cached counts. The async regroup_events task
# never goes through Deployment.save's calculated-fields refresh, so
# without this call the deployment list (occurrences_count, taxa_count,
# etc.) keeps showing pre-regroup numbers until the next save touches it.
# The save inside update_calculated_fields uses update_calculated_fields=False
# so it doesn't re-enter the regroup path.
logger.info("Updating cached fields on deployment")
deployment.update_calculated_fields(save=True)

audit_event_lengths(deployment)

Expand Down
198 changes: 198 additions & 0 deletions ami/main/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,204 @@ def test_grouping(self):
for event in events:
assert event.captures.count() == images_per_night

def _populate_continuous_captures(self, days: int = 3, interval_minutes: int = 10):
    """Create ``days`` of gap-free captures (no gap > ``interval_minutes``)."""
    import pathlib
    import uuid

    first_timestamp = datetime.datetime(2023, 4, 24, 3, 22, 38)
    step = datetime.timedelta(minutes=interval_minutes)
    # Number of captures needed to fill the requested span at this cadence.
    total = int(datetime.timedelta(days=days) / step)
    for index in range(total):
        filename = f"{uuid.uuid4().hex[:8]}_continuous_{index}.jpg"
        SourceImage.objects.create(
            deployment=self.deployment,
            timestamp=first_timestamp + index * step,
            path=pathlib.Path("test") / filename,
        )
    return total

def test_continuous_monitoring_capped_at_24_hours(self):
    """
    A deployment capturing continuously (no gap ever exceeding
    max_time_gap) must still be split into daily events by the
    max_event_duration cap rather than coalesced into one multi-day event.
    """
    self._populate_continuous_captures(days=3, interval_minutes=10)

    cap = datetime.timedelta(hours=24)
    events = group_images_into_events(
        deployment=self.deployment,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=cap,
    )

    # 3 days x 24h / 10min = 432 captures; capped at 24h -> exactly 3 events.
    # Asserting equality (not >=) also guards against over-splitting regressions.
    assert len(events) == 3, f"expected exactly 3 daily events, got {len(events)}"
    for event in events:
        duration = event.end - event.start
        assert duration <= cap, f"event {event.pk} spans {duration}, exceeds 24h cap"

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def test_regrouping_existing_long_event_refreshes_cached_fields(self):
    """
    Regression test for the regroup-existing-events path: a deployment
    already grouped into a single multi-day event should, after re-running
    grouping with the 24h cap, end up with no events exceeding 24h AND
    every reused event's cached start/end/captures_count must reflect its
    current captures (not its pre-regroup state).

    This is narrower than #904's refactor on purpose: it asserts the
    observable cap+refresh behavior without depending on the specific
    group_by reuse mechanics that #904 is expected to remove.
    """
    capture_total = self._populate_continuous_captures(days=3, interval_minutes=10)
    cap = datetime.timedelta(hours=24)

    # First pass with the cap disabled -> a single multi-day "mega-event".
    first_pass = group_images_into_events(
        deployment=self.deployment,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=None,
    )
    assert len(first_pass) == 1
    long_event = first_pass[0]
    assert (long_event.end - long_event.start) > cap

    # Second pass with the cap -> must split the mega-event and refresh
    # cached fields on the reused event.
    group_images_into_events(
        deployment=self.deployment,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=cap,
    )

    regrouped = Event.objects.filter(deployment=self.deployment)
    assert regrouped.count() == 3, f"expected exactly 3 events after regroup, got {regrouped.count()}"

    for event in regrouped:
        duration = event.end - event.start
        assert duration <= cap, f"event {event.pk} spans {duration} after regroup; cached fields are stale"

        # Per-event cached-count check: catches reused events whose captures_count
        # was never refreshed after captures were reassigned away. A sum-only check
        # can miss this when two events' errors offset each other.
        live_count = SourceImage.objects.filter(event=event).count()
        assert event.captures_count == live_count, (
            f"event {event.pk} cached captures_count={event.captures_count} "
            f"does not match actual related count={live_count}; cached counters are stale"
        )

    # Orphan check: every capture must belong to some event.
    assigned_total = sum(e.captures_count for e in regrouped)
    assert assigned_total == capture_total, (
        f"captures_count across events ({assigned_total}) does not match total captures ({capture_total}); "
        f"captures were orphaned during regroup"
    )

def test_regrouping_realigns_occurrence_event_id(self):
    """
    Regression test for stale ``Occurrence.event_id`` after regroup.

    Occurrences are bound to an event once at creation time (from
    ``detection.source_image.event``). When the 24h cap runs against a
    deployment that already has detections + occurrences attached to a
    single mega-event, the source_images are reassigned but the
    occurrences' event_ids stay stuck at the mega-event unless we
    explicitly realign them. This test asserts the realignment plus the
    downstream ``occurrences_count`` consistency on the daily events.
    """
    self._populate_continuous_captures(days=3, interval_minutes=10)
    ordered_captures = list(SourceImage.objects.filter(deployment=self.deployment).order_by("timestamp"))

    # First pass with the cap disabled -> one mega-event holding everything.
    group_images_into_events(
        deployment=self.deployment,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=None,
    )
    mega_event = Event.objects.get(deployment=self.deployment)

    # One occurrence per day, picked at mid-day offsets (12h / 36h / 60h)
    # so each target sits well inside its event's window, far from the
    # exact 24h boundary where the cap's strict ``>`` semantics matter.
    # Index-based selection (e.g. ``captures[len // 3]``) lands at exactly
    # 24h offset, where Event 2 starts at 24h+10min (one capture past),
    # so two of the three targets would otherwise share an event.
    base_ts = ordered_captures[0].timestamp
    target_captures = []
    for hours in (12, 36, 60):
        threshold = base_ts + datetime.timedelta(hours=hours)
        target_captures.append(next(c for c in ordered_captures if c.timestamp >= threshold))

    occurrences = []
    for capture in target_captures:
        detection = Detection.objects.create(
            source_image=capture,
            timestamp=capture.timestamp,
            bbox=[0.1, 0.1, 0.2, 0.2],
        )
        occurrence = Occurrence.objects.create(
            event=mega_event,
            deployment=self.deployment,
            project=self.project,
        )
        detection.occurrence = occurrence
        detection.save()
        occurrences.append(occurrence)

    # Sanity: all three occurrences point at the mega-event before regroup.
    for occurrence in occurrences:
        occurrence.refresh_from_db()
        assert occurrence.event_id == mega_event.pk

    # Second pass: 24h cap -> 3 daily events, each occurrence must follow
    # its detection's source_image into the corresponding daily event.
    group_images_into_events(
        deployment=self.deployment,
        max_time_gap=datetime.timedelta(hours=2),
        max_event_duration=datetime.timedelta(hours=24),
    )

    for occurrence in occurrences:
        occurrence.refresh_from_db()
        earliest_detection = (
            Detection.objects.filter(occurrence=occurrence)
            .select_related("source_image")
            .order_by("source_image__timestamp")
            .first()
        )
        assert earliest_detection is not None
        expected_event_id = earliest_detection.source_image.event_id
        assert occurrence.event_id == expected_event_id, (
            f"occurrence {occurrence.pk}: stale event_id={occurrence.event_id} "
            f"(expected {expected_event_id} from first detection's source_image)"
        )

    # Realignment must move all three occurrences onto distinct daily
    # events. With targets at mid-day offsets, the three occurrences land
    # on three different events — one on each day.
    distinct_event_ids = {occ.event_id for occ in occurrences}
    assert (
        len(distinct_event_ids) == 3
    ), f"expected 3 distinct event_ids across occurrences, got {distinct_event_ids}"

    # Each daily event's cached ``occurrences_count`` must match the live
    # computation that ``update_calculated_fields`` itself uses (which
    # applies the project's default filters). Catches the case where
    # occurrences moved off an event but its cached counter wasn't
    # refreshed because the event wasn't tracked as touched.
    daily_events = Event.objects.filter(deployment=self.deployment)
    assert daily_events.count() == 3
    for event in daily_events:
        expected = event.get_occurrences_count()
        assert event.occurrences_count == expected, (
            f"event {event.pk} cached occurrences_count={event.occurrences_count} "
            f"!= live get_occurrences_count()={expected}; cached counter is stale"
        )

    # No occurrence should be left pointing at a deleted/missing event.
    assert Occurrence.objects.filter(deployment=self.deployment, event__isnull=True).count() == 0

def test_pruning_empty_events(self):
from ami.main.models import delete_empty_events

Expand Down
28 changes: 27 additions & 1 deletion ami/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,15 @@ def format_timedelta(duration: datetime.timedelta | None) -> str:
def group_datetimes_by_gap(
timestamps: list[datetime.datetime],
max_time_gap=datetime.timedelta(minutes=120),
max_event_duration: datetime.timedelta | None = None,
) -> list[list[datetime.datetime]]:
"""
Divide a list of timestamps into groups based on a maximum time gap.

When ``max_event_duration`` is set, a group is also split once it would
exceed that duration. This prevents continuous-monitoring deployments
(no quiet gap between nights) from producing a single multi-month group.

>>> timestamps = [
... datetime.datetime(2021, 1, 1, 0, 10, 0), # @TODO confirm the first gap is having an effect
... datetime.datetime(2021, 1, 1, 0, 19, 0),
Expand Down Expand Up @@ -145,6 +150,22 @@ def group_datetimes_by_gap(
>>> result = group_datetimes_by_gap(timestamps, max_time_gap=datetime.timedelta(minutes=1))
>>> len(result)
10

Continuous-monitoring case: a long gap-free stream gets capped by
``max_event_duration`` even when no gap exceeds ``max_time_gap``.

>>> continuous = [datetime.datetime(2021, 1, 1) + datetime.timedelta(minutes=5 * i) for i in range(24 * 12 * 3)]
>>> len(group_datetimes_by_gap(continuous, max_time_gap=datetime.timedelta(minutes=120)))
1
>>> groups = group_datetimes_by_gap(
... continuous,
... max_time_gap=datetime.timedelta(minutes=120),
... max_event_duration=datetime.timedelta(hours=24),
... )
>>> len(groups)
3
>>> all((g[-1] - g[0]) <= datetime.timedelta(hours=24) for g in groups)
True
"""
timestamps.sort()
prev_timestamp: datetime.datetime | None = None
Expand All @@ -157,7 +178,12 @@ def group_datetimes_by_gap(
else:
delta = datetime.timedelta(0)

if delta >= max_time_gap:
split_by_gap = delta >= max_time_gap
split_by_duration = (
max_event_duration is not None and current_group and (timestamp - current_group[0]) > max_event_duration
)

if split_by_gap or split_by_duration:
groups.append(current_group)
current_group = []

Expand Down
Loading