Skip to content

Commit

Permalink
Reattempt parsing to support direct writes to the request file.
Browse files Browse the repository at this point in the history
This adds a loop to sleep for a while and reattempt parsing of the
request body if it fails the first time. It allows clients to safely
write to the request file without any renaming, which avoids potential
race conditions from multiple processes renaming to the same file.
  • Loading branch information
LTLA committed Apr 9, 2024
1 parent cf896e8 commit 8aa7631
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 58 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ When writing the request file, it is recommended to use the write-and-rename par
Specifically, users should write the JSON request body to a file inside the staging directory that does _not_ have the `request-<ACTION>-` prefix.
Once the write is complete, this file can be renamed to a file with said prefix.
This ensures that the Gobbler does not read a partially-written file.
(That said, a direct write to the final file can still be performed, in which case the Gobbler will perform a few retries to avoid errors from parsing an incomplete file.)

### Creating projects (admin)

Expand Down
6 changes: 3 additions & 3 deletions create.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func createProjectHandler(reqpath string, globals *globalConfiguration) error {
// Reading in the request.
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}
err = json.Unmarshal(handle, &request)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

if request.Project == nil {
return fmt.Errorf("expected a 'project' property in %q", reqpath)
return &readRequestError{ Cause: fmt.Errorf("expected a 'project' property in %q", reqpath) }
}
project := *(request.Project)

Expand Down
13 changes: 7 additions & 6 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ func deleteProjectHandler(reqpath string, globals *globalConfiguration) error {
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
Expand Down Expand Up @@ -75,18 +75,19 @@ func deleteAssetHandler(reqpath string, globals *globalConfiguration) error {
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
if err != nil {
return fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err)
}

err = isMissingOrBadName(incoming.Asset)
if err != nil {
return fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err)
Expand Down Expand Up @@ -160,12 +161,12 @@ func deleteVersionHandler(reqpath string, globals *globalConfiguration) error {
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
Expand Down
4 changes: 2 additions & 2 deletions latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func refreshLatestHandler(reqpath string, globals *globalConfiguration) (*latest
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return nil, fmt.Errorf("failed to read %q; %w", reqpath, err)
return nil, &readRequestError{ fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return nil, &readRequestError{ fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
Expand Down
88 changes: 52 additions & 36 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ func main() {
}
log.Println("triggered filesystem event:", event)

// It is expected that request bodies should be initially
// It is recommended that request bodies should be initially
// written to some other file (e.g., `.tmpXXXX`) inside the
// staging directory, and then moved to the actual file name
// (`request-<action>-YYY`). The rename should be atomic and
// thus we avoid problems with the code below triggering before
// the requester has completed the write of the body. Under
// this logic, we only have to watch the Create events as
// no Writes are being performed on a renamed file.
// this logic, we only have to watch the Create events as no
// Writes are being performed on a renamed file.
if event.Has(fsnotify.Create) {
info, err := os.Stat(event.Name)
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -97,45 +97,61 @@ func main() {
var reportable_err error
payload := map[string]interface{}{}

if strings.HasPrefix(reqtype, "upload-") {
reportable_err = uploadHandler(reqpath, &globals)
// We prefer an atomic write, but nonetheless, if a request is directly written to the final
// file name, we will continuously retry (for up to 1 second) if there is any error during
// the reading of the request body. This takes advantage of the fact that an incompletely
// written JSON object must be invalid as it's missing the closing brace.
for i := 0; i < 4; i++ {
if strings.HasPrefix(reqtype, "upload-") {
reportable_err = uploadHandler(reqpath, &globals)

} else if strings.HasPrefix(reqtype, "refresh_latest-") {
res, err0 := refreshLatestHandler(reqpath, &globals)
if err0 == nil {
if res != nil {
payload["version"] = res.Version
}
} else {
reportable_err = err0
}

} else if strings.HasPrefix(reqtype, "refresh_latest-") {
res, err0 := refreshLatestHandler(reqpath, &globals)
if err0 == nil {
if res != nil {
payload["version"] = res.Version
} else if strings.HasPrefix(reqtype, "refresh_usage-") {
res, err0 := refreshUsageHandler(reqpath, &globals)
if err0 == nil {
payload["total"] = res.Total
} else {
reportable_err = err0
}
} else {
reportable_err = err0
}

} else if strings.HasPrefix(reqtype, "refresh_usage-") {
res, err0 := refreshUsageHandler(reqpath, &globals)
if err0 == nil {
payload["total"] = res.Total
} else if strings.HasPrefix(reqtype, "set_permissions-") {
reportable_err = setPermissionsHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "approve_probation-") {
reportable_err = approveProbationHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "reject_probation-") {
reportable_err = rejectProbationHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "create_project-") {
reportable_err = createProjectHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_project-") {
reportable_err = deleteProjectHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_asset-") {
reportable_err = deleteAssetHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_version-") {
reportable_err = deleteVersionHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "health_check-") {
reportable_err = nil
} else {
reportable_err = err0
reportable_err = fmt.Errorf("cannot determine request type for %q", reqpath)
}

} else if strings.HasPrefix(reqtype, "set_permissions-") {
reportable_err = setPermissionsHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "approve_probation-") {
reportable_err = approveProbationHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "reject_probation-") {
reportable_err = rejectProbationHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "create_project-") {
reportable_err = createProjectHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_project-") {
reportable_err = deleteProjectHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_asset-") {
reportable_err = deleteAssetHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "delete_version-") {
reportable_err = deleteVersionHandler(reqpath, &globals)
} else if strings.HasPrefix(reqtype, "health_check-") {
reportable_err = nil
} else {
reportable_err = fmt.Errorf("cannot determine request type for %q", reqpath)
// If there's no error, or if the error is not a readRequestError, we quit
// and report the results. If the error is an RRE, we might be reading an
// incompletely written file, so we wait for a bit and try again.
if reportable_err == nil {
break
} else if _, ok := reportable_err.(*readRequestError); !ok {
break
}
time.Sleep(time.Second / 4.0)
}

if reportable_err == nil {
Expand Down
4 changes: 2 additions & 2 deletions permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ func setPermissionsHandler(reqpath string, globals *globalConfiguration) error {
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
Expand Down
10 changes: 5 additions & 5 deletions probation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ func baseProbationHandler(reqpath string, globals *globalConfiguration, approve
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
if err != nil {
return fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Asset)
if err != nil {
return fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Version)
if err != nil {
return fmt.Errorf("invalid 'version' property in %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("invalid 'version' property in %q; %w", reqpath, err) }
}
}

Expand Down
4 changes: 2 additions & 2 deletions upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func uploadHandler(reqpath string, globals *globalConfiguration) error {
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return fmt.Errorf("failed to read %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}
err = json.Unmarshal(handle, &request)
if err != nil {
return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

if request.Source == nil {
Expand Down
4 changes: 2 additions & 2 deletions usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func refreshUsageHandler(reqpath string, globals *globalConfiguration) (*usageMe
{
handle, err := os.ReadFile(reqpath)
if err != nil {
return nil, fmt.Errorf("failed to read %q; %w", reqpath, err)
return nil, &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) }
}

err = json.Unmarshal(handle, &incoming)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err)
return nil, &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) }
}

err = isMissingOrBadName(incoming.Project)
Expand Down
8 changes: 8 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func newGlobalConfiguration(registry string) globalConfiguration {
}
}

type readRequestError struct {
Cause error
}

func (r *readRequestError) Error() string {
return r.Cause.Error()
}

func dumpJson(path string, content interface{}) error {
// Using the save-and-rename paradigm to avoid clients picking up partial writes.
temp, err := os.CreateTemp(filepath.Dir(path), ".temp*.json")
Expand Down

0 comments on commit 8aa7631

Please sign in to comment.