package com.turn.ttorrent.common;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.*;

/**
 * Hashes the torrent's file pieces on a fixed-size pool of worker threads.
 *
 * <p>Pre-allocates one SHA-1 {@link MessageDigest} per worker and a small,
 * bounded pool of piece-sized {@link ByteBuffer}s, so ingesting a large
 * torrent neither thrashes the allocator with short-lived objects nor
 * holds an unbounded amount of data in memory.</p>
 *
 * <p>Usage: take a buffer with {@link #getBuffer()}, fill it, submit it via
 * {@link #enqueueChunk(ByteBuffer)}, and call {@link #shutdown()} when done.
 * Buffers are returned to the pool automatically after hashing.</p>
 *
 * @author rzanella
 */
public class ChunkHasher {

    private final ExecutorService executor;

    /**
     * One pre-built SHA-1 digest per worker thread. A worker removes a digest
     * at the start of {@link CallableChunkHasher#call()} and returns it at the
     * end, so the queue can never be empty when a task runs.
     */
    private final ArrayBlockingQueue<MessageDigest> mdQueue;

    /**
     * A limited pool of buffers, so that:
     *
     * - We don't thrash the memory with a lot of short-lived objects
     * - We don't use a lot of memory when we're ingesting a huge amount of data
     *
     * The ByteBuffers are array backed, so the APIs they get sent to have no
     * need to instantiate one.
     */
    private final ArrayBlockingQueue<ByteBuffer> bbQueue;

    /**
     * Creates the resources needed to hash the enqueued pieces.
     *
     * @param threads number of workers to create
     * @param pieceLength size of the pieces that will be received; has to be
     *                    informed upon creation since the user will get the
     *                    buffer from here
     * @throws InterruptedException declared for API compatibility; the
     *         constructor itself performs no blocking operation
     * @throws NoSuchAlgorithmException if the SHA-1 algorithm is unavailable
     */
    public ChunkHasher(int threads, int pieceLength) throws InterruptedException, NoSuchAlgorithmException {
        mdQueue = new ArrayBlockingQueue<MessageDigest>(threads);
        for (int i = 0; i < threads; i++) {
            mdQueue.add(MessageDigest.getInstance("SHA-1"));
        }

        // One spare buffer beyond the worker count, so the producer can be
        // filling the next piece while every worker is busy hashing.
        bbQueue = new ArrayBlockingQueue<ByteBuffer>(threads + 1);
        for (int i = 0; i < threads + 1; i++) {
            bbQueue.add(ByteBuffer.allocate(pieceLength));
        }

        executor = Executors.newFixedThreadPool(threads);
    }

    /**
     * Submits a filled buffer for asynchronous hashing.
     *
     * <p>The buffer must have been obtained from {@link #getBuffer()} and be
     * positioned at 0 with its limit at the end of the data; it is recycled
     * into the pool once hashed, so the caller must not reuse it.</p>
     *
     * @param buffer the piece data to hash
     * @return a Future so that the user can order the results on its side
     * @throws NoSuchAlgorithmException declared for API compatibility; digests
     *         are pre-built in the constructor, so this is never thrown here
     */
    public Future<String> enqueueChunk(ByteBuffer buffer) throws NoSuchAlgorithmException {
        return executor.submit(new CallableChunkHasher(buffer));
    }

    /**
     * Borrows a buffer from the pool, blocking until one is available.
     *
     * @return an array-backed ByteBuffer of pieceLength size
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    public ByteBuffer getBuffer() throws InterruptedException {
        return bbQueue.take();
    }

    /**
     * Clears the internal resources.
     *
     * <p>Requests orderly executor shutdown and blocks until all queued
     * hashing tasks have completed.</p>
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        executor.shutdown();
        // FIX: block in awaitTermination instead of the original
        // sleep(10)-poll busy-wait on isTerminated().
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            // Keep waiting; hashing of already-enqueued pieces may be slow.
        }
    }

    /**
     * A {@link Callable} to hash a data chunk.
     *
     * <p>Non-static on purpose: it returns its digest and buffer to the
     * enclosing instance's pools when done.</p>
     *
     * @author mpetazzoni
     */
    private class CallableChunkHasher implements Callable<String> {

        /** Pooled buffer holding the piece data; recycled after hashing. */
        private final ByteBuffer data;

        CallableChunkHasher(ByteBuffer rentedBuffer) {
            this.data = rentedBuffer;
        }

        @Override
        public String call() throws UnsupportedEncodingException {
            // Never blocks: the constructor stocked exactly one digest per
            // worker thread, and each task holds at most one at a time.
            final MessageDigest md = mdQueue.remove();

            // NOTE: the original mark()/reset() no-op pair was removed; the
            // buffer arrives positioned at 0 with its limit set by the caller.
            md.update(this.data);

            final String hash = new String(md.digest(), Torrent.BYTE_ENCODING);

            // Recycle the buffer into the pool for the next piece.
            this.data.clear();
            bbQueue.add(this.data);

            // digest() already reset the MessageDigest; reset() again to be
            // explicit before returning it to the pool.
            md.reset();
            mdQueue.add(md);

            return hash;
        }
    }
}
*/ - private static String hashFile(File file, int pieceLenght) + private static String hashFile(File file, int pieceLength) throws InterruptedException, IOException, NoSuchAlgorithmException { - return Torrent.hashFiles(Arrays.asList(new File[] { file }), pieceLenght); + return Torrent.hashFiles(Arrays.asList(new File[] { file }), pieceLength); } - private static String hashFiles(List<File> files, int pieceLenght) + private static String hashFiles(List<File> files, int pieceLength) throws InterruptedException, IOException, NoSuchAlgorithmException { int threads = getHashingThreadsCount(); - ExecutorService executor = Executors.newFixedThreadPool(threads); - ByteBuffer buffer = ByteBuffer.allocate(pieceLenght); List<Future<String>> results = new LinkedList<Future<String>>(); StringBuilder hashes = new StringBuilder(); + final ChunkHasher chunkHasher = new ChunkHasher(threads, pieceLength); long length = 0L; int pieces = 0; + ByteBuffer buffer = null; long start = System.nanoTime(); for (File file : files) { logger.info("Hashing data from {} with {} threads ({} pieces)...", @@ -733,7 +707,7 @@ private static String hashFiles(List<File> files, int pieceLenght) file.getName(), threads, (int) (Math.ceil( - (double)file.length() / pieceLenght)) + (double)file.length() / pieceLength)) }); length += file.length(); @@ -743,10 +717,14 @@ private static String hashFiles(List<File> files, int pieceLenght) int step = 10; try { + buffer = chunkHasher.getBuffer(); + while (channel.read(buffer) > 0) { if (buffer.remaining() == 0) { buffer.clear(); - results.add(executor.submit(new CallableChunkHasher(buffer))); + results.add(chunkHasher.enqueueChunk(buffer)); + + buffer = chunkHasher.getBuffer(); } if (results.size() >= threads) { @@ -765,24 +743,19 @@ private static String hashFiles(List<File> files, int pieceLenght) } // Hash the last bit, if any - if (buffer.position() > 0) { + if ((buffer != null) && (buffer.position() > 0)) { buffer.limit(buffer.position()); 
buffer.position(0); - results.add(executor.submit(new CallableChunkHasher(buffer))); + results.add(chunkHasher.enqueueChunk(buffer)); } pieces += accumulateHashes(hashes, results); - // Request orderly executor shutdown and wait for hashing tasks to - // complete. - executor.shutdown(); - while (!executor.isTerminated()) { - Thread.sleep(10); - } + chunkHasher.shutdown(); long elapsed = System.nanoTime() - start; int expectedPieces = (int) (Math.ceil( - (double)length / pieceLenght)); + (double)length / pieceLength)); logger.info("Hashed {} file(s) ({} bytes) in {} pieces ({} expected) in {}ms.", new Object[] { files.size(),