Skip to content

Commit

Permalink
[SPARTA-602] set remember field when creating new StreamContext
Browse files Browse the repository at this point in the history
  • Loading branch information
eambrosio committed Apr 22, 2016
1 parent ff2b817 commit 55562d6
Show file tree
Hide file tree
Showing 21 changed files with 124 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SpartaJob(policy: AggregationPoliciesModel) extends SLF4JLogging {
def run(sc: SparkContext): StreamingContext = {
val checkpointPolicyPath = policy.checkpointPath.concat(File.separator).concat(policy.name)
val sparkStreamingWindow = OperationsHelper.parseValueToMilliSeconds(policy.sparkStreamingWindow)
val ssc = sparkStreamingInstance(new Duration(sparkStreamingWindow), checkpointPolicyPath)
val ssc = sparkStreamingInstance(Duration(sparkStreamingWindow), checkpointPolicyPath, policy.remember)
val parserSchemas = SchemaHelper.getSchemasFromParsers(policy.transformations, Input.InitSchema)
val parsers = SpartaJob.getParsers(policy, ReflectionUtils, parserSchemas).sorted
val cubes = SpartaJob.getCubes(policy, ReflectionUtils, getSchemaWithoutRaw(parserSchemas))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.sparta.driver.factory

import java.io.File
import java.net.URI
import scala.collection.JavaConversions._
import scala.util.Try

import akka.event.slf4j.SLF4JLogging
import com.stratio.sparta.serving.core.SpartaConfig
import com.stratio.sparta.serving.core.constants.AppConstant
import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.util.Try
import com.stratio.sparta.serving.core.SpartaConfig
import com.stratio.sparta.serving.core.constants.AppConstant
import com.stratio.sparta.serving.core.helpers.OperationsHelper

object SparkContextFactory extends SLF4JLogging {

Expand All @@ -47,11 +48,12 @@ object SparkContextFactory extends SLF4JLogging {

def sparkStreamingInstance: Option[StreamingContext] = ssc

def sparkStreamingInstance(batchDuration: Duration, checkpointDir: String): Option[StreamingContext] = {
def sparkStreamingInstance(batchDuration: Duration, checkpointDir: String, remember: Option[String]):
Option[StreamingContext] = {
synchronized {
ssc match {
case Some(_) => ssc
case None => ssc = Some(getNewStreamingContext(batchDuration, checkpointDir))
case None => ssc = Some(getNewStreamingContext(batchDuration, checkpointDir, remember))
}
}
ssc
Expand All @@ -61,15 +63,17 @@ object SparkContextFactory extends SLF4JLogging {

def setSparkStreamingContext(createdContext: StreamingContext): Unit = ssc = Option(createdContext)

private def getNewStreamingContext(batchDuration: Duration, checkpointDir: String): StreamingContext = {
private def getNewStreamingContext(batchDuration: Duration, checkpointDir: String, remember: Option[String]):
StreamingContext = {
val ssc = new StreamingContext(sc.get, batchDuration)
ssc.checkpoint(checkpointDir)
remember.foreach(value => ssc.remember(Duration(OperationsHelper.parseValueToMilliSeconds(value))))
ssc
}

def sparkStandAloneContextInstance(generalConfig: Option[Config],
specificConfig: Map[String, String],
jars: Seq[File]): SparkContext =
specificConfig: Map[String, String],
jars: Seq[File]): SparkContext =
synchronized {
sc.getOrElse(instantiateStandAloneContext(generalConfig, specificConfig, jars))
}
Expand All @@ -80,15 +84,15 @@ object SparkContextFactory extends SLF4JLogging {
}

private def instantiateStandAloneContext(generalConfig: Option[Config],
specificConfig: Map[String, String],
jars: Seq[File]): SparkContext = {
specificConfig: Map[String, String],
jars: Seq[File]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(generalConfig, specificConfig)))
jars.foreach(f => sc.get.addJar(f.getAbsolutePath))
sc.get
}

private def instantiateClusterContext(specificConfig: Map[String, String],
jars: Seq[String]): SparkContext = {
jars: Seq[String]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(None, specificConfig)))
jars.foreach(f => sc.get.addJar(f))
sc.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ class SparkContextFactoryTest extends FlatSpec with ShouldMatchers with BeforeAn
val checkpointDir = "checkpoint/SparkContextFactorySpec"
val sc = SparkContextFactory.sparkStandAloneContextInstance(config, specificConfig, Seq())
SparkContextFactory.sparkStreamingInstance should be(None)
val ssc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir)
val ssc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir, None)
ssc shouldNot be equals (None)
val otherSsc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir)
val otherSsc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir, None)
ssc should be equals (otherSsc)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ trait HttpServiceBaseTest extends WordSpec
input,
outputs,
Seq(),
userPluginsJars = Seq.empty[String])
userPluginsJars = Seq.empty[String],
remember = None)
policy
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ abstract class BaseUtilsTest extends TestKit(ActorSystem("UtilsText"))
input,
outputs,
Seq(),
userPluginsJars = Seq.empty[String])
userPluginsJars = Seq.empty[String],
remember = None)
policy
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ import com.stratio.sparta.serving.core.exception.ServingCoreException
import com.stratio.sparta.serving.core.helpers.OperationsHelper
import com.stratio.sparta.serving.core.policy.status.PolicyStatusEnum

case class AggregationPoliciesModel(id: Option[String] = None,
version: Option[Int] = None,
storageLevel: Option[String] = AggregationPoliciesModel.storageDefaultValue,
name: String,
description: String = "default description",
sparkStreamingWindow: String = AggregationPoliciesModel.sparkStreamingWindow,
checkpointPath: String,
rawData: RawDataModel,
transformations: Seq[TransformationsModel],
streamTriggers: Seq[TriggerModel],
cubes: Seq[CubeModel],
input: Option[PolicyElementModel] = None,
outputs: Seq[PolicyElementModel],
fragments: Seq[FragmentElementModel],
userPluginsJars: Seq[String])
case class AggregationPoliciesModel(
id: Option[String] = None,
version: Option[Int] = None,
storageLevel: Option[String] = AggregationPoliciesModel.storageDefaultValue,
name: String,
description: String = "default description",
sparkStreamingWindow: String = AggregationPoliciesModel.sparkStreamingWindow,
checkpointPath: String,
rawData: RawDataModel,
transformations: Seq[TransformationsModel],
streamTriggers: Seq[TriggerModel],
cubes: Seq[CubeModel],
input: Option[PolicyElementModel] = None,
outputs: Seq[PolicyElementModel],
fragments: Seq[FragmentElementModel],
userPluginsJars: Seq[String],
remember: Option[String] = None)

case object AggregationPoliciesModel {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.sparta.serving.core.helpers

import com.stratio.sparta.serving.core.models._
Expand Down Expand Up @@ -60,7 +61,8 @@ class PolicyHelperTest extends FeatureSpec with GivenWhenThen with Matchers {
description = "description",
shortDescription = "short description",
element = PolicyElementModel("outputF", "output", Map()))),
userPluginsJars = Seq.empty[String]
userPluginsJars = Seq.empty[String],
remember = None
)

When("the helper parse these fragments")
Expand Down Expand Up @@ -110,7 +112,8 @@ class PolicyHelperTest extends FeatureSpec with GivenWhenThen with Matchers {
description = "description",
shortDescription = "short description",
element = PolicyElementModel("outputF", "output", Map()))),
userPluginsJars = Seq.empty[String]
userPluginsJars = Seq.empty[String],
remember = None
)

When("the helper tries to parse the policy it throws an exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.sparta.serving.core.models

import com.stratio.sparta.sdk.{JsoneyString, DimensionType, Input}
Expand All @@ -25,7 +26,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}

@RunWith(classOf[JUnitRunner])
class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{
class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar {

val rawData = new RawDataModel
val fragmentModel = new FragmentElementModel(
Expand Down Expand Up @@ -101,7 +102,8 @@ class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{
input,
outputs,
Seq(),
userPluginsJars = Seq.empty[String])
userPluginsJars = Seq.empty[String],
remember = None)

val wrongComputeLastPolicy = AggregationPoliciesModel(id = None,
version = None,
Expand All @@ -117,7 +119,8 @@ class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{
input,
outputs,
Seq(),
Seq())
Seq(),
remember = None)

"AggregationPoliciesValidator" should {
"non throw an exception if the policy is well formed" in {
Expand Down
40 changes: 40 additions & 0 deletions web/src/data-templates/policy.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,46 @@
}
]
},
{
"propertyId": "rememberNumber",
"propertyName": "_POLICY_REMEMBER_FIELD_",
"propertyType": "number",
"width": 6,
"position": "left",
"required": "scope.form['dataSourcerememberTimeForm']['dataSourcerememberTimeForm'].$modelValue.length > 0",
"qa": "policy-remember-number"
},
{
"propertyId": "rememberTime",
"propertyName": "_WHITE_SPACE_",
"propertyType": "select",
"width": 6,
"position": "right",
"qa": "policy-remember-time",
"required": "scope.form['dataSourcerememberNumberForm']['dataSourcerememberNumberForm'].$modelValue",
"values": [
{
"label": "Milliseconds",
"value": "ms"
},
{
"label": "Seconds",
"value": "s"
},
{
"label": "Minutes",
"value": "m"
},
{
"label": "Hours",
"value": "h"
},
{
"label": "Days",
"value": "d"
}
]
},
{
"propertyId": "storageLevel",
"propertyName": "_STORAGELEVEL_",
Expand Down
1 change: 1 addition & 0 deletions web/src/languages/en-US.json
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@
"_CLUSTER_UI_": "Cluster UI",
"_FIXED_MEASURE_NAME_":"Fixed measure name",
"_FIXED_MEASURE_VALUE_":"Fixed measure value",
"_POLICY_REMEMBER_FIELD_":"Max query execution time",
"_ERROR_": {
"_CHANGES_WITHOUT_SAVING_": "You have to save all your changes or cancel them before going to the next step",
"_UNAVAILABLE_SERVER_": "Server is unavailable. Please, check your connection and try it again",
Expand Down
7 changes: 7 additions & 0 deletions web/src/scripts/factories/policy-model-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
policy.name = inputPolicyJSON.name;
policy.description = inputPolicyJSON.description;
policy.sparkStreamingWindow = inputPolicyJSON.sparkStreamingWindow;
policy.remember = inputPolicyJSON.remember;
policy.storageLevel = inputPolicyJSON.storageLevel;
policy.checkpointPath = inputPolicyJSON.checkpointPath;
policy.rawDataEnabled = (inputPolicyJSON.rawData.enabled == "true");
Expand All @@ -66,6 +67,12 @@
policy.sparkStreamingWindowNumber = Number(sparkStreamingWindow[1]);
policy.sparkStreamingWindowTime = sparkStreamingWindow[2];
delete policy.sparkStreamingWindow;
if (policy.remember) {
var rememberField = policy.remember.split(/([0-9]+)/);
policy.rememberNumber = Number(rememberField[1]);
policy.rememberTime = rememberField[2];
}
delete policy.remember;
}

function setStreamTriggers(streamTriggers) {
Expand Down
9 changes: 7 additions & 2 deletions web/src/scripts/services/policy-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
convertedDescriptionJson.rawData.path = (vm.policy.rawDataEnabled) ? vm.policy.rawDataPath : null;
}
convertedDescriptionJson.sparkStreamingWindow = json.sparkStreamingWindowNumber + json.sparkStreamingWindowTime;
if (json.rememberNumber && json.rememberTime) {
convertedDescriptionJson.remember = json.rememberNumber + json.rememberTime;
}
return convertedDescriptionJson;
}

Expand Down Expand Up @@ -123,8 +126,8 @@
var cubes = policyJson.cubes;
for (var i = 0; i < cubes.length; ++i) {
var cube = UtilsService.convertDottedPropertiesToJson(cubes[i]);
if (cube.writer.fixedMeasureName && cube.writer.fixedMeasureValue){
cube.writer.fixedMeasure = cube.writer.fixedMeasureName + ":" + cube.writer.fixedMeasureValue;
if (cube.writer.fixedMeasureName && cube.writer.fixedMeasureValue) {
cube.writer.fixedMeasure = cube.writer.fixedMeasureName + ":" + cube.writer.fixedMeasureValue;
}
delete cube.writer.fixedMeasureName;
delete cube.writer.fixedMeasureValue;
Expand All @@ -137,6 +140,8 @@
delete finalJSON['rawDataEnabled'];
delete finalJSON['sparkStreamingWindowNumber'];
delete finalJSON['sparkStreamingWindowTime'];
delete finalJSON['rememberNumber'];
delete finalJSON['rememberTime'];

if (finalJSON.rawData.enabled == 'false') {
delete finalJSON.rawData['path'];
Expand Down
6 changes: 3 additions & 3 deletions web/src/stratio-ui/template/form/form_field.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
<!-- input text -->
<form-field-input ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'text' || field.propertyType == 'password' || field.propertyType == 'number') && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-label="{{modify.propertyName || field.propertyName | translate}}" data-type="{{field.propertyType}}" data-help="{{modify.tooltip || field.tooltip}}" data-input-default="field.default" data-pattern="field.regexp" data-required="isRequired()" data-qa="{{qa}}" data-maxlength="field.maxlength" data-st-trim="field.trim" data-list-compressed="listCompressed" data-output-field="field.outputField"></form-field-input>
<!-- textarea -->
<form-field-textarea ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'textarea') && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-label="{{modify.propertyName || field.propertyName | translate}}" data-help="{{modify.tooltip || field.tooltip}}" data-pattern="field.regexp" data-required="field.required" data-list-compressed="listCompressed" data-qa="{{qa}}"></form-field-textarea>
<form-field-textarea ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'textarea') && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-label="{{modify.propertyName || field.propertyName | translate}}" data-help="{{modify.tooltip || field.tooltip}}" data-pattern="field.regexp" data-required="isRequired()" data-list-compressed="listCompressed" data-qa="{{qa}}"></form-field-textarea>
<!-- select -->
<form-field-select ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'select') && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-options="field.values" data-field="label" data-label="{{modify.propertyName || field.propertyName | translate}}" data-help="{{modify.tooltip || field.tooltip}}" data-required="field.required" data-list-compressed="listCompressed" data-qa="{{qa}}" data-disabled="disabled"></form-field-select>
<form-field-select ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'select') && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-options="field.values" data-field="label" data-label="{{modify.propertyName || field.propertyName | translate}}" data-help="{{modify.tooltip || field.tooltip}}" data-required="isRequired()"data-list-compressed="listCompressed" data-qa="{{qa}}" data-disabled="disabled"></form-field-select>
<!-- checkbox -->
<form-field-check ng-attr-id="{{ngFormId}}" data-ng-if="field.propertyType == 'boolean' && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-label="{{modify.propertyName || field.propertyName | translate}}" data-type="{{field.propertyType}}" data-help="{{modify.tooltip || field.tooltip}}" data-pattern="field.regexp" data-required="field.required" data-list-compressed="listCompressed" data-disabled="disabled" data-extra-message="{{extraMessage | translate}}" data-qa="{{qa}}"></form-field-check>
<form-field-check ng-attr-id="{{ngFormId}}" data-ng-if="field.propertyType == 'boolean' && isVisible()" data-form="form" data-st-name="{{name}}" data-model="model[field.propertyId]" data-label="{{modify.propertyName || field.propertyName | translate}}" data-type="{{field.propertyType}}" data-help="{{modify.tooltip || field.tooltip}}" data-pattern="field.regexp" data-required="isRequired()" data-list-compressed="listCompressed" data-disabled="disabled" data-extra-message="{{extraMessage | translate}}" data-qa="{{qa}}"></form-field-check>
<!-- list -->
<form-field-list ng-attr-id="{{ngFormId}}" data-ng-if="(field.propertyType == 'list') && isVisible()" data-field="field" data-form="form" data-st-name="{{name}}" data-model="model" data-list-compressed="listCompressed" data-modal="modal" data-qa="{{qa}}"></form-field-list>
<!-- multiple-list -->
Expand Down
Loading

0 comments on commit 55562d6

Please sign in to comment.