Skip to content

Commit

Permalink
Store screenshots as attachments in NDB. (#4473)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrobbins authored Oct 24, 2024
1 parent bbfa6c8 commit da48531
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 0 deletions.
131 changes: 131 additions & 0 deletions internals/attachments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from google.appengine.api import images
from google.cloud import ndb # type: ignore
from typing import Tuple


RESIZABLE_MIME_TYPES = [
'image/png', 'image/jpg', 'image/jpeg', 'image/gif', 'image/webp',
]
THUMB_WIDTH = 250
THUMB_HEIGHT = 200
MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024 # 10 MB


class Attachment(ndb.Model):
"""Attaches files, such as screenshots, to a feature entry."""
feature_id = ndb.IntegerProperty(required=True)
created_on = ndb.DateTimeProperty(auto_now_add=True)
content = ndb.BlobProperty(required=True)
mime_type = ndb.StringProperty(required=True)
thumbnail = ndb.BlobProperty()
is_deleted = ndb.BooleanProperty(default=False)


class UnsupportedMimeType(Exception):
pass

class AttachmentTooLarge(Exception):
pass


_EXTENSION_TO_CTYPE_TABLE = {
# These are image types that we trust the browser to display.
'gif': 'image/gif',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'webp': 'image/webp',
'txt': 'text/plain',
}


def guess_content_type_from_filename(filename: str) -> str:
"""Guess a file's content type based on the filename extension."""
ext = filename.split('.')[-1] if ('.' in filename) else ''
ext = ext.lower()
return _EXTENSION_TO_CTYPE_TABLE.get(ext, 'application/octet-stream')


def store_attachment(feature_id: int, content: bytes, mime_type: str) -> str:
""""Store some data for an attachment. Return its URI."""
check_attachment_size(content)
check_attachment_type(mime_type)
logging.info('Storing attachment with %r bytes', len(content))

attachment = Attachment(
feature_id=feature_id,
content=content,
mime_type=mime_type)

if mime_type in RESIZABLE_MIME_TYPES:
# Create and save a thumbnail too.
thumb_content = None
try:
thumb_content = images.resize(content, THUMB_WIDTH, THUMB_HEIGHT)
except images.LargeImageError:
# Don't log the whole exception because we don't need to see
# this on the Cloud Error Reporting page.
logging.info('Got LargeImageError on image with %d bytes', len(content))
except Exception as e:
# Do not raise exception for incorrectly formed images.
# See https://bugs.chromium.org/p/monorail/issues/detail?id=597 for more
# detail.
logging.exception(e)
if thumb_content:
attachment.thumbnail = thumb_content

attachment.put()
return attachment


def check_attachment_size(content: bytes):
"""Reject attachments that are too large."""
if len(content) > MAX_ATTACHMENT_SIZE:
raise AttachmentTooLarge('Attachment size %r is too large' % len(content))


def check_attachment_type(mime_type: str):
"""Reject attachments that are of an unsupported type."""
if mime_type not in _EXTENSION_TO_CTYPE_TABLE.values():
raise UnsupportedMimeType(
'Please upload an image with one of the following mime types:\n%s' %
', '.join(_EXTENSION_TO_CTYPE_TABLE.values()))


def get_attachment(feature_id: int, attachment_id: int) -> Attachment|None:
"""Return attachment, if feature_id matches, and attachment is not deleted."""
attachment = Attachment.get_by_id(attachment_id)
if (attachment and
attachment.feature_id == feature_id and
not attachment.is_deleted):
return attachment

return None


def get_attachment_uri(attachment: Attachment) -> str:
"""Return the URI path that will serve tis attachment."""
uri = '/feature/%r/attachment/%r' % (
attachment.feature_id, attachment.key.integer_id())
return uri


def mark_attachment_deleted(attachment: Attachment) -> None:
"""Mark an attachment as deleted so that it will no longer be served."""
attachment.is_deleted = True
attachment.put()
120 changes: 120 additions & 0 deletions internals/attachments_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import testing_config # Must be imported before the module under test.

from unittest import mock

from internals import attachments


class AttachmentsTests(testing_config.CustomTestCase):

def setUp(self):
self.feature_id = 12345678

def tearDown(self):
for attach in attachments.Attachment.query().fetch(None):
attach.key.delete()

def test_guess_content_type_from_filename(self):
"""We can guess mime type based on filename extension."""
guess = attachments.guess_content_type_from_filename
self.assertEqual(guess('screenshot.gif'), 'image/gif')
self.assertEqual(guess('screenshot.jpg'), 'image/jpeg')
self.assertEqual(guess('screenshot.jpeg'), 'image/jpeg')
self.assertEqual(guess('screenshot.png'), 'image/png')
self.assertEqual(guess('screenshot.webp'), 'image/webp')

self.assertEqual(guess('screen.shot.webp'), 'image/webp')
self.assertEqual(guess('.webp'), 'image/webp')
self.assertEqual(guess('screen shot.webp'), 'image/webp')

self.assertEqual(guess('screenshot.pdf'), 'application/octet-stream')
self.assertEqual(guess('screenshot gif'), 'application/octet-stream')
self.assertEqual(guess('screenshotgif'), 'application/octet-stream')
self.assertEqual(guess('gif'), 'application/octet-stream')
self.assertEqual(guess(''), 'application/octet-stream')

def test_store_attachment(self):
"""We can store attachment content."""
actual = attachments.store_attachment(
self.feature_id, b'test content', 'text/plain')
attach_id = actual.key.integer_id()
retrieved = attachments.Attachment.get_by_id(attach_id)
self.assertEqual(retrieved.feature_id, self.feature_id)
self.assertEqual(retrieved.content, b'test content')
self.assertEqual(retrieved.mime_type, 'text/plain')

def test_check_attachment_size(self):
"""We can check the size of the attachment content."""
attachments.check_attachment_size(b'small content')

large_content = b'very large content ' * 1024 * 1024
with self.assertRaises(attachments.AttachmentTooLarge):
attachments.check_attachment_size(large_content)

def test_check_attachment_type(self):
"""We can check the type of the attachment."""
attachments.check_attachment_type('image/gif')
attachments.check_attachment_type('image/jpeg')
attachments.check_attachment_type('image/png')
attachments.check_attachment_type('image/webp')

with self.assertRaises(attachments.UnsupportedMimeType):
attachments.check_attachment_type('application/octet-stream')

with self.assertRaises(attachments.UnsupportedMimeType):
attachments.check_attachment_type('video/mpeg')

def test_get_attachment__found(self):
"""We can retrive an attachment, with checking for the proper feature."""
stored = attachments.store_attachment(
self.feature_id, b'test content', 'text/plain')
attach_id = stored.key.integer_id()

actual = attachments.get_attachment(self.feature_id, attach_id)

self.assertEqual(actual.feature_id, self.feature_id)
self.assertEqual(actual.content, b'test content')
self.assertEqual(actual.mime_type, 'text/plain')

def test_get_attachment__not_found(self):
"""We return None when there is no such attachment."""
# Nothing is stored
actual = attachments.get_attachment(self.feature_id, 99999)
self.assertEqual(actual, None)

def test_get_attachment__reject_probing(self):
"""We return None when an attempt is made to find all attachments."""
stored = attachments.store_attachment(
self.feature_id, b'test content', 'text/plain')
attach_id = stored.key.integer_id()

# The attachment is not part of any other feature.
actual = attachments.get_attachment(self.feature_id + 1, attach_id)

self.assertEqual(actual, None)

def test_get_attachment__deleted(self):
"""We return None when attempting to get a deleted attachment."""
stored = attachments.store_attachment(
self.feature_id, b'test content', 'text/plain')
attachments.mark_attachment_deleted(stored)

# The attachment was marked deleted.
actual = attachments.get_attachment(
self.feature_id + 1, stored.key.integer_id())

self.assertEqual(actual, None)

0 comments on commit da48531

Please sign in to comment.