From 3684b6aca143b505696a6ee1e4c744948a7a339f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20R=C3=ADos?= Date: Thu, 5 Sep 2024 23:33:43 +0200 Subject: [PATCH] add: enable bigquery `resources` on open collective (#2077) --- .../oso_dagster/assets/open_collective.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index 6b58e8d4a..f3946f4b3 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -3,8 +3,10 @@ import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition +from dlt.destinations.adapters import bigquery_adapter from gql import Client, gql from gql.transport.requests import RequestsHTTPTransport +from oso_dagster import constants from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns from oso_dagster.utils.secrets import secret_ref_arg from pydantic import UUID4, BaseModel @@ -270,13 +272,21 @@ def expenses( """ client = base_open_collective_client(personal_token) - yield dlt.resource( + resource = dlt.resource( get_open_collective_expenses(context, client, "DEBIT"), name="expenses", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", ) + if constants.enable_bigquery: + bigquery_adapter( + resource, + partition="created_at", + ) + + yield resource + @dlt_factory( key_prefix="open_collective", @@ -303,9 +313,17 @@ def deposits( """ client = base_open_collective_client(personal_token) - yield dlt.resource( + resource = dlt.resource( get_open_collective_expenses(context, client, "CREDIT"), name="funds", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", ) + + if constants.enable_bigquery: + bigquery_adapter( + resource, + partition="created_at", + ) + + yield resource