diff --git a/moonstream.go b/moonstream.go index 341580c..3f129e5 100644 --- a/moonstream.go +++ b/moonstream.go @@ -276,7 +276,7 @@ func (client *MoonstreamEngineAPIClient) CreateCallRequests( time.Sleep(time.Duration(sendReTryCnt) * time.Second) if sendReTryCnt > retries { - fmt.Printf("failed to send call requests") + return fmt.Errorf("failed to send call requests") } } diff --git a/server.go b/server.go index 98fe77e..1ea6bae 100644 --- a/server.go +++ b/server.go @@ -37,28 +37,57 @@ type Server struct { } type ServerRoutineInfo struct { - Id string `json:"id"` - Status string `json:"status"` + Id string `json:"id"` + Operation string `json:"operation"` + Status string `json:"status"` } -func (server *Server) RegisterRoutine(sri *ServerRoutineInfo) { +func (server *Server) RegisterServerRoutine(sri *ServerRoutineInfo) { server.ServerMu.Lock() server.ServerActiveRoutines[sri.Id] = sri server.ServerMu.Unlock() server.ServerWg.Add(1) } -func (server *Server) ReleaseRoutine(sri *ServerRoutineInfo) { +func (server *Server) ReleaseServerRoutine(sri *ServerRoutineInfo) { server.ServerMu.Lock() - delete(server.ServerActiveRoutines, sri.Id) + // TODO(kompotkot): Add background cleaner to remove memory leaks + server.ServerActiveRoutines[sri.Id].Status = "Complete" server.ServerMu.Unlock() server.ServerWg.Done() } +func (server *Server) InProgressServerRoutine(sri *ServerRoutineInfo) { + server.ServerMu.Lock() + server.ServerActiveRoutines[sri.Id].Status = "In Progress" + server.ServerMu.Unlock() +} + +func (server *Server) FailedServerRoutine(sri *ServerRoutineInfo) { + server.ServerMu.Lock() + server.ServerActiveRoutines[sri.Id].Status = "Failed" + server.ServerMu.Unlock() + server.ServerWg.Done() +} + +func (server *Server) GetServerRoutine(id string) (*ServerRoutineInfo, bool) { + server.ServerMu.Lock() + routineInfo, routineExists := server.ServerActiveRoutines[id] + server.ServerMu.Unlock() + + return routineInfo, routineExists +} + func (server *Server) CreateCallRequestsRoutine(sri *ServerRoutineInfo, authorizationToken string, req *SignDropperRequest, specs []CallRequestSpecification) { - defer server.ReleaseRoutine(sri) - sri.Status = "Running" - server.MoonstreamEngineAPIClient.CreateCallRequests(authorizationToken, "", req.Dropper, req.TtlDays, specs, 100, 1) + server.InProgressServerRoutine(sri) + + createReqErr := server.MoonstreamEngineAPIClient.CreateCallRequests(authorizationToken, "", req.Dropper, req.TtlDays, specs, 100, 1) + if createReqErr != nil { + server.FailedServerRoutine(sri) + return + } + + server.ReleaseServerRoutine(sri) } type PingResponse struct { @@ -72,6 +101,8 @@ type SignDropperRequest struct { TtlDays int `json:"ttl_days"` Sensible bool `json:"sensible"` Requests []*DropperClaimMessage `json:"requests"` + + ServerRoutineInfoId string `json:"server_routine_info_id"` } // Check access id was provided correctly and save user access configuration to request context @@ -186,14 +217,14 @@ func (server *Server) panicMiddleware(next http.Handler) http.Handler { }) } -// pingRoute response with status of load balancer server itself +// pingRoute returns status of waggle server itself func (server *Server) pingRoute(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") response := PingResponse{Status: "ok"} json.NewEncoder(w).Encode(response) } -// signDropperRoute response with status of load balancer server itself +// signDropperRoute sign dropper call requests func (server *Server) signDropperRoute(w http.ResponseWriter, r *http.Request) { authorizationToken := r.Context().Value("authorizationToken").(string) @@ -268,22 +299,50 @@ func (server *Server) signDropperRoute(w http.ResponseWriter, r *http.Request) { if isMetatxDrop { newSri := ServerRoutineInfo{ - Id: uuid.New().String(), - Status: "Initialized", + Id: uuid.New().String(), + Operation: "Create call_requests at metatx Engine API", + Status: "Initialized", } - server.RegisterRoutine(&newSri) + server.RegisterServerRoutine(&newSri) go server.CreateCallRequestsRoutine(&newSri, authorizationToken, req, callRequests) + req.ServerRoutineInfoId = newSri.Id } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(req) } +// InfoRoutinesRoute response with status of call requests registration at metatx Engine API +func (server *Server) InfoRoutinesRoute(w http.ResponseWriter, r *http.Request) { + urlParts := strings.Split(r.URL.Path, "/") + if len(urlParts) > 3 { + sriId := urlParts[3] + _, uuidParseErr := uuid.Parse(sriId) + if uuidParseErr != nil { + http.Error(w, "Incorrect routine info ID provided", http.StatusBadRequest) + return + } + + routineInfo, routineExists := server.GetServerRoutine(sriId) + if !routineExists { + http.Error(w, "Routine info not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(routineInfo) + } else { + http.Error(w, "Invalid URL", http.StatusBadRequest) + return + } +} + // Serve handles server run func (server *Server) Serve() error { serveMux := http.NewServeMux() serveMux.HandleFunc("/ping", server.pingRoute) serveMux.HandleFunc("/sign/dropper", server.signDropperRoute) + serveMux.HandleFunc("/info/routines/", server.InfoRoutinesRoute) // Set list of common middleware, from bottom to top commonHandler := server.accessMiddleware(serveMux)