diff --git a/tests/base.py b/tests/base.py index c2d72f8..7487f10 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,11 +1,11 @@ import os import unittest +import dateutil.parser from datetime import datetime as dt from datetime import timedelta +import time -import tap_tester.menagerie as menagerie -import tap_tester.connections as connections -import tap_tester.runner as runner +from tap_tester import menagerie, runner, connections, LOGGER class FreshdeskBaseTest(unittest.TestCase): @@ -17,17 +17,16 @@ class FreshdeskBaseTest(unittest.TestCase): INCREMENTAL = "INCREMENTAL" FULL = "FULL_TABLE" - START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ - BOOKMARK_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + start_date = "" + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ + BOOKMARK_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - EXPECTED_PAGE_SIZE = "expected-page-size" OBEYS_START_DATE = "obey-start-date" - # PARENT_STREAM = "parent-stream" # TODO applies? + PAGE_SIZE = 100 ####################################### # Tap Configurable Metadata Methods # ####################################### - start_date = "" def setUp(self): missing_envs = [x for x in [ @@ -45,11 +44,20 @@ def get_type(): def tap_name(): return "tap-freshdesk" - def get_properties(self): - start_date = dt.today() - timedelta(days=5*365) - start_date_with_fmt = dt.strftime(start_date, self.START_DATE_FORMAT) + def get_properties(self, original: bool = True): + """ + Maintain states for start_date and end_date + :param original: set to false to change the start_date or end_date + """ + return_value = { + 'start_date': '2019-01-04T00:00:00Z', + } + if original: + return return_value - return {'start_date' : start_date_with_fmt} + # Reassign start and end dates + return_value["start_date"] = self.start_date + return return_value def get_credentials(self): return { @@ -63,54 +71,60 @@ def required_environment_variables(self): def expected_metadata(self): """The expected streams and metadata about the streams""" - return { + return { "agents": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "companies": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "conversations": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "groups": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "roles": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "satisfaction_ratings": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "tickets": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True }, "time_entries": { self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updated_at"}, - self.EXPECTED_PAGE_SIZE: 100 + self.OBEYS_START_DATE: True + }, + "contacts": { + 
self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True }, } @@ -120,7 +134,7 @@ def expected_metadata(self): def expected_primary_keys(self): """ - return a dictionary with key of table name + Return a dictionary with key of table name and value as a set of primary key fields """ return {table: properties.get(self.PRIMARY_KEYS, set()) @@ -129,7 +143,8 @@ def expected_primary_keys(self): def expected_automatic_fields(self): """ - return a dictionary with key of table name and value as the primary keys and replication keys + Return a dictionary with key of table name + and value as the primary keys and replication keys """ pks = self.expected_primary_keys() rks = self.expected_replication_keys() @@ -138,47 +153,36 @@ def expected_automatic_fields(self): for stream in self.expected_streams()} def expected_replication_method(self): - """return a dictionary with key of table name and value of replication method""" + """ + Return a dictionary with key of table name + and value of replication method + """ return {table: properties.get(self.REPLICATION_METHOD, None) for table, properties in self.expected_metadata().items()} - def expected_streams(self): - """A set of expected stream names""" - return set(self.expected_metadata().keys()) + def expected_streams(self, only_trial_account_streams: bool = False): + """A set of expected stream names based on only_trial_account_streams param""" + if only_trial_account_streams: + # To collect "time_entries", "satisfaction_ratings" pro account is needed. Skipping them for now. + return set(self.expected_metadata().keys() - {"time_entries", "satisfaction_ratings"}) + else: + # Returns all streams. + return set(self.expected_metadata().keys()) def expected_replication_keys(self): """ - return a dictionary with key of table name + Return a dictionary with key of table name and value as a set of replication key fields """ return {table: properties.get(self.REPLICATION_KEYS, set()) for table, properties in self.expected_metadata().items()} - def expected_page_limits(self): - return {table: properties.get(self.EXPECTED_PAGE_SIZE, set()) - for table, properties - in self.expected_metadata().items()} - - ########################## # Common Test Actions # ########################## - def create_connection_and_run_check(self, original_properties: bool = True): - """Create a new connection with the test name""" - # Create the connection - conn_id = connections.ensure_connection(self, original_properties) - - # Run a check job using orchestrator (discovery) - check_job_name = runner.run_check_mode(self, conn_id) - - # Assert that the check job succeeded - exit_status = menagerie.get_exit_status(conn_id, check_job_name) - menagerie.verify_check_exit_status(self, exit_status, check_job_name) - return conn_id - def run_and_verify_check_mode(self, conn_id): """ Run the tap in check mode and verify it succeeds. 
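For reference, the tests added later in this patch compose the base helpers above in one common pattern. Below is a minimal sketch of that flow, not part of the patch itself, assuming it runs inside a test method of a FreshdeskBaseTest subclass; every helper name is taken from the test files in this diff:

    # Discover catalogs and pick the streams a trial account can sync
    conn_id = connections.ensure_connection(self)
    found_catalogs = self.run_and_verify_check_mode(conn_id)
    expected_streams = self.expected_streams(only_trial_account_streams=True)

    # Select those streams (and all of their fields), then run the sync
    test_catalogs = [catalog for catalog in found_catalogs
                     if catalog.get('stream_name') in expected_streams]
    self.perform_and_verify_table_and_field_selection(
        conn_id, test_catalogs, select_all_fields=True)
    record_count_by_stream = self.run_and_verify_sync(conn_id)
    synced_records = runner.get_records_from_target_output()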
@@ -194,11 +198,16 @@ def run_and_verify_check_mode(self, conn_id): menagerie.verify_check_exit_status(self, exit_status, check_job_name) found_catalogs = menagerie.get_catalogs(conn_id) - self.assertEqual( - len(found_catalogs), 0, - msg="expected 0 length catalog for check job, conn_id: {}".format(conn_id) - ) - print("Verified len(found_catalogs) = 0 for job with conn_id: {}".format(conn_id)) + self.assertGreater(len(found_catalogs), 0, + msg="unable to locate schemas for connection {}".format(conn_id)) + + found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs)) + LOGGER.info(found_catalog_names) + self.assertSetEqual(self.expected_streams(), found_catalog_names, + msg="discovered schemas do not match") + LOGGER.info("discovered schemas are OK") + + return found_catalogs def run_and_verify_sync(self, conn_id): """ @@ -211,52 +220,128 @@ def run_and_verify_sync(self, conn_id): # Verify tap and target exit codes exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - # BHT Freshdesk bug, discovery_exit_status is left as "None", not being set to 0 - # as expected. Dev is not spending time fixing Tier 3 tap issues so skip - # verification in order to allow some level of regression test to run. - #menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) # Verify actual rows were synced sync_record_count = runner.examine_target_output_file(self, conn_id, - self.expected_streams(), + self.expected_streams(only_trial_account_streams=True), self.expected_primary_keys()) total_row_count = sum(sync_record_count.values()) self.assertGreater(total_row_count, 0, msg="failed to replicate any data: {}".format(sync_record_count)) - print("total replicated row count: {}".format(total_row_count)) + LOGGER.info("Total replicated row count: {}".format(total_row_count)) return sync_record_count - - @staticmethod - def parse_date(date_value): + def perform_and_verify_table_and_field_selection(self, + conn_id, + test_catalogs, + select_all_fields=True): """ - Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. + Perform table and field selection based off of the streams to select + set and field selection parameters. + Verify this results in the expected streams selected and all or no + fields selected for those streams. 
""" - date_formats = { - "%Y-%m-%dT%H:%M:%S.%fZ", - "%Y-%m-%dT%H:%M:%SZ", - "%Y-%m-%dT%H:%M:%S.%f+00:00", - "%Y-%m-%dT%H:%M:%S+00:00", - "%Y-%m-%d" - } - for date_format in date_formats: - try: - date_stripped = dt.strptime(date_value, date_format) - return date_stripped - except ValueError: - continue - raise NotImplementedError("Tests do not account for dates of this format: {}".format(date_value)) + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields + ) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get('stream_name') for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id']) + + # Verify all testable streams are selected + selected = catalog_entry.get('annotated-schema').get('selected') + LOGGER.info("Validating selection on {}: {}".format(cat['stream_name'], selected)) + if cat['stream_name'] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): + field_selected = field_props.get('selected') + LOGGER.info("\tValidating selection on {}.{}: {}".format( + cat['stream_name'], field, field_selected)) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get(cat['stream_name']) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata']) + self.assertEqual(expected_automatic_fields, selected_fields) + @staticmethod + def get_selected_fields_from_metadata(metadata): + """Return selected fields from the metadata""" + selected_fields = set() + for field in metadata: + is_field_metadata = len(field['breadcrumb']) > 1 + inclusion_automatic_or_selected = ( + field['metadata']['selected'] is True or + field['metadata']['inclusion'] == 'automatic' + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field['breadcrumb'][1]) + return selected_fields + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) - def timedelta_formatted(self, dtime, days=0, str_format="%Y-%m-%dT00:00:00Z"): - date_stripped = dt.strptime(dtime, str_format) - return_date = date_stripped + timedelta(days=days) + non_selected_properties = [] + if not select_all_fields: + # Get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() - return dt.strftime(return_date, str_format) + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) ################################ # Tap Specific Test Actions # ################################ + + def dt_to_ts(self, dtime, format): + """Convert datetime with a format to timestamp""" + date_stripped = int(time.mktime(dt.strptime(dtime, 
format).timetuple())) + return date_stripped + + def calculated_states_by_stream(self, current_state): + """ + Look at the bookmarks from a previous sync and set a new bookmark + value based off timedelta expectations. This ensures the subsequent sync will replicate + at least 1 record but, fewer records than the previous sync. + + Sufficient test data is required for this test to cover a given stream. + An incremental replication stream must have at least two records with + replication keys that differ by some time span. + + If the test data is changed in the future this may break expectations for this test. + """ + timedelta_by_stream = {stream: [0, 12, 0] # {stream_name: [days, hours, minutes], ...} + for stream in current_state['bookmarks'].keys()} + + stream_to_calculated_state = { + stream: "" for stream in current_state['bookmarks'].keys()} + for stream, state in current_state['bookmarks'].items(): + state_key, state_value = list(state.keys())[0], list(state.values())[0] + state_as_datetime = dateutil.parser.parse(state_value) + + days, hours, minutes = timedelta_by_stream[stream] + calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes) + + state_format = self.BOOKMARK_FORMAT + calculated_state_formatted = dt.strftime(calculated_state_as_datetime, state_format) + stream_to_calculated_state[stream] = {state_key: calculated_state_formatted} + + return stream_to_calculated_state diff --git a/tests/test_freshdesk_all_fields.py b/tests/test_freshdesk_all_fields.py new file mode 100644 index 0000000..9c0b7f2 --- /dev/null +++ b/tests/test_freshdesk_all_fields.py @@ -0,0 +1,103 @@ +from tap_tester import runner, connections, menagerie + +from base import FreshdeskBaseTest + +# As we are not able to generate following fields by Freshdesk UI, so removed it from expectation list. +KNOWN_MISSING_FIELDS = { + 'tickets': { + 'facebook_id', + 'description', + 'description_text', + 'twitter_id', + 'name', + 'phone', + 'email' + }, + 'groups': { + 'auto_ticket_assign', + 'agent_ids' + }, + 'agents': { + 'group_ids', + 'role_ids' + }, + 'contacts': { + 'view_all_tickets', + 'other_companies', + 'other_emails', + 'tags', + 'avatar' + } +} + + +class TestFreshdeskAllFields(FreshdeskBaseTest): + """Test that with all fields selected for a stream automatic and available fields are replicated""" + + @staticmethod + def name(): + return "tap_tester_freshdesk_all_fields" + + def test_run(self): + """ + • Verify no unexpected streams were replicated + • Verify that more than just the automatic fields are replicated for each stream. 
+ • Verify all fields for each stream are replicated + """ + + expected_streams = self.expected_streams(only_trial_account_streams=True) + + # Instantiate connection + conn_id = connections.ensure_connection(self) + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields, select_all_fields=True, + ) + + # Grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog['stream_id'], catalog['stream_name'] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [md_entry['breadcrumb'][1] + for md_entry in catalog_entry['metadata'] + if md_entry['breadcrumb'] != []] + stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md) + + # Run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) + + for stream in expected_streams: + with self.subTest(stream=stream): + # Expected values + expected_automatic_fields = self.expected_automatic_fields().get(stream) + + # Get all expected keys + expected_all_keys = stream_to_all_catalog_fields[stream] + + messages = synced_records.get(stream) + # Collect actual values + actual_all_keys = set() + for message in messages['messages']: + if message['action'] == 'upsert': + actual_all_keys.update(message['data'].keys()) + + expected_all_keys = expected_all_keys - KNOWN_MISSING_FIELDS.get(stream, set()) + + # Verify all fields for a stream were replicated + self.assertGreater(len(expected_all_keys),len(expected_automatic_fields)) + self.assertTrue(expected_automatic_fields.issubset(expected_all_keys), + msg=f'{expected_automatic_fields - expected_all_keys} is not in "expected_all_keys"') + self.assertSetEqual(expected_all_keys, actual_all_keys) diff --git a/tests/test_freshdesk_automatic_fields.py b/tests/test_freshdesk_automatic_fields.py new file mode 100644 index 0000000..8c63211 --- /dev/null +++ b/tests/test_freshdesk_automatic_fields.py @@ -0,0 +1,71 @@ +from tap_tester import runner, connections + +from base import FreshdeskBaseTest + + +class TestFreshdeskAutomaticFields(FreshdeskBaseTest): + """Test that with no fields selected for a stream automatic fields are still replicated""" + + @staticmethod + def name(): + return "tap_tester_freshdesk_automatic_fields" + + def test_run(self): + """ + • Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods + • Verify that only the automatic fields are sent to the target. + • Verify that all replicated records have unique primary key values. 
+ """ + + expected_streams = self.expected_streams(only_trial_account_streams=True) + + # Instantiate connection + conn_id = connections.ensure_connection(self) + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + test_catalogs_automatic_fields = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_automatic_fields, select_all_fields=False, + ) + + # Run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # Expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_keys = self.expected_automatic_fields().get(stream) + + # Collect actual values + data = synced_records.get(stream, {}) + record_messages_keys = [set(row.get('data').keys()) + for row in data.get('messages', {})] + primary_keys_list = [ + tuple(message.get('data').get(expected_pk) + for expected_pk in expected_primary_keys) + for message in data.get('messages') + if message.get('action') == 'upsert'] + unique_primary_keys_list = set(primary_keys_list) + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="The number of records is not over the stream max limit for the {} stream".format(stream)) + + # Verify that only the automatic fields are sent to the target + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) + + # Verify that all replicated records have unique primary key values. + self.assertEqual( + len(primary_keys_list), + len(unique_primary_keys_list), + msg="Replicated record does not have unique primary key values.") diff --git a/tests/test_freshdesk_bookmarks.py b/tests/test_freshdesk_bookmarks.py index 00a934d..027ab59 100644 --- a/tests/test_freshdesk_bookmarks.py +++ b/tests/test_freshdesk_bookmarks.py @@ -1,12 +1,3 @@ -import re -import os -import pytz -import time -import dateutil.parser - -from datetime import timedelta -from datetime import datetime - from tap_tester import menagerie, connections, runner from base import FreshdeskBaseTest @@ -15,236 +6,150 @@ class FreshdeskBookmarks(FreshdeskBaseTest): """Test incremental replication via bookmarks (without CRUD).""" - start_date = "" - test_streams = {} - @staticmethod def name(): - return "tt_freshdesk_bookmarks" - - def get_properties(self): - return_value = { - 'start_date': '2019-01-04T00:00:00Z', # start date includes roles - } - - self.start_date = return_value['start_date'] - return return_value + return "tap_tester_freshdesk_bookmarks" - def calculated_states_by_stream(self, current_state): + def test_run(self): """ - Look at the bookmarks from a previous sync and set a new bookmark - value based off timedelta expectations. This ensures the subsequent sync will replicate - at least 1 record but, fewer records than the previous sync. - - Sufficient test data is required for this test to cover a given stream. - An incremental replication stream must have at least two records with - replication keys that differ by some time span. - - If the test data is changed in the future this may break expectations for this test. + • Verify that for each stream you can do a sync which records bookmarks. + • Verify that the bookmark is the maximum value sent to the target for the replication key. 
+ • Verify that a second sync respects the bookmark + All data of the second sync is >= the bookmark from the first sync + The number of records in the 2nd sync is less then the first + + PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key """ - bookmark_streams = self.test_streams - {'conversations'} - print("bookmark_streams: {}".format(bookmark_streams)) - - timedelta_by_stream = {stream: [0, 12, 0] # {stream_name: [days, hours, minutes], ...} - for stream in bookmark_streams} - #timedelta_by_stream['tickets'] = [698, 17, 26] # original conversations math, must update - # TODO Add time_entries, satisfaction_ratings streams (403) - # BUG https://jira.talendforge.org/browse/TDL-17559. Redefining state to be closer to - # expected format so the underlying code wont have to change as much after the JIRA fix - current_state = {'bookmarks': current_state} - del current_state['bookmarks']['tickets_deleted'] # Delete unexpected streams - del current_state['bookmarks']['tickets_spam'] # generated by filter? - - # Keep existing format for this method so it will work after bug fix - stream_to_calculated_state = {stream: "" for stream in bookmark_streams} - for stream, state_value in current_state['bookmarks'].items(): - - if stream in bookmark_streams: - state_as_datetime = dateutil.parser.parse(state_value) - - days, hours, minutes = timedelta_by_stream[stream] - calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes) - - state_format = self.BOOKMARK_FORMAT - calculated_state_formatted = datetime.strftime(calculated_state_as_datetime, state_format) - if calculated_state_formatted < self.start_date: - raise RuntimeError("Time delta error for stream {}, sim start_date < start_date!".format(stream)) - stream_to_calculated_state[stream] = calculated_state_formatted - - return stream_to_calculated_state - - def test_run(self): - """A Bookmarks Test""" - # Since this tap has no table and field selection all streams will sync every time. So define - # the subset of streams to run assertions against until all streams are covered - self.test_streams = {'tickets', 'companies', 'agents', 'groups', 'roles', 'conversations'} + # Tickets and Contacts stream also collect some deleted data on the basis of filter param. 
+ # Written separate bookmark test case for them in test_freshdesk_bookmarks_stream_with_fillter_param.py + expected_streams = self.expected_streams(only_trial_account_streams=True) - {'tickets', 'contacts'} expected_replication_keys = self.expected_replication_keys() expected_replication_methods = self.expected_replication_method() ########################################################################## - ### First Sync + # First Sync ########################################################################## conn_id = connections.ensure_connection(self) # Run in check mode - check_job_name = self.run_and_verify_check_mode(conn_id) + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries) # Run a sync job using orchestrator first_sync_record_count = self.run_and_verify_sync(conn_id) - first_sync_messages = runner.get_records_from_target_output() + first_sync_records = runner.get_records_from_target_output() first_sync_bookmarks = menagerie.get_state(conn_id) - # Update based on sync data - first_sync_empty = self.test_streams - first_sync_messages.keys() - if len(first_sync_empty) > 0: - print("Missing stream(s): {} in sync 1. Failing test for stream(s)".format(first_sync_empty)) - self.first_sync_empty = first_sync_empty - first_sync_bonus = first_sync_messages.keys() - self.test_streams - if len(first_sync_bonus) > 0: - print("Found stream: {} in first sync. Add to test_streams?".format(first_sync_bonus)) - ########################################################################## - ### Update State Between Syncs + # Update State Between Syncs ########################################################################## - #new_states = {'bookmarks': dict()} # BUG TDL-17559 + new_states = {'bookmarks': dict()} simulated_states = self.calculated_states_by_stream(first_sync_bookmarks) - # for stream, new_state in simulated_states.items(): # BUG TDL-17559 - # new_states['bookmarks'][stream] = new_state # Save expected format - # menagerie.set_state(conn_id, new_states) - menagerie.set_state(conn_id, simulated_states) + for stream, new_state in simulated_states.items(): + new_states['bookmarks'][stream] = new_state + menagerie.set_state(conn_id, new_states) ########################################################################## - ### Second Sync + # Second Sync ########################################################################## second_sync_record_count = self.run_and_verify_sync(conn_id) - second_sync_messages = runner.get_records_from_target_output() + second_sync_records = runner.get_records_from_target_output() second_sync_bookmarks = menagerie.get_state(conn_id) - # Update based on sync data - second_sync_empty = self.test_streams - second_sync_messages.keys() - if len(second_sync_empty) > 0: - print("Missing stream(s): {} in sync 2. Failing test. Check test data!"\ - .format(second_sync_empty)) - self.second_sync_empty = second_sync_empty - second_sync_bonus = second_sync_messages.keys() - self.test_streams - if len(second_sync_bonus) > 0: - print("Found stream(s): {} in second sync. 
Add to test_streams?".format(second_sync_bonus)) - ########################################################################## - ### Test By Stream + # Test By Stream ########################################################################## - for stream in self.test_streams: # Add supported streams 1 by 1 + for stream in expected_streams: # Add supported streams 1 by 1 with self.subTest(stream=stream): - # Assert failures for streams not present in first sync (loss of coverage) - if stream in self.first_sync_empty: - self.assertTrue(False, msg="Stream: {} no longer in sync 1. Check test data".format(stream)) - - continue - - # Assert failures for streams present in first sync but not second sync - if stream in self.second_sync_empty: - if stream == 'conversations': - print("Commented out failing test case for stream: {}".format(stream)) - print("See https://jira.talendforge.org/browse/TDL-17738 for details") - # conversations is a child of tickets. When the child object (conversation / note) - # is updated, the parent object (ticket) is also updated. The ticket is then being - # sync'd after update but the child conversation of that updated ticket is not - # (at least when the actual note text was updated. There are several ways to update - # the child). - #self.assertTrue(False, msg="Stream: {} present in sync 1, missing in sync 2!".format(stream)) - - continue - - self.assertTrue(False, msg="Stream: {} present in sync 1, missing in sync 2!".format(stream)) - - continue - - # expected values + # Expected values expected_replication_method = expected_replication_methods[stream] - # collect information for assertions from syncs 1 & 2 base on expected values + # Collect information for assertions from syncs 1 & 2 base on expected values first_sync_count = first_sync_record_count.get(stream, 0) second_sync_count = second_sync_record_count.get(stream, 0) - first_sync_records = [record.get('data') for record in - first_sync_messages.get(stream).get('messages') + first_sync_messages = [record.get('data') for record in + first_sync_records.get(stream, {'messages': []}).get('messages') if record.get('action') == 'upsert'] - second_sync_records = [record.get('data') for record in - second_sync_messages.get(stream).get('messages') + second_sync_messages = [record.get('data') for record in + second_sync_records.get(stream, {'messages': []}).get('messages') if record.get('action') == 'upsert'] - if stream != 'conversations': # conversations has no bookmark - first_bookmark_value = first_sync_bookmarks.get(stream) - second_bookmark_value = second_sync_bookmarks.get(stream) + first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {}).get(stream) + second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {}).get(stream) if expected_replication_method == self.INCREMENTAL: - # collect information specific to incremental streams from syncs 1 & 2 - replication_key = next(iter(expected_replication_keys[stream])) - if stream != 'conversations': # conversations has no bookmark - simulated_bookmark_value = simulated_states[stream] + # Collect information specific to incremental streams from syncs 1 & 2 + replication_key = list(expected_replication_keys[stream])[0] + first_bookmark_value = first_bookmark_key_value.get(replication_key) + second_bookmark_value = second_bookmark_key_value.get(replication_key) - if stream == 'conversations': - print("*** Only checking sync counts for stream: {}".format(stream)) - # TODO discuss re-factor to use tickets bookmark for conversations assertions - # 
Verify the number of records in the 2nd sync is less then the first - self.assertLessEqual(second_sync_count, first_sync_count) - if second_sync_count == first_sync_count: - print("WARN: first_sync_count == second_sync_count for stream: {}".format(stream)) + first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value, self.BOOKMARK_FORMAT) + second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value, self.BOOKMARK_FORMAT) - continue + simulated_bookmark_value_ts = self.dt_to_ts( + new_states['bookmarks'][stream][replication_key], self.BOOKMARK_FORMAT) # Verify the first sync sets a bookmark of the expected form + self.assertIsNotNone(first_bookmark_key_value) self.assertIsNotNone(first_bookmark_value) # Verify the second sync sets a bookmark of the expected form + self.assertIsNotNone(second_bookmark_key_value) self.assertIsNotNone(second_bookmark_value) - # Verify the second sync bookmark is Equal to the first sync bookmark - # assumes no changes to data during test - self.assertEqual(second_bookmark_value, first_bookmark_value) + # Verify the second sync bookmark is Equal or Greater than the first sync bookmark + self.assertGreaterEqual( + second_bookmark_value_ts, first_bookmark_value_ts) - # Verify the number of records in the 2nd sync is less then the first - if stream == 'roles': - self.assertEqual(second_sync_count, first_sync_count) - print("WARN: Less covereage, unable to update records for stream: {}".format(stream)) - else: - self.assertLess(second_sync_count, first_sync_count) - - # Verify the bookmark is the max value sent to the target for a given replication key. - rec_time = [] - for record in first_sync_records: - rec_time += record['updated_at'], - - rec_time.sort() - self.assertEqual(rec_time[-1], first_bookmark_value) - - rec_time = [] - for record in second_sync_records: - rec_time += record['updated_at'], - - rec_time.sort() - self.assertEqual(rec_time[-1], second_bookmark_value) - - # Verify all replication key values in sync 2 are >= the simulated bookmark value. - for record in second_sync_records: - self.assertTrue(record['updated_at'] >= simulated_states[stream], - msg="record time cannot be less than bookmark time" + for record in first_sync_messages: + # Verify the first sync bookmark value is the max replication key value for a given stream + replication_key_value = self.dt_to_ts( + record.get(replication_key), self.BOOKMARK_FORMAT) + + self.assertLessEqual( + replication_key_value, first_bookmark_value_ts, + msg="First sync bookmark was set incorrectly, a record with a \ + greater replication-key value was synced." + ) + + for record in second_sync_messages: + # Verify the second sync bookmark value is the max replication key value for a given stream + replication_key_value = self.dt_to_ts(record.get(replication_key), self.BOOKMARK_FORMAT) + + self.assertGreaterEqual(replication_key_value, simulated_bookmark_value_ts, + msg="Second sync records do not respect the previous bookmark.") + + self.assertLessEqual( + replication_key_value, second_bookmark_value_ts, + msg="Second sync bookmark was set incorrectly, a record with a \ + greater replication-key value was synced." 
) + # Verify the number of records in the 2nd sync is less then the first + self.assertLessEqual(second_sync_count, first_sync_count) # No full table streams for freshdesk as of Jan 31 2022 else: - raise NotImplementedError( - "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format(stream, expected_replication_method) + "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format( + stream, expected_replication_method) ) - # Verify at least 1 record was replicated in the second sync - self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) + self.assertGreater( + second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) diff --git a/tests/test_freshdesk_bookmarks_stream_with_fillter_param.py b/tests/test_freshdesk_bookmarks_stream_with_fillter_param.py new file mode 100644 index 0000000..4d2f375 --- /dev/null +++ b/tests/test_freshdesk_bookmarks_stream_with_fillter_param.py @@ -0,0 +1,153 @@ +from tap_tester import connections, runner, menagerie + +from base import FreshdeskBaseTest + + +class BookmarkTest(FreshdeskBaseTest): + """ + Test tap sets a separate bookmark for tickets and contacts streams filter param + tickets_deleted, tickets_spam, contacts_deleted and contacts_blocked + and respects it for the next sync. + """ + + def name(self): + return "tap_tester_freshdesk_bookmarks_stream_with_filter_param" + + def test_run(self): + """ + Verify that for each stream you can do a sync that records bookmarks. + That the bookmark is the maximum value sent to the target for the replication key. + That a second sync respects the bookmark + All data of the second sync is >= the bookmark from the first sync + The number of records in the 2nd sync is less than the first (This assumes that + new data added to the stream is done at a rate slow enough that you haven't + doubled the amount of data from the start date to the first sync between + the first sync and second sync run in this test) + PREREQUISITE + For EACH stream that is incrementally replicated, there are multiple rows of data with + different values for the replication key + """ + + streams_to_test = {'tickets', 'contacts'} + expected_replication_keys = self.expected_replication_keys() + + ########################################################################## + # First Sync + ########################################################################## + conn_id = connections.ensure_connection(self) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in streams_to_test] + + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries) + + # Run a first sync job using orchestrator + first_sync_record_count = self.run_and_verify_sync(conn_id) + first_sync_records = runner.get_records_from_target_output() + first_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Update State Between Syncs + ########################################################################## + + new_states = {'bookmarks': dict()} + simulated_states = self.calculated_states_by_stream(first_sync_bookmarks) + for stream, new_state in simulated_states.items(): + new_states['bookmarks'][stream] = new_state + menagerie.set_state(conn_id, new_states) + + ########################################################################## 
+ # Second Sync + ########################################################################## + + second_sync_record_count = self.run_and_verify_sync(conn_id) + second_sync_records = runner.get_records_from_target_output() + second_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Test By Stream + ########################################################################## + + for stream in streams_to_test: + with self.subTest(stream=stream): + + replication_key = list(expected_replication_keys[stream])[0] + if stream == 'tickets': + stream_filters = ['', 'deleted', 'spam'] + # Skipping "contacts_blocked" filter as there is no data present for it. + if stream == 'contacts': + stream_filters = ['', 'deleted'] + + second_sync_count = second_sync_record_count.get(stream, 0) + # Verify at least 1 record was replicated in the second sync + self.assertGreater( + second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) + + for filter in stream_filters: + filter_stream = stream + if filter: + filter_stream = filter_stream + "_" + filter + + # Collect information for assertions from syncs 1 & 2 base on expected values + first_sync_messages = [record.get('data') for record in + first_sync_records.get(stream, {}).get('messages', []) + if record.get('action') == 'upsert' and record.get('data').get(filter) == 'true'] + second_sync_messages = [record.get('data') for record in + second_sync_records.get(stream, {}).get('messages', []) + if record.get('action') == 'upsert' and record.get('data').get(filter) == 'true'] + else: + # Collect information for assertions from syncs 1 & 2 base on expected values + first_sync_messages = [record.get('data') for record in + first_sync_records.get(stream, {}).get('messages', []) + if record.get('action') == 'upsert' and not any(record.get('data').get(_filter) for _filter in stream_filters)] + second_sync_messages = [record.get('data') for record in + second_sync_records.get(stream, {}).get('messages', []) + if record.get('action') == 'upsert' and not any(record.get('data').get(_filter) for _filter in stream_filters)] + + # Get bookmark for tickets/contacts stream + first_bookmark_value = first_sync_bookmarks.get('bookmarks', {}).get(filter_stream, {}).get(replication_key) + second_bookmark_value = second_sync_bookmarks.get('bookmarks', {}).get(filter_stream, {}).get(replication_key) + + first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value, self.BOOKMARK_FORMAT) + second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value, self.BOOKMARK_FORMAT) + + simulated_bookmark_value = self.dt_to_ts( + new_states['bookmarks'][filter_stream][replication_key], self.BOOKMARK_FORMAT) + + # Verify the first sync sets bookmarks of the expected form + self.assertIsNotNone(first_bookmark_value) + + # Verify the second sync sets bookmarks of the expected form + self.assertIsNotNone(second_bookmark_value) + + for record in first_sync_messages: + + # Verify the first sync bookmark value is the max replication key value for a given stream + replication_key_value = self.dt_to_ts(record.get(replication_key), self.BOOKMARK_FORMAT) + # Verify the first sync bookmark value is the max replication key value for a tickets/contacts stream + self.assertLessEqual( + replication_key_value, first_bookmark_value_ts, + msg=( + "First sync bookmark for {} was set incorrectly, a \ + record with a greater replication-key value was synced.".format(stream)) + ) + + for record in 
second_sync_messages: + + # Verify the second sync bookmark value is the max replication key value for a given stream + replication_key_value = self.dt_to_ts(record.get(replication_key), self.BOOKMARK_FORMAT) + # Verify the second sync bookmark value is the max replication key value for a tickets/contacts stream + self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, + msg=("Second sync records do not respect the previous bookmark for {}.".format(stream))) + + self.assertLessEqual( + replication_key_value, second_bookmark_value_ts, + msg=( + "First sync bookmark for {} was set incorrectly, a record with a \ + greater replication-key value was synced.".format(stream)) + ) diff --git a/tests/test_freshdesk_check.py b/tests/test_freshdesk_check.py deleted file mode 100644 index 8a94917..0000000 --- a/tests/test_freshdesk_check.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Test tap check mode and metadata/annotated-schema.""" -import re - -from tap_tester import menagerie, connections, runner - -from base import FreshdeskBaseTest - - -class FreshdeskCheckTest(FreshdeskBaseTest): - """Test tap check mode and metadata/annotated-schema conforms to standards.""" - - @staticmethod - def name(): - return "tt_freshdesk_check" - - def test_run(self): - """ - Freshdesk check test (does not run discovery). - Verify that check does NOT create a discovery catalog, schema, metadata, etc. - - • Verify check job does not populate found_catalogs - • Verify no critical errors are thrown for check job - """ - streams_to_test = self.expected_streams() - - conn_id = connections.ensure_connection(self) - - # Run and verify the check, see base.py for details - self.run_and_verify_check_mode(conn_id) diff --git a/tests/test_freshdesk_discovery.py b/tests/test_freshdesk_discovery.py new file mode 100644 index 0000000..798df94 --- /dev/null +++ b/tests/test_freshdesk_discovery.py @@ -0,0 +1,122 @@ +import re + +from tap_tester import menagerie, connections + +from base import FreshdeskBaseTest + + +class TestFreshdeskDiscovery(FreshdeskBaseTest): + + def name(self): + return "tap_tester_freshdesk_discovery" + + def test_run(self): + """ + Testing that discovery creates the appropriate catalog with valid metadata. + • Verify number of actual streams discovered match expected + • Verify the stream names discovered were what we expect + • Verify stream names follow naming convention + streams should only have lowercase alphas and underscores + • Verify there is only 1 top level breadcrumb + • Verify primary key(s) + • Verify that primary keys are given the inclusion of automatic. + • Verify that all other fields have inclusion of available metadata. 
+ """ + streams_to_test = self.expected_streams() + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} + self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming") + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Verify ensure the catalog is found for a given stream + catalog = list([catalog for catalog in found_catalogs + if catalog["stream_name"] == stream])[0] + self.assertIsNotNone(catalog) + + # Collecting expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_automatic_fields = self.expected_automatic_fields()[stream] + expected_replication_keys = self.expected_replication_keys()[stream] + expected_replication_method = self.expected_replication_method()[stream] + + # Collecting actual values + schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + metadata = schema_and_metadata["metadata"] + stream_properties = [item for item in metadata if item.get("breadcrumb") == []] + actual_primary_keys = set( + stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) + ) + + actual_replication_keys = set( + stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) + ) + + actual_replication_method = stream_properties[0].get( + "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + + actual_automatic_fields = set( + item.get("breadcrumb", ["properties", None])[1] for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + ) + + actual_fields = [] + for md_entry in metadata: + if md_entry['breadcrumb'] != []: + actual_fields.append(md_entry['breadcrumb'][1]) + + ########################################################################## + # Metadata assertions + ########################################################################## + + # Verify there is only 1 top-level breadcrumb in metadata + self.assertTrue(len(stream_properties) == 1, + msg="There is NOT only one top level breadcrumb for {}".format(stream) + + "\nstream_properties | {}".format(stream_properties)) + + # Verify there is no duplicate metadata entries + self.assertEqual(len(actual_fields), len( + set(actual_fields)), msg="duplicates in the fields retrieved") + + # Verify replication key(s) match expectations + self.assertEqual(expected_replication_keys, actual_replication_keys, + msg="expected replication key {} but actual is {}".format( + expected_replication_keys, actual_replication_keys)) + + # Verify primary key(s) match expectations + self.assertSetEqual( + expected_primary_keys, actual_primary_keys, + ) + + # Verify the replication method matches our expectations + self.assertEqual(expected_replication_method, actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, expected_replication_method)) + + # Verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + if expected_replication_keys: + self.assertEqual(self.INCREMENTAL,actual_replication_method) + else: + self.assertEqual(self.FULL_TABLE, actual_replication_method) + + # Verify that primary keys are given the inclusion of automatic in 
metadata. + self.assertSetEqual(expected_automatic_fields,actual_automatic_fields) + + # Verify that all other fields have the inclusion available + # This assumes there are no unsupported fields for SaaS sources + self.assertTrue( + all({item.get("metadata").get("inclusion") == "available" + for item in metadata + if item.get("breadcrumb", []) != [] + and item.get("breadcrumb", ["properties", None])[1] + not in actual_automatic_fields}), + msg="Not all non-key properties are set to available in metadata") diff --git a/tests/test_freshdesk_pagination.py b/tests/test_freshdesk_pagination.py index b62c7d1..1c893a1 100644 --- a/tests/test_freshdesk_pagination.py +++ b/tests/test_freshdesk_pagination.py @@ -1,58 +1,93 @@ -from tap_tester import menagerie, connections, runner -import re +from math import ceil +from tap_tester import connections, runner, LOGGER from base import FreshdeskBaseTest + class PaginationTest(FreshdeskBaseTest): def name(self): - return "tap_freshdesk_pagination_test" + return "tap_tester_freshdesk_pagination_test" def test_name(self): - print("Pagination Test for tap-freshdesk") - + LOGGER.info("Pagination Test for tap-freshdesk") + + def get_properties(self, *args, **kwargs): + """Override properties by passing page_size param.""" + props = super().get_properties(*args, **kwargs) + props['page_size'] = self.PAGE_SIZE + return props + def test_run(self): + """ + • Verify that for each stream you can get multiple pages of data. + This requires we ensure more than 1 page of data exists at all times for any given stream. + • Verify by pks that the data replicated matches the data we expect. + """ - # instantiate connection - conn_id = connections.ensure_connection(self) + # For roles stream data present in test account is limited. So, adding configurable page_size "2" + streams_to_test_1 = {"roles"} + self.run_test(streams_to_test_1, 2) - # Add supported streams 1 by 1 - streams_to_test = {'agents', 'tickets'} + streams_to_test_2 = self.expected_streams(only_trial_account_streams=True) - streams_to_test_1 + self.run_test(streams_to_test_2, 100) - # Run check mode - # Check mode has no catalog discovery for freshdesk - check_job_name = self.run_and_verify_check_mode(conn_id) + def run_test(self, expected_streams, page_size): - # Run sync mode - sync_record_count = self.run_and_verify_sync(conn_id) - sync_records = runner.get_records_from_target_output() + # Page size for pagination supported streams + self.PAGE_SIZE = page_size - # Test by stream - for stream in streams_to_test: - with self.subTest(stream=stream): - - record_count = sync_record_count.get(stream, 0) + # Instantiate connection + conn_id = connections.ensure_connection(self) - sync_messages = sync_records.get(stream, {'messages': []}).get('messages') + found_catalogs = self.run_and_verify_check_mode(conn_id) - primary_keys = self.expected_primary_keys().get(stream) + # Table and field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in expected_streams] - # Verify the sync meets or exceeds the default record count - # for streams - conversations, time_entries, satisfaction_ratings, roles, groups, - # and companies creating test data is a challenge in freshdesk. 
These streams will - # be excluded from this assertion for now - # Spike created to address this issue : TDL - TODO + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs) - stream_page_size = self.expected_page_limits()[stream] - self.assertLess(stream_page_size, record_count) - print("stream_page_size: {} < record_count {} for stream: {}".format(stream_page_size, record_count, stream)) + sync_record_count = self.run_and_verify_sync(conn_id) + sync_records = runner.get_records_from_target_output() - # Verify there are no duplicates accross pages - records_pks_set = {tuple([message.get('data').get(primary_key) - for primary_key in primary_keys]) - for message in sync_messages} - records_pks_list = [tuple([message.get('data').get(primary_key) - for primary_key in primary_keys]) - for message in sync_messages] + # Verify no unexpected streams were replicated + synced_stream_names = set(sync_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) - self.assertCountEqual(records_pks_set, records_pks_list, msg=f"We have duplicate records for {stream}") + # Test by stream + for stream in expected_streams: + with self.subTest(stream=stream): + # Expected values + expected_primary_keys = self.expected_primary_keys()[stream] + + # Collect information for assertions from syncs 1 & 2 base on expected values + record_count_sync = sync_record_count.get(stream, 0) + primary_keys_list = [tuple(message.get('data').get(expected_pk) + for expected_pk in expected_primary_keys) + for message in sync_records.get(stream).get('messages') + if message.get('action') == 'upsert'] + + # Verify that for each stream you can get multiple pages of data + self.assertGreater(record_count_sync, page_size, + msg="The number of records is not over the stream max limit") + + # Chunk the replicated records (just primary keys) into expected pages + pages = [] + page_count = ceil(len(primary_keys_list) / page_size) + for page_index in range(page_count): + page_start = page_index * page_size + page_end = (page_index + 1) * page_size + pages.append(set(primary_keys_list[page_start:page_end])) + + # Verify by primary keys that data is unique for each page + for current_index, current_page in enumerate(pages): + with self.subTest(current_page_primary_keys=current_page): + + for other_index, other_page in enumerate(pages): + if current_index == other_index: + continue # don't compare the page to itself + + self.assertTrue( + current_page.isdisjoint(other_page), msg=f'other_page_primary_keys={other_page}' + ) diff --git a/tests/test_freshdesk_parent_child_independent.py b/tests/test_freshdesk_parent_child_independent.py new file mode 100644 index 0000000..35b82c5 --- /dev/null +++ b/tests/test_freshdesk_parent_child_independent.py @@ -0,0 +1,42 @@ +from tap_tester import connections, runner + +from base import FreshdeskBaseTest + + +class ParentChildIndependentTest(FreshdeskBaseTest): + """ + Test case to verify that tap is working fine if only first-level child streams are selected + """ + + def name(self): + return "tap_tester_freshdesk_parent_child_test" + + def test_run(self): + """ + Testing that tap is working fine if only child streams are selected + • Verify that if only child streams are selected then only child streams are replicated. + """ + + # To collect "time_entries", "satisfaction_ratings"(child streams of "tickets") pro account is needed. + # Skipping them for now. 
+ child_streams = {'conversations'} + + # Instantiate connection + conn_id = connections.ensure_connection(self) + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in child_streams] + # Table and field selection + self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries) + + # Run initial sync + self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(child_streams, synced_stream_names) diff --git a/tests/test_freshdesk_parent_child_sync.py b/tests/test_freshdesk_parent_child_sync.py new file mode 100644 index 0000000..7bd9fa5 --- /dev/null +++ b/tests/test_freshdesk_parent_child_sync.py @@ -0,0 +1,130 @@ +from sre_parse import State +from tap_tester import menagerie, connections, runner + +from base import FreshdeskBaseTest + + +class FreshdeskBookmarks(FreshdeskBaseTest): + """Test to verify bookmark logic for parent-child sync.""" + + @staticmethod + def name(): + return "tap_tester_freshdesk_parent_child_sync" + + def test_run(self): + minimum_bookmark = "2022-08-01T10:07:03.000000Z" + maximum_bookmark = "2022-08-17T10:07:03.000000Z" + new_state = { + "bookmarks": { + "tickets": { + "updated_at": maximum_bookmark + }, + "tickets_deleted": { + "updated_at": maximum_bookmark + }, + "tickets_spam": { + "updated_at": maximum_bookmark + }, + "conversations": { + "updated_at": minimum_bookmark + } + } + } + stream_to_test = {'conversations'} + self.run_test(new_state, stream_to_test, + minimum_bookmark, maximum_bookmark) + + new_state = { + "bookmarks": { + "tickets": { + "updated_at": minimum_bookmark + }, + "tickets_deleted": { + "updated_at": minimum_bookmark + }, + "tickets_spam": { + "updated_at": minimum_bookmark + }, + "conversations": { + "updated_at": maximum_bookmark + } + } + } + stream_to_test = {'tickets'} + self.run_test(new_state, stream_to_test, + minimum_bookmark, maximum_bookmark) + + def run_test(self, new_state, stream_to_test, minimum_bookmark, maximum_bookmark): + """ + Test case to verify the working of parent-child streams + Prerequisite: + - Set child bookmark is earlier than parent bookmark + - Set Parent bookmark is earlier than child bookmark + + • Verify that minimum bookmark is used for selected parent-child stream. + • Verify that records between the bookmark values are replicated. + """ + + # To collect "time_entries", "satisfaction_ratings"(child streams of "tickets") pro account is needed. + # Skipping them for now. 
+ expected_streams = {'tickets', 'conversations'} + + expected_replication_keys = self.expected_replication_keys() + + conn_id = connections.ensure_connection(self) + + menagerie.set_state(conn_id, new_state) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries) + + # Run a sync job using orchestrator + sync_record_count = self.run_and_verify_sync(conn_id) + sync_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # Collect information for assertions + sync_messages = [record.get('data') for record in + sync_records.get(stream, {'messages': []}).get('messages') + if record.get('action') == 'upsert'] + + replication_key = list(expected_replication_keys[stream])[0] + sync_start_date_ts = self.dt_to_ts(new_state.get("bookmarks", {stream: None}).get( + stream).get(replication_key), self.BOOKMARK_FORMAT) + + for record in sync_messages: + # Verify that the minimum bookmark is used for selected parent-child stream. + replication_key_value = self.dt_to_ts( + record.get(replication_key), self.BOOKMARK_FORMAT) + + # Verify that the records replicated for the selected streams are greater than or + # equal to given bookmark. + self.assertGreaterEqual( + replication_key_value, sync_start_date_ts, + msg="Sync records do not respect the provided bookmark." + ) + + # Verify that atleast 1 record is getting replicated for streams to test. + self.assertGreater(sync_record_count.get(stream, 0), 0) + + if stream in stream_to_test: + minimum_bookmark_value_ts = self.dt_to_ts(minimum_bookmark, self.BOOKMARK_FORMAT) + maximum_bookmark_value_ts = self.dt_to_ts(maximum_bookmark, self.BOOKMARK_FORMAT) + records_between_dates = [] + for record in sync_messages: + replication_key_value = self.dt_to_ts(record.get(replication_key), self.BOOKMARK_FORMAT) + + if minimum_bookmark_value_ts <= replication_key_value <= maximum_bookmark_value_ts: + records_between_dates.append(record) + + # Verify that records between the bookmark values are replicated for streams in streams to test. 
+                    self.assertGreater(len(records_between_dates), 0)
diff --git a/tests/test_freshdesk_start_date.py b/tests/test_freshdesk_start_date.py
index 4f23ea0..5673f48 100644
--- a/tests/test_freshdesk_start_date.py
+++ b/tests/test_freshdesk_start_date.py
@@ -1,170 +1,144 @@
-import os
-
-from tap_tester import connections, runner
+from tap_tester import connections, runner, LOGGER

 from base import FreshdeskBaseTest


 class FreshdeskStartDateTest(FreshdeskBaseTest):
-
-    start_date_1 = ""
-    start_date_2 = ""
-    test_streams = {}
+    """Test that the start_date configuration is respected"""

     @staticmethod
     def name():
         return "tap_tester_freshdesk_start_date_test"

-    def get_properties(self, original: bool = True):
-        """Configuration properties required for the tap."""
-        return_value = {
-            'start_date' : '2019-01-06T00:00:00Z',
-        }
-        if original:
-            return return_value
-
-        return_value["start_date"] = self.start_date
-        return return_value
-
     def test_run(self):
-        """Instantiate start date according to the desired data set and run the test"""
+        """
+        • Verify that a sync with a later start date has at least one record synced
+        and fewer records than the 1st sync with an earlier start date
+        • Verify that each stream has fewer records than in the earlier start date sync
+        • Verify all data from the later start date sync has bookmark values >= start_date
+        • Verify that the minimum bookmark sent to the target for the later start_date sync
+        is greater than or equal to the start date
+        • Verify, by primary key values, that all records in the 2nd sync are included in the 1st sync.
+        """
+
+        start_date_1 = self.get_properties().get('start_date')
+        start_date_2 = "2022-07-19T00:00:00Z"

-        self.start_date_1 = self.get_properties().get('start_date')
-        self.start_date_2 = self.timedelta_formatted(self.start_date_1, days=3*365+34)

-        self.start_date = self.start_date_1
+        self.start_date = start_date_1

+        start_date_1_epoch = self.dt_to_ts(start_date_1, self.START_DATE_FORMAT)
+        start_date_2_epoch = self.dt_to_ts(start_date_2, self.START_DATE_FORMAT)

-        # Excluding broken streams: time_settings, satisfaction_ratings
-        # TODO spike on the two 403 streams above
-        test_streams = {'agents', 'companies', 'groups', 'tickets', 'conversations', 'roles'}
-        self.test_streams = test_streams
-        obey_start_date_streams = {'agents', 'companies', 'groups', 'roles', 'tickets', 'conversations'}
+        expected_streams = self.expected_streams(only_trial_account_streams=True)

         ##########################################################################
-        ### First Sync
+        # First Sync
         ##########################################################################

-        # instantiate connection
+        # Instantiate connection
         conn_id_1 = connections.ensure_connection(self)

-        # run check mode
-        check_job_name_1 = self.run_and_verify_check_mode(conn_id_1)
+        # Run check mode
+        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)
+
+        # Table and field selection
+        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
+                                      if catalog.get('stream_name') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(
+            conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)

-        # run initial sync
+        # Run initial sync
         record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
         synced_records_1 = runner.get_records_from_target_output()

-        # Update based on sync data
-        first_sync_empty = self.test_streams - synced_records_1.keys()
-        if len(first_sync_empty) > 0:
-            print("Missing stream: {} in sync 1. Failing test for stream(s). Add test data?".format(first_sync_empty))
-            self.first_sync_empty = first_sync_empty
-        first_sync_bonus = synced_records_1.keys() - self.test_streams
-        if len(first_sync_bonus) > 0:
-            print("Found stream: {} in first sync. Add to test_streams?".format(first_sync_bonus))

         ##########################################################################
-        ### Update START DATE Between Syncs
+        # Update START DATE Between Syncs
         ##########################################################################

-        print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(self.start_date, self.start_date_2))
-        self.start_date = self.start_date_2
+        LOGGER.info("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(
+            self.start_date, start_date_2))
+        self.start_date = start_date_2

         ##########################################################################
-        ### Second Sync
+        # Second Sync
         ##########################################################################

-        # create a new connection with the new start_date
+        # Create a new connection with the new start_date
         conn_id_2 = connections.ensure_connection(self, original_properties=False)

-        # run check mode
-        check_job_name_2 = self.run_and_verify_check_mode(conn_id_2)
+        # Run check mode
+        found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2)
+
+        # Table and field selection
+        test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2
+                                      if catalog.get('stream_name') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(
+            conn_id_2, test_catalogs_2_all_fields, select_all_fields=True)

-        # run sync
+        # Run sync
         record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2)
         synced_records_2 = runner.get_records_from_target_output()

-        # Update based on sync data
-        second_sync_empty = self.test_streams - synced_records_2.keys()
-        if len(second_sync_empty) > 0:
-            print("Missing stream(s): {} in sync 2. Updating expectations"\
-                .format(second_sync_empty))
-            self.second_sync_empty = second_sync_empty
-        second_sync_bonus = synced_records_2.keys() - self.test_streams
-        if len(second_sync_bonus) > 0:
-            print("Found stream(s): {} in second sync. Add to test_streams?".format(second_sync_bonus))
-
-        for stream in test_streams:
-            with self.subTest(stream=stream):
-
-                if stream in self.first_sync_empty:
-                    self.assertTrue(False, msg="Stream: {} missing from sync 1".format(stream))
-
-                    continue
+        # Verify that sync 2 has at least one record synced and fewer records than sync 1
+        self.assertGreater(sum(record_count_by_stream_2.values()), 0)
+        self.assertGreater(sum(record_count_by_stream_1.values()), sum(record_count_by_stream_2.values()))

-                if stream in self.second_sync_empty:
-                    if stream == 'roles':
-                        self.assertTrue(True, msg="Expected 0 records for stream {}".format(stream))
-                        print("No sync 2 data to compare for stream: {}, start_date obeyed".format(stream))
-
-                        continue
-
-                    else:
-                        self.assertTrue(False, msg="Sync 2 empty for stream: {}".format(stream))
-
-                        continue
+        for stream in expected_streams:
+            with self.subTest(stream=stream):

-                # expected values
+                # Expected values
                 expected_primary_keys = self.expected_primary_keys()[stream]
-                expected_start_date_1 = self.timedelta_formatted(self.start_date_1, days=0) # Insight buffer format saved
-                expected_start_date_2 = self.timedelta_formatted(self.start_date_2, days=0)
+                expected_replication_keys = self.expected_replication_keys()[stream]
+                expected_metadata = self.expected_metadata()[stream]

-                # collect information for assertions from syncs 1 & 2 base on expected values
+                # Collect information for assertions from syncs 1 & 2 based on expected values
                 record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
                 record_count_sync_2 = record_count_by_stream_2.get(stream, 0)
-                primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
-                                       for message in synced_records_1.get(stream).get('messages')
+                primary_keys_list_1 = [tuple(message.get('data', {}).get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_1.get(stream, {'messages': []}).get('messages')
                                        if message.get('action') == 'upsert']
-                primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
-                                       for message in synced_records_2.get(stream).get('messages')
+                primary_keys_list_2 = [tuple(message.get('data', {}).get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_2.get(stream, {'messages': []}).get('messages')
                                        if message.get('action') == 'upsert']
+
                 primary_keys_sync_1 = set(primary_keys_list_1)
                 primary_keys_sync_2 = set(primary_keys_list_2)

-                if stream in obey_start_date_streams:
-                    print("Stream {} obeys start_date".format(stream))
-                    # collect information specific to incremental streams from syncs 1 & 2
-                    expected_replication_key = next(iter(self.expected_replication_keys().get(stream)))
-                    replication_dates_1 =[row.get('data').get(expected_replication_key) for row in
-                                          synced_records_1.get(stream, {'messages': []}).get('messages', [])
-                                          if row.get('data')]
-                    replication_dates_2 =[row.get('data').get(expected_replication_key) for row in
-                                          synced_records_2.get(stream, {'messages': []}).get('messages', [])
-                                          if row.get('data')]
-
-                    # Verify replication key is greater or equal to start_date for sync 1
-                    for replication_date in replication_dates_1:
-                        self.assertGreaterEqual(replication_date, expected_start_date_1,
-                                                msg="Report pertains to a date prior to our start date.\n" +
-                                                "Sync start_date: {}\n".format(expected_start_date_1) +
-                                                "Record date: {} ".format(replication_date)
+                # Verify that sync 2 has at least one record synced
+                self.assertGreater(record_count_sync_2, 0)
+
+                if expected_metadata.get(self.OBEYS_START_DATE):
+
+                    # The expected bookmark key is a single-element set, so access it directly
+                    bookmark_keys_list_1 = [message.get('data').get(list(expected_replication_keys)[0]) for message in synced_records_1.get(stream).get('messages')
+                                            if message.get('action') == 'upsert']
+                    bookmark_keys_list_2 = [message.get('data').get(list(expected_replication_keys)[0]) for message in synced_records_2.get(stream).get('messages')
+                                            if message.get('action') == 'upsert']
+
+                    bookmark_key_sync_1 = set(bookmark_keys_list_1)
+                    bookmark_key_sync_2 = set(bookmark_keys_list_2)
+
+                    # Verify bookmark key values are greater than or equal to the start date of sync 1
+                    for bookmark_key_value in bookmark_key_sync_1:
+                        self.assertGreaterEqual(
+                            self.dt_to_ts(bookmark_key_value, self.BOOKMARK_FORMAT), start_date_1_epoch,
+                            msg="Report pertains to a date prior to our start date.\n" +
+                                "Sync start_date: {}\n".format(start_date_1) +
+                                "Record date: {} ".format(bookmark_key_value)
                         )

-                    # Verify replication key is greater or equal to start_date for sync 2
-                    for replication_date in replication_dates_2:
-                        self.assertGreaterEqual(replication_date, expected_start_date_2,
-                                                msg="Report pertains to a date prior to our start date.\n" +
-                                                "Sync start_date: {}\n".format(expected_start_date_2) +
-                                                "Record date: {} ".format(replication_date)
+                    # Verify bookmark key values are greater than or equal to the start date of sync 2
+                    for bookmark_key_value in bookmark_key_sync_2:
+                        self.assertGreaterEqual(
+                            self.dt_to_ts(bookmark_key_value, self.BOOKMARK_FORMAT), start_date_2_epoch,
+                            msg="Report pertains to a date prior to our start date.\n" +
+                                "Sync start_date: {}\n".format(start_date_2) +
+                                "Record date: {} ".format(bookmark_key_value)
                         )

                 # Verify the number of records replicated in sync 1 is greater than the number
-                # of records replicated in sync 2
-
-                if stream == 'roles':
-                    self.assertEqual(record_count_sync_1, record_count_sync_2)
-                else:
-                    self.assertGreater(record_count_sync_1, record_count_sync_2)
+                # of records replicated in sync 2 for the stream
+                self.assertGreater(record_count_sync_1, record_count_sync_2)

                 # Verify the records replicated in sync 2 were also replicated in sync 1
                 self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))

@@ -172,7 +146,7 @@ def test_run(self):
                 # Currently all streams obey start date. Leaving this in incase one of the two remaining
                 # streams are implemented in the future and do not obey start date
                 # else:
-                #     print("Stream {} does NOT obey start_date".format(stream))
+                #     LOGGER.info("Stream {} does NOT obey start_date".format(stream))
                 #     # Verify that the 2nd sync with a later start date replicates the same number of
                 #     # records as the 1st sync.
                 #     self.assertEqual(record_count_sync_2, record_count_sync_1)