From 2acd64f13e22dbdba7128623319714574c8d0879 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Wed, 18 Sep 2024 12:01:13 +0100 Subject: [PATCH] Allow scan over value-less sequences While not obviously useful, it becomes useful iff the delegate have side-effects. --- source/concurrency/sequence.d | 24 ++++++++++++++++++------ tests/ut/concurrency/sequence.d | 11 ++++++++++- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/source/concurrency/sequence.d b/source/concurrency/sequence.d index 8fb9ed7..c60be93 100644 --- a/source/concurrency/sequence.d +++ b/source/concurrency/sequence.d @@ -321,9 +321,14 @@ auto nextTransform(Sequence, NextTransformer)(Sequence s, NextTransformer t) { } struct SequenceNextTransform(Sequence, NextTransformer) { + import concurrency.sender : VoidSender; import concurrency : just; alias Value = void; - alias Element = typeof(t.setNext(just(Sequence.Element.init))).Value; + static if (is(Sequence.Element == void)) { + alias Element = typeof(t.setNext(VoidSender())).Value; + } else { + alias Element = typeof(t.setNext(just(Sequence.Element.init))).Value; + } Sequence s; NextTransformer t; auto connect(Receiver)(return Receiver receiver) @safe return scope { @@ -506,7 +511,7 @@ struct SequenceTakeReceiver(Receiver) { return Result!(Sender.Value)(Cancelled()); else { state.n--; - return Result!(Sender.Value)(); + return Result!(Sender.Value)(Completed()); } })); } else { @@ -923,10 +928,17 @@ struct ScanSequenceTransformer(Fun, Seed) { Seed seed; auto setNext(Sender)(Sender sender) { import concurrency.operations : then; - return sender.then((Sender.Value value) @safe shared { - seed = fun(value, seed); - return seed; - }); + static if (is(Sender.Value == void)) { + return sender.then(() @safe shared { + seed = fun(seed); + return seed; + }); + } else { + return sender.then((Sender.Value value) @safe shared { + seed = fun(value, seed); + return seed; + }); + } } } diff --git a/tests/ut/concurrency/sequence.d b/tests/ut/concurrency/sequence.d index 0c25aee..a9721ca 100644 --- a/tests/ut/concurrency/sequence.d +++ b/tests/ut/concurrency/sequence.d @@ -35,6 +35,8 @@ import unit_threaded; @("take") @safe unittest { [1,2,3,4].sequence.take(3).toList().syncWait.value.should == [1,2,3]; + [1,2,3,4].sequence.take(3).transform((int i) => i*2).toList().syncWait.value.should == [2,4,6]; + [1,2,3,4].sequence.transform((int i) => i*2).take(3).toList().syncWait.value.should == [2,4,6]; } @("deferSequence.function") @@ -135,11 +137,18 @@ import unit_threaded; just([1,2].sequence).flatten.nextTransform(Transformer()).toList.syncWait.value.should == [1,2]; } -@("scan") +@("scan.value") @safe unittest { [1,1,1,1].sequence.scan((int i, int acc) => acc + i, 0).toList().syncWait.value.should == [1,2,3,4]; } +@("scan.void") +@safe unittest { + import core.time : msecs; + interval(1.msecs, false).scan((int acc) => acc + 1, 0).take(4).toList.syncWait.value.should == [1,2,3,4]; + interval(1.msecs, false).take(4).scan((int acc) => acc + 1, 0).toList.syncWait.value.should == [1,2,3,4]; +} + @("iotaSequence.basic") @safe unittest { iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9];