diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java index a8e97734..60babf71 100644 --- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java +++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java @@ -1,5 +1,7 @@ package com.criteo.hadoop.garmadon.agent.headers; +import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; +import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime; import com.criteo.hadoop.garmadon.schema.enums.Component; import com.criteo.hadoop.garmadon.schema.enums.Framework; import com.criteo.hadoop.garmadon.schema.events.Header; @@ -21,6 +23,7 @@ public final class ContainerHeader { // as grafana/ES can't join on different event for display HDFS call per framework/component // or compute used per framework/component private Framework framework = Framework.YARN; + private String frameworkVersion = null; private Component component = Component.UNKNOWN; private String executorId; private String mainClass; @@ -52,11 +55,9 @@ private void setFrameworkComponent() { break; // SPARK case "org.apache.spark.deploy.yarn.ApplicationMaster": - framework = Framework.SPARK; - component = Component.APP_MASTER; - break; case "org.apache.spark.deploy.yarn.ExecutorLauncher": framework = Framework.SPARK; + frameworkVersion = SparkRuntime.getVersion(); component = Component.APP_MASTER; break; case "org.apache.spark.executor.CoarseGrainedExecutorBackend": @@ -76,21 +77,12 @@ private void setFrameworkComponent() { break; // FLINK case "org.apache.flink.yarn.YarnApplicationMasterRunner": - framework = Framework.FLINK; - component = Component.APP_MASTER; - break; case "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint": - framework = Framework.FLINK; - component = Component.APP_MASTER; - break; case "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint": framework = Framework.FLINK; component = Component.APP_MASTER; break; case "org.apache.flink.yarn.YarnTaskManager": - framework = Framework.FLINK; - component = Component.TASK_MANAGER; - break; case "org.apache.flink.yarn.YarnTaskExecutorRunner": framework = Framework.FLINK; component = Component.TASK_MANAGER; @@ -123,9 +115,12 @@ private Header.SerializedHeader createCachedHeader() { .withContainerID(containerIdString) .withPid(HeaderUtils.getPid()) .withFramework(framework.toString()) + .withFrameworkVersion(frameworkVersion) .withComponent(component.name()) .withExecutorId(executorId) .withMainClass(mainClass) + .withJavaVersion(JavaRuntime.version()) + .withJavaFeature(JavaRuntime.feature()) .buildSerializedHeader(); } diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java index c603c39c..4e6b30f8 100644 --- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java +++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/NodemanagerHeader.java @@ -1,5 +1,6 @@ package com.criteo.hadoop.garmadon.agent.headers; +import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; import com.criteo.hadoop.garmadon.schema.events.Header; import com.criteo.hadoop.garmadon.schema.events.HeaderUtils; @@ -17,6 +18,8 @@ private Header createCachedHeader() { .withUser(HeaderUtils.getUser()) .withPid(HeaderUtils.getPid()) .withMainClass(HeaderUtils.getJavaMainClass()) + .withJavaVersion(JavaRuntime.version()) + .withJavaFeature(JavaRuntime.feature()) .addTag(Header.Tag.NODEMANAGER.name()) .addTags(System.getProperty("garmadon.tags")) .build(); diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java index 08ee075d..cffc3ed5 100644 --- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java +++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/RessourceManagerHeader.java @@ -1,5 +1,6 @@ package com.criteo.hadoop.garmadon.agent.headers; +import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; import com.criteo.hadoop.garmadon.schema.events.Header; import com.criteo.hadoop.garmadon.schema.events.HeaderUtils; @@ -17,6 +18,8 @@ private Header createCachedHeader() { .withUser(HeaderUtils.getUser()) .withPid(HeaderUtils.getPid()) .withMainClass(HeaderUtils.getJavaMainClass()) + .withJavaVersion(JavaRuntime.version()) + .withJavaFeature(JavaRuntime.feature()) .addTag(Header.Tag.RESOURCEMANAGER.name()) .addTags(System.getProperty("garmadon.tags")) .build(); diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java index 8ff3dcf8..32c86bbf 100644 --- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java +++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/StandaloneHeader.java @@ -1,5 +1,6 @@ package com.criteo.hadoop.garmadon.agent.headers; +import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; import com.criteo.hadoop.garmadon.schema.events.Header; import com.criteo.hadoop.garmadon.schema.events.HeaderUtils; @@ -20,6 +21,8 @@ private Header.SerializedHeader createCachedHeader() { .withUser(HeaderUtils.getUser()) .withPid(HeaderUtils.getPid()) .withMainClass(HeaderUtils.getJavaMainClass()) + .withJavaVersion(JavaRuntime.version()) + .withJavaFeature(JavaRuntime.feature()) .buildSerializedHeader(); } diff --git a/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java b/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java index bca837d6..782a18d1 100644 --- a/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java +++ b/forwarder/src/main/java/com/criteo/hadoop/garmadon/forwarder/Forwarder.java @@ -5,6 +5,7 @@ import com.criteo.hadoop.garmadon.forwarder.metrics.ForwarderEventSender; import com.criteo.hadoop.garmadon.forwarder.metrics.HostStatistics; import com.criteo.hadoop.garmadon.forwarder.metrics.PrometheusHttpMetrics; +import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; import com.criteo.hadoop.garmadon.schema.events.Header; import com.criteo.hadoop.garmadon.schema.events.HeaderUtils; import com.criteo.hadoop.garmadon.schema.exceptions.TypeMarkerException; @@ -73,6 +74,8 @@ public Forwarder(Properties properties) { .withPid(HeaderUtils.getPid()) .withUser(HeaderUtils.getUser()) .withMainClass(HeaderUtils.getJavaMainClass()) + .withJavaVersion(JavaRuntime.version()) + .withJavaFeature(JavaRuntime.feature()) .addTag(Header.Tag.FORWARDER.name()); for (String tag : properties.getProperty("forwarder.tags", "").split(",")) { diff --git a/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java b/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java index eb2a6724..f08a1fe9 100644 --- a/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java +++ b/frameworks/flink/src/test/java/com/criteo/hadoop/garmadon/flink/GarmadonFlinkReporterTest.java @@ -23,10 +23,24 @@ public class GarmadonFlinkReporterTest { private static final Random random = new Random(); - private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId", - "appName", "user", "container", "hostname", - Collections.singletonList("tag"), "pid", "framework", "component", - "executorId", "mainClass"); + private static final Header DUMMY_HEADER = Header.newBuilder() + .withId("id") + .withApplicationID("appId") + .withAttemptID("appAttemptId") + .withApplicationName("appName") + .withUser("user") + .withContainerID("container") + .withHostname("hostname") + .addTag("tag") + .withPid("pid") + .withFramework("framework") + .withFrameworkVersion("frameworkVersion") + .withComponent("component") + .withExecutorId("executorId") + .withMainClass("mainClass") + .withJavaVersion("javaVersion") + .withJavaFeature(8) + .build(); private static final String HOST = "localhost"; private static final String JOB_ID = "SomeJobId"; diff --git a/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java b/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java index f57b1036..8a1e4f34 100644 --- a/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java +++ b/frameworks/spark/src/test/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListenerTest.java @@ -15,10 +15,21 @@ import static org.mockito.Mockito.*; public class GarmadonSparkListenerTest { - private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId", - "appName", "user", "container", "hostname", - Collections.singletonList("tag"), "pid", "framework", "component", - "executorId", "mainClass"); + private static final Header DUMMY_HEADER = Header.newBuilder() + .withId("id") + .withApplicationID("appId") + .withAttemptID("appAttemptId") + .withApplicationName("appName") + .withUser("user") + .withContainerID("container") + .withHostname("hostname") + .addTag("tag") + .withPid("pid") + .withFramework("framework") + .withComponent("component") + .withExecutorId("executorId") + .withMainClass("mainClass") + .build(); @Test public void onExecutorAdded() { diff --git a/jvm-statistics-test-spark-3.5/pom.xml b/jvm-statistics-test-spark-3.5/pom.xml new file mode 100644 index 00000000..d5040a5c --- /dev/null +++ b/jvm-statistics-test-spark-3.5/pom.xml @@ -0,0 +1,38 @@ + + + + com.criteo.java + garmadon + 1.4.0 + ../pom.xml + + + 4.0.0 + + garmadon-jvm-statistics-test-spark-3.5 + 1.4.0 + + + + com.criteo.java + garmadon-jvm-statistics + ${garmadon.version} + + + + + org.assertj + assertj-core + 3.11.1 + test + + + org.apache.spark + spark-core_2.12 + 3.5.3 + test + + + diff --git a/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java b/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java new file mode 100644 index 00000000..14d35fe8 --- /dev/null +++ b/jvm-statistics-test-spark-3.5/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java @@ -0,0 +1,13 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +import junit.framework.TestCase; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SparkRuntimeTest extends TestCase { + + public void test_get_3_5_version() { + assertThat(SparkRuntime.getVersion()).isEqualTo("3.5.3"); + } + +} diff --git a/jvm-statistics/pom.xml b/jvm-statistics/pom.xml index c94d0867..452fea03 100644 --- a/jvm-statistics/pom.xml +++ b/jvm-statistics/pom.xml @@ -57,5 +57,11 @@ 3.11.1 test + + org.apache.spark + spark-core_2.12 + 3.2.1 + test + diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java similarity index 55% rename from jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java rename to jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java index afd29de1..05ef2377 100644 --- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java +++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntime.java @@ -1,19 +1,11 @@ package com.criteo.hadoop.garmadon.jvm.utils; public final class JavaRuntime { - private static final Version VERSION = parseVersion(System.getProperty("java.version")); private JavaRuntime() { } - public static int getVersion() throws RuntimeException { - if (VERSION.versionNumber == -1) { - throw new RuntimeException("Could not parse Java version.", VERSION.parsingError); - } - return VERSION.versionNumber; - } - static Version parseVersion(String version) { try { int versionNumber; @@ -23,29 +15,46 @@ static Version parseVersion(String version) { int dot = version.indexOf("."); versionNumber = Integer.parseInt(version.substring(0, dot)); } - return new Version(versionNumber); + return new Version(version, versionNumber); } catch (RuntimeException e) { - return new Version(e); + return new Version(version, e); + } + } + + public static int feature() throws RuntimeException { + if (VERSION.feature == -1) { + throw new RuntimeException("Could not parse Java version.", VERSION.parsingError); } + return VERSION.feature; } - static final class Version { + public static String version() { + return VERSION.version; + } - private final int versionNumber; + final static class Version { + private final String version; + private final int feature; private final RuntimeException parsingError; - private Version(int versionNumber) { - this.versionNumber = versionNumber; + private Version(String version, int feature) { + this.version = version; + this.feature = feature; this.parsingError = null; } - private Version(RuntimeException parsingError) { - this.versionNumber = -1; + private Version(String version, RuntimeException parsingError) { + this.version = version; + this.feature = -1; this.parsingError = parsingError; } - public int getVersionNumber() { - return versionNumber; + public String getVersion() { + return version; + } + + public int getFeature() { + return feature; } public RuntimeException getParsingError() { diff --git a/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java new file mode 100644 index 00000000..269aebea --- /dev/null +++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntime.java @@ -0,0 +1,71 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public final class SparkRuntime { + private static final Version VERSION = computeVersion(); + + private SparkRuntime() { + } + + public static String getVersion() throws RuntimeException { + if (VERSION.versionNumber == null) { + throw new RuntimeException("Could not find Spark version. Is this a Spark application?", VERSION.throwable); + } + return VERSION.versionNumber; + } + + static Version computeVersion() { + try { + return computeSpark32Version(); + } catch (Throwable e) { + try { + return computeSpark35Version(); + } catch (Throwable t) { + return new Version(t); + } + } + } + + private static Version computeSpark32Version() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException { + Class clazz = Class.forName("org.apache.spark.package$SparkBuildInfo$"); + Field moduleFIeld = clazz.getField("MODULE$"); + Object instance = moduleFIeld.get(null); + Field versionField = clazz.getDeclaredField("spark_version"); + versionField.setAccessible(true); + String version = (String) versionField.get(instance); + return new Version(version); + } + + private static Version computeSpark35Version() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class clazz = Class.forName("org.apache.spark.SparkBuildInfo"); + Method versionMethod = clazz.getDeclaredMethod("spark_version"); + String version = (String) versionMethod.invoke(null); + return new Version(version); + } + + final static class Version { + private final String versionNumber; + private final Throwable throwable; + + private Version(String versionNumber) { + this.versionNumber = versionNumber; + this.throwable = null; + } + + private Version(Throwable throwable) { + this.versionNumber = null; + this.throwable = throwable; + } + + public String getVersionNumer() { + return versionNumber; + } + + public Throwable getThrowable() { + return throwable; + } + } +} diff --git a/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java new file mode 100644 index 00000000..b9152b99 --- /dev/null +++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/package-info.java @@ -0,0 +1,5 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +/** + * Utils + */ \ No newline at end of file diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java index 9fe0114e..0a1fd9fc 100644 --- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java +++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/JVMStatisticsTest.java @@ -78,8 +78,8 @@ public class JVMStatisticsTest { private static final Pattern GCSTATS_PATTERN = Pattern.compile(".* occurred at \\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+, took \\d+ms \\(System\\.gc\\(\\)\\) {2}(eden|survivor|old)\\[[-+]\\d+]\\(\\d+->\\d+\\) (eden|survivor|old)\\[[-+]\\d+]\\(\\d+->\\d+\\).*"); private static final Pattern MACHINESTATS_PATTERN = Pattern.compile("machinecpu\\[%user=\\d+\\.\\d+, %nice=\\d+\\.\\d+, %sys=\\d+\\.\\d+, %idle=\\d+\\.\\d+, %iowait=\\d+\\.\\d+, %irq=\\d+\\.\\d+, %softirq=\\d+\\.\\d+, %core0=\\d+\\.\\d+.*], memory\\[swap=\\d+, physical=\\d+], network\\[.*_rx=\\d+, .*_tx=\\d+, .*_pktrx=\\d+, .*_pkttx=\\d+, .*_errin=\\d+, .*_errout=\\d+.*], disk\\[.*_reads=\\d+, .*_readbytes=\\d+, .*_writes=\\d+, .*_writebytes=\\d+.*]"); - private final CountDownLatch latch = new CountDownLatch(1); - private final AtomicBoolean isMatches = new AtomicBoolean(); + private CountDownLatch latch = new CountDownLatch(1); + private AtomicBoolean isMatches = new AtomicBoolean(); private volatile String logLine; @Test @@ -136,7 +136,7 @@ public void delayStart() throws InterruptedException { private void assertJVMStatsLog(Long timestamp, String s) { logLine = s; - Pattern pattern = JavaRuntime.getVersion() == 8 ? JVM_8_STATS_PATTERN : JVM_9_STATS_PATTERN; + Pattern pattern = JavaRuntime.feature() == 8 ? JVM_8_STATS_PATTERN : JVM_9_STATS_PATTERN; isMatches.set(pattern.matcher(s).matches()); latch.countDown(); } diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java index 4ddb97a0..509a86a6 100644 --- a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java +++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/JavaRuntimeTest.java @@ -7,25 +7,31 @@ public class JavaRuntimeTest extends TestCase { public void test_parse_1_8_x_as_8() { - JavaRuntime.Version version = parseVersion("1.8.0_362"); - assertThat(version.getVersionNumber()).isEqualTo(8); + String versionString = "1.8.0_362"; + JavaRuntime.Version version = parseVersion(versionString); + assertThat(version.getVersion()).isEqualTo(versionString); + assertThat(version.getFeature()).isEqualTo(8); assertThat(version.getParsingError()).isNull(); } public void test_parse_11_x_as_11() { - JavaRuntime.Version version = parseVersion("11.0.16"); - assertThat(version.getVersionNumber()).isEqualTo(11); + String versionString = "11.0.16"; + JavaRuntime.Version version = parseVersion(versionString); + assertThat(version.getVersion()).isEqualTo(versionString); + assertThat(version.getFeature()).isEqualTo(11); assertThat(version.getParsingError()).isNull(); } public void test_parsing_error() { - JavaRuntime.Version version = parseVersion("ABC"); - assertThat(version.getVersionNumber()).isEqualTo(-1); + String versionString = "ABC"; + JavaRuntime.Version version = parseVersion(versionString); + assertThat(version.getVersion()).isEqualTo(versionString); + assertThat(version.getFeature()).isEqualTo(-1); assertThat(version.getParsingError()).isNotNull(); } - public void test_getVersion() { - assertThat(JavaRuntime.getVersion()).isGreaterThanOrEqualTo(8); + public void test_feature() { + assertThat(JavaRuntime.feature()).isGreaterThanOrEqualTo(8); } } diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java new file mode 100644 index 00000000..6bfbdc61 --- /dev/null +++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/SparkRuntimeTest.java @@ -0,0 +1,13 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +import junit.framework.TestCase; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SparkRuntimeTest extends TestCase { + + public void test_get_3_2_version() { + assertThat(SparkRuntime.getVersion()).isEqualTo("3.2.1"); + } + +} diff --git a/pom.xml b/pom.xml index 65678a41..4881c12f 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,7 @@ schema jvm-statistics + jvm-statistics-test-spark-3.5 frameworks agent forwarder diff --git a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java index 83556c47..6440b5bb 100644 --- a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java +++ b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/ElasticSearchReaderTest.java @@ -14,8 +14,9 @@ import com.criteo.hadoop.garmadon.schema.enums.State; import com.criteo.hadoop.garmadon.schema.serialization.GarmadonSerialization; import com.google.protobuf.Message; -import java.util.Collection; -import java.util.Collections; + +import java.util.*; + import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.bulk.BulkProcessor; @@ -25,10 +26,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -81,11 +78,14 @@ public void setUp() { headerMap.put("component", ""); headerMap.put("application_name", "application_name"); headerMap.put("framework", ""); + headerMap.put("framework_version", ""); headerMap.put("attempt_id", "attempt_id"); headerMap.put("container_id", "container_id"); headerMap.put("username", "user"); headerMap.put("executor_id", ""); headerMap.put("timestamp", 0); + headerMap.put("java_version", ""); + headerMap.put("java_feature", 0); } public void writeGarmadonMessage(int type, Message message, long timestampMillis) { @@ -110,7 +110,7 @@ public void writeToES_StateEventMessage() { writeGarmadonMessage(type, event, 0L); verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class)); - assertEquals(eventMap, argument.getValue().sourceAsMap()); + assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap())); } @Test @@ -143,7 +143,7 @@ public void writeToES_AppEventMessage() { writeGarmadonMessage(type, event, 0L); verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class)); - assertEquals(eventMap, argument.getValue().sourceAsMap()); + assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap())); } @Test @@ -205,7 +205,8 @@ public void writeToES_JVMstats() { } } assertEquals(dsikEventMap, diskMap); - assertEquals(jvmEventMap, jvmMap); + + assertEquals(new TreeMap<>(jvmEventMap), new TreeMap<>(jvmMap)); } @Test @@ -234,7 +235,7 @@ public void writeToES_FsEventMessage() { writeGarmadonMessage(type, event, 0L); verify(bulkProcessor, times(1)).add(argument.capture(), any(CommittableOffset.class)); - assertEquals(eventMap, argument.getValue().sourceAsMap()); + assertEquals(new TreeMap<>(eventMap), new TreeMap<>(argument.getValue().sourceAsMap())); } } diff --git a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java index bedbcd44..9bbb027b 100644 --- a/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java +++ b/readers/elasticsearch/src/test/java/com/criteo/hadoop/garmadon/elasticsearch/cache/ElasticSearchCacheManagerTest.java @@ -28,6 +28,7 @@ public class ElasticSearchCacheManagerTest { private final String component = "EXECUTOR"; private final String applicationName = "application_name"; private final String framework = "SPARK"; + private final String frameworkVersion = "3.5.2"; private final String username = "n.fraison"; private final List yarnTags = new ArrayList(); @@ -89,6 +90,7 @@ private GarmadonMessage generateGarmadonMessage(int type, String component, Mess .setAttemptId("attempt_id") .setContainerId(containerId) .setFramework(framework) + .setFrameworkVersion(frameworkVersion) .setComponent(component) .build(); @@ -148,6 +150,7 @@ public void do_not_add_container_with_unknown_component_in_cache() { .setContainerId(containerId) .setComponent(Component.UNKNOWN.name()) .setFramework(framework) + .setFrameworkVersion(frameworkVersion) .build(); DataAccessEventProtos.FsEvent fsEvent = DataAccessEventProtos.FsEvent.newBuilder() diff --git a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java index ba7aee5a..f1a74c37 100644 --- a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java +++ b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/events/Header.java @@ -19,9 +19,12 @@ public class Header { private final String hostname; private final String pid; private final String framework; + private final String frameworkVersion; private final String component; private final String executorId; private final String mainClass; + private final String javaVersion; + private final int javaFeature; private final List tags; public enum Tag { @@ -32,22 +35,23 @@ public enum Tag { STANDALONE } - public Header(String id, String applicationID, String attemptID, String applicationName, - String user, String containerID, String hostname, List tags, String pid, - String framework, String component, String executorId, String mainClass) { - this.id = id; - this.applicationID = applicationID; - this.attemptID = attemptID; - this.applicationName = applicationName; - this.user = user; - this.containerID = containerID; - this.hostname = hostname; - this.tags = tags; - this.pid = pid; - this.framework = framework; - this.component = component; - this.executorId = executorId; - this.mainClass = mainClass; + private Header(Builder builder) { + this.id = builder.id; + this.applicationID = builder.applicationID; + this.attemptID = builder.attemptID; + this.applicationName = builder.applicationName; + this.user = builder.user; + this.containerID = builder.containerID; + this.hostname = builder.hostname; + this.tags = builder.tags; + this.pid = builder.pid; + this.framework = builder.framework; + this.frameworkVersion = builder.frameworkVersion; + this.component = builder.component; + this.executorId = builder.executorId; + this.mainClass = builder.mainClass; + this.javaVersion = builder.javaVersion; + this.javaFeature = builder.javaFeature; } public String getId() { @@ -86,6 +90,11 @@ public String getFramework() { return framework; } + public String getFrameworkVersion() { + return frameworkVersion; + } + + public String getComponent() { return component; } @@ -98,6 +107,14 @@ public String getMainClass() { return mainClass; } + public String getJavaVersion() { + return javaVersion; + } + + public int getJavaFeature() { + return javaFeature; + } + public List getTags() { return tags; } @@ -119,56 +136,68 @@ public boolean equals(Object other) { Objects.equals(this.tags, otherHeader.tags) && Objects.equals(this.pid, otherHeader.pid) && Objects.equals(this.framework, otherHeader.framework) && + Objects.equals(this.frameworkVersion, otherHeader.frameworkVersion) && Objects.equals(this.component, otherHeader.component) && Objects.equals(this.executorId, otherHeader.executorId) && - Objects.equals(this.mainClass, otherHeader.mainClass); + Objects.equals(this.mainClass, otherHeader.mainClass) && + Objects.equals(this.javaVersion, otherHeader.javaVersion) && + this.javaFeature == otherHeader.javaFeature; } @Override public int hashCode() { - return Objects.hash(id, applicationID, attemptID, applicationName, user, containerID, hostname, pid, framework, component, - executorId, mainClass, tags); + return Objects.hash(id, applicationID, attemptID, applicationName, user, containerID, hostname, pid, framework, + frameworkVersion, component, executorId, mainClass, tags, javaVersion, javaFeature); } public Header cloneAndOverride(Header override) { - String idClone = override.id != null ? override.id : this.id; - String applicationIDClone = override.applicationID != null ? override.applicationID : this.applicationID; - String attemptIDClone = override.attemptID != null ? override.attemptID : this.attemptID; - String applicationNameClone = override.applicationName != null ? override.applicationName : this.applicationName; - String userClone = override.user != null ? override.user : this.user; - String containerIDClone = override.containerID != null ? override.containerID : this.containerID; - String hostnameClone = override.hostname != null ? override.hostname : this.hostname; - List tagsClone = (override.tags != null && override.tags.size() > 0) ? override.tags : this.tags; - String pidClone = override.pid != null ? override.pid : this.pid; - String frameworkClone = override.framework != null ? override.framework : this.framework; - String componentClone = override.component != null ? override.component : this.component; - String executorIdClone = override.executorId != null ? override.executorId : this.executorId; - String mainClassClone = override.mainClass != null ? override.mainClass : this.mainClass; - return new Header( - idClone, - applicationIDClone, - attemptIDClone, - applicationNameClone, - userClone, - containerIDClone, - hostnameClone, - tagsClone, - pidClone, - frameworkClone, - componentClone, - executorIdClone, - mainClassClone - ); + Builder cloneBuilder = new Builder(); + cloneBuilder.id = override.id != null ? override.id : this.id; + cloneBuilder.applicationID = override.applicationID != null ? override.applicationID : this.applicationID; + cloneBuilder.attemptID = override.attemptID != null ? override.attemptID : this.attemptID; + cloneBuilder.applicationName = override.applicationName != null ? override.applicationName : this.applicationName; + cloneBuilder.user = override.user != null ? override.user : this.user; + cloneBuilder.containerID = override.containerID != null ? override.containerID : this.containerID; + cloneBuilder.hostname = override.hostname != null ? override.hostname : this.hostname; + cloneBuilder.tags = (override.tags != null && !override.tags.isEmpty()) ? override.tags : this.tags; + cloneBuilder.pid = override.pid != null ? override.pid : this.pid; + cloneBuilder.framework = override.framework != null ? override.framework : this.framework; + cloneBuilder.frameworkVersion = override.frameworkVersion != null ? override.frameworkVersion : this.frameworkVersion; + cloneBuilder.component = override.component != null ? override.component : this.component; + cloneBuilder.executorId = override.executorId != null ? override.executorId : this.executorId; + cloneBuilder.mainClass = override.mainClass != null ? override.mainClass : this.mainClass; + cloneBuilder.javaVersion = override.javaVersion != null ? override.javaVersion : this.javaVersion; + cloneBuilder.javaFeature = override.javaFeature != 0 ? override.javaFeature : this.javaFeature; + return cloneBuilder.build(); + } + + private Builder toBuilder() { + Builder builder = new Builder(); + builder.id = this.id; + builder.applicationID = this.applicationID; + builder.attemptID = this.attemptID; + builder.applicationName = this.applicationName; + builder.user = this.user; + builder.containerID = this.containerID; + builder.hostname = this.hostname; + builder.tags = this.tags; + builder.pid = this.pid; + builder.framework = this.framework; + builder.frameworkVersion = this.frameworkVersion; + builder.component = this.component; + builder.executorId = this.executorId; + builder.mainClass = this.mainClass; + builder.javaVersion = this.javaVersion; + builder.javaFeature = this.javaFeature; + return builder; } public SerializedHeader toSerializeHeader() { - return new SerializedHeader(id, applicationID, attemptID, applicationName, user, containerID, hostname, tags, pid, framework, - component, executorId, mainClass); + return new SerializedHeader(toBuilder()); } public byte[] serialize() { - EventHeaderProtos.Header.Builder builder = EventHeaderProtos.Header - .newBuilder(); + EventHeaderProtos.Header.Builder builder = EventHeaderProtos.Header.newBuilder(); if (id != null) builder.setId(id); if (applicationID != null) builder.setApplicationId(applicationID); if (attemptID != null) builder.setAttemptId(attemptID); @@ -176,16 +205,19 @@ public byte[] serialize() { if (user != null) builder.setUsername(user); if (containerID != null) builder.setContainerId(containerID); if (hostname != null) builder.setHostname(hostname); - if (tags != null && tags.size() > 0) { + if (tags != null && !tags.isEmpty()) { for (String tag : tags) { builder.addTags(tag); } } if (pid != null) builder.setPid(pid); if (framework != null) builder.setFramework(framework); + if (frameworkVersion != null) builder.setFrameworkVersion(frameworkVersion); if (component != null) builder.setComponent(component); if (executorId != null) builder.setExecutorId(executorId); if (mainClass != null) builder.setMainClass(mainClass); + if (javaVersion != null) builder.setJavaVersion(javaVersion); + if (javaFeature != 0) builder.setJavaFeature(javaFeature); return builder.build().toByteArray(); } @@ -201,9 +233,12 @@ public String toString() { ", hostname='" + hostname + '\'' + ", pid='" + pid + '\'' + ", framework='" + framework + '\'' + + ", frameworkVersion='" + frameworkVersion + '\'' + ", component='" + component + '\'' + ", executorId='" + executorId + '\'' + ", mainClass='" + mainClass + '\'' + + ", javaVersion='" + javaVersion + '\'' + + ", javaFeature='" + javaFeature + '\'' + ", tags=" + tags + '}'; } @@ -211,11 +246,8 @@ public String toString() { public static class SerializedHeader extends Header { private final byte[] bytes; - SerializedHeader(String id, String applicationID, String appAttemptID, String applicationName, String user, - String containerID, String hostname, List tags, String pid, String framework, - String component, String executorId, String mainClass) { - super(id, applicationID, appAttemptID, applicationName, user, containerID, hostname, tags, pid, framework, - component, executorId, mainClass); + SerializedHeader(Builder builder) { + super(builder); this.bytes = super.serialize(); } @@ -230,7 +262,7 @@ public static Builder newBuilder() { } public static class Builder { - private static final Logger LOGGER = LoggerFactory.getLogger(Header.Builder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Builder.class); private static final String TAGS_REGEX = "^[a-zA-Z0-9_\\-\\.]*$"; private String id; @@ -242,9 +274,13 @@ public static class Builder { private String hostname; private String pid; private String framework; + private String frameworkVersion; private String component; private String executorId; private String mainClass; + private String javaVersion; + private int javaFeature; + private List tags = new ArrayList<>(); Builder() { @@ -313,6 +349,11 @@ public Builder withFramework(String framework) { return this; } + public Builder withFrameworkVersion(String frameworkVersion) { + this.frameworkVersion = frameworkVersion; + return this; + } + public Builder withComponent(String component) { this.component = component; return this; @@ -328,14 +369,22 @@ public Builder withMainClass(String mainClass) { return this; } + public Builder withJavaVersion(String javaVersion) { + this.javaVersion = javaVersion; + return this; + } + + public Builder withJavaFeature(int javaFeature) { + this.javaFeature = javaFeature; + return this; + } + public Header build() { - return new Header(id, applicationID, attemptID, applicationName, user, containerID, hostname, tags, - pid, framework, component, executorId, mainClass); + return new Header(this); } public SerializedHeader buildSerializedHeader() { - return new SerializedHeader(id, applicationID, attemptID, applicationName, user, containerID, hostname, - tags, pid, framework, component, executorId, mainClass); + return new SerializedHeader(this); } } } diff --git a/schema/src/main/protobuf/event_header.proto b/schema/src/main/protobuf/event_header.proto index 7e60b174..cbd4b2e3 100644 --- a/schema/src/main/protobuf/event_header.proto +++ b/schema/src/main/protobuf/event_header.proto @@ -19,4 +19,7 @@ message Header { repeated string tags = 12; string id = 13; string main_class = 14; + string java_version = 15; + int32 java_feature = 16; + string framework_version = 17; } \ No newline at end of file