Skip to content

Commit

Permalink
Testing multi-scale tiling
Browse files Browse the repository at this point in the history
  • Loading branch information
cjthomas730 committed Nov 17, 2023
1 parent 9c066dc commit a355beb
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
14 changes: 9 additions & 5 deletions cerulean_cloud/cloud_run_orchestrator/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ def __init__(
self.scale = scale # 1=256, 2=512, 3=...
self.inference_parms = inference_parms

# XXX_CT reworking inference to accept scale as an argument
async def get_base_tile_inference(
self, tile: morecantile.Tile, rescale=(0, 255)
self, tile: morecantile.Tile, rescale=(0, 255), scale=None
) -> InferenceResultStack:
"""fetch inference for base tiles"""
scale = scale or self.scale
img_array = await self.titiler_client.get_base_tile(
sceneid=self.sceneid,
tile=tile,
scale=self.scale,
scale=scale,
rescale=rescale,
)

Expand Down Expand Up @@ -105,17 +107,19 @@ async def get_base_tile_inference(
f"XXX Received unexpected status code: {res.status_code} {res.content}"
)

# XXX_CT reworking inference to accept scale as an argument
async def get_offset_tile_inference(
self, bounds: List[float], rescale=(0, 255)
self, bounds: List[float], rescale=(0, 255), scale=None
) -> InferenceResultStack:
"""fetch inference for offset tiles"""
hw = self.scale * 256
scale = scale or self.scale
hw = scale * 256
img_array = await self.titiler_client.get_offset_tile(
self.sceneid,
*bounds,
width=hw,
height=hw,
scale=self.scale,
scale=scale,
rescale=rescale,
)
img_array = reshape_as_raster(img_array)
Expand Down
39 changes: 37 additions & 2 deletions cerulean_cloud/cloud_run_orchestrator/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def flatten_feature_list(
return flat_list


# XXX_CT reworking perform_inference to accept scale as an argument
async def perform_inference(tiles, inference_func, description, scale=None):
    """
    Perform inference on a set of tiles asynchronously.

    Args:
        tiles: Iterable of tile descriptors (morecantile.Tile objects or
            bounds lists, depending on inference_func) to run inference on.
        inference_func: Async callable invoked as
            inference_func(tile, rescale=(0, 255), scale=scale) per tile.
        description: Human-readable label, used only for progress logging.
        scale: Tile scale factor forwarded to inference_func (1=256px,
            2=512px, ...). Defaults to None, letting the inference client
            fall back to its own configured scale.

    Returns:
        list: Inference results, in the same order as `tiles`.

    Raises:
        Exception: any exception raised by inference_func is re-raised
            immediately (return_exceptions=False in asyncio.gather).
    """
    print(f"Inference on {description}!")
    # NOTE(review): call sites pass `scale` positionally; without this
    # parameter the comprehension below would raise a NameError.
    inferences = await asyncio.gather(
        *[inference_func(tile, rescale=(0, 255), scale=scale) for tile in tiles],
        return_exceptions=False,  # This raises exceptions
    )
    return inferences
Expand Down Expand Up @@ -303,7 +304,9 @@ async def _orchestrate(
print(f"{start_time}: scene_stats: {scene_stats}")
print(f"{start_time}: scene_info: {scene_info}")

# XXX_CT prepping low-res tiles
base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False))
lowres_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False))
# base_tiles_bounds = [tiler.bounds(t) for t in base_tiles]
# base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds)

Expand All @@ -325,7 +328,9 @@ async def _orchestrate(
# Filter out land tiles
# XXXBUG is_tile_over_water throws ValueError if the scene crosses or is close to the antimeridian. Example: S1A_IW_GRDH_1SDV_20230726T183302_20230726T183327_049598_05F6CA_31E7
# XXXBUG is_tile_over_water throws IndexError if the scene touches the Caspian sea (globe says it is NOT ocean, whereas our cloud_function_scene_relevancy says it is). Example: S1A_IW_GRDH_1SDV_20230727T025332_20230727T025357_049603_05F6F2_AF3E
# XXX_CT prepping low-res tiles
base_tiles = [t for t in base_tiles if is_tile_over_water(tiler.bounds(t))]
lowres_tiles = [t for t in lowres_tiles if is_tile_over_water(tiler.bounds(t))]

offset_tiles_bounds = [b for b in offset_tiles_bounds if is_tile_over_water(b)]
offset_2_tiles_bounds = [b for b in offset_2_tiles_bounds if is_tile_over_water(b)]
Expand Down Expand Up @@ -398,26 +403,43 @@ async def _orchestrate(
inference_parms=inference_parms,
)

# XXX_CT perform_inference to accept scale as an argument
base_tiles_inference = await perform_inference(
base_tiles,
cloud_run_inference.get_base_tile_inference,
f"base tiles: {start_time}",
scale,
)

offset_tiles_inference = await perform_inference(
offset_tiles_bounds,
cloud_run_inference.get_offset_tile_inference,
f"offset tiles: {start_time}",
scale,
)

offset_2_tiles_inference = await perform_inference(
offset_2_tiles_bounds,
cloud_run_inference.get_offset_tile_inference,
f"offset2 tiles: {start_time}",
scale,
)

# XXX_CT perform_inference on lowres_tiles, accept scale as an arg
lowres_tiles_inference = await perform_inference(
lowres_tiles,
cloud_run_inference.get_base_tile_inference,
f"base tiles: {start_time}",
scale,
)

del base_tiles
del offset_tiles_bounds
# XXX_CT >> Deleting lowres_tiles, question for JR, do offset_2_tiles_bounds also need to be deleted??
del offset_2_tiles_bounds
del lowres_tiles

# XXX_CT adding lowres, also deleting at the end. Same question for JR but about deleting offset_2_tiles_inference
if model.type == "MASKRCNN":
out_fc = geojson.FeatureCollection(
features=flatten_feature_list(base_tiles_inference)
Expand All @@ -428,8 +450,14 @@ async def _orchestrate(
out_fc_offset_2 = geojson.FeatureCollection(
features=flatten_feature_list(offset_2_tiles_inference)
)
out_fc_lowres = geojson.FeatureCollection(
features=flatten_feature_list(lowres_tiles_inference)
)
del base_tiles_inference
del offset_tiles_inference
del offset_2_tiles_inference
del lowres_tiles_inference

elif model.type == "UNET":
# print("Loading all tiles into memory for merge!")
# ds_base_tiles = []
Expand Down Expand Up @@ -487,8 +515,14 @@ async def _orchestrate(
"Model_type must be one of ['MASKRCNN', 'UNET']"
)

# XXX_CT adding out_fc_lowres
merged_inferences = merge_inferences(
feature_collections=[out_fc, out_fc_offset, out_fc_offset_2],
feature_collections=[
out_fc,
out_fc_offset,
out_fc_offset_2,
out_fc_lowres,
],
proximity_meters=None,
closing_meters=None,
opening_meters=None,
Expand Down Expand Up @@ -556,6 +590,7 @@ async def _orchestrate(
del out_fc
del out_fc_offset
del out_fc_offset_2
del out_fc_lowres
else:
print(f"{start_time}: WARNING: Operating as a DRY RUN!!")
return OrchestratorResult(status="Success")

0 comments on commit a355beb

Please sign in to comment.