Skip to content

Commit

Permalink
wire up event messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
dkackman committed Jan 27, 2024
1 parent b120a24 commit ac1cbe9
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 33 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ var standardWallet = new Wallet(1, wallet);
var transaction = await standardWallet.SendTransaction("xch1ls2w9l2tksmp8u3a8xewhn86na3fjhxq79gnsccxr0v3rpa5ejcsuugha7", 1, 1);
```

### Listen for events

```csharp
using chia.dotnet;

var endpoint = Config.Open().GetEndpoint("daemon");
using var rpcClient = new WebSocketRpcClient(endpoint);
await rpcClient.Connect();

var daemon = new DaemonProxy(rpcClient, "eventing_testharness");
await daemon.RegisterService("wallet_ui"); // this listens for the messages sent to the ui
var farmer = daemon.CreateProxyFrom<FarmerProxy>();
farmer.ConnectionsChanged += (sender, data) => Console.WriteLine($"Connection count: {data.Count()}");
farmer.NewFarmingInfo += (sender, data) => Console.WriteLine($"Farming info: {data}");
farmer.NewSignagePoint += (sender, data) => Console.WriteLine($"Signage Point: {data}");

while (true)
{
await Task.Delay(100);
}
```

### Build

````bash
Expand Down
14 changes: 14 additions & 0 deletions src/EventingTestHarness/EventingTestHarness.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\chia-dotnet\chia-dotnet.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions src/EventingTestHarness/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using chia.dotnet;

var endpoint = Config.Open().GetEndpoint("daemon");
using var rpcClient = new WebSocketRpcClient(endpoint);
await rpcClient.Connect();

var daemon = new DaemonProxy(rpcClient, "eventing_testharness");
await daemon.RegisterService("wallet_ui"); // this listens for the messages sent to the ui

var farmer = daemon.CreateProxyFrom<FarmerProxy>();
farmer.ConnectionsChanged += (sender, data) => Console.WriteLine($"Connections count: {data.Count()}");
farmer.NewFarmingInfo += (sender, data) => Console.WriteLine($"Farming info: {data}");
farmer.NewSignagePoint += (sender, data) => Console.WriteLine($"Signage Point: {data}");

while (true)
{
await Task.Delay(100);
}
8 changes: 7 additions & 1 deletion src/chia-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\chia-leaf-logo-384x384.png = ..\chia-leaf-logo-384x384.png
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeGen", "CodeGen\CodeGen.csproj", "{736550B5-6E4E-4BC4-9E4D-B93E6FC8528B}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CodeGen", "CodeGen\CodeGen.csproj", "{736550B5-6E4E-4BC4-9E4D-B93E6FC8528B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventingTestHarness", "EventingTestHarness\EventingTestHarness.csproj", "{58998896-677B-43CE-B0C2-85999D52D281}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand All @@ -38,6 +40,10 @@ Global
{736550B5-6E4E-4BC4-9E4D-B93E6FC8528B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{736550B5-6E4E-4BC4-9E4D-B93E6FC8528B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{736550B5-6E4E-4BC4-9E4D-B93E6FC8528B}.Release|Any CPU.Build.0 = Release|Any CPU
{58998896-677B-43CE-B0C2-85999D52D281}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{58998896-677B-43CE-B0C2-85999D52D281}.Debug|Any CPU.Build.0 = Debug|Any CPU
{58998896-677B-43CE-B0C2-85999D52D281}.Release|Any CPU.ActiveCfg = Release|Any CPU
{58998896-677B-43CE-B0C2-85999D52D281}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
2 changes: 1 addition & 1 deletion src/chia-dotnet/ChiaTypes/ConnectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public record ConnectionInfo
public int PeerPort { get; init; }
public int PeerServerPort { get; init; }
/// <summary>
/// Flag indiciating whether the peer connection is local to the node
/// Flag indicating whether the peer connection is local to the node
/// </summary>
[JsonIgnore]
public bool IsLocal => PeerHost is "127.0.0.1" or "localhost" or "::1" or "0:0:0:0:0:0:0:1";
Expand Down
4 changes: 2 additions & 2 deletions src/chia-dotnet/ChiaTypes/FarmerSignagePoint.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace chia.dotnet
{
/// <summary>
/// This type doesn't exist in the chia code but is generated and passed around as a dicitonary
/// (not to be ocnfused with <see cref="SignagePoint"/>)
/// This type doesn't exist in the chia code but is generated and passed around as a dictionary
/// (not to be confused with <see cref="SignagePoint"/>)
/// </summary>
public record FarmerSignagePoint
{
Expand Down
3 changes: 1 addition & 2 deletions src/chia-dotnet/DaemonProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace chia.dotnet
/// <param name="originService"><see cref="Message.Origin"/></param>
public sealed class DaemonProxy(WebSocketRpcClient rpcClient, string originService) : ServiceProxy(rpcClient, ServiceNames.Daemon, originService)
{

/// <summary>
/// Sends ping message to the service
/// </summary>
Expand Down Expand Up @@ -121,7 +120,7 @@ public async Task StopService(string service, CancellationToken cancellationToke
}

/// <summary>
/// Get athe list of running services
/// Get the list of running services
/// </summary>
/// <param name="cancellationToken">A token to allow the call to be cancelled</param>
/// <returns>A list of services</returns>
Expand Down
74 changes: 74 additions & 0 deletions src/chia-dotnet/FarmerProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,80 @@ namespace chia.dotnet
/// <param name="originService"><see cref="Message.Origin"/></param>
public sealed class FarmerProxy(IRpcClient rpcClient, string originService) : ServiceProxy(rpcClient, ServiceNames.Farmer, originService)
{
/// <summary>
/// Event raised when a new signage point is received
/// </summary>
public event EventHandler<dynamic>? NewSignagePoint;

/// <summary>
/// Event raised when a new signage point is received
/// </summary>
public event EventHandler<dynamic>? NewFarmingInfo;

/// <summary>
/// Event raised when a proof message arrives
/// </summary>
public event EventHandler<dynamic>? Proof;

/// <summary>
/// Event raised when a partial fails
/// </summary>
public event EventHandler<dynamic>? PartialFailed;

/// <summary>
/// Event raised when a partial is submitted
/// </summary>
public event EventHandler<dynamic>? PartialSubmitted;

/// <summary>
/// Event raised when a harvester is updated
/// </summary>
public event EventHandler<dynamic>? HarvesterUpdated;

/// <summary>
/// Event raised when a harvester is removed
/// </summary>
public event EventHandler<dynamic>? HarvesterRemoved;

/// <summary>
/// <see cref="ServiceProxy.OnEventMessage(Message)"/>
/// </summary>
/// <param name="msg"></param>
protected override void OnEventMessage(Message msg)
{
if (msg.Command == "new_signage_point")
{
NewSignagePoint?.Invoke(this, msg.Data);
}
else if (msg.Command == "new_farming_info")
{
NewFarmingInfo?.Invoke(this, msg.Data);
}
else if (msg.Command == "proof")
{
Proof?.Invoke(this, msg.Data);
}
else if (msg.Command == "failed_partial")
{
PartialFailed?.Invoke(this, msg.Data);
}
else if (msg.Command == "submitted_partial")
{
PartialSubmitted?.Invoke(this, msg.Data);
}
else if (msg.Command == "harvester_update")
{
HarvesterUpdated?.Invoke(this, msg.Data);
}
else if (msg.Command == "harvester_removed")
{
HarvesterRemoved?.Invoke(this, msg.Data);
}
else
{
base.OnEventMessage(msg);
}
}

/// <summary>
/// Get the farm and pool reward targets
Expand Down
84 changes: 83 additions & 1 deletion src/chia-dotnet/ServiceProxy.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Dynamic;
using System.Threading;
Expand Down Expand Up @@ -34,6 +35,87 @@ protected ServiceProxy(IRpcClient rpcClient, string destinationService, string o

DestinationService = destinationService;
OriginService = originService;

// only WebSocket can source events. Https does not have this mechanism
if (rpcClient is WebSocketRpcClient wss)
{
wss.BroadcastMessageReceived += (sender, msg) =>
{
// this filters the messages so that derived classes
// only get signaled for messages from their service
if (msg.Origin == DestinationService)
{
OnEventMessage(msg);
}
};
}
}

/// <summary>
/// Indicates whether this instance is wired to a <see cref="WebSocketRpcClient"/> so may source events
/// </summary>
public bool IsEventSource => RpcClient is WebSocketRpcClient;

/// <summary>
/// Event raised when a get_connections broadcast message is received
/// </summary>
public event EventHandler<IEnumerable<ConnectionInfo>>? ConnectionsChanged;

/// <summary>
/// Event raised when a connection is added
/// </summary>
public event EventHandler<dynamic>? ConnectionAdded;

/// <summary>
/// Event raised when a connection is closed
/// </summary>
public event EventHandler<dynamic>? ConnectionClosed;

/// <summary>
/// Event raised when a broadcast message is received that isn't recognized
/// </summary>
public event EventHandler<Message>? UnrecognizedEvent;

/// <summary>
/// Called when an event message is received
/// </summary>
/// <param name="msg"></param>
/// <remarks>You need to call <see cref="DaemonProxy.RegisterService(string, CancellationToken)"/>
/// <remarks>You need to call <see cref="DaemonProxy.RegisterService(string, CancellationToken)"/>
/// with `wallet_ui` in order for service events to be generated.</remarks>
protected virtual void OnEventMessage(Message msg)
{
if (msg.Command == "get_connections")
{
var connections = SafeDeserializePayload<IEnumerable<ConnectionInfo>>(msg.Data, "connections");
ConnectionsChanged?.Invoke(this, connections);
}
else if (msg.Command == "add_connection")
{
ConnectionsChanged?.Invoke(this, msg.Data);
}
else if (msg.Command == "close_connection")
{
ConnectionClosed?.Invoke(this, msg.Data);
}
else
{
UnrecognizedEvent?.Invoke(this, msg);
}
}

protected static T? SafeDeserializePayload<T>(dynamic payload, string childItem)
{
try
{
return Converters.ToObject<IEnumerable<ConnectionInfo>>(payload, childItem);
}
catch (Exception ex)
{
Debug.WriteLine(ex.ToString());
}

return default;
}

/// <summary>
Expand Down Expand Up @@ -153,7 +235,7 @@ internal async Task<dynamic> SendMessage(string command, dynamic? data, Cancella
{
throw;
}
catch (Exception e) // wrap eveything else in a response exception - this will include websocket or http specific failures
catch (Exception e) // wrap everything else in a response exception - this will include WebSocket or http specific failures
{
throw new ResponseException(message, "Something went wrong sending the rpc message. Inspect the InnerException for details.", e);
}
Expand Down
34 changes: 8 additions & 26 deletions src/chia-dotnet/WebSocketRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ public WebSocketRpcClient(EndpointInfo endpoint)
/// <returns>An awaitable <see cref="Task"/></returns>
public async Task Connect(CancellationToken cancellationToken = default)
{
if (disposedValue)
{
throw new ObjectDisposedException(nameof(WebSocketRpcClient));
}
ObjectDisposedException.ThrowIf(disposedValue, this);

if (_webSocket.State is WebSocketState.Connecting or WebSocketState.Open)
{
Expand Down Expand Up @@ -81,10 +78,7 @@ protected virtual void OnConnected()
/// <returns>Awaitable <see cref="Task"/></returns>
public async Task Close(CancellationToken cancellationToken = default)
{
if (disposedValue)
{
throw new ObjectDisposedException(nameof(WebSocketRpcClient));
}
ObjectDisposedException.ThrowIf(disposedValue, this);

_receiveCancellationTokenSource.Cancel();
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "bye", cancellationToken).ConfigureAwait(false);
Expand All @@ -99,10 +93,7 @@ public async Task Close(CancellationToken cancellationToken = default)
/// <returns>Awaitable <see cref="Task"/></returns>
public virtual async Task PostMessage(Message message, CancellationToken cancellationToken = default)
{
if (disposedValue)
{
throw new ObjectDisposedException(nameof(WebSocketRpcClient));
}
ObjectDisposedException.ThrowIf(disposedValue, this);

var json = message.ToJson();
await _webSocket.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
Expand All @@ -118,10 +109,7 @@ public virtual async Task PostMessage(Message message, CancellationToken cancell
/// <exception cref="ResponseException">Throws when <see cref="Message.IsSuccessfulResponse"/> is False</exception>
public async Task<dynamic> SendMessage(Message message, CancellationToken cancellationToken = default)
{
if (disposedValue)
{
throw new ObjectDisposedException(nameof(WebSocketRpcClient));
}
ObjectDisposedException.ThrowIf(disposedValue, this);

// capture the message to be sent
if (!_pendingRequests.TryAdd(message.RequestId, message))
Expand Down Expand Up @@ -165,6 +153,8 @@ public async Task<dynamic> SendMessage(Message message, CancellationToken cancel
/// or was a response from a posted message (i.e. we didn't register to receive the response)
/// Pooling state_changed messages come through this event
/// </summary>
/// <remarks>You need to call <see cref="DaemonProxy.RegisterService(string, CancellationToken)"/>
/// with `wallet_ui` in order for service events to be generated.</remarks>
public event EventHandler<Message>? BroadcastMessageReceived;

/// <summary>
Expand All @@ -173,14 +163,6 @@ public async Task<dynamic> SendMessage(Message message, CancellationToken cancel
/// <param name="message">The message to broadcast</param>
protected virtual void OnBroadcastMessageReceived(Message message)
{
if (message is null)
{
throw new ArgumentNullException(nameof(message));
}

// Debug.WriteLine("Broadcast message:");
// Debug.WriteLine(message.ToJson());

BroadcastMessageReceived?.Invoke(this, message);
}

Expand Down Expand Up @@ -215,7 +197,7 @@ private async Task ReceiveLoop()
{
_pendingResponses[message.RequestId] = message;
}
else //if (message.RequestId != string.Empty) // only broadcast if it's an actual response
else
{
OnBroadcastMessageReceived(message);
}
Expand All @@ -224,7 +206,7 @@ private async Task ReceiveLoop()

private static bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
{
// uncomment these checks to change remote cert validaiton requirements
// uncomment these checks to change remote cert validation requirements

// require remote ca to be trusted on this machine
//if ((sslPolicyErrors & SslPolicyErrors.RemoteCertificateChainErrors) == SslPolicyErrors.RemoteCertificateChainErrors)
Expand Down

0 comments on commit ac1cbe9

Please sign in to comment.