
Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Oct 8, 2025

Why This Change

Antenna's original ML processing architecture was designed as a demonstration: a single job would connect to a processing service endpoint and wait synchronously while each image was processed. This approach has become a bottleneck as the platform has grown:

  • Long-running jobs are fragile — network interruptions can cause jobs with many images to fail partway through
  • Single worker bottleneck — only one processing service can work on a job at a time
  • Requires public endpoints — researchers must expose their ML models via a publicly accessible server, which is often impractical in university HPC environments or local workstations behind firewalls

This PR introduces a pull-based distributed worker architecture that fundamentally changes how ML processing works in Antenna. Users still queue jobs in Antenna, but workers pull tasks from the queue rather than Antenna pushing to them. Workers authenticate with a project token and can register as a service to subscribe to job queues.

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                          ANTENNA (Server)                           │
│                                                                     │
│  User queues job → Job creates tasks → Tasks pushed to NATS queue  │
│                                        ↓                            │
│                          Tasks API (pull/ack/result)                │
└─────────────────────────────────────────────────────────────────────┘
                                    ↑
            ┌───────────────────────┼───────────────────────┐
            │                       │                       │
      ┌─────┴─────┐           ┌─────┴─────┐           ┌─────┴─────┐
      │  Worker   │           │  Worker   │           │  Worker   │
      │  (HPC)    │           │  (Local)  │           │  (Cloud)  │
      └───────────┘           └───────────┘           └───────────┘
        GPU node              Workstation              Any compute

Key Advantages

  1. Resilience — Tasks are individually tracked. Network failures affect only the current batch, not the entire job. Failed tasks can be re-queued automatically.

  2. Horizontal scaling — Run as many workers as you have compute resources. A job that takes hours with one worker can complete in minutes with ten.

  3. No public endpoint required — Workers pull tasks from Antenna's API. They can run anywhere: behind university firewalls, on HPC clusters, on local workstations with GPUs, or in cloud environments.

  4. Faster overall processing — Parallelism + reduced network sensitivity = significantly faster job completion.

Researchers can still bring their own ML models by following the new (but similar) API contract. We provide several pipelines, but custom models work the same way.
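To make that contract concrete, here is a minimal sketch of a worker-side loop. It is illustrative only (not the shipped client in RolnickLab/ami-data-companion#94): the endpoint paths follow the task-pull and result-submission endpoints described later in this PR, while the base URL, auth header, and payload fields are assumptions.

    # Illustrative worker loop; not the shipped client (see RolnickLab/ami-data-companion#94).
    # Endpoint paths follow this PR's task/result endpoints; the base URL, auth header,
    # and payload fields are assumptions for the sketch.
    import time
    import requests

    BASE_URL = "https://antenna.example.org/api/v2"            # hypothetical base URL
    HEADERS = {"Authorization": "Token <project-api-token>"}   # hypothetical auth scheme
    JOB_ID = 123

    def run_pipeline(tasks):
        """Stand-in for the researcher's own model; returns one result per task."""
        return [{"image_id": t["image_id"], "detections": []} for t in tasks]

    while True:
        # Reserve a small batch of tasks from the job's queue.
        resp = requests.get(
            f"{BASE_URL}/jobs/{JOB_ID}/tasks",
            params={"batch": 4},
            headers=HEADERS,
            timeout=30,
        )
        resp.raise_for_status()
        tasks = resp.json()
        if not tasks:
            time.sleep(10)  # queue drained (or job finished); poll again later
            continue

        results = run_pipeline(tasks)

        # Submit results; the server saves them and ACKs the queued tasks, so any
        # task that is never acknowledged is eventually redelivered to another worker.
        requests.post(
            f"{BASE_URL}/jobs/{JOB_ID}/result",
            json={"results": results},
            headers=HEADERS,
            timeout=60,
        ).raise_for_status()

The authoritative contract (batching, acknowledgment metadata, error payloads) is the API introduced in #1046 and will be published in the documentation once finalized.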


Summary

Initial version of the Processing Service V2. Follow-up to the initial API structure in #1046.

See the worker implementation in RolnickLab/ami-data-companion#94.

Closes #971
Closes #968
Closes #969

Current State

The async processing path is working but disabled by default in this PR to allow for extended testing. When enabled, starting a job creates a queue for that job and populates it with one task per image. The tasks can be pulled and ACKed via the APIs introduced in PR #1046. The new path can be enabled for a project via the async_pipeline_workers feature flag.

PR #1046 introduced a scaffold of the API endpoints & schemas, which will be published in the documentation when finalized.

List of Changes

  • Added NATS JetStream to the Docker Compose setup. I also tried RabbitMQ and Beanstalkd, but they don't support the visibility-timeout semantics we want or a disconnected mode of pulling and ACKing tasks (a minimal sketch of those semantics follows this list).
  • Added TaskStateManager and TaskQueueManager
  • Added the queuing and async results processing logic
  • Implemented task pull/ack/result endpoints in job views (previously stubs)
  • Added unit tests for TaskQueueManager and TaskStateManager
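For reference, a minimal nats-py sketch of the JetStream semantics the queue relies on. The stream name, subject, consumer name, and ack_wait value are illustrative; the actual logic lives in TaskQueueManager.

    # Minimal nats-py sketch of the per-job pull/ACK semantics; names and timeouts
    # are illustrative, the real logic lives in TaskQueueManager.
    import asyncio
    import json

    import nats
    from nats.js.api import ConsumerConfig

    async def main():
        nc = await nats.connect("nats://localhost:4222")
        js = nc.jetstream()

        # One stream per job, one message per image.
        await js.add_stream(name="job-123", subjects=["job.123.tasks"])
        for image_id in (1, 2, 3):
            await js.publish("job.123.tasks", json.dumps({"image_id": image_id}).encode())

        # Pull consumer: ack_wait acts as the visibility timeout. A message that is
        # fetched but not ACKed within ack_wait seconds is redelivered to another puller.
        sub = await js.pull_subscribe(
            "job.123.tasks",
            durable="workers",
            config=ConsumerConfig(ack_wait=600),
        )
        for msg in await sub.fetch(batch=2, timeout=5):
            print(json.loads(msg.data))
            await msg.ack()  # remove the task from the queue once processed

        await nc.close()

    asyncio.run(main())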

Follow-up Work

Related Issues

See issues #970 and #971.

How to Test the Changes

This path can be enabled by turning on the job.project.feature_flags.async_pipeline_workers feature flag; see ami/jobs/models.py:400:

        if job.project.feature_flags.async_pipeline_workers:
            cls.queue_images_to_nats(job, images)
        else:
            cls.process_images(job, images)

Then run the ami worker from RolnickLab/ami-data-companion#94.

Test

Test both modes by tweaking the flag in the Django admin console.

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

@netlify

netlify bot commented Oct 8, 2025

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: 6bcc610
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/697d60c3c0d4df0008caee65

@mihow
Collaborator

mihow commented Oct 8, 2025

Exciting!

@carlosgjs carlosgjs marked this pull request as ready for review October 24, 2025 18:49
Copilot AI review requested due to automatic review settings October 24, 2025 18:49
Contributor

Copilot AI left a comment


Pull Request Overview

This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.

Key changes:

  • Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
  • Introduced new REST API endpoints for task pulling (/jobs/{id}/tasks) and result submission (/jobs/{id}/result)
  • Implemented Redis-based progress tracking to handle asynchronous worker updates (a concept sketch follows below)
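A rough concept sketch of that Redis bookkeeping (key names and structure here are illustrative and do not match the implemented TaskStateManager schema):

    # Concept sketch of Redis-backed progress tracking; key names and structure are
    # illustrative and do not match TaskStateManager's actual schema.
    import redis

    r = redis.Redis.from_url("redis://localhost:6379/0")
    JOB_ID = 123
    TTL_SECONDS = 60 * 60 * 24  # expire bookkeeping keys after a day

    def initialize_job(image_ids):
        key = f"job:{JOB_ID}:pending"
        r.delete(key)
        r.sadd(key, *image_ids)  # one pending entry per image
        r.set(f"job:{JOB_ID}:total", len(image_ids), ex=TTL_SECONDS)
        r.expire(key, TTL_SECONDS)

    def mark_images_processed(image_ids):
        r.srem(f"job:{JOB_ID}:pending", *image_ids)

    def get_progress():
        total = int(r.get(f"job:{JOB_ID}:total") or 0)
        pending = r.scard(f"job:{JOB_ID}:pending")
        return 0.0 if total == 0 else (total - pending) / total

    initialize_job([1, 2, 3, 4])
    mark_images_processed([1, 2])
    print(f"{get_progress():.0%} complete")  # -> 50% complete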

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.

Show a summary per file

  • requirements/base.txt: Added nats-py dependency for NATS client support
  • object_model_diagram.md: Added comprehensive Mermaid diagram documenting ML pipeline system architecture
  • docker-compose.yml: Added NATS JetStream service with health checks and monitoring
  • config/settings/base.py: Added NATS_URL configuration setting
  • ami/utils/nats_queue.py: New TaskQueueManager class for NATS JetStream operations
  • ami/jobs/views.py: Added task pulling and result submission endpoints with pipeline filtering
  • ami/jobs/utils.py: Helper function for running async code in sync Django context
  • ami/jobs/tasks.py: New Celery task for processing pipeline results asynchronously
  • ami/jobs/task_state.py: TaskStateManager for Redis-based job progress tracking
  • ami/jobs/models.py: Added queue_images_to_nats method and NATS cleanup logic
  • ami/base/views.py: Fixed request.data handling when not a dict
  • README.md: Added NATS dashboard documentation link
  • .vscode/launch.json: Added debug configurations for Django and Celery containers
  • .envs/.local/.django: Added NATS_URL environment variable
  • .dockerignore: Expanded with comprehensive ignore patterns
Comments suppressed due to low confidence (1)

object_model_diagram.md:1

  • The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System


@carlosgjs carlosgjs requested a review from mihow October 24, 2025 18:59
@coderabbitai
Contributor

coderabbitai bot commented Oct 31, 2025

📝 Walkthrough

Walkthrough

Adds NATS JetStream task queuing, a TaskQueueManager, Redis-backed task-state tracking, async ML job queuing behind a feature flag, Celery result-processing with NATS ACKing, API endpoints to reserve/submit tasks, tests, Docker/CI/staging and config updates, and a new nats dependency.

Changes

  • Configuration & Requirements (.envs/.local/.django, .envs/.ci/.django, .envs/.production/.django-example, config/settings/base.py, requirements/base.txt): Add NATS_URL env entries; expose NATS_URL in settings; add nats-py==2.10.0.
  • Docker & CI / Staging (docker-compose.yml, docker-compose.ci.yml, docker-compose.staging.yml): Introduce nats JetStream service with healthchecks; add nats to django depends_on; minor entrypoint/restart tweaks.
  • Docs (README.md): Document NATS dashboard URL and localhost note.
  • Feature Flags (ami/main/models.py): Add async_pipeline_workers: bool = False to ProjectFeatureFlags.
  • NATS Queue Manager (ami/ml/orchestration/nats_queue.py): New async TaskQueueManager and get_connection to manage JetStream connection, per-job stream/consumer lifecycle, publish/reserve/ack, and cleanup.
  • Orchestration Helpers (ami/ml/orchestration/jobs.py): Add queue_images_to_nats(job, images) and cleanup_nats_resources(job); prepare messages, initialize task state, publish via TaskQueueManager.
  • Task State Management (ami/ml/orchestration/task_state.py): New TaskStateManager and TaskProgress for Redis-backed per-job per-stage pending lists, progress calculation, TTL, locking, and cleanup.
  • ML Job Model (ami/jobs/models.py): MLJob.run branches on project.feature_flags.async_pipeline_workers; add MLJob.process_images classmethod and NATS queueing path.
  • Celery Tasks & Helpers (ami/jobs/tasks.py): Add Celery task process_nats_pipeline_result and helpers log_time, _update_job_progress, _ack_task_via_nats; integrate TaskStateManager, DB save, NATS ACK, timing/logging, and concurrency handling.
  • API Endpoints & Schema Params (ami/jobs/views.py, ami/utils/requests.py): Implement tasks GET to reserve tasks from NATS; result POST to enqueue result processing via Celery; add OpenAPI params ids_only, incomplete_only, batch.
  • Schemas / Models (ami/ml/schemas.py): Add PipelineResultsError; remove queue_timestamp from PipelineProcessingTask; allow PipelineTaskResult.result to be response or error.
  • Tests (ami/ml/test_nats_queue.py, ami/ml/tests.py, ami/jobs/tests.py): Add unit tests for TaskQueueManager and TaskStateManager; update job tests to call queue_images_to_nats and expect accepted / results_queued.
  • Module Init / Minor Refactor (ami/ml/orchestration/__init__.py): Remove unconditional re-export from .processing and add explanatory comment.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant MLJob
    participant Flags as FeatureFlags
    participant QueueMgr as TaskQueueManager
    participant State as TaskStateManager
    participant Worker
    participant Celery as process_nats_pipeline_result
    participant DB as Database

    Client->>MLJob: run(job, images)
    MLJob->>Flags: check async_pipeline_workers
    alt async enabled
        MLJob->>State: initialize_job(image_ids)
        MLJob->>QueueMgr: queue_images_to_nats(job, images)
        loop per image/batch
            QueueMgr->>QueueMgr: publish_task(job_id, message)
        end
    else sync path
        MLJob->>MLJob: process_images(job, images)
        MLJob->>DB: persist results & progress
    end

    Note over Worker,QueueMgr: Worker reserves tasks from JetStream
    Worker->>QueueMgr: reserve_task(job_id, batch)
    Worker->>Celery: run pipeline, produce result + reply_subject
    Celery->>State: update_state(processed_ids, "process", request_id)
    Celery->>DB: save pipeline results
    Celery->>QueueMgr: acknowledge_task(reply_subject)
    Celery->>State: update_state(processed_ids, "results", request_id)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

backend, ml

Poem

🐇 I hopped through JetStream at dawn’s bright light,
I queued each frame and watched them take flight,
Redis counted carrots as workers ran fast,
Celery nodded when ACKs came at last,
A hop, a queue — the pipeline’s delight. 🥕✨

🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 73.77%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Linked Issues check ✅ Passed: The PR meets the core requirements of #971 by implementing queue creation via TaskQueueManager and populating it with per-image tasks; it integrates with the pull/ACK APIs and introduces the async_pipeline_workers feature flag for enabling the new path.
  • Out of Scope Changes check ✅ Passed: All changes align with the PR objectives (NATS infrastructure setup, queue management, async result processing, and task state tracking). Changes are focused on enabling Processing Service V2 without unrelated modifications.
  • Title check ✅ Passed: The title accurately describes the main change: implementing a distributed worker architecture for ML processing as the second version of the processing service. It is specific and directly related to the comprehensive changes across queue management, async task handling, and feature flags.
  • Description check ✅ Passed: The PR description is comprehensive and well-structured, covering objectives, architecture, benefits, implementation details, testing instructions, and follow-up work.



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef8f16c and d254867.

📒 Files selected for processing (19)
  • .dockerignore (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .gitignore (1 hunks)
  • .vscode/launch.json (1 hunks)
  • README.md (1 hunks)
  • ami/base/views.py (1 hunks)
  • ami/jobs/models.py (8 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • ami/ml/orchestration/utils.py (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (4 hunks)
  • object_model_diagram.md (1 hunks)
  • requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (45-138)
ami/jobs/models.py (4)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • final_states (58-59)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • reserve_task (152-208)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • mark_images_processed (48-61)
  • get_progress (63-90)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (5)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • update_stage (168-188)
  • save (947-958)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • save_results (809-917)
  • save_results (1107-1108)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (2)
  • Job (727-1012)
  • logger (997-1006)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • cleanup (92-97)
  • initialize_job (38-46)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/ml/orchestration/utils.py (1)
ami/jobs/models.py (1)
  • logger (997-1006)
ami/base/views.py (1)
ami/main/api/views.py (1)
  • get (1595-1651)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md

[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


31-31: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


38-38: Bare URL used

(MD034, no-bare-urls)


39-39: Bare URL used

(MD034, no-bare-urls)


40-40: Bare URL used

(MD034, no-bare-urls)


41-41: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


43-43: Bare URL used

(MD034, no-bare-urls)


61-61: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: Bare URL used

(MD034, no-bare-urls)


97-97: Bare URL used

(MD034, no-bare-urls)


118-118: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


122-122: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


126-126: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


130-130: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)

🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py

70-70: Unused method argument: ttr

(ARG002)


73-73: Avoid specifying long messages outside the exception class

(TRY003)


81-81: Do not catch blind exception: Exception

(BLE001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


103-103: Do not catch blind exception: Exception

(BLE001)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Consider moving this statement to an else block

(TRY300)


148-148: Do not catch blind exception: Exception

(BLE001)


149-149: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Avoid specifying long messages outside the exception class

(TRY003)


206-206: Do not catch blind exception: Exception

(BLE001)


207-207: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


226-226: Consider moving this statement to an else block

(TRY300)


227-227: Do not catch blind exception: Exception

(BLE001)


228-228: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


242-242: Avoid specifying long messages outside the exception class

(TRY003)


250-250: Consider moving this statement to an else block

(TRY300)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


266-266: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Consider moving this statement to an else block

(TRY300)


274-274: Do not catch blind exception: Exception

(BLE001)


275-275: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/views.py

33-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


73-73: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


218-218: Unused method argument: pk

(ARG002)


236-236: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


283-283: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


298-298: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


322-322: Do not catch blind exception: Exception

(BLE001)


323-323: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

45-45: Unused function argument: self

(ARG001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


133-133: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/utils.py

14-14: Do not catch blind exception: Exception

(BLE001)


15-15: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

carlosgjs and others added 2 commits October 31, 2025 14:33
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)

226-250: Add maximum batch size constraint to prevent resource exhaustion.

The batch parameter has no upper bound, allowing users to request arbitrarily large numbers of tasks. A single request with a high batch value ties up the request thread (via async_to_sync), and multiple concurrent requests could exhaust thread resources. Add a configurable max cap.
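A minimal sketch of such a cap (the setting name and default below are hypothetical, not part of this PR):

    # Hypothetical cap on the user-supplied batch size; the setting name is illustrative.
    from django.conf import settings

    DEFAULT_MAX_TASK_BATCH = 20

    def clamp_batch(requested: int) -> int:
        """Clamp a user-supplied batch size to a configurable maximum."""
        max_batch = getattr(settings, "MAX_TASK_BATCH", DEFAULT_MAX_TASK_BATCH)
        return max(1, min(requested, max_batch))

    # Inside the tasks endpoint:
    #   batch = clamp_batch(int(request.query_params.get("batch", 1)))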

🤖 Fix all issues with AI agents
In `@ami/jobs/views.py`:
- Around line 270-304: The current code validates and enqueues each item as it
iterates over results (using PipelineTaskResult and
process_pipeline_result.delay), which can dispatch partial work if a later item
fails; change the flow to first validate all items without calling
process_pipeline_result.delay (e.g., iterate results and construct/validate
PipelineTaskResult objects into a temporary list), and only after every item
validates successfully iterate that temporary list to call
process_pipeline_result.delay and populate queued_tasks for the response (keep
using job.pk, reply_subject and result_data.dict() when enqueuing). A sketch of this reordering appears after these instructions.

In `@ami/ml/orchestration/jobs.py`:
- Around line 41-92: When building tasks, track skipped images (e.g., add a
skipped_count variable incremented in the "if not image_url" branch) and treat
them as failures: after queue_all_images runs, add skipped_count to
failed_queues (or if tasks is empty, mark job as FAILED when skipped_count > 0
instead of SUCCESS). Update logic around TaskStateManager initialization and the
"if tasks:" / else branch so totals (successful_queues, failed_queues) include
skipped_count and job.progress/state reflect that skipped images count as
failures; reference the PipelineProcessingTask creation loop, the skipped image
logger call, the queue_all_images async function, and the final
async_to_sync(queue_all_images)() call to locate where to apply these changes.
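A minimal sketch of the validate-all-then-enqueue reordering suggested for ami/jobs/views.py. PipelineTaskResult and process_pipeline_result are the objects named in the instruction above; their exact signatures here are assumptions.

    # Sketch only: validate every item before dispatching any Celery work, so a bad
    # item cannot leave partial work queued. Signatures are assumptions.
    def enqueue_results(job, reply_subject, results):
        # 1) Validate everything first; a validation error aborts the request
        #    before anything has been enqueued.
        validated = [PipelineTaskResult(**item) for item in results]

        # 2) Only then dispatch, collecting task ids for the response.
        queued_tasks = []
        for result_data in validated:
            async_result = process_pipeline_result.delay(job.pk, reply_subject, result_data.dict())
            queued_tasks.append(async_result.id)
        return queued_tasks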
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

102-130: Post-ACK failures can leave progress stale—ACK last or retry.

ACK is sent before results-stage updates; if update_state / _update_job_progress fails after ACK, NATS won’t redeliver and progress may stay behind. Consider moving ACK after results-stage updates (or explicitly retry on post-ACK failures).

🐛 Proposed fix
-        _ack_task_via_nats(reply_subject, job.logger)
-        # Update job stage with calculated progress
-        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
+        # Update job stage with calculated progress
+        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
@@
-        _update_job_progress(job_id, "results", progress_info.percentage)
+        _update_job_progress(job_id, "results", progress_info.percentage)
+        _ack_task_via_nats(reply_subject, job.logger)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e241586 and 1202063.

📒 Files selected for processing (4)
  • ami/jobs/tasks.py
  • ami/jobs/views.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py
🧰 Additional context used
🧬 Code graph analysis (3)
ami/ml/orchestration/nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (47-130)
ami/ml/schemas.py (1)
  • PipelineTaskResult (231-237)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (35-300)
  • reserve_task (159-214)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (4)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (35-300)
  • cleanup_job_resources (284-300)
  • publish_task (126-157)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
🪛 Ruff (0.14.11)
ami/ml/orchestration/nats_queue.py

80-80: Avoid specifying long messages outside the exception class

(TRY003)


88-88: Do not catch blind exception: Exception

(BLE001)


101-101: Avoid specifying long messages outside the exception class

(TRY003)


110-110: Do not catch blind exception: Exception

(BLE001)


138-138: Avoid specifying long messages outside the exception class

(TRY003)


153-153: Consider moving this statement to an else block

(TRY300)


155-155: Do not catch blind exception: Exception

(BLE001)


156-156: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


171-171: Avoid specifying long messages outside the exception class

(TRY003)


212-212: Do not catch blind exception: Exception

(BLE001)


213-213: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


227-227: Avoid specifying long messages outside the exception class

(TRY003)


232-232: Consider moving this statement to an else block

(TRY300)


233-233: Do not catch blind exception: Exception

(BLE001)


234-234: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


248-248: Avoid specifying long messages outside the exception class

(TRY003)


256-256: Consider moving this statement to an else block

(TRY300)


257-257: Do not catch blind exception: Exception

(BLE001)


258-258: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


272-272: Avoid specifying long messages outside the exception class

(TRY003)


279-279: Consider moving this statement to an else block

(TRY300)


280-280: Do not catch blind exception: Exception

(BLE001)


281-281: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/views.py

253-253: Unused method argument: pk

(ARG002)


306-306: Avoid specifying long messages outside the exception class

(TRY003)


308-308: Do not catch blind exception: Exception

(BLE001)


309-309: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

98-98: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


124-124: Abstract raise to an inner function

(TRY301)


127-127: Do not catch blind exception: Exception

(BLE001)


128-130: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


146-146: Do not catch blind exception: Exception

(BLE001)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

75-75: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (8)
ami/ml/orchestration/jobs.py (1)

10-25: LGTM: cleanup helper returns success status.

Returning the async cleanup result allows callers to react to failures.

ami/ml/orchestration/nats_queue.py (4)

26-63: LGTM: connection lifecycle is cleaned up.

Context manager setup/teardown is clear and ensures the client is closed.


77-125: LGTM: per-job stream/consumer provisioning is clear.


126-214: LGTM: publish/reserve flow and reply_subject mapping.


216-300: LGTM: ACK and cleanup helpers are straightforward.

ami/jobs/tasks.py (3)

133-148: LGTM: ACK helper encapsulates NATS call cleanly.


151-166: LGTM: atomic progress update under row lock.


207-229: LGTM: lightweight timing helper is clear.


@mihow
Collaborator

mihow commented Jan 31, 2026

@coderabbitai can you make tickets for these? The datetime timezone is a nuanced one. We use local times only for SourceImage timestamps (we only ever want to know the local time for those images and it introduces errors if we convert to UTC). But for job.finished_at and other aspects of the Django CMS, timestamps should have timezones (we want to show when a job started in current web users local time).

@mihow I'm not addressing two of coderabbit's comments (both illustrated briefly below):

  • Use of logger.exception() instead of logger.error(), since I don't see it in the rest of the codebase and it's probably better done as a global change.
  • Use of timezone.now() instead of datetime.now() to update the Job's finished_at, because the rest of the codebase uses datetime; I believe that's because we have Django set to USE_TZ=False.
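For reference, the two deferred API differences, using only standard Python logging and Django utilities (nothing project-specific):

    # logger.exception() vs logger.error(), and timezone.now() vs datetime.now().
    import logging
    from datetime import datetime

    from django.utils import timezone

    logger = logging.getLogger(__name__)

    try:
        1 / 0
    except ZeroDivisionError:
        logger.error("Job failed")      # logs the message only
        logger.exception("Job failed")  # logs at ERROR and appends the traceback

    naive_now = datetime.now()   # naive local time (current behaviour with USE_TZ=False)
    aware_now = timezone.now()   # timezone-aware when USE_TZ=True; lets the UI render
                                 # job start/finish times in each web user's local time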

@coderabbitai
Contributor

coderabbitai bot commented Jan 31, 2026


🧩 Analysis chain

@mihow I'll create tickets for both of these deferred improvements.


🏁 Script executed:

#!/bin/bash

# Create issue for standardizing exception logging
gh issue create \
  --title "Standardize exception logging to use logger.exception() across codebase" \
  --body "## Context

In PR `#987`, a code review suggested using \`logger.exception()\` instead of \`logger.error()\` for exception handling. This change was deferred because it should be applied consistently across the entire codebase rather than in a single PR.

## Description

Currently, the codebase uses \`logger.error()\` in various places where exceptions are being logged. Django's \`logger.exception()\` is preferred because it automatically includes the full exception traceback, making debugging easier.

## Proposed Change

- Audit the codebase for instances where \`logger.error()\` is used within exception handlers
- Replace with \`logger.exception()\` to automatically capture stack traces
- Ensure consistency across all modules

## References

- PR: https://github.com/RolnickLab/antenna/pull/987
- Comment: https://github.com/RolnickLab/antenna/pull/987#discussion_r[comment_id]
- Requested by: `@mihow`" \
  --assignee mihow

# Create issue for timezone-aware timestamps
gh issue create \
  --title "Use timezone.now() for Django CMS timestamps while keeping local times for SourceImage" \
  --body "## Context

In PR `#987`, a code review suggested using \`timezone.now()\` instead of \`datetime.now()\` for the Job model's \`finished_at\` field. This change was deferred due to the nuanced timezone requirements in the codebase.

## Description

The codebase currently has mixed timezone handling:
- Uses \`datetime.now()\` for Job timestamps and other Django CMS operations
- Project is configured with \`USE_TZ=False\`

However, there's a nuanced requirement that needs to be addressed:

### SourceImage timestamps (keep as local time)
- SourceImage timestamps should remain **local time only**
- These represent the actual local time when images were captured
- Converting to UTC would introduce errors for these timestamps
- **No changes needed for SourceImage**

### Django CMS operations (should use timezone-aware)
- Job.finished_at and similar Django CMS timestamps should be **timezone-aware**
- Web users need to see when jobs started/finished in their local time
- Currently these are naive datetimes, making it difficult to display correctly across timezones

## Proposed Change

1. Keep SourceImage timestamps as local time (no changes)
2. Migrate Job.finished_at, Job.started_at, and similar Django CMS fields to use \`timezone.now()\`
3. Consider enabling \`USE_TZ=True\` for the Django project, while ensuring SourceImage timestamps remain local
4. Update any related timestamp handling to maintain the distinction between local-only and timezone-aware timestamps

## References

- PR: https://github.com/RolnickLab/antenna/pull/987
- Comment: https://github.com/RolnickLab/antenna/pull/987#discussion_r[comment_id]
- Requested by: `@mihow`" \
  --assignee mihow

echo "Issues created successfully"

Length of output: 127


@mihow I've created two issues to track these deferred improvements:

  1. Issue #1106: Standardize exception logging to use logger.exception() across codebase

    • Proposes a codebase-wide audit to replace logger.error() with logger.exception() in exception handlers for better debugging
  2. Issue #1107: Use timezone.now() for Django CMS timestamps while keeping local times for SourceImage

    • Addresses the nuanced requirement: keep SourceImage timestamps as local-only (no UTC conversion), but migrate Django CMS operations like Job.finished_at to use timezone-aware timestamps for proper multi-timezone display

Both issues are assigned to you and include references back to this PR and your requirements.

Collaborator

@mihow mihow left a comment


I think this is ready to merge! Reminder that this is only available behind a feature flag, so the job status issue will not appear for any user except us while testing. I added a plan here: .agents/planning/async-job-status-handling.md, and we have some additional work to do regarding job status anyway (#1025).

The last change I'd like to make is updating the PR description to describe what this introduces in the context of Antenna (adding a new distributed processing service / consumer approach).

@mihow mihow changed the title Processing service V2 - Phase 1 Distributed Worker Architecture for ML Processing (Processing Service V2) pt. 1 Jan 31, 2026
@mihow mihow merged commit 5ad5223 into RolnickLab:main Jan 31, 2026
7 checks passed