[FLINK-36061] Support Iceberg CDC Pipeline SinkV2
ConradJam authored and czy006 committed Jan 22, 2025
1 parent 3e16a66 commit a7ab0c0
Showing 16 changed files with 1,106 additions and 0 deletions.
pom.xml (new module: flink-cdc-pipeline-connector-iceberg)
@@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<groupId>org.apache.flink</groupId>
<version>${revision}</version>
</parent>

<artifactId>flink-cdc-pipeline-connector-iceberg</artifactId>
<packaging>jar</packaging>

<name>flink-cdc-pipeline-connector-iceberg</name>

<properties>
<iceberg.version>1.7.1</iceberg.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
IcebergDataSink.java (new file)
@@ -0,0 +1,78 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;

public class IcebergDataSink implements DataSink, Serializable {

// Options for creating the Iceberg catalog.
private final Map<String, String> catalogOptions;

// Options for creating the Iceberg table.
private final Map<String, String> tableOptions;

private final String commitUser;

private final Map<TableId, List<String>> partitionMaps;

private final IcebergRecordSerializer<Event> serializer;

private final ZoneId zoneId;

public final String schemaOperatorUid;

public IcebergDataSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
String commitUser,
Map<TableId, List<String>> partitionMaps,
IcebergRecordSerializer<Event> serializer,
ZoneId zoneId,
String schemaOperatorUid) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.commitUser = commitUser;
this.partitionMaps = partitionMaps;
this.serializer = serializer;
this.zoneId = zoneId;
this.schemaOperatorUid = schemaOperatorUid;
}

@Override
public EventSinkProvider getEventSinkProvider() {
IcebergEventSink icebergEventSink =
new IcebergEventSink(
tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
return FlinkSinkProvider.of(icebergEventSink);
}

@Override
public MetadataApplier getMetadataApplier() {
return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
}

@Override
public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
// TODO: provide a custom hash function here if a non-default event distribution is needed.
return DataSink.super.getDataChangeEventHashFunctionProvider();
}

@Override
public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
int parallelism) {
return DataSink.super.getDataChangeEventHashFunctionProvider(parallelism);
}
}
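
Not part of the commit: a minimal, hypothetical construction sketch that mirrors how IcebergDataSinkFactory (below) wires this class up, to make the constructor parameters concrete. All option values, the time zone, and the schema operator uid are placeholders.

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

public final class IcebergDataSinkSketch {
    public static void main(String[] args) {
        // Placeholder catalog/table options; real values come from the pipeline definition.
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("warehouse", "file:///tmp/iceberg-warehouse");
        Map<String, String> tableOptions = new HashMap<>();

        ZoneId zoneId = ZoneId.of("UTC");
        IcebergRecordSerializer<Event> serializer =
                new IcebergRecordEventSerializer(new HashMap<>(), zoneId);

        IcebergDataSink sink =
                new IcebergDataSink(
                        catalogOptions,
                        tableOptions,
                        "admin",            // default of the commit.user option
                        new HashMap<>(),    // per-table partition keys
                        serializer,
                        zoneId,
                        "schema-operator"); // placeholder schema operator uid

        // The pipeline composer then calls sink.getEventSinkProvider()
        // and sink.getMetadataApplier().
    }
}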
IcebergDataSinkFactory.java (new file)
@@ -0,0 +1,108 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
import org.apache.flink.table.catalog.Catalog;

import org.apache.iceberg.flink.FlinkCatalogFactory;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;

public class IcebergDataSinkFactory implements DataSinkFactory {

public static final String IDENTIFIER = "iceberg";

@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
} else if (key.startsWith(IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
catalogOptions.put(
key.substring(
IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()),
value);
}
});
FlinkCatalogFactory factory = new FlinkCatalogFactory();
try {
Catalog catalog =
factory.createCatalog(
catalogOptions.getOrDefault("default-database", "default"),
catalogOptions);
Preconditions.checkNotNull(
catalog.listDatabases(), "Iceberg catalog options are invalid.");
} catch (Exception e) {
throw new RuntimeException("Failed to create or use Iceberg catalog", e);
}
ZoneId zoneId = ZoneId.systemDefault();
if (!Objects.equals(
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) {
zoneId =
ZoneId.of(
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
}
String commitUser =
context.getFactoryConfiguration().get(IcebergDataSinkOptions.COMMIT_USER);
IcebergRecordSerializer<Event> serializer =
new IcebergRecordEventSerializer(new HashMap<>(), zoneId);
String schemaOperatorUid =
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
return new IcebergDataSink(
catalogOptions,
tableOptions,
commitUser,
new HashMap<>(),
serializer,
zoneId,
schemaOperatorUid);
}

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.METASTORE);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.WAREHOUSE);
options.add(IcebergDataSinkOptions.URI);
options.add(IcebergDataSinkOptions.COMMIT_USER);
options.add(IcebergDataSinkOptions.PARTITION_KEY);
return options;
}
}
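
Not part of the commit, but for orientation: createDataSink routes sink options by prefix into catalog and table property maps. A self-contained sketch of that routing with example keys follows; the table property 'write.format.default' is only an illustrative Iceberg table property, not something this connector requires.

import java.util.HashMap;
import java.util.Map;

public final class PrefixRoutingExample {
    public static void main(String[] args) {
        // Example sink options as they would arrive from the pipeline definition.
        Map<String, String> allOptions = new HashMap<>();
        allOptions.put("catalog.properties.metastore", "hive");
        allOptions.put("catalog.properties.uri", "thrift://metastore:9083");
        allOptions.put("catalog.properties.warehouse", "hdfs:///warehouse/iceberg");
        allOptions.put("table.properties.write.format.default", "parquet");

        Map<String, String> catalogOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        allOptions.forEach(
                (key, value) -> {
                    // Same prefix handling as IcebergDataSinkFactory#createDataSink.
                    if (key.startsWith("table.properties.")) {
                        tableOptions.put(key.substring("table.properties.".length()), value);
                    } else if (key.startsWith("catalog.properties.")) {
                        catalogOptions.put(key.substring("catalog.properties.".length()), value);
                    }
                });

        // catalogOptions: {metastore=hive, uri=thrift://metastore:9083, warehouse=hdfs:///warehouse/iceberg}
        // tableOptions:   {write.format.default=parquet}
        System.out.println(catalogOptions);
        System.out.println(tableOptions);
    }
}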
IcebergDataSinkOptions.java (new file)
@@ -0,0 +1,47 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;

import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;

public class IcebergDataSinkOptions {

// prefix for passing properties for table creation.
public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";

// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";

public static final ConfigOption<String> COMMIT_USER =
key("commit.user")
.stringType()
.defaultValue("admin")
.withDescription("User name for committing data files.");

public static final ConfigOption<String> WAREHOUSE =
key("catalog.properties.warehouse")
.stringType()
.noDefaultValue()
.withDescription("The warehouse root path of catalog.");

public static final ConfigOption<String> METASTORE =
key("catalog.properties.metastore")
.stringType()
.noDefaultValue()
.withDescription("Metastore of iceberg catalog, supports filesystem and hive.");

public static final ConfigOption<String> URI =
key("catalog.properties.uri")
.stringType()
.noDefaultValue()
.withDescription("Uri of metastore server.");

public static final ConfigOption<String> PARTITION_KEY =
key("partition.key")
.stringType()
.defaultValue("")
.withDescription(
"Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. "
+ "Tables are separated by ';', and partition keys are separated by ','. "
+ "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
}
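
The commit does not show how 'partition.key' is consumed, so the following is only an assumed parsing sketch of the documented format (tables separated by ';', table and keys separated by ':', keys separated by ','). It produces the kind of per-table key map that IcebergDataSink's partitionMaps field carries, keyed by plain strings rather than TableId for brevity.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class PartitionKeyFormatSketch {
    public static void main(String[] args) {
        // Example value from the PARTITION_KEY option description.
        String partitionKey = "testdb.table1:id1,id2;testdb.table2:name";

        Map<String, List<String>> partitionKeysPerTable = new HashMap<>();
        for (String tableSpec : partitionKey.split(";")) {
            if (tableSpec.isEmpty()) {
                continue; // the option defaults to "", meaning no partitioned tables
            }
            String[] tableAndKeys = tableSpec.split(":");
            partitionKeysPerTable.put(
                    tableAndKeys[0], Arrays.asList(tableAndKeys[1].split(",")));
        }

        // -> {testdb.table1=[id1, id2], testdb.table2=[name]}
        System.out.println(partitionKeysPerTable);
    }
}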