From d82f65821c620d10a8664dead9fead223ada2114 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Wed, 9 Oct 2024 11:12:13 +0200 Subject: [PATCH 1/4] Use the common etcd client in etos-iut --- cmd/iut/main.go | 15 ++++-------- internal/database/etcd/etcd.go | 11 ++++++++- pkg/iut/v1alpha1/v1alpha1.go | 43 ++++++++++++++++++++-------------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/cmd/iut/main.go b/cmd/iut/main.go index 19cb0bd..13b95cb 100644 --- a/cmd/iut/main.go +++ b/cmd/iut/main.go @@ -25,6 +25,7 @@ import ( "time" config "github.com/eiffel-community/etos-api/internal/configs/iut" + "github.com/eiffel-community/etos-api/internal/database/etcd" "github.com/eiffel-community/etos-api/internal/logging" server "github.com/eiffel-community/etos-api/internal/server" "github.com/eiffel-community/etos-api/pkg/application" @@ -32,7 +33,6 @@ import ( "github.com/sirupsen/logrus" "github.com/snowzach/rotatefilehook" "go.elastic.co/ecslogrus" - clientv3 "go.etcd.io/etcd/client/v3" ) // main sets up logging and starts up the webserver. @@ -62,17 +62,10 @@ func main() { "user_log": false, }) - // Database connection test - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{cfg.DatabaseURI()}, - DialTimeout: 5 * time.Second, - }) - if err != nil { - log.WithError(err).Fatal("failed to create etcd connection") - } - + iutEtcdTreePrefix := "/iut" + db := etcd.New(cfg, logger, iutEtcdTreePrefix) log.Info("Loading v1alpha1 routes") - v1alpha1App := v1alpha1.New(cfg, log, ctx, cli) + v1alpha1App := v1alpha1.New(cfg, log, ctx, db) defer v1alpha1App.Close() router := application.New(v1alpha1App) diff --git a/internal/database/etcd/etcd.go b/internal/database/etcd/etcd.go index c57fdaa..7eaf86a 100644 --- a/internal/database/etcd/etcd.go +++ b/internal/database/etcd/etcd.go @@ -68,12 +68,21 @@ func (etcd Etcd) Open(ctx context.Context, id uuid.UUID) io.ReadWriter { } } -// Write writes data to etcd +// Write writes data to etcd. If data is nil, the current key will be deleted from the database. func (etcd Etcd) Write(p []byte) (int, error) { if etcd.ID == uuid.Nil { return 0, errors.New("please create a new etcd client using Open") } key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String()) + + if p == nil { + _, err := etcd.client.Delete(etcd.ctx, key) + if err != nil { + return 0, fmt.Errorf("Failed to delete key %s: %s", key, err.Error()) + } + return 0, nil + } + _, err := etcd.client.Put(etcd.ctx, key, string(p)) if err != nil { return 0, err diff --git a/pkg/iut/v1alpha1/v1alpha1.go b/pkg/iut/v1alpha1/v1alpha1.go index 53f356c..98d4d0a 100644 --- a/pkg/iut/v1alpha1/v1alpha1.go +++ b/pkg/iut/v1alpha1/v1alpha1.go @@ -26,9 +26,9 @@ import ( eiffelevents "github.com/eiffel-community/eiffelevents-sdk-go" config "github.com/eiffel-community/etos-api/internal/configs/iut" + "github.com/eiffel-community/etos-api/internal/database" "github.com/eiffel-community/etos-api/pkg/application" packageurl "github.com/package-url/packageurl-go" - clientv3 "go.etcd.io/etcd/client/v3" "github.com/google/uuid" "github.com/julienschmidt/httprouter" @@ -38,14 +38,14 @@ import ( type V1Alpha1Application struct { logger *logrus.Entry cfg config.Config - database *clientv3.Client + database database.Opener wg *sync.WaitGroup } type V1Alpha1Handler struct { logger *logrus.Entry cfg config.Config - database *clientv3.Client + database database.Opener wg *sync.WaitGroup } @@ -72,11 +72,11 @@ func (a *V1Alpha1Application) Close() { } // New returns a new V1Alpha1Application object/struct -func New(cfg config.Config, log *logrus.Entry, ctx context.Context, cli *clientv3.Client) application.Application { +func New(cfg config.Config, log *logrus.Entry, ctx context.Context, db database.Opener) application.Application { return &V1Alpha1Application{ logger: log, cfg: cfg, - database: cli, + database: db, wg: &sync.WaitGroup{}, } } @@ -127,6 +127,10 @@ type StopRequest struct { // Start creates a number of IUTs and stores them in the ETCD database returning a checkout ID. func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { + identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) + if err != nil { + RespondWithError(w, http.StatusInternalServerError, err.Error()) + } checkOutID := uuid.New() w.Header().Set("X-Content-Type-Options", "nosniff") @@ -153,7 +157,8 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro RespondWithError(w, http.StatusInternalServerError, err.Error()) return } - _, err = h.database.Put(r.Context(), fmt.Sprintf("/iut/%s", checkOutID.String()), string(iuts)) + client := h.database.Open(r.Context(), identifier) + _, err = client.Write([]byte(string(iuts))) if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) return @@ -166,28 +171,28 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro // Status creates a simple DONE Status response with IUTs. func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { - identifier := r.Header.Get("X-Etos-Id") + identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) + if err != nil { + RespondWithError(w, http.StatusInternalServerError, err.Error()) + } + logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) id, err := uuid.Parse(r.URL.Query().Get("id")) - key := fmt.Sprintf("/iut/%s", id) - dbResp, err := h.database.Get(r.Context(), key) + client := h.database.Open(r.Context(), identifier) + var data []byte + _, err = client.Read(data) if err != nil { logger.Errorf("Failed to look up status request id: %s", id) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } - if len(dbResp.Kvs) == 0 { - err = fmt.Errorf("No key found: %s", key) - RespondWithError(w, http.StatusInternalServerError, err.Error()) - return - } statusResp := StatusResponse{ Id: id, Status: "DONE", } - if err = json.Unmarshal(dbResp.Kvs[0].Value, &statusResp.Iuts); err != nil { + if err = json.Unmarshal(data, &statusResp.Iuts); err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) return } @@ -202,7 +207,10 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr // Stop deletes the given IUTs from the database and returns an empty response. func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { - identifier := r.Header.Get("X-Etos-Id") + identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) + if err != nil { + RespondWithError(w, http.StatusInternalServerError, err.Error()) + } logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) var stopReq StopRequest @@ -212,7 +220,8 @@ func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprou RespondWithError(w, http.StatusBadRequest, err.Error()) return } - _, err := h.database.Delete(r.Context(), fmt.Sprintf("/iut/%s", stopReq.Id)) + client := h.database.Open(r.Context(), identifier) + _, err = client.Write(nil) if err != nil { logger.Errorf("Etcd delete failed: %s", err.Error()) RespondWithError(w, http.StatusInternalServerError, err.Error()) From 192e6a986f564b5694e98114bfe096d18f3b0c5e Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Wed, 9 Oct 2024 12:14:11 +0200 Subject: [PATCH 2/4] debug output added --- internal/database/etcd/etcd.go | 60 ++++++++++++++++++++++++++-------- pkg/iut/v1alpha1/v1alpha1.go | 33 ++++++++++++++++--- 2 files changed, 75 insertions(+), 18 deletions(-) diff --git a/internal/database/etcd/etcd.go b/internal/database/etcd/etcd.go index 7eaf86a..6ea20ff 100644 --- a/internal/database/etcd/etcd.go +++ b/internal/database/etcd/etcd.go @@ -100,8 +100,7 @@ func (etcd *Etcd) readByte() byte { // Read reads data from etcd and returns p bytes to user func (etcd *Etcd) Read(p []byte) (n int, err error) { if etcd.ID == uuid.Nil { - err = errors.New("please create a new etcd client using NewWithID") - return n, err + return 0, errors.New("please create a new etcd client using NewWithID") } key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String()) @@ -109,26 +108,61 @@ func (etcd *Etcd) Read(p []byte) (n int, err error) { if !etcd.hasRead { resp, err := etcd.client.Get(etcd.ctx, key) if err != nil { - return n, err + return 0, err } if len(resp.Kvs) == 0 { - return n, io.EOF + return 0, io.EOF } etcd.data = resp.Kvs[0].Value etcd.hasRead = true } if len(etcd.data) == 0 { - return n, io.EOF + return 0, io.EOF } - if c := cap(p); c > 0 { - for n < c { - p[n] = etcd.readByte() - n++ - if len(etcd.data) == 0 { - return n, io.EOF - } - } + + // Copy as much data as possible to p + n = copy(p, etcd.data) + etcd.data = etcd.data[n:] + + if n == 0 { + return 0, io.EOF } + return n, nil } + +// func (etcd *Etcd) Read(p []byte) (n int, err error) { +// if etcd.ID == uuid.Nil { +// err = errors.New("please create a new etcd client using NewWithID") +// return n, err +// } + +// key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String()) + +// if !etcd.hasRead { +// resp, err := etcd.client.Get(etcd.ctx, key) +// if err != nil { +// return n, err +// } +// if len(resp.Kvs) == 0 { +// return n, io.EOF +// } +// etcd.data = resp.Kvs[0].Value +// etcd.hasRead = true +// } + +// if len(etcd.data) == 0 { +// return n, io.EOF +// } +// if c := cap(p); c > 0 { +// for n < c { +// p[n] = etcd.readByte() +// n++ +// if len(etcd.data) == 0 { +// return n, io.EOF +// } +// } +// } +// return n, nil +// } diff --git a/pkg/iut/v1alpha1/v1alpha1.go b/pkg/iut/v1alpha1/v1alpha1.go index 98d4d0a..f49da4c 100644 --- a/pkg/iut/v1alpha1/v1alpha1.go +++ b/pkg/iut/v1alpha1/v1alpha1.go @@ -128,6 +128,9 @@ type StopRequest struct { // Start creates a number of IUTs and stores them in the ETCD database returning a checkout ID. func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) + logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) + logger.Infof("Start request: start") + if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) } @@ -138,12 +141,15 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro var startReq StartRequest if err := json.NewDecoder(r.Body).Decode(&startReq); err != nil { + h.logger.Errorf("Failed to decode request body: %s", r.Body) RespondWithError(w, http.StatusBadRequest, err.Error()) return } defer r.Body.Close() purl, err := packageurl.FromString(startReq.ArtifactIdentity) + if err != nil { + logger.Errorf("Failed to get purl from artifact identity: %s", startReq.ArtifactIdentity) RespondWithError(w, http.StatusBadRequest, err.Error()) return } @@ -152,14 +158,19 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro for i := range purls { purls[i] = purl } + logger.Infof("Purls: %s", purls) + logger.Infof("ArtifactIdentity: %s", startReq.ArtifactIdentity) + iuts, err := json.Marshal(purls) if err != nil { + logger.Errorf("Failed to marshal purls: %s", purls) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } client := h.database.Open(r.Context(), identifier) _, err = client.Write([]byte(string(iuts))) if err != nil { + logger.Errorf("Failed to write to database: %s", string(iuts)) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } @@ -167,6 +178,8 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro w.WriteHeader(http.StatusOK) response, _ := json.Marshal(startResp) _, _ = w.Write(response) + logger.Infof("Start request: end. Written: /iut/%s/%s", identifier, iuts) + } // Status creates a simple DONE Status response with IUTs. @@ -175,16 +188,19 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) } - logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) + logger.Infof("Status request: start") id, err := uuid.Parse(r.URL.Query().Get("id")) - client := h.database.Open(r.Context(), identifier) - var data []byte - _, err = client.Read(data) + + data := make([]byte, 4096) + byteCount, err := client.Read(data) + data = data[:byteCount] + + logger.Infof("Reading /iut/%s, %d bytes", identifier, byteCount) if err != nil { - logger.Errorf("Failed to look up status request id: %s", id) + logger.Errorf("Failed to look up status request id: %s, %s", identifier, err.Error()) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } @@ -193,21 +209,27 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr Status: "DONE", } if err = json.Unmarshal(data, &statusResp.Iuts); err != nil { + logger.Errorf("Failed to unmarshal data: %s", data) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } response, err := json.Marshal(statusResp) if err != nil { + logger.Errorf("Failed to marshal status response: %s", statusResp) RespondWithError(w, http.StatusInternalServerError, err.Error()) return } + logger.Infof("Status request: end") w.WriteHeader(http.StatusOK) _, _ = w.Write(response) + } // Stop deletes the given IUTs from the database and returns an empty response. func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) + h.logger.Infof("Stop request: start") + if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) } @@ -227,6 +249,7 @@ func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprou RespondWithError(w, http.StatusInternalServerError, err.Error()) return } + h.logger.Infof("Stop request: end") w.WriteHeader(http.StatusNoContent) } From a0ff8ffcb47c942eb2d678d92416c2769cb7a3f5 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Wed, 9 Oct 2024 15:27:11 +0200 Subject: [PATCH 3/4] Improve etcd.Read() code --- internal/database/etcd/etcd.go | 41 +++++----------------------------- pkg/iut/v1alpha1/v1alpha1.go | 15 +------------ 2 files changed, 7 insertions(+), 49 deletions(-) diff --git a/internal/database/etcd/etcd.go b/internal/database/etcd/etcd.go index 6ea20ff..5a78dc1 100644 --- a/internal/database/etcd/etcd.go +++ b/internal/database/etcd/etcd.go @@ -122,7 +122,13 @@ func (etcd *Etcd) Read(p []byte) (n int, err error) { } // Copy as much data as possible to p + // The copy function copies the minimum of len(p) and len(etcd.data) bytes from etcd.data to p + // It returns the number of bytes copied, which is stored in n n = copy(p, etcd.data) + + // Update etcd.data to remove the portion of data that has already been copied to p + // etcd.data[n:] creates a new slice that starts from the n-th byte to the end of the original slice + // This effectively removes the first n bytes from etcd.data, ensuring that subsequent reads start from the correct position etcd.data = etcd.data[n:] if n == 0 { @@ -131,38 +137,3 @@ func (etcd *Etcd) Read(p []byte) (n int, err error) { return n, nil } - -// func (etcd *Etcd) Read(p []byte) (n int, err error) { -// if etcd.ID == uuid.Nil { -// err = errors.New("please create a new etcd client using NewWithID") -// return n, err -// } - -// key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String()) - -// if !etcd.hasRead { -// resp, err := etcd.client.Get(etcd.ctx, key) -// if err != nil { -// return n, err -// } -// if len(resp.Kvs) == 0 { -// return n, io.EOF -// } -// etcd.data = resp.Kvs[0].Value -// etcd.hasRead = true -// } - -// if len(etcd.data) == 0 { -// return n, io.EOF -// } -// if c := cap(p); c > 0 { -// for n < c { -// p[n] = etcd.readByte() -// n++ -// if len(etcd.data) == 0 { -// return n, io.EOF -// } -// } -// } -// return n, nil -// } diff --git a/pkg/iut/v1alpha1/v1alpha1.go b/pkg/iut/v1alpha1/v1alpha1.go index f49da4c..a4f4917 100644 --- a/pkg/iut/v1alpha1/v1alpha1.go +++ b/pkg/iut/v1alpha1/v1alpha1.go @@ -129,8 +129,6 @@ type StopRequest struct { func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) - logger.Infof("Start request: start") - if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) } @@ -141,7 +139,7 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro var startReq StartRequest if err := json.NewDecoder(r.Body).Decode(&startReq); err != nil { - h.logger.Errorf("Failed to decode request body: %s", r.Body) + logger.Errorf("Failed to decode request body: %s", r.Body) RespondWithError(w, http.StatusBadRequest, err.Error()) return } @@ -158,9 +156,6 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro for i := range purls { purls[i] = purl } - logger.Infof("Purls: %s", purls) - logger.Infof("ArtifactIdentity: %s", startReq.ArtifactIdentity) - iuts, err := json.Marshal(purls) if err != nil { logger.Errorf("Failed to marshal purls: %s", purls) @@ -178,8 +173,6 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro w.WriteHeader(http.StatusOK) response, _ := json.Marshal(startResp) _, _ = w.Write(response) - logger.Infof("Start request: end. Written: /iut/%s/%s", identifier, iuts) - } // Status creates a simple DONE Status response with IUTs. @@ -189,7 +182,6 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr RespondWithError(w, http.StatusInternalServerError, err.Error()) } logger := h.logger.WithField("identifier", identifier).WithContext(r.Context()) - logger.Infof("Status request: start") id, err := uuid.Parse(r.URL.Query().Get("id")) client := h.database.Open(r.Context(), identifier) @@ -198,7 +190,6 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr byteCount, err := client.Read(data) data = data[:byteCount] - logger.Infof("Reading /iut/%s, %d bytes", identifier, byteCount) if err != nil { logger.Errorf("Failed to look up status request id: %s, %s", identifier, err.Error()) RespondWithError(w, http.StatusInternalServerError, err.Error()) @@ -219,7 +210,6 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr RespondWithError(w, http.StatusInternalServerError, err.Error()) return } - logger.Infof("Status request: end") w.WriteHeader(http.StatusOK) _, _ = w.Write(response) @@ -228,8 +218,6 @@ func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httpr // Stop deletes the given IUTs from the database and returns an empty response. func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id")) - h.logger.Infof("Stop request: start") - if err != nil { RespondWithError(w, http.StatusInternalServerError, err.Error()) } @@ -249,7 +237,6 @@ func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprou RespondWithError(w, http.StatusInternalServerError, err.Error()) return } - h.logger.Infof("Stop request: end") w.WriteHeader(http.StatusNoContent) } From 41d6d599e715af9bf4d483517600aed4e2a6616c Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 11 Oct 2024 12:48:58 +0200 Subject: [PATCH 4/4] Update pkg/iut/v1alpha1/v1alpha1.go Co-authored-by: Fredrik Fristedt --- pkg/iut/v1alpha1/v1alpha1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/iut/v1alpha1/v1alpha1.go b/pkg/iut/v1alpha1/v1alpha1.go index a4f4917..5fd1f51 100644 --- a/pkg/iut/v1alpha1/v1alpha1.go +++ b/pkg/iut/v1alpha1/v1alpha1.go @@ -147,7 +147,7 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro purl, err := packageurl.FromString(startReq.ArtifactIdentity) if err != nil { - logger.Errorf("Failed to get purl from artifact identity: %s", startReq.ArtifactIdentity) + logger.Errorf("Failed to create a purl struct from artifact identity: %s", startReq.ArtifactIdentity) RespondWithError(w, http.StatusBadRequest, err.Error()) return }