diff --git a/src/DynamicData.Tests/Cache/SwitchFixture.cs b/src/DynamicData.Tests/Cache/SwitchFixture.cs index 3eba97a7..5a5b7655 100644 --- a/src/DynamicData.Tests/Cache/SwitchFixture.cs +++ b/src/DynamicData.Tests/Cache/SwitchFixture.cs @@ -10,54 +10,83 @@ namespace DynamicData.Tests.Cache; -public class SwitchFixture : IDisposable +public class SwitchFixture { - private readonly ChangeSetAggregator _results; - - private readonly ISourceCache _source; - - private readonly ISubject> _switchable; - - public SwitchFixture() - { - _source = new SourceCache(p => p.Name); - _switchable = new BehaviorSubject>(_source); - _results = _switchable.Switch().AsAggregator(); - } - [Fact] public void ClearsForNewSource() { + using var source = new SourceCache(p => p.Name); + using var switchable = new BehaviorSubject>(source); + var results = switchable.Switch().AsAggregator(); + + var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray(); - _source.AddOrUpdate(inital); + source.AddOrUpdate(inital); - _results.Data.Count.Should().Be(100); + results.Data.Count.Should().Be(100); var newSource = new SourceCache(p => p.Name); - _switchable.OnNext(newSource); + switchable.OnNext(newSource); - _results.Data.Count.Should().Be(0); + results.Data.Count.Should().Be(0); newSource.AddOrUpdate(inital); - _results.Data.Count.Should().Be(100); + results.Data.Count.Should().Be(100); var nextUpdates = Enumerable.Range(101, 100).Select(i => new Person("Person" + i, i)).ToArray(); newSource.AddOrUpdate(nextUpdates); - _results.Data.Count.Should().Be(200); + results.Data.Count.Should().Be(200); } - public void Dispose() + [Fact] + public void PoulatesFirstSource() { - _source.Dispose(); - _results.Dispose(); + using var source = new SourceCache(p => p.Name); + using var switchable = new BehaviorSubject>(source); + var results = switchable.Switch().AsAggregator(); + + + var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray(); + source.AddOrUpdate(inital); + + results.Data.Count.Should().Be(100); } [Fact] - public void PoulatesFirstSource() + public void PropagatesOuterErrors() { + using var source = new SourceCache(p => p.Name); + using var switchable = new BehaviorSubject>(source); + var results = switchable.Switch().AsAggregator(); + + + var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray(); + source.AddOrUpdate(inital); + + var error = new Exception("Test"); + switchable.OnError(error); + + results.Error.Should().Be(error); + } + + [Fact] + public void PropagatesInnerErrors() + { + using var source = new SourceCache(p => p.Name); + using var switchable = new BehaviorSubject>>(source.Connect()); + var results = switchable.Switch().AsAggregator(); + + var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray(); - _source.AddOrUpdate(inital); + source.AddOrUpdate(inital); + + using var source2 = new BehaviorSubject>(ChangeSet.Empty); + + switchable.OnNext(source2); + + var error = new Exception("Test"); + source2.OnError(error); - _results.Data.Count.Should().Be(100); + results.Error.Should().Be(error); } } diff --git a/src/DynamicData/Cache/Internal/Switch.cs b/src/DynamicData/Cache/Internal/Switch.cs index a6bd0891..58e89c0d 100644 --- a/src/DynamicData/Cache/Internal/Switch.cs +++ b/src/DynamicData/Cache/Internal/Switch.cs @@ -4,6 +4,7 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; namespace DynamicData.Cache.Internal; @@ -20,16 +21,25 @@ public IObservable> Run() => Observable.Create(); + var errors = new Subject>(); + var populator = Observable.Switch( - _sources.Do( - _ => - { - lock (locker) - { - destination.Clear(); - } - })).Synchronize(locker).PopulateInto(destination); - - return new CompositeDisposable(destination, populator, destination.Connect().SubscribeSafe(observer)); + _sources + .Synchronize(locker) + .Do(onNext: _ => destination.Clear(), + onError: error => errors.OnError(error))) + .Synchronize(locker) + .Do(onNext: static _ => { }, + onError: error => errors.OnError(error)) + .PopulateInto(destination); + + return new CompositeDisposable( + destination, + errors, + populator, + Observable.Merge( + destination.Connect(), + errors) + .SubscribeSafe(observer)); }); }