From 731fcc6f4be8b7d20cf23a2d027e82ff1ca0b29b Mon Sep 17 00:00:00 2001 From: David Manthey Date: Wed, 17 Mar 2021 13:20:43 -0400 Subject: [PATCH] Add a conversion endpoint. This can run either as a local job or a remote job. The rationale is that for images that have multiple files, we do not have a reliable mechanism for accessing all of the files in the remote job. --- .../girder_large_image/models/image_item.py | 65 ++++++++++++++- girder/girder_large_image/rest/tiles.py | 66 ++++++++++++++-- girder/test_girder/test_tiles_rest.py | 69 +++++++++++++++- utilities/tasks/large_image_tasks/tasks.py | 79 ++++++++++++++++++- 4 files changed, 266 insertions(+), 13 deletions(-) diff --git a/girder/girder_large_image/models/image_item.py b/girder/girder_large_image/models/image_item.py index 4e4365f8c..da1135e73 100644 --- a/girder/girder_large_image/models/image_item.py +++ b/girder/girder_large_image/models/image_item.py @@ -55,12 +55,12 @@ def createImageItem(self, item, fileObj, user=None, token=None, if 'fileId' in item.setdefault('largeImage', {}): raise TileGeneralException('Item already has largeImage set.') if fileObj['itemId'] != item['_id']: - raise TileGeneralException('The provided file must be in the ' - 'provided item.') + raise TileGeneralException( + 'The provided file must be in the provided item.') if (item['largeImage'].get('expected') is True and 'jobId' in item['largeImage']): - raise TileGeneralException('Item is scheduled to generate a ' - 'largeImage.') + raise TileGeneralException( + 'Item is scheduled to generate a largeImage.') item['largeImage'].pop('expected', None) item['largeImage'].pop('sourceName', None) @@ -111,6 +111,63 @@ def _createLargeImageJob(self, item, fileObj, user, token, **kwargs): ), countdown=int(kwargs['countdown']) if kwargs.get('countdown') else None) return job.job + def convertImage(self, item, fileObj, user=None, token=None, localJob=True, **kwargs): + if fileObj['itemId'] != item['_id']: + raise TileGeneralException( + 'The provided file must be in the provided item.') + if not localJob: + return self._convertImageViaWorker(item, fileObj, user, token, **kwargs) + # local job + job = Job().createLocalJob( + module='large_image_tasks.tasks', + function='convert_image_job', + kwargs={ + 'itemId': str(item['_id']), + 'fileId': str(fileObj['_id']), + 'userId': str(user['_id']) if user else None, + **kwargs, + }, + title='Convert a file to a large image file.', + type='large_image_convert_image', + user=user, + public=True, + asynchronous=True, + ) + Job().scheduleJob(job) + return job + + def _convertImageViaWorker( + self, item, fileObj, user=None, token=None, folderId=None, + name=None, **kwargs): + import large_image_tasks.tasks + from girder_worker_utils.transforms.girder_io import GirderUploadToFolder + from girder_worker_utils.transforms.contrib.girder_io import GirderFileIdAllowDirect + from girder_worker_utils.transforms.common import TemporaryDirectory + + try: + localPath = File().getLocalFilePath(fileObj) + except (FilePathException, AttributeError): + localPath = None + job = large_image_tasks.tasks.create_tiff.apply_async(kwargs=dict( + girder_job_title='TIFF Conversion: %s' % fileObj['name'], + girder_job_other_fields={'meta': { + 'creator': 'large_image', + 'itemId': str(item['_id']), + 'task': 'convertImage', + }}, + inputFile=GirderFileIdAllowDirect(str(fileObj['_id']), fileObj['name'], localPath), + inputName=fileObj['name'], + outputDir=TemporaryDirectory(), + girder_result_hooks=[ + GirderUploadToFolder( + str(folderId if folderId else item['folderId']), + upload_kwargs=dict(filename=name), + ), + ], + **kwargs, + ), countdown=int(kwargs['countdown']) if kwargs.get('countdown') else None) + return job.job + @classmethod def _tileFromHash(cls, item, x, y, z, mayRedirect=False, **kwargs): tileCache, tileCacheLock = getTileCache() diff --git a/girder/girder_large_image/rest/tiles.py b/girder/girder_large_image/rest/tiles.py index 29b1443a3..90c479a67 100644 --- a/girder/girder_large_image/rest/tiles.py +++ b/girder/girder_large_image/rest/tiles.py @@ -25,8 +25,8 @@ from girder.api.v1.item import Item as ItemResource from girder.api.describe import autoDescribeRoute, describeRoute, Description from girder.api.rest import filtermodel, loadmodel, setRawResponse, setResponseHeader +from girder.constants import AccessType from girder.exceptions import RestException -from girder.models.model_base import AccessType from girder.models.file import File from girder.models.item import Item @@ -99,6 +99,7 @@ def __init__(self, apiRoot): self.resourceName = 'item' apiRoot.item.route('POST', (':itemId', 'tiles'), self.createTiles) + apiRoot.item.route('POST', (':itemId', 'tiles', 'convert'), self.convertImage) apiRoot.item.route('GET', (':itemId', 'tiles'), self.getTilesInfo) apiRoot.item.route('DELETE', (':itemId', 'tiles'), self.deleteTiles) apiRoot.item.route('GET', (':itemId', 'tiles', 'thumbnail'), @@ -139,10 +140,9 @@ def __init__(self, apiRoot): @describeRoute( Description('Create a large image for this item.') - .param('itemId', 'The ID of the item.', paramType='path') - .param('fileId', 'The ID of the source file containing the image. ' - 'Required if there is more than one file in the item.', - required=False) + .param('itemId', 'The source item.', paramType='path') + .param('fileId', 'The source file containing the image. Required if ' + 'there is more than one file in the item.', required=False) .param('force', 'Always use a job to create the large image.', dataType='boolean', default=False, required=False) .param('notify', 'If a job is required to create the large image, ' @@ -190,6 +190,62 @@ def createTiles(self, item, params): except TileGeneralException as e: raise RestException(e.args[0]) + @describeRoute( + Description('Create a new large image item based on an existing item') + .notes('This can be used to make an item that is a different internal ' + 'format than the original item.') + .param('itemId', 'The source item.', paramType='path') + .param('fileId', 'The source file containing the image. Required if ' + 'there is more than one file in the item.', required=False) + .param('folderId', 'The destination folder.', required=False) + .param('name', 'A new name for the output item.', required=False) + .param('localJob', 'If true, run as a local job; if false, run via ' + 'the remote worker', dataType='boolean', default=True, + required=False) + .param('tileSize', 'Tile size', dataType='int', default=256, + required=False) + .param('frame', 'Single frame number. If the source is a multiframe ' + 'image and this value is specified, only the selected frame is ' + 'included in the result.', dataType='int', default=None, + required=False) + .param('compression', 'Internal compression format', required=False, + enum=['none', 'jpeg', 'deflate', 'lzw', 'zstd', 'packbits', 'webp', 'jp2k']) + .param('quality', 'JPEG compression quality where 0 is small and 100 ' + 'is highest quality', dataType='int', default=90, + required=False) + .param('level', 'Compression level for deflate (zip) or zstd.', + dataType='int', required=False) + .param('predictor', 'Predictor for deflate (zip) or lzw.', + required=False, enum=['none', 'horizontal', 'float', 'yes']) + .param('psnr', 'JP2K compression target peak-signal-to-noise-ratio ' + 'where 0 is lossless and otherwise higher numbers are higher ' + 'quality', dataType='int', required=False) + .param('cr', 'JP2K target compression ratio where 1 is lossless', + dataType='int', required=False) + ) + @access.user + @loadmodel(model='item', map={'itemId': 'item'}, level=AccessType.READ) + @filtermodel(model='job', plugin='jobs') + def convertImage(self, item, params): + largeImageFileId = params.get('fileId') + if largeImageFileId is None: + files = list(Item().childFiles(item=item, limit=2)) + if len(files) == 1: + largeImageFileId = str(files[0]['_id']) + if not largeImageFileId: + raise RestException('Missing "fileId" parameter.') + largeImageFile = File().load(largeImageFileId, force=True, exc=True) + user = self.getCurrentUser() + token = self.getCurrentToken() + params.pop('notify', None) + localJob = self.boolParam('localJob', params, default=True) + params.pop('localJob', None) + try: + return self.imageItemModel.convertImage( + item, largeImageFile, user, token, localJob=localJob, **params) + except TileGeneralException as e: + raise RestException(e.args[0]) + @classmethod def _parseTestParams(cls, params): _adjustParams(params) diff --git a/girder/test_girder/test_tiles_rest.py b/girder/test_girder/test_tiles_rest.py index 487e3d58a..aa8f33c69 100644 --- a/girder/test_girder/test_tiles_rest.py +++ b/girder/test_girder/test_tiles_rest.py @@ -14,14 +14,14 @@ from girder.models.token import Token from girder.models.user import User +from girder_jobs.constants import JobStatus from girder_jobs.models.job import Job -from girder_large_image.models.image_item import ImageItem - from large_image import getTileSource from girder_large_image import constants from girder_large_image import getGirderTileSource from girder_large_image import loadmodelcache +from girder_large_image.models.image_item import ImageItem from . import girder_utilities as utilities @@ -134,7 +134,7 @@ def _createTestTiles(server, admin, params=None, info=None, error=None): return infoDict -def _postTileViaHttp(server, admin, itemId, fileId, jobAction=None, data=None): +def _postTileViaHttp(server, admin, itemId, fileId, jobAction=None, data=None, convert=False): """ When we know we need to process a job, we have to use an actual http request rather than the normal simulated request to cherrypy. This is @@ -1198,3 +1198,66 @@ def testTilesFromWithOptions(boundServer, admin, fsAssetstore, girderWorker): assert tileMetadata['sizeX'] == 10000 assert tileMetadata['sizeY'] == 5000 assert tileMetadata['levels'] == 5 + + +@pytest.mark.usefixtures('unbindLargeImage') +@pytest.mark.plugin('large_image') +def testTilesConvertLocal(boundServer, admin, fsAssetstore): + file = utilities.uploadTestFile('grey10kx5k.tif', admin, fsAssetstore) + itemId = str(file['itemId']) + + headers = { + 'Accept': 'application/json', + 'Girder-Token': str(Token().createToken(admin)['_id']) + } + req = requests.post('http://127.0.0.1:%d/api/v1/item/%s/tiles/convert' % ( + boundServer.boundPort, itemId), headers=headers) + assert req.status_code == 200 + job = req.json() + while job['status'] not in (JobStatus.SUCCESS, JobStatus.ERROR, JobStatus.CANCELED): + time.sleep(0.1) + job = Job().load(job['_id'], force=True) + item = Item().findOne({'name': 'grey10kx5k.tiff'}, sort=[('created', SortDir.DESCENDING)]) + itemId = item['_id'] + tileMetadata = ImageItem().getMetadata(item) + assert tileMetadata['tileWidth'] == 256 + assert tileMetadata['tileHeight'] == 256 + assert tileMetadata['sizeX'] == 10000 + assert tileMetadata['sizeY'] == 5000 + assert tileMetadata['levels'] == 7 + assert tileMetadata['magnification'] is None + assert tileMetadata['mm_x'] is None + assert tileMetadata['mm_y'] is None + _testTilesZXY(boundServer, admin, itemId, tileMetadata) + + +@pytest.mark.usefixtures('unbindLargeImage') +@pytest.mark.plugin('large_image') +def testTilesConvertRemote(boundServer, admin, fsAssetstore, girderWorker): + file = utilities.uploadTestFile('grey10kx5k.tif', admin, fsAssetstore) + itemId = str(file['itemId']) + + headers = { + 'Accept': 'application/json', + 'Girder-Token': str(Token().createToken(admin)['_id']) + } + req = requests.post('http://127.0.0.1:%d/api/v1/item/%s/tiles/convert' % ( + boundServer.boundPort, itemId), headers=headers, + data={'localJob': 'false'}) + assert req.status_code == 200 + job = req.json() + while job['status'] not in (JobStatus.SUCCESS, JobStatus.ERROR, JobStatus.CANCELED): + time.sleep(0.1) + job = Job().load(job['_id'], force=True) + item = Item().findOne({'name': 'grey10kx5k.tiff'}, sort=[('created', SortDir.DESCENDING)]) + itemId = item['_id'] + tileMetadata = ImageItem().getMetadata(item) + assert tileMetadata['tileWidth'] == 256 + assert tileMetadata['tileHeight'] == 256 + assert tileMetadata['sizeX'] == 10000 + assert tileMetadata['sizeY'] == 5000 + assert tileMetadata['levels'] == 7 + assert tileMetadata['magnification'] is None + assert tileMetadata['mm_x'] is None + assert tileMetadata['mm_y'] is None + _testTilesZXY(boundServer, admin, itemId, tileMetadata) diff --git a/utilities/tasks/large_image_tasks/tasks.py b/utilities/tasks/large_image_tasks/tasks.py index a027c33a4..afd3c7d0c 100644 --- a/utilities/tasks/large_image_tasks/tasks.py +++ b/utilities/tasks/large_image_tasks/tasks.py @@ -34,13 +34,15 @@ def create_tiff(self, inputFile, outputName=None, outputDir=None, quality=90, :param inputName: if no output name is specified, and this is specified, this is used as the basis of the output name instead of extracting the name from the inputFile path. + :returns: output path. """ import large_image_converter logger = logging.getLogger('large-image-converter') if not len(logger.handlers): logger.addHandler(logging.StreamHandler(sys.stdout)) - logger.setLevel(logging.INFO) + if not logger.level: + logger.setLevel(logging.INFO) inputPath = os.path.abspath(os.path.expanduser(inputFile)) geospatial = large_image_converter.is_geospatial(inputPath) @@ -69,3 +71,78 @@ def create_tiff(self, inputFile, outputName=None, outputDir=None, quality=90, outputPath = renamePath logger.info('Created a file of size %d' % os.path.getsize(outputPath)) return outputPath + + +class JobLogger(logging.Handler): + def __init__(self, level=logging.NOTSET, job=None, *args, **kwargs): + self._job = job + super().__init__(level=level, *args, **kwargs) + + def emit(self, record): + from girder_jobs.models.job import Job + + self._job = Job().updateJob(self._job, log=self.format(record).rstrip() + '\n') + + +def convert_image_job(job): + import psutil + import tempfile + from girder.constants import AccessType + from girder.models.file import File + from girder.models.folder import Folder + from girder.models.item import Item + from girder.models.upload import Upload + from girder.models.user import User + from girder_jobs.constants import JobStatus + from girder_jobs.models.job import Job + + kwargs = job['kwargs'] + item = Item().load(kwargs.pop('itemId'), force=True) + fileObj = File().load(kwargs.pop('fileId'), force=True) + userId = kwargs.pop('userId', None) + user = User().load(userId, force=True) if userId else None + folder = Folder().load(kwargs.pop('folderId', item['folderId']), + user=user, level=AccessType.WRITE) + name = kwargs.pop('name', None) + if '_concurrency' not in kwargs: + # Default to leaving some overhead for the main process, since this is + # running locally + kwargs['_concurrency'] = max(1, psutil.cpu_count(logical=True) - 2) + + job = Job().updateJob( + job, log='Started large image conversion\n', + status=JobStatus.RUNNING) + logger = logging.getLogger('large-image-converter') + handler = JobLogger(job=job) + logger.addHandler(handler) + # We could increase the default logging level here + # logger.setLevel(logging.DEBUG) + try: + with tempfile.TemporaryDirectory() as tempdir: + dest = create_tiff( + inputFile=File().getLocalFilePath(fileObj), + inputName=fileObj['name'], + outputDir=tempdir, + **kwargs, + ) + job = Job().updateJob(job, log='Storing result\n') + with open(dest, 'rb') as fobj: + Upload().uploadFromFile( + fobj, + size=os.path.getsize(dest), + name=name or os.path.basename(dest), + parentType='folder', + parent=folder, + user=user, + ) + except Exception as exc: + status = JobStatus.ERROR + logger.exception('Failed in large image conversion') + job = Job().updateJob( + job, log='Failed in large image conversion (%s)\n' % exc, status=status) + else: + status = JobStatus.SUCCESS + job = Job().updateJob( + job, log='Finished large image conversion\n', status=status) + finally: + logger.removeHandler(handler)