
Spark Job Framework


Extend the SparkJob trait and set instance variables

  • the program arguments are written to the instance variable args before the job is executed (see the sketch after the class below)
class MySparkJob extends SparkJob {
	// displayed in Spark Web UI
	appName = "My Spark Job"

	// files in src/main/resources/configs
	configFile = "myConfig.xml" 
	
	// extra Spark options set for the job
	sparkOptions("spark option") = "value"

	// Cassandra queries to be executed before the load and save functions
	cassandraLoadQueries += "query"
	cassandraSaveQueries += "query"

	// ...
}
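
Since args is populated before the job is executed, run (or any other method) can read job-specific parameters from it. A minimal sketch; the parameter's meaning is hypothetical and it is assumed that args holds the raw program arguments as strings:

class MyParameterizedSparkJob extends SparkJob {
	appName = "My Parameterized Spark Job"

	override def run(sc: SparkContext): Unit = {
		// hypothetical: interpret the first program argument as a row limit
		val limit = args.headOption.map(_.toInt).getOrElse(100)
		// ... use limit in the data processing
	}
}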

Define input and output instance variables

// input
var entities: RDD[Entity] = _
var annotations: RDD[Annotation] = _
// output
var annotatedEntities: RDD[Entity] = _

Override methods

load

  • sets input RDDs of job instance
override def load(sc: SparkContext): Unit = {
	// set input instance variables of your job
	entities = sc.cassandraTable[Entity](...)
	annotations = sc.cassandraTable[Annotation](...)
}
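
With the spark-cassandra-connector, a filled-in load could look like the following sketch. The keyspace and table names are placeholders; importing com.datastax.spark.connector._ makes cassandraTable available on the SparkContext:

import com.datastax.spark.connector._

override def load(sc: SparkContext): Unit = {
	// "wikidumps", "entity" and "annotation" are placeholder keyspace/table names
	entities = sc.cassandraTable[Entity]("wikidumps", "entity")
	annotations = sc.cassandraTable[Annotation]("wikidumps", "annotation")
}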

run

  • contains the data processing of your job
  • uses the input RDDs set in the load function to create new data, which is written to the output RDDs of the job
override def run(sc: SparkContext): Unit = {
	// data processing of your job
	// more code ...
	annotatedEntities = annotateEntities(...)
}
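
The helper used above is not part of the framework. A minimal sketch of such a processing step, assuming Entity is a case class with hypothetical id and annotations fields and Annotation carries an entityId:

// hypothetical helper: attaches all annotations to their entity by id
def annotateEntities(
	entities: RDD[Entity],
	annotations: RDD[Annotation]
): RDD[Entity] = {
	val annotationsByEntity = annotations.groupBy(_.entityId)
	entities
		.keyBy(_.id)
		.leftOuterJoin(annotationsByEntity)
		.map { case (_, (entity, anns)) =>
			entity.copy(annotations = anns.map(_.toList).getOrElse(Nil))
		}
}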

save

  • saves the output RDDs of the job instance, e.g., to Cassandra
override def save(sc: SparkContext): Unit = {
	// save output instance variables of your job
	annotatedEntities.saveToCassandra(...)
}
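
Analogous to load, a filled-in save with the spark-cassandra-connector could look like this sketch (placeholder keyspace/table names; saveToCassandra also comes from com.datastax.spark.connector._):

import com.datastax.spark.connector._

override def save(sc: SparkContext): Unit = {
	// "wikidumps" and "annotated_entity" are placeholder names
	annotatedEntities.saveToCassandra("wikidumps", "annotated_entity")
}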

Exclude load and save from scoverage

  • these methods should ONLY contain the code that connects to, loads from, or saves to Cassandra/HDFS
// $COVERAGE-OFF$
override def load(sc: SparkContext): Unit = { ... }
override def save(sc: SparkContext): Unit = { ... }
// $COVERAGE-ON$
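
Putting the pieces together, a complete job could look like the following sketch. It only assembles the elements shown above; the keyspace/table names, the Entity and Annotation types and the annotateEntities helper are hypothetical:

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class AnnotationJob extends SparkJob {
	// displayed in Spark Web UI
	appName = "Annotation Job"

	// input
	var entities: RDD[Entity] = _
	var annotations: RDD[Annotation] = _
	// output
	var annotatedEntities: RDD[Entity] = _

	// $COVERAGE-OFF$
	override def load(sc: SparkContext): Unit = {
		entities = sc.cassandraTable[Entity]("wikidumps", "entity")
		annotations = sc.cassandraTable[Annotation]("wikidumps", "annotation")
	}

	override def save(sc: SparkContext): Unit = {
		annotatedEntities.saveToCassandra("wikidumps", "annotated_entity")
	}
	// $COVERAGE-ON$

	override def run(sc: SparkContext): Unit = {
		annotatedEntities = annotateEntities(entities, annotations)
	}
}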