Skip to content

Commit

Permalink
Add Java and Spark versions to Garmadon logs (#232)
Browse files Browse the repository at this point in the history
* ✨ Add Java and Spark versions to Garmadon logs

* Fix mostly Checkstyle rules

* Fix build

---------

Co-authored-by: Jean-Baptiste Catté <[email protected]>
  • Loading branch information
jbkt and Jean-Baptiste Catté authored Jan 24, 2025
1 parent 7229ac6 commit 885b878
Show file tree
Hide file tree
Showing 21 changed files with 371 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime;
import com.criteo.hadoop.garmadon.schema.enums.Component;
import com.criteo.hadoop.garmadon.schema.enums.Framework;
import com.criteo.hadoop.garmadon.schema.events.Header;
Expand All @@ -21,6 +23,7 @@ public final class ContainerHeader {
// as grafana/ES can't join on different event for display HDFS call per framework/component
// or compute used per framework/component
private Framework framework = Framework.YARN;
private String frameworkVersion = null;
private Component component = Component.UNKNOWN;
private String executorId;
private String mainClass;
Expand Down Expand Up @@ -52,11 +55,9 @@ private void setFrameworkComponent() {
break;
// SPARK
case "org.apache.spark.deploy.yarn.ApplicationMaster":
framework = Framework.SPARK;
component = Component.APP_MASTER;
break;
case "org.apache.spark.deploy.yarn.ExecutorLauncher":
framework = Framework.SPARK;
frameworkVersion = SparkRuntime.getVersion();
component = Component.APP_MASTER;
break;
case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
Expand All @@ -76,21 +77,12 @@ private void setFrameworkComponent() {
break;
// FLINK
case "org.apache.flink.yarn.YarnApplicationMasterRunner":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.YarnTaskManager":
framework = Framework.FLINK;
component = Component.TASK_MANAGER;
break;
case "org.apache.flink.yarn.YarnTaskExecutorRunner":
framework = Framework.FLINK;
component = Component.TASK_MANAGER;
Expand Down Expand Up @@ -123,9 +115,12 @@ private Header.SerializedHeader createCachedHeader() {
.withContainerID(containerIdString)
.withPid(HeaderUtils.getPid())
.withFramework(framework.toString())
.withFrameworkVersion(frameworkVersion)
.withComponent(component.name())
.withExecutorId(executorId)
.withMainClass(mainClass)
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

Expand All @@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.NODEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

Expand All @@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.RESOURCEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

Expand All @@ -20,6 +21,8 @@ private Header.SerializedHeader createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.criteo.hadoop.garmadon.forwarder.metrics.ForwarderEventSender;
import com.criteo.hadoop.garmadon.forwarder.metrics.HostStatistics;
import com.criteo.hadoop.garmadon.forwarder.metrics.PrometheusHttpMetrics;
import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
import com.criteo.hadoop.garmadon.schema.exceptions.TypeMarkerException;
Expand Down Expand Up @@ -73,6 +74,8 @@ public Forwarder(Properties properties) {
.withPid(HeaderUtils.getPid())
.withUser(HeaderUtils.getUser())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.FORWARDER.name());

for (String tag : properties.getProperty("forwarder.tags", "").split(",")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,24 @@ public class GarmadonFlinkReporterTest {

private static final Random random = new Random();

private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
"appName", "user", "container", "hostname",
Collections.singletonList("tag"), "pid", "framework", "component",
"executorId", "mainClass");
private static final Header DUMMY_HEADER = Header.newBuilder()
.withId("id")
.withApplicationID("appId")
.withAttemptID("appAttemptId")
.withApplicationName("appName")
.withUser("user")
.withContainerID("container")
.withHostname("hostname")
.addTag("tag")
.withPid("pid")
.withFramework("framework")
.withFrameworkVersion("frameworkVersion")
.withComponent("component")
.withExecutorId("executorId")
.withMainClass("mainClass")
.withJavaVersion("javaVersion")
.withJavaFeature(8)
.build();

private static final String HOST = "localhost";
private static final String JOB_ID = "SomeJobId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
import static org.mockito.Mockito.*;

public class GarmadonSparkListenerTest {
private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
"appName", "user", "container", "hostname",
Collections.singletonList("tag"), "pid", "framework", "component",
"executorId", "mainClass");
private static final Header DUMMY_HEADER = Header.newBuilder()
.withId("id")
.withApplicationID("appId")
.withAttemptID("appAttemptId")
.withApplicationName("appName")
.withUser("user")
.withContainerID("container")
.withHostname("hostname")
.addTag("tag")
.withPid("pid")
.withFramework("framework")
.withComponent("component")
.withExecutorId("executorId")
.withMainClass("mainClass")
.build();

@Test
public void onExecutorAdded() {
Expand Down
38 changes: 38 additions & 0 deletions jvm-statistics-test-spark-3.5/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.criteo.java</groupId>
<artifactId>garmadon</artifactId>
<version>1.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>garmadon-jvm-statistics-test-spark-3.5</artifactId>
<version>1.4.0</version>

<dependencies>
<dependency>
<groupId>com.criteo.java</groupId>
<artifactId>garmadon-jvm-statistics</artifactId>
<version>${garmadon.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.criteo.hadoop.garmadon.jvm.utils;

import junit.framework.TestCase;

import static org.assertj.core.api.Assertions.assertThat;

public class SparkRuntimeTest extends TestCase {

public void test_get_3_5_version() {
assertThat(SparkRuntime.getVersion()).isEqualTo("3.5.3");
}

}
6 changes: 6 additions & 0 deletions jvm-statistics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,11 @@
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package com.criteo.hadoop.garmadon.jvm.utils;

public final class JavaRuntime {

private static final Version VERSION = parseVersion(System.getProperty("java.version"));

private JavaRuntime() {
}

public static int getVersion() throws RuntimeException {
if (VERSION.versionNumber == -1) {
throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
}
return VERSION.versionNumber;
}

static Version parseVersion(String version) {
try {
int versionNumber;
Expand All @@ -23,29 +15,46 @@ static Version parseVersion(String version) {
int dot = version.indexOf(".");
versionNumber = Integer.parseInt(version.substring(0, dot));
}
return new Version(versionNumber);
return new Version(version, versionNumber);
} catch (RuntimeException e) {
return new Version(e);
return new Version(version, e);
}
}

public static int feature() throws RuntimeException {
if (VERSION.feature == -1) {
throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
}
return VERSION.feature;
}

static final class Version {
public static String version() {
return VERSION.version;
}

private final int versionNumber;
final static class Version {
private final String version;
private final int feature;
private final RuntimeException parsingError;

private Version(int versionNumber) {
this.versionNumber = versionNumber;
private Version(String version, int feature) {
this.version = version;
this.feature = feature;
this.parsingError = null;
}

private Version(RuntimeException parsingError) {
this.versionNumber = -1;
private Version(String version, RuntimeException parsingError) {
this.version = version;
this.feature = -1;
this.parsingError = parsingError;
}

public int getVersionNumber() {
return versionNumber;
public String getVersion() {
return version;
}

public int getFeature() {
return feature;
}

public RuntimeException getParsingError() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.criteo.hadoop.garmadon.jvm.utils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public final class SparkRuntime {
private static final Version VERSION = computeVersion();

private SparkRuntime() {
}

public static String getVersion() throws RuntimeException {
if (VERSION.versionNumber == null) {
throw new RuntimeException("Could not find Spark version. Is this a Spark application?", VERSION.throwable);
}
return VERSION.versionNumber;
}

static Version computeVersion() {
try {
return computeSpark32Version();
} catch (Throwable e) {
try {
return computeSpark35Version();
} catch (Throwable t) {
return new Version(t);
}
}
}

private static Version computeSpark32Version() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
Class<?> clazz = Class.forName("org.apache.spark.package$SparkBuildInfo$");
Field moduleFIeld = clazz.getField("MODULE$");
Object instance = moduleFIeld.get(null);
Field versionField = clazz.getDeclaredField("spark_version");
versionField.setAccessible(true);
String version = (String) versionField.get(instance);
return new Version(version);
}

private static Version computeSpark35Version() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<?> clazz = Class.forName("org.apache.spark.SparkBuildInfo");
Method versionMethod = clazz.getDeclaredMethod("spark_version");
String version = (String) versionMethod.invoke(null);
return new Version(version);
}

final static class Version {
private final String versionNumber;
private final Throwable throwable;

private Version(String versionNumber) {
this.versionNumber = versionNumber;
this.throwable = null;
}

private Version(Throwable throwable) {
this.versionNumber = null;
this.throwable = throwable;
}

public String getVersionNumer() {
return versionNumber;
}

public Throwable getThrowable() {
return throwable;
}
}
}
Loading

0 comments on commit 885b878

Please sign in to comment.