diff --git a/src/DynamicData.Benchmarks/Cache/Filter_Cache_WithPredicateState.cs b/src/DynamicData.Benchmarks/Cache/Filter_Cache_WithPredicateState.cs new file mode 100644 index 000000000..0404b7c54 --- /dev/null +++ b/src/DynamicData.Benchmarks/Cache/Filter_Cache_WithPredicateState.cs @@ -0,0 +1,161 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Reactive.Subjects; + +using BenchmarkDotNet.Attributes; + +using Bogus; + +namespace DynamicData.Benchmarks.Cache; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class Filter_Cache_WithPredicateState +{ + public Filter_Cache_WithPredicateState() + { + // Not exercising Moved, since ChangeAwareCache<> doesn't support it, and I'm too lazy to implement it by hand. + var changeReasons = new[] + { + ChangeReason.Add, + ChangeReason.Refresh, + ChangeReason.Remove, + ChangeReason.Update + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 1f, // Add + 0f, // Refresh + 0f, // Remove + 0f // Update + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.30f, // Add + 0.25f, // Refresh + 0.20f, // Remove + 0.25f // Update + }; + + var maxChangeCount = 20; + + var randomizer = new Randomizer(0x1234567); + + var changeSets = ImmutableArray.CreateBuilder>(initialCapacity: 5_000); + var nextItemId = 1; + var items = new ChangeAwareCache(); + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ChangeReason.Add: + items.AddOrUpdate( + item: new Item() + { + Id = nextItemId, + IsIncluded = randomizer.Bool() + }, + key: nextItemId); + ++nextItemId; + break; + + case ChangeReason.Refresh: + items.Refresh(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Remove: + items.Remove(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Update: + var id = items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)); + items.AddOrUpdate( + item: new Item() + { + Id = id, + IsIncluded = randomizer.Bool() + }, + key: id); + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + _changeSets = changeSets.MoveToImmutable(); + + + var predicateStates = ImmutableArray.CreateBuilder(initialCapacity: 5_000); + while (predicateStates.Count < predicateStates.Capacity) + predicateStates.Add(randomizer.Int()); + _predicateStates = predicateStates.MoveToImmutable(); + } + + [Benchmark(Baseline = true)] + public void RandomizedEditsAndStateChanges() + { + using var source = new Subject>(); + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: Item.FilterByIdInclusionMask) + .Subscribe(); + + PublishNotifications(source, predicateState); + + subscription.Dispose(); + } + + private void PublishNotifications( + IObserver> source, + IObserver predicateState) + { + int i; + for (i = 0; (i < _changeSets.Length) && (i < _predicateStates.Length); ++i) + { + source.OnNext(_changeSets[i]); + predicateState.OnNext(_predicateStates[i]); + } + + for (; i < _changeSets.Length; ++i) + source.OnNext(_changeSets[i]); + + for (; i < _predicateStates.Length; ++i) + predicateState.OnNext(_predicateStates[i]); + } + + private readonly ImmutableArray> _changeSets; + private readonly ImmutableArray _predicateStates; + + public class Item + { + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; set; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } +} diff --git a/src/DynamicData.Benchmarks/List/Filter_List_WithPredicateState.cs b/src/DynamicData.Benchmarks/List/Filter_List_WithPredicateState.cs new file mode 100644 index 000000000..3707767be --- /dev/null +++ b/src/DynamicData.Benchmarks/List/Filter_List_WithPredicateState.cs @@ -0,0 +1,237 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Reactive.Subjects; + +using BenchmarkDotNet.Attributes; + +using Bogus; + +namespace DynamicData.Benchmarks.List; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class Filter_List_WithPredicateState +{ + public Filter_List_WithPredicateState() + { + var randomizer = new Randomizer(0x1234567); + + _changeSets = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + maxRangeSize: 10, + randomizer: randomizer); + + _predicateStates = GenerateRandomPredicateStates( + valueCount: 5_000, + randomizer: randomizer); + } + + [Params(ListFilterPolicy.CalculateDiff, ListFilterPolicy.ClearAndReplace)] + public ListFilterPolicy FilterPolicy { get; set; } + + [Benchmark(Baseline = true)] + public void RandomizedEditsAndStateChanges() + { + using var source = new Subject>(); + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: Item.FilterByIdInclusionMask, + filterPolicy: FilterPolicy) + .Subscribe(); + + PublishNotifications(source, predicateState); + + subscription.Dispose(); + } + + private static ImmutableArray> GenerateStressItemsAndChangeSets( + int editCount, + int maxChangeCount, + int maxRangeSize, + Randomizer randomizer) + { + var changeReasons = new[] + { + ListChangeReason.Add, + ListChangeReason.AddRange, + ListChangeReason.Clear, + ListChangeReason.Moved, + ListChangeReason.Refresh, + ListChangeReason.Remove, + ListChangeReason.RemoveRange, + ListChangeReason.Replace + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 0.5f, // Add + 0.5f, // AddRange + 0.0f, // Clear + 0.0f, // Moved + 0.0f, // Refresh + 0.0f, // Remove + 0.0f, // RemoveRange + 0.0f // Replace + }; + + var changeReasonWeightsWhenCountIs1 = new[] + { + 0.400f, // Add + 0.400f, // AddRange + 0.001f, // Clear + 0.000f, // Moved + 0.000f, // Refresh + 0.199f, // Remove + 0.000f, // RemoveRange + 0.000f // Replace + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.250f, // Add + 0.250f, // AddRange + 0.001f, // Clear + 0.100f, // Moved + 0.099f, // Refresh + 0.100f, // Remove + 0.100f, // RemoveRange + 0.100f // Replace + }; + + var nextItemId = 1; + + var changeSets = ImmutableArray.CreateBuilder>(initialCapacity: editCount); + + var items = new ChangeAwareList(); + + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + 1 => changeReasonWeightsWhenCountIs1, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ListChangeReason.Add: + items.Add(new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }); + break; + + case ListChangeReason.AddRange: + items.AddRange(Enumerable.Repeat(0, randomizer.Int(1, maxRangeSize)) + .Select(_ => new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + })); + break; + + case ListChangeReason.Clear: + items.Clear(); + break; + + case ListChangeReason.Moved: + items.Move( + original: randomizer.Int(0, items.Count - 1), + destination: randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Refresh: + items.RefreshAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Remove: + items.RemoveAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.RemoveRange: + { + var rangeStartIndex = randomizer.Int(0, items.Count - 1); + + items.RemoveRange( + index: rangeStartIndex, + count: Math.Min(items.Count - rangeStartIndex, randomizer.Int(1, maxRangeSize))); + } + break; + + case ListChangeReason.Replace: + items[randomizer.Int(0, items.Count - 1)] = new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }; + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + + return changeSets.MoveToImmutable(); + } + + private static ImmutableArray GenerateRandomPredicateStates( + int valueCount, + Randomizer randomizer) + { + var values = ImmutableArray.CreateBuilder(initialCapacity: valueCount); + + while (values.Count < valueCount) + values.Add(randomizer.Int()); + + return values.MoveToImmutable(); + } + + private void PublishNotifications( + IObserver> source, + IObserver predicateState) + { + int i; + for (i = 0; (i < _changeSets.Length) && (i < _predicateStates.Length); ++i) + { + source.OnNext(_changeSets[i]); + predicateState.OnNext(_predicateStates[i]); + } + + for (; i < _changeSets.Length; ++i) + source.OnNext(_changeSets[i]); + + for (; i < _predicateStates.Length; ++i) + predicateState.OnNext(_predicateStates[i]); + } + + private readonly ImmutableArray> _changeSets; + private readonly ImmutableArray _predicateStates; + + public class Item + { + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; set; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } +} diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 0bf3dba96..18fc3a1e2 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -1327,6 +1327,9 @@ namespace DynamicData public static System.IObservable> Filter(this System.IObservable> source, System.IObservable> predicateChanged, System.IObservable reapplyFilter, bool suppressEmptyChangeSets = true) where TObject : notnull where TKey : notnull { } + public static System.IObservable> Filter(this System.IObservable> source, System.IObservable predicateState, System.Func predicate, bool suppressEmptyChangeSets = true) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> FilterImmutable(this System.IObservable> source, System.Func predicate, bool suppressEmptyChangeSets = true) where TObject : notnull where TKey : notnull { } @@ -2325,6 +2328,8 @@ namespace DynamicData where T : notnull { } public static System.IObservable> Filter(this System.IObservable> source, System.IObservable> predicate, DynamicData.ListFilterPolicy filterPolicy = 1) where T : notnull { } + public static System.IObservable> Filter(this System.IObservable> source, System.IObservable predicateState, System.Func predicate, DynamicData.ListFilterPolicy filterPolicy = 1, bool suppressEmptyChangeSets = true) + where T : notnull { } public static System.IObservable> FilterOnObservable(this System.IObservable> source, System.Func> objectFilterObservable, System.TimeSpan? propertyChangedThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null) where TObject : notnull { } [System.Obsolete("Use AutoRefresh(), followed by Filter() instead")] diff --git a/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs b/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs new file mode 100644 index 000000000..2f3abf906 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs @@ -0,0 +1,805 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public partial class FilterFixture +{ + public sealed class WithPredicateState + { + [Fact] + public void ChangesAreMadeAfterInitialPredicateState_ItemsAreFiltered() + { + using var source = new SourceCache(static item => item.Id); + using var predicateState = new Subject(); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + // Set initial state + predicateState.OnNext(new()); + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test Add changes + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Refresh changes, with no item mutations. + source.Refresh(); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedChangeSets.Skip(1).First().Select(static change => change.Reason).Should().AllBeEquivalentTo(ChangeReason.Refresh, "all included items should have been refreshed"); + results.RecordedChangeSets.Skip(1).First().Select(static change => change.Current).Should().BeEquivalentTo(EnumerateFilteredItems(), "all included items should have been refreshed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Refresh changes, with item mutations affecting filtering. + foreach (var item in source.Items) + item.IsIncluded = !item.IsIncluded; + source.Refresh(); + + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Remove changes + source.RemoveKeys(new[] { 2, 3 }); + + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Update changes, not affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true } + }); + + results.RecordedChangeSets.Skip(4).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Update changes, affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Skip(5).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + IEnumerable EnumerateFilteredItems() + => source.Items.Where(static item => item.IsIncluded); + } + + [Fact] + public void ChangesAreMadeAfterMultiplePredicateStateChanges_ItemsAreFilteredWithLatestPredicateState() + { + using var source = new SourceCache(static item => item.Id); + using var predicateState = new BehaviorSubject(1); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.Id == predicateState) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + // Publish multiple state changes + predicateState.OnNext(2); + predicateState.OnNext(3); + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test filtering of items, by state + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, source.Items.Where(item => item.Id == predicateState.Value)); + } + + [Fact] + public void ChangesAreMadeBeforeInitialPredicateState_ItemsAreFilteredOnPredicateState() + { + using var source = new SourceCache(static item => item.Id); + using var predicateState = new Subject(); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test Add changes + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + // Test Refresh changes, with no item mutations. + source.Refresh(); + + // Test Refresh changes, with item mutations affecting filtering. + foreach (var item in source.Items) + item.IsIncluded = !item.IsIncluded; + source.Refresh(); + + // Test Remove changes + source.RemoveKeys(new[] { 2, 3 }); + + // Test Update changes, not affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true } + }); + + // Test Update changes, affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Should().BeEmpty("the predicate state has not initialized"); + results.RecordedItemsByKey.Should().BeEmpty("the predicate state has not initialized"); + + + // Set initial state + predicateState.OnNext(new()); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, source.Items.Where(static item => item.IsIncluded)); + } + + [Fact] + public void ItemsAreMoved_ChangesAreIgnored() + { + using var source = new Subject>(); + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + var items = new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }; + + + // Set initial state + predicateState.OnNext(new()); + source.OnNext(new ChangeSet(items + .Select((item, index) => new Change( + reason: ChangeReason.Add, + key: item.Id, + current: item, + index: index)))); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Moved changes, for both included and excluded items. + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Moved, key: 1, current: items[0], previous: default, previousIndex: 0, currentIndex: 1), + new(reason: ChangeReason.Moved, key: 2, current: items[1], previous: default, previousIndex: 0, currentIndex: 2), + new(reason: ChangeReason.Moved, key: 3, current: items[2], previous: default, previousIndex: 1, currentIndex: 0) + }); + + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the move operation should have been ignored"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + IEnumerable EnumerateFilteredItems() + => items.Where(static item => item.IsIncluded); + } + + [Fact] + public void PredicateIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => Observable.Empty>() + .Filter( + predicateState: Observable.Empty(), + predicate: null!)) + .Should() + .Throw(); + + [Fact] + public void PredicateStateChanges_ItemsAreRefiltered() + { + using var source = new SourceCache(static item => item.Id); + using var predicateState = new BehaviorSubject(1); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.Id == predicateState) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + // Test filtering of items, by state + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Publish a state change, to change the filtering + predicateState.OnNext(2); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + IEnumerable EnumerateFilteredItems() + => source.Items.Where(item => item.Id == predicateState.Value); + } + + [Fact] + public void PredicateStateCompletesAfterInitialValue_CompletionWaitsForSourceCompletion() + { + using var source = new Subject>(); + + using var subscription = source + .Filter( + predicateState: Observable.Return(new object()), + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.OnCompleted(); + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void PredicateStateCompletesImmediately_CompletionIsPropagated() + { + using var source = new Subject>(); + + using var subscription = source + .Filter( + predicateState: Observable.Empty(), + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + results.HasCompleted.Should().BeTrue("completion of the predicate state stream before it emits any values means that items can never be accepted by the filter predicate"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void PredicateStateErrors_ErrorIsPropagated() + { + using var source = new Subject>(); + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + var error = new Exception("This is a test."); + predicateState.OnError(error); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void PredicateStateErrorsImmediately_ErrorIsPropagated() + { + using var source = new Subject>(); + + var error = new Exception("This is a test."); + + using var subscription = source + .Filter( + predicateState: Observable.Throw(error), + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void PredicateStateIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => Observable.Empty>() + .Filter( + predicateState: (null as IObservable)!, + predicate: static (_, _) => true)) + .Should() + .Throw(); + + [Fact] + public async Task SourceAndPredicateStateNotifyFromDifferentThreads_FilteringIsThreadSafe() + { + var randomizer = new Randomizer(0x1234567); + + (var items, var changeSets) = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + randomizer: randomizer); + + var predicateStates = GenerateRandomPredicateStates( + valueCount: 5_000, + randomizer: randomizer); + + + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: Item.FilterByIdInclusionMask) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await Task.WhenAll( + Task.Run( + action: () => + { + foreach (var changeSet in changeSets) + source.OnNext(changeSet); + }, + cancellationToken: timeoutSource.Token), + Task.Run( + action: () => + { + foreach (var value in predicateStates) + predicateState.OnNext(value); + }, + cancellationToken: timeoutSource.Token)); + + var finalPredicateState = predicateStates[^1]; + ShouldBeValid(results, items.Items.Where(item => Item.FilterByIdInclusionMask(finalPredicateState, item))); + } + + [Fact] + public void SourceCompletesWhenEmpty_CompletionIsPropagated() + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + source.OnCompleted(); + + results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForStateCompletion() + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + source.OnNext(new ChangeSet() { new(reason: ChangeReason.Add, key: 1, current: new Item() { Id = 1, IsIncluded = true }) }); + source.OnCompleted(); + + results.HasCompleted.Should().BeFalse("changes could still be generated by changes in predicate state"); + results.RecordedChangeSets.Should().BeEmpty("the predicate has not initialized"); + + predicateState.OnCompleted(); + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + results.RecordedChangeSets.Should().BeEmpty("the predicate never initialized"); + } + + [Fact] + public void SourceCompletesImmediately_CompletionIsPropagated() + { + using var predicateState = new Subject(); + + using var subscription = Observable.Empty>() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void SourceErrors_ErrorIsPropagated() + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + var error = new Exception("This is a test."); + source.OnError(error); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void SourceErrorsImmediately_ErrorIsPropagated() + { + using var predicateState = new Subject(); + + var error = new Exception("This is a test."); + + using var subscription = Observable.Throw>(error) + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void SourceIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: (null as IObservable>)!, + predicateState: Observable.Empty(), + predicate: static (_, _) => true)) + .Should() + .Throw(); + + [Fact] + public void SubscriptionIsDisposed_UnsubscriptionIsPropagated() + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + subscription.Dispose(); + + source.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); + predicateState.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); + + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Theory] + [InlineData("source", "predicateState")] + [InlineData("predicateState", "source")] + public void SuppressEmptyChangeSetsIsFalse_EmptyChangesetsArePropagatedAndOnlyFinalCompletionIsPropagated(params string[] completionOrder) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + suppressEmptyChangeSets: false) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + + // Initialize the predicate + predicateState.OnNext(new object()); + + results.RecordedChangeSets.Count.Should().Be(1, "the predicate state was initialized"); + results.RecordedChangeSets[0].Should().BeEmpty("there are no items in the collection"); + ShouldBeValid(results, Enumerable.Empty()); + + + // Publish an empty changeset + source.OnNext(ChangeSet.Empty); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "a source operation was performed"); + results.RecordedChangeSets.Skip(1).First().Should().BeEmpty("the source changeset was empty"); + ShouldBeValid(results, Enumerable.Empty()); + + + // Publish a changeset with only excluded items + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: 1, current: new Item() { Id = 1, IsIncluded = false }), + new(reason: ChangeReason.Add, key: 2, current: new Item() { Id = 2, IsIncluded = false }), + new(reason: ChangeReason.Add, key: 3, current: new Item() { Id = 3, IsIncluded = false }) + }); + + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "a source operation was performed"); + results.RecordedChangeSets.Skip(2).First().Should().BeEmpty("all source items were excluded"); + ShouldBeValid(results, Enumerable.Empty()); + + for (var i = 0; i < completionOrder.Length; ++i) + { + switch (completionOrder[i]) + { + case nameof(source): + source.OnCompleted(); + break; + + case nameof(predicateState): + predicateState.OnCompleted(); + break; + } + + if (i < (completionOrder.Length - 1)) + results.HasCompleted.Should().BeFalse("not all input streams have completed"); + } + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + } + + private static void ShouldBeValid( + CacheItemRecordingObserver results, + IEnumerable expectedFilteredItems) + { + results.Error.Should().BeNull("no errors should have occurred"); + results.HasCompleted.Should().BeFalse("no completion events should have occurred"); + results.RecordedChangeSets.Should().AllSatisfy(changeSet => + { + if (changeSet.Count is not 0) + changeSet.Should().AllSatisfy(change => + { + change.CurrentIndex.Should().Be(-1, "sorting indexes should not be propagated"); + change.PreviousIndex.Should().Be(-1, "sorting indexes should not be propagated"); + }); + }); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(expectedFilteredItems, "all filtered items should match the filter predicate"); + results.RecordedItemsSorted.Should().BeEmpty("sorting is not supported by filter opreators"); + } + + private static (ICache items, IReadOnlyList> changeSets) GenerateStressItemsAndChangeSets( + int editCount, + int maxChangeCount, + Randomizer randomizer) + { + // Not exercising Moved, since ChangeAwareCache<> doesn't support it, and I'm too lazy to implement it by hand. + var changeReasons = new[] + { + ChangeReason.Add, + ChangeReason.Refresh, + ChangeReason.Remove, + ChangeReason.Update + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 1f, // Add + 0f, // Refresh + 0f, // Remove + 0f // Update + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.30f, // Add + 0.25f, // Refresh + 0.20f, // Remove + 0.25f // Update + }; + + var nextItemId = 1; + + var changeSets = new List>(capacity: editCount); + + var items = new ChangeAwareCache(); + + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ChangeReason.Add: + items.AddOrUpdate( + item: new Item() + { + Id = nextItemId, + IsIncluded = randomizer.Bool() + }, + key: nextItemId); + ++nextItemId; + break; + + case ChangeReason.Refresh: + items.Refresh(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Remove: + items.Remove(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Update: + var id = items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)); + items.AddOrUpdate( + item: new Item() + { + Id = id, + IsIncluded = randomizer.Bool() + }, + key: id); + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + + return (items, changeSets); + } + + private static IReadOnlyList GenerateRandomPredicateStates( + int valueCount, + Randomizer randomizer) + { + var values = new List(capacity: valueCount); + + while (values.Count < valueCount) + values.Add(randomizer.Int()); + + return values; + } + + private class Item + { + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; set; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.cs b/src/DynamicData.Tests/Cache/FilterFixture.cs index 9e901c8c1..1b3fc3ca4 100644 --- a/src/DynamicData.Tests/Cache/FilterFixture.cs +++ b/src/DynamicData.Tests/Cache/FilterFixture.cs @@ -10,7 +10,7 @@ namespace DynamicData.Tests.Cache; -public class FilterFixture : IDisposable +public partial class FilterFixture : IDisposable { private readonly ChangeSetAggregator _results; diff --git a/src/DynamicData.Tests/List/FilterFixture.WithPredicateState.cs b/src/DynamicData.Tests/List/FilterFixture.WithPredicateState.cs new file mode 100644 index 000000000..d3f28efd3 --- /dev/null +++ b/src/DynamicData.Tests/List/FilterFixture.WithPredicateState.cs @@ -0,0 +1,1031 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; +using Xunit.Abstractions; + +namespace DynamicData.Tests.List; + +public partial class FilterFixture +{ + public sealed class WithPredicateState + { + private readonly ITestOutputHelper _output; + + public WithPredicateState(ITestOutputHelper output) + => _output = output; + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void ChangesAreMadeAfterInitialPredicateState_ItemsAreFiltered(ListFilterPolicy filterPolicy) + { + using var source = new TestSourceList(); + using var predicateState = new Subject(); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + // Set initial state + predicateState.OnNext(new()); + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test Add, with an included item + var item1 = new Item() { Id = 1, IsIncluded = true }; + source.Add(item1); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed, with one included item added"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Add, with an excluded item + var item2 = new Item() { Id = 2, IsIncluded = false }; + source.Add(item2); + + results.RecordedChangeSets.Skip(1).Should().BeEmpty("one source operation was performed, but no included items were affected"); + + + // Test AddRange, with both included and excluded items + var item3 = new Item() { Id = 3, IsIncluded = false }; + var item4 = new Item() { Id = 4, IsIncluded = true }; + var item5 = new Item() { Id = 5, IsIncluded = true }; + var item6 = new Item() { Id = 6, IsIncluded = false }; + var item7 = new Item() { Id = 7, IsIncluded = false }; + var item8 = new Item() { Id = 8, IsIncluded = true }; + source.AddRange(new[] { item3, item4, item5, item6, item7, item8 }); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed, with 3 included items added"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Refresh, with no item mutations. + source.Refresh(Enumerable.Range(0, source.Count)); + + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "one source operation was performed, with all included items affected"); + results.RecordedChangeSets.Skip(2).First().Select(static change => change.Reason).Should().AllBeEquivalentTo(ListChangeReason.Refresh, "all included items should have been refreshed"); + results.RecordedChangeSets.Skip(2).First().Select(static change => change.Item.Current).Should().BeEquivalentTo(EnumerateFilteredItems(), "all included items should have been refreshed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Refresh, with item mutations affecting filtering. + item1.IsIncluded = !item1.IsIncluded; + item3.IsIncluded = !item3.IsIncluded; + item5.IsIncluded = !item5.IsIncluded; + item6.IsIncluded = !item6.IsIncluded; + source.Refresh(Enumerable.Range(0, source.Count)); + + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "one source operation was performed, with items being included and excluded"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Remove, with an included item + source.RemoveAt(3); + + results.RecordedChangeSets.Skip(4).Count().Should().Be(1, "one source operation was performed, with one included item affected"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Remove, with an excluded item + source.RemoveAt(3); + + results.RecordedChangeSets.Skip(5).Should().BeEmpty("one source operation was performed, but no included items were affected"); + + + // Test Remove, with both included and excluded items + source.RemoveRange(index: 2, count: 2); + + results.RecordedChangeSets.Skip(5).Count().Should().Be(1, "one source operation was performed, with one included item affected"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Replace, not affecting filtering + var item9 = new Item() { Id = 9, IsIncluded = false }; + var item10 = new Item() { Id = 10, IsIncluded = true }; + source.Edit(updater => + { + updater.Replace(item7, item9); + updater.Replace(item8, item10); + }); + + results.RecordedChangeSets.Skip(6).Count().Should().Be(1, "one source operation was performed, with one included item affected"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Replace, affecting filtering + var item11 = new Item() { Id = 11, IsIncluded = true }; + var item12 = new Item() { Id = 12, IsIncluded = false }; + source.Edit(updater => + { + updater.Replace(item9, item11); + updater.Replace(item10, item12); + }); + + results.RecordedChangeSets.Skip(7).Count().Should().Be(1, "one source operation was performed, with one included item affected"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Move of an included item, relative to another included item + var item13 = new Item() { Id = 13, IsIncluded = true }; + source.Add(item13); + source.Move(2, 4); + + switch (filterPolicy) + { + case ListFilterPolicy.CalculateDiff: + results.RecordedChangeSets.Skip(8).Count().Should().Be(2, "two source operations were performed"); + break; + + case ListFilterPolicy.ClearAndReplace: + results.RecordedChangeSets.Skip(8).Count().Should().Be(1, "two source operations were performed, one of which was a move, which are not propagated, as ordering is not preserved"); + break; + } + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Move of an excluded item + source.Move(4, 2); + + switch (filterPolicy) + { + case ListFilterPolicy.CalculateDiff: + results.RecordedChangeSets.Skip(10).Count().Should().Be(1, "one source operation was performed"); + break; + + case ListFilterPolicy.ClearAndReplace: + results.RecordedChangeSets.Skip(9).Should().BeEmpty("one source operation was performed, a move, which are not propagated, as ordering is not preserved"); + break; + } + + + // Test Clear, with included items + source.Clear(); + + switch (filterPolicy) + { + case ListFilterPolicy.CalculateDiff: + results.RecordedChangeSets.Skip(11).Count().Should().Be(1, "one source operation was performed, with all included items affected"); + break; + + case ListFilterPolicy.ClearAndReplace: + results.RecordedChangeSets.Skip(9).Count().Should().Be(1, "one source operation was performed, with all included items affected"); + break; + } + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Test Clear, with only excluded items + source.Add(new Item() { Id = 14, IsIncluded = false }); + source.Clear(); + + switch (filterPolicy) + { + case ListFilterPolicy.CalculateDiff: + results.RecordedChangeSets.Skip(12).Should().BeEmpty("two source operations were performed, and neither affected included items"); + break; + + case ListFilterPolicy.ClearAndReplace: + results.RecordedChangeSets.Skip(10).Should().BeEmpty("two source operations were performed, and neither affected included items"); + break; + } + ShouldBeValid(results, EnumerateFilteredItems()); + + + IEnumerable EnumerateFilteredItems() + => source.Items.Where(static item => item.IsIncluded); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void ChangesAreMadeAfterMultiplePredicateStateChanges_ItemsAreFilteredWithLatestPredicateState(ListFilterPolicy filterPolicy) + { + using var source = new SourceList(); + using var predicateState = new BehaviorSubject(1); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.Id == predicateState, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + // Publish multiple state changes + predicateState.OnNext(2); + predicateState.OnNext(3); + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test filtering of items, by state + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, source.Items.Where(item => item.Id == predicateState.Value)); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void ChangesAreMadeBeforeInitialPredicateState_ItemsAreFilteredOnPredicateState(ListFilterPolicy filterPolicy) + { + using var source = new TestSourceList(); + using var predicateState = new Subject(); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); + + + // Test Add, with an included item + var item1 = new Item() { Id = 1, IsIncluded = true }; + source.Add(item1); + + // Test Add, with an excluded item + var item2 = new Item() { Id = 2, IsIncluded = false }; + source.Add(item2); + + // Test AddRange, with both included and excluded items + var item3 = new Item() { Id = 3, IsIncluded = false }; + var item4 = new Item() { Id = 4, IsIncluded = true }; + var item5 = new Item() { Id = 5, IsIncluded = true }; + var item6 = new Item() { Id = 6, IsIncluded = false }; + var item7 = new Item() { Id = 7, IsIncluded = false }; + var item8 = new Item() { Id = 8, IsIncluded = true }; + source.AddRange(new[] { item3, item4, item5, item6, item7, item8 }); + + // Test Refresh, with no item mutations. + source.Refresh(Enumerable.Range(0, source.Count)); + + // Test Refresh, with item mutations affecting filtering. + item1.IsIncluded = !item1.IsIncluded; + item3.IsIncluded = !item3.IsIncluded; + item5.IsIncluded = !item5.IsIncluded; + item6.IsIncluded = !item6.IsIncluded; + source.Refresh(Enumerable.Range(0, source.Count)); + + // Test Remove, with an included item + source.RemoveAt(3); + + // Test Remove, with an excluded item + source.RemoveAt(3); + + // Test Remove, with both included and excluded items + source.RemoveRange(index: 2, count: 2); + + // Test Replace, not affecting filtering + var item9 = new Item() { Id = 9, IsIncluded = false }; + var item10 = new Item() { Id = 10, IsIncluded = true }; + source.Edit(updater => + { + updater.Replace(item7, item9); + updater.Replace(item8, item10); + }); + + // Test Replace, affecting filtering + var item11 = new Item() { Id = 11, IsIncluded = true }; + var item12 = new Item() { Id = 12, IsIncluded = false }; + source.Edit(updater => + { + updater.Replace(item9, item11); + updater.Replace(item10, item12); + }); + + // Test Move of an included item, relative to another included item + var item13 = new Item() { Id = 13, IsIncluded = true }; + source.Add(item13); + source.Move(2, 4); + + // Test Move of an excluded item + source.Move(4, 2); + + results.RecordedChangeSets.Should().BeEmpty("the predicate state has not initialized"); + + + // Set initial state + predicateState.OnNext(new()); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, source.Items.Where(static item => item.IsIncluded)); + } + + [Fact] + public void FilterPolicyIsClearAndReplace_ReFilteringPreservesOrder() + { + using var source = new SourceList(); + using var predicateState = new BehaviorSubject(1); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.Id == predicateState, + filterPolicy: ListFilterPolicy.ClearAndReplace) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + // Test filtering of items, by state + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = true }, + new Item() { Id = 6, IsIncluded = false }, + new Item() { Id = 7, IsIncluded = false }, + new Item() { Id = 8, IsIncluded = true }, + new Item() { Id = 9, IsIncluded = false }, + new Item() { Id = 10, IsIncluded = true } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + + // Capture the current set of filtered items, and publish a state change, to force a re-filter + var priorFilteredItems = results.RecordedItems.ToArray(); + predicateState.OnNext(1); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, source.Items.Where(item => item.Id == predicateState.Value)); + results.RecordedItems.Should().BeEquivalentTo(priorFilteredItems, options => options.WithStrictOrdering()); + } + + [Fact] + public void PredicateIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => Observable.Empty>() + .Filter( + predicateState: Observable.Empty(), + predicate: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void PredicateStateChanges_ItemsAreReFiltered(ListFilterPolicy filterPolicy) + { + using var source = new SourceList(); + using var predicateState = new BehaviorSubject(1); + + using var subscription = source + .Connect() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.Id == predicateState, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + // Test filtering of items, by state + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + // Publish a state change, to change the filtering + predicateState.OnNext(2); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + ShouldBeValid(results, EnumerateFilteredItems()); + + + IEnumerable EnumerateFilteredItems() + => source.Items.Where(item => item.Id == predicateState.Value); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void PredicateStateCompletesAfterInitialValue_CompletionWaitsForSourceCompletion(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var subscription = source + .Filter( + predicateState: Observable.Return(new object()), + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.OnCompleted(); + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void PredicateStateCompletesImmediately_CompletionIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var subscription = source + .Filter( + predicateState: Observable.Empty(), + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.HasCompleted.Should().BeTrue("completion of the predicate state stream before it emits any values means that items can never be accepted by the filter predicate"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void PredicateStateErrors_ErrorIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + var error = new Exception("This is a test."); + predicateState.OnError(error); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void PredicateStateErrorsImmediately_ErrorIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + var error = new Exception("This is a test."); + + using var subscription = source + .Filter( + predicateState: Observable.Throw(error), + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void PredicateStateIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => Observable.Empty>() + .Filter( + predicateState: (null as IObservable)!, + predicate: static (_, _) => true)) + .Should() + .Throw(); + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public async Task SourceAndPredicateStateNotifyFromDifferentThreads_FilteringIsThreadSafe(ListFilterPolicy filterPolicy) + { + var randomizer = new Randomizer(0x1234567); + + (var items, var changeSets) = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + maxRangeSize: 10, + randomizer: randomizer); + + var predicateStates = GenerateRandomPredicateStates( + valueCount: 5_000, + randomizer: randomizer); + + + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: Item.FilterByIdInclusionMask, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + //int i; + //for (i = 0; (i < changeSets.Count) && (i < predicateStates.Count); ++i) + //{ + // source.OnNext(changeSets[i]); + // predicateState.OnNext(predicateStates[i]); + //} + + //for (; i < changeSets.Count; ++i) + // source.OnNext(changeSets[i]); + + //for (; i < predicateStates.Count; ++i) + // predicateState.OnNext(predicateStates[i]); + + await Task.WhenAll( + Task.Run( + action: () => + { + foreach (var changeSet in changeSets) + source.OnNext(changeSet); + }, + cancellationToken: timeoutSource.Token), + Task.Run( + action: () => + { + foreach (var value in predicateStates) + predicateState.OnNext(value); + }, + cancellationToken: timeoutSource.Token)); + + var finalPredicateState = predicateStates[^1]; + ShouldBeValid(results, items.Where(item => Item.FilterByIdInclusionMask(finalPredicateState, item))); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SourceCompletesWhenEmpty_CompletionIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + source.OnCompleted(); + + results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForStateCompletion(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + source.OnNext(new ChangeSet() { new(reason: ListChangeReason.Add, current: new Item() { Id = 1, IsIncluded = true }, index: 0) }); + source.OnCompleted(); + + results.HasCompleted.Should().BeFalse("changes could still be generated by changes in predicate state"); + results.RecordedChangeSets.Should().BeEmpty("the predicate has not initialized"); + + predicateState.OnCompleted(); + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + results.RecordedChangeSets.Should().BeEmpty("the predicate never initialized"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SourceCompletesImmediately_CompletionIsPropagated(ListFilterPolicy filterPolicy) + { + using var predicateState = new Subject(); + + using var subscription = Observable.Empty>() + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SourceErrors_ErrorIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + var error = new Exception("This is a test."); + source.OnError(error); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SourceErrorsImmediately_ErrorIsPropagated(ListFilterPolicy filterPolicy) + { + using var predicateState = new Subject(); + + var error = new Exception("This is a test."); + + using var subscription = Observable.Throw>(error) + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().Be(error, "errors should be propagated"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + + predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); + } + + [Fact] + public void SourceIsNull_ExceptionIsThrown() + => FluentActions.Invoking(() => ObservableListEx.Filter( + source: (null as IObservable>)!, + predicateState: Observable.Empty(), + predicate: static (_, _) => true)) + .Should() + .Throw(); + + [Theory] + [InlineData(ListFilterPolicy.CalculateDiff)] + [InlineData(ListFilterPolicy.ClearAndReplace)] + public void SubscriptionIsDisposed_UnsubscriptionIsPropagated(ListFilterPolicy filterPolicy) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + filterPolicy: filterPolicy) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + subscription.Dispose(); + + source.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); + predicateState.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); + + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Theory] + [InlineData("source", "predicateState")] + [InlineData("predicateState", "source")] + public void SuppressEmptyChangeSetsIsFalse_EmptyChangesetsArePropagatedAndOnlyFinalCompletionIsPropagated(params string[] completionOrder) + { + using var source = new Subject>(); + + using var predicateState = new Subject(); + + using var subscription = source + .Filter( + predicateState: predicateState, + predicate: static (predicateState, item) => item.IsIncluded, + suppressEmptyChangeSets: false) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + + // Initialize the predicate + predicateState.OnNext(new object()); + + results.RecordedChangeSets.Count.Should().Be(1, "the predicate state was initialized"); + results.RecordedChangeSets[0].Should().BeEmpty("there are no items in the collection"); + ShouldBeValid(results, Enumerable.Empty()); + + + // Publish an empty changeset + source.OnNext(ChangeSet.Empty); + + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "a source operation was performed"); + results.RecordedChangeSets.Skip(1).First().Should().BeEmpty("the source changeset was empty"); + ShouldBeValid(results, Enumerable.Empty()); + + + // Publish a changeset with only excluded items + source.OnNext(new ChangeSet() + { + new(reason: ListChangeReason.AddRange, + items: new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 2, IsIncluded = false }, + new Item() { Id = 3, IsIncluded = false } + }, + index: 0) + }); + + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "a source operation was performed"); + results.RecordedChangeSets.Skip(2).First().Should().BeEmpty("all source items were excluded"); + ShouldBeValid(results, Enumerable.Empty()); + + for (var i = 0; i < completionOrder.Length; ++i) + { + switch (completionOrder[i]) + { + case nameof(source): + source.OnCompleted(); + break; + + case nameof(predicateState): + predicateState.OnCompleted(); + break; + } + + if (i < (completionOrder.Length - 1)) + results.HasCompleted.Should().BeFalse("not all input streams have completed"); + } + + results.HasCompleted.Should().BeTrue("all input streams have completed"); + } + + private static void ShouldBeValid( + ListItemRecordingObserver results, + IEnumerable expectedFilteredItems) + { + results.Error.Should().BeNull("no errors should have occurred"); + results.HasCompleted.Should().BeFalse("no completion events should have occurred"); + results.RecordedItems.Should().BeEquivalentTo(expectedFilteredItems, "all filtered items should match the filter predicate"); + } + + private static (IList items, IReadOnlyList> changeSets) GenerateStressItemsAndChangeSets( + int editCount, + int maxChangeCount, + int maxRangeSize, + Randomizer randomizer) + { + var changeReasons = new[] + { + ListChangeReason.Add, + ListChangeReason.AddRange, + ListChangeReason.Clear, + ListChangeReason.Moved, + ListChangeReason.Refresh, + ListChangeReason.Remove, + ListChangeReason.RemoveRange, + ListChangeReason.Replace + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 0.5f, // Add + 0.5f, // AddRange + 0.0f, // Clear + 0.0f, // Moved + 0.0f, // Refresh + 0.0f, // Remove + 0.0f, // RemoveRange + 0.0f // Replace + }; + + var changeReasonWeightsWhenCountIs1 = new[] + { + 0.400f, // Add + 0.400f, // AddRange + 0.001f, // Clear + 0.000f, // Moved + 0.000f, // Refresh + 0.199f, // Remove + 0.000f, // RemoveRange + 0.000f // Replace + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.250f, // Add + 0.250f, // AddRange + 0.001f, // Clear + 0.100f, // Moved + 0.099f, // Refresh + 0.100f, // Remove + 0.100f, // RemoveRange + 0.100f // Replace + }; + + var nextItemId = 1; + + var changeSets = new List>(capacity: editCount); + + var items = new ChangeAwareList(); + + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + 1 => changeReasonWeightsWhenCountIs1, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ListChangeReason.Add: + items.Add(new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }); + break; + + case ListChangeReason.AddRange: + items.AddRange(Enumerable.Repeat(0, randomizer.Int(1, maxRangeSize)) + .Select(_ => new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + })); + break; + + case ListChangeReason.Clear: + items.Clear(); + break; + + case ListChangeReason.Moved: + items.Move( + original: randomizer.Int(0, items.Count - 1), + destination: randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Refresh: + items.RefreshAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Remove: + items.RemoveAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.RemoveRange: + { + var rangeStartIndex = randomizer.Int(0, items.Count - 1); + + items.RemoveRange( + index: rangeStartIndex, + count: Math.Min(items.Count - rangeStartIndex, randomizer.Int(1, maxRangeSize))); + } + break; + + case ListChangeReason.Replace: + items[randomizer.Int(0, items.Count - 1)] = new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }; + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + + return (items, changeSets); + } + + private static IReadOnlyList GenerateRandomPredicateStates( + int valueCount, + Randomizer randomizer) + { + var values = new List(capacity: valueCount); + + while (values.Count < valueCount) + values.Add(randomizer.Int()); + + return values; + } + + private class Item + { + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; set; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } + } +} diff --git a/src/DynamicData.Tests/List/FilterFixture.cs b/src/DynamicData.Tests/List/FilterFixture.cs index 3b3878bfa..17ce6dc03 100644 --- a/src/DynamicData.Tests/List/FilterFixture.cs +++ b/src/DynamicData.Tests/List/FilterFixture.cs @@ -9,7 +9,7 @@ namespace DynamicData.Tests.List; -public class FilterFixture : IDisposable +public partial class FilterFixture : IDisposable { private readonly ChangeSetAggregator _results; diff --git a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs index 76473e153..acd336d0e 100644 --- a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs +++ b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs @@ -64,6 +64,17 @@ public static IDisposable RecordCacheItems( return source.Subscribe(observer); } + public static IDisposable RecordListItems( + this IObservable> source, + out ListItemRecordingObserver observer, + IScheduler? scheduler = null) + where T : notnull + { + observer = new ListItemRecordingObserver(scheduler ?? GlobalConfig.DefaultScheduler); + + return source.Subscribe(observer); + } + public static IDisposable RecordValues( this IObservable source, out ValueRecordingObserver observer, @@ -84,6 +95,8 @@ public static IObservable> ValidateChangeSets(this IObservable< var reasons = Enum.GetValues(); + var receivedChangeSets = new List>(); + return source.SubscribeSafe(RawAnonymousObserver.Create>( onNext: changes => { @@ -99,16 +112,11 @@ public static IObservable> ValidateChangeSets(this IObservable< { case ChangeType.Item: change.Item.Reason.Should().Be(change.Reason); - change.Range.Should().BeEmpty("single-item changes should not specify range info"); break; case ChangeType.Range: - change.Item.Reason.Should().Be(default, "range changes should not specify single-item info"); - change.Item.PreviousIndex.Should().Be(-1, "range changes should not specify single-item info"); - change.Item.Previous.HasValue.Should().BeFalse("range changes should not specify single-item info"); - change.Item.CurrentIndex.Should().Be(-1, "range changes should not specify single-item info"); - change.Item.Current.Should().Be(default, "range changes should not specify single-item info"); + change.Item.Should().Be(default(ItemChange), "range changes should not specify single-item info"); break; } @@ -129,7 +137,7 @@ public static IObservable> ValidateChangeSets(this IObservable< break; case ListChangeReason.AddRange: - change.Range.Index.Should().BeInRange(-1, sortedItems.Count - 1, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection"); + change.Range.Index.Should().BeInRange(-1, sortedItems.Count, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection"); if (change.Range.Index is -1) sortedItems.AddRange(change.Range); else @@ -140,11 +148,9 @@ public static IObservable> ValidateChangeSets(this IObservable< break; case ListChangeReason.Clear: - change.Range.Index.Should().Be(-1, "a Clear change has no target index"); - change.Range.Should().BeEquivalentTo( - sortedItems, - config => config.WithStrictOrdering(), - "items in the range should match the corresponding items in the collection"); + change.Range.Index.Should().Be(-1, "a Clear change applies to an entire collection, it does not have a specific index"); + // The fact that ChangeAwareList can generate Clear changesets with items listed not in the order that they appear in the source seems like a defect to me. Maybe fix? + change.Range.Should().BeEquivalentTo(sortedItems, "items in the range should match the corresponding items in the collection"); sortedItems.Clear(); @@ -169,7 +175,8 @@ public static IObservable> ValidateChangeSets(this IObservable< sortedItems.Should().NotBeEmpty("an item cannot be refreshed within an empty collection"); change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); - change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + // This should likely be fixed. The purpose of Refresh changes is to force re-evaluation of an item that specifically has not changed, the previous item will always be the current item, by definition. + //change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the target index should be a valid index of the collection"); change.Item.Current.Should().Be(sortedItems[change.Item.CurrentIndex], "the item to be refreshed should match the corresponding item in the collection"); @@ -211,10 +218,10 @@ public static IObservable> ValidateChangeSets(this IObservable< case ListChangeReason.Replace: sortedItems.Should().NotBeEmpty("an item cannot be replaced within an empty collection"); - change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); + change.Item.PreviousIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index of replacement should be a valid index of the collection"); change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index to be replaced should be a valid index of the collection"); change.Item.Previous.HasValue.Should().BeTrue("a Replace change should specify a previous item"); - change.Item.Previous.Should().Be(sortedItems[change.Item.CurrentIndex], "the replaced item should match the corresponding item in the collection"); + change.Item.Previous.Value.Should().Be(sortedItems[change.Item.CurrentIndex], "the replaced item should match the corresponding item in the collection"); sortedItems[change.Item.CurrentIndex] = change.Item.Current; @@ -226,6 +233,10 @@ public static IObservable> ValidateChangeSets(this IObservable< { observer.OnError(ex); } + + observer.OnNext(changes); + + receivedChangeSets.Add(changes); }, onError: observer.OnError, onCompleted: observer.OnCompleted)); diff --git a/src/DynamicData.Tests/Utilities/TestSourceList.cs b/src/DynamicData.Tests/Utilities/TestSourceList.cs index ad4a07bdc..6d3387d51 100644 --- a/src/DynamicData.Tests/Utilities/TestSourceList.cs +++ b/src/DynamicData.Tests/Utilities/TestSourceList.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -12,12 +13,16 @@ public sealed class TestSourceList private readonly IObservable _countChanged; private readonly BehaviorSubject _error; private readonly BehaviorSubject _hasCompleted; + private readonly Subject> _refreshRequested; + private readonly Subject> _refreshRequestedPreview; private readonly SourceList _source; public TestSourceList() { _error = new(null); _hasCompleted = new(false); + _refreshRequested = new(); + _refreshRequestedPreview = new(); _source = new(); _countChanged = WrapStream(_source.CountChanged); @@ -33,7 +38,9 @@ public IReadOnlyList Items => _source.Items; public IObservable> Connect(Func? predicate = null) - => WrapStream(_source.Connect(predicate)); + => WrapStream(Observable.Merge( + _source.Connect(predicate), + _refreshRequested)); public void Complete() { @@ -47,6 +54,8 @@ public void Dispose() _source.Dispose(); _error.Dispose(); _hasCompleted.Dispose(); + _refreshRequested.Dispose(); + _refreshRequestedPreview.Dispose(); } public void Edit(Action> updateAction) @@ -57,7 +66,37 @@ public void Edit(Action> updateAction) } public IObservable> Preview(Func? predicate = null) - => WrapStream(_source.Preview(predicate)); + => WrapStream(Observable.Merge( + _source.Preview(predicate), + _refreshRequestedPreview)); + + // TODO: Formally add this to ISourceList + public void Refresh(int index) + { + var changeSet = new ChangeSet(capacity: 1) + { + new Change( + reason: ListChangeReason.Refresh, + current: _source.Items.ElementAt(index), + index: index) + }; + + _refreshRequestedPreview.OnNext(changeSet); + _refreshRequested.OnNext(changeSet); + } + + // TODO: Formally add this to ISourceList + public void Refresh(IEnumerable indexes) + { + var changeSet = new ChangeSet(indexes + .Select(index => new Change( + reason: ListChangeReason.Refresh, + current: _source.Items.ElementAt(index), + index: index))); + + _refreshRequestedPreview.OnNext(changeSet); + _refreshRequested.OnNext(changeSet); + } public void SetError(Exception error) { diff --git a/src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs b/src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs new file mode 100644 index 000000000..c974ed09c --- /dev/null +++ b/src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs @@ -0,0 +1,337 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Transactions; + +using DynamicData.Internal; + +namespace DynamicData.Cache.Internal; + +internal static partial class Filter +{ + public static class WithPredicateState + where TObject : notnull + where TKey : notnull + { + public static IObservable> Create( + IObservable> source, + IObservable predicateState, + Func predicate, + bool suppressEmptyChangeSets) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + predicateState.ThrowArgumentNullExceptionIfNull(nameof(predicateState)); + predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate)); + + return Observable.Create>(downstreamObserver => new Subscription( + downstreamObserver: downstreamObserver, + predicate: predicate, + predicateState: predicateState, + source: source, + suppressEmptyChangeSets: suppressEmptyChangeSets)); + } + + private sealed class Subscription + : IDisposable + { + private readonly List> _downstreamChangesBuffer; + private readonly IObserver> _downstreamObserver; + private readonly Dictionary _itemStatesByKey; + private readonly Func _predicate; + private readonly IDisposable? _predicateStateSubscription; + private readonly IDisposable? _sourceSubscription; + private readonly bool _suppressEmptyChangeSets; + + private bool _hasPredicateStateCompleted; + private bool _hasSourceCompleted; + private bool _isLatestPredicateStateValid; + private TState _latestPredicateState; + + public Subscription( + IObserver> downstreamObserver, + Func predicate, + IObservable predicateState, + IObservable> source, + bool suppressEmptyChangeSets) + { + _downstreamObserver = downstreamObserver; + _predicate = predicate; + _suppressEmptyChangeSets = suppressEmptyChangeSets; + + _downstreamChangesBuffer = new(); + _itemStatesByKey = new(); + + _latestPredicateState = default!; + + var onError = new Action(OnError); + + _predicateStateSubscription = predicateState + .SubscribeSafe( + onNext: OnPredicateStateNext, + onError: onError, + onCompleted: OnPredicateStateCompleted); + + _sourceSubscription = source + .SubscribeSafe( + onNext: OnSourceNext, + onError: onError, + onCompleted: OnSourceCompleted); + } + + public void Dispose() + { + _predicateStateSubscription?.Dispose(); + _sourceSubscription?.Dispose(); + } + + private object DownstreamSynchronizationGate + => _downstreamChangesBuffer; + + private object UpstreamSynchronizationGate + => _itemStatesByKey; + + private IChangeSet AssembleDownstreamChanges() + { + if (_downstreamChangesBuffer.Count is 0) + return ChangeSet.Empty; + + var downstreamChanges = new ChangeSet(_downstreamChangesBuffer); + _downstreamChangesBuffer.Clear(); + + return downstreamChanges; + } + + private void OnError(Exception error) + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + _predicateStateSubscription?.Dispose(); + _sourceSubscription?.Dispose(); + + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnError(error); + } + + private void OnPredicateStateCompleted() + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + _hasPredicateStateCompleted = true; + + // If we didn't get at least one predicateState value, we can't ever emit any (non-empty) downstream changesets, + // no matter how many items come through from source, so just go ahead and complete now. + if (_hasSourceCompleted || (!_isLatestPredicateStateValid && _suppressEmptyChangeSets)) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnCompleted(); + } + } + + private void OnPredicateStateNext(TState predicateState) + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + _latestPredicateState = predicateState; + _isLatestPredicateStateValid = true; + + foreach (var key in _itemStatesByKey.Keys) + { + var itemState = _itemStatesByKey[key]; + + var isIncluded = _predicate.Invoke(predicateState, itemState.Item); + + if (isIncluded && !itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Add, + key: key, + current: itemState.Item)); + } + else if (!isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: key, + current: itemState.Item)); + } + + _itemStatesByKey[key] = new() + { + IsIncluded = isIncluded, + Item = itemState.Item + }; + } + + var downstreamChanges = AssembleDownstreamChanges(); + if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnNext(downstreamChanges); + } + } + + private void OnSourceCompleted() + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + _hasSourceCompleted = true; + + // We can never emit any (non-empty) downstream changes in the future, if the collection is empty + // and the source has reported that it'll never change, so go ahead and complete now. + if (_hasPredicateStateCompleted || ((_itemStatesByKey.Count is 0) && _suppressEmptyChangeSets)) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnCompleted(); + } + } + + private void OnSourceNext(IChangeSet upstreamChanges) + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + foreach (var change in upstreamChanges.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + { + var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, change.Current); + + _itemStatesByKey.Add( + key: change.Key, + value: new() + { + IsIncluded = isIncluded, + Item = change.Current + }); + + if (isIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Add, + key: change.Key, + current: change.Current)); + } + } + break; + + // Intentionally not supporting Moved changes, too much work to try and track indexes. + + case ChangeReason.Refresh: + { + var itemState = _itemStatesByKey[change.Key]; + + var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, itemState.Item); + + if (isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Refresh, + key: change.Key, + current: itemState.Item)); + } + else if (isIncluded && !itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Add, + key: change.Key, + current: itemState.Item)); + } + else if (!isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: change.Key, + current: itemState.Item)); + } + + _itemStatesByKey[change.Key] = new() + { + IsIncluded = isIncluded, + Item = itemState.Item + }; + } + break; + + case ChangeReason.Remove: + { + var itemState = _itemStatesByKey[change.Key]; + + _itemStatesByKey.Remove(change.Key); + + if (itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: change.Key, + current: itemState.Item)); + } + } + break; + + case ChangeReason.Update: + { + var itemState = _itemStatesByKey[change.Key]; + + var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, change.Current); + + if (isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Update, + key: change.Key, + current: change.Current, + previous: itemState.Item)); + } + else if (isIncluded && !itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Add, + key: change.Key, + current: change.Current)); + } + else if (!isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: change.Key, + current: itemState.Item)); + } + + _itemStatesByKey[change.Key] = new() + { + IsIncluded = isIncluded, + Item = change.Current + }; + } + break; + } + } + + var downstreamChanges = AssembleDownstreamChanges(); + if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnNext(downstreamChanges); + } + } + } + + private readonly struct ItemState + { + public required bool IsIncluded { get; init; } + + public required TObject Item { get; init; } + } + } +} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 38737f82d..42879507b 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1476,6 +1476,34 @@ public static IObservable> Filter(this return source.Filter(predicateChanged, Observable.Empty(), suppressEmptyChangeSets); } + /// + /// Creates a filtered stream which can be dynamically filtered, based on state values passed through to a static filtering predicate. + /// + /// The type of the object. + /// The type of the key. + /// The type of state value required by . + /// The source. + /// A stream of state values to be passed to . + /// A static predicate to be used to determine which items should be included or excluded by the filter. + /// By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status. + /// An observable which emits change sets. + /// Throws for , , and . + /// + /// Usually, should emit an initial value, immediately upon subscription. This is because cannot be invoked until the first state value is received, and accordingly, the operator will treat all items as excluded until then. Each value emitted by will trigger a full re-filtering of the entire collection. + /// + public static IObservable> Filter( + this IObservable> source, + IObservable predicateState, + Func predicate, + bool suppressEmptyChangeSets = true) + where TObject : notnull + where TKey : notnull + => Cache.Internal.Filter.WithPredicateState.Create( + source: source, + predicateState: predicateState, + predicate: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets); + /// /// Creates a filtered stream which can be dynamically filtered. /// diff --git a/src/DynamicData/List/Internal/Filter.WithPredicateState.cs b/src/DynamicData/List/Internal/Filter.WithPredicateState.cs new file mode 100644 index 000000000..e89c403e8 --- /dev/null +++ b/src/DynamicData/List/Internal/Filter.WithPredicateState.cs @@ -0,0 +1,1267 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; + +using DynamicData.Internal; + +namespace DynamicData.List.Internal; + +internal static partial class Filter +{ + public static class WithPredicateState + where T : notnull + { + public static IObservable> Create( + IObservable> source, + IObservable predicateState, + Func predicate, + ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff, + bool suppressEmptyChangeSets = true) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + predicateState.ThrowArgumentNullExceptionIfNull(nameof(predicateState)); + predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate)); + + if (!EnumEx.IsDefined(filterPolicy)) + throw new ArgumentException($"Invalid {nameof(ListFilterPolicy)} value {filterPolicy}"); + + return Observable.Create>(observer => + { + SubscriptionBase subscription = (filterPolicy is ListFilterPolicy.CalculateDiff) + ? new CalculateDiffSubscription( + downstreamObserver: observer, + predicate: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets) + : new ClearAndReplaceSubscription( + downstreamObserver: observer, + predicate: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets); + + subscription.Activate( + predicateState: predicateState, + source: source); + + return subscription; + }); + } + + private abstract class SubscriptionBase + : IDisposable + { + private readonly List> _downstreamChangesBuffer; + private readonly IObserver> _downstreamObserver; + private readonly List _itemsBuffer; + private readonly List _itemStates; + private readonly List _itemStatesBuffer; + private readonly Func _predicate; + private readonly bool _suppressEmptyChangeSets; + + private bool _hasPredicateStateCompleted; + private bool _hasSourceCompleted; + private bool _isLatestPredicateStateValid; + private TState _latestPredicateState; + private IDisposable? _predicateStateSubscription; + private IDisposable? _sourceSubscription; + + protected SubscriptionBase( + IObserver> downstreamObserver, + Func predicate, + bool suppressEmptyChangeSets) + { + _downstreamObserver = downstreamObserver; + _predicate = predicate; + _suppressEmptyChangeSets = suppressEmptyChangeSets; + + _downstreamChangesBuffer = new(); + _itemsBuffer = new(); + _itemStates = new(); + _itemStatesBuffer = new(); + _latestPredicateState = default!; + } + + // Keeping subscriptions out of the constructor prevents subscriptions that emit immediately from triggering virtual method calls within the constructor. + public void Activate( + IObservable predicateState, + IObservable> source) + { + var onError = OnError; + + _predicateStateSubscription = predicateState + .SubscribeSafe( + onNext: OnPredicateStateNext, + onError: onError, + onCompleted: OnPredicateStateCompleted); + + _sourceSubscription = source + .SubscribeSafe( + onNext: OnSourceNext, + onError: onError, + onCompleted: OnSourceCompleted); + } + + public void Dispose() + { + _predicateStateSubscription?.Dispose(); + _sourceSubscription?.Dispose(); + } + + protected List> DownstreamChangesBuffer + => _downstreamChangesBuffer; + + protected bool IsLatestPredicateStateValid + => _isLatestPredicateStateValid; + + protected List ItemsBuffer + => _itemsBuffer; + + protected List ItemStates + => _itemStates; + + protected List ItemStatesBuffer + => _itemStatesBuffer; + + protected TState LatestPredicateState + => _latestPredicateState; + + protected Func Predicate + => _predicate; + + protected abstract void PerformAdd(ItemChange change); + + protected abstract void PerformAddRange(RangeChange change); + + protected abstract void PerformClear(); + + protected abstract void PerformMove(ItemChange change); + + protected abstract void PerformReFilter(); + + protected abstract void PerformRefresh(ItemChange change); + + protected abstract void PerformRemove(ItemChange change); + + protected abstract void PerformRemoveRange(RangeChange change); + + protected abstract void PerformReplace(ItemChange change); + + private object DownstreamSynchronizationGate + => _downstreamChangesBuffer; + + private object UpstreamSynchronizationGate + => _itemStates; + + private IChangeSet AssembleDownstreamChanges() + { + if (_downstreamChangesBuffer.Count is 0) + return ChangeSet.Empty; + + var downstreamChanges = new ChangeSet(_downstreamChangesBuffer); + _downstreamChangesBuffer.Clear(); + + return downstreamChanges; + } + + private void OnError(Exception error) + { + var hasUpstreamLock = false; + var hasDownstreamLock = false; + try + { + Monitor.Enter(UpstreamSynchronizationGate, ref hasUpstreamLock); + + _predicateStateSubscription?.Dispose(); + _sourceSubscription?.Dispose(); + + Monitor.Enter(DownstreamSynchronizationGate, ref hasDownstreamLock); + + if (hasUpstreamLock) + { + Monitor.Exit(UpstreamSynchronizationGate); + hasUpstreamLock = false; + } + + _downstreamObserver.OnError(error); + } + finally + { + if (hasUpstreamLock) + Monitor.Exit(UpstreamSynchronizationGate); + + if (hasDownstreamLock) + Monitor.Exit(DownstreamSynchronizationGate); + } + } + + private void OnPredicateStateCompleted() + { + var hasUpstreamLock = false; + var hasDownstreamLock = false; + try + { + Monitor.Enter(UpstreamSynchronizationGate, ref hasUpstreamLock); + + _hasPredicateStateCompleted = true; + + // If we didn't get at least one predicateState value, we can't ever emit any (non-empty) downstream changesets, + // no matter how many items come through from source, so just go ahead and complete now. + if (_hasSourceCompleted || (!_isLatestPredicateStateValid && _suppressEmptyChangeSets)) + { + Monitor.Enter(DownstreamSynchronizationGate, ref hasDownstreamLock); + + if (hasUpstreamLock) + { + Monitor.Exit(UpstreamSynchronizationGate); + hasUpstreamLock = false; + } + + _downstreamObserver.OnCompleted(); + } + } + finally + { + if (hasUpstreamLock) + Monitor.Exit(UpstreamSynchronizationGate); + + if (hasDownstreamLock) + Monitor.Exit(DownstreamSynchronizationGate); + } + } + + private void OnPredicateStateNext(TState predicateState) + { + var hasUpstreamLock = false; + var hasDownstreamLock = false; + try + { + Monitor.Enter(UpstreamSynchronizationGate, ref hasUpstreamLock); + + _latestPredicateState = predicateState; + _isLatestPredicateStateValid = true; + + PerformReFilter(); + + var downstreamChanges = AssembleDownstreamChanges(); + + if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) + { + Monitor.Enter(DownstreamSynchronizationGate, ref hasDownstreamLock); + + if (hasUpstreamLock) + { + Monitor.Exit(UpstreamSynchronizationGate); + hasUpstreamLock = false; + } + + _downstreamObserver.OnNext(downstreamChanges); + } + } + finally + { + if (hasUpstreamLock) + Monitor.Exit(UpstreamSynchronizationGate); + + if (hasDownstreamLock) + Monitor.Exit(DownstreamSynchronizationGate); + } + } + + private void OnSourceCompleted() + { + var hasUpstreamLock = false; + var hasDownstreamLock = false; + try + { + Monitor.Enter(UpstreamSynchronizationGate, ref hasUpstreamLock); + + _hasSourceCompleted = true; + + // We can never emit any (non-empty) downstream changes in the future, if the collection is empty + // and the source has reported that it'll never change, so go ahead and complete now. + if (_hasPredicateStateCompleted || ((_itemStates.Count is 0) && _suppressEmptyChangeSets)) + { + Monitor.Enter(DownstreamSynchronizationGate, ref hasDownstreamLock); + + if (hasUpstreamLock) + { + Monitor.Exit(UpstreamSynchronizationGate); + hasUpstreamLock = false; + } + + _downstreamObserver.OnCompleted(); + } + } + finally + { + if (hasUpstreamLock) + Monitor.Exit(UpstreamSynchronizationGate); + + if (hasDownstreamLock) + Monitor.Exit(DownstreamSynchronizationGate); + } + } + + private void OnSourceNext(IChangeSet upstreamChanges) + { + var hasUpstreamLock = false; + var hasDownstreamLock = false; + try + { + Monitor.Enter(UpstreamSynchronizationGate, ref hasUpstreamLock); + + foreach (var change in upstreamChanges) + { + switch (change.Reason) + { + case ListChangeReason.Add: + PerformAdd(change.Item); + break; + + case ListChangeReason.AddRange: + PerformAddRange(change.Range); + break; + + case ListChangeReason.Clear: + if (_itemStates.Count is not 0) + PerformClear(); + break; + + case ListChangeReason.Moved: + if (change.Item.CurrentIndex != change.Item.PreviousIndex) + PerformMove(change.Item); + break; + + case ListChangeReason.Refresh: + PerformRefresh(change.Item); + break; + + case ListChangeReason.Remove: + PerformRemove(change.Item); + break; + + case ListChangeReason.RemoveRange: + PerformRemoveRange(change.Range); + break; + + case ListChangeReason.Replace: + PerformReplace(change.Item); + break; + } + } + + var downstreamChanges = AssembleDownstreamChanges(); + + if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) + { + Monitor.Enter(DownstreamSynchronizationGate, ref hasDownstreamLock); + + if (hasUpstreamLock) + { + Monitor.Exit(UpstreamSynchronizationGate); + hasUpstreamLock = false; + } + + _downstreamObserver.OnNext(downstreamChanges); + } + } + finally + { + if (hasUpstreamLock) + Monitor.Exit(UpstreamSynchronizationGate); + + if (hasDownstreamLock) + Monitor.Exit(DownstreamSynchronizationGate); + } + } + + protected readonly struct ItemState + { + public required int? FilteredIndex { get; init; } + + public required T Item { get; init; } + } + } + + private sealed class CalculateDiffSubscription + : SubscriptionBase + { + public CalculateDiffSubscription( + IObserver> downstreamObserver, + Func predicate, + bool suppressEmptyChangeSets) + : base( + downstreamObserver: downstreamObserver, + predicate: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets) + { + } + + protected override void PerformAdd(ItemChange change) + { + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + var filteredIndex = default(int?); + + if (isIncluded) + { + filteredIndex = 0; + for (var i = change.CurrentIndex - 1; i >= 0; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + filteredIndex = priorFilteredIndex + 1; + break; + } + } + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex.Value)); + } + + ItemStates.Insert( + index: change.CurrentIndex, + item: new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }); + + for (var i = change.CurrentIndex + 1; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex + 1 + }; + } + } + } + + protected override void PerformAddRange(RangeChange change) + { + var nextFilteredIndex = 0; + for (var i = change.Index - 1; i >= 0; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + nextFilteredIndex = priorFilteredIndex + 1; + break; + } + } + var filteredInsertIndex = nextFilteredIndex; + + foreach (var item in change) + { + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, item); + int? filteredIndex = null; + + if (isIncluded) + { + filteredIndex = nextFilteredIndex++; + ItemsBuffer.Add(item); + } + + ItemStatesBuffer.Add(new() + { + FilteredIndex = filteredIndex, + Item = item + }); + } + + if (ItemStatesBuffer.Count is not 0) + { + ItemStates.InsertRange(change.Index, ItemStatesBuffer); + ItemStatesBuffer.Clear(); + + for (var i = change.Index + change.Count; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex + 1 + }; + } + } + + if (ItemsBuffer.Count is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.AddRange, + items: ItemsBuffer.ToArray(), // The Change constructor does not safety-copy the collection, we need to do it. + index: filteredInsertIndex)); + ItemsBuffer.Clear(); + } + } + } + + protected override void PerformClear() + { + ItemsBuffer.EnsureCapacity(ItemStates.Count); + + foreach (var itemState in ItemStates) + { + if (itemState.FilteredIndex is not null) + ItemsBuffer.Add(itemState.Item); + } + ItemStates.Clear(); + + if (ItemsBuffer.Count is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Clear, + items: ItemsBuffer.ToArray())); // The Change constructor does not safety-copy the collection, we need to do it. + ItemsBuffer.Clear(); + } + } + + protected override void PerformMove(ItemChange change) + { + var itemState = ItemStates[change.PreviousIndex]; + + if (itemState.FilteredIndex is int previousFilteredIndex) + { + int currentFilteredIndex; + // Determine the filtered index to use for the moved item, after the move, by searching backwards from the target location. + // When moving forwards, only search back to the original position of the item, to see if the index needs to change at all, + // and if it does, account for the fact that the filtered index of the items in that range will need to be adjusted by -1. to account for the move. + if (change.CurrentIndex > change.PreviousIndex) + { + currentFilteredIndex = previousFilteredIndex; + for (var i = change.CurrentIndex; i >= change.PreviousIndex; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + currentFilteredIndex = priorFilteredIndex; + break; + } + } + } + // When moving backwards, search to the beginning of the list, where items' filtered indexes will not be changing. + else + { + currentFilteredIndex = 0; + for (var i = change.CurrentIndex - 1; i >= 0; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + currentFilteredIndex = priorFilteredIndex + 1; + break; + } + } + } + + if (currentFilteredIndex != previousFilteredIndex) + { + DownstreamChangesBuffer.Add(new( + current: change.Current, + currentIndex: currentFilteredIndex, + previousIndex: previousFilteredIndex)); + + itemState = itemState with + { + FilteredIndex = currentFilteredIndex + }; + } + } + + ItemStates.RemoveAt(change.PreviousIndex); + ItemStates.Insert(change.CurrentIndex, itemState); + + if (itemState.FilteredIndex is not null) + { + if (change.CurrentIndex < change.PreviousIndex) + { + for (var i = change.CurrentIndex + 1; i <= change.PreviousIndex; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex + 1 + }; + } + } + } + else + { + for (var i = change.PreviousIndex; i < change.CurrentIndex; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + } + } + } + + protected override void PerformReFilter() + { + var nextFilteredIndex = 0; + + for (var unfilteredIndex = 0; unfilteredIndex < ItemStates.Count; ++unfilteredIndex) + { + var itemState = ItemStates[unfilteredIndex]; + + var isIncluded = Predicate.Invoke(LatestPredicateState, itemState.Item); + + if (itemState.FilteredIndex is int filteredIndex) + { + if (isIncluded) + { + if (filteredIndex != nextFilteredIndex) + { + ItemStates[unfilteredIndex] = itemState with + { + FilteredIndex = nextFilteredIndex + }; + } + ++nextFilteredIndex; + } + else + { + ItemStates[unfilteredIndex] = itemState with + { + FilteredIndex = null + }; + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: itemState.Item, + index: nextFilteredIndex)); + } + } + else if (isIncluded) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: itemState.Item, + index: nextFilteredIndex)); + + ItemStates[unfilteredIndex] = new() + { + FilteredIndex = nextFilteredIndex, + Item = itemState.Item + }; + + ++nextFilteredIndex; + } + } + } + + protected override void PerformRefresh(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + + if (itemState.FilteredIndex is int filteredIndex) + { + if (isIncluded) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Refresh, + current: change.Current, + index: filteredIndex)); + } + else + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = null, + Item = change.Current + }; + + for (var i = change.CurrentIndex + 1; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + } + } + else if (isIncluded) + { + filteredIndex = 0; + for (var i = change.CurrentIndex - 1; i >= 0; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + filteredIndex = priorFilteredIndex + 1; + break; + } + } + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }; + + for (var i = change.CurrentIndex + 1; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex + 1 + }; + } + } + } + } + + protected override void PerformRemove(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + ItemStates.RemoveAt(change.CurrentIndex); + + if (itemState.FilteredIndex is int filteredIndex) + { + for (var i = change.CurrentIndex; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Current, + index: filteredIndex)); + } + } + + protected override void PerformRemoveRange(RangeChange change) + { + ItemsBuffer.EnsureCapacity(change.Count); + var filteredRangeIndex = -1; + + for (var i = change.Index; i < change.Index + change.Count; ++i) + { + var itemState = ItemStates[i]; + if (itemState.FilteredIndex is int filteredIndex) + { + if (filteredRangeIndex is -1) + filteredRangeIndex = filteredIndex; + + ItemsBuffer.Add(itemState.Item); + } + } + + ItemStates.RemoveRange(change.Index, change.Count); + + if (ItemsBuffer.Count is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.RemoveRange, + items: ItemsBuffer.ToArray(), // The Change constructor does not clone the collection, we need to do it. + index: filteredRangeIndex)); + + for (var i = change.Index; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - ItemsBuffer.Count + }; + } + } + + ItemsBuffer.Clear(); + } + } + + protected override void PerformReplace(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + + if (itemState.FilteredIndex is int filteredIndex) + { + if (isIncluded) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Replace, + current: change.Current, + previous: change.Previous, + currentIndex: filteredIndex, + previousIndex: filteredIndex)); + + ItemStates[change.CurrentIndex] = itemState with + { + Item = change.Current + }; + } + else + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Previous.Value, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = null, + Item = change.Current + }; + + for (var i = change.CurrentIndex + 1; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + } + } + else + { + if (isIncluded) + { + filteredIndex = 0; + for (var i = change.CurrentIndex - 1; i >= 0; --i) + { + if (ItemStates[i].FilteredIndex is int priorFilteredIndex) + { + filteredIndex = priorFilteredIndex + 1; + break; + } + } + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }; + + for (var i = change.CurrentIndex + 1; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if (otherItemState.FilteredIndex is int otherFilteredIndex) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex + 1 + }; + } + } + } + else + { + ItemStates[change.CurrentIndex] = itemState with + { + Item = change.Current + }; + } + } + } + } + + private sealed class ClearAndReplaceSubscription + : SubscriptionBase + { + private int _filteredCount; + + public ClearAndReplaceSubscription( + IObserver> downstreamObserver, + Func predicate, + bool suppressEmptyChangeSets) + : base( + downstreamObserver: downstreamObserver, + predicate: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets) + { + } + + protected override void PerformAdd(ItemChange change) + { + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + var filteredIndex = default(int?); + + if (isIncluded) + { + filteredIndex = _filteredCount++; + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex.Value)); + } + + ItemStates.Insert( + index: change.CurrentIndex, + item: new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }); + } + + protected override void PerformAddRange(RangeChange change) + { + var priorFilteredCount = _filteredCount; + + foreach (var item in change) + { + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, item); + int? filteredIndex = null; + + if (isIncluded) + { + filteredIndex = _filteredCount++; + ItemsBuffer.Add(item); + } + + ItemStatesBuffer.Add(new() + { + FilteredIndex = filteredIndex, + Item = item + }); + } + + if (ItemStatesBuffer.Count is not 0) + { + ItemStates.InsertRange(change.Index, ItemStatesBuffer); + ItemStatesBuffer.Clear(); + + if (ItemsBuffer.Count is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.AddRange, + items: ItemsBuffer.ToArray(), // The Change constructor does not clone the collection, we need to do it. + index: priorFilteredCount)); + ItemsBuffer.Clear(); + } + } + } + + protected override void PerformClear() + { + // Not using ItemsBuffer, because we already know the exact size we need, so we can allocate a fresh one and use it directly. + var itemsBuffer = new T[_filteredCount]; + + foreach (var itemState in ItemStates) + { + if (itemState.FilteredIndex is int filteredIndex) + itemsBuffer[filteredIndex] = itemState.Item; + } + ItemStates.Clear(); + _filteredCount = 0; + + if (itemsBuffer.Length is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Clear, + items: itemsBuffer)); + } + } + + protected override void PerformMove(ItemChange change) + { + // We're not supporting propagation of move changes, but we do still need to process them, to keep ItemStates correct. + var itemState = ItemStates[change.PreviousIndex]; + ItemStates.RemoveAt(change.PreviousIndex); + ItemStates.Insert(change.CurrentIndex, itemState); + } + + protected override void PerformReFilter() + { + var nextFilteredIndex = 0; + var clearedItems = (_filteredCount is 0) + ? Array.Empty() + : new T[_filteredCount]; + + for (var unfilteredIndex = 0; unfilteredIndex < ItemStates.Count; ++unfilteredIndex) + { + var itemState = ItemStates[unfilteredIndex]; + + var isIncluded = Predicate.Invoke(LatestPredicateState, itemState.Item); + + if (itemState.FilteredIndex is int filteredIndex) + { + clearedItems[filteredIndex] = itemState.Item; + + if (isIncluded) + { + ItemsBuffer.Add(itemState.Item); + + if (filteredIndex != nextFilteredIndex) + { + ItemStates[unfilteredIndex] = itemState with + { + FilteredIndex = nextFilteredIndex + }; + } + + ++nextFilteredIndex; + } + else + { + --_filteredCount; + ItemStates[unfilteredIndex] = itemState with + { + FilteredIndex = null + }; + } + } + else if (isIncluded) + { + ++_filteredCount; + ItemStates[unfilteredIndex] = itemState with + { + FilteredIndex = nextFilteredIndex + }; + + ItemsBuffer.Add(itemState.Item); + + ++nextFilteredIndex; + } + } + + if (clearedItems.Length is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Clear, + items: clearedItems)); + } + + if (ItemsBuffer.Count is not 0) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.AddRange, + items: ItemsBuffer.ToArray(), // The Change constructor does not safety-copy the collection, we need to do it. + index: 0)); + ItemsBuffer.Clear(); + } + } + + protected override void PerformRefresh(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + + if (itemState.FilteredIndex is int filteredIndex) + { + if (isIncluded) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Refresh, + current: change.Current, + index: filteredIndex)); + } + else + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = null, + Item = change.Current + }; + --_filteredCount; + + for (var i = 0; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if ((otherItemState.FilteredIndex is int otherFilteredIndex) && (otherItemState.FilteredIndex > filteredIndex)) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + } + } + else if (isIncluded) + { + filteredIndex = _filteredCount++; + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }; + } + } + + protected override void PerformRemove(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + ItemStates.RemoveAt(change.CurrentIndex); + + if (itemState.FilteredIndex is int filteredIndex) + { + --_filteredCount; + + for (var i = 0; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if ((otherItemState.FilteredIndex is int otherFilteredIndex) && (otherItemState.FilteredIndex > filteredIndex)) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = --otherFilteredIndex + }; + } + } + + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Current, + index: filteredIndex)); + } + } + + protected override void PerformRemoveRange(RangeChange change) + { + for (var index = change.Index; index < change.Index + change.Count; ++index) + { + var itemState = ItemStates[index]; + if (itemState.FilteredIndex is int filteredIndex) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: itemState.Item, + index: filteredIndex)); + + for (var i = 0; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if ((otherItemState.FilteredIndex is int otherFilteredIndex) && (otherItemState.FilteredIndex > filteredIndex)) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + --_filteredCount; + } + } + + ItemStates.RemoveRange(change.Index, change.Count); + } + + protected override void PerformReplace(ItemChange change) + { + var itemState = ItemStates[change.CurrentIndex]; + var isIncluded = IsLatestPredicateStateValid && Predicate.Invoke(LatestPredicateState, change.Current); + + if (itemState.FilteredIndex is int filteredIndex) + { + if (isIncluded) + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Replace, + current: change.Current, + previous: change.Previous, + currentIndex: filteredIndex, + previousIndex: filteredIndex)); + + ItemStates[change.CurrentIndex] = itemState with + { + Item = change.Current + }; + } + else + { + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Remove, + current: change.Previous.Value, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = null, + Item = change.Current + }; + --_filteredCount; + + for (var i = 0; i < ItemStates.Count; ++i) + { + var otherItemState = ItemStates[i]; + if ((otherItemState.FilteredIndex is int otherFilteredIndex) && (otherItemState.FilteredIndex > filteredIndex)) + { + ItemStates[i] = otherItemState with + { + FilteredIndex = otherFilteredIndex - 1 + }; + } + } + } + } + else if (isIncluded) + { + filteredIndex = _filteredCount++; + DownstreamChangesBuffer.Add(new( + reason: ListChangeReason.Add, + current: change.Current, + index: filteredIndex)); + + ItemStates[change.CurrentIndex] = new() + { + FilteredIndex = filteredIndex, + Item = change.Current + }; + } + else + { + ItemStates[change.CurrentIndex] = itemState with + { + Item = change.Current + }; + } + } + } + } +} diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index 5a2c409f1..64266d16b 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -712,6 +712,35 @@ public static IObservable> Filter(this IObservable(source, predicate, filterPolicy).Run(); } + /// + /// Creates a filtered stream which can be dynamically filtered, based on state values passed through to a static filtering predicate. + /// + /// The type of the item. + /// The type of state value required by . + /// The source. + /// A stream of state values to be passed to . + /// A static predicate to be used to determine which items should be included or excluded by the filter. + /// The policy that the operator should use when performing re-filtering operations. + /// By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status. + /// An observable which emits change sets. + /// Throws for , , and . + /// + /// Usually, should emit an initial value, immediately upon subscription. This is because cannot be invoked until the first state value is received, and accordingly, the operator will treat all items as excluded until then. Each value emitted by will trigger a full re-filtering of the entire collection, according to . + /// + public static IObservable> Filter( + this IObservable> source, + IObservable predicateState, + Func predicate, + ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff, + bool suppressEmptyChangeSets = true) + where T : notnull + => List.Internal.Filter.WithPredicateState.Create( + source: source, + predicateState: predicateState, + predicate: predicate, + filterPolicy: filterPolicy, + suppressEmptyChangeSets: suppressEmptyChangeSets); + /// /// Filters source on the specified observable property using the specified predicate. /// The filter will automatically reapply when a property changes. diff --git a/src/DynamicData/Polyfills/EnumEx.cs b/src/DynamicData/Polyfills/EnumEx.cs new file mode 100644 index 000000000..fba421293 --- /dev/null +++ b/src/DynamicData/Polyfills/EnumEx.cs @@ -0,0 +1,18 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace System; + +internal static class EnumEx +{ + #if NET5_0_OR_GREATER + public static bool IsDefined(TEnum value) + where TEnum : struct, Enum + => Enum.IsDefined(value); + #else + public static bool IsDefined(TEnum value) + where TEnum : struct, Enum + => Enum.IsDefined(typeof(TEnum), value); + #endif +}