Add a conversion endpoint.
This can run either as a local job or a remote job.  The rationale is
that, for images made up of multiple files, the remote job has no
reliable mechanism for accessing all of the files.
manthey committed Mar 29, 2021
1 parent ffd6f63 commit 731fcc6
Showing 4 changed files with 266 additions and 13 deletions.
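For orientation, a minimal sketch of how a client might call the new endpoint, assuming a local Girder server, admin credentials, and a hypothetical item ID; girder_client is one convenient way to issue the request, and this snippet is not part of the commit:

import girder_client

# Assumed local Girder server and admin credentials; adjust as needed.
gc = girder_client.GirderClient(apiUrl='http://localhost:8080/api/v1')
gc.authenticate('admin', 'password')

itemId = '5f0123456789abcdef012345'  # hypothetical source item ID

# POST /item/{itemId}/tiles/convert returns the job document created by
# ImageItem().convertImage; by default the conversion runs as a local job.
job = gc.post('item/%s/tiles/convert' % itemId)
print(job['_id'], job.get('status'))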
65 changes: 61 additions & 4 deletions girder/girder_large_image/models/image_item.py
@@ -55,12 +55,12 @@ def createImageItem(self, item, fileObj, user=None, token=None,
         if 'fileId' in item.setdefault('largeImage', {}):
             raise TileGeneralException('Item already has largeImage set.')
         if fileObj['itemId'] != item['_id']:
-            raise TileGeneralException('The provided file must be in the '
-                                       'provided item.')
+            raise TileGeneralException(
+                'The provided file must be in the provided item.')
         if (item['largeImage'].get('expected') is True and
                 'jobId' in item['largeImage']):
-            raise TileGeneralException('Item is scheduled to generate a '
-                                       'largeImage.')
+            raise TileGeneralException(
+                'Item is scheduled to generate a largeImage.')

         item['largeImage'].pop('expected', None)
         item['largeImage'].pop('sourceName', None)
@@ -111,6 +111,63 @@ def _createLargeImageJob(self, item, fileObj, user, token, **kwargs):
         ), countdown=int(kwargs['countdown']) if kwargs.get('countdown') else None)
         return job.job

+    def convertImage(self, item, fileObj, user=None, token=None, localJob=True, **kwargs):
+        if fileObj['itemId'] != item['_id']:
+            raise TileGeneralException(
+                'The provided file must be in the provided item.')
+        if not localJob:
+            return self._convertImageViaWorker(item, fileObj, user, token, **kwargs)
+        # local job
+        job = Job().createLocalJob(
+            module='large_image_tasks.tasks',
+            function='convert_image_job',
+            kwargs={
+                'itemId': str(item['_id']),
+                'fileId': str(fileObj['_id']),
+                'userId': str(user['_id']) if user else None,
+                **kwargs,
+            },
+            title='Convert a file to a large image file.',
+            type='large_image_convert_image',
+            user=user,
+            public=True,
+            asynchronous=True,
+        )
+        Job().scheduleJob(job)
+        return job
+
+    def _convertImageViaWorker(
+            self, item, fileObj, user=None, token=None, folderId=None,
+            name=None, **kwargs):
+        import large_image_tasks.tasks
+        from girder_worker_utils.transforms.girder_io import GirderUploadToFolder
+        from girder_worker_utils.transforms.contrib.girder_io import GirderFileIdAllowDirect
+        from girder_worker_utils.transforms.common import TemporaryDirectory
+
+        try:
+            localPath = File().getLocalFilePath(fileObj)
+        except (FilePathException, AttributeError):
+            localPath = None
+        job = large_image_tasks.tasks.create_tiff.apply_async(kwargs=dict(
+            girder_job_title='TIFF Conversion: %s' % fileObj['name'],
+            girder_job_other_fields={'meta': {
+                'creator': 'large_image',
+                'itemId': str(item['_id']),
+                'task': 'convertImage',
+            }},
+            inputFile=GirderFileIdAllowDirect(str(fileObj['_id']), fileObj['name'], localPath),
+            inputName=fileObj['name'],
+            outputDir=TemporaryDirectory(),
+            girder_result_hooks=[
+                GirderUploadToFolder(
+                    str(folderId if folderId else item['folderId']),
+                    upload_kwargs=dict(filename=name),
+                ),
+            ],
+            **kwargs,
+        ), countdown=int(kwargs['countdown']) if kwargs.get('countdown') else None)
+        return job.job
+
     @classmethod
     def _tileFromHash(cls, item, x, y, z, mayRedirect=False, **kwargs):
         tileCache, tileCacheLock = getTileCache()
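The localJob switch above leaves the dispatch decision to the caller. Applying the rationale from the commit message (a remote job cannot reliably reach every file of a multi-file image), a caller could choose the path based on the item's file count. The helper below is a sketch for illustration, not code from this commit:

from girder.models.item import Item

from girder_large_image.models.image_item import ImageItem


def convertPreferringWorker(item, fileObj, user=None, token=None, **kwargs):
    # Hypothetical helper: send single-file items to the remote worker, but
    # keep multi-file items local, since the remote job cannot reliably
    # access sibling files of the source file.
    fileCount = len(list(Item().childFiles(item=item, limit=2)))
    return ImageItem().convertImage(
        item, fileObj, user, token, localJob=(fileCount != 1), **kwargs)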
66 changes: 61 additions & 5 deletions girder/girder_large_image/rest/tiles.py
@@ -25,8 +25,8 @@
 from girder.api.v1.item import Item as ItemResource
 from girder.api.describe import autoDescribeRoute, describeRoute, Description
 from girder.api.rest import filtermodel, loadmodel, setRawResponse, setResponseHeader
+from girder.constants import AccessType
 from girder.exceptions import RestException
-from girder.models.model_base import AccessType
 from girder.models.file import File
 from girder.models.item import Item

@@ -99,6 +99,7 @@ def __init__(self, apiRoot):

         self.resourceName = 'item'
         apiRoot.item.route('POST', (':itemId', 'tiles'), self.createTiles)
+        apiRoot.item.route('POST', (':itemId', 'tiles', 'convert'), self.convertImage)
         apiRoot.item.route('GET', (':itemId', 'tiles'), self.getTilesInfo)
         apiRoot.item.route('DELETE', (':itemId', 'tiles'), self.deleteTiles)
         apiRoot.item.route('GET', (':itemId', 'tiles', 'thumbnail'),
@@ -139,10 +140,9 @@ def __init__(self, apiRoot):

     @describeRoute(
         Description('Create a large image for this item.')
-        .param('itemId', 'The ID of the item.', paramType='path')
-        .param('fileId', 'The ID of the source file containing the image. '
-               'Required if there is more than one file in the item.',
-               required=False)
+        .param('itemId', 'The source item.', paramType='path')
+        .param('fileId', 'The source file containing the image. Required if '
+               'there is more than one file in the item.', required=False)
         .param('force', 'Always use a job to create the large image.',
                dataType='boolean', default=False, required=False)
         .param('notify', 'If a job is required to create the large image, '
@@ -190,6 +190,62 @@ def createTiles(self, item, params):
         except TileGeneralException as e:
             raise RestException(e.args[0])

+    @describeRoute(
+        Description('Create a new large image item based on an existing item')
+        .notes('This can be used to make an item that is a different internal '
+               'format than the original item.')
+        .param('itemId', 'The source item.', paramType='path')
+        .param('fileId', 'The source file containing the image. Required if '
+               'there is more than one file in the item.', required=False)
+        .param('folderId', 'The destination folder.', required=False)
+        .param('name', 'A new name for the output item.', required=False)
+        .param('localJob', 'If true, run as a local job; if false, run via '
+               'the remote worker', dataType='boolean', default=True,
+               required=False)
+        .param('tileSize', 'Tile size', dataType='int', default=256,
+               required=False)
+        .param('frame', 'Single frame number. If the source is a multiframe '
+               'image and this value is specified, only the selected frame is '
+               'included in the result.', dataType='int', default=None,
+               required=False)
+        .param('compression', 'Internal compression format', required=False,
+               enum=['none', 'jpeg', 'deflate', 'lzw', 'zstd', 'packbits', 'webp', 'jp2k'])
+        .param('quality', 'JPEG compression quality where 0 is small and 100 '
+               'is highest quality', dataType='int', default=90,
+               required=False)
+        .param('level', 'Compression level for deflate (zip) or zstd.',
+               dataType='int', required=False)
+        .param('predictor', 'Predictor for deflate (zip) or lzw.',
+               required=False, enum=['none', 'horizontal', 'float', 'yes'])
+        .param('psnr', 'JP2K compression target peak-signal-to-noise-ratio '
+               'where 0 is lossless and otherwise higher numbers are higher '
+               'quality', dataType='int', required=False)
+        .param('cr', 'JP2K target compression ratio where 1 is lossless',
+               dataType='int', required=False)
+    )
+    @access.user
+    @loadmodel(model='item', map={'itemId': 'item'}, level=AccessType.READ)
+    @filtermodel(model='job', plugin='jobs')
+    def convertImage(self, item, params):
+        largeImageFileId = params.get('fileId')
+        if largeImageFileId is None:
+            files = list(Item().childFiles(item=item, limit=2))
+            if len(files) == 1:
+                largeImageFileId = str(files[0]['_id'])
+        if not largeImageFileId:
+            raise RestException('Missing "fileId" parameter.')
+        largeImageFile = File().load(largeImageFileId, force=True, exc=True)
+        user = self.getCurrentUser()
+        token = self.getCurrentToken()
+        params.pop('notify', None)
+        localJob = self.boolParam('localJob', params, default=True)
+        params.pop('localJob', None)
+        try:
+            return self.imageItemModel.convertImage(
+                item, largeImageFile, user, token, localJob=localJob, **params)
+        except TileGeneralException as e:
+            raise RestException(e.args[0])
+
     @classmethod
     def _parseTestParams(cls, params):
         _adjustParams(params)
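As a complement to the route description above, here is a sketch of a client that requests a remote conversion with explicit compression options and polls the jobs endpoint until the job settles. The server URL, credentials, item ID, and the numeric JobStatus values (SUCCESS=3, ERROR=4, CANCELED=5 in girder_jobs) are assumptions, not part of this commit:

import time

import girder_client

gc = girder_client.GirderClient(apiUrl='http://localhost:8080/api/v1')  # assumed server
gc.authenticate('admin', 'password')  # assumed credentials

itemId = '5f0123456789abcdef012345'  # hypothetical source item ID

# Ask the remote worker to convert the image with JPEG compression at quality 90.
job = gc.post('item/%s/tiles/convert' % itemId, parameters={
    'localJob': 'false',
    'compression': 'jpeg',
    'quality': 90,
})

# Poll the jobs plugin until the job reaches a terminal state.
while job['status'] not in (3, 4, 5):
    time.sleep(1)
    job = gc.get('job/%s' % job['_id'])
print('Conversion job ended with status', job['status'])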
69 changes: 66 additions & 3 deletions girder/test_girder/test_tiles_rest.py
@@ -14,14 +14,14 @@
 from girder.models.token import Token
 from girder.models.user import User

+from girder_jobs.constants import JobStatus
 from girder_jobs.models.job import Job

-from girder_large_image.models.image_item import ImageItem
-
 from large_image import getTileSource
 from girder_large_image import constants
 from girder_large_image import getGirderTileSource
 from girder_large_image import loadmodelcache
+from girder_large_image.models.image_item import ImageItem

 from . import girder_utilities as utilities

@@ -134,7 +134,7 @@ def _createTestTiles(server, admin, params=None, info=None, error=None):
     return infoDict


-def _postTileViaHttp(server, admin, itemId, fileId, jobAction=None, data=None):
+def _postTileViaHttp(server, admin, itemId, fileId, jobAction=None, data=None, convert=False):
     """
     When we know we need to process a job, we have to use an actual http
     request rather than the normal simulated request to cherrypy. This is
@@ -1198,3 +1198,66 @@ def testTilesFromWithOptions(boundServer, admin, fsAssetstore, girderWorker):
     assert tileMetadata['sizeX'] == 10000
     assert tileMetadata['sizeY'] == 5000
     assert tileMetadata['levels'] == 5
+
+
+@pytest.mark.usefixtures('unbindLargeImage')
+@pytest.mark.plugin('large_image')
+def testTilesConvertLocal(boundServer, admin, fsAssetstore):
+    file = utilities.uploadTestFile('grey10kx5k.tif', admin, fsAssetstore)
+    itemId = str(file['itemId'])
+
+    headers = {
+        'Accept': 'application/json',
+        'Girder-Token': str(Token().createToken(admin)['_id'])
+    }
+    req = requests.post('http://127.0.0.1:%d/api/v1/item/%s/tiles/convert' % (
+        boundServer.boundPort, itemId), headers=headers)
+    assert req.status_code == 200
+    job = req.json()
+    while job['status'] not in (JobStatus.SUCCESS, JobStatus.ERROR, JobStatus.CANCELED):
+        time.sleep(0.1)
+        job = Job().load(job['_id'], force=True)
+    item = Item().findOne({'name': 'grey10kx5k.tiff'}, sort=[('created', SortDir.DESCENDING)])
+    itemId = item['_id']
+    tileMetadata = ImageItem().getMetadata(item)
+    assert tileMetadata['tileWidth'] == 256
+    assert tileMetadata['tileHeight'] == 256
+    assert tileMetadata['sizeX'] == 10000
+    assert tileMetadata['sizeY'] == 5000
+    assert tileMetadata['levels'] == 7
+    assert tileMetadata['magnification'] is None
+    assert tileMetadata['mm_x'] is None
+    assert tileMetadata['mm_y'] is None
+    _testTilesZXY(boundServer, admin, itemId, tileMetadata)
+
+
+@pytest.mark.usefixtures('unbindLargeImage')
+@pytest.mark.plugin('large_image')
+def testTilesConvertRemote(boundServer, admin, fsAssetstore, girderWorker):
+    file = utilities.uploadTestFile('grey10kx5k.tif', admin, fsAssetstore)
+    itemId = str(file['itemId'])
+
+    headers = {
+        'Accept': 'application/json',
+        'Girder-Token': str(Token().createToken(admin)['_id'])
+    }
+    req = requests.post('http://127.0.0.1:%d/api/v1/item/%s/tiles/convert' % (
+        boundServer.boundPort, itemId), headers=headers,
+        data={'localJob': 'false'})
+    assert req.status_code == 200
+    job = req.json()
+    while job['status'] not in (JobStatus.SUCCESS, JobStatus.ERROR, JobStatus.CANCELED):
+        time.sleep(0.1)
+        job = Job().load(job['_id'], force=True)
+    item = Item().findOne({'name': 'grey10kx5k.tiff'}, sort=[('created', SortDir.DESCENDING)])
+    itemId = item['_id']
+    tileMetadata = ImageItem().getMetadata(item)
+    assert tileMetadata['tileWidth'] == 256
+    assert tileMetadata['tileHeight'] == 256
+    assert tileMetadata['sizeX'] == 10000
+    assert tileMetadata['sizeY'] == 5000
+    assert tileMetadata['levels'] == 7
+    assert tileMetadata['magnification'] is None
+    assert tileMetadata['mm_x'] is None
+    assert tileMetadata['mm_y'] is None
+    _testTilesZXY(boundServer, admin, itemId, tileMetadata)
79 changes: 78 additions & 1 deletion utilities/tasks/large_image_tasks/tasks.py
@@ -34,13 +34,15 @@ def create_tiff(self, inputFile, outputName=None, outputDir=None, quality=90,
     :param inputName: if no output name is specified, and this is specified,
         this is used as the basis of the output name instead of extracting the
        name from the inputFile path.
+    :returns: output path.
     """
     import large_image_converter

     logger = logging.getLogger('large-image-converter')
     if not len(logger.handlers):
         logger.addHandler(logging.StreamHandler(sys.stdout))
-        logger.setLevel(logging.INFO)
+    if not logger.level:
+        logger.setLevel(logging.INFO)

     inputPath = os.path.abspath(os.path.expanduser(inputFile))
     geospatial = large_image_converter.is_geospatial(inputPath)
@@ -69,3 +71,78 @@ def create_tiff(self, inputFile, outputName=None, outputDir=None, quality=90,
         outputPath = renamePath
     logger.info('Created a file of size %d' % os.path.getsize(outputPath))
     return outputPath
+
+
+class JobLogger(logging.Handler):
+    def __init__(self, level=logging.NOTSET, job=None, *args, **kwargs):
+        self._job = job
+        super().__init__(level=level, *args, **kwargs)
+
+    def emit(self, record):
+        from girder_jobs.models.job import Job
+
+        self._job = Job().updateJob(self._job, log=self.format(record).rstrip() + '\n')
+
+
+def convert_image_job(job):
+    import psutil
+    import tempfile
+    from girder.constants import AccessType
+    from girder.models.file import File
+    from girder.models.folder import Folder
+    from girder.models.item import Item
+    from girder.models.upload import Upload
+    from girder.models.user import User
+    from girder_jobs.constants import JobStatus
+    from girder_jobs.models.job import Job
+
+    kwargs = job['kwargs']
+    item = Item().load(kwargs.pop('itemId'), force=True)
+    fileObj = File().load(kwargs.pop('fileId'), force=True)
+    userId = kwargs.pop('userId', None)
+    user = User().load(userId, force=True) if userId else None
+    folder = Folder().load(kwargs.pop('folderId', item['folderId']),
+                           user=user, level=AccessType.WRITE)
+    name = kwargs.pop('name', None)
+    if '_concurrency' not in kwargs:
+        # Default to leaving some overhead for the main process, since this is
+        # running locally
+        kwargs['_concurrency'] = max(1, psutil.cpu_count(logical=True) - 2)
+
+    job = Job().updateJob(
+        job, log='Started large image conversion\n',
+        status=JobStatus.RUNNING)
+    logger = logging.getLogger('large-image-converter')
+    handler = JobLogger(job=job)
+    logger.addHandler(handler)
+    # We could increase the default logging level here
+    # logger.setLevel(logging.DEBUG)
+    try:
+        with tempfile.TemporaryDirectory() as tempdir:
+            dest = create_tiff(
+                inputFile=File().getLocalFilePath(fileObj),
+                inputName=fileObj['name'],
+                outputDir=tempdir,
+                **kwargs,
+            )
+            job = Job().updateJob(job, log='Storing result\n')
+            with open(dest, 'rb') as fobj:
+                Upload().uploadFromFile(
+                    fobj,
+                    size=os.path.getsize(dest),
+                    name=name or os.path.basename(dest),
+                    parentType='folder',
+                    parent=folder,
+                    user=user,
+                )
+    except Exception as exc:
+        status = JobStatus.ERROR
+        logger.exception('Failed in large image conversion')
+        job = Job().updateJob(
+            job, log='Failed in large image conversion (%s)\n' % exc, status=status)
+    else:
+        status = JobStatus.SUCCESS
+        job = Job().updateJob(
+            job, log='Finished large image conversion\n', status=status)
+    finally:
+        logger.removeHandler(handler)
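The JobLogger above follows the standard logging.Handler pattern: each record is formatted and mirrored into the Girder job log, and convert_image_job removes the handler in a finally block so handlers do not pile up across jobs. A self-contained sketch of the same pattern, with an in-memory list standing in for Job().updateJob purely for illustration:

import logging


class ListLogger(logging.Handler):
    """Collect formatted records in a list; JobLogger forwards them to
    Job().updateJob instead."""

    def __init__(self, level=logging.NOTSET):
        super().__init__(level=level)
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record).rstrip() + '\n')


logger = logging.getLogger('large-image-converter')
logger.setLevel(logging.INFO)
handler = ListLogger()
logger.addHandler(handler)
try:
    logger.info('Started large image conversion')
finally:
    logger.removeHandler(handler)
print(handler.lines)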
