-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
305 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import unittest | ||
import io | ||
import imghdr | ||
|
||
from PIL import Image as PILImage | ||
|
||
from willow.image import ( | ||
GIFImageFile, BadImageOperationError, WebMVP9ImageFile, OggTheoraImageFile, MP4H264ImageFile | ||
) | ||
from willow.plugins.ffmpeg import FFMpegLazyVideo, probe | ||
|
||
|
||
class TestFFMpegOperations(unittest.TestCase): | ||
def setUp(self): | ||
self.f = open('tests/images/newtons_cradle.gif', 'rb') | ||
self.image = FFMpegLazyVideo.open(GIFImageFile(self.f)) | ||
|
||
def tearDown(self): | ||
self.f.close() | ||
|
||
def test_get_size(self): | ||
width, height = self.image.get_size() | ||
self.assertEqual(width, 480) | ||
self.assertEqual(height, 360) | ||
|
||
def test_get_frame_count(self): | ||
frames = self.image.get_frame_count() | ||
self.assertEqual(frames, 34) | ||
|
||
def test_resize(self): | ||
resized_image = self.image.resize((100, 75)) | ||
self.assertEqual(resized_image.get_size(), (100, 75)) | ||
|
||
def test_crop(self): | ||
cropped_image = self.image.crop((10, 10, 100, 100)) | ||
self.assertEqual(cropped_image.get_size(), (90, 90)) | ||
|
||
def test_crop_out_of_bounds(self): | ||
# crop rectangle should be clamped to the image boundaries | ||
bottom_right_cropped_image = self.image.crop((150, 100, 250, 200)) | ||
self.assertEqual(bottom_right_cropped_image.get_size(), (50, 50)) | ||
|
||
top_left_cropped_image = self.image.crop((-50, -50, 50, 50)) | ||
self.assertEqual(top_left_cropped_image.get_size(), (50, 50)) | ||
|
||
# fail if the crop rectangle is entirely to the left of the image | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((-100, 50, -50, 100)) | ||
# right edge of crop rectangle is exclusive, so 0 is also invalid | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((-50, 50, 0, 100)) | ||
|
||
# fail if the crop rectangle is entirely above the image | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((50, -100, 100, -50)) | ||
# bottom edge of crop rectangle is exclusive, so 0 is also invalid | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((50, -50, 100, 0)) | ||
|
||
# fail if the crop rectangle is entirely to the right of the image | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((250, 50, 300, 100)) | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((200, 50, 250, 100)) | ||
|
||
# fail if the crop rectangle is entirely below the image | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((50, 200, 100, 250)) | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((50, 150, 100, 200)) | ||
|
||
# fail if left edge >= right edge | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((125, 25, 25, 125)) | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((100, 25, 100, 125)) | ||
|
||
# fail if bottom edge >= top edge | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((25, 125, 125, 25)) | ||
with self.assertRaises(BadImageOperationError): | ||
self.image.crop((25, 100, 125, 100)) | ||
|
||
def test_rotate(self): | ||
rotated_image = self.image.rotate(90) | ||
width, height = rotated_image.get_size() | ||
self.assertEqual((width, height), (150, 200)) | ||
|
||
def test_rotate_without_multiple_of_90(self): | ||
with self.assertRaises(UnsupportedRotation) as e: | ||
rotated_image = self.image.rotate(45) | ||
|
||
def test_rotate_greater_than_360(self): | ||
# 450 should end up the same as a 90 rotation | ||
rotated_image = self.image.rotate(450) | ||
width, height = rotated_image.get_size() | ||
self.assertEqual((width, height), (150, 200)) | ||
|
||
def test_rotate_multiple_of_360(self): | ||
rotated_image = self.image.rotate(720) | ||
width, height = rotated_image.get_size() | ||
self.assertEqual((width, height), (200, 150)) | ||
|
||
def test_set_background_color_rgb(self): | ||
red_background_image = self.image.set_background_color_rgb((255, 0, 0)) | ||
self.assertFalse(red_background_image.has_alpha()) | ||
self.assertEqual(red_background_image.image.getpixel((10, 10)), (255, 0, 0)) | ||
|
||
def test_set_background_color_rgb_color_argument_check(self): | ||
with self.assertRaises(TypeError) as e: | ||
self.image.set_background_color_rgb('rgb(255, 0, 0)') | ||
|
||
self.assertEqual(str(e.exception), "the 'color' argument must be a 3-element tuple or list") | ||
|
||
def test_save_as_webm_vp9(self): | ||
output = io.BytesIO() | ||
return_value = self.image.save_as_webm_vp9(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'matroska,webm') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'vp9') | ||
self.assertIsInstance(return_value, WebMVP9ImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_save_as_ogg_theora(self): | ||
output = io.BytesIO() | ||
return_value = self.image.save_as_ogg_theora(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'ogg') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'theora') | ||
self.assertIsInstance(return_value, OggTheoraImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_save_as_mp4_h264(self): | ||
output = io.BytesIO() | ||
return_value = self. image.save_as_mp4_h264(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'mov,mp4,m4a,3gp,3g2,mj2') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'h264') | ||
self.assertIsInstance(return_value, MP4H264ImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_has_alpha(self): | ||
has_alpha = self.image.has_alpha() | ||
self.assertFalse(has_alpha) | ||
|
||
def test_has_animation(self): | ||
has_animation = self.image.has_animation() | ||
self.assertTrue(has_animation) | ||
|
||
def test_transparent_gif(self): | ||
with open('tests/images/transparent.gif', 'rb') as f: | ||
image = FFMpegLazyVideo.open(GIFImageFile(f)) | ||
|
||
# Transparency not supported | ||
self.assertFalse(image.has_alpha()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import subprocess | ||
import os.path | ||
from itertools import product | ||
import json | ||
from tempfile import NamedTemporaryFile, TemporaryDirectory | ||
|
||
from willow.image import Image | ||
|
||
from willow.image import ( | ||
GIFImageFile, | ||
WebPImageFile, | ||
WebMVP9ImageFile, | ||
OggTheoraImageFile, | ||
MP4H264ImageFile, | ||
) | ||
|
||
|
||
def probe(file): | ||
with NamedTemporaryFile() as src: | ||
src.write(file.read()) | ||
result = subprocess.run(["ffprobe", "-show_format", "-show_streams", "-loglevel", "quiet", "-print_format", "json", src.name], capture_output=True) | ||
return json.loads(result.stdout) | ||
|
||
|
||
def transcode(source_file, output_file, crop_rect, output_resolution, format, codec): | ||
with NamedTemporaryFile() as src, TemporaryDirectory() as outdir: | ||
src.write(source_file.read()) | ||
|
||
args = ["ffmpeg", "-i", src.name, "-f", format, "-codec:v", codec] | ||
|
||
if crop_rect: | ||
pass | ||
|
||
if output_resolution: | ||
args += ["-s", f"{output_resolution[0]}x{output_resolution[1]}"] | ||
|
||
args.append(os.path.join(outdir, 'out')) | ||
|
||
subprocess.run(args) | ||
|
||
with open(os.path.join(outdir, 'out'), 'rb') as out: | ||
output_file.write(out.read()) | ||
|
||
|
||
class FFMpegLazyVideo(Image): | ||
def __init__(self, source_file, crop_rect=None, output_resolution=None): | ||
self.source_file = source_file | ||
self.crop_rect = crop_rect | ||
self.output_resolution = output_resolution | ||
|
||
@Image.operation | ||
def get_size(self): | ||
if self.output_resolution: | ||
return self.output_resolution | ||
|
||
# Find the size from the source file | ||
data = probe(self.source_file.f) | ||
for stream in data['streams']: | ||
if stream['codec_type'] == 'video': | ||
return stream['width'], stream['height'] | ||
|
||
@Image.operation | ||
def get_frame_count(self): | ||
# Find the frame count from the source file | ||
data = probe(self.source_file.f) | ||
for stream in data['streams']: | ||
if stream['codec_type'] == 'video': | ||
return int(stream['nb_frames']) | ||
|
||
@Image.operation | ||
def has_alpha(self): | ||
# Alpha not supported | ||
return False | ||
|
||
@Image.operation | ||
def has_animation(self): | ||
return True | ||
|
||
@Image.operation | ||
def resize(self, size): | ||
return FFMpegLazyVideo(self.source_file, self.crop_rect, size) | ||
|
||
@Image.operation | ||
def crop(self, rect): | ||
# TODO: Combine with existing rect | ||
return FFMpegLazyVideo(self.source_file, rect, self.output_resolution) | ||
|
||
@Image.operation | ||
def set_background_color_rgb(self, color): | ||
# Alpha not supported | ||
return self | ||
|
||
@classmethod | ||
@Image.converter_from(GIFImageFile) | ||
@Image.converter_from(WebPImageFile) | ||
@Image.converter_from(WebMVP9ImageFile) | ||
@Image.converter_from(OggTheoraImageFile) | ||
@Image.converter_from(MP4H264ImageFile) | ||
def open(cls, file): | ||
return cls(file) | ||
|
||
@Image.operation | ||
def save_as_webm_vp9(self, f): | ||
transcode(self.source_file.f, f, self.crop_rect, self.output_resolution, 'webm', 'libvpx-vp9') | ||
return WebMVP9ImageFile(f) | ||
|
||
@Image.operation | ||
def save_as_ogg_theora(self, f): | ||
transcode(self.source_file.f, f, self.crop_rect, self.output_resolution, 'ogg', 'libtheora') | ||
return OggTheoraImageFile(f) | ||
|
||
@Image.operation | ||
def save_as_mp4_h264(self, f): | ||
transcode(self.source_file.f, f, self.crop_rect, self.output_resolution, 'mp4', 'libx264') | ||
return MP4H264ImageFile(f) | ||
|
||
|
||
willow_image_classes = [FFMpegLazyVideo] |