Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] adapt to lob-related table object type #112

Closed
wants to merge 11 commits into from
36 changes: 33 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package com.alipay.oceanbase.rpc;

import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
import com.alipay.oceanbase.rpc.table.AbstractTableBatchOps;
import com.alipay.oceanbase.rpc.table.ObTableClientBatchOpsImpl;

Expand Down Expand Up @@ -104,11 +102,20 @@ public void append(Object[] rowkeys, String[] columns, Object[] values, boolean
tableBatchOps.append(rowkeys, columns, values, withResult);
}

/*
* Put.
*/
@Override
public void put(Object[] rowkeys, String[] columns, Object[] values) {
tableBatchOps.put(rowkeys, columns, values);
}

/*
* Execute.
*/
@Override
public List<Object> execute() throws Exception {
preCheck();
return tableBatchOps.execute();
}

Expand All @@ -117,13 +124,15 @@ public List<Object> execute() throws Exception {
*/
@Override
public List<Object> executeWithResult() throws Exception {
preCheck();
return tableBatchOps.executeWithResult();
}

/*
* Execute internal.
*/
public ObTableBatchOperationResult executeInternal() throws Exception {
preCheck();
return tableBatchOps.executeInternal();
}

Expand Down Expand Up @@ -167,4 +176,25 @@ public void setAtomicOperation(boolean atomicOperation) {
super.setAtomicOperation(atomicOperation);
tableBatchOps.setAtomicOperation(atomicOperation);
}

@Override
public void setReturnOneResult(boolean returnOneResult) {
super.setReturnOneResult(returnOneResult);
tableBatchOps.setReturnOneResult(returnOneResult);
}

void preCheck() {
List<ObTableOperation> operations = this.tableBatchOps.getObTableBatchOperation().getTableOperations();
if (operations.isEmpty()) {
throw new IllegalArgumentException("operations is empty");
}
ObTableOperationType lastType = operations.get(0).getOperationType();
if (returnOneResult
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
|| lastType == ObTableOperationType.PUT
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
throw new IllegalArgumentException(
"returnOneResult only support multi-insert/put/replace/del");
}
}
}
13 changes: 13 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,20 @@ public static String getObVsnString(long version) {
getObVsnMajorPatch(version), getObVsnMinorPatch(version));
}

public static boolean isLsOpSupport() {
boolean isSupp = false;
if (OB_VERSION != 0) {
if (obVsnMajor() == 4 && obVsnMinor() == 2 && obVsnMajorPatch() == 3
&& OB_VERSION >= OB_VERSION_4_2_3_0) {
isSupp = true;
}
}
return isSupp;
}

public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0);

public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);

public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
}
54 changes: 50 additions & 4 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,8 @@ public void syncRefreshMetadata() throws Exception {
* @return the real table name
*/
public String getIndexTableName(final String dataTableName, final String indexName,
List<String> scanRangeColumns, boolean forceRefreshIndexInfo) throws Exception {
List<String> scanRangeColumns, boolean forceRefreshIndexInfo)
throws Exception {
String indexTableName = dataTableName;
if (indexName != null && !indexName.isEmpty() && !indexName.equalsIgnoreCase("PRIMARY")) {
String tmpTableName = constructIndexTableName(dataTableName, indexName);
Expand All @@ -950,7 +951,8 @@ public String getIndexTableName(final String dataTableName, final String indexNa
indexTableName = tmpTableName;
if (scanRangeColumns.isEmpty()) {
throw new ObTableException(
"query by global index need add all index keys in order, indexTableName:" + indexTableName);
"query by global index need add all index keys in order, indexTableName:"
+ indexTableName);
} else {
addRowKeyElement(indexTableName,
scanRangeColumns.toArray(new String[scanRangeColumns.size()]));
Expand Down Expand Up @@ -982,7 +984,8 @@ public String constructIndexTableName(final String dataTableName, final String i
return "__idx_" + dataTableId + "_" + indexName;
}

public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean forceRefresh) throws Exception {
public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean forceRefresh)
throws Exception {

ObIndexInfo indexInfo = indexinfos.get(indexTableName);
if (!forceRefresh && indexInfo != null) {
Expand All @@ -1004,7 +1007,8 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
if (!forceRefresh && indexInfo != null) {
return indexInfo;
} else {
logger.info("index info is not exist, create new index info, indexTableName: {}", indexTableName);
logger.info("index info is not exist, create new index info, indexTableName: {}",
indexTableName);
int serverSize = serverRoster.getMembers().size();
int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
: tableEntryRefreshTryTimes;
Expand Down Expand Up @@ -1533,6 +1537,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
long partIdx = tableEntry.getPartIdx(partId);
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
.getPartTabletIdMap().get(partIdx) : partId;
param.setLsId(tableEntry.getPartitionEntry().getLsId(partId));
}

param.setTableId(tableEntry.getTableId());
Expand Down Expand Up @@ -1998,6 +2003,47 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
});
}

/**
* put with result
* @param tableName which table to put
* @param rowKey insert row key
* @param keyRanges scan range
* @param columns columns name to put
* @param values new values
* @return execute result
* @throws Exception exception
*/
public ObPayload putWithResult(final String tableName, final Object[] rowKey,
final List<ObNewRange> keyRanges, final String[] columns,
final Object[] values) throws Exception {
final long start = System.currentTimeMillis();
return executeMutation(tableName,
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
/**
* Execute.
*/
@Override
public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
long TableTime = System.currentTimeMillis();
ObTableParam tableParam = obPair.getRight();
ObTable obTable = tableParam.getObTable();
ObTableOperationRequest request = ObTableOperationRequest.getInstance(
tableName, PUT, rowKey, columns, values,
obTable.getObTableOperationTimeout());
request.setTableId(tableParam.getTableId());
// partId/tabletId
request.setPartitionId(tableParam.getPartitionId());
ObPayload result = obTable.execute(request);
String endpoint = obTable.getIp() + ":" + obTable.getPort();
MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey,
(ObTableOperationResult) result, TableTime - start,
System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold());
checkResult(obTable.getIp(), obTable.getPort(), request, result);
return result;
}
});
}

/**
* Replace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ public static ObTableException convertToObTableException(String host, int port,
}

if (resultCodes.errorCode == OB_ERR_KV_GLOBAL_INDEX_ROUTE.errorCode) {
return new ObTableGlobalIndexRouteException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
+ resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server
+ "]" + "[" + trace + "]", resultCodes.errorCode);
return new ObTableGlobalIndexRouteException("[" + String.valueOf(resultCodes.errorCode)
+ "]" + "[" + resultCodes.name() + "]"
+ "[" + errMsg + "]" + "[" + server + "]"
+ "[" + trace + "]", resultCodes.errorCode);
} else {
// [errCode][errCodeName][errMsg][server][trace]
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
+ resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server
+ "]" + "[" + trace + "]", resultCodes.errorCode);
+ resultCodes.name() + "]" + "[" + errMsg + "]" + "["
+ server + "]" + "[" + trace + "]", resultCodes.errorCode);
}
}

Expand Down
15 changes: 13 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static com.alipay.oceanbase.rpc.util.RandomUtil.getRandomNum;
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
import static java.lang.String.format;
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_LS_ID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -154,9 +155,11 @@ public class LocationUtil {

private static final String PROXY_LOCATION_SQL_PARTITION_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ ", A.spare1 as replica_type "
+ ", A.spare1 as replica_type, D.ls_id as ls_id "
+ "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ "WHERE tenant_name = ? and database_name=? and table_name = ? and tablet_id in ({0})";
+ "inner join oceanbase.DBA_OB_TENANTS C on C.tenant_name = A.tenant_name "
+ "left join oceanbase.CDB_OB_TABLET_TO_LS D on D.tenant_id = C.tenant_id and D.tablet_id = A.tablet_id "
+ "WHERE C.tenant_name = ? and database_name= ? and table_name = ? and A.tablet_id in ({0}) ";

private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num "
+ "FROM oceanbase.__all_virtual_proxy_partition "
Expand Down Expand Up @@ -945,11 +948,18 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab
throws SQLException,
ObTablePartitionLocationRefreshException {
Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();
Map<Long, Long> tabletLsIdMap = new HashMap<>();
while (rs.next()) {
ReplicaLocation replica = buildReplicaLocation(rs);
long partitionId;
if (ObGlobal.obVsnMajor() >= 4) {
partitionId = rs.getLong("tablet_id");
long lsId = rs.getLong("ls_id");
if (!rs.wasNull()) {
tabletLsIdMap.put(partitionId, lsId);
} else {
tabletLsIdMap.put(partitionId, INVALID_LS_ID); // non-partitioned table
}
} else {
partitionId = rs.getLong("partition_id");
if (tableEntry.isPartitionTable()
Expand All @@ -975,6 +985,7 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab
}
ObPartitionEntry partitionEntry = new ObPartitionEntry();
partitionEntry.setPartitionLocation(partitionLocation);
partitionEntry.setTabletLsIdMap(tabletLsIdMap);

if (ObGlobal.obVsnMajor() < 4) {
for (long i = 0; i < tableEntry.getPartitionNum(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
public class ObPartitionEntry {
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();

// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map<Long, Long> tabletLsIdMap = new HashMap<>();

public Map<Long, ObPartitionLocation> getPartitionLocation() {
return partitionLocation;
}
Expand Down Expand Up @@ -83,4 +86,14 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ObPartitionInfo {
private ObPartDesc firstPartDesc = null;
private ObPartDesc subPartDesc = null;
private List<ObColumn> partColumns = new ArrayList<ObColumn>(1);
// mapping from part id to tablet id, and the tablet id to ls id mapping is in ObPartitionInfo
private Map<Long, Long> partTabletIdMap = null;
private Map<String, Long> partNameIdMap = null;
private Map<String, Integer> rowKeyElement = null;
Expand Down
Loading
Loading