ci: fix compilation (#241)
ndr-brt authored Nov 6, 2024
1 parent 7f07a45 commit c05b84a
Showing 9 changed files with 22 additions and 169 deletions.
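The change set removes the per-request canHandle(DataFlowStartMessage) overrides from the BigQuery and GCS source/sink factories, drops the GcpException guards that depended on them, and removes a now-unused TypeManager dependency from BigQueryDataSourceFactory; factory selection is presumably keyed off supportedType() alone. Below is a minimal sketch of the factory shape the changed classes now implement — the interface name and exact method set are assumptions for illustration, not the upstream SPI verbatim (the sink side is symmetric, with createSink and DataSink).

// Sketch only: assumed post-change factory contract, reusing types already imported in this diff.
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

interface SketchedSourceFactoryContract {
    // Selection key: the data address type this factory serves, e.g. BigQueryServiceSchema.BIGQUERY_DATA.
    String supportedType();

    // Invoked only after the factory has been selected by type, so no canHandle() re-check is needed.
    @NotNull Result<Void> validateRequest(DataFlowStartMessage message);

    DataSource createSource(DataFlowStartMessage message);
}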
@@ -81,7 +81,7 @@ public void initialize(ServiceExtensionContext context) {
var executorService = executorInstrumentation.instrument(
Executors.newFixedThreadPool(bigQueryConfiguration.threadPoolSize()), "BigQuery Source");

var sourceFactory = new BigQueryDataSourceFactory(bigQueryConfiguration, monitor, paramsProvider, typeManager, executorService, iamService);
var sourceFactory = new BigQueryDataSourceFactory(bigQueryConfiguration, monitor, paramsProvider, executorService, iamService);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new BigQueryDataSinkFactory(bigQueryConfiguration, executorContainer.getExecutorService(), monitor, vault, typeManager, paramsProvider, iamService);
@@ -53,7 +53,7 @@ public class BigQueryDataSinkFactory implements DataSinkFactory {
private final TypeManager typeManager;
private final BigQueryRequestParamsProvider requestParamsProvider;
private final BigQuerySinkDataAddressValidator sinkDataAddressValidator = new BigQuerySinkDataAddressValidator();
private IamService iamService;
private final IamService iamService;

public BigQueryDataSinkFactory(
BigQueryConfiguration configuration,
@@ -72,11 +72,6 @@ public BigQueryDataSinkFactory(
this.iamService = iamService;
}

@Override
public boolean canHandle(DataFlowStartMessage message) {
return BIGQUERY_DATA.equals(message.getDestinationDataAddress().getType());
}

@Override
public @NotNull Result<Void> validateRequest(DataFlowStartMessage message) {
// canHandle has been already invoked to have this factory selected.
@@ -95,10 +90,6 @@ public String supportedType() {

@Override
public DataSink createSink(DataFlowStartMessage message) {
if (!canHandle(message)) {
throw new GcpException("BigQuery Data Sink cannot create sink for request type " + message.getSourceDataAddress().getType());
}

monitor.info("BigQuery Data Sink Factory " + message.getId());
var params = requestParamsProvider.provideSinkParams(message);
var target = params.getTarget();
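The guard removed from createSink (throwing GcpException for non-BigQuery destination types) becomes redundant once factories are only ever resolved by matching the data address type against supportedType(). Here is a self-contained sketch of that resolution idea, under the assumption that the data plane keeps a registry keyed on the supported type — the actual PipelineService implementation may differ.

// Assumption-level sketch of type-keyed factory resolution; not the actual EDC PipelineService.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class SketchedFactoryRegistry<F> {
    private final Map<String, F> byType = new HashMap<>();

    void register(String supportedType, F factory) {
        byType.put(supportedType, factory);
    }

    Optional<F> resolve(String dataAddressType) {
        // A factory only receives requests whose address type equals its supportedType(),
        // which is why createSink/createSource no longer re-validate the type themselves.
        return Optional.ofNullable(byType.get(dataAddressType));
    }
}

Under this assumption, a factory registered for BIGQUERY_DATA never sees a request of another type, so a defensive type check inside createSink or createSource adds nothing.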
@@ -19,13 +19,10 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory;
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.bigquery.service.BigQueryServiceSchema;
import org.eclipse.edc.gcp.bigquery.validation.BigQuerySourceDataAddressValidator;
import org.eclipse.edc.gcp.common.GcpException;
import org.eclipse.edc.gcp.iam.IamService;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

@@ -41,28 +38,21 @@ public class BigQueryDataSourceFactory implements DataSourceFactory {
private final BigQueryConfiguration configuration;
private final BigQueryRequestParamsProvider requestParamsProvider;
private final Monitor monitor;
private final TypeManager typeManager;
private final ExecutorService executorService;
private final BigQuerySourceDataAddressValidator sourceDataAddressValidator = new BigQuerySourceDataAddressValidator();
private IamService iamService;
private final IamService iamService;

public BigQueryDataSourceFactory(BigQueryConfiguration configuration, Monitor monitor,
BigQueryRequestParamsProvider requestParamsProvider,
TypeManager typeManager, ExecutorService executorService,
ExecutorService executorService,
IamService iamService) {
this.configuration = configuration;
this.monitor = monitor;
this.requestParamsProvider = requestParamsProvider;
this.typeManager = typeManager;
this.executorService = executorService;
this.iamService = iamService;
}

@Override
public boolean canHandle(DataFlowStartMessage message) {
return BigQueryServiceSchema.BIGQUERY_DATA.equals(message.getSourceDataAddress().getType());
}

@Override
public @NotNull Result<Void> validateRequest(DataFlowStartMessage message) {
// canHandle has been already invoked to have this factory selected.
@@ -81,10 +71,6 @@ public String supportedType() {

@Override
public DataSource createSource(DataFlowStartMessage message) {
if (!canHandle(message)) {
throw new GcpException("BigQuery Data Source cannot create source for request type " + message.getSourceDataAddress().getType());
}

var params = requestParamsProvider.provideSourceParams(message);
var target = params.getTarget();

@@ -17,7 +17,6 @@
import org.eclipse.edc.connector.dataplane.gcp.bigquery.params.BigQueryRequestParamsProvider;
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.bigquery.service.BigQueryServiceSchema;
import org.eclipse.edc.gcp.common.GcpException;
import org.eclipse.edc.gcp.iam.IamService;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.security.Vault;
@@ -30,58 +29,30 @@
import java.util.concurrent.ExecutorService;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;

public class BigQueryDataSinkFactoryTest {
private static final String TEST_PROJECT = "test-project";
private static final String TEST_DATASET = "test-dataset";
private static final String TEST_TABLE = "test-table";
private static final String TEST_OTHER_TYPE = "AnotherDataAddressType";
private static final String TEST_REQUEST_ID = "request-id";
private static final String TEST_PROCESS_ID = "process-id";
private static final String TEST_CUSTOMER_NAME = "customer-name";
private static final String TEST_SINK_SERVICE_ACCOUNT_NAME = "sinkAccount";
private BigQueryConfiguration bigQueryConfiguration = new BigQueryConfiguration(null, null, "testEndpoint", 0);
private TypeManager typeManager = mock();
private Monitor monitor = mock();
private ExecutorService executorService = mock();
private Vault vault = mock();
private IamService iamService = mock();
private final BigQueryConfiguration bigQueryConfiguration = new BigQueryConfiguration(null, null, "testEndpoint", 0);
private final TypeManager typeManager = mock();
private final Monitor monitor = mock();
private final ExecutorService executorService = mock();
private final Vault vault = mock();
private final IamService iamService = mock();

@BeforeEach
void setup() {
reset(monitor);
reset(typeManager);
reset(executorService);
reset(vault);
reset(iamService);

when(monitor.withPrefix(any(String.class))).thenReturn(monitor);
}

@Test
void testCanHandle() {
var provider = new BigQueryRequestParamsProvider();
var factory = new BigQueryDataSinkFactory(bigQueryConfiguration, executorService, monitor, vault, typeManager, provider, iamService);

var bqDataFlowRequest = getDataFlowRequest(BigQueryServiceSchema.BIGQUERY_DATA);

assertThat(factory.canHandle(bqDataFlowRequest)).isTrue();

var otherDataFlowRequest = getDataFlowRequest(TEST_OTHER_TYPE);

assertThat(factory.canHandle(otherDataFlowRequest)).isFalse();
}

@Test
void testValidateRequest() {
// TODO add tests if validateRequest body is implemented with specific tests.
}

@Test
void testCreateSink() {
var provider = new BigQueryRequestParamsProvider();
@@ -91,11 +62,6 @@ void testCreateSink() {

var bqSink = factory.createSink(bqDataFlowRequest);
assertThat(bqSink).isNotNull();

var otherDataFlowRequest = getDataFlowRequest(TEST_OTHER_TYPE);

Throwable exception = assertThrows(GcpException.class, () -> factory.createSink(otherDataFlowRequest));
assertThat(exception.getMessage()).isEqualTo("BigQuery Data Sink cannot create sink for request type " + TEST_OTHER_TYPE);
}

private DataFlowStartMessage getDataFlowRequest(String type) {
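With testCanHandle removed from this test class, type routing can still be covered at the unit level by asserting on supportedType(). One possible replacement test for BigQueryDataSinkFactoryTest is sketched below; it assumes supportedType() returns BigQueryServiceSchema.BIGQUERY_DATA, which the BigQuery factory hunks above do not show, so that return value is an assumption. The imports it needs (Test, assertThat, BigQueryServiceSchema) already exist in the file, and the constructor arguments mirror the removed test.

// Sketch of a possible replacement for the removed testCanHandle; assumes
// BigQueryDataSinkFactory.supportedType() returns BigQueryServiceSchema.BIGQUERY_DATA.
@Test
void testSupportedType() {
    var provider = new BigQueryRequestParamsProvider();
    var factory = new BigQueryDataSinkFactory(bigQueryConfiguration, executorService, monitor, vault, typeManager, provider, iamService);

    assertThat(factory.supportedType()).isEqualTo(BigQueryServiceSchema.BIGQUERY_DATA);
}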
@@ -18,10 +18,8 @@
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.bigquery.service.BigQueryServiceSchema;
import org.eclipse.edc.gcp.common.GcpConfiguration;
import org.eclipse.edc.gcp.common.GcpException;
import org.eclipse.edc.gcp.iam.IamService;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.junit.jupiter.api.BeforeEach;
@@ -31,8 +29,6 @@
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -42,58 +38,30 @@ public class BigQueryDataSourceFactoryTest {
private static final String TEST_DATASET = "test-dataset";
private static final String TEST_TABLE = "test-table";
private static final String TEST_QUERY = "select * from " + TEST_TABLE + ";";
private static final String TEST_OTHER_TYPE = "AnotherDataAddressType";
private static final String TEST_REQUEST_ID = "request-id";
private static final String TEST_PROCESS_ID = "process-id";
private static final String TEST_CUSTOMER_NAME = "customer-name";
private static final String TEST_SINK_SERVICE_ACCOUNT_NAME = "sinkAccount";
private final ExecutorService executionPool = Executors.newFixedThreadPool(2);
private GcpConfiguration gcpConfiguration = new GcpConfiguration(TEST_PROJECT, TEST_SINK_SERVICE_ACCOUNT_NAME, null, null);
private BigQueryConfiguration configuration = new BigQueryConfiguration(gcpConfiguration, "testEndpoint", null, 0);
private TypeManager typeManager = mock();
private Monitor monitor = mock();
private IamService iamService = mock();
private final GcpConfiguration gcpConfiguration = new GcpConfiguration(TEST_PROJECT, TEST_SINK_SERVICE_ACCOUNT_NAME, null, null);
private final BigQueryConfiguration configuration = new BigQueryConfiguration(gcpConfiguration, "testEndpoint", null, 0);
private final Monitor monitor = mock();
private final IamService iamService = mock();

@BeforeEach
void setup() {
when(monitor.withPrefix(any(String.class))).thenReturn(monitor);
}

@Test
void testCanHandle() {
var provider = new BigQueryRequestParamsProvider();
var factory = new BigQueryDataSourceFactory(configuration, monitor, provider, typeManager, executionPool, iamService);

var bqDataFlowRequest = getDataFlowRequest(BigQueryServiceSchema.BIGQUERY_DATA);

assertThat(factory.canHandle(bqDataFlowRequest)).isTrue();

var otherDataFlowRequest = getDataFlowRequest(TEST_OTHER_TYPE);

assertThat(factory.canHandle(otherDataFlowRequest)).isFalse();
}

@Test
void testValidateRequest() {
// TODO add tests if validateRequest body is implemented with specific tests.
}

@Test
void testCreateSource() {
var provider = new BigQueryRequestParamsProvider();
var factory = new BigQueryDataSourceFactory(configuration, monitor, provider, typeManager, executionPool, iamService);

var factory = new BigQueryDataSourceFactory(configuration, monitor, provider, executionPool, iamService);
var bqDataFlowRequest = getDataFlowRequest(BigQueryServiceSchema.BIGQUERY_DATA);

try (var bqSource = factory.createSource(bqDataFlowRequest)) {
assertThat(bqSource).isNotNull();
} catch (Exception e) {
fail(e.getMessage());
}
var bqSource = factory.createSource(bqDataFlowRequest);

var otherDataFlowRequest = getDataFlowRequest(TEST_OTHER_TYPE);
var exception = assertThrows(GcpException.class, () -> factory.createSource(otherDataFlowRequest));
assertThat(exception.getMessage()).isEqualTo("BigQuery Data Source cannot create source for request type " + TEST_OTHER_TYPE);
assertThat(bqSource).isNotNull();
}

private DataFlowStartMessage getDataFlowRequest(String type) {
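The rewritten testCreateSource no longer closes the created source; the removed version relied on try-with-resources, which implies the source is auto-closeable. If explicit cleanup is still wanted, a hedged variant that keeps the close while dropping the old GcpException assertion could look like the sketch below, reusing the same constructor arguments and the getDataFlowRequest helper from this test class.

// Optional variant of testCreateSource that still releases the source; the closeability
// of the created source is assumed from the try-with-resources block this commit removed.
@Test
void testCreateSourceAndClose() throws Exception {
    var provider = new BigQueryRequestParamsProvider();
    var factory = new BigQueryDataSourceFactory(configuration, monitor, provider, executionPool, iamService);
    var bqDataFlowRequest = getDataFlowRequest(BigQueryServiceSchema.BIGQUERY_DATA);

    try (var bqSource = factory.createSource(bqDataFlowRequest)) {
        assertThat(bqSource).isNotNull();
    }
}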
@@ -58,11 +58,6 @@ public String supportedType() {
return GcsStoreSchema.TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return GcsStoreSchema.TYPE.equals(request.getDestinationDataAddress().getType());
}

@Override
public @NotNull Result<Void> validateRequest(DataFlowStartMessage request) {
var destination = request.getDestinationDataAddress();
@@ -36,17 +36,11 @@ public GcsDataSourceFactory(Monitor monitor) {
this.monitor = monitor;
}


@Override
public String supportedType() {
return GcsStoreSchema.TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return GcsStoreSchema.TYPE.equals(request.getSourceDataAddress().getType());
}

@Override
public @NotNull Result<Void> validateRequest(DataFlowStartMessage request) {
var source = request.getSourceDataAddress();
@@ -20,7 +20,6 @@
import org.eclipse.edc.gcp.storage.GcsStoreSchema;
import org.eclipse.edc.json.JacksonTypeManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand All @@ -33,7 +32,6 @@
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
@@ -47,37 +45,13 @@ class GcsDataSinkFactoryTest {
private Vault vault = mock();
private IamService iamService = mock();
private GcsDataSinkFactory factory = new GcsDataSinkFactory(
mock(ExecutorService.class),
mock(Monitor.class),
mock(),
mock(),
vault,
typeManager,
iamService
);

@Test
void canHandle_returnsTrueWhenExpectedType() {
var destination = DataAddress.Builder
.newInstance()
.type(GcsStoreSchema.TYPE)
.build();

var result = factory.canHandle(createRequest(destination));

assertThat(result).isTrue();
}

@Test
void canHandle_returnsFalseWhenUnexpectedType() {
var destination = DataAddress.Builder
.newInstance()
.type("Not Google Storage")
.build();

var result = factory.canHandle(createRequest(destination));

assertThat(result).isFalse();
}

@Test
void validate_ShouldSucceedIfPropertiesAreValid() {
var destination = DataAddress.Builder
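For the GCS sink test, the two removed canHandle tests map naturally onto a supportedType() assertion, since the GcsDataSinkFactory hunk above shows supportedType() returning GcsStoreSchema.TYPE. A sketch of such a replacement, reusing the factory field already built in this test class:

// Sketch of a possible replacement for the removed canHandle tests; GcsStoreSchema and
// assertThat are already imported in this file, and "factory" is the existing field.
@Test
void supportedType_returnsGcsStoreSchemaType() {
    assertThat(factory.supportedType()).isEqualTo(GcsStoreSchema.TYPE);
}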
@@ -31,29 +31,8 @@

class GcsDataSourceFactoryTest {

Monitor monitor = mock(Monitor.class);

private final GcsDataSourceFactory factory =
new GcsDataSourceFactory(monitor);

@Test
void canHandle_returnsTrueWhenExpectedType() {
var dataAddress = createDataAddress(GcsStoreSchema.TYPE)
.build();
var result = factory.canHandle(TestFunctions.createRequest(dataAddress));

assertThat(result).isTrue();
}

@Test
void canHandle_returnsFalseWhenUnexpectedType() {
var dataAddress = createDataAddress("Not Google Storage")
.build();

var result = factory.canHandle(TestFunctions.createRequest(dataAddress));

assertThat(result).isFalse();
}
private final Monitor monitor = mock(Monitor.class);
private final GcsDataSourceFactory factory = new GcsDataSourceFactory(monitor);

@Test
void validate_ShouldSucceedIfPropertiesAreValid() {
@@ -95,4 +74,4 @@ public Stream<Arguments> provideArguments(ExtensionContext context) {
);
}
}
}
}
