diff --git a/src/NetMQ.Tests/ClientServer.cs b/src/NetMQ.Tests/ClientServer.cs
index 8fad5621..f8da5170 100644
--- a/src/NetMQ.Tests/ClientServer.cs
+++ b/src/NetMQ.Tests/ClientServer.cs
@@ -54,7 +54,7 @@ public void Tcp()
}
[Fact]
- public async void Async()
+ public async Task Async()
{
using var server = new ServerSocket();
using var client = new ClientSocket();
@@ -72,7 +72,7 @@ public async void Async()
}
[Fact]
- public async void AsyncWithCancellationToken()
+ public async Task AsyncWithCancellationToken()
{
using CancellationTokenSource source = new CancellationTokenSource();
using var server = new ServerSocket();
@@ -85,7 +85,7 @@ public async void AsyncWithCancellationToken()
#if NETCOREAPP3_1
[Fact(Timeout = 120)]
- public async void AsyncEnumerableCanceled()
+ public async Task AsyncEnumerableCanceled()
{
using CancellationTokenSource source = new CancellationTokenSource();
using var server = new ServerSocket();
diff --git a/src/NetMQ.Tests/MessageTests.cs b/src/NetMQ.Tests/MessageTests.cs
index 2009f8f9..3ea05d47 100644
--- a/src/NetMQ.Tests/MessageTests.cs
+++ b/src/NetMQ.Tests/MessageTests.cs
@@ -128,7 +128,7 @@ public void Issue52_ReqToRouterBug()
var msg = router.ReceiveMultipartMessage();
Assert.Equal(3, msg.FrameCount);
- Assert.Equal(msg[2].ConvertToString(), testmessage);
+ Assert.Equal(testmessage, msg[2].ConvertToString());
}
}
diff --git a/src/NetMQ.Tests/NetMQ.Tests.csproj b/src/NetMQ.Tests/NetMQ.Tests.csproj
index ac51221d..347d8bc9 100644
--- a/src/NetMQ.Tests/NetMQ.Tests.csproj
+++ b/src/NetMQ.Tests/NetMQ.Tests.csproj
@@ -15,7 +15,7 @@
true
-
+
/Library/Frameworks/Mono.framework/Versions/Current/lib/mono
/usr/lib/mono
/usr/local/lib/mono
@@ -39,14 +39,28 @@
-
-
-
-
+
+
-
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
diff --git a/src/NetMQ.Tests/NetMQMonitorTests.cs b/src/NetMQ.Tests/NetMQMonitorTests.cs
index e6665e0b..a4d3708a 100644
--- a/src/NetMQ.Tests/NetMQMonitorTests.cs
+++ b/src/NetMQ.Tests/NetMQMonitorTests.cs
@@ -68,7 +68,7 @@ public void StartAsync()
Thread.Sleep(200);
Assert.Equal(TaskStatus.Running, task.Status);
monitor.Stop();
- Assert.True(task.Wait(TimeSpan.FromMilliseconds(1000)));
+ Assert.True(TaskUtils.Wait(task, TimeSpan.FromMilliseconds(1000)));
}
}
#endif
@@ -154,7 +154,7 @@ public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
}
Thread.Sleep(100);
// Monitor.Dispose should complete
- var completed = Task.Factory.StartNew(() => monitor.Dispose()).Wait(1000);
+ var completed = TaskUtils.Wait(Task.Factory.StartNew(() => monitor.Dispose()), TimeSpan.FromMilliseconds(1000));
Assert.True(completed);
}
// NOTE If this test fails, it will hang because context.Dispose will block
diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs
index f4903073..34073031 100644
--- a/src/NetMQ.Tests/NetMQPollerTest.cs
+++ b/src/NetMQ.Tests/NetMQPollerTest.cs
@@ -395,7 +395,7 @@ public void RemoveSocket()
poller.Stop();
// await the pollerTask, 1ms should suffice
- pollerTask.Wait(1);
+ TaskUtils.Wait(pollerTask, TimeSpan.FromMilliseconds(1));
Assert.True(pollerTask.IsCompleted);
}
}
@@ -879,7 +879,7 @@ public void OneTask()
Assert.True(poller.CanExecuteTaskInline, "Should be on NetMQPoller thread");
});
task.Start(poller);
- task.Wait();
+ TaskUtils.Wait(task);
Assert.True(triggered);
}
@@ -894,7 +894,7 @@ public void SetsCurrentTaskScheduler()
var task = new Task(() => Assert.Same(TaskScheduler.Current, poller));
task.Start(poller);
- task.Wait();
+ TaskUtils.Wait(task);
}
}
@@ -911,7 +911,7 @@ public void CanExecuteTaskInline()
var task = new Task(() => Assert.True(poller.CanExecuteTaskInline));
task.Start(poller);
- task.Wait();
+ TaskUtils.Wait(task);
}
}
@@ -941,8 +941,8 @@ public void ContinueWith()
}, poller);
task.Start(poller);
- task.Wait();
- task2.Wait();
+ TaskUtils.Wait(task);
+ TaskUtils.Wait(task2);
Assert.Equal(threadId1, threadId2);
Assert.Equal(1, runCount1);
@@ -982,9 +982,9 @@ public void TwoThreads()
}
});
- t1.Wait(1000);
- t2.Wait(1000);
- Task.WaitAll(allTasks.ToArray(), 1000);
+ TaskUtils.Wait(t1, TimeSpan.FromMilliseconds(1000));
+ TaskUtils.Wait(t2, TimeSpan.FromMilliseconds(1000));
+ TaskUtils.WaitAll(allTasks, TimeSpan.FromMilliseconds(1000));
Assert.Equal(100, count1);
Assert.Equal(100, count2);
diff --git a/src/NetMQ.Tests/NetMQQueueTests.cs b/src/NetMQ.Tests/NetMQQueueTests.cs
index 9a26870d..7540d9b3 100644
--- a/src/NetMQ.Tests/NetMQQueueTests.cs
+++ b/src/NetMQ.Tests/NetMQQueueTests.cs
@@ -39,7 +39,7 @@ public void EnqueueShouldNotBlockWhenCapacityIsZero()
}
});
- bool completed = task.Wait(TimeSpan.FromSeconds(1));
+ bool completed = TaskUtils.Wait(task, TimeSpan.FromSeconds(1));
Assert.True(completed, "Enqueue task should have completed " + socketWatermarkCapacity + " enqueue within 1 second");
}
}
diff --git a/src/NetMQ.Tests/PgmTests.cs b/src/NetMQ.Tests/PgmTests.cs
index 1dcc857e..d9b38aa5 100644
--- a/src/NetMQ.Tests/PgmTests.cs
+++ b/src/NetMQ.Tests/PgmTests.cs
@@ -230,8 +230,8 @@ public void Sending1000Messages()
}
});
- pubTask.Wait();
- subTask.Wait();
+ TaskUtils.Wait(pubTask);
+ TaskUtils.Wait(subTask);
Assert.Equal(1000, count);
}
@@ -291,7 +291,7 @@ public void SubscriberCleanupOnUnbind(string address)
monitor.Stop();
- monitorTask.Wait();
+ TaskUtils.Wait(monitorTask);
}
}
}
diff --git a/src/NetMQ.Tests/RadioDish.cs b/src/NetMQ.Tests/RadioDish.cs
index 676f85d4..d97b871d 100644
--- a/src/NetMQ.Tests/RadioDish.cs
+++ b/src/NetMQ.Tests/RadioDish.cs
@@ -1,5 +1,6 @@
using System;
using System.Threading;
+using System.Threading.Tasks;
using NetMQ.Sockets;
using Xunit;
using Xunit.Abstractions;
@@ -47,7 +48,7 @@ public void TestBlocking()
}
[Fact]
- public async void TestAsync()
+ public async Task TestAsync()
{
using var radio = new RadioSocket();
using var dish = new DishSocket();
diff --git a/src/NetMQ.Tests/RouterTests.cs b/src/NetMQ.Tests/RouterTests.cs
index 0633b6a7..daacc230 100644
--- a/src/NetMQ.Tests/RouterTests.cs
+++ b/src/NetMQ.Tests/RouterTests.cs
@@ -44,7 +44,7 @@ public void ReceiveReadyDot35Bug()
using (var server = new RouterSocket())
{
server.BindRandomPort("tcp://127.0.0.1");
- server.ReceiveReady += (s, e) => Assert.True(false, "Should not receive");
+ server.ReceiveReady += (s, e) => Assert.Fail("Should not receive");
Assert.False(server.Poll(TimeSpan.FromMilliseconds(1500)));
}
diff --git a/src/NetMQ.Tests/ScatterGather.cs b/src/NetMQ.Tests/ScatterGather.cs
index 6847c596..affc82e4 100644
--- a/src/NetMQ.Tests/ScatterGather.cs
+++ b/src/NetMQ.Tests/ScatterGather.cs
@@ -1,4 +1,5 @@
using System.Threading;
+using System.Threading.Tasks;
using NetMQ.Sockets;
using Xunit;
@@ -47,7 +48,7 @@ public void TestBlocking()
}
[Fact]
- public async void TestAsync()
+ public async Task TestAsync()
{
using var scatter = new ScatterSocket();
using var gather = new GatherSocket();
diff --git a/src/NetMQ.Tests/SocketTests.cs b/src/NetMQ.Tests/SocketTests.cs
index 87072d62..4a0c3e24 100644
--- a/src/NetMQ.Tests/SocketTests.cs
+++ b/src/NetMQ.Tests/SocketTests.cs
@@ -134,7 +134,7 @@ public void ReceiveMessageWithTimeout()
t1.Start();
t2.Start();
- Task.WaitAll(t1, t2);
+ TaskUtils.WaitAll(new[]{t1, t2});
}
}
diff --git a/src/NetMQ.Tests/TaskUtils.cs b/src/NetMQ.Tests/TaskUtils.cs
new file mode 100644
index 00000000..d6381fc6
--- /dev/null
+++ b/src/NetMQ.Tests/TaskUtils.cs
@@ -0,0 +1,56 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace NetMQ.Tests
+{
+ internal class TaskUtils
+ {
+ internal static async Task PollUntil(Func condition, TimeSpan timeout)
+ {
+ var cts = new CancellationTokenSource();
+ cts.CancelAfter(timeout);
+
+ await PollUntil(condition, cts.Token);
+ }
+
+ internal static async Task PollUntil(Func condition, CancellationToken ct = default)
+ {
+ try
+ {
+ while (!condition())
+ {
+ await Task.Delay(25, ct).ConfigureAwait(true);
+ }
+ }
+ catch (TaskCanceledException)
+ {
+ // Task was cancelled. Ignore exception and return.
+ }
+ }
+
+ internal static bool WaitAll(IEnumerable tasks, TimeSpan timeout)
+ {
+ PollUntil(() => tasks.All(t => t.IsCompleted), timeout).Wait();
+ return tasks.All(t => t.Status == TaskStatus.RanToCompletion);
+ }
+
+ internal static void WaitAll(IEnumerable tasks)
+ {
+ PollUntil(() => tasks.All(t => t.IsCompleted), Timeout.InfiniteTimeSpan).Wait();
+ }
+
+ internal static bool Wait(Task task, TimeSpan timeout)
+ {
+ PollUntil(() => task.IsCompleted, timeout).Wait();
+ return task.Status == TaskStatus.RanToCompletion;
+ }
+
+ internal static void Wait(Task task)
+ {
+ PollUntil(() => task.IsCompleted, Timeout.InfiniteTimeSpan).Wait();
+ }
+ }
+}
diff --git a/src/NetMQ.Tests/XPubSubTests.cs b/src/NetMQ.Tests/XPubSubTests.cs
index 14d730e7..a0a13674 100644
--- a/src/NetMQ.Tests/XPubSubTests.cs
+++ b/src/NetMQ.Tests/XPubSubTests.cs
@@ -332,7 +332,7 @@ public void Manual()
sub.SendFrame(new byte[] { 1, (byte)'A' });
var subscription = pub.ReceiveFrameBytes();
- Assert.Equal(subscription[1], (byte)'A');
+ Assert.Equal((byte)'A', subscription[1]);
pub.Subscribe("B");
pub.SendFrame("A");
@@ -356,7 +356,7 @@ public void WelcomeMessage()
var subscription = pub.ReceiveFrameBytes();
- Assert.Equal(subscription[1], (byte)'W');
+ Assert.Equal((byte)'W', subscription[1]);
Assert.Equal("W", sub.ReceiveFrameString());
}
@@ -377,7 +377,7 @@ public void ClearWelcomeMessage()
var subscription = pub.ReceiveFrameBytes();
- Assert.Equal(subscription[1], (byte)'W');
+ Assert.Equal((byte)'W', subscription[1]);
Assert.False(sub.TrySkipFrame());
}
diff --git a/src/NetMQ/Core/Mailbox.cs b/src/NetMQ/Core/Mailbox.cs
index dc10b378..c5fb7751 100644
--- a/src/NetMQ/Core/Mailbox.cs
+++ b/src/NetMQ/Core/Mailbox.cs
@@ -229,13 +229,9 @@ public bool TryRecv(int timeout, out Command command)
return false;
}
- // We've got the signal. Now we can switch into active state.
- m_active = true;
-
- // Get a command.
- var ok = m_commandPipe.TryRead(out command);
- Debug.Assert(ok);
- return ok;
+ // We've got the signal. Now we can switch into active state if we can read.
+ m_active = m_commandPipe.TryRead(out command);
+ return m_active;
}
///
diff --git a/src/NetMQ/Core/SocketBase.cs b/src/NetMQ/Core/SocketBase.cs
index cf706321..fc20b24e 100644
--- a/src/NetMQ/Core/SocketBase.cs
+++ b/src/NetMQ/Core/SocketBase.cs
@@ -1261,8 +1261,7 @@ private void ProcessCommands(int timeout, bool throttle, CancellationToken cance
// Process all the commands available at the moment.
while (found)
{
- Assumes.NotNull(command.Destination);
- command.Destination.ProcessCommand(command);
+ command.Destination?.ProcessCommand(command);
found = m_mailbox.TryRecv(0, out command);
}
diff --git a/src/NetMQ/Core/YPipe.cs b/src/NetMQ/Core/YPipe.cs
index 61f002df..61fc807c 100644
--- a/src/NetMQ/Core/YPipe.cs
+++ b/src/NetMQ/Core/YPipe.cs
@@ -180,17 +180,25 @@ public bool CheckRead()
/// true if the read succeeded, otherwise false.
public bool TryRead([MaybeNullWhen(returnValue: false)] out T value)
{
- // Try to prefetch a value.
- if (!CheckRead())
+ try
+ {
+ // Try to prefetch a value.
+ if (!CheckRead())
+ {
+ value = default(T);
+ return false;
+ }
+
+ // There was at least one value prefetched.
+ // Return it to the caller.
+ value = m_queue.Pop();
+ return true;
+ }
+ catch
{
value = default(T);
return false;
}
-
- // There was at least one value prefetched.
- // Return it to the caller.
- value = m_queue.Pop();
- return true;
}
///
diff --git a/src/NetMQ/NetMQ.csproj b/src/NetMQ/NetMQ.csproj
index a2f3f93c..221b2527 100644
--- a/src/NetMQ/NetMQ.csproj
+++ b/src/NetMQ/NetMQ.csproj
@@ -39,11 +39,11 @@
-
+
-
+