From f7042ca84afb058bd88716a1bcc5279f3d43a753 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 7 Jul 2008 12:04:06 +0000 Subject: [PATCH] improved piece timeout and peer snubbing logic --- include/libtorrent/alert_types.hpp | 81 ++++++++ include/libtorrent/peer_connection.hpp | 36 ++-- include/libtorrent/policy.hpp | 2 - src/bt_peer_connection.cpp | 2 +- src/peer_connection.cpp | 247 ++++++++++++++----------- src/policy.cpp | 20 +- src/torrent.cpp | 12 +- 7 files changed, 252 insertions(+), 148 deletions(-) diff --git a/include/libtorrent/alert_types.hpp b/include/libtorrent/alert_types.hpp index 4422c6f87..69f971f13 100644 --- a/include/libtorrent/alert_types.hpp +++ b/include/libtorrent/alert_types.hpp @@ -360,6 +360,31 @@ namespace libtorrent tcp::endpoint ip; }; + struct TORRENT_EXPORT peer_unsnubbed_alert: torrent_alert + { + peer_unsnubbed_alert(torrent_handle const& h, tcp::endpoint const& ip_ + , peer_id const& pid_) + : torrent_alert(h) + , ip(ip_) + , pid(pid_) + {} + + virtual std::auto_ptr clone() const + { return std::auto_ptr(new peer_unsnubbed_alert(*this)); } + virtual char const* what() const { return "peer unsnubbed"; } + const static int static_category = alert::peer_notification; + virtual int category() const { return static_category; } + virtual std::string message() const + { + error_code ec; + return torrent_alert::message() + " peer unsnubbed: (" + ip.address().to_string(ec) + + ")"; + } + + tcp::endpoint ip; + peer_id pid; + }; + struct TORRENT_EXPORT peer_snubbed_alert: torrent_alert { peer_snubbed_alert(torrent_handle const& h, tcp::endpoint const& ip_ @@ -531,6 +556,62 @@ namespace libtorrent } }; + struct TORRENT_EXPORT request_dropped_alert: torrent_alert + { + request_dropped_alert( + const torrent_handle& h + , int block_num + , int piece_num) + : torrent_alert(h) + , block_index(block_num) + , piece_index(piece_num) + { TORRENT_ASSERT(block_index >= 0 && piece_index >= 0);} + + int block_index; + int piece_index; + + virtual std::auto_ptr clone() const + { return std::auto_ptr(new request_dropped_alert(*this)); } + virtual char const* what() const { return "block request dropped"; } + const static int static_category = alert::progress_notification + | alert::peer_notification; + virtual int category() const { return static_category; } + virtual std::string message() const + { + return torrent_alert::message() + " block " + + boost::lexical_cast(block_index) + " in piece " + + boost::lexical_cast(piece_index) + " was dropped by remote peer"; + } + }; + + struct TORRENT_EXPORT block_timeout_alert: torrent_alert + { + block_timeout_alert( + const torrent_handle& h + , int block_num + , int piece_num) + : torrent_alert(h) + , block_index(block_num) + , piece_index(piece_num) + { TORRENT_ASSERT(block_index >= 0 && piece_index >= 0);} + + int block_index; + int piece_index; + + virtual std::auto_ptr clone() const + { return std::auto_ptr(new block_timeout_alert(*this)); } + virtual char const* what() const { return "block timed out"; } + const static int static_category = alert::progress_notification + | alert::peer_notification; + virtual int category() const { return static_category; } + virtual std::string message() const + { + return torrent_alert::message() + " timed out block " + + boost::lexical_cast(block_index) + " in piece " + + boost::lexical_cast(piece_index); + } + }; + struct TORRENT_EXPORT block_finished_alert: torrent_alert { block_finished_alert( diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index f43857d5e..8eebb113b 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -88,9 +88,21 @@ namespace libtorrent struct session_impl; } - struct TORRENT_EXPORT protocol_error: std::runtime_error + struct pending_block { - protocol_error(const std::string& msg): std::runtime_error(msg) {}; + pending_block(piece_block const& b): skipped(0), block(b) {} + int skipped; + // the number of times the request + // has been skipped by out of order blocks + piece_block block; + }; + + struct has_block + { + has_block(piece_block const& b): block(b) {} + piece_block const& block; + bool operator()(pending_block const& pb) const + { return pb.block == block; } }; class TORRENT_EXPORT peer_connection @@ -209,7 +221,7 @@ namespace libtorrent void set_pid(const peer_id& pid) { m_peer_id = pid; } bool has_piece(int i) const; - std::deque const& download_queue() const; + std::deque const& download_queue() const; std::deque const& request_queue() const; std::deque const& upload_queue() const; @@ -238,6 +250,8 @@ namespace libtorrent // is called once every second by the main loop void second_tick(float tick_interval); + void timeout_requests(); + boost::shared_ptr get_socket() const { return m_socket; } tcp::endpoint const& remote() const { return m_remote; } @@ -349,6 +363,8 @@ namespace libtorrent void send_interested(); void send_not_interested(); + void snub_peer(); + // adds a block to the request queue void add_request(piece_block const& b); // removes a block from the request queue or download queue @@ -573,6 +589,11 @@ namespace libtorrent // download queue. Used for request timeout ptime m_requested; + // if the timeout is extended for the outstanding + // requests, this is the number of seconds it was + // extended. + int m_timeout_extend; + // a timestamp when the remote download rate // was last updated ptime m_remote_dl_update; @@ -646,7 +667,7 @@ namespace libtorrent // the queue of blocks we have requested // from this peer - std::deque m_download_queue; + std::deque m_download_queue; // the pieces we will send to the peer // if requested (regardless of choke state) @@ -792,13 +813,6 @@ namespace libtorrent // is used to fill the bitmask in init() bool m_have_all:1; - // if this is true, this peer is assumed to handle all piece - // requests in fifo order. All skipped blocks are re-requested - // immediately instead of having a looser requirement - // where blocks can be sent out of order. The default is to - // allow non-fifo order. - bool m_assume_fifo:1; - // this is true if this connection has been added // to the list of connections that will be closed. bool m_disconnecting:1; diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index 0c9380b8a..158163531 100644 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -99,8 +99,6 @@ namespace libtorrent // the peer has got at least one interesting piece void peer_is_interesting(peer_connection& c); - int count_choked() const; - // the peer unchoked us void unchoked(peer_connection& c); diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 16e63728a..cf9e3bc34 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -826,7 +826,7 @@ namespace libtorrent TORRENT_ASSERT(t); while (!download_queue().empty()) { - piece_block const& b = download_queue().front(); + piece_block const& b = download_queue().front().block; peer_request r; r.piece = b.piece_index; r.start = b.block_index * t->block_size(); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 89b429fe7..6c2b2eb88 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -82,6 +82,7 @@ namespace libtorrent , m_last_receive(time_now()) , m_last_sent(time_now()) , m_requested(min_time()) + , m_timeout_extend(0) , m_remote_dl_update(time_now()) , m_became_uninterested(time_now()) , m_became_uninteresting(time_now()) @@ -121,7 +122,6 @@ namespace libtorrent , m_failed(false) , m_ignore_bandwidth_limits(false) , m_have_all(false) - , m_assume_fifo(false) , m_disconnecting(false) , m_connecting(true) , m_queued(true) @@ -185,6 +185,7 @@ namespace libtorrent , m_last_receive(time_now()) , m_last_sent(time_now()) , m_requested(min_time()) + , m_timeout_extend(0) , m_remote_dl_update(time_now()) , m_became_uninterested(time_now()) , m_became_uninteresting(time_now()) @@ -223,7 +224,6 @@ namespace libtorrent , m_failed(false) , m_ignore_bandwidth_limits(false) , m_have_all(false) - , m_assume_fifo(false) , m_disconnecting(false) , m_connecting(false) , m_queued(false) @@ -574,7 +574,7 @@ namespace libtorrent return m_request_queue; } - std::deque const& peer_connection::download_queue() const + std::deque const& peer_connection::download_queue() const { return m_download_queue; } @@ -851,9 +851,10 @@ namespace libtorrent if (is_disconnecting()) return; - std::deque::iterator i = std::find_if( + std::deque::iterator i = std::find_if( m_download_queue.begin(), m_download_queue.end() - , bind(match_request, boost::cref(r), _1, t->block_size())); + , bind(match_request, boost::cref(r), bind(&pending_block::block, _1) + , t->block_size())); #ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << time_now_string() @@ -863,7 +864,7 @@ namespace libtorrent piece_block b(-1, 0); if (i != m_download_queue.end()) { - b = *i; + b = i->block; m_download_queue.erase(i); // if the peer is in parole mode, keep the request @@ -1512,11 +1513,11 @@ namespace libtorrent TORRENT_ASSERT(p.length == t->block_size() || p.length == t->torrent_file().total_size() % t->block_size()); - std::deque::iterator b - = std::find( + std::deque::iterator b + = std::find_if( m_download_queue.begin() , m_download_queue.end() - , block_finished); + , has_block(block_finished)); if (b == m_download_queue.end()) { @@ -1535,39 +1536,41 @@ namespace libtorrent return; } - if (m_assume_fifo) + for (std::deque::iterator i = m_download_queue.begin(); + i != b;) { - for (std::deque::iterator i = m_download_queue.begin(); - i != b; ++i) - { + #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING - (*m_logger) << time_now_string() - << " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | " - "b: " << i->block_index << " ] ***\n"; + (*m_logger) << time_now_string() + << " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | " + "b: " << i->block_index << " ] ***\n"; #endif - // since this piece was skipped, clear it and allow it to - // be requested from other peers - // TODO: send cancel? - picker.abort_download(*i); + + ++i->skipped; + // if the number of times a block is skipped by out of order + // blocks exceeds the size of the outstanding queue, assume that + // the other end dropped the request. + if (i->skipped > m_desired_queue_size) + { + if (m_ses.m_alerts.should_post()) + m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle() + , i->block.block_index, i->block.piece_index)); + picker.abort_download(i->block); + i = m_download_queue.erase(i); + } + else + { + ++i; } - - // remove the request that just finished - // from the download queue plus the - // skipped blocks. - m_download_queue.erase(m_download_queue.begin(), b); - b = m_download_queue.begin(); - TORRENT_ASSERT(*b == block_finished); } - - if (total_seconds(time_now() - m_requested) < m_ses.settings().request_timeout) - m_snubbed = false; - + // if the block we got is already finished, then ignore it if (picker.is_downloaded(block_finished)) { t->received_redundant_data(p.length); m_download_queue.erase(b); + m_timeout_extend = 0; if (!m_download_queue.empty()) m_requested = time_now(); @@ -1577,12 +1580,25 @@ namespace libtorrent return; } + if (total_seconds(time_now() - m_requested) + < m_ses.settings().request_timeout + && m_snubbed) + { + m_snubbed = false; + if (m_ses.m_alerts.should_post()) + { + m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle() + , m_remote, m_peer_id)); + } + } + fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete , self(), _1, _2, p, t)); m_outstanding_writing_bytes += p.length; TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle); m_download_queue.erase(b); + m_timeout_extend = 0; if (!m_download_queue.empty()) m_requested = time_now(); @@ -1902,8 +1918,10 @@ namespace libtorrent TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index)); TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0)); TORRENT_ASSERT(!t->have_piece(block.piece_index)); - TORRENT_ASSERT(std::find(m_download_queue.begin(), m_download_queue.end(), block) == m_download_queue.end()); - TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end(), block) == m_request_queue.end()); + TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end() + , has_block(block)) == m_download_queue.end()); + TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end() + , block) == m_request_queue.end()); piece_picker::piece_state_t state; peer_speed_t speed = peer_speed(); @@ -1955,18 +1973,20 @@ namespace libtorrent // cancelled, then just ignore the cancel. if (!t->picker().is_requested(block)) return; - std::deque::iterator it - = std::find(m_download_queue.begin(), m_download_queue.end(), block); + std::deque::iterator it + = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block)); if (it == m_download_queue.end()) { - it = std::find(m_request_queue.begin(), m_request_queue.end(), block); + std::deque::iterator rit = std::find(m_request_queue.begin() + , m_request_queue.end(), block); + // when a multi block is received, it is cancelled // from all peers, so if this one hasn't requested // the block, just ignore to cancel it. - if (it == m_request_queue.end()) return; + if (rit == m_request_queue.end()) return; t->picker().abort_download(block); - m_request_queue.erase(it); + m_request_queue.erase(rit); // since we found it in the request queue, it means it hasn't been // sent yet, so we don't have to send a cancel. return; @@ -2260,7 +2280,7 @@ namespace libtorrent while (!m_download_queue.empty()) { - picker.abort_download(m_download_queue.back()); + picker.abort_download(m_download_queue.back().block); m_download_queue.pop_back(); } while (!m_request_queue.empty()) @@ -2354,7 +2374,8 @@ namespace libtorrent p.send_quota = m_bandwidth_limit[upload_channel].quota_left(); p.receive_quota = m_bandwidth_limit[download_channel].quota_left(); if (m_download_queue.empty()) p.request_timeout = -1; - else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout; + else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout + + m_timeout_extend; #ifndef TORRENT_DISABLE_GEO_IP p.inet_as_name = m_inet_as_name; #endif @@ -2608,29 +2629,12 @@ namespace libtorrent } if (!m_download_queue.empty() - && now > m_requested + seconds(m_ses.settings().request_timeout) - && t->has_picker()) + && now > m_requested + seconds(m_ses.settings().request_timeout + + m_timeout_extend)) { - if (!m_snubbed) - { - m_snubbed = true; - if (m_ses.m_alerts.should_post()) - { - m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle() - , m_remote, m_peer_id)); - } - } - m_desired_queue_size = 1; - piece_picker& picker = t->picker(); - // the front request timed out! - picker.abort_download(m_download_queue[0]); - m_download_queue.pop_front(); - if (!m_download_queue.empty()) - m_requested = time_now(); - request_a_block(*t, *this); - send_block_requests(); + snub_peer(); } - + // if we haven't sent something in too long, send a keep-alive keep_alive(); @@ -2686,7 +2690,8 @@ namespace libtorrent } if (!m_download_queue.empty() - && now - m_last_piece > seconds(m_ses.settings().piece_timeout)) + && now - m_last_piece > seconds(m_ses.settings().piece_timeout + + m_timeout_extend)) { // this peer isn't sending the pieces we've // requested (this has been observed by BitComet) @@ -2698,51 +2703,7 @@ namespace libtorrent << " " << total_seconds(now - m_last_piece) << "] ***\n"; #endif - if (!m_snubbed) - { - m_snubbed = true; - if (m_ses.m_alerts.should_post()) - { - m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle() - , m_remote, m_peer_id)); - } - } - m_desired_queue_size = 1; - - if (t->is_seed()) - { - m_download_queue.clear(); - m_request_queue.clear(); - } - else - { - piece_picker& picker = t->picker(); - - std::deque dl(m_download_queue); - for (std::deque::iterator i = dl.begin() - , end(dl.end()); i != end; ++i) - { - piece_block const& r = m_download_queue.back(); -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << time_now_string() - << " ==> CANCEL [ piece: " << r.piece_index - << " | block: " << r.block_index - << " ]\n"; -#endif - write_cancel(t->to_req(r)); - } - while (!m_request_queue.empty()) - { - piece_block const& r = m_request_queue.back(); - picker.abort_download(r); - m_request_queue.pop_back(); - } - - m_assume_fifo = true; - - request_a_block(*t, *this); - send_block_requests(); - } + snub_peer(); } // If the client sends more data @@ -2802,6 +2763,71 @@ namespace libtorrent fill_send_buffer(); } + void peer_connection::snub_peer() + { + boost::shared_ptr t = m_torrent.lock(); + TORRENT_ASSERT(t); + + if (!m_snubbed) + { + m_snubbed = true; + if (m_ses.m_alerts.should_post()) + { + m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle() + , m_remote, m_peer_id)); + } + } + m_desired_queue_size = 1; + + if (!t->has_picker()) return; + piece_picker& picker = t->picker(); + + piece_block r(-1, -1); + // time out the last request in the queue + if (!m_request_queue.empty()) + { + r = m_request_queue.back(); + m_request_queue.pop_back(); + } + else + { + TORRENT_ASSERT(!m_download_queue.empty()); + r = m_download_queue.back().block; + + // only time out a request if it blocks the piece + // from being completed (i.e. no free blocks to + // request from it) + piece_picker::downloading_piece p; + picker.piece_info(r.piece_index, p); + int free_blocks = picker.blocks_in_piece(r.piece_index) + - p.finished - p.writing - p.requested; + if (free_blocks > 0) + { + m_timeout_extend += m_ses.settings().request_timeout; + return; + } + + if (m_ses.m_alerts.should_post()) + { + m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle() + , r.block_index, r.piece_index)); + } + m_download_queue.pop_back(); + } + if (!m_download_queue.empty() || !m_request_queue.empty()) + m_timeout_extend += m_ses.settings().request_timeout; + + request_a_block(*t, *this); + send_block_requests(); + + // abort the block after the new one has + // been requested in order to prevent it from + // picking the same block again, stalling the + // same piece indefinitely. + if (r != piece_block(-1, -1)) + picker.abort_download(r); + } + void peer_connection::fill_send_buffer() { INVARIANT_CHECK; @@ -3530,7 +3556,8 @@ namespace libtorrent TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0); std::set unique; - std::copy(m_download_queue.begin(), m_download_queue.end(), std::inserter(unique, unique.begin())); + std::transform(m_download_queue.begin(), m_download_queue.end() + , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1)); std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin())); TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size()); if (m_peer_info) @@ -3565,9 +3592,9 @@ namespace libtorrent for (std::deque::const_iterator i = p.request_queue().begin() , end(p.request_queue().end()); i != end; ++i) ++num_requests[*i]; - for (std::deque::const_iterator i = p.download_queue().begin() + for (std::deque::const_iterator i = p.download_queue().begin() , end(p.download_queue().end()); i != end; ++i) - ++num_requests[*i]; + ++num_requests[i->block]; } for (std::map::iterator i = num_requests.begin() , end(num_requests.end()); i != end; ++i) diff --git a/src/policy.cpp b/src/policy.cpp index bb75e0994..944585a76 100644 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -271,7 +271,7 @@ namespace libtorrent (*c.m_logger) << time_now_string() << " PIECE_PICKER [ php: " << prefer_whole_pieces << " picked: " << interesting_pieces.size() << " ]\n"; #endif - std::deque const& dq = c.download_queue(); + std::deque const& dq = c.download_queue(); std::deque const& rq = c.request_queue(); for (std::vector::iterator i = interesting_pieces.begin(); i != interesting_pieces.end(); ++i) @@ -282,7 +282,7 @@ namespace libtorrent { if (num_requests <= 0) break; // don't request pieces we already have in our request queue - if (std::find(dq.begin(), dq.end(), *i) != dq.end() + if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end() || std::find(rq.begin(), rq.end(), *i) != rq.end()) continue; @@ -539,22 +539,6 @@ namespace libtorrent } } - int policy::count_choked() const - { - int ret = 0; - for (const_iterator i = m_peers.begin(); - i != m_peers.end(); ++i) - { - if (!i->second.connection - || i->second.connection->is_connecting() - || i->second.connection->is_disconnecting() - || !i->second.connection->is_peer_interested()) - continue; - if (i->second.connection->is_choked()) ++ret; - } - return ret; - } - bool policy::new_connection(peer_connection& c) { TORRENT_ASSERT(!c.is_local()); diff --git a/src/torrent.cpp b/src/torrent.cpp index ef0e93de4..b9dea84f6 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -1474,13 +1474,13 @@ namespace libtorrent i != m_connections.end(); ++i) { peer_connection* p = *i; - std::deque const& dq = p->download_queue(); + std::deque const& dq = p->download_queue(); std::deque const& rq = p->request_queue(); - for (std::deque::const_iterator k = dq.begin() + for (std::deque::const_iterator k = dq.begin() , end(dq.end()); k != end; ++k) { - if (k->piece_index != index) continue; - m_picker->mark_as_downloading(*k, p->peer_info_struct() + if (k->block.piece_index != index) continue; + m_picker->mark_as_downloading(k->block, p->peer_info_struct() , (piece_picker::piece_state_t)p->peer_speed()); } for (std::deque::const_iterator k = rq.begin() @@ -3330,9 +3330,9 @@ namespace libtorrent for (std::deque::const_iterator i = p.request_queue().begin() , end(p.request_queue().end()); i != end; ++i) ++num_requests[*i]; - for (std::deque::const_iterator i = p.download_queue().begin() + for (std::deque::const_iterator i = p.download_queue().begin() , end(p.download_queue().end()); i != end; ++i) - ++num_requests[*i]; + ++num_requests[i->block]; if (!p.is_choked()) ++num_uploads; torrent* associated_torrent = p.associated_torrent().lock().get(); if (associated_torrent != this)