Skip to content

Commit

Permalink
Add polling endpoint and immediately detect finished flows
Browse files Browse the repository at this point in the history
  • Loading branch information
jeisses committed Sep 15, 2023
1 parent 66d39f0 commit 0111b92
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 54 deletions.
2 changes: 1 addition & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
:deps
{org.clojure/clojure {:mvn/version "1.11.0"}
nostromo/nostromo {:git/url "https://github.com/nosana-ci/nostromo"
:sha "66c03f6c0167d6442e0a453721345a6995cf4812"}
:sha "e52e1075a4b5d7b4eab70d0f41b86dc707760c57"}
;; nostromo/nostromo {:local/root "../../../nostromo"}

org.clojure/tools.cli {:mvn/version "1.0.214"}
Expand Down
123 changes: 71 additions & 52 deletions src/nosana_node/nosana.clj
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ Running Nosana Node %s

(defn find-next-run
"Find all assigned runs and return the first assigned to our
market. Returns a tuple of [run-address run-data]."
market. Returns a tuple of [run-address run-data]."
[conf]
(let [runs (find-my-runs conf)]
(loop [[[run-addr run] & rst] runs]
Expand All @@ -660,63 +660,81 @@ market. Returns a tuple of [run-address run-data]."

(defn work-loop
  "Main loop of the node.

  Starts a `go-loop` that reacts to four event sources:

  - `exit-chan`: any value received stops the loop.
  - `:finished` messages published on `flow-chan`: the active flow is
    processed right away instead of waiting for the next poll.
  - a `poll-delay` ms timeout: queues a tick on `work-loop-chan`.
  - `work-loop-chan`: every tick (queued by the timeout or by any other
    producer) runs one iteration: health check, flow progress check, or
    picking up a run / entering the market queue.

  Loop state:
  - `active-flow`: value returned by `start-flow-for-run!` /
    `process-flow!`, nil when idle.
  - `last-health-check`: time of the last health check.
  - `healthy?`: result of the last health check.

  `work-loop-chan` should be buffered: ticks are queued with a
  non-blocking `offer!` and are dropped when the channel cannot accept
  them.

  Returns the channel of the `go-loop`."
  [conf {:nos/keys [poll-delay exit-chan flow-chan work-loop-chan] :as system}]
  (let [finish-flow-chan (chan)]
    ;; Subscribe to all `flow-chan` messages whose first element is `:finished`.
    (async/sub (async/pub flow-chan first) :finished finish-flow-chan)
    (go-loop [active-flow       nil
              last-health-check (flow/current-time)
              healthy?          true]
      (async/alt!
        ;; Put anything on `exit-chan` to stop the loop.
        exit-chan (log :info "Work loop exited")

        ;; A flow reported that it finished: process the active flow
        ;; immediately. Without an active flow there is nothing to process,
        ;; so the loop state is carried over untouched.
        finish-flow-chan
        ([_finished-msg]
         (log :info "Receive flow finished message")
         (if active-flow
           (recur (<! (process-flow! active-flow conf system))
                  last-health-check true)
           (recur nil last-health-check healthy?)))

        ;; After `poll-delay` ms without events, queue a tick.
        ;; `offer!` never parks: when ticks are already pending the new one
        ;; is dropped, so this loop (the consumer of `work-loop-chan`)
        ;; cannot block itself on a full buffer. The loop state must be
        ;; preserved here, otherwise every timeout would forget the active
        ;; flow and mark an unhealthy node as healthy.
        (timeout poll-delay)
        (do
          (async/offer! work-loop-chan 1)
          (recur active-flow last-health-check healthy?))

        ;; A tick: run one iteration of the actual work.
        work-loop-chan
        (cond
          (should-check-health? last-health-check)
          (let [[status _health msgs] (healthy conf)]
            (log :info "Checking node health...")
            (case status
              :success (do
                         (log :info "Node is healthy")
                         (recur active-flow (flow/current-time) true))
              :error   (do
                         (log :info (str "Node not healthy, waiting. Status:\n"
                                         (string/join "\n- " msgs)))
                         (recur nil (flow/current-time) false))))

          (not healthy?)
          (do
            (log :info "Node not healthy, waiting.")
            (recur nil last-health-check false))

          active-flow
          (do
            (log :info "Checking progress of flow " active-flow)
            (recur (<! (process-flow! active-flow conf system))
                   last-health-check true))

          :else
          (let [my-run (find-next-run conf)]
            (cond
              my-run
              (do
                (log :info "Found claimed jobs to work on")
                (recur (<! (start-flow-for-run! my-run conf system))
                       last-health-check true))

              (is-queued? conf)
              (do
                (log :info "Waiting in the queue.")
                (recur nil last-health-check true))

              :else
              (let [enter-sig (enter-market conf)]
                (log :info "Entering the queue")
                ;; Wait for the enter transaction before looping again.
                (when enter-sig
                  (<! (sol/await-tx< enter-sig (:network conf))))
                (recur nil last-health-check true)))))))))

(defn use-nosana
[{:nos/keys [store flow-chan vault] :as system}]
;; Wait a bit for podman to boot
(log :info "Waiting 5s for podman")
(Thread/sleep 6000)
(let [network (:solana-network vault)
market (:nosana-market vault)
conf (make-config system)
exit-ch (chan)
(Thread/sleep 6)
(let [network (:solana-network vault)
market (:nosana-market vault)
conf (make-config system)
exit-ch (chan)
work-loop-ch (chan 5)

[status health msgs] (healthy conf)]

Expand Down Expand Up @@ -770,6 +788,7 @@ market. Returns a tuple of [run-address run-data]."
{:nos/exit-chan exit-ch
:nos/poll-delay (:poll-delay-ms vault)})))
:nos/exit-chan exit-ch
:nos/work-loop-chan work-loop-ch
:nos/poll-delay (:poll-delay-ms vault)
:nos/solana-network (:network conf)
:nos/programs (:programs conf))
Expand Down
6 changes: 5 additions & 1 deletion src/nosana_node/system.clj
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
resp-404)))))

(defn handler [{:keys [uri nos/store headers nos/solana-network
nos/programs] :as request}]
nos/programs nos/work-loop-chan] :as request}]
(cond
(or (= uri "/health")
(= uri "/"))
Expand All @@ -108,6 +108,10 @@
:body "OK"}
(string/starts-with? uri "/nosana/logs/")
(get-op-log store uri headers solana-network programs)
(string/starts-with? uri "/nosana/trigger-job")
(do
(>!! work-loop-chan :poll)
{:status 200 :body "OK"})
:else {:status 200
:headers {"Content-Type" "text/html"}
:body "Not found"}))
Expand Down

0 comments on commit 0111b92

Please sign in to comment.