Add in SILO docker-compose and importer utils #58
base: dev
Conversation
This is the wrong version, to be updated by @Taepper. Also, it is missing an empty directory called
Thanks for uploading!
Info to recall:
Oh, not quite anymore. You cited the instructions for a previous version of the demo. The up-to-date instructions are in
Thanks for updating!
Force-pushed from 9520188 to 8a953ac
updated branch for tidiness
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
ww_test/docker-compose.yml:2
- [nitpick] The service name 'lapisOpen' may be ambiguous as it appears to provide the LAPIS API. Consider renaming it to 'lapis' or 'lapisAPI' for improved clarity.
```yaml
lapisOpen:
  read_only: true
  depends_on:
    siloPreprocessing:
      condition: service_completed_successfully
```
The depends_on condition 'service_completed_successfully' is a legacy feature from Docker Compose v2 and is not supported in v3+. Please either specify a compatible version or refactor to use healthchecks to enforce startup order.
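For context: long-form `depends_on` conditions were part of the version 2.1 compose file format, were dropped in the 3.x format, and were reintroduced by the Compose Specification that current Docker Compose implements, so whether a change is needed depends on the Compose version in use. A healthcheck-based variant along the lines Copilot suggests might look like the sketch below; the image names and readiness test are illustrative assumptions, not taken from the PR.

```yaml
# Hypothetical sketch (not from the PR): image names and the readiness
# test are assumptions about how the preprocessing container behaves.
services:
  siloPreprocessing:
    image: example/silo-preprocessing   # placeholder image
    healthcheck:
      # assume preprocessing touches a marker file once it finishes
      test: ["CMD", "test", "-f", "/data/preprocessing.done"]
      interval: 5s
      retries: 60
  lapisOpen:
    image: example/lapis                # placeholder image
    read_only: true
    depends_on:
      siloPreprocessing:
        condition: service_healthy
```

Note the semantics differ: a healthcheck keeps the preprocessing container running, whereas `service_completed_successfully` waits for it to exit with code 0, which is the better match for a one-shot preprocessing job.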
```diff
diff --git c/.gitignore i/.gitignore
index 7379c34..5472ddf 100644
--- c/.gitignore
+++ i/.gitignore
@@ -148,3 +148,6 @@ results
 
 # Snakemake
 .snakemake/
+
+# Rust
+/target
diff --git c/Cargo.lock i/Cargo.lock
new file mode 100644
index 0000000..050b9d9
--- /dev/null
+++ i/Cargo.lock
@@ -0,0 +1,102 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "add_offset"
+version = "0.1.0"
+dependencies = [
+ "serde_json",
+]
+
+[[package]]
+name = "itoa"
+version = "1.0.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
+
+[[package]]
+name = "memchr"
+version = "2.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.94"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.39"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "ryu"
+version = "1.0.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ea1a2d0a644769cc99faa24c3ad26b379b786fe7c36fd3c546254801650e6dd"
+
+[[package]]
+name = "serde"
+version = "1.0.218"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.218"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "serde_json"
+version = "1.0.140"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
+dependencies = [
+ "itoa",
+ "memchr",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "sort_by_offset"
+version = "0.1.0"
+dependencies = [
+ "serde_json",
+]
+
+[[package]]
+name = "syn"
+version = "2.0.99"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00e2473a93778eb0bad35909dff6a10d28e63f792f16ed15e404fca9d5eeedbe"
diff --git c/Cargo.toml i/Cargo.toml
new file mode 100644
index 0000000..c1c651a
--- /dev/null
+++ i/Cargo.toml
@@ -0,0 +1,7 @@
+[workspace]
+resolver = "2"
+
+members = [
+    "add_offset",
+    "sort_by_offset"
+]
diff --git c/add_offset/Cargo.toml i/add_offset/Cargo.toml
new file mode 100644
index 0000000..dfec250
--- /dev/null
+++ i/add_offset/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "add_offset"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde_json = "1.0"
+
diff --git c/add_offset/src/main.rs i/add_offset/src/main.rs
new file mode 100644
index 0000000..0a608b9
--- /dev/null
+++ i/add_offset/src/main.rs
@@ -0,0 +1,33 @@
+use serde_json::Value;
+use std::io::{self, BufRead, Write};
+
+// Count how many 'N' characters the aligned sequence starts with
+fn count_leading_ns(seq: &str) -> usize {
+    seq.chars().take_while(|&c| c == 'N').count()
+}
+
+fn main() -> std::io::Result<()> {
+    let stdin = io::stdin();
+    let stdout = io::stdout();
+    let reader = stdin.lock();
+    let mut writer = stdout.lock();
+
+    for line in reader.lines() {
+        let line = line?;
+        let mut json_value: Value = serde_json::from_str(&line).expect("Invalid JSON format");
+
+        if let Some(main_seq) = json_value.pointer("/alignedNucleotideSequences/main") {
+            if let Some(main_seq_str) = main_seq.as_str() {
+                let offset = count_leading_ns(main_seq_str);
+
+                // Record the alignment offset on the top-level object
+                if let Value::Object(ref mut obj) = json_value {
+                    obj.insert("offset".to_string(), Value::from(offset));
+                }
+            }
+        }
+
+        writeln!(writer, "{}", json_value)?;
+    }
+
+    Ok(())
+}
diff --git c/sort_by_offset/Cargo.toml i/sort_by_offset/Cargo.toml
new file mode 100644
index 0000000..7cd502e
--- /dev/null
+++ i/sort_by_offset/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "sort_by_offset"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde_json = "1.0"
+
diff --git c/sort_by_offset/src/main.rs i/sort_by_offset/src/main.rs
new file mode 100644
index 0000000..fdaf277
--- /dev/null
+++ i/sort_by_offset/src/main.rs
@@ -0,0 +1,116 @@
+use std::fs::File;
+use std::io::{BufRead, BufReader, BufWriter, Write, Read, stdout, stdin};
+use serde_json::Value;
+use std::cmp::Reverse;
+use std::collections::BinaryHeap;
+
+// Function to read ndjson lines from a reader
+fn read_ndjson_lines<R: Read>(reader: R) -> std::io::Result<Vec<Value>> {
+    let reader = BufReader::new(reader);
+    let mut lines = Vec::new();
+    for line in reader.lines() {
+        let line = line?;
+        let json: Value = serde_json::from_str(&line)?;
+        lines.push(json);
+    }
+    Ok(lines)
+}
+
+// Function to write ndjson lines to a writer
+fn write_ndjson_lines<W: Write>(writer: &mut W, lines: &[Value]) -> std::io::Result<()> {
+    let mut writer = BufWriter::new(writer);
+    for line in lines {
+        writeln!(writer, "{}", line)?;
+    }
+    Ok(())
+}
+
+// Extract the sort key: "offset" is the field added upstream by add_offset
+fn offset_key(json: &Value) -> i64 {
+    json["offset"].as_i64().unwrap_or(0)
+}
+
+// Function to sort ndjson lines by "offset"
+fn sort_by_offset(mut lines: Vec<Value>) -> Vec<Value> {
+    lines.sort_by_key(offset_key);
+    lines
+}
+
+// Merging function that reads from readers and writes to any object implementing `Write`.
+// The heap stores (Reverse(offset), reader index, raw line) so the tuple is Ord
+// (serde_json::Value itself is not) and the smallest offset pops first.
+fn merge_sorted_readers<R: Read, W: Write>(output: &mut W, sorted_readers: &mut [BufReader<R>]) -> std::io::Result<()> {
+    let mut heap = BinaryHeap::new();
+
+    // Initialize heap with the first line from each reader
+    for (index, reader) in sorted_readers.iter_mut().enumerate() {
+        if let Some(Ok(line)) = reader.lines().next() {
+            let json: Value = serde_json::from_str(&line)?;
+            heap.push((Reverse(offset_key(&json)), index, line));
+        }
+    }
+
+    let mut writer = BufWriter::new(output);
+
+    while let Some((_, index, line)) = heap.pop() {
+        writeln!(writer, "{}", line)?;
+        // Refill the heap from the reader whose line was just emitted
+        if let Some(Ok(next)) = (&mut sorted_readers[index]).lines().next() {
+            let json: Value = serde_json::from_str(&next)?;
+            heap.push((Reverse(offset_key(&json)), index, next));
+        }
+    }
+
+    Ok(())
+}
+
+fn main() -> std::io::Result<()> {
+    let chunk_size = 1000; // Adjust based on available memory
+    let mut chunk_counter = 0;
+    let mut sorted_files = Vec::new();
+
+    // Read and process input from stdin in chunks
+    let reader = stdin();
+    let reader = reader.lock();
+    let mut lines = Vec::new();
+
+    for line in BufReader::new(reader).lines() {
+        let line = line?;
+        let json: Value = serde_json::from_str(&line)?;
+        lines.push(json);
+
+        if lines.len() >= chunk_size {
+            let sorted_lines = sort_by_offset(lines);
+            let chunk_file = format!("chunk_{}.ndjson", chunk_counter);
+            let mut file = File::create(&chunk_file)?;
+            write_ndjson_lines(&mut file, &sorted_lines)?;
+            sorted_files.push(File::open(&chunk_file)?);
+            lines = Vec::new();
+            chunk_counter += 1;
+        }
+    }
+
+    // Process any remaining lines
+    if !lines.is_empty() {
+        let sorted_lines = sort_by_offset(lines);
+        let chunk_file = format!("chunk_{}.ndjson", chunk_counter);
+        let mut file = File::create(&chunk_file)?;
+        write_ndjson_lines(&mut file, &sorted_lines)?;
+        sorted_files.push(File::open(&chunk_file)?);
+        chunk_counter += 1;
+    }
+
+    // Convert sorted files to BufReaders
+    let mut sorted_readers: Vec<_> = sorted_files.iter_mut().map(|f| BufReader::new(f)).collect();
+
+    // Merge all sorted chunk files and write to stdout
+    merge_sorted_readers(&mut stdout(), &mut sorted_readers)?;
+
+    // Clean up the temporary chunk files (dropping the handles alone would not delete them)
+    drop(sorted_readers);
+    drop(sorted_files);
+    for i in 0..chunk_counter {
+        std::fs::remove_file(format!("chunk_{}.ndjson", i))?;
+    }
+
+    Ok(())
+}
```
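For readers skimming the diff: `add_offset` counts the leading `N` characters of the aligned sequence at `/alignedNucleotideSequences/main` and writes that count back into each NDJSON record. A minimal before/after illustration (the `readId` field is made up for the example):

```
{"readId": "r1", "alignedNucleotideSequences": {"main": "NNNNACGTACGT"}}
{"readId": "r1", "alignedNucleotideSequences": {"main": "NNNNACGTACGT"}, "offset": 4}
```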
@Taepper just rebased above (it somehow does not say 😲), apologies. There is some heavy lifting going on here. What's happening? What are you sorting the reads by?
Hehe, we can rewrite the history before merging |
I am sorting by the read's offset for better compression in SILO. I have also added another input mode to SILO, and will change the targeted image shortly.
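For context, the two utilities form an external-merge-sort pipeline: `add_offset` annotates each NDJSON record in a single streaming pass, and `sort_by_offset` sorts chunks that fit in memory, spills each sorted chunk to a `chunk_N.ndjson` file, and k-way-merges the chunks to stdout. A hypothetical invocation (binary names are the crate names from the diff; `input.ndjson` is a placeholder for V-Pipe's aligned-read output):

```sh
# Build both workspace members, then stream the reads through the pair.
cargo build --release
./target/release/add_offset < input.ndjson \
  | ./target/release/sort_by_offset > sorted_by_offset.ndjson
```

Note that `sort_by_offset` writes its temporary `chunk_N.ndjson` files into the current working directory.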
This PR / branch contains the executables written by @Taepper for the first POC of importing V-Pipe's outputs into SILO. This is the state as of 16.12.2024. It contains the entire code to execute and run SILO standalone, downloading data from the running wise-seq.loculus.org demo instance.