Commit

Merge pull request #367 from jensur77/main
Support custom `TimeMapper` class
embano1 authored Aug 22, 2024
2 parents 1f779eb + b4e1a73 commit 0ad6c51
Showing 8 changed files with 257 additions and 80 deletions.
EventBridgeSinkConfig.java
@@ -36,6 +36,7 @@ public class EventBridgeSinkConfig extends AbstractConfig {
static final String AWS_ROLE_EXTERNAL_ID_CONFIG = "aws.eventbridge.iam.external.id";
static final String AWS_DETAIL_TYPES_CONFIG = "aws.eventbridge.detail.types";
static final String AWS_DETAIL_TYPES_MAPPER_CLASS = "aws.eventbridge.detail.types.mapper.class";
static final String AWS_TIME_MAPPER_CLASS = "aws.eventbridge.time.mapper.class";
static final String AWS_EVENTBUS_RESOURCES_CONFIG = "aws.eventbridge.eventbus.resources";
static final String AWS_OFFLOADING_DEFAULT_S3_BUCKET =
"aws.eventbridge.offloading.default.s3.bucket";
@@ -76,10 +77,14 @@ public class EventBridgeSinkConfig extends AbstractConfig {
+ "Can be defined per topic e.g., 'topic1:MyDetailType, topic2:MyDetailType', as a single expression "
+ "with a dynamic '${topic}' placeholder for all topics e.g., 'my-detail-type-${topic}', "
+ "or as a static value without additional topic information for all topics e.g., 'my-detail-type'.";

private static final String AWS_DETAIL_TYPES_MAPPER_DOC =
"Define a custom implementation class for the DetailTypeMapper interface to customize the mapping of Kafka topics or records to the EventBridge detail-type. Define full class path e.g. software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper.";

private static final String AWS_TIME_MAPPER_CLASS_DEFAULT =
"software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper";
private static final String AWS_TIME_MAPPER_DOC =
"Provide a custom implementation class for the TimeMapper interface to customize the mapping of records to EventBridge metadata field 'time' e.g. 'software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper'.";

private static final String AWS_EVENTBUS_RESOURCES_DOC =
"An optional comma-separated list of strings to add to "
+ "the resources field in the outgoing EventBridge events.";
@@ -99,6 +104,7 @@ public class EventBridgeSinkConfig extends AbstractConfig {
public Map<String, String> detailTypeByTopic;
public String detailType;
public String detailTypeMapperClass;
public String timeMapperClass;
public String offloadingDefaultS3Bucket;
public String offloadingDefaultFieldRef;

@@ -116,6 +122,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
this.retriesDelay = getInt(AWS_RETRIES_DELAY_CONFIG);
this.resources = getList(AWS_EVENTBUS_RESOURCES_CONFIG);
this.detailTypeMapperClass = getString(AWS_DETAIL_TYPES_MAPPER_CLASS);
this.timeMapperClass = getString(AWS_TIME_MAPPER_CLASS);
this.offloadingDefaultS3Bucket = getString(AWS_OFFLOADING_DEFAULT_S3_BUCKET);
this.offloadingDefaultFieldRef = getString(AWS_OFFLOADING_DEFAULT_FIELDREF);

@@ -132,7 +139,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
"EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} "
+ "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} "
+ "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={} "
+ "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={}",
+ "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={} detailTypeMapperClass={} timeMapperClass={}",
connectorId,
eventBusArn,
region,
Expand All @@ -145,7 +152,9 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
connectorId,
externalId,
offloadingDefaultS3Bucket,
offloadingDefaultFieldRef);
offloadingDefaultFieldRef,
detailTypeMapperClass,
timeMapperClass);
}

private static ConfigDef createConfigDef() {
@@ -189,6 +198,12 @@ private static void addParams(final ConfigDef configDef) {
AWS_DETAIL_TYPES_DEFAULT,
Importance.MEDIUM,
AWS_DETAIL_TYPES_DOC);
configDef.define(
AWS_TIME_MAPPER_CLASS,
Type.STRING,
AWS_TIME_MAPPER_CLASS_DEFAULT,
Importance.MEDIUM,
AWS_TIME_MAPPER_DOC);
configDef.define(
AWS_EVENTBUS_RESOURCES_CONFIG,
Type.LIST,
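
For reference, the new option can be set next to the existing detail-type mapper option in the connector configuration. Below is a minimal, illustrative properties fragment; com.example.MyTimeMapper is a hypothetical class name, and all other required connector properties are omitted:

# Illustrative fragment only. com.example.MyTimeMapper is a hypothetical custom
# implementation; it must be on the connector classpath and provide a public
# no-argument constructor.
aws.eventbridge.detail.types.mapper.class=software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper
aws.eventbridge.time.mapper.class=com.example.MyTimeMapper
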
Config validator
@@ -90,7 +90,13 @@ public static void validate(ConfigValue configValue, EnvVarGetter getenv) {

case AWS_DETAIL_TYPES_MAPPER_CLASS:
{
validateDetailTypeMapperClass(configValue);
validateClassExists(configValue);
break;
}

case AWS_TIME_MAPPER_CLASS:
{
validateClassExists(configValue);
break;
}

@@ -115,7 +121,7 @@ private static void validateConnectorId(ConfigValue configValue) {
}
}

private static void validateDetailTypeMapperClass(ConfigValue configValue) {
private static void validateClassExists(ConfigValue configValue) {
var mapperClass = (String) configValue.value();
try {
Class.forName(mapperClass);
DefaultEventBridgeMapper.java
@@ -4,20 +4,13 @@
*/
package software.amazon.event.kafkaconnector.mapping;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
import static software.amazon.event.kafkaconnector.EventBridgeResult.Error.reportOnly;
import static software.amazon.event.kafkaconnector.EventBridgeResult.failure;
import static software.amazon.event.kafkaconnector.EventBridgeResult.success;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.List;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;
import software.amazon.event.kafkaconnector.EventBridgeResult;
@@ -28,14 +21,17 @@ public class DefaultEventBridgeMapper implements EventBridgeMapper {
private static final String sourcePrefix = "kafka-connect.";

private final EventBridgeSinkConfig config;
private final JsonConverter jsonConverter = new JsonConverter();
private final ObjectMapper objectMapper = new ObjectMapper();

private final SinkRecordJsonMapper jsonMapper = new SinkRecordJsonMapper();

private final DetailTypeMapper detailTypeMapper;
private final TimeMapper timeMapper;

public DefaultEventBridgeMapper(EventBridgeSinkConfig config) {
jsonConverter.configure(singletonMap("schemas.enable", "false"), false);

this.config = config;
this.detailTypeMapper = getDetailTypeMapper(config);
this.timeMapper = getTimeMapper(config);
}

public EventBridgeMappingResult map(List<SinkRecord> records) {
@@ -61,77 +57,14 @@ private EventBridgeResult<PutEventsRequestEntry> createPutEventsEntry(SinkRecord
.source(sourcePrefix + config.connectorId)
.detailType(detailTypeMapper.getDetailType(record))
.resources(config.resources)
.detail(createJsonPayload(record))
.detail(jsonMapper.createJsonPayload(record))
.time(timeMapper.getTime(record))
.build());
} catch (Exception e) {
return failure(record, reportOnly("Cannot convert Kafka record to EventBridge.", e));
}
}

private String createJsonPayload(SinkRecord record) throws IOException {
var root = objectMapper.createObjectNode();
root.put("topic", record.topic());
root.put("partition", record.kafkaPartition());
root.put("offset", record.kafkaOffset());
root.put("timestamp", record.timestamp());
root.put("timestampType", record.timestampType().toString());
root.set("headers", createHeaderArray(record));

if (record.key() == null) {
root.set("key", null);
} else {
root.set(
"key",
createJSONFromByteArray(
jsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key())));
}

// tombstone handling
if (record.value() == null) {
root.set("value", null);
} else {
root.set(
"value",
createJSONFromByteArray(
jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value())));
}

return root.toString();
}

/**
* This method serializes Kafka message headers to JSON.
*
* @param record Kafka record to be sent to EventBridge
* @return headers to be added to EventBridge message
* @throws IOException
*/
private ArrayNode createHeaderArray(SinkRecord record) throws IOException {
var headersArray = objectMapper.createArrayNode();

for (Header header : record.headers()) {
var headerItem = objectMapper.createObjectNode();
headerItem.set(
header.key(),
createJSONFromByteArray(
jsonConverter.fromConnectHeader(
record.topic(), header.key(), header.schema(), header.value())));
headersArray.add(headerItem);
}
return headersArray;
}

/**
* This method converts the byteArray which is returned by the {@link JsonConverter} to JSON.
*
* @param jsonBytes - byteArray to convert to JSON
* @return the JSON representation of jsonBytes
* @throws IOException
*/
private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException {
return objectMapper.readTree(jsonBytes);
}

private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) {
try {
var myClass = Class.forName(config.detailTypeMapperClass);
@@ -144,4 +77,15 @@ private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) {
throw new RuntimeException("Topic to Detail-Type Mapper Class can't be loaded.");
}
}

private TimeMapper getTimeMapper(EventBridgeSinkConfig config) {
try {
var myClass = Class.forName(config.timeMapperClass);
var constructor = myClass.getDeclaredConstructor();
return (TimeMapper) constructor.newInstance();
} catch (Exception e) {
// This will already be verified in the Config Validator
throw new RuntimeException("Time Mapper Class can't be loaded.");
}
}
}
DefaultTimeMapper.java (new file)
@@ -0,0 +1,19 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import java.time.Instant;
import org.apache.kafka.connect.sink.SinkRecord;

public class DefaultTimeMapper implements TimeMapper {

@Override
public Instant getTime(SinkRecord sinkRecord) {
// As described in AWS documentation
// https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html
// If no timestamp is provided, the timestamp of the PutEvents call is used.
return null;
}
}
SinkRecordJsonMapper.java (new file)
@@ -0,0 +1,89 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import static java.util.Collections.singletonMap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordJsonMapper {
private final JsonConverter jsonConverter = new JsonConverter();
private final ObjectMapper objectMapper = new ObjectMapper();

public SinkRecordJsonMapper() {
jsonConverter.configure(singletonMap("schemas.enable", "false"), false);
}

public String createJsonPayload(SinkRecord sinkRecord) throws IOException {
var root = objectMapper.createObjectNode();
root.put("topic", sinkRecord.topic());
root.put("partition", sinkRecord.kafkaPartition());
root.put("offset", sinkRecord.kafkaOffset());
root.put("timestamp", sinkRecord.timestamp());
root.put("timestampType", sinkRecord.timestampType().toString());
root.set("headers", createHeaderArray(sinkRecord));

if (sinkRecord.key() == null) {
root.set("key", null);
} else {
root.set(
"key",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key())));
}

// tombstone handling
if (sinkRecord.value() == null) {
root.set("value", null);
} else {
root.set(
"value",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value())));
}
return root.toString();
}

/**
* This method serializes Kafka message headers to JSON.
*
* @param sinkRecord Kafka record to be sent to EventBridge
* @return headers to be added to EventBridge message
* @throws IOException
*/
private ArrayNode createHeaderArray(SinkRecord sinkRecord) throws IOException {
var headersArray = objectMapper.createArrayNode();

for (Header header : sinkRecord.headers()) {
var headerItem = objectMapper.createObjectNode();
headerItem.set(
header.key(),
createJSONFromByteArray(
jsonConverter.fromConnectHeader(
sinkRecord.topic(), header.key(), header.schema(), header.value())));
headersArray.add(headerItem);
}
return headersArray;
}

/**
* This method converts the byteArray which is returned by the {@link JsonConverter} to JSON.
*
* @param jsonBytes - byteArray to convert to JSON
* @return the JSON representation of jsonBytes
* @throws IOException
*/
private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException {
return objectMapper.readTree(jsonBytes);
}
}
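
For orientation, the JSON detail produced by createJsonPayload has roughly the following shape. The field names follow the code above; the concrete values are invented for illustration:

{
  "topic": "orders",
  "partition": 0,
  "offset": 42,
  "timestamp": 1724300000000,
  "timestampType": "CreateTime",
  "headers": [ { "trace-id": "abc123" } ],
  "key": "order-1",
  "value": { "status": "CREATED" }
}
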
TimeMapper.java (new file)
@@ -0,0 +1,12 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import java.time.Instant;
import org.apache.kafka.connect.sink.SinkRecord;

public interface TimeMapper {
Instant getTime(SinkRecord sinkRecord);
}
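
As a sketch of how this extension point can be used: a custom implementation only needs to implement TimeMapper and expose a public no-argument constructor, because DefaultEventBridgeMapper instantiates it reflectively in getTimeMapper (see above). The hypothetical RecordTimestampTimeMapper below maps the Kafka record timestamp to the EventBridge 'time' field and would be activated via the aws.eventbridge.time.mapper.class property:

package com.example; // hypothetical package

import java.time.Instant;
import org.apache.kafka.connect.sink.SinkRecord;
import software.amazon.event.kafkaconnector.mapping.TimeMapper;

/** Hypothetical mapper that uses the Kafka record timestamp as the EventBridge event time. */
public class RecordTimestampTimeMapper implements TimeMapper {

  @Override
  public Instant getTime(SinkRecord sinkRecord) {
    var timestamp = sinkRecord.timestamp();
    // A null return lets EventBridge fall back to the PutEvents call time,
    // matching DefaultTimeMapper's behavior for records without a timestamp.
    return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
  }
}
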
(Diffs for the remaining two changed files are not shown.)
