Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip][feat][io] Debezium DB2 source connector for Pulsar #19821

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
061b4df
Added Debezium DB2 connector for Pulsar
devinbost Mar 15, 2023
9972036
Initial commit for DB2 Debezium connector testing
devinbost Mar 17, 2023
4ea5216
Updated db2 source config
devinbost Mar 17, 2023
f38a5c9
Removed CI file
devinbost Mar 17, 2023
5f281bb
Updating docs pom
devinbost Mar 22, 2023
d3c5bbd
Added missing hooks to execute DB2 tests and updated some test code.
devinbost Mar 24, 2023
0fa7509
Updated source config for doc purposes
devinbost Mar 28, 2023
92dc034
Update pulsar-io/debezium/db2/src/main/java/org/apache/pulsar/io/debe…
devinbost Mar 29, 2023
3e97cc1
Separated db2 connection with subsequent commands
devinbost Mar 30, 2023
817d7dc
Added more startup commands
devinbost Mar 30, 2023
58a4ca0
Updated image for Debezium test container to use the new DB2 image
devinbost Mar 30, 2023
c482f3b
Fixed image reference
devinbost Mar 31, 2023
e6b5599
Fix license header
devinbost Mar 31, 2023
262d7c6
Fixed format of license header
devinbost Mar 31, 2023
d29db07
Fixed license format by running mvn initialize license:format
devinbost Mar 31, 2023
e493cf8
Added more license details and updated CI to build docker image for DB2
devinbost Mar 31, 2023
fa9188e
Added DB2 image to Pulsar IO step in CI
devinbost Mar 31, 2023
4f05272
Added check to ensure that commands aren't run on DB2 until DB finish…
devinbost Mar 31, 2023
b04c23e
Added wait checks to ensure DB2 has initialized before connecting
devinbost Mar 31, 2023
5ab2163
Added more logging to troubleshoot container startup
devinbost Mar 31, 2023
c6812ef
Added more logging to troubleshoot container startup if there are errors
devinbost Mar 31, 2023
77557d1
Add more checks since something is timing out in the test
devinbost Mar 31, 2023
dd7515e
Increasing timeout since DB2 takes a while to start
devinbost Mar 31, 2023
bf0d3e1
Increasing timeout since DB2 takes a while to start.
devinbost Mar 31, 2023
414738c
Increasing wait time between subsequent commands
devinbost Mar 31, 2023
a9673de
Catching exception and printing more debug info
devinbost Mar 31, 2023
49c342f
Refactored DB2 testing to use JDBC driver
devinbost Apr 3, 2023
4cdd64b
Added JDBC logging for DB2
devinbost Apr 4, 2023
5ffe1e4
Added test wait strategy based on testcontainers DB2 container
devinbost Apr 4, 2023
ab96ad1
fixed typo
devinbost Apr 4, 2023
189ef39
Fixed issue with DB2 calls in test
devinbost Apr 4, 2023
e400261
Removed CDC logic from test
devinbost Apr 4, 2023
2b2de0d
Added step to setup CDC
devinbost Apr 4, 2023
970fd53
Refactored cdcsetup.sh execution to be synchronous to prevent race on DB
devinbost Apr 4, 2023
f7bcb0d
Fixed style checks
devinbost Apr 4, 2023
992aa52
Fixed case sensitive values
devinbost Apr 4, 2023
1544db3
Fixed case
devinbost Apr 4, 2023
4ce089a
Added more properties to DB2 source to try to fix connector restarts
devinbost Apr 5, 2023
faf2eab
Removing additional insert to avoid duplicate write from failing test
devinbost Apr 5, 2023
16170a8
Incremented number of expected messages.
devinbost Apr 5, 2023
972c349
Added db2 bind command
devinbost Apr 5, 2023
b437fd5
Fixed bind command
devinbost Apr 5, 2023
2dbd84a
Added more logging
devinbost Apr 5, 2023
ce6e395
Added retention
devinbost Apr 14, 2023
45f2fde
Fixed retention limit
devinbost Apr 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ test_group_pulsar_io_ora() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-ora-source.xml -DintegrationTests -Dgroups=source -DtestRetryCount=0
}

test_group_pulsar_io_db2() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-db2-source.xml -DintegrationTests -Dgroups=source -DtestRetryCount=0
}

list_test_groups() {
declare -F | awk '{print $NF}' | sort | grep -E '^test_group_' | sed 's/^test_group_//g' | tr '[:lower:]' '[:upper:]'
}
Expand Down
1 change: 1 addition & 0 deletions distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/oracle/target/pulsar-io-debezium-oracle-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/db2/target/pulsar-io-debezium-db2-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mssql/target/pulsar-io-debezium-mssql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
Expand Down
65 changes: 65 additions & 0 deletions pulsar-io/debezium/db2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-debezium</artifactId>
<version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>pulsar-io-debezium-db2</artifactId>
<name>Pulsar IO :: Debezium :: DB2</name>

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-db2</artifactId>
<version>${debezium.version}</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.debezium.db2;

import java.util.Map;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;


/**
* A pulsar source that runs debezium oracle source.
*/
public class DebeziumDB2Source extends DebeziumSource {
private static final String DEFAULT_TASK = "io.debezium.connector.db2.Db2ConnectorTask";

@Override
public void setDbConnectorTask(Map<String, Object> config) throws Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.debezium.db2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: debezium-db2
description: Debezium DB2 Source
sourceClass: org.apache.pulsar.io.debezium.db2.DebeziumDB2Source
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
devinbost marked this conversation as resolved.
Show resolved Hide resolved
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

## config for db2, docker image: ibmcom/db2, which was moved to icr.io/db2_community/db2
## As per note at https://hub.docker.com/r/ibmcom/db2
## docs are being moved to: https://www.ibm.com/docs/en/db2/11.5?topic=deployments-db2-community-edition-docker
## For testing, you can use this image that was configured with the required libraries: devingbost/debezium-db2:0.0.1
## docker run -itd --name mydb2 --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=admin -e DBNAME=testdb -v <db storage dir>:/database ibmcom/db2

tenant: "public"
namespace: "default"
name: "debezium-db2-source"
inputs: [ "db2-connect-topic" ]
topicName: "db2-connect-topic"
archive: "connectors/pulsar-io-debezium-db2-2.11.0-SNAPSHOT.nar"
parallelism: 1
configs:
database.hostname: "localhost"
database.port: "50000"
database.user: "db2inst1"
database.password: "admin"
database.dbname: "mydb2"
database.server.name: "db2inst1"
topic.prefix: "stores" # Example for hypothetical stores topics
table.exclude.list: "DB2INST1.EXCLUDEDSTORES" # Example for hypothetical stores table

database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.history.pulsar.topic: "debezium-db2-source-history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
## PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: "pulsar://127.0.0.1:6650"
topic.namespace: "public/default"
# CONVERTERS
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
typeClassName: "org.apache.pulsar.common.schema.KeyValue"

task.class: "io.debezium.connector.db2.Db2ConnectorTask"

offset.storage.topic: "offset-topic"
snapshot.mode: "initial"
database.tcpKeepAlive: "true"
decimal.handling.mode: "double"
1 change: 1 addition & 0 deletions pulsar-io/debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<module>mongodb</module>
<module>oracle</module>
<module>mssql</module>
<module>db2</module>
</modules>

</project>
5 changes: 5 additions & 0 deletions pulsar-io/docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
<artifactId>pulsar-io-debezium-mssql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-db2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-dynamodb</artifactId>
Expand Down
13 changes: 12 additions & 1 deletion tests/docker-images/latest-version-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,16 @@ RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remo

RUN jar uf connectors/pulsar-io-debezium-oracle-*.nar META-INF/bundled-dependencies/ojdbc8-19.3.0.0.jar META-INF/bundled-dependencies/ucp-19.3.0.0.jar META-INF/bundled-dependencies/oraclepki-19.3.0.0.jar META-INF/bundled-dependencies/osdt_cert-19.3.0.0.jar META-INF/bundled-dependencies/osdt_core-19.3.0.0.jar META-INF/bundled-dependencies/simplefan-19.3.0.0.jar META-INF/bundled-dependencies/orai18n-19.3.0.0.jar META-INF/bundled-dependencies/xdb-19.3.0.0.jar META-INF/bundled-dependencies/xmlparserv2-19.3.0.0.jar


# Download IBM DB2 JDBC driver for DB2 Debezium Connector tests:
RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/ibm/db2/jcc/11.5.8.0/jcc-11.5.8.0.jar
RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/ibm/db2/jcc/db2jcc/db2jcc4/db2jcc-db2jcc4.jar
devinbost marked this conversation as resolved.
Show resolved Hide resolved
RUN mkdir -p /asncdctools/src
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/asncdc.c
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/dbsetup.sh
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/asncdc_UDF.sql
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/asncdcaddremove.sql
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/asncdctables.sql
RUN cd /asncdctools/src && curl -sSLO https://raw.githubusercontent.com/debezium/debezium-connector-db2/main/src/test/docker/db2-cdc-docker/cdcsetup.sh

RUN jar uf connectors/pulsar-io-debezium-db2-*.nar META-INF/bundled-dependencies/jcc-11.5.8.0.jar META-INF/bundled-dependencies/db2jcc-db2jcc4.jar
CMD bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;


import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

public class DebeziumDB2DbContainer extends ChaosContainer<DebeziumDB2DbContainer> {

public static final String NAME = "debezium-db2";
static final Integer[] PORTS = { 50000 };

//
private static final String IMAGE_NAME = "devingbost/debezium-db2:0.0.1";
devinbost marked this conversation as resolved.
Show resolved Hide resolved

public DebeziumDB2DbContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
}

@Override
public String getContainerName() {
return clusterName;
}

@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withEnv("LICENSE", "accept")
.withEnv("DB2INSTANCE", "db2inst1")
.withEnv("DB2INST1_PASSWORD", "admin")
.withEnv("DBNAME", "mydb2")
.withEnv("BLU", "false")
.withEnv("ENABLE_ORACLE_COMPATIBILITY", "false")
.withEnv("UPDATEAVAIL", "NO")
.withEnv("TO_CREATE_SAMPLEDB", "false")
.withEnv("REPODB", "false")
.withEnv("IS_OSXFS", "false")
.withEnv("PERSISTENT_HOME", "true")
.withEnv("HADR_ENABLED", "false")
.withPrivilegedMode(true)
.withStartupTimeout(Duration.of(300, ChronoUnit.SECONDS))
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
})
.waitingFor(new HostPortWaitStrategy());
}

}
Loading