diff --git a/lib/inat_inferrer.py b/lib/inat_inferrer.py
index 9aa6aa1..ec1a690 100644
--- a/lib/inat_inferrer.py
+++ b/lib/inat_inferrer.py
@@ -1,5 +1,4 @@
 import time
-import magic
 import tensorflow as tf
 import pandas as pd
 import h3
@@ -8,6 +7,15 @@
 import os
 import tifffile
 import numpy as np
+import urllib
+import hashlib
+import uuid
+import magic
+import aiohttp
+import aiofiles
+import aiofiles.os
+import asyncio
+
 from PIL import Image
 from lib.tf_gp_elev_model import TFGeoPriorModelElev
 from lib.vision_inferrer import VisionInferrer
@@ -610,6 +618,111 @@ def limit_leaf_scores_that_include_humans(self, leaf_scores):
         # otherwise return no results
         return leaf_scores.head(0)
 
+    async def embeddings_for_photos(self, photos):
+        """Return a dict mapping each photo's id to its embedding.
+
+        Each item of `photos` is a dict with an "id" and a "url". The photos are
+        fetched by five concurrent workers sharing one HTTP session. A photo
+        that cannot be fetched or processed maps to None.
+        """
+        response = {}
+        async with aiohttp.ClientSession() as session:
+            queue = asyncio.Queue()
+            # queue everything before the workers first run: a worker exits as
+            # soon as it finds the queue empty
+            for photo in photos:
+                queue.put_nowait(photo)
+            workers = [asyncio.create_task(self.embeddings_worker_task(queue, response, session))
+                       for _ in range(5)]
+            try:
+                await queue.join()
+            finally:
+                # reached on cancellation too, so that no worker keeps using the
+                # session after it has been closed
+                for worker in workers:
+                    worker.cancel()
+                await asyncio.gather(*workers, return_exceptions=True)
+        return response
+
+    async def embeddings_worker_task(self, queue, response, session):
+        """Take photos off `queue` until it is empty, filling in `response`.
+
+        A failure on one photo is printed and leaves None as its embedding.
+        Letting the exception escape would end this worker, and once every
+        worker has ended `queue.join()` waits forever on the photos left over.
+        """
+        while not queue.empty():
+            photo = await queue.get()
+            try:
+                photo_id = photo["id"]
+                response[photo_id] = None
+                response[photo_id] = await self.embedding_for_photo(photo.get("url"), session)
+            except Exception as e:
+                print("`embeddings_worker_task` failed")
+                print(e)
+            finally:
+                queue.task_done()
+
+    async def embedding_for_photo(self, url, session):
+        """Return the embedding of the photo at `url`, or None if there is none."""
+        if url is None:
+            return None
+        cache_path = await self.download_photo_async(url, session)
+        if cache_path is None:
+            return None
+        return self.signature_for_image(cache_path)
+
+    def signature_for_image(self, image_path):
+        """Return the vision inferrer's signature for the image file as a list."""
+        image = InatInferrer.prepare_image_for_inference(image_path)
+        return self.vision_inferrer.signature_for_image(image).tolist()
+
+    async def download_photo_async(self, url, session):
+        """Download `url` into the upload folder and return the path of the JPEG.
+
+        The file name is derived from the MD5 of the URL, and an existing file
+        is returned without a new request. Returns None when the response
+        status is not 200 or when the download or JPEG conversion fails.
+        """
+        checksum = hashlib.md5(url.encode()).hexdigest()
+        cache_path = os.path.join(self.upload_folder, "download-" + checksum) + ".jpg"
+        if await aiofiles.os.path.exists(cache_path):
+            return cache_path
+        # Work on a scratch file and rename it into place only once it holds a
+        # complete JPEG. Writing straight to cache_path leaves an empty or
+        # truncated file behind when the transfer fails, and the check above
+        # would then hand out that broken file on every later request.
+        partial_path = "%s.%s.part" % (cache_path, uuid.uuid4().hex)
+        try:
+            # aiohttp 3 deprecates passing a bare number as `timeout`
+            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+                if resp.status != 200:
+                    return None
+                body = await resp.read()
+            async with aiofiles.open(partial_path, mode="wb") as f:
+                await f.write(body)
+            mime_type = magic.from_file(partial_path, mime=True)
+            if mime_type != "image/jpeg":
+                with Image.open(partial_path) as im:
+                    rgb_im = im.convert("RGB")
+                rgb_im.save(partial_path, format="JPEG")
+            os.replace(partial_path, cache_path)
+        except asyncio.TimeoutError as e:
+            print("`download_photo_async` timed out")
+            print(e)
+            return None
+        except (aiohttp.ClientError, OSError) as e:
+            print("`download_photo_async` failed")
+            print(e)
+            return None
+        finally:
+            # the scratch file is already gone after a successful rename
+            try:
+                os.remove(partial_path)
+            except FileNotFoundError:
+                pass
+        return cache_path
+
     @staticmethod
     def prepare_image_for_inference(file_path):
         image = Image.open(file_path)
diff --git a/lib/inat_vision_api.py b/lib/inat_vision_api.py index 3dde5fe..6021224 100644 --- a/lib/inat_vision_api.py +++ b/lib/inat_vision_api.py @@ -30,6 +30,8 @@ def __init__(self, config): self.h3_04_bounds_route, methods=["GET"]) self.app.add_url_rule("/geo_scores_for_taxa", "geo_scores_for_taxa", self.geo_scores_for_taxa_route, methods=["POST"]) + self.app.add_url_rule("/embeddings_for_photos", "embeddings_for_photos", + self.embeddings_for_photos_route, methods=["POST"]) self.app.add_url_rule("/build_info", "build_info", self.build_info_route, methods=["GET"]) def setup_inferrer(self, config): @@ -96,6 +98,12 @@ def geo_scores_for_taxa_route(self): for obs in request.json["observations"] } + async def embeddings_for_photos_route(self): + start_time = time.time() + response = await self.inferrer.embeddings_for_photos(request.json["photos"]) + print("embeddings_for_photos_route Time: %0.2fms" % ((time.time() - start_time) * 1000.)) + return response + def index_route(self): form = ImageForm() if "observation_id" in request.args: @@ -145,8 +153,11 @@ def score_image(self, form, file_path, lat, lng, iconic_taxon_id, geomodel): return InatVisionAPIResponses.aggregated_tree_response( aggregated_scores, self.inferrer ) + embedding = self.inferrer.signature_for_image(file_path) if \ + form.return_embedding.data == 
"true" else None return InatVisionAPIResponses.aggregated_object_response( - leaf_scores, aggregated_scores, self.inferrer + leaf_scores, aggregated_scores, self.inferrer, + embedding=embedding ) # legacy dict response @@ -154,7 +165,12 @@ def score_image(self, form, file_path, lat, lng, iconic_taxon_id, geomodel): return InatVisionAPIResponses.legacy_dictionary_response(leaf_scores, self.inferrer) if form.format.data == "object": - return InatVisionAPIResponses.object_response(leaf_scores, self.inferrer) + embedding = self.inferrer.signature_for_image(file_path) if \ + form.return_embedding.data == "true" else None + return InatVisionAPIResponses.object_response( + leaf_scores, self.inferrer, + embedding=embedding + ) return InatVisionAPIResponses.array_response(leaf_scores, self.inferrer) diff --git a/lib/inat_vision_api_responses.py b/lib/inat_vision_api_responses.py index 5cf292c..ad00ffe 100644 --- a/lib/inat_vision_api_responses.py +++ b/lib/inat_vision_api_responses.py @@ -20,7 +20,7 @@ def array_response(leaf_scores, inferrer): return InatVisionAPIResponses.array_response_columns(leaf_scores).to_dict(orient="records") @staticmethod - def object_response(leaf_scores, inferrer): + def object_response(leaf_scores, inferrer, embedding=None): leaf_scores = InatVisionAPIResponses.limit_leaf_scores_for_response(leaf_scores) leaf_scores = InatVisionAPIResponses.update_leaf_scores_scaling(leaf_scores) results = InatVisionAPIResponses.array_response_columns( @@ -39,10 +39,13 @@ def object_response(leaf_scores, inferrer): common_ancestor_frame ).to_dict(orient="records")[0] - return { + response = { "common_ancestor": common_ancestor, - "results": results + "results": results, } + if embedding is not None: + response["embedding"] = embedding + return response @staticmethod def aggregated_tree_response(aggregated_scores, inferrer): @@ -73,7 +76,7 @@ def aggregated_tree_response(aggregated_scores, inferrer): return "
" + "" @staticmethod - def aggregated_object_response(leaf_scores, aggregated_scores, inferrer): + def aggregated_object_response(leaf_scores, aggregated_scores, inferrer, embedding=None): top_leaf_combined_score = aggregated_scores.query( "leaf_class_id.notnull()" ).sort_values( @@ -116,10 +119,13 @@ def aggregated_object_response(leaf_scores, aggregated_scores, inferrer): common_ancestor_frame ).to_dict(orient="records")[0] - return { + response = { "common_ancestor": common_ancestor, - "results": final_results.to_dict(orient="records") + "results": final_results.to_dict(orient="records"), } + if embedding is not None: + response["embedding"] = embedding + return response @staticmethod def limit_leaf_scores_for_response(leaf_scores): diff --git a/lib/templates/home.html b/lib/templates/home.html index 41854b3..1bb7463 100644 --- a/lib/templates/home.html +++ b/lib/templates/home.html @@ -23,9 +23,9 @@
".join(printable_tree) + "