From 1ec469b6aef0eb477e258df23059ddef4707de6f Mon Sep 17 00:00:00 2001 From: Andy Lu Date: Thu, 24 Mar 2022 13:43:58 -0400 Subject: [PATCH] Pre beta bugfixes (#39) * break out field exlusion tests for parallelism * remove click_performance_report * update selection for click_performance_report, failing mutual exlusion case commented out * [skip ci] WIP on invalid exclusion test * fix sync canary * Fix field exclusion * Remove unused field `selectable` * Give `_sdc_record_hash` a behavior for consistency * Fix integration test * refactor field gatherer, uncomment assertions, test failing * exlusion tests passing locally * adding mutual exlusion check to discovery * Transform `type_` to `type` (#36) Co-authored-by: Bryant Gray * Report streams prefix resource names (#37) * Prepend resource name in the schema * Prepend resource name in the metadata * Prepend resource name in the transform_keys * Prepend resource name in expected_default_fields Co-authored-by: Bryant Gray * Change `_sdc_record_hash` to sorted list of tuples (#38) Co-authored-by: Bryant Gray * Fix tests * Fix more tests Co-authored-by: kspeer Co-authored-by: Bryant Gray Co-authored-by: dsprayberry <28106103+dsprayberry@users.noreply.github.com> --- .circleci/config.yml | 2 +- tap_google_ads/discover.py | 5 + tap_google_ads/streams.py | 70 ++++--- tests/base.py | 33 ++- tests/base_google_ads_field_exclusion.py | 138 +++++++++++++ tests/test_google_ads_discovery.py | 39 ++-- tests/test_google_ads_field_exclusion.py | 186 ----------------- tests/test_google_ads_field_exclusion_1.py | 27 +++ tests/test_google_ads_field_exclusion_2.py | 27 +++ tests/test_google_ads_field_exclusion_3.py | 27 +++ tests/test_google_ads_field_exclusion_4.py | 27 +++ ...est_google_ads_field_exclusion_coverage.py | 36 ++++ ...test_google_ads_field_exclusion_invalid.py | 190 +++++++----------- tests/test_google_ads_sync_canary.py | 6 +- tests/unittests/test_utils.py | 2 +- 15 files changed, 444 insertions(+), 371 deletions(-) 
create mode 100644 tests/base_google_ads_field_exclusion.py delete mode 100644 tests/test_google_ads_field_exclusion.py create mode 100644 tests/test_google_ads_field_exclusion_1.py create mode 100644 tests/test_google_ads_field_exclusion_2.py create mode 100644 tests/test_google_ads_field_exclusion_3.py create mode 100644 tests/test_google_ads_field_exclusion_4.py create mode 100644 tests/test_google_ads_field_exclusion_coverage.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 6a0f25e..2a5b437 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -69,7 +69,7 @@ jobs: run_integration_tests: executor: docker-executor - parallelism: 16 + parallelism: 18 steps: - checkout - attach_workspace: diff --git a/tap_google_ads/discover.py b/tap_google_ads/discover.py index 5a5286d..098e5b9 100644 --- a/tap_google_ads/discover.py +++ b/tap_google_ads/discover.py @@ -183,6 +183,9 @@ def create_resource_schema(config): for field in attributes + metrics + segments: field_schema = dict(resource_schema[field]) + if field_schema["name"] in segments: + field_schema["category"] = "SEGMENT" + fields[field_schema["name"]] = { "field_details": field_schema, "incompatible_fields": [], @@ -192,6 +195,8 @@ def create_resource_schema(config): metrics_and_segments = set(metrics + segments) for field_name, field in fields.items(): + if field["field_details"]["category"] == "ATTRIBUTE": + continue for compared_field in metrics_and_segments: field_root_resource = get_root_resource_name(field_name) compared_field_root_resource = get_root_resource_name(compared_field) diff --git a/tap_google_ads/streams.py b/tap_google_ads/streams.py index b3c596b..60dd907 100644 --- a/tap_google_ads/streams.py +++ b/tap_google_ads/streams.py @@ -97,12 +97,12 @@ def create_report_query(resource_name, selected_fields, query_date): def generate_hash(record, metadata): metadata = singer.metadata.to_map(metadata) - fields_to_hash = {} + fields_to_hash = [] for key, val in record.items(): if 
metadata[("properties", key)]["behavior"] != "METRIC": - fields_to_hash[key] = val + fields_to_hash.append((key, val)) - hash_source_data = {key: fields_to_hash[key] for key in sorted(fields_to_hash)} + hash_source_data = sorted(fields_to_hash, key=lambda x: x[0]) hash_bytes = json.dumps(hash_source_data).encode("utf-8") return hashlib.sha256(hash_bytes).hexdigest() @@ -152,6 +152,18 @@ def make_request(gas, query, customer_id): return response +def google_message_to_json(message): + """ + The proto field name for `type` is `type_` which will + get stripped by the Transformer. So we replace all + instances of the key `"type_"` before `json.loads`ing it + """ + + json_string = MessageToJson(message, preserving_proto_field_name=True) + json_string = json_string.replace('"type_":', '"type":') + return json.loads(json_string) + + class BaseStream: # pylint: disable=too-many-instance-attributes def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys): @@ -172,7 +184,6 @@ def extract_field_information(self, resource_schema): self.field_exclusions = defaultdict(set) self.schema = {} self.behavior = {} - self.selectable = {} for resource_name in self.google_ads_resource_names: @@ -188,7 +199,6 @@ def extract_field_information(self, resource_schema): self.behavior[field_name] = field["field_details"]["category"] - self.selectable[field_name] = field["field_details"]["selectable"] self.add_extra_fields(resource_schema) self.field_exclusions = {k: list(v) for k, v in self.field_exclusions.items()} @@ -327,7 +337,7 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u with Transformer() as transformer: # Pages are fetched automatically while iterating through the response for message in response: - json_message = json.loads(MessageToJson(message, preserving_proto_field_name=True)) + json_message = google_message_to_json(message) transformed_obj = self.transform_keys(json_message) record = 
transformer.transform(transformed_obj, stream["schema"], singer.metadata.to_map(stream_mdata)) @@ -369,13 +379,12 @@ def format_field_names(self): """ for resource_name, schema in self.full_schema["properties"].items(): for field_name, data_type in schema["properties"].items(): - # Ensure that attributed resource fields have the resource name as a prefix, eg campaign_id under the ad_groups stream - if resource_name not in {"metrics", "segments"} and resource_name not in self.google_ads_resource_names: - self.stream_schema["properties"][f"{resource_name}_{field_name}"] = data_type # Move ad_group_ad.ad.x fields up a level in the schema (ad_group_ad.ad.x -> ad_group_ad.x) - elif resource_name == "ad_group_ad" and field_name == "ad": + if resource_name == "ad_group_ad" and field_name == "ad": for ad_field_name, ad_field_schema in data_type["properties"].items(): self.stream_schema["properties"][ad_field_name] = ad_field_schema + elif resource_name not in {"metrics", "segments"}: + self.stream_schema["properties"][f"{resource_name}_{field_name}"] = data_type else: self.stream_schema["properties"][field_name] = data_type @@ -388,22 +397,22 @@ def build_stream_metadata(self): "valid-replication-keys": ["date"] }, ("properties", "_sdc_record_hash"): { - "inclusion": "automatic" + "inclusion": "automatic", + "behavior": "PRIMARY KEY" }, } for report_field in self.fields: # Transform the field name to match the schema is_metric_or_segment = report_field.startswith("metrics.") or report_field.startswith("segments.") - if (not is_metric_or_segment - and report_field.split(".")[0] not in self.google_ads_resource_names - ): - transformed_field_name = "_".join(report_field.split(".")[:2]) # Transform ad_group_ad.ad.x fields to just x to reflect ad_group_ads schema - elif report_field.startswith("ad_group_ad.ad."): + if report_field.startswith("ad_group_ad.ad."): transformed_field_name = report_field.split(".")[2] + elif not is_metric_or_segment: + transformed_field_name = 
"_".join(report_field.split(".")[:2]) else: transformed_field_name = report_field.split(".")[1] - + # TODO: Maybe refactor this + # metadata_key = ("properties", transformed_field_name) # Base metadata for every field if ("properties", transformed_field_name) not in self.stream_metadata: self.stream_metadata[("properties", transformed_field_name)] = { @@ -414,9 +423,7 @@ def build_stream_metadata(self): # Transform field exclusion names so they match the schema for field_name in self.field_exclusions[report_field]: is_metric_or_segment = field_name.startswith("metrics.") or field_name.startswith("segments.") - if (not is_metric_or_segment - and field_name.split(".")[0] not in self.google_ads_resource_names - ): + if not is_metric_or_segment: new_field_name = field_name.replace(".", "_") else: new_field_name = field_name.split(".")[1] @@ -442,13 +449,22 @@ def transform_keys(self, obj): transformed_obj = {} for resource_name, value in obj.items(): - if resource_name == "ad_group_ad": - transformed_obj.update(value["ad"]) - else: + if resource_name in {"metrics", "segments"}: transformed_obj.update(value) - - if "type_" in transformed_obj: - transformed_obj["type"] = transformed_obj.pop("type_") + elif resource_name == "ad_group_ad": + for key, sub_value in value.items(): + if key == 'ad': + transformed_obj.update(sub_value) + else: + transformed_obj.update({f"{resource_name}_{key}": sub_value}) + else: + # value = {"a": 1, "b":2} + # turns into + # {"resource_a": 1, "resource_b": 2} + transformed_obj.update( + {f"{resource_name}_{key}": sub_value + for key, sub_value in value.items()} + ) return transformed_obj @@ -507,7 +523,7 @@ def sync(self, sdk_client, customer, stream, config, state): with Transformer() as transformer: # Pages are fetched automatically while iterating through the response for message in response: - json_message = json.loads(MessageToJson(message, preserving_proto_field_name=True)) + json_message = google_message_to_json(message) 
transformed_obj = self.transform_keys(json_message) record = transformer.transform(transformed_obj, stream["schema"]) record["_sdc_record_hash"] = generate_hash(record, stream_mdata) diff --git a/tests/base.py b/tests/base.py index 1c3412b..f42f6bb 100644 --- a/tests/base.py +++ b/tests/base.py @@ -618,7 +618,6 @@ def expected_default_fields(): }, "ad_group_audience_performance_report": { 'ad_group_name', - 'user_list_name', }, "campaign_performance_report": { 'average_cpc', # Avg. CPC, @@ -631,24 +630,24 @@ def expected_default_fields(): 'view_through_conversions', # View-through conv., }, "click_performance_report": { - 'ad_group_ad', + 'click_view_ad_group_ad', 'ad_group_id', 'ad_group_name', 'ad_group_status', 'ad_network_type', - 'area_of_interest', - 'campaign_location_target', + 'click_view_area_of_interest', + 'click_view_campaign_location_target', 'click_type', 'clicks', 'customer_descriptive_name', 'customer_id', 'device', - 'gclid', - 'location_of_presence', + 'click_view_gclid', + 'click_view_location_of_presence', 'month_of_year', - 'page_number', + 'click_view_page_number', 'slot', - 'user_list', + 'click_view_user_list', }, "display_keyword_performance_report": { # TODO NO DATA AVAILABLE 'ad_group_name', @@ -702,13 +701,13 @@ def expected_default_fields(): 'click_type', 'clicks', 'date', - 'descriptive_name', - 'id', + 'customer_descriptive_name', + 'customer_id', 'impressions', 'invalid_clicks', - 'manager', - 'test_account', - 'time_zone', + 'customer_manager', + 'customer_test_account', + 'customer_time_zone', }, "geo_performance_report": { 'clicks', @@ -739,7 +738,7 @@ def expected_default_fields(): 'conversions', 'view_through_conversions', # View-through conv., 'cost_per_conversion', # Cost / conv., - 'search_term', + 'search_term_view_search_term', 'search_term_match_type', }, "age_range_performance_report": { @@ -761,7 +760,7 @@ def expected_default_fields(): 'clicks', 'cost_micros', 'interactions', - 'placeholder_type', + 
'feed_placeholder_view_placeholder_type', }, 'user_location_performance_report': { 'campaign_id', @@ -773,14 +772,14 @@ def expected_default_fields(): 'campaign_name', 'clicks', 'average_cpc', - 'unexpanded_final_url', + 'landing_page_view_unexpanded_final_url', }, 'expanded_landing_page_report': { 'ad_group_name', 'campaign_name', 'clicks', 'average_cpc', - 'expanded_final_url', + 'expanded_landing_page_view_expanded_final_url', }, 'campaign_audience_performance_report': { 'campaign_name', diff --git a/tests/base_google_ads_field_exclusion.py b/tests/base_google_ads_field_exclusion.py new file mode 100644 index 0000000..b15298c --- /dev/null +++ b/tests/base_google_ads_field_exclusion.py @@ -0,0 +1,138 @@ +"""Test tap field exclusions with random field selection.""" +from datetime import datetime as dt +from datetime import timedelta +import random + +from tap_tester import menagerie, connections, runner + +from base import GoogleAdsBase + + +class FieldExclusionGoogleAdsBase(GoogleAdsBase): + """ + Test tap's field exclusion logic for all streams + + NOTE: Manual test case must be run at least once any time this feature changes or is updated. + Verify when given field selected, `fieldExclusions` fields in metadata are grayed out and cannot be selected (Manually) + """ + + def choose_randomly(self, collection): + return random.choice(list(collection)) + + def random_field_gather(self, input_fields_with_exclusions): + """ + Method takes list of fields with exclusions and generates a random set fields without conflicts as a result + The set of fields with exclusions is generated in random order so that different combinations of fields can + be tested over time. 
+ """ + random_selection = [] + + # Assemble a valid selection of fields with exclusions + remaining_fields = list(self.fields_with_exclusions) + while remaining_fields: + + # Choose randomly from the remaining fields + field_to_select = self.choose_randomly(remaining_fields) + random_selection.append(field_to_select) + + # Remove field and it's excluded fields from remaining + remaining_fields.remove(field_to_select) + for field in self.field_exclusions[field_to_select]: + if field in remaining_fields: + remaining_fields.remove(field) + + # Save list for debug in case test fails + self.random_order_of_exclusion_fields[self.stream].append(field_to_select) + + return random_selection + + def run_test(self): + """ + Verify tap can perform sync for random combinations of fields that do not violate exclusion rules. + Established randomization for valid field selection using new method to select specific fields. + """ + + print( + "Field Exclusion Test with random field selection for tap-google-ads report streams.\n" + f"Streams Under Test: {self.streams_to_test}" + ) + + self.random_order_of_exclusion_fields = {} + + # bump start date from default + self.start_date = dt.strftime(dt.today() - timedelta(days=1), self.START_DATE_FORMAT) + conn_id = connections.ensure_connection(self, original_properties=False) + + # Run a discovery job + found_catalogs = self.run_and_verify_check_mode(conn_id) + + for stream in self.streams_to_test: + with self.subTest(stream=stream): + + catalogs_to_test = [catalog + for catalog in found_catalogs + if catalog["stream_name"] == stream] + + # Make second call to get field level metadata + schema = menagerie.get_annotated_schema(conn_id, catalogs_to_test[0]['stream_id']) + self.field_exclusions = { + rec['breadcrumb'][1]: rec['metadata']['fieldExclusions'] + for rec in schema['metadata'] + if rec['breadcrumb'] != [] and rec['breadcrumb'][1] != "_sdc_record_hash" + } + + print(f"Perform assertions for stream: {stream}") + + # Gather fields 
with no exclusions so they can all be added to selection set + self.fields_without_exclusions = [] + for field, values in self.field_exclusions.items(): + if values == []: + self.fields_without_exclusions.append(field) + + # Gather fields with exclusions as input to randomly build maximum length selection set + self.fields_with_exclusions = [] + for field, values in self.field_exclusions.items(): + if values != []: + self.fields_with_exclusions.append(field) + + if len(self.fields_with_exclusions) == 0: + raise AssertionError(f"Skipping assertions. No field exclusions for stream: {stream}") + + self.stream = stream + self.random_order_of_exclusion_fields[stream] = [] + + random_exclusion_field_selection_list = self.random_field_gather(self.fields_with_exclusions) + field_selection_set = set(random_exclusion_field_selection_list + self.fields_without_exclusions) + + with self.subTest(order_of_fields_selected=self.random_order_of_exclusion_fields[stream]): + + # Select fields and re-pull annotated_schema. 
+ self.select_stream_and_specified_fields(conn_id, catalogs_to_test[0], field_selection_set) + + try: + # Collect updated metadata + schema_2 = menagerie.get_annotated_schema(conn_id, catalogs_to_test[0]['stream_id']) + + # Verify metadata for all fields + for rec in schema_2['metadata']: + if rec['breadcrumb'] != [] and rec['breadcrumb'][1] != "_sdc_record_hash": + # Verify metadata for selected fields + if rec['breadcrumb'][1] in field_selection_set: + self.assertEqual(rec['metadata']['selected'], True, + msg="Expected selection for field {} = 'True'".format(rec['breadcrumb'][1])) + + else: # Verify metadata for non selected fields + self.assertEqual(rec['metadata']['selected'], False, + msg="Expected selection for field {} = 'False'".format(rec['breadcrumb'][1])) + + # Run a sync + sync_job_name = runner.run_sync_mode(self, conn_id) + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + state = menagerie.get_state(conn_id) + + self.assertIn(stream, state['bookmarks'].keys()) + + finally: + # deselect stream once it's been tested + self.deselect_streams(conn_id, catalogs_to_test) diff --git a/tests/test_google_ads_discovery.py b/tests/test_google_ads_discovery.py index d63d6be..8eb4231 100644 --- a/tests/test_google_ads_discovery.py +++ b/tests/test_google_ads_discovery.py @@ -285,7 +285,7 @@ def test_run(self): expected_replication_method = self.expected_replication_method()[stream] # expected_fields = self.expected_fields()[stream] # TODO_TDL-17909 is_report = self.is_report(stream) - expected_behaviors = {'METRIC', 'SEGMENT', 'ATTRIBUTE'} if is_report else {'ATTRIBUTE', 'SEGMENT'} + expected_behaviors = {'METRIC', 'SEGMENT', 'ATTRIBUTE', 'PRIMARY KEY'} if is_report else {'ATTRIBUTE', 'SEGMENT'} # collecting actual values from the catalog schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) @@ -365,8 +365,6 @@ def test_run(self): msg="Not all non 
key properties are set to available in metadata") # verify 'behavior' is present in metadata for all streams - if is_report: - actual_fields.remove('_sdc_record_hash') self.assertEqual(fields_with_behavior, set(actual_fields)) # verify 'behavior' falls into expected set of behaviors (based on stream type) @@ -374,20 +372,21 @@ def test_run(self): with self.subTest(field=field): self.assertIn(behavior, expected_behaviors) - # NB | The following assertion is left commented with the assumption that this will be a valid - # expectation by the time the tap moves to Beta. If this is not valid at that time it should - # be removed. Or if work done in TDL-17910 results in this being an unnecessary check - - # if is_report: - # # verify each field in a report stream has a 'fieldExclusions' entry and that the fields listed - # # in that set are present in elsewhere in the stream's catalog - # fields_to_exclusions = {md['breadcrumb'][-1]: md['metadata']['fieldExclusions'] - # for md in metadata - # if md['breadcrumb'] != [] and - # md['metadata'].get('fieldExclusions')} - # for field, exclusions in fields_to_exclusions.items(): - # with self.subTest(field=field): - # self.assertTrue( - # set(exclusions).issubset(set(actual_fields)), - # msg=f"'fieldExclusions' contain fields not accounted for by the catalog: {set(exclusions) - set(actual_fields)}" - # ) + # TODO put back when field exlusion changes are merged + # verify for each report stream with exlusions, that all supported fields are mutually exlcusive + if is_report and stream != "click_performance_report": + fields_to_exclusions = {md['breadcrumb'][-1]: md['metadata']['fieldExclusions'] + for md in metadata + if md['breadcrumb'] != [] and + md['metadata'].get('fieldExclusions')} + for field, exclusions in fields_to_exclusions.items(): + for excluded_field in exclusions: + with self.subTest(field=field, excluded_field=excluded_field): + + if excluded_field in actual_fields: # some fields in the exclusion list are not 
supported + + # Verify the excluded field has it's own exclusion list + self.assertIsNotNone(fields_to_exclusions.get(excluded_field)) + + # Verify the excluded field is excluding the original field (mutual exclusion) + self.assertIn(field, fields_to_exclusions[excluded_field]) diff --git a/tests/test_google_ads_field_exclusion.py b/tests/test_google_ads_field_exclusion.py deleted file mode 100644 index 596111a..0000000 --- a/tests/test_google_ads_field_exclusion.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Test tap field exclusions with random field selection.""" -from datetime import datetime as dt -from datetime import timedelta -import random - -from tap_tester import menagerie, connections, runner - -from base import GoogleAdsBase - - -class FieldExclusionGoogleAds(GoogleAdsBase): - """ - Test tap's field exclusion logic for all streams - - NOTE: Manual test case must be run at least once any time this feature changes or is updated. - Verify when given field selected, `fieldExclusions` fields in metadata are grayed out and cannot be selected (Manually) - """ - @staticmethod - def name(): - return "tt_google_ads_field_exclusion" - - def random_field_gather(self, input_fields_with_exclusions): - """ - Method takes list of fields with exclusions and generates a random set fields without conflicts as a result - The set of fields with exclusions is generated in random order so that different combinations of fields can - be tested over time. - """ - - # Build random set of fields with exclusions. 
Select as many as possible - randomly_selected_list_of_fields_with_exclusions = [] - remaining_available_fields_with_exclusions = input_fields_with_exclusions - while len(remaining_available_fields_with_exclusions) > 0: - # Randomly select one field that has exclusions - newly_added_field = remaining_available_fields_with_exclusions[ - random.randrange(len(remaining_available_fields_with_exclusions))] - # Save list for debug incase test fails - self.random_order_of_exclusion_fields[self.stream].append(newly_added_field,) - randomly_selected_list_of_fields_with_exclusions.append(newly_added_field) - # Update remaining_available_fields_with_exclusinos based on random selection - newly_excluded_fields_to_remove = self.field_exclusions[newly_added_field] - # Remove newly selected field - remaining_available_fields_with_exclusions.remove(newly_added_field) - # Remove associated excluded fields - for field in newly_excluded_fields_to_remove: - if field in remaining_available_fields_with_exclusions: - remaining_available_fields_with_exclusions.remove(field) - - exclusion_fields_to_select = randomly_selected_list_of_fields_with_exclusions - - return exclusion_fields_to_select - - - def test_default_case(self): - """ - Verify tap can perform sync for random combinations of fields that do not violate exclusion rules. - Established randomization for valid field selection using new method to select specific fields. - """ - print("Field Exclusion Test with random field selection for tap-google-ads report streams") - - # --- Test report streams --- # - - streams_to_test = {stream for stream in self.expected_streams() - if self.is_report(stream)} - {'click_performance_report'} # No exclusions - - # streams_to_test = streams_to_test - { - # # These streams missing from expected_default_fields() method TODO unblocked due to random? 
Test them now - # # 'shopping_performance_report', - # # 'keywords_performance_report', - # # TODO These streams have no data to replicate and fail the last assertion - # 'video_performance_report', - # 'audience_performance_report', - # 'placement_performance_report', - # 'display_topics_performance_report', - # 'display_keyword_performance_report', - # } - # streams_to_test = {'gender_performance_report', 'placeholder_report',} - random_order_of_exclusion_fields = {} - - # bump start date from default - self.start_date = dt.strftime(dt.today() - timedelta(days=3), self.START_DATE_FORMAT) - conn_id = connections.ensure_connection(self, original_properties=False) - - # Run a discovery job - found_catalogs = self.run_and_verify_check_mode(conn_id) - - for stream in streams_to_test: - with self.subTest(stream=stream): - - catalogs_to_test = [catalog - for catalog in found_catalogs - if catalog["stream_name"] == stream] - - # Make second call to get field level metadata - schema = menagerie.get_annotated_schema(conn_id, catalogs_to_test[0]['stream_id']) - field_exclusions = { - rec['breadcrumb'][1]: rec['metadata']['fieldExclusions'] - for rec in schema['metadata'] - if rec['breadcrumb'] != [] and rec['breadcrumb'][1] != "_sdc_record_hash" - } - - self.field_exclusions = field_exclusions # expose filed_exclusions globally so other methods can use it - - print(f"Perform assertions for stream: {stream}") - - # Gather fields with no exclusions so they can all be added to selection set - fields_without_exclusions = [] - for field, values in field_exclusions.items(): - if values == []: - fields_without_exclusions.append(field) - - # Gather fields with exclusions as input to randomly build maximum length selection set - fields_with_exclusions = [] - for field, values in field_exclusions.items(): - if values != []: - fields_with_exclusions.append(field) - - if len(fields_with_exclusions) == 0: - raise AssertionError(f"Skipping assertions. 
No field exclusions for stream: {stream}") - - self.stream = stream - random_order_of_exclusion_fields[stream] = [] - self.random_order_of_exclusion_fields = random_order_of_exclusion_fields - - random_exclusion_field_selection_list = self.random_field_gather(fields_with_exclusions) - field_selection_set = set(random_exclusion_field_selection_list + fields_without_exclusions) - - with self.subTest(order_of_fields_selected=self.random_order_of_exclusion_fields[stream]): - - # Select fields and re-pull annotated_schema. - self.select_stream_and_specified_fields(conn_id, catalogs_to_test[0], field_selection_set) - - try: - # Collect updated metadata - schema_2 = menagerie.get_annotated_schema(conn_id, catalogs_to_test[0]['stream_id']) - - # Verify metadata for all fields - for rec in schema_2['metadata']: - if rec['breadcrumb'] != [] and rec['breadcrumb'][1] != "_sdc_record_hash": - # Verify metadata for selected fields - if rec['breadcrumb'][1] in field_selection_set: - self.assertEqual(rec['metadata']['selected'], True, - msg="Expected selection for field {} = 'True'".format(rec['breadcrumb'][1])) - - else: # Verify metadata for non selected fields - self.assertEqual(rec['metadata']['selected'], False, - msg="Expected selection for field {} = 'False'".format(rec['breadcrumb'][1])) - - # Run a sync - sync_job_name = runner.run_sync_mode(self, conn_id) - exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) - - # These streams likely replicate records using the default field selection but may not produce any - # records when selecting this many fields with exclusions. 
- # streams_unlikely_to_replicate_records = { - # 'ad_performance_report', - # 'account_performance_report', - # 'shopping_performance_report', - # 'search_query_performance_report', - # 'placeholder_feed_item_report', - # 'placeholder_report', - # 'keywords_performance_report', - # 'keywordless_query_report', - # 'geo_performance_report', - # 'gender_performance_report', # Very rare - # 'ad_group_audience_performance_report', - # 'age_range_performance_report', - # 'campaign_audience_performance_report', - # 'user_location_performance_report', - # 'ad_group_performance_report', - # } - - # if stream not in streams_unlikely_to_replicate_records: - # sync_record_count = runner.examine_target_output_file( - # self, conn_id, self.expected_streams(), self.expected_primary_keys()) - # self.assertGreater( - # sum(sync_record_count.values()), 0, - # msg="failed to replicate any data: {}".format(sync_record_count) - # ) - # print("total replicated row count: {}".format(sum(sync_record_count.values()))) - - # TODO additional assertions? 
- - finally: - # deselect stream once it's been tested - self.deselect_streams(conn_id, catalogs_to_test) diff --git a/tests/test_google_ads_field_exclusion_1.py b/tests/test_google_ads_field_exclusion_1.py new file mode 100644 index 0000000..8f51148 --- /dev/null +++ b/tests/test_google_ads_field_exclusion_1.py @@ -0,0 +1,27 @@ +"""Test tap field exclusions with random field selection.""" +from datetime import datetime as dt +from datetime import timedelta +import random + +from tap_tester import menagerie, connections, runner + +from base_google_ads_field_exclusion import FieldExclusionGoogleAdsBase + + +class FieldExclusion1(FieldExclusionGoogleAdsBase): + + @staticmethod + def name(): + return "tt_google_ads_exclusion_1" + + streams_to_test = { + "account_performance_report", + "ad_group_audience_performance_report", + "ad_group_performance_report", + "ad_performance_report", + "age_range_performance_report", + "campaign_audience_performance_report", + } + + def test_run(self): + self.run_test() diff --git a/tests/test_google_ads_field_exclusion_2.py b/tests/test_google_ads_field_exclusion_2.py new file mode 100644 index 0000000..21b90df --- /dev/null +++ b/tests/test_google_ads_field_exclusion_2.py @@ -0,0 +1,27 @@ +"""Test tap field exclusions with random field selection.""" +from datetime import datetime as dt +from datetime import timedelta +import random + +from tap_tester import menagerie, connections, runner + +from base_google_ads_field_exclusion import FieldExclusionGoogleAdsBase + + +class FieldExclusion2(FieldExclusionGoogleAdsBase): + + @staticmethod + def name(): + return "tt_google_ads_exclusion_2" + + streams_to_test = { + "campaign_performance_report", + # "click_performance_report", # NO EXCLUSIONS, SKIPPED INTENTIONALLY + "display_keyword_performance_report", + "display_topics_performance_report", + "expanded_landing_page_report", + "gender_performance_report", + } + + def test_run(self): + self.run_test() diff --git 
a/tests/test_google_ads_field_exclusion_3.py b/tests/test_google_ads_field_exclusion_3.py new file mode 100644 index 0000000..a426dcd --- /dev/null +++ b/tests/test_google_ads_field_exclusion_3.py @@ -0,0 +1,27 @@ +"""Test tap field exclusions with random field selection.""" +from datetime import datetime as dt +from datetime import timedelta +import random + +from tap_tester import menagerie, connections, runner + +from base_google_ads_field_exclusion import FieldExclusionGoogleAdsBase + + +class FieldExclusion3(FieldExclusionGoogleAdsBase): + + @staticmethod + def name(): + return "tt_google_ads_exclusion_3" + + streams_to_test = { + "geo_performance_report", + "keywordless_query_report", + "keywords_performance_report", + "landing_page_report", + "placeholder_feed_item_report", + "placeholder_report", + } + + def test_run(self): + self.run_test() diff --git a/tests/test_google_ads_field_exclusion_4.py b/tests/test_google_ads_field_exclusion_4.py new file mode 100644 index 0000000..4eea8ab --- /dev/null +++ b/tests/test_google_ads_field_exclusion_4.py @@ -0,0 +1,27 @@ +"""Test tap field exclusions with random field selection.""" +from datetime import datetime as dt +from datetime import timedelta +import random + +from tap_tester import menagerie, connections, runner + +from base_google_ads_field_exclusion import FieldExclusionGoogleAdsBase + + +class FieldExclusion4(FieldExclusionGoogleAdsBase): + + @staticmethod + def name(): + return "tt_google_ads_exclusion_4" + + streams_to_test = { + "placement_performance_report", + "search_query_performance_report", + "shopping_performance_report", + "user_location_performance_report", + "user_location_performance_report", + "video_performance_report", + } + + def test_run(self): + self.run_test() diff --git a/tests/test_google_ads_field_exclusion_coverage.py b/tests/test_google_ads_field_exclusion_coverage.py new file mode 100644 index 0000000..8acf39a --- /dev/null +++ b/tests/test_google_ads_field_exclusion_coverage.py 
@@ -0,0 +1,36 @@ +from base_google_ads_field_exclusion import FieldExclusionGoogleAdsBase + + +class FieldExlusionCoverage(FieldExclusionGoogleAdsBase): + + checking_coverage = True + + @staticmethod + def name(): + return "tt_google_ads_exclusion_coverage" + + def test_run(self): + """ + Ensure all report streams are covered for the field exclusion test cases. + The report streams are spread out across several test classes for parallelism. This extra + step is required as we hardcode the streams under test in each of the four classes. + """ + report_streams = {stream for stream in self.expected_streams() + if self.is_report(stream) + and stream != "click_performance_report"} + + from test_google_ads_field_exclusion_1 import FieldExclusion1 + from test_google_ads_field_exclusion_2 import FieldExclusion2 + from test_google_ads_field_exclusion_3 import FieldExclusion3 + from test_google_ads_field_exclusion_4 import FieldExclusion4 + + f1 = FieldExclusion1() + f2 = FieldExclusion2() + f3 = FieldExclusion3() + f4 = FieldExclusion4() + + streams_under_test = f1.streams_to_test | f2.streams_to_test | f3.streams_to_test | f4.streams_to_test + + self.assertSetEqual(report_streams, streams_under_test) + print("ALL REPORT STREAMS UNDER TEST") + print(f"Streams: {streams_under_test}") diff --git a/tests/test_google_ads_field_exclusion_invalid.py b/tests/test_google_ads_field_exclusion_invalid.py index 3816935..ea2a165 100644 --- a/tests/test_google_ads_field_exclusion_invalid.py +++ b/tests/test_google_ads_field_exclusion_invalid.py @@ -21,78 +21,53 @@ class FieldExclusionInvalidGoogleAds(GoogleAdsBase): def name(): return "tt_google_ads_field_exclusion_invalid" - def perform_exclusion_verification(self, field_exclusion_dict): - """ - Verify for a pair of fields that if field_1 is in field_2's exclusion list then field_2 must be in field_1's exclusion list - """ - error_dict = {} - for field, values in field_exclusion_dict.items(): - if values != []: - for value in values:
if value in field_exclusion_dict.keys(): - if field not in field_exclusion_dict[value]: - if field not in error_dict.keys(): - error_dict[field] = [value] - else: - error_dict[field] += [value] - - return error_dict - - def random_field_gather(self, input_fields_with_exclusions): + def choose_randomly(self, collection): + return random.choice(list(collection)) + + def random_field_gather(self): """ Method takes list of fields with exclusions and generates a random set fields without conflicts as a result The set of fields with exclusions is generated in random order so that different combinations of fields can be tested over time. A single invalid field is then added to violate exclusion rules. """ - # Build random set of fields with exclusions. Select as many as possible - all_fields = input_fields_with_exclusions + self.fields_without_exclusions - randomly_selected_list_of_fields_with_exclusions = [] - remaining_available_fields_with_exclusions = input_fields_with_exclusions - while len(remaining_available_fields_with_exclusions) > 0: - # Randomly select one field that has exclusions - newly_added_field = remaining_available_fields_with_exclusions[ - random.randrange(len(remaining_available_fields_with_exclusions))] - # Save list for debug incase test fails - self.random_order_of_exclusion_fields[self.stream].append(newly_added_field,) - randomly_selected_list_of_fields_with_exclusions.append(newly_added_field) - # Update remaining_available_fields_with_exclusinos based on random selection - newly_excluded_fields_to_remove = self.field_exclusions[newly_added_field] - # Remove newly selected field - remaining_available_fields_with_exclusions.remove(newly_added_field) - # Remove associated excluded fields - for field in newly_excluded_fields_to_remove: - if field in remaining_available_fields_with_exclusions: - remaining_available_fields_with_exclusions.remove(field) + random_selection = [] + + # Assemble a valid selection of fields with exclusions + 
remaining_fields = list(self.fields_with_exclusions) + while remaining_fields: + + # Choose randomly from the remaining fields + field_to_select = self.choose_randomly(remaining_fields) + random_selection.append(field_to_select) + + # Remove field and its excluded fields from remaining + remaining_fields.remove(field_to_select) + for field in self.field_exclusions[field_to_select]: + if field in remaining_fields: + remaining_fields.remove(field) + + # Save list for debug in case test fails + self.random_order_of_exclusion_fields[self.stream].append(field_to_select) # Now add one more exclusion field to make the selection invalid - found_invalid_field = False - while found_invalid_field == False: - # Select a field from our list at random - invalid_field_partner = randomly_selected_list_of_fields_with_exclusions[ - random.randrange(len(randomly_selected_list_of_fields_with_exclusions))] - # Find all fields excluded by selected field - invalid_field_pool = self.field_exclusions[invalid_field_partner] - # Remove any fields not in metadata properties for this stream - for field in reversed(invalid_field_pool): - if field not in all_fields: - invalid_field_pool.remove(field) - - # Make sure there is still one left to select, if not try again - if len(invalid_field_pool) == 0: - continue - - # Select field randomly and unset flag to terminate loop - invalid_field = invalid_field_pool[random.randrange(len(invalid_field_pool))] - found_invalid_field = True - - # Add the invalid field to the lists + while True: + # Choose randomly from the selected fields + random_field = self.choose_randomly(random_selection) + + # Choose randomly from that field's supported excluded fields + excluded_fields = set(self.field_exclusions[random_field]) + supported_excluded_fields = {field for field in excluded_fields + if field in self.fields_with_exclusions} + if supported_excluded_fields: + invalid_field = self.choose_randomly(supported_excluded_fields) + break + + # Add this invalid field
to the selection + random_selection.append(invalid_field) self.random_order_of_exclusion_fields[self.stream].append(invalid_field,) - randomly_selected_list_of_fields_with_exclusions.append(invalid_field) - exclusion_fields_to_select = randomly_selected_list_of_fields_with_exclusions - - return exclusion_fields_to_select + return random_selection def test_invalid_case(self): @@ -110,14 +85,14 @@ def test_invalid_case(self): streams_to_test = {stream for stream in self.expected_streams() if self.is_report(stream)} - {'click_performance_report'} # No exclusions. TODO remove dynamically - #streams_to_test = {'search_query_performance_report', 'placeholder_report',} + # streams_to_test = {'search_query_performance_report'} # , 'placeholder_report',} random_order_of_exclusion_fields = {} tap_exit_status_by_stream = {} exclusion_errors = {} # bump start date from default - self.start_date = dt.strftime(dt.today() - timedelta(days=3), self.START_DATE_FORMAT) + self.start_date = dt.strftime(dt.today() - timedelta(days=1), self.START_DATE_FORMAT) conn_id = connections.ensure_connection(self, original_properties=False) # Run a discovery job @@ -133,43 +108,36 @@ def test_invalid_case(self): # Make second call to get field metadata schema = menagerie.get_annotated_schema(conn_id, catalogs_to_test[0]['stream_id']) - field_exclusions = { + self.field_exclusions = { rec['breadcrumb'][1]: rec['metadata']['fieldExclusions'] for rec in schema['metadata'] if rec['breadcrumb'] != [] and rec['breadcrumb'][1] != "_sdc_record_hash" } - self.field_exclusions = field_exclusions - # Gather fields with no exclusions so they can all be added to selection set - fields_without_exclusions = [] - for field, values in field_exclusions.items(): + self.fields_without_exclusions = [] + for field, values in self.field_exclusions.items(): if values == []: - fields_without_exclusions.append(field) - self.fields_without_exclusions = fields_without_exclusions + 
self.fields_without_exclusions.append(field) # Gather fields with exclusions as input to randomly build maximum length selection set - fields_with_exclusions = [] - for field, values in field_exclusions.items(): + self.fields_with_exclusions = [] + for field, values in self.field_exclusions.items(): if values != []: - fields_with_exclusions.append(field) - if len(fields_with_exclusions) == 0: + self.fields_with_exclusions.append(field) + if len(self.fields_with_exclusions) == 0: raise AssertionError(f"Skipping assertions. No field exclusions for stream: {stream}") # Add new key to existing dicts random_order_of_exclusion_fields[stream] = [] - exclusion_errors[stream] = {} # Expose variables globally self.stream = stream self.random_order_of_exclusion_fields = random_order_of_exclusion_fields # Build random lists - random_exclusion_field_selection_list = self.random_field_gather(fields_with_exclusions) - field_selection_set = set(random_exclusion_field_selection_list + fields_without_exclusions) - - # Collect any errors if they occur - exclusion_errors[stream] = self.perform_exclusion_verification(field_exclusions) + random_exclusion_field_selection_list = self.random_field_gather() + field_selection_set = set(random_exclusion_field_selection_list + self.fields_without_exclusions) with self.subTest(order_of_fields_selected=self.random_order_of_exclusion_fields[stream]): @@ -192,46 +160,34 @@ def test_invalid_case(self): self.assertEqual(rec['metadata']['selected'], False, msg="Expected selection for field {} = 'False'".format(rec['breadcrumb'][1])) - # # Run a sync - # sync_job_name = runner.run_sync_mode(self, conn_id) - # exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - - # print(f"Perform assertions for stream: {stream}") - # if exit_status.get('target_exit_status') == 1: - # #print(f"Stream {stream} has tap_exit_status = {exit_status.get('tap_exit_status')}\n" + - # # "Message: {exit_status.get('tap_error_message')") - # 
tap_exit_status_by_stream[stream] = exit_status.get('tap_exit_status') - # else: - # #print(f"\n*** {stream} tap_exit_status {exit_status.get('tap_exit_status')} ***\n") - # tap_exit_status_by_stream[stream] = exit_status.get('tap_exit_status') - # #self.assertEqual(1, exit_status.get('tap_exit_status')) # 11 failures on run 1 - # self.assertEqual(0, exit_status.get('target_exit_status')) - # self.assertEqual(0, exit_status.get('discovery_exit_status')) - # self.assertIsNone(exit_status.get('check_exit_status')) + # Run a sync + sync_job_name = runner.run_sync_mode(self, conn_id) + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + + print(f"Perform assertions for stream: {stream}") + if exit_status.get('target_exit_status') == 1: + print(f"Stream {stream} has tap_exit_status = {exit_status.get('tap_exit_status')}\n" + + "Message: {exit_status.get('tap_error_message')") + tap_exit_status_by_stream[stream] = exit_status.get('tap_exit_status') + else: + print(f"\n*** {stream} tap_exit_status {exit_status.get('tap_exit_status')} ***\n") + tap_exit_status_by_stream[stream] = exit_status.get('tap_exit_status') + self.assertEqual(1, exit_status.get('tap_exit_status')) + self.assertEqual(0, exit_status.get('target_exit_status')) + self.assertEqual(0, exit_status.get('discovery_exit_status')) + self.assertIsNone(exit_status.get('check_exit_status')) # Verify error message tells user they must select an attribute/metric for the invalid stream - # TODO build list of strings to test in future - - # Initial assertion group generated if all fields selelcted - # self.assertIn( - # "PROHIBITED_FIELD_COMBINATION_IN_SELECT_CLAUSE", - # exit_status.get("tap_error_message") - # ) - # self.assertIn( - # "The following pairs of fields may not be selected together", - # exit_status.get("tap_error_message") - # ) - - # New error message if random selection method is used - # PROHIBITED_SEGMENT_WITH_METRIC_IN_SELECT_OR_WHERE_CLAUSE - - # TODO additional assertions? 
- # self.assertEqual(len(exclusion_erros[stream], 0) - + error_messages = ["The following pairs of fields may not be selected together", + "Cannot select or filter on the following", + "Cannot select the following",] + self.assertTrue( + any([error_message in exit_status.get("tap_error_message") + for error_message in error_messages]), + msg=f'Unexpected Error Message: {exit_status.get("tap_error_message")}') + print(f"\n*** {stream} tap_error_message {exit_status.get('tap_error_message')} ***\n") finally: # deselect stream once it's been tested self.deselect_streams(conn_id, catalogs_to_test) print("Streams tested: {}\ntap_exit_status_by_stream: {}".format(len(streams_to_test), tap_exit_status_by_stream)) - print("Exclusion errors:") - pprint.pprint(exclusion_errors) diff --git a/tests/test_google_ads_sync_canary.py b/tests/test_google_ads_sync_canary.py index 9618251..ee5bc7d 100644 --- a/tests/test_google_ads_sync_canary.py +++ b/tests/test_google_ads_sync_canary.py @@ -41,9 +41,11 @@ def test_run(self): # Perform table and field selection... core_catalogs = [catalog for catalog in test_catalogs - if not self.is_report(catalog['stream_name'])] + if not self.is_report(catalog['stream_name']) + or catalog['stream_name'] == 'click_performance_report'] report_catalogs = [catalog for catalog in test_catalogs - if self.is_report(catalog['stream_name'])] + if self.is_report(catalog['stream_name']) + and catalog['stream_name'] != 'click_performance_report'] # select all fields for core streams and... 
self.select_all_streams_and_fields(conn_id, core_catalogs, select_all_fields=True) # select 'default' fields for report streams diff --git a/tests/unittests/test_utils.py b/tests/unittests/test_utils.py index 6571e6d..dd55744 100644 --- a/tests/unittests/test_utils.py +++ b/tests/unittests/test_utils.py @@ -123,7 +123,7 @@ class TestRecordHashing(unittest.TestCase): ('properties', 'date'): {'behavior': 'SEGMENT'}, }) - expected_hash = 'ade8240f134633fe125388e469e61ccf9e69033fd5e5f166b4b44766bc6376d3' + expected_hash = '38d95857633f1e04092f7a308f0d3777d965cba80a5593803dd2b7e4a484ce64' def test_record_hash_canary(self): self.assertEqual(self.expected_hash, generate_hash(self.test_record, self.test_metadata))