only keep one outstanding duplicate request per peer

This commit is contained in:
Arvid Norberg 2009-12-25 16:52:57 +00:00
parent ecb770eca7
commit cdf459dad5
5 changed files with 89 additions and 56 deletions

View File

@ -84,6 +84,8 @@
* support min_interval tracker extension * support min_interval tracker extension
* added session saving and loading functions * added session saving and loading functions
* added support for min-interval in tracker responses * added support for min-interval in tracker responses
* only keeps one outstanding duplicate request per peer
reduces waste download, specifically when streaming
release 0.14.8 release 0.14.8

View File

@ -97,7 +97,8 @@ namespace libtorrent
struct pending_block struct pending_block
{ {
pending_block(piece_block const& b) pending_block(piece_block const& b)
: skipped(0), not_wanted(false), timed_out(false), block(b) {} : skipped(0), not_wanted(false), timed_out(false)
, busy(false), block(b) {}
// the number of times the request // the number of times the request
// has been skipped by out of order blocks // has been skipped by out of order blocks
@ -112,6 +113,12 @@ namespace libtorrent
bool not_wanted:1; bool not_wanted:1;
bool timed_out:1; bool timed_out:1;
// the busy flag is set if the block was
// requested from another peer when this
// request was queued. We only allow a single
// busy request at a time in each peer's queue
bool busy:1;
piece_block block; piece_block block;
bool operator==(pending_block const& b) bool operator==(pending_block const& b)
@ -275,7 +282,7 @@ namespace libtorrent
bool has_piece(int i) const; bool has_piece(int i) const;
std::vector<pending_block> const& download_queue() const; std::vector<pending_block> const& download_queue() const;
std::vector<piece_block> const& request_queue() const; std::vector<pending_block> const& request_queue() const;
std::vector<peer_request> const& upload_queue() const; std::vector<peer_request> const& upload_queue() const;
// estimate of how long it will take until we have // estimate of how long it will take until we have
@ -440,7 +447,8 @@ namespace libtorrent
// adds a block to the request queue // adds a block to the request queue
// returns true if successful, false otherwise // returns true if successful, false otherwise
bool add_request(piece_block const& b, bool time_critical = false); enum flags_t { req_time_critical = 1, req_busy = 2 };
bool add_request(piece_block const& b, int flags = 0);
// clears the request queue and sends cancels for all messages // clears the request queue and sends cancels for all messages
// in the download queue // in the download queue
@ -766,7 +774,7 @@ namespace libtorrent
// the blocks we have reserved in the piece // the blocks we have reserved in the piece
// picker and will request from this peer. // picker and will request from this peer.
std::vector<piece_block> m_request_queue; std::vector<pending_block> m_request_queue;
// the queue of blocks we have requested // the queue of blocks we have requested
// from this peer // from this peer

View File

@ -831,7 +831,7 @@ namespace libtorrent
return m_have_piece[i]; return m_have_piece[i];
} }
std::vector<piece_block> const& peer_connection::request_queue() const std::vector<pending_block> const& peer_connection::request_queue() const
{ {
return m_request_queue; return m_request_queue;
} }
@ -1088,10 +1088,10 @@ namespace libtorrent
if (!t->is_seed()) if (!t->is_seed())
{ {
piece_picker& p = t->picker(); piece_picker& p = t->picker();
for (std::vector<piece_block>::const_iterator i = m_request_queue.begin() for (std::vector<pending_block>::const_iterator i = m_request_queue.begin()
, end(m_request_queue.end()); i != end; ++i) , end(m_request_queue.end()); i != end; ++i)
{ {
p.abort_download(*i); p.abort_download(i->block);
} }
} }
m_request_queue.clear(); m_request_queue.clear();
@ -1140,7 +1140,7 @@ namespace libtorrent
if (i != m_download_queue.end()) if (i != m_download_queue.end())
{ {
piece_block b = i->block; pending_block b = *i;
bool remove_from_picker = !i->timed_out && !i->not_wanted; bool remove_from_picker = !i->timed_out && !i->not_wanted;
m_download_queue.erase(i); m_download_queue.erase(i);
TORRENT_ASSERT(m_outstanding_bytes >= r.length); TORRENT_ASSERT(m_outstanding_bytes >= r.length);
@ -1158,7 +1158,7 @@ namespace libtorrent
else if (!t->is_seed() && remove_from_picker) else if (!t->is_seed() && remove_from_picker)
{ {
piece_picker& p = t->picker(); piece_picker& p = t->picker();
p.abort_download(b); p.abort_download(b.block);
} }
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant(); check_invariant();
@ -1941,10 +1941,10 @@ namespace libtorrent
// assume our outstanding bytes includes this piece too // assume our outstanding bytes includes this piece too
if (!in_req_queue) if (!in_req_queue)
{ {
for (std::vector<piece_block>::iterator i = m_request_queue.begin() for (std::vector<pending_block>::iterator i = m_request_queue.begin()
, end(m_request_queue.end()); i != end; ++i) , end(m_request_queue.end()); i != end; ++i)
{ {
if (*i != b) continue; if (i->block != b) continue;
in_req_queue = true; in_req_queue = true;
m_request_queue.erase(i); m_request_queue.erase(i);
break; break;
@ -2573,18 +2573,18 @@ namespace libtorrent
void peer_connection::make_time_critical(piece_block const& block) void peer_connection::make_time_critical(piece_block const& block)
{ {
std::vector<piece_block>::iterator rit = std::find(m_request_queue.begin() std::vector<pending_block>::iterator rit = std::find_if(m_request_queue.begin()
, m_request_queue.end(), block); , m_request_queue.end(), has_block(block));
if (rit == m_request_queue.end()) return; if (rit == m_request_queue.end()) return;
// ignore it if it's already time critical // ignore it if it's already time critical
if (rit - m_request_queue.begin() < m_queued_time_critical) return; if (rit - m_request_queue.begin() < m_queued_time_critical) return;
piece_block b = *rit; pending_block b = *rit;
m_request_queue.erase(rit); m_request_queue.erase(rit);
m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b); m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b);
++m_queued_time_critical; ++m_queued_time_critical;
} }
bool peer_connection::add_request(piece_block const& block, bool time_critical) bool peer_connection::add_request(piece_block const& block, int flags)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -2624,6 +2624,24 @@ namespace libtorrent
state = piece_picker::slow; state = piece_picker::slow;
} }
if (flags & req_busy)
{
// this block is busy (i.e. it has been requested
// from another peer already). Only allow one busy
// request in the pipeline at the time
for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i)
{
if (i->busy) return false;
}
for (std::vector<pending_block>::const_iterator i = m_request_queue.begin()
, end(m_request_queue.end()); i != end; ++i)
{
if (i->busy) return false;
}
}
if (!t->picker().mark_as_downloading(block, peer_info_struct(), state)) if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
return false; return false;
@ -2633,15 +2651,17 @@ namespace libtorrent
remote(), pid(), speedmsg, block.block_index, block.piece_index)); remote(), pid(), speedmsg, block.block_index, block.piece_index));
} }
if (time_critical) pending_block pb(block);
pb.busy = flags & req_busy;
if (flags & req_time_critical)
{ {
m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical
, block); , pb);
++m_queued_time_critical; ++m_queued_time_critical;
} }
else else
{ {
m_request_queue.push_back(block); m_request_queue.push_back(pb);
} }
return true; return true;
} }
@ -2662,7 +2682,7 @@ namespace libtorrent
while (!m_request_queue.empty()) while (!m_request_queue.empty())
{ {
t->picker().abort_download(m_request_queue.back()); t->picker().abort_download(m_request_queue.back().block);
m_request_queue.pop_back(); m_request_queue.pop_back();
} }
m_queued_time_critical = 0; m_queued_time_critical = 0;
@ -2724,8 +2744,8 @@ namespace libtorrent
= std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block)); = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
if (it == m_download_queue.end()) if (it == m_download_queue.end())
{ {
std::vector<piece_block>::iterator rit = std::find(m_request_queue.begin() std::vector<pending_block>::iterator rit = std::find_if(m_request_queue.begin()
, m_request_queue.end(), block); , m_request_queue.end(), has_block(block));
// when a multi block is received, it is cancelled // when a multi block is received, it is cancelled
// from all peers, so if this one hasn't requested // from all peers, so if this one hasn't requested
@ -2877,16 +2897,16 @@ namespace libtorrent
&& ((int)m_download_queue.size() < m_desired_queue_size && ((int)m_download_queue.size() < m_desired_queue_size
|| m_queued_time_critical > 0)) || m_queued_time_critical > 0))
{ {
piece_block block = m_request_queue.front(); pending_block block = m_request_queue.front();
int block_offset = block.block_index * t->block_size(); int block_offset = block.block.block_index * t->block_size();
int block_size = (std::min)(t->torrent_file().piece_size( int block_size = (std::min)(t->torrent_file().piece_size(
block.piece_index) - block_offset, t->block_size()); block.block.piece_index) - block_offset, t->block_size());
TORRENT_ASSERT(block_size > 0); TORRENT_ASSERT(block_size > 0);
TORRENT_ASSERT(block_size <= t->block_size()); TORRENT_ASSERT(block_size <= t->block_size());
peer_request r; peer_request r;
r.piece = block.piece_index; r.piece = block.block.piece_index;
r.start = block_offset; r.start = block_offset;
r.length = block_size; r.length = block_size;
@ -2895,10 +2915,11 @@ namespace libtorrent
if (t->is_seed()) continue; if (t->is_seed()) continue;
// this can happen if a block times out, is re-requested and // this can happen if a block times out, is re-requested and
// then arrives "unexpectedly" // then arrives "unexpectedly"
if (t->picker().is_finished(block) || t->picker().is_downloaded(block)) if (t->picker().is_finished(block.block)
|| t->picker().is_downloaded(block.block))
continue; continue;
TORRENT_ASSERT(verify_piece(t->to_req(block))); TORRENT_ASSERT(verify_piece(t->to_req(block.block)));
m_download_queue.push_back(block); m_download_queue.push_back(block);
m_outstanding_bytes += block_size; m_outstanding_bytes += block_size;
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
@ -2922,26 +2943,26 @@ namespace libtorrent
{ {
// check to see if this block is connected to the previous one // check to see if this block is connected to the previous one
// if it is, merge them, otherwise, break this merge loop // if it is, merge them, otherwise, break this merge loop
piece_block const& front = m_request_queue.front(); pending_block const& front = m_request_queue.front();
if (front.piece_index * blocks_per_piece + front.block_index if (front.block.piece_index * blocks_per_piece + front.block.block_index
!= block.piece_index * blocks_per_piece + block.block_index + 1) != block.block.piece_index * blocks_per_piece + block.block.block_index + 1)
break; break;
block = m_request_queue.front(); block = m_request_queue.front();
m_request_queue.erase(m_request_queue.begin()); m_request_queue.erase(m_request_queue.begin());
TORRENT_ASSERT(verify_piece(t->to_req(block))); TORRENT_ASSERT(verify_piece(t->to_req(block.block)));
m_download_queue.push_back(block); m_download_queue.push_back(block);
if (m_queued_time_critical) --m_queued_time_critical; if (m_queued_time_critical) --m_queued_time_critical;
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() (*m_logger) << time_now_string()
<< " *** MERGING REQUEST ** [ " << " *** MERGING REQUEST ** [ "
"piece: " << block.piece_index << " | " "piece: " << block.block.piece_index << " | "
"block: " << block.block_index << " ]\n"; "block: " << block.block.block_index << " ]\n";
#endif #endif
block_offset = block.block_index * t->block_size(); block_offset = block.block.block_index * t->block_size();
block_size = (std::min)(t->torrent_file().piece_size( block_size = (std::min)(t->torrent_file().piece_size(
block.piece_index) - block_offset, t->block_size()); block.block.piece_index) - block_offset, t->block_size());
TORRENT_ASSERT(block_size > 0); TORRENT_ASSERT(block_size > 0);
TORRENT_ASSERT(block_size <= t->block_size()); TORRENT_ASSERT(block_size <= t->block_size());
@ -3076,7 +3097,7 @@ namespace libtorrent
} }
while (!m_request_queue.empty()) while (!m_request_queue.empty())
{ {
picker.abort_download(m_request_queue.back()); picker.abort_download(m_request_queue.back().block);
m_request_queue.pop_back(); m_request_queue.pop_back();
} }
} }
@ -3744,9 +3765,9 @@ namespace libtorrent
// time out the last request in the queue // time out the last request in the queue
if (prev_request_queue > 0) if (prev_request_queue > 0)
{ {
std::vector<piece_block>::iterator i std::vector<pending_block>::iterator i
= m_request_queue.begin() + (prev_request_queue - 1); = m_request_queue.begin() + (prev_request_queue - 1);
r = *i; r = i->block;
m_request_queue.erase(i); m_request_queue.erase(i);
if (prev_request_queue <= m_queued_time_critical) if (prev_request_queue <= m_queued_time_critical)
--m_queued_time_critical; --m_queued_time_critical;
@ -4764,7 +4785,8 @@ namespace libtorrent
std::set<piece_block> unique; std::set<piece_block> unique;
std::transform(m_download_queue.begin(), m_download_queue.end() std::transform(m_download_queue.begin(), m_download_queue.end()
, std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1)); , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin())); std::transform(m_request_queue.begin(), m_request_queue.end()
, std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size()); TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
if (m_peer_info) if (m_peer_info)
{ {
@ -4825,9 +4847,9 @@ namespace libtorrent
TORRENT_ASSERT(m_ses.has_peer(*i)); TORRENT_ASSERT(m_ses.has_peer(*i));
#endif #endif
peer_connection const& p = *(*i); peer_connection const& p = *(*i);
for (std::vector<piece_block>::const_iterator i = p.request_queue().begin() for (std::vector<pending_block>::const_iterator i = p.request_queue().begin()
, end(p.request_queue().end()); i != end; ++i) , end(p.request_queue().end()); i != end; ++i)
++num_requests[*i]; ++num_requests[i->block];
for (std::vector<pending_block>::const_iterator i = p.download_queue().begin() for (std::vector<pending_block>::const_iterator i = p.download_queue().begin()
, end(p.download_queue().end()); i != end; ++i) , end(p.download_queue().end()); i != end; ++i)
if (!i->not_wanted && !i->timed_out) ++num_requests[i->block]; if (!i->not_wanted && !i->timed_out) ++num_requests[i->block];

View File

@ -221,7 +221,7 @@ namespace libtorrent
<< " picked: " << interesting_pieces.size() << " ]\n"; << " picked: " << interesting_pieces.size() << " ]\n";
#endif #endif
std::vector<pending_block> const& dq = c.download_queue(); std::vector<pending_block> const& dq = c.download_queue();
std::vector<piece_block> const& rq = c.request_queue(); std::vector<pending_block> const& rq = c.request_queue();
for (std::vector<piece_block>::iterator i = interesting_pieces.begin(); for (std::vector<piece_block>::iterator i = interesting_pieces.begin();
i != interesting_pieces.end(); ++i) i != interesting_pieces.end(); ++i)
{ {
@ -232,7 +232,7 @@ namespace libtorrent
if (num_requests <= 0) break; if (num_requests <= 0) break;
// don't request pieces we already have in our request queue // don't request pieces we already have in our request queue
if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end() if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end()
|| std::find(rq.begin(), rq.end(), *i) != rq.end()) || std::find_if(rq.begin(), rq.end(), has_block(*i)) != rq.end())
continue; continue;
TORRENT_ASSERT(p.num_peers(*i) > 0); TORRENT_ASSERT(p.num_peers(*i) > 0);
@ -244,13 +244,13 @@ namespace libtorrent
// don't request pieces we already have in our request queue // don't request pieces we already have in our request queue
if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end() if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end()
|| std::find(rq.begin(), rq.end(), *i) != rq.end()) || std::find_if(rq.begin(), rq.end(), has_block(*i)) != rq.end())
continue; continue;
// ok, we found a piece that's not being downloaded // ok, we found a piece that's not being downloaded
// by somebody else. request it from this peer // by somebody else. request it from this peer
// and return // and return
if (!c.add_request(*i)) continue; if (!c.add_request(*i, 0)) continue;
TORRENT_ASSERT(p.num_peers(*i) == 1); TORRENT_ASSERT(p.num_peers(*i) == 1);
TORRENT_ASSERT(p.is_requested(*i)); TORRENT_ASSERT(p.is_requested(*i));
num_requests--; num_requests--;
@ -277,7 +277,7 @@ namespace libtorrent
#endif #endif
TORRENT_ASSERT(p.is_requested(*i)); TORRENT_ASSERT(p.is_requested(*i));
TORRENT_ASSERT(p.num_peers(*i) > 0); TORRENT_ASSERT(p.num_peers(*i) > 0);
c.add_request(*i); c.add_request(*i, peer_connection::req_busy);
} }
policy::policy(torrent* t) policy::policy(torrent* t)

View File

@ -62,6 +62,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/hasher.hpp" #include "libtorrent/hasher.hpp"
#include "libtorrent/entry.hpp" #include "libtorrent/entry.hpp"
#include "libtorrent/peer.hpp" #include "libtorrent/peer.hpp"
#include "libtorrent/peer_connection.hpp"
#include "libtorrent/bt_peer_connection.hpp" #include "libtorrent/bt_peer_connection.hpp"
#include "libtorrent/web_peer_connection.hpp" #include "libtorrent/web_peer_connection.hpp"
#include "libtorrent/http_seed_connection.hpp" #include "libtorrent/http_seed_connection.hpp"
@ -2248,7 +2249,7 @@ namespace libtorrent
{ {
peer_connection* p = *i; peer_connection* p = *i;
std::vector<pending_block> const& dq = p->download_queue(); std::vector<pending_block> const& dq = p->download_queue();
std::vector<piece_block> const& rq = p->request_queue(); std::vector<pending_block> const& rq = p->request_queue();
for (std::vector<pending_block>::const_iterator k = dq.begin() for (std::vector<pending_block>::const_iterator k = dq.begin()
, end(dq.end()); k != end; ++k) , end(dq.end()); k != end; ++k)
{ {
@ -2256,11 +2257,11 @@ namespace libtorrent
m_picker->mark_as_downloading(k->block, p->peer_info_struct() m_picker->mark_as_downloading(k->block, p->peer_info_struct()
, (piece_picker::piece_state_t)p->peer_speed()); , (piece_picker::piece_state_t)p->peer_speed());
} }
for (std::vector<piece_block>::const_iterator k = rq.begin() for (std::vector<pending_block>::const_iterator k = rq.begin()
, end(rq.end()); k != end; ++k) , end(rq.end()); k != end; ++k)
{ {
if (k->piece_index != index) continue; if (k->block.piece_index != index) continue;
m_picker->mark_as_downloading(*k, p->peer_info_struct() m_picker->mark_as_downloading(k->block, p->peer_info_struct()
, (piece_picker::piece_state_t)p->peer_speed()); , (piece_picker::piece_state_t)p->peer_speed());
} }
} }
@ -4642,9 +4643,9 @@ namespace libtorrent
TORRENT_ASSERT(m_ses.has_peer(*i)); TORRENT_ASSERT(m_ses.has_peer(*i));
#endif #endif
peer_connection const& p = *(*i); peer_connection const& p = *(*i);
for (std::vector<piece_block>::const_iterator i = p.request_queue().begin() for (std::vector<pending_block>::const_iterator i = p.request_queue().begin()
, end(p.request_queue().end()); i != end; ++i) , end(p.request_queue().end()); i != end; ++i)
++num_requests[*i]; ++num_requests[i->block];
for (std::vector<pending_block>::const_iterator i = p.download_queue().begin() for (std::vector<pending_block>::const_iterator i = p.download_queue().begin()
, end(p.download_queue().end()); i != end; ++i) , end(p.download_queue().end()); i != end; ++i)
if (!i->not_wanted && !i->timed_out) ++num_requests[i->block]; if (!i->not_wanted && !i->timed_out) ++num_requests[i->block];
@ -5507,19 +5508,19 @@ namespace libtorrent
, backup1, backup2, 1, 0, c.peer_info_struct() , backup1, backup2, 1, 0, c.peer_info_struct()
, ignore, piece_picker::fast, 0); , ignore, piece_picker::fast, 0);
std::vector<piece_block> const& rq = c.request_queue(); std::vector<pending_block> const& rq = c.request_queue();
bool added_request = false; bool added_request = false;
if (!interesting_blocks.empty() && std::find(rq.begin(), rq.end() if (!interesting_blocks.empty() && std::find_if(rq.begin(), rq.end()
, interesting_blocks.front()) != rq.end()) , has_block(interesting_blocks.front())) != rq.end())
{ {
c.make_time_critical(interesting_blocks.front()); c.make_time_critical(interesting_blocks.front());
added_request = true; added_request = true;
} }
else if (!interesting_blocks.empty()) else if (!interesting_blocks.empty())
{ {
c.add_request(interesting_blocks.front(), true); c.add_request(interesting_blocks.front(), peer_connection::req_time_critical);
added_request = true; added_request = true;
} }