Skip to content

Commit

Permalink
Add base of join-test-grid command
Browse files Browse the repository at this point in the history
  • Loading branch information
jeisses committed Nov 22, 2023
1 parent d174eaf commit f16e5e9
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 41 deletions.
2 changes: 1 addition & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
:deps
{org.clojure/clojure {:mvn/version "1.11.0"}
nostromo/nostromo {:git/url "https://github.com/nosana-ci/nostromo"
:sha "0dd207fab706bda8d29e11208ef8921c9359e0bb"}
:sha "543a42aabd469af951df5a050c4a6a1ee6a5f7bc"}
;; nostromo/nostromo {:local/root "../../../nostromo"}

org.clojure/tools.cli {:mvn/version "1.0.214"}
Expand Down
25 changes: 11 additions & 14 deletions src/nosana_node/cli.clj
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@

"join-test-grid"
{:options
[["-h" "--help"]]
[[nil "--market ADDR" "Solana address of the Test Grid market."
:default-desc ""
:default "2kNSniTBsLCioSr4dgdZh6S1JKQc8cZQvxrPWkEn1ERj"
:id :nosana-market]
["-h" "--help"]]

:usage
(fn [options-summary]
Expand Down Expand Up @@ -136,10 +140,9 @@
[]
(reduce
(fn [conf-map [conf-key env-var]]
(let [env-val (System/getenv env-var)]
(if env-val
(assoc conf-map conf-key env-val)
conf-map)))
(if-let [env-val (System/getenv env-var)]
(assoc conf-map conf-key env-val)
conf-map))
{}
env-config))

Expand All @@ -148,8 +151,6 @@
(cond
))



(defn use-cli
"Parse CLI arguments using `tools.cli` and add to system map.
Expand All @@ -171,7 +172,7 @@
(not (contains? cli-actions action))
{:exit-message (str "Unknown action " action) :ok? false}

;; parse the inner options passed this the action
;; parse the inner options passed to this action
:else
(let [{a-summary :summary a-errors :errors a-args :arguments a-options :options}
(cli/parse-opts (rest arguments) (get-in cli-options [action :options]))]
Expand All @@ -184,7 +185,6 @@

:else
(merge options a-options))))]

(cond
(:exit-message state)
(do
Expand All @@ -200,15 +200,12 @@
(nth (reverse [:trace :debug :info :warn :error
:fatal :report])
verbosity)]

(log/set-min-level! log-level)
(log/debug "Log level is " log-level))

;; merge CLI over existing config
(cond->
(-> sys
(assoc :nos/action action)
(update :nos/vault merge state))
(= "start" action)
(assoc :run-server? true)
(not= "start" action)
(:nos/start-job-loop? false))))))
(update :nos/vault merge state)))))))
89 changes: 83 additions & 6 deletions src/nosana_node/join_test_grid.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,87 @@
(ns nosana-node.join-test-grid
"Functions for handling the `join-test-grid` command"
(:require [nosana-node.nosana :as nos]
[nosana-node.solana :as sol]))
(:require
[taoensso.timbre :as log]
[nosana-node.nosana :as nos]
[nosana-node.solana :as sol]
[clojure.core.async :as async :refer
[<!! <! >!! go go-loop >! timeout chan]]))

(defn join-test-grid
(defn has-jobs?
"Check if the test grid market still has jobs."
[conf]
(let [market (nos/get-market conf)]
(and (= (:queueType market) 0)
(not-empty (:queue market)))))

(defn get-test-grid-run
"Get run for this node on the test grid.
Will enter the market if necessary."
[conf]
(go
(if-let [run (nos/find-next-run conf)]
;; if we already have a run we can use that
run
;; else we will enter the market and return the run
(cond
(not (has-jobs? conf))
{:error "There is currently no availability on the Test Grid." :ok? true}

:else
(let [_ (println "Entering the market")
sig (nos/enter-market conf)
tx (<! (sol/await-tx< sig (:network conf)))]
;; TODO: error handling
;; after entering the market we should have a job assigned
(if-let [run (nos/find-next-run conf)]
run
{:error "Could not find a Test Grid slot, please try again later." :ok? true}))))))

(defn print-results [sig run-addr flow]
(println "Benchmark finished.")
(println "Receipt" sig "\n")
(println "---\n")
(println "Test Grid Registration code: " run-addr))

(defn join-test-grid
"Handle the `join-test-grid` command."
[system]
(println "test grid")
)
[{:nos/keys [conf] :as system}]
;; TODO: in general we should improve the information printed to
;; console during this process.

;; it might be good to hide some verbose logs from this point
(log/set-min-level! :warn)

(go
(let [run (<! (get-test-grid-run conf))]
(cond
(:error run)
(do
(println (:error run))
(System/exit (if (:ok? run) 0 1)))

:else
(do
(println "Found a Test Grid assignment, starting benchmark...\n")

;; we can subscribe to a channel that will receive the run results when finished
(let [finish-flow-chan (nos/subscribe-to-finished-flows system)
flow-id (nos/start-flow-for-run! run conf system)

result
;; this will either return the flow results or timeout
(async/alt!
finish-flow-chan
([[_ flow-id flow]]
(println "Benchmark finished, posting results\n")
{:sig (<! (nos/finish-flow-2 flow conf))
:flow flow})

;; TODO: does this timeout make sense?
(timeout 5000)
{:error "Error: timed out while waiting for benchmark results."})]
(if (:error result)
(do
(println (:error result))
(System/exit 1))
(print-results (:sig result) (first run) (:flow result)))))))))
29 changes: 13 additions & 16 deletions src/nosana_node/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,24 @@
(let [sys
(start-system
system
{:http/handler #'nos-sys/handler
:system/components [use-vault
{:http/handler #'nos-sys/handler
:system/components [use-vault
cli/use-cli
store/use-fs-store
(use-when :run-server?
;; use-nrepl
use-nostromo
use-nosana
nos-sys/use-wrap-ctx
)
(use-when #(not (:run-server? %))
use-pipeline)]
:system/profile :prod
:cli-args args
:nos/log-dir "/tmp/logs"
:nos/store-path "/tmp/store"
;; use-nrepl
use-nostromo
use-nosana
nos-sys/use-wrap-ctx]
:system/profile :prod
:cli-args args
:run-server? true
:nos/log-dir "/tmp/logs"
:nos/store-path "/tmp/store"
:nos/start-job-loop true
:nos/vault-path (io/resource "config.edn")})]
:nos/vault-path (io/resource "config.edn")})]
(case (:nos/action sys)
"start" (<!! (work-loop sys))
"join-test-grid") (join-test-grid sys))
"join-test-grid" (<!! (join-test-grid sys))))
(catch Exception e
(do
(log/log :trace e)
Expand Down
18 changes: 14 additions & 4 deletions src/nosana_node/nosana.clj
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@
result-ipfs)
tx (<! (sol/await-tx< sig (:network conf)))]
(log :info "Job results posted " result-ipfs sig)
nil)))
sig)))

(defn process-flow!
"Check the state of a flow and finalize its job if finished.
Expand All @@ -539,7 +539,8 @@
;; TODO: consider moving garbage-collecting to an op in
;; the flow
(docker/gc-volumes! flow {:uri (:podman-conn-uri vault)})
(<! (finish-flow-2 flow conf)))
(<! (finish-flow-2 flow conf))
nil)
(flow-expired? flow)
(do
(log :info "Flow has expired at " (:expired flow))
Expand Down Expand Up @@ -644,8 +645,8 @@
(> (- (flow/current-time) timestamp) (* 60 15)))

(defn find-next-run
"Find all assigned runs and return the first assigned to our
market. Returns a tuple of [run-address run-data]."
"Find the first run assigned to us in the configure market.
Returns a tuple of [run-address run-data]."
[conf]
(let [runs (find-my-runs conf)]
(loop [[[run-addr run] & rst] runs]
Expand All @@ -658,6 +659,15 @@
[run-addr run]
(recur rst)))))))

(defn subscribe-to-finished-flows
"Return a chan that publishes messages of finished flows."
[{:nos/keys [flow-chan-mult]}]
(let [finish-flow-chan (chan)
flow-chan-tap (chan)]
(async/tap flow-chan-mult flow-chan-tap)
(async/sub (async/pub flow-chan-tap first) :finished finish-flow-chan)
finish-flow-chan))

(defn work-loop
"Main loop."
[{:nos/keys [conf poll-delay exit-chan flow-chan-mult work-loop-chan] :as system}]
Expand Down

0 comments on commit f16e5e9

Please sign in to comment.