Conversation
Pull request overview
This PR moves high-frequency per-job log persistence from the jobs_job.logs JSON field into an append-only joined JobLog table, and updates the API/UI to read job logs via a dedicated endpoint (with legacy JSON fallback).
Changes:
- Added `JobLog` model + migration and updated `JobLogHandler.emit()` to insert joined log rows.
- Added `/api/v2/jobs/{id}/logs/` endpoint + serializer helpers to return formatted stdout/stderr with legacy JSON fallback.
- Updated UI job detail dialogs to fetch logs separately via a new `useJobLogs` hook; updated backend tests accordingly.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ui/src/pages/session-details/playback/capture-details/capture-job/capture-job-dialog.tsx | Fetch logs/errors via useJobLogs and pass into JobDetails. |
| ui/src/pages/jobs/jobs.tsx | Fetch logs/errors via useJobLogs and pass into JobDetails. |
| ui/src/pages/job-details/job-details.tsx | Make JobDetails accept logs/errors props and render them instead of job.logs/job.errors. |
| ui/src/data-services/hooks/jobs/useJobLogs.ts | New hook to fetch /jobs/{id}/logs/ and expose stdout/stderr arrays. |
| ami/ml/tests.py | Update pipeline error test to assert against JobLog rows. |
| ami/jobs/views.py | Add logs action endpoint returning serialized joined logs with a limit param. |
| ami/jobs/tests/test_jobs.py | Update assertions to read from JobLog rows; add logs endpoint test. |
| ami/jobs/tasks.py | Move per-job logger writes out of the select_for_update() transaction region. |
| ami/jobs/serializers.py | Add serialize_job_logs, adjust job serializers to use method field + new JobLogsSerializer. |
| ami/jobs/models.py | Add JobLog model and refactor JobLogHandler.emit() to append joined log rows. |
| ami/jobs/migrations/0021_joblog.py | Migration creating the JobLog table + index/ordering. |
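To make the serializer-helper behavior in the table concrete, here is a minimal pure-Python sketch of splitting joined log rows into the `{stdout, stderr}` shape the UI expects. The `JobLogRow` tuple and `split_log_rows` name are hypothetical, and which levels count as stderr is an assumption — this is not the actual Django code:

```python
from typing import NamedTuple

class JobLogRow(NamedTuple):
    """Hypothetical stand-in for a JobLog row (level + message)."""
    level: str
    message: str

def split_log_rows(rows: list[JobLogRow]) -> dict[str, list[str]]:
    """Split joined log rows into the {stdout, stderr} shape the UI reads.

    Assumption: warnings/errors route to stderr, everything else to stdout.
    """
    noisy = ("WARNING", "ERROR", "CRITICAL")
    stdout = [r.message for r in rows if r.level not in noisy]
    stderr = [r.message for r in rows if r.level in noisy]
    return {"stdout": stdout, "stderr": stderr}

rows = [
    JobLogRow("INFO", "job started"),
    JobLogRow("ERROR", "pipeline failed on image 3"),
    JobLogRow("INFO", "job finished"),
]
print(split_log_rows(rows))
```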
fix(jobs): short-circuit JobLogHandler.emit to break log-UPDATE contention

Under concurrent async_api load the per-record UPDATE on `jobs_job.logs` becomes the dominant bottleneck after the `select_for_update` fix in the previous commit. Observed on demo: 5 concurrent jobs produced a 56-deep `pg_blocking_pids` chain, all waiting on `UPDATE jobs_job SET logs = ...` from either /result (view, under ATOMIC_REQUESTS) or `_update_job_progress` (ML worker).

Short-circuit emit to the module-level logger only (container stdout). The per-job UI log feed goes blank while this hotfix is active; revisit once PR #1259 lands the append-only JobLog child table.
feat(jobs): add JOB_LOG_PERSIST_ENABLED flag + local repro docs

Replace the unconditional noop of `JobLogHandler.emit` from the previous commit with a feature flag (default True, preserving existing behavior). Deployments hitting row-lock contention on `jobs_job` can set `JOB_LOG_PERSIST_ENABLED=false` to short-circuit the per-record UPDATE until PR #1259's append-only JobLog table is in place.

Validated locally (dev compose, WEB_CONCURRENCY=1, 8 ML fork workers, batched POSTs at 50 results × 10 concurrent):

- flag=true: blocker_chain=37, 0/10 POSTs complete (120s timeout)
- flag=false: blocker_chain=1 (trivial), 20/20 POSTs complete, p95=5.5s

Also add:

- `scripts/load_test_result_endpoint.py` — standalone batched-POST driver. A single-result-per-POST curl loop does NOT reproduce the contention because it skips the per-iter `job.logger.info` call inside one ATOMIC_REQUESTS tx. Batching N>1 results per POST is load-bearing.
- `docs/claude/debugging/row-lock-contention-reproduction.md` — full runbook: pathology, prereqs, steps, `pg_stat_activity` queries, before/after signal table, flag usage.
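The flag-gated short-circuit can be sketched in isolation — a toy `emit` that always logs to the module-level logger but only persists per-job when the flag is on. `LOG_STORE` and the `persist_enabled` parameter are illustrative stand-ins, not the real handler or setting plumbing:

```python
import logging

logger = logging.getLogger("ami.jobs")

# Toy stand-in for the per-job persistence target (the real code does a DB write).
LOG_STORE: list[str] = []

def emit(message: str, persist_enabled: bool = True) -> None:
    """Always log to the module-level logger (container stdout);
    only persist per-job when the flag is on."""
    logger.info(message)
    if not persist_enabled:
        return  # flag off: skip the contended per-record write entirely
    LOG_STORE.append(message)

emit("processing image 1", persist_enabled=True)
emit("processing image 2", persist_enabled=False)
print(LOG_STORE)  # only the flag-on message was persisted
```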
4307d69 to 1cfe52a — Compare
Claude says (on behalf of @mihow): Force-pushed this branch to replace the earlier draft with a narrower scope. The core change — moving job-log writes off the parent row onto a child table — is preserved, but the new `/logs/` endpoint and UI rewiring from the earlier draft are dropped.

What stays:
What's removed:
What's new:
PR body has the full scope / out-of-scope list and local validation numbers. This PR depends on #1261 landing first (for the flag) — once that merges, this will rebase onto `main` and its diff will collapse to just the JobLog-specific commits. Next step on my side is soaking this under end-to-end concurrent-job load on a staging-equivalent deployment with a demo-DB copy, before requesting review.
2b0d524 to c320600 — Compare
`def serialize_job_logs(job: Job, *, limit: int = JOB_LOGS_DEFAULT_LIMIT) -> dict[str, list[str]]:`
Is it typical to have this in the serializer? feels like something for our jobs view
c320600 to 7207a29 — Compare
* **fix(jobs): drop select_for_update in _update_job_progress to break row-lock contention**

  Under concurrent async_api result processing, every ML result task queued a contending exclusive lock on the `jobs_job` row via `select_for_update()`, stacking behind gunicorn view threads also holding the row under ATOMIC_REQUESTS (from per-iter `job.logger.info` writes in /result). Observed in a validation environment under 5 concurrent async_api jobs (2x10 + 2x100 + 1x500 images, one pipeline): py-spy stacks of celeryworker_ml showed 7 of 8 ForkPoolWorker threads stuck in `psycopg wait` -> `_update_job_progress (tasks.py:493)`. pg_stat_activity consistently showed 17-20 backends "idle in transaction" holding `SELECT ... FOR UPDATE OF jobs_job` for 0.5-5+ seconds at a time.

  The `max()` guard below the dropped lock still prevents progress regression: two racing workers that both read `progress=0.4` and both compute a new value still write monotonically via `max(existing, new)`. The trade-off is that accumulated counts (detections, classifications, captures) can drift by one batch under a read-modify-write race — cosmetic only, since the underlying Detection/Classification rows are written authoritatively by `save_results` before this function runs.

  Refs: #1256 (log refactor proposal; this PR targets the parallel progress-update path, not the log-write path).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* **fix(jobs): short-circuit JobLogHandler.emit to break log-UPDATE contention**

  Under concurrent async_api load the per-record UPDATE on `jobs_job.logs` becomes the dominant bottleneck after the select_for_update fix in the previous commit. Observed on demo: 5 concurrent jobs produced a 56-deep pg_blocking_pids chain, all waiting on `UPDATE jobs_job SET logs = ...` from either /result (view, under ATOMIC_REQUESTS) or _update_job_progress (ml worker). Short-circuit emit to the module-level logger only (container stdout).
  The per-job UI log feed goes blank while this hotfix is active; revisit once PR #1259 lands the append-only JobLog child table.

* **feat(jobs): add JOB_LOG_PERSIST_ENABLED flag + local repro docs**

  Replace the unconditional noop of JobLogHandler.emit from the previous commit with a feature flag (default True, preserves existing behavior). Deployments hitting row-lock contention on jobs_job can set `JOB_LOG_PERSIST_ENABLED=false` to short-circuit the per-record UPDATE until PR #1259's append-only JobLog table is in place.

  Validated locally (dev compose, WEB_CONCURRENCY=1, 8 ml fork workers, batched POSTs at 50 results × 10 concurrent):

  - flag=true: blocker_chain=37, 0/10 POSTs complete (120s timeout)
  - flag=false: blocker_chain=1 (trivial), 20/20 POSTs complete, p95=5.5s

  Also add:

  - scripts/load_test_result_endpoint.py — standalone batched-POST driver. A single-result-per-POST curl loop does NOT reproduce the contention because it skips the per-iter job.logger.info call inside one ATOMIC_REQUESTS tx. Batching N>1 results per POST is load-bearing.
  - docs/claude/debugging/row-lock-contention-reproduction.md — full runbook: pathology, prereqs, steps, pg_stat_activity queries, before/after signal table, flag usage.

* **perf(jobs): wrap process_nats_pipeline_result with cachalot_disabled**

  Under concurrent async_api load the ML result task is a pure write path: save_results inserts Detection/Classification rows and _update_job_progress UPDATEs jobs_job. Cachalot's post-write invalidation added ~2.5s of overhead per task in production measurement (issue #1256 Path 4), which throttles the ml_results queue drain when ADC POSTs arrive faster than Celery can clear them. Nothing inside the task benefits from the query cache — every read is of a freshly-written row — so skipping invalidation is strictly a throughput win.

  Decorator stack: @celery_app.task wraps the cachalot-wrapped function, so Celery sees the context manager enter/exit on each task run.
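The decorator-stack point just described — the task wrapper sees the context manager enter/exit on every run — can be shown with a generic stand-in. `cache_disabled` and `task` here are toys, not the django-cachalot or Celery APIs:

```python
from contextlib import ContextDecorator
from functools import wraps

class cache_disabled(ContextDecorator):
    """Toy stand-in for a cache-suppressing context manager."""
    entered = 0  # counts __enter__ calls across all runs

    def __enter__(self):
        cache_disabled.entered += 1
        return self

    def __exit__(self, *exc):
        return False

def task(fn):
    """Toy stand-in for @celery_app.task: wraps the already-decorated function."""
    @wraps(fn)
    def runner(*args, **kwargs):
        return fn(*args, **kwargs)
    return runner

@task
@cache_disabled()          # inner decorator: cache is off for the body of each run
def process_result(n: int) -> int:
    return n * 2

process_result(1)
process_result(2)
print(cache_disabled.entered)  # the context manager entered once per task run
```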
  This was prototyped on demo during the 2026-04-20 investigation but never committed (demo was reset to clean main before PR #1261 was opened). Restoring it here closes the last documented ML-path throughput regression.

* **chore(jobs): address PR #1261 review feedback**

  - tasks.py: remove reference to non-existent planning doc; point at issue #1256 and PR #1261 instead (Copilot finding).
  - load_test_result_endpoint.py: move token to $ANTENNA_TOKEN env var (or optional --token) to keep secrets out of argv/shell history; validate batch/concurrency/rounds > 0; require http(s) scheme on --host; capture error type+reason per-request instead of returning bare -1 (CodeRabbit findings).
  - docs: update the repro runbook to match the new CLI shape.

  Co-Authored-By: Claude <noreply@anthropic.com>

* **fix(jobs): narrow job.save() to update_fields in _update_job_progress**

  Previously `job.save()` (no update_fields) wrote the entire row, which under concurrent async_api load meant a stale read-modify-write could clobber unrelated fields mutated by a concurrent worker — most notably `progress.errors` (appended by `_reconcile_lost_images`) and `logs` (appended by JobLogHandler.emit when persistence is enabled). Narrow the write to the fields this function actually mutates (progress, status, finished_at) plus updated_at so auto_now still fires.

  The underlying read-modify-write race remains — this only limits blast radius, not the race itself. The counter-drift and `max()` guard semantics discussed in the block comment above are unchanged.

  Address PR #1261 CodeRabbit review feedback on ami/jobs/tasks.py:514.

  Co-Authored-By: Claude <noreply@anthropic.com>

* **feat(settings): wire NATS_TASK_TTR env var with 60s default**

  nats_queue.py already reads `getattr(settings, "NATS_TASK_TTR", 30)`, but the setting was never declared in base.py — so the ack_wait was pinned at 30s everywhere regardless of env.
  Wire it through and raise the default to 60s: 30s is too tight for 24 MB images + GPU cold start + synchronous result POST, and caused spurious NATS redeliveries under concurrent load. Per-env override via NATS_TASK_TTR (demo bumps this further to 120s).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* **feat(jobs): fair worker polling via last-touched ordering**

  Workers polling `GET /api/v2/jobs/?ids_only=1&incomplete_only=1` currently get jobs in PK order (oldest first), so N concurrent pollers all race to the same first job. Under the workshop load (4 pollers × 4 concurrent jobs) this caused the drain-one-job-at-a-time pattern observed in the 6-job retest: j192 starved at 0% for ~500s while all pollers hammered j191, and NATS nearly exhausted max_deliver=2 on j192.

  Order ids_only responses by updated_at ASC (least-recently-touched first). updated_at freshens on every progress write, log write, or status transition, so actively-worked jobs drop to the bottom and starved jobs bubble to the top — rotation without a dedicated last_polled_at field.

  Fairness is a property of the serving endpoint, not of the worker. Third-party workers with their own consumption patterns should still see evenly-distributed handoffs; keeping the scheduling logic in Antenna rather than ADC preserves that guarantee.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* **feat(jobs): default ids_only to limit=1 for pop()-style polling**

  Worker polling (?ids_only=1) now hands back one least-recently-touched job per request rather than the default paginated batch of 10. Concurrent workers were caching the full list and draining it serially, which starved later jobs until the cache drained. Forcing a re-poll per job lets the updated_at fairness sort rotate work across jobs every iteration.

  Callers who want a batch can still pass ?limit=N or ?page_size=N explicitly — back-compat preserved for the few non-polling ids_only consumers.
  No worker-side change required: existing clients iterate a list of zero or more and work identically with a list of length one.

* **fix(jobs): randomize ids_only ordering instead of updated_at sort**

  The updated_at sort had a degenerate case: freshly queued jobs all share near-identical updated_at (ties broken by pk), so simultaneous pollers deterministically pick the same "oldest" job. Replacing the sort with ORDER BY RANDOM() gives probabilistic disjoint assignment across concurrent pollers without writing a poll-stamp column. Combined with the limit=1 default on ?ids_only=1, each worker poll is an independent "pick any unfinished job" draw.

  Fairness is probabilistic rather than strictly oldest-first, but that's an acceptable trade given we want to avoid write-on-read. If we later decide to enforce strict fairness, the right follow-up is a dedicated last_polled_at column with UPDATE ... RETURNING semantics, not a return to the updated_at sort.

* **test(jobs): update ids_only test for pop()-style default**

  Existing test_list_jobs_with_ids_only now passes ?limit=10 explicitly to assert the "list all" behavior. New test_list_jobs_ids_only_pops_one covers the new default where ?ids_only=1 returns one job to support pop()-style polling handoff across workers.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* **feat(jobs): heartbeat on idle ids_only polls**

  The pipeline-scoped heartbeat in _mark_pipeline_pull_services_seen only fires when a worker hits /tasks/ or /result/ on an active job, so workers idling on GET /jobs/?ids_only=1 between jobs had no heartbeat path. Their last_seen would age past PROCESSING_SERVICE_LAST_SEEN_MAX and the UI would flip them offline despite being actively online.

  Add _mark_async_services_seen_for_project + matching celery task that marks every async ProcessingService attached to the polling project as seen.
  Same Redis throttle pattern as the existing heartbeat (HEARTBEAT_THROTTLE_SECONDS = 30s), scoped by project rather than (pipeline, project) since the list endpoint has no pipeline context.

* **feat(settings): raise NATS_TASK_TTR default to 300s**

  The 60s default worked on push-mode services but was still too tight for pull-mode ADC workers running cold-start GPU pipelines on 24 MB images (staging observed a nontrivial redelivery rate at 60s; demo held steady only after being bumped to 120s, then 300s). Raise the default to 300s so new deployments inherit the ceiling that staging/demo actually run at. max_deliver still bounds how long a genuinely crashed worker can hold tasks invisibly.

* **fix(jobs): heartbeat on ids_only polls when only pipeline__slug__in is sent**

  The project-scoped heartbeat added in 44d4c32 never fires for the actual ADC worker request: `GET /api/v2/jobs/?pipeline__slug__in=...&ids_only=1` does not carry a `project_id`, so `get_active_project()` returns None and the helper short-circuits. Result: async processing services remained `last_seen_live=null` forever on demo despite workers polling every 5s. The commit's original rationale — that the list endpoint has no pipeline context — was wrong: the worker's filter slugs are right there in the query string.

  Add a sibling path that derives scope from those slugs:

  - `update_async_services_seen_for_pipelines(pipeline_slugs)` celery task bumps `last_seen` on async services whose pipelines match any slug.
  - `_mark_async_services_seen_for_pipelines` throttles dispatch via a Redis key scoped on the sorted slug set.
  - The `list()` heartbeat branch prefers the pipeline-slug path when those slugs are present, and falls back to the project-scoped path for callers that send `?project_id=`.

  Tests cover the real worker shape (no project_id, slugs in query) and the task's filter semantics against sync + async + unrelated services.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* **docs(jobs): address PR #1261 review feedback**

  Response to reviewer comments on the async_api concurrency tuning PR:

  - views.py: mark the three `ids_only=1` overrides (random ordering, paginator default_limit=1, heartbeat dispatch) as temporary — they exist only because list() is doubling as a claim-next-job endpoint. A dedicated `/next` action is tracked as #1265; comments carry a hard-coded removal target of 2026-04-24 to keep the expiration visible.
  - views.py: rewrite the `_mark_async_services_seen_for_pipelines` and `_mark_async_services_seen_for_project` docstrings so the contract is obvious — they are Redis-throttled wrappers around the matching celery task. No DB work in the wrapper.
  - tasks.py: add TODO on both `update_async_services_seen_*` tasks pointing at #1194 (client-ID auth), so the "one poller falsely marks its peers live" invariant-break is visible in source, not just in the PR body.
  - tasks.py: fix the stale comment in `_update_job_progress` — the previous wording claimed the narrowed save() avoids overwriting `updated_at`, but `updated_at` is in `update_fields` intentionally (Django skips auto_now when update_fields is provided).
  - config/settings/base.py: add a one-line plain-language purpose for `JOB_LOG_PERSIST_ENABLED` so the flag reads at-a-glance.

  No behavior changes.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
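Taken together, the `ids_only` changes above reduce a worker poll to "pick one unfinished job at random". A minimal plain-Python sketch of that claim pattern (the real version is `ORDER BY RANDOM()` with `limit=1` in the queryset; the function and variable names here are illustrative):

```python
import random

def poll_next_job_id(unfinished_job_ids: list[int], rng: random.Random) -> list[int]:
    """Simulate GET /jobs/?ids_only=1: return at most one randomly chosen job id.

    Returning a list keeps the worker contract unchanged — clients iterate
    zero-or-more ids and behave identically with a list of length one.
    """
    if not unfinished_job_ids:
        return []
    return [rng.choice(unfinished_job_ids)]

rng = random.Random(42)  # seeded only to make this sketch reproducible
jobs = [191, 192, 193, 194]
picks = [poll_next_job_id(jobs, rng)[0] for _ in range(8)]
print(picks)  # eight independent draws; concurrent pollers rarely all collide on one job
```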
Replaces the `jobs_job.logs` JSON-field UPDATE path in `JobLogHandler.emit` with an INSERT on a new `JobLog` child table. Under concurrent async_api load every log line triggered `UPDATE jobs_job SET logs = ...` on the shared job row; inside ATOMIC_REQUESTS a single batched /result POST stacked N such UPDATEs in one tx, blocking ML workers on the same row (issue #1256).

- `JobLog` model + migration 0021_joblog (FK job, level, message, created_at)
- `JobLogHandler.emit` -> `JobLog.objects.create` (gated by `JOB_LOG_PERSIST_ENABLED` flag introduced in PR #1261)
- `JobSerializer.logs` -> `SerializerMethodField` that reads JobLog rows on detail endpoint and falls back to the legacy `jobs_job.logs` JSON for jobs created before this table existed. List endpoint keeps the cheap legacy shape to avoid N+1.
- No UI changes, no new endpoint. On-wire {stdout, stderr} shape preserved.

Co-Authored-By: Claude <noreply@anthropic.com>
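The core swap — an append-only INSERT instead of a read-modify-write UPDATE of a JSON blob — can be contrasted in miniature. These are dict-backed toys standing in for the ORM code, not the actual models:

```python
# Legacy path: every log line re-reads and rewrites the whole JSON blob on the
# shared parent row — the write that contended under row locks.
def legacy_emit(job_row: dict, message: str) -> None:
    logs = dict(job_row["logs"])            # read the current blob
    logs["stdout"] = logs["stdout"] + [message]
    job_row["logs"] = logs                  # rewrite the parent row

# New path: one append-only child "row" per log line; the parent row is untouched.
def joblog_emit(job_log_table: list[dict], job_id: int, message: str) -> None:
    job_log_table.append({"job_id": job_id, "level": "INFO", "message": message})

job_row = {"id": 7, "logs": {"stdout": [], "stderr": []}}
job_log_table: list[dict] = []

legacy_emit(job_row, "old style")
joblog_emit(job_log_table, job_row["id"], "new style")

print(job_row["logs"]["stdout"])  # parent-row blob grew
print(job_log_table)              # child table got one appended row
```

Appends to a child table never contend on the parent row, which is exactly why the INSERT path breaks the lock chain described above.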
- Route existing tests that assert on `job.logs.stdout` to read from JobLog rows via a `joined_job_log_messages` helper
- Same for ami/ml/tests.py error-logging assertion
- Add `TestJobLogPersistence` class with 6 new cases:
  - emit inserts one JobLog per call; legacy JSON column stays empty
  - flag=False short-circuits emit (no rows, no JSON write)
  - `serialize_job_logs` reads JobLog rows newest-first and splits stderr
  - `serialize_job_logs` falls back to legacy JSON when no rows exist
  - `get_logs` on list action returns legacy shape (no JobLog query)
  - `get_logs` on retrieve action reads JobLog rows

47/47 pass in ami.jobs.tests.test_jobs.

Co-Authored-By: Claude <noreply@anthropic.com>
3381239 to a359686 — Compare
Summary
Move job-log persistence off the `jobs_job.logs` JSON-field UPDATE path onto a joined append-only `JobLog` table, without changing the UI or adding new endpoints. Replaces the earlier draft of this PR (new `/logs/` endpoint + UI rewire) with a narrower, serializer-compat approach.

List of Changes
- Add `JobLog` model and migration (`0021_joblog`) for append-only per-job log rows
- Refactor `JobLogHandler.emit()` to insert one joined log row instead of refreshing and rewriting the parent `jobs_job` row (gated by the `JOB_LOG_PERSIST_ENABLED` flag introduced in "fix(jobs): fixes for concurrent ML processing jobs" #1261)
- Change `JobListSerializer.logs`/`JobSerializer.logs` to a `SerializerMethodField` that reads joined rows on detail responses, falls back to the legacy `jobs_job.logs` JSON for jobs created before this table existed, and keeps the cheap legacy shape on list responses to avoid N+1

Not in this PR (deliberately deferred)
- New `/logs/` API endpoint (the earlier draft added one)
- UI changes — the UI keeps reading `job.logs.{stdout,stderr}` as before
- Backfilling legacy `jobs_job.logs` JSON into `JobLog` — old jobs keep surfacing through the fallback path until natural rotation drops them
- Dropping the `jobs_job.logs` JSONB column — separate cleanup PR after soak
- `jsonb_set` atomic version of `_update_job_progress` ("feat(job): refactor job logging so it isn't a bottleneck" #1256 option D) — insurance for higher concurrency, not needed now
- `@transaction.non_atomic_requests` on `/result/`/`/tasks/` views — separate PR

Related
- "fix(jobs): fixes for concurrent ML processing jobs" #1261 — drops `select_for_update` in `_update_job_progress` and adds the `JOB_LOG_PERSIST_ENABLED` flag this PR reuses. Needs to merge first; this PR rebases onto `main` after that lands.

Why this scope
The earlier draft of this PR added a `/api/v2/jobs/{id}/logs/` endpoint and changed 5 UI files to consume it. That's more surface area than the fix requires: the only functional change needed is moving the write path off the row-locked JSON field and onto a child table. Keeping the serializer shape identical lets the UI stay on its current contract, and a `SerializerMethodField` with a legacy-JSON fallback covers the read side without a schema break or a transition endpoint. Rollout is one Django migration + one code change, no UI deploy coordination.

How to Test
Unit:
Local reproduction of the row-lock pathology (prereqs and exact steps in `docs/claude/debugging/row-lock-contention-reproduction.md`):

Observed on the dev stack (WEB_CONCURRENCY=1, 8 Celery ML-fork workers), flag `JOB_LOG_PERSIST_ENABLED=true` (default with the new write path) — results table with columns `blocker_chain` peak, `blocker_chain` peak (20×50), and `idle_in_tx` (values did not survive extraction).

The latency delta vs. flag=false is the expected cost of actually persisting the rows (vs. short-circuiting emit entirely). The flag=true → JobLog path trades that small cost for no 120s-timeout stalls.
Deployment Notes
- Apply `ami/jobs/migrations/0021_joblog.py` before deploy so new writes land on the `JobLog` table. The migration is additive — no downtime, no backfill.
- Blocked on #1261 landing on `main`; will be rebased at that point so the diff collapses to the JobLog-specific changes only.

Checklist