Skip to content

Commit

Permalink
Add slide sequence operator
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 18, 2024
1 parent 705a319 commit fc3f728
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
91 changes: 91 additions & 0 deletions source/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,97 @@ struct SampleSequenceOp(BaseSequence, TriggerSequence, Receiver) {
}
}

/// slides a window over a stream, emitting all items in the window as an array. The array is reused so you must duplicate if you want to access it beyond the stream.
auto slide(Sequence)(Sequence sequence, long window, long step = 1) {
static assert(!is(Sequence.Element == void), "Sequence passed to slide must produce elements.");

import std.exception : enforce;
enforce(window > 0, "window must be greater than 0.");
enforce(step > 0, "step must be greated than 0.");

return SlideSequence!(Sequence)(sequence, window, step);
}

struct SlideSequence(Sequence) {
alias Value = void;
alias Element = Sequence.Element[];

Sequence sequence;

long window, step;

auto connect(Receiver)(return Receiver receiver) @safe return scope {
auto op = SlideSequenceOp!(Sequence, Receiver)(sequence, window, step, receiver);
return op;
}
}

struct SlideState(Element) {
long window, step, skip;
Element[] arr;
this(long window, long step) {
this.window = window;
this.step = step;
arr.reserve(window);
}
}

struct SlideSequenceOp(Sequence, Receiver) {
import concurrency.sender : OpType;
alias Element = Sequence.Element;

alias Op = OpType!(FilterMapSequence!(Sequence, Nullable!(Element[]) delegate(Sequence.Element) @safe pure nothrow), Receiver);

long step, pos;
Element[] arr;
Op op;

@disable this(ref return scope typeof(this) rhs);
@disable this(this);

@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;

this(Sequence sequence, long window, long step, return Receiver receiver) @trusted return scope {
arr.length = window;
this.step = step;
op = sequence.filterMap(&this.onNext).connect(receiver);
}

void start() scope {
op.start();
}

Nullable!(Element[]) onNext(Sequence.Element element) @safe {
import std.algorithm : moveAll;
if (pos < 0) {
pos++;
return Nullable!(Element[]).init;
}

if (pos+1 > arr.length) {
if (step < arr.length) {
moveAll(arr[step .. $], arr[0 .. $ - step]);
pos -= step;
} else {
pos = (cast(long)arr.length) - step;
if (pos < 0) {
pos++;
return Nullable!(Element[]).init;
}
}
}

arr[pos] = element;
pos++;

if (pos == arr.length) {
return Nullable!(Element[])(arr.dup);
} else {
return Nullable!(Element[]).init;
}
}
}

// cron - create a sequence like interval but using cron spec

Expand Down
34 changes: 34 additions & 0 deletions tests/ut/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,37 @@ import unit_threaded;
driver,
).syncWait.value.should == [1,3,5,7];
}

@("slide.basic")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,1).toList().syncWait.value.should == [
[1,2,3],
[2,3,4],
[3,4,5],
[4,5,6]
];
}

@("slide.step.skip")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,2).toList().syncWait.value.should == [
[1,2,3],
[3,4,5]
];
}

@("slide.step.full")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,3).toList().syncWait.value.should == [
[1,2,3],
[4,5,6]
];
}

@("slide.step.negative")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(2,3).toList().syncWait.value.should == [
[1,2],
[4,5]
];
}

0 comments on commit fc3f728

Please sign in to comment.