diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java
index a8e97734..60babf71 100644
--- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java
+++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java
@@ -1,5 +1,7 @@
package com.criteo.hadoop.garmadon.agent.headers;
+import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
+import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime;
import com.criteo.hadoop.garmadon.schema.enums.Component;
import com.criteo.hadoop.garmadon.schema.enums.Framework;
import com.criteo.hadoop.garmadon.schema.events.Header;
@@ -21,6 +23,7 @@ public final class ContainerHeader {
// as grafana/ES can't join on different event for display HDFS call per framework/component
// or compute used per framework/component
private Framework framework = Framework.YARN;
+ private String frameworkVersion = null;
private Component component = Component.UNKNOWN;
private String executorId;
private String mainClass;
@@ -52,11 +55,9 @@ private void setFrameworkComponent() {
break;
// SPARK
case "org.apache.spark.deploy.yarn.ApplicationMaster":
- framework = Framework.SPARK;
- component = Component.APP_MASTER;
- break;
case "org.apache.spark.deploy.yarn.ExecutorLauncher":
framework = Framework.SPARK;
+ frameworkVersion = SparkRuntime.getVersion();
component = Component.APP_MASTER;
break;
case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
@@ -76,21 +77,12 @@ private void setFrameworkComponent() {
break;
// FLINK
case "org.apache.flink.yarn.YarnApplicationMasterRunner":
- framework = Framework.FLINK;
- component = Component.APP_MASTER;
- break;
case "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint":
- framework = Framework.FLINK;
- component = Component.APP_MASTER;
- break;
case "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.YarnTaskManager":
- framework = Framework.FLINK;
- component = Component.TASK_MANAGER;
- break;
case "org.apache.flink.yarn.YarnTaskExecutorRunner":
framework = Framework.FLINK;
component = Component.TASK_MANAGER;
@@ -123,9 +115,12 @@ private Header.SerializedHeader createCachedHeader() {
.withContainerID(containerIdString)
.withPid(HeaderUtils.getPid())
.withFramework(framework.toString())
+ .withFrameworkVersion(frameworkVersion)
.withComponent(component.name())
.withExecutorId(executorId)
.withMainClass(mainClass)
+ .withJavaVersion(JavaRuntime.version())
+ .withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}
diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java
index c603c39c..4e6b30f8 100644
--- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java
+++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;
+import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
@@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
+ .withJavaVersion(JavaRuntime.version())
+ .withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.NODEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java
index 08ee075d..cffc3ed5 100644
--- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java
+++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;
+import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
@@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
+ .withJavaVersion(JavaRuntime.version())
+ .withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.RESOURCEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java
index 8ff3dcf8..32c86bbf 100644
--- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java
+++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;
+import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
@@ -20,6 +21,8 @@ private Header.SerializedHeader createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
+ .withJavaVersion(JavaRuntime.version())
+ .withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}
diff --git a/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java b/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java
index bca837d6..782a18d1 100644
--- a/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java
+++ b/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java
@@ -5,6 +5,7 @@
import com.criteo.hadoop.garmadon.forwarder.metrics.ForwarderEventSender;
import com.criteo.hadoop.garmadon.forwarder.metrics.HostStatistics;
import com.criteo.hadoop.garmadon.forwarder.metrics.PrometheusHttpMetrics;
+import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
import com.criteo.hadoop.garmadon.schema.exceptions.TypeMarkerException;
@@ -73,6 +74,8 @@ public Forwarder(Properties properties) {
.withPid(HeaderUtils.getPid())
.withUser(HeaderUtils.getUser())
.withMainClass(HeaderUtils.getJavaMainClass())
+ .withJavaVersion(JavaRuntime.version())
+ .withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.FORWARDER.name());
for (String tag : properties.getProperty("forwarder.tags", "").split(",")) {
diff --git a/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java b/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java
index eb2a6724..f08a1fe9 100644
--- a/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java
+++ b/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java
@@ -23,10 +23,24 @@ public class GarmadonFlinkReporterTest {
private static final Random random = new Random();
- private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
- "appName", "user", "container", "hostname",
- Collections.singletonList("tag"), "pid", "framework", "component",
- "executorId", "mainClass");
+ private static final Header DUMMY_HEADER = Header.newBuilder()
+ .withId("id")
+ .withApplicationID("appId")
+ .withAttemptID("appAttemptId")
+ .withApplicationName("appName")
+ .withUser("user")
+ .withContainerID("container")
+ .withHostname("hostname")
+ .addTag("tag")
+ .withPid("pid")
+ .withFramework("framework")
+ .withFrameworkVersion("frameworkVersion")
+ .withComponent("component")
+ .withExecutorId("executorId")
+ .withMainClass("mainClass")
+ .withJavaVersion("javaVersion")
+ .withJavaFeature(8)
+ .build();
private static final String HOST = "localhost";
private static final String JOB_ID = "SomeJobId";
diff --git a/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java b/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java
index f57b1036..8a1e4f34 100644
--- a/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java
+++ b/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java
@@ -15,10 +15,21 @@
import static org.mockito.Mockito.*;
public class GarmadonSparkListenerTest {
- private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
- "appName", "user", "container", "hostname",
- Collections.singletonList("tag"), "pid", "framework", "component",
- "executorId", "mainClass");
+ private static final Header DUMMY_HEADER = Header.newBuilder()
+ .withId("id")
+ .withApplicationID("appId")
+ .withAttemptID("appAttemptId")
+ .withApplicationName("appName")
+ .withUser("user")
+ .withContainerID("container")
+ .withHostname("hostname")
+ .addTag("tag")
+ .withPid("pid")
+ .withFramework("framework")
+ .withComponent("component")
+ .withExecutorId("executorId")
+ .withMainClass("mainClass")
+ .build();
@Test
public void onExecutorAdded() {
diff --git a/jvm-statistics-test-spark-3.5/pom.xml b/jvm-statistics-test-spark-3.5/pom.xml
new file mode 100644
index 00000000..d5040a5c
--- /dev/null
+++ b/jvm-statistics-test-spark-3.5/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+ com.criteo.java
+ garmadon
+ 1.4.0
+ ../pom.xml
+
+
+ 4.0.0
+
+ garmadon-jvm-statistics-test-spark-3.5
+ 1.4.0
+
+
+
+ com.criteo.java
+ garmadon-jvm-statistics
+ ${garmadon.version}
+
+
+
+
+ org.assertj
+ assertj-core
+ 3.11.1
+ test
+
+
+ org.apache.spark
+ spark-core_2.12
+ 3.5.3
+ test
+
+
+
diff --git a/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java b/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java
new file mode 100644
index 00000000..14d35fe8
--- /dev/null
+++ b/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java
@@ -0,0 +1,13 @@
+package com.criteo.hadoop.garmadon.jvm.utils;
+
+import junit.framework.TestCase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SparkRuntimeTest extends TestCase {
+
+ public void test_get_3_5_version() {
+ assertThat(SparkRuntime.getVersion()).isEqualTo("3.5.3");
+ }
+
+}
diff --git a/jvm-statistics/pom.xml b/jvm-statistics/pom.xml
index c94d0867..452fea03 100644
--- a/jvm-statistics/pom.xml
+++ b/jvm-statistics/pom.xml
@@ -57,5 +57,11 @@
3.11.1
test
+
+ org.apache.spark
+ spark-core_2.12
+ 3.2.1
+ test
+
diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java
similarity index 55%
rename from jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java
rename to jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java
index afd29de1..05ef2377 100644
--- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java
+++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java
@@ -1,19 +1,11 @@
package com.criteo.hadoop.garmadon.jvm.utils;
public final class JavaRuntime {
-
private static final Version VERSION = parseVersion(System.getProperty("java.version"));
private JavaRuntime() {
}
- public static int getVersion() throws RuntimeException {
- if (VERSION.versionNumber == -1) {
- throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
- }
- return VERSION.versionNumber;
- }
-
static Version parseVersion(String version) {
try {
int versionNumber;
@@ -23,29 +15,46 @@ static Version parseVersion(String version) {
int dot = version.indexOf(".");
versionNumber = Integer.parseInt(version.substring(0, dot));
}
- return new Version(versionNumber);
+ return new Version(version, versionNumber);
} catch (RuntimeException e) {
- return new Version(e);
+ return new Version(version, e);
+ }
+ }
+
+ public static int feature() throws RuntimeException {
+ if (VERSION.feature == -1) {
+ throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
}
+ return VERSION.feature;
}
- static final class Version {
+ public static String version() {
+ return VERSION.version;
+ }
- private final int versionNumber;
+ final static class Version {
+ private final String version;
+ private final int feature;
private final RuntimeException parsingError;
- private Version(int versionNumber) {
- this.versionNumber = versionNumber;
+ private Version(String version, int feature) {
+ this.version = version;
+ this.feature = feature;
this.parsingError = null;
}
- private Version(RuntimeException parsingError) {
- this.versionNumber = -1;
+ private Version(String version, RuntimeException parsingError) {
+ this.version = version;
+ this.feature = -1;
this.parsingError = parsingError;
}
- public int getVersionNumber() {
- return versionNumber;
+ public String getVersion() {
+ return version;
+ }
+
+ public int getFeature() {
+ return feature;
}
public RuntimeException getParsingError() {
diff --git a/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java
new file mode 100644
index 00000000..269aebea
--- /dev/null
+++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java
@@ -0,0 +1,71 @@
+package com.criteo.hadoop.garmadon.jvm.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public final class SparkRuntime {
+ private static final Version VERSION = computeVersion();
+
+ private SparkRuntime() {
+ }
+
+ public static String getVersion() throws RuntimeException {
+ if (VERSION.versionNumber == null) {
+ throw new RuntimeException("Could not find Spark version. Is this a Spark application?", VERSION.throwable);
+ }
+ return VERSION.versionNumber;
+ }
+
+ static Version computeVersion() {
+ try {
+ return computeSpark32Version();
+ } catch (Throwable e) {
+ try {
+ return computeSpark35Version();
+ } catch (Throwable t) {
+ return new Version(t);
+ }
+ }
+ }
+
+ private static Version computeSpark32Version() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
+ Class> clazz = Class.forName("org.apache.spark.package$SparkBuildInfo$");
+ Field moduleFIeld = clazz.getField("MODULE$");
+ Object instance = moduleFIeld.get(null);
+ Field versionField = clazz.getDeclaredField("spark_version");
+ versionField.setAccessible(true);
+ String version = (String) versionField.get(instance);
+ return new Version(version);
+ }
+
+ private static Version computeSpark35Version() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Class> clazz = Class.forName("org.apache.spark.SparkBuildInfo");
+ Method versionMethod = clazz.getDeclaredMethod("spark_version");
+ String version = (String) versionMethod.invoke(null);
+ return new Version(version);
+ }
+
+ final static class Version {
+ private final String versionNumber;
+ private final Throwable throwable;
+
+ private Version(String versionNumber) {
+ this.versionNumber = versionNumber;
+ this.throwable = null;
+ }
+
+ private Version(Throwable throwable) {
+ this.versionNumber = null;
+ this.throwable = throwable;
+ }
+
+ public String getVersionNumer() {
+ return versionNumber;
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+ }
+}
diff --git a/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java
new file mode 100644
index 00000000..b9152b99
--- /dev/null
+++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java
@@ -0,0 +1,5 @@
+package com.criteo.hadoop.garmadon.jvm.utils;
+
+/**
+ * Utils
+ */
\ No newline at end of file
diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java
index 9fe0114e..0a1fd9fc 100644
--- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java
+++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java
@@ -78,8 +78,8 @@ public class JVMStatisticsTest {
private static final Pattern GCSTATS_PATTERN = Pattern.compile(".* occurred at \\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+, took \\d+ms \\(System\\.gc\\(\\)\\) {2}(eden|survivor|old)\\[[-+]\\d+]\\(\\d+->\\d+\\) (eden|survivor|old)\\[[-+]\\d+]\\(\\d+->\\d+\\).*");
private static final Pattern MACHINESTATS_PATTERN = Pattern.compile("machinecpu\\[%user=\\d+\\.\\d+, %nice=\\d+\\.\\d+, %sys=\\d+\\.\\d+, %idle=\\d+\\.\\d+, %iowait=\\d+\\.\\d+, %irq=\\d+\\.\\d+, %softirq=\\d+\\.\\d+, %core0=\\d+\\.\\d+.*], memory\\[swap=\\d+, physical=\\d+], network\\[.*_rx=\\d+, .*_tx=\\d+, .*_pktrx=\\d+, .*_pkttx=\\d+, .*_errin=\\d+, .*_errout=\\d+.*], disk\\[.*_reads=\\d+, .*_readbytes=\\d+, .*_writes=\\d+, .*_writebytes=\\d+.*]");
- private final CountDownLatch latch = new CountDownLatch(1);
- private final AtomicBoolean isMatches = new AtomicBoolean();
+ private CountDownLatch latch = new CountDownLatch(1);
+ private AtomicBoolean isMatches = new AtomicBoolean();
private volatile String logLine;
@Test
@@ -136,7 +136,7 @@ public void delayStart() throws InterruptedException {
private void assertJVMStatsLog(Long timestamp, String s) {
logLine = s;
- Pattern pattern = JavaRuntime.getVersion() == 8 ? JVM_8_STATS_PATTERN : JVM_9_STATS_PATTERN;
+ Pattern pattern = JavaRuntime.feature() == 8 ? JVM_8_STATS_PATTERN : JVM_9_STATS_PATTERN;
isMatches.set(pattern.matcher(s).matches());
latch.countDown();
}
diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java
index 4ddb97a0..509a86a6 100644
--- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java
+++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java
@@ -7,25 +7,31 @@
public class JavaRuntimeTest extends TestCase {
public void test_parse_1_8_x_as_8() {
- JavaRuntime.Version version = parseVersion("1.8.0_362");
- assertThat(version.getVersionNumber()).isEqualTo(8);
+ String versionString = "1.8.0_362";
+ JavaRuntime.Version version = parseVersion(versionString);
+ assertThat(version.getVersion()).isEqualTo(versionString);
+ assertThat(version.getFeature()).isEqualTo(8);
assertThat(version.getParsingError()).isNull();
}
public void test_parse_11_x_as_11() {
- JavaRuntime.Version version = parseVersion("11.0.16");
- assertThat(version.getVersionNumber()).isEqualTo(11);
+ String versionString = "11.0.16";
+ JavaRuntime.Version version = parseVersion(versionString);
+ assertThat(version.getVersion()).isEqualTo(versionString);
+ assertThat(version.getFeature()).isEqualTo(11);
assertThat(version.getParsingError()).isNull();
}
public void test_parsing_error() {
- JavaRuntime.Version version = parseVersion("ABC");
- assertThat(version.getVersionNumber()).isEqualTo(-1);
+ String versionString = "ABC";
+ JavaRuntime.Version version = parseVersion(versionString);
+ assertThat(version.getVersion()).isEqualTo(versionString);
+ assertThat(version.getFeature()).isEqualTo(-1);
assertThat(version.getParsingError()).isNotNull();
}
- public void test_getVersion() {
- assertThat(JavaRuntime.getVersion()).isGreaterThanOrEqualTo(8);
+ public void test_feature() {
+ assertThat(JavaRuntime.feature()).isGreaterThanOrEqualTo(8);
}
}
diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java
new file mode 100644
index 00000000..6bfbdc61
--- /dev/null
+++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java
@@ -0,0 +1,13 @@
+package com.criteo.hadoop.garmadon.jvm.utils;
+
+import junit.framework.TestCase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SparkRuntimeTest extends TestCase {
+
+ public void test_get_3_2_version() {
+ assertThat(SparkRuntime.getVersion()).isEqualTo("3.2.1");
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 65678a41..4881c12f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,7 @@
schema
jvm-statistics
+ jvm-statistics-test-spark-3.5
frameworks
agent
forwarder
diff --git a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java
index 83556c47..6440b5bb 100644
--- a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java
+++ b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java
@@ -14,8 +14,9 @@
import com.criteo.hadoop.garmadon.schema.enums.State;
import com.criteo.hadoop.garmadon.schema.serialization.GarmadonSerialization;
import com.google.protobuf.Message;
-import java.util.Collection;
-import java.util.Collections;
+
+import java.util.*;
+
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -25,10 +26,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@@ -81,11 +78,14 @@ public void setUp() {
headerMap.put("component", "");
headerMap.put("application_name", "application_name");
headerMap.put("framework", "");
+ headerMap.put("framework_version", "");
headerMap.put("attempt_id", "attempt_id");
headerMap.put("container_id", "container_id");
headerMap.put("username", "user");
headerMap.put("executor_id", "");
headerMap.put("timestamp", 0);
+ headerMap.put("java_version", "");
+ headerMap.put("java_feature", 0);
}
public void writeGarmadonMessage(int type, Message message, long timestampMillis) {
@@ -110,7 +110,7 @@ public void writeToES_StateEventMessage() {
writeGarmadonMessage(type, event, 0L);
verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class));
- assertEquals(eventMap, argument.getValue().sourceAsMap());
+ assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap()));
}
@Test
@@ -143,7 +143,7 @@ public void writeToES_AppEventMessage() {
writeGarmadonMessage(type, event, 0L);
verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class));
- assertEquals(eventMap, argument.getValue().sourceAsMap());
+ assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap()));
}
@Test
@@ -205,7 +205,8 @@ public void writeToES_JVMstats() {
}
}
assertEquals(dsikEventMap, diskMap);
- assertEquals(jvmEventMap, jvmMap);
+
+ assertEquals(new TreeMap<>(jvmEventMap), new TreeMap<>(jvmMap));
}
@Test
@@ -234,7 +235,7 @@ public void writeToES_FsEventMessage() {
writeGarmadonMessage(type, event, 0L);
verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class));
- assertEquals(eventMap, argument.getValue().sourceAsMap());
+ assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap()));
}
}
diff --git a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java
index bedbcd44..9bbb027b 100644
--- a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java
+++ b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java
@@ -28,6 +28,7 @@ public class ElasticSearchCacheManagerTest {
private final String component = "EXECUTOR";
private final String applicationName = "application_name";
private final String framework = "SPARK";
+ private final String frameworkVersion = "3.5.2";
private final String username = "n.fraison";
private final List yarnTags = new ArrayList();
@@ -89,6 +90,7 @@ private GarmadonMessage generateGarmadonMessage(int type, String component, Mess
.setAttemptId("attempt_id")
.setContainerId(containerId)
.setFramework(framework)
+ .setFrameworkVersion(frameworkVersion)
.setComponent(component)
.build();
@@ -148,6 +150,7 @@ public void do_not_add_container_with_unknown_component_in_cache() {
.setContainerId(containerId)
.setComponent(Component.UNKNOWN.name())
.setFramework(framework)
+ .setFrameworkVersion(frameworkVersion)
.build();
DataAccessEventProtos.FsEvent fsEvent = DataAccessEventProtos.FsEvent.newBuilder()
diff --git a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java
index ba7aee5a..f1a74c37 100644
--- a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java
+++ b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java
@@ -19,9 +19,12 @@ public class Header {
private final String hostname;
private final String pid;
private final String framework;
+ private final String frameworkVersion;
private final String component;
private final String executorId;
private final String mainClass;
+ private final String javaVersion;
+ private final int javaFeature;
private final List tags;
public enum Tag {
@@ -32,22 +35,23 @@ public enum Tag {
STANDALONE
}
- public Header(String id, String applicationID, String attemptID, String applicationName,
- String user, String containerID, String hostname, List tags, String pid,
- String framework, String component, String executorId, String mainClass) {
- this.id = id;
- this.applicationID = applicationID;
- this.attemptID = attemptID;
- this.applicationName = applicationName;
- this.user = user;
- this.containerID = containerID;
- this.hostname = hostname;
- this.tags = tags;
- this.pid = pid;
- this.framework = framework;
- this.component = component;
- this.executorId = executorId;
- this.mainClass = mainClass;
+ private Header(Builder builder) {
+ this.id = builder.id;
+ this.applicationID = builder.applicationID;
+ this.attemptID = builder.attemptID;
+ this.applicationName = builder.applicationName;
+ this.user = builder.user;
+ this.containerID = builder.containerID;
+ this.hostname = builder.hostname;
+ this.tags = builder.tags;
+ this.pid = builder.pid;
+ this.framework = builder.framework;
+ this.frameworkVersion = builder.frameworkVersion;
+ this.component = builder.component;
+ this.executorId = builder.executorId;
+ this.mainClass = builder.mainClass;
+ this.javaVersion = builder.javaVersion;
+ this.javaFeature = builder.javaFeature;
}
public String getId() {
@@ -86,6 +90,11 @@ public String getFramework() {
return framework;
}
+ public String getFrameworkVersion() {
+ return frameworkVersion;
+ }
+
+
public String getComponent() {
return component;
}
@@ -98,6 +107,14 @@ public String getMainClass() {
return mainClass;
}
+ public String getJavaVersion() {
+ return javaVersion;
+ }
+
+ public int getJavaFeature() {
+ return javaFeature;
+ }
+
public List getTags() {
return tags;
}
@@ -119,56 +136,68 @@ public boolean equals(Object other) {
Objects.equals(this.tags, otherHeader.tags) &&
Objects.equals(this.pid, otherHeader.pid) &&
Objects.equals(this.framework, otherHeader.framework) &&
+ Objects.equals(this.frameworkVersion, otherHeader.frameworkVersion) &&
Objects.equals(this.component, otherHeader.component) &&
Objects.equals(this.executorId, otherHeader.executorId) &&
- Objects.equals(this.mainClass, otherHeader.mainClass);
+ Objects.equals(this.mainClass, otherHeader.mainClass) &&
+ Objects.equals(this.javaVersion, otherHeader.javaVersion) &&
+ this.javaFeature == otherHeader.javaFeature;
}
@Override
public int hashCode() {
- return Objects.hash(id, applicationID, attemptID, applicationName, user, containerID, hostname, pid, framework, component,
- executorId, mainClass, tags);
+ return Objects.hash(id, applicationID, attemptID, applicationName, user, containerID, hostname, pid, framework,
+ frameworkVersion, component, executorId, mainClass, tags, javaVersion, javaFeature);
}
public Header cloneAndOverride(Header override) {
- String idClone = override.id != null ? override.id : this.id;
- String applicationIDClone = override.applicationID != null ? override.applicationID : this.applicationID;
- String attemptIDClone = override.attemptID != null ? override.attemptID : this.attemptID;
- String applicationNameClone = override.applicationName != null ? override.applicationName : this.applicationName;
- String userClone = override.user != null ? override.user : this.user;
- String containerIDClone = override.containerID != null ? override.containerID : this.containerID;
- String hostnameClone = override.hostname != null ? override.hostname : this.hostname;
- List tagsClone = (override.tags != null && override.tags.size() > 0) ? override.tags : this.tags;
- String pidClone = override.pid != null ? override.pid : this.pid;
- String frameworkClone = override.framework != null ? override.framework : this.framework;
- String componentClone = override.component != null ? override.component : this.component;
- String executorIdClone = override.executorId != null ? override.executorId : this.executorId;
- String mainClassClone = override.mainClass != null ? override.mainClass : this.mainClass;
- return new Header(
- idClone,
- applicationIDClone,
- attemptIDClone,
- applicationNameClone,
- userClone,
- containerIDClone,
- hostnameClone,
- tagsClone,
- pidClone,
- frameworkClone,
- componentClone,
- executorIdClone,
- mainClassClone
- );
+ Builder cloneBuilder = new Builder();
+ cloneBuilder.id = override.id != null ? override.id : this.id;
+ cloneBuilder.applicationID = override.applicationID != null ? override.applicationID : this.applicationID;
+ cloneBuilder.attemptID = override.attemptID != null ? override.attemptID : this.attemptID;
+ cloneBuilder.applicationName = override.applicationName != null ? override.applicationName : this.applicationName;
+ cloneBuilder.user = override.user != null ? override.user : this.user;
+ cloneBuilder.containerID = override.containerID != null ? override.containerID : this.containerID;
+ cloneBuilder.hostname = override.hostname != null ? override.hostname : this.hostname;
+ cloneBuilder.tags = (override.tags != null && !override.tags.isEmpty()) ? override.tags : this.tags;
+ cloneBuilder.pid = override.pid != null ? override.pid : this.pid;
+ cloneBuilder.framework = override.framework != null ? override.framework : this.framework;
+ cloneBuilder.frameworkVersion = override.frameworkVersion != null ? override.frameworkVersion : this.frameworkVersion;
+ cloneBuilder.component = override.component != null ? override.component : this.component;
+ cloneBuilder.executorId = override.executorId != null ? override.executorId : this.executorId;
+ cloneBuilder.mainClass = override.mainClass != null ? override.mainClass : this.mainClass;
+ cloneBuilder.javaVersion = override.javaVersion != null ? override.javaVersion : this.javaVersion;
+ cloneBuilder.javaFeature = override.javaFeature != 0 ? override.javaFeature : this.javaFeature;
+ return cloneBuilder.build();
+ }
+
+ private Builder toBuilder() {
+ Builder builder = new Builder();
+ builder.id = this.id;
+ builder.applicationID = this.applicationID;
+ builder.attemptID = this.attemptID;
+ builder.applicationName = this.applicationName;
+ builder.user = this.user;
+ builder.containerID = this.containerID;
+ builder.hostname = this.hostname;
+ builder.tags = this.tags;
+ builder.pid = this.pid;
+ builder.framework = this.framework;
+ builder.frameworkVersion = this.frameworkVersion;
+ builder.component = this.component;
+ builder.executorId = this.executorId;
+ builder.mainClass = this.mainClass;
+ builder.javaVersion = this.javaVersion;
+ builder.javaFeature = this.javaFeature;
+ return builder;
}
public SerializedHeader toSerializeHeader() {
- return new SerializedHeader(id, applicationID, attemptID, applicationName, user, containerID, hostname, tags, pid, framework,
- component, executorId, mainClass);
+ return new SerializedHeader(toBuilder());
}
public byte[] serialize() {
- EventHeaderProtos.Header.Builder builder = EventHeaderProtos.Header
- .newBuilder();
+ EventHeaderProtos.Header.Builder builder = EventHeaderProtos.Header.newBuilder();
if (id != null) builder.setId(id);
if (applicationID != null) builder.setApplicationId(applicationID);
if (attemptID != null) builder.setAttemptId(attemptID);
@@ -176,16 +205,19 @@ public byte[] serialize() {
if (user != null) builder.setUsername(user);
if (containerID != null) builder.setContainerId(containerID);
if (hostname != null) builder.setHostname(hostname);
- if (tags != null && tags.size() > 0) {
+ if (tags != null && !tags.isEmpty()) {
for (String tag : tags) {
builder.addTags(tag);
}
}
if (pid != null) builder.setPid(pid);
if (framework != null) builder.setFramework(framework);
+ if (frameworkVersion != null) builder.setFrameworkVersion(frameworkVersion);
if (component != null) builder.setComponent(component);
if (executorId != null) builder.setExecutorId(executorId);
if (mainClass != null) builder.setMainClass(mainClass);
+ if (javaVersion != null) builder.setJavaVersion(javaVersion);
+ if (javaFeature != 0) builder.setJavaFeature(javaFeature);
return builder.build().toByteArray();
}
@@ -201,9 +233,12 @@ public String toString() {
", hostname='" + hostname + '\'' +
", pid='" + pid + '\'' +
", framework='" + framework + '\'' +
+ ", frameworkVersion='" + frameworkVersion + '\'' +
", component='" + component + '\'' +
", executorId='" + executorId + '\'' +
", mainClass='" + mainClass + '\'' +
+ ", javaVersion='" + javaVersion + '\'' +
+ ", javaFeature='" + javaFeature + '\'' +
", tags=" + tags +
'}';
}
@@ -211,11 +246,8 @@ public String toString() {
public static class SerializedHeader extends Header {
private final byte[] bytes;
- SerializedHeader(String id, String applicationID, String appAttemptID, String applicationName, String user,
- String containerID, String hostname, List tags, String pid, String framework,
- String component, String executorId, String mainClass) {
- super(id, applicationID, appAttemptID, applicationName, user, containerID, hostname, tags, pid, framework,
- component, executorId, mainClass);
+ SerializedHeader(Builder builder) {
+ super(builder);
this.bytes = super.serialize();
}
@@ -230,7 +262,7 @@ public static Builder newBuilder() {
}
public static class Builder {
- private static final Logger LOGGER = LoggerFactory.getLogger(Header.Builder.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Builder.class);
private static final String TAGS_REGEX = "^[a-zA-Z0-9_\\-\\.]*$";
private String id;
@@ -242,9 +274,13 @@ public static class Builder {
private String hostname;
private String pid;
private String framework;
+ private String frameworkVersion;
private String component;
private String executorId;
private String mainClass;
+ private String javaVersion;
+ private int javaFeature;
+
private List tags = new ArrayList<>();
Builder() {
@@ -313,6 +349,11 @@ public Builder withFramework(String framework) {
return this;
}
+ public Builder withFrameworkVersion(String frameworkVersion) {
+ this.frameworkVersion = frameworkVersion;
+ return this;
+ }
+
public Builder withComponent(String component) {
this.component = component;
return this;
@@ -328,14 +369,22 @@ public Builder withMainClass(String mainClass) {
return this;
}
+ public Builder withJavaVersion(String javaVersion) {
+ this.javaVersion = javaVersion;
+ return this;
+ }
+
+ public Builder withJavaFeature(int javaFeature) {
+ this.javaFeature = javaFeature;
+ return this;
+ }
+
public Header build() {
- return new Header(id, applicationID, attemptID, applicationName, user, containerID, hostname, tags,
- pid, framework, component, executorId, mainClass);
+ return new Header(this);
}
public SerializedHeader buildSerializedHeader() {
- return new SerializedHeader(id, applicationID, attemptID, applicationName, user, containerID, hostname,
- tags, pid, framework, component, executorId, mainClass);
+ return new SerializedHeader(this);
}
}
}
diff --git a/schema/src/main/protobuf/event_header.proto b/schema/src/main/protobuf/event_header.proto
index 7e60b174..cbd4b2e3 100644
--- a/schema/src/main/protobuf/event_header.proto
+++ b/schema/src/main/protobuf/event_header.proto
@@ -19,4 +19,7 @@ message Header {
repeated string tags = 12;
string id = 13;
string main_class = 14;
+ string java_version = 15;
+ int32 java_feature = 16;
+ string framework_version = 17;
}
\ No newline at end of file