Skip to content

Commit

Permalink
Updated arc service to work with the new bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
Denrage committed Jun 12, 2024
1 parent b8b7acf commit 86382d3
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 48 deletions.
82 changes: 46 additions & 36 deletions Blish HUD/GameServices/ArcDps/V2/ArcDpsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
Expand All @@ -29,7 +30,7 @@ internal class ArcDpsClient : IArcDpsClient {

public event EventHandler<SocketError> Error;

public bool IsConnected => this.isConnected && this.Client.Connected;
public bool IsConnected => isConnected && Client.Connected;

public TcpClient Client { get; }

Expand All @@ -40,7 +41,7 @@ public ArcDpsClient(ArcDpsBridgeVersion arcDpsBridgeVersion) {

processors.Add(1, new ImGuiProcessor());

if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
if (arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
processors.Add(2, new LegacyCombatProcessor());
processors.Add(3, new LegacyCombatProcessor());
} else {
Expand All @@ -49,19 +50,19 @@ public ArcDpsClient(ArcDpsBridgeVersion arcDpsBridgeVersion) {
}

// hardcoded message queue size. One Collection per message type. This is done just for optimizations
this.messageQueues = new BlockingCollection<byte[]>[4];
messageQueues = new BlockingCollection<byte[]>[byte.MaxValue];

this.Client = new TcpClient();
Client = new TcpClient();
}

public void RegisterMessageTypeListener<T>(int type, Func<T, CancellationToken, Task> listener)
where T : struct {
var processor = (MessageProcessor<T>)this.processors[type];
var processor = (MessageProcessor<T>)processors[type];
if (messageQueues[type] == null) {
messageQueues[type] = new BlockingCollection<byte[]>();

try {
Task.Run(() => this.ProcessMessage(processor, messageQueues[type]));
Task.Run(() => ProcessMessage(processor, messageQueues[type]));
} catch (OperationCanceledException) {
// NOP
}
Expand Down Expand Up @@ -91,17 +92,17 @@ private void ProcessMessage(MessageProcessor processor, BlockingCollection<byte[
/// <param name="ct">CancellationToken to cancel the whole client</param>
public void Initialize(IPEndPoint endpoint, CancellationToken ct) {
this.ct = ct;
this.Client.Connect(endpoint);
Client.Connect(endpoint);
_logger.Info("Connected to arcdps endpoint on: " + endpoint.ToString());

this.networkStream = this.Client.GetStream();
this.isConnected = true;
networkStream = Client.GetStream();
isConnected = true;

try {
if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
Task.Run(async () => await this.LegacyReceive(ct), ct);
if (arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
Task.Run(async () => await LegacyReceive(ct), ct);
} else {
Task.Run(async () => await this.Receive(ct), ct);
Task.Run(async () => await Receive(ct), ct);
}
} catch (OperationCanceledException) {
// NOP
Expand All @@ -110,84 +111,93 @@ public void Initialize(IPEndPoint endpoint, CancellationToken ct) {

public void Disconnect() {
if (isConnected) {
if (this.Client.Connected) {
this.Client.Close();
this.Client.Dispose();
if (Client.Connected) {
Client.Close();
Client.Dispose();
_logger.Info("Disconnected from arcdps endpoint");
}

this.isConnected = false;
this.Disconnected?.Invoke();
isConnected = false;
Disconnected?.Invoke();
}
}

private async Task LegacyReceive(CancellationToken ct) {
_logger.Info($"Start Legacy Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
_logger.Info($"Start Legacy Receive Task for {Client.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[9];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
while (Client.Connected) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
if (Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 9);
ReadFromStream(networkStream, messageHeaderBuffer, 9);

// In V1 the message type is part of the message and therefor included in message length, so we subtract it here
var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]) - 1;
var messageType = messageHeaderBuffer[8];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);
ReadFromStream(networkStream, messageBuffer, messageLength);

this.messageQueues[messageType]?.Add(messageBuffer);
if (messageQueues[messageType] != null) {
messageQueues[messageType]?.Add(messageBuffer);
} else {
pool.Return(messageBuffer);
}
#if DEBUG
Interlocked.Increment(ref Counter);
#endif

}
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
Error?.Invoke(this, SocketError.SocketError);
Disconnect();
}

_logger.Info($"Legacy Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
_logger.Info($"Legacy Receive Task for {Client.Client?.RemoteEndPoint?.ToString()} stopped");
}

private async Task Receive(CancellationToken ct) {
_logger.Info($"Start Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
_logger.Info($"Start Receive Task for {Client.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[5];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
while (Client.Connected) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
if (Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 5);
ReadFromStream(networkStream, messageHeaderBuffer, 5);

var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]);
var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]) - 1;
var messageType = messageHeaderBuffer[4];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);
this.messageQueues[messageType]?.Add(messageBuffer);
ReadFromStream(networkStream, messageBuffer, messageLength);

if (messageQueues[messageType] != null) {
messageQueues[messageType]?.Add(messageBuffer);
} else {
pool.Return(messageBuffer);
}
#if DEBUG
Interlocked.Increment(ref Counter);
#endif
}
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
Error?.Invoke(this, SocketError.SocketError);
Disconnect();
}

_logger.Info($"Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
_logger.Info($"Receive Task for {Client.Client?.RemoteEndPoint?.ToString()} stopped");
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
2 changes: 1 addition & 1 deletion Blish HUD/GameServices/ArcDps/V2/CommonFields.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void Activate() {
if (_enabled) return;

_enabled = true;
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(2, CombatHandler);
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(MessageType.CombatEventArea, CombatHandler);
}

private Task CombatHandler(CombatCallback combatEvent, CancellationToken ct) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@ namespace Blish_HUD.GameServices.ArcDps.V2.Extensions {
public static class BincodeBinaryReaderExtensions {
public static CombatCallback ParseCombatCallback(this BincodeBinaryReader reader) {
var result = default(CombatCallback);
result.Event = reader.ParseCombatEvent();
result.Source = reader.ParseAgent();
result.Destination = reader.ParseAgent();
result.SkillName = reader.Convert.ParseString();
if (reader.Convert.ParseByte() == 1) {
result.Event = reader.ParseCombatEvent();
}
if (reader.Convert.ParseByte() == 1) {
result.Source = reader.ParseAgent();
}
if (reader.Convert.ParseByte() == 1) {
result.Destination = reader.ParseAgent();
}
if (reader.Convert.ParseByte() == 1) {
result.SkillName = reader.Convert.ParseString();
}

result.Id = reader.Convert.ParseULong();
result.Revision = reader.Convert.ParseULong();

Expand Down Expand Up @@ -50,7 +59,9 @@ public static CombatEvent ParseCombatEvent(this BincodeBinaryReader reader) {

public static Agent ParseAgent(this BincodeBinaryReader reader) {
var result = default(Agent);
result.Name = reader.Convert.ParseString();
if (reader.Convert.ParseByte() == 1) {
result.Name = reader.Convert.ParseString();
}
result.Id = reader.Convert.ParseUSize();
result.Profession = reader.Convert.ParseUInt();
result.Elite = reader.Convert.ParseUInt();
Expand Down
13 changes: 13 additions & 0 deletions Blish HUD/GameServices/ArcDps/V2/MessageType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Blish_HUD.GameServices.ArcDps.V2 {
public enum MessageType {
ImGui = 1,
CombatEventArea = 2,
CombatEventLocal = 3,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static double Convert(BinaryReader reader) {
}

public static class IntConverter {
public static bool UseVarint { get; set; } = true;
public static bool UseVarint { get; set; } = false;

public class VarintEncoding {
public static readonly VarintEncoding Instance = new VarintEncoding();
Expand Down
8 changes: 4 additions & 4 deletions Blish HUD/GameServices/ArcDpsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public class ArcDpsService : GameService {
public void SubscribeToCombatEventId(Action<object, RawCombatEventArgs> func, params uint[] skillIds) {

if (!_subscribed) {
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(2, async (combatEvent, ct) => {
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(GameServices.ArcDps.V2.MessageType.CombatEventArea, async (combatEvent, ct) => {
DispatchSkillSubscriptions(combatEvent, RawCombatEventArgs.CombatEventType.Area);
await System.Threading.Tasks.Task.CompletedTask;
});
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(3, async (combatEvent, ct) => {
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(GameServices.ArcDps.V2.MessageType.CombatEventLocal, async (combatEvent, ct) => {
DispatchSkillSubscriptions(combatEvent, RawCombatEventArgs.CombatEventType.Local);
await System.Threading.Tasks.Task.CompletedTask;
});
Expand Down Expand Up @@ -117,13 +117,13 @@ protected override void Initialize() {
this.RawCombatEvent += (a, b) => { Interlocked.Increment(ref Counter); };
#endif

GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(2, async (combatEvent, ct) => {
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(GameServices.ArcDps.V2.MessageType.CombatEventArea, async (combatEvent, ct) => {
var rawCombat = ConvertFrom(combatEvent, RawCombatEventArgs.CombatEventType.Area);
this.RawCombatEvent?.Invoke(this, rawCombat);
await System.Threading.Tasks.Task.CompletedTask;
});

GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(3, async (combatEvent, ct) => {
GameService.ArcDpsV2.RegisterMessageType<CombatCallback>(GameServices.ArcDps.V2.MessageType.CombatEventLocal, async (combatEvent, ct) => {
var rawCombat = ConvertFrom(combatEvent, RawCombatEventArgs.CombatEventType.Local);
this.RawCombatEvent?.Invoke(this, rawCombat);
await System.Threading.Tasks.Task.CompletedTask;
Expand Down
11 changes: 10 additions & 1 deletion Blish HUD/GameServices/ArcDpsServiceV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Blish_HUD.ArcDps;
using Blish_HUD.GameServices.ArcDps;
using Blish_HUD.GameServices.ArcDps.V2;
using Blish_HUD.GameServices.ArcDps.V2.Extensions;
using Blish_HUD.GameServices.ArcDps.V2.Models;
using Blish_HUD.GameServices.ArcDps.V2.Processors;
using Microsoft.Xna.Framework;
using SharpDX;

namespace Blish_HUD {

Expand Down Expand Up @@ -69,6 +73,11 @@ private set {
}
}

public void RegisterMessageType<T>(MessageType type, Func<T, CancellationToken, Task> listener)
where T : struct {
RegisterMessageType<T>((int)type, listener);
}

public void RegisterMessageType<T>(int type, Func<T, CancellationToken, Task> listener)
where T : struct {
Action action = () => _arcDpsClient.RegisterMessageTypeListener(type, listener);
Expand Down Expand Up @@ -120,7 +129,7 @@ private void Start(uint processId) {
_arcDpsClient.Error += SocketErrorHandler;
_arcDpsClient.Initialize(new IPEndPoint(IPAddress.Loopback, GetPort(processId, version)), _arcDpsClientCancellationTokenSource.Token);

RegisterMessageType<ImGuiCallback>(1, async (imGuiCallback, ct) => {
RegisterMessageType<ImGuiCallback>(MessageType.ImGui, async (imGuiCallback, ct) => {
this.HudIsActive = imGuiCallback.NotCharacterSelectOrLoading != 0;
});
}
Expand Down

0 comments on commit 86382d3

Please sign in to comment.