Skip to content

Commit

Permalink
Will/get local working (#532)
Browse files Browse the repository at this point in the history
* add systemd unit file for tip tracing

* fix

* wip

* fix

* fix

* wip

* fix

* fix

* fix

* t

* wip

* fix

* wip

* update ci
  • Loading branch information
Will-Smith11 authored Mar 27, 2024
1 parent 743da61 commit 87471d6
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 13 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
strategy:
fail-fast: false
matrix:
features: ["sorella-server", "default"]
features: ["sorella-server"]
steps:
- name: Get branch names.
id: branch-names
Expand All @@ -92,7 +92,7 @@ jobs:
strategy:
fail-fast: false
matrix:
features: ["sorella-server", "default"]
features: ["sorella-server"]
steps:
- name: Get branch names.
id: branch-names
Expand Down
6 changes: 3 additions & 3 deletions crates/bin/src/cli/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod db_query;
mod ensure_test_traces;
mod export;
mod init;
#[cfg(feature = "sorella-server")]
#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
mod tip_tracer;
mod trace_range;

Expand Down Expand Up @@ -44,7 +44,7 @@ pub enum DatabaseCommands {
#[cfg(feature = "local-clickhouse")]
#[command(name = "test-traces-init")]
TestTracesInit(ensure_test_traces::TestTraceArgs),
#[cfg(feature = "sorella-server")]
#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
#[command(name = "trace-at-tip")]
TraceAtTip(tip_tracer::TipTraceArgs),
}
Expand All @@ -60,7 +60,7 @@ impl Database {
DatabaseCommands::Export(cmd) => cmd.execute(ctx).await,
#[cfg(feature = "local-clickhouse")]
DatabaseCommands::TestTracesInit(cmd) => cmd.execute(ctx).await,
#[cfg(feature = "sorella-server")]
#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
DatabaseCommands::TraceAtTip(cmd) => cmd.execute(ctx).await,
}
}
Expand Down
25 changes: 17 additions & 8 deletions crates/bin/src/cli/db/tip_tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ use futures::{join, StreamExt};
use tokio::sync::mpsc::unbounded_channel;

use crate::{
cli::{determine_max_tasks, get_env_vars, get_tracing_provider, load_database, static_object},
cli::{get_env_vars, get_tracing_provider, load_read_only_database, static_object},
runner::CliContext,
};

#[derive(Debug, Parser)]
pub struct TipTraceArgs {
/// Start Block
#[arg(long, short)]
pub start_block: u64,
pub start_block: Option<u64>,
}

impl TipTraceArgs {
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
let db_path = get_env_vars()?;

let max_tasks = determine_max_tasks(None) * 2;
let max_tasks = (num_cpus::get_physical() as f64 * 0.7) as u64 + 1;
init_threadpools(max_tasks as usize);
let (metrics_tx, metrics_rx) = unbounded_channel();

Expand All @@ -36,19 +36,28 @@ impl TipTraceArgs {
BRONTES_DB_PATH in .env",
);

let libmdbx = static_object(load_database(brontes_db_endpoint)?);
let libmdbx = static_object(load_read_only_database(brontes_db_endpoint)?);

let tracer =
get_tracing_provider(Path::new(&db_path), max_tasks, ctx.task_executor.clone());

let parser = static_object(DParser::new(metrics_tx, libmdbx, tracer.clone()).await);
let mut end_block = parser.get_latest_block_number().unwrap();

let start_block = if let Some(s) = self.start_block {
s
} else {
libmdbx.client.max_traced_block().await.unwrap()
};

// trace up to chaintip
let catchup = ctx.task_executor.spawn_critical("catchup", async move {
futures::stream::iter(self.start_block..=end_block)
.unordered_buffer_map(100, |i| parser.execute(i))
.map(|_res| ())
futures::stream::iter(start_block..=end_block)
.unordered_buffer_map(100, |i| parser.trace_for_clickhouse(i))
.map(|res| {
drop(res);
return
})
.collect::<Vec<_>>()
.await;
});
Expand All @@ -58,7 +67,7 @@ impl TipTraceArgs {
let tip = parser.get_latest_block_number().unwrap();
if tip + 1 > end_block {
end_block += 1;
let _ = parser.execute(end_block).await;
let _ = parser.trace_for_clickhouse(end_block).await;
}
}
});
Expand Down
11 changes: 11 additions & 0 deletions crates/bin/src/cli/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use brontes_database::clickhouse::Clickhouse;
use brontes_database::clickhouse::ClickhouseHttpClient;
#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
use brontes_database::clickhouse::ClickhouseMiddleware;
#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
use brontes_database::clickhouse::ReadOnlyMiddleware;
use brontes_database::libmdbx::LibmdbxReadWriter;
use brontes_inspect::{Inspector, Inspectors};
use brontes_types::{
Expand Down Expand Up @@ -36,6 +38,15 @@ pub fn load_database(db_endpoint: String) -> eyre::Result<ClickhouseMiddleware<L
Ok(ClickhouseMiddleware::new(clickhouse, inner))
}

#[cfg(all(feature = "local-clickhouse", not(feature = "local-no-inserts")))]
pub fn load_read_only_database(
db_endpoint: String,
) -> eyre::Result<ReadOnlyMiddleware<LibmdbxReadWriter>> {
let inner = LibmdbxReadWriter::init_db(db_endpoint, None)?;
let clickhouse = Clickhouse::default();
Ok(ReadOnlyMiddleware::new(clickhouse, inner))
}

pub fn load_libmdbx(db_endpoint: String) -> eyre::Result<LibmdbxReadWriter> {
LibmdbxReadWriter::init_db(db_endpoint, None)
}
Expand Down
14 changes: 14 additions & 0 deletions crates/brontes-core/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use reth_primitives::BlockId;
pub type ParserFuture<'a> =
Pin<Box<dyn Future<Output = Result<Option<(Vec<TxTrace>, Header)>, JoinError>> + Send + 'a>>;

pub type TraceClickhouseFuture<'a> =
Pin<Box<dyn Future<Output = Result<(), JoinError>> + Send + 'a>>;

pub struct Parser<'a, T: TracingProvider, DB: LibmdbxReader + DBWriter> {
executor: Executor,
parser: TraceParser<'a, T, DB>,
Expand Down Expand Up @@ -74,4 +77,15 @@ impl<'a, T: TracingProvider, DB: LibmdbxReader + DBWriter> Parser<'a, T, DB> {
.spawn_result_task_as(parser.execute_block(block_num), TaskKind::Default),
) as ParserFuture
}

pub fn trace_for_clickhouse(&self, block_num: u64) -> TraceClickhouseFuture {
// This will satisfy its lifetime scope do to the lifetime itself living longer
// than the process that runs brontes.
let parser: &'static TraceParser<'_, T, DB> = unsafe { std::mem::transmute(&self.parser) };

Box::pin(
self.executor
.spawn_result_task_as(parser.trace_clickhouse_block(block_num), TaskKind::Default),
) as TraceClickhouseFuture
}
}
37 changes: 37 additions & 0 deletions crates/brontes-core/src/decoding/parser.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

#[cfg(feature = "dyn-decode")]
use alloy_json_abi::JsonAbi;
#[cfg(feature = "dyn-decode")]
Expand Down Expand Up @@ -48,6 +50,41 @@ impl<'db, T: TracingProvider, DB: LibmdbxReader + DBWriter> TraceParser<'db, T,
Some((traces, self.tracer.header_by_number(block_num).await.ok()??))
}

pub async fn trace_clickhouse_block(&self, block_num: u64) {
let parity_trace = self.trace_block(block_num).await;
let receipts = self.get_receipts(block_num).await;

if parity_trace.0.is_none() && receipts.0.is_none() {
return
}

#[cfg(feature = "dyn-decode")]
let traces = self
.fill_metadata(parity_trace.0.unwrap(), parity_trace.1, receipts.0.unwrap(), block_num)
.await;
#[cfg(not(feature = "dyn-decode"))]
let traces = self
.fill_metadata(parity_trace.0.unwrap(), receipts.0.unwrap(), block_num)
.await;

let mut cnt = 0;

while self
.libmdbx
.save_traces(block_num, traces.0.clone())
.await
.is_err()
{
cnt += 1;
if cnt > 20 {
error!(%block_num, "attempted 20 inserts for db but all failed");
break
}

tokio::time::sleep(Duration::from_secs(3)).await;
}
}

/// executes the tracing of a given block
#[allow(unreachable_code)]
pub async fn execute_block(&'db self, block_num: u64) -> Option<(Vec<TxTrace>, Header)> {
Expand Down
7 changes: 7 additions & 0 deletions crates/brontes-database/src/clickhouse/db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ impl Clickhouse {
&self.client
}

pub async fn max_traced_block(&self) -> eyre::Result<u64> {
Ok(self
.client
.query_one::<u64, _>("select max(block_number) from brontes_api.tx_traces", &())
.await?)
}

// inserts
pub async fn write_searcher_eoa_info(
&self,
Expand Down
Loading

0 comments on commit 87471d6

Please sign in to comment.