From 2a0a1c3c66be7ded3000b1398fa56662491c5706 Mon Sep 17 00:00:00 2001 From: rwidom Date: Fri, 13 Jan 2023 17:04:35 -0500 Subject: [PATCH] iNaturalist in-SQL loading (#745) * cleaning and temp table in pg * sketch of full dag NOT TESTED * inaturalist dag without tests or reporting (yet) * complete dag, 25 mill recs in 5.5 hours local test * Add passwords for s3 testing with new docker * make temp loading table UNLOGGED to load it faster * inat with translation 75 million recs in 8 hrs * using OUTPUT_DIR for API files * clarify delayed requester vs requester * DRYer approach to tags TO DO * comments on taxa transformation * scientific names not ids for manual translation * TO DO comment clean-up * fix name insert syntax * Merge 'main' into feature/inaturalist-performance * add clarity on batch limit override * missing piece of merge from main * limit to 20 tags per photo * add option to use alternate dag creation for sql * adjust tests see issue #898 * slightly faster way to pull medium test sample * Note another data source for vernacular names * remove unnecessary test code * clean and upsert one batch at a time * log parsing resource doc * use common.constants.IMAGE instead of MEDIA_TYPE * add explanation of ancestry joins and taxa tags * use existing clean_intermediate_table_data * remove unnecessary env vars from load_to_s3 * declarative doc string for file update check * update iNaturalist description * remove message to Staci :) * use dynamically generated load subtasks * clarify taxa comments and include languages * consolidate consolidation code * add testing for consolidated metrics * separate ti_mock instances per test * test get batches * shorter titles to save space * add better testing instructions * dag parameter to manage post-ingestion deletions * Add kwargs to get_response_json call * get_media_type can be static method Co-authored-by: Krystle Salazar * link to original inaturalist photo, rather than medium Co-authored-by: Krystle Salazar * prefer creator name over login * remove unused constants * add to do for extension cleanup Co-authored-by: Madison Swain-Bowden Co-authored-by: Krystle Salazar --- DAGs.md | 29 +- openverse_catalog/dags/common/loader/sql.py | 3 +- .../provider_api_scripts/inaturalist.py | 421 +++++++++++++++--- .../inaturalist/create_schema.sql | 168 +++++++ .../inaturalist/export_to_json.template.sql | 87 ---- .../inaturalist/observers.sql | 10 + .../inaturalist/photos.sql | 12 +- .../inaturalist/taxa.sql | 249 ++++++++++- .../transformed_table.template.sql | 77 ++++ .../dags/providers/provider_dag_factory.py | 12 +- .../inaturalist/check_creator_metadata.py | 23 + .../inaturalist/exploring_catalog_of_life.py | 113 +++++ .../inaturalist/pull_sample_records.py | 32 +- .../provider_api_scripts/test_inaturalist.py | 158 ++++--- 14 files changed, 1112 insertions(+), 282 deletions(-) delete mode 100644 openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/export_to_json.template.sql create mode 100644 openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/transformed_table.template.sql create mode 100644 tests/dags/providers/provider_api_scripts/resources/inaturalist/check_creator_metadata.py create mode 100644 tests/dags/providers/provider_api_scripts/resources/inaturalist/exploring_catalog_of_life.py diff --git a/DAGs.md b/DAGs.md index 03547bea98b..f53eb4196cc 100644 --- a/DAGs.md +++ b/DAGs.md @@ -301,22 +301,19 @@ and related PRs: Provider: iNaturalist -Output: TSV file containing the media metadata. 
- -Notes: [The iNaturalist API is not intended for data scraping.] -(https://api.inaturalist.org/v1/docs/) - - [But there is a full dump intended for sharing on S3.] - (https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata) - - Because these are very large normalized tables, as opposed to more document - oriented API responses, we found that bringing the data into postgres first - was the most effective approach. [More detail in slack here.] - (https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N) - - We use the table structure defined [here,] - (https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql) - except for adding ancestry tags to the taxa table. +Output: Records loaded to the image catalog table. + +Notes: The iNaturalist API is not intended for data scraping. +https://api.inaturalist.org/v1/docs/ But there is a full dump intended for +sharing on S3. +https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata +Because these are very large normalized tables, as opposed to more document +oriented API responses, we found that bringing the data into postgres first was +the most effective approach. More detail in slack here: +https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N +We use the table structure defined here, +https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql +except for adding ancestry tags to the taxa table. ## `jamendo_workflow` diff --git a/openverse_catalog/dags/common/loader/sql.py b/openverse_catalog/dags/common/loader/sql.py index 6659f79f588..be042a50803 100644 --- a/openverse_catalog/dags/common/loader/sql.py +++ b/openverse_catalog/dags/common/loader/sql.py @@ -71,7 +71,7 @@ def create_loading_table( columns_definition = f"{create_column_definitions(loading_table_columns)}" table_creation_query = dedent( f""" - CREATE TABLE public.{load_table}( + CREATE UNLOGGED TABLE public.{load_table}( {columns_definition}); """ ) @@ -96,6 +96,7 @@ def create_index(column, btree_column=None): create_index(col.PROVIDER.db_name, None) create_index(col.FOREIGN_ID.db_name, "provider") create_index(col.DIRECT_URL.db_name, "provider") + return load_table def load_local_data_to_intermediate_table( diff --git a/openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py b/openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py index 4952d46aa37..d0814eefde3 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py @@ -1,105 +1,184 @@ """ Provider: iNaturalist -Output: TSV file containing the media metadata. - -Notes: [The iNaturalist API is not intended for data scraping.] - (https://api.inaturalist.org/v1/docs/) - - [But there is a full dump intended for sharing on S3.] - (https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata) +Output: Records loaded to the image catalog table. +Notes: The iNaturalist API is not intended for data scraping. + https://api.inaturalist.org/v1/docs/ + But there is a full dump intended for sharing on S3. + https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata Because these are very large normalized tables, as opposed to more document oriented API responses, we found that bringing the data into postgres first - was the most effective approach. 
[More detail in slack here.] - (https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N) - - We use the table structure defined [here,] - (https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql) + was the most effective approach. More detail in slack here: + https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N + We use the table structure defined here, + https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql except for adding ancestry tags to the taxa table. """ +import logging import os +import time +import zipfile +from datetime import timedelta from pathlib import Path import pendulum -from airflow.exceptions import AirflowSkipException -from airflow.operators.python import PythonOperator +import requests +from airflow import XComArg +from airflow.exceptions import AirflowNotFoundException, AirflowSkipException +from airflow.operators.python import PythonOperator, ShortCircuitOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.task_group import TaskGroup -from common.constants import POSTGRES_CONN_ID -from common.licenses import NO_LICENSE_FOUND, get_license_info -from common.loader import provider_details as prov +from airflow.utils.trigger_rule import TriggerRule +from common.constants import IMAGE, POSTGRES_CONN_ID, XCOM_PULL_TEMPLATE +from common.loader import provider_details, reporting, sql from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester +logger = logging.getLogger(__name__) + AWS_CONN_ID = os.getenv("AWS_CONN_ID", "test_conn_id") -PROVIDER = prov.INATURALIST_DEFAULT_PROVIDER SCRIPT_DIR = Path(__file__).parents[1] / "provider_csv_load_scripts/inaturalist" SOURCE_FILE_NAMES = ["photos", "observations", "taxa", "observers"] +LOADER_ARGS = { + "postgres_conn_id": POSTGRES_CONN_ID, + "identifier": "{{ ts_nodash }}", + "media_type": IMAGE, +} +OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/tmp/")) class INaturalistDataIngester(ProviderDataIngester): - providers = {"image": prov.INATURALIST_DEFAULT_PROVIDER} - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.pg = PostgresHook(POSTGRES_CONN_ID) - - # adjustments to buffer limits. TO DO: try to integrate this with the dev - # environment logic in the base class, rather than just over-writing it. - # See https://github.com/WordPress/openverse-catalog/issues/682 - self.media_stores["image"].buffer_length = 10_000 - self.batch_limit = 10_000 - self.sql_template = (SCRIPT_DIR / "export_to_json.template.sql").read_text() + providers = {"image": provider_details.INATURALIST_DEFAULT_PROVIDER} def get_next_query_params(self, prev_query_params=None, **kwargs): - if prev_query_params is None: - return {"offset_num": 0} - else: - next_offset = prev_query_params["offset_num"] + self.batch_limit - return {"offset_num": next_offset} - - def get_response_json(self, query_params: dict): - """ - Call the SQL to pull json from Postgres, where the raw data has been loaded. - """ - sql_string = self.sql_template.format( - batch_limit=self.batch_limit, offset_num=query_params["offset_num"] + raise NotImplementedError( + "Instead we use get_batches to dynamically create subtasks." 
) - sql_result = self.pg.get_records(sql_string) - # Postgres returns a list of tuples, even if it's one tuple with one item. - return sql_result[0][0] + + def get_response_json(self, query_params, **kwargs): + raise NotImplementedError("TSV files from AWS S3 processed in postgres.") def get_batch_data(self, response_json): - if response_json: - return response_json - return None + raise NotImplementedError("TSV files from AWS S3 processed in postgres.") def get_record_data(self, data): - if data.get("foreign_identifier") is None: - # TO DO: maybe raise an error here or after a certain number of these? - # more info in https://github.com/WordPress/openverse-catalog/issues/684 - return None - license_url = data.get("license_url") - license_info = get_license_info(license_url=license_url) - if license_info == NO_LICENSE_FOUND: - return None - record_data = {k: data[k] for k in data.keys() if k != "license_url"} - record_data["license_info"] = license_info - return record_data - - def get_media_type(self, record): + raise NotImplementedError("TSV files from AWS S3 processed in postgres.") + + @staticmethod + def get_media_type(record): # This provider only supports Images via S3, though they have some audio files # on site and in the API. - return "image" + return IMAGE def endpoint(self): raise NotImplementedError("Normalized TSV files from AWS S3 means no endpoint.") + @staticmethod + def get_batches( + batch_length: int, # must be a positive, non-zero integer + postgres_conn_id=POSTGRES_CONN_ID, + ): + pg = PostgresHook(postgres_conn_id) + max_id = pg.get_records("SELECT max(photo_id) FROM inaturalist.photos")[0][0] + if max_id is None: + # This would only happen if there were no data loaded to inaturalist.photos + # yet, but just in case. + return + else: + # Return the list of batch starts and ends, which will be passed to op_args, + # which expects each arg to be a list. So, it's a list of lists, not a list + # of tuples. + return [[(x, x + batch_length - 1)] for x in range(0, max_id, batch_length)] + + @staticmethod + def load_transformed_data( + batch: tuple[int, int], + intermediate_table: str, + identifier: str, + postgres_conn_id=POSTGRES_CONN_ID, + sql_template_file_name="transformed_table.template.sql", + ): + """ + Processes a single batch of inaturalist photo ids. batch_start is the minimum + photo_id for the batch. get_batches generates a list for xcoms to use in + generating tasks that use this function. + """ + start_time = time.perf_counter() + (batch_start, batch_end) = batch + pg = PostgresHook(postgres_conn_id) + sql_template = (SCRIPT_DIR / sql_template_file_name).read_text() + batch_number = int(batch_start / (batch_end - batch_start + 1)) + 1 + logger.info(f"Starting at photo_id {batch_start}, on batch {batch_number}.") + # Load records to the intermediate table + (loaded_records, max_id_loaded) = pg.get_records( + sql_template.format( + intermediate_table=intermediate_table, + batch_start=batch_start, + batch_end=batch_end, + ) + )[0] + logger.info( + f"Inserted {loaded_records} into {intermediate_table}. " + f"Last photo_id loaded was {max_id_loaded}, from batch {batch_number}." + ) + # Run standard cleaning + (missing_columns, foreign_id_dup) = sql.clean_intermediate_table_data( + identifier=identifier, + postgres_conn_id=postgres_conn_id, + ) + # Add transformed records to the target catalog image table. + # TO DO: Would it be better to use loader.upsert_records here? Would need to + # trace back the parameters that need to be passed in for different stats. 
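+        # The upsert copies this batch from the intermediate loading table into the
+        # main image table and returns a count of the records written, which feeds
+        # the per-batch statistics returned below.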
+ upserted_records = sql.upsert_records_to_db_table( + identifier=identifier, + postgres_conn_id=postgres_conn_id, + ) + logger.info(f"Upserted {upserted_records} records, from batch {batch_number}.") + # Truncate the temp table + pg.run(f"truncate table {intermediate_table};") + # Return results for consolidation + end_time = time.perf_counter() + duration = end_time - start_time + return { + "loaded": loaded_records, + "max_id_loaded": max_id_loaded, + "missing_columns": missing_columns, + "foreign_id_dup": foreign_id_dup, + "upserted": upserted_records, + "duration": duration, + } + + @staticmethod + def consolidate_load_statistics(all_results, ti): + """ + all_results should be a list of all of the return_values from dynamically + generated subtasks under load_transformed_data. This just totals the individual + stats from each step. + """ + if all_results is None: + return + else: + METRICS = ["loaded", "missing_columns", "foreign_id_dup", "upserted"] + metric_output = {} + for metric in METRICS: + metric_output[metric] = sum([x[metric] for x in all_results]) + # url dups are just a remainder, per common.loader.upsert_data + metric_output["url_dup"] = ( + metric_output["loaded"] + - metric_output["missing_columns"] + - metric_output["foreign_id_dup"] + - metric_output["upserted"] + ) + metric_output.pop("loaded") + # splitting metrics to be consistent with common.reporting.report_completion + ti.xcom_push(key="duration", value=[x["duration"] for x in all_results]) + return {IMAGE: reporting.RecordMetrics(**metric_output)} + @staticmethod def compare_update_dates( last_success: pendulum.DateTime | None, s3_keys: list, aws_conn_id=AWS_CONN_ID @@ -121,6 +200,77 @@ def compare_update_dates( # If no files have been updated, skip the DAG raise AirflowSkipException("Nothing new to ingest") + @staticmethod + def load_catalog_of_life_names(remove_api_files: bool): + COL_URL = "https://api.checklistbank.org/dataset/9840/export.zip?format=ColDP" + local_zip_file = "COL_archive.zip" + name_usage_file = "NameUsage.tsv" + vernacular_file = "VernacularName.tsv" + # download zip file from Catalog of Life + if (OUTPUT_DIR / local_zip_file).exists(): + logger.info( + f"{OUTPUT_DIR}/{local_zip_file} exists, so no Catalog of Life download." + ) + else: + # This is a static method so that it can be used to create preingestion + # tasks for airflow. Unfortunately, that means it does not have access to + # the delayed requester. So, we are just using requests for now. 
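+            # Stream the archive to disk in small chunks so the whole zip file never
+            # has to be held in memory.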
+ with requests.get(COL_URL, stream=True) as response: + response.raise_for_status() + with open(OUTPUT_DIR / local_zip_file, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + logger.info( + f"Saved Catalog of Life download: {OUTPUT_DIR}/{local_zip_file}" + ) + # Extract specific files we need from the zip file + if (OUTPUT_DIR / name_usage_file).exists() and ( + OUTPUT_DIR / vernacular_file + ).exists(): + logger.info("No extract, both Catalog of Life tsv files exist.") + else: + with zipfile.ZipFile(OUTPUT_DIR / local_zip_file) as z: + with open(OUTPUT_DIR / name_usage_file, "wb") as f: + f.write(z.read(name_usage_file)) + logger.info(f"Extracted raw file: {OUTPUT_DIR}/{name_usage_file}") + with open(OUTPUT_DIR / vernacular_file, "wb") as f: + f.write(z.read(vernacular_file)) + logger.info(f"Extracted raw file: {OUTPUT_DIR}/{vernacular_file}") + # set up for loading data + pg = PostgresHook(POSTGRES_CONN_ID) + COPY_SQL = ( + "COPY inaturalist.{} FROM STDIN " + "DELIMITER E'\t' CSV HEADER QUOTE E'\b' NULL AS ''" + ) + COUNT_SQL = "SELECT count(*) FROM inaturalist.{};" + # upload vernacular names file to postgres + pg.copy_expert(COPY_SQL.format("col_vernacular"), OUTPUT_DIR / vernacular_file) + vernacular_records = pg.get_records(COUNT_SQL.format("col_vernacular")) + if vernacular_records[0][0] == 0: + raise AirflowNotFoundException("No Catalog of Life vernacular data loaded.") + else: + logger.info( + f"Loaded {vernacular_records[0][0]} records from {vernacular_file}" + ) + # upload name usage file to postgres + pg.copy_expert(COPY_SQL.format("col_name_usage"), OUTPUT_DIR / name_usage_file) + name_usage_records = pg.get_records(COUNT_SQL.format("col_name_usage")) + if name_usage_records[0][0] == 0: + raise AirflowNotFoundException("No Catalog of Life name usage data loaded.") + else: + logger.info( + f"Loaded {name_usage_records[0][0]} records from {name_usage_file}" + ) + # TO DO #917: save source files on s3? 
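+        # Optionally remove the local Catalog of Life files now that their contents
+        # have been loaded to postgres; controlled by the DAG-level cleanup parameter.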
+ if remove_api_files: + os.remove(OUTPUT_DIR / local_zip_file) + os.remove(OUTPUT_DIR / vernacular_file) + os.remove(OUTPUT_DIR / name_usage_file) + return { + "COL Name Usage Records": name_usage_records[0][0], + "COL Vernacular Records": vernacular_records[0][0], + } + @staticmethod def create_preingestion_tasks(): @@ -135,31 +285,156 @@ def create_preingestion_tasks(): f"{file_name}.csv.gz" for file_name in SOURCE_FILE_NAMES ], }, + doc_md="Check for iNaturalist files added to S3 since last load", ) create_inaturalist_schema = SQLExecuteQueryOperator( task_id="create_inaturalist_schema", conn_id=POSTGRES_CONN_ID, sql=(SCRIPT_DIR / "create_schema.sql").read_text(), + doc_md="Create temporary schema and license table", + ) + + load_catalog_of_life_names = PythonOperator( + task_id="load_catalog_of_life_names", + python_callable=INaturalistDataIngester.load_catalog_of_life_names, + doc_md="Load vernacular taxon names from Catalog of Life", + op_kwargs={ + "remove_api_files": "{{params.sql_rm_source_data_after_ingesting}}" + }, ) - with TaskGroup(group_id="load_source_files") as load_source_files: + ( + check_for_file_updates + >> create_inaturalist_schema + >> load_catalog_of_life_names + ) + + return preingestion_tasks + + @staticmethod + def create_postingestion_tasks(): + with TaskGroup(group_id="postingestion_tasks") as postingestion_tasks: + check_drop_parameter = ShortCircuitOperator( + task_id="check_drop_parameter", + doc_md="Skip post-ingestion if NOT sql_rm_source_data_after_ingesting.", + op_args=["{{ params.sql_rm_source_data_after_ingesting }}"], + python_callable=(lambda x: x), + trigger_rule=TriggerRule.NONE_SKIPPED, + # just skip the drop steps, not the final reporting step in the dag + ignore_downstream_trigger_rules=False, + ) + drop_inaturalist_schema = SQLExecuteQueryOperator( + task_id="drop_inaturalist_schema", + conn_id=POSTGRES_CONN_ID, + sql="DROP SCHEMA IF EXISTS inaturalist CASCADE", + doc_md="Drop iNaturalist source tables and their schema", + ) + drop_loading_table = PythonOperator( + task_id="drop_loading_table", + python_callable=sql.drop_load_table, + op_kwargs=LOADER_ARGS, + doc_md="Drop the temporary (transformed) loading table", + ) + (check_drop_parameter >> [drop_inaturalist_schema, drop_loading_table]) + return postingestion_tasks + + @staticmethod + def create_ingestion_workflow(): + + with TaskGroup(group_id="ingest_data") as ingest_data: + + preingestion_tasks = INaturalistDataIngester.create_preingestion_tasks() + + with TaskGroup(group_id="pull_image_data") as pull_data: for source_name in SOURCE_FILE_NAMES: SQLExecuteQueryOperator( task_id=f"load_{source_name}", conn_id=POSTGRES_CONN_ID, sql=(SCRIPT_DIR / f"{source_name}.sql").read_text(), + doc_md=f"Load iNaturalist {source_name} from s3 to postgres", ), - (check_for_file_updates >> create_inaturalist_schema >> load_source_files) + with TaskGroup(group_id="load_image_data") as loader_tasks: - return preingestion_tasks + # Using the existing set up, but the indexes on the temporary table + # probably slows down the load a bit. + create_loading_table = PythonOperator( + task_id="create_loading_table", + python_callable=sql.create_loading_table, + op_kwargs=LOADER_ARGS, + doc_md=( + "Create a temp table for ingesting data from inaturalist " + "source tables." 
+ ), + ) - @staticmethod - def create_postingestion_tasks(): - drop_inaturalist_schema = SQLExecuteQueryOperator( - task_id="drop_inaturalist_schema", - conn_id=POSTGRES_CONN_ID, - sql="DROP SCHEMA IF EXISTS inaturalist CASCADE", - ) - return drop_inaturalist_schema + get_batches = PythonOperator( + task_id="get_batches", + python_callable=INaturalistDataIngester.get_batches, + op_kwargs={ + "batch_length": 2_000_000, + "postgres_conn_id": LOADER_ARGS["postgres_conn_id"], + }, + ) + + load_transformed_data = PythonOperator.partial( + task_id="load_transformed_data", + python_callable=INaturalistDataIngester.load_transformed_data, + # In testing this locally, the longest iteration took 39 minutes, + # median was 18 minutes. We should probably adjust the timeouts with + # more info from production runs. + execution_timeout=timedelta(hours=1), + retries=0, + max_active_tis_per_dag=1, + op_kwargs={ + "intermediate_table": XCOM_PULL_TEMPLATE.format( + create_loading_table.task_id, "return_value" + ), + "identifier": LOADER_ARGS["identifier"], + }, + doc_md=( + "Load one batch of data from source tables to target table." + ), + ).expand( + op_args=XComArg(get_batches), + ) + + consolidate_load_statistics = PythonOperator( + task_id="consolidate_load_statistics", + python_callable=INaturalistDataIngester.consolidate_load_statistics, + op_kwargs={ + "all_results": XCOM_PULL_TEMPLATE.format( + load_transformed_data.task_id, "return_value" + ), + }, + doc_md=( + "Total load counts across batches from load_transformed_data." + ), + retries=0, + ) + + ( + create_loading_table + >> get_batches + >> load_transformed_data + >> consolidate_load_statistics + ) + + postingestion_tasks = INaturalistDataIngester.create_postingestion_tasks() + + (preingestion_tasks >> pull_data >> loader_tasks >> postingestion_tasks) + + # Reporting on the time it takes to load transformed data into the intermediate + # table, clean it, and upsert it to the final target. This is not strictly + # comparable to the time it takes to load from the s3 source. + ingestion_metrics = { + "duration": XCOM_PULL_TEMPLATE.format( + consolidate_load_statistics.task_id, "duration" + ), + "record_counts_by_media_type": XCOM_PULL_TEMPLATE.format( + consolidate_load_statistics.task_id, "return_value" + ), + } + + return ingest_data, ingestion_metrics diff --git a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql index 8c4d1287fc2..736e4b8453e 100644 --- a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql +++ b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql @@ -2,3 +2,171 @@ CREATE SCHEMA IF NOT EXISTS inaturalist; COMMIT; SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'inaturalist'; + +/* +LICENSE LOOKUP +Everything on iNaturalist is holding at version 4, except CC0 which is version 1.0. 
+License versions below are hard-coded from inaturalist +https://github.com/inaturalist/inaturalist/blob/d338ba76d82af83d8ad0107563015364a101568c/app/models/shared/license_module.rb#L5 +*/ + +DROP TABLE IF EXISTS inaturalist.license_codes; +COMMIT; + +/* +_enrich_metadata calls for both license_url and raw_license_url, but there is no +raw license_url here, it's all calculated +https://github.com/WordPress/openverse-catalog/blob/337ea7aede228609cbd5031e3a501f22b6ccc482/openverse_catalog/dags/common/storage/media.py#L247 +*/ +CREATE TABLE inaturalist.license_codes ( + inaturalist_code varchar(50), + license_name varchar(255), + license_url_metadata jsonb, + openverse_code varchar(50), + license_version varchar(25) +); +COMMIT; + +INSERT INTO inaturalist.license_codes + (inaturalist_code, license_name, license_url_metadata, openverse_code, license_version) + VALUES + ('CC-BY-NC-SA', 'Creative Commons Attribution-NonCommercial-ShareAlike License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by-nc-sa/4.0/'), 'by-nc-sa', '4.0'), + ('CC-BY-NC', 'Creative Commons Attribution-NonCommercial License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by-nc/4.0/'), 'by-nc', '4.0'), + ('CC-BY-NC-ND', 'Creative Commons Attribution-NonCommercial-NoDerivs License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by-nc-nd/4.0/'), 'by-nc-nd', '4.0'), + ('CC-BY', 'Creative Commons Attribution License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by/4.0/'), 'by', '4.0'), + ('CC-BY-SA', 'Creative Commons Attribution-ShareAlike License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by-sa/4.0/'), 'by-sa', '4.0'), + ('CC-BY-ND', 'Creative Commons Attribution-NoDerivs License', jsonb_build_object('license_url', 'http://creativecommons.org/licenses/by-nd/4.0/'), 'by-nd', '4.0'), + ('PD', 'Public domain', jsonb_build_object('license_url', 'http://en.wikipedia.org/wiki/Public_domain'), 'pdm', ''), + ('GFDL', 'GNU Free Documentation License', jsonb_build_object('license_url', 'http://www.gnu.org/copyleft/fdl.html'), 'gfdl', ''), + ('CC0', 'Creative Commons CC0 Universal Public Domain Dedication', jsonb_build_object('license_url', 'http://creativecommons.org/publicdomain/zero/1.0/'), 'cc0', '1.0'); +COMMIT; + +/* +SPECIES NAMES +The Catalog of Life (COL) has data on vernacular names which we use to optimize titles +and tags based on iNaturalist taxon information. But there a few very common taxon_ids +that do not have matches in the COL so I am adding them hard coded here. + +Another option would be the Integrated Taxonomic Information System +https://www.itis.gov/dwca_format.html which also has vernacular names / synonyms. 
+*/ + +DROP TABLE IF EXISTS inaturalist.col_vernacular; +COMMIT; + +CREATE TABLE inaturalist.col_vernacular ( + taxonID varchar(5), + sourceID decimal, + taxon_name varchar(2000), + transliteration text, + name_language varchar(3), + country varchar(3), + area varchar(2000), + sex decimal, + referenceID decimal +); +COMMIT; + +DROP TABLE IF EXISTS inaturalist.col_name_usage; +COMMIT; + +CREATE TABLE inaturalist.col_name_usage ( + ID varchar(50), + alternativeID decimal, + nameAlternativeID decimal, + sourceID decimal, + parentID varchar(5), + basionymID varchar(5), + status varchar(22), + scientificName varchar(76), + authorship varchar(255), + rank varchar(21), + notho varchar(13), + uninomial varchar(50), + genericName varchar(50), + infragenericEpithet varchar(25), + specificEpithet varchar(50), + infraspecificEpithet varchar(50), + cultivarEpithet varchar(50), + namePhrase varchar(80), + nameReferenceID varchar(36), + publishedInYear decimal, + publishedInPage varchar(255), + publishedInPageLink varchar(255), + code varchar(10), + nameStatus varchar(15), + accordingToID varchar(36), + accordingToPage decimal, + accordingToPageLink decimal, + referenceID text, + scrutinizer varchar(149), + scrutinizerID decimal, + scrutinizerDate varchar(10), + extinct boolean, + temporalRangeStart varchar(15), + temporalRangeEnd varchar(15), + environment varchar(38), + species decimal, + section decimal, + subgenus decimal, + genus decimal, + subtribe decimal, + tribe decimal, + subfamily decimal, + taxon_family decimal, + superfamily decimal, + suborder decimal, + taxon_order decimal, + subclass decimal, + taxon_class decimal, + subphylum decimal, + phylum decimal, + kingdom decimal, + sequenceIndex decimal, + branchLength decimal, + link varchar(240), + nameRemarks decimal, + remarks text +); +COMMIT; + +DROP TABLE IF EXISTS inaturalist.manual_name_additions; +COMMIT; + +CREATE TABLE inaturalist.manual_name_additions ( + md5_scientificname uuid, + vernacular_name varchar(100) +); +with records as + ( + select cast(md5('Animalia') as uuid) as md5_scientificname, 'Animals' as vernacular_name + union all + select cast(md5('Araneae') as uuid) as md5_scientificname, 'Spider' as vernacular_name + union all + select cast(md5('Magnoliopsida') as uuid) as md5_scientificname, 'Flowers' as vernacular_name + union all + select cast(md5('Plantae') as uuid) as md5_scientificname, 'Plants' as vernacular_name + union all + select cast(md5('Lepidoptera') as uuid) as md5_scientificname, 'Butterflies and Moths' as vernacular_name + union all + select cast(md5('Insecta') as uuid) as md5_scientificname, 'Insect' as vernacular_name + union all + select cast(md5('Agaricales') as uuid) as md5_scientificname, 'Mushroom' as vernacular_name + union all + select cast(md5('Poaceae') as uuid) as md5_scientificname, 'Grass' as vernacular_name + union all + select cast(md5('Asteraceae') as uuid) as md5_scientificname, 'Daisy' as vernacular_name + union all + select cast(md5('Danaus plexippus') as uuid) as md5_scientificname, 'Monarch Butterfly' as vernacular_name + union all + select cast(md5('Felinae') as uuid) as md5_scientificname, 'Cats' as vernacular_name + union all + select cast(md5('Canis') as uuid) as md5_scientificname, 'Dogs' as vernacular_name + ) +INSERT INTO inaturalist.manual_name_additions +(select * from records); +COMMIT; + +select distinct table_schema +from information_schema.tables +where table_schema='inaturalist'; diff --git 
a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/export_to_json.template.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/export_to_json.template.sql deleted file mode 100644 index d7cfef2f058..00000000000 --- a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/export_to_json.template.sql +++ /dev/null @@ -1,87 +0,0 @@ -/* -------------------------------------------------------------------------------- -EXPORT TO JSON -------------------------------------------------------------------------------- - - ** Please note: This SQL will not run as is! You must replace offset_num and - batch_limit with integers representing the records you want to retrieve. - -Joining two very large normalized tables is difficult, in any data manipulation system. -PHOTOS has on the order of 120 million records, and OBSERVATIONS has on the order of -70 million records. We have to join them to get at the taxa (species) information for -any given photo. Taxa are the only descriptive text we have for inaturalist photos. - -The postgres query optimizer is smart enough to use limit and offset to make this like -joining multiple small-ish tables to a single large one which is not so difficult. The -ORDER BY requires a sort on each call, but the index on INATURALIST.PHOTOS.PHOTO_ID -makes that manageable. (More info on limit / offset and query optimization at -https://www.postgresql.org/docs/current/queries-limit.html) - -This query calls for 100 records at a time to match the pace of the python ingester. - -Alternative approaches considered: -- Let python do the pagination using a cursor: Joining the two full tables in one step - requires working through 120 million * 70 million possible combinations, and when - tested with the full dataset after 13 minutes the local machine was physically hot and - had not yet returned a single record. -- Use database pagination instead of limit/offset: This would have avoided the need to - sort / index, but might have introduced data quality risks (if postgres moved - things around while the job was running) and some pages appeared empty which required - more complicated python logic for retries. More on this approach at: - https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/ - -Everything on iNaturalist is holding at version 4, except CC0 which is version 1.0. 
-License versions below are hard-coded from inaturalist -https://github.com/inaturalist/inaturalist/blob/d338ba76d82af83d8ad0107563015364a101568c/app/models/shared/license_module.rb#L5 -*/ - -select json_agg(records) -from ( - SELECT - INATURALIST.PHOTOS.PHOTO_ID as foreign_identifier, - INATURALIST.PHOTOS.WIDTH, - INATURALIST.PHOTOS.HEIGHT, - INATURALIST.TAXA.NAME as title, - array_to_json(INATURALIST.TAXA.ancestor_names) as raw_tags, - LOWER(INATURALIST.PHOTOS.EXTENSION) as filetype, - (CASE - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY-NC-SA' - THEN 'http://creativecommons.org/licenses/by-nc-sa/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY-NC' - THEN 'http://creativecommons.org/licenses/by-nc/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY-NC-ND' - THEN 'http://creativecommons.org/licenses/by-nc-nd/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY' - THEN 'http://creativecommons.org/licenses/by/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY-SA' - THEN 'http://creativecommons.org/licenses/by-sa/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC-BY-ND' - THEN 'http://creativecommons.org/licenses/by-nd/4.0/' - WHEN INATURALIST.PHOTOS.LICENSE = 'PD' - THEN 'http://en.wikipedia.org/wiki/Public_domain' - WHEN INATURALIST.PHOTOS.LICENSE = 'CC0' - THEN 'http://creativecommons.org/publicdomain/zero/1.0/' - END) as license_url, - 'https://www.inaturalist.org/photos/' || INATURALIST.PHOTOS.PHOTO_ID - as foreign_landing_url, - 'https://inaturalist-open-data.s3.amazonaws.com/photos/' - || INATURALIST.PHOTOS.PHOTO_ID || '/medium.' || INATURALIST.PHOTOS.EXTENSION - as image_url, - COALESCE(INATURALIST.OBSERVERS.LOGIN, INATURALIST.PHOTOS.OBSERVER_ID::text) - as creator, - 'https://www.inaturalist.org/users/' || INATURALIST.PHOTOS.OBSERVER_ID - as creator_url - FROM INATURALIST.PHOTOS - INNER JOIN - INATURALIST.OBSERVATIONS ON - INATURALIST.PHOTOS.OBSERVATION_UUID = INATURALIST.OBSERVATIONS.OBSERVATION_UUID - INNER JOIN - INATURALIST.OBSERVERS ON - INATURALIST.PHOTOS.OBSERVER_ID = INATURALIST.OBSERVERS.OBSERVER_ID - INNER JOIN - INATURALIST.TAXA ON - INATURALIST.OBSERVATIONS.TAXON_ID = INATURALIST.TAXA.TAXON_ID - ORDER BY PHOTO_ID - LIMIT {batch_limit} - OFFSET {offset_num} -) as records; diff --git a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/observers.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/observers.sql index 52672fdba43..2c021c74318 100644 --- a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/observers.sql +++ b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/observers.sql @@ -25,6 +25,16 @@ SELECT aws_s3.table_import_from_s3('inaturalist.observers', 'us-east-1'); ALTER TABLE inaturalist.observers ADD PRIMARY KEY (observer_id); + +/* +We prefer to use name, but many iNaturalist users only have a login id, not a name in +the observers table. +As of data from January 2023, out of 589,290 records, only 107,840 (18%) had name data. +Presumably, those with more photos/observations are more likely to have their name in +the table. Still, we will use the login if the name is unavailable. 
+*/ +ALTER TABLE inaturalist.observers ADD COLUMN creator varchar(255) ; +UPDATE inaturalist.observers SET creator = coalesce(name, login) ; COMMIT; SELECT count(*) FROM inaturalist.observers; diff --git a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/photos.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/photos.sql index d53a173462c..ed4b0181bf5 100644 --- a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/photos.sql +++ b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/photos.sql @@ -6,8 +6,6 @@ PHOTOS constraint on observer_id in order to save load time -- photo_id is not unique. There are ~130,000 duplicate photo_ids (~0.1% of photos). Both records are saved to the TSV and only one is loaded back into to postgres. --- TO DO: See https://github.com/WordPress/openverse-catalog/issues/685 for more on - handling duplicate photo ids. Taking DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql @@ -41,4 +39,14 @@ SELECT aws_s3.table_import_from_s3('inaturalist.photos', -- more here: https://www.postgresql.org/docs/current/indexes-ordering.html CREATE INDEX ON INATURALIST.PHOTOS USING btree (PHOTO_ID); +DROP TABLE IF EXISTS inaturalist.photo_dupes; +CREATE TABLE inaturalist.photo_dupes as ( + SELECT PHOTO_ID, count(*) PHOTO_RECORDS + FROM INATURALIST.PHOTOS + GROUP BY PHOTO_ID + HAVING COUNT(*)>1 +); +ALTER TABLE inaturalist.photo_dupes ADD PRIMARY KEY (PHOTO_ID); +COMMIT; + SELECT count(*) FROM inaturalist.photos; diff --git a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/taxa.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/taxa.sql index 347508b9147..35bdf4a7198 100644 --- a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/taxa.sql +++ b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/taxa.sql @@ -3,51 +3,256 @@ TAXA ------------------------------------------------------------------------------- -Taking DDL from +Taking iNaturalist DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql -Plus adding a field for ancestry tags. + +Integrating data from the Catalog of Life to create titles and tags. + +Integrating data from iNaturalist taxonomy table with Catalog of Life name data. + +Title: + - If the scientific name has one or more vernacular names, collect as many as will + fit into the title, separated by commas. + - If not, there are a few taxa that are very common where I googled an English name + and added it manually, so use that. + - Otherwise, use the iNaturalist name. There are a few where the name is "Not + assigned" but we're going to filter those records out & drop associated photos. + +Tags: + - If the title of a specific taxa is a vernacular name (from Catalog of Life or from + informal googling), put the iNaturalist name in the tags. + - If there are additional vernacular names that did not fit in the title, put them + in the tags. + - Put the titles of ancestors in the tags. + - Given the order of types of tags above plus alphabetical order, take only the + first 20 tags. 
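+
+For example (illustrative values only, not actual Catalog of Life output): a species
+whose iNaturalist scientific name is 'Anas platyrhynchos' might end up with
+    title: 'mallard, common mallard'
+    tags:  [{"name": "Anas platyrhynchos", "provider": "inaturalist"},
+            {"name": "ducks, geese, and swans", "provider": "inaturalist"}, ...]
+where the first tag carries the iNaturalist name (because a vernacular name won the
+title), and the remaining tags are the titles of its ancestors plus any vernacular
+names that did not fit in the title.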
+ +Representing tags in this way to be consistent with python processing `_enrich_tags`: +TO DO #902: Find a DRYer way to do this enrichment with SQL */ -DROP TABLE IF EXISTS inaturalist.taxa; -COMMIT; +/* + ********** Create tag data type *********** +This at least makes the structure of tags a little more explicit in sql +*/ +DO $$ BEGIN + create type openverse_tag as (name varchar(255), provider varchar(255)); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; +/* ********** Load raw iNaturalist Data ********* */ +DROP TABLE IF EXISTS inaturalist.taxa; CREATE TABLE inaturalist.taxa ( taxon_id integer, ancestry character varying(255), rank_level double precision, rank character varying(255), name character varying(255), - active boolean, - ancestor_names varchar[] + active boolean ); -COMMIT; - SELECT aws_s3.table_import_from_s3('inaturalist.taxa', 'taxon_id, ancestry, rank_level, rank, name, active', '(FORMAT ''csv'', DELIMITER E''\t'', HEADER, QUOTE E''\b'')', 'inaturalist-open-data', 'taxa.csv.gz', 'us-east-1'); - ALTER TABLE inaturalist.taxa ADD PRIMARY KEY (taxon_id); COMMIT; -WITH aggregated AS +/* + ********** Integrate Catalog of Life Data ********* + ++ aggregate the catalog of life vernacular names table to the scientific name level, + with string lists of names for titles and JSON arrays for tags ++ add in information from the manual table of common names and from inaturalist taxa, + so that every taxon record has the best possible title ++ add in inaturalist taxa base data, with the category of "Life" excluded from + inaturalist ancestry strings +*/ +drop table if exists inaturalist.taxa_with_vernacular; +create table inaturalist.taxa_with_vernacular as +( + with + col_vernacular_country_counts as + ( + /* + Some vernacular names differ only in case or punctuation, and + combining all of those into meaningful search terms is out of scope for + PR #745. See for example names associated with inaturalist taxon_id 47229 + +------------------------+---------+--------------------+---------+-----------+ + | scientificname | taxonid | taxon_name | records | countries | + |------------------------+---------+--------------------+---------+-----------| + | Selar crumenophthalmus | 4WD4V | bigeye scad | 19 | 16 | + | Selar crumenophthalmus | 4WD4V | big eye scad | 3 | 3 | + | Selar crumenophthalmus | 4WD4V | big-eye scad | 2 | 2 | + | Selar crumenophthalmus | 4WD4V | big-eyed scad | 1 | 0 | + | Selar crumenophthalmus | 4WD4V | bigeye scad atulai | 1 | 1 | + | Selaroides leptolepis | 4WD5B | bigeye scad | 1 | 0 | + +------------------------+---------+--------------------+---------+-----------+ + But, about 2/3 of the vernacular records do have some country information, so we + will use the distinct number of countries associated with a vernacular name to + prioritize it in terms of featuring it in titles and child tags, if there are + too many names to fit. We will also make everything lower case, to minimize + repetition. + TO DO: There is some risk that a missing country means "this is used everywhere" + and if so, we're deprioritizing the most important names, but I am going to hope + for the best for now. + */ + select + taxonid, + lower(trim(taxon_name)) as taxon_name, + count(distinct country) as countries + from inaturalist.col_vernacular + group by 1, 2 + ), + catalog_of_life_names as + ( + select + /* + Assuming that it's more efficient to join on a uuid than a string up to 56 + characters long. 
+ */ + cast(md5(n.scientificname) as uuid) as md5_scientificname, + v.taxon_name, + /* + name_string_length -- Cumulative length of the title string (taxon names + plus comma and space) up to and including the current name, within each + scientific name, prioritizing names that are used in more countries. + Since we use ancestor titles as tags, this will help us choose a + sensible maximum length for titles. + */ + sum(length(v.taxon_name)+2) OVER (partition by n.scientificname + order by countries desc, v.taxon_name asc + rows between unbounded preceding and current row) + as name_string_length + FROM inaturalist.col_name_usage n + INNER JOIN col_vernacular_country_counts v on v.taxonid = n.id + ), + catalog_of_life as + ( + SELECT + md5_scientificname, + /* + Put as many vernacular names as possible in the title, and put the rest in + tags. + Originally, we were using the image.title field length (5,000) as the cutoff + here. But, we're using ancestor titles as tags downstream. It doesn't make + sense to have tags that are thousands of characters long. And 99.9% of taxa + have titles under 300 characters long anyway. + */ + string_agg(DISTINCT + case when name_string_length < 256 + then taxon_name end, + ', ') name_string, + array_agg(DISTINCT cast((taxon_name, 'inaturalist') as openverse_tag)) + FILTER (where name_string_length >= 256) + as tags_vernacular + FROM catalog_of_life_names + group by 1 + ) + select + taxa.taxon_id, + (case when ancestry='48460' then '' else replace(taxa.ancestry,'48460/','') end) + as ancestry, --exclude 'Life' from ancestry + coalesce( + catalog_of_life.name_string, --string list of vernacular names + manual_name_additions.vernacular_name, --name_manual + taxa.name -- name_inaturalist + ) as title, + /* + If the inaturalist name will not be the title, get ready to add it as a tag. + */ + (case when catalog_of_life.name_string is not null or + manual_name_additions.vernacular_name is not null + then to_jsonb(array_fill(cast((taxa.name, 'inaturalist') as openverse_tag), array[1])) + end) inaturalist_name_tag, + /* + We don't want more than 20 tags total, so doesn't make sense to have more than + 20 vernacular tags. + */ + to_jsonb(tags_vernacular[1:20]) tags_vernacular + from inaturalist.taxa + LEFT JOIN catalog_of_life + on (cast(md5(taxa.name) as uuid) = catalog_of_life.md5_scientificname) + LEFT JOIN inaturalist.manual_name_additions + on (cast(md5(taxa.name) as uuid) = manual_name_additions.md5_scientificname) + where taxa.name <> 'Not assigned' +); +ALTER TABLE inaturalist.taxa_with_vernacular ADD PRIMARY KEY (taxon_id); +COMMIT; + +/* + ********** Create enriched table with ancestry tags ********* + +Join each record to all of its ancestor records and aggregate ancestor titles into the +tags along with json strings from the enriched data above. 
+ + expand each ancestry string into an array + + get the taxa record for each value of the array + + aggregate back to the original taxon level, with tags for ancestor names (titles) + +For example, in the taxa table there is the following record, think of this as a single +child leaf on a tree of different taxonomic groups: + taxon_id: 6930 + ancestry: "48460/1/2/355675/3/6888/6912/6922" + rank_level: 10 + rank: species + name: Anas platyrhynchos + active: TRUE + +Expanding the ancestry string into an array gets us this array of taxon_ids: + [48460, 1, 2, 355675, 3, 6888, 6912, 6922] + +Using a self-join on the taxa table to bring together all of the other taxa records that +match any of those taxon_ids gets us something like: + child.taxon_id child.title ancestor.taxon_id ancestor.title + -------------- -------------------- ----------------- ------------------ + 6930 Anas platyrhynchos 48460 Life + 6930 Anas platyrhynchos 1 Animalia + 6930 Anas platyrhynchos 2 Chordata + 6930 Anas platyrhynchos 355675 Vertebrata + 6930 Anas platyrhynchos 3 Aves + 6930 Anas platyrhynchos 6888 Anseriformes + 6930 Anas platyrhynchos 6912 Anatidae + 6930 Anas platyrhynchos 6922 Anas + +Which we can then group / aggregate back up to the child taxon level when we're +generating a tag list. +*/ +DROP table if exists inaturalist.taxa_enriched; +create table inaturalist.taxa_enriched as ( select child.taxon_id, - array_agg(ancestors.name) as ancestor_names + child.title, + jsonb_path_query_array( + ( + /* + concatenating jsonb arrays works as long as you have an empty array + rather than a null::jsonb + */ + coalesce(child.inaturalist_name_tag, to_jsonb(array[]::openverse_tag[])) + || coalesce(jsonb_agg(DISTINCT + cast((ancestors.title,'inaturalist') as openverse_tag)) + FILTER (where ancestors.title is not null), + to_jsonb(array[]::openverse_tag[])) + || coalesce(child.tags_vernacular, + to_jsonb(array[]::openverse_tag[])) + ), + /* + Use the jsonb query to retrieve only the first 20 values of the array that + combines inaturalist, vernacular and ancestor tags. 
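+            jsonb_path_query_array runs in lax mode by default, so subscripts past
+            the end of a shorter combined array are simply skipped rather than
+            raising an error.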
+ */ + '$[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]') + as tags from - inaturalist.taxa ancestors, - inaturalist.taxa child - where ancestors.taxon_id = ANY (string_to_array(child.ancestry, '/')::int[]) - and ancestors.rank not in ('stateofmatter','epifamily','zoosection') - group by child.taxon_id -) -UPDATE inaturalist.taxa -SET ancestor_names = aggregated.ancestor_names -from aggregated -where taxa.taxon_id = aggregated.taxon_id; + inaturalist.taxa_with_vernacular child + left join inaturalist.taxa_with_vernacular ancestors + on (ancestors.taxon_id = ANY (string_to_array(child.ancestry, '/')::int[])) + group by child.taxon_id, child.title, + child.inaturalist_name_tag, child.tags_vernacular +); +ALTER TABLE inaturalist.taxa_enriched ADD PRIMARY KEY (taxon_id); COMMIT; -SELECT count(*) FROM inaturalist.taxa; +SELECT count(*) FROM inaturalist.taxa_enriched; diff --git a/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/transformed_table.template.sql b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/transformed_table.template.sql new file mode 100644 index 00000000000..a707506015b --- /dev/null +++ b/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/transformed_table.template.sql @@ -0,0 +1,77 @@ +/* +------------------------------------------------------------------------------- +Load Intermediate Table +------------------------------------------------------------------------------- + + ** Please note: This SQL will not run as is! You must replace offset_num and + batch_limit with integers representing the records you want to retrieve. + +Joining two very large normalized tables is difficult, in any data manipulation system. +PHOTOS has on the order of 120 million records, and OBSERVATIONS has on the order of +70 million records. We have to join them to get at the taxa (species) information for +any given photo. Taxa are the only descriptive text we have for inaturalist photos. + +Using image columns version 001 from common.storage.tsv_columns. +*/ + +INSERT INTO {intermediate_table} +( + SELECT + /* + The same photo_id can have multiple records, for example if there are multiple + observations for separate species. But it only actually happens in about 0.1% of + the photos. For now, we're skipping them. + TO DO #685: Figure out aggregating tags and titles or formatting alternate + foreign ids for photos with multiple taxa and process them separately. + (If we go the alternate foreign id way, we'd want to drop photos loaded in the + first inaturalist load.) + */ + INATURALIST.PHOTOS.PHOTO_ID as FOREIGN_IDENTIFIER, + 'https://www.inaturalist.org/photos/' || INATURALIST.PHOTOS.PHOTO_ID + as LANDING_URL, + 'https://inaturalist-open-data.s3.amazonaws.com/photos/' + || INATURALIST.PHOTOS.PHOTO_ID || '/original.' || INATURALIST.PHOTOS.EXTENSION + as DIRECT_URL, + -- TO DO #810: Add the thumbnail url here. 
+ null::varchar(10) as THUMBNAIL, + -- TO DO #966: jpg, jpeg, png & gif in 6/2022 data + lower(INATURALIST.PHOTOS.EXTENSION) as FILETYPE, + null::int as FILESIZE, + INATURALIST.LICENSE_CODES.OPENVERSE_CODE as LICENSE, + INATURALIST.LICENSE_CODES.LICENSE_VERSION, + INATURALIST.OBSERVERS.CREATOR, + 'https://www.inaturalist.org/users/' || INATURALIST.PHOTOS.OBSERVER_ID + as CREATOR_URL, + taxa_enriched.title, + LICENSE_CODES.license_url_metadata as META_DATA, + taxa_enriched.tags, + 'photograph' as CATEGORY, + null::boolean as WATERMARKED, + 'inaturalist' as PROVIDER, + 'inaturalist' as SOURCE, + 'sql_bulk_load' as INGESTION_TYPE, + INATURALIST.PHOTOS.WIDTH, + INATURALIST.PHOTOS.HEIGHT + FROM INATURALIST.PHOTOS + INNER JOIN + INATURALIST.OBSERVATIONS ON + INATURALIST.PHOTOS.OBSERVATION_UUID = INATURALIST.OBSERVATIONS.OBSERVATION_UUID + INNER JOIN + INATURALIST.OBSERVERS ON + INATURALIST.PHOTOS.OBSERVER_ID = INATURALIST.OBSERVERS.OBSERVER_ID + INNER JOIN + INATURALIST.TAXA_ENRICHED ON + INATURALIST.OBSERVATIONS.TAXON_ID = INATURALIST.TAXA_ENRICHED.TAXON_ID + INNER JOIN + INATURALIST.LICENSE_CODES ON + INATURALIST.PHOTOS.LICENSE = INATURALIST.LICENSE_CODES.INATURALIST_CODE + WHERE INATURALIST.PHOTOS.PHOTO_ID BETWEEN {batch_start} AND {batch_end} + AND NOT(EXISTS(SELECT 1 FROM INATURALIST.PHOTO_DUPES + WHERE PHOTO_DUPES.PHOTO_ID BETWEEN {batch_start} AND {batch_end} + AND PHOTO_DUPES.PHOTO_ID = PHOTOS.PHOTO_ID)) +) +; +COMMIT; + +SELECT count(*) transformed_records, max(FOREIGN_IDENTIFIER) max_id_loaded +FROM {intermediate_table} ; diff --git a/openverse_catalog/dags/providers/provider_dag_factory.py b/openverse_catalog/dags/providers/provider_dag_factory.py index cfd63929b0a..d6ba82d0c58 100644 --- a/openverse_catalog/dags/providers/provider_dag_factory.py +++ b/openverse_catalog/dags/providers/provider_dag_factory.py @@ -67,6 +67,7 @@ from string import Template from airflow import DAG +from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator from airflow.utils.task_group import TaskGroup @@ -327,10 +328,19 @@ def create_provider_api_workflow_dag(conf: ProviderWorkflow): ], render_template_as_native_obj=True, user_defined_macros={"date_partition_for_prefix": date_partition_for_prefix}, + # Delete source data from airflow and db once ingestion is complete. Here as a + # parameter to support data quality testing in sql-only dags like iNaturalist. 
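+        # The default (True) can be overridden when triggering the DAG, e.g. to keep
+        # the source tables around for inspection after a test run.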
+ params={"sql_rm_source_data_after_ingesting": Param(True, type="boolean")}, ) with dag: - ingest_data, ingestion_metrics = create_ingestion_workflow(conf) + if callable(getattr(conf.ingester_class, "create_ingestion_workflow", None)): + ( + ingest_data, + ingestion_metrics, + ) = conf.ingester_class.create_ingestion_workflow() + else: + ingest_data, ingestion_metrics = create_ingestion_workflow(conf) report_load_completion = create_report_load_completion( conf.dag_id, conf.media_types, ingestion_metrics, conf.dated diff --git a/tests/dags/providers/provider_api_scripts/resources/inaturalist/check_creator_metadata.py b/tests/dags/providers/provider_api_scripts/resources/inaturalist/check_creator_metadata.py new file mode 100644 index 00000000000..901551afdfb --- /dev/null +++ b/tests/dags/providers/provider_api_scripts/resources/inaturalist/check_creator_metadata.py @@ -0,0 +1,23 @@ +""" +Couple of additional data quality checks before merging this PR +https://github.com/WordPress/openverse-catalog/pull/745 +""" + +from pathlib import Path + +import pandas as pd + + +BASE_DIR = Path(__file__).parents[7] +FULL_DOWNLOADS = BASE_DIR / "inaturalist-downloads/dec-2022" +PHOTOS_CSV = FULL_DOWNLOADS / "photos.csv.gz" +OBSERVERS_CSV = FULL_DOWNLOADS / "observers.csv.gz" + + +observers = pd.read_csv(OBSERVERS_CSV, delimiter="\t") +print(f"The file has {len(observers)} observer records, and") +print(observers[["login", "name"]].count()) +# # results 1/13/2023 +# The file has 589290 observer records, and +# login 589290 +# name 107840 diff --git a/tests/dags/providers/provider_api_scripts/resources/inaturalist/exploring_catalog_of_life.py b/tests/dags/providers/provider_api_scripts/resources/inaturalist/exploring_catalog_of_life.py new file mode 100644 index 00000000000..24784717b9d --- /dev/null +++ b/tests/dags/providers/provider_api_scripts/resources/inaturalist/exploring_catalog_of_life.py @@ -0,0 +1,113 @@ +from pathlib import Path + +import pandas as pd + + +EXTERNAL_ROOT = Path(__file__).parents[7] + +# catalog of life data downloaded manually from +# https://api.checklistbank.org/dataset/9840/export.zip?format=ColDP +# renamed, and stored in this folder +SOURCE_DATA_PATH = EXTERNAL_ROOT / "inaturalist-june-22/catalog_of_life" +local_zip_file = "COL_archive.zip" +name_usage_file = "NameUsage.tsv" +vernacular_file = "VernacularName.tsv" + + +# prettying up the output +def print_header(header): + padding = int((88 - len(header)) / 2) + print("\n", "=" * padding, header, "=" * padding) + + +# handle common formatting for reading these files +def load_to_pandas(file_name): + df = pd.read_csv( + SOURCE_DATA_PATH / file_name, + delimiter="\t", + quotechar="\b", + ) + df.columns = [c[4:] for c in df.columns] + return df + + +# Just for an overview +def basic_stats(df, df_name): + print_header(df_name) + num_records = len(df) + print(f"{num_records=}") + col_width = max([len(c) for c in df.columns]) + for c in df.columns: + col_padding = col_width - len(c) + print(" ", c, " " * col_padding, num_records - df[c].isna().sum()) + + +# Can't remember if pd can read a specific file within a zip file, so unzipping first +# with zipfile.ZipFile(SOURCE_DATA_PATH / local_zip_file) as z: +# with open(SOURCE_DATA_PATH / name_usage_file, "wb") as f: +# f.write(z.read(name_usage_file)) +# with open(SOURCE_DATA_PATH / vernacular_file, "wb") as f: +# f.write(z.read(vernacular_file)) +NameUsageDF = load_to_pandas(name_usage_file) +basic_stats(NameUsageDF, "NameUsageDF") + +VernacularDF = 
load_to_pandas(vernacular_file) +basic_stats(VernacularDF, "VernacularDF") + +JoinedDF = ( + VernacularDF[ + [ + "name", + "language", + "sourceID", + "taxonID", + ] + ] + .merge( + NameUsageDF[ + [ + "ID", + "scientificName", + "scrutinizerDate", + "temporalRangeEnd", + "genericName", + ] + ], + how="inner", + left_on="taxonID", + right_on="ID", + ) + .drop(columns=["ID"]) +) +basic_stats(JoinedDF, "JoinedDF") + +print_header("Name Usage IDs") +print("Length\n", NameUsageDF.ID.str.len().value_counts()) +print("Is Numeric\n", NameUsageDF.ID.str.isnumeric().value_counts()) +print( + f"Max length of a taxon id in the vernacular table is " + f"{max(VernacularDF.taxonID.str.len())}" +) +print( + "ID outlier(s)\n", + NameUsageDF.loc[ + NameUsageDF.ID.str.len() > max(VernacularDF.taxonID.str.len()), "ID" + ], +) + +print_header("Vernacular Name Counts") +total_names = JoinedDF.scientificName.value_counts() +english_names = JoinedDF.loc[ + JoinedDF.language == "eng", : +].scientificName.value_counts() +# name_stats = pd.merge(total_names, english_names, how="left") +# print(len(name_stats)) +print( + f"{len(total_names)} scientificName values, " + f"{len(english_names)} have English names." +) +print("Total name counts\n", total_names.value_counts().sort_index()) +print("English name counts\n", english_names.value_counts().sort_index()) + +print_header("Length of / joining on Scientific Name") +print(JoinedDF.scientificName.str.len().value_counts().sort_index()) diff --git a/tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py b/tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py index afa56adaef3..0adff7b3bcb 100644 --- a/tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py +++ b/tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py @@ -25,18 +25,6 @@ SMALL_FILE_PATH = BASE_DIR / "inaturalist-june-22/small" -# This is just something I used in ipython to move things around -def move_files(from_dir, to_dir, copy=False): - for file_name in os.listdir(from_dir): - if file_name[-7:] == ".csv.gz": - if copy: - print(f"Copying {file_name} from {from_dir} to {to_dir}...") - os.system(f"cp {from_dir}/{file_name} {to_dir}/{file_name}") - else: - print(f"Moving {file_name} from {from_dir} to {to_dir}...") - os.rename(f"{from_dir}/{file_name}", f"{to_dir}/{file_name}") - - def pull_sample_records( file_name, id_name, @@ -59,7 +47,7 @@ def pull_sample_records( output_file_name = output_path / file_name # read in the selected records sample_records = [] - remaining_ids = id_list.copy() + remaining_ids = set(id_list) print( "Starting to read", working_input_file, @@ -104,7 +92,7 @@ def get_sample_id_list(sample_file, joined_on): sample_values = set() for record in records: sample_values.add(record[joined_on]) - return list(sample_values) + return sample_values if __name__ == "__main__": @@ -147,12 +135,12 @@ def get_sample_id_list(sample_file, joined_on): id_name="photo_id", id_list=photo_ids, is_unique=False, - every_nth_record=None, - output_path=DATA_FOR_TESTING, + every_nth_record=50_000, + output_path=MID_SIZE_FILE_PATH, ) # ASSOCIATED OBSERVATIONS - with gzip.open(f"{DATA_FOR_TESTING}/photos.csv.gz", "rt") as photo_output: + with gzip.open(f"{MID_SIZE_FILE_PATH}/photos.csv.gz", "rt") as photo_output: sample_observations = get_sample_id_list(photo_output, "observation_uuid") pull_sample_records( "observations.csv.gz", @@ -160,11 +148,11 @@ def 
get_sample_id_list(sample_file, joined_on): sample_observations, False, None, - DATA_FOR_TESTING, + MID_SIZE_FILE_PATH, ) # ASSOCIATED OBSERVERS - with gzip.open(f"{DATA_FOR_TESTING}/photos.csv.gz", "rt") as photo_output: + with gzip.open(f"{MID_SIZE_FILE_PATH}/photos.csv.gz", "rt") as photo_output: sample_observers = get_sample_id_list(photo_output, "observer_id") pull_sample_records( "observers.csv.gz", @@ -172,12 +160,12 @@ def get_sample_id_list(sample_file, joined_on): sample_observers, False, None, - DATA_FOR_TESTING, + MID_SIZE_FILE_PATH, ) # ASSOCIATED TAXA (including ancestry for photo tags) with gzip.open( - f"{DATA_FOR_TESTING}/observations.csv.gz", "rt" + f"{MID_SIZE_FILE_PATH}/observations.csv.gz", "rt" ) as observation_output: sample_taxa = get_sample_id_list(observation_output, "taxon_id") sample_taxa_with_ancestors = set(sample_taxa) @@ -192,5 +180,5 @@ def get_sample_id_list(sample_file, joined_on): list(sample_taxa_with_ancestors), False, None, - DATA_FOR_TESTING, + MID_SIZE_FILE_PATH, ) diff --git a/tests/dags/providers/provider_api_scripts/test_inaturalist.py b/tests/dags/providers/provider_api_scripts/test_inaturalist.py index f6bc39887b8..79deddfd384 100644 --- a/tests/dags/providers/provider_api_scripts/test_inaturalist.py +++ b/tests/dags/providers/provider_api_scripts/test_inaturalist.py @@ -1,20 +1,40 @@ from ast import literal_eval from pathlib import Path +from unittest import mock import pytest +from airflow.models import TaskInstance from airflow.providers.postgres.hooks.postgres import PostgresHook -from common.constants import POSTGRES_CONN_ID -from common.licenses import get_license_info +from common.constants import IMAGE, POSTGRES_CONN_ID +from common.loader.reporting import RecordMetrics from providers.provider_api_scripts import inaturalist -# Sample data included in the tests below covers the following weird cases: +# TO DO #898: Most of the transformations for inaturalist are in SQL, and testing them +# effectively could mean looking more closely at the production data itself. +# For now, sample data included in the tests below covers the following weird cases: # - 3 of the photo records link to unclassified observations, so they have no title or # tags and we don't load them. # - 10 of the remaining photo ids appear on multiple records, load one per photo_id. # - One of the photos has a taxon without any ancestors in the source table. -# To really get at data quality issues, it's worth loading bigger sample files to minio -# and running the dag in airflow locally. +# +# To get at data quality issues, it's worth loading bigger sample files to minio and +# running the dag in airflow locally: +# - Use the aws cli to download these separate files observations.csv.gz, +# observers.csv.gz, photos.csv.gz, and taxa.csv.gz from s3://inaturalist-open-data +# - Consider whether you want to really test the full dataset, which may take a couple +# of days to run locally, and will definitely take 10s of GB. If not, maybe use +# tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py +# to pull a sample of records without breaking referential integrity. +# - Putting your final zipped test files in /tests/s3-data/inaturalist-open-data +# so that they will be synced over to minio. +# - Run `just down -v` and then `just recreate` to make sure that the test data gets to +# the test s3 instance. That process may take on the order of 15 minutes for the full +# dataset. You'll know that it's done when the s3-load container in docker exits. 
+# - Then, in airflow, trigger the dag possibly with configuration +# {"sql_rm_source_data_after_ingesting": false} so that a) you don't have to +# download catalog of life data over and over, and b) you can compare the results in +# the image table to the raw data in the inaturalist schema. INAT = inaturalist.INaturalistDataIngester() @@ -53,78 +73,100 @@ def test_load_data(file_name, expected): assert actual == expected -def test_get_next_query_params_no_prior(): - expected = {"offset_num": 0} - actual = INAT.get_next_query_params() - assert expected == actual - - -def test_get_next_query_params_prior_0(): - expected = {"offset_num": INAT.batch_limit} - actual = INAT.get_next_query_params({"offset_num": 0}) - assert expected == actual +@pytest.mark.parametrize("value", [None, {"offset_num": 0}]) +def test_get_next_query_params(value): + with pytest.raises( + NotImplementedError, + match="Instead we use get_batches to dynamically create subtasks.", + ): + INAT.get_next_query_params(value) @pytest.mark.parametrize("value", [None, {}]) def test_get_batch_data_returns_none(value): - actual = INAT.get_batch_data(value) - assert actual is None + with pytest.raises( + NotImplementedError, match="TSV files from AWS S3 processed in postgres." + ): + INAT.get_batch_data(value) def test_get_response_json(): - expected = JSON_RESPONSE - actual = INAT.get_response_json({"offset_num": 0}) - assert actual == expected + with pytest.raises( + NotImplementedError, match="TSV files from AWS S3 processed in postgres." + ): + INAT.get_response_json({"offset_num": 0}) def test_get_batch_data_full_response(): - actual = INAT.get_batch_data(JSON_RESPONSE) - assert isinstance(actual, list) - assert len(actual) == 34 - assert isinstance(actual[0], dict) - assert actual[0] == RECORD0 + with pytest.raises( + NotImplementedError, match="TSV files from AWS S3 processed in postgres." + ): + INAT.get_batch_data(JSON_RESPONSE) @pytest.mark.parametrize("field", ["license_url", "foreign_identifier"]) def test_get_record_data_missing_necessarly_fields(field): - expected = None - record = RECORD0.copy() - record.pop(field) - actual = INAT.get_record_data(record) - assert actual == expected + with pytest.raises( + NotImplementedError, match="TSV files from AWS S3 processed in postgres." + ): + INAT.get_record_data(RECORD0) def test_get_record_data_full_response(): - expected = { - "foreign_identifier": 10314159, - "filetype": "jpg", - "license_info": get_license_info( - license_url="http://creativecommons.org/licenses/by-nc/4.0/" - ), - "width": 1530, - "height": 2048, - "foreign_landing_url": "https://www.inaturalist.org/photos/10314159", - "image_url": "https://inaturalist-open-data.s3.amazonaws.com/photos/10314159/medium.jpg", - "creator": "akjenny", - "creator_url": "https://www.inaturalist.org/users/615549", - "title": "Trifolium hybridum", - "raw_tags": [ - "Fabaceae", - "Fabales", - "Magnoliopsida", - "Angiospermae", - "Plantae", - "Trifolium", - "Tracheophyta", - "Faboideae", - "Trifolieae", - ], - } - actual = INAT.get_record_data(RECORD0) - assert actual == expected + with pytest.raises( + NotImplementedError, match="TSV files from AWS S3 processed in postgres." 
+ ): + INAT.get_record_data(RECORD0) def test_get_media_type(): expected = "image" - actual = INAT.get_media_type(INAT.get_record_data(RECORD0)) + actual = INAT.get_media_type({"some test": "data"}) + assert actual == expected + + +@pytest.mark.parametrize( + "all_results, expected", + [ + (None, None), + ( + [ + { + "loaded": 0, + "max_id_loaded": None, + "missing_columns": 0, + "foreign_id_dup": 0, + "upserted": 0, + "duration": 0.07186045899288729, + }, + { + "loaded": 1, + "max_id_loaded": "10314159", + "missing_columns": 0, + "foreign_id_dup": 0, + "upserted": 1, + "duration": 0.0823216249991674, + }, + ], + {IMAGE: RecordMetrics(1, 0, 0, 0)}, + ), + ], +) +def test_consolidate_load_statistics(all_results, expected): + ti_mock = mock.MagicMock(spec=TaskInstance) + actual = INAT.consolidate_load_statistics(all_results, ti_mock) assert actual == expected + + +@pytest.mark.parametrize( + "batch_length, max_id, expected", + [ + pytest.param(10, [(22,)], [[(0, 9)], [(10, 19)], [(20, 29)]], id="happy_path"), + pytest.param(10, [(2,)], [[(0, 9)]], id="bigger_batch_than_id"), + pytest.param(10, [(None,)], None, id="no_data"), + ], +) +def test_get_batches(batch_length, max_id, expected): + with mock.patch.object(PostgresHook, "get_records", return_value=max_id) as pg_mock: + actual = INAT.get_batches(batch_length, pg_mock) + assert actual == expected
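A note on the batched SQL near the top of this diff: transformed_table.template.sql is keyed on {batch_start}, {batch_end}, and {intermediate_table}, and each dynamically mapped load subtask fills in one photo-id range. The sketch below only illustrates that rendering step; the template path is real, but the str.format call, the connection handling, and running the whole file in a single hook.run are assumptions rather than the ingester's actual wiring.

# Illustrative sketch only: render one photo-id batch of the transformed-table
# template and run it in postgres. POSTGRES_CONN_ID comes from common.constants
# (as in the tests above); the str.format rendering and single hook.run call are
# assumptions, and the real load subtask may also collect the counts returned by
# the template's trailing SELECT.
from pathlib import Path

from airflow.providers.postgres.hooks.postgres import PostgresHook

from common.constants import POSTGRES_CONN_ID

TEMPLATE_PATH = Path(
    "openverse_catalog/dags/providers/provider_csv_load_scripts/"
    "inaturalist/transformed_table.template.sql"
)


def load_one_batch(batch_start: int, batch_end: int, intermediate_table: str) -> None:
    sql = TEMPLATE_PATH.read_text().format(
        batch_start=batch_start,
        batch_end=batch_end,
        intermediate_table=intermediate_table,
    )
    # Insert this photo-id range into the intermediate (loading) table.
    PostgresHook(postgres_conn_id=POSTGRES_CONN_ID).run(sql)


# e.g. load_one_batch(0, 1_999_999, "some_intermediate_loading_table")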
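On the Catalog of Life exploration script: pandas can read a single member of a zip archive through a file object, so the commented-out manual unzip step is optional. A small sketch, reusing the script's path constants and assuming (as the commented code does) that the TSVs sit at the root of COL_archive.zip:

# Sketch: read VernacularName.tsv straight out of COL_archive.zip without
# extracting it first; pandas accepts the file object that zipfile returns.
# Paths and read_csv options mirror the exploration script; whether the TSV
# sits at the archive root is an assumption.
import zipfile
from pathlib import Path

import pandas as pd

EXTERNAL_ROOT = Path(__file__).parents[7]
SOURCE_DATA_PATH = EXTERNAL_ROOT / "inaturalist-june-22/catalog_of_life"

with zipfile.ZipFile(SOURCE_DATA_PATH / "COL_archive.zip") as archive:
    with archive.open("VernacularName.tsv") as member:
        vernacular_df = pd.read_csv(member, delimiter="\t", quotechar="\b")

print(f"Read {len(vernacular_df)} vernacular name records without unzipping.")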
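The test_get_batches cases pin down the shape of the dynamic batching: given a batch length and the max photo id found in postgres, the ingester yields one [(start, end)] argument list per mapped load subtask, or None when nothing was loaded. Below is a minimal sketch of that arithmetic, matching only the test parametrization; the real INaturalistDataIngester.get_batches presumably fetches the max id itself via a PostgresHook, whereas here the already-fetched rows are passed in.

from __future__ import annotations


def get_batches_sketch(batch_length: int, max_id_rows: list[tuple]) -> list | None:
    # max_id_rows has the shape PostgresHook.get_records returns, e.g. [(22,)].
    max_id = max_id_rows[0][0]
    if max_id is None:
        # Nothing made it into inaturalist.photos, so there is nothing to batch.
        return None
    # One [(start, end)] argument list per dynamically mapped load subtask.
    return [
        [(start, start + batch_length - 1)]
        for start in range(0, max_id + 1, batch_length)
    ]


assert get_batches_sketch(10, [(22,)]) == [[(0, 9)], [(10, 19)], [(20, 29)]]  # happy_path
assert get_batches_sketch(10, [(2,)]) == [[(0, 9)]]  # bigger_batch_than_id
assert get_batches_sketch(10, [(None,)]) is None  # no_data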
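Similarly, test_consolidate_load_statistics shows the per-batch result dictionaries being rolled up into the {IMAGE: RecordMetrics(...)} mapping that the load-completion report expects. A rough sketch of that roll-up follows; the positional meaning of the RecordMetrics fields is inferred from the expected RecordMetrics(1, 0, 0, 0), and the TaskInstance the real method receives (used for timing and reporting) is ignored here.

from __future__ import annotations

from common.constants import IMAGE
from common.loader.reporting import RecordMetrics


def consolidate_load_statistics_sketch(all_results: list[dict] | None) -> dict | None:
    # all_results is the list of per-batch dicts produced by the load subtasks.
    if not all_results:
        return None
    return {
        IMAGE: RecordMetrics(
            # Field order assumed from the test's expected RecordMetrics(1, 0, 0, 0):
            # summed upserted, missing_columns, and foreign_id_dup counts, then a
            # final field that is not tracked per batch in the sample results.
            sum(batch["upserted"] for batch in all_results),
            sum(batch["missing_columns"] for batch in all_results),
            sum(batch["foreign_id_dup"] for batch in all_results),
            0,
        )
    }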