Skip to content

Commit

Permalink
add support for nonblocking writes
Browse files Browse the repository at this point in the history
added pq\Connection::$nonblocking and pq\Connection::flush()

Closes gh-issue #16
m6w6 committed May 18, 2016
1 parent e4075e6 commit a3df086
Showing 3 changed files with 102 additions and 1 deletion.
2 changes: 2 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
<notes><![CDATA[
* Added public readonly array pq\Result::$diag property, listing PQresultErrorField details (gh-issue #14)
* Restore listeners and prepared statements after a connection reset (gh-issue #15)
* Added pq\Connection::$nonblocking and pq\Connection::flush() to support non-blocking writes (gh-issue #16)
]]></notes>
<contents>
<dir name="/">
@@ -127,6 +128,7 @@
<file role="test" name="exceptions001.phpt" />
<file role="test" name="exceptions002.phpt" />
<file role="test" name="fetch001.phpt" />
<file role="test" name="flush001.phpt" />
<file role="test" name="gh-issue015_listeners.phpt" />
<file role="test" name="gh-issue015_statements.phpt" />
<file role="test" name="info001.phpt" />
57 changes: 56 additions & 1 deletion src/php_pqconn.c
Original file line number Diff line number Diff line change
@@ -242,6 +242,20 @@ static void php_pqconn_object_write_unbuffered(zval *object, void *o, zval *valu
obj->intern->unbuffered = z_is_true(value);
}

static void php_pqconn_object_read_nonblocking(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;

RETVAL_BOOL(PQisnonblocking(obj->intern->conn));
}

static void php_pqconn_object_write_nonblocking(zval *object, void *o, zval *value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;

PQsetnonblocking(obj->intern->conn, z_is_true(value));
}

static void php_pqconn_object_read_db(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;
@@ -1050,6 +1064,40 @@ static PHP_METHOD(pqconn, poll) {
}
}

ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_flush, 0, 0, 0)
ZEND_END_ARG_INFO();
static PHP_METHOD(pqconn, flush) {
zend_error_handling zeh;
ZEND_RESULT_CODE rv;

zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
rv = zend_parse_parameters_none();
zend_restore_error_handling(&zeh TSRMLS_CC);

if (SUCCESS == rv) {
php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);

if (!obj->intern) {
throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
} else if (!obj->intern->poller) {
throw_exce(EX_RUNTIME TSRMLS_CC, "No asynchronous operation active");
} else {
switch (PQflush(obj->intern->conn)) {
case -1:
default:
throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to flush connection: %s", PHP_PQerrorMessage(obj->intern->conn));
break;
case 0:
RETVAL_TRUE;
break;
case 1:
RETVAL_FALSE;
break;
}
}
}
}

ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_exec, 0, 0, 1)
ZEND_ARG_INFO(0, query)
ZEND_END_ARG_INFO();
@@ -1918,6 +1966,7 @@ static zend_function_entry php_pqconn_methods[] = {
PHP_ME(pqconn, reset, ai_pqconn_reset, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, resetAsync, ai_pqconn_reset_async, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, poll, ai_pqconn_poll, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, flush, ai_pqconn_flush, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, exec, ai_pqconn_exec, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, execAsync, ai_pqconn_exec_async, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, execParams, ai_pqconn_exec_params, ZEND_ACC_PUBLIC)
@@ -1971,7 +2020,7 @@ PHP_MINIT_FUNCTION(pqconn)
php_pqconn_object_handlers.get_properties = php_pq_object_properties;
php_pqconn_object_handlers.get_debug_info = php_pq_object_debug_info;

zend_hash_init(&php_pqconn_object_prophandlers, 20, NULL, NULL, 1);
zend_hash_init(&php_pqconn_object_prophandlers, 21, NULL, NULL, 1);

zend_declare_property_long(php_pqconn_class_entry, ZEND_STRL("status"), CONNECTION_BAD, ZEND_ACC_PUBLIC TSRMLS_CC);
ph.read = php_pqconn_object_read_status;
@@ -2005,6 +2054,12 @@ PHP_MINIT_FUNCTION(pqconn)
zend_hash_add(&php_pqconn_object_prophandlers, "unbuffered", sizeof("unbuffered"), (void *) &ph, sizeof(ph), NULL);
ph.write = NULL;

zend_declare_property_bool(php_pqconn_class_entry, ZEND_STRL("nonblocking"), 0, ZEND_ACC_PUBLIC TSRMLS_CC);
ph.read = php_pqconn_object_read_nonblocking;
ph.write = php_pqconn_object_write_nonblocking;
zend_hash_add(&php_pqconn_object_prophandlers, "nonblocking", sizeof("nonblocking"), (void *) &ph, sizeof(ph), NULL);
ph.write = NULL;

zend_declare_property_null(php_pqconn_class_entry, ZEND_STRL("db"), ZEND_ACC_PUBLIC TSRMLS_CC);
ph.read = php_pqconn_object_read_db;
zend_hash_add(&php_pqconn_object_prophandlers, "db", sizeof("db"), (void *) &ph, sizeof(ph), NULL);
44 changes: 44 additions & 0 deletions tests/flush001.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
--TEST--
flush
--SKIPIF--
<?php include "_skipif.inc"; ?>
--FILE--
<?php
echo "Test\n";

include "_setup.inc";

$c = new pq\Connection(PQ_DSN);
$c->nonblocking = true;
var_dump($c->nonblocking);
$c->execAsync("SELECT '".str_repeat("a", 6e7)."'", function($r) {
$r->fetchCol($s);
var_dump(strlen($s));
});
var_dump($flushed = $c->flush());
do {
while (!$flushed || $c->busy) {
$r = $c->busy ? [$c->socket] : null;
$w = !$flushed ?[$c->socket] : null;

if (stream_select($r, $w, $e, null)) {
if ($r) {
printf("P%d", $c->poll());
}
if ($w) {
printf("F%d", $flushed = $c->flush());
}
}
}
echo "\n";
} while ($c->getResult());
?>
===DONE===
--EXPECTF--
Test
bool(true)
bool(%s)
%r(F0)*(F1)*(P3)+%r
int(60000000)

===DONE===

0 comments on commit a3df086

Please sign in to comment.