diff --git a/docs/Diagrams/payload_commit_service.puml b/docs/Diagrams/payload_commit_service.puml index 09f7aed..6221739 100644 --- a/docs/Diagrams/payload_commit_service.puml +++ b/docs/Diagrams/payload_commit_service.puml @@ -11,15 +11,7 @@ database ipfs database web3_storage group snapshot payload message - rabbitmq -> pcs: received snapshot payload message - - pcs -> ipfs: store snapshot payload - - alt check message payload for upload to web3_storage flag - pcs -> web3_storage: store snapshot payload - end - - pcs -> redis: store unfinalized snapshot cid against epochId for the project with timestamp + rabbitmq -> pcs: received snapshot payload CID from snapshotter note over pcs: create EIP-712 signature for smart contract transaction alt relayer is enabled in config @@ -30,23 +22,22 @@ group snapshot payload message end group snapshot finalized message - rabbitmq -> pcs: received snapshot finalized message - - pcs -> redis: store finalized snapshot - redis -> pcs: get finalized snapshot against previous epochId for the project - redis -> pcs: get unfinalized snapshot against previous epochId for the project - - alt unfinalized snapshotCid == finalized snapshotCid (for previous epochId) - pcs -> local_disk: store snapshot data - - else stored snapshotCid != finalized snapshotCid (for previous epochId) - pcs -> ipfs: unpin unfinalized snapshotCid - pcs -> ipfs: get snapshot for finalized cid - - pcs -> local_disk: store fetched snapshot data + rabbitmq -> pcs: received snapshot finalized message for a project at an `epochId` + pcs -> redis: store finalized snapshot CID at `epochId` for project + redis -> pcs: get unfinalized snapshot CID against `epochId` for the project + + alt unfinalized snapshot CID not found at `epochId` + pcs -> redis: mark in snapshotter status report against project and epoch ID as `OnlyFinalizedSnapshotSubmission` + else unfinalized snapshot CID found at `epochId` for project + alt stored snapshot CID == finalized snapshot CID + pcs -> redis: clean up finalized snapshot CID zset + pcs -> redis: mark in snapshotter status report against project and epoch ID as `SuccessfulSnapshotSubmission` + else stored snapshotCid != finalized snapshotCid (for previous epochId) + pcs -> ipfs: unpin unfinalized snapshot CID + pcs -> redis: mark in snapshotter status report against project and epoch ID as `IncorrectSnapshotSubmission` end - - pcs -> redis: prune unfinalized snapshotCid older than (configured duration default is 7days) + pcs -> ipfs: get snapshot for finalized cid + pcs -> local_disk: store fetched snapshot data pcs -> redis: generate and store snapshotter report note left: [project wise]\ntotal missed snapshots count\ntotal incorrect snapshots count\ntotal successful snapshots count\nstatus report diff --git a/docs/images/payload_commit_service_callflow.png b/docs/images/payload_commit_service_callflow.png index d2b4150..177ede4 100644 Binary files a/docs/images/payload_commit_service_callflow.png and b/docs/images/payload_commit_service_callflow.png differ diff --git a/go/caching/caching.go b/go/caching/caching.go index 25af93b..218f9a2 100644 --- a/go/caching/caching.go +++ b/go/caching/caching.go @@ -14,11 +14,10 @@ type DbCache interface { GetStoredProjects(ctx context.Context) ([]string, error) CheckIfProjectExists(ctx context.Context, projectID string) (bool, error) StoreProjects(background context.Context, projects []string) error - AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamodel.PayloadCommitMessage) error - AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error + AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error StoreLastFinalizedEpoch(ctx context.Context, projectID string, epochId int) error StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.PowerloomSnapshotFinalizedMessage) error - GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.PowerloomSnapshotFinalizedMessage, error) + GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (string, error) } // DiskCache is responsible for data caching in local disk diff --git a/go/caching/redis.go b/go/caching/redis.go index e71b005..9f7ddda 100644 --- a/go/caching/redis.go +++ b/go/caching/redis.go @@ -65,24 +65,24 @@ func (r *RedisCache) UpdateEpochProcessingStatus(ctx context.Context, projectID func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.UnfinalizedSnapshot, error) { key := fmt.Sprintf(redisutils.REDIS_KEY_PROJECT_UNFINALIZED_SNAPSHOT_CIDS, projectID) - snapshotCid := "" height := strconv.Itoa(epochId) - log.WithField("projectID", projectID). + l := log.WithField("projectID", projectID). WithField("epochId", epochId). - WithField("key", key). - Debug("getting snapshotCid from redis at given epochId from the given projectId") + WithField("key", key) + l.Debug("getting snapshotCid from redis at given epochId from the given projectId") res, err := r.readClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{ Min: height, Max: height, }).Result() + l.WithField("redis result ", res).Debug("Got result from redis") if err != nil { if errors.Is(err, redis.Nil) { return nil, nil } - log.Error("could not Get snapshot cid from redis error: ", err) + l.Error("could not Get snapshot cid from redis error: ", err) return nil, err } @@ -92,17 +92,17 @@ func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projec val, ok := res[0].Member.(string) if !ok { - log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string") + l.Error("CRITICAL: could not convert snapshot cid data stored in redis to string") } err = json.Unmarshal([]byte(val), p) if err != nil { - log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis") + l.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis") return nil, err } - log.WithField("snapshotCid", snapshotCid).WithField("epochId", epochId).Debug("got snapshot at given epochId") + l.WithField("snapshotCid", p.SnapshotCID).WithField("epochId", epochId).Debug("got snapshot at given epochId") return p, nil } @@ -156,97 +156,10 @@ func (r *RedisCache) StoreProjects(background context.Context, projects []string return nil } -// AddUnfinalizedSnapshotCID adds the given snapshot cid to the given project's zset. -func (r *RedisCache) AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamodel.PayloadCommitMessage) error { - p := new(datamodel.UnfinalizedSnapshot) - - p.Expiration = time.Now().Unix() + 3600*24 // 1 day - p.SnapshotCID = msg.SnapshotCID - - p.Snapshot = msg.Message - - key := fmt.Sprintf(redisutils.REDIS_KEY_PROJECT_UNFINALIZED_SNAPSHOT_CIDS, msg.ProjectID) - - data, _ := json.Marshal(p) - - err := r.writeClient.ZAdd(ctx, key, &redis.Z{ - Score: float64(msg.EpochID), - Member: string(data), - }).Err() - if err != nil { - log.WithError(err).Error("failed to add snapshot cid to zset") - - return err - } - - // get all the members - res, err := r.writeClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{ - Min: "-inf", - Max: "+inf", - }).Result() - if err != nil { - log.WithError(err).Error("failed to get all members from zset") - - // ignore error - return nil - } - - // remove all the members that have expired - for _, member := range res { - m := new(datamodel.UnfinalizedSnapshot) - - val, ok := member.Member.(string) - if !ok { - log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string") - } - - err = json.Unmarshal([]byte(val), m) - if err != nil { - log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis") - - continue - } - - if float64(m.Expiration) < float64(time.Now().Unix()) { - err = r.writeClient.ZRem(ctx, key, member.Member).Err() - if err != nil { - log.WithError(err).Error("failed to remove expired snapshot cid from zset") - } - } - } - - log.WithField("projectID", msg.ProjectID). - WithField("snapshotCid", msg.SnapshotCID). - WithField("epochId", msg.EpochID). - Debug("added snapshot CID to zset") - - return nil -} - // AddSnapshotterStatusReport adds the snapshotter's status report to the given project and epoch ID. -func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error { +func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error { key := fmt.Sprintf(redisutils.REDIS_KEY_SNAPSHOTTER_STATUS_REPORT, projectId) - - storedReport := new(datamodel.SnapshotterStatusReport) - - reportJsonString, err := r.readClient.HGet(ctx, key, strconv.Itoa(epochId)).Result() - if err == nil || reportJsonString != "" { - _ = json.Unmarshal([]byte(reportJsonString), storedReport) - } - if report != nil { - if storedReport.SubmittedSnapshotCid != "" { - report.SubmittedSnapshotCid = storedReport.SubmittedSnapshotCid - } - - if storedReport.Reason != "" { - report.Reason = storedReport.Reason - } - - if storedReport.FinalizedSnapshotCid != "" { - report.FinalizedSnapshotCid = storedReport.FinalizedSnapshotCid - } - reportJson, err := json.Marshal(report) if err != nil { log.WithError(err).Error("failed to marshal snapshotter status report") @@ -261,21 +174,21 @@ func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int return err } - switch { - case report.State == datamodel.MissedSnapshotSubmission: - key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT, projectId) - case report.State == datamodel.IncorrectSnapshotSubmission: - key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT, projectId) + incrKey := "" + if report.State == datamodel.SuccessfulSnapshotSubmission { + incrKey = redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT + } else if report.State == datamodel.IncorrectSnapshotSubmission { + incrKey = redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT + } else if report.State == datamodel.MissedSnapshotSubmission { + incrKey = redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT } - } else { - key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT, projectId) - } - - if incrCount { - err = r.writeClient.Incr(ctx, key).Err() - if err != nil { - log.WithError(err).Error("failed to increment total missed snapshot count") + if incrKey != "" { + err = r.writeClient.Incr(ctx, fmt.Sprintf(incrKey, projectId)).Err() + if err != nil { + log.WithError(err).Error("failed to increment snapshotter status report count in redis") + } } + } log.Debug("added snapshotter status report in redis") @@ -289,9 +202,9 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri l := log.WithField("key", key) lastEpochStr, err := r.readClient.Get(ctx, key).Result() - if err != nil { - l.WithError(err).Error("failed to get last finalized epoch from redis") - } + // if err != nil { + // l.WithError(err).Error("failed to get last finalized epoch from redis") + // } lastEpoch := 0 @@ -315,7 +228,7 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri return err } - l.WithField("projectID", projectID).WithField("epochId", epochId).Debug("stored last finalized epoch in redis") + l.Debug("stored last finalized epoch in redis") return nil } @@ -326,16 +239,9 @@ func (r *RedisCache) StoreFinalizedSnapshot(ctx context.Context, msg *datamodel. msg.Expiry = msg.Timestamp + 3600*24 // 1 day - data, err := json.Marshal(msg) - if err != nil { - l.WithError(err).Error("failed to marshal finalized snapshot") - - return err - } - - err = r.writeClient.ZAdd(ctx, key, &redis.Z{ + err := r.writeClient.ZAdd(ctx, key, &redis.Z{ Score: float64(msg.EpochID), - Member: string(data), + Member: msg.SnapshotCID, }).Err() if err != nil { @@ -345,54 +251,17 @@ func (r *RedisCache) StoreFinalizedSnapshot(ctx context.Context, msg *datamodel. } // get all the members - res, err := r.readClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{ - Min: "-inf", - Max: "+inf", - }).Result() + res, err := r.writeClient.ZRemRangeByScore(ctx, key, "-inf", strconv.Itoa(msg.EpochID-10240)).Result() if err != nil { - log.WithError(err).Error("failed to get all finalized snapshots") - + log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).WithError(err).Error("failed to remove finalized snapshots zset cache between -inf to epochId-10240") // ignore error return nil } - - // remove all the members that have expired - membersToRemove := make([]interface{}, 0) - timeNow := time.Now().Unix() - - for _, member := range res { - m := new(datamodel.PowerloomSnapshotFinalizedMessage) - - val, ok := member.Member.(string) - if !ok { - log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string") - } - - err = json.Unmarshal([]byte(val), m) - if err != nil { - log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis") - - continue - } - - if float64(m.Expiry) < float64(timeNow) { - membersToRemove = append(membersToRemove, member.Member) - } - } - - if len(membersToRemove) == 0 { - return nil - } - - err = r.writeClient.ZRem(ctx, key, membersToRemove).Err() - if err != nil { - log.WithError(err).Error("failed to remove expired snapshot cid from zset") - } - + log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).Debug("removed finalized snapshots zset cache between -inf to epochId-10240") return nil } -func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.PowerloomSnapshotFinalizedMessage, error) { +func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (string, error) { key := fmt.Sprintf(redisutils.REDIS_KEY_FINALIZED_SNAPSHOTS, projectID) l := log.WithField("projectId", projectID).WithField("epochId", epochId) @@ -403,36 +272,27 @@ func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectI if err != nil { l.WithError(err).Error("failed to get finalized snapshot from zset") - return nil, err + return "", err } if len(res) == 0 { l.Debug("no finalized snapshot found at epochId") - return nil, fmt.Errorf("no finalized snapshot found at epochId %d", epochId) + return "", fmt.Errorf("no finalized snapshot found at epochId %d", epochId) } if len(res) > 1 { l.Debug("more than one finalized snapshot found at epochId") - return nil, fmt.Errorf("more than one finalized snapshot found at epochId %d", epochId) + return "", fmt.Errorf("more than one finalized snapshot found at epochId %d", epochId) } - m := new(datamodel.PowerloomSnapshotFinalizedMessage) - - val, ok := res[0].Member.(string) + snapshot_cid, ok := res[0].Member.(string) if !ok { l.Error("CRITICAL: could not convert finalized snapshot data stored in redis to string") - return nil, errors.New("failed to convert finalized snapshot data stored in redis to string") - } - - err = json.Unmarshal([]byte(val), m) - if err != nil { - l.WithError(err).Error("CRITICAL: could not unmarshal finalized snapshot data stored in redis") - - return nil, err + return "", errors.New("failed to convert finalized snapshot data stored in redis to string") } - return m, nil + return snapshot_cid, nil } diff --git a/go/goutils/datamodel/data_model.go b/go/goutils/datamodel/data_model.go index 999d790..90f406e 100644 --- a/go/goutils/datamodel/data_model.go +++ b/go/goutils/datamodel/data_model.go @@ -5,8 +5,10 @@ import "github.com/ethereum/go-ethereum/signer/core/apitypes" type SnapshotSubmissionState string const ( - MissedSnapshotSubmission SnapshotSubmissionState = "MISSED_SNAPSHOT" - IncorrectSnapshotSubmission SnapshotSubmissionState = "SUBMITTED_INCORRECT_SNAPSHOT" + MissedSnapshotSubmission SnapshotSubmissionState = "MISSED_SNAPSHOT" + IncorrectSnapshotSubmission SnapshotSubmissionState = "SUBMITTED_INCORRECT_SNAPSHOT" + OnlyFinalizedSnapshotSubmission SnapshotSubmissionState = "ONLY_FINALIZED_SNAPSHOT_RECIEVED" + SuccessfulSnapshotSubmission SnapshotSubmissionState = "SUCCESSFUL_SNAPSHOT_SUBMISSION" ) type SnapshotterStateUpdate struct { @@ -46,12 +48,10 @@ type ( ) type PayloadCommitMessage struct { - Message map[string]interface{} `json:"message"` - Web3Storage bool `json:"web3Storage"` - SourceChainID int `json:"sourceChainId"` - ProjectID string `json:"projectId"` - EpochID int `json:"epochId"` - SnapshotCID string `json:"snapshotCID"` + SourceChainID int `json:"sourceChainId"` + ProjectID string `json:"projectId"` + EpochID int `json:"epochId"` + SnapshotCID string `json:"snapshotCID"` } type PowerloomSnapshotFinalizedMessage struct { @@ -78,7 +78,6 @@ type SnapshotRelayerPayload struct { type SnapshotterStatusReport struct { SubmittedSnapshotCid string `json:"submittedSnapshotCid,omitempty"` - SubmittedSnapshot map[string]interface{} `json:"submittedSnapshot,omitempty"` FinalizedSnapshotCid string `json:"finalizedSnapshotCid"` FinalizedSnapshot map[string]interface{} `json:"finalizedSnapshot,omitempty"` State SnapshotSubmissionState `json:"state"` @@ -88,7 +87,6 @@ type SnapshotterStatusReport struct { type UnfinalizedSnapshot struct { SnapshotCID string `json:"snapshotCid"` Snapshot map[string]interface{} `json:"snapshot"` - Expiration int64 `json:"expiration"` } type SnapshotterIssue struct { diff --git a/go/goutils/ipfsutils/ipfs_utils.go b/go/goutils/ipfsutils/ipfs_utils.go index 6ea00c5..3a7f2dc 100644 --- a/go/goutils/ipfsutils/ipfs_utils.go +++ b/go/goutils/ipfsutils/ipfs_utils.go @@ -1,9 +1,7 @@ package ipfsutils import ( - "bytes" "context" - "encoding/json" "errors" "fmt" "net" @@ -17,7 +15,6 @@ import ( "github.com/swagftw/gi" "golang.org/x/time/rate" - "audit-protocol/goutils/datamodel" "audit-protocol/goutils/httpclient" "audit-protocol/goutils/settings" ) @@ -196,37 +193,6 @@ func ParseURL(ipfsUrl string) (string, error) { return ipfsUrl, nil } -func (client *IpfsClient) UploadSnapshotToIPFS(payloadCommit *datamodel.PayloadCommitMessage) error { - err := client.writeClientRateLimiter.Wait(context.Background()) - if err != nil { - log.WithError(err).Error("ipfs rate limiter errored") - - return err - } - - msg, err := json.Marshal(payloadCommit.Message) - if err != nil { - log.WithError(err).Error("failed to marshal payload commit message") - - return err - } - - snapshotCid, err := client.writeClient.Add(bytes.NewReader(msg), shell.CidVersion(1)) - if err != nil { - log.WithError(err).Error("failed to add snapshot to ipfs") - - return err - } - - log.WithField("snapshotCID", snapshotCid). - WithField("epochId", payloadCommit.EpochID). - Debug("ipfs add Successful") - - payloadCommit.SnapshotCID = snapshotCid - - return nil -} - // GetSnapshotFromIPFS returns the snapshot from IPFS. func (client *IpfsClient) GetSnapshotFromIPFS(snapshotCID string, outputPath string) error { err := client.readClientRateLimiter.Wait(context.Background()) diff --git a/go/payload-commit/service/service.go b/go/payload-commit/service/service.go index 0e1465a..90b3bfc 100644 --- a/go/payload-commit/service/service.go +++ b/go/payload-commit/service/service.go @@ -7,15 +7,12 @@ import ( "encoding/hex" "encoding/json" "errors" - "fmt" "math/big" "net/http" "os" "path/filepath" "strconv" "strings" - "sync" - "time" "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -37,8 +34,6 @@ import ( "audit-protocol/goutils/smartcontract/transactions" "audit-protocol/goutils/taskmgr" rabbitmqMgr "audit-protocol/goutils/taskmgr/rabbitmq" - "audit-protocol/goutils/taskmgr/worker" - w3storage "audit-protocol/goutils/w3s" "audit-protocol/payload-commit/signer" ) @@ -50,7 +45,6 @@ type PayloadCommitService struct { ethClient *ethclient.Client contractAPI *contractApi.ContractApi ipfsClient *ipfsutils.IpfsClient - web3sClient *w3storage.W3S diskCache *caching.LocalDiskCache txManager *transactions.TxManager privKey *ecdsa.PrivateKey @@ -87,11 +81,6 @@ func InitPayloadCommitService(reporter *reporting.IssueReporter) *PayloadCommitS log.WithError(err).Fatal("failed to invoke ipfs client") } - web3sClient, err := gi.Invoke[*w3storage.W3S]() - if err != nil { - log.WithError(err).Fatal("failed to invoke web3s client") - } - diskCache, err := gi.Invoke[*caching.LocalDiskCache]() if err != nil { log.WithError(err).Fatal("failed to invoke disk cache") @@ -113,7 +102,6 @@ func InitPayloadCommitService(reporter *reporting.IssueReporter) *PayloadCommitS ethClient: ethClient, contractAPI: contractAPI, ipfsClient: ipfsClient, - web3sClient: web3sClient, diskCache: diskCache, txManager: transactions.NewNonceManager(), privKey: privKey, @@ -174,44 +162,11 @@ func (s *PayloadCommitService) Run(msgBody []byte, topic string) error { // HandlePayloadCommitTask handles the payload commit task. func (s *PayloadCommitService) HandlePayloadCommitTask(msg *datamodel.PayloadCommitMessage) error { - log.WithField("project_id", msg.ProjectID).Info("handling payload commit task") - - // check if payload cid is already present at the epochId, for the given project in redis - // if yes, then skip the task - unfinalizedSnapshot, err := s.redisCache.GetUnfinalizedSnapshotAtEpochID(context.Background(), msg.ProjectID, msg.EpochID) - if unfinalizedSnapshot != nil { - log.WithField("epochId", msg.EpochID). - WithField("messageId", msg.ProjectID). - WithField("snapshotCid", unfinalizedSnapshot.SnapshotCID). - WithError(err).Debug("payload commit message already processed for the given epochId and project, skipping task") - - return nil - } - - // upload payload commit msg to ipfs and web3 storage - err = s.uploadToIPFSandW3s(msg) - if err != nil { - log.WithError(err).Error("failed to upload snapshot to ipfs") - - errMsg := "failed to upload snapshot to ipfs" - if msg.Web3Storage { - errMsg = "failed to upload snapshot to ipfs and web3 storage" - } - - go s.issueReporter.Report(reporting.PayloadCommitInternalIssue, msg.ProjectID, strconv.Itoa(msg.EpochID), map[string]interface{}{ - "issueDetails": "Error: " + err.Error(), - "msg": errMsg, - }) - - go s.redisCache.AddSnapshotterStatusReport(context.Background(), msg.EpochID, msg.ProjectID, &datamodel.SnapshotterStatusReport{ - SubmittedSnapshotCid: "", - State: datamodel.MissedSnapshotSubmission, - Reason: errMsg, - }, false) - } - - // publish snapshot submitted event - go s.publishSnapshotSubmittedEvent(msg) + log.WithFields(log.Fields{ + "project ID": msg.ProjectID, + "epoch ID": msg.EpochID, + "snapshot CID": msg.SnapshotCID, + }).Info("handling payload commit task") // sign payload commit message (eip712 signature) signerData, signature, err := s.signPayload(msg.SnapshotCID, msg.ProjectID, int64(msg.EpochID)) @@ -220,7 +175,7 @@ func (s *PayloadCommitService) HandlePayloadCommitTask(msg *datamodel.PayloadCom SubmittedSnapshotCid: msg.SnapshotCID, State: datamodel.MissedSnapshotSubmission, Reason: "failed to sign payload commit message", - }, false) + }) return err } @@ -258,7 +213,7 @@ func (s *PayloadCommitService) HandlePayloadCommitTask(msg *datamodel.PayloadCom SubmittedSnapshotCid: msg.SnapshotCID, State: datamodel.MissedSnapshotSubmission, Reason: "failed to submit snapshot to contract", - }, false) + }) return err } @@ -282,25 +237,12 @@ func (s *PayloadCommitService) HandlePayloadCommitTask(msg *datamodel.PayloadCom SubmittedSnapshotCid: msg.SnapshotCID, State: datamodel.MissedSnapshotSubmission, Reason: "failed to submit snapshot to relayer", - }, false) + }) return err } go s.redisCache.UpdateEpochProcessingStatus(context.Background(), msg.ProjectID, msg.EpochID, "success", "") - } - // store unfinalized payload cid in redis - err = s.redisCache.AddUnfinalizedSnapshotCID(context.Background(), msg) - if err != nil { - log.WithField("epochId", msg.EpochID). - WithField("messageId", msg.ProjectID). - WithField("snapshotCid", msg.SnapshotCID). - WithError(err).Error("failed to store snapshot cid in redis") - - go s.issueReporter.Report(reporting.PayloadCommitInternalIssue, msg.ProjectID, strconv.Itoa(msg.EpochID), map[string]interface{}{ - "issueDetails": "Error: " + err.Error(), - "msg": "failed to store unfinalized snapshot in redis", - }) } return nil @@ -308,139 +250,99 @@ func (s *PayloadCommitService) HandlePayloadCommitTask(msg *datamodel.PayloadCom // HandleFinalizedPayloadCommitTask handles finalized payload commit task. func (s *PayloadCommitService) HandleFinalizedPayloadCommitTask(msg *datamodel.PayloadCommitFinalizedMessage) error { - log.Debug("handling finalized payload commit task") + // log.Debug("handling finalized payload commit task") + log.WithFields(log.Fields{ + "project ID": msg.Message.ProjectID, + "epoch ID": msg.Message.EpochID, + "snapshot CID": msg.Message.SnapshotCID, + }).Debug("handling finalized payload commit task") // storing current finalized snapshot in redis go s.redisCache.StoreFinalizedSnapshot(context.Background(), msg.Message) - - prevEpochId := msg.Message.EpochID - 1 - - // fetch previous finalized snapshot from redis. - // fetching previous finalized snapshot as current snapshotter might not have submitted the snapshot before it got finalized by other snapshotter(s) - prevSnapshot, err := s.redisCache.GetFinalizedSnapshotAtEpochID(context.Background(), msg.Message.ProjectID, prevEpochId) - if err != nil || prevSnapshot == nil { - log.WithField("epochId", msg.Message.EpochID-1).WithError(err).Error("failed to get finalized snapshot cid from redis") - - return err - } - - // check if payload is already in cache - unfinalizedSnapshot, err := s.redisCache.GetUnfinalizedSnapshotAtEpochID(context.Background(), msg.Message.ProjectID, prevEpochId) - if err != nil { - log.WithError(err).Error("failed to get snapshot cid from redis") - } - var report *datamodel.SnapshotterStatusReport - - // if snapshot cid is not found in redis snapshot was missed - if unfinalizedSnapshot == nil { - log.Debug("snapshot was missed, fetching snapshot from ipfs") - - dirPath := filepath.Join(s.settingsObj.LocalCachePath, msg.Message.ProjectID, "snapshots") - filePath := filepath.Join(dirPath, prevSnapshot.SnapshotCID+".json") - - // create file, if it does not exist - err = os.MkdirAll(dirPath, os.ModePerm) - if err != nil { - log.WithError(err).Error("failed to create file") - } - - // get snapshot from ipfs and store it in output path - err = s.ipfsClient.GetSnapshotFromIPFS(prevSnapshot.SnapshotCID, filePath) - if err != nil { - log.WithError(err).Error("failed to get snapshot from ipfs") - + unfinalized, err := s.redisCache.GetUnfinalizedSnapshotAtEpochID(context.Background(), msg.Message.ProjectID, msg.Message.EpochID) + if unfinalized != nil { + if unfinalized.SnapshotCID != msg.Message.SnapshotCID { + log.WithFields(log.Fields{ + "project ID": msg.Message.ProjectID, + "epoch ID": msg.Message.EpochID, + "unfinalized snapshot CID": unfinalized.SnapshotCID, + "finalized snapshot CID": msg.Message.SnapshotCID, + }).Debug("cached unfinalized snapshot cid does not match with finalized snapshot cid, fetching snapshot commit message from ipfs") go s.issueReporter.Report( - reporting.PayloadCommitInternalIssue, + reporting.SubmittedIncorrectSnapshotIssue, msg.Message.ProjectID, - strconv.Itoa(prevEpochId), + strconv.Itoa(msg.Message.EpochID), map[string]interface{}{ - "issueDetails": "Error: " + err.Error(), - "msg": "failed to get snapshot from ipfs", + "issueDetails": "Error: submitted snapshot cid does not match with finalized snapshot cid", + "submittedSnapshotCID": unfinalized.SnapshotCID, + "finalizedSnapshotCID": msg.Message.SnapshotCID, }) - } - - report = &datamodel.SnapshotterStatusReport{ - FinalizedSnapshotCid: prevSnapshot.SnapshotCID, - State: datamodel.MissedSnapshotSubmission, - Reason: "INTERNAL_ERROR: snapshot was missed due to internal error", - } - } else if unfinalizedSnapshot.SnapshotCID != prevSnapshot.SnapshotCID { - // if stored snapshot cid does not match with finalized snapshot cid, fetch snapshot from ipfs and store in local disk. - log.Debug("cached snapshot cid does not match with finalized snapshot cid, fetching snapshot commit message from ipfs") - - go s.issueReporter.Report( - reporting.SubmittedIncorrectSnapshotIssue, - msg.Message.ProjectID, - strconv.Itoa(prevEpochId), - map[string]interface{}{ - "issueDetails": "Error: " + "submitted snapshot cid does not match with finalized snapshot cid", - "submittedSnapshotCID": unfinalizedSnapshot.SnapshotCID, - "finalizedSnapshotCID": prevSnapshot.SnapshotCID, - }) - - dirPath := filepath.Join(s.settingsObj.LocalCachePath, msg.Message.ProjectID, "snapshots") - filePath := filepath.Join(dirPath, prevSnapshot.SnapshotCID+".json") - - // create file, if it does not exist - err = os.MkdirAll(dirPath, os.ModePerm) - if err != nil { - log.WithError(err).Error("failed to create file") - } - - finalizedSnapshot := make(map[string]interface{}) - - // get snapshot from ipfs and store it in output path - err = s.ipfsClient.GetSnapshotFromIPFS(prevSnapshot.SnapshotCID, filePath) - if err != nil { - log.WithError(err).Error("failed to get snapshot from ipfs") + report = &datamodel.SnapshotterStatusReport{ + SubmittedSnapshotCid: unfinalized.SnapshotCID, + FinalizedSnapshotCid: msg.Message.SnapshotCID, + State: datamodel.IncorrectSnapshotSubmission, + Reason: "INTERNAL_ERROR: submitted snapshot cid does not match with finalized snapshot cid", + } + // unpin unfinalized snapshot cid from ipfs + err = s.ipfsClient.Unpin(unfinalized.SnapshotCID) + if err != nil { + log.WithError(err).WithField("cid", unfinalized.SnapshotCID).Error("failed to unpin snapshot cid from ipfs") + } - go s.issueReporter.Report( - reporting.PayloadCommitInternalIssue, - msg.Message.ProjectID, - strconv.Itoa(prevEpochId), - map[string]interface{}{ - "issueDetails": "Error: " + err.Error(), - "msg": "failed to get snapshot from ipfs", - }) } else { - snapshotDataBytes, _ := os.ReadFile(filePath) - _ = json.Unmarshal(snapshotDataBytes, &finalizedSnapshot) + report = &datamodel.SnapshotterStatusReport{ + SubmittedSnapshotCid: unfinalized.SnapshotCID, + FinalizedSnapshotCid: msg.Message.SnapshotCID, + State: datamodel.SuccessfulSnapshotSubmission, + Reason: "Submission and finalized CID match", + } } + } else { report = &datamodel.SnapshotterStatusReport{ - SubmittedSnapshotCid: unfinalizedSnapshot.SnapshotCID, - SubmittedSnapshot: unfinalizedSnapshot.Snapshot, - FinalizedSnapshotCid: prevSnapshot.SnapshotCID, - FinalizedSnapshot: finalizedSnapshot, - State: datamodel.IncorrectSnapshotSubmission, - Reason: "INTERNAL_ERROR: submitted snapshot cid does not match with finalized snapshot cid", + SubmittedSnapshotCid: "", + FinalizedSnapshotCid: msg.Message.SnapshotCID, + State: datamodel.OnlyFinalizedSnapshotSubmission, + Reason: "Only finalized CID received", } + } + dirPath := filepath.Join(s.settingsObj.LocalCachePath, msg.Message.ProjectID, "snapshots") - // unpin unfinalized snapshot cid from ipfs - err = s.ipfsClient.Unpin(unfinalizedSnapshot.SnapshotCID) - if err != nil { - log.WithError(err).WithField("cid", unfinalizedSnapshot.SnapshotCID).Error("failed to unpin snapshot cid from ipfs") - } + // create file, if it does not exist + err = os.MkdirAll(dirPath, os.ModePerm) + if err != nil { + log.WithError(err).Error("failed to create directory for snapshots") } else { - outputPath := filepath.Join(s.settingsObj.LocalCachePath, msg.Message.ProjectID, "snapshots", prevSnapshot.SnapshotCID+".json") - - data, err := json.Marshal(unfinalizedSnapshot.Snapshot) + outputPath := filepath.Join(s.settingsObj.LocalCachePath, msg.Message.ProjectID, "snapshots", msg.Message.SnapshotCID+".json") if err != nil { log.WithError(err).Error("failed to marshal payload commit message") + } else { + // get snapshot from ipfs and store it in output path + err = s.ipfsClient.GetSnapshotFromIPFS(msg.Message.SnapshotCID, outputPath) + if err != nil { + log.WithFields(log.Fields{ + "project ID": msg.Message.ProjectID, + "epoch ID": msg.Message.EpochID, + "snapshot CID": msg.Message.SnapshotCID, + }).WithError(err).Error("failed to get finalizedsnapshot from ipfs") + + go s.issueReporter.Report( + reporting.PayloadCommitInternalIssue, + msg.Message.ProjectID, + strconv.Itoa(msg.Message.EpochID), + map[string]interface{}{ + "issueDetails": "Error: " + err.Error(), + "msg": "failed to get finalized snapshot from ipfs, epoch id: " + + strconv.Itoa(msg.Message.EpochID) + + ", snapshot cid: " + msg.Message.SnapshotCID, + }) + } } - err = s.diskCache.Write(outputPath, data) - if err != nil { - log.WithError(err).Error("failed to write payload commit message to disk cache") - } - - // if snapshot cid matches with finalized snapshot cid just increment the snapshot success count - report = nil } - // generate report and store in redis - err = s.redisCache.AddSnapshotterStatusReport(context.Background(), prevEpochId, msg.Message.ProjectID, report, true) + err = s.redisCache.AddSnapshotterStatusReport(context.Background(), msg.Message.EpochID, msg.Message.ProjectID, report) if err != nil { log.WithError(err).Error("failed to add snapshotter status report to redis") } @@ -453,87 +355,6 @@ func (s *PayloadCommitService) HandleFinalizedPayloadCommitTask(msg *datamodel.P return nil } -func (s *PayloadCommitService) uploadToIPFSandW3s(msg *datamodel.PayloadCommitMessage) error { - log.WithField("msg", msg).Debug("uploading payload commit msg to ipfs and web3 storage") - - wg := sync.WaitGroup{} - - // upload to ipfs - var ipfsUploadErr error - - ipfsErrChan := make(chan error) - - wg.Add(1) - - go func() { - defer wg.Done() - - ipfsUploadErr = backoff.Retry(func() error { - ipfsUploadErr = s.ipfsClient.UploadSnapshotToIPFS(msg) - if ipfsUploadErr != nil { - log.WithError(ipfsUploadErr).Error("failed to upload snapshot to ipfs, retrying") - - return ipfsUploadErr - } - - return nil - }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5)) - - if ipfsUploadErr != nil { - log.WithError(ipfsUploadErr).Error("failed to upload snapshot to ipfs after max retries") - ipfsErrChan <- ipfsUploadErr - - return - } - - ipfsErrChan <- nil - }() - - // upload to web3 storage - var ( - w3sUploadErr error - snapshotCid string - ) - - if msg.Web3Storage { - wg.Add(1) - - go func() { - defer wg.Done() - - w3sUploadErr = backoff.Retry(func() error { - snapshotCid, w3sUploadErr = s.web3sClient.UploadToW3s(msg.Message) - if w3sUploadErr != nil { - return w3sUploadErr - } - - return nil - }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5)) - - if w3sUploadErr != nil { - log.WithError(w3sUploadErr).Error("failed to upload snapshot to web3 storage after max retries") - } - }() - } - - // if ipfs upload fails, wait for web3 storage upload to finish if msg.Web3Storage flag is true - if err := <-ipfsErrChan; err != nil { - if msg.Web3Storage { - wg.Wait() - - if w3sUploadErr != nil || snapshotCid == "" { - return fmt.Errorf("failed to upload to ipfs and web3 storage") - } - - msg.SnapshotCID = snapshotCid - } else { - return fmt.Errorf("failed to upload to ipfs") - } - } - - return nil -} - // signPayload signs the payload commit message for relayer. func (s *PayloadCommitService) signPayload(snapshotCid, projectId string, epochId int64) (*apitypes.TypedData, []byte, error) { log.Debug("signing payload commit message") @@ -684,30 +505,3 @@ func (s *PayloadCommitService) initLocalCachedData() error { return nil } - -// publishSnapshotSubmittedEvent publishes snapshot submitted event message -func (s *PayloadCommitService) publishSnapshotSubmittedEvent(msg *datamodel.PayloadCommitMessage) { - eventMsg := &datamodel.SnapshotSubmittedEventMessage{ - SnapshotCid: msg.SnapshotCID, - EpochId: msg.EpochID, - ProjectId: msg.ProjectID, - BroadcastId: s.uuid.String(), - Timestamp: time.Now().Unix(), - } - - msgBytes, err := json.Marshal(eventMsg) - if err != nil { - log.WithError(err).Error("failed to marshal snapshot submitted event message") - - return - } - - err = s.taskMgr.Publish(context.Background(), worker.TypeEventDetectorWorker, msgBytes) - if err != nil { - log.WithField("msg", string(msgBytes)).WithError(err).Error("failed to publish snapshot submitted event message") - - return - } - - log.Info("published snapshot submitted event message") -} diff --git a/pm2.config.js b/pm2.config.js index 6351199..46312b8 100644 --- a/pm2.config.js +++ b/pm2.config.js @@ -1,6 +1,6 @@ // this means if app restart {MAX_RESTART} times in 1 min then it stops -const MAX_RESTART = 10; -const MIN_UPTIME = 60000; +// const MAX_RESTART = 10; +// const MIN_UPTIME = 60000; const NODE_ENV = process.env.NODE_ENV || 'development'; module.exports = { @@ -9,9 +9,9 @@ module.exports = { name : "ap-payload-commit", script : "./payload-commit", cwd : `${__dirname}/go/payload-commit`, - max_restarts: MAX_RESTART, - min_uptime: MIN_UPTIME, - kill_timeout : 3000, + // max_restarts: MAX_RESTART, + // min_uptime: MIN_UPTIME, + // kill_timeout : 3000, env: { NODE_ENV: NODE_ENV, CONFIG_PATH:`${__dirname}`, @@ -23,9 +23,9 @@ module.exports = { name : "ap-pruning", script : "./pruning", cwd : `${__dirname}/go/pruning`, - max_restarts: MAX_RESTART, - min_uptime: MIN_UPTIME, - kill_timeout : 3000, + // max_restarts: MAX_RESTART, + // min_uptime: MIN_UPTIME, + // kill_timeout : 3000, env: { NODE_ENV: NODE_ENV, CONFIG_PATH:`${__dirname}`,