From 8a3d9944163886db52b6d754a77d7f43a3bb7d00 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 4 Jan 2004 04:29:13 +0000 Subject: [PATCH] *** empty log message *** --- docs/index.html | 10 +- docs/index.rst | 10 +- examples/client_test.cpp | 4 +- include/libtorrent/peer_connection.hpp | 24 +- include/libtorrent/peer_info.hpp | 3 +- src/peer_connection.cpp | 2415 ++++++++++++------------ src/session.cpp | 12 +- src/torrent_handle.cpp | 14 +- 8 files changed, 1315 insertions(+), 1177 deletions(-) diff --git a/docs/index.html b/docs/index.html index 0dcd1d946..c923bf491 100755 --- a/docs/index.html +++ b/docs/index.html @@ -345,6 +345,8 @@ const char* buf; entry e = bdecode(buf, buf + data_size);

Now we just need to know how to retrieve information from the entry.

+

If bdecode() encounters invalid encoded data in the range given to it +it will throw invalid_encoding.

entry

@@ -392,7 +394,7 @@ public:

The integer(), string(), list() and dict() functions are accessorts that return the respecive type. If the entry object isn't of the -type you request, the accessor will throw type_error (which derives from +type you request, the accessor will throw type_error (which derives from std::runtime_error). You can ask an entry for its type through the type() function.

The print() function is there for debug purposes only.

@@ -541,7 +543,7 @@ was started.

set_max_uploads() sets the maximum number of peers that's unchoked at the same time on this torrent. If you set this to -1, there will be no limit.

write_resume_data() takes a non-const reference to a char-vector, that vector will be filled -with the fast-resume data. For more information about hpw fast-resume works, see fast resume.

+with the fast-resume data. For more information about how fast-resume works, see fast resume.

status()

status() will return a structure with information about the status of this @@ -1140,8 +1142,8 @@ for each slot 4 bytes, the number of unfinished pieces for each unfinished piece 4 bytes, index of the unfinished piece - blocks_per_piece / 32 bytes, the bitmask describing which - blocks are finished in this piece. + blocks_per_piece / 8 bytes, the bitmask describing which + blocks are finished in this piece.

diff --git a/docs/index.rst b/docs/index.rst index 638c5065b..75c83ef4f 100755 --- a/docs/index.rst +++ b/docs/index.rst @@ -325,6 +325,8 @@ Or, if you have a raw char buffer:: Now we just need to know how to retrieve information from the ``entry``. +If ``bdecode()`` encounters invalid encoded data in the range given to it +it will throw invalid_encoding_. @@ -375,7 +377,7 @@ or a string. This is its synopsis:: The ``integer()``, ``string()``, ``list()`` and ``dict()`` functions are accessorts that return the respecive type. If the ``entry`` object isn't of the -type you request, the accessor will throw ``type_error`` (which derives from +type you request, the accessor will throw type_error_ (which derives from ``std::runtime_error``). You can ask an ``entry`` for its type through the ``type()`` function. @@ -551,7 +553,7 @@ was started. torrent. If you set this to -1, there will be no limit. ``write_resume_data()`` takes a non-const reference to a char-vector, that vector will be filled -with the fast-resume data. For more information about hpw fast-resume works, see `fast resume`_. +with the fast-resume data. For more information about how fast-resume works, see `fast resume`_. status() ~~~~~~~~ @@ -1211,8 +1213,8 @@ The format of the fast-resume data is as follows, given that all 4 bytes, the number of unfinished pieces for each unfinished piece 4 bytes, index of the unfinished piece - blocks_per_piece / 32 bytes, the bitmask describing which - blocks are finished in this piece. + blocks_per_piece / 8 bytes, the bitmask describing which + blocks are finished in this piece. Feedback ======== diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 630514844..ede34a098 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -319,7 +319,9 @@ int main(int argc, char* argv[]) << static_cast((i->flags & peer_info::interesting)?"I":"_") << static_cast((i->flags & peer_info::choked)?"C":"_") << static_cast((i->flags & peer_info::remote_interested)?"i":"_") - << static_cast((i->flags & peer_info::remote_choked)?"c":"_") << "\n"; + << static_cast((i->flags & peer_info::remote_choked)?"c":"_") + << static_cast((i->flags & peer_info::supports_extensions)?"e":"_") + << "\n"; if (i->downloading_piece_index >= 0) { diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 663d4ad6a..f3664b1e0 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -298,6 +298,9 @@ namespace libtorrent - m_statistics.total_payload_upload(); } + bool support_extensions() const + { return m_supports_extensions; } + #ifndef NDEBUG boost::shared_ptr m_logger; #endif @@ -310,6 +313,7 @@ namespace libtorrent void send_bitfield(); void send_have(int index); void send_handshake(); + void send_extensions(); // is used during handshake enum state @@ -330,6 +334,7 @@ namespace libtorrent enum message_type { + // standard messages msg_choke = 0, msg_unchoke, msg_interested, @@ -338,7 +343,11 @@ namespace libtorrent msg_bitfield, msg_request, msg_piece, - msg_cancel + msg_cancel, + // extension protocol message + msg_extensions = 20, + // extended messages + msg_gzip_piece }; std::size_t m_packet_size; @@ -420,6 +429,11 @@ namespace libtorrent // we have choked the upload to the peer bool m_choked; + // this is set to true if the handshake from + // the peer indicated that it supports the + // extension protocol + bool m_supports_extensions; + // the pieces the other end have std::vector m_have_piece; @@ -477,6 +491,14 @@ namespace libtorrent // this value. If it sinks below a threshold, its // considered a bad peer and will be banned. int m_trust_points; + + enum extension_index + { + gzip_piece, + num_supported_extensions + }; + static const char* extension_names[num_supported_extensions]; + unsigned char m_extension_messages[num_supported_extensions]; }; // this is called each time this peer generates some diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index 3efa71b90..d85b4ad1b 100755 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -46,7 +46,8 @@ namespace libtorrent interesting = 0x1, choked = 0x2, remote_interested = 0x4, - remote_choked = 0x8 + remote_choked = 0x8, + supports_extensions = 0x10 }; unsigned int flags; address ip; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 5d04a0e41..328dd8a80 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_connection.hpp" #include "libtorrent/session.hpp" #include "libtorrent/identify_client.hpp" +#include "libtorrent/entry.hpp" +#include "libtorrent/bencode.hpp" #if defined(_MSC_VER) #define for if (false) {} else for @@ -44,1273 +46,1374 @@ POSSIBILITY OF SUCH DAMAGE. #define VERBOSE -using namespace libtorrent; - -libtorrent::peer_connection::peer_connection( - detail::session_impl& ses - , selector& sel - , torrent* t - , boost::shared_ptr s - , const peer_id& p) - : m_state(read_protocol_length) - , m_timeout(120) - , m_packet_size(1) - , m_recv_pos(0) - , m_last_receive(boost::gregorian::date(std::time(0))) - , m_last_sent(boost::gregorian::date(std::time(0))) - , m_selector(sel) - , m_socket(s) - , m_torrent(t) - , m_attached_to_torrent(true) - , m_ses(ses) - , m_active(true) - , m_added_to_selector(false) - , m_peer_id(p) - , m_peer_interested(false) - , m_peer_choked(true) - , m_interesting(false) - , m_choked(true) - , m_free_upload(0) - , m_send_quota(100) - , m_send_quota_left(100) - , m_send_quota_limit(100) - , m_trust_points(0) +namespace libtorrent { - assert(!m_socket->is_blocking()); - assert(m_torrent != 0); -#ifndef NDEBUG - m_logger = m_ses.create_log(s->sender().as_string().c_str()); -#endif + // the names of the extensions to look for in + // the extensions-message + const char* peer_connection::extension_names[] = + { "gzip" }; - send_handshake(); - - // start in the state where we are trying to read the - // handshake from the other side - m_recv_buffer.resize(1); - - // assume the other end has no pieces - m_have_piece.resize(m_torrent->torrent_file().num_pieces()); - std::fill(m_have_piece.begin(), m_have_piece.end(), false); - - send_bitfield(); -} - -libtorrent::peer_connection::peer_connection( - detail::session_impl& ses - , selector& sel - , boost::shared_ptr s) - : m_state(read_protocol_length) - , m_timeout(120) - , m_packet_size(1) - , m_recv_pos(0) - , m_last_receive(boost::gregorian::date(std::time(0))) - , m_last_sent(boost::gregorian::date(std::time(0))) - , m_selector(sel) - , m_socket(s) - , m_torrent(0) - , m_attached_to_torrent(0) - , m_ses(ses) - , m_active(false) - , m_added_to_selector(false) - , m_peer_id() - , m_peer_interested(false) - , m_peer_choked(true) - , m_interesting(false) - , m_choked(true) - , m_free_upload(0) - , m_send_quota(100) - , m_send_quota_left(100) - , m_send_quota_limit(100) - , m_trust_points(0) -{ - assert(!m_socket->is_blocking()); - -#ifndef NDEBUG - m_logger = m_ses.create_log(s->sender().as_string().c_str()); -#endif - - // we are not attached to any torrent yet. - // we have to wait for the handshake to see - // which torrent the connector want's to connect to - - // start in the state where we are trying to read the - // handshake from the other side - m_recv_buffer.resize(1); -} - -libtorrent::peer_connection::~peer_connection() -{ - m_selector.remove(m_socket); - if (m_attached_to_torrent) + peer_connection::peer_connection( + detail::session_impl& ses + , selector& sel + , torrent* t + , boost::shared_ptr s + , const peer_id& p) + : m_state(read_protocol_length) + , m_timeout(120) + , m_packet_size(1) + , m_recv_pos(0) + , m_last_receive(boost::gregorian::date(std::time(0))) + , m_last_sent(boost::gregorian::date(std::time(0))) + , m_selector(sel) + , m_socket(s) + , m_torrent(t) + , m_attached_to_torrent(true) + , m_ses(ses) + , m_active(true) + , m_added_to_selector(false) + , m_peer_id(p) + , m_peer_interested(false) + , m_peer_choked(true) + , m_interesting(false) + , m_choked(true) + , m_supports_extensions(false) + , m_free_upload(0) + , m_send_quota(100) + , m_send_quota_left(100) + , m_send_quota_limit(100) + , m_trust_points(0) { + assert(!m_socket->is_blocking()); assert(m_torrent != 0); - m_torrent->remove_peer(this); + + #ifndef NDEBUG + m_logger = m_ses.create_log(s->sender().as_string().c_str()); + #endif + + // initialize the extension list to zero, since + // we don't know which extensions the other + // end supports yet + std::fill(m_extension_messages, m_extension_messages + num_supported_extensions, 0); + + send_handshake(); + + // start in the state where we are trying to read the + // handshake from the other side + m_recv_buffer.resize(1); + + // assume the other end has no pieces + m_have_piece.resize(m_torrent->torrent_file().num_pieces()); + std::fill(m_have_piece.begin(), m_have_piece.end(), false); + + send_bitfield(); } -} -void libtorrent::peer_connection::set_send_quota(int num_bytes) -{ - assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1); - if (num_bytes > m_send_quota_limit && m_send_quota_limit!=-1) num_bytes = m_send_quota_limit; - - m_send_quota = num_bytes; - m_send_quota_left = num_bytes; - send_buffer_updated(); -} - -void libtorrent::peer_connection::send_handshake() -{ - assert(m_send_buffer.size() == 0); - - // add handshake to the send buffer - const char version_string[] = "BitTorrent protocol"; - const int string_len = sizeof(version_string)-1; - int pos = 1; - - m_send_buffer.resize(1 + string_len + 8 + 20 + 20); - - // length of version string - m_send_buffer[0] = string_len; - - // version string itself - std::copy( - version_string - , version_string+string_len - , m_send_buffer.begin()+pos); - pos += string_len; - - // 8 zeroes - std::fill( - m_send_buffer.begin() + pos - , m_send_buffer.begin() + pos + 8 - , 0); - pos += 8; - - // info hash - std::copy( - m_torrent->torrent_file().info_hash().begin() - , m_torrent->torrent_file().info_hash().end() - , m_send_buffer.begin() + pos); - pos += 20; - - // peer id - std::copy( - m_ses.get_peer_id().begin() - , m_ses.get_peer_id().end() - , m_send_buffer.begin() + pos); - -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n"; -#endif - - send_buffer_updated(); -} - -boost::optional libtorrent::peer_connection::downloading_piece() const -{ - // are we currently receiving a 'piece' message? - if (m_state != read_packet - || m_recv_pos < 9 - || m_recv_buffer[0] != msg_piece) - return boost::optional(); - - const char* ptr = &m_recv_buffer[1]; - int piece_index = detail::read_int(ptr); - int offset = detail::read_int(ptr); - int len = m_packet_size - 9; - - // is any of the piece message header data invalid? - // TODO: make sure that len is == block_size or less only - // if its's the last block. - if (piece_index < 0 - || piece_index >= m_torrent->torrent_file().num_pieces() - || offset < 0 - || offset + len > m_torrent->torrent_file().piece_size(piece_index) - || offset % m_torrent->block_size() != 0) - return boost::optional(); - - piece_block_progress p; - - p.piece_index = piece_index; - p.block_index = offset / m_torrent->block_size(); - p.bytes_downloaded = m_recv_pos - 9; - p.full_block_bytes = len; - - return boost::optional(p); -} - -bool libtorrent::peer_connection::dispatch_message(int received) -{ - assert(m_recv_pos >= received); - assert(m_recv_pos > 0); - - int packet_type = m_recv_buffer[0]; - if (packet_type > msg_cancel || packet_type < msg_choke) - throw protocol_error("unknown message id"); - - switch (packet_type) + peer_connection::peer_connection( + detail::session_impl& ses + , selector& sel + , boost::shared_ptr s) + : m_state(read_protocol_length) + , m_timeout(120) + , m_packet_size(1) + , m_recv_pos(0) + , m_last_receive(boost::gregorian::date(std::time(0))) + , m_last_sent(boost::gregorian::date(std::time(0))) + , m_selector(sel) + , m_socket(s) + , m_torrent(0) + , m_attached_to_torrent(0) + , m_ses(ses) + , m_active(false) + , m_added_to_selector(false) + , m_peer_id() + , m_peer_interested(false) + , m_peer_choked(true) + , m_interesting(false) + , m_choked(true) + , m_supports_extensions(false) + , m_free_upload(0) + , m_send_quota(100) + , m_send_quota_left(100) + , m_send_quota_limit(100) + , m_trust_points(0) { + assert(!m_socket->is_blocking()); - // *************** CHOKE *************** - case msg_choke: - if (m_packet_size != 1) - throw protocol_error("'choke' message size != 1"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; + #ifndef NDEBUG + m_logger = m_ses.create_log(s->sender().as_string().c_str()); + #endif -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== CHOKE\n"; -#endif - m_peer_choked = true; - m_torrent->get_policy().choked(*this); + // initialize the extension list to zero, since + // we don't know which extensions the other + // end supports yet + std::fill(m_extension_messages, m_extension_messages + num_supported_extensions, 0); - // remove all pieces from this peers download queue and - // remove the 'downloading' flag from piece_picker. - for (std::deque::iterator i = m_download_queue.begin(); - i != m_download_queue.end(); - ++i) + // we are not attached to any torrent yet. + // we have to wait for the handshake to see + // which torrent the connector want's to connect to + + // start in the state where we are trying to read the + // handshake from the other side + m_recv_buffer.resize(1); + } + + peer_connection::~peer_connection() + { + m_selector.remove(m_socket); + if (m_attached_to_torrent) { - m_torrent->picker().abort_download(*i); + assert(m_torrent != 0); + m_torrent->remove_peer(this); } - m_download_queue.clear(); -#ifndef NDEBUG -// m_torrent->picker().integrity_check(m_torrent); -#endif - break; + } + void peer_connection::set_send_quota(int num_bytes) + { + assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1); + if (num_bytes > m_send_quota_limit && m_send_quota_limit!=-1) num_bytes = m_send_quota_limit; + m_send_quota = num_bytes; + m_send_quota_left = num_bytes; + send_buffer_updated(); + } - // *************** UNCHOKE *************** - case msg_unchoke: - if (m_packet_size != 1) - throw protocol_error("'unchoke' message size != 1"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; + void peer_connection::send_handshake() + { + assert(m_send_buffer.size() == 0); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== UNCHOKE\n"; -#endif - m_peer_choked = false; - m_torrent->get_policy().unchoked(*this); - break; + // add handshake to the send buffer + const char version_string[] = "BitTorrent protocol"; + const int string_len = sizeof(version_string)-1; + int pos = 1; + m_send_buffer.resize(1 + string_len + 8 + 20 + 20); - // *************** INTERESTED *************** - case msg_interested: - if (m_packet_size != 1) - throw protocol_error("'interested' message size != 1"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; + // length of version string + m_send_buffer[0] = string_len; -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== INTERESTED\n"; -#endif - m_peer_interested = true; - m_torrent->get_policy().interested(*this); - break; + // version string itself + std::copy( + version_string + , version_string+string_len + , m_send_buffer.begin()+pos); + pos += string_len; + // 8 zeroes + std::fill( + m_send_buffer.begin() + pos + , m_send_buffer.begin() + pos + 8 + , 0); + // indicate that we support the extension protocol + m_send_buffer[pos] = 0x80; + pos += 8; - // *************** NOT INTERESTED *************** - case msg_not_interested: - if (m_packet_size != 1) - throw protocol_error("'not interested' message size != 1"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; + // info hash + std::copy( + m_torrent->torrent_file().info_hash().begin() + , m_torrent->torrent_file().info_hash().end() + , m_send_buffer.begin() + pos); + pos += 20; - // clear the request queue if the client isn't interested - m_requests.clear(); + // peer id + std::copy( + m_ses.get_peer_id().begin() + , m_ses.get_peer_id().end() + , m_send_buffer.begin() + pos); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== NOT_INTERESTED\n"; -#endif - m_peer_interested = false; - m_torrent->get_policy().not_interested(*this); - break; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n"; + #endif + send_buffer_updated(); + } + boost::optional peer_connection::downloading_piece() const + { + // are we currently receiving a 'piece' message? + if (m_state != read_packet + || m_recv_pos < 9 + || m_recv_buffer[0] != msg_piece) + return boost::optional(); - // *************** HAVE *************** - case msg_have: + const char* ptr = &m_recv_buffer[1]; + int piece_index = detail::read_int(ptr); + int offset = detail::read_int(ptr); + int len = m_packet_size - 9; + + // is any of the piece message header data invalid? + // TODO: make sure that len is == block_size or less only + // if its's the last block. + if (piece_index < 0 + || piece_index >= m_torrent->torrent_file().num_pieces() + || offset < 0 + || offset + len > m_torrent->torrent_file().piece_size(piece_index) + || offset % m_torrent->block_size() != 0) + return boost::optional(); + + piece_block_progress p; + + p.piece_index = piece_index; + p.block_index = offset / m_torrent->block_size(); + p.bytes_downloaded = m_recv_pos - 9; + p.full_block_bytes = len; + + return boost::optional(p); + } + + bool peer_connection::dispatch_message(int received) + { + assert(m_recv_pos >= received); + assert(m_recv_pos > 0); + + int packet_type = m_recv_buffer[0]; + if (packet_type > msg_cancel || packet_type < msg_choke) + throw protocol_error("unknown message id"); + + switch (packet_type) { - if (m_packet_size != 5) - throw protocol_error("'have' message size != 5"); + + // *************** CHOKE *************** + case msg_choke: + if (m_packet_size != 1) + throw protocol_error("'choke' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return false; - const char* ptr = &m_recv_buffer[1]; - int index = detail::read_int(ptr); - // if we got an invalid message, abort - if (index >= m_have_piece.size() || index < 0) - throw protocol_error("have message with higher index than the number of pieces"); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== CHOKE\n"; + #endif + m_peer_choked = true; + m_torrent->get_policy().choked(*this); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== HAVE [ piece: " << index << "]\n"; -#endif - - if (m_have_piece[index]) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " oops.. we already knew that: " << index << "\n"; -#endif - } - else - { - m_have_piece[index] = true; - - m_torrent->peer_has(index); - if (!m_torrent->have_piece(index) && !is_interesting()) - m_torrent->get_policy().peer_is_interesting(*this); - } - break; - } - - - - - // *************** BITFIELD *************** - case msg_bitfield: - { - if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8) - throw protocol_error("bitfield with invalid size"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; - -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== BITFIELD\n"; -#endif - // build a vector of all pieces - std::vector piece_list; - for (std::size_t i = 0; i < m_have_piece.size(); ++i) - { - bool have = m_recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7))); - if (have && !m_have_piece[i]) - { - m_have_piece[i] = true; - piece_list.push_back(i); - } - else if (!have && m_have_piece[i]) - { - m_have_piece[i] = false; - m_torrent->peer_lost(i); - } - } - - // shuffle the piece list - std::random_shuffle(piece_list.begin(), piece_list.end()); - - // let the torrent know which pieces the - // peer has, in a shuffled order - bool interesting = false; - for (std::vector::iterator i = piece_list.begin(); - i != piece_list.end(); + // remove all pieces from this peers download queue and + // remove the 'downloading' flag from piece_picker. + for (std::deque::iterator i = m_download_queue.begin(); + i != m_download_queue.end(); ++i) { - int index = *i; - m_torrent->peer_has(index); - if (!m_torrent->have_piece(index)) - interesting = true; + m_torrent->picker().abort_download(*i); } - - if (piece_list.empty()) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " *** THIS IS A SEED ***\n"; -#endif - } - - if (interesting) m_torrent->get_policy().peer_is_interesting(*this); - + m_download_queue.clear(); + #ifndef NDEBUG + // m_torrent->picker().integrity_check(m_torrent); + #endif break; - } - // *************** REQUEST *************** - case msg_request: - { - if (m_packet_size != 13) - throw protocol_error("'request' message size != 13"); + + // *************** UNCHOKE *************** + case msg_unchoke: + if (m_packet_size != 1) + throw protocol_error("'unchoke' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return false; - peer_request r; - const char* ptr = &m_recv_buffer[1]; - r.piece = detail::read_int(ptr); - r.start = detail::read_int(ptr); - r.length = detail::read_int(ptr); - - // make sure this request - // is legal and taht the peer - // is not choked - if (r.piece >= 0 - && r.piece < m_torrent->torrent_file().num_pieces() - && r.start >= 0 - && r.start < m_torrent->torrent_file().piece_size(r.piece) - && r.length > 0 - && r.length + r.start < m_torrent->torrent_file().piece_size(r.piece) - && !m_choked - && m_peer_interested) - { - m_requests.push_back(r); - send_buffer_updated(); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; -#endif - } - else - { - // TODO: log this illegal request - // if the only error is that the - // peer is choked, it may not be a - // mistake - } - + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== UNCHOKE\n"; + #endif + m_peer_choked = false; + m_torrent->get_policy().unchoked(*this); break; - } - - // *************** PIECE *************** - case msg_piece: - { - if (m_recv_pos <= 9) - // only received protocol data - m_statistics.received_bytes(0, received); - else if (m_recv_pos - received >= 9) - // only received payload data - m_statistics.received_bytes(received, 0); - else - { - // received a bit of both - assert(m_recv_pos - received < 9); - assert(m_recv_pos > 9); - assert(9 - (m_recv_pos - received) <= 9); - m_statistics.received_bytes( - m_recv_pos - 9 - , 9 - (m_recv_pos - received)); - } - + // *************** INTERESTED *************** + case msg_interested: + if (m_packet_size != 1) + throw protocol_error("'interested' message size != 1"); + m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return false; - const char* ptr = &m_recv_buffer[1]; - int index = detail::read_int(ptr); - if (index < 0 || index >= m_torrent->torrent_file().num_pieces()) + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== INTERESTED\n"; + #endif + m_peer_interested = true; + m_torrent->get_policy().interested(*this); + break; + + + // *************** NOT INTERESTED *************** + case msg_not_interested: + if (m_packet_size != 1) + throw protocol_error("'not interested' message size != 1"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; + + // clear the request queue if the client isn't interested + m_requests.clear(); + + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== NOT_INTERESTED\n"; + #endif + m_peer_interested = false; + m_torrent->get_policy().not_interested(*this); + break; + + + + // *************** HAVE *************** + case msg_have: { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " piece index invalid\n"; -#endif - throw protocol_error("invalid piece index in piece message"); - } - int offset = detail::read_int(ptr); - int len = m_packet_size - 9; + if (m_packet_size != 5) + throw protocol_error("'have' message size != 5"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; - if (offset < 0) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " offset < 0\n"; -#endif - throw protocol_error("offset < 0 in piece message"); - } + const char* ptr = &m_recv_buffer[1]; + int index = detail::read_int(ptr); + // if we got an invalid message, abort + if (index >= m_have_piece.size() || index < 0) + throw protocol_error("have message with higher index than the number of pieces"); - if (offset + len > m_torrent->torrent_file().piece_size(index)) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " piece packet contains more data than the piece size\n"; -#endif - throw protocol_error("piece message contains more data than the piece size"); - } - // TODO: make sure that len is == block_size or less only - // if its's the last block. + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== HAVE [ piece: " << index << "]\n"; + #endif - if (offset % m_torrent->block_size() != 0) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " piece packet contains unaligned offset\n"; -#endif - throw protocol_error("piece message contains unaligned offset"); - } -/* - piece_block req = m_download_queue.front(); - if (req.piece_index != index) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested index\n"; -#endif - return false; - } - - if (req.block_index != offset / m_torrent->block_size()) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested offset\n"; -#endif - return false; - } -*/ -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n"; -#endif - - piece_picker& picker = m_torrent->picker(); - piece_block block_finished(index, offset / m_torrent->block_size()); - - std::deque::iterator b - = std::find( - m_download_queue.begin() - , m_download_queue.end() - , block_finished); - - if (b != m_download_queue.end()) - { - // pop the request that just finished - // from the download queue - m_download_queue.erase(b); - } - else - { - // TODO: cancel the block from the - // peer that has taken over it. - } - - if (picker.is_finished(block_finished)) break; - - m_torrent->filesystem().write(&m_recv_buffer[9], index, offset, len); - - picker.mark_as_finished(block_finished, m_peer_id); - - m_torrent->get_policy().block_finished(*this, block_finished); - - // did we just finish the piece? - if (picker.is_piece_finished(index)) - { - bool verified = m_torrent->verify_piece(index); - if (verified) + if (m_have_piece[index]) { - m_torrent->announce_piece(index); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " oops.. we already knew that: " << index << "\n"; + #endif } else { - m_torrent->piece_failed(index); + m_have_piece[index] = true; + + m_torrent->peer_has(index); + if (!m_torrent->have_piece(index) && !is_interesting()) + m_torrent->get_policy().peer_is_interesting(*this); } - m_torrent->get_policy().piece_finished(index, verified); + break; } - break; + + + + + // *************** BITFIELD *************** + case msg_bitfield: + { + if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8) + throw protocol_error("bitfield with invalid size"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; + + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== BITFIELD\n"; + #endif + // build a vector of all pieces + std::vector piece_list; + for (std::size_t i = 0; i < m_have_piece.size(); ++i) + { + bool have = m_recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7))); + if (have && !m_have_piece[i]) + { + m_have_piece[i] = true; + piece_list.push_back(i); + } + else if (!have && m_have_piece[i]) + { + m_have_piece[i] = false; + m_torrent->peer_lost(i); + } + } + + // shuffle the piece list + std::random_shuffle(piece_list.begin(), piece_list.end()); + + // let the torrent know which pieces the + // peer has, in a shuffled order + bool interesting = false; + for (std::vector::iterator i = piece_list.begin(); + i != piece_list.end(); + ++i) + { + int index = *i; + m_torrent->peer_has(index); + if (!m_torrent->have_piece(index)) + interesting = true; + } + + if (piece_list.empty()) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " *** THIS IS A SEED ***\n"; + #endif + } + + if (interesting) m_torrent->get_policy().peer_is_interesting(*this); + + break; + } + + + // *************** EXTENSIONS *************** + case msg_extensions: + { + if (m_packet_size > 100 * 1024) + { + // too big extension message, abort + throw protocol_error("'extensions' message size > 100kB"); + } + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; + + try + { + entry e = bdecode(m_recv_buffer.begin()+1, m_recv_buffer.end()); + entry::dictionary_type& extensions = e.dict(); + + for (int i = 0; i < num_supported_extensions; ++i) + { + entry::dictionary_type::iterator f = + extensions.find(extension_names[i]); + if (f != extensions.end()) + { + m_extension_messages[i] = f->second.integer(); + } + } + } + catch(invalid_encoding& e) + { + throw protocol_error("'extensions' packet contains invalid bencoding"); + } + catch(type_error& e) + { + throw protocol_error("'extensions' packet contains incorrect types"); + } + + break; + } + + + // *************** REQUEST *************** + case msg_request: + { + if (m_packet_size != 13) + throw protocol_error("'request' message size != 13"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; + + peer_request r; + const char* ptr = &m_recv_buffer[1]; + r.piece = detail::read_int(ptr); + r.start = detail::read_int(ptr); + r.length = detail::read_int(ptr); + + // make sure this request + // is legal and taht the peer + // is not choked + if (r.piece >= 0 + && r.piece < m_torrent->torrent_file().num_pieces() + && r.start >= 0 + && r.start < m_torrent->torrent_file().piece_size(r.piece) + && r.length > 0 + && r.length + r.start < m_torrent->torrent_file().piece_size(r.piece) + && !m_choked + && m_peer_interested) + { + m_requests.push_back(r); + send_buffer_updated(); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; + #endif + } + else + { + // TODO: log this illegal request + // if the only error is that the + // peer is choked, it may not be a + // mistake + } + + break; + } + + + + // *************** PIECE *************** + case msg_piece: + { + if (m_recv_pos <= 9) + // only received protocol data + m_statistics.received_bytes(0, received); + else if (m_recv_pos - received >= 9) + // only received payload data + m_statistics.received_bytes(received, 0); + else + { + // received a bit of both + assert(m_recv_pos - received < 9); + assert(m_recv_pos > 9); + assert(9 - (m_recv_pos - received) <= 9); + m_statistics.received_bytes( + m_recv_pos - 9 + , 9 - (m_recv_pos - received)); + } + + if (m_recv_pos < m_packet_size) return false; + + const char* ptr = &m_recv_buffer[1]; + int index = detail::read_int(ptr); + if (index < 0 || index >= m_torrent->torrent_file().num_pieces()) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " piece index invalid\n"; + #endif + throw protocol_error("invalid piece index in piece message"); + } + int offset = detail::read_int(ptr); + int len = m_packet_size - 9; + + if (offset < 0) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " offset < 0\n"; + #endif + throw protocol_error("offset < 0 in piece message"); + } + + if (offset + len > m_torrent->torrent_file().piece_size(index)) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " piece packet contains more data than the piece size\n"; + #endif + throw protocol_error("piece message contains more data than the piece size"); + } + // TODO: make sure that len is == block_size or less only + // if its's the last block. + + if (offset % m_torrent->block_size() != 0) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " piece packet contains unaligned offset\n"; + #endif + throw protocol_error("piece message contains unaligned offset"); + } + /* + piece_block req = m_download_queue.front(); + if (req.piece_index != index) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested index\n"; + #endif + return false; + } + + if (req.block_index != offset / m_torrent->block_size()) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested offset\n"; + #endif + return false; + } + */ + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n"; + #endif + + piece_picker& picker = m_torrent->picker(); + piece_block block_finished(index, offset / m_torrent->block_size()); + + std::deque::iterator b + = std::find( + m_download_queue.begin() + , m_download_queue.end() + , block_finished); + + if (b != m_download_queue.end()) + { + // pop the request that just finished + // from the download queue + m_download_queue.erase(b); + } + else + { + // TODO: cancel the block from the + // peer that has taken over it. + } + + if (picker.is_finished(block_finished)) break; + + m_torrent->filesystem().write(&m_recv_buffer[9], index, offset, len); + + picker.mark_as_finished(block_finished, m_peer_id); + + m_torrent->get_policy().block_finished(*this, block_finished); + + // did we just finish the piece? + if (picker.is_piece_finished(index)) + { + bool verified = m_torrent->verify_piece(index); + if (verified) + { + m_torrent->announce_piece(index); + } + else + { + m_torrent->piece_failed(index); + } + m_torrent->get_policy().piece_finished(index, verified); + } + break; + } + + + // *************** CANCEL *************** + case msg_cancel: + { + if (m_packet_size != 13) + throw protocol_error("'cancel' message size != 13"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; + + peer_request r; + const char* ptr = &m_recv_buffer[1]; + r.piece = detail::read_int(ptr); + r.start = detail::read_int(ptr); + r.length = detail::read_int(ptr); + + std::deque::iterator i + = std::find(m_requests.begin(), m_requests.end(), r); + if (i != m_requests.end()) + { + m_requests.erase(i); + } + + if (!has_data() && m_added_to_selector) + { + m_added_to_selector = false; + m_selector.remove_writable(m_socket); + } + + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; + #endif + break; + } + } + assert(m_recv_pos == m_packet_size); + return true; + } + + void peer_connection::cancel_block(piece_block block) + { + assert(block.piece_index >= 0); + assert(block.piece_index < m_torrent->torrent_file().num_pieces()); + assert(m_torrent->picker().is_downloading(block)); + + m_torrent->picker().abort_download(block); + + std::deque::iterator i + = std::find(m_download_queue.begin(), m_download_queue.end(), block); + assert(i != m_download_queue.end()); + + m_download_queue.erase(i); + + + int block_offset = block.block_index * m_torrent->block_size(); + int block_size + = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, + m_torrent->block_size()); + assert(block_size > 0); + assert(block_size <= m_torrent->block_size()); + + char buf[] = {0,0,0,13, msg_cancel}; + + std::size_t start_offset = m_send_buffer.size(); + m_send_buffer.resize(start_offset + 17); + + std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); + start_offset += 5; + + char* ptr = &m_send_buffer[start_offset]; + + // index + detail::write_int(block.piece_index, ptr); + // begin + detail::write_int(block_offset, ptr); + // length + detail::write_int(block_size, ptr); + + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; + #endif + + send_buffer_updated(); + } + + void peer_connection::request_block(piece_block block) + { + assert(block.piece_index >= 0); + assert(block.piece_index < m_torrent->torrent_file().num_pieces()); + assert(!m_torrent->picker().is_downloading(block)); + + m_torrent->picker().mark_as_downloading(block, m_peer_id); + + m_download_queue.push_back(block); + + int block_offset = block.block_index * m_torrent->block_size(); + int block_size + = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, + m_torrent->block_size()); + assert(block_size > 0); + assert(block_size <= m_torrent->block_size()); + + char buf[] = {0,0,0,13, msg_request}; + + std::size_t start_offset = m_send_buffer.size(); + m_send_buffer.resize(start_offset + 17); + + std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); + + char* ptr = &m_send_buffer[start_offset+5]; + // index + detail::write_int(block.piece_index, ptr); + + // begin + detail::write_int(block_offset, ptr); + + // length + detail::write_int(block_size, ptr); + + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> REQUEST [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; + #endif + + send_buffer_updated(); + } + + void peer_connection::send_bitfield() + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> BITFIELD\n"; + #endif + const int packet_size = (m_have_piece.size() + 7) / 8 + 5; + const int old_size = m_send_buffer.size(); + m_send_buffer.resize(old_size + packet_size); + char* ptr = &m_send_buffer[old_size]; + detail::write_int(packet_size - 4, ptr); + m_send_buffer[old_size+4] = msg_bitfield; + std::fill(m_send_buffer.begin()+old_size+5, m_send_buffer.end(), 0); + for (std::size_t i = 0; i < m_have_piece.size(); ++i) + { + if (m_torrent->have_piece(i)) + m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); + } + send_buffer_updated(); + } + + void peer_connection::send_extensions() + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> EXTENSIONS\n"; + #endif + assert(m_supports_extensions); + + entry extension_list(entry::dictionary_t); + + for (int i = 0; i < num_supported_extensions; ++i) + { + entry msg_index(entry::int_t); + msg_index.integer() = msg_extensions + 1 + i; + extension_list.dict()[extension_names[i]] = msg_index; } +#ifndef NDEBUG + extension_list.print(std::cout); +#endif - // *************** CANCEL *************** - case msg_cancel: + m_send_buffer.push_back(msg_extensions); + // make room for message size + const int msg_size_pos = m_send_buffer.size(); + m_send_buffer.resize(msg_size_pos + 4); + + bencode(std::back_inserter(m_send_buffer), extension_list); + + // write the length of the message + char* ptr = &m_send_buffer[msg_size_pos]; + detail::write_int(m_send_buffer.size() - msg_size_pos + 1, ptr); + + send_buffer_updated(); + } + + void peer_connection::choke() + { + if (m_choked) return; + char msg[] = {0,0,0,1,msg_choke}; + m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_choked = true; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; + #endif + m_requests.clear(); + send_buffer_updated(); + } + + void peer_connection::unchoke() + { + if (!m_choked) return; + char msg[] = {0,0,0,1,msg_unchoke}; + m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_choked = false; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n"; + #endif + send_buffer_updated(); + } + + void peer_connection::interested() + { + if (m_interesting) return; + char msg[] = {0,0,0,1,msg_interested}; + m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_interesting = true; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n"; + #endif + send_buffer_updated(); + } + + void peer_connection::not_interested() + { + if (!m_interesting) return; + char msg[] = {0,0,0,1,msg_not_interested}; + m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_interesting = false; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n"; + #endif + send_buffer_updated(); + } + + void peer_connection::send_have(int index) + { + const int packet_size = 9; + char msg[packet_size] = {0,0,0,5,msg_have}; + char* ptr = msg+5; + detail::write_int(index, ptr); + m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n"; + #endif + send_buffer_updated(); + } + + void peer_connection::second_tick() + { + m_statistics.second_tick(); + m_send_quota_left = m_send_quota; + if (m_send_quota > 0) send_buffer_updated(); + + // If the client sends more data + // we send it data faster, otherwise, slower. + // It will also depend on how much data the + // client has sent us. This is the mean to + // maintain a 1:1 share ratio with all peers. + + int diff = share_diff(); + + if (diff > 2*m_torrent->block_size()) { - if (m_packet_size != 13) - throw protocol_error("'cancel' message size != 13"); - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) return false; - - peer_request r; - const char* ptr = &m_recv_buffer[1]; - r.piece = detail::read_int(ptr); - r.start = detail::read_int(ptr); - r.length = detail::read_int(ptr); - - std::deque::iterator i - = std::find(m_requests.begin(), m_requests.end(), r); - if (i != m_requests.end()) - { - m_requests.erase(i); - } - - if (!has_data() && m_added_to_selector) - { - m_added_to_selector = false; - m_selector.remove_writable(m_socket); - } - -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; -#endif - break; - } - } - assert(m_recv_pos == m_packet_size); - return true; -} - -void libtorrent::peer_connection::cancel_block(piece_block block) -{ - assert(block.piece_index >= 0); - assert(block.piece_index < m_torrent->torrent_file().num_pieces()); - assert(m_torrent->picker().is_downloading(block)); - - m_torrent->picker().abort_download(block); - - std::deque::iterator i - = std::find(m_download_queue.begin(), m_download_queue.end(), block); - assert(i != m_download_queue.end()); - - m_download_queue.erase(i); - - - int block_offset = block.block_index * m_torrent->block_size(); - int block_size - = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, - m_torrent->block_size()); - assert(block_size > 0); - assert(block_size <= m_torrent->block_size()); - - char buf[] = {0,0,0,13, msg_cancel}; - - std::size_t start_offset = m_send_buffer.size(); - m_send_buffer.resize(start_offset + 17); - - std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); - start_offset += 5; - - char* ptr = &m_send_buffer[start_offset]; - - // index - detail::write_int(block.piece_index, ptr); - // begin - detail::write_int(block_offset, ptr); - // length - detail::write_int(block_size, ptr); - -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; -#endif - - send_buffer_updated(); -} - -void libtorrent::peer_connection::request_block(piece_block block) -{ - assert(block.piece_index >= 0); - assert(block.piece_index < m_torrent->torrent_file().num_pieces()); - assert(!m_torrent->picker().is_downloading(block)); - - m_torrent->picker().mark_as_downloading(block, m_peer_id); - - m_download_queue.push_back(block); - - int block_offset = block.block_index * m_torrent->block_size(); - int block_size - = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, - m_torrent->block_size()); - assert(block_size > 0); - assert(block_size <= m_torrent->block_size()); - - char buf[] = {0,0,0,13, msg_request}; - - std::size_t start_offset = m_send_buffer.size(); - m_send_buffer.resize(start_offset + 17); - - std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); - - char* ptr = &m_send_buffer[start_offset+5]; - // index - detail::write_int(block.piece_index, ptr); - - // begin - detail::write_int(block_offset, ptr); - - // length - detail::write_int(block_size, ptr); - -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> REQUEST [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; -#endif - - send_buffer_updated(); -} - -void libtorrent::peer_connection::send_bitfield() -{ -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> BITFIELD\n"; -#endif - const int packet_size = (m_have_piece.size() + 7) / 8 + 5; - const int old_size = m_send_buffer.size(); - m_send_buffer.resize(old_size + packet_size); - char* ptr = &m_send_buffer[old_size]; - detail::write_int(packet_size - 4, ptr); - m_send_buffer[old_size+4] = msg_bitfield; - std::fill(m_send_buffer.begin()+old_size+5, m_send_buffer.end(), 0); - for (std::size_t i = 0; i < m_have_piece.size(); ++i) - { - if (m_torrent->have_piece(i)) - m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); - } - send_buffer_updated(); -} - -void libtorrent::peer_connection::choke() -{ - if (m_choked) return; - char msg[] = {0,0,0,1,msg_choke}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); - m_choked = true; -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; -#endif - m_requests.clear(); - send_buffer_updated(); -} - -void libtorrent::peer_connection::unchoke() -{ - if (!m_choked) return; - char msg[] = {0,0,0,1,msg_unchoke}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); - m_choked = false; -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n"; -#endif - send_buffer_updated(); -} - -void libtorrent::peer_connection::interested() -{ - if (m_interesting) return; - char msg[] = {0,0,0,1,msg_interested}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); - m_interesting = true; -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n"; -#endif - send_buffer_updated(); -} - -void libtorrent::peer_connection::not_interested() -{ - if (!m_interesting) return; - char msg[] = {0,0,0,1,msg_not_interested}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); - m_interesting = false; -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n"; -#endif - send_buffer_updated(); -} - -void libtorrent::peer_connection::send_have(int index) -{ - const int packet_size = 9; - char msg[packet_size] = {0,0,0,5,msg_have}; - char* ptr = msg+5; - detail::write_int(index, ptr); - m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n"; -#endif - send_buffer_updated(); -} - -void libtorrent::peer_connection::second_tick() -{ - m_statistics.second_tick(); - m_send_quota_left = m_send_quota; - if (m_send_quota > 0) send_buffer_updated(); - - // If the client sends more data - // we send it data faster, otherwise, slower. - // It will also depend on how much data the - // client has sent us. This is the mean to - // maintain a 1:1 share ratio with all peers. - - int diff = share_diff(); - - if (diff > 2*m_torrent->block_size()) - { - // if we have downloaded more than one piece more - // than we have uploaded, have an unlimited - // upload rate - m_send_quota_limit = -1; - } - else - { - // if we have downloaded too much, response with an - // upload rate of 10 kB/s more than we dowlload - // if we have uploaded too much, send with a rate of - // 10 kB/s less than we receive - int bias = 0; - if (diff > -2*m_torrent->block_size()) - { - bias = m_statistics.download_rate() / 2; - if (bias < 10*1024) bias = 10*1024; + // if we have downloaded more than one piece more + // than we have uploaded, have an unlimited + // upload rate + m_send_quota_limit = -1; } else { - bias = -m_statistics.download_rate() / 2; - } - m_send_quota_limit = m_statistics.download_rate() + bias; - // the maximum send_quota given our download rate from this peer - if (m_send_quota_limit < 256) m_send_quota_limit = 256; - } -} - -// -------------------------- -// RECEIVE DATA -// -------------------------- - -// throws exception when the client should be disconnected -void libtorrent::peer_connection::receive_data() -{ - assert(!m_socket->is_blocking()); - assert(m_packet_size > 0); - for(;;) - { - assert(m_packet_size > 0); - int received = m_socket->receive(&m_recv_buffer[m_recv_pos], m_packet_size - m_recv_pos); - - // connection closed - if (received == 0) - { - throw network_error(0); - } - - // an error - if (received < 0) - { - // would_block means that no data was ready to be received - // returns to exit the loop - if (m_socket->last_error() == socket::would_block) - return; - - // the connection was closed - throw network_error(m_socket->last_error()); - } - - if (received > 0) - { - m_last_receive = boost::posix_time::second_clock::local_time(); - - m_recv_pos += received; - - switch(m_state) + // if we have downloaded too much, response with an + // upload rate of 10 kB/s more than we dowlload + // if we have uploaded too much, send with a rate of + // 10 kB/s less than we receive + int bias = 0; + if (diff > -2*m_torrent->block_size()) { - case read_protocol_length: + bias = m_statistics.download_rate() / 2; + if (bias < 10*1024) bias = 10*1024; + } + else + { + bias = -m_statistics.download_rate() / 2; + } + m_send_quota_limit = m_statistics.download_rate() + bias; + // the maximum send_quota given our download rate from this peer + if (m_send_quota_limit < 256) m_send_quota_limit = 256; + } + } - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) break; - assert(m_recv_pos == m_packet_size); + // -------------------------- + // RECEIVE DATA + // -------------------------- - m_packet_size = reinterpret_cast(m_recv_buffer[0]); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n"; -#endif - m_state = read_protocol_string; - m_recv_buffer.resize(m_packet_size); - m_recv_pos = 0; + // throws exception when the client should be disconnected + void peer_connection::receive_data() + { + assert(!m_socket->is_blocking()); + assert(m_packet_size > 0); + for(;;) + { + assert(m_packet_size > 0); + int received = m_socket->receive(&m_recv_buffer[m_recv_pos], m_packet_size - m_recv_pos); - if (m_packet_size == 0) + // connection closed + if (received == 0) + { + throw network_error(0); + } + + // an error + if (received < 0) + { + // would_block means that no data was ready to be received + // returns to exit the loop + if (m_socket->last_error() == socket::would_block) + return; + + // the connection was closed + throw network_error(m_socket->last_error()); + } + + if (received > 0) + { + m_last_receive = boost::posix_time::second_clock::local_time(); + + m_recv_pos += received; + + switch(m_state) { -#ifndef NDEBUG - (*m_logger) << "incorrect protocol length\n"; -#endif - throw network_error(0); - } - break; + case read_protocol_length: + + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + + m_packet_size = reinterpret_cast(m_recv_buffer[0]); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n"; + #endif + m_state = read_protocol_string; + m_recv_buffer.resize(m_packet_size); + m_recv_pos = 0; + + if (m_packet_size == 0) + { + #ifndef NDEBUG + (*m_logger) << "incorrect protocol length\n"; + #endif + throw network_error(0); + } + break; - case read_protocol_string: + case read_protocol_string: + { + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n"; + #endif + const char protocol_string[] = "BitTorrent protocol"; + const int protocol_len = sizeof(protocol_string) - 1; + if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string)) + { + #ifndef NDEBUG + (*m_logger) << "incorrect protocol name\n"; + #endif + throw network_error(0); + } + + m_state = read_info_hash; + m_packet_size = 28; + m_recv_pos = 0; + m_recv_buffer.resize(28); + } + break; + + + case read_info_hash: { m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n"; -#endif - const char protocol_string[] = "BitTorrent protocol"; - const int protocol_len = sizeof(protocol_string) - 1; - if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string)) - { -#ifndef NDEBUG - (*m_logger) << "incorrect protocol name\n"; -#endif - throw network_error(0); - } + // ok, now we have got enough of the handshake. Is this connection + // attached to a torrent? - m_state = read_info_hash; - m_packet_size = 28; - m_recv_pos = 0; - m_recv_buffer.resize(28); - } - break; - - - case read_info_hash: - { - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) break; - assert(m_recv_pos == m_packet_size); - // ok, now we have got enough of the handshake. Is this connection - // attached to a torrent? - - if (m_torrent == 0) - { // TODO: if the protocol is to be extended // these 8 bytes would be used to describe the // extensions available on the other side + if (m_recv_buffer[0] & 0x80) + { + m_supports_extensions = true; + } - // now, we have to see if there's a torrent with the - // info_hash we got from the peer - sha1_hash info_hash; - std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin()); - - m_torrent = m_ses.find_torrent(info_hash); if (m_torrent == 0) { - // we couldn't find the torrent! -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n"; -#endif - throw network_error(0); + + // now, we have to see if there's a torrent with the + // info_hash we got from the peer + sha1_hash info_hash; + std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin()); + + m_torrent = m_ses.find_torrent(info_hash); + if (m_torrent == 0) + { + // we couldn't find the torrent! + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n"; + #endif + throw network_error(0); + } + + // assume the other end has no pieces + m_have_piece.resize(m_torrent->torrent_file().num_pieces()); + std::fill(m_have_piece.begin(), m_have_piece.end(), false); + + // yes, we found the torrent + // reply with our handshake + std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin()); + send_handshake(); + send_bitfield(); } - - // assume the other end has no pieces - m_have_piece.resize(m_torrent->torrent_file().num_pieces()); - std::fill(m_have_piece.begin(), m_have_piece.end(), false); - - // yes, we found the torrent - // reply with our handshake - std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin()); - send_handshake(); - send_bitfield(); - } - else - { - // verify info hash - if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin())) + else { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n"; -#endif - throw network_error(0); - } - } - - m_state = read_peer_id; - m_packet_size = 20; - m_recv_pos = 0; - m_recv_buffer.resize(20); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " info_hash received\n"; -#endif - break; - } - - - case read_peer_id: - { - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) break; - assert(m_recv_pos == m_packet_size); - -#ifndef NDEBUG - { - peer_id tmp; - std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)tmp.begin()); - std::stringstream s; - s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n"; - (*m_logger) << s.str(); - } -#endif - - if (m_active) - { - // verify peer_id - // TODO: It seems like the original client ignores to check the peer id - // can that be correct? - if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin())) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n"; -#endif - throw network_error(0); - } - } - else - { - // check to make sure we don't have another connection with the same - // info_hash and peer_id. If we do. close this connection. - std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin()); - - if (m_torrent->has_peer(m_peer_id)) - { -#ifndef NDEBUG - (*m_logger) << " duplicate connection, closing\n"; -#endif - throw network_error(0); + // verify info hash + if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin())) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n"; + #endif + throw network_error(0); + } } - m_attached_to_torrent = true; - m_torrent->attach_peer(this); - assert(m_torrent->get_policy().has_connection(this)); - } + if (m_supports_extensions) send_extensions(); - m_state = read_packet_size; - m_packet_size = 4; - m_recv_pos = 0; - m_recv_buffer.resize(4); - - break; - } - - - case read_packet_size: - { - m_statistics.received_bytes(0, received); - if (m_recv_pos < m_packet_size) break; - assert(m_recv_pos == m_packet_size); - - // convert from big endian to native byte order - const char* ptr = &m_recv_buffer[0]; - m_packet_size = detail::read_int(ptr); - // don't accept packets larger than 1 MB - if (m_packet_size > 1024*1024 || m_packet_size < 0) - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n"; -#endif - // packet too large - throw network_error(0); - } - - if (m_packet_size == 0) - { - // keepalive message - m_state = read_packet_size; - m_packet_size = 4; - } - else - { - m_state = read_packet; - m_recv_buffer.resize(m_packet_size); - } - m_recv_pos = 0; - assert(m_packet_size > 0); - break; - } - case read_packet: - - if (dispatch_message(received)) - { - m_state = read_packet_size; - m_packet_size = 4; - m_recv_buffer.resize(4); + m_state = read_peer_id; + m_packet_size = 20; m_recv_pos = 0; - assert(m_packet_size > 0); + m_recv_buffer.resize(20); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " info_hash received\n"; + #endif + break; } - break; - } - } - } - assert(m_packet_size > 0); -} -bool libtorrent::peer_connection::has_data() const throw() -{ - // if we have requests or pending data to be sent or announcements to be made - // we want to send data - return ((!m_requests.empty() && !m_choked) - || !m_send_buffer.empty() - || !m_announce_queue.empty()) - && m_send_quota_left != 0; -} - -// -------------------------- -// SEND DATA -// -------------------------- - -// throws exception when the client should be disconnected -void libtorrent::peer_connection::send_data() -{ - assert(m_socket->is_writable()); - assert(has_data()); - - // only add new piece-chunks if the send buffer is small enough - // otherwise there will be no end to how large it will be! - // TODO: make this a bit better. Don't always read the entire - // requested block. Have a limit of how much of the requested - // block is actually read at a time. - while (!m_requests.empty() - && (m_send_buffer.size() < m_torrent->block_size()) - && !m_choked) - { - peer_request& r = m_requests.front(); - - if (r.piece >= 0 && r.piece < m_have_piece.size() && m_torrent && m_torrent->have_piece(r.piece)) - { - // make sure the request is ok - if (r.start + r.length > m_torrent->torrent_file().piece_size(r.piece)) - { - // NOT OK! disconnect - throw network_error(0); - } - - if (r.length <= 0 || r.start < 0) - { - // NOT OK! disconnect - throw network_error(0); - } - -#ifndef NDEBUG - assert(m_torrent->verify_piece(r.piece) && "internal error"); -#endif - const int send_buffer_offset = m_send_buffer.size(); - const int packet_size = 4 + 5 + 4 + r.length; - m_send_buffer.resize(send_buffer_offset + packet_size); - char* ptr = &m_send_buffer[send_buffer_offset]; - detail::write_int(packet_size-4, ptr); - *ptr = msg_piece; ++ptr; - detail::write_int(r.piece, ptr); - detail::write_int(r.start, ptr); - - m_torrent->filesystem().read( - &m_send_buffer[send_buffer_offset+13] - , r.piece - , r.start - , r.length); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; -#endif - m_payloads.push_back(range(send_buffer_offset+13, r.length)); - } - else - { -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() - << " *** WARNING [ illegal piece request idx: " << r.piece - << " | s: " << r.start - << " | l: " << r.length - << " | max_piece: " << m_have_piece.size() - << " | torrent: " << (m_torrent != 0) - << " | have: " << m_torrent->have_piece(r.piece) - << " ]\n"; -#endif - } - m_requests.erase(m_requests.begin()); - } - - if (!m_announce_queue.empty()) - { - for (std::vector::iterator i = m_announce_queue.begin(); - i != m_announce_queue.end(); - ++i) - { -// (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n"; - send_have(*i); - } - m_announce_queue.clear(); - } - - assert(m_send_quota_left != 0); - - // send the actual buffer - if (!m_send_buffer.empty()) - { - - int amount_to_send = m_send_buffer.size(); - assert(m_send_quota_left != 0); - if (m_send_quota_left > 0) - amount_to_send = std::min(m_send_quota_left, amount_to_send); - // we have data that's scheduled for sending - int sent = m_socket->send( - &m_send_buffer[0] - , amount_to_send); + case read_peer_id: + { + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); #ifndef NDEBUG -// (*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n"; + { + peer_id tmp; + std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)tmp.begin()); + std::stringstream s; + s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n"; + (*m_logger) << s.str(); + } #endif - if (sent > 0) - { - if (m_send_quota_left != -1) - { - assert(m_send_quota_left >= sent); - m_send_quota_left -= sent; - } - - int amount_payload = 0; - if (!m_payloads.empty()) - { - for (std::deque::iterator i = m_payloads.begin(); - i != m_payloads.end(); - ++i) - { - i->start -= sent; - if (i->start < 0) + if (m_active) { - if (i->start + i->length <= 0) + // verify peer_id + // TODO: It seems like the original client ignores to check the peer id + // can that be correct? + if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin())) { - amount_payload += i->length; - } - else - { - amount_payload += -i->start; - i->length -= -i->start; - i->start = 0; + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n"; + #endif + throw network_error(0); } } + else + { + // check to make sure we don't have another connection with the same + // info_hash and peer_id. If we do. close this connection. + std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin()); + + if (m_torrent->has_peer(m_peer_id)) + { + #ifndef NDEBUG + (*m_logger) << " duplicate connection, closing\n"; + #endif + throw network_error(0); + } + + m_attached_to_torrent = true; + m_torrent->attach_peer(this); + assert(m_torrent->get_policy().has_connection(this)); + } + + m_state = read_packet_size; + m_packet_size = 4; + m_recv_pos = 0; + m_recv_buffer.resize(4); + + break; + } + + + case read_packet_size: + { + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + + // convert from big endian to native byte order + const char* ptr = &m_recv_buffer[0]; + m_packet_size = detail::read_int(ptr); + // don't accept packets larger than 1 MB + if (m_packet_size > 1024*1024 || m_packet_size < 0) + { + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n"; + #endif + // packet too large + throw network_error(0); + } + + if (m_packet_size == 0) + { + // keepalive message + m_state = read_packet_size; + m_packet_size = 4; + } + else + { + m_state = read_packet; + m_recv_buffer.resize(m_packet_size); + } + m_recv_pos = 0; + assert(m_packet_size > 0); + break; + } + case read_packet: + + if (dispatch_message(received)) + { + m_state = read_packet_size; + m_packet_size = 4; + m_recv_buffer.resize(4); + m_recv_pos = 0; + assert(m_packet_size > 0); + } + break; } } - // remove all payload ranges that has been sent - m_payloads.erase( - std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) - , m_payloads.end()); + } + assert(m_packet_size > 0); + } - assert(amount_payload <= sent); - m_statistics.sent_bytes(amount_payload, sent - amount_payload); - // empty the entire buffer at once or if - // only a part of the buffer could be sent - // remove the part that was sent from the buffer - if (sent == m_send_buffer.size()) + bool peer_connection::has_data() const throw() + { + // if we have requests or pending data to be sent or announcements to be made + // we want to send data + return ((!m_requests.empty() && !m_choked) + || !m_send_buffer.empty() + || !m_announce_queue.empty()) + && m_send_quota_left != 0; + } + + // -------------------------- + // SEND DATA + // -------------------------- + + // throws exception when the client should be disconnected + void peer_connection::send_data() + { + assert(m_socket->is_writable()); + assert(has_data()); + + // only add new piece-chunks if the send buffer is small enough + // otherwise there will be no end to how large it will be! + // TODO: make this a bit better. Don't always read the entire + // requested block. Have a limit of how much of the requested + // block is actually read at a time. + while (!m_requests.empty() + && (m_send_buffer.size() < m_torrent->block_size()) + && !m_choked) + { + peer_request& r = m_requests.front(); + + if (r.piece >= 0 && r.piece < m_have_piece.size() && m_torrent && m_torrent->have_piece(r.piece)) { - m_send_buffer.clear(); + // make sure the request is ok + if (r.start + r.length > m_torrent->torrent_file().piece_size(r.piece)) + { + // NOT OK! disconnect + throw network_error(0); + } + + if (r.length <= 0 || r.start < 0) + { + // NOT OK! disconnect + throw network_error(0); + } + + #ifndef NDEBUG + assert(m_torrent->verify_piece(r.piece) && "internal error"); + #endif + const int send_buffer_offset = m_send_buffer.size(); + const int packet_size = 4 + 5 + 4 + r.length; + m_send_buffer.resize(send_buffer_offset + packet_size); + char* ptr = &m_send_buffer[send_buffer_offset]; + detail::write_int(packet_size-4, ptr); + *ptr = msg_piece; ++ptr; + detail::write_int(r.piece, ptr); + detail::write_int(r.start, ptr); + + m_torrent->filesystem().read( + &m_send_buffer[send_buffer_offset+13] + , r.piece + , r.start + , r.length); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; + #endif + m_payloads.push_back(range(send_buffer_offset+13, r.length)); } else { - m_send_buffer.erase( - m_send_buffer.begin() - , m_send_buffer.begin() + sent); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() + << " *** WARNING [ illegal piece request idx: " << r.piece + << " | s: " << r.start + << " | l: " << r.length + << " | max_piece: " << m_have_piece.size() + << " | torrent: " << (m_torrent != 0) + << " | have: " << m_torrent->have_piece(r.piece) + << " ]\n"; + #endif + } + m_requests.erase(m_requests.begin()); + } + + if (!m_announce_queue.empty()) + { + for (std::vector::iterator i = m_announce_queue.begin(); + i != m_announce_queue.end(); + ++i) + { + // (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n"; + send_have(*i); + } + m_announce_queue.clear(); + } + + assert(m_send_quota_left != 0); + + // send the actual buffer + if (!m_send_buffer.empty()) + { + + int amount_to_send = m_send_buffer.size(); + assert(m_send_quota_left != 0); + if (m_send_quota_left > 0) + amount_to_send = std::min(m_send_quota_left, amount_to_send); + // we have data that's scheduled for sending + int sent = m_socket->send( + &m_send_buffer[0] + , amount_to_send); + + #ifndef NDEBUG + // (*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n"; + #endif + + if (sent > 0) + { + if (m_send_quota_left != -1) + { + assert(m_send_quota_left >= sent); + m_send_quota_left -= sent; + } + + int amount_payload = 0; + if (!m_payloads.empty()) + { + for (std::deque::iterator i = m_payloads.begin(); + i != m_payloads.end(); + ++i) + { + i->start -= sent; + if (i->start < 0) + { + if (i->start + i->length <= 0) + { + amount_payload += i->length; + } + else + { + amount_payload += -i->start; + i->length -= -i->start; + i->start = 0; + } + } + } + } + // remove all payload ranges that has been sent + m_payloads.erase( + std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) + , m_payloads.end()); + + assert(amount_payload <= sent); + m_statistics.sent_bytes(amount_payload, sent - amount_payload); + + // empty the entire buffer at once or if + // only a part of the buffer could be sent + // remove the part that was sent from the buffer + if (sent == m_send_buffer.size()) + { + m_send_buffer.clear(); + } + else + { + m_send_buffer.erase( + m_send_buffer.begin() + , m_send_buffer.begin() + sent); + } + } + else + { + assert(sent == -1); + throw network_error(m_socket->last_error()); + } + + m_last_sent = boost::posix_time::second_clock::local_time(); + } + + assert(m_added_to_selector); + send_buffer_updated(); + #ifndef NDEBUG + if (has_data()) + { + if (m_socket->is_writable()) + { + std::cout << "ERROR\n"; } } - else - { - assert(sent == -1); - throw network_error(m_socket->last_error()); - } - - m_last_sent = boost::posix_time::second_clock::local_time(); + #endif } - assert(m_added_to_selector); - send_buffer_updated(); -#ifndef NDEBUG - if (has_data()) + + void peer_connection::keep_alive() { - if (m_socket->is_writable()) + boost::posix_time::time_duration d; + d = boost::posix_time::second_clock::local_time() - m_last_sent; + if (d.seconds() > m_timeout / 2) { - std::cout << "ERROR\n"; + char noop[] = {0,0,0,0}; + m_send_buffer.insert(m_send_buffer.end(), noop, noop+4); + m_last_sent = boost::posix_time::second_clock::local_time(); + #ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " ==> NOP\n"; + #endif + send_buffer_updated(); } } -#endif -} - - -void libtorrent::peer_connection::keep_alive() -{ - boost::posix_time::time_duration d; - d = boost::posix_time::second_clock::local_time() - m_last_sent; - if (d.seconds() > m_timeout / 2) - { - char noop[] = {0,0,0,0}; - m_send_buffer.insert(m_send_buffer.end(), noop, noop+4); - m_last_sent = boost::posix_time::second_clock::local_time(); -#ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " ==> NOP\n"; -#endif - send_buffer_updated(); - } -} +} \ No newline at end of file diff --git a/src/session.cpp b/src/session.cpp index 5836c6d17..1faf98926 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -880,6 +880,7 @@ namespace libtorrent m_impl.m_alerts.set_severity(s); } + // TODO: store resume data as an entry instead void detail::piece_checker_data::parse_resume_data( const std::vector* rd , const torrent_info& info) @@ -920,7 +921,7 @@ namespace libtorrent int num_unfinished = read_int(ptr); if (num_unfinished < 0) return; - if (data.size() != 20 + (1 + num_slots + 2 + num_unfinished * (num_blocks_per_piece / 32 + 1)) * 4) + if (data.size() != 20 + (1 + num_slots + 2 + num_unfinished) * 4 + num_unfinished * (num_blocks_per_piece / 8)) return; tmp_unfinished.reserve(num_unfinished); @@ -935,12 +936,13 @@ namespace libtorrent || p.index >= info.num_pieces()) return; - for (int j = 0; j < num_blocks_per_piece / 32; ++j) + const int num_bitmask_bytes = std::max(num_blocks_per_piece / 8, 1); + for (int j = 0; j < num_bitmask_bytes; ++j) { - unsigned int bits = read_int(ptr); - for (int k = 0; k < 32; ++k) + unsigned char bits = read_uchar(ptr); + for (int k = 0; k < 8; ++k) { - const int bit = j * 32 + k; + const int bit = j * 8 + k; if (bits & (1 << bit)) p.finished_blocks[bit] = true; } diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index d77dad606..5c28bc060 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -231,12 +231,15 @@ namespace libtorrent // the unsinished piece's index detail::write_int(i->index, out); - // write - for (int j = 0; j < num_blocks_per_piece / 32; ++j) + // TODO: write the bitmask in correct byteorder + // TODO: make sure to read it in the correct order too + const int num_bitmask_bytes = std::max(num_blocks_per_piece / 8, 1); + for (int j = 0; j < num_bitmask_bytes; ++j) { - unsigned int v = 0; - for (int k = 0; k < 32; ++k) v |= i->finished_blocks[j*32+k]?(1 << k):0; - detail::write_int(v, out); + unsigned char v = 0; + for (int k = 0; k < 8; ++k) + v |= i->finished_blocks[j*8+k]?(1 << k):0; + detail::write_uchar(v, out); } } } @@ -337,6 +340,7 @@ namespace libtorrent if (peer->is_choked()) p.flags |= peer_info::choked; if (peer->is_peer_interested()) p.flags |= peer_info::remote_interested; if (peer->has_peer_choked()) p.flags |= peer_info::remote_choked; + if (peer->support_extensions()) p.flags |= peer_info::supports_extensions; p.pieces = peer->get_bitfield(); }