From 55562d6dea5634dbc037e966ac2eeb199d7eaf05 Mon Sep 17 00:00:00 2001 From: eambrosio Date: Fri, 22 Apr 2016 13:10:50 +0200 Subject: [PATCH] [SPARTA-602] set remember field when creating new StreamContext --- .../com/stratio/sparta/driver/SpartaJob.scala | 2 +- .../driver/factory/SparkContextFactory.scala | 30 ++++++++------ .../factory/SparkContextFactoryTest.scala | 4 +- .../service/http/HttpServiceBaseTest.scala | 3 +- .../serving/api/utils/BaseUtilsTest.scala | 3 +- .../models/AggregationPoliciesModel.scala | 32 ++++++++------- .../core/helpers/PolicyHelperTest.scala | 7 +++- .../core/models/AggregationPolicyTest.scala | 9 +++-- web/src/data-templates/policy.json | 40 +++++++++++++++++++ web/src/languages/en-US.json | 1 + .../scripts/factories/policy-model-factory.js | 7 ++++ web/src/scripts/services/policy-service.js | 9 ++++- .../stratio-ui/template/form/form_field.html | 6 +-- .../template/form/form_field_select.html | 6 +-- .../modal/policy-creation-modal.tpl.html | 9 +++-- web/test/API/policy-service.js | 2 +- web/test/mock/api/policy.json | 1 + web/test/mock/model/policy.json | 2 + .../wizard/controller/policy-controller.js | 2 +- .../controller/policy-model-controller.js | 2 +- .../wizard/factory/policy-model-factory.js | 1 - 21 files changed, 124 insertions(+), 54 deletions(-) diff --git a/driver/src/main/scala/com/stratio/sparta/driver/SpartaJob.scala b/driver/src/main/scala/com/stratio/sparta/driver/SpartaJob.scala index 324820e57a..0fbe68ffc7 100644 --- a/driver/src/main/scala/com/stratio/sparta/driver/SpartaJob.scala +++ b/driver/src/main/scala/com/stratio/sparta/driver/SpartaJob.scala @@ -48,7 +48,7 @@ class SpartaJob(policy: AggregationPoliciesModel) extends SLF4JLogging { def run(sc: SparkContext): StreamingContext = { val checkpointPolicyPath = policy.checkpointPath.concat(File.separator).concat(policy.name) val sparkStreamingWindow = OperationsHelper.parseValueToMilliSeconds(policy.sparkStreamingWindow) - val ssc = sparkStreamingInstance(new Duration(sparkStreamingWindow), checkpointPolicyPath) + val ssc = sparkStreamingInstance(Duration(sparkStreamingWindow), checkpointPolicyPath, policy.remember) val parserSchemas = SchemaHelper.getSchemasFromParsers(policy.transformations, Input.InitSchema) val parsers = SpartaJob.getParsers(policy, ReflectionUtils, parserSchemas).sorted val cubes = SpartaJob.getCubes(policy, ReflectionUtils, getSchemaWithoutRaw(parserSchemas)) diff --git a/driver/src/main/scala/com/stratio/sparta/driver/factory/SparkContextFactory.scala b/driver/src/main/scala/com/stratio/sparta/driver/factory/SparkContextFactory.scala index c64dd04756..bec845fc20 100644 --- a/driver/src/main/scala/com/stratio/sparta/driver/factory/SparkContextFactory.scala +++ b/driver/src/main/scala/com/stratio/sparta/driver/factory/SparkContextFactory.scala @@ -13,21 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.stratio.sparta.driver.factory import java.io.File -import java.net.URI +import scala.collection.JavaConversions._ +import scala.util.Try import akka.event.slf4j.SLF4JLogging -import com.stratio.sparta.serving.core.SpartaConfig -import com.stratio.sparta.serving.core.constants.AppConstant import com.typesafe.config.Config import org.apache.spark.sql.SQLContext import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} -import scala.collection.JavaConversions._ -import scala.util.Try +import com.stratio.sparta.serving.core.SpartaConfig +import com.stratio.sparta.serving.core.constants.AppConstant +import com.stratio.sparta.serving.core.helpers.OperationsHelper object SparkContextFactory extends SLF4JLogging { @@ -47,11 +48,12 @@ object SparkContextFactory extends SLF4JLogging { def sparkStreamingInstance: Option[StreamingContext] = ssc - def sparkStreamingInstance(batchDuration: Duration, checkpointDir: String): Option[StreamingContext] = { + def sparkStreamingInstance(batchDuration: Duration, checkpointDir: String, remember: Option[String]): + Option[StreamingContext] = { synchronized { ssc match { case Some(_) => ssc - case None => ssc = Some(getNewStreamingContext(batchDuration, checkpointDir)) + case None => ssc = Some(getNewStreamingContext(batchDuration, checkpointDir, remember)) } } ssc @@ -61,15 +63,17 @@ object SparkContextFactory extends SLF4JLogging { def setSparkStreamingContext(createdContext: StreamingContext): Unit = ssc = Option(createdContext) - private def getNewStreamingContext(batchDuration: Duration, checkpointDir: String): StreamingContext = { + private def getNewStreamingContext(batchDuration: Duration, checkpointDir: String, remember: Option[String]): + StreamingContext = { val ssc = new StreamingContext(sc.get, batchDuration) ssc.checkpoint(checkpointDir) + remember.foreach(value => ssc.remember(Duration(OperationsHelper.parseValueToMilliSeconds(value)))) ssc } def sparkStandAloneContextInstance(generalConfig: Option[Config], - specificConfig: Map[String, String], - jars: Seq[File]): SparkContext = + specificConfig: Map[String, String], + jars: Seq[File]): SparkContext = synchronized { sc.getOrElse(instantiateStandAloneContext(generalConfig, specificConfig, jars)) } @@ -80,15 +84,15 @@ object SparkContextFactory extends SLF4JLogging { } private def instantiateStandAloneContext(generalConfig: Option[Config], - specificConfig: Map[String, String], - jars: Seq[File]): SparkContext = { + specificConfig: Map[String, String], + jars: Seq[File]): SparkContext = { sc = Some(SparkContext.getOrCreate(configToSparkConf(generalConfig, specificConfig))) jars.foreach(f => sc.get.addJar(f.getAbsolutePath)) sc.get } private def instantiateClusterContext(specificConfig: Map[String, String], - jars: Seq[String]): SparkContext = { + jars: Seq[String]): SparkContext = { sc = Some(SparkContext.getOrCreate(configToSparkConf(None, specificConfig))) jars.foreach(f => sc.get.addJar(f)) sc.get diff --git a/driver/src/test/scala/com/stratio/sparta/driver/test/factory/SparkContextFactoryTest.scala b/driver/src/test/scala/com/stratio/sparta/driver/test/factory/SparkContextFactoryTest.scala index f45e0f81e2..f628a95f33 100644 --- a/driver/src/test/scala/com/stratio/sparta/driver/test/factory/SparkContextFactoryTest.scala +++ b/driver/src/test/scala/com/stratio/sparta/driver/test/factory/SparkContextFactoryTest.scala @@ -65,9 +65,9 @@ class SparkContextFactoryTest extends FlatSpec with ShouldMatchers with BeforeAn val checkpointDir = "checkpoint/SparkContextFactorySpec" val sc = SparkContextFactory.sparkStandAloneContextInstance(config, specificConfig, Seq()) SparkContextFactory.sparkStreamingInstance should be(None) - val ssc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir) + val ssc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir, None) ssc shouldNot be equals (None) - val otherSsc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir) + val otherSsc = SparkContextFactory.sparkStreamingInstance(batchDuraction, checkpointDir, None) ssc should be equals (otherSsc) } } diff --git a/serving-api/src/test/scala/com/stratio/sparta/serving/api/service/http/HttpServiceBaseTest.scala b/serving-api/src/test/scala/com/stratio/sparta/serving/api/service/http/HttpServiceBaseTest.scala index bb54bfcbb1..a7db3bcb0c 100644 --- a/serving-api/src/test/scala/com/stratio/sparta/serving/api/service/http/HttpServiceBaseTest.scala +++ b/serving-api/src/test/scala/com/stratio/sparta/serving/api/service/http/HttpServiceBaseTest.scala @@ -93,7 +93,8 @@ trait HttpServiceBaseTest extends WordSpec input, outputs, Seq(), - userPluginsJars = Seq.empty[String]) + userPluginsJars = Seq.empty[String], + remember = None) policy } diff --git a/serving-api/src/test/scala/com/stratio/sparta/serving/api/utils/BaseUtilsTest.scala b/serving-api/src/test/scala/com/stratio/sparta/serving/api/utils/BaseUtilsTest.scala index 13672a089d..aae6d63ec7 100644 --- a/serving-api/src/test/scala/com/stratio/sparta/serving/api/utils/BaseUtilsTest.scala +++ b/serving-api/src/test/scala/com/stratio/sparta/serving/api/utils/BaseUtilsTest.scala @@ -161,7 +161,8 @@ abstract class BaseUtilsTest extends TestKit(ActorSystem("UtilsText")) input, outputs, Seq(), - userPluginsJars = Seq.empty[String]) + userPluginsJars = Seq.empty[String], + remember = None) policy } diff --git a/serving-core/src/main/scala/com/stratio/sparta/serving/core/models/AggregationPoliciesModel.scala b/serving-core/src/main/scala/com/stratio/sparta/serving/core/models/AggregationPoliciesModel.scala index 74f58e3294..32e04186e3 100644 --- a/serving-core/src/main/scala/com/stratio/sparta/serving/core/models/AggregationPoliciesModel.scala +++ b/serving-core/src/main/scala/com/stratio/sparta/serving/core/models/AggregationPoliciesModel.scala @@ -20,21 +20,23 @@ import com.stratio.sparta.serving.core.exception.ServingCoreException import com.stratio.sparta.serving.core.helpers.OperationsHelper import com.stratio.sparta.serving.core.policy.status.PolicyStatusEnum -case class AggregationPoliciesModel(id: Option[String] = None, - version: Option[Int] = None, - storageLevel: Option[String] = AggregationPoliciesModel.storageDefaultValue, - name: String, - description: String = "default description", - sparkStreamingWindow: String = AggregationPoliciesModel.sparkStreamingWindow, - checkpointPath: String, - rawData: RawDataModel, - transformations: Seq[TransformationsModel], - streamTriggers: Seq[TriggerModel], - cubes: Seq[CubeModel], - input: Option[PolicyElementModel] = None, - outputs: Seq[PolicyElementModel], - fragments: Seq[FragmentElementModel], - userPluginsJars: Seq[String]) +case class AggregationPoliciesModel( + id: Option[String] = None, + version: Option[Int] = None, + storageLevel: Option[String] = AggregationPoliciesModel.storageDefaultValue, + name: String, + description: String = "default description", + sparkStreamingWindow: String = AggregationPoliciesModel.sparkStreamingWindow, + checkpointPath: String, + rawData: RawDataModel, + transformations: Seq[TransformationsModel], + streamTriggers: Seq[TriggerModel], + cubes: Seq[CubeModel], + input: Option[PolicyElementModel] = None, + outputs: Seq[PolicyElementModel], + fragments: Seq[FragmentElementModel], + userPluginsJars: Seq[String], + remember: Option[String] = None) case object AggregationPoliciesModel { diff --git a/serving-core/src/test/scala/com/stratio/sparta/serving/core/helpers/PolicyHelperTest.scala b/serving-core/src/test/scala/com/stratio/sparta/serving/core/helpers/PolicyHelperTest.scala index 109cfef66c..ed8781d433 100644 --- a/serving-core/src/test/scala/com/stratio/sparta/serving/core/helpers/PolicyHelperTest.scala +++ b/serving-core/src/test/scala/com/stratio/sparta/serving/core/helpers/PolicyHelperTest.scala @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.stratio.sparta.serving.core.helpers import com.stratio.sparta.serving.core.models._ @@ -60,7 +61,8 @@ class PolicyHelperTest extends FeatureSpec with GivenWhenThen with Matchers { description = "description", shortDescription = "short description", element = PolicyElementModel("outputF", "output", Map()))), - userPluginsJars = Seq.empty[String] + userPluginsJars = Seq.empty[String], + remember = None ) When("the helper parse these fragments") @@ -110,7 +112,8 @@ class PolicyHelperTest extends FeatureSpec with GivenWhenThen with Matchers { description = "description", shortDescription = "short description", element = PolicyElementModel("outputF", "output", Map()))), - userPluginsJars = Seq.empty[String] + userPluginsJars = Seq.empty[String], + remember = None ) When("the helper tries to parse the policy it throws an exception") diff --git a/serving-core/src/test/scala/com/stratio/sparta/serving/core/models/AggregationPolicyTest.scala b/serving-core/src/test/scala/com/stratio/sparta/serving/core/models/AggregationPolicyTest.scala index 4e3318c15d..63bf6ff892 100644 --- a/serving-core/src/test/scala/com/stratio/sparta/serving/core/models/AggregationPolicyTest.scala +++ b/serving-core/src/test/scala/com/stratio/sparta/serving/core/models/AggregationPolicyTest.scala @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.stratio.sparta.serving.core.models import com.stratio.sparta.sdk.{JsoneyString, DimensionType, Input} @@ -25,7 +26,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.{Matchers, WordSpec} @RunWith(classOf[JUnitRunner]) -class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{ +class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar { val rawData = new RawDataModel val fragmentModel = new FragmentElementModel( @@ -101,7 +102,8 @@ class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{ input, outputs, Seq(), - userPluginsJars = Seq.empty[String]) + userPluginsJars = Seq.empty[String], + remember = None) val wrongComputeLastPolicy = AggregationPoliciesModel(id = None, version = None, @@ -117,7 +119,8 @@ class AggregationPolicyTest extends WordSpec with Matchers with MockitoSugar{ input, outputs, Seq(), - Seq()) + Seq(), + remember = None) "AggregationPoliciesValidator" should { "non throw an exception if the policy is well formed" in { diff --git a/web/src/data-templates/policy.json b/web/src/data-templates/policy.json index bafdd1362f..e6f9c961f8 100644 --- a/web/src/data-templates/policy.json +++ b/web/src/data-templates/policy.json @@ -71,6 +71,46 @@ } ] }, + { + "propertyId": "rememberNumber", + "propertyName": "_POLICY_REMEMBER_FIELD_", + "propertyType": "number", + "width": 6, + "position": "left", + "required": "scope.form['dataSourcerememberTimeForm']['dataSourcerememberTimeForm'].$modelValue.length > 0", + "qa": "policy-remember-number" + }, + { + "propertyId": "rememberTime", + "propertyName": "_WHITE_SPACE_", + "propertyType": "select", + "width": 6, + "position": "right", + "qa": "policy-remember-time", + "required": "scope.form['dataSourcerememberNumberForm']['dataSourcerememberNumberForm'].$modelValue", + "values": [ + { + "label": "Milliseconds", + "value": "ms" + }, + { + "label": "Seconds", + "value": "s" + }, + { + "label": "Minutes", + "value": "m" + }, + { + "label": "Hours", + "value": "h" + }, + { + "label": "Days", + "value": "d" + } + ] + }, { "propertyId": "storageLevel", "propertyName": "_STORAGELEVEL_", diff --git a/web/src/languages/en-US.json b/web/src/languages/en-US.json index 5de34278fe..763e73b29d 100644 --- a/web/src/languages/en-US.json +++ b/web/src/languages/en-US.json @@ -313,6 +313,7 @@ "_CLUSTER_UI_": "Cluster UI", "_FIXED_MEASURE_NAME_":"Fixed measure name", "_FIXED_MEASURE_VALUE_":"Fixed measure value", + "_POLICY_REMEMBER_FIELD_":"Max query execution time", "_ERROR_": { "_CHANGES_WITHOUT_SAVING_": "You have to save all your changes or cancel them before going to the next step", "_UNAVAILABLE_SERVER_": "Server is unavailable. Please, check your connection and try it again", diff --git a/web/src/scripts/factories/policy-model-factory.js b/web/src/scripts/factories/policy-model-factory.js index 482428aad0..b44f6374ef 100644 --- a/web/src/scripts/factories/policy-model-factory.js +++ b/web/src/scripts/factories/policy-model-factory.js @@ -49,6 +49,7 @@ policy.name = inputPolicyJSON.name; policy.description = inputPolicyJSON.description; policy.sparkStreamingWindow = inputPolicyJSON.sparkStreamingWindow; + policy.remember = inputPolicyJSON.remember; policy.storageLevel = inputPolicyJSON.storageLevel; policy.checkpointPath = inputPolicyJSON.checkpointPath; policy.rawDataEnabled = (inputPolicyJSON.rawData.enabled == "true"); @@ -66,6 +67,12 @@ policy.sparkStreamingWindowNumber = Number(sparkStreamingWindow[1]); policy.sparkStreamingWindowTime = sparkStreamingWindow[2]; delete policy.sparkStreamingWindow; + if (policy.remember) { + var rememberField = policy.remember.split(/([0-9]+)/); + policy.rememberNumber = Number(rememberField[1]); + policy.rememberTime = rememberField[2]; + } + delete policy.remember; } function setStreamTriggers(streamTriggers) { diff --git a/web/src/scripts/services/policy-service.js b/web/src/scripts/services/policy-service.js index 97c6490dea..6e4890f574 100644 --- a/web/src/scripts/services/policy-service.js +++ b/web/src/scripts/services/policy-service.js @@ -72,6 +72,9 @@ convertedDescriptionJson.rawData.path = (vm.policy.rawDataEnabled) ? vm.policy.rawDataPath : null; } convertedDescriptionJson.sparkStreamingWindow = json.sparkStreamingWindowNumber + json.sparkStreamingWindowTime; + if (json.rememberNumber && json.rememberTime) { + convertedDescriptionJson.remember = json.rememberNumber + json.rememberTime; + } return convertedDescriptionJson; } @@ -123,8 +126,8 @@ var cubes = policyJson.cubes; for (var i = 0; i < cubes.length; ++i) { var cube = UtilsService.convertDottedPropertiesToJson(cubes[i]); - if (cube.writer.fixedMeasureName && cube.writer.fixedMeasureValue){ - cube.writer.fixedMeasure = cube.writer.fixedMeasureName + ":" + cube.writer.fixedMeasureValue; + if (cube.writer.fixedMeasureName && cube.writer.fixedMeasureValue) { + cube.writer.fixedMeasure = cube.writer.fixedMeasureName + ":" + cube.writer.fixedMeasureValue; } delete cube.writer.fixedMeasureName; delete cube.writer.fixedMeasureValue; @@ -137,6 +140,8 @@ delete finalJSON['rawDataEnabled']; delete finalJSON['sparkStreamingWindowNumber']; delete finalJSON['sparkStreamingWindowTime']; + delete finalJSON['rememberNumber']; + delete finalJSON['rememberTime']; if (finalJSON.rawData.enabled == 'false') { delete finalJSON.rawData['path']; diff --git a/web/src/stratio-ui/template/form/form_field.html b/web/src/stratio-ui/template/form/form_field.html index f0fe28a0e1..e8add44662 100644 --- a/web/src/stratio-ui/template/form/form_field.html +++ b/web/src/stratio-ui/template/form/form_field.html @@ -2,11 +2,11 @@ - + - + - + diff --git a/web/src/stratio-ui/template/form/form_field_select.html b/web/src/stratio-ui/template/form/form_field_select.html index 3d1049d274..1dd286f653 100644 --- a/web/src/stratio-ui/template/form/form_field_select.html +++ b/web/src/stratio-ui/template/form/form_field_select.html @@ -1,7 +1,7 @@
- -
+ - +
@@ -38,13 +39,13 @@ data-field="properties" data-model="vm.policy" data-form="vm.form" - data-st-name="dataSource{{properties.propertyName}}Form" + data-st-name="dataSource{{properties.propertyId}}Form" data-list-compressed = "true" data-qa="{{properties.qa}}">
- +