diff --git a/DEVELOP.md b/DEVELOP.md index a58de2a0..40356f48 100644 --- a/DEVELOP.md +++ b/DEVELOP.md @@ -3,6 +3,8 @@ Development of Kubernetes-Mesos * [Prerequisites](#prerequisites) * [Build](README.md#build) the framework +* [Testing](#testing) +* [Profiling](#profiling) ### Prerequisites To get started with development you'll need to install some prerequisites: @@ -66,5 +68,37 @@ $ mkdir -pv /opt && (export GOPATH=/opt; cd /opt && ln -sv /opt/bin/godep /usr/local/bin/godep) ``` +### Testing + +There is a Makefile target called `test` that will execute unit tests for packages that have them. +```shell +$ make test +``` + +### Profiling + +The project Makefile supports the inclusion of arbitrary Go build flags. +To generate a build with profiling enabled, include `TAGS=profile`. +If you're not using the Makefile then you must supply `-tags 'profile'` as part of your `go build` command. +If you're using the dockerbuilder then read the [instructions][3] for profiling in the accompanying documentation. + +```shell +$ make TAGS=profile +``` + +Profiling, when enabled, is supported for both the `kubernetes-mesos` (framework) and `kubernetes-executor` binaries: +```shell +$ ts=$(date +'%Y%m%d%H%M%S') +$ curl http://${servicehost}:9000/debug/pprof/heap >framework.heap.$ts +$ curl http://10.132.189.${slave}:10250/debug/pprof/heap >${slave}.heap.$ts +``` + +If you have captured two or more heaps it's trivial to use the Go pprof tooling to generate reports: +```shell +$ go tool pprof --base=./${slave}.heap.20141117175634 --inuse_objects --pdf \ + ./bin/kubernetes-executor ./${slave}.heap.20141120162503 >${slave}-20141120a.pdf +``` + [1]: https://github.com/mesosphere/kubernetes-mesos#build [2]: https://github.com/tools/godep +[3]: hack/dockerbuild/README.md#profiling diff --git a/Makefile b/Makefile index b3ebc33f..171807ed 100644 --- a/Makefile +++ b/Makefile @@ -11,14 +11,24 @@ K8S_CMD := \ github.com/GoogleCloudPlatform/kubernetes/cmd/kubecfg \ github.com/GoogleCloudPlatform/kubernetes/cmd/proxy FRAMEWORK_CMD := \ - github.com/mesosphere/kubernetes-mesos/kubernetes-mesos \ + github.com/mesosphere/kubernetes-mesos/kubernetes-mesos \ github.com/mesosphere/kubernetes-mesos/kubernetes-executor +FRAMEWORK_LIB := \ + github.com/mesosphere/kubernetes-mesos/scheduler \ + github.com/mesosphere/kubernetes-mesos/service \ + github.com/mesosphere/kubernetes-mesos/executor \ + github.com/mesosphere/kubernetes-mesos/queue +# a list of upstream projects for which we test the availability of patches +PATCH_SCRIPT := $(current_dir)/hack/patches/apply.sh # TODO: make this something more reasonable DESTDIR ?= /target -.PHONY: all error require-godep framework require-vendor proxy install info bootstrap require-gopath format +# default build tags +TAGS ?= + +.PHONY: all error require-godep framework require-vendor proxy install info bootstrap require-gopath format test patch ifneq ($(WITH_MESOS_DIR),) @@ -40,7 +50,7 @@ WITH_MESOS_CGO_FLAGS := \ endif -all: proxy framework +all: patch proxy framework error: echo -E "$@: ${MSG}" >&2 @@ -58,14 +68,13 @@ proxy: require-godep require-vendor: framework: require-godep - env $(WITH_MESOS_CGO_FLAGS) go install $${WITH_RACE:+-race} $(FRAMEWORK_CMD) + env $(WITH_MESOS_CGO_FLAGS) go install -v -x -tags '$(TAGS)' $${WITH_RACE:+-race} $(FRAMEWORK_CMD) format: require-gopath - go fmt github.com/mesosphere/kubernetes-mesos/kubernetes-mesos \ - github.com/mesosphere/kubernetes-mesos/kubernetes-executor \ - github.com/mesosphere/kubernetes-mesos/scheduler \ - github.com/mesosphere/kubernetes-mesos/service \ - github.com/mesosphere/kubernetes-mesos/executor + go fmt $(FRAMEWORK_CMD) $(FRAMEWORK_LIB) + +test: require-gopath + go test $(FRAMEWORK_LIB) install: all mkdir -p $(DESTDIR) @@ -79,6 +88,13 @@ info: @echo CGO_CXXFLAGS="$(CGO_CXXFLAGS)" @echo CGO_LDFLAGS="$(CGO_LDFLAGS)" @echo RACE_FLAGS=$${WITH_RACE:+-race} + @echo TAGS=$(TAGS) bootstrap: require-godep godep restore + +patch: $(PATCH_SCRIPT) + $(PATCH_SCRIPT) + +$(PATCH_SCRIPT): + test -x $@ || chmod +x $@ diff --git a/executor/executor.go b/executor/executor.go index 12b73406..6b330d60 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -187,30 +187,38 @@ func (k *KubernetesExecutor) LaunchTask(driver mesos.ExecutorDriver, taskInfo *m return } - // TODO(jdefelice) remove the task (and pod state) here? - k.sendStatusUpdate(taskInfo.GetTaskId(), mesos.TaskState_TASK_LOST, "Task lost: launch failed") + + k.lock.Lock() + defer k.lock.Unlock() + k.reportLostTask(taskId, "Task lost: launch failed") }() } // Intended to be executed as part of the pod monitoring loop, this fn (ultimately) checks with Docker -// whether the pod is running and will return true if the pod becomes unkown. If there's still a task -// record on file, but no pod in Docker, then we'll also send a TASK_LOST event. +// whether the pod is running. It will only return false if the task is still registered and the pod is +// registered in Docker. Otherwise it returns true. If there's still a task record on file, but no pod +// in Docker, then we'll also send a TASK_LOST event. func (k *KubernetesExecutor) checkForLostPodTask(taskInfo *mesos.TaskInfo, isKnownPod func() bool) bool { // TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks) - k.lock.RLock() - defer k.lock.RUnlock() + k.lock.Lock() + defer k.lock.Unlock() - if !isKnownPod() { - taskId := taskInfo.GetTaskId().GetValue() - if _, ok := k.tasks[taskId]; !ok { - log.Infof("Ignoring lost container: task not present") + // TODO(jdef) we should really consider k.pods here, along with what docker is reporting, since the kubelet + // may constantly attempt to instantiate a pod as long as it's in the pod state that we're handing to it. + // otherwise, we're probably reporting a TASK_LOST prematurely. Should probably consult RestartPolicy to + // determine appropriate behavior. Should probably also gracefully handle docker daemon restarts. + taskId := taskInfo.GetTaskId().GetValue() + if _, ok := k.tasks[taskId]; ok { + if isKnownPod() { + return false } else { - // TODO(jdefelice) remove the task (and pod state) here? - k.sendStatusUpdate(taskInfo.GetTaskId(), mesos.TaskState_TASK_LOST, "Task lost: container disappeared") + log.Warningf("Detected lost pod, reporting lost task %v", taskId) + k.reportLostTask(taskId, "Task lost: container disappeared") } - return true + } else { + log.V(2).Infof("Task %v no longer registered, stop monitoring for lost pods", taskId) } - return false + return true } // KillTask is called when the executor receives a request to kill a task. @@ -243,8 +251,9 @@ func (k *KubernetesExecutor) killPodForTask(tid, reason string) { // to be deprecated. pid := task.containerManifest.ID if _, found := k.pods[pid]; !found { - log.Warningf("Cannot remove Unknown pod %v\n", pid) + log.Warningf("Cannot remove Unknown pod %v for task %v", pid, tid) } else { + log.V(2).Infof("Deleting pod %v for task %v", pid, tid) delete(k.pods, pid) // Send the pod updates to the channel. @@ -259,6 +268,37 @@ func (k *KubernetesExecutor) killPodForTask(tid, reason string) { k.sendStatusUpdate(task.mesosTaskInfo.GetTaskId(), mesos.TaskState_TASK_KILLED, reason) } +// Reports a lost task to the slave and updates internal task and pod tracking state. +// Assumes that the caller is locking around pod and task state. +func (k *KubernetesExecutor) reportLostTask(tid, reason string) { + task, ok := k.tasks[tid] + if !ok { + log.Infof("Failed to report lost task, unknown task %v\n", tid) + return + } + delete(k.tasks, tid) + + // TODO(nnielsen): Verify this assumption. Manifest ID's has been marked + // to be deprecated. + pid := task.containerManifest.ID + if _, found := k.pods[pid]; !found { + log.Warningf("Cannot remove Unknown pod %v for lost task %v", pid, tid) + } else { + log.V(2).Infof("Deleting pod %v for lost task %v", pid, tid) + delete(k.pods, pid) + + // Send the pod updates to the channel. + update := kubelet.PodUpdate{Op: kubelet.SET} + for _, p := range k.pods { + update.Pods = append(update.Pods, *p) + } + k.updateChan <- update + } + // TODO(yifan): Check the result of the kill event. + + k.sendStatusUpdate(task.mesosTaskInfo.GetTaskId(), mesos.TaskState_TASK_LOST, reason) +} + // FrameworkMessage is called when the framework sends some message to the executor func (k *KubernetesExecutor) FrameworkMessage(driver mesos.ExecutorDriver, message string) { log.Infof("Receives message from framework %v\n", message) diff --git a/executor/server.go b/executor/server.go index 21831917..77080a4a 100644 --- a/executor/server.go +++ b/executor/server.go @@ -36,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/golang/glog" "github.com/google/cadvisor/info" + "github.com/mesosphere/kubernetes-mesos/profile" ) // Server is a http.Handler which exposes kubelet functionality over HTTP. @@ -85,6 +86,7 @@ func NewServer(host HostInterface, updates chan<- interface{}, ns string) Server // InstallDefaultHandlers registers the set of supported HTTP request patterns with the mux. func (s *Server) InstallDefaultHandlers() { healthz.InstallHandler(s.mux) + profile.InstallHandler(s.mux) s.mux.HandleFunc("/podInfo", s.handlePodInfo) s.mux.HandleFunc("/stats/", s.handleStats) s.mux.HandleFunc("/logs/", s.handleLogs) @@ -98,6 +100,7 @@ func (s *Server) error(w http.ResponseWriter, err error) { // handlePodInfo handles podInfo requests against the Kubelet. func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { + req.Close = true u, err := url.ParseRequestURI(req.RequestURI) if err != nil { s.error(w, err) diff --git a/hack/dockerbuild/README.md b/hack/dockerbuild/README.md index 53a8c3a1..9371250e 100644 --- a/hack/dockerbuild/README.md +++ b/hack/dockerbuild/README.md @@ -21,7 +21,7 @@ This example copies the resulting binaries into the host-mounted volume `/tmp/ta To build and copy the binaries: - $ docker run -rm -v /tmp/target:/target k8s-mesos-builder + $ docker run --rm -v /tmp/target:/target k8s-mesos-builder ... git clone https://${GOPKG}.git . Cloning into '.'... @@ -43,7 +43,7 @@ To build and copy the binaries: Alternatively, it can be used to generate binaries from a branch: - $ docker run -rm -v /tmp/target:/target -e GIT_BRANCH=default_port k8s-mesos-builder + $ docker run --rm -v /tmp/target:/target -e GIT_BRANCH=default_port k8s-mesos-builder Want a quick-and-dirty development environment to start hacking? @@ -52,8 +52,34 @@ Want a quick-and-dirty development environment to start hacking? Need to build the project, but from a forked git repo? - $ docker run -rm -v /tmp/target:/target -e GIT_REPO=https://github.com/whoami/kubernetes-mesos k8s-mesos-builder + $ docker run --rm -v /tmp/target:/target -e GIT_REPO=https://github.com/whoami/kubernetes-mesos k8s-mesos-builder To hack in your currently checked out repo mount the root of the github repo to `/snapshot`: $ docker run -ti -v /tmp/target:/target -v /home/jdef/kubernetes-mesos:/snapshot k8s-mesos-builder bash + +## Profiling + +Profiling in the cloud with Kubernetes-Mesos is easy! +First, ssh into your Mesos cluster and generate a set of project binaries with profiling enabled (the `TAGS` variable is important here): + + $ docker run --rm -ti -e GIT_BRANCH=offer_storage -e TAGS=profile \ + -v $(pwd)/bin:/target jdef/kubernetes-mesos:dockerbuild + +Next, [start the framework](https://github.com/mesosphere/kubernetes-mesos/#start-the-framework) and schedule some pods. +Once the framework and executors are up and running you can start capturing heaps: + + $ ts=$(date +'%Y%m%d%H%M%S') + $ curl http://${servicehost}:9000/debug/pprof/heap >framework.heap.$ts + $ for slave in 240 242 243; do + curl http://10.132.189.${slave}:10250/debug/pprof/heap >${slave}.heap.$ts; + done + +Once you have a few heaps, you can generate reports. +Additional packages may be required to support the reporting format you desire. + + $ apt-get install ghostscript graphviz + $ go tool pprof --base=./framework.heap.20141117175634 --inuse_objects --pdf \ + ./bin/kubernetes-executor ./framework.heap.20141120162503 >framework-20141120a.pdf + +For more details regarding profiling read the [pprof](http://golang.org/pkg/net/http/pprof/) package documentation. diff --git a/hack/patches/README.md b/hack/patches/README.md new file mode 100644 index 00000000..daa8ce98 --- /dev/null +++ b/hack/patches/README.md @@ -0,0 +1,22 @@ +### Overview +Periodically bug fixes will be needed in upstream code that this project depends upon. +Rather than patch incorporated vendored code, this directory provides a place to isolate individual patches that may be applied at build time. +**For now only a single HUNK should be present in each `.patch` file.** + +**NOTE:** This is not intended to replace proper pull-requests in other projects. +Rather it is a short-term stop-gap solution for moving forward with patched code until the appropriate PR's are applied upstream and this project has been rebased to a revision that includes said PR's. + +### Naming Convention +Patch files are named according to the upstream project they apply to and the issue number relative to **this project**. +``` + {patched-project}---{k8s-mesos-issue}.patch +``` + +For example, a file named `k8s---issue1234.patch` would indicate a patch for the Kubernetes project, tracked by issue 1234 in this project's (kubernetes-mesos) issues list. +Issue 1234 should cross-reference any relevant PR's in the upstream project's repository. + +#### Projects + +Project Code | Project Name +-------------|------------- + k8s | [kubernetes](https://github.com/GoogleCloudPlatform/kubernetes) diff --git a/hack/patches/apply.sh b/hack/patches/apply.sh new file mode 100755 index 00000000..94c815d2 --- /dev/null +++ b/hack/patches/apply.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +function die() { + test "x$1" = "x" || echo -E "$1" >&2 + exit 1 +} + +test -n "$GOPATH" || die Missing GOPATH +pkg="${GOPATH%%:*}" +echo GO packages in $pkg will be patched + +test -n "$pkg" || die Invalid GOPATH=$GOPATH +home=$(dirname "$0") +home=$(readlink -f "$home") +echo Patch directory $home + +# Add new k/v pairs for each project repo that may require patching +# and update the README.md as entries are modified here +declare -A pmap +pmap=( + [k8s]=github.com/GoogleCloudPlatform/kubernetes +) + +# TODO(jdef) at some point we should be able to apply patches with +# multiple hunks, ala: +# http://unix.stackexchange.com/questions/65698/how-to-make-patch-ignore-already-applied-hunks + +for k in "${!pmap[@]}"; do + repo="${pmap["${k}"]}" + echo "Checking patches for ${k}.. ($repo)" + find "${home}" -type f -name "${k}---issue*.patch" | while IFS= read -r f; do + #ff="${f%%.patch}" + #test "x$f" != "x$ff" || continue + cmd=( patch -p1 -s -r- -i"$f" ) + echo -n -E "${cmd[@]}" + output=$(cd "${pkg}/src/${repo}" && "${cmd[@]}") && echo || { + echo -E "$output" | \ + grep -q 'Reversed (or previously applied) patch detected' && \ + echo " (already applied)" || \ + { echo; die "Failed to apply patch ${f}: ${output}"; } + } + done +done diff --git a/hack/patches/k8s---issue77.patch b/hack/patches/k8s---issue77.patch new file mode 100644 index 00000000..2da147f7 --- /dev/null +++ b/hack/patches/k8s---issue77.patch @@ -0,0 +1,14 @@ +diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go +index b42e2ce..b0f7808 100644 +--- a/pkg/kubelet/dockertools/docker.go ++++ b/pkg/kubelet/dockertools/docker.go +@@ -215,6 +215,9 @@ func GetDockerPodInfo(client DockerInterface, podFullName, uuid string) (api.Pod + } + + for _, value := range containers { ++ if len(value.Names) == 0 { ++ continue ++ } + dockerManifestID, dockerUUID, dockerContainerName, _ := ParseDockerName(value.Names[0]) + if dockerManifestID != podFullName { + continue diff --git a/kubernetes-mesos/main.go b/kubernetes-mesos/main.go index 2fb8b90a..9e8b2a8d 100644 --- a/kubernetes-mesos/main.go +++ b/kubernetes-mesos/main.go @@ -47,6 +47,7 @@ import ( goetcd "github.com/coreos/go-etcd/etcd" log "github.com/golang/glog" "github.com/mesos/mesos-go/mesos" + _ "github.com/mesosphere/kubernetes-mesos/profile" kmscheduler "github.com/mesosphere/kubernetes-mesos/scheduler" kmendpoint "github.com/mesosphere/kubernetes-mesos/service" ) @@ -175,6 +176,8 @@ func main() { driver.Init() defer driver.Destroy() + + mesosPodScheduler.Init() go driver.Start() log.V(2).Info("Serving executor artifacts...") diff --git a/profile/profile.go b/profile/profile.go new file mode 100644 index 00000000..47748a90 --- /dev/null +++ b/profile/profile.go @@ -0,0 +1,14 @@ +// +build profile + +package profile + +import "net/http" +import "net/http/pprof" + +func InstallHandler(m *http.ServeMux) { + // register the same endpoints as net/http/pprof.init() does + m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) +} diff --git a/profile/profile_disabled.go b/profile/profile_disabled.go new file mode 100644 index 00000000..616882ad --- /dev/null +++ b/profile/profile_disabled.go @@ -0,0 +1,9 @@ +// +build !profile + +package profile + +import "net/http" + +func InstallHandler(m *http.ServeMux) { + // intentionally does nothing +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 00000000..a056b2ad --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,121 @@ +package queue + +import ( + "container/heap" + "sync" + "time" +) + +type Delayed interface { + // return the remaining delay; a non-positive value indicates no delay + GetDelay() time.Duration +} + +type qitem struct { + value interface{} + priority time.Time + index int +} + +// A priorityQueue implements heap.Interface and holds qitems. +type priorityQueue []*qitem + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].priority.Before(pq[j].priority) +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*qitem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// concurrency-safe, deadline-oriented queue that returns items after their +// delay period has expired. +type DelayQueue struct { + queue priorityQueue + lock sync.Mutex + cond sync.Cond +} + +func NewDelayQueue() *DelayQueue { + q := &DelayQueue{} + q.cond.L = &q.lock + return q +} + +func (q *DelayQueue) Add(d Delayed) { + deadline := time.Now().Add(d.GetDelay()) + + q.lock.Lock() + defer q.lock.Unlock() + heap.Push(&q.queue, &qitem{ + value: d, + priority: deadline, + }) + q.cond.Broadcast() +} + +func (q *DelayQueue) next() *qitem { + q.lock.Lock() + defer q.lock.Unlock() + for q.queue.Len() == 0 { + q.cond.Wait() + } + x := heap.Pop(&q.queue) + item := x.(*qitem) + return item +} + +type empty struct{} + +// wait for the delay of the next item in the queue to expire, blocking if +// there are no items in the queue. does not guarantee first-come-first-serve +// ordering with respect to clients. +func (q *DelayQueue) Pop() Delayed { + var ch chan empty + for { + item := q.next() + x := item.value.(Delayed) + waitingPeriod := item.priority.Sub(time.Now()) + if waitingPeriod >= 0 { + // listen for calls to Add() while we're waiting for the deadline + if ch == nil { + ch = make(chan empty, 1) + } + go func() { + q.lock.Lock() + defer q.lock.Unlock() + q.cond.Wait() + ch <- empty{} + }() + select { + case <-time.After(waitingPeriod): + return x + case <-ch: + // we may no longer have the earliest deadline, re-try + q.Add(x) + continue + } + } + return x + } +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 00000000..53aaab6a --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,261 @@ +package queue + +import ( + "sync/atomic" + "testing" + "time" +) + +const ( + tolerance = 100 * time.Millisecond // go time delays aren't perfect, this is our tolerance for errors WRT expected timeouts +) + +func TestPQ(t *testing.T) { + t.Parallel() + + var pq priorityQueue + if pq.Len() != 0 { + t.Fatalf("pq should be empty") + } + + now := time.Now() + now2 := now.Add(2 * time.Second) + pq.Push(&qitem{priority: now2}) + if pq.Len() != 1 { + t.Fatalf("pq.len should be 1") + } + x := pq.Pop() + if x == nil { + t.Fatalf("x is nil") + } + if pq.Len() != 0 { + t.Fatalf("pq should be empty") + } + item := x.(*qitem) + if !item.priority.Equal(now2) { + t.Fatalf("item.priority != now2") + } + + pq.Push(&qitem{priority: now2}) + pq.Push(&qitem{priority: now2}) + pq.Push(&qitem{priority: now2}) + pq.Push(&qitem{priority: now2}) + pq.Push(&qitem{priority: now2}) + pq.Pop() + pq.Pop() + pq.Pop() + pq.Pop() + pq.Pop() + if pq.Len() != 0 { + t.Fatalf("pq should be empty") + } + now4 := now.Add(4 * time.Second) + now6 := now.Add(4 * time.Second) + pq.Push(&qitem{priority: now2}) + pq.Push(&qitem{priority: now4}) + pq.Push(&qitem{priority: now6}) + pq.Swap(0, 2) + if !pq[0].priority.Equal(now6) || !pq[2].priority.Equal(now2) { + t.Fatalf("swap failed") + } + if pq.Less(1, 2) { + t.Fatalf("now4 < now2") + } +} + +func TestPopEmptyPQ(t *testing.T) { + t.Parallel() + defer func() { + if r := recover(); r == nil { + t.Fatalf("Expected panic from popping an empty PQ") + } + }() + var pq priorityQueue + pq.Pop() +} + +type testjob struct { + d time.Duration + t time.Time +} + +func (j *testjob) GetDelay() time.Duration { + return j.d +} + +func TestDQ_sanity_check(t *testing.T) { + t.Parallel() + + dq := NewDelayQueue() + delay := 2 * time.Second + dq.Add(&testjob{d: delay}) + + before := time.Now() + x := dq.Pop() + + now := time.Now() + waitPeriod := now.Sub(before) + + if waitPeriod+tolerance < delay { + t.Fatalf("delay too short: %v, expected: %v", waitPeriod, delay) + } + if x == nil { + t.Fatalf("x is nil") + } + item := x.(*testjob) + if item.d != delay { + t.Fatalf("d != delay") + } +} + +func TestDQ_ordered_add_pop(t *testing.T) { + t.Parallel() + + dq := NewDelayQueue() + dq.Add(&testjob{d: 2 * time.Second}) + dq.Add(&testjob{d: 1 * time.Second}) + dq.Add(&testjob{d: 3 * time.Second}) + + var finished [3]*testjob + before := time.Now() + idx := int32(-1) + ch := make(chan bool, 3) + for _ = range finished { + go func() { + var ok bool + x := dq.Pop() + i := atomic.AddInt32(&idx, 1) + if finished[i], ok = x.(*testjob); !ok { + t.Fatalf("expected a *testjob, not %v", x) + } + finished[i].t = time.Now() + ch <- true + }() + } + <-ch + <-ch + <-ch + + after := time.Now() + totalDelay := after.Sub(before) + if totalDelay+tolerance < (3 * time.Second) { + t.Fatalf("totalDelay < 3s: %v", totalDelay) + } + for i, v := range finished { + if v == nil { + t.Fatalf("task %d was nil", i) + } + expected := time.Duration(i+1) * time.Second + if v.d != expected { + t.Fatalf("task %d had delay-priority %v, expected %v", i, v.d, expected) + } + actualDelay := v.t.Sub(before) + if actualDelay+tolerance < v.d { + t.Fatalf("task %d had actual-delay %v < expected delay %v", i, actualDelay, v.d) + } + } +} + +func TestDQ_always_pop_earliest_deadline(t *testing.T) { + t.Parallel() + + // add a testjob with delay of 2s + // spawn a func f1 that attempts to Pop() and wait for f1 to begin + // add a testjob with a delay of 1s + // check that the func f1 actually popped the 1s task (not the 2s task) + + dq := NewDelayQueue() + dq.Add(&testjob{d: 2 * time.Second}) + ch := make(chan *testjob) + started := make(chan bool) + + go func() { + started <- true + x := dq.Pop() + job := x.(*testjob) + job.t = time.Now() + ch <- job + }() + + <-started + time.Sleep(500 * time.Millisecond) // give plently of time for Pop() to enter + expected := 1 * time.Second + dq.Add(&testjob{d: expected}) + job := <-ch + + if expected != job.d { + t.Fatalf("Expected delay-prority of %v got instead got %v", expected, job.d) + } + + job = dq.Pop().(*testjob) + expected = 2 * time.Second + if expected != job.d { + t.Fatalf("Expected delay-prority of %v got instead got %v", expected, job.d) + } +} + +func TestDQ_always_pop_earliest_deadline_multi(t *testing.T) { + t.Parallel() + + dq := NewDelayQueue() + dq.Add(&testjob{d: 2 * time.Second}) + + ch := make(chan *testjob) + multi := 10 + started := make(chan bool, multi) + + go func() { + started <- true + for i := 0; i < multi; i++ { + x := dq.Pop() + job := x.(*testjob) + job.t = time.Now() + ch <- job + } + }() + + <-started + time.Sleep(500 * time.Millisecond) // give plently of time for Pop() to enter + expected := 1 * time.Second + + for i := 0; i < multi; i++ { + dq.Add(&testjob{d: expected}) + } + for i := 0; i < multi; i++ { + job := <-ch + if expected != job.d { + t.Fatalf("Expected delay-prority of %v got instead got %v", expected, job.d) + } + } + + job := dq.Pop().(*testjob) + expected = 2 * time.Second + if expected != job.d { + t.Fatalf("Expected delay-prority of %v got instead got %v", expected, job.d) + } +} + +func TestDQ_negative_delay(t *testing.T) { + t.Parallel() + + dq := NewDelayQueue() + delay := -2 * time.Second + dq.Add(&testjob{d: delay}) + + before := time.Now() + x := dq.Pop() + + now := time.Now() + waitPeriod := now.Sub(before) + + if waitPeriod > tolerance { + t.Fatalf("delay too long: %v, expected something less than: %v", waitPeriod, tolerance) + } + if x == nil { + t.Fatalf("x is nil") + } + item := x.(*testjob) + if item.d != delay { + t.Fatalf("d != delay") + } +} diff --git a/scheduler/backoff.go b/scheduler/backoff.go index 59af62e4..cb1c7156 100644 --- a/scheduler/backoff.go +++ b/scheduler/backoff.go @@ -66,8 +66,16 @@ func (p *podBackoff) getBackoff(podID string) time.Duration { return duration } -func (p *podBackoff) wait(podID string) { - time.Sleep(p.getBackoff(podID)) +// wait until some backoff period elapses, or else we receive a signal on the +// done channel - whichever comes first. it is not necessary to receive data +// over the channel, a closed channel will be the signal. the channel may be +// nil, meaning that the entire backoff period must elapse before returning. +func (p *podBackoff) wait(podID string, done <-chan empty) { + select { + case <-time.After(p.getBackoff(podID)): + case <-done: + log.V(3).Infof("Leaving backoff early for pod %s", podID) + } } func (p *podBackoff) gc() { diff --git a/scheduler/offers.go b/scheduler/offers.go new file mode 100644 index 00000000..ccbc7142 --- /dev/null +++ b/scheduler/offers.go @@ -0,0 +1,359 @@ +package scheduler + +import ( + "sync/atomic" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/mesos" + "github.com/mesosphere/kubernetes-mesos/queue" +) + +const ( + offerListenerMaxAge = 5 // max number of times we'll attempt to fit an offer to a listener before requiring them to re-register themselves + deferredDeclineTtlFactor = 2 // this factor, multiplied by the offer ttl, determines how long to wait before attempting to decline previously claimed offers that were subsequently deleted, then released. see offerStorage.Delete +) + +type OfferFilter func(*mesos.Offer) bool + +type OfferRegistry interface { + // Initialize the instance, spawning necessary housekeeping go routines. + Init() + Add([]*mesos.Offer) + + // Listen for arriving offers that are acceptable to the filter, sending + // a signal on (by closing) the returned channel. A listener will only + // ever be notified once, if at all. + Listen(id string, f OfferFilter) <-chan empty + + // invoked when offers are rescinded or expired + Delete(string) + + Get(offerId string) (PerishableOffer, bool) + + Walk(Walker) error + + // invalidate one or all (when offerId="") offers; offers are not declined, + // but are simply flagged as expired in the offer history + Invalidate(offerId string) +} + +// callback that is invoked during a walk through a series of live offers, +// returning with stop=true (or err != nil) if the walk should stop permaturely. +type Walker func(offer PerishableOffer) (stop bool, err error) + +type OfferRegistryConfig struct { + declineOffer func(offerId string) error + ttl time.Duration // determines a perishable offer's expiration deadline: now+ttl + lingerTtl time.Duration // if zero, offers will not linger in the FIFO past their expiration deadline + listenerDelay time.Duration // specifies the sleep time between offer listener notifications +} + +type offerStorage struct { + OfferRegistryConfig + offers *cache.FIFO // collection of PerishableOffer, both live and expired + listeners *cache.FIFO // collection of *offerListener + delayed *queue.DelayQueue // deadline-oriented offer-event queue +} + +type liveOffer struct { + *mesos.Offer + expiration time.Time + acquired int32 // 1 = acquired, 0 = free +} + +type expiredOffer struct { + id string + deadline time.Time +} + +type PerishableOffer interface { + queue.Delayed + // returns true if this offer has expired + HasExpired() bool + // if not yet expired, return mesos offer details; otherwise nil + Details() *mesos.Offer + // mark this offer as acquired, returning true if it was previously unacquired. thread-safe. + Acquire() bool + // mark this offer as un-acquired. thread-safe. + Release() + // expire or delete this offer from storage + age(s *offerStorage) +} + +func (e *expiredOffer) HasExpired() bool { + return true +} + +func (e *expiredOffer) Details() *mesos.Offer { + return nil +} + +func (e *expiredOffer) Acquire() bool { + return false +} + +func (e *expiredOffer) Release() {} + +func (e *expiredOffer) age(s *offerStorage) { + log.V(3).Infof("Delete lingering offer: %v", e.id) + s.offers.Delete(e.id) +} + +// return the time left to linger +func (e *expiredOffer) GetDelay() time.Duration { + return e.deadline.Sub(time.Now()) +} + +func (to *liveOffer) HasExpired() bool { + return time.Now().After(to.expiration) +} + +func (to *liveOffer) Details() *mesos.Offer { + return to.Offer +} + +func (to *liveOffer) Acquire() bool { + return atomic.CompareAndSwapInt32(&to.acquired, 0, 1) +} + +func (to *liveOffer) Release() { + atomic.CompareAndSwapInt32(&to.acquired, 1, 0) +} + +func (to *liveOffer) age(s *offerStorage) { + s.Delete(to.Offer.Id.GetValue()) +} + +// return the time remaining before the offer expires +func (to *liveOffer) GetDelay() time.Duration { + return to.expiration.Sub(time.Now()) +} + +func CreateOfferRegistry(c OfferRegistryConfig) OfferRegistry { + return &offerStorage{c, cache.NewFIFO(), cache.NewFIFO(), queue.NewDelayQueue()} +} + +func (s *offerStorage) Add(offers []*mesos.Offer) { + now := time.Now() + for _, offer := range offers { + offerId := offer.Id.GetValue() + log.V(3).Infof("Receiving offer %v", offerId) + timed := &liveOffer{offer, now.Add(s.ttl), 0} + s.offers.Add(offerId, timed) + s.delayed.Add(timed) + } +} + +// delete an offer from storage, meaning that we expire it +func (s *offerStorage) Delete(offerId string) { + if offer, ok := s.Get(offerId); ok { + log.V(3).Infof("Deleting offer %v", offerId) + // attempt to block others from consuming the offer. if it's already been + // claimed and is not yet lingering then don't decline it - just mark it as + // expired in the history: allow a prior claimant to attempt to launch with it + myoffer := offer.Acquire() + if offer.Details() != nil { + if myoffer { + log.V(3).Infof("Declining offer %v", offerId) + if err := s.declineOffer(offerId); err != nil { + log.Warningf("Failed to decline offer %v: %v", offerId, err) + } + } else { + // some pod has acquired this and may attempt to launch a task with it + // failed schedule/launch attempts are requried to Release() any claims on the offer + go func() { + // TODO(jdef): not sure what a good value is here. the goal is to provide a + // launchTasks (driver) operation enough time to complete so that we don't end + // up declining an offer that we're actually attempting to use. + time.Sleep(deferredDeclineTtlFactor * s.ttl) + + // at this point the offer is in one of five states: + // a) permanently deleted: expired due to timeout + // b) permanently deleted: expired due to having been rescinded + // c) lingering: expired due to timeout + // d) lingering: expired due to having been rescinded + // e) claimed: task launched and it using resources from this offer + // we want to **avoid** declining an offer that's claimed: attempt to acquire + if offer.Acquire() { + // previously claimed offer was released, perhaps due to a launch + // failure, so we should attempt to decline + if err := s.declineOffer(offerId); err != nil { + log.Warningf("Failed to decline (previously claimed) offer %v: %v", offerId, err) + } + } + }() + } + } + s.expireOffer(offer) + } // else, ignore offers not in the history +} + +// expire all known, live offers +func (s *offerStorage) Invalidate(offerId string) { + if offerId != "" { + s.invalidateOne(offerId) + return + } + obj := s.offers.List() + for _, o := range obj { + offer, ok := o.(PerishableOffer) + if !ok { + log.Errorf("Expected perishable offer, not %v", o) + continue + } + offer.Acquire() // attempt to block others from using it + s.expireOffer(offer) + // don't decline, we already know that it's an invalid offer + } +} + +func (s *offerStorage) invalidateOne(offerId string) { + if offer, ok := s.Get(offerId); ok { + offer.Acquire() // attempt to block others from using it + s.expireOffer(offer) + // don't decline, we already know that it's an invalid offer + } +} + +// Walk the collection of offers. The walk stops either as indicated by the +// Walker or when the end of the offer list is reached. Expired offers are +// never passed to a Walker. +func (s *offerStorage) Walk(w Walker) error { + for offerId := range s.offers.Contains() { + offer, ok := s.Get(offerId) + if !ok { + // offer disappeared... + continue + } + if offer.HasExpired() { + // never pass expired offers to walkers + continue + } + if stop, err := w(offer); err != nil { + return err + } else if stop { + return nil + } + } + return nil +} + +func (s *offerStorage) expireOffer(offer PerishableOffer) { + // the offer may or may not be expired due to TTL so check for details + // since that's a more reliable determinant of lingering status + if details := offer.Details(); details != nil { + // recently expired, should linger + offerId := details.Id.GetValue() + log.V(3).Infof("Expiring offer %v", offerId) + if s.lingerTtl > 0 { + log.V(3).Infof("offer will linger: %v", offerId) + expired := &expiredOffer{offerId, time.Now().Add(s.lingerTtl)} + s.offers.Update(offerId, expired) + s.delayed.Add(expired) + } else { + log.V(3).Infof("Permanently deleting offer %v", offerId) + s.offers.Delete(offerId) + } + } // else, it's still lingering... +} + +func (s *offerStorage) Get(id string) (PerishableOffer, bool) { + if obj, ok := s.offers.Get(id); !ok { + return nil, false + } else { + to, ok := obj.(PerishableOffer) + if !ok { + log.Errorf("invalid offer object in fifo '%v'", obj) + } + return to, ok + } +} + +type offerListener struct { + id string + accepts OfferFilter + notify chan<- empty + age int +} + +// register a listener for new offers, whom we'll notify upon receiving such. +// notification is delivered in the form of closing the channel, nothing is ever sent. +func (s *offerStorage) Listen(id string, f OfferFilter) <-chan empty { + if f == nil { + return nil + } + ch := make(chan empty) + listen := &offerListener{ + id: id, + accepts: f, + notify: ch, + age: 0, + } + log.V(3).Infof("Registering offer listener %s", listen.id) + s.listeners.Add(id, listen) + return ch +} + +func (s *offerStorage) ageOffers() { + offer, ok := s.delayed.Pop().(PerishableOffer) + if !ok { + log.Errorf("Expected PerishableOffer, not %v", offer) + return + } + if details := offer.Details(); details != nil && !offer.HasExpired() { + // live offer has not expired yet: timed out early + // FWIW: early timeouts are more frequent when GOMAXPROCS is > 1 + s.delayed.Add(offer) + } else { + offer.age(s) + } +} + +func (s *offerStorage) notifyListeners() { + var listen *offerListener + var ok bool + // get the next offer listener + for { + obj := s.listeners.Pop() + if listen, ok = obj.(*offerListener); ok { + break + } + log.Warningf("unexpected listener object %v", obj) + } + // notify if we find an acceptable offer + for id := range s.offers.Contains() { + var offer PerishableOffer + if offer, ok = s.Get(id); !ok || offer.HasExpired() { + continue + } + if listen.accepts(offer.Details()) { + log.V(3).Infof("Notifying offer listener %s", listen.id) + close(listen.notify) + return + } + } + // no interesting offers found, re-queue the listener + listen.age++ + if listen.age < offerListenerMaxAge { + // if the same listener has re-registered in the meantime we don't want to + // destroy the newer listener channel. this is racy since a newer listener + // can register between the Get() and Update(), but the consequences aren't + // very dire - the listener merely has to wait their full backoff period. + if _, ok := s.listeners.Get(listen.id); !ok { + log.V(3).Infof("Re-registering offer listener %s", listen.id) + s.listeners.Update(listen.id, listen) + } + } // else, you're gc'd +} + +func (s *offerStorage) Init() { + go util.Forever(s.ageOffers, 0) + + // to avoid a rush on offers, we add a short delay between each registered + // listener, so as to allow the most recently notified listener a bit of time + // to act on the offer. + go util.Forever(s.notifyListeners, s.listenerDelay) +} diff --git a/scheduler/offers_test.go b/scheduler/offers_test.go new file mode 100644 index 00000000..9dc71444 --- /dev/null +++ b/scheduler/offers_test.go @@ -0,0 +1,149 @@ +package scheduler + +import ( + "errors" + "testing" + "time" +) + +func TestTimedOffer(t *testing.T) { + t.Parallel() + + ttl := 2 * time.Second + now := time.Now() + o := &liveOffer{nil, now.Add(ttl), 0} + + if o.HasExpired() { + t.Errorf("offer ttl was %v and should not have expired yet", ttl) + } + if !o.Acquire() { + t.Fatal("1st acquisition of offer failed") + } + o.Release() + if !o.Acquire() { + t.Fatal("2nd acquisition of offer failed") + } + if o.Acquire() { + t.Fatal("3rd acquisition of offer passed but prior claim was not released") + } + o.Release() + if !o.Acquire() { + t.Fatal("4th acquisition of offer failed") + } + o.Release() + time.Sleep(ttl) + if !o.HasExpired() { + t.Fatal("offer not expired after ttl passed") + } + if !o.Acquire() { + t.Fatal("5th acquisition of offer failed; should not be tied to expiration") + } + if o.Acquire() { + t.Fatal("6th acquisition of offer succeeded; should already be acquired") + } +} // TestTimedOffer + +func TestWalk(t *testing.T) { + t.Parallel() + config := OfferRegistryConfig{ + declineOffer: func(offerId string) error { + return nil + }, + ttl: 0 * time.Second, + lingerTtl: 0 * time.Second, + listenerDelay: 0 * time.Second, + } + storage := CreateOfferRegistry(config) + acceptedOfferId := "" + walked := 0 + walker1 := func(p PerishableOffer) (bool, error) { + walked++ + if p.Acquire() { + acceptedOfferId = "foo" + return true, nil + } + return false, nil + } + // sanity check + err := storage.Walk(walker1) + if err != nil { + t.Fatalf("received impossible error %v", err) + } + if walked != 0 { + t.Fatal("walked empty storage") + } + if acceptedOfferId != "" { + t.Fatal("somehow found an offer when registry was empty") + } + impl, ok := storage.(*offerStorage) + if !ok { + t.Fatal("unexpected offer storage impl") + } + // single offer + ttl := 2 * time.Second + now := time.Now() + o := &liveOffer{nil, now.Add(ttl), 0} + + impl.offers.Add("x", o) + err = storage.Walk(walker1) + if err != nil { + t.Fatalf("received impossible error %v", err) + } + if walked != 1 { + t.Fatalf("walk count %d", walked) + } + if acceptedOfferId != "foo" { + t.Fatalf("found offer %v", acceptedOfferId) + } + + acceptedOfferId = "" + err = storage.Walk(walker1) + if err != nil { + t.Fatalf("received impossible error %v", err) + } + if walked != 2 { + t.Fatalf("walk count %d", walked) + } + if acceptedOfferId != "" { + t.Fatalf("found offer %v", acceptedOfferId) + } + + impl.offers.Add("y", o) // offer already Acquire()d + err = storage.Walk(walker1) + if err != nil { + t.Fatalf("received impossible error %v", err) + } + if walked != 4 { + t.Fatalf("walk count %d", walked) + } + if acceptedOfferId != "" { + t.Fatalf("found offer %v", acceptedOfferId) + } + + walker2 := func(p PerishableOffer) (bool, error) { + walked++ + return true, nil + } + err = storage.Walk(walker2) + if err != nil { + t.Fatalf("received impossible error %v", err) + } + if walked != 5 { + t.Fatalf("walk count %d", walked) + } + if acceptedOfferId != "" { + t.Fatalf("found offer %v", acceptedOfferId) + } + + walker3 := func(p PerishableOffer) (bool, error) { + walked++ + return true, errors.New("baz") + } + err = storage.Walk(walker3) + if err == nil { + t.Fatal("expected error") + } + if walked != 6 { + t.Fatalf("walk count %d", walked) + } +} diff --git a/scheduler/pod_task.go b/scheduler/pod_task.go new file mode 100644 index 00000000..1b299ced --- /dev/null +++ b/scheduler/pod_task.go @@ -0,0 +1,180 @@ +package scheduler + +import ( + "fmt" + + "code.google.com/p/go-uuid/uuid" + "code.google.com/p/goprotobuf/proto" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/mesos" + "gopkg.in/v1/yaml" +) + +const ( + containerCpus = 0.25 // initial CPU allocated for executor + containerMem = 64 // initial MB of memory allocated for executor +) + +// A struct that describes a pod task. +type PodTask struct { + ID string + Pod *api.Pod + TaskInfo *mesos.TaskInfo + Launched bool + Offer PerishableOffer +} + +func rangeResource(name string, ports []uint64) *mesos.Resource { + if len(ports) == 0 { + // pod may consist of a container that doesn't expose any ports on the host + return nil + } + return &mesos.Resource{ + Name: proto.String(name), + Type: mesos.Value_RANGES.Enum(), + Ranges: NewRanges(ports), + } +} + +// func NewRange(begin uint64, end uint64) *mesos.Value_Ranges { +func NewRanges(ports []uint64) *mesos.Value_Ranges { + r := make([]*mesos.Value_Range, 0) + for _, port := range ports { + r = append(r, &mesos.Value_Range{Begin: &port, End: &port}) + } + return &mesos.Value_Ranges{Range: r} +} + +func (t *PodTask) hasAcceptedOffer() bool { + return t.TaskInfo.TaskId != nil +} + +func (t *PodTask) GetOfferId() string { + if t.Offer == nil { + return "" + } + return t.Offer.Details().Id.GetValue() +} + +// Fill the TaskInfo in the PodTask, should be called during k8s scheduling, +// before binding. +func (t *PodTask) FillTaskInfo(offer PerishableOffer) error { + if offer == nil || offer.Details() == nil { + return fmt.Errorf("Nil offer for task %v", t) + } + details := offer.Details() + if details == nil { + return fmt.Errorf("Illegal offer for task %v: %v", t, offer) + } + if t.Offer != nil && t.Offer != offer { + return fmt.Errorf("Offer assignment must be idempotent with task %v: %v", t, offer) + } + t.Offer = offer + log.V(3).Infof("Recording offer(s) %v against pod %v", details.Id, t.Pod.ID) + + t.TaskInfo.TaskId = &mesos.TaskID{Value: proto.String(t.ID)} + t.TaskInfo.SlaveId = details.GetSlaveId() + t.TaskInfo.Resources = []*mesos.Resource{ + mesos.ScalarResource("cpus", containerCpus), + mesos.ScalarResource("mem", containerMem), + } + if ports := rangeResource("ports", t.Ports()); ports != nil { + t.TaskInfo.Resources = append(t.TaskInfo.Resources, ports) + } + var err error + if t.TaskInfo.Data, err = yaml.Marshal(&t.Pod.DesiredState.Manifest); err != nil { + return err + } + return nil +} + +// Clear offer-related details from the task, should be called if/when an offer +// has already been assigned to a task but for some reason is no longer valid. +func (t *PodTask) ClearTaskInfo() { + log.V(3).Infof("Clearing offer(s) from pod %v", t.Pod.ID) + t.Offer = nil + t.TaskInfo.TaskId = nil + t.TaskInfo.SlaveId = nil + t.TaskInfo.Resources = nil + t.TaskInfo.Data = nil +} + +func (t *PodTask) Ports() []uint64 { + ports := make([]uint64, 0) + for _, container := range t.Pod.DesiredState.Manifest.Containers { + // strip all port==0 from this array; k8s already knows what to do with zero- + // ports (it does not create 'port bindings' on the minion-host); we need to + // remove the wildcards from this array since they don't consume host resources + for _, port := range container.Ports { + // HostPort is int, not uint64. + if port.HostPort != 0 { + ports = append(ports, uint64(port.HostPort)) + } + } + } + + return ports +} + +func (t *PodTask) AcceptOffer(offer *mesos.Offer) bool { + var cpus float64 = 0 + var mem float64 = 0 + + // Mimic set type + requiredPorts := make(map[uint64]struct{}) + for _, port := range t.Ports() { + requiredPorts[port] = struct{}{} + } + + for _, resource := range offer.Resources { + if resource.GetName() == "cpus" { + cpus = *resource.GetScalar().Value + } + + if resource.GetName() == "mem" { + mem = *resource.GetScalar().Value + } + + if resource.GetName() == "ports" { + for _, r := range (*resource).GetRanges().Range { + bp := r.GetBegin() + ep := r.GetEnd() + + for port, _ := range requiredPorts { + log.V(2).Infof("Evaluating port range {%d:%d} %d", bp, ep, port) + + if (bp <= port) && (port <= ep) { + delete(requiredPorts, port) + } + } + } + } + } + + unsatisfiedPorts := len(requiredPorts) + if unsatisfiedPorts > 0 { + log.V(2).Infof("Could not schedule pod %s: %d ports could not be allocated", t.Pod.ID, unsatisfiedPorts) + return false + } + + if (cpus < containerCpus) || (mem < containerMem) { + log.V(2).Infof("Not enough resources: cpus: %f mem: %f", cpus, mem) + return false + } + + return true +} + +func newPodTask(pod *api.Pod, executor *mesos.ExecutorInfo) (*PodTask, error) { + taskId := uuid.NewUUID().String() + task := &PodTask{ + ID: taskId, // pod.JSONBase.ID, + Pod: pod, + TaskInfo: new(mesos.TaskInfo), + Launched: false, + } + task.TaskInfo.Name = proto.String("PodTask") + task.TaskInfo.Executor = executor + return task, nil +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1336f688..d9a94cf6 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -3,10 +3,11 @@ package scheduler import ( "container/ring" "encoding/json" + "errors" "fmt" "sync" + "time" - "code.google.com/p/go-uuid/uuid" "code.google.com/p/goprotobuf/proto" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -24,9 +25,23 @@ import ( ) const ( - containerCpus = 0.25 - containerMem = 64 - defaultFinishedTasksSize = 1024 + defaultFinishedTasksSize = 1024 // size of the finished task history buffer + defaultOfferTTL = 5 // seconds that an offer is viable, prior to being expired + defaultOfferLingerTTL = 120 // seconds that an expired offer lingers in history + defaultListenerDelay = 1 // number of seconds between offer listener notifications +) + +type stateType int + +const ( + statePending stateType = iota + stateRunning + stateFinished + stateUnknown +) + +var ( + noSuitableOffersErr = errors.New("No suitable offers for pod/task") ) // PodScheduleFunc implements how to schedule pods among slaves. @@ -39,168 +54,19 @@ const ( // details. // // See the FIFOScheduleFunc for example. -type PodScheduleFunc func(k *KubernetesScheduler, slaves map[string]*Slave, task *PodTask) (string, error) - -// A struct that describes a pod task. -type PodTask struct { - ID string - Pod *api.Pod - TaskInfo *mesos.TaskInfo - OfferIds []string - Launched bool -} - -func rangeResource(name string, ports []uint64) *mesos.Resource { - if len(ports) == 0 { - // pod may consist of a container that doesn't expose any ports on the host - return nil - } - return &mesos.Resource{ - Name: proto.String(name), - Type: mesos.Value_RANGES.Enum(), - Ranges: NewRanges(ports), - } -} - -// func NewRange(begin uint64, end uint64) *mesos.Value_Ranges { -func NewRanges(ports []uint64) *mesos.Value_Ranges { - r := make([]*mesos.Value_Range, 0) - for _, port := range ports { - r = append(r, &mesos.Value_Range{Begin: &port, End: &port}) - } - return &mesos.Value_Ranges{Range: r} -} - -func (t *PodTask) hasAcceptedOffer() bool { - return t.TaskInfo.TaskId != nil -} - -// Fill the TaskInfo in the PodTask, should be called during k8s scheduling, -// before binding. -func (t *PodTask) FillTaskInfo(slaveId string, offer *mesos.Offer) { - t.OfferIds = append(t.OfferIds, offer.GetId().GetValue()) - - var err error - - // TODO(nnielsen): We only launch one pod per offer. We should be able to launch multiple. - - // TODO(nnielsen): Assign/aquire pod port. - t.TaskInfo.TaskId = &mesos.TaskID{Value: proto.String(t.ID)} - t.TaskInfo.SlaveId = &mesos.SlaveID{Value: proto.String(slaveId)} - t.TaskInfo.Resources = []*mesos.Resource{ - mesos.ScalarResource("cpus", containerCpus), - mesos.ScalarResource("mem", containerMem), - } - if ports := rangeResource("ports", t.Ports()); ports != nil { - t.TaskInfo.Resources = append(t.TaskInfo.Resources, ports) - } - t.TaskInfo.Data, err = yaml.Marshal(&t.Pod.DesiredState.Manifest) - if err != nil { - log.Warningf("Failed to marshal the manifest") - } -} - -// Clear offer-related details from the task, should be called if/when an offer -// has already been assigned to a task but for some reason is no longer valid. -func (t *PodTask) ClearTaskInfo() { - t.OfferIds = nil - t.TaskInfo.TaskId = nil - t.TaskInfo.SlaveId = nil - t.TaskInfo.Resources = nil - t.TaskInfo.Data = nil -} - -func (t *PodTask) Ports() []uint64 { - ports := make([]uint64, 0) - for _, container := range t.Pod.DesiredState.Manifest.Containers { - // strip all port==0 from this array; k8s already knows what to do with zero- - // ports (it does not create 'port bindings' on the minion-host); we need to - // remove the wildcards from this array since they don't consume host resources - for _, port := range container.Ports { - // HostPort is int, not uint64. - if port.HostPort != 0 { - ports = append(ports, uint64(port.HostPort)) - } - } - } - - return ports -} - -func (t *PodTask) AcceptOffer(slaveId string, offer *mesos.Offer) bool { - var cpus float64 = 0 - var mem float64 = 0 - - // Mimic set type - requiredPorts := make(map[uint64]struct{}) - for _, port := range t.Ports() { - requiredPorts[port] = struct{}{} - } - - for _, resource := range offer.Resources { - if resource.GetName() == "cpus" { - cpus = *resource.GetScalar().Value - } - - if resource.GetName() == "mem" { - mem = *resource.GetScalar().Value - } - - if resource.GetName() == "ports" { - for _, r := range (*resource).GetRanges().Range { - bp := r.GetBegin() - ep := r.GetEnd() - - for port, _ := range requiredPorts { - log.V(2).Infof("Evaluating port range {%d:%d} %d", bp, ep, port) - - if (bp <= port) && (port <= ep) { - delete(requiredPorts, port) - } - } - } - } - } - - unsatisfiedPorts := len(requiredPorts) - if unsatisfiedPorts > 0 { - log.V(2).Infof("Could not schedule pod %s: %d ports could not be allocated on slave %s", t.Pod.ID, unsatisfiedPorts, slaveId) - return false - } - - if (cpus < containerCpus) || (mem < containerMem) { - log.V(2).Infof("Not enough resources: cpus: %f mem: %f", cpus, mem) - return false - } - - return true -} - -func newPodTask(pod *api.Pod, executor *mesos.ExecutorInfo) (*PodTask, error) { - taskId := uuid.NewUUID().String() - task := &PodTask{ - ID: taskId, // pod.JSONBase.ID, - Pod: pod, - TaskInfo: new(mesos.TaskInfo), - Launched: false, - } - task.TaskInfo.Name = proto.String("PodTask") - task.TaskInfo.Executor = executor - return task, nil -} +type PodScheduleFunc func(r OfferRegistry, slaves map[string]*Slave, task *PodTask) (PerishableOffer, error) // A struct that describes the slave. +type empty struct{} type Slave struct { HostName string - Offers map[string]*mesos.Offer - tasks map[string]*PodTask + Offers map[string]empty } func newSlave(hostName string) *Slave { return &Slave{ HostName: hostName, - Offers: make(map[string]*mesos.Offer), - tasks: make(map[string]*PodTask), + Offers: make(map[string]empty), } } @@ -224,8 +90,7 @@ type KubernetesScheduler struct { masterInfo *mesos.MasterInfo registered bool - // OfferID => offer. - offers map[string]*mesos.Offer + offers OfferRegistry // SlaveID => slave. slaves map[string]*Slave @@ -251,7 +116,8 @@ type KubernetesScheduler struct { // New create a new KubernetesScheduler func New(executor *mesos.ExecutorInfo, scheduleFunc PodScheduleFunc, client *client.Client, helper tools.EtcdHelper, sr service.Registry) *KubernetesScheduler { - return &KubernetesScheduler{ + var k *KubernetesScheduler + k = &KubernetesScheduler{ new(sync.RWMutex), helper, executor, @@ -259,7 +125,15 @@ func New(executor *mesos.ExecutorInfo, scheduleFunc PodScheduleFunc, client *cli nil, nil, false, - make(map[string]*mesos.Offer), + CreateOfferRegistry(OfferRegistryConfig{ + declineOffer: func(id string) error { + offerId := &mesos.OfferID{Value: proto.String(id)} + return k.Driver.DeclineOffer(offerId, nil) + }, + ttl: defaultOfferTTL * time.Second, + lingerTtl: defaultOfferLingerTTL * time.Second, // remember expired offers so that we can tell if a previously scheduler offer relies on one + listenerDelay: defaultListenerDelay * time.Second, + }), make(map[string]*Slave), make(map[string]string), make(map[string]*PodTask), @@ -271,6 +145,25 @@ func New(executor *mesos.ExecutorInfo, scheduleFunc PodScheduleFunc, client *cli cache.NewFIFO(), sr, } + return k +} + +// assume that the caller has already locked around access to task state +func (k *KubernetesScheduler) getTask(taskId string) (*PodTask, stateType) { + if task, found := k.runningTasks[taskId]; found { + return task, stateRunning + } + if task, found := k.pendingTasks[taskId]; found { + return task, statePending + } + if containsTask(k.finishedTasks, taskId) { + return nil, stateFinished + } + return nil, stateUnknown +} + +func (k *KubernetesScheduler) Init() { + k.offers.Init() } // Registered is called when the scheduler registered with the master successfully. @@ -298,13 +191,7 @@ func (k *KubernetesScheduler) Disconnected(driver mesos.SchedulerDriver) { defer k.Unlock() // discard all cached offers to avoid unnecessary TASK_LOST updates - for offerId := range k.offers { - k.deleteOffer(offerId) - } - - // TODO(jdef): it's possible that a task is pending, in between Schedule() and - // Bind(), such that it's offer is now invalid. We should check for that and - // clearing the offer from the task (along with a related check in Bind()) + k.offers.Invalidate("") } // ResourceOffers is called when the scheduler receives some offers from the master. @@ -316,8 +203,7 @@ func (k *KubernetesScheduler) ResourceOffers(driver mesos.SchedulerDriver, offer defer k.Unlock() // Record the offers in the global offer map as well as each slave's offer map. - // TODO(jdef): we probably don't want to hold onto these offers forever because that's greedy - + k.offers.Add(offers) for _, offer := range offers { offerId := offer.GetId().GetValue() slaveId := offer.GetSlaveId().GetValue() @@ -327,21 +213,24 @@ func (k *KubernetesScheduler) ResourceOffers(driver mesos.SchedulerDriver, offer k.slaves[slaveId] = newSlave(offer.GetHostname()) slave = k.slaves[slaveId] } - slave.Offers[offerId] = offer - k.offers[offerId] = offer + slave.Offers[offerId] = empty{} k.slaveIDs[slave.HostName] = slaveId } } // requires the caller to have locked the offers and slaves state func (k *KubernetesScheduler) deleteOffer(oid string) { - slaveId := k.offers[oid].GetSlaveId().GetValue() - delete(k.offers, oid) + if offer, ok := k.offers.Get(oid); ok { + k.offers.Delete(oid) + if details := offer.Details(); details != nil { + slaveId := details.GetSlaveId().GetValue() - if slave, found := k.slaves[slaveId]; !found { - log.Warningf("No slave for id %s associated with offer id %s", slaveId, oid) - } else { - delete(slave.Offers, oid) + if slave, found := k.slaves[slaveId]; !found { + log.Infof("No slave for id %s associated with offer id %s", slaveId, oid) + } else { + delete(slave.Offers, oid) + } + } // else, offer already expired / lingering } } @@ -390,27 +279,26 @@ func (k *KubernetesScheduler) handleTaskStarting(taskStatus *mesos.TaskStatus) { func (k *KubernetesScheduler) handleTaskRunning(taskStatus *mesos.TaskStatus) { taskId, slaveId := taskStatus.GetTaskId().GetValue(), taskStatus.GetSlaveId().GetValue() - slave, exists := k.slaves[slaveId] - if !exists { + if _, exists := k.slaves[slaveId]; !exists { log.Warningf("Ignore status TASK_RUNNING because the slave does not exist\n") return } - task, exists := k.pendingTasks[taskId] - if !exists { - log.Warningf("Ignore status TASK_RUNNING (%s) because the the task is discarded: '%v'", taskId, k.pendingTasks) - return - } - if _, exists = k.runningTasks[taskId]; exists { + switch task, state := k.getTask(taskId); state { + case statePending: + log.Infof("Received running status for pending task: '%v'", taskStatus) + k.fillRunningPodInfo(task, taskStatus) + k.runningTasks[taskId] = task + delete(k.pendingTasks, taskId) + case stateRunning: log.Warningf("Ignore status TASK_RUNNING because the the task is already running") - return - } - if containsTask(k.finishedTasks, taskId) { + case stateFinished: log.Warningf("Ignore status TASK_RUNNING because the the task is already finished") - return + default: + log.Warningf("Ignore status TASK_RUNNING (%s) because the the task is discarded", taskId) } +} - log.Infof("Received running status: '%v'", taskStatus) - +func (k *KubernetesScheduler) fillRunningPodInfo(task *PodTask, taskStatus *mesos.TaskStatus) { task.Pod.CurrentState.Status = task.Pod.DesiredState.Status task.Pod.CurrentState.Manifest = task.Pod.DesiredState.Manifest task.Pod.CurrentState.Host = task.Pod.DesiredState.Host @@ -433,121 +321,77 @@ func (k *KubernetesScheduler) handleTaskRunning(taskStatus *mesos.TaskStatus) { } else { log.Warningf("Couldn't find network container for %s in %v", task.Pod.ID, target) } + } else { + log.Errorf("Invalid TaskStatus.Data for task '%v': %v", task.ID, err) } } else { - log.Warningf("Missing Data for task '%v'", taskId) + log.Errorf("Missing TaskStatus.Data for task '%v'", task.ID) } - - k.runningTasks[taskId] = task - slave.tasks[taskId] = task - delete(k.pendingTasks, taskId) } func (k *KubernetesScheduler) handleTaskFinished(taskStatus *mesos.TaskStatus) { taskId, slaveId := taskStatus.GetTaskId().GetValue(), taskStatus.GetSlaveId().GetValue() - slave, exists := k.slaves[slaveId] - if !exists { + if _, exists := k.slaves[slaveId]; !exists { log.Warningf("Ignore status TASK_FINISHED because the slave does not exist\n") return } - if _, exists := k.pendingTasks[taskId]; exists { + switch task, state := k.getTask(taskId); state { + case statePending: panic("Pending task finished, this couldn't happen") - } - if _, exists := k.runningTasks[taskId]; exists { - log.Warningf("Ignore status TASK_FINISHED because the the task is not running") - return - } - if containsTask(k.finishedTasks, taskId) { + case stateRunning: + log.V(2).Infof( + "Received finished status for running task: '%v', running/pod task queue length = %d/%d", + taskStatus, len(k.runningTasks), len(k.podToTask)) + delete(k.podToTask, task.Pod.ID) + k.finishedTasks.Next().Value = taskId + delete(k.runningTasks, taskId) + case stateFinished: log.Warningf("Ignore status TASK_FINISHED because the the task is already finished") - return - } - - task := k.runningTasks[taskId] - _, exists = k.podToTask[task.ID] - if exists { - delete(k.podToTask, task.ID) + default: + log.Warningf("Ignore status TASK_FINISHED because the the task is not running") } - - k.finishedTasks.Next().Value = taskId - delete(k.runningTasks, taskId) - delete(slave.tasks, taskId) } func (k *KubernetesScheduler) handleTaskFailed(taskStatus *mesos.TaskStatus) { log.Errorf("Task failed: '%v'", taskStatus) - taskId := taskStatus.GetTaskId().GetValue() - podId := "" - if _, exists := k.pendingTasks[taskId]; exists { - task := k.pendingTasks[taskId] - podId = task.Pod.ID + switch task, state := k.getTask(taskId); state { + case statePending: delete(k.pendingTasks, taskId) - } - if _, exists := k.runningTasks[taskId]; exists { - task := k.runningTasks[taskId] - podId = task.Pod.ID + delete(k.podToTask, task.Pod.ID) + case stateRunning: delete(k.runningTasks, taskId) + delete(k.podToTask, task.Pod.ID) } - - if podId != "" { - _, exists := k.podToTask[podId] - if exists { - delete(k.podToTask, podId) - } - } - // TODO (jdefelice) delete from slave.tasks? } func (k *KubernetesScheduler) handleTaskKilled(taskStatus *mesos.TaskStatus) { log.Errorf("Task killed: '%v'", taskStatus) taskId := taskStatus.GetTaskId().GetValue() - podId := "" - if _, exists := k.pendingTasks[taskId]; exists { - task := k.pendingTasks[taskId] - podId = task.Pod.ID + switch task, state := k.getTask(taskId); state { + case statePending: delete(k.pendingTasks, taskId) - } - if _, exists := k.runningTasks[taskId]; exists { - task := k.runningTasks[taskId] - podId = task.Pod.ID + delete(k.podToTask, task.Pod.ID) + case stateRunning: delete(k.runningTasks, taskId) + delete(k.podToTask, task.Pod.ID) } - - if podId != "" { - log.V(2).Infof("Deleting pod-task mapping: %s", podId) - _, exists := k.podToTask[podId] - if exists { - delete(k.podToTask, podId) - } - } - // TODO (jdefelice) delete from slave.tasks? } func (k *KubernetesScheduler) handleTaskLost(taskStatus *mesos.TaskStatus) { log.Errorf("Task lost: '%v'", taskStatus) taskId := taskStatus.GetTaskId().GetValue() - podId := "" - if _, exists := k.pendingTasks[taskId]; exists { - task := k.pendingTasks[taskId] - podId = task.Pod.ID + switch task, state := k.getTask(taskId); state { + case statePending: delete(k.pendingTasks, taskId) - } - if _, exists := k.runningTasks[taskId]; exists { - task := k.runningTasks[taskId] - podId = task.Pod.ID + delete(k.podToTask, task.Pod.ID) + case stateRunning: delete(k.runningTasks, taskId) + delete(k.podToTask, task.Pod.ID) } - - if podId != "" { - _, exists := k.podToTask[podId] - if exists { - delete(k.podToTask, podId) - } - } - // TODO (jdefelice) delete from slave.tasks? } // FrameworkMessage is called when the scheduler receives a message from the executor. @@ -559,7 +403,21 @@ func (k *KubernetesScheduler) FrameworkMessage(driver mesos.SchedulerDriver, // SlaveLost is called when some slave is lost. func (k *KubernetesScheduler) SlaveLost(driver mesos.SchedulerDriver, slaveId *mesos.SlaveID) { log.Infof("Slave %v is lost\n", slaveId) - // TODO(yifan): Restart any unfinished tasks on that slave. + + k.Lock() + defer k.Unlock() + + // invalidate all offers mapped to that slave + if slave, ok := k.slaves[slaveId.GetValue()]; ok { + for offerId := range slave.Offers { + k.offers.Invalidate(offerId) + } + } + + // TODO(jdef): delete slave from our internal list? + + // unfinished tasks/pods will be dropped. use a replication controller if you want pods to + // be restarted when slaves die. } // ExecutorLost is called when some executor is lost. @@ -595,43 +453,33 @@ func (k *KubernetesScheduler) Schedule(pod api.Pod, unused algorithm.MinionListe // Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on func (k *KubernetesScheduler) doSchedule(task *PodTask) (string, error) { - offerId, err := k.scheduleFunc(k, k.slaves, task) + offer, err := k.scheduleFunc(k.offers, k.slaves, task) if err != nil { return "", err } - offer, ok := k.offers[offerId] - if !ok { - task.ClearTaskInfo() - return "", fmt.Errorf("Offer disappeared (%v) while scheduling task %v", offerId, task.ID) - } - slaveId := offer.GetSlaveId().GetValue() - slave, ok := k.slaves[slaveId] - if !ok { - //TODO(jdef): decline offer? + slaveId := offer.Details().GetSlaveId().GetValue() + if slave, ok := k.slaves[slaveId]; !ok { + // not much sense in Release()ing the offer here since its owner died + offer.Release() + k.offers.Invalidate(offer.Details().Id.GetValue()) task.ClearTaskInfo() return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) + } else { + task.FillTaskInfo(offer) + return slave.HostName, nil } - task.FillTaskInfo(slaveId, offer) - - // Subtract offers. - for _, offerId := range task.OfferIds { - k.deleteOffer(offerId) - } - return slave.HostName, nil } // implementation of scheduling plugin's NextPod func; see plugin/pkg/scheduler func (k *KubernetesScheduler) yield() *api.Pod { pod := k.podQueue.Pop().(*api.Pod) - // TODO: Remove or reduce verbosity by sep 6th, 2014. Leave until then to - // make it easy to find scheduling problems. - log.Infof("About to try and schedule pod %v\n", pod.ID) + log.V(2).Infof("About to try and schedule pod %v\n", pod.ID) return pod } // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler func (k *KubernetesScheduler) handleSchedulingError(backoff *podBackoff, pod *api.Pod, err error) { - log.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) + log.Infof("Error scheduling %v: %v; retrying", pod.ID, err) backoff.gc() // Retry asynchronously. @@ -639,13 +487,29 @@ func (k *KubernetesScheduler) handleSchedulingError(backoff *podBackoff, pod *ap go func() { defer util.HandleCrash() podId := pod.ID - backoff.wait(podId) + // did we error out because if non-matching offers? if so, register an offer + // listener to be notified if/when a matching offer comes in. + var offersAvailable <-chan empty + if err == noSuitableOffersErr { + offersAvailable = k.offers.Listen(podId, func(offer *mesos.Offer) bool { + k.RLock() + defer k.RUnlock() + if taskId, ok := k.podToTask[podId]; ok { + switch task, state := k.getTask(taskId); state { + case statePending: + return task.AcceptOffer(offer) + } + } + return false + }) + } + backoff.wait(podId, offersAvailable) // Get the pod again; it may have changed/been scheduled already. pod = &api.Pod{} err := k.client.Get().Path("pods").Path(podId).Do().Into(pod) if err != nil { - log.Errorf("Error getting pod %v for retry: %v; abandoning", podId, err) + log.Infof("Failed to get pod %v for retry: %v; abandoning", podId, err) return } if pod.DesiredState.Host == "" { @@ -662,7 +526,7 @@ func (k *KubernetesScheduler) handleSchedulingError(backoff *podBackoff, pod *ap k.podQueue.Add(pod.ID, pod) } else { // this state shouldn't really be possible, so I'm warning if we ever see it - log.Warningf("Scheduler detected pod no longer pending: %v, will not re-queue", podId) + log.Errorf("Scheduler detected pod no longer pending: %v, will not re-queue; possible offer leak", podId) } } else { log.Infof("Scheduler detected deleted pod: %v, will not re-queue", podId) @@ -678,7 +542,7 @@ func (k *KubernetesScheduler) ListPods(selector labels.Selector) (*api.PodList, k.RLock() defer k.RUnlock() - var result []api.Pod + result := []api.Pod{} for _, task := range k.runningTasks { pod := task.Pod @@ -687,8 +551,6 @@ func (k *KubernetesScheduler) ListPods(selector labels.Selector) (*api.PodList, result = append(result, *pod) } } - - // TODO(nnielsen): Refactor tasks append for the three lists. for _, task := range k.pendingTasks { pod := task.Pod @@ -698,15 +560,19 @@ func (k *KubernetesScheduler) ListPods(selector labels.Selector) (*api.PodList, } } - // TODO(nnielsen): Wire up check in finished tasks + // TODO(nnielsen): Wire up check in finished tasks. (jdef) not sure how many + // finished tasks are really appropriate to return here. finished tasks do not + // have a TTL in the finishedTasks ring and I don't think we want to return + // hundreds of finished tasks here. matches := &api.PodList{Items: result} - log.V(2).Infof("Returning pods: '%v'\n", matches) + log.V(5).Infof("Returning pods: '%v'\n", matches) return matches, nil } -// Get a specific pod. +// Get a specific pod. It's *very* important to return a clone of the Pod that +// we've saved because our caller will likely modify it. func (k *KubernetesScheduler) GetPod(podId string) (*api.Pod, error) { log.V(2).Infof("Get pod '%s'\n", podId) @@ -718,19 +584,22 @@ func (k *KubernetesScheduler) GetPod(podId string) (*api.Pod, error) { return nil, fmt.Errorf("Could not resolve pod '%s' to task id", podId) } - if task, exists := k.pendingTasks[taskId]; exists { - // return nil, fmt.Errorf("Pod '%s' is still pending", podId) - log.V(2).Infof("Pending Pod '%s': %v", podId, task.Pod) - return task.Pod, nil - } - if containsTask(k.finishedTasks, taskId) { + switch task, state := k.getTask(taskId); state { + case statePending: + log.V(5).Infof("Pending Pod '%s': %v", podId, task.Pod) + podCopy := *task.Pod + return &podCopy, nil + case stateRunning: + log.V(5).Infof("Running Pod '%s': %v", podId, task.Pod) + podCopy := *task.Pod + return &podCopy, nil + case stateFinished: return nil, fmt.Errorf("Pod '%s' is finished", podId) + case stateUnknown: + return nil, fmt.Errorf("Unknown Pod %v", podId) + default: + return nil, fmt.Errorf("Unexpected task state %v for task %v", state, taskId) } - if task, exists := k.runningTasks[taskId]; exists { - log.V(2).Infof("Running Pod '%s': %v", podId, task.Pod) - return task.Pod, nil - } - return nil, fmt.Errorf("Unknown Pod %v", podId) } // Create a pod based on a specification; DOES NOT schedule it onto a specific machine, @@ -781,13 +650,45 @@ func (k *KubernetesScheduler) Bind(binding *api.Binding) error { return fmt.Errorf("Pod Task does not exist %v\n", taskId) } - // TODO(jdef): ensure that the task hasAcceptedOffer(), it's possible that between - // Schedule() and now that the offer for this task was rescinded or invalidated + // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between + // Schedule() and now that the offer for this task was rescinded or invalidated. + // ((we should never see this here)) + if !task.hasAcceptedOffer() { + return fmt.Errorf("task has not accepted a valid offer, pod %v", podId) + } + + // By this time, there is a chance that the slave is disconnected. + offerId := task.GetOfferId() + if offer, ok := k.offers.Get(offerId); !ok || offer.HasExpired() { + // already rescinded or timed out or otherwise invalidated + task.Offer.Release() + task.ClearTaskInfo() + return fmt.Errorf("failed prior to launchTask due to expired offer, pod %v", podId) + } + var err error + if err = k.prepareTaskForLaunch(binding.Host, task); err == nil { + log.V(2).Infof("Launching task : %v", task) + taskList := []*mesos.TaskInfo{task.TaskInfo} + if err = k.Driver.LaunchTasks(task.Offer.Details().Id, taskList, nil); err == nil { + // we *intentionally* do not record our binding to etcd since we're not using bindings + // to manage pod lifecycle + task.Pod.DesiredState.Host = binding.Host + task.Launched = true + k.offers.Invalidate(offerId) + return nil + } + } + task.Offer.Release() + task.ClearTaskInfo() + return fmt.Errorf("Failed to launch task for pod %s: %v", podId, err) +} + +func (k *KubernetesScheduler) prepareTaskForLaunch(machine string, task *PodTask) error { // TODO(k8s): move this to a watch/rectification loop. - manifest, err := k.makeManifest(binding.Host, *task.Pod) + manifest, err := k.makeManifest(machine, *task.Pod) if err != nil { - log.Warningf("Failed to generate an updated manifest") + log.V(2).Infof("Failed to generate an updated manifest") return err } @@ -798,23 +699,9 @@ func (k *KubernetesScheduler) Bind(binding *api.Binding) error { task.Pod.DesiredState.Manifest = manifest task.TaskInfo.Data, err = yaml.Marshal(&manifest) if err != nil { - log.Warningf("Failed to marshal the updated manifest") + log.V(2).Infof("Failed to marshal the updated manifest") return err } - - // TODO(yifan): By this time, there is a chance that the slave is disconnected. - log.V(2).Infof("Launching task : %v", task) - offerId := &mesos.OfferID{Value: proto.String(task.OfferIds[0])} - if err := k.Driver.LaunchTasks(offerId, []*mesos.TaskInfo{task.TaskInfo}, nil); err != nil { - task.ClearTaskInfo() - // TODO(jdef): decline the offer too? - return fmt.Errorf("Failed to launch task for pod %s: %v", podId, err) - } - task.Pod.DesiredState.Host = binding.Host - task.Launched = true - - // we *intentionally* do not record our binding to etcd since we're not using bindings - // to manage pod lifecycle return nil } @@ -847,15 +734,10 @@ func (k *KubernetesScheduler) DeletePod(podId string) error { k.Lock() defer k.Unlock() - // TODO(jdef): set pod.DesiredState.Host="" - // The k8s DeletePod() implementation does this, and the k8s REST implementation - // uses this state to determine what level of pod detail should be returned to the - // end user. Need to think more about the impact of setting this to "" before doing - // the same. - // prevent the scheduler from attempting to pop this; it's also possible that // it's concurrently being scheduled (somewhere between pod scheduling and - // binding) + // binding) - if so, then we'll end up removing it from pendingTasks which + // will abort Bind()ing k.podQueue.Delete(podId) taskId, exists := k.podToTask[podId] @@ -865,23 +747,30 @@ func (k *KubernetesScheduler) DeletePod(podId string) error { // determine if the task has already been launched to mesos, if not then // cleanup is easier (podToTask,pendingTasks) since there's no state to sync + var killTaskId *mesos.TaskID + task, state := k.getTask(taskId) - if task, exists := k.runningTasks[taskId]; exists { - taskId := &mesos.TaskID{Value: proto.String(task.ID)} - return k.Driver.KillTask(taskId) - } - - if task, exists := k.pendingTasks[taskId]; exists { + switch state { + case stateRunning: + killTaskId = &mesos.TaskID{Value: proto.String(task.ID)} + case statePending: if !task.Launched { + // we've been invoked in between Schedule() and Bind() + if task.hasAcceptedOffer() { + task.Offer.Release() + task.ClearTaskInfo() + } delete(k.podToTask, podId) delete(k.pendingTasks, taskId) return nil } - taskId := &mesos.TaskID{Value: proto.String(task.ID)} - return k.Driver.KillTask(taskId) + killTaskId = &mesos.TaskID{Value: proto.String(task.ID)} + default: + return fmt.Errorf("Cannot kill pod '%s': pod not found", podId) } - - return fmt.Errorf("Cannot kill pod '%s': pod not found", podId) + // signal to watchers that the related pod is going down + task.Pod.DesiredState.Host = "" + return k.Driver.KillTask(killTaskId) } func (k *KubernetesScheduler) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { @@ -889,31 +778,45 @@ func (k *KubernetesScheduler) WatchPods(resourceVersion uint64, filter func(*api } // A FCFS scheduler. -func FCFSScheduleFunc(k *KubernetesScheduler, slaves map[string]*Slave, task *PodTask) (string, error) { +func FCFSScheduleFunc(r OfferRegistry, slaves map[string]*Slave, task *PodTask) (PerishableOffer, error) { if task.hasAcceptedOffer() { // verify that the offer is still on the table - offerId := task.OfferIds[0] - if _, ok := k.offers[offerId]; ok { + offerId := task.GetOfferId() + if offer, ok := r.Get(offerId); ok && !offer.HasExpired() { // skip tasks that have already have assigned offers - return offerId, nil + return task.Offer, nil } - // TODO(jdef): decline offer? + task.Offer.Release() task.ClearTaskInfo() } - for slaveId, slave := range slaves { - for _, offer := range slave.Offers { - if !task.AcceptOffer(slaveId, offer) { - log.V(2).Infof("Declining offer %v", offer) - if err := k.Driver.DeclineOffer(offer.Id, nil); err != nil { - log.Warningf("Failed to decline offer %v: %v", offer.Id, err) - } - k.deleteOffer(offer.Id.GetValue()) - continue + + var acceptedOffer PerishableOffer + err := r.Walk(func(p PerishableOffer) (bool, error) { + offer := p.Details() + if offer == nil { + return false, fmt.Errorf("nil offer while scheduling task %v", task.ID) + } + if task.AcceptOffer(offer) { + if p.Acquire() { + acceptedOffer = p + log.V(3).Infof("Pod %v accepted offer %v", task.Pod.ID, offer.Id.GetValue()) + return true, nil // stop, we found an offer } - return offer.Id.GetValue(), nil } + return false, nil // continue + }) + if acceptedOffer != nil { + if err != nil { + log.Warningf("problems walking the offer registry: %v, attempting to continue", err) + } + return acceptedOffer, nil + } + if err != nil { + log.V(2).Infof("failed to find a fit for pod: %v, err = %v", task.Pod.ID, err) + return nil, err } - return "", fmt.Errorf("failed to find a fit for pod: %v", task.Pod.ID) + log.V(2).Infof("failed to find a fit for pod: %v", task.Pod.ID) + return nil, noSuitableOffersErr } func containsTask(finishedTasks *ring.Ring, taskId string) bool {