diff --git a/src/NetMQ.Tests/ClientServer.cs b/src/NetMQ.Tests/ClientServer.cs index 8fad5621..f8da5170 100644 --- a/src/NetMQ.Tests/ClientServer.cs +++ b/src/NetMQ.Tests/ClientServer.cs @@ -54,7 +54,7 @@ public void Tcp() } [Fact] - public async void Async() + public async Task Async() { using var server = new ServerSocket(); using var client = new ClientSocket(); @@ -72,7 +72,7 @@ public async void Async() } [Fact] - public async void AsyncWithCancellationToken() + public async Task AsyncWithCancellationToken() { using CancellationTokenSource source = new CancellationTokenSource(); using var server = new ServerSocket(); @@ -85,7 +85,7 @@ public async void AsyncWithCancellationToken() #if NETCOREAPP3_1 [Fact(Timeout = 120)] - public async void AsyncEnumerableCanceled() + public async Task AsyncEnumerableCanceled() { using CancellationTokenSource source = new CancellationTokenSource(); using var server = new ServerSocket(); diff --git a/src/NetMQ.Tests/MessageTests.cs b/src/NetMQ.Tests/MessageTests.cs index 2009f8f9..3ea05d47 100644 --- a/src/NetMQ.Tests/MessageTests.cs +++ b/src/NetMQ.Tests/MessageTests.cs @@ -128,7 +128,7 @@ public void Issue52_ReqToRouterBug() var msg = router.ReceiveMultipartMessage(); Assert.Equal(3, msg.FrameCount); - Assert.Equal(msg[2].ConvertToString(), testmessage); + Assert.Equal(testmessage, msg[2].ConvertToString()); } } diff --git a/src/NetMQ.Tests/NetMQ.Tests.csproj b/src/NetMQ.Tests/NetMQ.Tests.csproj index ac51221d..347d8bc9 100644 --- a/src/NetMQ.Tests/NetMQ.Tests.csproj +++ b/src/NetMQ.Tests/NetMQ.Tests.csproj @@ -15,7 +15,7 @@ true - + /Library/Frameworks/Mono.framework/Versions/Current/lib/mono /usr/lib/mono /usr/local/lib/mono @@ -39,14 +39,28 @@ - - - - + + - + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + diff --git a/src/NetMQ.Tests/NetMQMonitorTests.cs b/src/NetMQ.Tests/NetMQMonitorTests.cs index e6665e0b..a4d3708a 100644 --- a/src/NetMQ.Tests/NetMQMonitorTests.cs +++ b/src/NetMQ.Tests/NetMQMonitorTests.cs @@ -68,7 +68,7 @@ public void StartAsync() Thread.Sleep(200); Assert.Equal(TaskStatus.Running, task.Status); monitor.Stop(); - Assert.True(task.Wait(TimeSpan.FromMilliseconds(1000))); + Assert.True(TaskUtils.Wait(task, TimeSpan.FromMilliseconds(1000))); } } #endif @@ -154,7 +154,7 @@ public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket() } Thread.Sleep(100); // Monitor.Dispose should complete - var completed = Task.Factory.StartNew(() => monitor.Dispose()).Wait(1000); + var completed = TaskUtils.Wait(Task.Factory.StartNew(() => monitor.Dispose()), TimeSpan.FromMilliseconds(1000)); Assert.True(completed); } // NOTE If this test fails, it will hang because context.Dispose will block diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index f4903073..34073031 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -395,7 +395,7 @@ public void RemoveSocket() poller.Stop(); // await the pollerTask, 1ms should suffice - pollerTask.Wait(1); + TaskUtils.Wait(pollerTask, TimeSpan.FromMilliseconds(1)); Assert.True(pollerTask.IsCompleted); } } @@ -879,7 +879,7 @@ public void OneTask() Assert.True(poller.CanExecuteTaskInline, "Should be on NetMQPoller thread"); }); task.Start(poller); - task.Wait(); + TaskUtils.Wait(task); Assert.True(triggered); } @@ -894,7 +894,7 @@ public void SetsCurrentTaskScheduler() var task = new Task(() => Assert.Same(TaskScheduler.Current, poller)); task.Start(poller); - task.Wait(); + TaskUtils.Wait(task); } } @@ -911,7 +911,7 @@ public void CanExecuteTaskInline() var task = new Task(() => Assert.True(poller.CanExecuteTaskInline)); task.Start(poller); - task.Wait(); + TaskUtils.Wait(task); } } @@ -941,8 +941,8 @@ public void ContinueWith() }, poller); task.Start(poller); - task.Wait(); - task2.Wait(); + TaskUtils.Wait(task); + TaskUtils.Wait(task2); Assert.Equal(threadId1, threadId2); Assert.Equal(1, runCount1); @@ -982,9 +982,9 @@ public void TwoThreads() } }); - t1.Wait(1000); - t2.Wait(1000); - Task.WaitAll(allTasks.ToArray(), 1000); + TaskUtils.Wait(t1, TimeSpan.FromMilliseconds(1000)); + TaskUtils.Wait(t2, TimeSpan.FromMilliseconds(1000)); + TaskUtils.WaitAll(allTasks, TimeSpan.FromMilliseconds(1000)); Assert.Equal(100, count1); Assert.Equal(100, count2); diff --git a/src/NetMQ.Tests/NetMQQueueTests.cs b/src/NetMQ.Tests/NetMQQueueTests.cs index 9a26870d..7540d9b3 100644 --- a/src/NetMQ.Tests/NetMQQueueTests.cs +++ b/src/NetMQ.Tests/NetMQQueueTests.cs @@ -39,7 +39,7 @@ public void EnqueueShouldNotBlockWhenCapacityIsZero() } }); - bool completed = task.Wait(TimeSpan.FromSeconds(1)); + bool completed = TaskUtils.Wait(task, TimeSpan.FromSeconds(1)); Assert.True(completed, "Enqueue task should have completed " + socketWatermarkCapacity + " enqueue within 1 second"); } } diff --git a/src/NetMQ.Tests/PgmTests.cs b/src/NetMQ.Tests/PgmTests.cs index 1dcc857e..d9b38aa5 100644 --- a/src/NetMQ.Tests/PgmTests.cs +++ b/src/NetMQ.Tests/PgmTests.cs @@ -230,8 +230,8 @@ public void Sending1000Messages() } }); - pubTask.Wait(); - subTask.Wait(); + TaskUtils.Wait(pubTask); + TaskUtils.Wait(subTask); Assert.Equal(1000, count); } @@ -291,7 +291,7 @@ public void SubscriberCleanupOnUnbind(string address) monitor.Stop(); - monitorTask.Wait(); + TaskUtils.Wait(monitorTask); } } } diff --git a/src/NetMQ.Tests/RadioDish.cs b/src/NetMQ.Tests/RadioDish.cs index 676f85d4..d97b871d 100644 --- a/src/NetMQ.Tests/RadioDish.cs +++ b/src/NetMQ.Tests/RadioDish.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using NetMQ.Sockets; using Xunit; using Xunit.Abstractions; @@ -47,7 +48,7 @@ public void TestBlocking() } [Fact] - public async void TestAsync() + public async Task TestAsync() { using var radio = new RadioSocket(); using var dish = new DishSocket(); diff --git a/src/NetMQ.Tests/RouterTests.cs b/src/NetMQ.Tests/RouterTests.cs index 0633b6a7..daacc230 100644 --- a/src/NetMQ.Tests/RouterTests.cs +++ b/src/NetMQ.Tests/RouterTests.cs @@ -44,7 +44,7 @@ public void ReceiveReadyDot35Bug() using (var server = new RouterSocket()) { server.BindRandomPort("tcp://127.0.0.1"); - server.ReceiveReady += (s, e) => Assert.True(false, "Should not receive"); + server.ReceiveReady += (s, e) => Assert.Fail("Should not receive"); Assert.False(server.Poll(TimeSpan.FromMilliseconds(1500))); } diff --git a/src/NetMQ.Tests/ScatterGather.cs b/src/NetMQ.Tests/ScatterGather.cs index 6847c596..affc82e4 100644 --- a/src/NetMQ.Tests/ScatterGather.cs +++ b/src/NetMQ.Tests/ScatterGather.cs @@ -1,4 +1,5 @@ using System.Threading; +using System.Threading.Tasks; using NetMQ.Sockets; using Xunit; @@ -47,7 +48,7 @@ public void TestBlocking() } [Fact] - public async void TestAsync() + public async Task TestAsync() { using var scatter = new ScatterSocket(); using var gather = new GatherSocket(); diff --git a/src/NetMQ.Tests/SocketTests.cs b/src/NetMQ.Tests/SocketTests.cs index 87072d62..4a0c3e24 100644 --- a/src/NetMQ.Tests/SocketTests.cs +++ b/src/NetMQ.Tests/SocketTests.cs @@ -134,7 +134,7 @@ public void ReceiveMessageWithTimeout() t1.Start(); t2.Start(); - Task.WaitAll(t1, t2); + TaskUtils.WaitAll(new[]{t1, t2}); } } diff --git a/src/NetMQ.Tests/TaskUtils.cs b/src/NetMQ.Tests/TaskUtils.cs new file mode 100644 index 00000000..d6381fc6 --- /dev/null +++ b/src/NetMQ.Tests/TaskUtils.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace NetMQ.Tests +{ + internal class TaskUtils + { + internal static async Task PollUntil(Func condition, TimeSpan timeout) + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(timeout); + + await PollUntil(condition, cts.Token); + } + + internal static async Task PollUntil(Func condition, CancellationToken ct = default) + { + try + { + while (!condition()) + { + await Task.Delay(25, ct).ConfigureAwait(true); + } + } + catch (TaskCanceledException) + { + // Task was cancelled. Ignore exception and return. + } + } + + internal static bool WaitAll(IEnumerable tasks, TimeSpan timeout) + { + PollUntil(() => tasks.All(t => t.IsCompleted), timeout).Wait(); + return tasks.All(t => t.Status == TaskStatus.RanToCompletion); + } + + internal static void WaitAll(IEnumerable tasks) + { + PollUntil(() => tasks.All(t => t.IsCompleted), Timeout.InfiniteTimeSpan).Wait(); + } + + internal static bool Wait(Task task, TimeSpan timeout) + { + PollUntil(() => task.IsCompleted, timeout).Wait(); + return task.Status == TaskStatus.RanToCompletion; + } + + internal static void Wait(Task task) + { + PollUntil(() => task.IsCompleted, Timeout.InfiniteTimeSpan).Wait(); + } + } +} diff --git a/src/NetMQ.Tests/XPubSubTests.cs b/src/NetMQ.Tests/XPubSubTests.cs index 14d730e7..a0a13674 100644 --- a/src/NetMQ.Tests/XPubSubTests.cs +++ b/src/NetMQ.Tests/XPubSubTests.cs @@ -332,7 +332,7 @@ public void Manual() sub.SendFrame(new byte[] { 1, (byte)'A' }); var subscription = pub.ReceiveFrameBytes(); - Assert.Equal(subscription[1], (byte)'A'); + Assert.Equal((byte)'A', subscription[1]); pub.Subscribe("B"); pub.SendFrame("A"); @@ -356,7 +356,7 @@ public void WelcomeMessage() var subscription = pub.ReceiveFrameBytes(); - Assert.Equal(subscription[1], (byte)'W'); + Assert.Equal((byte)'W', subscription[1]); Assert.Equal("W", sub.ReceiveFrameString()); } @@ -377,7 +377,7 @@ public void ClearWelcomeMessage() var subscription = pub.ReceiveFrameBytes(); - Assert.Equal(subscription[1], (byte)'W'); + Assert.Equal((byte)'W', subscription[1]); Assert.False(sub.TrySkipFrame()); } diff --git a/src/NetMQ/Core/Mailbox.cs b/src/NetMQ/Core/Mailbox.cs index dc10b378..c5fb7751 100644 --- a/src/NetMQ/Core/Mailbox.cs +++ b/src/NetMQ/Core/Mailbox.cs @@ -229,13 +229,9 @@ public bool TryRecv(int timeout, out Command command) return false; } - // We've got the signal. Now we can switch into active state. - m_active = true; - - // Get a command. - var ok = m_commandPipe.TryRead(out command); - Debug.Assert(ok); - return ok; + // We've got the signal. Now we can switch into active state if we can read. + m_active = m_commandPipe.TryRead(out command); + return m_active; } /// diff --git a/src/NetMQ/Core/SocketBase.cs b/src/NetMQ/Core/SocketBase.cs index cf706321..fc20b24e 100644 --- a/src/NetMQ/Core/SocketBase.cs +++ b/src/NetMQ/Core/SocketBase.cs @@ -1261,8 +1261,7 @@ private void ProcessCommands(int timeout, bool throttle, CancellationToken cance // Process all the commands available at the moment. while (found) { - Assumes.NotNull(command.Destination); - command.Destination.ProcessCommand(command); + command.Destination?.ProcessCommand(command); found = m_mailbox.TryRecv(0, out command); } diff --git a/src/NetMQ/Core/YPipe.cs b/src/NetMQ/Core/YPipe.cs index 61f002df..61fc807c 100644 --- a/src/NetMQ/Core/YPipe.cs +++ b/src/NetMQ/Core/YPipe.cs @@ -180,17 +180,25 @@ public bool CheckRead() /// true if the read succeeded, otherwise false. public bool TryRead([MaybeNullWhen(returnValue: false)] out T value) { - // Try to prefetch a value. - if (!CheckRead()) + try + { + // Try to prefetch a value. + if (!CheckRead()) + { + value = default(T); + return false; + } + + // There was at least one value prefetched. + // Return it to the caller. + value = m_queue.Pop(); + return true; + } + catch { value = default(T); return false; } - - // There was at least one value prefetched. - // Return it to the caller. - value = m_queue.Pop(); - return true; } /// diff --git a/src/NetMQ/NetMQ.csproj b/src/NetMQ/NetMQ.csproj index a2f3f93c..221b2527 100644 --- a/src/NetMQ/NetMQ.csproj +++ b/src/NetMQ/NetMQ.csproj @@ -39,11 +39,11 @@ - + - +