From a0ae61707a9ccb6cb329a28ce1f53179c2c6bd2d Mon Sep 17 00:00:00 2001 From: Atralupus Date: Fri, 23 Feb 2024 17:39:58 +0900 Subject: [PATCH] Implement bulk store --- .../Models/State/ArenaData.cs | 24 ++-- .../Models/State/AvataData.cs | 17 +-- .../Models/State/BaseData.cs | 20 +++ .../Scrapper/ArenaScrapper.cs | 3 +- .../Services/MongoDbStore.cs | 119 ++++++++++++++++-- .../Util/JsonConverter.cs | 2 +- NineChroniclesUtilBackend.Store/Worker.cs | 5 +- 7 files changed, 144 insertions(+), 46 deletions(-) create mode 100644 NineChroniclesUtilBackend.Store/Models/State/BaseData.cs diff --git a/NineChroniclesUtilBackend.Store/Models/State/ArenaData.cs b/NineChroniclesUtilBackend.Store/Models/State/ArenaData.cs index bb2b3da4..cca415b0 100644 --- a/NineChroniclesUtilBackend.Store/Models/State/ArenaData.cs +++ b/NineChroniclesUtilBackend.Store/Models/State/ArenaData.cs @@ -1,29 +1,21 @@ using Nekoyume.Model.Arena; -using Newtonsoft.Json; -using NineChroniclesUtilBackend.Store.Util; +using Nekoyume.TableData; +using Libplanet.Crypto; namespace NineChroniclesUtilBackend.Store.Models; -public class ArenaData +public class ArenaData : BaseData { public ArenaScore Score { get; } public ArenaInformation Information { get; } + public ArenaSheet.RoundData RoundData { get; } + public Address AvatarAddress { get; } - public ArenaData(ArenaScore score, ArenaInformation information) + public ArenaData(ArenaScore score, ArenaInformation information, ArenaSheet.RoundData roundData, Address avatarAddress) { Score = score; Information = information; - } - - public string ToJson() - { - var settings = new JsonSerializerSettings - { - Converters = new[] { new BigIntegerToStringConverter() }, - Formatting = Formatting.Indented - }; - - string jsonString = JsonConvert.SerializeObject(this, settings); - return jsonString; + RoundData = roundData; + AvatarAddress = avatarAddress; } } \ No newline at end of file diff --git a/NineChroniclesUtilBackend.Store/Models/State/AvataData.cs b/NineChroniclesUtilBackend.Store/Models/State/AvataData.cs index c6661974..eeac763e 100644 --- a/NineChroniclesUtilBackend.Store/Models/State/AvataData.cs +++ b/NineChroniclesUtilBackend.Store/Models/State/AvataData.cs @@ -1,11 +1,8 @@ using Nekoyume.Model.State; -using Newtonsoft.Json; -using NineChroniclesUtilBackend.Store.Util; - namespace NineChroniclesUtilBackend.Store.Models; -public class AvatarData +public class AvatarData : BaseData { public AvatarState Avatar { get; } public ItemSlotState ItemSlot { get; } @@ -17,16 +14,4 @@ public AvatarData(AvatarState avatar, ItemSlotState itemSlot, List ru ItemSlot = itemSlot; RuneSlot = runeSlot; } - - public string ToJson() - { - var settings = new JsonSerializerSettings - { - Converters = new[] { new BigIntegerToStringConverter() }, - Formatting = Formatting.Indented - }; - - string jsonString = JsonConvert.SerializeObject(this, settings); - return jsonString; - } } \ No newline at end of file diff --git a/NineChroniclesUtilBackend.Store/Models/State/BaseData.cs b/NineChroniclesUtilBackend.Store/Models/State/BaseData.cs new file mode 100644 index 00000000..bb006448 --- /dev/null +++ b/NineChroniclesUtilBackend.Store/Models/State/BaseData.cs @@ -0,0 +1,20 @@ +using Newtonsoft.Json; +using NineChroniclesUtilBackend.Store.Util; + +namespace NineChroniclesUtilBackend.Store.Models; + +public class BaseData +{ + protected static JsonSerializerSettings JsonSerializerSettings => new JsonSerializerSettings + { + Converters = new[] { new BigIntegerToStringConverter() }, + Formatting = Formatting.Indented, + // ContractResolver = new Newtonsoft.Json.Serialization.CamelCasePropertyNamesContractResolver(), + NullValueHandling = NullValueHandling.Ignore + }; + + public string ToJson() + { + return JsonConvert.SerializeObject(this, JsonSerializerSettings); + } +} diff --git a/NineChroniclesUtilBackend.Store/Scrapper/ArenaScrapper.cs b/NineChroniclesUtilBackend.Store/Scrapper/ArenaScrapper.cs index ba7e0000..69e0d195 100644 --- a/NineChroniclesUtilBackend.Store/Scrapper/ArenaScrapper.cs +++ b/NineChroniclesUtilBackend.Store/Scrapper/ArenaScrapper.cs @@ -33,7 +33,6 @@ public async Task ExecuteAsync() var avatarData = await GetAvatarData(avatarAddress); RaiseDataCollected(arenaData, avatarData); - break; } } @@ -50,7 +49,7 @@ public async Task GetArenaData(ArenaSheet.RoundData roundData, Addres var arenaScore = await _stateGetter.GetArenaScoreState(avatarAddress, roundData.ChampionshipId, roundData.Round); var arenaInfo = await _stateGetter.GetArenaInfoState(avatarAddress, roundData.ChampionshipId, roundData.Round); - return new ArenaData(arenaScore, arenaInfo); + return new ArenaData(arenaScore, arenaInfo, roundData, avatarAddress); } public async Task GetAvatarData(Address avatarAddress) diff --git a/NineChroniclesUtilBackend.Store/Services/MongoDbStore.cs b/NineChroniclesUtilBackend.Store/Services/MongoDbStore.cs index 30c72838..9a3c6843 100644 --- a/NineChroniclesUtilBackend.Store/Services/MongoDbStore.cs +++ b/NineChroniclesUtilBackend.Store/Services/MongoDbStore.cs @@ -1,6 +1,9 @@ using MongoDB.Bson; using MongoDB.Driver; using NineChroniclesUtilBackend.Store.Models; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; namespace NineChroniclesUtilBackend.Store.Services; @@ -8,6 +11,9 @@ public class MongoDbStore { private readonly IMongoCollection _arenaCollection; private readonly IMongoCollection _avatarCollection; + private BlockingCollection _arenaDataQueue = new BlockingCollection(); + private BlockingCollection _avatarDataQueue = new BlockingCollection(); + private readonly int _batchSize = 100; public MongoDbStore(string connectionString, string databaseName) { @@ -16,19 +22,114 @@ public MongoDbStore(string connectionString, string databaseName) _arenaCollection = database.GetCollection("arena"); _avatarCollection = database.GetCollection("avatars"); + + Task.Run(() => ProcessArenaDataAsync()); + + Task.Run(() => ProcessAvatarDataAsync()); + } + + public void AddArenaData(ArenaData arenaData) + { + _arenaDataQueue.Add(arenaData); + } + + public void AddAvatarData(AvatarData avatarData) + { + _avatarDataQueue.Add(avatarData); + } + + private async Task ProcessArenaDataAsync() + { + List batch = new List(); + foreach (var arenaData in _arenaDataQueue.GetConsumingEnumerable()) + { + batch.Add(arenaData); + if (batch.Count >= _batchSize) + { + await BulkUpsertArenaDataAsync(batch); + batch.Clear(); + } + } + + if (batch.Count > 0) + { + await BulkUpsertArenaDataAsync(batch); + } + } + + private async Task ProcessAvatarDataAsync() + { + List batch = new List(); + foreach (var avatarData in _avatarDataQueue.GetConsumingEnumerable()) + { + batch.Add(avatarData); + if (batch.Count >= _batchSize) + { + await BulkUpsertAvatarDataAsync(batch); + batch.Clear(); + } + } + + if (batch.Count > 0) + { + await BulkUpsertAvatarDataAsync(batch); + } } - public async Task SaveArenaDataAsync(ArenaData arenaData) + private async Task BulkUpsertArenaDataAsync(List arenaDatas) { - var jsonString = arenaData.ToJson(); - var bsonDocument = BsonDocument.Parse(jsonString); - await _arenaCollection.InsertOneAsync(bsonDocument); + var bulkOps = new List>(); + foreach (var arenaData in arenaDatas) + { + var filter = Builders.Filter.Eq("avatarAddress", arenaData.AvatarAddress.ToHex()); + var bsonDocument = BsonDocument.Parse(arenaData.ToJson()); + var upsertOne = new ReplaceOneModel(filter, bsonDocument) { IsUpsert = true }; + bulkOps.Add(upsertOne); + } + if (bulkOps.Count > 0) + { + await _arenaCollection.BulkWriteAsync(bulkOps); + } } - public async Task SaveAvatarDataAsync(AvatarData avatarData) + private async Task BulkUpsertAvatarDataAsync(List avatarDatas) + { + var bulkOps = new List>(); + foreach (var avatarData in avatarDatas) + { + var filter = Builders.Filter.Eq("avatar.address", avatarData.Avatar.address.ToHex()); + var bsonDocument = BsonDocument.Parse(avatarData.ToJson()); + var upsertOne = new ReplaceOneModel(filter, bsonDocument) { IsUpsert = true }; + bulkOps.Add(upsertOne); + } + if (bulkOps.Count > 0) + { + await _avatarCollection.BulkWriteAsync(bulkOps); + } + } + public async Task FlushAsync() { - var jsonString = avatarData.ToJson(); - var bsonDocument = BsonDocument.Parse(jsonString); - await _avatarCollection.InsertOneAsync(bsonDocument); + _arenaDataQueue.CompleteAdding(); + _avatarDataQueue.CompleteAdding(); + + var remainingArenaDatas = new List(); + while (_arenaDataQueue.TryTake(out var arenaData)) + { + remainingArenaDatas.Add(arenaData); + } + if (remainingArenaDatas.Any()) + { + await BulkUpsertArenaDataAsync(remainingArenaDatas); + } + + var remainingAvatarDatas = new List(); + while (_avatarDataQueue.TryTake(out var avatarData)) + { + remainingAvatarDatas.Add(avatarData); + } + if (remainingAvatarDatas.Any()) + { + await BulkUpsertAvatarDataAsync(remainingAvatarDatas); + } } -} +} \ No newline at end of file diff --git a/NineChroniclesUtilBackend.Store/Util/JsonConverter.cs b/NineChroniclesUtilBackend.Store/Util/JsonConverter.cs index fa433805..bd351e10 100644 --- a/NineChroniclesUtilBackend.Store/Util/JsonConverter.cs +++ b/NineChroniclesUtilBackend.Store/Util/JsonConverter.cs @@ -19,4 +19,4 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s { writer.WriteValue(value.ToString()); } -} \ No newline at end of file +} diff --git a/NineChroniclesUtilBackend.Store/Worker.cs b/NineChroniclesUtilBackend.Store/Worker.cs index 64c62cbc..9ab928e7 100644 --- a/NineChroniclesUtilBackend.Store/Worker.cs +++ b/NineChroniclesUtilBackend.Store/Worker.cs @@ -28,13 +28,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } await _scrapper.ExecuteAsync(); + await _store.FlushAsync(); } private async void HandleDataCollected(object sender, ArenaDataCollectedEventArgs e) { _logger.LogInformation("{avatarAddress} Data Collected", e.AvatarData.Avatar.address); - await _store.SaveArenaDataAsync(e.ArenaData); - await _store.SaveAvatarDataAsync(e.AvatarData); + _store.AddArenaData(e.ArenaData); + _store.AddAvatarData(e.AvatarData); } }