diff --git a/source/concurrency/sequence.d b/source/concurrency/sequence.d index 3ab9ba6..c811021 100644 --- a/source/concurrency/sequence.d +++ b/source/concurrency/sequence.d @@ -1197,6 +1197,97 @@ struct SampleSequenceOp(BaseSequence, TriggerSequence, Receiver) { } } +/// slides a window over a stream, emitting all items in the window as an array. The array is reused so you must duplicate if you want to access it beyond the stream. +auto slide(Sequence)(Sequence sequence, long window, long step = 1) { + static assert(!is(Sequence.Element == void), "Sequence passed to slide must produce elements."); + + import std.exception : enforce; + enforce(window > 0, "window must be greater than 0."); + enforce(step > 0, "step must be greated than 0."); + + return SlideSequence!(Sequence)(sequence, window, step); +} + +struct SlideSequence(Sequence) { + alias Value = void; + alias Element = Sequence.Element[]; + + Sequence sequence; + + long window, step; + + auto connect(Receiver)(return Receiver receiver) @safe return scope { + auto op = SlideSequenceOp!(Sequence, Receiver)(sequence, window, step, receiver); + return op; + } +} + +struct SlideState(Element) { + long window, step, skip; + Element[] arr; + this(long window, long step) { + this.window = window; + this.step = step; + arr.reserve(window); + } +} + +struct SlideSequenceOp(Sequence, Receiver) { + import concurrency.sender : OpType; + alias Element = Sequence.Element; + + alias Op = OpType!(FilterMapSequence!(Sequence, Nullable!(Element[]) delegate(Sequence.Element) @safe pure nothrow), Receiver); + + long step, pos; + Element[] arr; + Op op; + + @disable this(ref return scope typeof(this) rhs); + @disable this(this); + + @disable void opAssign(typeof(this) rhs) nothrow @safe @nogc; + @disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc; + + this(Sequence sequence, long window, long step, return Receiver receiver) @trusted return scope { + arr.length = window; + this.step = step; + op = sequence.filterMap(&this.onNext).connect(receiver); + } + + void start() scope { + op.start(); + } + + Nullable!(Element[]) onNext(Sequence.Element element) @safe { + import std.algorithm : moveAll; + if (pos < 0) { + pos++; + return Nullable!(Element[]).init; + } + + if (pos+1 > arr.length) { + if (step < arr.length) { + moveAll(arr[step .. $], arr[0 .. $ - step]); + pos -= step; + } else { + pos = (cast(long)arr.length) - step; + if (pos < 0) { + pos++; + return Nullable!(Element[]).init; + } + } + } + + arr[pos] = element; + pos++; + + if (pos == arr.length) { + return Nullable!(Element[])(arr.dup); + } else { + return Nullable!(Element[]).init; + } + } +} // cron - create a sequence like interval but using cron spec diff --git a/tests/ut/concurrency/sequence.d b/tests/ut/concurrency/sequence.d index 3c7312f..e3a1480 100644 --- a/tests/ut/concurrency/sequence.d +++ b/tests/ut/concurrency/sequence.d @@ -197,3 +197,37 @@ import unit_threaded; driver, ).syncWait.value.should == [1,3,5,7]; } + +@("slide.basic") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,1).toList().syncWait.value.should == [ + [1,2,3], + [2,3,4], + [3,4,5], + [4,5,6] + ]; +} + +@("slide.step.skip") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,2).toList().syncWait.value.should == [ + [1,2,3], + [3,4,5] + ]; +} + +@("slide.step.full") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,3).toList().syncWait.value.should == [ + [1,2,3], + [4,5,6] + ]; +} + +@("slide.step.negative") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(2,3).toList().syncWait.value.should == [ + [1,2], + [4,5] + ]; +} \ No newline at end of file