From 703400bf19e8bb793e13121227b483a602b6ce9e Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Mon, 2 Mar 2026 17:00:01 -0800
Subject: [PATCH 1/7] fix: revoke stale jobs by default instead of setting PENDING

update_stale_jobs previously checked Celery for task state, but
AsyncResult returns PENDING for tasks it has no record of. This caused
stale jobs to cycle through PENDING endlessly, and async_api jobs kept
serving tasks to workers via the /tasks/ endpoint.

Now: only trust Celery when it reports a known state (SUCCESS, FAILURE,
etc). Otherwise revoke the job and clean up NATS/Redis resources.

Also adds --dry-run flag.

Co-Authored-By: Claude Opus 4.6
---
 .../management/commands/update_stale_jobs.py  | 49 ++++++++++++++-----
 1 file changed, 37 insertions(+), 12 deletions(-)

diff --git a/ami/jobs/management/commands/update_stale_jobs.py b/ami/jobs/management/commands/update_stale_jobs.py
index da3a53e3d..33e517020 100644
--- a/ami/jobs/management/commands/update_stale_jobs.py
+++ b/ami/jobs/management/commands/update_stale_jobs.py
@@ -4,35 +4,60 @@
 from django.utils import timezone
 
 from ami.jobs.models import Job, JobState
+from ami.jobs.tasks import cleanup_async_job_if_needed
+
+# Celery returns PENDING for tasks it has no record of.
+# These are the states that indicate a real, known task status.
+KNOWN_CELERY_STATES = frozenset(states.ALL_STATES) - {states.PENDING}
 
 
 class Command(BaseCommand):
-    help = (
-        "Update the status of all jobs that are not in a final state " "and have not been updated in the last X hours."
-    )
+    help = "Revoke stale jobs that have not been updated within the cutoff period."
 
-    # Add argument for the number of hours to consider a job stale
     def add_arguments(self, parser):
         parser.add_argument(
             "--hours",
             type=int,
             default=Job.FAILED_CUTOFF_HOURS,
-            help="Number of hours to consider a job stale",
+            help="Number of hours to consider a job stale (default: %(default)s)",
+        )
+        parser.add_argument(
+            "--dry-run",
+            action="store_true",
+            help="Show what would be done without making changes",
         )
 
     def handle(self, *args, **options):
+        cutoff = timezone.now() - timezone.timedelta(hours=options["hours"])
         stale_jobs = Job.objects.filter(
             status__in=JobState.running_states(),
-            updated_at__lt=timezone.now() - timezone.timedelta(hours=options["hours"]),
+            updated_at__lt=cutoff,
         )
 
+        if not stale_jobs.exists():
+            self.stdout.write("No stale jobs found.")
+            return
+
         for job in stale_jobs:
-            task = AsyncResult(job.task_id) if job.task_id else None
-            if task:
-                job.update_status(task.state, save=False)
+            celery_state = None
+            if job.task_id:
+                celery_state = AsyncResult(job.task_id).state
+
+            if celery_state in KNOWN_CELERY_STATES:
+                # Celery has a real status for this task — use it
+                if options["dry_run"]:
+                    self.stdout.write(f"  [dry-run] Job {job.pk}: would update to {celery_state} (from Celery)")
+                    continue
+                job.update_status(celery_state, save=False)
                 job.save()
-                self.stdout.write(self.style.SUCCESS(f"Updated status of job {job.pk} to {task.state}"))
+                self.stdout.write(self.style.SUCCESS(f"Job {job.pk}: updated to {celery_state} (from Celery)"))
             else:
-                self.stdout.write(self.style.WARNING(f"Job {job.pk} has no associated task, setting status to FAILED"))
-                job.update_status(states.FAILURE, save=False)
+                # No task_id, or Celery has no record (returns PENDING) — revoke
+                if options["dry_run"]:
+                    self.stdout.write(f"  [dry-run] Job {job.pk} ({job.status}): would revoke and clean up")
+                    continue
+                job.update_status(JobState.REVOKED, save=False)
+                job.finished_at = timezone.now()
+                job.save()
+                cleanup_async_job_if_needed(job)
+                self.stdout.write(self.style.WARNING(f"Job {job.pk}: revoked (no known Celery state)"))
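A note on the pitfall this patch works around: Celery cannot distinguish
"queued but not yet started" from "no record at all"; AsyncResult reports
PENDING in both cases. A minimal illustration (standard Celery behaviour,
the id below is arbitrary):

    from celery.result import AsyncResult

    # An id Celery has never seen, e.g. after the result backend expired
    # the record. No error is raised; the state is simply PENDING.
    result = AsyncResult("00000000-0000-0000-0000-000000000000")
    print(result.state)  # "PENDING", indistinguishable from a queued task

Hence the rule above: only states other than PENDING are trusted.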
From 281e931b6aa3bc02e935fc131a0ca0026a05f722 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Mon, 2 Mar 2026 17:43:53 -0800
Subject: [PATCH 2/7] refactor: extract check_stale_jobs() for reuse by periodic task

Move core stale-job logic from management command into check_stale_jobs()
in tasks.py. The management command is now a thin wrapper. Add tests for
the extracted function.

This prepares for #1025 which will call check_stale_jobs() from a Celery
Beat periodic task.

Co-Authored-By: Claude
---
 .../management/commands/update_stale_jobs.py | 48 ++++----------
 ami/jobs/tasks.py                            | 50 ++++++++++++++
 ami/jobs/tests_update_stale_jobs.py          | 66 +++++++++++++++++++
 3 files changed, 127 insertions(+), 37 deletions(-)
 create mode 100644 ami/jobs/tests_update_stale_jobs.py

diff --git a/ami/jobs/management/commands/update_stale_jobs.py b/ami/jobs/management/commands/update_stale_jobs.py
index 33e517020..0f19933dd 100644
--- a/ami/jobs/management/commands/update_stale_jobs.py
+++ b/ami/jobs/management/commands/update_stale_jobs.py
@@ -1,14 +1,7 @@
-from celery import states
-from celery.result import AsyncResult
 from django.core.management.base import BaseCommand
-from django.utils import timezone
 
-from ami.jobs.models import Job, JobState
-from ami.jobs.tasks import cleanup_async_job_if_needed
-
-# Celery returns PENDING for tasks it has no record of.
-# These are the states that indicate a real, known task status.
-KNOWN_CELERY_STATES = frozenset(states.ALL_STATES) - {states.PENDING}
+from ami.jobs.models import Job
+from ami.jobs.tasks import check_stale_jobs
 
 
 class Command(BaseCommand):
@@ -28,36 +21,17 @@ def add_arguments(self, parser):
         )
 
     def handle(self, *args, **options):
-        cutoff = timezone.now() - timezone.timedelta(hours=options["hours"])
-        stale_jobs = Job.objects.filter(
-            status__in=JobState.running_states(),
-            updated_at__lt=cutoff,
-        )
+        results = check_stale_jobs(hours=options["hours"], dry_run=options["dry_run"])
 
-        if not stale_jobs.exists():
+        if not results:
             self.stdout.write("No stale jobs found.")
             return
 
-        for job in stale_jobs:
-            celery_state = None
-            if job.task_id:
-                celery_state = AsyncResult(job.task_id).state
-
-            if celery_state in KNOWN_CELERY_STATES:
-                # Celery has a real status for this task — use it
-                if options["dry_run"]:
-                    self.stdout.write(f"  [dry-run] Job {job.pk}: would update to {celery_state} (from Celery)")
-                    continue
-                job.update_status(celery_state, save=False)
-                job.save()
-                self.stdout.write(self.style.SUCCESS(f"Job {job.pk}: updated to {celery_state} (from Celery)"))
+        prefix = "[dry-run] " if options["dry_run"] else ""
+        for r in results:
+            if r["action"] == "updated":
+                self.stdout.write(
+                    self.style.SUCCESS(f"{prefix}Job {r['job_id']}: updated to {r['state']} (from Celery)")
+                )
             else:
-                # No task_id, or Celery has no record (returns PENDING) — revoke
-                if options["dry_run"]:
-                    self.stdout.write(f"  [dry-run] Job {job.pk} ({job.status}): would revoke and clean up")
-                    continue
-                job.update_status(JobState.REVOKED, save=False)
-                job.finished_at = timezone.now()
-                job.save()
-                cleanup_async_job_if_needed(job)
-                self.stdout.write(self.style.WARNING(f"Job {job.pk}: revoked (no known Celery state)"))
+                self.stdout.write(self.style.WARNING(f"{prefix}Job {r['job_id']}: revoked (no known Celery state)"))
diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index 917608be0..53d348748 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -316,6 +316,56 @@ def _update_job_progress(
     cleanup_async_job_if_needed(job)
 
 
+def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
+    """
+    Find jobs stuck in a running state past the cutoff and revoke them.
+
+    For each stale job, checks Celery for a real task status. If Celery has one
+    (e.g. SUCCESS, FAILURE), uses that. Otherwise revokes the job and cleans up
+    any async resources (NATS/Redis).
+
+    Returns a list of dicts describing what was done to each job.
+    """
+    import datetime
+
+    from celery import states
+    from celery.result import AsyncResult
+
+    from ami.jobs.models import Job, JobState
+
+    if hours is None:
+        hours = Job.FAILED_CUTOFF_HOURS
+
+    known_celery_states = frozenset(states.ALL_STATES) - {states.PENDING}
+
+    cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
+    stale_jobs = Job.objects.filter(
+        status__in=JobState.running_states(),
+        updated_at__lt=cutoff,
+    )
+
+    results = []
+    for job in stale_jobs:
+        celery_state = None
+        if job.task_id:
+            celery_state = AsyncResult(job.task_id).state
+
+        if celery_state in known_celery_states:
+            if not dry_run:
+                job.update_status(celery_state, save=False)
+                job.save()
+            results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
+        else:
+            if not dry_run:
+                job.update_status(JobState.REVOKED, save=False)
+                job.finished_at = datetime.datetime.now()
+                job.save()
+                cleanup_async_job_if_needed(job)
+            results.append({"job_id": job.pk, "action": "revoked", "previous_status": job.status})
+
+    return results
+
+
 def cleanup_async_job_if_needed(job) -> None:
     """
     Clean up async resources (NATS/Redis) if this job uses them.
diff --git a/ami/jobs/tests_update_stale_jobs.py b/ami/jobs/tests_update_stale_jobs.py
new file mode 100644
index 000000000..9c9c940e0
--- /dev/null
+++ b/ami/jobs/tests_update_stale_jobs.py
@@ -0,0 +1,66 @@
+from datetime import timedelta
+from unittest.mock import patch
+
+from django.test import TestCase
+from django.utils import timezone
+
+from ami.jobs.models import Job, JobState
+from ami.jobs.tasks import check_stale_jobs
+from ami.main.models import Project
+
+
+class CheckStaleJobsTest(TestCase):
+    def setUp(self):
+        self.project = Project.objects.create(name="Stale jobs test project")
+
+    def _create_job(self, status=JobState.STARTED, hours_ago=100, task_id=None):
+        job = Job.objects.create(
+            project=self.project,
+            name=f"Test job {status}",
+            status=status,
+        )
+        Job.objects.filter(pk=job.pk).update(
+            updated_at=timezone.now() - timedelta(hours=hours_ago),
+        )
+        if task_id is not None:
+            Job.objects.filter(pk=job.pk).update(task_id=task_id)
+        job.refresh_from_db()
+        return job
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_dry_run(self, mock_cleanup):
+        """Dry run returns results without modifying jobs."""
+        job = self._create_job(status=JobState.STARTED)
+
+        results = check_stale_jobs(dry_run=True)
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.STARTED.value)
+        mock_cleanup.assert_not_called()
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_revokes_stale_job(self, mock_cleanup):
+        """Stale job without a known Celery state is revoked and cleaned up."""
+        job = self._create_job(status=JobState.STARTED)
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        self.assertIsNotNone(job.finished_at)
+        mock_cleanup.assert_called_once_with(job)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
+        """Recent jobs and jobs in final states are not touched."""
+        self._create_job(status=JobState.STARTED, hours_ago=1)  # recent
+        self._create_job(status=JobState.SUCCESS, hours_ago=200)  # final state
+
+        results = check_stale_jobs()
+
+        self.assertEqual(results, [])
+        mock_cleanup.assert_not_called()
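For reference, one plausible wiring for the #1025 follow-up mentioned
above. This is a sketch only; the task name, wrapper, and schedule key
are assumptions and not part of this series:

    from celery import shared_task
    from celery.schedules import crontab

    @shared_task(name="ami.jobs.tasks.check_stale_jobs_task")  # hypothetical name
    def check_stale_jobs_task():
        from ami.jobs.tasks import check_stale_jobs
        return check_stale_jobs()

    # In Django settings (assuming the project reads CELERY_BEAT_SCHEDULE):
    CELERY_BEAT_SCHEDULE = {
        "check-stale-jobs": {
            "task": "ami.jobs.tasks.check_stale_jobs_task",
            "schedule": crontab(minute=0),  # hourly
        },
    }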
From b69d4897f43b67f7cb6392f6b55dc0dc1c77a5fa Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Mon, 2 Mar 2026 23:51:22 -0800
Subject: [PATCH 3/7] refactor: move jobs tests into tests/ package

Consolidate ami/jobs/tests.py, test_tasks.py, and
tests_update_stale_jobs.py into an ami/jobs/tests/ package with
consistent test_ prefixes. All files are now discovered by Django's
default test runner pattern.

Co-Authored-By: Claude
---
 ami/jobs/tests/__init__.py                | 0
 ami/jobs/{tests.py => tests/test_jobs.py} | 0
 ami/jobs/{ => tests}/test_tasks.py        | 0
 .../test_update_stale_jobs.py}            | 0
 4 files changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 ami/jobs/tests/__init__.py
 rename ami/jobs/{tests.py => tests/test_jobs.py} (100%)
 rename ami/jobs/{ => tests}/test_tasks.py (100%)
 rename ami/jobs/{tests_update_stale_jobs.py => tests/test_update_stale_jobs.py} (100%)

diff --git a/ami/jobs/tests/__init__.py b/ami/jobs/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ami/jobs/tests.py b/ami/jobs/tests/test_jobs.py
similarity index 100%
rename from ami/jobs/tests.py
rename to ami/jobs/tests/test_jobs.py
diff --git a/ami/jobs/test_tasks.py b/ami/jobs/tests/test_tasks.py
similarity index 100%
rename from ami/jobs/test_tasks.py
rename to ami/jobs/tests/test_tasks.py
diff --git a/ami/jobs/tests_update_stale_jobs.py b/ami/jobs/tests/test_update_stale_jobs.py
similarity index 100%
rename from ami/jobs/tests_update_stale_jobs.py
rename to ami/jobs/tests/test_update_stale_jobs.py

From 0bf2867a3bb44eb0f3bb2a45f6115ae7dca167 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Mon, 2 Mar 2026 23:52:59 -0800
Subject: [PATCH 4/7] fix: correct stale job handling in check_stale_jobs()

Four issues fixed:

- Use states.READY_STATES instead of ALL_STATES - {PENDING} so
  non-terminal Celery states (STARTED, RETRY, RECEIVED) don't leave
  jobs stuck
- Guard SUCCESS: only accept it when job.progress.is_complete(),
  matching the existing check in update_job_status(); otherwise revoke
  the job
- Set finished_at in both the "updated" and "revoked" branches
- Capture previous_status before calling update_status() so the result
  dict reflects the original state rather than the post-mutation
  REVOKED value

Tests added for the updated-from-Celery-state and
SUCCESS-with-incomplete-progress paths.

Co-Authored-By: Claude
---
 ami/jobs/tasks.py                        | 22 +++++++++----
 ami/jobs/tests/test_update_stale_jobs.py | 42 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index 53d348748..b1ccb416c 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -320,9 +320,11 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
     """
     Find jobs stuck in a running state past the cutoff and revoke them.
 
-    For each stale job, checks Celery for a real task status. If Celery has one
-    (e.g. SUCCESS, FAILURE), uses that. Otherwise revokes the job and cleans up
-    any async resources (NATS/Redis).
+    For each stale job, checks Celery for a terminal task status. If Celery reports
+    FAILURE or REVOKED, that state is applied. SUCCESS is only accepted when
+    job.progress.is_complete(), to avoid prematurely closing async_api jobs whose
+    NATS workers are still delivering results. All other cases result in revocation
+    and cleanup of async resources (NATS/Redis).
 
     Returns a list of dicts describing what was done to each job.
     """
@@ -336,8 +338,6 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
     if hours is None:
         hours = Job.FAILED_CUTOFF_HOURS
 
-    known_celery_states = frozenset(states.ALL_STATES) - {states.PENDING}
-
     cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
     stale_jobs = Job.objects.filter(
         status__in=JobState.running_states(),
@@ -350,9 +350,17 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
         if job.task_id:
             celery_state = AsyncResult(job.task_id).state
 
-        if celery_state in known_celery_states:
+        # Only trust terminal Celery states. For SUCCESS, also require completed
+        # progress so async_api jobs aren't closed while NATS results are pending.
+        is_terminal = celery_state in states.READY_STATES
+        if celery_state == states.SUCCESS and not job.progress.is_complete():
+            is_terminal = False
+
+        previous_status = job.status
+        if is_terminal:
             if not dry_run:
                 job.update_status(celery_state, save=False)
+                job.finished_at = datetime.datetime.now()
                 job.save()
             results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
         else:
@@ -361,7 +369,7 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
                 job.finished_at = datetime.datetime.now()
                 job.save()
                 cleanup_async_job_if_needed(job)
-            results.append({"job_id": job.pk, "action": "revoked", "previous_status": job.status})
+            results.append({"job_id": job.pk, "action": "revoked", "previous_status": previous_status})
 
     return results
diff --git a/ami/jobs/tests/test_update_stale_jobs.py b/ami/jobs/tests/test_update_stale_jobs.py
index 9c9c940e0..e97e34244 100644
--- a/ami/jobs/tests/test_update_stale_jobs.py
+++ b/ami/jobs/tests/test_update_stale_jobs.py
@@ -48,12 +48,52 @@ def test_revokes_stale_job(self, mock_cleanup):
         results = check_stale_jobs()
 
         self.assertEqual(len(results), 1)
-        self.assertEqual(results[0]["action"], "revoked")
+        result = results[0]
+        self.assertEqual(result["action"], "revoked")
+        self.assertEqual(result["previous_status"], JobState.STARTED)
         job.refresh_from_db()
         self.assertEqual(job.status, JobState.REVOKED.value)
         self.assertIsNotNone(job.finished_at)
         mock_cleanup.assert_called_once_with(job)
 
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_updates_status_from_known_celery_state(self, mock_async_result, mock_cleanup):
+        """Stale job with a terminal Celery state is updated (not revoked)."""
+        from celery import states
+
+        mock_async_result.return_value.state = states.FAILURE
+        job = self._create_job(status=JobState.STARTED, task_id="some-celery-task-id")
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        result = results[0]
+        self.assertEqual(result["action"], "updated")
+        self.assertEqual(result["state"], states.FAILURE)
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.FAILURE.value)
+        self.assertIsNotNone(job.finished_at)
+        mock_cleanup.assert_not_called()
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_revokes_success_with_incomplete_progress(self, mock_async_result, mock_cleanup):
+        """Stale job where Celery reports SUCCESS but progress is incomplete is revoked."""
+        from celery import states
+
+        mock_async_result.return_value.state = states.SUCCESS
+        job = self._create_job(status=JobState.STARTED, task_id="some-celery-task-id")
+        # job.progress.is_complete() returns False by default (no stages completed)
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        mock_cleanup.assert_called_once_with(job)
+
     @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
     def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
         """Recent jobs and jobs in final states are not touched."""
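Why READY_STATES is the right set here: the previous set also admitted
in-flight states. A quick check against the standard Celery constants:

    from celery import states

    states.READY_STATES                     # frozenset({'SUCCESS', 'FAILURE', 'REVOKED'})
    states.STARTED in states.READY_STATES   # False: in flight, not terminal
    states.RETRY in states.ALL_STATES       # True: why ALL_STATES - {PENDING} was too broad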
@patch("celery.result.AsyncResult") + def test_revokes_success_with_incomplete_progress(self, mock_async_result, mock_cleanup): + """Stale job where Celery reports SUCCESS but progress is incomplete is revoked.""" + from celery import states + + mock_async_result.return_value.state = states.SUCCESS + job = self._create_job(status=JobState.STARTED, task_id="some-celery-task-id") + # job.progress.is_complete() returns False by default (no stages completed) + + results = check_stale_jobs() + + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["action"], "revoked") + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + mock_cleanup.assert_called_once_with(job) + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") def test_skips_recent_and_final_state_jobs(self, mock_cleanup): """Recent jobs and jobs in final states are not touched.""" From 1a733b9ff07a391a1e78f2c39e314e45bed619cc Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 3 Mar 2026 00:07:54 -0800 Subject: [PATCH 5/7] fix: extend async_api progress guard to FAILURE, add cleanup to terminal branch Two issues from code review: - Guard FAILURE the same as SUCCESS for async_api jobs: if Celery reports SUCCESS or FAILURE but progress is incomplete, treat as non-terminal and revoke instead, matching the AsyncJobStateManager convention - Call cleanup_async_job_if_needed() in the is_terminal branch so NATS/Redis resources are freed for recovered jobs, not only for revoked ones Co-Authored-By: Claude --- ami/jobs/tasks.py | 21 ++++++++++++--------- ami/jobs/tests/test_update_stale_jobs.py | 8 +++++--- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index b1ccb416c..618e535d4 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -320,11 +320,11 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di """ Find jobs stuck in a running state past the cutoff and revoke them. - For each stale job, checks Celery for a terminal task status. If Celery reports - FAILURE or REVOKED, that state is applied. SUCCESS is only accepted when - job.progress.is_complete(), to avoid prematurely closing async_api jobs whose - NATS workers are still delivering results. All other cases result in revocation - and cleanup of async resources (NATS/Redis). + For each stale job, checks Celery for a terminal task status. REVOKED is + always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted + when job.progress.is_complete() — NATS workers may still be delivering + results after the Celery task finishes. All other cases result in revocation. + Async resources (NATS/Redis) are cleaned up in both branches. Returns a list of dicts describing what was done to each job. """ @@ -333,7 +333,7 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di from celery import states from celery.result import AsyncResult - from ami.jobs.models import Job, JobState + from ami.jobs.models import Job, JobDispatchMode, JobState if hours is None: hours = Job.FAILED_CUTOFF_HOURS @@ -350,10 +350,12 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di if job.task_id: celery_state = AsyncResult(job.task_id).state - # Only trust terminal Celery states. For SUCCESS, also require completed - # progress so async_api jobs aren't closed while NATS results are pending. + # Only trust terminal Celery states. 
From 5bb47e55a61651848aab44224c63cdfad6ac4bb8 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 3 Mar 2026 00:20:11 -0800
Subject: [PATCH 6/7] fix: use select_for_update to prevent concurrent stale-job processing

Fetch stale job PKs first, then re-acquire each inside
transaction.atomic() with select_for_update(). The re-fetch re-checks
running state and cutoff so a job handled by a concurrent run is skipped
(DoesNotExist) rather than double-processed.

Async resource cleanup (NATS/Redis) runs outside the transaction to
avoid holding the row lock during network calls.

Matches the pattern used by _fail_job() and _update_job_progress().

Co-Authored-By: Claude
---
 ami/jobs/tasks.py | 75 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 48 insertions(+), 27 deletions(-)

diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index 618e535d4..ff079cf62 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -332,6 +332,7 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
 
     from celery import states
     from celery.result import AsyncResult
+    from django.db import transaction
 
     from ami.jobs.models import Job, JobDispatchMode, JobState
 
@@ -339,39 +340,59 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
         hours = Job.FAILED_CUTOFF_HOURS
 
     cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
-    stale_jobs = Job.objects.filter(
-        status__in=JobState.running_states(),
-        updated_at__lt=cutoff,
+    stale_pks = list(
+        Job.objects.filter(
+            status__in=JobState.running_states(),
+            updated_at__lt=cutoff,
+        ).values_list("pk", flat=True)
     )
 
     results = []
-    for job in stale_jobs:
-        celery_state = None
-        if job.task_id:
-            celery_state = AsyncResult(job.task_id).state
-
-        # Only trust terminal Celery states. For async_api jobs, SUCCESS and
-        # FAILURE are only accepted when progress is complete — NATS workers may
-        # still be delivering results after the Celery task finishes.
-        is_terminal = celery_state in states.READY_STATES
-        is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
-        if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
-            is_terminal = False
-
-        previous_status = job.status
+    for pk in stale_pks:
+        with transaction.atomic():
+            try:
+                job = Job.objects.select_for_update().get(
+                    pk=pk,
+                    status__in=JobState.running_states(),
+                    updated_at__lt=cutoff,
+                )
+            except Job.DoesNotExist:
+                # Another concurrent run already handled this job.
+                continue
+
+            celery_state = None
+            if job.task_id:
+                celery_state = AsyncResult(job.task_id).state
+
+            # Only trust terminal Celery states. For async_api jobs, SUCCESS and
+            # FAILURE are only accepted when progress is complete — NATS workers may
+            # still be delivering results after the Celery task finishes.
+            is_terminal = celery_state in states.READY_STATES
+            is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
+            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
+                is_terminal = False
+
+            previous_status = job.status
+            if is_terminal:
+                if not dry_run:
+                    job.update_status(celery_state, save=False)
+                    job.finished_at = datetime.datetime.now()
+                    job.save()
+            else:
+                if not dry_run:
+                    job.update_status(JobState.REVOKED, save=False)
+                    job.finished_at = datetime.datetime.now()
+                    job.save()
+
+        # Async resource cleanup runs outside the transaction — it makes network
+        # calls (NATS/Redis) that should not hold the DB row lock.
+        if not dry_run:
+            job.refresh_from_db()
+            cleanup_async_job_if_needed(job)
+
         if is_terminal:
-            if not dry_run:
-                job.update_status(celery_state, save=False)
-                job.finished_at = datetime.datetime.now()
-                job.save()
-                cleanup_async_job_if_needed(job)
             results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
         else:
-            if not dry_run:
-                job.update_status(JobState.REVOKED, save=False)
-                job.finished_at = datetime.datetime.now()
-                job.save()
-                cleanup_async_job_if_needed(job)
             results.append({"job_id": job.pk, "action": "revoked", "previous_status": previous_status})
 
     return results
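The locking shape patch 6 adopts, reduced to its skeleton. This is an
illustrative sketch, not project code; process_one_stale_job is a
hypothetical helper:

    from django.db import transaction

    from ami.jobs.models import Job, JobState

    def process_one_stale_job(pk, cutoff):
        """Hypothetical helper showing the lock-then-recheck pattern."""
        with transaction.atomic():
            try:
                # Re-checking status and cutoff inside the lock means a row
                # that a concurrent run already transitioned raises
                # DoesNotExist here and is skipped, not double-processed.
                job = Job.objects.select_for_update().get(
                    pk=pk,
                    status__in=JobState.running_states(),
                    updated_at__lt=cutoff,
                )
            except Job.DoesNotExist:
                return None
            # ... mutate and job.save() while holding the row lock ...
        # Row lock is released here; network cleanup (NATS/Redis) is safe now.
        return job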
From 2c2d3f5046748eeda0e0e534f3bd2a621bb73ff5 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 3 Mar 2026 00:25:30 -0800
Subject: [PATCH 7/7] fix: catch Celery backend errors in stale-job cleanup loop

Wrap AsyncResult(task_id).state in try-except so a single broker/backend
failure doesn't abort the entire batch. Failed lookups are logged and the
job is revoked as if Celery state were unknown.

Co-Authored-By: Claude
---
 ami/jobs/tasks.py                        | 11 ++++++++++-
 ami/jobs/tests/test_update_stale_jobs.py | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index ff079cf62..d8ff89d39 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -362,7 +362,16 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
 
             celery_state = None
             if job.task_id:
-                celery_state = AsyncResult(job.task_id).state
+                try:
+                    celery_state = AsyncResult(job.task_id).state
+                except Exception:
+                    logger.warning(
+                        "Failed to fetch Celery state for stale job %s (task_id=%s)",
+                        job.pk,
+                        job.task_id,
+                        exc_info=True,
+                    )
+                    # Treat as unknown state — job will be revoked below.
 
             # Only trust terminal Celery states. For async_api jobs, SUCCESS and
             # FAILURE are only accepted when progress is complete — NATS workers may
diff --git a/ami/jobs/tests/test_update_stale_jobs.py b/ami/jobs/tests/test_update_stale_jobs.py
index 63aa64d94..4a1e44427 100644
--- a/ami/jobs/tests/test_update_stale_jobs.py
+++ b/ami/jobs/tests/test_update_stale_jobs.py
@@ -96,6 +96,21 @@ def test_revokes_success_with_incomplete_progress(self, mock_async_result, mock_cleanup):
         self.assertEqual(job.status, JobState.REVOKED.value)
         mock_cleanup.assert_called_once_with(job)
 
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_revokes_when_celery_lookup_fails(self, mock_async_result, mock_cleanup):
+        """Job is revoked if Celery state lookup raises an exception."""
+        mock_async_result.side_effect = ConnectionError("broker down")
+        job = self._create_job(status=JobState.STARTED, task_id="unreachable-task")
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        mock_cleanup.assert_called_once_with(job)
+
     @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
     def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
         """Recent jobs and jobs in final states are not touched."""
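Manual invocation of the command this series hardens (both flags were
added in patch 1; the hour count is just an example value):

    python manage.py update_stale_jobs --hours 24 --dry-run   # preview only
    python manage.py update_stale_jobs                        # apply, default cutoff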