From f3e816a82d6f85cb89331d325b1293c4db0c64f2 Mon Sep 17 00:00:00 2001 From: ghostbuster91 Date: Tue, 15 Mar 2022 17:58:48 +0100 Subject: [PATCH] Add evalScan1 operator --- core/shared/src/main/scala/fs2/Stream.scala | 23 +++++++++++++++++++ .../scala/fs2/StreamCombinatorsSuite.scala | 13 +++++++++++ 2 files changed, 36 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7805d864a6..69ce49a4c9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1037,6 +1037,29 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, (Pull.output1(z) >> go(z, this)).stream } + /** Like `[[Stream#scan1]]`, but accepts a function returning an `F[_]`. + * + * @example {{{ + * scala> import cats.effect.SyncIO + * scala> Stream(1,2,3,4).covary[SyncIO].evalScan1((acc,i) => SyncIO(acc + i)).compile.toVector.unsafeRunSync() + * res0: Vector[Int] = Vector(1, 3, 6, 10) + * }}} + */ + def evalScan1[F2[x] >: F[x], O2 >: O](f: (O2, O2) => F2[O2]): fs2.Stream[F2, O2] = { + def go(z: O2, s: fs2.Stream[F2, O]): Pull[F2, O2, Unit] = + s.pull.uncons1.flatMap { + case Some((hd, tl)) => + Pull.eval(f(z, hd)).flatMap(o => Pull.output1(o) >> go(o, tl)) + case None => Pull.done + } + this.pull.uncons.flatMap { + case None => Pull.done + case Some((hd, tl)) => + val (pre, post) = hd.splitAt(1) + Pull.output(pre) >> go(pre(0), tl.cons(post)) + }.stream + } + /** Like `observe` but observes with a function `O => F[O2]` instead of a pipe. * Not as powerful as `observe` since not all pipes can be represented by `O => F[O2]`, but much faster. * Alias for `evalMap(o => f(o).as(o))`. diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 39cd4de450..469337c651 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -480,6 +480,19 @@ class StreamCombinatorsSuite extends Fs2Suite { } } + test("evalScan1") { + forAllF { (s: Stream[Pure, Int]) => + val v = s.toVector + val f = (a: Int, b: Int) => a + b + val g = (a: Int, b: Int) => IO.pure(a + b) + s.covary[IO] + .evalScan1(g) + .assertEmits( + v.headOption.fold(Vector.empty[Int])(h => v.drop(1).scanLeft(h)(f)).toList + ) + } + } + test("every".flaky) { type BD = (Boolean, FiniteDuration) def durationSinceLastTrue[F[_]]: Pipe[F, BD, BD] = {