diff --git a/ami/jobs/management/commands/chaos_monkey.py b/ami/jobs/management/commands/chaos_monkey.py
index 50ad3c6ab..8dd7612b4 100644
--- a/ami/jobs/management/commands/chaos_monkey.py
+++ b/ami/jobs/management/commands/chaos_monkey.py
@@ -11,8 +11,17 @@
     # Flush all NATS JetStream streams (simulates broker state loss)
     python manage.py chaos_monkey flush nats
+
+    # Exhaust NATS max_deliver for a job without ADC: publishes test payloads,
+    # pulls them without ACK, waits ack_wait, repeats until max_deliver is hit.
+    # Leaves the consumer in (num_pending=0, num_ack_pending>0, num_redelivered>0)
+    # — the shape `mark_lost_images_failed` is designed to reconcile.
+    python manage.py chaos_monkey exhaust_max_deliver --job-id 999999 \\
+        --image-ids img-a,img-b,img-c
 """
 
+import asyncio
+
 from asgiref.sync import async_to_sync
 from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError
@@ -27,23 +36,59 @@ class Command(BaseCommand):
     def add_arguments(self, parser):
         parser.add_argument(
             "action",
-            choices=["flush"],
-            help="flush: wipe all state.",
+            choices=["flush", "exhaust_max_deliver"],
+            help=(
+                "flush: wipe all state (requires service). "
+                "exhaust_max_deliver: drive a job's NATS consumer past max_deliver "
+                "without ADC (requires --job-id)."
+            ),
         )
         parser.add_argument(
             "service",
+            nargs="?",
             choices=["redis", "nats"],
-            help="Target service to fault.",
+            default=None,
+            help="Target service for 'flush'. Ignored for other actions.",
         )
+        parser.add_argument(
+            "--job-id",
+            type=int,
+            help="Job id for 'exhaust_max_deliver'. The stream/consumer must already "
+            "exist (created by run_job); pass a dispatched job's id, or use "
+            "--ensure-stream to let this command create the stream itself.",
+        )
+        parser.add_argument(
+            "--image-ids",
+            default="img-a,img-b,img-c",
+            help="Comma-separated fake image ids to publish as payloads (default 3 ids).",
+        )
+        parser.add_argument(
+            "--ensure-stream",
+            action="store_true",
+            help="Create the stream+consumer if missing. Useful for standalone "
+            "reconciler tests against a fake job_id.",
+        )
 
     def handle(self, *args, **options):
         action = options["action"]
-        service = options["service"]
-        if action == "flush" and service == "redis":
-            self._flush_redis()
-        elif action == "flush" and service == "nats":
-            self._flush_nats()
+        if action == "flush":
+            service = options["service"]
+            if service is None:
+                raise CommandError("'flush' requires a service argument (redis|nats)")
+            if service == "redis":
+                self._flush_redis()
+            elif service == "nats":
+                self._flush_nats()
+            return
+
+        if action == "exhaust_max_deliver":
+            job_id = options["job_id"]
+            if job_id is None:
+                raise CommandError("'exhaust_max_deliver' requires --job-id")
+            image_ids = [s.strip() for s in options["image_ids"].split(",") if s.strip()]
+            self._exhaust_max_deliver(job_id, image_ids, ensure_stream=options["ensure_stream"])
+            return
 
     # ------------------------------------------------------------------
     # Redis
@@ -95,3 +140,85 @@ async def _delete_all_streams():
             self.stdout.write(self.style.SUCCESS(f"Deleted {len(deleted)} stream(s)."))
         else:
             self.stdout.write("No streams found — NATS already empty.")
+
+    def _exhaust_max_deliver(self, job_id: int, image_ids: list[str], ensure_stream: bool = False):
+        """Drive a job's consumer past NATS_MAX_DELIVER without running ADC.
+
+        Publishes one payload per image id on the job's subject, then pulls
+        without ACK and waits ack_wait (TASK_TTR) — repeating NATS_MAX_DELIVER
+        times so each message hits its delivery budget. After this the consumer
+        sits in (num_pending=0, num_ack_pending=len(image_ids), num_redelivered>0),
+        which empirically is the post-exhaustion resting state — JetStream does
+        not clear num_ack_pending for messages that hit max_deliver.
+
+        This is the shape `mark_lost_images_failed` is designed to reconcile.
+        The pending_images Redis sets for this job are NOT touched here; seed
+        them separately via AsyncJobStateManager.initialize_job() if you want
+        the reconciler to find work.
+        """
+        from ami.ml.orchestration.nats_queue import NATS_MAX_DELIVER, TASK_TTR, TaskQueueManager
+
+        self.stdout.write(
+            f"Exhausting max_deliver for job {job_id}: "
+            f"publishing {len(image_ids)} message(s), "
+            f"pulling {NATS_MAX_DELIVER}× without ACK, "
+            f"waiting {TASK_TTR}s between pulls. "
+            f"Expected total: ~{NATS_MAX_DELIVER * (TASK_TTR + 3)}s."
+        )
+
+        async def _run():
+            async with TaskQueueManager() as m:
+                if ensure_stream:
+                    await m._ensure_stream(job_id)
+                    await m._ensure_consumer(job_id)
+                    self.stdout.write("  Ensured stream+consumer exist.")
+
+                state = await m.get_consumer_state(job_id)
+                if state is None:
+                    raise CommandError(
+                        f"No NATS consumer for job {job_id}. Dispatch the job first, "
+                        "or pass --ensure-stream to create one."
+                    )
+
+                subject = m._get_subject(job_id)
+                for iid in image_ids:
+                    await m.js.publish(subject, f"chaos-payload-{iid}".encode())
+                self.stdout.write(f"  Published {len(image_ids)} payload(s).")
+
+                stream = m._get_stream_name(job_id)
+                consumer = m._get_consumer_name(job_id)
+                for attempt in range(1, NATS_MAX_DELIVER + 1):
+                    self.stdout.write(f"  Attempt {attempt}/{NATS_MAX_DELIVER}: pulling (no ACK)...")
+                    psub = await m.js.pull_subscribe_bind(consumer=consumer, stream=stream)
+                    try:
+                        msgs = await psub.fetch(batch=len(image_ids), timeout=5)
+                        self.stdout.write(f"    Pulled {len(msgs)} message(s).")
+                    except Exception as e:
+                        self.stdout.write(f"    Pull returned no messages: {e}")
+                    await psub.unsubscribe()
+
+                    if attempt < NATS_MAX_DELIVER:
+                        self.stdout.write(f"  Sleeping {TASK_TTR + 3}s (ack_wait + buffer)...")
+                        await asyncio.sleep(TASK_TTR + 3)
+
+                self.stdout.write(f"  Final wait {TASK_TTR + 3}s for max_deliver state to settle.")
+                await asyncio.sleep(TASK_TTR + 3)
+
+                state = await m.get_consumer_state(job_id)
+                return state
+
+        try:
+            final_state = async_to_sync(_run)()
+        except CommandError:
+            raise
+        except Exception as e:
+            raise CommandError(f"exhaust_max_deliver failed: {e}") from e
+
+        self.stdout.write(
+            self.style.SUCCESS(
+                f"Post-exhaustion ConsumerState: "
+                f"num_pending={final_state.num_pending} "
+                f"num_ack_pending={final_state.num_ack_pending} "
+                f"num_redelivered={final_state.num_redelivered}"
+            )
+        )
diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index 4d6762793..183cd5186 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -13,7 +13,7 @@
 from ami.main.checks.schemas import IntegrityCheckResult
 from ami.ml.orchestration.async_job_state import AsyncJobStateManager
-from ami.ml.orchestration.nats_queue import TaskQueueManager
+from ami.ml.orchestration.nats_queue import ConsumerState, TaskQueueManager
 from ami.ml.schemas import PipelineResultsError, PipelineResultsResponse
 from ami.tasks import default_soft_time_limit, default_time_limit
 from config import celery_app
@@ -533,6 +533,243 @@ def _update_job_progress(
     cleanup_async_job_if_needed(job)
 
 
+def mark_lost_images_failed(minutes: int | None = None, dry_run: bool = False) -> list[dict]:
+    """Reconcile running async_api jobs that have been idle past the cutoff
+    while Redis still tracks images as pending.
+
+    **Decision signals** (all must hold):
+
+    1. :attr:`Job.updated_at` older than ``minutes`` — every successful result
+       save bumps ``updated_at``, so 10+ minutes of silence means no batch has
+       landed. ADC has stopped processing this job, for any reason.
+    2. Redis ``job:{id}:pending_images:{process,results}`` still has ids —
+       there is real work left to reconcile.
+    3. NATS consumer exists (``get_consumer_state`` returns non-None) — the
+       job is a live async_api job we own state for.
+
+    No NATS-counter-based guards. Empirically, JetStream keeps messages in
+    ``num_ack_pending`` even after ``max_deliver`` is hit and the messages are
+    dropped from delivery; those counters are not reliable signals of whether
+    the queue is still making progress. Time-based staleness + "Redis still
+    has work" are the signals that matter.
+
+    **Why this is safe:**
+
+    - Late NATS deliveries are idempotent: ``save_results`` dedupes on
+      ``(detection, source_image)``, SREM on already-removed ids is a no-op
+      with ``newly_removed == 0`` gating counter accumulation (see
+      ``processing-lifecycle.md`` §2), and ``_update_job_progress`` clamps
+      percentage with ``max()``. A late result arriving post-reconcile saves
+      its detections and its SREM/SADD is a no-op.
+    - Runs BEFORE :func:`check_stale_jobs` in :func:`jobs_health_check` so
+      a job the reconciler can unstick lands in its natural completion state
+      (SUCCESS or FAILURE via :data:`FAILURE_THRESHOLD`) rather than being
+      REVOKED and losing legitimate successful work.
+
+    Beyond the existence check in signal 3, ``get_consumer_state`` serves only
+    to let the current ``num_redelivered`` be logged in the diagnostic —
+    operators get a one-glance signal distinguishing "max_deliver exhausted"
+    from "never delivered" after the fact.
+
+    Returns a list of per-job result dicts (``job_id``, ``lost_count``,
+    ``action``) mirroring :func:`check_stale_jobs` for consistency in
+    operator logs.
+    """
+    from ami.jobs.models import Job, JobDispatchMode, JobState
+
+    if minutes is None:
+        minutes = Job.STALLED_JOBS_MAX_MINUTES
+
+    cutoff = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
+    candidate_pks = list(
+        Job.objects.filter(
+            status__in=JobState.running_states(),
+            dispatch_mode=JobDispatchMode.ASYNC_API,
+            updated_at__lt=cutoff,
+        ).values_list("pk", flat=True)
+    )
+    if not candidate_pks:
+        return []
+
+    async def _fetch_states() -> dict[int, ConsumerState | None]:
+        states: dict[int, ConsumerState | None] = {}
+        async with TaskQueueManager() as manager:
+            for pk in candidate_pks:
+                try:
+                    states[pk] = await manager.get_consumer_state(pk)
+                except Exception:
+                    # get_consumer_state already swallows per-consumer errors
+                    # and returns None, but a truly unexpected failure (e.g.
+                    # connection reset mid-loop) should not blow up the whole
+                    # reconciler tick — mark this pk as "skip" and continue.
+                    logger.exception("mark_lost_images_failed: consumer_state failed for job %s", pk)
+                    states[pk] = None
+        return states
+
+    try:
+        consumer_states = async_to_sync(_fetch_states)()
+    except Exception:
+        logger.exception("mark_lost_images_failed: failed to open NATS connection")
+        return []
+
+    results: list[dict] = []
+    for pk in candidate_pks:
+        state = consumer_states.get(pk)
+        if state is None:
+            # Consumer missing: either cleanup already fired or we have no
+            # NATS-level record. Either way, reconciling without a consumer
+            # means we can't verify the state is ours — skip.
+            continue
+
+        state_manager = AsyncJobStateManager(pk)
+        lost_ids = state_manager.get_pending_image_ids()
+        if not lost_ids:
+            continue
+
+        if dry_run:
+            results.append({"job_id": pk, "lost_count": len(lost_ids), "action": "dry-run"})
+            continue
+
+        try:
+            action = _reconcile_lost_images(pk, lost_ids, state, cutoff)
+        except Exception:
+            logger.exception("mark_lost_images_failed: failed reconciling job %s", pk)
+            action = "error"
+
+        results.append({"job_id": pk, "lost_count": len(lost_ids), "action": action})
+
+    return results
+
+
+# Cap on how many image ids are written into ``progress.errors``. It is a JSONB
+# field on Job, surfaced in the UI — a 200-image job's full id list would be
+# multi-KB of noise. The full set still goes to ``job.logger.warning`` (DB-backed
+# JobLog table), so it remains recoverable from the UI's logs panel.
+_PROGRESS_ERROR_ID_PREVIEW_LIMIT = 10
+
+
+def _reconcile_lost_images(
+    job_id: int,
+    lost_ids: set[str],
+    consumer_state: ConsumerState,
+    cutoff: datetime.datetime,
+) -> str:
+    """Mark *lost_ids* as failed in Redis and push progress to 100% in both stages.
+
+    Mirrors the SREM+SADD+progress-update sequence that
+    :func:`process_nats_pipeline_result` runs on every result, so the stage
+    transitions land in the same shape the rest of the pipeline expects. The
+    completion decision (SUCCESS vs FAILURE) reuses :data:`FAILURE_THRESHOLD`
+    so a job losing >50% of its images still falls through to FAILURE.
+
+    Returns one of:
+
+    - ``"marked_failed"``: reconciliation completed; counters updated.
+    - ``"raced"``: a late ``process_nats_pipeline_result`` bumped
+      ``updated_at`` (or the job left a running state) between candidate
+      selection and now; we defer to the natural completion path.
+    - ``"state_disappeared"``: Redis state vanished mid-reconcile (cleanup
+      fired or a different reconciler tick won the race); leave it for
+      ``check_stale_jobs``.
+    """
+    from ami.jobs.models import Job, JobDispatchMode, JobState
+
+    # Re-validate inside ``select_for_update`` before any Redis SREM/SADD. The
+    # candidate list was computed up to a NATS round-trip ago; a late result
+    # arriving in that window would bump ``updated_at`` and disqualify the job.
+    # Without this check, we'd mark images as failed that just got their
+    # results — counter inflation (same id counted as both processed and failed).
+    try:
+        with transaction.atomic():
+            Job.objects.select_for_update().get(
+                pk=job_id,
+                status__in=JobState.running_states(),
+                dispatch_mode=JobDispatchMode.ASYNC_API,
+                updated_at__lt=cutoff,
+            )
+    except Job.DoesNotExist:
+        logger.info(
+            "mark_lost_images_failed: job %s no longer eligible (raced with late result)",
+            job_id,
+        )
+        return "raced"
+
+    state_manager = AsyncJobStateManager(job_id)
+
+    # Stage "process": SREM from pending_images:process and SADD to failed_images.
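+    # (update_state returns a progress snapshot for the stage, or a falsy
+    # value once the underlying Redis keys are gone; the "state disappeared"
+    # branch below leans on that contract to detect a cleanup race.)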
+    process_progress = state_manager.update_state(lost_ids, stage="process", failed_image_ids=lost_ids)
+    # Stage "results": SREM from pending_images:results. failed_image_ids
+    # already covered by the previous call (SADD is idempotent anyway).
+    results_progress = state_manager.update_state(lost_ids, stage="results")
+
+    if not process_progress or not results_progress:
+        logger.warning(
+            "mark_lost_images_failed: job %s state disappeared mid-reconcile; "
+            "leaving it for check_stale_jobs",
+            job_id,
+        )
+        return "state_disappeared"
+
+    complete_state = JobState.SUCCESS
+    if process_progress.total > 0 and (process_progress.failed / process_progress.total) > FAILURE_THRESHOLD:
+        complete_state = JobState.FAILURE
+
+    _update_job_progress(
+        job_id,
+        "process",
+        process_progress.percentage,
+        complete_state=complete_state,
+        processed=process_progress.processed,
+        remaining=process_progress.remaining,
+        failed=process_progress.failed,
+    )
+    # Results-stage counters are accumulated inside _update_job_progress, so
+    # passing zeros here preserves whatever save_results already counted on
+    # the non-lost branch. We do NOT have detections/classifications for the
+    # lost images — by definition we never got their results.
+    _update_job_progress(
+        job_id,
+        "results",
+        results_progress.percentage,
+        complete_state=complete_state,
+        detections=0,
+        classifications=0,
+        captures=0,
+    )
+
+    sorted_ids = sorted(lost_ids)
+    preview_ids = sorted_ids[:_PROGRESS_ERROR_ID_PREVIEW_LIMIT]
+    extra = len(sorted_ids) - len(preview_ids)
+    ids_summary = f"{preview_ids} ... and {extra} more" if extra else str(preview_ids)
+
+    diagnostic = (
+        f"jobs_health_check: marked {len(lost_ids)} image(s) as failed "
+        f"(job idle past cutoff; NATS consumer "
+        f"num_pending={consumer_state.num_pending} "
+        f"num_ack_pending={consumer_state.num_ack_pending} "
+        f"num_redelivered={consumer_state.num_redelivered}). "
+        f"IDs: {ids_summary}"
+    )
+
+    # Append the (truncated) diagnostic to ``progress.errors`` so the reason is
+    # visible in the UI alongside the now-accurate ``failed`` count.
+    # ``select_for_update`` mirrors :func:`_fail_job` — the row may still be
+    # touched concurrently by a late ``process_nats_pipeline_result`` retry.
+    with transaction.atomic():
+        job = Job.objects.select_for_update().get(pk=job_id)
+        if diagnostic not in job.progress.errors:
+            job.progress.errors.append(diagnostic)
+            job.save(update_fields=["progress"])
+
+    # Per-job logger gets the full id list — JobLog rows are paginated and
+    # not embedded in the job detail payload, so the size is acceptable there.
+    if extra:
+        job.logger.warning("%s (full IDs: %s)", diagnostic, sorted_ids)
+    else:
+        job.logger.warning(diagnostic)
+
+    return "marked_failed"
+
+
 def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[dict]:
     """
     Find jobs stuck in a running state past the cutoff and revoke them.
@@ -665,11 +902,32 @@ class JobsHealthCheckResult:
     Add a new field here when adding a sub-check to the umbrella.
     """
 
+    lost_images: IntegrityCheckResult
     stale_jobs: IntegrityCheckResult
     running_job_snapshots: IntegrityCheckResult
     zombie_streams: IntegrityCheckResult
 
 
+def _run_mark_lost_images_failed_check() -> IntegrityCheckResult:
+    """Mark NATS-lost pending images as failed so their jobs can finalize.
+
+    Runs BEFORE ``_run_stale_jobs_check`` in :func:`jobs_health_check` — a job
+    the reconciler can unstick should land in its natural completion state
+    rather than being REVOKED.
+    Jobs the reconciler can't help (consumer missing or no pending ids)
+    fall through to ``check_stale_jobs`` unchanged.
+    """
+    results = mark_lost_images_failed()
+    fixed = sum(1 for r in results if r["action"] == "marked_failed")
+    unfixable = sum(1 for r in results if r["action"] == "error")
+    logger.info(
+        "lost_images check: %d candidate(s), %d marked failed, %d error(s)",
+        len(results),
+        fixed,
+        unfixable,
+    )
+    return IntegrityCheckResult(checked=len(results), fixed=fixed, unfixable=unfixable)
+
+
 def _run_stale_jobs_check() -> IntegrityCheckResult:
     """Reconcile jobs stuck in running states past Job.STALLED_JOBS_MAX_MINUTES."""
     results = check_stale_jobs()
@@ -884,7 +1142,12 @@ def jobs_health_check() -> dict:
     :class:`JobsHealthCheckResult` so celery's default JSON backend can store
     it; add new sub-checks by extending that dataclass and calling them here.
     """
+    # Order matters: the ``lost_images`` reconciler runs BEFORE ``stale_jobs``
+    # so that jobs whose only remaining problem is NATS-dropped pending ids
+    # land in SUCCESS/FAILURE via _update_job_progress rather than REVOKE.
+    # Jobs the reconciler can't help still fall through to check_stale_jobs.
     result = JobsHealthCheckResult(
+        lost_images=_safe_run_sub_check("lost_images", _run_mark_lost_images_failed_check),
         stale_jobs=_safe_run_sub_check("stale_jobs", _run_stale_jobs_check),
         running_job_snapshots=_safe_run_sub_check("running_job_snapshots", _run_running_job_snapshot_check),
         zombie_streams=_safe_run_sub_check("zombie_streams", _run_zombie_streams_check),
diff --git a/ami/jobs/tests/test_periodic_beat_tasks.py b/ami/jobs/tests/test_periodic_beat_tasks.py
index 9e1592872..78ebc6f87 100644
--- a/ami/jobs/tests/test_periodic_beat_tasks.py
+++ b/ami/jobs/tests/test_periodic_beat_tasks.py
@@ -41,6 +41,10 @@ def _stub_manager(self, mock_manager_cls) -> AsyncMock:
         instance.populate_redelivered_counts = AsyncMock(return_value=None)
         instance.delete_consumer = AsyncMock(return_value=True)
         instance.delete_stream = AsyncMock(return_value=True)
+        # Lost-images sub-check default: no consumer state available (e.g. no
+        # async_api stale jobs). Individual tests override when exercising the
+        # reconciler path.
+        instance.get_consumer_state = AsyncMock(return_value=None)
         return instance
 
     def test_reports_both_sub_check_results(self, mock_manager_cls, _mock_cleanup):
@@ -53,6 +57,7 @@
         self.assertEqual(
             result,
             {
+                "lost_images": _empty_check_dict(),
                 "stale_jobs": {"checked": 2, "fixed": 2, "unfixable": 0},
                 "running_job_snapshots": _empty_check_dict(),
                 "zombie_streams": _empty_check_dict(),
@@ -67,6 +72,7 @@ def test_idle_deployment_returns_all_zeros(self, mock_manager_cls, _mock_cleanup):
         self.assertEqual(
             jobs_health_check(),
             {
+                "lost_images": _empty_check_dict(),
                 "stale_jobs": _empty_check_dict(),
                 "running_job_snapshots": _empty_check_dict(),
                 "zombie_streams": _empty_check_dict(),
diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py
index 145e70399..89587f3f1 100644
--- a/ami/jobs/tests/test_tasks.py
+++ b/ami/jobs/tests/test_tasks.py
@@ -5,6 +5,7 @@
 is received instead of successful pipeline results.
""" +import datetime import logging from concurrent.futures import ThreadPoolExecutor from unittest.mock import AsyncMock, MagicMock, patch @@ -859,3 +860,372 @@ def test_job_with_no_pipeline_logs_generic_message(self): f"expected generic no-pipeline line in {output}", ) self.assertFalse(any("Zero workers have been seen" in ln for ln in output)) + + +class TestMarkLostImagesFailed(TransactionTestCase): + """Regression tests for the NATS-lost-images reconciler. + + Production incident 2026-04-16 (job 2421): 998 images, 982 processed cleanly, + 5 explicit failures, and 16 images stuck indefinitely in Redis pending_images + after an ADC worker hit a 2h NATS/Redis connection drop. NATS had given up + (max_deliver=2) before the worker reconnected, so the messages were gone + but the job kept sitting at 98.4% until ``check_stale_jobs`` REVOKED it and + discarded the 98% of successful work. + + The right outcome for that job was SUCCESS with ``failed=21/998`` (~2%), not + REVOKED. This test shape mirrors the incident: successful SREMs, explicit + failures already in ``failed_images``, and a residual "lost" set still in + both pending_images:{process,results}. The helper under test should SADD the + lost ids to ``failed_images``, SREM them from the pending sets, and let the + existing completion logic in ``_update_job_progress`` finalize the job. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Lost Images Test Project") + self.pipeline = Pipeline.objects.create(name="Lost Pipeline", slug="lost-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="Lost Coll", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_stuck_job(self, total_images: int = 10, already_processed: int = 7, explicit_failures: int = 1): + """Build the job-2421 Redis + Job.progress shape. + + Returns (job, set_of_lost_ids). The lost count is derived so the three + buckets always sum to ``total_images``. + """ + lost = total_images - already_processed - explicit_failures + assert lost > 0, "test requires at least one lost image" + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="job-2421-shape", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + ) + all_ids = [str(i) for i in range(1000, 1000 + total_images)] + processed_ids = set(all_ids[:already_processed]) + failed_ids = set(all_ids[already_processed : already_processed + explicit_failures]) + lost_ids = set(all_ids[already_processed + explicit_failures :]) + + manager = AsyncJobStateManager(job.pk) + manager.initialize_job(all_ids) + # Successful results for the first bucket: SREM from both pending sets. + manager.update_state(processed_ids, stage="process") + manager.update_state(processed_ids, stage="results") + # Explicit failures: SREM from both pending sets + SADD to failed_images + # (mirrors what process_nats_pipeline_result does with a PipelineResultsError). + manager.update_state(failed_ids, stage="process", failed_image_ids=failed_ids) + manager.update_state(failed_ids, stage="results") + + # Mirror the last _update_job_progress snapshot into job.progress. 
+        progress = job.progress
+        collect_stage = progress.get_stage("collect")
+        collect_stage.progress = 1.0
+        collect_stage.status = JobState.SUCCESS
+        non_lost = already_processed + explicit_failures
+        progress.update_stage(
+            "process",
+            progress=non_lost / total_images,
+            status=JobState.STARTED,
+            processed=non_lost,
+            remaining=lost,
+            failed=explicit_failures,
+        )
+        progress.update_stage(
+            "results",
+            progress=non_lost / total_images,
+            status=JobState.STARTED,
+            detections=0,
+            classifications=0,
+            captures=already_processed,
+        )
+        job.status = JobState.STARTED
+        job.save()
+
+        # Force updated_at to appear stale so the helper considers this job.
+        Job.objects.filter(pk=job.pk).update(
+            updated_at=datetime.datetime.now() - datetime.timedelta(minutes=Job.STALLED_JOBS_MAX_MINUTES + 1)
+        )
+        job.refresh_from_db()
+        return job, lost_ids
+
+    def _mock_consumer_state(
+        self,
+        mock_manager_class,
+        num_pending: int = 0,
+        num_ack_pending: int = 0,
+        num_redelivered: int = 0,
+    ):
+        from ami.ml.orchestration.nats_queue import ConsumerState
+
+        mock_manager = AsyncMock()
+        mock_manager.get_consumer_state = AsyncMock(
+            return_value=ConsumerState(
+                num_pending=num_pending,
+                num_ack_pending=num_ack_pending,
+                num_redelivered=num_redelivered,
+            )
+        )
+        mock_manager_class.return_value.__aenter__.return_value = mock_manager
+        # A truthy __aexit__ return would silently suppress exceptions raised
+        # inside the async-with block; return False so they propagate.
+        mock_manager_class.return_value.__aexit__.return_value = False
+        return mock_manager
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_marks_lost_images_as_failed_and_finalizes_success(self, mock_manager_class):
+        """Job-2421 shape: NATS drained (num_pending=0, num_ack_pending=0) while
+        Redis pending still holds redelivery-exhausted ids. The helper should
+        SADD those to failed_images, SREM from pending, and let the existing
+        completion logic (failed/total < FAILURE_THRESHOLD) land the job in SUCCESS.
+        """
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        job, lost_ids = self._make_stuck_job(total_images=10, already_processed=7, explicit_failures=1)
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids))
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1, f"expected one job reconciled, got {results}")
+        self.assertEqual(results[0]["job_id"], job.pk)
+        self.assertEqual(results[0]["lost_count"], len(lost_ids))
+        self.assertEqual(results[0]["action"], "marked_failed")
+
+        job.refresh_from_db()
+        self.assertEqual(
+            job.status,
+            JobState.SUCCESS.value,
+            f"expected SUCCESS (failed=3/10 below FAILURE_THRESHOLD); got {job.status}",
+        )
+        self.assertTrue(job.progress.is_complete(), f"stages not complete: {job.progress.stages}")
+
+        process = job.progress.get_stage("process")
+        self.assertEqual(process.progress, 1.0)
+        failed_param = next((p.value for p in process.params if p.key == "failed"), None)
+        self.assertEqual(
+            failed_param,
+            3,
+            f"process.failed should be explicit_failures (1) + lost (2) = 3, got {failed_param}",
+        )
+        remaining_param = next((p.value for p in process.params if p.key == "remaining"), None)
+        self.assertEqual(remaining_param, 0, f"process.remaining should be 0, got {remaining_param}")
+
+        # Redis state is wiped by cleanup_async_job_if_needed once is_complete()
+        # fires, so we assert against the durable Job.progress snapshot, not
+        # AsyncJobStateManager.get_progress (which returns None after cleanup).
+
+        self.assertTrue(
+            any("job idle past cutoff" in e for e in job.progress.errors),
+            f"expected diagnostic in progress.errors, got {job.progress.errors}",
+        )
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_reconciles_when_consumer_shows_undelivered_pending(self, mock_manager_class):
+        """Idle cutoff is the decision signal, not NATS counters. A job with
+        ``updated_at`` >10 min old and ``num_pending > 0`` means ADC hasn't
+        pulled messages for >10 min; those images are stuck regardless of what
+        the NATS stream looks like. Reconcile."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        job, lost_ids = self._make_stuck_job()
+        self._mock_consumer_state(mock_manager_class, num_pending=len(lost_ids), num_ack_pending=0, num_redelivered=0)
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "marked_failed")
+        job.refresh_from_db()
+        self.assertTrue(job.progress.is_complete())
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_reconciles_when_consumer_shows_ack_pending(self, mock_manager_class):
+        """Empirical NATS behavior: after ``max_deliver`` exhaustion, messages
+        stay in ``num_ack_pending`` indefinitely (not cleared until stream
+        deletion). This is the exact production failure mode — guarding on
+        ``num_ack_pending > 0`` would block recovery from the bug we're fixing."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        job, lost_ids = self._make_stuck_job()
+        self._mock_consumer_state(
+            mock_manager_class, num_pending=0, num_ack_pending=len(lost_ids), num_redelivered=len(lost_ids)
+        )
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "marked_failed")
+        job.refresh_from_db()
+        self.assertTrue(job.progress.is_complete())
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_noop_when_job_updated_recently(self, mock_manager_class):
+        """Idle-threshold guard: a job whose ``updated_at`` was bumped within
+        the STALLED_JOBS_MAX_MINUTES window is considered in-flight."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        job, lost_ids = self._make_stuck_job()
+        # Reverse the staleness applied by _make_stuck_job.
+        Job.objects.filter(pk=job.pk).update(updated_at=datetime.datetime.now())
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids))
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(results, [])
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_reconciles_when_num_redelivered_zero_but_redis_has_stuck_ids(self, mock_manager_class):
+        """Pre-#1234 Bug A signature: drained consumer, never-redelivered, but
+        Redis still has pending ids because an ACK landed before the SREM.
+        After dropping the ``num_redelivered > 0`` guard, this case reconciles
+        the same way as the ``max_deliver`` exhaustion case — Redis drives the
+        outcome, not the consumer's delivery history."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        job, lost_ids = self._make_stuck_job()
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=0)
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "marked_failed")
+        job.refresh_from_db()
+        self.assertTrue(job.progress.is_complete())
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_mixed_failures_combine_without_double_counting(self, mock_manager_class):
+        """Already-SADDed explicit failures stay in ``failed_images`` alongside
+        the newly-SADDed lost ids; SADD is idempotent on any accidental overlap.
+        Final progress.failed should be explicit + lost, not 2 * overlap."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        # 20 total, 15 processed, 3 explicit failures, 2 lost → failed_total = 5
+        job, lost_ids = self._make_stuck_job(total_images=20, already_processed=15, explicit_failures=3)
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids))
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["lost_count"], len(lost_ids))
+
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.SUCCESS.value)
+        process = job.progress.get_stage("process")
+        failed_param = next((p.value for p in process.params if p.key == "failed"), None)
+        self.assertEqual(failed_param, 5, f"expected 3 explicit + 2 lost = 5, got {failed_param}")
+
+    @patch("ami.jobs.tasks.TaskQueueManager")
+    def test_falls_to_failure_when_lost_over_threshold(self, mock_manager_class):
+        """FAILURE_THRESHOLD (0.5) preserved: a job that loses >50% of its images
+        still lands in FAILURE via the same code path; the helper feeds accurate
+        counts, it does not override the completion rules."""
+        from ami.jobs.tasks import mark_lost_images_failed
+
+        # 10 total, 3 processed, 0 explicit, 7 lost → 7/10 > 0.5 → FAILURE
+        job, lost_ids = self._make_stuck_job(total_images=10, already_processed=3, explicit_failures=0)
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids))
+
+        results = mark_lost_images_failed()
+
+        self.assertEqual(len(results), 1)
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.FAILURE.value, f"expected FAILURE for 7/10 lost; got {job.status}")
+
+    def test_reconcile_skips_when_job_updated_at_bumped_after_candidate_select(self):
+        """Race re-validation: between ``mark_lost_images_failed`` reading
+        ``candidate_pks`` and ``_reconcile_lost_images`` writing to Redis, a
+        late ``process_nats_pipeline_result`` could land and bump ``updated_at``
+        past the cutoff. The reconciler must not blindly mark images as failed
+        in that window — it would inflate counters (same id processed AND
+        failed) and overwrite legitimate progress.
+
+        Verified by calling ``_reconcile_lost_images`` directly with a cutoff
+        older than the job's current ``updated_at`` (mimicking the late-result
+        bump). Expected: returns ``"raced"``, no progress.errors written, no
+        Redis SREM/SADD performed.
+ """ + from ami.jobs.tasks import _reconcile_lost_images + from ami.ml.orchestration.nats_queue import ConsumerState + + job, lost_ids = self._make_stuck_job() + # Mimic a late result arriving: bump updated_at to "now". + Job.objects.filter(pk=job.pk).update(updated_at=datetime.datetime.now()) + + # Use a cutoff that the live (post-bump) updated_at will fail. Anything + # in the past works; pick 1 minute ago to be comfortably before "now". + cutoff = datetime.datetime.now() - datetime.timedelta(minutes=1) + consumer_state = ConsumerState(num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids)) + + # Snapshot pre-call Redis state so we can assert it was untouched. + manager = AsyncJobStateManager(job.pk) + pre_pending = manager.get_pending_image_ids() + + action = _reconcile_lost_images(job.pk, lost_ids, consumer_state, cutoff) + + self.assertEqual(action, "raced") + job.refresh_from_db() + # progress.errors untouched: no diagnostic from this run. + self.assertFalse( + any("job idle past cutoff" in e for e in job.progress.errors), + f"raced reconcile must not write progress.errors; got {job.progress.errors}", + ) + # Redis pending set unchanged: SREM was never issued. + self.assertEqual(manager.get_pending_image_ids(), pre_pending) + + @patch("ami.jobs.tasks.TaskQueueManager") + def test_progress_errors_truncates_long_id_list(self, mock_manager_class): + """JSONB cap: ``progress.errors`` is rendered in the UI and shipped + in the job detail payload. A 200-image job's full sorted id list + would be multi-KB per error entry. The diagnostic written to + ``progress.errors`` previews the first + ``_PROGRESS_ERROR_ID_PREVIEW_LIMIT`` ids and notes "and N more"; the + full list is logged separately to the per-job logger. + """ + from ami.jobs.tasks import _PROGRESS_ERROR_ID_PREVIEW_LIMIT, mark_lost_images_failed + + # 25 lost > limit (10) → "and 15 more" + job, lost_ids = self._make_stuck_job(total_images=30, already_processed=4, explicit_failures=1) + self.assertGreater(len(lost_ids), _PROGRESS_ERROR_ID_PREVIEW_LIMIT) + self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids)) + + results = mark_lost_images_failed() + + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["action"], "marked_failed") + + job.refresh_from_db() + diagnostic_entry = next((e for e in job.progress.errors if "job idle past cutoff" in e), None) + self.assertIsNotNone(diagnostic_entry, f"diagnostic missing; got errors={job.progress.errors}") + extra = len(lost_ids) - _PROGRESS_ERROR_ID_PREVIEW_LIMIT + self.assertIn(f"and {extra} more", diagnostic_entry) + + # The id list in progress.errors must include the first preview ids + # but not the trailing ones. Pick any id beyond the preview window + # and assert it's absent. + sorted_ids = sorted(lost_ids) + self.assertIn(sorted_ids[0], diagnostic_entry) + self.assertNotIn(sorted_ids[-1], diagnostic_entry) + + @patch("ami.jobs.tasks.TaskQueueManager") + def test_jobs_health_check_runs_lost_images_before_stale_jobs(self, mock_manager_class): + """Integration: jobs_health_check should unstick lost-images jobs before + check_stale_jobs gets a chance to REVOKE them. 
+        A job that would otherwise be revoked (status running + updated_at past
+        cutoff) lands in SUCCESS via the lost-images path."""
+        from ami.jobs.tasks import jobs_health_check
+
+        job, lost_ids = self._make_stuck_job()
+        self._mock_consumer_state(mock_manager_class, num_pending=0, num_ack_pending=0, num_redelivered=len(lost_ids))
+
+        result = jobs_health_check()
+
+        self.assertEqual(result["lost_images"]["fixed"], 1, f"result={result}")
+        # stale_jobs sub-check must not have revoked anything — the job already
+        # terminated in SUCCESS in the earlier step, so it is no longer in a
+        # running state by the time check_stale_jobs runs.
+        self.assertEqual(result["stale_jobs"]["fixed"], 0, f"result={result}")
+
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.SUCCESS.value)
diff --git a/ami/ml/orchestration/async_job_state.py b/ami/ml/orchestration/async_job_state.py
index 77efa1d25..26e7c3024 100644
--- a/ami/ml/orchestration/async_job_state.py
+++ b/ami/ml/orchestration/async_job_state.py
@@ -212,6 +212,22 @@ def get_progress(self, stage: str) -> "JobStateProgress | None":
             failed=failed_count,
         )
 
+    def get_pending_image_ids(self) -> set[str]:
+        """Return the union of image IDs still pending in either stage's set.
+
+        Used by the jobs_health_check reconciler to find ids that NATS has
+        given up redelivering but Redis still tracks as not-yet-processed.
+        Returns an empty set if neither pending set exists.
+        """
+        try:
+            redis = self._get_redis()
+            keys = [self._get_pending_key(stage) for stage in self.STAGES]
+            members = redis.sunion(keys)
+        except RedisError as e:
+            logger.error(f"Redis error reading pending image ids for job {self.job_id}: {e}")
+            return set()
+        return {m.decode() if isinstance(m, (bytes, bytearray)) else str(m) for m in members}
+
     def cleanup(self) -> None:
         """
         Delete all Redis keys associated with this job.
diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py
index 119d28a8c..1338e05ff 100644
--- a/ami/ml/orchestration/nats_queue.py
+++ b/ami/ml/orchestration/nats_queue.py
@@ -15,6 +15,7 @@
 import json
 import logging
 import re
+from dataclasses import dataclass
 
 import nats
 from asgiref.sync import sync_to_async
@@ -58,6 +59,20 @@
 ADVISORY_STREAM_NAME = "advisories"  # Shared stream for max delivery advisories across all jobs
 
 
+@dataclass
+class ConsumerState:
+    """A thin, mockable projection of ``nats.js.api.ConsumerInfo``.
+
+    Only the fields the jobs_health_check reconciler needs to decide whether a
+    consumer has genuinely drained — keeping this off the raw nats-py type makes
+    tests trivially buildable without standing up a full ConsumerInfo.
+    """
+
+    num_pending: int | None
+    num_ack_pending: int | None
+    num_redelivered: int | None
+
+
 def _parse_nats_timestamp(raw: str) -> datetime.datetime:
     """Parse an RFC3339-ish NATS timestamp, tolerating sub-microsecond precision.
 
@@ -678,6 +693,35 @@ async def _fetch_one(snap: dict) -> None:
         await asyncio.gather(*(_fetch_one(s) for s in snapshots))
 
+    async def get_consumer_state(self, job_id: int) -> ConsumerState | None:
+        """Return the current consumer counters for a job, or ``None`` if unreachable.
+
+        Tolerant like :meth:`log_consumer_stats_snapshot`: a missing stream /
+        consumer (e.g. the job was already cleaned up) returns ``None`` rather
+        than raising. The reconciler that consumes this result treats ``None``
+        as "skip — nothing to act on" for that job.
+ """ + if self.js is None: + return None + stream_name = self._get_stream_name(job_id) + consumer_name = self._get_consumer_name(job_id) + try: + info = await asyncio.wait_for( + self.js.consumer_info(stream_name, consumer_name), + timeout=NATS_JETSTREAM_TIMEOUT, + ) + except Exception as e: + # Broad catch mirrors ``_log_consumer_stats``: if the consumer is + # gone or the NATS call errors we want to skip the job, not blow + # up the whole reconciler tick. + logger.debug(f"Could not fetch consumer state for {consumer_name}: {e}") + return None + return ConsumerState( + num_pending=info.num_pending, + num_ack_pending=info.num_ack_pending, + num_redelivered=info.num_redelivered, + ) + async def _consumer_redelivered_count(self, job_id: int) -> int | None: """Return ``num_redelivered`` from the job's consumer, or ``None`` if gone.""" if self.js is None: diff --git a/docs/claude/debugging/chaos-scenarios.md b/docs/claude/debugging/chaos-scenarios.md index d61775057..c1d6dc670 100644 --- a/docs/claude/debugging/chaos-scenarios.md +++ b/docs/claude/debugging/chaos-scenarios.md @@ -154,6 +154,51 @@ stranded at partial progress. eventually revoke it. 5. Revert; restart; `git diff` clean. +### Scenario F: max_deliver exhaustion → reconciler marks images failed + +Verifies the `mark_lost_images_failed` sub-check inside `jobs_health_check`: +when NATS has given up redelivering messages (hit `max_deliver`) but Redis +still tracks them as pending, the reconciler marks them failed so the job +lands in SUCCESS/FAILURE instead of being REVOKEd by the stale-job reaper +(which would nuke legitimate processed work). + +No code patches required — the chaos is driven entirely by a management +command. + +1. Pick a fake job id (e.g. `999999`) and fake image ids (`img-a,img-b,img-c`). +2. Seed Redis pending sets: + ``` + docker compose exec django python manage.py shell -c " + from ami.ml.orchestration.async_job_state import AsyncJobStateManager + AsyncJobStateManager(999999).initialize_job(['img-a', 'img-b', 'img-c']) + " + ``` +3. Create a minimal Job row with `dispatch_mode=ASYNC_API` and back-date + `updated_at` past `STALLED_JOBS_MAX_MINUTES`. +4. Drive the NATS consumer past `max_deliver` without ADC: + ``` + docker compose exec django python manage.py chaos_monkey exhaust_max_deliver \ + --job-id 999999 --image-ids img-a,img-b,img-c --ensure-stream + ``` + Takes ~66s (`NATS_MAX_DELIVER × (TASK_TTR + 3s)` + final settle). Prints + `num_pending=0 num_ack_pending=3 num_redelivered=3` on exit. +5. Invoke the reconciler: + ``` + docker compose exec django python manage.py shell -c " + from ami.jobs.tasks import mark_lost_images_failed + print(mark_lost_images_failed()) + " + ``` +6. Expected: + - Reconciler returns `[{'job_id': 999999, 'action': 'marked_failed', 'lost_count': 3}]`. + - Job row flips to `FAILURE` (3/3 = 100% > `FAILURE_THRESHOLD`). With a + realistic ratio (e.g. 6 lost out of 20, <50%), it would flip to `SUCCESS`. + - `job.progress.errors` has a line starting with `jobs_health_check: marked N image(s) as failed (job idle past cutoff; ...)`. + - `cleanup_async_job_if_needed` fires on completion; NATS stream + Redis + keys deleted. +7. Cleanup: `Job.objects.filter(pk=999999).delete()` (stream/Redis already + drained by the reconciler's cleanup trigger). 
+
 ## Gotchas
 
 - **celeryworker startup noise**: first ~60s after restart, the
diff --git a/docs/claude/processing-lifecycle.md b/docs/claude/processing-lifecycle.md
index 746ac9c44..54f94bc8b 100644
--- a/docs/claude/processing-lifecycle.md
+++ b/docs/claude/processing-lifecycle.md
@@ -90,6 +90,7 @@
 If any invariant is violated, the failure mode is probably below.
 
 | Symptom | Likely cause | Diagnostic | Fix direction |
 |---|---|---|---|
 | Job STARTED forever; NATS drained; Redis `pending:results` > 0 | Worker crashed between ACK and results-stage SREM (Bug A) | `redis-cli -n 1 SCARD job:{id}:pending_images:results` > 0 AND NATS `num_pending+num_ack_pending == 0` | Addressed by PR #1234 (ACK now runs after the results SREM + progress commit; crashes leave the message redeliverable). The 15-min stale-job reaper is the safety net if this class of bug resurfaces. |
+| Job stuck at <100% indefinitely; most images processed; Redis `pending:process` or `pending:results` has a small remainder; NATS consumer shows `num_redelivered > 0` | NATS gave up redelivering after `max_deliver=2` (both delivery attempts failed non-retriably, whether in the payload, the handler, or on the DB side). Empirically, JetStream keeps those messages in `num_ack_pending` indefinitely rather than clearing them. | `redis-cli -n 1 SMEMBERS job:{id}:pending_images:process` has ids AND `updated_at` is >10 min old AND last `Pending images from Redis` log line was >10 min ago | Addressed by `mark_lost_images_failed` (PR #1244) — new sub-check inside `jobs_health_check` that runs BEFORE `check_stale_jobs`, marks the still-pending ids as failed and pushes progress to 100% so the existing completion logic lands the job in SUCCESS or FAILURE based on `FAILURE_THRESHOLD`. Preserves legitimate processed work instead of REVOKE-nuking the whole job. |
 | Job FAILURE within 30-60s of dispatch; cleanup fired mid-processing | Premature `cleanup_async_job_resources` — `is_complete()` momentarily True (Bug B, not yet reproduced) | grep log `Finalizing NATS consumer` for job, compare timestamp to `Finished job` (run_job exit) and first `Updated job ... progress` line | Separate issue (see drafts in ami-devops). Not in scope for Fix 1. |
 | Transient `run_job` exception flips job to FAILURE even though 100+ images were successfully queued | `task_failure` signal missing ASYNC_API guard (Bug C) | grep log for `task_failure` on `run_job` + Job row status=FAILURE + Redis still has pending IDs | Guarded in `update_job_failure` at `tasks.py:729`: defers FAILURE for ASYNC_API jobs when `progress.is_complete()` is False, mirroring the `task_postrun` SUCCESS guard. Stale-job reaper eventually revokes if the job truly stays stuck. |
 | `Job state keys not found in Redis` log line | Either genuine cleanup race, or transient Redis error being misreported | If paired with autoretry-backoff log lines, it's transient (normal); if one-shot, check if cleanup fired earlier for this job_id | Already fixed in #1219/#1231 (transient path now autoretries + logs distinctly) |
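+
+A quick way to read both sides of the new row's diagnostic from a shell (a
+sketch; `2421` is a placeholder job id, and both helpers used here ship in
+this PR):
+
+```
+docker compose exec django python manage.py shell -c "
+from asgiref.sync import async_to_sync
+from ami.ml.orchestration.async_job_state import AsyncJobStateManager
+from ami.ml.orchestration.nats_queue import TaskQueueManager
+
+job_id = 2421
+# Redis side: ids still tracked as pending in either stage.
+print('pending:', AsyncJobStateManager(job_id).get_pending_image_ids())
+
+# NATS side: consumer counters (None if the consumer is already gone).
+async def _state():
+    async with TaskQueueManager() as m:
+        return await m.get_consumer_state(job_id)
+
+print('consumer:', async_to_sync(_state)())
+"
+```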