Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres storage stage #160

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ r2d2_redis = "0.14.0"
deno_runtime = { version = "0.136.0", optional = true }
chrono = { version = "0.4.31", optional = true }
utxorpc = { version = "1.0.0-alpha.1", optional = true }
bb8-postgres = "0.8.1"

[features]
async = ["futures"]
Expand Down
9 changes: 9 additions & 0 deletions examples/deno-postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Setup Postgres DB
```bash
docker compose up -d
```

## Run scrolls
```bash
RUST_BACKTRACE=1 cargo run --bin scrolls --features deno -- daemon --config examples/deno-postgres/daemon.toml
```
26 changes: 26 additions & 0 deletions examples/deno-postgres/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[chain]
type = "mainnet"

[intersect]
type = "Point"
value = [
104699772,
"19525913a14c4540a782d188c333f2c54d1845620aef56e3166a2c1fffb800fc"
]

[enrich]
type = "Skip"

[reducer]
type = "Deno"
main_module = "./examples/deno-postgres/reduce.js"
storage_command_type = "sql"
use_async = true

[storage]
type = "Postgres"
url = "postgresql://postgres:password@localhost:5432/scrolls"
19 changes: 19 additions & 0 deletions examples/deno-postgres/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: "3"

services:
postgres:
image: postgres
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_DB: scrolls
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
volumes:
- ./schema.sql:/docker-entrypoint-initdb.d/schema.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d scrolls"]
interval: 10s
timeout: 5s
retries: 5
48 changes: 48 additions & 0 deletions examples/deno-postgres/reduce.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
export async function reduce(block) {
let sqlCommands = [
`
INSERT INTO
balance_by_address (address, balance)
VALUES
(
(
SELECT
STRING_AGG(
SUBSTRING(
'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
FROM
(FLOOR(RANDOM() * 36):: INT + 1) FOR 1
),
''
)
FROM
generate_series(1, 10)
),
FLOOR(RANDOM() * 10001):: BIGINT
);
`,
`
INSERT INTO
balance_by_stake_address (address, balance)
VALUES
(
(
SELECT
STRING_AGG(
SUBSTRING(
'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
FROM
(FLOOR(RANDOM() * 36):: INT + 1) FOR 1
),
''
)
FROM
generate_series(1, 10)
),
FLOOR(RANDOM() * 10001):: BIGINT
);
`,
];

return sqlCommands;
}
9 changes: 9 additions & 0 deletions examples/deno-postgres/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE balance_by_address (
address TEXT PRIMARY KEY,
balance BIGINT NOT NULL
);

CREATE TABLE balance_by_stake_address (
address TEXT PRIMARY KEY,
balance BIGINT NOT NULL
);
1 change: 1 addition & 0 deletions examples/deno/daemon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type = "Skip"
[reducer]
type = "Deno"
main_module = "./examples/deno/enrich.js"
storage_command_type = "crdt"
use_async = true

[storage]
Expand Down
5 changes: 4 additions & 1 deletion src/crosscut/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ pub fn eval_predicate(
mod tests {
use pallas::ledger::traverse::MultiEraBlock;

use crate::{crosscut::policies::{ErrorAction, RuntimePolicy}, framework::model::BlockContext};
use crate::{
crosscut::policies::{ErrorAction, RuntimePolicy},
framework::model::BlockContext,
};

use super::{eval_predicate, AddressPattern, Predicate};

Expand Down
2 changes: 2 additions & 0 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ pub enum Record {
RawBlockPayload(Vec<u8>),
EnrichedBlockPayload(Vec<u8>, BlockContext),
CRDTCommand(Vec<CRDTCommand>),
SQLCommand(Vec<String>),
}

#[derive(Debug, Clone)]
pub enum ChainEvent {
Apply(Point, Record),
Reset(Point),
}

impl ChainEvent {
pub fn apply(point: Point, record: impl Into<Record>) -> gasket::messaging::Message<Self> {
gasket::messaging::Message {
Expand Down
23 changes: 17 additions & 6 deletions src/reducers/deno/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ pub fn op_put_record(
#[derive(Deserialize)]
pub struct Config {
main_module: String,
storage_command_type: String,
use_async: bool,
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
let stage = Stage {
main_module: PathBuf::from(self.main_module),
storage_command_type: self.storage_command_type,
call_snippet: if self.use_async {
ASYNC_CALL_SNIPPET
} else {
Expand Down Expand Up @@ -97,6 +99,7 @@ async fn setup_deno(main_module: &PathBuf) -> Result<DenoWorker, WorkerError> {
#[stage(name = "reducer-deno", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
main_module: PathBuf,
storage_command_type: String,
call_snippet: &'static str,

pub input: ReducerInputPort,
Expand Down Expand Up @@ -152,12 +155,20 @@ impl gasket::framework::Worker<Stage> for Worker {
deno.js_runtime.op_state().borrow_mut().try_take();

trace!(?out, "received record from js runtime");
if let Some(crdt_json) = out {
let commands: Vec<CRDTCommand> =
serde_json::from_value(crdt_json).or_panic()?;
let evt =
ChainEvent::apply(unit.point().clone(), Record::CRDTCommand(commands));
stage.output.send(evt).await.or_retry()?;
if let Some(json) = out {
let event = match stage.storage_command_type.as_str() {
"crdt" => {
let commands: Vec<CRDTCommand> =
serde_json::from_value(json).or_panic()?;
ChainEvent::apply(unit.point().clone(), Record::CRDTCommand(commands))
}
"sql" => {
let commands: Vec<String> = serde_json::from_value(json).or_panic()?;
ChainEvent::apply(unit.point().clone(), Record::SQLCommand(commands))
}
_ => return Err(WorkerError::Panic),
};
stage.output.send(event).await.or_retry()?;
}
}
_ => todo!(),
Expand Down
6 changes: 6 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use serde::Deserialize;

use crate::framework::{errors::Error, *};

pub mod postgres;
pub mod redis;

pub enum Bootstrapper {
Postgres(postgres::Stage),
Redis(redis::Stage),
}

Expand All @@ -16,12 +18,14 @@ impl StageBootstrapper for Bootstrapper {

fn connect_input(&mut self, adapter: InputAdapter) {
match self {
Bootstrapper::Postgres(p) => p.input.connect(adapter),
Bootstrapper::Redis(p) => p.input.connect(adapter),
}
}

fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
match self {
Bootstrapper::Postgres(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Redis(s) => gasket::runtime::spawn_stage(s, policy),
}
}
Expand All @@ -30,12 +34,14 @@ impl StageBootstrapper for Bootstrapper {
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
Postgres(postgres::Config),
Redis(redis::Config),
}

impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::Postgres(c) => Ok(Bootstrapper::Postgres(c.bootstrapper(ctx)?)),
Config::Redis(c) => Ok(Bootstrapper::Redis(c.bootstrapper(ctx)?)),
}
}
Expand Down
Loading