Skip to content

Commit

Permalink
Append, Prepend, Concat
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 26, 2023
1 parent edc5702 commit ba5c87a
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 16 deletions.
3 changes: 3 additions & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

var range = System.Reactive.Linq.Observable.Range(1, 10);

// range.Append(


// range.SelectMany(

// range.TakeLast(
Expand Down
2 changes: 1 addition & 1 deletion src/R3/Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public IDisposable Subscribe(Observer<T> observer)
public abstract class Observer<T> : IDisposable
{
#if DEBUG
[Obsolete("Only allow in Event<T>.")]
[Obsolete("Only allow in Observable<T>.")]
#endif
internal SingleAssignmentDisposableCore SourceSubscription;

Expand Down
54 changes: 54 additions & 0 deletions src/R3/Operators/AppendPrepend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Append<T>(this Observable<T> source, T value)
{
return new AppendPrepend<T>(source, value, append: true);
}

public static Observable<T> Prepend<T>(this Observable<T> source, T value)
{
return new AppendPrepend<T>(source, value, append: false);
}
}


internal sealed class AppendPrepend<T>(Observable<T> source, T value, bool append) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
if (!append) // prepend
{
observer.OnNext(value);
}

return source.Subscribe(new _Append(observer, value));
}

sealed class _Append(Observer<T> observer, T value) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
}
else
{
observer.OnNext(value);
observer.OnCompleted();
}
}
}
}
84 changes: 84 additions & 0 deletions src/R3/Operators/Concat.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Concat<T>(this Observable<T> source, Observable<T> second)
{
return new Concat<T>(source, second);
}
}


internal sealed class Concat<T>(Observable<T> source, Observable<T> second) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _Concat(observer, second));
}

sealed class _Concat(Observer<T> observer, Observable<T> second) : Observer<T>
{
readonly Observer<T> observer = observer;

SingleAssignmentDisposableCore secondSubscription;

protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
try
{
observer.OnCompleted(result);
}
finally
{
Dispose();
}
}
else
{
secondSubscription.Disposable = second.Subscribe(new SecondObserver(this));
}
}

protected override void DisposeCore()
{
secondSubscription.Dispose();
}

internal sealed class SecondObserver(_Concat parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.observer.OnCompleted(result);
}

protected override void DisposeCore()
{
parent.Dispose();
}
}
}
}
25 changes: 15 additions & 10 deletions src/R3/Operators/Take.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ protected override void OnNextCore(T value)
{
remaining--;
observer.OnNext(value);
}
else
{
observer.OnCompleted();
if (remaining == 0)
{
observer.OnCompleted();
}
}
}

Expand Down Expand Up @@ -183,13 +183,18 @@ bool IFrameRunnerWorkItem.MoveNext(long _)
if (remaining > 0)
{
remaining--;
return true;
}
else
{
OnCompleted(Result.Success);
return false;
if (remaining == 0)
{
OnCompleted(Result.Success);
return false;
}
else
{
return true;
}
}

return false;
}
}
}
2 changes: 1 addition & 1 deletion src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static partial class ObservableExtensions
//CombineLatest, Merge, Zip, WithLatestFrom, ZipLatest, Switch, MostRecent

// Standard Query:
// Concat, Append, Prepend, Distinct, DistinctUntilChanged, Scan
// Distinct, DistinctUntilChanged, Scan

// SkipTake:
// Skip, SkipLast, SkipUntil, SkipWhile
Expand Down
2 changes: 2 additions & 0 deletions src/R3/SubscriptionTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public static class SubscriptionTracker
static bool dirty;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
[DebuggerStepThrough]
internal static bool TryTrackActiveSubscription(IDisposable subscription, int skipFrame, [NotNullWhen(true)] out TrackableDisposable? trackableDisposable)
{
if (!EnableTracking)
Expand All @@ -37,6 +38,7 @@ internal static bool TryTrackActiveSubscription(IDisposable subscription, int sk
return TryTrackActiveSubscriptionCore(subscription, skipFrame, out trackableDisposable);
}

[DebuggerStepThrough]
internal static bool TryTrackActiveSubscriptionCore(IDisposable subscription, int skipFrame, [NotNullWhen(true)] out TrackableDisposable? trackableDisposable)
{
dirty = true;
Expand Down
57 changes: 57 additions & 0 deletions tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace R3.Tests.OperatorTests;

public class ConcatAppendPrependTest
{
[Fact]
public void Prepend()
{
var subject = new Subject<int>();
using var list = subject.Prepend(9999).ToLiveList();

subject.OnNext(10);

list.AssertEqual([9999, 10]);

subject.OnCompleted();
list.AssertIsCompleted();
}

[Fact]
public void Append()
{
var subject = new Subject<int>();
using var list = subject.Append(9999).ToLiveList();

subject.OnNext(10);

list.AssertEqual([10]);

subject.OnCompleted();

list.AssertEqual([10, 9999]);
list.AssertIsCompleted();
}

[Fact]
public void Concat()
{
var subject = new Subject<int>();
var subject2 = new Subject<int>();
using var list = subject.Concat(subject2).ToLiveList();

subject.OnNext(10);
subject2.OnNext(9999);

list.AssertEqual([10]);

subject.OnCompleted();

subject2.OnNext(11111);

list.AssertEqual([10, 11111]);

subject2.OnCompleted();

list.AssertIsCompleted();
}
}
6 changes: 2 additions & 4 deletions tests/R3.Tests/OperatorTests/TakeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public void TakeFrame()
publisher.OnNext(10000);
list.AssertEqual([1, 10, 100, 1000, 10000]);
frameProvider.Advance(2);
list.AssertIsNotCompleted();
frameProvider.Advance(1);
list.AssertIsCompleted();
}

Expand All @@ -67,9 +65,9 @@ public void TakeFrame2()
list.AssertEqual([0, 1, 2]);

frameProvider.Advance(2);
list.AssertEqual([0, 1, 2, 3, 4]);
frameProvider.Advance(1);

// not guranteed everyupdate and takeframe which call first
// list.AssertEqual([0, 1, 2, 3]);
list.AssertIsCompleted();
}
}

0 comments on commit ba5c87a

Please sign in to comment.