Skip to content

Commit

Permalink
Merge pull request blish-hud#971 from Denrage/feat/arcdpsBridgeV2
Browse files Browse the repository at this point in the history
Arcdps-Bridge V2 finishing touches
  • Loading branch information
dlamkins authored Jun 14, 2024
2 parents b8b7acf + 8dea94a commit 133f26c
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 98 deletions.
163 changes: 95 additions & 68 deletions Blish HUD/GameServices/ArcDps/V2/ArcDpsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Blish_HUD.GameServices.ArcDps.Models.UnofficialExtras;
using Blish_HUD.GameServices.ArcDps.V2;
using Blish_HUD.GameServices.ArcDps.V2.Processors;

Expand All @@ -19,49 +21,56 @@ internal class ArcDpsClient : IArcDpsClient {
#endif

private static readonly Logger _logger = Logger.GetLogger<ArcDpsServiceV2>();
private readonly BlockingCollection<byte[]>[] messageQueues;
private readonly Dictionary<int, MessageProcessor> processors = new Dictionary<int, MessageProcessor>();
private readonly ArcDpsBridgeVersion arcDpsBridgeVersion;
private bool isConnected = false;
private NetworkStream networkStream;
private CancellationToken ct;
private bool disposedValue;
private readonly BlockingCollection<byte[]>[] _messageQueues;
private readonly Dictionary<int, MessageProcessor> _processors = new Dictionary<int, MessageProcessor>();
private readonly ArcDpsBridgeVersion _arcDpsBridgeVersion;
private bool _isConnected = false;
private NetworkStream _networkStream;
private CancellationTokenSource _cancellationTokenSource;
private CancellationTokenSource _linkedTokenSource;
private CancellationToken _linkedToken;
private bool _disposedValue;
private CancellationToken _ct;

public event EventHandler<SocketError> Error;

public bool IsConnected => this.isConnected && this.Client.Connected;
public bool IsConnected => _isConnected && (Client?.Connected ?? false);

public TcpClient Client { get; }
public TcpClient Client { get; private set; }

public event Action Disconnected;

public ArcDpsClient(ArcDpsBridgeVersion arcDpsBridgeVersion) {
this.arcDpsBridgeVersion = arcDpsBridgeVersion;
this._arcDpsBridgeVersion = arcDpsBridgeVersion;

processors.Add(1, new ImGuiProcessor());
_processors.Add(1, new ImGuiProcessor());

if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
processors.Add(2, new LegacyCombatProcessor());
processors.Add(3, new LegacyCombatProcessor());
if (arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
_processors.Add((int)MessageType.CombatEventArea, new LegacyCombatProcessor());
_processors.Add((int)MessageType.CombatEventLocal, new LegacyCombatProcessor());
} else {
processors.Add(2, new CombatEventProcessor());
processors.Add(3, new CombatEventProcessor());
_processors.Add((int)MessageType.CombatEventArea, new CombatEventProcessor());
_processors.Add((int)MessageType.CombatEventLocal, new CombatEventProcessor());
_processors.Add((int)MessageType.UserInfo, new UnofficialExtrasUserInfoProcessor());
_processors.Add((int)MessageType.ChatMessage, new UnofficialExtrasMessageInfoProcessor());
}

// hardcoded message queue size. One Collection per message type. This is done just for optimizations
this.messageQueues = new BlockingCollection<byte[]>[4];
_messageQueues = new BlockingCollection<byte[]>[byte.MaxValue];

this.Client = new TcpClient();
}

public bool IsMessageTypeAvailable(MessageType type)
=> this._processors.ContainsKey((int)type);

public void RegisterMessageTypeListener<T>(int type, Func<T, CancellationToken, Task> listener)
where T : struct {
var processor = (MessageProcessor<T>)this.processors[type];
if (messageQueues[type] == null) {
messageQueues[type] = new BlockingCollection<byte[]>();
var processor = (MessageProcessor<T>)_processors[type];
if (_messageQueues[type] == null) {
_messageQueues[type] = new BlockingCollection<byte[]>();

try {
Task.Run(() => this.ProcessMessage(processor, messageQueues[type]));
Task.Run(() => ProcessMessage(processor, _messageQueues[type]));
} catch (OperationCanceledException) {
// NOP
}
Expand All @@ -71,17 +80,17 @@ public void RegisterMessageTypeListener<T>(int type, Func<T, CancellationToken,
}

private void ProcessMessage(MessageProcessor processor, BlockingCollection<byte[]> messageQueue) {
while (!ct.IsCancellationRequested) {
ct.ThrowIfCancellationRequested();
while (!_linkedToken.IsCancellationRequested) {
_linkedToken.ThrowIfCancellationRequested();
Task.Delay(1).Wait();
foreach (var item in messageQueue.GetConsumingEnumerable()) {
ct.ThrowIfCancellationRequested();
processor.Process(item, ct);
_linkedToken.ThrowIfCancellationRequested();
processor.Process(item, _linkedToken);
ArrayPool<byte>.Shared.Return(item);
}
}

ct.ThrowIfCancellationRequested();
_linkedToken.ThrowIfCancellationRequested();
}

/// <summary>
Expand All @@ -90,104 +99,121 @@ private void ProcessMessage(MessageProcessor processor, BlockingCollection<byte[
/// <param name="endpoint"></param>
/// <param name="ct">CancellationToken to cancel the whole client</param>
public void Initialize(IPEndPoint endpoint, CancellationToken ct) {
this.ct = ct;
this.Client.Connect(endpoint);
this._ct = ct;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = new CancellationTokenSource();
_linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct, this._cancellationTokenSource.Token);
_linkedToken = _linkedTokenSource.Token;
Client?.Dispose();
Client = new TcpClient();
Client.ReceiveBufferSize = 4096;
Client.Connect(endpoint);
_logger.Info("Connected to arcdps endpoint on: " + endpoint.ToString());

this.networkStream = this.Client.GetStream();
this.isConnected = true;
_networkStream = Client.GetStream();
_isConnected = true;

try {
if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
Task.Run(async () => await this.LegacyReceive(ct), ct);
if (_arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
Task.Run(async () => await LegacyReceive(_linkedToken), _linkedToken);
} else {
Task.Run(async () => await this.Receive(ct), ct);
Task.Run(async () => await Receive(_linkedToken), _linkedToken);
}
} catch (OperationCanceledException) {
// NOP
}
}

public void Disconnect() {
if (isConnected) {
if (this.Client.Connected) {
this.Client.Close();
this.Client.Dispose();
if (_isConnected) {
if (Client?.Connected ?? false) {
Client.Close();
Client.Dispose();
_logger.Info("Disconnected from arcdps endpoint");
}

this.isConnected = false;
this.Disconnected?.Invoke();
_isConnected = false;
Disconnected?.Invoke();
}
}

private async Task LegacyReceive(CancellationToken ct) {
_logger.Info($"Start Legacy Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
_logger.Info($"Start Legacy Receive Task for {Client?.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[9];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
while (Client?.Connected ?? false) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
if (Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 9);
ReadFromStream(_networkStream, messageHeaderBuffer, 9);

// In V1 the message type is part of the message and therefor included in message length, so we subtract it here
var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]) - 1;
var messageType = messageHeaderBuffer[8];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);

this.messageQueues[messageType]?.Add(messageBuffer);
ReadMessage(pool, messageLength, _networkStream, _messageQueues, messageType);
#if DEBUG
Interlocked.Increment(ref Counter);
#endif

}
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
Error?.Invoke(this, SocketError.SocketError);
Disconnect();
}

_logger.Info($"Legacy Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
_logger.Info($"Legacy Receive Task for {Client?.Client.RemoteEndPoint?.ToString()} stopped");
}

private async Task Receive(CancellationToken ct) {
_logger.Info($"Start Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
_logger.Info($"Start Receive Task for {Client?.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[5];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
while (Client?.Connected ?? false) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
if (Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 5);
ReadFromStream(_networkStream, messageHeaderBuffer, 5);

var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]);
var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]) - 1;
var messageType = messageHeaderBuffer[4];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);
this.messageQueues[messageType]?.Add(messageBuffer);
ReadMessage(pool, messageLength, _networkStream, _messageQueues, messageType);
#if DEBUG
Interlocked.Increment(ref Counter);
#endif
}

// Reconnect if the bridge closes the connection.
// Pass on the cancellationToken from the creator of this class
this.Initialize((IPEndPoint)this.Client.Client.RemoteEndPoint, this._ct);
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
Error?.Invoke(this, SocketError.SocketError);
Disconnect();
}

_logger.Info($"Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
_logger.Info($"Receive Task for {Client?.Client.RemoteEndPoint?.ToString()} stopped");
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadMessage(ArrayPool<byte> pool, int messageLength, Stream networkStream, BlockingCollection<byte[]>[] messageQueues, byte messageType) {
var messageBuffer = pool.Rent(messageLength);
ReadFromStream(networkStream, messageBuffer, messageLength);

if (messageQueues[messageType] != null) {
messageQueues[messageType]?.Add(messageBuffer);
} else {
pool.Return(messageBuffer);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -199,20 +225,21 @@ private static void ReadFromStream(Stream stream, byte[] buffer, int length) {
}

protected virtual void Dispose(bool disposing) {
if (!disposedValue) {
if (!_disposedValue) {
if (disposing) {
Client.Dispose();
foreach (var item in messageQueues) {
_cancellationTokenSource.Cancel();
Client?.Dispose();
foreach (var item in _messageQueues) {
if (item.Count != 0) {
foreach (var message in item) {
ArrayPool<byte>.Shared.Return(message);
}
}
}
networkStream.Dispose();
_networkStream?.Dispose();
}

disposedValue = true;
_disposedValue = true;
}
}

Expand Down
2 changes: 1 addition & 1 deletion Blish HUD/GameServices/ArcDps/V2/CommonFields.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void Activate() {
if (_enabled) return;

_enabled = true;
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(2, CombatHandler);
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(MessageType.CombatEventArea, CombatHandler);
}

private Task CombatHandler(CombatCallback combatEvent, CancellationToken ct) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using Blish_HUD.GameServices.ArcDps.Models.UnofficialExtras;
using Blish_HUD.GameServices.ArcDps.V2.Models;
using Blish_HUD.GameServices.ArcDps.V2.Processors;
using System;

namespace Blish_HUD.GameServices.ArcDps.V2.Extensions {
public static class BincodeBinaryReaderExtensions {
public static CombatCallback ParseCombatCallback(this BincodeBinaryReader reader) {
var result = default(CombatCallback);
result.Event = reader.ParseCombatEvent();
result.Source = reader.ParseAgent();
result.Destination = reader.ParseAgent();
result.SkillName = reader.Convert.ParseString();
result.Event = ParseOptional(reader, reader.ParseCombatEvent);
result.Source = ParseOptional(reader, reader.ParseAgent);
result.Destination = ParseOptional(reader, reader.ParseAgent);
result.SkillName = ParseOptional(reader, reader.Convert.ParseString);
result.Id = reader.Convert.ParseULong();
result.Revision = reader.Convert.ParseULong();

Expand Down Expand Up @@ -50,7 +51,7 @@ public static CombatEvent ParseCombatEvent(this BincodeBinaryReader reader) {

public static Agent ParseAgent(this BincodeBinaryReader reader) {
var result = default(Agent);
result.Name = reader.Convert.ParseString();
result.Name = ParseOptional(reader, reader.Convert.ParseString);
result.Id = reader.Convert.ParseUSize();
result.Profession = reader.Convert.ParseUInt();
result.Elite = reader.Convert.ParseUInt();
Expand All @@ -61,7 +62,7 @@ public static Agent ParseAgent(this BincodeBinaryReader reader) {

public static UserInfo ParseUserInfo(this BincodeBinaryReader reader) {
var result = default(UserInfo);
result.AccountName = reader.Convert.ParseString();
result.AccountName = ParseOptional(reader, reader.Convert.ParseString);
result.JoinTime = reader.Convert.ParseULong();
result.Role = ParseEnum((byte)reader.Convert.ParseUInt(), (int)UserRole.None, UserRole.None);
result.Subgroup = reader.Convert.ParseByte();
Expand All @@ -71,14 +72,13 @@ public static UserInfo ParseUserInfo(this BincodeBinaryReader reader) {
return result;
}

public static ChatMessageInfo ParseChatMessageInfo(BincodeBinaryReader reader) {
public static ChatMessageInfo ParseChatMessageInfo(this BincodeBinaryReader reader) {
var result = default(ChatMessageInfo);
result.ChannelId = reader.Convert.ParseUInt();
result.ChannelType = ParseEnum((byte)reader.Convert.ParseUInt(), (int)ChannelType.Invalid, ChannelType.Invalid);
result.Subgroup = reader.Convert.ParseByte();
result.IsBroadcast = reader.Convert.ParseBool();
result._unused1 = reader.Convert.ParseByte();
result.TimeStamp = reader.Convert.ParseString();
result.TimeStamp = DateTime.Parse(reader.Convert.ParseString());
result.AccountName = reader.Convert.ParseString();
result.CharacterName = reader.Convert.ParseString();
result.Text = reader.Convert.ParseString();
Expand All @@ -94,6 +94,13 @@ private static T ParseEnum<T>(byte enumByteValue, int maxValue, T unknown)

return (T)(object)enumByteValue;
}

private static T ParseOptional<T>(BincodeBinaryReader reader, Func<T> parse) {
if (reader.ReadByte() == 1) {
return parse();
}
return default;
}
}

}
6 changes: 4 additions & 2 deletions Blish HUD/GameServices/ArcDps/V2/IArcDpsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ internal interface IArcDpsClient : IDisposable {
event EventHandler<SocketError> Error;

void Disconnect();

void Initialize(IPEndPoint endpoint, CancellationToken ct);

void Initialize(IPEndPoint endpoint, CancellationToken ct);

bool IsMessageTypeAvailable(MessageType type);

void RegisterMessageTypeListener<T>(int type, Func<T, CancellationToken, Task> listener) where T : struct;
}
}
Loading

0 comments on commit 133f26c

Please sign in to comment.