Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

from collections.abc import AsyncGenerator
from contextlib import suppress
from typing import cast

from a2a.server.agent_execution import (
Expand Down Expand Up @@ -312,6 +313,7 @@ async def on_message_send(
blocking = False

interrupted_or_non_blocking = False
success = False
try:
# Create async callback for push notifications
async def push_notification_callback() -> None:
Expand All @@ -327,6 +329,7 @@ async def push_notification_callback() -> None:
blocking=blocking,
event_callback=push_notification_callback,
)
success = True

except Exception:
logger.exception('Agent execution failed')
Expand All @@ -339,7 +342,14 @@ async def push_notification_callback() -> None:
cleanup_task.set_name(f'cleanup_producer:{task_id}')
self._track_background_task(cleanup_task)
else:
await self._cleanup_producer(producer_task, task_id)
# If we are blocking and not interrupted, but the result is not set
# (meaning exception or other failure), we should cancel the producer.
# 'result' (local var) is bound before this block if success.
# However, to be safe, we can check if successful using a flag.
cancel_producer = not success
await self._cleanup_producer(
producer_task, task_id, cancel=cancel_producer
)
Comment on lines +345 to +352
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The logic here is correct, but the comment is quite verbose and an intermediate variable is used. For improved readability and conciseness, you could combine the variable assignment with the function call and shorten the comment.

                # In blocking mode, if an exception occurred (i.e., `success` is False),
                # the producer task might be running indefinitely. We must cancel it
                # to prevent the cleanup from hanging while waiting for it.
                await self._cleanup_producer(
                    producer_task, task_id, cancel=not success
                )


if not result:
raise ServerError(error=InternalError())
Expand Down Expand Up @@ -433,9 +443,13 @@ async def _cleanup_producer(
self,
producer_task: asyncio.Task,
task_id: str,
cancel: bool = False,
) -> None:
"""Cleans up the agent execution task and queue manager entry."""
await producer_task
if cancel:
producer_task.cancel()
with suppress(asyncio.CancelledError):
await producer_task
await self._queue_manager.close(task_id)
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)
Expand Down
2 changes: 1 addition & 1 deletion src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class _NoOp:
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self

def __enter__(self) -> '_NoOp':
def __enter__(self) -> Any:
return self

def __exit__(self, *args: object, **kwargs: Any) -> None:
Expand Down
1 change: 0 additions & 1 deletion tests/server/request_handlers/test_jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ async def streaming_coro():

self.assertIsInstance(response.root, JSONRPCErrorResponse)
assert response.root.error == UnsupportedOperationError() # type: ignore
mock_agent_executor.execute.assert_called_once()

@patch(
'a2a.server.agent_execution.simple_request_context_builder.SimpleRequestContextBuilder.build'
Expand Down
Loading