/* Copyright (c) 2003, Arvid Norberg All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include "libtorrent/peer_connection.hpp" #include "libtorrent/session.hpp" #include "libtorrent/identify_client.hpp" #include "libtorrent/entry.hpp" #include "libtorrent/bencode.hpp" #if defined(_MSC_VER) #define for if (false) {} else for #endif #define VERBOSE namespace libtorrent { // the names of the extensions to look for in // the extensions-message const char* peer_connection::extension_names[] = { "gzip" }; const peer_connection::message_handler peer_connection::m_message_handler[] = { &peer_connection::on_choke, &peer_connection::on_unchoke, &peer_connection::on_interested, &peer_connection::on_not_interested, &peer_connection::on_have, &peer_connection::on_bitfield, &peer_connection::on_request, &peer_connection::on_piece, &peer_connection::on_cancel, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, &peer_connection::on_extension_list, &peer_connection::on_extended }; peer_connection::peer_connection( detail::session_impl& ses , selector& sel , torrent* t , boost::shared_ptr s , const peer_id& p) : m_state(read_protocol_length) , m_timeout(120) , m_packet_size(1) , m_recv_pos(0) , m_last_receive(boost::gregorian::date(std::time(0))) , m_last_sent(boost::gregorian::date(std::time(0))) , m_selector(sel) , m_socket(s) , m_torrent(t) , m_attached_to_torrent(true) , m_ses(ses) , m_active(true) , m_added_to_selector(false) , m_peer_id(p) , m_peer_interested(false) , m_peer_choked(true) , m_interesting(false) , m_choked(true) , m_supports_extensions(false) , m_free_upload(0) , m_send_quota(100) , m_send_quota_left(100) , m_send_quota_limit(100) , m_trust_points(0) { assert(!m_socket->is_blocking()); assert(m_torrent != 0); #ifndef NDEBUG m_logger = m_ses.create_log(s->sender().as_string().c_str()); #endif // initialize the extension list to zero, since // we don't know which extensions the other // end supports yet std::fill(m_extension_messages, m_extension_messages + num_supported_extensions, 0); send_handshake(); // start in the state where we are trying to read the // handshake from the other side m_recv_buffer.resize(1); // assume the other end has no pieces m_have_piece.resize(m_torrent->torrent_file().num_pieces()); std::fill(m_have_piece.begin(), m_have_piece.end(), false); send_bitfield(); } peer_connection::peer_connection( detail::session_impl& ses , selector& sel , boost::shared_ptr s) : m_state(read_protocol_length) , m_timeout(120) , m_packet_size(1) , m_recv_pos(0) , m_last_receive(boost::gregorian::date(std::time(0))) , m_last_sent(boost::gregorian::date(std::time(0))) , m_selector(sel) , m_socket(s) , m_torrent(0) , m_attached_to_torrent(0) , m_ses(ses) , m_active(false) , m_added_to_selector(false) , m_peer_id() , m_peer_interested(false) , m_peer_choked(true) , m_interesting(false) , m_choked(true) , m_supports_extensions(false) , m_free_upload(0) , m_send_quota(100) , m_send_quota_left(100) , m_send_quota_limit(100) , m_trust_points(0) { assert(!m_socket->is_blocking()); #ifndef NDEBUG m_logger = m_ses.create_log(s->sender().as_string().c_str()); #endif // initialize the extension list to zero, since // we don't know which extensions the other // end supports yet std::fill(m_extension_messages, m_extension_messages + num_supported_extensions, 0); // we are not attached to any torrent yet. // we have to wait for the handshake to see // which torrent the connector want's to connect to // start in the state where we are trying to read the // handshake from the other side m_recv_buffer.resize(1); } peer_connection::~peer_connection() { m_selector.remove(m_socket); if (m_attached_to_torrent) { assert(m_torrent != 0); m_torrent->remove_peer(this); } } void peer_connection::set_send_quota(int num_bytes) { assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1); if (num_bytes > m_send_quota_limit && m_send_quota_limit!=-1) num_bytes = m_send_quota_limit; m_send_quota = num_bytes; m_send_quota_left = num_bytes; send_buffer_updated(); } void peer_connection::send_handshake() { assert(m_send_buffer.size() == 0); // add handshake to the send buffer const char version_string[] = "BitTorrent protocol"; const int string_len = sizeof(version_string)-1; int pos = 1; m_send_buffer.resize(1 + string_len + 8 + 20 + 20); // length of version string m_send_buffer[0] = string_len; // version string itself std::copy( version_string , version_string+string_len , m_send_buffer.begin()+pos); pos += string_len; // 8 zeroes std::fill( m_send_buffer.begin() + pos , m_send_buffer.begin() + pos + 8 , 0); // indicate that we support the extension protocol m_send_buffer[pos] = 0x80; pos += 8; // info hash std::copy( m_torrent->torrent_file().info_hash().begin() , m_torrent->torrent_file().info_hash().end() , m_send_buffer.begin() + pos); pos += 20; // peer id std::copy( m_ses.get_peer_id().begin() , m_ses.get_peer_id().end() , m_send_buffer.begin() + pos); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n"; #endif send_buffer_updated(); } boost::optional peer_connection::downloading_piece() const { // are we currently receiving a 'piece' message? if (m_state != read_packet || m_recv_pos < 9 || m_recv_buffer[0] != msg_piece) return boost::optional(); const char* ptr = &m_recv_buffer[1]; int piece_index = detail::read_int(ptr); int offset = detail::read_int(ptr); int len = m_packet_size - 9; // is any of the piece message header data invalid? // TODO: make sure that len is == block_size or less only // if its's the last block. if (piece_index < 0 || piece_index >= m_torrent->torrent_file().num_pieces() || offset < 0 || offset + len > m_torrent->torrent_file().piece_size(piece_index) || offset % m_torrent->block_size() != 0) return boost::optional(); piece_block_progress p; p.piece_index = piece_index; p.block_index = offset / m_torrent->block_size(); p.bytes_downloaded = m_recv_pos - 9; p.full_block_bytes = len; return boost::optional(p); } // message handlers // ----------------------------- // ----------- CHOKE ----------- // ----------------------------- void peer_connection::on_choke(int received) { if (m_packet_size != 1) throw protocol_error("'choke' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== CHOKE\n"; #endif m_peer_choked = true; m_torrent->get_policy().choked(*this); // remove all pieces from this peers download queue and // remove the 'downloading' flag from piece_picker. for (std::deque::iterator i = m_download_queue.begin(); i != m_download_queue.end(); ++i) { m_torrent->picker().abort_download(*i); } m_download_queue.clear(); #ifndef NDEBUG // m_torrent->picker().integrity_check(m_torrent); #endif } // ----------------------------- // ---------- UNCHOKE ---------- // ----------------------------- void peer_connection::on_unchoke(int received) { if (m_packet_size != 1) throw protocol_error("'unchoke' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== UNCHOKE\n"; #endif m_peer_choked = false; m_torrent->get_policy().unchoked(*this); } // ----------------------------- // -------- INTERESTED --------- // ----------------------------- void peer_connection::on_interested(int received) { if (m_packet_size != 1) throw protocol_error("'interested' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== INTERESTED\n"; #endif m_peer_interested = true; m_torrent->get_policy().interested(*this); } // ----------------------------- // ------ NOT INTERESTED ------- // ----------------------------- void peer_connection::on_not_interested(int received) { if (m_packet_size != 1) throw protocol_error("'not interested' message size != 1"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; // clear the request queue if the client isn't interested m_requests.clear(); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== NOT_INTERESTED\n"; #endif m_peer_interested = false; m_torrent->get_policy().not_interested(*this); } // ----------------------------- // ----------- HAVE ------------ // ----------------------------- void peer_connection::on_have(int received) { if (m_packet_size != 5) throw protocol_error("'have' message size != 5"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; const char* ptr = &m_recv_buffer[1]; int index = detail::read_int(ptr); // if we got an invalid message, abort if (index >= m_have_piece.size() || index < 0) throw protocol_error("have message with higher index than the number of pieces"); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== HAVE [ piece: " << index << "]\n"; #endif if (m_have_piece[index]) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " oops.. we already knew that: " << index << "\n"; #endif } else { m_have_piece[index] = true; m_torrent->peer_has(index); if (!m_torrent->have_piece(index) && !is_interesting()) m_torrent->get_policy().peer_is_interesting(*this); } } // ----------------------------- // --------- BITFIELD ---------- // ----------------------------- void peer_connection::on_bitfield(int received) { if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8) throw protocol_error("bitfield with invalid size"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== BITFIELD\n"; #endif // build a vector of all pieces std::vector piece_list; for (std::size_t i = 0; i < m_have_piece.size(); ++i) { bool have = m_recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7))); if (have && !m_have_piece[i]) { m_have_piece[i] = true; piece_list.push_back(i); } else if (!have && m_have_piece[i]) { m_have_piece[i] = false; m_torrent->peer_lost(i); } } // shuffle the piece list std::random_shuffle(piece_list.begin(), piece_list.end()); // let the torrent know which pieces the // peer has, in a shuffled order bool interesting = false; for (std::vector::iterator i = piece_list.begin(); i != piece_list.end(); ++i) { int index = *i; m_torrent->peer_has(index); if (!m_torrent->have_piece(index)) interesting = true; } if (piece_list.empty()) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " *** THIS IS A SEED ***\n"; #endif } if (interesting) m_torrent->get_policy().peer_is_interesting(*this); } // ----------------------------- // ---------- REQUEST ---------- // ----------------------------- void peer_connection::on_request(int received) { if (m_packet_size != 13) throw protocol_error("'request' message size != 13"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; peer_request r; const char* ptr = &m_recv_buffer[1]; r.piece = detail::read_int(ptr); r.start = detail::read_int(ptr); r.length = detail::read_int(ptr); // make sure this request // is legal and taht the peer // is not choked if (r.piece >= 0 && r.piece < m_torrent->torrent_file().num_pieces() && r.start >= 0 && r.start < m_torrent->torrent_file().piece_size(r.piece) && r.length > 0 && r.length + r.start < m_torrent->torrent_file().piece_size(r.piece) && m_peer_interested) { // if we have choked the client // ignore the request if (m_choked) return; m_requests.push_back(r); send_buffer_updated(); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif } else { // TODO: log this illegal request } } // ----------------------------- // ----------- PIECE ----------- // ----------------------------- void peer_connection::on_piece(int received) { if (m_recv_pos <= 9) // only received protocol data m_statistics.received_bytes(0, received); else if (m_recv_pos - received >= 9) // only received payload data m_statistics.received_bytes(received, 0); else { // received a bit of both assert(m_recv_pos - received < 9); assert(m_recv_pos > 9); assert(9 - (m_recv_pos - received) <= 9); m_statistics.received_bytes( m_recv_pos - 9 , 9 - (m_recv_pos - received)); } if (m_recv_pos < m_packet_size) return; const char* ptr = &m_recv_buffer[1]; int index = detail::read_int(ptr); if (index < 0 || index >= m_torrent->torrent_file().num_pieces()) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " piece index invalid\n"; #endif throw protocol_error("invalid piece index in piece message"); } int offset = detail::read_int(ptr); int len = m_packet_size - 9; if (offset < 0) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " offset < 0\n"; #endif throw protocol_error("offset < 0 in piece message"); } if (offset + len > m_torrent->torrent_file().piece_size(index)) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " piece packet contains more data than the piece size\n"; #endif throw protocol_error("piece message contains more data than the piece size"); } // TODO: make sure that len is == block_size or less only // if its's the last block. if (offset % m_torrent->block_size() != 0) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " piece packet contains unaligned offset\n"; #endif throw protocol_error("piece message contains unaligned offset"); } /* piece_block req = m_download_queue.front(); if (req.piece_index != index) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested index\n"; #endif return false; } if (req.block_index != offset / m_torrent->block_size()) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested offset\n"; #endif return false; } */ #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n"; #endif piece_picker& picker = m_torrent->picker(); piece_block block_finished(index, offset / m_torrent->block_size()); std::deque::iterator b = std::find( m_download_queue.begin() , m_download_queue.end() , block_finished); if (b != m_download_queue.end()) { // pop the request that just finished // from the download queue m_download_queue.erase(b); } else { // TODO: cancel the block from the // peer that has taken over it. } // if the block we got is already finished, then ignore it if (picker.is_finished(block_finished)) return; m_torrent->filesystem().write(&m_recv_buffer[9], index, offset, len); picker.mark_as_finished(block_finished, m_peer_id); m_torrent->get_policy().block_finished(*this, block_finished); // did we just finish the piece? if (picker.is_piece_finished(index)) { bool verified = m_torrent->verify_piece(index); if (verified) { m_torrent->announce_piece(index); } else { m_torrent->piece_failed(index); } m_torrent->get_policy().piece_finished(index, verified); } } // ----------------------------- // ---------- CANCEL ----------- // ----------------------------- void peer_connection::on_cancel(int received) { if (m_packet_size != 13) throw protocol_error("'cancel' message size != 13"); m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; peer_request r; const char* ptr = &m_recv_buffer[1]; r.piece = detail::read_int(ptr); r.start = detail::read_int(ptr); r.length = detail::read_int(ptr); std::deque::iterator i = std::find(m_requests.begin(), m_requests.end(), r); if (i != m_requests.end()) { m_requests.erase(i); } if (!has_data() && m_added_to_selector) { m_added_to_selector = false; m_selector.remove_writable(m_socket); } #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif } // ----------------------------- // ------ EXTENSION LIST ------- // ----------------------------- void peer_connection::on_extension_list(int received) { if (m_packet_size > 100 * 1024) { // too big extension message, abort throw protocol_error("'extensions' message size > 100kB"); } m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) return; try { entry e = bdecode(m_recv_buffer.begin()+1, m_recv_buffer.end()); entry::dictionary_type& extensions = e.dict(); for (int i = 0; i < num_supported_extensions; ++i) { entry::dictionary_type::iterator f = extensions.find(extension_names[i]); if (f != extensions.end()) { m_extension_messages[i] = f->second.integer(); } } } catch(invalid_encoding& e) { throw protocol_error("'extensions' packet contains invalid bencoding"); } catch(type_error& e) { throw protocol_error("'extensions' packet contains incorrect types"); } } // ----------------------------- // --------- EXTENDED ---------- // ----------------------------- void peer_connection::on_extended(int received) { } bool peer_connection::dispatch_message(int received) { assert(m_recv_pos >= received); assert(m_recv_pos > 0); int packet_type = m_recv_buffer[0]; if (packet_type < 0 || packet_type >= num_supported_messages || m_message_handler[packet_type] == 0) { throw protocol_error("unknown message id"); } assert(m_message_handler[packet_type] != 0); // call the correct handler for this packet type (this->*m_message_handler[packet_type])(received); if (m_recv_pos < m_packet_size) return false; assert(m_recv_pos == m_packet_size); return true; } void peer_connection::cancel_block(piece_block block) { assert(block.piece_index >= 0); assert(block.piece_index < m_torrent->torrent_file().num_pieces()); assert(m_torrent->picker().is_downloading(block)); m_torrent->picker().abort_download(block); std::deque::iterator i = std::find(m_download_queue.begin(), m_download_queue.end(), block); assert(i != m_download_queue.end()); m_download_queue.erase(i); int block_offset = block.block_index * m_torrent->block_size(); int block_size = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, m_torrent->block_size()); assert(block_size > 0); assert(block_size <= m_torrent->block_size()); char buf[] = {0,0,0,13, msg_cancel}; std::size_t start_offset = m_send_buffer.size(); m_send_buffer.resize(start_offset + 17); std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); start_offset += 5; char* ptr = &m_send_buffer[start_offset]; // index detail::write_int(block.piece_index, ptr); // begin detail::write_int(block_offset, ptr); // length detail::write_int(block_size, ptr); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; #endif send_buffer_updated(); } void peer_connection::request_block(piece_block block) { assert(block.piece_index >= 0); assert(block.piece_index < m_torrent->torrent_file().num_pieces()); assert(!m_torrent->picker().is_downloading(block)); m_torrent->picker().mark_as_downloading(block, m_peer_id); m_download_queue.push_back(block); int block_offset = block.block_index * m_torrent->block_size(); int block_size = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, m_torrent->block_size()); assert(block_size > 0); assert(block_size <= m_torrent->block_size()); char buf[] = {0,0,0,13, msg_request}; std::size_t start_offset = m_send_buffer.size(); m_send_buffer.resize(start_offset + 17); std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); char* ptr = &m_send_buffer[start_offset+5]; // index detail::write_int(block.piece_index, ptr); // begin detail::write_int(block_offset, ptr); // length detail::write_int(block_size, ptr); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> REQUEST [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; #endif send_buffer_updated(); } void peer_connection::send_bitfield() { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> BITFIELD\n"; #endif const int packet_size = (m_have_piece.size() + 7) / 8 + 5; const int old_size = m_send_buffer.size(); m_send_buffer.resize(old_size + packet_size); char* ptr = &m_send_buffer[old_size]; detail::write_int(packet_size - 4, ptr); m_send_buffer[old_size+4] = msg_bitfield; std::fill(m_send_buffer.begin()+old_size+5, m_send_buffer.end(), 0); for (std::size_t i = 0; i < m_have_piece.size(); ++i) { if (m_torrent->have_piece(i)) m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); } send_buffer_updated(); } void peer_connection::send_extensions() { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> EXTENSIONS\n"; #endif assert(m_supports_extensions); entry extension_list(entry::dictionary_t); for (int i = 0; i < num_supported_extensions; ++i) { entry msg_index(entry::int_t); msg_index.integer() = i; extension_list.dict()[extension_names[i]] = msg_index; } #ifndef NDEBUG extension_list.print(std::cout); #endif // make room for message size const int msg_size_pos = m_send_buffer.size(); m_send_buffer.resize(msg_size_pos + 4); m_send_buffer.push_back(msg_extension_list); bencode(std::back_inserter(m_send_buffer), extension_list); // write the length of the message char* ptr = &m_send_buffer[msg_size_pos]; detail::write_int(m_send_buffer.size() - msg_size_pos - 4, ptr); send_buffer_updated(); } void peer_connection::choke() { if (m_choked) return; char msg[] = {0,0,0,1,msg_choke}; m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); m_choked = true; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; #endif m_requests.clear(); send_buffer_updated(); } void peer_connection::unchoke() { if (!m_choked) return; char msg[] = {0,0,0,1,msg_unchoke}; m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); m_choked = false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n"; #endif send_buffer_updated(); } void peer_connection::interested() { if (m_interesting) return; char msg[] = {0,0,0,1,msg_interested}; m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); m_interesting = true; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n"; #endif send_buffer_updated(); } void peer_connection::not_interested() { if (!m_interesting) return; char msg[] = {0,0,0,1,msg_not_interested}; m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); m_interesting = false; #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n"; #endif send_buffer_updated(); } void peer_connection::send_have(int index) { const int packet_size = 9; char msg[packet_size] = {0,0,0,5,msg_have}; char* ptr = msg+5; detail::write_int(index, ptr); m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n"; #endif send_buffer_updated(); } void peer_connection::second_tick() { m_statistics.second_tick(); m_send_quota_left = m_send_quota; if (m_send_quota > 0) send_buffer_updated(); // If the client sends more data // we send it data faster, otherwise, slower. // It will also depend on how much data the // client has sent us. This is the mean to // maintain a 1:1 share ratio with all peers. int diff = share_diff(); if (diff > 2*m_torrent->block_size() || m_torrent->is_seed()) { // if we have downloaded more than one piece more // than we have uploaded OR if we are a seed // have an unlimited upload rate m_send_quota_limit = -1; } else { // if we have downloaded too much, response with an // upload rate of 10 kB/s more than we dowlload // if we have uploaded too much, send with a rate of // 10 kB/s less than we receive int bias = 0; if (diff > -2*m_torrent->block_size()) { bias = m_statistics.download_rate() / 2; if (bias < 10*1024) bias = 10*1024; } else { bias = -m_statistics.download_rate() / 2; } m_send_quota_limit = m_statistics.download_rate() + bias; // the maximum send_quota given our download rate from this peer if (m_send_quota_limit < 256) m_send_quota_limit = 256; } } // -------------------------- // RECEIVE DATA // -------------------------- // throws exception when the client should be disconnected void peer_connection::receive_data() { assert(!m_socket->is_blocking()); assert(m_packet_size > 0); assert(m_socket->is_readable()); for(;;) { assert(m_packet_size > 0); int received = m_socket->receive(&m_recv_buffer[m_recv_pos], m_packet_size - m_recv_pos); // connection closed if (received == 0) { throw network_error(0); } // an error if (received < 0) { // would_block means that no data was ready to be received // returns to exit the loop if (m_socket->last_error() == socket::would_block) return; // the connection was closed throw network_error(m_socket->last_error()); } if (received > 0) { m_last_receive = boost::posix_time::second_clock::local_time(); m_recv_pos += received; switch(m_state) { case read_protocol_length: m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); m_packet_size = reinterpret_cast(m_recv_buffer[0]); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n"; #endif m_state = read_protocol_string; m_recv_buffer.resize(m_packet_size); m_recv_pos = 0; if (m_packet_size == 0) { #ifndef NDEBUG (*m_logger) << "incorrect protocol length\n"; #endif throw network_error(0); } break; case read_protocol_string: { m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n"; #endif const char protocol_string[] = "BitTorrent protocol"; const int protocol_len = sizeof(protocol_string) - 1; if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string)) { #ifndef NDEBUG (*m_logger) << "incorrect protocol name\n"; #endif throw network_error(0); } m_state = read_info_hash; m_packet_size = 28; m_recv_pos = 0; m_recv_buffer.resize(28); } break; case read_info_hash: { m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); // ok, now we have got enough of the handshake. Is this connection // attached to a torrent? // TODO: if the protocol is to be extended // these 8 bytes would be used to describe the // extensions available on the other side if (m_recv_buffer[0] & 0x80) { m_supports_extensions = true; } if (m_torrent == 0) { // now, we have to see if there's a torrent with the // info_hash we got from the peer sha1_hash info_hash; std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin()); m_torrent = m_ses.find_torrent(info_hash); if (m_torrent == 0) { // we couldn't find the torrent! #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n"; #endif throw network_error(0); } // assume the other end has no pieces m_have_piece.resize(m_torrent->torrent_file().num_pieces()); std::fill(m_have_piece.begin(), m_have_piece.end(), false); // yes, we found the torrent // reply with our handshake std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin()); send_handshake(); send_bitfield(); } else { // verify info hash if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin())) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n"; #endif throw network_error(0); } } if (m_supports_extensions) send_extensions(); m_state = read_peer_id; m_packet_size = 20; m_recv_pos = 0; m_recv_buffer.resize(20); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " info_hash received\n"; #endif break; } case read_peer_id: { m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); #ifndef NDEBUG { peer_id tmp; std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)tmp.begin()); std::stringstream s; s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n"; (*m_logger) << s.str(); } #endif if (m_active) { // verify peer_id // TODO: It seems like the original client ignores to check the peer id // can that be correct? if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin())) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n"; #endif throw network_error(0); } } else { // check to make sure we don't have another connection with the same // info_hash and peer_id. If we do. close this connection. std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin()); if (m_torrent->has_peer(m_peer_id)) { #ifndef NDEBUG (*m_logger) << " duplicate connection, closing\n"; #endif throw network_error(0); } m_attached_to_torrent = true; m_torrent->attach_peer(this); assert(m_torrent->get_policy().has_connection(this)); } m_state = read_packet_size; m_packet_size = 4; m_recv_pos = 0; m_recv_buffer.resize(4); break; } case read_packet_size: { m_statistics.received_bytes(0, received); if (m_recv_pos < m_packet_size) break; assert(m_recv_pos == m_packet_size); // convert from big endian to native byte order const char* ptr = &m_recv_buffer[0]; m_packet_size = detail::read_int(ptr); // don't accept packets larger than 1 MB if (m_packet_size > 1024*1024 || m_packet_size < 0) { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n"; #endif // packet too large throw network_error(0); } if (m_packet_size == 0) { // keepalive message m_state = read_packet_size; m_packet_size = 4; } else { m_state = read_packet; m_recv_buffer.resize(m_packet_size); } m_recv_pos = 0; assert(m_packet_size > 0); break; } case read_packet: if (dispatch_message(received)) { m_state = read_packet_size; m_packet_size = 4; m_recv_buffer.resize(4); m_recv_pos = 0; assert(m_packet_size > 0); } break; } } } assert(m_packet_size > 0); } bool peer_connection::has_data() const throw() { // if we have requests or pending data to be sent or announcements to be made // we want to send data return ((!m_requests.empty() && !m_choked) || !m_send_buffer.empty() || !m_announce_queue.empty()) && m_send_quota_left != 0; } // -------------------------- // SEND DATA // -------------------------- // throws exception when the client should be disconnected void peer_connection::send_data() { assert(m_socket->is_writable()); assert(has_data()); // only add new piece-chunks if the send buffer is small enough // otherwise there will be no end to how large it will be! // TODO: make this a bit better. Don't always read the entire // requested block. Have a limit of how much of the requested // block is actually read at a time. while (!m_requests.empty() && (m_send_buffer.size() < m_torrent->block_size()) && !m_choked) { peer_request& r = m_requests.front(); if (r.piece >= 0 && r.piece < m_have_piece.size() && m_torrent && m_torrent->have_piece(r.piece)) { // make sure the request is ok if (r.start + r.length > m_torrent->torrent_file().piece_size(r.piece)) { // NOT OK! disconnect throw network_error(0); } if (r.length <= 0 || r.start < 0) { // NOT OK! disconnect throw network_error(0); } #ifndef NDEBUG assert(m_torrent->verify_piece(r.piece) && "internal error"); #endif const int send_buffer_offset = m_send_buffer.size(); const int packet_size = 4 + 5 + 4 + r.length; m_send_buffer.resize(send_buffer_offset + packet_size); char* ptr = &m_send_buffer[send_buffer_offset]; detail::write_int(packet_size-4, ptr); *ptr = msg_piece; ++ptr; detail::write_int(r.piece, ptr); detail::write_int(r.start, ptr); m_torrent->filesystem().read( &m_send_buffer[send_buffer_offset+13] , r.piece , r.start , r.length); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif m_payloads.push_back(range(send_buffer_offset+13, r.length)); } else { #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " *** WARNING [ illegal piece request idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " | max_piece: " << m_have_piece.size() << " | torrent: " << (m_torrent != 0) << " | have: " << m_torrent->have_piece(r.piece) << " ]\n"; #endif } m_requests.erase(m_requests.begin()); } if (!m_announce_queue.empty()) { for (std::vector::iterator i = m_announce_queue.begin(); i != m_announce_queue.end(); ++i) { // (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n"; send_have(*i); } m_announce_queue.clear(); } assert(m_send_quota_left != 0); // send the actual buffer if (!m_send_buffer.empty()) { int amount_to_send = m_send_buffer.size(); assert(m_send_quota_left != 0); if (m_send_quota_left > 0) amount_to_send = std::min(m_send_quota_left, amount_to_send); // we have data that's scheduled for sending int sent = m_socket->send( &m_send_buffer[0] , amount_to_send); #ifndef NDEBUG // (*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n"; #endif if (sent > 0) { if (m_send_quota_left != -1) { assert(m_send_quota_left >= sent); m_send_quota_left -= sent; } int amount_payload = 0; if (!m_payloads.empty()) { for (std::deque::iterator i = m_payloads.begin(); i != m_payloads.end(); ++i) { i->start -= sent; if (i->start < 0) { if (i->start + i->length <= 0) { amount_payload += i->length; } else { amount_payload += -i->start; i->length -= -i->start; i->start = 0; } } } } // remove all payload ranges that has been sent m_payloads.erase( std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) , m_payloads.end()); assert(amount_payload <= sent); m_statistics.sent_bytes(amount_payload, sent - amount_payload); // empty the entire buffer at once or if // only a part of the buffer could be sent // remove the part that was sent from the buffer if (sent == m_send_buffer.size()) { m_send_buffer.clear(); } else { m_send_buffer.erase( m_send_buffer.begin() , m_send_buffer.begin() + sent); } } else { assert(sent == -1); throw network_error(m_socket->last_error()); } m_last_sent = boost::posix_time::second_clock::local_time(); } assert(m_added_to_selector); send_buffer_updated(); #ifndef NDEBUG if (has_data()) { if (m_socket->is_writable()) { std::cout << "ERROR\n"; } } #endif } void peer_connection::keep_alive() { boost::posix_time::time_duration d; d = boost::posix_time::second_clock::local_time() - m_last_sent; if (d.seconds() > m_timeout / 2) { char noop[] = {0,0,0,0}; m_send_buffer.insert(m_send_buffer.end(), noop, noop+4); m_last_sent = boost::posix_time::second_clock::local_time(); #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> NOP\n"; #endif send_buffer_updated(); } } }