diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 7d05e9ee..b05eb954 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -156,6 +156,11 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle { private int odpPort = 2883; private ObTable odpTable = null; + // tableGroup <-> Table + private ConcurrentHashMap TableGroupCacheLocks = new ConcurrentHashMap(); + private ConcurrentHashMap TableGroupCache = new ConcurrentHashMap(); // tableGroup -> Table + private ConcurrentHashMap TableGroupInverted = new ConcurrentHashMap(); // Table -> tableGroup + private boolean isTableGroup = false; /* * Init. @@ -572,7 +577,7 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ } } else { logger.warn("exhaust retry when replica not readable: {}", - ex.getMessage()); + ex.getMessage()); RUNTIME.error("replica not readable", ex); throw ex; } @@ -1285,6 +1290,45 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo return tableEntry; } + /** + * 根据 tableGroup 获取其中一个tableName + * @param realTableName + * @param tableName + * @return + * @throws Exception + */ + private String refreshTableName(String realTableName, String tableName) throws Exception { + TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, + tableName); + try { + realTableName = loadTableNameWithGroup(serverRoster, // + tableEntryKey,// + tableEntryAcquireConnectTimeout,// + tableEntryAcquireSocketTimeout,// + serverAddressPriorityTimeout,// + serverAddressCachingTimeout, sysUA); + } catch (ObTableNotExistException e) { + RUNTIME.error("refreshTableName from tableGroup meet exception", e); + throw e; + } catch (ObTableServerCacheExpiredException e) { + RUNTIME.error("refreshTableEntry from tableGroup meet exception", e); + throw e; + } catch (Exception e) { + RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, realTableName, e); + throw new ObTableNotExistException(String.format( + "failed to get table name key=%s original tableName=%s ", tableEntryKey, + realTableName), e); + } + TableGroupCache.put(tableName, realTableName); + TableGroupInverted.put(realTableName, tableName); + if (logger.isInfoEnabled()) { + logger.info( + "get table name from tableGroup, dataSource: {}, tableName: {}, refresh: {} key:{} realTableName:{} ", + dataSourceName, tableName, true, tableEntryKey, realTableName); + } + return realTableName; + } + /** * 根据 rowkey 获取分区 id * @param tableEntry @@ -1679,6 +1723,80 @@ public List> getTables(String tableName, Object[] sta return obTableParams; } + /** + * get table name with table group + * @param tableName + * @param refresh + * @return + * @throws Exception + */ + public String tryGetTableNameFromTableGroupCache(final String tableName, final boolean refresh) throws Exception { + isTableGroup = true; + String realTableName = TableGroupCache.get(tableName); // tableGroup -> Table + // get tableName from cache + if (realTableName != null && !realTableName.isEmpty() && !refresh) { + return realTableName; + } + + // not find in cache, should get tableName from observer + Lock tempLock = new ReentrantLock(); + Lock lock = TableGroupCacheLocks.putIfAbsent(tableName, tempLock); + lock = (lock == null) ? tempLock : lock; // check the first lock + + // attempt lock the refreshing action, avoiding concurrent refreshing + // use the time-out mechanism, avoiding the rpc hanging up + boolean acquired = lock.tryLock(metadataRefreshLockTimeout, TimeUnit.MILLISECONDS); + + if (!acquired) { + String errMsg = "try to lock tableGroup inflect timeout " + "dataSource:" + + dataSourceName + " ,tableName:" + tableName + + " , timeout:" + metadataRefreshLockTimeout + "."; + RUNTIME.error(errMsg); + throw new ObTableEntryRefreshException(errMsg); + } + + try { + String newRealTableName = TableGroupCache.get(tableName); + if (((realTableName == null || realTableName.isEmpty()) && (newRealTableName == null || newRealTableName.isEmpty())) + || (refresh && newRealTableName.equalsIgnoreCase(realTableName))) { + if (logger.isInfoEnabled()) { + if (realTableName != null && !realTableName.isEmpty()) { + logger.info( + "realTableName need refresh, create new table entry, tablename: {}", + tableName); + } else { + logger.info("realTableName not exist, create new table entry, tablename: {}", + tableName); + } + } + + try { + return refreshTableName(realTableName, tableName); + } catch (ObTableNotExistException e) { + RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e); + throw e; + } catch (ObTableServerCacheExpiredException e) { + RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e); + + if (logger.isInfoEnabled()) { + logger.info("server addr is expired and it will refresh metadata."); + } + syncRefreshMetadata(); + } catch (Throwable t) { + RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", t); + throw t; + } + // failure reach the try times may all the server change + if (logger.isInfoEnabled()) { + logger.info("refresh table Name from TableGroup failure"); + } + } + return newRealTableName; + } finally { + lock.unlock(); + } + } + /** * Aggregate. * @param tableName table want to aggregate @@ -3086,4 +3204,16 @@ public String toString() { + ", \n ocpModel = " + ocpModel + "\n}\n"; } + public void setIsTableGroup(boolean isTableGroup) { + this.isTableGroup = isTableGroup; + } + + public boolean isTableGroup() { + return isTableGroup; + } + + public ConcurrentHashMap getTableGroupInverted() { + return TableGroupInverted; + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index e95f3e16..a4fc3088 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -173,6 +173,10 @@ public class LocationUtil { private static final String home = System.getProperty("user.home", "/home/admin"); + private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name " + + "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES " + + "WHERE tablegroup_name = ? and tenant_id = ? limit 1;"; + private static final int TEMPLATE_PART_ID = -1; private abstract static class TableEntryRefreshWithPriorityCallback { @@ -437,6 +441,155 @@ TableEntry execute(Connection connection) }); } + private abstract static class GetTableNameWithPriorityCallback { + abstract T execute(ObServerAddr obServerAddr) throws ObTableNotExistException; + } + + private abstract static class GetTableNameCallback { + abstract T execute(Connection connection) throws ObTableNotExistException; + } + + /* + * call TableName with group + */ + private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntryKey key, + long connectTimeout, long socketTimeout, + ObUserAuth sysUA, boolean initialized, + GetTableNameCallback callback) + throws ObTableNotExistException { + String url = formatObServerUrl(obServerAddr, connectTimeout, socketTimeout); + Connection connection = null; + String realTableName; + try { + connection = getMetaRefreshConnection(url, sysUA); + realTableName = callback.execute(connection); + } catch (ObTableNotExistException e) { + // avoid to refresh meta for ObTableNotExistException + RUNTIME.error("callTableName meet exception", e); + throw e; + } catch (Exception e) { + throw new ObTableNotExistException(format( + "fail to get table name from remote url=%s, key=%s", url, key), e); + } finally { + try { + if (null != connection) { + connection.close(); + } + } catch (SQLException e) { + // ignore + } + } + + if (realTableName != null && !realTableName.isEmpty()) { + return realTableName; + } else { + throw new ObTableNotExistException("table name is invalid, addr = " + obServerAddr + + " key =" + key + " tableName =" + realTableName); + } + + } + + /* + * call table name with group + */ + private static String callTableNameWithGroup(ServerRoster serverRoster, + long priorityTimeout, + long cachingTimeout, + GetTableNameWithPriorityCallback callable) + throws ObTableNotExistException { + ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout); + try { + String realTableName = callable.execute(addr); + serverRoster.resetPriority(addr); + return realTableName; + } catch (ObTableNotExistException e) { + RUNTIME.error("callTableEntryNameWithPriority meet exception", e); + serverRoster.downgradePriority(addr); + throw e; + } catch (Throwable t) { + RUNTIME.error("callTableEntryNameWithPriority meet exception", t); + throw t; + } + } + + /* + * load Table Name With table Group + */ + public static String loadTableNameWithGroup(final ServerRoster serverRoster, + final TableEntryKey key, + final long connectTimeout, + final long socketTimeout, + final long priorityTimeout, + final long cachingTimeout, + final ObUserAuth sysUA) + throws ObTableNotExistException { + return callTableNameWithGroup(serverRoster, priorityTimeout, cachingTimeout, + new GetTableNameWithPriorityCallback() { + @Override + String execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException { + return callTableNameRefresh(obServerAddr, key, connectTimeout, socketTimeout, + sysUA, true, new GetTableNameCallback() { + @Override + String execute(Connection connection) + throws ObTableEntryRefreshException { + return getTableNameFromRemote(connection, key); + } + }); + } + }); + } + + /* + * get TableName From Remote with Group + */ + private static String getTableNameFromRemote(Connection connection, TableEntryKey key) + throws ObTableNotExistException { + PreparedStatement ps = null; + ResultSet rs = null; + String realTableName = ""; + int tenantId = -1; + try { + if (ObGlobal.obVsnMajor() == 0) { + getObVersionFromRemote(connection); + } + tenantId = checkTenantExistFromRemote(connection, key); + if (ObGlobal.obVsnMajor() >= 4) { + ps = connection.prepareStatement(TABLE_GROUP_GET_TABLE_NAME_V4); + ps.setString(1, key.getTableName()); + ps.setString(2, String.valueOf(tenantId)); + } else { + throw new ObTableNotExistException(format( + "fail to get table name from remote in low version than 4, key=%s", key)); + } + rs = ps.executeQuery(); + while (rs.next()) { + realTableName = rs.getString("table_name"); + } + + + } catch (ObTableNotExistException e) { + // avoid to refresh meta for ObTableNotExistException + RUNTIME.error("getTableNameFromRemote meet exception", e); + throw e; + } catch (Exception e) { + RUNTIME.error("getTableNameFromRemote meet exception", e); + throw new ObTableNotExistException(format( + "fail to get table name from remote, key=%s", key), e); + } finally { + try { + if (null != rs) { + rs.close(); + } + if (null != ps) { + ps.close(); + } + } catch (SQLException e) { + // ignore + } + } + return realTableName; + } + /* * Load table entry randomly. */ @@ -486,13 +639,15 @@ private static void getObVersionFromRemote(Connection connection) } // check tenant exist or not - private static void checkTenantExistFromRemote(Connection connection, TableEntryKey key) + private static int checkTenantExistFromRemote(Connection connection, TableEntryKey key) throws ObTableEntryRefreshException { try (PreparedStatement ps = connection.prepareStatement(OB_TENANT_EXIST_SQL)) { ps.setString(1, key.getTenantName()); try (ResultSet rs = ps.executeQuery()) { if (!rs.next()) { throw new ObTableEntryRefreshException("fail to get tenant id from remote"); + } else { + return rs.getInt("tenant_id"); } } catch (Exception e) { throw new ObTableEntryRefreshException("fail to get tenant id from remote", e); @@ -508,11 +663,12 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn PreparedStatement ps = null; ResultSet rs = null; TableEntry tableEntry; + int tenantId = -1; try { if (ObGlobal.obVsnMajor() == 0) { getObVersionFromRemote(connection); } - checkTenantExistFromRemote(connection, key); + tenantId = checkTenantExistFromRemote(connection, key); if (ObGlobal.obVsnMajor() >= 4) { if (key.getTableName().equals(Constants.ALL_DUMMY_TABLE)) { ps = connection.prepareStatement(PROXY_DUMMY_LOCATION_SQL_V4); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java index 1dfd2a5c..08e70536 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java @@ -463,6 +463,13 @@ public void setScanRangeColumns(String... scanRangeColumns) { } } + /* + * check table name whether group name + */ + public boolean isTableGroupName(String tabName) { + return !tabName.contains("$"); + } + public void setScanRangeColumns(List scanRangeColumns) { this.scanRangeColumns = scanRangeColumns; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java index 4e97e947..e07c8022 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java @@ -24,9 +24,12 @@ import com.alipay.oceanbase.rpc.location.model.ObServerRoute; import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult; +import com.alipay.oceanbase.rpc.exception.ObTableNotExistException; import com.alipay.oceanbase.rpc.table.ObTable; import com.alipay.oceanbase.rpc.table.ObTableParam; import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory; @@ -50,6 +53,8 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, long startExecute = System.currentTimeMillis(); Set failedServerList = null; ObServerRoute route = null; + ObTableParam tableParam = null; + boolean isNeedRefreshTableGroup = false; while (true) { client.checkStatus(); long currentExecute = System.currentTimeMillis(); @@ -76,10 +81,16 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, if (failedServerList != null) { route.setBlackList(failedServerList); } - subObTable = client - .getTable(indexTableName, partIdWithIndex.getLeft(), - needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), - route).getRight().getObTable(); + tableParam = client.getTable(indexTableName, partIdWithIndex.getLeft(), + needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), + route).getRight(); + subObTable = tableParam.getObTable(); + if (client.isTableGroup() && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() + && isNeedRefreshTableGroup) { // table Group retry has change + ((ObTableQueryRequest) request).setTableName(indexTableName); + ((ObTableQueryRequest) request).setPartitionId(tableParam.getPartitionId()); + ((ObTableQueryRequest) request).setTableId(tableParam.getTableId()); + } } } result = subObTable.execute(request); @@ -125,24 +136,40 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, indexTableName, partIdWithIndex.getLeft(), e.getMessage(), e); throw e; } - } else if (e instanceof ObTableException - && ((ObTableException) e).isNeedRefreshTableEntry()) { - needRefreshTableEntry = true; - logger - .warn( - "tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}", - indexTableName, partIdWithIndex.getLeft(), - ((ObTableException) e).getErrorCode(), e); - if (client.isRetryOnChangeMasterTimes() - && (tryTimes - 1) < client.getRuntimeRetryTimes()) { + } else if (e instanceof ObTableException) { + if (((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() && client.isTableGroup()) { + if (client.getTableGroupInverted().get(indexTableName) == null) { + throw new ObTableNotExistException("tableGroup not exists, TableName = " + indexTableName); + } else { + indexTableName = client.tryGetTableNameFromTableGroupCache(client.getTableGroupInverted().get(indexTableName), true); + tableName = indexTableName; + isNeedRefreshTableGroup = true; + } + + } + if (((ObTableException) e).isNeedRefreshTableEntry() || isNeedRefreshTableGroup) { + if (isNeedRefreshTableGroup) { + needRefreshTableEntry = false; + } else { + needRefreshTableEntry = true; + } logger - .warn( - "tablename:{} partition id:{} stream query retry while meet Exception needing refresh, errorCode: {} , retry times {}", - indexTableName, partIdWithIndex.getLeft(), - ((ObTableException) e).getErrorCode(), tryTimes, e); - } else { - client.calculateContinuousFailure(indexTableName, e.getMessage()); - throw e; + .warn( + "tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}", + indexTableName, partIdWithIndex.getLeft(), + ((ObTableException) e).getErrorCode(), e); + if (client.isRetryOnChangeMasterTimes() + && (tryTimes - 1) < client.getRuntimeRetryTimes()) { + logger + .warn( + "tablename:{} partition id:{} stream query retry while meet Exception needing refresh, errorCode: {} , retry times {}", + indexTableName, partIdWithIndex.getLeft(), + ((ObTableException) e).getErrorCode(), tryTimes, e); + } else { + client.calculateContinuousFailure(indexTableName, e.getMessage()); + throw e; + } } } else { client.calculateContinuousFailure(indexTableName, e.getMessage()); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index b3b4e7a5..93ad1073 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -37,7 +37,7 @@ public class ObTableClientQueryImpl extends AbstractTableQueryImpl { - private final String tableName; + private String tableName; private final ObTableClient obTableClient; private Row rowKey; // only used by BatchOperation @@ -137,6 +137,10 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception { tableQuery.addKeyRange(ObNewRange.getWholeRange()); } if (obTableClient.isOdpMode()) { + // TableGroup -> TableName + if (tableQuery.isHbaseQuery() && tableQuery.isTableGroupName(tableName)) { + throw new ObTableException("hbase in odp not support table group route"); + } if (tableQuery.getScanRangeColumns().isEmpty()) { if (tableQuery.getIndexName() != null && !tableQuery.getIndexName().equalsIgnoreCase("primary")) { @@ -147,6 +151,12 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception { obTableClient.getOdpTable()))); } else { String indexName = tableQuery.getIndexName(); + + // TableGroup -> TableName + if (tableQuery.isHbaseQuery() && tableQuery.isTableGroupName(tableName)) { + tableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, false); + } + if (!this.obTableClient.isOdpMode()) { indexTableName = obTableClient.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns()); @@ -236,6 +246,10 @@ public String getTableName() { return tableName; } + public void setTableName(String tabName) { + this.tableName = tabName; + } + /* * Get row key (used by BatchOperation) */ diff --git a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableOperationRequest.java b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableOperationRequest.java index 5b9e8420..d4daa46d 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableOperationRequest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableOperationRequest.java @@ -162,4 +162,67 @@ public ObTableAbstractOperationRequest obTableOperationRequest() { } + public ObTableAbstractOperationRequest obTableGroupOperationRequest() { + ObTableAbstractOperationRequest request = null; + if (timestamp == -1) { + timestamp = System.currentTimeMillis(); + } + ObTableQuery obTableQuery = null; + ObHTableFilter filter = null; + ObNewRange obNewRange = null; + switch (operationType) { + case GET: + request = new ObTableQueryRequest(); + obTableQuery = new ObTableQuery(); + obTableQuery.setIndexName("PRIMARY"); + filter = new ObHTableFilter(); + obTableQuery.sethTableFilter(filter); + + obNewRange = new ObNewRange(); + obNewRange + .setStartKey(ObRowKey.getInstance(rowKey, ObObj.getMin(), ObObj.getMin())); + obNewRange.setEndKey(ObRowKey.getInstance(rowKey, ObObj.getMax(), ObObj.getMax())); + obTableQuery.addKeyRange(obNewRange); + obTableQuery.addSelectColumn("K"); + obTableQuery.addSelectColumn("Q"); + obTableQuery.addSelectColumn("T"); + obTableQuery.addSelectColumn("V"); + + request.setTableName(getTableName()); + ((ObTableQueryRequest) request).setTableQuery(obTableQuery); + ((ObTableQueryRequest) request).setPartitionId(Constants.INVALID_TABLET_ID); + + break; + case SCAN: + request = new ObTableQueryRequest(); + obTableQuery = new ObTableQuery(); + obTableQuery.setIndexName("PRIMARY"); + filter = new ObHTableFilter(); + obTableQuery.sethTableFilter(filter); + + obNewRange = new ObNewRange(); + obNewRange + .setStartKey(ObRowKey.getInstance(rowKey, ObObj.getMin(), ObObj.getMin())); + obNewRange.setEndKey(ObRowKey.getInstance(rowKey, ObObj.getMax(), ObObj.getMax())); + obTableQuery.addKeyRange(obNewRange); + obTableQuery.addSelectColumn("K"); + obTableQuery.addSelectColumn("Q"); + obTableQuery.addSelectColumn("T"); + obTableQuery.addSelectColumn("V"); + + request.setTableName(getTableName()); + ((ObTableQueryRequest) request).setTableQuery(obTableQuery); + ((ObTableQueryRequest) request).setPartitionId(Constants.INVALID_TABLET_ID); + + break; + default: + throw new RuntimeException("operationType invalid: " + operationType); + } + + request.setEntityType(ObTableEntityType.HKV); + + return request; + + } + } diff --git a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java index c1e07d85..f866854d 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java @@ -20,6 +20,8 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest; +import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; import com.alipay.oceanbase.rpc.table.ObTable; import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; import org.junit.Before; @@ -30,6 +32,7 @@ public class ObHTableTest { private ObTable client; + private ObTableClient obTableClient; @BeforeClass static public void beforeTest() throws Exception { @@ -48,9 +51,9 @@ public void setup() throws Exception { throw new ObTableException("ODP Mode does not support this test"); } else { client = obTableClient - .getTable("test_varchar_table", new Object[] { "abc" }, true, true).getRight() - .getObTable(); - } + .getTable("test_varchar_table", new Object[] { "abc" }, true, true).getRight() + .getObTable(); + } } @Test @@ -90,4 +93,58 @@ PRIMARY KEY (`K`, `Q`, `T`) "qualifierName1".getBytes(), 12323121L }); } + public byte[] getFamilyFromTable(byte[] tableName) { + int index = 0; + for (int i = 0; i < tableName.length; i++) { + if (tableName[i] == '$') { + index = i; + break; + } + } + int len = tableName.length - index - 1; + // Family and column, return array size 2 + final byte [] result = new byte [len]; + System.arraycopy(tableName, index + 1 /* Skip delimiter */, result, 0, len); + return result; + } + + @Test + public void hbaseTableGroupTest() throws Exception { + /* + CREATE TABLEGROUP test SHARDING = 'ADAPTIVE'; + CREATE TABLE `test$family1` ( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) + ) TABLEGROUP = test + PARTITION BY KEY(`K`) PARTITIONS 3; + */ + byte[] family = new byte[]{'n','u','l','l'}; + ObHTableOperationRequest hTableOperationRequestGet = new ObHTableOperationRequest(); + hTableOperationRequestGet.setOperationType(ObTableOperationType.GET); + hTableOperationRequestGet.setTableName("test"); + hTableOperationRequestGet.setRowKey("putKey".getBytes()); + + ObTableQueryRequest requestGet = (ObTableQueryRequest)hTableOperationRequestGet.obTableGroupOperationRequest(); + ObTableClientQueryStreamResult clientQueryStreamResultGet = (ObTableClientQueryStreamResult) obTableClient + .execute(requestGet); + + family = getFamilyFromTable(clientQueryStreamResultGet.getTableName().getBytes()); // change to family + // Thread.currentThread().sleep(30000); + // second test + ObHTableOperationRequest hTableOperationRequestScan = new ObHTableOperationRequest(); + hTableOperationRequestScan.setOperationType(ObTableOperationType.GET); + hTableOperationRequestScan.setTableName("test"); + hTableOperationRequestScan.setRowKey("putKey".getBytes()); + + ObTableQueryRequest requestScan = (ObTableQueryRequest)hTableOperationRequestScan.obTableGroupOperationRequest(); + ObTableClientQueryStreamResult clientQueryStreamResultScan = (ObTableClientQueryStreamResult) obTableClient + .execute(requestScan); + + family = getFamilyFromTable(clientQueryStreamResultScan.getTableName().getBytes()); // change to family + + } + }