From ba39b455b6286e058022e6942ba4b5cb22a2295b Mon Sep 17 00:00:00 2001 From: Prijen Khokhani <88327452+prijendev@users.noreply.github.com> Date: Thu, 16 Jun 2022 01:32:38 +0530 Subject: [PATCH] Crest master (#66) * Tdl 19235 handle uncaught exceptions (#61) * Added backoff for 5xx, 429 and ReadTimeout errors. * Resolved pylint error. * Updated comments in the unittest cases. * Updated error handling. * TDL-18749 Implement interruptible full table streams. (#60) * Implemented interruptible full table streams. * Resolved pylint error * Resolved error in full table sync test case. * Updated config.yml to pass cci * Updated query building logic. * Updated integration test case. * Resolved review comments. * Resolved comments. * Implemented logic to skip the duplicate records. * Resolved unittest case error. * Resolved pylint error * Resolved integration test case error * Added empty filter param for call_details and campaign_label stream. * Added unit test cases for should_sync method. * Revert "Implemented logic to skip the duplicate records." This reverts commit cd06e11657bd35edbaefcd7f8f12acfb938e05ec. * Added logger message for debugging purposes * Updated integration test case. * Replaced .format with f-string. * Updated comment in integration test. Co-authored-by: KrishnanG --- .circleci/config.yml | 2 +- tap_google_ads/streams.py | 133 ++++++++++++++++-- ..._google_ads_interrupted_sync_full_table.py | 132 +++++++++++++++++ tests/unittests/test_backoff.py | 113 +++++++++++++++ .../test_core_stream_query_building.py | 70 +++++++++ 5 files changed, 435 insertions(+), 15 deletions(-) create mode 100644 tests/test_google_ads_interrupted_sync_full_table.py create mode 100644 tests/unittests/test_backoff.py create mode 100644 tests/unittests/test_core_stream_query_building.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 044ebc8..ff7e698 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -69,7 +69,7 @@ jobs: run_integration_tests: executor: docker-executor - parallelism: 18 + parallelism: 19 steps: - checkout - attach_workspace: diff --git a/tap_google_ads/streams.py b/tap_google_ads/streams.py index 783691b..57cd39e 100644 --- a/tap_google_ads/streams.py +++ b/tap_google_ads/streams.py @@ -4,9 +4,11 @@ from datetime import timedelta import singer from singer import Transformer -from singer import utils +from singer import utils, metrics from google.protobuf.json_format import MessageToJson from google.ads.googleads.errors import GoogleAdsException +from google.api_core.exceptions import ServerError, TooManyRequests +from requests.exceptions import ReadTimeout import backoff from . import report_definitions @@ -25,7 +27,7 @@ ) DEFAULT_CONVERSION_WINDOW = 30 - + DEFAULT_PAGE_SIZE = 1000 def get_conversion_window(config): """Fetch the conversion window from the config and error on invalid values""" @@ -80,9 +82,60 @@ def build_parameters(): param_str = ",".join(f"{k}={v}" for k, v in API_PARAMETERS.items()) return f"PARAMETERS {param_str}" +def generate_where_and_orderby_clause(last_pk_fetched, filter_param, composite_pks): + """ + Generates a WHERE clause and an ORDER BY clause based on the filter parameter (`key_properties`) and + `last_pk_fetched`.
+ + Example: + + Single PK Case: + + filter_param = 'id' + last_pk_fetched = 1 + composite_pks = False + Returns: + WHERE id > 1 ORDER BY id ASC + + Composite PK Case: + + composite_pks = True + filter_param = 'id' + last_pk_fetched = 1 + Returns: + WHERE id >= 1 ORDER BY id ASC + """ + where_clause = "" + order_by_clause = "" + + # Even if the stream has a composite primary key, we store only a single PK value in the bookmark. + # So it is possible that records with the same single PK value exist under different PK combinations. + # That's why, for composite PKs, we use a greater-than-or-equal operator. + comparison_operator = ">=" + + if not composite_pks: + # Exclude equality for streams that do not have a composite primary key. + # In the single-PK case we are sure that no other record has the same PK, + # so we do not want to fetch the last record again. + comparison_operator = ">" + + if filter_param: + # Create an ORDER BY clause for streams that support the filter parameter. + order_by_clause = f"ORDER BY {filter_param} ASC" + + if last_pk_fetched: + # Create a WHERE clause based on last_pk_fetched. + where_clause = f'WHERE {filter_param} {comparison_operator} {last_pk_fetched} ' + + return f'{where_clause}{order_by_clause}' + +def create_core_stream_query(resource_name, selected_fields, last_pk_fetched, filter_param, composite_pks): + + # Generate a query using WHERE and ORDER BY parameters. + where_order_by_clause = generate_where_and_orderby_clause(last_pk_fetched, filter_param, composite_pks) + + core_query = f"SELECT {','.join(selected_fields)} FROM {resource_name} {where_order_by_clause} {build_parameters()}" -def create_core_stream_query(resource_name, selected_fields): - core_query = f"SELECT {','.join(selected_fields)} FROM {resource_name} {build_parameters()}" return core_query
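For illustration, a minimal sketch of the query these helpers produce on a resumed sync (the expected string mirrors the unit tests at the end of this patch; the PARAMETERS clause is whatever build_parameters renders from API_PARAMETERS, and the bookmark value 4 is made up):

    # Hypothetical resumed sync over a single-PK stream:
    create_core_stream_query("ads", ["id"], 4, "id", False)
    # -> 'SELECT id FROM ads WHERE id > 4 ORDER BY id ASC PARAMETERS omit_unselected_resource_names=true'
    # With composite_pks=True the operator becomes >=; with no bookmark the WHERE clause is omitted.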
@@ -117,6 +170,13 @@ def generate_hash(record, metadata): def should_give_up(ex): + + # ServerError is the parent class of the InternalServerError, MethodNotImplemented, BadGateway, + # ServiceUnavailable, GatewayTimeout, DataLoss, and Unknown classes. + # Return False (i.e. retry) for all of the above errors and for ReadTimeout. + if isinstance(ex, (ServerError, TooManyRequests, ReadTimeout)): + return False + if isinstance(ex, AttributeError): if str(ex) == "'NoneType' object has no attribute 'Call'": LOGGER.info('Retrying request due to AttributeError') @@ -141,12 +201,14 @@ def on_giveup_func(err): @backoff.on_exception(backoff.expo, (GoogleAdsException, + ServerError, TooManyRequests, + ReadTimeout, AttributeError), max_tries=5, jitter=None, giveup=should_give_up, on_giveup=on_giveup_func, - logger=None) + ) def make_request(gas, query, customer_id): response = gas.search(query=query, customer_id=customer_id) return response
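A rough sketch of the retry cadence the decorator above yields for a persistently failing, retryable request (assuming backoff.expo's default factor and base, which apply since none are passed here):

    # attempt 1 -- wait ~1s -- attempt 2 -- wait ~2s -- attempt 3
    #           -- wait ~4s -- attempt 4 -- wait ~8s -- attempt 5 -> on_giveup_func
    # jitter=None keeps the waits deterministic; should_give_up short-circuits
    # retries for errors that are not worth repeating.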
@@ -172,11 +234,12 @@ def filter_out_non_attribute_fields(fields): class BaseStream: # pylint: disable=too-many-instance-attributes - def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys, automatic_keys = None): + def __init__(self, fields, google_ads_resource_names, resource_schema, primary_keys, automatic_keys = None, filter_param = None): self.fields = fields self.google_ads_resource_names = google_ads_resource_names self.primary_keys = primary_keys self.automatic_keys = automatic_keys if automatic_keys else set() + self.filter_param = filter_param self.extract_field_information(resource_schema) self.create_full_schema(resource_schema) @@ -330,22 +393,44 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u state = singer.set_currently_syncing(state, [stream_name, customer["customerId"]]) singer.write_state(state) - query = create_core_stream_query(resource_name, selected_fields) + # The last run was interrupted if a bookmark is available for core streams. + last_pk_fetched = singer.get_bookmark(state, + stream["tap_stream_id"], + customer["customerId"]) or {} + + # Assign True if the primary key is composite. + composite_pks = len(self.primary_keys) > 1 + + query = create_core_stream_query(resource_name, selected_fields, last_pk_fetched.get('last_pk_fetched'), self.filter_param, composite_pks) try: response = make_request(gas, query, customer["customerId"]) except GoogleAdsException as err: LOGGER.warning("Failed query: %s", query) raise err - with Transformer() as transformer: - # Pages are fetched automatically while iterating through the response - for message in response: - json_message = google_message_to_json(message) - transformed_message = self.transform_keys(json_message) - record = transformer.transform(transformed_message, stream["schema"], singer.metadata.to_map(stream_mdata)) + with metrics.record_counter(stream_name) as counter: + with Transformer() as transformer: + # Pages are fetched automatically while iterating through the response + for message in response: + json_message = google_message_to_json(message) + transformed_message = self.transform_keys(json_message) + record = transformer.transform(transformed_message, stream["schema"], singer.metadata.to_map(stream_mdata)) + + singer.write_record(stream_name, record) + counter.increment() + + # Write state (last_pk_fetched) using the primary key (id) value for core streams after every DEFAULT_PAGE_SIZE records + if counter.value % DEFAULT_PAGE_SIZE == 0 and self.filter_param: + bookmark_value = record[self.primary_keys[0]] + singer.write_bookmark(state, stream["tap_stream_id"], customer["customerId"], {'last_pk_fetched': bookmark_value}) - singer.write_record(stream_name, record) + singer.write_state(state) + LOGGER.info("Write state for stream: %s, value: %s", stream_name, bookmark_value) + # Flush the state for core streams once the sync is completed + if stream["tap_stream_id"] in state.get('bookmarks', {}): + state['bookmarks'].pop(stream["tap_stream_id"]) + singer.write_state(state)
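The bookmark written above gives an interrupted run a state shaped like the following (the stream name, customer ID, and PK value are illustrative only; compare the interrupted_state used by the integration test below):

    state = {
        "currently_syncing": ["ads", "1234567890"],
        "bookmarks": {"ads": {"1234567890": {"last_pk_fetched": 4}}},
    }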
"campaign_criterion": BaseStream( report_definitions.CAMPAIGN_CRITERION_FIELDS, @@ -696,6 +789,7 @@ def initialize_core_streams(resource_schema): resource_schema, ["campaign_id","criterion_id"], {"customer_id"}, + filter_param="campaign.id" ), "campaign_labels": BaseStream( report_definitions.CAMPAIGN_LABEL_FIELDS, @@ -713,6 +807,7 @@ def initialize_core_streams(resource_schema): ["carrier_constant"], resource_schema, ["id"], + filter_param="carrier_constant.id" ), "feed": BaseStream( report_definitions.FEED_FIELDS, @@ -720,6 +815,7 @@ def initialize_core_streams(resource_schema): resource_schema, ["id"], {"customer_id"}, + filter_param="feed.id" ), "feed_item": BaseStream( report_definitions.FEED_ITEM_FIELDS, @@ -730,6 +826,7 @@ def initialize_core_streams(resource_schema): "customer_id", "feed_id", }, + filter_param="feed_item.id" ), "labels": BaseStream( report_definitions.LABEL_FIELDS, @@ -737,42 +834,49 @@ def initialize_core_streams(resource_schema): resource_schema, ["id"], {"customer_id"}, + filter_param="label.id" ), "language_constant": BaseStream( report_definitions.LANGUAGE_CONSTANT_FIELDS, ["language_constant"], resource_schema, ["id"], + filter_param="language_constant.id" ), "mobile_app_category_constant": BaseStream( report_definitions.MOBILE_APP_CATEGORY_CONSTANT_FIELDS, ["mobile_app_category_constant"], resource_schema, ["id"], + filter_param="mobile_app_category_constant.id" ), "mobile_device_constant": BaseStream( report_definitions.MOBILE_DEVICE_CONSTANT_FIELDS, ["mobile_device_constant"], resource_schema, ["id"], + filter_param="mobile_device_constant.id" ), "operating_system_version_constant": BaseStream( report_definitions.OPERATING_SYSTEM_VERSION_CONSTANT_FIELDS, ["operating_system_version_constant"], resource_schema, ["id"], + filter_param="operating_system_version_constant.id" ), "topic_constant": BaseStream( report_definitions.TOPIC_CONSTANT_FIELDS, ["topic_constant"], resource_schema, ["id"], + filter_param="topic_constant.id" ), "user_interest": UserInterestStream( report_definitions.USER_INTEREST_FIELDS, ["user_interest"], resource_schema, ["id"], + filter_param="user_interest.user_interest_id" ), "user_list": BaseStream( report_definitions.USER_LIST_FIELDS, @@ -780,6 +884,7 @@ def initialize_core_streams(resource_schema): resource_schema, ["id"], {"customer_id"}, + filter_param="user_list.id" ), } diff --git a/tests/test_google_ads_interrupted_sync_full_table.py b/tests/test_google_ads_interrupted_sync_full_table.py new file mode 100644 index 0000000..5419b7c --- /dev/null +++ b/tests/test_google_ads_interrupted_sync_full_table.py @@ -0,0 +1,132 @@ +from asyncio import streams +import os + +from tap_tester import menagerie, connections, runner + +from base import GoogleAdsBase + + +class InterruptedSyncFullTableTest(GoogleAdsBase): + """Test tap's ability to recover from an interrupted sync for FULL Table stream""" + + @staticmethod + def name(): + return "tt_google_ads_interruption_full_table" + + def test_run(self): + + """ + Scenario: A sync job is interrupted for full table stream. The state is saved with `currently_syncing` + and `last_pk_fetched`(id of last synced record). + The next sync job kicks off, the tap picks only remaining records for interrupted stream and complete the sync. + + Expected State Structure: + state = {'currently_syncing': ('', ''), + 'bookmarks': { + '': {'': {last_pk_fetched: }}, + + Test Cases: + - Verify that id of 1st record in interrupted sync is greater than or equal to last_pk_fetched. 
+ - Verify that all records in the full sync and the interrupted sync come in ascending order. + - Verify the interrupted sync has fewer records than the full sync + - Verify state is flushed once the sync completes. + - Verify resuming sync replicates all records for streams that were yet-to-be-synced + """ + + streams_under_test = { 'ads', 'campaign_criterion', 'feed' } + + # Create connection + conn_id = connections.ensure_connection(self) + + # Run a discovery job + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # partition catalogs for use in table/field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_under_test] + + # select fields + self.select_all_streams_and_fields(conn_id, test_catalogs, select_all_fields=False) + + # Run a sync + self.run_and_verify_sync(conn_id) + + # acquire records from target output + full_sync_records = runner.get_records_from_target_output() + + + # Set state such that the first stream has 'completed' a sync. The interrupted stream ('campaign_criterion') + # has a bookmark value saved partway through its sync. + + interrupted_state = { 'currently_syncing': ('campaign_criterion', '5548074409'), 'bookmarks': { 'campaign_criterion': {'5548074409': {'last_pk_fetched': 16990616126}}, } } + + # set state for interrupted sync + menagerie.set_state(conn_id, interrupted_state) + + # Run another sync + self.run_and_verify_sync(conn_id) + + # acquire records from target output + interrupted_sync_records = runner.get_records_from_target_output() + final_state = menagerie.get_state(conn_id) + + # stream-level assertions + for stream in streams_under_test: + with self.subTest(stream=stream): + + # gather results + full_records = [message['data'] for message in full_sync_records[stream]['messages']] + full_record_count = len(full_records) + interrupted_records = [message['data'] for message in interrupted_sync_records[stream]['messages']] + interrupted_record_count = len(interrupted_records) + + # The campaign_criterion stream has a composite primary key, + # but to filter the records we use only campaign_id. + if stream == "campaign_criterion": + primary_key = "campaign_id" + else: + primary_key = next(iter(self.expected_primary_keys()[stream])) + + # Verify that all records in the full sync come in ascending order. + # That means the id of the current record is greater than or equal to the id of the previous record. + for i in range(1, full_record_count): + self.assertGreaterEqual(full_records[i][primary_key], full_records[i-1][primary_key], + msg='id of the current record is less than the id of the previous record.') + + # Verify that all records in the interrupted sync come in ascending order. + # That means the id of the current record is greater than or equal to the id of the previous record. + for i in range(1, interrupted_record_count): + self.assertGreaterEqual(interrupted_records[i][primary_key], interrupted_records[i-1][primary_key], + msg='id of the current record is less than the id of the previous record.') + + if stream in interrupted_state['bookmarks'].keys(): + + # Verify the second sync (interrupted sync) has fewer records than the first sync (full sync) for the interrupted stream + self.assertLess(interrupted_record_count, full_record_count) + + # Verify that the id of the first record in the interrupted sync is greater than or equal to last_pk_fetched for the interrupted stream.
+ self.assertGreaterEqual(interrupted_records[0][primary_key], 16990616126, msg='id of first record in interrupted sync is less than last_pk_fetched') + + else: + # Verify resuming sync replicates all records for streams that were yet-to-be-synced + + for record in interrupted_records: + with self.subTest(record_primary_key=record[primary_key]): + self.assertIn(record, full_records, msg='Unexpected record replicated in resuming sync.') + + for record in full_records: + with self.subTest(record_primary_key=record[primary_key]): + self.assertIn(record, interrupted_records, msg='Record missing from resuming sync.') + + + # Verify state is flushed after the sync completed. + self.assertNotIn(stream, final_state['bookmarks'].keys()) \ No newline at end of file
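Why the assertions above tolerate re-delivery at the bookmark boundary: with a composite primary key the resumed query uses >=, so boundary records can appear in both syncs. A sketch with made-up rows:

    # Composite PK (campaign_id, criterion_id); the bookmark stores only campaign_id.
    # Records: (4, 1), (4, 2), (5, 1); first sync interrupted after emitting (4, 1).
    # Resumed query: WHERE campaign.id >= 4 -> re-emits (4, 1) along with (4, 2),
    # trading a duplicate of (4, 1) for the guarantee that (4, 2) is not skipped.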
+ """ + mocked_google_ads_client = Mock() + mocked_google_ads_client.search.side_effect = GatewayTimeout("GatewayTimeout") + + try: + make_request(mocked_google_ads_client, "", "") + except GatewayTimeout: + pass + + # Verify that tap backoff for 5 times + self.assertEquals(mocked_google_ads_client.search.call_count, 5) + + def test_429_too_may_request_error(self, mock_sleep): + """ + Check whether the tap backoffs properly for 5 times in case of TooManyRequests error. + """ + mocked_google_ads_client = Mock() + mocked_google_ads_client.search.side_effect = TooManyRequests("Resource has been exhausted") + + try: + make_request(mocked_google_ads_client, "", "") + except TooManyRequests: + pass + + # Verify that tap backoff for 5 times + self.assertEquals(mocked_google_ads_client.search.call_count, 5) + + def test_read_timeout_error(self, mock_sleep): + """ + Check whether the tap backoffs properly for 5 times in case of ReadTimeout error. + """ + mocked_google_ads_client = Mock() + mocked_google_ads_client.search.side_effect = ReadTimeout("HTTPSConnectionPool(host='tap-tester-api.sandbox.stitchdata.com', port=443)") + + try: + make_request(mocked_google_ads_client, "", "") + except ReadTimeout: + pass + + # Verify that tap backoff for 5 times + self.assertEquals(mocked_google_ads_client.search.call_count, 5) diff --git a/tests/unittests/test_core_stream_query_building.py b/tests/unittests/test_core_stream_query_building.py new file mode 100644 index 0000000..a519292 --- /dev/null +++ b/tests/unittests/test_core_stream_query_building.py @@ -0,0 +1,70 @@ +import unittest +from tap_google_ads.streams import create_core_stream_query + +SELECTED_FIELDS = ["id"] +RESOURCE_NAME = "ads" + +class TestFullTableQuery(unittest.TestCase): + """ + Test that `create_core_stream_query` function build appropriate query with WHERE, ORDER BY clause. + """ + def test_empty_filter_params_clause(self): + """ + Verify that query does not contain WHERE and ORDER BY clause if filter_params value is None. + """ + + filter_params = None + last_pk_fetched = {} + composite_pks = False + + expected_query = 'SELECT id FROM ads PARAMETERS omit_unselected_resource_names=true' + + actual_query = create_core_stream_query(RESOURCE_NAME, SELECTED_FIELDS, last_pk_fetched, filter_params, composite_pks) + + self.assertEqual(expected_query, actual_query) + + def test_empty_where_clause(self): + """ + Verify that query contain only ORDER BY clause if filter_params value is not None and + last_pk_fetched is empty.(Fresh sync) + """ + filter_params = 'id' + last_pk_fetched = {} + composite_pks = False + expected_query = 'SELECT id FROM ads ORDER BY id ASC PARAMETERS omit_unselected_resource_names=true' + + actual_query = create_core_stream_query(RESOURCE_NAME, SELECTED_FIELDS, last_pk_fetched, filter_params, composite_pks) + + self.assertEqual(expected_query, actual_query) + + def test_where_orderby_clause_composite_pks(self): + """ + Verify that query contains WHERE(inclusive) and ORDER BY clause if filter_params and + last_pk_fetched are available. (interrupted sync). WHERE clause must have equality if stream contain + a composite primary key. 
+ """ + filter_params = 'id' + last_pk_fetched = 4 + composite_pks = True + + expected_query = 'SELECT id FROM ads WHERE id >= 4 ORDER BY id ASC PARAMETERS omit_unselected_resource_names=true' + + actual_query = create_core_stream_query(RESOURCE_NAME, SELECTED_FIELDS, last_pk_fetched, filter_params, composite_pks) + + self.assertEqual(expected_query, actual_query) + + def test_where_orderby_clause_non_composite_pks(self): + """ + Verify that query contains WHERE(exclusive) and ORDER BY clause if filter_params and + last_pk_fetched are available. (interrupted sync). WHERE clause must exclude equality if stream does not contain + a composite primary key. + """ + filter_params = 'id' + last_pk_fetched = 4 + composite_pks = False + + expected_query = 'SELECT id FROM ads WHERE id > 4 ORDER BY id ASC PARAMETERS omit_unselected_resource_names=true' + + actual_query = create_core_stream_query(RESOURCE_NAME, SELECTED_FIELDS, last_pk_fetched, filter_params, composite_pks) + + self.assertEqual(expected_query, actual_query)