Skip to content

Commit

Permalink
SkipLast
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 1, 2024
1 parent 43143e9 commit 7e49e76
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 0 deletions.
177 changes: 177 additions & 0 deletions src/R3/Operators/SkipLast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> SkipLast<T>(this Observable<T> source, int count)
{
if (count < 0) throw new ArgumentOutOfRangeException("count");
return new SkipLast<T>(source, count);
}

// TimeBased

public static Observable<T> SkipLast<T>(this Observable<T> source, TimeSpan duration)
{
return SkipLast(source, duration, ObservableSystem.DefaultTimeProvider);
}

public static Observable<T> SkipLast<T>(this Observable<T> source, TimeSpan duration, TimeProvider timeProvider)
{
return new SkipLastTime<T>(source, duration, timeProvider);
}

// SkipLastFrame

public static Observable<T> SkipLastFrame<T>(this Observable<T> source, int frameCount)
{
return SkipLastFrame(source, frameCount, ObservableSystem.DefaultFrameProvider);
}

public static Observable<T> SkipLastFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new SkipLastFrame<T>(source, frameCount, frameProvider);
}
}

internal sealed class SkipLast<T>(Observable<T> source, int count) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipLast(observer, count));
}

sealed class _SkipLast(Observer<T> observer, int count) : Observer<T>, IDisposable
{
Queue<T> queue = new Queue<T>(count);

protected override void OnNextCore(T value)
{
queue.Enqueue(value);
if (queue.Count > count)
{
observer.OnNext(queue.Dequeue());
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
queue.Clear();
}
}
}

internal sealed class SkipLastTime<T>(Observable<T> source, TimeSpan duration, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipLastTime(observer, duration, timeProvider));
}

sealed class _SkipLastTime : Observer<T>, IDisposable
{
readonly Observer<T> observer;
readonly Queue<(long timestamp, T value)> queue = new();
readonly TimeSpan duration;
readonly TimeProvider timeProvider;

public _SkipLastTime(Observer<T> observer, TimeSpan duration, TimeProvider timeProvider)
{
this.observer = observer;
this.timeProvider = timeProvider;
this.duration = duration;
}

protected override void OnNextCore(T value)
{
var current = timeProvider.GetTimestamp();
queue.Enqueue((current, value));
while (queue.Count > 0 && timeProvider.GetElapsedTime(queue.Peek().timestamp, current) >= duration)
{
observer.OnNext(queue.Dequeue().value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
var current = timeProvider.GetTimestamp();
while (queue.Count > 0 && timeProvider.GetElapsedTime(queue.Peek().timestamp, current) >= duration)
{
observer.OnNext(queue.Dequeue().value);
}
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
queue.Clear();
}
}
}

internal sealed class SkipLastFrame<T>(Observable<T> source, int frameCount, FrameProvider frameProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipLastFrame(observer, frameCount, frameProvider));
}

sealed class _SkipLastFrame : Observer<T>, IDisposable
{
readonly Observer<T> observer;
readonly Queue<(long frameCount, T value)> queue = new();
readonly int frameCount;
readonly FrameProvider frameProvider;

public _SkipLastFrame(Observer<T> observer, int frameCount, FrameProvider frameProvider)
{
this.observer = observer;
this.frameCount = frameCount;
this.frameProvider = frameProvider;
}

protected override void OnNextCore(T value)
{
var current = frameProvider.GetFrameCount();
queue.Enqueue((current, value));
while (queue.Count > 0 && current - queue.Peek().frameCount >= frameCount)
{
observer.OnNext(queue.Dequeue().value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
var current = frameProvider.GetFrameCount();
while (queue.Count > 0 && current - queue.Peek().frameCount >= frameCount)
{
observer.OnNext(queue.Dequeue().value);
}
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
queue.Clear();
}
}
}
8 changes: 8 additions & 0 deletions src/R3/Operators/TakeLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,13 @@ void Trim(long currentFrameCount)
queue.Dequeue();
}
}

protected override void DisposeCore()
{
lock (gate)
{
queue.Clear();
}
}
}
}
70 changes: 70 additions & 0 deletions tests/R3.Tests/OperatorTests/SkipLastTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Reactive.Linq;

namespace R3.Tests.OperatorTests;

public class SkipLastTest
{
[Fact]
public async Task Skip()
{
var xs = await Observable.Range(1, 10).SkipLast(3).ToArrayAsync();
xs.Should().Equal([1, 2, 3, 4, 5, 6, 7]);
}

[Fact]
public void SkipTime()
{
var timeProvider = new FakeTimeProvider();

var publisher = new Subject<int>();
var list = publisher.SkipLast(TimeSpan.FromSeconds(3), timeProvider).ToLiveList();

publisher.OnNext(1);
publisher.OnNext(10);
list.AssertEqual([]);

timeProvider.Advance(TimeSpan.FromSeconds(2));
publisher.OnNext(100);
publisher.OnNext(1000);

timeProvider.Advance(TimeSpan.FromSeconds(1));
publisher.OnNext(2);
publisher.OnNext(20);

publisher.OnCompleted();

list.AssertEqual([1, 10]);
list.AssertIsCompleted();
}

[Fact]
public void SkipFrame2()
{
var frameProvider = new ManualFrameProvider();
var cts = new CancellationTokenSource();

var list = Observable.EveryUpdate(frameProvider, cts.Token)
.Select(x => frameProvider.GetFrameCount())
.SkipLastFrame(3, frameProvider)
.ToLiveList();

frameProvider.Advance(3); // 0, 1, 2
list.AssertEqual([]);
frameProvider.Advance(1); // 3
list.AssertEqual([0]);

frameProvider.Advance(1); // 4
list.AssertEqual([0, 1]);

frameProvider.Advance(1); // 5
list.AssertEqual([0, 1, 2]);

frameProvider.Advance(1); // 6
list.AssertEqual([0, 1, 2, 3]);

cts.Cancel(); // stop and OnCompleted(frame no is adavnced +1)

list.AssertEqual([0, 1, 2, 3, 4]);
list.AssertIsCompleted();
}
}

0 comments on commit 7e49e76

Please sign in to comment.