diff --git a/dist/src/main/resources/reference.conf b/dist/src/main/resources/reference.conf index 1231d5a57a..97a939e091 100644 --- a/dist/src/main/resources/reference.conf +++ b/dist/src/main/resources/reference.conf @@ -53,6 +53,9 @@ sparta.config.checkpointPath = "/tmp/sparta/checkpoint" # Auto delete checkpoint when run policies in cluster mode is necessary HDFS configuration sparta.config.autoDeleteCheckpoint = false +# Add time to checkpoint path when run policies in cluster mode is necessary HDFS configuration +sparta.config.addTimeToCheckpointPath = false + # HDFS # The hadoop user name could be configured by two ways: diff --git a/serving-api/src/main/resources/reference.conf b/serving-api/src/main/resources/reference.conf index 766db95849..d2ef8738c3 100644 --- a/serving-api/src/main/resources/reference.conf +++ b/serving-api/src/main/resources/reference.conf @@ -53,6 +53,9 @@ sparta.config.checkpointPath = "/tmp/sparta/checkpoint" # Auto delete checkpoint when run policies in cluster mode is necessary HDFS configuration sparta.config.autoDeleteCheckpoint = false +# Add time to checkpoint path when run policies in cluster mode is necessary HDFS configuration +sparta.config.addTimeToCheckpointPath = false + # HDFS # The hadoop user name could be configured by two ways: diff --git a/serving-core/src/main/scala/com/stratio/sparta/serving/core/constants/AppConstant.scala b/serving-core/src/main/scala/com/stratio/sparta/serving/core/constants/AppConstant.scala index d4a4de116f..1eecace39d 100644 --- a/serving-core/src/main/scala/com/stratio/sparta/serving/core/constants/AppConstant.scala +++ b/serving-core/src/main/scala/com/stratio/sparta/serving/core/constants/AppConstant.scala @@ -57,6 +57,8 @@ object AppConstant { //Checkpooint val ConfigAutoDeleteCheckpoint = "autoDeleteCheckpoint" val DefaultAutoDeleteCheckpoint = false + val ConfigAddTimeToCheckpointPath = "addTimeToCheckpointPath" + val DefaultAddTimeToCheckpointPath = false val ConfigCheckpointPath = "checkpointPath" val DefaultCheckpointPath = "sparta/checkpoint" val DefaultCheckpointPathLocalMode = s"/tmp/$DefaultCheckpointPath" diff --git a/serving-core/src/main/scala/com/stratio/sparta/serving/core/utils/CheckpointUtils.scala b/serving-core/src/main/scala/com/stratio/sparta/serving/core/utils/CheckpointUtils.scala index 26139211b1..6109024506 100644 --- a/serving-core/src/main/scala/com/stratio/sparta/serving/core/utils/CheckpointUtils.scala +++ b/serving-core/src/main/scala/com/stratio/sparta/serving/core/utils/CheckpointUtils.scala @@ -32,13 +32,13 @@ trait CheckpointUtils extends PolicyConfigUtils { /* PUBLIC METHODS */ def deleteFromLocal(policy: PolicyModel): Unit = { - val checkpointDirectory = checkpointPath(policy, withTime = false) + val checkpointDirectory = checkpointPath(policy, checkTime = false) log.info(s"Deleting checkpoint directory: $checkpointDirectory") FileUtils.deleteDirectory(new File(checkpointDirectory)) } def deleteFromHDFS(policy: PolicyModel): Unit = { - val checkpointDirectory = checkpointPath(policy, withTime = false) + val checkpointDirectory = checkpointPath(policy, checkTime = false) log.info(s"Deleting checkpoint directory: $checkpointDirectory") HdfsUtils().delete(checkpointDirectory) } @@ -54,7 +54,7 @@ trait CheckpointUtils extends PolicyConfigUtils { if (isExecutionType(policy, AppConstant.ConfigLocal)) deleteFromLocal(policy) else deleteFromHDFS(policy) } match { - case Success(_) => log.info(s"Checkpoint deleted in folder: ${checkpointPath(policy, withTime = false)}") + case Success(_) => log.info(s"Checkpoint deleted in folder: ${checkpointPath(policy, checkTime = false)}") case Failure(ex) => log.error("Cannot delete checkpoint folder", ex) } @@ -63,21 +63,22 @@ trait CheckpointUtils extends PolicyConfigUtils { Try { createFromLocal(policy) } match { - case Success(_) => log.info(s"Checkpoint created in folder: ${checkpointPath(policy, withTime = false)}") + case Success(_) => log.info(s"Checkpoint created in folder: ${checkpointPath(policy, checkTime = false)}") case Failure(ex) => log.error("Cannot create checkpoint folder", ex) } } - def checkpointPath(policy: PolicyModel, withTime: Boolean = true): String = { + def checkpointPath(policy: PolicyModel, checkTime: Boolean = true): String = { val path = policy.checkpointPath.map(path => cleanCheckpointPath(path)) .getOrElse(checkpointPathFromProperties(policy)) - if(withTime) s"$path/${policy.name}/${Calendar.getInstance().getTime().getTime}" + if(checkTime && addTimeToCheckpointPathFromProperties()) + s"$path/${policy.name}/${Calendar.getInstance().getTime.getTime}" else s"$path/${policy.name}" } def autoDeleteCheckpointPath(policy: PolicyModel): Boolean = - policy.autoDeleteCheckpoint.getOrElse(autoDeleteCheckpointPathFromProperties) + policy.autoDeleteCheckpoint.getOrElse(autoDeleteCheckpointPathFromProperties()) /* PRIVATE METHODS */ @@ -95,10 +96,14 @@ trait CheckpointUtils extends PolicyConfigUtils { checkpointPath <- Try(cleanCheckpointPath(config.getString(ConfigCheckpointPath))).toOption } yield checkpointPath).getOrElse(generateDefaultCheckpointPath(policy)) - private def autoDeleteCheckpointPathFromProperties: Boolean = + private def autoDeleteCheckpointPathFromProperties(): Boolean = Try(SpartaConfig.getDetailConfig.get.getBoolean(ConfigAutoDeleteCheckpoint)) .getOrElse(DefaultAutoDeleteCheckpoint) + private def addTimeToCheckpointPathFromProperties(): Boolean = + Try(SpartaConfig.getDetailConfig.get.getBoolean(ConfigAddTimeToCheckpointPath)) + .getOrElse(DefaultAddTimeToCheckpointPath) + private def generateDefaultCheckpointPath(policy: PolicyModel): String = executionMode(policy) match { case mode if mode == ConfigMesos || mode == ConfigYarn || mode == ConfigStandAlone => diff --git a/web/src/data-templates/policy.json b/web/src/data-templates/policy.json index bd9bd6e114..91bf5aad49 100644 --- a/web/src/data-templates/policy.json +++ b/web/src/data-templates/policy.json @@ -258,7 +258,7 @@ { "propertyId": "checkpointPath", "propertyName": "_CHECKPOINT_PATH_", - "width": 6, + "width": 12, "position": "left", "propertyType": "text", "required": false, @@ -269,12 +269,22 @@ "propertyId": "autoDeleteCheckpoint", "propertyName": "_AUTO_DELETE_CHECKPOINT_", "width": 6, - "position": "right", + "position": "left", "propertyType": "boolean", "required": false, "tooltip": "Auto delete checkpoint path in HDFS when running policies. Overrides the value in the configuration file.", "qa": "auto-delete-checkpoint" }, + { + "propertyId": "addTimeToCheckpointPath", + "propertyName": "_ADD_TIME_TO_CHECKPOINT_", + "width": 6, + "position": "right", + "propertyType": "boolean", + "required": false, + "tooltip": "Add the current time to checkpoint path. Be careful if you need HA in your streaming process.", + "qa": "auto-delete-checkpoint" + }, { "propertyId": "stopGracefully", "propertyName": "_STOP_GRACEFULLY_", diff --git a/web/src/languages/en-US.json b/web/src/languages/en-US.json index 1d4ea3d40d..e51aa9f4dd 100644 --- a/web/src/languages/en-US.json +++ b/web/src/languages/en-US.json @@ -192,6 +192,7 @@ "_CHECKPOINT_PATH_": "Checkpoint path", "_TEMPORAL_TABLE_TRANSFORMATIONS_": "Transformations temporal table", "_AUTO_DELETE_CHECKPOINT_": "Auto-delete checkpoint", + "_ADD_TIME_TO_CHECKPOINT_": "Add time to checkpoint path", "_PERSIST_RAW_DATA_": "Persist raw data", "_MONITORING_LINK_": "Monitoring link", "_STOP_GRACEFULLY_": "Stop Gracefully", @@ -276,6 +277,7 @@ "_OUTPUT_ACTIONS_MENU_DOWNLOAD_": "Download", "_OUTPUT_ACTIONS_MENU_EDIT_": "Edit", "_OUTPUT_ACTIONS_MENU_STOP_": "Stop", + "_OUTPUT_ACTIONS_MENU_CHECKPOINT_": "Delete checkpoint", "_NAME_HELP_": "White spaces are not allowed", "_INPUT_CREATE_FIRST_ONE_": "Click here to create your first input", "_OUTPUT_CREATE_FIRST_ONE_": "Click here to create your first output", @@ -284,6 +286,7 @@ "_RUN_POLICY_KO_": "The policy '{{policyName}}' is already running! Please stop it and try again later.", "_STOP_POLICY_OK_": "The policy '{{policyName}}' is stopping!", "_STOP_POLICY_KO_": "The policy '{{policyName}}' is already stopped! Please run it and try again later.", + "_DELETE_CHECKPOINT_POLICY_OK_": "Checkpoint deleted from policy: {{policyName}}", "_REMOVE_MODEL_MESSAGE_": "If you remove this model, the following cubes will be removed: {{modelList}}", "_CONFIRM_": "Confirm", "_CLOSE_": "Close", diff --git a/web/src/scripts/controllers/policy-list.js b/web/src/scripts/controllers/policy-list.js index 3a2b85aae2..8320a711cf 100644 --- a/web/src/scripts/controllers/policy-list.js +++ b/web/src/scripts/controllers/policy-list.js @@ -35,6 +35,7 @@ vm.runPolicy = runPolicy; vm.stopPolicy = stopPolicy; vm.editPolicy = editPolicy; + vm.deleteCheckpoint = deleteCheckpoint; vm.deleteErrorMessage = deleteErrorMessage; vm.deleteSuccessMessage = deleteSuccessMessage; vm.downloadPolicy = downloadPolicy; @@ -150,6 +151,14 @@ } } + + function deleteCheckpoint(policyName){ + var deletePolicyCheckpoint = PolicyFactory.deletePolicyCheckpoint(policyName); + deletePolicyCheckpoint.then(function(response){ + vm.successMessage.text = $translate.instant('_DELETE_CHECKPOINT_POLICY_OK_', {policyName: policyName}); + }); + } + function deletePolicyConfirm(size, policy) { var controller = 'DeletePolicyModalCtrl'; var templateUrl = "templates/policies/st-delete-policy-modal.tpl.html"; diff --git a/web/src/scripts/factories/policy-factory.js b/web/src/scripts/factories/policy-factory.js index 19026ec6b9..99d9d97a0b 100644 --- a/web/src/scripts/factories/policy-factory.js +++ b/web/src/scripts/factories/policy-factory.js @@ -42,6 +42,9 @@ stopPolicy: function (policy) { return ApiPolicyService.stopPolicy().update(policy).$promise; }, + deletePolicyCheckpoint: function (policyName) { + return ApiPolicyService.deletePolicyCheckpoint().delete({'name': policyName}).$promise; + }, savePolicy: function (policyData) { return ApiPolicyService.savePolicy().put(policyData).$promise; }, diff --git a/web/src/scripts/services/api/api-policy-service.js b/web/src/scripts/services/api/api-policy-service.js index 1384dc1777..4e73e4b35c 100644 --- a/web/src/scripts/services/api/api-policy-service.js +++ b/web/src/scripts/services/api/api-policy-service.js @@ -30,6 +30,7 @@ vm.createPolicy = createPolicy; vm.deletePolicy = deletePolicy; vm.runPolicy = runPolicy; + vm.deletePolicyCheckpoint = deletePolicyCheckpoint; vm.getFakePolicy = getFakePolicy; vm.savePolicy = savePolicy; vm.stopPolicy = stopPolicy; @@ -86,6 +87,16 @@ }); } + function deletePolicyCheckpoint(){ + return $resource('/policy/checkpoint/:name', {name: '@name'}, + { + 'delete': { + method: 'DELETE', + timeout: apiConfigSettings.timeout + } + }); + } + function stopPolicy() { return $resource('/policyContext', {}, { diff --git a/web/src/views/policies.html b/web/src/views/policies.html index 928a32bd33..b08d423f03 100644 --- a/web/src/views/policies.html +++ b/web/src/views/policies.html @@ -112,6 +112,11 @@ data-qa="policy-context-menu-{{policyData.id}}-stop"> {{"_OUTPUT_ACTIONS_MENU_STOP_" | translate}} + + {{"_OUTPUT_ACTIONS_MENU_CHECKPOINT_" | translate}} +