Skip to content

Commit

Permalink
Pre beta bugfixes (AutoIDM#39)
Browse files Browse the repository at this point in the history
* break out field exlusion tests for parallelism

* remove click_performance_report

* update selection for click_performance_report, failing mutual exlusion case commented out

* [skip ci] WIP on invalid exclusion test

* fix sync canary

* Fix field exclusion

* Remove unused field `selectable`

* Give `_sdc_record_hash` a behavior for consistency

* Fix integration test

* refactor field gatherer, uncomment assertions, test failing

* exlusion tests passing locally

* adding mutual exlusion check to discovery

* Transform `type_` to `type` (AutoIDM#36)

Co-authored-by: Bryant Gray <[email protected]>

* Report streams prefix resource names (AutoIDM#37)

* Prepend resource name in the schema

* Prepend resource name in the metadata

* Prepend resource name in the transform_keys

* Prepend resource name in expected_default_fields

Co-authored-by: Bryant Gray <[email protected]>

* Change `_sdc_record_hash` to sorted list of tuples (AutoIDM#38)

Co-authored-by: Bryant Gray <[email protected]>

* Fix tests

* Fix more tests

Co-authored-by: kspeer <[email protected]>
Co-authored-by: Bryant Gray <[email protected]>
Co-authored-by: dsprayberry <[email protected]>
  • Loading branch information
4 people authored Mar 24, 2022
1 parent d70e75b commit 1ec469b
Show file tree
Hide file tree
Showing 15 changed files with 444 additions and 371 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:

run_integration_tests:
executor: docker-executor
parallelism: 16
parallelism: 18
steps:
- checkout
- attach_workspace:
Expand Down
5 changes: 5 additions & 0 deletions tap_google_ads/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def create_resource_schema(config):
for field in attributes + metrics + segments:
field_schema = dict(resource_schema[field])

if field_schema["name"] in segments:
field_schema["category"] = "SEGMENT"

fields[field_schema["name"]] = {
"field_details": field_schema,
"incompatible_fields": [],
Expand All @@ -192,6 +195,8 @@ def create_resource_schema(config):
metrics_and_segments = set(metrics + segments)

for field_name, field in fields.items():
if field["field_details"]["category"] == "ATTRIBUTE":
continue
for compared_field in metrics_and_segments:
field_root_resource = get_root_resource_name(field_name)
compared_field_root_resource = get_root_resource_name(compared_field)
Expand Down
70 changes: 43 additions & 27 deletions tap_google_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ def create_report_query(resource_name, selected_fields, query_date):

def generate_hash(record, metadata):
metadata = singer.metadata.to_map(metadata)
fields_to_hash = {}
fields_to_hash = []
for key, val in record.items():
if metadata[("properties", key)]["behavior"] != "METRIC":
fields_to_hash[key] = val
fields_to_hash.append((key, val))

hash_source_data = {key: fields_to_hash[key] for key in sorted(fields_to_hash)}
hash_source_data = sorted(fields_to_hash, key=lambda x: x[0])
hash_bytes = json.dumps(hash_source_data).encode("utf-8")
return hashlib.sha256(hash_bytes).hexdigest()

Expand Down Expand Up @@ -152,6 +152,18 @@ def make_request(gas, query, customer_id):
return response


def google_message_to_json(message):
"""
The proto field name for `type` is `type_` which will
get stripped by the Transformer. So we replace all
instances of the key `"type_"` before `json.loads`ing it
"""

json_string = MessageToJson(message, preserving_proto_field_name=True)
json_string = json_string.replace('"type_":', '"type":')
return json.loads(json_string)


class BaseStream: # pylint: disable=too-many-instance-attributes

def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys):
Expand All @@ -172,7 +184,6 @@ def extract_field_information(self, resource_schema):
self.field_exclusions = defaultdict(set)
self.schema = {}
self.behavior = {}
self.selectable = {}

for resource_name in self.google_ads_resource_names:

Expand All @@ -188,7 +199,6 @@ def extract_field_information(self, resource_schema):

self.behavior[field_name] = field["field_details"]["category"]

self.selectable[field_name] = field["field_details"]["selectable"]
self.add_extra_fields(resource_schema)
self.field_exclusions = {k: list(v) for k, v in self.field_exclusions.items()}

Expand Down Expand Up @@ -327,7 +337,7 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u
with Transformer() as transformer:
# Pages are fetched automatically while iterating through the response
for message in response:
json_message = json.loads(MessageToJson(message, preserving_proto_field_name=True))
json_message = google_message_to_json(message)
transformed_obj = self.transform_keys(json_message)
record = transformer.transform(transformed_obj, stream["schema"], singer.metadata.to_map(stream_mdata))

Expand Down Expand Up @@ -369,13 +379,12 @@ def format_field_names(self):
"""
for resource_name, schema in self.full_schema["properties"].items():
for field_name, data_type in schema["properties"].items():
# Ensure that attributed resource fields have the resource name as a prefix, eg campaign_id under the ad_groups stream
if resource_name not in {"metrics", "segments"} and resource_name not in self.google_ads_resource_names:
self.stream_schema["properties"][f"{resource_name}_{field_name}"] = data_type
# Move ad_group_ad.ad.x fields up a level in the schema (ad_group_ad.ad.x -> ad_group_ad.x)
elif resource_name == "ad_group_ad" and field_name == "ad":
if resource_name == "ad_group_ad" and field_name == "ad":
for ad_field_name, ad_field_schema in data_type["properties"].items():
self.stream_schema["properties"][ad_field_name] = ad_field_schema
elif resource_name not in {"metrics", "segments"}:
self.stream_schema["properties"][f"{resource_name}_{field_name}"] = data_type
else:
self.stream_schema["properties"][field_name] = data_type

Expand All @@ -388,22 +397,22 @@ def build_stream_metadata(self):
"valid-replication-keys": ["date"]
},
("properties", "_sdc_record_hash"): {
"inclusion": "automatic"
"inclusion": "automatic",
"behavior": "PRIMARY KEY"
},
}
for report_field in self.fields:
# Transform the field name to match the schema
is_metric_or_segment = report_field.startswith("metrics.") or report_field.startswith("segments.")
if (not is_metric_or_segment
and report_field.split(".")[0] not in self.google_ads_resource_names
):
transformed_field_name = "_".join(report_field.split(".")[:2])
# Transform ad_group_ad.ad.x fields to just x to reflect ad_group_ads schema
elif report_field.startswith("ad_group_ad.ad."):
if report_field.startswith("ad_group_ad.ad."):
transformed_field_name = report_field.split(".")[2]
elif not is_metric_or_segment:
transformed_field_name = "_".join(report_field.split(".")[:2])
else:
transformed_field_name = report_field.split(".")[1]

# TODO: Maybe refactor this
# metadata_key = ("properties", transformed_field_name)
# Base metadata for every field
if ("properties", transformed_field_name) not in self.stream_metadata:
self.stream_metadata[("properties", transformed_field_name)] = {
Expand All @@ -414,9 +423,7 @@ def build_stream_metadata(self):
# Transform field exclusion names so they match the schema
for field_name in self.field_exclusions[report_field]:
is_metric_or_segment = field_name.startswith("metrics.") or field_name.startswith("segments.")
if (not is_metric_or_segment
and field_name.split(".")[0] not in self.google_ads_resource_names
):
if not is_metric_or_segment:
new_field_name = field_name.replace(".", "_")
else:
new_field_name = field_name.split(".")[1]
Expand All @@ -442,13 +449,22 @@ def transform_keys(self, obj):
transformed_obj = {}

for resource_name, value in obj.items():
if resource_name == "ad_group_ad":
transformed_obj.update(value["ad"])
else:
if resource_name in {"metrics", "segments"}:
transformed_obj.update(value)

if "type_" in transformed_obj:
transformed_obj["type"] = transformed_obj.pop("type_")
elif resource_name == "ad_group_ad":
for key, sub_value in value.items():
if key == 'ad':
transformed_obj.update(sub_value)
else:
transformed_obj.update({f"{resource_name}_{key}": sub_value})
else:
# value = {"a": 1, "b":2}
# turns into
# {"resource_a": 1, "resource_b": 2}
transformed_obj.update(
{f"{resource_name}_{key}": sub_value
for key, sub_value in value.items()}
)

return transformed_obj

Expand Down Expand Up @@ -507,7 +523,7 @@ def sync(self, sdk_client, customer, stream, config, state):
with Transformer() as transformer:
# Pages are fetched automatically while iterating through the response
for message in response:
json_message = json.loads(MessageToJson(message, preserving_proto_field_name=True))
json_message = google_message_to_json(message)
transformed_obj = self.transform_keys(json_message)
record = transformer.transform(transformed_obj, stream["schema"])
record["_sdc_record_hash"] = generate_hash(record, stream_mdata)
Expand Down
33 changes: 16 additions & 17 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ def expected_default_fields():
},
"ad_group_audience_performance_report": {
'ad_group_name',
'user_list_name',
},
"campaign_performance_report": {
'average_cpc', # Avg. CPC,
Expand All @@ -631,24 +630,24 @@ def expected_default_fields():
'view_through_conversions', # View-through conv.,
},
"click_performance_report": {
'ad_group_ad',
'click_view_ad_group_ad',
'ad_group_id',
'ad_group_name',
'ad_group_status',
'ad_network_type',
'area_of_interest',
'campaign_location_target',
'click_view_area_of_interest',
'click_view_campaign_location_target',
'click_type',
'clicks',
'customer_descriptive_name',
'customer_id',
'device',
'gclid',
'location_of_presence',
'click_view_gclid',
'click_view_location_of_presence',
'month_of_year',
'page_number',
'click_view_page_number',
'slot',
'user_list',
'click_view_user_list',
},
"display_keyword_performance_report": { # TODO NO DATA AVAILABLE
'ad_group_name',
Expand Down Expand Up @@ -702,13 +701,13 @@ def expected_default_fields():
'click_type',
'clicks',
'date',
'descriptive_name',
'id',
'customer_descriptive_name',
'customer_id',
'impressions',
'invalid_clicks',
'manager',
'test_account',
'time_zone',
'customer_manager',
'customer_test_account',
'customer_time_zone',
},
"geo_performance_report": {
'clicks',
Expand Down Expand Up @@ -739,7 +738,7 @@ def expected_default_fields():
'conversions',
'view_through_conversions', # View-through conv.,
'cost_per_conversion', # Cost / conv.,
'search_term',
'search_term_view_search_term',
'search_term_match_type',
},
"age_range_performance_report": {
Expand All @@ -761,7 +760,7 @@ def expected_default_fields():
'clicks',
'cost_micros',
'interactions',
'placeholder_type',
'feed_placeholder_view_placeholder_type',
},
'user_location_performance_report': {
'campaign_id',
Expand All @@ -773,14 +772,14 @@ def expected_default_fields():
'campaign_name',
'clicks',
'average_cpc',
'unexpanded_final_url',
'landing_page_view_unexpanded_final_url',
},
'expanded_landing_page_report': {
'ad_group_name',
'campaign_name',
'clicks',
'average_cpc',
'expanded_final_url',
'expanded_landing_page_view_expanded_final_url',
},
'campaign_audience_performance_report': {
'campaign_name',
Expand Down
Loading

0 comments on commit 1ec469b

Please sign in to comment.