Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transaction log #719

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/datahike/config.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
(def ^:dynamic *default-store* :mem) ;; store-less = in-memory?
(def ^:dynamic *default-db-name* nil) ;; when nil creates random name
(def ^:dynamic *default-db-branch* :db) ;; when nil creates random name -- NOTE(review): same wording as *default-db-name*; confirm it also holds for branches
(def ^:dynamic *default-tx-log-size* 256) ;; default for the :tx-log-size config key; db->stored compares the accumulated tx-data count against it

(s/def ::index #{:datahike.index/hitchhiker-tree :datahike.index/persistent-set})
(s/def ::keep-history? boolean?)
(s/def ::schema-flexibility #{:read :write})
(s/def ::attribute-refs? boolean?)
(s/def ::search-cache-size nat-int?)
(s/def ::store-cache-size pos-int?)
;; Number of tx-data entries, so a non-negative integer (0 is allowed).
;; `nat-int?` is used instead of `#(not (neg? %))` because `neg?` throws on
;; non-numeric input on the JVM (turning a validation failure into an
;; exception), lets strings through in ClojureScript, and would also accept
;; fractional sizes.
(s/def ::tx-log-size nat-int?)
(s/def ::crypto-hash? boolean?)
(s/def ::writer map?)
(s/def ::branch keyword?)
Expand All @@ -54,6 +56,7 @@
::attribute-refs?
::search-cache-size
::store-cache-size
::tx-log-size
::crypto-hash?
::initial-tx
::name
Expand Down Expand Up @@ -96,7 +99,8 @@
:writer self-writer
:crypto-hash? *default-crypto-hash?*
:search-cache-size *default-search-cache-size*
:store-cache-size *default-store-cache-size*})
:store-cache-size *default-store-cache-size*
:tx-log-size *default-tx-log-size*})

(defn int-from-env
[key default]
Expand Down Expand Up @@ -135,6 +139,7 @@
:index *default-index*
:search-cache-size *default-search-cache-size*
:store-cache-size *default-store-cache-size*
:tx-log-size *default-tx-log-size*
:crypto-hash? *default-crypto-hash?*
:branch *default-db-branch*
:writer self-writer
Expand Down Expand Up @@ -177,6 +182,7 @@
:writer self-writer
:search-cache-size (int-from-env :datahike-search-cache-size *default-search-cache-size*)
:store-cache-size (int-from-env :datahike-store-cache-size *default-store-cache-size*)
:tx-log-size (int-from-env :datahike-tx-log-size *default-tx-log-size*)
:index-config (if-let [index-config (map-from-env :datahike-index-config nil)]
index-config
(di/default-index-config index))}
Expand Down
2 changes: 1 addition & 1 deletion src/datahike/connector.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
;; Returns cfg with :store replaced by (ds/store-identity store) and the
;; :writer, :store-cache-size, :search-cache-size and :tx-log-size keys removed.
;; NOTE(review): presumably so that configs differing only in these settings
;; are treated as the same database -- confirm against the callers.
;; Diff view: the first dissoc line is the pre-change version, the second one
;; (which additionally drops :tx-log-size) is its replacement.
(defn- normalize-config [cfg]
(-> cfg
(update :store ds/store-identity)
(dissoc :writer :store-cache-size :search-cache-size)))
(dissoc :writer :store-cache-size :search-cache-size :tx-log-size)))

(extend-protocol PConnector
String
Expand Down
2 changes: 1 addition & 1 deletion src/datahike/db/transaction.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
;; In context of `with-datom` we can use faster comparators which
;; do not check for nil (~10-15% performance gain in `transact`)

(defn- with-datom [db ^Datom datom]
(defn with-datom [db ^Datom datom]
(validate-datom db datom)
(let [{a-ident :ident} (dbu/attr-info db (.-a datom))
indexing? (dbu/indexing? db a-ident)
Expand Down
51 changes: 29 additions & 22 deletions src/datahike/writer.cljc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns ^:no-doc datahike.writer
(:require [superv.async :refer [S thread-try <?- go-try]]
[konserve.core :as k]
[taoensso.timbre :as log]
[datahike.core]
[datahike.writing :as w]
Expand Down Expand Up @@ -51,7 +52,7 @@
;; delay processing until the writer we are part of in connection is set
(while (not (:writer @(:wrapped-atom connection)))
(<! (timeout 10)))
(loop [old @(:wrapped-atom connection)]
(loop [old @(:wrapped-atom connection)]
(if-let [{:keys [op args callback] :as invocation} (<?- transaction-queue)]
(do
(when (> (count transaction-queue-buffer) (* 0.9 transaction-queue-size))
Expand Down Expand Up @@ -103,34 +104,40 @@
(log/debug "Writer thread gracefully closed")))))
;; commit loop
(go-try S
(loop [tx (<?- commit-queue)]
(loop [tx (<?- commit-queue)
last-flushed (<?- (k/get (:store @(:wrapped-atom connection))
(get-in @(:wrapped-atom connection) [:config :branch])
nil {:sync? false}))]
(when tx
(let [txs (into [tx] (take-while some?) (repeatedly #(poll! commit-queue)))]
;; empty channel of pending transactions
(log/trace "Batched transaction count: " (count txs))
;; commit latest tx to disk
(let [db (:db-after (first (peek txs)))]
(try
(let [start-ts (get-time-ms)
{{:keys [datahike/commit-id]} :meta
:as commit-db} (<?- (w/commit! db nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(reset! connection commit-db)
(let [{:keys [db-after tx-data]} (first (peek txs))
last-flushed
(try
(let [start-ts (get-time-ms)
[last-flushed
{{:keys [datahike/commit-id]} :meta
:as commit-db}] (<?- (w/commit! last-flushed db-after tx-data nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(reset! connection commit-db)
;; notify all processes that transaction is complete
(doseq [[tx-report callback] txs]
(let [tx-report (-> tx-report
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback tx-report))))
(catch Exception e
(doseq [[_ callback] txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error." e)
(close! commit-queue)
(close! transaction-queue)))
(doseq [[tx-report callback] txs]
(let [tx-report (-> tx-report
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback tx-report)))
last-flushed)
(catch Exception e
(doseq [[_ callback] txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error." e)
(close! commit-queue)
(close! transaction-queue)))]
(<! (timeout commit-wait-time))
(recur (<?- commit-queue)))))))))]))
(recur (<?- commit-queue) last-flushed))))))))]))

;; public API to internal mapping
(def default-write-fn-map {'transact! w/transact!
Expand Down
125 changes: 74 additions & 51 deletions src/datahike/writing.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"Manage all state changes and access to state of durable store."
(:require [datahike.connections :refer [delete-connection! *connections*]]
[datahike.db :as db]
[datahike.db.transaction :as dbt]
[datahike.db.utils :as dbu]
[datahike.index :as di]
[datahike.store :as ds]
Expand Down Expand Up @@ -47,14 +48,16 @@

(defn db->stored
"Maps memory db to storage layout and flushes dirty indices."
[db flush? sync?]
[last-flushed db tx-data flush? sync?]
(when-not (dbu/db? db)
(dt/raise "Argument is not a database."
{:type :argument-is-not-a-db
:argument db}))
(let [{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet
schema rschema system-entities ident-ref-map ref-ident-map config
max-tx max-eid op-count hash meta store]} db
{:keys [keep-history? tx-log-size]} config
tx-data (vec (concat (:tx-data last-flushed) tx-data))
schema-meta {:schema schema
:rschema rschema
:system-entities system-entities
Expand All @@ -63,64 +66,84 @@
schema-meta-key (uuid schema-meta)
backend (di/konserve-backend (:index config) store)
not-in-memory? (not= :mem (-> config :store :backend))
flush! (and flush? not-in-memory?)
tx-log-too-big? (> (count tx-data) tx-log-size)
flush! (and flush? not-in-memory? tx-log-too-big?)
schema-meta-op (when-not (sc/write-cache-has? (:store config) schema-meta-key)
(sc/add-to-write-cache (:store config) schema-meta-key)
(k/assoc store schema-meta-key schema-meta {:sync? sync?}))]
(when-not (sc/cache-has? schema-meta-key)
(sc/cache-miss schema-meta-key schema-meta))
[schema-meta-op
(merge
{:schema-meta-key schema-meta-key
:config config
:meta meta
:hash hash
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:eavt-key (cond-> eavt flush! (di/-flush backend))
:aevt-key (cond-> aevt flush! (di/-flush backend))
:avet-key (cond-> avet flush! (di/-flush backend))}
(when (:keep-history? config)
{:temporal-eavt-key (cond-> temporal-eavt flush! (di/-flush backend))
:temporal-aevt-key (cond-> temporal-aevt flush! (di/-flush backend))
:temporal-avet-key (cond-> temporal-avet flush! (di/-flush backend))}))]))
(k/assoc store schema-meta-key schema-meta {:sync? sync?}))
_ (when-not (sc/cache-has? schema-meta-key)
(sc/cache-miss schema-meta-key schema-meta))
eavt (if flush! (di/-flush eavt backend) (:eavt-key last-flushed))
aevt (if flush! (di/-flush aevt backend) (:aevt-key last-flushed))
avet (if flush! (di/-flush avet backend) (:avet-key last-flushed))
temporal-eavt (if (and keep-history? flush!) (di/-flush temporal-eavt backend) (:temporal-eavt-key last-flushed))
temporal-aevt (if (and keep-history? flush!) (di/-flush temporal-aevt backend) (:temporal-aevt-key last-flushed))
temporal-avet (if (and keep-history? flush!) (di/-flush temporal-avet backend) (:temporal-avet-key last-flushed))
stored-tx-data (if tx-log-too-big? [] tx-data)
db (merge
{:schema-meta-key schema-meta-key
:tx-data stored-tx-data
:config config
:meta meta
:hash hash
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:eavt-key eavt
:aevt-key aevt
:avet-key avet}
(when (:keep-history? config)
{:temporal-eavt-key temporal-eavt
:temporal-aevt-key temporal-aevt
:temporal-avet-key temporal-avet}))
last-flushed (if flush!
{:tx-data stored-tx-data
:eavt-key eavt
:aevt-key aevt
:avet-key avet
:temporal-eavt-key temporal-eavt
:temporal-aevt-key temporal-aevt
:temporal-avet-key temporal-avet}
(assoc last-flushed :tx-data tx-data))]
[schema-meta-op last-flushed db]))

;; Rebuilds a db value from the map that was persisted in the store.
;; Arguments: stored-db is the persisted map, store is the store handle that is
;; attached to the result and used to look up the schema metadata.
;; Diff view: lines of the pre-change body are shown directly before the lines
;; that replace them (the :keys vector tail and the final merge/reduce form).
(defn stored->db
"Constructs in-memory db instance from stored map value."
[stored-db store]
(let [{:keys [eavt-key aevt-key avet-key
temporal-eavt-key temporal-aevt-key temporal-avet-key
schema rschema system-entities ref-ident-map ident-ref-map
config max-tx max-eid op-count hash meta schema-meta-key]
config max-tx max-eid op-count hash meta schema-meta-key tx-data]
;; op-count falls back to 0 when the stored map does not carry it
:or {op-count 0}} stored-db
;; schema metadata: cache first, then a synchronous store read whose result
;; is put into the cache
schema-meta (or (sc/cache-lookup schema-meta-key)
;; not in store in case we load an old db where the schema meta data was inline
(when-let [schema-meta (k/get store schema-meta-key nil {:sync? true})]
(sc/cache-miss schema-meta-key schema-meta)
schema-meta))
;; empty db for this config/store; its fields are overwritten below
empty (db/empty-db nil config store)]
;; pre-change body: stored fields assoc'ed onto the empty db, then schema-meta
;; merged on top (keys from schema-meta win)
(merge
(assoc empty
:max-tx max-tx
:max-eid max-eid
:config config
:meta meta
:schema schema
:hash hash
:op-count op-count
:eavt eavt-key
:aevt aevt-key
:avet avet-key
:temporal-eavt temporal-eavt-key
:temporal-aevt temporal-aevt-key
:temporal-avet temporal-avet-key
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:store store)
schema-meta)))
;; post-change body: the same merged db is used as the initial value and every
;; entry of the stored :tx-data is applied to it through dbt/with-datom; with
;; nil or empty :tx-data the merged db is returned unchanged.
;; NOTE(review): presumably :tx-data holds the datoms that were committed but
;; not yet flushed into the index keys -- confirm against db->stored.
(reduce dbt/with-datom
(merge
(assoc empty
:max-tx max-tx
:max-eid max-eid
:config config
:meta meta
:schema schema
:hash hash
:op-count op-count
:eavt eavt-key
:aevt aevt-key
:avet avet-key
:temporal-eavt temporal-eavt-key
:temporal-aevt temporal-aevt-key
:temporal-avet temporal-avet-key
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:store store)
schema-meta)
tx-data)))

(defn branch-heads-as-commits [store parents]
(set (doall (for [p parents]
Expand All @@ -142,27 +165,27 @@
(uuid [hash max-tx max-eid meta])))

;; Writes a db value to the store under a freshly computed commit id and under
;; the configured branch key.
;; Diff view: the pre-change arities ([db parents] / [db parents sync?]) and
;; bindings are shown directly before the lines that replace them.
;; Post-change arguments:
;;   last-flushed - state handed through to db->stored and returned updated
;;   db-after     - the db value to commit
;;   tx-data      - handed through to db->stored
;;   parents      - parent branches/commits, or nil to use the configured branch
;;   sync?        - passed to async+sync and to every store operation; defaults
;;                  to true in the shorter arity
;; Post-change result: [last-flushed db], where db is db-after with
;; :datahike/commit-id and :datahike/parents set in its :meta (the pre-change
;; version returned only db).
(defn commit!
([db parents]
(commit! db parents true))
([db parents sync?]
([last-flushed db-after tx-data parents]
(commit! last-flushed db-after tx-data parents true))
([last-flushed db-after tx-data parents sync?]
(async+sync sync? *default-sync-translation*
(go-try-
(let [{:keys [store config]} db
(let [{:keys [store config]} db-after
;; without explicit parents the configured branch is the only parent
parents (or parents #{(get config :branch)})
parents (branch-heads-as-commits store parents)
cid (create-commit-id db)
db (-> db
cid (create-commit-id db-after)
db (-> db-after
(assoc-in [:meta :datahike/commit-id] cid)
(assoc-in [:meta :datahike/parents] parents))
;; db->stored is always called with flush? = true here
[schema-meta-op db-to-store] (db->stored db true sync?)
[schema-meta-op last-flushed db-to-store] (db->stored last-flushed db tx-data true sync?)
_ (<?- (flush-pending-writes store sync?))
;; the same stored value is written under the commit id and under the branch key
commit-log-op (k/assoc store cid db-to-store {:sync? sync?})
branch-op (k/assoc store (:branch config) db-to-store {:sync? sync?})]
;; now wait for all the writes to complete
;; schema-meta-op is nil when db->stored did not need to write the schema metadata
(when (and schema-meta-op (not sync?)) (<?- schema-meta-op))
(<?- commit-log-op)
(<?- branch-op)
db)))))
[last-flushed db])))))

(defn complete-db-update [old tx-report]
(let [{:keys [writer]} old
Expand Down