Skip to content

Commit

Permalink
[Chore] add error code for obkv filter parse error
Browse files Browse the repository at this point in the history
  • Loading branch information
shenyunlong committed Feb 20, 2024
1 parent 33d1dd0 commit 693f61d
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,44 @@
import com.alipay.oceanbase.rpc.table.api.TableQuery;

public class ObClusterTableAsyncQuery extends AbstractTableQuery {
private final ObTableClientQueryAsyncImpl tableClientQuerySync;
private final ObTableClientQueryAsyncImpl tableClientQueryAsync;

ObClusterTableAsyncQuery(ObTableClientQueryAsyncImpl tableClientQuerySync) {
this.tableClientQuerySync = tableClientQuerySync;
ObClusterTableAsyncQuery(ObTableClientQueryAsyncImpl tableClientQueryAsync) {
this.tableClientQueryAsync = tableClientQueryAsync;
}

@Override
public ObTableQuery getObTableQuery() {
return tableClientQuerySync.getObTableQuery();
return tableClientQueryAsync.getObTableQuery();
}

@Override
public String getTableName() {
return tableClientQuerySync.getTableName();
return tableClientQueryAsync.getTableName();
}

@Override
public QueryResultSet execute() throws Exception {
return tableClientQuerySync.execute();
return tableClientQueryAsync.execute();
}

@Override
public QueryResultSet executeInit(ObPair<Long, ObTableParam> entry) throws Exception {
return tableClientQuerySync.executeInit(entry);
return tableClientQueryAsync.executeInit(entry);
}

@Override
public QueryResultSet executeNext(ObPair<Long, ObTableParam> entry) throws Exception {
return tableClientQuerySync.executeNext(entry);
return tableClientQueryAsync.executeNext(entry);
}

ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType type) throws Exception {
return tableClientQuerySync.executeInternal(type);
return tableClientQueryAsync.executeInternal(type);
}

@Override
public TableQuery select(String... columns) {
tableClientQuerySync.select(columns);
tableClientQueryAsync.select(columns);
return this;
}

Expand All @@ -79,69 +79,69 @@ public TableQuery setKeys(String... keys) {

@Override
public TableQuery limit(int offset, int limit) {
tableClientQuerySync.limit(offset, limit);
tableClientQueryAsync.limit(offset, limit);
return this;
}

@Override
public TableQuery addScanRange(Object[] start, boolean startEquals, Object[] end,
boolean endEquals) {
tableClientQuerySync.addScanRange(start, startEquals, end, endEquals);
tableClientQueryAsync.addScanRange(start, startEquals, end, endEquals);
return this;
}

@Override
public TableQuery addScanRangeStartsWith(Object[] start, boolean startEquals) {
tableClientQuerySync.addScanRangeStartsWith(start, startEquals);
tableClientQueryAsync.addScanRangeStartsWith(start, startEquals);
return this;
}

@Override
public TableQuery addScanRangeEndsWith(Object[] end, boolean endEquals) {
tableClientQuerySync.addScanRangeStartsWith(end, endEquals);
tableClientQueryAsync.addScanRangeStartsWith(end, endEquals);
return this;
}

@Override
public TableQuery scanOrder(boolean forward) {
tableClientQuerySync.scanOrder(forward);
tableClientQueryAsync.scanOrder(forward);
return this;
}

@Override
public TableQuery indexName(String indexName) {
tableClientQuerySync.indexName(indexName);
tableClientQueryAsync.indexName(indexName);
return this;
}

@Override
public TableQuery filterString(String filterString) {
tableClientQuerySync.filterString(filterString);
tableClientQueryAsync.filterString(filterString);
return this;
}

@Override
public TableQuery setHTableFilter(ObHTableFilter obHTableFilter) {
return tableClientQuerySync.setHTableFilter(obHTableFilter);
return tableClientQueryAsync.setHTableFilter(obHTableFilter);
}

@Override
public TableQuery setBatchSize(int batchSize) {
return tableClientQuerySync.setBatchSize(batchSize);
return tableClientQueryAsync.setBatchSize(batchSize);
}

@Override
public TableQuery setMaxResultSize(long maxResultSize) {
return tableClientQuerySync.setMaxResultSize(maxResultSize);
return tableClientQueryAsync.setMaxResultSize(maxResultSize);
}

@Override
public void clear() {
tableClientQuerySync.clear();
tableClientQueryAsync.clear();
}

public void setEntityType(ObTableEntityType entityType) {
super.setEntityType(entityType);
tableClientQuerySync.setEntityType(entityType);
tableClientQueryAsync.setEntityType(entityType);
}
}
10 changes: 5 additions & 5 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1691,8 +1691,8 @@ public TableQuery query(String tableName) {

@Override
public TableQuery queryByBatchV2(String tableName) {
ObTableClientQueryAsyncImpl querySync = new ObTableClientQueryAsyncImpl(tableName, this);
return new ObClusterTableAsyncQuery(querySync);
ObTableClientQueryAsyncImpl queryAsync = new ObTableClientQueryAsyncImpl(tableName, this);
return new ObClusterTableAsyncQuery(queryAsync);
}

@Override
Expand Down Expand Up @@ -2534,11 +2534,11 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
tableQuery.setEntityType(request.getEntityType());
return new ObClusterTableQuery(tableQuery).executeInternal();
} else if (request instanceof ObTableQueryAsyncRequest) {
ObTableClientQueryAsyncImpl tableClientQuerySync = new ObTableClientQueryAsyncImpl(
ObTableClientQueryAsyncImpl tableClientQueryAsync = new ObTableClientQueryAsyncImpl(
request.getTableName(), ((ObTableQueryAsyncRequest) request)
.getObTableQueryRequest().getTableQuery(), this);
tableClientQuerySync.setEntityType(request.getEntityType());
return new ObClusterTableAsyncQuery(tableClientQuerySync)
tableClientQueryAsync.setEntityType(request.getEntityType());
return new ObClusterTableAsyncQuery(tableClientQueryAsync)
.executeInternal(((ObTableQueryAsyncRequest) request).getQueryType());
} else if (request instanceof ObTableBatchOperationRequest) {
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ public enum ResultCodes {
OB_KV_ROWKEY_COUNT_NOT_MATCH(-10510), //
OB_KV_COLUMN_TYPE_NOT_MATCH(-10511), //
OB_KV_COLLATION_MISMATCH(-10512), //
OB_KV_SCAN_RANGE_MISSING(-10513);
OB_KV_SCAN_RANGE_MISSING(-10513),
OB_KV_FILTER_PARSE_ERROR(-10514);

public final int errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected ObTableQueryResult checkObTableQueryResult(Object result) {
return (ObTableQueryResult) result;
}

protected ObTableQueryAsyncResult checkObTableQuerySyncResult(Object result) {
protected ObTableQueryAsyncResult checkObTableQueryAsyncResult(Object result) {
if (result == null) {
throw new ObTableException("client get unexpected NULL result");
}
Expand Down Expand Up @@ -278,18 +278,18 @@ protected void cacheStreamNext(ObPair<Long, ObTableParam> partIdWithObTable,
}
}

private void cacheResultRows(ObTableQueryAsyncResult tableQuerySyncResult) {
cacheRows.addAll(tableQuerySyncResult.getAffectedEntity().getPropertiesRows());
cacheProperties = tableQuerySyncResult.getAffectedEntity().getPropertiesNames();
private void cacheResultRows(ObTableQueryAsyncResult tableQueryAsyncResult) {
cacheRows.addAll(tableQueryAsyncResult.getAffectedEntity().getPropertiesRows());
cacheProperties = tableQueryAsyncResult.getAffectedEntity().getPropertiesNames();
}

protected void cacheStreamNext(ObPair<Long, ObTableParam> partIdWithObTable,
ObTableQueryAsyncResult tableQuerySyncResult) {
cacheResultRows(tableQuerySyncResult);
if (tableQuerySyncResult.getAffectedEntity().isStream()
&& tableQuerySyncResult.getAffectedEntity().isStreamNext()) {
ObTableQueryAsyncResult tableQueryAsyncResult) {
cacheResultRows(tableQueryAsyncResult);
if (tableQueryAsyncResult.getAffectedEntity().isStream()
&& tableQueryAsyncResult.getAffectedEntity().isStreamNext()) {
partitionLastResult.addLast(new ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>(
partIdWithObTable, tableQuerySyncResult.getAffectedEntity()));
partIdWithObTable, tableQueryAsyncResult.getAffectedEntity()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
Thread.sleep(client.getRuntimeRetryInterval());
}

cacheStreamNext(partIdWithObTable, checkObTableQuerySyncResult(result));
cacheStreamNext(partIdWithObTable, checkObTableQueryAsyncResult(result));

ObTableQueryAsyncResult obTableQueryAsyncResult = (ObTableQueryAsyncResult) result;
if (obTableQueryAsyncResult.isEnd()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
ObPayload streamRequest) throws Exception {
Object result = partIdWithObTable.getRight().getObTable().execute(streamRequest);//执行query start/ query next等等

cacheStreamNext(partIdWithObTable, checkObTableQuerySyncResult(result));
cacheStreamNext(partIdWithObTable, checkObTableQueryAsyncResult(result));

ObTableQueryAsyncResult obTableQueryAsyncResult = (ObTableQueryAsyncResult) result;
isEnd = obTableQueryAsyncResult.isEnd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType
obTableClientQueryASyncStreamResult.setClient(obTableClient);
obTableClientQueryASyncStreamResult.init(type, sessionId);

QueryResultSet querySyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult);
QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult);
this.hasMore = !obTableClientQueryASyncStreamResult.isEnd();
querySyncResultSet.setHasMore(this.hasMore);
queryAsyncResultSet.setHasMore(this.hasMore);
obTableClientQueryASyncStreamResult.setHasMore(this.hasMore);
querySyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId());
queryAsyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId());
this.sessionId = obTableClientQueryASyncStreamResult.getSessionId();

return obTableClientQueryASyncStreamResult;
Expand All @@ -139,11 +139,11 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType
obTableClientQueryASyncStreamResult.setClient(obTableClient);
obTableClientQueryASyncStreamResult.init(type, entry, sessionId);

QueryResultSet querySyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult);
QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableClientQueryASyncStreamResult);
this.hasMore = !obTableClientQueryASyncStreamResult.isEnd();
querySyncResultSet.setHasMore(this.hasMore);
queryAsyncResultSet.setHasMore(this.hasMore);
obTableClientQueryASyncStreamResult.setHasMore(this.hasMore);
querySyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId());
queryAsyncResultSet.setSessionId(obTableClientQueryASyncStreamResult.getSessionId());
this.sessionId = obTableClientQueryASyncStreamResult.getSessionId();

return obTableClientQueryASyncStreamResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public QueryResultSet execute(ObQueryOperationType type) throws Exception {

obTableQueryAsyncStreamResult.init(type, sessionId);

QueryResultSet querySyncResultSet = new QueryResultSet(obTableQueryAsyncStreamResult);
QueryResultSet queryAsyncResultSet = new QueryResultSet(obTableQueryAsyncStreamResult);
boolean hasMore = !obTableQueryAsyncStreamResult.isEnd();
querySyncResultSet.setHasMore(hasMore);
querySyncResultSet.setSessionId(obTableQueryAsyncStreamResult.getSessionId());
queryAsyncResultSet.setHasMore(hasMore);
queryAsyncResultSet.setSessionId(obTableQueryAsyncStreamResult.getSessionId());
this.sessionId = obTableQueryAsyncStreamResult.getSessionId();
return querySyncResultSet;
return queryAsyncResultSet;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@

import static org.junit.Assert.assertEquals;

public class ObbTableQuerySyncPayloadTest {
public class ObbTableQueryAsyncPayloadTest {
@Test
public void test_ObTableQuerySync() {
public void test_ObTableQueryAsync() {
ObTableQuery obTableQuery = getObTableQuery();
byte[] bytes = obTableQuery.encode();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
Expand All @@ -44,7 +44,7 @@ public void test_ObTableQuerySync() {
}

@Test
public void test_ObTableQuerySyncResult() {
public void test_ObTableQueryAsyncResult() {
ObTableQueryAsyncResult obTableQueryAsyncResult = new ObTableQueryAsyncResult();
ObTableQueryResult obTableQueryResult = new ObTableQueryResult();
obTableQueryResult.setRowCount(0);
Expand Down Expand Up @@ -88,9 +88,9 @@ private ObTableQuery getObTableQuery() {
return obTableQuery;
}

private void checkObTableQuery(ObTableQuery obTableQuerySync, ObTableQuery newObTableQuerySync) {
ObTableQuery obTableQuery = obTableQuerySync;
ObTableQuery obTableQuery1 = newObTableQuerySync;
private void checkObTableQuery(ObTableQuery obTableQueryAsync, ObTableQuery newObTableQueryAsync) {
ObTableQuery obTableQuery = obTableQueryAsync;
ObTableQuery obTableQuery1 = newObTableQueryAsync;
checkObNewRange(obTableQuery.getKeyRanges().get(0), obTableQuery1.getKeyRanges().get(0));
assertEquals(obTableQuery.getSelectColumns().get(0), obTableQuery1.getSelectColumns()
.get(0));
Expand Down

0 comments on commit 693f61d

Please sign in to comment.