From a36949ad4cc9005e7361e863897caefdbbd6f946 Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Tue, 26 Nov 2024 10:53:27 +0100 Subject: [PATCH 1/8] (DE-70) feat: Update SQL import scripts and Python utility functions for offline recommendations --- jobs/ml_jobs/offline_recommendation/utils.py | 28 +++++++++--- .../import/day_plus_fifty_after_deposit.sql | 42 ++++++++++++++++++ .../sql/import/day_plus_thirty_inactivity.sql | 44 +++++++++++++++++++ .../sql/import/day_plus_two_after_booking.sql | 9 ++-- .../sql/import/first_booking.sql | 9 ++-- .../dags/jobs/ml/offline_recommendations.py | 3 ++ 6 files changed, 120 insertions(+), 15 deletions(-) create mode 100644 orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql create mode 100644 orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py index 69bb5dd537..4d2bb2359c 100644 --- a/jobs/ml_jobs/offline_recommendation/utils.py +++ b/jobs/ml_jobs/offline_recommendation/utils.py @@ -2,10 +2,11 @@ import io import os from multiprocessing import cpu_count - +import pdb import numpy as np import polars as pl import requests +from loguru import logger from google.cloud import bigquery from access_gcp_secrets import access_secret @@ -19,9 +20,9 @@ API_TOKEN = "test_token" APP_CONFIG = { "URL": { - "dev": "https://apireco.testing.passculture.team", + "dev": "https://api-reco-dev.data.ehp.internal.passculture.team", "stg": "https://apireco.staging.passculture.team/", - "prod": "https://apireco.passculture.app", + "prod": "https://apireco-prod-962488981524.europe-west1.run.app", }, "TOKEN": API_TOKEN, "route": "similar_offers", @@ -42,6 +43,7 @@ def get_offline_recos(data): for chunk in list(np.array_split(data.rows(named=True), batch_number)) ] + logger.info(f"And {len(batch_rows)} batches..") with concurrent.futures.ProcessPoolExecutor(batch_number) as executor: futures = executor.map( _get_recos, @@ -53,20 +55,29 @@ def get_offline_recos(data): def _get_recos(rows): + # logger.info("get recos") + # pdb.set_trace() results = [] try: for row in rows: try: + # logger.info("Request check") + # pdb.set_trace() reco = similar_offers( row["offer_id"], row["venue_longitude"], row["venue_latitude"] )[:N_RECO_DISPLAY] + # logger.info("Request Sucess!") + # pdb.set_trace() except Exception: + # logger.info("Request failed!") + # pdb.set_trace() reco = [] - results.append( - {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} - ) + results.append( + {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} + ) return results except Exception: + logger.info("get recos failed") return results @@ -74,7 +85,10 @@ def similar_offers(offer_id, longitude, latitude): params_filter = { "is_reco_shuffled": False, } - return call_API(offer_id, longitude, latitude, params_filter)["results"] + res = call_API(offer_id, longitude, latitude, params_filter)["results"] + # logger.info(f"Check on res:{res}") + # pdb.set_trace() + return res def call_API(input, longitude, latitude, params_filter): diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql new file mode 100644 index 0000000000..09aed9d27f --- /dev/null +++ 
b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql @@ -0,0 +1,42 @@ +with + user_with_first_deposit as ( + select + user_id, + user_postal_code, + first_deposit_creation_date, + current_date() as current_date + from `{{ bigquery_analytics_dataset }}.global_user` + where date(first_deposit_creation_date) = date_sub(current_date(), interval 50 day) + ), + events as ( + select + user_id, + offer_id, + event_date, + extract(hour from event_timestamp) as event_hour, + extract(dayofweek from event_timestamp) as event_day, + extract(month from event_timestamp) as event_month + from `{{ bigquery_int_firebase_dataset }}`.`native_event` + where + event_name = "ConsultOffer" + and event_date >= date_sub(date("{{ ds }}"), interval 50 day) + and user_id is not null + and offer_id is not null + and offer_id != 'NaN' + ) +select + e.user_id, + uob.user_postal_code, + e.event_date, + e.event_hour, + e.event_day, + e.event_month, + e.offer_id, + e.item_id, + eom.offer_subcategory_id, + eom.search_group_name +from events e +join user_with_first_deposit uob on e.user_id = uob.user_id +join + `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom + on eom.offer_id = e.offer_id diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql new file mode 100644 index 0000000000..40323081be --- /dev/null +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql @@ -0,0 +1,44 @@ +with + users_with_last_booking as ( + select + user_id, + user_postal_code, + last_booking_date, + current_date() as current_date + from `{{ bigquery_analytics_dataset }}.global_user` + where date(last_booking_date) = date_sub(current_date(), interval 30 day) + and user_is_active = true + ), + bookings as ( + select + user_id, + offer_id, + booking_created_at as event_date, + extract(hour from booking_created_at) as event_hour, + extract(dayofweek from booking_created_at) as event_day, + extract(month from booking_created_at) as event_month, + item_id, + venue_id + from `{{ bigquery_analytics_dataset }}.global_booking` + where booking_created_at >= date_sub(current_date(), interval 30 day) + and user_id is not null + and offer_id is not null + and offer_id != 'NaN' + ) +select + b.user_id, + u.user_postal_code, + b.event_date, + b.event_hour, + b.event_day, + b.event_month, + b.offer_id, + b.item_id, + eom.offer_subcategory_id, + eom.search_group_name, + evd.venue_latitude, + evd.venue_longitude +from bookings b +join users_with_last_booking u on b.user_id = u.user_id +join `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = b.offer_id +join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = b.venue_id \ No newline at end of file diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql index 1b846236fc..050ad9d16e 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql @@ -3,10 +3,10 @@ with select user_id, user_postal_code, - first_booking_date as first_booking_date, + first_individual_booking_date as first_booking_date, 
current_date() as current_date from `{{ bigquery_analytics_dataset }}.global_user` - where date(first_booking_date) = date_sub(current_date(), interval 2 day) + where date(first_individual_booking_date) = date_sub(current_date(), interval 2 day) ) select ebd.user_id, @@ -17,12 +17,13 @@ select ebd.booking_created_at as booking_creation_date, uob.current_date, ebd.offer_id, - eom.subcategory_id, + ebd.item_id, + eom.offer_subcategory_id, eom.search_group_name from `{{ bigquery_analytics_dataset }}.global_booking` ebd join user_with_one_booking uob on ebd.user_id = uob.user_id join - `{{ bigquery_clean_dataset }}.int_applicative__offer_metadata` eom + `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = ebd.offer_id join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = ebd.venue_id where uob.first_booking_date = ebd.booking_creation_date diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql index 2a2011c332..f20adda6e8 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql @@ -1,9 +1,9 @@ with user_with_one_booking as ( - select user_id, last_booking_date, first_booking_date, user_postal_code + select user_id, last_booking_date, first_individual_booking_date, user_postal_code from `{{ bigquery_analytics_dataset }}.global_user` where - last_booking_date = first_booking_date + last_booking_date = first_individual_booking_date and last_booking_date >= date_sub(current_date(), interval 7 day) ) select @@ -12,11 +12,12 @@ select evd.venue_latitude, evd.venue_longitude, ebd.offer_id, - eom.subcategory_id, + ebd.item_id, + eom.offer_subcategory_id, eom.search_group_name from `{{ bigquery_analytics_dataset }}.global_booking` ebd join user_with_one_booking uob on ebd.user_id = uob.user_id join - `{{ bigquery_clean_dataset }}.int_applicative__offer_metadata` eom + `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = ebd.offer_id join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = ebd.venue_id diff --git a/orchestration/dags/jobs/ml/offline_recommendations.py b/orchestration/dags/jobs/ml/offline_recommendations.py index 88b9fc96dd..8eb811a368 100644 --- a/orchestration/dags/jobs/ml/offline_recommendations.py +++ b/orchestration/dags/jobs/ml/offline_recommendations.py @@ -6,6 +6,7 @@ DAG_FOLDER, DATA_GCS_BUCKET_NAME, ENV_SHORT_NAME, + GCP_PROJECT_ID, ) from common.operators.bigquery import bigquery_job_task from common.operators.gce import ( @@ -35,6 +36,8 @@ "retry_delay": timedelta(minutes=2), } dag_config = { + "GCP_PROJECT_ID": GCP_PROJECT_ID, + "ENV_SHORT_NAME": ENV_SHORT_NAME, "TOKENIZERS_PARALLELISM": "false", "API_TOKEN_SECRET_ID": f"api-reco-token-{ENV_SHORT_NAME}", } From 8b27207082bf0a5ca119f3e70a16fbc9a3935b3d Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Wed, 27 Nov 2024 14:29:12 +0100 Subject: [PATCH 2/8] feat: Enhance logging for offline recommendations and update GCP project variable --- jobs/ml_jobs/offline_recommendation/main.py | 4 +++- jobs/ml_jobs/offline_recommendation/utils.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/jobs/ml_jobs/offline_recommendation/main.py b/jobs/ml_jobs/offline_recommendation/main.py index 07f2de01f8..13bad2694d 100644 --- 
a/jobs/ml_jobs/offline_recommendation/main.py +++ b/jobs/ml_jobs/offline_recommendation/main.py @@ -22,7 +22,9 @@ def offline_recommendation( .to_arrow() ) - logger.info("Offline recommendation: Get recommendations from API...") + logger.info( + f"Offline recommendation: Get recommendations from API... for {len(data)} users" + ) offline_recommendations = get_offline_recos(data) logger.info("Offline recommendation: Store recos to BQ...") diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py index 4d2bb2359c..0518b84e6e 100644 --- a/jobs/ml_jobs/offline_recommendation/utils.py +++ b/jobs/ml_jobs/offline_recommendation/utils.py @@ -11,11 +11,11 @@ from access_gcp_secrets import access_secret -GCP_PROJECT = os.environ.get("GCP_PROJECT", "passculture-data-ehp") +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "passculture-data-ehp") ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") API_TOKEN_SECRET_ID = os.environ.get("API_TOKEN_SECRET_ID") try: - API_TOKEN = access_secret(GCP_PROJECT, API_TOKEN_SECRET_ID) + API_TOKEN = access_secret(GCP_PROJECT_ID, API_TOKEN_SECRET_ID) except Exception: API_TOKEN = "test_token" APP_CONFIG = { @@ -125,7 +125,7 @@ def export_polars_to_bq(client, data, dataset, output_table): job = client.load_table_from_file( stream, destination=f"{dataset}.{output_table}", - project=GCP_PROJECT, + project=GCP_PROJECT_ID, job_config=bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, ), From e367c1e8b1a63fabd9aa74ebecc6eb7ae63707e0 Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Wed, 27 Nov 2024 14:29:21 +0100 Subject: [PATCH 3/8] feat: Add new queries for offline recommendations and implement corresponding SQL files --- .../ml/offline_recommendation/export_to_backend.py | 7 ++++++- .../dependencies/ml/offline_recommendation/import_users.py | 7 ++++++- .../sql/export_backend/day_plus_fifty_after_deposit.sql | 1 + .../sql/export_backend/day_plus_thirty_inactivity.sql | 1 + orchestration/dags/jobs/ml/offline_recommendations.py | 2 ++ 5 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql create mode 100644 orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py index 46009a292c..1ba5b63e0e 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py +++ b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py @@ -1,6 +1,11 @@ SQL_PATH = "dependencies/ml/offline_recommendation/sql/export_backend" DATE = "{{ yyyymmdd(ds) }}" -queries = ["first_booking", "day_plus_two_after_booking"] +queries = [ + "first_booking", + "day_plus_two_after_booking", + "day_plus_fifty_after_deposit", + "day_plus_thirty_inactivity", +] params = [] for query in queries: params.append( diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py index f237f3620c..c200f836a8 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py +++ b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py @@ -1,6 +1,11 @@ SQL_PATH = "dependencies/ml/offline_recommendation/sql/import" DATE = "{{ 
yyyymmdd(ds) }}" -queries = ["first_booking", "day_plus_two_after_booking"] +queries = [ + "first_booking", + "day_plus_two_after_booking", + "day_plus_fifty_after_deposit", + "day_plus_thirty_inactivity", +] params = [] for query in queries: params.append( diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql new file mode 100644 index 0000000000..e015e1fb3c --- /dev/null +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql @@ -0,0 +1 @@ +select * from `{{ bigquery_tmp_dataset }}.offline_recommendation_{{ yyyymmdd(ds) }}_day_plus_fifty_after_deposit` \ No newline at end of file diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql new file mode 100644 index 0000000000..e015e1fb3c --- /dev/null +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql @@ -0,0 +1 @@ +select * from `{{ bigquery_tmp_dataset }}.offline_recommendation_{{ yyyymmdd(ds) }}_day_plus_fifty_after_deposit` \ No newline at end of file diff --git a/orchestration/dags/jobs/ml/offline_recommendations.py b/orchestration/dags/jobs/ml/offline_recommendations.py index 8eb811a368..d77a77366a 100644 --- a/orchestration/dags/jobs/ml/offline_recommendations.py +++ b/orchestration/dags/jobs/ml/offline_recommendations.py @@ -127,6 +127,8 @@ >> install_dependencies >> get_offline_predictions[0] >> get_offline_predictions[1] + >> get_offline_predictions[2] + >> get_offline_predictions[3] >> export_to_backend_tasks >> end ) From 7457fdcee452ed31ccb3b90378c1d15350a7cf9b Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Wed, 27 Nov 2024 14:47:16 +0100 Subject: [PATCH 4/8] feat: Update SQL query to join global_offer table for item_id retrieval --- .../sql/import/day_plus_fifty_after_deposit.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql index 09aed9d27f..150b536f7a 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql @@ -32,10 +32,11 @@ select e.event_day, e.event_month, e.offer_id, - e.item_id, + go.item_id, eom.offer_subcategory_id, eom.search_group_name from events e +join `{{ bigquery_analytics_dataset }}.global_offer` go on e.offer_id = go.offer_id join user_with_first_deposit uob on e.user_id = uob.user_id join `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom From bdf6a315ab5db5ffbf1b54caafd0f608015ae12e Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Wed, 27 Nov 2024 15:25:12 +0100 Subject: [PATCH 5/8] fix: Correct indentation in _get_recos function to improve readability --- jobs/ml_jobs/offline_recommendation/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py index 0518b84e6e..3274728eeb 100644 --- 
a/jobs/ml_jobs/offline_recommendation/utils.py +++ b/jobs/ml_jobs/offline_recommendation/utils.py @@ -72,9 +72,9 @@ def _get_recos(rows): # logger.info("Request failed!") # pdb.set_trace() reco = [] - results.append( - {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} - ) + results.append( + {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} + ) return results except Exception: logger.info("get recos failed") From ae916ca9cd002b24b9112956301f929209330fbf Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Wed, 27 Nov 2024 17:40:41 +0100 Subject: [PATCH 6/8] feat: Update schedule interval for offline recommendation DAG to run at 2 AM weekly --- .../dags/jobs/ml/offline_recommendations.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/orchestration/dags/jobs/ml/offline_recommendations.py b/orchestration/dags/jobs/ml/offline_recommendations.py index d77a77366a..9cf3aa9ab7 100644 --- a/orchestration/dags/jobs/ml/offline_recommendations.py +++ b/orchestration/dags/jobs/ml/offline_recommendations.py @@ -13,6 +13,7 @@ CloneRepositoryGCEOperator, SSHGCEOperator, StartGCEOperator, + StopGCEOperator, ) from common.utils import get_airflow_schedule from dependencies.ml.offline_recommendation.export_to_backend import ( @@ -31,11 +32,11 @@ STORAGE_PATH = f"gs://{DATA_GCS_BUCKET_NAME}/offline_recommendation_{ENV_SHORT_NAME}/offline_recommendation_{DATE}" default_args = { "start_date": datetime(2023, 8, 2), - "on_failure_callback": task_fail_slack_alert, + # "on_failure_callback": task_fail_slack_alert, "retries": 0, "retry_delay": timedelta(minutes=2), } -dag_config = { +DAG_CONFIG = { "GCP_PROJECT_ID": GCP_PROJECT_ID, "ENV_SHORT_NAME": ENV_SHORT_NAME, "TOKENIZERS_PARALLELISM": "false", @@ -45,7 +46,7 @@ "offline_recommendation", default_args=default_args, description="Produce offline recommendation", - schedule_interval=get_airflow_schedule("0 0 * * 0"), + schedule_interval=get_airflow_schedule("0 2 * * *"), catchup=False, dagrun_timeout=timedelta(minutes=180), user_defined_macros=macros.default, @@ -79,7 +80,7 @@ task_id=f"""get_offline_predictions_{query_params["table"]}""", instance_name=GCE_INSTANCE, base_dir=BASE_DIR, - environment=dag_config, + environment=DAG_CONFIG, command="PYTHONPATH=. 
python main.py " f"""--input-table {query_params["destination_table"]} --output-table offline_recommendation_{query_params["destination_table"]}""", ) @@ -117,6 +118,10 @@ extra_params={}, ) ) + + gce_instance_stop = StopGCEOperator( + task_id="gce_stop_task", instance_name=GCE_INSTANCE + ) end = DummyOperator(task_id="end", dag=dag) ( @@ -130,5 +135,6 @@ >> get_offline_predictions[2] >> get_offline_predictions[3] >> export_to_backend_tasks + >> gce_instance_stop >> end ) From e2e6496030fc7db4903dc59d172398b024b7d269 Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Mon, 23 Dec 2024 14:15:42 +0100 Subject: [PATCH 7/8] feat: Add API_URL_SECRET_ID to access secrets and update APP_CONFIG with dynamic API URL --- jobs/ml_jobs/offline_recommendation/utils.py | 17 ++++++++++------- .../dags/jobs/ml/offline_recommendations.py | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py index 3274728eeb..894afedbe7 100644 --- a/jobs/ml_jobs/offline_recommendation/utils.py +++ b/jobs/ml_jobs/offline_recommendation/utils.py @@ -2,28 +2,31 @@ import io import os from multiprocessing import cpu_count -import pdb + import numpy as np import polars as pl import requests -from loguru import logger from google.cloud import bigquery +from loguru import logger from access_gcp_secrets import access_secret GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "passculture-data-ehp") ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") API_TOKEN_SECRET_ID = os.environ.get("API_TOKEN_SECRET_ID") +API_URL_SECRET_ID = os.environ.get("API_URL_SECRET_ID") try: API_TOKEN = access_secret(GCP_PROJECT_ID, API_TOKEN_SECRET_ID) except Exception: API_TOKEN = "test_token" + +try: + API_URL = access_secret(GCP_PROJECT_ID, API_URL_SECRET_ID) +except Exception: + API_TOKEN = "test_url" + APP_CONFIG = { - "URL": { - "dev": "https://api-reco-dev.data.ehp.internal.passculture.team", - "stg": "https://apireco.staging.passculture.team/", - "prod": "https://apireco-prod-962488981524.europe-west1.run.app", - }, + "URL": API_URL, "TOKEN": API_TOKEN, "route": "similar_offers", } diff --git a/orchestration/dags/jobs/ml/offline_recommendations.py b/orchestration/dags/jobs/ml/offline_recommendations.py index 9cf3aa9ab7..7f0788d27a 100644 --- a/orchestration/dags/jobs/ml/offline_recommendations.py +++ b/orchestration/dags/jobs/ml/offline_recommendations.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta from common import macros -from common.alerts import task_fail_slack_alert from common.config import ( DAG_FOLDER, DATA_GCS_BUCKET_NAME, @@ -41,6 +40,7 @@ "ENV_SHORT_NAME": ENV_SHORT_NAME, "TOKENIZERS_PARALLELISM": "false", "API_TOKEN_SECRET_ID": f"api-reco-token-{ENV_SHORT_NAME}", + "API_URL_SECRET_ID": "api-reco-internal-url", } with DAG( "offline_recommendation", @@ -118,7 +118,7 @@ extra_params={}, ) ) - + gce_instance_stop = StopGCEOperator( task_id="gce_stop_task", instance_name=GCE_INSTANCE ) From a732419c0cd0976322239bf3134c6ed90f2338b2 Mon Sep 17 00:00:00 2001 From: dcuesta-pass Date: Mon, 23 Dec 2024 18:04:23 +0100 Subject: [PATCH 8/8] feat: Refactor access to GCP secrets and reorganize utility functions for offline recommendations --- jobs/ml_jobs/offline_recommendation/main.py | 52 +++--- jobs/ml_jobs/offline_recommendation/utils.py | 136 -------------- .../offline_recommendation/utils/__init__.py | 0 .../{ => utils}/access_gcp_secrets.py | 0 .../offline_recommendation/utils/constants.py | 27 +++ 
.../offline_recommendation/utils/tools.py | 175 ++++++++++++++++++ 6 files changed, 233 insertions(+), 157 deletions(-) delete mode 100644 jobs/ml_jobs/offline_recommendation/utils.py create mode 100644 jobs/ml_jobs/offline_recommendation/utils/__init__.py rename jobs/ml_jobs/offline_recommendation/{ => utils}/access_gcp_secrets.py (100%) create mode 100644 jobs/ml_jobs/offline_recommendation/utils/constants.py create mode 100644 jobs/ml_jobs/offline_recommendation/utils/tools.py diff --git a/jobs/ml_jobs/offline_recommendation/main.py b/jobs/ml_jobs/offline_recommendation/main.py index 13bad2694d..d135dd9adc 100644 --- a/jobs/ml_jobs/offline_recommendation/main.py +++ b/jobs/ml_jobs/offline_recommendation/main.py @@ -14,27 +14,37 @@ def offline_recommendation( ..., help="Output table for offline recommendations" ), ): - client = bigquery.Client() - logger.info("Offline recommendation: fetch data...") - data = pl.from_arrow( - client.query(f"SELECT * FROM `tmp_{ENV_SHORT_NAME}.{input_table}` ") - .result() - .to_arrow() - ) - - logger.info( - f"Offline recommendation: Get recommendations from API... for {len(data)} users" - ) - offline_recommendations = get_offline_recos(data) - - logger.info("Offline recommendation: Store recos to BQ...") - export_polars_to_bq( - client=client, - data=offline_recommendations, - dataset=f"tmp_{ENV_SHORT_NAME}", - output_table=output_table, - ) - logger.info(f"Offline recommendation: Exported to {output_table}") + """ + Fetches data from BigQuery, gets offline recommendations, and stores the results back to BigQuery. + + Args: + input_table (str): The name of the input table containing data for offline recommendations. + output_table (str): The name of the output table for storing offline recommendations. + """ + with bigquery.Client() as client: + try: + logger.info("Offline recommendation: fetch data...") + data = pl.from_arrow( + client.query(f"SELECT * FROM `tmp_{ENV_SHORT_NAME}.{input_table}` ") + .result() + .to_arrow() + ) + + logger.info( + f"Offline recommendation: Get recommendations from API... 
for {len(data)} users" + ) + offline_recommendations = get_offline_recos(data) + + logger.info("Offline recommendation: Store recos to BQ...") + export_polars_to_bq( + client=client, + data=offline_recommendations, + dataset=f"tmp_{ENV_SHORT_NAME}", + output_table=output_table, + ) + logger.info(f"Offline recommendation: Exported to {output_table}") + except Exception as e: + logger.error(f"Offline recommendation failed: {e}") return diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py deleted file mode 100644 index 894afedbe7..0000000000 --- a/jobs/ml_jobs/offline_recommendation/utils.py +++ /dev/null @@ -1,136 +0,0 @@ -import concurrent -import io -import os -from multiprocessing import cpu_count - -import numpy as np -import polars as pl -import requests -from google.cloud import bigquery -from loguru import logger - -from access_gcp_secrets import access_secret - -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "passculture-data-ehp") -ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") -API_TOKEN_SECRET_ID = os.environ.get("API_TOKEN_SECRET_ID") -API_URL_SECRET_ID = os.environ.get("API_URL_SECRET_ID") -try: - API_TOKEN = access_secret(GCP_PROJECT_ID, API_TOKEN_SECRET_ID) -except Exception: - API_TOKEN = "test_token" - -try: - API_URL = access_secret(GCP_PROJECT_ID, API_URL_SECRET_ID) -except Exception: - API_TOKEN = "test_url" - -APP_CONFIG = { - "URL": API_URL, - "TOKEN": API_TOKEN, - "route": "similar_offers", -} -N_RECO_DISPLAY = 10 - - -def get_offline_recos(data): - max_process = 2 if ENV_SHORT_NAME == "dev" else cpu_count() - 2 - subset_length = len(data) // max_process - subset_length = subset_length if subset_length > 0 else 1 - batch_number = max_process if subset_length > 1 else 1 - print( - f"Starting process... 
with {batch_number} CPUs, subset length: {subset_length} " - ) - batch_rows = [ - list(chunk) - for chunk in list(np.array_split(data.rows(named=True), batch_number)) - ] - - logger.info(f"And {len(batch_rows)} batches..") - with concurrent.futures.ProcessPoolExecutor(batch_number) as executor: - futures = executor.map( - _get_recos, - batch_rows, - ) - print("Multiprocessing done") - dl_output = clean_multiprocess_output(futures) - return dl_output - - -def _get_recos(rows): - # logger.info("get recos") - # pdb.set_trace() - results = [] - try: - for row in rows: - try: - # logger.info("Request check") - # pdb.set_trace() - reco = similar_offers( - row["offer_id"], row["venue_longitude"], row["venue_latitude"] - )[:N_RECO_DISPLAY] - # logger.info("Request Sucess!") - # pdb.set_trace() - except Exception: - # logger.info("Request failed!") - # pdb.set_trace() - reco = [] - results.append( - {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} - ) - return results - except Exception: - logger.info("get recos failed") - return results - - -def similar_offers(offer_id, longitude, latitude): - params_filter = { - "is_reco_shuffled": False, - } - res = call_API(offer_id, longitude, latitude, params_filter)["results"] - # logger.info(f"Check on res:{res}") - # pdb.set_trace() - return res - - -def call_API(input, longitude, latitude, params_filter): - call = call_builder(input, longitude, latitude) - return requests.post(call, json=params_filter).json() - - -def call_builder(input, longitude, latitude): - call = f"{APP_CONFIG['URL'][ENV_SHORT_NAME]}/{APP_CONFIG['route']}/{input}?token={APP_CONFIG['TOKEN']}" - if longitude is not None and latitude is not None: - call = call + f"&longitude={longitude}&latitude={latitude}" - return call - - -def clean_multiprocess_output(futures): - user_ids = [] - recos = [] - for future in futures: - for res in future: - user_ids.append(res["user_id"]) - recos.append(res["recos"]) - dl_output = ( - pl.DataFrame({"user_id": user_ids, "recommendations": recos}) - .groupby("user_id") - .agg(pl.concat_list("recommendations").flatten().unique().drop_nulls()) - ) - return dl_output - - -def export_polars_to_bq(client, data, dataset, output_table): - with io.BytesIO() as stream: - data.write_parquet(stream) - stream.seek(0) - job = client.load_table_from_file( - stream, - destination=f"{dataset}.{output_table}", - project=GCP_PROJECT_ID, - job_config=bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.PARQUET, - ), - ) - job.result() diff --git a/jobs/ml_jobs/offline_recommendation/utils/__init__.py b/jobs/ml_jobs/offline_recommendation/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/jobs/ml_jobs/offline_recommendation/access_gcp_secrets.py b/jobs/ml_jobs/offline_recommendation/utils/access_gcp_secrets.py similarity index 100% rename from jobs/ml_jobs/offline_recommendation/access_gcp_secrets.py rename to jobs/ml_jobs/offline_recommendation/utils/access_gcp_secrets.py diff --git a/jobs/ml_jobs/offline_recommendation/utils/constants.py b/jobs/ml_jobs/offline_recommendation/utils/constants.py new file mode 100644 index 0000000000..31c3e74ff0 --- /dev/null +++ b/jobs/ml_jobs/offline_recommendation/utils/constants.py @@ -0,0 +1,27 @@ +import os +from multiprocessing import cpu_count + +from utils.access_gcp_secrets import access_secret + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "passculture-data-ehp") +ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") +API_TOKEN_SECRET_ID = 
os.environ.get("API_TOKEN_SECRET_ID") +API_URL_SECRET_ID = os.environ.get("API_URL_SECRET_ID") + +try: + API_TOKEN = access_secret(GCP_PROJECT_ID, API_TOKEN_SECRET_ID) +except Exception: + API_TOKEN = "test_token" +# TODO: Add secrets via infra +try: + API_URL = access_secret(GCP_PROJECT_ID, API_URL_SECRET_ID) +except Exception: + API_URL = "test_url" + +APP_CONFIG = { + "URL": API_URL, + "TOKEN": API_TOKEN, + "route": "similar_offers", +} +N_RECO_DISPLAY = 10 +MAX_PROCESS = 2 if ENV_SHORT_NAME == "dev" else cpu_count() - 2 diff --git a/jobs/ml_jobs/offline_recommendation/utils/tools.py b/jobs/ml_jobs/offline_recommendation/utils/tools.py new file mode 100644 index 0000000000..c9e34392cc --- /dev/null +++ b/jobs/ml_jobs/offline_recommendation/utils/tools.py @@ -0,0 +1,175 @@ +import concurrent.futures +import io +from urllib.parse import urlencode + +import numpy as np +import polars as pl +import requests +from google.cloud import bigquery +from loguru import logger + +from utils.constants import ( + APP_CONFIG, + ENV_SHORT_NAME, + GCP_PROJECT_ID, + MAX_PROCESS, + N_RECO_DISPLAY, +) + + +def get_offline_recos(data): + """ + Distributes the data across multiple processes to get offline recommendations. + + Args: + data (pl.DataFrame): Input data containing user and offer information. + + Returns: + pl.DataFrame: DataFrame containing user IDs and their recommendations. + """ + subset_length = max(len(data) // MAX_PROCESS, 1) + batch_number = MAX_PROCESS if subset_length > 1 else 1 + logger.info( + f"Starting process... with {batch_number} CPUs, subset length: {subset_length}" + ) + + batch_rows = [ + list(chunk) for chunk in np.array_split(data.rows(named=True), batch_number) + ] + logger.info(f"And {len(batch_rows)} batches..") + + with concurrent.futures.ThreadPoolExecutor(batch_number) as executor: + futures = executor.map(_get_recos, batch_rows) + + logger.info("Multiprocessing done") + return clean_multiprocess_output(futures) + + +def _get_recos(rows): + """ + Fetches recommendations for a batch of rows. + + Args: + rows (list): List of rows containing user and offer information. + + Returns: + list: List of dictionaries containing user IDs, offer IDs, and recommendations. + """ + results = [] + for row in rows: + try: + reco = similar_offers( + row["offer_id"], row["venue_longitude"], row["venue_latitude"] + )[:N_RECO_DISPLAY] + except Exception as e: + logger.error(f"Request failed for offer_id {row['offer_id']}: {e}") + reco = [] + results.append( + {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} + ) + return results + + +def similar_offers(offer_id, longitude, latitude): + """ + Fetches similar offers from the API. + + Args: + offer_id (str): The ID of the offer. + longitude (float): The longitude of the venue. + latitude (float): The latitude of the venue. + + Returns: + list: List of similar offers. + """ + params_filter = { + "is_reco_shuffled": False, + } + try: + res = call_API(offer_id, longitude, latitude, params_filter)["results"] + return res + except Exception as e: + logger.error(f"API call failed for offer_id {offer_id}: {e}") + return [] + + +def call_API(offer_id, longitude, latitude, params_filter): + """ + Calls the recommendation API. + + Args: + offer_id (str): The ID of the offer. + longitude (float): The longitude of the venue. + latitude (float): The latitude of the venue. + params_filter (dict): Additional parameters for the API call. + + Returns: + dict: The API response. 
+    """
+    call = call_builder(offer_id, longitude, latitude)
+    return requests.post(call, json=params_filter).json()
+
+
+def call_builder(offer_id, longitude, latitude):
+    """
+    Builds the API call URL.
+
+    Args:
+        offer_id (str): The ID of the offer.
+        longitude (float): The longitude of the venue.
+        latitude (float): The latitude of the venue.
+
+    Returns:
+        str: The API call URL.
+    """
+    params = {"token": APP_CONFIG["TOKEN"]}
+    if longitude is not None and latitude is not None:
+        params.update({"longitude": longitude, "latitude": latitude})
+    return f"{APP_CONFIG['URL']}/{APP_CONFIG['route']}/{offer_id}?{urlencode(params)}"
+
+
+def clean_multiprocess_output(futures):
+    """
+    Cleans and aggregates the output from multiple processes.
+
+    Args:
+        futures (list): List of futures containing the results from multiple processes.
+
+    Returns:
+        pl.DataFrame: DataFrame containing user IDs and their unique recommendations.
+    """
+    user_ids = []
+    recos = []
+    for future in futures:
+        for res in future:
+            user_ids.append(res["user_id"])
+            recos.append(res["recos"])
+    return (
+        pl.DataFrame({"user_id": user_ids, "recommendations": recos})
+        .groupby("user_id")
+        .agg(pl.concat_list("recommendations").flatten().unique().drop_nulls())
+    )
+
+
+def export_polars_to_bq(client, data, dataset, output_table):
+    """
+    Exports a Polars DataFrame to BigQuery.
+
+    Args:
+        client (bigquery.Client): The BigQuery client.
+        data (pl.DataFrame): The data to export.
+        dataset (str): The dataset name.
+        output_table (str): The output table name.
+    """
+    with io.BytesIO() as stream:
+        data.write_parquet(stream)
+        stream.seek(0)
+        job = client.load_table_from_file(
+            stream,
+            destination=f"{dataset}.{output_table}",
+            project=GCP_PROJECT_ID,
+            job_config=bigquery.LoadJobConfig(
+                source_format=bigquery.SourceFormat.PARQUET,
+            ),
+        )
+        job.result()