Skip to content

Commit

Permalink
Fix headers provided by HeaderConverter interface to http connector. …
Browse files Browse the repository at this point in the history
…Fix issue null message when client is not reachable. Add tests
  • Loading branch information
oBJIADo committed Feb 3, 2025
1 parent 0e4573c commit 2526cb7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,8 @@ private boolean recordSent(ChangeEvent<Object, Object> record, UUID messageId) t
HttpRequest request = requestBuilder.build();

r = client.send(request, HttpResponse.BodyHandlers.ofString());
}
catch (IOException ioe) {
if (!ioe.getMessage().contains("GOAWAY")) {
} catch (IOException ioe) {
if (ioe.getMessage() == null || !ioe.getMessage().contains("GOAWAY")) {
throw new InterruptedException(ioe.toString());
}
LOGGER.info("HTTP/2 GOAWAY received: {}", ioe.getMessage());
Expand Down Expand Up @@ -244,4 +243,17 @@ HttpRequest.Builder generateRequest(ChangeEvent<Object, Object> record) {

return builder;
}

@VisibleForTesting
@Override
protected String getString(Object object) {
if (object instanceof String) {
return (String) object;
}
else if (object instanceof byte[]) {
return new String((byte[]) object);
}

throw new DebeziumException(unsupportedTypeMessage(object));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import io.debezium.DebeziumException;
import org.eclipse.microprofile.config.Config;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.Header;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class HttpChangeConsumerTest {

Expand Down Expand Up @@ -90,4 +96,30 @@ private Config generateMockConfig(Map<String, Object> config) {

return result;
}

@ParameterizedTest
@MethodSource("provideHeadersToConvert")
public void verifyGetStringPositive(String expected, Object bytesValue) {
HttpChangeConsumer changeConsumer = new HttpChangeConsumer();

String resultedString = changeConsumer.getString(bytesValue);
assertEquals(expected, resultedString);
}

private static Stream<Arguments> provideHeadersToConvert() {
return Stream.of(Arguments.of("value", "value".getBytes()),
Arguments.of("theValue", "theValue"),
Arguments.of("", new byte[0]));
}

@ParameterizedTest
@MethodSource("provideInvalidHeadersToConvert")
public void verifyGetStringNegative(Object bytesValue) {
HttpChangeConsumer changeConsumer = new HttpChangeConsumer();
Assertions.assertThrows(DebeziumException.class, () -> changeConsumer.getString(bytesValue));
}

private static Stream<Object> provideInvalidHeadersToConvert() {
return Stream.of(null, new Object(), new int[0]);
}
}

0 comments on commit 2526cb7

Please sign in to comment.