From 82fb65193f3f685e9eb7e52f5cad0832e2f32b71 Mon Sep 17 00:00:00 2001
From: Dmitry Panin
Type.PIECE
messages.
+ *
+ * + * To impose rate limits, we only want to throttle when + * processing PIECE messages. All other peer messages + * should be exchanged as quickly as possible. + *
+ * + * @author ptgoetz + * + */ + private abstract class RateLimitThread extends Thread{ + protected Rate rate = new Rate(); + protected long sleep = 1000; + + protected void rateLimit(double maxRate, long messageSize, PeerMessage message){ + if(message.getType() == Type.PIECE && maxRate > 0){ + try { + this.rate.add(messageSize); + // continuously adjust the sleep time to try to hit our + // target rate limit + if(rate.get() > (maxRate * 1024)){ + Thread.sleep(this.sleep); + this.sleep += 50; + } else { + this.sleep -= 50; + } + if(this.sleep < 0){ + this.sleep = 0; + } + } catch (InterruptedException e) { + // not critical + } + } + } + } /** * Incoming messages thread. @@ -200,9 +239,7 @@ public void close() { * * @author mpetazzoni */ - private class IncomingThread extends Thread { - private Rate rate = new Rate(); - private long sleep = 1000; + private class IncomingThread extends RateLimitThread { @Override public void run() { @@ -249,23 +286,7 @@ public void run() { PeerMessage message = PeerMessage.parse(buffer, torrent); logger.trace("Received {} from {}", message, peer); - // throttling - if(message.getType() == Type.PIECE && PeerExchange.this.torrent.getMaxDownloadRate() > 0){ - try { - rate.add(size); - if(rate.get() > (PeerExchange.this.torrent.getMaxDownloadRate() * 1024)){ - Thread.sleep(this.sleep); - this.sleep += 50; - } else { - this.sleep -= 50; - } - if(this.sleep < 0){ - this.sleep = 0; - } - } catch (InterruptedException e) { - // not critical - } - } + rateLimit(PeerExchange.this.torrent.getMaxDownloadRate(), size, message); for (MessageListener listener : listeners) { listener.handleMessage(message); @@ -301,9 +322,7 @@ public void run() { * * @author mpetazzoni */ - private class OutgoingThread extends Thread { - private Rate rate = new Rate(); - private long sleep = 1000; + private class OutgoingThread extends RateLimitThread { @Override public void run() { @@ -338,22 +357,7 @@ public void run() { } } - if(message.getType() == Type.PIECE && PeerExchange.this.torrent.getMaxUploadRate() > 0){ - try { - rate.add(size); - if(rate.get() > (PeerExchange.this.torrent.getMaxUploadRate() * 1024)){ - Thread.sleep(this.sleep); - this.sleep += 50; - } else { - this.sleep -= 50; - } - if(this.sleep < 0){ - this.sleep = 0; - } - } catch (InterruptedException e) { - // not critical - } - } + rateLimit(PeerExchange.this.torrent.getMaxUploadRate(), size, message); } catch (InterruptedException ie) { // Ignore and potentially terminate } From 3ef9e9b1545cdc96135e2bfa72dda6ca96368d0a Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz"
+ * The algorithm is functional, but could certainly be
+ * improved upon. One obvious drawback is that with large
+ * changes in maxRate
, it will take a while
+ * for the sleep time to adjust and the throttled rate
+ * to "smooth out."
+ *
+ * Ideally, it would calculate the optimal sleep time + * necessary to hit a desired throughput rather than + * continuously adjust toward a goal. + *
+ * + * @param maxRate the target rate in kB/second + * @param messageSize the size, in bytes, of the last message read/written + * @param message the lastPeerMessage
read/written
+ */
+ protected void rateLimit(double maxRate, long messageSize, PeerMessage message) {
+ if(message.getType() == Type.PIECE && maxRate > 0) {
try {
this.rate.add(messageSize);
// continuously adjust the sleep time to try to hit our
// target rate limit
- if(rate.get() > (maxRate * 1024)){
+ if(rate.get() > (maxRate * 1024)) {
Thread.sleep(this.sleep);
this.sleep += 50;
} else {
this.sleep -= 50;
}
- if(this.sleep < 0){
+ if(this.sleep < 0) {
this.sleep = 0;
}
} catch (InterruptedException e) {
From 00bc8f287aa361850c7b8638ce243bd182747025 Mon Sep 17 00:00:00 2001
From: "P. Taylor Goetz" Type.PIECE
messages.
+ * for PIECE
messages.
*
* - * To impose rate limits, we only want to throttle when - * processing PIECE messages. All other peer messages - * should be exchanged as quickly as possible. + * To impose rate limits, we only want to throttle when processing PIECE + * messages. All other peer messages should be exchanged as quickly as + * possible. *
* * @author ptgoetz - * */ private abstract class RateLimitThread extends Thread { - protected Rate rate = new Rate(); + + protected final Rate rate = new Rate(); protected long sleep = 1000; - + /** - * Dynamically determines an amount of time to sleep, based - * on the average read/write throughput. + * Dynamically determines an amount of time to sleep, based on the + * average read/write throughput. * *
- * The algorithm is functional, but could certainly be
- * improved upon. One obvious drawback is that with large
- * changes in maxRate
, it will take a while
- * for the sleep time to adjust and the throttled rate
- * to "smooth out."
+ * The algorithm is functional, but could certainly be improved upon.
+ * One obvious drawback is that with large changes in
+ * maxRate
, it will take a while for the sleep time to
+ * adjust and the throttled rate to "smooth out."
*
- * Ideally, it would calculate the optimal sleep time - * necessary to hit a desired throughput rather than - * continuously adjust toward a goal. + * Ideally, it would calculate the optimal sleep time necessary to hit + * a desired throughput rather than continuously adjust toward a goal. *
* - * @param maxRate the target rate in kB/second - * @param messageSize the size, in bytes, of the last message read/written - * @param message the lastPeerMessage
read/written
+ * @param maxRate the target rate in kB/second.
+ * @param messageSize the size, in bytes, of the last message read/written.
+ * @param message the last PeerMessage
read/written.
*/
protected void rateLimit(double maxRate, long messageSize, PeerMessage message) {
- if(message.getType() == Type.PIECE && maxRate > 0) {
- try {
- this.rate.add(messageSize);
- // continuously adjust the sleep time to try to hit our
- // target rate limit
- if(rate.get() > (maxRate * 1024)) {
- Thread.sleep(this.sleep);
- this.sleep += 50;
- } else {
- this.sleep -= 50;
- }
- if(this.sleep < 0) {
- this.sleep = 0;
- }
- } catch (InterruptedException e) {
- // not critical
+ if (message.getType() != Type.PIECE || maxRate <= 0) {
+ return;
+ }
+
+ try {
+ this.rate.add(messageSize);
+
+ // Continuously adjust the sleep time to try to hit our target
+ // rate limit.
+ if (rate.get() > (maxRate * 1024)) {
+ Thread.sleep(this.sleep);
+ this.sleep += 50;
+ } else {
+ this.sleep = this.sleep > 50
+ ? this.sleep - 50
+ : 0;
}
+ } catch (InterruptedException e) {
+ // Not critical, eat it.
}
}
}
@@ -307,8 +307,10 @@ public void run() {
try {
PeerMessage message = PeerMessage.parse(buffer, torrent);
logger.trace("Received {} from {}", message, peer);
-
- rateLimit(PeerExchange.this.torrent.getMaxDownloadRate(), size, message);
+
+ // Wait if needed to reach configured download rate.
+ this.rateLimit(PeerExchange.this.torrent.getMaxDownloadRate(),
+ size, message);
for (MessageListener listener : listeners) {
listener.handleMessage(message);
@@ -345,7 +347,7 @@ public void run() {
* @author mpetazzoni
*/
private class OutgoingThread extends RateLimitThread {
-
+
@Override
public void run() {
try {
@@ -378,8 +380,10 @@ public void run() {
"Reached end of stream while writing");
}
}
-
- rateLimit(PeerExchange.this.torrent.getMaxUploadRate(), size, message);
+
+ // Wait if needed to reach configured upload rate.
+ this.rateLimit(PeerExchange.this.torrent.getMaxUploadRate(),
+ size, message);
} catch (InterruptedException ie) {
// Ignore and potentially terminate
}
From 64b886552678578d48edee5a860c32387d132246 Mon Sep 17 00:00:00 2001
From: Dan Oxlade
* When the download is complete, the client switches to seeding mode for
* as long as requested in the share()
call, if seeding was
- * requested. If not, the StopSeedingTask will execute immediately to stop
- * the client's main loop.
+ * requested. If not, the {@link ClientShutdown} will execute
+ * immediately to stop the client's main loop.
*
- * This method is called by {@link #stop()} to make sure all connections + * This method is called by {@link Announce#stop()} to make sure all connections * are correctly closed when the announce thread is asked to stop. *
*/ diff --git a/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java b/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java index 01b1c0420..5df3229b0 100644 --- a/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java +++ b/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java @@ -239,7 +239,7 @@ public void announce(AnnounceRequestMessage.RequestEvent event, * ** Verifies the transaction ID of the message before passing it over to - * {@link Announce#handleTrackerAnnounceResponse()}. + * any registered {@link AnnounceResponseListener}. *
* * @param message The message received from the tracker in response to the @@ -352,7 +352,7 @@ private void send(ByteBuffer data) { * * @param attempt The attempt number, used to calculate the timeout for the * receive operation. - * @retun Returns a {@link ByteBuffer} containing the packet data. + * @return Returns a {@link ByteBuffer} containing the packet data. */ private ByteBuffer recv(int attempt) throws IOException, SocketException, SocketTimeoutException { diff --git a/src/main/java/com/turn/ttorrent/common/Torrent.java b/src/main/java/com/turn/ttorrent/common/Torrent.java index e468dc560..de346067c 100644 --- a/src/main/java/com/turn/ttorrent/common/Torrent.java +++ b/src/main/java/com/turn/ttorrent/common/Torrent.java @@ -133,7 +133,6 @@ public TorrentFile(File file, long size) { * BitTorrent specification) and create a Torrent object from it. * * @param torrent The meta-info byte data. - * @param parent The parent directory or location of the torrent files. * @param seeder Whether we'll be seeding for this torrent or not. * @throws IOException When the info dictionary can't be read or * encoded and hashed back to create the torrent's SHA-1 hash. @@ -592,7 +591,7 @@ public static Torrent create(File source, List+ * If an interface name is given, return the first usable IPv4 address for + * that interface. If no interface name is given or if that interface + * doesn't have an IPv4 address, return's localhost address (if IPv4). + *
+ * + *+ * It is understood this makes the client IPv4 only, but it is important to + * remember that most BitTorrent extensions (like compact peer lists from + * trackers and UDP tracker support) are IPv4-only anyway. + *
+ * + * @param iface The network interface name. + * @return A usable IPv4 address as a {@link Inet4Address}. + * @throws UnsupportedAddressTypeException If no IPv4 address was available + * to bind on. + */ + private static Inet4Address getIPv4Address(String iface) + throws SocketException, UnsupportedAddressTypeException, + UnknownHostException { + if (iface != null) { + Enumeration+ * You can use the {@code main()} function of this class to read or create + * torrent files. See usage for details. + *
+ * + * TODO: support multiple announce URLs. + */ + public static void main(String[] args) { + BasicConfigurator.configure(new ConsoleAppender( + new PatternLayout("%-5p: %m%n"))); + + CmdLineParser parser = new CmdLineParser(); + CmdLineParser.Option help = parser.addBooleanOption('h', "help"); + CmdLineParser.Option filename = parser.addStringOption('t', "torrent"); + CmdLineParser.Option create = parser.addBooleanOption('c', "create"); + CmdLineParser.Option announce = parser.addStringOption('a', "announce"); + + try { + parser.parse(args); + } catch (CmdLineParser.OptionException oe) { + System.err.println(oe.getMessage()); + usage(System.err); + System.exit(1); + } + + // Display help and exit if requested + if (Boolean.TRUE.equals((Boolean)parser.getOptionValue(help))) { + usage(System.out); + System.exit(0); + } + + String filenameValue = (String)parser.getOptionValue(filename); + if (filenameValue == null) { + usage(System.err, "Torrent file must be provided!"); + System.exit(1); + } + + Boolean createFlag = (Boolean)parser.getOptionValue(create); + String announceURL = (String)parser.getOptionValue(announce); + + String[] otherArgs = parser.getRemainingArgs(); + + if (Boolean.TRUE.equals(createFlag) && + (otherArgs.length != 1 || announceURL == null)) { + usage(System.err, "Announce URL and a file or directory must be " + + "provided to create a torrent file!"); + System.exit(1); + } + + OutputStream fos = null; + try { + if (Boolean.TRUE.equals(createFlag)) { + if (filenameValue != null) { + fos = new FileOutputStream(filenameValue); + } else { + fos = System.out; + } + + URI announceURI = new URI(announceURL); + File source = new File(otherArgs[0]); + if (!source.exists() || !source.canRead()) { + throw new IllegalArgumentException( + "Cannot access source file or directory " + + source.getName()); + } + + String creator = String.format("%s (ttorrent)", + System.getProperty("user.name")); + + Torrent torrent = null; + if (source.isDirectory()) { + File[] files = source.listFiles(); + Arrays.sort(files); + torrent = Torrent.create(source, Arrays.asList(files), + announceURI, creator); + } else { + torrent = Torrent.create(source, announceURI, creator); + } + + torrent.save(fos); + } else { + Torrent.load(new File(filenameValue), true); + } + } catch (Exception e) { + logger.error("{}", e.getMessage(), e); + System.exit(2); + } finally { + if (fos != null && fos != System.out) { + try { + fos.close(); + } catch (IOException ioe) { + } + } + } + } +} diff --git a/src/main/java/com/turn/ttorrent/cli/TrackerMain.java b/src/main/java/com/turn/ttorrent/cli/TrackerMain.java new file mode 100644 index 000000000..bd3ffa473 --- /dev/null +++ b/src/main/java/com/turn/ttorrent/cli/TrackerMain.java @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2011-2013 Turn, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.turn.ttorrent.cli; + +import com.turn.ttorrent.tracker.TrackedTorrent; +import com.turn.ttorrent.tracker.Tracker; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.PrintStream; +import java.net.InetSocketAddress; + +import jargs.gnu.CmdLineParser; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.PatternLayout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Command-line entry-point for starting a {@link Tracker} + */ +public class TrackerMain { + + private static final Logger logger = + LoggerFactory.getLogger(TrackerMain.class); + + /** + * Display program usage on the given {@link PrintStream}. + */ + private static void usage(PrintStream s) { + s.println("usage: Tracker [options] [directory]"); + s.println(); + s.println("Available options:"); + s.println(" -h,--help Show this help and exit."); + s.println(" -p,--port PORT Bind to port PORT."); + s.println(); + } + + /** + * Main function to start a tracker. + */ + public static void main(String[] args) { + BasicConfigurator.configure(new ConsoleAppender( + new PatternLayout("%d [%-25t] %-5p: %m%n"))); + + CmdLineParser parser = new CmdLineParser(); + CmdLineParser.Option help = parser.addBooleanOption('h', "help"); + CmdLineParser.Option port = parser.addIntegerOption('p', "port"); + + try { + parser.parse(args); + } catch (CmdLineParser.OptionException oe) { + System.err.println(oe.getMessage()); + usage(System.err); + System.exit(1); + } + + // Display help and exit if requested + if (Boolean.TRUE.equals((Boolean)parser.getOptionValue(help))) { + usage(System.out); + System.exit(0); + } + + Integer portValue = (Integer)parser.getOptionValue(port, + Integer.valueOf(Tracker.DEFAULT_TRACKER_PORT)); + + String[] otherArgs = parser.getRemainingArgs(); + + if (otherArgs.length > 1) { + usage(System.err); + System.exit(1); + } + + // Get directory from command-line argument or default to current + // directory + String directory = otherArgs.length > 0 + ? otherArgs[0] + : "."; + + FilenameFilter filter = new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith(".torrent"); + } + }; + + try { + Tracker t = new Tracker(new InetSocketAddress(portValue.intValue())); + + File parent = new File(directory); + for (File f : parent.listFiles(filter)) { + logger.info("Loading torrent from " + f.getName()); + t.announce(TrackedTorrent.load(f)); + } + + logger.info("Starting tracker with {} announced torrents...", + t.getTrackedTorrents().size()); + t.start(); + } catch (Exception e) { + logger.error("{}", e.getMessage(), e); + System.exit(2); + } + } +} diff --git a/src/main/java/com/turn/ttorrent/client/Client.java b/src/main/java/com/turn/ttorrent/client/Client.java index f16d23c71..00325f078 100644 --- a/src/main/java/com/turn/ttorrent/client/Client.java +++ b/src/main/java/com/turn/ttorrent/client/Client.java @@ -19,26 +19,19 @@ import com.turn.ttorrent.client.announce.AnnounceException; import com.turn.ttorrent.client.announce.AnnounceResponseListener; import com.turn.ttorrent.client.peer.PeerActivityListener; +import com.turn.ttorrent.client.peer.SharingPeer; import com.turn.ttorrent.common.Peer; import com.turn.ttorrent.common.Torrent; import com.turn.ttorrent.common.protocol.PeerMessage; import com.turn.ttorrent.common.protocol.TrackerMessage; -import com.turn.ttorrent.client.peer.SharingPeer; -import java.io.File; import java.io.IOException; -import java.io.PrintStream; -import java.net.Inet4Address; import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.nio.channels.UnsupportedAddressTypeException; import java.util.BitSet; import java.util.Comparator; -import java.util.Enumeration; import java.util.HashSet; import java.util.List; import java.util.Observable; @@ -51,11 +44,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import jargs.gnu.CmdLineParser; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.PatternLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,9 +87,6 @@ public class Client extends Observable implements Runnable, private static final int RATE_COMPUTATION_ITERATIONS = 2; private static final int MAX_DOWNLOADERS_UNCHOKE = 4; - /** Default data output directory. */ - private static final String DEFAULT_OUTPUT_DIRECTORY = "/tmp"; - public enum ClientState { WAITING, VALIDATING, @@ -985,12 +970,12 @@ private synchronized void seed() { * * @author mpetazzoni */ - private static class ClientShutdown extends TimerTask { + public static class ClientShutdown extends TimerTask { private final Client client; private final Timer timer; - ClientShutdown(Client client, Timer timer) { + public ClientShutdown(Client client, Timer timer) { this.client = client; this.timer = timer; } @@ -1003,130 +988,4 @@ public void run() { } } }; - - /** - * Display program usage on the given {@link PrintStream}. - */ - private static void usage(PrintStream s) { - s.println("usage: Client [options]- * If an interface name is given, return the first usable IPv4 address for - * that interface. If no interface name is given or if that interface - * doesn't have an IPv4 address, return's localhost address (if IPv4). - *
- * - *- * It is understood this makes the client IPv4 only, but it is important to - * remember that most BitTorrent extensions (like compact peer lists from - * trackers and UDP tracker support) are IPv4-only anyway. - *
- * - * @param iface The network interface name. - * @return A usable IPv4 address as a {@link Inet4Address}. - * @throws UnsupportedAddressTypeException If no IPv4 address was available - * to bind on. - */ - private static Inet4Address getIPv4Address(String iface) - throws SocketException, UnsupportedAddressTypeException, - UnknownHostException { - if (iface != null) { - Enumeration- * You can use the {@code main()} function of this {@link Torrent} class to - * read or create torrent files. See usage for details. - *
- * - * TODO: support multiple announce URLs. - */ - public static void main(String[] args) { - BasicConfigurator.configure(new ConsoleAppender( - new PatternLayout("%-5p: %m%n"))); - - CmdLineParser parser = new CmdLineParser(); - CmdLineParser.Option help = parser.addBooleanOption('h', "help"); - CmdLineParser.Option filename = parser.addStringOption('t', "torrent"); - CmdLineParser.Option create = parser.addBooleanOption('c', "create"); - CmdLineParser.Option announce = parser.addStringOption('a', "announce"); - - try { - parser.parse(args); - } catch (CmdLineParser.OptionException oe) { - System.err.println(oe.getMessage()); - usage(System.err); - System.exit(1); - } - - // Display help and exit if requested - if (Boolean.TRUE.equals((Boolean)parser.getOptionValue(help))) { - usage(System.out); - System.exit(0); - } - - String filenameValue = (String)parser.getOptionValue(filename); - if (filenameValue == null) { - usage(System.err, "Torrent file must be provided!"); - System.exit(1); - } - - Boolean createFlag = (Boolean)parser.getOptionValue(create); - String announceURL = (String)parser.getOptionValue(announce); - - String[] otherArgs = parser.getRemainingArgs(); - - if (Boolean.TRUE.equals(createFlag) && - (otherArgs.length != 1 || announceURL == null)) { - usage(System.err, "Announce URL and a file or directory must be " + - "provided to create a torrent file!"); - System.exit(1); - } - - OutputStream fos = null; - try { - if (Boolean.TRUE.equals(createFlag)) { - if (filenameValue != null) { - fos = new FileOutputStream(filenameValue); - } else { - fos = System.out; - } - - URI announceURI = new URI(announceURL); - File source = new File(otherArgs[0]); - if (!source.exists() || !source.canRead()) { - throw new IllegalArgumentException( - "Cannot access source file or directory " + - source.getName()); - } - - String creator = String.format("%s (ttorrent)", - System.getProperty("user.name")); - - Torrent torrent = null; - if (source.isDirectory()) { - File[] files = source.listFiles(); - Arrays.sort(files); - torrent = Torrent.create(source, Arrays.asList(files), - announceURI, creator); - } else { - torrent = Torrent.create(source, announceURI, creator); - } - - torrent.save(fos); - } else { - Torrent.load(new File(filenameValue), true); - } - } catch (Exception e) { - logger.error("{}", e.getMessage(), e); - System.exit(2); - } finally { - if (fos != null && fos != System.out) { - try { - fos.close(); - } catch (IOException ioe) { - } - } - } - } } diff --git a/src/main/java/com/turn/ttorrent/tracker/Tracker.java b/src/main/java/com/turn/ttorrent/tracker/Tracker.java index 6ba05d8b1..638fae259 100644 --- a/src/main/java/com/turn/ttorrent/tracker/Tracker.java +++ b/src/main/java/com/turn/ttorrent/tracker/Tracker.java @@ -17,30 +17,21 @@ import com.turn.ttorrent.common.Torrent; -import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; -import java.io.PrintStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collection; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import jargs.gnu.CmdLineParser; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.PatternLayout; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.simpleframework.transport.connect.Connection; import org.simpleframework.transport.connect.SocketConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * BitTorrent tracker. @@ -180,6 +171,13 @@ public void stop() { } } + /** + * Returns the list of tracker's torrents + */ + public Collection
@@ -82,7 +76,6 @@ public class SharedTorrent extends Torrent implements PeerActivityListener {
*/
private static final float ENG_GAME_COMPLETION_RATIO = 0.95f;
- private Random random;
private boolean stop;
private long uploaded;
@@ -99,6 +92,7 @@ public class SharedTorrent extends Torrent implements PeerActivityListener {
private SortedSet
+ * This will recreate a SharedTorrent object from the provided Torrent
+ * object's encoded meta-info data.
+ * null
if no piece is interesting
+ */
+ Piece choosePiece(SortedSet> announceList,
+ public static Torrent create(File source, int pieceLength, List
> announceList,
String createdBy) throws InterruptedException, IOException {
- return Torrent.create(source, null, null, announceList, createdBy);
+ return Torrent.create(source, null, pieceLength,
+ null, announceList, createdBy);
}
/**
@@ -572,10 +578,11 @@ public static Torrent create(File source, List
> announceList,
* @param createdBy The creator's name, or any string identifying the
* torrent's creator.
*/
- public static Torrent create(File source, List
> announceList, String createdBy)
throws InterruptedException, IOException {
- return Torrent.create(source, files, null, announceList, createdBy);
+ return Torrent.create(source, files, pieceLength,
+ null, announceList, createdBy);
}
/**
@@ -597,8 +604,8 @@ public static Torrent create(File source, List
> announceList, String createdBy)
+ private static Torrent create(File parent, List
> announceList, String createdBy)
throws InterruptedException, IOException {
if (files == null || files.isEmpty()) {
logger.info("Creating single-file torrent for {}...",
@@ -630,11 +637,11 @@ private static Torrent create(File parent, List
- * Note: this implementation currently only supports single-file - * torrents. - *
- * * @author mpetazzoni */ public class SharedTorrent extends Torrent implements PeerActivityListener { From 3ce7fab1ff965b5cb78758eca7880f7c6078141c Mon Sep 17 00:00:00 2001 From: Philipp Henkel+ * Starts both incoming and outgoing thread. + *
+ */ + public void start() { + this.in.start(); + this.out.start(); + } + + /** + * Stop the peer exchange. * ** Closes the socket channel and stops both incoming and outgoing threads. *
*/ - public void close() { + public void stop() { this.stop = true; if (this.channel.isConnected()) { diff --git a/core/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java b/core/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java index c24244ec6..36e773425 100644 --- a/core/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java +++ b/core/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java @@ -270,6 +270,7 @@ public synchronized void bind(SocketChannel channel) throws SocketException { this.exchange = new PeerExchange(this, this.torrent, channel); this.exchange.register(this); + this.exchange.start(); this.download = new Rate(); this.download.reset(); @@ -308,7 +309,7 @@ public void unbind(boolean force) { synchronized (this.exchangeLock) { if (this.exchange != null) { - this.exchange.close(); + this.exchange.stop(); this.exchange = null; } } From 7fc52791f9e70af40c42893c523795eb4cf09ca0 Mon Sep 17 00:00:00 2001 From: Philipp Henkel
* This class does nothing more. All further peer-to-peer communication happens
- * in the {@link com.turn.ttorrent.client.peer.PeerExchange PeerExchange}
- * class.
+ * in the PeerExchange
class.
*
<len=0000>
*/
public static class KeepAliveMessage extends PeerMessage {
@@ -225,7 +225,7 @@ public static KeepAliveMessage craft() {
/**
* Choke message.
*
- * <len=0001><id=0>
*/
public static class ChokeMessage extends PeerMessage {
@@ -253,7 +253,7 @@ public static ChokeMessage craft() {
/**
* Unchoke message.
*
- * <len=0001><id=1>
*/
public static class UnchokeMessage extends PeerMessage {
@@ -281,7 +281,7 @@ public static UnchokeMessage craft() {
/**
* Interested message.
*
- * <len=0001<>id=2>
*/
public static class InterestedMessage extends PeerMessage {
@@ -309,7 +309,7 @@ public static InterestedMessage craft() {
/**
* Not interested message.
*
- * <len=0001><id=3>
*/
public static class NotInterestedMessage extends PeerMessage {
@@ -337,7 +337,7 @@ public static NotInterestedMessage craft() {
/**
* Have message.
*
- * <len=0005><id=4><piece index=xxxx>
*/
public static class HaveMessage extends PeerMessage {
@@ -387,7 +387,7 @@ public String toString() {
/**
* Bitfield message.
*
- * <len=0001+X><id=5><bitfield>
*/
public static class BitfieldMessage extends PeerMessage {
@@ -456,7 +456,7 @@ public String toString() {
/**
* Request message.
*
- * <len=00013><id=6><piece index><block offset><block length>
*/
public static class RequestMessage extends PeerMessage {
@@ -533,7 +533,7 @@ public String toString() {
/**
* Piece message.
*
- * <len=0009+X><id=7><piece index><block offset><block data>
*/
public static class PieceMessage extends PeerMessage {
@@ -605,7 +605,7 @@ public String toString() {
/**
* Cancel message.
*
- * <len=00013><id=8><piece index><block offset><block length>
*/
public static class CancelMessage extends PeerMessage {
diff --git a/core/src/main/java/com/turn/ttorrent/tracker/Tracker.java b/core/src/main/java/com/turn/ttorrent/tracker/Tracker.java
index 638fae259..84bf03e04 100644
--- a/core/src/main/java/com/turn/ttorrent/tracker/Tracker.java
+++ b/core/src/main/java/com/turn/ttorrent/tracker/Tracker.java
@@ -38,8 +38,8 @@
*
* * The tracker usually listens on port 6969 (the standard BitTorrent tracker - * port). Torrents must be registered directly to this tracker with the - * {@link #announce(TrackedTorrent torrent)} method. + * port). Torrents must be registered directly to this tracker with the {@link + * #announce(TrackedTorrent torrent)} method. *
* * @author mpetazzoni From 7dcb78de1cc80e333b6f013aba3025f95a973bf1 Mon Sep 17 00:00:00 2001 From: Maxime Petazzoni+ * This function will read the piece data without checking if the piece has + * been validated. It is simply meant at factoring-in the common read code + * from the validate and read functions. + *
+ * + * @param offset Offset inside this piece where to start reading. + * @param buffer A byte buffer to read the piece data into. + * @throws IllegalArgumentException If offset + length goes over + * the piece boundary. + * @throws IOException If the read can't be completed (I/O error, or EOF + * reached, which can happen if the piece is not complete). + */ + private void _read(long offset, ByteBuffer buffer) throws IOException { + int length = buffer.remaining(); + if (offset + length > this.length) { + throw new IllegalArgumentException("Piece#" + this.index + + " overrun (" + offset + " + " + length + " > " + + this.length + ") !"); + } + + // TODO: remove cast to int when large ByteBuffer support is + // implemented in Java. + int bytes = this.bucket.read(buffer, this.offset + offset); + buffer.rewind(); + buffer.limit(bytes >= 0 ? bytes : 0); + } + /** * Internal piece data read function. * @@ -288,6 +343,14 @@ public int compareTo(Piece other) { (this.index < other.index ? -1 : 1); } + /** + * Release the thread local buffers for validation. + */ + public static void clearValidationBuffers() { + validateByteArray.remove(); + validateByteBuffer.remove(); + } + /** * A {@link Callable} to call the piece validation function. * diff --git a/core/src/main/java/com/turn/ttorrent/client/peer/PeerExchange.java b/core/src/main/java/com/turn/ttorrent/client/peer/PeerExchange.java index fab7a5fa5..a647d292c 100644 --- a/core/src/main/java/com/turn/ttorrent/client/peer/PeerExchange.java +++ b/core/src/main/java/com/turn/ttorrent/client/peer/PeerExchange.java @@ -15,6 +15,7 @@ */ package com.turn.ttorrent.client.peer; +import com.turn.ttorrent.client.Piece; import com.turn.ttorrent.client.SharedTorrent; import com.turn.ttorrent.common.protocol.PeerMessage; import com.turn.ttorrent.common.protocol.PeerMessage.Type; @@ -133,7 +134,7 @@ public PeerExchange(SharingPeer peer, SharedTorrent torrent, } } - + /** * Register a new message listener to receive messages. * @@ -181,7 +182,7 @@ public void start() { this.in.start(); this.out.start(); } - + /** * Stop the peer exchange. * @@ -209,13 +210,13 @@ public void stop() { /** * Abstract Thread subclass that allows conditional rate limiting * forPIECE
messages.
- *
+ *
* * To impose rate limits, we only want to throttle when processing PIECE * messages. All other peer messages should be exchanged as quickly as * possible. *
- * + * * @author ptgoetz */ private abstract class RateLimitThread extends Thread { @@ -226,19 +227,19 @@ private abstract class RateLimitThread extends Thread { /** * Dynamically determines an amount of time to sleep, based on the * average read/write throughput. - * + * *
* The algorithm is functional, but could certainly be improved upon.
* One obvious drawback is that with large changes in
* maxRate
, it will take a while for the sleep time to
* adjust and the throttled rate to "smooth out."
*
* Ideally, it would calculate the optimal sleep time necessary to hit * a desired throughput rather than continuously adjust toward a goal. *
- * + * * @param maxRate the target rate in kB/second. * @param messageSize the size, in bytes, of the last message read/written. * @param message the lastPeerMessage
read/written.
@@ -276,7 +277,7 @@ protected void rateLimit(double maxRate, long messageSize, PeerMessage message)
* parsed and passed to the peer's handleMessage()
method that
* will act based on the message type.
*
- *
+ *
* @author mpetazzoni
*/
private class IncomingThread extends RateLimitThread {
@@ -350,14 +351,14 @@ public void run() {
}
buffer.rewind();
-
+
if (stop) {
// The buffer may contain the type from the last message
// if we were stopped before reading the payload and cause
// BufferUnderflowException in parsing.
break;
}
-
+
try {
PeerMessage message = PeerMessage.parse(buffer, torrent);
logger.trace("Received {} from {}", message, peer);
@@ -383,6 +384,7 @@ public void run() {
} catch (IOException ioe) {
this.handleIOE(ioe);
}
+ Piece.clearValidationBuffers();
}
}
}
diff --git a/core/src/main/java/com/turn/ttorrent/common/Torrent.java b/core/src/main/java/com/turn/ttorrent/common/Torrent.java
index 2b3af8812..8c7a557fd 100644
--- a/core/src/main/java/com/turn/ttorrent/common/Torrent.java
+++ b/core/src/main/java/com/turn/ttorrent/common/Torrent.java
@@ -412,6 +412,14 @@ public static byte[] hash(byte[] data) throws NoSuchAlgorithmException {
return crypt.digest();
}
+ public static byte[] hash(byte[] data, int offset, int length) throws NoSuchAlgorithmException {
+ MessageDigest crypt;
+ crypt = MessageDigest.getInstance("SHA-1");
+ crypt.reset();
+ crypt.update(data, offset, length);
+ return crypt.digest();
+ }
+
/**
* Return an hexadecimal representation of the bytes contained in the
* given string, following the default, expected byte encoding.
@@ -506,7 +514,7 @@ public static Torrent load(File torrent, boolean seeder)
*/
public static Torrent create(File source, URI announce, String createdBy)
throws InterruptedException, IOException, NoSuchAlgorithmException {
- return Torrent.create(source, null, DEFAULT_PIECE_LENGTH,
+ return Torrent.create(source, null, DEFAULT_PIECE_LENGTH,
announce, null, createdBy);
}
@@ -529,7 +537,7 @@ public static Torrent create(File source, URI announce, String createdBy)
*/
public static Torrent create(File parent, List