diff --git a/h/storage.py b/h/storage.py index 439d1ad8ca0..e5b04222584 100644 --- a/h/storage.py +++ b/h/storage.py @@ -5,26 +5,11 @@ for storing and retrieving annotations. Data passed to these functions is assumed to be validated. """ -# FIXME: This module was originally written to be a single point of -# indirection through which the storage backend could be swapped out on -# the fly. This helped us to migrate from Elasticsearch-based -# persistence to PostgreSQL persistence. -# -# The purpose of this module is now primarily to serve as a place to -# wrap up the business logic of creating and retrieving annotations. As -# such, it probably makes more sense for this to be split up into a -# couple of different services at some point. - -from datetime import datetime from pyramid import i18n -from h import models, schemas +from h import models from h.db import types -from h.models.document import update_document_metadata -from h.security import Permission -from h.traversal.group import GroupContext -from h.util.group_scope import url_in_scope from h.util.uri import normalize as normalize_uri _ = i18n.TranslationStringFactory(__package__) @@ -49,70 +34,6 @@ def fetch_annotation(session, id_): return None -def create_annotation(request, data): - """ - Create an annotation from already-validated data. - - :param request: the request object - :param data: an annotation data dict that has already been validated by - :py:class:`h.schemas.annotation.CreateAnnotationSchema` - - :returns: the created and flushed annotation - """ - document_data = data.pop("document", {}) - - # Replies must have the same group as their parent. - if data["references"]: - root_annotation_id = data["references"][0] - - if root_annotation := fetch_annotation(request.db, root_annotation_id): - data["groupid"] = root_annotation.groupid - else: - raise schemas.ValidationError( - "references.0: " - + _("Annotation {id} does not exist").format(id=root_annotation_id) - ) - - # Create the annotation and enable relationship loading so we can access - # the group, even though we've not added this to the session yet - annotation = models.Annotation(**data) - request.db.enable_relationship_loading(annotation) - - group = annotation.group - if not group: - raise schemas.ValidationError( - "group: " + _(f"Invalid group id {annotation.groupid}") - ) - - # The user must have permission to create an annotation in the group - # they've asked to create one in. - if not request.has_permission(Permission.Group.WRITE, context=GroupContext(group)): - raise schemas.ValidationError( - "group: " + _("You may not create annotations in the specified group!") - ) - - _validate_group_scope(group, data["target_uri"]) - - annotation.created = annotation.updated = datetime.utcnow() - annotation.document = update_document_metadata( - request.db, - annotation.target_uri, - document_data["document_meta_dicts"], - document_data["document_uri_dicts"], - created=annotation.created, - updated=annotation.updated, - ) - - request.db.add(annotation) - request.db.flush() - - request.find_service( # pylint: disable=protected-access - name="search_index" - )._queue.add_by_id(annotation.id, tag="storage.create_annotation", schedule_in=60) - - return annotation - - def expand_uri(session, uri, normalized=False): """ Return all URIs which refer to the same underlying document as `uri`. @@ -161,18 +82,3 @@ def expand_uri(session, uri, normalized=False): return [uri_normalized for _, _, uri_normalized in type_uris] return [plain_uri for _, plain_uri, _ in type_uris] - - -def _validate_group_scope(group, target_uri): - # If no scopes are present, or if the group is configured to allow - # annotations outside of its scope, there's nothing to do here - if not group.scopes or not group.enforce_scope: - return - # The target URI must match at least one - # of a group's defined scopes, if the group has any - group_scopes = [scope.scope for scope in group.scopes] - if not url_in_scope(target_uri, group_scopes): - raise schemas.ValidationError( - "group scope: " - + _("Annotations for this target URI are not allowed in this group") - ) diff --git a/tests/h/storage_test.py b/tests/h/storage_test.py index 7c758ec6314..ff4b2bf4f38 100644 --- a/tests/h/storage_test.py +++ b/tests/h/storage_test.py @@ -1,18 +1,7 @@ -from datetime import datetime as datetime_ -from datetime import timedelta -from unittest.mock import create_autospec, sentinel - import pytest -import sqlalchemy as sa -from h_matchers import Any from h import storage from h.models.document import Document, DocumentURI -from h.schemas import ValidationError -from h.security import Permission -from h.traversal.group import GroupContext - -pytestmark = pytest.mark.usefixtures("search_index") class TestFetchAnnotation: @@ -96,215 +85,3 @@ def test_expand_uri_document_uris(self, db_session, normalized, expected_uris): ) assert uris == expected_uris - - -class TestCreateAnnotation: - def test_it(self, pyramid_request, annotation_data, datetime): - annotation = storage.create_annotation(pyramid_request, annotation_data) - - for param, value in annotation_data.items(): - assert getattr(annotation, param) == value - - assert annotation.created == datetime.utcnow.return_value - assert annotation.updated == datetime.utcnow.return_value - - assert sa.inspect(annotation).persistent # We saved it to the DB - - def test_it_validates_the_group_scope( - self, pyramid_request, annotation_data, group, _validate_group_scope - ): - storage.create_annotation(pyramid_request, annotation_data) - - _validate_group_scope.assert_called_once_with( - group, annotation_data["target_uri"] - ) - - def test_it_adds_document_metadata( - self, pyramid_request, annotation_data, update_document_metadata, datetime - ): - annotation_data["document"] = { - "document_meta_dicts": sentinel.document_meta_dicts, - "document_uri_dicts": sentinel.document_uri_dicts, - } - - annotation = storage.create_annotation(pyramid_request, annotation_data) - - update_document_metadata.assert_called_once_with( - pyramid_request.db, - annotation_data["target_uri"], - sentinel.document_meta_dicts, - sentinel.document_uri_dicts, - created=datetime.utcnow.return_value, - updated=datetime.utcnow.return_value, - ) - assert annotation.document == update_document_metadata.return_value - - def test_it_queues_the_search_index( - self, pyramid_request, annotation_data, search_index - ): - annotation = storage.create_annotation(pyramid_request, annotation_data) - - search_index._queue.add_by_id.assert_called_once_with( # pylint:disable=protected-access - annotation.id, tag="storage.create_annotation", schedule_in=60 - ) - - def test_it_sets_the_group_to_match_the_parent_for_replies( - self, pyramid_request, annotation_data, factories, other_group - ): - parent_annotation = factories.Annotation(group=other_group) - annotation_data["references"] = [parent_annotation.id] - - annotation = storage.create_annotation(pyramid_request, annotation_data) - - assert annotation.groupid - assert annotation.group == parent_annotation.group - - def test_it_raises_if_parent_annotation_does_not_exist( - self, pyramid_request, annotation_data - ): - annotation_data["references"] = ["MISSING_ID"] - - with pytest.raises(ValidationError): - storage.create_annotation(pyramid_request, annotation_data) - - def test_it_raises_if_the_group_doesnt_exist( - self, pyramid_request, annotation_data - ): - annotation_data["groupid"] = "MISSING_ID" - - with pytest.raises(ValidationError): - storage.create_annotation(pyramid_request, annotation_data) - - def test_it_raises_if_write_permission_is_missing( - self, pyramid_request, annotation_data, has_permission - ): - has_permission.return_value = False - - with pytest.raises(ValidationError): - storage.create_annotation(pyramid_request, annotation_data) - - has_permission.assert_called_once_with( - Permission.Group.WRITE, context=Any.instance_of(GroupContext) - ) - - def test_it_does_not_crash_if_target_selectors_is_empty( - self, pyramid_request, annotation_data - ): - # Page notes have [] for target_selectors. - annotation_data["target_selectors"] = [] - - storage.create_annotation(pyramid_request, annotation_data) - - @pytest.mark.xfail(reason="This test passed before due to over fixturing") - def test_it_does_not_crash_if_no_text_or_tags( - self, pyramid_request, annotation_data - ): - # Highlights have no text or tags. - annotation_data["text"] = annotation_data["tags"] = "" - - # ValueError: Attribute 'tags' does not accept objects of type - # So what should this be? None? - storage.create_annotation(pyramid_request, annotation_data) - - @pytest.fixture - def other_group(self, factories): - # Set an authority_provided_id so our group_id is not None - return factories.OpenGroup(authority_provided_id="other_group_auth_id") - - @pytest.fixture - def has_permission(self, pyramid_request): - pyramid_request.has_permission = create_autospec(pyramid_request.has_permission) - return pyramid_request.has_permission - - -class TestValidateGroupScope: - def test_it_allows_matching_scopes(self, scoped_group): - storage._validate_group_scope( # pylint:disable=protected-access - scoped_group, "http://inscope.example.com" - ) - - def test_it_allows_mismatching_scopes_if_a_group_has_no_scopes( - self, scoped_group, url_in_scope - ): - scoped_group.scopes = [] - - storage._validate_group_scope( # pylint:disable=protected-access - scoped_group, "http://not-inscope.example.com" - ) - - url_in_scope.assert_not_called() - - def test_it_allows_mismatching_scopes_if_enforce_is_False( - self, scoped_group, url_in_scope - ): - scoped_group.enforce_scope = False - - storage._validate_group_scope( # pylint:disable=protected-access - scoped_group, "http://not-inscope.example.com" - ) - - url_in_scope.assert_not_called() - - def test_it_catches_mismatching_scopes(self, scoped_group, url_in_scope): - url_in_scope.return_value = False - - with pytest.raises(ValidationError): - storage._validate_group_scope( # pylint:disable=protected-access - scoped_group, "http://not-inscope.example.com" - ) - - @pytest.fixture - def scoped_group(self, factories): - return factories.OpenGroup( - enforce_scope=True, - scopes=[factories.GroupScope(scope="http://inscope.example.com")], - ) - - @pytest.fixture - def url_in_scope(self, patch): - return patch("h.storage.url_in_scope") - - -@pytest.fixture -def group(factories): - # Set an authority_provided_id so our group_id is not None - return factories.OpenGroup(authority_provided_id="group_auth_id") - - -@pytest.fixture -def user(factories): - return factories.User() - - -@pytest.fixture -def annotation_data(user, group): - return { - "userid": user.userid, - "text": "text", - "tags": ["one", "two"], - "shared": False, - "target_uri": "http://www.example.com/example.html", - "groupid": group.pubid, - "references": [], - "target_selectors": ["selector_one", "selector_two"], - "document": {"document_uri_dicts": [], "document_meta_dicts": []}, - } - - -@pytest.fixture -def datetime(patch): - datetime = patch("h.storage.datetime") - datetime.utcnow.return_value = datetime_.utcnow() + timedelta(hours=1) - return datetime - - -@pytest.fixture -def _validate_group_scope(patch): - return patch("h.storage._validate_group_scope") - - -@pytest.fixture -def update_document_metadata(patch, factories): - update_document_metadata = patch("h.storage.update_document_metadata") - update_document_metadata.return_value = factories.Document() - return update_document_metadata