Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2160] Setup Temporal docker Cluster for CI Tests and added tests for AbstractNestingExecWorkflowImpl #4059

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gobblin-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ dependencies {
testCompile project(":gobblin-example")

testCompile externalDependency.testng
testCompile externalDependency.mockito
testCompile externalDependency.mockitoInline
testCompile externalDependency.powerMockApi
testCompile externalDependency.powerMockModule
testCompile externalDependency.hadoopYarnMiniCluster
testCompile externalDependency.curatorFramework
testCompile externalDependency.curatorTest


testCompile ('com.google.inject:guice:3.0') {
force = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;

import com.google.common.annotations.VisibleForTesting;


import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;

Expand Down Expand Up @@ -115,8 +118,9 @@ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final WorkflowAddr
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}

@VisibleForTesting
/** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */
protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
// (only pause when an appreciable number of leaves)
// TODO: use a configuration value, for simpler adjustment, rather than hard-code
return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
Expand All @@ -130,11 +134,9 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi
* List<Integer> naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
* @return each sub-tree's desired size, in ascending sub-tree order
*/
protected static List<Integer> consolidateSubTreeGrandChildren(
final int numSubTreesPerSubTree,
final int numChildrenTotal,
final int numSubTreeChildren
) {
@VisibleForTesting
public static List<Integer> consolidateSubTreeGrandChildren(final int numSubTreesPerSubTree,
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved
final int numChildrenTotal, final int numSubTreeChildren) {
if (numSubTreesPerSubTree <= 0) {
return Lists.newArrayList();
} else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
Set<String> output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
Assert.assertEquals(output.size(), 11);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.ddm.utils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;

import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;


public class JobStateUtilTest {

private JobState jobState;
private FileSystem fileSystem;

@BeforeMethod
public void setUp() {
jobState = Mockito.mock(JobState.class);
fileSystem = Mockito.mock(FileSystem.class);
}

@Test
public void testOpenFileSystem() throws IOException {

Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getProperties()).thenReturn(new Properties());

FileSystem fs = JobStateUtils.openFileSystem(jobState);

Assert.assertNotNull(fs);
Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testCreateSource() throws ReflectiveOperationException {
Mockito.when(jobState.getProp(Mockito.anyString()))
.thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource");
Source<?, ?> source = JobStateUtils.createSource(jobState);
Assert.assertNotNull(source);
}

@Test
public void testOpenTaskStateStoreUncached() throws URISyntaxException {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getJobId()).thenReturn("testJobId");
Mockito.when(jobState.getJobName()).thenReturn("testJobName");
Mockito.when(fileSystem.makeQualified(Mockito.any()))
.thenReturn(new Path("file:///test/testJobName/testJobId/output"));
Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output"));

StateStore<TaskState> stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem);

Assert.assertNotNull(stateStore);
}

@Test
public void testGetFileSystemUri() {
Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
URI fsUri = JobStateUtils.getFileSystemUri(jobState);
Assert.assertEquals(URI.create("file:///test"), fsUri);
Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testGetWorkDirRoot() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path rootPath = JobStateUtils.getWorkDirRoot(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath);
Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString());
}

@Test
public void testGetWorkUnitsPath() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath);
}

@Test
public void testGetTaskStateStorePath() throws IOException {
Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path"));
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem);
Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath);
}

@Test
public void testWriteJobState() throws IOException {
Path workDirRootPath = new Path("/tmp");
FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class);
Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos);

JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem);

Mockito.verify(fileSystem).create(Mockito.any(Path.class));
Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean());
}

@Test
public void testGetSharedResourcesBroker() {
Mockito.when(jobState.getProperties()).thenReturn(System.getProperties());
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.workflow.impl;
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved

import java.time.Duration;
import java.util.List;
import java.util.Optional;

import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;


@RunWith(PowerMockRunner.class)
@PrepareForTest(Workflow.class)
public class AbstractNestingExecWorkflowImplTest {

@Mock
private Workload<String> mockWorkload;

@Mock
private WorkflowAddr mockWorkflowAddr;

@Mock
private Workload.WorkSpan<String> mockWorkSpan;

@Mock
private Promise<Object> mockPromise;

private AbstractNestingExecWorkflowImpl<String, Object> workflow;

@BeforeClass
public void setup() {
// PowerMockito is required to mock static methods in the Workflow class
Mockito.mockStatic(Workflow.class);
Mockito.mockStatic(Async.class);
Mockito.mockStatic(Promise.class);
this.mockWorkload = Mockito.mock(Workload.class);
this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class);
this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class);
this.mockPromise = Mockito.mock(Promise.class);

workflow = new AbstractNestingExecWorkflowImpl<String, Object>() {
@Override
protected Promise<Object> launchAsyncActivity(String task) {
return mockPromise;
}
};
}

@Test
public void testPerformWorkload_NoWorkSpan() {
// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty());

// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());

// Assert
Assert.assertEquals(0, result);
Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50);

// Assert
Assert.assertEquals(Duration.ZERO, result);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150);

// Assert
Assert.assertEquals(
Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT),
result);
}

@Test
public void testConsolidateSubTreeGrandChildren() {
// Act
List<Integer> result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2);

// Assert
Assert.assertEquals(3, result.size());
Assert.assertEquals(Integer.valueOf(0), result.get(0));
Assert.assertEquals(Integer.valueOf(0), result.get(1));
Assert.assertEquals(Integer.valueOf(6), result.get(2));
}

@Test(expectedExceptions = AssertionError.class)
public void testPerformWorkload_LaunchesChildWorkflows() {
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved
// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan));
Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5);
Mockito.when(mockWorkSpan.next()).thenReturn("task1");
Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false);

// Mock the child workflow
Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise);
Mockito.when(mockPromise.get()).thenReturn(5);
// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());
}
}