Skip to content

Commit

Permalink
Crest master (#66)
Browse files Browse the repository at this point in the history
* Tdl 19235 handle uncaught exceptions (AutoIDM#61)

* Added backoff for 5xx, 429 and ReadTimeout errors.

* Resolved pylint error.

* Updated comments in the unittest cases.

* Updated error handling.

* TDL-18749 Implement interruptible full table streams. (AutoIDM#60)

* Implemented interruptible full table streams.

* Resolved pylint error

* Resolved error in full table sync test case.

* Updated config.yml to pass cci

* Updated query building logic.

* Updated integration test case.

* Resolved review comments.

* Resolved comments.

* Implemented logic to skip the duplicate records.

* Resolved unittest case error.

* Resolved pylint error

* Resolved integration test case error

* Added empty filter param for call_details and campaign_label stream.

* Added unit test cases for should_sync method.

* Revert "Implemeted logic to skip the duplicate records."

This reverts commit cd06e11657bd35edbaefcd7f8f12acfb938e05ec.

* Added logger message for debugging purpose

* Updated integration test case.

* Replaced .format with f string.

* Updated comment in integration test.

Co-authored-by: KrishnanG <[email protected]>
  • Loading branch information
prijendev and KrishnanG authored Jun 15, 2022
1 parent a9ca558 commit ba39b45
Show file tree
Hide file tree
Showing 5 changed files with 435 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:

run_integration_tests:
executor: docker-executor
parallelism: 18
parallelism: 19
steps:
- checkout
- attach_workspace:
Expand Down
133 changes: 119 additions & 14 deletions tap_google_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from datetime import timedelta
import singer
from singer import Transformer
from singer import utils
from singer import utils, metrics
from google.protobuf.json_format import MessageToJson
from google.ads.googleads.errors import GoogleAdsException
from google.api_core.exceptions import ServerError, TooManyRequests
from requests.exceptions import ReadTimeout
import backoff
from . import report_definitions

Expand All @@ -25,7 +27,7 @@
)

DEFAULT_CONVERSION_WINDOW = 30

DEFAULT_PAGE_SIZE = 1000

def get_conversion_window(config):
"""Fetch the conversion window from the config and error on invalid values"""
Expand Down Expand Up @@ -80,9 +82,60 @@ def build_parameters():
param_str = ",".join(f"{k}={v}" for k, v in API_PARAMETERS.items())
return f"PARAMETERS {param_str}"

def generate_where_and_orderby_clause(last_pk_fetched, filter_param, composite_pks):
    """
    Build the WHERE and ORDER BY clauses used to resume an interrupted
    full-table sync, based on the filter parameter (`key_properties`) and
    the last primary-key value fetched.

    Examples:
        Single PK case:
            filter_param = 'id', last_pk_fetched = 1, composite_pks = False
            -> 'WHERE id > 1 ORDER BY id ASC'
        Composite PK case:
            filter_param = 'id', last_pk_fetched = 1, composite_pks = True
            -> 'WHERE id >= 1 ORDER BY id ASC'
    """
    # The bookmark stores just one pk value even when the stream has a
    # composite primary key, so several records may share that value in
    # different key combinations. A composite key therefore has to re-fetch
    # the boundary record with '>=', whereas a unique single pk can safely
    # exclude it with '>' and avoid emitting the last record twice.
    comparison_operator = ">=" if composite_pks else ">"

    order_by_clause = ""
    where_clause = ""

    if filter_param:
        # Only streams that expose a filter parameter get a deterministic
        # ordering clause.
        order_by_clause = f"ORDER BY {filter_param} ASC"

        if last_pk_fetched:
            # Resume from (or at, for composite pks) the bookmarked value.
            # NOTE: the trailing space separates this from the ORDER BY text.
            where_clause = f'WHERE {filter_param} {comparison_operator} {last_pk_fetched} '

    return f'{where_clause}{order_by_clause}'

def create_core_stream_query(resource_name, selected_fields, last_pk_fetched, filter_param, composite_pks):

# Generate a query using WHERE and ORDER BY parameters.
where_order_by_clause = generate_where_and_orderby_clause(last_pk_fetched, filter_param, composite_pks)

core_query = f"SELECT {','.join(selected_fields)} FROM {resource_name} {where_order_by_clause} {build_parameters()}"

def create_core_stream_query(resource_name, selected_fields):
core_query = f"SELECT {','.join(selected_fields)} FROM {resource_name} {build_parameters()}"
return core_query


Expand Down Expand Up @@ -117,6 +170,13 @@ def generate_hash(record, metadata):


def should_give_up(ex):

# ServerError is the parent class of InternalServerError, MethodNotImplemented, BadGateway,
# ServiceUnavailable, GatewayTimeout, DataLoss and Unknown classes.
# Return False for all above errors and ReadTimeout error.
if isinstance(ex, (ServerError, TooManyRequests, ReadTimeout)):
return False

if isinstance(ex, AttributeError):
if str(ex) == "'NoneType' object has no attribute 'Call'":
LOGGER.info('Retrying request due to AttributeError')
Expand All @@ -141,12 +201,14 @@ def on_giveup_func(err):

@backoff.on_exception(backoff.expo,
(GoogleAdsException,
ServerError, TooManyRequests,
ReadTimeout,
AttributeError),
max_tries=5,
jitter=None,
giveup=should_give_up,
on_giveup=on_giveup_func,
logger=None)
)
def make_request(gas, query, customer_id):
response = gas.search(query=query, customer_id=customer_id)
return response
Expand All @@ -172,11 +234,12 @@ def filter_out_non_attribute_fields(fields):

class BaseStream: # pylint: disable=too-many-instance-attributes

def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys, automatic_keys = None):
def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys, automatic_keys = None, filter_param = None):
self.fields = fields
self.google_ads_resource_names = google_ads_resource_names
self.primary_keys = primary_keys
self.automatic_keys = automatic_keys if automatic_keys else set()
self.filter_param = filter_param
self.extract_field_information(resource_schema)

self.create_full_schema(resource_schema)
Expand Down Expand Up @@ -330,22 +393,44 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u
state = singer.set_currently_syncing(state, [stream_name, customer["customerId"]])
singer.write_state(state)

query = create_core_stream_query(resource_name, selected_fields)
# last run was interrupted if there is a bookmark available for core streams.
last_pk_fetched = singer.get_bookmark(state,
stream["tap_stream_id"],
customer["customerId"]) or {}

# Assign True if the primary key is composite.
composite_pks = len(self.primary_keys) > 1

query = create_core_stream_query(resource_name, selected_fields, last_pk_fetched.get('last_pk_fetched'), self.filter_param, composite_pks)
try:
response = make_request(gas, query, customer["customerId"])
except GoogleAdsException as err:
LOGGER.warning("Failed query: %s", query)
raise err

with Transformer() as transformer:
# Pages are fetched automatically while iterating through the response
for message in response:
json_message = google_message_to_json(message)
transformed_message = self.transform_keys(json_message)
record = transformer.transform(transformed_message, stream["schema"], singer.metadata.to_map(stream_mdata))
with metrics.record_counter(stream_name) as counter:
with Transformer() as transformer:
# Pages are fetched automatically while iterating through the response
for message in response:
json_message = google_message_to_json(message)
transformed_message = self.transform_keys(json_message)
record = transformer.transform(transformed_message, stream["schema"], singer.metadata.to_map(stream_mdata))

singer.write_record(stream_name, record)
counter.increment()

# Write state(last_pk_fetched) using primary key(id) value for core streams after DEFAULT_PAGE_SIZE records
if counter.value % DEFAULT_PAGE_SIZE == 0 and self.filter_param:
bookmark_value = record[self.primary_keys[0]]
singer.write_bookmark(state, stream["tap_stream_id"], customer["customerId"], {'last_pk_fetched': bookmark_value})

singer.write_record(stream_name, record)
singer.write_state(state)
LOGGER.info("Write state for stream: %s, value: %s", stream_name, bookmark_value)

# Flush the state for core streams if sync is completed
if stream["tap_stream_id"] in state.get('bookmarks', {}):
state['bookmarks'].pop(stream["tap_stream_id"])
singer.write_state(state)

def get_query_date(start_date, bookmark, conversion_window_date):
"""Return a date within the conversion window and after start date
Expand Down Expand Up @@ -620,12 +705,14 @@ def initialize_core_streams(resource_schema):
resource_schema,
["id"],
{"customer_id"},
filter_param="accessible_bidding_strategy.id"
),
"accounts": BaseStream(
report_definitions.ACCOUNT_FIELDS,
["customer"],
resource_schema,
["id"],
filter_param="customer.id"
),
"ad_groups": BaseStream(
report_definitions.AD_GROUP_FIELDS,
Expand All @@ -636,6 +723,7 @@ def initialize_core_streams(resource_schema):
"campaign_id",
"customer_id",
},
filter_param="ad_group.id"
),
"ad_group_criterion": BaseStream(
report_definitions.AD_GROUP_CRITERION_FIELDS,
Expand All @@ -646,6 +734,7 @@ def initialize_core_streams(resource_schema):
"campaign_id",
"customer_id",
},
filter_param="ad_group.id"
),
"ads": BaseStream(
report_definitions.AD_GROUP_AD_FIELDS,
Expand All @@ -657,13 +746,15 @@ def initialize_core_streams(resource_schema):
"campaign_id",
"customer_id",
},
filter_param = "ad_group_ad.ad.id"
),
"bidding_strategies": BaseStream(
report_definitions.BIDDING_STRATEGY_FIELDS,
["bidding_strategy"],
resource_schema,
["id"],
{"customer_id"},
filter_param="bidding_strategy.id"
),
"call_details": BaseStream(
report_definitions.CALL_VIEW_FIELDS,
Expand All @@ -682,20 +773,23 @@ def initialize_core_streams(resource_schema):
resource_schema,
["id"],
{"customer_id"},
filter_param="campaign.id"
),
"campaign_budgets": BaseStream(
report_definitions.CAMPAIGN_BUDGET_FIELDS,
["campaign_budget"],
resource_schema,
["id"],
{"customer_id"},
filter_param="campaign_budget.id"
),
"campaign_criterion": BaseStream(
report_definitions.CAMPAIGN_CRITERION_FIELDS,
["campaign_criterion"],
resource_schema,
["campaign_id","criterion_id"],
{"customer_id"},
filter_param="campaign.id"
),
"campaign_labels": BaseStream(
report_definitions.CAMPAIGN_LABEL_FIELDS,
Expand All @@ -713,13 +807,15 @@ def initialize_core_streams(resource_schema):
["carrier_constant"],
resource_schema,
["id"],
filter_param="carrier_constant.id"
),
"feed": BaseStream(
report_definitions.FEED_FIELDS,
["feed"],
resource_schema,
["id"],
{"customer_id"},
filter_param="feed.id"
),
"feed_item": BaseStream(
report_definitions.FEED_ITEM_FIELDS,
Expand All @@ -730,56 +826,65 @@ def initialize_core_streams(resource_schema):
"customer_id",
"feed_id",
},
filter_param="feed_item.id"
),
"labels": BaseStream(
report_definitions.LABEL_FIELDS,
["label"],
resource_schema,
["id"],
{"customer_id"},
filter_param="label.id"
),
"language_constant": BaseStream(
report_definitions.LANGUAGE_CONSTANT_FIELDS,
["language_constant"],
resource_schema,
["id"],
filter_param="language_constant.id"
),
"mobile_app_category_constant": BaseStream(
report_definitions.MOBILE_APP_CATEGORY_CONSTANT_FIELDS,
["mobile_app_category_constant"],
resource_schema,
["id"],
filter_param="mobile_app_category_constant.id"
),
"mobile_device_constant": BaseStream(
report_definitions.MOBILE_DEVICE_CONSTANT_FIELDS,
["mobile_device_constant"],
resource_schema,
["id"],
filter_param="mobile_device_constant.id"
),
"operating_system_version_constant": BaseStream(
report_definitions.OPERATING_SYSTEM_VERSION_CONSTANT_FIELDS,
["operating_system_version_constant"],
resource_schema,
["id"],
filter_param="operating_system_version_constant.id"
),
"topic_constant": BaseStream(
report_definitions.TOPIC_CONSTANT_FIELDS,
["topic_constant"],
resource_schema,
["id"],
filter_param="topic_constant.id"
),
"user_interest": UserInterestStream(
report_definitions.USER_INTEREST_FIELDS,
["user_interest"],
resource_schema,
["id"],
filter_param="user_interest.user_interest_id"
),
"user_list": BaseStream(
report_definitions.USER_LIST_FIELDS,
["user_list"],
resource_schema,
["id"],
{"customer_id"},
filter_param="user_list.id"
),
}

Expand Down
Loading

0 comments on commit ba39b45

Please sign in to comment.