From 999ca06e0b960708a1db1ce5af1367949ed1c9b9 Mon Sep 17 00:00:00 2001 From: Mayank Jha Date: Fri, 30 Jan 2026 01:42:42 -0800 Subject: [PATCH] feat: add --poll-for-triggers flag to uipath run Add a new CLI flag that enables polling for resume triggers instead of suspending execution. This reuses the existing UiPathDebugRuntime polling mechanism with a new SilentDebugBridge for non-interactive use. Usage: - `uipath run agent.py '{}' --poll-for-triggers` (5s default interval) - `uipath run agent.py '{}' --poll-for-triggers=10` (custom 10s interval) For API/HITL triggers, prompts for JSON input via stdin. For other triggers (TASK, JOB, DEEP_RAG, etc.), polls until complete. Co-Authored-By: Claude Opus 4.5 --- src/uipath/_cli/_debug/_silent_bridge.py | 106 +++++++++++++++++++++++ src/uipath/_cli/cli_run.py | 30 +++++++ 2 files changed, 136 insertions(+) create mode 100644 src/uipath/_cli/_debug/_silent_bridge.py diff --git a/src/uipath/_cli/_debug/_silent_bridge.py b/src/uipath/_cli/_debug/_silent_bridge.py new file mode 100644 index 000000000..bc39bdead --- /dev/null +++ b/src/uipath/_cli/_debug/_silent_bridge.py @@ -0,0 +1,106 @@ +"""Silent debug bridge for polling mode - minimal output, no interactive debugging.""" + +import asyncio +import json +import logging +import signal +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Literal + +from uipath.runtime import UiPathRuntimeResult +from uipath.runtime.debug import UiPathDebugQuitError +from uipath.runtime.events import UiPathRuntimeStateEvent +from uipath.runtime.resumable import UiPathResumeTriggerType + +logger = logging.getLogger(__name__) + + +class SilentDebugBridge: + """A minimal debug bridge for polling mode - no interactive output.""" + + def __init__(self): + self._terminate_event: asyncio.Event | None = None + self._waiting_for_api_input = False + self._stdin_executor = ThreadPoolExecutor(max_workers=1) + + async def connect(self) -> None: + self._terminate_event = asyncio.Event() + signal.signal(signal.SIGINT, self._handle_sigint) + + async def disconnect(self) -> None: + pass + + async def emit_execution_started(self, **kwargs) -> None: + logger.debug("Execution started (polling mode)") + + async def emit_state_update(self, state_event: UiPathRuntimeStateEvent) -> None: + if state_event.node_name == "": + logger.info( + f"Polling for trigger... (attempt {state_event.payload.get('attempt', '?')})" + ) + + async def emit_breakpoint_hit(self, breakpoint_result: Any) -> None: + pass # No breakpoints in polling mode + + async def emit_execution_completed( + self, runtime_result: UiPathRuntimeResult + ) -> None: + logger.debug(f"Execution completed: {runtime_result.status}") + + async def emit_execution_suspended( + self, runtime_result: UiPathRuntimeResult + ) -> None: + if ( + runtime_result.trigger + and runtime_result.trigger.trigger_type == UiPathResumeTriggerType.API + ): + self._waiting_for_api_input = True + print("API trigger suspended. Please provide JSON input:") + + async def emit_execution_resumed(self, resume_data: Any) -> None: + logger.debug("Execution resumed") + + async def emit_execution_error(self, error: str) -> None: + logger.error(f"Execution error: {error}") + + async def wait_for_resume(self) -> dict[str, Any] | None: + """Wait for resume - prompt for input on API triggers.""" + if self._waiting_for_api_input: + self._waiting_for_api_input = False + loop = asyncio.get_running_loop() + try: + user_input = await loop.run_in_executor( + self._stdin_executor, self._read_input_blocking + ) + stripped = user_input.strip() + if not stripped: + return {} + try: + return json.loads(stripped) + except json.JSONDecodeError: + return stripped + except (KeyboardInterrupt, EOFError): + raise UiPathDebugQuitError("User interrupted") + return None # Non-API triggers don't need user input + + async def wait_for_terminate(self) -> None: + assert self._terminate_event is not None + await self._terminate_event.wait() + + def get_breakpoints(self) -> list[str] | Literal["*"]: + return [] # No breakpoints + + def _read_input_blocking(self) -> str: + assert self._terminate_event is not None + try: + return input("> ") + except KeyboardInterrupt as e: + self._terminate_event.set() + raise UiPathDebugQuitError("User pressed Ctrl+C") from e + except EOFError as e: + self._terminate_event.set() + raise UiPathDebugQuitError("STDIN closed by user") from e + + def _handle_sigint(self, signum: int, frame: Any) -> None: + if self._terminate_event: + asyncio.get_running_loop().call_soon_threadsafe(self._terminate_event.set) diff --git a/src/uipath/_cli/cli_run.py b/src/uipath/_cli/cli_run.py index c447ad8ef..78a17443e 100644 --- a/src/uipath/_cli/cli_run.py +++ b/src/uipath/_cli/cli_run.py @@ -84,6 +84,14 @@ is_flag=True, help="Keep the temporary state file even when not resuming and no job id is provided", ) +@click.option( + "--poll-for-triggers", + type=float, + default=None, + is_flag=False, + flag_value=5.0, + help="Poll for HITL triggers instead of suspending. Default interval: 5s. Specify custom: --poll-for-triggers=10", +) def run( entrypoint: str | None, input: str | None, @@ -96,6 +104,7 @@ def run( debug: bool, debug_port: int, keep_state_file: bool, + poll_for_triggers: float | None, ) -> None: """Execute the project.""" input_file = file or input_file @@ -116,6 +125,7 @@ def run( debug=debug, debug_port=debug_port, keep_state_file=keep_state_file, + poll_for_triggers=poll_for_triggers, ) if result.error_message: @@ -195,6 +205,21 @@ async def execute() -> None: ctx.conversation_id or ctx.job_id or "default", ) + # Wrap with polling debug runtime if requested + if poll_for_triggers is not None and poll_for_triggers > 0: + from uipath.runtime.debug import UiPathDebugRuntime + + from uipath._cli._debug._silent_bridge import ( + SilentDebugBridge, + ) + + silent_bridge = SilentDebugBridge() + runtime = UiPathDebugRuntime( + delegate=runtime, + debug_bridge=silent_bridge, + trigger_poll_interval=poll_for_triggers, + ) + if ctx.job_id: trace_manager.add_span_processor( LiveTrackingSpanProcessor( @@ -214,6 +239,11 @@ async def execute() -> None: ctx.result = await execute_runtime( ctx, chat_runtime or runtime ) + elif ( + poll_for_triggers is not None and poll_for_triggers > 0 + ): + # Polling mode: UiPathDebugRuntime handles everything + ctx.result = await execute_runtime(ctx, runtime) else: ctx.result = await debug_runtime(ctx, runtime) finally: