Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Create/CreateBeta method #634

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 108 additions & 15 deletions actions/v3/upload_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type uploadSessionCreateReqV3 struct {
FileSizeBytes uint64 `json:"fileSizeBytes"` // This is Trytes instead of Byte
BetaIP string `json:"betaIp"`
StorageLengthInYears int `json:"storageLengthInYears"`
AlphaTreasureIndexes []int `json:"alphaTreasureIndexes"`
Invoice models.Invoice `json:"invoice"`
Version uint32 `json:"version"`
}
Expand All @@ -44,9 +45,18 @@ type uploadSessionCreateBetaResV3 struct {
}

type uploadSessionCreateResV3 struct {
ID string `json:"id"`
BetaSessionID string `json:"betaSessionId"`
BatchSize int `json:"batchSize"`
ID string `json:"id"`
BetaSessionID string `json:"betaSessionId"`
BatchSize int `json:"batchSize"`
Invoice models.Invoice `json:"invoice"`
}

/*uploadSessionConfig represents the general configuration that other client could understand.*/
type uploadSessionConfig struct {
BatchSize int `json:"batchSize"` // Represent each data contain not more number of field.
FileSizeBytes uint64 `json:"fileSizeBytes"` // Represent the total file size.
NumChunks int `json:"numChunks"` // Represent total number of chunks.
ReserveIteration bool `json:"reserveIteration"` // Represent whether iterate it from the beginning to end or end to the beginning.
pzhao5 marked this conversation as resolved.
Show resolved Hide resolved
}

var NumChunksLimit = -1 //unlimited
Expand All @@ -66,8 +76,7 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error {

uploadSession := &models.UploadSession{}
if err = models.DB.Find(uploadSession, c.Param("id")); err != nil {
oyster_utils.LogIfError(err, nil)
return c.Error(500, err)
return c.Error(500, oyster_utils.LogIfError(err, nil))
}
if uploadSession == nil {
return c.Error(400, fmt.Errorf("Error in finding session for id %v", c.Param("id")))
Expand All @@ -77,16 +86,15 @@ func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error {
return c.Error(400, errors.New("Using the wrong endpoint. This endpoint is for V3 only"))
}

fileIndex := req.Chunks[0].Idx / BatchSize
objectKey := fmt.Sprintf("%v/%v", uploadSession.GenesisHash, fileIndex)
isReverseIteration := uploadSession.Type == models.SessionTypeBeta
objectKey := oyster_utils.GetObjectKeyForData(uploadSession.GenesisHash, req.Chunks[0].Idx, uploadSession.NumChunks, isReverseIteration, BatchSize)

var data []byte
if data, err = json.Marshal(req.Chunks); err != nil {
return c.Error(500, fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err))
return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to marshal ChunkReq to JSON with err %v", err), nil))
}
if err = setDefaultBucketObject(objectKey, string(data)); err != nil {
oyster_utils.LogIfError(err, nil)
return c.Error(500, fmt.Errorf("Unable to store data to S3 with err: %v", err))
return c.Error(500, oyster_utils.LogIfError(fmt.Errorf("Unable to store data to S3 with err: %v", err), nil))
}

return c.Render(202, actions_utils.Render.JSON(map[string]bool{"success": true}))
Expand Down Expand Up @@ -114,6 +122,16 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error {
StorageMethod: models.StorageMethodS3,
}

if vErr, err := alphaSession.StartUploadSession(); err != nil || vErr.HasAny() {
return c.Error(400, fmt.Errorf("StartUploadSession error: %v and validation error: %v", err, vErr))
}

invoice := alphaSession.GetInvoice()

// Mutates this because copying in golang sucks...
req.Invoice = invoice
req.AlphaTreasureIndexes = oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes))

hasBeta := req.BetaIP != ""
var betaSessionID = ""
if hasBeta {
Expand All @@ -124,17 +142,27 @@ func (usr *UploadSessionResourceV3) Create(c buffalo.Context) error {

betaSessionID = betaSessionRes.ID
alphaSession.ETHAddrBeta = nulls.NewString(betaSessionRes.ETHAddr)

if err := saveTreasureMapForS3(&alphaSession, req.AlphaTreasureIndexes, betaSessionRes.TreasureIndexes); err != nil {
return c.Error(500, err)
}

if err := models.DB.Save(&alphaSession); err != nil {
return c.Error(400, oyster_utils.LogIfError(err, nil))
}
}

if err := models.DB.Save(&alphaSession); err != nil {
oyster_utils.LogIfError(err, nil)
return c.Error(400, err)
if err := saveConfigForS3(alphaSession); err != nil {
return c.Error(500, err)
}

models.NewBrokerBrokerTransaction(&alphaSession)

res := uploadSessionCreateResV3{
ID: alphaSession.ID.String(),
BetaSessionID: betaSessionID,
BatchSize: BatchSize,
Invoice: invoice,
}

return c.Render(200, actions_utils.Render.JSON(res))
Expand Down Expand Up @@ -164,12 +192,29 @@ func (usr *UploadSessionResourceV3) CreateBeta(c buffalo.Context) error {
StorageMethod: models.StorageMethodS3,
}

if vErr, err := u.StartUploadSession(); err != nil || vErr.HasAny() {
return c.Error(400, fmt.Errorf("Can't startUploadSession with validation error: %v and err: %v", vErr, err))
}

betaTreasureIndexes := oyster_utils.GenerateInsertedIndexesForPearl(oyster_utils.ConvertToByte(req.FileSizeBytes))
if err := saveTreasureMapForS3(&u, req.AlphaTreasureIndexes, betaTreasureIndexes); err != nil {
return c.Error(500, err)
}

if err := models.DB.Save(&u); err != nil {
return c.Error(400, err)
return c.Error(500, err)
}

if err := saveConfigForS3(u); err != nil {
return c.Error(500, err)
}

models.NewBrokerBrokerTransaction(&u)

res := uploadSessionCreateBetaResV3{
ID: u.ID.String(),
ID: u.ID.String(),
TreasureIndexes: betaTreasureIndexes,
ETHAddr: u.ETHAddrBeta.String,
}

return c.Render(200, actions_utils.Render.JSON(res))
Expand Down Expand Up @@ -219,3 +264,51 @@ func sendBetaWithUploadRequest(req uploadSessionCreateReqV3) (uploadSessionCreat
err := oyster_utils.SendHttpReq(betaURL, req, betaSessionRes)
return betaSessionRes, err
}

/*saveTreasureMapForS3 saves treasure keys as JSON format into S3.*/
func saveTreasureMapForS3(u *models.UploadSession, treasureIndexA []int, treasureIndexB []int) error {
mergedIndexes, err := oyster_utils.MergeIndexes(treasureIndexA, treasureIndexB,
oyster_utils.FileSectorInChunkSize, u.NumChunks)
if err != nil {
return err
}

if len(mergedIndexes) == 0 {
if oyster_utils.BrokerMode != oyster_utils.TestModeNoTreasure {
return oyster_utils.LogIfError(errors.New("no indexes selected for treasure"), nil)
}
return nil
}

for {
privateKeys, err := EthWrapper.GenerateKeys(len(mergedIndexes))
if err != nil {
return oyster_utils.LogIfError(errors.New("Could not generate eth keys: "+err.Error()), nil)
}
if len(mergedIndexes) != len(privateKeys) {
return oyster_utils.LogIfError(errors.New("privateKeys and mergedIndexes should have the same length"), nil)
}
// Update treasureId
u.MakeTreasureIdxMap(mergedIndexes, privateKeys)
pzhao5 marked this conversation as resolved.
Show resolved Hide resolved
}
if !u.TreasureIdxMap.Valid || u.TreasureIdxMap.String == "" {
return oyster_utils.LogIfError(errors.New("Not treasure was included in the UploadSession"), nil)
}

return setDefaultBucketObject(oyster_utils.GetObjectKeyForTreasure(u.GenesisHash), u.TreasureIdxMap.String)
}

/*saveConfigForS3 saves uploadSession config to S3 endpoint so that Lamdba function could read it.*/
func saveConfigForS3(u models.UploadSession) error {
config := uploadSessionConfig{
BatchSize: BatchSize,
FileSizeBytes: u.FileSizeBytes,
NumChunks: u.NumChunks,
ReserveIteration: u.Type == models.SessionTypeBeta,
}
data, err := json.Marshal(config)
if err != nil {
return oyster_utils.LogIfError(err, nil)
}
return setDefaultBucketObject(oyster_utils.GetObjectKeyForConfig(u.GenesisHash), string(data))
}
51 changes: 41 additions & 10 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,21 @@ func SetLogInfoForDatabaseUrl(dbUrl string) {
func SendHttpReq(url string, req interface{}, resp interface{}) error {
reqJson, err := json.Marshal(req)
if err != nil {
LogIfError(err, nil)
return err
return LogIfError(err, nil)
}
reqBody := bytes.NewBuffer(reqJson)

httpRes, err := http.Post(url, "application/json", reqBody)
defer httpRes.Body.Close() // we need to close the connection

if err != nil {
LogIfError(err, nil)
return err
return LogIfError(err, nil)
}

if err := ParseResBody(httpRes, resp); err != nil {
err = fmt.Errorf("Unable to communicate with Beta node: %v", err)
// This should consider as BadRequest since the client pick the beta node.
LogIfError(err, nil)
return err
return LogIfError(err, nil)
}
return nil
}
Expand Down Expand Up @@ -256,8 +253,7 @@ func MergeIndexes(a []int, b []int, sectorSize int, numChunks int) ([]int, error
var merged []int
if len(a) == 0 && len(b) == 0 || len(a) != len(b) {
err := errors.New("Invalid input for utils.MergeIndexes. Both a []int and b []int must have the same length")
LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)})
return nil, err
return nil, LogIfError(err, map[string]interface{}{"aInputSize": len(a), "bInputSize": len(b)})
}

for i := 0; i < len(a); i++ {
Expand Down Expand Up @@ -311,9 +307,9 @@ func ConvertGweiToWei(gwei *big.Int) *big.Int {
}

/*LogIfError logs any error if it is not nil. Allow caller to provide additional freeform info.*/
func LogIfError(err error, extraInfo map[string]interface{}) {
func LogIfError(err error, extraInfo map[string]interface{}) error {
if err == nil {
return
return nil
}

fmt.Println(err)
Expand All @@ -325,6 +321,7 @@ func LogIfError(err error, extraInfo map[string]interface{}) {
raven.CaptureError(err, logErrorTags)
}
}
return err
}

/* ReturnEncryptedEthKey will be used by several models to encrypt the eth key so we are not storing a naked key */
Expand Down Expand Up @@ -391,3 +388,37 @@ func IntMax(x, y int) int {
func GenerateBadgerKey(startingString string, genesisHash string, chunkIdx int) string {
return fmt.Sprintf("%v%v__%d", startingString, genesisHash, chunkIdx)
}

/*GetObjectKeyForTreasure will return the treasure key.*/
func GetObjectKeyForConfig(genesisHash string) string {
return fmt.Sprintf("%v/%v", genesisHash, "config")
}

/*GetObjectKeyForTreasure will return the treasure key.*/
func GetObjectKeyForTreasure(genesisHash string) string {
return fmt.Sprintf("%v/%v", genesisHash, "treasure_map")
}

/*GetObjectKeyForData will return object key for particular data. startIndex is the smallest index in data. */
func GetObjectKeyForData(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string {
index := 0
if isReserveIteration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit isReverseIteration

index = (totalCount - 1 - startIndex) / batchSize
} else {
index = startIndex / batchSize
}

return fmt.Sprintf("%v/%v/%v", genesisHash, "data", index)
}

/*GetObjectKeyForHash will return object key for particular data. startIndex is the smallest index in data. */
func GetObjectKeyForHash(genesisHash string, startIndex int, totalCount int, isReserveIteration bool, batchSize int) string {
index := 0
if isReserveIteration {
index = (totalCount - 1 - startIndex) / batchSize
} else {
index = startIndex / batchSize
}

return fmt.Sprintf("%v/%v/%v", genesisHash, "hash", index)
}