Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
feat: add kafkasql-journal topic import example/tool
Browse files Browse the repository at this point in the history
  • Loading branch information
jsenko committed Nov 6, 2023
1 parent c7bb1ab commit 5791af3
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<module>rest-client-downstream</module>
<module>serdes-with-references</module>
<module>protobuf-validation</module>
<module>tools/kafkasql-topic-import</module>
<!--<module>camel-quarkus-kafka</module>-->
</modules>

Expand Down
94 changes: 94 additions & 0 deletions tools/kafkasql-topic-import/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-examples</artifactId>
<version>2.4.3.Final</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>apicurio-registry-tools-kafkasql-topic-import</artifactId>
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.3</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>

<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
<version>4.7.5</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>io.apicurio.registry.tools.kafkasqltopicimport.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.tools.kafkasqltopicimport;

import lombok.*;

import java.util.List;


/**
* @author Jakub Senko <em>[email protected]</em>
*/
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Envelope {

private String topic;

private Integer partition;

private Long offset;

private String tstype;

private Long ts;

private Long broker;

private List<String> headers;

private String key;

private String payload;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2020 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.tools.kafkasqltopicimport;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Streams;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.simple.SimpleLogger;
import picocli.CommandLine.Command;

import java.io.BufferedReader;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static picocli.CommandLine.Option;

/**
* @author Jakub Senko <em>[email protected]</em>
*/
@Command(name = "import", version = "0.1", mixinStandardHelpOptions = true)
public class ImportCommand implements Runnable {

private static final ObjectMapper mapper = new ObjectMapper();

@Option(names = {"-b", "--bootstrap-sever"}, description = "Kafka bootstrap server URL.",
required = true, defaultValue = "localhost:9092")
private String kafkaBootstrapServer;

@Option(names = {"-f", "--file"}, description = "Path to a kafkasql-journal topic dump file. " +
"Messages must use a JSON envelope and have base64-encoded keys and values.", required = true)
private String dumpFilePath;

@Option(names = {"-d", "--debug"}, description = "Print debug log messages.", defaultValue = "false")
private boolean debug;

public void run() {

if(debug) {
System.setProperty(org.slf4j.simple.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "DEBUG");
} else {
System.setProperty(org.slf4j.simple.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "WARN");
}

try (Producer<byte[], byte[]> producer = createKafkaProducer()) {

try (BufferedReader br = new BufferedReader(new FileReader(dumpFilePath))) {
String line;
while ((line = br.readLine()) != null) {
var envelope = mapper.readValue(line, Envelope.class);

if (envelope.getHeaders() == null) {
envelope.setHeaders(List.of());
}
if (envelope.getHeaders().size() % 2 != 0) {
throw new RuntimeException("Invalid length of the headers field: " + envelope.getHeaders().size());
}

var key = envelope.getKey() != null ? Base64.getDecoder().decode(envelope.getKey()) : null;
var value = envelope.getPayload() != null ? Base64.getDecoder().decode(envelope.getPayload()) : null;

var record = new ProducerRecord<>(
envelope.getTopic(),
envelope.getPartition(),
envelope.getTs(),
key,
value,
Streams.zip(
Streams.zip(
IntStream.range(0, Integer.MAX_VALUE).boxed(),
envelope.getHeaders().stream(),
Tuple::new
).filter(t -> t.getA() % 2 == 0).map(Tuple::getB), // Even indexes: 0,2,4,...
Streams.zip(
IntStream.range(0, Integer.MAX_VALUE).boxed(),
envelope.getHeaders().stream(),
Tuple::new
).filter(t -> t.getA() % 2 == 1).map(Tuple::getB), // Odd indexes: 1,3,5,...
(k, v) -> new RecordHeader(k, v.getBytes(StandardCharsets.UTF_8)))
.collect(Collectors.toList())
);
producer.send(record);
}
}

producer.flush();
System.err.println("Data imported successfully.");

} catch (Exception ex) {
System.err.println("Data import failed: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}


private Producer<byte[], byte[]> createKafkaProducer() {

Properties props = new Properties();

props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-kafkasql-journal");
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

return new KafkaProducer<>(props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.tools.kafkasqltopicimport;

import picocli.CommandLine;

/**
* @author Jakub Senko <em>[email protected]</em>
*/
public class Main {

public static void main(String[] args) {

CommandLine cmd = new CommandLine(new ImportCommand());
int exitCode = cmd.execute(args);
System.exit(exitCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.tools.kafkasqltopicimport;

import lombok.*;

/**
* @author Jakub Senko <em>[email protected]</em>
*/
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Tuple<A, B> {

private A a;

private B b;
}

0 comments on commit 5791af3

Please sign in to comment.