Distributed Worker Architecture for ML Processing (Processing Service V2) pt. 1 #987
Conversation
Exciting!
Pull Request Overview
This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.
Key changes:
- Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
- Introduced new REST API endpoints for task pulling (`/jobs/{id}/tasks`) and result submission (`/jobs/{id}/result`); see the worker-loop sketch after this list
- Implemented Redis-based progress tracking to handle asynchronous worker updates
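From a worker's point of view, these two endpoints form a simple pull/submit loop. The sketch below is only an illustration of that loop: the base URL, auth header, and JSON field names (`reply_subject`, `result`) are assumptions, and PR #1046 defines the actual schemas.

```python
import time

import requests

# Illustrative worker loop against the new endpoints. Only the
# /jobs/{id}/tasks and /jobs/{id}/result paths come from this PR;
# everything else here is an assumption for the sketch.
BASE_URL = "https://antenna.example.org/api/v2"
HEADERS = {"Authorization": "Token <project-token>"}
JOB_ID = 123


def run_pipeline(task: dict) -> dict:
    """Placeholder for the worker's actual ML inference step."""
    return {"detections": []}


while True:
    # Reserve a small batch of tasks for this job.
    resp = requests.get(f"{BASE_URL}/jobs/{JOB_ID}/tasks", params={"batch": 4}, headers=HEADERS)
    resp.raise_for_status()
    tasks = resp.json()
    if not tasks:
        time.sleep(10)  # queue drained or job finished; back off before polling again
        continue
    for task in tasks:
        result = run_pipeline(task)
        # Submit the result; the backend uses reply_subject to ACK the NATS message.
        requests.post(
            f"{BASE_URL}/jobs/{JOB_ID}/result",
            json={"reply_subject": task["reply_subject"], "result": result},
            headers=HEADERS,
        ).raise_for_status()
```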
Reviewed Changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| requirements/base.txt | Added nats-py dependency for NATS client support |
| object_model_diagram.md | Added comprehensive Mermaid diagram documenting ML pipeline system architecture |
| docker-compose.yml | Added NATS JetStream service with health checks and monitoring |
| config/settings/base.py | Added NATS_URL configuration setting |
| ami/utils/nats_queue.py | New TaskQueueManager class for NATS JetStream operations |
| ami/jobs/views.py | Added task pulling and result submission endpoints with pipeline filtering |
| ami/jobs/utils.py | Helper function for running async code in sync Django context |
| ami/jobs/tasks.py | New Celery task for processing pipeline results asynchronously |
| ami/jobs/task_state.py | TaskStateManager for Redis-based job progress tracking (see the sketch after this table) |
| ami/jobs/models.py | Added queue_images_to_nats method and NATS cleanup logic |
| ami/base/views.py | Fixed request.data handling when not a dict |
| README.md | Added NATS dashboard documentation link |
| .vscode/launch.json | Added debug configurations for Django and Celery containers |
| .envs/.local/.django | Added NATS_URL environment variable |
| .dockerignore | Expanded with comprehensive ignore patterns |
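To make the Redis-based progress tracking concrete, here is a minimal sketch of per-job progress tracking with Redis sets, assuming the standard `redis` Python client. The key names, method names, and class shape are illustrative only; the real `TaskStateManager` in `ami/jobs/task_state.py` may differ.

```python
import redis


class TaskStateManagerSketch:
    """Hypothetical, simplified version of Redis-backed job progress tracking."""

    def __init__(self, job_id: int, client=None):
        self.job_id = job_id
        self.redis = client or redis.Redis()

    def initialize_job(self, image_ids):
        # Record the full set of images the job must process.
        self.redis.sadd(f"job:{self.job_id}:images", *image_ids)

    def mark_images_processed(self, image_ids):
        # Track which images are done and return the percent complete.
        self.redis.sadd(f"job:{self.job_id}:processed", *image_ids)
        total = self.redis.scard(f"job:{self.job_id}:images")
        done = self.redis.scard(f"job:{self.job_id}:processed")
        return (done / total) * 100 if total else 100.0

    def cleanup(self):
        self.redis.delete(f"job:{self.job_id}:images", f"job:{self.job_id}:processed")
```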
Comments suppressed due to low confidence (1)
object_model_diagram.md:1
- The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System
📝 Walkthrough
Adds NATS JetStream task queuing, a TaskQueueManager, Redis-backed task-state tracking, async ML job queuing behind a feature flag, Celery result processing with NATS ACKing, API endpoints to reserve/submit tasks, tests, Docker/CI/staging and config updates, and a new nats dependency.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant MLJob
    participant Flags as FeatureFlags
    participant QueueMgr as TaskQueueManager
    participant State as TaskStateManager
    participant Worker
    participant Celery as process_nats_pipeline_result
    participant DB as Database
    Client->>MLJob: run(job, images)
    MLJob->>Flags: check async_pipeline_workers
    alt async enabled
        MLJob->>State: initialize_job(image_ids)
        MLJob->>QueueMgr: queue_images_to_nats(job, images)
        loop per image/batch
            QueueMgr->>QueueMgr: publish_task(job_id, message)
        end
    else sync path
        MLJob->>MLJob: process_images(job, images)
        MLJob->>DB: persist results & progress
    end
    Note over Worker,QueueMgr: Worker reserves tasks from JetStream
    Worker->>QueueMgr: reserve_task(job_id, batch)
    Worker->>Celery: run pipeline, produce result + reply_subject
    Celery->>State: update_state(processed_ids, "process", request_id)
    Celery->>DB: save pipeline results
    Celery->>QueueMgr: acknowledge_task(reply_subject)
    Celery->>State: update_state(processed_ids, "results", request_id)
```
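The `publish_task`, `reserve_task`, and `acknowledge_task` calls in the diagram map onto standard JetStream operations in `nats-py`. Below is a minimal standalone sketch of those operations; the server URL, stream and subject names, payload, and batch size are assumptions, not the actual `TaskQueueManager` configuration.

```python
import asyncio

import nats


async def main() -> None:
    # Connect to the NATS server added in docker-compose (URL is illustrative).
    nc = await nats.connect("nats://nats:4222")
    js = nc.jetstream()

    # Per-job stream and subject; names here are assumptions for the sketch.
    await js.add_stream(name="job_123", subjects=["jobs.123.tasks"])

    # publish_task: enqueue one task per image.
    await js.publish("jobs.123.tasks", b'{"image_id": 1, "image_url": "https://example.org/a.jpg"}')

    # reserve_task: a pull consumer fetches a batch; unACKed messages are
    # redelivered after the ack wait (visibility timeout) expires.
    sub = await js.pull_subscribe("jobs.123.tasks", durable="workers")
    msgs = await sub.fetch(batch=1, timeout=5)
    for msg in msgs:
        print("reserved:", msg.data)
        # acknowledge_task: ACK only after the result has been processed.
        await msg.ack()

    await nc.close()


asyncio.run(main())
```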
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning), ✅ 4 passed
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- .dockerignore (1 hunks)
- .envs/.local/.django (1 hunks)
- .gitignore (1 hunks)
- .vscode/launch.json (1 hunks)
- README.md (1 hunks)
- ami/base/views.py (1 hunks)
- ami/jobs/models.py (8 hunks)
- ami/jobs/tasks.py (2 hunks)
- ami/jobs/views.py (3 hunks)
- ami/main/models.py (1 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/nats_queue.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
- ami/ml/orchestration/utils.py (1 hunks)
- ami/utils/requests.py (2 hunks)
- config/settings/base.py (2 hunks)
- docker-compose.yml (4 hunks)
- object_model_diagram.md (1 hunks)
- requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
- ami/ml/orchestration/nats_queue.py (1)
  - ami/jobs/views.py (1): result (256-339)
- ami/ml/orchestration/task_state.py (1)
  - ami/ml/orchestration/jobs.py (1): cleanup (20-23)
- ami/jobs/views.py (3)
  - ami/jobs/tasks.py (1): process_pipeline_result (45-138)
  - ami/jobs/models.py (4): Job (727-1012), JobState (27-63), logger (997-1006), final_states (58-59)
  - ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), reserve_task (152-208)
- ami/jobs/tasks.py (5)
  - ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), acknowledge_task (210-229)
  - ami/ml/orchestration/task_state.py (3): TaskStateManager (17-97), mark_images_processed (48-61), get_progress (63-90)
  - ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)
  - ami/jobs/models.py (5): Job (727-1012), JobState (27-63), logger (997-1006), update_stage (168-188), save (947-958)
  - ami/ml/models/pipeline.py (3): save (1115-1121), save_results (809-917), save_results (1107-1108)
- ami/ml/orchestration/jobs.py (4)
  - ami/jobs/models.py (2): Job (727-1012), logger (997-1006)
  - ami/ml/orchestration/nats_queue.py (3): TaskQueueManager (28-294), cleanup_job_resources (278-294), publish_task (119-150)
  - ami/ml/orchestration/task_state.py (3): TaskStateManager (17-97), cleanup (92-97), initialize_job (38-46)
  - ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)
- ami/ml/orchestration/utils.py (1)
  - ami/jobs/models.py (1): logger (997-1006)
- ami/base/views.py (1)
  - ami/main/api/views.py (1): get (1595-1651)
- ami/jobs/models.py (3)
  - ami/ml/orchestration/jobs.py (1): queue_images_to_nats (28-107)
  - ami/main/models.py (1): SourceImage (1622-1870)
  - ami/ml/models/pipeline.py (2): process_images (163-278), process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md
[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md
15-15: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
31-31: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
38-38: Bare URL used
(MD034, no-bare-urls)
39-39: Bare URL used
(MD034, no-bare-urls)
40-40: Bare URL used
(MD034, no-bare-urls)
41-41: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
43-43: Bare URL used
(MD034, no-bare-urls)
61-61: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
77-77: Bare URL used
(MD034, no-bare-urls)
97-97: Bare URL used
(MD034, no-bare-urls)
118-118: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
122-122: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
126-126: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
130-130: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py
70-70: Unused method argument: ttr
(ARG002)
73-73: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Do not catch blind exception: Exception
(BLE001)
94-94: Avoid specifying long messages outside the exception class
(TRY003)
103-103: Do not catch blind exception: Exception
(BLE001)
132-132: Avoid specifying long messages outside the exception class
(TRY003)
146-146: Consider moving this statement to an else block
(TRY300)
148-148: Do not catch blind exception: Exception
(BLE001)
149-149: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
164-164: Avoid specifying long messages outside the exception class
(TRY003)
206-206: Do not catch blind exception: Exception
(BLE001)
207-207: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
221-221: Avoid specifying long messages outside the exception class
(TRY003)
226-226: Consider moving this statement to an else block
(TRY300)
227-227: Do not catch blind exception: Exception
(BLE001)
228-228: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
242-242: Avoid specifying long messages outside the exception class
(TRY003)
250-250: Consider moving this statement to an else block
(TRY300)
251-251: Do not catch blind exception: Exception
(BLE001)
252-252: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
266-266: Avoid specifying long messages outside the exception class
(TRY003)
273-273: Consider moving this statement to an else block
(TRY300)
274-274: Do not catch blind exception: Exception
(BLE001)
275-275: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/task_state.py
35-35: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
36-36: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/jobs/views.py
33-43: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
73-73: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
218-218: Unused method argument: pk
(ARG002)
236-236: Avoid specifying long messages outside the exception class
(TRY003)
244-244: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
283-283: Avoid specifying long messages outside the exception class
(TRY003)
288-288: Avoid specifying long messages outside the exception class
(TRY003)
298-298: Avoid specifying long messages outside the exception class
(TRY003)
301-301: Avoid specifying long messages outside the exception class
(TRY003)
322-322: Do not catch blind exception: Exception
(BLE001)
323-323: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/tasks.py
45-45: Unused function argument: self
(ARG001)
120-120: Do not catch blind exception: Exception
(BLE001)
121-121: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
133-133: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
136-136: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
config/settings/base.py
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/jobs.py
69-69: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
ami/ml/orchestration/utils.py
14-14: Do not catch blind exception: Exception
(BLE001)
15-15: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
430-430: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
482-482: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)
226-250: Add a maximum batch size constraint to prevent resource exhaustion. The `batch` parameter has no upper bound, allowing users to request arbitrarily large numbers of tasks. A single request with a high batch value ties up the request thread (via `async_to_sync`), and multiple concurrent requests could exhaust thread resources. Add a configurable max cap.
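A minimal sketch of one way to add that cap; the `MAX_TASK_BATCH_SIZE` name is hypothetical (no such setting exists in this PR), and the real value would ideally live in Django settings.

```python
# Sketch only: clamp the requested batch size in the task-reservation view.
DEFAULT_BATCH_SIZE = 1
MAX_TASK_BATCH_SIZE = 50  # hypothetical ceiling; would come from settings in practice


def clamp_batch(raw_value) -> int:
    """Parse the ?batch= query param and keep it within [1, MAX_TASK_BATCH_SIZE]."""
    try:
        requested = int(raw_value) if raw_value is not None else DEFAULT_BATCH_SIZE
    except (TypeError, ValueError):
        requested = DEFAULT_BATCH_SIZE
    return max(1, min(requested, MAX_TASK_BATCH_SIZE))


# Inside the DRF action:
#   batch = clamp_batch(request.query_params.get("batch"))
print(clamp_batch("500"))  # -> 50
print(clamp_batch(None))   # -> 1
```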
🤖 Fix all issues with AI agents
In `@ami/jobs/views.py`:
- Around line 270-304: The current code validates and enqueues each item as it
iterates over results (using PipelineTaskResult and
process_pipeline_result.delay), which can dispatch partial work if a later item
fails; change the flow to first validate all items without calling
process_pipeline_result.delay (e.g., iterate results and construct/validate
PipelineTaskResult objects into a temporary list), and only after every item
validates successfully iterate that temporary list to call
process_pipeline_result.delay and populate queued_tasks for the response (keep
using job.pk, reply_subject and result_data.dict() when enqueuing).
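A rough, self-contained sketch of the suggested two-phase flow. `PipelineTaskResult` below is a stand-in pydantic model and `enqueue` stands in for `process_pipeline_result.delay`; only the overall validate-then-dispatch shape mirrors the real view code.

```python
from pydantic import BaseModel


class PipelineTaskResult(BaseModel):  # stand-in for the schema in ami/ml/schemas.py
    detections: list = []


def submit_results(job_pk: int, results: list, enqueue) -> list:
    # Phase 1: validate everything up front; a bad item aborts before any dispatch.
    validated = []
    for item in results:
        result_data = PipelineTaskResult(**item["result"])  # raises ValidationError if invalid
        validated.append((item["reply_subject"], result_data))

    # Phase 2: enqueue only after every item validated successfully.
    return [enqueue(job_pk, reply_subject, data.dict()) for reply_subject, data in validated]
```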
In `@ami/ml/orchestration/jobs.py`:
- Around line 41-92: When building tasks, track skipped images (e.g., add a
skipped_count variable incremented in the "if not image_url" branch) and treat
them as failures: after queue_all_images runs, add skipped_count to
failed_queues (or if tasks is empty, mark job as FAILED when skipped_count > 0
instead of SUCCESS). Update logic around TaskStateManager initialization and the
"if tasks:" / else branch so totals (successful_queues, failed_queues) include
skipped_count and job.progress/state reflect that skipped images count as
failures; reference the PipelineProcessingTask creation loop, the skipped image
logger call, the queue_all_images async function, and the final
async_to_sync(queue_all_images)() call to locate where to apply these changes.
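A small self-contained sketch of the accounting change being suggested, with simplified stand-in data structures rather than the real `SourceImage` and task objects.

```python
# Sketch: skipped images should count as failures, not vanish from the totals.
images = [{"pk": 1, "url": "https://example.org/a.jpg"}, {"pk": 2, "url": None}]

tasks, skipped_count = [], 0
for image in images:
    if not image["url"]:
        skipped_count += 1  # previously only logged; now tracked
        continue
    tasks.append({"image_id": image["pk"], "image_url": image["url"]})

successful_queues = len(tasks)      # pretend every publish succeeded
failed_queues = 0 + skipped_count   # skipped images are reported as failures
total = successful_queues + failed_queues
print(f"queued={successful_queues} failed={failed_queues} of {total}")
```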
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)
102-130: Post-ACK failures can leave progress stale; ACK last or retry. The ACK is sent before the results-stage updates; if `update_state` or `_update_job_progress` fails after the ACK, NATS won't redeliver and progress may stay behind. Consider moving the ACK after the results-stage updates (or explicitly retrying on post-ACK failures).
🐛 Proposed fix
```diff
- _ack_task_via_nats(reply_subject, job.logger)
- # Update job stage with calculated progress
- progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
+ # Update job stage with calculated progress
+ progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
@@
- _update_job_progress(job_id, "results", progress_info.percentage)
+ _update_job_progress(job_id, "results", progress_info.percentage)
+ _ack_task_via_nats(reply_subject, job.logger)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- ami/jobs/tasks.py
- ami/jobs/views.py
- ami/ml/orchestration/jobs.py
- ami/ml/orchestration/nats_queue.py
🧰 Additional context used
🧬 Code graph analysis (3)
- ami/ml/orchestration/nats_queue.py (1)
  - ami/ml/schemas.py (1): PipelineProcessingTask (217-228)
- ami/jobs/views.py (3)
  - ami/jobs/tasks.py (1): process_pipeline_result (47-130)
  - ami/ml/schemas.py (1): PipelineTaskResult (231-237)
  - ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (35-300), reserve_task (159-214)
- ami/ml/orchestration/jobs.py (4)
  - ami/jobs/models.py (4): Job (734-1019), JobState (27-63), logger (1004-1013), save (954-965)
  - ami/ml/orchestration/nats_queue.py (3): TaskQueueManager (35-300), cleanup_job_resources (284-300), publish_task (126-157)
  - ami/ml/orchestration/task_state.py (3): TaskStateManager (17-125), cleanup (119-125), initialize_job (39-49)
  - ami/ml/schemas.py (1): PipelineProcessingTask (217-228)
🪛 Ruff (0.14.11)
ami/ml/orchestration/nats_queue.py
80-80: Avoid specifying long messages outside the exception class
(TRY003)
88-88: Do not catch blind exception: Exception
(BLE001)
101-101: Avoid specifying long messages outside the exception class
(TRY003)
110-110: Do not catch blind exception: Exception
(BLE001)
138-138: Avoid specifying long messages outside the exception class
(TRY003)
153-153: Consider moving this statement to an else block
(TRY300)
155-155: Do not catch blind exception: Exception
(BLE001)
156-156: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
171-171: Avoid specifying long messages outside the exception class
(TRY003)
212-212: Do not catch blind exception: Exception
(BLE001)
213-213: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
227-227: Avoid specifying long messages outside the exception class
(TRY003)
232-232: Consider moving this statement to an else block
(TRY300)
233-233: Do not catch blind exception: Exception
(BLE001)
234-234: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
248-248: Avoid specifying long messages outside the exception class
(TRY003)
256-256: Consider moving this statement to an else block
(TRY300)
257-257: Do not catch blind exception: Exception
(BLE001)
258-258: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
272-272: Avoid specifying long messages outside the exception class
(TRY003)
279-279: Consider moving this statement to an else block
(TRY300)
280-280: Do not catch blind exception: Exception
(BLE001)
281-281: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/views.py
253-253: Unused method argument: pk
(ARG002)
306-306: Avoid specifying long messages outside the exception class
(TRY003)
308-308: Do not catch blind exception: Exception
(BLE001)
309-309: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/tasks.py
98-98: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
124-124: Abstract raise to an inner function
(TRY301)
127-127: Do not catch blind exception: Exception
(BLE001)
128-130: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
146-146: Do not catch blind exception: Exception
(BLE001)
147-147: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/jobs.py
75-75: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (8)
ami/ml/orchestration/jobs.py (1)
- 10-25: LGTM: cleanup helper returns success status. Returning the async cleanup result allows callers to react to failures.
ami/ml/orchestration/nats_queue.py (4)
- 26-63: LGTM: connection lifecycle is cleaned up. Context manager setup/teardown is clear and ensures the client is closed.
- 77-125: LGTM: per-job stream/consumer provisioning is clear.
- 126-214: LGTM: publish/reserve flow and reply_subject mapping.
- 216-300: LGTM: ACK and cleanup helpers are straightforward.
ami/jobs/tasks.py (3)
- 133-148: LGTM: ACK helper encapsulates NATS call cleanly.
- 151-166: LGTM: atomic progress update under row lock.
- 207-229: LGTM: lightweight timing helper is clear.
@coderabbitai can you make tickets for these? The datetime timezone is a nuanced one. We use local times only for SourceImage timestamps (we only ever want to know the local time for those images, and it introduces errors if we convert to UTC). But for job.finished_at and other aspects of the Django CMS, timestamps should have timezones (we want to show when a job started in the current web user's local time).
Both issues are assigned to you and include references back to this PR and your requirements.
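A minimal illustration of the timezone convention described in the comment above, assuming Django's standard timezone utilities with `USE_TZ` enabled; the values and the standalone `settings.configure` call are for demonstration only.

```python
import datetime

import django.conf

django.conf.settings.configure(USE_TZ=True)  # standalone demo only; a real project has settings
from django.utils import timezone

# SourceImage timestamps: keep the naive local capture time. Converting to UTC
# would misstate when the image was actually taken at the field site.
source_image_timestamp = datetime.datetime(2024, 7, 1, 22, 15)  # naive, local to the deployment

# Job bookkeeping (e.g. job.finished_at): use timezone-aware timestamps so the
# web UI can render them in each viewer's local time.
job_finished_at = timezone.now()  # aware datetime because USE_TZ=True

assert source_image_timestamp.tzinfo is None
assert job_finished_at.tzinfo is not None
```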
mihow
left a comment
I think this is ready to merge! Reminder that this is only available behind a feature flag, so the job status issue will not appear for any user except us while testing. I added a plan in .agents/planning/async-job-status-handling.md, and we do have some additional work to do regarding job status anyway (#1025).
The last change I'd like to make is updating the PR description to describe what this introduces in the context of Antenna (adding a new distributed processing service / consumer approach).
Why This Change
Antenna's original ML processing architecture was designed as a demonstration: a single job would connect to a processing service endpoint and wait for images synchronously. This approach has become a bottleneck as the platform has grown.
This PR introduces a pull-based distributed worker architecture that fundamentally changes how ML processing works in Antenna. Users still queue jobs in Antenna, but workers pull tasks from the queue rather than Antenna pushing to them. Workers authenticate with a project token and can register as a service to subscribe to job queues.
Architecture Overview
Key Advantages
Resilience — Tasks are individually tracked. Network failures affect only the current batch, not the entire job. Failed tasks can be re-queued automatically.
Horizontal scaling — Run as many workers as you have compute resources. A job that takes hours with one worker can complete in minutes with ten.
No public endpoint required — Workers pull tasks from Antenna's API. They can run anywhere: behind university firewalls, on HPC clusters, on local workstations with GPUs, or in cloud environments.
Faster overall processing — Parallelism + reduced network sensitivity = significantly faster job completion.
Researchers can still bring their own ML models by following the new (but similar) API contract. We provide several pipelines, but custom models work the same way.
Summary
Initial version of the Processing Service V2. Follow-up to the initial API structure in #1046.
See the worker implementation in RolnickLab/ami-data-companion#94.
Closes #971
Closes #968
Closes #969
Current State
The async processing path is working but disabled by default in this PR to allow for extended testing. When enabled, starting a job creates a queue for that job and populates it with one task per image. The tasks can be pulled and ACKed via the APIs introduced in PR #1046. The new path can be enabled for a project via the `async_pipeline_workers` feature flag.
PR #1046 introduced a scaffold of the API endpoints & schemas, which will be published in the documentation when finalized.
List of Changes
- TaskStateManager and TaskQueueManager
- TaskQueueManager and TaskStateManager
Follow-up Work
Related Issues
See issues #970 and #971.
How to Test the Changes
This path can be enabled by turning on the `job.project.feature_flags.async_pipeline_workers` feature flag (see ami/jobs/models.py:400) and running the `ami worker` from RolnickLab/ami-data-companion#94.
Test
Test both modes by tweaking the flag in the Django admin console:

Checklist