Skip to content

Commit

Permalink
SkipUntil, SkipWhile
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 1, 2024
1 parent 7e49e76 commit 2c092b6
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 6 deletions.
197 changes: 197 additions & 0 deletions src/R3/Operators/SkipUntil.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> SkipUntil<T, TOther>(this Observable<T> source, Observable<TOther> other)
{
return new SkipUntil<T, TOther>(source, other);
}

public static Observable<T> SkipUntil<T>(this Observable<T> source, CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled) throw new ArgumentException("cancellationToken must be cancellable", nameof(cancellationToken));
return new SkipUntilC<T>(source, cancellationToken);
}

public static Observable<T> SkipUntil<T>(this Observable<T> source, Task task)
{
return new SkipUntilT<T>(source, task);
}
}

internal sealed class SkipUntil<T, TOther>(Observable<T> source, Observable<TOther> other) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var skipUntil = new _SkipUntil(observer);
var otherSubscription = other.Subscribe(skipUntil.otherObserver);
try
{
return source.Subscribe(skipUntil); // subscription contains self and other.
}
catch
{
otherSubscription.Dispose();
throw;
}
}

sealed class _SkipUntil : Observer<T>
{
readonly Observer<T> observer;
internal readonly SkipUntilOtherObserver otherObserver;

internal bool open;

public _SkipUntil(Observer<T> observer)
{
this.observer = observer;
this.otherObserver = new SkipUntilOtherObserver(this);
}

protected override void OnNextCore(T value)
{
if (Volatile.Read(ref open))
{
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
otherObserver.Dispose();
}
}

sealed class SkipUntilOtherObserver(_SkipUntil parent) : Observer<TOther>
{
protected override void OnNextCore(TOther value)
{
Volatile.Write(ref parent.open, true);
Dispose();
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.OnCompleted(result);
}
}
}

internal sealed class SkipUntilC<T>(Observable<T> source, CancellationToken cancellationToken) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipUntil(observer, cancellationToken));
}

sealed class _SkipUntil : Observer<T>, IDisposable
{
static readonly Action<object?> cancellationCallback = CancellationCallback;

readonly Observer<T> observer;
CancellationTokenRegistration tokenRegistration;
bool open;

public _SkipUntil(Observer<T> observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.tokenRegistration = cancellationToken.Register(cancellationCallback, this);
}

protected override void OnNextCore(T value)
{
if (Volatile.Read(ref open))
{
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

static void CancellationCallback(object? state)
{
var self = (_SkipUntil)state!;
Volatile.Write(ref self.open, true);
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}
}

internal sealed class SkipUntilT<T>(Observable<T> source, Task task) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipUntil(observer, task));
}

sealed class _SkipUntil : Observer<T>, IDisposable
{
readonly Observer<T> observer;
bool open;

public _SkipUntil(Observer<T> observer, Task task)
{
this.observer = observer;
TaskAwait(task);
}

protected override void OnNextCore(T value)
{
if (Volatile.Read(ref open))
{
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

async void TaskAwait(Task task)
{
try
{
await task.ConfigureAwait(false);
Volatile.Write(ref open, true);
}
catch (Exception ex)
{
OnCompleted(Result.Failure(ex));
}
}
}
}
87 changes: 87 additions & 0 deletions src/R3/Operators/SkipWhile.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> SkipWhile<T>(this Observable<T> source, Func<T, bool> predicate)
{
return new SkipWhile<T>(source, predicate);
}

public static Observable<T> SkipWhile<T>(this Observable<T> source, Func<T, int, bool> predicate)
{
return new SkipWhileI<T>(source, predicate);
}
}

internal sealed class SkipWhile<T>(Observable<T> source, Func<T, bool> predicate) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipWhile(observer, predicate));
}

sealed class _SkipWhile(Observer<T> observer, Func<T, bool> predicate) : Observer<T>, IDisposable
{
bool open;

protected override void OnNextCore(T value)
{
if (open)
{
observer.OnNext(value);
}
else if (!predicate(value))
{
open = true;
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}

internal sealed class SkipWhileI<T>(Observable<T> source, Func<T, int, bool> predicate) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipWhile(observer, predicate));
}

sealed class _SkipWhile(Observer<T> observer, Func<T, int, bool> predicate) : Observer<T>, IDisposable
{
int count;
bool open;

protected override void OnNextCore(T value)
{
if (open)
{
observer.OnNext(value);
}
else if (!predicate(value, count++))
{
open = true;
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}
7 changes: 4 additions & 3 deletions src/R3/Operators/TakeUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ protected override IDisposable SubscribeCore(Observer<T> observer)
sealed class _TakeUntil : Observer<T>
{
readonly Observer<T> observer;
internal readonly TakeUntilStopperobserver stopper; // this instance is not exposed to public so can use lock.
internal readonly TakeUntilStopperObserver stopper;

public _TakeUntil(Observer<T> observer)
{
this.observer = observer;
this.stopper = new TakeUntilStopperobserver(this);
this.stopper = new TakeUntilStopperObserver(this);
}

protected override void OnNextCore(T value)
Expand All @@ -76,11 +76,12 @@ protected override void DisposeCore()
}
}

sealed class TakeUntilStopperobserver(_TakeUntil parent) : Observer<TOther>
sealed class TakeUntilStopperObserver(_TakeUntil parent) : Observer<TOther>
{
protected override void OnNextCore(TOther value)
{
parent.OnCompleted(Result.Success);
Dispose();
}

protected override void OnErrorResumeCore(Exception error)
Expand Down
3 changes: 0 additions & 3 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ public static partial class ObservableExtensions
// Standard Query:
// Distinct, DistinctBy, DistinctUntilChanged, Scan, DefaultIfEmpty

// SkipTake:
// Skip, SkipLast, SkipUntil, SkipWhile

// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,
}
Loading

0 comments on commit 2c092b6

Please sign in to comment.