Add Java and Spark versions to Garmadon logs #232

Merged · 3 commits · Jan 24, 2025
Changes from all commits
@@ -1,5 +1,7 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime;
import com.criteo.hadoop.garmadon.schema.enums.Component;
import com.criteo.hadoop.garmadon.schema.enums.Framework;
import com.criteo.hadoop.garmadon.schema.events.Header;
@@ -21,6 +23,7 @@ public final class ContainerHeader {
// as grafana/ES can't join on different event for display HDFS call per framework/component
// or compute used per framework/component
private Framework framework = Framework.YARN;
private String frameworkVersion = null;
private Component component = Component.UNKNOWN;
private String executorId;
private String mainClass;
@@ -52,11 +55,9 @@ private void setFrameworkComponent() {
break;
// SPARK
case "org.apache.spark.deploy.yarn.ApplicationMaster":
framework = Framework.SPARK;
component = Component.APP_MASTER;
break;
case "org.apache.spark.deploy.yarn.ExecutorLauncher":
framework = Framework.SPARK;
frameworkVersion = SparkRuntime.getVersion();
component = Component.APP_MASTER;
break;
case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
@@ -76,21 +77,12 @@ private void setFrameworkComponent() {
break;
// FLINK
case "org.apache.flink.yarn.YarnApplicationMasterRunner":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint":
framework = Framework.FLINK;
component = Component.APP_MASTER;
break;
case "org.apache.flink.yarn.YarnTaskManager":
framework = Framework.FLINK;
component = Component.TASK_MANAGER;
break;
case "org.apache.flink.yarn.YarnTaskExecutorRunner":
framework = Framework.FLINK;
component = Component.TASK_MANAGER;
@@ -123,9 +115,12 @@ private Header.SerializedHeader createCachedHeader() {
.withContainerID(containerIdString)
.withPid(HeaderUtils.getPid())
.withFramework(framework.toString())
.withFrameworkVersion(frameworkVersion)
.withComponent(component.name())
.withExecutorId(executorId)
.withMainClass(mainClass)
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}

@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

@@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.NODEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

@@ -17,6 +18,8 @@ private Header createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.RESOURCEMANAGER.name())
.addTags(System.getProperty("garmadon.tags"))
.build();
@@ -1,5 +1,6 @@
package com.criteo.hadoop.garmadon.agent.headers;

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;

@@ -20,6 +21,8 @@ private Header.SerializedHeader createCachedHeader() {
.withUser(HeaderUtils.getUser())
.withPid(HeaderUtils.getPid())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.buildSerializedHeader();
}

@@ -5,6 +5,7 @@
import com.criteo.hadoop.garmadon.forwarder.metrics.ForwarderEventSender;
import com.criteo.hadoop.garmadon.forwarder.metrics.HostStatistics;
import com.criteo.hadoop.garmadon.forwarder.metrics.PrometheusHttpMetrics;
import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.schema.events.Header;
import com.criteo.hadoop.garmadon.schema.events.HeaderUtils;
import com.criteo.hadoop.garmadon.schema.exceptions.TypeMarkerException;
@@ -73,6 +74,8 @@ public Forwarder(Properties properties) {
.withPid(HeaderUtils.getPid())
.withUser(HeaderUtils.getUser())
.withMainClass(HeaderUtils.getJavaMainClass())
.withJavaVersion(JavaRuntime.version())
.withJavaFeature(JavaRuntime.feature())
.addTag(Header.Tag.FORWARDER.name());

for (String tag : properties.getProperty("forwarder.tags", "").split(",")) {
@@ -23,10 +23,24 @@ public class GarmadonFlinkReporterTest {

private static final Random random = new Random();

private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
"appName", "user", "container", "hostname",
Collections.singletonList("tag"), "pid", "framework", "component",
"executorId", "mainClass");
private static final Header DUMMY_HEADER = Header.newBuilder()
.withId("id")
.withApplicationID("appId")
.withAttemptID("appAttemptId")
.withApplicationName("appName")
.withUser("user")
.withContainerID("container")
.withHostname("hostname")
.addTag("tag")
.withPid("pid")
.withFramework("framework")
.withFrameworkVersion("frameworkVersion")
.withComponent("component")
.withExecutorId("executorId")
.withMainClass("mainClass")
.withJavaVersion("javaVersion")
.withJavaFeature(8)
.build();

private static final String HOST = "localhost";
private static final String JOB_ID = "SomeJobId";
@@ -15,10 +15,21 @@
import static org.mockito.Mockito.*;

public class GarmadonSparkListenerTest {
private static final Header DUMMY_HEADER = new Header("id", "appId", "appAttemptId",
"appName", "user", "container", "hostname",
Collections.singletonList("tag"), "pid", "framework", "component",
"executorId", "mainClass");
private static final Header DUMMY_HEADER = Header.newBuilder()
.withId("id")
.withApplicationID("appId")
.withAttemptID("appAttemptId")
.withApplicationName("appName")
.withUser("user")
.withContainerID("container")
.withHostname("hostname")
.addTag("tag")
.withPid("pid")
.withFramework("framework")
.withComponent("component")
.withExecutorId("executorId")
.withMainClass("mainClass")
.build();

@Test
public void onExecutorAdded() {
38 changes: 38 additions & 0 deletions jvm-statistics-test-spark-3.5/pom.xml
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.criteo.java</groupId>
<artifactId>garmadon</artifactId>
<version>1.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>garmadon-jvm-statistics-test-spark-3.5</artifactId>
<version>1.4.0</version>

<dependencies>
<dependency>
<groupId>com.criteo.java</groupId>
<artifactId>garmadon-jvm-statistics</artifactId>
<version>${garmadon.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,13 @@
package com.criteo.hadoop.garmadon.jvm.utils;

import junit.framework.TestCase;

import static org.assertj.core.api.Assertions.assertThat;

public class SparkRuntimeTest extends TestCase {

public void test_get_3_5_version() {
assertThat(SparkRuntime.getVersion()).isEqualTo("3.5.3");
}

}
6 changes: 6 additions & 0 deletions jvm-statistics/pom.xml
@@ -57,5 +57,11 @@
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -1,19 +1,11 @@
package com.criteo.hadoop.garmadon.jvm.utils;

public final class JavaRuntime {

private static final Version VERSION = parseVersion(System.getProperty("java.version"));

private JavaRuntime() {
}

public static int getVersion() throws RuntimeException {
if (VERSION.versionNumber == -1) {
throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
}
return VERSION.versionNumber;
}

static Version parseVersion(String version) {
try {
int versionNumber;
@@ -23,29 +15,46 @@ static Version parseVersion(String version) {
int dot = version.indexOf(".");
versionNumber = Integer.parseInt(version.substring(0, dot));
}
return new Version(versionNumber);
return new Version(version, versionNumber);
} catch (RuntimeException e) {
return new Version(e);
return new Version(version, e);
}
}

public static int feature() throws RuntimeException {
if (VERSION.feature == -1) {
throw new RuntimeException("Could not parse Java version.", VERSION.parsingError);
}
return VERSION.feature;
}

static final class Version {
public static String version() {
return VERSION.version;
}

private final int versionNumber;
final static class Version {
private final String version;
private final int feature;
private final RuntimeException parsingError;

private Version(int versionNumber) {
this.versionNumber = versionNumber;
private Version(String version, int feature) {
this.version = version;
this.feature = feature;
this.parsingError = null;
}

private Version(RuntimeException parsingError) {
this.versionNumber = -1;
private Version(String version, RuntimeException parsingError) {
this.version = version;
this.feature = -1;
this.parsingError = parsingError;
}

public int getVersionNumber() {
return versionNumber;
public String getVersion() {
return version;
}

public int getFeature() {
return feature;
}

public RuntimeException getParsingError() {
@@ -0,0 +1,71 @@
package com.criteo.hadoop.garmadon.jvm.utils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public final class SparkRuntime {
private static final Version VERSION = computeVersion();

private SparkRuntime() {
}

public static String getVersion() throws RuntimeException {
if (VERSION.versionNumber == null) {
throw new RuntimeException("Could not find Spark version. Is this a Spark application?", VERSION.throwable);
}
return VERSION.versionNumber;
}

static Version computeVersion() {
try {
return computeSpark32Version();
} catch (Throwable e) {
try {
return computeSpark35Version();
} catch (Throwable t) {
return new Version(t);
}
}
}

private static Version computeSpark32Version() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
Class<?> clazz = Class.forName("org.apache.spark.package$SparkBuildInfo$");
Field moduleField = clazz.getField("MODULE$");
Object instance = moduleField.get(null);
Field versionField = clazz.getDeclaredField("spark_version");
versionField.setAccessible(true);
String version = (String) versionField.get(instance);
return new Version(version);
}

private static Version computeSpark35Version() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<?> clazz = Class.forName("org.apache.spark.SparkBuildInfo");
Method versionMethod = clazz.getDeclaredMethod("spark_version");
String version = (String) versionMethod.invoke(null);
return new Version(version);
}

final static class Version {
private final String versionNumber;
private final Throwable throwable;

private Version(String versionNumber) {
this.versionNumber = versionNumber;
this.throwable = null;
}

private Version(Throwable throwable) {
this.versionNumber = null;
this.throwable = throwable;
}

public String getVersionNumber() {
return versionNumber;
}

public Throwable getThrowable() {
return throwable;
}
}
}
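
A minimal usage sketch of the helpers this PR adds or reworks (a hypothetical call site, not part of the committed diff; class name and example values are illustrative only). JavaRuntime now exposes both the raw java.version string and the numeric feature release, and SparkRuntime resolves the Spark version reflectively, trying the Spark 3.2-style org.apache.spark.package$SparkBuildInfo$ object first and falling back to the Spark 3.5 org.apache.spark.SparkBuildInfo accessor:

import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime;
import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime;

// Hypothetical sketch; not part of the PR.
public class VersionHelpersSketch {
    public static void main(String[] args) {
        String javaVersion = JavaRuntime.version();      // raw java.version, e.g. "17.0.2" or "1.8.0_292"
        int javaFeature = JavaRuntime.feature();         // feature release, e.g. 17 or 8; throws if java.version could not be parsed
        String sparkVersion = SparkRuntime.getVersion(); // e.g. "3.5.3"; throws outside a Spark application
        System.out.println(javaVersion + " / " + javaFeature + " / " + sparkVersion);
    }
}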