-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathCloudFSMetrics27.scala
96 lines (78 loc) · 3.35 KB
/
CloudFSMetrics27.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package ch.cern
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.FileSystem
import org.slf4j.LoggerFactory
/**
* Instrument Cloud FS I/O using Hadoop 2.7 client API for FileSystem.getAllStatistics
* Note this API is deprecated in more recent versions of Hadoop.
* Requires the name of the Hadoop compatible filesystem to measure (s3a, oci, gs, root, etc).
* Limitation: currently supports only one filesystem at a time
*
* Parameters:
* filesystem name:
* --conf spark.cernSparkPlugin.cloudFsName=<name of the Hadoop compatible filesystem> (example: s3a)
* register metrics on the driver as well (optional, enabled by default; set to false to disable):
* --conf spark.cernSparkPlugin.registerOnDriver=true
*
*/
class CloudFSMetrics27 extends SparkPlugin {

  lazy val logger = LoggerFactory.getLogger(this.getClass.getName)

  /**
   * Looks up the Hadoop statistics entry whose scheme equals `fsName`.
   * FileSystem.getAllStatistics is queried on every call.
   *
   * @param fsName filesystem scheme to look for (for example "s3a")
   * @return the matching statistics entry, or None when no entry has that scheme
   */
  private def statisticsFor(fsName: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().asScala.find(_.getScheme == fsName)

  /**
   * Registers one gauge that reads a single counter from the statistics of `fsName`.
   *
   * @param registry    metric registry the gauge is added to
   * @param metricName  name under which the gauge is registered
   * @param fsName      filesystem scheme whose statistics are read
   * @param whenMissing value reported while no statistics entry exists for `fsName`
   * @param read        extracts the counter from the statistics entry
   */
  private def registerGauge[T](
      registry: MetricRegistry,
      metricName: String,
      fsName: String,
      whenMissing: T)(read: FileSystem.Statistics => T): Unit = {
    registry.register(MetricRegistry.name(metricName), new Gauge[T] {
      override def getValue: T = statisticsFor(fsName).map(read).getOrElse(whenMissing)
    })
  }

  /**
   * Registers the gauges bytesRead, bytesWritten, readOps and writeOps for the filesystem
   * named by spark.cernSparkPlugin.cloudFsName. Each gauge reports 0 while Hadoop has no
   * statistics entry for that scheme.
   *
   * @param myContext plugin context providing the Spark configuration and the metric registry
   * @throws IllegalArgumentException if spark.cernSparkPlugin.cloudFsName is not set
   */
  def cloudFilesystemMetrics(myContext: PluginContext): Unit = {
    // Unwrap the option once: comparing a scheme String against the Option itself
    // is always false, which would make every gauge report 0 forever.
    val fsName: String = myContext.conf
      .getOption("spark.cernSparkPlugin.cloudFsName")
      .getOrElse {
        logger.error("spark.cernSparkPlugin.cloudFsName needs to be set when using the ch.cern.CloudFSMetrics Plugin.")
        throw new IllegalArgumentException("spark.cernSparkPlugin.cloudFsName is not set")
      }

    val metricRegistry = myContext.metricRegistry

    registerGauge[Long](metricRegistry, "bytesRead", fsName, 0L)(_.getBytesRead)
    registerGauge[Long](metricRegistry, "bytesWritten", fsName, 0L)(_.getBytesWritten)
    registerGauge[Int](metricRegistry, "readOps", fsName, 0)(_.getReadOps)
    registerGauge[Int](metricRegistry, "writeOps", fsName, 0)(_.getWriteOps)
  }

  // Return the plugin's driver-side component.
  // Metrics are registered on the driver unless
  // --conf spark.cernSparkPlugin.registerOnDriver=false is set (the default is true).
  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        val registerOnDriver =
          myContext.conf.getBoolean("spark.cernSparkPlugin.registerOnDriver", true)
        if (registerOnDriver) {
          cloudFilesystemMetrics(myContext)
        }
        // No extra configuration is handed over to the executor-side plugin.
        Map.empty[String, String].asJava
      }
    }
  }

  // Return the plugin's executor-side component.
  // Metrics are always registered on executors.
  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin {
      override def init(myContext: PluginContext, extraConf: JMap[String, String]): Unit = {
        cloudFilesystemMetrics(myContext)
      }
    }
  }
}