Backlogger: Add refill from backlogger during job assignment #20
base: master
Conversation
📝 Walkthrough
Introduces a backlog-aware assignment wrapper that repeatedly assigns jobs and refills the main queue from a backlog until no progress occurs. Updates environment and baseline call sites to use the new wrapper and exposes backlog fields in observations. Adds two node-utilization metrics to callbacks.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
…obs, refill queue and continue assigning until resources are filled up.
- Add backlog refill loop helper for job assignment
- Centralize refill/assign loop in job_management
- Use progress-based break (launched or moved from backlog)
- Update environment and baseline to call helper
bf53448 to 4d706f6
Actionable comments posted: 4
```python
'backlog_size': spaces.Box(
    low=0,
    high=np.iinfo(np.int32).max,
    shape=(1,),
    dtype=np.int32
),
'backlog_assigned': spaces.Box(
    low=0,
    high=np.iinfo(np.int32).max,
    shape=(1,),
    dtype=np.int32
),
```
Add time features to the observation space.
The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.
✅ Suggested addition (example)
```diff
     'predicted_prices': spaces.Box(
         low=-1000,
         high=1000,
         shape=(24,),
         dtype=np.float32
     ),
+    'time': spaces.Box(
+        low=0,
+        high=EPISODE_HOURS - 1,
+        shape=(1,),
+        dtype=np.int32
+    ),
 })
```
🤖 Prompt for AI Agents
In `@src/environment.py` around lines 161 - 172, Observation lacks explicit time
info: add a time feature to the observation_space dict (e.g., 'time_of_day' or
'episode_hour') alongside existing keys like 'backlog_size' and
'backlog_assigned', and ensure the environment updates this feature on every
reset() and step() (or inside the existing _get_observation/_observe method) so
the returned observation contains the current time-of-day or hour-of-episode
value; update observation_space dtype/shape appropriately (e.g., Box(low=0,
high=23, shape=(1,), dtype=np.int32) or normalized float) and set the value
during reset() initialization and each step() before returning observations.
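For illustration, a minimal sketch of how such a time feature could be kept in sync, assuming a Gymnasium-style spaces API and an hour-of-episode counter in the environment; EPISODE_HOURS, write_time_feature, and current_hour are hypothetical names, not taken from this repository:

```python
import numpy as np
from gymnasium import spaces

EPISODE_HOURS = 168  # assumed episode length in hours; substitute the project's own constant

# Observation-space entry mirroring the reviewer's suggestion:
time_space = spaces.Box(low=0, high=EPISODE_HOURS - 1, shape=(1,), dtype=np.int32)

def write_time_feature(state, current_hour):
    """Write the current hour-of-episode into the observation dict.

    Intended to be called from reset() and again in step() right before the
    observation is returned, so the agent always sees the simulation clock.
    """
    state['time'] = np.array([current_hour], dtype=np.int32)

# Usage sketch: hour 42 of the episode
obs = {}
write_time_feature(obs, 42)
```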
```python
num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
    assign_jobs_with_backlog_refill(
        job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
        self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
        is_baseline=False
    )
)

self.env_print(f"  {num_launched_jobs} jobs launched")
self.state['backlog_size'][0] = len(self.backlog_queue)
self.state['backlog_assigned'][0] = backlog_assigned
```
Backlog-assigned can undercount when prefill happens earlier in the step.
fill_queue_from_backlog is called before new jobs are added, but backlog_assigned only counts backlog jobs loaded during the wrapper’s internal refill. As a result, backlog jobs launched after the earlier prefill are not counted, and the observation can stay at 0 even when backlog jobs were scheduled. Capture the prefill count and seed the wrapper’s backlog tracking (or compute a combined value).
✅ Possible direction (requires wrapper signature update)
```diff
-    self.next_empty_slot, _ = fill_queue_from_backlog(job_queue_2d, self.backlog_queue, self.next_empty_slot)
+    self.next_empty_slot, prefilled_from_backlog = fill_queue_from_backlog(
+        job_queue_2d, self.backlog_queue, self.next_empty_slot
+    )
     num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
         assign_jobs_with_backlog_refill(
             job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
-            self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
+            self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
+            initial_backlog_loaded=prefilled_from_backlog,
             is_baseline=False
         )
     )
```
🤖 Prompt for AI Agents
In `@src/environment.py` around lines 320 - 330, The backlog_assigned metric
undercounts because fill_queue_from_backlog runs earlier and its assigned jobs
aren’t included in the value returned by assign_jobs_with_backlog_refill; fix by
capturing the number of jobs pulled during the earlier prefill (e.g., record
count from fill_queue_from_backlog or len change on backlog_queue) and pass that
into or seed the wrapper used by assign_jobs_with_backlog_refill (or change the
wrapper signature to accept an initial_prefill_count) so that backlog_assigned =
prefill_count + wrapper_returned_backlog_assigned (or compute a combined value)
and store that combined value in self.state['backlog_assigned'][0].
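A sketch of the signature change the reviewer suggests, assuming the wrapper's internals otherwise stay as in the hunk shown below; initial_backlog_loaded is the reviewer's proposed parameter name, not something that exists in the PR:

```python
def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    initial_backlog_loaded=0, is_baseline=False):
    """Assign jobs, refilling from backlog, with prefilled backlog jobs counted too."""
    total_launched_jobs = 0
    total_dropped_jobs = 0
    backlog_assigned = 0
    # Seed with jobs already pulled from the backlog earlier in the step so that
    # launching them counts toward backlog_assigned as well.
    backlog_loaded_remaining = initial_backlog_loaded
    ...  # loop body unchanged from the wrapper shown in the next hunk
```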
```python
def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    is_baseline=False):
    """
    Assign jobs, refilling the queue from backlog when it becomes empty.
    Loop until no progress is made or resources/backlog are exhausted.
    """
    total_launched_jobs = 0
    total_dropped_jobs = 0
    backlog_assigned = 0
    backlog_loaded_remaining = 0

    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        if num_launched_jobs == 0 and moved_from_backlog == 0:
            break

    return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned
```
Backlog refill can violate the 1000‑job queue cap.
With backlog enabled, overflow jobs can accumulate beyond the fixed queue size, which risks exceeding the 1000‑job limit (and potentially the per‑hour and max‑duration constraints) unless those limits are enforced elsewhere. Please cap combined queue+backlog size and validate new job counts/durations before enqueueing or refilling.
As per coding guidelines: Job queue constraints must enforce maximum 1000 jobs, maximum 1500 new jobs per hour, and maximum 170h runtime per job.
🤖 Prompt for AI Agents
In `@src/job_management.py` around lines 258 - 296, The backlog refill path can
exceed the 1000-job queue cap and skip per-hour/new-job and max-duration checks;
update assign_jobs_with_backlog_refill (and the helper fill_queue_from_backlog)
to enforce limits before moving jobs from backlog: compute combined_size =
current_queue_count(job_queue_2d) + len(backlog_queue) and only move
min(moved_from_backlog, 1000 - current_queue_count) jobs, validate each job's
new_job_count <= 1500_per_hour and runtime <= 170_hours before enqueueing,
reject or drop any jobs that violate these rules (increment total_dropped_jobs
and backlog_assigned appropriately), and ensure metrics/logging are updated when
jobs are dropped or skipped.
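For illustration, one way the refill helper could enforce the queue cap and the per-job runtime limit. This sketch rests on assumptions not confirmed by the diff: that column 0 of a queue row is the job's core count (0 marks an empty slot), that column 1 is its runtime in hours, that backlog entries are rows in the same format, and that next_empty_slot points at the next free row. The 1500-new-jobs-per-hour limit would still need enforcing where jobs are generated.

```python
import numpy as np

MAX_QUEUE_JOBS = 1000    # queue cap from the coding guidelines
MAX_RUNTIME_HOURS = 170  # per-job runtime cap from the coding guidelines

def fill_queue_from_backlog(job_queue_2d, backlog_queue, next_empty_slot):
    """Move backlog jobs into the queue without exceeding the queue cap."""
    current_queue_count = int(np.count_nonzero(job_queue_2d[:, 0]))
    capacity_left = max(0, MAX_QUEUE_JOBS - current_queue_count)
    moved = 0
    while backlog_queue and capacity_left > 0 and next_empty_slot < job_queue_2d.shape[0]:
        job = backlog_queue.pop(0)
        if job[1] > MAX_RUNTIME_HOURS:
            # Drop jobs that violate the duration limit (and count them in metrics).
            continue
        job_queue_2d[next_empty_slot] = job
        next_empty_slot += 1
        moved += 1
        capacity_left -= 1
    return next_empty_slot, moved
```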
Avoid re‑aging waiting jobs within the same step.
The loop re-runs assign_jobs_to_available_nodes even when no new jobs were pulled from backlog, which increments wait ages multiple times per step and inflates wait-time metrics (and future drop logic). Only iterate again when backlog is actually refilled.
✅ Suggested fix
```diff
-        if num_launched_jobs == 0 and moved_from_backlog == 0:
-            break
+        # Only iterate again if new jobs were pulled in from backlog.
+        if moved_from_backlog == 0:
+            break
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        # Only iterate again if new jobs were pulled in from backlog.
        if moved_from_backlog == 0:
            break
```
🤖 Prompt for AI Agents
In `@src/job_management.py` around lines 270 - 295, The loop currently re-calls
assign_jobs_to_available_nodes multiple times per step, which causes wait ages
to be incremented repeatedly; change the loop so it only iterates again when new
jobs were actually moved from backlog into the queue. Concretely, after calling
assign_jobs_to_available_nodes and optionally fill_queue_from_backlog, break out
of the while loop unless moved_from_backlog > 0 (i.e., only continue the loop
when fill_queue_from_backlog added jobs); update the loop's termination
condition that currently checks num_launched_jobs and moved_from_backlog so it
instead stops immediately when moved_from_backlog == 0 to prevent re‑aging,
referencing assign_jobs_to_available_nodes and fill_queue_from_backlog and the
moved_from_backlog variable.
The agent now has two extra integer variables in the observation space so it knows about the existence and length of the backlog_queue. Flagged assign_jobs so it only ages waiting jobs once per step.
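The aging flag itself is not visible in this diff; a rough sketch of how it could be threaded through the refill loop, with age_waiting_jobs as a hypothetical parameter name:

```python
def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    is_baseline=False):
    age_waiting_jobs = True
    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline,
            age_waiting_jobs=age_waiting_jobs,  # hypothetical flag: age wait times only once per step
        )
        age_waiting_jobs = False
        ...  # remainder of the refill/break logic as shown above
```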
4132187 to 2112bc5
No description provided.