Skip to content

Commit

Permalink
Merge pull request #31 from subsquid/new_assignments
Browse files Browse the repository at this point in the history
Storing new assignments
  • Loading branch information
denisbsu authored Oct 24, 2024
2 parents 950443b + b21c79b commit 649c795
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 8 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ members = ["crates/*"]
resolver = "2"

[workspace.dependencies]
sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.7" }
sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" }
sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.23" }
sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.7" }
sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.3" }
sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", branch="new_pings", version = "1.0.23" }
3 changes: 2 additions & 1 deletion crates/network-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "network-scheduler"
version = "1.0.26"
version = "1.0.27"
edition = "2021"

[dependencies]
Expand Down Expand Up @@ -43,6 +43,7 @@ url = "2.5.0"
sqd-contract-client = { workspace = true }
sqd-messages = { workspace = true, features = ["semver"] }
sqd-network-transport = { workspace = true, features = ["scheduler", "metrics"] }
chrono = "0.4.38"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
1 change: 1 addition & 0 deletions crates/network-scheduler/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dataset_buckets:
- bsc-mainnet-1
- base-1
- moonbeam-evm-1
network_state_name: network-state.json
scheduler_state_bucket: network-scheduler-state-test
supported_worker_versions: ">=1.0.0-rc3"
recommended_worker_versions: ">=1.0.0"
Expand Down
168 changes: 168 additions & 0 deletions crates/network-scheduler/src/assignment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::signature::timed_hmac_now;

/// A single data chunk of a dataset, as published in the assignment file.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Chunk {
    /// Chunk identifier (its path-like chunk string).
    pub id: String,
    /// Base URL the chunk's files are downloaded from.
    pub base_url: String,
    /// File name -> file name map; currently an identity mapping when built —
    /// presumably reserved for mapping logical names to real URLs. TODO confirm.
    pub files: HashMap<String, String>,
    /// Total size of the chunk in bytes.
    pub size_bytes: u64,
}

/// A dataset together with all of its known chunks.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Dataset {
    /// Dataset identifier (base64-encoded when built from scheduling units).
    pub id: String,
    /// Download base URL of the dataset.
    pub base_url: String,
    /// Chunks in declaration order; this order defines global chunk indexes.
    pub chunks: Vec<Chunk>,
}

/// Per-worker authentication headers; serialized with kebab-case keys
/// ("worker-id", "worker-signature").
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct EncryptedHeaders {
    // Worker peer id the headers were generated for.
    worker_id: String,
    // Time-bound HMAC signature produced from the worker id and the
    // Cloudflare storage secret.
    worker_signature: String,
}

/// Chunk assignment for a single worker.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WorkerAssignment {
    // Worker status: "Ok" or a jail reason string.
    status: String,
    // Delta-encoded, sorted global chunk indexes assigned to this worker:
    // the first element is an absolute index, each subsequent element is the
    // gap to the previous index.
    chunks_deltas: Vec<u64>,
    // Authentication headers for downloading the assigned chunks.
    encrypted_headers: EncryptedHeaders,
}

/// The full network assignment: every known dataset/chunk plus the
/// per-worker chunk assignments, serialized for workers to download.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Assignment {
    // All datasets; concatenating their chunks in order yields the global
    // chunk index space referenced by `WorkerAssignment::chunks_deltas`.
    datasets: Vec<Dataset>,
    // Per-worker assignments, keyed by worker peer id.
    worker_assignments: HashMap<String, WorkerAssignment>,
    // Lazily built cache mapping chunk id -> global chunk index.
    // Never serialized; invalidated whenever a chunk is added.
    #[serde(skip)]
    chunk_map: Option<HashMap<String, u64>>,
}

/// Pointer to a published assignment file — presumably the URL it can be
/// fetched from plus its identifier. TODO confirm against the writer side.
#[derive(Serialize, Deserialize)]
pub struct NetworkAssignment {
    pub(crate) url: String,
    pub(crate) id: String,
}

/// Top-level network-state document: the network name and a pointer to the
/// latest assignment (stored under the configured `network_state_name`).
#[derive(Serialize, Deserialize)]
pub struct NetworkState {
    pub(crate) network: String,
    pub(crate) assignment: NetworkAssignment
}

impl Assignment {
    /// Appends `chunk` to the dataset with id `dataset_id`, creating the
    /// dataset entry (with `dataset_url` as its base URL) if it does not
    /// exist yet. Invalidates the lazily built chunk-index cache.
    pub fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
        match self.datasets.iter_mut().find(|dataset| dataset.id == dataset_id) {
            Some(dataset) => dataset.chunks.push(chunk),
            None => self.datasets.push(Dataset {
                id: dataset_id,
                base_url: dataset_url,
                chunks: vec![chunk],
            }),
        }
        self.chunk_map = None
    }

    /// Records the assignment for a single worker.
    /// `chunks_deltas` is the delta-encoded list of global chunk indexes.
    pub fn insert_assignment(&mut self, peer_id: String, status: String, chunks_deltas: Vec<u64>) {
        // No need to clone `peer_id` — it is not used after insertion.
        self.worker_assignments.insert(peer_id, WorkerAssignment {
            status,
            chunks_deltas,
            encrypted_headers: Default::default(),
        });
    }

    /// Decodes the delta-encoded chunk list of `peer_id` back into datasets
    /// containing only the chunks assigned to that worker.
    ///
    /// Returns `None` if the worker is unknown or the deltas do not match the
    /// stored datasets (malformed assignment).
    pub fn dataset_chunks_for_peer_id(&self, peer_id: String) -> Option<Vec<Dataset>> {
        let local_assignment = self.worker_assignments.get(&peer_id)?;

        // Prefix-sum the deltas into absolute (global) chunk indexes.
        let mut idxs: Vec<u64> = Vec::with_capacity(local_assignment.chunks_deltas.len());
        let mut cursor = 0;
        for delta in &local_assignment.chunks_deltas {
            cursor += delta;
            idxs.push(cursor);
        }

        let mut result: Vec<Dataset> = Default::default();
        // Walk `idxs` with a position index instead of `idxs.remove(0)`,
        // which was O(n) per removal (O(n^2) overall).
        let mut next = 0;
        cursor = 0; // global chunk counter across all datasets
        for dataset in &self.datasets {
            if next >= idxs.len() {
                break;
            }
            let mut filtered_chunks: Vec<Chunk> = Default::default();
            for chunk in &dataset.chunks {
                if idxs[next] < cursor {
                    return None; // Malformed diffs
                }
                if idxs[next] == cursor {
                    filtered_chunks.push(chunk.clone());
                    next += 1;
                }
                if next >= idxs.len() {
                    break;
                }
                cursor += 1;
            }
            if !filtered_chunks.is_empty() {
                result.push(Dataset {
                    id: dataset.id.clone(),
                    base_url: dataset.base_url.clone(),
                    chunks: filtered_chunks,
                });
            }
        }
        Some(result)
    }

    /// Returns the encrypted headers of `peer_id` as a plain string map
    /// (keys follow the kebab-case serde renaming of `EncryptedHeaders`).
    ///
    /// Returns `None` if the worker is unknown or the headers cannot be
    /// represented as a string-to-string map.
    pub fn headers_for_peer_id(&self, peer_id: String) -> Option<HashMap<String, String>> {
        let local_assignment = self.worker_assignments.get(&peer_id)?;
        let headers = serde_json::to_value(&local_assignment.encrypted_headers).ok()?;
        let mut result: HashMap<String, String> = Default::default();
        // All EncryptedHeaders fields are strings, so `as_object`/`as_str`
        // should always succeed; return None instead of panicking if that
        // invariant is ever broken.
        for (k, v) in headers.as_object()? {
            result.insert(k.clone(), v.as_str()?.to_string());
        }
        Some(result)
    }

    /// Returns the global index of the chunk with id `chunk_id`, building the
    /// id -> index cache on first use. Chunks are numbered consecutively
    /// across datasets in declaration order.
    pub fn chunk_index(&mut self, chunk_id: String) -> Option<u64> {
        if self.chunk_map.is_none() {
            let mut chunk_map: HashMap<String, u64> = Default::default();
            let mut idx = 0;
            for dataset in &self.datasets {
                for chunk in &dataset.chunks {
                    chunk_map.insert(chunk.id.clone(), idx);
                    idx += 1;
                }
            }
            self.chunk_map = Some(chunk_map);
        }
        self.chunk_map.as_ref().unwrap().get(&chunk_id).cloned()
    }

    /// (Re)computes each worker's authentication headers by signing the
    /// worker id with the Cloudflare storage secret.
    pub fn regenerate_headers(&mut self, cloudflare_storage_secret: String) {
        for (worker_id, worker_assignment) in &mut self.worker_assignments {
            let worker_signature = timed_hmac_now(worker_id, &cloudflare_storage_secret);
            worker_assignment.encrypted_headers = EncryptedHeaders {
                worker_id: worker_id.to_string(),
                worker_signature,
            }
        }
    }
}
1 change: 1 addition & 0 deletions crates/network-scheduler/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct Config {
#[serde(default = "default_storage_domain")]
pub storage_domain: String,
pub dataset_buckets: Vec<String>,
pub network_state_name: String,
pub scheduler_state_bucket: String,
#[serde(skip_serializing)]
pub cloudflare_storage_secret: String,
Expand Down
6 changes: 4 additions & 2 deletions crates/network-scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use env_logger::Env;
use prometheus_client::registry::Registry;

use sqd_network_transport::{P2PTransportBuilder, SchedulerConfig};
use sqd_network_transport::{get_agent_info, AgentInfo, P2PTransportBuilder, SchedulerConfig};

use crate::cli::{Cli, Config};
use crate::server::Server;
Expand All @@ -18,6 +18,7 @@ mod server;
mod signature;
mod storage;
mod worker_state;
mod assignment;

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
Expand All @@ -42,7 +43,8 @@ async fn main() -> anyhow::Result<()> {
prometheus_metrics::register_metrics(&mut metrics_registry);

// Build P2P transport
let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?;
let agent_info = get_agent_info!();
let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info).await?;
let contract_client: Box<dyn sqd_contract_client::Client> = transport_builder.contract_client();
let local_peer_id = transport_builder.local_peer_id();
let scheduler_config = SchedulerConfig {
Expand Down
68 changes: 66 additions & 2 deletions crates/network-scheduler/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use futures::{Stream, StreamExt};
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use tokio::join;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc::Receiver;
use tokio::time::Instant;
use base64::{engine::general_purpose::STANDARD as base64, Engine};

use sqd_messages::signatures::msg_hash;
use sqd_messages::{Pong, RangeSet};
use sqd_network_transport::util::{CancellationToken, TaskManager};
use sqd_network_transport::{SchedulerEvent, SchedulerTransportHandle};

use crate::assignment::{Assignment, Chunk};
use crate::cli::Config;
use crate::data_chunk::{chunks_to_worker_state, DataChunk};
use crate::scheduler::{ChunkStatus, Scheduler};
Expand Down Expand Up @@ -171,7 +174,9 @@ impl Server {
if current_epoch >= last_schedule_epoch + schedule_interval {
scheduler.schedule(current_epoch);
match scheduler.to_json() {
Ok(state) => storage_client.save_scheduler(state).await,
Ok(state) => {
storage_client.save_scheduler(state.clone()).await
},
Err(e) => log::error!("Error serializing scheduler: {e:?}"),
}
}
Expand Down Expand Up @@ -284,10 +289,13 @@ impl Server {
log::info!("Updating chunks summary");
let workers = scheduler.all_workers();
let units = scheduler.known_units();
let assignment = build_assignment(&workers, &units);
let assignment_fut = storage_client.save_assignment(assignment);
let summary = build_chunks_summary(workers, units);
let save_fut = storage_client.save_chunks_list(&summary);

scheduler.update_chunks_summary(summary);
save_fut.await;
join!(save_fut, assignment_fut);
}
};
self.task_manager
Expand All @@ -313,6 +321,62 @@ fn find_workers_with_chunk(
.collect()
}

/// Builds the assignment published to workers: the flat list of
/// datasets/chunks plus, per worker, a delta-encoded list of global chunk
/// indexes, with freshly signed per-worker headers.
fn build_assignment(
    workers: &Vec<WorkerState>,
    units: &DashMap<UnitId, SchedulingUnit>,
) -> Assignment {
    let mut assignment: Assignment = Default::default();
    // unit id -> chunk ids of that unit; used below to translate per-unit
    // worker assignments into global chunk indexes.
    let mut aux: HashMap<String, Vec<String>> = Default::default();
    // Iterate by reference instead of `units.clone()`, which deep-copied the
    // whole map (every unit and chunk) before use.
    for entry in units.iter() {
        let unit_id = entry.key().to_string();
        let mut local_ids: Vec<String> = Default::default();
        for chunk in &entry.value().chunks {
            let chunk_str = chunk.chunk_str.clone();
            let download_url = chunk.download_url.clone();
            let files: HashMap<String, String> = chunk
                .filenames
                .iter()
                .map(|filename| (filename.clone(), filename.clone()))
                .collect();
            let dataset_id = base64.encode(&chunk.dataset_id);
            let new_chunk = Chunk {
                id: chunk_str.clone(),
                base_url: format!("{download_url}/{chunk_str}"),
                files,
                size_bytes: chunk.size_bytes,
            };

            assignment.add_chunk(new_chunk, dataset_id, download_url);
            local_ids.push(chunk_str);
        }
        aux.insert(unit_id, local_ids);
    }

    for worker in workers {
        let peer_id = worker.peer_id;
        let status = match worker.jail_reason {
            Some(reason) => reason.to_string(),
            None => "Ok".to_string(),
        };
        let mut chunks_idxs: Vec<u64> = Default::default();

        for unit in &worker.assigned_units {
            let unit_id = unit.to_string();
            // Invariant: every assigned unit comes from the same known-units
            // map we just indexed, and every chunk was added above.
            let chunk_ids = aux
                .get(&unit_id)
                .expect("assigned unit missing from known units");
            for chunk_id in chunk_ids {
                chunks_idxs.push(
                    assignment
                        .chunk_index(chunk_id.clone())
                        .expect("chunk missing from assignment index"),
                );
            }
        }
        // Sort, then delta-encode in place: keep the first index absolute and
        // replace each later element with the gap to its predecessor.
        chunks_idxs.sort_unstable();
        for i in (1..chunks_idxs.len()).rev() {
            chunks_idxs[i] -= chunks_idxs[i - 1];
        }

        assignment.insert_assignment(peer_id.to_string(), status, chunks_idxs);
    }
    assignment.regenerate_headers(Config::get().cloudflare_storage_secret.clone());
    assignment
}

fn build_chunks_summary(
workers: Vec<WorkerState>,
units: DashMap<UnitId, SchedulingUnit>,
Expand Down
Loading

0 comments on commit 649c795

Please sign in to comment.