diff --git a/ami/jobs/migrations/0021_joblog.py b/ami/jobs/migrations/0021_joblog.py
new file mode 100644
index 000000000..f88fcd4b3
--- /dev/null
+++ b/ami/jobs/migrations/0021_joblog.py
@@ -0,0 +1,32 @@
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("jobs", "0020_schedule_job_monitoring_beat_tasks"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="JobLog",
+            fields=[
+                ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+                ("created_at", models.DateTimeField(auto_now_add=True)),
+                ("updated_at", models.DateTimeField(auto_now=True)),
+                ("level", models.CharField(max_length=20)),
+                ("message", models.TextField()),
+                ("context", models.JSONField(blank=True, default=dict)),
+                (
+                    "job",
+                    models.ForeignKey(
+                        on_delete=django.db.models.deletion.CASCADE, related_name="log_entries", to="jobs.job"
+                    ),
+                ),
+            ],
+            options={
+                "ordering": ["-created_at", "-pk"],
+                "indexes": [models.Index(fields=["job", "-created_at"], name="jobs_joblog_job_id_e4aa59_idx")],
+            },
+        ),
+    ]
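Aside (not part of the patch): once this migration is applied, each log line is one row in jobs_joblog, so reading a job's recent output is a single indexed query. A minimal sketch, assuming a saved Job instance bound to `job`; the (job, -created_at) index declared above covers the ordering:

    from ami.jobs.models import JobLog

    # Newest-first, matching the model's default ordering; served by the
    # (job, -created_at) index from the migration above.
    recent = JobLog.objects.filter(job=job).order_by("-created_at", "-pk")[:100]
    for entry in recent:
        print(f"[{entry.created_at}] {entry.level} {entry.message}")

    # The FK's related_name gives the same rows from the parent side:
    recent_via_parent = job.log_entries.all()[:100]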
diff --git a/ami/jobs/models.py b/ami/jobs/models.py
index 4111034f8..ce854eb57 100644
--- a/ami/jobs/models.py
+++ b/ami/jobs/models.py
@@ -322,6 +322,31 @@ class JobLogs(pydantic.BaseModel):
     stderr: list[str] = pydantic.Field(default_factory=list, alias="stderr", title="Error messages")


+class JobLog(BaseModel):
+    """Append-only per-job log row.
+
+    Replaces the ``jobs_job.logs`` JSON-field UPDATE path that caused row-lock
+    contention under concurrent async_api load (issue #1256). Each log emit
+    becomes a cheap INSERT on this child table instead of a refresh+UPDATE of
+    the shared parent row. Legacy JSON-field logs are still served by the
+    serializer for jobs created before this table existed.
+    """
+
+    project_accessor = "job__project"
+
+    job = models.ForeignKey("Job", on_delete=models.CASCADE, related_name="log_entries")
+    level = models.CharField(max_length=20)
+    message = models.TextField()
+    # Freeform bag for future per-line metadata (stage, worker id, counters, ...)
+    # without requiring a schema migration. Kept optional (blank, empty default)
+    # so it costs nothing on existing rows.
+    context = models.JSONField(blank=True, default=dict)
+
+    class Meta:
+        ordering = ["-created_at", "-pk"]
+        indexes = [models.Index(fields=["job", "-created_at"])]
+
+
 class JobLogHandler(logging.Handler):
     """
     Class for handling logs from a job and writing them to the job instance.
@@ -337,41 +362,24 @@ def emit(self, record: logging.LogRecord):
         # Log to the current app logger (container stdout).
         logger.log(record.levelno, self.format(record))

-        # Gated by ``JOB_LOG_PERSIST_ENABLED`` (default True). Persisting every
-        # log line to ``jobs_job.logs`` becomes a row-lock contention point
-        # under concurrent async_api load — each call triggers
-        # ``UPDATE jobs_job SET logs = ...`` on the shared job row, and inside
-        # ``ATOMIC_REQUESTS`` a single batched ``/result`` POST stacks N such
-        # UPDATEs in one tx, blocking every ML worker on the same row for the
-        # duration of the request. Deployments hitting that pattern can set the
-        # flag to False to short-circuit here until PR #1259 lands an
-        # append-only ``JobLog`` child table. See issue #1256.
+        # Escape hatch: when False, skip the per-job DB write entirely. Container
+        # stdout still captures every line above, so ops observability is
+        # unchanged; only the per-job UI log view loses new entries for the
+        # duration the flag is off. Default is True. See issue #1256.
        if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True):
            return

-        # Write to the logs field on the job instance.
-        # Refresh from DB first to reduce the window for concurrent overwrites — each
-        # worker holds its own stale in-memory copy of `logs`, so without a refresh the
-        # last writer always wins and earlier entries are silently dropped.
-        # @TODO consider saving logs to the database periodically rather than on every log
+        # Append-only insert on the JobLog child table. Unlike the legacy
+        # jobs_job.logs JSONB update path, this does not contend with
+        # _update_job_progress on the parent row.
        try:
-            self.job.refresh_from_db(fields=["logs"])
-            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-            msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
-            if msg not in self.job.logs.stdout:
-                self.job.logs.stdout.insert(0, msg)
-
-            # Write a simpler copy of any errors to the errors field
-            if record.levelno >= logging.ERROR:
-                if record.message not in self.job.logs.stderr:
-                    self.job.logs.stderr.insert(0, record.message)
-
-            if len(self.job.logs.stdout) > self.max_log_length:
-                self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
-
-            self.job.save(update_fields=["logs"], update_progress=False)
+            JobLog.objects.create(
+                job_id=self.job.pk,
+                level=record.levelname,
+                message=self.format(record),
+            )
        except Exception as e:
-            logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
+            logger.error(f"Failed to save log for job #{self.job.pk}: {e}")


 @dataclass
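Aside (not part of the patch): the handler is reached through the job's logger, so each call below becomes one INSERT into jobs_joblog, and the flag short-circuits only the DB write. A sketch, assuming a Job instance bound to `job`; `override_settings` is Django's standard test helper, used the same way in the tests further down:

    from django.test import override_settings

    job.logger.info("processing batch 3/10")   # one INSERT: level="INFO"
    job.logger.error("image 42 failed")        # one INSERT: level="ERROR"

    # The escape hatch skips only the DB write; container stdout logging via
    # logger.log() above is unaffected.
    with override_settings(JOB_LOG_PERSIST_ENABLED=False):
        job.logger.info("suppressed")  # no JobLog row created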
+ """ + entries = list( + JobLog.objects.filter(job_id=job.pk) + .only("created_at", "level", "message") + .order_by("-created_at", "-pk")[:limit] + ) + if entries: + return { + "stdout": [ + f"[{entry.created_at.strftime(JOB_LOG_TIMESTAMP_FORMAT)}] {entry.level} {entry.message}" + for entry in entries + ], + "stderr": [entry.message for entry in entries if entry.level in JOB_LOG_LEVELS_STDERR], + } + + return _legacy_logs_shape(job) + class JobProjectNestedSerializer(DefaultSerializer): class Meta: @@ -49,7 +86,7 @@ class JobListSerializer(DefaultSerializer): source_image_single = SourceImageNestedSerializer(read_only=True) data_export = DataExportNestedSerializer(read_only=True) progress = SchemaField(schema=JobProgress, read_only=True) - logs = SchemaField(schema=JobLogs, read_only=True) + logs = serializers.SerializerMethodField() job_type = JobTypeSerializer(read_only=True) # All jobs created from the Jobs UI are ML jobs (datasync, etc. are created for the user) # @TODO Remove this when the UI is updated pass a job type. This should be a required field. @@ -147,6 +184,16 @@ class Meta: "dispatch_mode", ] + def get_logs(self, obj: Job) -> dict[str, list[str]]: + # List responses skip the JobLog query to avoid N+1 — the UI only renders + # logs on the detail page, so returning the (typically empty for new jobs) + # legacy JSON shape is acceptable. Detail responses go to the joined table + # and fall back to the legacy shape for pre-migration jobs. + view = self.context.get("view") + if getattr(view, "action", None) == "list": + return _legacy_logs_shape(obj) + return serialize_job_logs(obj) + class JobSerializer(JobListSerializer): # progress = serializers.JSONField(initial=Job.default_progress(), allow_null=False, required=False) diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index ee8d833cd..49a693cc4 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -8,7 +8,15 @@ from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params -from ami.jobs.models import Job, JobDispatchMode, JobProgress, JobState, MLJob, SourceImageCollectionPopulateJob +from ami.jobs.models import ( + Job, + JobDispatchMode, + JobLog, + JobProgress, + JobState, + MLJob, + SourceImageCollectionPopulateJob, +) from ami.main.models import Project, SourceImage, SourceImageCollection from ami.ml.models import Pipeline from ami.ml.models.processing_service import ProcessingService @@ -18,6 +26,10 @@ logger = logging.getLogger(__name__) +def joined_job_log_messages(job: Job) -> str: + return "\n".join(JobLog.objects.filter(job=job).order_by("-created_at", "-pk").values_list("message", flat=True)) + + class TestJobProgress(TestCase): def setUp(self): self.project = Project.objects.create(name="Test project") @@ -645,7 +657,7 @@ def test_tasks_endpoint_logs_fetch_to_job_logger(self): self.assertEqual(resp.status_code, 200) job.refresh_from_db() - joined = "\n".join(job.logs.stdout) + joined = joined_job_log_messages(job) self.assertIn("Tasks fetched", joined) self.assertIn("requested=2", joined) self.assertIn("delivered=", joined) @@ -666,7 +678,7 @@ def test_tasks_endpoint_logs_early_exit_for_terminal_job(self): self.assertEqual(resp.json(), {"tasks": []}) job.refresh_from_db() - joined = "\n".join(job.logs.stdout) + joined = joined_job_log_messages(job) self.assertIn("non-active job", joined) self.assertIn(f"status={JobState.SUCCESS}", joined) @@ -705,7 +717,7 @@ def 
diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py
index ee8d833cd..49a693cc4 100644
--- a/ami/jobs/tests/test_jobs.py
+++ b/ami/jobs/tests/test_jobs.py
@@ -8,7 +8,15 @@
 from rest_framework.test import APIRequestFactory, APITestCase

 from ami.base.serializers import reverse_with_params
-from ami.jobs.models import Job, JobDispatchMode, JobProgress, JobState, MLJob, SourceImageCollectionPopulateJob
+from ami.jobs.models import (
+    Job,
+    JobDispatchMode,
+    JobLog,
+    JobProgress,
+    JobState,
+    MLJob,
+    SourceImageCollectionPopulateJob,
+)
 from ami.main.models import Project, SourceImage, SourceImageCollection
 from ami.ml.models import Pipeline
 from ami.ml.models.processing_service import ProcessingService
@@ -18,6 +26,10 @@
 logger = logging.getLogger(__name__)

+
+def joined_job_log_messages(job: Job) -> str:
+    return "\n".join(JobLog.objects.filter(job=job).order_by("-created_at", "-pk").values_list("message", flat=True))
+

 class TestJobProgress(TestCase):
     def setUp(self):
         self.project = Project.objects.create(name="Test project")
@@ -645,7 +657,7 @@ def test_tasks_endpoint_logs_fetch_to_job_logger(self):
         self.assertEqual(resp.status_code, 200)

         job.refresh_from_db()
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         self.assertIn("Tasks fetched", joined)
         self.assertIn("requested=2", joined)
         self.assertIn("delivered=", joined)
@@ -666,7 +678,7 @@ def test_tasks_endpoint_logs_early_exit_for_terminal_job(self):
         self.assertEqual(resp.json(), {"tasks": []})

         job.refresh_from_db()
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         self.assertIn("non-active job", joined)
         self.assertIn(f"status={JobState.SUCCESS}", joined)

@@ -705,7 +717,7 @@ def test_result_endpoint_mirrors_queued_log_to_job_logger(self):
         self.assertEqual(resp.status_code, 200)

         job.refresh_from_db()
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         self.assertIn("Queued pipeline result", joined)
         self.assertIn("mirrored-task-id", joined)
         self.assertIn("test.reply.logged", joined)
@@ -742,7 +754,7 @@ def test_tasks_fetch_log_uses_token_fingerprint_not_full_token(self):
         self.assertEqual(resp.status_code, 200)

         job.refresh_from_db()
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         # Full token key must NOT appear anywhere in logs
         self.assertNotIn(token.key, joined)
         # Fingerprint (first 8 chars + ellipsis) MUST appear
@@ -769,7 +781,7 @@ def test_tasks_fetch_zero_delivered_does_not_log_to_stdout(self):

         job.refresh_from_db()
         # No Tasks fetched line should appear in stdout for a zero-delivery poll
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         self.assertNotIn("Tasks fetched", joined)

     def test_tasks_fetch_nonzero_delivered_logs_to_stdout(self):
@@ -799,7 +811,7 @@ def test_tasks_fetch_nonzero_delivered_logs_to_stdout(self):
         self.assertEqual(len(resp.json()["tasks"]), 3)

         job.refresh_from_db()
-        joined = "\n".join(job.logs.stdout)
+        joined = joined_job_log_messages(job)
         self.assertIn("Tasks fetched", joined)
         self.assertIn("delivered=3", joined)

@@ -842,7 +854,7 @@ def test_throughput_line_is_well_formed(self):
         _log_job_throughput(self.job, "process")

         self.job.refresh_from_db()
-        joined = "\n".join(self.job.logs.stdout)
+        joined = joined_job_log_messages(self.job)
         self.assertIn("throughput", joined)
         self.assertIn("processed=10/100", joined)
         self.assertIn("rate=2.0 imgs/min", joined)
@@ -858,7 +870,7 @@ def test_throughput_skipped_when_started_at_is_none(self):
         _log_job_throughput(self.job, "process")

         self.job.refresh_from_db()
-        joined = "\n".join(self.job.logs.stdout)
+        joined = joined_job_log_messages(self.job)
         self.assertNotIn("throughput", joined)

     def test_throughput_skipped_for_non_processing_stage(self):
@@ -873,7 +885,7 @@ def test_throughput_skipped_for_non_processing_stage(self):
         _log_job_throughput(self.job, "delay")

         self.job.refresh_from_db()
-        joined = "\n".join(self.job.logs.stdout)
+        joined = joined_job_log_messages(self.job)
         self.assertNotIn("throughput", joined)

     def test_throughput_with_zero_processed_reports_unknown_eta(self):
@@ -888,12 +900,127 @@ def test_throughput_with_zero_processed_reports_unknown_eta(self):
         _log_job_throughput(self.job, "process")

         self.job.refresh_from_db()
-        joined = "\n".join(self.job.logs.stdout)
+        joined = joined_job_log_messages(self.job)
         self.assertIn("processed=0/50", joined)
         self.assertIn("rate=0.0", joined)
         self.assertIn("ETA=unknown", joined)


+class TestJobLogPersistence(TestCase):
+    """Exercise the JobLog table / legacy-JSON fallback paths on JobLogHandler.emit."""
+
+    def setUp(self):
+        self.project = Project.objects.create(name="JobLog Test Project")
+        self.pipeline = Pipeline.objects.create(name="JobLog Pipeline", slug="joblog-pipeline")
+        self.pipeline.projects.add(self.project)
+        self.job = Job.objects.create(
+            job_type_key=MLJob.key,
+            project=self.project,
+            name="JobLog emit test job",
+            pipeline=self.pipeline,
+        )
+
+    def test_emit_inserts_one_joblog_row_per_call(self):
+        self.job.logger.info("first")
+        self.job.logger.error("boom")
+
+        rows = list(JobLog.objects.filter(job=self.job).order_by("pk").values("level", "message"))
+        self.assertEqual(len(rows), 2)
+        self.assertEqual(rows[0]["level"], "INFO")
rows[0]["message"]) + self.assertEqual(rows[1]["level"], "ERROR") + self.assertIn("boom", rows[1]["message"]) + + # emit must not repopulate the legacy JSON column. + self.job.refresh_from_db(fields=["logs"]) + self.assertEqual(self.job.logs.stdout, []) + self.assertEqual(self.job.logs.stderr, []) + + def test_flag_disabled_short_circuits_emit(self): + from django.test import override_settings + + with override_settings(JOB_LOG_PERSIST_ENABLED=False): + self.job.logger.info("suppressed") + self.job.logger.error("also suppressed") + + self.assertFalse(JobLog.objects.filter(job=self.job).exists()) + self.job.refresh_from_db(fields=["logs"]) + self.assertEqual(self.job.logs.stdout, []) + self.assertEqual(self.job.logs.stderr, []) + + def test_serialize_job_logs_reads_from_joblog_table(self): + from ami.jobs.serializers import serialize_job_logs + + self.job.logger.info("hello world") + self.job.logger.error("something failed") + + logs = serialize_job_logs(self.job) + + self.assertEqual(len(logs["stdout"]), 2) + # Newest-first ordering. + self.assertIn("ERROR", logs["stdout"][0]) + self.assertIn("something failed", logs["stdout"][0]) + self.assertIn("INFO", logs["stdout"][1]) + self.assertIn("hello world", logs["stdout"][1]) + self.assertEqual(logs["stderr"], ["something failed"]) + + def test_serialize_job_logs_falls_back_to_legacy_json(self): + """A job with no JobLog rows but a populated ``logs`` JSON column (a + pre-migration job, or a job written under ``JOB_LOG_PERSIST_ENABLED=False`` + after legacy data had been seeded) still renders through the serializer.""" + from ami.jobs.models import JobLogs as JobLogsSchema + from ami.jobs.serializers import serialize_job_logs + + self.job.logs = JobLogsSchema(stdout=["[2025-01-01 00:00:00] INFO legacy line"], stderr=["old error"]) + self.job.save(update_fields=["logs"]) + self.assertFalse(JobLog.objects.filter(job=self.job).exists()) + + logs = serialize_job_logs(self.job) + + self.assertEqual(logs["stdout"], ["[2025-01-01 00:00:00] INFO legacy line"]) + self.assertEqual(logs["stderr"], ["old error"]) + + def test_get_logs_list_action_skips_joblog_query(self): + """The ``get_logs`` method on JobListSerializer returns the legacy JSON + shape when the viewset action is ``list``. This avoids N+1 on joined + log rows and matches UI expectations (the list view does not render logs).""" + from unittest.mock import MagicMock + + from ami.jobs.models import JobLogs as JobLogsSchema + from ami.jobs.serializers import JobListSerializer + + self.job.logger.info("ignored in list view") + self.assertEqual(JobLog.objects.filter(job=self.job).count(), 1) + + self.job.logs = JobLogsSchema(stdout=["legacy-only"], stderr=[]) + self.job.save(update_fields=["logs"]) + + # Directly instantiate the serializer with a fake view context claiming + # the list action; confirms list responses do not hit JobLog rows. 
diff --git a/ami/ml/tests.py b/ami/ml/tests.py
index 18549d8cb..0eaa0a237 100644
--- a/ami/ml/tests.py
+++ b/ami/ml/tests.py
@@ -279,9 +279,9 @@ def test_repeated_registration_updates_last_seen(self):
 class TestPipelineWithProcessingService(TestCase):
     def test_run_pipeline_with_errors_from_processing_service(self):
         """
-        Run a real pipeline and verify that if an error occurs for one image, the error is logged in job.logs.stderr.
+        Run a real pipeline and verify that if an error occurs for one image, the error is logged to JobLog.
         """
-        from ami.jobs.models import Job
+        from ami.jobs.models import Job, JobLog

         # Setup test project, images, and job
         project, deployment = setup_test_project()
@@ -305,11 +305,13 @@
             pass  # Expected if the backend raises

         job.refresh_from_db()
-        stderr_logs = job.logs.stderr
+        stderr_logs = list(
+            JobLog.objects.filter(job=job, level__in=["ERROR", "CRITICAL"]).values_list("message", flat=True)
+        )
         # Check that an error message mentioning the failed image is present
         assert any(
             "Failed to process" in log for log in stderr_logs
-        ), f"Expected error message in job.logs.stderr, got: {stderr_logs}"
+        ), f"Expected error message in job logs, got: {stderr_logs}"

     def setUp(self):
         self.project, self.deployment = setup_test_project()
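Aside (not part of the patch): to exercise the new coverage locally with the standard Django test runner (assuming this project runs tests through manage.py; adjust if it uses a wrapper script or pytest):

    python manage.py test ami.jobs.tests.test_jobs.TestJobLogPersistence ami.ml.tests.TestPipelineWithProcessingService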