Skip to content

Commit

Permalink
Merge pull request #79 from Netflix/configurable_concurrency_groupsca…
Browse files Browse the repository at this point in the history
…lar_stage

Configurable concurrency in Group-Scalar stage
  • Loading branch information
neerajrj authored Dec 1, 2020
2 parents 4b77379 + ce08380 commit a24f534
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class AggregationStage implements GroupToScalarComputation<String, Reques

private Observable<? extends RequestAggregation> aggregate(GroupedObservable<String, MantisGroup<String, RequestEvent>> go) {
return go.reduce(RequestAggregation.builder().build(), (accumulator, value) -> {
log.info("aggregating " + go.getKey() + " on Thread " + Thread.currentThread().getName());
accumulator.setCount(accumulator.getCount() + value.getValue().getLatency());
accumulator.setPath(go.getKey());
return accumulator;
Expand All @@ -72,6 +73,9 @@ private Observable<? extends RequestAggregation> aggregate(GroupedObservable<Str
@Override
public Observable<RequestAggregation> call(Context context, Observable<MantisGroup<String, RequestEvent>> mantisGroupO) {
return mantisGroupO
.doOnNext((mg) -> {
log.info("Received " + mg.getKeyValue() + " on Thread " + Thread.currentThread().getName());
})
.window(aggregationDurationMsec, TimeUnit.MILLISECONDS)
.flatMap((omg) -> omg.groupBy(MantisGroup::getKeyValue)
.flatMap(// .map((count) -> RequestAggregation.builder().count(count).path(go.getKey()).build())
Expand All @@ -98,6 +102,7 @@ public static GroupToScalar.Config<String, RequestEvent, RequestAggregation> con
return new GroupToScalar.Config<String, RequestEvent,RequestAggregation>()
.description("sum events for a path")
.codec(RequestAggregation.requestAggregationCodec())
.concurrentInput()
.withParameters(getParameters());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,44 @@ private static <K, T, R> Observable<Observable<R>> executeMantisGroups(Observabl
*/
@SuppressWarnings("unchecked")
private static <K, T, R> Observable<Observable<R>> executeMantisGroupsInParallel(Observable<Observable<MantisGroup<K, T>>> go, Computation computation,
final Context context, final boolean applyTimeoutToInners, final long timeout) {
final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) {
logger.info("initializing {}", computation.getClass().getCanonicalName());
computation.init(context);

// from groups to observable
final Func2<Context, Observable<MantisGroup<K, T>>, Observable<R>> c
= (Func2<Context, Observable<MantisGroup<K, T>>, Observable<R>>) computation;

return
go
.lift(new MonitorOperator<>("worker_stage_outer"))
.map(observable -> c
.call(context, observable
.subscribeOn(Schedulers.computation())
.lift(new MonitorOperator<MantisGroup<K, T>>("worker_stage_inner_input")))
.lift(new MonitorOperator<R>("worker_stage_inner_output")));
if(concurrency == StageConfig.DEFAULT_STAGE_CONCURRENCY) {
return
go
.lift(new MonitorOperator<>("worker_stage_outer"))
.map(observable -> c
.call(context, observable
.observeOn(Schedulers.computation())
.lift(new MonitorOperator<>("worker_stage_inner_input")))
.lift(new MonitorOperator<>("worker_stage_inner_output")));

} else {

final SingleThreadScheduler[] singleThreadSchedulers = new SingleThreadScheduler[concurrency];
RxThreadFactory rxThreadFactory = new RxThreadFactory("MantisSingleThreadScheduler-");
logger.info("creating {} Mantis threads", concurrency);
for (int i = 0; i < concurrency; i++) {
singleThreadSchedulers[i] = new SingleThreadScheduler(rxThreadFactory);
}

return
go
.lift(new MonitorOperator<>("worker_stage_outer"))
.map(observable -> observable
.groupBy(e -> Math.abs(e.getKeyValue().hashCode()) % concurrency)
.flatMap(gbo -> c
.call(context, gbo
.observeOn(singleThreadSchedulers[gbo.getKey().intValue()])
.lift(new MonitorOperator<MantisGroup<K, T>>("worker_stage_inner_input")))
.lift(new MonitorOperator<R>("worker_stage_inner_output"))));
}
}

/**
Expand Down Expand Up @@ -279,10 +301,10 @@ private static <T, R> Observable<Observable<R>> executeInnersInParallel(Observab
.groupBy(e -> System.nanoTime() % concurrency)
.flatMap(go ->
c
.call(context, go
.observeOn(singleThreadSchedulers[go.getKey().intValue()])
.lift(new MonitorOperator<>("worker_stage_inner_input")))
.lift(new MonitorOperator<>("worker_stage_inner_output"))));
.call(context, go
.observeOn(singleThreadSchedulers[go.getKey().intValue()])
.lift(new MonitorOperator<>("worker_stage_inner_input")))
.lift(new MonitorOperator<>("worker_stage_inner_output"))));
}
}

Expand Down Expand Up @@ -419,7 +441,7 @@ private static <K, T, R> Observable<Observable<R>> setupGroupToScalarStage(Group

if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) {
logger.info("Execute Groups in PARALLEL!!!!");
return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds());
return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(stage.getConcurrency()));
} else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) {

Observable<Observable<MantisGroup<K, T>>> merged = Observable.just(Observable.merge(source));
Expand Down

0 comments on commit a24f534

Please sign in to comment.