Skip to content

Commit

Permalink
Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata (
Browse files Browse the repository at this point in the history
…#301)

* Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata

* fix

* fix commonExecute not catch refreshException
  • Loading branch information
maochongxin authored Mar 3, 2025
1 parent dce57a2 commit 6c2b1c2
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 21 deletions.
46 changes: 34 additions & 12 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1141,19 +1141,41 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
logger.info("index info is not exist, create new index info, indexTableName: {}",
indexTableName);
int serverSize = serverRoster.getMembers().size();
int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
: tableEntryRefreshTryTimes;
int refreshTryTimes = Math.min(tableEntryRefreshTryTimes, serverSize);
for (int i = 0; i < refreshTryTimes; i++) {
ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout,
serverAddressCachingTimeout);
indexInfo = getIndexInfoFromRemote(serverAddr, sysUA,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
indexTableName);
if (indexInfo != null) {
indexinfos.put(indexTableName, indexInfo);
} else {
RUNTIME.error("get index info from remote is null, indexTableName: {}",
indexTableName);
try {
ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout,
serverAddressCachingTimeout);
if (serverAddr.isExpired(serverAddressCachingTimeout)) {
syncRefreshMetadata(false);
}
indexInfo = getIndexInfoFromRemote(serverAddr, sysUA,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
indexTableName);
if (indexInfo != null) {
indexinfos.put(indexTableName, indexInfo);
break;
} else {
RUNTIME.error("get index info from remote is null, indexTableName: {}",
indexTableName);
}
} catch (ObTableServerCacheExpiredException e) {
RUNTIME.error("get index info from remote meet exception", e);
syncRefreshMetadata(false);
} catch (ObTableEntryRefreshException e) {
RUNTIME.error("get index info from remote meet exception", e);
if (tableEntryRefreshContinuousFailureCount.incrementAndGet() > tableEntryRefreshContinuousFailureCeiling) {
logger.error(LCD.convert("01-00019"),
tableEntryRefreshContinuousFailureCeiling);
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
} else if (e.isConnectInactive()) {
syncRefreshMetadata(false);
tableEntryRefreshContinuousFailureCount.set(0);
}
} catch (Throwable t) {
RUNTIME.error("getOrRefreshTableEntry meet exception", t);
throw t;
}
}
return indexInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,10 @@ public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUs
} else {
throw new ObTableEntryRefreshException("index is not exist");
}
} catch (SQLException e) {
// cannot execute sql, maybe some of the observers have been killed
RUNTIME.error(LCD.convert("01-00010"), indexTableName, e.getMessage());
throw new ObTableEntryRefreshException("fail to get index info from remote", e, true);
} catch (Exception e) {
if (e instanceof ObTableEntryRefreshException) {
throw new ObTableEntryRefreshException(format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
Expand Down Expand Up @@ -329,9 +327,27 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
}

// execute request
ObTableQueryAsyncResult result = (ObTableQueryAsyncResult) commonExecute(this.client,
logger, partIdWithObTable, streamRequest, connectionRef);

ObTableQueryAsyncResult result = null;
for (int i = 0; i < client.getRuntimeRetryTimes(); i++) {
try {
result = (ObTableQueryAsyncResult) commonExecute(this.client,
logger, partIdWithObTable, streamRequest, connectionRef);
break;
} catch (ObTableServerCacheExpiredException e) {
client.syncRefreshMetadata(false);
} catch (ObTableEntryRefreshException e) {
if (e.isConnectInactive()) {
client.syncRefreshMetadata(false);
} else {
throw e;
}
} catch (Throwable t) {
throw t;
}
}
if (result == null) {
throw new ObTableRetryExhaustedException("exhaust retry times " + client.getRuntimeRetryTimes());
}
// cache result
cacheResultRows(result);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException;
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
import com.alipay.oceanbase.rpc.exception.ObTableServerCacheExpiredException;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
Expand Down Expand Up @@ -62,9 +65,28 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
AtomicReference<ObTableConnection> connectionRef = new AtomicReference<>();

// execute request
ObTableQueryResult result = (ObTableQueryResult) commonExecute(this.client, logger,
partIdWithIndex, request, connectionRef);

ObTableQueryResult result = null;
for (int i = 0; i < client.getRuntimeRetryTimes(); i++) {
try {
result = (ObTableQueryResult) commonExecute(this.client, logger,
partIdWithIndex, request, connectionRef);
break;
} catch (ObTableServerCacheExpiredException e) {
client.syncRefreshMetadata(false);
} catch (ObTableEntryRefreshException e) {
if (e.isConnectInactive()) {
client.syncRefreshMetadata(false);
} else {
throw e;
}
} catch (Throwable t) {
throw t;
}
}
if (result == null) {
throw new ObTableRetryExhaustedException("exhaust retry times " + client.getRuntimeRetryTimes());
}

cacheStreamNext(partIdWithIndex, checkObTableQueryResult(result));

return result;
Expand Down

0 comments on commit 6c2b1c2

Please sign in to comment.