Skip to content

Commit

Permalink
Fix ob down ls timeout (#299)
Browse files Browse the repository at this point in the history
* SQLException need to refresh obTable roster

* refresh tablet address after refresh obTable roster

* refresh table roster if obtable is null

* use force renew to update roster

* add log

* update addrExpired after refresh
  • Loading branch information
JackShi148 authored Feb 28, 2025
1 parent b466014 commit dce57a2
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 49 deletions.
112 changes: 68 additions & 44 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,9 @@ public void resetExecuteContinuousFailureCount(String tableName) {
*
* @throws Exception if fail
*/
public void syncRefreshMetadata() throws Exception {
public void syncRefreshMetadata(boolean forceRenew) throws Exception {

if (System.currentTimeMillis() - lastRefreshMetadataTimestamp < metadataRefreshInterval) {
if (!forceRenew && System.currentTimeMillis() - lastRefreshMetadataTimestamp < metadataRefreshInterval) {
logger
.warn(
"try to lock metadata refreshing, it has refresh at: {}, dataSourceName: {}, url: {}",
Expand All @@ -947,7 +947,7 @@ public void syncRefreshMetadata() throws Exception {

try {

if (System.currentTimeMillis() - lastRefreshMetadataTimestamp < metadataRefreshInterval) {
if (!forceRenew && System.currentTimeMillis() - lastRefreshMetadataTimestamp < metadataRefreshInterval) {
logger.warn("it has refresh metadata at: {}, dataSourceName: {}, url: {}",
lastRefreshMetadataTimestamp, dataSourceName, paramURL);
return;
Expand Down Expand Up @@ -1295,7 +1295,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
if (logger.isInfoEnabled()) {
logger.info("server addr is expired and it will refresh metadata.");
}
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
} catch (ObTableEntryRefreshException e) {
RUNTIME.error("getOrRefreshTableEntry meet exception", e);
Expand All @@ -1307,11 +1307,11 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
if (tableEntryRefreshContinuousFailureCount.incrementAndGet() > tableEntryRefreshContinuousFailureCeiling) {
logger.error(LCD.convert("01-00019"),
tableEntryRefreshContinuousFailureCeiling);
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
} else if (e.isConnectInactive()) {
// getMetaRefreshConnection failed, maybe the server is down, so we need to refresh metadata directly
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
}
} catch (Throwable t) {
Expand All @@ -1326,7 +1326,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
"refresh table entry has tried {}-times failure and will sync refresh metadata",
refreshTryTimes);
}
syncRefreshMetadata();
syncRefreshMetadata(false);
return refreshTableEntry(tableEntry, tableName);
}
return tableEntry;
Expand Down Expand Up @@ -1405,7 +1405,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
throw e;
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.warn("RefreshTableEntry encountered an exception", e);
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
} catch (ObTableEntryRefreshException e) {
RUNTIME.error("getOrRefreshTableEntry meet exception", e);
Expand All @@ -1416,11 +1416,11 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
if (tableEntryRefreshContinuousFailureCount.incrementAndGet() > tableEntryRefreshContinuousFailureCeiling) {
logger.error(LCD.convert("01-00019"),
tableEntryRefreshContinuousFailureCeiling);
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
} else if (e.isConnectInactive()) {
// getMetaRefreshConnection failed, maybe the server is down, so we need to refresh metadata directly
syncRefreshMetadata();
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
}
} catch (Throwable t) {
Expand Down Expand Up @@ -2022,38 +2022,49 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
RUNTIME.error("Cannot get replica by partId: " + partId);
throw new ObTableGetException("Cannot get replica by partId: " + partId);
}
int retryTimes = 0;
ObServerAddr addr = replica.getAddr();
ObTable obTable = tableRoster.get(addr);
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
if (obTable == null || addrExpired) {
if (obTable == null) {
logger.warn("Cannot get ObTable by addr {}, refreshing metadata.", addr);
syncRefreshMetadata();
}
if (addr.isExpired(serverAddressCachingTimeout)) {
while ((obTable == null || addrExpired) && retryTimes < 2) {
++retryTimes;
if (addrExpired) {
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
if (ObGlobal.obVsnMajor() >= 4) {
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
} else {
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
}
addrExpired = addr.isExpired(serverAddressCachingTimeout);
}

if (ObGlobal.obVsnMajor() >= 4) {
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(obPartitionLocationInfo, route);
} else {
replica = getPartitionReplica(tableEntry, partitionId, route).getRight();
}

addr = replica.getAddr();
obTable = tableRoster.get(addr);

if (obTable == null) {
RUNTIME.error("Cannot get table by addr: " + addr);
throw new ObTableGetException("Cannot get table by addr: " + addr);
// need to refresh table roster to ensure the current roster is the latest
syncRefreshMetadata(true);
// the addr is wrong, need to refresh location
if (logger.isInfoEnabled()) {
logger.info("Cannot get ObTable by addr {}, refreshing metadata.", addr);
}
// refresh tablet location based on the latest roster, in case that some of the observers hase been killed
// and used the old location
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
if (ObGlobal.obVsnMajor() >= 4) {
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(obPartitionLocationInfo, route);
} else {
replica = getPartitionReplica(tableEntry, partitionId, route).getRight();
}
if (replica == null) {
RUNTIME.error("Cannot get replica by partId: " + partId);
throw new ObTableGetException("Cannot get replica by partId: " + partId);
}
addr = replica.getAddr();
obTable = tableRoster.get(addr);
}
}
if (obTable == null) {
RUNTIME.error("cannot get table by addr: " + addr);
throw new ObTableGetException("obTable is null, addr is: " + addr.getIp() + ":" + addr.getSvrPort());
}
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
if (ObGlobal.obVsnMajor() >= 4) {
} else {
Expand Down Expand Up @@ -2317,33 +2328,46 @@ private List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuer
ReplicaLocation replica = partIdWithReplica.getRight();
ObServerAddr addr = replica.getAddr();
ObTable obTable = tableRoster.get(addr);
int retryTimes = 0;
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
if (addrExpired || obTable == null) {
if (obTable == null) {
logger.warn("Cannot get ObTable by addr {}, refreshing metadata.", addr);
syncRefreshMetadata();
}
while ((obTable == null || addrExpired) && retryTimes < 2) {
++retryTimes;
if (addrExpired) {
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
if (ObGlobal.obVsnMajor() >= 4) {
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
} else {
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
}
addrExpired = addr.isExpired(serverAddressCachingTimeout);
}
if (ObGlobal.obVsnMajor() >= 4) {
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(locationInfo, route);
} else {
replica = getPartitionLocation(tableEntry, partId, route);
if (obTable == null) {
// need to refresh table roster to ensure the current roster is the latest
syncRefreshMetadata(true);
// the addr is wrong, need to refresh location
if (logger.isInfoEnabled()) {
logger.info("Cannot get ObTable by addr {}, refreshing metadata.", addr);
}
// refresh tablet location based on the latest roster, in case that some of the observers hase been killed
// and used the old location
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
if (ObGlobal.obVsnMajor() >= 4) {
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(locationInfo, route);
} else {
replica = getPartitionLocation(tableEntry, partId, route);
}
if (replica == null) {
RUNTIME.error("Cannot get replica by partId: " + partId);
throw new ObTableGetException("Cannot get replica by partId: " + partId);
}
addr = replica.getAddr();
obTable = tableRoster.get(addr);
}
addr = replica.getAddr();
obTable = tableRoster.get(addr);
}

if (obTable == null) {
RUNTIME.error("cannot get table by addr: " + addr);
throw new ObTableGetException("cannot get table by addr: " + addr);
throw new ObTableGetException("obTable is null, addr is: " + addr.getIp() + ":" + addr.getSvrPort());
}

ObTableParam param = new ObTableParam(obTable);
Expand Down Expand Up @@ -2487,7 +2511,7 @@ public String tryGetTableNameFromTableGroupCache(final String tableGroupName,
if (logger.isInfoEnabled()) {
logger.info("server addr is expired and it will refresh metadata.");
}
syncRefreshMetadata();
syncRefreshMetadata(false);
} catch (Throwable t) {
RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", t);
throw t;
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -782,12 +782,19 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
}
}
}
} catch (SQLException e) {
// cannot execute sql, maybe some of the observers have been killed
RUNTIME.error(LCD.convert("01-00010"), key, e.getMessage());
throw new ObTableEntryRefreshException("fail to get partition location entry from remote", e, true);
} catch (ObTableNotExistException e) {
// avoid to refresh meta for ObTableNotExistException
RUNTIME.error("getTableEntryFromRemote meet exception", e);
throw e;
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00009"), key, e);
if (e instanceof ObTableEntryRefreshException) {
throw e;
}
throw new ObTableEntryRefreshException(format(
"fail to get table entry from remote, key=%s", key), e);
} finally {
Expand Down Expand Up @@ -897,6 +904,10 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
ps.setString(5, key.getTableName());
rs = ps.executeQuery();
getPartitionLocationFromResultSetByTablet(tableEntry, rs, partitionEntry, tabletId);
} catch (SQLException e) {
// cannot execute sql, maybe some of the observers have been killed
RUNTIME.error(LCD.convert("01-00010"), key, tableEntry, e.getMessage());
throw new ObTableEntryRefreshException("fail to get partition location entry from remote", e, true);
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00010"), key, tableEntry, e);
throw new ObTablePartitionLocationRefreshException(format(
Expand Down Expand Up @@ -946,6 +957,9 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
}
rs = ps.executeQuery();
partitionEntry = getPartitionLocationFromResultSet(tableEntry, rs, partitionEntry);
} catch (SQLException e) {
RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e);
throw new ObTableEntryRefreshException("fail to get partition location entry from remote", e, true);
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e);
throw new ObTablePartitionLocationRefreshException(format(
Expand Down Expand Up @@ -1061,7 +1075,8 @@ public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUs

private static void fetchFirstPart(Connection connection, TableEntry tableEntry,
ObPartFuncType obPartFuncType)
throws ObTablePartitionInfoRefreshException {
throws ObTablePartitionInfoRefreshException,
SQLException {
String tableName = "";
TableEntryKey key = tableEntry.getTableEntryKey();
if (key != null) {
Expand Down Expand Up @@ -1109,6 +1124,8 @@ private static void fetchFirstPart(Connection connection, TableEntry tableEntry,
tableEntry.getPartitionInfo().setPartTabletIdMap(
parseFirstPartKeyHash(rs, tableEntry));
}
} catch (SQLException e) {
throw e;
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00011"), tableEntry, obPartFuncType, e);

Expand All @@ -1131,7 +1148,8 @@ private static void fetchFirstPart(Connection connection, TableEntry tableEntry,

private static void fetchSubPart(Connection connection, TableEntry tableEntry,
ObPartFuncType subPartFuncType)
throws ObTablePartitionInfoRefreshException {
throws ObTablePartitionInfoRefreshException,
SQLException {
String tableName = "";
TableEntryKey key = tableEntry.getTableEntryKey();
if (key != null) {
Expand Down Expand Up @@ -1178,6 +1196,8 @@ private static void fetchSubPart(Connection connection, TableEntry tableEntry,
tableEntry.getPartitionInfo().setPartTabletIdMap(
parseSubPartKeyHash(rs, tableEntry));
}
} catch (SQLException e) {
throw e;
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00012"), tableEntry, subPartFuncType, e);
throw new ObTablePartitionInfoRefreshException(format(
Expand Down Expand Up @@ -1454,7 +1474,8 @@ private static ReplicaLocation buildReplicaLocation(ResultSet rs) throws SQLExce
}

private static void fetchPartitionInfo(Connection connection, TableEntry tableEntry)
throws ObTablePartitionInfoRefreshException {
throws ObTablePartitionInfoRefreshException,
SQLException {
PreparedStatement pstmt = null;
ResultSet rs = null;
ObPartitionInfo info = null;
Expand All @@ -1477,6 +1498,8 @@ private static void fetchPartitionInfo(Connection connection, TableEntry tableEn
logger.info("get part info from remote info:{}", JSON.toJSON(info));
}
tableEntry.setPartitionInfo(info);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00014"), tableEntry);
RUNTIME.error("fail to get part info from remote");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ public void syncRefreshMetaHelper(final ObTableClient obTableClient) {
public void run() {
for (int i = 0; i < 10; i++) {
try {
obTableClient.syncRefreshMetadata();
obTableClient.syncRefreshMetadata(false);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public void syncRefreshMetaHelper(final ObTableClient obTableClient) {
public void run() {
for (int i = 0; i < 10; i++) {
try {
obTableClient.syncRefreshMetadata();
obTableClient.syncRefreshMetadata(false);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
Expand Down

0 comments on commit dce57a2

Please sign in to comment.