From ce0838036b3db753b9cb24475dc2bda63190aec4 Mon Sep 17 00:00:00 2001 From: Neeraj Joshi Date: Tue, 1 Dec 2020 11:10:33 -0800 Subject: [PATCH] - allow concurrency to be configured for group to scalar stages - add log statements in groupby sample --- .../samples/stage/AggregationStage.java | 5 ++ .../runtime/executor/StageExecutors.java | 50 +++++++++++++------ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/mantis-examples/mantis-examples-groupby-sample/src/main/java/com/netflix/mantis/samples/stage/AggregationStage.java b/mantis-examples/mantis-examples-groupby-sample/src/main/java/com/netflix/mantis/samples/stage/AggregationStage.java index b3bc542c4..7fe83f555 100644 --- a/mantis-examples/mantis-examples-groupby-sample/src/main/java/com/netflix/mantis/samples/stage/AggregationStage.java +++ b/mantis-examples/mantis-examples-groupby-sample/src/main/java/com/netflix/mantis/samples/stage/AggregationStage.java @@ -52,6 +52,7 @@ public class AggregationStage implements GroupToScalarComputation aggregate(GroupedObservable> go) { return go.reduce(RequestAggregation.builder().build(), (accumulator, value) -> { + log.info("aggregating " + go.getKey() + " on Thread " + Thread.currentThread().getName()); accumulator.setCount(accumulator.getCount() + value.getValue().getLatency()); accumulator.setPath(go.getKey()); return accumulator; @@ -72,6 +73,9 @@ private Observable aggregate(GroupedObservable call(Context context, Observable> mantisGroupO) { return mantisGroupO + .doOnNext((mg) -> { + log.info("Received " + mg.getKeyValue() + " on Thread " + Thread.currentThread().getName()); + }) .window(aggregationDurationMsec, TimeUnit.MILLISECONDS) .flatMap((omg) -> omg.groupBy(MantisGroup::getKeyValue) .flatMap(// .map((count) -> RequestAggregation.builder().count(count).path(go.getKey()).build()) @@ -98,6 +102,7 @@ public static GroupToScalar.Config con return new GroupToScalar.Config() .description("sum events for a path") .codec(RequestAggregation.requestAggregationCodec()) + .concurrentInput() .withParameters(getParameters()); } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java index 0571b0d48..f8d7ff337 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java @@ -199,7 +199,7 @@ private static Observable> executeMantisGroups(Observabl */ @SuppressWarnings("unchecked") private static Observable> executeMantisGroupsInParallel(Observable>> go, Computation computation, - final Context context, final boolean applyTimeoutToInners, final long timeout) { + final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) { logger.info("initializing {}", computation.getClass().getCanonicalName()); computation.init(context); @@ -207,14 +207,36 @@ private static Observable> executeMantisGroupsInParallel final Func2>, Observable> c = (Func2>, Observable>) computation; - return - go - .lift(new MonitorOperator<>("worker_stage_outer")) - .map(observable -> c - .call(context, observable - .subscribeOn(Schedulers.computation()) - .lift(new MonitorOperator>("worker_stage_inner_input"))) - .lift(new MonitorOperator("worker_stage_inner_output"))); + if(concurrency == StageConfig.DEFAULT_STAGE_CONCURRENCY) { + return + go + .lift(new MonitorOperator<>("worker_stage_outer")) + .map(observable -> c + .call(context, observable + .observeOn(Schedulers.computation()) + .lift(new MonitorOperator<>("worker_stage_inner_input"))) + .lift(new MonitorOperator<>("worker_stage_inner_output"))); + + } else { + + final SingleThreadScheduler[] singleThreadSchedulers = new SingleThreadScheduler[concurrency]; + RxThreadFactory rxThreadFactory = new RxThreadFactory("MantisSingleThreadScheduler-"); + logger.info("creating {} Mantis threads", concurrency); + for (int i = 0; i < concurrency; i++) { + singleThreadSchedulers[i] = new SingleThreadScheduler(rxThreadFactory); + } + + return + go + .lift(new MonitorOperator<>("worker_stage_outer")) + .map(observable -> observable + .groupBy(e -> Math.abs(e.getKeyValue().hashCode()) % concurrency) + .flatMap(gbo -> c + .call(context, gbo + .observeOn(singleThreadSchedulers[gbo.getKey().intValue()]) + .lift(new MonitorOperator>("worker_stage_inner_input"))) + .lift(new MonitorOperator("worker_stage_inner_output")))); + } } /** @@ -279,10 +301,10 @@ private static Observable> executeInnersInParallel(Observab .groupBy(e -> System.nanoTime() % concurrency) .flatMap(go -> c - .call(context, go - .observeOn(singleThreadSchedulers[go.getKey().intValue()]) - .lift(new MonitorOperator<>("worker_stage_inner_input"))) - .lift(new MonitorOperator<>("worker_stage_inner_output")))); + .call(context, go + .observeOn(singleThreadSchedulers[go.getKey().intValue()]) + .lift(new MonitorOperator<>("worker_stage_inner_input"))) + .lift(new MonitorOperator<>("worker_stage_inner_output")))); } } @@ -419,7 +441,7 @@ private static Observable> setupGroupToScalarStage(Group if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { logger.info("Execute Groups in PARALLEL!!!!"); - return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds()); + return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(stage.getConcurrency())); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable>> merged = Observable.just(Observable.merge(source));