Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
9129421
WIP: Pull worker and REST dataset
carlosgjs Oct 15, 2025
41fef93
Clean-up, addd "worker" cli command, move token to env var
carlosgjs Oct 17, 2025
87910aa
Post results back
carlosgjs Oct 17, 2025
c67afce
Progress updates working
carlosgjs Oct 17, 2025
64e188d
clean up
carlosgjs Oct 24, 2025
c00de9d
Better error handling
carlosgjs Nov 4, 2025
3b60538
Support multiple pipelines
carlosgjs Dec 4, 2025
45e68bc
Use app.state for the service info
carlosgjs Dec 5, 2025
3c4dd8c
API launch target
carlosgjs Dec 5, 2025
8f76365
Integration fixes
carlosgjs Dec 9, 2025
bef1cd7
Use PipelineProcessingTask instead of raw dicts
carlosgjs Dec 10, 2025
52cff32
Fix to returned results
carlos-irreverentlabs Dec 12, 2025
f3f3cd6
Trigger CI workflows
mihow Jan 24, 2026
589cd0d
Add Antenna API settings for worker configuration
mihow Jan 24, 2026
c4147bd
Add Pydantic schemas for Antenna API responses
mihow Jan 24, 2026
f7f454a
Refactor worker to use Settings pattern and improve robustness
mihow Jan 24, 2026
7846510
Improve datasets error handling and API contract
mihow Jan 24, 2026
822c436
Add type annotations to update_detection_classification
mihow Jan 24, 2026
2f26e0f
Add Antenna worker documentation
mihow Jan 24, 2026
99e685e
Update poetry.lock with dependency updates
mihow Jan 24, 2026
ab073b3
Replace fragile urljoin with explicit f-string URL construction
mihow Jan 24, 2026
078aa26
Use plural names for batch dict keys containing lists
mihow Jan 24, 2026
38942ee
Merge branch 'main' of https://github.com/RolnickLab/ami-data-manager…
mihow Jan 24, 2026
ce1d754
Fix API tests not running in main test suite
mihow Jan 24, 2026
29172d7
Rename batch result schemas to use Antenna prefix for consistency
mihow Jan 24, 2026
d85bafb
turn off typer show locals
carlosgjs Jan 27, 2026
22c4182
add back help text
carlosgjs Jan 27, 2026
a30ffd5
Flake fixes
carlosgjs Jan 27, 2026
5baab55
Fix REST dataloader to use localization_batch_size for inference batc…
mihow Jan 28, 2026
1bf5ee5
Fix type annotations to use explicit | None syntax
mihow Jan 28, 2026
1a523b2
Retry worker API requests with urllib3 adapter, reuse sessions (#104)
mihow Jan 28, 2026
9bd7142
AMI: Pipeline Registration (#106)
mihow Jan 29, 2026
602b2bc
Address code review feedback
mihow Jan 29, 2026
b1b184c
Disable POST retries by default in get_http_session
mihow Jan 29, 2026
ce3d967
Add validation and error handling improvements
mihow Jan 29, 2026
15d07c4
Remove redundant worker tests
mihow Jan 29, 2026
15da4dd
Refactor: Extract Antenna integration into dedicated module
mihow Jan 30, 2026
3825517
chore: remove temporary plans
mihow Jan 30, 2026
8e9c7fb
Simplify HTTP session config: hardcode retry, pass auth explicitly
mihow Jan 30, 2026
1c5ed89
feat: add example service file for Antenna worker, add comments
mihow Jan 30, 2026
b427ed2
fix: guard torch.cuda.empty_cache() calls with is_available() check
mihow Jan 30, 2026
2cc0259
fix: use sys.executable for pytest subprocess call
mihow Jan 30, 2026
2594bf3
fix: handle post_batch_results failure to prevent silent data loss
mihow Jan 30, 2026
4598278
refactor: make 'ami worker' the default command, use singular --pipel…
mihow Jan 30, 2026
361da2a
fix: handle post_batch_results failure to prevent silent data loss
mihow Jan 30, 2026
c4df11c
chore: remove validate_dwc_export.py (not meant for this PR)
mihow Jan 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ AMI_CLASSIFICATION_THRESHOLD=0.6
AMI_LOCALIZATION_BATCH_SIZE=2
AMI_CLASSIFICATION_BATCH_SIZE=20
AMI_NUM_WORKERS=1

# Antenna API Worker Settings (for processing jobs from Antenna platform)
# See: https://github.com/RolnickLab/antenna
AMI_ANTENNA_API_BASE_URL=http://localhost:8000/api/v2
AMI_ANTENNA_API_AUTH_TOKEN=your_antenna_auth_token_here
AMI_ANTENNA_API_BATCH_SIZE=4
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ db_data/
# Test files
sample_images
bak

# Local scratch for moving untracked files
scratch/
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ repos:
types: [pyi]

- repo: https://github.com/pycqa/flake8
rev: 3.8.3
rev: 4.0.0
hooks:
- id: flake8
files: .
Expand Down
29 changes: 29 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Run worker",
"type": "debugpy",
"request": "launch",
"module": "trapdata.cli.base",
"args": ["worker"]
},
{
"name": "Run api",
"type": "debugpy",
"request": "launch",
"module": "trapdata.cli.base",
"args": ["api"]
}
]
}
5 changes: 3 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ This file helps AI agents (like Claude) work efficiently with the AMI Data Compa
3. **Always prefer command line tools** to avoid expensive API requests (e.g., use git and jq instead of reading whole files)
4. **Use bulk operations and prefetch patterns** to minimize database queries
5. **Commit often** - Small, focused commits make debugging easier
6. **Use TDD whenever possible** - Tests prevent regressions and document expected behavior
7. **Keep it simple** - Always think hard and evaluate more complex approaches and alternative approaches before moving forward
6. **Use `git add -p` for staging** - Interactive staging to add only relevant changes, creating logical commits
7. **Use TDD whenever possible** - Tests prevent regressions and document expected behavior
8. **Keep it simple** - Always think hard and evaluate more complex approaches and alternative approaches before moving forward

### Think Holistically

Expand Down
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,60 @@ ami api

View the interactive API docs at http://localhost:2000/

## Running the Antenna Worker

The worker polls the Antenna platform API for queued image processing jobs, downloads images, runs detection and classification, and posts results back to Antenna.

**Setup:**

1. Get your Antenna auth token from your Antenna project settings
2. Configure the worker in `.env`:

```sh
AMI_ANTENNA_API_BASE_URL=https://antenna.insectai.org/api/v2 # Or your Antenna instance
AMI_ANTENNA_API_AUTH_TOKEN=your_token_here
AMI_ANTENNA_API_BATCH_SIZE=4
AMI_NUM_WORKERS=2 # Safe for REST API (atomic task dequeue)
```

**Register pipelines (optional):**

Register available ML pipelines with your Antenna projects:

```sh
ami worker register "My Worker Name" --project 1 --project 2
# Or register for all accessible projects:
ami worker register "My Worker Name"
```

**Run the worker:**

```sh
# Process all pipelines:
ami worker

# Or run only specific pipeline(s):
ami worker --pipeline moth_binary
ami worker --pipeline moth_binary --pipeline panama_moths_2024
```

The worker will:

1. Poll Antenna for jobs matching the specified pipeline(s)
2. Download images from the job queue
3. Run detection and classification
4. Post results back to Antenna
5. Repeat until queue is empty, then sleep and poll again

**Notes:**

- Multiple workers can run in parallel (they won't duplicate work)
- Auth token ties results to your Antenna project
- Worker continues running until interrupted (Ctrl+C)
- Safe to run multiple workers on different machines

For more information, see the [Antenna platform documentation](https://github.com/RolnickLab/antenna).

## Web UI demo (Gradio)

A simple web UI is also available to test the inference pipeline. This is a quick way to test models on a remote server via a web browser.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ gradio = "^4.41.0"

[tool.pytest.ini_options]
asyncio_mode = 'auto'
testpaths = ["trapdata/tests", "trapdata/**/tests"]

[tool.isort]
profile = "black"
Expand Down
20 changes: 20 additions & 0 deletions trapdata/antenna/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Antenna platform integration module.

This module provides integration with the Antenna platform for remote image processing.
It includes:
- API client for fetching jobs and posting results
- Worker loop for continuous job processing
- Pipeline registration with Antenna projects
- Schemas for Antenna API requests/responses
- Dataset classes for streaming tasks from the API
"""

from trapdata.antenna import client, datasets, registration, schemas, worker

__all__ = [
"client",
"datasets",
"registration",
"schemas",
"worker",
]
110 changes: 110 additions & 0 deletions trapdata/antenna/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Antenna API client for fetching jobs and posting results."""

import requests

from trapdata.antenna.schemas import AntennaJobsListResponse, AntennaTaskResult
from trapdata.api.utils import get_http_session
from trapdata.common.logs import logger


def get_jobs(
    base_url: str,
    auth_token: str,
    pipeline_slug: str,
) -> list[int]:
    """Fetch the ids of incomplete jobs for the given pipeline.

    Calls: GET {base_url}/jobs?pipeline__slug=<pipeline>&ids_only=1&incomplete_only=1

    Args:
        base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
        auth_token: API authentication token
        pipeline_slug: Pipeline slug to filter jobs

    Returns:
        List of job ids. An empty list is returned both when no jobs are
        queued and on request/parse errors (errors are logged, never raised).
    """
    # Build the request up front; only network and parsing live in the try.
    url = f"{base_url.rstrip('/')}/jobs"
    params = {
        "pipeline__slug": pipeline_slug,
        "ids_only": 1,
        "incomplete_only": 1,
    }

    with get_http_session(auth_token) as session:
        try:
            response = session.get(url, params=params, timeout=30)
            response.raise_for_status()
            # Validate the payload shape with Pydantic before extracting ids.
            parsed = AntennaJobsListResponse.model_validate(response.json())
            return [job.id for job in parsed.results]
        except requests.RequestException as e:
            logger.error(f"Failed to fetch jobs from {base_url}: {e}")
            return []
        except Exception as e:
            # Malformed/unexpected response body; degrade to "no jobs".
            logger.error(f"Failed to parse jobs response: {e}")
            return []
Comment thread
mihow marked this conversation as resolved.


def post_batch_results(
    base_url: str,
    auth_token: str,
    job_id: int,
    results: list[AntennaTaskResult],
) -> bool:
    """
    Post batch results back to the API.

    Calls: POST {base_url}/jobs/{job_id}/result/

    Args:
        base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
        auth_token: API authentication token
        job_id: Job ID
        results: List of AntennaTaskResult objects

    Returns:
        True if successful, False otherwise
    """
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/result/"
    # Serialize via Pydantic so nested models become JSON-safe values.
    payload = [result.model_dump(mode="json") for result in results]

    with get_http_session(auth_token) as session:
        try:
            response = session.post(url, json=payload, timeout=60)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"Failed to post results to {url}: {e}")
            return False
        else:
            logger.info(f"Successfully posted {len(results)} results to {url}")
            return True


def get_user_projects(base_url: str, auth_token: str) -> list[dict]:
    """
    Fetch all projects the user has access to.

    Calls: GET {base_url}/projects/

    Args:
        base_url: Base URL for the API (should NOT include /api/v2)
        auth_token: API authentication token

    Returns:
        List of project dictionaries with 'id' and 'name' fields.
        An empty list is returned on request errors or when the response
        body has an unexpected shape (logged, never raised).
    """
    with get_http_session(auth_token) as session:
        url = f"{base_url.rstrip('/')}/projects/"
        try:
            response = session.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as e:
            logger.error(f"Failed to fetch projects from {base_url}: {e}")
            return []

    # The API normally returns a paginated envelope: {"results": [...]}.
    # Guard the top-level shape too: previously a non-dict body (e.g. a bare
    # list) raised an uncaught AttributeError on data.get(), breaking the
    # documented return-empty-on-error contract.
    if isinstance(data, dict):
        projects = data.get("results", [])
    elif isinstance(data, list):
        # Tolerate an unpaginated bare-list response.
        projects = data
    else:
        projects = data  # non-list; falls through to the warning below

    if isinstance(projects, list):
        return projects
    logger.warning(f"Unexpected projects format from {url}: {type(projects)}")
    return []
Loading