Skip to content

Commit

Permalink
Using Commons Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
zanella committed Nov 16, 2016
1 parent 3f99894 commit 500062b
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 57 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.simpleframework</groupId>
<artifactId>simple</artifactId>
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/java/com/turn/ttorrent/common/ByteBufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.turn.ttorrent.common;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.nio.ByteBuffer;

public class ByteBufferPool {

private final GenericObjectPool<ByteBuffer> pool;

public GenericObjectPool<ByteBuffer> getPool() {
return pool;
}

public ByteBufferPool(int amount, int length) {
final GenericObjectPoolConfig gopc = new GenericObjectPoolConfig();
gopc.setMinIdle(amount);
gopc.setMaxTotal(amount);

pool = new GenericObjectPool<ByteBuffer>(new ByteBufferFactory(length), gopc);
}

private class ByteBufferFactory extends BasePooledObjectFactory<ByteBuffer> {
private final int length;

private ByteBufferFactory(int length) {
super();

this.length = length;
}

@Override
public ByteBuffer create() {
return ByteBuffer.allocate(length);
}

@Override
public PooledObject<ByteBuffer> wrap(ByteBuffer buffer) {
return new DefaultPooledObject<ByteBuffer>(buffer);
}

@Override
public void passivateObject(PooledObject<ByteBuffer> pooledObject) {
pooledObject.getObject().clear();
}
}
}

This file was deleted.

33 changes: 17 additions & 16 deletions core/src/main/java/com/turn/ttorrent/common/Torrent.java
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public static Torrent load(File torrent, boolean seeder)
* torrent's creator.
*/
public static Torrent create(File source, URI announce, String createdBy)
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
return Torrent.create(source, null, DEFAULT_PIECE_LENGTH,
announce, null, createdBy);
}
Expand All @@ -528,7 +528,7 @@ public static Torrent create(File source, URI announce, String createdBy)
* torrent's creator.
*/
public static Torrent create(File parent, List<File> files, URI announce,
String createdBy) throws InterruptedException, IOException, NoSuchAlgorithmException {
String createdBy) throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
return Torrent.create(parent, files, DEFAULT_PIECE_LENGTH,
announce, null, createdBy);
}
Expand All @@ -549,7 +549,7 @@ public static Torrent create(File parent, List<File> files, URI announce,
* torrent's creator.
*/
public static Torrent create(File source, int pieceLength, List<List<URI>> announceList,
String createdBy) throws InterruptedException, IOException, NoSuchAlgorithmException {
String createdBy) throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
return Torrent.create(source, null, pieceLength,
null, announceList, createdBy);
}
Expand All @@ -574,7 +574,7 @@ public static Torrent create(File source, int pieceLength, List<List<URI>> annou
*/
public static Torrent create(File source, List<File> files, int pieceLength,
List<List<URI>> announceList, String createdBy)
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
return Torrent.create(source, files, pieceLength,
null, announceList, createdBy);
}
Expand All @@ -600,7 +600,7 @@ public static Torrent create(File source, List<File> files, int pieceLength,
*/
private static Torrent create(File parent, List<File> files, int pieceLength,
URI announce, List<List<URI>> announceList, String createdBy)
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
if (files == null || files.isEmpty()) {
logger.info("Creating single-file torrent for {}...",
parent.getName());
Expand Down Expand Up @@ -676,24 +676,24 @@ private static class CallableChunkHasher implements Callable<String> {

private final MessageDigest md;
private final ByteBuffer data;
private final ByteBufferRentalService bbrs;
private final ByteBufferPool bbp;

CallableChunkHasher(ByteBuffer rentedBuffer, ByteBufferRentalService bbrs) throws NoSuchAlgorithmException {
CallableChunkHasher(ByteBuffer rentedBuffer, ByteBufferPool bbp) throws NoSuchAlgorithmException {
this.md = MessageDigest.getInstance("SHA-1");

rentedBuffer.mark();
rentedBuffer.reset();
this.data = rentedBuffer;

this.bbrs = bbrs;
this.bbp = bbp;
}

@Override
public String call() throws UnsupportedEncodingException, InterruptedException {
this.md.reset();
this.md.update(this.data);

bbrs.put( this.data );
this.bbp.getPool().returnObject(this.data);

return new String(md.digest(), Torrent.BYTE_ENCODING);
}
Expand All @@ -715,15 +715,15 @@ public String call() throws UnsupportedEncodingException, InterruptedException {
* @param file The file to hash.
*/
private static String hashFile(File file, int pieceLenght)
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
return Torrent.hashFiles(Arrays.asList(new File[] { file }), pieceLenght);
}

private static String hashFiles(List<File> files, int pieceLenght)
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws InterruptedException, IOException, NoSuchAlgorithmException, Exception {
int threads = getHashingThreadsCount();
ExecutorService executor = Executors.newFixedThreadPool(threads);
final ByteBufferRentalService bbrs = new ByteBufferRentalService(threads + 1, pieceLenght);
final ByteBufferPool bbp = new ByteBufferPool(threads + 1, pieceLenght);
List<Future<String>> results = new LinkedList<Future<String>>();
StringBuilder hashes = new StringBuilder();

Expand All @@ -748,13 +748,14 @@ private static String hashFiles(List<File> files, int pieceLenght)
int step = 10;

try {
buffer = bbrs.take();
buffer = bbp.getPool().borrowObject();

while (channel.read(buffer) > 0) {
if (buffer.remaining() == 0) {
buffer.clear();
results.add(executor.submit(new CallableChunkHasher(buffer, bbrs)));
buffer = bbrs.take();
results.add(executor.submit(new CallableChunkHasher(buffer, bbp)));

buffer = bbp.getPool().borrowObject();
}

if (results.size() >= threads) {
Expand All @@ -776,7 +777,7 @@ private static String hashFiles(List<File> files, int pieceLenght)
if ((buffer != null) && (buffer.position() > 0)) {
buffer.limit(buffer.position());
buffer.position(0);
results.add(executor.submit(new CallableChunkHasher(buffer, bbrs)));
results.add(executor.submit(new CallableChunkHasher(buffer, bbp)));
}

pieces += accumulateHashes(hashes, results);
Expand Down

0 comments on commit 500062b

Please sign in to comment.