Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix classpath hashing #1832

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 165 additions & 221 deletions backend/src/main/scala/bloop/io/ClasspathHasher.scala

Large diffs are not rendered by default.

61 changes: 61 additions & 0 deletions backend/src/main/scala/bloop/task/ParSequenceN.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package bloop.task

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.Promise

/**
* Implementation is based on https://github.com/monix/monix/blob/2faa2cf7425ab0b88ea57b1ea193bce16613f42a/monix-eval/shared/src/main/scala/monix/eval/internal/TaskParSequenceN.scala
*/
private[task] object ParSequenceN {
def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[Vector[A]] = {
if (in.isEmpty) {
Task.now(Vector.empty)
} else {
// val isCancelled = new AtomicBoolean(false)
Task.defer {
Comment on lines +11 to +16
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the parSequenceN implementation was fragile: one long-running task could block the other tasks in its chunk from progressing:

val chunks = in.grouped(n).toList.map(group => Task.parSequence(group))
Task.sequence(chunks).map(_.flatten)

I ported the Monix implementation of parSequenceN, which starts N workers that each poll for the next job as soon as they finish their current task.

// Shared work queue of (promise, task) pairs: each task's result is delivered through its own promise.
val queue = new java.util.concurrent.ConcurrentLinkedQueue[(Promise[A], Task[A])]()
// NOTE(review): `pairs` is traversed here and again when results are collected, so this
// presumably assumes `in` is a strict collection (a lazy view could build fresh promises
// on the second traversal) — confirm.
val pairs = in.map(t => (Promise[A](), t))
pairs.foreach(queue.add)
// Only ever completed as a failure (via tryFailure) by a task that fails.
val errorPromise = Promise[Throwable]()
// Set to true once a worker polls the queue and finds it empty.
val workDone = new AtomicBoolean(false)

// One unit of work for a worker: take the next (promise, task) pair off the shared queue
// and run it. The trailing map discards the intermediate result so the job is a Task[Unit].
val singleJob: Task[Unit] = Task
.defer {
queue.poll() match {
// poll() returns null when the queue is empty: record that no work is left.
case null =>
Task(workDone.set(true))
case (p, t) =>
// A successful value completes this task's own promise; a failure is published on the
// shared errorPromise. The try* variants return false rather than throwing when the
// promise has already been completed.
t.transform(
value => p.trySuccess(value),
error => errorPromise.tryFailure(error)
)
}
}
.map(_ => ())

// Worker loop: run singleJob repeatedly until some worker has flagged the queue as empty.
// Declared lazy because the definition refers to itself; the self-reference sits inside
// Task.defer and flatMap, so it is only followed after the previous job has completed.
lazy val thunkOfWork: Task[Unit] = Task.defer {
if (workDone.get()) Task.unit
else {
singleJob.flatMap(_ => thunkOfWork)
}
}

// Run n copies of the worker loop in parallel; all of them drain the same shared queue.
// NOTE(review): for n <= 0, List.fill produces no workers, so nothing would drain the queue
// or complete the per-task promises — presumably callers always pass n > 0; confirm.
val workers = Task.parSequence {
List.fill(n)(thunkOfWork)
}

// Race the shared error future against completion of all workers.
Task.chooseFirstOf(Task.fromFuture(errorPromise.future), workers).flatMap {
case Left((err, fb)) =>
// NOTE(review): errorPromise is only ever completed through tryFailure, so its future
// fails instead of yielding a Throwable value; presumably that failure surfaces through
// chooseFirstOf itself and this branch is rarely or never taken — confirm.
fb.cancel()
Task.raiseError(err)
case Right((fa, _)) =>
// Workers finished first: stop waiting on the error future, then read each task's result
// from its promise, in the iteration order of `pairs` (i.e. of the input).
fa.cancel()
val values = pairs.unzip._1.toVector.map(p => Task.fromFuture(p.future))
Task.sequence(values)
}
}
}

}
}
16 changes: 10 additions & 6 deletions backend/src/main/scala/bloop/task/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ sealed trait Task[+A] { self =>
/** Replaces this task's result with `b`; `b` is by-name and is evaluated inside the mapping function. */
def as[B](b: => B): Task[B] = self.map { _ => b }

/** Maps this task's result to `Unit`, discarding the produced value. */
def void: Task[Unit] = self.map { _ => () }

/** Replaces this task's result with the unit value, by delegating to `as`. */
@inline def unit(): Task[Unit] = self.as(())

def timeoutTo[B >: A](duration: FiniteDuration, backup: Task[B]): Task[B] = {
Task
.chooseFirstOf(
Expand All @@ -168,9 +173,10 @@ sealed trait Task[+A] { self =>
)
.flatMap {
case Left((a, _)) =>
// there is no need to cancel fb - it's just sleeping
Task.now(a)
case Right((a, _)) =>
a.cancel()
case Right((fa, _)) =>
fa.cancel()
backup
}
}
Expand Down Expand Up @@ -483,10 +489,8 @@ object Task {
}
}

def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[List[A]] = {
val chunks = in.grouped(n).toList.map(group => Task.parSequence(group))
Task.sequence(chunks).map(_.flatten)
}
/**
 * Runs the tasks in `in` using `n` parallel workers and collects their results into a `Vector`.
 * The work is delegated to `ParSequenceN.parSequenceN`.
 */
def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[Vector[A]] = {
  ParSequenceN.parSequenceN(n)(in)
}

/** Lifts a `Future` into a `Task` by wrapping the Monix task created from it, paired with an empty list. */
def fromFuture[A](f: Future[A]): Task[A] = {
  val underlying = MonixTask.fromFuture(f)
  Wrap(underlying, List.empty)
}
Expand Down
180 changes: 180 additions & 0 deletions backend/src/test/scala/bloop/task/ParSequenceNSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package bloop.task

import scala.concurrent.duration._

import org.junit.Assert
import org.junit.Test
import monix.execution.schedulers.TestScheduler
import java.util.concurrent.atomic.AtomicLong
import scala.util.Success
import scala.util.Failure
import scala.concurrent.Await
import scala.concurrent.Promise

/**
* Test cases are from https://github.com/monix/monix/blob/2faa2cf7425ab0b88ea57b1ea193bce16613f42a/monix-eval/shared/src/test/scala/monix/eval/TaskParSequenceNSuite.scala
*/
class ParSequenceNSpec {

  /** JUnit expects the expected value first; this helper lets the tests pass the actual value first. */
  private def assertEquals[A](actual: A, expected: A): Unit =
    Assert.assertEquals(expected, actual)

  @Test
  def empty: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val running = Task.parSequenceN(1)(Vector.empty).runAsync
    // a single scheduler step should be enough for the empty input to complete
    scheduler.tickOne()
    assertEquals(running.value, Some(Success(Vector.empty)))
  }

  @Test
  def Task_parSequenceN_should_execute_in_parallel_bounded_by_parallelism: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()

    val started = new AtomicLong(0)
    // each task bumps the counter when it starts and then sleeps for two seconds
    val countThenSleep = Task(started.incrementAndGet()).flatMap(_ => Task.sleep(2.seconds))
    val hundredTasks = List.fill(100)(countThenSleep)

    Task.parSequenceN(5)(hundredTasks).runAsync

    // with a parallelism of 5, five more tasks start every two simulated seconds
    scheduler.tick()
    assertEquals(started.get(), 5)
    scheduler.tick(2.seconds)
    assertEquals(started.get(), 10)
    scheduler.tick(4.seconds)
    assertEquals(started.get(), 20)
    scheduler.tick(34.seconds)
    assertEquals(started.get(), 100)
  }

  @Test
  def Task_parSequenceN_should_return_result_in_order: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val tasks = (1 until 10).toList.map(i => Task.eval(i))
    val running = Task.parSequenceN(2)(tasks).runAsync

    scheduler.tick()
    assertEquals(running.value, Some(Success(List(1, 2, 3, 4, 5, 6, 7, 8, 9))))
  }

  @Test
  def Task_parSequenceN_should_return_empty_list: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val running = Task.parSequenceN(2)(List.empty).runAsync

    scheduler.tick()
    assertEquals(running.value, Some(Success(List.empty)))
  }

  @Test
  def Task_parSequenceN_should_handle_single_elem: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    // fewer tasks (4) than the requested parallelism (10)
    val tasks = (1 until 5).toList.map(i => Task.eval(i))
    val running = Task.parSequenceN(10)(tasks).runAsync

    scheduler.tick()
    assertEquals(running.value, Some(Success(List(1, 2, 3, 4))))
  }

  @Test
  def Task_parSequenceN_should_on_error_when_one_elem_fail: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val boom = new Exception("dummy")
    val tasks = Seq(
      Task(3).delayExecution(3.seconds),
      Task(2).delayExecution(1.second),
      Task(throw boom).delayExecution(1.seconds),
      Task(3).delayExecution(5.seconds)
    )

    val running = Task.parSequenceN(2)(tasks).runAsync

    assertEquals(running.value, None)
    scheduler.tick(1.seconds)
    assertEquals(running.value, None)
    scheduler.tick(2.seconds)
    assertEquals(running.value, Some(Failure(boom)))
  }

  @Test
  def Task_parSequenceN_should_be_stack_safe: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val count: Int = 200000
    val tasks = (0 until count).map(_ => Task.now(1))
    val summed = Task.parSequenceN(count)(tasks).map(_.sum)
    val running = summed.runAsync
    scheduler.tick()
    assertEquals(running.value, Some(Success(count)))
  }

  @Test
  def Task_parSequenceN_runAsync_multiple_times: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val state = new AtomicLong(0)
    // the memoized source should bump the counter once; every mapped evaluation bumps it again
    val memoized = Task { state.incrementAndGet(); 3 }.memoize
    val plusOne = memoized.map { x =>
      state.incrementAndGet()
      x + 1
    }
    val combined = Task.parSequenceN(2)(List(plusOne, plusOne, plusOne))

    val firstRun = combined.runAsync
    scheduler.tick()
    assertEquals(firstRun.value, Some(Success(List(4, 4, 4))))
    assertEquals(state.get(), 1 + 3)

    val secondRun = combined.runAsync
    scheduler.tick()
    assertEquals(secondRun.value, Some(Success(List(4, 4, 4))))
    assertEquals(state.get(), 1 + 3 + 3)
  }

  /**
   * Cancellation semantics are the major difference between Monix 2 and 3.
   * In Monix 2 cancellation is a mere signal that can be ignored; in Monix 3 it is a hard stop.
   * This test checks that `Task.parSequenceN` behaves according to the Monix 2 semantics.
   */
  @Test
  def Task_parSequenceN_should_NOT_be_canceled: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val num = new AtomicLong(0)
    val canceled = Promise[Boolean]()
    val tasks = Seq(
      Task.unit
        .delayExecution(4.seconds)
        .doOnCancel(Task.eval(canceled.success(true)).void),
      Task(num.compareAndSet(0, 10)).delayExecution(1.second)
    )
    val running = Task.parSequenceN(1)(tasks).runAsync

    scheduler.tick(1.second)
    running.cancel()
    scheduler.tick(2.seconds)

    // doOnCancel uses the global scheduler, so we need to wait for it with Await rather than tick
    Await.ready(canceled.future, 5.seconds)

    scheduler.tick(1.day)
    assertEquals(num.get(), 10)
  }

  @Test
  def Task_parSequenceN_workers_dont_wait_for_each_other: Unit = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val tasks = Seq(
      Task.sleep(4.seconds).map(_ => 1),
      Task.sleep(1.second).map(_ => 2),
      Task.sleep(2.seconds).map(_ => 3)
    )
    val running = Task.parSequenceN(2)(tasks).runAsync

    scheduler.tick(2.seconds)
    assertEquals(running.value, None)

    scheduler.tick(2.seconds)
    Await.ready(running, 1.second)
    assertEquals(running.value, Some(Success(Vector(1, 2, 3))))
  }

}
4 changes: 2 additions & 2 deletions backend/src/test/scala/bloop/task/TaskSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ class TaskSpec {

val (one, two) = Await.result(future, 1.second)
assertEquals(one, two)

}

@Test
Expand All @@ -190,7 +189,8 @@ class TaskSpec {
val t2 = Task(ref.set(false))
val withTimeout = t1.timeoutTo(1.second, t2)

Await.result((withTimeout *> Task.sleep(2.seconds)).runAsync, 3.second)
// await just a bit longer than timeout value to see if t2 was cancelled (not executed)
Await.result((withTimeout *> Task.sleep(1.seconds)).runAsync, 3.second)
assertEquals(true, ref.get())
}

Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ lazy val frontend: Project = project
Dependencies.caseApp,
Dependencies.scalaDebugAdapter,
Dependencies.bloopConfig,
Dependencies.logback
Dependencies.logback,
Dependencies.oslib % Test
),
// needed for tests and to be automatically updated
Test / libraryDependencies += Dependencies.semanticdb intransitive (),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.concurrent.Await
Expand Down Expand Up @@ -159,7 +160,10 @@ object ResultsCache {
logger: Logger
): ResultsCache = {
val handle = loadAsync(build, cwd, cleanOrphanedInternalDirs, logger)
Await.result(handle.runAsync(ExecutionContext.ioScheduler), Duration.Inf)
Await.result(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is causing timeouts for DAP tests, any reason for the change?

handle.runAsync(ExecutionContext.ioScheduler),
Duration.apply(30, TimeUnit.SECONDS)
)
}

def loadAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ object CompileBundle {
import bloop.engine.ExecutionContext.ioScheduler
import compileDependenciesData.dependencyClasspath
val out = options.ngout
val classpathHashesTask = bloop.io.ClasspathHasher
.hash(dependencyClasspath, 10, cancelCompilation, ioScheduler, logger, tracer, out)
val classpathHashesTask = bloop.io.ClasspathHasher.global
.hash(dependencyClasspath, 10, cancelCompilation, logger, tracer, out)
.executeOn(ioScheduler)

val sourceHashesTask = tracer.traceTaskVerbose("discovering and hashing sources") { _ =>
Expand Down
1 change: 1 addition & 0 deletions frontend/src/test/scala/bloop/bsp/BspBaseSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ abstract class BspBaseSuite extends BaseSuite with BspClientTest {
// https://github.com/scalacenter/bloop/issues/281
super.ignore(name, "DISABLED")(fun)
} else {
pprint.log(name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

super.test(name)(fun)
}
}
Expand Down
Loading
Loading