diff --git a/flags/flags.go b/flags/flags.go index 9ee798bb..879ad185 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -21,6 +21,7 @@ const ( S3Category = "S3 Cache/Fallback" VerifierCategory = "KZG and Cert Verifier" VerifierDeprecatedCategory = "DEPRECATED VERIFIER FLAGS -- THESE WILL BE REMOVED IN V2.0.0" + WVMCategory = "WVM Storage option" ) const ( diff --git a/server/config.go b/server/config.go index 3dc9d55c..22eb4120 100644 --- a/server/config.go +++ b/server/config.go @@ -11,6 +11,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/wvm" "github.com/Layr-Labs/eigenda-proxy/utils" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" @@ -33,6 +34,7 @@ type Config struct { // secondary storage RedisConfig redis.Config S3Config s3.Config + WVMConfig wvm.Config } // ReadConfig ... parses the Config from the provided flags or environment variables. diff --git a/server/load_store.go b/server/load_store.go index be298c87..2c8f2573 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/wvm" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" "github.com/ethereum/go-ethereum/log" @@ -18,7 +19,9 @@ import ( // TODO - create structured abstraction for dependency injection vs. overloading stateless functions // populateTargets ... creates a list of storage backends based on the provided target strings -func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redis.Store) []store.PrecomputedKeyStore { +func populateTargets(targets []string, s3 store.PrecomputedKeyStore, + redis *redis.Store, wvm store.PrecomputedKeyStore, +) []store.PrecomputedKeyStore { stores := make([]store.PrecomputedKeyStore, len(targets)) for i, f := range targets { @@ -37,6 +40,12 @@ func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redi } stores[i] = s3 + case store.WVMBackendType: + if wvm == nil { + panic(fmt.Sprintf("S3 backend is not configured but specified in targets: %s", f)) + } + stores[i] = wvm + case store.EigenDABackendType, store.MemoryBackendType: panic(fmt.Sprintf("Invalid target for fallback: %s", f)) @@ -56,6 +65,8 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr // create S3 backend store (if enabled) var err error var s3Store store.PrecomputedKeyStore + // create WVM backend store (if enabled) + var wvmStore store.PrecomputedKeyStore var redisStore *redis.Store if cfg.EigenDAConfig.S3Config.Bucket != "" && cfg.EigenDAConfig.S3Config.Endpoint != "" { @@ -66,6 +77,16 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } } + if cfg.EigenDAConfig.WVMConfig.Enabled { + if cfg.EigenDAConfig.WVMConfig.Endpoint != "" { + log.Info("Using WVM backend") + wvmStore, err = wvm.NewStore(&cfg.EigenDAConfig.WVMConfig, log) + if err != nil { + return nil, fmt.Errorf("failed to create WVM store: %w", err) + } + } + } + if cfg.EigenDAConfig.RedisConfig.Endpoint != "" { log.Info("Using Redis backend") // create Redis backend store @@ -120,8 +141,8 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } // create secondary storage router - fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore) - caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore) + fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore, wvmStore) + caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore, wvmStore) secondary := store.NewSecondaryManager(log, m, caches, fallbacks) if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled diff --git a/store/precomputed_key/wvm/cli.go b/store/precomputed_key/wvm/cli.go new file mode 100644 index 00000000..3b05bf28 --- /dev/null +++ b/store/precomputed_key/wvm/cli.go @@ -0,0 +1,61 @@ +package wvm + +import ( + "github.com/urfave/cli/v2" +) + +var ( + EndpointFlagName = withFlagPrefix("endpoint") + ChainIDFlagName = withFlagPrefix("chain_id") + ArchiverAddressFlagName = withFlagPrefix("archiver_address") + TimeoutFlagName = withFlagPrefix("timeout") +) + +func withFlagPrefix(s string) string { + return "wvm." + s +} + +func withEnvPrefix(envPrefix, s string) []string { + return []string{envPrefix + "_WVM_" + s} +} + +// CLIFlags ... used for WVM backend configuration +// category is used to group the flags in the help output (see https://cli.urfave.org/v2/examples/flags/#grouping) +func CLIFlags(envPrefix, category string) []cli.Flag { + return []cli.Flag{ + &cli.StringFlag{ + Name: EndpointFlagName, + Usage: "endpoint for WVM chain rpc", + EnvVars: withEnvPrefix(envPrefix, "ENDPOINT"), + Category: category, + }, + &cli.StringFlag{ + Name: ChainIDFlagName, + Usage: "chain ID of WVM chain", + EnvVars: withEnvPrefix(envPrefix, "CHAIN_ID"), + Category: category, + }, + &cli.StringFlag{ + Name: ArchiverAddressFlagName, + Usage: "user's wvm chain address used for archiving data", + EnvVars: withEnvPrefix(envPrefix, "ARCHIVER_ADDRESS"), + Category: category, + }, + // &cli.DurationFlag{ + // Name: TimeoutFlagName, + // Usage: "timeout for S3 storage operations (e.g. get, put)", + // Value: 5 * time.Second, + // EnvVars: withEnvPrefix(envPrefix, "TIMEOUT"), + // Category: category, + // }, + } +} + +func ReadConfig(ctx *cli.Context) Config { + return Config{ + Endpoint: ctx.String(EndpointFlagName), + ChainID: ctx.Int64(ChainIDFlagName), + ArchiverAddress: ctx.String(ArchiverAddressFlagName), + // Timeout: ctx.Duration(TimeoutFlagName), + } +} diff --git a/store/precomputed_key/wvm/eth_rpc_client.go b/store/precomputed_key/wvm/eth_rpc_client.go new file mode 100644 index 00000000..5c33bb0c --- /dev/null +++ b/store/precomputed_key/wvm/eth_rpc_client.go @@ -0,0 +1,277 @@ +package wvm + +import ( + "bytes" + "context" + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "os" + "reflect" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +type EthRPCClient struct { + log log.Logger + client *ethclient.Client + chainID int64 +} + +func NewEthRPCClient(log log.Logger, cfg *Config) (*EthRPCClient, error) { + client, err := ethclient.Dial(cfg.Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to connect to the WVM client: %w", err) + } + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + return nil, fmt.Errorf("wvm archiver private key is empty") + } + return &EthRPCClient{log, client, cfg.ChainID}, nil +} + +// getSuggestedGasPrice connects to an Ethereum node via RPC and retrieves the current suggested gas price. +func (rpc *EthRPCClient) getSuggestedGasPrice(ctx context.Context) error { + gasPrice, err := rpc.client.SuggestGasPrice(ctx) + if err != nil { + return fmt.Errorf("failed to suggest gas price: %w", err) + } + + rpc.log.Info("wvm chain suggested gas price", "gas", gasPrice.String()) + + return nil +} + +// estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. +func (rpc *EthRPCClient) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { + var ( + fromAddr = common.HexToAddress(from) + toAddr = common.HexToAddress(to) + bytesData []byte + err error + ) + + var encoded string + if string(data) != "" { + if ok := strings.HasPrefix(string(data), "0x"); !ok { + encoded = hexutil.Encode(data) + } + + bytesData, err = hexutil.Decode(encoded) + if err != nil { + return 0, err + } + } + + msg := ethereum.CallMsg{ + From: fromAddr, + To: &toAddr, + Gas: 0x00, + Data: bytesData, + } + + gas, err := rpc.client.EstimateGas(ctx, msg) + if err != nil { + return 0, err + } + + rpc.log.Info("WVM estimated Gas Price", "price", gas) + + return gas, nil +} + +// createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. +func (rpc *EthRPCClient) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { + baseFee, err := rpc.client.SuggestGasPrice(ctx) + if err != nil { + return "", err + } + + // WVM: maybe we don't need this, but e.g. + // increment := new(big.Int).Mul(big.NewInt(2), big.NewInt(params.GWei)) + // gasFeeCap := new(big.Int).Add(baseFee, increment) + // gasFeeCap.Add(gasFeeCap, priorityFee) + + gasFeeCap := baseFee + + // address shenanigans + // Decode the provided private key. + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + panic("wvm archiver signer key is empty") + } + pKeyBytes, err := hexutil.Decode("0x" + privKey) + if err != nil { + return "", err + } + // Convert the private key bytes to an ECDSA private key. + ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) + if err != nil { + return "", err + } + // Extract the public key from the ECDSA private key. + publicKey := ecdsaPrivateKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + return "", fmt.Errorf("error casting public key to ECDSA") + } + // Compute the Ethereum address of the signer from the public key. + fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) + // Retrieve the nonce for the signer's account, representing the transaction count. + nonce, err := rpc.client.PendingNonceAt(ctx, fromAddress) + if err != nil { + return "", err + } + + // Prepare data payload. + var hexData string + if strings.HasPrefix(data, "0x") { + hexData = data + } else { + hexData = hexutil.Encode([]byte(data)) + } + bytesData, err := hexutil.Decode(hexData) + if err != nil { + return "", err + } + + toAddr := common.HexToAddress(to) + txData := types.DynamicFeeTx{ + ChainID: big.NewInt(rpc.chainID), + Nonce: nonce, + GasTipCap: big.NewInt(0), + GasFeeCap: gasFeeCap, + Gas: gasLimit, + To: &toAddr, + Data: bytesData, + } + + tx := types.NewTx(&txData) + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(rpc.chainID)), ecdsaPrivateKey) + if err != nil { + return "", err + } + + // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. + var buf bytes.Buffer + err = signedTx.EncodeRLP(&buf) + if err != nil { + return "", err + } + + // Return the RLP-encoded transaction as a hexadecimal string. + rawTxRLPHex := hex.EncodeToString(buf.Bytes()) + + return rawTxRLPHex, nil +} + +// Transaction represents the structure of the transaction JSON. +type Transaction struct { + Type string `json:"type"` + ChainID string `json:"chainId"` + Nonce string `json:"nonce"` + To string `json:"to"` + Gas string `json:"gas"` + GasPrice string `json:"gasPrice,omitempty"` + MaxPriorityFeePerGas string `json:"maxPriorityFeePerGas"` + MaxFeePerGas string `json:"maxFeePerGas"` + Value string `json:"value"` + Input string `json:"input"` + AccessList []string `json:"accessList"` + V string `json:"v"` + R string `json:"r"` + S string `json:"s"` + YParity string `json:"yParity"` + Hash string `json:"hash"` + TransactionTime string `json:"transactionTime,omitempty"` + TransactionCost string `json:"transactionCost,omitempty"` +} + +// helper function +func convertHexField(tx *Transaction, field string) error { + typeOfTx := reflect.TypeOf(*tx) + + // Get the value of the Transaction struct + txValue := reflect.ValueOf(tx).Elem() + + // Parse the hexadecimal string as an integer + hexStr := txValue.FieldByName(field).String() + + intValue, err := strconv.ParseUint(hexStr[2:], 16, 64) + if err != nil { + return err + } + + // Convert the integer to a decimal string + decimalStr := strconv.FormatUint(intValue, 10) + + // Check if the field exists + _, ok := typeOfTx.FieldByName(field) + if !ok { + return fmt.Errorf("field %s does not exist in Transaction struct", field) + } + + // Set the field value to the decimal string + txValue.FieldByName(field).SetString(decimalStr) + + return nil +} + +func (rpc *EthRPCClient) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { + rawTxBytes, err := hex.DecodeString(rawTx) + if err != nil { + return "", err + } + + tx := new(types.Transaction) + + err = rlp.DecodeBytes(rawTxBytes, &tx) + if err != nil { + return "", err + } + + err = rpc.client.SendTransaction(ctx, tx) + if err != nil { + return "", err + } + + var txDetails Transaction + txBytes, err := tx.MarshalJSON() + if err != nil { + return "", err + } + if err := json.Unmarshal(txBytes, &txDetails); err != nil { + return "", err + } + + txDetails.TransactionTime = tx.Time().Format(time.RFC822) + txDetails.TransactionCost = tx.Cost().String() + + convertFields := []string{"Nonce", "MaxPriorityFeePerGas", "MaxFeePerGas", "Value", "Type", "Gas"} + for _, field := range convertFields { + if err := convertHexField(&txDetails, field); err != nil { + return "", err + } + } + + txJSON, err := json.MarshalIndent(txDetails, "", "\t") + if err != nil { + return "", err + } + + rpc.log.Info("WVM:raw TX Receipt:", "tx receipt", string(txJSON)) + + return txDetails.Hash, nil +} diff --git a/store/precomputed_key/wvm/wvm.go b/store/precomputed_key/wvm/wvm.go new file mode 100644 index 00000000..557a6f0b --- /dev/null +++ b/store/precomputed_key/wvm/wvm.go @@ -0,0 +1,226 @@ +package wvm + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/andybalholm/brotli" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + cache "github.com/patrickmn/go-cache" +) + +// Config...WVM client configuration +type Config struct { + Enabled bool + // RPC endpoint of WVM chain + Endpoint string + // WVM chain id + ChainID int64 + // User's archiver address + // Should be populated with tWVM tokens used in WVM testnet + // Please check out WVM Faucet to claim $tWVM. + // Please read README to gen an additional information + ArchiverAddress string +} + +// Store...wraps wvm client, ethclient and concurrent internal cache +type Store struct { + rpcClient *EthRPCClient + log log.Logger + txCache *cache.Cache + cfg *Config +} + +const ( + // wvmRPCURL = "https://testnet-rpc.wvm.dev" // wvm alphanet rpc url + ArchivePoolAddress = "0x0000000000000000000000000000000000000000" // the data settling address, a unified standard across WeaveVM archiving services +) + +func NewStore(cfg *Config, log log.Logger) (*Store, error) { + rpcClient, err := NewEthRPCClient(log, cfg) + if err != nil { + return nil, fmt.Errorf("failed to initialize rpc client for wvm chain: %w", err) + } + + return &Store{ + rpcClient: rpcClient, + cfg: cfg, log: log, txCache: cache.New(24*15*time.Hour, 24*time.Hour), + }, nil +} + +func (wvm *Store) BackendType() store.BackendType { + return store.S3BackendType +} + +func (wvm *Store) Verify(_ context.Context, key []byte, value []byte) error { + h := crypto.Keccak256Hash(value) + if !bytes.Equal(h[:], key) { + return fmt.Errorf("key does not match value, expected: %s got: %s", hex.EncodeToString(key), h.Hex()) + } + + return nil +} + +func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { + wvmData, err := wvm.wvmEncode(value) + if err != nil { + return fmt.Errorf("failed to store data in wvm: %w", err) + } + + err = wvm.rpcClient.getSuggestedGasPrice(ctx) + if err != nil { + return fmt.Errorf("failed to store data in wvm: %w", err) + } + + gas, err := wvm.rpcClient.estimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, wvmData) + if err != nil { + return fmt.Errorf("failed to store data in wvm: %w", err) + } + + wvmRawTx, err := wvm.rpcClient.createRawTransaction(ctx, ArchivePoolAddress, string(wvmData), gas) + if err != nil { + return fmt.Errorf("failed to store data in wvm: %w", err) + } + + wvmTxHash, err := wvm.rpcClient.sendRawTransaction(ctx, wvmRawTx) + if err != nil { + return fmt.Errorf("failed to store data in wvm: %w", err) + } + + wvm.log.Info("WVM:TX Hash:", "tx hash", wvmTxHash) + + wvm.txCache.Set(string(key), wvmTxHash, cache.DefaultExpiration) + + wvm.log.Info("WVM:saved wvm Tx Hash by batch_id:blob_index", + "tx hash", wvmTxHash, + "key", key) + + return nil +} + +func (wvm *Store) Get(ctx context.Context, key []byte) ([]byte, error) { + wvmTxHash, err := wvm.GetWvmTxHashByCommitment(key) + if err != nil { + return nil, err + } + + wvm.log.Info("found wvm tx hash using provided commitment", "provided key", string(key)) + data, err := wvm.get(ctx, wvmTxHash) + if err != nil { + return nil, fmt.Errorf("failed to get eigenda blob from wvm: %w", err) + } + + return data, nil +} + +func (wvm *Store) get(ctx context.Context, wvmTxHash string) ([]byte, error) { + r, err := http.NewRequestWithContext(ctx, http.MethodGet, + fmt.Sprintf("https://wvm-data-retriever.shuttleapp.rs/calldata/%s", + wvmTxHash), nil) + if err != nil { + return nil, fmt.Errorf("failed to call wvm-data-retriever") + } + + defer r.Body.Close() + + type WvmResponse struct { + ArweaveBlockHash string `json:"arweave_block_hash"` + Calldata string `json:"calldata"` + WarDecodedCalldata string `json:"war_decoded_calldata"` + WvmBlockHash string `json:"wvm_block_hash"` + } + + body, err := io.ReadAll(r.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var wvmData WvmResponse + err = json.Unmarshal(body, &wvmData) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + wvm.log.Info("Received data from WVM", "arweave_block_hash", wvmData.ArweaveBlockHash, "wvm_block_hash", wvmData.WvmBlockHash) + + wvmDecodedBlob, err := wvm.wvmDecode(wvmData.Calldata) + if err != nil { + return nil, fmt.Errorf("failed to decode calldata to eigen decoded blob: %w", err) + } + + if len(wvmDecodedBlob) == 0 { + return nil, fmt.Errorf("blob has length zero") + } + + return wvmDecodedBlob, nil +} + +// GetWvmTxHashByCommitment uses commitment to get wvm tx hash from the internal map(temprorary hack) +// and returns it to the caller +func (wvm *Store) GetWvmTxHashByCommitment(key []byte) (string, error) { + wvmTxHash, found := wvm.txCache.Get(string(key)) + if !found { + wvm.log.Info("wvm tx hash using provided commitment NOT FOUND", "provided key", string(key)) + return "", fmt.Errorf("wvm tx hash for provided commitment not found") + } + + wvm.log.Info("wvm tx hash using provided commitment FOUND", "provided key", string(key)) + + return wvmTxHash.(string), nil +} + +func (wvm *Store) wvmEncode(eigenBlob []byte) ([]byte, error) { + eigenBlobLen := len(eigenBlob) + + wvm.log.Info("WVM:eigen blob received", "eigen blob size", eigenBlobLen) + + brotliOut := bytes.Buffer{} + writer := brotli.NewWriterOptions(&brotliOut, brotli.WriterOptions{Quality: 6}) + + in := bytes.NewReader(eigenBlob) + n, err := io.Copy(writer, in) + if err != nil { + return nil, fmt.Errorf("failed to read buffer to encode eigen blob: %w", err) + } + + if int(n) != len(eigenBlob) { + return nil, fmt.Errorf("WVM:size mismatch during brotli compression") + } + + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("WVM: brotli writer close fail: %w", err) + } + + wvm.log.Info("WVM:compressed by brotli", "eigen blob size", eigenBlobLen, "eigen blob size compressed with brotli", brotliOut.Len()) + + return brotliOut.Bytes(), nil +} + +func (wvm *Store) wvmDecode(calldataBlob string) ([]byte, error) { + // trim calldata + compressedBlob, err := hex.DecodeString(calldataBlob[2:]) + if err != nil { + return nil, err + } + + wvm.log.Info("WVM:compressed eigen blob received for decompression", "compressed blob size", len(compressedBlob)) + + brotliReader := brotli.NewReader(bytes.NewReader(compressedBlob)) + + decompressedEncoded, err := io.ReadAll(brotliReader) + if err != nil { + return nil, fmt.Errorf("WVM: failed to decompress brotli data: %w", err) + } + + wvm.log.Info("WVM:blob successfully decompressed", "decompressed blob size", len(compressedBlob)) + + return decompressedEncoded, nil +} diff --git a/store/store.go b/store/store.go index 6026837e..0e5ccd70 100644 --- a/store/store.go +++ b/store/store.go @@ -15,6 +15,8 @@ const ( RedisBackendType Unknown + + WVMBackendType ) var ( @@ -32,6 +34,8 @@ func (b BackendType) String() string { return "S3" case RedisBackendType: return "Redis" + case WVMBackendType: + return "WVM" case Unknown: fallthrough default: @@ -49,6 +53,8 @@ func StringToBackendType(s string) BackendType { return MemoryBackendType case "s3": return S3BackendType + case "wvm": + return WVMBackendType case "redis": return RedisBackendType case "unknown": @@ -79,12 +85,6 @@ type GeneratedKeyStore interface { Put(ctx context.Context, value []byte) (key []byte, err error) } -type WVMedKeyGeneratedStore interface { - GeneratedKeyStore - GetWvmTxHashByCommitment(ctx context.Context, key []byte) (string, error) - GetBlobFromWvm(ctx context.Context, key []byte) ([]byte, error) -} - type PrecomputedKeyStore interface { Store // Get retrieves the given key if it's present in the key-value data store. diff --git a/store/wvm.go b/store/wvm.go deleted file mode 100644 index 9408f956..00000000 --- a/store/wvm.go +++ /dev/null @@ -1,439 +0,0 @@ -package store - -import ( - "bytes" - "context" - "crypto/ecdsa" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "math/big" - "net/http" - "os" - "reflect" - "strconv" - "strings" - "time" - - "github.com/Layr-Labs/eigenda-proxy/verify" - "github.com/andybalholm/brotli" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" - cache "github.com/patrickmn/go-cache" -) - -type WVMClient struct { - client *ethclient.Client - log log.Logger - wvmCache *cache.Cache -} - -const ( - wvmRPCURL = "https://testnet-rpc.wvm.dev" // wvm alphanet rpc url - wvmChainID = 9496 // wvm alphanet chain id - - wvmArchiverAddress = "0xF8a5a479f04d1565b925A67D088b8fC3f8f0b7eF" // we use it as a "from" address - wvmArchivePoolAddress = "0x606dc1BE30A5966FcF3C10D907d1B76A7B1Bbbd9" // we use it as a "to" address -) - -func NewWVMClient(log log.Logger) *WVMClient { - client, err := ethclient.Dial(wvmRPCURL) - if err != nil { - panic(fmt.Sprintf("failed to connect to the WVM client: %v", err)) - } - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { - panic("wvm archiver signer key is empty") - } - - return &WVMClient{client: client, log: log, wvmCache: cache.New(24*15*time.Hour, 24*time.Hour)} -} - -func (wvm *WVMClient) Store(ctx context.Context, cert *verify.Certificate, eigenBlobData []byte) error { - wvm.log.Info("WVM: save BLOB in wvm chain", "batch id", cert.BlobVerificationProof.BatchId, "blob index", cert.BlobVerificationProof.BlobIndex) - - wvmData, err := wvm.WvmEncode(eigenBlobData) - if err != nil { - return fmt.Errorf("failed to store data in wvm: %w", err) - } - - err = wvm.getSuggestedGasPrice(ctx) - if err != nil { - return fmt.Errorf("failed to store data in wvm: %w", err) - } - - gas, err := wvm.estimateGas(ctx, wvmArchiverAddress, wvmArchivePoolAddress, wvmData) - if err != nil { - return fmt.Errorf("failed to store data in wvm: %w", err) - } - - wvmRawTx, err := wvm.createRawTransaction(ctx, wvmArchivePoolAddress, string(wvmData), gas) - if err != nil { - return fmt.Errorf("failed to store data in wvm: %w", err) - } - - wvmTxHash, err := wvm.sendRawTransaction(ctx, wvmRawTx) - if err != nil { - return fmt.Errorf("failed to store data in wvm: %w", err) - } - - wvm.log.Info("WVM:TX Hash:", "tx hash", wvmTxHash) - - wvm.wvmCache.Set(commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex), wvmTxHash, cache.DefaultExpiration) - wvm.log.Info("WVM:saved wvm Tx Hash by batch_id:blob_index", - "tx hash", wvmTxHash, - "key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - - return nil -} - -func (wvm *WVMClient) WvmEncode(eigenBlob []byte) ([]byte, error) { - eigenBlobLen := len(eigenBlob) - - wvm.log.Info("WVM:eigen blob received", "eigen blob size", eigenBlobLen) - - brotliOut := bytes.Buffer{} - writer := brotli.NewWriterOptions(&brotliOut, brotli.WriterOptions{Quality: 6}) - - in := bytes.NewReader(eigenBlob) - n, err := io.Copy(writer, in) - if err != nil { - panic(err) - } - - if int(n) != len(eigenBlob) { - panic("WVM:size mismatch during brotli compression") - } - - if err := writer.Close(); err != nil { - panic(fmt.Errorf("WVM: brotli writer close fail: %w", err)) - } - - wvm.log.Info("WVM:compressed by brotli", "eigen blob size", eigenBlobLen, "eigen blob size compressed with brotli", brotliOut.Len()) - - return brotliOut.Bytes(), nil -} - -func (wvm *WVMClient) WvmDecode(calldataBlob string) ([]byte, error) { - // trim calldata - compressedBlob, err := hex.DecodeString(calldataBlob[2:]) - if err != nil { - return nil, err - } - - wvm.log.Info("WVM:compressed eigen blob received for decompression", "compressed blob size", len(compressedBlob)) - - brotliReader := brotli.NewReader(bytes.NewReader(compressedBlob)) - - decompressedEncoded, err := io.ReadAll(brotliReader) - if err != nil { - return nil, fmt.Errorf("WVM: failed to decompress brotli data: %w", err) - } - - wvm.log.Info("WVM:blob successfully decompressed", "decompressed blob size", len(compressedBlob)) - - return decompressedEncoded, nil -} - -// GetWvmTxHashByCommitment uses commitment to get wvm tx hash from the internal map(temprorary hack) -// and returns it to the caller -func (wvm *WVMClient) GetWvmTxHashByCommitment(cert *verify.Certificate) (string, error) { - wvmTxHash, found := wvm.wvmCache.Get(commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - if !found { - wvm.log.Info("wvm tx hash using provided commitment NOT FOUND", "provided key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - return "", fmt.Errorf("wvm tx hash for provided commitment not found") - } - - wvm.log.Info("wvm tx hash using provided commitment FOUND", "provided key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - - return wvmTxHash.(string), nil -} - -func (wvm *WVMClient) GetBlobFromWvm(ctx context.Context, wvmTxHash string) ([]byte, error) { - r, err := http.NewRequestWithContext(ctx, http.MethodGet, - fmt.Sprintf("https://wvm-data-retriever.shuttleapp.rs/calldata/%s", - wvmTxHash), nil) - if err != nil { - return nil, fmt.Errorf("failed to call wvm-data-retriever") - } - - type WvmResponse struct { - ArweaveBlockHash string `json:"arweave_block_hash"` - Calldata string `json:"calldata"` - WarDecodedCalldata string `json:"war_decoded_calldata"` - WvmBlockHash string `json:"wvm_block_hash"` - } - - defer r.Body.Close() - - body, err := io.ReadAll(r.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) - } - - var wvmData WvmResponse - err = json.Unmarshal(body, &wvmData) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) - } - - wvm.log.Info("Received data from WVM", "arweave_block_hash", wvmData.ArweaveBlockHash, "wvm_block_hash", wvmData.WvmBlockHash) - - wvmDecodedBlob, err := wvm.WvmDecode(wvmData.Calldata) - if err != nil { - return nil, fmt.Errorf("failed to decode calldata to eigen decoded blob: %w", err) - } - - if len(wvmDecodedBlob) == 0 { - return nil, fmt.Errorf("blob has length zero") - } - - return wvmDecodedBlob, nil -} - -// HELPERS - -// getSuggestedGasPrice connects to an Ethereum node via RPC and retrieves the current suggested gas price. -func (wvm *WVMClient) getSuggestedGasPrice(ctx context.Context) error { - gasPrice, err := wvm.client.SuggestGasPrice(ctx) - if err != nil { - return fmt.Errorf("failed to suggest gas price: %w", err) - } - - wvm.log.Info("WVM suggested Gas Price", "gas", gasPrice.String()) - - return nil -} - -// estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. -func (wvm *WVMClient) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { - var ( - fromAddr = common.HexToAddress(from) - toAddr = common.HexToAddress(to) - bytesData []byte - err error - ) - - var encoded string - if string(data) != "" { - if ok := strings.HasPrefix(string(data), "0x"); !ok { - encoded = hexutil.Encode(data) - } - - bytesData, err = hexutil.Decode(encoded) - if err != nil { - return 0, err - } - } - - msg := ethereum.CallMsg{ - From: fromAddr, - To: &toAddr, - Gas: 0x00, - Data: bytesData, - } - - gas, err := wvm.client.EstimateGas(ctx, msg) - if err != nil { - return 0, err - } - - wvm.log.Info("WVM estimated Gas Price", "price", gas) - - return gas, nil -} - -// createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. -func (wvm *WVMClient) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { - baseFee, err := wvm.client.SuggestGasPrice(ctx) - if err != nil { - return "", err - } - - // WVM: maybe we don't need this, but e.g. - // increment := new(big.Int).Mul(big.NewInt(2), big.NewInt(params.GWei)) - // gasFeeCap := new(big.Int).Add(baseFee, increment) - // gasFeeCap.Add(gasFeeCap, priorityFee) - - gasFeeCap := baseFee - - // address shenanigans - // Decode the provided private key. - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { - panic("wvm archiver signer key is empty") - } - pKeyBytes, err := hexutil.Decode("0x" + privKey) - if err != nil { - return "", err - } - // Convert the private key bytes to an ECDSA private key. - ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) - if err != nil { - return "", err - } - // Extract the public key from the ECDSA private key. - publicKey := ecdsaPrivateKey.Public() - publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) - if !ok { - return "", fmt.Errorf("error casting public key to ECDSA") - } - // Compute the Ethereum address of the signer from the public key. - fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) - // Retrieve the nonce for the signer's account, representing the transaction count. - nonce, err := wvm.client.PendingNonceAt(ctx, fromAddress) - if err != nil { - return "", err - } - - // Prepare data payload. - var hexData string - if strings.HasPrefix(data, "0x") { - hexData = data - } else { - hexData = hexutil.Encode([]byte(data)) - } - bytesData, err := hexutil.Decode(hexData) - if err != nil { - return "", err - } - - toAddr := common.HexToAddress(to) - txData := types.DynamicFeeTx{ - ChainID: big.NewInt(wvmChainID), - Nonce: nonce, - GasTipCap: big.NewInt(0), - GasFeeCap: gasFeeCap, - Gas: gasLimit, - To: &toAddr, - Data: bytesData, - } - - tx := types.NewTx(&txData) - signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(wvmChainID)), ecdsaPrivateKey) - if err != nil { - return "", err - } - - // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. - var buf bytes.Buffer - err = signedTx.EncodeRLP(&buf) - if err != nil { - return "", err - } - - // Return the RLP-encoded transaction as a hexadecimal string. - rawTxRLPHex := hex.EncodeToString(buf.Bytes()) - - return rawTxRLPHex, nil -} - -// Transaction represents the structure of the transaction JSON. -type Transaction struct { - Type string `json:"type"` - ChainID string `json:"chainId"` - Nonce string `json:"nonce"` - To string `json:"to"` - Gas string `json:"gas"` - GasPrice string `json:"gasPrice,omitempty"` - MaxPriorityFeePerGas string `json:"maxPriorityFeePerGas"` - MaxFeePerGas string `json:"maxFeePerGas"` - Value string `json:"value"` - Input string `json:"input"` - AccessList []string `json:"accessList"` - V string `json:"v"` - R string `json:"r"` - S string `json:"s"` - YParity string `json:"yParity"` - Hash string `json:"hash"` - TransactionTime string `json:"transactionTime,omitempty"` - TransactionCost string `json:"transactionCost,omitempty"` -} - -func (wvm *WVMClient) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { - rawTxBytes, err := hex.DecodeString(rawTx) - if err != nil { - return "", err - } - - tx := new(types.Transaction) - - err = rlp.DecodeBytes(rawTxBytes, &tx) - if err != nil { - return "", err - } - - err = wvm.client.SendTransaction(ctx, tx) - if err != nil { - return "", err - } - - var txDetails Transaction - txBytes, err := tx.MarshalJSON() - if err != nil { - return "", err - } - if err := json.Unmarshal(txBytes, &txDetails); err != nil { - return "", err - } - - txDetails.TransactionTime = tx.Time().Format(time.RFC822) - txDetails.TransactionCost = tx.Cost().String() - - convertFields := []string{"Nonce", "MaxPriorityFeePerGas", "MaxFeePerGas", "Value", "Type", "Gas"} - for _, field := range convertFields { - if err := convertHexField(&txDetails, field); err != nil { - return "", err - } - } - - txJSON, err := json.MarshalIndent(txDetails, "", "\t") - if err != nil { - return "", err - } - - wvm.log.Info("WVM:raw TX Receipt:", "tx receipt", string(txJSON)) - - return txDetails.Hash, nil -} - -// helper function -func convertHexField(tx *Transaction, field string) error { - typeOfTx := reflect.TypeOf(*tx) - - // Get the value of the Transaction struct - txValue := reflect.ValueOf(tx).Elem() - - // Parse the hexadecimal string as an integer - hexStr := txValue.FieldByName(field).String() - - intValue, err := strconv.ParseUint(hexStr[2:], 16, 64) - if err != nil { - return err - } - - // Convert the integer to a decimal string - decimalStr := strconv.FormatUint(intValue, 10) - - // Check if the field exists - _, ok := typeOfTx.FieldByName(field) - if !ok { - return fmt.Errorf("field %s does not exist in Transaction struct", field) - } - - // Set the field value to the decimal string - txValue.FieldByName(field).SetString(decimalStr) - - return nil -} - -func commitmentKey(batchID, blobIndex uint32) string { - return fmt.Sprintf("%d:%d", batchID, blobIndex) -}