Skip to content

Commit

Permalink
support rollback when check failed (#265)
Browse files Browse the repository at this point in the history
* support rollback when check failed

* fix review comment
  • Loading branch information
WeiXinChan authored Dec 23, 2024
1 parent c40a50d commit 6721123
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 15 deletions.
15 changes: 13 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3299,7 +3299,7 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKe
final ObTableOperation operation, final boolean withResult)
throws Exception {
return mutationWithFilter(tableQuery, rowKey, keyRanges, operation, withResult, false,
false);
false, false);
}

/**
Expand All @@ -3311,13 +3311,15 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKe
* @param withResult whether to bring back result
* @param checkAndExecute whether execute check and execute instead of query and mutate
* @param checkExists whether to check exists or not
* @param rollbackWhenCheckFailed whether rollback or not when check failed
* @return execute result
* @throws Exception exception
*/
public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKey,
final List<ObNewRange> keyRanges,
final ObTableOperation operation, final boolean withResult,
final boolean checkAndExecute, final boolean checkExists)
final boolean checkAndExecute, final boolean checkExists,
final boolean rollbackWhenCheckFailed)
throws Exception {
final long start = System.currentTimeMillis();
if (tableQuery != null && tableQuery.getObTableQuery().getKeyRanges().isEmpty()) {
Expand All @@ -3343,6 +3345,7 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
request.setPartitionId(tableParam.getPartitionId());
request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute);
request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists);
request.getTableQueryAndMutate().setIsRollbackWhenCheckFailed(rollbackWhenCheckFailed);
ObPayload result = executeWithRetry(obTable, request, tableQuery.getTableName());
String endpoint = obTable.getIp() + ":" + obTable.getPort();
MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE",
Expand Down Expand Up @@ -3671,6 +3674,14 @@ public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter,
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
}

/**
* checkAndInsUp.
*/
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
boolean checkExists, boolean rollbackWhenCheckFailed) {
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists, rollbackWhenCheckFailed);
}

/**
* Set full username
* @param fullUserName user name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,33 @@ public class CheckAndInsUp {
private String tableName;
private ObTableFilter filter;
private InsertOrUpdate insUp;
boolean checkExists = true;
private boolean checkExists = true;
private boolean rollbackWhenCheckFailed = false;

public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists)
throws IllegalArgumentException {
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
this.tableName = null;
this.client = null;
this(null, null, filter, insUp, check_exists, false);
}

public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
throws IllegalArgumentException {
this(null, null, filter, insUp, check_exists, rollbackWhenCheckFailed);
}

public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
InsertOrUpdate insUp, boolean check_exists)
throws IllegalArgumentException {
this(client, null, filter, insUp, check_exists, false);
}

public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,InsertOrUpdate insUp,
boolean check_exists, boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
this.client = client;
this.tableName = tableName;
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
this.rollbackWhenCheckFailed = rollbackWhenCheckFailed;
}

public Row getRowKey() {
Expand All @@ -75,6 +83,10 @@ public boolean isCheckExists() {
return checkExists;
}

public boolean isRollbackWhenCheckFailed() {
return rollbackWhenCheckFailed;
}

public MutationResult execute() throws Exception {
if (null == tableName || tableName.isEmpty()) {
throw new ObTableException("table name is null");
Expand All @@ -96,6 +108,7 @@ public MutationResult execute() throws Exception {
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());

return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation,
false, true, checkExists, rollbackWhenCheckFailed));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,9 @@ public enum ResultCodes {
OB_KV_FILTER_PARSE_ERROR(-10514), //
OB_KV_REDIS_PARSE_ERROR(-10515), //
OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG(-10516), //
OB_KV_ODP_TIMEOUT(-10650), OB_ERR_KV_ROUTE_ENTRY_EXPIRE(-10653);
OB_KV_CHECK_FAILED(-10518), //
OB_KV_ODP_TIMEOUT(-10650), //
OB_ERR_KV_ROUTE_ENTRY_EXPIRE(-10653);

public final int errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

public class ObTableQueryAndMutateFlag {
private static final int FLAG_IS_CHECK_AND_EXECUTE = 1 << 0;
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 1;
private long flags = 0;
private static final int FLAG_IS_CHECK_AND_EXECUTE = 1 << 0;
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 1;
private static final int FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED = 1 << 2;
private long flags = 0;

public void setIsCheckAndExecute(boolean isCheckAndExecute) {
if (isCheckAndExecute) {
Expand All @@ -38,6 +39,14 @@ public void setIsCheckNotExists(boolean isCheckNotExists) {
}
}

public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
if (isRollbackWhenCheckFailed) {
flags |= FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
} else {
flags &= ~FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
}
}

public long getValue() {
return flags;
}
Expand All @@ -46,7 +55,11 @@ public boolean isCheckNotExists() {
return (flags & FLAG_IS_CHECK_NOT_EXISTS) != 0;
}

public boolean isChekAndExecute() {
public boolean isCheckAndExecute() {
return (flags & FLAG_IS_CHECK_AND_EXECUTE) != 0;
}

public boolean isRollbackWhenCheckFailed() {
return (flags & FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED) != 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void setIsCheckNoExists(boolean isCheckNoExists) {
singleOpFlag.setIsCheckNotExists(isCheckNoExists);
}

public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
singleOpFlag.setIsRollbackWhenCheckFailed(isRollbackWhenCheckFailed);
}

public ObTableOperationType getSingleOpType() {
return singleOpType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

public class ObTableSingleOpFlag {
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 0;
private static final int FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED = 1 << 1;
private long flags = 0;

public void setIsCheckNotExists(boolean isCheckNotExists) {
Expand All @@ -29,6 +30,14 @@ public void setIsCheckNotExists(boolean isCheckNotExists) {
}
}

public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
if (isRollbackWhenCheckFailed) {
flags |= FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
} else {
flags &= ~FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
}
}

public long getValue() {
return flags;
}
Expand All @@ -37,6 +46,10 @@ public boolean isCheckNotExists() {
return (flags & FLAG_IS_CHECK_NOT_EXISTS) != 0;
}

public boolean isRollbackWhenCheckFailed() {
return (flags & FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED) != 0;
}

void setValue(long value) {
flags = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,8 @@ public void setIsCheckAndExecute(boolean isCheckAndExecute) {
public void setIsCheckNoExists(boolean isCheckNoExists) {
queryAndMutateFlag.setIsCheckNotExists(isCheckNoExists);
}

public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
queryAndMutateFlag.setIsRollbackWhenCheckFailed(isRollbackWhenCheckFailed);
}
}
10 changes: 9 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,21 @@ public BatchOperation batchOperation(String tableName) {
}

/**
* Insert.
* checkAndInsUp.
*/
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter,
InsertOrUpdate insUp, boolean checkExists) {
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
}

/**
* checkAndInsUp.
*/
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
boolean checkExists, boolean rollbackWhenCheckFailed) {
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists, rollbackWhenCheckFailed);
}

/*
* Execute.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void addOperation(CheckAndInsUp checkAndInsUp) {
ObTableSingleOp singleOp = new ObTableSingleOp();
singleOp.setSingleOpType(ObTableOperationType.CHECK_AND_INSERT_UP);
singleOp.setIsCheckNoExists(!checkAndInsUp.isCheckExists());
singleOp.setIsRollbackWhenCheckFailed(checkAndInsUp.isRollbackWhenCheckFailed());
singleOp.setQuery(query);
singleOp.addEntity(entity);

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,6 @@ Map<String, Object> append(String tableName, Object[] rowkeys, String[] columns,

CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
boolean checkExists);
CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
boolean checkExists, boolean rollbackWhenCheckFailed);
}
103 changes: 103 additions & 0 deletions src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static com.alipay.oceanbase.rpc.filter.ObTableFilterFactory.compareVal;
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class ObTableCheckAndInsUpTest {
public ObTableClient client;
Expand Down Expand Up @@ -534,4 +536,105 @@ public void testCheckAndInsUpWithDiffObj() throws Exception {
.assertEquals(ResultCodes.OB_KV_COLUMN_TYPE_NOT_MATCH.errorCode, e.getErrorCode());
}
}

@Test
public void testSingleCheckInsUpWithRollback() throws Exception {
if (!isVersionSupported()) {
System.out.println("current version is not supported, current version: "
+ ObGlobal.OB_VERSION);
return;
}

try {
// 0. prepare data, insert(1, 'c2_v0', 'c3_v0', 100),(2, 'c2_v0', 'c3_v0', 100),(3, 'c2_v0', 'c3_v0', 100),(4, 'c2_v0', 'c3_v0', 100)
for (long i = 1L; i <= 4L; i++) {
InsertOrUpdate insertOrUpdate = client.insertOrUpdate(TABLE_NAME);
insertOrUpdate.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0")));
insertOrUpdate.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 100L)));
MutationResult res = insertOrUpdate.execute();
Assert.assertEquals(1, res.getAffectedRows());
}

// 1. check failed: insup(1, 'c2_v0', 'c3_v0', 200) if exists c3 <= 'c3_v0';
InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate();
insertOrUpdate1.setRowKey(row(colVal("c1", 1L), colVal("c2", "c2_v0")));
insertOrUpdate1.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L)));
ObTableFilter filter = compareVal(ObCompareOp.LT, "c3", "c3_v0");
CheckAndInsUp checkAndInsUp1 = client.checkAndInsUp(TABLE_NAME, filter,insertOrUpdate1, true, true);
ObTableException thrown = assertThrows(
ObTableException.class,
() -> {
checkAndInsUp1.execute();
}
);

System.out.println(thrown.getMessage());
assertTrue(thrown.getMessage().contains("[-10518][OB_KV_CHECK_FAILED][Check failed in CheckAndInsUp]"));
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
} finally {
for (long i = 1L; i <= 4L; i++) {
Delete delete = client.delete(TABLE_NAME);
delete.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0")));
MutationResult res = delete.execute();
Assert.assertEquals(1, res.getAffectedRows());
}
}
}

@Test
public void testBatchCheckInsUpWithRollback() throws Exception {
if (!isVersionSupported()) {
System.out.println("current version is not supported, current version: "
+ ObGlobal.OB_VERSION);
return;
}

try {
// 0. prepare data, insert(1, 'c2_v0', 'c3_v0', 100),(2, 'c2_v0', 'c3_v0', 100),(3, 'c2_v0', 'c3_v0', 100),(4, 'c2_v0', 'c3_v0', 100)
for (long i = 1L; i <= 4L; i++) {
InsertOrUpdate insertOrUpdate = client.insertOrUpdate(TABLE_NAME);
insertOrUpdate.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0")));
insertOrUpdate.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 100L)));
MutationResult res = insertOrUpdate.execute();
Assert.assertEquals(1, res.getAffectedRows());
}

// 1. check pass: insup(1, 'c2_v0', 'c3_v0', 200) if exists c3 >= 'c3_v0';
InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate();
insertOrUpdate1.setRowKey(row(colVal("c1", 1L), colVal("c2", "c2_v0")));
insertOrUpdate1.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L)));
ObTableFilter filter1 = compareVal(ObCompareOp.GT, "c3", "c3_v0");
CheckAndInsUp ck1 = client.checkAndInsUp(TABLE_NAME, filter1,insertOrUpdate1, true, true);

// 2. check failed: insup(1, 'c2_v0', 'c3_v0', 200) if exists c3 < 'c3_v0';
ObTableFilter filter2 = compareVal(ObCompareOp.LT, "c3", "c3_v0");
CheckAndInsUp ck2 = client.checkAndInsUp(TABLE_NAME, filter2,insertOrUpdate1, true, true);

// batch
BatchOperation batch = client.batchOperation(TABLE_NAME);
batch.addOperation(ck1, ck2);

ObTableException thrown = assertThrows(
ObTableException.class,
() -> {
batch.execute();
}
);

System.out.println(thrown.getMessage());
assertTrue(thrown.getMessage().contains("[-10518][OB_KV_CHECK_FAILED][Check failed in CheckAndInsUp]"));
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
} finally {
for (long i = 1L; i <= 4L; i++) {
Delete delete = client.delete(TABLE_NAME);
delete.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0")));
MutationResult res = delete.execute();
Assert.assertEquals(1, res.getAffectedRows());
}
}
}
}

0 comments on commit 6721123

Please sign in to comment.