From 160dee8bfa576966b2917b8a1a74a301e34350bf Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Sun, 28 Oct 2018 20:17:00 -0400 Subject: [PATCH 1/8] Updat --- actions/v3/upload_sessions.go | 122 +++++++++++++++++++++++++++++----- utils/utils.go | 37 ++++++++--- 2 files changed, 134 insertions(+), 25 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index 2e01c482..b2b49620 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -33,6 +33,7 @@ type uploadSessionCreateReqV3 struct { FileSizeBytes uint64 `json:"fileSizeBytes"` // This is Trytes instead of Byte BetaIP string `json:"betaIp"` StorageLengthInYears int `json:"storageLengthInYears"` + AlphaTreasureIndexes []int `json:"alphaTreasureIndexes"` Invoice models.Invoice `json:"invoice"` Version uint32 `json:"version"` } @@ -44,9 +45,18 @@ type uploadSessionCreateBetaResV3 struct { } type uploadSessionCreateResV3 struct { - ID string `json:"id"` - BetaSessionID string `json:"betaSessionId"` - BatchSize int `json:"batchSize"` + ID string `json:"id"` + BetaSessionID string `json:"betaSessionId"` + BatchSize int `json:"batchSize"` + Invoice models.Invoice `json:"invoice"` +} + +/*uploadSessionConfig represents the general configuration that other client could understand.*/ +type uploadSessionConfig struct { + BatchSize int `json:"batchSize"` // Represent each data contain not more number of field. + FileSizeBytes uint64 `json:"fileSizeBytes"` // Represent the total file size. + NumChunks int `json:"numChunks"` // Represent total number of chunks. + ReserveIteration bool `json:"reserveIteration"` // Represent whether iterate it from the beginning to end or end to the beginning. } var NumChunksLimit = -1 //unlimited @@ -66,8 +76,7 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { uploadSession := &models.UploadSession{} if err = models.DB.Find(uploadSession, c.Param("id")); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(500, err) + return c.Error(500, oyster_utils.LogIfError(err, nil)) } if uploadSession == nil { return c.Error(400, fmt.Errorf("Error in finding session for id %v", c.Param("id"))) @@ -77,16 +86,14 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { return c.Error(400, errors.New("Using the wrong endpoint. This endpoint is for V3 only")) } - fileIndex := req.Chunks[0].Idx / BatchSize - objectKey := fmt.Sprintf("%v/%v", uploadSession.GenesisHash, fileIndex) + objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, BatchSize) var data []byte if data, err = json.Marshal(req.Chunks); err != nil { - return c.Error(500, fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err)) + return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err), nil)) } if err = setDefaultBucketObject(objectKey, string(data)); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(500, fmt.Errorf("Unable to store data to S3 with err: %v", err)) + return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to store data to S3 with err: %v", err), nil)) } return c.Render(202, actions_utils.Render.JSON(map[string]bool{"success": true})) @@ -114,6 +121,16 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error { StorageMethod: models.StorageMethodS3, } + if vErr, err := alphaSession.StartUploadSession(); err != nil || vErr.HasAny() { + return c.Error(400, fmt.Errorf("StartUploadSession error: %v and validation error: %v", err, vErr)) + } + + invoice := alphaSession.GetInvoice() + + // Mutates this because copying in golang sucks... + req.Invoice = invoice + req.AlphaTreasureIndexes = oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes)) + hasBeta := req.BetaIP != "" var betaSessionID = "" if hasBeta { @@ -124,17 +141,27 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error { betaSessionID = betaSessionRes.ID alphaSession.ETHAddrBeta = nulls.NewString(betaSessionRes.ETHAddr) + + if err := saveTreasureMapForS3(&alphaSession, req.AlphaTreasureIndexes, betaSessionRes.TreasureIndexes); err != nil { + return c.Error(500, err) + } + + if err := models.DB.Save(&alphaSession); err != nil { + return c.Error(400, oyster_utils.LogIfError(err, nil)) + } } - if err := models.DB.Save(&alphaSession); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(400, err) + if err := saveConfigForS3(alphaSession); err != nil { + return c.Error(500, err) } + models.NewBrokerBrokerTransaction(&alphaSession) + res := uploadSessionCreateResV3{ ID: alphaSession.ID.String(), BetaSessionID: betaSessionID, BatchSize: BatchSize, + Invoice: invoice, } return c.Render(200, actions_utils.Render.JSON(res)) @@ -164,12 +191,29 @@ func (usr *UploadSessionResourceV3) CreateBeta(c buffalo.Context) error { StorageMethod: models.StorageMethodS3, } + if vErr, err := u.StartUploadSession(); err != nil || vErr.HasAny() { + return c.Error(400, fmt.Errorf("Can't startUploadSession with validation error: %v and err: %v", vErr, err)) + } + + betaTreasureIndexes := oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes)) + if err := saveTreasureMapForS3(&u, req.AlphaTreasureIndexes, betaTreasureIndexes); err != nil { + return c.Error(500, err) + } + if err := models.DB.Save(&u); err != nil { - return c.Error(400, err) + return c.Error(500, err) } + if err := saveConfigForS3(u); err != nil { + return c.Error(500, err) + } + + models.NewBrokerBrokerTransaction(&u) + res := uploadSessionCreateBetaResV3{ - ID: u.ID.String(), + ID: u.ID.String(), + TreasureIndexes: betaTreasureIndexes, + ETHAddr: u.ETHAddrBeta.String, } return c.Render(200, actions_utils.Render.JSON(res)) @@ -219,3 +263,51 @@ func sendBetaWithUploadRequest(req uploadSessionCreateReqV3) (uploadSessionCreat err := oyster_utils.SendHttpReq(betaURL, req, betaSessionRes) return betaSessionRes, err } + +/*saveTreasureMapForS3 saves treasure keys as JSON format into S3.*/ +func saveTreasureMapForS3(u *models.UploadSession, treasureIndexA []int, treasureIndexB []int) error { + mergedIndexes, err := oyster_utils.MergeIndexes(treasureIndexA, treasureIndexB, + oyster_utils.FileSectorInChunkSize, u.NumChunks) + if err != nil { + return err + } + + if len(mergedIndexes) == 0 { + if oyster_utils.BrokerMode != oyster_utils.TestModeNoTreasure { + return oyster_utils.LogIfError(errors.New("no indexes selected for treasure"), nil) + } + return nil + } + + for { + privateKeys, err := EthWrapper.GenerateKeys(len(mergedIndexes)) + if err != nil { + return oyster_utils.LogIfError(errors.New("Could not generate eth keys: "+err.Error()), nil) + } + if len(mergedIndexes) != len(privateKeys) { + return oyster_utils.LogIfError(errors.New("privateKeys and mergedIndexes should have the same length"), nil) + } + // Update treasureId + u.MakeTreasureIdxMap(mergedIndexes, privateKeys) + } + if !u.TreasureIdxMap.Valid || u.TreasureIdxMap.String == "" { + return oyster_utils.LogIfError(errors.New("Not treasure was included in the UploadSession"), nil) + } + + return setDefaultBucketObject(oyster_utils.GetObjectKeyForTreasure(u.GenesisHash), u.TreasureIdxMap.String) +} + +/*saveConfigForS3 saves uploadSession config to S3 endpoint so that Lamdba function could read it.*/ +func saveConfigForS3(u models.UploadSession) error { + config := uploadSessionConfig{ + BatchSize: BatchSize, + FileSizeBytes: u.FileSizeBytes, + NumChunks: u.NumChunks, + ReserveIteration: u.Type == models.SessionTypeBeta, + } + data, err := json.Marshal(config) + if err != nil { + return oyster_utils.LogIfError(err, nil) + } + return setDefaultBucketObject(oyster_utils.GetObjectKeyForConfig(u.GenesisHash), string(data)) +} diff --git a/utils/utils.go b/utils/utils.go index 34c4feb4..73b7a64d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -98,8 +98,7 @@ func SetLogInfoForDatabaseUrl(dbUrl string) { func SendHttpReq(url string, req interface{}, resp interface{}) error { reqJson, err := json.Marshal(req) if err != nil { - LogIfError(err, nil) - return err + return LogIfError(err, nil) } reqBody := bytes.NewBuffer(reqJson) @@ -107,15 +106,13 @@ func SendHttpReq(url string, req interface{}, resp interface{}) error { defer httpRes.Body.Close() // we need to close the connection if err != nil { - LogIfError(err, nil) - return err + return LogIfError(err, nil) } if err := ParseResBody(httpRes, resp); err != nil { err = fmt.Errorf("Unable to communicate with Beta node: %v", err) // This should consider as BadRequest since the client pick the beta node. - LogIfError(err, nil) - return err + return LogIfError(err, nil) } return nil } @@ -256,8 +253,7 @@ func MergeIndexes(a []int, b []int, sectorSize int, numChunks int) ([]int, error var merged []int if len(a) == 0 && len(b) == 0 || len(a) != len(b) { err := errors.New("Invalid input for utils.MergeIndexes. Both a []int and b []int must have the same length") - LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)}) - return nil, err + return nil, LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)}) } for i := 0; i < len(a); i++ { @@ -311,9 +307,9 @@ func ConvertGweiToWei(gwei *big.Int) *big.Int { } /*LogIfError logs any error if it is not nil. Allow caller to provide additional freeform info.*/ -func LogIfError(err error, extraInfo map[string]interface{}) { +func LogIfError(err error, extraInfo map[string]interface{}) error { if err == nil { - return + return nil } fmt.Println(err) @@ -325,6 +321,7 @@ func LogIfError(err error, extraInfo map[string]interface{}) { raven.CaptureError(err, logErrorTags) } } + return err } /* ReturnEncryptedEthKey will be used by several models to encrypt the eth key so we are not storing a naked key */ @@ -391,3 +388,23 @@ func IntMax(x, y int) int { func GenerateBadgerKey(startingString string, genesisHash string, chunkIdx int) string { return fmt.Sprintf("%v%v__%d", startingString, genesisHash, chunkIdx) } + +/*GetObjectKeyForTreasure will return the treasure key.*/ +func GetObjectKeyForConfig(genesisHash string) string { + return fmt.Sprintf("%v/%v", genesisHash, "config") +} + +/*GetObjectKeyForTreasure will return the treasure key.*/ +func GetObjectKeyForTreasure(genesisHash string) string { + return fmt.Sprintf("%v/%v", genesisHash, "treasure_map") +} + +/*GetObjectKeyForData will return object key for particular data. startIndex is the smallest index in data. */ +func GetObjectKeyForData(genesisHash string, startIndex int, batchSize int) string { + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", startIndex%batchSize) +} + +/*GetObjectKeyForHash will return object key for particular data. startIndex is the smallest index in data. */ +func GetObjectKeyForHash(genesisHash string, startIndex int, batchSize int) string { + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", startIndex%batchSize) +} From c70531ca986daf49d83cd63b2c4894a682b41eed Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Sun, 28 Oct 2018 20:32:40 -0400 Subject: [PATCH 2/8] update --- actions/v3/upload_sessions.go | 3 ++- utils/utils.go | 20 ++++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index b2b49620..db8fa23a 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -86,7 +86,8 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { return c.Error(400, errors.New("Using the wrong endpoint. This endpoint is for V3 only")) } - objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, BatchSize) + isReverseIteration := uploadSession.Type == models.SessionTypeBeta + objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, uploadSession.NumChunks, isReverseIteration, BatchSize) var data []byte if data, err = json.Marshal(req.Chunks); err != nil { diff --git a/utils/utils.go b/utils/utils.go index 73b7a64d..4d3689d9 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -400,11 +400,23 @@ func GetObjectKeyForTreasure(genesisHash string) string { } /*GetObjectKeyForData will return object key for particular data. startIndex is the smallest index in data. */ -func GetObjectKeyForData(genesisHash string, startIndex int, batchSize int) string { - return fmt.Sprintf("%v/%v/%v", genesisHash, "data", startIndex%batchSize) +func GetObjectKeyForData(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string { + index := 0 + if isReserveIteration { + index = (totalCount - 1 - startIndex) / batchSize + } else { + index = startIndex / batchSize + } + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", index) } /*GetObjectKeyForHash will return object key for particular data. startIndex is the smallest index in data. */ -func GetObjectKeyForHash(genesisHash string, startIndex int, batchSize int) string { - return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", startIndex%batchSize) +func GetObjectKeyForHash(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string { + index := 0 + if isReserveIteration { + index = (totalCount - 1 - startIndex) / batchSize + } else { + index = startIndex / batchSize + } + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", index) } From d0fac27f554ffb446a0ed9fa93e2110393db3027 Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Mon, 29 Oct 2018 20:55:56 -0400 Subject: [PATCH 3/8] Update --- utils/utils.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/utils.go b/utils/utils.go index 4d3689d9..656dd49d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -407,6 +407,7 @@ func GetObjectKeyForData(genesisHash string, startIndex int, totalCount int, isR } else { index = startIndex / batchSize } + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", index) } @@ -418,5 +419,6 @@ func GetObjectKeyForHash(genesisHash string, startIndex int, totalCount int, isR } else { index = startIndex / batchSize } + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", index) } From 416cd7288a17adeba10f631feeb62dd606cd5940 Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Sun, 28 Oct 2018 20:17:00 -0400 Subject: [PATCH 4/8] Updat --- actions/v3/upload_sessions.go | 122 +++++++++++++++++++++++++++++----- utils/utils.go | 37 ++++++++--- 2 files changed, 134 insertions(+), 25 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index 2e01c482..b2b49620 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -33,6 +33,7 @@ type uploadSessionCreateReqV3 struct { FileSizeBytes uint64 `json:"fileSizeBytes"` // This is Trytes instead of Byte BetaIP string `json:"betaIp"` StorageLengthInYears int `json:"storageLengthInYears"` + AlphaTreasureIndexes []int `json:"alphaTreasureIndexes"` Invoice models.Invoice `json:"invoice"` Version uint32 `json:"version"` } @@ -44,9 +45,18 @@ type uploadSessionCreateBetaResV3 struct { } type uploadSessionCreateResV3 struct { - ID string `json:"id"` - BetaSessionID string `json:"betaSessionId"` - BatchSize int `json:"batchSize"` + ID string `json:"id"` + BetaSessionID string `json:"betaSessionId"` + BatchSize int `json:"batchSize"` + Invoice models.Invoice `json:"invoice"` +} + +/*uploadSessionConfig represents the general configuration that other client could understand.*/ +type uploadSessionConfig struct { + BatchSize int `json:"batchSize"` // Represent each data contain not more number of field. + FileSizeBytes uint64 `json:"fileSizeBytes"` // Represent the total file size. + NumChunks int `json:"numChunks"` // Represent total number of chunks. + ReserveIteration bool `json:"reserveIteration"` // Represent whether iterate it from the beginning to end or end to the beginning. } var NumChunksLimit = -1 //unlimited @@ -66,8 +76,7 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { uploadSession := &models.UploadSession{} if err = models.DB.Find(uploadSession, c.Param("id")); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(500, err) + return c.Error(500, oyster_utils.LogIfError(err, nil)) } if uploadSession == nil { return c.Error(400, fmt.Errorf("Error in finding session for id %v", c.Param("id"))) @@ -77,16 +86,14 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { return c.Error(400, errors.New("Using the wrong endpoint. This endpoint is for V3 only")) } - fileIndex := req.Chunks[0].Idx / BatchSize - objectKey := fmt.Sprintf("%v/%v", uploadSession.GenesisHash, fileIndex) + objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, BatchSize) var data []byte if data, err = json.Marshal(req.Chunks); err != nil { - return c.Error(500, fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err)) + return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err), nil)) } if err = setDefaultBucketObject(objectKey, string(data)); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(500, fmt.Errorf("Unable to store data to S3 with err: %v", err)) + return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to store data to S3 with err: %v", err), nil)) } return c.Render(202, actions_utils.Render.JSON(map[string]bool{"success": true})) @@ -114,6 +121,16 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error { StorageMethod: models.StorageMethodS3, } + if vErr, err := alphaSession.StartUploadSession(); err != nil || vErr.HasAny() { + return c.Error(400, fmt.Errorf("StartUploadSession error: %v and validation error: %v", err, vErr)) + } + + invoice := alphaSession.GetInvoice() + + // Mutates this because copying in golang sucks... + req.Invoice = invoice + req.AlphaTreasureIndexes = oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes)) + hasBeta := req.BetaIP != "" var betaSessionID = "" if hasBeta { @@ -124,17 +141,27 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error { betaSessionID = betaSessionRes.ID alphaSession.ETHAddrBeta = nulls.NewString(betaSessionRes.ETHAddr) + + if err := saveTreasureMapForS3(&alphaSession, req.AlphaTreasureIndexes, betaSessionRes.TreasureIndexes); err != nil { + return c.Error(500, err) + } + + if err := models.DB.Save(&alphaSession); err != nil { + return c.Error(400, oyster_utils.LogIfError(err, nil)) + } } - if err := models.DB.Save(&alphaSession); err != nil { - oyster_utils.LogIfError(err, nil) - return c.Error(400, err) + if err := saveConfigForS3(alphaSession); err != nil { + return c.Error(500, err) } + models.NewBrokerBrokerTransaction(&alphaSession) + res := uploadSessionCreateResV3{ ID: alphaSession.ID.String(), BetaSessionID: betaSessionID, BatchSize: BatchSize, + Invoice: invoice, } return c.Render(200, actions_utils.Render.JSON(res)) @@ -164,12 +191,29 @@ func (usr *UploadSessionResourceV3) CreateBeta(c buffalo.Context) error { StorageMethod: models.StorageMethodS3, } + if vErr, err := u.StartUploadSession(); err != nil || vErr.HasAny() { + return c.Error(400, fmt.Errorf("Can't startUploadSession with validation error: %v and err: %v", vErr, err)) + } + + betaTreasureIndexes := oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes)) + if err := saveTreasureMapForS3(&u, req.AlphaTreasureIndexes, betaTreasureIndexes); err != nil { + return c.Error(500, err) + } + if err := models.DB.Save(&u); err != nil { - return c.Error(400, err) + return c.Error(500, err) } + if err := saveConfigForS3(u); err != nil { + return c.Error(500, err) + } + + models.NewBrokerBrokerTransaction(&u) + res := uploadSessionCreateBetaResV3{ - ID: u.ID.String(), + ID: u.ID.String(), + TreasureIndexes: betaTreasureIndexes, + ETHAddr: u.ETHAddrBeta.String, } return c.Render(200, actions_utils.Render.JSON(res)) @@ -219,3 +263,51 @@ func sendBetaWithUploadRequest(req uploadSessionCreateReqV3) (uploadSessionCreat err := oyster_utils.SendHttpReq(betaURL, req, betaSessionRes) return betaSessionRes, err } + +/*saveTreasureMapForS3 saves treasure keys as JSON format into S3.*/ +func saveTreasureMapForS3(u *models.UploadSession, treasureIndexA []int, treasureIndexB []int) error { + mergedIndexes, err := oyster_utils.MergeIndexes(treasureIndexA, treasureIndexB, + oyster_utils.FileSectorInChunkSize, u.NumChunks) + if err != nil { + return err + } + + if len(mergedIndexes) == 0 { + if oyster_utils.BrokerMode != oyster_utils.TestModeNoTreasure { + return oyster_utils.LogIfError(errors.New("no indexes selected for treasure"), nil) + } + return nil + } + + for { + privateKeys, err := EthWrapper.GenerateKeys(len(mergedIndexes)) + if err != nil { + return oyster_utils.LogIfError(errors.New("Could not generate eth keys: "+err.Error()), nil) + } + if len(mergedIndexes) != len(privateKeys) { + return oyster_utils.LogIfError(errors.New("privateKeys and mergedIndexes should have the same length"), nil) + } + // Update treasureId + u.MakeTreasureIdxMap(mergedIndexes, privateKeys) + } + if !u.TreasureIdxMap.Valid || u.TreasureIdxMap.String == "" { + return oyster_utils.LogIfError(errors.New("Not treasure was included in the UploadSession"), nil) + } + + return setDefaultBucketObject(oyster_utils.GetObjectKeyForTreasure(u.GenesisHash), u.TreasureIdxMap.String) +} + +/*saveConfigForS3 saves uploadSession config to S3 endpoint so that Lamdba function could read it.*/ +func saveConfigForS3(u models.UploadSession) error { + config := uploadSessionConfig{ + BatchSize: BatchSize, + FileSizeBytes: u.FileSizeBytes, + NumChunks: u.NumChunks, + ReserveIteration: u.Type == models.SessionTypeBeta, + } + data, err := json.Marshal(config) + if err != nil { + return oyster_utils.LogIfError(err, nil) + } + return setDefaultBucketObject(oyster_utils.GetObjectKeyForConfig(u.GenesisHash), string(data)) +} diff --git a/utils/utils.go b/utils/utils.go index 34c4feb4..73b7a64d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -98,8 +98,7 @@ func SetLogInfoForDatabaseUrl(dbUrl string) { func SendHttpReq(url string, req interface{}, resp interface{}) error { reqJson, err := json.Marshal(req) if err != nil { - LogIfError(err, nil) - return err + return LogIfError(err, nil) } reqBody := bytes.NewBuffer(reqJson) @@ -107,15 +106,13 @@ func SendHttpReq(url string, req interface{}, resp interface{}) error { defer httpRes.Body.Close() // we need to close the connection if err != nil { - LogIfError(err, nil) - return err + return LogIfError(err, nil) } if err := ParseResBody(httpRes, resp); err != nil { err = fmt.Errorf("Unable to communicate with Beta node: %v", err) // This should consider as BadRequest since the client pick the beta node. - LogIfError(err, nil) - return err + return LogIfError(err, nil) } return nil } @@ -256,8 +253,7 @@ func MergeIndexes(a []int, b []int, sectorSize int, numChunks int) ([]int, error var merged []int if len(a) == 0 && len(b) == 0 || len(a) != len(b) { err := errors.New("Invalid input for utils.MergeIndexes. Both a []int and b []int must have the same length") - LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)}) - return nil, err + return nil, LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)}) } for i := 0; i < len(a); i++ { @@ -311,9 +307,9 @@ func ConvertGweiToWei(gwei *big.Int) *big.Int { } /*LogIfError logs any error if it is not nil. Allow caller to provide additional freeform info.*/ -func LogIfError(err error, extraInfo map[string]interface{}) { +func LogIfError(err error, extraInfo map[string]interface{}) error { if err == nil { - return + return nil } fmt.Println(err) @@ -325,6 +321,7 @@ func LogIfError(err error, extraInfo map[string]interface{}) { raven.CaptureError(err, logErrorTags) } } + return err } /* ReturnEncryptedEthKey will be used by several models to encrypt the eth key so we are not storing a naked key */ @@ -391,3 +388,23 @@ func IntMax(x, y int) int { func GenerateBadgerKey(startingString string, genesisHash string, chunkIdx int) string { return fmt.Sprintf("%v%v__%d", startingString, genesisHash, chunkIdx) } + +/*GetObjectKeyForTreasure will return the treasure key.*/ +func GetObjectKeyForConfig(genesisHash string) string { + return fmt.Sprintf("%v/%v", genesisHash, "config") +} + +/*GetObjectKeyForTreasure will return the treasure key.*/ +func GetObjectKeyForTreasure(genesisHash string) string { + return fmt.Sprintf("%v/%v", genesisHash, "treasure_map") +} + +/*GetObjectKeyForData will return object key for particular data. startIndex is the smallest index in data. */ +func GetObjectKeyForData(genesisHash string, startIndex int, batchSize int) string { + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", startIndex%batchSize) +} + +/*GetObjectKeyForHash will return object key for particular data. startIndex is the smallest index in data. */ +func GetObjectKeyForHash(genesisHash string, startIndex int, batchSize int) string { + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", startIndex%batchSize) +} From 48ba68d1b90bc5d1c860b3569e77d5202e786750 Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Sun, 28 Oct 2018 20:32:40 -0400 Subject: [PATCH 5/8] update --- actions/v3/upload_sessions.go | 3 ++- utils/utils.go | 20 ++++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index b2b49620..db8fa23a 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -86,7 +86,8 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { return c.Error(400, errors.New("Using the wrong endpoint. This endpoint is for V3 only")) } - objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, BatchSize) + isReverseIteration := uploadSession.Type == models.SessionTypeBeta + objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, uploadSession.NumChunks, isReverseIteration, BatchSize) var data []byte if data, err = json.Marshal(req.Chunks); err != nil { diff --git a/utils/utils.go b/utils/utils.go index 73b7a64d..4d3689d9 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -400,11 +400,23 @@ func GetObjectKeyForTreasure(genesisHash string) string { } /*GetObjectKeyForData will return object key for particular data. startIndex is the smallest index in data. */ -func GetObjectKeyForData(genesisHash string, startIndex int, batchSize int) string { - return fmt.Sprintf("%v/%v/%v", genesisHash, "data", startIndex%batchSize) +func GetObjectKeyForData(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string { + index := 0 + if isReserveIteration { + index = (totalCount - 1 - startIndex) / batchSize + } else { + index = startIndex / batchSize + } + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", index) } /*GetObjectKeyForHash will return object key for particular data. startIndex is the smallest index in data. */ -func GetObjectKeyForHash(genesisHash string, startIndex int, batchSize int) string { - return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", startIndex%batchSize) +func GetObjectKeyForHash(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string { + index := 0 + if isReserveIteration { + index = (totalCount - 1 - startIndex) / batchSize + } else { + index = startIndex / batchSize + } + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", index) } From fe838562db45ad34b76f31d0ad4af68261fd7ad5 Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Mon, 29 Oct 2018 20:55:56 -0400 Subject: [PATCH 6/8] Update --- utils/utils.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/utils.go b/utils/utils.go index 4d3689d9..656dd49d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -407,6 +407,7 @@ func GetObjectKeyForData(genesisHash string, startIndex int, totalCount int, isR } else { index = startIndex / batchSize } + return fmt.Sprintf("%v/%v/%v", genesisHash, "data", index) } @@ -418,5 +419,6 @@ func GetObjectKeyForHash(genesisHash string, startIndex int, totalCount int, isR } else { index = startIndex / batchSize } + return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", index) } From ccb7b17d1d0f301201e53616418ad0aa24773a5d Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Fri, 2 Nov 2018 18:32:30 -0400 Subject: [PATCH 7/8] u --- actions/v3/upload_sessions.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index db8fa23a..b4f730bd 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -56,7 +56,7 @@ type uploadSessionConfig struct { BatchSize int `json:"batchSize"` // Represent each data contain not more number of field. FileSizeBytes uint64 `json:"fileSizeBytes"` // Represent the total file size. NumChunks int `json:"numChunks"` // Represent total number of chunks. - ReserveIteration bool `json:"reserveIteration"` // Represent whether iterate it from the beginning to end or end to the beginning. + ReverseIteration bool `json:"reverseIteration"` // Represent whether iterate it from the beginning to end or end to the beginning. } var NumChunksLimit = -1 //unlimited @@ -290,9 +290,14 @@ func saveTreasureMapForS3(u *models.UploadSession, treasureIndexA []int, treasur } // Update treasureId u.MakeTreasureIdxMap(mergedIndexes, privateKeys) - } - if !u.TreasureIdxMap.Valid || u.TreasureIdxMap.String == "" { - return oyster_utils.LogIfError(errors.New("Not treasure was included in the UploadSession"), nil) + + // Verify that MakeTreasureIdxMap is correct. Otherwise, regenerate it again. + treasureIndexes, _ := alphaSession.GetTreasureIndexes() + if alphaSession.TreasureStatus == models.TreasureInDataMapPending && + alphaSession.TreasureIdxMap.Valid && alphaSession.TreasureIdxMap.String != "" && + len(treasureIndexes) == len(mergedIndexes) { + break + } } return setDefaultBucketObject(oyster_utils.GetObjectKeyForTreasure(u.GenesisHash), u.TreasureIdxMap.String) @@ -304,7 +309,7 @@ func saveConfigForS3(u models.UploadSession) error { BatchSize: BatchSize, FileSizeBytes: u.FileSizeBytes, NumChunks: u.NumChunks, - ReserveIteration: u.Type == models.SessionTypeBeta, + ReverseIteration: u.Type == models.SessionTypeBeta, } data, err := json.Marshal(config) if err != nil { From d3d69c893c29bff61fcb278904f0a246738cbd54 Mon Sep 17 00:00:00 2001 From: Peisen Zhao Date: Sat, 3 Nov 2018 17:25:35 -0400 Subject: [PATCH 8/8] update --- actions/v3/upload_sessions.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/actions/v3/upload_sessions.go b/actions/v3/upload_sessions.go index b4f730bd..0ba0d469 100644 --- a/actions/v3/upload_sessions.go +++ b/actions/v3/upload_sessions.go @@ -292,9 +292,9 @@ func saveTreasureMapForS3(u *models.UploadSession, treasureIndexA []int, treasur u.MakeTreasureIdxMap(mergedIndexes, privateKeys) // Verify that MakeTreasureIdxMap is correct. Otherwise, regenerate it again. - treasureIndexes, _ := alphaSession.GetTreasureIndexes() - if alphaSession.TreasureStatus == models.TreasureInDataMapPending && - alphaSession.TreasureIdxMap.Valid && alphaSession.TreasureIdxMap.String != "" && + treasureIndexes, _ := u.GetTreasureIndexes() + if u.TreasureStatus == models.TreasureInDataMapPending && + u.TreasureIdxMap.Valid && u.TreasureIdxMap.String != "" && len(treasureIndexes) == len(mergedIndexes) { break }