91 changes: 77 additions & 14 deletions ami/jobs/tasks.py
@@ -44,8 +44,13 @@ def run_job(self, job_id: int) -> None:
job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}')
raise
else:
from ami.jobs.models import JobDispatchMode

job.refresh_from_db()
job.logger.info(f"Finished job {job}")
if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete():
job.logger.info(f"run_job task exited for job {job}; async results still in-flight via NATS")
else:
job.logger.info(f"Finished job {job}")


@celery_app.task(
@@ -176,9 +181,12 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
classifications_count = sum(len(detection.classifications) for detection in pipeline_result.detections)
captures_count = len(pipeline_result.source_images)

acked = _ack_task_via_nats(reply_subject, job.logger)
# Update job stage with calculated progress

# Do NOT ack NATS yet. ACK must happen AFTER the results-stage SREM and
# _update_job_progress so that a worker crash between save_results and
# progress commit leaves the message redeliverable. Previously the ACK
# ran here (before SREM): on crash, NATS drained permanently while
# Redis pending_images:results kept the id, stranding the job at
# partial progress with no path to completion. See antenna#1232.
try:
progress_info = state_manager.update_state(
processed_image_ids,
@@ -187,19 +195,20 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
except RedisError as e:
# Transient. save_results dedupes on re-run (get_or_create_detection)
# and SREM is a no-op on already-removed ids, so a Celery retry is
# safe for the DB and Redis sets. The caveat is _update_job_progress
# accumulates detections/classifications/captures on the results
# stage (see _update_job_progress stage=="results" branch); if this
# retry runs a second time (or NATS redelivers to ADC because
# ack_wait elapsed before we got here), those counters will inflate
# cosmetically. Tracked in #1232.
# safe for the DB and Redis sets. Counter accumulation is gated on
# progress_info.newly_removed below, so replays will not inflate
# detections/classifications/captures (fixes antenna#1232 replay case).
job.logger.warning(
f"Transient Redis error updating job {job_id} state (stage=results); Celery will retry: {e}",
exc_info=True,
)
raise

if not progress_info:
# State keys genuinely missing (total-images key returned None). Ack
# first so NATS stops redelivering a message whose state is gone,
# then fail the job. Mirrors the stage=process missing-state path.
_ack_task_via_nats(reply_subject, job.logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
return
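
Note, not part of this diff: the two error paths above differ in when, and whether, the NATS ack happens, and that ordering is the point. Below is a standalone sketch of the split; TransientStateError, handle_state_update, ack, and fail_job are illustrative stand-ins, not identifiers from this codebase.

class TransientStateError(Exception):
    pass

def handle_state_update(update_state, ack, fail_job):
    try:
        progress_info = update_state()
    except TransientStateError:
        # Transient (the RedisError branch): do NOT ack; re-raise so Celery
        # retries and NATS can still redeliver if this worker dies.
        raise
    if progress_info is None:
        # Permanent (state keys gone): redelivery can never succeed, so ack
        # first to stop the redelivery loop, then fail the job.
        ack()
        fail_job("Job state keys not found in Redis")
        return None
    return progress_info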

@@ -208,16 +217,34 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
if progress_info.total > 0 and (progress_info.failed / progress_info.total) > FAILURE_THRESHOLD:
complete_state = JobState.FAILURE

# Counter-inflation guard: only add detection/classification/capture counts
# when SREM actually removed ids (first processing of this result). On a
# replay (NATS redelivered the message or the Celery task retried past
# the SREM), newly_removed==0 and we pass zeros to keep the counters
# idempotent. The percentage/status path still runs because
# _update_job_progress uses max() and preserves FAILURE regardless.
is_first_processing = progress_info.newly_removed > 0
counts_to_apply = (
(detections_count, classifications_count, captures_count) if is_first_processing else (0, 0, 0)
)
_update_job_progress(
job_id,
"results",
progress_info.percentage,
complete_state=complete_state,
detections=detections_count,
classifications=classifications_count,
captures=captures_count,
detections=counts_to_apply[0],
classifications=counts_to_apply[1],
captures=counts_to_apply[2],
)

# Ack LAST — only after the results-stage SREM and progress commit are
# durable. If anything above crashes, NATS will redeliver the message
# and the full result path re-runs idempotently: save_results dedupes
# on (detection, source_image), SREM is a no-op on already-removed ids
# (newly_removed==0 gates counter accumulation), and the progress
# percentage is clamped by max() to never regress.
acked = _ack_task_via_nats(reply_subject, job.logger)

except RedisError:
# Logged above at the specific update_state call site; re-raise so
# Celery's autoretry_for handles the transient rather than this broad
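
Note, not part of this diff: the ack-ordering and replay-idempotency argument in the comments above can be modelled in a few lines of standalone Python. All names below (pending, counters, acked_ids, handle_result) are illustrative stand-ins, not identifiers from this codebase.

# Minimal model: the SREM and the counter commit happen before the ack, and
# counter accumulation is gated on how many ids the SREM actually removed.
pending = {"img-1", "img-2"}      # stands in for Redis pending_images:results
counters = {"detections": 0}      # stands in for the results-stage totals
acked_ids = set()

def handle_result(msg_id, image_ids, detections):
    newly_removed = len(pending & set(image_ids))   # SREM; 0 on a replay
    pending.difference_update(image_ids)
    if newly_removed > 0:                           # first processing only
        counters["detections"] += detections
    # Ack last: a crash before this line leaves the message redeliverable,
    # and the replay above is a no-op for the counters.
    acked_ids.add(msg_id)

handle_result("m1", ["img-1", "img-2"], detections=5)   # first delivery
handle_result("m1", ["img-1", "img-2"], detections=5)   # simulated redelivery
assert counters["detections"] == 5 and "m1" in acked_ids  # no inflation on replay
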
@@ -337,8 +364,11 @@ def _update_job_progress(
# Don't overwrite a stage with a stale progress value.
# This guards against the race where a slower worker calls _update_job_progress
# after a faster worker has already marked further progress.
passed_progress = progress_percentage
existing_progress: float | None = None
try:
existing_stage = job.progress.get_stage(stage)
existing_progress = existing_stage.progress
progress_percentage = max(existing_stage.progress, progress_percentage)
# Explicitly preserve FAILURE: once a stage is marked FAILURE it should
# never regress to a non-failure state, regardless of enum ordering.
@@ -347,6 +377,18 @@
except (ValueError, AttributeError):
pass # Stage doesn't exist yet; proceed normally

# Diagnostic: when max() lifts the percentage to 1.0 from a partial value
# this worker computed, surface it. A legitimate jump means another
# worker concurrently completed the stage; an unexpected jump (e.g. the
# premature-cleanup pattern described in docs/claude/processing-lifecycle.md
# as "Bug B") is otherwise invisible.
if existing_progress is not None and progress_percentage >= 1.0 and passed_progress < 1.0:
job.logger.warning(
f"Stage '{stage}' progress lifted to 100% by max() guard: "
f"this worker passed {passed_progress*100:.1f}%, DB had {existing_progress*100:.1f}%. "
f"If no other worker just legitimately finished this stage, this is a state-race symptom."
)

# Determine the status to write:
# - Stage complete (100%): use complete_state (SUCCESS or FAILURE)
# - Stage incomplete but FAILURE already determined: keep FAILURE visible
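
Note, not part of this diff: a standalone sketch of the monotonic-progress rule this hunk relies on and of the condition the new warning targets. merge_stage_progress is an illustrative helper, not a function in this codebase.

def merge_stage_progress(db_progress: float, worker_progress: float):
    """Both values are fractions in [0, 1]; the merge is monotonic."""
    merged = max(db_progress, worker_progress)   # a stage never regresses
    # The case the warning above surfaces: this worker computed a partial
    # value, but the merged result is already 100%.
    lifted_to_full = worker_progress < 1.0 and merged >= 1.0
    return merged, lifted_to_full

print(merge_stage_progress(1.0, 0.4))   # (1.0, True): concurrent completion or a state race
print(merge_stage_progress(0.4, 0.4))   # (0.4, False): normal partial update
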
@@ -374,6 +416,11 @@
# Clean up async resources for completed jobs that use NATS/Redis
if job.progress.is_complete():
job = Job.objects.get(pk=job_id) # Re-fetch outside transaction
# Diagnostic: log which stages satisfied the complete condition. Without
# this, premature-cleanup bugs (cleanup fires while results are still
# mid-flight) are hard to trace back to a specific stage transition.
stages_summary = ", ".join(f"{s.key}={s.progress*100:.1f}% {s.status}" for s in job.progress.stages)
job.logger.info(f"is_complete()=True after stage='{stage}' update; firing cleanup. Stages: {stages_summary}")
cleanup_async_job_if_needed(job)


@@ -659,9 +706,25 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):

@task_failure.connect(sender=run_job, retry=False)
def update_job_failure(sender, task_id, exception, *args, **kwargs):
from ami.jobs.models import Job, JobState
from ami.jobs.models import Job, JobDispatchMode, JobState

job = Job.objects.get(task_id=task_id)

# For ASYNC_API jobs where images have been queued to NATS but the final
# stages have not completed, a run_job failure (e.g. a transient exception
# raised *after* queue_images_to_nats returned) would collapse an
# otherwise-healthy async job: NATS workers are still processing, results
# are still arriving, but this handler would mark FAILURE and cleanup would
# destroy the stream/consumer + Redis state mid-flight. Defer terminal
# state to the async result handler, which owns is_complete() transitions.
# Mirrors the SUCCESS guard in update_job_status (task_postrun).
if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete():
job.logger.warning(
f'Job #{job.pk} "{job.name}" run_job raised but async processing is in-flight; '
f"deferring FAILURE to async progress handler: {exception}"
)
return

job.update_status(JobState.FAILURE, save=False)

job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}')
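
Note, not part of this diff: the guard above reduces to a small decision. should_mark_failure_now is an illustrative helper, not part of this codebase; it compares dispatch_mode as a plain string, and "SYNC" stands in for any non-async dispatch mode.

def should_mark_failure_now(dispatch_mode: str, progress_complete: bool) -> bool:
    # Only the async result handler may fail an ASYNC_API job whose results
    # are still in flight; failing here would trigger cleanup mid-stream.
    if dispatch_mode == "ASYNC_API" and not progress_complete:
        return False
    return True

assert should_mark_failure_now("ASYNC_API", progress_complete=False) is False
assert should_mark_failure_now("ASYNC_API", progress_complete=True) is True
assert should_mark_failure_now("SYNC", progress_complete=False) is True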