From f470bc617794272d2c0f0be7a848d54a343c31f5 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Wed, 15 Apr 2026 11:58:16 -0700
Subject: [PATCH 1/2] fix(jobs): stop coercing partial-progress to 100% on
 FAILURE, tighten stalled-job reaper
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two related fixes for the premature ``cleanup_async_job_resources`` path
that flips async_api jobs to FAILURE within ~30-55s of dispatch while
NATS is still delivering results:

1. ``Job.update_progress`` no longer silently bumps ``stage.progress = 1``
   when ``stage.status`` is in a final state. The bumped value made
   ``is_complete()`` return True and triggered cleanup mid-flight. The
   trigger was ``_update_job_progress`` writing ``status=FAILURE`` at
   partial progress as soon as ``failed/total`` crossed FAILURE_THRESHOLD
   (very easy early in a job — 1-2 errors out of the first few results).
   Progress is a measurement; leave it alone. The honest FAILURE trip
   happens when the stage actually reaches 100%. (The sequence is
   sketched below.)

2. Split ``Job.FAILED_CUTOFF_HOURS`` (originally added in PR #368 to hide
   failed jobs from UI listings after 3 days) into:

   - ``FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3`` — original UI/API filter
   - ``STALLED_JOBS_MAX_MINUTES = 10`` — reaper threshold

   The 72h value was reused by PR #1227's stale-job reaper for an
   entirely different purpose, leaving stuck jobs invisible for 3 days.
   ``check_stale_jobs`` now defaults to 10 minutes against ``updated_at``
   (already in place from #1227), so a job whose worker pool stops
   pulling messages gets reaped within one Beat tick (~10-25 min). The
   ``--hours`` arg on the management command becomes ``--minutes`` to
   match.

Together: fix #1 stops the false-positive FAILUREs; fix #2 ensures true
positives are caught quickly without depending on the misfiring coercion.
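
A minimal sketch of the fix #1 false-positive sequence (the threshold
value, the expected-results count, and the flat variables below are
illustrative stand-ins for the real stage/model fields, not code from
this repo):

    # Hypothetical early-job snapshot: 2 of the first 5 results errored.
    FAILURE_THRESHOLD = 0.25  # assumed value, for illustration only
    failed, total, expected = 2, 5, 1000

    progress = total / expected  # 0.005, the job has barely started
    if failed / total >= FAILURE_THRESHOLD:  # 0.4 >= 0.25, trips immediately
        status = "FAILURE"  # written at partial progress

    # The save-time coercion removed by fix #1: a final status dragged
    # progress up to 100%, so is_complete() returned True and
    # cleanup_async_job_resources ran while NATS was still delivering.
    if status == "FAILURE" and progress < 1:
        progress = 1  # inflated from 0.005; no longer done on save
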
Co-Authored-By: Claude
---
 .../management/commands/update_stale_jobs.py |  8 +++---
 ami/jobs/models.py                           | 27 ++++++++++++++-----
 ami/jobs/tasks.py                            | 15 +++++++---
 ami/jobs/tests/test_jobs.py                  | 20 ++++++++++++++
 ami/jobs/tests/test_periodic_beat_tasks.py   |  6 ++---
 ami/jobs/tests/test_update_stale_jobs.py     |  8 +++---
 ami/jobs/views.py                            |  2 +-
 7 files changed, 62 insertions(+), 24 deletions(-)

diff --git a/ami/jobs/management/commands/update_stale_jobs.py b/ami/jobs/management/commands/update_stale_jobs.py
index 0f19933dd..b7620c04d 100644
--- a/ami/jobs/management/commands/update_stale_jobs.py
+++ b/ami/jobs/management/commands/update_stale_jobs.py
@@ -9,10 +9,10 @@ class Command(BaseCommand):
     def add_arguments(self, parser):
         parser.add_argument(
-            "--hours",
+            "--minutes",
             type=int,
-            default=Job.FAILED_CUTOFF_HOURS,
-            help="Number of hours to consider a job stale (default: %(default)s)",
+            default=Job.STALLED_JOBS_MAX_MINUTES,
+            help="Minutes since last update to consider a job stale (default: %(default)s)",
         )
         parser.add_argument(
             "--dry-run",
@@ -21,7 +21,7 @@ def add_arguments(self, parser):
         )
 
     def handle(self, *args, **options):
-        results = check_stale_jobs(hours=options["hours"], dry_run=options["dry_run"])
+        results = check_stale_jobs(minutes=options["minutes"], dry_run=options["dry_run"])
 
         if not results:
             self.stdout.write("No stale jobs found.")
diff --git a/ami/jobs/models.py b/ami/jobs/models.py
index 3cc5c8627..17549b3e9 100644
--- a/ami/jobs/models.py
+++ b/ami/jobs/models.py
@@ -816,8 +816,14 @@ def get_job_type_by_inferred_key(job: "Job") -> type[JobType] | None:
 class Job(BaseModel):
     """A job to be run by the scheduler"""
 
-    # Hide old failed jobs after 3 days
-    FAILED_CUTOFF_HOURS = 24 * 3
+    # UI/API: hide failed jobs older than this from listings (display filter only).
+    FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3
+    # Reaper: revoke jobs in :meth:`JobState.running_states` whose ``updated_at``
+    # is older than this. A healthy async_api job bumps ``updated_at`` on every
+    # Redis SREM-driven progress save, so this is effectively "no progress for
+    # N minutes". 10 is conservative; raise if legitimate long-running jobs get
+    # reaped.
+    STALLED_JOBS_MAX_MINUTES = 10
 
     name = models.CharField(max_length=255)
     queue = models.CharField(max_length=255, default="default")
@@ -1037,14 +1043,21 @@ def update_progress(self, save=True):
         else:
             for stage in self.progress.stages:
                 if stage.progress > 0 and stage.status == JobState.CREATED:
-                    # Update any stages that have started but are still in the CREATED state
+                    # Promote stages that have started but are still in the CREATED state.
                     stage.status = JobState.STARTED
-                elif stage.status in JobState.final_states() and stage.progress < 1:
-                    # Update any stages that are complete but have a progress less than 1
-                    stage.progress = 1
                 elif stage.progress == 1 and stage.status not in JobState.final_states():
-                    # Update any stages that are complete but are still in the STARTED state
+                    # Promote stages that have measured-100% progress but are still STARTED.
                     stage.status = JobState.SUCCESS
+                # Note: do NOT coerce ``stage.progress = 1`` when status is in a
+                # final state but progress < 1. That branch used to fire when
+                # ``_update_job_progress`` wrote ``status=FAILURE`` at partial
+                # progress (e.g. failed/total temporarily crossed FAILURE_THRESHOLD
+                # early in an async_api job). The save-time coercion silently
+                # bumped progress to 100%, which made ``is_complete()`` return True
+                # and triggered premature ``cleanup_async_job_resources`` while
+                # NATS was still delivering results. Progress is a measurement;
+                # leave it alone. Stuck jobs are reaped by ``check_stale_jobs``
+                # via ``Job.STALLED_JOBS_MAX_MINUTES``.
 
             total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)
             self.progress.summary.progress = total_progress
diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index de2700593..508f5cb72 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -377,10 +377,15 @@ def _update_job_progress(
     cleanup_async_job_if_needed(job)
 
 
-def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
+def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[dict]:
     """
     Find jobs stuck in a running state past the cutoff and revoke them.
 
+    Cutoff is measured against ``Job.updated_at`` (auto-bumped on every save),
+    so a job that's actively making progress — including async_api jobs that
+    bump on each Redis SREM-driven progress save — is never reaped while
+    healthy. Default cutoff is :attr:`Job.STALLED_JOBS_MAX_MINUTES`.
+
     For each stale job, checks Celery for a terminal task status. REVOKED is
     always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted
     when job.progress.is_complete() — NATS workers may still be delivering
@@ -397,10 +402,10 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[di
     from ami.jobs.models import Job, JobDispatchMode, JobState
 
-    if hours is None:
-        hours = Job.FAILED_CUTOFF_HOURS
+    if minutes is None:
+        minutes = Job.STALLED_JOBS_MAX_MINUTES
 
-    cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
+    cutoff = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
     stale_pks = list(
         Job.objects.filter(
             status__in=JobState.running_states(),
@@ -492,7 +497,7 @@ class JobsHealthCheckResult:
 
 
 def _run_stale_jobs_check() -> IntegrityCheckResult:
-    """Reconcile jobs stuck in running states past FAILED_CUTOFF_HOURS."""
+    """Reconcile jobs stuck in running states past Job.STALLED_JOBS_MAX_MINUTES."""
     results = check_stale_jobs()
     updated = sum(1 for r in results if r["action"] == "updated")
     revoked = sum(1 for r in results if r["action"] == "revoked")
diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py
index 7241b0a57..ae302d193 100644
--- a/ami/jobs/tests/test_jobs.py
+++ b/ami/jobs/tests/test_jobs.py
@@ -36,6 +36,26 @@ def test_create_job(self):
         self.assertEqual(job.progress.summary.progress, 0)
         self.assertEqual(job.progress.stages, [])
 
+    def test_save_does_not_inflate_failed_stage_progress(self):
+        """A stage marked FAILURE at partial progress must keep its measured value.
+
+        Regression for the premature ``cleanup_async_job_resources`` path: when a
+        worker writes ``status=FAILURE`` at partial progress (e.g. failed/total
+        crossed FAILURE_THRESHOLD on an early result), ``Job.update_progress``
+        used to coerce ``stage.progress = 1`` on the next save. That made
+        ``is_complete()`` return True and triggered cleanup while async results
+        were still in flight. Progress is a measurement; leave it alone.
+        """
+        job = Job.objects.create(project=self.project, name="Test job - partial failure")
+        job.progress.add_stage("results")
+        job.progress.update_stage("results", progress=0.3, status=JobState.FAILURE)
+        job.save()
+
+        results_stage = job.progress.get_stage("results")
+        self.assertEqual(results_stage.progress, 0.3)
+        self.assertEqual(results_stage.status, JobState.FAILURE)
+        self.assertFalse(job.progress.is_complete())
+
     def test_create_job_with_delay(self):
         job = Job.objects.create(
             job_type_key=MLJob.key,
diff --git a/ami/jobs/tests/test_periodic_beat_tasks.py b/ami/jobs/tests/test_periodic_beat_tasks.py
index eaf2f3368..383b3af81 100644
--- a/ami/jobs/tests/test_periodic_beat_tasks.py
+++ b/ami/jobs/tests/test_periodic_beat_tasks.py
@@ -19,9 +19,9 @@ class JobsHealthCheckTest(TestCase):
     def setUp(self):
         self.project = Project.objects.create(name="Beat schedule test project")
 
-    def _create_stale_job(self, status=JobState.STARTED, hours_ago=100):
+    def _create_stale_job(self, status=JobState.STARTED, minutes_ago=120):
         job = Job.objects.create(project=self.project, name="stale", status=status)
-        Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(hours=hours_ago))
+        Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(minutes=minutes_ago))
         job.refresh_from_db()
         return job
 
@@ -55,7 +55,7 @@ def test_reports_both_sub_check_results(self, mock_manager_cls, _mock_cleanup):
 
     def test_idle_deployment_returns_all_zeros(self, mock_manager_cls, _mock_cleanup):
         # No stale jobs, no running async jobs.
-        self._create_stale_job(hours_ago=1)  # recent — not stale
+        self._create_stale_job(minutes_ago=5)  # recent — not stale
         self._stub_manager(mock_manager_cls)
 
         self.assertEqual(
diff --git a/ami/jobs/tests/test_update_stale_jobs.py b/ami/jobs/tests/test_update_stale_jobs.py
index 4a1e44427..0f0a2a215 100644
--- a/ami/jobs/tests/test_update_stale_jobs.py
+++ b/ami/jobs/tests/test_update_stale_jobs.py
@@ -13,14 +13,14 @@ class CheckStaleJobsTest(TestCase):
     def setUp(self):
         self.project = Project.objects.create(name="Stale jobs test project")
 
-    def _create_job(self, status=JobState.STARTED, hours_ago=100, task_id=None):
+    def _create_job(self, status=JobState.STARTED, minutes_ago=120, task_id=None):
         job = Job.objects.create(
             project=self.project,
             name=f"Test job {status}",
             status=status,
         )
         Job.objects.filter(pk=job.pk).update(
-            updated_at=timezone.now() - timedelta(hours=hours_ago),
+            updated_at=timezone.now() - timedelta(minutes=minutes_ago),
         )
         if task_id is not None:
             Job.objects.filter(pk=job.pk).update(task_id=task_id)
@@ -114,8 +114,8 @@ def test_revokes_when_celery_lookup_fails(self, mock_async_result, mock_cleanup)
     @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
     def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
         """Recent jobs and jobs in final states are not touched."""
-        self._create_job(status=JobState.STARTED, hours_ago=1)  # recent
-        self._create_job(status=JobState.SUCCESS, hours_ago=200)  # final state
+        self._create_job(status=JobState.STARTED, minutes_ago=5)  # recent
+        self._create_job(status=JobState.SUCCESS, minutes_ago=300)  # final state
 
         results = check_stale_jobs()
 
diff --git a/ami/jobs/views.py b/ami/jobs/views.py
index 625fb8b47..4caf37e10 100644
--- a/ami/jobs/views.py
+++ b/ami/jobs/views.py
@@ -223,7 +223,7 @@ def get_queryset(self) -> QuerySet:
         if project:
             jobs = jobs.filter(project=project)
         cutoff_hours = IntegerField(required=False, min_value=0).clean(
-            self.request.query_params.get("cutoff_hours", Job.FAILED_CUTOFF_HOURS)
+            self.request.query_params.get("cutoff_hours", Job.FAILED_JOBS_DISPLAY_MAX_HOURS)
         )
         # Filter out completed jobs that have not been updated in the last X hours
         cutoff_datetime = timezone.now() - timezone.timedelta(hours=cutoff_hours)

From 6218d380cb5b1ced8dcc8be5137a4e0030987dd2 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Wed, 15 Apr 2026 12:02:59 -0700
Subject: [PATCH 2/2] feat(jobs): log per-job diagnostic on stalled-job revoke

When the reaper revokes a job, log a single WARN line capturing the
state needed to triage "why was this stalled?" without grepping back
through prior tick logs:

- minutes since last update vs threshold
- previous_status, dispatch_mode, celery_state
- per-stage progress + status summary
- pointer to running_job_snapshots for prior NATS consumer state

Pairs with PR #1227's per-tick consumer snapshots: every 15 min, each
running async_api job gets a NATS state snapshot; this new line tells
operators which of those snapshots is the last one before revocation.

Co-Authored-By: Claude
---
 ami/jobs/tasks.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index 508f5cb72..64890f66f 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -454,6 +454,23 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[
             job.finished_at = datetime.datetime.now()
             job.save()
         else:
+            # Per-job diagnostic: surface enough state at revoke time that an
+            # operator can answer "why was this stalled?" without grepping
+            # back through tick logs. Pairs with the per-tick NATS consumer
+            # snapshots logged by ``_run_running_job_snapshot_check``.
+            stalled_minutes = (datetime.datetime.now() - job.updated_at).total_seconds() / 60
+            stages_summary = (
+                ", ".join(f"{s.key}={s.progress*100:.1f}% {s.status}" for s in job.progress.stages)
+                or "(no stages)"
+            )
+            job.logger.warning(
+                f"Reaping stalled job: no progress for {stalled_minutes:.1f} min "
+                f"(threshold {minutes} min). previous_status={previous_status}, "
+                f"celery_state={celery_state}, dispatch_mode={job.dispatch_mode}, "
+                f"stages: {stages_summary}. "
+                f"For NATS consumer state at the last tick, see prior "
+                f"running_job_snapshots logs for this job."
+            )
             if not dry_run:
                 job.update_status(JobState.REVOKED, save=False)
                 job.finished_at = datetime.datetime.now()
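
A quick way to exercise the new reaper threshold after applying this
series (``check_stale_jobs`` and its keyword arguments are from patch
1/2; running it from a Django shell, and the 30-minute value, are
illustrative assumptions, not part of the series):

    # Dry run: report what the reaper would do without revoking anything.
    from ami.jobs.tasks import check_stale_jobs

    results = check_stale_jobs(minutes=30, dry_run=True)
    for result in results:
        # Each entry is a dict; "action" is e.g. "updated" or "revoked".
        print(result["action"], result)

The same dry run is available via the management command from patch 1/2:
``python manage.py update_stale_jobs --minutes 30 --dry-run``.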