diff --git a/.gitignore b/.gitignore index ec247bc2..664f86f6 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ _testmain.go *.exe *.test + +.kube-version diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 7298f9c0..afb40fc9 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -27,149 +27,153 @@ }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/api", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/client", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/health", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/labels", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/master", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/service", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/tools", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/util", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/version", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/volume", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/pkg/watch", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler", - "Comment": "v0.2-108-g1853c66", - "Rev": "1853c66ddfdfb7d673371e9a2f4be65c066ac81b" + "Comment": "v0.4.4", + "Rev": "efd443bcd6f005566daa85da0a5f0b633b40d4e3" }, { "ImportPath": "github.com/coreos/go-etcd/etcd", "Comment": "v0.2.0-rc1-120-g23142f6", "Rev": "23142f6773a676cc2cae8dd0cb90b2ea761c853f" }, + { + "ImportPath": "github.com/elazarl/go-bindata-assetfs", + "Rev": "ae4665cf2d188c65764c73fe4af5378acc549510" + }, { "ImportPath": "github.com/fsouza/go-dockerclient", "Comment": "0.2.1-241-g0dbb508", @@ -181,13 +185,17 @@ }, { "ImportPath": "github.com/google/cadvisor/info", - "Comment": "0.2.0-27-g17b0ec5", - "Rev": "17b0ec576bcbeb321c133e4378dee1e500c9850d" + "Comment": "0.4.0", + "Rev": "5a6d06c02600b1e57e55a9d9f71dbac1bfc9fe6c" }, { "ImportPath": "github.com/mesos/mesos-go/mesos", "Rev": "9c6a968b270c50e35f99d6cf649a18521b76c18a" }, + { + "ImportPath": "github.com/skratchdot/open-golang/open", + "Rev": "ba570a111973b539baf23c918213059543b5bb6e" + }, { "ImportPath": "gopkg.in/v1/yaml", "Rev": "1b9791953ba4027efaeb728c7355e542a203be5e" diff --git a/Makefile b/Makefile index 171807ed..62ab5ad1 100644 --- a/Makefile +++ b/Makefile @@ -6,19 +6,26 @@ mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST))) current_dir := $(patsubst %/,%,$(dir $(mkfile_path))) fail := ${MAKE} --no-print-directory --quiet -f $(current_dir)/Makefile error +KUBE_GO_PACKAGE ?= github.com/GoogleCloudPlatform/kubernetes + K8S_CMD := \ - github.com/GoogleCloudPlatform/kubernetes/cmd/controller-manager \ - github.com/GoogleCloudPlatform/kubernetes/cmd/kubecfg \ - github.com/GoogleCloudPlatform/kubernetes/cmd/proxy + ${KUBE_GO_PACKAGE}/cmd/kubecfg \ + ${KUBE_GO_PACKAGE}/cmd/proxy FRAMEWORK_CMD := \ - github.com/mesosphere/kubernetes-mesos/kubernetes-mesos \ + github.com/mesosphere/kubernetes-mesos/controller-manager \ + github.com/mesosphere/kubernetes-mesos/kubernetes-mesos \ github.com/mesosphere/kubernetes-mesos/kubernetes-executor FRAMEWORK_LIB := \ github.com/mesosphere/kubernetes-mesos/scheduler \ github.com/mesosphere/kubernetes-mesos/service \ + github.com/mesosphere/kubernetes-mesos/master \ github.com/mesosphere/kubernetes-mesos/executor \ github.com/mesosphere/kubernetes-mesos/queue +KUBE_GIT_VERSION_FILE := $(current_dir)/.kube-version + +SHELL := /bin/bash + # a list of upstream projects for which we test the availability of patches PATCH_SCRIPT := $(current_dir)/hack/patches/apply.sh @@ -28,7 +35,7 @@ DESTDIR ?= /target # default build tags TAGS ?= -.PHONY: all error require-godep framework require-vendor proxy install info bootstrap require-gopath format test patch +.PHONY: all error require-godep framework require-vendor proxy install info bootstrap require-gopath format test patch version ifneq ($(WITH_MESOS_DIR),) @@ -50,6 +57,9 @@ WITH_MESOS_CGO_FLAGS := \ endif +export SHELL +export KUBE_GO_PACKAGE + all: patch proxy framework error: @@ -62,8 +72,8 @@ require-godep: require-gopath require-gopath: @test -n "$(GOPATH)" || ${fail} MSG="GOPATH undefined, aborting" -proxy: require-godep - go install $(K8S_CMD) +proxy: require-godep $(KUBE_GIT_VERSION_FILE) + go install -ldflags "$$(cat $(KUBE_GIT_VERSION_FILE))" $(K8S_CMD) require-vendor: @@ -96,5 +106,12 @@ bootstrap: require-godep patch: $(PATCH_SCRIPT) $(PATCH_SCRIPT) +version: $(KUBE_GIT_VERSION_FILE) + +$(KUBE_GIT_VERSION_FILE): require-gopath + @(pkg="$(GOPATH)"; cd "$${pkg%%:*}/src/$(KUBE_GO_PACKAGE)" && \ + source $(current_dir)/hack/kube-version.sh && \ + KUBE_GO_PACKAGE=$(KUBE_GO_PACKAGE) kube::version::ldflags) >$@ + $(PATCH_SCRIPT): test -x $@ || chmod +x $@ diff --git a/controller-manager/controller-manager.go b/controller-manager/controller-manager.go new file mode 100644 index 00000000..0eeb65b2 --- /dev/null +++ b/controller-manager/controller-manager.go @@ -0,0 +1,92 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The controller manager is responsible for monitoring replication +// controllers, and creating corresponding pods to achieve the desired +// state. It uses the API to listen for new controllers and to create/delete +// pods. +package main + +// HACK(jdef): copy/pasted from k8s /cmd/controller-manager package, hacked to use +// a modified endpoint-controller + +import ( + "flag" + "net" + "net/http" + "strconv" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" + masterPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/master" + kendpoint "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" + "github.com/golang/glog" + + kmendpoint "github.com/mesosphere/kubernetes-mesos/service" +) + +var ( + port = flag.Int("port", masterPkg.ControllerManagerPort, "The port that the controller-manager's http service runs on") + address = util.IP(net.ParseIP("127.0.0.1")) + useHostPortEndpoints = flag.Bool("host_port_endpoints", true, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") + clientConfig = &client.Config{} +) + +func init() { + flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + client.BindClientConfigFlags(flag.CommandLine, clientConfig) +} + +func main() { + flag.Parse() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + if len(clientConfig.Host) == 0 { + glog.Fatal("usage: controller-manager -master ") + } + + kubeClient, err := client.New(clientConfig) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + + go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) + + endpoints := createEndpointController(kubeClient) + go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + + controllerManager := controller.NewReplicationManager(kubeClient) + controllerManager.Run(10 * time.Second) + + select {} +} + +func createEndpointController(client *client.Client) kmendpoint.EndpointController { + if *useHostPortEndpoints { + glog.V(2).Infof("Creating hostIP:hostPort endpoint controller") + return kmendpoint.NewEndpointController(client) + } + glog.V(2).Infof("Creating podIP:containerPort endpoint controller") + stockEndpointController := kendpoint.NewEndpointController(client) + return stockEndpointController +} diff --git a/examples/guestbook/redis-slave/run.sh b/examples/guestbook/redis-slave/run.sh index 103809cd..dec80951 100755 --- a/examples/guestbook/redis-slave/run.sh +++ b/examples/guestbook/redis-slave/run.sh @@ -1,3 +1,3 @@ #!/bin/bash -redis-server --slaveof $SERVICE_HOST $REDISMASTER_SERVICE_PORT +redis-server --slaveof ${REDISMASTER_SERVICE_HOST:-$SERVICE_HOST} $REDISMASTER_SERVICE_PORT diff --git a/executor/server.go b/executor/server.go index 77080a4a..65a49c32 100644 --- a/executor/server.go +++ b/executor/server.go @@ -42,23 +42,22 @@ import ( // Server is a http.Handler which exposes kubelet functionality over HTTP. type Server struct { host HostInterface - updates chan<- interface{} mux *http.ServeMux namespace string } // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet. -func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address string, port uint, namespace string) { +func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint, enableDebuggingHandlers bool, namespace string) error { glog.Infof("Starting to listen on %s:%d", address, port) - handler := NewServer(host, updates, namespace) + handler := NewServer(host, enableDebuggingHandlers, namespace) s := &http.Server{ - Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)), + Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Handler: &handler, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } - s.ListenAndServe() + return s.ListenAndServe() } // HostInterface contains all the kubelet methods required by the server. @@ -68,18 +67,21 @@ type HostInterface interface { GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetMachineInfo() (*info.MachineInfo, error) GetPodInfo(name, uuid string) (api.PodInfo, error) + GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) } // NewServer initializes and configures a kubelet.Server object to handle HTTP requests. -func NewServer(host HostInterface, updates chan<- interface{}, ns string) Server { +func NewServer(host HostInterface, enableDebuggingHandlers bool, ns string) Server { server := Server{ host: host, - updates: updates, mux: http.NewServeMux(), namespace: ns, } server.InstallDefaultHandlers() + if enableDebuggingHandlers { + server.InstallDebuggingHandlers() + } return server } @@ -89,15 +91,67 @@ func (s *Server) InstallDefaultHandlers() { profile.InstallHandler(s.mux) s.mux.HandleFunc("/podInfo", s.handlePodInfo) s.mux.HandleFunc("/stats/", s.handleStats) - s.mux.HandleFunc("/logs/", s.handleLogs) s.mux.HandleFunc("/spec/", s.handleSpec) } +// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers +func (s *Server) InstallDebuggingHandlers() { + s.mux.HandleFunc("/logs/", s.handleLogs) + s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs) +} + // error serializes an error object into an HTTP response. func (s *Server) error(w http.ResponseWriter, err error) { http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError) } +// handleContainerLogs handles containerLogs request against the Kubelet +func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + u, err := url.ParseRequestURI(req.RequestURI) + if err != nil { + s.error(w, err) + return + } + parts := strings.Split(u.Path, "/") + + var podID, containerName string + if len(parts) == 4 { + podID = parts[2] + containerName = parts[3] + } else { + http.Error(w, "Unexpected path for command running", http.StatusBadRequest) + return + } + + if len(podID) == 0 { + http.Error(w, `{"message": "Missing podID."}`, http.StatusBadRequest) + return + } + if len(containerName) == 0 { + http.Error(w, `{"message": "Missing container name."}`, http.StatusBadRequest) + return + } + + uriValues := u.Query() + follow, _ := strconv.ParseBool(uriValues.Get("follow")) + tail := uriValues.Get("tail") + + podFullName := kubelet.GetPodFullName(&kubelet.Pod{Name: podID, Namespace: s.namespace}) + + fw := FlushWriter{writer: w} + if flusher, ok := w.(http.Flusher); ok { + fw.flusher = flusher + } + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + err = s.host.GetKubeletContainerLogs(podFullName, containerName, tail, follow, &fw, &fw) + if err != nil { + s.error(w, err) + return + } +} + // handlePodInfo handles podInfo requests against the Kubelet. func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { req.Close = true @@ -194,9 +248,11 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { errors.New("pod level status currently unimplemented") case 3: // Backward compatibility without uuid information - stats, err = s.host.GetContainerInfo(components[1], "", components[2], &query) + podFullName := kubelet.GetPodFullName(&kubelet.Pod{Name: components[1], Namespace: s.namespace}) + stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query) case 4: - stats, err = s.host.GetContainerInfo(components[1], components[2], components[2], &query) + podFullName := kubelet.GetPodFullName(&kubelet.Pod{Name: components[1], Namespace: s.namespace}) + stats, err = s.host.GetContainerInfo(podFullName, components[2], components[2], &query) default: http.Error(w, "unknown resource.", http.StatusNotFound) return @@ -220,3 +276,23 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { w.Write(data) return } + +// HACK(jdef): FlushWriter taken from k8s /pkg/kubelet/handlers.go + +// FlushWriter provides wrapper for responseWriter with HTTP streaming capabilities +type FlushWriter struct { + flusher http.Flusher + writer io.Writer +} + +// Write is a FlushWriter implementation of the io.Writer that sends any buffered data to the client. +func (fw *FlushWriter) Write(p []byte) (n int, err error) { + n, err = fw.writer.Write(p) + if err != nil { + return + } + if fw.flusher != nil { + fw.flusher.Flush() + } + return +} diff --git a/hack/kube-version.sh b/hack/kube-version.sh new file mode 100644 index 00000000..20808e55 --- /dev/null +++ b/hack/kube-version.sh @@ -0,0 +1,128 @@ +#!/bin/bash + +# source: https://raw.githubusercontent.com/GoogleCloudPlatform/kubernetes/v0.5.3/hack/lib/version.sh + +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Version management helpers. These functions help to set, save and load the +# following variables: +# +# KUBE_GIT_COMMIT - The git commit id corresponding to this +# source code. +# KUBE_GIT_TREE_STATE - "clean" indicates no changes since the git commit id +# "dirty" indicates source code changes after the git commit id +# KUBE_GIT_VERSION - "vX.Y" used to indicate the last release version. +# KUBE_GIT_MAJOR - The major part of the version +# KUBE_GIT_MINOR - The minor component of the version + +# Grovels through git to set a set of env variables. +# +# If KUBE_GIT_VERSION_FILE, this function will load from that file instead of +# querying git. +kube::version::get_version_vars() { + if [[ -n ${KUBE_GIT_VERSION_FILE-} ]]; then + kube::version::load_version_vars "${KUBE_GIT_VERSION_FILE}" + return + fi + + local git=(git --work-tree "${KUBE_ROOT}") + + if [[ -n ${KUBE_GIT_COMMIT-} ]] || KUBE_GIT_COMMIT=$("${git[@]}" rev-parse "HEAD^{commit}" 2>/dev/null); then + if [[ -z ${KUBE_GIT_TREE_STATE-} ]]; then + # Check if the tree is dirty. default to dirty + if git_status=$("${git[@]}" status --porcelain 2>/dev/null) && [[ -z ${git_status} ]]; then + KUBE_GIT_TREE_STATE="clean" + else + KUBE_GIT_TREE_STATE="dirty" + fi + fi + + # Use git describe to find the version based on annotated tags. + if [[ -n ${KUBE_GIT_VERSION-} ]] || KUBE_GIT_VERSION=$("${git[@]}" describe --tags --abbrev=14 "${KUBE_GIT_COMMIT}^{commit}" 2>/dev/null); then + if [[ "${KUBE_GIT_TREE_STATE}" == "dirty" ]]; then + # git describe --dirty only considers changes to existing files, but + # that is problematic since new untracked .go files affect the build, + # so use our idea of "dirty" from git status instead. + KUBE_GIT_VERSION+="-dirty" + fi + + # Try to match the "git describe" output to a regex to try to extract + # the "major" and "minor" versions and whether this is the exact tagged + # version or whether the tree is between two tagged versions. + if [[ "${KUBE_GIT_VERSION}" =~ ^v([0-9]+)\.([0-9]+)([.-].*)?$ ]]; then + KUBE_GIT_MAJOR=${BASH_REMATCH[1]} + KUBE_GIT_MINOR=${BASH_REMATCH[2]} + if [[ -n "${BASH_REMATCH[3]}" ]]; then + KUBE_GIT_MINOR+="+" + fi + fi + fi + fi +} + +# Saves the environment flags to $1 +kube::version::save_version_vars() { + local version_file=${1-} + [[ -n ${version_file} ]] || { + echo "!!! Internal error. No file specified in kube::version::save_version_vars" + return 1 + } + + cat <"${version_file}" +KUBE_GIT_COMMIT='${KUBE_GIT_COMMIT-}' +KUBE_GIT_TREE_STATE='${KUBE_GIT_TREE_STATE-}' +KUBE_GIT_VERSION='${KUBE_GIT_VERSION-}' +KUBE_GIT_MAJOR='${KUBE_GIT_MAJOR-}' +KUBE_GIT_MINOR='${KUBE_GIT_MINOR-}' +EOF +} + +# Loads up the version variables from file $1 +kube::version::load_version_vars() { + local version_file=${1-} + [[ -n ${version_file} ]] || { + echo "!!! Internal error. No file specified in kube::version::load_version_vars" + return 1 + } + + source "${version_file}" +} + +# Prints the value that needs to be passed to the -ldflags parameter of go build +# in order to set the Kubernetes based on the git tree status. +kube::version::ldflags() { + kube::version::get_version_vars + + local -a ldflags=() + if [[ -n ${KUBE_GIT_COMMIT-} ]]; then + ldflags+=(-X "${KUBE_GO_PACKAGE}/pkg/version.gitCommit" "${KUBE_GIT_COMMIT}") + ldflags+=(-X "${KUBE_GO_PACKAGE}/pkg/version.gitTreeState" "${KUBE_GIT_TREE_STATE}") + fi + + if [[ -n ${KUBE_GIT_VERSION-} ]]; then + ldflags+=(-X "${KUBE_GO_PACKAGE}/pkg/version.gitVersion" "${KUBE_GIT_VERSION}") + fi + + if [[ -n ${KUBE_GIT_MAJOR-} && -n ${KUBE_GIT_MINOR-} ]]; then + ldflags+=( + -X "${KUBE_GO_PACKAGE}/pkg/version.gitMajor" "${KUBE_GIT_MAJOR}" + -X "${KUBE_GO_PACKAGE}/pkg/version.gitMinor" "${KUBE_GIT_MINOR}" + ) + fi + + # The -ldflags parameter takes a single string, so join the output. + echo "${ldflags[*]-}" +} diff --git a/hack/patches/k8s---issue77.patch b/hack/patches/k8s---issue77.patch index 2da147f7..29439a5a 100644 --- a/hack/patches/k8s---issue77.patch +++ b/hack/patches/k8s---issue77.patch @@ -1,8 +1,28 @@ diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go -index b42e2ce..b0f7808 100644 +index 2f1bb12..46464ce 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go -@@ -215,6 +215,9 @@ func GetDockerPodInfo(client DockerInterface, podFullName, uuid string) (api.Pod +@@ -208,6 +208,9 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc + } + for i := range containers { + container := &containers[i] ++ if len(container.Names) == 0 { ++ continue ++ } + // Skip containers that we didn't create to allow users to manually + // spin up their own containers if they want. + // TODO(dchen1107): Remove the old separator "--" by end of Oct +@@ -230,6 +233,9 @@ func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullNam + return nil, err + } + for _, dockerContainer := range containers { ++ if len(dockerContainer.Names) == 0 { ++ continue ++ } + dockerPodName, dockerUUID, dockerContainerName, _ := ParseDockerName(dockerContainer.Names[0]) + if dockerPodName != podFullName { + continue +@@ -340,6 +346,9 @@ func GetDockerPodInfo(client DockerInterface, manifest api.ContainerManifest, po } for _, value := range containers { diff --git a/kubernetes-executor/main.go b/kubernetes-executor/main.go index 135096e0..c0927367 100644 --- a/kubernetes-executor/main.go +++ b/kubernetes-executor/main.go @@ -2,20 +2,25 @@ package main import ( "bufio" - "encoding/json" "flag" "io" - "io/ioutil" + "math/rand" + "net" + "net/http" "os" "os/exec" + "path" "strings" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" + "github.com/GoogleCloudPlatform/kubernetes/pkg/health" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" log "github.com/golang/glog" @@ -23,22 +28,35 @@ import ( "github.com/mesosphere/kubernetes-mesos/executor" ) -var ( - syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") - hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") - dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") - etcdServerList util.StringList - allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode.") +const ( + POD_NS string = "mesos" // k8s pod namespace + defaultRootDir = "/var/lib/kubelet" ) -const ( - POD_NS string = "mesos" // k8s pod namespace +var ( + syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") + address = util.IP(net.ParseIP("0.0.0.0")) + port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") // TODO(jdef): use kmmaster.KubeletExecutorPort + hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") + networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.") + dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") + etcdServerList util.StringList + etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers") + rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") + allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") + registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") + registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") + enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands. Default true.") + minimumGCAge = flag.Duration("minimum_container_ttl_duration", 0, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") + maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") ) -func main() { +func init() { flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated") + flag.Var(&address, "address", "The IP address for the info and proxy servers to serve on. Default to 0.0.0.0.") +} - flag.Parse() +func getDockerEndpoint() string { var endpoint string if len(*dockerEndpoint) > 0 { endpoint = *dockerEndpoint @@ -48,35 +66,76 @@ func main() { endpoint = "unix:///var/run/docker.sock" } log.Infof("Connecting to docker on %s", endpoint) - dockerClient, err := docker.NewClient(endpoint) - if err != nil { - log.Fatal("Couldn't connnect to docker.") - } - hostname := *hostnameOverride - if hostname == "" { + return endpoint +} + +func getHostname() string { + hostname := []byte(*hostnameOverride) + if string(hostname) == "" { // Note: We use exec here instead of os.Hostname() because we // want the FQDN, and this is the easiest way to get it. - fqdnHostname, hostnameErr := exec.Command("hostname", "-f").Output() + fqdn, err := exec.Command("hostname", "-f").Output() if err != nil { - log.Fatalf("Couldn't determine hostname: %v", hostnameErr) + log.Fatalf("Couldn't determine hostname: %v", err) } + hostname = fqdn + } + return strings.TrimSpace(string(hostname)) +} - // hostname(1) returns a terminating newline we need to strip. - hostname = string(fqdnHostname) - if len(hostname) > 0 { - hostname = hostname[0 : len(hostname)-1] - } +func main() { + + flag.Parse() + util.InitLogs() // TODO(jdef) figure out where this actually sends logs by default + defer util.FlushLogs() + rand.Seed(time.Now().UTC().UnixNano()) + + verflag.PrintAndExitIfRequested() + + etcd.SetLogger(util.NewLogger("etcd ")) + + capabilities.Initialize(capabilities.Capabilities{ + AllowPrivileged: *allowPrivileged, + }) + + dockerClient, err := docker.NewClient(getDockerEndpoint()) + if err != nil { + log.Fatal("Couldn't connnect to docker.") } + hostname := getHostname() + + if *rootDirectory == "" { + log.Fatal("Invalid root directory path.") + } + *rootDirectory = path.Clean(*rootDirectory) + if err := os.MkdirAll(*rootDirectory, 0750); err != nil { + log.Warningf("Error creating root directory: %v", err) + } + + // source of all configuration cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) - var etcdClient tools.EtcdClient + + // k8sm: no other pod configuration sources supported in this hybrid kubelet-executor + + // define etcd config source and initialize etcd client + var etcdClient *etcd.Client if len(etcdServerList) > 0 { - log.Infof("Connecting to etcd at %v", etcdServerList) etcdClient = etcd.NewClient(etcdServerList) + } else if *etcdConfigFile != "" { + var err error + etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile) + if err != nil { + log.Fatalf("Error with etcd config file: %v", err) + } } - // Hack: Destroy existing k8s containers for now - we don't know how to reconcile yet. + if etcdClient != nil { + log.Infof("Connected to etcd at %v", etcdClient.GetCluster()) + } + + // TODO(???): Destroy existing k8s containers for now - we don't know how to reconcile yet. containers, err := dockerClient.ListContainers(docker.ListContainersOptions{All: true}) if err == nil { for _, container := range containers { @@ -94,7 +153,49 @@ func main() { } } - kl := kubelet.NewMainKubelet(hostname, dockerClient, nil, etcdClient, "/", *syncFrequency, *allowPrivileged) + // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop + // up into "per source" synchronizations + + kl := kubelet.NewMainKubelet( + hostname, + dockerClient, + etcdClient, + *rootDirectory, + *networkContainerImage, + *syncFrequency, + float32(*registryPullQPS), + *registryBurst, + *minimumGCAge, + *maxContainerCount) + go func() { + util.Forever(func() { + err := kl.GarbageCollectContainers() + if err != nil { + log.Errorf("Garbage collect failed: %v", err) + } + }, time.Minute*1) + }() + + /* TODO(jdef): enable cadvisor integration + go func() { + defer util.HandleCrash() + // TODO(k8s): Monitor this connection, reconnect if needed? + log.V(1).Infof("Trying to create cadvisor client.") + cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:4194") + if err != nil { + log.Errorf("Error on creating cadvisor client: %v", err) + return + } + log.V(1).Infof("Successfully created cadvisor client.") + kl.SetCadvisorClient(cadvisorClient) + }() + */ + + // TODO(k8s): These should probably become more plugin-ish: register a factory func + // in each checker's init(), iterate those here. + health.AddHealthChecker(health.NewExecHealthChecker(kl)) + health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{})) + health.AddHealthChecker(&health.TCPHealthChecker{}) driver := new(mesos.MesosExecutorDriver) kubeletExecutor := executor.New(driver, kl, cfg.Channel(POD_NS), POD_NS) @@ -107,23 +208,52 @@ func main() { log.V(2).Infof("Executor driver is running!") driver.Start() - log.V(2).Infof("Starting kubelet server...") + // TODO(who?) Recover running containers from check pointed pod list. + // start the kubelet + go util.Forever(func() { kl.Run(cfg.Updates()) }, 0) + + log.Infof("Starting kubelet server...") go util.Forever(func() { - // TODO(nnielsen): Don't hardwire port, but use port from - // resource offer. - executor.ListenAndServeKubeletServer(kl, cfg.Channel("http"), hostname, 10250, POD_NS) - }, 1*time.Second) + // TODO(nnielsen): Don't hardwire port, but use port from resource offer. + log.Error(executor.ListenAndServeKubeletServer(kl, net.IP(address), *port, *enableDebuggingHandlers, POD_NS)) + }, 0) + + go runProxyService() + + /* + // TODO(nnielsen): Factor check-pointing into subsystem. + dat, err := ioutil.ReadFile("/tmp/kubernetes-pods") + if err == nil { + var target []api.PodInfo + err := json.Unmarshal(dat, &target) + if err == nil { + log.Infof("Checkpoint: '%v'", target) + } + } + */ - log.V(2).Infof("Starting proxy process...") - var cmd *exec.Cmd + driver.Join() +} + +// this function blocks as long as the proxy service is running; intended to be +// executed asynchronously. +func runProxyService() { + // TODO(jdef): would be nice if we could run the proxy via an in-memory + // kubelet config source (in case it crashes, kubelet would restart it); + // not sure that k8s supports host-networking space for pods + log.Infof("Starting proxy process...") + + args := []string{"-bind_address=" + address.String(), "-logtostderr=true", "-v=1"} if len(etcdServerList) > 0 { etcdServerArguments := strings.Join(etcdServerList, ",") - cmd = exec.Command("./proxy", "-etcd_servers="+etcdServerArguments, "-logtostderr=true", "-v=1") - } else { - cmd = exec.Command("./proxy", "-logtostderr=true", "-v=1") + args = append(args, "-etcd_servers="+etcdServerArguments) + } else if *etcdConfigFile != "" { + args = append(args, "-etcd_config="+*etcdConfigFile) } - _, err = cmd.StdoutPipe() + //TODO(jdef): don't hardcode name of the proxy executable here + cmd := exec.Command("./proxy", args...) + _, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) } @@ -142,25 +272,9 @@ func main() { if err := cmd.Start(); err != nil { log.Fatal(err) } - go io.Copy(writer, proxylogs) - - // TODO(nnielsen): Factor check-pointing into subsystem. - dat, err := ioutil.ReadFile("/tmp/kubernetes-pods") - if err == nil { - var target []api.PodInfo - err := json.Unmarshal(dat, &target) - if err == nil { - log.Infof("Checkpoint: '%v'", target) - } - } - // Recover running containers from check pointed pod list. - - go util.Forever(func() { kl.Run(cfg.Updates()) }, 0) - - driver.Join() - - log.V(2).Infof("Cleaning up proxy process...") - - // Clean up proxy process - cmd.Process.Kill() + defer func() { + log.V(2).Infof("Cleaning up proxy process...") + cmd.Process.Kill() + }() + io.Copy(writer, proxylogs) } diff --git a/kubernetes-mesos/main.go b/kubernetes-mesos/main.go index 9e8b2a8d..7ffeb564 100644 --- a/kubernetes-mesos/main.go +++ b/kubernetes-mesos/main.go @@ -29,141 +29,172 @@ import ( "code.google.com/p/goprotobuf/proto" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authenticator/bearertoken" + "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authenticator/tokenfile" + "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - kscheduler "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" - kendpoint "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/ui" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" plugin "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" - goetcd "github.com/coreos/go-etcd/etcd" + "github.com/coreos/go-etcd/etcd" log "github.com/golang/glog" "github.com/mesos/mesos-go/mesos" + kmmaster "github.com/mesosphere/kubernetes-mesos/master" _ "github.com/mesosphere/kubernetes-mesos/profile" kmscheduler "github.com/mesosphere/kubernetes-mesos/scheduler" - kmendpoint "github.com/mesosphere/kubernetes-mesos/service" ) var ( - port = flag.Uint("port", 8888, "The port to listen on. Default 8888.") - address = flag.String("address", "127.0.0.1", "The address on the local server to listen to. Default 127.0.0.1") - apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'") - mesosMaster = flag.String("mesos_master", "localhost:5050", "Location of leading Mesos master") - executorPath = flag.String("executor_path", "", "Location of the kubernetes executor executable") - proxyPath = flag.String("proxy_path", "", "Location of the kubernetes proxy executable") - minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.") - useHostPortEndpoints = flag.Bool("host_port_endpoints", true, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") - etcdServerList util.StringList + port = flag.Uint("port", 8888, "The port to listen on. Default 8888.") + address = util.IP(net.ParseIP("127.0.0.1")) + apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'") + storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") + minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions. Default 10250.") + healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") + minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.") + eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") + tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.") + etcdServerList util.StringList + etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.") + corsAllowedOriginList util.StringList + allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers. Default false.") + enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection. Default true.") + mesosMaster = flag.String("mesos_master", "localhost:5050", "Location of leading Mesos master. Default localhost:5050.") + executorPath = flag.String("executor_path", "", "Location of the kubernetes executor executable") + proxyPath = flag.String("proxy_path", "", "Location of the kubernetes proxy executable") ) const ( - artifactPort = 9000 - cachePeriod = 10 * time.Second - syncPeriod = 30 * time.Second - httpReadTimeout = 10 * time.Second - httpWriteTimeout = 10 * time.Second + artifactPort = 9000 // port of the service that services mesos artifacts (executor); TODO(jdef): make this configurable + httpReadTimeout = 10 * time.Second // k8s api server config: maximum duration before timing out read of the request + httpWriteTimeout = 10 * time.Second // k8s api server config: maximum duration before timing out write of the response ) func init() { + flag.Var(&address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces). Default 127.0.0.1.") flag.Var(&etcdServerList, "etcd_servers", "Servers for the etcd (http://ip:port), comma separated") + flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") } -type kubernetesMaster struct { - podRegistry pod.Registry - controllerRegistry controller.Registry - serviceRegistry service.Registry - minionRegistry minion.Registry - bindingRegistry binding.Registry - storage map[string]apiserver.RESTStorage - client *client.Client - scheduler *kmscheduler.KubernetesScheduler -} - -// Copied from cmd/apiserver.go -func main() { - flag.Parse() - util.InitLogs() - defer util.FlushLogs() - - if len(etcdServerList) <= 0 { - log.Fatal("No etcd severs specified!") - } - - serveExecutorArtifact := func(path string) string { - serveFile := func(pattern string, filename string) { - http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { - http.ServeFile(w, r, filename) - }) - } - - // Create base path (http://foobar:5000/) - pathSplit := strings.Split(path, "/") - var base string - if len(pathSplit) > 0 { - base = pathSplit[len(pathSplit)-1] - } else { - base = path +func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (helper tools.EtcdHelper, err error) { + var client tools.EtcdGetSet + if etcdConfigFile != "" { + client, err = etcd.NewClientFromFile(etcdConfigFile) + if err != nil { + return helper, err } - serveFile("/"+base, path) + } else { + client = etcd.NewClient(etcdServerList) + } - hostURI := fmt.Sprintf("http://%s:%d/%s", *address, artifactPort, base) - log.V(2).Infof("Hosting artifact '%s' at '%s'", path, hostURI) + return master.NewEtcdHelper(client, *storageVersion) +} - return hostURI +// returns (downloadURI, basename(path)) +func serveExecutorArtifact(path string) (*string, string) { + serveFile := func(pattern string, filename string) { + http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, filename) + }) } - executorURI := serveExecutorArtifact(*executorPath) - proxyURI := serveExecutorArtifact(*proxyPath) + // Create base path (http://foobar:5000/) + pathSplit := strings.Split(path, "/") + var base string + if len(pathSplit) > 0 { + base = pathSplit[len(pathSplit)-1] + } else { + base = path + } + serveFile("/"+base, path) - go http.ListenAndServe(fmt.Sprintf("%s:%d", *address, artifactPort), nil) + hostURI := fmt.Sprintf("http://%s:%d/%s", address.String(), artifactPort, base) + log.V(2).Infof("Hosting artifact '%s' at '%s'", path, hostURI) - podInfoGetter := &client.HTTPPodInfoGetter{ - Client: http.DefaultClient, - Port: *minionPort, - } + return &hostURI, base +} - client, err := client.New("http://"+net.JoinHostPort(*address, strconv.Itoa(int(*port))), nil) - if err != nil { - log.Fatal(err) - } +func prepareExecutorInfo() *mesos.ExecutorInfo { + executorUris := []*mesos.CommandInfo_URI{} + uri, _ := serveExecutorArtifact(*proxyPath) + executorUris = append(executorUris, &mesos.CommandInfo_URI{Value: uri}) + uri, executorCmd := serveExecutorArtifact(*executorPath) + executorUris = append(executorUris, &mesos.CommandInfo_URI{Value: uri}) - executorCommand := "./kubernetes-executor -v=2" + //TODO(jdef): provide some way (env var?) for user's to customize executor config + //TODO(jdef): set -hostname_override and -address to 127.0.0.1 if `address` is 127.0.0.1 + executorCommand := fmt.Sprintf("./%s -v=2 -hostname_override=0.0.0.0", executorCmd) if len(etcdServerList) > 0 { etcdServerArguments := strings.Join(etcdServerList, ",") - executorCommand = "./kubernetes-executor -v=2 -hostname_override=0.0.0.0 -etcd_servers=" + etcdServerArguments + executorCommand = fmt.Sprintf("%s -etcd_servers=%s", executorCommand, etcdServerArguments) + } else { + uri, basename := serveExecutorArtifact(*etcdConfigFile) + executorUris = append(executorUris, &mesos.CommandInfo_URI{Value: uri}) + executorCommand = fmt.Sprintf("%s -etcd_config=./%s", executorCommand, basename) } + go http.ListenAndServe(fmt.Sprintf("%s:%d", address.String(), artifactPort), nil) + log.V(2).Info("Serving executor artifacts...") + // Create mesos scheduler driver. - executor := &mesos.ExecutorInfo{ + return &mesos.ExecutorInfo{ ExecutorId: &mesos.ExecutorID{Value: proto.String("KubeleteExecutorID")}, Command: &mesos.CommandInfo{ Value: proto.String(executorCommand), - Uris: []*mesos.CommandInfo_URI{ - {Value: &executorURI}, - {Value: &proxyURI}, - }, + Uris: executorUris, }, Name: proto.String("Kubelet Executor"), Source: proto.String("kubernetes"), } +} + +// Copied from cmd/apiserver.go +func main() { + flag.Parse() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) { + log.Fatalf("specify either -etcd_servers or -etcd_config") + } + + capabilities.Initialize(capabilities.Capabilities{ + AllowPrivileged: *allowPrivileged, + }) - etcdClient := goetcd.NewClient(etcdServerList) - helper := tools.EtcdHelper{ - etcdClient, - runtime.DefaultCodec, - runtime.DefaultResourceVersioner, + // TODO(nnielsen): Using default pod info getter until + // MesosPodInfoGetter supports network containers. + // podInfoGetter := MesosPodInfoGetter.New(mesosPodScheduler) + podInfoGetter := &client.HTTPPodInfoGetter{ + Client: http.DefaultClient, + Port: *minionPort, } - serviceRegistry := etcd.NewRegistry(etcdClient) - mesosPodScheduler := kmscheduler.New(executor, kmscheduler.FCFSScheduleFunc, client, helper, serviceRegistry) + // TODO(k8s): expose same flags as client.BindClientConfigFlags but for a server + clientConfig := &client.Config{ + Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))), + Version: *storageVersion, + } + client, err := client.New(clientConfig) + if err != nil { + log.Fatalf("Invalid server address: %v", err) + } + + helper, err := newEtcd(*etcdConfigFile, etcdServerList) + if err != nil { + log.Fatalf("Invalid storage version or misconfigured etcd: %v", err) + } + + // Create mesos scheduler driver. + executor := prepareExecutorInfo() + mesosPodScheduler := kmscheduler.New(executor, kmscheduler.FCFSScheduleFunc, client, helper) driver := &mesos.MesosSchedulerDriver{ Master: *mesosMaster, Framework: mesos.FrameworkInfo{ @@ -172,89 +203,66 @@ func main() { }, Scheduler: mesosPodScheduler, } - mesosPodScheduler.Driver = driver + m := kmmaster.New(&kmmaster.Config{ + Client: client, + Cloud: &kmscheduler.MesosCloud{mesosPodScheduler}, + EtcdHelper: helper, + HealthCheckMinions: *healthCheckMinions, + MinionCacheTTL: *minionCacheTTL, + EventTTL: *eventTTL, + PodInfoGetter: podInfoGetter, + PRFactory: func() pod.Registry { return mesosPodScheduler }, + }) + mesosPodScheduler.Init(driver, m.GetManifestFactory()) driver.Init() defer driver.Destroy() - - mesosPodScheduler.Init() go driver.Start() - log.V(2).Info("Serving executor artifacts...") - // TODO(nnielsen): Using default pod info getter until - // MesosPodInfoGetter supports network containers. - - // podInfoGetter := MesosPodInfoGetter.New(mesosPodScheduler) + //TODO(jdef): upstream, this runs as a separate process... but not in this distro yet + plugin.New(mesosPodScheduler.NewPluginConfig()).Run() - m := newKubernetesMaster(mesosPodScheduler, &master.Config{ - Client: client, - Cloud: &kmscheduler.MesosCloud{mesosPodScheduler}, - Minions: nil, - PodInfoGetter: podInfoGetter, - EtcdServers: etcdServerList, - }, etcdClient, serviceRegistry) - log.Fatal(m.run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix, helper.Codec)) + log.Fatal(run(m, net.JoinHostPort(address.String(), strconv.Itoa(int(*port))))) } -func newKubernetesMaster(scheduler *kmscheduler.KubernetesScheduler, c *master.Config, etcdClient tools.EtcdClient, sr service.Registry) *kubernetesMaster { - var m *kubernetesMaster +// Run begins serving the Kubernetes API. It never returns. +func run(m *kmmaster.Master, myAddress string) error { + mux := http.NewServeMux() + apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(mux, *apiPrefix+"/v1beta1") + apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(mux, *apiPrefix+"/v1beta2") + apiserver.InstallSupport(mux) + if *enableLogsSupport { + apiserver.InstallLogsSupport(mux) + } + ui.InstallSupport(mux) - minionRegistry := kmscheduler.NewCloudRegistry(c.Cloud) + handler := http.Handler(mux) - m = &kubernetesMaster{ - podRegistry: scheduler, - controllerRegistry: etcd.NewRegistry(etcdClient), - serviceRegistry: sr, - minionRegistry: minionRegistry, - bindingRegistry: etcd.NewRegistry(etcdClient), - client: c.Client, - scheduler: scheduler, + if len(corsAllowedOriginList) > 0 { + allowedOriginRegexps, err := util.CompileRegexps(corsAllowedOriginList) + if err != nil { + log.Fatalf("Invalid CORS allowed origin, --cors_allowed_origins flag was set to %v - %v", strings.Join(corsAllowedOriginList, ","), err) + } + handler = apiserver.CORS(handler, allowedOriginRegexps, nil, nil, "true") } - m.init(scheduler, c.Cloud, c.PodInfoGetter) - return m -} - -func (m *kubernetesMaster) init(scheduler kscheduler.Scheduler, cloud cloudprovider.Interface, podInfoGetter client.PodInfoGetter) { - podCache := master.NewPodCache(podInfoGetter, m.podRegistry) - go util.Forever(func() { podCache.UpdateAllContainers() }, cachePeriod) - m.storage = map[string]apiserver.RESTStorage{ - "pods": pod.NewREST(&pod.RESTConfig{ - CloudProvider: cloud, - PodCache: podCache, - PodInfoGetter: podInfoGetter, - Registry: m.podRegistry, - }), - "replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry), - "services": service.NewREST(m.serviceRegistry, cloud, m.minionRegistry), - "minions": minion.NewREST(m.minionRegistry), - // TODO: should appear only in scheduler API group. - "bindings": binding.NewREST(m.bindingRegistry), + if len(*tokenAuthFile) != 0 { + auth, err := tokenfile.New(*tokenAuthFile) + if err != nil { + log.Fatalf("Unable to load the token authentication file '%s': %v", *tokenAuthFile, err) + } + userContexts := handlers.NewUserRequestContext() + handler = handlers.NewRequestAuthenticator(userContexts, bearertoken.New(auth), handlers.Unauthorized, handler) } -} -// Run begins serving the Kubernetes API. It never returns. -func (m *kubernetesMaster) run(myAddress, apiPrefix string, codec runtime.Codec) error { - endpoints := m.createEndpointController() - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, syncPeriod) - plugin.New(m.scheduler.NewPluginConfig()).Run() + handler = apiserver.RecoverPanics(handler) s := &http.Server{ Addr: myAddress, - Handler: apiserver.Handle(m.storage, codec, apiPrefix), + Handler: handler, ReadTimeout: httpReadTimeout, WriteTimeout: httpWriteTimeout, MaxHeaderBytes: 1 << 20, } return s.ListenAndServe() } - -func (m *kubernetesMaster) createEndpointController() kmendpoint.EndpointController { - if *useHostPortEndpoints { - log.V(2).Infof("Creating hostIP:hostPort endpoint controller") - return kmendpoint.NewEndpointController(m.serviceRegistry, m.client) - } - log.V(2).Infof("Creating podIP:containerPort endpoint controller") - stockEndpointController := kendpoint.NewEndpointController(m.serviceRegistry, m.client) - return stockEndpointController -} diff --git a/master/master.go b/master/master.go new file mode 100644 index 00000000..54a20858 --- /dev/null +++ b/master/master.go @@ -0,0 +1,179 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +// HACK: copied from upstream /pkg/master/master.go, then hacked for our purposes + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + kmaster "github.com/GoogleCloudPlatform/kubernetes/pkg/master" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + kmscheduler "github.com/mesosphere/kubernetes-mesos/scheduler" +) + +const ( + cachePeriod = 10 * time.Second +) + +type PodRegistryFactory func() pod.Registry + +// Config is a structure used to configure a Master. +type Config struct { + Client *client.Client + Cloud cloudprovider.Interface + EtcdHelper tools.EtcdHelper + HealthCheckMinions bool + MinionCacheTTL time.Duration + EventTTL time.Duration + MinionRegexp string + PodInfoGetter client.PodInfoGetter + NodeResources api.NodeResources + PRFactory PodRegistryFactory +} + +// Master contains state for a Kubernetes cluster master/api server. +type Master struct { + podRegistry pod.Registry + controllerRegistry controller.Registry + serviceRegistry service.Registry + endpointRegistry endpoint.Registry + minionRegistry minion.Registry + bindingRegistry binding.Registry + eventRegistry generic.Registry + storage map[string]apiserver.RESTStorage + client *client.Client + manifestFactory pod.ManifestFactory +} + +// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version +// is incorrect. +func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHelper, err error) { + if version == "" { + version = latest.Version + } + versionInterfaces, err := latest.InterfacesFor(version) + if err != nil { + return helper, err + } + return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.ResourceVersioner}}, nil +} + +// New returns a new instance of Master connected to the given etcd server. +func New(c *Config) *Master { + minionRegistry := makeMinionRegistry(c) + serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) + manifestFactory := &pod.BasicManifestFactory{ + ServiceRegistry: serviceRegistry, + } + + m := &Master{ + podRegistry: c.PRFactory(), + controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil), + serviceRegistry: serviceRegistry, + endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), + bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), + minionRegistry: minionRegistry, + client: c.Client, + manifestFactory: manifestFactory, + } + m.init(c.Cloud, c.PodInfoGetter) + return m +} + +func makeMinionRegistry(c *Config) minion.Registry { + minionRegistry := kmscheduler.NewCloudRegistry(c.Cloud) + + /* TODO(jdef): kubelet-executors may not be running on all slaves, need to be sure which slaves + are running such before we start doing health checks, etc... + + if c.HealthCheckMinions { + minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) + } + if c.MinionCacheTTL > 0 { + cachingMinionRegistry, err := minion.NewCachingRegistry(minionRegistry, c.MinionCacheTTL) + if err != nil { + glog.Errorf("Failed to initialize caching layer, ignoring cache.") + } else { + minionRegistry = cachingMinionRegistry + } + } + */ + return minionRegistry +} + +func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInfoGetter) { + podCache := kmaster.NewPodCache(podInfoGetter, m.podRegistry) + go util.Forever(func() { podCache.UpdateAllContainers() }, cachePeriod) + + m.storage = map[string]apiserver.RESTStorage{ + "pods": pod.NewREST(&pod.RESTConfig{ + CloudProvider: cloud, + PodCache: podCache, + PodInfoGetter: podInfoGetter, + Registry: m.podRegistry, + Minions: m.client, + }), + "replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry), + "services": service.NewREST(m.serviceRegistry, cloud, m.minionRegistry), + "endpoints": endpoint.NewREST(m.endpointRegistry), + "minions": minion.NewREST(m.minionRegistry), + "events": event.NewREST(m.eventRegistry), + "bindings": binding.NewREST(m.bindingRegistry), + } +} + +func (m *Master) GetManifestFactory() pod.ManifestFactory { + return m.manifestFactory +} + +// API_v1beta1 returns the resources and codec for API version v1beta1. +func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) { + storage := make(map[string]apiserver.RESTStorage) + for k, v := range m.storage { + storage[k] = v + } + return storage, v1beta1.Codec, "/api/v1beta1", latest.SelfLinker +} + +// API_v1beta2 returns the resources and codec for API version v1beta2. +func (m *Master) API_v1beta2() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) { + storage := make(map[string]apiserver.RESTStorage) + for k, v := range m.storage { + storage[k] = v + } + return storage, v1beta2.Codec, "/api/v1beta2", latest.SelfLinker +} diff --git a/scheduler/cloud.go b/scheduler/cloud.go index 77c672e3..e529eea5 100644 --- a/scheduler/cloud.go +++ b/scheduler/cloud.go @@ -3,6 +3,7 @@ package scheduler import "errors" import "net" import log "github.com/golang/glog" +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" import cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" var ( @@ -32,7 +33,8 @@ func (c *MesosCloud) Zones() (cloud.Zones, bool) { return nil, false } -// implementation of cloud.Instances +// implementation of cloud.Instances. +// IPAddress returns an IP address of the specified instance. func (c *MesosCloud) IPAddress(name string) (net.IP, error) { if name == "" { return nil, noHostNameSpecified @@ -48,7 +50,8 @@ func (c *MesosCloud) IPAddress(name string) (net.IP, error) { } } -// implementation of cloud.Instances; does not implement any filtering +// implementation of cloud.Instances; does not implement any filtering. +// List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn). func (c *MesosCloud) List(filter string) ([]string, error) { c.RLock() defer c.RUnlock() @@ -59,3 +62,9 @@ func (c *MesosCloud) List(filter string) ([]string, error) { } return slaveHosts, nil } + +// implementation of cloud.Instances; always returns nil,nil. +// GetNodeResources gets the resources for a particular node +func (c *MesosCloud) GetNodeResources(name string) (*api.NodeResources, error) { + return nil, nil +} diff --git a/scheduler/cloud_registry.go b/scheduler/cloud_registry.go index 489f1f32..168a0cc5 100644 --- a/scheduler/cloud_registry.go +++ b/scheduler/cloud_registry.go @@ -1,7 +1,9 @@ package scheduler import "fmt" +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" import "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" +import "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" // implements the minion.Registry interface type CloudRegistry struct { @@ -12,31 +14,60 @@ func NewCloudRegistry(c cloudprovider.Interface) *CloudRegistry { return &CloudRegistry{c} } -func (r *CloudRegistry) Contains(minion string) (bool, error) { - instances, err := r.List() - if err != nil { - return false, err - } - for _, name := range instances { - if name == minion { - return true, nil - } - } - return false, nil +func (r *CloudRegistry) DeleteMinion(api.Context, string) error { + return fmt.Errorf("unsupported") } -func (r CloudRegistry) Delete(minion string) error { +func (r *CloudRegistry) CreateMinion(api.Context, *api.Minion) error { return fmt.Errorf("unsupported") } -func (r CloudRegistry) Insert(minion string) error { - return fmt.Errorf("unsupported") +func (r *CloudRegistry) GetMinion(ctx api.Context, minionId string) (*api.Minion, error) { + instances, ok := r.cloud.Instances() + if !ok { + return nil, fmt.Errorf("cloud doesn't support instances") + } + hostnames, err := instances.List("") + if err != nil { + return nil, err + } + for _, m := range hostnames { + if m != minionId { + continue + } + return toApiMinion(ctx, m), nil + } + return nil, minion.ErrDoesNotExist } -func (r *CloudRegistry) List() ([]string, error) { +func (r *CloudRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) { instances, ok := r.cloud.Instances() if !ok { return nil, fmt.Errorf("cloud doesn't support instances") } - return instances.List("") + minions := []api.Minion{} + hostnames, err := instances.List("") + if err != nil { + return nil, err + } + for _, m := range hostnames { + minions = append(minions, *(toApiMinion(ctx, m))) + } + ns, _ := api.NamespaceFrom(ctx) + return &api.MinionList{ + TypeMeta: api.TypeMeta{Kind: "minionList", Namespace: ns}, + Items: minions, + }, nil +} + +func toApiMinion(ctx api.Context, hostname string) *api.Minion { + ns, _ := api.NamespaceFrom(ctx) + return &api.Minion{ + TypeMeta: api.TypeMeta{ + ID: hostname, + Kind: "minion", + Namespace: ns, + }, + HostIP: hostname, + } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d9a94cf6..a2356600 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -13,7 +13,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + kpod "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -85,7 +85,7 @@ type KubernetesScheduler struct { // Mesos context. executor *mesos.ExecutorInfo - Driver mesos.SchedulerDriver + driver mesos.SchedulerDriver frameworkId *mesos.FrameworkID masterInfo *mesos.MasterInfo registered bool @@ -111,39 +111,34 @@ type KubernetesScheduler struct { client *client.Client podQueue *cache.FIFO - serviceRegistry service.Registry + manifestFactory kpod.ManifestFactory } // New create a new KubernetesScheduler -func New(executor *mesos.ExecutorInfo, scheduleFunc PodScheduleFunc, client *client.Client, helper tools.EtcdHelper, sr service.Registry) *KubernetesScheduler { +func New(executor *mesos.ExecutorInfo, scheduleFunc PodScheduleFunc, client *client.Client, helper tools.EtcdHelper) *KubernetesScheduler { var k *KubernetesScheduler k = &KubernetesScheduler{ - new(sync.RWMutex), - helper, - executor, - nil, - nil, - nil, - false, - CreateOfferRegistry(OfferRegistryConfig{ + RWMutex: new(sync.RWMutex), + EtcdHelper: helper, + executor: executor, + offers: CreateOfferRegistry(OfferRegistryConfig{ declineOffer: func(id string) error { offerId := &mesos.OfferID{Value: proto.String(id)} - return k.Driver.DeclineOffer(offerId, nil) + return k.driver.DeclineOffer(offerId, nil) }, ttl: defaultOfferTTL * time.Second, lingerTtl: defaultOfferLingerTTL * time.Second, // remember expired offers so that we can tell if a previously scheduler offer relies on one listenerDelay: defaultListenerDelay * time.Second, }), - make(map[string]*Slave), - make(map[string]string), - make(map[string]*PodTask), - make(map[string]*PodTask), - ring.New(defaultFinishedTasksSize), - make(map[string]string), - scheduleFunc, - client, - cache.NewFIFO(), - sr, + slaves: make(map[string]*Slave), + slaveIDs: make(map[string]string), + pendingTasks: make(map[string]*PodTask), + runningTasks: make(map[string]*PodTask), + finishedTasks: ring.New(defaultFinishedTasksSize), + podToTask: make(map[string]string), + scheduleFunc: scheduleFunc, + client: client, + podQueue: cache.NewFIFO(), } return k } @@ -162,8 +157,10 @@ func (k *KubernetesScheduler) getTask(taskId string) (*PodTask, stateType) { return nil, stateUnknown } -func (k *KubernetesScheduler) Init() { +func (k *KubernetesScheduler) Init(d mesos.SchedulerDriver, f kpod.ManifestFactory) { k.offers.Init() + k.driver = d + k.manifestFactory = f } // Registered is called when the scheduler registered with the master successfully. @@ -313,8 +310,8 @@ func (k *KubernetesScheduler) fillRunningPodInfo(task *PodTask, taskStatus *meso /// k8s-mesos cluster. For now, I've duplicated logic from k8s fillPodInfo netContainerInfo, ok := target["net"] // docker.Container if ok { - if netContainerInfo.NetworkSettings != nil { - task.Pod.CurrentState.PodIP = netContainerInfo.NetworkSettings.IPAddress + if netContainerInfo.PodIP != "" { + task.Pod.CurrentState.PodIP = netContainerInfo.PodIP } else { log.Warningf("No network settings: %#v", netContainerInfo) } @@ -536,26 +533,36 @@ func (k *KubernetesScheduler) handleSchedulingError(backoff *podBackoff, pod *ap } // ListPods obtains a list of pods that match selector. -func (k *KubernetesScheduler) ListPods(selector labels.Selector) (*api.PodList, error) { - log.V(2).Infof("List pods for '%v'\n", selector) +func (k *KubernetesScheduler) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { + k.RLock() + defer k.RUnlock() + return k.listPods(filter) +} +// ListPods obtains a list of pods that match selector. +func (k *KubernetesScheduler) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { + log.V(2).Infof("List pods for '%v'\n", selector) k.RLock() defer k.RUnlock() + return k.listPods(func(pod *api.Pod) bool { + return selector.Matches(labels.Set(pod.Labels)) + }) +} +// assumes that caller has already locked around scheduler state +func (k *KubernetesScheduler) listPods(filter func(*api.Pod) bool) (*api.PodList, error) { result := []api.Pod{} for _, task := range k.runningTasks { pod := task.Pod - var l labels.Set = pod.Labels - if selector.Matches(l) || selector.Empty() { + if filter(pod) { result = append(result, *pod) } } for _, task := range k.pendingTasks { pod := task.Pod - var l labels.Set = pod.Labels - if selector.Matches(l) || selector.Empty() { + if filter(pod) { result = append(result, *pod) } } @@ -573,7 +580,7 @@ func (k *KubernetesScheduler) ListPods(selector labels.Selector) (*api.PodList, // Get a specific pod. It's *very* important to return a clone of the Pod that // we've saved because our caller will likely modify it. -func (k *KubernetesScheduler) GetPod(podId string) (*api.Pod, error) { +func (k *KubernetesScheduler) GetPod(ctx api.Context, podId string) (*api.Pod, error) { log.V(2).Infof("Get pod '%s'\n", podId) k.RLock() @@ -604,7 +611,7 @@ func (k *KubernetesScheduler) GetPod(podId string) (*api.Pod, error) { // Create a pod based on a specification; DOES NOT schedule it onto a specific machine, // instead the pod is queued for scheduling. -func (k *KubernetesScheduler) CreatePod(pod *api.Pod) error { +func (k *KubernetesScheduler) CreatePod(ctx api.Context, pod *api.Pod) error { log.V(2).Infof("Create pod: '%v'\n", pod) // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting @@ -624,11 +631,11 @@ func (k *KubernetesScheduler) CreatePod(pod *api.Pod) error { defer k.Unlock() if _, ok := k.podToTask[pod.ID]; ok { - return fmt.Errorf("Pod %s already launched. Please choose a unique pod name", pod.JSONBase.ID) + return fmt.Errorf("Pod %s already launched. Please choose a unique pod name", pod.ID) } k.podQueue.Add(pod.ID, pod) - k.podToTask[pod.JSONBase.ID] = task.ID + k.podToTask[pod.ID] = task.ID k.pendingTasks[task.ID] = task return nil @@ -670,7 +677,7 @@ func (k *KubernetesScheduler) Bind(binding *api.Binding) error { if err = k.prepareTaskForLaunch(binding.Host, task); err == nil { log.V(2).Infof("Launching task : %v", task) taskList := []*mesos.TaskInfo{task.TaskInfo} - if err = k.Driver.LaunchTasks(task.Offer.Details().Id, taskList, nil); err == nil { + if err = k.driver.LaunchTasks(task.Offer.Details().Id, taskList, nil); err == nil { // we *intentionally* do not record our binding to etcd since we're not using bindings // to manage pod lifecycle task.Pod.DesiredState.Host = binding.Host @@ -686,7 +693,7 @@ func (k *KubernetesScheduler) Bind(binding *api.Binding) error { func (k *KubernetesScheduler) prepareTaskForLaunch(machine string, task *PodTask) error { // TODO(k8s): move this to a watch/rectification loop. - manifest, err := k.makeManifest(machine, *task.Pod) + manifest, err := k.manifestFactory.MakeManifest(machine, *task.Pod) if err != nil { log.V(2).Infof("Failed to generate an updated manifest") return err @@ -705,30 +712,15 @@ func (k *KubernetesScheduler) prepareTaskForLaunch(machine string, task *PodTask return nil } -// TODO(jdef): hacked in from kubernetes/pkg/registry/etcd/manifest_factory.go. It would be -// nice to have another way to get access to this default implementation, unfortunately the k8s -// API doesn't allow for that. We should probably file a PR against k8s for such. -func (k *KubernetesScheduler) makeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { - envVars, err := service.GetServiceEnvironmentVariables(k.serviceRegistry, machine) - if err != nil { - return api.ContainerManifest{}, err - } - for ix, container := range pod.DesiredState.Manifest.Containers { - pod.DesiredState.Manifest.ID = pod.ID - pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...) - } - return pod.DesiredState.Manifest, nil -} - // Update an existing pod. -func (k *KubernetesScheduler) UpdatePod(pod *api.Pod) error { +func (k *KubernetesScheduler) UpdatePod(ctx api.Context, pod *api.Pod) error { // TODO(yifan): Need to send a special message to the slave/executor. // TODO(nnielsen): Pod updates not yet supported by kubelet. return fmt.Errorf("Not implemented: UpdatePod") } // Delete an existing pod. -func (k *KubernetesScheduler) DeletePod(podId string) error { +func (k *KubernetesScheduler) DeletePod(ctx api.Context, podId string) error { log.V(2).Infof("Delete pod '%s'\n", podId) k.Lock() @@ -751,8 +743,6 @@ func (k *KubernetesScheduler) DeletePod(podId string) error { task, state := k.getTask(taskId) switch state { - case stateRunning: - killTaskId = &mesos.TaskID{Value: proto.String(task.ID)} case statePending: if !task.Launched { // we've been invoked in between Schedule() and Bind() @@ -764,16 +754,18 @@ func (k *KubernetesScheduler) DeletePod(podId string) error { delete(k.pendingTasks, taskId) return nil } + fallthrough + case stateRunning: killTaskId = &mesos.TaskID{Value: proto.String(task.ID)} default: return fmt.Errorf("Cannot kill pod '%s': pod not found", podId) } // signal to watchers that the related pod is going down task.Pod.DesiredState.Host = "" - return k.Driver.KillTask(killTaskId) + return k.driver.KillTask(killTaskId) } -func (k *KubernetesScheduler) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { +func (k *KubernetesScheduler) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { return nil, nil } diff --git a/service/endpoints_controller.go b/service/endpoints_controller.go index 1d7e2853..7c5e5d15 100644 --- a/service/endpoints_controller.go +++ b/service/endpoints_controller.go @@ -22,9 +22,9 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" @@ -36,51 +36,77 @@ type EndpointController interface { // EndpointController manages service endpoints. type endpointController struct { - client *client.Client - serviceRegistry service.Registry + client *client.Client } // NewEndpointController returns a new *EndpointController. -func NewEndpointController(serviceRegistry service.Registry, client *client.Client) EndpointController { +func NewEndpointController(client *client.Client) EndpointController { return &endpointController{ - serviceRegistry: serviceRegistry, - client: client, + client: client, } } // SyncServiceEndpoints syncs service endpoints. func (e *endpointController) SyncServiceEndpoints() error { - services, err := e.client.ListServices(labels.Everything()) + ctx := api.NewContext() + services, err := e.client.ListServices(ctx, labels.Everything()) if err != nil { glog.Errorf("Failed to list services: %v", err) return err } var resultErr error for _, service := range services.Items { - pods, err := e.client.ListPods(labels.Set(service.Selector).AsSelector()) + nsCtx := api.WithNamespace(ctx, service.Namespace) + pods, err := e.client.ListPods(nsCtx, labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service) resultErr = err continue } - endpoints := make([]string, len(pods.Items)) - for ix, pod := range pods.Items { + endpoints := []string{} + for _, pod := range pods.Items { + // HACK(jdef): looks up a HostPort in the container, either by port-name or matching HostPort port, err := findPort(&pod.DesiredState.Manifest, service.ContainerPort) if err != nil { glog.Errorf("Failed to find port for service: %v, %v", service, err) continue } + // HACK(jdef): use HostIP instead of pod.CurrentState.PodIP for generic mesos compat if len(pod.CurrentState.HostIP) == 0 { glog.Errorf("Failed to find a host IP for pod: %v", pod) continue } - endpoints[ix] = net.JoinHostPort(pod.CurrentState.HostIP, strconv.Itoa(port)) + endpoints = append(endpoints, net.JoinHostPort(pod.CurrentState.HostIP, strconv.Itoa(port))) + } + currentEndpoints, err := e.client.GetEndpoints(nsCtx, service.ID) + if err != nil { + // TODO(k8s) this is brittle as all get out, refactor the client libraries to return a structured error. + if errors.IsNotFound(err) { + currentEndpoints = &api.Endpoints{ + TypeMeta: api.TypeMeta{ + ID: service.ID, + }, + } + } else { + glog.Errorf("Error getting endpoints: %#v", err) + continue + } + } + newEndpoints := &api.Endpoints{} + *newEndpoints = *currentEndpoints + newEndpoints.Endpoints = endpoints + + if len(currentEndpoints.ResourceVersion) == 0 { + // No previous endpoints, create them + _, err = e.client.CreateEndpoints(nsCtx, newEndpoints) + } else { + // Pre-existing + if endpointsEqual(currentEndpoints, endpoints) { + glog.V(2).Infof("endpoints are equal for %s, skipping update", service.ID) + continue + } + _, err = e.client.UpdateEndpoints(nsCtx, newEndpoints) } - // TODO(k8s): this is totally broken, we need to compute this and store inside an AtomicUpdate loop. - err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{ - JSONBase: api.JSONBase{ID: service.ID}, - Endpoints: endpoints, - }) if err != nil { glog.Errorf("Error updating endpoints: %#v", err) continue @@ -89,7 +115,32 @@ func (e *endpointController) SyncServiceEndpoints() error { return resultErr } -// findPort locates the container port for the given manifest and portName. +func containsEndpoint(endpoints *api.Endpoints, endpoint string) bool { + if endpoints == nil { + return false + } + for ix := range endpoints.Endpoints { + if endpoints.Endpoints[ix] == endpoint { + return true + } + } + return false +} + +func endpointsEqual(e *api.Endpoints, endpoints []string) bool { + if len(e.Endpoints) != len(endpoints) { + return false + } + for _, endpoint := range endpoints { + if !containsEndpoint(e, endpoint) { + return false + } + } + return true +} + +// findPort locates the Host port for the given manifest and portName. +// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || (portName.Kind == util.IntstrInt && portName.IntVal == 0)) &&