From 181227177b579e3814daa9e7b84366fb690b1e94 Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Thu, 28 Jan 2021 15:06:46 +0200 Subject: [PATCH] add a way to send and receive routing keys --- src/NetMQ.Tests/RouterTests.cs | 21 +++++++ src/NetMQ/NetMQ-unix.csproj | 15 ----- src/NetMQ/OutgoingSocketExtensions.cs | 83 ++++++++++++++++++++++++ src/NetMQ/ReceivingSocketExtensions.cs | 87 +++++++++++++++++++++++++- src/NetMQ/RoutingKey.cs | 9 +++ 5 files changed, 198 insertions(+), 17 deletions(-) delete mode 100644 src/NetMQ/NetMQ-unix.csproj diff --git a/src/NetMQ.Tests/RouterTests.cs b/src/NetMQ.Tests/RouterTests.cs index 82ac7c9a3..0633b6a71 100644 --- a/src/NetMQ.Tests/RouterTests.cs +++ b/src/NetMQ.Tests/RouterTests.cs @@ -130,5 +130,26 @@ public void Handover() } } } + + [Fact] + public void RoutingKeys() + { + using var router = new RouterSocket("inproc://routing-keys"); + using var dealer = new DealerSocket("inproc://routing-keys"); + + dealer.SendRoutingKeys(new RoutingKey(1)).SendFrame("Hello"); + + var keys = router.ReceiveRoutingKeys(); + var message = router.ReceiveFrameString(); + + Assert.Equal("Hello", message); + + router.SendRoutingKeys(keys).SendFrame("World"); + + dealer.ReceiveRoutingKeys(); + var reply = dealer.ReceiveFrameString(); + + Assert.Equal("World", reply); + } } } diff --git a/src/NetMQ/NetMQ-unix.csproj b/src/NetMQ/NetMQ-unix.csproj deleted file mode 100644 index d48e0a058..000000000 --- a/src/NetMQ/NetMQ-unix.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - netcoreapp20 - NetMQ - NetMQ - NetMQ.snk - true - true - - - - - - - diff --git a/src/NetMQ/OutgoingSocketExtensions.cs b/src/NetMQ/OutgoingSocketExtensions.cs index 83dd3a339..33bbd4237 100644 --- a/src/NetMQ/OutgoingSocketExtensions.cs +++ b/src/NetMQ/OutgoingSocketExtensions.cs @@ -685,5 +685,88 @@ public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, R } #endregion + + #region Sending Routing Keys + + /// + /// Send empty list of routing keys over , append an empty message at the end of the keys. + /// + /// the IOutgoingSocket to transmit on + public static IOutgoingSocket SendEmptyRoutingKeys(this IOutgoingSocket socket) + { + return socket.SendMoreFrameEmpty(); + } + + /// + /// Send a single routing key over , append an empty message afterwards. + /// + /// the IOutgoingSocket to transmit on + public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, params RoutingKey[] routingKeys) + { + foreach(var routingKey in routingKeys) + socket.SendMoreFrame(routingKey); + + socket.SendMoreFrameEmpty(); + + return socket; + } + + /// + /// Send routing keys over , append an empty message at the end of the keys. + /// + /// the IOutgoingSocket to transmit on + /// the routing keys to send + public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, IEnumerable routingKeys) + { + foreach(var routingKey in routingKeys) + socket.SendMoreFrame(routingKey); + + socket.SendMoreFrameEmpty(); + + return socket; + } + + /// + /// Attempt to transmit routing keys over . + /// If message cannot be sent immediately, return false. + /// Routing is always sent as more frame. + /// + /// the IOutgoingSocket to transmit on + /// the routing keys to send + /// true if a message was available, otherwise false. + public static bool TrySendRoutingKeys(this IOutgoingSocket socket, IEnumerable routingKeys) + { + return socket.TrySendRoutingKeys(TimeSpan.Zero, routingKeys); + } + + /// + /// Attempt to transmit routing key over . + /// If message cannot be sent within , return false. + /// Routing is always sent as more frame. + /// + /// the IOutgoingSocket to transmit on + /// The maximum period of time to try to send a message. + /// the routing keys to send + /// true if a message was available, otherwise false. + public static bool TrySendRoutingKeys(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable routingKeys) + { + var enumerator = routingKeys.GetEnumerator(); + + // Empty collection, just trying to send the empty message + if (!enumerator.MoveNext()) + return socket.TrySendFrameEmpty(timeout, true); + + if (!socket.TrySendFrame(enumerator.Current)) + return false; + + while (enumerator.MoveNext()) + socket.SendMoreFrame(enumerator.Current); + + socket.SendMoreFrameEmpty(); + + return true; + } + + #endregion } } diff --git a/src/NetMQ/ReceivingSocketExtensions.cs b/src/NetMQ/ReceivingSocketExtensions.cs index ac735e786..b20204dba 100644 --- a/src/NetMQ/ReceivingSocketExtensions.cs +++ b/src/NetMQ/ReceivingSocketExtensions.cs @@ -316,7 +316,7 @@ public static string ReceiveFrameString(this IReceivingSocket socket, Encoding e } finally { - msg.Close(); + msg.Close(); } } @@ -449,7 +449,7 @@ public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan } finally { - msg.Close(); + msg.Close(); } } @@ -1111,5 +1111,88 @@ public static bool TryReceiveRoutingKey(this IReceivingSocket socket, TimeSpan t } #endregion + + #region Receiving a routing keys + + /// + /// Receive routing keys from until a bottom message arrives (empty message), blocking until one arrives. + /// + /// The socket to receive from. + /// The routing keys. + public static IEnumerable ReceiveRoutingKeys(this IReceivingSocket socket) + { + List keys = new List(); + + while (true) + { + var routingKey = socket.ReceiveRoutingKey(out bool more); + if (!more) + throw new InvalidException("Malformed multipart message, empty message expected"); + + if (routingKey.Bytes.Length == 0) + break; + + keys.Add(routingKey); + } + + return keys; + } + + /// + /// Attempt to receive routing-keys from , an empty message expected at the end of routing keys. + /// If no message is immediately available, return false. + /// + /// The socket to receive from. + /// The routing-keys of the received message. + /// true if a message was available, otherwise false. + public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out IEnumerable? routingKeys) + { + return TryReceiveRoutingKeys(socket, TimeSpan.Zero, out routingKeys); + } + + /// + /// Attempt to receive a routing-keys from . + /// If no message is available within , return false. + /// + /// The socket to receive from. + /// The maximum period of time to wait for a message to become available. + /// The routing-keys of the received message. + /// true if a message was available, otherwise false. + public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out IEnumerable? routingKeys) + { + RoutingKey first = new RoutingKey(); + + if (socket.TryReceiveRoutingKey(timeout, ref first, out bool more)) + { + if (!more) + throw new InvalidException("Malformed multipart message, empty message expected"); + + List keys = new List(); + routingKeys = keys; + + if (first.Bytes.Length == 0) + return true; + + keys.Add(first); + while (true) + { + var routingKey = socket.ReceiveRoutingKey(out more); + if (!more) + throw new InvalidException("Malformed multipart message, empty message expected"); + + if (routingKey.Bytes.Length == 0) + break; + + keys.Add(routingKey); + } + + return true; + } + + routingKeys = null; + return false; + } + + #endregion } } diff --git a/src/NetMQ/RoutingKey.cs b/src/NetMQ/RoutingKey.cs index 8744a23a6..68c3c1462 100644 --- a/src/NetMQ/RoutingKey.cs +++ b/src/NetMQ/RoutingKey.cs @@ -31,6 +31,15 @@ public RoutingKey(string b64) bytes = Convert.FromBase64String(b64); } + /// + /// Create a new routing key out of a Int64 + /// + /// + public RoutingKey(long value) + { + bytes = NetworkOrderBitsConverter.GetBytes(value); + } + internal byte[] Bytes { get { return bytes; }