diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 7d05e9ee..a70a7220 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -935,22 +935,22 @@ public void syncRefreshMetadata() throws Exception { * @return the real table name */ public String getIndexTableName(final String dataTableName, final String indexName, - List scanRangeColumns) throws Exception { + List scanRangeColumns, boolean forceRefreshIndexInfo) throws Exception { String indexTableName = dataTableName; if (indexName != null && !indexName.isEmpty() && !indexName.equalsIgnoreCase("PRIMARY")) { String tmpTableName = constructIndexTableName(dataTableName, indexName); if (tmpTableName == null) { throw new ObTableException("index table name is null"); } - ObIndexInfo indexInfo = getOrRefreshIndexInfo(indexName, tmpTableName); + ObIndexInfo indexInfo = getOrRefreshIndexInfo(tmpTableName, forceRefreshIndexInfo); if (indexInfo == null) { - throw new ObTableException("index info is null"); + throw new ObTableException("index info is null, indexTableName:" + tmpTableName); } if (indexInfo.getIndexType().isGlobalIndex()) { indexTableName = tmpTableName; if (scanRangeColumns.isEmpty()) { throw new ObTableException( - "query by global index need add all index keys in order"); + "query by global index need add all index keys in order, indexTableName:" + indexTableName); } else { addRowKeyElement(indexTableName, scanRangeColumns.toArray(new String[scanRangeColumns.size()])); @@ -982,30 +982,29 @@ public String constructIndexTableName(final String dataTableName, final String i return "__idx_" + dataTableId + "_" + indexName; } - public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String indexTableName) - throws Exception { - ObIndexInfo indexInfo = indexinfos.get(indexName); - if (indexInfo != null) { + public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean forceRefresh) throws Exception { + + ObIndexInfo indexInfo = indexinfos.get(indexTableName); + if (!forceRefresh && indexInfo != null) { return indexInfo; } Lock tempLock = new ReentrantLock(); - Lock lock = refreshIndexInfoLocks.putIfAbsent(indexName, tempLock); + Lock lock = refreshIndexInfoLocks.putIfAbsent(indexTableName, tempLock); lock = (lock == null) ? tempLock : lock; boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS); if (!acquired) { String errMsg = "try to lock index infos refreshing timeout " + "dataSource:" - + dataSourceName + " ,indexName:" + indexName + " , timeout:" + + dataSourceName + " ,indexTableName:" + indexTableName + " , timeout:" + tableEntryRefreshLockTimeout + "."; RUNTIME.error(errMsg); throw new ObTableEntryRefreshException(errMsg); } try { - indexInfo = indexinfos.get(indexName); - if (indexInfo != null) { + indexInfo = indexinfos.get(indexTableName); + if (!forceRefresh && indexInfo != null) { return indexInfo; } else { - logger.info("index info is not exist, create new index info, indexName: {}", - indexName); + logger.info("index info is not exist, create new index info, indexTableName: {}", indexTableName); int serverSize = serverRoster.getMembers().size(); int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize : tableEntryRefreshTryTimes; @@ -1016,10 +1015,10 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, indexTableName); if (indexInfo != null) { - indexinfos.put(indexName, indexInfo); + indexinfos.put(indexTableName, indexInfo); } else { - RUNTIME.error("get index info from remote is null, index name: {}", - indexName); + RUNTIME.error("get index info from remote is null, indexTableName: {}", + indexTableName); } } return indexInfo; diff --git a/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java b/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java index 795de0cf..152f4bc3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java @@ -23,6 +23,7 @@ import java.util.Objects; import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_DESERIALIZE_ERROR; +import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_ERR_KV_GLOBAL_INDEX_ROUTE; public class ExceptionUtil { @@ -70,10 +71,16 @@ public static ObTableException convertToObTableException(String host, int port, errorCode); } - // [errCode][errCodeName][errMsg][server][trace] - return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "[" - + resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server - + "]" + "[" + trace + "]", resultCodes.errorCode); + if (resultCodes.errorCode == OB_ERR_KV_GLOBAL_INDEX_ROUTE.errorCode) { + return new ObTableGlobalIndexRouteException("[" + String.valueOf(resultCodes.errorCode) + "]" + "[" + + resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server + + "]" + "[" + trace + "]", resultCodes.errorCode); + } else { + // [errCode][errCodeName][errMsg][server][trace] + return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "[" + + resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server + + "]" + "[" + trace + "]", resultCodes.errorCode); + } } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/exception/ObTableGlobalIndexRouteException.java b/src/main/java/com/alipay/oceanbase/rpc/exception/ObTableGlobalIndexRouteException.java new file mode 100644 index 00000000..6545c17b --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/exception/ObTableGlobalIndexRouteException.java @@ -0,0 +1,69 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.exception; + +public class ObTableGlobalIndexRouteException extends ObTableException { + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException() { + } + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException(int errorCode) { + super(errorCode); + } + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException(String message, int errorCode) { + super(message, errorCode); + } + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException(String message) { + super(message); + } + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException(String message, Throwable cause) { + super(message, cause); + } + + /* + * Ob table global index route exception. + */ + public ObTableGlobalIndexRouteException(Throwable cause) { + super(cause); + } + + /* + * Is need refresh table entry. + */ + public boolean isNeedRefreshTableEntry() { + return false; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java index 74876618..3eb331a8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java @@ -723,6 +723,7 @@ public enum ResultCodes { OB_CLUSTER_NAME_NOT_EQUAL(-9016), // OB_RS_LIST_INVAILD(-9017), // OB_AGENT_HAS_FAILED_TASK(-9018), // + OB_ERR_KV_GLOBAL_INDEX_ROUTE(-10500),// OB_KV_CREDENTIAL_NOT_MATCH(-10509), // OB_KV_ROWKEY_COUNT_NOT_MATCH(-10510), // OB_KV_COLUMN_TYPE_NOT_MATCH(-10511), // diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java index 4e97e947..17677930 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableGlobalIndexRouteException; import com.alipay.oceanbase.rpc.exception.ObTableReplicaNotReadableException; import com.alipay.oceanbase.rpc.exception.ObTableTimeoutExcetion; import com.alipay.oceanbase.rpc.location.model.ObServerRoute; @@ -144,6 +145,23 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, client.calculateContinuousFailure(indexTableName, e.getMessage()); throw e; } + } else if (e instanceof ObTableGlobalIndexRouteException) { + if ((tryTimes - 1) < client.getRuntimeRetryTimes()) { + logger + .warn( + "meet global index route expcetion: indexTableName:{} partition id:{}, errorCode: {}, retry times {}", + indexTableName, partIdWithIndex.getLeft(), + ((ObTableException) e).getErrorCode(), tryTimes, e); + indexTableName = client.getIndexTableName(tableName, tableQuery.getIndexName(), + tableQuery.getScanRangeColumns(), true); + } else { + logger + .warn( + "meet global index route expcetion: indexTableName:{} partition id:{}, errorCode: {}, reach max retry times {}", + indexTableName, partIdWithIndex.getLeft(), + ((ObTableException) e).getErrorCode(), tryTimes, e); + throw e; + } } else { client.calculateContinuousFailure(indexTableName, e.getMessage()); throw e; diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java index 048b72c1..b24ab87f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableGlobalIndexRouteException; import com.alipay.oceanbase.rpc.exception.ObTableTimeoutExcetion; import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; @@ -92,6 +93,23 @@ protected ObTableQueryAsyncResult executeAsync(ObPair partId client.calculateContinuousFailure(indexTableName, e.getMessage()); throw e; } + } else if (e instanceof ObTableGlobalIndexRouteException) { + if ((tryTimes - 1) < client.getRuntimeRetryTimes()) { + logger + .warn( + "meet global index route expcetion: indexTableName:{} partition id:{}, errorCode: {}, retry times {}", + indexTableName, partIdWithObTable.getLeft(), + ((ObTableException) e).getErrorCode(), tryTimes, e); + indexTableName = client.getIndexTableName(tableName, tableQuery.getIndexName(), + tableQuery.getScanRangeColumns(), true); + } else { + logger + .warn( + "meet global index route expcetion: indexTableName:{} partition id:{}, errorCode: {}, reach max retry times {}", + indexTableName, partIdWithObTable.getLeft(), + ((ObTableException) e).getErrorCode(), tryTimes, e); + throw e; + } } else { client.calculateContinuousFailure(indexTableName, e.getMessage()); throw e; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java index e2aa0f44..390fe434 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java @@ -157,7 +157,7 @@ public Map> getPartitions() throws Exception { String indexName = tableQuery.getIndexName(); if (!this.obTableClient.isOdpMode()) { indexTableName = obTableClient.getIndexTableName(tableName, indexName, - tableQuery.getScanRangeColumns()); + tableQuery.getScanRangeColumns(), false); } this.partitionObTables = new HashMap>(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index b3b4e7a5..1f808bc5 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -149,7 +149,7 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception { String indexName = tableQuery.getIndexName(); if (!this.obTableClient.isOdpMode()) { indexTableName = obTableClient.getIndexTableName(tableName, indexName, - tableQuery.getScanRangeColumns()); + tableQuery.getScanRangeColumns(), false); } for (ObNewRange rang : tableQuery.getKeyRanges()) {