Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Better filter clients connecting to Kestrel's dispatch pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
halter73 committed Nov 11, 2016
1 parent ec89197 commit f1d0faf
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public abstract class ListenerPrimary : Listener
private readonly List<UvPipeHandle> _dispatchPipes = new List<UvPipeHandle>();
private int _dispatchIndex;
private string _pipeName;
private byte[] _pipeMessage;
private IntPtr _fileCompletionInfoPtr;
private bool _tryDetachFromIOCP = PlatformApis.IsWindows;

Expand All @@ -36,10 +37,12 @@ protected ListenerPrimary(ServiceContext serviceContext) : base(serviceContext)

public async Task StartAsync(
string pipeName,
byte[] pipeMessage,
ServerAddress address,
KestrelThread thread)
{
_pipeName = pipeName;
_pipeMessage = pipeMessage;

if (_fileCompletionInfoPtr == IntPtr.Zero)
{
Expand Down Expand Up @@ -187,34 +190,47 @@ await Thread.PostAsync(state =>

private class PipeReadContext
{
private const int _bufferLength = 16;

private readonly ListenerPrimary _listener;
private readonly byte[] _buf = new byte[_bufferLength];
private readonly IntPtr _bufPtr;
private GCHandle _bufHandle;
private int _bytesRead;

public PipeReadContext(ListenerPrimary listener)
{
_listener = listener;
_bufHandle = GCHandle.Alloc(new byte[8], GCHandleType.Pinned);
_bufHandle = GCHandle.Alloc(_buf, GCHandleType.Pinned);
_bufPtr = _bufHandle.AddrOfPinnedObject();
}

public Libuv.uv_buf_t AllocCallback(UvStreamHandle dispatchPipe, int suggestedSize)
{
return dispatchPipe.Libuv.buf_init(_bufPtr + _bytesRead, 8 - _bytesRead);
return dispatchPipe.Libuv.buf_init(_bufPtr + _bytesRead, _bufferLength - _bytesRead);
}

public unsafe void ReadCallback(UvStreamHandle dispatchPipe, int status)
public void ReadCallback(UvStreamHandle dispatchPipe, int status)
{
try
{
dispatchPipe.Libuv.ThrowIfErrored(status);

_bytesRead += status;

if (_bytesRead == 8)
if (_bytesRead == _bufferLength)
{
if (*(ulong*)_bufPtr == Constants.PipeMessage)
var correctMessage = true;

for (var i = 0; i < _bufferLength; i++)
{
if (_buf[i] != _listener._pipeMessage[i])
{
correctMessage = false;
}
}

if (correctMessage)
{
_listener._dispatchPipes.Add((UvPipeHandle) dispatchPipe);
dispatchPipe.ReadStop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
/// </summary>
public abstract class ListenerSecondary : ListenerContext, IAsyncDisposable
{
private static ArraySegment<ArraySegment<byte>> _pipeMessage =
new ArraySegment<ArraySegment<byte>>(new[] { new ArraySegment<byte>(BitConverter.GetBytes(Constants.PipeMessage)) });

private string _pipeName;
private byte[] _pipeMessage;
private IntPtr _ptr;
private Libuv.uv_buf_t _buf;
private bool _closed;
Expand All @@ -36,10 +34,12 @@ protected ListenerSecondary(ServiceContext serviceContext) : base(serviceContext

public Task StartAsync(
string pipeName,
byte[] pipeMessage,
ServerAddress address,
KestrelThread thread)
{
_pipeName = pipeName;
_pipeMessage = pipeMessage;
_buf = thread.Loop.Libuv.buf_init(_ptr, 4);

ServerAddress = address;
Expand Down Expand Up @@ -104,7 +104,7 @@ private void ConnectedCallback(UvConnectRequest connect, int status, Exception e
writeReq.Init(Thread.Loop);
writeReq.Write(
DispatchPipe,
_pipeMessage,
new ArraySegment<ArraySegment<byte>>(new [] { new ArraySegment<byte>(_pipeMessage) }),
(req, status2, ex, state) =>
{
req.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ internal class Constants

public const string ServerName = "Kestrel";

// "Kestrel\0"
public const ulong PipeMessage = 0x006C65727473654B;

private static int? GetECONNRESET()
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public IDisposable CreateServer(ServerAddress address)
try
{
var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var pipeMessage = Guid.NewGuid().ToByteArray();

var single = Threads.Count == 1;
var first = true;
Expand All @@ -91,15 +92,15 @@ public IDisposable CreateServer(ServerAddress address)
: new TcpListenerPrimary(ServiceContext);

listeners.Add(listener);
listener.StartAsync(pipeName, address, thread).Wait();
listener.StartAsync(pipeName, pipeMessage, address, thread).Wait();
}
else
{
var listener = usingPipes
? (ListenerSecondary) new PipeListenerSecondary(ServiceContext)
: new TcpListenerSecondary(ServiceContext);
listeners.Add(listener);
listener.StartAsync(pipeName, address, thread).Wait();
listener.StartAsync(pipeName, pipeMessage, address, thread).Wait();
}

first = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting.Server;
Expand All @@ -10,6 +11,7 @@
using Microsoft.AspNetCore.Server.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -55,12 +57,13 @@ public async Task ConnectionsGetRoundRobinedToSecondaryListeners()
{
var address = ServerAddress.FromUrl("http://127.0.0.1:0/");
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var pipeMessage = Guid.NewGuid().ToByteArray();

// Start primary listener
var kestrelThreadPrimary = new KestrelThread(kestrelEngine);
await kestrelThreadPrimary.StartAsync();
var listenerPrimary = new TcpListenerPrimary(serviceContextPrimary);
await listenerPrimary.StartAsync(pipeName, address, kestrelThreadPrimary);
await listenerPrimary.StartAsync(pipeName, pipeMessage, address, kestrelThreadPrimary);

// Until a secondary listener is added, TCP connections get dispatched directly
Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString()));
Expand All @@ -70,7 +73,7 @@ public async Task ConnectionsGetRoundRobinedToSecondaryListeners()
var kestrelThreadSecondary = new KestrelThread(kestrelEngine);
await kestrelThreadSecondary.StartAsync();
var listenerSecondary = new TcpListenerSecondary(serviceContextSecondary);
await listenerSecondary.StartAsync(pipeName, address, kestrelThreadSecondary);
await listenerSecondary.StartAsync(pipeName, pipeMessage, address, kestrelThreadSecondary);

// Once a secondary listener is added, TCP connections start getting dispatched to it
Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString()));
Expand Down Expand Up @@ -128,18 +131,19 @@ public async Task NonListenerPipeConnectionsAreLoggedAndIgnored()
{
var address = ServerAddress.FromUrl("http://127.0.0.1:0/");
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var pipeMessage = Guid.NewGuid().ToByteArray();

// Start primary listener
var kestrelThreadPrimary = new KestrelThread(kestrelEngine);
await kestrelThreadPrimary.StartAsync();
var listenerPrimary = new TcpListenerPrimary(serviceContextPrimary);
await listenerPrimary.StartAsync(pipeName, address, kestrelThreadPrimary);
await listenerPrimary.StartAsync(pipeName, pipeMessage, address, kestrelThreadPrimary);

// Add secondary listener
var kestrelThreadSecondary = new KestrelThread(kestrelEngine);
await kestrelThreadSecondary.StartAsync();
var listenerSecondary = new TcpListenerSecondary(serviceContextSecondary);
await listenerSecondary.StartAsync(pipeName, address, kestrelThreadSecondary);
await listenerSecondary.StartAsync(pipeName, pipeMessage, address, kestrelThreadSecondary);

// TCP Connections get round-robined
Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString()));
Expand Down Expand Up @@ -199,7 +203,79 @@ public async Task NonListenerPipeConnectionsAreLoggedAndIgnored()

Assert.Equal(1, primaryTrace.Logger.TotalErrorsLogged);
var errorMessage = primaryTrace.Logger.Messages.First(m => m.LogLevel == LogLevel.Error);
Assert.Contains("EOF", errorMessage.Exception.ToString());
Assert.Equal(Constants.EOF, Assert.IsType<UvException>(errorMessage.Exception).StatusCode);
}


[Fact]
public async Task PipeConnectionsWithWrongMessageAreLoggedAndIgnored()
{
var libuv = new Libuv();

var primaryTrace = new TestKestrelTrace();

var serviceContextPrimary = new TestServiceContext
{
Log = primaryTrace,
FrameFactory = context =>
{
return new Frame<DefaultHttpContext>(new TestApplication(c =>
{
return c.Response.WriteAsync("Primary");
}), context);
}
};

var serviceContextSecondary = new ServiceContext
{
Log = new TestKestrelTrace(),
AppLifetime = serviceContextPrimary.AppLifetime,
DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager,
ServerOptions = serviceContextPrimary.ServerOptions,
ThreadPool = serviceContextPrimary.ThreadPool,
FrameFactory = context =>
{
return new Frame<DefaultHttpContext>(new TestApplication(c =>
{
return c.Response.WriteAsync("Secondary"); ;
}), context);
}
};

using (var kestrelEngine = new KestrelEngine(libuv, serviceContextPrimary))
{
var address = ServerAddress.FromUrl("http://127.0.0.1:0/");
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var pipeMessage = Guid.NewGuid().ToByteArray();

// Start primary listener
var kestrelThreadPrimary = new KestrelThread(kestrelEngine);
await kestrelThreadPrimary.StartAsync();
var listenerPrimary = new TcpListenerPrimary(serviceContextPrimary);
await listenerPrimary.StartAsync(pipeName, pipeMessage, address, kestrelThreadPrimary);

// Add secondary listener with wrong pipe message
var kestrelThreadSecondary = new KestrelThread(kestrelEngine);
await kestrelThreadSecondary.StartAsync();
var listenerSecondary = new TcpListenerSecondary(serviceContextSecondary);
await listenerSecondary.StartAsync(pipeName, Guid.NewGuid().ToByteArray(), address, kestrelThreadSecondary);

// TCP Connections get round-robined
Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString()));
Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString()));
Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString()));

await listenerSecondary.DisposeAsync();
await kestrelThreadSecondary.StopAsync(TimeSpan.FromSeconds(1));

await listenerPrimary.DisposeAsync();
await kestrelThreadPrimary.StopAsync(TimeSpan.FromSeconds(1));
}

Assert.Equal(1, primaryTrace.Logger.TotalErrorsLogged);
var errorMessage = primaryTrace.Logger.Messages.First(m => m.LogLevel == LogLevel.Error);
Assert.IsType<IOException>(errorMessage.Exception);
Assert.Contains("Bad data", errorMessage.Exception.ToString());
}

private class TestApplication : IHttpApplication<DefaultHttpContext>
Expand Down

0 comments on commit f1d0faf

Please sign in to comment.