Skip to content

Commit

Permalink
Merge pull request #4 from clue-labs/unwrap-writable
Browse files Browse the repository at this point in the history
Add unwrapWritable() function
  • Loading branch information
clue committed May 21, 2016
2 parents 4c8c8d7 + 68d5bf6 commit 1a5597b
Show file tree
Hide file tree
Showing 4 changed files with 521 additions and 0 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ built on top of [React PHP](http://reactphp.org/).
* [first()](#first)
* [all()](#all)
* [unwrapReadable()](#unwrapreadable)
* [unwrapWritable()](#unwrapwritable)
* [Install](#install)
* [License](#license)

Expand Down Expand Up @@ -151,6 +152,50 @@ If the given promise is already settled and does not resolve with an
instance of `ReadableStreamInterface`, then you will not be able to receive
the `error` event.

### unwrapWritable()

The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap
a `Promise` which resolves with a `WritableStreamInterface`.

This function returns a writable stream instance (implementing `WritableStreamInterface`)
right away which acts as a proxy for the future promise resolution.
Once the given Promise resolves with a `WritableStreamInterface`, any data you
wrote to the proxy will be piped to the inner stream.

```php
//$promise = someFunctionWhichResolvesWithAStream();
$promise = startUploadStream($uri);

$stream = Stream\unwrapWritable($promise);

$stream->write('hello');
$stream->end('world');

$stream->on('close', function () {
echo 'DONE';
});
```

If the given promise is either rejected or fulfilled with anything but an
instance of `WritableStreamInterface`, then the output stream will emit
an `error` event and close:

```php
$promise = startUploadStream($invalidUri);

$stream = Stream\unwrapWritable($promise);

$stream->on('error', function (Exception $error) {
echo 'Error: ' . $error->getMessage();
});
```

The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
at the time of invoking this function.
If the given promise is already settled and does not resolve with an
instance of `WritableStreamInterface`, then you will not be able to receive
the `error` event.

## Install

The recommended way to install this library is [through Composer](https://getcomposer.org).
Expand Down
156 changes: 156 additions & 0 deletions src/UnwrapWritableStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
<?php

namespace Clue\React\Promise\Stream;

use Evenement\EventEmitter;
use React\Promise\PromiseInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;
use React\Promise\CancellablePromiseInterface;
use InvalidArgumentException;

/**
* @internal
* @see unwrapWritable() instead
*/
class UnwrapWritableStream extends EventEmitter implements WritableStreamInterface
{
private $promise;
private $stream;
private $buffer = '';
private $closed = false;
private $ending = false;

/**
* Instantiate new unwrapped writable stream for given `Promise` which resolves with a `WritableStreamInterface`.
*
* @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
*/
public function __construct(PromiseInterface $promise)
{
$out = $this;
$store =& $this->stream;
$buffer =& $this->buffer;
$ending =& $this->ending;
$closed =& $this->closed;

$this->promise = $promise->then(
function ($stream) {
if (!($stream instanceof WritableStreamInterface)) {
throw new InvalidArgumentException('Not a writable stream');
}
return $stream;
}
)->then(
function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$ending, &$closed) {
// stream is already closed, make sure to close output stream
if (!$stream->isWritable()) {
$out->close();
return $stream;
}

// resolves but output is already closed, make sure to close stream silently
if ($closed) {
$stream->close();
return $stream;
}

// forward drain events for back pressure
$stream->on('drain', function () use ($out) {
$out->emit('drain', array($out));
});

// error events cancel output stream
$stream->on('error', function ($error) use ($out) {
$out->emit('error', array($error, $out));
$out->close();
});

// close both streams once either side closes
$stream->on('close', array($out, 'close'));
$out->on('close', array($stream, 'close'));

if ($buffer !== '') {
// flush buffer to stream and check if its buffer is not exceeded
$drained = $stream->write($buffer) !== false;
$buffer = '';

if ($drained) {
// signal drain event, because the output stream previous signalled a full buffer
$out->emit('drain', array($out));
}
}

if ($ending) {
$stream->end();
} else {
$store = $stream;
}

return $stream;
},
function ($e) use ($out) {
$out->emit('error', array($e, $out));
$out->close();
}
);
}

public function write($data)
{
if ($this->ending) {
return;
}

// forward to inner stream if possible
if ($this->stream !== null) {
return $this->stream->write($data);
}

// append to buffer and signal the buffer is full
$this->buffer .= $data;
return false;
}

public function end($data = null)
{
if ($this->ending) {
return;
}

$this->ending = true;

// forward to inner stream if possible
if ($this->stream !== null) {
return $this->stream->end($data);
}

// append to buffer
if ($data !== null) {
$this->buffer .= $data;
}
}

public function isWritable()
{
return !$this->ending;
}

public function close()
{
if ($this->closed) {
return;
}

$this->buffer = '';
$this->ending = true;
$this->closed = true;

// try to cancel promise once the stream closes
if ($this->promise instanceof CancellablePromiseInterface) {
$this->promise->cancel();
}

$this->emit('close', array($this));
}
}
11 changes: 11 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,14 @@ function unwrapReadable(PromiseInterface $promise)
{
return new UnwrapReadableStream($promise);
}

/**
* unwrap a `Promise` which resolves with a `WritableStreamInterface`.
*
* @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
* @return WritableStreamInterface
*/
function unwrapWritable(PromiseInterface $promise)
{
return new UnwrapWritableStream($promise);
}
Loading

0 comments on commit 1a5597b

Please sign in to comment.