Skip to content

Commit

Permalink
Merge pull request #10 from Nordstrom/bugfix/avro
Browse files Browse the repository at this point in the history
V2.0 Bugfix for record/playback of avro messages
  • Loading branch information
SgtPepperLHCB authored Jan 29, 2021
2 parents da7caa4 + 4697695 commit 13e913f
Show file tree
Hide file tree
Showing 24 changed files with 306 additions and 249 deletions.
59 changes: 44 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
# kcr
Kafka Cassette Recorder

A utility to record and playback messages from any Kafka topic. This tool can be useful to capture
production data streams for playback in a disaster recovery scenario or for load testing.

Message key and value are stored as ASCII-encoded hexidecimal along with message header parameters
(stored as string key/value).

Messages are stored in a 'cassette' (data directory) by partition. Playback of a cassette is at the
same relative rate as was captured so message-rate peaks and valleys of the captured message stream is
reconstructed.

Storage format is in json.

## Usage

```
Expand Down Expand Up @@ -35,11 +47,17 @@ Usage: kcr record [OPTIONS]
Record a Kafka topic to a cassette.
Options:
--data-directory TEXT Kafka Cassette Recorder data directory for recording
(default=./data)
--group-id TEXT Kafka consumer group id (default=kcr-<topic>-gid
--topic TEXT Kafka topic to record (REQUIRED)
-h, --help Show this message and exit
--data-directory TEXT Kafka Cassette Recorder data directory for
recording (default=kcr)
--group-id TEXT Kafka consumer group id (default=kcr-<topic>-gid)
--topic TEXT Kafka topic to record (REQUIRED)
--duration TEXT Kafka duration for recording, format must be like
**h**m**s
--header-timestamp TEXT Use timestamp from header parameter ignoring record
timestamp
--consumer-config TEXT Optional Kafka Consumer configuration file.
OVERWRITES any command-line values.
-h, --help Show this message and exit
```

### Play
Expand All @@ -52,9 +70,19 @@ Usage: kcr play [OPTIONS]
Playback a cassette to a Kafka topic.
Options:
--cassette TEXT Kafka Cassette Recorder directory for playback (REQUIRED)
--topic TEXT Kafka topic to write (REQUIRED)
-h, --help Show this message and exit
--cassette TEXT Kafka Cassette Recorder directory for playback
(REQUIRED)
--playback-rate FLOAT Playback rate multiplier (1.0 = play at capture
rate, 2.0 = playback at twice capture rate)
--topic TEXT Kafka topic to write (REQUIRED)
--producer-config TEXT Optional Kafka Producer configuration file.
OVERWRITES any command-line values.
--info List information about a Cassette, then exit
--pause Pause at end of playback (ctrl-c to exit)
--number-of-runs TEXT Number of times to run the playback
--duration TEXT Kafka duration for playback, format must be like
**h**m**s
-h, --help Show this message and exit
```

## Metrics
Expand Down Expand Up @@ -90,35 +118,35 @@ gradle clean build
Create a recording from a simple, local cluster:

```
java -jar ./build/libs/kcr.jar record --topic my-topic --data-directory data
java -jar ./build/libs/kcr-all.jar record --topic my-topic --data-directory data
```

Create a recording from secure cluster, like Confluent Cloud:

```
java -jar ./build/libs/kcr.jar --bootstrap-servers $MY_BOOTSTRAP_SERVERS --security-protocol SASL_PLAIN --sasl-mechanism PLAIN --sasl-username $MY_SASL_USERNAME --sasl-password $MY_SASL_PASSWORD record --topic my-topic --data-directory data
java -jar ./build/libs/kcr-all.jar --bootstrap-servers $MY_BOOTSTRAP_SERVERS --security-protocol SASL_PLAIN --sasl-mechanism PLAIN --sasl-username $MY_SASL_USERNAME --sasl-password $MY_SASL_PASSWORD record --topic my-topic --data-directory data
```

### Playback (wip)

Playback is at the capture rate of the cassette (i.e., if you recorded a stream with 5 message/sec, playback will also be at 5 message/sec)

```
java -jar ./build/libs/kcr.jar play --cassette data/my-topic-yyyymmdd_hhmm --topic my-topic-too
java -jar ./build/libs/kcr-all.jar play --cassette data/my-topic-yyyymmdd_hhmm --topic my-topic-too
```

### Helper scripts

```
./scripts/topic-record <TOPIC>
./scripts/kcr-record <TOPIC>
#e.g., ./scripts/topic-record sea-of-time
#e.g., ./scripts/kcr-record sea-of-time
```

```
./scripts/topic-playback <TARGET_TOPIC> <CASSETTE_DIR>
./scripts/kcr-playback <TARGET_TOPIC> <CASSETTE_DIR>
#e.g., ./scripts/topic-playback sea-of-science ./data/kcr-sea-of-time-20190517-1708
#e.g., ./scripts/kcr-playback sea-of-science ./data/kcr-sea-of-time-20190517-1708
```

# Example
Expand All @@ -130,5 +158,6 @@ The `./example` directory has a `docker-compose.yml` that will start a local kaf

# Roadmap

* Switch to `picocli` for command-line arguments
* Use `avro` for cassette format
* Record / playback from AWS S3
112 changes: 58 additions & 54 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
buildscript {
ext.kotlin_version = '1.3.31'
repositories {
mavenCentral()
jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath "org.jetbrains.kotlin:kotlin-serialization:$kotlin_version"
}
plugins {
// NB: plugin versions pinned in settings.gradle
id("org.jetbrains.kotlin.jvm")
id("org.jetbrains.kotlin.plugin.serialization")
id("application")
id("com.github.johnrengelman.shadow")
id("com.adarshr.test-logger")
id("com.diffplug.spotless")
}

apply plugin: "java"
apply plugin: "kotlin"
apply plugin: "kotlinx-serialization"
apply plugin: "application"
ext {
commonsCodecVersion = "1.15"
kafkaVersion = "2.5.1"

kotlinxCoroutinesVersion = "1.4.2"
kotlinxSerializationVersion = "1.0.0"
kotestVersion = "4.3.1"

jupiterVersion = "5.7.0"

picocliVersion = "4.6.1"

slf4jVersion = "1.7.30"
}

group = "com.nordstrom.kafka.kcr"

Expand All @@ -24,23 +32,36 @@ sourceSets {
}

repositories {
mavenCentral()
jcenter()
maven { url "https://kotlin.bintray.com/kotlinx" }
}

dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.apache.kafka:kafka-clients:2.2.0"
compile "org.slf4j:slf4j-simple:1.7.9"
compile "org.jetbrains.kotlinx:kotlinx-serialization-runtime:0.11.0"
// Align versions of all Kotlin components
implementation platform('org.jetbrains.kotlin:kotlin-bom')

compile "com.github.ajalt:clikt:1.7.0"
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1"
// Use the Kotlin JDK standard libraries
implementation("org.jetbrains.kotlin:kotlin-stdlib")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-core:$kotlinxSerializationVersion")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$kotlinxSerializationVersion")

compile "io.micrometer:micrometer-registry-statsd:latest.release"
compile "io.micrometer:micrometer-registry-jmx:latest.release"
testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
// Application dependencies
// implementation("info.picocli:picocli:$picocliVersion")
implementation("org.slf4j:slf4j-api:$slf4jVersion")
implementation("org.slf4j:slf4j-simple:$slf4jVersion")
implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
implementation("com.github.ajalt:clikt:1.7.0")
implementation("io.micrometer:micrometer-registry-statsd:latest.release")
implementation("io.micrometer:micrometer-registry-jmx:latest.release")
implementation("commons-codec:commons-codec:$commonsCodecVersion")

// Test
testImplementation("org.jetbrains.kotlin:kotlin-test")
testImplementation("io.kotest:kotest-assertions-core-jvm:$kotestVersion")
testImplementation("io.kotest:kotest-runner-junit5-jvm:$kotestVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:${jupiterVersion}")
testRuntimeOnly("org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${jupiterVersion}")
}

tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {
Expand All @@ -50,40 +71,23 @@ tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}

test {
useJUnitPlatform()
}

dependencies {
testImplementation 'io.kotlintest:kotlintest-runner-junit5:3.3.2'
}
dependsOn("cleanTest")
apply plugin: "com.adarshr.test-logger"

//test.testClassesDir = project.tasks.compileTestKotlin.destinationDir
//
//task wrapper(type: Wrapper) {
// gradleVersion="5.4"
//}

ext {
moduleName = 'com.nordstrom.kafka.kcr'
moduleLaunchPoint = mainClassName
}
task jlink(type: Exec, dependsOn: ['clean', 'jar']) {
def javaHome = System.properties.'java.home'

workingDir 'build'

commandLine "${javaHome}/bin/jlink",
'--module-path', "libs${File.pathSeparatorChar}${javaHome}/jmods",
'--strip-debug', '--no-header-files', '--no-man-pages', '--compress', '2',
'--add-modules', "${moduleName}",
'--launcher', "launch=${moduleName}/${moduleLaunchPoint}",
'--output', 'dist'
testlogger {
theme("mocha-parallel")
showExceptions(true)
showStandardStreams(true)
}
}

jar {
manifest {
attributes 'Main-Class': mainClassName
spotless {
java {
googleJavaFormat("1.9")
removeUnusedImports()
}
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
}
4 changes: 3 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
kotlin.code.style=official
kotlin.code.style=official

kotlinVersion = 1.4.21
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 1 addition & 1 deletion scripts/kcr-playback
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ CASSETTE=$2

#needed to enable monitoring tools (jconsole, visualvm, etc)
OPTS="-Djava.rmi.server.hostname=127.0.0.1"
java $OPTS -jar build/libs/kcr.jar play --cassette $CASSETTE --topic $TOPIC
java $OPTS -jar build/libs/kcr-all.jar play --cassette $CASSETTE --topic $TOPIC
2 changes: 1 addition & 1 deletion scripts/kcr-playback-fast
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ CASSETTE=$2

#needed to enable monitoring tools (jconsole, visualvm, etc)
OPTS="-Djava.rmi.server.hostname=127.0.0.1"
java $OPTS -jar build/libs/kcr.jar play --cassette $CASSETTE --topic $TOPIC --playback-rate -1.0
java $OPTS -jar build/libs/kcr-all.jar play --cassette $CASSETTE --topic $TOPIC --playback-rate -1.0
2 changes: 1 addition & 1 deletion scripts/kcr-record
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ TOPIC=$1

#needed to enable monitoring tools (jconsole, visualvm, etc)
OPTS="-Djava.rmi.server.hostname=127.0.0.1"
java $OPTS -jar build/libs/kcr.jar record --topic $TOPIC
java $OPTS -jar build/libs/kcr-all.jar record --topic $TOPIC
11 changes: 0 additions & 11 deletions scripts/kcr-record-meadow

This file was deleted.

2 changes: 1 addition & 1 deletion scripts/kcr-run
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

#needed to enable monitoring tools (jconsole, visualvm, etc)
OPTS="-Djava.rmi.server.hostname=127.0.0.1"
java $OPTS -jar build/libs/kcr.jar $@
java $OPTS -jar build/libs/kcr-all.jar $@
12 changes: 11 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
rootProject.name = 'kcr'
pluginManagement {
plugins {
id("org.jetbrains.kotlin.jvm") version "$kotlinVersion"
id("org.jetbrains.kotlin.plugin.serialization") version "$kotlinVersion"
id("application")
id("com.github.johnrengelman.shadow") version "5.2.0"
id("com.adarshr.test-logger") version "2.1.1"
id("com.diffplug.spotless") version "5.9.0"
}
}

rootProject.name = 'kcr'
12 changes: 1 addition & 11 deletions src/main/kotlin/com/nordstrom/kafka/kcr/Kcr.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ import com.nordstrom.kafka.kcr.cassette.CassetteVersion
import com.nordstrom.kafka.kcr.commands.Play
import com.nordstrom.kafka.kcr.commands.Record
import com.nordstrom.kafka.kcr.facilities.AlphaNumKeyGenerator
import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.binder.kafka.KafkaConsumerMetrics
import io.micrometer.core.instrument.composite.CompositeMeterRegistry
import io.micrometer.core.instrument.util.HierarchicalNameMapper
import io.micrometer.jmx.JmxConfig
import io.micrometer.jmx.JmxMeterRegistry
import io.micrometer.statsd.StatsdConfig
import io.micrometer.statsd.StatsdFlavor
import io.micrometer.statsd.StatsdMeterRegistry
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*


class KcrVersion {
//TODO derive from gradle build.
companion object {
const val VERSION = "0.2"
const val VERSION = "2.0"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.nordstrom.kafka.kcr.cassette

import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import java.io.File
import java.time.Duration
Expand All @@ -20,8 +21,7 @@ class CassetteInfo(val cassette: String) {
count = 0
reader.useLines { lines ->
lines.forEach { line ->
@UseExperimental(kotlinx.serialization.UnstableDefault::class)
val record = Json.parse(CassetteRecord.serializer(), line)
val record = Json.decodeFromString<CassetteRecord>(line)
if (record.timestamp < first) {
first = record.timestamp
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import kotlinx.serialization.Serializable
@Serializable
class CassetteRecord(
val headers: MutableMap<String, String> = mutableMapOf<String, String>(),
val timestamp: Long,
var timestamp: Long,
val partition: Int,
val offset: Long,
val key: String? = null,
val value: String
)
) {
fun withHeaderTimestamp(key: String) {
if (headers.containsKey(key)) {
timestamp = headers[key]?.toLong() ?: timestamp
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package com.nordstrom.kafka.kcr.cassette

class CassetteVersion {
companion object {
const val VERSION = "0.1"
const val VERSION = "2.0"
}
}
Loading

0 comments on commit 13e913f

Please sign in to comment.