Merge remote-tracking branch 'origin/main' into assert
trinitys7 committed Nov 17, 2023
2 parents b21e6b7 + 5919fa9 commit 5ae1f78
Showing 19 changed files with 230 additions and 97 deletions.
18 changes: 9 additions & 9 deletions block/block-manager.md
@@ -30,16 +30,16 @@ sequenceDiagram
     Sequencer->>Full Node 2: Gossip Block
     Full Node 1->>Full Node 1: Verify Block
     Full Node 1->>Full Node 2: Gossip Block
-    Full Node 1->>Full Node 1: Mark Block Soft-Confirmed
+    Full Node 1->>Full Node 1: Mark Block Soft Confirmed
     Full Node 2->>Full Node 2: Verify Block
-    Full Node 2->>Full Node 2: Mark Block Soft-Confirmed
+    Full Node 2->>Full Node 2: Mark Block Soft Confirmed
     DA Layer->>Full Node 1: Retrieve Block
-    Full Node 1->>Full Node 1: Mark Block Hard-Confirmed
+    Full Node 1->>Full Node 1: Mark Block DA Included
     DA Layer->>Full Node 2: Retrieve Block
-    Full Node 2->>Full Node 2: Mark Block Hard-Confirmed
+    Full Node 2->>Full Node 2: Mark Block DA Included
```

## Protocol/Component Description
@@ -90,7 +90,7 @@ The block manager of the sequencer full nodes regularly publishes the produced b

### Block Retrieval from DA Network

-The block manager of the full nodes regularly pulls blocks from the DA network at `DABlockTime` intervals and starts off with a DA height read from the last state stored in the local store or `DAStartHeight` configuration parameter, whichever is the latest. The block manager also actively maintains and increments the `daHeight` counter after every DA pull. The pull happens by making the `RetrieveBlocks(daHeight)` request using the Data Availability Light Client (DALC) retriever, which can return either `Success`, `NotFound`, or `Error`. In the event of an error, a retry logic kicks in after a delay of 100 milliseconds delay between every retry and after 10 retries, an error is logged and the `daHeight` counter is not incremented, which basically results in the intentional stalling of the block retrieval logic. In the block `NotFound` scenario, there is no error as it is acceptable to have no rollup block at every DA height. The retrieval successfully increments the `daHeight` counter in this case. Finally, for the `Success` scenario, first, blocks that are successfully retrieved are marked as hard confirmed and are sent to be applied (or state update). A successful state update triggers fresh DA and block store pulls without respecting the `DABlockTime` and `BlockTime` intervals.
+The block manager of the full nodes regularly pulls blocks from the DA network at `DABlockTime` intervals and starts off with a DA height read from the last state stored in the local store or `DAStartHeight` configuration parameter, whichever is the latest. The block manager also actively maintains and increments the `daHeight` counter after every DA pull. The pull happens by making the `RetrieveBlocks(daHeight)` request using the Data Availability Light Client (DALC) retriever, which can return either `Success`, `NotFound`, or `Error`. In the event of an error, a retry logic kicks in after a delay of 100 milliseconds delay between every retry and after 10 retries, an error is logged and the `daHeight` counter is not incremented, which basically results in the intentional stalling of the block retrieval logic. In the block `NotFound` scenario, there is no error as it is acceptable to have no rollup block at every DA height. The retrieval successfully increments the `daHeight` counter in this case. Finally, for the `Success` scenario, first, blocks that are successfully retrieved are marked as DA included and are sent to be applied (or state update). A successful state update triggers fresh DA and block store pulls without respecting the `DABlockTime` and `BlockTime` intervals.

### Block Sync Service

@@ -109,13 +109,13 @@ For non-sequencer full nodes, Blocks gossiped through the P2P network are retrie
Starting off with a block store height of zero, for every `blockTime` unit of time, a signal is sent to the `blockStoreCh` channel in the block manager and when this signal is received, the `BlockStoreRetrieveLoop` retrieves blocks from the block store.
It keeps track of the last retrieved block's height and every time the current block store's height is greater than the last retrieved block's height, it retrieves all blocks from the block store that are between these two heights.
For each retrieved block, it sends a new block event to the `blockInCh` channel which is the same channel in which blocks retrieved from the DA layer are sent.
-This block is marked as soft-confirmed by the validating full node until the same block is seen on the DA layer and then marked hard-confirmed.
+This block is marked as soft confirmed by the validating full node until the same block is seen on the DA layer and then marked DA-included.
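The catch-up step of `BlockStoreRetrieveLoop` reduces to a watermark comparison: fetch every block between the last retrieved height and the current store height. The sketch below uses a toy in-memory store; `memStore` and `retrieveSince` are illustrative names invented here, not rollkit's.

```go
package main

import "fmt"

// block is a toy stand-in for a rollup block in this sketch.
type block struct{ height uint64 }

// memStore is a toy block store keyed by height.
type memStore struct {
	byHeight map[uint64]block
	height   uint64 // current store height
}

func (s *memStore) Height() uint64 { return s.height }

// retrieveSince mirrors one pass of BlockStoreRetrieveLoop: when the store
// height exceeds the last retrieved height, every block between the two is
// forwarded on blockInCh, and the new watermark is returned.
func retrieveSince(s *memStore, lastRetrieved uint64, blockInCh chan<- block) uint64 {
	current := s.Height()
	for h := lastRetrieved + 1; h <= current; h++ {
		if b, ok := s.byHeight[h]; ok {
			blockInCh <- b
		}
	}
	return current
}

func main() {
	s := &memStore{byHeight: map[uint64]block{2: {2}, 3: {3}}, height: 3}
	ch := make(chan block, 8) // buffered, standing in for blockInCh
	last := retrieveSince(s, 1, ch)
	fmt.Println(last, len(ch)) // 3 2
}
```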

Although a sequencer does not need to retrieve blocks from the P2P network, it still runs the `BlockStoreRetrieveLoop`.

-#### About Soft/Hard Confirmations
+#### About Soft Confirmations and DA Inclusions

-The block manager retrieves blocks from both the P2P network and the underlying DA network because the blocks are available in the P2P network faster and DA retrieval is slower (e.g., 1 second vs 15 seconds). The blocks retrieved from the P2P network are only marked as soft confirmed until the DA retrieval succeeds on those blocks and they are marked hard confirmed. The hard confirmations can be considered to have a higher level of finality.
+The block manager retrieves blocks from both the P2P network and the underlying DA network because the blocks are available in the P2P network faster and DA retrieval is slower (e.g., 1 second vs 15 seconds). The blocks retrieved from the P2P network are only marked as soft confirmed until the DA retrieval succeeds on those blocks and they are marked DA included. DA included blocks can be considered to have a higher level of finality.

### State Update after Block Retrieval

@@ -145,7 +145,7 @@ The communication between the full node and block manager:
* The default mode for sequencer nodes is normal (not lazy).
* The sequencer can produce empty blocks.
* The block manager uses persistent storage (disk) when the `root_dir` and `db_path` configuration parameters are specified in `config.toml` file under the app directory. If these configuration parameters are not specified, the in-memory storage is used, which will not be persistent if the node stops.
-* The block manager does not re-apply the block again (in other words, create a new updated state and persist it) when a block was initially applied using P2P block sync, but later was hard confirmed by DA retrieval. The block is only set hard confirmed in this case.
+* The block manager does not re-apply the block again (in other words, create a new updated state and persist it) when a block was initially applied using P2P block sync, but later was DA included during DA retrieval. The block is only marked DA included in this case.
* The block sync store is created by prefixing `blockSync` on the main data store.
* The genesis `ChainID` is used to create the `PubSubTopID` in go-header with the string `-block` appended to it. This append is because the full node also has a P2P header sync running with a different P2P network. Refer to go-header specs for more details.
* Block sync over the P2P network works only when a full node is connected to the P2P network by specifying the initial seeds to connect to via `P2PConfig.Seeds` configuration parameter when starting the full node.
24 changes: 12 additions & 12 deletions block/block_cache.go
@@ -8,19 +8,19 @@ import (

// BlockCache maintains blocks that are seen and hard confirmed
type BlockCache struct {
-	blocks            map[uint64]*types.Block
-	hashes            map[string]bool
-	hardConfirmations map[string]bool
-	mtx               *sync.RWMutex
+	blocks     map[uint64]*types.Block
+	hashes     map[string]bool
+	daIncluded map[string]bool
+	mtx        *sync.RWMutex
}

// NewBlockCache returns a new BlockCache struct
func NewBlockCache() *BlockCache {
return &BlockCache{
-		blocks:            make(map[uint64]*types.Block),
-		hashes:            make(map[string]bool),
-		hardConfirmations: make(map[string]bool),
-		mtx:               new(sync.RWMutex),
+		blocks:     make(map[uint64]*types.Block),
+		hashes:     make(map[string]bool),
+		daIncluded: make(map[string]bool),
+		mtx:        new(sync.RWMutex),
}
}

Expand Down Expand Up @@ -55,14 +55,14 @@ func (bc *BlockCache) setSeen(hash string) {
bc.hashes[hash] = true
}

-func (bc *BlockCache) isHardConfirmed(hash string) bool {
+func (bc *BlockCache) isDAIncluded(hash string) bool {
 	bc.mtx.RLock()
 	defer bc.mtx.RUnlock()
-	return bc.hardConfirmations[hash]
+	return bc.daIncluded[hash]
}

-func (bc *BlockCache) setHardConfirmed(hash string) {
+func (bc *BlockCache) setDAIncluded(hash string) {
 	bc.mtx.Lock()
 	defer bc.mtx.Unlock()
-	bc.hardConfirmations[hash] = true
+	bc.daIncluded[hash] = true
}
8 changes: 4 additions & 4 deletions block/block_cache_test.go
@@ -38,8 +38,8 @@ func TestBlockCache(t *testing.T) {
bc.setSeen("hash")
require.True(t, bc.isSeen("hash"), "isSeen should return true for seen hash")

-	// Test setHardConfirmed
-	require.False(t, bc.isHardConfirmed("hash"), "hardConfirmations should be false for unseen hash")
-	bc.setHardConfirmed("hash")
-	require.True(t, bc.isHardConfirmed("hash"), "hardConfirmations should be true for seen hash")
+	// Test setDAIncluded
+	require.False(t, bc.isDAIncluded("hash"), "DAIncluded should be false for unseen hash")
+	bc.setDAIncluded("hash")
+	require.True(t, bc.isDAIncluded("hash"), "DAIncluded should be true for seen hash")
}
9 changes: 4 additions & 5 deletions block/manager.go
@@ -207,9 +207,8 @@ func (m *Manager) GetStoreHeight() uint64 {
return m.store.Height()
}

-// GetHardConfirmation returns true if the block is hard confirmed
-func (m *Manager) GetHardConfirmation(hash types.Hash) bool {
-	return m.blockCache.isHardConfirmed(hash.String())
+func (m *Manager) IsDAIncluded(hash types.Hash) bool {
+	return m.blockCache.isDAIncluded(hash.String())
}

// AggregationLoop is responsible for aggregating transactions into rollup-blocks.
@@ -508,8 +507,8 @@ func (m *Manager) processNextDABlock(ctx context.Context) error {
m.logger.Debug("retrieved potential blocks", "n", len(blockResp.Blocks), "daHeight", daHeight)
for _, block := range blockResp.Blocks {
blockHash := block.Hash().String()
-		m.blockCache.setHardConfirmed(blockHash)
-		m.logger.Info("block marked as hard confirmed", "blockHeight", block.Height(), "blockHash", blockHash)
+		m.blockCache.setDAIncluded(blockHash)
+		m.logger.Info("block marked as DA included", "blockHeight", block.Height(), "blockHash", blockHash)
if !m.blockCache.isSeen(blockHash) {
m.blockInCh <- newBlockEvent{block, daHeight}
}
13 changes: 6 additions & 7 deletions block/manager_test.go
@@ -102,18 +102,17 @@ func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient {
return dalc
}

-func TestGetHardConfirmation(t *testing.T) {
+func TestIsDAIncluded(t *testing.T) {
// Create a minimalistic block manager
m := &Manager{
blockCache: NewBlockCache(),
}
hash := types.Hash([]byte("hash"))

-	// GetHardConfirmation should return false for unseen hash
-	require.False(t, m.GetHardConfirmation(hash))
+	// IsDAIncluded should return false for unseen hash
+	require.False(t, m.IsDAIncluded(hash))

-	// Set the hash as hard confirmed and verify GetHardConfirmation returns
-	// true
-	m.blockCache.setHardConfirmed(hash.String())
-	require.True(t, m.GetHardConfirmation(hash))
+	// Set the hash as DAIncluded and verify IsDAIncluded returns true
+	m.blockCache.setDAIncluded(hash.String())
+	require.True(t, m.IsDAIncluded(hash))
}
2 changes: 1 addition & 1 deletion config/defaults.go
@@ -26,7 +26,7 @@ var DefaultNodeConfig = NodeConfig{
DABlockTime: 15 * time.Second,
NamespaceID: types.NamespaceID{},
},
-	DALayer: "mock",
+	DALayer: "newda",
DAConfig: "",
Light: false,
HeaderConfig: HeaderConfig{
35 changes: 4 additions & 31 deletions da/celestia/mock/server.go
@@ -12,8 +12,8 @@ import (
mux2 "github.com/gorilla/mux"

"github.com/rollkit/celestia-openrpc/types/blob"
-	"github.com/rollkit/celestia-openrpc/types/header"
-	mockda "github.com/rollkit/rollkit/da/mock"
+	"github.com/rollkit/go-da/test"
+	"github.com/rollkit/rollkit/da/newda"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/third_party/log"
"github.com/rollkit/rollkit/types"
@@ -53,7 +53,7 @@ type response struct {

// Server mocks celestia-node HTTP API.
type Server struct {
-	mock *mockda.DataAvailabilityLayerClient
+	mock *newda.NewDA
blockTime time.Duration
server *httptest.Server
logger log.Logger
@@ -62,7 +62,7 @@ type Server struct {
// NewServer creates new instance of Server.
func NewServer(blockTime time.Duration, logger log.Logger) *Server {
return &Server{
-		mock: new(mockda.DataAvailabilityLayerClient),
+		mock: &newda.NewDA{DA: test.NewDummyDA()},
blockTime: blockTime,
logger: logger,
}
@@ -107,33 +107,6 @@ func (s *Server) rpc(w http.ResponseWriter, r *http.Request) {
return
}
switch req.Method {
-	case "header.GetByHeight":
-		var params []interface{}
-		err := json.Unmarshal(req.Params, &params)
-		if err != nil {
-			s.writeError(w, err)
-			return
-		}
-		if len(params) != 1 {
-			s.writeError(w, errors.New("expected 1 param: height (uint64)"))
-			return
-		}
-		height := uint64(params[0].(float64))
-		dah := s.mock.GetHeaderByHeight(height)
-		resp := &response{
-			Jsonrpc: "2.0",
-			Result: header.ExtendedHeader{
-				DAH: dah,
-			},
-			ID: req.ID,
-			Error: nil,
-		}
-		bytes, err := json.Marshal(resp)
-		if err != nil {
-			s.writeError(w, err)
-			return
-		}
-		s.writeResponse(w, bytes)
case "blob.GetAll":
var params []interface{}
err := json.Unmarshal(req.Params, &params)
7 changes: 4 additions & 3 deletions da/grpc/mockserv/mockserv.go
@@ -7,8 +7,8 @@ import (
ds "github.com/ipfs/go-datastore"
"google.golang.org/grpc"

+	"github.com/rollkit/go-da/test"
 	grpcda "github.com/rollkit/rollkit/da/grpc"
-	"github.com/rollkit/rollkit/da/mock"
+	"github.com/rollkit/rollkit/da/newda"
"github.com/rollkit/rollkit/types"
"github.com/rollkit/rollkit/types/pb/dalc"
"github.com/rollkit/rollkit/types/pb/rollkit"
@@ -17,7 +18,7 @@ import (
// GetServer creates and returns gRPC server instance.
func GetServer(kv ds.Datastore, conf grpcda.Config, mockConfig []byte, logger cmlog.Logger) *grpc.Server {
srv := grpc.NewServer()
-	mockImpl := &mockImpl{}
+	mockImpl := &mockImpl{mock: &newda.NewDA{DA: test.NewDummyDA()}}
err := mockImpl.mock.Init([8]byte{}, mockConfig, kv, logger)
if err != nil {
logger.Error("failed to initialize mock DALC", "error", err)
@@ -33,7 +34,7 @@ }
}

type mockImpl struct {
-	mock mock.DataAvailabilityLayerClient
+	mock *newda.NewDA
}

func (m *mockImpl) SubmitBlocks(ctx context.Context, request *dalc.SubmitBlocksRequest) (*dalc.SubmitBlocksResponse, error) {
125 changes: 125 additions & 0 deletions da/newda/newda.go
@@ -0,0 +1,125 @@
package newda

import (
	"context"
	"encoding/binary"
	"fmt"

	pb "github.com/rollkit/rollkit/types/pb/rollkit"

	"github.com/gogo/protobuf/proto"
	ds "github.com/ipfs/go-datastore"

	newda "github.com/rollkit/go-da"
	"github.com/rollkit/rollkit/da"
	"github.com/rollkit/rollkit/third_party/log"
	"github.com/rollkit/rollkit/types"
)

// NewDA is a new DA implementation.
type NewDA struct {
	DA     newda.DA
	logger log.Logger
}

// Init is called once to allow DA client to read configuration and initialize resources.
func (n *NewDA) Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error {
	n.logger = logger
	return nil
}

// Start creates connection to gRPC server and instantiates gRPC client.
func (n *NewDA) Start() error {
	return nil
}

// Stop closes connection to gRPC server.
func (n *NewDA) Stop() error {
	return nil
}

// SubmitBlocks submits blocks to DA.
func (n *NewDA) SubmitBlocks(ctx context.Context, blocks []*types.Block) da.ResultSubmitBlocks {
	blobs := make([][]byte, len(blocks))
	for i := range blocks {
		blob, err := blocks[i].MarshalBinary()
		if err != nil {
			return da.ResultSubmitBlocks{
				BaseResult: da.BaseResult{
					Code:    da.StatusError,
					Message: "failed to serialize block",
				},
			}
		}
		blobs[i] = blob
	}
	ids, _, err := n.DA.Submit(blobs)
	if err != nil {
		return da.ResultSubmitBlocks{
			BaseResult: da.BaseResult{
				Code:    da.StatusError,
				Message: "failed to submit blocks: " + err.Error(),
			},
		}
	}

	return da.ResultSubmitBlocks{
		BaseResult: da.BaseResult{
			Code:     da.StatusSuccess,
			DAHeight: binary.LittleEndian.Uint64(ids[0]),
		},
	}
}

// RetrieveBlocks retrieves blocks from DA.
func (n *NewDA) RetrieveBlocks(ctx context.Context, dataLayerHeight uint64) da.ResultRetrieveBlocks {
	ids, err := n.DA.GetIDs(dataLayerHeight)
	if err != nil {
		return da.ResultRetrieveBlocks{
			BaseResult: da.BaseResult{
				Code:     da.StatusError,
				Message:  fmt.Sprintf("failed to get IDs: %s", err.Error()),
				DAHeight: dataLayerHeight,
			},
		}
	}

	blobs, err := n.DA.Get(ids)
	if err != nil {
		return da.ResultRetrieveBlocks{
			BaseResult: da.BaseResult{
				Code:     da.StatusError,
				Message:  fmt.Sprintf("failed to get blobs: %s", err.Error()),
				DAHeight: dataLayerHeight,
			},
		}
	}

	blocks := make([]*types.Block, len(blobs))
	for i, blob := range blobs {
		var block pb.Block
		err = proto.Unmarshal(blob, &block)
		if err != nil {
			n.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
			continue
		}
		blocks[i] = new(types.Block)
		err := blocks[i].FromProto(&block)
		if err != nil {
			return da.ResultRetrieveBlocks{
				BaseResult: da.BaseResult{
					Code:    da.StatusError,
					Message: err.Error(),
				},
			}
		}
	}

	return da.ResultRetrieveBlocks{
		BaseResult: da.BaseResult{
			Code:     da.StatusSuccess,
			DAHeight: dataLayerHeight,
		},
		Blocks: blocks,
	}
}