Skip to content

Commit

Permalink
Cancellation improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Jul 14, 2019
1 parent 9faf9c9 commit eefc2ab
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 12 deletions.
71 changes: 59 additions & 12 deletions src/PgAsync/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ class Connection extends EventEmitter
/** @var Subject */
private $notificationSubject;

/** @var bool */
private $cancelPending;

/** @var bool */
private $cancelRequested;

/**
* Can be 'I' for Idle, 'T' if in transactions block
* or 'E' if in failed transaction block (queries will fail until end of trans)
Expand Down Expand Up @@ -156,15 +162,18 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
$parameters['application_name'] = 'pgasync';
}

$this->parameters = $parameters;
$this->loop = $loop;
$this->commandQueue = [];
$this->queryState = static::STATE_BUSY;
$this->queryType = static::QUERY_SIMPLE;
$this->connStatus = static::CONNECTION_NEEDED;
$this->socket = $connector ?: new Connector($loop);
$this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port'];
$this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port'];
$this->notificationSubject = new Subject();
$this->cancelPending = false;
$this->cancelRequested = false;

$this->parameters = $parameters;
}

private function start()
Expand Down Expand Up @@ -226,6 +235,12 @@ public function onData($data)
while (strlen($data) > 0) {
$data = $this->processData($data);
}

// We should only cancel if we have drained the input buffer (as much as we can see)
// and there is still a pending query that needs to be canceled
if ($this->cancelRequested) {
$this->cancelRequest();
}
}

private function processData($data)
Expand Down Expand Up @@ -390,6 +405,11 @@ private function handleCommandComplete(CommandComplete $message)
$command = $this->currentCommand;
$this->currentCommand = null;
$command->complete();

// if we have requested a cancel for this query
// but we have received the command complete before we
// had a chance to start canceling - then never mind
$this->cancelRequested = false;
}
$this->debug('Command complete.');
}
Expand Down Expand Up @@ -477,6 +497,11 @@ private function failAllCommandsWith(\Throwable $e = null)

public function processQueue()
{
if ($this->cancelPending) {
$this->debug("Not processing queue because there is a cancellation pending.");
return;
}

if (count($this->commandQueue) === 0 && $this->queryState === static::STATE_READY && $this->auto_disconnect) {
$this->commandQueue[] = new Terminate();
}
Expand Down Expand Up @@ -542,7 +567,7 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use

return new CallbackDisposable(function () use ($q) {
if ($this->currentCommand === $q && $q->isActive()) {
$this->cancelRequest();
$this->cancelRequested = true;
}
$q->cancel();
});
Expand Down Expand Up @@ -587,31 +612,41 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use

$name = 'somestatement';

/** @var CommandInterface[] $commandGroup */
$commandGroup = [];
$close = new Close($name);
$this->commandQueue[] = $close;
$commandGroup[] = $close;

$prepare = new Parse($name, $queryString);
$this->commandQueue[] = $prepare;
$commandGroup[] = $prepare;

$bind = new Bind($parameters, $name);
$this->commandQueue[] = $bind;
$commandGroup[] = $bind;

$describe = new Describe();
$this->commandQueue[] = $describe;
$commandGroup[] = $describe;

$execute = new Execute();
$this->commandQueue[] = $execute;
$commandGroup[] = $execute;

$sync = new Sync($queryString, $observer);
$this->commandQueue[] = $sync;
$commandGroup[] = $sync;

$this->commandQueue = array_merge($this->commandQueue, $commandGroup);

$this->processQueue();

return new CallbackDisposable(function () use ($sync) {
return new CallbackDisposable(function () use ($sync, $commandGroup) {
if ($this->currentCommand === $sync && $sync->isActive()) {
$this->cancelRequest();
$this->cancelRequested = true;
$sync->cancel();

// no point in canceling the other commands because they are out the door
return;
}
foreach ($commandGroup as $command) {
$command->cancel();
}
$sync->cancel();
});
}
);
Expand Down Expand Up @@ -646,11 +681,23 @@ public function disconnect()

private function cancelRequest()
{
$this->cancelRequested = false;
if ($this->queryState !== self::STATE_BUSY) {
$this->debug("Not canceling because there is nothing to cancel.");
return;
}
if ($this->currentCommand !== null) {
$this->cancelPending = true;
$this->socket->connect($this->uri)->then(function (DuplexStreamInterface $conn) {
$cancelRequest = new CancelRequest($this->backendKeyData->getPid(), $this->backendKeyData->getKey());
$conn->on('close', function () {
$this->cancelPending = false;
$this->processQueue();
});
$conn->end($cancelRequest->encodedMessage());
}, function (\Throwable $e) {
$this->cancelPending = false;
$this->processQueue();
$this->debug("Error connecting for cancellation... " . $e->getMessage() . "\n");
});
}
Expand Down
67 changes: 67 additions & 0 deletions tests/Integration/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,71 @@ function () {
$conn->disconnect();
$this->getLoop()->run();
}

public function testCancellationWithImmediateQueryQueuedUp() {
$conn = new Connection([
"user" => $this->getDbUser(),
"database" => $this::getDbName()
], $this->getLoop());

$q1 = $conn->query("SELECT * FROM generate_series(1,4)");
$q2 = $conn->query("SELECT pg_sleep(10)");

$testQuery = $q1->merge($q2)->take(1);

$value = null;

$testQuery->subscribe(
function ($results) use (&$value) {
$value = $results;
$this->stopLoop();
},
function (\Throwable $e) {
$this->fail('Expected no error' . $e->getMessage());
$this->stopLoop();
},
function () {
$this->stopLoop();
}
);

$this->runLoopWithTimeout(15);

$this->assertEquals(['generate_series' => '1'], $value);

$conn->disconnect();
$this->getLoop()->run();
}

public function testArrayInParameters() {
$conn = new Connection([
"user" => $this->getDbUser(),
"database" => $this::getDbName()
], $this->getLoop());

$testQuery = $conn->executeStatement("SELECT * FROM generate_series(1,4) WHERE generate_series = ANY($1)", ['{2, 3}']);

$value = [];

$testQuery->subscribe(
function ($results) use (&$value) {
$value[] = $results;
$this->stopLoop();
},
function (\Throwable $e) {
$this->fail('Expected no error' . $e->getMessage());
$this->stopLoop();
},
function () {
$this->stopLoop();
}
);

$this->runLoopWithTimeout(15);

$this->assertEquals([['generate_series' => 2], ['generate_series' => 3]], $value);

$conn->disconnect();
$this->getLoop()->run();
}
}

0 comments on commit eefc2ab

Please sign in to comment.