Skip to content

Commit

Permalink
fix sink impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo Conforti committed Oct 17, 2024
1 parent 1e63c06 commit 6876aea
Showing 1 changed file with 2 additions and 12 deletions.
14 changes: 2 additions & 12 deletions src/convey/Sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import * as Chunk from "effect/Chunk";
import * as Console from "effect/Console";
import * as Effect from "effect/Effect";
import * as Function from "effect/Function";
import * as Predicate from "effect/Predicate";
import * as Scope from "effect/Scope";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import * as Tuple from "effect/Tuple";

import { JSONMessage } from "../generated/JSONMessage.generated.js";

Expand Down Expand Up @@ -52,14 +52,4 @@ export const followProgressSink = Sink.forEach<JSONMessage, void, never, never>(
export const followProgressInConsole = <E1, R1>(
stream: Stream.Stream<JSONMessage, E1, R1>
): Effect.Effect<Chunk.Chunk<JSONMessage>, E1, Exclude<R1, Scope.Scope>> =>
Effect.gen(function* () {
const firstStream = stream;
const secondStream = yield* Stream.broadcastDynamic(stream, 16);
const effects = Tuple.make(
waitForProgressToComplete(firstStream),
Stream.run(secondStream, followProgressSink)
);
return yield* Effect.all(effects, { concurrency: 2 });
})
.pipe(Effect.scoped)
.pipe(Effect.map(Tuple.getFirst));
Function.pipe(stream, Stream.tapSink(followProgressSink), waitForProgressToComplete);

0 comments on commit 6876aea

Please sign in to comment.