From 3192e59a1a7801aa3ffcc60d08da46c73e1e893f Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 22 Apr 2014 04:21:14 +0000 Subject: [PATCH] improve piece_deadline/streaming --- ChangeLog | 1 + include/libtorrent/peer_connection.hpp | 4 +- include/libtorrent/piece_picker.hpp | 6 +- include/libtorrent/torrent.hpp | 46 ++- src/peer_connection.cpp | 63 ++- src/piece_picker.cpp | 102 +++-- src/policy.cpp | 27 +- src/torrent.cpp | 549 ++++++++++++++++++++----- 8 files changed, 620 insertions(+), 178 deletions(-) diff --git a/ChangeLog b/ChangeLog index 5f3ac6e29..88c378ac2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * improve piece_deadline/streaming * honor pieces with priority 7 in sequential download mode * simplified building python bindings * make ignore_non_routers more forgiving in the case there are no UPnP diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 7177bf76a..f4cbff94e 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -510,7 +510,9 @@ namespace libtorrent bool can_request_time_critical() const; - void make_time_critical(piece_block const& block); + // returns true if the specified block was actually made time-critical. + // if the block was already time-critical, it returns false. + bool make_time_critical(piece_block const& block); // adds a block to the request queue // returns true if successful, false otherwise diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp index 4cde17b96..00bc275a3 100644 --- a/include/libtorrent/piece_picker.hpp +++ b/include/libtorrent/piece_picker.hpp @@ -151,7 +151,11 @@ namespace libtorrent // have affinity to pieces with the same speed category speed_affinity = 32, // ignore the prefer_whole_pieces parameter - ignore_whole_pieces = 64 + ignore_whole_pieces = 64, + // treat pieces with priority 6 and below as filtered + // to trigger end-game mode until all prio 7 pieces are + // completed + time_critical_mode = 128 }; struct downloading_piece diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 2a61c9b79..1910ab74e 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -80,6 +80,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_connection.hpp" #endif +//#define TORRENT_DEBUG_STREAMING 1 + namespace libtorrent { class http_parser; @@ -104,6 +106,29 @@ namespace libtorrent struct piece_checker_data; } + struct time_critical_piece + { + // when this piece was first requested + ptime first_requested; + // when this piece was last requested + ptime last_requested; + // by what time we want this piece + ptime deadline; + // 1 = send alert with piece data when available + int flags; + // how many peers it's been requested from + int peers; + // the piece index + int piece; +#ifdef TORRENT_DEBUG_STREAMING + // the number of multiple requests are allowed + // to blocks still not downloaded (debugging only) + int timed_out; +#endif + bool operator<(time_critical_piece const& rhs) const + { return deadline < rhs.deadline; } + }; + // a torrent is a class that holds information // for a specific download. It updates itself against // the tracker @@ -874,6 +899,9 @@ namespace libtorrent boost::asio::ssl::context* ssl_ctx() const { return m_ssl_ctx.get(); } #endif + int num_time_critical_pieces() const + { return m_time_critical_pieces.size(); } + private: void on_files_deleted(int ret, disk_io_job const& j); @@ -1013,24 +1041,6 @@ namespace libtorrent std::vector m_trackers; // this is an index into m_trackers - struct time_critical_piece - { - // when this piece was first requested - ptime first_requested; - // when this piece was last requested - ptime last_requested; - // by what time we want this piece - ptime deadline; - // 1 = send alert with piece data when available - int flags; - // how many peers it's been requested from - int peers; - // the piece index - int piece; - bool operator<(time_critical_piece const& rhs) const - { return deadline < rhs.deadline; } - }; - // this list is sorted by time_critical_piece::deadline std::deque m_time_critical_pieces; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index bb07900c5..137fa5399 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -838,6 +838,11 @@ namespace libtorrent TORRENT_ASSERT(t); if (!t) return 0; + if (t->num_time_critical_pieces() > 0) + { + ret |= piece_picker::time_critical_mode; + } + if (t->is_sequential_download()) { ret |= piece_picker::sequential; @@ -963,12 +968,44 @@ namespace libtorrent time_duration peer_connection::download_queue_time(int extra_bytes) const { - int rate = m_statistics.transfer_rate(stat::download_payload) - + m_statistics.transfer_rate(stat::download_protocol); - // avoid division by zero - if (rate < 50) rate = 50; boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); + + int rate = 0; + + // if we haven't received any data recently, the current download rate + // is not representative + if (time_now() - m_last_piece > seconds(30) && m_download_rate_peak > 0) + { + rate = m_download_rate_peak; + } + else if (time_now() - m_last_unchoked < seconds(5) + && m_statistics.total_payload_upload() < 2 * 0x4000) + { + // if we're have only been unchoked for a short period of time, + // we don't know what rate we can get from this peer. Instead of assuming + // the lowest possible rate, assume the average. + + // TODO: this should only be peers we're trying to download from + int peers_with_requests = m_ses.num_connections(); + // avoid division by 0 + if (peers_with_requests == 0) peers_with_requests = 1; + + rate = m_ses.m_stat.transfer_rate(stat::download_payload) / peers_with_requests; + } + else + { + // current download rate in bytes per seconds + rate = m_statistics.transfer_rate(stat::download_payload) + + m_statistics.transfer_rate(stat::download_protocol); + } + + // avoid division by zero + if (rate < 50) rate = 50; + + // average of current rate and peak +// rate = (rate + m_download_rate_peak) / 2; + return seconds((m_outstanding_bytes + m_queued_time_critical * t->block_size()) / rate); } @@ -2891,17 +2928,17 @@ namespace libtorrent TORRENT_ASSERT(t); if (t->upload_mode()) return false; - // ignore snubbed peers, since they're not likely to return pieces in a timely - // manner anyway + // ignore snubbed peers, since they're not likely to return pieces in a + // timely manner anyway if (m_snubbed) return false; return true; } - void peer_connection::make_time_critical(piece_block const& block) + bool peer_connection::make_time_critical(piece_block const& block) { std::vector::iterator rit = std::find_if(m_request_queue.begin() , m_request_queue.end(), has_block(block)); - if (rit == m_request_queue.end()) return; + if (rit == m_request_queue.end()) return false; #if TORRENT_USE_ASSERTS boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); @@ -2909,11 +2946,12 @@ namespace libtorrent TORRENT_ASSERT(t->picker().is_requested(block)); #endif // ignore it if it's already time critical - if (rit - m_request_queue.begin() < m_queued_time_critical) return; + if (rit - m_request_queue.begin() < m_queued_time_critical) return false; pending_block b = *rit; m_request_queue.erase(rit); m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b); ++m_queued_time_critical; + return true; } bool peer_connection::add_request(piece_block const& block, int flags) @@ -2959,11 +2997,14 @@ namespace libtorrent state = piece_picker::slow; } - if (flags & req_busy) + if ((flags & req_busy) && !(flags & req_time_critical)) { // this block is busy (i.e. it has been requested // from another peer already). Only allow one busy // request in the pipeline at the time + // this rule does not apply to time critical pieces, + // in which case we are allowed to pick more than one + // busy blocks for (std::vector::const_iterator i = m_download_queue.begin() , end(m_download_queue.end()); i != end; ++i) { @@ -4045,7 +4086,7 @@ namespace libtorrent return; } - int download_rate = statistics().download_rate(); + int download_rate = statistics().download_payload_rate(); // calculate the desired download queue size const int queue_time = m_ses.settings().request_queue_time; diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index 6d0be6ad3..06f1328b1 100644 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -153,7 +153,6 @@ namespace libtorrent std::vector::const_iterator piece = find_dl_piece(index); TORRENT_ASSERT(piece != m_downloads.end()); st = *piece; - st.info = 0; return; } st.info = 0; @@ -1547,6 +1546,10 @@ namespace libtorrent for (std::vector::const_iterator i = m_downloads.begin() , end(m_downloads.end()); i != end; ++i) { + // in time critical mode, only pick prio 7 pieces + if ((options & time_critical_mode) && piece_priority(i->index) != 7) + continue; + if (!is_piece_free(i->index, pieces)) continue; if (m_piece_map[i->index].full && backup_blocks.size() >= num_blocks @@ -1573,6 +1576,10 @@ namespace libtorrent for (std::vector::const_iterator i = suggested_pieces.begin(); i != suggested_pieces.end(); ++i) { + // in time critical mode, only pick prio 7 pieces + if ((options & time_critical_mode) && piece_priority(*i) != 7) + continue; + if (!is_piece_free(*i, pieces)) continue; num_blocks = add_blocks(*i, pieces , interesting_blocks, backup_blocks @@ -1600,34 +1607,38 @@ namespace libtorrent if (num_blocks <= 0) return; } - if (options & reverse) + // in time critical mode, only pick prio 7 pieces + if ((options & time_critical_mode) == 0) { - for (int i = m_reverse_cursor - 1; i >= m_cursor; --i) - { - if (!is_piece_free(i, pieces)) continue; - // we've already added prio 7 pieces - if (piece_priority(i) == 7) continue; - num_blocks = add_blocks(i, pieces - , interesting_blocks, backup_blocks - , backup_blocks2, num_blocks - , prefer_whole_pieces, peer, suggested_pieces - , speed, options); - if (num_blocks <= 0) return; + if (options & reverse) + { + for (int i = m_reverse_cursor - 1; i >= m_cursor; --i) + { + if (!is_piece_free(i, pieces)) continue; + // we've already added prio 7 pieces + if (piece_priority(i) == 7) continue; + num_blocks = add_blocks(i, pieces + , interesting_blocks, backup_blocks + , backup_blocks2, num_blocks + , prefer_whole_pieces, peer, suggested_pieces + , speed, options); + if (num_blocks <= 0) return; + } } - } - else - { - for (int i = m_cursor; i < m_reverse_cursor; ++i) - { - if (!is_piece_free(i, pieces)) continue; - // we've already added prio 7 pieces - if (piece_priority(i) == 7) continue; - num_blocks = add_blocks(i, pieces - , interesting_blocks, backup_blocks - , backup_blocks2, num_blocks - , prefer_whole_pieces, peer, suggested_pieces - , speed, options); - if (num_blocks <= 0) return; + else + { + for (int i = m_cursor; i < m_reverse_cursor; ++i) + { + if (!is_piece_free(i, pieces)) continue; + // we've already added prio 7 pieces + if (piece_priority(i) == 7) continue; + num_blocks = add_blocks(i, pieces + , interesting_blocks, backup_blocks + , backup_blocks2, num_blocks + , prefer_whole_pieces, peer, suggested_pieces + , speed, options); + if (num_blocks <= 0) return; + } } } } @@ -1636,7 +1647,11 @@ namespace libtorrent if (m_dirty) update_pieces(); TORRENT_ASSERT(!m_dirty); - if (options & reverse) + // in time critical mode, we're only allowed to pick prio 7 + // pieces. This is why reverse mode is disabled when we're in + // time-critical mode, because all prio 7 pieces are at the front + // of the list + if ((options & reverse) && (options & time_critical_mode) == 0) { // it's a bit complicated in order to always prioritize // partial pieces, and respect priorities. Every chunk @@ -1673,6 +1688,13 @@ namespace libtorrent for (std::vector::const_iterator i = m_pieces.begin(); i != m_pieces.end(); ++i) { + // in time critical mode, only pick prio 7 pieces + // it's safe to break here because in this mode we + // pick pieces in priority order. Once we hit a lower priority + // piece, we won't encounter any more prio 7 ones + if ((options & time_critical_mode) && piece_priority(*i) != 7) + break; + if (!is_piece_free(*i, pieces)) continue; num_blocks = add_blocks(*i, pieces , interesting_blocks, backup_blocks @@ -1683,8 +1705,25 @@ namespace libtorrent } } } + else if (options & time_critical_mode) + { + // if we're in time-critical mode, we are only allowed to pick + // prio 7 pieces. + for (std::vector::const_iterator i = m_pieces.begin(); + i != m_pieces.end() && piece_priority(*i) == 7; ++i) + { + if (!is_piece_free(*i, pieces)) continue; + num_blocks = add_blocks(*i, pieces + , interesting_blocks, backup_blocks + , backup_blocks2, num_blocks + , prefer_whole_pieces, peer, suggested_pieces + , speed, options); + if (num_blocks <= 0) return; + } + } else { + // we're not using rarest first (only for the first // bucket, since that's where the currently downloading // pieces are) @@ -1808,6 +1847,9 @@ namespace libtorrent if (!pieces[i->index]) continue; if (piece_priority(i->index) == 0) continue; + if ((options & time_critical_mode) && piece_priority(i->index) != 7) + continue; + int num_blocks_in_piece = blocks_in_piece(i->index); for (int j = 0; j < num_blocks_in_piece; ++j) { @@ -1818,7 +1860,7 @@ namespace libtorrent interesting_blocks.begin(), interesting_blocks.end() , piece_block(i->index, j)); if (k != interesting_blocks.end()) continue; - + fprintf(stderr, "interesting blocks:\n"); for (k = interesting_blocks.begin(); k != interesting_blocks.end(); ++k) fprintf(stderr, "(%d, %d)", k->piece_index, k->block_index); @@ -1838,7 +1880,7 @@ namespace libtorrent } } - if (interesting_blocks.empty()) + if (interesting_blocks.empty() && !(options & time_critical_mode)) { // print_pieces(); for (int i = 0; i < num_pieces(); ++i) diff --git a/src/policy.cpp b/src/policy.cpp index 2eb774515..4243d4029 100644 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -244,7 +244,17 @@ namespace libtorrent TORRENT_ASSERT(t.valid_metadata()); TORRENT_ASSERT(c.peer_info_struct() != 0 || c.type() != peer_connection::bittorrent_connection); - int num_requests = c.desired_queue_size() + + bool time_critical_mode = t.num_time_critical_pieces() > 0; + + int desired_queue_size = c.desired_queue_size(); + + // in time critical mode, only have 1 outstanding request at a time + // via normal requests + if (time_critical_mode) + desired_queue_size = (std::min)(1, desired_queue_size); + + int num_requests = desired_queue_size - (int)c.download_queue().size() - (int)c.request_queue().size(); @@ -262,7 +272,7 @@ namespace libtorrent int prefer_whole_pieces = c.prefer_whole_pieces(); - if (prefer_whole_pieces == 0) + if (prefer_whole_pieces == 0 && !time_critical_mode) { prefer_whole_pieces = c.statistics().download_payload_rate() * t.settings().whole_pieces_threshold @@ -331,9 +341,11 @@ namespace libtorrent // and we're not strictly speaking in end-game mode yet // also, if we already have at least one outstanding // request, we shouldn't pick any busy pieces either - bool dont_pick_busy_blocks = (ses.m_settings.strict_end_game_mode + // in time critical mode, it's OK to request busy blocks + bool dont_pick_busy_blocks = ((ses.m_settings.strict_end_game_mode && p.num_downloading_pieces() < p.num_want_left()) - || dq.size() + rq.size() > 0; + || dq.size() + rq.size() > 0) + && !time_critical_mode; // this is filled with an interesting piece // that some other peer is currently downloading @@ -348,6 +360,13 @@ namespace libtorrent if (prefer_whole_pieces == 0 && num_requests <= 0) break; + if (time_critical_mode && p.piece_priority(i->piece_index) != 7) + { + // assume the subsequent pieces are not prio 7 and + // be done + break; + } + int num_block_requests = p.num_peers(*i); if (num_block_requests > 0) { diff --git a/src/torrent.cpp b/src/torrent.cpp index 849ae24ff..0e36181a2 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -78,6 +78,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/gzip.hpp" // for inflate_gzip #include "libtorrent/random.hpp" #include "libtorrent/string_util.hpp" // for allocate_string_copy +#include "libtorrent/alloca.hpp" #ifdef TORRENT_USE_OPENSSL #include "libtorrent/ssl_stream.hpp" @@ -3802,8 +3803,8 @@ namespace libtorrent --i; } // just in case this piece had priority 0 - if (m_picker->piece_priority(piece) == 0) - m_picker->set_piece_priority(piece, 1); + int prev_prio = m_picker->piece_priority(piece); + m_picker->set_piece_priority(piece, 7); return; } @@ -3819,8 +3820,8 @@ namespace libtorrent m_time_critical_pieces.insert(i, p); // just in case this piece had priority 0 - if (m_picker->piece_priority(piece) == 0) - m_picker->set_piece_priority(piece, 1); + int prev_prio = m_picker->piece_priority(piece); + m_picker->set_piece_priority(piece, 7); piece_picker::downloading_piece pi; m_picker->piece_info(piece, pi); @@ -3875,9 +3876,9 @@ namespace libtorrent { int diff = abs(int(dl_time - m_average_piece_time)); if (m_piece_time_deviation == 0) m_piece_time_deviation = diff; - else m_piece_time_deviation = (m_piece_time_deviation * 6 + diff * 4) / 10; + else m_piece_time_deviation = (m_piece_time_deviation * 9 + diff) / 10; - m_average_piece_time = (m_average_piece_time * 6 + dl_time * 4) / 10; + m_average_piece_time = (m_average_piece_time * 9 + dl_time) / 10; } } } @@ -3887,6 +3888,7 @@ namespace libtorrent m_ses.m_alerts.post_alert(read_piece_alert( get_handle(), piece, error_code(boost::system::errc::operation_canceled, get_system_category()))); } + if (has_picker()) m_picker->set_piece_priority(piece, 1); m_time_critical_pieces.erase(i); return; } @@ -8212,9 +8214,356 @@ namespace libtorrent m_stat += s; } +#ifdef TORRENT_DEBUG_STREAMING + char const* esc(char const* code) + { + // this is a silly optimization + // to avoid copying of strings + enum { num_strings = 200 }; + static char buf[num_strings][20]; + static int round_robin = 0; + char* ret = buf[round_robin]; + ++round_robin; + if (round_robin >= num_strings) round_robin = 0; + ret[0] = '\033'; + ret[1] = '['; + int i = 2; + int j = 0; + while (code[j]) ret[i++] = code[j++]; + ret[i++] = 'm'; + ret[i++] = 0; + return ret; + } + + int peer_index(libtorrent::tcp::endpoint addr + , std::vector const& peers) + { + using namespace libtorrent; + std::vector::const_iterator i = std::find_if(peers.begin() + , peers.end(), boost::bind(&peer_info::ip, _1) == addr); + if (i == peers.end()) return -1; + + return i - peers.begin(); + } + + void print_piece(libtorrent::partial_piece_info* pp + , std::vector const& peers + , std::vector const& time_critical) + { + using namespace libtorrent; + + ptime now = time_now_hires(); + + float deadline = 0.f; + float last_request = 0.f; + int timed_out = -1; + + int piece = pp->piece_index; + std::vector::const_iterator i + = std::find_if(time_critical.begin(), time_critical.end() + , boost::bind(&time_critical_piece::piece, _1) == piece); + if (i != time_critical.end()) + { + deadline = total_milliseconds(i->deadline - now) / 1000.f; + last_request = total_milliseconds(now - i->last_requested) / 1000.f; + timed_out = i->timed_out; + } + + int num_blocks = pp->blocks_in_piece; + + printf("%5d: [", piece); + for (int j = 0; j < num_blocks; ++j) + { + int index = pp ? peer_index(pp->blocks[j].peer(), peers) % 36 : -1; + char chr = '+'; + if (index >= 0) + chr = (index < 10)?'0' + index:'A' + index - 10; + + char const* color = ""; + char const* multi_req = ""; + + if (pp->blocks[j].num_peers > 1) + multi_req = esc("1"); + + if (pp->blocks[j].bytes_progress > 0 + && pp->blocks[j].state == block_info::requested) + { + color = esc("33;7"); + chr = '0' + (pp->blocks[j].bytes_progress * 10 / pp->blocks[j].block_size); + } + else if (pp->blocks[j].state == block_info::finished) color = esc("32;7"); + else if (pp->blocks[j].state == block_info::writing) color = esc("36;7"); + else if (pp->blocks[j].state == block_info::requested) color = esc("0"); + else { color = esc("0"); chr = ' '; } + + printf("%s%s%c%s", color, multi_req, chr, esc("0")); + } + printf("%s]", esc("0")); + if (deadline != 0.f) + printf(" deadline: %f last-req: %f timed_out: %d\n" + , deadline, last_request, timed_out); + else + puts("\n"); + } +#endif // TORRENT_DEBUG_STREAMING + + struct busy_block_t + { + int peers; + int index; + bool operator<(busy_block_t rhs) const { return peers < rhs.peers; } + }; + + void pick_busy_blocks(int piece, int blocks_in_piece + , int timed_out + , std::vector& interesting_blocks + , piece_picker::downloading_piece const& pi) + { + // if there aren't any free blocks in the piece, and the piece is + // old enough, we may switch into busy mode for this piece. In this + // case busy_blocks and busy_count are set to contain the eligible + // busy blocks we may pick + // first, figure out which blocks are eligible for picking + // in "busy-mode" + busy_block_t* busy_blocks + = TORRENT_ALLOCA(busy_block_t, blocks_in_piece); + int busy_count = 0; + + // pick busy blocks from the piece + for (int k = 0; k < blocks_in_piece; ++k) + { + // only consider blocks that have been requested + // and we're still waiting for them + if (pi.info[k].state != piece_picker::block_info::state_requested) + continue; + + piece_block b(piece, k); + + // only allow a single additional request per block, in order + // to spread it out evenly across all stalled blocks + if (pi.info[k].num_peers > timed_out) + continue; + + busy_blocks[busy_count].peers = pi.info[k].num_peers; + busy_blocks[busy_count].index = k; + ++busy_count; + +#ifdef TORRENT_DEBUG_STREAMING + printf(" [%d (%d)]", b.block_index, pi.info[k].num_peers); +#endif + } +#ifdef TORRENT_DEBUG_STREAMING + printf("\n"); +#endif + + // then sort blocks by the number of peers with requests + // to the blocks (request the blocks with the fewest peers + // first) + std::sort(busy_blocks, busy_blocks + busy_count); + + // then insert them into the interesting_blocks vector + for (int k = 0; k < busy_count; ++k) + { + interesting_blocks.push_back( + piece_block(piece, busy_blocks[k].index)); + } + } + + void pick_time_critical_block(std::vector& peers + , std::vector& ignore_peers + , std::set& peers_with_requests + , piece_picker::downloading_piece const& pi + , time_critical_piece* i + , piece_picker* picker + , int blocks_in_piece + , int timed_out) + { + std::vector interesting_blocks; + std::vector backup1; + std::vector backup2; + std::vector ignore; + + ptime now = time_now(); + + // loop until every block has been requested from this piece (i->piece) + do + { + // if this peer's download time exceeds 2 seconds, we're done. + // We don't want to build unreasonably long request queues + if (!peers.empty() && peers[0]->download_queue_time() > milliseconds(2000)) + break; + + // pick the peer with the lowest download_queue_time that has i->piece + std::vector::iterator p = std::find_if(peers.begin(), peers.end() + , boost::bind(&peer_connection::has_piece, _1, i->piece)); + + // obviously we'll have to skip it if we don't have a peer that has + // this piece + if (p == peers.end()) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("out of peers, done\n"); +#endif + break; + } + peer_connection& c = **p; + + interesting_blocks.clear(); + backup1.clear(); + backup2.clear(); + + // specifically request blocks with no affinity towards fast or slow + // pieces. If we would, the picked block might end up in one of + // the backup lists + picker->add_blocks(i->piece, c.get_bitfield(), interesting_blocks + , backup1, backup2, blocks_in_piece, 0, c.peer_info_struct() + , ignore, piece_picker::none, 0); + + interesting_blocks.insert(interesting_blocks.end() + , backup1.begin(), backup1.end()); + interesting_blocks.insert(interesting_blocks.end() + , backup2.begin(), backup2.end()); + + bool busy_mode = false; + + if (interesting_blocks.empty()) + { + busy_mode = true; + +#ifdef TORRENT_DEBUG_STREAMING + printf("interesting_blocks.empty()\n"); +#endif + + // there aren't any free blocks to pick, and the piece isn't + // old enough to pick busy blocks yet. break to continue to + // the next piece. + if (timed_out == 0) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("not timed out, moving on to next piece\n"); +#endif + break; + } + +#ifdef TORRENT_DEBUG_STREAMING + printf("pick busy blocks\n"); +#endif + + pick_busy_blocks(i->piece, blocks_in_piece, timed_out + , interesting_blocks, pi); + } + + // we can't pick anything from this piece, we're done with it. + // move on to the next one + if (interesting_blocks.empty()) break; + + piece_block b = interesting_blocks.front(); + + if (busy_mode) + { + // in busy mode we need to make sure we don't do silly + // things like requesting the same block twice from the + // same peer + std::vector const& dq = c.download_queue(); + + bool already_requested = std::find_if(dq.begin(), dq.end() + , has_block(b)) != dq.end(); + if (already_requested) + { + // if the piece is stalled, we may end up picking a block + // that we've already requested from this peer. If so, we should + // simply disregard this peer from this piece, since this peer + // is likely to be causing the stall. We should request it + // from the next peer in the list + // the peer will be put back in the set for the next piece + ignore_peers.push_back(*p); + peers.erase(p); +#ifdef TORRENT_DEBUG_STREAMING + printf("piece already requested by peer, try next peer\n"); +#endif + // try next peer + continue; + } + + std::vector const& rq = c.request_queue(); + + bool already_in_queue = std::find_if(rq.begin(), rq.end() + , has_block(b)) != rq.end(); + + if (already_in_queue) + { + if (!c.make_time_critical(b)) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("piece already time-critical and in queue for peer, trying next peer\n"); +#endif + ignore_peers.push_back(*p); + peers.erase(p); + continue; + } + i->last_requested = now; + +#ifdef TORRENT_DEBUG_STREAMING + printf("piece already in queue for peer, making time-critical\n"); +#endif + + // we inserted a new block in the request queue, this + // makes us actually send it later + peers_with_requests.insert(peers_with_requests.begin(), &c); + + // try next peer + continue; + } + } + + if (!c.add_request(b, peer_connection::req_time_critical + | (busy_mode ? peer_connection::req_busy : 0))) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("failed to request block [%d, %d]\n" + , b.piece_index, b.block_index); +#endif + ignore_peers.push_back(*p); + peers.erase(p); + continue; + } + +#ifdef TORRENT_DEBUG_STREAMING + printf("requested block [%d, %d]\n" + , b.piece_index, b.block_index); +#endif + + if (!busy_mode) i->last_requested = now; + + peers_with_requests.insert(peers_with_requests.begin(), &c); + if (i->first_requested == min_time()) i->first_requested = now; + + if (!c.can_request_time_critical()) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("peer cannot pick time critical pieces\n"); +#endif + peers.erase(p); + // try next peer + continue; + } + + // resort p, since it will have a higher download_queue_time now + while (p != peers.end()-1 && (*p)->download_queue_time() + > (*(p+1))->download_queue_time()) + { + std::iter_swap(p, p+1); + ++p; + } + } while (!interesting_blocks.empty()); + + } + void torrent::request_time_critical_pieces() { TORRENT_ASSERT(m_ses.is_network_thread()); + TORRENT_ASSERT(!upload_mode()); + // build a list of peers and sort it by download_queue_time // we use this sorted list to determine which peer we should // request a block from. The higher up a peer is in the list, @@ -8234,11 +8583,6 @@ namespace libtorrent std::set peers_with_requests; - std::vector interesting_blocks; - std::vector backup1; - std::vector backup2; - std::vector ignore; - // peers that should be temporarily ignored for a specific piece // in order to give priority to other peers. They should be used for // subsequent pieces, so they are stored in this vector until the @@ -8253,33 +8597,75 @@ namespace libtorrent for (std::deque::iterator i = m_time_critical_pieces.begin() , end(m_time_critical_pieces.end()); i != end; ++i) { - if (peers.empty()) break; +#ifdef TORRENT_DEBUG_STREAMING + printf("considering %d\n", i->piece); +#endif - // the +1000 is to compensate for the fact that we only call this function - // once per second, so if we need to request it 500 ms from now, we should request - // it right away + if (peers.empty()) + { +#ifdef TORRENT_DEBUG_STREAMING + printf("out of peers, done\n"); +#endif + break; + } + + // the +1000 is to compensate for the fact that we only call this + // function once per second, so if we need to request it 500 ms from + // now, we should request it right away if (i != m_time_critical_pieces.begin() && i->deadline > now + milliseconds(m_average_piece_time + m_piece_time_deviation * 4 + 1000)) { // don't request pieces whose deadline is too far in the future // this is one of the termination conditions. We don't want to // send requests for all pieces in the torrent right away +#ifdef TORRENT_DEBUG_STREAMING + printf("reached deadline horizon [%f + %f * 4 + 1]\n" + , m_average_piece_time / 1000.f + , m_piece_time_deviation / 1000.f); +#endif break; } piece_picker::downloading_piece pi; m_picker->piece_info(i->piece, pi); - bool timed_out = false; + // the number of "times" this piece has timed out. + int timed_out = 0; + + int blocks_in_piece = m_picker->blocks_in_piece(i->piece); + +#ifdef TORRENT_DEBUG_STREAMING + i->timed_out = timed_out; +#endif + int free_to_request = blocks_in_piece + - pi.finished - pi.writing - pi.requested; - int free_to_request = m_picker->blocks_in_piece(i->piece) - pi.finished - pi.writing - pi.requested; if (free_to_request == 0) { + if (i->last_requested == min_time()) + i->last_requested = now; + + // if it's been more than half of the typical download time + // of a piece since we requested the last block, allow + // one more request per block + if (m_average_piece_time > 0) + timed_out = total_milliseconds(now - i->last_requested) + / (std::max)(int(m_average_piece_time + m_piece_time_deviation / 2), 1); + +#ifdef TORRENT_DEBUG_STREAMING + i->timed_out = timed_out; +#endif // every block in this piece is already requested // there's no need to consider this piece, unless it // appears to be stalled. - if (pi.requested == 0 || i->last_requested + milliseconds(m_average_piece_time) > now) + if (pi.requested == 0 || timed_out == 0) { +#ifdef TORRENT_DEBUG_STREAMING + printf("skipping %d (full) [req: %d timed_out: %d ]\n" + , i->piece, pi.requested + , timed_out); +#endif + // if requested is 0, it meants all blocks have been received, and // we're just waiting for it to flush them to disk. // if last_requested is recent enough, we should give it some @@ -8288,103 +8674,19 @@ namespace libtorrent continue; } - // it's been too long since we requested the last block from this piece. Allow re-requesting - // blocks from this piece - timed_out = true; + // it's been too long since we requested the last block from + // this piece. Allow re-requesting blocks from this piece +#ifdef TORRENT_DEBUG_STREAMING + printf("timed out [average-piece-time: %d ms ]\n" + , m_average_piece_time); +#endif } - // loop until every block has been requested from this piece (i->piece) - do - { - // pick the peer with the lowest download_queue_time that has i->piece - std::vector::iterator p = std::find_if(peers.begin(), peers.end() - , boost::bind(&peer_connection::has_piece, _1, i->piece)); - - // obviously we'll have to skip it if we don't have a peer that has this piece - if (p == peers.end()) break; - peer_connection& c = **p; - - interesting_blocks.clear(); - backup1.clear(); - backup2.clear(); - // specifically request blocks with no affinity towards fast or slow - // pieces. If we would, the picked block might end up in one of - // the backup lists - m_picker->add_blocks(i->piece, c.get_bitfield(), interesting_blocks - , backup1, backup2, 1, 0, c.peer_info_struct() - , ignore, piece_picker::none, 0); - - std::vector const& rq = c.request_queue(); - std::vector const& dq = c.download_queue(); - - bool added_request = false; - bool busy_blocks = false; - - if (timed_out && interesting_blocks.empty()) - { - // if the piece has timed out, allow requesting back-up blocks - interesting_blocks.swap(backup1.empty() ? backup2 : backup1); - busy_blocks = true; - } - - if (!interesting_blocks.empty()) - { - bool already_requested = std::find_if(dq.begin(), dq.end() - , has_block(interesting_blocks.front())) != dq.end(); - if (already_requested) - { - // if the piece is stalled, we may end up picking a block - // that we've already requested from this peer. If so, we should - // simply disregard this peer from this piece, since this peer - // is likely to be causing the stall. We should request it - // from the next peer in the list - // the peer will be put back in the set for the next piece - ignore_peers.push_back(*p); - peers.erase(p); - continue; - } - - bool already_in_queue = std::find_if(rq.begin(), rq.end() - , has_block(interesting_blocks.front())) != rq.end(); - - if (already_in_queue) - { - c.make_time_critical(interesting_blocks.front()); - added_request = true; - } - else - { - if (!c.add_request(interesting_blocks.front(), peer_connection::req_time_critical - | (busy_blocks ? peer_connection::req_busy : 0))) - { - peers.erase(p); - continue; - } - added_request = true; - } - } - - if (added_request) - { - peers_with_requests.insert(peers_with_requests.begin(), &c); - if (i->first_requested == min_time()) i->first_requested = now; - - if (!c.can_request_time_critical()) - { - peers.erase(p); - } - else - { - // resort p, since it will have a higher download_queue_time now - while (p != peers.end()-1 && (*p)->download_queue_time() > (*(p+1))->download_queue_time()) - { - std::iter_swap(p, p+1); - ++p; - } - } - } - - } while (!interesting_blocks.empty()); + // pick all blocks + pick_time_critical_block(peers, ignore_peers + , peers_with_requests + , pi, &*i, m_picker.get() + , blocks_in_piece, timed_out); peers.insert(peers.begin(), ignore_peers.begin(), ignore_peers.end()); ignore_peers.clear(); @@ -8396,6 +8698,27 @@ namespace libtorrent { (*i)->send_block_requests(); } + +#ifdef TORRENT_DEBUG_STREAMING + std::vector queue; + get_download_queue(&queue); + + std::vector peer_list; + get_peer_info(peer_list); + + std::sort(queue.begin(), queue.end(), boost::bind(&partial_piece_info::piece_index, _1) + < boost::bind(&partial_piece_info::piece_index, _2)); + + puts("\033[2J\033[0;0H"); + printf("average piece download time: %.2f s (+/- %.2f s)\n" + , m_average_piece_time / 1000.f + , m_piece_time_deviation / 1000.f); + for (std::vector::iterator i = queue.begin() + , end(queue.end()); i != end; ++i) + { + print_piece(&*i, peer_list, m_time_critical_pieces); + } +#endif // TORRENT_DEBUG_STREAMING } std::set torrent::web_seeds(web_seed_entry::type_t type) const