Skip to content

Commit

Permalink
Set version to 9.1.538
Browse files Browse the repository at this point in the history
  • Loading branch information
niwinz committed Nov 22, 2022
1 parent 588c112 commit 4ce39ea
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 25 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog #


## Version 9.1.538

- Add `px/current-thread` helper function
- Add `px/thread-interrupted?` helper function
- Add `px/interrupt-thread!` helper function
- Add `px/join!` helper function
- Add `px/thread-id` helper function


## Version 9.1.536

- Assign default parallelism to scheduled executor (based on CPUs).
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ Here you can look a detailed [documentation][1].
deps.edn:

```clojure
funcool/promesa {:mvn/version "9.1.536"}
funcool/promesa {:mvn/version "9.1.538"}
```

Leiningen:

```clojure
[funcool/promesa "9.1.536"]
[funcool/promesa "9.1.538"]
```

## On the REPL
Expand Down
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{com.bhauman/rebel-readline-cljs {:mvn/version "RELEASE"},
com.bhauman/rebel-readline {:mvn/version "RELEASE"},
org.clojure/tools.namespace {:mvn/version "RELEASE"},
org.clojure/core.async {:mvn/version "1.6.673"}
criterium/criterium {:mvn/version "RELEASE"}
thheller/shadow-cljs {:mvn/version "RELEASE"}}
:extra-paths ["test" "dev" "src"]},
Expand Down
4 changes: 3 additions & 1 deletion dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[clojure.test :as test]
[clojure.tools.namespace.repl :as r]
[clojure.walk :refer [macroexpand-all]]
[clojure.core.async :as a]
[criterium.core :refer [quick-bench bench with-progress-reporting]]
[promesa.core :as p]
[promesa.exec :as px]
Expand All @@ -14,7 +15,8 @@
(:import
java.util.concurrent.CompletableFuture
java.util.concurrent.CompletionStage
java.util.function.Function))
java.util.function.Function
java.util.concurrent.atomic.AtomicLong))

(defmacro run-quick-bench
[& exprs]
Expand Down
2 changes: 1 addition & 1 deletion scripts/repl
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
set -ex
# clojure -M:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main
# clojure -M:dev -m rebel-readline.main
clojure -J--enable-preview -M:dev -m rebel-readline.main
clojure -J-Djdk.tracePinnedThreads=short -J-Djdk.virtualThreadScheduler.parallelism=16 -J--enable-preview -M:dev -m rebel-readline.main
102 changes: 81 additions & 21 deletions src/promesa/exec.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
(:import
clojure.lang.Var
java.lang.AutoCloseable
java.time.Duration
java.util.concurrent.Callable
java.util.concurrent.CompletableFuture
java.util.concurrent.Executor
Expand All @@ -32,7 +33,6 @@

#?(:clj (set! *warn-on-reflection* true))


;; --- Globals & Defaults (with CLJS Impl)

(declare #?(:clj scheduled-executor :cljs ->ScheduledExecutor))
Expand Down Expand Up @@ -223,7 +223,16 @@
(^Thread newThread [_ ^Runnable runnable]
(func runnable)))))

#?(:clj (def ^{:no-doc true} counter (AtomicLong. 0)))
#?(:clj (def ^{:no-doc true :dynamic true}
*default-counter*
(AtomicLong. 0)))

#?(:clj
(defn get-next
"Get next value from atomic long counter"
{:no-doc true}
([] (.getAndIncrement ^AtomicLong *default-counter*))
([counter] (.getAndIncrement ^AtomicLong counter))))

#?(:clj
(defn default-thread-factory
Expand All @@ -232,37 +241,27 @@
:or {daemon true
priority Thread/NORM_PRIORITY
name "promesa/thread/%s"}}]
(reify ThreadFactory
(newThread [this runnable]
(doto (Thread. ^Runnable runnable)
(.setPriority priority)
(.setDaemon ^Boolean daemon)
(.setName (format name (.getAndIncrement ^AtomicLong counter))))))))
(let [counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [this runnable]
(doto (Thread. ^Runnable runnable)
(.setPriority priority)
(.setDaemon ^Boolean daemon)
(.setName (format name (get-next counter)))))))))

#?(:clj
(defn default-forkjoin-thread-factory
^ForkJoinPool$ForkJoinWorkerThreadFactory
[& {:keys [name daemon] :or {name "promesa/forkjoin/%s" daemon true}}]
(let [^AtomicLong counter (AtomicLong. 0)]
(let [counter (AtomicLong. 0)]
(reify ForkJoinPool$ForkJoinWorkerThreadFactory
(newThread [_ pool]
(let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)
tname (format name (.getAndIncrement counter))]
tname (format name (get-next counter))]
(.setName ^ForkJoinWorkerThread thread ^String tname)
(.setDaemon ^ForkJoinWorkerThread thread ^Boolean daemon)
thread))))))

#?(:clj
(defn- opts->thread-factory
[{:keys [daemon priority]
:or {daemon true priority Thread/NORM_PRIORITY}}]
(fn->thread-factory
(fn [runnable]
(let [thread (Thread. ^Runnable runnable)]
(.setDaemon thread daemon)
(.setPriority thread priority)
thread)))))

#?(:clj
(defn- resolve-thread-factory
{:no-doc true}
Expand Down Expand Up @@ -591,4 +590,65 @@
(cons (map first ss) (step-fn (map rest ss)))))))]
(pmap #(apply f %) (step-fn (cons coll colls)))))))

#?(:clj
(defmacro thread
"A low-level, not-pooled thread constructor."
[opts & body]
(let [[opts body] (if (map? opts)
[opts body]
[nil (cons opts body)])]
`(let [opts# ~opts
thr# (Thread. (^:once fn* [] ~@body))]
(.setName thr# (or (:name ~opts) (format "promesa/unnamed-thread/%s" (get-next))))
(.setDaemon thr# (:daemon? ~opts false))
(.setPriority thr# (:priority ~opts Thread/NORM_PRIORITY))
(.start thr#)
thr#))))

#?(:clj
(defn current-thread
"Return the current thread."
[]
(Thread/currentThread)))

#?(:clj
(defn thread-interrupted?
"Check if the thread has the interrupted flag set.
There are two special cases:
Using the `:current` keyword as argument will check the interrupted
flag on the current thread.
Using the arity 0 (passing no arguments), then the current thread
will be checked and **WARNING** the interrupted flag reset to
`false`."
([]
(Thread/interrupted))
([thread]
(if (= :current thread)
(.isInterrupted (Thread/currentThread))
(.isInterrupted ^Thread thread)))))

#?(:clj
(defn thread-id
"Retrieves the thread ID."
([]
(.getId ^Thread (Thread/currentThread)))
([^Thread thread]
(.getId thread))))

#?(:clj
(defn interrupt-thread!
[^Thread thread]
(.interrupt thread)))

#?(:clj
(defn join!
"Waits for the specified thread to terminate."
([^Thread thread]
(.join thread))
([^Thread thread duration]
(if (instance? Duration duration)
(.join thread ^Duration duration)
(.join thread (long duration))))))

0 comments on commit 4ce39ea

Please sign in to comment.