diff --git a/Cargo.toml b/Cargo.toml
index 9f7ba27..ac4fc3c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,6 @@ members = ["crates/*"]
 resolver = "2"
 
 [workspace.dependencies]
-sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.7" }
-sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" }
-sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.23" }
+sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", branch = "new_pings", version = "1.0.7" }
+sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", branch = "new_pings", version = "1.0.3" }
+sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", branch = "new_pings", version = "1.0.23" }
diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml
index 9252f39..8e54274 100644
--- a/crates/network-scheduler/Cargo.toml
+++ b/crates/network-scheduler/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "network-scheduler"
-version = "1.0.26"
+version = "1.0.27"
 edition = "2021"
 
 [dependencies]
@@ -43,6 +43,7 @@ url = "2.5.0"
 sqd-contract-client = { workspace = true }
 sqd-messages = { workspace = true, features = ["semver"] }
 sqd-network-transport = { workspace = true, features = ["scheduler", "metrics"] }
+chrono = "0.4.38"
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = "0.6"
diff --git a/crates/network-scheduler/config.yml b/crates/network-scheduler/config.yml
index 1a74c07..72fd8af 100644
--- a/crates/network-scheduler/config.yml
+++ b/crates/network-scheduler/config.yml
@@ -34,6 +34,7 @@ dataset_buckets:
   - bsc-mainnet-1
   - base-1
   - moonbeam-evm-1
+network_state_name: network-state.json
 scheduler_state_bucket: network-scheduler-state-test
 supported_worker_versions: ">=1.0.0-rc3"
 recommended_worker_versions: ">=1.0.0"
diff --git a/crates/network-scheduler/src/assignment.rs b/crates/network-scheduler/src/assignment.rs
new file mode 100644
index 0000000..a655d4a
--- /dev/null
+++ b/crates/network-scheduler/src/assignment.rs
@@ -0,0 +1,168 @@
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use crate::signature::timed_hmac_now;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Chunk {
+    pub id: String,
+    pub base_url: String,
+    pub files: HashMap<String, String>,
+    pub size_bytes: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Dataset {
+    pub id: String,
+    pub base_url: String,
+    pub chunks: Vec<Chunk>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+struct EncryptedHeaders {
+    worker_id: String,
+    worker_signature: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct WorkerAssignment {
+    status: String,
+    chunks_deltas: Vec<u64>,
+    encrypted_headers: EncryptedHeaders,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Assignment {
+    datasets: Vec<Dataset>,
+    worker_assignments: HashMap<String, WorkerAssignment>,
+    #[serde(skip)]
+    chunk_map: Option<HashMap<String, u64>>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkAssignment {
+    pub(crate) url: String,
+    pub(crate) id: String,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkState {
+    pub(crate) network: String,
+    pub(crate) assignment: NetworkAssignment,
+}
+
+impl Assignment {
+    pub fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
+        match self.datasets.iter_mut().find(|dataset| dataset.id == dataset_id) {
+            Some(dataset) => dataset.chunks.push(chunk),
+            None => self.datasets.push(Dataset { id: dataset_id, base_url: dataset_url, chunks: vec![chunk] }),
+        }
+        self.chunk_map = None; // invalidate the cached chunk index
+    }
+
+    pub fn insert_assignment(&mut self, peer_id: String, status: String, chunks_deltas: Vec<u64>) {
+        self.worker_assignments.insert(peer_id, WorkerAssignment {
+            status,
+            chunks_deltas,
+            encrypted_headers: Default::default(),
+        });
+    }
+
+    pub fn dataset_chunks_for_peer_id(&self, peer_id: String) -> Option<Vec<Dataset>> {
+        let local_assignment = match self.worker_assignments.get(&peer_id) {
+            Some(worker_assignment) => worker_assignment,
+            None => {
+                return None;
+            }
+        };
+        let mut result: Vec<Dataset> = Default::default();
+        let mut idxs: Vec<u64> = Default::default();
+        let mut cursor = 0;
+        // The deltas are prefix-sum encoded: recover the absolute chunk indices.
+        for v in &local_assignment.chunks_deltas {
+            cursor += v;
+            idxs.push(cursor);
+        }
+        cursor = 0;
+        for dataset in &self.datasets {
+            if idxs.is_empty() {
+                break;
+            }
+            let mut filtered_chunks: Vec<Chunk> = Default::default();
+            for chunk in &dataset.chunks {
+                if idxs[0] < cursor {
+                    return None; // Malformed deltas
+                }
+                if idxs[0] == cursor {
+                    filtered_chunks.push(chunk.clone());
+                    idxs.remove(0);
+                }
+                if idxs.is_empty() {
+                    break;
+                }
+                cursor += 1;
+            }
+            if !filtered_chunks.is_empty() {
+                result.push(Dataset {
+                    id: dataset.id.clone(),
+                    base_url: dataset.base_url.clone(),
+                    chunks: filtered_chunks,
+                });
+            }
+        }
+        Some(result)
+    }
+
+    pub fn headers_for_peer_id(&self, peer_id: String) -> Option<HashMap<String, String>> {
+        let local_assignment = match self.worker_assignments.get(&peer_id) {
+            Some(worker_assignment) => worker_assignment,
+            None => {
+                return None;
+            }
+        };
+        let headers = match serde_json::to_value(&local_assignment.encrypted_headers) {
+            Ok(v) => v,
+            Err(_) => {
+                return None;
+            }
+        };
+        // EncryptedHeaders serializes to a flat object of strings, so both unwraps hold.
+        let mut result: HashMap<String, String> = Default::default();
+        for (k, v) in headers.as_object().unwrap() {
+            result.insert(k.to_string(), v.as_str().unwrap().to_string());
+        }
+        Some(result)
+    }
+
+    pub fn chunk_index(&mut self, chunk_id: String) -> Option<u64> {
+        if self.chunk_map.is_none() {
+            // Lazily build a chunk_id -> global index map over all datasets.
+            let mut chunk_map: HashMap<String, u64> = Default::default();
+            let mut idx = 0;
+            for dataset in &self.datasets {
+                for chunk in &dataset.chunks {
+                    chunk_map.insert(chunk.id.clone(), idx);
+                    idx += 1;
+                }
+            }
+            self.chunk_map = Some(chunk_map);
+        }
+        self.chunk_map.as_ref().unwrap().get(&chunk_id).cloned()
+    }
+
+    pub fn regenerate_headers(&mut self, cloudflare_storage_secret: String) {
+        for (worker_id, worker_assignment) in &mut self.worker_assignments {
+            let worker_signature = timed_hmac_now(worker_id, &cloudflare_storage_secret);
+            worker_assignment.encrypted_headers = EncryptedHeaders {
+                worker_id: worker_id.to_string(),
+                worker_signature,
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/network-scheduler/src/cli.rs b/crates/network-scheduler/src/cli.rs
index 4f763c3..b284f69 100644
--- a/crates/network-scheduler/src/cli.rs
+++ b/crates/network-scheduler/src/cli.rs
@@ -52,6 +52,7 @@ pub struct Config {
     #[serde(default = "default_storage_domain")]
     pub storage_domain: String,
     pub dataset_buckets: Vec<String>,
+    pub network_state_name: String,
     pub scheduler_state_bucket: String,
     #[serde(skip_serializing)]
     pub cloudflare_storage_secret: String,
diff --git a/crates/network-scheduler/src/main.rs b/crates/network-scheduler/src/main.rs
index f9cb8ef..5a434ab 100644
--- a/crates/network-scheduler/src/main.rs
+++ b/crates/network-scheduler/src/main.rs
@@ -2,7 +2,7 @@ use clap::Parser;
 use env_logger::Env;
 use prometheus_client::registry::Registry;
-use sqd_network_transport::{P2PTransportBuilder, SchedulerConfig};
+use sqd_network_transport::{get_agent_info, AgentInfo, P2PTransportBuilder, SchedulerConfig};
 
 use crate::cli::{Cli, Config};
 use crate::server::Server;
 
@@ -18,6 +18,7 @@ mod server;
 mod signature;
 mod storage;
 mod worker_state;
+mod assignment;
 
 #[cfg(not(target_env = "msvc"))]
 use tikv_jemallocator::Jemalloc;
@@ -42,7 +43,8 @@ async fn main() -> anyhow::Result<()> {
     prometheus_metrics::register_metrics(&mut metrics_registry);
 
     // Build P2P transport
-    let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?;
+    let agent_info = get_agent_info!();
+    let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info).await?;
     let contract_client: Box<dyn sqd_contract_client::Client> = transport_builder.contract_client();
     let local_peer_id = transport_builder.local_peer_id();
     let scheduler_config = SchedulerConfig {
diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs
index 3bfd68c..ebbe039 100644
--- a/crates/network-scheduler/src/server.rs
+++ b/crates/network-scheduler/src/server.rs
@@ -8,15 +8,18 @@ use futures::{Stream, StreamExt};
 use itertools::Itertools;
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
+use tokio::join;
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::sync::mpsc::Receiver;
 use tokio::time::Instant;
+use base64::{engine::general_purpose::STANDARD as base64, Engine};
 
 use sqd_messages::signatures::msg_hash;
 use sqd_messages::{Pong, RangeSet};
 use sqd_network_transport::util::{CancellationToken, TaskManager};
 use sqd_network_transport::{SchedulerEvent, SchedulerTransportHandle};
 
+use crate::assignment::{Assignment, Chunk};
 use crate::cli::Config;
 use crate::data_chunk::{chunks_to_worker_state, DataChunk};
 use crate::scheduler::{ChunkStatus, Scheduler};
@@ -171,7 +174,9 @@ impl Server {
         if current_epoch >= last_schedule_epoch + schedule_interval {
             scheduler.schedule(current_epoch);
             match scheduler.to_json() {
-                Ok(state) => storage_client.save_scheduler(state).await,
+                Ok(state) => {
+                    storage_client.save_scheduler(state.clone()).await
+                },
                 Err(e) => log::error!("Error serializing scheduler: {e:?}"),
             }
         }
@@ -284,10 +289,13 @@ impl Server {
             log::info!("Updating chunks summary");
             let workers = scheduler.all_workers();
             let units = scheduler.known_units();
+            let assignment = build_assignment(&workers, &units);
+            let assignment_fut = storage_client.save_assignment(assignment);
             let summary = build_chunks_summary(workers, units);
             let save_fut = storage_client.save_chunks_list(&summary);
+
             scheduler.update_chunks_summary(summary);
-            save_fut.await;
+            join!(save_fut, assignment_fut);
         }
     };
     self.task_manager
@@ -313,6 +321,62 @@ fn find_workers_with_chunk(
         .collect()
 }
 
+fn build_assignment(
+    workers: &Vec<WorkerState>,
+    units: &DashMap<UnitId, SchedulingUnit>,
+) -> Assignment {
+    let mut assignment: Assignment = Default::default();
+    let mut aux: HashMap<String, Vec<String>> = Default::default();
+    for (k, unit) in units.clone() {
+        let mut local_ids: Vec<String> = Default::default();
+        for chunk in unit.chunks {
+            let chunk_str = chunk.chunk_str;
+            let download_url = chunk.download_url;
+            let mut files: HashMap<String, String> = Default::default();
+            for filename in chunk.filenames {
+                files.insert(filename.clone(), filename);
+            }
+            let dataset_str = chunk.dataset_id;
+            let dataset_id = base64.encode(dataset_str);
+            let size_bytes = chunk.size_bytes;
+            let chunk = Chunk {
+                id: chunk_str.clone(),
+                base_url: format!("{download_url}/{chunk_str}"),
+                files,
+                size_bytes,
+            };
+
+            assignment.add_chunk(chunk, dataset_id, download_url);
+            local_ids.push(chunk_str);
+        }
+        aux.insert(k.to_string(), local_ids);
+    }
+
+    for worker in workers {
+        let peer_id = worker.peer_id;
+        let status = match worker.jail_reason {
+            Some(reason) => reason.to_string(),
+            None => "Ok".to_string(),
+        };
+        let mut chunks_idxs: Vec<u64> = Default::default();
+
+        for unit in &worker.assigned_units {
+            let unit_id = unit.to_string();
+            for chunk_id in aux.get(&unit_id).unwrap() {
+                chunks_idxs.push(assignment.chunk_index(chunk_id.clone()).unwrap());
+            }
+        }
+        chunks_idxs.sort();
+        for i in (1..chunks_idxs.len()).rev() {
+            chunks_idxs[i] -= chunks_idxs[i - 1]; // delta-encode the sorted indices in place
+        }
+
+        assignment.insert_assignment(peer_id.to_string(), status, chunks_idxs);
+    }
+    assignment.regenerate_headers(Config::get().cloudflare_storage_secret.clone());
+    assignment
+}
+
 fn build_chunks_summary(
     workers: Vec<WorkerState>,
     units: DashMap<UnitId, SchedulingUnit>,
diff --git a/crates/network-scheduler/src/storage.rs b/crates/network-scheduler/src/storage.rs
index a02e82c..6058a11 100644
--- a/crates/network-scheduler/src/storage.rs
+++ b/crates/network-scheduler/src/storage.rs
@@ -1,18 +1,24 @@
 use std::fmt::Display;
 use std::future::Future;
+use std::io::Write;
 use std::sync::Arc;
 use std::time::Duration;
 
 use aws_sdk_s3 as s3;
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::types::Object;
+use chrono::Utc;
+use flate2::write::GzEncoder;
+use flate2::Compression;
 use itertools::Itertools;
 use nonempty::NonEmpty;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::sync::Mutex;
+use sha2::{Sha256, Digest};
 
 use sqd_network_transport::util::{CancellationToken, TaskManager};
 
+use crate::assignment::{Assignment, NetworkAssignment, NetworkState};
 use crate::cli::Config;
 use crate::data_chunk::DataChunk;
 use crate::prometheus_metrics;
@@ -301,6 +307,59 @@ impl S3Storage {
         prometheus_metrics::s3_request();
     }
 
+    pub async fn save_assignment(&self, assignment: Assignment) {
+        log::debug!("Encoding assignment");
+        let mut encoder = GzEncoder::new(Vec::new(), Compression::best());
+        let _ = encoder.write_all(serde_json::to_vec(&assignment).unwrap().as_slice());
+        let compressed_bytes = encoder.finish().unwrap();
+        log::debug!("Saving assignment");
+        let mut hasher = Sha256::new();
+        hasher.update(compressed_bytes.as_slice());
+        let hash = hasher.finalize();
+        let network = Config::get().network.clone();
+        let current_time = Utc::now();
+        let timestamp = current_time.format("%FT%T");
+        let filename: String = format!("assignments/{network}/{timestamp}_{hash:X}.json.gz");
+
+        let saving_result = self
+            .client
+            .put_object()
+            .bucket(&self.config.scheduler_state_bucket)
+            //.bucket("network-scheduler-state")
+            .key(&filename)
+            .body(compressed_bytes.into())
+            .send()
+            .await;
+        prometheus_metrics::s3_request();
+        match saving_result {
+            Ok(_) => {},
+            Err(e) => {
+                log::error!("Error saving assignment: {e:?}");
+                return;
+            }
+        }
+
+        let network_state = NetworkState {
+            network: Config::get().network.clone(),
+            assignment: NetworkAssignment {
+                url: format!("https://metadata.sqd-datasets.io/{filename}"),
+                id: format!("{timestamp}_{hash:X}"),
+            },
+        };
+        let contents = serde_json::to_vec(&network_state).unwrap();
+        let _ = self
+            .client
+            .put_object()
+            .bucket(&self.config.scheduler_state_bucket)
+            //.bucket("network-scheduler-state")
+            .key(Config::get().network_state_name.clone())
+            .body(contents.into())
+            .send()
+            .await
+            .map_err(|e| log::error!("Error saving link to assignment: {e:?}"));
+        prometheus_metrics::s3_request();
+    }
+
     pub fn save_chunks_list(&self, chunks_summary: &ChunksSummary) -> impl Future {
         log::debug!("Saving chunks list");
         let start = tokio::time::Instant::now();
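
Note on the `chunks_deltas` wire format introduced in `assignment.rs`: each worker's chunk set is stored as its sorted global chunk indices, delta-encoded (the first entry is absolute, every later entry is the difference from its predecessor). That is what the reversed subtraction loop in `build_assignment` produces and what the prefix-sum loop in `Assignment::dataset_chunks_for_peer_id` consumes. A minimal standalone sketch of the round trip (not part of the diff; `encode_deltas`/`decode_deltas` are illustrative names):

```rust
/// Delta-encode sorted ascending chunk indices, mirroring the in-place
/// `chunks_idxs[i] -= chunks_idxs[i - 1]` loop in `build_assignment`.
fn encode_deltas(mut idxs: Vec<u64>) -> Vec<u64> {
    idxs.sort();
    // Walk backwards so each element is reduced against the still-absolute
    // value of its predecessor; idxs[0] stays absolute.
    for i in (1..idxs.len()).rev() {
        idxs[i] -= idxs[i - 1];
    }
    idxs
}

/// Recover absolute indices with a running prefix sum, mirroring the
/// `cursor += v; idxs.push(cursor)` loop in `dataset_chunks_for_peer_id`.
fn decode_deltas(deltas: &[u64]) -> Vec<u64> {
    let mut cursor = 0u64;
    deltas
        .iter()
        .map(|d| {
            cursor += d;
            cursor
        })
        .collect()
}

fn main() {
    let assigned: Vec<u64> = vec![3, 7, 8, 20];
    let deltas = encode_deltas(assigned.clone());
    assert_eq!(deltas, vec![3, 4, 1, 12]); // first index absolute, rest are gaps
    assert_eq!(decode_deltas(&deltas), assigned); // round-trips losslessly
}
```

Since assignments are published to S3 as gzipped JSON for every worker, storing small gaps instead of large absolute indices keeps the per-worker arrays highly compressible.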