diff --git a/docs/index.html b/docs/index.html
index 2ce241c0e..d6a711c49 100755
--- a/docs/index.html
+++ b/docs/index.html
@@ -518,6 +518,7 @@ struct torrent_status
state_t state;
float progress;
+ boost::posix_time::time_duration next_announce;
std::size_t total_download;
std::size_t total_upload;
};
@@ -577,6 +578,10 @@ current task is in the state member, it will be one of the following:
+
+next_announce is the time until the torrent will announce itself to the tracker.
+
+
total_download and total_upload is the number of bytes downloaded and
uploaded to all peers, accumulated.
diff --git a/examples/client_test.cpp b/examples/client_test.cpp
index 8d24534bd..c0a7e31a3 100755
--- a/examples/client_test.cpp
+++ b/examples/client_test.cpp
@@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include
#include
+//#include
#include "libtorrent/entry.hpp"
#include "libtorrent/bencode.hpp"
@@ -115,6 +116,16 @@ bool sleep_and_input(char* c)
return false;
}
+void set_cursor(int x, int y)
+{
+ std::cout << "\033[" << y << ";" << x << "H";
+}
+
+void clear()
+{
+ std::cout << "\033[2J";
+}
+
#endif
std::string add_suffix(float val)
@@ -237,6 +248,10 @@ int main(int argc, char* argv[])
<< add_suffix(total_down) << ") " << add_suffix(down) << "/s u:("
<< add_suffix(total_up) << ") " << add_suffix(up) << "/s\n";
+ boost::posix_time::time_duration t = s.next_announce;
+// std::cout << "next announce: " << boost::posix_time::to_simple_string(t) << "\n";
+ std::cout << "next announce: " << t.hours() << ":" << t.minutes() << ":" << t.seconds() << "\n";
+
i->get_download_queue(queue);
for (std::vector::iterator i = queue.begin();
i != queue.end();
@@ -245,7 +260,7 @@ int main(int argc, char* argv[])
std::cout << i->piece_index << ": ";
for (int j = 0; j < i->blocks_in_piece; ++j)
{
- if (i->finished_blocks[j]) std::cout << "#";
+ if (i->finished_blocks[j]) std::cout << "+";
else if (i->requested_blocks[j]) std::cout << "-";
else std::cout << ".";
}
diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp
index 4bc2e6fbb..646084fb5 100755
--- a/include/libtorrent/peer_connection.hpp
+++ b/include/libtorrent/peer_connection.hpp
@@ -81,17 +81,29 @@ namespace libtorrent
// this is the constructor where the we are teh active part. The peer_conenction
// should handshake and verify that the other end has the correct id
- peer_connection(detail::session_impl* ses, torrent* t, boost::shared_ptr s, const peer_id& p);
+ peer_connection(
+ detail::session_impl* ses
+ , selector& sel
+ , torrent* t
+ , boost::shared_ptr s
+ , const peer_id& p);
// with this constructor we have been contacted and we still don't know which torrent the
// connection belongs to
- peer_connection(detail::session_impl* ses, boost::shared_ptr s);
+ peer_connection(
+ detail::session_impl* ses
+ , selector& sel
+ , boost::shared_ptr s);
~peer_connection();
// this adds an announcement in the announcement queue
// it will let the peer know that we have the given piece
- void announce_piece(int index) { m_announce_queue.push_back(index); }
+ void announce_piece(int index)
+ {
+ m_announce_queue.push_back(index);
+ send_buffer_updated();
+ }
// called from the main loop when this connection has any
// work to do.
@@ -122,6 +134,7 @@ namespace libtorrent
void interested();
void not_interested();
void request_block(piece_block block);
+ void cancel_block(piece_block block);
bool is_interesting() const throw() { return m_interesting; }
bool has_choked() const throw() { return m_choked; }
@@ -152,6 +165,7 @@ namespace libtorrent
private:
bool dispatch_message();
+ void send_buffer_updated();
void send_bitfield();
void send_have(int index);
@@ -200,6 +214,7 @@ namespace libtorrent
boost::posix_time::ptime m_last_receive;
boost::posix_time::ptime m_last_sent;
+ selector& m_selector;
boost::shared_ptr m_socket;
torrent* m_torrent;
detail::session_impl* m_ses;
@@ -207,6 +222,15 @@ namespace libtorrent
// and false if we got an incomming connection
bool m_active;
+ // this is true as long as this peer's
+ // socket is added to the selector to
+ // monitor writability. Each time we do
+ // something that generates data to be
+ // sent to this peer, we check this and
+ // if it's not added to the selector we
+ // add it.
+ bool m_added_to_selector;
+
// remote peer's id
peer_id m_peer_id;
@@ -242,6 +266,21 @@ namespace libtorrent
stat m_statistics;
};
+ // this is called each time this peer generates some
+ // data to be sent. It will add this socket to
+ // the writibility monitor in the selector.
+ inline void peer_connection::send_buffer_updated()
+ {
+ if (!has_data()) return;
+
+ if (!m_added_to_selector)
+ {
+ m_selector.monitor_writability(m_socket);
+ m_added_to_selector = true;
+ }
+ assert(m_added_to_selector);
+ assert(m_selector.is_writability_monitored(m_socket));
+ }
}
#endif // TORRENT_PEER_CONNECTION_HPP_INCLUDED
diff --git a/include/libtorrent/peer_id.hpp b/include/libtorrent/peer_id.hpp
index c9c15a616..a13840c0f 100755
--- a/include/libtorrent/peer_id.hpp
+++ b/include/libtorrent/peer_id.hpp
@@ -92,7 +92,7 @@ namespace libtorrent
os << std::hex << std::setw(2) << std::setfill('0')
<< static_cast(*i);
}
- os << std::dec << std::cout << std::setfill(' ');
+ os << std::dec << std::setfill(' ');
return os;
}
diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp
index e368b2489..8aa4cf90b 100755
--- a/include/libtorrent/piece_picker.hpp
+++ b/include/libtorrent/piece_picker.hpp
@@ -54,6 +54,12 @@ namespace libtorrent
{}
int piece_index;
int block_index;
+ bool operator==(const piece_block& b) const
+ { return piece_index == b.piece_index && block_index == b.block_index; }
+
+ bool operator!=(const piece_block& b) const
+ { return piece_index != b.piece_index || block_index != b.block_index; }
+
};
class piece_picker
@@ -133,9 +139,11 @@ namespace libtorrent
// or if it already has been successfully downlloaded
bool is_downloading(piece_block block) const;
+ bool is_finished(piece_block block) const;
+
// marks this piece-block as queued for downloading
void mark_as_downloading(piece_block block, const peer_id& peer);
- void mark_as_finished(piece_block block);
+ void mark_as_finished(piece_block block, const peer_id& peer);
// if a piece had a hash-failure, it must be restured and
// made available for redownloading
diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp
index d93fb453e..14a5e3b32 100755
--- a/include/libtorrent/torrent.hpp
+++ b/include/libtorrent/torrent.hpp
@@ -152,8 +152,14 @@ namespace libtorrent
// the given peer_id (should be kept at max 1)
int num_connections(const peer_id& id) const;
- std::vector::const_iterator begin() const { return m_connections.begin(); }
- std::vector::const_iterator end() const { return m_connections.end(); }
+ typedef std::vector::iterator peer_iterator;
+ typedef std::vector::const_iterator peer_const_iterator;
+
+ peer_const_iterator begin() const { return m_connections.begin(); }
+ peer_const_iterator end() const { return m_connections.end(); }
+
+ peer_iterator begin() { return m_connections.begin(); }
+ peer_iterator end() { return m_connections.end(); }
// --------------------------------------------
@@ -165,13 +171,13 @@ namespace libtorrent
void tracker_request_timed_out()
{
- std::cout << "TRACKER TIMED OUT\n";
+ std::cerr << "TRACKER TIMED OUT\n";
try_next_tracker();
}
void tracker_request_error(const char* str)
{
- std::cout << "TRACKER ERROR: " << str << "\n";
+ std::cerr << "TRACKER ERROR: " << str << "\n";
try_next_tracker();
}
@@ -179,6 +185,8 @@ namespace libtorrent
// to the tracker
std::string generate_tracker_request(int port);
+ boost::posix_time::ptime next_announce() const
+ { return m_next_request; }
// --------------------------------------------
// PIECE MANAGEMENT
@@ -216,10 +224,6 @@ namespace libtorrent
logger* spawn_logger(const char* title);
#endif
- // the number of blocks downloaded
- // that hasn't been verified yet
- int m_unverified_blocks;
-
private:
void try_next_tracker();
diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp
index a20e2f7c6..fe979cec5 100755
--- a/include/libtorrent/torrent_handle.hpp
+++ b/include/libtorrent/torrent_handle.hpp
@@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define TORRENT_TORRENT_HANDLE_HPP_INCLUDED
#include
+#include
#include "libtorrent/peer_id.hpp"
#include "libtorrent/peer_info.hpp"
@@ -50,7 +51,8 @@ namespace libtorrent
struct duplicate_torrent: std::exception
{
- virtual const char* what() const { return "torrent already exists in session"; }
+ virtual const char* what() const throw()
+ { return "torrent already exists in session"; }
};
struct torrent_status
@@ -66,6 +68,7 @@ namespace libtorrent
state_t state;
float progress;
+ boost::posix_time::time_duration next_announce;
std::size_t total_download;
std::size_t total_upload;
};
@@ -84,17 +87,15 @@ namespace libtorrent
struct torrent_handle
{
friend class session;
-
torrent_handle(): m_ses(0) {}
- void get_peer_info(std::vector& v);
void abort();
+ void get_peer_info(std::vector& v);
torrent_status status() const;
-
void get_download_queue(std::vector& queue) const;
- // TODO: add a 'time to next announce' query.
+ // TODO: add force reannounce
private:
diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp
index e75926633..b79236708 100755
--- a/src/peer_connection.cpp
+++ b/src/peer_connection.cpp
@@ -71,17 +71,24 @@ namespace
}
-libtorrent::peer_connection::peer_connection(detail::session_impl* ses, torrent* t, boost::shared_ptr s, const peer_id& p)
+libtorrent::peer_connection::peer_connection(
+ detail::session_impl* ses
+ , selector& sel
+ , torrent* t
+ , boost::shared_ptr s
+ , const peer_id& p)
: m_state(read_protocol_length)
, m_timeout(120)
, m_packet_size(1)
, m_recv_pos(0)
, m_last_receive(std::time(0))
, m_last_sent(std::time(0))
+ , m_selector(sel)
, m_socket(s)
, m_torrent(t)
, m_ses(ses)
, m_active(true)
+ , m_added_to_selector(false)
, m_peer_id(p)
, m_peer_interested(false)
, m_peer_choked(true)
@@ -107,17 +114,22 @@ libtorrent::peer_connection::peer_connection(detail::session_impl* ses, torrent*
send_bitfield();
}
-libtorrent::peer_connection::peer_connection(detail::session_impl* ses, boost::shared_ptr s)
+libtorrent::peer_connection::peer_connection(
+ detail::session_impl* ses
+ , selector& sel
+ , boost::shared_ptr s)
: m_state(read_protocol_length)
, m_timeout(120)
, m_packet_size(1)
, m_recv_pos(0)
, m_last_receive(std::time(0))
, m_last_sent(std::time(0))
+ , m_selector(sel)
, m_socket(s)
, m_torrent(0)
, m_ses(ses)
, m_active(false)
+ , m_added_to_selector(false)
, m_peer_id()
, m_peer_interested(false)
, m_peer_choked(true)
@@ -161,6 +173,7 @@ void libtorrent::peer_connection::send_handshake()
(*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n";
#endif
+ send_buffer_updated();
}
bool libtorrent::peer_connection::dispatch_message()
@@ -305,6 +318,8 @@ bool libtorrent::peer_connection::dispatch_message()
r.length = read_int(&m_recv_buffer[9]);
m_requests.push_back(r);
+ send_buffer_updated();
+
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
@@ -351,7 +366,7 @@ bool libtorrent::peer_connection::dispatch_message()
#endif
return false;
}
-
+/*
piece_block req = m_download_queue.front();
if (req.piece_index != index)
{
@@ -368,31 +383,48 @@ bool libtorrent::peer_connection::dispatch_message()
#endif
return false;
}
-
- m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::out, offset);
-
+*/
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n";
#endif
- m_receiving_piece.write(&m_recv_buffer[9], len);
m_torrent->downloaded_bytes(len);
piece_picker& picker = m_torrent->picker();
piece_block block_finished(index, offset / m_torrent->block_size());
- picker.mark_as_finished(block_finished);
- // pop the request that just finished
- // from the download queue
- m_download_queue.erase(m_download_queue.begin());
- m_torrent->m_unverified_blocks++;
+ std::vector::iterator b
+ = std::find(
+ m_download_queue.begin()
+ , m_download_queue.end()
+ , block_finished);
+
+ if (b != m_download_queue.end())
+ {
+ // pop the request that just finished
+ // from the download queue
+ m_download_queue.erase(b);
+ }
+ else
+ {
+ // TODO: cancel the block from the
+ // peer that has taken over it.
+ }
+
+ if (picker.is_finished(block_finished)) break;
+
+ m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::out, offset);
+ m_receiving_piece.write(&m_recv_buffer[9], len);
+ m_receiving_piece.close();
+
+ picker.mark_as_finished(block_finished, m_peer_id);
// did we just finish the piece?
if (picker.is_piece_finished(index))
{
- m_torrent->m_unverified_blocks -= picker.blocks_in_piece(index);
-
+ m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::in);
bool verified = m_torrent->filesystem()->verify_piece(m_receiving_piece);
+ m_receiving_piece.close();
if (verified)
{
m_torrent->announce_piece(index);
@@ -438,10 +470,15 @@ bool libtorrent::peer_connection::dispatch_message()
m_requests.erase(i);
}
+ if (!has_data() && m_added_to_selector)
+ {
+ m_added_to_selector = false;
+ m_selector.remove_writable(m_socket);
+ }
+
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
- m_requests.clear();
break;
}
}
@@ -449,6 +486,55 @@ bool libtorrent::peer_connection::dispatch_message()
return true;
}
+void libtorrent::peer_connection::cancel_block(piece_block block)
+{
+ assert(block.piece_index >= 0);
+ assert(block.piece_index < m_torrent->torrent_file().num_pieces());
+ assert(m_torrent->picker().is_downloading(block));
+
+ m_torrent->picker().abort_download(block);
+
+ std::vector::iterator i
+ = std::find(m_download_queue.begin(), m_download_queue.end(), block);
+ assert(i != m_download_queue.end());
+
+ m_download_queue.erase(i);
+
+
+ int block_offset = block.block_index * m_torrent->block_size();
+ int block_size
+ = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset,
+ m_torrent->block_size());
+ assert(block_size > 0);
+ assert(block_size <= m_torrent->block_size());
+
+ char buf[] = {0,0,0,13, msg_cancel};
+
+ std::size_t start_offset = m_send_buffer.size();
+ m_send_buffer.resize(start_offset + 17);
+
+ std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset);
+ start_offset +=5;
+
+ // index
+ write_int(block.piece_index, &m_send_buffer[start_offset]);
+ start_offset += 4;
+
+ // begin
+ write_int(block_offset, &m_send_buffer[start_offset]);
+ start_offset += 4;
+
+ // length
+ write_int(block_size, &m_send_buffer[start_offset]);
+ start_offset += 4;
+#if defined(TORRENT_VERBOSE_LOGGING)
+ (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
+#endif
+ assert(start_offset == m_send_buffer.size());
+
+ send_buffer_updated();
+}
+
void libtorrent::peer_connection::request_block(piece_block block)
{
assert(block.piece_index >= 0);
@@ -493,6 +579,7 @@ void libtorrent::peer_connection::request_block(piece_block block)
#endif
assert(start_offset == m_send_buffer.size());
+ send_buffer_updated();
}
void libtorrent::peer_connection::send_bitfield()
@@ -511,6 +598,7 @@ void libtorrent::peer_connection::send_bitfield()
if (m_torrent->have_piece(i))
m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7));
}
+ send_buffer_updated();
}
void libtorrent::peer_connection::choke()
@@ -522,6 +610,7 @@ void libtorrent::peer_connection::choke()
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n";
#endif
+ send_buffer_updated();
}
void libtorrent::peer_connection::unchoke()
@@ -533,6 +622,7 @@ void libtorrent::peer_connection::unchoke()
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n";
#endif
+ send_buffer_updated();
}
void libtorrent::peer_connection::interested()
@@ -544,6 +634,7 @@ void libtorrent::peer_connection::interested()
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n";
#endif
+ send_buffer_updated();
}
void libtorrent::peer_connection::not_interested()
@@ -555,6 +646,7 @@ void libtorrent::peer_connection::not_interested()
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n";
#endif
+ send_buffer_updated();
}
void libtorrent::peer_connection::send_have(int index)
@@ -565,6 +657,7 @@ void libtorrent::peer_connection::send_have(int index)
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n";
#endif
+ send_buffer_updated();
}
@@ -784,7 +877,7 @@ bool libtorrent::peer_connection::has_data() const throw()
{
// if we have requests or pending data to be sent or announcements to be made
// we want to send data
- return !m_requests.empty() || !m_send_buffer.empty() || !m_announce_queue.empty();
+ return (!m_requests.empty() && !m_choked) || !m_send_buffer.empty() || !m_announce_queue.empty();
}
// --------------------------
@@ -794,11 +887,17 @@ bool libtorrent::peer_connection::has_data() const throw()
// throws exception when the client should be disconnected
void libtorrent::peer_connection::send_data()
{
+ assert(m_socket->is_writable());
assert(has_data());
- // only add new piece-chunks if the send buffer is empty
+ // only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be!
- if (!m_requests.empty() && m_send_buffer.empty() && m_peer_interested && !m_choked)
+ // TODO: make ths a bit better. Don't always read the entire
+ // requested block. Have a limit of how much of the requested
+ // block is actually read at a time.
+ while (!m_requests.empty()
+ && (m_send_buffer.size() < m_torrent->block_size())
+ && !m_choked)
{
peer_request& r = m_requests.front();
@@ -840,23 +939,25 @@ void libtorrent::peer_connection::send_data()
m_sending_piece.read(&m_send_buffer[13], r.length);
#if defined(TORRENT_VERBOSE_LOGGING)
- (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " | dest: " << m_socket->sender().as_string() << " ]\n";
+ (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
// let the torrent keep track of how much we have uploaded
m_torrent->uploaded_bytes(r.length);
- m_requests.erase(m_requests.begin());
}
else
{
#if defined(TORRENT_VERBOSE_LOGGING)
- (*m_logger) << m_socket->sender().as_string() << " *** WARNING [ illegal piece request ]\n";
+ (*m_logger) << m_socket->sender().as_string() << " *** WARNING [ illegal piece request idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
}
+ m_requests.erase(m_requests.begin());
}
if (!m_announce_queue.empty())
{
- for (std::vector::iterator i = m_announce_queue.begin(); i != m_announce_queue.end(); ++i)
+ for (std::vector::iterator i = m_announce_queue.begin();
+ i != m_announce_queue.end();
+ ++i)
{
// (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n";
send_have(*i);
@@ -867,12 +968,15 @@ void libtorrent::peer_connection::send_data()
// send the actual buffer
if (!m_send_buffer.empty())
{
- // we have data that's scheduled for sending
- std::size_t sent = m_socket->send(&m_send_buffer[0], m_send_buffer.size());
-#if defined(TORRENT_VERBOSE_LOGGING)
+ // we have data that's scheduled for sending
+ int sent = m_socket->send(
+ &m_send_buffer[0]
+ , m_send_buffer.size());
+
+ #if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n";
-#endif
+ #endif
if (sent > 0)
{
@@ -882,14 +986,40 @@ void libtorrent::peer_connection::send_data()
// only a part of the buffer could be sent
// remove the part that was sent from the buffer
if (sent == m_send_buffer.size())
+ {
m_send_buffer.clear();
+ }
else
- m_send_buffer.erase(m_send_buffer.begin(), m_send_buffer.begin() + sent);
+ {
+ m_send_buffer.erase(
+ m_send_buffer.begin()
+ , m_send_buffer.begin() + sent);
+ }
+ }
+ else
+ {
+ assert(sent == -1);
+ throw network_error(m_socket->last_error());
}
m_last_sent = boost::posix_time::second_clock::local_time();
}
+ assert(m_added_to_selector);
+ if (!has_data())
+ {
+ m_selector.remove_writable(m_socket);
+ m_added_to_selector = false;
+ }
+#ifndef NDEBUG
+ else
+ {
+ if (m_socket->is_writable())
+ {
+ std::cout << "ERROR\n";
+ }
+ }
+#endif
}
@@ -905,6 +1035,6 @@ void libtorrent::peer_connection::keep_alive()
#if defined(TORRENT_VERBOSE_LOGGING)
(*m_logger) << m_socket->sender().as_string() << " ==> NOP\n";
#endif
-
+ send_buffer_updated();
}
}
diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp
index d24e5d7ec..f19c7da81 100755
--- a/src/piece_picker.cpp
+++ b/src/piece_picker.cpp
@@ -424,7 +424,7 @@ namespace libtorrent
int piece_blocks = std::min(blocks_in_piece(*i), num_blocks);
for (int j = 0; j < piece_blocks; ++j)
{
- interesting_blocks.push_back(piece_block(*i, 0));
+ interesting_blocks.push_back(piece_block(*i, j));
}
num_blocks -= piece_blocks;
if (num_blocks == 0) return num_blocks;
@@ -442,6 +442,8 @@ namespace libtorrent
for (int j = 0; j < num_blocks_in_piece; ++j)
{
+ if (p->finished_blocks[j] == 1) continue;
+
interesting_blocks.push_back(piece_block(*i, j));
if (p->requested_blocks[j] == 0)
{
@@ -483,13 +485,27 @@ namespace libtorrent
return i->requested_blocks[block.block_index];
}
+ bool piece_picker::is_finished(piece_block block) const
+ {
+ assert(block.piece_index < m_piece_map.size());
+ assert(block.block_index < max_blocks_per_piece);
+
+ if (m_piece_map[block.piece_index].index == 0xffffff) return true;
+ if (m_piece_map[block.piece_index].downloading == 0) return false;
+ std::vector::const_iterator i
+ = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
+ assert(i != m_downloads.end());
+ return i->finished_blocks[block.block_index];
+ }
+
+
void piece_picker::mark_as_downloading(piece_block block, const peer_id& peer)
{
#ifndef NDEBUG
integrity_check();
#endif
assert(block.piece_index < m_piece_map.size());
- assert(block.block_index < max_blocks_per_piece);
+ assert(block.block_index < blocks_in_piece(block.piece_index));
piece_pos& p = m_piece_map[block.piece_index];
if (p.downloading == 0)
@@ -517,27 +533,67 @@ namespace libtorrent
#endif
}
- void piece_picker::mark_as_finished(piece_block block)
+ void piece_picker::mark_as_finished(piece_block block, const peer_id& peer)
{
#ifndef NDEBUG
integrity_check();
#endif
assert(block.piece_index < m_piece_map.size());
- assert(block.block_index < max_blocks_per_piece);
+ assert(block.block_index < blocks_in_piece(block.piece_index));
+
+ piece_pos& p = m_piece_map[block.piece_index];
+ if (p.downloading == 0)
+ {
+ p.downloading = 1;
+ move(false, p.peer_count, p.index);
+
+ downloading_piece dp;
+ dp.index = block.piece_index;
+ dp.requested_blocks[block.block_index] = 1;
+ dp.finished_blocks[block.block_index] = 1;
+ dp.info[block.block_index].peer = peer;
+ m_downloads.push_back(dp);
+ }
+ else
+ {
+ std::vector::iterator i
+ = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
+ assert(i != m_downloads.end());
+ i->info[block.block_index].peer = peer;
+ i->requested_blocks[block.block_index] = 1;
+ i->finished_blocks[block.block_index] = 1;
+ }
+#ifndef NDEBUG
+ integrity_check();
+#endif
+ }
+/*
+ void piece_picker::mark_as_finished(piece_block block, const peer_id& peer)
+ {
+#ifndef NDEBUG
+ integrity_check();
+#endif
+ assert(block.piece_index < m_piece_map.size());
+ assert(block.block_index < blocks_in_piece(block.piece_index));
assert(m_piece_map[block.piece_index].downloading == 1);
std::vector::iterator i
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
assert(i != m_downloads.end());
- assert(i->requested_blocks[block.block_index] == 1);
i->finished_blocks[block.block_index] = 1;
+ // the block may have been requested, then cancled
+ // and requested by a peer that disconnects
+ // that way we can actually receive the piece
+ // without the requested bit is set.
+ i->requested_blocks[block.block_index] = 1;
i->info[block.block_index].num_downloads++;
+ i->info[block.block_index].peer = peer;
#ifndef NDEBUG
integrity_check();
#endif
}
-
+*/
void piece_picker::get_downloaders(std::vector& d, int index)
{
std::vector::iterator i
@@ -569,6 +625,10 @@ namespace libtorrent
std::vector::iterator i
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
assert(i != m_downloads.end());
+
+ if (i->finished_blocks[block.block_index]) return;
+
+ assert(block.block_index < blocks_in_piece(block.piece_index));
assert(i->requested_blocks[block.block_index] == 1);
// clear this block as being downloaded
diff --git a/src/policy.cpp b/src/policy.cpp
index 130646dd5..333846f89 100755
--- a/src/policy.cpp
+++ b/src/policy.cpp
@@ -43,20 +43,54 @@ namespace
{
// we try to maintain 4 requested blocks in the download
// queue
- request_queue = 4
+ request_queue = 16
};
using namespace libtorrent;
+
+
+ // TODO: replace these two functions with std::find_first_of
+ template
+ bool has_intersection(It1 start1, It1 end1, It2 start2, It2 end2)
+ {
+ for (;start1 != end1; ++start1)
+ for (;start2 != end2; ++start2)
+ if (*start1 == *start2) return true;
+ return false;
+ }
+
+ piece_block find_first_common(const std::vector& queue,
+ const std::vector& busy)
+ {
+ for (std::vector::const_reverse_iterator i
+ = queue.rbegin();
+ i != queue.rend();
+ ++i)
+ {
+ for (std::vector::const_iterator j
+ = busy.begin();
+ j != busy.end();
+ ++j)
+ {
+ if ((*j) == (*i)) return *i;
+ }
+ }
+ assert(false);
+ }
+
void request_a_block(torrent& t, peer_connection& c)
{
+ int num_requests = request_queue - c.download_queue().size();
+
+ // if our request queue is already full, we
+ // don't have to make any new requests yet
+ if (num_requests <= 0) return;
+
piece_picker& p = t.picker();
std::vector interesting_pieces;
interesting_pieces.reserve(100);
- int num_requests = request_queue - c.download_queue().size();
- if (num_requests <= 0) num_requests = 1;
-
// picks the interesting pieces from this peer
// the integer is the number of pieces that
// should be guaranteed to be available for download
@@ -85,12 +119,53 @@ namespace
// ok, we found a piece that's not being downloaded
// by somebody else. request it from this peer
c.request_block(*i);
- num_requests++;
- if (num_requests >= request_queue) return;
+ num_requests--;
+ if (num_requests <= 0) return;
}
- // TODO: compare this peer's bandwidth against the
- // ones downloading these pieces (busy_pieces)
+ if (busy_pieces.empty()) return;
+
+ // first look for blocks that are just queued
+ // and not actually sent to us yet
+ // (then we can cancel those and request them
+ // from this peer instead)
+
+ peer_connection* peer = 0;
+ float down_speed = 0.f;
+ // find the peer with the lowest download
+ // speed that also has a piece thatt this
+ // peer could send us
+ for (torrent::peer_iterator i = t.begin();
+ i != t.end();
+ ++i)
+ {
+ const std::vector& queue = (*i)->download_queue();
+ if ((*i)->statistics().down_peak() > down_speed
+ && has_intersection(busy_pieces.begin(),
+ busy_pieces.end(),
+ queue.begin(),
+ queue.end()))
+ {
+ peer = *i;
+ down_speed = (*i)->statistics().down_peak();
+ }
+ }
+
+ assert(peer != 0);
+
+ // this peer doesn't have a faster connection than the
+ // slowest peer. Don't take over any blocks
+ if (c.statistics().down_peak() <= down_speed) return;
+
+ // find a suitable block to take over from this peer
+ piece_block block = find_first_common(peer->download_queue(), busy_pieces);
+ peer->cancel_block(block);
+ c.request_block(block);
+
+ // the one we interrupted may need to request a new piece
+ request_a_block(t, *peer);
+
+ num_requests--;
}
}
diff --git a/src/session.cpp b/src/session.cpp
index ab2fa41d3..1304316f7 100755
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -111,9 +111,6 @@ namespace libtorrent
boost::mutex::scoped_lock l(m_mutex);
if (!t->abort)
{
-#ifndef NDEBUG
- std::cout << "adding torrent to session!\n";
-#endif
boost::mutex::scoped_lock l(m_ses->m_mutex);
m_ses->m_torrents.insert(
@@ -123,7 +120,7 @@ namespace libtorrent
catch(...)
{
#ifndef NDEBUG
- std::cout << "error while checking files\n";
+ std::cerr << "error while checking files\n";
#endif
}
@@ -199,7 +196,8 @@ namespace libtorrent
}
catch(network_error&)
{
- if (m_listen_port > max_port) throw;
+ if (m_listen_port > max_port)
+ throw;
m_listen_port++;
continue;
}
@@ -230,29 +228,15 @@ namespace libtorrent
std::vector > writable_clients;
std::vector > error_clients;
boost::posix_time::ptime timer = boost::posix_time::second_clock::local_time();
-#ifdef TORRENT_DEBUG_SOCKETS
- int num_loops = 0;
-#endif
+
for(;;)
{
-#ifndef NDEBUG
- std::clock_t time__ = std::clock();
-#endif
-
-
// if nothing happens within 500000 microseconds (0.5 seconds)
// do the loop anyway to check if anything else has changed
// (*m_logger) << "sleeping\n";
m_selector.wait(500000, readable_clients, writable_clients, error_clients);
boost::mutex::scoped_lock l(m_mutex);
-#ifdef TORRENT_DEBUG_SOCKETS
- num_loops++;
-#endif
-
-
- assert(readable_clients.size() + writable_clients.size() + error_clients.size() > 0
- || (std::clock() - time__) > CLOCKS_PER_SEC / 3);
// +1 for the listen socket
assert(m_selector.count_read_monitors() == m_connections.size() + 1);
@@ -301,7 +285,7 @@ namespace libtorrent
// s->set_send_bufsize(2048);
// TODO: add some possibility to filter IP:s
- boost::shared_ptr c(new peer_connection(this, s));
+ boost::shared_ptr c(new peer_connection(this, m_selector, s));
m_connections.insert(std::make_pair(s, c));
m_selector.monitor_readability(s);
m_selector.monitor_errors(s);
@@ -356,11 +340,6 @@ namespace libtorrent
assert(p->second->has_data());
// (*m_logger) << "writable: " << p->first->sender().as_string() << "\n";
p->second->send_data();
- // if the peer doesn't have
- // any data left to send, remove it
- // from the writabilty monitor
- if (!p->second->has_data())
- m_selector.remove_writable(p->first);
}
catch(network_error&)
{
@@ -396,8 +375,9 @@ namespace libtorrent
i != m_connections.end();
++i)
{
- if (m_selector.is_writability_monitored(i->first))
- assert(i->second->has_data());
+ assert(i->second->has_data() == m_selector.is_writability_monitored(i->first));
+// if (m_selector.is_writability_monitored(i->first))
+// assert(i->second->has_data());
}
#endif
@@ -450,17 +430,6 @@ namespace libtorrent
// THE SECTION BELOW IS EXECUTED ONCE EVERY SECOND
// ************************
-
-#ifdef TORRENT_DEBUG_SOCKETS
- std::cout << "\nloops: " << num_loops << "\n";
- if (num_loops > 1300)
- {
- int i = 0;
- }
- num_loops = 0;
-#endif
-
-
// do the second_tick() on each connection
// this will update their statistics (download and upload speeds)
// also purge sockets that have timed out
@@ -482,9 +451,6 @@ namespace libtorrent
}
j->second->keep_alive();
-
- if (j->second->has_data() && !m_selector.is_writability_monitored(j->first))
- m_selector.monitor_writability(j->first);
}
// check each torrent for abortion or
@@ -538,11 +504,11 @@ namespace libtorrent
}
catch(const std::exception& e)
{
- std::cout << e.what() << "\n";
+ std::cerr << e.what() << "\n";
}
catch(...)
{
- std::cout << "error\n";
+ std::cerr << "error\n";
}
diff --git a/src/storage.cpp b/src/storage.cpp
index fff10d991..654e19483 100755
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -286,6 +286,7 @@ void libtorrent::piece_file::seek_forward(int step)
bool libtorrent::storage::verify_piece(piece_file& file)
{
int index = file.index();
+ assert(index >= 0 && index < m_have_piece.size());
if (m_have_piece[index]) return true;
std::vector buffer(m_torrent_file->piece_size(index));
@@ -353,7 +354,6 @@ void libtorrent::storage::initialize_pieces(torrent* t,
*/
-
// we don't know of any piece we have right now. Initialize
// it to say we don't have anything and fill it in later on.
m_have_piece.resize(m_torrent_file->num_pieces());
@@ -382,10 +382,6 @@ void libtorrent::storage::initialize_pieces(torrent* t,
char zeros[chunksize];
std::fill(zeros, zeros+chunksize, 0);
-#ifndef NDEBUG
- std::cout << "allocating files\n";
-#endif
-
// remember which directories we have created, so
// we don't have to ask the filesystem all the time
std::set created_directories;
@@ -491,10 +487,6 @@ void libtorrent::storage::initialize_pieces(torrent* t,
// std::cout << "\n";
}
-#ifndef NDEBUG
- std::cout << "allocation/checking DONE!\n";
-#endif
-
}
/*
// reads the piece with the given index from disk
diff --git a/src/torrent.cpp b/src/torrent.cpp
index 109cd76f1..49dda7920 100755
--- a/src/torrent.cpp
+++ b/src/torrent.cpp
@@ -143,7 +143,6 @@ namespace libtorrent
, m_bytes_uploaded(0)
, m_bytes_downloaded(0)
, m_torrent_file(torrent_file)
- , m_unverified_blocks(0)
, m_next_request(boost::posix_time::second_clock::local_time())
, m_duration(1800)
, m_policy(new policy(this))
@@ -172,6 +171,7 @@ namespace libtorrent
// connect to random peers from the list
std::random_shuffle(peer_list.begin(), peer_list.end());
+
std::cout << "interval: " << m_duration << "\n";
std::cout << "peers:\n";
for (std::vector::const_iterator i = peer_list.begin();
@@ -179,15 +179,8 @@ namespace libtorrent
++i)
{
std::cout << " " << std::setfill(' ') << std::setw(16) << i->ip
- << " " << std::setw(5) << std::dec << i->port << " ";
- for (const unsigned char* j = i->id.begin();
- j != i->id.end();
- ++j)
- {
- std::cout << std::hex << std::setw(2) << std::setfill('0')
- << static_cast(*j);
- }
- std::cout << std::dec << " " << extract_fingerprint(i->id) << "\n";
+ << " " << std::setw(5) << std::dec << i->port << " "
+ << i->id << " " << extract_fingerprint(i->id) << "\n";
}
std::cout << std::setfill(' ');
@@ -332,7 +325,7 @@ namespace libtorrent
if (p->has_piece(i)) peer_lost(i);
}
- std::cout << p->get_socket()->sender().as_string() << " *** DISCONNECT\n";
+// std::cout << p->get_socket()->sender().as_string() << " *** DISCONNECT\n";
m_policy->connection_closed(*p);
m_connections.erase(i);
@@ -348,14 +341,18 @@ namespace libtorrent
// TODO: the send buffer size should be controllable from the outside
// s->set_send_bufsize(2048);
s->connect(a);
- boost::shared_ptr c(new peer_connection(m_ses, this, s, id));
+ boost::shared_ptr c(new peer_connection(
+ m_ses
+ , m_ses->m_selector
+ , this
+ , s
+ , id));
detail::session_impl::connection_map::iterator p =
m_ses->m_connections.insert(std::make_pair(s, c)).first;
attach_peer(boost::get_pointer(p->second));
- m_ses->m_selector.monitor_writability(s);
m_ses->m_selector.monitor_readability(s);
m_ses->m_selector.monitor_errors(s);
- std::cout << "connecting to: " << a.as_string() << ":" << a.port() << "\n";
+// std::cout << "connecting to: " << a.as_string() << ":" << a.port() << "\n";
}
void torrent::close_all_connections()
@@ -415,7 +412,7 @@ namespace libtorrent
int blocks_per_piece
= m_torrent_file.piece_length() / m_block_size;
- assert(m_unverified_blocks == m_picker.unverified_blocks());
+ int unverified_blocks = m_picker.unverified_blocks();
int blocks_we_have = num_pieces * blocks_per_piece;
const int last_piece = m_torrent_file.num_pieces()-1;
@@ -428,9 +425,12 @@ namespace libtorrent
// TODO: Implement total download and total_upload
st.total_download = 0;
st.total_upload = 0;
- st.progress = (blocks_we_have + m_unverified_blocks)
+ st.progress = (blocks_we_have + unverified_blocks)
/ static_cast(total_blocks);
+ st.next_announce = next_announce()
+ - boost::posix_time::second_clock::local_time();
+
if (num_pieces == p.size())
st.state = torrent_status::seeding;
else
diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp
index 9bc1a25d3..da2aa218f 100755
--- a/src/torrent_handle.cpp
+++ b/src/torrent_handle.cpp
@@ -72,6 +72,7 @@ namespace libtorrent
st.total_download = 0;
st.progress = 0.f;
st.state = torrent_status::invalid_handle;
+ st.next_announce = boost::posix_time::time_duration();
return st;
}
@@ -95,6 +96,7 @@ namespace libtorrent
else
st.state = torrent_status::queued_for_checking;
st.progress = d->progress;
+ st.next_announce = boost::posix_time::time_duration();
return st;
}
}
@@ -104,6 +106,7 @@ namespace libtorrent
st.total_download = 0;
st.progress = 0.f;
st.state = torrent_status::invalid_handle;
+ st.next_announce = boost::posix_time::time_duration();
return st;
}