Skip to content

Commit

Permalink
feat: support for propagation of global transactions (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
106umao authored Dec 1, 2022
1 parent 52eafda commit 48bf289
Show file tree
Hide file tree
Showing 24 changed files with 922 additions and 452 deletions.
6 changes: 3 additions & 3 deletions pkg/rm/tcc/tcc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) {
}
}

func (t *TCCServiceProxy) GetTransactionInfo() tm.TransactionInfo {
func (t *TCCServiceProxy) GetTransactionInfo() tm.GtxConfig {
// todo replace with config
return tm.TransactionInfo{
TimeOut: time.Second * 10,
return tm.GtxConfig{
Timeout: time.Second * 10,
Name: t.GetActionName(),
// Propagation, Propagation
// LockRetryInternal, int64
Expand Down
4 changes: 2 additions & 2 deletions pkg/rm/tcc/tcc_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestTCCGetTransactionInfo(t1 *testing.T) {
tests := struct {
name string
fields fields
want tm.TransactionInfo
want tm.GtxConfig
}{
"test1",
fields{
Expand All @@ -309,7 +309,7 @@ func TestTCCGetTransactionInfo(t1 *testing.T) {
TwoPhaseAction: twoPhaseAction1,
},
},
tm.TransactionInfo{Name: "TwoPhaseDemoService", TimeOut: time.Second * 10, Propagation: 0, LockRetryInternal: 0, LockRetryTimes: 0},
tm.GtxConfig{Name: "TwoPhaseDemoService", Timeout: time.Second * 10, Propagation: 0, LockRetryInternal: 0, LockRetryTimes: 0},
}

t1.Run(tests.name, func(t1 *testing.T) {
Expand Down
228 changes: 70 additions & 158 deletions pkg/tm/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@ package tm

import "github.com/seata/seata-go/pkg/protocol/message"

const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
)

type (
Propagation int8
GlobalTransactionRole int8
)

type TransactionManager interface {
// Begin a new global transaction.
Begin(applicationId, transactionServiceGroup, name string, timeout int64) (string, error)
Expand All @@ -46,153 +36,75 @@ type TransactionManager interface {
GlobalReport(xid string, globalStatus message.GlobalStatus) (message.GlobalStatus, error)
}

// GlobalTransactionRole Identifies whether a global transaction is beginNewGtx or participates in something else
type GlobalTransactionRole int8

const (
/*
* The REQUIRED.
* The default propagation.
*
* <p>
* If transaction is existing, execute with current transaction,
* else execute with new transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx == null) {
* try {
* tx = beginNewTransaction(); // begin new transaction, is not existing
* Object rs = business.execute(); // execute with new transaction
* commitTransaction(tx);
* return rs;
* } catch (Exception ex) {
* rollbackTransaction(tx);
* throw ex;
* }
* } else {
* return business.execute(); // execute with current transaction
* }
* </pre></code>
* </p>
*/
REQUIRED = Propagation(0)

/*
* The REQUIRES_NEW.
*
* <p>
* If transaction is existing, suspend it, and then execute business with new transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* try {
* if (tx != null) {
* suspendedResource = suspendTransaction(tx); // suspend current transaction
* }
* try {
* tx = beginNewTransaction(); // begin new transaction
* Object rs = business.execute(); // execute with new transaction
* commitTransaction(tx);
* return rs;
* } catch (Exception ex) {
* rollbackTransaction(tx);
* throw ex;
* }
* } finally {
* if (suspendedResource != null) {
* resumeTransaction(suspendedResource); // resume transaction
* }
* }
* </pre></code>
* </p>
*/
REQUIRES_NEW = Propagation(1)

/*
* The NOT_SUPPORTED.
*
* <p>
* If transaction is existing, suspend it, and then execute business without transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* try {
* if (tx != null) {
* suspendedResource = suspendTransaction(tx); // suspend current transaction
* }
* return business.execute(); // execute without transaction
* } finally {
* if (suspendedResource != null) {
* resumeTransaction(suspendedResource); // resume transaction
* }
* }
* </pre></code>
* </p>
*/
NOT_SUPPORTED = Propagation(2)

/*
* The SUPPORTS.
*
* <p>
* If transaction is not existing, execute without global transaction,
* else execute business with current transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx != null) {
* return business.execute(); // execute with current transaction
* } else {
* return business.execute(); // execute without transaction
* }
* </pre></code>
* </p>
*/
SUPPORTS = Propagation(3)

/*
* The NEVER.
*
* <p>
* If transaction is existing, throw exception,
* else execute business without transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx != null) {
* throw new TransactionException("existing transaction");
* }
* return business.execute(); // execute without transaction
* </pre></code>
* </p>
*/
NEVER = Propagation(4)

/*
* The MANDATORY.
*
* <p>
* If transaction is not existing, throw exception,
* else execute business with current transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx == null) {
* throw new TransactionException("not existing transaction");
* }
* return business.execute(); // execute with current transaction
* </pre></code>
* </p>
*/
MANDATORY = Propagation(5)
UnKnow = GlobalTransactionRole(0)
Launcher = GlobalTransactionRole(1)
Participant = GlobalTransactionRole(2)
)

func (role GlobalTransactionRole) String() string {
switch role {
case UnKnow:
return "UnKnow"
case Launcher:
return "Launcher"
case Participant:
return "Participant"
}
return "UnKnow"
}

// Propagation Used to identify the spread of the global transaction enumerated types
type Propagation int8

const (
// Required
// The default propagation.
// If transaction is existing, execute with current transaction,
// else execute with beginNewGtx transaction.
Required = Propagation(0)

// RequiresNew
// If transaction is existing, suspend it, and then execute business with beginNewGtx transaction.
RequiresNew = Propagation(1)

// NotSupported
// If transaction is existing, suspend it, and then execute business without transaction.
NotSupported = Propagation(2)

// Supports
// If transaction is not existing, execute without global transaction,
// else execute business with current transaction.
Supports = Propagation(3)

// Never
// If transaction is existing, throw exception,
// else execute business without transaction.
Never = Propagation(4)

// Mandatory
// If transaction is not existing, throw exception,
// else execute business with current transaction.
Mandatory = Propagation(5)
)

func (p Propagation) String() string {
switch p {
case Required:
return "Required"
case RequiresNew:
return "RequiresNew"
case NotSupported:
return "NotSupported"
case Supports:
return "Supports"
case Never:
return "Never"
case Mandatory:
return "Mandatory"
}
return "UnKnow"
}
47 changes: 34 additions & 13 deletions pkg/tm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ const (
seataContextVariable = ContextParam("seataContextVariable")
)

type GlobalTransaction struct {
Xid string
XidCopy string
TxName string
// TxStatus Identify a global transaction in a certain status
TxStatus message.GlobalStatus
// TxRole Roles in the transaction propagation behavior
TxRole GlobalTransactionRole
}

type BusinessActionContext struct {
Xid string
BranchId int64
Expand All @@ -40,13 +50,10 @@ type BusinessActionContext struct {
}

type ContextVariable struct {
TxName string
Xid string
XidCopy string
FencePhase enum.FencePhase
TxRole *GlobalTransactionRole
BusinessActionContext *BusinessActionContext
TxStatus *message.GlobalStatus
// GlobalTransaction Represent seata ctx is a global transaction
GlobalTransaction
}

func InitSeataContext(ctx context.Context) context.Context {
Expand All @@ -58,13 +65,13 @@ func GetTxStatus(ctx context.Context) *message.GlobalStatus {
if variable == nil {
return nil
}
return variable.(*ContextVariable).TxStatus
return &variable.(*ContextVariable).TxStatus
}

func SetTxStatus(ctx context.Context, status message.GlobalStatus) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).TxStatus = &status
variable.(*ContextVariable).TxStatus = status
}
}

Expand Down Expand Up @@ -102,18 +109,18 @@ func SetBusinessActionContext(ctx context.Context, businessActionContext *Busine
}
}

func GetTransactionRole(ctx context.Context) *GlobalTransactionRole {
func GetTxRole(ctx context.Context) *GlobalTransactionRole {
variable := ctx.Value(seataContextVariable)
if variable == nil {
return nil
}
return variable.(*ContextVariable).TxRole
return &variable.(*ContextVariable).TxRole
}

func SetTransactionRole(ctx context.Context, role GlobalTransactionRole) {
func SetTxRole(ctx context.Context, role GlobalTransactionRole) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).TxRole = &role
variable.(*ContextVariable).TxRole = role
}
}

Expand All @@ -122,8 +129,7 @@ func IsGlobalTx(ctx context.Context) bool {
if variable == nil {
return false
}
xid := variable.(*ContextVariable).Xid
return xid != ""
return variable.(*ContextVariable).Xid != ""
}

func GetXID(ctx context.Context) string {
Expand Down Expand Up @@ -160,6 +166,21 @@ func UnbindXid(ctx context.Context) {
}
}

func SetTx(ctx context.Context, tx *GlobalTransaction) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).GlobalTransaction = *tx
}
}

func GetTx(ctx context.Context) (tx *GlobalTransaction) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
tx = &variable.(*ContextVariable).GlobalTransaction
}
return
}

func SetFencePhase(ctx context.Context, phase enum.FencePhase) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
Expand Down
Loading

0 comments on commit 48bf289

Please sign in to comment.