-
Notifications
You must be signed in to change notification settings - Fork 72
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement Redis option for cursor persistence (#790)
- Loading branch information
1 parent
8dde4f8
commit 3b205b0
Showing
6 changed files
with
245 additions
and
3 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Redis cursor | ||
|
||
This example shows you how to persits the cursor information on a Redis cluster. | ||
|
||
The `daemon.toml` includes the `[cursor]` section that has `type` set to `Redis`, `key` is the key on the redis cluster where to dump the information, and `url` the connection string to connect with Redis. | ||
|
||
To run the example: | ||
|
||
* Set up [Demeter CLI](https://docs.demeter.run/cli). | ||
* On a different terminal but same path run `dmtr ports tunnel`. | ||
Chose the `node` option, followed by the `preprod` network and the `stable` version. Finally, mount the socket on `./socket`. | ||
* `docker compose up -d` to spin up the Redis instance. | ||
* ```sh | ||
cargo run --bin oura --features redis daemon --config daemon.toml | ||
``` | ||
|
||
In order to see cursor information on the Redis you can do the following. | ||
|
||
```sh | ||
$ docker exec -it redis redis-cli | ||
127.0.0.1:6379> GET key | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
[source] | ||
type = "N2C" | ||
socket_path = "./socket" | ||
|
||
[intersect] | ||
type = "Origin" | ||
|
||
[chain] | ||
type = "preprod" | ||
|
||
[cursor] | ||
type = "Redis" | ||
key = "key" | ||
url = "redis://localhost:6379" | ||
flush_interval = 1 | ||
|
||
[sink] | ||
type = "Stdout" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
version: "3" | ||
services: | ||
redis: | ||
image: redis | ||
container_name: redis | ||
ports: | ||
- "6379:6379" | ||
networks: | ||
- redis-network | ||
healthcheck: | ||
test: ["CMD", "redis-cli", "ping"] | ||
interval: 5s | ||
timeout: 5s | ||
retries: 5 | ||
|
||
networks: | ||
redis-network: | ||
driver: bridge |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
use gasket::framework::*; | ||
use pallas::network::miniprotocols::Point; | ||
use r2d2_redis::{ | ||
r2d2::{self, Pool}, | ||
redis::{self, Commands}, | ||
RedisConnectionManager, | ||
}; | ||
use serde::Deserialize; | ||
use tokio::select; | ||
use tracing::debug; | ||
|
||
use crate::framework::*; | ||
|
||
fn breadcrumbs_to_data(crumbs: &Breadcrumbs) -> Vec<(u64, String)> { | ||
crumbs | ||
.points() | ||
.into_iter() | ||
.filter_map(|p| match p { | ||
Point::Origin => None, | ||
Point::Specific(slot, hash) => Some((slot, hex::encode(hash))), | ||
}) | ||
.collect() | ||
} | ||
|
||
fn breadcrumbs_from_data(data: Vec<(u64, String)>, max: usize) -> Result<Breadcrumbs, Error> { | ||
let points: Vec<_> = data | ||
.into_iter() | ||
.map::<Result<_, Error>, _>(|(slot, hash)| { | ||
let hash = hex::decode(hash).map_err(Error::custom)?; | ||
Ok(Point::Specific(slot, hash)) | ||
}) | ||
.collect::<Result<_, _>>()?; | ||
|
||
Ok(Breadcrumbs::from_points(points, max)) | ||
} | ||
|
||
pub enum Unit { | ||
Track(Point), | ||
Flush, | ||
} | ||
|
||
pub struct Worker { | ||
pool: Pool<RedisConnectionManager>, | ||
key: String, | ||
} | ||
|
||
#[async_trait::async_trait(?Send)] | ||
impl gasket::framework::Worker<Stage> for Worker { | ||
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> { | ||
let manager = RedisConnectionManager::new(stage.url.clone()).or_panic()?; | ||
let pool = r2d2::Pool::builder().build(manager).or_panic()?; | ||
|
||
Ok(Self { | ||
pool, | ||
key: stage.key.clone(), | ||
}) | ||
} | ||
|
||
async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<Unit>, WorkerError> { | ||
select! { | ||
msg = stage.track.recv() => { | ||
let msg = msg.or_panic()?; | ||
Ok(WorkSchedule::Unit(Unit::Track(msg.payload))) | ||
} | ||
msg = stage.flush.recv() => { | ||
msg.or_panic()?; | ||
Ok(WorkSchedule::Unit(Unit::Flush)) | ||
} | ||
} | ||
} | ||
|
||
async fn execute(&mut self, unit: &Unit, stage: &mut Stage) -> Result<(), WorkerError> { | ||
match unit { | ||
Unit::Track(x) => stage.breadcrumbs.track(x.clone()), | ||
Unit::Flush => { | ||
let data = breadcrumbs_to_data(&stage.breadcrumbs); | ||
let mut conn = self.pool.get().or_restart()?; | ||
|
||
let data_to_write = serde_json::to_string(&data).or_panic()?; | ||
conn.set(&self.key, &data_to_write) | ||
.map_err(Error::custom) | ||
.or_panic()?; | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
#[derive(Stage)] | ||
#[stage(name = "cursor", unit = "Unit", worker = "Worker")] | ||
pub struct Stage { | ||
key: String, | ||
url: String, | ||
|
||
breadcrumbs: Breadcrumbs, | ||
|
||
pub track: gasket::messaging::InputPort<Point>, | ||
|
||
pub flush: gasket::messaging::TimerPort, | ||
|
||
#[metric] | ||
tracked_slot: gasket::metrics::Gauge, | ||
|
||
#[metric] | ||
flush_count: gasket::metrics::Counter, | ||
} | ||
|
||
const DEFAULT_MAX_BREADCRUMBS: usize = 10; | ||
const DEFAULT_FLUSH_INTERVAL: usize = 10; | ||
|
||
#[derive(Default, Debug, Deserialize)] | ||
pub struct Config { | ||
pub key: String, | ||
pub url: String, | ||
pub max_breadcrumbs: Option<usize>, | ||
pub flush_interval: Option<u64>, | ||
} | ||
|
||
impl Config { | ||
pub fn initial_load(&self) -> Result<Breadcrumbs, Error> { | ||
let client = redis::Client::open(self.url.clone()) | ||
.map_err(|err| Error::Custom(format!("Unable to connect to Redis: {}", err)))?; | ||
let mut conn = client | ||
.get_connection() | ||
.map_err(|err| Error::Custom(format!("Unable to establish connection: {}", err)))?; | ||
|
||
let max_breadcrumbs = self.max_breadcrumbs.unwrap_or(DEFAULT_MAX_BREADCRUMBS); | ||
|
||
let result: redis::RedisResult<Option<String>> = conn.get(&self.key); | ||
|
||
match result { | ||
Ok(Some(data_as_string)) => { | ||
debug!("Retrieving cursor information from redis."); | ||
let data: Vec<(u64, String)> = | ||
serde_json::from_str(&data_as_string).map_err(Error::custom)?; | ||
let crumbs = breadcrumbs_from_data(data, max_breadcrumbs)?; | ||
Ok(crumbs) | ||
} | ||
Ok(None) => { | ||
debug!("No cursor information found on redis cluster."); | ||
Ok(Breadcrumbs::new(max_breadcrumbs)) | ||
} | ||
Err(err) => Err(Error::custom(err)), | ||
} | ||
} | ||
|
||
pub fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error> { | ||
let flush_interval = self.flush_interval.unwrap_or(DEFAULT_FLUSH_INTERVAL as u64); | ||
|
||
let stage = Stage { | ||
key: self.key.clone(), | ||
url: self.url.clone(), | ||
breadcrumbs: ctx.breadcrumbs.clone(), | ||
tracked_slot: Default::default(), | ||
flush_count: Default::default(), | ||
track: Default::default(), | ||
flush: gasket::messaging::TimerPort::from_secs(flush_interval), | ||
}; | ||
|
||
Ok(stage) | ||
} | ||
} |