Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compatibility problem caused by getPartition in ODP mode #273

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 23 additions & 42 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import static com.alipay.oceanbase.rpc.location.model.TableEntry.HBASE_ROW_KEY_ELEMENT;
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator.*;
import static com.alipay.oceanbase.rpc.property.Property.*;
import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE;
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
import static java.lang.String.format;
Expand Down Expand Up @@ -618,7 +617,6 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
throw new IllegalArgumentException("table name is null");
}
boolean needRefreshTableEntry = false;
boolean needRenew = false;
boolean needFetchAllRouteInfo = false;
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
Expand All @@ -636,7 +634,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
ObPair<Long, ObTableParam> obPair = null;
try {
if (odpMode) {
obPair = getODPTableWithRowKeyValue(tableName, callback.getRowKey(), needRenew);
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
} else {
obPair = getTable(tableName, callback.getRowKey(),
needRefreshTableEntry, tableEntryRefreshIntervalWait,
Expand All @@ -654,17 +652,9 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
"execute while meet Exception, errorCode: {} , errorMsg: {}, try times {}",
((ObTableException) ex).getErrorCode(), ex.getMessage(),
tryTimes);
// if the cause is that ODP partition meta have expired, try to fetch new one
if (ex instanceof ObTablePartitionChangeException
&& ((ObTablePartitionChangeException) ex).getErrorCode() == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
needRenew = true;
} else {
throw ex;
}
} else {
logger.warn("execute while meet Exception, errorMsg: {}, try times {}",
ex.getMessage(), tryTimes);
throw ex;
}
} else {
RUNTIME.error("retry failed with exception", ex);
Expand All @@ -686,7 +676,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
} else if (ex instanceof ObTableException
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
needRefreshTableEntry = true;

if (retryOnChangeMasterTimes && (tryTimes - 1) < runtimeRetryTimes) {
if (ex instanceof ObTableNeedFetchAllException) {
needFetchAllRouteInfo = true;
Expand Down Expand Up @@ -787,7 +777,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
throw new IllegalArgumentException("table name is null");
}
boolean needRefreshTableEntry = false;
boolean needRenew = false;
boolean needFetchAllRouteInfo = false;
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
while (true) {
Expand All @@ -804,7 +794,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
ObPair<Long, ObTableParam> obPair = null;
try {
if (odpMode) {
obPair = getODPTableWithRowKey(tableName, callback.getRowKey(), needRenew);
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
} else {
if (null != callback.getRowKey()) {
// in the case of retry, the location always needs to be refreshed here
Expand Down Expand Up @@ -837,20 +827,10 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
"execute while meet Exception, errorCode: {} , errorMsg: {}, try times {}",
((ObTableException) ex).getErrorCode(), ex.getMessage(),
tryTimes);
// if the cause is that ODP partition meta have expired, try to fetch new one
if (ex instanceof ObTablePartitionChangeException
&& ((ObTablePartitionChangeException) ex).getErrorCode() == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
needRenew = true;
} else {
RUNTIME.error("execute while meet exception", ex);
throw ex;
}
} else {
logger.warn(
"execute while meet Exception, exception: {}, try times {}", ex,
tryTimes);
RUNTIME.error("execute while meet exception", ex);
throw ex;
"execute while meet Exception, exception: {}, try times {}", ex,
tryTimes);
}
} else {
RUNTIME.error("retry failed with exception", ex);
Expand All @@ -870,7 +850,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
}
} else if (ex instanceof ObTableException
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
// if the problem is the lack of row key name, throw directly
// if the problem is the lack of row key name, throw directly
if (tableRowKeyElement.get(tableName) == null) {
logger.warn("tableRowKeyElement not found table name: {}", ex.getMessage());
RUNTIME.error("tableRowKeyElement not found table name", ex);
Expand All @@ -880,7 +860,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
if (retryOnChangeMasterTimes && (tryTimes - 1) < runtimeRetryTimes) {
if (ex instanceof ObTableNeedFetchAllException) {
getOrRefreshTableEntry(tableName, true, true, true);
// reset failure count while fetch all route info
// reset failure count while fetch all route info
this.resetExecuteContinuousFailureCount(tableName);
}
} else {
Expand Down Expand Up @@ -1232,7 +1212,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
if ((fetchAll && (fetchAllInterval < punishInterval))
|| (!fetchAll && (interval < punishInterval))) {
if (waitForRefresh) {
long toHoldTime = punishInterval - interval;
long toHoldTime = fetchAll ? (punishInterval - fetchAllInterval) : (punishInterval - interval);
logger
.info(
"punish table entry {} : table entry refresh time {} punish interval {} current time {}. wait for refresh times {}ms",
Expand Down Expand Up @@ -1374,13 +1354,13 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
if (info == null) {
throw new ObTableEntryRefreshException("Partition info is null for tabletId=" + tabletId);
}

long lastRefreshTime = info.getLastUpdateTime();
long currentTime = System.currentTimeMillis();
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
return tableEntry;
}

Lock lock = info.refreshLock;
boolean acquired = false;
try {
Expand All @@ -1400,7 +1380,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
return tableEntry;
}

tableEntry = loadTableEntryLocationWithPriority(
serverRoster,
tableEntryKey,
Expand All @@ -1412,7 +1392,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
serverAddressCachingTimeout,
sysUA
);

tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());

} finally {
Expand All @@ -1430,7 +1410,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
throw new ObTableEntryRefreshException(errorMsg, e);
}

tableLocations.put(tableName, tableEntry);
tableEntryRefreshContinuousFailureCount.set(0);
return tableEntry;
Expand Down Expand Up @@ -1655,7 +1635,7 @@ private ObPair<Long, ReplicaLocation> getPartitionReplica(TableEntry tableEntry,
private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
ObServerRoute route) {
// In all cases for 3.x and for non-partitioned tables in 4.x, partId will not change.
// If it is 4.x, it will be converted to tablet id.
// If it is 4.x, it will be converted to tablet id.
partId = getTabletIdByPartId(tableEntry, partId);
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(partId)
.getReplica(route);
Expand Down Expand Up @@ -1980,9 +1960,9 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(obPartitionLocationInfo, route);
/**
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
* during a route refresh. But sometimes, there might not be a leader yet. In this case, the thread
* is released, and since it can't get the replica, it throws an no master exception.
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
* during a route refresh. But sometimes, there might not be a leader yet. In this case, the thread
* is released, and since it can't get the replica, it throws an no master exception.
*/
if (replica == null && obPartitionLocationInfo.getPartitionLocation().getLeader() == null) {
RUNTIME.error(LCD.convert("01-00028"), partitionId, tableEntry.getPartitionEntry(), tableEntry);
Expand Down Expand Up @@ -2023,14 +2003,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
}
}

if (ObGlobal.obVsnMajor() >= 4) {
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(obPartitionLocationInfo, route);
} else {
replica = getPartitionReplica(tableEntry, partitionId, route).getRight();
}

addr = replica.getAddr();
obTable = tableRoster.get(addr);

Expand Down Expand Up @@ -2334,8 +2314,9 @@ private List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuer
ObTableParam param = new ObTableParam(obTable);
param.setPartId(partId);
partId = getTabletIdByPartId(tableEntry, partId);
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(partId).getTabletLsId());

if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(partId).getTabletLsId());
}
param.setTableId(tableEntry.getTableId());
// real partition(tablet) id
param.setPartitionId(partId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
ObPayload result;
ObTable subObTable = partIdWithIndex.getRight().getObTable();
boolean needRefreshTableEntry = false;
boolean odpNeedRenew = false;
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
Set<String> failedServerList = null;
Expand All @@ -138,9 +137,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
try {
if (tryTimes > 1) {
if (client.isOdpMode()) {
subObTable = client
.getODPTableWithPartId(tableName, partIdWithIndex.getLeft(),
odpNeedRenew).getRight().getObTable();
subObTable = client.getOdpTable();
} else {
if (route == null) {
route = client.getReadRoute();
Expand Down Expand Up @@ -192,24 +189,16 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
"tablename:{} stream query execute while meet Exception needing retry, errorCode: {}, errorMsg: {}, try times {}",
indexTableName, ((ObTableException) e).getErrorCode(),
e.getMessage(), tryTimes);
if (e instanceof ObTablePartitionChangeException
&& ((ObTablePartitionChangeException) e).getErrorCode() == ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
odpNeedRenew = true;
} else {
throw e;
}
} else if (e instanceof IllegalArgumentException) {
logger
.warn(
"tablename:{} stream query execute while meet Exception needing retry, try times {}, errorMsg: {}",
indexTableName, tryTimes, e.getMessage());
throw e;
} else {
logger
.warn(
"tablename:{} stream query execute while meet Exception needing retry, try times {}",
indexTableName, tryTimes, e);
throw e;
}
} else {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,12 @@ public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>
for (int j = 0; j < rowKeySize; j++) {
rowKey[j] = rowKeyObject.getObj(j).getValue();
}
ObPair<Long, ObTableParam> tableObPair = null;
if (!obTableClient.isOdpMode()) {
tableObPair = obTableClient.getTable(tableName, rowKey,
false, false, obTableClient.getRoute(batchOperation.isReadOnly()));
} else {
tableObPair = obTableClient.getODPTableWithRowKeyValue(tableName, rowKey, false);
}
if (tableObPair == null) {
throw new ObTableUnexpectedException("fail to get table pair in batch");
}
final ObPair<Long, ObTableParam> tmpTableObPair = tableObPair;
ObPair<Long, ObTableParam> tableObPair = obTableClient.getTable(
tableName, rowKey, false, false,
obTableClient.getRoute(batchOperation.isReadOnly()));
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = partitionOperationsMap
.computeIfAbsent(tmpTableObPair.getLeft(), k -> new ObPair<>(
tmpTableObPair.getRight(), new ArrayList<>()));
.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(
tableObPair.getRight(), new ArrayList<>()));
obTableOperations.getRight().add(new ObPair<>(i, operation));
}
return partitionOperationsMap;
Expand Down Expand Up @@ -351,16 +343,12 @@ public void partitionExecute(ObTableOperationResult[] results,
}
tryTimes++;
try {
if (tryTimes > 1) {
if (obTableClient.isOdpMode()) {
ObTableParam newParam = obTableClient.getODPTableWithPartId(tableName,
originPartId, odpNeedRenew).getRight();
subObTable = newParam.getObTable();
subRequest.setPartitionId(newParam.getPartitionId());
subRequest.setTableId(newParam.getTableId());
} else {
// getTable() when we need retry
// we should use partIdx to get table
if (obTableClient.isOdpMode()) {
subObTable = obTableClient.getOdpTable();
} else {
// getTable() when we need retry
// we should use partIdx to get table
if (tryTimes > 1) {
if (route == null) {
route = obTableClient.getRoute(batchOperation.isReadOnly());
}
Expand Down Expand Up @@ -408,12 +396,7 @@ public void partitionExecute(ObTableOperationResult[] results,
"batch ops execute while meet Exception, tablename:{}, errorMsg: {}, try times {}",
tableName, ex.getMessage(),
tryTimes);
if (ex instanceof ObTablePartitionChangeException
&& ((ObTablePartitionChangeException) ex).getErrorCode() == ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
odpNeedRenew = true;
}
} else {
RUNTIME.error("retry fail when normal batch executing", ex);
throw ex;
}
} else if (ex instanceof ObTableReplicaNotReadableException) {
Expand Down
Loading
Loading