Skip to content

Commit

Permalink
ohbase table group route
Browse files Browse the repository at this point in the history
  • Loading branch information
foronedream committed Mar 8, 2024
1 parent dac8296 commit 54344dd
Show file tree
Hide file tree
Showing 6 changed files with 467 additions and 27 deletions.
145 changes: 142 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle {
private int odpPort = 2883;

private ObTable odpTable = null;
// tableGroup <-> Table
private ConcurrentHashMap<String, Lock> TableGroupCacheLocks = new ConcurrentHashMap<String, Lock>();
private ConcurrentHashMap<String, String> TableGroupCache = new ConcurrentHashMap<String, String>(); // tableGroup -> Table
private ConcurrentHashMap<String, String> TableGroupInverted = new ConcurrentHashMap<String, String>(); // Table -> tableGroup

/*
* Init.
Expand Down Expand Up @@ -572,7 +576,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
}
} else {
logger.warn("exhaust retry when replica not readable: {}",
ex.getMessage());
ex.getMessage());
RUNTIME.error("replica not readable", ex);
throw ex;
}
Expand Down Expand Up @@ -1285,6 +1289,45 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
return tableEntry;
}

/**
* 根据 tableGroup 获取其中一个tableName
* @param realTableName
* @param tableName
* @return
* @throws Exception
*/
private String refreshTableName(String realTableName, String tableName) throws Exception {
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database,
tableName);
try {
realTableName = loadTableNameWithGroup(serverRoster, //
tableEntryKey,//
tableEntryAcquireConnectTimeout,//
tableEntryAcquireSocketTimeout,//
serverAddressPriorityTimeout,//
serverAddressCachingTimeout, sysUA);
} catch (ObTableNotExistException e) {
RUNTIME.error("refreshTableName from tableGroup meet exception", e);
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, realTableName, e);
throw new ObTableNotExistException(String.format(
"failed to get table name key=%s original tableName=%s ", tableEntryKey,
realTableName), e);
}
TableGroupCache.put(tableName, realTableName);
TableGroupInverted.put(realTableName, tableName);
if (logger.isInfoEnabled()) {
logger.info(
"get table name from tableGroup, dataSource: {}, tableName: {}, refresh: {} key:{} realTableName:{} ",
dataSourceName, tableName, true, tableEntryKey, realTableName);
}
return realTableName;
}

/**
* 根据 rowkey 获取分区 id
* @param tableEntry
Expand Down Expand Up @@ -1679,6 +1722,79 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta
return obTableParams;
}

/**
* get table name with table group
* @param tableName
* @param refresh
* @return
* @throws Exception
*/
public String tryGetTableNameFromTableGroupCache(final String tableName, final boolean refresh) throws Exception {
String realTableName = TableGroupCache.get(tableName); // tableGroup -> Table
// get tableName from cache
if (realTableName != null && !realTableName.isEmpty() && !refresh) {
return realTableName;
}

// not find in cache, should get tableName from observer
Lock tempLock = new ReentrantLock();
Lock lock = TableGroupCacheLocks.putIfAbsent(tableName, tempLock);
lock = (lock == null) ? tempLock : lock; // check the first lock

// attempt lock the refreshing action, avoiding concurrent refreshing
// use the time-out mechanism, avoiding the rpc hanging up
boolean acquired = lock.tryLock(metadataRefreshLockTimeout, TimeUnit.MILLISECONDS);

if (!acquired) {
String errMsg = "try to lock tableGroup inflect timeout " + "dataSource:"
+ dataSourceName + " ,tableName:" + tableName
+ " , timeout:" + metadataRefreshLockTimeout + ".";
RUNTIME.error(errMsg);
throw new ObTableEntryRefreshException(errMsg);
}

try {
String newRealTableName = TableGroupCache.get(tableName);
if (((realTableName == null || realTableName.isEmpty()) && (newRealTableName == null || newRealTableName.isEmpty()))
|| (refresh && newRealTableName.equalsIgnoreCase(realTableName))) {
if (logger.isInfoEnabled()) {
if (realTableName != null && !realTableName.isEmpty()) {
logger.info(
"realTableName need refresh, create new table entry, tablename: {}",
tableName);
} else {
logger.info("realTableName not exist, create new table entry, tablename: {}",
tableName);
}
}

try {
return refreshTableName(realTableName, tableName);
} catch (ObTableNotExistException e) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e);
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e);

if (logger.isInfoEnabled()) {
logger.info("server addr is expired and it will refresh metadata.");
}
syncRefreshMetadata();
} catch (Throwable t) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", t);
throw t;
}
// failure reach the try times may all the server change
if (logger.isInfoEnabled()) {
logger.info("refresh table Name from TableGroup failure");
}
}
return newRealTableName;
} finally {
lock.unlock();
}
}

/**
* Aggregate.
* @param tableName table want to aggregate
Expand Down Expand Up @@ -2570,13 +2686,25 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
.executeInternal();
return batchOpsResult.getResults().get(0);
} else if (request instanceof ObTableQueryRequest) {
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(request.getTableName(),
// TableGroup -> TableName
String tableName = request.getTableName();
if (((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()
&& isTableGroupName(tableName)) {
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
}
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryRequest) request).getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
return new ObClusterTableQuery(tableQuery).executeInternal();
} else if (request instanceof ObTableQueryAsyncRequest) {
// TableGroup -> TableName
String tableName = request.getTableName();
if (((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()
&& isTableGroupName(tableName)) {
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
}
ObTableClientQueryAsyncImpl tableClientQueryAsync = new ObTableClientQueryAsyncImpl(
request.getTableName(), ((ObTableQueryAsyncRequest) request)
tableName, ((ObTableQueryAsyncRequest) request)
.getObTableQueryRequest().getTableQuery(), this);
tableClientQueryAsync.setEntityType(request.getEntityType());
return new ObClusterTableAsyncQuery(tableClientQueryAsync)
Expand Down Expand Up @@ -3086,4 +3214,15 @@ public String toString() {
+ ", \n ocpModel = " + ocpModel + "\n}\n";
}

public ConcurrentHashMap<String, String> getTableGroupInverted() {
return TableGroupInverted;
}

/*
* check table name whether group name
*/
public boolean isTableGroupName(String tabName) {
return !tabName.contains("$");
}

}
160 changes: 158 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public class LocationUtil {
private static final String home = System.getProperty("user.home",
"/home/admin");

private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name " +
"FROM oceanbase.CDB_OB_TABLEGROUP_TABLES " +
"WHERE tablegroup_name = ? and tenant_id = ? limit 1;";

private static final int TEMPLATE_PART_ID = -1;

private abstract static class TableEntryRefreshWithPriorityCallback<T> {
Expand Down Expand Up @@ -437,6 +441,155 @@ TableEntry execute(Connection connection)
});
}

private abstract static class GetTableNameWithPriorityCallback<T> {
abstract T execute(ObServerAddr obServerAddr) throws ObTableNotExistException;
}

private abstract static class GetTableNameCallback<T> {
abstract T execute(Connection connection) throws ObTableNotExistException;
}

/*
* call TableName with group
*/
private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntryKey key,
long connectTimeout, long socketTimeout,
ObUserAuth sysUA, boolean initialized,
GetTableNameCallback<String> callback)
throws ObTableNotExistException {
String url = formatObServerUrl(obServerAddr, connectTimeout, socketTimeout);
Connection connection = null;
String realTableName;
try {
connection = getMetaRefreshConnection(url, sysUA);
realTableName = callback.execute(connection);
} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("callTableName meet exception", e);
throw e;
} catch (Exception e) {
throw new ObTableNotExistException(format(
"fail to get table name from remote url=%s, key=%s", url, key), e);
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (SQLException e) {
// ignore
}
}

if (realTableName != null && !realTableName.isEmpty()) {
return realTableName;
} else {
throw new ObTableNotExistException("table name is invalid, addr = " + obServerAddr
+ " key =" + key + " tableName =" + realTableName);
}

}

/*
* call table name with group
*/
private static String callTableNameWithGroup(ServerRoster serverRoster,
long priorityTimeout,
long cachingTimeout,
GetTableNameWithPriorityCallback<String> callable)
throws ObTableNotExistException {
ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout);
try {
String realTableName = callable.execute(addr);
serverRoster.resetPriority(addr);
return realTableName;
} catch (ObTableNotExistException e) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", e);
serverRoster.downgradePriority(addr);
throw e;
} catch (Throwable t) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", t);
throw t;
}
}

/*
* load Table Name With table Group
*/
public static String loadTableNameWithGroup(final ServerRoster serverRoster,
final TableEntryKey key,
final long connectTimeout,
final long socketTimeout,
final long priorityTimeout,
final long cachingTimeout,
final ObUserAuth sysUA)
throws ObTableNotExistException {
return callTableNameWithGroup(serverRoster, priorityTimeout, cachingTimeout,
new GetTableNameWithPriorityCallback<String>() {
@Override
String execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException {
return callTableNameRefresh(obServerAddr, key, connectTimeout, socketTimeout,
sysUA, true, new GetTableNameCallback<String>() {
@Override
String execute(Connection connection)
throws ObTableEntryRefreshException {
return getTableNameFromRemote(connection, key);
}
});
}
});
}

/*
* get TableName From Remote with Group
*/
private static String getTableNameFromRemote(Connection connection, TableEntryKey key)
throws ObTableNotExistException {
PreparedStatement ps = null;
ResultSet rs = null;
String realTableName = "";
int tenantId = -1;
try {
if (ObGlobal.obVsnMajor() == 0) {
getObVersionFromRemote(connection);
}
tenantId = checkTenantExistFromRemote(connection, key);
if (ObGlobal.obVsnMajor() >= 4) {
ps = connection.prepareStatement(TABLE_GROUP_GET_TABLE_NAME_V4);
ps.setString(1, key.getTableName());
ps.setString(2, String.valueOf(tenantId));
} else {
throw new ObTableNotExistException(format(
"fail to get table name from remote in low version than 4, key=%s", key));
}
rs = ps.executeQuery();
while (rs.next()) {
realTableName = rs.getString("table_name");
}


} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("getTableNameFromRemote meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("getTableNameFromRemote meet exception", e);
throw new ObTableNotExistException(format(
"fail to get table name from remote, key=%s", key), e);
} finally {
try {
if (null != rs) {
rs.close();
}
if (null != ps) {
ps.close();
}
} catch (SQLException e) {
// ignore
}
}
return realTableName;
}

/*
* Load table entry randomly.
*/
Expand Down Expand Up @@ -486,13 +639,15 @@ private static void getObVersionFromRemote(Connection connection)
}

// check tenant exist or not
private static void checkTenantExistFromRemote(Connection connection, TableEntryKey key)
private static int checkTenantExistFromRemote(Connection connection, TableEntryKey key)
throws ObTableEntryRefreshException {
try (PreparedStatement ps = connection.prepareStatement(OB_TENANT_EXIST_SQL)) {
ps.setString(1, key.getTenantName());
try (ResultSet rs = ps.executeQuery()) {
if (!rs.next()) {
throw new ObTableEntryRefreshException("fail to get tenant id from remote");
} else {
return rs.getInt("tenant_id");
}
} catch (Exception e) {
throw new ObTableEntryRefreshException("fail to get tenant id from remote", e);
Expand All @@ -508,11 +663,12 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
PreparedStatement ps = null;
ResultSet rs = null;
TableEntry tableEntry;
int tenantId = -1;
try {
if (ObGlobal.obVsnMajor() == 0) {
getObVersionFromRemote(connection);
}
checkTenantExistFromRemote(connection, key);
tenantId = checkTenantExistFromRemote(connection, key);
if (ObGlobal.obVsnMajor() >= 4) {
if (key.getTableName().equals(Constants.ALL_DUMMY_TABLE)) {
ps = connection.prepareStatement(PROXY_DUMMY_LOCATION_SQL_V4);
Expand Down
Loading

0 comments on commit 54344dd

Please sign in to comment.