Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge Conflict Resolution (from 7.1.x to 7.2.x) #10652

Open
wants to merge 2 commits into
base: 7.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public KsqlTarget target(final URI server) {
final HttpClient client = isUriTls ? httpTlsClient : httpNonTlsClient;
return new KsqlTarget(client,
socketAddressFactory.apply(server.getPort(), server.getHost()), localProperties,
basicAuthHeader, server.getHost());
basicAuthHeader, server.getHost(), server.getPath());
}

public KsqlTarget targetHttp2(final URI server) {
Expand All @@ -133,7 +133,7 @@ public KsqlTarget targetHttp2(final URI server) {
() -> new IllegalStateException("Must provide http2 options to use targetHttp2"));
return new KsqlTarget(client,
socketAddressFactory.apply(server.getPort(), server.getHost()), localProperties,
basicAuthHeader, server.getHost());
basicAuthHeader, server.getHost(), server.getPath());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,12 @@ private static URI parseUri(final String serverAddress) {
try {
final URL url = new URL(serverAddress);
if (url.getPort() == -1) {
return new URL(serverAddress.concat(":") + url.getDefaultPort()).toURI();
return new URL(
url.getProtocol(),
url.getHost(),
url.getDefaultPort(),
url.getFile()
).toURI();
}
return url.toURI();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public final class KsqlTarget {
private final LocalProperties localProperties;
private final Optional<String> authHeader;
private final String host;
private final String subPath;

/**
* Create a KsqlTarget containing all of the connection information required to make a request
Expand All @@ -91,30 +92,33 @@ public final class KsqlTarget {
* @param localProperties Properties sent with ksql requests
* @param authHeader Optional auth headers
* @param host The hostname to use for the request, used to set the host header of the request
* @param subPath Optional path that can be provided with server name
*/
KsqlTarget(
final HttpClient httpClient,
final SocketAddress socketAddress,
final LocalProperties localProperties,
final Optional<String> authHeader,
final String host
final String host,
final String subPath
) {
this.httpClient = requireNonNull(httpClient, "httpClient");
this.socketAddress = requireNonNull(socketAddress, "socketAddress");
this.localProperties = requireNonNull(localProperties, "localProperties");
this.authHeader = requireNonNull(authHeader, "authHeader");
this.host = host;
this.subPath = subPath.replaceAll("/\\z", "");
}

public KsqlTarget authorizationHeader(final String authHeader) {
return new KsqlTarget(httpClient, socketAddress, localProperties,
Optional.of(authHeader), host);
Optional.of(authHeader), host, subPath);
}

public KsqlTarget properties(final Map<String, ?> properties) {
return new KsqlTarget(httpClient, socketAddress,
new LocalProperties(properties),
authHeader, host);
authHeader, host, subPath);
}

public RestResponse<ServerInfo> getServerInfo() {
Expand Down Expand Up @@ -448,7 +452,7 @@ private CompletableFuture<ResponseWithBody> execute(

final HttpClientRequest httpClientRequest = httpClient.request(httpMethod,
socketAddress, socketAddress.port(), host,
path,
subPath + path,
resp -> responseHandler.accept(resp, vcf))
.exceptionHandler(vcf::completeExceptionally);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class KsqlTargetTest {

private static final String HOST = "host";
private static final String QUERY = "SELECT * from RATINGS_TABLE;";
private static final String SUB_PATH = "";

@Mock
private HttpClient httpClient;
Expand Down Expand Up @@ -122,7 +123,7 @@ private void expectPostQueryRequestChunkHandler() {

@Test
public void shouldPostQueryRequest_chunkHandler() {
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);
assertThatEventually(requestStarted::get, is(true));

Expand All @@ -137,7 +138,7 @@ public void shouldPostQueryRequest_chunkHandler() {

@Test
public void shouldPostQueryRequest_chunkHandler_exception() {
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);

assertThatEventually(requestStarted::get, is(true));
Expand All @@ -151,7 +152,7 @@ public void shouldPostQueryRequest_chunkHandler_exception() {

@Test
public void shouldPostQueryRequest_chunkHandler_closeEarly() {
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);

assertThatEventually(requestStarted::get, is(true));
Expand All @@ -168,7 +169,7 @@ public void shouldPostQueryRequest_chunkHandler_closeEarly() {
@Test
public void shouldPostQueryRequest_chunkHandler_closeEarlyWithError() {
doThrow(new RuntimeException("Error!")).when(httpConnection).close();
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);

assertThatEventually(requestStarted::get, is(true));
Expand All @@ -184,7 +185,7 @@ public void shouldPostQueryRequest_chunkHandler_closeEarlyWithError() {

@Test
public void shouldPostQueryRequest_chunkHandler_closeAfterFinish() {
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);

assertThatEventually(requestStarted::get, is(true));
Expand All @@ -201,7 +202,7 @@ public void shouldPostQueryRequest_chunkHandler_closeAfterFinish() {

@Test
public void shouldPostQueryRequest_chunkHandler_partialMessage() {
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST);
ksqlTarget = new KsqlTarget(httpClient, socketAddress, localProperties, authHeader, HOST, SUB_PATH);
executor.submit(this::expectPostQueryRequestChunkHandler);

assertThatEventually(requestStarted::get, is(true));
Expand Down