diff --git a/src/net/cgrand/xforms.cljc b/src/net/cgrand/xforms.cljc index 3740693..7cb0f8d 100644 --- a/src/net/cgrand/xforms.cljc +++ b/src/net/cgrand/xforms.cljc @@ -348,7 +348,10 @@ @acc) ; downstream is done, propagate (do (vswap! m assoc! k nop-rf) - (krf @acc))) ; TODO think again + (let [acc (krf @acc)] + (when (reduced? acc) + (vreset! m (transient {}))) + acc))) acc)))) (let [kfn (or kfn key') vfn (or vfn val')] @@ -366,7 +369,10 @@ @acc) ; downstream is done, propagate (do (vswap! m assoc! k nop-rf) - (krf @acc))) + (let [acc (krf @acc)] + (when (reduced? acc) + (vreset! m (transient {}))) + acc))) acc))))))))))) (defn into-by-key @@ -657,7 +663,7 @@ (transduce (comp xform count) rf/last coll))) (defn multiplex - "Returns a transducer that runs several transducers (sepcified by xforms) in parallel. + "Returns a transducer that runs several transducers (specified by xforms) in parallel. If xforms is a map, values of the map are transducers and keys are used to tag each transducer output: => (into [] (x/multiplex [(map inc) (map dec)]) (range 3)) @@ -674,40 +680,48 @@ xforms) (into #{} (map #(% mrf)) xforms))) invoke-rfs (if (map? xforms) - (fn [acc invoke] + (fn [acc step? invoke] (reduce-kv (fn [acc tag rf] (let [acc (invoke rf acc)] - (if (reduced? acc) + (if (and step? (reduced? acc)) (if (reduced? @acc) (do (vreset! rfs nil) acc) ; downstream is done, propagate - (do (vswap! rfs dissoc tag) (rf @acc))) + (do (vswap! rfs dissoc tag) + (let [acc (rf @acc)] + (when (reduced? acc) + (vreset! rfs nil)) + acc))) acc))) acc @rfs)) - (fn [acc invoke] + (fn [acc step? invoke] (core/reduce (fn [acc rf] (let [acc (invoke rf acc)] - (if (reduced? acc) + (if (and step? (reduced? acc)) (if (reduced? @acc) (do (vreset! rfs nil) acc) ; downstream is done, propagate - (do (vswap! rfs disj rf) (rf @acc))) + (do (vswap! rfs disj rf) + (let [acc (rf @acc)] + (when (reduced? acc) + (vreset! rfs nil)) + acc))) acc))) acc @rfs)))] (kvrf ([] (rf)) - ([acc] (rf (invoke-rfs acc #(%1 %2)))) + ([acc] (rf (invoke-rfs acc false #(%1 %2)))) ([acc x] - (let [acc (invoke-rfs acc #(%1 %2 x))] + (let [acc (invoke-rfs acc true #(%1 %2 x))] (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))) ([acc k v] - (let [acc (invoke-rfs acc #(%1 %2 k v))] + (let [acc (invoke-rfs acc true #(%1 %2 k v))] (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))))))) diff --git a/test/net/cgrand/xforms_test.cljc b/test/net/cgrand/xforms_test.cljc index d3b0466..b999948 100644 --- a/test/net/cgrand/xforms_test.cljc +++ b/test/net/cgrand/xforms_test.cljc @@ -1,6 +1,7 @@ (ns net.cgrand.xforms-test (:require [clojure.test :refer [is deftest testing]] - [net.cgrand.xforms :as x])) + [net.cgrand.xforms :as x] + [net.cgrand.xforms.rfs :as rf])) (defn trial "A transducing context for testing that transducers are well-behaved towards @@ -139,4 +140,79 @@ (is (= (range 100) (x/into [] (x/sort) (shuffle (range 100))))) (is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100))))) (is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100))))) - (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) \ No newline at end of file + (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) + +(deftest by-key + (testing "Respects reduced from multiplexed reductions and from the downstream reduction." + (is (= {:x 3 :y 5} + (into {} + (x/by-key (x/reduce rf/some)) + [[:x 3] [:y 5] [:x 2]])) + "Respects multiplexed reduced.") + + (is (= [:x 4] + (transduce + (x/by-key (map inc)) + rf/some + [[:x 3] [:y 5] [:x 2]])) + "Respects downstream reduced.") + + (is (= [:y 4] + (transduce + (x/by-key (x/reduce (fn + ([] 0) + ([sum] sum) + ([sum x] + (let [sum (+ sum x)] + (if (> sum 4) (reduced 4) sum)))))) + rf/some + [[:x 3] [:y 5] [:x 2]])) + "Respects reduced in downstream complete."))) + +(deftest multiplex + (testing "Respects reduced from multiplexed reductions and from the downstream reduction." + (testing "Respects multiplexed reduced." + (is (= [3] + (into [] + (x/multiplex [(x/reduce rf/some)]) + [3 5 2]))) + (is (= {:x 3} + (into {} + (x/multiplex {:x (x/reduce rf/some)}) + [3 5 2])))) + + (testing "Respects downstream reduced." + (is (= 4 + (transduce + (x/multiplex [(map inc)]) + rf/some + [3 5 2]))) + (is (= [:x 4] + (transduce + (x/multiplex {:x (map inc)}) + rf/some + [3 5 2])))) + + (testing "Doesn't repeat multiplexed completion." + (is (= 2 + (transduce + (x/multiplex [(x/reduce rf/last)]) + rf/some + [3 5 2]))) + (is (= [:x 2] + (transduce + (x/multiplex {:x (x/reduce rf/last)}) + rf/some + [3 5 2])))) + + (testing "Respects reduced in downstream complete." + (is (= 3 + (transduce + (x/multiplex [(x/reduce rf/some) (x/reduce rf/some)]) + rf/some + [3 5 2]))) + (is (= 3 + (transduce + (x/multiplex {:x (x/reduce rf/some) :y (x/reduce rf/some)}) + (completing rf/some second) + [3 5 2]))))))