diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableAsyncQuery.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableAsyncQuery.java index bd977fc5..f2bd0a62 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableAsyncQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableAsyncQuery.java @@ -31,44 +31,44 @@ import com.alipay.oceanbase.rpc.table.api.TableQuery; public class ObClusterTableAsyncQuery extends AbstractTableQuery { - private final ObTableClientQueryAsyncImpl tableClientQuerySync; + private final ObTableClientQueryAsyncImpl tableClientQueryAsync; - ObClusterTableAsyncQuery(ObTableClientQueryAsyncImpl tableClientQuerySync) { - this.tableClientQuerySync = tableClientQuerySync; + ObClusterTableAsyncQuery(ObTableClientQueryAsyncImpl tableClientQueryAsync) { + this.tableClientQueryAsync = tableClientQueryAsync; } @Override public ObTableQuery getObTableQuery() { - return tableClientQuerySync.getObTableQuery(); + return tableClientQueryAsync.getObTableQuery(); } @Override public String getTableName() { - return tableClientQuerySync.getTableName(); + return tableClientQueryAsync.getTableName(); } @Override public QueryResultSet execute() throws Exception { - return tableClientQuerySync.execute(); + return tableClientQueryAsync.execute(); } @Override public QueryResultSet executeInit(ObPair entry) throws Exception { - return tableClientQuerySync.executeInit(entry); + return tableClientQueryAsync.executeInit(entry); } @Override public QueryResultSet executeNext(ObPair entry) throws Exception { - return tableClientQuerySync.executeNext(entry); + return tableClientQueryAsync.executeNext(entry); } ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType type) throws Exception { - return tableClientQuerySync.executeInternal(type); + return tableClientQueryAsync.executeInternal(type); } @Override public TableQuery select(String... columns) { - tableClientQuerySync.select(columns); + tableClientQueryAsync.select(columns); return this; } @@ -79,69 +79,69 @@ public TableQuery setKeys(String... keys) { @Override public TableQuery limit(int offset, int limit) { - tableClientQuerySync.limit(offset, limit); + tableClientQueryAsync.limit(offset, limit); return this; } @Override public TableQuery addScanRange(Object[] start, boolean startEquals, Object[] end, boolean endEquals) { - tableClientQuerySync.addScanRange(start, startEquals, end, endEquals); + tableClientQueryAsync.addScanRange(start, startEquals, end, endEquals); return this; } @Override public TableQuery addScanRangeStartsWith(Object[] start, boolean startEquals) { - tableClientQuerySync.addScanRangeStartsWith(start, startEquals); + tableClientQueryAsync.addScanRangeStartsWith(start, startEquals); return this; } @Override public TableQuery addScanRangeEndsWith(Object[] end, boolean endEquals) { - tableClientQuerySync.addScanRangeStartsWith(end, endEquals); + tableClientQueryAsync.addScanRangeStartsWith(end, endEquals); return this; } @Override public TableQuery scanOrder(boolean forward) { - tableClientQuerySync.scanOrder(forward); + tableClientQueryAsync.scanOrder(forward); return this; } @Override public TableQuery indexName(String indexName) { - tableClientQuerySync.indexName(indexName); + tableClientQueryAsync.indexName(indexName); return this; } @Override public TableQuery filterString(String filterString) { - tableClientQuerySync.filterString(filterString); + tableClientQueryAsync.filterString(filterString); return this; } @Override public TableQuery setHTableFilter(ObHTableFilter obHTableFilter) { - return tableClientQuerySync.setHTableFilter(obHTableFilter); + return tableClientQueryAsync.setHTableFilter(obHTableFilter); } @Override public TableQuery setBatchSize(int batchSize) { - return tableClientQuerySync.setBatchSize(batchSize); + return tableClientQueryAsync.setBatchSize(batchSize); } @Override public TableQuery setMaxResultSize(long maxResultSize) { - return tableClientQuerySync.setMaxResultSize(maxResultSize); + return tableClientQueryAsync.setMaxResultSize(maxResultSize); } @Override public void clear() { - tableClientQuerySync.clear(); + tableClientQueryAsync.clear(); } public void setEntityType(ObTableEntityType entityType) { super.setEntityType(entityType); - tableClientQuerySync.setEntityType(entityType); + tableClientQueryAsync.setEntityType(entityType); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index fa2f4cf9..2b00cb49 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -1691,8 +1691,8 @@ public TableQuery query(String tableName) { @Override public TableQuery queryByBatchV2(String tableName) { - ObTableClientQueryAsyncImpl querySync = new ObTableClientQueryAsyncImpl(tableName, this); - return new ObClusterTableAsyncQuery(querySync); + ObTableClientQueryAsyncImpl queryAsync = new ObTableClientQueryAsyncImpl(tableName, this); + return new ObClusterTableAsyncQuery(queryAsync); } @Override @@ -2534,11 +2534,11 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E tableQuery.setEntityType(request.getEntityType()); return new ObClusterTableQuery(tableQuery).executeInternal(); } else if (request instanceof ObTableQueryAsyncRequest) { - ObTableClientQueryAsyncImpl tableClientQuerySync = new ObTableClientQueryAsyncImpl( + ObTableClientQueryAsyncImpl tableClientQueryAsync = new ObTableClientQueryAsyncImpl( request.getTableName(), ((ObTableQueryAsyncRequest) request) .getObTableQueryRequest().getTableQuery(), this); - tableClientQuerySync.setEntityType(request.getEntityType()); - return new ObClusterTableAsyncQuery(tableClientQuerySync) + tableClientQueryAsync.setEntityType(request.getEntityType()); + return new ObClusterTableAsyncQuery(tableClientQueryAsync) .executeInternal(((ObTableQueryAsyncRequest) request).getQueryType()); } else if (request instanceof ObTableBatchOperationRequest) { ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl( diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java index 7cb48b8b..ede233ea 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java @@ -727,7 +727,8 @@ public enum ResultCodes { OB_KV_ROWKEY_COUNT_NOT_MATCH(-10510), // OB_KV_COLUMN_TYPE_NOT_MATCH(-10511), // OB_KV_COLLATION_MISMATCH(-10512), // - OB_KV_SCAN_RANGE_MISSING(-10513); + OB_KV_SCAN_RANGE_MISSING(-10513), + OB_KV_FILTER_PARSE_ERROR(-10514); public final int errorCode; diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index acc98122..a0a12dfb 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -174,7 +174,7 @@ protected ObTableQueryResult checkObTableQueryResult(Object result) { return (ObTableQueryResult) result; } - protected ObTableQueryAsyncResult checkObTableQuerySyncResult(Object result) { + protected ObTableQueryAsyncResult checkObTableQueryAsyncResult(Object result) { if (result == null) { throw new ObTableException("client get unexpected NULL result"); } @@ -278,18 +278,18 @@ protected void cacheStreamNext(ObPair partIdWithObTable, } } - private void cacheResultRows(ObTableQueryAsyncResult tableQuerySyncResult) { - cacheRows.addAll(tableQuerySyncResult.getAffectedEntity().getPropertiesRows()); - cacheProperties = tableQuerySyncResult.getAffectedEntity().getPropertiesNames(); + private void cacheResultRows(ObTableQueryAsyncResult tableQueryAsyncResult) { + cacheRows.addAll(tableQueryAsyncResult.getAffectedEntity().getPropertiesRows()); + cacheProperties = tableQueryAsyncResult.getAffectedEntity().getPropertiesNames(); } protected void cacheStreamNext(ObPair partIdWithObTable, - ObTableQueryAsyncResult tableQuerySyncResult) { - cacheResultRows(tableQuerySyncResult); - if (tableQuerySyncResult.getAffectedEntity().isStream() - && tableQuerySyncResult.getAffectedEntity().isStreamNext()) { + ObTableQueryAsyncResult tableQueryAsyncResult) { + cacheResultRows(tableQueryAsyncResult); + if (tableQueryAsyncResult.getAffectedEntity().isStream() + && tableQueryAsyncResult.getAffectedEntity().isStreamNext()) { partitionLastResult.addLast(new ObPair, ObTableQueryResult>( - partIdWithObTable, tableQuerySyncResult.getAffectedEntity())); + partIdWithObTable, tableQueryAsyncResult.getAffectedEntity())); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java index c95eccd5..4f47f5db 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableClientQueryAsyncStreamResult.java @@ -100,7 +100,7 @@ protected ObTableQueryAsyncResult executeAsync(ObPair partId Thread.sleep(client.getRuntimeRetryInterval()); } - cacheStreamNext(partIdWithObTable, checkObTableQuerySyncResult(result)); + cacheStreamNext(partIdWithObTable, checkObTableQueryAsyncResult(result)); ObTableQueryAsyncResult obTableQueryAsyncResult = (ObTableQueryAsyncResult) result; if (obTableQueryAsyncResult.isEnd()) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableQueryAsyncStreamResult.java index 77f206b7..ca0e3b60 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/async/ObTableQueryAsyncStreamResult.java @@ -40,7 +40,7 @@ protected ObTableQueryAsyncResult executeAsync(ObPair partId ObPayload streamRequest) throws Exception { Object result = partIdWithObTable.getRight().getObTable().execute(streamRequest);//执行query start/ query next等等 - cacheStreamNext(partIdWithObTable, checkObTableQuerySyncResult(result)); + cacheStreamNext(partIdWithObTable, checkObTableQueryAsyncResult(result)); ObTableQueryAsyncResult obTableQueryAsyncResult = (ObTableQueryAsyncResult) result; isEnd = obTableQueryAsyncResult.isEnd(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java index 8c9119c4..f7081f35 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java @@ -117,11 +117,11 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType obTableClientQueryASyncStreamResult.setClient(obTableClient); obTableClientQueryASyncStreamResult.init(type, sessionId); - QueryResultSet querySyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult); + QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult); this.hasMore = !obTableClientQueryASyncStreamResult.isEnd(); - querySyncResultSet.setHasMore(this.hasMore); + queryAsyncResultSet.setHasMore(this.hasMore); obTableClientQueryASyncStreamResult.setHasMore(this.hasMore); - querySyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId()); + queryAsyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId()); this.sessionId = obTableClientQueryASyncStreamResult.getSessionId(); return obTableClientQueryASyncStreamResult; @@ -139,11 +139,11 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType obTableClientQueryASyncStreamResult.setClient(obTableClient); obTableClientQueryASyncStreamResult.init(type, entry, sessionId); - QueryResultSet querySyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult); + QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult); this.hasMore = !obTableClientQueryASyncStreamResult.isEnd(); - querySyncResultSet.setHasMore(this.hasMore); + queryAsyncResultSet.setHasMore(this.hasMore); obTableClientQueryASyncStreamResult.setHasMore(this.hasMore); - querySyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId()); + queryAsyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId()); this.sessionId = obTableClientQueryASyncStreamResult.getSessionId(); return obTableClientQueryASyncStreamResult; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableQueryAsyncImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableQueryAsyncImpl.java index cdb973c5..7192fd81 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableQueryAsyncImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableQueryAsyncImpl.java @@ -94,12 +94,12 @@ public QueryResultSet execute(ObQueryOperationType type) throws Exception { obTableQueryAsyncStreamResult.init(type, sessionId); - QueryResultSet querySyncResultSet = new QueryResultSet(obTableQueryAsyncStreamResult); + QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableQueryAsyncStreamResult); boolean hasMore = !obTableQueryAsyncStreamResult.isEnd(); - querySyncResultSet.setHasMore(hasMore); - querySyncResultSet.setSessionId(obTableQueryAsyncStreamResult.getSessionId()); + queryAsyncResultSet.setHasMore(hasMore); + queryAsyncResultSet.setSessionId(obTableQueryAsyncStreamResult.getSessionId()); this.sessionId = obTableQueryAsyncStreamResult.getSessionId(); - return querySyncResultSet; + return queryAsyncResultSet; } @Override diff --git a/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObbTableQuerySyncPayloadTest.java b/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObbTableQuerySyncPayloadTest.java index 5e3968bf..3417d23c 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObbTableQuerySyncPayloadTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObbTableQuerySyncPayloadTest.java @@ -29,9 +29,9 @@ import static org.junit.Assert.assertEquals; -public class ObbTableQuerySyncPayloadTest { +public class ObbTableQueryAsyncPayloadTest { @Test - public void test_ObTableQuerySync() { + public void test_ObTableQueryAsync() { ObTableQuery obTableQuery = getObTableQuery(); byte[] bytes = obTableQuery.encode(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); @@ -44,7 +44,7 @@ public void test_ObTableQuerySync() { } @Test - public void test_ObTableQuerySyncResult() { + public void test_ObTableQueryAsyncResult() { ObTableQueryAsyncResult obTableQueryAsyncResult = new ObTableQueryAsyncResult(); ObTableQueryResult obTableQueryResult = new ObTableQueryResult(); obTableQueryResult.setRowCount(0); @@ -88,9 +88,9 @@ private ObTableQuery getObTableQuery() { return obTableQuery; } - private void checkObTableQuery(ObTableQuery obTableQuerySync, ObTableQuery newObTableQuerySync) { - ObTableQuery obTableQuery = obTableQuerySync; - ObTableQuery obTableQuery1 = newObTableQuerySync; + private void checkObTableQuery(ObTableQuery obTableQueryAsync, ObTableQuery newObTableQueryAsync) { + ObTableQuery obTableQuery = obTableQueryAsync; + ObTableQuery obTableQuery1 = newObTableQueryAsync; checkObNewRange(obTableQuery.getKeyRanges().get(0), obTableQuery1.getKeyRanges().get(0)); assertEquals(obTableQuery.getSelectColumns().get(0), obTableQuery1.getSelectColumns() .get(0));