From 052e0de35d0e47e83c2f0046ef9472d27a6e9e81 Mon Sep 17 00:00:00 2001 From: foronedream Date: Mon, 18 Mar 2024 21:31:43 +0800 Subject: [PATCH] review commit edit --- .../alipay/oceanbase/rpc/ObTableClient.java | 71 +++++++----- .../oceanbase/rpc/location/LocationUtil.java | 101 +++++------------- .../ObTableClientQueryStreamResult.java | 40 +++---- .../oceanbase/rpc/hbase/ObHTableTest.java | 79 +++++++++++--- 4 files changed, 143 insertions(+), 148 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 41782e52..16216d53 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -1291,45 +1291,46 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo /** * 根据 tableGroup 获取其中一个tableName - * @param realTableName - * @param tableName + * physicalTableName Complete table from table group + * @param physicalTableName + * @param tableGroupName * @return * @throws Exception */ - private String refreshTableName(String realTableName, String tableName) throws Exception { + private String refreshTableNameByTableGroup(String physicalTableName, String tableGroupName) throws Exception { TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, - tableName); - String oldTableName = realTableName; + tableGroupName); + String oldTableName = physicalTableName; try { - realTableName = loadTableNameWithGroup(serverRoster, // + physicalTableName = loadTableNameWithGroupName(serverRoster, // tableEntryKey,// tableEntryAcquireConnectTimeout,// tableEntryAcquireSocketTimeout,// serverAddressPriorityTimeout,// serverAddressCachingTimeout, sysUA); } catch (ObTableNotExistException e) { - RUNTIME.error("refreshTableName from tableGroup meet exception", e); + RUNTIME.error("refreshTableNameByTableGroup from tableGroup meet exception", e); throw e; } catch (ObTableServerCacheExpiredException e) { RUNTIME.error("refreshTableEntry from tableGroup meet exception", e); throw e; } catch (Exception e) { - RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, realTableName, e); + RUNTIME.error("refreshTableEntry from tableGroup meet exception", tableEntryKey, physicalTableName, e); throw new ObTableNotExistException(String.format( "failed to get table name key=%s original tableName=%s ", tableEntryKey, - realTableName), e); + physicalTableName), e); } if (!TableGroupInverted.isEmpty() && TableGroupInverted.containsKey(oldTableName)) { - TableGroupInverted.remove(oldTableName, tableName); + TableGroupInverted.remove(oldTableName, tableGroupName); } - TableGroupCache.put(tableName, realTableName); - TableGroupInverted.put(realTableName, tableName); + TableGroupCache.put(tableGroupName, physicalTableName); + TableGroupInverted.put(physicalTableName, tableGroupName); if (logger.isInfoEnabled()) { logger.info( "get table name from tableGroup, dataSource: {}, tableName: {}, refresh: {} key:{} realTableName:{} ", - dataSourceName, tableName, true, tableEntryKey, realTableName); + dataSourceName, tableGroupName, true, tableEntryKey, physicalTableName); } - return realTableName; + return physicalTableName; } /** @@ -1728,21 +1729,21 @@ public List> getTables(String tableName, Object[] sta /** * get table name with table group - * @param tableName + * @param tableGroupName * @param refresh * @return * @throws Exception */ - public String tryGetTableNameFromTableGroupCache(final String tableName, final boolean refresh) throws Exception { - String realTableName = TableGroupCache.get(tableName); // tableGroup -> Table + public String tryGetTableNameFromTableGroupCache(final String tableGroupName, final boolean refresh) throws Exception { + String physicalTableName = TableGroupCache.get(tableGroupName); // tableGroup -> Table // get tableName from cache - if (realTableName != null && !realTableName.isEmpty() && !refresh) { - return realTableName; + if (physicalTableName != null && !refresh) { + return physicalTableName; } // not find in cache, should get tableName from observer Lock tempLock = new ReentrantLock(); - Lock lock = TableGroupCacheLocks.putIfAbsent(tableName, tempLock); + Lock lock = TableGroupCacheLocks.putIfAbsent(tableGroupName, tempLock); lock = (lock == null) ? tempLock : lock; // check the first lock // attempt lock the refreshing action, avoiding concurrent refreshing @@ -1751,29 +1752,29 @@ public String tryGetTableNameFromTableGroupCache(final String tableName, final b if (!acquired) { String errMsg = "try to lock tableGroup inflect timeout " + "dataSource:" - + dataSourceName + " ,tableName:" + tableName + + dataSourceName + " ,tableName:" + tableGroupName + " , timeout:" + metadataRefreshLockTimeout + "."; RUNTIME.error(errMsg); throw new ObTableEntryRefreshException(errMsg); } try { - String newRealTableName = TableGroupCache.get(tableName); - if (((realTableName == null || realTableName.isEmpty()) && (newRealTableName == null || newRealTableName.isEmpty())) - || (refresh && newRealTableName.equalsIgnoreCase(realTableName))) { + String newPhyTableName = TableGroupCache.get(tableGroupName); + if (((physicalTableName == null) && (newPhyTableName == null)) + || (refresh && newPhyTableName.equalsIgnoreCase(physicalTableName))) { if (logger.isInfoEnabled()) { - if (realTableName != null && !realTableName.isEmpty()) { + if (physicalTableName != null) { logger.info( "realTableName need refresh, create new table entry, tablename: {}", - tableName); + tableGroupName); } else { logger.info("realTableName not exist, create new table entry, tablename: {}", - tableName); + tableGroupName); } } try { - return refreshTableName(realTableName, tableName); + return refreshTableNameByTableGroup(physicalTableName, tableGroupName); } catch (ObTableNotExistException e) { RUNTIME.error("getOrRefreshTableName from TableGroup meet exception", e); throw e; @@ -1793,7 +1794,7 @@ public String tryGetTableNameFromTableGroupCache(final String tableName, final b logger.info("refresh table Name from TableGroup failure"); } } - return newRealTableName; + return newPhyTableName; } finally { lock.unlock(); } @@ -3226,6 +3227,18 @@ public ConcurrentHashMap getTableGroupCache() { return TableGroupCache; } + /** + * get table route fail than clear table group message + * @param phyTableName + * @param tableGroupName + */ + public void eraseTableGroupFromCache(String tableGroupName) { + // clear table group cache + TableGroupInverted.remove(TableGroupCache.get(tableGroupName)); + TableGroupCache.remove(tableGroupName); + TableGroupCacheLocks.remove(tableGroupName); + } + /* * check table name whether group name */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index a4fc3088..e3444559 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -441,35 +441,36 @@ TableEntry execute(Connection connection) }); } - private abstract static class GetTableNameWithPriorityCallback { - abstract T execute(ObServerAddr obServerAddr) throws ObTableNotExistException; - } - - private abstract static class GetTableNameCallback { - abstract T execute(Connection connection) throws ObTableNotExistException; - } - /* - * call TableName with group + * load Table Name With table Group */ - private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntryKey key, - long connectTimeout, long socketTimeout, - ObUserAuth sysUA, boolean initialized, - GetTableNameCallback callback) + public static String loadTableNameWithGroupName(final ServerRoster serverRoster, + final TableEntryKey key, + final long connectTimeout, + final long socketTimeout, + final long priorityTimeout, + final long cachingTimeout, + final ObUserAuth sysUA) throws ObTableNotExistException { - String url = formatObServerUrl(obServerAddr, connectTimeout, socketTimeout); Connection connection = null; - String realTableName; + String realTableName = ""; + String url = ""; + ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout); try { + url = formatObServerUrl(addr, connectTimeout, socketTimeout); connection = getMetaRefreshConnection(url, sysUA); - realTableName = callback.execute(connection); + realTableName = getTableNameByGroupNameFromRemote(connection, key); + serverRoster.resetPriority(addr); } catch (ObTableNotExistException e) { - // avoid to refresh meta for ObTableNotExistException - RUNTIME.error("callTableName meet exception", e); + RUNTIME.error("callTableEntryNameWithPriority meet exception", e); + serverRoster.downgradePriority(addr); throw e; } catch (Exception e) { throw new ObTableNotExistException(format( "fail to get table name from remote url=%s, key=%s", url, key), e); + } catch (Throwable t) { + RUNTIME.error("callTableEntryNameWithPriority meet exception", t); + throw t; } finally { try { if (null != connection) { @@ -479,70 +480,18 @@ private static String callTableNameRefresh(ObServerAddr obServerAddr, TableEntry // ignore } } - if (realTableName != null && !realTableName.isEmpty()) { return realTableName; } else { - throw new ObTableNotExistException("table name is invalid, addr = " + obServerAddr - + " key =" + key + " tableName =" + realTableName); + throw new ObTableNotExistException("table name is invalid, addr = " + addr + " key =" + key + " tableName =" + realTableName); } - - } - - /* - * call table name with group - */ - private static String callTableNameWithGroup(ServerRoster serverRoster, - long priorityTimeout, - long cachingTimeout, - GetTableNameWithPriorityCallback callable) - throws ObTableNotExistException { - ObServerAddr addr = serverRoster.getServer(priorityTimeout, cachingTimeout); - try { - String realTableName = callable.execute(addr); - serverRoster.resetPriority(addr); - return realTableName; - } catch (ObTableNotExistException e) { - RUNTIME.error("callTableEntryNameWithPriority meet exception", e); - serverRoster.downgradePriority(addr); - throw e; - } catch (Throwable t) { - RUNTIME.error("callTableEntryNameWithPriority meet exception", t); - throw t; - } - } - - /* - * load Table Name With table Group - */ - public static String loadTableNameWithGroup(final ServerRoster serverRoster, - final TableEntryKey key, - final long connectTimeout, - final long socketTimeout, - final long priorityTimeout, - final long cachingTimeout, - final ObUserAuth sysUA) - throws ObTableNotExistException { - return callTableNameWithGroup(serverRoster, priorityTimeout, cachingTimeout, - new GetTableNameWithPriorityCallback() { - @Override - String execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException { - return callTableNameRefresh(obServerAddr, key, connectTimeout, socketTimeout, - sysUA, true, new GetTableNameCallback() { - @Override - String execute(Connection connection) - throws ObTableEntryRefreshException { - return getTableNameFromRemote(connection, key); - } - }); - } - }); + } /* * get TableName From Remote with Group */ - private static String getTableNameFromRemote(Connection connection, TableEntryKey key) + private static String getTableNameByGroupNameFromRemote(Connection connection, TableEntryKey key) throws ObTableNotExistException { PreparedStatement ps = null; ResultSet rs = null; @@ -565,14 +514,12 @@ private static String getTableNameFromRemote(Connection connection, TableEntryKe while (rs.next()) { realTableName = rs.getString("table_name"); } - - } catch (ObTableNotExistException e) { // avoid to refresh meta for ObTableNotExistException - RUNTIME.error("getTableNameFromRemote meet exception", e); + RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e); throw e; } catch (Exception e) { - RUNTIME.error("getTableNameFromRemote meet exception", e); + RUNTIME.error("getTableNameByGroupNameFromRemote meet exception", e); throw new ObTableNotExistException(format( "fail to get table name from remote, key=%s", key), e); } finally { diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java index 11dc2000..5214b3b8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java @@ -52,8 +52,6 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, long startExecute = System.currentTimeMillis(); Set failedServerList = null; ObServerRoute route = null; - ObTableParam tableParam = null; - boolean isNeedRefreshTableGroupName = false; while (true) { client.checkStatus(); long currentExecute = System.currentTimeMillis(); @@ -80,19 +78,10 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, if (failedServerList != null) { route.setBlackList(failedServerList); } - if (isNeedRefreshTableGroupName) { - // if input is table group, retry should replace table name - indexTableName = client.tryGetTableNameFromTableGroupCache(client.getTableGroupInverted().get(indexTableName), true); - tableName = indexTableName; - } - tableParam = client.getTable(indexTableName, partIdWithIndex.getLeft(), - needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), - route).getRight(); - subObTable = tableParam.getObTable(); - if (isNeedRefreshTableGroupName) { - ((ObTableQueryRequest) request).setPartitionId(tableParam.getPartitionId()); - ((ObTableQueryRequest) request).setTableId(tableParam.getTableId()); - } + subObTable = client + .getTable(indexTableName, partIdWithIndex.getLeft(), + needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), + route).getRight().getObTable(); } } result = subObTable.execute(request); @@ -141,19 +130,13 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, } else if (e instanceof ObTableException) { if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) - && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()) { - if (client.getTableGroupInverted().get(indexTableName) == null) { - // not use table group - isNeedRefreshTableGroupName = false; - } else { - isNeedRefreshTableGroupName = true; - } - + && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() + && client.getTableGroupInverted().get(indexTableName) != null) { + // table not exists && hbase mode && table group exists , three condition both + client.eraseTableGroupFromCache(tableName); } - if (((ObTableException) e).isNeedRefreshTableEntry() || isNeedRefreshTableGroupName) { - if (!isNeedRefreshTableGroupName) { - needRefreshTableEntry = true; - } + if (((ObTableException) e).isNeedRefreshTableEntry()) { + needRefreshTableEntry = true; logger .warn( "tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}", @@ -170,6 +153,9 @@ protected ObTableQueryResult execute(ObPair partIdWithIndex, client.calculateContinuousFailure(indexTableName, e.getMessage()); throw e; } + } else { + client.calculateContinuousFailure(indexTableName, e.getMessage()); + throw e; } } else { client.calculateContinuousFailure(indexTableName, e.getMessage()); diff --git a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java index 16b4d03a..862b6049 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/hbase/ObHTableTest.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest; import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; @@ -28,6 +29,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.util.List; + import static org.junit.Assert.assertFalse; public class ObHTableTest { @@ -38,6 +41,7 @@ public class ObHTableTest { static public void beforeTest() throws Exception { ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); obTableClient.init(); + obTableClient.setRunningMode(ObTableClient.RunningMode.HBASE); assertFalse(obTableClient.isOdpMode()); } @@ -45,14 +49,14 @@ static public void beforeTest() throws Exception { @Before public void setup() throws Exception { ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); - obTableClient.init(); + obTableClient.init(); if (obTableClient.isOdpMode()) { obTableClient.close(); throw new ObTableException("ODP Mode does not support this test"); } else { client = obTableClient - .getTable("test_varchar_table", new Object[] { "abc" }, true, true).getRight() - .getObTable(); + .getTable("test_varchar_table", new Object[] { "abc" }, true, true).getRight() + .getObTable(); this.obTableClient = obTableClient; } } @@ -94,12 +98,61 @@ PRIMARY KEY (`K`, `Q`, `T`) "qualifierName1".getBytes(), 12323121L }); } - public byte[] getFamilyFromTableName(String str) { - int index = str.indexOf('$'); - if (index != -1) { - return str.substring(index + 1).getBytes(); + public byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Exception{ + int total_length = qualifier.length; + int familyLen = -1; + byte[][] familyAndQualifier = new byte[2][]; + + for (int i = 0; i < total_length; i ++) { + if (qualifier[i] == '\0') { + familyLen = i; + break; + } + } + + byte[] family = new byte[familyLen]; + if (familyLen != -1) { + for (int i = 0; i < familyLen; i ++) { + family[i] = qualifier[i]; + } + } else { + throw new RuntimeException("can not get family name"); + } + familyAndQualifier[0] = family; + int qualifierLen = total_length - familyLen - 1; + byte[] newQualifier = new byte[qualifierLen]; + if (qualifierLen > 0) { + for (int i = 0; i < qualifierLen; i ++) { + newQualifier[i] = qualifier[i + familyLen + 1]; + } + } else { + throw new RuntimeException("can not get qualifier name"); + } + for (int i = 0; i < qualifierLen; i ++) { + newQualifier[i] = qualifier[i + familyLen + 1]; + } + familyAndQualifier[1] = newQualifier; + System.out.println(newQualifier); + System.out.println(family); + return familyAndQualifier; + } + + public void getKeyValueFromResult(ObTableClientQueryStreamResult clientQueryStreamResult, boolean isTableGroup, byte[] family) throws Exception { + for (List row : clientQueryStreamResult.getCacheRows()) { + System.out.println(new String((byte[]) row.get(0).getValue())); //K + // System.out.println(family); //family + if (isTableGroup) { + byte[][] familyAndQualifier = extractFamilyFromQualifier((byte[]) row.get(1).getValue()); + System.out.println(new String(familyAndQualifier[0])); //family + System.out.println(new String(familyAndQualifier[1])); //qualifier + } else { + System.out.println(family); //family + System.out.println(new String((byte[]) row.get(1).getValue())); //qualifier + } + + System.out.println((Long) row.get(2).getValue());//T + System.out.println(new String((byte[]) row.get(3).getValue()));//V } - return null; } @Test @@ -112,8 +165,7 @@ public void hbaseTableGroupTest() throws Exception { `T` bigint(20) NOT NULL, `V` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`K`, `Q`, `T`) - ) TABLEGROUP = test - PARTITION BY KEY(`K`) PARTITIONS 3; + ) TABLEGROUP = test; */ byte[] family = new byte[]{}; ObHTableOperationRequest hTableOperationRequestGet = new ObHTableOperationRequest(); @@ -125,11 +177,10 @@ PRIMARY KEY (`K`, `Q`, `T`) ObTableClientQueryStreamResult clientQueryStreamResultGet = (ObTableClientQueryStreamResult) obTableClient .execute(requestGet); - family = getFamilyFromTableName(clientQueryStreamResultGet.getTableName()); // change to family // Thread.currentThread().sleep(30000); // second test ObHTableOperationRequest hTableOperationRequestScan = new ObHTableOperationRequest(); - hTableOperationRequestScan.setOperationType(ObTableOperationType.GET); + hTableOperationRequestScan.setOperationType(ObTableOperationType.SCAN); hTableOperationRequestScan.setTableName("test"); hTableOperationRequestScan.setRowKey("putKey".getBytes()); @@ -137,8 +188,6 @@ PRIMARY KEY (`K`, `Q`, `T`) ObTableClientQueryStreamResult clientQueryStreamResultScan = (ObTableClientQueryStreamResult) obTableClient .execute(requestScan); - family = getFamilyFromTableName(clientQueryStreamResultScan.getTableName()); // change to family - - } + } }