Skip to content

Commit

Permalink
AsyncStreams: Allow extending writable ending state until needed
Browse files Browse the repository at this point in the history
This will be useful to allow flushing the last trail of data in a writable being ended.
  • Loading branch information
Pagghiu committed Dec 10, 2024
1 parent 21c8605 commit 70a4f92
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
43 changes: 35 additions & 8 deletions Libraries/AsyncStreams/AsyncStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ void SC::AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
state = State::Destroyed;
return;
}
if (newSize == 0)
{
emitError(Result::Error("AsyncReadableStream::push zero sized buffer is not allowed"));
return;
}
// Push buffer to the queue
buffers->setNewBufferSize(bufferID, newSize);
Request request;
Expand Down Expand Up @@ -365,7 +370,13 @@ SC::Result SC::AsyncWritableStream::write(AsyncBufferView::ID bufferID, Function
}
break;
case State::Ending:
case State::Ended: Assert::unreachable(); break;
if (not canEndWritable.isValid() or canEndWritable())
{
eventFinish.emit();
state = State::Ended;
}
break;
case State::Ended: break;
}
return Result(true);
}
Expand All @@ -381,14 +392,12 @@ SC::Result SC::AsyncWritableStream::write(Span<const char> data, Function<void(A
return write(bufferID, cb);
}

bool SC::AsyncWritableStream::tryAsync(Result potentialError)
void SC::AsyncWritableStream::tryAsync(Result potentialError)
{
if (potentialError)
{
eventError.emit(potentialError);
return false;
}
return true;
}

void SC::AsyncWritableStream::finishedWriting(AsyncBufferView::ID bufferID,
Expand All @@ -413,7 +422,10 @@ void SC::AsyncWritableStream::finishedWriting(AsyncBufferView::ID
// Queue is empty
if (state == State::Ending)
{
state = State::Ended;
if (not canEndWritable.isValid() or canEndWritable())
{
state = State::Ended;
}
}
else
{
Expand Down Expand Up @@ -441,9 +453,24 @@ void SC::AsyncWritableStream::end()
switch (state)
{
case State::Stopped:
// Can just jump to ended state
state = State::Ended;
eventFinish.emit();
if (canEndWritable.isValid())
{
if (canEndWritable())
{
state = State::Ended;
eventFinish.emit();
}
else
{
state = State::Ending;
}
}
else
{
// Can just jump to ended state
state = State::Ended;
eventFinish.emit();
}
break;
case State::Writing:
// We need to wait for current in-flight write to end
Expand Down
10 changes: 8 additions & 2 deletions Libraries/AsyncStreams/AsyncStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,15 @@ struct AsyncWritableStream
/// @brief Signals an async error received
void emitError(Result error);

private:
bool tryAsync(Result potentialError);
/// @brief Allows keeping a writable in ENDING state until it has finished flushing all pending data.
/// If a writable stream redefines this function it should return true to allow transitioning to ENDED
/// state and return false to keep staying in ENDING state.
Function<bool()> canEndWritable;

/// @brief Will emit error if the passed in Result is false
void tryAsync(Result potentialError);

private:
enum class State
{
Stopped,
Expand Down

0 comments on commit 70a4f92

Please sign in to comment.