Skip to content

Commit

Permalink
All, Any, IsEmpty, MaxBy, MinBy, SequenceEqual, ToDictionary, ToLookup
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 4, 2024
1 parent 41556c0 commit 57b0a77
Show file tree
Hide file tree
Showing 8 changed files with 619 additions and 22 deletions.
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//subject1.OnNext(10);
//subject1.OnNext(100);


// subject1.SequenceEqual(


// System.Reactive.Linq.Observable.Switch(
Expand Down
2 changes: 0 additions & 2 deletions src/R3/Operators/AggregateOperators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace R3;

// TODO: ToDictionary
// TODO: ToLookup
// TODO: Selector APIs

public static partial class ObservableExtensions
Expand Down
110 changes: 110 additions & 0 deletions src/R3/Operators/AllAnyIsEmptyAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<bool> AllAsync<T>(this Observable<T> source, Func<T, bool> predicate, CancellationToken cancellationToken = default)
{
var observer = new AllAsync<T>(predicate, cancellationToken);
source.Subscribe(observer);
return observer.Task;
}

public static Task<bool> AnyAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
return AnyAsync<T>(source, static x => true, cancellationToken);
}

public static Task<bool> AnyAsync<T>(this Observable<T> source, Func<T, bool> predicate, CancellationToken cancellationToken = default)
{
var observer = new AnyAsync<T>(predicate, cancellationToken);
source.Subscribe(observer);
return observer.Task;
}

public static Task<bool> IsEmptyAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
var observer = new IsEmptyAsync<T>(cancellationToken);
source.Subscribe(observer);
return observer.Task;
}
}

internal sealed class AllAsync<T>(Func<T, bool> predicate, CancellationToken cancellationToken)
: TaskObserverBase<T, bool>(cancellationToken)
{
protected override void OnNextCore(T value)
{
if (!predicate(value))
{
TrySetResult(false);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}
TrySetResult(true);
}
}

// AnyAsync
internal sealed class AnyAsync<T>(Func<T, bool> predicate, CancellationToken cancellationToken)
: TaskObserverBase<T, bool>(cancellationToken)
{
protected override void OnNextCore(T value)
{
if (predicate(value))
{
TrySetResult(true);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}
TrySetResult(false);
}
}

// IsEmptyAsync
internal sealed class IsEmptyAsync<T>(CancellationToken cancellationToken)
: TaskObserverBase<T, bool>(cancellationToken)
{
protected override void OnNextCore(T value)
{
TrySetResult(false);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}
TrySetResult(true);
}
}
132 changes: 132 additions & 0 deletions src/R3/Operators/MaxByMinByAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<T> MaxByAsync<T, TKey>(this Observable<T> source, Func<T, TKey> keySelector, CancellationToken cancellationToken = default)
{
return MaxByAsync(source, keySelector, Comparer<TKey>.Default, cancellationToken);
}

public static Task<T> MaxByAsync<T, TKey>(this Observable<T> source, Func<T, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken = default)
{
var method = new MaxByAsync<T, TKey>(keySelector, comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<T> MinByAsync<T, TKey>(this Observable<T> source, Func<T, TKey> keySelector, CancellationToken cancellationToken = default)
{
return MinByAsync(source, keySelector, Comparer<TKey>.Default, cancellationToken);
}

public static Task<T> MinByAsync<T, TKey>(this Observable<T> source, Func<T, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken = default)
{
var method = new MinByAsync<T, TKey>(keySelector, comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}
}

// MaxByAsync
internal sealed class MaxByAsync<T, TKey>(Func<T, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
: TaskObserverBase<T, T>(cancellationToken)
{
T? latestValue;
TKey? latestKey;
bool hasValue;

protected override void OnNextCore(T value)
{
if (!hasValue)
{
hasValue = true;
latestValue = value;
return;
}

var key = keySelector(value);
if (comparer.Compare(key, latestKey) > 0)
{
latestValue = value;
latestKey = key;
hasValue = true;
}

TrySetResult(value);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(latestValue!);
}
else
{
TrySetException(new InvalidOperationException("no elements"));
}
}
}

// MinByAsync
internal sealed class MinByAsync<T, TKey>(Func<T, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
: TaskObserverBase<T, T>(cancellationToken)
{
T? latestValue;
TKey? latestKey;
bool hasValue;

protected override void OnNextCore(T value)
{
if (!hasValue)
{
hasValue = true;
latestValue = value;
return;
}

var key = keySelector(value);
if (comparer.Compare(key, latestKey) < 0)
{
latestValue = value;
latestKey = key;
hasValue = true;
}

TrySetResult(value);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(latestValue!);
}
else
{
TrySetException(new InvalidOperationException("no elements"));
}
}
}
128 changes: 128 additions & 0 deletions src/R3/Operators/SequenceEqualAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<bool> SequenceEqualAsync<T>(this Observable<T> source, Observable<T> second, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

public static Task<bool> SequenceEqualAsync<T>(this Observable<T> source, Observable<T> second, IEqualityComparer<T> equalityComparer, CancellationToken cancellationToken = default)
{
var method = new SequenceEqualAsync<T>(equalityComparer, cancellationToken);
try
{
source.Subscribe(method.leftObserver);
second.Subscribe(method.rightObserver);
}
catch
{
method.Dispose();
throw;
}

return method.Task;
}
}

// SequenceEqualAsync
internal sealed class SequenceEqualAsync<T> : TaskObserverBase<T, bool>
{
public readonly IEqualityComparer<T> equalityComparer;
public SequenceEqualAsyncObserver leftObserver;
public SequenceEqualAsyncObserver rightObserver;

public SequenceEqualAsync(IEqualityComparer<T> equalityComparer, CancellationToken cancellationToken)
: base(cancellationToken)
{
this.equalityComparer = equalityComparer;
this.leftObserver = new SequenceEqualAsyncObserver(this);
this.rightObserver = new SequenceEqualAsyncObserver(this);
}

protected override void OnNextCore(T value)
{
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
}
}

protected override void DisposeCore()
{
leftObserver.Dispose();
rightObserver.Dispose();
}

// called in lock
void CheckValues()
{
while (leftObserver.values.Count != 0 && rightObserver.values.Count != 0)
{
var left = leftObserver.values.Dequeue();
var right = rightObserver.values.Dequeue();
if (!equalityComparer.Equals(left, right))
{
TrySetResult(false);
return;
}
}

if (leftObserver.IsCompleted && rightObserver.IsCompleted)
{
if (leftObserver.values.Count == 0 && rightObserver.values.Count == 0)
{
TrySetResult(true);
}
else
{
TrySetResult(false);
}
}
}

internal sealed class SequenceEqualAsyncObserver(SequenceEqualAsync<T> parent) : Observer<T>
{
public Queue<T> values = new Queue<T>();
public bool IsCompleted;

protected override void OnNextCore(T value)
{
lock (parent)
{
values.Enqueue(value);
parent.CheckValues();
}
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
parent.OnCompleted(result);
}
else
{
lock (parent)
{
IsCompleted = true;
parent.CheckValues();
}
}
}
}
}
Loading

0 comments on commit 57b0a77

Please sign in to comment.