Skip to content

Commit

Permalink
Spike on a new API for Kafka Streams support
Browse files Browse the repository at this point in the history
  • Loading branch information
Calderwood committed May 16, 2019
1 parent a4e5922 commit 16b35a8
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
* @param <K> The key deserializer type
* @param <V> The value deserializer type
*/
public class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> {
public abstract class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> implements KafkaStreamsConfiguration {

/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
* Construct a new {@link AbstractKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class ConfiguredStreamBuilder extends StreamsBuilder {
*
* @param configuration The configuration
*/
public ConfiguredStreamBuilder(Properties configuration) {
this.configuration.putAll(configuration);
public ConfiguredStreamBuilder(KafkaStreamsConfiguration configuration) {
this.configuration.putAll(configuration.getConfig());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Properties;

/**
* The default streams configuration is non other is present.
Expand All @@ -33,14 +32,14 @@
* @param <K>
* @param <V>
*/
@Requires(missingProperty = KafkaStreamsConfiguration.PREFIX + ".default")
@Requires(missingProperty = NamedKafkaStreamsConfiguration.PREFIX + ".default")
@Singleton
@Requires(beans = KafkaDefaultConfiguration.class)
@Named("default")
@Primary
public class DefaultKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {
/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
* Construct a new {@link DefaultKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
Expand All @@ -50,8 +49,6 @@ public DefaultKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfigu
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
config.putAll(defaultConfiguration.getConfig());
init(applicationConfiguration, environment, config);
init(applicationConfiguration, environment, getConfig());
}
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,7 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.runtime.ApplicationConfiguration;

import java.util.Properties;

import static io.micronaut.configuration.kafka.streams.KafkaStreamsConfiguration.PREFIX;

/**
* The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}.
*
* @param <K> The generic key type
* @param <V> The generic value type
*/
@EachProperty(value = PREFIX, primary = "default")
@Requires(beans = KafkaDefaultConfiguration.class)
public class KafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {

/**
* The default streams configuration.
*/
public static final String PREFIX = "kafka.streams";


/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
*
* @param streamName The stream name
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
*/
public KafkaStreamsConfiguration(
@Parameter String streamName,
KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true);
config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new));
init(applicationConfiguration, environment, config);
}
public interface KafkaStreamsConfiguration {
Properties getConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

import io.micronaut.context.annotation.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;

import javax.annotation.PreDestroy;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;


/**
* A factory that constructs the {@link KafkaStreams} bean.
Expand All @@ -38,30 +35,15 @@ public class KafkaStreamsFactory implements Closeable {

private final Collection<KafkaStreams> streams = new ConcurrentLinkedDeque<>();

/**
* Exposes the {@link ConfiguredStreamBuilder} as a bean.
*
* @param configuration The configuration
* @return The streams builder
*/
@EachBean(AbstractKafkaStreamsConfiguration.class)
ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration configuration) {
return new ConfiguredStreamBuilder(configuration.getConfig());
}

/**
* Builds the default {@link KafkaStreams} bean from the configuration and the supplied {@link ConfiguredStreamBuilder}.
*
* @param builder The builder
* @param kStreams The KStream definitions
* @return The {@link KafkaStreams} bean
*/
@EachBean(AbstractKafkaStreamsConfiguration.class)
@EachBean(ConfiguredStreamBuilder.class)
@Context
KafkaStreams kafkaStreams(
ConfiguredStreamBuilder builder,
// required for initialization. DO NOT DELETE
KStream... kStreams) {
KafkaStreams kafkaStreams(ConfiguredStreamBuilder builder) {
KafkaStreams kafkaStreams = new KafkaStreams(
builder.build(),
builder.getConfiguration()
Expand All @@ -82,5 +64,4 @@ public void close() {
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.runtime.ApplicationConfiguration;

import java.util.Properties;

import static io.micronaut.configuration.kafka.streams.NamedKafkaStreamsConfiguration.PREFIX;

/**
* The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}.
*
* @param <K> The generic key type
* @param <V> The generic value type
*/
@EachProperty(value = PREFIX, primary = "default")
@Requires(beans = KafkaDefaultConfiguration.class)
public class NamedKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {

/**
* The default streams configuration.
*/
public static final String PREFIX = "kafka.streams";


/**
* Construct a new {@link NamedKafkaStreamsConfiguration} for the given defaults.
*
* @param streamName The stream name
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
*/
public NamedKafkaStreamsConfiguration(
@Parameter String streamName,
KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true);
config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new));
init(applicationConfiguration, environment, config);
}
}

0 comments on commit 16b35a8

Please sign in to comment.