Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cuteant committed Jul 15, 2021
1 parent c9579a6 commit 287b9ab
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 38 deletions.
6 changes: 0 additions & 6 deletions src/DotNetty.Buffers/CompositeByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -740,12 +740,6 @@ public override bool IsSingleIoBuffer
return _components[0].Buffer.IsSingleIoBuffer;
default:
return false;
//int count = 0;
//for (int i = 0; i < size; i++)
//{
// count += _components[i].Buffer.IoBufferCount;
//}
//return 1u >= (uint)count;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Codecs.Http/HttpServerUpgradeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ bool Upgrade(IChannelHandlerContext ctx, IFullHttpRequest request)
static readonly Action<Task, object> CloseOnFailureAction = (t, s) => CloseOnFailure(t, s);
static void CloseOnFailure(Task t, object s)
{
if (!t.IsSuccess())
if (t.IsFailure())
{
_ = ((IChannelHandlerContext)s).Channel.CloseAsync();
}
Expand Down
6 changes: 3 additions & 3 deletions src/DotNetty.Codecs.Http2/DefaultHttp2ConnectionEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,11 @@ private static void NotifyLifecycleManagerOnError(Task future, IHttp2LifecycleMa
private static readonly Action<Task, object> NotifyLifecycleManagerOnErrorAction = (t, s) => NotifyLifecycleManagerOnError0(t, s);
private static void NotifyLifecycleManagerOnError0(Task t, object s)
{
var wrapped = ((IHttp2LifecycleManager, IChannelHandlerContext))s;
var (lm, ctx) = ((IHttp2LifecycleManager, IChannelHandlerContext))s;
var cause = t.Exception;
if (cause is object)
{
wrapped.Item1.OnError(wrapped.Item2, true, cause.InnerException);
lm.OnError(ctx, true, cause.InnerException);
}
}

Expand Down Expand Up @@ -681,7 +681,7 @@ public FlowControlledBase(DefaultHttp2ConnectionEncoder encoder, IHttp2Stream st
private static readonly Action<Task, object> LinkOutcomeContinuationAction = (t, s) => LinkOutcomeContinuation(t, s);
private static void LinkOutcomeContinuation(Task task, object state)
{
if (!task.IsSuccess())
if (task.IsFailure())
{
var self = (FlowControlledBase)state;
self.Error(self._owner.FlowController.ChannelHandlerContext, task.Exception.InnerException);
Expand Down
23 changes: 11 additions & 12 deletions src/DotNetty.Codecs.Http2/Http2ConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ private void ProcessRstStreamWriteResult(IChannelHandlerContext ctx, IHttp2Strea

private void CloseConnectionOnError(IChannelHandlerContext ctx, Task future)
{
if (!future.IsSuccess())
if (future.IsFailure())
{
OnConnectionError(ctx, true, future.Exception.InnerException, null);
}
Expand All @@ -990,16 +990,15 @@ private void CloseConnectionOnError(IChannelHandlerContext ctx, Task future)
private static readonly Action<Task, object> CloseChannelOnCompleteAction = (t, s) => CloseChannelOnComplete(t, s);
private static void CloseChannelOnComplete(Task t, object s)
{
var wrapped = ((IChannelHandlerContext, IPromise, IScheduledTask))s;
_ = (wrapped.Item3?.Cancel());
var promise = wrapped.Item2;
var (ctx, promise, timeoutTask) = ((IChannelHandlerContext, IPromise, IScheduledTask))s;
_ = timeoutTask?.Cancel();
if (promise is object)
{
_ = wrapped.Item1.CloseAsync(promise);
_ = ctx.CloseAsync(promise);
}
else
{
_ = wrapped.Item1.CloseAsync();
_ = ctx.CloseAsync();
}
}
private static readonly Action<object, object> ScheduledCloseChannelAction = (c, p) => ScheduledCloseChannel(c, p);
Expand All @@ -1011,22 +1010,22 @@ private static void ScheduledCloseChannel(object c, object p)
private static readonly Action<Task, object> CloseConnectionOnErrorOnCompleteAction = (t, s) => CloseConnectionOnErrorOnComplete(t, s);
private static void CloseConnectionOnErrorOnComplete(Task t, object s)
{
var wrapped = ((Http2ConnectionHandler, IChannelHandlerContext))s;
wrapped.Item1.CloseConnectionOnError(wrapped.Item2, t);
var (self, ctx) = ((Http2ConnectionHandler, IChannelHandlerContext))s;
self.CloseConnectionOnError(ctx, t);
}

private static readonly Action<Task, object> ProcessRstStreamWriteResultOnCompleteAction = (t, s) => ProcessRstStreamWriteResultOnComplete(t, s);
private static void ProcessRstStreamWriteResultOnComplete(Task t, object s)
{
var wrapped = ((Http2ConnectionHandler, IChannelHandlerContext, IHttp2Stream))s;
wrapped.Item1.ProcessRstStreamWriteResult(wrapped.Item2, wrapped.Item3, t);
var (self, ctx, stream) = ((Http2ConnectionHandler, IChannelHandlerContext, IHttp2Stream))s;
self.ProcessRstStreamWriteResult(ctx, stream, t);
}

private static readonly Action<Task, object> ProcessGoAwayWriteResultOnCompleteAction = (t, s) => ProcessGoAwayWriteResultOnComplete(t, s);
private static void ProcessGoAwayWriteResultOnComplete(Task t, object s)
{
var wrapped = ((IChannelHandlerContext, int, Http2Error, IByteBuffer))s;
ProcessGoAwayWriteResult(wrapped.Item1, wrapped.Item2, wrapped.Item3, wrapped.Item4, t);
var (ctx, lastStreamId, errorCode, debugData) = ((IChannelHandlerContext, int, Http2Error, IByteBuffer))s;
ProcessGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, t);
}

private static readonly Action<Task, object> CheckCloseConnOnCompleteAction = (t, s) => CheckCloseConnOnComplete(t, s);
Expand Down
7 changes: 3 additions & 4 deletions src/DotNetty.Codecs.Http2/Http2FrameCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,14 @@ private void WriteHeadersFrame(IChannelHandlerContext ctx, IHttp2HeadersFrame he
private static readonly Action<Task, object> ResetNufferedStreamsAction = (t, s) => ResetNufferedStreams(t, s);
private static void ResetNufferedStreams(Task t, object s)
{
var wrapped = ((Http2FrameCodec, int))s;
var self = wrapped.Item1;
var (self, streamId) = ((Http2FrameCodec, int))s;
_ = Interlocked.Decrement(ref self.v_numBufferedStreams);
self.HandleHeaderFuture(t, wrapped.Item2);
self.HandleHeaderFuture(t, streamId);
}

private void HandleHeaderFuture(Task channelFuture, int streamId)
{
if (!channelFuture.IsSuccess())
if (channelFuture.IsFailure())
{
_ = _frameStreamToInitializeMap.TryRemove(streamId, out _);
}
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Codecs.Http2/Http2MultiplexHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ internal static void RegisterDone(Task future, object s)
// Handle any errors that occurred on the local thread while registering. Even though
// failures can happen after this point, they will be handled by the channel by closing the
// childChannel.
if (!future.IsSuccess())
if (future.IsFailure())
{
var childChannel = (IChannel)s;
if (childChannel.IsRegistered)
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Common/Concurrency/PromiseCombiner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void OperationComplete(Task future)
{
Debug.Assert(_executor.InEventLoop);
++_doneCount;
if (!future.IsSuccess() && _cause is null)
if (future.IsFailure() && _cause is null)
{
_cause = future.Exception.InnerException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void Register()
{
Task future = _loop.RegisterAsync(this);
Debug.Assert(future.IsCompleted);
if (!future.IsSuccess())
if (future.IsFailure())
{
throw future.Exception.InnerException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void ReceivingGoAwayFailsBufferedStreams()
int failCount = 0;
foreach (Task f in futures)
{
if (!f.IsSuccess())
if (f.IsFailure())
{
failCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void TestReuseFd(ServerBootstrap sb, Bootstrap cb)
{
cb.ConnectAsync(sc.LocalAddress).ContinueWith(t =>
{
if (!t.IsSuccess())
if (t.IsFailure())
{
clientDonePromise.TrySetException(t.Exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ public async Task TestWriteWhilePeerIsClosedReleaseObjectAndFailPromise()
.WriteAndFlushAsync(data2.RetainedDuplicate(), serverChannelCpy.NewPromise())
.ContinueWith(future =>
{
if (!future.IsSuccess() &&
if (future.IsFailure() &&
future.Exception.InnerException is ClosedChannelException)
{
writeFailLatch.Signal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ public async Task TestDeadlockOnAcquire()
try
{
var result = await TaskUtil.WaitAsync(futureA1, TimeSpan.FromSeconds(1));
if (!result || !futureA1.IsSuccess()) { throw new TimeoutException(); }
if (!result || futureA1.IsFailure()) { throw new TimeoutException(); }
Assert.Same(poolA1, futureA1.Result);

result = await TaskUtil.WaitAsync(futureB1, TimeSpan.FromSeconds(1));
if (!result || !futureB1.IsSuccess()) { throw new TimeoutException(); }
if (!result || futureB1.IsFailure()) { throw new TimeoutException(); }
Assert.Same(poolB1, futureB1.Result);
}
catch (Exception)
Expand All @@ -101,11 +101,11 @@ public async Task TestDeadlockOnAcquire()
try
{
var result = await TaskUtil.WaitAsync(futureA2, TimeSpan.FromSeconds(1));
if (!result || !futureA2.IsSuccess()) { throw new TimeoutException(); }
if (!result || futureA2.IsFailure()) { throw new TimeoutException(); }
Assert.Same(poolA1, futureA2.Result);

result = await TaskUtil.WaitAsync(futureB2, TimeSpan.FromSeconds(1));
if (!result || !futureB2.IsSuccess()) { throw new TimeoutException(); }
if (!result || futureB2.IsFailure()) { throw new TimeoutException(); }
Assert.Same(poolB1, futureB2.Result);
}
catch (TimeoutException)
Expand Down Expand Up @@ -246,9 +246,9 @@ public async Task TestDeadlockOnRemove()
try
{
var result = await TaskUtil.WaitAsync(future1, TimeSpan.FromSeconds(1));
if (!result || !future1.IsSuccess()) { throw new TimeoutException(); }
if (!result || future1.IsFailure()) { throw new TimeoutException(); }
result = await TaskUtil.WaitAsync(future2, TimeSpan.FromSeconds(1));
if (!result || !future2.IsSuccess()) { throw new TimeoutException(); }
if (!result || future2.IsFailure()) { throw new TimeoutException(); }
}
catch (TimeoutException)
{
Expand Down

0 comments on commit 287b9ab

Please sign in to comment.