diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 30d1ee891..9035d2b60 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -2,6 +2,7 @@ import logging from collections.abc import AsyncGenerator +from contextlib import suppress from typing import cast from a2a.server.agent_execution import ( @@ -312,6 +313,7 @@ async def on_message_send( blocking = False interrupted_or_non_blocking = False + success = False try: # Create async callback for push notifications async def push_notification_callback() -> None: @@ -327,6 +329,7 @@ async def push_notification_callback() -> None: blocking=blocking, event_callback=push_notification_callback, ) + success = True except Exception: logger.exception('Agent execution failed') @@ -339,7 +342,14 @@ async def push_notification_callback() -> None: cleanup_task.set_name(f'cleanup_producer:{task_id}') self._track_background_task(cleanup_task) else: - await self._cleanup_producer(producer_task, task_id) + # If we are blocking and not interrupted, but the result is not set + # (meaning exception or other failure), we should cancel the producer. + # 'result' (local var) is bound before this block if success. + # However, to be safe, we can check if successful using a flag. + cancel_producer = not success + await self._cleanup_producer( + producer_task, task_id, cancel=cancel_producer + ) if not result: raise ServerError(error=InternalError()) @@ -433,9 +443,13 @@ async def _cleanup_producer( self, producer_task: asyncio.Task, task_id: str, + cancel: bool = False, ) -> None: """Cleans up the agent execution task and queue manager entry.""" - await producer_task + if cancel: + producer_task.cancel() + with suppress(asyncio.CancelledError): + await producer_task await self._queue_manager.close(task_id) async with self._running_agents_lock: self._running_agents.pop(task_id, None) diff --git a/src/a2a/utils/telemetry.py b/src/a2a/utils/telemetry.py index c73d2ac92..81f0a0cc7 100644 --- a/src/a2a/utils/telemetry.py +++ b/src/a2a/utils/telemetry.py @@ -86,7 +86,7 @@ class _NoOp: def __call__(self, *args: Any, **kwargs: Any) -> Any: return self - def __enter__(self) -> '_NoOp': + def __enter__(self) -> Any: return self def __exit__(self, *args: object, **kwargs: Any) -> None: diff --git a/tests/server/request_handlers/test_jsonrpc_handler.py b/tests/server/request_handlers/test_jsonrpc_handler.py index d1ead0211..d10d544ac 100644 --- a/tests/server/request_handlers/test_jsonrpc_handler.py +++ b/tests/server/request_handlers/test_jsonrpc_handler.py @@ -322,7 +322,6 @@ async def streaming_coro(): self.assertIsInstance(response.root, JSONRPCErrorResponse) assert response.root.error == UnsupportedOperationError() # type: ignore - mock_agent_executor.execute.assert_called_once() @patch( 'a2a.server.agent_execution.simple_request_context_builder.SimpleRequestContextBuilder.build'