From c944f7d76f605ff7c22c653365761915405861b4 Mon Sep 17 00:00:00 2001 From: halgari Date: Thu, 12 Sep 2024 10:08:12 -0600 Subject: [PATCH 1/2] Switch the guts of the ObservableDatoms system over to R3 to avoid a O(n) issue in `BehaviorSubject` in Rx --- .../IDatomStore.cs | 4 ++-- .../NexusMods.MnemonicDB.Abstractions.csproj | 1 + src/NexusMods.MnemonicDB/Connection.cs | 18 ++++++++++-------- .../NexusMods.MnemonicDB.csproj | 1 + src/NexusMods.MnemonicDB/Storage/DatomStore.cs | 4 ++-- 5 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs index dc2f5699..a0214c78 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs @@ -1,10 +1,10 @@ using System; using System.Collections.Generic; -using System.Threading; using System.Threading.Tasks; using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.MnemonicDB.Abstractions.TxFunctions; +using R3; namespace NexusMods.MnemonicDB.Abstractions; @@ -17,7 +17,7 @@ public interface IDatomStore : IDisposable /// An observable of the transaction log, for getting the latest changes to the store. This observable /// will always start with the most recent value, so there is no reason to use `StartWith` or `Replay` on it. /// - public IObservable TxLog { get; } + public Observable TxLog { get; } /// /// Gets the latest transaction id found in the log. diff --git a/src/NexusMods.MnemonicDB.Abstractions/NexusMods.MnemonicDB.Abstractions.csproj b/src/NexusMods.MnemonicDB.Abstractions/NexusMods.MnemonicDB.Abstractions.csproj index a00844ef..a9b0d789 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/NexusMods.MnemonicDB.Abstractions.csproj +++ b/src/NexusMods.MnemonicDB.Abstractions/NexusMods.MnemonicDB.Abstractions.csproj @@ -8,6 +8,7 @@ + diff --git a/src/NexusMods.MnemonicDB/Connection.cs b/src/NexusMods.MnemonicDB/Connection.cs index 7e9ef552..8413ace4 100644 --- a/src/NexusMods.MnemonicDB/Connection.cs +++ b/src/NexusMods.MnemonicDB/Connection.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; @@ -13,6 +12,9 @@ using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Abstractions.TxFunctions; using NexusMods.MnemonicDB.Storage; +using R3; +using Observable = System.Reactive.Linq.Observable; +using ObservableExtensions = R3.ObservableExtensions; namespace NexusMods.MnemonicDB; @@ -25,7 +27,7 @@ public class Connection : IConnection private readonly Dictionary _declaredAttributes; private readonly ILogger _logger; - private BehaviorSubject _dbStream; + private R3.BehaviorSubject _dbStream; private IDisposable? _dbStreamDisposable; private readonly IAnalyzer[] _analyzers; @@ -38,7 +40,7 @@ public Connection(ILogger logger, IDatomStore store, IServiceProvide _logger = logger; _declaredAttributes = declaredAttributes.ToDictionary(a => a.Id); _store = store; - _dbStream = new BehaviorSubject(default!); + _dbStream = new R3.BehaviorSubject(default!); _analyzers = analyzers.ToArray(); Bootstrap(); } @@ -46,11 +48,11 @@ public Connection(ILogger logger, IDatomStore store, IServiceProvide /// /// Scrubs the transaction stream so that we only ever move forward and never repeat transactions /// - private IObservable ProcessUpdate(IObservable dbStream) + private R3.Observable ProcessUpdate(R3.Observable dbStream) { IDb? prev = null; - return Observable.Create((IObserver observer) => + return R3.Observable.Create((Observer observer) => { return dbStream.Subscribe(nextItem => { @@ -76,7 +78,7 @@ private IObservable ProcessUpdate(IObservable dbStream) observer.OnNext((Db)nextItem); prev = nextItem; - }, observer.OnError, observer.OnCompleted); + }, observer.OnCompleted); }); } @@ -134,7 +136,7 @@ public IObservable Revisions { if (_dbStream == default!) ThrowNullDb(); - return _dbStream!; + return ObservableExtensions.AsSystemObservable(_dbStream!); } } @@ -198,7 +200,7 @@ private void Bootstrap() AddMissingAttributes(_declaredAttributes.Values); _dbStreamDisposable = ProcessUpdate(_store.TxLog) - .Subscribe(_dbStream); + .Subscribe(itm => _dbStream.OnNext(itm)); } catch (Exception ex) { diff --git a/src/NexusMods.MnemonicDB/NexusMods.MnemonicDB.csproj b/src/NexusMods.MnemonicDB/NexusMods.MnemonicDB.csproj index 40e2afb9..8085b805 100644 --- a/src/NexusMods.MnemonicDB/NexusMods.MnemonicDB.csproj +++ b/src/NexusMods.MnemonicDB/NexusMods.MnemonicDB.csproj @@ -7,6 +7,7 @@ + diff --git a/src/NexusMods.MnemonicDB/Storage/DatomStore.cs b/src/NexusMods.MnemonicDB/Storage/DatomStore.cs index cf628fff..7026cc62 100644 --- a/src/NexusMods.MnemonicDB/Storage/DatomStore.cs +++ b/src/NexusMods.MnemonicDB/Storage/DatomStore.cs @@ -3,7 +3,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Reactive.Subjects; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -18,6 +17,7 @@ using NexusMods.MnemonicDB.Abstractions.TxFunctions; using NexusMods.MnemonicDB.Storage.Abstractions; using NexusMods.MnemonicDB.Storage.DatomStorageStructures; +using R3; using Reloaded.Memory.Extensions; namespace NexusMods.MnemonicDB.Storage; @@ -158,7 +158,7 @@ public DatomStore(ILogger logger, AttributeRegistry registry, DatomS } /// - public IObservable TxLog + public Observable TxLog { get { From 2e20f02f23c696c6f3ce6819cad3d4a5026e421e Mon Sep 17 00:00:00 2001 From: halgari Date: Thu, 12 Sep 2024 10:10:35 -0600 Subject: [PATCH 2/2] Update commit message --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 739013a0..73449998 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ ## Changelog -### 0.9.81 - 19/09/2024 +### 0.9.82 - 12/09/2024 +* Fix a O(n) issue caused by Rx storing observers in a ImmutableList inside a `BehaviorSubject`. Switched to using R3 internally. Over +time Rx's uses will be replaced with R3 to avoid these and several other issues + +### 0.9.81 - 9/09/2024 * Fix a bug the source generators when trying to use HashedBlobAttributes ### 0.9.80 - 22/08/2024