Skip to content

Commit

Permalink
[Part 1 ] : Hackday project to debug connections (airbytehq#33027)
Browse files Browse the repository at this point in the history
Co-authored-by: akashkulk <[email protected]>
  • Loading branch information
akashkulk and akashkulk authored Dec 12, 2023
1 parent a2b160b commit f827af1
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ secrets
updated_configurations
!airbyte-integrations/connector-templates/**/secrets

# Connector debug configs
airbyte-integrations/connectors/**/src/test/resources/debug_resources

# Test logs
acceptance_tests_logs

Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.1 | 2023-12-01 | [\#33027](https://github.com/airbytehq/airbyte/pull/33027) | Add the abstract DB source debugger. |
| 0.7.0 | 2023-12-07 | [\#32326](https://github.com/airbytehq/airbyte/pull/32326) | Destinations V2 changes for JDBC destinations |
| 0.6.4 | 2023-12-06 | [\#33082](https://github.com/airbytehq/airbyte/pull/33082) | Improvements to schema snapshot error handling + schema snapshot history scope (scoped to configured DB). |
| 0.6.2 | 2023-11-30 | [\#32573](https://github.com/airbytehq/airbyte/pull/32573) | Update MSSQLConverter to enforce 6-digit microsecond precision for timestamp fields |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.7.0
version=0.7.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debug;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.base.Source;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Collections;

/**
 * Utility class for debugging a source locally. Before running, copy the relevant
 * {@code config.json}, {@code configured_catalog.json} and {@code state.json} files into the
 * {@code resources/debug_resources} directory; they are loaded from the classpath.
 */
public class DebugUtil {

  /**
   * Runs {@code check}, {@code discover} and a full {@code read} against the given source, using the
   * config, catalog and state found under {@code debug_resources}. Every message produced by the read
   * is consumed and discarded.
   *
   * @param debugSource the source to exercise
   * @throws Exception if a debug resource cannot be loaded, or if the source fails during check,
   *         discover, read, or while closing the message iterator
   */
  @SuppressWarnings({"unchecked", "deprecation", "resource"})
  public static void debug(final Source debugSource) throws Exception {
    final JsonNode debugConfig = DebugUtil.getConfig();
    final ConfiguredAirbyteCatalog configuredAirbyteCatalog = DebugUtil.getCatalog();
    final JsonNode state = DebugUtil.getState();

    debugSource.check(debugConfig);
    debugSource.discover(debugConfig);

    // Close the iterator even when the read fails part-way, so whatever the source holds open
    // behind it is released deterministically.
    try (final AutoCloseableIterator<AirbyteMessage> messageIterator = debugSource.read(debugConfig, configuredAirbyteCatalog, state)) {
      messageIterator.forEachRemaining(message -> {});
    }
  }

  /**
   * Loads {@code debug_resources/config.json} and returns a copy with {@code debug_mode} set to
   * {@code true}, which marks the run as a debug run for the source being exercised.
   */
  private static JsonNode getConfig() throws Exception {
    final JsonNode originalConfig = new ObjectMapper().readTree(MoreResources.readResource("debug_resources/config.json"));
    return ((ObjectNode) originalConfig.deepCopy()).put("debug_mode", true);
  }

  /**
   * Loads {@code debug_resources/configured_catalog.json} as a {@link ConfiguredAirbyteCatalog}.
   */
  private static ConfiguredAirbyteCatalog getCatalog() throws Exception {
    final String catalog = MoreResources.readResource("debug_resources/configured_catalog.json");
    return Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class);
  }

  /**
   * Loads {@code debug_resources/state.json} as a single {@link AirbyteStateMessage} and wraps it in
   * a one-element JSON array, the shape passed to {@code read} as state.
   */
  private static JsonNode getState() throws Exception {
    final AirbyteStateMessage message = Jsons.deserialize(MoreResources.readResource("debug_resources/state.json"), AirbyteStateMessage.class);
    return Jsons.jsonNode(Collections.singletonList(message));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.5.3'
cdkVersionRequired = '0.7.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.2.25
dockerImageTag: 3.2.26
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public static boolean shouldFlushAfterSync(final JsonNode config) {
return shouldFlushAfterSync;
}

/**
 * Tells whether the given config requests debug mode.
 *
 * @param config the source configuration
 * @return {@code true} only when {@code debug_mode} is present, non-null and truthy; an explicit
 *         {@code "debug_mode": false} is treated as debug mode being off
 */
public static boolean isDebugMode(final JsonNode config) {
  // Presence alone is not enough: a config carrying "debug_mode": false must not enable debug behaviour.
  return config.hasNonNull("debug_mode") && config.get("debug_mode").asBoolean();
}

public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config) {
final JsonNode replicationMethod = config.get("replication_method");
if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.postgres.cdc;

import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -17,6 +18,7 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -73,6 +75,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
LOGGER.info("Queue size: {}", queueSize.getAsInt());

if (isDebugMode(sourceConfig) && !PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
throw new ConfigErrorException("WARNING: The config indicates that we are clearing the WAL while reading data. This will mutate the WAL" +
" associated with the source being debugged and is not advised.");
}

final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil();

final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database,
Expand Down Expand Up @@ -105,7 +112,8 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

if (!savedOffsetAfterReplicationSlotLSN) {
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
// We do not want to acknowledge the WAL logs in debug mode.
postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(),
savedOffset,
sourceConfig.get("replication_method").get("replication_slot").asText(),
Expand Down Expand Up @@ -162,6 +170,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
LOGGER.info("No streams will be synced via ctid");
}

// Gets the target position.
final var targetPosition = PostgresCdcTargetPosition.targetPosition(database);
final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.cdk.integrations.debezium.internals.postgres.PostgresCdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.postgres.PostgresReplicationConnection;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -102,6 +104,19 @@ protected void setup() {
testdb.withPublicationForAllTables();
}

/**
 * A debug-mode config that uses the "While reading Data" replication setting must be rejected with
 * a {@link ConfigErrorException} when a read is attempted.
 */
@Test
void testDebugMode() {
  final JsonNode debugConfigWithEagerCommit = testdb.testConfigBuilder()
      .withSchemas(modelsSchema(), modelsSchema() + "_random")
      .withoutSsl()
      .withCdcReplication("While reading Data")
      .with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
      .with("debug_mode", true)
      .build();
  final ConfiguredAirbyteCatalog catalog = Jsons.clone(getConfiguredCatalog());

  assertThrows(
      ConfigErrorException.class,
      () -> source().read(debugConfigWithEagerCommit, catalog, null));
}

@Test
void testCheckReplicationAccessSuperUserPrivilege() throws Exception {
final var cleanUserSuperName = testdb.withNamespace("super_user");
Expand Down Expand Up @@ -861,7 +876,7 @@ protected void ctidIteratorPageSizeTest() throws Exception {
});
}

private void bulkInsertRecords(int recordsToCreate) {
private void bulkInsertRecords(final int recordsToCreate) {
testdb.with("""
INSERT INTO %s.%s (%s, %s, %s)
SELECT
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

import io.airbyte.cdk.integrations.debug.DebugUtil;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;

/**
 * Local entry point that runs the Postgres source through {@link DebugUtil#debug}.
 */
public class PostgresDebugger {

  /**
   * Creates a {@link PostgresSource} with the stream-capable-state feature flag overridden to
   * {@code true} and passes it to the shared debug utility.
   *
   * @param args unused
   * @throws Exception if the debug run fails
   */
  @SuppressWarnings({"unchecked", "deprecation", "resource"})
  public static void main(final String[] args) throws Exception {
    final PostgresSource source = new PostgresSource();
    final var featureFlags = FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true);
    source.setFeatureFlags(featureFlags);
    DebugUtil.debug(source);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ void tearDown() {
}

public PostgresSource source() {
var source = new PostgresSource();
final var source = new PostgresSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
}
Expand Down Expand Up @@ -700,7 +700,7 @@ private static ConfiguredAirbyteStream toIncrementalConfiguredStream(final Airby
@Test
void testParseJdbcParameters() {
final String jdbcPropertiesString = "foo=bar&options=-c%20search_path=test,public,pg_catalog%20-c%20statement_timeout=90000&baz=quux";
Map<String, String> parameters = PostgresSource.parseJdbcParameters(jdbcPropertiesString, "&");
final Map<String, String> parameters = PostgresSource.parseJdbcParameters(jdbcPropertiesString, "&");
assertEquals("-c%20search_path=test,public,pg_catalog%20-c%20statement_timeout=90000", parameters.get("options"));
assertEquals("bar", parameters.get("foo"));
assertEquals("quux", parameters.get("baz"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -95,4 +96,18 @@ public void shouldFlushAfterSync() {
assertFalse(PostgresUtils.shouldFlushAfterSync(config));
}

/**
 * {@code isDebugMode} must be false until {@code debug_mode} is set on the config, regardless of
 * whether a replication method is configured.
 */
@Test
void testDebugMode() {
  final var sourceConfig = MoreMappers.initMapper().createObjectNode();
  // An empty config is not CDC.
  assertFalse(PostgresUtils.isCdc(sourceConfig));

  // Adding a replication method alone does not enable debug mode.
  final var replicationMethod = Jsons.jsonNode(Map.of(
      "replication_slot", "slot",
      "publication", "ab_pub"));
  sourceConfig.set("replication_method", replicationMethod);
  assertFalse(PostgresUtils.isDebugMode(sourceConfig));

  // Setting the flag enables it.
  final var debugFlag = Jsons.jsonNode(true);
  sourceConfig.set("debug_mode", debugFlag);
  assertTrue(PostgresUtils.isDebugMode(sourceConfig));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.2.26 | 2023-12-11 | [33027](https://github.com/airbytehq/airbyte/pull/33027) | Support for better debugging tools. |
| 3.2.25 | 2023-11-29 | [32961](https://github.com/airbytehq/airbyte/pull/32961) | Bump debezium wait time default to 20 min. |
| 3.2.24 | 2023-11-28 | [32686](https://github.com/airbytehq/airbyte/pull/32686) | Better logging to understand dbz closing reason attribution. |
| 3.2.23 | 2023-11-28 | [32891](https://github.com/airbytehq/airbyte/pull/32891) | Fix CDK dependency in build. |
Expand Down

0 comments on commit f827af1

Please sign in to comment.