2 changes: 1 addition & 1 deletion src/adcp/ADCP_VERSION
@@ -1 +1 @@
2.6.0
2.5.3
110 changes: 91 additions & 19 deletions src/adcp/adagents.py
@@ -280,6 +280,10 @@ def verify_agent_authorization(
return False


# Maximum number of authoritative_location redirects to follow
MAX_REDIRECT_DEPTH = 5


async def fetch_adagents(
publisher_domain: str,
timeout: float = 10.0,
@@ -288,6 +292,11 @@ async def fetch_adagents(
) -> dict[str, Any]:
"""Fetch and parse adagents.json from publisher domain.

Follows authoritative_location redirects per the AdCP specification. When a
publisher's adagents.json contains an authoritative_location field instead of
authorized_agents, this function fetches the referenced URL to get the actual
authorization data.

Args:
publisher_domain: Domain hosting the adagents.json file
timeout: Request timeout in seconds
@@ -297,11 +306,12 @@ async def fetch_adagents(
If None, a new client is created for this request.

Returns:
Parsed adagents.json data
Parsed adagents.json data (resolved from authoritative_location if present)

Raises:
AdagentsNotFoundError: If adagents.json not found (404)
AdagentsValidationError: If JSON is invalid or malformed
AdagentsValidationError: If JSON is invalid, malformed, or redirects
exceed maximum depth or form a loop
AdagentsTimeoutError: If request times out

Notes:
@@ -311,21 +321,74 @@ async def fetch_adagents(
# Validate and normalize domain for security
publisher_domain = _validate_publisher_domain(publisher_domain)

# Construct URL
# Construct initial URL
url = f"https://{publisher_domain}/.well-known/adagents.json"

# Track visited URLs to detect loops
visited_urls: set[str] = set()

for depth in range(MAX_REDIRECT_DEPTH + 1):
# Check for redirect loop
if url in visited_urls:
raise AdagentsValidationError(
f"Circular redirect detected: {url} already visited"
)
visited_urls.add(url)

data = await _fetch_adagents_url(url, timeout, user_agent, client)

# Check if this is a redirect. A response with authoritative_location but no
# authorized_agents indicates a redirect. If both are present, authorized_agents
# takes precedence (response is treated as final).
if "authoritative_location" in data and "authorized_agents" not in data:
authoritative_url = data["authoritative_location"]

# Validate HTTPS requirement
if not isinstance(authoritative_url, str) or not authoritative_url.startswith(
"https://"
):
raise AdagentsValidationError(
f"authoritative_location must be an HTTPS URL, got: {authoritative_url!r}"
)

# Check if we've exceeded max depth
if depth >= MAX_REDIRECT_DEPTH:
raise AdagentsValidationError(
f"Maximum redirect depth ({MAX_REDIRECT_DEPTH}) exceeded"
)

# Follow the redirect
url = authoritative_url
continue

# We have the final data with authorized_agents (or both fields present,
# in which case authorized_agents takes precedence)
return data

# Unreachable: loop always exits via return or raise above
raise AssertionError("Unreachable") # pragma: no cover
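
For orientation, a minimal usage sketch of the redirect behaviour described in the docstring above (not part of the diff; the domains are hypothetical):

# Usage sketch (hypothetical domains). If the publisher's
# /.well-known/adagents.json contains only "authoritative_location" pointing
# at a CDN copy, fetch_adagents follows the hop and returns the resolved
# document containing "authorized_agents".
import asyncio

from adcp.adagents import fetch_adagents


async def main() -> None:
    data = await fetch_adagents("example.com")
    for agent in data["authorized_agents"]:
        print(agent["url"])


asyncio.run(main())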


async def _fetch_adagents_url(
url: str,
timeout: float,
user_agent: str,
client: httpx.AsyncClient | None,
) -> dict[str, Any]:
"""Fetch and parse adagents.json from a specific URL.

This is the core fetch logic, separated to support redirect following.
"""
try:
# Use provided client or create a new one
if client is not None:
# Reuse provided client (connection pooling)
response = await client.get(
url,
headers={"User-Agent": user_agent},
timeout=timeout,
follow_redirects=True,
)
else:
# Create new client for single request
async with httpx.AsyncClient() as new_client:
response = await new_client.get(
url,
@@ -334,9 +397,11 @@ async def fetch_adagents(
follow_redirects=True,
)

# Process response (same for both paths)
# Process response
if response.status_code == 404:
raise AdagentsNotFoundError(publisher_domain)
# Extract domain from URL for error message
parsed = urlparse(url)
raise AdagentsNotFoundError(parsed.netloc)

if response.status_code != 200:
raise AdagentsValidationError(
@@ -353,22 +418,29 @@ async def fetch_adagents(
if not isinstance(data, dict):
raise AdagentsValidationError("adagents.json must be a JSON object")

if "authorized_agents" not in data:
raise AdagentsValidationError("adagents.json must have 'authorized_agents' field")

if not isinstance(data["authorized_agents"], list):
raise AdagentsValidationError("'authorized_agents' must be an array")

# Validate mutual exclusivity constraints
try:
validate_adagents(data)
except ValidationError as e:
raise AdagentsValidationError(f"Invalid adagents.json structure: {e}") from e
# If this has authorized_agents, validate it
if "authorized_agents" in data:
if not isinstance(data["authorized_agents"], list):
raise AdagentsValidationError("'authorized_agents' must be an array")

# Validate mutual exclusivity constraints
try:
validate_adagents(data)
except ValidationError as e:
raise AdagentsValidationError(
f"Invalid adagents.json structure: {e}"
) from e
elif "authoritative_location" not in data:
# Neither authorized_agents nor authoritative_location
raise AdagentsValidationError(
"adagents.json must have either 'authorized_agents' or 'authoritative_location'"
)

return data

except httpx.TimeoutException as e:
raise AdagentsTimeoutError(publisher_domain, timeout) from e
parsed = urlparse(url)
raise AdagentsTimeoutError(parsed.netloc, timeout) from e
except httpx.RequestError as e:
raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e

118 changes: 118 additions & 0 deletions tests/test_adagents.py
@@ -400,6 +400,124 @@ async def test_fetch_success(self):
call_args = mock_client.get.call_args
assert "https://example.com/.well-known/adagents.json" in str(call_args)

@pytest.mark.asyncio
async def test_fetch_follows_authoritative_location(self):
"""Should follow authoritative_location redirect and return resolved data."""
from adcp.adagents import fetch_adagents

# Initial response has authoritative_location redirect
redirect_response_data = {
"$schema": "/schemas/2.6.0/adagents.json",
"authoritative_location": "https://cdn.example.com/adagents/v2/adagents.json",
"last_updated": "2025-01-15T10:00:00Z",
}

# Final resolved data at the authoritative location
resolved_data = {
"$schema": "/schemas/2.6.0/adagents.json",
"authorized_agents": [
{
"url": "https://agent.example.com",
"authorized_for": "All properties",
"authorization_type": "property_tags",
"property_tags": ["all"],
}
],
"last_updated": "2025-01-15T10:00:00Z",
}

# Mock client that returns different responses based on URL
called_urls: list[str] = []
responses = [redirect_response_data, resolved_data]

async def mock_get(url, **kwargs):
called_urls.append(url)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = responses[len(called_urls) - 1]
return mock_response

mock_client = MagicMock()
mock_client.get = mock_get

result = await fetch_adagents("example.com", client=mock_client)

assert result == resolved_data
assert called_urls == [
"https://example.com/.well-known/adagents.json",
"https://cdn.example.com/adagents/v2/adagents.json",
]

@pytest.mark.asyncio
async def test_fetch_rejects_non_https_authoritative_location(self):
"""Should reject authoritative_location that uses HTTP instead of HTTPS."""
from adcp.adagents import fetch_adagents

redirect_response_data = {
"$schema": "/schemas/2.6.0/adagents.json",
"authoritative_location": "http://cdn.example.com/adagents.json", # HTTP not HTTPS
"last_updated": "2025-01-15T10:00:00Z",
}

mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = redirect_response_data

mock_client = create_mock_httpx_client(mock_response)

with pytest.raises(AdagentsValidationError, match="HTTPS"):
await fetch_adagents("example.com", client=mock_client)

@pytest.mark.asyncio
async def test_fetch_prevents_redirect_loop(self):
"""Should detect and prevent circular redirect loops."""
from adcp.adagents import fetch_adagents

# Circular redirect: A -> B -> A
redirect_data = {
"$schema": "/schemas/2.6.0/adagents.json",
"authoritative_location": "https://example.com/.well-known/adagents.json",
"last_updated": "2025-01-15T10:00:00Z",
}

mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = redirect_data

mock_client = create_mock_httpx_client(mock_response)

with pytest.raises(AdagentsValidationError, match="redirect loop|already visited"):
await fetch_adagents("example.com", client=mock_client)

@pytest.mark.asyncio
async def test_fetch_enforces_max_redirect_depth(self):
"""Should enforce maximum redirect depth to prevent abuse."""
from adcp.adagents import fetch_adagents

# Create a long chain of redirects
call_count = [0]

async def mock_get(url, **kwargs):
call_count[0] += 1
mock_response = MagicMock()
mock_response.status_code = 200
# Always return a redirect to a new URL
mock_response.json.return_value = {
"$schema": "/schemas/2.6.0/adagents.json",
"authoritative_location": f"https://cdn{call_count[0]}.example.com/adagents.json",
"last_updated": "2025-01-15T10:00:00Z",
}
return mock_response

mock_client = MagicMock()
mock_client.get = mock_get

with pytest.raises(AdagentsValidationError, match="redirect|depth"):
await fetch_adagents("example.com", client=mock_client)

# Should stop after reasonable number of redirects (not go forever)
assert call_count[0] <= 10


class TestVerifyAgentForProperty:
"""Test convenience wrapper for fetching and verifying in one call."""