Skip to content

Commit

Permalink
review commit edit
Browse files Browse the repository at this point in the history
  • Loading branch information
foronedream committed Mar 19, 2024
1 parent 76d4769 commit 052e0de
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 148 deletions.
71 changes: 42 additions & 29 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1291,45 +1291,46 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo

/**
* 根据 tableGroup 获取其中一个tableName
* @param realTableName
* @param tableName
* physicalTableName Complete table from table group
* @param physicalTableName
* @param tableGroupName
* @return
* @throws Exception
*/
private String refreshTableName(String realTableName, String tableName) throws Exception {
private String refreshTableNameByTableGroup(String physicalTableName, String tableGroupName) throws Exception {
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database,
tableName);
String oldTableName = realTableName;
tableGroupName);
String oldTableName = physicalTableName;
try {
realTableName = loadTableNameWithGroup(serverRoster, //
physicalTableName = loadTableNameWithGroupName(serverRoster, //
tableEntryKey,//
tableEntryAcquireConnectTimeout,//
tableEntryAcquireSocketTimeout,//
serverAddressPriorityTimeout,//
serverAddressCachingTimeout, sysUA);
} catch (ObTableNotExistException e) {
RUNTIME.error("refreshTableName from tableGroup meet exception", e);
RUNTIME.error("refreshTableNameByTableGroup from tableGroup meet exception", e);
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, realTableName, e);
RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, physicalTableName, e);
throw new ObTableNotExistException(String.format(
"failed to get table name key=%s original tableName=%s ", tableEntryKey,
realTableName), e);
physicalTableName), e);
}
if (!TableGroupInverted.isEmpty() && TableGroupInverted.containsKey(oldTableName)) {
TableGroupInverted.remove(oldTableName, tableName);
TableGroupInverted.remove(oldTableName, tableGroupName);
}
TableGroupCache.put(tableName, realTableName);
TableGroupInverted.put(realTableName, tableName);
TableGroupCache.put(tableGroupName, physicalTableName);
TableGroupInverted.put(physicalTableName, tableGroupName);
if (logger.isInfoEnabled()) {
logger.info(
"get table name from tableGroup, dataSource: {}, tableName: {}, refresh: {} key:{} realTableName:{} ",
dataSourceName, tableName, true, tableEntryKey, realTableName);
dataSourceName, tableGroupName, true, tableEntryKey, physicalTableName);
}
return realTableName;
return physicalTableName;
}

/**
Expand Down Expand Up @@ -1728,21 +1729,21 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta

/**
* get table name with table group
* @param tableName
* @param tableGroupName
* @param refresh
* @return
* @throws Exception
*/
public String tryGetTableNameFromTableGroupCache(final String tableName, final boolean refresh) throws Exception {
String realTableName = TableGroupCache.get(tableName); // tableGroup -> Table
public String tryGetTableNameFromTableGroupCache(final String tableGroupName, final boolean refresh) throws Exception {
String physicalTableName = TableGroupCache.get(tableGroupName); // tableGroup -> Table
// get tableName from cache
if (realTableName != null && !realTableName.isEmpty() && !refresh) {
return realTableName;
if (physicalTableName != null && !refresh) {
return physicalTableName;
}

// not find in cache, should get tableName from observer
Lock tempLock = new ReentrantLock();
Lock lock = TableGroupCacheLocks.putIfAbsent(tableName, tempLock);
Lock lock = TableGroupCacheLocks.putIfAbsent(tableGroupName, tempLock);
lock = (lock == null) ? tempLock : lock; // check the first lock

// attempt lock the refreshing action, avoiding concurrent refreshing
Expand All @@ -1751,29 +1752,29 @@ public String tryGetTableNameFromTableGroupCache(final String tableName, final b

if (!acquired) {
String errMsg = "try to lock tableGroup inflect timeout " + "dataSource:"
+ dataSourceName + " ,tableName:" + tableName
+ dataSourceName + " ,tableName:" + tableGroupName
+ " , timeout:" + metadataRefreshLockTimeout + ".";
RUNTIME.error(errMsg);
throw new ObTableEntryRefreshException(errMsg);
}

try {
String newRealTableName = TableGroupCache.get(tableName);
if (((realTableName == null || realTableName.isEmpty()) && (newRealTableName == null || newRealTableName.isEmpty()))
|| (refresh && newRealTableName.equalsIgnoreCase(realTableName))) {
String newPhyTableName = TableGroupCache.get(tableGroupName);
if (((physicalTableName == null) && (newPhyTableName == null))
|| (refresh && newPhyTableName.equalsIgnoreCase(physicalTableName))) {
if (logger.isInfoEnabled()) {
if (realTableName != null && !realTableName.isEmpty()) {
if (physicalTableName != null) {
logger.info(
"realTableName need refresh, create new table entry, tablename: {}",
tableName);
tableGroupName);
} else {
logger.info("realTableName not exist, create new table entry, tablename: {}",
tableName);
tableGroupName);
}
}

try {
return refreshTableName(realTableName, tableName);
return refreshTableNameByTableGroup(physicalTableName, tableGroupName);
} catch (ObTableNotExistException e) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e);
throw e;
Expand All @@ -1793,7 +1794,7 @@ public String tryGetTableNameFromTableGroupCache(final String tableName, final b
logger.info("refresh table Name from TableGroup failure");
}
}
return newRealTableName;
return newPhyTableName;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -3226,6 +3227,18 @@ public ConcurrentHashMap<String, String> getTableGroupCache() {
return TableGroupCache;
}

/**
* get table route fail than clear table group message
* @param phyTableName
* @param tableGroupName
*/
public void eraseTableGroupFromCache(String tableGroupName) {
// clear table group cache
TableGroupInverted.remove(TableGroupCache.get(tableGroupName));
TableGroupCache.remove(tableGroupName);
TableGroupCacheLocks.remove(tableGroupName);
}

/*
* check table name whether group name
*/
Expand Down
101 changes: 24 additions & 77 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,35 +441,36 @@ TableEntry execute(Connection connection)
});
}

private abstract static class GetTableNameWithPriorityCallback<T> {
abstract T execute(ObServerAddr obServerAddr) throws ObTableNotExistException;
}

private abstract static class GetTableNameCallback<T> {
abstract T execute(Connection connection) throws ObTableNotExistException;
}

/*
* call TableName with group
* load Table Name With table Group
*/
private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntryKey key,
long connectTimeout, long socketTimeout,
ObUserAuth sysUA, boolean initialized,
GetTableNameCallback<String> callback)
public static String loadTableNameWithGroupName(final ServerRoster serverRoster,
final TableEntryKey key,
final long connectTimeout,
final long socketTimeout,
final long priorityTimeout,
final long cachingTimeout,
final ObUserAuth sysUA)
throws ObTableNotExistException {
String url = formatObServerUrl(obServerAddr, connectTimeout, socketTimeout);
Connection connection = null;
String realTableName;
String realTableName = "";
String url = "";
ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout);
try {
url = formatObServerUrl(addr, connectTimeout, socketTimeout);
connection = getMetaRefreshConnection(url, sysUA);
realTableName = callback.execute(connection);
realTableName = getTableNameByGroupNameFromRemote(connection, key);
serverRoster.resetPriority(addr);
} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("callTableName meet exception", e);
RUNTIME.error("callTableEntryNameWithPriority meet exception", e);
serverRoster.downgradePriority(addr);
throw e;
} catch (Exception e) {
throw new ObTableNotExistException(format(
"fail to get table name from remote url=%s, key=%s", url, key), e);
} catch (Throwable t) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", t);
throw t;
} finally {
try {
if (null != connection) {
Expand All @@ -479,70 +480,18 @@ private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntry
// ignore
}
}

if (realTableName != null && !realTableName.isEmpty()) {
return realTableName;
} else {
throw new ObTableNotExistException("table name is invalid, addr = " + obServerAddr
+ " key =" + key + " tableName =" + realTableName);
throw new ObTableNotExistException("table name is invalid, addr = " + addr + " key =" + key + " tableName =" + realTableName);
}

}

/*
* call table name with group
*/
private static String callTableNameWithGroup(ServerRoster serverRoster,
long priorityTimeout,
long cachingTimeout,
GetTableNameWithPriorityCallback<String> callable)
throws ObTableNotExistException {
ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout);
try {
String realTableName = callable.execute(addr);
serverRoster.resetPriority(addr);
return realTableName;
} catch (ObTableNotExistException e) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", e);
serverRoster.downgradePriority(addr);
throw e;
} catch (Throwable t) {
RUNTIME.error("callTableEntryNameWithPriority meet exception", t);
throw t;
}
}

/*
* load Table Name With table Group
*/
public static String loadTableNameWithGroup(final ServerRoster serverRoster,
final TableEntryKey key,
final long connectTimeout,
final long socketTimeout,
final long priorityTimeout,
final long cachingTimeout,
final ObUserAuth sysUA)
throws ObTableNotExistException {
return callTableNameWithGroup(serverRoster, priorityTimeout, cachingTimeout,
new GetTableNameWithPriorityCallback<String>() {
@Override
String execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException {
return callTableNameRefresh(obServerAddr, key, connectTimeout, socketTimeout,
sysUA, true, new GetTableNameCallback<String>() {
@Override
String execute(Connection connection)
throws ObTableEntryRefreshException {
return getTableNameFromRemote(connection, key);
}
});
}
});

}

/*
* get TableName From Remote with Group
*/
private static String getTableNameFromRemote(Connection connection, TableEntryKey key)
private static String getTableNameByGroupNameFromRemote(Connection connection, TableEntryKey key)
throws ObTableNotExistException {
PreparedStatement ps = null;
ResultSet rs = null;
Expand All @@ -565,14 +514,12 @@ private static String getTableNameFromRemote(Connection connection, TableEntryKe
while (rs.next()) {
realTableName = rs.getString("table_name");
}


} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("getTableNameFromRemote meet exception", e);
RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error("getTableNameFromRemote meet exception", e);
RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e);
throw new ObTableNotExistException(format(
"fail to get table name from remote, key=%s", key), e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
long startExecute = System.currentTimeMillis();
Set<String> failedServerList = null;
ObServerRoute route = null;
ObTableParam tableParam = null;
boolean isNeedRefreshTableGroupName = false;
while (true) {
client.checkStatus();
long currentExecute = System.currentTimeMillis();
Expand All @@ -80,19 +78,10 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
if (failedServerList != null) {
route.setBlackList(failedServerList);
}
if (isNeedRefreshTableGroupName) {
// if input is table group, retry should replace table name
indexTableName = client.tryGetTableNameFromTableGroupCache(client.getTableGroupInverted().get(indexTableName), true);
tableName = indexTableName;
}
tableParam = client.getTable(indexTableName, partIdWithIndex.getLeft(),
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
route).getRight();
subObTable = tableParam.getObTable();
if (isNeedRefreshTableGroupName) {
((ObTableQueryRequest) request).setPartitionId(tableParam.getPartitionId());
((ObTableQueryRequest) request).setTableId(tableParam.getTableId());
}
subObTable = client
.getTable(indexTableName, partIdWithIndex.getLeft(),
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
route).getRight().getObTable();
}
}
result = subObTable.execute(request);
Expand Down Expand Up @@ -141,19 +130,13 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
} else if (e instanceof ObTableException) {
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| ((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
&& ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()) {
if (client.getTableGroupInverted().get(indexTableName) == null) {
// not use table group
isNeedRefreshTableGroupName = false;
} else {
isNeedRefreshTableGroupName = true;
}

&& ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()
&& client.getTableGroupInverted().get(indexTableName) != null) {
// table not exists && hbase mode && table group exists , three condition both
client.eraseTableGroupFromCache(tableName);
}
if (((ObTableException) e).isNeedRefreshTableEntry() || isNeedRefreshTableGroupName) {
if (!isNeedRefreshTableGroupName) {
needRefreshTableEntry = true;
}
if (((ObTableException) e).isNeedRefreshTableEntry()) {
needRefreshTableEntry = true;
logger
.warn(
"tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}",
Expand All @@ -170,6 +153,9 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
} else {
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
} else {
client.calculateContinuousFailure(indexTableName, e.getMessage());
Expand Down
Loading

0 comments on commit 052e0de

Please sign in to comment.