diff --git a/lib/inat_inferrer.py b/lib/inat_inferrer.py index 9aa6aa1..ec1a690 100644 --- a/lib/inat_inferrer.py +++ b/lib/inat_inferrer.py @@ -1,5 +1,4 @@ import time -import magic import tensorflow as tf import pandas as pd import h3 @@ -8,6 +7,14 @@ import os import tifffile import numpy as np +import urllib +import hashlib +import magic +import aiohttp +import aiofiles +import aiofiles.os +import asyncio + from PIL import Image from lib.tf_gp_elev_model import TFGeoPriorModelElev from lib.vision_inferrer import VisionInferrer @@ -610,6 +617,67 @@ def limit_leaf_scores_that_include_humans(self, leaf_scores): # otherwise return no results return leaf_scores.head(0) + async def embeddings_for_photos(self, photos): + response = {} + async with aiohttp.ClientSession() as session: + queue = asyncio.Queue() + workers = [asyncio.create_task(self.embeddings_worker_task(queue, response, session)) + for _ in range(5)] + for photo in photos: + queue.put_nowait(photo) + await queue.join() + for worker in workers: + worker.cancel() + return response + + async def embeddings_worker_task(self, queue, response, session): + while not queue.empty(): + photo = await queue.get() + try: + embedding = await self.embedding_for_photo(photo["url"], session) + response[photo["id"]] = embedding + finally: + queue.task_done() + + async def embedding_for_photo(self, url, session): + if url is None: + return + + try: + cache_path = await self.download_photo_async(url, session) + if cache_path is None: + return + return self.signature_for_image(cache_path) + except urllib.error.HTTPError: + return + + def signature_for_image(self, image_path): + image = InatInferrer.prepare_image_for_inference(image_path) + return self.vision_inferrer.signature_for_image(image).tolist() + + async def download_photo_async(self, url, session): + checksum = hashlib.md5(url.encode()).hexdigest() + cache_path = os.path.join(self.upload_folder, "download-" + checksum) + ".jpg" + if await aiofiles.os.path.exists(cache_path): + return cache_path + try: + async with session.get(url, timeout=10) as resp: + if resp.status == 200: + f = await aiofiles.open(cache_path, mode="wb") + await f.write(await resp.read()) + await f.close() + except asyncio.TimeoutError as e: + print("`download_photo_async` timed out") + print(e) + if not os.path.exists(cache_path): + return + mime_type = magic.from_file(cache_path, mime=True) + if mime_type != "image/jpeg": + im = Image.open(cache_path) + rgb_im = im.convert("RGB") + rgb_im.save(cache_path) + return cache_path + @staticmethod def prepare_image_for_inference(file_path): image = Image.open(file_path) diff --git a/lib/inat_vision_api.py b/lib/inat_vision_api.py index 3dde5fe..6021224 100644 --- a/lib/inat_vision_api.py +++ b/lib/inat_vision_api.py @@ -30,6 +30,8 @@ def __init__(self, config): self.h3_04_bounds_route, methods=["GET"]) self.app.add_url_rule("/geo_scores_for_taxa", "geo_scores_for_taxa", self.geo_scores_for_taxa_route, methods=["POST"]) + self.app.add_url_rule("/embeddings_for_photos", "embeddings_for_photos", + self.embeddings_for_photos_route, methods=["POST"]) self.app.add_url_rule("/build_info", "build_info", self.build_info_route, methods=["GET"]) def setup_inferrer(self, config): @@ -96,6 +98,12 @@ def geo_scores_for_taxa_route(self): for obs in request.json["observations"] } + async def embeddings_for_photos_route(self): + start_time = time.time() + response = await self.inferrer.embeddings_for_photos(request.json["photos"]) + print("embeddings_for_photos_route Time: %0.2fms" % ((time.time() - start_time) * 1000.)) + return response + def index_route(self): form = ImageForm() if "observation_id" in request.args: @@ -145,8 +153,11 @@ def score_image(self, form, file_path, lat, lng, iconic_taxon_id, geomodel): return InatVisionAPIResponses.aggregated_tree_response( aggregated_scores, self.inferrer ) + embedding = self.inferrer.signature_for_image(file_path) if \ + form.return_embedding.data == "true" else None return InatVisionAPIResponses.aggregated_object_response( - leaf_scores, aggregated_scores, self.inferrer + leaf_scores, aggregated_scores, self.inferrer, + embedding=embedding ) # legacy dict response @@ -154,7 +165,12 @@ def score_image(self, form, file_path, lat, lng, iconic_taxon_id, geomodel): return InatVisionAPIResponses.legacy_dictionary_response(leaf_scores, self.inferrer) if form.format.data == "object": - return InatVisionAPIResponses.object_response(leaf_scores, self.inferrer) + embedding = self.inferrer.signature_for_image(file_path) if \ + form.return_embedding.data == "true" else None + return InatVisionAPIResponses.object_response( + leaf_scores, self.inferrer, + embedding=embedding + ) return InatVisionAPIResponses.array_response(leaf_scores, self.inferrer) diff --git a/lib/inat_vision_api_responses.py b/lib/inat_vision_api_responses.py index 5cf292c..ad00ffe 100644 --- a/lib/inat_vision_api_responses.py +++ b/lib/inat_vision_api_responses.py @@ -20,7 +20,7 @@ def array_response(leaf_scores, inferrer): return InatVisionAPIResponses.array_response_columns(leaf_scores).to_dict(orient="records") @staticmethod - def object_response(leaf_scores, inferrer): + def object_response(leaf_scores, inferrer, embedding=None): leaf_scores = InatVisionAPIResponses.limit_leaf_scores_for_response(leaf_scores) leaf_scores = InatVisionAPIResponses.update_leaf_scores_scaling(leaf_scores) results = InatVisionAPIResponses.array_response_columns( @@ -39,10 +39,13 @@ def object_response(leaf_scores, inferrer): common_ancestor_frame ).to_dict(orient="records")[0] - return { + response = { "common_ancestor": common_ancestor, - "results": results + "results": results, } + if embedding is not None: + response["embedding"] = embedding + return response @staticmethod def aggregated_tree_response(aggregated_scores, inferrer): @@ -73,7 +76,7 @@ def aggregated_tree_response(aggregated_scores, inferrer): return "
" + "
".join(printable_tree) + "
" @staticmethod - def aggregated_object_response(leaf_scores, aggregated_scores, inferrer): + def aggregated_object_response(leaf_scores, aggregated_scores, inferrer, embedding=None): top_leaf_combined_score = aggregated_scores.query( "leaf_class_id.notnull()" ).sort_values( @@ -116,10 +119,13 @@ def aggregated_object_response(leaf_scores, aggregated_scores, inferrer): common_ancestor_frame ).to_dict(orient="records")[0] - return { + response = { "common_ancestor": common_ancestor, - "results": final_results.to_dict(orient="records") + "results": final_results.to_dict(orient="records"), } + if embedding is not None: + response["embedding"] = embedding + return response @staticmethod def limit_leaf_scores_for_response(leaf_scores): diff --git a/lib/templates/home.html b/lib/templates/home.html index 41854b3..1bb7463 100644 --- a/lib/templates/home.html +++ b/lib/templates/home.html @@ -23,9 +23,9 @@

Slim vs Legacy Model

Lng:


+ +

diff --git a/lib/vision_inferrer.py b/lib/vision_inferrer.py index a81e8ee..9654ef8 100644 --- a/lib/vision_inferrer.py +++ b/lib/vision_inferrer.py @@ -16,8 +16,16 @@ def prepare_tf_model(self): assert device.device_type != "GPU" self.vision_model = tf.keras.models.load_model(self.model_path, compile=False) + self.signature_model = tf.keras.Model( + inputs=self.vision_model.inputs, + outputs=self.vision_model.get_layer("global_average_pooling2d_5").output + ) + self.signature_model.compile() # given an image object (usually coming from prepare_image_for_inference), # calculate vision results for the image def process_image(self, image): return self.vision_model(tf.convert_to_tensor(image), training=False)[0] + + def signature_for_image(self, image): + return self.signature_model(tf.convert_to_tensor(image), training=False)[0].numpy() diff --git a/lib/web_forms.py b/lib/web_forms.py index 2c42768..e0e0e67 100644 --- a/lib/web_forms.py +++ b/lib/web_forms.py @@ -13,4 +13,5 @@ class ImageForm(FlaskForm): taxon_id = StringField("taxon_id") geomodel = StringField("geomodel") aggregated = StringField("aggregated") + return_embedding = StringField("return_embedding") format = StringField("format") diff --git a/requirements.txt b/requirements.txt index 5a42153..a66056b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,9 @@ +aiofiles==24.1.0 +aiohttp==3.11.2;python_version>="3.11" +aiohttp==3.10.11;python_version=="3.8" flake8==7.0.0 flake8-quotes==3.4.0 -Flask==3.0.2 +Flask[async]==3.0.2 Flask-WTF==1.2.1 h3==3.7.7 h3pandas==0.2.6 diff --git a/tests/conftest.py b/tests/conftest.py index 7a5660a..f58935f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,4 +36,5 @@ def inatInferrer(request, mocker): os.path.realpath(os.path.dirname(__file__) + "/fixtures/synonyms.csv") } mocker.patch("tensorflow.keras.models.load_model", return_value=MagicMock()) + mocker.patch("tensorflow.keras.Model", return_value=MagicMock()) return InatInferrer(config) diff --git a/tests/test_vision_inferrer.py b/tests/test_vision_inferrer.py index 18830bf..ccf64f1 100644 --- a/tests/test_vision_inferrer.py +++ b/tests/test_vision_inferrer.py @@ -6,6 +6,7 @@ class TestVisionInferrer: def test_initialization(self, mocker): mocker.patch("tensorflow.keras.models.load_model", return_value=MagicMock()) + mocker.patch("tensorflow.keras.Model", return_value=MagicMock()) model_path = "model_path" inferrer = VisionInferrer(model_path) assert inferrer.model_path == model_path @@ -16,6 +17,7 @@ def test_initialization(self, mocker): def test_process_image(self, mocker): mocker.patch("tensorflow.keras.models.load_model", return_value=MagicMock()) + mocker.patch("tensorflow.keras.Model", return_value=MagicMock()) model_path = "model_path" inferrer = VisionInferrer(model_path) theimage = "theimage"