Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ami/jobs/management/commands/update_stale_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
"--hours",
"--minutes",
type=int,
default=Job.FAILED_CUTOFF_HOURS,
help="Number of hours to consider a job stale (default: %(default)s)",
default=Job.STALLED_JOBS_MAX_MINUTES,
help="Minutes since last update to consider a job stale (default: %(default)s)",
)
Comment thread
mihow marked this conversation as resolved.
parser.add_argument(
"--dry-run",
Expand All @@ -21,7 +21,7 @@ def add_arguments(self, parser):
)

def handle(self, *args, **options):
results = check_stale_jobs(hours=options["hours"], dry_run=options["dry_run"])
results = check_stale_jobs(minutes=options["minutes"], dry_run=options["dry_run"])

if not results:
self.stdout.write("No stale jobs found.")
Expand Down
27 changes: 20 additions & 7 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,14 @@ def get_job_type_by_inferred_key(job: "Job") -> type[JobType] | None:
class Job(BaseModel):
"""A job to be run by the scheduler"""

# Hide old failed jobs after 3 days
FAILED_CUTOFF_HOURS = 24 * 3
# UI/API: hide failed jobs older than this from listings (display filter only).
FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3
# Reaper: revoke jobs in :meth:`JobState.running_states` whose ``updated_at``
# is older than this. A healthy async_api job bumps ``updated_at`` on every
# Redis SREM-driven progress save, so this is effectively "no progress for
# N minutes". 10 is conservative; raise if legitimate long-running jobs get
# reaped.
STALLED_JOBS_MAX_MINUTES = 10

name = models.CharField(max_length=255)
queue = models.CharField(max_length=255, default="default")
Expand Down Expand Up @@ -1037,14 +1043,21 @@ def update_progress(self, save=True):
else:
for stage in self.progress.stages:
if stage.progress > 0 and stage.status == JobState.CREATED:
# Update any stages that have started but are still in the CREATED state
# Promote stages that have started but are still in the CREATED state.
stage.status = JobState.STARTED
elif stage.status in JobState.final_states() and stage.progress < 1:
# Update any stages that are complete but have a progress less than 1
stage.progress = 1
elif stage.progress == 1 and stage.status not in JobState.final_states():
# Update any stages that are complete but are still in the STARTED state
# Promote stages whose measured progress has reached 100% but whose status is not yet a final state.
stage.status = JobState.SUCCESS
# Note: do NOT coerce ``stage.progress = 1`` when status is in a
# final state but progress < 1. That branch used to fire when
# ``_update_job_progress`` wrote ``status=FAILURE`` at partial
# progress (e.g. failed/total temporarily crossed FAILURE_THRESHOLD
# early in an async_api job). The save-time coercion silently
# bumped progress to 100%, which made ``is_complete()`` return True
# and triggered premature ``cleanup_async_job_resources`` while
# NATS was still delivering results. Progress is a measurement;
# leave it alone. Stuck jobs are reaped by ``check_stale_jobs``
# via ``Job.STALLED_JOBS_MAX_MINUTES``.
total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)

self.progress.summary.progress = total_progress
Expand Down
15 changes: 10 additions & 5 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,15 @@ def _update_job_progress(
cleanup_async_job_if_needed(job)


def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[dict]:
"""
Find jobs stuck in a running state past the cutoff and revoke them.

Cutoff is measured against ``Job.updated_at`` (auto-bumped on every save),
so a job that's actively making progress — including async_api jobs that
bump on each Redis SREM-driven progress save — is never reaped while
healthy. Default cutoff is :attr:`Job.STALLED_JOBS_MAX_MINUTES`.

For each stale job, checks Celery for a terminal task status. REVOKED is
always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted
when job.progress.is_complete() — NATS workers may still be delivering
Expand All @@ -397,10 +402,10 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di

from ami.jobs.models import Job, JobDispatchMode, JobState

if hours is None:
hours = Job.FAILED_CUTOFF_HOURS
if minutes is None:
minutes = Job.STALLED_JOBS_MAX_MINUTES

cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
cutoff = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
stale_pks = list(
Job.objects.filter(
status__in=JobState.running_states(),
Expand Down Expand Up @@ -492,7 +497,7 @@ class JobsHealthCheckResult:


def _run_stale_jobs_check() -> IntegrityCheckResult:
"""Reconcile jobs stuck in running states past FAILED_CUTOFF_HOURS."""
"""Reconcile jobs stuck in running states past Job.STALLED_JOBS_MAX_MINUTES."""
results = check_stale_jobs()
updated = sum(1 for r in results if r["action"] == "updated")
revoked = sum(1 for r in results if r["action"] == "revoked")
Expand Down
20 changes: 20 additions & 0 deletions ami/jobs/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def test_create_job(self):
self.assertEqual(job.progress.summary.progress, 0)
self.assertEqual(job.progress.stages, [])

def test_save_does_not_inflate_failed_stage_progress(self):
    """Saving a job must preserve the measured progress of a stage that failed part-way.

    Guards against a regression in which ``Job.update_progress`` forced
    ``stage.progress = 1`` on save whenever a stage was already in a final
    state. A stage written as ``status=FAILURE`` at partial progress (for
    example when failed/total crossed FAILURE_THRESHOLD on an early result)
    was therefore reported as 100% done, ``is_complete()`` returned True, and
    ``cleanup_async_job_resources`` ran while async results were still being
    delivered. Progress is a measurement and must survive the save untouched.
    """
    stage_key = "results"
    partial_progress = 0.3

    # Arrange: a job with a single stage that has failed at 30% progress.
    job = Job.objects.create(project=self.project, name="Test job - partial failure")
    job.progress.add_stage(stage_key)
    job.progress.update_stage(stage_key, progress=partial_progress, status=JobState.FAILURE)

    # Act: saving is the step that used to coerce progress to 1.
    job.save()

    # Assert: the stage keeps its measured value and status, and the job is
    # not reported as complete.
    saved_stage = job.progress.get_stage(stage_key)
    self.assertEqual(saved_stage.progress, partial_progress)
    self.assertEqual(saved_stage.status, JobState.FAILURE)
    self.assertFalse(job.progress.is_complete())

def test_create_job_with_delay(self):
job = Job.objects.create(
job_type_key=MLJob.key,
Expand Down
6 changes: 3 additions & 3 deletions ami/jobs/tests/test_periodic_beat_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ class JobsHealthCheckTest(TestCase):
def setUp(self):
self.project = Project.objects.create(name="Beat schedule test project")

def _create_stale_job(self, status=JobState.STARTED, hours_ago=100):
def _create_stale_job(self, status=JobState.STARTED, minutes_ago=120):
job = Job.objects.create(project=self.project, name="stale", status=status)
Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(hours=hours_ago))
Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(minutes=minutes_ago))
job.refresh_from_db()
return job

Expand Down Expand Up @@ -55,7 +55,7 @@ def test_reports_both_sub_check_results(self, mock_manager_cls, _mock_cleanup):

def test_idle_deployment_returns_all_zeros(self, mock_manager_cls, _mock_cleanup):
# No stale jobs, no running async jobs.
self._create_stale_job(hours_ago=1) # recent — not stale
self._create_stale_job(minutes_ago=5) # recent — not stale
self._stub_manager(mock_manager_cls)

self.assertEqual(
Expand Down
8 changes: 4 additions & 4 deletions ami/jobs/tests/test_update_stale_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class CheckStaleJobsTest(TestCase):
def setUp(self):
self.project = Project.objects.create(name="Stale jobs test project")

def _create_job(self, status=JobState.STARTED, hours_ago=100, task_id=None):
def _create_job(self, status=JobState.STARTED, minutes_ago=120, task_id=None):
job = Job.objects.create(
project=self.project,
name=f"Test job {status}",
status=status,
)
Job.objects.filter(pk=job.pk).update(
updated_at=timezone.now() - timedelta(hours=hours_ago),
updated_at=timezone.now() - timedelta(minutes=minutes_ago),
)
if task_id is not None:
Job.objects.filter(pk=job.pk).update(task_id=task_id)
Expand Down Expand Up @@ -114,8 +114,8 @@ def test_revokes_when_celery_lookup_fails(self, mock_async_result, mock_cleanup)
@patch("ami.jobs.tasks.cleanup_async_job_if_needed")
def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
"""Recent jobs and jobs in final states are not touched."""
self._create_job(status=JobState.STARTED, hours_ago=1) # recent
self._create_job(status=JobState.SUCCESS, hours_ago=200) # final state
self._create_job(status=JobState.STARTED, minutes_ago=5) # recent
self._create_job(status=JobState.SUCCESS, minutes_ago=300) # final state

results = check_stale_jobs()

Expand Down
2 changes: 1 addition & 1 deletion ami/jobs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def get_queryset(self) -> QuerySet:
if project:
jobs = jobs.filter(project=project)
cutoff_hours = IntegerField(required=False, min_value=0).clean(
self.request.query_params.get("cutoff_hours", Job.FAILED_CUTOFF_HOURS)
self.request.query_params.get("cutoff_hours", Job.FAILED_JOBS_DISPLAY_MAX_HOURS)
)
# Filter out completed jobs that have not been updated in the last X hours
cutoff_datetime = timezone.now() - timezone.timedelta(hours=cutoff_hours)
Expand Down
Loading