Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ def run(cls, job: "Job"):
job.finished_at = datetime.datetime.now()
job.save()
return
# When all stages are already complete (e.g. 0 images to process),
# finalize the job now since no async results will arrive to trigger completion.
if job.progress.is_complete():
has_failure = any(s.status in JobState.failed_states() for s in job.progress.stages)
job.update_status(JobState.FAILURE if has_failure else JobState.SUCCESS, save=False)
job.finished_at = datetime.datetime.now()
job.save()
cleanup_async_job_if_needed(job)
else:
cls.process_images(job, images)

Expand Down
31 changes: 31 additions & 0 deletions ami/jobs/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,37 @@ def test_job_status_guard_prevents_premature_success(self):
self.assertEqual(job.status, initial_status)
self.assertNotEqual(job.status, JobState.SUCCESS.value)

def test_async_job_completes_when_zero_images(self):
"""Job with 0 images to process should finalize immediately, not stay STARTED."""
from unittest.mock import patch

job = Job.objects.create(
job_type_key=MLJob.key,
project=self.project,
name="Test zero images job",
pipeline=self.pipeline,
source_image_collection=self.source_image_collection,
dispatch_mode=JobDispatchMode.ASYNC_API,
)

def mock_queue(job, images):
"""Simulate queue_images_to_nats with 0 images: sets stages to SUCCESS, returns True."""
job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
job.save()
return True

with (
patch.object(job.pipeline, "collect_images", return_value=[]),
patch("ami.ml.orchestration.jobs.queue_images_to_nats", side_effect=mock_queue),
patch("ami.jobs.tasks.cleanup_async_job_if_needed"),
):
job.run()

job.refresh_from_db()
self.assertEqual(job.status, JobState.SUCCESS.value)
self.assertIsNotNone(job.finished_at)

def test_job_status_allows_failure_states_immediately(self):
"""
Test that FAILURE and REVOKED states bypass the completion guard
Expand Down
Loading