From 883b695969f6b1d21fe7444ab124f4fcf4c1679c Mon Sep 17 00:00:00 2001 From: bmg13 <18561736+bmg13@users.noreply.github.com> Date: Fri, 21 Feb 2025 15:59:45 +0000 Subject: [PATCH] refactor: incorrect paramaters order in dataplane logs regarding DataFlow Id (#4833) * Refactor incorrect paramaters' order in dataplane logs regarding DataFlow Id. * Added unit tests. --- .../pipeline/PipelineServiceImpl.java | 6 +-- .../pipeline/PipelineServiceImplTest.java | 50 ++++++++++++++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java index 051d8c0bb14..a6b5795d2cf 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java @@ -108,7 +108,7 @@ public CompletableFuture> transfer(DataFlowStartMessage req var source = sourceFactory.createSource(request); sources.put(request.getProcessId(), source); monitor.debug(() -> format("Transferring from %s to %s for flow id: %s.", - request.getProcessId(), request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType())); + request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType(), request.getProcessId())); return sink.transfer(source) .thenApply(result -> { terminate(request.getProcessId()); @@ -179,13 +179,13 @@ private DataSinkFactory getSinkFactory(DataFlowStartMessage request) { @NotNull private CompletableFuture> noSourceFactory(DataFlowStartMessage request) { return completedFuture(StreamResult.error("Unknown data source type %s for flow id: %s.".formatted( - request.getProcessId(), request.getSourceDataAddress().getType()))); + request.getSourceDataAddress().getType(), request.getProcessId()))); } @NotNull private CompletableFuture> noSinkFactory(DataFlowStartMessage request) { return completedFuture(StreamResult.error("Unknown data sink type %s for flow id: %s.".formatted( - request.getProcessId(), request.getDestinationDataAddress().getType()))); + request.getDestinationDataAddress().getType(), request.getProcessId()))); } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java index 230ce63746f..693a3d3f3f7 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; @@ -53,6 +54,8 @@ class PipelineServiceImplTest { + private static final String PROCESS_ID = "1"; + private final Monitor monitor = mock(); private final DataSourceFactory sourceFactory = mock(); @@ -109,6 +112,51 @@ public CompletableFuture> transfer(DataSource source) { verifyNoInteractions(sinkFactory); verify(source).close(); } + + @Test + void transfer_withUnknownSource_shouldFail() { + var flowRequest = dataFlow("wrong-source", "custom-destination").toRequest(); + var expectedErrorMessage = format("Unknown data source type wrong-source for flow id: %s.", PROCESS_ID); + + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + + var customSink = new DataSink() { + @Override + public CompletableFuture> transfer(DataSource source) { + return CompletableFuture.completedFuture(StreamResult.success("test-response")); + } + }; + + var future = service.transfer(flowRequest, customSink); + assertThat(future).succeedsWithin(Duration.ofSeconds(5)) + .satisfies(res -> assertThat(res).isFailed()) + .satisfies(res -> assertThat(res.getFailure().getMessages()).hasSize(1)) + .satisfies(res -> assertThat(res.getFailure().getMessages().get(0)).isEqualTo(expectedErrorMessage)); + + verify(sourceFactory).supportedType(); + verifyNoInteractions(sinkFactory); + verifyNoInteractions(source); + } + + @Test + void transfer_withUnknownSink_shouldFail() { + var flowRequest = dataFlow("source", "custom-destination").toRequest(); + var expectedErrorMessage = format("Unknown data sink type custom-destination for flow id: %s.", PROCESS_ID); + + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + + var future = service.transfer(flowRequest); + assertThat(future).succeedsWithin(Duration.ofSeconds(5)) + .satisfies(res -> assertThat(res).isFailed()) + .satisfies(res -> assertThat(res.getFailure().getMessages()).hasSize(1)) + .satisfies(res -> assertThat(res.getFailure().getMessages().get(0)).isEqualTo(expectedErrorMessage)); + + verify(sinkFactory).supportedType(); + verifyNoInteractions(sourceFactory); + verifyNoInteractions(source); + } } @Nested @@ -216,7 +264,7 @@ void shouldCloseAllTheOngoingDataFlows() throws Exception { private DataFlow dataFlow(String sourceType, String destinationType) { return DataFlow.Builder.newInstance() - .id("1") + .id(PROCESS_ID) .source(DataAddress.Builder.newInstance().type(sourceType).build()) .destination(DataAddress.Builder.newInstance().type(destinationType).build()) .build();