From 8aa7631972c7373ade37e8053e7641f2eb7ebb35 Mon Sep 17 00:00:00 2001 From: LTLA Date: Tue, 9 Apr 2024 14:40:08 -0700 Subject: [PATCH] Reattempt parsing to support direct writes to the request file. This adds a loop to sleep for a while and reattempt parsing of the request body if it fails the first time. It allows clients to safely write to the request file without any renaming, which avoids potential race conditions from multiple processes renaming to the same file. --- README.md | 1 + create.go | 6 ++-- delete.go | 13 ++++---- latest.go | 4 +-- main.go | 88 +++++++++++++++++++++++++++++--------------------- permissions.go | 4 +-- probation.go | 10 +++--- upload.go | 4 +-- usage.go | 4 +-- utils.go | 8 +++++ 10 files changed, 84 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index cecc28e..32f2c59 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ When writing the request file, it is recommended to use the write-and-rename par Specifically, users should write the JSON request body to a file inside the staging directory that does _not_ have the `request--` prefix. Once the write is complete, this file can be renamed to a file with said prefix. This ensures that the Gobbler does not read a partially-written file. +(That said, a direct write to the final file can still be performed, in which case the Gobbler will perform a few retries to avoid errors from parsing an incomplete file.) ### Creating projects (admin) diff --git a/create.go b/create.go index ce56a58..005b23d 100644 --- a/create.go +++ b/create.go @@ -27,15 +27,15 @@ func createProjectHandler(reqpath string, globals *globalConfiguration) error { // Reading in the request. 
handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &request) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } if request.Project == nil { - return fmt.Errorf("expected a 'project' property in %q", reqpath) + return &readRequestError{ Cause: fmt.Errorf("expected a 'project' property in %q", reqpath) } } project := *(request.Project) diff --git a/delete.go b/delete.go index 62496df..9b3d66e 100644 --- a/delete.go +++ b/delete.go @@ -24,12 +24,12 @@ func deleteProjectHandler(reqpath string, globals *globalConfiguration) error { { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) @@ -75,18 +75,19 @@ func deleteAssetHandler(reqpath string, globals *globalConfiguration) error { { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) if err != nil { return fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err) } + err = 
isMissingOrBadName(incoming.Asset) if err != nil { return fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err) @@ -160,12 +161,12 @@ func deleteVersionHandler(reqpath string, globals *globalConfiguration) error { { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) diff --git a/latest.go b/latest.go index edb6f91..d4d7fd9 100644 --- a/latest.go +++ b/latest.go @@ -100,12 +100,12 @@ func refreshLatestHandler(reqpath string, globals *globalConfiguration) (*latest { handle, err := os.ReadFile(reqpath) if err != nil { - return nil, fmt.Errorf("failed to read %q; %w", reqpath, err) + return nil, &readRequestError{ fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return nil, fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return nil, &readRequestError{ fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) diff --git a/main.go b/main.go index 6dde445..bb9fc9e 100644 --- a/main.go +++ b/main.go @@ -68,14 +68,14 @@ func main() { } log.Println("triggered filesystem event:", event) - // It is expected that request bodies should be initially + // It is recommended that request bodies should be initially // written to some other file (e.g., `.tmpXXXX`) inside the // staging directory, and then moved to the actual file name // (`request--YYY`). The rename should be atomic and // thus we avoid problems with the code below triggering before // the requester has completed the write of the body. 
Under - // this logic, we only have to watch the Create events as - // no Writes are being performed on a renamed file. + // this logic, we only have to watch the Create events as no + // Writes are being performed on a renamed file. if event.Has(fsnotify.Create) { info, err := os.Stat(event.Name) if errors.Is(err, os.ErrNotExist) { @@ -97,45 +97,61 @@ func main() { var reportable_err error payload := map[string]interface{}{} - if strings.HasPrefix(reqtype, "upload-") { - reportable_err = uploadHandler(reqpath, &globals) + // We prefer an atomic write, but nonetheless, if a request is directly written to the final + // file name, we will make up to 4 attempts (pausing 250 ms after each failure) if there is any + // error during the reading of the request body. This takes advantage of the fact that an + // incompletely written JSON object must be invalid as it's missing the closing brace. + for i := 0; i < 4; i++ { + if strings.HasPrefix(reqtype, "upload-") { + reportable_err = uploadHandler(reqpath, &globals) + + } else if strings.HasPrefix(reqtype, "refresh_latest-") { + res, err0 := refreshLatestHandler(reqpath, &globals) + if err0 == nil { + if res != nil { + payload["version"] = res.Version + } + } else { + reportable_err = err0 + } - } else if strings.HasPrefix(reqtype, "refresh_latest-") { - res, err0 := refreshLatestHandler(reqpath, &globals) - if err0 == nil { - if res != nil { - payload["version"] = res.Version + } else if strings.HasPrefix(reqtype, "refresh_usage-") { + res, err0 := refreshUsageHandler(reqpath, &globals) + if err0 == nil { + payload["total"] = res.Total + } else { + reportable_err = err0 } - } else { - reportable_err = err0 - } - } else if strings.HasPrefix(reqtype, "refresh_usage-") { - res, err0 := refreshUsageHandler(reqpath, &globals) - if err0 == nil { - payload["total"] = res.Total + } else if strings.HasPrefix(reqtype, "set_permissions-") { + reportable_err = setPermissionsHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, 
"approve_probation-") { + reportable_err = approveProbationHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "reject_probation-") { + reportable_err = rejectProbationHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "create_project-") { + reportable_err = createProjectHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "delete_project-") { + reportable_err = deleteProjectHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "delete_asset-") { + reportable_err = deleteAssetHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "delete_version-") { + reportable_err = deleteVersionHandler(reqpath, &globals) + } else if strings.HasPrefix(reqtype, "health_check-") { + reportable_err = nil } else { - reportable_err = err0 + reportable_err = fmt.Errorf("cannot determine request type for %q", reqpath) } - } else if strings.HasPrefix(reqtype, "set_permissions-") { - reportable_err = setPermissionsHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "approve_probation-") { - reportable_err = approveProbationHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "reject_probation-") { - reportable_err = rejectProbationHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "create_project-") { - reportable_err = createProjectHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "delete_project-") { - reportable_err = deleteProjectHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "delete_asset-") { - reportable_err = deleteAssetHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "delete_version-") { - reportable_err = deleteVersionHandler(reqpath, &globals) - } else if strings.HasPrefix(reqtype, "health_check-") { - reportable_err = nil - } else { - reportable_err = fmt.Errorf("cannot determine request type for %q", reqpath) + // If there's no error, or if the error is not a readRequestError, we quit + // and report the results. 
If the error is a readRequestError, we might be reading an + // incompletely written file, so we wait for a bit and try again. + if reportable_err == nil { + break + } else if _, ok := reportable_err.(*readRequestError); !ok { + break + } + time.Sleep(time.Second / 4.0) } if reportable_err == nil { diff --git a/permissions.go b/permissions.go index 8d85b3e..7e4faba 100644 --- a/permissions.go +++ b/permissions.go @@ -169,12 +169,12 @@ func setPermissionsHandler(reqpath string, globals *globalConfiguration) error { { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) diff --git a/probation.go b/probation.go index 9a16f88..59006fb 100644 --- a/probation.go +++ b/probation.go @@ -18,27 +18,27 @@ func baseProbationHandler(reqpath string, globals *globalConfiguration, approve { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) if err != nil { - return fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("invalid 'project' property in %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Asset) if err != nil { - return fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err) + return 
&readRequestError{ Cause: fmt.Errorf("invalid 'asset' property in %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Version) if err != nil { - return fmt.Errorf("invalid 'version' property in %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("invalid 'version' property in %q; %w", reqpath, err) } } } diff --git a/upload.go b/upload.go index 0702f95..0b2f61c 100644 --- a/upload.go +++ b/upload.go @@ -67,11 +67,11 @@ func uploadHandler(reqpath string, globals *globalConfiguration) error { { handle, err := os.ReadFile(reqpath) if err != nil { - return fmt.Errorf("failed to read %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &request) if err != nil { - return fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } if request.Source == nil { diff --git a/usage.go b/usage.go index 060561a..02d8bef 100644 --- a/usage.go +++ b/usage.go @@ -93,12 +93,12 @@ func refreshUsageHandler(reqpath string, globals *globalConfiguration) (*usageMe { handle, err := os.ReadFile(reqpath) if err != nil { - return nil, fmt.Errorf("failed to read %q; %w", reqpath, err) + return nil, &readRequestError{ Cause: fmt.Errorf("failed to read %q; %w", reqpath, err) } } err = json.Unmarshal(handle, &incoming) if err != nil { - return nil, fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) + return nil, &readRequestError{ Cause: fmt.Errorf("failed to parse JSON from %q; %w", reqpath, err) } } err = isMissingOrBadName(incoming.Project) diff --git a/utils.go b/utils.go index fb18ea1..62b59e9 100644 --- a/utils.go +++ b/utils.go @@ -26,6 +26,14 @@ func newGlobalConfiguration(registry string) globalConfiguration { } } +type readRequestError struct { + Cause error +} + +func (r *readRequestError) Error() string { + return r.Cause.Error() +} + func dumpJson(path string, 
content interface{}) error { // Using the save-and-rename paradigm to avoid clients picking up partial writes. temp, err := os.CreateTemp(filepath.Dir(path), ".temp*.json")