Skip to content

Commit

Permalink
Try to revert?
Browse files Browse the repository at this point in the history
  • Loading branch information
Fly-Style committed Nov 26, 2024
1 parent 62e7a99 commit fc0d6cd
Showing 1 changed file with 27 additions and 38 deletions.
65 changes: 27 additions & 38 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ use clap::Parser;
use configs::{Opts, SubCommand};
use prometheus::Registry;
use tracing::{error, info};
use tokio::sync::mpsc::Receiver;
use crate::rmq_publisher::PublishData;

use crate::{
candidates_validator::CandidatesValidator, configs::RunConfigArgs, errors::Error, errors::Result,
indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer, rmq_publisher::RmqPublisher,
};
use crate::fastnear_indexer::FastNearIndexer;
use crate::metrics::INDEXER_NAMESPACE;

mod block_listener;
Expand All @@ -21,12 +18,12 @@ mod metrics;
mod metrics_server;
mod rmq_publisher;
mod types;
mod fastnear_indexer;

const INDEXER: &str = "indexer";

fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
let addresses_to_rollup_ids = config.compile_addresses_to_ids_map()?;

let system = actix::System::new();
let registry = Registry::new();
let server_handle = if let Some(metrics_addr) = config.metrics_ip_port_address {
Expand All @@ -35,46 +32,36 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
} else {
None
};
// TODO[sasha/firat]: refactor and tests.

// TODO: refactor
let block_res = system.block_on(async move {
let validated_stream: Receiver<PublishData>;
if cfg!(feature = "use_fastnear") {
let fastnear_indexer = FastNearIndexer::new(addresses_to_rollup_ids);
validated_stream = fastnear_indexer.run();
} else {
let indexer_config = near_indexer::IndexerConfig {
home_dir,
sync_mode: near_indexer::SyncModeEnum::LatestSynced,
await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync,
validate_genesis: true,
};

let mut indexer = IndexerWrapper::new(indexer_config, addresses_to_rollup_ids);
if config.metrics_ip_port_address.is_some() {
indexer.enable_metrics(registry.clone())?;
}

let (view_client, _) = indexer.client_actors();
let (block_handle, candidates_stream) = indexer.run();
let mut candidates_validator = CandidatesValidator::new(view_client);
if config.metrics_ip_port_address.is_some() {
candidates_validator.enable_metrics(registry.clone())?;
}

validated_stream = candidates_validator.run(candidates_stream);

// TODO: Handle block_handle whether cancelled or panics
block_handle.await?;
let indexer_config = near_indexer::IndexerConfig {
home_dir,
sync_mode: near_indexer::SyncModeEnum::LatestSynced,
await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync,
validate_genesis: true,
};

let mut indexer = IndexerWrapper::new(indexer_config, addresses_to_rollup_ids);
if let Some(_) = config.metrics_ip_port_address {
indexer.enable_metrics(registry.clone())?;
}

let (view_client, _) = indexer.client_actors();
let (block_handle, candidates_stream) = indexer.run();
let mut candidates_validator = CandidatesValidator::new(view_client);
if config.metrics_ip_port_address.is_some() {
candidates_validator.enable_metrics(registry.clone())?;
}

let validated_stream = candidates_validator.run(candidates_stream);
let mut rmq_publisher = RmqPublisher::new(&config.rmq_address)?;
if config.metrics_ip_port_address.is_some() {
rmq_publisher.enable_metrics(registry.clone())?;
}
rmq_publisher.run(validated_stream);

Ok::<_, Error>(())
Ok::<_, Error>(block_handle.await?)
});

if let Some(handle) = server_handle {
Expand All @@ -97,8 +84,10 @@ fn read_config<T: serde::de::DeserializeOwned>(
if let Some(config_path) = config_path {
let config_str = std::fs::read_to_string(config_path)?;
serde_yaml::from_str(&config_str).map_err(Into::into)
} else if let Some(config_args) = config_args {
Ok(config_args)
} else {
config_args.ok_or_else(|| Error::AnyhowError(anyhow::anyhow!("Either config_path or config_args must be provided")))
panic!("Either config_path or config_args must be provided")
}
}

Expand All @@ -111,12 +100,12 @@ fn main() -> Result<()> {
let env_filter = near_o11y::tracing_subscriber::EnvFilter::new(
"nearcore=info,publisher=info,indexer=info,candidates_validator=info,\
metrics=info,tokio_reactor=info,near=info,stats=info,telemetry=info,\
near-performance-metrics=info,fastnear_indexer=info",
near-performance-metrics=info",
);
let _subscriber = near_o11y::default_subscriber(env_filter, &Default::default()).global();
let opts: Opts = Opts::parse();

let home_dir = opts.home_dir.unwrap_or_else(near_indexer::get_default_home);
let home_dir = opts.home_dir.unwrap_or(near_indexer::get_default_home());
match opts.subcmd {
SubCommand::Init(params) => {
near_indexer::indexer_init_configs(&home_dir, read_config(params.config, params.args)?.into())?;
Expand Down

0 comments on commit fc0d6cd

Please sign in to comment.