Skip to content

Commit

Permalink
implement tx pool cleanup function (#547)
Browse files Browse the repository at this point in the history
  • Loading branch information
lightshine001 authored and laizy committed Jul 16, 2018
1 parent 25d16ae commit f3231af
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cmd/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ var AppHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
utils.GasPriceFlag,
utils.GasLimitFlag,
utils.TxpoolPreExecEnableFlag,
utils.TxpoolPreExecDisableFlag,
utils.DisableSyncVerifyTxFlag,
},
},
Expand Down
6 changes: 3 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ var (
}

//PreExecute switcher
TxpoolPreExecEnableFlag = cli.BoolFlag{
Name: "enabletxpoolpreexec",
Usage: "Enable preExecute in tx pool",
TxpoolPreExecDisableFlag = cli.BoolFlag{
Name: "disabletxpoolpreexec",
Usage: "Disable preExecute in tx pool",
}

//local PreExecute switcher
Expand Down
4 changes: 2 additions & 2 deletions docs/specifications/cli_user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ The gasprice parameter is used to set the lowest gasprice of the current node tr
--gaslimit
The gaslimit parameter is used to set the gaslimit of the current node transaction pool to accept transactions. Transactions below this gaslimit will be discarded. The default value is 20000.

--enabletxpoolpreexec
The enabletxpoolpreexec parameter is used to enable preExecute in the transaction pool, which checks whether a transactor has sufficient balance to cover transaction cost. For those who have insufficient balance, the transactions from network will be discarded by the transaction pool. By default, preExecute is disabled when ontology bootstrap.
--disabletxpoolpreexec
The disabletxpoolpreexec parameter is used to disable preExecution of a transaction from network in the transaction pool. By default, preExecution is enabled when ontology bootstrap.

--disablesyncverifytx
The disablesyncverifytx is used to disable sync verify transaction in send transaction,include rpc restful websocket.
Expand Down
4 changes: 2 additions & 2 deletions docs/specifications/cli_user_guide_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ gasprice 参数用于设定当前节点交易池接受交易的最低gasprice,
--gaslimit
gaslimit 参数用于设置当前节点交易池接受交易的最低gaslimit,低于这个gaslimit的交易将被丢弃。默认值为20000。

--enabletxpoolpreexec
enabletxpoolpreexec 参数用于在交易池中打开预执行,对来自网络的交易检查付款方账户余额,余额不足的交易将被丢弃。Ontology节点在启动时交易池默认关闭预执行
--disabletxpoolpreexec
disabletxpoolpreexec 参数用于关闭交易池中对来自网络的交易预执行校验。Ontology节点在启动时交易池默认打开预执行

--disablesyncverifytx
disablesyncverifytx 参数用于关闭rpc、restful、websocket中同步验证交易
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func setupAPP() *cli.App {
//txpool setting
utils.GasPriceFlag,
utils.GasLimitFlag,
utils.TxpoolPreExecEnableFlag,
utils.TxpoolPreExecDisableFlag,
utils.DisableSyncVerifyTxFlag,
//p2p setting
utils.ReservedPeersOnlyFlag,
Expand Down Expand Up @@ -270,9 +270,9 @@ func initLedger(ctx *cli.Context) (*ledger.Ledger, error) {
}

func initTxPool(ctx *cli.Context) (*proc.TXPoolServer, error) {
preExec := ctx.GlobalBool(utils.GetFlagName(utils.TxpoolPreExecEnableFlag))
disablePreExec := ctx.GlobalBool(utils.GetFlagName(utils.TxpoolPreExecDisableFlag))
bactor.DisableSyncVerifyTx = ctx.GlobalBool(utils.GetFlagName(utils.DisableSyncVerifyTxFlag))
txPoolServer, err := txnpool.StartTxnPoolServer(preExec)
txPoolServer, err := txnpool.StartTxnPoolServer(disablePreExec)
if err != nil {
return nil, fmt.Errorf("Init txpool error:%s", err)
}
Expand Down
14 changes: 14 additions & 0 deletions txnpool/common/transaction_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,17 @@ func (tp *TXPool) RemoveTxsBelowGasPrice(gasPrice uint64) {
}
}
}

// Remain returns the remaining tx list to cleanup
func (tp *TXPool) Remain() []*types.Transaction {
tp.Lock()
defer tp.Unlock()

txList := make([]*types.Transaction, 0, len(tp.txList))
for _, txEntry := range tp.txList {
txList = append(txList, txEntry.Tx)
delete(tp.txList, txEntry.Tx.Hash())
}

return txList
}
67 changes: 34 additions & 33 deletions txnpool/proc/txnpool_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,35 @@ func replyTxResult(txResultCh chan *tc.TxResult, hash common.Uint256,
}
}

// preExecCheck checks whether preExec pass
func preExecCheck(txn *tx.Transaction) (bool, string) {
result, err := ledger.DefLedger.PreExecuteContract(txn)
if err != nil {
log.Debugf("preExecCheck: failed to preExecuteContract tx %x err %v",
txn.Hash(), err)
}
if txn.GasLimit < result.Gas {
log.Debugf("preExecCheck: transaction's gasLimit %d is less than preExec gasLimit %d",
txn.GasLimit, result.Gas)
return false, fmt.Sprintf("transaction's gasLimit %d is less than preExec gasLimit %d",
txn.GasLimit, result.Gas)
}
gas, overflow := common.SafeMul(txn.GasPrice, result.Gas)
if overflow {
log.Debugf("preExecCheck: gasPrice %d preExec gasLimit %d overflow",
txn.GasPrice, result.Gas)
return false, fmt.Sprintf("gasPrice %d preExec gasLimit %d overflow",
txn.GasPrice, result.Gas)
}
if !isBalanceEnough(txn.Payer, gas) {
log.Debugf("preExecCheck: transactor %s has no balance enough to cover gas cost %d",
txn.Payer.ToHexString(), gas)
return false, fmt.Sprintf("transactor %s has no balance enough to cover gas cost %d",
txn.Payer.ToHexString(), gas)
}
return true, ""
}

// TxnActor: Handle the low priority msg from P2P and API
type TxActor struct {
server *TXPoolServer
Expand Down Expand Up @@ -156,43 +185,15 @@ func (ta *TxActor) handleTransaction(sender tc.SenderType, self *actor.PID,
return
}

if ta.server.preExec {
result, err := ledger.DefLedger.PreExecuteContract(txn)
if err != nil {
log.Debugf("handleTransaction: failed to preExecuteContract tx %x err %v",
txn.Hash(), err)
}
if txn.GasLimit < result.Gas {
log.Debugf("handleTransaction: transaction's gasLimit %d is less than preExec gasLimit %d",
txn.GasLimit, result.Gas)
if sender == tc.HttpSender && txResultCh != nil {
replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
fmt.Sprintf("transaction's gasLimit %d is less than preExec gasLimit %d",
txn.GasLimit, result.Gas))
}
return
}
gas, overflow := common.SafeMul(txn.GasPrice, result.Gas)
if overflow {
log.Debugf("handleTransaction: gasPrice %d preExec gasLimit %d overflow",
txn.GasPrice, result.Gas)
if sender == tc.HttpSender && txResultCh != nil {
replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
fmt.Sprintf("gasPrice %d * preExec gasLimit %d overflow",
txn.GasPrice, result.Gas))
}
return
}
if !isBalanceEnough(txn.Payer, gas) {
log.Debugf("handleTransaction: transactor %s has no balance enough to cover gas cost %d",
txn.Payer.ToHexString(), gas)
if !ta.server.disablePreExec {
if ok, desc := preExecCheck(txn); !ok {
log.Debugf("handleTransaction: preExecCheck tx %x failed", txn.Hash())
if sender == tc.HttpSender && txResultCh != nil {
replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
fmt.Sprintf("insufficient balance to cover gas cost %d", gas))
replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, desc)
}
return
}
log.Debugf("handleTransaction: tx %x preExec success", txn.Hash())
log.Debugf("handleTransaction: preExecCheck tx %x passed", txn.Hash())
}
<-ta.server.slots
ta.server.assignTxToWorker(txn, sender, txResultCh)
Expand Down
6 changes: 3 additions & 3 deletions txnpool/proc/txnpool_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func init() {

func TestTxActor(t *testing.T) {
t.Log("Starting tx actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s := NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestTxActor(t *testing.T) {

func TestTxPoolActor(t *testing.T) {
t.Log("Starting tx pool actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s := NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestTxPoolActor(t *testing.T) {

func TestVerifyRspActor(t *testing.T) {
t.Log("Starting validator response actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s := NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down
45 changes: 28 additions & 17 deletions txnpool/proc/txnpool_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ type registerValidators struct {

// TXPoolServer contains all api to external modules
type TXPoolServer struct {
mu sync.RWMutex // Sync mutex
wg sync.WaitGroup // Worker sync
workers []txPoolWorker // Worker pool
txPool *tc.TXPool // The tx pool that holds the valid transaction
allPendingTxs map[common.Uint256]*serverPendingTx // The txs that server is processing
pendingBlock *pendingBlock // The block that server is processing
actors map[tc.ActorType]*actor.PID // The actors running in the server
validators *registerValidators // The registered validators
stats txStats // The transaction statstics
slots chan struct{} // The limited slots for the new transaction
height uint32 // The current block height
gasPrice uint64 // Gas price to enforce for acceptance into the pool
preExec bool // PreExecute a transaction
mu sync.RWMutex // Sync mutex
wg sync.WaitGroup // Worker sync
workers []txPoolWorker // Worker pool
txPool *tc.TXPool // The tx pool that holds the valid transaction
allPendingTxs map[common.Uint256]*serverPendingTx // The txs that server is processing
pendingBlock *pendingBlock // The block that server is processing
actors map[tc.ActorType]*actor.PID // The actors running in the server
validators *registerValidators // The registered validators
stats txStats // The transaction statstics
slots chan struct{} // The limited slots for the new transaction
height uint32 // The current block height
gasPrice uint64 // Gas price to enforce for acceptance into the pool
disablePreExec bool // Disbale PreExecute a transaction
}

// NewTxPoolServer creates a new tx pool server to schedule workers to
// handle and filter inbound transactions from the network, http, and consensus.
func NewTxPoolServer(num uint8, preExec bool) *TXPoolServer {
func NewTxPoolServer(num uint8, disablePreExec bool) *TXPoolServer {
s := &TXPoolServer{}
s.init(num, preExec)
s.init(num, disablePreExec)
return s
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func getGasPriceConfig() uint64 {
}

// init initializes the server with the configured settings
func (s *TXPoolServer) init(num uint8, preExec bool) {
func (s *TXPoolServer) init(num uint8, disablePreExec bool) {
// Initial txnPool
s.txPool = &tc.TXPool{}
s.txPool.Init()
Expand Down Expand Up @@ -173,7 +173,7 @@ func (s *TXPoolServer) init(num uint8, preExec bool) {
s.gasPrice = getGasPriceConfig()
log.Infof("tx pool: the current local gas price is %d", s.gasPrice)

s.preExec = preExec
s.disablePreExec = disablePreExec
// Create the given concurrent workers
s.workers = make([]txPoolWorker, num)
// Initial and start the workers
Expand Down Expand Up @@ -539,6 +539,17 @@ func (s *TXPoolServer) cleanTransactionList(txs []*tx.Transaction, height uint32
s.txPool.RemoveTxsBelowGasPrice(gasPrice)
}
}
// Cleanup tx pool
if !s.disablePreExec {
remain := s.txPool.Remain()
for _, t := range remain {
if ok, _ := preExecCheck(t); !ok {
log.Debugf("cleanTransactionList: preExecCheck tx %x failed", t.Hash())
continue
}
s.reVerifyStateful(t, tc.NilSender)
}
}
}

// delTransaction deletes a transaction in the tx pool.
Expand Down
8 changes: 4 additions & 4 deletions txnpool/proc/txnpool_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func startActor(obj interface{}) *actor.PID {
func TestTxn(t *testing.T) {
t.Log("Starting test tx")
var s *TXPoolServer
s = NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s = NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestTxn(t *testing.T) {
func TestAssignRsp2Worker(t *testing.T) {
t.Log("Starting assign response to the worker testing")
var s *TXPoolServer
s = NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s = NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestAssignRsp2Worker(t *testing.T) {
func TestActor(t *testing.T) {
t.Log("Starting actor testing")
var s *TXPoolServer
s = NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s = NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestActor(t *testing.T) {
func TestValidator(t *testing.T) {
t.Log("Starting validator testing")
var s *TXPoolServer
s = NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s = NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down
2 changes: 1 addition & 1 deletion txnpool/proc/txnpool_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

func TestWorker(t *testing.T) {
t.Log("Starting worker test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s := NewTxPoolServer(tc.MAX_WORKER_NUM, true)
if s == nil {
t.Error("Test case: new tx pool server failed")
return
Expand Down
4 changes: 2 additions & 2 deletions txnpool/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func startActor(obj interface{}, id string) (*actor.PID, error) {
// StartTxnPoolServer starts the txnpool server and registers
// actors to handle the msgs from the network, http, consensus
// and validators. Meanwhile subscribes the block complete event.
func StartTxnPoolServer(preExec bool) (*tp.TXPoolServer, error) {
func StartTxnPoolServer(disablePreExec bool) (*tp.TXPoolServer, error) {
var s *tp.TXPoolServer

/* Start txnpool server to receive msgs from p2p,
* consensus and valdiators
*/
s = tp.NewTxPoolServer(tc.MAX_WORKER_NUM, preExec)
s = tp.NewTxPoolServer(tc.MAX_WORKER_NUM, disablePreExec)

// Initialize an actor to handle the msgs from valdiators
rspActor := tp.NewVerifyRspActor(s)
Expand Down
2 changes: 1 addition & 1 deletion txnpool/test/txnpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func Test_RCV(t *testing.T) {
}

// Start txnpool server to receive msgs from p2p, consensus and valdiators
s = tp.NewTxPoolServer(tc.MAX_WORKER_NUM, false)
s = tp.NewTxPoolServer(tc.MAX_WORKER_NUM, true)

// Initialize an actor to handle the msgs from valdiators
rspActor := tp.NewVerifyRspActor(s)
Expand Down

0 comments on commit f3231af

Please sign in to comment.