Skip to content

Commit

Permalink
Calculate average brake temperature
Browse files Browse the repository at this point in the history
  • Loading branch information
Sabby Anandan committed Aug 12, 2020
1 parent 5636d19 commit 2502da4
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;
import java.util.Date;
import java.util.function.Function;

@SpringBootApplication
Expand All @@ -20,24 +23,41 @@ public static void main(String[] args) {
}

@Bean
public Function<KStream<Object, Truck>, KStream<String, Long>> process() {
public Function<KStream<Object, Truck>, KStream<String, AverageBrakeTemperatureAccumulator>> processBrakeTemperature() {

return input -> input
.map((k, v) -> new KeyValue<>(((Truck) v).getId(), v))
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Truck.class)))
// .windowedBy(TimeWindows.of(10000))
.count(Materialized.as("foo"))
.toStream();

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.aggregate(() -> new AverageBrakeTemperatureAccumulator(0, 0F),
(k, v, agg) -> {
agg.setCount(agg.getCount() + 1);
agg.setTotalValue(v.getBrakeTemperature());
agg.setAverage(agg.getTotalValue() / agg.getCount());
return agg;
},
Materialized.with(Serdes.String(), new JsonSerde<>(AverageBrakeTemperatureAccumulator.class)))
.toStream()
.map((k, v) -> {
v.setStart(new Date(k.window().start()));
v.setEnd(new Date(k.window().end()));
return new KeyValue<>(k.key(), v);
});
}

class Accumulator {
class AverageBrakeTemperatureAccumulator {

private int count;

private int totalValue;
private Float totalValue;

private double average;

private Date start;

private Date end;

public Accumulator(int count, int totalValue) {
public AverageBrakeTemperatureAccumulator(int count, Float totalValue) {
this.count = count;
this.totalValue = totalValue;
}
Expand All @@ -50,12 +70,36 @@ public void setCount(int count) {
this.count = count;
}

public int getTotalValue() {
public Float getTotalValue() {
return totalValue;
}

public void setTotalValue(int totalValue) {
public void setTotalValue(Float totalValue) {
this.totalValue = totalValue;
}

public double getAverage() {
return average;
}

public void setAverage(double average) {
this.average = average;
}

public Date getStart() {
return start;
}

public void setStart(Date start) {
this.start = start;
}

public Date getEnd() {
return end;
}

public void setEnd(Date end) {
this.end = end;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public static void main(String[] args) {

@Bean
@Scheduled(fixedRate = 1000L)
public Supplier<Truck> supplier() {
return () -> randomCar();
public Supplier<Truck> generateTruck() {
return () -> randomTruck();
}

private Truck randomCar() {
private Truck randomTruck() {
Truck truck = new Truck();
truck.setId(VIN_LIST.get(random.nextInt(VIN_LIST.size())));
truck.setAcceleration(random.nextFloat() * 10);
Expand Down

0 comments on commit 2502da4

Please sign in to comment.