Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add buildruns and taskruns to the k8s-watcher #2007

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion k8s-watcher/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func NewJupyterServerCacheFromConfig(ctx context.Context, config Config, namespa
return
}

// NewAmaltheaSessionCacheFromConfig generates a new server cache from a configuration and a specfic k8s namespace.
// NewAmaltheaSessionCacheFromConfig generates a new session cache from a configuration and a specfic k8s namespace.
func NewAmaltheaSessionCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
k8sDynamicClient, err := initializeK8sDynamicClient()
if err != nil {
Expand All @@ -160,3 +160,31 @@ func NewAmaltheaSessionCacheFromConfig(ctx context.Context, config Config, names
res = &Cache{informer: informer, lister: lister, namespace: namespace, userIDLabel: config.UserIDLabel}
return
}

// NewShipwrightBuildRunCacheFromConfig generates a new buildrun cache from a configuration and a specfic k8s namespace.
func NewShipwrightBuildRunCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
k8sDynamicClient, err := initializeK8sDynamicClient()
if err != nil {
return
}
resource := schema.GroupVersionResource{Group: config.ShipwrightBuildRunGroup, Version: config.ShipwrightBuildRunVersion, Resource: config.ShipwrightBuildRunPlural}
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k8sDynamicClient, time.Minute, namespace, nil)
informer := factory.ForResource(resource).Informer()
lister := factory.ForResource(resource).Lister()
res = &Cache{informer: informer, lister: lister, namespace: namespace, userIDLabel: config.UserIDLabel}
return
}

// NewTektonTaskRunCacheFromConfig generates a new taskrun cache from a configuration and a specfic k8s namespace.
func NewTektonTaskRunCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
k8sDynamicClient, err := initializeK8sDynamicClient()
if err != nil {
return
}
resource := schema.GroupVersionResource{Group: config.TektonTaskRunGroup, Version: config.TektonTaskRunVersion, Resource: config.TektonTaskRunPlural}
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k8sDynamicClient, time.Minute, namespace, nil)
informer := factory.ForResource(resource).Informer()
lister := factory.ForResource(resource).Lister()
res = &Cache{informer: informer, lister: lister, namespace: namespace, userIDLabel: config.UserIDLabel}
return
}
30 changes: 30 additions & 0 deletions k8s-watcher/cache_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,33 @@ func NewAmaltheaSessionCacheCollectionFromConfigOrDie(ctx context.Context, confi
}
return &caches
}

// NewShipwrightBuildRunCacheCollectionFromConfigOrDie generates a new cache map from a configuration. If it cannot
// do this successfully it will terminate the program because the server cannot run at all if this
// step fails in any way and the program cannot recover from errors that occur here.
func NewShipwrightBuildRunCacheCollectionFromConfigOrDie(ctx context.Context, config Config) *CacheCollection {
caches := CacheCollection{}
for _, namespace := range config.Namespaces {
cache, err := NewShipwrightBuildRunCacheFromConfig(ctx, config, namespace)
if err != nil {
log.Fatalf("Cannot create cache collection: %v\n", err)
}
caches[namespace] = cache
}
return &caches
}

// NewTektonTaskRunCacheCollectionFromConfigOrDie generates a new cache map from a configuration. If it cannot
// do this successfully it will terminate the program because the server cannot run at all if this
// step fails in any way and the program cannot recover from errors that occur here.
func NewTektonTaskRunCacheCollectionFromConfigOrDie(ctx context.Context, config Config) *CacheCollection {
caches := CacheCollection{}
for _, namespace := range config.Namespaces {
cache, err := NewTektonTaskRunCacheFromConfig(ctx, config, namespace)
if err != nil {
log.Fatalf("Cannot create cache collection: %v\n", err)
}
caches[namespace] = cache
}
return &caches
}
60 changes: 54 additions & 6 deletions k8s-watcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,30 @@ import (
type Config struct {
// A list of k8s namespaces where resources will be cached and watched for.
Namespaces []string
// The group of the k8s resource that shoud be cached.
// The group of the k8s resource that should be cached.
CrGroup string
// The version of the k8s resource that shoud be cached.
// The version of the k8s resource that should be cached.
CrVersion string
// The plural name of the k8s resource that shoud be cached.
// The plural name of the k8s resource that should be cached.
CrPlural string
// The group of the AmaltheaSession resource that shoud be cached.
// The group of the AmaltheaSession resource that should be cached.
AmaltheaSessionGroup string
// The version of the AmaltheaSession resource that shoud be cached.
// The version of the AmaltheaSession resource that should be cached.
AmaltheaSessionVersion string
// The plural name of the AmaltheaSession resource that shoud be cached.
// The plural name of the AmaltheaSession resource that should be cached.
AmaltheaSessionPlural string
// The group of the ShipwrightBuildRun resource that should be cached.
ShipwrightBuildRunGroup string
// The version of the ShipwrightBuildRun resource that should be cached.
ShipwrightBuildRunVersion string
// The plural name of the ShipwrightBuildRun resource that should be cached.
ShipwrightBuildRunPlural string
// The group of the TektonTaskRun resource that should be cached.
TektonTaskRunGroup string
// The version of the TektonTaskRun resource that should be cached.
TektonTaskRunVersion string
// The plural name of the TektonTaskRun resource that should be cached.
TektonTaskRunPlural string
// The port where the server will listen to for providing responses to requests
// about listing the cached resources or for returning specific resources.
Port int
Expand Down Expand Up @@ -90,6 +102,42 @@ func NewConfigFromEnvOrDie(prefix string) Config {
config.AmaltheaSessionPlural = "amaltheasessions"
}

if brGroup, ok := os.LookupEnv(fmt.Sprintf("%sSHIPWRIGHT_BUILDRUN_GROUP", prefix)); ok {
config.ShipwrightBuildRunGroup = brGroup
} else {
config.ShipwrightBuildRunGroup = "shipwright.io"
}

if brVersion, ok := os.LookupEnv(fmt.Sprintf("%sSHIPWRIGHT_BUILDRUN_VERSION", prefix)); ok {
config.ShipwrightBuildRunVersion = brVersion
} else {
config.ShipwrightBuildRunVersion = "v1beta1"
}

if brPlural, ok := os.LookupEnv(fmt.Sprintf("%sSHIPWRIGHT_BUILDRUN_PLURAL", prefix)); ok {
config.ShipwrightBuildRunPlural = brPlural
} else {
config.ShipwrightBuildRunPlural = "buildruns"
}

if trGroup, ok := os.LookupEnv(fmt.Sprintf("%sTEKTON_TASKRUN_GROUP", prefix)); ok {
config.TektonTaskRunGroup = trGroup
} else {
config.TektonTaskRunGroup = "tekton.dev"
}

if trVersion, ok := os.LookupEnv(fmt.Sprintf("%sTEKTON_TASKRUN_VERSION", prefix)); ok {
config.TektonTaskRunVersion = trVersion
} else {
config.TektonTaskRunVersion = "v1"
}

if trPlural, ok := os.LookupEnv(fmt.Sprintf("%sTEKTON_TASKRUN_PLURAL", prefix)); ok {
config.TektonTaskRunPlural = trPlural
} else {
config.TektonTaskRunPlural = "taskruns"
}

if port, ok := os.LookupEnv(fmt.Sprintf("%sPORT", prefix)); ok {
portInt, err := strconv.Atoi(port)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions k8s-watcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
type Server struct {
cachesJS CacheCollection
cachesAS CacheCollection
cachesBR CacheCollection
cachesTR CacheCollection
config Config
router *httprouter.Router
*http.Server
Expand Down Expand Up @@ -49,8 +51,12 @@ func (s *Server) Initialize(ctx context.Context) {
s.Handler = s
go s.cachesJS.run(ctx)
go s.cachesAS.run(ctx)
go s.cachesBR.run(ctx)
go s.cachesTR.run(ctx)
s.cachesJS.synchronize(ctx, s.config.CacheSyncTimeout)
s.cachesAS.synchronize(ctx, s.config.CacheSyncTimeout)
s.cachesBR.synchronize(ctx, s.config.CacheSyncTimeout)
s.cachesTR.synchronize(ctx, s.config.CacheSyncTimeout)
}

func (s *Server) respond(w http.ResponseWriter, req *http.Request, data interface{}, err error) {
Expand All @@ -72,10 +78,14 @@ func (s *Server) respond(w http.ResponseWriter, req *http.Request, data interfac
func NewServerFromConfigOrDie(ctx context.Context, config Config) *Server {
cacheCollectionJS := NewJupyterServerCacheCollectionFromConfigOrDie(ctx, config)
cacheCollectionAS := NewAmaltheaSessionCacheCollectionFromConfigOrDie(ctx, config)
cacheCollectionBR := NewShipwrightBuildRunCacheCollectionFromConfigOrDie(ctx, config)
cacheCollectionTR := NewTektonTaskRunCacheCollectionFromConfigOrDie(ctx, config)
return &Server{
config: config,
cachesJS: *cacheCollectionJS,
cachesAS: *cacheCollectionAS,
cachesBR: *cacheCollectionBR,
cachesTR: *cacheCollectionTR,
router: httprouter.New(),
Server: &http.Server{
Addr: fmt.Sprintf(":%d", config.Port),
Expand Down
28 changes: 28 additions & 0 deletions k8s-watcher/server_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func (s *Server) registerRoutes() {
s.router.HandlerFunc("GET", "/sessions/:serverID", s.asGetOne)
s.router.HandlerFunc("GET", "/users/:userID/sessions", s.asUserID)
s.router.HandlerFunc("GET", "/users/:userID/sessions/:serverID", s.asUserIDServerID)
// Used for the shipwright operator in charge of image build custom resources
s.router.HandlerFunc("GET", "/buildruns", s.brGetAll)
s.router.HandlerFunc("GET", "/buildruns/:buildRunID", s.brGetOne)
// Used for the shipwright operator in charge of image build custom resources
s.router.HandlerFunc("GET", "/taskruns", s.trGetAll)
s.router.HandlerFunc("GET", "/taskruns/:taskRunID", s.trGetOne)
}

func (s *Server) jsGetAll(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -67,6 +73,28 @@ func (s *Server) asUserIDServerID(w http.ResponseWriter, req *http.Request) {
s.respond(w, req, output, err)
}

func (s *Server) brGetAll(w http.ResponseWriter, req *http.Request) {
output, err := s.cachesBR.getAll()
s.respond(w, req, output, err)
}

func (s *Server) brGetOne(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.cachesBR.getByName(params.ByName("buildRunID"))
s.respond(w, req, output, err)
}

func (s *Server) trGetAll(w http.ResponseWriter, req *http.Request) {
output, err := s.cachesTR.getAll()
s.respond(w, req, output, err)
}

func (s *Server) trGetOne(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.cachesTR.getByName(params.ByName("taskRunID"))
s.respond(w, req, output, err)
}

func (s *Server) handleHealthCheck(w http.ResponseWriter, req *http.Request) {
s.respond(w, req, map[string]string{"running": "ok"}, nil)
}
Loading