Skip to content

Commit

Permalink
[core] fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Jan 13, 2025
1 parent 005416a commit c410748
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.rest.HttpClient;
import org.apache.paimon.rest.HttpClientOptions;
import org.apache.paimon.rest.RESTClient;
import org.apache.paimon.rest.RESTRequest;
import org.apache.paimon.rest.RESTResponse;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
Expand All @@ -33,23 +28,46 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializationFeature;

import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.SynchronousQueue;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
import static okhttp3.ConnectionSpec.MODERN_TLS;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** Report partition submission information to remote http server. */
public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {

private final RESTClient client;
private final OkHttpClient client;
private final String url;
private final ObjectMapper mapper;
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

private final FileStoreTable fileStoreTable;

private final String params;

private static final String RESPONSE_SUCCESS = "SUCCESS";

private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD";

public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) {

Preconditions.checkArgument(
Expand All @@ -60,27 +78,34 @@ public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions optio

this.fileStoreTable = fileStoreTable;
this.params = options.httpReportMarkDoneActionParams();

HttpClientOptions httpClientOptions =
new HttpClientOptions(
options.httpReportMarkDoneActionUrl(),
options.httpReportMarkDoneActionTimeout(),
options.httpReportMarkDoneActionTimeout(),
1);
this.client = new HttpClient(httpClientOptions);
this.url = options.httpReportMarkDoneActionUrl();
this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(
new Dispatcher(
createCachedThreadPool(
1, THREAD_NAME, new SynchronousQueue<>())))
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
.connectTimeout(options.httpReportMarkDoneActionTimeout())
.readTimeout(options.httpReportMarkDoneActionTimeout());

this.client = builder.build();
}

@Override
public void markDone(String partition) throws Exception {
HttpReportMarkDoneResponse response =
client.post(
null,
post(
new HttpReportMarkDoneRequest(
params,
fileStoreTable.fullName(),
fileStoreTable.location().toString(),
partition),
HttpReportMarkDoneResponse.class,
Collections.emptyMap());
Preconditions.checkState(
reportIsSuccess(response),
Expand All @@ -96,7 +121,8 @@ private boolean reportIsSuccess(HttpReportMarkDoneResponse response) {
@Override
public void close() throws IOException {
try {
this.client.close();
client.dispatcher().cancelAll();
client.connectionPool().evictAll();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -105,7 +131,7 @@ public void close() throws IOException {
/** RestRequest only for HttpReportMarkDoneAction. */
@JsonIgnoreProperties(ignoreUnknown = true)
@VisibleForTesting
public static class HttpReportMarkDoneRequest implements RESTRequest {
public static class HttpReportMarkDoneRequest {

private static final String MARK_DONE_PARTITION = "partition";
private static final String TABLE = "table";
Expand Down Expand Up @@ -160,7 +186,7 @@ public String getParams() {
/** Response only for HttpReportMarkDoneAction. */
@JsonIgnoreProperties(ignoreUnknown = true)
@VisibleForTesting
public static class HttpReportMarkDoneResponse implements RESTResponse {
public static class HttpReportMarkDoneResponse {
private static final String RESULT = "result";

@JsonProperty(RESULT)
Expand All @@ -175,4 +201,30 @@ public String getResult() {
return result;
}
}

public HttpReportMarkDoneResponse post(
HttpReportMarkDoneRequest body, Map<String, String> headers) throws IOException {
RequestBody requestBody = RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE);
Request request =
new Request.Builder()
.url(url)
.post(requestBody)
.headers(Headers.of(headers))
.build();
try (Response response = client.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful() || StringUtils.isNullOrWhitespaceOnly(responseBodyStr)) {
throw new HttpReportMarkDoneException(
response.isSuccessful()
? "ResponseBody is null or empty."
: String.format(
"Response is not successful, response is %s", response));
}
return mapper.readValue(responseBodyStr, HttpReportMarkDoneResponse.class);
} catch (HttpReportMarkDoneException e) {
throw e;
} catch (Exception e) {
throw new HttpReportMarkDoneException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.partition.actions;

/** Exception for {@link HttpReportMarkDoneAction}. */
public class HttpReportMarkDoneException extends RuntimeException {

private static final long serialVersionUID = 1L;

public HttpReportMarkDoneException(Throwable e) {
super("Http request exception.", e);
}

public HttpReportMarkDoneException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class HttpClient implements RESTClient {

private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create();
private static final String THREAD_NAME = "PAIMON-HTTP-CLIENT-THREAD-POOL";
private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

private final OkHttpClient okHttpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ public String createResponseBody(RESTResponse response) throws JsonProcessingExc
return objectMapper.writeValueAsString(response);
}

public <T extends RESTRequest> T readRequestBody(String body, Class<T> requestType)
throws JsonProcessingException {
public <T> T readRequestBody(String body, Class<T> requestType) throws JsonProcessingException {
return objectMapper.readValue(body, requestType);
}

public <T extends RESTResponse> T readResponseBody(String body, Class<T> responseType)
public <T> T readResponseBody(String body, Class<T> responseType)
throws JsonProcessingException {
return objectMapper.readValue(body, responseType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker)

FileStoreTable table = prepareTable(hasPk, options);

HttpReportMarkDoneAction.HttpReportMarkDoneResponse expectResponse =
new HttpReportMarkDoneAction.HttpReportMarkDoneResponse("success");
String expectResponse = "{\"result\":\"success\"}";
server.enqueueResponse(expectResponse, 200);
server.enqueueResponse(expectResponse, 200);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.HttpReportMarkDoneAction;
import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneRequest;
import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneResponse;
import org.apache.paimon.partition.actions.HttpReportMarkDoneException;
import org.apache.paimon.rest.TestHttpWebServer;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
Expand Down Expand Up @@ -62,6 +61,8 @@ public class HttpReportMarkDoneActionTest {

private static final String partition = "partition";
private static String params = "key1=value1,key2=value2";
private static final String successResponse = "{\"result\":\"success\"}";
private static final String failedResponse = "{\"result\":\"failed\"}";
private static FileStoreTable fileStoreTable;
@Rule public TemporaryFolder folder = new TemporaryFolder();

Expand All @@ -81,8 +82,7 @@ public void stopServer() throws Exception {
public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {
HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction();

HttpReportMarkDoneResponse expectedResponse = new HttpReportMarkDoneResponse("success");
server.enqueueResponse(expectedResponse, 200);
server.enqueueResponse(successResponse, 200);

httpReportMarkDoneAction.markDone(partition);
RecordedRequest request = server.takeRequest(10, TimeUnit.SECONDS);
Expand All @@ -98,8 +98,7 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {
// test params is null.
params = null;
HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction();
HttpReportMarkDoneResponse expectedResponse3 = new HttpReportMarkDoneResponse("success");
server.enqueueResponse(expectedResponse3, 200);
server.enqueueResponse(successResponse, 200);
httpReportMarkDoneAction3.markDone(partition);
RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS);
assertRequest(request3);
Expand All @@ -110,7 +109,6 @@ public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction();

// status failed.
HttpReportMarkDoneResponse failedResponse = new HttpReportMarkDoneResponse("failed");
server.enqueueResponse(failedResponse, 200);
Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition))
.isInstanceOf(IllegalStateException.class)
Expand All @@ -125,10 +123,17 @@ public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
.hasMessageContaining(
"The http-report action's response attribute `result` should be 'SUCCESS' but is 'null'.");

// empty response.
server.enqueueResponse("", 200);
Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition))
.isInstanceOf(HttpReportMarkDoneException.class)
.hasMessageContaining("ResponseBody is null or empty.");

// 400.
server.enqueueResponse("", 400);
server.enqueueResponse(successResponse, 400);
Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition))
.isInstanceOf(BadRequestException.class);
.isInstanceOf(HttpReportMarkDoneException.class)
.hasMessageContaining("Response is not successful");
}

public static void assertRequest(RecordedRequest recordedRequest)
Expand Down

0 comments on commit c410748

Please sign in to comment.