Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 142 additions & 1 deletion cli/decompose/decompose.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import keyword
import re
from enum import Enum
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Annotated

import typer

from .pipeline import DecompBackend
from .pipeline import DecompBackend, DecompPipelineResult, DecompSubtasksResult


# Must maintain declaration order
Expand All @@ -20,6 +22,141 @@ class DecompVersion(str, Enum):
this_file_dir = Path(__file__).resolve().parent


def reorder_subtasks(
subtasks: list[DecompSubtasksResult],
) -> list[DecompSubtasksResult]:
"""Reorder subtasks based on their dependencies using topological sort.

Uses Python's graphlib.TopologicalSorter to perform a topological sort of
subtasks based on their depends_on relationships. Also renumbers the subtask
descriptions if they start with a number pattern (e.g., "1. ", "2. ").

Args:
subtasks: List of subtask dictionaries with 'tag' and 'depends_on' fields

Returns:
Reordered list of subtasks where all dependencies appear before dependents,
with subtask descriptions renumbered to match the new order

Raises:
ValueError: If a circular dependency is detected
"""
# Build dependency graph
Copy link
Member

@psschwei psschwei Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any benefit to changing the internal representation of tasks to a graph rather than a list (within all of decompose, not just here)? I'm pretty sure the answer is no, but this is probably the time to ask 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I was looking at it a bit further down the chain and the only benefit would be that you wouldn't need this intermediary bit to verify ordering.

Thinking about that though, while the subtasks are ordered correctly I think I need to double check that the numbering matches as well post re-ordering.

subtask_map = {subtask["tag"].lower(): subtask for subtask in subtasks}

# Build graph for TopologicalSorter
# Format: {node: {dependencies}}
graph = {}
for tag, subtask in subtask_map.items():
deps = subtask.get("depends_on", [])
# Filter to only include dependencies that exist in subtask_map
valid_deps = {dep.lower() for dep in deps if dep.lower() in subtask_map}
graph[tag] = valid_deps

# Perform topological sort
try:
ts = TopologicalSorter(graph)
sorted_tags = list(ts.static_order())
except ValueError as e:
# TopologicalSorter raises ValueError for circular dependencies
raise ValueError(
"Circular dependency detected in subtasks. Cannot automatically reorder."
) from e

# Get reordered subtasks
reordered = [subtask_map[tag] for tag in sorted_tags]

# Renumber subtask descriptions if they start with "N. " pattern
number_pattern = re.compile(r"^\d+\.\s+")
for i, subtask in enumerate(reordered, start=1):
if number_pattern.match(subtask["subtask"]):
# Replace the number at the start with the new position
subtask["subtask"] = number_pattern.sub(f"{i}. ", subtask["subtask"])

return reordered


def verify_user_variables(
decomp_data: DecompPipelineResult, input_var: list[str] | None
) -> DecompPipelineResult:
"""Verify and fix user variable ordering in subtasks.

Validates that:
1. All input_vars_required exist in the provided input_var list
2. All depends_on variables reference existing subtasks
3. Subtasks are ordered so dependencies appear before dependents

If dependencies are out of order, automatically reorders them using topological sort.

Args:
decomp_data: The decomposition pipeline result containing subtasks
input_var: List of user-provided input variable names

Returns:
The decomp_data with potentially reordered subtasks

Raises:
ValueError: If a required input variable is missing or dependencies are invalid
"""
if input_var is None:
input_var = []

# Normalize input variables to lowercase for comparison
available_input_vars = {var.lower() for var in input_var}

# Build set of all subtask tags
all_subtask_tags = {subtask["tag"].lower() for subtask in decomp_data["subtasks"]}

# Validate that all required variables exist
for subtask in decomp_data["subtasks"]:
subtask_tag = subtask["tag"].lower()

# Check input_vars_required exist in provided input variables
for required_var in subtask.get("input_vars_required", []):
var_lower = required_var.lower()
if var_lower not in available_input_vars:
raise ValueError(
f'Subtask "{subtask_tag}" requires input variable '
f'"{required_var}" which was not provided in --input-var. '
f"Available input variables: {sorted(available_input_vars) if available_input_vars else 'none'}"
)

# Check that all dependencies exist somewhere in the subtasks
for dep_var in subtask.get("depends_on", []):
dep_lower = dep_var.lower()
if dep_lower not in all_subtask_tags:
raise ValueError(
f'Subtask "{subtask_tag}" depends on variable '
f'"{dep_var}" which does not exist in any subtask. '
f"Available subtask tags: {sorted(all_subtask_tags)}"
)

# Check if reordering is needed
needs_reordering = False
defined_subtask_tags = set()

for subtask in decomp_data["subtasks"]:
subtask_tag = subtask["tag"].lower()

# Check if any dependency hasn't been defined yet
for dep_var in subtask.get("depends_on", []):
dep_lower = dep_var.lower()
if dep_lower not in defined_subtask_tags:
needs_reordering = True
break

if needs_reordering:
break

defined_subtask_tags.add(subtask_tag)

# Reorder if needed
if needs_reordering:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log that this is happening?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not opposed to the idea.

From a user perspective nothing related to the creation of the output is currently available until the final output (though you will see some progress markers via cli) - this behavior is unchanged.

decomp_data["subtasks"] = reorder_subtasks(decomp_data["subtasks"])

return decomp_data


def run(
out_dir: Annotated[
Path,
Expand Down Expand Up @@ -170,6 +307,10 @@ def run(
backend_api_key=backend_api_key,
)

# Verify that all user variables are properly defined before use
# This may reorder subtasks if dependencies are out of order
decomp_data = verify_user_variables(decomp_data, input_var)

with open(out_dir / f"{out_name}.json", "w") as f:
json.dump(decomp_data, f, indent=2)

Expand Down
Loading