diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala index e35d92d3b5..03bfb84c96 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala @@ -44,8 +44,7 @@ class CURLBuilder(val url: String) { headers.keySet.foreach(h => cURL.append(String.format("-H \'%s: %s\' \\\n", h, headers.get(h)))) formData.foreach(k => cURL.append(String.format("--data-urlencode \'%s=%s\' \\\n", k._1, k._2))) - cURL.append("-i") - cURL.toString + cURL.toString.trim.dropRight(1) } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index 731d8e5dfb..9a2a4a8de7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -40,36 +40,6 @@ public final class WebUtils { private WebUtils() { } - /** - * token encrypt - * - * @param token token - * @return encrypt token - */ - public static String encryptToken(String token) { - try { - return EncryptUtils.encrypt(token); - } catch (Exception e) { - log.info("token encrypt failed: ", e); - return null; - } - } - - /** - * token decrypt - * - * @param encryptToken encryptToken - * @return decrypt token - */ - public static String decryptToken(String encryptToken) { - try { - return EncryptUtils.decrypt(encryptToken); - } catch (Exception e) { - log.info("token decrypt failed: ", e); - return null; - } - } - /** * camel to underscore * @@ -113,15 +83,8 @@ public static File getAppLibDir() { return getAppDir(LIB); } - public static File getAppPluginsDir() { - return getAppDir(PLUGINS); - } - public static File getAppClientDir() { return getAppDir(CLIENT); } - public static File getAppConfDir() { - return getAppDir(CONF); - } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java similarity index 88% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java index 698b54e629..8e741cd039 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.annotation; -import org.apache.streampark.console.core.aspect.StreamParkAspect; +import org.apache.streampark.console.core.aspect.AppChangeEventAspect; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; import org.aspectj.lang.ProceedingJoinPoint; @@ -31,12 +31,12 @@ * In the controller({@link org.apache.streampark.console.core.controller}), If some method causes * application state update, need to add this annotation, This annotation marks which methods will * cause the application to be updated, Will work together with {@link - * StreamParkAspect#appUpdated(ProceedingJoinPoint)}, The final purpose will be refresh {@link + * AppChangeEventAspect#appChangeEvent(ProceedingJoinPoint)}, The final purpose will be refresh {@link * FlinkAppHttpWatcher#WATCHING_APPS}, Make the state of the job consistent with the database */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) -public @interface AppUpdated { +public @interface AppChangeEvent { boolean value() default true; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java index 8c039857ef..6fbe29d441 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java @@ -45,6 +45,7 @@ String defaultValue() default ""; String bindFor() default ""; + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java new file mode 100644 index 0000000000..ef8adce652 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.aspect; + +import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; + +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@Aspect +public class AppChangeEventAspect { + + @Autowired + private FlinkAppHttpWatcher flinkAppHttpWatcher; + + @Pointcut("@annotation(org.apache.streampark.console.core.annotation.AppChangeEvent)") + public void appChangeEventPointcut() { + } + + @Around("appChangeEventPointcut()") + public Object appChangeEvent(ProceedingJoinPoint joinPoint) throws Throwable { + MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); + log.debug("appUpdated aspect, method:{}", methodSignature.getName()); + Object target = joinPoint.proceed(); + flinkAppHttpWatcher.init(); + return target; + } + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java new file mode 100644 index 0000000000..2438802f0f --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.aspect; + +import org.apache.streampark.common.util.DateUtils; +import org.apache.streampark.common.util.ReflectUtils; +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.base.exception.ApiAlertException; +import org.apache.streampark.console.core.annotation.OpenAPI; +import org.apache.streampark.console.system.entity.AccessToken; + +import org.apache.commons.lang3.StringUtils; +import org.apache.shiro.SecurityUtils; + +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.stereotype.Component; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import javax.servlet.http.HttpServletRequest; + +import java.lang.reflect.Field; +import java.util.Date; +import java.util.TimeZone; + +@Slf4j +@Component +@Aspect +public class OpenAPIAspect { + + @Pointcut("execution(public" + + " org.apache.streampark.console.base.domain.RestResponse" + + " org.apache.streampark.console.core.controller.*.*(..))") + public void openAPIPointcut() { + } + + @SuppressWarnings("checkstyle:SimplifyBooleanExpression") + @Around(value = "openAPIPointcut()") + public RestResponse openAPI(ProceedingJoinPoint joinPoint) throws Throwable { + MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); + log.debug("restResponse aspect, method:{}", methodSignature.getName()); + Boolean isApi = (Boolean) SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN); + if (isApi != null && isApi) { + HttpServletRequest request = + ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); + OpenAPI openAPI = methodSignature.getMethod().getAnnotation(OpenAPI.class); + if (openAPI == null) { + String url = request.getRequestURI(); + throw new ApiAlertException("openapi unsupported: " + url); + } else { + Object[] objects = joinPoint.getArgs(); + for (OpenAPI.Param param : openAPI.param()) { + String bingFor = param.bindFor(); + if (StringUtils.isNotBlank(bingFor)) { + String name = param.name(); + for (Object args : objects) { + Field bindForField = ReflectUtils.getField(args.getClass(), bingFor); + if (bindForField != null) { + Object value = request.getParameter(name); + bindForField.setAccessible(true); + if (value != null) { + if (param.type().equals(String.class)) { + bindForField.set(args, value.toString()); + } else if (param.type().equals(Boolean.class) + || param.type().equals(boolean.class)) { + bindForField.set(args, Boolean.parseBoolean(value.toString())); + } else if (param.type().equals(Integer.class) || param.type().equals(int.class)) { + bindForField.set(args, Integer.parseInt(value.toString())); + } else if (param.type().equals(Long.class) || param.type().equals(long.class)) { + bindForField.set(args, Long.parseLong(value.toString())); + } else if (param.type().equals(Date.class)) { + bindForField.set(args, DateUtils.parse(value.toString(), DateUtils.fullFormat(), + TimeZone.getDefault())); + } + } + } + } + } + } + } + } + return (RestResponse) joinPoint.proceed(); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java similarity index 71% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java index dda1626274..add5f5e012 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java @@ -19,20 +19,16 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApiAlertException; -import org.apache.streampark.console.core.annotation.OpenAPI; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.enums.UserTypeEnum; import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; -import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; -import org.apache.streampark.console.system.entity.AccessToken; import org.apache.streampark.console.system.entity.Member; import org.apache.streampark.console.system.entity.User; import org.apache.streampark.console.system.service.MemberService; import org.apache.commons.lang3.StringUtils; -import org.apache.shiro.SecurityUtils; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; @@ -47,18 +43,11 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; - -import javax.servlet.http.HttpServletRequest; @Slf4j @Component @Aspect -public class StreamParkAspect { - - @Autowired - private FlinkAppHttpWatcher flinkAppHttpWatcher; +public class PermissionAspect { @Autowired private MemberService memberService; @@ -66,48 +55,11 @@ public class StreamParkAspect { @Autowired private ApplicationManageService applicationManageService; - @Pointcut("execution(public" - + " org.apache.streampark.console.base.domain.RestResponse" - + " org.apache.streampark.console.core.controller.*.*(..))") - public void openAPI() { - } - - @SuppressWarnings("checkstyle:SimplifyBooleanExpression") - @Around(value = "openAPI()") - public RestResponse openAPI(ProceedingJoinPoint joinPoint) throws Throwable { - MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); - log.debug("restResponse aspect, method:{}", methodSignature.getName()); - Boolean isApi = (Boolean) SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN); - if (isApi != null && isApi) { - OpenAPI openAPI = methodSignature.getMethod().getAnnotation(OpenAPI.class); - if (openAPI == null) { - HttpServletRequest request = - ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); - String url = request.getRequestURI(); - throw new ApiAlertException("openapi unsupported: " + url); - } - } - return (RestResponse) joinPoint.proceed(); - } - - @Pointcut("@annotation(org.apache.streampark.console.core.annotation.AppUpdated)") - public void appUpdated() { - } - - @Around("appUpdated()") - public Object appUpdated(ProceedingJoinPoint joinPoint) throws Throwable { - MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); - log.debug("appUpdated aspect, method:{}", methodSignature.getName()); - Object target = joinPoint.proceed(); - flinkAppHttpWatcher.init(); - return target; - } - @Pointcut("@annotation(org.apache.streampark.console.core.annotation.Permission)") - public void permissionAction() { + public void permissionPointcut() { } - @Around("permissionAction()") + @Around("permissionPointcut()") public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Permission permission = methodSignature.getMethod().getAnnotation(Permission.class); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java index 8bad65b903..5363c2ede1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java @@ -19,11 +19,11 @@ import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.entity.Application; -import org.apache.streampark.console.core.entity.SavePoint; +import org.apache.streampark.console.core.entity.Savepoint; import org.apache.streampark.console.core.enums.CheckPointStatusEnum; import org.apache.streampark.console.core.enums.FailoverStrategyEnum; import org.apache.streampark.console.core.metrics.flink.CheckPoints; -import org.apache.streampark.console.core.service.SavePointService; +import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.utils.AlertTemplateUtils; @@ -73,7 +73,7 @@ public class FlinkCheckpointProcessor { private AlertService alertService; @Autowired - private SavePointService savePointService; + private SavepointService savepointService; @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; @@ -172,8 +172,8 @@ private Long getLatestCheckpointedId(Long appId, String cacheId) { return checkPointCache.get( cacheId, key -> { - SavePoint savePoint = savePointService.getLatest(appId); - return Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null); + Savepoint savepoint = savepointService.getLatest(appId); + return Optional.ofNullable(savepoint).map(Savepoint::getChkId).orElse(null); }); } @@ -184,15 +184,15 @@ private boolean shouldProcessFailedTrigger( } private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) { - SavePoint savePoint = new SavePoint(); - savePoint.setAppId(appId); - savePoint.setChkId(checkPoint.getId()); - savePoint.setLatest(true); - savePoint.setType(checkPoint.getCheckPointType().get()); - savePoint.setPath(checkPoint.getExternalPath()); - savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp())); - savePoint.setCreateTime(new Date()); - savePointService.save(savePoint); + Savepoint savepoint = new Savepoint(); + savepoint.setAppId(appId); + savepoint.setChkId(checkPoint.getId()); + savepoint.setLatest(true); + savepoint.setType(checkPoint.getCheckPointType().get()); + savepoint.setPath(checkPoint.getExternalPath()); + savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp())); + savepoint.setCreateTime(new Date()); + savepointService.save(savepoint); } public static class Counter { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java index b9507ebb46..75f43a7afd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java @@ -67,7 +67,7 @@ public synchronized OpenAPISchema getOpenAPISchema(String name) { return schemas.get(name); } - public String getOpenApiCUrl(String baseUrl, Long appId, Long teamId, String name) { + public String getOpenApiCUrl(String name, String baseUrl, Long appId, Long teamId) { OpenAPISchema schema = this.getOpenAPISchema(name); if (schema == null) { throw new UnsupportedOperationException("Unsupported OpenAPI: " + name); @@ -86,10 +86,15 @@ public String getOpenApiCUrl(String baseUrl, Long appId, Long teamId, String nam schema.getSchema().forEach(c -> { if (c.isRequired()) { - if ("appId".equals(c.getBindFor())) { - curlBuilder.addFormData(c.getName(), appId); - } else if ("teamId".equals(c.getBindFor())) { - curlBuilder.addFormData(c.getName(), teamId); + switch (c.getBindFor()) { + case "appId": + curlBuilder.addFormData(c.getName(), appId); + break; + case "teamId": + curlBuilder.addFormData(c.getName(), teamId); + break; + default: + break; } } else { curlBuilder.addFormData(c.getName(), c.getDefaultValue()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 722b34d8c5..6159063e70 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.InternalException; -import org.apache.streampark.console.core.annotation.AppUpdated; +import org.apache.streampark.console.core.annotation.AppChangeEvent; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationBackUp; @@ -100,7 +100,7 @@ public RestResponse copy(Application app) throws IOException { return RestResponse.success(); } - @AppUpdated + @AppChangeEvent @Permission(app = "#app.id") @PostMapping("update") @RequiresPermissions("app:update") @@ -124,7 +124,7 @@ public RestResponse list(Application app, RestRequest request) { return RestResponse.success(applicationList); } - @AppUpdated + @AppChangeEvent @PostMapping("mapping") @Permission(app = "#app.id") @RequiresPermissions("app:mapping") @@ -133,7 +133,7 @@ public RestResponse mapping(Application app) { return RestResponse.success(flag); } - @AppUpdated + @AppChangeEvent @Permission(app = "#app.id") @PostMapping("revoke") @RequiresPermissions("app:release") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java index 75954f041a..0959004a0d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java @@ -52,9 +52,10 @@ public class OpenAPIController { }, param = { @OpenAPI.Param(name = "id", description = "current flink application id", required = true, type = Long.class, bindFor = "appId"), @OpenAPI.Param(name = "teamId", description = "current user teamId", required = true, type = Long.class), - @OpenAPI.Param(name = "savePointed", description = "restored app from the savepoint or latest checkpoint", required = false, type = String.class, defaultValue = "false"), - @OpenAPI.Param(name = "savePoint", description = "savepoint or checkpoint path", required = false, type = String.class), - @OpenAPI.Param(name = "allowNonRestored", description = "ignore savepoint if cannot be restored", required = false, type = boolean.class, defaultValue = "false") + @OpenAPI.Param(name = "argument", description = "flink program run arguments", required = false, type = String.class, bindFor = "args"), + @OpenAPI.Param(name = "restoreFromSavepoint", description = "restored app from the savepoint or checkpoint", required = false, type = Boolean.class, defaultValue = "false", bindFor = "restoreOrTriggerSavepoint"), + @OpenAPI.Param(name = "savepointPath", description = "savepoint or checkpoint path", required = false, type = String.class), + @OpenAPI.Param(name = "allowNonRestored", description = "ignore savepoint if cannot be restored", required = false, type = Boolean.class, defaultValue = "false"), }) @Permission(app = "#app.appId", team = "#app.teamId") @PostMapping("app/start") @@ -69,9 +70,9 @@ public RestResponse flinkStart(Application app) throws Exception { }, param = { @OpenAPI.Param(name = "id", description = "current flink application id", required = true, type = Long.class, bindFor = "appId"), @OpenAPI.Param(name = "teamId", description = "current user teamId", required = true, type = Long.class), - @OpenAPI.Param(name = "savePointed", description = "trigger savepoint before taking stopping", required = false, type = boolean.class, defaultValue = "false"), - @OpenAPI.Param(name = "savePoint", description = "savepoint path", required = false, type = String.class), - @OpenAPI.Param(name = "drain", description = "send max watermark before canceling", required = false, type = boolean.class, defaultValue = "false"), + @OpenAPI.Param(name = "triggerSavepoint", description = "trigger savepoint before taking stopping", required = false, type = Boolean.class, defaultValue = "false", bindFor = "restoreOrTriggerSavepoint"), + @OpenAPI.Param(name = "savepointPath", description = "savepoint path", required = false, type = String.class), + @OpenAPI.Param(name = "drain", description = "send max watermark before canceling", required = false, type = Boolean.class, defaultValue = "false"), }) @Permission(app = "#app.appId", team = "#app.teamId") @PostMapping("app/cancel") @@ -82,11 +83,11 @@ public RestResponse flinkCancel(Application app) throws Exception { } @PostMapping("curl") - public RestResponse copyOpenApiCurl(String baseUrl, - Long appId, - @NotNull(message = "{required}") Long teamId, - @NotBlank(message = "{required}") String name) { - String url = openAPIComponent.getOpenApiCUrl(baseUrl, appId, teamId, name); + public RestResponse copyOpenApiCurl(String name, + String baseUrl, + @NotNull Long appId, + @NotNull Long teamId) { + String url = openAPIComponent.getOpenApiCUrl(name, baseUrl, appId, teamId); return RestResponse.success(url); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java index 72b1df4c35..50b33a7a16 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java @@ -20,7 +20,7 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApiAlertException; -import org.apache.streampark.console.core.annotation.AppUpdated; +import org.apache.streampark.console.core.annotation.AppChangeEvent; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.entity.Project; import org.apache.streampark.console.core.enums.GitAuthorizedErrorEnum; @@ -59,7 +59,7 @@ public RestResponse create(Project project) { return projectService.create(project); } - @AppUpdated + @AppChangeEvent @PostMapping("update") @RequiresPermissions("project:update") @Permission(team = "#project.teamId") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java similarity index 79% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java index 21d3862f95..85902e3c36 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java @@ -22,8 +22,8 @@ import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.entity.Application; -import org.apache.streampark.console.core.entity.SavePoint; -import org.apache.streampark.console.core.service.SavePointService; +import org.apache.streampark.console.core.entity.Savepoint; +import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -42,37 +42,37 @@ @Validated @RestController @RequestMapping("flink/savepoint") -public class SavePointController { +public class SavepointController { @Autowired private ApplicationManageService applicationManageService; @Autowired - private SavePointService savePointService; + private SavepointService savepointService; @PostMapping("history") @Permission(app = "#sp.appId", team = "#sp.teamId") - public RestResponse history(SavePoint sp, RestRequest request) { - IPage page = savePointService.getPage(sp, request); + public RestResponse history(Savepoint sp, RestRequest request) { + IPage page = savepointService.getPage(sp, request); return RestResponse.success(page); } @PostMapping("delete") @RequiresPermissions("savepoint:delete") @Permission(app = "#sp.appId", team = "#sp.teamId") - public RestResponse delete(SavePoint sp) throws InternalException { - SavePoint savePoint = savePointService.getById(sp.getId()); - Application application = applicationManageService.getById(savePoint.getAppId()); - Boolean deleted = savePointService.remove(sp.getId(), application); + public RestResponse delete(Savepoint sp) throws InternalException { + Savepoint savepoint = savepointService.getById(sp.getId()); + Application application = applicationManageService.getById(savepoint.getAppId()); + Boolean deleted = savepointService.remove(sp.getId(), application); return RestResponse.success(deleted); } @PostMapping("trigger") - @Permission(app = "#savePoint.appId", team = "#savePoint.teamId") + @Permission(app = "#savepoint.appId", team = "#savepoint.teamId") @RequiresPermissions("savepoint:trigger") public RestResponse trigger( Long appId, @Nullable String savepointPath, @Nullable Boolean nativeFormat) { - savePointService.trigger(appId, savepointPath, nativeFormat); + savepointService.trigger(appId, savepointPath, nativeFormat); return RestResponse.success(true); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java index 5f2165b4ad..861779ca3a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.InternalException; -import org.apache.streampark.console.core.annotation.AppUpdated; +import org.apache.streampark.console.core.annotation.AppChangeEvent; import org.apache.streampark.console.core.entity.ApplicationBackUp; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationLog; @@ -96,7 +96,7 @@ public RestResponse copy(SparkApplication app) throws IOException { return RestResponse.success(); } - @AppUpdated + @AppChangeEvent @PostMapping("update") @RequiresPermissions("app:update") public RestResponse update(SparkApplication app) { @@ -117,7 +117,7 @@ public RestResponse list(SparkApplication app, RestRequest request) { return RestResponse.success(applicationList); } - @AppUpdated + @AppChangeEvent @PostMapping("mapping") @RequiresPermissions("app:mapping") public RestResponse mapping(SparkApplication app) { @@ -125,7 +125,7 @@ public RestResponse mapping(SparkApplication app) { return RestResponse.success(flag); } - @AppUpdated + @AppChangeEvent @PostMapping("revoke") @RequiresPermissions("app:release") public RestResponse revoke(SparkApplication app) { @@ -158,7 +158,7 @@ public RestResponse stop(SparkApplication app) throws Exception { return RestResponse.success(); } - @AppUpdated + @AppChangeEvent @PostMapping("clean") @RequiresPermissions("app:clean") public RestResponse clean(SparkApplication app) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index d9d7212625..f277d734ec 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -238,11 +238,11 @@ public class Application implements Serializable { private transient String flinkVersion; private transient String confPath; private transient Integer format; - private transient String savePoint; - private transient Boolean savePointed = false; + private transient String savepointPath; + private transient Boolean restoreOrTriggerSavepoint = false; private transient Boolean drain = false; private transient Boolean nativeFormat = false; - private transient Long savePointTimeout = 60L; + private transient Long savepointTimeout = 60L; private transient Boolean allowNonRestored = false; private transient Integer restoreMode; private transient String socketId; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java similarity index 96% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java index 34ad48ad1c..4b78af0e63 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java @@ -28,7 +28,7 @@ @Data @TableName("t_flink_savepoint") @Slf4j -public class SavePoint { +public class Savepoint { @TableId(type = IdType.AUTO) private Long id; @@ -41,7 +41,7 @@ public class SavePoint { /** * 1) checkPoint
- * 2) savePoint + * 2) savepoint */ private Integer type; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index 0ed01fcd46..8dad101fbc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -229,11 +229,11 @@ public class SparkApplication extends BaseEntity { private transient String sparkVersion; private transient String confPath; private transient Integer format; - private transient String savePoint; - private transient Boolean savePointed = false; + private transient String savepointPath; + private transient Boolean restoreOrTriggerSavepoint = false; private transient Boolean drain = false; private transient Boolean nativeFormat = false; - private transient Long savePointTimeout = 60L; + private transient Long savepointTimeout = 60L; private transient Boolean allowNonRestored = false; private transient Integer restoreMode; private transient String socketId; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java similarity index 88% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java index 53714e9c81..7040ec5746 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java @@ -17,9 +17,9 @@ package org.apache.streampark.console.core.mapper; -import org.apache.streampark.console.core.entity.SavePoint; +import org.apache.streampark.console.core.entity.Savepoint; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -public interface SavePointMapper extends BaseMapper { +public interface SavepointMapper extends BaseMapper { } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java similarity index 92% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java index 3982d6dbea..d7597817a6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java @@ -20,14 +20,14 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.entity.Application; -import org.apache.streampark.console.core.entity.SavePoint; +import org.apache.streampark.console.core.entity.Savepoint; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; import javax.annotation.Nullable; -public interface SavePointService extends IService { +public interface SavepointService extends IService { /** * Expires all savepoints for the specified application. @@ -42,7 +42,7 @@ public interface SavePointService extends IService { * @param id the unique identifier of the SavePoint * @return the latest SavePoint object, or null if not found */ - SavePoint getLatest(Long id); + Savepoint getLatest(Long id); /** * Triggers a savepoint for the specified application. @@ -67,11 +67,11 @@ public interface SavePointService extends IService { /** * Retrieves a page of savepoint objects based on the specified parameters. * - * @param savePoint The SavePoint object to be used for filtering the page results. + * @param savepoint The SavePoint object to be used for filtering the page results. * @param request The RestRequest object containing additional request parameters. * @return An instance of IPage representing the page of SavePoint objects. */ - IPage getPage(SavePoint savePoint, RestRequest request); + IPage getPage(Savepoint savepoint, RestRequest request); /** * Removes all savepoints for the specified application. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 525166be7f..e19976dd4d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -45,7 +45,7 @@ import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.FlinkSql; import org.apache.streampark.console.core.entity.Resource; -import org.apache.streampark.console.core.entity.SavePoint; +import org.apache.streampark.console.core.entity.Savepoint; import org.apache.streampark.console.core.enums.CheckPointTypeEnum; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.FlinkAppStateEnum; @@ -61,7 +61,7 @@ import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.ResourceService; -import org.apache.streampark.console.core.service.SavePointService; +import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.application.ApplicationActionService; @@ -155,7 +155,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl clusterIdNamespace = getNamespaceClusterId(application); String k8sNamespace = clusterIdNamespace.t1; String k8sClusterId = clusterIdNamespace.t2; FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3; + String dynamicProperties = + StringUtils.isBlank(appParam.getDynamicProperties()) ? application.getDynamicProperties() + : appParam.getDynamicProperties(); + SubmitRequest submitRequest = new SubmitRequest( flinkEnv.getFlinkVersion(), FlinkExecutionMode.of(application.getExecutionMode()), - getProperties(application), + getProperties(application, dynamicProperties), flinkEnv.getFlinkConf(), FlinkDevelopmentMode.of(application.getJobType()), application.getId(), @@ -449,7 +454,7 @@ public void start(Application appParam, boolean auto) throws Exception { application.getJobName(), appConf, application.getApplicationType(), - getSavePointed(appParam), + getSavepointPath(appParam), FlinkRestoreMode.of(appParam.getRestoreMode()), applicationArgs, k8sClusterId, @@ -713,7 +718,7 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati return Tuple2.of(flinkUserJar, appConf); } - private Map getProperties(Application application) { + private Map getProperties(Application application, String runtimeProperties) { Map properties = new HashMap<>(application.getOptionMap()); if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); @@ -756,8 +761,7 @@ private Map getProperties(Application application) { properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true); } - Map dynamicProperties = PropertiesUtils - .extractDynamicPropertiesAsJava(application.getDynamicProperties()); + Map dynamicProperties = PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); properties.putAll(dynamicProperties); ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder()); if (resolveOrder != null) { @@ -773,7 +777,7 @@ private void doAbort(Long id) { application.setState(FlinkAppStateEnum.CANCELED.getValue()); application.setOptionTime(new Date()); updateById(application); - savePointService.expire(application.getId()); + savepointService.expire(application.getId()); // re-tracking flink job on kubernetes and logging exception if (application.isKubernetesModeJob()) { TrackId trackId = k8sWatcherWrapper.toTrackId(application); @@ -797,15 +801,15 @@ private void doAbort(Long id) { } } - private String getSavePointed(Application appParam) { - if (appParam.getSavePointed()) { - if (StringUtils.isBlank(appParam.getSavePoint())) { - SavePoint savePoint = savePointService.getLatest(appParam.getId()); - if (savePoint != null) { - return savePoint.getPath(); + private String getSavepointPath(Application appParam) { + if (appParam.getRestoreOrTriggerSavepoint()) { + if (StringUtils.isBlank(appParam.getSavepointPath())) { + Savepoint savepoint = savepointService.getLatest(appParam.getId()); + if (savepoint != null) { + return savepoint.getPath(); } } else { - return appParam.getSavePoint(); + return appParam.getSavepointPath(); } } return null; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index 6712318b6b..4b87659f53 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -41,7 +41,7 @@ import org.apache.streampark.console.core.runner.EnvInitializer; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; -import org.apache.streampark.console.core.service.SavePointService; +import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; import org.apache.streampark.console.core.watcher.FlinkClusterWatcher; @@ -102,7 +102,7 @@ public class ApplicationInfoServiceImpl extends ServiceImpl +public class SavepointServiceImpl extends ServiceImpl implements - SavePointService { + SavepointService { @Autowired private FlinkEnvService flinkEnvService; @@ -114,24 +114,24 @@ public class SavePointServiceImpl extends ServiceImpl queryWrapper = new LambdaQueryWrapper().eq(SavePoint::getAppId, appId); - this.update(savePoint, queryWrapper); + Savepoint savepoint = new Savepoint(); + savepoint.setLatest(false); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Savepoint::getAppId, appId); + this.update(savepoint, queryWrapper); } @Override - public boolean save(SavePoint entity) { + public boolean save(Savepoint entity) { this.expire(entity); this.expire(entity.getAppId()); return super.save(entity); } @Override - public SavePoint getLatest(Long id) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SavePoint::getAppId, id) - .eq(SavePoint::getLatest, true); + public Savepoint getLatest(Long id) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(Savepoint::getAppId, id) + .eq(Savepoint::getLatest, true); return this.getOne(queryWrapper); } @@ -199,10 +199,10 @@ private ApplicationLog getApplicationLog(Application application) { @Override public Boolean remove(Long id, Application appParam) throws InternalException { - SavePoint savePoint = getById(id); + Savepoint savepoint = getById(id); try { - if (StringUtils.isNotBlank(savePoint.getPath())) { - appParam.getFsOperator().delete(savePoint.getPath()); + if (StringUtils.isNotBlank(savepoint.getPath())) { + appParam.getFsOperator().delete(savepoint.getPath()); } return removeById(id); } catch (Exception e) { @@ -211,11 +211,11 @@ public Boolean remove(Long id, Application appParam) throws InternalException { } @Override - public IPage getPage(SavePoint savePoint, RestRequest request) { + public IPage getPage(Savepoint savepoint, RestRequest request) { request.setSortField("trigger_time"); - Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(SavePoint::getAppId, - savePoint.getAppId()); + Page page = MybatisPager.getPage(request); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Savepoint::getAppId, + savepoint.getAppId()); return this.page(page, queryWrapper); } @@ -223,7 +223,7 @@ public IPage getPage(SavePoint savePoint, RestRequest request) { public void remove(Application appParam) { Long appId = appParam.getId(); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(SavePoint::getAppId, appId); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Savepoint::getAppId, appId); this.remove(queryWrapper); try { @@ -246,10 +246,10 @@ private void handleSavepointResponseFuture( 10L, TimeUnit.MINUTES, savepointResponse -> { - if (savepointResponse != null && savepointResponse.savePointDir() != null) { + if (savepointResponse != null && savepointResponse.savepointDir() != null) { applicationLog.setSuccess(true); - String savePointDir = savepointResponse.savePointDir(); - log.info("Request savepoint successful, savepointDir: {}", savePointDir); + String savepointDir = savepointResponse.savepointDir(); + log.info("Request savepoint successful, savepointDir: {}", savepointDir); } }, e -> { @@ -437,7 +437,7 @@ private int getChkNumRetainedFromFlinkEnv( return MAX_RETAINED_CHECKPOINTS.defaultValue(); } - private void expire(SavePoint entity) { + private void expire(Savepoint entity) { FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId()); Application application = applicationManageService.getById(entity.getAppId()); AssertUtils.notNull(flinkEnv); @@ -448,29 +448,29 @@ private void expire(SavePoint entity) { cpThreshold = CHECKPOINT == CheckPointTypeEnum.of(entity.getType()) ? cpThreshold - 1 : cpThreshold; if (cpThreshold == 0) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SavePoint::getAppId, entity.getAppId()) - .eq(SavePoint::getType, CHECKPOINT.get()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(Savepoint::getAppId, entity.getAppId()) + .eq(Savepoint::getType, CHECKPOINT.get()); this.remove(queryWrapper); return; } - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .select(SavePoint::getTriggerTime) - .eq(SavePoint::getAppId, entity.getAppId()) - .eq(SavePoint::getType, CHECKPOINT.get()) - .orderByDesc(SavePoint::getTriggerTime); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .select(Savepoint::getTriggerTime) + .eq(Savepoint::getAppId, entity.getAppId()) + .eq(Savepoint::getType, CHECKPOINT.get()) + .orderByDesc(Savepoint::getTriggerTime); - Page savePointPage = this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper); - if (CollectionUtils.isEmpty(savePointPage.getRecords()) - || savePointPage.getRecords().size() <= cpThreshold) { + Page savepointPage = this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper); + if (CollectionUtils.isEmpty(savepointPage.getRecords()) + || savepointPage.getRecords().size() <= cpThreshold) { return; } - SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1); - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper() - .eq(SavePoint::getAppId, entity.getAppId()) - .eq(SavePoint::getType, CHECKPOINT.get()) - .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime()); + Savepoint savepoint = savepointPage.getRecords().get(cpThreshold - 1); + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper() + .eq(Savepoint::getAppId, entity.getAppId()) + .eq(Savepoint::getType, CHECKPOINT.get()) + .lt(Savepoint::getTriggerTime, savepoint.getTriggerTime()); this.remove(lambdaQueryWrapper); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 9e7f9a941a..60b2135356 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -36,7 +36,7 @@ import org.apache.streampark.console.core.metrics.flink.Overview; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; import org.apache.streampark.console.core.service.FlinkClusterService; -import org.apache.streampark.console.core.service.SavePointService; +import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; @@ -96,7 +96,7 @@ public class FlinkAppHttpWatcher { private FlinkClusterService flinkClusterService; @Autowired - private SavePointService savePointService; + private SavepointService savepointService; // track interval every 5 seconds public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); @@ -248,13 +248,13 @@ private void doStateFailed(Application application) { // non-mapping if (application.getState() != FlinkAppStateEnum.MAPPING.getValue()) { log.error( - "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); + "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and getFromYarnRestApi error,job failed,savepoint expired!"); if (StopFromEnum.NONE.equals(stopFrom)) { Date lostTime = LOST_CACHE.getIfPresent(application.getId()); if (lostTime == null) { LOST_CACHE.put(application.getId(), new Date()); } else if (DateUtils.toSecondDuration(lostTime, new Date()) >= 30) { - savePointService.expire(application.getId()); + savepointService.expire(application.getId()); application.setState(FlinkAppStateEnum.LOST.getValue()); WATCHING_APPS.remove(application.getId()); LOST_CACHE.invalidate(application.getId()); @@ -329,7 +329,7 @@ private void getStateFromFlink(Application application) throws Exception { } catch (Exception e) { log.error("get flink jobOverview error: {}", e.getMessage(), e); } - // 3) savePoint obsolete check and NEED_START check + // 3) savepoint obsolete check and NEED_START check OptionStateEnum optionState = OPTIONING.get(application.getId()); if (currentState.equals(FlinkAppStateEnum.RUNNING)) { handleRunningState(application, optionState, currentState); @@ -424,7 +424,7 @@ private void handleRunningState( } } - // The current state is running, and there is a current task in the savePointCache, + // The current state is running, and there is a current task in the savepointCache, // indicating that the task is doing savepoint if (SAVEPOINT_CACHE.getIfPresent(appId) != null) { application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue()); @@ -491,8 +491,8 @@ private void handleNotRunState( if (StopFromEnum.NONE.equals(stopFrom) || applicationInfoService.checkAlter(application)) { if (StopFromEnum.NONE.equals(stopFrom)) { log.info( - "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!"); - savePointService.expire(application.getId()); + "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job cancel is not form StreamPark,savepoint expired!"); + savepointService.expire(application.getId()); } stopCanceledJob(application.getId()); doAlert(application, FlinkAppStateEnum.CANCELED); @@ -551,8 +551,8 @@ private void getStateFromYarn(Application application) throws Exception { } finally { if (StopFromEnum.NONE.equals(stopFrom)) { log.error( - "[StreamPark][FlinkAppHttpWatcher] query previous state was canceling and stopFrom NotFound,savePoint expired!"); - savePointService.expire(application.getId()); + "[StreamPark][FlinkAppHttpWatcher] query previous state was canceling and stopFrom NotFound,savepoint expired!"); + savepointService.expire(application.getId()); if (flinkAppState == FlinkAppStateEnum.KILLED || flinkAppState == FlinkAppStateEnum.FAILED) { doAlert(application, flinkAppState); @@ -582,8 +582,8 @@ private void getStateFromYarn(Application application) throws Exception { if (FlinkAppStateEnum.KILLED.equals(flinkAppState)) { if (StopFromEnum.NONE.equals(stopFrom)) { log.error( - "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!"); - savePointService.expire(application.getId()); + "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job was killed and stopFrom NotFound,savepoint expired!"); + savepointService.expire(application.getId()); } flinkAppState = FlinkAppStateEnum.CANCELED; cleanSavepoint(application); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java index 99b8b3cee0..d7e81e5734 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java @@ -17,8 +17,7 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.util.WebUtils; -import org.apache.streampark.console.core.enums.AuthenticationType; +import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.shiro.authz.UnauthorizedException; import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter; @@ -58,20 +57,14 @@ protected boolean isLoginAttempt(ServletRequest request, ServletResponse respons protected boolean executeLogin(ServletRequest request, ServletResponse response) { HttpServletRequest httpServletRequest = (HttpServletRequest) request; String token = httpServletRequest.getHeader(TOKEN); - AuthenticationType type = JWTUtil.getAuthType(WebUtils.decryptToken(token)); - if (type == null) { + try { + token = EncryptUtils.decrypt(token); + JWTToken jwtToken = new JWTToken(token); + getSubject(request, response).login(jwtToken); + return true; + } catch (Exception e) { return false; } - if (type == AuthenticationType.OPENAPI) { - JWTToken jwtToken = new JWTToken(WebUtils.decryptToken(token)); - try { - getSubject(request, response).login(jwtToken); - return true; - } catch (Exception e) { - return false; - } - } - return true; } /** cross-domain support */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java index 15d13fe382..dada265284 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java @@ -33,16 +33,13 @@ public class JWTToken implements AuthenticationToken { private String expireAt; - private int signType; - public JWTToken(String token) { this.token = token; } - public JWTToken(String token, String expireAt, int signType) { + public JWTToken(String token, String expireAt) { this.token = token; this.expireAt = expireAt; - this.signType = signType; } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java index 93186ccdef..f51c276f30 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java @@ -17,9 +17,12 @@ package org.apache.streampark.console.system.authentication; +import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; +import org.apache.streampark.console.system.entity.User; import com.auth0.jwt.JWT; +import com.auth0.jwt.JWTCreator; import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; @@ -33,6 +36,11 @@ public class JWTUtil { private static Long ttlOfSecond; + private static final String JWT_USERID = "userId"; + private static final String JWT_USERNAME = "userName"; + private static final String JWT_TYPE = "type"; + private static final String JWT_TIMESTAMP = "timestamp"; + /** * verify token * @@ -42,7 +50,7 @@ public class JWTUtil { public static boolean verify(String token, String username, String secret) { try { Algorithm algorithm = Algorithm.HMAC256(secret); - JWTVerifier verifier = JWT.require(algorithm).withClaim("userName", username).build(); + JWTVerifier verifier = JWT.require(algorithm).withClaim(JWT_USERNAME, username).build(); verifier.verify(token); return true; } catch (Exception ignored) { @@ -54,7 +62,7 @@ public static boolean verify(String token, String username, String secret) { public static String getUserName(String token) { try { DecodedJWT jwt = JWT.decode(token); - return jwt.getClaim("userName").asString(); + return jwt.getClaim(JWT_USERNAME).asString(); } catch (Exception ignored) { return null; } @@ -63,16 +71,33 @@ public static String getUserName(String token) { public static Long getUserId(String token) { try { DecodedJWT jwt = JWT.decode(token); - return jwt.getClaim("userId").asLong(); + return jwt.getClaim(JWT_USERID).asLong(); } catch (Exception ignored) { return null; } } + /** + * @param token + * @return + */ + public static Long getTimestamp(String token) { + try { + DecodedJWT jwt = JWT.decode(token); + return jwt.getClaim(JWT_TIMESTAMP).asLong(); + } catch (Exception ignored) { + return 0L; + } + } + + /** + * @param token + * @return + */ public static AuthenticationType getAuthType(String token) { try { DecodedJWT jwt = JWT.decode(token); - int type = jwt.getClaim("type").asInt(); + int type = jwt.getClaim(JWT_TYPE).asInt(); return AuthenticationType.of(type); } catch (Exception ignored) { return null; @@ -80,44 +105,47 @@ public static AuthenticationType getAuthType(String token) { } /** - * generate token - * - * @param userId - * @param userName + * @param user + * @param authType * @return + * @throws Exception */ - public static String sign( - Long userId, String userName, String secret, AuthenticationType authType) { - Long second = getTTLOfSecond() * 1000; + public static String sign(User user, AuthenticationType authType) throws Exception { + long second = getTTLOfSecond() * 1000; Long ttl = System.currentTimeMillis() + second; - return sign(userId, userName, secret, authType, ttl); + return sign(user, authType, ttl); } /** - * generate token - * - * @param userId - * @param userName + * @param user + * @param authType * @param expireTime * @return + * @throws Exception */ - public static String sign( - Long userId, String userName, String secret, AuthenticationType authType, - Long expireTime) { + public static String sign(User user, AuthenticationType authType, Long expireTime) throws Exception { Date date = new Date(expireTime); - Algorithm algorithm = Algorithm.HMAC256(secret); - return JWT.create() - .withClaim("userId", userId) - .withClaim("userName", userName) - .withClaim("type", authType.get()) - .withExpiresAt(date) - .sign(algorithm); + Algorithm algorithm = Algorithm.HMAC256(user.getPassword()); + + JWTCreator.Builder builder = + JWT.create() + .withClaim(JWT_USERID, user.getUserId()) + .withClaim(JWT_USERNAME, user.getUsername()) + .withClaim(JWT_TYPE, authType.get()) + .withExpiresAt(date); + + if (authType == AuthenticationType.SIGN) { + builder.withClaim(JWT_TIMESTAMP, System.currentTimeMillis()); + } + + String token = builder.sign(algorithm); + return EncryptUtils.encrypt(token); } public static Long getTTLOfSecond() { if (ttlOfSecond == null) { String ttl = System.getProperty("server.session.ttl", "24h").trim(); - String regexp = "^\\d+(s|m|h|d)$"; + String regexp = "^\\d+([smhd])$"; Pattern pattern = Pattern.compile(regexp); if (!pattern.matcher(ttl).matches()) { throw new IllegalArgumentException( @@ -125,7 +153,7 @@ public static Long getTTLOfSecond() { } String unit = ttl.substring(ttl.length() - 1); String time = ttl.substring(0, ttl.length() - 1); - Long second = Long.parseLong(time); + long second = Long.parseLong(time); switch (unit) { case "m": return ttlOfSecond = second * 60; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java index 6418281e0b..2d82a0e474 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java @@ -74,6 +74,7 @@ public ShiroFilterFactoryBean shiroFilterFactoryBean(SecurityManager securityMan filterChainDefinitionMap.put("/*.less", "anon"); filterChainDefinitionMap.put("/*.ico", "anon"); filterChainDefinitionMap.put("/", "anon"); + filterChainDefinitionMap.put("/proxy/**", "anon"); filterChainDefinitionMap.put("/**", "jwt"); shiroFilterFactoryBean.setFilterChainDefinitionMap(filterChainDefinitionMap); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java index b9d6afb0a6..6ea9d88a48 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java @@ -17,14 +17,14 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.common.util.SystemPropertyUtils; +import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.system.entity.AccessToken; import org.apache.streampark.console.system.entity.User; import org.apache.streampark.console.system.service.AccessTokenService; import org.apache.streampark.console.system.service.UserService; -import org.apache.commons.lang3.StringUtils; import org.apache.shiro.SecurityUtils; import org.apache.shiro.authc.AuthenticationException; import org.apache.shiro.authc.AuthenticationInfo; @@ -83,38 +83,54 @@ protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authent // The token here is passed from the executeLogin method of JWTFilter and has been decrypted String credential = (String) authenticationToken.getCredentials(); String username = JWTUtil.getUserName(credential); + Long userId = JWTUtil.getUserId(credential); + AuthenticationType authType = JWTUtil.getAuthType(credential); - if (StringUtils.isBlank(username)) { + if (username == null || userId == null || authType == null) { throw new AuthenticationException("the authorization token is invalid"); } + + switch (authType) { + case SIGN: + Long timestamp = JWTUtil.getTimestamp(credential); + Long startTime = SystemPropertyUtils.getLong("streampark.start.timestamp", 0); + if (timestamp < startTime) { + throw new AuthenticationException("the authorization token is expired"); + } + break; + case OPENAPI: + // Check whether the token belongs to the api and whether the permission is valid + AccessToken accessToken = accessTokenService.getByUserId(userId); + try { + String encryptToken = EncryptUtils.encrypt(credential); + if (accessToken == null || !accessToken.getToken().equals(encryptToken)) { + throw new AuthenticationException("the openapi authorization token is invalid"); + } + } catch (Exception e) { + throw new AuthenticationException(e); + } + + if (AccessToken.STATUS_DISABLE.equals(accessToken.getStatus())) { + throw new AuthenticationException( + "the openapi authorization token is disabled, please contact the administrator"); + } + + if (User.STATUS_LOCK.equals(accessToken.getUserStatus())) { + throw new AuthenticationException( + "the user [" + username + "] has been locked, please contact the administrator"); + } + SecurityUtils.getSubject().getSession().setAttribute(AccessToken.IS_API_TOKEN, true); + break; + default: + break; + } + // Query user information by username User user = userService.getByUsername(username); - - if (user == null || !JWTUtil.verify(credential, username, user.getSalt())) { + if (user == null || !JWTUtil.verify(credential, username, user.getPassword())) { throw new AuthenticationException("the authorization token verification failed."); } - AuthenticationType authType = JWTUtil.getAuthType(credential); - if (authType == AuthenticationType.OPENAPI) { - // Check whether the token belongs to the api and whether the permission is valid - AccessToken accessToken = accessTokenService.getByUserId(user.getUserId()); - if (accessToken == null - || !accessToken.getToken().equals(WebUtils.encryptToken(credential))) { - throw new AuthenticationException("the openapi authorization token is invalid"); - } - - if (AccessToken.STATUS_DISABLE.equals(accessToken.getStatus())) { - throw new AuthenticationException( - "the openapi authorization token is disabled, please contact the administrator"); - } - - if (User.STATUS_LOCK.equals(accessToken.getUserStatus())) { - throw new AuthenticationException( - "the user [" + username + "] has been locked, please contact the administrator"); - } - - SecurityUtils.getSubject().getSession().setAttribute(AccessToken.IS_API_TOKEN, true); - } return new SimpleAuthenticationInfo(credential, credential, "streampark_shiro_realm"); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java index 0fdb8f54cb..604bbcc5bb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java @@ -19,7 +19,6 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.enums.AccessTokenStateEnum; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.system.entity.AccessToken; @@ -49,7 +48,7 @@ public class AccessTokenController { @RequiresPermissions("token:add") public RestResponse createToken( @NotNull(message = "{required}") Long userId, - @RequestParam(required = false) String description) throws InternalException { + @RequestParam(required = false) String description) throws Exception { return accessTokenService.create(userId, description); } @@ -78,7 +77,7 @@ public RestResponse tokensList(RestRequest restRequest, AccessToken accessToken) @PostMapping("toggle") @RequiresPermissions("token:add") public RestResponse toggleToken(@NotNull(message = "{required}") Long tokenId) { - return accessTokenService.toggleToken(tokenId); + return accessTokenService.toggle(tokenId); } @DeleteMapping(value = "delete") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java index 01256fef42..732d9edac0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java @@ -19,7 +19,6 @@ import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.core.enums.LoginTypeEnum; import org.apache.streampark.console.system.authentication.JWTToken; @@ -40,8 +39,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import javax.validation.constraints.NotBlank; - import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; @@ -79,16 +76,14 @@ public RestResponse type() { } @PostMapping("signin") - public RestResponse signin( - @NotBlank(message = "{required}") String username, - @NotBlank(message = "{required}") String password, - @NotBlank(message = "{required}") String loginType) throws Exception { + public RestResponse signin(User loginUser) throws Exception { - if (StringUtils.isEmpty(username)) { + if (StringUtils.isEmpty(loginUser.getUsername())) { return RestResponse.success().put("code", 0); } - User user = authenticator.authenticate(username, password, loginType); + User user = + authenticator.authenticate(loginUser.getUsername(), loginUser.getPassword(), loginUser.getLoginType()); if (user == null) { return RestResponse.success().put("code", 0); @@ -98,23 +93,17 @@ public RestResponse signin( return RestResponse.success().put("code", 1); } - this.userService.updateLoginTime(username); - String sign = JWTUtil.sign(user.getUserId(), username, user.getSalt(), AuthenticationType.SIGN); + this.userService.updateLoginTime(loginUser.getUsername()); + String token = JWTUtil.sign(user, AuthenticationType.SIGN); LocalDateTime expireTime = LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond()); String ttl = DateUtils.formatFullTime(expireTime); - // shiro login - JWTToken loginToken = new JWTToken(sign, ttl, AuthenticationType.SIGN.get()); - SecurityUtils.getSubject().login(loginToken); - // generate UserInfo - String token = WebUtils.encryptToken(sign); - JWTToken jwtToken = new JWTToken(token, ttl, AuthenticationType.SIGN.get()); String userId = RandomStringUtils.randomAlphanumeric(20); user.setId(userId); - Map userInfo = - userService.generateFrontendUserInfo(user, user.getLastTeamId(), jwtToken); + JWTToken jwtToken = new JWTToken(token, ttl); + Map userInfo = userService.generateFrontendUserInfo(user, jwtToken); return new RestResponse().data(userInfo); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java index 808e5568da..4d9b501a79 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java @@ -89,7 +89,7 @@ public RestResponse token() throws Exception { ApiAlertException.throwIfNull( principal.getName(), "Please configure the correct Principal Name Attribute"); - User user = authenticator.authenticate(principal.getName(), null, LoginTypeEnum.SSO.toString()); + User user = authenticator.authenticate(principal.getName(), null, LoginTypeEnum.SSO); return userService.getLoginUserInfo(user); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java index d91d6abddf..262534daf5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java @@ -136,8 +136,9 @@ public RestResponse setTeam(Long teamId) { // 2) get latest userInfo user.dataMasking(); + user.setLastTeamId(teamId); - Map infoMap = userService.generateFrontendUserInfo(user, teamId, null); + Map infoMap = userService.generateFrontendUserInfo(user, null); return new RestResponse().data(infoMap); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java index b17f242231..d178711111 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java @@ -35,7 +35,6 @@ @TableName("t_access_token") public class AccessToken extends BaseEntity { - public static final String DEFAULT_EXPIRE_TIME = "9999-01-01 00:00:00"; public static final String IS_API_TOKEN = "is_api_token"; public static final Integer STATUS_ENABLE = 1; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java index 5ab8850f88..6eeabc8150 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java @@ -54,6 +54,7 @@ public void run(ApplicationArguments args) { System.out.println(" Info : streampark-console start successful "); System.out.println(" Local : http://localhost:" + port); System.out.println(" Time : " + LocalDateTime.now() + "\n\n"); + System.setProperty("streampark.start.timestamp", System.currentTimeMillis() + ""); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java index f2414c2ae2..78bd5deabd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.system.security; +import org.apache.streampark.console.core.enums.LoginTypeEnum; import org.apache.streampark.console.system.entity.User; public interface Authenticator { @@ -28,5 +29,5 @@ public interface Authenticator { * @param password user password * @return result object */ - User authenticate(String username, String password, String loginType) throws Exception; + User authenticate(String username, String password, LoginTypeEnum loginType) throws Exception; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java index 5d783d8359..b315014adc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java @@ -39,13 +39,11 @@ public class AuthenticatorImpl implements Authenticator { private LdapService ldapService; @Override - public User authenticate(String username, String password, String loginType) throws Exception { - LoginTypeEnum loginTypeEnum = LoginTypeEnum.of(loginType); - + public User authenticate(String username, String password, LoginTypeEnum loginType) throws Exception { ApiAlertException.throwIfNull( - loginTypeEnum, String.format("the login type [%s] is not supported.", loginType)); + loginType, "the login type is null"); - switch (loginTypeEnum) { + switch (loginType) { case PASSWORD: return passwordAuthenticate(username, password); case LDAP: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java index 60c7bae222..61e9aa566a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java @@ -36,7 +36,7 @@ public interface AccessTokenService extends IService { * @return RestResponse * @throws InternalException */ - RestResponse create(Long userId, String description) throws InternalException; + RestResponse create(Long userId, String description) throws Exception; /** * Retrieves a page of {@link AccessToken} objects based on the provided parameters. @@ -53,7 +53,7 @@ public interface AccessTokenService extends IService { * @param tokenId AccessToken id * @return RestResponse */ - RestResponse toggleToken(Long tokenId); + RestResponse toggle(Long tokenId); /** * Get the corresponding AccessToken based on the user ID diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java index cbf40603ba..9c907f7cdb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java @@ -138,11 +138,10 @@ public interface UserService extends IService { * Generate user information for the front end * * @param user User - * @param teamId team id * @param token JWTToken * @return */ - Map generateFrontendUserInfo(User user, Long teamId, JWTToken token); + Map generateFrontendUserInfo(User user, JWTToken token); /** * transfer user resources to specified users @@ -158,7 +157,7 @@ public interface UserService extends IService { * @param user User * @return RestResponse */ - RestResponse getLoginUserInfo(User user); + RestResponse getLoginUserInfo(User user) throws Exception; void deleteUser(Long userId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java index 8afebcf70e..b18a069fcf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java @@ -21,9 +21,7 @@ import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; -import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.enums.AuthenticationType; -import org.apache.streampark.console.system.authentication.JWTToken; import org.apache.streampark.console.system.authentication.JWTUtil; import org.apache.streampark.console.system.entity.AccessToken; import org.apache.streampark.console.system.entity.User; @@ -53,24 +51,21 @@ public class AccessTokenServiceImpl extends ServiceImpl getPage(AccessToken tokenParam, RestRequest request) { } @Override - public RestResponse toggleToken(Long tokenId) { + public RestResponse toggle(Long tokenId) { AccessToken tokenInfo = baseMapper.selectById(tokenId); if (tokenInfo == null) { return RestResponse.fail(ResponseCode.CODE_FAIL_ALERT, "accessToken could not be found!"); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java index 08381e2f01..3e03e23d43 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java @@ -24,7 +24,6 @@ import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; import org.apache.streampark.console.base.util.ShaHashUtils; -import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.core.enums.LoginTypeEnum; import org.apache.streampark.console.core.service.ResourceService; @@ -228,7 +227,7 @@ public void transferResource(Long userId, Long targetUserId) { } @Override - public RestResponse getLoginUserInfo(User user) { + public RestResponse getLoginUserInfo(User user) throws Exception { if (user == null) { return RestResponse.success().put(RestResponse.CODE_KEY, 0); } @@ -237,16 +236,18 @@ public RestResponse getLoginUserInfo(User user) { return RestResponse.success().put(RestResponse.CODE_KEY, 1); } - updateLoginTime(user.getUsername()); - String token = WebUtils.encryptToken( - JWTUtil.sign( - user.getUserId(), user.getUsername(), user.getSalt(), AuthenticationType.SIGN)); + this.updateLoginTime(user.getUsername()); + String token = JWTUtil.sign(user, AuthenticationType.SIGN); + LocalDateTime expireTime = LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond()); - String expireTimeStr = DateUtils.formatFullTime(expireTime); - JWTToken jwtToken = new JWTToken(token, expireTimeStr, AuthenticationType.SIGN.get()); + String ttl = DateUtils.formatFullTime(expireTime); + + // generate UserInfo String userId = RandomStringUtils.randomAlphanumeric(20); user.setId(userId); - Map userInfo = generateFrontendUserInfo(user, user.getLastTeamId(), jwtToken); + JWTToken jwtToken = new JWTToken(token, ttl); + Map userInfo = generateFrontendUserInfo(user, jwtToken); + return RestResponse.success(userInfo); } @@ -265,7 +266,7 @@ public void deleteUser(Long userId) { * @return UserInfo */ @Override - public Map generateFrontendUserInfo(User user, Long teamId, JWTToken token) { + public Map generateFrontendUserInfo(User user, JWTToken token) { Map userInfo = new HashMap<>(8); // 1) token & expire @@ -279,7 +280,7 @@ public Map generateFrontendUserInfo(User user, Long teamId, JWTT userInfo.put("user", user); // 3) permissions - Set permissions = this.listPermissions(user.getUserId(), teamId); + Set permissions = this.listPermissions(user.getUserId(), user.getLastTeamId()); userInfo.put("permissions", permissions); return userInfo; diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml similarity index 97% rename from streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml rename to streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml index cc1b72e573..99ff6b4278 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml @@ -16,8 +16,8 @@ ~ limitations under the License. --> - - + + diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java index b1cfb89e87..c5de0d6a2f 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java @@ -20,7 +20,7 @@ import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.system.authentication.JWTToken; import org.apache.streampark.console.system.authentication.JWTUtil; import org.apache.streampark.console.system.entity.AccessToken; @@ -44,22 +44,21 @@ public class AccessTokenServiceTest extends SpringUnitTestBase { @Test void testCrudToken() throws Exception { Long mockUserId = 100000L; - String expireTime = "9999-01-01 00:00:00"; RestResponse restResponse = accessTokenService.create(mockUserId, ""); Assertions.assertNotNull(restResponse); - Assertions.assertInstanceOf(AccessToken.class, restResponse.get(RestResponse.DATA_KEY)); + Assertions.assertInstanceOf(AccessToken.class, restResponse.get("data")); // verify - AccessToken accessToken = (AccessToken) restResponse.get(RestResponse.DATA_KEY); + AccessToken accessToken = (AccessToken) restResponse.get("data"); LOG.info(accessToken.getToken()); - JWTToken jwtToken = new JWTToken(WebUtils.decryptToken(accessToken.getToken())); + JWTToken jwtToken = new JWTToken(EncryptUtils.decrypt(accessToken.getToken())); LOG.info(jwtToken.getToken()); String username = JWTUtil.getUserName(jwtToken.getToken()); Assertions.assertNotNull(username); Assertions.assertEquals("admin", username); User user = userService.getByUsername(username); Assertions.assertNotNull(user); - Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken(), username, user.getSalt())); + Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken(), username, user.getPassword())); // list AccessToken mockToken1 = new AccessToken(); @@ -73,9 +72,9 @@ void testCrudToken() throws Exception { // toggle Long tokenId = accessToken.getId(); - RestResponse toggleTokenResp = accessTokenService.toggleToken(tokenId); + RestResponse toggleTokenResp = accessTokenService.toggle(tokenId); Assertions.assertNotNull(toggleTokenResp); - Assertions.assertTrue((Boolean) toggleTokenResp.get(RestResponse.DATA_KEY)); + Assertions.assertTrue((Boolean) toggleTokenResp.get("data")); // get AccessToken afterToggle = accessTokenService.getByUserId(mockUserId); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java index 693248deeb..f089d81b13 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java @@ -82,7 +82,7 @@ void testRevoke() { app.setK8sHadoopIntegration(false); app.setBackUp(false); app.setRestart(false); - app.setSavePointed(false); + app.setRestoreOrTriggerSavepoint(false); app.setDrain(false); app.setAllowNonRestored(false); @@ -95,7 +95,7 @@ void testStart() throws Exception { Application application = new Application(); application.setId(1304056220683497473L); application.setRestart(false); - application.setSavePointed(false); + application.setRestoreOrTriggerSavepoint(false); application.setAllowNonRestored(false); applicationActionService.start(application, false); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java similarity index 83% rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java index 5801e08fc4..2a3bc205f1 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java @@ -29,7 +29,7 @@ import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.service.impl.SavePointServiceImpl; +import org.apache.streampark.console.core.service.impl.SavepointServiceImpl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.fasterxml.jackson.core.JsonProcessingException; @@ -44,13 +44,13 @@ /** * Test class for the implementation {@link - * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of {@link - * SavePointService}. + * SavepointServiceImpl} of {@link + * SavepointService}. */ -class SavePointServiceTest extends SpringUnitTestBase { +class SavepointServiceTest extends SpringUnitTestBase { @Autowired - private SavePointService savePointService; + private SavepointService savepointService; @Autowired private ApplicationConfigService configService; @@ -67,7 +67,7 @@ class SavePointServiceTest extends SpringUnitTestBase { @AfterEach void cleanTestRecordsInDatabase() { - savePointService.remove(new QueryWrapper<>()); + savepointService.remove(new QueryWrapper<>()); configService.remove(new QueryWrapper<>()); effectiveService.remove(new QueryWrapper<>()); flinkEnvService.remove(new QueryWrapper<>()); @@ -83,17 +83,17 @@ void cleanTestRecordsInDatabase() { void testGetSavepointFromDynamicProps() { String propsWithEmptyTargetValue = "-Dstate.savepoints.dir="; String props = "-Dstate.savepoints.dir=hdfs:///test"; - SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService; + SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl) savepointService; - assertThat(savePointServiceImpl.getSavepointFromDynamicProps(null)).isNull(); - assertThat(savePointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test"); - assertThat(savePointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue)) + assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test"); + assertThat(savepointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue)) .isEmpty(); } @Test void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { - SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService; + SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl) savepointService; Application app = new Application(); Long appId = 1L; Long appCfgId = 1L; @@ -101,17 +101,17 @@ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { // Test for non-(StreamPark job Or FlinkSQL job) app.setAppType(ApplicationType.APACHE_FLINK.getType()); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode()); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for (StreamPark job Or FlinkSQL job) without application config. app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode()); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint. ApplicationConfig appCfg = new ApplicationConfig(); @@ -120,7 +120,7 @@ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { appCfg.setContent("state.savepoints.dir=hdfs:///test"); appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue()); configService.save(appCfg); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for (StreamPark job or FlinkSQL job) with application config and enabled checkpoint and // configured value. @@ -128,7 +128,7 @@ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { // Test for non-value for CHECKPOINTING_INTERVAL appCfg.setContent(""); configService.updateById(appCfg); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull(); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for configured CHECKPOINTING_INTERVAL appCfg.setContent( @@ -142,12 +142,12 @@ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { effective.setAppId(appId); effective.setTargetType(EffectiveTypeEnum.CONFIG.getType()); effectiveService.save(effective); - assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isEqualTo("hdfs:///test"); + assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isEqualTo("hdfs:///test"); } @Test void testGetSavepointFromDeployLayer() throws JsonProcessingException { - SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService; + SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl) savepointService; Long appId = 1L; Long idOfFlinkEnv = 1L; Long teamId = 1L; @@ -168,7 +168,7 @@ void testGetSavepointFromDeployLayer() throws JsonProcessingException { flinkEnvService.save(flinkEnv); // Test for non-remote mode - assertThat(savePointServiceImpl.getSavepointFromDeployLayer(application)) + assertThat(savepointServiceImpl.getSavepointFromDeployLayer(application)) .isEqualTo("hdfs:///test"); // Start the test lines for remote mode @@ -177,7 +177,7 @@ void testGetSavepointFromDeployLayer() throws JsonProcessingException { // Test for it without cluster. application.setExecutionMode(FlinkExecutionMode.REMOTE.getMode()); application.setFlinkClusterId(clusterId); - assertThatThrownBy(() -> savePointServiceImpl.getSavepointFromDeployLayer(application)) + assertThatThrownBy(() -> savepointServiceImpl.getSavepointFromDeployLayer(application)) .isInstanceOf(NullPointerException.class); // Ignored. diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java index 2167fc14f7..ff3ea0bb12 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java @@ -19,8 +19,9 @@ import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.console.SpringUnitTestBase; +import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; -import org.apache.streampark.console.system.entity.AccessToken; +import org.apache.streampark.console.system.entity.User; import com.auth0.jwt.JWT; import org.junit.jupiter.api.Assertions; @@ -32,19 +33,23 @@ class JWTTest extends SpringUnitTestBase { @Test - void testExpireTime() { + void testExpireTime() throws Exception { String userName = "black"; - String expireTime = AccessToken.DEFAULT_EXPIRE_TIME; - String token = JWTUtil.sign( - 10000L, - userName, - "streampark", - AuthenticationType.SIGN, - DateUtils.getTime(expireTime, DateUtils.fullFormat(), TimeZone.getDefault())); + String ttl = "2022-09-01 00:00:00"; + User user = new User(); + user.setUserId(10000L); + user.setUsername(userName); + user.setPassword("streampark"); + String token = + JWTUtil.sign( + user, + AuthenticationType.SIGN, + DateUtils.getTime(ttl, DateUtils.fullFormat(), TimeZone.getDefault())); assert token != null; - Date expiresAt = JWT.decode(token).getExpiresAt(); - String decodeExpireTime = DateUtils.format(expiresAt, DateUtils.fullFormat(), TimeZone.getDefault()); - Assertions.assertEquals(expireTime, decodeExpireTime); + Date expiresAt = JWT.decode(EncryptUtils.decrypt(token)).getExpiresAt(); + String decodeExpireTime = + DateUtils.format(expiresAt, DateUtils.fullFormat(), TimeZone.getDefault()); + Assertions.assertEquals(ttl, decodeExpireTime); } } diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.ts index da8f07f7f6..1d034b3a39 100644 --- a/streampark-console/streampark-console-webapp/src/api/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/api/flink/app.ts @@ -205,7 +205,7 @@ export function fetchK8sStartLog(data): Promise> { */ export function fetchCheckSavepointPath(data: { id?: string; - savePoint?: string; + savepointPath?: string; }): Promise> { return defHttp.post( { url: APP_API.CHECK_SAVEPOINT_PATH, data }, diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts index 7fdf36c29a..cd89292d45 100644 --- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts +++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts @@ -116,8 +116,8 @@ export interface AppListRecord { flinkVersion: string; confPath?: any; format?: any; - savePoint?: any; - savePointed: boolean; + savepointPath?: any; + restoreOrTriggerSavepoint: boolean; drain: boolean; nativeFormat: boolean; allowNonRestored: boolean; @@ -149,10 +149,10 @@ interface AppControl { /* cancel params */ export interface CancelParam { id: string; - savePointed: boolean; + restoreOrTriggerSavepoint: boolean; drain: boolean; nativeFormat: boolean; - savePoint: string; + savepointPath: string; } // create Params export interface CreateParams { diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index 445f1e0828..6e0c82bb8b 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -84,6 +84,7 @@ export default { created: 'CREATED', starting: 'STARTING', restarting: 'RESTARTING', + savepoint: 'SAVEPOINTING', running: 'RUNNING', failing: 'FAILING', failed: 'FAILED', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 7324c5f1f9..55ace86b8a 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -82,6 +82,7 @@ export default { created: '创建中', starting: '启动中', restarting: '重启中', + savepoint: '快照中', running: '运行中', failing: '失败中', failed: '作业失败', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index fc64533ef8..8020ab3e48 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -50,7 +50,7 @@ Object.assign(receiveData, data); resetFields(); setFieldsValue({ - startSavePoint: receiveData.selected?.path, + savepointPath: receiveData.selected?.path, }); } }); @@ -67,7 +67,7 @@ labelWidth: 120, schemas: [ { - field: 'startSavePointed', + field: 'restoreSavepoint', label: t('flink.app.view.fromSavepoint'), component: 'Switch', componentProps: { @@ -78,7 +78,7 @@ afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointTip')), }, { - field: 'startSavePoint', + field: 'savepointPath', label: 'Savepoint', component: receiveData.historySavePoint && receiveData.historySavePoint.length > 0 @@ -87,7 +87,7 @@ afterItem: () => h('span', { class: 'conf-switch' }, handleSavePointTip(receiveData.historySavePoint)), slot: 'savepoint', - ifShow: ({ values }) => values.startSavePointed, + ifShow: ({ values }) => values.restoreSavepoint, required: true, }, { @@ -100,7 +100,7 @@ }, afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.ignoreRestoredTip')), defaultValue: false, - ifShow: ({ values }) => values.startSavePointed, + ifShow: ({ values }) => values.restoreSavepoint, }, ], colon: true, @@ -132,13 +132,13 @@ async function handleDoSubmit() { try { const formValue = (await validate()) as Recordable; - const savePointed = formValue.startSavePointed; - const savePointPath = savePointed ? formValue['startSavePoint'] : null; + const restoreOrTriggerSavepoint = formValue.restoreSavepoint; + const savepointPath = restoreOrTriggerSavepoint ? formValue['savepointPath'] : null; handleReset(); const { data } = await fetchStart({ id: receiveData.application.id, - savePointed, - savePoint: savePointPath, + restoreOrTriggerSavepoint, + savepointPath: savepointPath, allowNonRestored: formValue.allowNonRestoredState || false, }); if (data.data) { diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue index 58e68959b1..f3d5fc90e1 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue @@ -46,7 +46,7 @@ labelWidth: 120, schemas: [ { - field: 'stopSavePointed', + field: 'triggerSavepoint', label: t('flink.app.operation.triggerSavePoint'), component: 'Switch', componentProps: { @@ -65,7 +65,7 @@ placeholder: t('flink.app.operation.customSavepoint'), allowClear: true, }, - ifShow: ({ values }) => !!values.stopSavePointed, + ifShow: ({ values }) => !!values.triggerSavepoint, }, { field: 'drain', @@ -76,7 +76,7 @@ unCheckedChildren: 'OFF', }, defaultValue: false, - ifShow: ({ values }) => !!values.stopSavePointed, + ifShow: ({ values }) => !!values.triggerSavepoint, afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.operation.enableDrain')), }, ], @@ -90,18 +90,18 @@ /* submit */ async function handleSubmit() { try { - const { stopSavePointed, customSavepoint, drain } = (await validate()) as Recordable; + const { triggerSavepoint, customSavepoint, drain } = (await validate()) as Recordable; const stopReq = { id: app.id, - savePointed: stopSavePointed, - savePoint: customSavepoint, + restoreOrTriggerSavepoint: triggerSavepoint, + savepointPath: customSavepoint, drain: drain, }; - if (stopSavePointed) { + if (triggerSavepoint) { if (customSavepoint) { const { data } = await fetchCheckSavepointPath({ - savePoint: customSavepoint, + savepointPath: customSavepoint, }); if (data.data === false) { await createErrorSwal(t('flink.app.operation.invalidSavePoint') + data.message); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx index 7e0f11897a..1ca2a62f87 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx @@ -103,7 +103,7 @@ export const useSavepoint = (updateOption: Fn) => { if (unref(customSavepoint)) { submitLoading.value = true; const { data } = await fetchCheckSavepointPath({ - savePoint: unref(customSavepoint), + savepointPath: unref(customSavepoint), }); if (data.data === false) { createErrorSwal('custom savepoint path is invalid, ' + data.message); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java index 57f1057956..85383465df 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java @@ -180,7 +180,7 @@ public class StartJobForm { PageFactory.initElements(driver, this); } - @FindBy(xpath = "//button[@id='startApplicationModal_startSavePointed']//span[contains(text(), 'ON')]") + @FindBy(xpath = "//button[@id='startApplicationModal_restoreSavepoint']//span[contains(text(), 'ON')]") private WebElement radioFromSavepoint; @FindBy(xpath = "//div[contains(.,'Start Job')]//button[contains(@class, 'ant-btn')]//span[contains(., 'Apply')]") diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala index 20f11bd462..668a3af89a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala @@ -17,4 +17,4 @@ package org.apache.streampark.flink.client.bean -case class CancelResponse(savePointDir: String) +case class CancelResponse(savepointDir: String) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala index ca1b4b29a0..6127f11e2d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala @@ -18,4 +18,4 @@ package org.apache.streampark.flink.client.bean /** Result class of trigger savepoint presents savepoint path. */ -case class SavepointResponse(savePointDir: String) +case class SavepointResponse(savepointDir: String)