fix(jobs): fixes for concurrent ML processing jobs #1261
Conversation
…w-lock contention

Under concurrent async_api result processing, every ML result task queued a contending exclusive lock on the jobs_job row via `select_for_update()`, stacking behind gunicorn view threads also holding the row under ATOMIC_REQUESTS (from per-iter job.logger.info writes in /result).

Observed in a validation environment under 5 concurrent async_api jobs (2x10 + 2x100 + 1x500 images, one pipeline): py-spy stacks of celeryworker_ml showed 7 of 8 ForkPoolWorker threads stuck in `psycopg wait` -> `_update_job_progress (tasks.py:493)`. pg_stat_activity consistently showed 17-20 backends "idle in transaction" holding `SELECT ... FOR UPDATE OF jobs_job` for 0.5-5+ seconds at a time.

The `max()` guard below the dropped lock still prevents progress regression: two racing workers that both read `progress=0.4` and both compute a new value still write monotonically via `max(existing, new)`. The trade-off is that accumulated counts (detections, classifications, captures) can drift by one batch under a read-modify-write race -- cosmetic only, since the underlying Detection/Classification rows are written authoritatively by `save_results` before this function runs.

Refs: #1256 (log refactor proposal; this PR targets the parallel progress-update path, not the log-write path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
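For illustration, a minimal sketch of the pattern this commit describes. The progress dict layout, the `stage_key` parameter, and the helper name are hypothetical; only the plain `get()`, the surrounding transaction, and the `max()` guard come from the commit itself.

```python
# Hedged sketch only: drop select_for_update() while keeping transaction.atomic()
# and the monotonic max() guard. The progress structure shown here is
# illustrative, not the real Job schema.
from django.db import transaction

from ami.jobs.models import Job  # model named in this PR's diff


def _update_stage_progress_sketch(job_id: int, stage_key: str, new_value: float) -> None:
    with transaction.atomic():
        # Plain get(): no exclusive row lock, so concurrent result workers (and
        # view threads holding the row under ATOMIC_REQUESTS) no longer queue
        # behind each other on jobs_job.
        job = Job.objects.get(pk=job_id)

        existing = job.progress["stages"][stage_key]["progress"]
        # Monotonic guard: two workers racing on a stale read still write a value
        # that never regresses, at the cost of possible one-batch drift in the
        # accumulated counts.
        job.progress["stages"][stage_key]["progress"] = max(existing, new_value)
        job.save()
```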
📝 Walkthrough
Decorated the Celery task with …
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client
participant API as "Django API"
participant Worker as "Celery Worker"
participant DB as "Postgres DB"
participant Cachalot as "Cachalot Cache"
Client->>API: POST /api/v2/jobs/{id}/result/
API->>Worker: enqueue process_nats_pipeline_result(payload)
Worker->>Cachalot: enter cachalot_disabled() (suspend post-write invalidation)
Worker->>DB: BEGIN transaction
Worker->>DB: SELECT Job (no SELECT FOR UPDATE)
alt JOB_LOG_PERSIST_ENABLED == true
Worker->>DB: UPDATE Job.logs and progress/status fields
else
Worker->>DB: UPDATE Job.progress/status fields only
end
DB-->>Worker: COMMIT
Worker->>Cachalot: exit cachalot_disabled()
```
Pull request overview
This PR aims to reduce Postgres row-lock contention during concurrent async job result processing by removing a blocking SELECT ... FOR UPDATE from _update_job_progress, keeping the surrounding transaction.atomic() and the monotonic-progress guard logic.
Changes:
- Replaces `Job.objects.select_for_update().get(pk=job_id)` with `Job.objects.get(pk=job_id)` in `_update_job_progress`.
- Adds an explanatory comment documenting the contention context and trade-offs.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/tasks.py (1)
503-571: ⚠️ Potential issue | 🟠 Major: Avoid stale full-row saves after removing the row lock.
With the plain `get()` on Line 504, two workers can read the same old `progress` before either commits. The `max()` guard on Lines 527-529 then compares against stale in-memory state, so the later `job.save()` on Line 571 can overwrite a newer 100%/SUCCESS update with an older partial stage. This makes the race broader than count drift; `progress`, `status`, and `finished_at` can regress. Consider switching this path to an optimistic compare-and-retry update, or a server-side atomic JSONB update, so each writer recomputes against the latest committed row without blocking on `select_for_update()`.
select_for_update().One possible optimistic retry shape
```diff
+ max_attempts = 5
+ for attempt in range(max_attempts):
-     with transaction.atomic():
-         job = Job.objects.get(pk=job_id)
+     with transaction.atomic():
+         job = Job.objects.get(pk=job_id)
+         loaded_updated_at = job.updated_at
          # ... compute progress/status from the freshly loaded job ...
-         job.save()
+         update_values = {
+             "progress": job.progress,
+             "updated_at": datetime.datetime.now(),
+         }
+         if job.progress.is_complete():
+             update_values.update(
+                 status=job.status,
+                 finished_at=job.finished_at,
+             )
+
+         updated = Job.objects.filter(
+             pk=job_id,
+             updated_at=loaded_updated_at,
+         ).update(**update_values)
+         if updated:
+             break
+ else:
+     logger.warning("Failed to update job %s progress after concurrent retries", job_id)
+     return
```
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@ami/jobs/tasks.py`:
- Around line 503-571: The current plain Job.objects.get(pk=job_id) allows a
stale-read and a later job.save() can overwrite newer progress; change this to
an optimistic compare-and-retry update: read the Job and existing_stage (as you
already do via existing_stage = job.progress.get_stage(stage)), compute the new
progress/state and the JSONB state_params, then attempt a conditional update via
the ORM (e.g. Job.objects.filter(pk=job_id,
progress__<stage_path>__progress=existing_progress).update(progress=<new_json>,
status=<new_status>, finished_at=..., ...) and only fall back to re-reading and
retrying (a few times) if the update returns 0 rows; keep using the same symbols
(job, existing_stage, existing_progress, passed_progress, state_params,
progress_percentage, complete_state, job.save) so you update against the latest
committed row instead of blindly calling job.save(), or alternatively implement
a server-side JSONB update that atomically merges counts into job.progress.
…ntion Under concurrent async_api load the per-record UPDATE on jobs_job.logs becomes the dominant bottleneck after the select_for_update fix in the previous commit. Observed on demo: 5 concurrent jobs produced a 56-deep pg_blocking_pids chain, all waiting on UPDATE jobs_job SET logs = ... from either /result (view, under ATOMIC_REQUESTS) or _update_job_progress (ml worker). Short-circuit emit to the module-level logger only (container stdout). The per-job UI log feed goes blank while this hotfix is active; revisit once PR #1259 lands the append-only JobLog child table.
Replace the unconditional noop of JobLogHandler.emit from the previous commit with a feature flag (default True, preserves existing behavior). Deployments hitting row-lock contention on jobs_job can set JOB_LOG_PERSIST_ENABLED=false to short-circuit the per-record UPDATE until PR #1259's append-only JobLog table is in place.

Validated locally (dev compose, WEB_CONCURRENCY=1, 8 ml fork workers, batched POSTs at 50 results × 10 concurrent):
- flag=true: blocker_chain=37, 0/10 POSTs complete (120s timeout)
- flag=false: blocker_chain=1 (trivial), 20/20 POSTs complete, p95=5.5s

Also add:
- scripts/load_test_result_endpoint.py — standalone batched-POST driver. A single-result-per-POST curl loop does NOT reproduce the contention because it skips the per-iter job.logger.info call inside one ATOMIC_REQUESTS tx. Batching N>1 results per POST is load-bearing.
- docs/claude/debugging/row-lock-contention-reproduction.md — full runbook: pathology, prereqs, steps, pg_stat_activity queries, before/after signal table, flag usage.
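A rough sketch of the gate this commit introduces. The setting name comes from the commit; the handler internals and the `_persist_to_job` helper are illustrative stand-ins, not the real emit body.

```python
# Hedged sketch: feature-flag gate in the job log handler.
import logging

from django.conf import settings


class JobLogHandler(logging.Handler):
    def __init__(self, job) -> None:
        super().__init__()
        self.job = job

    def emit(self, record: logging.LogRecord) -> None:
        # With persistence disabled, skip the per-record UPDATE on jobs_job
        # entirely; the module-level logger still writes to container stdout.
        if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True):
            return
        self._persist_to_job(record)

    def _persist_to_job(self, record: logging.LogRecord) -> None:
        # Placeholder for the legacy jobs_job.logs UPDATE path.
        ...
```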
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/load_test_result_endpoint.py`:
- Around line 14-18: The script currently accepts the auth token as a positional
argument and lacks validation for numeric flags and the host URL; update the CLI
parsing in load_test_result_endpoint.py (e.g., in main or parse_args) to make
token optional as a --token flag and prefer reading it from an environment
variable or secure prompt when not provided (do not keep it in argv or
prints/logs), validate that --batch, --concurrency, and --rounds are integers >
0 and fail fast with a clear error message and non-zero exit if not, and
validate --host with a strict HTTP(S) URL regex/urllib.parse check before
calling urlopen (reject non-http(s) schemes); also ensure any logging or
exception messages never include the token string.
- Around line 42-56: The fire_one function currently returns (idx, status,
elapsed) and uses -1 for generic exceptions; change it to return a fourth
element carrying error details (e.g., (idx, status, elapsed, error_str)) by
catching exceptions as e in both urllib.error.HTTPError and the generic except
and using str(e) or repr(e) (use empty string or None when there is no error).
Then update the result unpacking where results are processed (previously
unpacking idx, status, elapsed) to accept the extra value (e.g., idx, status,
elapsed, err) and handle or log err for diagnostics without altering the
happy-path behavior. Ensure type hints and any downstream uses expect the new
fourth element.
📒 Files selected for processing (4)
- ami/jobs/models.py
- config/settings/base.py
- docs/claude/debugging/row-lock-contention-reproduction.md
- scripts/load_test_result_endpoint.py
✅ Files skipped from review due to trivial changes (1)
- docs/claude/debugging/row-lock-contention-reproduction.md
Replaces the jobs_job.logs JSON-field UPDATE path in JobLogHandler.emit with an INSERT on a new JobLog child table. Under concurrent async_api load every log line triggered UPDATE jobs_job SET logs = ... on the shared job row; inside ATOMIC_REQUESTS a single batched /result POST stacked N such UPDATEs in one tx, blocking ML workers on the same row (issue #1256).

- JobLog model + migration 0021_joblog (FK job, level, message, created_at)
- JobLogHandler.emit -> JobLog.objects.create (gated by JOB_LOG_PERSIST_ENABLED flag introduced in PR #1261)
- JobSerializer.logs -> SerializerMethodField that reads JobLog rows on detail endpoint and falls back to the legacy jobs_job.logs JSON for jobs created before this table existed. List endpoint keeps the cheap legacy shape to avoid N+1.
- No UI changes, no new endpoint. On-wire {stdout, stderr} shape preserved.

Co-Authored-By: Claude <noreply@anthropic.com>
Under concurrent async_api load the ML result task is a pure write path: save_results inserts Detection/Classification rows and _update_job_progress UPDATEs jobs_job. Cachalot's post-write invalidation added ~2.5s of overhead per task in production measurement (issue #1256 Path 4), which throttles the ml_results queue drain when ADC POSTs arrive faster than Celery can clear them. Nothing inside the task benefits from the query cache — every read is of a freshly-written row — so skipping invalidation is strictly a throughput win.

Decorator stack: @celery_app.task wraps the cachalot-wrapped function, so Celery sees the context manager enter/exit on each task run.

This was prototyped on demo during the 2026-04-20 investigation but never committed (demo was reset to clean main before PR #1261 was opened). Restoring it here closes the last documented ML-path throughput regression.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
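A sketch of the shape this commit describes, assuming django-cachalot's `cachalot_disabled` context manager and a plain `shared_task` registration; the real task decorator options and body differ, and `save_results`/`_update_job_progress` are shown with simplified signatures.

```python
# Hedged sketch: suspend cachalot's post-write invalidation for the duration of
# the result task, using the explicit with-statement form of the context manager.
from cachalot.api import cachalot_disabled
from celery import shared_task


@shared_task
def process_nats_pipeline_result(payload: dict) -> None:
    with cachalot_disabled():
        # Pure write path: every read inside is of a freshly written row, so the
        # query cache is never useful here and its invalidation is skipped.
        save_results(payload)           # simplified stand-ins for the real body
        _update_job_progress(payload)
```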
Actionable comments posted: 2
🧹 Nitpick comments (1)
ami/jobs/tasks.py (1)
245-246: Minor: `Job.objects.get(pk=job_id)` happens twice in the task path.
Line 245 fetches `job` right after `_update_job_progress` already fetched it inside its atomic block (line 514). Not a correctness issue, but since `_update_job_progress` runs every batch in hot async_api paths, consider either returning the refreshed `job` from `_update_job_progress` or deferring the fetch until after the SREM on line 287 — one less round-trip per result. Deferrable; flagging only as optional cleanup in line with the PR's throughput focus.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 502-514: The current removal of select_for_update() risks
overwriting unrelated JSON fields on the Job.progress object because job.save()
writes all fields; update the _update_job_progress flow to re-read the latest
job.progress (and specifically progress.errors) inside the transaction.atomic()
block and then call job.save(update_fields=["progress", "status",
"finished_at"]) instead of a full save to limit last-write-wins to only the
intended fields; this change should reference the Job.objects.get(pk=job_id)
read, the subsequent mutation of job.progress, and the job.save() call, and
ensure _reconcile_lost_images’ appends to progress.errors are reconciled by
reading progress.errors after acquiring the atomic block before serializing.
- Around line 162-171: The decorator `@cachalot_disabled()` is invalid because
cachalot_disabled is only a context manager; remove the decorator from the
process_nats_pipeline_result definition and wrap the function body with the
cachalot_disabled context (i.e., use `with cachalot_disabled():` around the existing
logic inside process_nats_pipeline_result), ensuring you import
cachalot_disabled from django-cachalot if not already present and preserve the
function signature and return behavior.
---
Nitpick comments:
In `@ami/jobs/tasks.py`:
- Around line 245-246: The code calls Job.objects.get(pk=job_id) twice; to avoid
an extra DB round-trip, modify the _update_job_progress function to return the
refreshed Job instance and use that returned job in process_pipeline_result (or
alternatively remove the early Job.objects.get in process_pipeline_result and
defer fetching the Job until after the Redis SREM operation), referencing the
existing _update_job_progress and the current Job.objects.get usage so you reuse
the refreshed job rather than performing a second query.
- tasks.py: remove reference to non-existent planning doc; point at issue #1256 and PR #1261 instead (Copilot finding).
- load_test_result_endpoint.py: move token to $ANTENNA_TOKEN env var (or optional --token) to keep secrets out of argv/shell history; validate batch/concurrency/rounds > 0; require http(s) scheme on --host; capture error type+reason per-request instead of returning bare -1 (CodeRabbit findings).
- docs: update the repro runbook to match the new CLI shape.

Co-Authored-By: Claude <noreply@anthropic.com>
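A sketch of the driver shape these fixes describe. The real script is `scripts/load_test_result_endpoint.py`; the payload body, argument defaults, and endpoint details below are illustrative, not a copy of it.

```python
# Hedged sketch of a batched-POST load driver: token from ANTENNA_TOKEN, basic
# argument validation, and per-request error detail instead of a bare -1.
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def fire_one(idx: int, url: str, token: str, batch: int) -> tuple[int, int, float, str]:
    # Hypothetical payload shape; the real result schema is defined by the API.
    payload = json.dumps({"results": [{"seq": i} for i in range(batch)]}).encode()
    req = urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={"Authorization": f"Token {token}", "Content-Type": "application/json"},
    )
    start = time.monotonic()
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return idx, resp.status, time.monotonic() - start, ""
    except urllib.error.HTTPError as e:
        return idx, e.code, time.monotonic() - start, str(e)
    except Exception as e:  # keep error details instead of a bare -1
        return idx, -1, time.monotonic() - start, repr(e)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", required=True, help="http(s) base URL of the Antenna instance")
    parser.add_argument("--job-id", type=int, required=True)
    parser.add_argument("--batch", type=int, default=50)
    parser.add_argument("--concurrency", type=int, default=10)
    parser.add_argument("--token", default=os.environ.get("ANTENNA_TOKEN"))
    args = parser.parse_args()

    if not args.token:
        sys.exit("Set ANTENNA_TOKEN (or pass --token); the token is never echoed back.")
    if urlparse(args.host).scheme not in ("http", "https"):
        sys.exit("--host must be an http(s) URL")
    if args.batch < 1 or args.concurrency < 1:
        sys.exit("--batch and --concurrency must be positive integers")

    url = f"{args.host}/api/v2/jobs/{args.job_id}/result/"
    with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
        results = pool.map(lambda i: fire_one(i, url, args.token, args.batch), range(args.concurrency))
        for idx, status, elapsed, err in results:
            print(f"[{idx}] status={status} elapsed={elapsed:.1f}s {err}")


if __name__ == "__main__":
    main()
```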
Previously `job.save()` (no update_fields) wrote the entire row, which under concurrent async_api load meant a stale read-modify-write could clobber unrelated fields mutated by a concurrent worker — most notably `progress.errors` (appended by `_reconcile_lost_images`) and `logs` (appended by JobLogHandler.emit when persistence is enabled). Narrow the write to the fields this function actually mutates (progress, status, finished_at) plus updated_at so auto_now still fires. The underlying read-modify-write race remains — this only limits blast radius, not the race itself. The counter-drift and `max()` guard semantics discussed in the block comment above are unchanged. Address PR #1261 CodeRabbit review feedback on ami/jobs/tasks.py:514. Co-Authored-By: Claude <noreply@anthropic.com>
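In sketch form (field names follow the commit message; the progress/status computation around the save is elided):

```python
# Hedged sketch of the narrowed write. Listing only the mutated fields keeps this
# UPDATE from clobbering columns a concurrent writer may have touched (logs,
# progress.errors). updated_at is listed explicitly so its auto_now value is
# still written even though update_fields narrows the save.
def _save_progress_fields(job) -> None:
    job.save(update_fields=["progress", "status", "finished_at", "updated_at"])
```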
nats_queue.py already reads getattr(settings, "NATS_TASK_TTR", 30), but the setting was never declared in base.py — so the ack_wait was pinned at 30s everywhere regardless of env. Wire it through and raise the default to 60s: 30s is too tight for 24 MB images + GPU cold start + synchronous result POST, and caused spurious NATS redeliveries under concurrent load. Per-env override via NATS_TASK_TTR (demo bumps this further to 120s). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
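A sketch of how the setting could be declared in `config/settings/base.py`; the django-environ helper is an assumption about the project's settings pattern, not confirmed by the commit.

```python
# Hedged sketch for config/settings/base.py. Assumes a django-environ style env
# helper; the getattr() fallback in nats_queue.py keeps working either way.
import environ

env = environ.Env()

# Ack-wait (seconds) for NATS pull consumers. 300s covers a 24 MB image download
# plus GPU cold start plus a synchronous result POST; override per env via NATS_TASK_TTR.
NATS_TASK_TTR = env.int("NATS_TASK_TTR", default=300)
```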
Workers polling GET /api/v2/jobs/?ids_only=1&incomplete_only=1 currently get jobs in PK order (oldest first), so N concurrent pollers all race to the same first job. Under the workshop load (4 pollers × 4 concurrent jobs) this caused the drain-one-job-at-a-time pattern observed in the 6-job retest: j192 starved at 0% for ~500s while all pollers hammered j191, and NATS nearly exhausted max_deliver=2 on j192. Order ids_only responses by updated_at ASC (least-recently-touched first). updated_at freshens on every progress write, log write, or status transition, so actively-worked jobs drop to the bottom and starved jobs bubble to the top — rotation without a dedicated last_polled_at field. Fairness is a property of the serving endpoint, not of the worker. Third-party workers with their own consumption patterns should still see evenly-distributed handoffs; keeping the scheduling logic in Antenna rather than ADC preserves that guarantee. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Worker polling (?ids_only=1) now hands back one least-recently-touched job per request rather than the default paginated batch of 10. Concurrent workers were caching the full list and draining it serially, which starved later jobs until the cache drained. Forcing a re-poll per job lets the updated_at fairness sort rotate work across jobs every iteration. Callers who want a batch can still pass ?limit=N or ?page_size=N explicitly — back-compat preserved for the few non-polling ids_only consumers. No worker-side change required: existing clients iterate a list of zero or more and work identically with a list of length one.
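Roughly the override described here, as a sketch: `url_boolean_param` and `LimitOffsetPaginationWithPermissions` are project symbols referenced elsewhere in this PR's review discussion, and the base class shown is simplified.

```python
# Hedged sketch of the paginator override: pop-one default for polling workers,
# while an explicit ?limit=N keeps the old batched behavior.
from rest_framework import viewsets


class JobViewSet(viewsets.ModelViewSet):  # simplified base; the real base class differs
    @property
    def paginator(self):
        if (
            self.action == "list"
            and url_boolean_param(self.request, "ids_only", default=False)
            and "limit" not in self.request.query_params
        ):
            if not hasattr(self, "_paginator"):
                paginator = LimitOffsetPaginationWithPermissions()
                paginator.default_limit = 1
                self._paginator = paginator
            return self._paginator
        return super().paginator
```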
The updated_at sort had a degenerate case: freshly queued jobs all share near-identical updated_at (ties broken by pk), so simultaneous pollers deterministically pick the same "oldest" job. Replacing the sort with ORDER BY RANDOM() gives probabilistic disjoint assignment across concurrent pollers without writing a poll-stamp column. Combined with the limit=1 default on ?ids_only=1, each worker poll is an independent "pick any unfinished job" draw. Fairness is probabilistic rather than strictly oldest-first, but that's an acceptable trade given we want to avoid write-on-read. If we later decide to enforce strict fairness, the right follow-up is a dedicated last_polled_at column with UPDATE ... RETURNING semantics, not a return to the updated_at sort.
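In Django ORM terms this is a one-line change in the ids_only branch (variable names illustrative); `"?"` ordering compiles to `ORDER BY RANDOM()` on Postgres.

```python
# Hedged sketch: random ordering for the ids_only polling branch, so simultaneous
# pollers stop converging on the same head-of-queue job and no poll-stamp column
# is written on read.
if ids_only:
    jobs = jobs.order_by("?")
```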
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/views.py`:
- Around line 256-263: The fairness logic that orders by updated_at in the
ids-only polling path (inside the view handling self.action == "list" with
url_boolean_param(..., "ids_only")) breaks when JOB_LOG_PERSIST_ENABLED is False
because updated_at won't be refreshed by log writes; modify the handler so that
when ids_only is requested you perform an explicit lightweight poll
marker/reservation update (e.g., touch a last_polled_at or reservation timestamp
on the Job row or a dedicated PollMarker record) before returning IDs, or
alternatively gate the updated_at ordering behind JOB_LOG_PERSIST_ENABLED and
document that rotational fairness only applies when job-log persistence is
enabled; ensure you update the same queryset logic
(jobs.order_by("updated_at","pk")) and any transaction/locking around the
reservation to avoid races.
- Around line 268-285: The check that decides whether to force a single-item
paginator mistakenly looks for "page_size" in request.query_params; update the
conditional in the paginator initialization branch to check for "limit" instead
of "page_size" so LimitOffsetPaginationWithPermissions (and any DRF
LimitOffsetPagination-based pagination_class) behaves correctly when callers
pass ?ids_only=1&limit=N. Specifically, modify the conditional that sets
self._paginator (the block that creates LimitOffsetPaginationWithPermissions and
sets default_limit = 1) to verify "limit" not "page_size", keeping all other
logic (self.action == "list", url_boolean_param(self.request, "ids_only",
default=False), and "limit" not in self.request.query_params) intact and
referencing pagination_class/self._paginator as before.
Existing test_list_jobs_with_ids_only now passes ?limit=10 explicitly to assert the "list all" behavior. New test_list_jobs_ids_only_pops_one covers the new default where ?ids_only=1 returns one job to support pop()-style polling handoff across workers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pipeline-scoped heartbeat in _mark_pipeline_pull_services_seen only fires when a worker hits /tasks/ or /result/ on an active job, so workers idling on GET /jobs/?ids_only=1 between jobs had no heartbeat path. Their last_seen would age past PROCESSING_SERVICE_LAST_SEEN_MAX and the UI would flip them offline despite being actively online. Add _mark_async_services_seen_for_project + matching celery task that marks every async ProcessingService attached to the polling project as seen. Same Redis throttle pattern as the existing heartbeat (HEARTBEAT_THROTTLE_SECONDS = 30s), scoped by project rather than (pipeline, project) since the list endpoint has no pipeline context.
The 60s default worked on push-mode services but was still too tight for pull-mode ADC workers running cold-start GPU pipelines on 24 MB images (staging observed a nontrivial redelivery rate at 60s; demo held steady only after being bumped to 120s, then 300s). Raise the default to 300s so new deployments inherit the ceiling that staging/demo actually run at. max_deliver still bounds how long a genuinely crashed worker can hold tasks invisibly.
…s sent

The project-scoped heartbeat added in 44d4c32 never fires for the actual ADC worker request: ``GET /api/v2/jobs/?pipeline__slug__in=...&ids_only=1`` does not carry a ``project_id``, so ``get_active_project()`` returns None and the helper short-circuits. Result: async processing services remained ``last_seen_live=null`` forever on demo despite workers polling every 5s. The commit's original rationale — that the list endpoint has no pipeline context — was wrong: the worker's filter slugs are right there in the query string.

Add a sibling path that derives scope from those slugs:
- ``update_async_services_seen_for_pipelines(pipeline_slugs)`` celery task bumps ``last_seen`` on async services whose pipelines match any slug.
- ``_mark_async_services_seen_for_pipelines`` throttles dispatch via a Redis key scoped on the sorted slug set.
- The ``list()`` heartbeat branch prefers the pipeline-slug path when those slugs are present, and falls back to the project-scoped path for callers that send ``?project_id=``.

Tests cover the real worker shape (no project_id, slugs in query) and the task's filter semantics against sync + async + unrelated services.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
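A sketch of the throttled wrapper described above; the task name and Redis key shape follow the commit message, and the Django cache API is assumed to be the Redis-backed one.

```python
# Hedged sketch of the Redis-throttled dispatch wrapper: no DB work here, just a
# throttle gate in front of the celery task named in the commit.
from django.core.cache import cache

HEARTBEAT_THROTTLE_SECONDS = 30


def _mark_async_services_seen_for_pipelines(pipeline_slugs: list[str]) -> None:
    # cache.add() is a no-op when the key already exists, so concurrent pollers
    # for the same slug set share one celery dispatch per throttle window.
    key = "heartbeat:list:pipelines:" + ",".join(sorted(pipeline_slugs))
    if cache.add(key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS):
        update_async_services_seen_for_pipelines.delay(sorted(pipeline_slugs))
```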
Response to reviewer comments on the async_api concurrency tuning PR:

- views.py: mark the three `ids_only=1` overrides (random ordering, paginator default_limit=1, heartbeat dispatch) as temporary — they exist only because list() is doubling as a claim-next-job endpoint. A dedicated `/next` action is tracked as #1265; comments carry a hard-coded removal target of 2026-04-24 to keep the expiration visible.
- views.py: rewrite the `_mark_async_services_seen_for_pipelines` and `_mark_async_services_seen_for_project` docstrings so the contract is obvious — they are Redis-throttled wrappers around the matching celery task. No DB work in the wrapper.
- tasks.py: add TODO on both `update_async_services_seen_*` tasks pointing at #1194 (client-ID auth), so the "one poller falsely marks its peers live" invariant-break is visible in source, not just in the PR body.
- tasks.py: fix the stale comment in `_update_job_progress` — the previous wording claimed the narrowed save() avoids overwriting `updated_at`, but `updated_at` is in `update_fields` intentionally (Django skips auto_now when update_fields is provided).
- config/settings/base.py: add a one-line plain-language purpose for `JOB_LOG_PERSIST_ENABLED` so the flag reads at-a-glance.

No behavior changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Tuning PR for `async_api` row-lock and keep-up contention surfaced under concurrent ML jobs (issue #1256). Scope is what we can safely change while PR #1259 (the proper `JobLog` append-only child table) and #1256 option D (atomic `jsonb_set` progress update) land separately.

- Drop `select_for_update()` in `_update_job_progress` — the worker-side serialization point. Keeps `transaction.atomic()` and the `max()` guard that prevents progress regression.
- Add `JOB_LOG_PERSIST_ENABLED` feature flag (default `True`) — gates the per-record UPDATE in `JobLogHandler.emit`. Setting to `False` is a time-bounded escape hatch until PR #1259's (Refactor job logging to use separate table) append-only `JobLog` child table is in place.
- Narrow `_update_job_progress`'s save to `update_fields` — limits the write to `progress`, `status`, `finished_at`, `updated_at` so a concurrent worker's mutation of `logs`/`progress.errors` isn't clobbered by a stale read-modify-write. Addresses PR review feedback.
- Wire `NATS_TASK_TTR` env var, default 300s — the ack-wait timeout in `nats_queue.py` was reading `settings.NATS_TASK_TTR` via `getattr` with a `30` fallback, but the setting was never declared. Effective ack-wait was pinned at 30s everywhere, which is too tight for 24 MB images on GPU cold-start. Initially raised to 60s, then to 300s (commit 259c6592) to match what staging/demo are actually running — 60s was still tight for cold-start GPU pipelines on large images. `max_deliver` (prod=5, demo=2) bounds how long a genuinely crashed worker can hold tasks invisibly. See "NATS redelivery amplifies Django contention" below.
- Docs + script: `docs/claude/debugging/row-lock-contention-reproduction.md` and `scripts/load_test_result_endpoint.py` for local reproduction.
- Fair worker polling via `updated_at` — then swapped for random shuffle (commits 014b8c0c, 7445f6ea). Worker polling through `?ids_only=1&incomplete_only=1` was returning jobs in insertion order, so concurrent pollers deterministically converged on the same head-of-queue job and later jobs starved. Initial fix was `ORDER BY updated_at, pk` — leveraging the fact that `updated_at` freshens on every progress/log write, so active jobs drop to the bottom and starved jobs bubble to the top. That has a degenerate case at startup (freshly queued jobs share near-identical timestamps, tie-broken by `pk`, so simultaneous polls still pick the same job). Replaced with `ORDER BY RANDOM()`, which gives probabilistic disjoint assignment across concurrent pollers without writing a poll-stamp column. Fairness is probabilistic rather than strictly oldest-first; strict fairness is a follow-up (dedicated `last_polled_at` column with `UPDATE ... RETURNING` semantics).
- Default `?ids_only=1` to `limit=1` (commit 7b7dfa5d). Worker polling used to get a paginated batch of 10 jobs and drain them serially in `for job_id in jobs: _process_job(job_id)` (see `ami-data-companion/trapdata/antenna/worker.py:106`). The cached list caused later jobs in the batch to starve until the earlier jobs finished. Changing the ids_only response to length-1 forces a re-poll per job, which re-samples the random order every iteration. Callers that genuinely want a batch can still pass `?limit=N` explicitly — back-compat preserved for any non-polling `ids_only` consumers. Implemented by overriding `JobViewSet.paginator` to return a `LimitOffsetPagination` with `default_limit=1` when `ids_only=1` is set and no explicit `limit`/`page_size` is in the query string. Combined with the random ordering above, each worker poll becomes an independent "pick any unfinished job" draw.
- Heartbeat on idle `?ids_only=1` polls (commits 44d4c322, 695f7616). `_mark_pipeline_pull_services_seen` only fires from `/tasks/` and `/result/` — i.e., from workers with active work — so a worker sitting on `GET /jobs/?ids_only=1` between jobs had no heartbeat path and would age past `PROCESSING_SERVICE_LAST_SEEN_MAX` (60s), flipping its UI status to offline despite still polling.
  The initial commit (44d4c32) added a project-scoped heartbeat (`_mark_async_services_seen_for_project`). On demo that heartbeat never fired: the real ADC worker request is `GET /jobs/?pipeline__slug__in=<slugs>&ids_only=1&...` with no `project_id` in the query string (one worker serves pipelines across multiple projects and has no single project to nominate), so `get_active_project()` returned `None` and the dispatch silently short-circuited.
  Follow-up commit 695f7616 adds a sibling path — `_mark_async_services_seen_for_pipelines` + `update_async_services_seen_for_pipelines` — that derives scope from the `pipeline__slug__in` values in the query and bumps `last_seen`/`last_seen_live` on every async `ProcessingService` whose `pipelines` match. The `list()` heartbeat branch now prefers the pipeline-slug path when those slugs are present and falls back to project scope for callers that do send `?project_id=`. Same Redis gate (`HEARTBEAT_THROTTLE_SECONDS = 30s`), keyed on the sorted slug set (`heartbeat:list:pipelines:<sorted-slugs>`) so concurrent pollers for the same pipelines share a single dispatch per window.
  Verified on demo post-deploy: `last_seen_live` flipped from `false` → `true` on all three async worker entries (gpu-02, gpu-03, gpu-04) within one heartbeat window. Narrowing the scope via `AMI_ANTENNA_SERVICE_NAME` (header-based, precise per-service identity) is tracked as #1264 (Narrow idle-poll heartbeat scope via X-Processing-Service-Name header).
- Disable cachalot around `process_nats_pipeline_result` (commit 826e2bb2). The pipeline-result Celery task runs inside a hot UPDATE path on `jobs_job` and triggers cachalot's post-write invalidation across a large set of tables on every call. Wrapping the task body in `cachalot_disabled()` skips those invalidations for this specific code path — the task doesn't query the cached tables anyway, so there's nothing to invalidate. Cuts per-task overhead in concurrent-result bursts. No behavior change to callers.
Reviewer takeaway addressed in 9cee66d8: the `?ids_only=1` branch of `list()` (random ordering, `limit=1` paginator override, heartbeat dispatch) has quietly turned this endpoint into a claim-next-job call. Correct shape is a dedicated `GET /jobs/next/` action, tracked as #1265. The three overrides now carry `⚠️ TEMPORARY HACK — remove by 2026-04-24` markers in code with back-references to #1265, so the expiration stays visible until `/next` ships and ADC migrates.

Related follow-up issue on the worker side: ami-data-companion#143 — once the `/jobs` endpoint is out of the way, starvation moves one layer down to the NATS-task consumption pattern: a worker that has reserved tasks from job A via `max_ack_pending` then serializes job B's batch leaves A's reservations idle until ack_wait expires and NATS redelivers. Orthogonal to this PR.
Context
Under concurrent `async_api` result processing, three paths all mutate the same `jobs_job` row:

1. `select_for_update` in `_update_job_progress`. (addressed by commit 1 here)
2. `JobLogHandler.emit` — UPDATE on `jobs_job.logs` per log line. The dominant bottleneck after commit 1 lands, because the view's per-iter `job.logger.info("Queued pipeline result: …")` fires once per result in a batched POST body, stacking N UPDATEs in one `ATOMIC_REQUESTS` tx. (escape hatch via flag in commit 2; proper fix in PR #1259, Refactor job logging to use separate table)
3. `ATOMIC_REQUESTS` holding the `/result` and `/tasks` view tx across broker RPCs. (out of scope — future PR with `@transaction.non_atomic_requests`)

Commit 1 alone shifts the bottleneck from #1 to #2 rather than eliminating it (observed: `blocker_chain` grows from ~10 deep to ~56, queue still backs up). The flag in commit 2 provides a way to actually get the workshop through the weekend while PR #1259 goes through review.
NATS redelivery amplifies Django contention
With `ack_wait=30s` pinned, any task that took longer than 30s to finish (S3 download + GPU cold-start + batch inference + synchronous POST) would be redelivered by NATS before the original worker got a chance to ack. Observed in the most recent 6-job load test: one 100-image job had 100 of 100 tasks redelivered — every task ran twice end-to-end.

The feedback loop that matters for this PR: each redelivered task produces a second POST to `/api/v2/jobs/<id>/result/`, which enqueues a second `process_nats_pipeline_result` celery task, which contends for the same `jobs_job` row as the first. So a short ack-wait directly multiplies the very row-lock load that commits 1-3 are trying to relieve — turning a throughput shortfall on the ML side into doubled Django-side contention.

Wiring `NATS_TASK_TTR` and raising the default (60s → 300s across the branch, final value landed in 259c6592) cuts the redelivery rate; deployments seeing cold-start cost on 24 MB images can still bump further via env var. Demo runs the code default (300s).
Evidence
- py-spy before commit 1: 7/8 `ForkPoolWorker` threads stuck at …
- py-spy after commit 1, flag still True: same threads now stuck at …
- Staging-equivalent deployment, 5 concurrent `async_api` jobs (10/100/500 images, same pipeline): `pg_stat_activity` blocker-chain depth …; `idle_in_tx` holding `UPDATE jobs_job.logs` for 60s+; `ml_results` queue under load …
- Local reproduction (dev compose, `WEB_CONCURRENCY=1`, 8 ml fork workers, batched POST test at 50 results × 10 concurrent): `blocker_chain` with flag true (default) vs false …
- 6-job concurrent retest on demo (commits 1-3, flag=False, NATS_TASK_TTR=30 i.e. unwired): all 6 jobs eventually SUCCESS, confirming commits 1-3 are correctness-safe under concurrent load. First job solo ran at 1.4 img/s (356s for 500); subsequent concurrent jobs ran 2.3-11× slower with high redelivery (j172 = 100/100 tasks redelivered, j171 = 37/100, j173 = 93/500). Motivates commit 4.

Reproduction steps, prereqs, the Postgres query, and the load script are in `docs/claude/debugging/row-lock-contention-reproduction.md`. A single-result-per-POST curl loop does not reproduce the pathology; batching results per POST is load-bearing because the per-iter `job.logger.info` inside the view is where the UPDATE stacking happens.

Trade-offs
Commit 1 — count drift under race. The `max()` guard still prevents progress-percentage regression between concurrent workers. Accumulated counts (detections, classifications, captures) can drift by one batch if two workers race the read-modify-write cycle. Cosmetic: underlying `Detection` and `Classification` rows are written authoritatively by `save_results()`, which runs before `_update_job_progress`. A fully atomic version with `jsonb_set` and server-side arithmetic is possible as a follow-up (#1256 option D).
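For reference, one possible shape of that follow-up, sketched with an illustrative JSON path and the `jobs_job` table name from this PR; not part of this change.

```python
# Hedged sketch of a server-side atomic progress bump (#1256 option D): a single
# UPDATE, so no read-modify-write window exists. The path ("stages" -> stage_key
# -> "progress") is illustrative, not the real progress schema.
from django.db import connection


def bump_stage_progress_atomically(job_id: int, stage_key: str, new_value: float) -> None:
    path = ["stages", stage_key, "progress"]
    with connection.cursor() as cursor:
        cursor.execute(
            """
            UPDATE jobs_job
            SET progress = jsonb_set(
                progress,
                %s::text[],
                to_jsonb(GREATEST((progress #>> %s::text[])::numeric, %s::numeric))
            )
            WHERE id = %s
            """,
            [path, path, new_value, job_id],
        )
```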
Commit 2 — UI log feed goes blank while flag is off. Container stdout
still captures every line via the module-level app logger, so ops
observability is unchanged. Only the per-job UI log view loses new
entries for the duration the flag is disabled. Default stays
`True` so existing deployments aren't affected.
Commit 4 — longer ack-wait delays crash detection. If a worker truly
dies mid-task, the `NATS_TASK_TTR` seconds are wasted before redelivery. At the 300s default this is still acceptable for GPU cold-start on 24 MB images — the redelivery-amplification cost dominates the crash-detection latency under the workloads we see. Deployments with smaller payloads or stricter freshness SLAs can tune down via env. `max_deliver` (prod=5, demo=2) is the cap that prevents infinite retries.
Rollout plan
- Merge to `main`; keep the flag default `True` and `NATS_TASK_TTR` default 300. Matches what demo/staging are already running.
- Deployments hitting row-lock contention set `JOB_LOG_PERSIST_ENABLED=false` on their Django container env and restart. Revert by unsetting or switching to `true`.
- Deployments can bump `NATS_TASK_TTR` further via env; the 300s default is sized for the 24 MB image + H100 cold-start case we've been testing.
- Once PR #1259 lands, the `True` log path migrates to the new `JobLog` table write; the flag can be removed in a cleanup PR.
- Once #1256 option D (atomic `jsonb_set` progress update) lands, commit 1's count-drift trade-off goes away.
Tested / verified
- Unit tests (local, `docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs.tests.test_jobs`): … `TestListEndpointHeartbeat` class covering both the project-scoped and pipeline-slug heartbeat paths (7 cases). Also green on CI for the latest SHA.
- Local reproduction (dev compose, `WEB_CONCURRENCY=1`, 8 ml fork workers, `scripts/load_test_result_endpoint.py` at 50 results × 10 concurrent):
  - … `select_for_update` to `JobLogHandler.emit`; `blocker_chain` grows from ~10 to ~37. 0/10 batched POSTs succeed in 120s.
  - `JOB_LOG_PERSIST_ENABLED=false`: `blocker_chain` ≤ 1 transient, 20/20 batched POSTs succeed, p95 ~5s. Numbers match the evidence table above.
- Staging-equivalent deployment (Beluga), 5 concurrent `async_api` jobs (10/100/500 images, same pipeline):
  - … `idle_in_tx` holding `UPDATE jobs_job.logs` for 60s+, `ml_results` queue climbs unboundedly.
  - … ~1150 then drains to 0, no `idle_in_tx` holders. Confirms commits 1-3 are correctness-safe under concurrent load.
- Demo env (demo.antenna.insectai.org):
  - … `NATS_TASK_TTR` (j171-j176, mixed sizes 10/10/100/100/500/500): all 6 jobs eventually SUCCESS despite high redelivery (j172 = 100/100 tasks redelivered). Motivated commit 4.
  - … 9cee66d8. Verified via `curl demo.antenna.insectai.org/api/v2/ml/processing_services/?project_id=27` that `last_seen_live` flipped from `false` (stuck for hours) → `true` on all three async worker entries (gpu-02, gpu-03, gpu-04) within one heartbeat window post-deploy — confirms the pipeline-slug heartbeat path fires for the real ADC worker request shape.
- antenna-dev-serbia + antenna-dev-arctia (full-stack dev benches running the branch end-to-end): `test_ml_job_e2e` SUCCESS on both boxes on the integration branch merged with #1259 (Refactor job logging to use separate table) (CPU-only serbia and H100-24GB arctia, respectively).
- Not yet verified: … `JOB_LOG_PERSIST_ENABLED` flipped on (prod will default `True`; the flag-off path is covered by the staging 5-job test above, not by a prod run).
Related
- PR #1259: `JobLog` child table (permanent fix that deprecates the flag)