Skip to content
This repository has been archived by the owner on May 14, 2018. It is now read-only.

Commit

Permalink
v2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Apr 11, 2018
1 parent eb7c5d1 commit cdba0d5
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 61 deletions.
46 changes: 38 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description = "The library supporting the alternative concurrency model"
description = "The concurrency library which includes the coroutines functionality"

buildscript {
repositories {
Expand All @@ -11,32 +11,62 @@ allprojects {
apply plugin: "maven"
apply plugin: "signing"
group = "com.github.akurilov"
version = "1.1.5"
version = "2.0.0"
}

tasks.withType(JavaCompile) {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
ext.moduleName = "${group}.concurrent"

repositories {
mavenCentral()
}

dependencies {
compile("com.github.akurilov:java-commons:[1.4.7,)")
compile("com.github.akurilov:java-commons:[2.0.0,)")
testCompile("junit:junit:4.12")
}

compileJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
]
classpath = files()
}
}

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
}

jar {
inputs.property("moduleName", moduleName)
manifest {
attributes (
"Automatic-Module-Name": moduleName,
"Implementation-Version": version,
"Implementation-Title": rootProject.name,
)
}
}

javadoc {
options.addStringOption("-module-path", classpath.asPath)
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = "sources"
from sourceSets.main.allSource
Expand Down Expand Up @@ -137,7 +167,7 @@ uploadArchives {
}

task wrapper(type: Wrapper) {
gradleVersion = "3.5"
gradleVersion = "4.6"
}

task printVersion {
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 1 addition & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Wed Aug 23 01:05:25 MSK 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-bin.zip
6 changes: 3 additions & 3 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
Expand Down Expand Up @@ -155,7 +155,7 @@ if $cygwin ; then
fi

# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public final AsyncRunnableBase await()
@Override
public boolean await(final long timeout, final TimeUnit timeUnit)
throws IllegalStateException, InterruptedException {
long t, timeOutMilliSec = timeUnit.toMillis(timeout);
t = System.currentTimeMillis();
final var timeOutMilliSec = timeUnit.toMillis(timeout);
final var t = System.currentTimeMillis();
while(isStarted() || isShutdown()) {
if(System.currentTimeMillis() - t >= timeOutMilliSec) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ public class ContextAwareThreadFactory
protected final boolean daemonFlag;
protected final Map<String, String> threadContext;

public ContextAwareThreadFactory(final String threadNamePrefix, final Map<String, String> threadContext) {
public ContextAwareThreadFactory(
final String threadNamePrefix, final Map<String, String> threadContext
) {
this.threadNamePrefix = threadNamePrefix;
this.daemonFlag = false;
this.threadContext = threadContext;
}

public ContextAwareThreadFactory(
final String threadNamePrefix, final boolean daemonFlag, final Map<String, String> threadContext
final String threadNamePrefix, final boolean daemonFlag,
final Map<String, String> threadContext
) {
this.threadNamePrefix = threadNamePrefix;
this.daemonFlag = daemonFlag;
Expand All @@ -55,7 +58,8 @@ protected ContextAwareThread(
@Override
public Thread newThread(final Runnable task) {
return new ContextAwareThread(
task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), daemonFlag, exceptionHandler, threadContext
task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), daemonFlag,
exceptionHandler, threadContext
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public RateThrottle(final double rateLimit) {
public final boolean tryAcquire(final X item) {
synchronized(this) {
if(startTime > 0) {
final long periodCount = (nanoTime() - startTime) / periodNanos;
final var periodCount = (nanoTime() - startTime) / periodNanos;
if(periodCount > acquiredCount) {
acquiredCount ++;
return true;
Expand All @@ -49,7 +49,7 @@ public final boolean tryAcquire(final X item) {
public final int tryAcquire(final X item, final int requiredCount) {
synchronized(this) {
if(startTime > 0) {
final int availableCount = (int) (
final var availableCount = (int) (
(nanoTime() - startTime) / periodNanos - acquiredCount
);
if(availableCount > requiredCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public WeightThrottle(final Int2IntMap weightMap)

private void resetRemainingWeights()
throws IllegalArgumentException {
for(final int key : weightKeys) {
for(final var key : weightKeys) {
remainingWeightMap.put(key, weightMap.get(key));
}
}

private void ensureRemainingWeights() {
for(final int key : weightKeys) {
for(final var key : weightKeys) {
if(remainingWeightMap.get(key) > 0) {
return;
}
Expand All @@ -45,7 +45,7 @@ private void ensureRemainingWeights() {
public final boolean tryAcquire(final int key) {
synchronized(remainingWeightMap) {
ensureRemainingWeights();
final int remainingWeight = remainingWeightMap.get(key);
final var remainingWeight = remainingWeightMap.get(key);
if(remainingWeight > 0) {
remainingWeightMap.put(key, remainingWeight - 1);
return true;
Expand All @@ -61,7 +61,7 @@ public final int tryAcquire(final int key, final int times) {
}
synchronized(remainingWeightMap) {
ensureRemainingWeights();
final int remainingWeight = remainingWeightMap.get(key);
final var remainingWeight = remainingWeightMap.get(key);
if(times > remainingWeight) {
remainingWeightMap.put(key, 0);
return remainingWeight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected void doStart() {
*/
@Override
public final void invoke() {
long t = System.nanoTime();
var t = System.nanoTime();
invokeTimed(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ public CoroutinesExecutor() {
}

public CoroutinesExecutor(final boolean backgroundFlag) {
final int svcThreadCount = Runtime.getRuntime().availableProcessors();
final var svcThreadCount = Runtime.getRuntime().availableProcessors();
executor = new ThreadPoolExecutor(
svcThreadCount, svcThreadCount, 0, TimeUnit.DAYS, new ArrayBlockingQueue<>(1),
new ContextAwareThreadFactory("coroutine-processor-", true, null)
);
this.backgroundFlag = backgroundFlag;
for(int i = 0; i < svcThreadCount; i ++) {
final CoroutinesExecutorTask svcWorkerTask = new CoroutinesExecutorTask(
coroutines, backgroundFlag
);
for(var i = 0; i < svcThreadCount; i ++) {
final var svcWorkerTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
executor.submit(svcWorkerTask);
workers.add(svcWorkerTask);
svcWorkerTask.start();
Expand All @@ -56,24 +54,22 @@ public void stop(final Coroutine coroutine) {
}

public void setThreadCount(final int threadCount) {
final int newThreadCount = threadCount > 0 ?
final var newThreadCount = threadCount > 0 ?
threadCount : Runtime.getRuntime().availableProcessors();
final int oldThreadCount = executor.getCorePoolSize();
final var oldThreadCount = executor.getCorePoolSize();
if(newThreadCount != oldThreadCount) {
executor.setCorePoolSize(newThreadCount);
executor.setMaximumPoolSize(newThreadCount);
if(newThreadCount > oldThreadCount) {
for(int i = oldThreadCount; i < newThreadCount; i ++) {
final CoroutinesExecutorTask execTask = new CoroutinesExecutorTask(
coroutines, backgroundFlag
);
for(var i = oldThreadCount; i < newThreadCount; i ++) {
final var execTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
executor.submit(execTask);
workers.add(execTask);
execTask.start();
}
} else { // less, remove some active service worker tasks
try {
for(int i = oldThreadCount - 1; i >= newThreadCount; i --) {
for(var i = oldThreadCount - 1; i >= newThreadCount; i --) {
workers.remove(i).close();
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final void run() {
break;
}
} else {
for(final Coroutine nextCoroutine : coroutines) {
for(final var nextCoroutine : coroutines) {
try {
if(nextCoroutine.isStarted()) {
nextCoroutine.invoke();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public RoundRobinOutputCoroutine(
this.outputsCount = outputs.size();
this.buffCapacity = buffCapacity;
this.buffs = new HashMap<>(this.outputsCount);
for(int i = 0; i < this.outputsCount; i ++) {
for(var i = 0; i < this.outputsCount; i ++) {
this.buffs.put(outputs.get(i), new OptLockArrayBuffer<>(buffCapacity));
}
}
Expand All @@ -60,7 +60,7 @@ public final boolean put(final T ioTask)
if(isStopped()) {
throw new EOFException();
}
final OptLockBuffer<T> buff = selectBuff();
final var buff = selectBuff();
if(buff != null && buff.tryLock()) {
try {
return buff.size() < buffCapacity && buff.add(ioTask);
Expand All @@ -79,16 +79,16 @@ public final int put(final List<T> srcBuff, final int from, final int to)
throw new EOFException();
}
OptLockBuffer<T> buff;
final int n = to - from;
final var n = to - from;
if(n > outputsCount) {
final int nPerOutput = n / outputsCount;
int nextFrom = from;
for(int i = 0; i < outputsCount; i ++) {
final var nPerOutput = n / outputsCount;
var nextFrom = from;
for(var i = 0; i < outputsCount; i ++) {
buff = selectBuff();
if(buff != null && buff.tryLock()) {
try {
final int m = Math.min(nPerOutput, buffCapacity - buff.size());
for(final T item : srcBuff.subList(nextFrom, nextFrom + m)) {
final var m = Math.min(nPerOutput, buffCapacity - buff.size());
for(final var item : srcBuff.subList(nextFrom, nextFrom + m)) {
buff.add(item);
}
nextFrom += m;
Expand All @@ -101,8 +101,8 @@ public final int put(final List<T> srcBuff, final int from, final int to)
buff = selectBuff();
if(buff != null && buff.tryLock()) {
try {
final int m = Math.min(to - nextFrom, buffCapacity - buff.size());
for(final T item : srcBuff.subList(nextFrom, nextFrom + m)) {
final var m = Math.min(to - nextFrom, buffCapacity - buff.size());
for(final var item : srcBuff.subList(nextFrom, nextFrom + m)) {
buff.add(item);
}
nextFrom += m;
Expand All @@ -113,7 +113,7 @@ public final int put(final List<T> srcBuff, final int from, final int to)
}
return nextFrom - from;
} else {
for(int i = from; i < to; i ++) {
for(var i = from; i < to; i ++) {
buff = selectBuff();
if(buff != null && buff.tryLock()) {
try {
Expand Down Expand Up @@ -162,7 +162,7 @@ protected final void invokeTimed(final long startTimeNanos) {
}
} catch(final EOFException | NoSuchObjectException | ConnectException ignored) {
} catch(final RemoteException e) {
final Throwable cause = e.getCause();
final var cause = e.getCause();
if(!(cause instanceof EOFException)) {
LOG.log(Level.WARNING, "Invocation failure", e);
}
Expand All @@ -182,8 +182,8 @@ public final Input<T> getInput() {
@Override
protected final void doClose()
throws IOException {
for(final O output : outputs) {
final OptLockBuffer<T> buff = buffs.get(output);
for(final var output : outputs) {
final var buff = buffs.get(output);
if(buff != null) {
buff.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,21 @@ protected final void invokeTimedExclusively(final long startTimeNanos) {
return;
}

final List<T> items = input.getAll();
final var items = input.getAll();
if(items != null) {
n = items.size();
if(n > 0) {
if(n == 1) {
final T item = items.get(0);
final var item = items.get(0);
if(!output.put(item)) {
deferredItems.add(item);
}
} else {
final int m = output.put(items, 0, Math.min(n, batchSize));
final var m = output.put(items, 0, Math.min(n, batchSize));
if(m < n) {
// not all items was transferred w/o blocking
// defer the remaining items for a future try
for(final T item : items.subList(m, n)) {
for(final var item : items.subList(m, n)) {
deferredItems.add(item);
}
}
Expand All @@ -98,7 +98,7 @@ protected final void invokeTimedExclusively(final long startTimeNanos) {
LOG.log(Level.WARNING, "Failed to close self after EOF", ee);
}
} catch(final RemoteException e) {
final Throwable cause = e.getCause();
final var cause = e.getCause();
if(cause instanceof EOFException) {
try {
close();
Expand Down
Loading

0 comments on commit cdba0d5

Please sign in to comment.