Skip to content

Commit

Permalink
Extract AuthTicketCookieHelper class
Browse files Browse the repository at this point in the history
  • Loading branch information
seanh committed Aug 15, 2024
1 parent ec6ec7f commit 0318d44
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 230 deletions.
16 changes: 10 additions & 6 deletions h/security/policy/_api_cookie.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from pyramid.request import Request
from pyramid.security import Allowed, Denied
from webob.cookies import SignedCookieProfile

from h.security.identity import Identity
from h.security.policy._cookie import CookiePolicy
from h.security.permits import identity_permits
from h.security.policy.helpers import AuthTicketCookieHelper

COOKIE_AUTHENTICATABLE_API_REQUESTS = [
("api.groups", "POST"), # Create a new group.
Expand All @@ -13,8 +15,9 @@
class APICookiePolicy:
"""Authenticate API requests with cookies."""

def __init__(self, cookie_policy: CookiePolicy):
self.cookie_policy = cookie_policy
def __init__(self, cookie: SignedCookieProfile, helper: AuthTicketCookieHelper):
self.cookie = cookie
self.helper = helper

@staticmethod
def handles(request: Request) -> bool:
Expand All @@ -25,10 +28,11 @@ def handles(request: Request) -> bool:
) in COOKIE_AUTHENTICATABLE_API_REQUESTS

def identity(self, request: Request) -> Identity | None:
return self.cookie_policy.identity(request)
self.helper.add_vary_by_cookie(request)
return self.helper.identity(self.cookie, request)

def authenticated_userid(self, request: Request) -> str | None:
return self.cookie_policy.authenticated_userid(request)
return Identity.authenticated_userid(self.identity(request))

def permits(self, request: Request, context, permission: str) -> Allowed | Denied:
return self.cookie_policy.permits(request, context, permission)
return identity_permits(self.identity(request), context, permission)
60 changes: 10 additions & 50 deletions h/security/policy/_cookie.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import base64
from functools import lru_cache
from os import urandom

import webob
from pyramid.security import Allowed, Denied
from webob.cookies import SignedCookieProfile

from h.security.identity import Identity
from h.security.permits import identity_permits
from h.services.auth_ticket import AuthTicketService
from h.security.policy.helpers import AuthTicketCookieHelper


class CookiePolicy:
Expand All @@ -18,32 +14,22 @@ class CookiePolicy:
straps the login for the client (when the popup shows).
"""

def __init__(self, cookie: webob.cookies.SignedCookieProfile):
def __init__(self, cookie: SignedCookieProfile, helper: AuthTicketCookieHelper):
self.cookie = cookie
self.helper = helper

def identity(self, request):
self._add_vary_by_cookie(request)

userid, ticket_id = self._get_cookie_value()

user = request.find_service(AuthTicketService).verify_ticket(userid, ticket_id)

if (not user) or user.deleted:
return None

return Identity.from_models(user=user)
self.helper.add_vary_by_cookie(request)
return self.helper.identity(self.cookie, request)

def authenticated_userid(self, request):
return Identity.authenticated_userid(self.identity(request))

def remember(self, request, userid, **kw): # pylint:disable=unused-argument
"""Get a list of headers which will remember the user in a cookie."""

self._add_vary_by_cookie(request)
self.helper.add_vary_by_cookie(request)

previous_userid = self.authenticated_userid(request)

# Clear the previous session
if previous_userid != userid:
request.session.invalidate()
else:
Expand All @@ -55,37 +41,11 @@ def remember(self, request, userid, **kw): # pylint:disable=unused-argument
request.session.update(data)
request.session.new_csrf_token()

ticket_id = base64.urlsafe_b64encode(urandom(32)).rstrip(b"=").decode("ascii")
request.find_service(AuthTicketService).add_ticket(userid, ticket_id)
return self.cookie.get_headers([userid, ticket_id])
return self.helper.remember(self.cookie, request, userid)

def forget(self, request):
"""Get a list of headers which will delete appropriate cookies."""

self._add_vary_by_cookie(request)

# Clear the session by invalidating it
request.session.invalidate()

_, ticket_id = self._get_cookie_value()

if ticket_id:
request.find_service(AuthTicketService).remove_ticket(ticket_id)

return self.cookie.get_headers(None, max_age=0)
self.helper.add_vary_by_cookie(request)
return self.helper.forget(self.cookie, request)

def permits(self, request, context, permission) -> Allowed | Denied:
return identity_permits(self.identity(request), context, permission)

@staticmethod
@lru_cache # Ensure we only add this once per request
def _add_vary_by_cookie(request):
def vary_add(request, response): # pylint:disable=unused-argument
vary = set(response.vary if response.vary is not None else [])
vary.add("Cookie")
response.vary = list(vary)

request.add_response_callback(vary_add)

def _get_cookie_value(self):
return self.cookie.get_value() or (None, None)
53 changes: 53 additions & 0 deletions h/security/policy/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,56 @@
from base64 import urlsafe_b64encode
from functools import lru_cache
from os import urandom

from pyramid.request import Request
from webob.cookies import SignedCookieProfile

from h.security.identity import Identity
from h.services.auth_ticket import AuthTicketService


def is_api_request(request) -> bool:
"""Return True if `request` is an API request."""
return bool(request.matched_route and request.matched_route.name.startswith("api."))


class AuthTicketCookieHelper:
def identity(
self, cookie: SignedCookieProfile, request: Request
) -> Identity | None:
userid, ticket_id = self.get_cookie_value(cookie)

user = request.find_service(AuthTicketService).verify_ticket(userid, ticket_id)

if (not user) or user.deleted:
return None

return Identity.from_models(user=user)

def remember(self, cookie: SignedCookieProfile, request: Request, userid: str):
ticket_id = urlsafe_b64encode(urandom(32)).rstrip(b"=").decode("ascii")
request.find_service(AuthTicketService).add_ticket(userid, ticket_id)
return cookie.get_headers([userid, ticket_id])

def forget(self, cookie: SignedCookieProfile, request: Request):
request.session.invalidate()
_, ticket_id = self.get_cookie_value(cookie)

if ticket_id:
request.find_service(AuthTicketService).remove_ticket(ticket_id)

return cookie.get_headers(None, max_age=0)

@staticmethod
@lru_cache # Ensure we only add this once per request
def add_vary_by_cookie(request: Request):
def vary_add(request, response): # pylint:disable=unused-argument
vary = set(response.vary if response.vary is not None else [])
vary.add("Cookie")
response.vary = list(vary)

request.add_response_callback(vary_add)

@staticmethod
def get_cookie_value(cookie: SignedCookieProfile):
return cookie.get_value() or (None, None)
6 changes: 3 additions & 3 deletions h/security/policy/top_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from h.security.policy._auth_client import AuthClientPolicy
from h.security.policy._cookie import CookiePolicy
from h.security.policy.bearer_token import BearerTokenPolicy
from h.security.policy.helpers import is_api_request
from h.security.policy.helpers import AuthTicketCookieHelper, is_api_request


class TopLevelPolicy:
Expand Down Expand Up @@ -56,8 +56,8 @@ def get_subpolicy(request):
[
BearerTokenPolicy(),
AuthClientPolicy(),
APICookiePolicy(CookiePolicy(cookie)),
APICookiePolicy(cookie, AuthTicketCookieHelper()),
]
)

return CookiePolicy(cookie)
return CookiePolicy(cookie, AuthTicketCookieHelper())
53 changes: 38 additions & 15 deletions tests/unit/h/security/policy/_api_cookie_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from h.security.policy._api_cookie import APICookiePolicy
from h.security.policy._cookie import CookiePolicy
from h.security.policy.helpers import AuthTicketCookieHelper


class TestAPICookiePolicy:
Expand All @@ -24,34 +24,57 @@ def test_handles(

assert APICookiePolicy.handles(pyramid_request) == expected_result

def test_identity(self, api_cookie_policy, cookie_policy, pyramid_request):
def test_identity(self, api_cookie_policy, helper, pyramid_request):
identity = api_cookie_policy.identity(pyramid_request)

cookie_policy.identity.assert_called_once_with(pyramid_request)
assert identity == cookie_policy.identity.return_value
helper.add_vary_by_cookie.assert_called_once_with(pyramid_request)
helper.identity.assert_called_once_with(sentinel.cookie, pyramid_request)
assert identity == helper.identity.return_value

def test_authenticated_userid(
self, api_cookie_policy, cookie_policy, pyramid_request
self, api_cookie_policy, helper, pyramid_request, Identity
):
authenticated_userid = api_cookie_policy.authenticated_userid(pyramid_request)

cookie_policy.authenticated_userid.assert_called_once_with(pyramid_request)
assert authenticated_userid == cookie_policy.authenticated_userid.return_value
helper.add_vary_by_cookie.assert_called_once_with(pyramid_request)
helper.identity.assert_called_once_with(sentinel.cookie, pyramid_request)
Identity.authenticated_userid.assert_called_once_with(
helper.identity.return_value
)
assert authenticated_userid == Identity.authenticated_userid.return_value

def test_permits(self, api_cookie_policy, cookie_policy, pyramid_request):
def test_permits(
self, api_cookie_policy, helper, pyramid_request, identity_permits
):
permits = api_cookie_policy.permits(
pyramid_request, sentinel.context, sentinel.permission
)

cookie_policy.permits.assert_called_once_with(
pyramid_request, sentinel.context, sentinel.permission
helper.add_vary_by_cookie.assert_called_once_with(pyramid_request)
helper.identity.assert_called_once_with(sentinel.cookie, pyramid_request)
identity_permits.assert_called_once_with(
helper.identity.return_value, sentinel.context, sentinel.permission
)
assert permits == cookie_policy.permits.return_value
assert permits == identity_permits.return_value

@pytest.fixture
def cookie_policy(self):
return create_autospec(CookiePolicy, instance=True, spec_set=True)
def helper(self):
return create_autospec(AuthTicketCookieHelper, instance=True, spec_set=True)

@pytest.fixture
def api_cookie_policy(self, cookie_policy):
return APICookiePolicy(cookie_policy)
def api_cookie_policy(self, helper):
return APICookiePolicy(sentinel.cookie, helper)


@pytest.fixture(autouse=True)
def Identity(mocker):
return mocker.patch(
"h.security.policy._api_cookie.Identity", autospec=True, spec_set=True
)


@pytest.fixture(autouse=True)
def identity_permits(mocker):
return mocker.patch(
"h.security.policy._api_cookie.identity_permits", autospec=True, spec_set=True
)
Loading

0 comments on commit 0318d44

Please sign in to comment.