Skip to content

Commit

Permalink
CombineLatest, Zip, ZipLatest, WithLatestFrom(not tested)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 4, 2024
1 parent a7bfb9d commit ee29a96
Show file tree
Hide file tree
Showing 13 changed files with 8,926 additions and 151 deletions.
22 changes: 16 additions & 6 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@


var sw = Stopwatch.StartNew();
var subject = new System.Reactive.Subjects.Subject<int>();
subject.Timeout(TimeSpan.FromSeconds(3)).Subscribe(x => Console.WriteLine(x));
var subject1 = new System.Reactive.Subjects.Subject<int>();
var subject2 = new System.Reactive.Subjects.Subject<int>();
subject1.WithLatestFrom(subject2.Finally(() => Console.WriteLine("finally subject2")), (x, y) => (x, y)).Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("end"));

subject.OnNext(1);
subject1.OnNext(1);
subject1.OnNext(10);
subject1.OnNext(100);

Console.ReadLine();

subject.OnNext(2);
// subject2.OnNext(2);

subject.OnCompleted();
subject1.OnNext(1000);

// subject2.OnError(new Exception());

subject1.OnNext(100000);
subject1.OnNext(1000000);
subject1.OnNext(10000000);
subject1.OnNext(100000000);

subject1.OnCompleted();



Expand Down
142 changes: 142 additions & 0 deletions src/R3/Factories/CombineLatest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System.Diagnostics.CodeAnalysis;

namespace R3;

public static partial class Observable
{
public static Observable<T[]> CombineLatest<T>(params Observable<T>[] sources)
{
return new CombineLatest<T>(sources);
}

public static Observable<T[]> CombineLatest<T>(IEnumerable<Observable<T>> sources)
{
return new CombineLatest<T>(sources);
}
}

internal sealed class CombineLatest<T>(IEnumerable<Observable<T>> sources) : Observable<T[]>
{
protected override IDisposable SubscribeCore(Observer<T[]> observer)
{
return new _CombineLatest(observer, sources).Run();
}

sealed class _CombineLatest : IDisposable
{
readonly Observer<T[]> observer;
readonly Observable<T>[] sources;
readonly CombineLatestObserver[] observers;
bool hasValueAll;
int completedCount;

public _CombineLatest(Observer<T[]> observer, IEnumerable<Observable<T>> sources)
{
this.observer = observer;
if (sources is Observable<T>[] array)
{
this.sources = array;
}
else
{
this.sources = sources.ToArray();
}

var observers = new CombineLatestObserver[this.sources.Length];
for (int i = 0; i < observers.Length; i++)
{
observers[i] = new CombineLatestObserver(this);
}
this.observers = observers;
}

public IDisposable Run()
{
try
{
for (int i = 0; i < sources.Length; i++)
{
sources[i].Subscribe(observers[i]);
}
}
catch
{
Dispose();
throw;
}
return this;
}

public void TryPublishOnNext()
{
if (!hasValueAll)
{
foreach (var item in observers)
{
if (!item.HasValue) return;
}
hasValueAll = true;
}

var values = new T[observers.Length];
for (int i = 0; i < observers.Length; i++)
{
values[i] = observers[i].Value!;
}
observer.OnNext(values);
}

public void TryPublishOnCompleted(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
Dispose();
}
else
{
if (Interlocked.Increment(ref completedCount) == sources.Length)
{
observer.OnCompleted();
Dispose();
}
}
}

public void Dispose()
{
foreach (var observer in observers)
{
observer.Dispose();
}
}

sealed class CombineLatestObserver(_CombineLatest parent) : Observer<T>
{
public T? Value { get; private set; }

[MemberNotNullWhen(true, nameof(Value))]
public bool HasValue { get; private set; }

protected override void OnNextCore(T value)
{
lock (parent.observers)
{
this.Value = value;
this.HasValue = true;
parent.TryPublishOnNext();
}
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.TryPublishOnCompleted(result);
}
}
}
}
132 changes: 132 additions & 0 deletions src/R3/Factories/Zip.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
namespace R3;

public static partial class Observable
{
public static Observable<T[]> Zip<T>(params Observable<T>[] sources)
{
return new Zip<T>(sources);
}

public static Observable<T[]> Zip<T>(IEnumerable<Observable<T>> sources)
{
return new Zip<T>(sources);
}
}

internal sealed class Zip<T>(IEnumerable<Observable<T>> sources) : Observable<T[]>
{
protected override IDisposable SubscribeCore(Observer<T[]> observer)
{
return new _Zip(observer, sources).Run();
}

sealed class _Zip : IDisposable
{
readonly Observer<T[]> observer;
readonly Observable<T>[] sources;
readonly ZipObserver[] observers;
int completedCount;

public _Zip(Observer<T[]> observer, IEnumerable<Observable<T>> sources)
{
this.observer = observer;
if (sources is Observable<T>[] array)
{
this.sources = array;
}
else
{
this.sources = sources.ToArray();
}

var observers = new ZipObserver[this.sources.Length];
for (int i = 0; i < observers.Length; i++)
{
observers[i] = new ZipObserver(this);
}
this.observers = observers;
}

public IDisposable Run()
{
try
{
for (int i = 0; i < sources.Length; i++)
{
sources[i].Subscribe(observers[i]);
}
}
catch
{
Dispose();
throw;
}
return this;
}

public void TryPublishOnNext()
{
foreach (var item in observers)
{
if (!item.HasValue) return;
}

var values = new T[observers.Length];
for (int i = 0; i < observers.Length; i++)
{
values[i] = observers[i].Values.Dequeue();
}
observer.OnNext(values);
}

public void TryPublishOnCompleted(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
Dispose();
}
else
{
if (Interlocked.Increment(ref completedCount) == sources.Length)
{
observer.OnCompleted();
Dispose();
}
}
}

public void Dispose()
{
foreach (var observer in observers)
{
observer.Dispose();
}
}

sealed class ZipObserver(_Zip parent) : Observer<T>
{
public Queue<T> Values { get; private set; } = new Queue<T>();
public bool HasValue => Values.Count != 0;

protected override void OnNextCore(T value)
{
lock (parent.observers)
{
this.Values.Enqueue(value);
parent.TryPublishOnNext();
}
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.TryPublishOnCompleted(result);
}
}
}
}
Loading

0 comments on commit ee29a96

Please sign in to comment.