diff --git a/h/security/identity.py b/h/security/identity.py index 68fda82df5e..4b7374c94f5 100644 --- a/h/security/identity.py +++ b/h/security/identity.py @@ -1,7 +1,7 @@ """Data classes used to represent authenticated users.""" from dataclasses import dataclass -from typing import List, Optional +from typing import List, Optional, Self from h.models import AuthClient, Group, User @@ -94,3 +94,11 @@ def from_models(cls, user: User = None, auth_client: AuthClient = None): LongLivedAuthClient.from_model(auth_client) if auth_client else None ), ) + + @staticmethod + def authenticated_userid(identity: Self | None) -> str | None: + """Return the authenticated_userid from the given identity.""" + if identity and identity.user: + return identity.user.userid + + return None diff --git a/h/security/policy/_api.py b/h/security/policy/_api.py index d6fb82b0576..9a62a7d572b 100644 --- a/h/security/policy/_api.py +++ b/h/security/policy/_api.py @@ -1,10 +1,11 @@ from pyramid.request import Request, RequestLocalCache +from pyramid.security import Allowed, Denied from h.security.identity import Identity -from h.security.policy._identity_base import IdentityBasedPolicy +from h.security.permits import identity_permits -class APIPolicy(IdentityBasedPolicy): +class APIPolicy: """The security policy for API requests. Delegates to subpolicies.""" def __init__(self, sub_policies): @@ -18,10 +19,16 @@ def forget(self, *_args, **_kwargs): def identity(self, request) -> Identity | None: return self._identity_cache.get_or_create(request) + def authenticated_userid(self, request): + return Identity.authenticated_userid(self.identity(request)) + def remember(self, *_args, **_kwargs): # remember() isn't supported for stateless API requests. return [] + def permits(self, request, context, permission) -> Allowed | Denied: + return identity_permits(self.identity(request), context, permission) + def _load_identity(self, request: Request) -> Identity | None: for policy in applicable_policies(request, self.sub_policies): identity = policy.identity(request) diff --git a/h/security/policy/_auth_client.py b/h/security/policy/_auth_client.py index 2cab66fa71f..aeddc0794d6 100644 --- a/h/security/policy/_auth_client.py +++ b/h/security/policy/_auth_client.py @@ -1,16 +1,17 @@ import hmac from pyramid.authentication import extract_http_basic_credentials +from pyramid.security import Allowed, Denied from sqlalchemy.exc import StatementError from h.exceptions import InvalidUserId from h.models import AuthClient from h.models.auth_client import GrantType from h.security.identity import Identity -from h.security.policy._identity_base import IdentityBasedPolicy +from h.security.permits import identity_permits -class AuthClientPolicy(IdentityBasedPolicy): +class AuthClientPolicy: """ An authentication policy for registered AuthClients. @@ -80,6 +81,12 @@ def identity(self, request): return Identity.from_models(auth_client=auth_client, user=user) + def authenticated_userid(self, request): + return Identity.authenticated_userid(self.identity(request)) + + def permits(self, request, context, permission) -> Allowed | Denied: + return identity_permits(self.identity(request), context, permission) + @classmethod def _get_auth_client(cls, request): """Get a matching auth client if the credentials are valid.""" diff --git a/h/security/policy/_cookie.py b/h/security/policy/_cookie.py index 5f4c53969a4..bac30380633 100644 --- a/h/security/policy/_cookie.py +++ b/h/security/policy/_cookie.py @@ -3,13 +3,14 @@ from os import urandom import webob +from pyramid.security import Allowed, Denied from h.security.identity import Identity -from h.security.policy._identity_base import IdentityBasedPolicy +from h.security.permits import identity_permits from h.services.auth_ticket import AuthTicketService -class CookiePolicy(IdentityBasedPolicy): +class CookiePolicy: """ An authentication policy based on cookies. @@ -32,6 +33,9 @@ def identity(self, request): return Identity.from_models(user=user) + def authenticated_userid(self, request): + return Identity.authenticated_userid(self.identity(request)) + def remember(self, request, userid, **kw): # pylint:disable=unused-argument """Get a list of headers which will remember the user in a cookie.""" @@ -70,6 +74,9 @@ def forget(self, request): return self.cookie.get_headers(None, max_age=0) + def permits(self, request, context, permission) -> Allowed | Denied: + return identity_permits(self.identity(request), context, permission) + @staticmethod @lru_cache # Ensure we only add this once per request def _add_vary_by_cookie(request): diff --git a/h/security/policy/_identity_base.py b/h/security/policy/_identity_base.py deleted file mode 100644 index 58b626349b8..00000000000 --- a/h/security/policy/_identity_base.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import Optional - -from pyramid.security import Allowed, Denied - -from h.security.identity import Identity -from h.security.permits import identity_permits - - -class IdentityBasedPolicy: - """ - A base policy which will fill a policy based on returning an identity. - - This means you only need to implement `identity()` to get a functioning - security policy. - """ - - # pylint: disable=unused-argument - def identity(self, request) -> Optional[Identity]: - """ - Get an Identity object for valid credentials. - - Sub-classes should implement this to return an Identity object when - the request contains valid credentials. - - :param request: Pyramid request to inspect - """ - return None - - def permits(self, request, context, permission) -> Allowed | Denied: - """ - Get whether a given request has the requested permission on the context. - - :param request: Pyramid request to extract identity from - :param context: A context object - :param permission: The permission requested - """ - return identity_permits(self.identity(request), context, permission) - - def authenticated_userid(self, request): - """ - Return the userid implied by the token in the passed request, if any. - - :param request: Pyramid request to inspect - :return: The userid authenticated for the passed request or None - """ - if (identity := self.identity(request)) and identity.user: - return identity.user.userid - - return None diff --git a/h/security/policy/bearer_token.py b/h/security/policy/bearer_token.py index 98701588f3c..ccf50ab6b26 100644 --- a/h/security/policy/bearer_token.py +++ b/h/security/policy/bearer_token.py @@ -1,11 +1,12 @@ from pyramid.request import RequestLocalCache +from pyramid.security import Allowed, Denied from h.security.identity import Identity -from h.security.policy._identity_base import IdentityBasedPolicy +from h.security.permits import identity_permits from h.security.policy.helpers import is_api_request -class BearerTokenPolicy(IdentityBasedPolicy): +class BearerTokenPolicy: """ A Bearer token authentication policy. @@ -37,6 +38,12 @@ def identity(self, request): return self._identity_cache.get_or_create(request) + def authenticated_userid(self, request): + return Identity.authenticated_userid(self.identity(request)) + + def permits(self, request, context, permission) -> Allowed | Denied: + return identity_permits(self.identity(request), context, permission) + def _load_identity(self, request): token_svc = request.find_service(name="auth_token") token_str = None diff --git a/h/security/policy/top_level.py b/h/security/policy/top_level.py index 9ab00c980c7..ef3a3b745ce 100644 --- a/h/security/policy/top_level.py +++ b/h/security/policy/top_level.py @@ -1,17 +1,17 @@ import webob from pyramid.request import RequestLocalCache +from pyramid.security import Allowed, Denied from h.security.identity import Identity from h.security.policy._api import APIPolicy from h.security.policy._api_cookie import APICookiePolicy from h.security.policy._auth_client import AuthClientPolicy from h.security.policy._cookie import CookiePolicy -from h.security.policy._identity_base import IdentityBasedPolicy from h.security.policy.bearer_token import BearerTokenPolicy from h.security.policy.helpers import is_api_request -class TopLevelPolicy(IdentityBasedPolicy): +class TopLevelPolicy: """The top-level security policy. Delegates to subpolicies.""" def __init__(self): @@ -24,10 +24,16 @@ def forget(self, request, **kw): def identity(self, request) -> Identity | None: return self._identity_cache.get_or_create(request) + def authenticated_userid(self, request): + return get_subpolicy(request).authenticated_userid(request) + def remember(self, request, userid, **kw): self._identity_cache.clear(request) return get_subpolicy(request).remember(request, userid, **kw) + def permits(self, request, context, permission) -> Allowed | Denied: + return get_subpolicy(request).permits(request, context, permission) + def _load_identity(self, request): return get_subpolicy(request).identity(request) diff --git a/tests/unit/h/security/identity_test.py b/tests/unit/h/security/identity_test.py index dd9f4b6f91a..9edcd469918 100644 --- a/tests/unit/h/security/identity_test.py +++ b/tests/unit/h/security/identity_test.py @@ -82,6 +82,29 @@ def test_from_models_with_None(self, LongLivedUser, LongLivedAuthClient): {"user": None, "auth_client": None} ) + @pytest.mark.parametrize( + "identity,authenticated_userid", + [ + (None, None), + (Identity(user=None), None), + ( + Identity( + user=LongLivedUser( + id=sentinel.id, + userid=sentinel.userid, + authority=sentinel.authority, + groups=[], + staff=False, + admin=False, + ) + ), + sentinel.userid, + ), + ], + ) + def test_authenticated_userid(self, identity, authenticated_userid): + assert Identity.authenticated_userid(identity) == authenticated_userid + @pytest.fixture(autouse=True) def LongLivedUser(self, patch): return patch("h.security.identity.LongLivedUser") diff --git a/tests/unit/h/security/policy/_api_test.py b/tests/unit/h/security/policy/_api_test.py index 3b0591e31ec..8e42d4ba7a4 100644 --- a/tests/unit/h/security/policy/_api_test.py +++ b/tests/unit/h/security/policy/_api_test.py @@ -3,7 +3,6 @@ import pytest from pyramid.request import Request -from h.security.identity import Identity from h.security.policy._api import APIPolicy, applicable_policies @@ -60,13 +59,37 @@ def test_identity_when_there_are_no_applicable_policies( assert identity is None + def test_authenticated_userid(self, api_policy, pyramid_request, Identity, mocker): + mocker.spy(api_policy, "identity") + + authenticated_userid = api_policy.authenticated_userid(pyramid_request) + + api_policy.identity.assert_called_once_with(pyramid_request) + Identity.authenticated_userid.assert_called_once_with( + api_policy.identity.spy_return + ) + assert authenticated_userid == Identity.authenticated_userid.return_value + def test_remember(self, api_policy, pyramid_request): assert api_policy.remember(pyramid_request, sentinel.userid, foo="bar") == [] + def test_permits(self, api_policy, pyramid_request, identity_permits, mocker): + mocker.spy(api_policy, "identity") + + result = api_policy.permits( + pyramid_request, sentinel.context, sentinel.permission + ) + + api_policy.identity.assert_called_once_with(pyramid_request) + identity_permits.assert_called_once_with( + api_policy.identity.spy_return, sentinel.context, sentinel.permission + ) + assert result == identity_permits.return_value + @pytest.fixture(autouse=True) def applicable_policies(self, mocker): class SubpolicySpec: - def identity(self, request: Request) -> Identity | None: + def identity(self, request: Request): """Return the identity of the current user.""" return mocker.patch( @@ -112,3 +135,15 @@ def handles(request) -> bool: for policy in policies: policy.handles.assert_called_once_with(pyramid_request) assert returned_policies == [policies[index] for index in expected_policies] + + +@pytest.fixture(autouse=True) +def Identity(mocker): + return mocker.patch("h.security.policy._api.Identity", autospec=True, spec_set=True) + + +@pytest.fixture(autouse=True) +def identity_permits(mocker): + return mocker.patch( + "h.security.policy._api.identity_permits", autospec=True, spec_set=True + ) diff --git a/tests/unit/h/security/policy/_auth_client_test.py b/tests/unit/h/security/policy/_auth_client_test.py index ed8bff1e9be..3a78db7840f 100644 --- a/tests/unit/h/security/policy/_auth_client_test.py +++ b/tests/unit/h/security/policy/_auth_client_test.py @@ -5,7 +5,6 @@ from h.exceptions import InvalidUserId from h.models.auth_client import GrantType -from h.security import Identity from h.security.policy._auth_client import AuthClientPolicy @@ -28,22 +27,26 @@ def test_handles_rejects_no_route(self, pyramid_request): assert not AuthClientPolicy.handles(pyramid_request) - def test_identity(self, pyramid_request, auth_client, user_service): + def test_identity(self, pyramid_request, auth_client, user_service, Identity): pyramid_request.headers["X-Forwarded-User"] = sentinel.forwarded_user identity = AuthClientPolicy().identity(pyramid_request) user_service.fetch.assert_called_once_with(sentinel.forwarded_user) - assert identity == Identity.from_models( + Identity.from_models.assert_called_once_with( auth_client=auth_client, user=user_service.fetch.return_value ) + assert identity == Identity.from_models.return_value - def test_identify_without_forwarded_user(self, pyramid_request, auth_client): + def test_identify_without_forwarded_user( + self, pyramid_request, auth_client, Identity + ): pyramid_request.headers["X-Forwarded-User"] = None identity = AuthClientPolicy().identity(pyramid_request) - assert identity == Identity.from_models(auth_client=auth_client) + Identity.from_models.assert_called_once_with(auth_client=auth_client, user=None) + assert identity == Identity.from_models.return_value def test_identity_returns_None_without_credentials(self, pyramid_request): pyramid_request.headers["Authorization"] = None @@ -122,6 +125,33 @@ def test_identify_returns_None_on_forwarded_userid_authority_mismatch( assert AuthClientPolicy().identity(pyramid_request) is None + def test_authenticated_userid(self, pyramid_request, Identity): + pyramid_request.headers["X-Forwarded-User"] = sentinel.forwarded_user + + authenticated_userid = AuthClientPolicy().authenticated_userid(pyramid_request) + + Identity.authenticated_userid.assert_called_once_with( + Identity.from_models.return_value + ) + assert authenticated_userid == Identity.authenticated_userid.return_value + + def test_permits(self, pyramid_request, mocker, identity_permits): + auth_client_policy = AuthClientPolicy() + # pylint:disable=no-member + mocker.spy(auth_client_policy, "identity") + + result = auth_client_policy.permits( + pyramid_request, sentinel.context, sentinel.permission + ) + + auth_client_policy.identity.assert_called_once_with(pyramid_request) + identity_permits.assert_called_once_with( + auth_client_policy.identity.spy_return, + sentinel.context, + sentinel.permission, + ) + assert result == identity_permits.return_value + @pytest.fixture def auth_client(self, factories): return factories.ConfidentialAuthClient(grant_type=GrantType.client_credentials) @@ -144,3 +174,17 @@ def user(self, factories, auth_client, user_service): user = factories.User(authority=auth_client.authority) user_service.fetch.return_value = user return user + + +@pytest.fixture(autouse=True) +def Identity(mocker): + return mocker.patch( + "h.security.policy._auth_client.Identity", autospec=True, spec_set=True + ) + + +@pytest.fixture(autouse=True) +def identity_permits(mocker): + return mocker.patch( + "h.security.policy._auth_client.identity_permits", autospec=True, spec_set=True + ) diff --git a/tests/unit/h/security/policy/_cookie_test.py b/tests/unit/h/security/policy/_cookie_test.py index ff2d2dd2ee4..b8811cec868 100644 --- a/tests/unit/h/security/policy/_cookie_test.py +++ b/tests/unit/h/security/policy/_cookie_test.py @@ -4,22 +4,24 @@ import webob from h_matchers import Any -from h.security import Identity from h.security.policy import _cookie from h.security.policy._cookie import CookiePolicy, base64 @pytest.mark.usefixtures("auth_ticket_service") class TestCookiePolicy: - def test_identity(self, pyramid_request, auth_ticket_service, cookie_policy, user): + def test_identity( + self, pyramid_request, auth_ticket_service, cookie_policy, user, Identity + ): identity = cookie_policy.identity(pyramid_request) auth_ticket_service.verify_ticket.assert_called_once_with( user.userid, sentinel.ticket_id ) - assert identity == Identity.from_models( + Identity.from_models.assert_called_once_with( user=auth_ticket_service.verify_ticket.return_value ) + assert identity == Identity.from_models.return_value def test_identity_when_user_marked_as_deleted( self, pyramid_request, auth_ticket_service, cookie_policy @@ -35,6 +37,19 @@ def test_identity_with_no_auth_ticket( assert cookie_policy.identity(pyramid_request) is None + def test_authenticated_userid( + self, pyramid_request, cookie_policy, Identity, mocker + ): + mocker.spy(cookie_policy, "identity") + + authenticated_userid = cookie_policy.authenticated_userid(pyramid_request) + + cookie_policy.identity.assert_called_once_with(pyramid_request) + Identity.authenticated_userid.assert_called_once_with( + cookie_policy.identity.spy_return + ) + assert authenticated_userid == Identity.authenticated_userid.return_value + def test_remember( self, pyramid_request, @@ -63,12 +78,12 @@ def test_remember( assert result == cookie.get_headers.return_value def test_remember_with_existing_user( - self, pyramid_request, auth_ticket_service, user, cookie_policy + self, pyramid_request, user, cookie_policy, Identity ): pyramid_request.session["data"] = "old" # This is a secret parameter used by `pyramid.testing.DummySession` pyramid_request.session["_csrft_"] = "old_csrf_token" - auth_ticket_service.verify_ticket.return_value = user + Identity.authenticated_userid.return_value = user.userid cookie_policy.remember(pyramid_request, user.userid) @@ -99,6 +114,19 @@ def test_forget_when_no_ticket_id_in_cookie( cookie.get_headers.assert_called_once_with(None, max_age=0) assert result == cookie.get_headers.return_value + def test_permits(self, cookie_policy, pyramid_request, mocker, identity_permits): + mocker.spy(cookie_policy, "identity") + + result = cookie_policy.permits( + pyramid_request, sentinel.context, sentinel.permission + ) + + cookie_policy.identity.assert_called_once_with(pyramid_request) + identity_permits.assert_called_once_with( + cookie_policy.identity.spy_return, sentinel.context, sentinel.permission + ) + assert result == identity_permits.return_value + @pytest.mark.parametrize( "method,args", (("identity", ()), ("remember", (sentinel.userid,)), ("forget", ())), @@ -152,3 +180,17 @@ def urlsafe_b64encode(mocker): @pytest.fixture(autouse=True) def urandom(mocker): return mocker.spy(_cookie, "urandom") + + +@pytest.fixture(autouse=True) +def Identity(mocker): + return mocker.patch( + "h.security.policy._cookie.Identity", autospec=True, spec_set=True + ) + + +@pytest.fixture(autouse=True) +def identity_permits(mocker): + return mocker.patch( + "h.security.policy._cookie.identity_permits", autospec=True, spec_set=True + ) diff --git a/tests/unit/h/security/policy/_identity_base_test.py b/tests/unit/h/security/policy/_identity_base_test.py deleted file mode 100644 index a6fcaeef3f2..00000000000 --- a/tests/unit/h/security/policy/_identity_base_test.py +++ /dev/null @@ -1,57 +0,0 @@ -from unittest.mock import sentinel - -import pytest - -from h.security import Identity -from h.security.policy._identity_base import IdentityBasedPolicy - - -class TestIdentityBasedPolicy: - def test_identity_method_does_nothing(self, pyramid_request): - assert IdentityBasedPolicy().identity(pyramid_request) is None - - def test_permits(self, policy, identity, pyramid_request, identity_permits): - result = policy.permits(pyramid_request, sentinel.context, sentinel.permission) - - identity_permits.assert_called_once_with( - identity, sentinel.context, sentinel.permission - ) - assert result == identity_permits.return_value - - def test_authenticated_userid(self, policy, pyramid_request, identity): - assert policy.authenticated_userid(pyramid_request) == identity.user.userid - - def test_authenticated_userid_return_None_if_the_identity_has_no_user( - self, policy, pyramid_request, identity - ): - identity.user = None - - assert policy.authenticated_userid(pyramid_request) is None - - def test_authenticated_userid_return_None_if_the_identity_is_None( - self, policy, pyramid_request - ): - policy.returned_identity = None - - assert policy.authenticated_userid(pyramid_request) is None - - @pytest.fixture - def identity(self, factories): - return Identity.from_models(user=factories.User()) - - @pytest.fixture - def policy(self, identity): - class CustomPolicy(IdentityBasedPolicy): - returned_identity = None - - def identity(self, _request): - return self.returned_identity - - policy = CustomPolicy() - policy.returned_identity = identity - - return policy - - @pytest.fixture(autouse=True) - def identity_permits(self, patch): - return patch("h.security.policy._identity_base.identity_permits") diff --git a/tests/unit/h/security/policy/bearer_token_test.py b/tests/unit/h/security/policy/bearer_token_test.py index 29f9db4b2a7..06e4cf6cdbb 100644 --- a/tests/unit/h/security/policy/bearer_token_test.py +++ b/tests/unit/h/security/policy/bearer_token_test.py @@ -2,7 +2,6 @@ import pytest -from h.security import Identity from h.security.policy.bearer_token import BearerTokenPolicy @@ -26,7 +25,9 @@ def test_handles( assert BearerTokenPolicy.handles(pyramid_request) == expected_result - def test_identity(self, pyramid_request, auth_token_service, user_service): + def test_identity( + self, pyramid_request, auth_token_service, user_service, Identity + ): identity = BearerTokenPolicy().identity(pyramid_request) auth_token_service.get_bearer_token.assert_called_once_with(pyramid_request) @@ -36,7 +37,10 @@ def test_identity(self, pyramid_request, auth_token_service, user_service): user_service.fetch.assert_called_once_with( auth_token_service.validate.return_value.userid ) - assert identity == Identity.from_models(user=user_service.fetch.return_value) + Identity.from_models.assert_called_once_with( + user=user_service.fetch.return_value + ) + assert identity == Identity.from_models.return_value def test_identity_caches(self, pyramid_request, auth_token_service): policy = BearerTokenPolicy() @@ -83,7 +87,46 @@ def test_identity_returns_None_for_user_marked_as_deleted( assert BearerTokenPolicy().identity(pyramid_request) is None + def test_authenticated_userid(self, pyramid_request, Identity): + authenticated_userid = BearerTokenPolicy().authenticated_userid(pyramid_request) + + Identity.authenticated_userid.assert_called_once_with( + Identity.from_models.return_value + ) + assert authenticated_userid == Identity.authenticated_userid.return_value + + def test_permits(self, pyramid_request, mocker, identity_permits): + bearer_token_policy = BearerTokenPolicy() + # pylint:disable=no-member + mocker.spy(bearer_token_policy, "identity") + + result = bearer_token_policy.permits( + pyramid_request, sentinel.context, sentinel.permission + ) + + bearer_token_policy.identity.assert_called_once_with(pyramid_request) + identity_permits.assert_called_once_with( + bearer_token_policy.identity.spy_return, + sentinel.context, + sentinel.permission, + ) + assert result == identity_permits.return_value + @pytest.fixture(autouse=True) def is_api_request(mocker): return mocker.patch("h.security.policy.bearer_token.is_api_request", autospec=True) + + +@pytest.fixture(autouse=True) +def Identity(mocker): + return mocker.patch( + "h.security.policy.bearer_token.Identity", autospec=True, spec_set=True + ) + + +@pytest.fixture(autouse=True) +def identity_permits(mocker): + return mocker.patch( + "h.security.policy.bearer_token.identity_permits", autospec=True, spec_set=True + ) diff --git a/tests/unit/h/security/policy/top_level_test.py b/tests/unit/h/security/policy/top_level_test.py index a9371dcba0b..a4919fb9545 100644 --- a/tests/unit/h/security/policy/top_level_test.py +++ b/tests/unit/h/security/policy/top_level_test.py @@ -20,6 +20,17 @@ def test_identity(self, get_subpolicy, policy, pyramid_request): get_subpolicy.return_value.identity.assert_called_once_with(pyramid_request) assert identity == get_subpolicy.return_value.identity.return_value + def test_authenticated_userid(self, get_subpolicy, policy, pyramid_request): + authenticated_userid = policy.authenticated_userid(pyramid_request) + + get_subpolicy.return_value.authenticated_userid.assert_called_once_with( + pyramid_request + ) + assert ( + authenticated_userid + == get_subpolicy.return_value.authenticated_userid.return_value + ) + def test_remember(self, get_subpolicy, policy, pyramid_request): headers = policy.remember(pyramid_request, sentinel.userid, foo="bar") @@ -28,6 +39,14 @@ def test_remember(self, get_subpolicy, policy, pyramid_request): ) assert headers == get_subpolicy.return_value.remember.return_value + def test_permits(self, get_subpolicy, policy, pyramid_request): + permits = policy.permits(pyramid_request, sentinel.context, sentinel.permission) + + get_subpolicy.return_value.permits.assert_called_once_with( + pyramid_request, sentinel.context, sentinel.permission + ) + assert permits == get_subpolicy.return_value.permits.return_value + @pytest.fixture def policy(self): return TopLevelPolicy()