48 commits
b60eab0 merge (carlos-irreverentlabs, Jan 16, 2026)
644927f Merge remote-tracking branch 'upstream/main' (carlosgjs, Jan 22, 2026)
218f7aa Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 3, 2026)
90da389 Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 10, 2026)
8618d3c Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 13, 2026)
bd1be5f Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 17, 2026)
b102ae1 Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 19, 2026)
bc908aa fix: PSv2 follow-up fixes from integration tests (#1135) (mihow, Feb 21, 2026)
4c3802a PSv2: Improve task fetching & web worker concurrency configuration (#… (carlosgjs, Feb 21, 2026)
b717e80 fix: include pipeline_slug in MinimalJobSerializer (#1148) (mihow, Feb 21, 2026)
883c4f8 Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 24, 2026)
e26f3c6 Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 24, 2026)
4ef7a24 Merge branch 'RolnickLab:main' into main (mihow, Feb 27, 2026)
c389e90 Merge remote-tracking branch 'upstream/main' (carlosgjs, Feb 27, 2026)
33a6425 Merge branch 'main' of github.com:uw-ssec/antenna (carlosgjs, Feb 27, 2026)
bf80824 Merge remote-tracking branch 'upstream/main' (carlosgjs, Mar 4, 2026)
a2e68a0 WIP: Add support for NATS dead-letter-queue (carlosgjs, Mar 4, 2026)
db05526 Update tests (carlosgjs, Mar 6, 2026)
602f825 Add tests, cleanup naming and error handling (carlosgjs, Mar 10, 2026)
0102ee7 More CR feedback (carlosgjs, Mar 10, 2026)
b44f5b0 Use constant (carlosgjs, Mar 10, 2026)
5c3a47b CR (carlosgjs, Mar 10, 2026)
e09fd9a let exception propagate (carlosgjs, Mar 10, 2026)
e4564fb Use async_to_sync (carlosgjs, Mar 10, 2026)
9775627 refactor: rename ProcessingService last_checked → last_seen fields (#… (mihow, Feb 21, 2026)
8372e0a style: fix prettier formatting in pipeline.ts (mihow, Feb 21, 2026)
3066b29 feat: async PS liveness tracking and ProcessingServiceQuerySet API (mihow, Feb 27, 2026)
3fbcb0a feat: pull-mode PS status tracking and UI null endpoint fix (mihow, Feb 27, 2026)
4032790 fix: import error and null last_seen handling (mihow, Feb 27, 2026)
70a6898 fix: run async stale-check first, reduce beat task timeout and limits (mihow, Feb 27, 2026)
df4b5f2 fix: update pull-mode status tests to match heartbeat-based contract (mihow, Feb 27, 2026)
cd349f6 fix: scope heartbeat update to job's project (mihow, Feb 27, 2026)
9096625 feat: expose is_async property to frontend (mihow, Feb 27, 2026)
7a1a81a fix: periodic service check — async first, short timeout, discard sta… (mihow, Feb 27, 2026)
9cd2ddb docs: explain get_status feature for sync vs. async services (mihow, Mar 24, 2026)
e07e5ed fix(ui): coerce nullable lastSeenLive and use i18n for "Last seen" label (mihow, Mar 24, 2026)
e3b00ac fix: handle missing latency in service selection, improve beat task l… (mihow, Mar 24, 2026)
e2ff110 feat(ui): show "Unknown" status for async processing services (mihow, Mar 24, 2026)
770de40 fix(ui): treat async pipelines as selectable in pipeline picker (mihow, Mar 24, 2026)
f0598ac refactor(ui): use isAsync instead of !endpointUrl for async service c… (mihow, Mar 24, 2026)
ddc4a7c fix: consistent status response payload and defensive isAsync coerce (mihow, Mar 24, 2026)
0b9d283 fix: add missing 'project' field to ProcessingServiceSerializer field… (mihow, Mar 24, 2026)
203ad5d style: fix prettier formatting in processing-services-columns (mihow, Mar 24, 2026)
b72187a Merge branch 'carlos/natsdlq' into demo/integration (mihow, Mar 24, 2026)
d1e8836 Merge remote-tracking branch 'origin/feat/update-staging-compose' int… (mihow, Mar 24, 2026)
92ef439 Merge remote-tracking branch 'origin/feat/update-staging-compose' int… (mihow, Mar 24, 2026)
ccbabdd fix(ui): handle null occurrence determination gracefully (mihow, Mar 25, 2026)
0b59d9c fix: add occurrence determination reconciliation (mihow, Mar 25, 2026)
3 changes: 2 additions & 1 deletion .agents/AGENTS.md
@@ -276,7 +276,8 @@ Processing services are FastAPI applications that implement the AMI ML API contract
**Health Checks:**
- Cached status with 3 retries and exponential backoff (0s, 2s, 4s)
- Celery Beat task runs periodic checks (`ami.ml.tasks.check_processing_services_online`)
- Status stored in `ProcessingService.last_checked_live` boolean field
- Status stored in `ProcessingService.last_seen_live` boolean field
- Async/pull-mode services update status via `mark_seen()` when they register pipelines
- UI shows red/green indicator based on cached status

Location: `processing_services/` directory contains example implementations
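
A reviewer-oriented sketch of how the pieces described above fit together is shown below. It assumes `mark_seen()` and the stale check are exposed as queryset helpers, that async services are the ones without an `endpoint_url`, and that the staleness window comes from a `PROCESSING_SERVICE_LAST_SEEN_MAX` setting; the actual implementation in `ami/ml` may differ in names and placement.

```python
# Minimal sketch of the pull-mode liveness contract; names and placement are assumptions,
# the real implementation lives in ami/ml and may differ.
from datetime import timedelta

from django.db import models
from django.utils import timezone

LAST_SEEN_MAX = timedelta(minutes=5)  # stand-in for settings.PROCESSING_SERVICE_LAST_SEEN_MAX


class ProcessingServiceQuerySet(models.QuerySet):
    def async_services(self) -> "ProcessingServiceQuerySet":
        # Assumption: pull-mode services are the ones without an endpoint to poll directly.
        return self.filter(endpoint_url__isnull=True)

    def mark_seen(self) -> int:
        # Heartbeat: recorded when an async worker registers pipelines or polls for tasks.
        return self.update(last_seen=timezone.now(), last_seen_live=True)

    def mark_stale(self) -> int:
        # Periodic check: flip services offline once the heartbeat stops arriving.
        cutoff = timezone.now() - LAST_SEEN_MAX
        return self.filter(last_seen__lt=cutoff, last_seen_live=True).update(last_seen_live=False)
```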
5 changes: 3 additions & 2 deletions .agents/DATABASE_SCHEMA.md
@@ -255,8 +255,9 @@ erDiagram
bigint id PK
string name
string endpoint_url
boolean last_checked_live
float last_checked_latency
datetime last_seen
boolean last_seen_live
float last_seen_latency
}

ProjectPipelineConfig {
12 changes: 11 additions & 1 deletion ami/jobs/tests/test_tasks.py
@@ -17,7 +17,8 @@
from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob
from ami.jobs.tasks import process_nats_pipeline_result
from ami.main.models import Detection, Project, SourceImage, SourceImageCollection
from ami.ml.models import Pipeline
from ami.ml.models import Algorithm, Pipeline
from ami.ml.models.algorithm import AlgorithmTaskType
from ami.ml.orchestration.async_job_state import AsyncJobStateManager
from ami.ml.schemas import PipelineResultsError, PipelineResultsResponse, SourceImageResponse
from ami.users.models import User
@@ -180,6 +181,15 @@ def test_process_nats_pipeline_result_mixed_results(self, mock_manager_class):
"""
mock_manager = self._setup_mock_nats(mock_manager_class)

# Create detection algorithm for the pipeline
detection_algorithm = Algorithm.objects.create(
name="test-detector",
key="test-detector",
task_type=AlgorithmTaskType.LOCALIZATION,
)
# Update pipeline to include detection algorithm
self.pipeline.algorithms.add(detection_algorithm)

# For this test, we just want to verify progress tracking works with mixed results
# We'll skip checking final job completion status since that depends on all stages

30 changes: 30 additions & 0 deletions ami/jobs/views.py
@@ -30,6 +30,30 @@
logger = logging.getLogger(__name__)


def _mark_pipeline_pull_services_seen(job: "Job") -> None:
"""
Record a heartbeat for async (pull-mode) processing services linked to the job's pipeline.

Called on every task-fetch and result-submit request so that the worker's polling activity
keeps last_seen/last_seen_live current. The periodic check_processing_services_online task
will mark services offline if this heartbeat stops arriving within PROCESSING_SERVICE_LAST_SEEN_MAX.

IMPORTANT: This marks ALL async services on the pipeline within this project as live, not just
the specific service that made the request. If multiple async services share the same pipeline
within a project, a single worker polling will keep all of them appearing online.
Once application-token auth is available (PR #1117), this should be scoped to the individual
calling service instead.
"""
import datetime

if not job.pipeline_id:
return
job.pipeline.processing_services.async_services().filter(projects=job.project_id).update(
last_seen=datetime.datetime.now(),
last_seen_live=True,
)


class JobFilterSet(filters.FilterSet):
"""Custom filterset to enable pipeline name filtering."""

@@ -245,6 +269,9 @@ def tasks(self, request, pk=None):
if not job.pipeline:
raise ValidationError("This job does not have a pipeline configured")

# Record heartbeat for async processing services on this pipeline
_mark_pipeline_pull_services_seen(job)

# Get tasks from NATS JetStream
from ami.ml.orchestration.nats_queue import TaskQueueManager

@@ -272,6 +299,9 @@ def result(self, request, pk=None):

job = self.get_object()

# Record heartbeat for async processing services on this pipeline
_mark_pipeline_pull_services_seen(job)

# Validate request data is a list
if isinstance(request.data, list):
results = request.data
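
To make the heartbeat contract above concrete, here is a minimal sketch of a pull-mode worker loop; the base URL, job id, payload shapes, and the `process()` stub are placeholders rather than the documented AMI API. The point is that every `tasks` fetch and every `result` submission refreshes `last_seen`/`last_seen_live` for the pipeline's async services.

```python
# Illustrative pull-mode worker loop; endpoint paths, auth, and payload shapes are
# assumptions for this sketch, not the documented AMI API contract.
import time

import requests

BASE_URL = "https://antenna.example.org/api/v2"  # placeholder host and prefix
JOB_ID = 123  # placeholder job id


def process(task: dict) -> dict:
    # Stand-in for the worker's actual inference code; returns one result payload per task.
    return {"task_id": task.get("id"), "detections": []}


def run_worker(session: requests.Session) -> None:
    while True:
        # Fetching tasks doubles as a heartbeat: the server-side view calls
        # _mark_pipeline_pull_services_seen() before handing out work.
        resp = session.get(f"{BASE_URL}/jobs/{JOB_ID}/tasks/", timeout=30)
        resp.raise_for_status()
        tasks = resp.json() or []

        if not tasks:
            time.sleep(10)  # idle polls still keep last_seen_live fresh
            continue

        results = [process(task) for task in tasks]

        # Submitting results refreshes the heartbeat as well.
        session.post(f"{BASE_URL}/jobs/{JOB_ID}/result/", json=results, timeout=60).raise_for_status()
```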
8 changes: 7 additions & 1 deletion ami/main/admin.py
@@ -265,6 +265,7 @@ class SourceImageAdmin(AdminBase):
"checksum",
"checksum_algorithm",
"created_at",
"get_was_processed",
)

list_filter = (
@@ -281,7 +282,12 @@
)

def get_queryset(self, request: HttpRequest) -> QuerySet[Any]:
return super().get_queryset(request).select_related("event", "deployment", "deployment__data_source")
return (
super()
.get_queryset(request)
.select_related("event", "deployment", "deployment__data_source")
.with_was_processed() # avoids N+1 from get_was_processed in list_display
)


class ClassificationInline(admin.TabularInline):
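
For context on the N+1 note above: `get_was_processed` is presumably a per-image check for whether any detection rows exist, and `with_was_processed()` folds that check into the list query as an annotation. A plausible sketch, not the actual queryset method, assuming `was_processed` simply means the image has at least one `Detection` row:

```python
# Plausible shape of the with_was_processed() annotation referenced above; the real method
# may define "processed" differently (e.g. including images with zero detections recorded).
from django.db.models import Exists, OuterRef, QuerySet


class SourceImageQuerySet(QuerySet):
    def with_was_processed(self) -> "SourceImageQuerySet":
        from ami.main.models import Detection  # local import to avoid a circular dependency

        # One annotated subquery instead of one get_was_processed() query per admin row.
        return self.annotate(
            was_processed=Exists(Detection.objects.filter(source_image=OuterRef("pk")))
        )
```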
2 changes: 2 additions & 0 deletions ami/main/api/serializers.py
@@ -1246,6 +1246,7 @@ class Meta:
"source_images",
"source_images_count",
"source_images_with_detections_count",
"source_images_processed_count",
"occurrences_count",
"taxa_count",
"description",
@@ -1547,6 +1548,7 @@ class EventTimelineIntervalSerializer(serializers.Serializer):
captures_count = serializers.IntegerField()
detections_count = serializers.IntegerField()
detections_avg = serializers.IntegerField()
was_processed = serializers.BooleanField()


class EventTimelineMetaSerializer(serializers.Serializer):
15 changes: 11 additions & 4 deletions ami/main/api/views.py
@@ -36,6 +36,7 @@
from ami.utils.storages import ConnectionTestResult

from ..models import (
NULL_DETECTIONS_FILTER,
Classification,
Deployment,
Detection,
@@ -378,7 +379,7 @@ def timeline(self, request, pk=None):
)
resolution = datetime.timedelta(minutes=resolution_minutes)

qs = SourceImage.objects.filter(event=event)
qs = SourceImage.objects.filter(event=event).with_was_processed() # type: ignore

# Bulk update all source images where detections_count is null
update_detection_counts(qs=qs, null_only=True)
@@ -404,7 +405,7 @@
source_images = list(
qs.filter(timestamp__range=(start_time, end_time))
.order_by("timestamp")
.values("id", "timestamp", "detections_count")
.values("id", "timestamp", "detections_count", "was_processed")
)

timeline = []
@@ -421,6 +422,7 @@
"captures_count": 0,
"detections_count": 0,
"detection_counts": [],
"was_processed": False,
}

while image_index < len(source_images) and source_images[image_index]["timestamp"] <= interval_end:
@@ -432,6 +434,9 @@
interval_data["detection_counts"] += [image["detections_count"]]
if image["detections_count"] >= max(interval_data["detection_counts"]):
interval_data["top_capture"] = SourceImage(pk=image["id"])
# Track if any image in this interval was processed
if image["was_processed"]:
interval_data["was_processed"] = True
image_index += 1

# Set a meaningful average detection count to display for the interval
@@ -602,7 +607,7 @@ def prefetch_detections(self, queryset: QuerySet, project: Project | None = None
score = get_default_classification_threshold(project, self.request)

prefetch_queryset = (
Detection.objects.all()
Detection.objects.exclude(NULL_DETECTIONS_FILTER)
.annotate(
determination_score=models.Max("occurrence__detections__classifications__score"),
# Store whether this occurrence should be included based on default filters
@@ -709,6 +714,7 @@ class SourceImageCollectionViewSet(DefaultViewSet, ProjectMixin):
SourceImageCollection.objects.all()
.with_source_images_count() # type: ignore
.with_source_images_with_detections_count()
.with_source_images_processed_count()
.prefetch_related("jobs")
)
serializer_class = SourceImageCollectionSerializer
@@ -724,6 +730,7 @@ class SourceImageCollectionViewSet(DefaultViewSet, ProjectMixin):
"method",
"source_images_count",
"source_images_with_detections_count",
"source_images_processed_count",
"occurrences_count",
]

@@ -898,7 +905,7 @@ class DetectionViewSet(DefaultViewSet, ProjectMixin):
API endpoint that allows detections to be viewed or edited.
"""

queryset = Detection.objects.all().select_related("source_image", "detection_algorithm")
queryset = Detection.objects.exclude(NULL_DETECTIONS_FILTER).select_related("source_image", "detection_algorithm")
serializer_class = DetectionSerializer
filterset_fields = ["source_image", "detection_algorithm", "source_image__project"]
ordering_fields = ["created_at", "updated_at", "detection_score", "timestamp"]
106 changes: 106 additions & 0 deletions ami/main/integrity.py
@@ -0,0 +1,106 @@
"""
Data integrity checks for the main app.

Functions here can be called from management commands, post-job hooks,
or periodic Celery tasks.
"""

import dataclasses
import logging

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ReconcileResult:
checked: int = 0
fixed: int = 0
unfixable: int = 0


def get_occurrences_missing_determination(
project_id: int | None = None,
job_id: int | None = None,
):
"""
Return occurrences that have detections with classifications but no determination set.

Occurrences with no classifications at all are excluded (they legitimately have no
determination).
"""
from ami.main.models import Occurrence

qs = Occurrence.objects.filter(
determination__isnull=True,
detections__classifications__isnull=False,
).distinct()

if project_id is not None:
qs = qs.filter(project_id=project_id)

if job_id is not None:
from ami.jobs.models import Job

job = Job.objects.get(pk=job_id)
if job.pipeline:
qs = qs.filter(
detections__classifications__algorithm__in=job.pipeline.algorithms.all(),
project_id=job.project_id,
)

return qs


def reconcile_missing_determinations(
project_id: int | None = None,
job_id: int | None = None,
occurrence_ids: list[int] | None = None,
dry_run: bool = False,
) -> ReconcileResult:
"""
Find occurrences missing determinations and attempt to fix them by re-running
update_occurrence_determination.
"""
from ami.main.models import update_occurrence_determination

if occurrence_ids is not None:
from ami.main.models import Occurrence

occurrences = (
Occurrence.objects.filter(
pk__in=occurrence_ids,
determination__isnull=True,
detections__classifications__isnull=False,
)
.distinct()
.select_related("determination")
)
else:
occurrences = get_occurrences_missing_determination(
project_id=project_id,
job_id=job_id,
).select_related("determination")

result = ReconcileResult(checked=occurrences.count())

if result.checked == 0 or dry_run:
return result

logger.info(f"Found {result.checked} occurrences missing determination")

for occurrence in occurrences.iterator():
try:
updated = update_occurrence_determination(occurrence, current_determination=None, save=True)
if updated:
result.fixed += 1
else:
result.unfixable += 1
except Exception:
result.unfixable += 1
logger.exception(f"Error reconciling occurrence {occurrence.pk}")

logger.info(
f"Reconciliation complete: {result.fixed} fixed, {result.unfixable} unfixable "
f"out of {result.checked} checked"
)
return result
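
The module docstring mentions periodic Celery tasks as one intended caller; a minimal sketch of such a wrapper is below. The task name, time limit, and return shape are placeholders and not part of this PR.

```python
# Hypothetical periodic wrapper around reconcile_missing_determinations; not part of this PR.
from celery import shared_task

from ami.main.integrity import reconcile_missing_determinations


@shared_task(soft_time_limit=300)
def reconcile_missing_determinations_task(project_id: int | None = None) -> dict:
    result = reconcile_missing_determinations(project_id=project_id)
    # Return a plain dict so the outcome is serializable by the Celery result backend.
    return {"checked": result.checked, "fixed": result.fixed, "unfixable": result.unfixable}
```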
35 changes: 35 additions & 0 deletions ami/main/management/commands/check_data_integrity.py
@@ -0,0 +1,35 @@
import logging

from django.core.management.base import BaseCommand

from ami.main.integrity import reconcile_missing_determinations

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Find and fix occurrences missing determinations."

def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Report issues without fixing them")
parser.add_argument("--project", type=int, help="Limit to a specific project ID")
parser.add_argument("--job", type=int, help="Limit to occurrences related to a specific job ID")

def handle(self, *args, **options):
dry_run = options["dry_run"]
if dry_run:
self.stdout.write("DRY RUN — no changes will be made\n")

result = reconcile_missing_determinations(
project_id=options.get("project"),
job_id=options.get("job"),
dry_run=dry_run,
)

self.stdout.write(f"Checked: {result.checked}")
if result.fixed:
self.stdout.write(self.style.SUCCESS(f"Fixed: {result.fixed}"))
if result.unfixable:
self.stdout.write(self.style.WARNING(f"Unfixable: {result.unfixable}"))
if result.checked == 0:
self.stdout.write(self.style.SUCCESS("No issues found."))
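
For reference, the command defined above can also be invoked programmatically with Django's `call_command`; the project id below is a placeholder.

```python
from django.core.management import call_command

# Equivalent to: python manage.py check_data_integrity --dry-run --project 42
call_command("check_data_integrity", dry_run=True, project=42)
```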