Skip to content
This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #131 from aclavio/develop
Browse files Browse the repository at this point in the history
Release 0.8.0
  • Loading branch information
aclavio authored Oct 1, 2020
2 parents 24ea61f + 1ed8eb9 commit 0e08eeb
Show file tree
Hide file tree
Showing 49 changed files with 1,365 additions and 135 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
.project
.gradle
.idea
.tmp
build
gradle-local.properties
*.settings*
*.classpath
state-conductor-example/bin/
6 changes: 2 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ There are two sub-projects.

State-conductor-modules which contains the core library that gets built into the mlBundle redistributable.

State-conductor-example which had unit tests, as well as example flows and actions.
State-conductor-example which has unit tests, as well as example flows and actions.

## Found an Issue?
If you find a bug in the source code or a mistake in the documentation, you can help us by submitting an issue to our [GitHub Issue Tracker][issue tracker]. Even better, you can submit a Pull Request with a fix for the issue you filed.


## Want a Feature?
You can request a new feature by submitting an issue to our [GitHub Issue Tracker][issue tracker]. If you
would like to implement a new feature then first create a new issue and discuss it with one of our
project maintainers.
You can request a new feature by submitting an issue to our [GitHub Issue Tracker][issue tracker]. If you would like to implement a new feature then first create a new issue and discuss it with one of our project maintainers.

## Building MarkLogic State Conductor from Source
Looking to build the code from source?
Expand Down
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ The _State Conductor_ can be used to perform an arbitrary number of context-base

The _State Conductor_ requires a "Driver" to process documents and move them through the installed Flows' states. The _State Conductor_ supports a [Data Services](https://github.com/aclavio/marklogic-state-conductor/tree/develop/state-conductor-dataservices) driver, a [CoRB2](https://github.com/marklogic-community/corb2) driver, and a [CPF](https://docs.marklogic.com/guide/cpf) driver.

1. [Installation](#installation)
2. [Usage](#usage)
3. [Flow Files](#flow-files)
4. [Flow File Scope](#flow-file-scope)
5. [Flow File Actions](#flow-file-actions)
6. [Job Documents](#job-documents)
7. [Provenance](#provenance)
8. [Services](#services)
9. [Jobs Service](#jobs-service)
10. [Flows Service](#flows-service)
11. [Status Service](#status-service)
12. [Roadmap](#roadmap)
1. [Quick Start Guide](https://github.com/aclavio/marklogic-state-conductor/wiki/QUICKSTART)
2. [Installation](#installation)
3. [Usage](#usage)
4. [Flow Files](#flow-files)
5. [Flow File Scope](#flow-file-scope)
6. [Flow File Actions](#flow-file-actions)
7. [Job Documents](#job-documents)
8. [Provenance](#provenance)
9. [Services](#services)
10. [Jobs Service](#jobs-service)
11. [Flows Service](#flows-service)
12. [Status Service](#status-service)
13. [Roadmap](#roadmap)

## Installation <a name="installation"></a>

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ allprojects {
}

group = "com.marklogic"
version = "0.7.0"
version = "0.8.0"
}
Empty file modified gradlew
100644 → 100755
Empty file.
4 changes: 3 additions & 1 deletion state-conductor-dataservices/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ From the root of the State Conductor project, execute the `mlDeploy` and `test`
| cooldownMillis | 5000 | (Milliseconds) If no valid jobs are found for processing, poll using this interval. |
| queueThreshold | 20000 | The upper limit for how many jobs the driver will cache for processing. After this limit is reached, polling for new jobs will fall back to the cooldownMillis interval, and jobs will not be added until the queue size falls below this threshold. |
| batchSize | 5 | How many jobs will be submitted for processing simultaneously |
| threadCount | 10 | The number of executor threads used to process jobs on the queue. Scale based on the number of available App Server threads for optimal performance. |
| metricsInterval | 5000 | (Milliseconds) How often metrics should be logged |
| fixedThreadCount | -1 | Use a fixed number of executor threads to process jobs on the queue if set. Overrides "threadsPerHost" and "maxThreadCount" when set. |
| threadsPerHost | 16 | The number of executor threads used to process jobs on the queue. Scales based on the number of available MarkLogic hosts. Capped by "maxThreadCount". |
| maxThreadCount | 128 | The maximum total number of executor threads to use when processing jobs. The number of threads used will be MIN(maxThreadCount, threadsPerHost x host count). |

11 changes: 9 additions & 2 deletions state-conductor-dataservices/data/test.properties
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
# MarkLogic connection properties
mlHost=vm1
mlPort=8888
appServicesPort=8000
username=admin
password=admin
securityContextType=digest
connectionType=direct
pollSize=1000
batchSize=5
threadCount=16
queueThreshold=5000
simpleSsl=false
flowNames=
flowStatus=
cooldownMillis=5000
cooldownMillis=10000
pollInterval=1000
metricsInterval=10000


# Thread Pool settings
#fixedThreadCount=16
threadsPerHost=16
maxThreadCount=128
7 changes: 5 additions & 2 deletions state-conductor-dataservices/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ COPY state-conductor-dataservices.jar /state-conductor-dataservices.jar
# specify environment configs
ENV mlHost=localhost
ENV mlPort=8888
ENV appServicesPort=8000
ENV username=
ENV password=
ENV securityContextType=digest
Expand All @@ -20,8 +21,10 @@ ENV pollSize=1000
ENV pollInterval=1000
ENV cooldownMillis=5000
ENV queueThreshold=20000
ENV batchSize=5
ENV threadCount=10
ENV metricsInterval=5000
ENV batchSize=5
ENV fixedThreadCount=-1
ENV threadsPerHost=16
ENV maxThreadCount=128
# runs application
CMD ["/usr/bin/java", "-jar", "/state-conductor-dataservices.jar"]
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void run() {
AtomicLong failed = new AtomicLong();

// grab any "new" and "working" jobs
Stream<String> jobUris = service.getJobs(batchSize, null, null, forestIds.stream());
Stream<String> jobUris = service.getJobs(1, batchSize, null, null, forestIds.stream(), null, null);

// process each of the jobs
jobUris.forEach(uri -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.marklogic.client.ext.ConfiguredDatabaseClientFactory;
import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
import com.marklogic.config.StateConductorDriverConfig;
import com.marklogic.tasks.GetConfigTask;
import com.marklogic.tasks.GetJobsTask;
import com.marklogic.tasks.MetricsTask;
import com.marklogic.tasks.ProcessJobTask;
Expand All @@ -19,23 +20,30 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class StateConductorDriver implements Runnable, Destroyable {

private static Logger logger = LoggerFactory.getLogger(StateConductorDriver.class);
private final static Logger logger = LoggerFactory.getLogger(StateConductorDriver.class);

private StateConductorDriverConfig config;
private DatabaseClient client;
private DatabaseClient appServicesClient;
private StateConductorService service;

public StateConductorDriver(DatabaseClient client, StateConductorDriverConfig config) {
this.client = client;
public StateConductorDriver(StateConductorDriverConfig config) {
this.config = config;
service = StateConductorService.on(this.client);

ConfiguredDatabaseClientFactory configuredDatabaseClientFactory = new DefaultConfiguredDatabaseClientFactory();
client = configuredDatabaseClientFactory.newDatabaseClient(config.getDatabaseClientConfig());
appServicesClient = configuredDatabaseClientFactory.newDatabaseClient(config.getAppServicesDatabaseClientConfig());

service = StateConductorService.on(client);
}

public static Options getOptions() {
Expand Down Expand Up @@ -112,10 +120,7 @@ public static void main(String[] args) throws DestroyFailedException, IOExceptio
config = StateConductorDriverConfig.newConfig(System.getenv(), Maps.fromProperties(System.getProperties()), props);
}

ConfiguredDatabaseClientFactory configuredDatabaseClientFactory = new DefaultConfiguredDatabaseClientFactory();
DatabaseClient client = configuredDatabaseClientFactory.newDatabaseClient(config.getDatabaseClientConfig());

StateConductorDriver driver = new StateConductorDriver(client, config);
StateConductorDriver driver = new StateConductorDriver(config);
Thread driverThread = new Thread(driver);
driverThread.start();

Expand Down Expand Up @@ -153,7 +158,15 @@ public void run() {
List<ProcessJobTask> jobBuckets = new ArrayList<>();

// set up the thread pool
ExecutorService pool = Executors.newFixedThreadPool(config.getThreadCount());
int initialThreads = config.getThreadsPerHost();
if (config.useFixedThreadCount())
initialThreads = config.getFixedThreadCount();
ThreadPoolExecutor pool = new ThreadPoolExecutor(initialThreads, initialThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

// start the getConfig thread
GetConfigTask configTask = new GetConfigTask(appServicesClient, config, pool, initialThreads);
Thread configThread = new Thread(configTask);
configThread.start();

// start the metrics thread
MetricsTask metricsTask = new MetricsTask(config, total, totalErrors);
Expand Down Expand Up @@ -197,7 +210,7 @@ public void run() {
}

if (jobBuckets.size() > 0) {
logger.info("Populated thread pool[{}] with {} batches", config.getThreadCount(), jobBuckets.size());
logger.info("Populated thread pool with {} batches", jobBuckets.size());
}

// process any results that have come in
Expand Down Expand Up @@ -241,6 +254,7 @@ public void run() {
logger.info("Stopping GetJobsTask thread...");
getJobsTask.interrupt();
metricsThread.interrupt();
configThread.interrupt();
// stop main loop
keepRunning = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.stream.Stream;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import java.io.Reader;


import com.marklogic.client.DatabaseClient;
Expand Down Expand Up @@ -33,22 +34,51 @@ private StateConductorServiceImpl(DatabaseClient dbClient) {
}

@Override
public Stream<String> getJobs(Integer count, String flowNames, Stream<String> flowStatus, Stream<String> forestIds) {
public Stream<String> getJobs(Integer start, Integer count, String flowNames, Stream<String> flowStatus, Stream<String> forestIds, String startDate, String endDate) {
return BaseProxy.StringType.toString(
baseProxy
.request("getJobs.sjs", BaseProxy.ParameterValuesKind.MULTIPLE_ATOMICS)
.withSession()
.withParams(
BaseProxy.atomicParam("start", true, BaseProxy.UnsignedIntegerType.fromInteger(start)),
BaseProxy.atomicParam("count", true, BaseProxy.UnsignedIntegerType.fromInteger(count)),
BaseProxy.atomicParam("flowNames", true, BaseProxy.StringType.fromString(flowNames)),
BaseProxy.atomicParam("flowStatus", true, BaseProxy.StringType.fromString(flowStatus)),
BaseProxy.atomicParam("forestIds", true, BaseProxy.StringType.fromString(forestIds)))
BaseProxy.atomicParam("forestIds", true, BaseProxy.StringType.fromString(forestIds)),
BaseProxy.atomicParam("startDate", true, BaseProxy.DateTimeType.fromString(startDate)),
BaseProxy.atomicParam("endDate", true, BaseProxy.DateTimeType.fromString(endDate)))
.withMethod("POST")
.responseMultiple(true, null)
);
}


@Override
public com.fasterxml.jackson.databind.node.ObjectNode getFlow(String flowName) {
return BaseProxy.ObjectType.toObjectNode(
baseProxy
.request("getFlow.sjs", BaseProxy.ParameterValuesKind.SINGLE_ATOMIC)
.withSession()
.withParams(
BaseProxy.atomicParam("flowName", true, BaseProxy.StringType.fromString(flowName)))
.withMethod("POST")
.responseSingle(false, Format.JSON)
);
}


@Override
public void deleteFlow(String flowName) {
baseProxy
.request("deleteFlow.sjs", BaseProxy.ParameterValuesKind.SINGLE_ATOMIC)
.withSession()
.withParams(
BaseProxy.atomicParam("flowName", false, BaseProxy.StringType.fromString(flowName)))
.withMethod("POST")
.responseNone();
}


@Override
public String createJob(String uri, String flowName) {
return BaseProxy.StringType.toString(
Expand Down Expand Up @@ -77,6 +107,36 @@ public com.fasterxml.jackson.databind.node.ArrayNode processJob(Stream<String> u
);
}


@Override
public com.fasterxml.jackson.databind.node.ObjectNode getFlowStatus(Stream<String> flowNames, String startDate, String endDate, Boolean detailed) {
return BaseProxy.ObjectType.toObjectNode(
baseProxy
.request("getFlowStatus.sjs", BaseProxy.ParameterValuesKind.MULTIPLE_ATOMICS)
.withSession()
.withParams(
BaseProxy.atomicParam("flowNames", true, BaseProxy.StringType.fromString(flowNames)),
BaseProxy.atomicParam("startDate", true, BaseProxy.DateTimeType.fromString(startDate)),
BaseProxy.atomicParam("endDate", true, BaseProxy.DateTimeType.fromString(endDate)),
BaseProxy.atomicParam("detailed", true, BaseProxy.BooleanType.fromBoolean(detailed)))
.withMethod("POST")
.responseSingle(false, Format.JSON)
);
}


@Override
public void insertFlow(String flowName, Reader flow) {
baseProxy
.request("insertFlow.sjs", BaseProxy.ParameterValuesKind.MULTIPLE_MIXED)
.withSession()
.withParams(
BaseProxy.atomicParam("flowName", false, BaseProxy.StringType.fromString(flowName)),
BaseProxy.documentParam("flow", false, BaseProxy.ObjectType.fromReader(flow)))
.withMethod("POST")
.responseNone();
}

}

return new StateConductorServiceImpl(db);
Expand All @@ -85,13 +145,32 @@ public com.fasterxml.jackson.databind.node.ArrayNode processJob(Stream<String> u
/**
* Returns a list of MarkLogic State Conductor Job document URIs
*
* @param start Return records starting from this position.
* @param count The number of uris to return
* @param flowNames A list of flow names to filter the returned job documents
* @param flowStatus A list of flow status's to filter the returned job documents. Defaults to 'new' and 'working'.
* @param forestIds The returned list of job documents will be limited to jobs found in this list of forests.
* @param startDate Filter on jobs created after this date and time.
* @param endDate Filter on jobs created prior to this date and time.
* @return as output
*/
Stream<String> getJobs(Integer start, Integer count, String flowNames, Stream<String> flowStatus, Stream<String> forestIds, String startDate, String endDate);

/**
* Returns a single flow if flowName is specified or all flows otherwise.
*
* @param flowName The name of the flow to return. Pass null to return all.
* @return as output
*/
Stream<String> getJobs(Integer count, String flowNames, Stream<String> flowStatus, Stream<String> forestIds);
com.fasterxml.jackson.databind.node.ObjectNode getFlow(String flowName);

/**
* Deletes a single flow.
*
* @param flowName The name of the flow to be created or updated.
*
*/
void deleteFlow(String flowName);

/**
* Creates a MarkLogic State Conductor Job document for the given uri and flow.
Expand All @@ -110,4 +189,24 @@ public com.fasterxml.jackson.databind.node.ArrayNode processJob(Stream<String> u
*/
com.fasterxml.jackson.databind.node.ArrayNode processJob(Stream<String> uri);

/**
* Returns the status and states of one or more State Conductor flows.
*
* @param flowNames The flow names for which to report status.
* @param startDate Filter on jobs created after this date and time.
* @param endDate Filter on jobs created prior to this date and time.
* @param detailed Include detailed breakdown of jobs per state per status?
* @return as output
*/
com.fasterxml.jackson.databind.node.ObjectNode getFlowStatus(Stream<String> flowNames, String startDate, String endDate, Boolean detailed);

/**
* Creates or updates a single flow.
*
* @param flowName The name of the flow to be created or updated.
* @param flow The flow to be created or updated.
*
*/
void insertFlow(String flowName, Reader flow);

}
Loading

0 comments on commit 0e08eeb

Please sign in to comment.