diff --git a/build.gradle b/build.gradle index cf0d743..154c082 100644 --- a/build.gradle +++ b/build.gradle @@ -1,4 +1,4 @@ -description = "The library supporting the alternative concurrency model" +description = "The concurrency library which includes the coroutines functionality" buildscript { repositories { @@ -11,32 +11,62 @@ allprojects { apply plugin: "maven" apply plugin: "signing" group = "com.github.akurilov" - version = "1.1.5" + version = "2.0.0" } -tasks.withType(JavaCompile) { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 -} +ext.moduleName = "${group}.concurrent" repositories { mavenCentral() } dependencies { - compile("com.github.akurilov:java-commons:[1.4.7,)") + compile("com.github.akurilov:java-commons:[2.0.0,)") testCompile("junit:junit:4.12") } +compileJava { + sourceCompatibility = JavaVersion.VERSION_1_10 + targetCompatibility = JavaVersion.VERSION_1_10 + inputs.property("moduleName", moduleName) + doFirst { + options.compilerArgs = [ + "--module-path", classpath.asPath, + ] + classpath = files() + } +} + +compileTestJava { + sourceCompatibility = JavaVersion.VERSION_1_10 + targetCompatibility = JavaVersion.VERSION_1_10 + inputs.property("moduleName", moduleName) + doFirst { + options.compilerArgs = [ + "--module-path", classpath.asPath, + "--add-modules", "ALL-MODULE-PATH", + "--add-reads", "${moduleName}.test=junit", + "--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath, + ] + classpath = files() + } +} + jar { + inputs.property("moduleName", moduleName) manifest { attributes ( + "Automatic-Module-Name": moduleName, "Implementation-Version": version, "Implementation-Title": rootProject.name, ) } } +javadoc { + options.addStringOption("-module-path", classpath.asPath) +} + task sourcesJar(type: Jar, dependsOn: classes) { classifier = "sources" from sourceSets.main.allSource @@ -137,7 +167,7 @@ uploadArchives { } task wrapper(type: Wrapper) { - gradleVersion = "3.5" + gradleVersion = "4.6" } task printVersion { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 027bf00..f6b961f 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 6492533..bf3de21 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Wed Aug 23 01:05:25 MSK 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-bin.zip diff --git a/gradlew b/gradlew index 4453cce..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -155,7 +155,7 @@ if $cygwin ; then fi # Escape application args -save ( ) { +save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } diff --git a/src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java b/src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java index f1f0040..2e70951 100644 --- a/src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java +++ b/src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java @@ -114,8 +114,8 @@ public final AsyncRunnableBase await() @Override public boolean await(final long timeout, final TimeUnit timeUnit) throws IllegalStateException, InterruptedException { - long t, timeOutMilliSec = timeUnit.toMillis(timeout); - t = System.currentTimeMillis(); + final var timeOutMilliSec = timeUnit.toMillis(timeout); + final var t = System.currentTimeMillis(); while(isStarted() || isShutdown()) { if(System.currentTimeMillis() - t >= timeOutMilliSec) { return false; diff --git a/src/main/java/com/github/akurilov/concurrent/ContextAwareThreadFactory.java b/src/main/java/com/github/akurilov/concurrent/ContextAwareThreadFactory.java index 8406f07..a2b52b1 100644 --- a/src/main/java/com/github/akurilov/concurrent/ContextAwareThreadFactory.java +++ b/src/main/java/com/github/akurilov/concurrent/ContextAwareThreadFactory.java @@ -22,14 +22,17 @@ public class ContextAwareThreadFactory protected final boolean daemonFlag; protected final Map threadContext; - public ContextAwareThreadFactory(final String threadNamePrefix, final Map threadContext) { + public ContextAwareThreadFactory( + final String threadNamePrefix, final Map threadContext + ) { this.threadNamePrefix = threadNamePrefix; this.daemonFlag = false; this.threadContext = threadContext; } public ContextAwareThreadFactory( - final String threadNamePrefix, final boolean daemonFlag, final Map threadContext + final String threadNamePrefix, final boolean daemonFlag, + final Map threadContext ) { this.threadNamePrefix = threadNamePrefix; this.daemonFlag = daemonFlag; @@ -55,7 +58,8 @@ protected ContextAwareThread( @Override public Thread newThread(final Runnable task) { return new ContextAwareThread( - task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), daemonFlag, exceptionHandler, threadContext + task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), daemonFlag, + exceptionHandler, threadContext ); } diff --git a/src/main/java/com/github/akurilov/concurrent/RateThrottle.java b/src/main/java/com/github/akurilov/concurrent/RateThrottle.java index 7f12f25..15e907c 100644 --- a/src/main/java/com/github/akurilov/concurrent/RateThrottle.java +++ b/src/main/java/com/github/akurilov/concurrent/RateThrottle.java @@ -30,7 +30,7 @@ public RateThrottle(final double rateLimit) { public final boolean tryAcquire(final X item) { synchronized(this) { if(startTime > 0) { - final long periodCount = (nanoTime() - startTime) / periodNanos; + final var periodCount = (nanoTime() - startTime) / periodNanos; if(periodCount > acquiredCount) { acquiredCount ++; return true; @@ -49,7 +49,7 @@ public final boolean tryAcquire(final X item) { public final int tryAcquire(final X item, final int requiredCount) { synchronized(this) { if(startTime > 0) { - final int availableCount = (int) ( + final var availableCount = (int) ( (nanoTime() - startTime) / periodNanos - acquiredCount ); if(availableCount > requiredCount) { diff --git a/src/main/java/com/github/akurilov/concurrent/WeightThrottle.java b/src/main/java/com/github/akurilov/concurrent/WeightThrottle.java index ee1a038..172e40a 100644 --- a/src/main/java/com/github/akurilov/concurrent/WeightThrottle.java +++ b/src/main/java/com/github/akurilov/concurrent/WeightThrottle.java @@ -28,13 +28,13 @@ public WeightThrottle(final Int2IntMap weightMap) private void resetRemainingWeights() throws IllegalArgumentException { - for(final int key : weightKeys) { + for(final var key : weightKeys) { remainingWeightMap.put(key, weightMap.get(key)); } } private void ensureRemainingWeights() { - for(final int key : weightKeys) { + for(final var key : weightKeys) { if(remainingWeightMap.get(key) > 0) { return; } @@ -45,7 +45,7 @@ private void ensureRemainingWeights() { public final boolean tryAcquire(final int key) { synchronized(remainingWeightMap) { ensureRemainingWeights(); - final int remainingWeight = remainingWeightMap.get(key); + final var remainingWeight = remainingWeightMap.get(key); if(remainingWeight > 0) { remainingWeightMap.put(key, remainingWeight - 1); return true; @@ -61,7 +61,7 @@ public final int tryAcquire(final int key, final int times) { } synchronized(remainingWeightMap) { ensureRemainingWeights(); - final int remainingWeight = remainingWeightMap.get(key); + final var remainingWeight = remainingWeightMap.get(key); if(times > remainingWeight) { remainingWeightMap.put(key, 0); return remainingWeight; diff --git a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java index 3b38409..f44f290 100644 --- a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java +++ b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java @@ -29,7 +29,7 @@ protected void doStart() { */ @Override public final void invoke() { - long t = System.nanoTime(); + var t = System.nanoTime(); invokeTimed(t); } diff --git a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java index 089e7e2..251f847 100644 --- a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java +++ b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java @@ -31,16 +31,14 @@ public CoroutinesExecutor() { } public CoroutinesExecutor(final boolean backgroundFlag) { - final int svcThreadCount = Runtime.getRuntime().availableProcessors(); + final var svcThreadCount = Runtime.getRuntime().availableProcessors(); executor = new ThreadPoolExecutor( svcThreadCount, svcThreadCount, 0, TimeUnit.DAYS, new ArrayBlockingQueue<>(1), new ContextAwareThreadFactory("coroutine-processor-", true, null) ); this.backgroundFlag = backgroundFlag; - for(int i = 0; i < svcThreadCount; i ++) { - final CoroutinesExecutorTask svcWorkerTask = new CoroutinesExecutorTask( - coroutines, backgroundFlag - ); + for(var i = 0; i < svcThreadCount; i ++) { + final var svcWorkerTask = new CoroutinesExecutorTask(coroutines, backgroundFlag); executor.submit(svcWorkerTask); workers.add(svcWorkerTask); svcWorkerTask.start(); @@ -56,24 +54,22 @@ public void stop(final Coroutine coroutine) { } public void setThreadCount(final int threadCount) { - final int newThreadCount = threadCount > 0 ? + final var newThreadCount = threadCount > 0 ? threadCount : Runtime.getRuntime().availableProcessors(); - final int oldThreadCount = executor.getCorePoolSize(); + final var oldThreadCount = executor.getCorePoolSize(); if(newThreadCount != oldThreadCount) { executor.setCorePoolSize(newThreadCount); executor.setMaximumPoolSize(newThreadCount); if(newThreadCount > oldThreadCount) { - for(int i = oldThreadCount; i < newThreadCount; i ++) { - final CoroutinesExecutorTask execTask = new CoroutinesExecutorTask( - coroutines, backgroundFlag - ); + for(var i = oldThreadCount; i < newThreadCount; i ++) { + final var execTask = new CoroutinesExecutorTask(coroutines, backgroundFlag); executor.submit(execTask); workers.add(execTask); execTask.start(); } } else { // less, remove some active service worker tasks try { - for(int i = oldThreadCount - 1; i >= newThreadCount; i --) { + for(var i = oldThreadCount - 1; i >= newThreadCount; i --) { workers.remove(i).close(); } } catch (final Exception e) { diff --git a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java index a712f78..9a79617 100644 --- a/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java +++ b/src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java @@ -33,7 +33,7 @@ public final void run() { break; } } else { - for(final Coroutine nextCoroutine : coroutines) { + for(final var nextCoroutine : coroutines) { try { if(nextCoroutine.isStarted()) { nextCoroutine.invoke(); diff --git a/src/main/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutine.java b/src/main/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutine.java index 237bbed..234e712 100644 --- a/src/main/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutine.java +++ b/src/main/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutine.java @@ -41,7 +41,7 @@ public RoundRobinOutputCoroutine( this.outputsCount = outputs.size(); this.buffCapacity = buffCapacity; this.buffs = new HashMap<>(this.outputsCount); - for(int i = 0; i < this.outputsCount; i ++) { + for(var i = 0; i < this.outputsCount; i ++) { this.buffs.put(outputs.get(i), new OptLockArrayBuffer<>(buffCapacity)); } } @@ -60,7 +60,7 @@ public final boolean put(final T ioTask) if(isStopped()) { throw new EOFException(); } - final OptLockBuffer buff = selectBuff(); + final var buff = selectBuff(); if(buff != null && buff.tryLock()) { try { return buff.size() < buffCapacity && buff.add(ioTask); @@ -79,16 +79,16 @@ public final int put(final List srcBuff, final int from, final int to) throw new EOFException(); } OptLockBuffer buff; - final int n = to - from; + final var n = to - from; if(n > outputsCount) { - final int nPerOutput = n / outputsCount; - int nextFrom = from; - for(int i = 0; i < outputsCount; i ++) { + final var nPerOutput = n / outputsCount; + var nextFrom = from; + for(var i = 0; i < outputsCount; i ++) { buff = selectBuff(); if(buff != null && buff.tryLock()) { try { - final int m = Math.min(nPerOutput, buffCapacity - buff.size()); - for(final T item : srcBuff.subList(nextFrom, nextFrom + m)) { + final var m = Math.min(nPerOutput, buffCapacity - buff.size()); + for(final var item : srcBuff.subList(nextFrom, nextFrom + m)) { buff.add(item); } nextFrom += m; @@ -101,8 +101,8 @@ public final int put(final List srcBuff, final int from, final int to) buff = selectBuff(); if(buff != null && buff.tryLock()) { try { - final int m = Math.min(to - nextFrom, buffCapacity - buff.size()); - for(final T item : srcBuff.subList(nextFrom, nextFrom + m)) { + final var m = Math.min(to - nextFrom, buffCapacity - buff.size()); + for(final var item : srcBuff.subList(nextFrom, nextFrom + m)) { buff.add(item); } nextFrom += m; @@ -113,7 +113,7 @@ public final int put(final List srcBuff, final int from, final int to) } return nextFrom - from; } else { - for(int i = from; i < to; i ++) { + for(var i = from; i < to; i ++) { buff = selectBuff(); if(buff != null && buff.tryLock()) { try { @@ -162,7 +162,7 @@ protected final void invokeTimed(final long startTimeNanos) { } } catch(final EOFException | NoSuchObjectException | ConnectException ignored) { } catch(final RemoteException e) { - final Throwable cause = e.getCause(); + final var cause = e.getCause(); if(!(cause instanceof EOFException)) { LOG.log(Level.WARNING, "Invocation failure", e); } @@ -182,8 +182,8 @@ public final Input getInput() { @Override protected final void doClose() throws IOException { - for(final O output : outputs) { - final OptLockBuffer buff = buffs.get(output); + for(final var output : outputs) { + final var buff = buffs.get(output); if(buff != null) { buff.clear(); } diff --git a/src/main/java/com/github/akurilov/concurrent/coroutines/TransferCoroutine.java b/src/main/java/com/github/akurilov/concurrent/coroutines/TransferCoroutine.java index 0120b07..3bd8e12 100644 --- a/src/main/java/com/github/akurilov/concurrent/coroutines/TransferCoroutine.java +++ b/src/main/java/com/github/akurilov/concurrent/coroutines/TransferCoroutine.java @@ -68,21 +68,21 @@ protected final void invokeTimedExclusively(final long startTimeNanos) { return; } - final List items = input.getAll(); + final var items = input.getAll(); if(items != null) { n = items.size(); if(n > 0) { if(n == 1) { - final T item = items.get(0); + final var item = items.get(0); if(!output.put(item)) { deferredItems.add(item); } } else { - final int m = output.put(items, 0, Math.min(n, batchSize)); + final var m = output.put(items, 0, Math.min(n, batchSize)); if(m < n) { // not all items was transferred w/o blocking // defer the remaining items for a future try - for(final T item : items.subList(m, n)) { + for(final var item : items.subList(m, n)) { deferredItems.add(item); } } @@ -98,7 +98,7 @@ protected final void invokeTimedExclusively(final long startTimeNanos) { LOG.log(Level.WARNING, "Failed to close self after EOF", ee); } } catch(final RemoteException e) { - final Throwable cause = e.getCause(); + final var cause = e.getCause(); if(cause instanceof EOFException) { try { close(); diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 0000000..953853f --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,17 @@ +module com.github.akurilov.concurrent { + + requires com.github.akurilov.commons; + requires commons.collections4; + requires java.base; + requires java.logging; + requires java.rmi; + + exports com.github.akurilov.concurrent; + exports com.github.akurilov.concurrent.coroutines; + exports it.unimi.dsi.fastutil; + exports it.unimi.dsi.fastutil.booleans; + exports it.unimi.dsi.fastutil.bytes; + exports it.unimi.dsi.fastutil.ints; + exports it.unimi.dsi.fastutil.objects; + exports it.unimi.dsi.fastutil.shorts; +} \ No newline at end of file diff --git a/src/test/java/com/github/akurilov/concurrent/RateThrottleTest.java b/src/test/java/com/github/akurilov/concurrent/test/RateThrottleTest.java similarity index 98% rename from src/test/java/com/github/akurilov/concurrent/RateThrottleTest.java rename to src/test/java/com/github/akurilov/concurrent/test/RateThrottleTest.java index b7a0a72..bcae49a 100644 --- a/src/test/java/com/github/akurilov/concurrent/RateThrottleTest.java +++ b/src/test/java/com/github/akurilov/concurrent/test/RateThrottleTest.java @@ -1,5 +1,6 @@ -package com.github.akurilov.concurrent; +package com.github.akurilov.concurrent.test; +import com.github.akurilov.concurrent.RateThrottle; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/src/test/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutineTest.java b/src/test/java/com/github/akurilov/concurrent/test/RoundRobinOutputCoroutineTest.java similarity index 96% rename from src/test/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutineTest.java rename to src/test/java/com/github/akurilov/concurrent/test/RoundRobinOutputCoroutineTest.java index 040bd9f..8d8042c 100644 --- a/src/test/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutineTest.java +++ b/src/test/java/com/github/akurilov/concurrent/test/RoundRobinOutputCoroutineTest.java @@ -1,10 +1,10 @@ -package com.github.akurilov.concurrent.coroutines; +package com.github.akurilov.concurrent.test; import com.github.akurilov.commons.io.Input; import com.github.akurilov.commons.io.Output; import com.github.akurilov.concurrent.coroutines.CoroutinesExecutor; - +import com.github.akurilov.concurrent.coroutines.RoundRobinOutputCoroutine; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/src/test/java/com/github/akurilov/concurrent/WeightThrottleTest.java b/src/test/java/com/github/akurilov/concurrent/test/WeightThrottleTest.java similarity index 96% rename from src/test/java/com/github/akurilov/concurrent/WeightThrottleTest.java rename to src/test/java/com/github/akurilov/concurrent/test/WeightThrottleTest.java index 9bc9a99..e5b4b9b 100644 --- a/src/test/java/com/github/akurilov/concurrent/WeightThrottleTest.java +++ b/src/test/java/com/github/akurilov/concurrent/test/WeightThrottleTest.java @@ -1,5 +1,6 @@ -package com.github.akurilov.concurrent; +package com.github.akurilov.concurrent.test; +import com.github.akurilov.concurrent.WeightThrottle; import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;