Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ae02d2e
fix: Job serialization overhead
carlosgjs Sep 30, 2025
24a15af
syntax
carlosgjs Sep 30, 2025
0da97a6
fix syntax
carlosgjs Sep 30, 2025
2db7d66
Simplify diagram
carlosgjs Oct 1, 2025
8a714cd
Add RabbitMQ
carlosgjs Oct 3, 2025
700f594
WIP: Use NATS JetStream for queuing
carlosgjs Oct 8, 2025
3b42e08
Merge branch 'main' into carlosg/jobio
carlosgjs Oct 14, 2025
8ea5d7d
Saving of results
carlosgjs Oct 17, 2025
61fc2c5
Update progress
carlosgjs Oct 17, 2025
9af597c
Clean up and refactor task state mgmt
carlosgjs Oct 24, 2025
7ff8865
fix async use
carlosgjs Oct 24, 2025
0fbe899
Merge branch 'main' into carlosg/jobio
carlosgjs Oct 24, 2025
7899fc5
Fix circular dependency, jobset query by pipeline slug
carlosgjs Oct 24, 2025
d9f8ffd
GH review comments
carlosgjs Oct 24, 2025
edad552
Add feature flag, rename "job" to "task"
carlosgjs Oct 29, 2025
d254867
Code reorganization
carlosgjs Oct 31, 2025
1cc890e
Resolve circular deps
carlosgjs Oct 31, 2025
84ee5a2
Update ami/jobs/models.py
carlosgjs Oct 31, 2025
09fee92
cleanup
carlosgjs Oct 31, 2025
4480b0d
Consistent progress updates, single image job command
carlosgjs Nov 4, 2025
3032709
Fix typo
carlosgjs Nov 4, 2025
3e7ef3b
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 5, 2025
04be994
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 18, 2025
a8b94e3
Remove unnecessary file
carlosgjs Nov 18, 2025
1fc20b5
Merge branch 'main' into carlosg/jobio
carlosgjs Nov 21, 2025
0a5c89e
Remove diagram, fix flakes
carlosgjs Nov 21, 2025
344f883
Use async_to_sync
carlosgjs Nov 21, 2025
df7eaa3
CR feedback
carlosgjs Nov 21, 2025
0391642
clean up
carlosgjs Nov 21, 2025
4ae27b0
more cleanup
carlosgjs Nov 21, 2025
4f50b3d
Apply suggestions from code review
carlosgjs Nov 21, 2025
a8fc79a
Remove old comments
carlosgjs Nov 21, 2025
4efdf07
Fix processing error cases
carlosgjs Nov 21, 2025
f221a1a
updates
carlosgjs Nov 21, 2025
1a9b80a
Merge branch 'main' into carlosg/jobio
carlosgjs Dec 9, 2025
3657fd2
Fix merge bugs, back to working state
carlosgjs Dec 9, 2025
2483592
Use PipelineProcessingTask for the queue, other fixes
carlosgjs Dec 10, 2025
0ae9674
Update tests
carlosgjs Dec 10, 2025
3c034a9
General cleanup
carlosgjs Dec 10, 2025
e9d2a1c
Add nats to CI and prod
carlosgjs Dec 10, 2025
3d198d0
Unit tests for new classes
carlosgjs Dec 10, 2025
f9a1226
Add nats to staging, don't retry save results task
carlosgjs Dec 19, 2025
3a73329
fix formatting
carlosgjs Dec 19, 2025
e241586
fix yaml formatting
carlosgjs Jan 13, 2026
1202063
Clean up and comments
carlos-irreverentlabs Jan 16, 2026
d2865f5
CR feedback
carlos-irreverentlabs Jan 16, 2026
b60eab0
merge
carlos-irreverentlabs Jan 16, 2026
0e350dd
Merge branch 'main' into carlosg/jobio
carlos-irreverentlabs Jan 16, 2026
936d768
fix lint
carlos-irreverentlabs Jan 16, 2026
7645b14
Merge branch 'main' into carlosg/jobio
carlosgjs Jan 20, 2026
644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs Jan 22, 2026
02578b7
Merge branch 'main' into carlosg/jobio
carlosgjs Jan 22, 2026
0d614f9
Merge branch 'main' of github.com:RolnickLab/antenna into carlosg/jobio
mihow Jan 30, 2026
c9077cf
feat: use a pydantic schema for the task error state, mirror worker
mihow Jan 31, 2026
6564532
chore: rename function specific to nats
mihow Jan 31, 2026
6bcc610
docs: add plan for fixing the status displayed for async jobs
mihow Jan 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,72 @@
.editorconfig
.gitattributes
.github
.gitignore
.gitlab-ci.yml
.idea
.pre-commit-config.yaml
.readthedocs.yml
.travis.yml
.git
ui
ami/media
backups
venv
.venv
.env
.envs
venv/
.venv/
.env/
.envs/
.envs/*
node_modules
data

# Python cache / bytecode
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pdb
*.egg-info/
*.egg
*.whl


# Django / runtime artifacts
*.log
*.pot
*.pyc
db.sqlite3
media/
# collected static files (use collectstatic inside container)
staticfiles/

# Node / UI dependencies (if using React/Vue in your UI service)
node_modules/
npm-debug.log
yarn-error.log
.pnpm-debug.log

# Docs build artifacts
/docs/_build/

# Git / VCS
.git/
.gitignore
.gitattributes
*.swp
*.swo

# IDE / editor
.vscode/
.idea/
*.iml

# OS cruft
.DS_Store
Thumbs.db

# Docker itself
.dockerignore
Dockerfile
docker-compose*.yml

# Build / dist
build/
dist/
.eggs/
3 changes: 3 additions & 0 deletions .envs/.local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ DJANGO_SUPERUSER_PASSWORD=localadmin
# Redis
REDIS_URL=redis://redis:6379/0

# NATS
NATS_URL=nats://nats:4222

# Celery / Flower
CELERY_FLOWER_USER=QSocnxapfMvzLqJXSsXtnEZqRkBtsmKT
CELERY_FLOWER_PASSWORD=BEQgmCtgyrFieKNoGTsux9YIye0I7P5Q7vEgfJD2C4jxmtHDetFaE2jhS7K7rxaf
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,6 @@ sandbox/

# Other
flower

# huggingface cache
huggingface_cache/
24 changes: 23 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"name": "Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Django attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5679
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Celery worker attach",
"type": "debugpy",
"request": "attach",
"connect": {
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ docker compose -f processing_services/example/docker-compose.yml up -d
- Django admin: http://localhost:8000/admin/
- OpenAPI / Swagger documentation: http://localhost:8000/api/v2/docs/
- Minio UI: http://minio:9001, Minio service: http://minio:9000
- NATS dashboard: https://natsdashboard.com/ (Add localhost)

NOTE: If one of these services is not working properly, it could be because another process is already using the port. You can check for this with `lsof -i :<PORT_NUMBER>`.

Expand Down
144 changes: 132 additions & 12 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get_status_label(status: JobState, progress: float) -> str:
if status in [JobState.CREATED, JobState.PENDING, JobState.RECEIVED]:
return "Waiting to start"
elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]:
return f"{progress:.0%} complete"
return f"{progress:.0%} complete" # noqa E231
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
else:
return f"{status.name}"

Expand Down Expand Up @@ -132,14 +132,14 @@ def get_stage(self, stage_key: str) -> JobProgressStageDetail:
for stage in self.stages:
if stage.key == stage_key:
return stage
raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
Comment thread
carlosgjs marked this conversation as resolved.
Outdated

def get_stage_param(self, stage_key: str, param_key: str) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
for param in stage.params:
if param.key == param_key:
return param
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713

def add_stage_param(self, stage_key: str, param_name: str, value: typing.Any = None) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
Expand Down Expand Up @@ -326,10 +326,6 @@ def run(cls, job: "Job"):
job.finished_at = None
job.save()

# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
save_tasks_completed: list[tuple[int, AsyncResult]] = []

if job.delay:
update_interval_seconds = 2
last_update = time.time()
Expand Down Expand Up @@ -372,7 +368,7 @@ def run(cls, job: "Job"):
deployment=job.deployment,
source_images=[job.source_image_single] if job.source_image_single else None,
job_id=job.pk,
skip_processed=True,
skip_processed=False, # WIP don't commit
# shuffle=job.shuffle,
)
)
Expand All @@ -388,8 +384,6 @@ def run(cls, job: "Job"):
images = images[: job.limit]
image_count = len(images)
job.progress.add_stage_param("collect", "Limit", image_count)
else:
image_count = source_image_count

job.progress.update_stage(
"collect",
Expand All @@ -400,6 +394,17 @@ def run(cls, job: "Job"):
# End image collection stage
job.save()

# WIP: don't commit
# TODO: do this conditionally based on the type of processing service this job is using
# cls.process_images(job, images)
cls.queue_images_to_nats(job, images)

@classmethod
def process_images(cls, job, images):
image_count = len(images)
# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
Comment thread
mihow marked this conversation as resolved.
save_tasks_completed: list[tuple[int, AsyncResult]] = []
total_captures = 0
total_detections = 0
total_classifications = 0
Expand All @@ -419,7 +424,7 @@ def run(cls, job: "Job"):
job_id=job.pk,
project_id=job.project.pk,
)
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
except Exception as e:
# Log error about image batch and continue
job.logger.error(f"Failed to process image batch {i+1}: {e}")
Expand Down Expand Up @@ -471,7 +476,7 @@ def run(cls, job: "Job"):

if image_count:
percent_successful = 1 - len(request_failed_images) / image_count if image_count else 0
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231

# Check all Celery sub-tasks if they have completed saving results
save_tasks_remaining = set(save_tasks) - set(save_tasks_completed)
Expand Down Expand Up @@ -511,6 +516,121 @@ def run(cls, job: "Job"):
job.finished_at = datetime.datetime.now()
job.save()

# TODO: This needs to happen once a job is done
@classmethod
def cleanup_nats_resources(cls, job: "Job"):
"""
Clean up NATS JetStream resources (stream and consumer) for a completed job.

Args:
job: The Job instance
"""
import asyncio

from ami.utils.nats_queue import TaskQueueManager

job_id = f"job{job.pk}"

async def cleanup():
async with TaskQueueManager() as manager:
success = await manager.cleanup_job_resources(job_id)
return success

# Run cleanup in a new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
success = loop.run_until_complete(cleanup())
if success:
job.logger.info(f"Cleaned up NATS resources for job '{job_id}'")
else:
job.logger.warning(f"Failed to fully clean up NATS resources for job '{job_id}'")
except Exception as e:
job.logger.error(f"Error cleaning up NATS resources for job '{job_id}': {e}")
finally:
loop.close()

@classmethod
def queue_images_to_nats(cls, job: "Job", images: list):
"""
Queue all images for a job to a NATS JetStream stream for the job.

Args:
job: The Job instance
images: List of SourceImage instances to queue

Returns:
bool: True if all images were successfully queued, False otherwise
"""
import asyncio

from ami.utils.nats_queue import TaskQueueManager

job_id = f"job{job.pk}"
job.logger.info(f"Queuing {len(images)} images to NATS stream for job '{job_id}'")

# Prepare all messages outside of async context to avoid Django ORM issues
messages = []
for i, image in enumerate(images):
message = {
"job_id": job.pk,
"image_id": image.id if hasattr(image, "id") else image.pk,
"image_url": image.url() if hasattr(image, "url") else None,
"timestamp": (
image.timestamp.isoformat() if hasattr(image, "timestamp") and image.timestamp else None
),
"batch_index": i,
"total_images": len(images),
"queue_timestamp": datetime.datetime.now().isoformat(),
}
messages.append((image.pk, message))

async def queue_all_images():
successful_queues = 0
failed_queues = 0

async with TaskQueueManager() as manager:
for i, (image_pk, message) in enumerate(messages):
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")
# Use TTR of 300 seconds (5 minutes) for image processing
success = await manager.publish_job(
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
job_id=job_id,
data=message,
ttr=300, # 5 minutes visibility timeout
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job_id}': {e}")
success = False

if success:
successful_queues += 1
else:
failed_queues += 1

return successful_queues, failed_queues

# Run the async function in a new event loop to avoid conflicts with Django
# Use new_event_loop() to ensure we're not mixing with Django's async context
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
successful_queues, failed_queues = loop.run_until_complete(queue_all_images())
finally:
loop.close()

# Log results (back in sync context)
if successful_queues > 0:
job.logger.info(
f"Successfully queued {successful_queues}/{len(images)} images to stream for job '{job_id}'"
)

if failed_queues > 0:
job.logger.warning(f"Failed to queue {failed_queues}/{len(images)} images to stream for job '{job_id}'")
return False

return True


class DataStorageSyncJob(JobType):
name = "Data storage sync"
Expand Down
Loading
Loading