diff --git a/jobs/ml_jobs/offline_recommendation/main.py b/jobs/ml_jobs/offline_recommendation/main.py index 07f2de01f8..d135dd9adc 100644 --- a/jobs/ml_jobs/offline_recommendation/main.py +++ b/jobs/ml_jobs/offline_recommendation/main.py @@ -14,25 +14,37 @@ def offline_recommendation( ..., help="Output table for offline recommendations" ), ): - client = bigquery.Client() - logger.info("Offline recommendation: fetch data...") - data = pl.from_arrow( - client.query(f"SELECT * FROM `tmp_{ENV_SHORT_NAME}.{input_table}` ") - .result() - .to_arrow() - ) - - logger.info("Offline recommendation: Get recommendations from API...") - offline_recommendations = get_offline_recos(data) - - logger.info("Offline recommendation: Store recos to BQ...") - export_polars_to_bq( - client=client, - data=offline_recommendations, - dataset=f"tmp_{ENV_SHORT_NAME}", - output_table=output_table, - ) - logger.info(f"Offline recommendation: Exported to {output_table}") + """ + Fetches data from BigQuery, gets offline recommendations, and stores the results back to BigQuery. + + Args: + input_table (str): The name of the input table containing data for offline recommendations. + output_table (str): The name of the output table for storing offline recommendations. + """ + with bigquery.Client() as client: + try: + logger.info("Offline recommendation: fetch data...") + data = pl.from_arrow( + client.query(f"SELECT * FROM `tmp_{ENV_SHORT_NAME}.{input_table}` ") + .result() + .to_arrow() + ) + + logger.info( + f"Offline recommendation: Get recommendations from API... for {len(data)} users" + ) + offline_recommendations = get_offline_recos(data) + + logger.info("Offline recommendation: Store recos to BQ...") + export_polars_to_bq( + client=client, + data=offline_recommendations, + dataset=f"tmp_{ENV_SHORT_NAME}", + output_table=output_table, + ) + logger.info(f"Offline recommendation: Exported to {output_table}") + except Exception as e: + logger.error(f"Offline recommendation failed: {e}") return diff --git a/jobs/ml_jobs/offline_recommendation/utils.py b/jobs/ml_jobs/offline_recommendation/utils.py deleted file mode 100644 index 69bb5dd537..0000000000 --- a/jobs/ml_jobs/offline_recommendation/utils.py +++ /dev/null @@ -1,119 +0,0 @@ -import concurrent -import io -import os -from multiprocessing import cpu_count - -import numpy as np -import polars as pl -import requests -from google.cloud import bigquery - -from access_gcp_secrets import access_secret - -GCP_PROJECT = os.environ.get("GCP_PROJECT", "passculture-data-ehp") -ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") -API_TOKEN_SECRET_ID = os.environ.get("API_TOKEN_SECRET_ID") -try: - API_TOKEN = access_secret(GCP_PROJECT, API_TOKEN_SECRET_ID) -except Exception: - API_TOKEN = "test_token" -APP_CONFIG = { - "URL": { - "dev": "https://apireco.testing.passculture.team", - "stg": "https://apireco.staging.passculture.team/", - "prod": "https://apireco.passculture.app", - }, - "TOKEN": API_TOKEN, - "route": "similar_offers", -} -N_RECO_DISPLAY = 10 - - -def get_offline_recos(data): - max_process = 2 if ENV_SHORT_NAME == "dev" else cpu_count() - 2 - subset_length = len(data) // max_process - subset_length = subset_length if subset_length > 0 else 1 - batch_number = max_process if subset_length > 1 else 1 - print( - f"Starting process... 
with {batch_number} CPUs, subset length: {subset_length} " - ) - batch_rows = [ - list(chunk) - for chunk in list(np.array_split(data.rows(named=True), batch_number)) - ] - - with concurrent.futures.ProcessPoolExecutor(batch_number) as executor: - futures = executor.map( - _get_recos, - batch_rows, - ) - print("Multiprocessing done") - dl_output = clean_multiprocess_output(futures) - return dl_output - - -def _get_recos(rows): - results = [] - try: - for row in rows: - try: - reco = similar_offers( - row["offer_id"], row["venue_longitude"], row["venue_latitude"] - )[:N_RECO_DISPLAY] - except Exception: - reco = [] - results.append( - {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} - ) - return results - except Exception: - return results - - -def similar_offers(offer_id, longitude, latitude): - params_filter = { - "is_reco_shuffled": False, - } - return call_API(offer_id, longitude, latitude, params_filter)["results"] - - -def call_API(input, longitude, latitude, params_filter): - call = call_builder(input, longitude, latitude) - return requests.post(call, json=params_filter).json() - - -def call_builder(input, longitude, latitude): - call = f"{APP_CONFIG['URL'][ENV_SHORT_NAME]}/{APP_CONFIG['route']}/{input}?token={APP_CONFIG['TOKEN']}" - if longitude is not None and latitude is not None: - call = call + f"&longitude={longitude}&latitude={latitude}" - return call - - -def clean_multiprocess_output(futures): - user_ids = [] - recos = [] - for future in futures: - for res in future: - user_ids.append(res["user_id"]) - recos.append(res["recos"]) - dl_output = ( - pl.DataFrame({"user_id": user_ids, "recommendations": recos}) - .groupby("user_id") - .agg(pl.concat_list("recommendations").flatten().unique().drop_nulls()) - ) - return dl_output - - -def export_polars_to_bq(client, data, dataset, output_table): - with io.BytesIO() as stream: - data.write_parquet(stream) - stream.seek(0) - job = client.load_table_from_file( - stream, - destination=f"{dataset}.{output_table}", - project=GCP_PROJECT, - job_config=bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.PARQUET, - ), - ) - job.result() diff --git a/jobs/ml_jobs/offline_recommendation/utils/__init__.py b/jobs/ml_jobs/offline_recommendation/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/jobs/ml_jobs/offline_recommendation/access_gcp_secrets.py b/jobs/ml_jobs/offline_recommendation/utils/access_gcp_secrets.py similarity index 100% rename from jobs/ml_jobs/offline_recommendation/access_gcp_secrets.py rename to jobs/ml_jobs/offline_recommendation/utils/access_gcp_secrets.py diff --git a/jobs/ml_jobs/offline_recommendation/utils/constants.py b/jobs/ml_jobs/offline_recommendation/utils/constants.py new file mode 100644 index 0000000000..31c3e74ff0 --- /dev/null +++ b/jobs/ml_jobs/offline_recommendation/utils/constants.py @@ -0,0 +1,27 @@ +import os +from multiprocessing import cpu_count + +from utils.access_gcp_secrets import access_secret + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "passculture-data-ehp") +ENV_SHORT_NAME = os.environ.get("ENV_SHORT_NAME", "dev") +API_TOKEN_SECRET_ID = os.environ.get("API_TOKEN_SECRET_ID") +API_URL_SECRET_ID = os.environ.get("API_URL_SECRET_ID") + +try: + API_TOKEN = access_secret(GCP_PROJECT_ID, API_TOKEN_SECRET_ID) +except Exception: + API_TOKEN = "test_token" +# TODO: Add secrets via infra +try: + API_URL = access_secret(GCP_PROJECT_ID, API_URL_SECRET_ID) +except Exception: + API_URL = "test_url" + +APP_CONFIG = { + "URL": API_URL, + 
"TOKEN": API_TOKEN, + "route": "similar_offers", +} +N_RECO_DISPLAY = 10 +MAX_PROCESS = 2 if ENV_SHORT_NAME == "dev" else cpu_count() - 2 diff --git a/jobs/ml_jobs/offline_recommendation/utils/tools.py b/jobs/ml_jobs/offline_recommendation/utils/tools.py new file mode 100644 index 0000000000..c9e34392cc --- /dev/null +++ b/jobs/ml_jobs/offline_recommendation/utils/tools.py @@ -0,0 +1,175 @@ +import concurrent.futures +import io +from urllib.parse import urlencode + +import numpy as np +import polars as pl +import requests +from google.cloud import bigquery +from loguru import logger + +from utils.constants import ( + APP_CONFIG, + ENV_SHORT_NAME, + GCP_PROJECT_ID, + MAX_PROCESS, + N_RECO_DISPLAY, +) + + +def get_offline_recos(data): + """ + Distributes the data across multiple processes to get offline recommendations. + + Args: + data (pl.DataFrame): Input data containing user and offer information. + + Returns: + pl.DataFrame: DataFrame containing user IDs and their recommendations. + """ + subset_length = max(len(data) // MAX_PROCESS, 1) + batch_number = MAX_PROCESS if subset_length > 1 else 1 + logger.info( + f"Starting process... with {batch_number} CPUs, subset length: {subset_length}" + ) + + batch_rows = [ + list(chunk) for chunk in np.array_split(data.rows(named=True), batch_number) + ] + logger.info(f"And {len(batch_rows)} batches..") + + with concurrent.futures.ThreadPoolExecutor(batch_number) as executor: + futures = executor.map(_get_recos, batch_rows) + + logger.info("Multiprocessing done") + return clean_multiprocess_output(futures) + + +def _get_recos(rows): + """ + Fetches recommendations for a batch of rows. + + Args: + rows (list): List of rows containing user and offer information. + + Returns: + list: List of dictionaries containing user IDs, offer IDs, and recommendations. + """ + results = [] + for row in rows: + try: + reco = similar_offers( + row["offer_id"], row["venue_longitude"], row["venue_latitude"] + )[:N_RECO_DISPLAY] + except Exception as e: + logger.error(f"Request failed for offer_id {row['offer_id']}: {e}") + reco = [] + results.append( + {"user_id": row["user_id"], "offer_id": row["offer_id"], "recos": reco} + ) + return results + + +def similar_offers(offer_id, longitude, latitude): + """ + Fetches similar offers from the API. + + Args: + offer_id (str): The ID of the offer. + longitude (float): The longitude of the venue. + latitude (float): The latitude of the venue. + + Returns: + list: List of similar offers. + """ + params_filter = { + "is_reco_shuffled": False, + } + try: + res = call_API(offer_id, longitude, latitude, params_filter)["results"] + return res + except Exception as e: + logger.error(f"API call failed for offer_id {offer_id}: {e}") + return [] + + +def call_API(offer_id, longitude, latitude, params_filter): + """ + Calls the recommendation API. + + Args: + offer_id (str): The ID of the offer. + longitude (float): The longitude of the venue. + latitude (float): The latitude of the venue. + params_filter (dict): Additional parameters for the API call. + + Returns: + dict: The API response. + """ + call = call_builder(offer_id, longitude, latitude) + return requests.post(call, json=params_filter).json() + + +def call_builder(offer_id, longitude, latitude): + """ + Builds the API call URL. + + Args: + offer_id (str): The ID of the offer. + longitude (float): The longitude of the venue. + latitude (float): The latitude of the venue. + + Returns: + str: The API call URL. 
+ """ + params = {"token": APP_CONFIG["TOKEN"]} + if longitude is not None and latitude is not None: + params.update({"longitude": longitude, "latitude": latitude}) + return f"{APP_CONFIG['URL'][ENV_SHORT_NAME]}/{APP_CONFIG['route']}/{offer_id}?{urlencode(params)}" + + +def clean_multiprocess_output(futures): + """ + Cleans and aggregates the output from multiple processes. + + Args: + futures (list): List of futures containing the results from multiple processes. + + Returns: + pl.DataFrame: DataFrame containing user IDs and their unique recommendations. + """ + user_ids = [] + recos = [] + for future in futures: + for res in future: + user_ids.append(res["user_id"]) + recos.append(res["recos"]) + return ( + pl.DataFrame({"user_id": user_ids, "recommendations": recos}) + .groupby("user_id") + .agg(pl.concat_list("recommendations").flatten().unique().drop_nulls()) + ) + + +def export_polars_to_bq(client, data, dataset, output_table): + """ + Exports a Polars DataFrame to BigQuery. + + Args: + client (bigquery.Client): The BigQuery client. + data (pl.DataFrame): The data to export. + dataset (str): The dataset name. + output_table (str): The output table name. + """ + with io.BytesIO() as stream: + data.write_parquet(stream) + stream.seek(0) + job = client.load_table_from_file( + stream, + destination=f"{dataset}.{output_table}", + project=GCP_PROJECT_ID, + job_config=bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.PARQUET, + ), + ) + job.result() diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py index 46009a292c..1ba5b63e0e 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py +++ b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py @@ -1,6 +1,11 @@ SQL_PATH = "dependencies/ml/offline_recommendation/sql/export_backend" DATE = "{{ yyyymmdd(ds) }}" -queries = ["first_booking", "day_plus_two_after_booking"] +queries = [ + "first_booking", + "day_plus_two_after_booking", + "day_plus_fifty_after_deposit", + "day_plus_thirty_inactivity", +] params = [] for query in queries: params.append( diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py index f237f3620c..c200f836a8 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py +++ b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py @@ -1,6 +1,11 @@ SQL_PATH = "dependencies/ml/offline_recommendation/sql/import" DATE = "{{ yyyymmdd(ds) }}" -queries = ["first_booking", "day_plus_two_after_booking"] +queries = [ + "first_booking", + "day_plus_two_after_booking", + "day_plus_fifty_after_deposit", + "day_plus_thirty_inactivity", +] params = [] for query in queries: params.append( diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql new file mode 100644 index 0000000000..e015e1fb3c --- /dev/null +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql @@ -0,0 +1 @@ +select * from `{{ bigquery_tmp_dataset }}.offline_recommendation_{{ yyyymmdd(ds) }}_day_plus_fifty_after_deposit` \ No newline at end of file diff --git 
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py
index 46009a292c..1ba5b63e0e 100644
--- a/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/export_to_backend.py
@@ -1,6 +1,11 @@
 SQL_PATH = "dependencies/ml/offline_recommendation/sql/export_backend"
 DATE = "{{ yyyymmdd(ds) }}"
-queries = ["first_booking", "day_plus_two_after_booking"]
+queries = [
+    "first_booking",
+    "day_plus_two_after_booking",
+    "day_plus_fifty_after_deposit",
+    "day_plus_thirty_inactivity",
+]
 params = []
 for query in queries:
     params.append(
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py
index f237f3620c..c200f836a8 100644
--- a/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/import_users.py
@@ -1,6 +1,11 @@
 SQL_PATH = "dependencies/ml/offline_recommendation/sql/import"
 DATE = "{{ yyyymmdd(ds) }}"
-queries = ["first_booking", "day_plus_two_after_booking"]
+queries = [
+    "first_booking",
+    "day_plus_two_after_booking",
+    "day_plus_fifty_after_deposit",
+    "day_plus_thirty_inactivity",
+]
 params = []
 for query in queries:
     params.append(
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql
new file mode 100644
index 0000000000..e015e1fb3c
--- /dev/null
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_fifty_after_deposit.sql
@@ -0,0 +1 @@
+select * from `{{ bigquery_tmp_dataset }}.offline_recommendation_{{ yyyymmdd(ds) }}_day_plus_fifty_after_deposit`
\ No newline at end of file
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql
new file mode 100644
index 0000000000..e015e1fb3c
--- /dev/null
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/export_backend/day_plus_thirty_inactivity.sql
@@ -0,0 +1 @@
+select * from `{{ bigquery_tmp_dataset }}.offline_recommendation_{{ yyyymmdd(ds) }}_day_plus_thirty_inactivity`
\ No newline at end of file
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql
new file mode 100644
index 0000000000..150b536f7a
--- /dev/null
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_fifty_after_deposit.sql
@@ -0,0 +1,43 @@
+with
+    user_with_first_deposit as (
+        select
+            user_id,
+            user_postal_code,
+            first_deposit_creation_date,
+            current_date() as current_date
+        from `{{ bigquery_analytics_dataset }}.global_user`
+        where date(first_deposit_creation_date) = date_sub(current_date(), interval 50 day)
+    ),
+    events as (
+        select
+            user_id,
+            offer_id,
+            event_date,
+            extract(hour from event_timestamp) as event_hour,
+            extract(dayofweek from event_timestamp) as event_day,
+            extract(month from event_timestamp) as event_month
+        from `{{ bigquery_int_firebase_dataset }}`.`native_event`
+        where
+            event_name = "ConsultOffer"
+            and event_date >= date_sub(date("{{ ds }}"), interval 50 day)
+            and user_id is not null
+            and offer_id is not null
+            and offer_id != 'NaN'
+    )
+select
+    e.user_id,
+    uob.user_postal_code,
+    e.event_date,
+    e.event_hour,
+    e.event_day,
+    e.event_month,
+    e.offer_id,
+    go.item_id,
+    eom.offer_subcategory_id,
+    eom.search_group_name
+from events e
+join `{{ bigquery_analytics_dataset }}.global_offer` go on e.offer_id = go.offer_id
+join user_with_first_deposit uob on e.user_id = uob.user_id
+join
+    `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom
+    on eom.offer_id = e.offer_id
diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql
new file mode 100644
index 0000000000..40323081be
--- /dev/null
+++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_thirty_inactivity.sql
@@ -0,0 +1,44 @@
+with
+    users_with_last_booking as (
+        select
+            user_id,
+            user_postal_code,
+            last_booking_date,
+            current_date() as current_date
+        from `{{ bigquery_analytics_dataset }}.global_user`
+        where date(last_booking_date) = date_sub(current_date(), interval 30 day)
+            and user_is_active = true
+    ),
+    bookings as (
+        select
+            user_id,
+            offer_id,
+            booking_created_at as event_date,
+            extract(hour from booking_created_at) as event_hour,
+            extract(dayofweek from booking_created_at) as event_day,
+            extract(month from booking_created_at) as event_month,
+            item_id,
+            venue_id
+        from `{{ bigquery_analytics_dataset }}.global_booking`
+        where booking_created_at >= date_sub(current_date(), interval 30 day)
+            and user_id is not null
+            and offer_id is not null
+            and offer_id != 'NaN'
+    )
+select
+    b.user_id,
+    u.user_postal_code,
+    b.event_date,
+    b.event_hour,
+    b.event_day,
+    b.event_month,
+    b.offer_id,
+    b.item_id,
+    eom.offer_subcategory_id,
+    eom.search_group_name,
+    evd.venue_latitude,
+ evd.venue_longitude +from bookings b +join users_with_last_booking u on b.user_id = u.user_id +join `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = b.offer_id +join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = b.venue_id \ No newline at end of file diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql index 1b846236fc..050ad9d16e 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/day_plus_two_after_booking.sql @@ -3,10 +3,10 @@ with select user_id, user_postal_code, - first_booking_date as first_booking_date, + first_individual_booking_date as first_booking_date, current_date() as current_date from `{{ bigquery_analytics_dataset }}.global_user` - where date(first_booking_date) = date_sub(current_date(), interval 2 day) + where date(first_individual_booking_date) = date_sub(current_date(), interval 2 day) ) select ebd.user_id, @@ -17,12 +17,13 @@ select ebd.booking_created_at as booking_creation_date, uob.current_date, ebd.offer_id, - eom.subcategory_id, + ebd.item_id, + eom.offer_subcategory_id, eom.search_group_name from `{{ bigquery_analytics_dataset }}.global_booking` ebd join user_with_one_booking uob on ebd.user_id = uob.user_id join - `{{ bigquery_clean_dataset }}.int_applicative__offer_metadata` eom + `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = ebd.offer_id join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = ebd.venue_id where uob.first_booking_date = ebd.booking_creation_date diff --git a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql index 2a2011c332..f20adda6e8 100644 --- a/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql +++ b/orchestration/dags/dependencies/ml/offline_recommendation/sql/import/first_booking.sql @@ -1,9 +1,9 @@ with user_with_one_booking as ( - select user_id, last_booking_date, first_booking_date, user_postal_code + select user_id, last_booking_date, first_individual_booking_date, user_postal_code from `{{ bigquery_analytics_dataset }}.global_user` where - last_booking_date = first_booking_date + last_booking_date = first_individual_booking_date and last_booking_date >= date_sub(current_date(), interval 7 day) ) select @@ -12,11 +12,12 @@ select evd.venue_latitude, evd.venue_longitude, ebd.offer_id, - eom.subcategory_id, + ebd.item_id, + eom.offer_subcategory_id, eom.search_group_name from `{{ bigquery_analytics_dataset }}.global_booking` ebd join user_with_one_booking uob on ebd.user_id = uob.user_id join - `{{ bigquery_clean_dataset }}.int_applicative__offer_metadata` eom + `{{ bigquery_int_applicative_dataset }}.offer_metadata` eom on eom.offer_id = ebd.offer_id join `{{ bigquery_analytics_dataset }}.global_venue` evd on evd.venue_id = ebd.venue_id diff --git a/orchestration/dags/jobs/ml/offline_recommendations.py b/orchestration/dags/jobs/ml/offline_recommendations.py index 88b9fc96dd..7f0788d27a 100644 --- a/orchestration/dags/jobs/ml/offline_recommendations.py +++ b/orchestration/dags/jobs/ml/offline_recommendations.py @@ -1,17 +1,18 @@ from datetime import datetime, timedelta from 
common import macros -from common.alerts import task_fail_slack_alert from common.config import ( DAG_FOLDER, DATA_GCS_BUCKET_NAME, ENV_SHORT_NAME, + GCP_PROJECT_ID, ) from common.operators.bigquery import bigquery_job_task from common.operators.gce import ( CloneRepositoryGCEOperator, SSHGCEOperator, StartGCEOperator, + StopGCEOperator, ) from common.utils import get_airflow_schedule from dependencies.ml.offline_recommendation.export_to_backend import ( @@ -30,19 +31,22 @@ STORAGE_PATH = f"gs://{DATA_GCS_BUCKET_NAME}/offline_recommendation_{ENV_SHORT_NAME}/offline_recommendation_{DATE}" default_args = { "start_date": datetime(2023, 8, 2), - "on_failure_callback": task_fail_slack_alert, + # "on_failure_callback": task_fail_slack_alert, "retries": 0, "retry_delay": timedelta(minutes=2), } -dag_config = { +DAG_CONFIG = { + "GCP_PROJECT_ID": GCP_PROJECT_ID, + "ENV_SHORT_NAME": ENV_SHORT_NAME, "TOKENIZERS_PARALLELISM": "false", "API_TOKEN_SECRET_ID": f"api-reco-token-{ENV_SHORT_NAME}", + "API_URL_SECRET_ID": "api-reco-internal-url", } with DAG( "offline_recommendation", default_args=default_args, description="Produce offline recommendation", - schedule_interval=get_airflow_schedule("0 0 * * 0"), + schedule_interval=get_airflow_schedule("0 2 * * *"), catchup=False, dagrun_timeout=timedelta(minutes=180), user_defined_macros=macros.default, @@ -76,7 +80,7 @@ task_id=f"""get_offline_predictions_{query_params["table"]}""", instance_name=GCE_INSTANCE, base_dir=BASE_DIR, - environment=dag_config, + environment=DAG_CONFIG, command="PYTHONPATH=. python main.py " f"""--input-table {query_params["destination_table"]} --output-table offline_recommendation_{query_params["destination_table"]}""", ) @@ -115,6 +119,10 @@ ) ) + gce_instance_stop = StopGCEOperator( + task_id="gce_stop_task", instance_name=GCE_INSTANCE + ) + end = DummyOperator(task_id="end", dag=dag) ( start @@ -124,6 +132,9 @@ >> install_dependencies >> get_offline_predictions[0] >> get_offline_predictions[1] + >> get_offline_predictions[2] + >> get_offline_predictions[3] >> export_to_backend_tasks + >> gce_instance_stop >> end )