Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-16992][dolphinscheduler-task-seatunnel] Implement added seatunnel configuration generation in the form mode #16993

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,35 @@ private Constants() {
public static final String JSON_SUFFIX = "json";
public static final String CONF_SUFFIX = "conf";

public static final String EQUAL_SIGN = " = ";
public static final String LINE_BREAK = "\n";
public static final String INDENT_TWO_SPACE = " ";
public static final String INDENT_FOUR_SPACE = " ";
public static final String DOUBLE_QUOTE = "\"";
public static final String SINGLE_BRACKETS_RIGHT = "}";
public static final String SINGLE_BRACKETS_LEFT = "{";

/**
* doris config
*/
public static final String DORIS_CONFIG = "doris.config";
public static final String FORMAT_JSON = "json";
public static final String READ_JSON_BY_LINE_VALUE = "true";
public static final String DORIS_CONFIG_SINK_TEMPLATE = " doris.config {\n" +
" format = \"%s\"" + "\n" +
" read_json_by_line = \"%s\"" + "\n" +
" }\n";
public static final String DORIS_HTTP_PORT = "8030";

/**
* mysql config
*/
public static final String MYSQL_EXTRA_SINK_PARAMS =
" database = \"%s\"\n table = \"%s\"\n generate_sink_sql = \"%s\"\n";
public static final String MYSQL_QUERY_PARAMS = " query = \"%s\"\n";
public static final String MYSQL_DEFAULT_QUERY_PREFIX = "select * from ";

public static final String MYSQL = "MYSQL";
public static final String HDFS = "HDFS";
public static final String DORIS = "DORIS";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.seatunnel;

import lombok.Getter;

@Getter
public enum JobModeEnum {

/**
* BATCH job mode in seatunnel env
*/
BATCH("BATCH"),
/**
* STREAMING job mode in seatunnel env
*/
STREAMING("STREAMING");

private final String name;

JobModeEnum(String jobMode) {
this.name = jobMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,21 @@

package org.apache.dolphinscheduler.plugin.task.seatunnel;

import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.DORIS;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.HDFS;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.MYSQL;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.seatunnel.parameter.CommonConfigParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.parameter.DorisParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.parameter.HdfsFileParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.parameter.MysqlParameters;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -36,10 +47,59 @@
@NoArgsConstructor
public class SeatunnelParameters extends AbstractParameters {

/**
* source type
*/
private String sourceType;

/**
* target type
*/
private String targetType;

/**
* source config parameters
*/
private String sourceConfig;

/**
* sink config parameters
*/
private String targetConfig;

/**
* task parallelism
*/
private int parallelism;

/**
* job.mode type in seatunnel env
*/
private JobModeEnum jobMode;

/**
* Enable custom data filtering
*/
private boolean customDataFilter;

/**
* Custom seatunnel transform config when customDataFilter set true
*/
private String customTransform;

/**
* startup script
*/
private String startupScript;

private Boolean useCustom;
/**
* Whether to use user-defined configuration
*/
private boolean useCustom;

/**
* raw script
*/
private String rawScript;

/**
Expand All @@ -51,12 +111,103 @@ public class SeatunnelParameters extends AbstractParameters {
public boolean checkParameters() {
return Objects.nonNull(startupScript)
&& ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
|| (BooleanUtils.isFalse(useCustom) && CollectionUtils.isNotEmpty(resourceList)
&& resourceList.size() == 1));
|| (BooleanUtils.isFalse(useCustom) && sourceConfig != null && targetConfig != null));
}

@Override
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}

@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();

if (this.isUseCustom()) {
return resources;
}

if (StringUtils.isNotEmpty(sourceConfig)) {
CommonConfigParameters sourceParameters =
(CommonConfigParameters) JSONUtils.parseObject(this.getSourceConfig(),
getSourceParameter(this.getSourceType()));
int sourceDatabaseId = sourceParameters.getDatabaseId();
if (sourceDatabaseId != 0) {
resources.put(ResourceType.DATASOURCE, sourceDatabaseId);
}
}

if (StringUtils.isNotEmpty(targetConfig)) {
CommonConfigParameters sinkParameters =
(CommonConfigParameters) JSONUtils.parseObject(this.getTargetConfig(),
getTargetParameter(this.getTargetType()));
int sinkDatabaseId = sinkParameters.getDatabaseId();
if (sinkDatabaseId != 0) {
resources.put(ResourceType.DATASOURCE, sinkDatabaseId);
}
}

return resources;
}

public SeatunnelTaskExecutionContext generateExtendedContext(ResourceParametersHelper resourceParametersHelper) {

SeatunnelTaskExecutionContext seatunnelTaskExecutionContext = new SeatunnelTaskExecutionContext();

if (this.isUseCustom()) {
return seatunnelTaskExecutionContext;
}

CommonConfigParameters sourceParameter = (CommonConfigParameters) JSONUtils.parseObject(this.getSourceConfig(),
getSourceParameter(this.getSourceType()));

CommonConfigParameters sinkParameter = (CommonConfigParameters) JSONUtils.parseObject(this.getTargetConfig(),
getTargetParameter(this.getTargetType()));

DataSourceParameters dataSource = (DataSourceParameters) resourceParametersHelper
.getResourceParameters(ResourceType.DATASOURCE, sourceParameter.getDatabaseId());

DataSourceParameters dataSink = (DataSourceParameters) resourceParametersHelper
.getResourceParameters(ResourceType.DATASOURCE, sinkParameter.getDatabaseId());

if (Objects.nonNull(dataSource)) {
seatunnelTaskExecutionContext.setDataSourceId(sourceParameter.getDatabaseId());
seatunnelTaskExecutionContext.setDataSourceType(dataSource.getType());
seatunnelTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}

if (Objects.nonNull(dataSink)) {
seatunnelTaskExecutionContext.setDataTargetId(sinkParameter.getDatabaseId());
seatunnelTaskExecutionContext.setDataTargetType(dataSink.getType());
seatunnelTaskExecutionContext.setTargetConnectionParams(dataSink.getConnectionParams());
}

return seatunnelTaskExecutionContext;
}

private Class<?> getSourceParameter(String sourceType) {
switch (sourceType) {
case MYSQL:
return MysqlParameters.class;
case HDFS:
return HdfsFileParameters.class;
case DORIS:
return DorisParameters.class;
default:
return null;
}
}

private Class<?> getTargetParameter(String sinkType) {
switch (sinkType) {
case MYSQL:
return MysqlParameters.class;
case HDFS:
return HdfsFileParameters.class;
case DORIS:
return DorisParameters.class;
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.seatunnel.generator.SeatunnelConfigGenerator;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -72,6 +73,11 @@ public class SeatunnelTask extends AbstractRemoteTask {
*/
protected final TaskExecutionContext taskExecutionContext;

/**
* seatunnelTaskExecutionContext
*/
protected SeatunnelTaskExecutionContext seatunnelTaskExecutionContext;

/**
* constructor
*
Expand All @@ -95,6 +101,9 @@ public void init() {
if (seatunnelParameters == null || !seatunnelParameters.checkParameters()) {
throw new TaskException("SeaTunnel task params is not valid");
}

seatunnelTaskExecutionContext =
seatunnelParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}

// todo split handle to submit and track
Expand Down Expand Up @@ -159,14 +168,22 @@ protected List<String> buildOptions() throws Exception {
List<String> args = new ArrayList<>();
args.add(CONFIG_OPTIONS);
String scriptContent;
if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
scriptContent = buildCustomConfigContent();
if (BooleanUtils.isTrue(seatunnelParameters.isUseCustom())) {
if (null != seatunnelParameters.getResourceList() && !seatunnelParameters.getResourceList().isEmpty()) {
// use resource file
String resourceFileName = seatunnelParameters.getResourceList().get(0).getResourceName();
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
scriptContent = FileUtils.readFileToString(
new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
StandardCharsets.UTF_8);
} else {
// use custom config
scriptContent = buildCustomConfigContent();
}
} else {
String resourceFileName = seatunnelParameters.getResourceList().get(0).getResourceName();
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
scriptContent = FileUtils.readFileToString(
new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
StandardCharsets.UTF_8);
// use generator config
scriptContent =
SeatunnelConfigGenerator.generateSeatunnelJob(seatunnelParameters, seatunnelTaskExecutionContext);
}
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(scriptContent, filePath);
Expand Down
Loading
Loading