improve piece_deadline/streaming

This commit is contained in:
Arvid Norberg 2014-04-22 04:21:14 +00:00
parent 4c7e24e9c6
commit 3192e59a1a
8 changed files with 620 additions and 178 deletions

View File

@ -1,3 +1,4 @@
* improve piece_deadline/streaming
* honor pieces with priority 7 in sequential download mode * honor pieces with priority 7 in sequential download mode
* simplified building python bindings * simplified building python bindings
* make ignore_non_routers more forgiving in the case there are no UPnP * make ignore_non_routers more forgiving in the case there are no UPnP

View File

@ -510,7 +510,9 @@ namespace libtorrent
bool can_request_time_critical() const; bool can_request_time_critical() const;
void make_time_critical(piece_block const& block); // returns true if the specified block was actually made time-critical.
// if the block was already time-critical, it returns false.
bool make_time_critical(piece_block const& block);
// adds a block to the request queue // adds a block to the request queue
// returns true if successful, false otherwise // returns true if successful, false otherwise

View File

@ -151,7 +151,11 @@ namespace libtorrent
// have affinity to pieces with the same speed category // have affinity to pieces with the same speed category
speed_affinity = 32, speed_affinity = 32,
// ignore the prefer_whole_pieces parameter // ignore the prefer_whole_pieces parameter
ignore_whole_pieces = 64 ignore_whole_pieces = 64,
// treat pieces with priority 6 and below as filtered
// to trigger end-game mode until all prio 7 pieces are
// completed
time_critical_mode = 128
}; };
struct downloading_piece struct downloading_piece

View File

@ -80,6 +80,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_connection.hpp" #include "libtorrent/peer_connection.hpp"
#endif #endif
//#define TORRENT_DEBUG_STREAMING 1
namespace libtorrent namespace libtorrent
{ {
class http_parser; class http_parser;
@ -104,6 +106,29 @@ namespace libtorrent
struct piece_checker_data; struct piece_checker_data;
} }
struct time_critical_piece
{
// when this piece was first requested
ptime first_requested;
// when this piece was last requested
ptime last_requested;
// by what time we want this piece
ptime deadline;
// 1 = send alert with piece data when available
int flags;
// how many peers it's been requested from
int peers;
// the piece index
int piece;
#ifdef TORRENT_DEBUG_STREAMING
// the number of multiple requests are allowed
// to blocks still not downloaded (debugging only)
int timed_out;
#endif
bool operator<(time_critical_piece const& rhs) const
{ return deadline < rhs.deadline; }
};
// a torrent is a class that holds information // a torrent is a class that holds information
// for a specific download. It updates itself against // for a specific download. It updates itself against
// the tracker // the tracker
@ -874,6 +899,9 @@ namespace libtorrent
boost::asio::ssl::context* ssl_ctx() const { return m_ssl_ctx.get(); } boost::asio::ssl::context* ssl_ctx() const { return m_ssl_ctx.get(); }
#endif #endif
int num_time_critical_pieces() const
{ return m_time_critical_pieces.size(); }
private: private:
void on_files_deleted(int ret, disk_io_job const& j); void on_files_deleted(int ret, disk_io_job const& j);
@ -1013,24 +1041,6 @@ namespace libtorrent
std::vector<announce_entry> m_trackers; std::vector<announce_entry> m_trackers;
// this is an index into m_trackers // this is an index into m_trackers
struct time_critical_piece
{
// when this piece was first requested
ptime first_requested;
// when this piece was last requested
ptime last_requested;
// by what time we want this piece
ptime deadline;
// 1 = send alert with piece data when available
int flags;
// how many peers it's been requested from
int peers;
// the piece index
int piece;
bool operator<(time_critical_piece const& rhs) const
{ return deadline < rhs.deadline; }
};
// this list is sorted by time_critical_piece::deadline // this list is sorted by time_critical_piece::deadline
std::deque<time_critical_piece> m_time_critical_pieces; std::deque<time_critical_piece> m_time_critical_pieces;

View File

@ -838,6 +838,11 @@ namespace libtorrent
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
if (!t) return 0; if (!t) return 0;
if (t->num_time_critical_pieces() > 0)
{
ret |= piece_picker::time_critical_mode;
}
if (t->is_sequential_download()) if (t->is_sequential_download())
{ {
ret |= piece_picker::sequential; ret |= piece_picker::sequential;
@ -963,12 +968,44 @@ namespace libtorrent
time_duration peer_connection::download_queue_time(int extra_bytes) const time_duration peer_connection::download_queue_time(int extra_bytes) const
{ {
int rate = m_statistics.transfer_rate(stat::download_payload)
+ m_statistics.transfer_rate(stat::download_protocol);
// avoid division by zero
if (rate < 50) rate = 50;
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
int rate = 0;
// if we haven't received any data recently, the current download rate
// is not representative
if (time_now() - m_last_piece > seconds(30) && m_download_rate_peak > 0)
{
rate = m_download_rate_peak;
}
else if (time_now() - m_last_unchoked < seconds(5)
&& m_statistics.total_payload_upload() < 2 * 0x4000)
{
// if we're have only been unchoked for a short period of time,
// we don't know what rate we can get from this peer. Instead of assuming
// the lowest possible rate, assume the average.
// TODO: this should only be peers we're trying to download from
int peers_with_requests = m_ses.num_connections();
// avoid division by 0
if (peers_with_requests == 0) peers_with_requests = 1;
rate = m_ses.m_stat.transfer_rate(stat::download_payload) / peers_with_requests;
}
else
{
// current download rate in bytes per seconds
rate = m_statistics.transfer_rate(stat::download_payload)
+ m_statistics.transfer_rate(stat::download_protocol);
}
// avoid division by zero
if (rate < 50) rate = 50;
// average of current rate and peak
// rate = (rate + m_download_rate_peak) / 2;
return seconds((m_outstanding_bytes + m_queued_time_critical * t->block_size()) / rate); return seconds((m_outstanding_bytes + m_queued_time_critical * t->block_size()) / rate);
} }
@ -2891,17 +2928,17 @@ namespace libtorrent
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
if (t->upload_mode()) return false; if (t->upload_mode()) return false;
// ignore snubbed peers, since they're not likely to return pieces in a timely // ignore snubbed peers, since they're not likely to return pieces in a
// manner anyway // timely manner anyway
if (m_snubbed) return false; if (m_snubbed) return false;
return true; return true;
} }
void peer_connection::make_time_critical(piece_block const& block) bool peer_connection::make_time_critical(piece_block const& block)
{ {
std::vector<pending_block>::iterator rit = std::find_if(m_request_queue.begin() std::vector<pending_block>::iterator rit = std::find_if(m_request_queue.begin()
, m_request_queue.end(), has_block(block)); , m_request_queue.end(), has_block(block));
if (rit == m_request_queue.end()) return; if (rit == m_request_queue.end()) return false;
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
@ -2909,11 +2946,12 @@ namespace libtorrent
TORRENT_ASSERT(t->picker().is_requested(block)); TORRENT_ASSERT(t->picker().is_requested(block));
#endif #endif
// ignore it if it's already time critical // ignore it if it's already time critical
if (rit - m_request_queue.begin() < m_queued_time_critical) return; if (rit - m_request_queue.begin() < m_queued_time_critical) return false;
pending_block b = *rit; pending_block b = *rit;
m_request_queue.erase(rit); m_request_queue.erase(rit);
m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b); m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b);
++m_queued_time_critical; ++m_queued_time_critical;
return true;
} }
bool peer_connection::add_request(piece_block const& block, int flags) bool peer_connection::add_request(piece_block const& block, int flags)
@ -2959,11 +2997,14 @@ namespace libtorrent
state = piece_picker::slow; state = piece_picker::slow;
} }
if (flags & req_busy) if ((flags & req_busy) && !(flags & req_time_critical))
{ {
// this block is busy (i.e. it has been requested // this block is busy (i.e. it has been requested
// from another peer already). Only allow one busy // from another peer already). Only allow one busy
// request in the pipeline at the time // request in the pipeline at the time
// this rule does not apply to time critical pieces,
// in which case we are allowed to pick more than one
// busy blocks
for (std::vector<pending_block>::const_iterator i = m_download_queue.begin() for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i) , end(m_download_queue.end()); i != end; ++i)
{ {
@ -4045,7 +4086,7 @@ namespace libtorrent
return; return;
} }
int download_rate = statistics().download_rate(); int download_rate = statistics().download_payload_rate();
// calculate the desired download queue size // calculate the desired download queue size
const int queue_time = m_ses.settings().request_queue_time; const int queue_time = m_ses.settings().request_queue_time;

View File

@ -153,7 +153,6 @@ namespace libtorrent
std::vector<downloading_piece>::const_iterator piece = find_dl_piece(index); std::vector<downloading_piece>::const_iterator piece = find_dl_piece(index);
TORRENT_ASSERT(piece != m_downloads.end()); TORRENT_ASSERT(piece != m_downloads.end());
st = *piece; st = *piece;
st.info = 0;
return; return;
} }
st.info = 0; st.info = 0;
@ -1547,6 +1546,10 @@ namespace libtorrent
for (std::vector<downloading_piece>::const_iterator i = m_downloads.begin() for (std::vector<downloading_piece>::const_iterator i = m_downloads.begin()
, end(m_downloads.end()); i != end; ++i) , end(m_downloads.end()); i != end; ++i)
{ {
// in time critical mode, only pick prio 7 pieces
if ((options & time_critical_mode) && piece_priority(i->index) != 7)
continue;
if (!is_piece_free(i->index, pieces)) continue; if (!is_piece_free(i->index, pieces)) continue;
if (m_piece_map[i->index].full if (m_piece_map[i->index].full
&& backup_blocks.size() >= num_blocks && backup_blocks.size() >= num_blocks
@ -1573,6 +1576,10 @@ namespace libtorrent
for (std::vector<int>::const_iterator i = suggested_pieces.begin(); for (std::vector<int>::const_iterator i = suggested_pieces.begin();
i != suggested_pieces.end(); ++i) i != suggested_pieces.end(); ++i)
{ {
// in time critical mode, only pick prio 7 pieces
if ((options & time_critical_mode) && piece_priority(*i) != 7)
continue;
if (!is_piece_free(*i, pieces)) continue; if (!is_piece_free(*i, pieces)) continue;
num_blocks = add_blocks(*i, pieces num_blocks = add_blocks(*i, pieces
, interesting_blocks, backup_blocks , interesting_blocks, backup_blocks
@ -1600,6 +1607,9 @@ namespace libtorrent
if (num_blocks <= 0) return; if (num_blocks <= 0) return;
} }
// in time critical mode, only pick prio 7 pieces
if ((options & time_critical_mode) == 0)
{
if (options & reverse) if (options & reverse)
{ {
for (int i = m_reverse_cursor - 1; i >= m_cursor; --i) for (int i = m_reverse_cursor - 1; i >= m_cursor; --i)
@ -1631,12 +1641,17 @@ namespace libtorrent
} }
} }
} }
}
else if (options & rarest_first) else if (options & rarest_first)
{ {
if (m_dirty) update_pieces(); if (m_dirty) update_pieces();
TORRENT_ASSERT(!m_dirty); TORRENT_ASSERT(!m_dirty);
if (options & reverse) // in time critical mode, we're only allowed to pick prio 7
// pieces. This is why reverse mode is disabled when we're in
// time-critical mode, because all prio 7 pieces are at the front
// of the list
if ((options & reverse) && (options & time_critical_mode) == 0)
{ {
// it's a bit complicated in order to always prioritize // it's a bit complicated in order to always prioritize
// partial pieces, and respect priorities. Every chunk // partial pieces, and respect priorities. Every chunk
@ -1673,6 +1688,13 @@ namespace libtorrent
for (std::vector<int>::const_iterator i = m_pieces.begin(); for (std::vector<int>::const_iterator i = m_pieces.begin();
i != m_pieces.end(); ++i) i != m_pieces.end(); ++i)
{ {
// in time critical mode, only pick prio 7 pieces
// it's safe to break here because in this mode we
// pick pieces in priority order. Once we hit a lower priority
// piece, we won't encounter any more prio 7 ones
if ((options & time_critical_mode) && piece_priority(*i) != 7)
break;
if (!is_piece_free(*i, pieces)) continue; if (!is_piece_free(*i, pieces)) continue;
num_blocks = add_blocks(*i, pieces num_blocks = add_blocks(*i, pieces
, interesting_blocks, backup_blocks , interesting_blocks, backup_blocks
@ -1683,8 +1705,25 @@ namespace libtorrent
} }
} }
} }
else if (options & time_critical_mode)
{
// if we're in time-critical mode, we are only allowed to pick
// prio 7 pieces.
for (std::vector<int>::const_iterator i = m_pieces.begin();
i != m_pieces.end() && piece_priority(*i) == 7; ++i)
{
if (!is_piece_free(*i, pieces)) continue;
num_blocks = add_blocks(*i, pieces
, interesting_blocks, backup_blocks
, backup_blocks2, num_blocks
, prefer_whole_pieces, peer, suggested_pieces
, speed, options);
if (num_blocks <= 0) return;
}
}
else else
{ {
// we're not using rarest first (only for the first // we're not using rarest first (only for the first
// bucket, since that's where the currently downloading // bucket, since that's where the currently downloading
// pieces are) // pieces are)
@ -1808,6 +1847,9 @@ namespace libtorrent
if (!pieces[i->index]) continue; if (!pieces[i->index]) continue;
if (piece_priority(i->index) == 0) continue; if (piece_priority(i->index) == 0) continue;
if ((options & time_critical_mode) && piece_priority(i->index) != 7)
continue;
int num_blocks_in_piece = blocks_in_piece(i->index); int num_blocks_in_piece = blocks_in_piece(i->index);
for (int j = 0; j < num_blocks_in_piece; ++j) for (int j = 0; j < num_blocks_in_piece; ++j)
{ {
@ -1838,7 +1880,7 @@ namespace libtorrent
} }
} }
if (interesting_blocks.empty()) if (interesting_blocks.empty() && !(options & time_critical_mode))
{ {
// print_pieces(); // print_pieces();
for (int i = 0; i < num_pieces(); ++i) for (int i = 0; i < num_pieces(); ++i)

View File

@ -244,7 +244,17 @@ namespace libtorrent
TORRENT_ASSERT(t.valid_metadata()); TORRENT_ASSERT(t.valid_metadata());
TORRENT_ASSERT(c.peer_info_struct() != 0 || c.type() != peer_connection::bittorrent_connection); TORRENT_ASSERT(c.peer_info_struct() != 0 || c.type() != peer_connection::bittorrent_connection);
int num_requests = c.desired_queue_size()
bool time_critical_mode = t.num_time_critical_pieces() > 0;
int desired_queue_size = c.desired_queue_size();
// in time critical mode, only have 1 outstanding request at a time
// via normal requests
if (time_critical_mode)
desired_queue_size = (std::min)(1, desired_queue_size);
int num_requests = desired_queue_size
- (int)c.download_queue().size() - (int)c.download_queue().size()
- (int)c.request_queue().size(); - (int)c.request_queue().size();
@ -262,7 +272,7 @@ namespace libtorrent
int prefer_whole_pieces = c.prefer_whole_pieces(); int prefer_whole_pieces = c.prefer_whole_pieces();
if (prefer_whole_pieces == 0) if (prefer_whole_pieces == 0 && !time_critical_mode)
{ {
prefer_whole_pieces = c.statistics().download_payload_rate() prefer_whole_pieces = c.statistics().download_payload_rate()
* t.settings().whole_pieces_threshold * t.settings().whole_pieces_threshold
@ -331,9 +341,11 @@ namespace libtorrent
// and we're not strictly speaking in end-game mode yet // and we're not strictly speaking in end-game mode yet
// also, if we already have at least one outstanding // also, if we already have at least one outstanding
// request, we shouldn't pick any busy pieces either // request, we shouldn't pick any busy pieces either
bool dont_pick_busy_blocks = (ses.m_settings.strict_end_game_mode // in time critical mode, it's OK to request busy blocks
bool dont_pick_busy_blocks = ((ses.m_settings.strict_end_game_mode
&& p.num_downloading_pieces() < p.num_want_left()) && p.num_downloading_pieces() < p.num_want_left())
|| dq.size() + rq.size() > 0; || dq.size() + rq.size() > 0)
&& !time_critical_mode;
// this is filled with an interesting piece // this is filled with an interesting piece
// that some other peer is currently downloading // that some other peer is currently downloading
@ -348,6 +360,13 @@ namespace libtorrent
if (prefer_whole_pieces == 0 && num_requests <= 0) break; if (prefer_whole_pieces == 0 && num_requests <= 0) break;
if (time_critical_mode && p.piece_priority(i->piece_index) != 7)
{
// assume the subsequent pieces are not prio 7 and
// be done
break;
}
int num_block_requests = p.num_peers(*i); int num_block_requests = p.num_peers(*i);
if (num_block_requests > 0) if (num_block_requests > 0)
{ {

View File

@ -78,6 +78,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/gzip.hpp" // for inflate_gzip #include "libtorrent/gzip.hpp" // for inflate_gzip
#include "libtorrent/random.hpp" #include "libtorrent/random.hpp"
#include "libtorrent/string_util.hpp" // for allocate_string_copy #include "libtorrent/string_util.hpp" // for allocate_string_copy
#include "libtorrent/alloca.hpp"
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
#include "libtorrent/ssl_stream.hpp" #include "libtorrent/ssl_stream.hpp"
@ -3802,8 +3803,8 @@ namespace libtorrent
--i; --i;
} }
// just in case this piece had priority 0 // just in case this piece had priority 0
if (m_picker->piece_priority(piece) == 0) int prev_prio = m_picker->piece_priority(piece);
m_picker->set_piece_priority(piece, 1); m_picker->set_piece_priority(piece, 7);
return; return;
} }
@ -3819,8 +3820,8 @@ namespace libtorrent
m_time_critical_pieces.insert(i, p); m_time_critical_pieces.insert(i, p);
// just in case this piece had priority 0 // just in case this piece had priority 0
if (m_picker->piece_priority(piece) == 0) int prev_prio = m_picker->piece_priority(piece);
m_picker->set_piece_priority(piece, 1); m_picker->set_piece_priority(piece, 7);
piece_picker::downloading_piece pi; piece_picker::downloading_piece pi;
m_picker->piece_info(piece, pi); m_picker->piece_info(piece, pi);
@ -3875,9 +3876,9 @@ namespace libtorrent
{ {
int diff = abs(int(dl_time - m_average_piece_time)); int diff = abs(int(dl_time - m_average_piece_time));
if (m_piece_time_deviation == 0) m_piece_time_deviation = diff; if (m_piece_time_deviation == 0) m_piece_time_deviation = diff;
else m_piece_time_deviation = (m_piece_time_deviation * 6 + diff * 4) / 10; else m_piece_time_deviation = (m_piece_time_deviation * 9 + diff) / 10;
m_average_piece_time = (m_average_piece_time * 6 + dl_time * 4) / 10; m_average_piece_time = (m_average_piece_time * 9 + dl_time) / 10;
} }
} }
} }
@ -3887,6 +3888,7 @@ namespace libtorrent
m_ses.m_alerts.post_alert(read_piece_alert( m_ses.m_alerts.post_alert(read_piece_alert(
get_handle(), piece, error_code(boost::system::errc::operation_canceled, get_system_category()))); get_handle(), piece, error_code(boost::system::errc::operation_canceled, get_system_category())));
} }
if (has_picker()) m_picker->set_piece_priority(piece, 1);
m_time_critical_pieces.erase(i); m_time_critical_pieces.erase(i);
return; return;
} }
@ -8212,9 +8214,356 @@ namespace libtorrent
m_stat += s; m_stat += s;
} }
#ifdef TORRENT_DEBUG_STREAMING
char const* esc(char const* code)
{
// this is a silly optimization
// to avoid copying of strings
enum { num_strings = 200 };
static char buf[num_strings][20];
static int round_robin = 0;
char* ret = buf[round_robin];
++round_robin;
if (round_robin >= num_strings) round_robin = 0;
ret[0] = '\033';
ret[1] = '[';
int i = 2;
int j = 0;
while (code[j]) ret[i++] = code[j++];
ret[i++] = 'm';
ret[i++] = 0;
return ret;
}
int peer_index(libtorrent::tcp::endpoint addr
, std::vector<libtorrent::peer_info> const& peers)
{
using namespace libtorrent;
std::vector<peer_info>::const_iterator i = std::find_if(peers.begin()
, peers.end(), boost::bind(&peer_info::ip, _1) == addr);
if (i == peers.end()) return -1;
return i - peers.begin();
}
void print_piece(libtorrent::partial_piece_info* pp
, std::vector<libtorrent::peer_info> const& peers
, std::vector<time_critical_piece> const& time_critical)
{
using namespace libtorrent;
ptime now = time_now_hires();
float deadline = 0.f;
float last_request = 0.f;
int timed_out = -1;
int piece = pp->piece_index;
std::vector<time_critical_piece>::const_iterator i
= std::find_if(time_critical.begin(), time_critical.end()
, boost::bind(&time_critical_piece::piece, _1) == piece);
if (i != time_critical.end())
{
deadline = total_milliseconds(i->deadline - now) / 1000.f;
last_request = total_milliseconds(now - i->last_requested) / 1000.f;
timed_out = i->timed_out;
}
int num_blocks = pp->blocks_in_piece;
printf("%5d: [", piece);
for (int j = 0; j < num_blocks; ++j)
{
int index = pp ? peer_index(pp->blocks[j].peer(), peers) % 36 : -1;
char chr = '+';
if (index >= 0)
chr = (index < 10)?'0' + index:'A' + index - 10;
char const* color = "";
char const* multi_req = "";
if (pp->blocks[j].num_peers > 1)
multi_req = esc("1");
if (pp->blocks[j].bytes_progress > 0
&& pp->blocks[j].state == block_info::requested)
{
color = esc("33;7");
chr = '0' + (pp->blocks[j].bytes_progress * 10 / pp->blocks[j].block_size);
}
else if (pp->blocks[j].state == block_info::finished) color = esc("32;7");
else if (pp->blocks[j].state == block_info::writing) color = esc("36;7");
else if (pp->blocks[j].state == block_info::requested) color = esc("0");
else { color = esc("0"); chr = ' '; }
printf("%s%s%c%s", color, multi_req, chr, esc("0"));
}
printf("%s]", esc("0"));
if (deadline != 0.f)
printf(" deadline: %f last-req: %f timed_out: %d\n"
, deadline, last_request, timed_out);
else
puts("\n");
}
#endif // TORRENT_DEBUG_STREAMING
struct busy_block_t
{
int peers;
int index;
bool operator<(busy_block_t rhs) const { return peers < rhs.peers; }
};
void pick_busy_blocks(int piece, int blocks_in_piece
, int timed_out
, std::vector<piece_block>& interesting_blocks
, piece_picker::downloading_piece const& pi)
{
// if there aren't any free blocks in the piece, and the piece is
// old enough, we may switch into busy mode for this piece. In this
// case busy_blocks and busy_count are set to contain the eligible
// busy blocks we may pick
// first, figure out which blocks are eligible for picking
// in "busy-mode"
busy_block_t* busy_blocks
= TORRENT_ALLOCA(busy_block_t, blocks_in_piece);
int busy_count = 0;
// pick busy blocks from the piece
for (int k = 0; k < blocks_in_piece; ++k)
{
// only consider blocks that have been requested
// and we're still waiting for them
if (pi.info[k].state != piece_picker::block_info::state_requested)
continue;
piece_block b(piece, k);
// only allow a single additional request per block, in order
// to spread it out evenly across all stalled blocks
if (pi.info[k].num_peers > timed_out)
continue;
busy_blocks[busy_count].peers = pi.info[k].num_peers;
busy_blocks[busy_count].index = k;
++busy_count;
#ifdef TORRENT_DEBUG_STREAMING
printf(" [%d (%d)]", b.block_index, pi.info[k].num_peers);
#endif
}
#ifdef TORRENT_DEBUG_STREAMING
printf("\n");
#endif
// then sort blocks by the number of peers with requests
// to the blocks (request the blocks with the fewest peers
// first)
std::sort(busy_blocks, busy_blocks + busy_count);
// then insert them into the interesting_blocks vector
for (int k = 0; k < busy_count; ++k)
{
interesting_blocks.push_back(
piece_block(piece, busy_blocks[k].index));
}
}
void pick_time_critical_block(std::vector<peer_connection*>& peers
, std::vector<peer_connection*>& ignore_peers
, std::set<peer_connection*>& peers_with_requests
, piece_picker::downloading_piece const& pi
, time_critical_piece* i
, piece_picker* picker
, int blocks_in_piece
, int timed_out)
{
std::vector<piece_block> interesting_blocks;
std::vector<piece_block> backup1;
std::vector<piece_block> backup2;
std::vector<int> ignore;
ptime now = time_now();
// loop until every block has been requested from this piece (i->piece)
do
{
// if this peer's download time exceeds 2 seconds, we're done.
// We don't want to build unreasonably long request queues
if (!peers.empty() && peers[0]->download_queue_time() > milliseconds(2000))
break;
// pick the peer with the lowest download_queue_time that has i->piece
std::vector<peer_connection*>::iterator p = std::find_if(peers.begin(), peers.end()
, boost::bind(&peer_connection::has_piece, _1, i->piece));
// obviously we'll have to skip it if we don't have a peer that has
// this piece
if (p == peers.end())
{
#ifdef TORRENT_DEBUG_STREAMING
printf("out of peers, done\n");
#endif
break;
}
peer_connection& c = **p;
interesting_blocks.clear();
backup1.clear();
backup2.clear();
// specifically request blocks with no affinity towards fast or slow
// pieces. If we would, the picked block might end up in one of
// the backup lists
picker->add_blocks(i->piece, c.get_bitfield(), interesting_blocks
, backup1, backup2, blocks_in_piece, 0, c.peer_info_struct()
, ignore, piece_picker::none, 0);
interesting_blocks.insert(interesting_blocks.end()
, backup1.begin(), backup1.end());
interesting_blocks.insert(interesting_blocks.end()
, backup2.begin(), backup2.end());
bool busy_mode = false;
if (interesting_blocks.empty())
{
busy_mode = true;
#ifdef TORRENT_DEBUG_STREAMING
printf("interesting_blocks.empty()\n");
#endif
// there aren't any free blocks to pick, and the piece isn't
// old enough to pick busy blocks yet. break to continue to
// the next piece.
if (timed_out == 0)
{
#ifdef TORRENT_DEBUG_STREAMING
printf("not timed out, moving on to next piece\n");
#endif
break;
}
#ifdef TORRENT_DEBUG_STREAMING
printf("pick busy blocks\n");
#endif
pick_busy_blocks(i->piece, blocks_in_piece, timed_out
, interesting_blocks, pi);
}
// we can't pick anything from this piece, we're done with it.
// move on to the next one
if (interesting_blocks.empty()) break;
piece_block b = interesting_blocks.front();
if (busy_mode)
{
// in busy mode we need to make sure we don't do silly
// things like requesting the same block twice from the
// same peer
std::vector<pending_block> const& dq = c.download_queue();
bool already_requested = std::find_if(dq.begin(), dq.end()
, has_block(b)) != dq.end();
if (already_requested)
{
// if the piece is stalled, we may end up picking a block
// that we've already requested from this peer. If so, we should
// simply disregard this peer from this piece, since this peer
// is likely to be causing the stall. We should request it
// from the next peer in the list
// the peer will be put back in the set for the next piece
ignore_peers.push_back(*p);
peers.erase(p);
#ifdef TORRENT_DEBUG_STREAMING
printf("piece already requested by peer, try next peer\n");
#endif
// try next peer
continue;
}
std::vector<pending_block> const& rq = c.request_queue();
bool already_in_queue = std::find_if(rq.begin(), rq.end()
, has_block(b)) != rq.end();
if (already_in_queue)
{
if (!c.make_time_critical(b))
{
#ifdef TORRENT_DEBUG_STREAMING
printf("piece already time-critical and in queue for peer, trying next peer\n");
#endif
ignore_peers.push_back(*p);
peers.erase(p);
continue;
}
i->last_requested = now;
#ifdef TORRENT_DEBUG_STREAMING
printf("piece already in queue for peer, making time-critical\n");
#endif
// we inserted a new block in the request queue, this
// makes us actually send it later
peers_with_requests.insert(peers_with_requests.begin(), &c);
// try next peer
continue;
}
}
if (!c.add_request(b, peer_connection::req_time_critical
| (busy_mode ? peer_connection::req_busy : 0)))
{
#ifdef TORRENT_DEBUG_STREAMING
printf("failed to request block [%d, %d]\n"
, b.piece_index, b.block_index);
#endif
ignore_peers.push_back(*p);
peers.erase(p);
continue;
}
#ifdef TORRENT_DEBUG_STREAMING
printf("requested block [%d, %d]\n"
, b.piece_index, b.block_index);
#endif
if (!busy_mode) i->last_requested = now;
peers_with_requests.insert(peers_with_requests.begin(), &c);
if (i->first_requested == min_time()) i->first_requested = now;
if (!c.can_request_time_critical())
{
#ifdef TORRENT_DEBUG_STREAMING
printf("peer cannot pick time critical pieces\n");
#endif
peers.erase(p);
// try next peer
continue;
}
// resort p, since it will have a higher download_queue_time now
while (p != peers.end()-1 && (*p)->download_queue_time()
> (*(p+1))->download_queue_time())
{
std::iter_swap(p, p+1);
++p;
}
} while (!interesting_blocks.empty());
}
void torrent::request_time_critical_pieces() void torrent::request_time_critical_pieces()
{ {
TORRENT_ASSERT(m_ses.is_network_thread()); TORRENT_ASSERT(m_ses.is_network_thread());
TORRENT_ASSERT(!upload_mode());
// build a list of peers and sort it by download_queue_time // build a list of peers and sort it by download_queue_time
// we use this sorted list to determine which peer we should // we use this sorted list to determine which peer we should
// request a block from. The higher up a peer is in the list, // request a block from. The higher up a peer is in the list,
@ -8234,11 +8583,6 @@ namespace libtorrent
std::set<peer_connection*> peers_with_requests; std::set<peer_connection*> peers_with_requests;
std::vector<piece_block> interesting_blocks;
std::vector<piece_block> backup1;
std::vector<piece_block> backup2;
std::vector<int> ignore;
// peers that should be temporarily ignored for a specific piece // peers that should be temporarily ignored for a specific piece
// in order to give priority to other peers. They should be used for // in order to give priority to other peers. They should be used for
// subsequent pieces, so they are stored in this vector until the // subsequent pieces, so they are stored in this vector until the
@ -8253,33 +8597,75 @@ namespace libtorrent
for (std::deque<time_critical_piece>::iterator i = m_time_critical_pieces.begin() for (std::deque<time_critical_piece>::iterator i = m_time_critical_pieces.begin()
, end(m_time_critical_pieces.end()); i != end; ++i) , end(m_time_critical_pieces.end()); i != end; ++i)
{ {
if (peers.empty()) break; #ifdef TORRENT_DEBUG_STREAMING
printf("considering %d\n", i->piece);
#endif
// the +1000 is to compensate for the fact that we only call this function if (peers.empty())
// once per second, so if we need to request it 500 ms from now, we should request {
// it right away #ifdef TORRENT_DEBUG_STREAMING
printf("out of peers, done\n");
#endif
break;
}
// the +1000 is to compensate for the fact that we only call this
// function once per second, so if we need to request it 500 ms from
// now, we should request it right away
if (i != m_time_critical_pieces.begin() && i->deadline > now if (i != m_time_critical_pieces.begin() && i->deadline > now
+ milliseconds(m_average_piece_time + m_piece_time_deviation * 4 + 1000)) + milliseconds(m_average_piece_time + m_piece_time_deviation * 4 + 1000))
{ {
// don't request pieces whose deadline is too far in the future // don't request pieces whose deadline is too far in the future
// this is one of the termination conditions. We don't want to // this is one of the termination conditions. We don't want to
// send requests for all pieces in the torrent right away // send requests for all pieces in the torrent right away
#ifdef TORRENT_DEBUG_STREAMING
printf("reached deadline horizon [%f + %f * 4 + 1]\n"
, m_average_piece_time / 1000.f
, m_piece_time_deviation / 1000.f);
#endif
break; break;
} }
piece_picker::downloading_piece pi; piece_picker::downloading_piece pi;
m_picker->piece_info(i->piece, pi); m_picker->piece_info(i->piece, pi);
bool timed_out = false; // the number of "times" this piece has timed out.
int timed_out = 0;
int blocks_in_piece = m_picker->blocks_in_piece(i->piece);
#ifdef TORRENT_DEBUG_STREAMING
i->timed_out = timed_out;
#endif
int free_to_request = blocks_in_piece
- pi.finished - pi.writing - pi.requested;
int free_to_request = m_picker->blocks_in_piece(i->piece) - pi.finished - pi.writing - pi.requested;
if (free_to_request == 0) if (free_to_request == 0)
{ {
if (i->last_requested == min_time())
i->last_requested = now;
// if it's been more than half of the typical download time
// of a piece since we requested the last block, allow
// one more request per block
if (m_average_piece_time > 0)
timed_out = total_milliseconds(now - i->last_requested)
/ (std::max)(int(m_average_piece_time + m_piece_time_deviation / 2), 1);
#ifdef TORRENT_DEBUG_STREAMING
i->timed_out = timed_out;
#endif
// every block in this piece is already requested // every block in this piece is already requested
// there's no need to consider this piece, unless it // there's no need to consider this piece, unless it
// appears to be stalled. // appears to be stalled.
if (pi.requested == 0 || i->last_requested + milliseconds(m_average_piece_time) > now) if (pi.requested == 0 || timed_out == 0)
{ {
#ifdef TORRENT_DEBUG_STREAMING
printf("skipping %d (full) [req: %d timed_out: %d ]\n"
, i->piece, pi.requested
, timed_out);
#endif
// if requested is 0, it meants all blocks have been received, and // if requested is 0, it meants all blocks have been received, and
// we're just waiting for it to flush them to disk. // we're just waiting for it to flush them to disk.
// if last_requested is recent enough, we should give it some // if last_requested is recent enough, we should give it some
@ -8288,103 +8674,19 @@ namespace libtorrent
continue; continue;
} }
// it's been too long since we requested the last block from this piece. Allow re-requesting // it's been too long since we requested the last block from
// blocks from this piece // this piece. Allow re-requesting blocks from this piece
timed_out = true; #ifdef TORRENT_DEBUG_STREAMING
printf("timed out [average-piece-time: %d ms ]\n"
, m_average_piece_time);
#endif
} }
// loop until every block has been requested from this piece (i->piece) // pick all blocks
do pick_time_critical_block(peers, ignore_peers
{ , peers_with_requests
// pick the peer with the lowest download_queue_time that has i->piece , pi, &*i, m_picker.get()
std::vector<peer_connection*>::iterator p = std::find_if(peers.begin(), peers.end() , blocks_in_piece, timed_out);
, boost::bind(&peer_connection::has_piece, _1, i->piece));
// obviously we'll have to skip it if we don't have a peer that has this piece
if (p == peers.end()) break;
peer_connection& c = **p;
interesting_blocks.clear();
backup1.clear();
backup2.clear();
// specifically request blocks with no affinity towards fast or slow
// pieces. If we would, the picked block might end up in one of
// the backup lists
m_picker->add_blocks(i->piece, c.get_bitfield(), interesting_blocks
, backup1, backup2, 1, 0, c.peer_info_struct()
, ignore, piece_picker::none, 0);
std::vector<pending_block> const& rq = c.request_queue();
std::vector<pending_block> const& dq = c.download_queue();
bool added_request = false;
bool busy_blocks = false;
if (timed_out && interesting_blocks.empty())
{
// if the piece has timed out, allow requesting back-up blocks
interesting_blocks.swap(backup1.empty() ? backup2 : backup1);
busy_blocks = true;
}
if (!interesting_blocks.empty())
{
bool already_requested = std::find_if(dq.begin(), dq.end()
, has_block(interesting_blocks.front())) != dq.end();
if (already_requested)
{
// if the piece is stalled, we may end up picking a block
// that we've already requested from this peer. If so, we should
// simply disregard this peer from this piece, since this peer
// is likely to be causing the stall. We should request it
// from the next peer in the list
// the peer will be put back in the set for the next piece
ignore_peers.push_back(*p);
peers.erase(p);
continue;
}
bool already_in_queue = std::find_if(rq.begin(), rq.end()
, has_block(interesting_blocks.front())) != rq.end();
if (already_in_queue)
{
c.make_time_critical(interesting_blocks.front());
added_request = true;
}
else
{
if (!c.add_request(interesting_blocks.front(), peer_connection::req_time_critical
| (busy_blocks ? peer_connection::req_busy : 0)))
{
peers.erase(p);
continue;
}
added_request = true;
}
}
if (added_request)
{
peers_with_requests.insert(peers_with_requests.begin(), &c);
if (i->first_requested == min_time()) i->first_requested = now;
if (!c.can_request_time_critical())
{
peers.erase(p);
}
else
{
// resort p, since it will have a higher download_queue_time now
while (p != peers.end()-1 && (*p)->download_queue_time() > (*(p+1))->download_queue_time())
{
std::iter_swap(p, p+1);
++p;
}
}
}
} while (!interesting_blocks.empty());
peers.insert(peers.begin(), ignore_peers.begin(), ignore_peers.end()); peers.insert(peers.begin(), ignore_peers.begin(), ignore_peers.end());
ignore_peers.clear(); ignore_peers.clear();
@ -8396,6 +8698,27 @@ namespace libtorrent
{ {
(*i)->send_block_requests(); (*i)->send_block_requests();
} }
#ifdef TORRENT_DEBUG_STREAMING
std::vector<partial_piece_info> queue;
get_download_queue(&queue);
std::vector<peer_info> peer_list;
get_peer_info(peer_list);
std::sort(queue.begin(), queue.end(), boost::bind(&partial_piece_info::piece_index, _1)
< boost::bind(&partial_piece_info::piece_index, _2));
puts("\033[2J\033[0;0H");
printf("average piece download time: %.2f s (+/- %.2f s)\n"
, m_average_piece_time / 1000.f
, m_piece_time_deviation / 1000.f);
for (std::vector<partial_piece_info>::iterator i = queue.begin()
, end(queue.end()); i != end; ++i)
{
print_piece(&*i, peer_list, m_time_critical_pieces);
}
#endif // TORRENT_DEBUG_STREAMING
} }
std::set<std::string> torrent::web_seeds(web_seed_entry::type_t type) const std::set<std::string> torrent::web_seeds(web_seed_entry::type_t type) const