diff --git a/pyproject.toml b/pyproject.toml index ff52e089a..f4393c0c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.6.22" +version = "2.6.23" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/_cli/__init__.py b/src/uipath/_cli/__init__.py index 08123f028..9b33a450c 100644 --- a/src/uipath/_cli/__init__.py +++ b/src/uipath/_cli/__init__.py @@ -31,13 +31,14 @@ "eval": "cli_eval", "dev": "cli_dev", "add": "cli_add", + "server": "cli_server", "register": "cli_register", "debug": "cli_debug", "assets": "services.cli_assets", "buckets": "services.cli_buckets", } -_RUNTIME_COMMANDS = {"init", "dev", "run", "eval", "debug"} +_RUNTIME_COMMANDS = {"init", "dev", "run", "eval", "debug", "server"} _runtime_initialized = False diff --git a/src/uipath/_cli/cli_server.py b/src/uipath/_cli/cli_server.py new file mode 100644 index 000000000..850c3b779 --- /dev/null +++ b/src/uipath/_cli/cli_server.py @@ -0,0 +1,311 @@ +import asyncio +import importlib +import json +import os +import shlex +import sys +import tempfile +from importlib.metadata import entry_points +from importlib.util import find_spec +from typing import Any + +import click +from aiohttp import ClientSession, UnixConnector, web + +from ._utils._console import ConsoleLogger +from .cli_debug import debug +from .cli_eval import eval +from .cli_run import run + +console = ConsoleLogger() + +SOCKET_ENV_VAR = "UIPATH_SERVER_SOCKET" +DEFAULT_SOCKET_PATH = "/tmp/uipath-server.sock" +DEFAULT_PORT = 8765 + +IS_WINDOWS = sys.platform == "win32" + +COMMANDS = { + "run": run, + "debug": debug, + "eval": eval, +} + +DEFAULT_PRELOAD_MODULES = [ + # Network/async - slowest to load + "pysignalr.client", + "socketio", + "httpx", + # Validation/serialization + "pydantic", + "pydantic_function_models", + # CLI/UI + "click", + "rich", + # Core + "uipath.platform", + # Auth + "jwt", + # Retry logic + "tenacity", +] + + +def preload_modules() -> None: + """Pre-load modules registered by all uipath packages.""" + console.info("Pre-loading modules...") + + modules_to_load: set[str] = set(DEFAULT_PRELOAD_MODULES) + + for ep in entry_points(group="uipath.preload"): + try: + get_modules = ep.load() + modules_to_load.update(get_modules()) + except Exception as e: + console.warning(f"Failed to load entry point {ep.name}: {e}") + + for module_name in modules_to_load: + if module_name in sys.modules: + continue + if find_spec(module_name) is None: + continue + try: + importlib.import_module(module_name) + except ImportError as e: + console.warning(f"Failed to load {module_name}: {e}") + + console.success("Modules pre-loaded") + + +def generate_socket_path() -> str: + """Generate a unique socket path for the server to listen on.""" + return os.path.join(tempfile.gettempdir(), f"uipath-server-{os.getpid()}.sock") + + +def get_field(message: dict[str, Any], *keys: str) -> Any: + """Get a field from message, trying multiple key variations.""" + for key in keys: + if key in message: + return message[key] + return None + + +def parse_args(args: str | list[str] | None) -> list[str]: + """Parse args into a list of strings.""" + if args is None: + return [] + if isinstance(args, list): + return args + if isinstance(args, str): + return shlex.split(args) + return [] + + +async def send_ack(ack_socket_path: str, server_socket_path: str) -> None: + """Send acknowledgment via HTTP POST to the ack socket.""" + ack_message: dict[str, str] = { + "status": "ready", + "socket": server_socket_path, + } + + conn = UnixConnector(path=ack_socket_path) + try: + async with ClientSession(connector=conn) as session: + async with session.post( + "http://localhost/ack", # placeholder URL for Unix socket + json=ack_message, + ) as response: + if response.status == 200: + console.success(f"Sent ack to {ack_socket_path}") + else: + console.error(f"Ack failed with status {response.status}") + raise RuntimeError(f"Ack failed: {response.status}") + except Exception as e: + console.error(f"Failed to send ack to {ack_socket_path}: {e}") + raise + + +async def handle_health(request: web.Request) -> web.Response: + """Handle GET /health endpoint.""" + return web.Response(text="OK", status=200) + + +async def handle_start(request: web.Request) -> web.Response: + """Handle POST /jobs/{job_key}/start endpoint.""" + job_key = request.match_info.get("job_key") + if not job_key: + return web.json_response( + {"success": False, "error": "Missing job_key"}, + status=400, + ) + + try: + message: dict[str, Any] = await request.json() + except json.JSONDecodeError: + return web.json_response( + {"success": False, "error": "Invalid JSON"}, + status=400, + ) + + command_name = get_field(message, "command", "Command") + if not isinstance(command_name, str): + return web.json_response( + {"success": False, "error": "Missing or invalid field: 'command'"}, + status=400, + ) + + args_raw = get_field(message, "args", "Args") + args = parse_args(args_raw) + + env_vars = get_field(message, "environmentVariables", "EnvironmentVariables") or {} + working_dir = get_field(message, "workingDirectory", "WorkingDirectory") + + console.info(f"Starting job {job_key}: {command_name} {args}") + + cmd = COMMANDS.get(command_name) + if cmd is None: + return web.json_response( + {"success": False, "error": f"Unknown command: {command_name}"}, + status=400, + ) + + # Save original state + original_cwd = os.getcwd() + original_env = os.environ.copy() + + console.info(f"Original cwd: {original_cwd}") + console.info(f"Requested working_dir: {working_dir}") + + try: + if isinstance(env_vars, dict): + os.environ.update(env_vars) + + if working_dir and isinstance(working_dir, str): + os.chdir(working_dir) + + result = await asyncio.to_thread(cmd.main, args, standalone_mode=False) + + return web.json_response( + { + "success": True, + "job_key": job_key, + "result": result, + } + ) + except SystemExit as e: + exit_code = e.code if isinstance(e.code, int) else 1 + return web.json_response( + { + "success": exit_code == 0, + "job_key": job_key, + "error": None if exit_code == 0 else f"Exit code: {exit_code}", + } + ) + except Exception as e: + return web.json_response( + {"success": False, "job_key": job_key, "error": str(e)}, + status=500, + ) + finally: + # Restore original state + os.chdir(original_cwd) + os.environ.clear() + os.environ.update(original_env) + + +def create_app() -> web.Application: + """Create the aiohttp application.""" + app = web.Application() + app.router.add_get("/health", handle_health) + app.router.add_post("/jobs/{job_key}/start", handle_start) + return app + + +async def start_unix_server(ack_socket_path: str) -> None: + """Start Unix domain socket HTTP server.""" + server_socket_path = generate_socket_path() + + if os.path.exists(server_socket_path): + os.unlink(server_socket_path) + + app = create_app() + runner = web.AppRunner(app) + await runner.setup() + + try: + site = web.UnixSite(runner, server_socket_path) + await site.start() + + console.success(f"Server listening on unix://{server_socket_path}") + + await send_ack(ack_socket_path, server_socket_path) + + while True: + await asyncio.sleep(3600) + finally: + await runner.cleanup() + if os.path.exists(server_socket_path): + os.unlink(server_socket_path) + + +async def start_tcp_server(host: str, port: int) -> None: + """Start TCP HTTP server (Windows fallback).""" + app = create_app() + runner = web.AppRunner(app) + await runner.setup() + + try: + site = web.TCPSite(runner, host, port) + await site.start() + + console.success(f"Server listening on http://{host}:{port}") + + while True: + await asyncio.sleep(3600) + finally: + await runner.cleanup() + + +@click.command() +@click.option( + "--socket", + type=str, + default=None, + help=f"Unix socket path to send ready ack to (default: ${SOCKET_ENV_VAR} or {DEFAULT_SOCKET_PATH})", +) +@click.option( + "--port", + type=int, + default=None, + help=f"TCP port, used on Windows or when --tcp flag is set (default: {DEFAULT_PORT})", +) +@click.option( + "--tcp", + is_flag=True, + help="Force TCP mode even on Unix systems", +) +def server(socket: str | None, port: int | None, tcp: bool) -> None: + """Start an HTTP server that forwards commands to run/debug/eval. + + Creates its own socket to listen on and sends an ack to --socket with: + {"status": "ready", "socket": "/path/to/server.sock"} + + Endpoint: POST /jobs/{job_key}/start + Body: {"command": "run", "args": "agent.json '{}'", "environmentVariables": {}, "workingDirectory": "/path"} + + Endpoint: GET /health + """ + use_tcp = IS_WINDOWS or tcp + + preload_modules() + + try: + if use_tcp: + asyncio.run(start_tcp_server("127.0.0.1", port or DEFAULT_PORT)) + else: + ack_socket_path = ( + socket or os.environ.get(SOCKET_ENV_VAR) or DEFAULT_SOCKET_PATH + ) + asyncio.run(start_unix_server(ack_socket_path)) + except KeyboardInterrupt: + console.info("Shutting down") diff --git a/tests/cli/test_server.py b/tests/cli/test_server.py new file mode 100644 index 000000000..15f315044 --- /dev/null +++ b/tests/cli/test_server.py @@ -0,0 +1,156 @@ +import asyncio +import json +import os +import threading +import time +from typing import Any + +import aiohttp +import pytest + +from uipath._cli.cli_server import start_tcp_server + + +def create_uipath_json(script_path: str, entrypoint_name: str = "main"): + """Helper to create uipath.json with functions.""" + return {"functions": {entrypoint_name: f"{script_path}:main"}} + + +async def start_job( + port: int, job_key: str, command: str, args: list[str] +) -> dict[str, Any]: + """Start a job on the server.""" + async with aiohttp.ClientSession() as session: + async with session.post( + f"http://127.0.0.1:{port}/jobs/{job_key}/start", + json={"command": command, "args": args}, + ) as response: + return await response.json() + + +class TestServer: + @pytest.fixture + def server_port(self): + """Use a random available port for testing.""" + import socket + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + @pytest.fixture + def server(self, server_port): + """Start the server in a background thread.""" + + def run_server(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(start_tcp_server("127.0.0.1", server_port)) + except asyncio.CancelledError: + pass + finally: + loop.close() + + thread = threading.Thread(target=run_server, daemon=True) + thread.start() + time.sleep(0.5) + + yield server_port + + @pytest.fixture + def simple_script(self) -> str: + return """ +from dataclasses import dataclass + +@dataclass +class Input: + message: str + repeat: int = 1 + +def main(input: Input) -> str: + return (input.message + " ") * input.repeat +""" + + def test_start_job_success(self, server, temp_dir, simple_script): + """Test starting a job through the server.""" + port = server + job_key = "test-job-123" + + with pytest.MonkeyPatch().context() as mp: + mp.chdir(temp_dir) + + script_file = "entrypoint.py" + script_path = os.path.join(temp_dir, script_file) + with open(script_path, "w") as f: + f.write(simple_script) + + with open(os.path.join(temp_dir, "uipath.json"), "w") as f: + json.dump(create_uipath_json(script_file), f) + + input_file = os.path.join(temp_dir, "input.json") + with open(input_file, "w") as f: + json.dump({"message": "Hello", "repeat": 3}, f) + + output_file = os.path.join(temp_dir, "output.json") + + response = asyncio.run( + start_job( + port, + job_key, + "run", + ["main", "--input-file", input_file, "--output-file", output_file], + ) + ) + + assert response["success"] is True + assert response["job_key"] == job_key + assert os.path.exists(output_file) + + with open(output_file, "r") as f: + output = f.read() + assert "Hello" in output + + def test_start_job_unknown_command(self, server): + """Test starting a job with unknown command.""" + port = server + + response = asyncio.run(start_job(port, "job-123", "unknown_command", [])) + + assert response["success"] is False + assert "Unknown command" in response["error"] + + def test_start_job_missing_command(self, server): + """Test starting a job without command field.""" + port = server + + async def send_invalid(): + async with aiohttp.ClientSession() as session: + async with session.post( + f"http://127.0.0.1:{port}/jobs/job-123/start", + json={"args": ["some", "args"]}, + ) as response: + return await response.json() + + response = asyncio.run(send_invalid()) + + assert response["success"] is False + assert "command" in response["error"] + + def test_start_job_invalid_json(self, server): + """Test starting a job with invalid JSON.""" + port = server + + async def send_invalid(): + async with aiohttp.ClientSession() as session: + async with session.post( + f"http://127.0.0.1:{port}/jobs/job-123/start", + data="not valid json", + headers={"Content-Type": "application/json"}, + ) as response: + return await response.json() + + response = asyncio.run(send_invalid()) + + assert response["success"] is False + assert "Invalid JSON" in response["error"] diff --git a/uv.lock b/uv.lock index b3289b67b..88710ccf2 100644 --- a/uv.lock +++ b/uv.lock @@ -2491,7 +2491,7 @@ wheels = [ [[package]] name = "uipath" -version = "2.6.22" +version = "2.6.23" source = { editable = "." } dependencies = [ { name = "applicationinsights" },