Skip to content

Commit

Permalink
Merge pull request #2 from notional-labs/v0.11.9-notional
Browse files Browse the repository at this point in the history
Add celestia da for rollkit.
  • Loading branch information
hoank101 authored Jan 4, 2024
2 parents 02e66e8 + 5810858 commit 020fc2a
Show file tree
Hide file tree
Showing 5 changed files with 395 additions and 2 deletions.
178 changes: 178 additions & 0 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package celestia

import (
"context"
"crypto/rand"
"encoding/binary"
"log/slog"
"strings"

"github.com/celestiaorg/nmt"
openrpc "github.com/rollkit/celestia-openrpc"
"github.com/rollkit/celestia-openrpc/types/appconsts"
"github.com/rollkit/celestia-openrpc/types/blob"
"github.com/rollkit/celestia-openrpc/types/share"
"github.com/rollkit/go-da"
)

func RandomNamespace() share.Namespace {
bytes := make([]byte, 4) // 4 bytes to produce 8 hex characters
_, err := rand.Read(bytes)
if err != nil {
panic(err)
}
ns, err := share.NewBlobNamespaceV0(bytes)
if err != nil {
panic(err)
}
return ns
}

type CelestiaDA struct {
logger *slog.Logger
context context.Context
client *openrpc.Client
namespace share.Namespace
}

func NewCelestiaDA(ctx context.Context, client *openrpc.Client) *CelestiaDA {
return &CelestiaDA{
context: ctx,
client: client,
namespace: RandomNamespace(),
logger: slog.Default(),
}
}

var _ da.DA = &CelestiaDA{}

// Commit implements da.DA.
func (c *CelestiaDA) Commit(blobs [][]byte) ([][]byte, error) {
c.logger.Info("CelestiaDA Commit", "blobs len", len(blobs))
_, commitments, err := c.blobsAndCommitments(blobs)
return commitments, err
}

// Get implements da.DA.
func (c *CelestiaDA) Get(ids [][]byte) ([][]byte, error) {
c.logger.Info("CelestiaDA Get", "ids len", len(ids))
var blobs []da.Blob
for _, id := range ids {
height, commitment := splitID(id)
blob, err := c.client.Blob.Get(c.context, height, c.namespace, commitment)
if err != nil {
return nil, err
}
blobs = append(blobs, blob.Data)
}
return blobs, nil
}

// GetIDs implements da.DA.
func (c *CelestiaDA) GetIDs(height uint64) ([][]byte, error) {
c.logger.Info("CelestiaDA GetIDs", "height", height)
var ids []da.ID
blobs, err := c.client.Blob.GetAll(c.context, height, []share.Namespace{c.namespace})
if err != nil {
if strings.Contains(err.Error(), blob.ErrBlobNotFound.Error()) {
return nil, nil
}
return nil, err
}
for _, b := range blobs {
ids = append(ids, makeID(height, b.Commitment))
}
return ids, nil
}

// MaxBlobSize implements da.DA.
func (c *CelestiaDA) MaxBlobSize() (uint64, error) {
c.logger.Info("CelestiaDA MaxBlobSize", "result", appconsts.DefaultMaxBytes)
return appconsts.DefaultMaxBytes, nil
}

// Submit implements da.DA.
func (c *CelestiaDA) Submit(daBlobs [][]byte) ([][]byte, [][]byte, error) {
c.logger.Info("CelestiaDA Submit", "daBlobs len", len(daBlobs))
blobs, commitments, err := c.blobsAndCommitments(daBlobs)
if err != nil {
return nil, nil, err
}
height, err := c.client.Blob.Submit(c.context, blobs, openrpc.DefaultSubmitOptions())
if err != nil {
return nil, nil, err
}
ids := make([]da.ID, len(daBlobs))
proofs := make([]da.Proof, len(daBlobs))
for i, commitment := range commitments {
ids[i] = makeID(height, commitment)
proof, err := c.client.Blob.GetProof(c.context, height, c.namespace, commitment)
if err != nil {
return nil, nil, err
}
proofs[i], err = (*proof)[0].MarshalJSON()
if err != nil {
return nil, nil, err
}
}
return ids, proofs, nil
}

// Validate implements da.DA.
func (c *CelestiaDA) Validate(ids [][]byte, proofs [][]byte) ([]bool, error) {
c.logger.Info("CelestiaDA Validate", "ids len", len(ids))
var included []bool
var bProofs []*blob.Proof
for _, p := range proofs {
nmtProof := &nmt.Proof{}
if err := nmtProof.UnmarshalJSON(p); err != nil {
return nil, err
}
proof := &blob.Proof{nmtProof}
bProofs = append(bProofs, proof)
}
for i, id := range ids {
height, commitment := splitID(id)
isIncluded, _ := c.client.Blob.Included(c.context, height, c.namespace, bProofs[i], commitment)
included = append(included, isIncluded)
}
return included, nil
}

func (c *CelestiaDA) blobsAndCommitments(daBlobs []da.Blob) ([]*blob.Blob, []da.Commitment, error) {
var blobs []*blob.Blob
var commitments []da.Commitment
for _, daBlob := range daBlobs {
b, err := blob.NewBlobV0(c.namespace, daBlob)
if err != nil {
return nil, nil, err
}
blobs = append(blobs, b)

commitment, err := blob.CreateCommitment(b)
if err != nil {
return nil, nil, err
}
commitments = append(commitments, commitment)
}
return blobs, commitments, nil
}

// heightLen is a length (in bytes) of serialized height.
//
// This is 8 as uint64 consist of 8 bytes.
const heightLen = 8

func makeID(height uint64, commitment da.Commitment) da.ID {
id := make([]byte, heightLen+len(commitment))
binary.LittleEndian.PutUint64(id, height)
copy(id[heightLen:], commitment)
return id
}

func splitID(id da.ID) (uint64, da.Commitment) {
if len(id) <= heightLen {
return 0, nil
}
return binary.LittleEndian.Uint64(id[:heightLen]), id[heightLen:]
}
99 changes: 99 additions & 0 deletions da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package celestia_test

import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/ory/dockertest/v3"
openrpc "github.com/rollkit/celestia-openrpc"
"github.com/rollkit/go-da/test"
"github.com/rollkit/rollkit/da/celestia"
"github.com/stretchr/testify/suite"
)

type TestSuite struct {
suite.Suite

pool *dockertest.Pool
resource *dockertest.Resource

token string
}

func (t *TestSuite) SetupSuite() {
pool, err := dockertest.NewPool("")
if err != nil {
t.Failf("Could not construct docker pool", "error: %v\n", err)
}
t.pool = pool

// uses pool to try to connect to Docker
err = pool.Client.Ping()
if err != nil {
t.Failf("Could not connect to Docker", "error: %v\n", err)
}

// pulls an image, creates a container based on it and runs it
resource, err := pool.Run("ghcr.io/rollkit/local-celestia-devnet", "latest", []string{})
if err != nil {
t.Failf("Could not start resource", "error: %v\n", err)
}
t.resource = resource

// // exponential backoff-retry, because the application in the container might not be ready to accept connections yet
// pool.MaxWait = 60 * time.Second
// if err := pool.Retry(func() error {
// resp, err := http.Get(fmt.Sprintf("http://localhost:%s/balance", resource.GetPort("26659/tcp")))
// if err != nil {
// return err
// }
// bz, err := io.ReadAll(resp.Body)
// _ = resp.Body.Close()
// if err != nil {
// return err
// }
// if strings.Contains(string(bz), "error") {
// return errors.New(string(bz))
// }
// return nil
// }); err != nil {
// log.Fatalf("Could not start local-celestia-devnet: %s", err)
// }

opts := dockertest.ExecOptions{}
buf := new(bytes.Buffer)
opts.StdOut = buf
opts.StdErr = buf
_, err = resource.Exec([]string{"/bin/celestia", "bridge", "auth", "admin", "--node.store", "/home/celestia/bridge"}, opts)
if err != nil {
t.Failf("Could not execute command", "error: %v\n", err)
}

t.token = buf.String()
}

func (t *TestSuite) TearDownSuite() {
if err := t.pool.Purge(t.resource); err != nil {
t.Failf("failed to purge docker resource", "error: %v\n", err)
}
}

func TestIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(TestSuite))
}

func (t *TestSuite) TestCelestiaDA() {
client, err := openrpc.NewClient(context.Background(), t.getRPCAddress(), t.token)
t.Require().NoError(err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
da := celestia.NewCelestiaDA(ctx, client)
test.RunDATestSuite(t.T(), da)
}

func (t *TestSuite) getRPCAddress() string {
return fmt.Sprintf("http://localhost:%s", t.resource.GetPort("26658/tcp"))
}
33 changes: 32 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.21.4

require (
github.com/celestiaorg/go-header v0.4.1
github.com/celestiaorg/nmt v0.20.0
github.com/celestiaorg/utils v0.1.0
github.com/cometbft/cometbft v0.38.0-rc3
github.com/cosmos/gogoproto v1.4.11
Expand Down Expand Up @@ -35,6 +36,35 @@ require (
google.golang.org/protobuf v1.31.0
)

require (
cosmossdk.io/math v1.1.2 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/celestiaorg/go-fraud v0.2.0 // indirect
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 // indirect
github.com/celestiaorg/rsmt2d v0.11.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/continuity v0.4.1 // indirect
github.com/docker/cli v24.0.2+incompatible // indirect
github.com/docker/docker v24.0.2+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/filecoin-project/go-jsonrpc v0.3.1 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -130,6 +160,7 @@ require (
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/ory/dockertest/v3 v3.10.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
Expand All @@ -143,7 +174,7 @@ require (
github.com/quic-go/quic-go v0.37.6 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rollkit/celestia-openrpc v0.3.0
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sasha-s/go-deadlock v0.3.1 // indirect
Expand Down
Loading

0 comments on commit 020fc2a

Please sign in to comment.