diff --git a/brake-temperature/src/main/java/com/springone/braketemperature/BrakeTemperatureApplication.java b/brake-temperature/src/main/java/com/springone/braketemperature/BrakeTemperatureApplication.java index bd8898b..a6381e4 100644 --- a/brake-temperature/src/main/java/com/springone/braketemperature/BrakeTemperatureApplication.java +++ b/brake-temperature/src/main/java/com/springone/braketemperature/BrakeTemperatureApplication.java @@ -5,11 +5,14 @@ import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindows; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.support.serializer.JsonSerde; +import java.time.Duration; +import java.util.Date; import java.util.function.Function; @SpringBootApplication @@ -20,24 +23,41 @@ public static void main(String[] args) { } @Bean - public Function, KStream> process() { + public Function, KStream> processBrakeTemperature() { return input -> input - .map((k, v) -> new KeyValue<>(((Truck) v).getId(), v)) + .map((k, v) -> new KeyValue<>(v.getId(), v)) .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Truck.class))) - // .windowedBy(TimeWindows.of(10000)) - .count(Materialized.as("foo")) - .toStream(); - + .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) + .aggregate(() -> new AverageBrakeTemperatureAccumulator(0, 0F), + (k, v, agg) -> { + agg.setCount(agg.getCount() + 1); + agg.setTotalValue(v.getBrakeTemperature()); + agg.setAverage(agg.getTotalValue() / agg.getCount()); + return agg; + }, + Materialized.with(Serdes.String(), new JsonSerde<>(AverageBrakeTemperatureAccumulator.class))) + .toStream() + .map((k, v) -> { + v.setStart(new Date(k.window().start())); + v.setEnd(new Date(k.window().end())); + return new KeyValue<>(k.key(), v); + }); } - class Accumulator { + class AverageBrakeTemperatureAccumulator { private int count; - private int totalValue; + private Float totalValue; + + private double average; + + private Date start; + + private Date end; - public Accumulator(int count, int totalValue) { + public AverageBrakeTemperatureAccumulator(int count, Float totalValue) { this.count = count; this.totalValue = totalValue; } @@ -50,12 +70,36 @@ public void setCount(int count) { this.count = count; } - public int getTotalValue() { + public Float getTotalValue() { return totalValue; } - public void setTotalValue(int totalValue) { + public void setTotalValue(Float totalValue) { this.totalValue = totalValue; } + + public double getAverage() { + return average; + } + + public void setAverage(double average) { + this.average = average; + } + + public Date getStart() { + return start; + } + + public void setStart(Date start) { + this.start = start; + } + + public Date getEnd() { + return end; + } + + public void setEnd(Date end) { + this.end = end; + } } } diff --git a/trucks/src/main/java/com/springone/trucks/TrucksApplication.java b/trucks/src/main/java/com/springone/trucks/TrucksApplication.java index 5ef9031..d5e25f6 100644 --- a/trucks/src/main/java/com/springone/trucks/TrucksApplication.java +++ b/trucks/src/main/java/com/springone/trucks/TrucksApplication.java @@ -33,11 +33,11 @@ public static void main(String[] args) { @Bean @Scheduled(fixedRate = 1000L) - public Supplier supplier() { - return () -> randomCar(); + public Supplier generateTruck() { + return () -> randomTruck(); } - private Truck randomCar() { + private Truck randomTruck() { Truck truck = new Truck(); truck.setId(VIN_LIST.get(random.nextInt(VIN_LIST.size()))); truck.setAcceleration(random.nextFloat() * 10);