diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 8acb718cd..4111034f8 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -8,6 +8,7 @@ import pydantic from celery import uuid from celery.result import AsyncResult +from django.conf import settings from django.db import models, transaction from django.utils.text import slugify from django_pydantic_field import SchemaField @@ -333,9 +334,21 @@ def __init__(self, job: "Job", *args, **kwargs): super().__init__(*args, **kwargs) def emit(self, record: logging.LogRecord): - # Log to the current app logger + # Log to the current app logger (container stdout). logger.log(record.levelno, self.format(record)) + # Gated by ``JOB_LOG_PERSIST_ENABLED`` (default True). Persisting every + # log line to ``jobs_job.logs`` becomes a row-lock contention point + # under concurrent async_api load — each call triggers + # ``UPDATE jobs_job SET logs = ...`` on the shared job row, and inside + # ``ATOMIC_REQUESTS`` a single batched ``/result`` POST stacks N such + # UPDATEs in one tx, blocking every ML worker on the same row for the + # duration of the request. Deployments hitting that pattern can set the + # flag to False to short-circuit here until PR #1259 lands an + # append-only ``JobLog`` child table. See issue #1256. + if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True): + return + # Write to the logs field on the job instance. # Refresh from DB first to reduce the window for concurrent overwrites — each # worker holds its own stale in-memory copy of `logs`, so without a refresh the diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 629c95828..f3c996f86 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING from asgiref.sync import async_to_sync, sync_to_async +from cachalot.api import cachalot_disabled from celery.signals import task_failure, task_postrun, task_prerun from django.db import transaction from redis.exceptions import RedisError @@ -76,6 +77,65 @@ def update_pipeline_pull_services_seen(job_id: int) -> None: ) +@celery_app.task( + soft_time_limit=10, + time_limit=15, + ignore_result=True, +) +def update_async_services_seen_for_pipelines(pipeline_slugs: list[str]) -> None: + """ + Heartbeat for idle worker polls on + ``GET /api/v2/jobs/?pipeline__slug__in=...&ids_only=1``. + + The ADC worker sends pipeline slugs but no project_id (one worker may serve + pipelines across many projects), so scope the heartbeat by the pipelines it + asked about. Marks every async ProcessingService linked to any of those + pipelines as seen. + + TODO: once #1194 (client-ID / application-token auth) lands, scope this + update to the specific calling ProcessingService rather than every service + matching the slugs. Currently one poller's heartbeat falsely marks its + peers live. + """ + from ami.ml.models import ProcessingService # avoid circular import + + if not pipeline_slugs: + return + + ProcessingService.objects.async_services().filter( + pipelines__slug__in=pipeline_slugs, + ).distinct().update( + last_seen=datetime.datetime.now(), + last_seen_live=True, + ) + + +@celery_app.task( + soft_time_limit=10, + time_limit=15, + ignore_result=True, +) +def update_async_services_seen_for_project(project_id: int) -> None: + """ + Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``. + + Fallback path used only when the request carries ``?project_id=`` without + ``pipeline__slug__in`` — the ADC worker does not currently send this shape, + so in practice the pipeline-slug task above is the one that fires. + + TODO: once #1194 (client-ID / application-token auth) lands, scope this + update to the specific calling ProcessingService rather than every async + service attached to the project. Currently one poller's heartbeat falsely + marks its peers live. + """ + from ami.ml.models import ProcessingService # avoid circular import + + ProcessingService.objects.async_services().filter(projects=project_id).update( + last_seen=datetime.datetime.now(), + last_seen_live=True, + ) + + @celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit) def run_job(self, job_id: int) -> None: from ami.jobs.models import Job @@ -158,6 +218,15 @@ def _log_worker_availability(job) -> None: soft_time_limit=300, # 5 minutes time_limit=360, # 6 minutes ) +# Disable cachalot cache invalidation for this task. Each call writes +# Detection/Classification rows and UPDATEs jobs_job; under concurrent +# async_api load, cachalot's post-write invalidation added ~2.5s/task +# (measured on demo, issue #1256 Path 4). This is a pure write path — +# nothing inside benefits from the query cache — so skipping invalidation +# is strictly a throughput win. Celery task decorator stack order matters: +# @celery_app.task wraps the cachalot-wrapped function, so Celery sees the +# cachalot context manager enter/exit on every task execution. +@cachalot_disabled() def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_subject: str) -> None: """ Process a single pipeline result asynchronously. @@ -489,8 +558,19 @@ def _update_job_progress( ) -> None: from ami.jobs.models import Job, JobState # avoid circular import + # NOTE: Previously this used `select_for_update()` inside `transaction.atomic()` + # to serialize concurrent progress updates for the same job. Under concurrent + # async_api result processing that serialization became a bottleneck: every + # ML result task queued a contending exclusive lock on the `jobs_job` row, + # stacking behind gunicorn view threads also holding the row under + # ATOMIC_REQUESTS. The `max()` guard below still prevents progress regression + # between concurrent workers; the trade-off is that accumulated counts + # (detections/classifications/captures) can drift by one batch under race — + # cosmetic only, since the underlying `Detection`/`Classification` rows are + # written authoritatively by `save_results` before this function runs. + # See issue #1256 and PR #1261. with transaction.atomic(): - job = Job.objects.select_for_update().get(pk=job_id) + job = Job.objects.get(pk=job_id) # For results stage, accumulate detections/classifications/captures counts if stage == "results": @@ -557,7 +637,14 @@ def _update_job_progress( job.progress.summary.status = complete_state job.finished_at = datetime.datetime.now() # Use naive datetime in local time job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") - job.save() + # Narrow the write to the fields we actually mutated. Without this, a full + # save() would overwrite `logs` and any other field on the instance + # fetched at the top of this block — so a concurrent worker's append to + # `progress.errors` (via `_reconcile_lost_images`) or log line (via + # JobLogHandler) could be clobbered by a stale read-modify-write. + # `updated_at` is listed explicitly because Django skips `auto_now` bumps + # when `update_fields` is provided. See PR #1261 review feedback. + job.save(update_fields=["progress", "status", "finished_at", "updated_at"]) try: _log_job_throughput(job, stage) except Exception as e: diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 90d1f6baa..ee8d833cd 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -375,7 +375,11 @@ def test_list_jobs_with_ids_only(self): self._create_job("Test job 3", start_now=False) self.client.force_authenticate(user=self.user) - jobs_list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + # Pass an explicit limit to override the pop()-style default (see test_list_jobs_ids_only_pops_one below). + jobs_list_url = reverse_with_params( + "api:job-list", + params={"project_id": self.project.pk, "ids_only": True, "limit": 10}, + ) resp = self.client.get(jobs_list_url) self.assertEqual(resp.status_code, 200) @@ -388,6 +392,21 @@ def test_list_jobs_with_ids_only(self): # Verify we don't get the full results structure self.assertNotIn("details", data["results"][0]) + def test_list_jobs_ids_only_pops_one(self): + """`?ids_only=1` without an explicit limit returns one job (pop()-style handoff).""" + self._create_job("Test job 2", start_now=False) + self._create_job("Test job 3", start_now=False) + + self.client.force_authenticate(user=self.user) + jobs_list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + resp = self.client.get(jobs_list_url) + + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["count"], 3) + self.assertEqual(len(data["results"]), 1) + self.assertIsInstance(data["results"][0]["id"], int) + def test_list_jobs_with_incomplete_only(self): """Test the incomplete_only parameter filters jobs correctly.""" # Create jobs via API @@ -1157,3 +1176,141 @@ def test_view_gate_suppresses_redundant_dispatches(self): _mark_pipeline_pull_services_seen(self.job) self.assertEqual(mock_delay.call_count, 1) + + +class TestListEndpointHeartbeat(APITestCase): + """ + Unit tests for _mark_async_services_seen_for_project and the list endpoint's + heartbeat dispatch on ``?ids_only=1`` polls. + """ + + def setUp(self): + from django.core.cache import cache + + cache.clear() + + self.project = Project.objects.create(name="List Heartbeat Project") + self.service = ProcessingService.objects.create( + name="List Heartbeat Worker", + endpoint_url=None, + ) + self.service.projects.add(self.project) + + self.user = User.objects.create_user(email="list-heartbeat@example.com", is_superuser=True, is_active=True) + self.client.force_authenticate(user=self.user) + + def test_list_with_ids_only_dispatches_heartbeat(self): + from unittest.mock import patch + + list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_called_once_with(self.project.pk) + + def test_list_without_ids_only_does_not_dispatch_heartbeat(self): + from unittest.mock import patch + + list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk}) + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_not_called() + + def test_list_heartbeat_tolerates_dispatch_failure(self): + """Broker unavailability on heartbeat enqueue must not break the list response.""" + from unittest.mock import patch + + from kombu.exceptions import OperationalError + + list_url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True}) + with patch( + "ami.jobs.views.update_async_services_seen_for_project.delay", + side_effect=OperationalError("broker unavailable"), + ): + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + + def test_view_gate_suppresses_redundant_list_dispatches(self): + """Rapid repeated list polls should dispatch at most once per throttle window.""" + from unittest.mock import patch + + from ami.jobs.views import _mark_async_services_seen_for_project + + with patch("ami.jobs.views.update_async_services_seen_for_project.delay") as mock_delay: + for _ in range(5): + _mark_async_services_seen_for_project(self.project.pk) + + self.assertEqual(mock_delay.call_count, 1) + + def test_list_with_pipeline_slugs_no_project_dispatches_heartbeat(self): + """Real ADC worker shape: ?ids_only=1&pipeline__slug__in=... with no project_id.""" + from unittest.mock import patch + + pipeline = Pipeline.objects.create(name="Heartbeat Pipeline", slug="heartbeat-pipeline") + self.service.pipelines.add(pipeline) + + list_url = reverse_with_params( + "api:job-list", + params={"ids_only": True, "pipeline__slug__in": "heartbeat-pipeline"}, + ) + with patch("ami.jobs.views.update_async_services_seen_for_pipelines.delay") as mock_delay: + resp = self.client.get(list_url) + + self.assertEqual(resp.status_code, 200) + mock_delay.assert_called_once_with(["heartbeat-pipeline"]) + + def test_task_updates_services_via_pipeline_slug(self): + """The pipeline-slug celery task marks matching async services live.""" + import datetime + + from ami.jobs.tasks import update_async_services_seen_for_pipelines + + pipeline = Pipeline.objects.create(name="Slug Pipeline", slug="slug-pipeline") + self.service.pipelines.add(pipeline) + unrelated = ProcessingService.objects.create(name="Unrelated Async", endpoint_url=None) + unrelated_last_seen_before = unrelated.last_seen + + before = datetime.datetime.now() - datetime.timedelta(seconds=1) + update_async_services_seen_for_pipelines(["slug-pipeline"]) + + self.service.refresh_from_db() + unrelated.refresh_from_db() + + self.assertTrue(self.service.last_seen_live) + self.assertIsNotNone(self.service.last_seen) + self.assertGreaterEqual(self.service.last_seen, before) + self.assertEqual(unrelated.last_seen, unrelated_last_seen_before) + + def test_task_updates_all_project_async_services(self): + """The celery task marks every async service on the project live.""" + import datetime + + from ami.jobs.tasks import update_async_services_seen_for_project + + other_async = ProcessingService.objects.create(name="Other Async", endpoint_url=None) + other_async.projects.add(self.project) + sync_service = ProcessingService.objects.create( + name="Sync Service", endpoint_url="http://nonexistent-host:9999" + ) + sync_service.projects.add(self.project) + sync_last_seen_before = ProcessingService.objects.get(pk=sync_service.pk).last_seen + + before = datetime.datetime.now() - datetime.timedelta(seconds=1) + update_async_services_seen_for_project(self.project.pk) + + self.service.refresh_from_db() + other_async.refresh_from_db() + sync_service.refresh_from_db() + + self.assertTrue(self.service.last_seen_live) + self.assertIsNotNone(self.service.last_seen) + self.assertGreaterEqual(self.service.last_seen, before) + self.assertTrue(other_async.last_seen_live) + # Sync services (with endpoint URL) are not touched by this task — last_seen + # may be set by the creation-time get_status() ping, but should be unchanged + # after the task runs. + self.assertEqual(sync_service.last_seen, sync_last_seen_before) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index ec9d64481..44516ca14 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -16,6 +16,7 @@ from rest_framework.filters import BaseFilterBackend from rest_framework.response import Response +from ami.base.pagination import LimitOffsetPaginationWithPermissions from ami.base.permissions import ObjectPermission from ami.base.views import ProjectMixin from ami.jobs.schemas import ids_only_param, incomplete_only_param @@ -25,7 +26,13 @@ MLJobTasksRequestSerializer, MLJobTasksResponseSerializer, ) -from ami.jobs.tasks import HEARTBEAT_THROTTLE_SECONDS, process_nats_pipeline_result, update_pipeline_pull_services_seen +from ami.jobs.tasks import ( + HEARTBEAT_THROTTLE_SECONDS, + process_nats_pipeline_result, + update_async_services_seen_for_pipelines, + update_async_services_seen_for_project, + update_pipeline_pull_services_seen, +) from ami.main.api.schemas import project_id_doc_param from ami.main.api.views import DefaultViewSet from ami.utils.fields import url_boolean_param @@ -51,6 +58,54 @@ def _actor_log_context(request) -> tuple[str, str | None]: return user_desc, token_fingerprint +def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) -> None: + """ + Redis-throttled wrapper around the ``update_async_services_seen_for_pipelines`` + celery task. The wrapper does no DB work itself — it gates dispatch so at + most one heartbeat is enqueued per sorted slug set per + ``HEARTBEAT_THROTTLE_SECONDS`` window (currently 30s), keeping the HTTP + request path cheap under concurrent polling. + + Called from the ``?ids_only=1`` branch of ``JobViewSet.list()`` — the real + ADC worker shape, which sends ``pipeline__slug__in=`` and no + ``project_id`` (one worker may serve pipelines across many projects and + has no single project to nominate). + """ + if not pipeline_slugs: + return + cache_key = "heartbeat:list:pipelines:" + ",".join(sorted(pipeline_slugs)) + if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS): + return + try: + update_async_services_seen_for_pipelines.delay(list(pipeline_slugs)) + except (kombu.exceptions.KombuError, ConnectionError, OSError) as exc: + logger.warning(f"Failed to enqueue non-critical pipeline-slug heartbeat: {exc}") + + +def _mark_async_services_seen_for_project(project_id: int) -> None: + """ + Redis-throttled wrapper around ``update_async_services_seen_for_project``. + Same shape as ``_mark_async_services_seen_for_pipelines`` above — gates + celery dispatch to at most one per-project enqueue per + ``HEARTBEAT_THROTTLE_SECONDS`` window — but keyed by project id for + callers that send ``?project_id=`` without ``pipeline__slug__in``. + + The ADC worker does not currently use this shape, so this is a fallback. + Background on why idle-poll heartbeats exist at all: the other heartbeat + (``_mark_pipeline_pull_services_seen``) only fires from ``/tasks/`` and + ``/result/`` — i.e., from workers with active work — so a worker sitting + on ``GET /jobs/?ids_only=1`` between jobs would otherwise age past + ``PROCESSING_SERVICE_LAST_SEEN_MAX`` and flip to offline in the UI. + """ + cache_key = f"heartbeat:list:project:{project_id}" + if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS): + return + try: + update_async_services_seen_for_project.delay(project_id) + except (kombu.exceptions.KombuError, ConnectionError, OSError) as exc: + logger.warning(f"Failed to enqueue non-critical project heartbeat for project {project_id}: {exc}") + + def _mark_pipeline_pull_services_seen(job: "Job") -> None: """ Enqueue a fire-and-forget heartbeat for async (pull-mode) processing services @@ -248,10 +303,58 @@ def get_queryset(self) -> QuerySet: ) # Filter out completed jobs that have not been updated in the last X hours cutoff_datetime = timezone.now() - timezone.timedelta(hours=cutoff_hours) - return jobs.exclude( + jobs = jobs.exclude( status=JobState.failed_states(), updated_at__lt=cutoff_datetime, ) + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. + # Worker-polling call path (`ids_only=1`): randomize order so concurrent + # pollers don't all converge on the same head-of-queue job. An + # `updated_at`-based sort has a degenerate case at startup — freshly + # queued jobs all share near-identical timestamps, tie-broken by `pk`, + # so simultaneous polls deterministically pick the same job. Random + # ordering gives probabilistic disjoint assignment without writing a + # poll-stamp column. Combined with `limit=1` below, each poll is an + # independent "pick any unfinished job" draw. + # + # The whole `ids_only=1` branch (this ordering override, the paginator + # override in `paginator` below, the heartbeat dispatch in `list()`) + # exists because the ADC worker currently repurposes this list endpoint + # as a claim-next-job call. Correct shape is a dedicated `/next` action + # (tracked as #1265). Once `/next` ships + # and ADC is migrated, delete this `order_by("?")` override along with + # the paginator override and the list() heartbeat branch. + if self.action == "list" and url_boolean_param(self.request, "ids_only", default=False): + jobs = jobs.order_by("?") + return jobs + + @property + def paginator(self): + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. + # Treat `?ids_only=1` as a pop()-style handoff ("what job is next?") + # rather than a list() dump: default to one job per response unless the + # caller explicitly asks for a batch via ?limit=N or ?page_size=N. + # Concurrent pollers drain a cached list serially and starve later jobs; + # forcing a re-poll per job lets the random-shuffle fairness sort rotate + # work across jobs every iteration. No ADC-side change required. + # + # This override exists only because `list(ids_only=True)` is being used + # as a claim-next-job call. Replace with a dedicated `/next` action + # (tracked as #1265); once ADC is migrated, + # drop this override so the list endpoint goes back to normal pagination. + if not hasattr(self, "_paginator"): + if ( + self.action == "list" + and url_boolean_param(self.request, "ids_only", default=False) + and "limit" not in self.request.query_params + and "page_size" not in self.request.query_params + ): + paginator = LimitOffsetPaginationWithPermissions() + paginator.default_limit = 1 + self._paginator = paginator + else: + self._paginator = self.pagination_class() if self.pagination_class is not None else None + return self._paginator @extend_schema( parameters=[ @@ -261,6 +364,27 @@ def get_queryset(self) -> QuerySet: ] ) def list(self, request, *args, **kwargs): + # ⚠️ TEMPORARY HACK — remove by 2026-04-24. + # Worker-polling call path: record heartbeat for async processing services. + # The real ADC worker request carries ``pipeline__slug__in=...`` and no + # project_id, so prefer the pipeline-slug scope when those slugs are + # present; fall back to project scope for callers that pass ?project_id=. + # Throttled via Redis so concurrent pollers don't churn the DB/broker. + # + # This heartbeat branch lives on `list()` only because `list(ids_only=1)` + # is doubling as the worker's claim-next-job endpoint. Once a dedicated + # `/next` action ships (tracked as #1265) + # and ADC is migrated to it, move the heartbeat to that action and + # delete this branch — `list()` should go back to being a plain list. + if url_boolean_param(request, "ids_only", default=False): + pipeline_slugs_raw = request.query_params.get("pipeline__slug__in") + if pipeline_slugs_raw: + slugs = tuple(s for s in (p.strip() for p in pipeline_slugs_raw.split(",")) if s) + _mark_async_services_seen_for_pipelines(slugs) + else: + project = self.get_active_project() + if project is not None: + _mark_async_services_seen_for_project(project.pk) return super().list(request, *args, **kwargs) @extend_schema( diff --git a/config/settings/base.py b/config/settings/base.py index 3e13275a8..eabb593d7 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -319,6 +319,13 @@ def _celery_result_backend_url(redis_url): # NATS # ------------------------------------------------------------------------------ NATS_URL = env("NATS_URL", default="nats://localhost:4222") # type: ignore[no-untyped-call] +# How long a worker has to ack a dispatched task before NATS redelivers it. +# Must exceed worst-case task duration: S3 download + GPU cold-start + batch +# inference + synchronous POST of results. With 24 MB images and batch sizes +# of 8-24, anything under a minute caused spurious redeliveries under load; +# 5 minutes gives cold-start GPU pipelines headroom without holding tasks +# invisibly long after a genuine worker crash (bounded by max_deliver). +NATS_TASK_TTR = env.int("NATS_TASK_TTR", default=300) # ADMIN # ------------------------------------------------------------------------------ @@ -568,3 +575,15 @@ def _celery_result_backend_url(redis_url): # Default taxa filters DEFAULT_INCLUDE_TAXA = env.list("DEFAULT_INCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] DEFAULT_EXCLUDE_TAXA = env.list("DEFAULT_EXCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call] + +# Purpose: master switch for per-job logs in the database and UI. +# Set to False to disable them as a contention escape hatch. +# +# When True, ``JobLogHandler.emit`` persists each log line to ``jobs_job.logs`` +# (JSONB column) so the per-job log feed in the UI stays populated. When False, +# log lines go to the container stdout logger only — used as an escape hatch +# under concurrent async_api load where the per-record UPDATE on ``jobs_job.logs`` +# becomes a row-lock contention point (see issue #1256, PR #1261). Default True +# preserves existing behavior; deployments seeing contention can set to False +# until the append-only ``JobLog`` child table (PR #1259) is in place. +JOB_LOG_PERSIST_ENABLED = env.bool("JOB_LOG_PERSIST_ENABLED", default=True) # type: ignore[no-untyped-call] diff --git a/docs/claude/debugging/row-lock-contention-reproduction.md b/docs/claude/debugging/row-lock-contention-reproduction.md new file mode 100644 index 000000000..6946549c2 --- /dev/null +++ b/docs/claude/debugging/row-lock-contention-reproduction.md @@ -0,0 +1,173 @@ +# Reproducing the `jobs_job` row-lock contention locally + +Runbook for reproducing, on a local dev stack, the row-lock contention that +affects concurrent `async_api` ML jobs. Context: issue #1256, PR #1261, and +PR #1259 (complementary `JobLog` table refactor). + +**Why this matters.** Naive repro attempts with a `curl` loop that fires one +result per POST (`{"results": [{...}]}`) do NOT trigger the pathology. They +only exercise the worker-side `select_for_update` path, which is fixed once +PR #1261 lands. The dominant remaining bottleneck is per-result logging +inside `ATOMIC_REQUESTS` — to see it locally you need **batched POSTs** that +match the real ADC shape (`AMI_LOCALIZATION_BATCH_SIZE=4`, +`AMI_CLASSIFICATION_BATCH_SIZE=150`). + +## The pathology + +Two mutating paths UPDATE the `jobs_job` row for every log line written via +`job.logger.info(...)`: + +1. **View path** (`ami/jobs/views.py` — `result` and `tasks` actions): the + per-iteration `job.logger.info("Queued pipeline result: ...")` inside the + POST body loop runs under `ATOMIC_REQUESTS`. A single batched POST with N + results therefore stacks N UPDATEs on `jobs_job.logs` inside one tx that + doesn't commit until the view returns. Every other writer on the same row + (other worker tasks, other POST handlers) blocks behind it. +2. **Worker path** (`ami/jobs/tasks.py` — `_update_job_progress`): each + `process_nats_pipeline_result` celery task calls `_update_job_progress`, + which emits its own log lines, each triggering another UPDATE on the same + row. + +The smoking gun in `pg_stat_activity`: + +- Root blocker: a backend `state = idle in transaction`, last query + `UPDATE "jobs_job" SET "logs" = ...`, held for many seconds. +- Waiters: dozens of backends with `wait_event_type = Lock`, + `wait_event = tuple` or `transactionid`, all on the same row. + +## Prereqs + +- Local antenna stack up via the standard dev compose + (`docker compose up -d`) with postgres, redis, rabbitmq, nats, django, + celeryworker, and celeryworker_ml healthy. +- A job in a running state (any `async_api` job with `status = STARTED` will + do — the view accepts results regardless of whether real tasks exist). +- An auth token for a user with permission to POST to + `/api/v2/jobs/{id}/result/`. +- Python 3.10+ on the host (the load-test script uses stdlib only). + +## Scripts + +- `scripts/load_test_result_endpoint.py` — fires concurrent batched POSTs. +- `ami/jobs/management/commands/chaos_monkey.py` — adjacent tooling for + `async_api` chaos scenarios; covered in `chaos-scenarios.md`. + +## Step-by-step + +### 1. Grab an auth token and a target job + +From a shell on the host: + +```bash +docker compose exec -T django python manage.py shell <<'PY' +from rest_framework.authtoken.models import Token +from ami.users.models import User +from ami.jobs.models import Job + +u = User.objects.filter(is_staff=True).first() +t, _ = Token.objects.get_or_create(user=u) +print("TOKEN=", t.key) + +j = Job.objects.filter(status="STARTED", dispatch_mode="async_api").first() +if j is None: + # Any running job works — create one if there isn't one. + # Adjust project/collection/pipeline PKs to your local data. + print("No running async_api job found; create one via the UI or shell.") +else: + print("JOB_ID=", j.pk) +PY +``` + +If no running job exists, create one with whatever project/collection/pipeline +are seeded locally. The view does not need real tasks queued behind the +job — it only needs the job row to accept result POSTs. + +### 2. Fire batched POSTs + +```bash +export ANTENNA_TOKEN= +python scripts/load_test_result_endpoint.py \ + --batch 50 --concurrency 10 --rounds 3 +``` + +`--batch 50` puts 50 `PipelineResultsError` entries in each POST body. Any +batch size >1 will stack UPDATEs; 50 is a comfortable reproduction size +because it makes each POST's tx hold long enough for others to pile up. +`--concurrency 10` fires 10 parallel POSTs per wave. `--rounds 3` fires +three back-to-back waves. + +### 3. Monitor Postgres during the test + +In a second shell: + +```bash +docker exec psql -U -d <<'SQL' +-- Scalars +SELECT count(*) AS idle_in_tx + FROM pg_stat_activity + WHERE datname = current_database() AND state = 'idle in transaction'; + +SELECT count(*) AS blocker_chain + FROM pg_stat_activity blocked + JOIN pg_stat_activity blocking + ON blocking.pid = ANY(pg_blocking_pids(blocked.pid)) + WHERE blocked.wait_event_type = 'Lock' + AND blocked.datname = current_database(); + +-- Top offenders +SELECT state, wait_event, + substring(query, 1, 80), + EXTRACT(EPOCH FROM now() - xact_start) AS xact_age_s + FROM pg_stat_activity + WHERE datname = current_database() + AND state != 'idle' + AND (state = 'idle in transaction' OR wait_event_type = 'Lock') + ORDER BY xact_start NULLS LAST + LIMIT 20; +SQL +``` + +### 4. Before/after signatures + +Measured on a local dev stack with WEB_CONCURRENCY=1 (gunicorn default) and +8 celery ML-fork workers, batch=50, concurrency=10. + +| Signal | PR #1261 only (`JOB_LOG_PERSIST_ENABLED=true`) | PR #1261 + flag off (`JOB_LOG_PERSIST_ENABLED=false`) | +|---|---|---| +| `blocker_chain` count | 30+ | 0–1 (transient) | +| `idle_in_tx` count | 8–10 | 0 | +| Root-blocker query | `UPDATE jobs_job SET logs = ...` held 2–60s | transient `SELECT`s only | +| POST success (10 concurrent × 50-result batch, 120s timeout) | 0/10 (all timeout) | 10/10 | +| p95 POST latency | 120s+ | ~5s | + +## The feature flag + +Setting `JOB_LOG_PERSIST_ENABLED=false` (env var on the Django container) +causes `JobLogHandler.emit` to write only to the container stdout logger and +skip the per-record UPDATE on `jobs_job.logs`. The per-job UI log feed +stops receiving new entries while the flag is off; container stdout still +captures everything. + +Default is `true` — existing deployments keep their current behavior. The +flag is intended as a time-bounded escape hatch until the append-only +`JobLog` child table from PR #1259 is in place. + +To test the flag locally, append `JOB_LOG_PERSIST_ENABLED=false` to the +django env file used by your compose (e.g. `.envs/.local/.django`) and +recreate the django container (`docker compose up -d --force-recreate +django`). Verify from a shell: + +```bash +docker compose exec -T django python -c \ + "from django.conf import settings; print(settings.JOB_LOG_PERSIST_ENABLED)" +``` + +## Related + +- Issue #1256 — full contention analysis with path breakdown. +- PR #1261 — drops `select_for_update` in `_update_job_progress`; adds the + `JOB_LOG_PERSIST_ENABLED` flag; this runbook. +- PR #1259 — append-only `JobLog` child table. When merged, the flag can be + removed in favor of a cutover to the new write path. +- `docs/claude/debugging/chaos-scenarios.md` — adjacent chaos tooling for + NATS redelivery and retry-path validation. diff --git a/scripts/load_test_result_endpoint.py b/scripts/load_test_result_endpoint.py new file mode 100644 index 000000000..dc9a6bd88 --- /dev/null +++ b/scripts/load_test_result_endpoint.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +"""Fire concurrent batched POSTs against ``POST /api/v2/jobs/{id}/result/``. + +Reproduces the row-lock contention pathology described in +``docs/claude/debugging/row-lock-contention-reproduction.md``. Each POST body +contains N fake ``PipelineResultsError`` entries so the per-result +``job.logger.info(...)`` call inside ``ATOMIC_REQUESTS`` stacks N UPDATEs on +``jobs_job.logs`` in a single view transaction — the shape real ADC workers +produce (``AMI_LOCALIZATION_BATCH_SIZE=4``, ``AMI_CLASSIFICATION_BATCH_SIZE=150``). + +A single-result-per-POST loop does NOT reproduce the contention. Batching +is load-bearing. + +Usage: + + export ANTENNA_TOKEN=... + python scripts/load_test_result_endpoint.py \\ + [--batch 50] [--concurrency 10] [--rounds 3] \\ + [--host http://localhost:8000] + +Dependencies: Python 3.10+, stdlib only. +""" +import argparse +import concurrent.futures +import json +import os +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +import uuid + + +def make_body(batch_size: int, prefix: str) -> bytes: + results = [ + { + "reply_subject": f"{prefix}.r{i}.{uuid.uuid4().hex[:8]}", + "result": {"error": "load-test", "image_id": f"img-{prefix}-{i}"}, + } + for i in range(batch_size) + ] + return json.dumps({"results": results}).encode() + + +def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, float, str]: + req = urllib.request.Request( + url, + data=body, + headers={"Authorization": f"Token {token}", "Content-Type": "application/json"}, + method="POST", + ) + t0 = time.time() + try: + with urllib.request.urlopen(req, timeout=120) as resp: + return (idx, resp.status, time.time() - t0, "") + except urllib.error.HTTPError as e: + return (idx, e.code, time.time() - t0, f"HTTPError: {e.reason}") + except urllib.error.URLError as e: + return (idx, -1, time.time() - t0, f"URLError: {e.reason}") + except Exception as e: + return (idx, -1, time.time() - t0, f"{type(e).__name__}: {e}") + + +def _positive_int(value: str) -> int: + try: + parsed = int(value) + except ValueError as e: + raise argparse.ArgumentTypeError(f"must be an integer, got {value!r}") from e + if parsed <= 0: + raise argparse.ArgumentTypeError(f"must be > 0, got {parsed}") + return parsed + + +def _http_url(value: str) -> str: + parsed = urllib.parse.urlparse(value) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise argparse.ArgumentTypeError(f"must be an http(s) URL, got {value!r}") + return value.rstrip("/") + + +def main(): + ap = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + ap.add_argument("job_id", type=_positive_int, help="Target Job.pk (must be in a running state)") + ap.add_argument( + "--token", + default=os.environ.get("ANTENNA_TOKEN"), + help="DRF auth Token (default: $ANTENNA_TOKEN). Prefer the env var to avoid shell-history leakage.", + ) + ap.add_argument("--batch", type=_positive_int, default=50, help="results per POST body (default 50)") + ap.add_argument("--concurrency", type=_positive_int, default=10, help="parallel POSTs per round (default 10)") + ap.add_argument("--rounds", type=_positive_int, default=3, help="how many waves to fire (default 3)") + ap.add_argument( + "--host", type=_http_url, default="http://localhost:8000", help="API host (default localhost:8000)" + ) + args = ap.parse_args() + + if not args.token: + ap.error("no token provided: set ANTENNA_TOKEN env var or pass --token") + + url = f"{args.host}/api/v2/jobs/{args.job_id}/result/" + # Never print the token — just echo the request shape. + print(f"url={url} batch={args.batch} concurrency={args.concurrency} rounds={args.rounds}") + + t_start = time.time() + for round_idx in range(args.rounds): + with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as ex: + futures = [ + ex.submit(fire_one, url, args.token, make_body(args.batch, f"r{round_idx}_{i}"), i) + for i in range(args.concurrency) + ] + results = [f.result() for f in concurrent.futures.as_completed(futures)] + good = sum(1 for _, s, _, _ in results if s == 200) + latencies = sorted([lat for _, _, lat, _ in results]) + p50 = latencies[len(latencies) // 2] + p95 = latencies[int(len(latencies) * 0.95)] + print( + f"round {round_idx}: ok={good}/{args.concurrency} " + f"p50={p50:.2f}s p95={p95:.2f}s elapsed={time.time() - t_start:.1f}s" + ) + errors = [(idx, status, err) for idx, status, _, err in results if err] + for idx, status, err in errors[:5]: + print(f" err[{idx}] status={status} {err}", file=sys.stderr) + if len(errors) > 5: + print(f" ...{len(errors) - 5} more errors suppressed", file=sys.stderr) + + +if __name__ == "__main__": + main()