From 022b7e6d7bec24592293cb4c13730dfc470ac9c4 Mon Sep 17 00:00:00 2001 From: calvin681 Date: Fri, 11 Dec 2020 10:33:09 -0800 Subject: [PATCH] Subscribe to source job worker metrics in job master (#82) * Add mantis publish properties to docs * Update MQL docs for using [*] * Add myself as code owners * Rule base auto scale should not scale down lower than min * add comment * Subscribe to source job drop metrics in job master * add copy right, fix tests * Address review comments. Add parameters to system params. Co-authored-by: Calvin Cheung --- .../descriptor/StageScalingPolicy.java | 3 +- .../connector/job/source/JobSource.java | 15 ++ .../mantis/network/push/PushServerSse.java | 13 +- mantis-runtime/build.gradle | 3 +- .../runtime/parameter/ParameterUtils.java | 29 +++ .../runtime/parameter/SinkParameter.java | 4 +- .../parameter/SourceJobParameters.java | 228 ++++++++++++++++++ .../runtime/parameter/SinkParameterTest.java | 41 +--- .../parameter/SourceJobParametersTest.java | 137 +++++++++++ .../jobmaster/AutoScaleMetricsConfig.java | 72 ++++++ .../worker/jobmaster/JobMasterService.java | 51 +++- .../SourceJobWorkerMetricsSubscription.java | 80 ++++++ .../worker/jobmaster/WorkerMetricHandler.java | 63 ++++- .../jobmaster/AutoScaleMetricsConfigTest.java | 88 +++++++ ...ourceJobWorkerMetricsSubscriptionTest.java | 89 +++++++ .../jobmaster/WorkerMetricHandlerTest.java | 76 ++++++ 16 files changed, 941 insertions(+), 51 deletions(-) create mode 100644 mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SourceJobParameters.java create mode 100644 mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SourceJobParametersTest.java create mode 100644 mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java create mode 100644 mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java create mode 100644 mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscriptionTest.java diff --git a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java index 704a2f439..fa2842eb0 100644 --- a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java +++ b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java @@ -179,7 +179,8 @@ public enum ScalingReason { Clutch, ClutchExperimental, RPS, - JVMMemory + JVMMemory, + SourceJobDrop } public static class RollingCount { diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java b/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java index 008747f36..3d45110da 100644 --- a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java +++ b/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java @@ -130,6 +130,10 @@ public Observable> call(Context context, Index return sourceObs; } + /** + * Use {@link io.mantisrx.runtime.parameter.SourceJobParameters.TargetInfo} instead. + */ + @Deprecated public static class TargetInfo { public String sourceJobName; @@ -163,6 +167,10 @@ protected static List parseInputParameters(Context ctx) { return parseTargetInfo(targetListStr); } + /** + * Use {@link io.mantisrx.runtime.parameter.SourceJobParameters#parseTargetInfo(String)} instead. + */ + @Deprecated protected static List parseTargetInfo(String targetListStr) { List targetList = new ArrayList(); JsonObject requestObj = (JsonObject) parser.parse(targetListStr); @@ -211,6 +219,10 @@ protected static List parseTargetInfo(String targetListStr) { return targetList; } + /** + * Use {@link io.mantisrx.runtime.parameter.SourceJobParameters.TargetInfoBuilder} instead. + */ + @Deprecated public static class TargetInfoBuilder { private String sourceJobName; @@ -273,6 +285,8 @@ public TargetInfo build() { } /** + * Use {@link io.mantisrx.runtime.parameter.SourceJobParameters#enforceClientIdConsistency(List, String)} instead. + * * Ensures that a list of TargetInfo contains a sane set of sourceJobName, ClientId pairs. * TODO: Currently mutates the list, which isn't problematic here, but it would be prudent to clean this up. * @@ -280,6 +294,7 @@ public TargetInfo build() { * * @return The original List modified to have consistent clientIds. */ + @Deprecated public static List enforceClientIdConsistency(List targets, String defaultClientId) { targets.sort(Comparator.comparing(t -> t.criterion)); diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServerSse.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServerSse.java index 13a1e8df2..173372548 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServerSse.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServerSse.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import io.mantisrx.common.compression.CompressionUtils; import io.mantisrx.mql.shaded.clojure.java.api.Clojure; import io.mantisrx.mql.shaded.clojure.lang.IFn; import com.netflix.spectator.api.BasicTag; @@ -58,8 +59,12 @@ public class PushServerSse extends PushServer { private static final Logger logger = LoggerFactory.getLogger(PushServerSse.class); + public static final String PUSH_SERVER_METRIC_GROUP_NAME = "PushServerSse"; + public static final String PUSH_SERVER_LEGACY_METRIC_GROUP_NAME = "ServerSentEventRequestHandler"; public static final String PROCESSED_COUNTER_METRIC_NAME = "processedCounter"; public static final String DROPPED_COUNTER_METRIC_NAME = "droppedCounter"; + public static final String CLIENT_ID_TAG_NAME = "clientId"; + public static final String SOCK_ADDR_TAG_NAME = "sockAddr"; private static IFn require = Clojure.var("io.mantisrx.mql.shaded.clojure.core", "require"); @@ -97,10 +102,10 @@ public PushServerSse(PushTrigger trigger, ServerConfig config, } private Metrics registerSseMetrics(String uniqueClientId, String socketAddrStr) { - final BasicTag clientIdTag = new BasicTag("clientId", Optional.ofNullable(uniqueClientId).orElse("none")); - final BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none")); + final BasicTag clientIdTag = new BasicTag(CLIENT_ID_TAG_NAME, Optional.ofNullable(uniqueClientId).orElse("none")); + final BasicTag sockAddrTag = new BasicTag(SOCK_ADDR_TAG_NAME, Optional.ofNullable(socketAddrStr).orElse("none")); - final String metricGroup = supportLegacyMetrics ? "ServerSentEventRequestHandler" : "PushServerSse"; + final String metricGroup = supportLegacyMetrics ? PUSH_SERVER_LEGACY_METRIC_GROUP_NAME : PUSH_SERVER_METRIC_GROUP_NAME; Metrics sseSinkMetrics = new Metrics.Builder() .id(metricGroup, clientIdTag, sockAddrTag) .addCounter(PROCESSED_COUNTER_METRIC_NAME) @@ -156,7 +161,7 @@ public Observable handle( predicateFunction = predicate.call(queryParameters); } - byte[] delimiter = "$$$".getBytes(); + byte[] delimiter = CompressionUtils.MANTIS_SSE_DELIMITER_BINARY; if (queryParameters != null && !queryParameters.isEmpty()) { diff --git a/mantis-runtime/build.gradle b/mantis-runtime/build.gradle index dd039cb2c..0fb40d3fd 100644 --- a/mantis-runtime/build.gradle +++ b/mantis-runtime/build.gradle @@ -15,7 +15,8 @@ */ test { - exclude 'io/mantisrx/runtime/**' // exclude remote tests that require ports + exclude 'io/mantisrx/runtime/executor/**' // exclude remote tests that require ports + exclude 'io/mantisrx/runtime/source/**' // exclude remote tests that require ports } dependencies { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java index 9dc84e815..1b5659385 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java @@ -42,6 +42,9 @@ public class ParameterUtils { public static final String JOB_MASTER_AUTOSCALE_CONFIG_SYSTEM_PARAM = "mantis.jobmaster.autoscale.adaptive.config"; public static final String JOB_MASTER_CLUTCH_SYSTEM_PARAM = "mantis.jobmaster.clutch.config"; public static final String JOB_MASTER_CLUTCH_EXPERIMENTAL_PARAM = "mantis.jobmaster.clutch.experimental.enabled"; + public static final String JOB_MASTER_AUTOSCALE_SOURCEJOB_METRIC_PARAM = "mantis.jobmaster.autoscale.sourcejob.metric.enabled"; + public static final String JOB_MASTER_AUTOSCALE_SOURCEJOB_TARGET_PARAM = "mantis.jobmaster.autoscale.sourcejob.target"; + public static final String JOB_MASTER_AUTOSCALE_SOURCEJOB_DROP_METRIC_PATTERNS_PARAM = "mantis.jobmaster.autoscale.sourcejob.dropMetricPatterns"; public static final String PER_STAGE_JVM_OPTS_FORMAT = "MANTIS_WORKER_JVM_OPTS_STAGE%d"; public static final String STAGE_CONCURRENCY = "mantis.stageConcurrency"; public static final int MAX_NUM_STAGES_FOR_JVM_OPTS_OVERRIDE = 5; @@ -212,6 +215,32 @@ public class ParameterUtils { .description("Delimiter for separating SSE data before compression") .build(); systemParams.put(compressionDelimiter.getName(), compressionDelimiter); + + ParameterDefinition autoscaleSourceJobMetricEnabled = new BooleanParameter() + .name(JOB_MASTER_AUTOSCALE_SOURCEJOB_METRIC_PARAM) + .validator(Validators.alwaysPass()) + .defaultValue(false) + .description("Enable source job drop metrics to be used for autoscaling the 1st stage") + .build(); + systemParams.put(autoscaleSourceJobMetricEnabled.getName(), autoscaleSourceJobMetricEnabled); + + ParameterDefinition autoscaleSourceJobTarget = new StringParameter() + .name(JOB_MASTER_AUTOSCALE_SOURCEJOB_TARGET_PARAM) + .validator(Validators.alwaysPass()) + .defaultValue("{}") + .description("Json config to specify source job targets for autoscale metrics. This param is not needed if the 'target' param is already present. Example: {\"targets\": [{\"sourceJobName\":, \"clientId\":}]}") + .build(); + systemParams.put(autoscaleSourceJobTarget.getName(), autoscaleSourceJobTarget); + + ParameterDefinition autoscaleSourceJobDropMetricPattern = new StringParameter() + .name(JOB_MASTER_AUTOSCALE_SOURCEJOB_DROP_METRIC_PATTERNS_PARAM) + .validator(Validators.alwaysPass()) + .defaultValue("") + .description("Additional metrics pattern for source job drops. Comma separated list, supports dynamic client ID by using '_CLIENT_ID_' as a token. " + + "Each metric should be expressed in the same format as '" + JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM + "'. " + + "Example: PushServerSse:clientId=_CLIENT_ID_:*::droppedCounter::MAX,ServerSentEventRequestHandler:clientId=_CLIENT_ID_:*::droppedCounter::MAX") + .build(); + systemParams.put(autoscaleSourceJobDropMetricPattern.getName(), autoscaleSourceJobDropMetricPattern); } private ParameterUtils() { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SinkParameter.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SinkParameter.java index c8826fd1a..f0a72b3e5 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SinkParameter.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SinkParameter.java @@ -40,9 +40,9 @@ public class SinkParameter { */ public SinkParameter(String name, String value) throws UnsupportedEncodingException { this.name = name; - this.value = value; + this.value = value == null ? "" : value; - encodedValue = URLEncoder.encode(value, "UTF-8"); + encodedValue = URLEncoder.encode(this.value, "UTF-8"); } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SourceJobParameters.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SourceJobParameters.java new file mode 100644 index 000000000..85fce3020 --- /dev/null +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/SourceJobParameters.java @@ -0,0 +1,228 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.runtime.parameter; + +import com.mantisrx.common.utils.MantisSSEConstants; +import io.mantisrx.runtime.Context; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty; +import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference; +import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class SourceJobParameters { + public static final String MANTIS_SOURCEJOB_TARGET_KEY = "target"; + public static final String MANTIS_SOURCEJOB_NAME_PARAM = "sourceJobName"; + public static final String MANTIS_SOURCEJOB_CRITERION = "criterion"; + public static final String MANTIS_SOURCEJOB_CLIENT_ID = "clientId"; + public static final String MANTIS_SOURCEJOB_IS_BROADCAST_MODE = "isBroadcastMode"; + private static final Logger log = LoggerFactory.getLogger(SourceJobParameters.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + + public static List parseInputParameters(Context ctx) { + String targetListStr = (String) ctx.getParameters() + .get(MANTIS_SOURCEJOB_TARGET_KEY, "{}"); + return parseTargetInfo(targetListStr); + } + + public static List parseTargetInfo(String targetListStr) { + List targetList = new ArrayList(); + + try { + Map> targets = mapper.readValue(targetListStr, new TypeReference>>() {}); + if (targets.get("targets") != null) { + return targets.get("targets"); + } + } catch (Exception ex) { + log.error("Failed to parse target list: {}", targetListStr, ex); + } + + return targetList; + } + + @JsonIgnoreProperties(ignoreUnknown = true) + public static class TargetInfo { + + @JsonProperty(MANTIS_SOURCEJOB_NAME_PARAM) public String sourceJobName; + @JsonProperty(MANTIS_SOURCEJOB_CRITERION) public String criterion; + @JsonProperty(MANTIS_SOURCEJOB_CLIENT_ID) public String clientId; + @JsonProperty(MantisSSEConstants.SAMPLE) public int samplePerSec = -1; + @JsonProperty(MANTIS_SOURCEJOB_IS_BROADCAST_MODE) public boolean isBroadcastMode; + @JsonProperty(MantisSSEConstants.ENABLE_META_MESSAGES) public boolean enableMetaMessages; + @JsonProperty(MantisSSEConstants.MANTIS_ENABLE_COMPRESSION) public boolean enableCompressedBinary; + @JsonProperty(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER) public String delimiter; + + public TargetInfo(String jobName, + String criterion, + String clientId, + int samplePerSec, + boolean isBroadcastMode, + boolean enableMetaMessages, + boolean enableCompressedBinary, + String delimiter) { + this.sourceJobName = jobName; + this.criterion = criterion; + this.clientId = clientId; + this.samplePerSec = samplePerSec; + this.isBroadcastMode = isBroadcastMode; + this.enableMetaMessages = enableMetaMessages; + this.enableCompressedBinary = enableCompressedBinary; + this.delimiter = delimiter; + } + + public TargetInfo() { + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TargetInfo that = (TargetInfo) o; + return Objects.equals(sourceJobName, that.sourceJobName) && + Objects.equals(criterion, that.criterion) && + Objects.equals(clientId, that.clientId) && + Objects.equals(samplePerSec, that.samplePerSec) && + Objects.equals(isBroadcastMode, that.isBroadcastMode) && + Objects.equals(enableMetaMessages, that.enableMetaMessages) && + Objects.equals(enableCompressedBinary, that.enableCompressedBinary) && + Objects.equals(delimiter, that.delimiter); + } + + @Override + public int hashCode() { + return Objects.hash(sourceJobName, criterion, clientId, samplePerSec, isBroadcastMode, enableMetaMessages, enableCompressedBinary, delimiter); + } + + @Override + public String toString() { + return "TargetInfo{" + + "sourceJobName=" + sourceJobName + "," + + "criterion=" + criterion + "," + + "clientId=" + clientId + "," + + "samplePerSec=" + samplePerSec + "," + + "isBroadcastMode=" + isBroadcastMode + "," + + "enableMetaMessages=" + enableMetaMessages + "," + + "enableCompressedBinary=" + enableCompressedBinary + "," + + "delimiter=" + delimiter + "," + + "}"; + } + } + + public static class TargetInfoBuilder { + + private String sourceJobName; + private String criterion; + private String clientId; + private int samplePerSec = -1; + private boolean isBroadcastMode = false; + private boolean enableMetaMessages = false; + private boolean enableCompressedBinary = false; + private String delimiter = null; + + public TargetInfoBuilder() { + } + + public TargetInfoBuilder withSourceJobName(String srcJobName) { + this.sourceJobName = srcJobName; + return this; + } + + public TargetInfoBuilder withQuery(String query) { + this.criterion = query; + return this; + } + + public TargetInfoBuilder withSamplePerSec(int samplePerSec) { + this.samplePerSec = samplePerSec; + return this; + } + + public TargetInfoBuilder withBroadCastMode() { + this.isBroadcastMode = true; + return this; + } + + public TargetInfoBuilder withMetaMessagesEnabled() { + this.enableMetaMessages = true; + return this; + } + + + public TargetInfoBuilder withBinaryCompressionEnabled() { + this.enableCompressedBinary = true; + return this; + } + + public TargetInfoBuilder withClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public TargetInfoBuilder withDelimiter(String delimiter) { + this.delimiter = delimiter; + return this; + } + + public TargetInfo build() { + return new TargetInfo( + sourceJobName, + criterion, + clientId, + samplePerSec, + isBroadcastMode, + enableMetaMessages, + enableCompressedBinary, + delimiter); + } + } + + /** + * Ensures that a list of TargetInfo contains a sane set of sourceJobName, ClientId pairs. + * TODO: Currently mutates the list, which isn't problematic here, but it would be prudent to clean this up. + * + * @param targets A List of TargetInfo for which to validate and correct clientId inconsistencies. + * + * @return The original List modified to have consistent clientIds. + */ + public static List enforceClientIdConsistency(List targets, String defaultClientId) { + + targets.sort(Comparator.comparing(t -> t.criterion)); + HashSet> connectionPairs = new HashSet<>(targets.size()); + + for (TargetInfo target : targets) { + if (target.clientId == null) { + target.clientId = defaultClientId; + } + + Map.Entry connectionPair = new AbstractMap.SimpleEntry(target.sourceJobName, target.clientId); + int attempts = 0; + + while (connectionPairs.contains(connectionPair)) { + connectionPair = new AbstractMap.SimpleEntry(target.sourceJobName, target.clientId + "_" + ++attempts); + } + + target.clientId = connectionPair.getValue(); + connectionPairs.add(connectionPair); + } + + return targets; + } +} diff --git a/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SinkParameterTest.java b/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SinkParameterTest.java index 283b28e8a..1f87d3648 100644 --- a/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SinkParameterTest.java +++ b/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SinkParameterTest.java @@ -27,56 +27,33 @@ public class SinkParameterTest { @Test - public void testGenerateURI() { + public void testGenerateURI() throws Exception { SinkParameters sps; - try { - sps = new SinkParameters.Builder().withParameter("p1", "v1").withParameter("p2", "v2").withParameter("p3", "v3").build(); - assertEquals("?p1=v1&p2=v2&p3=v3", sps.toString()); - } catch (UnsupportedEncodingException e) { - - e.printStackTrace(); - fail(); - } - + sps = new SinkParameters.Builder().withParameter("p1", "v1").withParameter("p2", "v2").withParameter("p3", "v3").build(); + assertEquals("?p1=v1&p2=v2&p3=v3", sps.toString()); } @Test - public void testGenerateURI2() { + public void testGenerateURI2() throws Exception { SinkParameters sps; - try { - sps = new SinkParameters.Builder().withParameter("p1", "v1").withParameter("p2", null).withParameter("p3", "v3").build(); - assertEquals("?p1=v1&p2=&p3=v3", sps.toString()); - } catch (UnsupportedEncodingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - fail(); - } - + sps = new SinkParameters.Builder().withParameter("p1", "v1").withParameter("p2", null).withParameter("p3", "v3").build(); + assertEquals("?p1=v1&p2=&p3=v3", sps.toString()); } @Test - public void testGenerateURI3() { + public void testGenerateURI3() throws Exception { SinkParameters sps; - try { - sps = new SinkParameters.Builder().withParameter("p1", "select esn, country where e[\"response.header.x-netflix.api-script-endpoint\"]==\"/account/geo\"").build(); - assertEquals("?p1=v1&p2=&p3=v3", sps.toString()); - } catch (UnsupportedEncodingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - fail(); - } - + sps = new SinkParameters.Builder().withParameter("p1", "select esn, country where e[\"response.header.x-netflix.api-script-endpoint\"]==\"/account/geo\"").build(); + assertEquals("?p1=select+esn%2C+country+where+e%5B%22response.header.x-netflix.api-script-endpoint%22%5D%3D%3D%22%2Faccount%2Fgeo%22", sps.toString()); } @Test public void testGenerateURI4() { - SinkParameters sps = new SinkParameters.Builder().build(); assertEquals("", sps.toString()); - } diff --git a/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SourceJobParametersTest.java b/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SourceJobParametersTest.java new file mode 100644 index 000000000..91e82aaa0 --- /dev/null +++ b/mantis-runtime/src/test/java/io/mantisrx/runtime/parameter/SourceJobParametersTest.java @@ -0,0 +1,137 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.runtime.parameter; + +import io.mantisrx.shaded.com.google.common.collect.ImmutableList; +import io.mantisrx.shaded.com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class SourceJobParametersTest { + @Test + public void shouldParseTargetInfoJson() { + String json = "{\"targets\":[" + + "{" + + " \"sourceJobName\":\"TestSource\"," + + " \"criterion\":\"select * from stream\"," + + " \"unknownProperty\":\"value\"" + + "}," + + "{" + + " \"sourceJobName\":\"TestSource2\"," + + " \"criterion\":\"select * from stream2\"," + + " \"clientId\":\"TestClientId2\"," + + " \"sample\":10," + + " \"isBroadcastMode\":true," + + " \"enableMetaMessages\":true," + + " \"mantis.EnableCompressedBinary\":true," + + " \"enableMetaMessages\":true," + + " \"mantis.CompressionDelimiter\":\"Delimiter2\"" + + "}" + + "]}"; + List infos = SourceJobParameters.parseTargetInfo(json); + List expected = ImmutableList.of( + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource2").withQuery("select * from stream2") + .withClientId("TestClientId2").withSamplePerSec(10).withBroadCastMode().withMetaMessagesEnabled().withBinaryCompressionEnabled().withDelimiter("Delimiter2").build() + ); + assertEquals(expected, infos); + } + + @Test + public void shouldParseEmptyJson() { + String json = "{}"; + List infos = SourceJobParameters.parseTargetInfo(json); + assertEquals(0, infos.size()); + + json = "invalid_json"; + infos = SourceJobParameters.parseTargetInfo(json); + assertEquals(0, infos.size()); + } + + @Test + public void shouldInsertDefaultClientIdIfNoneIsPresent() { + + SourceJobParameters.TargetInfo target = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").build(); + + List result = SourceJobParameters + .enforceClientIdConsistency(Collections.singletonList(target), "defaultId"); + + SourceJobParameters.TargetInfo firstResult = result.get(0); + assertEquals(firstResult.clientId, "defaultId"); + } + + @Test + public void shouldNotChangeSingleSourceWithClientId() { + + SourceJobParameters.TargetInfo target = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").withClientId("myClient").build(); + + List result = SourceJobParameters + .enforceClientIdConsistency(Collections.singletonList(target), "defaultId"); + + SourceJobParameters.TargetInfo firstResult = result.get(0); + assertEquals(firstResult.clientId, "myClient"); + } + + @Test + public void shouldChangeSecondTargetId() { + + SourceJobParameters.TargetInfo target = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").withClientId("myClient").build(); + + SourceJobParameters.TargetInfo target2 = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").withClientId("myClient").build(); + + List result = SourceJobParameters + .enforceClientIdConsistency(Lists.newArrayList(target, target2), "defaultId"); + + assertEquals("myClient", result.get(0).clientId); + assertEquals("myClient_1", result.get(1).clientId); + } + + @Test + public void shouldChangeSecondTargetIdWithDefaults() { + + SourceJobParameters.TargetInfo target = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").build(); + + SourceJobParameters.TargetInfo target2 = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").build(); + + List result = SourceJobParameters + .enforceClientIdConsistency(Lists.newArrayList(target, target2), "defaultId"); + + assertEquals("defaultId", result.get(0).clientId); + assertEquals("defaultId_1", result.get(1).clientId); + } + + @Test + public void shouldNotImpactUnrelatedSource() { + + SourceJobParameters.TargetInfo target = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from stream").withClientId("myClient").build(); + + SourceJobParameters.TargetInfo target2 = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("TestSource").withQuery("select * from streamz").withClientId("myClient").build(); + + SourceJobParameters.TargetInfo target3 = new SourceJobParameters.TargetInfoBuilder().withSourceJobName("UnrelatedSource").withQuery("select * from streamzz").withClientId("myUnrelatedClient").build(); + + List result = SourceJobParameters + .enforceClientIdConsistency(Lists.newArrayList(target, target2, target3), "defaultId"); + + assertEquals("myClient", result.get(0).clientId); + assertEquals("myClient_1", result.get(1).clientId); + assertEquals("myUnrelatedClient", result.get(2).clientId); + } +} diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java index ae86a00de..8fbfbf8ab 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java @@ -17,18 +17,28 @@ package io.mantisrx.server.worker.jobmaster; import static io.mantisrx.server.core.stats.MetricStringConstants.*; +import static io.reactivex.mantis.network.push.PushServerSse.*; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; public class AutoScaleMetricsConfig { + public static final String CLIENT_ID_TOKEN = "_CLIENT_ID_"; + public static final String OUTBOUND_METRIC_GROUP_PATTERN = String.format("%s:%s=%s:*", PUSH_SERVER_METRIC_GROUP_NAME, CLIENT_ID_TAG_NAME, CLIENT_ID_TOKEN); + public static final String OUTBOUND_LEGACY_METRIC_GROUP_PATTERN = String.format("%s:%s=%s:*", PUSH_SERVER_LEGACY_METRIC_GROUP_NAME, CLIENT_ID_TAG_NAME, CLIENT_ID_TOKEN); + private static final AggregationAlgo DEFAULT_ALGO = AggregationAlgo.AVERAGE; // autoscaling metric groups to subscribe to by default private static final Map> defaultAutoScaleMetrics = new HashMap<>(); + private final Map> sourceJobMetrics = new HashMap<>(); + private final Map sourceJobMetricsPatterns = new HashMap<>(); + static { defaultAutoScaleMetrics.put(RESOURCE_USAGE_METRIC_GROUP, new HashMap<>()); defaultAutoScaleMetrics.put(DATA_DROP_METRIC_GROUP, new HashMap<>()); @@ -49,6 +59,13 @@ public AutoScaleMetricsConfig() { public AutoScaleMetricsConfig(final Map> userDefinedAutoScaleMetrics) { this.userDefinedAutoScaleMetrics = userDefinedAutoScaleMetrics; + + final Map defaultOutboundMetric = new HashMap<>(); + defaultOutboundMetric.put(DROPPED_COUNTER_METRIC_NAME, AggregationAlgo.MAX); + sourceJobMetrics.put(OUTBOUND_METRIC_GROUP_PATTERN, defaultOutboundMetric); + sourceJobMetrics.put(OUTBOUND_LEGACY_METRIC_GROUP_PATTERN, defaultOutboundMetric); + sourceJobMetricsPatterns.put(OUTBOUND_METRIC_GROUP_PATTERN, generateSourceJobMetricPattern(OUTBOUND_METRIC_GROUP_PATTERN)); + sourceJobMetricsPatterns.put(OUTBOUND_LEGACY_METRIC_GROUP_PATTERN, generateSourceJobMetricPattern(OUTBOUND_LEGACY_METRIC_GROUP_PATTERN)); } public void addUserDefinedMetric(final String metricGroupName, @@ -58,6 +75,33 @@ public void addUserDefinedMetric(final String metricGroupName, userDefinedAutoScaleMetrics.get(metricGroupName).put(metricName, algo); } + /** + * Add source job drop metric patterns in addition to the default patterns. + * @param metricsStr comma separated list of metrics in the form of metricGroupName::metricName::algo + */ + public void addSourceJobDropMetrics(String metricsStr) { + for (String metric : metricsStr.split(",")) { + metric = metric.trim(); + try { + String[] parts = metric.split("::"); + String metricGroupName = parts[0]; + String metricName = parts[1]; + AggregationAlgo algo = AggregationAlgo.valueOf(parts[2]); + + Map metricGroup = sourceJobMetrics.get(metricGroupName); + if (metricGroup == null) { + metricGroup = new HashMap<>(); + sourceJobMetrics.put(metricGroupName, metricGroup); + sourceJobMetricsPatterns.put(metricGroupName, generateSourceJobMetricPattern(metricGroupName)); + } + metricGroup.put(metricName, algo); + } catch (Exception ex) { + String errMsg = String.format("Invalid format for source job metric: %s", metricsStr); + throw new RuntimeException(errMsg, ex); + } + } + } + public AggregationAlgo getAggregationAlgo(final String metricGroupName, final String metricName) { if (userDefinedAutoScaleMetrics.containsKey(metricGroupName) && userDefinedAutoScaleMetrics.get(metricGroupName).containsKey(metricName)) { return userDefinedAutoScaleMetrics.get(metricGroupName).getOrDefault(metricName, DEFAULT_ALGO); @@ -65,6 +109,11 @@ public AggregationAlgo getAggregationAlgo(final String metricGroupName, final St if (defaultAutoScaleMetrics.containsKey(metricGroupName) && defaultAutoScaleMetrics.get(metricGroupName).containsKey(metricName)) { return defaultAutoScaleMetrics.get(metricGroupName).getOrDefault(metricName, DEFAULT_ALGO); } + for (Map.Entry entry : sourceJobMetricsPatterns.entrySet()) { + if (entry.getValue().matcher(metricGroupName).matches()) { + return sourceJobMetrics.get(entry.getKey()).getOrDefault(metricName, DEFAULT_ALGO); + } + } return DEFAULT_ALGO; } @@ -98,6 +147,29 @@ public Set getMetricGroups() { return getAllMetrics().keySet(); } + public Set generateSourceJobMetricGroups(Set clientIds) { + Set results = new HashSet<>(); + for (String clientId : clientIds) { + for (String metricPattern : sourceJobMetrics.keySet()) { + results.add(metricPattern.replaceAll(CLIENT_ID_TOKEN, clientId)); + } + } + return results; + } + + public boolean isSourceJobDropMetric(String metricGroupName, String metricName) { + for (Map.Entry entry : sourceJobMetricsPatterns.entrySet()) { + if (entry.getValue().matcher(metricGroupName).matches()) { + return sourceJobMetrics.get(entry.getKey()).keySet().contains(metricName); + } + } + return false; + } + + private static Pattern generateSourceJobMetricPattern(String metricGroupName) { + return Pattern.compile(metricGroupName.replace("*", ".*").replaceAll(CLIENT_ID_TOKEN, ".*")); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java index 23010d421..587cc68ca 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java @@ -18,7 +18,10 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; +import io.mantisrx.runtime.parameter.ParameterUtils; +import io.mantisrx.runtime.parameter.SourceJobParameters; import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature; import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; @@ -54,6 +57,7 @@ public class JobMasterService implements Service { private final Action0 observableOnCompleteCallback; private final Action1 observableOnErrorCallback; private final Action0 observableOnTerminateCallback; + private final MantisMasterClientApi masterClientApi; private Subscription subscription = null; @@ -69,6 +73,7 @@ public JobMasterService(final String jobId, this.jobId = jobId; this.workerMetricsClient = workerMetricsClient; this.autoScaleMetricsConfig = autoScaleMetricsConfig; + this.masterClientApi = masterClientApi; this.jobAutoScaler = new JobAutoScaler(jobId, schedInfo, masterClientApi, context); this.metricObserver = new WorkerMetricHandler(jobId, jobAutoScaler.getObserver(), masterClientApi, autoScaleMetricsConfig).initAndGetMetricDataObserver(); this.observableOnCompleteCallback = observableOnCompleteCallback; @@ -83,13 +88,20 @@ private Measurements handleMetricEvent(final String ev) { final String jobId = measurements.getTags().get(MetricStringConstants.MANTIS_JOB_ID); final int workerIdx = Integer.parseInt(measurements.getTags().get(MetricStringConstants.MANTIS_WORKER_INDEX)); - final int stage = Integer.parseInt(measurements.getTags().get(MetricStringConstants.MANTIS_STAGE_NUM)); + int stage = Integer.parseInt(measurements.getTags().get(MetricStringConstants.MANTIS_STAGE_NUM)); final int workerNum = Integer.parseInt(measurements.getTags().get(MetricStringConstants.MANTIS_WORKER_NUM)); - - // logger.info("got data from idx {} num {} stage {}", workerIdx, workerNum, stage); - metricObserver.onNext(new MetricData(jobId, stage, workerIdx, workerNum, - measurements.getName(), (List) measurements.getGauges())); - + List gauges = (List) measurements.getGauges(); + + // Metric is not from current job, it is from the source job + if (jobId != this.jobId) { + // Funnel source job metric into the 1st stage + stage = 1; + if (gauges.isEmpty()) { + gauges = measurements.getCounters().stream().map(counter -> + new GaugeMeasurement(counter.getEvent(), counter.getCount())).collect(Collectors.toList()); + } + } + metricObserver.onNext(new MetricData(jobId, stage, workerIdx, workerNum, measurements.getName(), gauges)); return measurements; } catch (JsonProcessingException e) { @@ -110,7 +122,14 @@ public void start() { final WorkerMetricSubscription workerMetricSubscription = new WorkerMetricSubscription(jobId, workerMetricsClient, autoScaleMetricsConfig.getMetricGroups()); - final Observable> metrics = workerMetricSubscription.getMetricsClient().getResults(); + Observable> metrics = workerMetricSubscription.getMetricsClient().getResults(); + + boolean isSourceJobMetricEnabled = (boolean) context.getParameters().get( + ParameterUtils.JOB_MASTER_AUTOSCALE_SOURCEJOB_METRIC_PARAM, false); + if (isSourceJobMetricEnabled) { + metrics = metrics.mergeWith(getSourceJobMetrics()); + } + subscription = Observable.merge(metrics) .map(event -> handleMetricEvent(event.getEventAsString())) .doOnTerminate(observableOnTerminateCallback) @@ -119,6 +138,24 @@ public void start() { .subscribe(); } + protected Observable> getSourceJobMetrics() { + List targetInfos = SourceJobParameters.parseTargetInfo( + (String) context.getParameters().get(ParameterUtils.JOB_MASTER_AUTOSCALE_SOURCEJOB_TARGET_PARAM, "{}")); + if (targetInfos.isEmpty()) { + targetInfos = SourceJobParameters.parseInputParameters(context); + } + targetInfos = SourceJobParameters.enforceClientIdConsistency(targetInfos, jobId); + + String additionalDropMetricPatterns = + (String) context.getParameters().get(ParameterUtils.JOB_MASTER_AUTOSCALE_SOURCEJOB_DROP_METRIC_PATTERNS_PARAM, ""); + autoScaleMetricsConfig.addSourceJobDropMetrics(additionalDropMetricPatterns); + + SourceJobWorkerMetricsSubscription sourceSub = new SourceJobWorkerMetricsSubscription( + targetInfos, masterClientApi, workerMetricsClient, autoScaleMetricsConfig); + + return sourceSub.getResults(); + } + @Override public void shutdown() { if (subscription != null) { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java new file mode 100644 index 000000000..71780b38c --- /dev/null +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscription.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.server.worker.jobmaster; + +import io.mantisrx.common.MantisServerSentEvent; +import io.mantisrx.runtime.parameter.SourceJobParameters; +import io.mantisrx.server.core.NamedJobInfo; +import io.mantisrx.server.master.client.MantisMasterClientApi; +import io.mantisrx.server.worker.client.WorkerMetricsClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * Manages subscriptions to source job workers. + */ +public class SourceJobWorkerMetricsSubscription { + private static final Logger logger = LoggerFactory.getLogger(SourceJobWorkerMetricsSubscription.class); + + private final List targetInfos; + private final MantisMasterClientApi masterClient; + private final WorkerMetricsClient workerMetricsClient; + private final AutoScaleMetricsConfig metricsConfig; + + public SourceJobWorkerMetricsSubscription(List targetInfos, + MantisMasterClientApi masterClient, + WorkerMetricsClient workerMetricsClient, + AutoScaleMetricsConfig metricsConfig) { + this.targetInfos = targetInfos; + this.masterClient = masterClient; + this.workerMetricsClient = workerMetricsClient; + this.metricsConfig = metricsConfig; + } + + public Observable> getResults() { + return Observable.merge(getSourceJobToClientMap().entrySet().stream().map(entry -> { + String sourceJobName = entry.getKey(); + Set clientIds = entry.getValue(); + Set sourceJobMetrics = metricsConfig.generateSourceJobMetricGroups(clientIds); + return masterClient + .namedJobInfo(sourceJobName) + .map(NamedJobInfo::getJobId) + .flatMap(jobId -> getResultsForJobId(jobId, sourceJobMetrics)); + }).collect(Collectors.toList())); + } + + protected Observable> getResultsForJobId(String jobId, Set sourceJobMetrics) { + return new WorkerMetricSubscription(jobId, workerMetricsClient, sourceJobMetrics).getMetricsClient().getResults(); + } + + protected Map> getSourceJobToClientMap() { + Map> results = new HashMap<>(); + for (SourceJobParameters.TargetInfo info : targetInfos) { + Set clientIds = results.get(info.sourceJobName); + if (clientIds == null) { + clientIds = new HashSet<>(); + results.put(info.sourceJobName, clientIds); + } + clientIds.add(info.clientId); + } + return results; + } +} diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index 1bd6118e7..9c2ce5ca2 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -32,6 +32,8 @@ import io.mantisrx.server.core.WorkerOutlier; import io.mantisrx.server.core.stats.MetricStringConstants; import io.mantisrx.server.master.client.MantisMasterClientApi; +import io.mantisrx.shaded.com.google.common.cache.Cache; +import io.mantisrx.shaded.com.google.common.cache.CacheBuilder; import io.reactivx.mantis.operators.DropOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,6 +111,10 @@ private class StageMetricDataOperator implements Observable.Operator workersMap = new ConcurrentHashMap<>(); + private final ConcurrentMap sourceJobWorkersMap = new ConcurrentHashMap<>(); + private final Cache sourceJobMetricsRecent = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); private final WorkerOutlier workerOutlier; private final Map workerNumberByIndex = new HashMap<>(); @@ -163,12 +169,14 @@ private boolean resubmitOutlierWorkerEnabled() { private void addDataPoint(final MetricData datapoint) { final int workerIndex = datapoint.getWorkerIndex(); + logger.debug("adding data point for worker idx={} data={}", workerIndex, datapoint); - if (!workersMap.containsKey(workerIndex)) { - workersMap.putIfAbsent(workerIndex, new WorkerMetrics(valuesToKeep)); - } WorkerMetrics workerMetrics = workersMap.get(workerIndex); + if (workerMetrics == null) { + workerMetrics = new WorkerMetrics(valuesToKeep); + workersMap.put(workerIndex, workerMetrics); + } final MetricData transformedMetricData = workerMetrics.addDataPoint(datapoint.getMetricGroupName(), datapoint); if (transformedMetricData.getMetricGroupName().equals(DATA_DROP_METRIC_GROUP)) { @@ -194,6 +202,24 @@ private void addDataPoint(final MetricData datapoint) { } } + private void addSourceJobDataPoint(final MetricData datapoint) { + final String sourceJobId = datapoint.getJobId(); + final int workerIndex = datapoint.getWorkerIndex(); + + String sourceWorkerKey = sourceJobId + ":" + workerIndex; + + WorkerMetrics workerMetrics = sourceJobWorkersMap.get(sourceWorkerKey); + if (workerMetrics == null) { + workerMetrics = new WorkerMetrics(valuesToKeep); + sourceJobWorkersMap.put(sourceWorkerKey, workerMetrics); + } + + workerMetrics.addDataPoint(datapoint.getMetricGroupName(), datapoint); + + String sourceMetricKey = sourceWorkerKey + ":" + datapoint.getMetricGroupName(); + sourceJobMetricsRecent.put(sourceMetricKey, sourceMetricKey); + } + private static final int metricsIntervalSeconds = 30; // TODO make it configurable @Override @@ -275,11 +301,36 @@ public void call() { final GaugeData gaugeData = allWorkerAggregates.get(WORKER_STAGE_INNER_INPUT); final Map gauges = gaugeData.getGauges(); if (gauges.containsKey(ON_NEXT_GAUGE)) { + // Divide by 6 to account for 6 second reset by Atlas on counter metric. jobAutoScaleObserver.onNext( new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.RPS, stage, gauges.get(ON_NEXT_GAUGE) / 6.0, numWorkers, "")); } } + + double sourceJobDrops = 0; + boolean hasSourceJobDropsMetric = false; + Map sourceMetricsRecent = sourceJobMetricsRecent.asMap(); + for (Map.Entry worker : sourceJobWorkersMap.entrySet()) { + Map metricGroups = metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp()); + for (Map.Entry group : metricGroups.entrySet()) { + String metricKey = worker.getKey() + ":" + group.getKey(); + for (Map.Entry gauge : group.getValue().getGauges().entrySet()) { + if (sourceMetricsRecent.containsKey(metricKey) && + autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) { + sourceJobDrops += gauge.getValue(); + hasSourceJobDropsMetric = true; + } + } + } + } + if (hasSourceJobDropsMetric) { + logger.info("Job stage {}, source job drop metrics: {}", stage, sourceJobWorkersMap); + // Divide by 6 to account for 6 second reset by Atlas on counter metric. + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.SourceJobDrop, stage, + sourceJobDrops / 6.0 / numWorkers, numWorkers, "")); + } } }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS )); @@ -298,7 +349,11 @@ public void onError(Throwable e) { public void onNext(MetricData metricData) { logger.debug("Got metric metricData for job " + jobId + " stage " + stage + ", worker " + metricData.getWorkerNumber() + ": " + metricData); - addDataPoint(metricData); + if (metricData.getJobId() == jobId) { + addDataPoint(metricData); + } else { + addSourceJobDataPoint(metricData); + } } }; } diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java new file mode 100644 index 000000000..6bade4bb9 --- /dev/null +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfigTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.server.worker.jobmaster; + +import io.mantisrx.shaded.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import java.util.Set; + +import static io.reactivex.mantis.network.push.PushServerSse.DROPPED_COUNTER_METRIC_NAME; +import static io.reactivex.mantis.network.push.PushServerSse.PROCESSED_COUNTER_METRIC_NAME; +import static org.junit.Assert.*; + +public class AutoScaleMetricsConfigTest { + @Test + public void testGenerateSourceJobMetricGroups() { + AutoScaleMetricsConfig config = new AutoScaleMetricsConfig(); + + Set groups = config.generateSourceJobMetricGroups(ImmutableSet.of("clientId1", "client-id-2")); + Set expected = ImmutableSet.of("PushServerSse:clientId=clientId1:*", "PushServerSse:clientId=client-id-2:*", + "ServerSentEventRequestHandler:clientId=clientId1:*", "ServerSentEventRequestHandler:clientId=client-id-2:*"); + assertEquals(expected, groups); + } + + @Test + public void testGetAggregationAlgoForSourceJobMetrics() throws Exception { + AutoScaleMetricsConfig config = new AutoScaleMetricsConfig(); + + AutoScaleMetricsConfig.AggregationAlgo aglo = config.getAggregationAlgo( + "ServerSentEventRequestHandler:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", DROPPED_COUNTER_METRIC_NAME); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.MAX, aglo); + assertTrue(config.isSourceJobDropMetric("ServerSentEventRequestHandler:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", DROPPED_COUNTER_METRIC_NAME)); + + aglo = config.getAggregationAlgo( + "PushServerSse:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", DROPPED_COUNTER_METRIC_NAME); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.MAX, aglo); + + aglo = config.getAggregationAlgo( + "PushServerSse:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", PROCESSED_COUNTER_METRIC_NAME); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.AVERAGE, aglo); + + aglo = config.getAggregationAlgo( + "ABCServerSentEventRequestHandler:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", DROPPED_COUNTER_METRIC_NAME); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.AVERAGE, aglo); + + aglo = config.getAggregationAlgo( + "PushServerSse:clientId=ABC:DEF", DROPPED_COUNTER_METRIC_NAME); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.MAX, aglo); + } + + @Test + public void testAddSourceJobDropMetrics() { + AutoScaleMetricsConfig config = new AutoScaleMetricsConfig(); + config.addSourceJobDropMetrics("myDropGroup1:clientId=_CLIENT_ID_:*::myDropCounter::MAX"); + + AutoScaleMetricsConfig.AggregationAlgo aglo = config.getAggregationAlgo( + "myDropGroup1:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", "myDropCounter"); + assertEquals(AutoScaleMetricsConfig.AggregationAlgo.MAX, aglo); + assertTrue(config.isSourceJobDropMetric("myDropGroup1:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", "myDropCounter")); + assertFalse(config.isSourceJobDropMetric("ABCmyDropGroup1:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", "myDropCounter")); + assertTrue(config.isSourceJobDropMetric("ServerSentEventRequestHandler:clientId=RavenConnectorJob-1657357:sockAddr=/100.87.51.222", DROPPED_COUNTER_METRIC_NAME)); + } + + @Test + public void testAddSourceJobDropMetricsThrowsException() { + AutoScaleMetricsConfig config = new AutoScaleMetricsConfig(); + try { + config.addSourceJobDropMetrics("InvalidMetricFormat"); + fail(); + } catch (Exception ex) { + // pass + } + } +} diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscriptionTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscriptionTest.java new file mode 100644 index 000000000..b1f9adc75 --- /dev/null +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/SourceJobWorkerMetricsSubscriptionTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.server.worker.jobmaster; + +import io.mantisrx.common.MantisServerSentEvent; +import io.mantisrx.runtime.parameter.SourceJobParameters; +import io.mantisrx.server.core.NamedJobInfo; +import io.mantisrx.server.master.client.MantisMasterClientApi; +import io.mantisrx.shaded.com.google.common.collect.ImmutableList; +import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; +import io.mantisrx.shaded.com.google.common.collect.ImmutableSet; +import org.junit.Test; +import rx.Observable; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class SourceJobWorkerMetricsSubscriptionTest { + @Test + public void testGetSourceJobToClientMap() { + List infos = ImmutableList.of( + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobA").withQuery("criterion").withClientId("client1").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobA").withQuery("criterion").withClientId("client2").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobB").withQuery("criterion").withClientId("client1").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobB").withQuery("criterion").withClientId("client3").build() + ); + + SourceJobWorkerMetricsSubscription sub = new SourceJobWorkerMetricsSubscription(infos, null, null, null); + Map> results = sub.getSourceJobToClientMap(); + Map> expected = ImmutableMap.of("jobA", ImmutableSet.of("client1", "client2"), + "jobB", ImmutableSet.of("client1", "client3")); + assertEquals(expected, results); + } + + @Test + public void testGetResultsForAllSourceJobs() throws Exception { + List infos = ImmutableList.of( + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobA").withQuery("criterion").withClientId("client1").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobA").withQuery("criterion").withClientId("client2").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobB").withQuery("criterion").withClientId("client1").build(), + new SourceJobParameters.TargetInfoBuilder().withSourceJobName("jobB").withQuery("criterion").withClientId("client3").build() + ); + + MantisMasterClientApi masterClient = mock(MantisMasterClientApi.class); + SourceJobWorkerMetricsSubscription sub = spy(new SourceJobWorkerMetricsSubscription(infos, masterClient, null, new AutoScaleMetricsConfig())); + + when(masterClient.namedJobInfo("jobA")).thenReturn(Observable.just(new NamedJobInfo("jobA", "jobA-1"))); + when(masterClient.namedJobInfo("jobB")).thenReturn(Observable.just(new NamedJobInfo("jobA", "jobB-2"))); + doReturn(Observable.just(Observable.just(new MantisServerSentEvent("jobA-event")))).when(sub).getResultsForJobId(eq("jobA-1"), any()); + doReturn(Observable.just(Observable.just(new MantisServerSentEvent("jobB-event")))).when(sub).getResultsForJobId(eq("jobB-2"), any()); + + CountDownLatch latch = new CountDownLatch(2); + Observable.merge(sub.getResults()).doOnNext(event -> { + if ("jobA-event".equals(event.getEventAsString()) || "jobB-event".equals(event.getEventAsString())) { + latch.countDown(); + } + }).subscribe(); + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + + Set jobAMetrics = ImmutableSet.of("PushServerSse:clientId=client1:*", "PushServerSse:clientId=client2:*", + "ServerSentEventRequestHandler:clientId=client1:*", "ServerSentEventRequestHandler:clientId=client2:*"); + verify(sub, times(1)).getResultsForJobId("jobA-1", jobAMetrics); + + jobAMetrics = ImmutableSet.of("PushServerSse:clientId=client1:*", "PushServerSse:clientId=client3:*", + "ServerSentEventRequestHandler:clientId=client1:*", "ServerSentEventRequestHandler:clientId=client3:*"); + verify(sub, times(1)).getResultsForJobId("jobB-2", jobAMetrics); + } +} diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index d138cd35a..15e2be791 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -18,6 +18,8 @@ import static io.mantisrx.server.core.stats.MetricStringConstants.DATA_DROP_METRIC_GROUP; import static io.mantisrx.server.core.stats.MetricStringConstants.KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP; +import static io.reactivex.mantis.network.push.PushServerSse.DROPPED_COUNTER_METRIC_NAME; +import static io.reactivex.mantis.network.push.PushServerSse.PROCESSED_COUNTER_METRIC_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; @@ -41,6 +43,7 @@ import io.mantisrx.server.core.WorkerHost; import io.mantisrx.server.core.stats.MetricStringConstants; import io.mantisrx.server.master.client.MantisMasterClientApi; +import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -281,4 +284,77 @@ public void onNext(JobAutoScaler.Event event) { assertTrue(autoScaleLatch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); } + @Test + public void testSourceJobDropMetricTriggersAutoScale() throws InterruptedException { + final String jobId = "test-job-1"; + final String sourceJobId = "source-test-job-1"; + final int stage = 1; + + final MantisMasterClientApi mockMasterClientApi = mock(MantisMasterClientApi.class); + + final Map assignmentsMap = new HashMap<>(); + assignmentsMap.put(stage, new WorkerAssignments(stage, 2, + ImmutableMap.of(1, new WorkerHost("1.1.1.1", 0, Arrays.asList(31300), MantisJobState.Started, 1, 31301, -1), + 2, new WorkerHost("2.2.2.2", 1, Arrays.asList(31300), MantisJobState.Started, 2, 31301, -1)))); + when(mockMasterClientApi.schedulingChanges(jobId)).thenReturn(Observable.just(new JobSchedulingInfo(jobId, assignmentsMap))); + + final CountDownLatch latch = new CountDownLatch(1); + + final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(); + + final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { + @Override + public void onCompleted() { + logger.warn("onCompleted"); + } + + @Override + public void onError(Throwable e) { + logger.warn("onError {}", e.getMessage(), e); + } + + @Override + public void onNext(JobAutoScaler.Event event) { + logger.info("got auto scale event {}", event); + // Expected metric value should be (1 + 2 + 3 + 6) / 6.0 / 2 + JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.SourceJobDrop, stage, 1.0, 2, ""); + if (expected.equals(event)) { + latch.countDown(); + } + } + }, mockMasterClientApi, aggregationConfig); + + final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); + + List gauges = Arrays.asList( + new GaugeMeasurement(PROCESSED_COUNTER_METRIC_NAME, 10.0), + new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 1.0)); + // Source job worker 0 -> job worker 0 + metricDataObserver.onNext(new MetricData(sourceJobId, stage, 0, 1, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/1.1.1.1", gauges)); + + gauges = Arrays.asList( + new GaugeMeasurement(PROCESSED_COUNTER_METRIC_NAME, 20.0), + new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 2.0)); + // Source job worker 0 -> job worker 1 + metricDataObserver.onNext(new MetricData(sourceJobId, stage, 0, 1, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/2.2.2.2", gauges)); + + gauges = Arrays.asList( + new GaugeMeasurement(PROCESSED_COUNTER_METRIC_NAME, 30.0), + new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 3.0)); + // Source job worker 1 -> job worker 0 + metricDataObserver.onNext(new MetricData(sourceJobId, stage, 1, 2, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/1.1.1.1", gauges)); + + gauges = Arrays.asList( + new GaugeMeasurement(PROCESSED_COUNTER_METRIC_NAME, 60.0), + new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 6.0)); + // Source job worker 1 -> job worker 1 + metricDataObserver.onNext(new MetricData(sourceJobId, stage, 1, 2, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/2.2.2.2", gauges)); + // Another datapoint from source job worker 1 -> job worker 1 to verify MAX aggregation + gauges = Arrays.asList( + new GaugeMeasurement(PROCESSED_COUNTER_METRIC_NAME, 50.0), + new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 5.0)); + metricDataObserver.onNext(new MetricData(sourceJobId, stage, 1, 2, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/2.2.2.2", gauges)); + + assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + } }