From a355beb6bb63c638eed581f7456efb664dd371c7 Mon Sep 17 00:00:00 2001 From: Christian Thomas Date: Fri, 17 Nov 2023 14:01:39 -0500 Subject: [PATCH] Testing multi-scale tiling --- .../cloud_run_orchestrator/clients.py | 14 ++++--- .../cloud_run_orchestrator/handler.py | 39 ++++++++++++++++++- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/cerulean_cloud/cloud_run_orchestrator/clients.py b/cerulean_cloud/cloud_run_orchestrator/clients.py index 5838b8d6..863f5b25 100644 --- a/cerulean_cloud/cloud_run_orchestrator/clients.py +++ b/cerulean_cloud/cloud_run_orchestrator/clients.py @@ -68,14 +68,16 @@ def __init__( self.scale = scale # 1=256, 2=512, 3=... self.inference_parms = inference_parms + # XXX_CT reworking inference to accept scale as an argument async def get_base_tile_inference( - self, tile: morecantile.Tile, rescale=(0, 255) + self, tile: morecantile.Tile, rescale=(0, 255), scale=None ) -> InferenceResultStack: """fetch inference for base tiles""" + scale = scale or self.scale img_array = await self.titiler_client.get_base_tile( sceneid=self.sceneid, tile=tile, - scale=self.scale, + scale=scale, rescale=rescale, ) @@ -105,17 +107,19 @@ async def get_base_tile_inference( f"XXX Received unexpected status code: {res.status_code} {res.content}" ) + # XXX_CT reworking inference to accept scale as an argument async def get_offset_tile_inference( - self, bounds: List[float], rescale=(0, 255) + self, bounds: List[float], rescale=(0, 255), scale=None ) -> InferenceResultStack: """fetch inference for offset tiles""" - hw = self.scale * 256 + scale = scale or self.scale + hw = scale * 256 img_array = await self.titiler_client.get_offset_tile( self.sceneid, *bounds, width=hw, height=hw, - scale=self.scale, + scale=scale, rescale=rescale, ) img_array = reshape_as_raster(img_array) diff --git a/cerulean_cloud/cloud_run_orchestrator/handler.py b/cerulean_cloud/cloud_run_orchestrator/handler.py index b5feeb95..c9527173 100644 --- a/cerulean_cloud/cloud_run_orchestrator/handler.py +++ b/cerulean_cloud/cloud_run_orchestrator/handler.py @@ -244,6 +244,7 @@ def flatten_feature_list( return flat_list +# XXX_CT reworking perform_inference to accept scale as an argument async def perform_inference(tiles, inference_func, description): """ Perform inference on a set of tiles asynchronously. @@ -262,7 +263,7 @@ async def perform_inference(tiles, inference_func, description): """ print(f"Inference on {description}!") inferences = await asyncio.gather( - *[inference_func(tile, rescale=(0, 255)) for tile in tiles], + *[inference_func(tile, rescale=(0, 255), scale=scale) for tile in tiles], return_exceptions=False, # This raises exceptions ) return inferences @@ -303,7 +304,9 @@ async def _orchestrate( print(f"{start_time}: scene_stats: {scene_stats}") print(f"{start_time}: scene_info: {scene_info}") + # XXX_CT prepping low-res tiles base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False)) + lowres_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False)) # base_tiles_bounds = [tiler.bounds(t) for t in base_tiles] # base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds) @@ -325,7 +328,9 @@ async def _orchestrate( # Filter out land tiles # XXXBUG is_tile_over_water throws ValueError if the scene crosses or is close to the antimeridian. Example: S1A_IW_GRDH_1SDV_20230726T183302_20230726T183327_049598_05F6CA_31E7 # XXXBUG is_tile_over_water throws IndexError if the scene touches the Caspian sea (globe says it is NOT ocean, whereas our cloud_function_scene_relevancy says it is). Example: S1A_IW_GRDH_1SDV_20230727T025332_20230727T025357_049603_05F6F2_AF3E + # XXX_CT prepping low-res tiles base_tiles = [t for t in base_tiles if is_tile_over_water(tiler.bounds(t))] + lowres_tiles = [t for t in lowres_tiles if is_tile_over_water(tiler.bounds(t))] offset_tiles_bounds = [b for b in offset_tiles_bounds if is_tile_over_water(b)] offset_2_tiles_bounds = [b for b in offset_2_tiles_bounds if is_tile_over_water(b)] @@ -398,26 +403,43 @@ async def _orchestrate( inference_parms=inference_parms, ) + # XXX_CT perform_inference to accept scale as an argument base_tiles_inference = await perform_inference( base_tiles, cloud_run_inference.get_base_tile_inference, f"base tiles: {start_time}", + scale, ) offset_tiles_inference = await perform_inference( offset_tiles_bounds, cloud_run_inference.get_offset_tile_inference, f"offset tiles: {start_time}", + scale, ) offset_2_tiles_inference = await perform_inference( offset_2_tiles_bounds, cloud_run_inference.get_offset_tile_inference, f"offset2 tiles: {start_time}", + scale, ) + + # XXX_CT perform_inference on lowres_tiles, accept scale as an arg + lowres_tiles_inference = await perform_inference( + lowres_tiles, + cloud_run_inference.get_base_tile_inference, + f"base tiles: {start_time}", + scale, + ) + del base_tiles del offset_tiles_bounds + # XXX_CT >> Deleting lowres_tiles, question for JR, do offset_2_tiles_bounds also need to be deleted?? + del offset_2_tiles_bounds + del lowres_tiles + # XXX_CT adding lowres, also deleting at the end. Same question for JR but about deleting offset_2_tiles_inference if model.type == "MASKRCNN": out_fc = geojson.FeatureCollection( features=flatten_feature_list(base_tiles_inference) @@ -428,8 +450,14 @@ async def _orchestrate( out_fc_offset_2 = geojson.FeatureCollection( features=flatten_feature_list(offset_2_tiles_inference) ) + out_fc_lowres = geojson.FeatureCollection( + features=flatten_feature_list(lowres_tiles_inference) + ) del base_tiles_inference del offset_tiles_inference + del offset_2_tiles_inference + del lowres_tiles_inference + elif model.type == "UNET": # print("Loading all tiles into memory for merge!") # ds_base_tiles = [] @@ -487,8 +515,14 @@ async def _orchestrate( "Model_type must be one of ['MASKRCNN', 'UNET']" ) + # XXX_CT adding out_fc_lowres merged_inferences = merge_inferences( - feature_collections=[out_fc, out_fc_offset, out_fc_offset_2], + feature_collections=[ + out_fc, + out_fc_offset, + out_fc_offset_2, + out_fc_lowres, + ], proximity_meters=None, closing_meters=None, opening_meters=None, @@ -556,6 +590,7 @@ async def _orchestrate( del out_fc del out_fc_offset del out_fc_offset_2 + del out_fc_lowres else: print(f"{start_time}: WARNING: Operating as a DRY RUN!!") return OrchestratorResult(status="Success")