-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjobs.go
72 lines (61 loc) · 1.81 KB
/
jobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package status
import (
"context"
"encoding/json"
"net/http"
"sync/atomic"
"go.uber.org/zap"
)
type Jobs struct {
statusJobsRegistry JobsChecker
unavailableStatusCode int
log *zap.Logger
shutdownInitiated *atomic.Pointer[bool]
}
func NewJobsHandler(jc JobsChecker, shutdownInitiated *atomic.Pointer[bool], log *zap.Logger, usc int) *Jobs {
return &Jobs{
statusJobsRegistry: jc,
unavailableStatusCode: usc,
log: log,
shutdownInitiated: shutdownInitiated,
}
}
func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
if jb.shutdownInitiated != nil && *jb.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", http.StatusServiceUnavailable)
return
}
if jb.statusJobsRegistry == nil {
http.Error(w, "jobs plugin not found", jb.unavailableStatusCode)
}
jobStates, err := jb.statusJobsRegistry.JobsState(context.Background())
if err != nil {
jb.log.Error("jobs state", zap.Error(err))
http.Error(w, "jobs plugin not found", jb.unavailableStatusCode)
return
}
report := make([]*JobsReport, 0, len(jobStates))
// write info about underlying drivers
for i := 0; i < len(jobStates); i++ {
report = append(report, &JobsReport{
Pipeline: jobStates[i].Pipeline,
Priority: jobStates[i].Priority,
Ready: jobStates[i].Ready,
Queue: jobStates[i].Queue,
Active: jobStates[i].Active,
Delayed: jobStates[i].Delayed,
Reserved: jobStates[i].Reserved,
Driver: jobStates[i].Driver,
ErrorMessage: jobStates[i].ErrorMessage,
})
}
data, err := json.Marshal(report)
if err != nil {
jb.log.Error("failed to marshal jobs state report", zap.Error(err))
return
}
_, err = w.Write(data)
if err != nil {
jb.log.Error("failed to write jobs state report", zap.Error(err))
}
}