Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ae02d2e
fix: Job serialization overhead
carlosgjs Sep 30, 2025
24a15af
syntax
carlosgjs Sep 30, 2025
0da97a6
fix syntax
carlosgjs Sep 30, 2025
2db7d66
Simplify diagram
carlosgjs Oct 1, 2025
8a714cd
Add RabbitMQ
carlosgjs Oct 3, 2025
700f594
WIP: Use NATS JetStream for queuing
carlosgjs Oct 8, 2025
3b42e08
Merge branch 'main' into carlosg/jobio
carlosgjs Oct 14, 2025
8ea5d7d
Saving of results
carlosgjs Oct 17, 2025
61fc2c5
Update progress
carlosgjs Oct 17, 2025
9af597c
Clean up and refactor task state mgmt
carlosgjs Oct 24, 2025
7ff8865
fix async use
carlosgjs Oct 24, 2025
0fbe899
Merge branch 'main' into carlosg/jobio
carlosgjs Oct 24, 2025
7899fc5
Fix circular dependency, jobset query by pipeline slug
carlosgjs Oct 24, 2025
d9f8ffd
GH review comments
carlosgjs Oct 24, 2025
edad552
Add feature flag, rename "job" to "task"
carlosgjs Oct 29, 2025
d254867
Code reorganization
carlosgjs Oct 31, 2025
1cc890e
Resolve circular deps
carlosgjs Oct 31, 2025
84ee5a2
Update ami/jobs/models.py
carlosgjs Oct 31, 2025
09fee92
cleanup
carlosgjs Oct 31, 2025
4480b0d
Consistent progress updates, single image job command
carlosgjs Nov 4, 2025
3032709
Fix typo
carlosgjs Nov 4, 2025
3e7ef3b
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 5, 2025
04be994
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 18, 2025
a8b94e3
Remove unnecessary file
carlosgjs Nov 18, 2025
1fc20b5
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 21, 2025
0a5c89e
Remove diagram, fix flakes
carlosgjs Nov 21, 2025
344f883
Use async_to_sync
carlosgjs Nov 21, 2025
df7eaa3
CR feedback
carlosgjs Nov 21, 2025
0391642
clean up
carlosgjs Nov 21, 2025
4ae27b0
more cleanup
carlosgjs Nov 21, 2025
4f50b3d
Apply suggestions from code review
carlosgjs Nov 21, 2025
a8fc79a
Remove old comments
carlosgjs Nov 21, 2025
4efdf07
Fix processing error cases
carlosgjs Nov 21, 2025
f221a1a
updates
carlosgjs Nov 21, 2025
1a9b80a
Merge branch 'main' into carlosg/jobio
carlosgjs Dec 9, 2025
3657fd2
Fix merge bugs, back to working state
carlosgjs Dec 9, 2025
2483592
Use PipelineProcessingTask for the queue, other fixes
carlosgjs Dec 10, 2025
0ae9674
Update tests
carlosgjs Dec 10, 2025
3c034a9
General cleanup
carlosgjs Dec 10, 2025
e9d2a1c
Add nats to CI and prod
carlosgjs Dec 10, 2025
3d198d0
Unit tests for new classes
carlosgjs Dec 10, 2025
f9a1226
Add nats to staging, don't retry save results task
carlosgjs Dec 19, 2025
3a73329
fix formatting
carlosgjs Dec 19, 2025
e241586
fix yaml formatting
carlosgjs Jan 13, 2026
1202063
Clean up and comments
carlos-irreverentlabs Jan 16, 2026
d2865f5
CR feedback
carlos-irreverentlabs Jan 16, 2026
b60eab0
merge
carlos-irreverentlabs Jan 16, 2026
0e350dd
Merge branch 'main' into carlosg/jobio
carlos-irreverentlabs Jan 16, 2026
936d768
fix lint
carlos-irreverentlabs Jan 16, 2026
7645b14
Merge branch 'main' into carlosg/jobio
carlosgjs Jan 20, 2026
644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs Jan 22, 2026
02578b7
Merge branch 'main' into carlosg/jobio
carlosgjs Jan 22, 2026
0d614f9
Merge branch 'main' of github.com:RolnickLab/antenna into carlosg/jobio
mihow Jan 30, 2026
c9077cf
feat: use a pydantic schema for the task error state, mirror worker
mihow Jan 31, 2026
6564532
chore: rename function specific to nats
mihow Jan 31, 2026
6bcc610
docs: add plan for fixing the status displayed for async jobs
mihow Jan 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,72 @@
.editorconfig
.gitattributes
.github
.gitignore
.gitlab-ci.yml
.idea
.pre-commit-config.yaml
.readthedocs.yml
.travis.yml
.git
ui
ami/media
backups
venv
.venv
.env
.envs
venv/
.venv/
.env/
.envs/
.envs/*
node_modules
data

# Python cache / bytecode
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pdb
*.egg-info/
*.egg
*.whl


# Django / runtime artifacts
*.log
*.pot
*.pyc
db.sqlite3
media/
staticfiles/ # collected static files (use collectstatic inside container)

# Node / UI dependencies (if using React/Vue in your UI service)
node_modules/
npm-debug.log
yarn-error.log
.pnpm-debug.log

# Docs build artifacts
/docs/_build/

# Git / VCS
.git/
.gitignore
.gitattributes
*.swp
*.swo

# IDE / editor
.vscode/
.idea/
*.iml

# OS cruft
.DS_Store
Thumbs.db

# Docker itself
.dockerignore
Dockerfile
docker-compose*.yml

# Build / dist
build/
dist/
.eggs/
3 changes: 3 additions & 0 deletions .envs/.local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ DJANGO_SUPERUSER_PASSWORD=localadmin
# Redis
REDIS_URL=redis://redis:6379/0

# NATS
NATS_URL=nats://nats:4222

# Celery / Flower
CELERY_FLOWER_USER=QSocnxapfMvzLqJXSsXtnEZqRkBtsmKT
CELERY_FLOWER_PASSWORD=BEQgmCtgyrFieKNoGTsux9YIye0I7P5Q7vEgfJD2C4jxmtHDetFaE2jhS7K7rxaf
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,6 @@ sandbox/

# Other
flower

# huggingface cache
huggingface_cache/
24 changes: 23 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"name": "Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Django attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5679
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Celery worker attach",
"type": "debugpy",
"request": "attach",
"connect": {
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ docker compose -f processing_services/example/docker-compose.yml up -d
- Django admin: http://localhost:8000/admin/
- OpenAPI / Swagger documentation: http://localhost:8000/api/v2/docs/
- Minio UI: http://minio:9001, Minio service: http://minio:9000
- NATS dashboard: https://natsdashboard.com/ (Add localhost)

NOTE: If one of these services is not working properly, it could be because another process is using the port. You can check for this with `lsof -i :<PORT_NUMBER>`.

Expand Down
5 changes: 3 additions & 2 deletions ami/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def get_active_project(
# If not in URL, try query parameters
if not project_id:
# Look for project_id in GET query parameters or POST data
# POST data returns a list of ints, but QueryDict.get() returns a single value
project_id = request.query_params.get(param) or request.data.get(param)
# request.data may not always be a dict (e.g., for non-POST requests), so we check its type
post_data = request.data if isinstance(request.data, dict) else {}
project_id = request.query_params.get(param) or post_data.get(param)

project_id = SingleParamSerializer[int].clean(
param_name=param,
Expand Down
32 changes: 20 additions & 12 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_status_label(status: JobState, progress: float) -> str:
if status in [JobState.CREATED, JobState.PENDING, JobState.RECEIVED]:
return "Waiting to start"
elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]:
return f"{progress:.0%} complete"
return f"{progress:.0%} complete" # noqa E231
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
else:
return f"{status.name}"

Expand Down Expand Up @@ -133,14 +133,14 @@ def get_stage(self, stage_key: str) -> JobProgressStageDetail:
for stage in self.stages:
if stage.key == stage_key:
return stage
raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
Comment thread
carlosgjs marked this conversation as resolved.
Outdated

def get_stage_param(self, stage_key: str, param_key: str) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
for param in stage.params:
if param.key == param_key:
return param
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713

def add_stage_param(self, stage_key: str, param_name: str, value: typing.Any = None) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
Expand Down Expand Up @@ -322,15 +322,13 @@ def run(cls, job: "Job"):
"""
Procedure for an ML pipeline as a job.
"""
from ami.ml.orchestration.jobs import queue_images_to_nats
Comment thread
coderabbitai[bot] marked this conversation as resolved.

job.update_status(JobState.STARTED)
job.started_at = datetime.datetime.now()
job.finished_at = None
job.save()

# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
save_tasks_completed: list[tuple[int, AsyncResult]] = []

if job.delay:
update_interval_seconds = 2
last_update = time.time()
Expand Down Expand Up @@ -365,7 +363,7 @@ def run(cls, job: "Job"):
progress=0,
)

images = list(
images: list[SourceImage] = list(
# @TODO return generator plus image count
# @TODO pass to celery group chain?
job.pipeline.collect_images(
Expand All @@ -389,8 +387,6 @@ def run(cls, job: "Job"):
images = images[: job.limit]
image_count = len(images)
job.progress.add_stage_param("collect", "Limit", image_count)
else:
image_count = source_image_count

job.progress.update_stage(
"collect",
Expand All @@ -401,6 +397,17 @@ def run(cls, job: "Job"):
# End image collection stage
job.save()

if job.project.feature_flags.async_pipeline_workers:
queue_images_to_nats(job, images)
else:
cls.process_images(job, images)
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
Comment on lines +400 to +410
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Async ML path leaves overall job lifecycle undefined (status/finished_at, Celery status mismatch)

When async_pipeline_workers is enabled, MLJob.run queues NATS tasks and returns immediately. The surrounding run_job Celery task then completes and update_job_status will set job.status to the Celery task status (SUCCESS), even though:

  • No images may yet have been processed by workers.
  • finished_at is never set for the async path.
  • The only subsequent updates come from process_pipeline_result, which adjust per-stage progress only (via _update_job_progress) and never touch job.status or finished_at.

Net effect: jobs can show SUCCESS while process/results stages are still <100%, and even once stages reach 100% there is no authoritative completion timestamp or explicit terminal status driven by the async pipeline.

Consider tightening this by:

  • Having the async pipeline be the source of truth for completion, e.g. in _update_job_progress when stage == "results" and progress_percentage >= 1.0, set job.status to JobState.SUCCESS, set finished_at, and (optionally) trigger NATS/Redis cleanup.
  • Optionally, for jobs with async_pipeline_workers=True, avoid overwriting job.status in update_job_status based solely on the run_job Celery task’s status, or treat that status as “queueing succeeded” only.

This will make the async path match the synchronous process_images semantics and avoid confusing “SUCCESS, 0% complete” states.

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 400-410, the async pipeline path only queues
work and returns, leaving job.status and finished_at unset and allowing the
Celery run_job task to mark the job SUCCESS prematurely; update the logic so
that for async_pipeline_workers you do not overwrite job.status based solely on
the run_job Celery task (treat that status as "queued" or leave unchanged), and
move authoritative completion handling into the async progress handler: in
_update_job_progress, when stage == "results" and progress_percentage >= 1.0 set
job.status = JobState.SUCCESS, set job.finished_at = now(), save the job (and
optionally perform NATS/Redis cleanup), and ensure any queued-path error
handling still sets FAILURE and finished_at as currently done.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, I and I see this behavior. The job is marked as Successful at 0% and then the worker begins picking up the tasks. We will address this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihow, thank you for confirming! I appreciate you acknowledging the issue. Since you'll be addressing it in future work, feel free to reach out if you'd like any suggestions on the implementation approach when you tackle it. 🐰


@classmethod
def process_images(cls, job, images):
image_count = len(images)
# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
Comment thread
mihow marked this conversation as resolved.
save_tasks_completed: list[tuple[int, AsyncResult]] = []
total_captures = 0
total_detections = 0
total_classifications = 0
Expand All @@ -420,7 +427,7 @@ def run(cls, job: "Job"):
job_id=job.pk,
project_id=job.project.pk,
)
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
except Exception as e:
# Log error about image batch and continue
job.logger.error(f"Failed to process image batch {i+1}: {e}")
Expand Down Expand Up @@ -472,7 +479,7 @@ def run(cls, job: "Job"):

if image_count:
percent_successful = 1 - len(request_failed_images) / image_count if image_count else 0
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231

# Check all Celery sub-tasks if they have completed saving results
save_tasks_remaining = set(save_tasks) - set(save_tasks_completed)
Expand Down Expand Up @@ -513,6 +520,7 @@ def run(cls, job: "Job"):
job.save()


# TODO: This needs to happen once a job is done
class DataStorageSyncJob(JobType):
name = "Data storage sync"
key = "data_storage_sync"
Expand Down
108 changes: 108 additions & 0 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from celery.result import AsyncResult
from celery.signals import task_failure, task_postrun, task_prerun

from ami.ml.orchestration.nats_queue import TaskQueueManager
from ami.ml.orchestration.task_state import TaskStateManager
from ami.ml.orchestration.utils import run_in_async_loop
from ami.ml.schemas import PipelineResultsResponse
from ami.tasks import default_soft_time_limit, default_time_limit
from config import celery_app

Expand Down Expand Up @@ -30,6 +34,110 @@ def run_job(self, job_id: int) -> None:
job.logger.info(f"Finished job {job}")


@celery_app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(Exception,),
soft_time_limit=300, # 5 minutes
time_limit=360, # 6 minutes
)
def process_pipeline_result(self, job_id: int, result_data: dict, reply_subject: str) -> None:
Comment thread
mihow marked this conversation as resolved.
Outdated
Comment thread
mihow marked this conversation as resolved.
Outdated
"""
Process a single pipeline result asynchronously.

This task:
1. Deserializes the pipeline result
2. Saves it to the database
3. Updates progress by removing processed image IDs from Redis
4. Acknowledges the task via NATS

Args:
job_id: The job ID
result_data: Dictionary containing the pipeline result
reply_subject: NATS reply subject for acknowledgment

Returns:
dict with status information
"""
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
from ami.jobs.models import Job, JobState # avoid circular import

try:
job = Job.objects.get(pk=job_id)
job.logger.info(f"Processing pipeline result for job {job_id}, reply_subject: {reply_subject}")

# Save to database (this is the slow operation)
if not job.pipeline:
job.logger.warning(f"Job {job_id} has no pipeline, skipping save_results")
return
Comment thread
carlosgjs marked this conversation as resolved.
Outdated

job.logger.info(f"Successfully saved results for job {job_id}")

# Deserialize the result
pipeline_result = PipelineResultsResponse(**result_data)

# Update progress tracking in Redis
state_manager = TaskStateManager(job.pk)
processed_image_ids = {str(img.id) for img in pipeline_result.source_images}
state_manager.mark_images_processed(processed_image_ids)

Comment thread
carlosgjs marked this conversation as resolved.
progress_info = state_manager.get_progress()
progress_percentage = 0.0

if progress_info is not None:
# Get updated progress
progress_percentage = progress_info.percentage

job.logger.info(
f"Job {job_id} progress: {progress_info.processed}/{progress_info.total} images processed "
f"({progress_percentage*100}%), {progress_info.remaining} remaining"
)
else:
job.logger.warning(f"No pending images found in Redis for job {job_id}, setting progress to 100%")
progress_percentage = 1.0

job.progress.update_stage(
"process",
status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
progress=progress_percentage,
)
job.save()

job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
# Acknowledge the task via NATS
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
try:

async def ack_task():
async with TaskQueueManager() as manager:
return await manager.acknowledge_task(reply_subject)

ack_success = run_in_async_loop(ack_task, f"acknowledging job {job.pk} via NATS")

if ack_success:
job.logger.info(f"Successfully acknowledged task via NATS: {reply_subject}")
else:
job.logger.warning(f"Failed to acknowledge task via NATS: {reply_subject}")
except Exception as ack_error:
job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
# Don't fail the task if ACK fails - data is already saved

# Update job stage with calculated progress
job.progress.update_stage(
"results",
status=JobState.STARTED if progress_percentage < 1.0 else JobState.SUCCESS,
progress=progress_percentage,
)
job.save()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

except Job.DoesNotExist:
logger.error(f"Job {job_id} not found")
raise
except Exception as e:
logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
# Celery will automatically retry based on autoretry_for
raise


@task_postrun.connect(sender=run_job)
@task_prerun.connect(sender=run_job)
def update_job_status(sender, task_id, task, *args, **kwargs):
Expand Down
Loading