Skip to content

Commit

Permalink
Merge branch 'apache:dev' into Feature-15953
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxiaolog authored Jun 13, 2024
2 parents 8f5aa06 + 8e53f2f commit 6b5464f
Show file tree
Hide file tree
Showing 29 changed files with 536 additions and 89 deletions.
5 changes: 5 additions & 0 deletions docs/docs/en/guide/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
- stop: the number of stopped workflow instances
- failover: the number of workflow instance fail-overs

### RPC Related Metrics

- ds.rpc.client.sync.request.exception.count: (counter) the number of exceptions occurred in sync rpc requests
- ds.rpc.client.sync.request.duration.time: (histogram) the time cost of sync rpc requests

### Master Server Metrics

- ds.master.overload.count: (counter) the number of times the master overloaded
Expand Down
5 changes: 5 additions & 0 deletions docs/docs/zh/guide/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: `
- stop:停止的工作流实例数量
- failover:容错的工作流实例数量

### RPC相关指标

- ds.rpc.client.sync.request.exception.count: (counter) 同步rpc请求异常数
- ds.rpc.client.sync.request.duration.time: (histogram) 同步rpc请求耗时

### Master Server指标

- ds.master.overload.count: (counter) master过载次数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.alibaba.druid.sql.parser.SQLParserUtils;
import com.google.auto.service.AutoService;
Expand Down Expand Up @@ -128,8 +129,10 @@ public DataSourceProcessor create() {

@Override
public List<String> splitAndRemoveComment(String sql) {
String cleanSQL = SQLParserUtils.removeComment(sql, com.alibaba.druid.DbType.sqlserver);
return SQLParserUtils.split(cleanSQL, com.alibaba.druid.DbType.sqlserver);
return SQLParserUtils.splitAndRemoveComment(sql, com.alibaba.druid.DbType.sqlserver)
.stream()
.map(subSql -> subSql.concat(";"))
.collect(Collectors.toList());
}

private String transformOther(Map<String, String> otherMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.dolphinscheduler.plugin.datasource.sqlserver.param;

import static com.google.common.truth.Truth.assertThat;

import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -95,4 +98,32 @@ public void testGetValidationQuery() {
Assertions.assertEquals(DataSourceConstants.SQLSERVER_VALIDATION_QUERY,
sqlServerDatasourceProcessor.getValidationQuery());
}

@Test
void splitAndRemoveComment_singleSelect() {
String sql = "select * from table;";
List<String> subSqls = sqlServerDatasourceProcessor.splitAndRemoveComment(sql);
assertThat(subSqls).hasSize(1);
assertThat(subSqls.get(0)).isEqualTo("select * from table;");
}

@Test
void splitAndRemoveComment_singleMerge() {
String sql = "MERGE\n" +
" [ TOP ( expression ) [ PERCENT ] ]\n" +
" [ INTO ] <target_table> [ WITH ( <merge_hint> ) ] [ [ AS ] table_alias ]\n" +
" USING <table_source> [ [ AS ] table_alias ]\n" +
" ON <merge_search_condition>\n" +
" [ WHEN MATCHED [ AND <clause_search_condition> ]\n" +
" THEN <merge_matched> ] [ ...n ]\n" +
" [ WHEN NOT MATCHED [ BY TARGET ] [ AND <clause_search_condition> ]\n" +
" THEN <merge_not_matched> ]\n" +
" [ WHEN NOT MATCHED BY SOURCE [ AND <clause_search_condition> ]\n" +
" THEN <merge_matched> ] [ ...n ]\n" +
" [ <output_clause> ]\n" +
" [ OPTION ( <query_hint> [ ,...n ] ) ];";
List<String> subSqls = sqlServerDatasourceProcessor.splitAndRemoveComment(sql);
assertThat(subSqls).hasSize(1);
assertThat(subSqls.get(0)).isEqualTo(sql);
}
}
24 changes: 24 additions & 0 deletions dolphinscheduler-e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,27 @@ class TenantE2ETest {
- For UI tests, it's common that the pages might need some time to load, or the operations might need some time to
complete, we can use `await().untilAsserted(() -> {})` to wait for the assertions.

## Local development

### Mac M1
Add VM options to the test configuration in IntelliJ IDEA:
```
# In this mode you need to install docker desktop for mac and run it with locally
-Dm1_chip=true
```

### Running locally(without Docker)
```
# In this mode you need to start frontend and backend services locally
-Dlocal=true
```

### Running locally(with Docker)
```
# In this mode you only need to install docker locally
```

- To run the tests locally, you need to have the DolphinScheduler running locally. You should add `dolphinscheduler-e2e/pom.xml` to the maven project
Since it does not participate in project compilation, it is not in the main project.
- Running run test class `org.apache.dolphinscheduler.e2e.cases.UserE2ETest` in the IDE. After execution, the test video will be saved as mp4 in a local temporary directory. Such as
`/var/folders/hf/123/T/record-3123/PASSED-[engine:junit-jupiter]/[class:org.apache.dolphinscheduler.e2e.cases.UserE2ETest]-20240606-152333.mp4`
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,16 @@ private void runInDockerContainer(ExtensionContext context) {
private void setBrowserContainerByOsName() {
DockerImageName imageName;

if (LOCAL_MODE && M1_CHIP_FLAG) {
imageName = DockerImageName.parse("seleniarm/standalone-chromium:4.1.2-20220227")
if (M1_CHIP_FLAG) {
imageName = DockerImageName.parse("seleniarm/standalone-chromium:124.0-chromedriver-124.0")
.asCompatibleSubstituteFor("selenium/standalone-chrome");

browser = new BrowserWebDriverContainer<>(imageName)
.withCapabilities(new ChromeOptions())
.withCreateContainerCmdModifier(cmd -> cmd.withUser("root"))
.withFileSystemBind(Constants.HOST_CHROME_DOWNLOAD_PATH.toFile().getAbsolutePath(),
Constants.SELENIUM_CONTAINER_CHROME_DOWNLOAD_PATH)
.withRecordingMode(RECORD_ALL, record.toFile(), MP4)
.withStartupTimeout(Duration.ofSeconds(300));
} else {
browser = new BrowserWebDriverContainer<>()
Expand Down Expand Up @@ -203,7 +204,7 @@ private ComposeContainer createDockerCompose(ExtensionContext context) {
.map(URL::getPath)
.map(File::new)
.collect(Collectors.toList());

ComposeContainer compose = new ComposeContainer(files)
.withPull(true)
.withTailChildContainers(true)
Expand All @@ -213,7 +214,7 @@ private ComposeContainer createDockerCompose(ExtensionContext context) {
DOCKER_PORT, Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(300)))
.withLogConsumer(serviceName, outputFrame -> LOGGER.info(outputFrame.getUtf8String()))
.waitingFor(serviceName, Wait.forHealthcheck().withStartupTimeout(Duration.ofSeconds(300)));


return compose;
}
Expand Down
4 changes: 2 additions & 2 deletions dolphinscheduler-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<junit.version>5.8.1</junit.version>
<selenium.version>4.6.0</selenium.version>
<selenium.version>4.13.0</selenium.version>
<lombok.version>1.18.20</lombok.version>
<assertj-core.version>3.20.2</assertj-core.version>
<kotlin.version>1.5.30</kotlin.version>
Expand Down Expand Up @@ -119,7 +119,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.19.3</version>
<version>1.19.8</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@
@Documented
public @interface RpcMethod {

long timeout() default 3000L;
long timeout() default -1;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base;

import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.utils.Host;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SyncRequestDto {

private Host serverHost;
private Transporter transporter;
private long timeoutMillis;

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
.writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
if (log.isDebugEnabled()) {
log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(ctx.channel()));
log.info("Client send heartbeat to: {}", ctx.channel().remoteAddress());
}
} else {
super.userEventTriggered(ctx, evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
import org.apache.dolphinscheduler.extract.base.metrics.RpcMetrics;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

Expand Down Expand Up @@ -97,8 +100,8 @@ public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("client-idle-handler",
new IdleStateHandler(
Constants.NETTY_CLIENT_HEART_BEAT_TIME,
0,
clientConfig.getHeartBeatIntervalMillis(),
0,
TimeUnit.MILLISECONDS))
.addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder());
Expand All @@ -107,38 +110,60 @@ public void initChannel(SocketChannel ch) {
isStarted.compareAndSet(false, true);
}

public IRpcResponse sendSync(final Host host,
final Transporter transporter,
final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws RemotingException {
long start = System.currentTimeMillis();

final Host host = syncRequestDto.getServerHost();
final Transporter transporter = syncRequestDto.getTransporter();
final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? clientConfig.getConnectTimeoutMillis()
: syncRequestDto.getTimeoutMillis();
final long opaque = transporter.getHeader().getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);

try {
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
});
/*
* sync wait for result
*/
IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
});
/*
* sync wait for result
*/
IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
return iRpcResponse;
} catch (Exception ex) {
ClientSyncExceptionMetrics clientSyncExceptionMetrics = ClientSyncExceptionMetrics
.of(syncRequestDto)
.withThrowable(ex);
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
if (ex instanceof RemotingException) {
throw (RemotingException) ex;
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
throw new RemotingException(ex);
}
} finally {
ClientSyncDurationMetrics clientSyncDurationMetrics = ClientSyncDurationMetrics
.of(syncRequestDto)
.withMilliseconds(System.currentTimeMillis() - start);
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
}
return iRpcResponse;
}

Channel getOrCreateChannel(Host host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
Expand All @@ -41,8 +42,12 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
transporter.setHeader(TransporterHeader.of(methodIdentifier));

IRpcResponse iRpcResponse =
nettyRemotingClient.sendSync(serverHost, transporter, sync.timeout());
SyncRequestDto syncRequestDto = SyncRequestDto.builder()
.timeoutMillis(sync.timeout())
.transporter(transporter)
.serverHost(serverHost)
.build();
IRpcResponse iRpcResponse = nettyRemotingClient.sendSync(syncRequestDto);
if (!iRpcResponse.isSuccess()) {
throw MethodInvocationException.of(iRpcResponse.getMessage());
}
Expand Down
Loading

0 comments on commit 6b5464f

Please sign in to comment.