Sketching is a bottleneck for the scalability of Snipe #5
Replies: 9 comments
-
I did that already as a fix, and I am now sketching sequencing files in parallel:

```python
import multiprocessing
import queue
import sys
import threading
import time

import pyfastx
import sourmash
from pathos.multiprocessing import ProcessingPool as Pool


def process_sequences(fasta_file, thread_id, total_threads, progress_queue,
                      batch_size=100000, ksize=51, scaled=10000):
    # Each worker opens the FASTA file itself and scans every record,
    # keeping only its modulo share (every `total_threads`-th record).
    fa_reader = pyfastx.Fastx(fasta_file, comment=True)
    mh = sourmash.MinHash(n=0, ksize=ksize, scaled=scaled, track_abundance=True)
    local_count = 0  # local counter to batch progress updates
    total_processed = 0
    for idx, (_, seq, _) in enumerate(fa_reader):
        if idx % total_threads == thread_id:
            mh.add_sequence(seq, force=True)
            local_count += 1
            total_processed += 1
            if local_count >= batch_size:
                progress_queue.put(batch_size)
                local_count = 0
    if local_count > 0:
        progress_queue.put(local_count)
    return mh


def merge_minhashes(minhashes):
    if not minhashes:
        raise ValueError("No MinHashes to merge.")
    mh_full = minhashes[0]
    for mh in minhashes[1:]:
        mh_full.merge(mh)
    return mh_full


def progress_monitor(progress_queue, progress_interval, total_threads, stop_event):
    total = 0
    next_update = progress_interval
    while not stop_event.is_set() or not progress_queue.empty():
        try:
            count = progress_queue.get(timeout=0.5)
            total += count
            if total >= next_update:
                print(f"\rProcessed {next_update} sequences.", end='', flush=True)
                next_update += progress_interval
        except queue.Empty:
            pass
    print(f"\rProcessed {total} sequences in total.")


def sketch_sample(fasta_file, num_processes=16, progress_interval=1000000):
    start_time = time.time()
    print(f"Starting sketching with {num_processes} processes...")
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()
    stop_event = threading.Event()
    monitor_thread = threading.Thread(
        target=progress_monitor,
        args=(progress_queue, progress_interval, num_processes, stop_event),
        daemon=True
    )
    monitor_thread.start()
    pool = Pool(nodes=num_processes)
    try:
        results = []
        for thread_id in range(num_processes):
            result = pool.apipe(process_sequences, fasta_file, thread_id,
                                num_processes, progress_queue)
            results.append(result)
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print("\nReceived interrupt signal. Terminating processes...")
        pool.terminate()
        pool.join()
        stop_event.set()
        monitor_thread.join()
        sys.exit(1)
    finally:
        stop_event.set()
        monitor_thread.join()
    minhashes = [result.get() for result in results]
    mh_full = merge_minhashes(minhashes)
    signature = sourmash.SourmashSignature(mh_full, name="sample_signature")
    end_time = time.time()
    print(f"Sketching completed in {end_time - start_time:.2f} seconds.")
    return signature


if __name__ == "__main__":
    fasta_path = "input.fasta"
    num_processes = 8
    sample_sketch = sketch_sample(fasta_path, num_processes=num_processes)
    sourmash.save_signatures_to_json([sample_sketch], open("output.sig", "w"))
```
-
For some benchmark. I sketched the
-
Seriously! I am still running the old code on the HPC for the cattle. How can this code take a stream from the SRA like the old pipeline?
-
Well, that is one limitation. Piping sequences to the script will limit the speed to that of the single I/O stream.
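
To make that limitation concrete: each worker in the script above re-opens and re-scans the same FASTA file, so a one-shot pipe cannot feed it directly; the stream would have to be materialized first. Below is a minimal workaround sketch under that assumption; `spool_stdin_to_file` is a hypothetical helper, not part of the script.

```python
import shutil
import sys
import tempfile


def spool_stdin_to_file():
    """Copy the incoming stream to disk so that multiple workers can re-read it."""
    tmp = tempfile.NamedTemporaryFile(mode="wb", suffix=".fasta", delete=False)
    shutil.copyfileobj(sys.stdin.buffer, tmp)
    tmp.close()
    return tmp.name


if __name__ == "__main__":
    fasta_path = spool_stdin_to_file()
    # `sketch_sample` is the function from the parallel script posted above:
    # sample_sketch = sketch_sample(fasta_path, num_processes=8)
```

This keeps the parallel sketcher unchanged, but the spooling step itself is still bound by the single download stream.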
-
Can you check this? It is Java, but I assume the same concept should apply in Python. @mr-eyes
-
This will be error-prone. Using multiple processes to read from the same stdin in Python is not recommended: stdin is not designed for several processes to access it at the same time. When they try, whichever process reads first gets the data, leaving nothing for the others, which causes race conditions and data loss. Even though the Global Interpreter Lock (GIL) coordinates Python threads, it does not help here, because each process runs independently and cannot coordinate access to stdin.
-
Isn't this the responsibility of the bufferReader? I assume the bufferReader receives stdin and then hands each 'progress_interval' chunk to an available thread. This would be useless if the download speed is slower than the sketching speed of one core, but on AWS the SRA data transfer is much higher.
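
A minimal sketch of that buffer-reader idea, to make it concrete: a single reader owns the stream and hands batches of parsed records to a pool of worker processes, so no worker ever touches stdin. The plain-FASTA parser, worker count, batch size, and k-mer parameters below are illustrative assumptions rather than Snipe code.

```python
import multiprocessing
import sys

import sourmash

KSIZE, SCALED, BATCH_SIZE = 51, 10000, 100000  # illustrative parameters


def sketch_batch(seqs):
    """Sketch one batch of sequences in a worker process."""
    mh = sourmash.MinHash(n=0, ksize=KSIZE, scaled=SCALED, track_abundance=True)
    for seq in seqs:
        mh.add_sequence(seq, force=True)
    return mh


def read_fasta_batches(handle, batch_size):
    """Single reader: parse plain FASTA from a stream and yield lists of sequences."""
    batch, chunks = [], []
    for line in handle:
        line = line.strip()
        if line.startswith(">"):
            if chunks:
                batch.append("".join(chunks))
                chunks = []
            if len(batch) >= batch_size:
                yield batch
                batch = []
        elif line:
            chunks.append(line)
    if chunks:
        batch.append("".join(chunks))
    if batch:
        yield batch


if __name__ == "__main__":
    mh_full = sourmash.MinHash(n=0, ksize=KSIZE, scaled=SCALED, track_abundance=True)
    with multiprocessing.Pool(processes=8) as pool:
        # Only this loop reads stdin; workers only ever see already-parsed
        # batches, so there is no contention on the stream itself.
        for mh in pool.imap_unordered(sketch_batch, read_fasta_batches(sys.stdin, BATCH_SIZE)):
            mh_full.merge(mh)
    signature = sourmash.SourmashSignature(mh_full, name="streamed_sample")
```

Even with this pattern, the single reader and parser remain the throughput ceiling, which is the concern raised in the next reply.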
-
The streaming speed is often much higher than the processing speed, so handling each bunch of data in parallel will not utilize the cores.

Mohamed
-
One way to fix this is to implement a C++/Rust component to efficiently handle the input stream, chunk it, and parallel-process it.
-
Snipe skips alignment but still depends on sketching, which is kind of slow for large samples. Can parallel processing help solve this problem? @mr-eyes