diff --git a/examples/client_test.cpp b/examples/client_test.cpp index f39a1abc1..f23d33fe5 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -54,7 +54,7 @@ POSSIBILITY OF SUCH DAMAGE. bool sleep_and_input(char* c) { - Sleep(500); + Sleep(1000); if (kbhit()) { *c = getch(); @@ -216,8 +216,7 @@ int main(int argc, char* argv[]) if (c == 'q') break; } - clear(); - set_cursor(0, 0); + std::stringstream out; for (std::vector::iterator i = handles.begin(); i != handles.end(); ++i) @@ -227,16 +226,16 @@ int main(int argc, char* argv[]) switch(s.state) { case torrent_status::queued_for_checking: - std::cout << "queued "; + out << "queued "; break; case torrent_status::checking_files: - std::cout << "checking "; + out << "checking "; break; case torrent_status::downloading: - std::cout << "dloading "; + out << "dloading "; break; case torrent_status::seeding: - std::cout << "seeding "; + out << "seeding "; break; }; @@ -256,33 +255,35 @@ int main(int argc, char* argv[]) % add_suffix(total_up) % add_suffix(up); */ - std::cout.precision(4); - std::cout.width(5); - std::cout.fill(' '); - std::cout << (s.progress*100) << "% "; + out.precision(4); + out.width(5); + out.fill(' '); + out << (s.progress*100) << "% "; for (int i = 0; i < 50; ++i) { if (i / 50.f > s.progress) - std::cout << "-"; + out << "-"; else - std::cout << "#"; + out << "#"; } - std::cout << "\n"; + out << "\n"; - std::cout << "peers:" << num_peers << " d:" - << add_suffix(down) << "/s (" << add_suffix(total_down) << ") u:" - << add_suffix(up) << "/s (" << add_suffix(total_up) << ") diff: " - << add_suffix(total_down - total_up) << "\n"; + out << "peers: " << num_peers << " " + << "d:" << add_suffix(down) << "/s " + << "(" << add_suffix(total_down) << ") " + << "u:" << add_suffix(up) << "/s " + << "(" << add_suffix(total_up) << ") " + << "diff: " << add_suffix(total_down - total_up) << "\n"; boost::posix_time::time_duration t = s.next_announce; // std::cout << "next announce: " << boost::posix_time::to_simple_string(t) << "\n"; - std::cout << "next announce: " << t.hours() << ":" << t.minutes() << ":" << t.seconds() << "\n"; + out << "next announce: " << t.hours() << ":" << t.minutes() << ":" << t.seconds() << "\n"; for (std::vector::iterator i = peers.begin(); i != peers.end(); ++i) { - std::cout << "d: " << add_suffix(i->down_speed) << "/s " + out << "d: " << add_suffix(i->down_speed) << "/s " << "(" << add_suffix(i->total_download) << ") " << "u: " << add_suffix(i->up_speed) << "/s " << "(" << add_suffix(i->total_upload) << ") " @@ -293,31 +294,33 @@ int main(int argc, char* argv[]) << static_cast((i->flags & peer_info::choked)?"C":"_") << static_cast((i->flags & peer_info::remote_interested)?"i":"_") << static_cast((i->flags & peer_info::remote_choked)?"c":"_") << "\n"; - } - std::cout << "___________________________________\n"; + out << "___________________________________\n"; i->get_download_queue(queue); for (std::vector::iterator i = queue.begin(); i != queue.end(); ++i) { - std::cout.width(4); - std::cout.fill(' '); - std::cout << i->piece_index << ": |"; + out.width(4); + out.fill(' '); + out << i->piece_index << ": |"; for (int j = 0; j < i->blocks_in_piece; ++j) { - if (i->finished_blocks[j]) std::cout << "#"; - else if (i->requested_blocks[j]) std::cout << "="; - else std::cout << "-"; + if (i->finished_blocks[j]) out << "#"; + else if (i->requested_blocks[j]) out << "="; + else out << "-"; } - std::cout << "|\n"; + out << "|\n"; } - std::cout << "___________________________________\n"; - + out << "___________________________________\n"; } + + clear(); + set_cursor(0, 0); + std::cout << out.str(); } } catch (std::exception& e) diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 89bfa9973..902f6422b 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -48,14 +49,10 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/stat.hpp" #include "libtorrent/debug.hpp" -/* - * This file declares the following functions: - * - *---------------------------------- - * - * - * - */ +// TODO: each time a block is 'taken over' +// from another peer. That peer must be given +// a chance to request another block instead. +// Where it also could become not-interested. namespace libtorrent { @@ -131,7 +128,7 @@ namespace libtorrent const peer_id& id() const throw() { return m_peer_id; } bool has_piece(int i) const throw() { return m_have_piece[i]; } - const std::vector& download_queue() const throw() + const std::deque& download_queue() const throw() { return m_download_queue; } void choke(); @@ -205,7 +202,7 @@ namespace libtorrent private: - void dispatch_message(); + bool dispatch_message(int received); void send_buffer_updated(); void send_bitfield(); @@ -252,6 +249,21 @@ namespace libtorrent // consumed by send() std::vector m_send_buffer; + // this is a queue of ranges that describes + // where in the send buffer actual payload + // data is located. This is currently + // only used to be able to gather statistics + // seperately on payload and protocol data. + struct range + { + range(int s, int l): start(s), length(l) {} + int start; + int length; + }; + static bool range_below_zero(const range& r) + { return r.start < 0; } + std::deque m_payloads; + // timeouts boost::posix_time::ptime m_last_receive; boost::posix_time::ptime m_last_sent; @@ -314,13 +326,18 @@ namespace libtorrent // the pieces the other end have std::vector m_have_piece; - std::vector m_requests; + // the queue of requests we have got + // from this peer + std::deque m_requests; // a list of pieces that have become available // and should be announced as available to // the peer std::vector m_announce_queue; - std::vector m_download_queue; + + // the queue of blocks we have requested + // from this peer + std::deque m_download_queue; stat m_statistics; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index a0ead07fa..75c064199 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -95,8 +95,8 @@ libtorrent::peer_connection::peer_connection( , m_peer_choked(true) , m_interesting(false) , m_choked(true) - , m_send_quota(-1) - , m_send_quota_left(-1) + , m_send_quota(100) + , m_send_quota_left(100) , m_send_quota_limit(100) , m_trust_points(0) { @@ -142,8 +142,8 @@ libtorrent::peer_connection::peer_connection( , m_peer_choked(true) , m_interesting(false) , m_choked(true) - , m_send_quota(-1) - , m_send_quota_left(-1) + , m_send_quota(100) + , m_send_quota_left(100) , m_send_quota_limit(100) , m_trust_points(0) { @@ -227,15 +227,17 @@ void libtorrent::peer_connection::send_handshake() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n"; #endif - m_statistics.sent_bytes(0, m_send_buffer.size()); send_buffer_updated(); } -void libtorrent::peer_connection::dispatch_message() +bool libtorrent::peer_connection::dispatch_message(int received) { + assert(m_recv_pos >= received); + assert(m_recv_pos > 0); + int packet_type = m_recv_buffer[0]; - if (packet_type > 8 || packet_type < 0) + if (packet_type > msg_cancel || packet_type < msg_choke) throw protocol_error("unknown message id"); switch (packet_type) @@ -243,11 +245,10 @@ void libtorrent::peer_connection::dispatch_message() // *************** CHOKE *************** case msg_choke: - - if (m_packet_size != 5) - throw protocol_error("'choke' message size != 5"); - - m_statistics.received_bytes(0, m_packet_size); + if (m_packet_size != 1) + throw protocol_error("'choke' message size != 1"); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== CHOKE\n"; @@ -257,7 +258,7 @@ void libtorrent::peer_connection::dispatch_message() // remove all pieces from this peers download queue and // remove the 'downloading' flag from piece_picker. - for (std::vector::iterator i = m_download_queue.begin(); + for (std::deque::iterator i = m_download_queue.begin(); i != m_download_queue.end(); ++i) { @@ -275,8 +276,8 @@ void libtorrent::peer_connection::dispatch_message() case msg_unchoke: if (m_packet_size != 1) throw protocol_error("'unchoke' message size != 1"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== UNCHOKE\n"; @@ -290,8 +291,8 @@ void libtorrent::peer_connection::dispatch_message() case msg_interested: if (m_packet_size != 1) throw protocol_error("'interested' message size != 1"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== INTERESTED\n"; @@ -305,8 +306,8 @@ void libtorrent::peer_connection::dispatch_message() case msg_not_interested: if (m_packet_size != 1) throw protocol_error("'not interested' message size != 1"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== NOT_INTERESTED\n"; @@ -322,8 +323,8 @@ void libtorrent::peer_connection::dispatch_message() { if (m_packet_size != 5) throw protocol_error("'have' message size != 5"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; std::size_t index = read_int(&m_recv_buffer[1]); // if we got an invalid message, abort @@ -357,8 +358,8 @@ void libtorrent::peer_connection::dispatch_message() { if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8) throw protocol_error("bitfield with invalid size"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== BITFIELD\n"; @@ -398,8 +399,8 @@ void libtorrent::peer_connection::dispatch_message() { if (m_packet_size != 13) throw protocol_error("'request' message size != 13"); - - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; peer_request r; r.piece = read_int(&m_recv_buffer[1]); @@ -424,6 +425,25 @@ void libtorrent::peer_connection::dispatch_message() // *************** PIECE *************** case msg_piece: { + if (m_recv_pos <= 9) + // only received protocol data + m_statistics.received_bytes(0, received); + else if (m_recv_pos - received >= 9) + // only received payload data + m_statistics.received_bytes(received, 0); + else + { + // received a bit of both + assert(m_recv_pos - received < 9); + assert(m_recv_pos > 9); + assert(9 - (m_recv_pos - received) <= 9); + m_statistics.received_bytes( + m_recv_pos - 9 + , 9 - (m_recv_pos - received)); + } + + if (m_recv_pos < m_packet_size) return false; + std::size_t index = read_int(&m_recv_buffer[1]); if (index < 0 || index >= m_torrent->torrent_file().num_pieces()) { @@ -480,13 +500,10 @@ void libtorrent::peer_connection::dispatch_message() (*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n"; #endif - assert(m_packet_size > len); - m_statistics.received_bytes(len, m_packet_size - len); - piece_picker& picker = m_torrent->picker(); piece_block block_finished(index, offset / m_torrent->block_size()); - std::vector::iterator b + std::deque::iterator b = std::find( m_download_queue.begin() , m_download_queue.end() @@ -535,14 +552,15 @@ void libtorrent::peer_connection::dispatch_message() { if (m_packet_size != 13) throw protocol_error("'cancel' message size != 13"); - m_statistics.received_bytes(0, m_packet_size); + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) return false; peer_request r; r.piece = read_int(&m_recv_buffer[1]); r.start = read_int(&m_recv_buffer[5]); r.length = read_int(&m_recv_buffer[9]); - std::vector::iterator i + std::deque::iterator i = std::find(m_requests.begin(), m_requests.end(), r); if (i != m_requests.end()) { @@ -561,6 +579,8 @@ void libtorrent::peer_connection::dispatch_message() break; } } + assert(m_recv_pos == m_packet_size); + return true; } void libtorrent::peer_connection::cancel_block(piece_block block) @@ -571,7 +591,7 @@ void libtorrent::peer_connection::cancel_block(piece_block block) m_torrent->picker().abort_download(block); - std::vector::iterator i + std::deque::iterator i = std::find(m_download_queue.begin(), m_download_queue.end(), block); assert(i != m_download_queue.end()); @@ -608,7 +628,6 @@ void libtorrent::peer_connection::cancel_block(piece_block block) (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; #endif assert(start_offset == m_send_buffer.size()); - m_statistics.sent_bytes(0, 17); send_buffer_updated(); } @@ -656,7 +675,6 @@ void libtorrent::peer_connection::request_block(piece_block block) (*m_logger) << m_socket->sender().as_string() << " ==> REQUEST [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; #endif assert(start_offset == m_send_buffer.size()); - m_statistics.sent_bytes(0, 17); send_buffer_updated(); } @@ -677,7 +695,6 @@ void libtorrent::peer_connection::send_bitfield() if (m_torrent->have_piece(i)) m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); } - m_statistics.sent_bytes(0, packet_size); send_buffer_updated(); } @@ -690,7 +707,6 @@ void libtorrent::peer_connection::choke() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; #endif - m_statistics.sent_bytes(0, 5); send_buffer_updated(); } @@ -703,7 +719,6 @@ void libtorrent::peer_connection::unchoke() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n"; #endif - m_statistics.sent_bytes(0, 5); send_buffer_updated(); } @@ -716,7 +731,6 @@ void libtorrent::peer_connection::interested() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n"; #endif - m_statistics.sent_bytes(0, 5); send_buffer_updated(); } @@ -729,7 +743,6 @@ void libtorrent::peer_connection::not_interested() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n"; #endif - m_statistics.sent_bytes(0, 5); send_buffer_updated(); } @@ -742,7 +755,6 @@ void libtorrent::peer_connection::send_have(int index) #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n"; #endif - m_statistics.sent_bytes(0, packet_size); send_buffer_updated(); } @@ -778,11 +790,12 @@ void libtorrent::peer_connection::second_tick() // upload rate of 10 kB/s more than we dowlload // if we have uploaded too much, send with a rate of // 10 kB/s less than we receive - int bias = (diff > 0 ? 10 : -10) * 1024; + int bias = (diff > -32*1024 ? 10 : -10) * 1024; // the maximum send_quota given our download rate from this peer m_send_quota_limit = m_statistics.download_rate() + bias; if (m_send_quota_limit < 500) m_send_quota_limit = 500; } + assert(m_send_quota_limit >= 500 || m_send_quota_limit == -1); } // -------------------------- @@ -809,9 +822,9 @@ void libtorrent::peer_connection::receive_data() if (received < 0) { // would_block means that no data was ready to be received - // this should never happen, since we have a selector - //assert(m_socket->last_error() != socket::would_block); - if (m_socket->last_error() == socket::would_block) return; + // returns to exit the loop + if (m_socket->last_error() == socket::would_block) + return; // the connection was closed throw network_error(m_socket->last_error()); @@ -823,202 +836,212 @@ void libtorrent::peer_connection::receive_data() m_recv_pos += received; - if (m_recv_pos == m_packet_size) + switch(m_state) { - switch(m_state) + case read_protocol_length: + + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + + m_packet_size = reinterpret_cast(m_recv_buffer[0]); +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n"; +#endif + m_state = read_protocol_string; + m_recv_buffer.resize(m_packet_size); + m_recv_pos = 0; + + if (m_packet_size == 0) { - case read_protocol_length: - m_statistics.received_bytes(0, received); - - m_packet_size = reinterpret_cast(m_recv_buffer[0]); - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n"; - #endif - m_state = read_protocol_string; - m_recv_buffer.resize(m_packet_size); - m_recv_pos = 0; - - if (m_packet_size == 0) - { - #ifndef NDEBUG - (*m_logger) << "incorrect protocol length\n"; - #endif - throw network_error(0); - } - break; - - - case read_protocol_string: - { - m_statistics.received_bytes(0, received); - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n"; - #endif - const char protocol_string[] = "BitTorrent protocol"; - const int protocol_len = sizeof(protocol_string) - 1; - if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string)) - { - #ifndef NDEBUG - (*m_logger) << "incorrect protocol name\n"; - #endif - throw network_error(0); - } - - m_state = read_info_hash; - m_packet_size = 28; - m_recv_pos = 0; - m_recv_buffer.resize(28); - } - break; - - - case read_info_hash: - { - m_statistics.received_bytes(0, received); - // ok, now we have got enough of the handshake. Is this connection - // attached to a torrent? - - if (m_torrent == 0) - { - // TODO: if the protocol is to be extended - // these 8 bytes would be used to describe the - // extensions available on the other side - - // now, we have to see if there's a torrent with the - // info_hash we got from the peer - sha1_hash info_hash; - std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin()); - - m_torrent = m_ses.find_torrent(info_hash); - if (m_torrent == 0) - { - // we couldn't find the torrent! - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n"; - #endif - throw network_error(0); - } - - // assume the other end has no pieces - m_have_piece.resize(m_torrent->torrent_file().num_pieces()); - std::fill(m_have_piece.begin(), m_have_piece.end(), false); - - // yes, we found the torrent - // reply with our handshake - std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin()); - send_handshake(); - send_bitfield(); - } - else - { - // verify info hash - if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin())) - { - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n"; - #endif - throw network_error(0); - } - } - - m_state = read_peer_id; - m_packet_size = 20; - m_recv_pos = 0; - m_recv_buffer.resize(20); - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " info_hash received\n"; - #endif - break; +#ifndef NDEBUG + (*m_logger) << "incorrect protocol length\n"; +#endif + throw network_error(0); } + break; - case read_peer_id: + case read_protocol_string: { m_statistics.received_bytes(0, received); - if (m_active) + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n"; +#endif + const char protocol_string[] = "BitTorrent protocol"; + const int protocol_len = sizeof(protocol_string) - 1; + if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string)) { - // verify peer_id - // TODO: It seems like the original client ignores to check the peer id - // can that be correct? - if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin())) - { - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n"; - #endif - throw network_error(0); - } - } - else - { - // check to make sure we don't have another connection with the same - // info_hash and peer_id. If we do. close this connection. - std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin()); - - if (m_torrent->has_peer(m_peer_id)) - { - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " duplicate connection, closing\n"; - #endif - throw network_error(0); - } - - m_attached_to_torrent = true; - m_torrent->attach_peer(this); - assert(m_torrent->get_policy().has_connection(this)); - } - - m_state = read_packet_size; - m_packet_size = 4; - m_recv_pos = 0; - m_recv_buffer.resize(4); - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " received peer_id\n"; - #endif - break; - } - - - case read_packet_size: - m_statistics.received_bytes(0, received); - - // convert from big endian to native byte order - m_packet_size = read_int(&m_recv_buffer[0]); - // don't accept packets larger than 1 MB - if (m_packet_size > 1024*1024 || m_packet_size < 0) - { - #ifndef NDEBUG - (*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n"; - #endif - // packet too large +#ifndef NDEBUG + (*m_logger) << "incorrect protocol name\n"; +#endif throw network_error(0); } - - if (m_packet_size == 0) - { - // keepalive message - m_state = read_packet_size; - m_packet_size = 4; - } - else - { - m_state = read_packet; - m_recv_buffer.resize(m_packet_size); - } + + m_state = read_info_hash; + m_packet_size = 28; m_recv_pos = 0; - assert(m_packet_size > 0); - break; + m_recv_buffer.resize(28); + } + break; - case read_packet: - dispatch_message(); + case read_info_hash: + { + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + // ok, now we have got enough of the handshake. Is this connection + // attached to a torrent? + if (m_torrent == 0) + { + // TODO: if the protocol is to be extended + // these 8 bytes would be used to describe the + // extensions available on the other side + + // now, we have to see if there's a torrent with the + // info_hash we got from the peer + sha1_hash info_hash; + std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin()); + + m_torrent = m_ses.find_torrent(info_hash); + if (m_torrent == 0) + { + // we couldn't find the torrent! +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n"; +#endif + throw network_error(0); + } + + // assume the other end has no pieces + m_have_piece.resize(m_torrent->torrent_file().num_pieces()); + std::fill(m_have_piece.begin(), m_have_piece.end(), false); + + // yes, we found the torrent + // reply with our handshake + std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin()); + send_handshake(); + send_bitfield(); + } + else + { + // verify info hash + if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin())) + { +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n"; +#endif + throw network_error(0); + } + } + + m_state = read_peer_id; + m_packet_size = 20; + m_recv_pos = 0; + m_recv_buffer.resize(20); +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " info_hash received\n"; +#endif + break; + } + + + case read_peer_id: + { + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + + if (m_active) + { + // verify peer_id + // TODO: It seems like the original client ignores to check the peer id + // can that be correct? + if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin())) + { +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n"; +#endif + throw network_error(0); + } + } + else + { + // check to make sure we don't have another connection with the same + // info_hash and peer_id. If we do. close this connection. + std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin()); + + if (m_torrent->has_peer(m_peer_id)) + { +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " duplicate connection, closing\n"; +#endif + throw network_error(0); + } + + m_attached_to_torrent = true; + m_torrent->attach_peer(this); + assert(m_torrent->get_policy().has_connection(this)); + } + + m_state = read_packet_size; + m_packet_size = 4; + m_recv_pos = 0; + m_recv_buffer.resize(4); +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " received peer_id\n"; +#endif + break; + } + + + case read_packet_size: + m_statistics.received_bytes(0, received); + if (m_recv_pos < m_packet_size) break; + assert(m_recv_pos == m_packet_size); + + // convert from big endian to native byte order + m_packet_size = read_int(&m_recv_buffer[0]); + // don't accept packets larger than 1 MB + if (m_packet_size > 1024*1024 || m_packet_size < 0) + { +#ifndef NDEBUG + (*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n"; +#endif + // packet too large + throw network_error(0); + } + + if (m_packet_size == 0) + { + // keepalive message + m_state = read_packet_size; + m_packet_size = 4; + } + else + { + m_state = read_packet; + m_recv_buffer.resize(m_packet_size); + } + m_recv_pos = 0; + assert(m_packet_size > 0); + break; + + case read_packet: + + if (dispatch_message(received)) + { m_state = read_packet_size; m_packet_size = 4; m_recv_buffer.resize(4); m_recv_pos = 0; assert(m_packet_size > 0); - break; } + break; } } } @@ -1085,8 +1108,7 @@ void libtorrent::peer_connection::send_data() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif - // let the torrent keep track of how much we have uploaded - m_statistics.sent_bytes(r.length, packet_size - r.length); + m_payloads.push_back(range(send_buffer_offset+13, r.length)); } else { @@ -1143,6 +1165,37 @@ void libtorrent::peer_connection::send_data() m_send_quota_left -= sent; } + int amount_payload = 0; + if (!m_payloads.empty()) + { + for (std::deque::iterator i = m_payloads.begin(); + i != m_payloads.end(); + ++i) + { + i->start -= sent; + if (i->start < 0) + { + if (i->start + i->length <= 0) + { + amount_payload += i->length; + } + else + { + amount_payload += -i->start; + i->length -= -i->start; + i->start = 0; + } + } + } + } + // remove all payload ranges that has been sent + m_payloads.erase( + std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) + , m_payloads.end()); + + assert(amount_payload <= sent); + m_statistics.sent_bytes(amount_payload, sent - amount_payload); + // empty the entire buffer at once or if // only a part of the buffer could be sent // remove the part that was sent from the buffer @@ -1189,7 +1242,6 @@ void libtorrent::peer_connection::keep_alive() char noop[] = {0,0,0,0}; m_send_buffer.insert(m_send_buffer.end(), noop, noop+4); m_last_sent = boost::posix_time::second_clock::local_time(); - m_statistics.sent_bytes(0, 4); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> NOP\n"; #endif diff --git a/src/policy.cpp b/src/policy.cpp index a4cd80fda..0f0e6268c 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -66,10 +66,10 @@ namespace return false; } - piece_block find_first_common(const std::vector& queue, + piece_block find_first_common(const std::deque& queue, const std::vector& busy) { - for (std::vector::const_reverse_iterator i + for (std::deque::const_reverse_iterator i = queue.rbegin(); i != queue.rend(); ++i) @@ -145,7 +145,7 @@ namespace i != t.end(); ++i) { - const std::vector& queue = (*i)->download_queue(); + const std::deque& queue = (*i)->download_queue(); if ((*i)->statistics().down_peak() > down_speed && has_intersection(busy_pieces.begin(), busy_pieces.end(), diff --git a/src/session.cpp b/src/session.cpp index d5b19c6e5..26f244af6 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -433,17 +433,6 @@ namespace libtorrent loops_per_second = 0; #endif - // distribute the maximum upload rate among the peers - // TODO: implement an intelligent algorithm that - // will shift bandwidth from the peers that can't - // utilize all their assigned bandwidth to the peers - // that actually can maintain the upload rate. - // This should probably be done by accumulating the - // left-over bandwidth to next second. Since the - // the sockets consumes its data in rather big chunks. - - control_upload_rates(m_upload_rate, m_connections); - // do the second_tick() on each connection // this will update their statistics (download and upload speeds) // also purge sockets that have timed out @@ -495,21 +484,19 @@ namespace libtorrent i->second->second_tick(); ++i; } + // distribute the maximum upload rate among the peers + // TODO: implement an intelligent algorithm that + // will shift bandwidth from the peers that can't + // utilize all their assigned bandwidth to the peers + // that actually can maintain the upload rate. + // This should probably be done by accumulating the + // left-over bandwidth to next second. Since the + // the sockets consumes its data in rather big chunks. + + control_upload_rates(m_upload_rate, m_connections); + + m_tracker_manager.tick(); - -#ifndef NDEBUG - (*m_logger) << "peers: " << m_connections.size() << " \n"; - for (connection_map::iterator i = m_connections.begin(); - i != m_connections.end(); - ++i) - { - (*m_logger) << "h: " << i->first->sender().as_string() - << " | down: " << i->second->statistics().download_rate() - << " b/s | up: " << i->second->statistics().upload_rate() - << " b/s \n"; - } -#endif - } while (!m_tracker_manager.send_finished()) diff --git a/src/torrent.cpp b/src/torrent.cpp index f8af30876..0705c0909 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -400,7 +400,7 @@ namespace libtorrent // if the peer_connection was downloading any pieces // abort them - for (std::vector::const_iterator i = p->download_queue().begin(); + for (std::deque::const_iterator i = p->download_queue().begin(); i != p->download_queue().end(); ++i) {