Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining for Oracle Client #1434

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static OracleConnectOptions wrap(SqlConnectOptions options) {
public static final String DEFAULT_PASSWORD = "";
public static final String DEFAULT_DATABASE = "";
public static final boolean DEFAULT_SSL = false;
public static final int DEFAULT_PIPELINING_LIMIT = 1;

private String serviceId;
private String serviceName;
Expand All @@ -51,6 +52,7 @@ public static OracleConnectOptions wrap(SqlConnectOptions options) {
private String tnsAlias;
private String tnsAdmin;
private boolean ssl;
private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT;

public OracleConnectOptions() {
super();
Expand All @@ -69,6 +71,7 @@ private void copyFields(OracleConnectOptions other) {
this.tnsAlias = other.tnsAlias;
this.tnsAdmin = other.tnsAdmin;
this.ssl = other.ssl;
this.pipeliningLimit = other.pipeliningLimit;
}

public OracleConnectOptions(SqlConnectOptions options) {
Expand Down Expand Up @@ -208,6 +211,29 @@ public OracleConnectOptions setTnsAdmin(String tnsAdmin) {
return this;
}

/**
* Get the pipelining limit count.
*
* @return the pipelining count
*/
public int getPipeliningLimit() {
return pipeliningLimit;
}

/**
* Set the pipelining limit count.
*
* @param pipeliningLimit the count to configure
* @return a reference to this, so the API can be used fluently
*/
public OracleConnectOptions setPipeliningLimit(int pipeliningLimit) {
if (pipeliningLimit < 1) {
throw new IllegalArgumentException("pipelining limit can not be less than 1");
}
this.pipeliningLimit = pipeliningLimit;
return this;
}

// Non-specific options

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Future<SqlConnection> connect(Context context, OracleConnectOptions optio
return executeBlocking(context, () -> {
OracleConnection orac = datasource.createConnectionBuilder().build();
OracleMetadata metadata = new OracleMetadata(orac.getMetaData());
OracleJdbcConnection conn = new OracleJdbcConnection(ctx, metrics, options, orac, metadata);
OracleJdbcConnection conn = new OracleJdbcConnection(ctx, metrics, options, orac, metadata, options.getPipeliningLimit());
OracleConnectionImpl msConn = new OracleConnectionImpl(ctx, this, conn);
conn.init(msConn);
return msConn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,23 @@ public class OracleJdbcConnection implements Connection {
private final OracleConnectOptions options;
@SuppressWarnings("rawtypes")
private final ConcurrentMap<String, RowReader> cursors = new ConcurrentHashMap<>();
private final int pipeliningLimit;
private Holder holder;

// Command pipeline state
@SuppressWarnings("rawtypes")
private final Deque<CommandBase> pending = new ArrayDeque<>();
private Promise<Void> closePromise;
private boolean inflight, executing;
private boolean executing;
private int inflight;

public OracleJdbcConnection(ContextInternal ctx, ClientMetrics metrics, OracleConnectOptions options, OracleConnection oc, OracleMetadata metadata) {
public OracleJdbcConnection(ContextInternal ctx, ClientMetrics metrics, OracleConnectOptions options, OracleConnection oc, OracleMetadata metadata, int pipeliningLimit) {
this.context = ctx;
this.metrics = metrics;
this.options = options;
this.connection = oc;
this.metadata = metadata;
this.pipeliningLimit = pipeliningLimit;
}

@Override
Expand Down Expand Up @@ -183,8 +186,8 @@ private void checkPending() {
try {
executing = true;
CommandBase cmd;
while (!inflight && (cmd = pending.poll()) != null) {
inflight = true;
while (inflight < pipeliningLimit && (cmd = pending.poll()) != null) {
inflight++;
if (metrics != null && cmd instanceof CloseConnectionCommand) {
metrics.close();
}
Expand Down Expand Up @@ -243,7 +246,7 @@ private OracleCommand forExtendedQuery(ExtendedQueryCommand cmd) {
}

private void actionComplete(CommandBase cmd, OracleCommand<?> action, AsyncResult<Void> ar) {
inflight = false;
inflight--;
Future<Void> future = Future.succeededFuture();
if (ar.failed()) {
Throwable cause = ar.cause();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.vertx.oracleclient.test.tck;

import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.oracleclient.OracleBuilder;
import io.vertx.oracleclient.OracleConnectOptions;
import io.vertx.oracleclient.test.junit.OracleRule;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.tck.PipeliningQueryTestBase;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.List;

@RunWith(VertxUnitRunner.class)
public class OraclePipeliningQueryTest extends PipeliningQueryTestBase {

@ClassRule
public static final OracleRule rule = OracleRule.SHARED_INSTANCE;

@Override
protected void cleanTestTable(TestContext ctx) {
connectionConnector.connect(ctx.asyncAssertSuccess(conn -> {
conn
.query("TRUNCATE TABLE mutable")
.execute()
.onComplete(ctx.asyncAssertSuccess(result -> {
conn.close();
}));
}));
}

@Override
protected void init() {
options = rule.options();
OracleConnectOptions oracleConnectOptions = (OracleConnectOptions) options;
oracleConnectOptions.setPipeliningLimit(64);
connectionConnector = ClientConfig.CONNECT.connect(vertx, oracleConnectOptions);
pooledConnectionConnector = ClientConfig.POOLED.connect(vertx, oracleConnectOptions);
pooledClientSupplier = () -> OracleBuilder.pool(b -> b.connectingTo(oracleConnectOptions).with(new PoolOptions().setMaxSize(8)).using(vertx));
}

@Override
protected String statement(String... parts) {
return String.join("?", parts);
}

@Override
protected void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) {
List<Tuple> batchParams = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
batchParams.add(Tuple.of(i, String.format("val-%d", i)));
}

client.preparedQuery(statement("INSERT INTO mutable(id, val) VALUES (", ", ",")"))
.executeBatch(batchParams)
.onComplete(ctx.asyncAssertSuccess(res -> {
ctx.assertEquals(1000, res.rowCount());

client.query("SELECT id, val FROM mutable")
.execute()
.onComplete(ctx.asyncAssertSuccess(res2 -> {
ctx.assertEquals(1000, res2.size());
int i = 0;
for (Row row : res2) {
ctx.assertEquals(2, row.size());
ctx.assertEquals(i, row.getInteger(0));
ctx.assertEquals(String.format("val-%d", i), row.getString(1));
i++;
}
client.close();
}));
}));
}

@Override
@Ignore("Oracle does not support batching queries")
public void testOneShotPreparedBatchQueryConn(TestContext ctx) {

}

@Override
@Ignore("Oracle does not support batching queries")
public void testOneShotPreparedBatchQueryPool(TestContext ctx) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void testOneShotPreparedBatchInsertPool(TestContext ctx) {
testOneShotPreparedBatchInsert(ctx, client);
}

private void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) {
protected void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) {
Async latch = ctx.async(1000);
List<Tuple> batchParams = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Expand Down Expand Up @@ -228,7 +228,7 @@ private void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) {
}));
}

private void cleanTestTable(TestContext ctx) {
protected void cleanTestTable(TestContext ctx) {
connectionConnector.connect(ctx.asyncAssertSuccess(conn -> {
conn
.query("TRUNCATE TABLE mutable;")
Expand Down
Loading