Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/payload commit port #45

Merged
merged 4 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 16 additions & 25 deletions docs/Diagrams/payload_commit_service.puml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,7 @@ database ipfs
database web3_storage

group snapshot payload message
rabbitmq -> pcs: received snapshot payload message

pcs -> ipfs: store snapshot payload

alt check message payload for upload to web3_storage flag
pcs -> web3_storage: store snapshot payload
end

pcs -> redis: store unfinalized snapshot cid against epochId for the project with timestamp
rabbitmq -> pcs: received snapshot payload CID from snapshotter

note over pcs: create EIP-712 signature for smart contract transaction
alt relayer is enabled in config
Expand All @@ -30,23 +22,22 @@ group snapshot payload message
end

group snapshot finalized message
rabbitmq -> pcs: received snapshot finalized message

pcs -> redis: store finalized snapshot
redis -> pcs: get finalized snapshot against previous epochId for the project
redis -> pcs: get unfinalized snapshot against previous epochId for the project

alt unfinalized snapshotCid == finalized snapshotCid (for previous epochId)
pcs -> local_disk: store snapshot data

else stored snapshotCid != finalized snapshotCid (for previous epochId)
pcs -> ipfs: unpin unfinalized snapshotCid
pcs -> ipfs: get snapshot for finalized cid

pcs -> local_disk: store fetched snapshot data
rabbitmq -> pcs: received snapshot finalized message for a project at an `epochId`
pcs -> redis: store finalized snapshot CID at `epochId` for project
redis -> pcs: get unfinalized snapshot CID against `epochId` for the project

alt unfinalized snapshot CID not found at `epochId`
pcs -> redis: mark in snapshotter status report against project and epoch ID as `OnlyFinalizedSnapshotSubmission`
else unfinalized snapshot CID found at `epochId` for project
alt stored snapshot CID == finalized snapshot CID
pcs -> redis: clean up finalized snapshot CID zset
pcs -> redis: mark in snapshotter status report against project and epoch ID as `SuccessfulSnapshotSubmission`
else stored snapshotCid != finalized snapshotCid (for previous epochId)
pcs -> ipfs: unpin unfinalized snapshot CID
pcs -> redis: mark in snapshotter status report against project and epoch ID as `IncorrectSnapshotSubmission`
end

pcs -> redis: prune unfinalized snapshotCid older than (configured duration default is 7days)
pcs -> ipfs: get snapshot for finalized cid
pcs -> local_disk: store fetched snapshot data

pcs -> redis: generate and store snapshotter report
note left: [project wise]\ntotal missed snapshots count\ntotal incorrect snapshots count\ntotal successful snapshots count\nstatus report
Expand Down
Binary file modified docs/images/payload_commit_service_callflow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 2 additions & 3 deletions go/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ type DbCache interface {
GetStoredProjects(ctx context.Context) ([]string, error)
CheckIfProjectExists(ctx context.Context, projectID string) (bool, error)
StoreProjects(background context.Context, projects []string) error
AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamodel.PayloadCommitMessage) error
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error
StoreLastFinalizedEpoch(ctx context.Context, projectID string, epochId int) error
StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.PowerloomSnapshotFinalizedMessage) error
GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.PowerloomSnapshotFinalizedMessage, error)
GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (string, error)
}

// DiskCache is responsible for data caching in local disk
Expand Down
216 changes: 38 additions & 178 deletions go/caching/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,24 @@ func (r *RedisCache) UpdateEpochProcessingStatus(ctx context.Context, projectID

func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.UnfinalizedSnapshot, error) {
key := fmt.Sprintf(redisutils.REDIS_KEY_PROJECT_UNFINALIZED_SNAPSHOT_CIDS, projectID)
snapshotCid := ""
height := strconv.Itoa(epochId)

log.WithField("projectID", projectID).
l := log.WithField("projectID", projectID).
WithField("epochId", epochId).
WithField("key", key).
Debug("getting snapshotCid from redis at given epochId from the given projectId")
WithField("key", key)
l.Debug("getting snapshotCid from redis at given epochId from the given projectId")

res, err := r.readClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{
Min: height,
Max: height,
}).Result()
l.WithField("redis result ", res).Debug("Got result from redis")
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil
}

log.Error("could not Get snapshot cid from redis error: ", err)
l.Error("could not Get snapshot cid from redis error: ", err)

return nil, err
}
Expand All @@ -92,17 +92,17 @@ func (r *RedisCache) GetUnfinalizedSnapshotAtEpochID(ctx context.Context, projec

val, ok := res[0].Member.(string)
if !ok {
log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
l.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
}

err = json.Unmarshal([]byte(val), p)
if err != nil {
log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")
l.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")

return nil, err
}

log.WithField("snapshotCid", snapshotCid).WithField("epochId", epochId).Debug("got snapshot at given epochId")
l.WithField("snapshotCid", p.SnapshotCID).WithField("epochId", epochId).Debug("got snapshot at given epochId")

return p, nil
}
Expand Down Expand Up @@ -156,97 +156,10 @@ func (r *RedisCache) StoreProjects(background context.Context, projects []string
return nil
}

// AddUnfinalizedSnapshotCID adds the given snapshot cid to the given project's zset.
func (r *RedisCache) AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamodel.PayloadCommitMessage) error {
p := new(datamodel.UnfinalizedSnapshot)

p.Expiration = time.Now().Unix() + 3600*24 // 1 day
p.SnapshotCID = msg.SnapshotCID

p.Snapshot = msg.Message

key := fmt.Sprintf(redisutils.REDIS_KEY_PROJECT_UNFINALIZED_SNAPSHOT_CIDS, msg.ProjectID)

data, _ := json.Marshal(p)

err := r.writeClient.ZAdd(ctx, key, &redis.Z{
Score: float64(msg.EpochID),
Member: string(data),
}).Err()
if err != nil {
log.WithError(err).Error("failed to add snapshot cid to zset")

return err
}

// get all the members
res, err := r.writeClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
}).Result()
if err != nil {
log.WithError(err).Error("failed to get all members from zset")

// ignore error
return nil
}

// remove all the members that have expired
for _, member := range res {
m := new(datamodel.UnfinalizedSnapshot)

val, ok := member.Member.(string)
if !ok {
log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
}

err = json.Unmarshal([]byte(val), m)
if err != nil {
log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")

continue
}

if float64(m.Expiration) < float64(time.Now().Unix()) {
err = r.writeClient.ZRem(ctx, key, member.Member).Err()
if err != nil {
log.WithError(err).Error("failed to remove expired snapshot cid from zset")
}
}
}

log.WithField("projectID", msg.ProjectID).
WithField("snapshotCid", msg.SnapshotCID).
WithField("epochId", msg.EpochID).
Debug("added snapshot CID to zset")

return nil
}

// AddSnapshotterStatusReport adds the snapshotter's status report to the given project and epoch ID.
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error {
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error {
key := fmt.Sprintf(redisutils.REDIS_KEY_SNAPSHOTTER_STATUS_REPORT, projectId)

storedReport := new(datamodel.SnapshotterStatusReport)

reportJsonString, err := r.readClient.HGet(ctx, key, strconv.Itoa(epochId)).Result()
if err == nil || reportJsonString != "" {
_ = json.Unmarshal([]byte(reportJsonString), storedReport)
}

if report != nil {
if storedReport.SubmittedSnapshotCid != "" {
report.SubmittedSnapshotCid = storedReport.SubmittedSnapshotCid
}

if storedReport.Reason != "" {
report.Reason = storedReport.Reason
}

if storedReport.FinalizedSnapshotCid != "" {
report.FinalizedSnapshotCid = storedReport.FinalizedSnapshotCid
}

reportJson, err := json.Marshal(report)
if err != nil {
log.WithError(err).Error("failed to marshal snapshotter status report")
Expand All @@ -261,21 +174,21 @@ func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int
return err
}

switch {
case report.State == datamodel.MissedSnapshotSubmission:
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT, projectId)
case report.State == datamodel.IncorrectSnapshotSubmission:
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT, projectId)
incrKey := ""
if report.State == datamodel.SuccessfulSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT
} else if report.State == datamodel.IncorrectSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_INCORRECT_SNAPSHOT_COUNT
} else if report.State == datamodel.MissedSnapshotSubmission {
incrKey = redisutils.REDIS_KEY_TOTAL_MISSED_SNAPSHOT_COUNT
}
} else {
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT, projectId)
}

if incrCount {
err = r.writeClient.Incr(ctx, key).Err()
if err != nil {
log.WithError(err).Error("failed to increment total missed snapshot count")
if incrKey != "" {
err = r.writeClient.Incr(ctx, fmt.Sprintf(incrKey, projectId)).Err()
if err != nil {
log.WithError(err).Error("failed to increment snapshotter status report count in redis")
}
}

}

log.Debug("added snapshotter status report in redis")
Expand All @@ -289,9 +202,9 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri
l := log.WithField("key", key)

lastEpochStr, err := r.readClient.Get(ctx, key).Result()
if err != nil {
l.WithError(err).Error("failed to get last finalized epoch from redis")
}
// if err != nil {
// l.WithError(err).Error("failed to get last finalized epoch from redis")
// }

lastEpoch := 0

Expand All @@ -315,7 +228,7 @@ func (r *RedisCache) StoreLastFinalizedEpoch(ctx context.Context, projectID stri
return err
}

l.WithField("projectID", projectID).WithField("epochId", epochId).Debug("stored last finalized epoch in redis")
l.Debug("stored last finalized epoch in redis")

return nil
}
Expand All @@ -326,16 +239,9 @@ func (r *RedisCache) StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.

msg.Expiry = msg.Timestamp + 3600*24 // 1 day

data, err := json.Marshal(msg)
if err != nil {
l.WithError(err).Error("failed to marshal finalized snapshot")

return err
}

err = r.writeClient.ZAdd(ctx, key, &redis.Z{
err := r.writeClient.ZAdd(ctx, key, &redis.Z{
Score: float64(msg.EpochID),
Member: string(data),
Member: msg.SnapshotCID,
}).Err()

if err != nil {
Expand All @@ -345,54 +251,17 @@ func (r *RedisCache) StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.
}

// get all the members
res, err := r.readClient.ZRangeByScoreWithScores(ctx, key, &redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
}).Result()
res, err := r.writeClient.ZRemRangeByScore(ctx, key, "-inf", strconv.Itoa(msg.EpochID-10240)).Result()
if err != nil {
log.WithError(err).Error("failed to get all finalized snapshots")

log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).WithError(err).Error("failed to remove finalized snapshots zset cache between -inf to epochId-10240")
// ignore error
return nil
}

// remove all the members that have expired
membersToRemove := make([]interface{}, 0)
timeNow := time.Now().Unix()

for _, member := range res {
m := new(datamodel.PowerloomSnapshotFinalizedMessage)

val, ok := member.Member.(string)
if !ok {
log.Error("CRITICAL: could not convert snapshot cid data stored in redis to string")
}

err = json.Unmarshal([]byte(val), m)
if err != nil {
log.WithError(err).Error("CRITICAL: could not unmarshal snapshot cid data stored in redis")

continue
}

if float64(m.Expiry) < float64(timeNow) {
membersToRemove = append(membersToRemove, member.Member)
}
}

if len(membersToRemove) == 0 {
return nil
}

err = r.writeClient.ZRem(ctx, key, membersToRemove).Err()
if err != nil {
log.WithError(err).Error("failed to remove expired snapshot cid from zset")
}

log.WithField("epoch ID", msg.EpochID).WithField("Redis Op result:", res).Debug("removed finalized snapshots zset cache between -inf to epochId-10240")
return nil
}

func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.PowerloomSnapshotFinalizedMessage, error) {
func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (string, error) {
key := fmt.Sprintf(redisutils.REDIS_KEY_FINALIZED_SNAPSHOTS, projectID)
l := log.WithField("projectId", projectID).WithField("epochId", epochId)

Expand All @@ -403,36 +272,27 @@ func (r *RedisCache) GetFinalizedSnapshotAtEpochID(ctx context.Context, projectI
if err != nil {
l.WithError(err).Error("failed to get finalized snapshot from zset")

return nil, err
return "", err
}

if len(res) == 0 {
l.Debug("no finalized snapshot found at epochId")

return nil, fmt.Errorf("no finalized snapshot found at epochId %d", epochId)
return "", fmt.Errorf("no finalized snapshot found at epochId %d", epochId)
}

if len(res) > 1 {
l.Debug("more than one finalized snapshot found at epochId")

return nil, fmt.Errorf("more than one finalized snapshot found at epochId %d", epochId)
return "", fmt.Errorf("more than one finalized snapshot found at epochId %d", epochId)
}

m := new(datamodel.PowerloomSnapshotFinalizedMessage)

val, ok := res[0].Member.(string)
snapshot_cid, ok := res[0].Member.(string)
if !ok {
l.Error("CRITICAL: could not convert finalized snapshot data stored in redis to string")

return nil, errors.New("failed to convert finalized snapshot data stored in redis to string")
}

err = json.Unmarshal([]byte(val), m)
if err != nil {
l.WithError(err).Error("CRITICAL: could not unmarshal finalized snapshot data stored in redis")

return nil, err
return "", errors.New("failed to convert finalized snapshot data stored in redis to string")
}

return m, nil
return snapshot_cid, nil
}
Loading
Loading