Skip to content

Commit

Permalink
HBase 1.x compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
HexyinUESTC committed Aug 30, 2024
1 parent f388433 commit 9507c7d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 15 deletions.
79 changes: 65 additions & 14 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand Down Expand Up @@ -385,18 +386,34 @@ public HTableDescriptor getTableDescriptor() {
*/
@Override
public boolean exists(Get get) throws IOException {
get.setCheckExistenceOnly(true);
Result r = get(get);
return !r.isEmpty();
}

@Override
public boolean[] existsAll(List<Get> list) throws IOException {
throw new FeatureNotSupportedException("not supported yet.");
if (list.isEmpty()) {
return new boolean[]{};
}
if (list.size() == 1) {
return new boolean[]{exists(list.get(0))};
}
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
for (int i = 0; i < r.length; ++i){
ret[i] = !r[i].isEmpty();
}
return ret;
}

@Override
public Boolean[] exists(List<Get> gets) throws IOException {
throw new FeatureNotSupportedException("not supported yet'");
Boolean[] result = new Boolean[gets.size()];
boolean[] exists = existsAll(gets);
for (int i = 0; i < gets.size(); ++i) {
result[i] = exists[i];
}
return result;
}

@Override
Expand Down Expand Up @@ -495,6 +512,14 @@ public Result call() throws IOException {

obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
// if (get.isClosestRowBefore()) {
// obTableQuery = buildObTableQuery(filter, null, false,
// get.getRow(), true, 1);
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// } else {
// obTableQuery = buildObTableQuery(filter, get.getRow(), true,
// get.getRow(), true, -1);
// }

request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -558,10 +583,24 @@ public ResultScanner call() throws IOException {
if (scan.getFamilyMap().keySet().isEmpty()) {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString));
// obTableQuery = buildObTableQuery(filter, scan);
//
// request = buildObTableQueryAsyncRequest(obTableQuery,
// getTargetTableName(tableNameString));
// if (scan.isReversed()) {
// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
// scan.getStartRow(), true, scan.getBatch());
// } else {
// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
// scan.getStopRow(), false, scan.getBatch());
// }
// if (scan.isReversed()) { // reverse scan 时设置为逆序
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// }
// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
// request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
Expand All @@ -573,6 +612,21 @@ public ResultScanner call() throws IOException {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), entry.getValue());
obTableQuery = buildObTableQuery(filter, scan);
// if (scan.isReversed()) {
// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
// scan.getStartRow(), true, scan.getBatch());
// } else {
// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
// scan.getStopRow(), false, scan.getBatch());
// }
// if (scan.isReversed()) { // reverse scan 时设置为逆序
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// }
//
// // no support set maxResultSize.
// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -1202,22 +1256,19 @@ public void setOperationTimeout(int operationTimeout) {
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
}

// todo
@Override
public int getOperationTimeout() {
throw new FeatureNotSupportedException("not supported yet.");
return operationTimeout;
}

//todo
// rpcTimeout means server max execute time, equal Table API rpc_execute_time, it must be set before OHTable init; please pass this parameter through conf
@Override
public void setRpcTimeout(int i) {
throw new FeatureNotSupportedException("not supported yet.");
public void setRpcTimeout(int rpcTimeout) {
}

// todo
@Override
public int getRpcTimeout() {
throw new FeatureNotSupportedException("not supported yet.");
return Integer.parseInt(configuration.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alipay.oceanbase.hbase.core.Lifecycle;
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.property.Property;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
Expand Down Expand Up @@ -228,6 +229,28 @@ public int getRpcTimeout() {
return ohTable.getRpcTimeout();
}

@Override
public void setOperationTimeout(int operationTimeout) {
checkStatus();
ohTable.setOperationTimeout(operationTimeout);
}

@Override
public int getOperationTimeout() {
checkStatus();
return ohTable.getOperationTimeout();
}

@Override
public void setRpcTimeout(int rpcTimeout) {
conf.set(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcTimeout));
}

@Override
public int getRpcTimeout() {
return Integer.parseInt(conf.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

@Override
public byte[] getTableName() {
return tableName;
Expand Down
32 changes: 31 additions & 1 deletion src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2222,11 +2222,23 @@ public void testCheckAndPut() throws IOException, InterruptedException {
boolean ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
value.getBytes(), put);
Assert.assertTrue(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER, "value1".getBytes(), put);
Assert.assertFalse(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER_OR_EQUAL, "value1".getBytes(), put);
Assert.assertTrue(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS, "".getBytes(), put);
Assert.assertFalse(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS_OR_EQUAL, "".getBytes(), put);
Assert.assertFalse(ret);
get = new Get(key.getBytes());
get.setMaxVersions(Integer.MAX_VALUE);
get.addColumn(family.getBytes(), column.getBytes());
r = hTable.get(get);
Assert.assertEquals(2, r.raw().length);
Assert.assertEquals(3, r.raw().length);
Assert.assertEquals("value1", Bytes.toString(r.raw()[0].getValue()));
}

Expand All @@ -2251,6 +2263,24 @@ public void testCheckAndDelete() throws IOException {
boolean ret = hTable.checkAndDelete(key.getBytes(), family.getBytes(), column.getBytes(),
value.getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value6".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER, "value5".getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value5".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER_OR_EQUAL, "value5".getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value1".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS, "value1".getBytes(), delete);
Assert.assertFalse(ret);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS_OR_EQUAL, "value1".getBytes(), delete);
Assert.assertTrue(ret);
Get get = new Get(key.getBytes());
get.setMaxVersions(Integer.MAX_VALUE);
get.addColumn(family.getBytes(), column.getBytes());
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableNotExistException;
import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
Expand Down

0 comments on commit 9507c7d

Please sign in to comment.