Skip to content

Commit

Permalink
Corrected bug in user plugin files (#1801)
Browse files Browse the repository at this point in the history
  • Loading branch information
compae authored Mar 23, 2017
1 parent 5914278 commit 9b7d388
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object SpartaClusterJob extends PluginsFilesUtils {
statusActor ! Update(PolicyStatusModel(id = policyId, status = Starting, statusInfo = Some(startingInfo)))
val streamingContextService = StreamingContextService(statusActor)

val ssc = streamingContextService.clusterStreamingContext(policy)
val ssc = streamingContextService.clusterStreamingContext(policy, pluginsFiles)
statusActor ! Update(PolicyStatusModel(
id = policyId,
status = NotDefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ object SparkContextFactory extends SLF4JLogging {
sc.getOrElse(instantiateSparkContext(specificConfig, jars))
}

def sparkClusterContextInstance(specificConfig: Map[String, String]): SparkContext =
sc.getOrElse(instantiateClusterContext(specificConfig))
def sparkClusterContextInstance(specificConfig: Map[String, String], files: Seq[String]): SparkContext =
sc.getOrElse(instantiateClusterContext(specificConfig, files))

private def instantiateSparkContext(specificConfig: Map[String, String], jars: Seq[File]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(specificConfig)))
Expand All @@ -86,8 +86,12 @@ object SparkContextFactory extends SLF4JLogging {
sc.get
}

private def instantiateClusterContext(specificConfig: Map[String, String]): SparkContext = {
private def instantiateClusterContext(specificConfig: Map[String, String], files: Seq[String]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(specificConfig)))
files.foreach(f => {
log.info(s"Adding jar $f to cluster Spark context")
sc.get.addJar(f)
})
sc.get
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ case class StreamingContextService(statusActor: ActorRef, generalConfig: Option[
ssc
}

def clusterStreamingContext(policy: PolicyModel)
def clusterStreamingContext(policy: PolicyModel, files: Seq[String])
: StreamingContext = {
if (autoDeleteCheckpointPath(policy)) deleteCheckpointPath(policy)

val ssc = StreamingContext.getOrCreate(checkpointPath(policy), () => {
log.info(s"Nothing in checkpoint path: ${checkpointPath(policy)}")
val outputsSparkConfig = PolicyHelper.getSparkConfigs(policy, OutputsSparkConfiguration, Output.ClassSuffix)
sparkClusterContextInstance(outputsSparkConfig)
sparkClusterContextInstance(outputsSparkConfig, files)
SpartaPipeline(policy, statusActor).run()
})

Expand Down

0 comments on commit 9b7d388

Please sign in to comment.