Spark Job Framework
- the program arguments are written to the instance variable `args` before the job is executed
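For example, a job could use its arguments to parametrize the input, roughly like this (a sketch only: it assumes `args` behaves like a sequence of strings, and the keyspace, fallback table name and use of `headOption` are illustrative assumptions, not part of the framework):

```scala
override def load(sc: SparkContext): Unit = {
    // use the first program argument as the source table (keyspace and default are hypothetical)
    val table = args.headOption.getOrElse("entities")
    entities = sc.cassandraTable[Entity]("mykeyspace", table)
}
```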
```scala
class MySparkJob extends SparkJob {
    // displayed in the Spark Web UI
    appName = "My Spark Job"
    // files in src/main/resources/configs
    configFile = "myConfig.xml"
    // extra Spark options set for the job
    sparkOptions("spark option") = "value"
    // Cassandra queries to be executed before the load and save functions
    cassandraLoadQueries += "query"
    cassandraSaveQueries += "query"
    // ...
}
```
```scala
// input
var entities: RDD[Entity] = _
var annotations: RDD[Annotation] = _
// output
var annotatedEntities: RDD[Entity] = _
```
- sets the input RDDs of the job instance

```scala
override def load(sc: SparkContext): Unit = {
    // set the input instance variables of your job
    entities = sc.cassandraTable[Entity](...)
    annotations = sc.cassandraTable[Annotation](...)
}
```
- contains the data processing of your job
- uses the input RDDs set in the load function to create new data, which is written to the output RDDs of the job

```scala
override def run(sc: SparkContext): Unit = {
    // data processing of your job
    // more code ...
    annotatedEntities = annotateEntities(...)
}
```
- saves the output RDDs of the job instance, e.g., to Cassandra

```scala
override def save(sc: SparkContext): Unit = {
    // save the output instance variables of your job
    annotatedEntities.saveToCassandra(...)
}
```
- these methods (load and save) should ONLY contain the code for connecting to and loading/saving from/to Cassandra/HDFS, which is why they are excluded from test coverage:

```scala
// $COVERAGE-OFF$
override def load(sc: SparkContext): Unit = { ... }
override def save(sc: SparkContext): Unit = { ... }
// $COVERAGE-ON$
```
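Putting the pieces together, a complete job might look roughly like the following sketch. Only the SparkJob members shown above (appName, configFile, load, run, save and the RDD instance variables) come from this documentation; the case classes, keyspace and table names are illustrative assumptions:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._ // provides cassandraTable and saveToCassandra

case class Entity(id: String, name: String)
case class Annotation(entityId: String, label: String)

class AnnotationJob extends SparkJob {
    appName = "Annotation Job"
    configFile = "annotationConfig.xml"

    // input
    var entities: RDD[Entity] = _
    var annotations: RDD[Annotation] = _
    // output
    var annotatedEntities: RDD[Entity] = _

    // $COVERAGE-OFF$
    // only reads from Cassandra (hypothetical keyspace and table names)
    override def load(sc: SparkContext): Unit = {
        entities = sc.cassandraTable[Entity]("mykeyspace", "entities")
        annotations = sc.cassandraTable[Annotation]("mykeyspace", "annotations")
    }

    // only writes to Cassandra
    override def save(sc: SparkContext): Unit = {
        annotatedEntities.saveToCassandra("mykeyspace", "annotated_entities")
    }
    // $COVERAGE-ON$

    // the actual data processing: append each entity's annotation label to its name
    override def run(sc: SparkContext): Unit = {
        val labels = annotations.map(annotation => (annotation.entityId, annotation.label))
        annotatedEntities = entities
            .map(entity => (entity.id, entity))
            .join(labels)
            .map { case (_, (entity, label)) => entity.copy(name = s"${entity.name} [$label]") }
    }
}
```

The cassandraTable and saveToCassandra calls come from the spark-cassandra-connector, which the snippets above also rely on.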