From cdf459dad579aa533756ede1f3f34ab0df5765ea Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 25 Dec 2009 16:52:57 +0000 Subject: [PATCH] only keep one outstanding duplicate request per peer --- ChangeLog | 2 + include/libtorrent/peer_connection.hpp | 16 +++-- src/peer_connection.cpp | 96 ++++++++++++++++---------- src/policy.cpp | 10 +-- src/torrent.cpp | 21 +++--- 5 files changed, 89 insertions(+), 56 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7905db013..12ef9cec3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -84,6 +84,8 @@ * support min_interval tracker extension * added session saving and loading functions * added support for min-interval in tracker responses + * only keeps one outstanding duplicate request per peer + reduces waste download, specifically when streaming release 0.14.8 diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 73b9b18af..9322c5c84 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -97,7 +97,8 @@ namespace libtorrent struct pending_block { pending_block(piece_block const& b) - : skipped(0), not_wanted(false), timed_out(false), block(b) {} + : skipped(0), not_wanted(false), timed_out(false) + , busy(false), block(b) {} // the number of times the request // has been skipped by out of order blocks @@ -111,6 +112,12 @@ namespace libtorrent // unexpectedly from the peer bool not_wanted:1; bool timed_out:1; + + // the busy flag is set if the block was + // requested from another peer when this + // request was queued. We only allow a single + // busy request at a time in each peer's queue + bool busy:1; piece_block block; @@ -275,7 +282,7 @@ namespace libtorrent bool has_piece(int i) const; std::vector const& download_queue() const; - std::vector const& request_queue() const; + std::vector const& request_queue() const; std::vector const& upload_queue() const; // estimate of how long it will take until we have @@ -440,7 +447,8 @@ namespace libtorrent // adds a block to the request queue // returns true if successful, false otherwise - bool add_request(piece_block const& b, bool time_critical = false); + enum flags_t { req_time_critical = 1, req_busy = 2 }; + bool add_request(piece_block const& b, int flags = 0); // clears the request queue and sends cancels for all messages // in the download queue @@ -766,7 +774,7 @@ namespace libtorrent // the blocks we have reserved in the piece // picker and will request from this peer. - std::vector m_request_queue; + std::vector m_request_queue; // the queue of blocks we have requested // from this peer diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 2d0ba1f57..3ebf25b56 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -831,7 +831,7 @@ namespace libtorrent return m_have_piece[i]; } - std::vector const& peer_connection::request_queue() const + std::vector const& peer_connection::request_queue() const { return m_request_queue; } @@ -1088,10 +1088,10 @@ namespace libtorrent if (!t->is_seed()) { piece_picker& p = t->picker(); - for (std::vector::const_iterator i = m_request_queue.begin() + for (std::vector::const_iterator i = m_request_queue.begin() , end(m_request_queue.end()); i != end; ++i) { - p.abort_download(*i); + p.abort_download(i->block); } } m_request_queue.clear(); @@ -1140,7 +1140,7 @@ namespace libtorrent if (i != m_download_queue.end()) { - piece_block b = i->block; + pending_block b = *i; bool remove_from_picker = !i->timed_out && !i->not_wanted; m_download_queue.erase(i); TORRENT_ASSERT(m_outstanding_bytes >= r.length); @@ -1158,7 +1158,7 @@ namespace libtorrent else if (!t->is_seed() && remove_from_picker) { piece_picker& p = t->picker(); - p.abort_download(b); + p.abort_download(b.block); } #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG check_invariant(); @@ -1941,10 +1941,10 @@ namespace libtorrent // assume our outstanding bytes includes this piece too if (!in_req_queue) { - for (std::vector::iterator i = m_request_queue.begin() + for (std::vector::iterator i = m_request_queue.begin() , end(m_request_queue.end()); i != end; ++i) { - if (*i != b) continue; + if (i->block != b) continue; in_req_queue = true; m_request_queue.erase(i); break; @@ -2573,18 +2573,18 @@ namespace libtorrent void peer_connection::make_time_critical(piece_block const& block) { - std::vector::iterator rit = std::find(m_request_queue.begin() - , m_request_queue.end(), block); + std::vector::iterator rit = std::find_if(m_request_queue.begin() + , m_request_queue.end(), has_block(block)); if (rit == m_request_queue.end()) return; // ignore it if it's already time critical if (rit - m_request_queue.begin() < m_queued_time_critical) return; - piece_block b = *rit; + pending_block b = *rit; m_request_queue.erase(rit); m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b); ++m_queued_time_critical; } - bool peer_connection::add_request(piece_block const& block, bool time_critical) + bool peer_connection::add_request(piece_block const& block, int flags) { INVARIANT_CHECK; @@ -2624,6 +2624,24 @@ namespace libtorrent state = piece_picker::slow; } + if (flags & req_busy) + { + // this block is busy (i.e. it has been requested + // from another peer already). Only allow one busy + // request in the pipeline at the time + for (std::vector::const_iterator i = m_download_queue.begin() + , end(m_download_queue.end()); i != end; ++i) + { + if (i->busy) return false; + } + + for (std::vector::const_iterator i = m_request_queue.begin() + , end(m_request_queue.end()); i != end; ++i) + { + if (i->busy) return false; + } + } + if (!t->picker().mark_as_downloading(block, peer_info_struct(), state)) return false; @@ -2633,15 +2651,17 @@ namespace libtorrent remote(), pid(), speedmsg, block.block_index, block.piece_index)); } - if (time_critical) + pending_block pb(block); + pb.busy = flags & req_busy; + if (flags & req_time_critical) { m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical - , block); + , pb); ++m_queued_time_critical; } else { - m_request_queue.push_back(block); + m_request_queue.push_back(pb); } return true; } @@ -2662,7 +2682,7 @@ namespace libtorrent while (!m_request_queue.empty()) { - t->picker().abort_download(m_request_queue.back()); + t->picker().abort_download(m_request_queue.back().block); m_request_queue.pop_back(); } m_queued_time_critical = 0; @@ -2724,8 +2744,8 @@ namespace libtorrent = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block)); if (it == m_download_queue.end()) { - std::vector::iterator rit = std::find(m_request_queue.begin() - , m_request_queue.end(), block); + std::vector::iterator rit = std::find_if(m_request_queue.begin() + , m_request_queue.end(), has_block(block)); // when a multi block is received, it is cancelled // from all peers, so if this one hasn't requested @@ -2877,16 +2897,16 @@ namespace libtorrent && ((int)m_download_queue.size() < m_desired_queue_size || m_queued_time_critical > 0)) { - piece_block block = m_request_queue.front(); + pending_block block = m_request_queue.front(); - int block_offset = block.block_index * t->block_size(); + int block_offset = block.block.block_index * t->block_size(); int block_size = (std::min)(t->torrent_file().piece_size( - block.piece_index) - block_offset, t->block_size()); + block.block.piece_index) - block_offset, t->block_size()); TORRENT_ASSERT(block_size > 0); TORRENT_ASSERT(block_size <= t->block_size()); peer_request r; - r.piece = block.piece_index; + r.piece = block.block.piece_index; r.start = block_offset; r.length = block_size; @@ -2895,10 +2915,11 @@ namespace libtorrent if (t->is_seed()) continue; // this can happen if a block times out, is re-requested and // then arrives "unexpectedly" - if (t->picker().is_finished(block) || t->picker().is_downloaded(block)) + if (t->picker().is_finished(block.block) + || t->picker().is_downloaded(block.block)) continue; - TORRENT_ASSERT(verify_piece(t->to_req(block))); + TORRENT_ASSERT(verify_piece(t->to_req(block.block))); m_download_queue.push_back(block); m_outstanding_bytes += block_size; #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG @@ -2922,26 +2943,26 @@ namespace libtorrent { // check to see if this block is connected to the previous one // if it is, merge them, otherwise, break this merge loop - piece_block const& front = m_request_queue.front(); - if (front.piece_index * blocks_per_piece + front.block_index - != block.piece_index * blocks_per_piece + block.block_index + 1) + pending_block const& front = m_request_queue.front(); + if (front.block.piece_index * blocks_per_piece + front.block.block_index + != block.block.piece_index * blocks_per_piece + block.block.block_index + 1) break; block = m_request_queue.front(); m_request_queue.erase(m_request_queue.begin()); - TORRENT_ASSERT(verify_piece(t->to_req(block))); + TORRENT_ASSERT(verify_piece(t->to_req(block.block))); m_download_queue.push_back(block); if (m_queued_time_critical) --m_queued_time_critical; #ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << time_now_string() << " *** MERGING REQUEST ** [ " - "piece: " << block.piece_index << " | " - "block: " << block.block_index << " ]\n"; + "piece: " << block.block.piece_index << " | " + "block: " << block.block.block_index << " ]\n"; #endif - block_offset = block.block_index * t->block_size(); + block_offset = block.block.block_index * t->block_size(); block_size = (std::min)(t->torrent_file().piece_size( - block.piece_index) - block_offset, t->block_size()); + block.block.piece_index) - block_offset, t->block_size()); TORRENT_ASSERT(block_size > 0); TORRENT_ASSERT(block_size <= t->block_size()); @@ -3076,7 +3097,7 @@ namespace libtorrent } while (!m_request_queue.empty()) { - picker.abort_download(m_request_queue.back()); + picker.abort_download(m_request_queue.back().block); m_request_queue.pop_back(); } } @@ -3744,9 +3765,9 @@ namespace libtorrent // time out the last request in the queue if (prev_request_queue > 0) { - std::vector::iterator i + std::vector::iterator i = m_request_queue.begin() + (prev_request_queue - 1); - r = *i; + r = i->block; m_request_queue.erase(i); if (prev_request_queue <= m_queued_time_critical) --m_queued_time_critical; @@ -4764,7 +4785,8 @@ namespace libtorrent std::set unique; std::transform(m_download_queue.begin(), m_download_queue.end() , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1)); - std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin())); + std::transform(m_request_queue.begin(), m_request_queue.end() + , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1)); TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size()); if (m_peer_info) { @@ -4825,9 +4847,9 @@ namespace libtorrent TORRENT_ASSERT(m_ses.has_peer(*i)); #endif peer_connection const& p = *(*i); - for (std::vector::const_iterator i = p.request_queue().begin() + for (std::vector::const_iterator i = p.request_queue().begin() , end(p.request_queue().end()); i != end; ++i) - ++num_requests[*i]; + ++num_requests[i->block]; for (std::vector::const_iterator i = p.download_queue().begin() , end(p.download_queue().end()); i != end; ++i) if (!i->not_wanted && !i->timed_out) ++num_requests[i->block]; diff --git a/src/policy.cpp b/src/policy.cpp index 9c0c5aa78..3aa6c725a 100644 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -221,7 +221,7 @@ namespace libtorrent << " picked: " << interesting_pieces.size() << " ]\n"; #endif std::vector const& dq = c.download_queue(); - std::vector const& rq = c.request_queue(); + std::vector const& rq = c.request_queue(); for (std::vector::iterator i = interesting_pieces.begin(); i != interesting_pieces.end(); ++i) { @@ -232,7 +232,7 @@ namespace libtorrent if (num_requests <= 0) break; // don't request pieces we already have in our request queue if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end() - || std::find(rq.begin(), rq.end(), *i) != rq.end()) + || std::find_if(rq.begin(), rq.end(), has_block(*i)) != rq.end()) continue; TORRENT_ASSERT(p.num_peers(*i) > 0); @@ -244,13 +244,13 @@ namespace libtorrent // don't request pieces we already have in our request queue if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end() - || std::find(rq.begin(), rq.end(), *i) != rq.end()) + || std::find_if(rq.begin(), rq.end(), has_block(*i)) != rq.end()) continue; // ok, we found a piece that's not being downloaded // by somebody else. request it from this peer // and return - if (!c.add_request(*i)) continue; + if (!c.add_request(*i, 0)) continue; TORRENT_ASSERT(p.num_peers(*i) == 1); TORRENT_ASSERT(p.is_requested(*i)); num_requests--; @@ -277,7 +277,7 @@ namespace libtorrent #endif TORRENT_ASSERT(p.is_requested(*i)); TORRENT_ASSERT(p.num_peers(*i) > 0); - c.add_request(*i); + c.add_request(*i, peer_connection::req_busy); } policy::policy(torrent* t) diff --git a/src/torrent.cpp b/src/torrent.cpp index 7490ec9d9..2807d16bc 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -62,6 +62,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/hasher.hpp" #include "libtorrent/entry.hpp" #include "libtorrent/peer.hpp" +#include "libtorrent/peer_connection.hpp" #include "libtorrent/bt_peer_connection.hpp" #include "libtorrent/web_peer_connection.hpp" #include "libtorrent/http_seed_connection.hpp" @@ -2248,7 +2249,7 @@ namespace libtorrent { peer_connection* p = *i; std::vector const& dq = p->download_queue(); - std::vector const& rq = p->request_queue(); + std::vector const& rq = p->request_queue(); for (std::vector::const_iterator k = dq.begin() , end(dq.end()); k != end; ++k) { @@ -2256,11 +2257,11 @@ namespace libtorrent m_picker->mark_as_downloading(k->block, p->peer_info_struct() , (piece_picker::piece_state_t)p->peer_speed()); } - for (std::vector::const_iterator k = rq.begin() + for (std::vector::const_iterator k = rq.begin() , end(rq.end()); k != end; ++k) { - if (k->piece_index != index) continue; - m_picker->mark_as_downloading(*k, p->peer_info_struct() + if (k->block.piece_index != index) continue; + m_picker->mark_as_downloading(k->block, p->peer_info_struct() , (piece_picker::piece_state_t)p->peer_speed()); } } @@ -4642,9 +4643,9 @@ namespace libtorrent TORRENT_ASSERT(m_ses.has_peer(*i)); #endif peer_connection const& p = *(*i); - for (std::vector::const_iterator i = p.request_queue().begin() + for (std::vector::const_iterator i = p.request_queue().begin() , end(p.request_queue().end()); i != end; ++i) - ++num_requests[*i]; + ++num_requests[i->block]; for (std::vector::const_iterator i = p.download_queue().begin() , end(p.download_queue().end()); i != end; ++i) if (!i->not_wanted && !i->timed_out) ++num_requests[i->block]; @@ -5507,19 +5508,19 @@ namespace libtorrent , backup1, backup2, 1, 0, c.peer_info_struct() , ignore, piece_picker::fast, 0); - std::vector const& rq = c.request_queue(); + std::vector const& rq = c.request_queue(); bool added_request = false; - if (!interesting_blocks.empty() && std::find(rq.begin(), rq.end() - , interesting_blocks.front()) != rq.end()) + if (!interesting_blocks.empty() && std::find_if(rq.begin(), rq.end() + , has_block(interesting_blocks.front())) != rq.end()) { c.make_time_critical(interesting_blocks.front()); added_request = true; } else if (!interesting_blocks.empty()) { - c.add_request(interesting_blocks.front(), true); + c.add_request(interesting_blocks.front(), peer_connection::req_time_critical); added_request = true; }