Skip to content

Commit

Permalink
[wip] source-facebook-marketing: attempting to allow for multiple acc…
Browse files Browse the repository at this point in the history
…ount ids
  • Loading branch information
Luishfs committed Nov 5, 2024
1 parent cac4775 commit 84e8456
Show file tree
Hide file tree
Showing 17 changed files with 822 additions and 379 deletions.
7 changes: 4 additions & 3 deletions source-facebook-marketing/connector_config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
account_id_sops: ENC[AES256_GCM,data:DuMXhQYiyxxthsOHllmw,iv:wNw1RFVBe/AWvY6uycliIY3nPpzBiz+P4UBDr6f4nLI=,tag:7s2DQ+ApVrcfjMBNqOK+/w==,type:int]
account_ids:
- 196765372540622
credentials:
access_token_sops: ENC[AES256_GCM,data:Re73N0PHhhJs7Umo5ONjzSXdb4xKe+XtOBI3r1padMyQo5Bki3bW7l+1+/v4ifQuq4RypBwxQOAXdJfarwqLMW6vKCpxzq+kcbEHkvFd+NTJIWduYl8lC5bbFHT6uTrBNMxaTv/N7LNTwohBss+iKc9kf+Dp7dwpUqikOohnvOZ9y6C4+p2lvanOUpVfw7mGL52uW8VALPiLWh1hgbn6ZXlk7MOAjZAboIzf2bCzkzZQIiTz/4NYVLcmbFbPx8DRjrUd6XU1yeLsfXbT3CNRbAJyNWwDJN9vwKErGQIKcfsXsP5hVb0rpFuSW+WBvpJ0avCR6N8vvM8I8mc5W47EMuMTAxI=,iv:hWmqMuuIUBm8Vm30DVzv5jkesBAIuxZnCzn2h/ysUZ8=,tag:uKzEZpI3M4IWiXGXIqncZQ==,type:str]
auth_type: OAuth Credentials
Expand Down Expand Up @@ -44,8 +45,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-02-05T23:41:33Z"
mac: ENC[AES256_GCM,data:ByAdg0VEy2xMpZXxAwa5oKB7CfG+eSA5ej/TJ2g9ZsIR+XhwJklI1XBW62VvahoGeC2AP9wYC/Mk7bP03914JZCz1v2gsbn6nL4WVuooIt0XMFgaquwWjoVrDa7Hy3iTHvr2Uwf4YcBWI/M3NwKpD/3f9TpNQRstrJq8pw6AC0k=,iv:Spd541rsKs5Si4AAdtxxB/muD9XiokkyEGBC8/wqAsg=,tag:eXTEjYrz/Vq4526D3E6whw==,type:str]
lastmodified: "2024-10-30T19:01:21Z"
mac: ENC[AES256_GCM,data:lksew6RnULnIZyMc5HZCDRo8bpjO3kX/9gh3ev5Xgmyd7QTG/rO2HCpeIsC77Gw4vco6trl2F/7U6r0ICl0UBHlLHgCAAAVLkXzadhM7Ty8xkW+cpo6VoL9HAKdaFZchT8dJlv2zOXDNWuPg5RF5dBxk97SZqkRMR8ekNKiAErQ=,iv:kL9jIUkCTrPgX6bZ07IPtXfHIYAt09Tvu+7zmherbnQ=,tag:ypP4PbGgLIzPVxkLe/DbrQ==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.8.1
12 changes: 8 additions & 4 deletions source-facebook-marketing/source_facebook_marketing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from dataclasses import dataclass
from time import sleep
from typing import List

import backoff
import pendulum
Expand Down Expand Up @@ -158,16 +159,19 @@ def call(
class API:
"""Simple wrapper around Facebook API"""

def __init__(self, account_id: str, access_token: str):
self._account_id = account_id
def __init__(self, access_token: str):
self._account_ids = {}
# design flaw in MyFacebookAdsApi requires such strange set of new default api instance
self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False, api_version="v19.0")
FacebookAdsApi.set_default_api(self.api)

@cached_property
def account(self) -> AdAccount:
def get_account(self, account_id) -> AdAccount:
"""Find current account"""
return self._find_account(self._account_id)
if account_id in self._account_ids:
return self._account_ids[account_id]
self._account_ids[account_id] = self._find_account(account_id)
return self._account_ids[account_id]

@staticmethod
def _find_account(account_id: str) -> AdAccount:
Expand Down
57 changes: 57 additions & 0 deletions source-facebook-marketing/source_facebook_marketing/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository

logger = logging.getLogger("airbyte_logger")


class MigrateAccountId:

message_repository: MessageRepository = InMemoryMessageRepository()
migrate_from_key: str = "account_id"
migrate_to_key: str = "account_ids"

@classmethod
def should_migrate(cls, config: Mapping[str, Any]) -> bool:
return False if config.get(cls.migrate_to_key) else True

@classmethod
def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
config[cls.migrate_to_key] = [config[cls.migrate_from_key]]
# return transformed config
return config

@classmethod
def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls.transform(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository._message_queue:
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: Source) -> None:
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls.should_migrate(config):
cls.emit_control_message(
cls.modify_and_save(config_path, source, config),
)
31 changes: 20 additions & 11 deletions source-facebook-marketing/source_facebook_marketing/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if config.end_date < config.start_date:
return False, "end_date must be equal or after start_date."

api = API(account_id=config.account_id, access_token=config.credentials.access_token)
logger.info(f"Select account {api.account}")
api = API(access_token=config.credentials.access_token)
except (requests.exceptions.RequestException, ValidationError) as e:
return False, e

Expand All @@ -89,19 +88,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
config.start_date = validate_start_date(config.start_date)
config.end_date = validate_end_date(config.start_date, config.end_date)

api = API(account_id=config.account_id, access_token=config.credentials.access_token)
api = API(access_token=config.credentials.access_token)

insights_args = dict(
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window, account_ids=config.account_ids,
)
streams = [
AdAccount(
api=api,
account_ids=config.account_ids,
source_defined_primary_key=["account_id"],
),
AdSets(
api=api,
start_date=config.start_date,
account_ids=config.account_ids,
end_date=config.end_date,
include_deleted=config.include_deleted,
page_size=config.page_size,
Expand All @@ -111,6 +112,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
Ads(
api=api,
start_date=config.start_date,
account_ids=config.account_ids,
end_date=config.end_date,
include_deleted=config.include_deleted,
page_size=config.page_size,
Expand All @@ -120,19 +122,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
AdCreatives(
api=api,
fetch_thumbnail_images=config.fetch_thumbnail_images,
account_ids=config.account_ids,
page_size=config.page_size,
max_batch_size=config.max_batch_size,
source_defined_primary_key=["id"],
),
AdsInsights(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsAgeAndGender(page_size=config.page_size, max_batch_size=config.max_batch_size,source_defined_primary_key=["id"], **insights_args),
AdsInsightsCountry(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsRegion(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsDma(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsPlatformAndDevice(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsActionType(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["id"], **insights_args),
AdsInsights(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsAgeAndGender(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size,source_defined_primary_key=["id"], **insights_args),
AdsInsightsCountry(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsRegion(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsDma(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsPlatformAndDevice(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["date_start", "account_id", "ad_id"], **insights_args),
AdsInsightsActionType(page_size=config.page_size, account_ids=config.account_ids, max_batch_size=config.max_batch_size, source_defined_primary_key=["id"], **insights_args),
Campaigns(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -142,13 +146,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
CustomConversions(
api=api,
account_ids=config.account_ids,
include_deleted=config.include_deleted,
page_size=config.page_size,
max_batch_size=config.max_batch_size,
source_defined_primary_key=["id"],
),
Images(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -158,6 +164,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
Videos(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -167,6 +174,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
Activities(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand Down Expand Up @@ -206,6 +214,7 @@ def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List
raise ValueError(mes)
stream = AdsInsights(
api=api,
account_ids=config.account_ids,
name=f"Custom{insight.name}",
fields=list(insight_fields),
breakdowns=list(set(insight.breakdowns)),
Expand Down
8 changes: 4 additions & 4 deletions source-facebook-marketing/source_facebook_marketing/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Type, Annotated, Union
from typing import Any, Dict, List, Optional, Type, Annotated, Union, Set

from airbyte_cdk.sources.config import BaseConfig
from facebook_business.adobjects.adsinsights import AdsInsights
from pydantic import BaseModel, Field, PositiveInt, ConfigDict
from pydantic import BaseModel, Field, PositiveInt, ConfigDict, constr

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -131,8 +131,8 @@ class Config:
def schema_extra(schema: Dict[str, Any], model: Type["ConnectorConfig"]) -> None:
schema["properties"]["end_date"].pop("format")

account_id: str = Field(
title="Account ID",
account_ids: Set[constr(regex="^[0-9]+$")] = Field(
title="Account IDs",
order=0,
description=(
"The Facebook Ad account ID to use when pulling data from the Facebook Marketing API."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ class InsightAsyncJobManager:
# limit is not reliable indicator of async workload capability we still have to use this parameter.
MAX_JOBS_IN_QUEUE = 100

def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
def __init__(self, api: "API", jobs: Iterator[AsyncJob], account_id: str):
"""Init
:param api:
:param jobs:
"""
self._api = api
self._account_id = account_id
self._jobs = iter(jobs)
self._running_jobs = []

Expand Down Expand Up @@ -147,4 +148,4 @@ def _update_api_throttle_limit(self):
respond with empty list of data so api use "x-fb-ads-insights-throttle"
header to update current insights throttle limit.
"""
self._api.account.get_insights()
self._api.account(account_id=self._account_id).get_insights()
Loading

0 comments on commit 84e8456

Please sign in to comment.