Skip to content

Commit

Permalink
refactor: incorrect paramaters order in dataplane logs regarding Data…
Browse files Browse the repository at this point in the history
…Flow Id (#4833)

* Refactor incorrect paramaters' order in dataplane logs regarding DataFlow Id.

* Added unit tests.
  • Loading branch information
bmg13 authored Feb 21, 2025
1 parent bb110c5 commit 883b695
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public CompletableFuture<StreamResult<Object>> transfer(DataFlowStartMessage req
var source = sourceFactory.createSource(request);
sources.put(request.getProcessId(), source);
monitor.debug(() -> format("Transferring from %s to %s for flow id: %s.",
request.getProcessId(), request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType()));
request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType(), request.getProcessId()));
return sink.transfer(source)
.thenApply(result -> {
terminate(request.getProcessId());
Expand Down Expand Up @@ -179,13 +179,13 @@ private DataSinkFactory getSinkFactory(DataFlowStartMessage request) {
@NotNull
private CompletableFuture<StreamResult<Object>> noSourceFactory(DataFlowStartMessage request) {
return completedFuture(StreamResult.error("Unknown data source type %s for flow id: %s.".formatted(
request.getProcessId(), request.getSourceDataAddress().getType())));
request.getSourceDataAddress().getType(), request.getProcessId())));
}

@NotNull
private CompletableFuture<StreamResult<Object>> noSinkFactory(DataFlowStartMessage request) {
return completedFuture(StreamResult.error("Unknown data sink type %s for flow id: %s.".formatted(
request.getProcessId(), request.getDestinationDataAddress().getType())));
request.getDestinationDataAddress().getType(), request.getProcessId())));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
Expand All @@ -53,6 +54,8 @@

class PipelineServiceImplTest {

private static final String PROCESS_ID = "1";

private final Monitor monitor = mock();

private final DataSourceFactory sourceFactory = mock();
Expand Down Expand Up @@ -109,6 +112,51 @@ public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
verifyNoInteractions(sinkFactory);
verify(source).close();
}

@Test
void transfer_withUnknownSource_shouldFail() {
var flowRequest = dataFlow("wrong-source", "custom-destination").toRequest();
var expectedErrorMessage = format("Unknown data source type wrong-source for flow id: %s.", PROCESS_ID);

when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);

var customSink = new DataSink() {
@Override
public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
return CompletableFuture.completedFuture(StreamResult.success("test-response"));
}
};

var future = service.transfer(flowRequest, customSink);
assertThat(future).succeedsWithin(Duration.ofSeconds(5))
.satisfies(res -> assertThat(res).isFailed())
.satisfies(res -> assertThat(res.getFailure().getMessages()).hasSize(1))
.satisfies(res -> assertThat(res.getFailure().getMessages().get(0)).isEqualTo(expectedErrorMessage));

verify(sourceFactory).supportedType();
verifyNoInteractions(sinkFactory);
verifyNoInteractions(source);
}

@Test
void transfer_withUnknownSink_shouldFail() {
var flowRequest = dataFlow("source", "custom-destination").toRequest();
var expectedErrorMessage = format("Unknown data sink type custom-destination for flow id: %s.", PROCESS_ID);

when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);

var future = service.transfer(flowRequest);
assertThat(future).succeedsWithin(Duration.ofSeconds(5))
.satisfies(res -> assertThat(res).isFailed())
.satisfies(res -> assertThat(res.getFailure().getMessages()).hasSize(1))
.satisfies(res -> assertThat(res.getFailure().getMessages().get(0)).isEqualTo(expectedErrorMessage));

verify(sinkFactory).supportedType();
verifyNoInteractions(sourceFactory);
verifyNoInteractions(source);
}
}

@Nested
Expand Down Expand Up @@ -216,7 +264,7 @@ void shouldCloseAllTheOngoingDataFlows() throws Exception {

private DataFlow dataFlow(String sourceType, String destinationType) {
return DataFlow.Builder.newInstance()
.id("1")
.id(PROCESS_ID)
.source(DataAddress.Builder.newInstance().type(sourceType).build())
.destination(DataAddress.Builder.newInstance().type(destinationType).build())
.build();
Expand Down

0 comments on commit 883b695

Please sign in to comment.