From b92252dc6349848b51a0811cdb606fa20df04cc6 Mon Sep 17 00:00:00 2001 From: ylsGit Date: Tue, 26 Mar 2024 15:25:27 +0800 Subject: [PATCH] add db index for gas_price (#154) * add db index for gas_price * add dynamic config QueryPendingTxsLimit --- db/migrations/pool/1001.sql | 5 +++++ docs/config-file/node-config-doc.html | 2 +- docs/config-file/node-config-doc.md | 15 +++++++++++++++ docs/config-file/node-config-schema.json | 5 +++++ pool/interfaces.go | 2 +- pool/pgpoolstorage/pgpoolstorage.go | 17 ++++++++++++----- pool/pool.go | 4 ++-- sequencer/apollo_xlayer.go | 13 +++++++++++++ sequencer/config.go | 2 ++ sequencer/interfaces.go | 2 +- sequencer/mock_pool.go | 2 +- sequencer/sequencer.go | 2 +- 12 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 db/migrations/pool/1001.sql diff --git a/db/migrations/pool/1001.sql b/db/migrations/pool/1001.sql new file mode 100644 index 0000000000..4b2bdb0de2 --- /dev/null +++ b/db/migrations/pool/1001.sql @@ -0,0 +1,5 @@ +-- +migrate Up +CREATE INDEX IF NOT EXISTS idx_transaction_gas_price ON pool.transaction (gas_price); + +-- +migrate Down +DROP INDEX IF EXISTS pool.idx_transaction_gas_price; \ No newline at end of file diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index a428c76630..3bbc02fd46 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -60,7 +60,7 @@
"300ms"
 

Metrics is the config for the sequencer metrics
Default: "1h0m0s"Type: string

Interval is the interval of time to calculate sequencer metrics


Examples:

"1m"
 
"300ms"
-

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Type: array of string

XLayer config
PackBatchSpacialList is the list of addresses that will have a special gas price

Each item of this array must be:


Default: 0Type: number

GasPriceMultiple is the multiple of the gas price


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
+

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Type: array of string

XLayer config
PackBatchSpacialList is the list of addresses that will have a special gas price

Each item of this array must be:


Default: 0Type: number

GasPriceMultiple is the multiple of the gas price


Default: 0Type: integer

QueryPendingTxsLimit is used to limit amount txs from the db


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
 
"300ms"
 

Default: "5s"Type: string

LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index ec497f288a..7947880189 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -2572,6 +2572,7 @@ CheckLastL2BlockHashOnCloseBatch=true
 | - [StreamServer](#Sequencer_StreamServer )                                           | No      | object          | No         | -          | StreamServerCfg is the config for the stream server                                                 |
 | - [PackBatchSpacialList](#Sequencer_PackBatchSpacialList )                           | No      | array of string | No         | -          | XLayer config
PackBatchSpacialList is the list of addresses that will have a special gas price | | - [GasPriceMultiple](#Sequencer_GasPriceMultiple ) | No | number | No | - | GasPriceMultiple is the multiple of the gas price | +| - [QueryPendingTxsLimit](#Sequencer_QueryPendingTxsLimit ) | No | integer | No | - | QueryPendingTxsLimit is used to limit amount txs from the db | ### 10.1. `Sequencer.DeletePoolTxsL1BlockConfirmations` @@ -3230,6 +3231,20 @@ PackBatchSpacialList is the list of addresses that will have a special gas price GasPriceMultiple=0 ``` +### 10.11. `Sequencer.QueryPendingTxsLimit` + +**Type:** : `integer` + +**Default:** `0` + +**Description:** QueryPendingTxsLimit is used to limit amount txs from the db + +**Example setting the default value** (0): +``` +[Sequencer] +QueryPendingTxsLimit=0 +``` + ## 11. `[SequenceSender]` **Type:** : `object` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 3085e22264..3ca323e17d 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -1240,6 +1240,11 @@ "type": "number", "description": "GasPriceMultiple is the multiple of the gas price", "default": 0 + }, + "QueryPendingTxsLimit": { + "type": "integer", + "description": "QueryPendingTxsLimit is used to limit amount txs from the db", + "default": 0 } }, "additionalProperties": false, diff --git a/pool/interfaces.go b/pool/interfaces.go index b4bc26d648..76e8bebd78 100644 --- a/pool/interfaces.go +++ b/pool/interfaces.go @@ -21,7 +21,7 @@ type storage interface { GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error) GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error) GetTxsByStatus(ctx context.Context, state TxStatus, limit uint64) ([]Transaction, error) - GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) + GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) IsTxPending(ctx context.Context, hash common.Hash) (bool, error) SetGasPrices(ctx context.Context, l2GasPrice uint64, l1GasPrice uint64) error DeleteGasPricesHistoryOlderThan(ctx context.Context, date time.Time) error diff --git a/pool/pgpoolstorage/pgpoolstorage.go b/pool/pgpoolstorage/pgpoolstorage.go index fbc0aaea62..0e2f6985ee 100644 --- a/pool/pgpoolstorage/pgpoolstorage.go +++ b/pool/pgpoolstorage/pgpoolstorage.go @@ -172,17 +172,24 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx } // GetNonWIPPendingTxs returns an array of transactions -func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) { +// limit parameter is used to limit amount txs from the db, +// if limit = 0, then there is no limit +func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) { var ( rows pgx.Rows err error sql string ) - sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns, - used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1` - rows, err = p.db.Query(ctx, sql, pool.TxStatusPending) - + if limit == 0 { + sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns, + used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC` + rows, err = p.db.Query(ctx, sql, pool.TxStatusPending) + } else { + sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns, + used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC LIMIT $2` + rows, err = p.db.Query(ctx, sql, pool.TxStatusPending, limit) + } if err != nil { return nil, err } diff --git a/pool/pool.go b/pool/pool.go index d0862befd5..f96935a65b 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -356,8 +356,8 @@ func (p *Pool) GetPendingTxs(ctx context.Context, limit uint64) ([]Transaction, } // GetNonWIPPendingTxs from the pool -func (p *Pool) GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) { - return p.storage.GetNonWIPPendingTxs(ctx) +func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) { + return p.storage.GetNonWIPPendingTxs(ctx, limit) } // GetSelectedTxs gets selected txs from the pool db diff --git a/sequencer/apollo_xlayer.go b/sequencer/apollo_xlayer.go index f6e26e12d4..b3bce074a5 100644 --- a/sequencer/apollo_xlayer.go +++ b/sequencer/apollo_xlayer.go @@ -13,6 +13,7 @@ type ApolloConfig struct { FullBatchSleepDuration types.Duration PackBatchSpacialList []string GasPriceMultiple float64 + QueryPendingTxsLimit uint64 sync.RWMutex } @@ -41,6 +42,7 @@ func UpdateConfig(apolloConfig Config) { getApolloConfig().FullBatchSleepDuration = apolloConfig.Finalizer.FullBatchSleepDuration getApolloConfig().PackBatchSpacialList = apolloConfig.PackBatchSpacialList getApolloConfig().GasPriceMultiple = apolloConfig.GasPriceMultiple + getApolloConfig().QueryPendingTxsLimit = apolloConfig.QueryPendingTxsLimit getApolloConfig().Unlock() } @@ -82,3 +84,14 @@ func getGasPriceMultiple(gpMul float64) float64 { return ret } + +func getQueryPendingTxsLimit(limit uint64) uint64 { + ret := limit + if getApolloConfig().Enable() { + getApolloConfig().RLock() + defer getApolloConfig().RUnlock() + ret = getApolloConfig().QueryPendingTxsLimit + } + + return ret +} diff --git a/sequencer/config.go b/sequencer/config.go index 122d2f7334..e55444ddc8 100644 --- a/sequencer/config.go +++ b/sequencer/config.go @@ -36,6 +36,8 @@ type Config struct { PackBatchSpacialList []string `mapstructure:"PackBatchSpacialList"` // GasPriceMultiple is the multiple of the gas price GasPriceMultiple float64 `mapstructure:"GasPriceMultiple"` + // QueryPendingTxsLimit is used to limit amount txs from the db + QueryPendingTxsLimit uint64 `mapstructure:"QueryPendingTxsLimit"` } // StreamServerCfg contains the data streamer's configuration properties diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index b518e31cf2..46a1ecf79a 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -19,7 +19,7 @@ type txPool interface { DeleteFailedTransactionsOlderThan(ctx context.Context, date time.Time) error DeleteTransactionByHash(ctx context.Context, hash common.Hash) error MarkWIPTxsAsPending(ctx context.Context) error - GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) + GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error) UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error diff --git a/sequencer/mock_pool.go b/sequencer/mock_pool.go index 00bc480699..22d9b44056 100644 --- a/sequencer/mock_pool.go +++ b/sequencer/mock_pool.go @@ -180,7 +180,7 @@ func (_m *PoolMock) GetL1AndL2GasPrice() (uint64, uint64) { } // GetNonWIPPendingTxs provides a mock function with given fields: ctx -func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) { +func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) { ret := _m.Called(ctx) if len(ret) == 0 { diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index 62e7e3bdb6..00457712f1 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -200,7 +200,7 @@ func (s *Sequencer) expireOldWorkerTxs(ctx context.Context) { // loadFromPool keeps loading transactions from the pool func (s *Sequencer) loadFromPool(ctx context.Context) { for { - poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx) + poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx, getQueryPendingTxsLimit(s.cfg.QueryPendingTxsLimit)) if err != nil && err != pool.ErrNotFound { log.Errorf("error loading txs from pool, error: %v", err) }