Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ def _fail_job(job_id: int, reason: str) -> None:
job.save(update_fields=["status", "progress", "finished_at"])

job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk)
except Job.DoesNotExist:
logger.error(f"Cannot fail job {job_id}: not found")
cleanup_async_job_resources(job_id, logger)
cleanup_async_job_resources(job_id)


def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
Expand Down Expand Up @@ -423,7 +423,7 @@ def cleanup_async_job_if_needed(job) -> None:
# import here to avoid circular imports
from ami.ml.orchestration.jobs import cleanup_async_job_resources

cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk)


@task_prerun.connect(sender=run_job)
Expand Down
56 changes: 42 additions & 14 deletions ami/ml/orchestration/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from asgiref.sync import async_to_sync
from asgiref.sync import async_to_sync, sync_to_async

from ami.jobs.models import Job, JobState
from ami.main.models import SourceImage
Expand All @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


def cleanup_async_job_resources(job_id: int) -> bool:
    """
    Clean up NATS JetStream and Redis resources for a completed job.

    Cleanup failures are logged but don't fail the job - data is already saved.

    Resolves the job (and its per-job logger) internally so callers only need
    to pass the ``job_id`` — matches the pattern used by ``save_results`` in
    ``ami/jobs/tasks.py``. If the ``Job`` row is gone (e.g. the
    ``Job.DoesNotExist`` path in ``_fail_job``), the function falls back to
    the module logger and TaskQueueManager's module-logger path.

    Args:
        job_id: The Job ID (integer primary key).
    Returns:
        bool: True if both cleanups succeeded, False otherwise
    """
    # Resolve the per-job logger if the Job row still exists; otherwise fall
    # back to the module logger so cleanup is still attempted and logged.
    job_logger: logging.Logger | None = None
    try:
        job = Job.objects.get(pk=job_id)
        job_logger = job.logger
    except Job.DoesNotExist:
        pass
    _log = job_logger or logger

    redis_success = False
    nats_success = False

    # Cleanup Redis state. Failures are logged, never raised — the job's
    # results are already persisted by this point.
    try:
        state_manager = AsyncJobStateManager(job_id)
        state_manager.cleanup()
        _log.info(f"Cleaned up Redis state for job {job_id}")
        redis_success = True
    except Exception as e:
        _log.error(f"Error cleaning up Redis state for job {job_id}: {e}")

    # Cleanup NATS resources. Forward job_logger to TaskQueueManager so the
    # forensic pre-delete consumer-stats snapshot lands in the UI job log.
    # When job_logger is None, TaskQueueManager falls back to the module
    # logger only.
    async def cleanup():
        async with TaskQueueManager(job_logger=job_logger) as manager:
            return await manager.cleanup_job_resources(job_id)

    try:
        # async_to_sync bridges the event-loop-only NATS client into this
        # synchronous (Celery task) context.
        nats_success = async_to_sync(cleanup)()
        if nats_success:
            _log.info(f"Cleaned up NATS resources for job {job_id}")
        else:
            _log.warning(f"Failed to clean up NATS resources for job {job_id}")
    except Exception as e:
        _log.error(f"Error cleaning up NATS resources for job {job_id}: {e}")

    return redis_success and nats_success

Expand Down Expand Up @@ -97,16 +113,28 @@ async def queue_all_images():
successful_queues = 0
failed_queues = 0

async with TaskQueueManager() as manager:
# Pass job.logger so stream/consumer setup and any publish failures
# appear in the UI job log (not just the module logger). Per-image
# success logs stay at module level so a 10k-image job doesn't drown
# the job log.
async with TaskQueueManager(job_logger=job.logger) as manager:
for image_pk, task in tasks:
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
logger.debug(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
success = await manager.publish_task(
job_id=job.pk,
data=task,
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
# Module logger gets the full traceback for ops dashboards.
logger.exception("Failed to queue image %s to stream for job '%s'", image_pk, job.pk)
# job.logger.error triggers a sync Django ORM save inside
# JobLogHandler.emit, which raises SynchronousOnlyOperation
# when called directly from the event loop. Bridge it so
# the line actually lands in job.logs.stdout.
await sync_to_async(job.logger.error)(
f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
success = False

if success:
Expand Down
Loading
Loading