Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add a http-report action to reporting partition done to remote servers. #4862

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions docs/content/flink/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,13 @@ CREATE TABLE my_partitioned_table (
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m',
'partition.mark-done-action'='done-partition'
-- You can also customize a PartitionMarkDoneAction to mark the partition completed.
-- 'partition.mark-done-action'='done-partition,custom',
-- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction'
);
```

You can also customize a PartitionMarkDoneAction to mark the partition completed.
- partition.mark-done-action: custom
- partition.mark-done-action.custom.class: The partition mark done class for implement PartitionMarkDoneAction interface (e.g. org.apache.paimon.CustomPartitionMarkDoneAction).

Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface.
```java
package org.apache.paimon;
Expand All @@ -282,6 +284,28 @@ public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {
}
```

Paimon also support http-report partition mark done action, this action will report the partition to the remote http server.
- partition.mark-done-action: http-report
- partition.mark-done-action.http.url : Action will report the partition to the remote http server.
- partition.mark-done-action.http.timeout : Http client connection timeout and default is 5s.
- partition.mark-done-action.http.params : Http client request params in the request body json.

Http Post request body :
```json
{
"table": "table fullName",
"path": "table location path",
"partition": "mark done partition",
"params" : "custom params"
}
```
Http Response body :
```json
{
"result": "success"
}
```

1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to
determine when the partition can be properly marked done.
2. Secondly, you need to define idle-time, which determines how long it takes for the partition to have no new data,
Expand Down
20 changes: 19 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -663,14 +663,32 @@
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
<td>String</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': mark partition event to metastore.<br />4. 'custom': use policy class to create a mark-partition policy.<br />Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': mark partition event to metastore.<br />4. 'http-report': report partition mark done to remote http server.<br />5. 'custom': use policy class to create a mark-partition policy.<br />Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action.custom.class</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action.http.params</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action.http.timeout</h5></td>
<td style="word-wrap: break-word;">5 s</td>
<td>Duration</td>
<td>Http client connection timeout, this can only be used by http-report partition mark done action.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action.http.url</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action.</td>
</tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
65 changes: 64 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -1220,7 +1221,10 @@ public class CoreOptions implements Serializable {
.text("3. 'mark-event': mark partition event to metastore.")
.linebreak()
.text(
"4. 'custom': use policy class to create a mark-partition policy.")
"4. 'http-report': report partition mark done to remote http server.")
.linebreak()
.text(
"5. 'custom': use policy class to create a mark-partition policy.")
.linebreak()
.text(
"Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.")
Expand All @@ -1234,6 +1238,27 @@ public class CoreOptions implements Serializable {
"The partition mark done class for implement"
+ " PartitionMarkDoneAction interface. Only work in custom mark-done-action.");

public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION_URL =
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
key("partition.mark-done-action.http.url")
.stringType()
.noDefaultValue()
.withDescription(
"Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action.");

public static final ConfigOption<Duration> PARTITION_MARK_DONE_ACTION_TIMEOUT =
key("partition.mark-done-action.http.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(5))
.withDescription(
"Http client connection timeout, this can only be used by http-report partition mark done action.");

public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION_PARAMS =
key("partition.mark-done-action.http.params")
.stringType()
.noDefaultValue()
.withDescription(
"Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action.");

public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
Expand Down Expand Up @@ -2262,10 +2287,28 @@ public String partitionTimestampPattern() {
return options.get(PARTITION_TIMESTAMP_PATTERN);
}

public String httpReportMarkDoneActionUrl() {
return options.get(PARTITION_MARK_DONE_ACTION_URL);
}

public Duration httpReportMarkDoneActionTimeout() {
return options.get(PARTITION_MARK_DONE_ACTION_TIMEOUT);
}

public String httpReportMarkDoneActionParams() {
return options.get(PARTITION_MARK_DONE_ACTION_PARAMS);
}

public String partitionMarkDoneCustomClass() {
return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS);
}

public Set<PartitionMarkDoneAction> partitionMarkDoneActions() {
return Arrays.stream(options.get(PARTITION_MARK_DONE_ACTION).split(","))
.map(x -> PartitionMarkDoneAction.valueOf(x.replace('-', '_').toUpperCase()))
.collect(Collectors.toCollection(HashSet::new));
}

public String consumerId() {
String consumerId = options.get(CONSUMER_ID);
if (consumerId != null && consumerId.isEmpty()) {
Expand Down Expand Up @@ -3163,4 +3206,24 @@ public enum MaterializedTableRefreshStatus {
ACTIVATED,
SUSPENDED
}

/** Partition mark done actions. */
public enum PartitionMarkDoneAction {
SUCCESS_FILE("success-file"),
DONE_PARTITION("done-partition"),
MARK_EVENT("mark-event"),
HTTP_REPORT("http-report"),
CUSTOM("custom");

private final String value;

PartitionMarkDoneAction(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.partition.actions;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.rest.HttpClient;
import org.apache.paimon.rest.HttpClientOptions;
import org.apache.paimon.rest.RESTClient;
import org.apache.paimon.rest.RESTRequest;
import org.apache.paimon.rest.RESTResponse;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.io.IOException;
import java.util.Collections;

import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL;

/** Report partition submission information to remote http server. */
public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {

private final RESTClient client;

private final FileStoreTable fileStoreTable;

private final String params;

private static final String RESPONSE_SUCCESS = "SUCCESS";

public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) {

Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()),
String.format(
"Parameter %s must be non-empty when you use `http-report` partition mark done action.",
PARTITION_MARK_DONE_ACTION_URL.key()));

this.fileStoreTable = fileStoreTable;
this.params = options.httpReportMarkDoneActionParams();
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved

HttpClientOptions httpClientOptions =
new HttpClientOptions(
options.httpReportMarkDoneActionUrl(),
options.httpReportMarkDoneActionTimeout(),
options.httpReportMarkDoneActionTimeout(),
1);
this.client = new HttpClient(httpClientOptions);
}

@Override
public void markDone(String partition) throws Exception {
HttpReportMarkDoneResponse response =
client.post(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to introduce a new client for HttpReportMarkDoneAction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I implemented a lightweight httpclient in class HttpReportMarkDoneAction .

null,
new HttpReportMarkDoneRequest(
params,
fileStoreTable.fullName(),
fileStoreTable.location().toString(),
partition),
HttpReportMarkDoneResponse.class,
Collections.emptyMap());
Preconditions.checkState(
reportIsSuccess(response),
String.format(
"The http-report action's response attribute `result` should be 'SUCCESS' but is '%s'.",
response.getResult()));
}

private boolean reportIsSuccess(HttpReportMarkDoneResponse response) {
return response != null && RESPONSE_SUCCESS.equalsIgnoreCase(response.getResult());
}

@Override
public void close() throws IOException {
try {
this.client.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** RestRequest only for HttpReportMarkDoneAction. */
@JsonIgnoreProperties(ignoreUnknown = true)
@VisibleForTesting
public static class HttpReportMarkDoneRequest implements RESTRequest {

private static final String MARK_DONE_PARTITION = "partition";
private static final String TABLE = "table";
private static final String PATH = "path";
private static final String PARAMS = "params";

@JsonProperty(MARK_DONE_PARTITION)
private final String partition;

@JsonProperty(TABLE)
private final String table;

@JsonProperty(PATH)
private final String path;
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved

@JsonProperty(PARAMS)
private final String params;

@JsonCreator
public HttpReportMarkDoneRequest(
@JsonProperty(PARAMS) String params,
@JsonProperty(TABLE) String table,
@JsonProperty(PATH) String path,
@JsonProperty(MARK_DONE_PARTITION) String partition) {
this.params = params;
this.table = table;
this.path = path;
this.partition = partition;
}

@JsonGetter(MARK_DONE_PARTITION)
public String getPartition() {
return partition;
}

@JsonGetter(TABLE)
public String getTable() {
return table;
}

@JsonGetter(PATH)
public String getPath() {
return path;
}

@JsonGetter(PARAMS)
public String getParams() {
return params;
}
}

/** Response only for HttpReportMarkDoneAction. */
@JsonIgnoreProperties(ignoreUnknown = true)
@VisibleForTesting
public static class HttpReportMarkDoneResponse implements RESTResponse {
private static final String RESULT = "result";

@JsonProperty(RESULT)
private final String result;

public HttpReportMarkDoneResponse(@JsonProperty(RESULT) String result) {
this.result = result;
}

@JsonGetter(RESULT)
public String getResult() {
return result;
}
}
}
Loading
Loading