fix(jobs): throttle and defer pipeline heartbeat update #1258
Conversation
Move _mark_pipeline_pull_services_seen off the HTTP request path by dispatching a new Celery task (update_pipeline_pull_services_seen) via .delay() from the /tasks and /result endpoints. The task throttles DB writes to once per ~30s per job, cutting concurrent UPDATE pressure under async_api load. Adds 6 unit tests covering the dispatch, throttle, and no-op edge cases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
📝 Walkthrough

This pull request refactors the pipeline service heartbeat mechanism from synchronous database updates to asynchronous Celery task dispatch. A new task with write-throttling (30-second window) handles updating the services' last-seen timestamps.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as HTTP Client
    participant View as Job API Endpoint
    participant Queue as Celery Task Queue
    participant Worker as Task Worker
    participant DB as Database
    Client->>View: POST /api/job-tasks or /api/job-result
    activate View
    View->>Queue: update_pipeline_pull_services_seen.delay(job_id)
    deactivate View
    View-->>Client: Response (202/200)
    activate Queue
    Queue->>Worker: Route Task
    deactivate Queue
    activate Worker
    Worker->>DB: Load Job (with pipeline)
    activate DB
    DB-->>Worker: Job data
    deactivate DB
    Worker->>DB: Read last_seen for async services
    activate DB
    DB-->>Worker: Service timestamps
    deactivate DB
    alt Within Throttle Window (< 30s old)
        Worker->>Worker: Skip update (cheap read)
    else Outside Throttle Window
        Worker->>DB: UPDATE ProcessingService<br/>(last_seen=now, last_seen_live=true)
        activate DB
        DB-->>Worker: Update complete
        deactivate DB
    end
    deactivate Worker
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 3 passed
🧹 Nitpick comments (1)
ami/jobs/tasks.py (1)
82-86: Minor: avoid evaluating the heartbeat QuerySet twice.
`recent_seen = services_qs.values_list("last_seen", flat=True)` is a lazy `QuerySet`. On line 83, `if recent_seen` issues an `EXISTS`/`LIMIT 1` query, and then `for ts in recent_seen` issues the full `SELECT` again — so on the "stale" path we're running three DB statements (truthiness probe + full SELECT + UPDATE) instead of two. Since the whole point of this task is to cut DB chatter under concurrent requests, it's worth materializing once and letting an empty list short-circuit the UPDATE:

♻️ Suggested refactor
```diff
-# Cheap read: skip the UPDATE if every matching service was seen recently.
-recent_seen = services_qs.values_list("last_seen", flat=True)
-if recent_seen and all(ts is not None and ts >= throttle_cutoff for ts in recent_seen):
-    return
-
-services_qs.update(last_seen=now, last_seen_live=True)
+# Cheap read: skip the UPDATE if every matching service was seen recently.
+recent_seen = list(services_qs.values_list("last_seen", flat=True))
+if not recent_seen:
+    # No matching async services — nothing to heartbeat.
+    return
+if all(ts is not None and ts >= throttle_cutoff for ts in recent_seen):
+    return
+
+services_qs.update(last_seen=now, last_seen_live=True)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/tasks.py` around lines 82 - 86, The QuerySet recent_seen (from services_qs.values_list("last_seen", flat=True)) is being evaluated multiple times causing extra DB queries; materialize it once into a list (e.g., recent_seen_list = list(services_qs.values_list(...))) and then use that list in the truthiness check and the all(...) loop against throttle_cutoff, keeping the services_qs.update(last_seen=now, last_seen_live=True) unchanged so you only run the SELECT once before deciding to return or update.
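The double evaluation the nitpick describes is generic lazy-iterable behavior, not Django-specific (whether Django's `QuerySet` caches after a truthiness check varies by code path; the toy below models the worst case the comment describes). All names here are hypothetical stand-ins — a counting fake "QuerySet" shows the extra hit and how `list()` avoids it:

```python
class CountingQuerySet:
    """Toy stand-in for a lazy QuerySet: each evaluation counts as a 'DB' hit."""

    def __init__(self, rows):
        self.rows = rows
        self.db_hits = 0

    def __iter__(self):
        self.db_hits += 1  # full SELECT
        return iter(self.rows)

    def __bool__(self):
        self.db_hits += 1  # truthiness probe (modeled as an EXISTS/LIMIT 1 query)
        return bool(self.rows)


# Lazy path: truthiness probe + full iteration = 2 hits.
qs = CountingQuerySet([1, 2, 3])
if qs and all(x > 0 for x in qs):
    pass
lazy_hits = qs.db_hits

# Materialized path: one evaluation, then plain list operations = 1 hit.
qs2 = CountingQuerySet([1, 2, 3])
rows = list(qs2)
if rows and all(x > 0 for x in rows):
    pass
materialized_hits = qs2.db_hits
```

Here `lazy_hits` is 2 while `materialized_hits` is 1 — the saving the refactor is after.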
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8c627f4f-2a9d-4f2f-9fde-7d99386df3b1
📒 Files selected for processing (3)

- ami/jobs/tasks.py
- ami/jobs/tests/test_jobs.py
- ami/jobs/views.py
Pull request overview
This PR reduces database lock contention from async worker polling by moving the “pipeline pull services heartbeat” update out of request/response flow and into a throttled Celery task.
Changes:
- Enqueue heartbeat updates via a new `update_pipeline_pull_services_seen` Celery task instead of doing inline `QuerySet.update()` in the views.
- Add task-level throttling intended to skip frequent redundant heartbeat writes.
- Add unit tests covering view dispatch and task update/skip/no-op behaviors.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| ami/jobs/views.py | Switches heartbeat marking from synchronous DB update to Celery `.delay()` dispatch. |
| ami/jobs/tasks.py | Adds the new heartbeat task and throttle logic to reduce write frequency under concurrency. |
| ami/jobs/tests/test_jobs.py | Adds tests validating view dispatch and task throttling/no-op scenarios. |
* fix heartbeat review feedback

  Agent-Logs-Url: https://github.com/RolnickLab/antenna/sessions/bc1907bb-7118-4133-abab-4c4dd852ecc0
  Co-authored-by: mihow <158175+mihow@users.noreply.github.com>

* refine heartbeat timestamp handling

  Agent-Logs-Url: https://github.com/RolnickLab/antenna/sessions/bc1907bb-7118-4133-abab-4c4dd852ecc0
  Co-authored-by: mihow <158175+mihow@users.noreply.github.com>

* align heartbeat timestamps with local time

  Agent-Logs-Url: https://github.com/RolnickLab/antenna/sessions/bc1907bb-7118-4133-abab-4c4dd852ecc0
  Co-authored-by: mihow <158175+mihow@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mihow <158175+mihow@users.noreply.github.com>
Add a view-level cache.add() gate in _mark_pipeline_pull_services_seen keyed on (pipeline_id, project_id) with a HEARTBEAT_THROTTLE_SECONDS timeout. Previously every /tasks and /result request enqueued a Celery task whose sole job was usually to check the throttle and return; now most requests skip the enqueue entirely. The task's own stale-row check remains as a safety net under cache eviction. Key layout is intentionally noted to move to heartbeat:service:<id> once per-service identification lands via application-token auth (PR #1117), so one service's poll cannot suppress another's heartbeat. Adds test_view_gate_suppresses_redundant_dispatches; clears cache in setUp so the gate does not leak state between tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
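The `cache.add()` gate pattern above can be sketched without Django or Redis. This is an illustration only: a dict-backed stand-in whose `add()` is first-writer-wins, with the key layout and function names hypothetical (the real view calls `update_pipeline_pull_services_seen.delay(job.pk)` where this sketch appends to a list):

```python
import time

HEARTBEAT_THROTTLE_SECONDS = 30


class FakeCache:
    """Dict-backed stand-in for Django's cache; add() succeeds only for the first caller."""

    def __init__(self):
        self._store = {}  # key -> expiry time

    def add(self, key, value, timeout):
        now = time.monotonic()
        expires = self._store.get(key)
        if expires is not None and expires > now:
            return False  # key still live: another caller already holds the gate
        self._store[key] = now + timeout
        return True


cache = FakeCache()
dispatched = []  # stands in for Celery .delay() calls reaching the broker


def mark_pipeline_pull_services_seen(pipeline_id, project_id, job_pk):
    # One gate per (pipeline, project); key layout is illustrative.
    key = f"heartbeat:pipeline:{pipeline_id}:project:{project_id}"
    if not cache.add(key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS):
        return  # a recent heartbeat already fired; skip the enqueue entirely
    dispatched.append(job_pk)


# Five rapid calls in the same window collapse to a single dispatch.
for _ in range(5):
    mark_pipeline_pull_services_seen(7, 3, job_pk=42)
```

On a real Redis backend `cache.add` is atomic across processes, which is what makes this safe under concurrent gunicorn workers.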
```python
self.assertIn("async_api", resp.json()[0].lower())
```

```python
class TestPipelineHeartbeatTask(APITestCase):
```
This is a lot of tests for a "nice to have" feature that is slowing down our required core features. What is necessary?
The view's Redis cache gate (`HEARTBEAT_THROTTLE_SECONDS`, added in fd8e379) is the load-bearing throttle — it skips the whole `.delay()` when a recent heartbeat has already fired for the same (pipeline, project). With dispatch already gated, the task doesn't need its own throttle/staleness machinery.

- Drop the `seen_at_iso` param, Q-filter, `.exists()` preflight, and `expires=`.
- The task is now just `.update(last_seen=now(), last_seen_live=True)`.
- Drop 5 task-body tests (staleness, skip-when-recent, no-op-for-missing-job, no-op-for-no-pipeline, regression-guard) that exercised logic we removed. Keep 4: two endpoint dispatch smokes, broker-failure tolerance, and the view-gate suppression test.

Crash-safety is unchanged: the write is still off the gunicorn request path, which was the SIGSEGV mitigation.
Problem
Under concurrent async_api jobs, N gunicorn workers call `_mark_pipeline_pull_services_seen` simultaneously from both the `/tasks` (GET) and `/result` (POST) endpoints. The function issued an unguarded `QuerySet.update()` that hit every async `ProcessingService` row matching the (pipeline, project) pair — so every request wrote the same shared rows with no throttle, no `F()` expression, and no coordination.

py-spy dumps taken 26s and 57s before a gunicorn SIGSEGV in a deployment under concurrent load showed two threads blocked in this exact `.update()` → psycopg `wait()`. The function runs inside cachalot's monkey-patch, which layers cache invalidation on top of the concurrent UPDATE. Whether the lock contention is a direct contributor to the C-level crash is still being investigated separately; this PR is scoped to relieve the concurrency hotspot regardless of that outcome.

The heartbeat is a visibility feature — `check_processing_services_online` reads `last_seen` against a 60s window — not a correctness-critical path. Writing on every request is unnecessary.

Goal
Get this DB write off the gunicorn request path entirely, and stop firing it on every request when once per ~30s is enough to keep the 60s offline check honest.
Changes
1. Fire-and-forget via Celery. The view now dispatches `update_pipeline_pull_services_seen.delay(job.pk)` (new task in `ami/jobs/tasks.py`) instead of running the UPDATE inline. The HTTP request returns without waiting on the DB write, so the gunicorn path — where the SIGSEGV was observed — no longer touches this UPDATE at all. The task itself is minimal: one `QuerySet.update()`.

2. Throttle at the view, before dispatch. Before `.delay()`, the view takes a Redis cache lock via `cache.add(key, timeout=HEARTBEAT_THROTTLE_SECONDS)` keyed on (pipeline_id, project_id). `cache.add` is cross-process atomic on Redis, so concurrent gunicorn workers see exactly one winner per 30s window; the rest short-circuit before hitting the broker. This is what collapses the write rate — the task itself has no throttle because it doesn't need one.

3. Broker tolerance. If `.delay()` raises (broker down, connection reset, OSError), the view logs and continues. The heartbeat is non-critical; a broker blip should not fail the request.

4. Scope note. The existing filter — `pipeline.processing_services.async_services().filter(projects=job.project_id)` — is the tightest scope available until per-service application-token auth lands. Once #1117 is merged, the cache key and the UPDATE target should narrow to the individual calling service; the docstrings call this out and a tracking comment has been posted on #1117.

What we still need to verify
- The psycopg `wait()` → C-level crash link is inferred from py-spy stacks, not a controlled experiment. Moving the write off the request path should remove the specific contention signal py-spy saw, but the crash could have co-factors. Next step if it recurs: gdb + core dump on the crashing container.
- `last_seen` stays fresh enough under the 30s gate. The offline check uses a 60s window, so the margin is 2×. Should be fine, but flagging as a theoretical edge case if Celery queues back up.
- The gate collapses each (pipeline, project) to one `.delay()` per 30s, so task volume should scale roughly with `active_jobs / 30s` rather than request rate.

Tests
4 tests in `TestPipelineHeartbeatTask`:

- `test_tasks_endpoint_dispatches_heartbeat_task` — `.delay(job.pk)` fires from `/tasks`
- `test_result_endpoint_dispatches_heartbeat_task` — `.delay(job.pk)` fires from `/result`
- `test_tasks_endpoint_tolerates_heartbeat_dispatch_failure` — `/tasks` still returns 200 when the broker raises
- `test_view_gate_suppresses_redundant_dispatches` — 5 rapid calls collapse to 1 `.delay()`

Pre-existing failures in `TestJobView` (duplicate-key) are unrelated and present on `main`.

Relation to upcoming work
Once application-token auth (#1117) lands, this can be scoped from (pipeline, project) to the individual calling service — cutting the UPDATE target from N services to 1 and letting each service have its own independent 30s gate.

Summary by CodeRabbit
Performance Improvements
Testing