Skip to content

Commit

Permalink
Implement bulk store
Browse files Browse the repository at this point in the history
  • Loading branch information
Atralupus committed Feb 23, 2024
1 parent aae8e1e commit a0ae617
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 46 deletions.
24 changes: 8 additions & 16 deletions NineChroniclesUtilBackend.Store/Models/State/ArenaData.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
using Nekoyume.Model.Arena;
using Newtonsoft.Json;
using NineChroniclesUtilBackend.Store.Util;
using Nekoyume.TableData;
using Libplanet.Crypto;

namespace NineChroniclesUtilBackend.Store.Models;

public class ArenaData
public class ArenaData : BaseData
{
public ArenaScore Score { get; }
public ArenaInformation Information { get; }
public ArenaSheet.RoundData RoundData { get; }
public Address AvatarAddress { get; }

public ArenaData(ArenaScore score, ArenaInformation information)
public ArenaData(ArenaScore score, ArenaInformation information, ArenaSheet.RoundData roundData, Address avatarAddress)
{
Score = score;
Information = information;
}

public string ToJson()
{
var settings = new JsonSerializerSettings
{
Converters = new[] { new BigIntegerToStringConverter() },
Formatting = Formatting.Indented
};

string jsonString = JsonConvert.SerializeObject(this, settings);
return jsonString;
RoundData = roundData;
AvatarAddress = avatarAddress;
}
}
17 changes: 1 addition & 16 deletions NineChroniclesUtilBackend.Store/Models/State/AvataData.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using Nekoyume.Model.State;
using Newtonsoft.Json;
using NineChroniclesUtilBackend.Store.Util;


namespace NineChroniclesUtilBackend.Store.Models;

public class AvatarData
public class AvatarData : BaseData
{
public AvatarState Avatar { get; }
public ItemSlotState ItemSlot { get; }
Expand All @@ -17,16 +14,4 @@ public AvatarData(AvatarState avatar, ItemSlotState itemSlot, List<RuneState> ru
ItemSlot = itemSlot;
RuneSlot = runeSlot;
}

public string ToJson()
{
var settings = new JsonSerializerSettings
{
Converters = new[] { new BigIntegerToStringConverter() },
Formatting = Formatting.Indented
};

string jsonString = JsonConvert.SerializeObject(this, settings);
return jsonString;
}
}
20 changes: 20 additions & 0 deletions NineChroniclesUtilBackend.Store/Models/State/BaseData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Newtonsoft.Json;
using NineChroniclesUtilBackend.Store.Util;

namespace NineChroniclesUtilBackend.Store.Models;

public class BaseData
{
protected static JsonSerializerSettings JsonSerializerSettings => new JsonSerializerSettings
{
Converters = new[] { new BigIntegerToStringConverter() },
Formatting = Formatting.Indented,
// ContractResolver = new Newtonsoft.Json.Serialization.CamelCasePropertyNamesContractResolver(),
NullValueHandling = NullValueHandling.Ignore
};

public string ToJson()
{
return JsonConvert.SerializeObject(this, JsonSerializerSettings);
}
}
3 changes: 1 addition & 2 deletions NineChroniclesUtilBackend.Store/Scrapper/ArenaScrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public async Task ExecuteAsync()
var avatarData = await GetAvatarData(avatarAddress);

RaiseDataCollected(arenaData, avatarData);
break;
}
}

Expand All @@ -50,7 +49,7 @@ public async Task<ArenaData> GetArenaData(ArenaSheet.RoundData roundData, Addres
var arenaScore = await _stateGetter.GetArenaScoreState(avatarAddress, roundData.ChampionshipId, roundData.Round);
var arenaInfo = await _stateGetter.GetArenaInfoState(avatarAddress, roundData.ChampionshipId, roundData.Round);

return new ArenaData(arenaScore, arenaInfo);
return new ArenaData(arenaScore, arenaInfo, roundData, avatarAddress);
}

public async Task<AvatarData> GetAvatarData(Address avatarAddress)
Expand Down
119 changes: 110 additions & 9 deletions NineChroniclesUtilBackend.Store/Services/MongoDbStore.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
using MongoDB.Bson;
using MongoDB.Driver;
using NineChroniclesUtilBackend.Store.Models;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace NineChroniclesUtilBackend.Store.Services;

public class MongoDbStore
{
private readonly IMongoCollection<BsonDocument> _arenaCollection;
private readonly IMongoCollection<BsonDocument> _avatarCollection;
private BlockingCollection<ArenaData> _arenaDataQueue = new BlockingCollection<ArenaData>();
private BlockingCollection<AvatarData> _avatarDataQueue = new BlockingCollection<AvatarData>();
private readonly int _batchSize = 100;

public MongoDbStore(string connectionString, string databaseName)
{
Expand All @@ -16,19 +22,114 @@ public MongoDbStore(string connectionString, string databaseName)

_arenaCollection = database.GetCollection<BsonDocument>("arena");
_avatarCollection = database.GetCollection<BsonDocument>("avatars");

Task.Run(() => ProcessArenaDataAsync());

Task.Run(() => ProcessAvatarDataAsync());
}

public void AddArenaData(ArenaData arenaData)
{
_arenaDataQueue.Add(arenaData);
}

public void AddAvatarData(AvatarData avatarData)
{
_avatarDataQueue.Add(avatarData);
}

private async Task ProcessArenaDataAsync()
{
List<ArenaData> batch = new List<ArenaData>();
foreach (var arenaData in _arenaDataQueue.GetConsumingEnumerable())
{
batch.Add(arenaData);
if (batch.Count >= _batchSize)
{
await BulkUpsertArenaDataAsync(batch);
batch.Clear();
}
}

if (batch.Count > 0)
{
await BulkUpsertArenaDataAsync(batch);
}
}

private async Task ProcessAvatarDataAsync()
{
List<AvatarData> batch = new List<AvatarData>();
foreach (var avatarData in _avatarDataQueue.GetConsumingEnumerable())
{
batch.Add(avatarData);
if (batch.Count >= _batchSize)
{
await BulkUpsertAvatarDataAsync(batch);
batch.Clear();
}
}

if (batch.Count > 0)
{
await BulkUpsertAvatarDataAsync(batch);
}
}

public async Task SaveArenaDataAsync(ArenaData arenaData)
private async Task BulkUpsertArenaDataAsync(List<ArenaData> arenaDatas)
{
var jsonString = arenaData.ToJson();
var bsonDocument = BsonDocument.Parse(jsonString);
await _arenaCollection.InsertOneAsync(bsonDocument);
var bulkOps = new List<WriteModel<BsonDocument>>();
foreach (var arenaData in arenaDatas)
{
var filter = Builders<BsonDocument>.Filter.Eq("avatarAddress", arenaData.AvatarAddress.ToHex());
var bsonDocument = BsonDocument.Parse(arenaData.ToJson());
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument) { IsUpsert = true };
bulkOps.Add(upsertOne);
}
if (bulkOps.Count > 0)
{
await _arenaCollection.BulkWriteAsync(bulkOps);
}
}

public async Task SaveAvatarDataAsync(AvatarData avatarData)
private async Task BulkUpsertAvatarDataAsync(List<AvatarData> avatarDatas)
{
var bulkOps = new List<WriteModel<BsonDocument>>();
foreach (var avatarData in avatarDatas)
{
var filter = Builders<BsonDocument>.Filter.Eq("avatar.address", avatarData.Avatar.address.ToHex());
var bsonDocument = BsonDocument.Parse(avatarData.ToJson());
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument) { IsUpsert = true };
bulkOps.Add(upsertOne);
}
if (bulkOps.Count > 0)
{
await _avatarCollection.BulkWriteAsync(bulkOps);
}
}
public async Task FlushAsync()
{
var jsonString = avatarData.ToJson();
var bsonDocument = BsonDocument.Parse(jsonString);
await _avatarCollection.InsertOneAsync(bsonDocument);
_arenaDataQueue.CompleteAdding();
_avatarDataQueue.CompleteAdding();

var remainingArenaDatas = new List<ArenaData>();
while (_arenaDataQueue.TryTake(out var arenaData))
{
remainingArenaDatas.Add(arenaData);
}
if (remainingArenaDatas.Any())
{
await BulkUpsertArenaDataAsync(remainingArenaDatas);
}

var remainingAvatarDatas = new List<AvatarData>();
while (_avatarDataQueue.TryTake(out var avatarData))
{
remainingAvatarDatas.Add(avatarData);
}
if (remainingAvatarDatas.Any())
{
await BulkUpsertAvatarDataAsync(remainingAvatarDatas);
}
}
}
}
2 changes: 1 addition & 1 deletion NineChroniclesUtilBackend.Store/Util/JsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s
{
writer.WriteValue(value.ToString());
}
}
}
5 changes: 3 additions & 2 deletions NineChroniclesUtilBackend.Store/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}

await _scrapper.ExecuteAsync();
await _store.FlushAsync();
}

private async void HandleDataCollected(object sender, ArenaDataCollectedEventArgs e)
{
_logger.LogInformation("{avatarAddress} Data Collected", e.AvatarData.Avatar.address);

await _store.SaveArenaDataAsync(e.ArenaData);
await _store.SaveAvatarDataAsync(e.AvatarData);
_store.AddArenaData(e.ArenaData);
_store.AddAvatarData(e.AvatarData);
}
}

0 comments on commit a0ae617

Please sign in to comment.