diff --git a/README.md b/README.md index 90da896..022d813 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ and spark-shell/pyspark environments. - **Batch Job Analysis:** With Flight Recorder mode sparkMeasure records and analyzes batch job metrics for thorough inspection. - **Monitoring Capabilities:** Seamlessly integrates with external systems like InfluxDB, Apache Kafka, - and Prometheus Gateway for extensive monitoring. + and Prometheus PushPushgateway for extensive monitoring. - **Educational Tool:** Serves as a practical example of implementing Spark Listeners for the collection of detailed Spark task metrics. - **Language Compatibility:** Fully supports Scala, Java, and Python, making it versatile for a wide range @@ -216,12 +216,13 @@ SparkMeasure is one tool for many different use cases, languages, and environmen * **Flight Recorder mode**: SparkMeasure in flight recorder will collect metrics transparently, without any need for you to change your code. - * Metrics can be saved to a file, locally or to a Hadoop-compliant filesystem - * or you can write metrics in near-realtime to an InfluxDB instance or to Apache Kafka + * Metrics can be saved to a file, locally, or to a Hadoop-compliant filesystem + * or you can write metrics in near-realtime to the followingsinks: InfluxDB, Apache Kafka, Prometheus PushPushgateway * More details: - **[Flight Recorder mode with file sink](docs/Flight_recorder_mode_FileSink.md)** - **[Flight Recorder mode with InfluxDB sink](docs/Flight_recorder_mode_InfluxDBSink.md)** - **[Flight Recorder mode with Apache Kafka sink](docs/Flight_recorder_mode_KafkaSink.md)** + - **[Flight Recorder mode with Prometheus Pushgateway sink](docs/Flight_recorder_mode_PrometheusPushgatewaySink.md)** * **Additional documentation and examples**: @@ -249,10 +250,10 @@ SparkMeasure is one tool for many different use cases, languages, and environmen * metrics collection and processing can be at the Stage-level or Task-level. The user chooses which mode to use with the API. * metrics are can be buffered into memory for real-time reporting, or they can be dumped to an external system in the "flight recorder mode". - * supported external systems are File Systems supported by the Hadoop API, InfluxDB, and Apache Kafka. + * supported external systems are File Systems supported by the Hadoop API, InfluxDB, Apache Kafka, Prometheus Pushgateway. * Metrics are flattened and collected into local memory structures in the driver (ListBuffer of a custom case class). - * sparkMeasure in flight recorder mode with InfluxDB sink and Apache Kafka do not buffer, - but rather write the collected metrics directly + * sparkMeasure in flight recorder mode when using one between the InfluxDB sink, Apache Kafka sink, and Prometheus Pushgateway sink, does not buffer, + but rather writes the collected metrics directly * Metrics processing: * metrics can be aggregated into a report showing the cumulative values for each metric * aggregated metrics can also be returned as a Scala Map or Python dictionary @@ -288,7 +289,7 @@ SparkMeasure is one tool for many different use cases, languages, and environmen in notebooks and in application code for Scala, Java, and Python. - sparkMeasure collects data in a flat structure, which makes it natural to use Spark SQL for workload data analysis/ - - sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka) + - sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka, Prometheus Pushgateway) - What are known limitations and gotchas? - sparkMeasure does not collect all the data available in the EventLog @@ -299,11 +300,11 @@ SparkMeasure is one tool for many different use cases, languages, and environmen - Metrics are collected on the driver, which could become a bottleneck. This is an issues affecting tools based on Spark ListenerBus instrumentation, such as the Spark WebUI. In addition, note that sparkMeasure in the current version buffers all data in the driver memory. - The notable exception is when using the Flight recorder mode with InfluxDB and - Apache Kafka sink, in this case metrics are directly sent to InfluxDB/Kafka + The notable exception is when using the Flight recorder mode with InfluxDB or + Apache Kafka or Prometheus Pushgateway sink, in this case metrics are directly sent to InfluxDB/Kafka/Prometheus Pushgateway. - Task metrics values are collected by sparkMeasure only for successfully executed tasks. Note that resources used by failed tasks are not collected in the current version. The notable exception is - with the Flight recorder mode with InfluxDB sink and with Apache Kafka. + with the Flight recorder mode with InfluxDB or Apache Kafka or Prometheus Pushgateway sink. - sparkMeasure collects and processes data in order of stage and/or task completion. This means that the metrics data is not available in real-time, but rather with a delay that depends on the workload and the size of the data. Moreover, performance data of jobs executing at the same time can be mixed. @@ -320,8 +321,8 @@ SparkMeasure is one tool for many different use cases, languages, and environmen - How can I save/sink the collected metrics? - You can print metrics data and reports to standard output or save them to files, using a locally mounted filesystem or a Hadoop compliant filesystem (including HDFS). - Additionally, you can sink metrics to external systems (such as Prometheus). - The Flight Recorder mode can sink to InfluxDB and Apache Kafka. + Additionally, you can sink metrics to external systems (such as Prometheus Pushgateway). + The Flight Recorder mode can sink to InfluxDB, Apache Kafka or Prometheus Pushgateway. - How can I process metrics data? - You can use Spark to read the saved metrics data and perform further post-processing and analysis. diff --git a/docs/Flight_recorder_mode_PrometheusPushgatewaySink.md b/docs/Flight_recorder_mode_PrometheusPushgatewaySink.md new file mode 100644 index 0000000..30a88e3 --- /dev/null +++ b/docs/Flight_recorder_mode_PrometheusPushgatewaySink.md @@ -0,0 +1,60 @@ +# SparkMeasure Flight Recorder mode - Prometheus Pushgateway Sink + +Use sparkMeasure in flight recorder mode to instrument Spark applications without touching their code. +Flight recorder mode attaches a Spark Listener that collects the metrics while the application runs. +This describes how to sink Spark metrics to a Prometheus Gateway. + +## PushGatewaySink + +**PushGatewaySink** is a class that extends the SparkListener infrastructure. +It collects and writes Spark metrics and application info in near real-time to a Prometheus Gateway instance. +provided by the user. Use this mode to monitor Spark execution workload. +Notes, the amount of data generated is relatively small in most applications: O(number_of_stages) + +How to use: attach the PrometheusGatewaySink to a Spark Context using the listener infrastructure. Example: + - `--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink` + +Configuration for the is handled with Spark configuration parameters. +Note: you can add configuration using --config option when using spark-submit +use the .config method when allocating the Spark Session in Scala/Python/Java). +Configurations: + ``` +Option 1 (recommended) Start the listener for PushGatewaySink: +--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink + +Configuration - PushGatewaySink parameters: + +--conf spark.sparkmeasure.pushgateway=SERVER:PORT + Example: --conf spark.sparkmeasure.pushgateway=localhost:9091 +--conf spark.sparkmeasure.pushgateway.jobname=JOBNAME // defaut value is pushgateway + Example: --conf spark.sparkmeasure.pushgateway.jobname=myjob1 +``` + +## Use case + +- The use case for this sink it to extend Spark monitoring, by writing execution metrics into Prometheus via the Pushgateway, + as Prometheus has a pull-based architecture. You'll need to configure Prometheus to pull metrics from the Pushgateway. + You'll also need to set up a performance dashboard from the metrics collected by Prometheus. + + +## Example of how to use Prometheus PushGatewaySink + +- Start the Prometheus Pushgateway + - Download and start the Pushgateway, from the [Prometheus download page](https://prometheus.io/download/) + +- Start Spark with sparkMeasure and attach the PushGatewaySink listener + -Note: make sure there is no firewall blocking connectivity between the driver and + the Pushgateway +``` +Examples: +bin/spark-shell \ +--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink \ +--conf spark.sparkmeasure.pushgateway=localhost:9091 \ +--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 +``` + +- Look at the metrics being written to the Pushgateway + - Use the Web UI to look at the metrics being written to the Pushgateway + - Open a web browser and go to the WebUI, for example: http://localhost:9091/metrics + - You should see the metrics being written to the Pushgateway as jobs are run in Spark +``` diff --git a/docs/Reference_SparkMeasure_API_and_Configs.md b/docs/Reference_SparkMeasure_API_and_Configs.md index 453a39d..0b78c74 100644 --- a/docs/Reference_SparkMeasure_API_and_Configs.md +++ b/docs/Reference_SparkMeasure_API_and_Configs.md @@ -400,7 +400,7 @@ spark.sparkmeasure.influxdbEnableBatch, boolean, default true This code depends on "influxdb.java", you may need to add the dependency explicitly: --packages org.influxdb:influxdb-java:2.14 - Note currently we need to use version 2.14 as newer versions generate jar conflicts (tested up to Spark 3.3.0) + Note currently we need to use version 2.14 as newer versions generate jar conflicts ``` ## KafkaSink and KafkaSinkExtended @@ -428,8 +428,25 @@ Configuration - KafkaSink parameters: Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo Note: the topic will be created if it does not yet exist -This code depends on "kafka-clients", you may need to add the dependency explicitly: - --packages org.apache.kafka:kafka-clients:3.2.1 +This code depends on "kafka-clients", you may need to add the dependency explicitly, example: + --packages org.apache.kafka:kafka-clients:3.7.0 +``` + +## PushGatewaySink +``` +class PushGatewaySink(conf: SparkConf) extends SparkListener + +**PushGatewaySink** is a class that extends the SparkListener infrastructure. +It collects and writes Spark metrics and application info in near real-time to a Prometheus Pushgateway instance +provided by the user. Use this mode to monitor Spark execution workload. +Notes, the amount of data generated is relatively small in most applications: O(number_of_stages) +* How to use: attach the PrometheusGatewaySink to a Spark Context using the listener infrastructure. Example: +* --conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink + +Configuration - PushGatewaySink parameters +--conf spark.sparkmeasure.pushgateway=SERVER:PORT + Example: --conf spark.sparkmeasure.pushgateway=localhost:9091 +--conf spark.sparkmeasure.pushgateway.jobname=JOBNAME // default is pushgateway ``` ## IOUtils @@ -480,4 +497,5 @@ def parseInfluxDBCredentials(conf: SparkConf, logger: Logger) : (String,String) def parseInfluxDBName(conf: SparkConf, logger: Logger) : String def parseInfluxDBStagemetrics(conf: SparkConf, logger: Logger) : Boolean def parseKafkaConfig(conf: SparkConf, logger: Logger) : (String,String) +def parsePushGatewayConfig(conf: SparkConf, logger: Logger): (String, String) ``` diff --git a/src/main/scala/ch/cern/sparkmeasure/PushGateway.scala b/src/main/scala/ch/cern/sparkmeasure/PushGateway.scala index 79c1007..1f6980a 100644 --- a/src/main/scala/ch/cern/sparkmeasure/PushGateway.scala +++ b/src/main/scala/ch/cern/sparkmeasure/PushGateway.scala @@ -144,7 +144,7 @@ case class PushGateway(serverIPnPort: String, metricsJob: String) { val responseCode = connection.getResponseCode() val responseMessage = connection.getResponseMessage() connection.disconnect(); - if (responseCode != 202) // 202 Accepted, 400 Bad Request + if (responseCode != 200 && responseCode != 202) // 200 and 202 Accepted, 400 Bad Request logger.error(s"Data sent error, url: '$urlFull', response: $responseCode '$responseMessage'") } catch { case ste: java.net.SocketTimeoutException => diff --git a/src/main/scala/ch/cern/sparkmeasure/PushGatewaySink.scala b/src/main/scala/ch/cern/sparkmeasure/PushGatewaySink.scala new file mode 100644 index 0000000..dcb0b0d --- /dev/null +++ b/src/main/scala/ch/cern/sparkmeasure/PushGatewaySink.scala @@ -0,0 +1,207 @@ +package ch.cern.sparkmeasure + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.Try + +/** + * PushGatewaySink: write Spark metrics and application info in near real-time to Prometheus Push Gateway + * use this mode to monitor Spark execution workload + * use for Grafana dashboard and analytics of job execution + * Limitation: only metrics with numeric values are reported to the Push Gateway + * + * How to use: attach the PushGatewaySink to a Spark Context using the extra listener infrastructure. + * Example: + * --conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink + * + * Configuration for PushGatewaySink is handled with Spark conf parameters: + * spark.sparkmeasure.pushgateway = SERVER:PORT // Prometheus Push Gateway URL + * spark.sparkmeasure.pushgateway.jobname // value for the job label, default pushgateway + * Example: --conf spark.sparkmeasure.pushgateway=localhost:9091 + * + * Output: each message contains the metric name and value, only numeric values are used + * Note: the amount of data generated is relatively small in most applications: O(number_of_stages) + */ +class PushGatewaySink(conf: SparkConf) extends SparkListener { + private val logger: Logger = LoggerFactory.getLogger(this.getClass.getName) + logger.warn("Custom monitoring listener with Prometheus Push Gateway sink initializing. Now attempting to connect to the Push Gateway") + + // Initialize PushGateway connection + val (url, job) = Utils.parsePushGatewayConfig(conf, logger) + val gateway = PushGateway(url, job) + + var appId: String = SparkSession.getActiveSession match { + case Some(sparkSession) => sparkSession.sparkContext.applicationId + case _ => "noAppId" + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + val submissionTime = stageSubmitted.stageInfo.submissionTime.getOrElse(0L) + val attemptNumber = stageSubmitted.stageInfo.attemptNumber().toLong + val stageId = stageSubmitted.stageInfo.stageId.toLong + val epochMillis = System.currentTimeMillis() + + val metrics = Map[String, Any]( + "name" -> "stages_started", + "appId" -> appId, + "stageId" -> stageId, + "attemptNumber" -> attemptNumber, + "submissionTime" -> submissionTime, + "epochMillis" -> epochMillis + ) + report(s"stageSubmitted-${stageId}-${attemptNumber}", metrics) + } + + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageId = stageCompleted.stageInfo.stageId.toLong + val submissionTime = stageCompleted.stageInfo.submissionTime.getOrElse(0L) + val completionTime = stageCompleted.stageInfo.completionTime.getOrElse(0L) + val attemptNumber = stageCompleted.stageInfo.attemptNumber().toLong + val epochMillis = System.currentTimeMillis() + + // Report overall metrics + val stageEndMetrics = Map[String, Any]( + "name" -> "stages_ended", + "appId" -> appId, + "stageId" -> stageId, + "attemptNumber" -> attemptNumber, + "submissionTime" -> submissionTime, + "completionTime" -> completionTime, + "epochMillis" -> epochMillis + ) + report(s"stageEnd-${stageId}-${attemptNumber}", stageEndMetrics) + + // Report stage task metric + val taskMetrics = stageCompleted.stageInfo.taskMetrics + val stageTaskMetrics = Map[String, Any]( + "name" -> "stage_metrics", + "appId" -> appId, + "stageId" -> stageId, + "attemptNumber" -> attemptNumber, + "submissionTime" -> submissionTime, + "completionTime" -> completionTime, + "failureReason" -> stageCompleted.stageInfo.failureReason.getOrElse(""), + "executorRunTime" -> taskMetrics.executorRunTime, + "executorCpuTime" -> taskMetrics.executorRunTime, + "executorDeserializeCpuTime" -> taskMetrics.executorDeserializeCpuTime, + "executorDeserializeTime" -> taskMetrics.executorDeserializeTime, + "jvmGCTime" -> taskMetrics.jvmGCTime, + "memoryBytesSpilled" -> taskMetrics.memoryBytesSpilled, + "peakExecutionMemory" -> taskMetrics.peakExecutionMemory, + "resultSerializationTime" -> taskMetrics.resultSerializationTime, + "resultSize" -> taskMetrics.resultSize, + "bytesRead" -> taskMetrics.inputMetrics.bytesRead, + "recordsRead" -> taskMetrics.inputMetrics.recordsRead, + "bytesWritten" -> taskMetrics.outputMetrics.bytesWritten, + "recordsWritten" -> taskMetrics.outputMetrics.recordsWritten, + "shuffleTotalBytesRead" -> taskMetrics.shuffleReadMetrics.totalBytesRead, + "shuffleRemoteBytesRead" -> taskMetrics.shuffleReadMetrics.remoteBytesRead, + "shuffleRemoteBytesReadToDisk" -> taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, + "shuffleLocalBytesRead" -> taskMetrics.shuffleReadMetrics.localBytesRead, + "shuffleTotalBlocksFetched" -> taskMetrics.shuffleReadMetrics.totalBlocksFetched, + "shuffleLocalBlocksFetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched, + "shuffleRemoteBlocksFetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched, + "shuffleRecordsRead" -> taskMetrics.shuffleReadMetrics.recordsRead, + "shuffleFetchWaitTime" -> taskMetrics.shuffleReadMetrics.fetchWaitTime, + "shuffleBytesWritten" -> taskMetrics.shuffleWriteMetrics.bytesWritten, + "shuffleRecordsWritten" -> taskMetrics.shuffleWriteMetrics.recordsWritten, + "shuffleWriteTime" -> taskMetrics.shuffleWriteMetrics.writeTime, + "epochMillis" -> epochMillis + ) + + report(s"stageMetrics-${stageId}-${attemptNumber}", stageTaskMetrics) + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + val epochMillis = System.currentTimeMillis() + event match { + case e: SparkListenerSQLExecutionStart => + val startTime = e.time + val queryId = e.executionId + val description = e.description + + val queryStartMetrics = Map[String, Any]( + "name" -> "queries_started", + "appId" -> appId, + "description" -> description, + "queryId" -> queryId, + "startTime" -> startTime, + "epochMillis" -> epochMillis + ) + report(s"queryStart-${queryId}", queryStartMetrics) + case e: SparkListenerSQLExecutionEnd => + val endTime = e.time + val queryId = e.executionId + + val queryEndMetrics = Map[String, Any]( + "name" -> "queries_ended", + "appId" -> appId, + "queryId" -> queryId, + "endTime" -> endTime, + "epochMillis" -> epochMillis + ) + report(s"queryEnd-${queryId}", queryEndMetrics) + case _ => None // Ignore + } + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val startTime = jobStart.time + val jobId = jobStart.jobId.toLong + val epochMillis = System.currentTimeMillis() + + val jobStartMetrics = Map[String, Any]( + "name" -> "jobs_started", + "appId" -> appId, + "jobId" -> jobId, + "startTime" -> startTime, + "epochMillis" -> epochMillis + ) + report(s"jobStart-${jobId}", jobStartMetrics) + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val completionTime = jobEnd.time + val jobId = jobEnd.jobId.toLong + val epochMillis = System.currentTimeMillis() + + val jobEndMetrics = Map[String, Any]( + "name" -> "jobs_ended", + "appId" -> appId, + "jobId" -> jobId, + "completionTime" -> completionTime, + "epochMillis" -> epochMillis + ) + report(s"jobEnd-${jobId}", jobEndMetrics) + } + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + appId = applicationStart.appId.getOrElse("noAppId") + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + logger.info(s"Spark application ended, timestamp = ${applicationEnd.time}") + } + + + protected def report[T <: Any](metricsType: String, metrics: Map[String, T]): Unit = Try { + + var strMetrics = s"" + metrics.foreach { + case (metric: String, value: Long) => + strMetrics += gateway.validateMetric(metric.toLowerCase()) + s" " + value.toString + s"\n" + case (_, _) => // Discard + } + + gateway.post(strMetrics, metricsType, "appid", appId) + + }.recover { + case ex: Throwable => logger.error(s"error on reporting metrics to Push Gateway, details=${ex.getMessage}", ex) + } + +} diff --git a/src/main/scala/ch/cern/sparkmeasure/Utils.scala b/src/main/scala/ch/cern/sparkmeasure/Utils.scala index 7b1d413..a0aa3e9 100644 --- a/src/main/scala/ch/cern/sparkmeasure/Utils.scala +++ b/src/main/scala/ch/cern/sparkmeasure/Utils.scala @@ -237,7 +237,7 @@ object Utils { } def parseKafkaConfig(conf: SparkConf, logger: Logger) : (String,String) = { - // handle InfluxDB username and password + // handle Kafka broker and topic val broker = conf.get("spark.sparkmeasure.kafkaBroker", "") val topic = conf.get("spark.sparkmeasure.kafkaTopic", "") if (broker.isEmpty || topic.isEmpty) { @@ -249,6 +249,19 @@ object Utils { (broker, topic) } + def parsePushGatewayConfig(conf: SparkConf, logger: Logger): (String, String) = { + // handle Push Gateway URL + val URL = conf.get("spark.sparkmeasure.pushgateway", "") + if (URL.isEmpty) { + throw new IllegalArgumentException("SERVER:PORT configuration for the Prometheus Push Gateway is required, use --conf spark.sparkmeasure.pushgateway=SERVER:PORT") + } else { + logger.info(s"Prometheus Push Gateway server and port: $URL") + } + // sets the value for the label "job" reported with each metrics point + val job = conf.get("spark.sparkmeasure.pushgateway.jobname", "pushgateway") + (URL, job) + } + // handle list of metrics to process by the listener onExecutorMetricsUpdate // returns an array with the metrics to process def parseExecutorMetricsConfig(conf: SparkConf, logger: Logger) : Array[String] = {