From 50677444a57e08dd18576ce70669eee3e62d667f Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 20 Apr 2026 19:05:40 -0700 Subject: [PATCH 01/15] fix(jobs): drop select_for_update in _update_job_progress to break row-lock contention Under concurrent async_api result processing, every ML result task queued a contending exclusive lock on the jobs_job row via `select_for_update()`, stacking behind gunicorn view threads also holding the row under ATOMIC_REQUESTS (from per-iter job.logger.info writes in /result). Observed in a validation environment under 5 concurrent async_api jobs (2x10 + 2x100 + 1x500 images, one pipeline): py-spy stacks of celeryworker_ml showed 7 of 8 ForkPoolWorker threads stuck in `psycopg wait` -> `_update_job_progress (tasks.py:493)`. pg_stat_activity consistently showed 17-20 backends "idle in transaction" holding `SELECT ... FOR UPDATE OF jobs_job` for 0.5-5+ seconds at a time. The `max()` guard below the dropped lock still prevents progress regression: two racing workers that both read `progress=0.4` and both compute a new value still write monotonically via `max(existing, new)`. The trade-off is that accumulated counts (detections, classifications, captures) can drift by one batch under a read-modify-write race -- cosmetic only, since the underlying Detection/Classification rows are written authoritatively by `save_results` before this function runs. Refs: RolnickLab/antenna#1256 (log refactor proposal; this PR targets the parallel progress-update path, not the log-write path). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ami/jobs/tasks.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 629c95828..ef3505728 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -489,8 +489,19 @@ def _update_job_progress( ) -> None: from ami.jobs.models import Job, JobState # avoid circular import + # NOTE: Previously this used `select_for_update()` inside `transaction.atomic()` + # to serialize concurrent progress updates for the same job. Under concurrent + # async_api result processing that serialization became a bottleneck: every + # ML result task queued a contending exclusive lock on the `jobs_job` row, + # stacking behind gunicorn view threads also holding the row under + # ATOMIC_REQUESTS. The `max()` guard below still prevents progress regression + # between concurrent workers; the trade-off is that accumulated counts + # (detections/classifications/captures) can drift by one batch under race — + # cosmetic only, since the underlying `Detection`/`Classification` rows are + # written authoritatively by `save_results` before this function runs. + # See docs/claude/planning/jobs-row-lock-remediation.md and issue #1256. with transaction.atomic(): - job = Job.objects.select_for_update().get(pk=job_id) + job = Job.objects.get(pk=job_id) # For results stage, accumulate detections/classifications/captures counts if stage == "results": From 173d1d4ab6dc99d793a82ab9dbe3679a88dbd95e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 20 Apr 2026 20:04:35 -0700 Subject: [PATCH 02/15] fix(jobs): short-circuit JobLogHandler.emit to break log-UPDATE contention Under concurrent async_api load the per-record UPDATE on jobs_job.logs becomes the dominant bottleneck after the select_for_update fix in the previous commit. Observed on demo: 5 concurrent jobs produced a 56-deep pg_blocking_pids chain, all waiting on UPDATE jobs_job SET logs = ... 
from either /result (view, under ATOMIC_REQUESTS) or _update_job_progress (ml worker). Short-circuit emit to the module-level logger only (container stdout). The per-job UI log feed goes blank while this hotfix is active; revisit once PR #1259 lands the append-only JobLog child table. --- ami/jobs/models.py | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 8acb718cd..bd50f27f7 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -333,32 +333,19 @@ def __init__(self, job: "Job", *args, **kwargs): super().__init__(*args, **kwargs) def emit(self, record: logging.LogRecord): - # Log to the current app logger + # Log to the current app logger (container stdout). logger.log(record.levelno, self.format(record)) - # Write to the logs field on the job instance. - # Refresh from DB first to reduce the window for concurrent overwrites — each - # worker holds its own stale in-memory copy of `logs`, so without a refresh the - # last writer always wins and earlier entries are silently dropped. 
- # @TODO consider saving logs to the database periodically rather than on every log - try: - self.job.refresh_from_db(fields=["logs"]) - timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - msg = f"[{timestamp}] {record.levelname} {self.format(record)}" - if msg not in self.job.logs.stdout: - self.job.logs.stdout.insert(0, msg) - - # Write a simpler copy of any errors to the errors field - if record.levelno >= logging.ERROR: - if record.message not in self.job.logs.stderr: - self.job.logs.stderr.insert(0, record.message) - - if len(self.job.logs.stdout) > self.max_log_length: - self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length] - - self.job.save(update_fields=["logs"], update_progress=False) - except Exception as e: - logger.error(f"Failed to save logs for job #{self.job.pk}: {e}") + # HOTFIX 2026-04-20: Persisting every log line to ``jobs_job.logs`` is + # the dominant remaining source of row-lock contention under concurrent + # async_api load. Every call triggered ``UPDATE jobs_job SET logs = ...`` + # on the shared job row; inside ``ATOMIC_REQUESTS`` a single batched + # ``/result`` POST stacked N such UPDATEs in one tx, blocking every ML + # worker on the same row for the duration of the request. Short-circuit + # here until PR #1259 lands an append-only ``JobLog`` child table. + # Container stdout above still captures every line; only the per-job + # UI log view goes blank while this hotfix is active. + return @dataclass From 3e200419ee861ff0f36a083a1e9347e39b838489 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 20 Apr 2026 20:42:38 -0700 Subject: [PATCH 03/15] feat(jobs): add JOB_LOG_PERSIST_ENABLED flag + local repro docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the unconditional noop of JobLogHandler.emit from the previous commit with a feature flag (default True, preserves existing behavior). 
Deployments hitting row-lock contention on jobs_job can set JOB_LOG_PERSIST_ENABLED=false to short-circuit the per-record UPDATE until PR #1259's append-only JobLog table is in place. Validated locally (dev compose, WEB_CONCURRENCY=1, 8 ml fork workers, batched POSTs at 50 results × 10 concurrent): flag=true: blocker_chain=37, 0/10 POSTs complete (120s timeout) flag=false: blocker_chain=1 (trivial), 20/20 POSTs complete, p95=5.5s Also add: - scripts/load_test_result_endpoint.py — standalone batched-POST driver. A single-result-per-POST curl loop does NOT reproduce the contention because it skips the per-iter job.logger.info call inside one ATOMIC_REQUESTS tx. Batching N>1 results per POST is load-bearing. - docs/claude/debugging/row-lock-contention-reproduction.md — full runbook: pathology, prereqs, steps, pg_stat_activity queries, before/after signal table, flag usage. --- ami/jobs/models.py | 46 ++++- config/settings/base.py | 9 + .../row-lock-contention-reproduction.md | 172 ++++++++++++++++++ scripts/load_test_result_endpoint.py | 91 +++++++++ 4 files changed, 308 insertions(+), 10 deletions(-) create mode 100644 docs/claude/debugging/row-lock-contention-reproduction.md create mode 100644 scripts/load_test_result_endpoint.py diff --git a/ami/jobs/models.py b/ami/jobs/models.py index bd50f27f7..4111034f8 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -8,6 +8,7 @@ import pydantic from celery import uuid from celery.result import AsyncResult +from django.conf import settings from django.db import models, transaction from django.utils.text import slugify from django_pydantic_field import SchemaField @@ -336,16 +337,41 @@ def emit(self, record: logging.LogRecord): # Log to the current app logger (container stdout). logger.log(record.levelno, self.format(record)) - # HOTFIX 2026-04-20: Persisting every log line to ``jobs_job.logs`` is - # the dominant remaining source of row-lock contention under concurrent - # async_api load. 
Every call triggered ``UPDATE jobs_job SET logs = ...`` - # on the shared job row; inside ``ATOMIC_REQUESTS`` a single batched - # ``/result`` POST stacked N such UPDATEs in one tx, blocking every ML - # worker on the same row for the duration of the request. Short-circuit - # here until PR #1259 lands an append-only ``JobLog`` child table. - # Container stdout above still captures every line; only the per-job - # UI log view goes blank while this hotfix is active. - return + # Gated by ``JOB_LOG_PERSIST_ENABLED`` (default True). Persisting every + # log line to ``jobs_job.logs`` becomes a row-lock contention point + # under concurrent async_api load — each call triggers + # ``UPDATE jobs_job SET logs = ...`` on the shared job row, and inside + # ``ATOMIC_REQUESTS`` a single batched ``/result`` POST stacks N such + # UPDATEs in one tx, blocking every ML worker on the same row for the + # duration of the request. Deployments hitting that pattern can set the + # flag to False to short-circuit here until PR #1259 lands an + # append-only ``JobLog`` child table. See issue #1256. + if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True): + return + + # Write to the logs field on the job instance. + # Refresh from DB first to reduce the window for concurrent overwrites — each + # worker holds its own stale in-memory copy of `logs`, so without a refresh the + # last writer always wins and earlier entries are silently dropped. 
+ # @TODO consider saving logs to the database periodically rather than on every log + try: + self.job.refresh_from_db(fields=["logs"]) + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + msg = f"[{timestamp}] {record.levelname} {self.format(record)}" + if msg not in self.job.logs.stdout: + self.job.logs.stdout.insert(0, msg) + + # Write a simpler copy of any errors to the errors field + if record.levelno >= logging.ERROR: + if record.message not in self.job.logs.stderr: + self.job.logs.stderr.insert(0, record.message) + + if len(self.job.logs.stdout) > self.max_log_length: + self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length] + + self.job.save(update_fields=["logs"], update_progress=False) + except Exception as e: + logger.error(f"Failed to save logs for job #{self.job.pk}: {e}") @dataclass diff --git a/config/settings/base.py b/config/settings/base.py index 3e13275a8..5f50a3495 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -568,3 +568,12 @@ def _celery_result_backend_url(redis_url): # Default taxa filters DEFAULT_INCLUDE_TAXA = env.list("DEFAULT_INCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] DEFAULT_EXCLUDE_TAXA = env.list("DEFAULT_EXCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] + +# When True, ``JobLogHandler.emit`` persists each log line to ``jobs_job.logs`` +# (JSONB column) so the per-job log feed in the UI stays populated. When False, +# log lines go to the container stdout logger only — used as an escape hatch +# under concurrent async_api load where the per-record UPDATE on ``jobs_job.logs`` +# becomes a row-lock contention point (see issue #1256, PR #1261). Default True +# preserves existing behavior; deployments seeing contention can set to False +# until the append-only ``JobLog`` child table (PR #1259) is in place. 
+JOB_LOG_PERSIST_ENABLED = env.bool("JOB_LOG_PERSIST_ENABLED", default=True) # type: ignore[no-untyped-call] diff --git a/docs/claude/debugging/row-lock-contention-reproduction.md b/docs/claude/debugging/row-lock-contention-reproduction.md new file mode 100644 index 000000000..1ca5e23e7 --- /dev/null +++ b/docs/claude/debugging/row-lock-contention-reproduction.md @@ -0,0 +1,172 @@ +# Reproducing the `jobs_job` row-lock contention locally + +Runbook for reproducing, on a local dev stack, the row-lock contention that +affects concurrent `async_api` ML jobs. Context: issue #1256, PR #1261, and +PR #1259 (complementary `JobLog` table refactor). + +**Why this matters.** Naive repro attempts with a `curl` loop that fires one +result per POST (`{"results": [{...}]}`) do NOT trigger the pathology. They +only exercise the worker-side `select_for_update` path, which is fixed once +PR #1261 lands. The dominant remaining bottleneck is per-result logging +inside `ATOMIC_REQUESTS` — to see it locally you need **batched POSTs** that +match the real ADC shape (`AMI_LOCALIZATION_BATCH_SIZE=4`, +`AMI_CLASSIFICATION_BATCH_SIZE=150`). + +## The pathology + +Two mutating paths UPDATE the `jobs_job` row for every log line written via +`job.logger.info(...)`: + +1. **View path** (`ami/jobs/views.py` — `result` and `tasks` actions): the + per-iteration `job.logger.info("Queued pipeline result: ...")` inside the + POST body loop runs under `ATOMIC_REQUESTS`. A single batched POST with N + results therefore stacks N UPDATEs on `jobs_job.logs` inside one tx that + doesn't commit until the view returns. Every other writer on the same row + (other worker tasks, other POST handlers) blocks behind it. +2. **Worker path** (`ami/jobs/tasks.py` — `_update_job_progress`): each + `process_nats_pipeline_result` celery task calls `_update_job_progress`, + which emits its own log lines, each triggering another UPDATE on the same + row. 
+ +The smoking gun in `pg_stat_activity`: + +- Root blocker: a backend `state = idle in transaction`, last query + `UPDATE "jobs_job" SET "logs" = ...`, held for many seconds. +- Waiters: dozens of backends with `wait_event_type = Lock`, + `wait_event = tuple` or `transactionid`, all on the same row. + +## Prereqs + +- Local antenna stack up via the standard dev compose + (`docker compose up -d`) with postgres, redis, rabbitmq, nats, django, + celeryworker, and celeryworker_ml healthy. +- A job in a running state (any `async_api` job with `status = STARTED` will + do — the view accepts results regardless of whether real tasks exist). +- An auth token for a user with permission to POST to + `/api/v2/jobs/{id}/result/`. +- Python 3.10+ on the host (the load-test script uses stdlib only). + +## Scripts + +- `scripts/load_test_result_endpoint.py` — fires concurrent batched POSTs. +- `ami/jobs/management/commands/chaos_monkey.py` — adjacent tooling for + `async_api` chaos scenarios; covered in `chaos-scenarios.md`. + +## Step-by-step + +### 1. Grab an auth token and a target job + +From a shell on the host: + +```bash +docker compose exec -T django python manage.py shell <<'PY' +from rest_framework.authtoken.models import Token +from ami.users.models import User +from ami.jobs.models import Job + +u = User.objects.filter(is_staff=True).first() +t, _ = Token.objects.get_or_create(user=u) +print("TOKEN=", t.key) + +j = Job.objects.filter(status="STARTED", dispatch_mode="async_api").first() +if j is None: + # Any running job works — create one if there isn't one. + # Adjust project/collection/pipeline PKs to your local data. + print("No running async_api job found; create one via the UI or shell.") +else: + print("JOB_ID=", j.pk) +PY +``` + +If no running job exists, create one with whatever project/collection/pipeline +are seeded locally. The view does not need real tasks queued behind the +job — it only needs the job row to accept result POSTs. + +### 2. 
Fire batched POSTs

```bash
python scripts/load_test_result_endpoint.py \
 --batch 50 --concurrency 10 --rounds 3
```

`--batch 50` puts 50 `PipelineResultsError` entries in each POST body. Any
batch size >1 will stack UPDATEs; 50 is a comfortable reproduction size
because it makes each POST's tx hold long enough for others to pile up.
`--concurrency 10` fires 10 parallel POSTs per wave. `--rounds 3` fires
three back-to-back waves.

### 3. Monitor Postgres during the test

In a second shell (substitute your compose project's postgres container
name, database user, and database name):

```bash
docker exec POSTGRES_CONTAINER psql -U DB_USER -d DB_NAME <<'SQL'
-- Scalars
SELECT count(*) AS idle_in_tx
  FROM pg_stat_activity
  WHERE datname = current_database() AND state = 'idle in transaction';

SELECT count(*) AS blocker_chain
  FROM pg_stat_activity blocked
  JOIN pg_stat_activity blocking
    ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
  WHERE blocked.wait_event_type = 'Lock'
    AND blocked.datname = current_database();

-- Top offenders
SELECT state, wait_event,
       substring(query, 1, 80),
       EXTRACT(EPOCH FROM now() - xact_start) AS xact_age_s
  FROM pg_stat_activity
  WHERE datname = current_database()
    AND state != 'idle'
    AND (state = 'idle in transaction' OR wait_event_type = 'Lock')
  ORDER BY xact_start NULLS LAST
  LIMIT 20;
SQL
```

### 4. Before/after signatures

Measured on a local dev stack with WEB_CONCURRENCY=1 (gunicorn default) and
8 celery ML-fork workers, batch=50, concurrency=10.
+ +| Signal | PR #1261 only (`JOB_LOG_PERSIST_ENABLED=true`) | PR #1261 + flag off (`JOB_LOG_PERSIST_ENABLED=false`) | +|---|---|---| +| `blocker_chain` count | 30+ | 0–1 (transient) | +| `idle_in_tx` count | 8–10 | 0 | +| Root-blocker query | `UPDATE jobs_job SET logs = ...` held 2–60s | transient `SELECT`s only | +| POST success (10 concurrent × 50-result batch, 120s timeout) | 0/10 (all timeout) | 10/10 | +| p95 POST latency | 120s+ | ~5s | + +## The feature flag + +Setting `JOB_LOG_PERSIST_ENABLED=false` (env var on the Django container) +causes `JobLogHandler.emit` to write only to the container stdout logger and +skip the per-record UPDATE on `jobs_job.logs`. The per-job UI log feed +stops receiving new entries while the flag is off; container stdout still +captures everything. + +Default is `true` — existing deployments keep their current behavior. The +flag is intended as a time-bounded escape hatch until the append-only +`JobLog` child table from PR #1259 is in place. + +To test the flag locally, append `JOB_LOG_PERSIST_ENABLED=false` to the +django env file used by your compose (e.g. `.envs/.local/.django`) and +recreate the django container (`docker compose up -d --force-recreate +django`). Verify from a shell: + +```bash +docker compose exec -T django python -c \ + "from django.conf import settings; print(settings.JOB_LOG_PERSIST_ENABLED)" +``` + +## Related + +- Issue #1256 — full contention analysis with path breakdown. +- PR #1261 — drops `select_for_update` in `_update_job_progress`; adds the + `JOB_LOG_PERSIST_ENABLED` flag; this runbook. +- PR #1259 — append-only `JobLog` child table. When merged, the flag can be + removed in favor of a cutover to the new write path. +- `docs/claude/debugging/chaos-scenarios.md` — adjacent chaos tooling for + NATS redelivery and retry-path validation. 
diff --git a/scripts/load_test_result_endpoint.py b/scripts/load_test_result_endpoint.py new file mode 100644 index 000000000..498721585 --- /dev/null +++ b/scripts/load_test_result_endpoint.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""Fire concurrent batched POSTs against ``POST /api/v2/jobs/{id}/result/``. + +Reproduces the row-lock contention pathology described in +``docs/claude/debugging/row-lock-contention-reproduction.md``. Each POST body +contains N fake ``PipelineResultsError`` entries so the per-result +``job.logger.info(...)`` call inside ``ATOMIC_REQUESTS`` stacks N UPDATEs on +``jobs_job.logs`` in a single view transaction — the shape real ADC workers +produce (``AMI_LOCALIZATION_BATCH_SIZE=4``, ``AMI_CLASSIFICATION_BATCH_SIZE=150``). + +A single-result-per-POST loop does NOT reproduce the contention. Batching +is load-bearing. + +Usage: + + python scripts/load_test_result_endpoint.py \\ + [--batch 50] [--concurrency 10] [--rounds 3] \\ + [--host http://localhost:8000] + +Dependencies: Python 3.10+, stdlib only. 
+""" +import argparse +import concurrent.futures +import json +import time +import urllib.error +import urllib.request +import uuid + + +def make_body(batch_size: int, prefix: str) -> bytes: + results = [ + { + "reply_subject": f"{prefix}.r{i}.{uuid.uuid4().hex[:8]}", + "result": {"error": "load-test", "image_id": f"img-{prefix}-{i}"}, + } + for i in range(batch_size) + ] + return json.dumps({"results": results}).encode() + + +def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, float]: + req = urllib.request.Request( + url, + data=body, + headers={"Authorization": f"Token {token}", "Content-Type": "application/json"}, + method="POST", + ) + t0 = time.time() + try: + with urllib.request.urlopen(req, timeout=120) as resp: + return (idx, resp.status, time.time() - t0) + except urllib.error.HTTPError as e: + return (idx, e.code, time.time() - t0) + except Exception: + return (idx, -1, time.time() - t0) + + +def main(): + ap = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + ap.add_argument("job_id", type=int, help="Target Job.pk (must be in a running state)") + ap.add_argument("token", help="DRF auth Token for a user with result-POST permission") + ap.add_argument("--batch", type=int, default=50, help="results per POST body (default 50)") + ap.add_argument("--concurrency", type=int, default=10, help="parallel POSTs per round (default 10)") + ap.add_argument("--rounds", type=int, default=3, help="how many waves to fire (default 3)") + ap.add_argument("--host", default="http://localhost:8000", help="API host (default localhost:8000)") + args = ap.parse_args() + + url = f"{args.host}/api/v2/jobs/{args.job_id}/result/" + print(f"url={url} batch={args.batch} concurrency={args.concurrency} rounds={args.rounds}") + + t_start = time.time() + for round_idx in range(args.rounds): + with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as ex: + futures = [ + ex.submit(fire_one, url, args.token, make_body(args.batch, 
f"r{round_idx}_{i}"), i) + for i in range(args.concurrency) + ] + results = [f.result() for f in concurrent.futures.as_completed(futures)] + good = sum(1 for _, s, _ in results if s == 200) + latencies = sorted([lat for _, _, lat in results]) + p50 = latencies[len(latencies) // 2] + p95 = latencies[int(len(latencies) * 0.95)] + print( + f"round {round_idx}: ok={good}/{args.concurrency} " + f"p50={p50:.2f}s p95={p95:.2f}s elapsed={time.time() - t_start:.1f}s" + ) + + +if __name__ == "__main__": + main() From 826e2bb20216a3adef0046bd214bc652e7af16db Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 00:23:59 -0700 Subject: [PATCH 04/15] perf(jobs): wrap process_nats_pipeline_result with cachalot_disabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under concurrent async_api load the ML result task is a pure write path: save_results inserts Detection/Classification rows and _update_job_progress UPDATEs jobs_job. Cachalot's post-write invalidation added ~2.5s of overhead per task in production measurement (issue #1256 Path 4), which throttles the ml_results queue drain when ADC POSTs arrive faster than Celery can clear them. Nothing inside the task benefits from the query cache — every read is of a freshly-written row — so skipping invalidation is strictly a throughput win. Decorator stack: @celery_app.task wraps the cachalot-wrapped function, so Celery sees the context manager enter/exit on each task run. This was prototyped on demo during the 2026-04-20 investigation but never committed (demo was reset to clean main before PR #1261 was opened). Restoring it here closes the last documented ML-path throughput regression. 
--- ami/jobs/tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index ef3505728..8b4435d86 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING from asgiref.sync import async_to_sync, sync_to_async +from cachalot.api import cachalot_disabled from celery.signals import task_failure, task_postrun, task_prerun from django.db import transaction from redis.exceptions import RedisError @@ -158,6 +159,15 @@ def _log_worker_availability(job) -> None: soft_time_limit=300, # 5 minutes time_limit=360, # 6 minutes ) +# Disable cachalot cache invalidation for this task. Each call writes +# Detection/Classification rows and UPDATEs jobs_job; under concurrent +# async_api load, cachalot's post-write invalidation added ~2.5s/task +# (measured on demo, issue #1256 Path 4). This is a pure write path — +# nothing inside benefits from the query cache — so skipping invalidation +# is strictly a throughput win. Celery task decorator stack order matters: +# @celery_app.task wraps the cachalot-wrapped function, so Celery sees the +# cachalot context manager enter/exit on every task execution. +@cachalot_disabled() def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_subject: str) -> None: """ Process a single pipeline result asynchronously. From 9d15813e2d58383dfcbfbc0616424fe5d84392e5 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 01:03:13 -0700 Subject: [PATCH 05/15] chore(jobs): address PR #1261 review feedback - tasks.py: remove reference to non-existent planning doc; point at issue #1256 and PR #1261 instead (Copilot finding). - load_test_result_endpoint.py: move token to $ANTENNA_TOKEN env var (or optional --token) to keep secrets out of argv/shell history; validate batch/concurrency/rounds > 0; require http(s) scheme on --host; capture error type+reason per-request instead of returning bare -1 (CodeRabbit findings). 
- docs: update the repro runbook to match the new CLI shape. Co-Authored-By: Claude --- ami/jobs/tasks.py | 2 +- .../row-lock-contention-reproduction.md | 3 +- scripts/load_test_result_endpoint.py | 66 +++++++++++++++---- 3 files changed, 55 insertions(+), 16 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 8b4435d86..3010543ea 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -509,7 +509,7 @@ def _update_job_progress( # (detections/classifications/captures) can drift by one batch under race — # cosmetic only, since the underlying `Detection`/`Classification` rows are # written authoritatively by `save_results` before this function runs. - # See docs/claude/planning/jobs-row-lock-remediation.md and issue #1256. + # See issue #1256 and PR #1261. with transaction.atomic(): job = Job.objects.get(pk=job_id) diff --git a/docs/claude/debugging/row-lock-contention-reproduction.md b/docs/claude/debugging/row-lock-contention-reproduction.md index 1ca5e23e7..6946549c2 100644 --- a/docs/claude/debugging/row-lock-contention-reproduction.md +++ b/docs/claude/debugging/row-lock-contention-reproduction.md @@ -85,7 +85,8 @@ job — it only needs the job row to accept result POSTs. ### 2. Fire batched POSTs ```bash -python scripts/load_test_result_endpoint.py \ +export ANTENNA_TOKEN= +python scripts/load_test_result_endpoint.py \ --batch 50 --concurrency 10 --rounds 3 ``` diff --git a/scripts/load_test_result_endpoint.py b/scripts/load_test_result_endpoint.py index 498721585..dc9a6bd88 100644 --- a/scripts/load_test_result_endpoint.py +++ b/scripts/load_test_result_endpoint.py @@ -13,7 +13,8 @@ Usage: - python scripts/load_test_result_endpoint.py \\ + export ANTENNA_TOKEN=... 
+ python scripts/load_test_result_endpoint.py \\ [--batch 50] [--concurrency 10] [--rounds 3] \\ [--host http://localhost:8000] @@ -22,8 +23,11 @@ import argparse import concurrent.futures import json +import os +import sys import time import urllib.error +import urllib.parse import urllib.request import uuid @@ -39,7 +43,7 @@ def make_body(batch_size: int, prefix: str) -> bytes: return json.dumps({"results": results}).encode() -def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, float]: +def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, float, str]: req = urllib.request.Request( url, data=body, @@ -49,24 +53,53 @@ def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, flo t0 = time.time() try: with urllib.request.urlopen(req, timeout=120) as resp: - return (idx, resp.status, time.time() - t0) + return (idx, resp.status, time.time() - t0, "") except urllib.error.HTTPError as e: - return (idx, e.code, time.time() - t0) - except Exception: - return (idx, -1, time.time() - t0) + return (idx, e.code, time.time() - t0, f"HTTPError: {e.reason}") + except urllib.error.URLError as e: + return (idx, -1, time.time() - t0, f"URLError: {e.reason}") + except Exception as e: + return (idx, -1, time.time() - t0, f"{type(e).__name__}: {e}") + + +def _positive_int(value: str) -> int: + try: + parsed = int(value) + except ValueError as e: + raise argparse.ArgumentTypeError(f"must be an integer, got {value!r}") from e + if parsed <= 0: + raise argparse.ArgumentTypeError(f"must be > 0, got {parsed}") + return parsed + + +def _http_url(value: str) -> str: + parsed = urllib.parse.urlparse(value) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise argparse.ArgumentTypeError(f"must be an http(s) URL, got {value!r}") + return value.rstrip("/") def main(): ap = argparse.ArgumentParser(description=__doc__.splitlines()[0]) - ap.add_argument("job_id", type=int, help="Target Job.pk (must be in a 
running state)") - ap.add_argument("token", help="DRF auth Token for a user with result-POST permission") - ap.add_argument("--batch", type=int, default=50, help="results per POST body (default 50)") - ap.add_argument("--concurrency", type=int, default=10, help="parallel POSTs per round (default 10)") - ap.add_argument("--rounds", type=int, default=3, help="how many waves to fire (default 3)") - ap.add_argument("--host", default="http://localhost:8000", help="API host (default localhost:8000)") + ap.add_argument("job_id", type=_positive_int, help="Target Job.pk (must be in a running state)") + ap.add_argument( + "--token", + default=os.environ.get("ANTENNA_TOKEN"), + help="DRF auth Token (default: $ANTENNA_TOKEN). Prefer the env var to avoid shell-history leakage.", + ) + ap.add_argument("--batch", type=_positive_int, default=50, help="results per POST body (default 50)") + ap.add_argument("--concurrency", type=_positive_int, default=10, help="parallel POSTs per round (default 10)") + ap.add_argument("--rounds", type=_positive_int, default=3, help="how many waves to fire (default 3)") + ap.add_argument( + "--host", type=_http_url, default="http://localhost:8000", help="API host (default localhost:8000)" + ) args = ap.parse_args() + if not args.token: + ap.error("no token provided: set ANTENNA_TOKEN env var or pass --token") + url = f"{args.host}/api/v2/jobs/{args.job_id}/result/" + # Never print the token — just echo the request shape. 
print(f"url={url} batch={args.batch} concurrency={args.concurrency} rounds={args.rounds}") t_start = time.time() @@ -77,14 +110,19 @@ def main(): for i in range(args.concurrency) ] results = [f.result() for f in concurrent.futures.as_completed(futures)] - good = sum(1 for _, s, _ in results if s == 200) - latencies = sorted([lat for _, _, lat in results]) + good = sum(1 for _, s, _, _ in results if s == 200) + latencies = sorted([lat for _, _, lat, _ in results]) p50 = latencies[len(latencies) // 2] p95 = latencies[int(len(latencies) * 0.95)] print( f"round {round_idx}: ok={good}/{args.concurrency} " f"p50={p50:.2f}s p95={p95:.2f}s elapsed={time.time() - t_start:.1f}s" ) + errors = [(idx, status, err) for idx, status, _, err in results if err] + for idx, status, err in errors[:5]: + print(f" err[{idx}] status={status} {err}", file=sys.stderr) + if len(errors) > 5: + print(f" ...{len(errors) - 5} more errors suppressed", file=sys.stderr) if __name__ == "__main__": From ef78fb50db6fb7a61f9c5da8a73bb41ca388c533 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 01:09:33 -0700 Subject: [PATCH 06/15] fix(jobs): narrow job.save() to update_fields in _update_job_progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously `job.save()` (no update_fields) wrote the entire row, which under concurrent async_api load meant a stale read-modify-write could clobber unrelated fields mutated by a concurrent worker — most notably `progress.errors` (appended by `_reconcile_lost_images`) and `logs` (appended by JobLogHandler.emit when persistence is enabled). Narrow the write to the fields this function actually mutates (progress, status, finished_at) plus updated_at so auto_now still fires. The underlying read-modify-write race remains — this only limits blast radius, not the race itself. The counter-drift and `max()` guard semantics discussed in the block comment above are unchanged. 
Address PR #1261 CodeRabbit review feedback on ami/jobs/tasks.py:514. Co-Authored-By: Claude --- ami/jobs/tasks.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 3010543ea..29c3fb521 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -578,7 +578,13 @@ def _update_job_progress( job.progress.summary.status = complete_state job.finished_at = datetime.datetime.now() # Use naive datetime in local time job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") - job.save() + # Narrow the write to the fields we actually mutated. Without this, a full + # save() would also overwrite `updated_at`, `logs`, and any other field on + # the instance fetched at the top of this block — so a concurrent worker's + # append to `progress.errors` (via `_reconcile_lost_images`) or log line + # (via JobLogHandler) could be clobbered by a stale read-modify-write. + # See PR #1261 review feedback. + job.save(update_fields=["progress", "status", "finished_at", "updated_at"]) try: _log_job_throughput(job, stage) except Exception as e: From 39f734ade1bcaaf2c1caea87531ff8abcc48b041 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 01:48:36 -0700 Subject: [PATCH 07/15] feat(settings): wire NATS_TASK_TTR env var with 60s default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit nats_queue.py already reads getattr(settings, "NATS_TASK_TTR", 30), but the setting was never declared in base.py — so the ack_wait was pinned at 30s everywhere regardless of env. Wire it through and raise the default to 60s: 30s is too tight for 24 MB images + GPU cold start + synchronous result POST, and caused spurious NATS redeliveries under concurrent load. Per-env override via NATS_TASK_TTR (demo bumps this further to 120s). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- config/settings/base.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config/settings/base.py b/config/settings/base.py index 5f50a3495..d7f0c160e 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -319,6 +319,11 @@ def _celery_result_backend_url(redis_url): # NATS # ------------------------------------------------------------------------------ NATS_URL = env("NATS_URL", default="nats://localhost:4222") # type: ignore[no-untyped-call] +# How long a worker has to ack a dispatched task before NATS redelivers it. +# Must exceed worst-case task duration: S3 download + GPU cold-start + batch +# inference + synchronous POST of results. With 24 MB images and batch sizes +# of 8-24, 30s was too tight and caused spurious redeliveries under load. +NATS_TASK_TTR = env.int("NATS_TASK_TTR", default=60) # ADMIN # ------------------------------------------------------------------------------ From 014b8c0cb5cf2b57f2b2638a23cb293c486c79be Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 02:42:35 -0700 Subject: [PATCH 08/15] feat(jobs): fair worker polling via last-touched ordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workers polling GET /api/v2/jobs/?ids_only=1&incomplete_only=1 currently get jobs in PK order (oldest first), so N concurrent pollers all race to the same first job. Under the workshop load (4 pollers × 4 concurrent jobs) this caused the drain-one-job-at-a-time pattern observed in the 6-job retest: j192 starved at 0% for ~500s while all pollers hammered j191, and NATS nearly exhausted max_deliver=2 on j192. Order ids_only responses by updated_at ASC (least-recently-touched first). updated_at freshens on every progress write, log write, or status transition, so actively-worked jobs drop to the bottom and starved jobs bubble to the top — rotation without a dedicated last_polled_at field. 
Fairness is a property of the serving endpoint, not of the worker. Third-party workers with their own consumption patterns should still see evenly-distributed handoffs; keeping the scheduling logic in Antenna rather than ADC preserves that guarantee. Co-Authored-By: Claude Opus 4.7 (1M context) --- ami/jobs/views.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index ec9d64481..c6333e5d0 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -248,10 +248,19 @@ def get_queryset(self) -> QuerySet: ) # Filter out completed jobs that have not been updated in the last X hours cutoff_datetime = timezone.now() - timezone.timedelta(hours=cutoff_hours) - return jobs.exclude( + jobs = jobs.exclude( status=JobState.failed_states(), updated_at__lt=cutoff_datetime, ) + # Worker-polling call path (`ids_only=1`): hand back least-recently-touched + # jobs first so concurrent pollers don't all converge on the same oldest + # job. `updated_at` freshens on every progress/log/status write, so active + # jobs drop to the bottom and starved jobs bubble to the top without + # needing a dedicated `last_polled_at` field. Fairness falls out of + # existing writes rather than a new column. + if self.action == "list" and url_boolean_param(self.request, "ids_only", default=False): + jobs = jobs.order_by("updated_at", "pk") + return jobs @extend_schema( parameters=[ From 7b7dfa5dc6e62fd38765d610e29449e220b191d6 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 02:59:56 -0700 Subject: [PATCH 09/15] feat(jobs): default ids_only to limit=1 for pop()-style polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Worker polling (?ids_only=1) now hands back one least-recently-touched job per request rather than the default paginated batch of 10. Concurrent workers were caching the full list and draining it serially, which starved later jobs until the cache drained. 
Forcing a re-poll per job lets the updated_at fairness sort rotate work across jobs every iteration. Callers who want a batch can still pass ?limit=N or ?page_size=N explicitly — back-compat preserved for the few non-polling ids_only consumers. No worker-side change required: existing clients iterate a list of zero or more and work identically with a list of length one. --- ami/jobs/views.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index c6333e5d0..693f5c73d 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -16,6 +16,7 @@ from rest_framework.filters import BaseFilterBackend from rest_framework.response import Response +from ami.base.pagination import LimitOffsetPaginationWithPermissions from ami.base.permissions import ObjectPermission from ami.base.views import ProjectMixin from ami.jobs.schemas import ids_only_param, incomplete_only_param @@ -262,6 +263,28 @@ def get_queryset(self) -> QuerySet: jobs = jobs.order_by("updated_at", "pk") return jobs + @property + def paginator(self): + # Treat `?ids_only=1` as a pop()-style handoff ("what job is next?") + # rather than a list() dump: default to one job per response unless the + # caller explicitly asks for a batch via ?limit=N or ?page_size=N. + # Concurrent pollers drain a cached list serially and starve later jobs; + # forcing a re-poll per job lets the `updated_at` fairness sort rotate + # work across jobs every iteration. No ADC-side change required. 
+ if not hasattr(self, "_paginator"): + if ( + self.action == "list" + and url_boolean_param(self.request, "ids_only", default=False) + and "limit" not in self.request.query_params + and "page_size" not in self.request.query_params + ): + paginator = LimitOffsetPaginationWithPermissions() + paginator.default_limit = 1 + self._paginator = paginator + else: + self._paginator = self.pagination_class() if self.pagination_class is not None else None + return self._paginator + @extend_schema( parameters=[ project_id_doc_param, From 7445f6ea45ef974786a5ecb02fe5efe1c1895aaf Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 03:09:04 -0700 Subject: [PATCH 10/15] fix(jobs): randomize ids_only ordering instead of updated_at sort The updated_at sort had a degenerate case: freshly queued jobs all share near-identical updated_at (ties broken by pk), so simultaneous pollers deterministically pick the same "oldest" job. Replacing the sort with ORDER BY RANDOM() gives probabilistic disjoint assignment across concurrent pollers without writing a poll-stamp column. Combined with the limit=1 default on ?ids_only=1, each worker poll is an independent "pick any unfinished job" draw. Fairness is probabilistic rather than strictly oldest-first, but that's an acceptable trade given we want to avoid write-on-read. If we later decide to enforce strict fairness, the right follow-up is a dedicated last_polled_at column with UPDATE ... RETURNING semantics, not a return to the updated_at sort. 
--- ami/jobs/views.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 693f5c73d..d1aa81f0c 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -253,14 +253,16 @@ def get_queryset(self) -> QuerySet: status=JobState.failed_states(), updated_at__lt=cutoff_datetime, ) - # Worker-polling call path (`ids_only=1`): hand back least-recently-touched - # jobs first so concurrent pollers don't all converge on the same oldest - # job. `updated_at` freshens on every progress/log/status write, so active - # jobs drop to the bottom and starved jobs bubble to the top without - # needing a dedicated `last_polled_at` field. Fairness falls out of - # existing writes rather than a new column. + # Worker-polling call path (`ids_only=1`): randomize order so concurrent + # pollers don't all converge on the same head-of-queue job. An + # `updated_at`-based sort has a degenerate case at startup — freshly + # queued jobs all share near-identical timestamps, tie-broken by `pk`, + # so simultaneous polls deterministically pick the same job. Random + # ordering gives probabilistic disjoint assignment without writing a + # poll-stamp column. Combined with `limit=1` below, each poll is an + # independent "pick any unfinished job" draw. if self.action == "list" and url_boolean_param(self.request, "ids_only", default=False): - jobs = jobs.order_by("updated_at", "pk") + jobs = jobs.order_by("?") return jobs @property From aeca6c1f9bef11c50daaef1ca168ae12ae37908a Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 03:48:21 -0700 Subject: [PATCH 11/15] test(jobs): update ids_only test for pop()-style default Existing test_list_jobs_with_ids_only now passes ?limit=10 explicitly to assert the "list all" behavior. New test_list_jobs_ids_only_pops_one covers the new default where ?ids_only=1 returns one job to support pop()-style polling handoff across workers. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ami/jobs/tests/test_jobs.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 90d1f6baa..afbdac9b9 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -375,7 +375,11 @@ def test_list_jobs_with_ids_only(self): self._create_job("Test job 3", start_now=False) self.client.force_authenticate(user=self.user) - jobs_list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + # Pass an explicit limit to override the pop()-style default (see test_list_jobs_ids_only_pops_one below). + jobs_list_url = reverse_with_params( + "api:job-list", + params={"project_id": self.project.pk, "ids_only": True, "limit": 10}, + ) resp = self.client.get(jobs_list_url) self.assertEqual(resp.status_code, 200) @@ -388,6 +392,21 @@ def test_list_jobs_with_ids_only(self): # Verify we don't get the full results structure self.assertNotIn("details", data["results"][0]) + def test_list_jobs_ids_only_pops_one(self): + """`?ids_only=1` without an explicit limit returns one job (pop()-style handoff).""" + self._create_job("Test job 2", start_now=False) + self._create_job("Test job 3", start_now=False) + + self.client.force_authenticate(user=self.user) + jobs_list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + resp = self.client.get(jobs_list_url) + + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["count"], 3) + self.assertEqual(len(data["results"]), 1) + self.assertIsInstance(data["results"][0]["id"], int) + def test_list_jobs_with_incomplete_only(self): """Test the incomplete_only parameter filters jobs correctly.""" # Create jobs via API From 44d4c322795f22c0183870466494d95ff3eaa253 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 13:54:47 -0700 Subject: [PATCH 12/15] 
feat(jobs): heartbeat on idle ids_only polls The pipeline-scoped heartbeat in _mark_pipeline_pull_services_seen only fires when a worker hits /tasks/ or /result/ on an active job, so workers idling on GET /jobs/?ids_only=1 between jobs had no heartbeat path. Their last_seen would age past PROCESSING_SERVICE_LAST_SEEN_MAX and the UI would flip them offline despite being actively online. Add _mark_async_services_seen_for_project + matching celery task that marks every async ProcessingService attached to the polling project as seen. Same Redis throttle pattern as the existing heartbeat (HEARTBEAT_THROTTLE_SECONDS = 30s), scoped by project rather than (pipeline, project) since the list endpoint has no pipeline context. --- ami/jobs/tasks.py | 22 +++++++++ ami/jobs/tests/test_jobs.py | 99 +++++++++++++++++++++++++++++++++++++ ami/jobs/views.py | 37 +++++++++++++- 3 files changed, 157 insertions(+), 1 deletion(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 29c3fb521..1d666b66b 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -77,6 +77,28 @@ def update_pipeline_pull_services_seen(job_id: int) -> None: ) +@celery_app.task( + soft_time_limit=10, + time_limit=15, + ignore_result=True, +) +def update_async_services_seen_for_project(project_id: int) -> None: + """ + Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. + + Unlike ``update_pipeline_pull_services_seen`` — which is pipeline-scoped and + only fires when a worker hits /tasks/ or /result/ for an active job — this + marks every async processing service attached to the polling project as + seen. The list endpoint has no pipeline context, so scope is the project. 
+ """ + from ami.ml.models import ProcessingService # avoid circular import + + ProcessingService.objects.async_services().filter(projects=project_id).update( + last_seen=datetime.datetime.now(), + last_seen_live=True, + ) + + @celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit) def run_job(self, job_id: int) -> None: from ami.jobs.models import Job diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index afbdac9b9..34bcc562e 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -1176,3 +1176,102 @@ def test_view_gate_suppresses_redundant_dispatches(self): _mark_pipeline_pull_services_seen(self.job) self.assertEqual(mock_delay.call_count, 1) + + +class TestListEndpointHeartbeat(APITestCase): + """ + Unit tests for _mark_async_services_seen_for_project and the list endpoint's + heartbeat dispatch on ``?ids_only=1`` polls. + """ + + def setUp(self): + from django.core.cache import cache + + cache.clear() + + self.project = Project.objects.create(name="List Heartbeat Project") + self.service = ProcessingService.objects.create( + name="List Heartbeat Worker", + endpoint_url=None, + ) + self.service.projects.add(self.project) + + self.user = User.objects.create_user(email="list-heartbeat@example.com", is_superuser=True, is_active=True) + self.client.force_authenticate(user=self.user) + + def test_list_with_ids_only_dispatches_heartbeat(self): + from unittest.mock import patch + + list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_called_once_with(self.project.pk) + + def test_list_without_ids_only_does_not_dispatch_heartbeat(self): + from unittest.mock import patch + + list_url = reverse_with_params("api:job-list", params={"project_id": 
self.project.pk}) + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_not_called() + + def test_list_heartbeat_tolerates_dispatch_failure(self): + """Broker unavailability on heartbeat enqueue must not break the list response.""" + from unittest.mock import patch + + from kombu.exceptions import OperationalError + + list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + with patch( + "ami.jobs.views.update_async_services_seen_for_project.delay", + side_effect=OperationalError("broker unavailable"), + ): + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + + def test_view_gate_suppresses_redundant_list_dispatches(self): + """Rapid repeated list polls should dispatch at most once per throttle window.""" + from unittest.mock import patch + + from ami.jobs.views import _mark_async_services_seen_for_project + + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + for _ in range(5): + _mark_async_services_seen_for_project(self.project.pk) + + self.assertEqual(mock_delay.call_count, 1) + + def test_task_updates_all_project_async_services(self): + """The celery task marks every async service on the project live.""" + import datetime + + from ami.jobs.tasks import update_async_services_seen_for_project + + other_async = ProcessingService.objects.create(name="Other Async", endpoint_url=None) + other_async.projects.add(self.project) + sync_service = ProcessingService.objects.create( + name="Sync Service", endpoint_url="http://nonexistent-host:9999" + ) + sync_service.projects.add(self.project) + sync_last_seen_before = ProcessingService.objects.get(pk=sync_service.pk).last_seen + + before = datetime.datetime.now() - datetime.timedelta(seconds=1) + update_async_services_seen_for_project(self.project.pk) + + 
self.service.refresh_from_db() + other_async.refresh_from_db() + sync_service.refresh_from_db() + + self.assertTrue(self.service.last_seen_live) + self.assertIsNotNone(self.service.last_seen) + self.assertGreaterEqual(self.service.last_seen, before) + self.assertTrue(other_async.last_seen_live) + # Sync services (with endpoint URL) are not touched by this task — last_seen + # may be set by the creation-time get_status() ping, but should be unchanged + # after the task runs. + self.assertEqual(sync_service.last_seen, sync_last_seen_before) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index d1aa81f0c..bd2a8c2fb 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -26,7 +26,12 @@ MLJobTasksRequestSerializer, MLJobTasksResponseSerializer, ) -from ami.jobs.tasks import HEARTBEAT_THROTTLE_SECONDS, process_nats_pipeline_result, update_pipeline_pull_services_seen +from ami.jobs.tasks import ( + HEARTBEAT_THROTTLE_SECONDS, + process_nats_pipeline_result, + update_async_services_seen_for_project, + update_pipeline_pull_services_seen, +) from ami.main.api.schemas import project_id_doc_param from ami.main.api.views import DefaultViewSet from ami.utils.fields import url_boolean_param @@ -52,6 +57,29 @@ def _actor_log_context(request) -> tuple[str, str | None]: return user_desc, token_fingerprint +def _mark_async_services_seen_for_project(project_id: int) -> None: + """ + Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. + + The pipeline-scoped heartbeat in ``_mark_pipeline_pull_services_seen`` only + fires when a worker hits /tasks/ or /result/ on an active job; workers idling + on the list endpoint between jobs had no heartbeat path at all, so their + ``last_seen`` would age out of ``PROCESSING_SERVICE_LAST_SEEN_MAX`` and the + UI would flip them to offline despite being actively online. + + Scope: marks every async service attached to the polling project. The list + endpoint has no pipeline context to narrow by. 
Once application-token auth + lands (PR #1117), this should be scoped to the specific calling service. + """ + cache_key = f"heartbeat:list:project:{project_id}" + if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS): + return + try: + update_async_services_seen_for_project.delay(project_id) + except (kombu.exceptions.KombuError, ConnectionError, OSError) as exc: + logger.warning(f"Failed to enqueue non-critical project heartbeat for project {project_id}: {exc}") + + def _mark_pipeline_pull_services_seen(job: "Job") -> None: """ Enqueue a fire-and-forget heartbeat for async (pull-mode) processing services @@ -295,6 +323,13 @@ def paginator(self): ] ) def list(self, request, *args, **kwargs): + # Worker-polling call path: record heartbeat for async processing services + # on the polling project. Throttled via Redis so concurrent pollers don't + # churn the DB/broker. + if url_boolean_param(request, "ids_only", default=False): + project = self.get_active_project() + if project is not None: + _mark_async_services_seen_for_project(project.pk) return super().list(request, *args, **kwargs) @extend_schema( From 259c6592d948ce23246718e3826d9d40bc690ef9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 17:03:54 -0700 Subject: [PATCH 13/15] feat(settings): raise NATS_TASK_TTR default to 300s The 60s default worked on push-mode services but was still too tight for pull-mode ADC workers running cold-start GPU pipelines on 24 MB images (staging observed a nontrivial redelivery rate at 60s; demo held steady only after being bumped to 120s, then 300s). Raise the default to 300s so new deployments inherit the ceiling that staging/demo actually run at. max_deliver still bounds how long a genuinely crashed worker can hold tasks invisibly. 
--- config/settings/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/config/settings/base.py b/config/settings/base.py index d7f0c160e..9287a3904 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -322,8 +322,10 @@ def _celery_result_backend_url(redis_url): # How long a worker has to ack a dispatched task before NATS redelivers it. # Must exceed worst-case task duration: S3 download + GPU cold-start + batch # inference + synchronous POST of results. With 24 MB images and batch sizes -# of 8-24, 30s was too tight and caused spurious redeliveries under load. -NATS_TASK_TTR = env.int("NATS_TASK_TTR", default=60) +# of 8-24, anything under a minute caused spurious redeliveries under load; +# 5 minutes gives cold-start GPU pipelines headroom without holding tasks +# invisibly long after a genuine worker crash (bounded by max_deliver). +NATS_TASK_TTR = env.int("NATS_TASK_TTR", default=300) # ADMIN # ------------------------------------------------------------------------------ From 695f7616223fc3d6a9adb4cbba2a9acb2c529ba2 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 21 Apr 2026 17:32:58 -0700 Subject: [PATCH 14/15] fix(jobs): heartbeat on ids_only polls when only pipeline__slug__in is sent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The project-scoped heartbeat added in 44d4c32 never fires for the actual ADC worker request: ``GET /api/v2/jobs/?pipeline__slug__in=...&ids_only=1`` does not carry a ``project_id``, so ``get_active_project()`` returns None and the helper short-circuits. Result: async processing services remained ``last_seen_live=null`` forever on demo despite workers polling every 5s. The commit's original rationale — that the list endpoint has no pipeline context — was wrong: the worker's filter slugs are right there in the query string. 
Add a sibling path that derives scope from those slugs: - ``update_async_services_seen_for_pipelines(pipeline_slugs)`` celery task bumps ``last_seen`` on async services whose pipelines match any slug. - ``_mark_async_services_seen_for_pipelines`` throttles dispatch via a Redis key scoped on the sorted slug set. - The ``list()`` heartbeat branch prefers the pipeline-slug path when those slugs are present, and falls back to the project-scoped path for callers that send ``?project_id=``. Tests cover the real worker shape (no project_id, slugs in query) and the task's filter semantics against sync + async + unrelated services. Co-Authored-By: Claude Opus 4.7 (1M context) --- ami/jobs/tasks.py | 28 ++++++++++++++++++++++++++ ami/jobs/tests/test_jobs.py | 39 ++++++++++++++++++++++++++++++++++++ ami/jobs/views.py | 40 +++++++++++++++++++++++++++++++------ 3 files changed, 101 insertions(+), 6 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 1d666b66b..f01675fb9 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -77,6 +77,34 @@ def update_pipeline_pull_services_seen(job_id: int) -> None: ) +@celery_app.task( + soft_time_limit=10, + time_limit=15, + ignore_result=True, +) +def update_async_services_seen_for_pipelines(pipeline_slugs: list[str]) -> None: + """ + Heartbeat for idle worker polls on + ``GET /api/v2/jobs/?pipeline__slug__in=...&ids_only=1``. + + The ADC worker sends pipeline slugs but no project_id (one worker may serve + pipelines across many projects), so scope the heartbeat by the pipelines it + asked about. Marks every async ProcessingService linked to any of those + pipelines as seen. 
+ """ + from ami.ml.models import ProcessingService # avoid circular import + + if not pipeline_slugs: + return + + ProcessingService.objects.async_services().filter( + pipelines__slug__in=pipeline_slugs, + ).distinct().update( + last_seen=datetime.datetime.now(), + last_seen_live=True, + ) + + @celery_app.task( soft_time_limit=10, time_limit=15, diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 34bcc562e..ee8d833cd 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -1246,6 +1246,45 @@ def test_view_gate_suppresses_redundant_list_dispatches(self): self.assertEqual(mock_delay.call_count, 1) + def test_list_with_pipeline_slugs_no_project_dispatches_heartbeat(self): + """Real ADC worker shape: ?ids_only=1&pipeline__slug__in=... with no project_id.""" + from unittest.mock import patch + + pipeline = Pipeline.objects.create(name="Heartbeat Pipeline", slug="heartbeat-pipeline") + self.service.pipelines.add(pipeline) + + list_url = reverse_with_params( + "api:job-list", + params={"ids_only": True, "pipeline__slug__in": "heartbeat-pipeline"}, + ) + with patch("ami.jobs.views.update_async_services_seen_for_pipelines.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_called_once_with(["heartbeat-pipeline"]) + + def test_task_updates_services_via_pipeline_slug(self): + """The pipeline-slug celery task marks matching async services live.""" + import datetime + + from ami.jobs.tasks import update_async_services_seen_for_pipelines + + pipeline = Pipeline.objects.create(name="Slug Pipeline", slug="slug-pipeline") + self.service.pipelines.add(pipeline) + unrelated = ProcessingService.objects.create(name="Unrelated Async", endpoint_url=None) + unrelated_last_seen_before = unrelated.last_seen + + before = datetime.datetime.now() - datetime.timedelta(seconds=1) + update_async_services_seen_for_pipelines(["slug-pipeline"]) + + self.service.refresh_from_db() + 
unrelated.refresh_from_db() + + self.assertTrue(self.service.last_seen_live) + self.assertIsNotNone(self.service.last_seen) + self.assertGreaterEqual(self.service.last_seen, before) + self.assertEqual(unrelated.last_seen, unrelated_last_seen_before) + def test_task_updates_all_project_async_services(self): """The celery task marks every async service on the project live.""" import datetime diff --git a/ami/jobs/views.py b/ami/jobs/views.py index bd2a8c2fb..4050ad2e2 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -29,6 +29,7 @@ from ami.jobs.tasks import ( HEARTBEAT_THROTTLE_SECONDS, process_nats_pipeline_result, + update_async_services_seen_for_pipelines, update_async_services_seen_for_project, update_pipeline_pull_services_seen, ) @@ -57,6 +58,26 @@ def _actor_log_context(request) -> tuple[str, str | None]: return user_desc, token_fingerprint +def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) -> None: + """ + Heartbeat for idle worker polls that send ``pipeline__slug__in=...`` but no + ``project_id`` — the real ADC worker shape, where one worker may serve + pipelines across many projects and has no single project to nominate. + + Redis throttle keyed on the sorted slug set so concurrent pollers for the + same pipelines share a single dispatch per window. + """ + if not pipeline_slugs: + return + cache_key = "heartbeat:list:pipelines:" + ",".join(sorted(pipeline_slugs)) + if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS): + return + try: + update_async_services_seen_for_pipelines.delay(list(pipeline_slugs)) + except (kombu.exceptions.KombuError, ConnectionError, OSError) as exc: + logger.warning(f"Failed to enqueue non-critical pipeline-slug heartbeat: {exc}") + + def _mark_async_services_seen_for_project(project_id: int) -> None: """ Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. 
@@ -323,13 +344,20 @@ def paginator(self): ] ) def list(self, request, *args, **kwargs): - # Worker-polling call path: record heartbeat for async processing services - # on the polling project. Throttled via Redis so concurrent pollers don't - # churn the DB/broker. + # Worker-polling call path: record heartbeat for async processing services. + # The real ADC worker request carries ``pipeline__slug__in=...`` and no + # project_id, so prefer the pipeline-slug scope when those slugs are + # present; fall back to project scope for callers that pass ?project_id=. + # Throttled via Redis so concurrent pollers don't churn the DB/broker. if url_boolean_param(request, "ids_only", default=False): - project = self.get_active_project() - if project is not None: - _mark_async_services_seen_for_project(project.pk) + pipeline_slugs_raw = request.query_params.get("pipeline__slug__in") + if pipeline_slugs_raw: + slugs = tuple(s for s in (p.strip() for p in pipeline_slugs_raw.split(",")) if s) + _mark_async_services_seen_for_pipelines(slugs) + else: + project = self.get_active_project() + if project is not None: + _mark_async_services_seen_for_project(project.pk) return super().list(request, *args, **kwargs) @extend_schema( From 9cee66d8020d9a1d55f21d29552a644284457997 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 22 Apr 2026 17:25:07 -0700 Subject: [PATCH 15/15] docs(jobs): address PR #1261 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Response to reviewer comments on the async_api concurrency tuning PR: - views.py: mark the three `ids_only=1` overrides (random ordering, paginator default_limit=1, heartbeat dispatch) as temporary — they exist only because list() is doubling as a claim-next-job endpoint. A dedicated `/next` action is tracked as #1265; comments carry a hard-coded removal target of 2026-04-24 to keep the expiration visible. 
- views.py: rewrite the `_mark_async_services_seen_for_pipelines` and `_mark_async_services_seen_for_project` docstrings so the contract is obvious — they are Redis-throttled wrappers around the matching celery task. No DB work in the wrapper. - tasks.py: add TODO on both `update_async_services_seen_*` tasks pointing at #1194 (client-ID auth), so the "one poller falsely marks its peers live" invariant-break is visible in source, not just in the PR body. - tasks.py: fix the stale comment in `_update_job_progress` — the previous wording claimed the narrowed save() avoids overwriting `updated_at`, but `updated_at` is in `update_fields` intentionally (Django skips auto_now when update_fields is provided). - config/settings/base.py: add a one-line plain-language purpose for `JOB_LOG_PERSIST_ENABLED` so the flag reads at-a-glance. No behavior changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- ami/jobs/tasks.py | 28 ++++++++++++------ ami/jobs/views.py | 63 +++++++++++++++++++++++++++++------------ config/settings/base.py | 3 ++ 3 files changed, 67 insertions(+), 27 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index f01675fb9..f3c996f86 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -91,6 +91,11 @@ def update_async_services_seen_for_pipelines(pipeline_slugs: list[str]) -> None: pipelines across many projects), so scope the heartbeat by the pipelines it asked about. Marks every async ProcessingService linked to any of those pipelines as seen. + + TODO: once #1194 (client-ID / application-token auth) lands, scope this + update to the specific calling ProcessingService rather than every service + matching the slugs. Currently one poller's heartbeat falsely marks its + peers live. """ from ami.ml.models import ProcessingService # avoid circular import @@ -114,10 +119,14 @@ def update_async_services_seen_for_project(project_id: int) -> None: """ Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. 
- Unlike ``update_pipeline_pull_services_seen`` — which is pipeline-scoped and - only fires when a worker hits /tasks/ or /result/ for an active job — this - marks every async processing service attached to the polling project as - seen. The list endpoint has no pipeline context, so scope is the project. + Fallback path used only when the request carries ``?project_id=`` without + ``pipeline__slug__in`` — the ADC worker does not currently send this shape, + so in practice the pipeline-slug task above is the one that fires. + + TODO: once #1194 (client-ID / application-token auth) lands, scope this + update to the specific calling ProcessingService rather than every async + service attached to the project. Currently one poller's heartbeat falsely + marks its peers live. """ from ami.ml.models import ProcessingService # avoid circular import @@ -629,11 +638,12 @@ def _update_job_progress( job.finished_at = datetime.datetime.now() # Use naive datetime in local time job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") # Narrow the write to the fields we actually mutated. Without this, a full - # save() would also overwrite `updated_at`, `logs`, and any other field on - # the instance fetched at the top of this block — so a concurrent worker's - # append to `progress.errors` (via `_reconcile_lost_images`) or log line - # (via JobLogHandler) could be clobbered by a stale read-modify-write. - # See PR #1261 review feedback. + # save() would overwrite `logs` and any other field on the instance + # fetched at the top of this block — so a concurrent worker's append to + # `progress.errors` (via `_reconcile_lost_images`) or log line (via + # JobLogHandler) could be clobbered by a stale read-modify-write. + # `updated_at` is listed explicitly because Django skips `auto_now` bumps + # when `update_fields` is provided. See PR #1261 review feedback. 
job.save(update_fields=["progress", "status", "finished_at", "updated_at"]) try: _log_job_throughput(job, stage) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 4050ad2e2..44516ca14 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -60,12 +60,16 @@ def _actor_log_context(request) -> tuple[str, str | None]: def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) -> None: """ - Heartbeat for idle worker polls that send ``pipeline__slug__in=...`` but no - ``project_id`` — the real ADC worker shape, where one worker may serve - pipelines across many projects and has no single project to nominate. - - Redis throttle keyed on the sorted slug set so concurrent pollers for the - same pipelines share a single dispatch per window. + Redis-throttled wrapper around the ``update_async_services_seen_for_pipelines`` + celery task. The wrapper does no DB work itself — it gates dispatch so at + most one heartbeat is enqueued per sorted slug set per + ``HEARTBEAT_THROTTLE_SECONDS`` window (currently 30s), keeping the HTTP + request path cheap under concurrent polling. + + Called from the ``?ids_only=1`` branch of ``JobViewSet.list()`` — the real + ADC worker shape, which sends ``pipeline__slug__in=`` and no + ``project_id`` (one worker may serve pipelines across many projects and + has no single project to nominate). """ if not pipeline_slugs: return @@ -80,17 +84,18 @@ def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) -> def _mark_async_services_seen_for_project(project_id: int) -> None: """ - Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. 
- - The pipeline-scoped heartbeat in ``_mark_pipeline_pull_services_seen`` only - fires when a worker hits /tasks/ or /result/ on an active job; workers idling - on the list endpoint between jobs had no heartbeat path at all, so their - ``last_seen`` would age out of ``PROCESSING_SERVICE_LAST_SEEN_MAX`` and the - UI would flip them to offline despite being actively online. - - Scope: marks every async service attached to the polling project. The list - endpoint has no pipeline context to narrow by. Once application-token auth - lands (PR #1117), this should be scoped to the specific calling service. + Redis-throttled wrapper around ``update_async_services_seen_for_project``. + Same shape as ``_mark_async_services_seen_for_pipelines`` above — gates + celery dispatch to at most one per-project enqueue per + ``HEARTBEAT_THROTTLE_SECONDS`` window — but keyed by project id for + callers that send ``?project_id=`` without ``pipeline__slug__in``. + + The ADC worker does not currently use this shape, so this is a fallback. + Background on why idle-poll heartbeats exist at all: the other heartbeat + (``_mark_pipeline_pull_services_seen``) only fires from ``/tasks/`` and + ``/result/`` — i.e., from workers with active work — so a worker sitting + on ``GET /jobs/?ids_only=1`` between jobs would otherwise age past + ``PROCESSING_SERVICE_LAST_SEEN_MAX`` and flip to offline in the UI. """ cache_key = f"heartbeat:list:project:{project_id}" if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS): @@ -302,6 +307,7 @@ def get_queryset(self) -> QuerySet: status=JobState.failed_states(), updated_at__lt=cutoff_datetime, ) + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. # Worker-polling call path (`ids_only=1`): randomize order so concurrent # pollers don't all converge on the same head-of-queue job. 
An # `updated_at`-based sort has a degenerate case at startup — freshly @@ -310,18 +316,32 @@ def get_queryset(self) -> QuerySet: # ordering gives probabilistic disjoint assignment without writing a # poll-stamp column. Combined with `limit=1` below, each poll is an # independent "pick any unfinished job" draw. + # + # The whole `ids_only=1` branch (this ordering override, the paginator + # override in `paginator` below, the heartbeat dispatch in `list()`) + # exists because the ADC worker currently repurposes this list endpoint + # as a claim-next-job call. Correct shape is a dedicated `/next` action + # (tracked as #1265). Once `/next` ships + # and ADC is migrated, delete this `order_by("?")` override along with + # the paginator override and the list() heartbeat branch. if self.action == "list" and url_boolean_param(self.request, "ids_only", default=False): jobs = jobs.order_by("?") return jobs @property def paginator(self): + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. # Treat `?ids_only=1` as a pop()-style handoff ("what job is next?") # rather than a list() dump: default to one job per response unless the # caller explicitly asks for a batch via ?limit=N or ?page_size=N. # Concurrent pollers drain a cached list serially and starve later jobs; - # forcing a re-poll per job lets the `updated_at` fairness sort rotate + # forcing a re-poll per job lets the random-shuffle fairness sort rotate # work across jobs every iteration. No ADC-side change required. + # + # This override exists only because `list(ids_only=True)` is being used + # as a claim-next-job call. Replace with a dedicated `/next` action + # (tracked as #1265); once ADC is migrated, + # drop this override so the list endpoint goes back to normal pagination. if not hasattr(self, "_paginator"): if ( self.action == "list" @@ -344,11 +364,18 @@ def paginator(self): ] ) def list(self, request, *args, **kwargs): + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. 
# Worker-polling call path: record heartbeat for async processing services. # The real ADC worker request carries ``pipeline__slug__in=...`` and no # project_id, so prefer the pipeline-slug scope when those slugs are # present; fall back to project scope for callers that pass ?project_id=. # Throttled via Redis so concurrent pollers don't churn the DB/broker. + # + # This heartbeat branch lives on `list()` only because `list(ids_only=1)` + # is doubling as the worker's claim-next-job endpoint. Once a dedicated + # `/next` action ships (tracked as #1265) + # and ADC is migrated to it, move the heartbeat to that action and + # delete this branch — `list()` should go back to being a plain list. if url_boolean_param(request, "ids_only", default=False): pipeline_slugs_raw = request.query_params.get("pipeline__slug__in") if pipeline_slugs_raw: diff --git a/config/settings/base.py b/config/settings/base.py index 9287a3904..eabb593d7 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -576,6 +576,9 @@ def _celery_result_backend_url(redis_url): DEFAULT_INCLUDE_TAXA = env.list("DEFAULT_INCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] DEFAULT_EXCLUDE_TAXA = env.list("DEFAULT_EXCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] +# Purpose: master switch for per-job logs in the database and UI. +# Set to False to disable them as a contention escape hatch. +# # When True, ``JobLogHandler.emit`` persists each log line to ``jobs_job.logs`` # (JSONB column) so the per-job log feed in the UI stays populated. When False, # log lines go to the container stdout logger only — used as an escape hatch