diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py new file mode 100644 index 00000000..d0374859 --- /dev/null +++ b/firebase_admin/fpnv.py @@ -0,0 +1,229 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase Phone Number Verification (FPNV) module. + +This module contains functions for verifying JWTs related to the Firebase +Phone Number Verification (FPNV) service. +""" +from typing import Any, Dict + +import jwt +from jwt import PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, \ + PyJWKClientError, InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError + +from firebase_admin import _utils + +_FPNV_ATTRIBUTE = '_fpnv' +_FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' +_FPNV_ISSUER = 'https://fpnv.googleapis.com/projects/' +_ALGORITHM_ES256 = 'ES256' + + +def client(app=None): + """Returns an instance of the FPNV service for the specified app. + + Args: + app: An App instance (optional). + + Returns: + FpnvClient: A FpnvClient instance. + + Raises: + ValueError: If the app is not a valid App instance. + """ + return _utils.get_app_service(app, _FPNV_ATTRIBUTE, FpnvClient) + + +class FpnvToken(dict): + """Represents a verified FPNV token. + + This class behaves like a dictionary, allowing access to the decoded claims. + It also provides convenience properties for common claims. + """ + + def __init__(self, claims): + super(FpnvToken, self).__init__(claims) + + @property + def phone_number(self): + """Returns the phone number of the user. + This corresponds to the 'sub' claim in the JWT. + """ + return self.get('sub') + + @property + def issuer(self): + """Returns the issuer identifier for the issuer of the response.""" + return self.get('iss') + + @property + def audience(self): + """Returns the audience for which this token is intended.""" + return self.get('aud') + + @property + def exp(self): + """Returns the expiration time since the Unix epoch.""" + return self.get('exp') + + @property + def iat(self): + """Returns the issued-at time since the Unix epoch.""" + return self.get('iat') + + @property + def sub(self): + """Returns the sub (subject) of the token, which is the phone number.""" + return self.get('sub') + + @property + def claims(self): + """Returns the entire map of claims.""" + return self + + +class FpnvClient: + """The client for the Firebase Phone Number Verification service.""" + _project_id = None + + def __init__(self, app): + """Initializes the FpnvClient. + + Args: + app: A firebase_admin.App instance. + + Raises: + ValueError: If the app is invalid or lacks a project ID. + """ + self._project_id = app.project_id + + if not self._project_id: + cred = app.credential.get_credential() + if hasattr(cred, 'project_id'): + self._project_id = cred.project_id + + if not self._project_id: + raise ValueError( + 'Project ID is required for FPNV. Please ensure the app is ' + 'initialized with a credential that contains a project ID.' + ) + + self._verifier = _FpnvTokenVerifier(self._project_id) + + def verify_token(self, token) -> FpnvToken: + """Verifies the given FPNV token. + + Verifies the signature, expiration, and claims of the token. + + Args: + token: A string containing the FPNV JWT. + + Returns: + FpnvToken: The verified token claims. + + Raises: + ValueError: If the token is invalid or malformed. + """ + return FpnvToken(self._verifier.verify(token)) + + +class _FpnvTokenVerifier: + """Internal class for verifying FPNV JWTs signed with ES256.""" + _jwks_client = None + _project_id = None + + def __init__(self, project_id): + self._project_id = project_id + self._jwks_client = PyJWKClient(_FPNV_JWKS_URL, lifespan=21600) + + def verify(self, token) -> Dict[str, Any]: + _Validators.check_string("FPNV check token", token) + try: + self._validate_headers(jwt.get_unverified_header(token)) + signing_key = self._jwks_client.get_signing_key_from_jwt(token) + claims = self._validate_payload(token, signing_key.key) + except (InvalidTokenError, DecodeError, PyJWKClientError) as exception: + raise ValueError( + f'Verifying FPNV token failed. Error: {exception}' + ) from exception + + return claims + + def _validate_headers(self, headers: Any) -> None: + if headers.get('kid') is None: + raise ValueError("FPNV has no 'kid' claim.") + + if headers.get('typ') != 'JWT': + raise ValueError("The provided FPNV token has an incorrect type header") + + algorithm = headers.get('alg') + if algorithm != _ALGORITHM_ES256: + raise ValueError( + 'The provided FPNV token has an incorrect alg header. ' + f'Expected {_ALGORITHM_ES256} but got {algorithm}.' + ) + + def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: + """Decodes and verifies the token.""" + expected_issuer = f'{_FPNV_ISSUER}{self._project_id}' + try: + payload = jwt.decode( + token, + signing_key, + algorithms=[_ALGORITHM_ES256], + audience=expected_issuer, + issuer=expected_issuer + ) + except InvalidSignatureError as exception: + raise ValueError( + 'The provided FPNV token has an invalid signature.' + ) from exception + except InvalidAudienceError as exception: + raise ValueError( + 'The provided FPNV token has an incorrect "aud" (audience) claim. ' + f'Expected payload to include {expected_issuer}.' + ) from exception + except InvalidIssuerError as exception: + raise ValueError( + 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' + f'Expected claim to include {expected_issuer}' + ) from exception + except ExpiredSignatureError as exception: + raise ValueError( + 'The provided FPNV token has expired.' + ) from exception + except InvalidTokenError as exception: + raise ValueError( + f'Decoding FPNV token failed. Error: {exception}' + ) from exception + + _Validators.check_string( + 'The provided FPNV token "sub" (subject) claim', + payload.get('sub')) + + return payload + + +class _Validators: + """A collection of data validation utilities. + + Methods provided in this class raise ``ValueErrors`` if any validations fail. + """ + + @classmethod + def check_string(cls, label: str, value: Any): + """Checks if the given value is a string.""" + if not isinstance(value, str) or not value: + raise ValueError(f'{label} must be a non-empty string.') diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py new file mode 100644 index 00000000..21e85c23 --- /dev/null +++ b/tests/test_fpnv.py @@ -0,0 +1,212 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin.fpnv module.""" + +from unittest import mock + +import jwt +import pytest + +import firebase_admin +from firebase_admin import fpnv +from tests import testutils + +# Mock Data +_PROJECT_ID = 'mock-project-id' +_EXP_TIMESTAMP = 2000000000 +_ISSUER = f'https://fpnv.googleapis.com/projects/{_PROJECT_ID}' +_PHONE_NUMBER = '+1234567890' +_PUBLIC_KEY = 'test-public-key' # In real tests, use the corresponding public key + +_MOCK_PAYLOAD = { + 'iss': _ISSUER, + 'sub': '+1234567890', + 'aud': [_ISSUER], + 'exp': _EXP_TIMESTAMP, + 'iat': _EXP_TIMESTAMP - 3600, + "other": 'other' +} + + +@pytest.fixture +def client(): + app = firebase_admin.get_app() + return fpnv.client(app) + + +class TestCommon: + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) + + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + + +class TestFpnvToken: + def test_properties(self): + token = fpnv.FpnvToken(_MOCK_PAYLOAD) + + assert token.phone_number == _PHONE_NUMBER + assert token.sub == _PHONE_NUMBER + assert token.issuer == _ISSUER + assert token.audience == [_ISSUER] + assert token.exp == _MOCK_PAYLOAD['exp'] + assert token.iat == _MOCK_PAYLOAD['iat'] + assert token.claims == _MOCK_PAYLOAD + assert token['other'] == _MOCK_PAYLOAD['other'] + + +class TestFpnvClient(TestCommon): + + def test_client(self): + app = firebase_admin.get_app() + client = fpnv.client(app) + assert isinstance(client, fpnv.FpnvClient) + assert client._project_id == _PROJECT_ID + + def test_requires_project_id(self): + cred = testutils.MockCredential() + # Create app without project ID + app = firebase_admin.initialize_app(cred, name='no_project_id') + # Mock credential to not have project_id + app.credential.get_credential().project_id = None + + with pytest.raises(ValueError, match='Project ID is required'): + fpnv.client(app) + + def test_client_default_app(self): + client = fpnv.client() + assert isinstance(client, fpnv.FpnvClient) + + def test_client_explicit_app(self): + cred = testutils.MockCredential() + app = firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}, name='custom') + client = fpnv.client(app) + assert isinstance(client, fpnv.FpnvClient) + + +class TestVerifyToken(TestCommon): + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, client): + token_str = 'valid.token.string' + # Mock Header + mock_header.return_value = {'kid': 'key1', 'typ': 'JWT', 'alg': 'ES256'} + + # Mock Signing Key + mock_jwks_instance = mock_jwks_cls.return_value + mock_signing_key = mock.Mock() + mock_signing_key.key = _PUBLIC_KEY + mock_jwks_instance.get_signing_key_from_jwt.return_value = mock_signing_key + client._verifier._jwks_client = mock_jwks_instance + + mock_decode.return_value = _MOCK_PAYLOAD + + # Execute + token = client.verify_token(token_str) + + # Verify + assert isinstance(token, fpnv.FpnvToken) + assert token.phone_number == _PHONE_NUMBER + + mock_header.assert_called_with(token_str) + mock_jwks_instance.get_signing_key_from_jwt.assert_called_with(token_str) + mock_decode.assert_called_with( + token_str, + _PUBLIC_KEY, + algorithms=['ES256'], + audience=_ISSUER, + issuer=_ISSUER + ) + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_no_kid(self, mock_header): + app = firebase_admin.get_app() + client = fpnv.client(app) + mock_header.return_value = {'typ': 'JWT', 'alg': 'ES256'} # Missing kid + with pytest.raises(ValueError, match="FPNV has no 'kid' claim."): + client.verify_token('token') + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_wrong_alg(self, mock_header, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'RS256'} # Wrong alg + with pytest.raises(ValueError, match="incorrect alg"): + client.verify_token('token') + + def test_verify_token_jwk_error(self, client): + # Access the ACTUAL client instance used by the verifier + # (Assuming internal structure: client -> _verifier -> _jwks_client) + jwks_client = client._verifier._jwks_client + + # Mock the method on the existing instance + with mock.patch.object(jwks_client, 'get_signing_key_from_jwt') as mock_method: + mock_method.side_effect = jwt.PyJWKClientError("Key not found") + + # Mock header is still needed if _get_signing_key calls it before the client + with mock.patch('jwt.get_unverified_header') as mock_header: + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + + with pytest.raises(ValueError, match="Verifying FPNV token failed"): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + + # Simulate ExpiredSignatureError + mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") + + with pytest.raises(ValueError, match="token has expired"): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidAudienceError + mock_decode.side_effect = jwt.InvalidAudienceError("Wrong Aud") + + with pytest.raises(ValueError, match="incorrect \"aud\""): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidIssuerError + mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") + + with pytest.raises(ValueError, match="incorrect \"iss\""): + client.verify_token('token')