Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies = [
"pydantic>=2.11.3",
"protobuf>=5.29.5",
"google-api-core>=1.26.0",
"typing-extensions>=4.0.0",
]

classifiers = [
Expand Down
31 changes: 28 additions & 3 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ async def _dispatch_notification(
) -> bool:
url = push_info.url
try:
headers = None
if push_info.token:
headers = {'X-A2A-Notification-Token': push_info.token}
headers = self._build_headers(push_info)
response = await self._client.post(
url,
json=task.model_dump(mode='json', exclude_none=True),
Expand All @@ -72,3 +70,30 @@ async def _dispatch_notification(
)
return False
return True

@staticmethod
def _authorization_header(
push_info: PushNotificationConfig,
) -> str | None:
auth = push_info.authentication
if not auth or not auth.credentials:
return None
schemes = [scheme for scheme in auth.schemes if scheme]
if not schemes:
return None
scheme = next(
(scheme for scheme in schemes if scheme.lower() == 'bearer'),
schemes[0],
)
return f'{scheme} {auth.credentials}'

def _build_headers(
self, push_info: PushNotificationConfig
) -> dict[str, str] | None:
headers: dict[str, str] = {}
if push_info.token:
headers['X-A2A-Notification-Token'] = push_info.token
authorization = self._authorization_header(push_info)
if authorization:
headers['Authorization'] = authorization
return headers or None
11 changes: 10 additions & 1 deletion src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ def internal_method(self):
from typing import TYPE_CHECKING, Any


if TYPE_CHECKING:
from typing_extensions import Self
else:
try:
from typing import Self
except ImportError: # pragma: no cover - for Python < 3.11
from typing_extensions import Self


if TYPE_CHECKING:
from opentelemetry.trace import SpanKind as SpanKindType
else:
Expand All @@ -86,7 +95,7 @@ class _NoOp:
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self

def __enter__(self) -> '_NoOp':
def __enter__(self) -> Self:
return self

def __exit__(self, *args: object, **kwargs: Any) -> None:
Expand Down
76 changes: 75 additions & 1 deletion tests/server/tasks/test_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest
from pydantic import ValidationError

from a2a.server.tasks.base_push_notification_sender import (
BasePushNotificationSender,
)
from a2a.types import (
PushNotificationAuthenticationInfo,
PushNotificationConfig,
Task,
TaskState,
Expand All @@ -29,8 +32,14 @@ def create_sample_push_config(
url: str = 'http://example.com/callback',
config_id: str = 'cfg1',
token: str | None = None,
authentication: PushNotificationAuthenticationInfo | None = None,
) -> PushNotificationConfig:
return PushNotificationConfig(id=config_id, url=url, token=token)
return PushNotificationConfig(
id=config_id,
url=url,
token=token,
authentication=authentication,
)


class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -92,6 +101,71 @@ async def test_send_notification_with_token_success(self) -> None:
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_auth_header(self) -> None:
task_id = 'task_send_auth'
task_data = create_sample_task(task_id=task_id)
auth = PushNotificationAuthenticationInfo(
schemes=['Basic', 'Bearer'], credentials='token_or_jwt'
)
config = create_sample_push_config(
url='http://notify.me/here',
token='unique_token',
authentication=auth,
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

self.mock_config_store.get_info.assert_awaited_once_with

self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers={
'X-A2A-Notification-Token': 'unique_token',
'Authorization': 'Bearer token_or_jwt',
},
)
mock_response.raise_for_status.assert_called_once()

def test_authorization_header_no_credentials(self) -> None:
auth = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials=None
)
config = create_sample_push_config(authentication=auth)
assert self.sender._authorization_header(config) is None

def test_authorization_header_empty_schemes(self) -> None:
auth = PushNotificationAuthenticationInfo(
schemes=[], credentials='token'
)
config = create_sample_push_config(authentication=auth)
assert self.sender._authorization_header(config) is None

def test_authorization_header_non_bearer_scheme(self) -> None:
auth = PushNotificationAuthenticationInfo(
schemes=['Basic'], credentials='token'
)
config = create_sample_push_config(authentication=auth)
assert self.sender._authorization_header(config) == 'Basic token'

def test_authorization_header_filters_empty_schemes(self) -> None:
auth = PushNotificationAuthenticationInfo(
schemes=['', 'Bearer'], credentials='token'
)
config = create_sample_push_config(authentication=auth)
assert self.sender._authorization_header(config) == 'Bearer token'

def test_authorization_header_none_scheme_rejected(self) -> None:
with pytest.raises(ValidationError):
PushNotificationAuthenticationInfo(
schemes=['Bearer', None], credentials='token'
)

async def test_send_notification_no_config(self) -> None:
task_id = 'task_send_no_config'
task_data = create_sample_task(task_id=task_id)
Expand Down
4 changes: 3 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading