From 639544acb102554e5a5d0ad805a1037e76ab7848 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Thu, 17 Oct 2024 09:47:19 +0000
Subject: [PATCH 1/7] Storing new assignments

---
 crates/network-scheduler/Cargo.toml        |   1 +
 crates/network-scheduler/config.yml        |   1 +
 crates/network-scheduler/src/assignment.rs | 235 +++++++++++++++++++++
 crates/network-scheduler/src/cli.rs        |   1 +
 crates/network-scheduler/src/main.rs       |   1 +
 crates/network-scheduler/src/server.rs     |   7 +-
 crates/network-scheduler/src/storage.rs    |  54 +++++
 7 files changed, 299 insertions(+), 1 deletion(-)
 create mode 100644 crates/network-scheduler/src/assignment.rs

diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml
index 9252f39..29e16a8 100644
--- a/crates/network-scheduler/Cargo.toml
+++ b/crates/network-scheduler/Cargo.toml
@@ -43,6 +43,7 @@ url = "2.5.0"
 sqd-contract-client = { workspace = true }
 sqd-messages = { workspace = true, features = ["semver"] }
 sqd-network-transport = { workspace = true, features = ["scheduler", "metrics"] }
+chrono = "0.4.38"
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = "0.6"
diff --git a/crates/network-scheduler/config.yml b/crates/network-scheduler/config.yml
index 1a74c07..72fd8af 100644
--- a/crates/network-scheduler/config.yml
+++ b/crates/network-scheduler/config.yml
@@ -34,6 +34,7 @@ dataset_buckets:
   - bsc-mainnet-1
   - base-1
   - moonbeam-evm-1
+network_state_name: network-state.json
 scheduler_state_bucket: network-scheduler-state-test
 supported_worker_versions: ">=1.0.0-rc3"
 recommended_worker_versions: ">=1.0.0"
diff --git a/crates/network-scheduler/src/assignment.rs b/crates/network-scheduler/src/assignment.rs
new file mode 100644
index 0000000..63aab4d
--- /dev/null
+++ b/crates/network-scheduler/src/assignment.rs
@@ -0,0 +1,235 @@
+use base64::{engine::general_purpose::STANDARD as base64, Engine};
+use flate2::read::GzDecoder;
+use std::{collections::HashMap, io::Read};
+
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::signature::timed_hmac_now;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Chunk {
+    pub id: String,
+    pub base_url: String,
+    pub files: HashMap<String, String>,
+    size: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Dataset {
+    pub id: String,
+    pub base_url: String,
+    pub chunks: Vec<Chunk>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+struct EncryptedHeaders {
+    worker_id: String,
+    worker_signature: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct WorkerAssignment {
+    status: String,
+    chunks_deltas: Vec<u64>,
+    encrypted_headers: EncryptedHeaders,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Assignment {
+    datasets: Vec<Dataset>,
+    worker_assignments: HashMap<String, WorkerAssignment>,
+    #[serde(skip)]
+    chunk_map: Option<HashMap<String, u64>>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkAssignment {
+    pub(crate) url: String,
+    pub(crate) id: String,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkState {
+    pub(crate) network: String,
+    pub(crate) assignment: NetworkAssignment
+}
+
+impl Assignment {
+    fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
+        match self.datasets.iter_mut().find(|dataset| dataset.id == dataset_id) {
+            Some(dataset) => dataset.chunks.push(chunk),
+            None => self.datasets.push(Dataset { id: dataset_id, base_url: dataset_url, chunks: vec![chunk] }),
+        }
+        self.chunk_map = None
+    }
+
+    pub fn dataset_chunks_for_peer_id(&self, peer_id: String) -> Option<Vec<Dataset>> {
+        let local_assignment = match self.worker_assignments.get(&peer_id) {
+            Some(worker_assignment) => worker_assignment,
+            None => {
+                return None
+            }
+        };
+        let mut result: Vec<Dataset> = Default::default();
+        let mut idxs: Vec<u64> = Default::default();
+        let mut cursor = 0;
+        for v in &local_assignment.chunks_deltas {
+            cursor += v;
+            idxs.push(cursor);
+        }
+        cursor = 0;
+        for u in &self.datasets {
+            if idxs.is_empty() {
+                break;
+            }
+            let mut filtered_chunks: Vec<Chunk> = Default::default();
+            for v in &u.chunks {
+                if idxs[0] < cursor {
+                    return None; // Malformed diffs
+                }
+                if idxs[0] == cursor {
+                    filtered_chunks.push(v.clone());
+                    idxs.remove(0);
+                }
+                if idxs.is_empty() {
+                    break;
+                }
+                cursor += 1;
+            }
+            if !filtered_chunks.is_empty() {
+                result.push(Dataset {
+                    id: u.id.clone(),
+                    base_url: u.base_url.clone(),
+                    chunks: filtered_chunks
+                });
+            }
+        }
+        Some(result)
+        // let mut result: Vec<Chunk> = Default::default();
+        // let mut cursor = 0;
+        // for v in &local_assignment.chunks_deltas {
+        //     cursor += v;
+        //     result.push(flat_chunks[cursor as usize].clone());
+        // }
+        // Some(result)
+    }
+
+    pub fn headers_for_peer_id(&self, peer_id: String) -> Option<HashMap<String, String>> {
+        let local_assignment = match self.worker_assignments.get(&peer_id) {
+            Some(worker_assignment) => worker_assignment,
+            None => {
+                return None
+            }
+        };
+        let headers = match serde_json::to_value(&local_assignment.encrypted_headers) {
+            Ok(v) => v,
+            Err(_) => {
+                return None;
+            }
+        };
+        let mut result: HashMap<String, String> = Default::default();
+        for (k,v) in headers.as_object().unwrap() {
+            result.insert(k.to_string(), v.as_str().unwrap().to_string());
+        }
+        Some(result)
+    }
+
+    pub fn chunk_index(&mut self, chunk_id: String) -> Option<u64> {
+        if self.chunk_map == None {
+            let mut chunk_map: HashMap<String, u64> = Default::default();
+            let mut idx = 0;
+            for dataset in &self.datasets {
+                for chunk in &dataset.chunks {
+                    chunk_map.insert(chunk.id.clone(), idx);
+                    idx += 1;
+                }
+            };
+            self.chunk_map = Some(chunk_map);
+        };
+        match self.chunk_map.as_ref().unwrap().get(&chunk_id) {
+            Some(idx) =>Some(idx.clone()),
+            None => None
+        }
+    }
+
+    pub fn regenerate_headers(&mut self, cloudflare_storage_secret: String) {
+        for (worker_id, worker_assignment) in &mut self.worker_assignments {
+            let worker_signature = timed_hmac_now(
+                &worker_id,
+                &cloudflare_storage_secret,
+            );
+            worker_assignment.encrypted_headers = EncryptedHeaders {
+                worker_id: worker_id.to_string(),
+                worker_signature,
+            }
+        }
+    }
+
+    pub fn new(scheduler_json: &Value, cloudflare_storage_secret: String) -> Self {
+        let mut assignment: Assignment = Default::default();
+        let mut aux: HashMap<String, Vec<String>> = Default::default();
+        let units = scheduler_json.get("known_units").unwrap().as_object().unwrap();
+        for (k, unit) in units {
+            let mut local_ids: Vec<String> = Default::default();
+            for chunk in unit.get("chunks").unwrap().as_array().unwrap() {
+                let chunk_str = chunk.get("chunk_str").unwrap().as_str().unwrap().to_string();
+                let download_url = chunk.get("download_url").unwrap().as_str().unwrap().to_string();
+                let mut files: HashMap<String, String> = Default::default();
+                for file in chunk.get("filenames").unwrap().as_array().unwrap() {
+                    let filename = file.as_str().unwrap().to_string();
+                    files.insert(filename.clone(), filename);
+                }
+                let dataset_str = chunk.get("dataset_id").unwrap().as_str().unwrap().to_string();
+                let dataset_id = base64.encode(dataset_str);
+                let size = chunk.get("size_bytes").unwrap().as_u64().unwrap();
chunk.get("size_bytes").unwrap().as_u64().unwrap(); + let chunk = Chunk { + id: chunk_str.clone(), + base_url: format!("{download_url}/{chunk_str}"), + files, + size, + }; + + assignment.add_chunk(chunk, dataset_id, download_url); + local_ids.push(chunk_str); + } + aux.insert(k.clone(), local_ids); + } + + let workers = scheduler_json.get("worker_states").unwrap().as_object().unwrap(); + for (worker_id, data) in workers { + let peer_id = worker_id.clone(); + let status = match data.get("jail_reason").unwrap().as_str() { + Some(str) => str.to_string(), + None => "Ok".to_string() + }; + let mut chunks_idxs: Vec = Default::default(); + let units = data.get("assigned_units").unwrap().as_array().unwrap(); + for unit in units { + let unit_id = unit.as_str().unwrap().to_string(); + for chunk_id in aux.get(&unit_id).unwrap() { + chunks_idxs.push(assignment.chunk_index(chunk_id.clone()).unwrap()); + } + } + chunks_idxs.sort(); + for i in (1..chunks_idxs.len()).rev() { + chunks_idxs[i] = chunks_idxs[i] - chunks_idxs[i - 1]; + }; + + assignment.worker_assignments.insert(peer_id.clone(), WorkerAssignment { + status, + chunks_deltas: chunks_idxs, + encrypted_headers: Default::default() + }); + } + + assignment.regenerate_headers(cloudflare_storage_secret); + + assignment + } +} \ No newline at end of file diff --git a/crates/network-scheduler/src/cli.rs b/crates/network-scheduler/src/cli.rs index 4f763c3..b284f69 100644 --- a/crates/network-scheduler/src/cli.rs +++ b/crates/network-scheduler/src/cli.rs @@ -52,6 +52,7 @@ pub struct Config { #[serde(default = "default_storage_domain")] pub storage_domain: String, pub dataset_buckets: Vec, + pub network_state_name: String, pub scheduler_state_bucket: String, #[serde(skip_serializing)] pub cloudflare_storage_secret: String, diff --git a/crates/network-scheduler/src/main.rs b/crates/network-scheduler/src/main.rs index f9cb8ef..678c699 100644 --- a/crates/network-scheduler/src/main.rs +++ b/crates/network-scheduler/src/main.rs @@ -18,6 +18,7 @@ mod server; mod signature; mod storage; mod worker_state; +mod assignment; #[cfg(not(target_env = "msvc"))] use tikv_jemallocator::Jemalloc; diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs index 3bfd68c..ca4ba30 100644 --- a/crates/network-scheduler/src/server.rs +++ b/crates/network-scheduler/src/server.rs @@ -8,6 +8,7 @@ use futures::{Stream, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; use prometheus_client::registry::Registry; +use tokio::join; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc::Receiver; use tokio::time::Instant; @@ -171,7 +172,11 @@ impl Server { if current_epoch >= last_schedule_epoch + schedule_interval { scheduler.schedule(current_epoch); match scheduler.to_json() { - Ok(state) => storage_client.save_scheduler(state).await, + Ok(state) => { + let assignment_fut = storage_client.save_assignment(&state); + let scheduler_fut = storage_client.save_scheduler(state.clone()); + join!(scheduler_fut, assignment_fut).1 + }, Err(e) => log::error!("Error serializing scheduler: {e:?}"), } } diff --git a/crates/network-scheduler/src/storage.rs b/crates/network-scheduler/src/storage.rs index a02e82c..309d475 100644 --- a/crates/network-scheduler/src/storage.rs +++ b/crates/network-scheduler/src/storage.rs @@ -1,18 +1,25 @@ use std::fmt::Display; use std::future::Future; +use std::io::Write; use std::sync::Arc; use std::time::Duration; use aws_sdk_s3 as s3; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::types::Object; +use 
+use flate2::write::GzEncoder;
+use flate2::Compression;
 use itertools::Itertools;
 use nonempty::NonEmpty;
+use serde_json::Value;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::sync::Mutex;
+use sha2::{Sha256, Digest};
 
 use sqd_network_transport::util::{CancellationToken, TaskManager};
 
+use crate::assignment::{Assignment, NetworkAssignment, NetworkState};
 use crate::cli::Config;
 use crate::data_chunk::DataChunk;
 use crate::prometheus_metrics;
@@ -301,6 +308,53 @@ impl S3Storage {
         prometheus_metrics::s3_request();
     }
 
+    pub async fn save_assignment(&self, scheduler_state: &Vec<u8>) {
+        log::debug!("Encoding assignment");
+        let json: Value = serde_json::from_slice(scheduler_state).unwrap();
+        let assignment = Assignment::new(&json, Config::get().cloudflare_storage_secret.clone());
+        let mut encoder = GzEncoder::new(Vec::new(), Compression::best());
+        let _ = encoder.write_all(serde_json::to_vec(&assignment).unwrap().as_slice());
+        let compressed_bytes = encoder.finish().unwrap();
+        log::debug!("Saving assignment");
+        let mut hasher = Sha256::new();
+        hasher.update(compressed_bytes.as_slice());
+        let hash = hasher.finalize();
+        let network = Config::get().network.clone();
+        let dt = Utc::now();
+        let ts = dt.format("%Y%m%dT%H%M%S");
+        let filename: String = format!("assignments/{network}_{ts}_{hash:X}.json.gz");
+
+        let _ = self
+            .client
+            .put_object()
+            .bucket(&self.config.scheduler_state_bucket)
+            .key(&filename)
+            .body(compressed_bytes.into())
+            .send()
+            .await
+            .map_err(|e| log::error!("Error saving assignment: {e:?}"));
+        prometheus_metrics::s3_request();
+
+        let network_state = NetworkState {
+            network: Config::get().network.clone(),
+            assignment: NetworkAssignment {
+                url: format!("https://metadata.sqd-datasets.io/{filename}"),
+                id: format!("{ts}_{hash:X}")
+            }
+        };
+        let contents = serde_json::to_vec(&network_state).unwrap();
+        let _ = self
+            .client
+            .put_object()
+            .bucket(&self.config.scheduler_state_bucket)
+            .key(Config::get().network_state_name.clone())
+            .body(contents.into())
+            .send()
+            .await
+            .map_err(|e| log::error!("Error saving assignment: {e:?}"));
+        prometheus_metrics::s3_request();
+    }
+
     pub fn save_chunks_list(&self, chunks_summary: &ChunksSummary) -> impl Future<Output = ()> {
         log::debug!("Saving chunks list");
         let start = tokio::time::Instant::now();

From 7fb8a9d21f68deea401ea4c5d4db3b093ce23dd7 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Fri, 18 Oct 2024 12:18:50 +0000
Subject: [PATCH 2/7] move to recent lib version

---
 crates/network-scheduler/src/main.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/crates/network-scheduler/src/main.rs b/crates/network-scheduler/src/main.rs
index 678c699..5a434ab 100644
--- a/crates/network-scheduler/src/main.rs
+++ b/crates/network-scheduler/src/main.rs
@@ -2,7 +2,7 @@ use clap::Parser;
 use env_logger::Env;
 use prometheus_client::registry::Registry;
 
-use sqd_network_transport::{P2PTransportBuilder, SchedulerConfig};
+use sqd_network_transport::{get_agent_info, AgentInfo, P2PTransportBuilder, SchedulerConfig};
 
 use crate::cli::{Cli, Config};
 use crate::server::Server;
@@ -43,7 +43,8 @@ async fn main() -> anyhow::Result<()> {
     prometheus_metrics::register_metrics(&mut metrics_registry);
 
     // Build P2P transport
-    let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?;
+    let agent_info = get_agent_info!();
+    let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info).await?;
     let contract_client: Box<dyn sqd_contract_client::Client> = transport_builder.contract_client();
     let local_peer_id = transport_builder.local_peer_id();
     let scheduler_config = SchedulerConfig {

From 9a40ac2bac2054e5e22af02c95d2accba8ce2115 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Fri, 18 Oct 2024 12:20:49 +0000
Subject: [PATCH 3/7] PR notes fixed

---
 crates/network-scheduler/src/assignment.rs | 17 ++++++--------
 crates/network-scheduler/src/storage.rs    | 26 ++++++++++++++--------
 2 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/crates/network-scheduler/src/assignment.rs b/crates/network-scheduler/src/assignment.rs
index 63aab4d..e4535a5 100644
--- a/crates/network-scheduler/src/assignment.rs
+++ b/crates/network-scheduler/src/assignment.rs
@@ -13,7 +13,7 @@ pub struct Chunk {
     pub id: String,
     pub base_url: String,
     pub files: HashMap<String, String>,
-    size: u64,
+    size_bytes: u64,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -141,7 +141,7 @@ impl Assignment {
     }
 
     pub fn chunk_index(&mut self, chunk_id: String) -> Option<u64> {
-        if self.chunk_map == None {
+        if self.chunk_map.is_none() {
             let mut chunk_map: HashMap<String, u64> = Default::default();
             let mut idx = 0;
             for dataset in &self.datasets {
@@ -152,16 +152,13 @@ impl Assignment {
             };
             self.chunk_map = Some(chunk_map);
         };
-        match self.chunk_map.as_ref().unwrap().get(&chunk_id) {
-            Some(idx) =>Some(idx.clone()),
-            None => None
-        }
+        self.chunk_map.as_ref().unwrap().get(&chunk_id).cloned()
     }
 
     pub fn regenerate_headers(&mut self, cloudflare_storage_secret: String) {
         for (worker_id, worker_assignment) in &mut self.worker_assignments {
             let worker_signature = timed_hmac_now(
-                &worker_id,
+                worker_id,
                 &cloudflare_storage_secret,
             );
             worker_assignment.encrypted_headers = EncryptedHeaders {
@@ -187,12 +184,12 @@ impl Assignment {
                 }
                 let dataset_str = chunk.get("dataset_id").unwrap().as_str().unwrap().to_string();
                 let dataset_id = base64.encode(dataset_str);
-                let size = chunk.get("size_bytes").unwrap().as_u64().unwrap();
+                let size_bytes = chunk.get("size_bytes").unwrap().as_u64().unwrap();
                 let chunk = Chunk {
                     id: chunk_str.clone(),
                     base_url: format!("{download_url}/{chunk_str}"),
                     files,
-                    size,
+                    size_bytes,
                 };
 
                 assignment.add_chunk(chunk, dataset_id, download_url);
@@ -218,7 +215,7 @@ impl Assignment {
             }
             chunks_idxs.sort();
             for i in (1..chunks_idxs.len()).rev() {
-                chunks_idxs[i] = chunks_idxs[i] - chunks_idxs[i - 1];
+                chunks_idxs[i] -= chunks_idxs[i - 1];
             };
 
             assignment.worker_assignments.insert(peer_id.clone(), WorkerAssignment {
diff --git a/crates/network-scheduler/src/storage.rs b/crates/network-scheduler/src/storage.rs
index 309d475..b85885e 100644
--- a/crates/network-scheduler/src/storage.rs
+++ b/crates/network-scheduler/src/storage.rs
@@ -308,7 +308,7 @@ impl S3Storage {
         prometheus_metrics::s3_request();
     }
 
-    pub async fn save_assignment(&self, scheduler_state: &Vec<u8>) {
+    pub async fn save_assignment(&self, scheduler_state: &[u8]) {
         log::debug!("Encoding assignment");
         let json: Value = serde_json::from_slice(scheduler_state).unwrap();
         let assignment = Assignment::new(&json, Config::get().cloudflare_storage_secret.clone());
@@ -320,26 +320,33 @@ impl S3Storage {
         hasher.update(compressed_bytes.as_slice());
         let hash = hasher.finalize();
         let network = Config::get().network.clone();
-        let dt = Utc::now();
-        let ts = dt.format("%Y%m%dT%H%M%S");
-        let filename: String = format!("assignments/{network}_{ts}_{hash:X}.json.gz");
+        let current_time = Utc::now();
+        let timestamp = current_time.format("%FT%T");
+        let filename: String = format!("assignments/{network}/{timestamp}_{hash:X}.json.gz");
 
-        let _ = self
+        let saving_result = self
             .client
             .put_object()
             .bucket(&self.config.scheduler_state_bucket)
+            //.bucket("network-scheduler-state")
             .key(&filename)
             .body(compressed_bytes.into())
             .send()
-            .await
-            .map_err(|e| log::error!("Error saving assignment: {e:?}"));
+            .await;
         prometheus_metrics::s3_request();
+        match saving_result {
+            Ok(_) => {},
+            Err(e) => {
+                log::error!("Error saving assignment: {e:?}");
+                return;
+            }
+        }
 
         let network_state = NetworkState {
             network: Config::get().network.clone(),
             assignment: NetworkAssignment {
                 url: format!("https://metadata.sqd-datasets.io/{filename}"),
-                id: format!("{ts}_{hash:X}")
+                id: format!("{timestamp}_{hash:X}")
             }
         };
         let contents = serde_json::to_vec(&network_state).unwrap();
@@ -347,11 +354,12 @@ impl S3Storage {
             .client
             .put_object()
             .bucket(&self.config.scheduler_state_bucket)
+            //.bucket("network-scheduler-state")
             .key(Config::get().network_state_name.clone())
             .body(contents.into())
             .send()
             .await
-            .map_err(|e| log::error!("Error saving assignment: {e:?}"));
+            .map_err(|e| log::error!("Error saving link to assignment: {e:?}"));
         prometheus_metrics::s3_request();
     }
 
From 0c840bc63fb22192512a7b91687b9545af666257 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Mon, 21 Oct 2024 10:57:20 +0000
Subject: [PATCH 4/7] Make assignment upload more frequent

---
 crates/network-scheduler/src/assignment.rs | 86 +++------------------
 crates/network-scheduler/src/server.rs     | 70 +++++++++++++++++-
 crates/network-scheduler/src/storage.rs    |  5 +-
 3 files changed, 78 insertions(+), 83 deletions(-)

diff --git a/crates/network-scheduler/src/assignment.rs b/crates/network-scheduler/src/assignment.rs
index e4535a5..a655d4a 100644
--- a/crates/network-scheduler/src/assignment.rs
+++ b/crates/network-scheduler/src/assignment.rs
@@ -1,9 +1,6 @@
-use base64::{engine::general_purpose::STANDARD as base64, Engine};
-use flate2::read::GzDecoder;
-use std::{collections::HashMap, io::Read};
+use std::collections::HashMap;
 
 use serde::{Deserialize, Serialize};
-use serde_json::Value;
 
 use crate::signature::timed_hmac_now;
 
@@ -13,7 +10,7 @@ pub struct Chunk {
     pub id: String,
     pub base_url: String,
     pub files: HashMap<String, String>,
-    size_bytes: u64,
+    pub size_bytes: u64,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -61,13 +58,21 @@ pub struct NetworkState {
 }
 
 impl Assignment {
-    fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
+    pub fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
         match self.datasets.iter_mut().find(|dataset| dataset.id == dataset_id) {
             Some(dataset) => dataset.chunks.push(chunk),
             None => self.datasets.push(Dataset { id: dataset_id, base_url: dataset_url, chunks: vec![chunk] }),
         }
         self.chunk_map = None
     }
+
+    pub fn insert_assignment(&mut self, peer_id: String, status: String, chunks_deltas: Vec<u64>) {
+        self.worker_assignments.insert(peer_id.clone(), WorkerAssignment {
+            status,
+            chunks_deltas,
+            encrypted_headers: Default::default()
+        });
+    }
 
     pub fn dataset_chunks_for_peer_id(&self, peer_id: String) -> Option<Vec<Dataset>> {
         let local_assignment = match self.worker_assignments.get(&peer_id) {
@@ -111,13 +116,6 @@ impl Assignment {
             }
         }
         Some(result)
-        // let mut result: Vec<Chunk> = Default::default();
-        // let mut cursor = 0;
-        // for v in &local_assignment.chunks_deltas {
-        //     cursor += v;
-        //     result.push(flat_chunks[cursor as usize].clone());
-        // }
-        // Some(result)
     }
 
     pub fn headers_for_peer_id(&self, peer_id: String) -> Option<HashMap<String, String>> {
@@ -167,66 +165,4 @@ impl Assignment {
             }
         }
     }
-
-    pub fn new(scheduler_json: &Value, cloudflare_storage_secret: String) -> Self {
-        let mut assignment: Assignment = Default::default();
-        let mut aux: HashMap<String, Vec<String>> = Default::default();
-        let units = scheduler_json.get("known_units").unwrap().as_object().unwrap();
-        for (k, unit) in units {
-            let mut local_ids: Vec<String> = Default::default();
-            for chunk in unit.get("chunks").unwrap().as_array().unwrap() {
-                let chunk_str = chunk.get("chunk_str").unwrap().as_str().unwrap().to_string();
-                let download_url = chunk.get("download_url").unwrap().as_str().unwrap().to_string();
-                let mut files: HashMap<String, String> = Default::default();
-                for file in chunk.get("filenames").unwrap().as_array().unwrap() {
-                    let filename = file.as_str().unwrap().to_string();
-                    files.insert(filename.clone(), filename);
-                }
-                let dataset_str = chunk.get("dataset_id").unwrap().as_str().unwrap().to_string();
-                let dataset_id = base64.encode(dataset_str);
-                let size_bytes = chunk.get("size_bytes").unwrap().as_u64().unwrap();
-                let chunk = Chunk {
-                    id: chunk_str.clone(),
-                    base_url: format!("{download_url}/{chunk_str}"),
-                    files,
-                    size_bytes,
-                };
-
-                assignment.add_chunk(chunk, dataset_id, download_url);
-                local_ids.push(chunk_str);
-            }
-            aux.insert(k.clone(), local_ids);
-        }
-
-        let workers = scheduler_json.get("worker_states").unwrap().as_object().unwrap();
-        for (worker_id, data) in workers {
-            let peer_id = worker_id.clone();
-            let status = match data.get("jail_reason").unwrap().as_str() {
-                Some(str) => str.to_string(),
-                None => "Ok".to_string()
-            };
-            let mut chunks_idxs: Vec<u64> = Default::default();
-            let units = data.get("assigned_units").unwrap().as_array().unwrap();
-            for unit in units {
-                let unit_id = unit.as_str().unwrap().to_string();
-                for chunk_id in aux.get(&unit_id).unwrap() {
-                    chunks_idxs.push(assignment.chunk_index(chunk_id.clone()).unwrap());
-                }
-            }
-            chunks_idxs.sort();
-            for i in (1..chunks_idxs.len()).rev() {
-                chunks_idxs[i] -= chunks_idxs[i - 1];
-            };
-
-            assignment.worker_assignments.insert(peer_id.clone(), WorkerAssignment {
-                status,
-                chunks_deltas: chunks_idxs,
-                encrypted_headers: Default::default()
-            });
-        }
-
-        assignment.regenerate_headers(cloudflare_storage_secret);
-
-        assignment
-    }
 }
\ No newline at end of file
diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs
index ca4ba30..2673dad 100644
--- a/crates/network-scheduler/src/server.rs
+++ b/crates/network-scheduler/src/server.rs
@@ -12,12 +12,14 @@ use tokio::join;
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::sync::mpsc::Receiver;
 use tokio::time::Instant;
+use base64::{engine::general_purpose::STANDARD as base64, Engine};
 
 use sqd_messages::signatures::msg_hash;
 use sqd_messages::{Pong, RangeSet};
 use sqd_network_transport::util::{CancellationToken, TaskManager};
 use sqd_network_transport::{SchedulerEvent, SchedulerTransportHandle};
 
+use crate::assignment::{Assignment, Chunk};
 use crate::cli::Config;
 use crate::data_chunk::{chunks_to_worker_state, DataChunk};
 use crate::scheduler::{ChunkStatus, Scheduler};
@@ -173,9 +175,7 @@ impl Server {
             scheduler.schedule(current_epoch);
             match scheduler.to_json() {
                 Ok(state) => {
-                    let assignment_fut = storage_client.save_assignment(&state);
-                    let scheduler_fut = storage_client.save_scheduler(state.clone());
-                    join!(scheduler_fut, assignment_fut).1
+                    storage_client.save_scheduler(state.clone()).await
                 },
                 Err(e) => log::error!("Error serializing scheduler: {e:?}"),
             }
@@ -289,10 +289,16 @@ impl Server {
                     log::info!("Updating chunks summary");
                     let workers = scheduler.all_workers();
                     let units = scheduler.known_units();
+                    let _ = units.iter().take(10).map(|entry| {
log::info!("key: {}, val: {:?}", entry.key(), entry.value().chunks); + }).collect::>(); + let assignment = build_assignment(&workers, &units); + let assignment_fut = storage_client.save_assignment(assignment); let summary = build_chunks_summary(workers, units); let save_fut = storage_client.save_chunks_list(&summary); + scheduler.update_chunks_summary(summary); - save_fut.await; + join!(save_fut, assignment_fut); } }; self.task_manager @@ -318,6 +324,62 @@ fn find_workers_with_chunk( .collect() } +fn build_assignment( + workers: &Vec, + units: &DashMap, +) -> Assignment { + let mut assignment: Assignment = Default::default(); + let mut aux: HashMap> = Default::default(); + for (k, unit) in units.clone() { + let mut local_ids: Vec = Default::default(); + for chunk in unit.chunks { + let chunk_str = chunk.chunk_str; + let download_url = chunk.download_url; + let mut files: HashMap = Default::default(); + for filename in chunk.filenames { + files.insert(filename.clone(), filename); + } + let dataset_str = chunk.dataset_id; + let dataset_id = base64.encode(dataset_str); + let size_bytes = chunk.size_bytes; + let chunk = Chunk { + id: chunk_str.clone(), + base_url: format!("{download_url}/{chunk_str}"), + files, + size_bytes, + }; + + assignment.add_chunk(chunk, dataset_id, download_url); + local_ids.push(chunk_str); + } + aux.insert(k.to_string(), local_ids); + }; + + for worker in workers { + let peer_id = worker.peer_id; + let status = match worker.jail_reason { + Some(str) => str.to_string(), + None => "Ok".to_string() + }; + let mut chunks_idxs: Vec = Default::default(); + + for unit in &worker.assigned_units { + let unit_id = unit.to_string(); + for chunk_id in aux.get(&unit_id).unwrap() { + chunks_idxs.push(assignment.chunk_index(chunk_id.clone()).unwrap()); + } + } + chunks_idxs.sort(); + for i in (1..chunks_idxs.len()).rev() { + chunks_idxs[i] -= chunks_idxs[i - 1]; + }; + + assignment.insert_assignment(peer_id.to_string(), status, chunks_idxs); + }; + assignment.regenerate_headers(Config::get().cloudflare_storage_secret.clone()); + assignment +} + fn build_chunks_summary( workers: Vec, units: DashMap, diff --git a/crates/network-scheduler/src/storage.rs b/crates/network-scheduler/src/storage.rs index b85885e..6058a11 100644 --- a/crates/network-scheduler/src/storage.rs +++ b/crates/network-scheduler/src/storage.rs @@ -12,7 +12,6 @@ use flate2::write::GzEncoder; use flate2::Compression; use itertools::Itertools; use nonempty::NonEmpty; -use serde_json::Value; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Mutex; use sha2::{Sha256, Digest}; @@ -308,10 +307,8 @@ impl S3Storage { prometheus_metrics::s3_request(); } - pub async fn save_assignment(&self, scheduler_state: &[u8]) { + pub async fn save_assignment(&self, assignment: Assignment) { log::debug!("Encoding assignment"); - let json: Value = serde_json::from_slice(scheduler_state).unwrap(); - let assignment = Assignment::new(&json, Config::get().cloudflare_storage_secret.clone()); let mut encoder = GzEncoder::new(Vec::new(), Compression::best()); let _ = encoder.write_all(serde_json::to_vec(&assignment).unwrap().as_slice()); let compressed_bytes = encoder.finish().unwrap(); From bbacf49c0ba5b666b6ba87db1d0b39a166abb2e3 Mon Sep 17 00:00:00 2001 From: Denis Kanonik Date: Mon, 21 Oct 2024 12:42:59 +0000 Subject: [PATCH 5/7] new pings --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9f7ba27..ac4fc3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,6 
 resolver = "2"
 
 [workspace.dependencies]
-sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.7" }
-sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" }
-sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.23" }
+sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.7" }
+sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.3" }
+sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.23" }

From b7e79bf11f5a9cabad389448d73002bf14f69431 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Wed, 23 Oct 2024 15:16:35 +0000
Subject: [PATCH 6/7] remove excessive logs

---
 crates/network-scheduler/src/server.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs
index 2673dad..ebbe039 100644
--- a/crates/network-scheduler/src/server.rs
+++ b/crates/network-scheduler/src/server.rs
@@ -289,9 +289,6 @@ impl Server {
                     log::info!("Updating chunks summary");
                     let workers = scheduler.all_workers();
                     let units = scheduler.known_units();
-                    let _ = units.iter().take(10).map(|entry| {
-                        log::info!("key: {}, val: {:?}", entry.key(), entry.value().chunks);
-                    }).collect::<Vec<_>>();
                     let assignment = build_assignment(&workers, &units);
                     let assignment_fut = storage_client.save_assignment(assignment);
                     let summary = build_chunks_summary(workers, units);

From b21c79b1669180ebfe9411d2dbcb1984e2de93c8 Mon Sep 17 00:00:00 2001
From: Denis Kanonik
Date: Thu, 24 Oct 2024 07:39:03 +0000
Subject: [PATCH 7/7] Bump version

---
 crates/network-scheduler/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml
index 29e16a8..8e54274 100644
--- a/crates/network-scheduler/Cargo.toml
+++ b/crates/network-scheduler/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "network-scheduler"
-version = "1.0.26"
+version = "1.0.27"
 edition = "2021"
 
 [dependencies]
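
A note on the chunks_deltas encoding introduced in this series: for each worker the
scheduler sorts the absolute indices of its chunks (indices into the flattened,
dataset-ordered chunk list of the Assignment) and stores every index as the
difference from its predecessor; that is what the reversed subtraction loop in
build_assignment produces. A consumer reverses the encoding with a running sum.
A minimal standalone sketch of the decoding step (expand_deltas is an illustrative
name, not a function from the patch series):

fn expand_deltas(deltas: &[u64]) -> Vec<u64> {
    // Prefix sum turns the stored deltas back into absolute chunk indices.
    let mut cursor = 0u64;
    deltas
        .iter()
        .map(|d| {
            cursor += d;
            cursor
        })
        .collect()
}

fn main() {
    // The scheduler encodes the sorted indices [3, 7, 8, 20] as [3, 4, 1, 12]:
    // the first element stays absolute, each later one is the gap to its predecessor.
    assert_eq!(expand_deltas(&[3, 4, 1, 12]), vec![3, 7, 8, 20]);
}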
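On the consumer side, a worker first reads the small pointer object stored under
network_state_name (network-state.json), i.e. the serialized NetworkState with its
assignment.url and assignment.id fields, and then downloads the gzipped assignment
blob that url points to. A rough sketch of the unpacking step, assuming the bytes
have already been fetched over HTTP (the HTTP client and error handling are elided,
and decode_assignment is an illustrative helper, not part of the patch series):

use std::io::Read;

use flate2::read::GzDecoder;
use serde_json::Value;

// save_assignment writes gzip-compressed JSON; this reverses the
// GzEncoder + serde_json::to_vec pipeline from storage.rs.
fn decode_assignment(gzipped_blob: &[u8]) -> Value {
    let mut decoder = GzDecoder::new(gzipped_blob);
    let mut json = String::new();
    decoder
        .read_to_string(&mut json)
        .expect("assignment blob should be valid gzip");
    serde_json::from_str(&json).expect("assignment blob should be valid JSON")
}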