Skip to content

Commit

Permalink
Add SeekingLimitStream
Browse files Browse the repository at this point in the history
- Maintains an internal position separately from the wrapped
  stream and seeks prior to read
- Uses internal read position to calculate eof(), return tell()
- Calling seek() simply updates the current read position
  • Loading branch information
zbateson committed Jul 22, 2018
1 parent bd124db commit a4499c1
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 12 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ while (($line = GuzzleHttp\Psr7\readline()) !== false) {
```

The library consists of the following Psr\Http\Message\StreamInterface implementations:
* ZBateson\StreamDecorators\QuotedPrintableStream
* ZBateson\StreamDecorators\Base64Stream
* ZBateson\StreamDecorators\UUStream
* ZBateson\StreamDecorators\CharsetStream
* ZBateson\StreamDecorators\NonClosingStream
* ZBateson\StreamDecorators\ChunkSplitStream
* ZBateson\StreamDecorators\PregReplaceFilterStream

All of them take a single argument of a StreamInterface with the exceptions of CharsetStreams,
which also takes $fromCharset and $toCharset as arguments respectively, ChunkSplitStream which optionally
takes a $lineLength argument (defaults to 76) and a $lineEnding argument (defaults to CRLF) and
PregReplaceFilterStream which takes a $pattern argument and a $replacement argument.
* ZBateson\StreamDecorators\QuotedPrintableStream - decodes on read and encodes on write to quoted-printable
* ZBateson\StreamDecorators\Base64Stream - decodes on read and encodes on write to base64
* ZBateson\StreamDecorators\UUStream - decodes on read, encodes on write to uu-encoded
* ZBateson\StreamDecorators\CharsetStream - encodes from $streamCharset to $stringCharset on read, and vice-versa on write
* ZBateson\StreamDecorators\NonClosingStream - overrides close() and detach(), and simply unsets the attached stream without closing it
* ZBateson\StreamDecorators\ChunkSplitStream - splits written characters into lines of $lineLength long (stream implementation of php's chunk_split)
* ZBateson\StreamDecorators\PregReplaceFilterStream - calls preg_replace on with passed arguments on every read() call
* ZBateson\StreamDecorators\SeekingLimitStream - similar to GuzzleHttp's LimitStream, but maintains an internal current read position, seeking to it when read() is called, and seeking back to the wrapped stream's position after reading

QuotedPrintableStream, Base64Stream and UUStream's constructors take a single argument of a StreamInterface.
CharsetStreams's constructor also takes $streamCharset and $stringCharset as arguments respectively, ChunkSplitStream
optionally takes a $lineLength argument (defaults to 76) and a $lineEnding argument (defaults to CRLF).
PregReplaceFilterStream takes a $pattern argument and a $replacement argument. SeekingLimitStream takes optional
$limit and $offset parameters, similar to GuzzleHttp's LimitStream.

In addition, the library exposes a ZBateson\StreamDecorators\Util\CharsetConverter class which provides the following:
* a map of supported charsets with different names (aliases)
Expand Down
208 changes: 208 additions & 0 deletions src/SeekingLimitStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
<?php
/**
* This file is part of the ZBateson\StreamDecorators project.
*
* @license http://opensource.org/licenses/bsd-license.php BSD
*/
namespace ZBateson\StreamDecorators;

use Psr\Http\Message\StreamInterface;
use GuzzleHttp\Psr7\StreamDecoratorTrait;

/**
* Maintains an internal 'read' position, and seeks to it before reading, then
* seeks back to the original position of the underlying stream after reading if
* the attached stream supports seeking.
*
* Although based on LimitStream, it's not inherited from it since $offset and
* $limit are set to private on LimitStream, and most other functions are re-
* implemented anyway. This also decouples the implementation from upstream
* changes.
*
* @author Zaahid Bateson
*/
class SeekingLimitStream implements StreamInterface
{
use StreamDecoratorTrait;

/** @var int Offset to start reading from */
private $offset;

/** @var int Limit the number of bytes that can be read */
private $limit;

/**
* @var int Number of bytes written, and importantly, if non-zero, writes a
* final $lineEnding on close (and so maintained instead of using
* tell() directly)
*/
private $position = 0;

/**
* @param StreamInterface $stream Stream to wrap
* @param int $limit Total number of bytes to allow to be read
* from the stream. Pass -1 for no limit.
* @param int $offset Position to seek to before reading (only
* works on seekable streams).
*/
public function __construct(
StreamInterface $stream,
$limit = -1,
$offset = 0
) {
$this->stream = $stream;
$this->setLimit($limit);
$this->setOffset($offset);
}

/**
* Returns the current relative read position of this stream subset.
*
* @return int
*/
public function tell()
{
return $this->position;
}

/**
* Returns the size of the limited subset of data, or null if the wrapped
* stream returns null for getSize.
*
* @return int|null
*/
public function getSize()
{
$size = $this->stream->getSize();
if ($size === null) {
// this shouldn't happen on a seekable stream I don't think...
$pos = $this->stream->tell();
$this->stream->seek(0, SEEK_END);
$size = $this->stream->tell();
$this->stream->seek($pos);
}
if ($this->limit === -1) {
return $size - $this->offset;
} else {
return min([$this->limit, $size - $this->offset]);
}
}

/**
* Returns true if the current read position is at the end of the limited
* stream
*
* @return boolean
*/
public function eof()
{
$size = $this->limit;
if ($size === -1) {
$size = $this->getSize();
}
return ($this->position >= $size);
}

/**
* Ensures the seek position specified is within the stream's bounds, and
* sets the internal position pointer (doesn't actually seek).
*
* @param int $pos
*/
private function doSeek($pos)
{
if ($this->limit !== -1) {
$pos = min([$pos, $this->limit]);
}
$this->position = max([0, $pos]);
}

/**
* Seeks to the passed position within the confines of the limited stream's
* bounds.
*
* For SeekingLimitStream, no actual seek is performed on the underlying
* wrapped stream. Instead, an internal pointer is set, and the stream is
* 'seeked' on read operations
*
* @param int $offset
* @param int $whence
*/
public function seek($offset, $whence = SEEK_SET)
{
$pos = $offset;
switch ($whence) {
case SEEK_CUR:
$pos = $this->position + $offset;
break;
case SEEK_END:
$pos = $this->limit + $offset;
break;
default:
break;
}
$this->doSeek($pos);
}

/**
* Sets the offset to start reading from the wrapped stream.
*
* @param int $offset
* @throws \RuntimeException if the stream cannot be seeked.
*/
public function setOffset($offset)
{
$this->offset = $offset;
$this->position = 0;
}

/**
* Sets the length of the stream to the passed $limit.
*
* @param int $limit
*/
public function setLimit($limit)
{
$this->limit = $limit;
}

/**
* Seeks to the current position and reads up to $length bytes, or less if
* it would result in reading past $this->limit
*
* @param int $length
* @return string
*/
public function seekAndRead($length)
{
$this->stream->seek($this->offset + $this->position);
if ($this->limit !== -1) {
$length = min($length, $this->limit - $this->position);
if ($length <= 0) {
return '';
}
}
return $this->stream->read($length);
}

/**
* Reads from the underlying stream after seeking to the position within the
* bounds set for this limited stream. After reading, the wrapped stream is
* 'seeked' back to its position prior to the call to read().
*
* @param int $length
* @return string
*/
public function read($length)
{
$pos = $this->stream->tell();
$ret = $this->seekAndRead($length);
$this->position += strlen($ret);
$this->stream->seek($pos);
if ($this->limit !== -1 && $this->position > $this->limit) {
$ret = substr($ret, 0, -($this->position - $this->limit));
$this->position = $this->limit;
}
return $ret;
}
}
144 changes: 144 additions & 0 deletions tests/StreamDecorators/SeekingLimitStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?php
namespace ZBateson\StreamDecorators;

use PHPUnit_Framework_TestCase;
use GuzzleHttp\Psr7;

/**
* Description of SeekingLimitStreamTest
*
* @group SeekingLimitStream
* @covers ZBateson\StreamDecorators\SeekingLimitStream
* @author Zaahid Bateson
*/
class SeekingLimitStreamTest extends PHPUnit_Framework_TestCase
{
public function testReadLimits()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 3, 1);
$str = $res->getContents();
$this->assertEquals('his', $str);
}

public function testReadLimitsToEnd()
{
$stream = Psr7\stream_for('test');
$res = new SeekingLimitStream($stream, 4, 0);
$str = $res->getContents();
$this->assertEquals('test', $str);
}

public function testPosition()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 3, 1);
$this->assertNotNull($res);
$this->assertEquals(0, $res->tell());
$res->getContents();
$this->assertEquals(3, $res->tell());
}

public function testGetSize()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream);
$this->assertEquals($stream->getSize(), $res->getSize());
}

public function testGetSizeWithLimit()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 5);
$this->assertEquals(5, $res->getSize());
}

public function testGetSizeWithLimitAndOffset()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 5, 1);
$this->assertEquals(5, $res->getSize());
}

public function testGetSizeWithLimitBeyondSize()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 5, 10);
$this->assertEquals(4, $res->getSize());
}

public function testEof()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 3, 1);
$this->assertNotNull($res);
$this->assertFalse($res->eof());
$res->getContents();
$this->assertTrue($res->eof());
}

public function testEofWithStreamAtEnd()
{
$stream = Psr7\stream_for('This is a test');
$stream->getContents();
$this->assertTrue($stream->eof());
$res = new SeekingLimitStream($stream, 3, 1);
$this->assertNotNull($res);
$this->assertFalse($res->eof());
$this->assertEquals('his', $res->getContents());
$this->assertTrue($res->eof());
}

public function testEofWithNoLimit()
{
$stream = Psr7\stream_for('This is a test');
$stream->getContents();
$this->assertTrue($stream->eof());
$res = new SeekingLimitStream($stream, -1, 5);
$this->assertNotNull($res);
$this->assertFalse($res->eof());
$this->assertEquals('is a test', $res->getContents());
$this->assertTrue($res->eof());
}

public function testEofWithNoLimitAndOffset()
{
$stream = Psr7\stream_for('This is a test');
$stream->getContents();
$this->assertTrue($stream->eof());
$res = new SeekingLimitStream($stream);
$this->assertNotNull($res);
$this->assertFalse($res->eof());
$this->assertEquals('This is a test', $res->getContents());
$this->assertTrue($res->eof());
}

public function testSeek()
{
$stream = Psr7\stream_for('This is a test');
$res = new SeekingLimitStream($stream, 3, 1);

$res->seek(-1, SEEK_SET);
$this->assertEquals(0, $res->tell());
$res->seek(4, SEEK_SET);
$this->assertEquals(3, $res->tell());
$res->seek(1, SEEK_END);
$this->assertEquals(3, $res->tell());
$res->seek(-1, SEEK_CUR);
$this->assertEquals(2, $res->tell());

$res->seek(2, SEEK_SET);
$str = $res->getContents();
$this->assertEquals('s', $str);
$this->assertEquals(3, $res->tell());

$res->seek(-2, SEEK_CUR);
$this->assertEquals(1, $res->tell());
$str = $res->getContents();
$this->assertEquals('is', $str);

$res->seek(-1, SEEK_END);
$str = $res->getContents();
$this->assertEquals('s', $str);
}
}

0 comments on commit a4499c1

Please sign in to comment.