diff --git a/tests/base_hubspot.py b/tests/base_hubspot.py new file mode 100644 index 00000000..6212e7b6 --- /dev/null +++ b/tests/base_hubspot.py @@ -0,0 +1,383 @@ +import os +import unittest +from datetime import datetime as dt +from datetime import timedelta + +import tap_tester.menagerie as menagerie +import tap_tester.connections as connections +import tap_tester.runner as runner +from tap_tester.base_suite_tests.base_case import BaseCase +from tap_tester import LOGGER + + +class HubspotBaseCase(BaseCase): + + BASIC_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" + + EXPECTED_PAGE_SIZE = "expected-page-size" + OBEYS_START_DATE = "obey-start-date" + PARENT_STREAM = "parent-stream" + + ####################################### + # Tap Configurable Metadata Methods # + ####################################### + + def setUp(self): + missing_envs = [x for x in [ + 'TAP_HUBSPOT_REDIRECT_URI', + 'TAP_HUBSPOT_CLIENT_ID', + 'TAP_HUBSPOT_CLIENT_SECRET', + 'TAP_HUBSPOT_REFRESH_TOKEN' + ] if os.getenv(x) is None] + if missing_envs: + raise Exception("Missing environment variables: {}".format(missing_envs)) + + @staticmethod + def get_type(): + return "platform.hubspot" + + @staticmethod + def tap_name(): + return "tap-hubspot" + + def get_properties(self): + start_date = dt.today() - timedelta(days=1) + start_date_with_fmt = dt.strftime(start_date, self.START_DATE_FORMAT) + + return {'start_date' : start_date_with_fmt} + + def get_credentials(self): + return {'refresh_token': os.getenv('TAP_HUBSPOT_REFRESH_TOKEN'), + 'client_secret': os.getenv('TAP_HUBSPOT_CLIENT_SECRET'), + 'redirect_uri': os.getenv('TAP_HUBSPOT_REDIRECT_URI'), + 'client_id': os.getenv('TAP_HUBSPOT_CLIENT_ID')} + + def expected_check_streams(self): + return set(self.expected_metadata().keys()) + + @classmethod + def expected_metadata(cls): # DOCS_BUG https://stitchdata.atlassian.net/browse/DOC-1523) + """The expected streams and metadata about the streams""" + return { + "campaigns": { + BaseCase.PRIMARY_KEYS: {"id"}, + BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE, + HubspotBaseCase.OBEYS_START_DATE: False + }, + "companies": { + BaseCase.PRIMARY_KEYS: {"companyId"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"property_hs_lastmodifieddate"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 250, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "contact_lists": { + BaseCase.PRIMARY_KEYS: {"listId"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 250, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "contacts": { + BaseCase.PRIMARY_KEYS: {"vid"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"versionTimestamp"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 100, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "contacts_by_company": { + BaseCase.PRIMARY_KEYS: {"company-id", "contact-id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 100, + HubspotBaseCase.OBEYS_START_DATE: True, + HubspotBaseCase.PARENT_STREAM: 'companies' + }, + "deal_pipelines": { + BaseCase.PRIMARY_KEYS: {"pipelineId"}, + BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE, + HubspotBaseCase.OBEYS_START_DATE: False, + }, + "deals": { + BaseCase.PRIMARY_KEYS: {"dealId"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"property_hs_lastmodifieddate"}, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "email_events": { + BaseCase.PRIMARY_KEYS: {"id"}, + 
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"startTimestamp"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 1000, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "engagements": { + BaseCase.PRIMARY_KEYS: {"engagement_id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"lastUpdated"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 250, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "forms": { + BaseCase.PRIMARY_KEYS: {"guid"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "owners": { + BaseCase.PRIMARY_KEYS: {"ownerId"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + HubspotBaseCase.OBEYS_START_DATE: True # TODO is this a BUG? + }, + "subscription_changes": { + BaseCase.PRIMARY_KEYS: {"timestamp", "portalId", "recipient"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"startTimestamp"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 1000, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "workflows": { + BaseCase.PRIMARY_KEYS: {"id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + HubspotBaseCase.OBEYS_START_DATE: True + }, + "tickets": { + BaseCase.PRIMARY_KEYS: {"id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + HubspotBaseCase.EXPECTED_PAGE_SIZE: 100, + HubspotBaseCase.OBEYS_START_DATE: True + } + } + + ############################# + # Common Metadata Methods # + ############################# + + def expected_primary_keys(self): + """ + return a dictionary with key of table name + and value as a set of primary key fields + """ + return {table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + + def expected_automatic_fields(self): + """ + return a dictionary with key of table name and value as the primary keys and replication keys + """ + pks = self.expected_primary_keys() + rks = self.expected_replication_keys() + + return {stream: rks.get(stream, set()) | pks.get(stream, set()) + for stream in self.expected_streams()} + + + def expected_replication_method(self): + """return a dictionary with key of table name and value of replication method""" + return {table: properties.get(self.REPLICATION_METHOD, None) + for table, properties + in self.expected_metadata().items()} + + def expected_streams(self): + """A set of expected stream names""" + return set(self.expected_metadata().keys()) + + def expected_replication_keys(self): + """ + return a dictionary with key of table name + and value as a set of replication key fields + """ + return {table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_page_limits(self): + return {table: properties.get(self.EXPECTED_PAGE_SIZE, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_primary_keys(self): + + """ + return a dictionary with key of table name + and value as a set of primary key fields + """ + return {table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_automatic_fields(self): + auto_fields = {} + for k, v in self.expected_metadata().items(): + auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) + return auto_fields + + 
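For orientation, here is a minimal sketch (not part of the diff) of what the expected_* helpers above derive from the expected_metadata() table, using the 'companies' entry as the worked example; it assumes the BaseCase key constants resolve exactly as they are used in the table:

    # Illustrative only: automatic fields are the union of primary keys and replication keys.
    companies = HubspotBaseCase.expected_metadata()['companies']
    automatic = companies[HubspotBaseCase.PRIMARY_KEYS] | companies[HubspotBaseCase.REPLICATION_KEYS]
    assert automatic == {'companyId', 'property_hs_lastmodifieddate'}
    assert companies[HubspotBaseCase.EXPECTED_PAGE_SIZE] == 250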
########################## + # Common Test Actions # + ########################## + + def create_connection_and_run_check(self, original_properties: bool = True): + """Create a new connection with the test name""" + # Create the connection + conn_id = connections.ensure_connection(self, original_properties) + + # Run a check job using orchestrator (discovery) + check_job_name = runner.run_check_mode(self, conn_id) + + # Assert that the check job succeeded + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + return conn_id + + def run_and_verify_check_mode(self, conn_id): + """ + Run the tap in check mode and verify it succeeds. + This should be ran prior to field selection and initial sync. + + Return the connection id and found catalogs from menagerie. + """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) + + found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) + self.assertSetEqual(self.expected_check_streams(), found_catalog_names, + msg="discovered schemas do not match") + LOGGER.info("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + """ + Run a sync job and make sure it exited properly. + Return a dictionary with keys of streams synced + and values of records synced for each stream + """ + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file(self, + conn_id, + self.expected_streams(), + self.expected_primary_keys()) + total_row_count = sum(sync_record_count.values()) + self.assertGreater(total_row_count, 0, + msg="failed to replicate any data: {}".format(sync_record_count)) + LOGGER.info("total replicated row count: %s", total_row_count) + + return sync_record_count + + def perform_and_verify_table_and_field_selection(self, + conn_id, + test_catalogs, + select_all_fields=True): + """ + Perform table and field selection based off of the streams to select + set and field selection parameters. + + Verify this results in the expected streams selected and all or no + fields selected for those streams. 
+ """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields + ) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get('tap_stream_id') for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id']) + + # Verify all testable streams are selected + selected = catalog_entry.get('annotated-schema').get('selected') + LOGGER.info("Validating selection on %s: %s", cat['stream_name'], selected) + if cat['stream_name'] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): + field_selected = field_props.get('selected') + LOGGER.info("\tValidating selection on %s.%s: %s", + cat['stream_name'], field, field_selected) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get(cat['tap_stream_id']) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata']) + self.assertEqual(expected_automatic_fields, selected_fields) + + @staticmethod + def get_selected_fields_from_metadata(metadata): + selected_fields = set() + for field in metadata: + is_field_metadata = len(field['breadcrumb']) > 1 + inclusion_automatic_or_selected = (field['metadata'].get('inclusion') == 'automatic' + or field['metadata'].get('selected') is True) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field['breadcrumb'][1]) + return selected_fields + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) + + def timedelta_formatted(self, dtime, days=0, str_format="%Y-%m-%dT00:00:00Z"): + date_stripped = dt.strptime(dtime, str_format) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, str_format) + + ################################ + # Tap Specific Test Actions # + ################################ + + def datetime_from_timestamp(self, value, str_format="%Y-%m-%dT00:00:00Z"): + """ + Takes in a unix timestamp in milliseconds. + Returns a string formatted python datetime + """ + try: + datetime_value = dt.fromtimestamp(value) + datetime_str = dt.strftime(datetime_value, str_format) + except ValueError as err: + raise NotImplementedError( + f"Invalid argument 'value': {value} " + "This method was designed to accept unix timestamps in milliseconds." 
+ ) + return datetime_str + + def is_child(self, stream): + """return true if this stream is a child stream""" + return self.expected_metadata()[stream].get(self.PARENT_STREAM) is not None diff --git a/tests/client.py b/tests/client.py index 94c27a94..0e9fa25e 100644 --- a/tests/client.py +++ b/tests/client.py @@ -35,6 +35,14 @@ def giveup(exc): jitter=None, giveup=giveup, interval=10) + + def get_custom_property(self, url, params=dict()): + """Perform a GET using the standard requests method and logs the action""" + """Do not handle exception for get failure in custom contact properties""" + response = requests.get(url, params=params, headers=self.HEADERS) + LOGGER.info(f"TEST CLIENT | GET {url} params={params} STATUS: {response.status_code}") + return response + def get(self, url, params=dict()): """Perform a GET using the standard requests method and logs the action""" response = requests.get(url, params=params, headers=self.HEADERS) @@ -773,6 +781,111 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1): else: raise NotImplementedError(f"There is no create_{stream} method in this dipatch!") + def delete_custom_contacts_property(self): + """ Clearup the custom contact properties before CRUD test """ + custom_properties = ['custom_string', 'custom_number', 'custom_date', 'custom_datetime', 'multi_pick'] + for property in custom_properties: + get_url = f"{BASE_URL}/properties/v1/contacts/properties/named/" + property + response = self.get_custom_property(get_url) + if ( response.status_code == 404 ): + continue + url = f"{BASE_URL}/properties/v1/contacts/properties/named/" + property + response = self.delete(url) + LOGGER.info("response is %s", response) + + def create_custom_contact_properties(self): + """Create custom contact properties of all the types""" + self.delete_custom_contacts_property() + + url = f"{BASE_URL}/properties/v1/contacts/properties" + data1 = { + "name": "custom_string", + "label": "A New String Custom Property", + "description": "A new string property for you", + "groupName": "contactinformation", + "type": "string", + "fieldType": "text", + "formField": True, + "displayOrder": 6, + "options": [ + ] + } + + data2 = { + "name": "custom_number", + "label": "A New Number Custom Property", + "description": "A new number property for you", + "groupName": "contactinformation", + "type": "number", + "fieldType": "text", + "formField": True, + "displayOrder": 7, + "options": [ + ] + } + + data3 = { + "name": "custom_date", + "label": "A New Date Custom Property", + "description": "A new date property for you", + "groupName": "contactinformation", + "type": "date", + "fieldType": "text", + "formField": True, + "displayOrder": 9, + "options": [ + ] + } + + data4 = { + "name": "custom_datetime", + "label": "A New Datetime Custom Property", + "description": "A new datetime property for you", + "groupName": "contactinformation", + "type": "datetime", + "fieldType": "text", + "formField": True, + "displayOrder": 10, + "options": [ + ] + } + data5 = { + "name": "multi_pick", + "label": "multi pick", + "description": "multi select picklist test", + "groupName": "contactinformation", + "type": "enumeration", + "fieldType": "checkbox", + "hidden": False, + "options": [ + { + "label": "Option A", + "value": "option_a" + }, + { + "label": "Option B", + "value": "option_b" + }, + { + "label": "Option C", + "value": "option_c" + } + ], + "formField": True + } + + # generate a contacts record + response = self.post(url, data1) + LOGGER.info("response is %s", response) + 
response = self.post(url, data2) + LOGGER.info("response is %s", response) + response = self.post(url, data3) + LOGGER.info("response is %s", response) + response = self.post(url, data4) + LOGGER.info("response is %s", response) + response = self.post(url, data5) + LOGGER.info("response is %s", response) + def create_contacts(self): """ Generate a single contacts record. @@ -783,6 +896,14 @@ def create_contacts(self): url = f"{BASE_URL}/contacts/v1/contact" data = { "properties": [ + { + "property": "custom_string", + "value": "custom_string_value" + }, + { + "property": "custom_number", + "value": 1567 + }, { "property": "email", "value": f"{record_uuid}@stitchdata.com" diff --git a/tests/test_hubspot_bookmarks.py b/tests/test_hubspot_bookmarks.py index fa8a11fb..72e42497 100644 --- a/tests/test_hubspot_bookmarks.py +++ b/tests/test_hubspot_bookmarks.py @@ -53,6 +53,9 @@ def create_test_data(self, expected_streams): for stream in expected_streams} for stream in expected_streams - {'contacts_by_company'}: + # Create custom properties for contacts + if stream == 'contacts': + self.test_client.create_custom_contact_properties() if stream == 'email_events': email_records = self.test_client.create(stream, times=3) self.expected_records['email_events'] += email_records @@ -61,7 +64,6 @@ def create_test_data(self, expected_streams): for _ in range(3): record = self.test_client.create(stream) self.expected_records[stream] += record - if 'contacts_by_company' in expected_streams: # do last company_ids = [record['companyId'] for record in self.expected_records['companies']] contact_records = self.expected_records['contacts'] diff --git a/tests/test_hubspot_newfw_all_fields.py b/tests/test_hubspot_newfw_all_fields.py new file mode 100644 index 00000000..1e6345d8 --- /dev/null +++ b/tests/test_hubspot_newfw_all_fields.py @@ -0,0 +1,312 @@ +import unittest +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest +from tap_tester.logger import LOGGER +from base_hubspot import HubspotBaseCase +from client import TestClient + +def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records): + ret_records = [] + can_save = True + for record in actual_records: + for key, value in expected_primary_key_dict.items(): + actual_value = record[key] + if actual_value != value: + can_save = False + break + if can_save: + ret_records.append(record) + can_save = True + return ret_records + +FIELDS_ADDED_BY_TAP = { + # In 'contacts' streams 'versionTimeStamp' is not available in response of the second call. + # In the 1st call, Tap retrieves records of all contacts and from those records, it collects vids(id of contact). + # These records contain the versionTimestamp field. + # In the 2nd call, vids collected from the 1st call will be used to retrieve the whole contact record. + # Here, the records collected for detailed contact information do not contain the versionTimestamp field. + # So, we add the versionTimestamp field(fetched from 1st call records) explicitly in the record of 2nd call. 
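    # (Illustrative assumption, not verified against the tap source: the two contact calls are roughly
    #   1) GET /contacts/v1/lists/all/contacts/all        -> records include 'versionTimestamp'; vids collected here
    #   2) GET /contacts/v1/contact/vids/batch/?vid=...   -> detailed records, 'versionTimestamp' absent
    #  so the tap carries 'versionTimestamp' from the call-1 records onto the call-2 records.)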
+ "contacts": { "versionTimestamp" } +} + +KNOWN_EXTRA_FIELDS = { + 'deals': { + # BUG_TDL-14993 | https://jira.talendforge.org/browse/TDL-14993 + # Has an value of object with key 'value' and value 'Null' + 'property_hs_date_entered_1258834', + 'property_hs_time_in_example_stage1660743867503491_315775040' + }, +} + +KNOWN_MISSING_FIELDS = { + 'contacts':{ # BUG https://jira.talendforge.org/browse/TDL-16016 + 'property_hs_latest_source', + 'property_hs_latest_source_data_1', + 'property_hs_latest_source_data_2', + 'property_hs_latest_source_timestamp', + 'property_hs_timezone', + 'property_hs_v2_cumulative_time_in_lead', + 'property_hs_v2_cumulative_time_in_opportunity', + 'property_hs_v2_cumulative_time_in_subscriber', + 'property_hs_v2_date_entered_customer', + 'property_hs_v2_date_entered_lead', + 'property_hs_v2_date_entered_opportunity', + 'property_hs_v2_date_entered_subscriber', + 'property_hs_v2_date_exited_lead', + 'property_hs_v2_date_exited_opportunity', + 'property_hs_v2_date_exited_subscriber', + 'property_hs_v2_latest_time_in_lead', + 'property_hs_v2_latest_time_in_opportunity', + 'property_hs_v2_latest_time_in_subscriber', + }, + 'contact_lists': { # BUG https://jira.talendforge.org/browse/TDL-14996 + 'authorId', + 'teamIds', + 'internal', + 'ilsFilterBranch', + 'limitExempt', + }, + 'email_events': { # BUG https://jira.talendforge.org/browse/TDL-14997 + 'portalSubscriptionStatus', + 'attempt', + 'source', + 'subscriptions', + 'sourceId', + 'replyTo', + 'suppressedMessage', + 'bcc', + 'suppressedReason', + 'cc', + }, + 'engagements': { # BUG https://jira.talendforge.org/browse/TDL-14997 + 'scheduledTasks', + }, + 'workflows': { # BUG https://jira.talendforge.org/browse/TDL-14998 + 'migrationStatus', + 'updateSource', + 'description', + 'originalAuthorUserId', + 'lastUpdatedByUserId', + 'creationSource', + 'portalId', + 'contactCounts', + }, + 'owners': { # BUG https://jira.talendforge.org/browse/TDL-15000 + 'activeSalesforceId' + }, + 'forms': { # BUG https://jira.talendforge.org/browse/TDL-15001 + 'alwaysCreateNewCompany', + 'themeColor', + 'publishAt', + 'editVersion', + 'embedVersion', + 'themeName', + 'style', + 'thankYouMessageJson', + 'createMarketableContact', + 'kickbackEmailWorkflowId', + 'businessUnitId', + 'portableKey', + 'parentId', + 'kickbackEmailsJson', + 'unpublishAt', + 'internalUpdatedAt', + 'multivariateTest', + 'publishedAt', + 'customUid', + 'isPublished', + 'paymentSessionTemplateIds', + 'selectedExternalOptions', + }, + 'companies': { # BUG https://jira.talendforge.org/browse/TDL-15003 + 'mergeAudits', + 'stateChanges', + 'isDeleted', + 'additionalDomains', + 'property_hs_analytics_latest_source', + 'property_hs_analytics_latest_source_data_2', + 'property_hs_analytics_latest_source_data_1', + 'property_hs_analytics_latest_source_timestamp', + }, + 'campaigns': { # BUG https://jira.talendforge.org/browse/TDL-15003 + 'lastProcessingStateChangeAt', + 'lastProcessingFinishedAt', + 'processingState', + 'lastProcessingStartedAt', + }, + 'deals': { # BUG https://jira.talendforge.org/browse/TDL-14999 + 'imports', + 'property_hs_num_associated_deal_splits', + 'property_hs_is_deal_split', + 'stateChanges', + 'property_hs_num_associated_active_deal_registrations', + 'property_hs_num_associated_deal_registrations', + 'property_hs_analytics_latest_source', + 'property_hs_analytics_latest_source_timestamp_contact', + 'property_hs_analytics_latest_source_data_1_contact', + 'property_hs_analytics_latest_source_timestamp', + 
'property_hs_analytics_latest_source_data_1', + 'property_hs_analytics_latest_source_contact', + 'property_hs_analytics_latest_source_company', + 'property_hs_analytics_latest_source_data_1_company', + 'property_hs_analytics_latest_source_data_2_company', + 'property_hs_analytics_latest_source_data_2', + 'property_hs_analytics_latest_source_data_2_contact', + }, + 'subscription_changes':{ + 'normalizedEmailId' + } +} + +class HubspotAllFieldsTest(AllFieldsTest, HubspotBaseCase): + """Hubspot all fields test implementation """ + + @staticmethod + def name(): + return "tt_hubspot_all_fields" + + def streams_to_test(self): + """expected streams minus the streams not under test""" + return self.expected_streams().difference({ + 'owners', + 'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938 + }) + + def setUp(self): + self.maxDiff = None # see all output in failure + + test_client = TestClient(start_date=self.get_properties()['start_date']) + self.expected_records = dict() + streams = self.streams_to_test() + stream_to_run_last = 'contacts_by_company' + if stream_to_run_last in streams: + streams.remove(stream_to_run_last) + streams = list(streams) + streams.append(stream_to_run_last) + + for stream in streams: + # Get all records + if stream == 'contacts_by_company': + company_ids = [company['companyId'] for company in self.expected_records['companies']] + self.expected_records[stream] = test_client.read(stream, parent_ids=company_ids) + else: + self.expected_records[stream] = test_client.read(stream) + + for stream, records in self.expected_records.items(): + LOGGER.info("The test client found %s %s records.", len(records), stream) + + self.convert_datatype(self.expected_records) + super().setUp() + + def convert_datatype(self, expected_records): + for stream, records in expected_records.items(): + for record in records: + + # convert timestamps to string formatted datetime + timestamp_keys = {'timestamp'} + for key in timestamp_keys: + timestamp = record.get(key) + if timestamp: + unformatted = datetime.datetime.fromtimestamp(timestamp/1000) + formatted = datetime.datetime.strftime(unformatted, self.BASIC_DATE_FORMAT) + record[key] = formatted + + return expected_records + + def test_all_fields_for_streams_are_replicated(self): + for stream in self.test_streams: + with self.subTest(stream=stream): + + # gather expected values + #replication_method = self.expected_replication_method()[stream] + primary_keys = sorted(self.expected_primary_keys()[stream]) + + # gather replicated records + actual_records = [message['data'] + for message in AllFieldsTest.synced_records[stream]['messages'] + if message['action'] == 'upsert'] + + for expected_record in self.expected_records[stream]: + + primary_key_dict = {primary_key: expected_record[primary_key] for primary_key in primary_keys} + primary_key_values = list(primary_key_dict.values()) + + with self.subTest(expected_record=primary_key_dict): + # grab the replicated record that corresponds to expected_record by checking primary keys + matching_actual_records_by_pk = get_matching_actual_record_by_pk(primary_key_dict, actual_records) + if not matching_actual_records_by_pk: + LOGGER.warn("Expected %s record was not replicated: %s", + stream, primary_key_dict) + continue # skip this expected record if it isn't replicated + actual_record = matching_actual_records_by_pk[0] + + expected_keys = set(expected_record.keys()).union(FIELDS_ADDED_BY_TAP.get(stream, {})) + actual_keys = set(actual_record.keys()) + + # NB: 
KNOWN_MISSING_FIELDS is a dictionary of streams to aggregated missing fields. + # We will check each expected_record to see which of the known keys is present in expectations + # and then will add them to the known_missing_keys set. + known_missing_keys = set() + for missing_key in KNOWN_MISSING_FIELDS.get(stream, set()): + if missing_key in expected_record.keys(): + known_missing_keys.add(missing_key) + del expected_record[missing_key] + + # NB : KNOWN_EXTRA_FIELDS is a dictionary of streams to fields that should not + # be replicated but are. See the variable declaration at top of file for linked BUGs. + known_extra_keys = set() + for extra_key in KNOWN_EXTRA_FIELDS.get(stream, set()): + known_extra_keys.add(extra_key) + + # Verify the fields in our expected record match the fields in the corresponding replicated record + expected_keys_adjusted = expected_keys.union(known_extra_keys) + actual_keys_adjusted = actual_keys.union(known_missing_keys) + + # NB: The following woraround is for dynamic fields on the `deals` stream that we just can't track. + # At the time of implementation there is no customer feedback indicating that these dynamic fields + # would prove useful to an end user. The ones that we replicated with the test client are specific + # to our test data. We have determined that the filtering of these fields is an expected behavior. + + # deals workaround for 'property_hs_date_entered_' fields + bad_key_prefixes = {'property_hs_date_entered_', 'property_hs_date_exited_', + 'property_hs_time_in'} + bad_keys = set() + for key in expected_keys_adjusted: + for prefix in bad_key_prefixes: + if key.startswith(prefix) and key not in actual_keys_adjusted: + bad_keys.add(key) + for key in actual_keys_adjusted: + for prefix in bad_key_prefixes: + if key.startswith(prefix) and key not in expected_keys_adjusted: + bad_keys.add(key) + for key in bad_keys: + if key in expected_keys_adjusted: + expected_keys_adjusted.remove(key) + elif key in actual_keys_adjusted: + actual_keys_adjusted.remove(key) + + self.assertSetEqual(expected_keys_adjusted, actual_keys_adjusted) + + """ + Below code to be included in test_all_fields_for_streams_are_replicated once all cards of missing fields are fixed. TDL-16145 + for stream in self.test_streams: + with self.subTest(stream=stream): + + # gather expectations + expected_all_keys = self.selected_fields.get(stream, set()) - set(self.MISSING_FIELDS.get(stream, {})) + + # gather results + fields_replicated = self.actual_fields.get(stream, set()) + + # verify that all fields are sent to the target + # test the combination of all records + self.assertSetEqual(fields_replicated, expected_all_keys, + logging=f"verify all fields are replicated for stream {stream}") + """ + ########################################################################## + # Tests To Skip + ########################################################################## + + @unittest.skip("Random selection doesn't always sync records") + def test_all_streams_sync_records(self): + pass +
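To make the key-set bookkeeping in test_all_fields_for_streams_are_replicated easier to follow, here is a self-contained toy sketch of the same comparison (field names are partly invented for illustration; it mirrors, but is not part of, the test logic):

    # Known-missing fields are added to the actual side, known-extra fields to the expected side,
    # then dynamic deals keys present on only one side are dropped before comparing.
    expected_keys = {'dealId', 'dealname', 'property_hs_is_deal_split', 'property_hs_date_entered_123'}
    actual_keys   = {'dealId', 'dealname', 'property_hs_date_entered_1258834'}
    known_missing = {'property_hs_is_deal_split'}         # expected but never replicated (see KNOWN_MISSING_FIELDS)
    known_extra   = {'property_hs_date_entered_1258834'}  # replicated but not expected (see KNOWN_EXTRA_FIELDS)

    expected_adj = expected_keys | known_extra
    actual_adj   = actual_keys | known_missing

    prefixes = ('property_hs_date_entered_', 'property_hs_date_exited_', 'property_hs_time_in')
    bad_keys = {key for key in expected_adj | actual_adj
                if key.startswith(prefixes) and not (key in expected_adj and key in actual_adj)}
    assert (expected_adj - bad_keys) == (actual_adj - bad_keys)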