source-linkedin-ads-v2: fix bugs causing missing data #2107

Merged (3 commits) on Nov 1, 2024
82 changes: 69 additions & 13 deletions source-linkedin-ads-v2/source_linkedin_ads_v2/analytics_streams.py
@@ -6,14 +6,15 @@
from collections import defaultdict
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from urllib.parse import urlencode
from datetime import datetime, UTC, timedelta

import pendulum
import requests
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.streams.core import package_name_from_class, IncrementalMixin
from airbyte_cdk.sources.utils import casing
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_protocol.models import SyncMode
from source_linkedin_ads_v2.streams import Campaigns, Creatives, IncrementalLinkedinAdsStream
from source_linkedin_ads_v2.streams import Campaigns, Creatives, LinkedinAdsStream

from .utils import get_parent_stream_values, transform_data

@@ -119,7 +120,7 @@
]


class LinkedInAdsAnalyticsStream(IncrementalLinkedinAdsStream, ABC):
class LinkedInAdsAnalyticsStream(LinkedinAdsStream, IncrementalMixin, ABC):
"""
AdAnalytics Streams more info:
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting?tabs=curl&view=li-lms-2023-05#analytics-finder
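These streams call the analytics finder documented above. A minimal sketch of the request shape they build, using the flattened dateRange encoding that the cursor logic below reads back (all parameter values illustrative):

# GET https://api.linkedin.com/rest/adAnalytics?q=analytics
#     &pivot=CAMPAIGN
#     &dateRange.start.day=1&dateRange.start.month=10&dateRange.start.year=2024
#     &dateRange.end.day=31&dateRange.end.month=10&dateRange.end.year=2024
#     &campaigns=urn%3Ali%3AsponsoredCampaign%3A123456
#     &fields=impressions,clicks,dateRange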
@@ -128,10 +129,39 @@ class LinkedInAdsAnalyticsStream(IncrementalLinkedinAdsStream, ABC):
endpoint = "adAnalytics"
# For Analytics streams, the primary_key is the entity of the pivot [Campaign URN, Creative URN, etc.] + `end_date`
primary_key = ["string_of_pivot_values", "end_date"]
cursor_field = "end_date"
records_limit = 15000
FIELDS_CHUNK_SIZE = 18

_cursor_value = ""

@property
def cursor_field(self) -> str:
"""Name of the field associated with the state"""
return "end_date"

@property
def state(self) -> Mapping[str, Any]:
return {self.cursor_field: self._cursor_value}

@state.setter
def state(self, value: Mapping[str, Any]):
self._cursor_value = value[self.cursor_field]

@property
def primary_slice_key(self) -> str:
"""
Define the main slice key from `parent_values_map`. Always the first element.
EXAMPLE:
in : {"k1": "v1", "k2": "v2", ...}
out : "k1"
"""
return list(self.parent_values_map.keys())[0]

@property
@abstractmethod
def parent_stream(self) -> LinkedinAdsStream:
"""Defines the parent stream for slicing, the class object should be provided."""

def get_json_schema(self) -> Mapping[str, Any]:
schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ad_analytics")
return schema
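For context on the IncrementalMixin contract adopted above: after a sync the CDK reads the state getter and persists the result, then pushes it back through the setter at the start of the next sync. A minimal round-trip sketch, assuming a hypothetical concrete subclass name and config:

# AdCampaignAnalytics is an illustrative concrete subclass of LinkedInAdsAnalyticsStream.
stream = AdCampaignAnalytics(config=config)

# The framework restores persisted state through the setter...
stream.state = {"end_date": "2024-10-01"}

# ...so stream_slices() starts from that cursor instead of config["start_date"],
# and the getter is read and persisted again once the sync finishes.
assert stream.state == {"end_date": "2024-10-01"}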
@@ -270,15 +300,41 @@ def stream_slices(
}
"""
parent_stream = self.parent_stream(config=self.config)
stream_state = stream_state or {self.cursor_field: self.config.get("start_date")}
for record in parent_stream.read_records(sync_mode=sync_mode):
base_slice = get_parent_stream_values(record, self.parent_values_map)
for date_slice in self.get_date_slices(stream_state.get(self.cursor_field), self.config.get("end_date")):
date_slice_with_fields: List = []
for fields_set in self.chunk_analytics_fields():
base_slice["fields"] = ",".join(fields_set)
date_slice_with_fields.append(base_slice | date_slice)
yield {"field_date_chunks": date_slice_with_fields}

stream_state = {
self.cursor_field: self.state.get(self.cursor_field, None) or self.config.get('start_date')
}

base_slices = []
for record in parent_stream.read_records(sync_mode=sync_mode, fetch_additional_fields=False):
base_slices.append(get_parent_stream_values(record, self.parent_values_map))

date_slices = []
for date_slice in self.get_date_slices(stream_state.get(self.cursor_field), self.config.get("end_date")):
date_slices.append(date_slice)

for date_slice in date_slices:
for base_slice in base_slices:
base_with_date_and_fields_slices = []
for field_set in self.chunk_analytics_fields():
base_slice["fields"] = ",".join(field_set)
base_with_date_and_fields_slices.append(base_slice | date_slice)
yield {"field_date_chunks": base_with_date_and_fields_slices}

# After reading an entire date slice, advance the state so the next sync resumes
# from this slice's end date (capped at now - 2h below).
last_end_dt = datetime(
year=date_slice.get('dateRange').get('end.year'),
month=date_slice.get('dateRange').get('end.month'),
day=date_slice.get('dateRange').get('end.day'),
tzinfo=UTC,
)

# Move the current datetime back 2 hours to catch the final metrics for the previous day.
current_dt = datetime.now(tz=UTC) - timedelta(hours=2)

new_cursor = min(last_end_dt, current_dt).strftime('%Y-%m-%d')
self.logger.info(f'Updating cursor to {new_cursor}.')
self.state = {self.cursor_field: new_cursor}

@staticmethod
def get_date_slices(start_date: str, end_date: str = None, window_in_days: int = WINDOW_IN_DAYS) -> Iterable[Mapping[str, Any]]:
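The cursor update above reads slices back as date_slice['dateRange']['end.year'] and friends, which implies slices keyed like LinkedIn's flattened dateRange parameters. A minimal sketch of that assumed shape and the cursor arithmetic (all values illustrative):

from datetime import datetime, UTC, timedelta

date_slice = {
    "dateRange": {
        "start.day": 1, "start.month": 10, "start.year": 2024,
        "end.day": 31, "end.month": 10, "end.year": 2024,
    }
}

# New cursor = min(slice end, now - 2h): a slice ending today or later can
# never advance the cursor past the last fully-reported day.
last_end_dt = datetime(2024, 10, 31, tzinfo=UTC)
current_dt = datetime(2024, 11, 1, 1, 0, tzinfo=UTC) - timedelta(hours=2)  # Oct 31, 23:00 UTC
assert min(last_end_dt, current_dt).strftime('%Y-%m-%d') == '2024-10-31'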
84 changes: 62 additions & 22 deletions source-linkedin-ads-v2/source_linkedin_ads_v2/streams.py
@@ -5,13 +5,15 @@

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, List
from urllib.parse import quote, urlencode

import pendulum
import requests
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_protocol.models import SyncMode

from .utils import get_parent_stream_values, transform_data

@@ -116,6 +118,18 @@ def should_retry(self, response: requests.Response) -> bool:
self.logger.error(error_message)
return super().should_retry(response)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
**kwargs,
) -> Iterable[StreamData]:
yield from self._read_pages(
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
)
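This override hands pagination straight to the CDK: _read_pages fetches each page for the slice and invokes the callback once per response. A sketch of what the lambda expands to, assuming the (request, response, state, slice) callback signature used by the airbyte-cdk versions this connector targets:

class ExampleStream(LinkedinAdsStream):
    # Illustrative only: equivalent to the lambda passed to _read_pages above.
    # Each fetched page is parsed with the current slice and state threaded through.
    def _records_for_page(self, request, response, stream_state, stream_slice):
        return self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)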


class OffsetPaginationMixin:
"""Mixin for offset based pagination for endpoints tha tdoesnt support cursor based pagination"""
@@ -399,35 +413,61 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: max(latest_record.get(self.cursor_field), int(current_stream_state.get(self.cursor_field)))}


def _fetch_creative_name(self, record: StreamData, stream_state: Mapping[str, Any] = None) -> str | None:
# We try to add a creative_name field to each record. LinkedIn does not always
# include this data in the /rest/adAccounts/{account_id}/creative endpoint's
# response depending on the creative's type. If that's the case, we try to
# get the creative name from /rest/post/{encoded_share_urn}.

creative_name = None
# Check if the creative name is already in the record.
# We may need to handle more types of creatives like this beyond just text ads.
text_ad_name = record.get('content', {}).get('textAd', {}).get('headline', None)
if text_ad_name:
creative_name = text_ad_name
# Otherwise, we check /rest/post/{encoded_share_urn} for the creative_name.
else:
share_urn = record.get('content', {}).get('reference', None)
if share_urn:
url = f"{self.url_base}posts/{quote(share_urn)}"
headers = super().request_headers(stream_state)
# Get the access token directly from the config or via the Oauth2Authenticator.
access_token = self.config.get('credentials', {}).get('access_token', None) or self.config.get('authenticator', None)._access_token
headers.update({"Authorization": f"Bearer {access_token}"})
try:
response = requests.request(method='GET', url=url, headers=headers)
if response.status_code != 200:
raise RuntimeError(response.text)
dsc_name = response.json().get('adContext', {}).get('dscName', None)
if dsc_name:
creative_name = dsc_name
except RuntimeError as err:
# Some creatives won't have names due to LinkedIn's data retention limits. We notify that we couldn't
# retrieve the creative name, and still emit the record.
self.logger.warning(f"Encountered error when fetching creative name for creative {record.get('id')} : {err}")

return creative_name

def read_records(
self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, fetch_additional_fields: bool = True, **kwargs
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
parent_stream = self.parent_stream(config=self.config)
for record in parent_stream.read_records(**kwargs):
child_stream_slice = super().read_records(stream_slice=get_parent_stream_values(record, self.parent_values_map), **kwargs)
for child_records in child_stream_slice:
try:
creative_id = child_records.get('id').split(":")[-1]
url_creative = f"https://api.linkedin.com/v2/adCreativesV2/{creative_id}?projection=(variables(data(*,com.linkedin.ads.SponsoredUpdateCreativeVariables(*,share~(subject,text(text),content(contentEntities(*(description,entityLocation,title))))))))"
headers = super().request_headers(stream_state)
headers.update({"Authorization": f"Bearer {self.config['authenticator']._access_token}"})
response_creative = requests.get(url=url_creative,headers=headers).json()
if response_creative["variables"]["data"].get("com.linkedin.ads.TextAdCreativeVariables"):
child_records["creative_name"] = response_creative["variables"]["data"]["com.linkedin.ads.TextAdCreativeVariables"]["title"]
elif response_creative["variables"]["data"]["com.linkedin.ads.SponsoredUpdateCreativeVariables"]["share~"].get("subject"):
child_records["creative_name"] = response_creative["variables"]["data"]["com.linkedin.ads.SponsoredUpdateCreativeVariables"]["share~"]["subject"]
if stream_state:
if child_records[self.cursor_field] >= stream_state.get(self.cursor_field):
yield child_records
else:
continue
else:
yield child_records
except Exception as e:
self.logger.error(f"{e}")
for child_record in child_stream_slice:

# Do not emit records that have a cursor value earlier than the last cursor we've seen.
if stream_state and child_record[self.cursor_field] < stream_state.get(self.cursor_field):
continue

if fetch_additional_fields:
creative_name = self._fetch_creative_name(record=child_record, stream_state=stream_state)

if creative_name:
child_record["creative_name"] = creative_name

yield child_record


class Conversions(OffsetPaginationMixin, LinkedInAdsStreamSlicing):
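A minimal usage sketch of the new fetch_additional_fields flag, assuming Creatives is the stream carrying the read_records override above (as the creative-name logic suggests) and that config holds valid credentials:

from airbyte_protocol.models import SyncMode
from source_linkedin_ads_v2.streams import Creatives

creatives = Creatives(config=config)

# Analytics slicing only needs the parent IDs, so it skips the per-record
# creative-name lookups and their extra HTTP round-trips:
for record in creatives.read_records(sync_mode=SyncMode.full_refresh, fetch_additional_fields=False):
    ...

# A regular sync keeps the default and enriches records with creative_name:
for record in creatives.read_records(sync_mode=SyncMode.full_refresh):
    ...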