Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Build user usage map async #2184

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions scheduler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file

The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).

## [1.63.4] - 2022-09-21
### Fixed
- Fixed bug in parallelized Kubernetes watch processing, from @scrosby
### Changed
- Make Prometheus JVM metrics use compute cluster name, from @samincheva

## [1.63.3] - 2022-09-13
### Changed
- Parallelize Kubernetes watch processing, from @scrosby
Expand Down
2 changes: 1 addition & 1 deletion scheduler/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
;;
(defproject cook "1.63.4-SNAPSHOT"
(defproject cook "1.63.5-SNAPSHOT"
:description "This launches jobs on a Mesos cluster with fair sharing and preemption"
:license {:name "Apache License, Version 2.0"}
:dependencies [[org.clojure/clojure "1.10.3"]
Expand Down
1 change: 1 addition & 0 deletions scheduler/src/cook/mesos.clj
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
(cond->
{:cancelled-task-trigger-chan (prepare-trigger-chan (time/seconds 3))
:lingering-task-trigger-chan (prepare-trigger-chan (time/minutes timeout-interval-minutes))
:generate-pool-user-usage-map-trigger-chan (prepare-trigger-chan (time/minutes 1))
:match-trigger-chan (async/chan (async/sliding-buffer 1))
:optimizer-trigger-chan (prepare-trigger-chan (time/seconds (:optimizer-interval-seconds optimizer-config 10)))
:progress-updater-trigger-chan (prepare-trigger-chan (time/millis (:publish-interval-ms progress-config)))
Expand Down
57 changes: 44 additions & 13 deletions scheduler/src/cook/scheduler/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
(meters/defmeter [cook-mesos scheduler handle-framework-message-rate])

(timers/deftimer [cook-mesos scheduler generate-user-usage-map-duration])
(timers/deftimer [cook-mesos scheduler generate-pool-user-usage-map-duration])

(defn metric-title
([metric-name pool]
Expand Down Expand Up @@ -708,7 +709,36 @@
(gauges/defgauge [cook-mesos scheduler front-of-job-queue-mem] (fn [] @front-of-job-queue-mem-atom))
(gauges/defgauge [cook-mesos scheduler front-of-job-queue-cpus] (fn [] @front-of-job-queue-cpus-atom))

;; Cache of the most recently computed pool-name -> user -> usage map.
;; Starts out as nil; update-pool-user-usage-map overwrites it with the result
;; of generate-pool-user-usage-map, and generate-user-usage-map reads the
;; entry for a single pool from it.
(def cached-user-usage-map-atom (atom nil))
(defn generate-pool-user-usage-map
  "Returns a nested mapping pool-name -> user -> usage stats, computed from
  the running task entities in unfiltered-db.

  Each running task is mapped to its job (:job/_instance). Jobs are grouped
  first by cached-queries/job->pool-name and then by :job/user, and the
  usages (tools/job->usage) of a user's jobs are summed key-wise with
  (merge-with +). Pools and users without running tasks have no entry.

  The computation runs inside a tracing span and is timed by both the
  Prometheus duration metric and the generate-pool-user-usage-map-duration
  timer."
  [unfiltered-db]
  (tracing/with-span
    [s {:name "scheduler.generate-pool-user-usage-map" :tags {:component tracing-component-tag}}]
    (prom/with-duration
      ;; NOTE(review): the label map is empty because this computation spans
      ;; all pools -- confirm the metric accepts being recorded without labels.
      prom/scheduler-generate-user-usage-map-duration {}
      (timers/time!
        generate-pool-user-usage-map-duration
        (let [;; Sum the usage of every job in a (non-empty) group, key by key.
              sum-usages (fn [jobs]
                           (->> jobs
                                (map tools/job->usage)
                                (reduce (partial merge-with +))))
              ;; Turn the jobs of one pool into a user -> summed usage map.
              jobs->user-usage (fn [jobs-for-pool]
                                 (->> jobs-for-pool
                                      (group-by :job/user)
                                      (pc/map-vals sum-usages)))]
          (->> (tools/get-running-task-ents unfiltered-db)
               (map :job/_instance)
               (group-by cached-queries/job->pool-name)
               (pc/map-vals jobs->user-usage)))))))
(defn update-pool-user-usage-map
  "Recomputes the pool-name -> user -> usage map from the current database
  value of datomic/conn and stores it in cached-user-usage-map-atom.
  Returns the newly stored map."
  []
  (->> (d/db datomic/conn)
       generate-pool-user-usage-map
       (reset! cached-user-usage-map-atom)))

(defn generate-user-usage-map
  "Returns the user -> usage stats mapping for pool-name, read from the cached
  pool-name -> user -> usage map held in cached-user-usage-map-atom.

  Returns an empty map when the cache has not been populated yet or has no
  entry for pool-name (no running tasks in that pool), so callers always
  receive a map rather than nil."
  [pool-name]
  (get @cached-user-usage-map-atom pool-name {}))

(defn generate-user-usage-map-old
"Returns a mapping from user to usage stats"
[unfiltered-db pool-name]
(tracing/with-span
Expand Down Expand Up @@ -1555,7 +1585,7 @@
"Handle scheduling pending jobs using the Fenzo Scheduler."
[conn fenzo fenzo-state resources-atom pool-name->pending-jobs-atom agent-attributes-cache
scheduler-config rebalancer-reservation-atom mesos-run-as-user pool-name compute-clusters
job->acceptable-compute-clusters-fn user->quota user->usage-future]
job->acceptable-compute-clusters-fn user->quota user->usage]
(log-structured/info "Running handler for Fenzo pool" {:pool pool-name})
(prom/with-duration
prom/match-jobs-event-duration {:pool pool-name}
Expand Down Expand Up @@ -1595,9 +1625,6 @@
; and only store if it is a new node.
(when-not (ccache/get-if-present agent-attributes-cache identity slave-id)
(ccache/put-cache! agent-attributes-cache identity slave-id (offer/get-offer-attr-map offer)))))
user->usage (tracing/with-span [s {:name "scheduler.offer-handler.resolve-user-to-usage-future"
:tags {:pool pool-name :component tracing-component-tag}}]
@user->usage-future)
matched-head? (handle-resource-offers! conn fenzo-state pool-name->pending-jobs-atom
mesos-run-as-user user->usage user->quota
num-considerable offers
Expand Down Expand Up @@ -1748,7 +1775,7 @@
"Handle scheduling pending jobs using the Kubernetes Scheduler."
[conn pool-name->pending-jobs-atom pool-name
{:keys [max-jobs-considered minimum-scheduling-capacity-threshold scheduling-pause-time-ms]}
compute-clusters job->acceptable-compute-clusters-fn user->quota user->usage-future mesos-run-as-user]
compute-clusters job->acceptable-compute-clusters-fn user->quota user->usage mesos-run-as-user]
(log-structured/info "Running handler for Kubernetes Scheduler pool" {:pool pool-name})
(prom/with-duration
prom/scheduler-schedule-jobs-event-duration {:pool pool-name}
Expand Down Expand Up @@ -1778,9 +1805,6 @@
(if (> available-scheduling-capacity minimum-scheduling-capacity-threshold)
(let [db (db conn)
pending-jobs (get @pool-name->pending-jobs-atom pool-name)
user->usage (tracing/with-span [s {:name "scheduler.kubernetes-handler.resolve-user-to-usage-future"
:tags {:pool pool-name :component tracing-component-tag}}]
@user->usage-future)
considerable-jobs (pending-jobs->considerable-jobs db pending-jobs user->quota user->usage max-considerable pool-name)
considerable-job-uuids (set (map :job/uuid considerable-jobs))]
(log-structured/info "Considering jobs" {:pool pool-name :number-considered-jobs (count considerable-job-uuids)
Expand Down Expand Up @@ -2454,16 +2478,15 @@
;; 2. Once the above two items are addressed, user->usage should always correctly
;; reflect *Cook*'s understanding of the state of the world at this point.
;; When this happens, users should never exceed their quota
user->usage-future (future (tracing/with-span [s1 {:from s :finish? false}] ; NOTE: finish? is set to false to prevent early finishing of the span
(generate-user-usage-map (d/db conn) pool-name)))
user->usage (generate-user-usage-map pool-name)
compute-clusters (vals @cook.compute-cluster/cluster-name->compute-cluster-atom)
user->quota (quota/create-user->quota-fn (d/db conn) (if using-pools? pool-name nil))]
(if using-kubernetes-scheduler?
(handle-kubernetes-scheduler-pool conn pool-name->pending-jobs-atom pool-name scheduler-config compute-clusters
job->acceptable-compute-clusters-fn user->quota user->usage-future mesos-run-as-user)
job->acceptable-compute-clusters-fn user->quota user->usage mesos-run-as-user)
(handle-fenzo-pool conn fenzo fenzo-state resources-atom pool-name->pending-jobs-atom agent-attributes-cache
scheduler-config rebalancer-reservation-atom mesos-run-as-user pool-name compute-clusters
job->acceptable-compute-clusters-fn user->quota user->usage-future)))
job->acceptable-compute-clusters-fn user->quota user->usage)))
(catch Exception e
(log-structured/error "Pool handler encountered exception; continuing" {:pool pool-name} e)))))
(log-structured/info "Done with pool handler" {:pool pool-name}))
Expand All @@ -2478,7 +2501,10 @@

(persist-mea-culpa-failure-limit! conn mea-culpa-failure-limit)

(let [{:keys [match-trigger-chan rank-trigger-chan]} trigger-chans
; We're the leader. Make sure we have an initial set of user usage information
(update-pool-user-usage-map)

(let [{:keys [match-trigger-chan rank-trigger-chan generate-pool-user-usage-map-trigger-chan]} trigger-chans
pools (->> conn d/db pool/all-pools (filter pool/schedules-jobs?))
pools' (if (-> pools count pos?)
pools
Expand All @@ -2504,6 +2530,11 @@
(pool->scheduler-config name) (pool->match-trigger-chan name)
rebalancer-reservation-atom mesos-run-as-user name
cluster-name->compute-cluster-atom job->acceptable-compute-clusters-fn)))]
(tools/chime-at-ch
generate-pool-user-usage-map-trigger-chan
(fn trigger-global-pool-user-usage-map-iteration []
(log/info "Updating global pool-user-usage-map")
(update-pool-user-usage-map)))
(prepare-match-trigger-chan match-trigger-chan pools')
(async/go-loop []
(when-let [x (async/<! match-trigger-chan)]
Expand Down
3 changes: 2 additions & 1 deletion scheduler/test/cook/test/mesos/mesos_mock.clj
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,8 @@

(deftest mesos-mock-kill
;; Mock this out with a stub string, as the compute cluster shouldn't be used anywhere.
(with-redefs [cc/compute-cluster-name->ComputeCluster "mesos-mock-kill"]
(with-redefs [cc/compute-cluster-name->ComputeCluster "mesos-mock-kill"
cook.scheduler.scheduler/update-pool-user-usage-map (fn [])]
(testing "kill task"
(let [registered-atom (atom false)
offer-atom (atom [])
Expand Down
1 change: 1 addition & 0 deletions scheduler/test/cook/test/scheduler/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2400,6 +2400,7 @@
output-atom (atom [])]
(with-redefs [sched/persist-mea-culpa-failure-limit! (fn [_ _])
d/db (fn [_])
sched/update-pool-user-usage-map (fn [])
pool/all-pools
(fn [_]
[{:pool/name "pool 1"
Expand Down