Skip to content

Commit

Permalink
Scheduler reachability check + bump to v0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jan 8, 2024
1 parent baa3aa2 commit 271febe
Show file tree
Hide file tree
Showing 16 changed files with 337 additions and 256 deletions.
3 changes: 0 additions & 3 deletions .vscode/extensions.json

This file was deleted.

7 changes: 0 additions & 7 deletions .vscode/settings.json

This file was deleted.

7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/contract-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct RpcArgs {
pub keystore_password: String,
#[arg(
long,
env,
env = "MAX_GET_LOG_BLOCKS",
help = "Maximum number of blocks to scan for events in a single RPC call",
default_value = "1000000000"
)]
Expand Down
2 changes: 1 addition & 1 deletion crates/logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[dependencies]
Expand Down
3 changes: 2 additions & 1 deletion crates/network-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "network-scheduler"
version = "0.1.7"
version = "0.2.0"
edition = "2021"

[dependencies]
Expand All @@ -12,6 +12,7 @@ axum = { version = "0.6", features = ["json"] }
clap = { version = "4", features = ["derive", "env"] }
derive-enum-from-into = "0.1"
env_logger = "0.10"
futures = "0.3"
hex = "0.4"
itertools = "0.12"
iter_num_tools = "0.7"
Expand Down
9 changes: 5 additions & 4 deletions crates/network-scheduler/config.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
schedule_interval_sec: 10800 # 3 hours
worker_inactive_timeout_sec: 120
worker_stale_timeout_sec: 900 # 15 min
schedule_interval_sec: 10800 # 3 hours
worker_inactive_timeout_sec: 120 # 2 min
worker_stale_timeout_sec: 900 # 15 min
worker_unreachable_timeout_sec: 300 # 5 min
replication_factor: 2
scheduling_unit_size: 10
mixed_units_ratio: 0.1
mixing_recent_unit_weight: 10.0
worker_storage_bytes: 549755813888 # 512 GiB
worker_storage_bytes: 549755813888 # 512 GiB
s3_endpoint: 'https://7a28e49ec5f4a60c66f216392792ac38.r2.cloudflarestorage.com/'
dataset_buckets:
- 'ethereum-mainnet'
Expand Down
7 changes: 7 additions & 0 deletions crates/network-scheduler/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct Config {
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "worker_stale_timeout_sec")]
pub worker_stale_timeout: Duration,
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "worker_unreachable_timeout_sec")]
pub worker_unreachable_timeout: Duration,
pub replication_factor: usize,
pub scheduling_unit_size: usize,
pub worker_storage_bytes: u64,
Expand All @@ -39,6 +42,10 @@ impl Config {
pub fn get() -> &'static Self {
CONFIG.get().expect("Config not initialized")
}

pub fn worker_monitoring_interval(&self) -> Duration {
self.worker_inactive_timeout / 2
}
}

#[derive(Parser)]
Expand Down
1 change: 1 addition & 0 deletions crates/network-scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod scheduler;
mod scheduling_unit;
mod server;
mod storage;
mod worker_state;

const PING_TOPIC: &str = "worker_ping";

Expand Down
7 changes: 4 additions & 3 deletions crates/network-scheduler/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use serde_with::{serde_as, TimestampMilliSeconds};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use subsquid_messages::{PingV2, QueryFinished, QuerySubmitted};
use subsquid_messages::{PingV2 as Ping, QueryFinished, QuerySubmitted};
use subsquid_network_transport::PeerId;

use crate::cli::Cli;
use crate::scheduler::WorkerState;
use crate::worker_state::WorkerState;

#[serde_as]
#[derive(Debug, Clone, Serialize)]
Expand All @@ -28,6 +28,7 @@ impl Metrics {
let expected_sender = match &event {
MetricsEvent::QuerySubmitted(QuerySubmitted { client_id, .. }) => Some(client_id),
MetricsEvent::QueryFinished(QueryFinished { client_id, .. }) => Some(client_id),
MetricsEvent::Ping(Ping { worker_id, .. }) => worker_id.as_ref(),
_ => None,
};
anyhow::ensure!(
Expand All @@ -51,7 +52,7 @@ impl Metrics {
#[derive(Debug, Clone, Serialize, EnumFrom)]
#[serde(tag = "event")]
pub enum MetricsEvent {
Ping(PingV2),
Ping(Ping),
QuerySubmitted(QuerySubmitted),
QueryFinished(QueryFinished),
WorkersSnapshot { active_workers: Vec<WorkerState> },
Expand Down
3 changes: 2 additions & 1 deletion crates/network-scheduler/src/metrics_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use subsquid_network_transport::PeerId;

use crate::cli::Config;
use crate::data_chunk::{chunks_to_worker_state, DataChunk};
use crate::scheduler::{Scheduler, WorkerState};
use crate::scheduler::Scheduler;
use crate::worker_state::WorkerState;

#[derive(Debug, Clone, Serialize)]
struct ChunkStatus {
Expand Down
Loading

0 comments on commit 271febe

Please sign in to comment.