From da48531b0fe4698ba31a0ae588afaea3715628c8 Mon Sep 17 00:00:00 2001 From: Jason Robbins Date: Thu, 24 Oct 2024 12:59:55 -0700 Subject: [PATCH] Store screenshots as attachments in NDB. (#4473) --- internals/attachments.py | 131 ++++++++++++++++++++++++++++++++++ internals/attachments_test.py | 120 +++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 internals/attachments.py create mode 100644 internals/attachments_test.py diff --git a/internals/attachments.py b/internals/attachments.py new file mode 100644 index 000000000000..016d17236539 --- /dev/null +++ b/internals/attachments.py @@ -0,0 +1,131 @@ +# Copyright 2024 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from google.appengine.api import images +from google.cloud import ndb # type: ignore +from typing import Tuple + + +RESIZABLE_MIME_TYPES = [ + 'image/png', 'image/jpg', 'image/jpeg', 'image/gif', 'image/webp', + ] +THUMB_WIDTH = 250 +THUMB_HEIGHT = 200 +MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024 # 10 MB + + +class Attachment(ndb.Model): + """Attaches files, such as screenshots, to a feature entry.""" + feature_id = ndb.IntegerProperty(required=True) + created_on = ndb.DateTimeProperty(auto_now_add=True) + content = ndb.BlobProperty(required=True) + mime_type = ndb.StringProperty(required=True) + thumbnail = ndb.BlobProperty() + is_deleted = ndb.BooleanProperty(default=False) + + +class UnsupportedMimeType(Exception): + pass + +class AttachmentTooLarge(Exception): + pass + + +_EXTENSION_TO_CTYPE_TABLE = { + # These are image types that we trust the browser to display. + 'gif': 'image/gif', + 'jpg': 'image/jpeg', + 'jpeg': 'image/jpeg', + 'png': 'image/png', + 'webp': 'image/webp', + 'txt': 'text/plain', +} + + +def guess_content_type_from_filename(filename: str) -> str: + """Guess a file's content type based on the filename extension.""" + ext = filename.split('.')[-1] if ('.' in filename) else '' + ext = ext.lower() + return _EXTENSION_TO_CTYPE_TABLE.get(ext, 'application/octet-stream') + + +def store_attachment(feature_id: int, content: bytes, mime_type: str) -> str: + """"Store some data for an attachment. Return its URI.""" + check_attachment_size(content) + check_attachment_type(mime_type) + logging.info('Storing attachment with %r bytes', len(content)) + + attachment = Attachment( + feature_id=feature_id, + content=content, + mime_type=mime_type) + + if mime_type in RESIZABLE_MIME_TYPES: + # Create and save a thumbnail too. + thumb_content = None + try: + thumb_content = images.resize(content, THUMB_WIDTH, THUMB_HEIGHT) + except images.LargeImageError: + # Don't log the whole exception because we don't need to see + # this on the Cloud Error Reporting page. + logging.info('Got LargeImageError on image with %d bytes', len(content)) + except Exception as e: + # Do not raise exception for incorrectly formed images. + # See https://bugs.chromium.org/p/monorail/issues/detail?id=597 for more + # detail. + logging.exception(e) + if thumb_content: + attachment.thumbnail = thumb_content + + attachment.put() + return attachment + + +def check_attachment_size(content: bytes): + """Reject attachments that are too large.""" + if len(content) > MAX_ATTACHMENT_SIZE: + raise AttachmentTooLarge('Attachment size %r is too large' % len(content)) + + +def check_attachment_type(mime_type: str): + """Reject attachments that are of an unsupported type.""" + if mime_type not in _EXTENSION_TO_CTYPE_TABLE.values(): + raise UnsupportedMimeType( + 'Please upload an image with one of the following mime types:\n%s' % + ', '.join(_EXTENSION_TO_CTYPE_TABLE.values())) + + +def get_attachment(feature_id: int, attachment_id: int) -> Attachment|None: + """Return attachment, if feature_id matches, and attachment is not deleted.""" + attachment = Attachment.get_by_id(attachment_id) + if (attachment and + attachment.feature_id == feature_id and + not attachment.is_deleted): + return attachment + + return None + + +def get_attachment_uri(attachment: Attachment) -> str: + """Return the URI path that will serve tis attachment.""" + uri = '/feature/%r/attachment/%r' % ( + attachment.feature_id, attachment.key.integer_id()) + return uri + + +def mark_attachment_deleted(attachment: Attachment) -> None: + """Mark an attachment as deleted so that it will no longer be served.""" + attachment.is_deleted = True + attachment.put() diff --git a/internals/attachments_test.py b/internals/attachments_test.py new file mode 100644 index 000000000000..8c5e46b9c764 --- /dev/null +++ b/internals/attachments_test.py @@ -0,0 +1,120 @@ +# Copyright 2024 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import testing_config # Must be imported before the module under test. + +from unittest import mock + +from internals import attachments + + +class AttachmentsTests(testing_config.CustomTestCase): + + def setUp(self): + self.feature_id = 12345678 + + def tearDown(self): + for attach in attachments.Attachment.query().fetch(None): + attach.key.delete() + + def test_guess_content_type_from_filename(self): + """We can guess mime type based on filename extension.""" + guess = attachments.guess_content_type_from_filename + self.assertEqual(guess('screenshot.gif'), 'image/gif') + self.assertEqual(guess('screenshot.jpg'), 'image/jpeg') + self.assertEqual(guess('screenshot.jpeg'), 'image/jpeg') + self.assertEqual(guess('screenshot.png'), 'image/png') + self.assertEqual(guess('screenshot.webp'), 'image/webp') + + self.assertEqual(guess('screen.shot.webp'), 'image/webp') + self.assertEqual(guess('.webp'), 'image/webp') + self.assertEqual(guess('screen shot.webp'), 'image/webp') + + self.assertEqual(guess('screenshot.pdf'), 'application/octet-stream') + self.assertEqual(guess('screenshot gif'), 'application/octet-stream') + self.assertEqual(guess('screenshotgif'), 'application/octet-stream') + self.assertEqual(guess('gif'), 'application/octet-stream') + self.assertEqual(guess(''), 'application/octet-stream') + + def test_store_attachment(self): + """We can store attachment content.""" + actual = attachments.store_attachment( + self.feature_id, b'test content', 'text/plain') + attach_id = actual.key.integer_id() + retrieved = attachments.Attachment.get_by_id(attach_id) + self.assertEqual(retrieved.feature_id, self.feature_id) + self.assertEqual(retrieved.content, b'test content') + self.assertEqual(retrieved.mime_type, 'text/plain') + + def test_check_attachment_size(self): + """We can check the size of the attachment content.""" + attachments.check_attachment_size(b'small content') + + large_content = b'very large content ' * 1024 * 1024 + with self.assertRaises(attachments.AttachmentTooLarge): + attachments.check_attachment_size(large_content) + + def test_check_attachment_type(self): + """We can check the type of the attachment.""" + attachments.check_attachment_type('image/gif') + attachments.check_attachment_type('image/jpeg') + attachments.check_attachment_type('image/png') + attachments.check_attachment_type('image/webp') + + with self.assertRaises(attachments.UnsupportedMimeType): + attachments.check_attachment_type('application/octet-stream') + + with self.assertRaises(attachments.UnsupportedMimeType): + attachments.check_attachment_type('video/mpeg') + + def test_get_attachment__found(self): + """We can retrive an attachment, with checking for the proper feature.""" + stored = attachments.store_attachment( + self.feature_id, b'test content', 'text/plain') + attach_id = stored.key.integer_id() + + actual = attachments.get_attachment(self.feature_id, attach_id) + + self.assertEqual(actual.feature_id, self.feature_id) + self.assertEqual(actual.content, b'test content') + self.assertEqual(actual.mime_type, 'text/plain') + + def test_get_attachment__not_found(self): + """We return None when there is no such attachment.""" + # Nothing is stored + actual = attachments.get_attachment(self.feature_id, 99999) + self.assertEqual(actual, None) + + def test_get_attachment__reject_probing(self): + """We return None when an attempt is made to find all attachments.""" + stored = attachments.store_attachment( + self.feature_id, b'test content', 'text/plain') + attach_id = stored.key.integer_id() + + # The attachment is not part of any other feature. + actual = attachments.get_attachment(self.feature_id + 1, attach_id) + + self.assertEqual(actual, None) + + def test_get_attachment__deleted(self): + """We return None when attempting to get a deleted attachment.""" + stored = attachments.store_attachment( + self.feature_id, b'test content', 'text/plain') + attachments.mark_attachment_deleted(stored) + + # The attachment was marked deleted. + actual = attachments.get_attachment( + self.feature_id + 1, stored.key.integer_id()) + + self.assertEqual(actual, None)