Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a transaction manager which allows services to operate on a co… #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@
<version>5.7.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package com.github.collinalpert.java2db.database;

import com.github.collinalpert.java2db.exceptions.ConnectionFailedException;
import com.github.collinalpert.java2db.mappers.FieldMapper;
import com.github.collinalpert.java2db.queries.*;
import com.github.collinalpert.java2db.queries.async.*;
import com.mysql.cj.exceptions.CJCommunicationsException;
import com.mysql.cj.jdbc.exceptions.CommunicationsException;

import java.io.Closeable;
import java.sql.*;
Expand All @@ -16,7 +13,7 @@
import static com.github.collinalpert.java2db.utilities.Utilities.supplierHandling;

/**
* Wrapper around {@link Connection} which eases use of connecting to a database and running queries.
* Wrapper around {@link Connection} which eases use of running queries.
* Also supports running functions and stored procedures.
*
* @author Collin Alpert
Expand All @@ -28,27 +25,9 @@ public class DBConnection implements Closeable {
*/
public static boolean LOG_QUERIES = true;

private Connection underlyingConnection;
private final Connection underlyingConnection;
private boolean isConnectionValid;

public DBConnection(ConnectionConfiguration configuration) {
try {
var connectionString = String.format("jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true", configuration.getHost(), configuration.getPort(), configuration.getDatabase());
Class.forName("com.mysql.cj.jdbc.Driver");
System.setProperty("user", configuration.getUsername());
System.setProperty("password", configuration.getPassword());
DriverManager.setLoginTimeout(configuration.getTimeout());
underlyingConnection = DriverManager.getConnection(connectionString, System.getProperties());
isConnectionValid = true;
} catch (CJCommunicationsException | CommunicationsException e) {
isConnectionValid = false;
throw new ConnectionFailedException();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
isConnectionValid = false;
}
}

public DBConnection(Connection underlyingConnection) {
this.underlyingConnection = underlyingConnection;
this.isConnectionValid = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.github.collinalpert.java2db.database;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.logging.Logger;

/**
* {@link DataSource} implementation hard-coded to support only MySQL databases.
* Obtains connections directly from {@link java.sql.DriverManager}
*
* @author Tyler Sharpe
*/
public class MySQLDriverManagerDataSource implements DataSource {

static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
throw new ExceptionInInitializerError(e);
}
}

private final ConnectionConfiguration configuration;

public MySQLDriverManagerDataSource(ConnectionConfiguration configuration) {
this.configuration = configuration;
}

@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(
String.format("jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true", configuration.getHost(), configuration.getPort(), configuration.getDatabase()),
configuration.getUsername(),
configuration.getPassword()
);
}

@Override
public Connection getConnection(String username, String password) {
throw new UnsupportedOperationException();
}

@Override
public PrintWriter getLogWriter() {
return null;
}

@Override
public void setLogWriter(PrintWriter out) {
throw new UnsupportedOperationException();
}

@Override
public void setLoginTimeout(int seconds) {
throw new UnsupportedOperationException();
}

@Override
public int getLoginTimeout() {
return configuration.getTimeout();
}

@Override
public Logger getParentLogger() {
return null;
}

@Override
public <T> T unwrap(Class<T> interfaceType) {
return null;
}

@Override
public boolean isWrapperFor(Class<?> interfaceType) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.github.collinalpert.java2db.database;

import com.github.collinalpert.java2db.utilities.ThrowableConsumer;
import com.github.collinalpert.java2db.utilities.ThrowableFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

/**
* Allows to execute code within a database transaction.
*
* This class maintains the notion of a 'current' database connection, which is bound to the currently
* executing thread via a {@link ThreadLocal}. The first call in the stack to execute code within a
* transaction opens a new connection and binds it to this thread local variable. Subsequent calls within
* the same thread which wish to participate within the transaction will then re-use this connection.
*
* @author Tyler Sharpe
*/
public class TransactionManager {

private static final ThreadLocal<DBConnection> CURRENT_THREAD_CONNECTION = new ThreadLocal<>();

private final DataSource dataSource;

public TransactionManager(DataSource dataSource) {
this.dataSource = dataSource;
}

/**
* Run some code inside of a database transaction, creating one if it does not already exist.
*/
public void transact(ThrowableConsumer<DBConnection, SQLException> action) throws SQLException {
transactAndReturn(connection -> {
action.consume(connection);
return null;
});
}

/**
* Run some code inside of a database transaction, creating one if it does not already exist, and then return a value.
* @param action Action to run
* @param <T> Type returned from the action lambda
* @return
* @throws SQLException
*/
public <T> T transactAndReturn(ThrowableFunction<DBConnection, T, SQLException> action) throws SQLException {
if (CURRENT_THREAD_CONNECTION.get() != null) {
return action.run(CURRENT_THREAD_CONNECTION.get());
}

try (Connection rawConnection = dataSource.getConnection()) {
rawConnection.setAutoCommit(false);
DBConnection dbConnection = new DBConnection(rawConnection);
CURRENT_THREAD_CONNECTION.set(dbConnection);

try {
T result = action.run(dbConnection);
rawConnection.commit();
return result;
} catch (Exception exception) {
// rollback transaction on error
try {
rawConnection.rollback();
} catch (Exception rollbackException) {
exception.addSuppressed(rollbackException);
}

throw new SQLException(exception);
}
} finally {
CURRENT_THREAD_CONNECTION.remove();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,26 @@ public class EntityProjectionQuery<E extends BaseEntity, R> implements Queryable
private final Class<R> returnType;
private final IQueryBuilder<E> queryBuilder;
private final QueryParameters<E> queryParameters;
private final ConnectionConfiguration connectionConfiguration;
private final TransactionManager transactionManager;

public EntityProjectionQuery(Class<R> returnType, IQueryBuilder<E> queryBuilder, QueryParameters<E> queryParameters, ConnectionConfiguration connectionConfiguration) {
public EntityProjectionQuery(Class<R> returnType, IQueryBuilder<E> queryBuilder, QueryParameters<E> queryParameters, TransactionManager transactionManager) {
this.returnType = returnType;
this.queryBuilder = queryBuilder;
this.queryParameters = queryParameters;
this.connectionConfiguration = connectionConfiguration;
this.transactionManager = transactionManager;
}

@Override
public Optional<R> first() {
try (var connection = new DBConnection(this.connectionConfiguration);
var result = connection.execute(getQuery())) {
if (result.next()) {
return Optional.ofNullable(result.getObject(1, this.returnType));
}

return Optional.empty();
try {
return transactionManager.transactAndReturn(connection -> {
var result = connection.execute(getQuery());
if (result.next()) {
return Optional.ofNullable(result.getObject(1, this.returnType));
} else {
return Optional.empty();
}
});
} catch (SQLException e) {
e.printStackTrace();
return Optional.empty();
Expand Down Expand Up @@ -106,13 +108,15 @@ public String getQuery() {
* @return A data structure containing a {@code ResultSet}s data.
*/
private <T, D> T resultHandling(D dataStructure, BiConsumer<D, R> valueConsumer, T defaultValue, Function<D, T> valueMapping) {
try (var connection = new DBConnection(this.connectionConfiguration);
var result = connection.execute(getQuery())) {
while (result.next()) {
valueConsumer.accept(dataStructure, result.getObject(1, this.returnType));
}
try {
return transactionManager.transactAndReturn(connection -> {
var result = connection.execute(getQuery());
while (result.next()) {
valueConsumer.accept(dataStructure, result.getObject(1, this.returnType));
}

return valueMapping.apply(dataStructure);
return valueMapping.apply(dataStructure);
});
} catch (SQLException e) {
e.printStackTrace();
return defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class EntityQuery<E extends BaseEntity> implements Queryable<E> {

private static final TableModule tableModule = TableModule.getInstance();
protected final ConnectionConfiguration connectionConfiguration;
protected final TransactionManager transactionManager;
protected final IQueryBuilder<E> queryBuilder;
protected final QueryParameters<E> queryParameters;
private final Class<E> type;
Expand All @@ -39,9 +39,9 @@ public class EntityQuery<E extends BaseEntity> implements Queryable<E> {
* @param type The entity to query.
*/

public EntityQuery(Class<E> type, ConnectionConfiguration connectionConfiguration) {
public EntityQuery(Class<E> type, TransactionManager transactionManager) {
this.type = type;
this.connectionConfiguration = connectionConfiguration;
this.transactionManager = transactionManager;
this.queryParameters = new QueryParameters<>();
this.mapper = IoC.resolveMapper(type, new EntityMapper<>(type));
this.queryBuilder = new EntityQueryBuilder<>(type);
Expand Down Expand Up @@ -284,7 +284,7 @@ public <R> Queryable<R> project(SqlFunction<E, R> projection) {
@SuppressWarnings("unchecked") var returnType = (Class<R>) LambdaExpression.parse(projection).getBody().getResultType();
var queryBuilder = new ProjectionQueryBuilder<>(projection, this.getTableName(), (QueryBuilder<E>) this.queryBuilder);

return new EntityProjectionQuery<>(returnType, queryBuilder, this.queryParameters, this.connectionConfiguration);
return new EntityProjectionQuery<>(returnType, queryBuilder, this.queryParameters, this.transactionManager);
}

/**
Expand All @@ -296,8 +296,10 @@ public <R> Queryable<R> project(SqlFunction<E, R> projection) {
@Override
public Optional<E> first() {
this.limit(1);
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.map(connection.execute(getQuery()));
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.map(connection.execute(getQuery()));
});
} catch (SQLException e) {
e.printStackTrace();
return Optional.empty();
Expand All @@ -311,8 +313,10 @@ public Optional<E> first() {
*/
@Override
public List<E> toList() {
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.mapToList(connection.execute(getQuery()));
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.mapToList(connection.execute(getQuery()));
});
} catch (SQLException e) {
e.printStackTrace();
return Collections.emptyList();
Expand All @@ -326,8 +330,10 @@ public List<E> toList() {
*/
@Override
public Stream<E> toStream() {
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.mapToStream(connection.execute(getQuery()));
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.mapToStream(connection.execute(getQuery()));
});
} catch (SQLException e) {
e.printStackTrace();
return Stream.empty();
Expand All @@ -342,8 +348,10 @@ public Stream<E> toStream() {
@Override
@SuppressWarnings("unchecked")
public E[] toArray() {
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.mapToArray(connection.execute(getQuery()));
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.mapToArray(connection.execute(getQuery()));
});
} catch (SQLException e) {
e.printStackTrace();
return (E[]) Array.newInstance(this.type, 0);
Expand All @@ -359,8 +367,10 @@ public E[] toArray() {
*/
@Override
public <K, V> Map<K, V> toMap(Function<E, K> keyMapping, Function<E, V> valueMapping) {
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.mapToMap(connection.execute(getQuery()), keyMapping, valueMapping);
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.mapToMap(connection.execute(getQuery()), keyMapping, valueMapping);
});
} catch (SQLException e) {
e.printStackTrace();
return Collections.emptyMap();
Expand All @@ -374,8 +384,10 @@ public <K, V> Map<K, V> toMap(Function<E, K> keyMapping, Function<E, V> valueMap
*/
@Override
public Set<E> toSet() {
try (var connection = new DBConnection(this.connectionConfiguration)) {
return this.mapper.mapToSet(connection.execute(getQuery()));
try {
return transactionManager.transactAndReturn(connection -> {
return this.mapper.mapToSet(connection.execute(getQuery()));
});
} catch (SQLException e) {
e.printStackTrace();

Expand Down
Loading