Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,12 @@ def _fail_job(job_id: int, reason: str) -> None:
job.save(update_fields=["status", "progress", "finished_at"])

job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk, job.logger, job_logger=job.logger)
except Job.DoesNotExist:
logger.error(f"Cannot fail job {job_id}: not found")
# No job_logger here — the job row is gone, so cleanup lifecycle lines
# have nowhere to be mirrored to. TaskQueueManager falls back to
# the module logger.
cleanup_async_job_resources(job_id, logger)


Expand Down Expand Up @@ -423,7 +426,7 @@ def cleanup_async_job_if_needed(job) -> None:
# import here to avoid circular imports
from ami.ml.orchestration.jobs import cleanup_async_job_resources

cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk, job.logger, job_logger=job.logger)


@task_prerun.connect(sender=run_job)
Expand Down
45 changes: 36 additions & 9 deletions ami/ml/orchestration/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from asgiref.sync import async_to_sync
from asgiref.sync import async_to_sync, sync_to_async

from ami.jobs.models import Job, JobState
from ami.main.models import SourceImage
Expand All @@ -11,7 +11,11 @@
logger = logging.getLogger(__name__)


def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
def cleanup_async_job_resources(
job_id: int,
_logger: logging.Logger,
job_logger: logging.Logger | None = None,
) -> bool:
"""
Clean up NATS JetStream and Redis resources for a completed job.

Expand All @@ -22,8 +26,17 @@ def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
Cleanup failures are logged but don't fail the job - data is already saved.

Args:
job_id: The Job ID (integer primary key)
_logger: Logger to use for logging cleanup results
job_id: The Job ID (integer primary key).
_logger: Logger to use for the local Redis/NATS outcome lines emitted
by this function itself. May be a plain module logger when the
caller has no job context (e.g. the ``Job.DoesNotExist`` path in
``_fail_job``).
job_logger: Optional per-job logger (``job.logger``) to forward to
``TaskQueueManager`` so the UI job log sees the forensic
consumer-stats snapshot and the stream/consumer delete lines.
Must only be set when the caller actually has a ``job.logger`` —
otherwise cleanup lifecycle lines would be mirrored into an
unrelated module logger.
Returns:
bool: True if both cleanups succeeded, False otherwise
"""
Expand All @@ -39,9 +52,13 @@ def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
except Exception as e:
_logger.error(f"Error cleaning up Redis state for job {job_id}: {e}")

# Cleanup NATS resources
# Cleanup NATS resources. Forward the per-job logger (if any) so the
# forensic pre-delete consumer-stats snapshot and the delete lifecycle
# lines land in the UI job log. When job_logger is None (e.g. the
# Job.DoesNotExist fallback), TaskQueueManager falls back to the module
# logger only.
async def cleanup():
async with TaskQueueManager() as manager:
async with TaskQueueManager(job_logger=job_logger) as manager:
return await manager.cleanup_job_resources(job_id)

try:
Expand Down Expand Up @@ -97,16 +114,26 @@ async def queue_all_images():
successful_queues = 0
failed_queues = 0

async with TaskQueueManager() as manager:
# Pass job.logger so stream/consumer setup and any publish failures
# appear in the UI job log (not just the module logger). Per-image
# success logs stay at module level so a 10k-image job doesn't drown
# the job log.
async with TaskQueueManager(job_logger=job.logger) as manager:
for image_pk, task in tasks:
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
logger.debug(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
success = await manager.publish_task(
job_id=job.pk,
data=task,
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
# job.logger.error triggers a sync Django ORM save inside
# JobLogHandler.emit, which raises SynchronousOnlyOperation
# when called directly from the event loop. Bridge it so
# the line actually lands in job.logs.stdout.
await sync_to_async(job.logger.error)(
f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
success = False

if success:
Expand Down
Loading
Loading