Skip to content

Commit

Permalink
Merge pull request #115 from foronedream/master
Browse files Browse the repository at this point in the history
ohbase table group route
  • Loading branch information
foronedream authored Mar 19, 2024
2 parents fdfe9fb + 32647f3 commit 3501f55
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 23 deletions.
165 changes: 162 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle {
private int odpPort = 2883;

private ObTable odpTable = null;
// tableGroup <-> Table
private ConcurrentHashMap<String, Lock> TableGroupCacheLocks = new ConcurrentHashMap<String, Lock>();
private ConcurrentHashMap<String, String> TableGroupCache = new ConcurrentHashMap<String, String>(); // tableGroup -> Table
private ConcurrentHashMap<String, String> TableGroupInverted = new ConcurrentHashMap<String, String>(); // Table -> tableGroup

/*
* Init.
Expand Down Expand Up @@ -572,7 +576,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
}
} else {
logger.warn("exhaust retry when replica not readable: {}",
ex.getMessage());
ex.getMessage());
RUNTIME.error("replica not readable", ex);
throw ex;
}
Expand Down Expand Up @@ -1284,6 +1288,50 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
return tableEntry;
}

/**
* 根据 tableGroup 获取其中一个tableName
* physicalTableName Complete table from table group
* @param physicalTableName
* @param tableGroupName
* @return
* @throws Exception
*/
private String refreshTableNameByTableGroup(String physicalTableName, String tableGroupName) throws Exception {
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database,
tableGroupName);
String oldTableName = physicalTableName;
try {
physicalTableName = loadTableNameWithGroupName(serverRoster, //
tableEntryKey,//
tableEntryAcquireConnectTimeout,//
tableEntryAcquireSocketTimeout,//
serverAddressPriorityTimeout,//
serverAddressCachingTimeout, sysUA);
} catch (ObTableNotExistException e) {
RUNTIME.error("refreshTableNameByTableGroup from tableGroup meet exception", e);
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, physicalTableName, e);
throw new ObTableNotExistException(String.format(
"failed to get table name key=%s original tableName=%s ", tableEntryKey,
physicalTableName), e);
}
if (!TableGroupInverted.isEmpty() && TableGroupInverted.containsKey(oldTableName)) {
TableGroupInverted.remove(oldTableName, tableGroupName);
}
TableGroupCache.put(tableGroupName, physicalTableName);
TableGroupInverted.put(physicalTableName, tableGroupName);
if (logger.isInfoEnabled()) {
logger.info(
"get table name from tableGroup, dataSource: {}, tableName: {}, refresh: {} key:{} realTableName:{} ",
dataSourceName, tableGroupName, true, tableEntryKey, physicalTableName);
}
return physicalTableName;
}

/**
* 根据 rowkey 获取分区 id
* @param tableEntry
Expand Down Expand Up @@ -1678,6 +1726,79 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta
return obTableParams;
}

/**
* get table name with table group
* @param tableGroupName
* @param refresh
* @return
* @throws Exception
*/
public String tryGetTableNameFromTableGroupCache(final String tableGroupName, final boolean refresh) throws Exception {
String physicalTableName = TableGroupCache.get(tableGroupName); // tableGroup -> Table
// get tableName from cache
if (physicalTableName != null && !refresh) {
return physicalTableName;
}

// not find in cache, should get tableName from observer
Lock tempLock = new ReentrantLock();
Lock lock = TableGroupCacheLocks.putIfAbsent(tableGroupName, tempLock);
lock = (lock == null) ? tempLock : lock; // check the first lock

// attempt lock the refreshing action, avoiding concurrent refreshing
// use the time-out mechanism, avoiding the rpc hanging up
boolean acquired = lock.tryLock(metadataRefreshLockTimeout, TimeUnit.MILLISECONDS);

if (!acquired) {
String errMsg = "try to lock tableGroup inflect timeout " + "dataSource:"
+ dataSourceName + " ,tableName:" + tableGroupName
+ " , timeout:" + metadataRefreshLockTimeout + ".";
RUNTIME.error(errMsg);
throw new ObTableEntryRefreshException(errMsg);
}

try {
String newPhyTableName = TableGroupCache.get(tableGroupName);
if (((physicalTableName == null) && (newPhyTableName == null))
|| (refresh && newPhyTableName.equalsIgnoreCase(physicalTableName))) {
if (logger.isInfoEnabled()) {
if (physicalTableName != null) {
logger.info(
"realTableName need refresh, create new table entry, tablename: {}",
tableGroupName);
} else {
logger.info("realTableName not exist, create new table entry, tablename: {}",
tableGroupName);
}
}

try {
return refreshTableNameByTableGroup(physicalTableName, tableGroupName);
} catch (ObTableNotExistException e) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e);
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e);

if (logger.isInfoEnabled()) {
logger.info("server addr is expired and it will refresh metadata.");
}
syncRefreshMetadata();
} catch (Throwable t) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", t);
throw t;
}
// failure reach the try times may all the server change
if (logger.isInfoEnabled()) {
logger.info("refresh table Name from TableGroup failure");
}
}
return newPhyTableName;
} finally {
lock.unlock();
}
}

/**
* Aggregate.
* @param tableName table want to aggregate
Expand Down Expand Up @@ -2569,13 +2690,25 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
.executeInternal();
return batchOpsResult.getResults().get(0);
} else if (request instanceof ObTableQueryRequest) {
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(request.getTableName(),
// TableGroup -> TableName
String tableName = request.getTableName();
if (((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()
&& isTableGroupName(tableName)) {
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
}
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryRequest) request).getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
return new ObClusterTableQuery(tableQuery).executeInternal();
} else if (request instanceof ObTableQueryAsyncRequest) {
// TableGroup -> TableName
String tableName = request.getTableName();
if (((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()
&& isTableGroupName(tableName)) {
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
}
ObTableClientQueryAsyncImpl tableClientQueryAsync = new ObTableClientQueryAsyncImpl(
request.getTableName(), ((ObTableQueryAsyncRequest) request)
tableName, ((ObTableQueryAsyncRequest) request)
.getObTableQueryRequest().getTableQuery(), this);
tableClientQueryAsync.setEntityType(request.getEntityType());
return new ObClusterTableAsyncQuery(tableClientQueryAsync)
Expand Down Expand Up @@ -3085,4 +3218,30 @@ public String toString() {
+ ", \n ocpModel = " + ocpModel + "\n}\n";
}

public ConcurrentHashMap<String, String> getTableGroupInverted() {
return TableGroupInverted;
}

public ConcurrentHashMap<String, String> getTableGroupCache() {
return TableGroupCache;
}

/**
* get table route fail than clear table group message
* @param tableGroupName
*/
public void eraseTableGroupFromCache(String tableGroupName) {
// clear table group cache
TableGroupInverted.remove(TableGroupCache.get(tableGroupName));
TableGroupCache.remove(tableGroupName);
TableGroupCacheLocks.remove(tableGroupName);
}

/*
* check table name whether group name
*/
public boolean isTableGroupName(String tabName) {
return !tabName.contains("$");
}

}
107 changes: 105 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public class LocationUtil {
private static final String home = System.getProperty("user.home",
"/home/admin");

private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name " +
"FROM oceanbase.CDB_OB_TABLEGROUP_TABLES " +
"WHERE tablegroup_name = ? and tenant_id = ? limit 1;";

private static final int TEMPLATE_PART_ID = -1;

private abstract static class TableEntryRefreshWithPriorityCallback<T> {
Expand Down Expand Up @@ -437,6 +441,102 @@ TableEntry execute(Connection connection)
});
}

/*
* load Table Name With table Group
*/
public static String loadTableNameWithGroupName(final ServerRoster serverRoster,
final TableEntryKey key,
final long connectTimeout,
final long socketTimeout,
final long priorityTimeout,
final long cachingTimeout,
final ObUserAuth sysUA)
throws ObTableNotExistException {
Connection connection = null;
String realTableName = "";
String url = "";
ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout);
try {
url = formatObServerUrl(addr, connectTimeout, socketTimeout);
connection = getMetaRefreshConnection(url, sysUA);
realTableName = getTableNameByGroupNameFromRemote(connection, key);
serverRoster.resetPriority(addr);
} catch (ObTableNotExistException e) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", e);
serverRoster.downgradePriority(addr);
throw e;
} catch (Exception e) {
throw new ObTableNotExistException(format(
"fail to get table name from remote url=%s, key=%s", url, key), e);
} catch (Throwable t) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", t);
throw t;
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (SQLException e) {
// ignore
}
}
if (realTableName != null && !realTableName.isEmpty()) {
return realTableName;
} else {
throw new ObTableNotExistException("table name is invalid, addr = " + addr + " key =" + key + " tableName =" + realTableName);
}

}

/*
* get TableName From Remote with Group
*/
private static String getTableNameByGroupNameFromRemote(Connection connection, TableEntryKey key)
throws ObTableNotExistException {
PreparedStatement ps = null;
ResultSet rs = null;
String realTableName = "";
int tenantId = -1;
try {
if (ObGlobal.obVsnMajor() == 0) {
getObVersionFromRemote(connection);
}
tenantId = checkTenantExistFromRemote(connection, key);
if (ObGlobal.obVsnMajor() >= 4) {
ps = connection.prepareStatement(TABLE_GROUP_GET_TABLE_NAME_V4);
ps.setString(1, key.getTableName());
ps.setString(2, String.valueOf(tenantId));
} else {
throw new ObTableNotExistException(format(
"fail to get table name from remote in low version than 4, key=%s", key));
}
rs = ps.executeQuery();
while (rs.next()) {
realTableName = rs.getString("table_name");
}
} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e);
throw new ObTableNotExistException(format(
"fail to get table name from remote, key=%s", key), e);
} finally {
try {
if (null != rs) {
rs.close();
}
if (null != ps) {
ps.close();
}
} catch (SQLException e) {
// ignore
}
}
return realTableName;
}

/*
* Load table entry randomly.
*/
Expand Down Expand Up @@ -486,13 +586,15 @@ private static void getObVersionFromRemote(Connection connection)
}

// check tenant exist or not
private static void checkTenantExistFromRemote(Connection connection, TableEntryKey key)
private static int checkTenantExistFromRemote(Connection connection, TableEntryKey key)
throws ObTableEntryRefreshException {
try (PreparedStatement ps = connection.prepareStatement(OB_TENANT_EXIST_SQL)) {
ps.setString(1, key.getTenantName());
try (ResultSet rs = ps.executeQuery()) {
if (!rs.next()) {
throw new ObTableEntryRefreshException("fail to get tenant id from remote");
} else {
return rs.getInt("tenant_id");
}
} catch (Exception e) {
throw new ObTableEntryRefreshException("fail to get tenant id from remote", e);
Expand All @@ -508,11 +610,12 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
PreparedStatement ps = null;
ResultSet rs = null;
TableEntry tableEntry;
int tenantId = -1;
try {
if (ObGlobal.obVsnMajor() == 0) {
getObVersionFromRemote(connection);
}
checkTenantExistFromRemote(connection, key);
tenantId = checkTenantExistFromRemote(connection, key);
if (ObGlobal.obVsnMajor() >= 4) {
if (key.getTableName().equals(Constants.ALL_DUMMY_TABLE)) {
ps = connection.prepareStatement(PROXY_DUMMY_LOCATION_SQL_V4);
Expand Down
Loading

0 comments on commit 3501f55

Please sign in to comment.