Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preferred server type connection factory #1064

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8749e4d
MS SQL Connection Options: same as JDBC
tsegismont Jun 9, 2021
785f090
MSSQL: Document identity column retrieval (#976)
tsegismont Jun 9, 2021
f6547ae
Add 'generated' folders to .gitignore
DavideD Feb 14, 2021
14be7a3
Add bin folders to .gitignore
DavideD Feb 14, 2021
4e40cdb
[#887] Add testcase for select with fetch first
DavideD Feb 14, 2021
6072062
Adding and handling QRYBLKFCT codepoint (0x215F)
mswatosh Jun 1, 2021
f0037a2
MSSQL: Fix datetime2 encoding (#982)
tsegismont Jun 11, 2021
452335c
Disable DB2SecureTest which fails consistently
tsegismont Jun 11, 2021
ed5e15d
Update doc for external SQL Server test database
tsegismont Jun 11, 2021
7cdef9e
Remove dead code
vietj Jun 14, 2021
043000a
Implement batch queries for MSSQL Client (#985)
tsegismont Jun 14, 2021
f04135e
Simplify data type codec maintenance (#987)
tsegismont Jun 16, 2021
5f82580
Run tests that were previously ignored
tsegismont Jun 16, 2021
00701b5
Allow to configure database container with a fixed port (#988)
tsegismont Jun 17, 2021
21cc18c
Converted TokenType enum to constants class
tsegismont Jun 17, 2021
65efa00
Moved EnvChange to codec package
tsegismont Jun 17, 2021
0b75180
Converted MessageStatus enum to constants class
tsegismont Jun 17, 2021
1f65113
Fixup
tsegismont Jun 17, 2021
0e5d3d3
Converted MessageType enum to constants class
tsegismont Jun 17, 2021
fa01c25
Moved DoneToken to codec package
tsegismont Jun 17, 2021
ef41146
Moved ProcId to codec package
tsegismont Jun 17, 2021
9b3add8
Implement Inet PostgreSQL data type support - fixes #992
vietj Jun 21, 2021
4502b19
Support INET array
vietj Jun 21, 2021
752441d
Merge remote-tracking branch 'upstream/master'
Rattenkrieg Jun 28, 2021
01eb6c3
Merge remote-tracking branch 'upstream/master'
Rattenkrieg Jun 28, 2021
700a40e
Merge remote-tracking branch 'upstream/master'
Rattenkrieg Nov 1, 2021
6f91a36
Multi-host connection factory with configurable server type preferences
Rattenkrieg Nov 8, 2021
3b834a2
Merge remote-tracking branch 'upstream/master' into Preferred-server-…
Rattenkrieg Nov 8, 2021
bfd8cfc
add license header
Rattenkrieg Nov 8, 2021
22ead9a
add javadocs
Rattenkrieg Nov 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setPipeliningLimit(((Number)member.getValue()).intValue());
}
break;
case "shouldQueryServerType":
if (member.getValue() instanceof Boolean) {
obj.setShouldQueryServerType((Boolean)member.getValue());
}
break;
case "sslMode":
if (member.getValue() instanceof String) {
obj.setSslMode(io.vertx.pgclient.SslMode.valueOf((String)member.getValue()));
Expand All @@ -42,6 +47,7 @@ public static void toJson(PgConnectOptions obj, JsonObject json) {

public static void toJson(PgConnectOptions obj, java.util.Map<String, Object> json) {
json.put("pipeliningLimit", obj.getPipeliningLimit());
json.put("shouldQueryServerType", obj.getShouldQueryServerType());
if (obj.getSslMode() != null) {
json.put("sslMode", obj.getSslMode().name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static PgConnectOptions fromEnv() {
public static final String DEFAULT_PASSWORD = "pass";
public static final int DEFAULT_PIPELINING_LIMIT = 256;
public static final SslMode DEFAULT_SSLMODE = SslMode.DISABLE;
public static final boolean DEFAULT_SHOULD_QUERY_SERVER_TYPE = false;
public static final Map<String, String> DEFAULT_PROPERTIES;

static {
Expand All @@ -123,6 +124,7 @@ public static PgConnectOptions fromEnv() {
}

private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT;
private boolean shouldQueryServerType = DEFAULT_SHOULD_QUERY_SERVER_TYPE;
private SslMode sslMode = DEFAULT_SSLMODE;

public PgConnectOptions() {
Expand Down Expand Up @@ -224,6 +226,13 @@ public SslMode getSslMode() {
return sslMode;
}

/**
* @return the value of current shouldQueryServerType
*/
public boolean getShouldQueryServerType() {
return shouldQueryServerType;
}

/**
* Set {@link SslMode} for the client, this option can be used to provide different levels of secure protection.
*
Expand All @@ -235,6 +244,19 @@ public PgConnectOptions setSslMode(SslMode sslmode) {
return this;
}

/**
* Set whether the client should query server type,
* If true, connection should issue implementation specific query
* to read {@link io.vertx.sqlclient.ServerType} of host being connected to
*
* @param shouldQueryServerType the value of shouldQueryServerType
* @return a reference to this, so the API can be used fluently
*/
public PgConnectOptions setShouldQueryServerType(boolean shouldQueryServerType) {
this.shouldQueryServerType = shouldQueryServerType;
return this;
}

@Override
public PgConnectOptions setSendBufferSize(int sendBufferSize) {
return (PgConnectOptions)super.setSendBufferSize(sendBufferSize);
Expand Down Expand Up @@ -464,6 +486,7 @@ protected void init() {
this.setUser(DEFAULT_USER);
this.setPassword(DEFAULT_PASSWORD);
this.setDatabase(DEFAULT_DATABASE);
this.setShouldQueryServerType(DEFAULT_SHOULD_QUERY_SERVER_TYPE);
this.setProperties(new HashMap<>(DEFAULT_PROPERTIES));
}

Expand Down Expand Up @@ -493,6 +516,7 @@ public boolean equals(Object o) {

if (pipeliningLimit != that.pipeliningLimit) return false;
if (sslMode != that.sslMode) return false;
if (shouldQueryServerType != that.shouldQueryServerType) return false;

return true;
}
Expand All @@ -502,6 +526,7 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + pipeliningLimit;
result = 31 * result + sslMode.hashCode();
result = 31 * result + (shouldQueryServerType ? 1 : 0);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.vertx.core.impl.ContextInternal;
import io.vertx.pgclient.impl.PgConnectionImpl;
import io.vertx.sqlclient.ServerType;
import io.vertx.sqlclient.PreparedStatement;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.codegen.annotations.Fluent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,27 @@
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.SslMode;
import io.vertx.sqlclient.ServerType;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
import io.vertx.sqlclient.impl.tracing.QueryTracer;

import java.util.Collections;
import java.util.stream.Stream;

import static io.vertx.sqlclient.ServerType.*;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class PgConnectionFactory extends ConnectionFactoryBase {

private SslMode sslMode;
private int pipeliningLimit;
private boolean shouldQueryServerType;
private ServerType serverType = UNDEFINED;

public PgConnectionFactory(VertxInternal context, PgConnectOptions options) {
super(context, options);
Expand All @@ -55,6 +63,7 @@ protected void initializeConfiguration(SqlConnectOptions connectOptions) {
PgConnectOptions options = (PgConnectOptions) connectOptions;
this.pipeliningLimit = options.getPipeliningLimit();
this.sslMode = options.isUsingDomainSocket() ? SslMode.DISABLE : options.getSslMode();
this.shouldQueryServerType = options.getShouldQueryServerType();

// check ssl mode here
switch (sslMode) {
Expand Down Expand Up @@ -151,16 +160,35 @@ public Future<SqlConnection> connect(Context context) {
ContextInternal contextInternal = (ContextInternal) context;
PromiseInternal<SqlConnection> promise = contextInternal.promise();
connect(asEventLoopContext(contextInternal))
.map(conn -> {
.flatMap(conn -> {
QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options);
PgConnectionImpl pgConn = new PgConnectionImpl(this, contextInternal, conn, tracer, null);
conn.init(pgConn);
return (SqlConnection)pgConn;
serverType = ((PgSocketConnection) conn).serverType;
if (serverType == UNDEFINED && shouldQueryServerType) {
String paramStatus = conn.getDatabaseMetaData().majorVersion() >= 14 ? "in_hot_standby" : "transaction_read_only";
return pgConn.query(String.format("SHOW %s;", paramStatus)).execute().map(pgRowSet -> {
pgRowSet.forEach(row ->
serverType = "off".equalsIgnoreCase(row.getString(paramStatus)) ? PRIMARY : REPLICA
);
return pgConn;
});
} else {
return Future.succeededFuture((SqlConnection) pgConn);
}
})
.onComplete(promise);
return promise.future();
}

/**
* {@inheritDoc}
*/
@Override
public ServerType getServerType() {
return serverType;
}

private PgSocketConnection newSocketConnection(EventLoopContext context, NetSocketInternal socket) {
return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotification;
import io.vertx.sqlclient.ServerType;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.SqlConnectionImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.vertx.pgclient.*;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.ServerRequirement;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.Connection;
Expand Down Expand Up @@ -71,8 +72,12 @@ public static PgPoolImpl create(final VertxInternal vertx, boolean pipelined, Li
int pipeliningLimit = pipelined ? baseConnectOptions.getPipeliningLimit() : 1;
PgPoolImpl pool = new PgPoolImpl(vx, baseConnectOptions, tracer, metrics, pipeliningLimit, poolOptions);
PgDriver driver = new PgDriver();
List<ConnectionFactory> lst = servers.stream().map(options -> driver.createConnectionFactory(vx, options)).collect(Collectors.toList());
ConnectionFactory factory = ConnectionFactory.roundRobinSelector(lst);
List<ConnectionFactory> lst = servers.stream()
.map(options -> driver.createConnectionFactory(vx, PgConnectOptions.wrap(options)
.setShouldQueryServerType(poolOptions.getServerRequirement() != ServerRequirement.ANY))
)
.collect(Collectors.toList());
ConnectionFactory factory = ConnectionFactory.selector(lst, poolOptions.getServerRequirement());
pool.connectionProvider(factory::connect);
pool.init();
CloseFuture closeFuture = pool.closeFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.pgclient.PgException;
import io.vertx.pgclient.impl.codec.PgCodec;
import io.vertx.sqlclient.ServerType;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.sqlclient.impl.*;
import io.vertx.sqlclient.impl.command.*;
Expand All @@ -36,6 +37,8 @@
import java.util.Map;
import java.util.function.Predicate;

import static io.vertx.sqlclient.ServerType.UNDEFINED;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
Expand All @@ -45,6 +48,8 @@ public class PgSocketConnection extends SocketConnectionBase {
public int processId;
public int secretKey;
public PgDatabaseMetadata dbMetaData;
// TODO: consider defining getter on SocketConnectionBase
public ServerType serverType = UNDEFINED;

public PgSocketConnection(NetSocketInternal socket,
boolean cachePreparedStatements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import io.vertx.sqlclient.impl.command.CommandResponse;
import io.vertx.sqlclient.impl.command.InitCommand;

import static io.vertx.sqlclient.ServerType.PRIMARY;
import static io.vertx.sqlclient.ServerType.REPLICA;

class InitCommandCodec extends PgCommandCodec<Connection, InitCommand> {

private PgEncoder encoder;
Expand Down Expand Up @@ -87,6 +90,9 @@ public void handleParameterStatus(String key, String value) {
if(key.equals("server_version")) {
((PgSocketConnection)cmd.connection()).dbMetaData = new PgDatabaseMetadata(value);
}
if(key.equals("in_hot_standby")) {
((PgSocketConnection)cmd.connection()).serverType = "off".equalsIgnoreCase(value) ? PRIMARY : REPLICA;
}
}

@Override
Expand Down
Loading