535 lines
16 KiB
PHP
Executable File
535 lines
16 KiB
PHP
Executable File
<?php
|
|
|
|
namespace React\Tests\Stream;
|
|
|
|
use Clue\StreamFilter as Filter;
|
|
use React\Stream\WritableResourceStream;
|
|
|
|
class WritableResourceStreamTest extends TestCase
|
|
{
|
|
/**
 * Constructing the stream around a read/write temp resource must not throw.
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @doesNotPerformAssertions
 */
public function testConstructor()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    new WritableResourceStream($resource, $loopMock);
}
|
|
|
|
/**
 * A writable file opened with extra trailing mode characters is still accepted.
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @doesNotPerformAssertions
 */
public function testConstructorWithExcessiveMode()
{
    // excessive flags are ignored for temp streams, so we have to use a file stream
    $path = tempnam(sys_get_temp_dir(), 'test');
    $resource = @fopen($path, 'w+eANYTHING');
    // the open handle is all we need; drop the directory entry right away
    unlink($path);

    $writable = new WritableResourceStream($resource, $this->createLoopMock());
    $writable->close();
}
|
|
|
|
/**
 * Passing something that is not a stream resource (here: null) must be rejected.
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @expectedException InvalidArgumentException
 */
public function testConstructorThrowsIfNotAValidStreamResource()
{
    $notAResource = null;
    $loopMock = $this->createLoopMock();

    new WritableResourceStream($notAResource, $loopMock);
}
|
|
|
|
/**
 * A resource opened with the read-only mode 'r' must be rejected.
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @expectedException InvalidArgumentException
 */
public function testConstructorThrowsExceptionOnReadOnlyStream()
{
    $readOnly = fopen('php://temp', 'r');

    new WritableResourceStream($readOnly, $this->createLoopMock());
}
|
|
|
|
/**
 * A read-only file stream must be rejected even when its mode string
 * carries extra trailing characters.
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @expectedException InvalidArgumentException
 */
public function testConstructorThrowsExceptionOnReadOnlyStreamWithExcessiveMode()
{
    // excessive flags are ignored for temp streams, so we have to use a file stream
    $path = tempnam(sys_get_temp_dir(), 'test');
    $readOnly = fopen($path, 'reANYTHING');
    // the open handle is all we need; drop the directory entry right away
    unlink($path);

    new WritableResourceStream($readOnly, $this->createLoopMock());
}
|
|
|
|
/**
 * A stream that cannot be used with the stream must make the constructor
 * throw a RuntimeException.
 *
 * The custom "blocking" wrapper is backed by EnforceBlockingWrapper, which is
 * declared outside this file; presumably it refuses to be switched to
 * non-blocking mode (TODO confirm against that class).
 *
 * @covers React\Stream\WritableResourceStream::__construct
 * @expectedException RuntimeException
 */
public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking()
{
    // Register the wrapper only once per process: registering an already
    // known protocol triggers a warning. Strict comparison avoids loose
    // string/number juggling when looking up the protocol name.
    if (!in_array('blocking', stream_get_wrappers(), true)) {
        stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper');
    }

    $stream = fopen('blocking://test', 'r+');
    $loop = $this->createLoopMock();

    new WritableResourceStream($stream, $loop);
}
|
|
|
|
/**
 * Data written through a loop mock that invokes the write listener right
 * away must end up in the underlying resource without an error event.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testWrite()
{
    $loopMock = $this->createWriteableLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->write("foobar\n");

    // read back from the start of the temp stream to see what was flushed
    rewind($resource);
    $flushed = fread($resource, 1024);

    $this->assertSame("foobar\n", $flushed);
}
|
|
|
|
/**
 * Writing non-empty data must register the resource with the loop exactly once.
 *
 * @covers React\Stream\WritableResourceStream::write
 */
public function testWriteWithDataDoesAddResourceToLoop()
{
    $resource = fopen('php://temp', 'r+');

    $loopMock = $this->createLoopMock();
    $loopMock
        ->expects($this->once())
        ->method('addWriteStream')
        ->with($this->equalTo($resource));

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->write("foobar\n");
}
|
|
|
|
/**
 * Writing an empty string or null must never register the resource with the loop.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testEmptyWriteDoesNotAddToLoop()
{
    $loopMock = $this->createLoopMock();
    $loopMock
        ->expects($this->never())
        ->method('addWriteStream');

    $writable = new WritableResourceStream(fopen('php://temp', 'r+'), $loopMock);

    // both "nothing to write" variants, in the same order as before
    foreach (array("", null) as $nothing) {
        $writable->write($nothing);
    }
}
|
|
|
|
/**
 * With a third constructor argument of 4 (presumably the buffer's soft limit)
 * and the loop mock withholding writes, a 3-byte write returns true and a
 * following 4-byte write returns false.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testWriteReturnsFalseWhenWritableResourceStreamIsFull()
{
    $resource = fopen('php://temp', 'r+');

    // keep the loop mock from invoking the write listener for the first write
    $loopMock = $this->createWriteableLoopMock();
    $loopMock->preventWrites = true;

    $writable = new WritableResourceStream($resource, $loopMock, 4);
    $writable->on('error', $this->expectCallableNever());

    $first = $writable->write("foo");
    $this->assertTrue($first);

    $loopMock->preventWrites = false;

    $second = $writable->write("bar\n");
    $this->assertFalse($second);
}
|
|
|
|
/**
 * Writing exactly as many bytes as the third constructor argument (3)
 * must already make write() return false.
 *
 * @covers React\Stream\WritableResourceStream::write
 */
public function testWriteReturnsFalseWhenWritableResourceStreamIsExactlyFull()
{
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();
    $writable = new WritableResourceStream($resource, $loopMock, 3);

    $accepted = $writable->write("foo");

    $this->assertFalse($accepted);
}
|
|
|
|
/**
 * Writing to one end of a socket pair whose other end has been closed
 * must emit exactly one error event.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testWriteDetectsWhenOtherSideIsClosed()
{
    $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    list($local, $remote) = $pair;

    $loopMock = $this->createWriteableLoopMock();

    $writable = new WritableResourceStream($local, $loopMock, 4);
    $writable->on('error', $this->expectCallableOnce());

    // close the peer before anything is written
    fclose($remote);

    $writable->write("foo");
}
|
|
|
|
/**
 * After writing 3 bytes with a third constructor argument of 2, a manual
 * handleWrite() must emit drain once and never emit error.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testEmitsDrainAfterWriteWhichExceedsBuffer()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock, 2);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('drain', $this->expectCallableOnce());

    $writable->write("foo");
    // the plain loop mock never calls back, so flush by hand
    $writable->handleWrite();
}
|
|
|
|
/**
 * Writing again from inside the first drain listener must append to the
 * resource after the data that triggered the drain.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testWriteInDrain()
{
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();

    $writable = new WritableResourceStream($resource, $loopMock, 2);
    $writable->on('error', $this->expectCallableNever());

    // one-shot listener: write and flush a second chunk on the first drain
    $writeMore = function () use ($writable) {
        $writable->write("bar\n");
        $writable->handleWrite();
    };
    $writable->once('drain', $writeMore);

    $accepted = $writable->write("foo\n");
    $this->assertFalse($accepted);
    $writable->handleWrite();

    fseek($resource, 0);
    $contents = stream_get_contents($resource);
    $this->assertSame("foo\nbar\n", $contents);
}
|
|
|
|
/**
 * Writing 3 bytes with a third constructor argument of 2 and then flushing
 * by hand must emit drain exactly once.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testDrainAfterWrite()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');
    $writable = new WritableResourceStream($resource, $loopMock, 2);

    $writable->on('drain', $this->expectCallableOnce());

    $writable->write("foo");
    $writable->handleWrite();
}
|
|
|
|
/**
 * Flushing the written data must remove the resource from the loop once
 * and emit drain once, without emitting close.
 *
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testDrainAfterWriteWillRemoveResourceFromLoopWithoutClosing()
{
    $resource = fopen('php://temp', 'r+');

    $loopMock = $this->createLoopMock();
    $loopMock
        ->expects($this->once())
        ->method('removeWriteStream')
        ->with($resource);

    $writable = new WritableResourceStream($resource, $loopMock, 2);
    $writable->on('drain', $this->expectCallableOnce());
    $writable->on('close', $this->expectCallableNever());

    $writable->write("foo");
    $writable->handleWrite();
}
|
|
|
|
/**
 * Closing the stream from inside a drain listener must still remove the
 * resource from the loop only once and emit close once.
 *
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testClosingDuringDrainAfterWriteWillRemoveResourceFromLoopOnceAndClose()
{
    $resource = fopen('php://temp', 'r+');

    $loopMock = $this->createLoopMock();
    $loopMock
        ->expects($this->once())
        ->method('removeWriteStream')
        ->with($resource);

    $writable = new WritableResourceStream($resource, $loopMock, 2);

    $closeOnDrain = function () use ($writable) {
        $writable->close();
    };
    $writable->on('drain', $closeOnDrain);
    $writable->on('close', $this->expectCallableOnce());

    $writable->write("foo");
    $writable->handleWrite();
}
|
|
|
|
/**
 * Calling end() with nothing written must emit close once, emit no error,
 * and leave the stream non-writable.
 *
 * @covers React\Stream\WritableResourceStream::end
 */
public function testEndWithoutDataClosesImmediatelyIfWritableResourceStreamIsEmpty()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('close', $this->expectCallableOnce());

    $before = $writable->isWritable();
    $this->assertTrue($before);

    $writable->end();

    $after = $writable->isWritable();
    $this->assertFalse($after);
}
|
|
|
|
/**
 * Calling end() while previously written data has not been flushed must
 * make the stream non-writable without emitting close or error.
 *
 * @covers React\Stream\WritableResourceStream::end
 */
public function testEndWithoutDataDoesNotCloseIfWritableResourceStreamIsFull()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('close', $this->expectCallableNever());

    // the plain loop mock never calls back, so this data stays pending
    $writable->write('foo');

    $before = $writable->isWritable();
    $this->assertTrue($before);

    $writable->end();

    $after = $writable->isWritable();
    $this->assertFalse($after);
}
|
|
|
|
/**
 * end() with data makes the stream non-writable at once; after a manual
 * flush the data has passed through the stream filter and close was
 * emitted exactly once.
 *
 * @covers React\Stream\WritableResourceStream::end
 */
public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes()
{
    $captured = '';
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('close', $this->expectCallableOnce());

    // pass-through filter that records every chunk written to the resource
    $recorder = function ($chunk) use (&$captured) {
        $captured .= $chunk;

        return $chunk;
    };
    Filter\append($resource, $recorder);

    $this->assertTrue($writable->isWritable());
    $writable->end('final words');
    $this->assertFalse($writable->isWritable());

    $writable->handleWrite();

    $this->assertSame('final words', $captured);
}
|
|
|
|
/**
 * end() with data while earlier data is still pending must not emit close
 * and must not have written anything to the resource yet.
 *
 * @covers React\Stream\WritableResourceStream::end
 */
public function testEndWithDataDoesNotCloseImmediatelyIfWritableResourceStreamIsFull()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('close', $this->expectCallableNever());

    // the plain loop mock never calls back, so this data stays pending
    $writable->write('foo');

    $before = $writable->isWritable();
    $this->assertTrue($before);

    $writable->end('final words');

    $after = $writable->isWritable();
    $this->assertFalse($after);

    rewind($resource);
    $contents = stream_get_contents($resource);
    $this->assertSame('', $contents);
}
|
|
|
|
/**
 * close() must emit close once, emit no error, make the stream
 * non-writable, and leave no close listeners registered.
 *
 * @covers React\Stream\WritableResourceStream::isWritable
 * @covers React\Stream\WritableResourceStream::close
 */
public function testClose()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('error', $this->expectCallableNever());
    $writable->on('close', $this->expectCallableOnce());

    $before = $writable->isWritable();
    $this->assertTrue($before);

    $writable->close();

    $after = $writable->isWritable();
    $this->assertFalse($after);

    $remaining = $writable->listeners('close');
    $this->assertEquals(array(), $remaining);
}
|
|
|
|
/**
 * Closing after a write must remove the resource from the loop exactly once.
 *
 * @covers React\Stream\WritableResourceStream::close
 */
public function testClosingAfterWriteRemovesStreamFromLoop()
{
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();
    $writable = new WritableResourceStream($resource, $loopMock);

    $loopMock
        ->expects($this->once())
        ->method('removeWriteStream')
        ->with($resource);

    $writable->write('foo');
    $writable->close();
}
|
|
|
|
/**
 * Closing a stream that was never written to must not touch the loop's
 * removeWriteStream().
 *
 * @covers React\Stream\WritableResourceStream::close
 */
public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop()
{
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();
    $writable = new WritableResourceStream($resource, $loopMock);

    $loopMock
        ->expects($this->never())
        ->method('removeWriteStream');

    $writable->close();
}
|
|
|
|
/**
 * Calling close() twice must emit the close event only once.
 *
 * @covers React\Stream\WritableResourceStream::close
 */
public function testDoubleCloseWillEmitOnlyOnce()
{
    $loopMock = $this->createLoopMock();
    $resource = fopen('php://temp', 'r+');

    $writable = new WritableResourceStream($resource, $loopMock);
    $writable->on('close', $this->expectCallableOnce());

    for ($attempt = 0; $attempt < 2; $attempt++) {
        $writable->close();
    }
}
|
|
|
|
/**
 * A write issued after close(), followed by a manual flush, must not push
 * any bytes through the stream filter.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::close
 */
public function testWritingToClosedWritableResourceStreamShouldNotWriteToStream()
{
    $captured = '';
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createLoopMock();

    $writable = new WritableResourceStream($resource, $loopMock);

    // pass-through filter that records every chunk written to the resource
    $recorder = function ($chunk) use (&$captured) {
        $captured .= $chunk;

        return $chunk;
    };
    Filter\append($resource, $recorder);

    $writable->close();
    $writable->write('foo');
    $writable->handleWrite();

    $this->assertSame('', $captured);
}
|
|
|
|
/**
 * Writing after the underlying resource was closed behind the stream's
 * back must emit an error carrying an Exception with the expected message
 * fragments.
 *
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testErrorWhenStreamResourceIsInvalid()
{
    $caught = null;
    $resource = fopen('php://temp', 'r+');
    $loopMock = $this->createWriteableLoopMock();

    $writable = new WritableResourceStream($resource, $loopMock);
    $remember = function ($message) use (&$caught) {
        $caught = $message;
    };
    $writable->on('error', $remember);

    // invalidate stream resource
    fclose($resource);

    $writable->write('Attempting to write to bad stream');

    $this->assertInstanceOf('Exception', $caught);

    // the error messages differ between PHP versions, let's just check substrings
    $text = $caught->getMessage();
    $this->assertContains('Unable to write to stream: ', $text);
    $this->assertContains(' not a valid stream resource', $caught->getMessage(), '', true);
}
|
|
|
|
/**
 * Writing to a socket pair after both ends were shut down for reading must
 * emit an error whose message reports the broken pipe.
 *
 * Skipped on Darwin, where shutting down the pair behaves differently.
 *
 * @covers React\Stream\WritableResourceStream::write
 * @covers React\Stream\WritableResourceStream::handleWrite
 */
public function testWritingToClosedStream()
{
    if ('Darwin' === PHP_OS) {
        $this->markTestSkipped('OS X issue with shutting down pair for writing');
    }

    list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    $loop = $this->createLoopMock();

    $error = null;

    $buffer = new WritableResourceStream($a, $loop);
    $buffer->on('error', function ($message) use (&$error) {
        $error = $message;
    });

    // first write goes through while the pair is still fully open
    $buffer->write('foo');
    $buffer->handleWrite();

    // shut down the read side of both ends, then try to write again
    stream_socket_shutdown($b, STREAM_SHUT_RD);
    stream_socket_shutdown($a, STREAM_SHUT_RD);
    $buffer->write('bar');
    $buffer->handleWrite();

    $this->assertInstanceOf('Exception', $error);
    $this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage());
}
|
|
|
|
/**
 * Builds a loop mock whose addWriteStream() immediately invokes the given
 * listener with the stream, unless the mock's public preventWrites flag
 * (initially false) has been set to a truthy value by the test.
 *
 * @return object the configured loop mock
 */
private function createWriteableLoopMock()
{
    $loop = $this->createLoopMock();
    $loop->preventWrites = false;

    $invokeUnlessPrevented = function ($stream, $listener) use ($loop) {
        if ($loop->preventWrites) {
            return;
        }

        call_user_func($listener, $stream);
    };

    $loop
        ->expects($this->any())
        ->method('addWriteStream')
        ->will($this->returnCallback($invokeUnlessPrevented));

    return $loop;
}
|
|
|
|
/**
 * Creates a plain mock of React\EventLoop\LoopInterface with no
 * behaviour configured.
 *
 * @return object the loop mock
 */
private function createLoopMock()
{
    $builder = $this->getMockBuilder('React\EventLoop\LoopInterface');

    return $builder->getMock();
}
|
|
}
|