diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 4c2d149..1f896f7 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -113,6 +113,12 @@ class Connection extends EventEmitter /** @var Subject */ private $notificationSubject; + /** @var bool */ + private $cancelPending; + + /** @var bool */ + private $cancelRequested; + /** * Can be 'I' for Idle, 'T' if in transactions block * or 'E' if in failed transaction block (queries will fail until end of trans) @@ -156,15 +162,18 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt $parameters['application_name'] = 'pgasync'; } - $this->parameters = $parameters; $this->loop = $loop; $this->commandQueue = []; $this->queryState = static::STATE_BUSY; $this->queryType = static::QUERY_SIMPLE; $this->connStatus = static::CONNECTION_NEEDED; $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + $this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port']; $this->notificationSubject = new Subject(); + $this->cancelPending = false; + $this->cancelRequested = false; + + $this->parameters = $parameters; } private function start() @@ -226,6 +235,12 @@ public function onData($data) while (strlen($data) > 0) { $data = $this->processData($data); } + + // We should only cancel if we have drained the input buffer (as much as we can see) + // and there is still a pending query that needs to be canceled + if ($this->cancelRequested) { + $this->cancelRequest(); + } } private function processData($data) @@ -390,6 +405,11 @@ private function handleCommandComplete(CommandComplete $message) $command = $this->currentCommand; $this->currentCommand = null; $command->complete(); + + // if we have requested a cancel for this query + // but we have received the command complete before we + // had a chance to start canceling - then never mind + $this->cancelRequested = false; } $this->debug('Command complete.'); } @@ -477,6 +497,11 @@ private function failAllCommandsWith(\Throwable $e = null) public function processQueue() { + if ($this->cancelPending) { + $this->debug("Not processing queue because there is a cancellation pending."); + return; + } + if (count($this->commandQueue) === 0 && $this->queryState === static::STATE_READY && $this->auto_disconnect) { $this->commandQueue[] = new Terminate(); } @@ -542,7 +567,7 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use return new CallbackDisposable(function () use ($q) { if ($this->currentCommand === $q && $q->isActive()) { - $this->cancelRequest(); + $this->cancelRequested = true; } $q->cancel(); }); @@ -587,31 +612,41 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use $name = 'somestatement'; + /** @var CommandInterface[] $commandGroup */ + $commandGroup = []; $close = new Close($name); - $this->commandQueue[] = $close; + $commandGroup[] = $close; $prepare = new Parse($name, $queryString); - $this->commandQueue[] = $prepare; + $commandGroup[] = $prepare; $bind = new Bind($parameters, $name); - $this->commandQueue[] = $bind; + $commandGroup[] = $bind; $describe = new Describe(); - $this->commandQueue[] = $describe; + $commandGroup[] = $describe; $execute = new Execute(); - $this->commandQueue[] = $execute; + $commandGroup[] = $execute; $sync = new Sync($queryString, $observer); - $this->commandQueue[] = $sync; + $commandGroup[] = $sync; + + $this->commandQueue = array_merge($this->commandQueue, $commandGroup); $this->processQueue(); - return new CallbackDisposable(function () use ($sync) { + return new CallbackDisposable(function () use ($sync, $commandGroup) { if ($this->currentCommand === $sync && $sync->isActive()) { - $this->cancelRequest(); + $this->cancelRequested = true; + $sync->cancel(); + + // no point in canceling the other commands because they are out the door + return; + } + foreach ($commandGroup as $command) { + $command->cancel(); } - $sync->cancel(); }); } ); @@ -646,11 +681,23 @@ public function disconnect() private function cancelRequest() { + $this->cancelRequested = false; + if ($this->queryState !== self::STATE_BUSY) { + $this->debug("Not canceling because there is nothing to cancel."); + return; + } if ($this->currentCommand !== null) { + $this->cancelPending = true; $this->socket->connect($this->uri)->then(function (DuplexStreamInterface $conn) { $cancelRequest = new CancelRequest($this->backendKeyData->getPid(), $this->backendKeyData->getKey()); + $conn->on('close', function () { + $this->cancelPending = false; + $this->processQueue(); + }); $conn->end($cancelRequest->encodedMessage()); }, function (\Throwable $e) { + $this->cancelPending = false; + $this->processQueue(); $this->debug("Error connecting for cancellation... " . $e->getMessage() . "\n"); }); } diff --git a/tests/Integration/ConnectionTest.php b/tests/Integration/ConnectionTest.php index eeb37aa..8892585 100644 --- a/tests/Integration/ConnectionTest.php +++ b/tests/Integration/ConnectionTest.php @@ -313,4 +313,71 @@ function () { $conn->disconnect(); $this->getLoop()->run(); } + + public function testCancellationWithImmediateQueryQueuedUp() { + $conn = new Connection([ + "user" => $this->getDbUser(), + "database" => $this::getDbName() + ], $this->getLoop()); + + $q1 = $conn->query("SELECT * FROM generate_series(1,4)"); + $q2 = $conn->query("SELECT pg_sleep(10)"); + + $testQuery = $q1->merge($q2)->take(1); + + $value = null; + + $testQuery->subscribe( + function ($results) use (&$value) { + $value = $results; + $this->stopLoop(); + }, + function (\Throwable $e) { + $this->fail('Expected no error' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + $this->runLoopWithTimeout(15); + + $this->assertEquals(['generate_series' => '1'], $value); + + $conn->disconnect(); + $this->getLoop()->run(); + } + + public function testArrayInParameters() { + $conn = new Connection([ + "user" => $this->getDbUser(), + "database" => $this::getDbName() + ], $this->getLoop()); + + $testQuery = $conn->executeStatement("SELECT * FROM generate_series(1,4) WHERE generate_series = ANY($1)", ['{2, 3}']); + + $value = []; + + $testQuery->subscribe( + function ($results) use (&$value) { + $value[] = $results; + $this->stopLoop(); + }, + function (\Throwable $e) { + $this->fail('Expected no error' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + $this->runLoopWithTimeout(15); + + $this->assertEquals([['generate_series' => 2], ['generate_series' => 3]], $value); + + $conn->disconnect(); + $this->getLoop()->run(); + } } \ No newline at end of file