15 changes: 14 additions & 1 deletion ami/jobs/models.py
@@ -8,6 +8,7 @@
 import pydantic
 from celery import uuid
 from celery.result import AsyncResult
+from django.conf import settings
 from django.db import models, transaction
 from django.utils.text import slugify
 from django_pydantic_field import SchemaField
@@ -333,9 +334,21 @@ def __init__(self, job: "Job", *args, **kwargs):
         super().__init__(*args, **kwargs)

     def emit(self, record: logging.LogRecord):
-        # Log to the current app logger
+        # Log to the current app logger (container stdout).
         logger.log(record.levelno, self.format(record))

+        # Gated by ``JOB_LOG_PERSIST_ENABLED`` (default True). Persisting every
+        # log line to ``jobs_job.logs`` becomes a row-lock contention point
+        # under concurrent async_api load — each call triggers
+        # ``UPDATE jobs_job SET logs = ...`` on the shared job row, and inside
+        # ``ATOMIC_REQUESTS`` a single batched ``/result`` POST stacks N such
+        # UPDATEs in one tx, blocking every ML worker on the same row for the
+        # duration of the request. Deployments hitting that pattern can set the
+        # flag to False to short-circuit here until PR #1259 lands an
+        # append-only ``JobLog`` child table. See issue #1256.
+        if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True):
+            return
+
         # Write to the logs field on the job instance.
         # Refresh from DB first to reduce the window for concurrent overwrites — each
         # worker holds its own stale in-memory copy of `logs`, so without a refresh the
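For orientation, a minimal sketch of the gated `emit` flow after this change. The trailing refresh-and-append write is paraphrased from the truncated comment above; the exact `logs` field handling (a JSON list) is an assumption:

```python
def emit(self, record: logging.LogRecord):
    # Always write to the current app logger (container stdout).
    logger.log(record.levelno, self.format(record))

    # Escape hatch: skip the per-record UPDATE on jobs_job.logs entirely.
    if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True):
        return

    # Assumed shape of the persisted write: refresh the JSONB list first to
    # narrow the window for concurrent overwrites, then save only `logs`.
    self.job.refresh_from_db(fields=["logs"])
    self.job.logs.append(self.format(record))
    self.job.save(update_fields=["logs"])
```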
13 changes: 12 additions & 1 deletion ami/jobs/tasks.py
@@ -489,8 +489,19 @@ def _update_job_progress(
 ) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import

+    # NOTE: Previously this used `select_for_update()` inside `transaction.atomic()`
+    # to serialize concurrent progress updates for the same job. Under concurrent
+    # async_api result processing that serialization became a bottleneck: every
+    # ML result task queued a contending exclusive lock on the `jobs_job` row,
+    # stacking behind gunicorn view threads also holding the row under
+    # ATOMIC_REQUESTS. The `max()` guard below still prevents progress regression
+    # between concurrent workers; the trade-off is that accumulated counts
+    # (detections/classifications/captures) can drift by one batch under race —
+    # cosmetic only, since the underlying `Detection`/`Classification` rows are
+    # written authoritatively by `save_results` before this function runs.
+    # See docs/claude/planning/jobs-row-lock-remediation.md and issue #1256.
     with transaction.atomic():
-        job = Job.objects.select_for_update().get(pk=job_id)
+        job = Job.objects.get(pk=job_id)

         # For results stage, accumulate detections/classifications/captures counts
         if stage == "results":
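The `max()` guard the comment references sits further down the function; a hypothetical illustration of the lock-free pattern (field and variable names here are assumptions, not the actual code):

```python
# Hypothetical sketch only; `progress` as a dict and `incoming_value` are
# illustrative names, not the real fields.
with transaction.atomic():
    job = Job.objects.get(pk=job_id)  # plain read, no row lock

    # Monotonic guard: a racing stale writer can no longer move progress
    # backwards, even though writers are no longer serialized.
    current = job.progress.get(stage, 0.0)
    job.progress[stage] = max(current, incoming_value)
    job.save(update_fields=["progress"])
```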
9 changes: 9 additions & 0 deletions config/settings/base.py
@@ -568,3 +568,12 @@ def _celery_result_backend_url(redis_url):
 # Default taxa filters
 DEFAULT_INCLUDE_TAXA = env.list("DEFAULT_INCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call]
 DEFAULT_EXCLUDE_TAXA = env.list("DEFAULT_EXCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call]
+
+# When True, ``JobLogHandler.emit`` persists each log line to ``jobs_job.logs``
+# (JSONB column) so the per-job log feed in the UI stays populated. When False,
+# log lines go to the container stdout logger only — used as an escape hatch
+# under concurrent async_api load where the per-record UPDATE on ``jobs_job.logs``
+# becomes a row-lock contention point (see issue #1256, PR #1261). Default True
+# preserves existing behavior; deployments seeing contention can set to False
+# until the append-only ``JobLog`` child table (PR #1259) is in place.
+JOB_LOG_PERSIST_ENABLED = env.bool("JOB_LOG_PERSIST_ENABLED", default=True) # type: ignore[no-untyped-call]
172 changes: 172 additions & 0 deletions docs/claude/debugging/row-lock-contention-reproduction.md
@@ -0,0 +1,172 @@
# Reproducing the `jobs_job` row-lock contention locally

Runbook for reproducing, on a local dev stack, the row-lock contention that
affects concurrent `async_api` ML jobs. Context: issue #1256, PR #1261, and
PR #1259 (complementary `JobLog` table refactor).

**Why this matters.** Naive repro attempts with a `curl` loop that fires one
result per POST (`{"results": [{...}]}`) do NOT trigger the pathology. They
only exercise the worker-side `select_for_update` path, which is fixed once
PR #1261 lands. The dominant remaining bottleneck is per-result logging
inside `ATOMIC_REQUESTS` — to see it locally you need **batched POSTs** that
match the real ADC shape (`AMI_LOCALIZATION_BATCH_SIZE=4`,
`AMI_CLASSIFICATION_BATCH_SIZE=150`).
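
For contrast, the naive loop that fails to reproduce it looks roughly like this (a sketch; `$JOB_ID` and `$TOKEN` as obtained in step 1 below):

```bash
# One result per POST: each view transaction holds the jobs_job row only
# briefly, so UPDATEs never stack and the contention never materializes.
for i in $(seq 1 50); do
  curl -s -o /dev/null -X POST "http://localhost:8000/api/v2/jobs/$JOB_ID/result/" \
    -H "Authorization: Token $TOKEN" -H "Content-Type: application/json" \
    -d '{"results": [{"reply_subject": "probe", "result": {"error": "load-test"}}]}'
done
```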

## The pathology

Two mutating paths UPDATE the `jobs_job` row for every log line written via
`job.logger.info(...)`:

1. **View path** (`ami/jobs/views.py` — `result` and `tasks` actions): the
per-iteration `job.logger.info("Queued pipeline result: ...")` inside the
POST body loop runs under `ATOMIC_REQUESTS`. A single batched POST with N
results therefore stacks N UPDATEs on `jobs_job.logs` inside one tx that
doesn't commit until the view returns. Every other writer on the same row
(other worker tasks, other POST handlers) blocks behind it.
2. **Worker path** (`ami/jobs/tasks.py` — `_update_job_progress`): each
`process_nats_pipeline_result` celery task calls `_update_job_progress`,
which emits its own log lines, each triggering another UPDATE on the same
row.

The smoking gun in `pg_stat_activity`:

- Root blocker: a backend `state = idle in transaction`, last query
`UPDATE "jobs_job" SET "logs" = ...`, held for many seconds.
- Waiters: dozens of backends with `wait_event_type = Lock`,
`wait_event = tuple` or `transactionid`, all on the same row.
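
A quick sketch for isolating the root blocker directly (run in the same `psql` session as step 3):

```sql
-- Backends that block others but are not themselves waiting on a lock.
SELECT pid, state, now() - xact_start AS xact_age, left(query, 80) AS query
FROM pg_stat_activity
WHERE cardinality(pg_blocking_pids(pid)) = 0
  AND pid IN (
    SELECT unnest(pg_blocking_pids(w.pid))
    FROM pg_stat_activity w
    WHERE w.wait_event_type = 'Lock'
  );
```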

## Prereqs

- Local antenna stack up via the standard dev compose
(`docker compose up -d`) with postgres, redis, rabbitmq, nats, django,
celeryworker, and celeryworker_ml healthy.
- A job in a running state (any `async_api` job with `status = STARTED` will
do — the view accepts results regardless of whether real tasks exist).
- An auth token for a user with permission to POST to
`/api/v2/jobs/{id}/result/`.
- Python 3.10+ on the host (the load-test script uses stdlib only).

## Scripts

- `scripts/load_test_result_endpoint.py` — fires concurrent batched POSTs.
- `ami/jobs/management/commands/chaos_monkey.py` — adjacent tooling for
`async_api` chaos scenarios; covered in `chaos-scenarios.md`.

## Step-by-step

### 1. Grab an auth token and a target job

From a shell on the host:

```bash
docker compose exec -T django python manage.py shell <<'PY'
from rest_framework.authtoken.models import Token
from ami.users.models import User
from ami.jobs.models import Job

u = User.objects.filter(is_staff=True).first()
t, _ = Token.objects.get_or_create(user=u)
print("TOKEN=", t.key)

j = Job.objects.filter(status="STARTED", dispatch_mode="async_api").first()
if j is None:
# Any running job works — create one if there isn't one.
# Adjust project/collection/pipeline PKs to your local data.
print("No running async_api job found; create one via the UI or shell.")
else:
print("JOB_ID=", j.pk)
PY
```

If no running job exists, create one with whatever project/collection/pipeline
are seeded locally. The view does not need real tasks queued behind the
job — it only needs the job row to accept result POSTs.
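
If you need to create one from the shell, a hypothetical sketch follows. Field names beyond `status` and `dispatch_mode` (which appear in the queries above) and the `Project` import path are assumptions; adjust them to your local schema and seed data:

```bash
docker compose exec -T django python manage.py shell <<'PY'
# Hypothetical sketch: adjust fields and the Project import to your stack.
from ami.jobs.models import Job
from ami.main.models import Project  # assumed location of Project

project = Project.objects.first()
job = Job.objects.create(
    name="row-lock repro",
    project=project,
    status="STARTED",
    dispatch_mode="async_api",
)
print("JOB_ID=", job.pk)
PY
```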

### 2. Fire batched POSTs

```bash
python scripts/load_test_result_endpoint.py <JOB_ID> <TOKEN> \
--batch 50 --concurrency 10 --rounds 3
```

`--batch 50` puts 50 `PipelineResultsError` entries in each POST body. Any
batch size >1 will stack UPDATEs; 50 is a comfortable reproduction size
because it makes each POST's tx hold long enough for others to pile up.
`--concurrency 10` fires 10 parallel POSTs per wave. `--rounds 3` fires
three back-to-back waves.
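
Each POST body follows the shape `make_body` in the script produces, abridged here to a single result:

```json
{
  "results": [
    {
      "reply_subject": "r0_0.r0.3f2a9c1d",
      "result": {"error": "load-test", "image_id": "img-r0_0-0"}
    }
  ]
}
```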

### 3. Monitor Postgres during the test

In a second shell:

```bash
docker exec -i <postgres-container> psql -U <user> -d <db> <<'SQL'
-- Scalars
SELECT count(*) AS idle_in_tx
FROM pg_stat_activity
WHERE datname = current_database() AND state = 'idle in transaction';

SELECT count(*) AS blocker_chain
FROM pg_stat_activity blocked
JOIN pg_stat_activity blocking
ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE blocked.wait_event_type = 'Lock'
AND blocked.datname = current_database();

-- Top offenders
SELECT state, wait_event,
substring(query, 1, 80),
EXTRACT(EPOCH FROM now() - xact_start) AS xact_age_s
FROM pg_stat_activity
WHERE datname = current_database()
AND state != 'idle'
AND (state = 'idle in transaction' OR wait_event_type = 'Lock')
ORDER BY xact_start NULLS LAST
LIMIT 20;
SQL
```

### 4. Before/after signatures

Measured on a local dev stack with WEB_CONCURRENCY=1 (gunicorn default) and
8 celery ML-fork workers, batch=50, concurrency=10.

| Signal | PR #1261 only (`JOB_LOG_PERSIST_ENABLED=true`) | PR #1261 + flag off (`JOB_LOG_PERSIST_ENABLED=false`) |
|---|---|---|
| `blocker_chain` count | 30+ | 0–1 (transient) |
| `idle_in_tx` count | 8–10 | 0 |
| Root-blocker query | `UPDATE jobs_job SET logs = ...` held 2–60s | transient `SELECT`s only |
| POST success (10 concurrent × 50-result batch, 120s timeout) | 0/10 (all timeout) | 10/10 |
| p95 POST latency | 120s+ | ~5s |

## The feature flag

Setting `JOB_LOG_PERSIST_ENABLED=false` (env var on the Django container)
causes `JobLogHandler.emit` to write only to the container stdout logger and
skip the per-record UPDATE on `jobs_job.logs`. The per-job UI log feed
stops receiving new entries while the flag is off; container stdout still
captures everything.

Default is `true` — existing deployments keep their current behavior. The
flag is intended as a time-bounded escape hatch until the append-only
`JobLog` child table from PR #1259 is in place.

To test the flag locally, append `JOB_LOG_PERSIST_ENABLED=false` to the
django env file used by your compose (e.g. `.envs/.local/.django`) and
recreate the django container (`docker compose up -d --force-recreate
django`). Verify from a shell:

```bash
docker compose exec -T django python -c \
"from django.conf import settings; print(settings.JOB_LOG_PERSIST_ENABLED)"
```

## Related

- Issue #1256 — full contention analysis with path breakdown.
- PR #1261 — drops `select_for_update` in `_update_job_progress`; adds the
`JOB_LOG_PERSIST_ENABLED` flag; this runbook.
- PR #1259 — append-only `JobLog` child table. When merged, the flag can be
removed in favor of a cutover to the new write path.
- `docs/claude/debugging/chaos-scenarios.md` — adjacent chaos tooling for
NATS redelivery and retry-path validation.
91 changes: 91 additions & 0 deletions scripts/load_test_result_endpoint.py
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""Fire concurrent batched POSTs against ``POST /api/v2/jobs/{id}/result/``.

Reproduces the row-lock contention pathology described in
``docs/claude/debugging/row-lock-contention-reproduction.md``. Each POST body
contains N fake ``PipelineResultsError`` entries so the per-result
``job.logger.info(...)`` call inside ``ATOMIC_REQUESTS`` stacks N UPDATEs on
``jobs_job.logs`` in a single view transaction — the shape real ADC workers
produce (``AMI_LOCALIZATION_BATCH_SIZE=4``, ``AMI_CLASSIFICATION_BATCH_SIZE=150``).

A single-result-per-POST loop does NOT reproduce the contention. Batching
is load-bearing.

Usage:

python scripts/load_test_result_endpoint.py <job_id> <token> \\
[--batch 50] [--concurrency 10] [--rounds 3] \\
[--host http://localhost:8000]

Dependencies: Python 3.10+, stdlib only.
"""
import argparse
import concurrent.futures
import json
import time
import urllib.error
import urllib.request
import uuid


def make_body(batch_size: int, prefix: str) -> bytes:
results = [
{
"reply_subject": f"{prefix}.r{i}.{uuid.uuid4().hex[:8]}",
"result": {"error": "load-test", "image_id": f"img-{prefix}-{i}"},
}
for i in range(batch_size)
]
return json.dumps({"results": results}).encode()


def fire_one(url: str, token: str, body: bytes, idx: int) -> tuple[int, int, float]:
req = urllib.request.Request(
url,
data=body,
headers={"Authorization": f"Token {token}", "Content-Type": "application/json"},
method="POST",
)
t0 = time.time()
try:
with urllib.request.urlopen(req, timeout=120) as resp:
return (idx, resp.status, time.time() - t0)
except urllib.error.HTTPError as e:
return (idx, e.code, time.time() - t0)
except Exception:
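        # Timeouts, connection resets, and other transport errors land here;
        # they are reported as status -1 in the round summary.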
return (idx, -1, time.time() - t0)


def main():
ap = argparse.ArgumentParser(description=__doc__.splitlines()[0])
ap.add_argument("job_id", type=int, help="Target Job.pk (must be in a running state)")
ap.add_argument("token", help="DRF auth Token for a user with result-POST permission")
ap.add_argument("--batch", type=int, default=50, help="results per POST body (default 50)")
ap.add_argument("--concurrency", type=int, default=10, help="parallel POSTs per round (default 10)")
ap.add_argument("--rounds", type=int, default=3, help="how many waves to fire (default 3)")
ap.add_argument("--host", default="http://localhost:8000", help="API host (default localhost:8000)")
args = ap.parse_args()

url = f"{args.host}/api/v2/jobs/{args.job_id}/result/"
print(f"url={url} batch={args.batch} concurrency={args.concurrency} rounds={args.rounds}")

t_start = time.time()
for round_idx in range(args.rounds):
with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as ex:
futures = [
ex.submit(fire_one, url, args.token, make_body(args.batch, f"r{round_idx}_{i}"), i)
for i in range(args.concurrency)
]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
good = sum(1 for _, s, _ in results if s == 200)
latencies = sorted([lat for _, _, lat in results])
p50 = latencies[len(latencies) // 2]
p95 = latencies[int(len(latencies) * 0.95)]
print(
f"round {round_idx}: ok={good}/{args.concurrency} "
f"p50={p50:.2f}s p95={p95:.2f}s elapsed={time.time() - t_start:.1f}s"
)


if __name__ == "__main__":
main()