Skip to content

Commit

Permalink
[Feat] support checkAndInsUp
Browse files Browse the repository at this point in the history
  • Loading branch information
shenyunlong committed Jan 3, 2024
1 parent a98b1a3 commit de7c7f9
Show file tree
Hide file tree
Showing 22 changed files with 1,875 additions and 19 deletions.
32 changes: 31 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package com.alipay.oceanbase.rpc;

import com.alibaba.fastjson.JSON;
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
import com.alipay.oceanbase.rpc.constant.Constants;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.batch.QueryByBatch;
import com.alipay.oceanbase.rpc.location.LocationUtil;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.location.model.*;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.mutation.*;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand Down Expand Up @@ -2263,7 +2266,25 @@ public BatchOperation batchOperation(String tableName) {
*/
public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[] rowKey,
final List<ObNewRange> keyRanges,
final ObTableOperation operation, final boolean withResult)
final ObTableOperation operation, final boolean withResult) throws Exception {
return mutationWithFilter(tableQuery, rowKey, keyRanges, operation, withResult, false, false);
}
/**
* execute mutation with filter
* @param tableQuery table query
* @param rowKey row key which want to mutate
* @param keyRanges scan range
* @param operation table operation
* @param withResult whether to bring back result
* @param checkAndExecute whether execute check and execute instead of query and mutate
* @param checkExists whether to check exists or not
* @return execute result
* @throws Exception exception
*/
public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[] rowKey,
final List<ObNewRange> keyRanges,
final ObTableOperation operation, final boolean withResult,
final boolean checkAndExecute, final boolean checkExists)
throws Exception {
final long start = System.currentTimeMillis();
if (tableQuery != null && tableQuery.getObTableQuery().getKeyRanges().isEmpty()) {
Expand All @@ -2287,6 +2308,8 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
request.setTableId(tableParam.getTableId());
// partId/tabletId
request.setPartitionId(tableParam.getPartitionId());
request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute);
request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists);
ObPayload result = obTable.execute(request);
String endpoint = obTable.getIp() + ":" + obTable.getPort();
MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE",
Expand Down Expand Up @@ -2517,6 +2540,13 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
return request;
}

/**
* checkAndInsUp.
*/
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp, boolean checkExists) {
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
}

/**
* Set full username
* @param fullUserName user name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult;
Expand Down Expand Up @@ -94,6 +95,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
return new ObTableDirectLoadResult();
}
}, //
OB_TABLE_API_LS_EXECUTE(Pcodes.OB_TABLE_API_LS_EXECUTE) {
/**
* New payload.
*/
@Override
public ObPayload newPayload(ObRpcPacketHeader header) {
return new ObTableLSOpResult();
}
}, //
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
/*
* New payload.
Expand Down Expand Up @@ -136,6 +146,8 @@ public static ObTablePacketCode valueOf(short value) {
return OB_TABLE_API_EXECUTE_QUERY_SYNC;
case Pcodes.OB_TABLE_API_DIRECT_LOAD:
return OB_TABLE_API_DIRECT_LOAD;
case Pcodes.OB_TABLE_API_LS_EXECUTE:
return OB_TABLE_API_LS_EXECUTE;
case Pcodes.OB_ERROR_PACKET:
return OB_ERROR_PACKET;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.alipay.oceanbase.rpc.checkandmutate;

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
import com.alipay.oceanbase.rpc.table.api.Table;
import com.alipay.oceanbase.rpc.table.api.TableQuery;

import java.util.ArrayList;
import java.util.List;

public class CheckAndInsUp {
private Table client;
private String tableName;
private ObTableFilter filter;
private InsertOrUpdate insUp;
boolean checkExists = true;

public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp,
boolean check_exists) throws IllegalArgumentException {
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
this.tableName = null;
this.client = null;
}

public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
InsertOrUpdate insUp, boolean check_exists) throws IllegalArgumentException {
this.client = client;
this.tableName = tableName;
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
}

public Object[] getRowKey() {
return insUp.getRowKey();
}

public ObTableFilter getFilter() {
return filter;
}

public InsertOrUpdate getInsUp() {
return insUp;
}

public boolean isCheckExists() {
return checkExists;
}

public MutationResult execute() throws Exception {
if (null == tableName) {
throw new ObTableException("table name is null");
} else if (null == client) {
throw new ObTableException("client is null");
} else if (!(client instanceof ObTableClient)) {
throw new ObTableException("the client must be table clinet");
}

TableQuery query = client.query(tableName);
query.setFilter(filter);
Object[] rowKey = getRowKey();
List<ObNewRange> ranges = new ArrayList<>();
ObNewRange range = new ObNewRange();
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey()));
ranges.add(range);
query.getObTableQuery().setKeyRanges(ranges);
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
insUp.getRowKey(), insUp.getColumns(), insUp.getValues());

return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
}
}
73 changes: 60 additions & 13 deletions src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
package com.alipay.oceanbase.rpc.mutation;

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
import com.alipay.oceanbase.rpc.table.ObTableClientLSBatchOpsImpl;
import com.alipay.oceanbase.rpc.table.api.Table;
import com.alipay.oceanbase.rpc.table.api.TableBatchOps;
import com.alipay.oceanbase.rpc.table.api.TableQuery;
import com.sun.javaws.exceptions.InvalidArgumentException;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -35,6 +41,7 @@ public class BatchOperation {
boolean withResult;
private List<Object> operations;
boolean isAtomic = false;
boolean hasCheckAndInsUp = false;

/*
* default constructor
Expand Down Expand Up @@ -96,78 +103,118 @@ public BatchOperation addOperation(List<Mutation> mutations) {
return this;
}

/*
* add CheckAndInsUp
*/
public BatchOperation addOperation(CheckAndInsUp... insUps) {
this.operations.addAll(Arrays.asList(insUps));
this.hasCheckAndInsUp = true;
return this;
}

public BatchOperation setIsAtomic(boolean isAtomic) {
this.isAtomic = isAtomic;
return this;
}

@SuppressWarnings("unchecked")
public BatchOperationResult execute() throws Exception {
// add rowkeyElement
boolean hasSetRowkeyElement = false;
if (hasCheckAndInsUp) {
return executeWithLSBatchOp();
} else {
return executeWithNormalBatchOp();
}
}

public BatchOperationResult executeWithNormalBatchOp() throws Exception {
TableBatchOps batchOps = client.batch(tableName);
boolean hasSetRowkeyElement = false;

for (Object operation : operations) {
if (operation instanceof Mutation) {
Mutation mutation = (Mutation) operation;
if (!hasSetRowkeyElement && mutation.getRowKeyNames() != null) {
List<String> rowKeyNames = mutation.getRowKeyNames();
((ObTableClient) client).addRowKeyElement(tableName,
rowKeyNames.toArray(new String[0]));
rowKeyNames.toArray(new String[0]));
hasSetRowkeyElement = true;
}
ObTableOperationType type = mutation.getOperationType();
switch (type) {
case GET:
throw new IllegalArgumentException("Invalid type in batch operation, "
+ type);
+ type);
case INSERT:
((Insert) mutation).removeRowkeyFromMutateColval();
batchOps.insert(mutation.getRowKey(), ((Insert) mutation).getColumns(),
((Insert) mutation).getValues());
((Insert) mutation).getValues());
break;
case DEL:
batchOps.delete(mutation.getRowKey());
break;
case UPDATE:
((Update) mutation).removeRowkeyFromMutateColval();
batchOps.update(mutation.getRowKey(), ((Update) mutation).getColumns(),
((Update) mutation).getValues());
((Update) mutation).getValues());
break;
case INSERT_OR_UPDATE:
((InsertOrUpdate) mutation).removeRowkeyFromMutateColval();
batchOps.insertOrUpdate(mutation.getRowKey(),
((InsertOrUpdate) mutation).getColumns(),
((InsertOrUpdate) mutation).getValues());
((InsertOrUpdate) mutation).getColumns(),
((InsertOrUpdate) mutation).getValues());
break;
case REPLACE:
((Replace) mutation).removeRowkeyFromMutateColval();
batchOps.replace(mutation.getRowKey(), ((Replace) mutation).getColumns(),
((Replace) mutation).getValues());
((Replace) mutation).getValues());
break;
case INCREMENT:
((Increment) mutation).removeRowkeyFromMutateColval();
batchOps.increment(mutation.getRowKey(),
((Increment) mutation).getColumns(),
((Increment) mutation).getValues(), withResult);
((Increment) mutation).getColumns(),
((Increment) mutation).getValues(), withResult);
break;
case APPEND:
((Append) mutation).removeRowkeyFromMutateColval();
batchOps.append(mutation.getRowKey(), ((Append) mutation).getColumns(),
((Append) mutation).getValues(), withResult);
((Append) mutation).getValues(), withResult);
break;
default:
throw new ObTableException("unknown operation type " + type);
}
} else if (operation instanceof TableQuery) {
TableQuery query = (TableQuery) operation;
batchOps.get(query.getRowKey().getValues(),
query.getSelectColumns().toArray((new String[0])));
query.getSelectColumns().toArray((new String[0])));
} else {
throw new ObTableException("unknown operation " + operation);
}
}
batchOps.setAtomicOperation(isAtomic);
return new BatchOperationResult(batchOps.executeWithResult());
}

public BatchOperationResult executeWithLSBatchOp() throws Exception {
ObTableClientLSBatchOpsImpl batchOps;
if (client instanceof ObTableClient) {
batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient)client);
boolean hasSetRowkeyElement = false;
for (Object operation : operations) {
if (operation instanceof CheckAndInsUp) {
CheckAndInsUp checkAndInsUp = (CheckAndInsUp) operation;
batchOps.addOperation(checkAndInsUp);
List<String> rowKeyNames = checkAndInsUp.getInsUp().getRowKeyNames();
if (!hasSetRowkeyElement && rowKeyNames != null) {
((ObTableClient) client).addRowKeyElement(tableName, rowKeyNames.toArray(new String[0]));
hasSetRowkeyElement = true;
}
} else {
throw new IllegalArgumentException("the operation in LS batch must be checkAndInsUp");
}
}
} else {
throw new IllegalArgumentException("execute batch using ObTable diretly is not supporeted");
}
return new BatchOperationResult(batchOps.executeWithResult());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected TableQuery getQuery() {
/*
* get row key
*/
protected Object[] getRowKey() {
public Object[] getRowKey() {
return rowKey;
}

Expand Down Expand Up @@ -441,3 +441,4 @@ static void removeRowkeyFromMutateColval(List<String> columns, List<Object> valu
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ public interface Pcodes {
int OB_TABLE_API_EXECUTE_QUERY_SYNC = 0x1106;

int OB_TABLE_API_DIRECT_LOAD = 0x1123;
int OB_TABLE_API_LS_EXECUTE = 0x1125;
}
Loading

0 comments on commit de7c7f9

Please sign in to comment.