diff --git a/src/adcp/ADCP_VERSION b/src/adcp/ADCP_VERSION index e70b4523..aedc15bb 100644 --- a/src/adcp/ADCP_VERSION +++ b/src/adcp/ADCP_VERSION @@ -1 +1 @@ -2.6.0 +2.5.3 diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index d2008414..6554295e 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -280,6 +280,10 @@ def verify_agent_authorization( return False +# Maximum number of authoritative_location redirects to follow +MAX_REDIRECT_DEPTH = 5 + + async def fetch_adagents( publisher_domain: str, timeout: float = 10.0, @@ -288,6 +292,11 @@ async def fetch_adagents( ) -> dict[str, Any]: """Fetch and parse adagents.json from publisher domain. + Follows authoritative_location redirects per the AdCP specification. When a + publisher's adagents.json contains an authoritative_location field instead of + authorized_agents, this function fetches the referenced URL to get the actual + authorization data. + Args: publisher_domain: Domain hosting the adagents.json file timeout: Request timeout in seconds @@ -297,11 +306,12 @@ async def fetch_adagents( If None, a new client is created for this request. 
Returns: - Parsed adagents.json data + Parsed adagents.json data (resolved from authoritative_location if present) Raises: AdagentsNotFoundError: If adagents.json not found (404) - AdagentsValidationError: If JSON is invalid or malformed + AdagentsValidationError: If JSON is invalid, malformed, or redirects + exceed maximum depth or form a loop AdagentsTimeoutError: If request times out Notes: @@ -311,13 +321,67 @@ async def fetch_adagents( # Validate and normalize domain for security publisher_domain = _validate_publisher_domain(publisher_domain) - # Construct URL + # Construct initial URL url = f"https://{publisher_domain}/.well-known/adagents.json" + # Track visited URLs to detect loops + visited_urls: set[str] = set() + + for depth in range(MAX_REDIRECT_DEPTH + 1): + # Check for redirect loop + if url in visited_urls: + raise AdagentsValidationError( + f"Circular redirect detected: {url} already visited" + ) + visited_urls.add(url) + + data = await _fetch_adagents_url(url, timeout, user_agent, client) + + # Check if this is a redirect. A response with authoritative_location but no + # authorized_agents indicates a redirect. If both are present, authorized_agents + # takes precedence (response is treated as final). 
+ if "authoritative_location" in data and "authorized_agents" not in data: + authoritative_url = data["authoritative_location"] + + # Validate HTTPS requirement + if not isinstance(authoritative_url, str) or not authoritative_url.startswith( + "https://" + ): + raise AdagentsValidationError( + f"authoritative_location must be an HTTPS URL, got: {authoritative_url!r}" + ) + + # Check if we've exceeded max depth + if depth >= MAX_REDIRECT_DEPTH: + raise AdagentsValidationError( + f"Maximum redirect depth ({MAX_REDIRECT_DEPTH}) exceeded" + ) + + # Follow the redirect + url = authoritative_url + continue + + # We have the final data with authorized_agents (or both fields present, + # in which case authorized_agents takes precedence) + return data + + # Unreachable: loop always exits via return or raise above + raise AssertionError("Unreachable") # pragma: no cover + + +async def _fetch_adagents_url( + url: str, + timeout: float, + user_agent: str, + client: httpx.AsyncClient | None, +) -> dict[str, Any]: + """Fetch and parse adagents.json from a specific URL. + + This is the core fetch logic, separated to support redirect following. 
+ """ try: # Use provided client or create a new one if client is not None: - # Reuse provided client (connection pooling) response = await client.get( url, headers={"User-Agent": user_agent}, @@ -325,7 +389,6 @@ async def fetch_adagents( follow_redirects=True, ) else: - # Create new client for single request async with httpx.AsyncClient() as new_client: response = await new_client.get( url, @@ -334,9 +397,11 @@ async def fetch_adagents( follow_redirects=True, ) - # Process response (same for both paths) + # Process response if response.status_code == 404: - raise AdagentsNotFoundError(publisher_domain) + # Extract domain from URL for error message + parsed = urlparse(url) + raise AdagentsNotFoundError(parsed.netloc) if response.status_code != 200: raise AdagentsValidationError( @@ -353,22 +418,29 @@ async def fetch_adagents( if not isinstance(data, dict): raise AdagentsValidationError("adagents.json must be a JSON object") - if "authorized_agents" not in data: - raise AdagentsValidationError("adagents.json must have 'authorized_agents' field") - - if not isinstance(data["authorized_agents"], list): - raise AdagentsValidationError("'authorized_agents' must be an array") - - # Validate mutual exclusivity constraints - try: - validate_adagents(data) - except ValidationError as e: - raise AdagentsValidationError(f"Invalid adagents.json structure: {e}") from e + # If this has authorized_agents, validate it + if "authorized_agents" in data: + if not isinstance(data["authorized_agents"], list): + raise AdagentsValidationError("'authorized_agents' must be an array") + + # Validate mutual exclusivity constraints + try: + validate_adagents(data) + except ValidationError as e: + raise AdagentsValidationError( + f"Invalid adagents.json structure: {e}" + ) from e + elif "authoritative_location" not in data: + # Neither authorized_agents nor authoritative_location + raise AdagentsValidationError( + "adagents.json must have either 'authorized_agents' or 'authoritative_location'" 
+ ) return data except httpx.TimeoutException as e: - raise AdagentsTimeoutError(publisher_domain, timeout) from e + parsed = urlparse(url) + raise AdagentsTimeoutError(parsed.netloc, timeout) from e except httpx.RequestError as e: raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 30e4ba51..f809dd01 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -400,6 +400,124 @@ async def test_fetch_success(self): call_args = mock_client.get.call_args assert "https://example.com/.well-known/adagents.json" in str(call_args) + @pytest.mark.asyncio + async def test_fetch_follows_authoritative_location(self): + """Should follow authoritative_location redirect and return resolved data.""" + from adcp.adagents import fetch_adagents + + # Initial response has authoritative_location redirect + redirect_response_data = { + "$schema": "/schemas/2.6.0/adagents.json", + "authoritative_location": "https://cdn.example.com/adagents/v2/adagents.json", + "last_updated": "2025-01-15T10:00:00Z", + } + + # Final resolved data at the authoritative location + resolved_data = { + "$schema": "/schemas/2.6.0/adagents.json", + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All properties", + "authorization_type": "property_tags", + "property_tags": ["all"], + } + ], + "last_updated": "2025-01-15T10:00:00Z", + } + + # Mock client that returns different responses based on URL + called_urls: list[str] = [] + responses = [redirect_response_data, resolved_data] + + async def mock_get(url, **kwargs): + called_urls.append(url) + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = responses[len(called_urls) - 1] + return mock_response + + mock_client = MagicMock() + mock_client.get = mock_get + + result = await fetch_adagents("example.com", client=mock_client) + + assert result == resolved_data + assert called_urls == 
[ + "https://example.com/.well-known/adagents.json", + "https://cdn.example.com/adagents/v2/adagents.json", + ] + + @pytest.mark.asyncio + async def test_fetch_rejects_non_https_authoritative_location(self): + """Should reject authoritative_location that uses HTTP instead of HTTPS.""" + from adcp.adagents import fetch_adagents + + redirect_response_data = { + "$schema": "/schemas/2.6.0/adagents.json", + "authoritative_location": "http://cdn.example.com/adagents.json", # HTTP not HTTPS + "last_updated": "2025-01-15T10:00:00Z", + } + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = redirect_response_data + + mock_client = create_mock_httpx_client(mock_response) + + with pytest.raises(AdagentsValidationError, match="HTTPS"): + await fetch_adagents("example.com", client=mock_client) + + @pytest.mark.asyncio + async def test_fetch_prevents_redirect_loop(self): + """Should detect and prevent circular redirect loops.""" + from adcp.adagents import fetch_adagents + + # Self-referential redirect: the well-known URL points back to itself (A -> A) + redirect_data = { + "$schema": "/schemas/2.6.0/adagents.json", + "authoritative_location": "https://example.com/.well-known/adagents.json", + "last_updated": "2025-01-15T10:00:00Z", + } + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = redirect_data + + mock_client = create_mock_httpx_client(mock_response) + + with pytest.raises(AdagentsValidationError, match="redirect loop|already visited"): + await fetch_adagents("example.com", client=mock_client) + + @pytest.mark.asyncio + async def test_fetch_enforces_max_redirect_depth(self): + """Should enforce maximum redirect depth to prevent abuse.""" + from adcp.adagents import fetch_adagents + + # Create a long chain of redirects + call_count = [0] + + async def mock_get(url, **kwargs): + call_count[0] += 1 + mock_response = MagicMock() + mock_response.status_code = 200 + # Always return a redirect to a new URL 
+ mock_response.json.return_value = { + "$schema": "/schemas/2.6.0/adagents.json", + "authoritative_location": f"https://cdn{call_count[0]}.example.com/adagents.json", + "last_updated": "2025-01-15T10:00:00Z", + } + return mock_response + + mock_client = MagicMock() + mock_client.get = mock_get + + with pytest.raises(AdagentsValidationError, match="redirect|depth"): + await fetch_adagents("example.com", client=mock_client) + + # Should stop after a reasonable number of redirects (not go on forever) + assert call_count[0] <= 10 + class TestVerifyAgentForProperty: """Test convenience wrapper for fetching and verifying in one call."""