Conversation

@enlorenz

No description provided.

@enlorenz enlorenz marked this pull request as draft January 30, 2026 12:40
@coderabbitai
Contributor

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough

Walkthrough

Introduces a backlog-aware assignment wrapper that repeatedly assigns jobs and refills the main queue from a backlog until no progress occurs. Updates environment and baseline call sites to use the new wrapper and exposes backlog fields in observations. Adds two node-utilization metrics to callbacks.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Job assignment wrapper**<br>`src/job_management.py` | Adds `assign_jobs_with_backlog_refill(...)`: loops calling the existing assign routine, fills the main queue from the backlog when it becomes empty, aggregates launched/dropped counts, and returns cumulative results including `backlog_assigned`. |
| **Environment integration**<br>`src/environment.py` | Replaces the call to `assign_jobs_to_available_nodes` with `assign_jobs_with_backlog_refill(...)`, passes `backlog_queue`, captures `backlog_assigned`, and exposes `backlog_size` and `backlog_assigned` in the observation space and state initialization. |
| **Baseline integration**<br>`src/baseline.py` | Imports and calls `assign_jobs_with_backlog_refill(...)` (replacing the previous call), adapts the argument order to include `metrics` and `baseline_backlog_queue`, and captures the expanded return tuple (see the sketch after this table). |
| **Metrics logging**<br>`src/callbacks.py` | Appends `metrics/on_nodes` and `metrics/used_nodes` to end-of-episode logging, reading the last episode value or 0 if empty. |
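
As a concrete illustration of the baseline integration row above, the updated call site in `src/baseline.py` could look roughly like the sketch below. The argument order follows the new wrapper's signature shown later in this review; the import path and the `baseline_*` local variable names (everything except `metrics`, `baseline_backlog_queue`, and the wrapper itself) are assumptions, not verbatim code from the PR.

```python
from job_management import assign_jobs_with_backlog_refill  # import path assumed

# Sketch of the baseline call site; local names are hypothetical.
num_launched, next_empty_slot, num_dropped, next_job_id, backlog_assigned = (
    assign_jobs_with_backlog_refill(
        baseline_job_queue_2d, baseline_nodes, baseline_cores_available, baseline_running_jobs,
        next_empty_slot, next_job_id, metrics, baseline_backlog_queue,
        is_baseline=True
    )
)
```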

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Env as Environment
    participant JM as JobManagement
    participant Backlog as BacklogQueue
    participant Nodes as Nodes
    Env->>JM: assign_jobs_with_backlog_refill(job_queue, nodes, cores, running, next_slot, next_job_id, metrics, backlog_queue, is_baseline)
    loop until no progress
        JM->>Nodes: assign_jobs_to_available_nodes(...) — attempt assigns
        alt main queue emptied and backlog has jobs
            JM->>Backlog: fill_queue_from_backlog(...) — move jobs into main queue
            JM->>Nodes: assign_jobs_to_available_nodes(...) — assign newly filled jobs
        end
    end
    JM-->>Env: (total_launched, next_empty_slot, total_dropped, next_job_id, backlog_assigned)
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Description check | ❓ Inconclusive | No pull request description was provided by the author, making it impossible to evaluate whether it is related to the changeset. | Add a pull request description that explains the purpose, motivation, and impact of adding backlog refill functionality during job assignment. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly describes the main change: adding backlog refill functionality during job assignment, which is reflected in the new function and its integration across multiple modules. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
  • 🧪 Generate unit tests (beta)
    • Create PR with unit tests
    • Post copyable unit tests in a comment

Tip

🧪 Unit Test Generation v2 is now available!

We have significantly improved our unit test generation capabilities.

To enable: Add this to your .coderabbit.yaml configuration:

```yaml
reviews:
  finishing_touches:
    unit_tests:
      enabled: true
```

Try it out by using the @coderabbitai generate unit tests command on your code files or under ✨ Finishing Touches on the walkthrough!

Comment @coderabbitai help to get the list of available commands and usage tips.

@enlorenz enlorenz marked this pull request as ready for review January 30, 2026 12:45
@enlorenz enlorenz marked this pull request as draft January 30, 2026 15:17
Enis Lorenz added 2 commits January 30, 2026 16:18
…obs, refill queue and continue assigning until resources are filled up.

Add backlog refill loop helper for job assignment

centralize refill/assign loop in job_management
use progress-based break (launched or moved from backlog)
update environment and baseline to call helper
@enlorenz enlorenz marked this pull request as ready for review January 30, 2026 15:25
@enlorenz enlorenz marked this pull request as draft January 30, 2026 16:02
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@src/environment.py`:
- Around line 320-330: The backlog_assigned metric undercounts because
fill_queue_from_backlog runs earlier and its assigned jobs aren’t included in
the value returned by assign_jobs_with_backlog_refill; fix by capturing the
number of jobs pulled during the earlier prefill (e.g., record count from
fill_queue_from_backlog or len change on backlog_queue) and pass that into or
seed the wrapper used by assign_jobs_with_backlog_refill (or change the wrapper
signature to accept an initial_prefill_count) so that backlog_assigned =
prefill_count + wrapper_returned_backlog_assigned (or compute a combined value)
and store that combined value in self.state['backlog_assigned'][0].
- Around line 161-172: Observation lacks explicit time info: add a time feature
to the observation_space dict (e.g., 'time_of_day' or 'episode_hour') alongside
existing keys like 'backlog_size' and 'backlog_assigned', and ensure the
environment updates this feature on every reset() and step() (or inside the
existing _get_observation/_observe method) so the returned observation contains
the current time-of-day or hour-of-episode value; update observation_space
dtype/shape appropriately (e.g., Box(low=0, high=23, shape=(1,), dtype=np.int32)
or normalized float) and set the value during reset() initialization and each
step() before returning observations.

In `@src/job_management.py`:
- Around line 258-296: The backlog refill path can exceed the 1000-job queue cap
and skip per-hour/new-job and max-duration checks; update
assign_jobs_with_backlog_refill (and the helper fill_queue_from_backlog) to
enforce limits before moving jobs from backlog: compute combined_size =
current_queue_count(job_queue_2d) + len(backlog_queue) and only move
min(moved_from_backlog, 1000 - current_queue_count) jobs, validate each job's
new_job_count <= 1500_per_hour and runtime <= 170_hours before enqueueing,
reject or drop any jobs that violate these rules (increment total_dropped_jobs
and backlog_assigned appropriately), and ensure metrics/logging are updated when
jobs are dropped or skipped.
- Around line 270-295: The loop currently re-calls
assign_jobs_to_available_nodes multiple times per step, which causes wait ages
to be incremented repeatedly; change the loop so it only iterates again when new
jobs were actually moved from backlog into the queue. Concretely, after calling
assign_jobs_to_available_nodes and optionally fill_queue_from_backlog, break out
of the while loop unless moved_from_backlog > 0 (i.e., only continue the loop
when fill_queue_from_backlog added jobs); update the loop's termination
condition that currently checks num_launched_jobs and moved_from_backlog so it
instead stops immediately when moved_from_backlog == 0 to prevent re‑aging,
referencing assign_jobs_to_available_nodes and fill_queue_from_backlog and the
moved_from_backlog variable.

Comment on lines +161 to +172
```python
            'backlog_size': spaces.Box(
                low=0,
                high=np.iinfo(np.int32).max,
                shape=(1,),
                dtype=np.int32
            ),
            'backlog_assigned': spaces.Box(
                low=0,
                high=np.iinfo(np.int32).max,
                shape=(1,),
                dtype=np.int32
            ),
```
Contributor

⚠️ Potential issue | 🟠 Major

Add time features to the observation space.

The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.

✅ Suggested addition (example)
```diff
             'predicted_prices': spaces.Box(
                 low=-1000,
                 high=1000,
                 shape=(24,),
                 dtype=np.float32
             ),
+            'time': spaces.Box(
+                low=0,
+                high=EPISODE_HOURS - 1,
+                shape=(1,),
+                dtype=np.int32
+            ),
         })
```
Based on learnings: State space representation should include node counts, job queue status, electricity prices, and time information.
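
A minimal sketch of the corresponding update logic, assuming the `'time'` observation key and `EPISODE_HOURS` constant from the example above; the helper name and dict-style state are illustrative, not existing code:

```python
import numpy as np

EPISODE_HOURS = 24  # assumed value; use the environment's real constant

def write_time_feature(state: dict, current_hour: int) -> None:
    """Write the hour-of-episode into the observation dict (sketch only)."""
    state['time'] = np.array([current_hour % EPISODE_HOURS], dtype=np.int32)

# Usage idea: call this at the end of reset() (with hour 0) and at the end of
# every step(), before the observation is returned to the agent.
```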

Comment on lines +320 to +330
```python
        num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
            assign_jobs_with_backlog_refill(
                job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
                self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
                is_baseline=False
            )
        )

        self.env_print(f" {num_launched_jobs} jobs launched")
        self.state['backlog_size'][0] = len(self.backlog_queue)
        self.state['backlog_assigned'][0] = backlog_assigned
```
Contributor

⚠️ Potential issue | 🟠 Major

Backlog-assigned can undercount when prefill happens earlier in the step.

fill_queue_from_backlog is called before new jobs are added, but backlog_assigned only counts backlog jobs loaded during the wrapper’s internal refill. As a result, backlog jobs launched after the earlier prefill are not counted, and the observation can stay at 0 even when backlog jobs were scheduled. Capture the prefill count and seed the wrapper’s backlog tracking (or compute a combined value).

✅ Possible direction (requires wrapper signature update)
```diff
-        self.next_empty_slot, _ = fill_queue_from_backlog(job_queue_2d, self.backlog_queue, self.next_empty_slot)
+        self.next_empty_slot, prefilled_from_backlog = fill_queue_from_backlog(
+            job_queue_2d, self.backlog_queue, self.next_empty_slot
+        )

         num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
             assign_jobs_with_backlog_refill(
                 job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
-                self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
+                self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
+                initial_backlog_loaded=prefilled_from_backlog,
                 is_baseline=False
             )
         )
```
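
On the wrapper side, the signature change this implies could look roughly like the following sketch; `initial_backlog_loaded` is the hypothetical parameter used in the diff above, not something that exists in the PR:

```diff
 def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                     next_empty_slot, next_job_id, metrics, backlog_queue,
-                                    is_baseline=False):
+                                    is_baseline=False, initial_backlog_loaded=0):
     ...
-    backlog_loaded_remaining = 0
+    # Seed with jobs already moved out of the backlog during the earlier prefill,
+    # so their launches are counted toward backlog_assigned.
+    backlog_loaded_remaining = initial_backlog_loaded
```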

Comment on lines 258 to 299
```python
def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    is_baseline=False):
    """
    Assign jobs, refilling the queue from backlog when it becomes empty.
    Loop until no progress is made or resources/backlog are exhausted.
    """
    total_launched_jobs = 0
    total_dropped_jobs = 0
    backlog_assigned = 0
    backlog_loaded_remaining = 0

    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        if num_launched_jobs == 0 and moved_from_backlog == 0:
            break

    return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned
```
Contributor

⚠️ Potential issue | 🟠 Major

Backlog refill can violate the 1000‑job queue cap.

With backlog enabled, overflow jobs can accumulate beyond the fixed queue size, which risks exceeding the 1000‑job limit (and potentially the per‑hour and max‑duration constraints) unless those limits are enforced elsewhere. Please cap combined queue+backlog size and validate new job counts/durations before enqueueing or refilling.
As per coding guidelines: Job queue constraints must enforce maximum 1000 jobs, maximum 1500 new jobs per hour, and maximum 170h runtime per job.
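
A minimal sketch of what a capped refill could look like, assuming the queue layout implied by the code above (column 0 == 0 marks an empty slot) and a hypothetical `MAX_QUEUE_JOBS` constant of 1000; slot indexing is simplified, and the per-hour and max-runtime checks are deliberately omitted and would still need to live wherever those limits are enforced:

```python
import numpy as np

MAX_QUEUE_JOBS = 1000  # assumed constant for the queue cap

def fill_queue_from_backlog_capped(job_queue_2d, backlog_queue, next_empty_slot):
    """Move backlog jobs into the main queue without exceeding MAX_QUEUE_JOBS (sketch)."""
    occupied = int(np.count_nonzero(job_queue_2d[:, 0]))  # column 0 == 0 marks an empty slot
    free_slots = max(0, MAX_QUEUE_JOBS - occupied)
    moved = 0
    while backlog_queue and moved < free_slots:
        job = backlog_queue.pop(0)            # assumes a list-like FIFO backlog
        job_queue_2d[next_empty_slot] = job   # assumes backlog entries match queue rows
        next_empty_slot += 1
        moved += 1
    return next_empty_slot, moved
```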


Comment on lines 270 to 298
```python
    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        if num_launched_jobs == 0 and moved_from_backlog == 0:
            break
```

Contributor

⚠️ Potential issue | 🟠 Major

Avoid re‑aging waiting jobs within the same step.

The loop re-runs assign_jobs_to_available_nodes even when no new jobs were pulled from backlog, which increments wait ages multiple times per step and inflates wait-time metrics (and future drop logic). Only iterate again when backlog is actually refilled.

✅ Suggested fix
```diff
-        if num_launched_jobs == 0 and moved_from_backlog == 0:
-            break
+        # Only iterate again if new jobs were pulled in from backlog.
+        if moved_from_backlog == 0:
+            break
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```python
    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        # Only iterate again if new jobs were pulled in from backlog.
        if moved_from_backlog == 0:
            break
```

The agent now has two extra integer variables in its observation space, so it is aware of the existence and length of the backlog_queue.

Flagged assign_jobs so it only ages waiting jobs once per step
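
For context, the flag mentioned above could be shaped roughly like the sketch below; the helper, the `enabled` parameter, and the wait-age column index are hypothetical illustrations of the idea, not the PR's actual implementation:

```python
import numpy as np

WAIT_AGE_COL = 3  # hypothetical column holding a queued job's wait age

def age_waiting_jobs(job_queue_2d, enabled=True):
    """Increment the wait age of queued jobs; skipped on repeated passes (sketch)."""
    if not enabled:
        return
    waiting = job_queue_2d[:, 0] != 0  # column 0 != 0 marks an occupied slot
    job_queue_2d[waiting, WAIT_AGE_COL] += 1

# Usage idea: the refill loop would enable aging only on its first assignment
# pass within a step, so jobs age once per step even when assignment runs
# multiple times after backlog refills.
```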