Implement script-based migrations (#139)
This PR implements the second and last step of the transition to script-based migrations (#101).

After this PR, new CW-D database migrations can be implemented by creating new scripts in the `haskell-src/db-schema/migrations` folder. The scripts in that folder must be named like `1.2.3.4_somename.sql`. This file name corresponds to the version components `[1,2,3,4]` together with the step name `somename.sql`. The version components can contain an arbitrary number of elements, so `1.2.3_othername.sql` is also valid.
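
For illustration, here is a minimal Haskell sketch of that naming convention, using a hypothetical `parseMigrationName` helper (the real parsing lives in the new `ChainwebDb.Migration` module and may differ in detail):

```haskell
import Text.Read (readMaybe)

-- Hypothetical sketch, not the actual CW-D parser: "1.2.3.4_somename.sql"
-- splits into the version components [1,2,3,4] and the step name "somename.sql".
parseMigrationName :: FilePath -> Maybe ([Int], String)
parseMigrationName fileName = case break (== '_') fileName of
  (versionPart, '_' : stepName) ->
    let components = words $ map (\c -> if c == '.' then ' ' else c) versionPart
    in fmap (\vs -> (vs, stepName)) (traverse readMaybe components)
  _ -> Nothing

-- parseMigrationName "1.2.3.4_somename.sql" == Just ([1,2,3,4], "somename.sql")
-- parseMigrationName "1.2.3_othername.sql"  == Just ([1,2,3], "othername.sql")
```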

The migration logic implemented by this PR aims to be fairly conservative in that it expects the existing migrations to be a perfect prefix of the incoming migrations, in the correct order. The order of the migrations is defined by the version components. The condition that the existing migrations must be a prefix of all the desired migrations means that once a set of migrations has been run, we can only append new migrations, and those migrations must have version components greater than the existing ones.
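
As a rough sketch of that prefix condition (the real implementation is the `matchSteps` function mentioned in the commit list below; the types and names here are illustrative, assuming each step is identified by its file name):

```haskell
-- Illustrative only: the result constructors are assumed names, not CW-D's API.
data MigrationMatch
  = InSync                 -- the database already contains every desired step
  | MissingSteps [String]  -- desired steps strictly extend the existing ones
  | Mismatch String        -- order or content diverged; refuse to migrate

matchSteps :: [String] -> [String] -> MigrationMatch
matchSteps existing desired = case (existing, desired) of
  ([], [])              -> InSync
  ([], missing)         -> MissingSteps missing
  (e:es, d:ds) | e == d -> matchSteps es ds
  (e:_, d:_)            -> Mismatch $ "existing step " <> e <> " conflicts with desired step " <> d
  (e:_, [])             -> Mismatch $ "existing step " <> e <> " is not among the desired steps"
```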

The reason we're being this conservative is to avoid the very subtle issues that occasionally arise when migrations run in different orders in different deployments.

It's also worth noting that the `--migrations-folder` argument introduced by this PR is optional; when it is not provided, CW-D uses the set of migrations embedded into the binary from the repository at compile time. The purpose is to avoid increasing the operational complexity of running CW-D from a compiled binary. The set of migrations associated with a CW-D release is tightly coupled with the Haskell code that comes with it anyway.
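
The PR description doesn't spell out the embedding mechanism, but as a hedged illustration, compile-time embedding of a migrations directory can be done along these lines with the `file-embed` package (the module and names below are assumptions, not necessarily what CW-D does):

```haskell
{-# LANGUAGE TemplateHaskell #-}
module EmbeddedMigrations (embeddedMigrations) where

import Data.ByteString (ByteString)
import Data.FileEmbed (embedDir)

-- The contents of the migrations folder become (relative path, file contents)
-- pairs baked into the binary at compile time, so a node operator can run the
-- compiled binary without passing --migrations-folder.
embeddedMigrations :: [(FilePath, ByteString)]
embeddedMigrations = $(embedDir "db-schema/migrations")
```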

Another point worth noting is that this migration workflow also allows CW-D operators to interleave their own migrations with the official migrations that come with CW-D. If the operator of a particular CW-D node wants to include additional migrations, they can do so by maintaining a `migrations` folder of their own and keeping the official CW-D migrations side by side with their own. In this setup, they need to name their own migration scripts with version numbers that are compatible with this migration workflow.

Resolves #101

* Implement script-based migrations

* Return the migration steps in matchRecursive

* Preorder steps and detect duplicates

* Fix warnings

* Base64 encode the migration checksum

* Simplify matchSteps
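
The "Preorder steps and detect duplicates" commit above suggests a step-ordering pass along these lines; this is a hypothetical sketch, not the code in this PR:

```haskell
import Data.List (group, sort, sortOn)

-- Sort discovered migration steps by their version components and reject any
-- two scripts that declare the same version. Names are illustrative.
orderSteps :: [([Int], FilePath)] -> Either String [([Int], FilePath)]
orderSteps steps
  | null dups = Right (sortOn fst steps)
  | otherwise = Left ("duplicate migration versions: " <> show dups)
  where
    dups = map head $ filter ((> 1) . length) $ group $ sort $ map fst steps
```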
enobayram authored Mar 24, 2023
1 parent d4ec40e commit 75e7606
Showing 11 changed files with 406 additions and 244 deletions.
5 changes: 4 additions & 1 deletion haskell-src/chainweb-data.cabal
@@ -66,6 +66,7 @@ library
ChainwebData.Spec
ChainwebDb.BoundedScan
ChainwebDb.Database
ChainwebDb.Migration
ChainwebDb.Queries
ChainwebDb.Types.Block
ChainwebDb.Types.Common
@@ -77,6 +78,7 @@ library
ChainwebDb.Types.Transfer
build-depends:
base64-bytestring ^>=1.0
, cryptohash
, Decimal
, gargoyle
, gargoyle-postgresql
@@ -85,6 +87,7 @@ library
, http-types
, openapi3
, optparse-applicative >=0.14 && <0.17
, postgresql-simple-migration
, servant-client
, servant-openapi3
, yet-another-logger
@@ -101,7 +104,7 @@ flag threaded
executable chainweb-data
import: commons
main-is: Main.hs
hs-source-dirs: exec data
hs-source-dirs: exec data db-schema
if flag(threaded)
ghc-options: -threaded -rtsopts -with-rtsopts=-N
build-depends:
154 changes: 154 additions & 0 deletions haskell-src/db-schema/init.sql
@@ -0,0 +1,154 @@
CREATE TABLE blocks (
chainid bigint NOT NULL,
creationtime timestamp with time zone NOT NULL,
epoch timestamp with time zone NOT NULL,
flags numeric(20,0) NOT NULL,
hash character varying NOT NULL,
height bigint NOT NULL,
miner character varying NOT NULL,
nonce numeric(20,0) NOT NULL,
parent character varying NOT NULL,
payload character varying NOT NULL,
powhash character varying NOT NULL,
predicate character varying NOT NULL,
target numeric(80,0) NOT NULL,
weight numeric(80,0) NOT NULL
);

ALTER TABLE ONLY blocks
ADD CONSTRAINT blocks_pkey PRIMARY KEY (hash);

CREATE TABLE events (
block character varying NOT NULL,
chainid bigint NOT NULL,
height bigint NOT NULL,
idx bigint NOT NULL,
module character varying NOT NULL,
modulehash character varying NOT NULL,
name character varying NOT NULL,
params jsonb NOT NULL,
paramtext character varying NOT NULL,
qualname character varying NOT NULL,
requestkey character varying NOT NULL
);

ALTER TABLE ONLY events
ADD CONSTRAINT events_pkey PRIMARY KEY (block, idx, requestkey);

ALTER TABLE ONLY events
ADD CONSTRAINT events_block_fkey FOREIGN KEY (block) REFERENCES blocks(hash);

CREATE INDEX events_height_chainid_idx
ON events
USING btree (height DESC, chainid, idx);

CREATE INDEX events_height_name_expr_expr1_idx
ON events
USING btree (height DESC, name, ((params ->> 0)), ((params ->> 1))) WHERE ((name)::text = 'TRANSFER'::text);

CREATE INDEX events_requestkey_idx
ON events
USING btree (requestkey);

CREATE TABLE minerkeys (
block character varying NOT NULL,
key character varying NOT NULL
);

ALTER TABLE ONLY minerkeys
ADD CONSTRAINT minerkeys_pkey PRIMARY KEY (block, key);

ALTER TABLE ONLY minerkeys
ADD CONSTRAINT minerkeys_block_fkey FOREIGN KEY (block) REFERENCES blocks(hash);


CREATE TABLE signers (
addr character varying,
caps jsonb NOT NULL,
idx integer NOT NULL,
pubkey character varying NOT NULL,
requestkey character varying NOT NULL,
scheme character varying,
sig character varying NOT NULL
);

ALTER TABLE ONLY signers
ADD CONSTRAINT signers_pkey PRIMARY KEY (idx, requestkey);


CREATE TABLE transactions (
badresult jsonb,
block character varying NOT NULL,
chainid bigint NOT NULL,
code character varying,
continuation jsonb,
creationtime timestamp with time zone NOT NULL,
data jsonb,
gas bigint NOT NULL,
gaslimit bigint NOT NULL,
gasprice double precision NOT NULL,
goodresult jsonb,
height bigint NOT NULL,
logs character varying,
metadata jsonb,
nonce character varying NOT NULL,
num_events bigint,
pactid character varying,
proof character varying,
requestkey character varying NOT NULL,
rollback boolean,
sender character varying NOT NULL,
step bigint,
ttl bigint NOT NULL,
txid bigint
);

ALTER TABLE ONLY transactions
ADD CONSTRAINT transactions_pkey PRIMARY KEY (block, requestkey);

ALTER TABLE ONLY transactions
ADD CONSTRAINT transactions_block_fkey FOREIGN KEY (block) REFERENCES blocks(hash);

CREATE INDEX transactions_height_idx
ON transactions
USING btree (height);

CREATE INDEX transactions_requestkey_idx
ON transactions
USING btree (requestkey);


CREATE TABLE transfers (
amount numeric NOT NULL,
block character varying NOT NULL,
chainid bigint NOT NULL,
from_acct character varying NOT NULL,
height bigint NOT NULL,
idx bigint NOT NULL,
modulehash character varying NOT NULL,
modulename character varying NOT NULL,
requestkey character varying NOT NULL,
to_acct character varying NOT NULL
);

ALTER TABLE ONLY transfers
ADD CONSTRAINT transfers_pkey PRIMARY KEY (block, chainid, idx, modulehash, requestkey);

CREATE INDEX transfers_from_acct_height_idx
ON transfers
USING btree (from_acct, height DESC, idx);


CREATE INDEX transfers_to_acct_height_idx_idx
ON transfers
USING btree (to_acct, height DESC, idx);

ALTER TABLE ONLY transfers
ADD CONSTRAINT transfers_block_fkey FOREIGN KEY (block) REFERENCES blocks(hash);


CREATE TABLE schema_migrations (
filename character varying(512) NOT NULL,
checksum character varying(32) NOT NULL,
executed_at timestamp without time zone DEFAULT now() NOT NULL
);
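
The `checksum` column above, together with the new `cryptohash` and `base64-bytestring` dependencies and the "Base64 encode the migration checksum" commit, suggests a scheme roughly like the following; this is a guess at the intent, not necessarily the exact code in this PR:

```haskell
import qualified Crypto.Hash.MD5 as MD5          -- cryptohash
import qualified Data.ByteString as BS
import qualified Data.ByteString.Base64 as B64   -- base64-bytestring

-- Base64-encode the MD5 digest of a migration script's contents; the
-- 24-character result fits the schema_migrations.checksum varchar(32) column.
scriptChecksum :: BS.ByteString -> BS.ByteString
scriptChecksum = B64.encode . MD5.hash
```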
Empty file.
2 changes: 1 addition & 1 deletion haskell-src/exec/Chainweb/Backfill.hs
@@ -86,7 +86,7 @@ backfillBlocksCut env args cutBS = do
headersBetween env range >>= \case
Left e -> logg Error $ fromString $ printf "ApiError for range %s: %s" (show range) (show e)
Right [] -> logg Error $ fromString $ printf "headersBetween: %s" $ show range
Right hs -> writeBlocks env pool False count hs
Right hs -> writeBlocks env pool count hs
delayFunc

-- | For all blocks written to the DB, find the shortest (in terms of block
75 changes: 2 additions & 73 deletions haskell-src/exec/Chainweb/Gaps.hs
@@ -1,7 +1,5 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

module Chainweb.Gaps ( gaps ) where

@@ -17,20 +15,16 @@ import ChainwebData.Types
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Catch
import Control.Scheduler
import Data.Bool
import Data.ByteString.Lazy (ByteString)
import Data.IORef
import Data.Int
import qualified Data.Map.Strict as M
import qualified Data.Pool as P
import Data.String
import Data.Text (Text)
import Database.Beam hiding (insert)
import Database.Beam.Postgres
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.Types
import System.Logger hiding (logg)
import System.Exit (exitFailure)
import Text.Printf
@@ -72,13 +66,10 @@ gapsCut env args cutBS = do
(traverseConcurrently_ Par' doChain (M.toList gapsByChain))
final <- readIORef count
logg Info $ fromString $ printf "Filled in %d missing blocks." final
if disableIndexesPred
then withDroppedIndexes pool logg gapFiller
else gapFiller
gapFiller
where
pool = _env_dbConnPool env
delay = _fillArgs_delayMicros args
disableIndexesPred = _fillArgs_disableIndexes args
gi = mkGenesisInfo $ _env_nodeInfo env
logg = _env_logger env
createRanges cid (low, high)
@@ -92,71 +83,9 @@ gapsCut env args cutBS = do
headersBetween env range >>= \case
Left e -> logger Error $ fromString $ printf "ApiError for range %s: %s" (show range) (show e)
Right [] -> logger Error $ fromString $ printf "headersBetween: %s" $ show range
Right hs -> writeBlocks env pool disableIndexesPred count hs
Right hs -> writeBlocks env pool count hs
maybe mempty threadDelay delay

listIndexes :: P.Pool Connection -> LogFunctionIO Text -> IO [(String, String)]
listIndexes pool logger = P.withResource pool $ \conn -> do
res <- query_ conn qry
forM_ res $ \(_,name) -> do
logger Debug "index name"
logger Debug $ fromString name
return res
where
qry =
"SELECT tablename, indexname FROM pg_indexes WHERE schemaname='public';"

dropIndexes :: P.Pool Connection -> [(String, String)] -> IO ()
dropIndexes pool indexinfos = forM_ indexinfos $ \(tablename, indexname) -> P.withResource pool $ \conn ->
execute_ conn $ Query $ fromString $ printf "ALTER TABLE %s DROP CONSTRAINT %s CASCADE;" tablename indexname

dropExtensions :: P.Pool Connection -> IO ()
dropExtensions pool = P.withResource pool $ \conn ->
mapM_ (execute_ conn . Query) stmts
where
stmts = map ("DROP EXTENSION " <>) ["btree_gin;"]


dedupeMinerKeysTable :: P.Pool Connection -> LogFunctionIO Text -> IO ()
dedupeMinerKeysTable pool logger = do
logger Info "Deduping minerkeys table"
P.withResource pool $ \conn ->
void $ execute_ conn dedupestmt
where
dedupestmt =
"DELETE FROM minerkeys WHERE ctid IN (SELECT\
\ ctid FROM (SELECT\
\ block,key,ctid,row_number() OVER (PARTITION BY\
\ block,key) AS row_num FROM minerkeys) t WHERE t.row_num >1);"

dedupeSignersTable :: P.Pool Connection -> LogFunctionIO Text -> IO ()
dedupeSignersTable pool logger = do
logger Debug "Deduping signers table"
P.withResource pool $ \conn ->
void $ execute_ conn dedupestmt
where
dedupestmt =
"DELETE FROM signers WHERE ctid IN (SELECT ctid\
\ FROM (SELECT requestkey,idx,ctid,row_number() OVER (PARTITION BY requestkey,idx)\
\ AS row_num FROM signers) t WHERE t.row_num > 1);"

dedupeTables :: P.Pool Connection -> LogFunctionIO Text -> IO ()
dedupeTables pool logger = do
-- We don't need to dedupe the following tables because their primary keys
-- should be unique across any data we might encounter:
-- events, transactions, blocks
dedupeMinerKeysTable pool logger
dedupeSignersTable pool logger

withDroppedIndexes :: P.Pool Connection -> LogFunctionIO Text -> IO a -> IO a
withDroppedIndexes pool logger action = do
indexInfos <- listIndexes pool logger
fmap fst $ generalBracket (dropIndexes pool indexInfos >> dropExtensions pool) release (const action)
where
release _ = \case
ExitCaseSuccess _ -> dedupeTables pool logger
_ -> return ()

getBlockGaps :: Env -> M.Map Int64 (Maybe Int64) -> IO (M.Map Int64 [(Int64,Int64)])
getBlockGaps env existingMinHeights = withDbDebug env Debug $ do
let toMap = M.fromListWith (<>) . map (\(cid,a,b) -> (cid,[(a,b)]))
2 changes: 1 addition & 1 deletion haskell-src/exec/Chainweb/Server.hs
@@ -247,7 +247,7 @@ scheduledUpdates env pool ssRef runFill fillDelay = forever $ do

when runFill $ do
logg Info "Filling missing blocks"
gaps env (FillArgs fillDelay False)
gaps env (FillArgs fillDelay)
logg Info "Fill finished"
where
micros = 1000000
29 changes: 12 additions & 17 deletions haskell-src/exec/Chainweb/Worker.hs
@@ -41,7 +41,7 @@ import Data.Tuple.Strict (T2(..))
import Database.Beam hiding (insert)
import Database.Beam.Backend.SQL.BeamExtensions
import Database.Beam.Postgres
import Database.Beam.Postgres.Full (insert, onConflictDefault, onConflict)
import Database.Beam.Postgres.Full (insert, onConflict)
import Database.PostgreSQL.Simple.Transaction (withTransaction,withSavepoint)
import System.Logger hiding (logg)
---
@@ -82,39 +82,34 @@ writes pool b ks ts es ss tf = P.withResource pool $ \c -> withTransaction c $ d
-- (unDbHash $ _block_hash b)
-- (map (const '.') ts)

batchWrites :: P.Pool Connection -> Bool -> [Block] -> [[T.Text]] -> [[Transaction]] -> [[Event]] -> [[Signer]] -> [[Transfer]] -> IO ()
batchWrites pool indexesDisabled bs kss tss ess sss tfs = P.withResource pool $ \c -> withTransaction c $ do
batchWrites :: P.Pool Connection -> [Block] -> [[T.Text]] -> [[Transaction]] -> [[Event]] -> [[Signer]] -> [[Transfer]] -> IO ()
batchWrites pool bs kss tss ess sss tfs = P.withResource pool $ \c -> withTransaction c $ do
runBeamPostgres c $ do
-- Write the Blocks if unique
runInsert
$ insert (_cddb_blocks database) (insertValues bs)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
$ onConflict (conflictingFields primaryKey) onConflictDoNothing
-- Write Pub Key many-to-many relationships if unique --
runInsert
$ insert (_cddb_minerkeys database) (insertValues $ concat $ zipWith (\b ks -> map (MinerKey (pk b)) ks) bs kss)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
$ onConflict (conflictingFields primaryKey) onConflictDoNothing

withSavepoint c $ do
runBeamPostgres c $ do
-- Write the TXs if unique
runInsert
$ insert (_cddb_transactions database) (insertValues $ concat tss)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
$ onConflict (conflictingFields primaryKey) onConflictDoNothing

runInsert
$ insert (_cddb_events database) (insertValues $ concat ess)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
$ onConflict (conflictingFields primaryKey) onConflictDoNothing
runInsert
$ insert (_cddb_signers database) (insertValues $ concat sss)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
$ onConflict (conflictingFields primaryKey) onConflictDoNothing
runInsert
$ insert (_cddb_transfers database) (insertValues $ concat tfs)
$ actionOnConflict $ onConflict (conflictingFields primaryKey) onConflictDoNothing
where
{- the type system won't allow me to simply inline the "other" expression -}
actionOnConflict other = if indexesDisabled
then onConflictDefault -- There shouldn't be any constraints on any tables, so this SHOULD be no-op
else other
$ onConflict (conflictingFields primaryKey) onConflictDoNothing


asPow :: BlockHeader -> PowHeader
@@ -148,8 +143,8 @@ writeBlock env pool count bh = do
policy :: RetryPolicyM IO
policy = exponentialBackoff 250_000 <> limitRetries 3

writeBlocks :: Env -> P.Pool Connection -> Bool -> IORef Int -> [BlockHeader] -> IO ()
writeBlocks env pool disableIndexesPred count bhs = do
writeBlocks :: Env -> P.Pool Connection -> IORef Int -> [BlockHeader] -> IO ()
writeBlocks env pool count bhs = do
iforM_ blocksByChainId $ \chain (Sum numWrites, bhs') -> do
let ff bh = (hashToDbHash $ _blockHeader_payloadHash bh, _blockHeader_hash bh)
retrying policy check (const $ payloadWithOutputsBatch env chain (M.fromList (ff <$> bhs')) id) >>= \case
@@ -171,7 +166,7 @@ writeBlocks env pool disableIndexesPred count bhs = do
err = printf "writeBlocks failed because we don't know how to work this version %s" version
withEventsMinHeight version err $ \evMinHeight -> do
let !tfs = M.intersectionWith (\pl bh -> mkTransferRows (fromIntegral $ _blockHeader_height bh) (_blockHeader_chainId bh) (DbHash $ hashB64U $ _blockHeader_hash bh) (posixSecondsToUTCTime $ _blockHeader_creationTime bh) pl evMinHeight) pls (makeBlockMap bhs')
batchWrites pool disableIndexesPred (M.elems bs) (M.elems kss) (M.elems tss) (M.elems ess) (M.elems sss) (M.elems tfs)
batchWrites pool (M.elems bs) (M.elems kss) (M.elems tss) (M.elems ess) (M.elems sss) (M.elems tfs)
atomicModifyIORef' count (\n -> (n + numWrites, ()))
where
