From c0fb61c5d7ae98767f6f4c1b5a23c3b7faf31573 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 15 Jun 2016 13:29:54 -0400 Subject: [PATCH] replace the suggest-read-cache logic with something much simpler (#815) replace the suggest-read-cache logic with something much simpler that doesn't schedule regular jobs and doesn't query the disk cache --- ChangeLog | 1 + include/libtorrent/Makefile.am | 1 + include/libtorrent/aux_/session_impl.hpp | 8 - include/libtorrent/aux_/suggest_piece.hpp | 133 ++++++++++++++ include/libtorrent/peer_connection.hpp | 18 +- include/libtorrent/settings_pack.hpp | 2 +- include/libtorrent/torrent.hpp | 37 ++-- simulation/setup_swarm.cpp | 2 +- simulation/test_swarm.cpp | 29 ++- src/bt_peer_connection.cpp | 19 +- src/peer_connection.cpp | 132 ++++++++------ src/session_impl.cpp | 23 --- src/settings_pack.cpp | 4 +- src/torrent.cpp | 211 ++++------------------ 14 files changed, 299 insertions(+), 321 deletions(-) create mode 100644 include/libtorrent/aux_/suggest_piece.hpp diff --git a/ChangeLog b/ChangeLog index 0b0da7b05..9c36746c2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * simplified suggest-read-cache feature to not depend on disk threads * removed option to disable contiguous receive buffers * deprecated public to_hex() and from_hex() functions * separated address and port fields in listen alerts diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 23d5b19af..d5b4a8d74 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -164,6 +164,7 @@ nobase_include_HEADERS = \ aux_/session_settings.hpp \ aux_/proxy_settings.hpp \ aux_/session_interface.hpp \ + aux_/suggest_piece.hpp \ aux_/time.hpp \ aux_/file_progress.hpp \ aux_/openssl.hpp \ diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c9763b0e5..cd4ffde28 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -929,14 +929,6 @@ namespace libtorrent // torrents. int m_auto_scrape_time_scaler; - // the index of the torrent that we'll - // refresh the next time - int m_next_suggest_torrent; - - // this is a counter of the number of seconds until - // the next time the suggest pieces are refreshed - int m_suggest_timer; - // statistics gathered from all torrents. stat m_stat; diff --git a/include/libtorrent/aux_/suggest_piece.hpp b/include/libtorrent/aux_/suggest_piece.hpp new file mode 100644 index 000000000..bfe4f4d63 --- /dev/null +++ b/include/libtorrent/aux_/suggest_piece.hpp @@ -0,0 +1,133 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_SUGGEST_PIECE_HPP_INCLUDE +#define TORRENT_SUGGEST_PIECE_HPP_INCLUDE + +#include +#include + +#include "libtorrent/bitfield.hpp" +#include "libtorrent/sliding_average.hpp" + +namespace libtorrent { namespace aux { + + +struct suggest_piece +{ + // pick at most n piece indices that are _not_ in p (which represents + // pieces the peer has already sent a suggest for) nor in bits (which are + // pieces the peer already has, and should not be suggested) + int get_pieces(std::vector& p, bitfield const& bits, int n) + { + if (m_priority_pieces.empty()) return 0; + + int ret = 0; + + // the highest priority pieces are at the end of m_priority_pieces. + // is we add any piece to the result (p), the farther back the better. + // the prioritization in p is the same, which means we have to first push + // back and then reverse the items we put there. + for (int i = int(m_priority_pieces.size())-1; i >= 0; --i) + { + int const piece = m_priority_pieces[i]; + if (bits.get_bit(piece)) continue; + if (std::any_of(p.begin(), p.end() - ret + , [piece](int pi) { return pi == piece; })) + continue; + + p.push_back(piece); + ++ret; + --n; + if (n == 0) break; + } + + // this it to maintain a strict priority order of pieces. The farther + // back, the higher priority + std::reverse(p.end() - ret, p.end()); + + return ret; + } + + void add_piece(int const index, int const availability + , int const max_queue_size) + { + // keep a running average of the availability of pieces, and filter + // anything above average. + int const mean = m_availability.mean(); + m_availability.add_sample(availability); + + if (availability > mean) return; + + auto const it = std::find(m_priority_pieces.begin() + , m_priority_pieces.end() + , index); + + if (it != m_priority_pieces.end()) + { + // increase the priority of this piece by moving it to the front + // of the queue + m_priority_pieces.erase(it); + } + + if (int(m_priority_pieces.size()) >= max_queue_size) + { + int const to_remove = int(m_priority_pieces.size()) - max_queue_size + 1; + m_priority_pieces.erase(m_priority_pieces.begin() + , m_priority_pieces.begin() + to_remove); + } + + m_priority_pieces.push_back(index); + + std::printf("SUGGEST: "); + for (int p : m_priority_pieces) std::printf(" %d", p); + std::printf("\n"); + } + +private: + + // these are pieces that would be good candidates for suggesting + // to a peer. They represent low availability pieces that we recently + // read from disk (and are likely in our read cache). + // pieces closer to the end were inserted into the cache more recently and + // have higher priority + std::vector m_priority_pieces; + +// int m_get_peers = 0; + + sliding_average<30> m_availability; +}; + +}} + +#endif + diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index ef275443e..dc244ba38 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -745,6 +745,8 @@ namespace libtorrent virtual void on_sent(error_code const& error , std::size_t bytes_transferred) = 0; + void send_piece_suggestions(int num); + virtual int hit_send_barrier(std::vector&) { return INT_MAX; } @@ -974,9 +976,11 @@ namespace libtorrent aux::handler_storage m_read_handler_storage; aux::handler_storage m_write_handler_storage; - // we have suggested these pieces to the peer - // don't suggest it again - bitfield m_sent_suggested_pieces; + // these are pieces we have recently sent suggests for to this peer. + // it just serves as a queue to remember what we've sent, to avoid + // re-sending suggests for the same piece + // i.e. outgoing suggest pieces + std::vector m_suggest_pieces; // the pieces we will send to the peer // if requested (regardless of choke state) @@ -991,8 +995,9 @@ namespace libtorrent // requested (regardless of choke state) std::vector m_allowed_fast; - // pieces that has been suggested to be - // downloaded from this peer + // pieces that has been suggested to be downloaded from this peer + // i.e. incoming suggestions + // TODO: 2 this should really be a circular buffer std::vector m_suggested_pieces; // the time when this peer last saw a complete copy @@ -1164,8 +1169,7 @@ namespace libtorrent // pick any pieces from this peer bool m_no_download:1; - // set to true when we've sent the first round of suggests - bool m_sent_suggests:1; + // 1 bit // set to true while we're trying to holepunch bool m_holepunch_mode:1; diff --git a/include/libtorrent/settings_pack.hpp b/include/libtorrent/settings_pack.hpp index dc91b9eac..4c82aa7da 100644 --- a/include/libtorrent/settings_pack.hpp +++ b/include/libtorrent/settings_pack.hpp @@ -813,7 +813,7 @@ namespace libtorrent // suggest messages to create a bias of its peers to request certain // pieces. The modes are: // - // * ``no_piece_suggestsions`` which is the default and will not send + // * ``no_piece_suggestions`` which is the default and will not send // out suggest messages. // * ``suggest_read_cache`` which will send out suggest messages for // the most recent pieces that are in the read cache. diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index b507ee7f3..c4619fafd 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -74,6 +74,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/linked_list.hpp" #include "libtorrent/debug.hpp" #include "libtorrent/aux_/file_progress.hpp" +#include "libtorrent/aux_/suggest_piece.hpp" #if TORRENT_COMPLETE_TYPES_REQUIRED #include "libtorrent/peer_connection.hpp" @@ -374,7 +375,6 @@ namespace libtorrent void add_piece(int piece, char const* data, int flags = 0); void on_disk_write_complete(disk_io_job const* j , peer_request p); - void on_disk_cache_complete(disk_io_job const* j); void on_disk_tick_done(disk_io_job const* j); void schedule_storage_tick(); @@ -659,13 +659,9 @@ namespace libtorrent void get_peer_info(std::vector* v); void get_download_queue(std::vector* queue) const; - void add_suggest_piece(int piece); void update_suggest_piece(int index, int change); void update_auto_sequential(); - void refresh_suggest_pieces(); - void do_refresh_suggest_pieces(); - // -------------------------------------------- // TRACKER MANAGEMENT @@ -735,16 +731,6 @@ namespace libtorrent void recalc_share_mode(); - struct suggest_piece_t - { - int piece_index; - int num_peers; - bool operator<(suggest_piece_t const& p) const { return num_peers < p.num_peers; } - }; - - std::vector const& get_suggested_pieces() const - { return m_suggested_pieces; } - bool super_seeding() const { // we're not super seeding if we're not a seed @@ -1105,12 +1091,19 @@ namespace libtorrent void set_ssl_cert_buffer(std::string const& certificate , std::string const& private_key , std::string const& dh_params); - boost::asio::ssl::context* ssl_ctx() const { return m_ssl_ctx.get(); } + boost::asio::ssl::context* ssl_ctx() const { return m_ssl_ctx.get(); } #endif int num_time_critical_pieces() const { return int(m_time_critical_pieces.size()); } + int get_suggest_pieces(std::vector& p, bitfield const& bits + , int const n) + { + return m_suggest_pieces.get_pieces(p, bits, n); + } + void add_suggest_piece(int index); + private: void ip_filter_updated(); @@ -1233,12 +1226,12 @@ namespace libtorrent // this object is used to track download progress of individual files aux::file_progress m_file_progress; - // these are the pieces we're currently - // suggesting to peers. - std::vector m_suggested_pieces; + // a queue of the most recent low-availability pieces we accessed on disk. + // These are good candidates for suggesting other peers to request from + // us. + aux::suggest_piece m_suggest_pieces; std::vector m_trackers; - // this is an index into m_trackers // this list is sorted by time_critical_piece::deadline std::vector m_time_critical_pieces; @@ -1518,9 +1511,7 @@ namespace libtorrent // the number of unchoked peers in this torrent unsigned int m_num_uploads:24; - // when this is set, second_tick will perform the actual - // work of refreshing the suggest pieces - bool m_need_suggest_pieces_refresh:1; + // 1 bit here // this is set to true when the torrent starts up // The first tracker response, when this is true, diff --git a/simulation/setup_swarm.cpp b/simulation/setup_swarm.cpp index 38ed47554..8c3a85067 100644 --- a/simulation/setup_swarm.cpp +++ b/simulation/setup_swarm.cpp @@ -266,7 +266,7 @@ void setup_swarm(int num_nodes lt::deadline_timer timer(ios); lt::error_code ec; - int swarm_id = test_counter(); + int const swarm_id = test_counter(); std::string path = save_path(swarm_id, 0); lt::create_directory(path, ec); if (ec) std::fprintf(stderr, "failed to create directory: \"%s\": %s\n" diff --git a/simulation/test_swarm.cpp b/simulation/test_swarm.cpp index f7a657d14..c14017e21 100644 --- a/simulation/test_swarm.cpp +++ b/simulation/test_swarm.cpp @@ -125,27 +125,42 @@ TORRENT_TEST(session_stats) TORRENT_TEST(suggest) { - setup_swarm(2, swarm_test::download + int num_suggests = 0; + setup_swarm(10, swarm_test::upload // add session , [](lt::settings_pack& pack) { pack.set_int(settings_pack::suggest_mode, settings_pack::suggest_read_cache); + pack.set_int(settings_pack::max_suggest_pieces, 10); } // add torrent , [](lt::add_torrent_params& params) {} // on alert - , [](lt::alert const* a, lt::session& ses) {} + , [&num_suggests](lt::alert const* a, lt::session& ses) { + if (auto pl = alert_cast(a)) + { + if (pl->direction == peer_log_alert::outgoing_message + && pl->event_type == std::string("SUGGEST")) + { + ++num_suggests; + } + } + } // terminate , [](int ticks, lt::session& ses) -> bool { - if (ticks > 80) + if (ticks > 500) { - TEST_ERROR("timeout"); return true; } - if (!is_seed(ses)) return false; - std::printf("completed in %d ticks\n", ticks); - return true; + return false; }); + + // for now, just make sure we send any suggests at all. This feature is + // experimental and it's not entirely clear it's correct or how to verify + // that it does what it's supposed to do. + // perhaps a better way would be to look at piece upload distribution over + // time + TEST_CHECK(num_suggests > 0); } TORRENT_TEST(utp_only) diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index bdf490378..e9a64d2fe 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -317,6 +317,7 @@ namespace libtorrent if (m_state < read_peer_id) return; if (m_sent_bitfield) return; + boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); write_bitfield(); @@ -483,7 +484,7 @@ namespace libtorrent bool bt_peer_connection::in_handshake() const { - return m_state < read_packet_size; + return !m_sent_handshake; } #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) @@ -1046,7 +1047,7 @@ namespace libtorrent buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; - int index = detail::read_int32(ptr); + int const index = detail::read_int32(ptr); incoming_have(index); } @@ -1372,7 +1373,7 @@ namespace libtorrent buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; - int piece = detail::read_uint32(ptr); + int const piece = detail::read_uint32(ptr); incoming_suggest(piece); } @@ -2134,13 +2135,6 @@ namespace libtorrent const int num_pieces = t->torrent_file().num_pieces(); TORRENT_ASSERT(num_pieces > 0); - if (num_pieces <= 0) - { -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "BITFIELD", "not sending bitfield, num_pieces == 0"); -#endif - return; - } int lazy_pieces[50]; int num_lazy_pieces = 0; @@ -2421,7 +2415,10 @@ namespace libtorrent TORRENT_ASSERT(index >= 0); TORRENT_ASSERT(index < associated_torrent().lock()->torrent_file().num_pieces()); TORRENT_ASSERT(m_sent_handshake); - TORRENT_ASSERT(m_sent_bitfield); + + // if we haven't sent the bitfield yet, this piece should be included in + // there instead + if (!m_sent_bitfield) return; char msg[] = {0,0,0,5,msg_have,0,0,0,0}; char* ptr = msg + 5; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index adda07399..cdd4b5bc9 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -176,7 +176,6 @@ namespace libtorrent , m_upload_only(false) , m_bitfield_received(false) , m_no_download(false) - , m_sent_suggests(false) , m_holepunch_mode(false) , m_peer_choked(true) , m_have_all(false) @@ -677,16 +676,13 @@ namespace libtorrent i = m_allowed_fast.erase(i); } - for (std::vector::iterator i = m_suggested_pieces.begin(); - i != m_suggested_pieces.end();) - { - if (*i < m_num_pieces) - { - ++i; - continue; - } - i = m_suggested_pieces.erase(i); - } + // remove any piece suggested to us whose index is invalid + // now that we know how many pieces there are + m_suggested_pieces.erase( + std::remove_if(m_suggested_pieces.begin(), m_suggested_pieces.end() + , [=](int const p) { return p >= m_num_pieces; }) + , m_suggested_pieces.end()); + on_metadata(); if (m_disconnecting) return; } @@ -904,7 +900,7 @@ namespace libtorrent ++peer_info_struct()->fast_reconnects; } - void peer_connection::received_piece(int index) + void peer_connection::received_piece(int const index) { TORRENT_ASSERT(is_single_thread()); // dont announce during handshake @@ -941,23 +937,21 @@ namespace libtorrent #endif } - void peer_connection::announce_piece(int index) + void peer_connection::announce_piece(int const index) { TORRENT_ASSERT(is_single_thread()); // dont announce during handshake if (in_handshake()) return; - if (has_piece(index)) + // optimization, don't send have messages + // to peers that already have the piece + if (!m_settings.get_bool(settings_pack::send_redundant_have) + && has_piece(index)) { - // optimization, don't send have messages - // to peers that already have the piece - if (!m_settings.get_bool(settings_pack::send_redundant_have)) - { #ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d SUPRESSED", index); + peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d SUPRESSED", index); #endif - return; - } + return; } if (disconnect_if_redundant()) return; @@ -1526,7 +1520,7 @@ namespace libtorrent // ------- SUGGEST PIECE ------- // ----------------------------- - void peer_connection::incoming_suggest(int index) + void peer_connection::incoming_suggest(int const index) { TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; @@ -1789,7 +1783,7 @@ namespace libtorrent // ----------- HAVE ------------ // ----------------------------- - void peer_connection::incoming_have(int index) + void peer_connection::incoming_have(int const index) { TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; @@ -1811,6 +1805,17 @@ namespace libtorrent // probably omitted, which is the same as 'have_none' if (!m_bitfield_received) incoming_have_none(); + // if this peer is choked, there's no point in sending suggest messages to + // it. They would just be out-of-date by the time we unchoke the peer + // anyway. + if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache + && !is_choked() + && std::any_of(m_suggest_pieces.begin(), m_suggest_pieces.end() + , [=](int const idx) { return idx == index; })) + { + send_piece_suggestions(2); + } + #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::incoming_message, "HAVE", "piece: %d", index); #endif @@ -3668,6 +3673,8 @@ namespace libtorrent m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); } + std::vector().swap(m_suggest_pieces); + #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::outgoing_message, "CHOKE"); #endif @@ -3716,22 +3723,12 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); if (!t->ready_for_connections()) return false; - if (!m_sent_suggests) + if (m_settings.get_int(settings_pack::suggest_mode) + == settings_pack::suggest_read_cache) { - std::vector const& ret - = t->get_suggested_pieces(); - - for (std::vector::const_iterator i = ret.begin() - , end(ret.end()); i != end; ++i) - { - TORRENT_ASSERT(i->piece_index >= 0); - // this can happen if a piece fail to be - // flushed to disk for whatever reason - if (!t->has_piece_passed(i->piece_index)) continue; - send_suggest(i->piece_index); - } - - m_sent_suggests = true; + // immediately before unchoking this peer, we should send some + // suggested pieces for it to request + send_piece_suggestions(2); } m_last_unchoke = aux::time_now(); @@ -3795,16 +3792,38 @@ namespace libtorrent #endif } - void peer_connection::send_suggest(int piece) + void peer_connection::send_piece_suggestions(int const num) + { + boost::shared_ptr t = m_torrent.lock(); + TORRENT_ASSERT(t); + + int const new_suggestions = t->get_suggest_pieces(m_suggest_pieces + , m_have_piece, num); + + // higher priority pieces are farther back in the vector, the last + // suggested piece to be received is the highest priority, so send the + // highest priority piece last. + for (auto i = m_suggest_pieces.end() - new_suggestions; + i != m_suggest_pieces.end(); ++i) + { + send_suggest(*i); + } + int const max = m_settings.get_int(settings_pack::max_suggest_pieces); + if (m_suggest_pieces.size() > max) + { + int const to_erase = m_suggest_pieces.size() - max; + m_suggest_pieces.erase(m_suggest_pieces.begin() + , m_suggest_pieces.begin() + to_erase); + } + } + + void peer_connection::send_suggest(int const piece) { TORRENT_ASSERT(is_single_thread()); - if (m_connecting) return; - if (in_handshake()) return; + if (m_connecting || in_handshake()) return; // don't suggest a piece that the peer already has - // don't suggest anything to a peer that isn't interested - if (has_piece(piece) || !m_peer_interested) - return; + if (has_piece(piece)) return; // we cannot suggest a piece we don't have! #if TORRENT_USE_ASSERTS @@ -3816,18 +3835,6 @@ namespace libtorrent } #endif - - if (m_sent_suggested_pieces.empty()) - { - boost::shared_ptr t = m_torrent.lock(); - m_sent_suggested_pieces.resize(t->torrent_file().num_pieces(), false); - } - - TORRENT_ASSERT(piece >= 0 && piece < m_sent_suggested_pieces.size()); - - if (m_sent_suggested_pieces[piece]) return; - m_sent_suggested_pieces.set_bit(piece); - write_suggest(piece); } @@ -3855,7 +3862,7 @@ namespace libtorrent if (int(m_download_queue.size()) >= m_desired_queue_size || t->upload_mode()) return; - bool empty_download_queue = m_download_queue.empty(); + bool const empty_download_queue = m_download_queue.empty(); while (!m_request_queue.empty() && (int(m_download_queue.size()) < m_desired_queue_size @@ -5259,7 +5266,7 @@ namespace libtorrent // 0: success, piece passed hash check // -1: disk failure - int disk_rtt = int(total_microseconds(clock_type::now() - issue_time)); + int const disk_rtt = int(total_microseconds(clock_type::now() - issue_time)); #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE" @@ -5309,6 +5316,15 @@ namespace libtorrent // thread to be done with it disk_buffer_holder buffer(m_allocator, *j); + if (t && m_settings.get_int(settings_pack::suggest_mode) + == settings_pack::suggest_read_cache) + { + // tell the torrent that we just read a block from this piece. + // if this piece is low-availability, it's now a candidate for being + // suggested to other peers + t->add_suggest_piece(r.piece); + } + if (m_disconnecting) return; // flush send buffer at the end of diff --git a/src/session_impl.cpp b/src/session_impl.cpp index a7adeb557..fd91742ae 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -360,8 +360,6 @@ namespace aux { , m_optimistic_unchoke_time_scaler(0) , m_disconnect_time_scaler(90) , m_auto_scrape_time_scaler(180) - , m_next_suggest_torrent(0) - , m_suggest_timer(0) , m_peak_up_rate(0) , m_peak_down_rate(0) , m_created(clock_type::now()) @@ -3327,27 +3325,6 @@ namespace aux { } } - // -------------------------------------------------------------- - // refresh torrent suggestions - // -------------------------------------------------------------- - --m_suggest_timer; - if (m_settings.get_int(settings_pack::suggest_mode) != settings_pack::no_piece_suggestions - && m_suggest_timer <= 0) - { - INVARIANT_CHECK; - m_suggest_timer = 10; - - torrent_map::iterator least_recently_refreshed = m_torrents.begin(); - if (m_next_suggest_torrent >= int(m_torrents.size())) - m_next_suggest_torrent = 0; - - std::advance(least_recently_refreshed, m_next_suggest_torrent); - - if (least_recently_refreshed != m_torrents.end()) - least_recently_refreshed->second->refresh_suggest_pieces(); - ++m_next_suggest_torrent; - } - // -------------------------------------------------------------- // connect new peers // -------------------------------------------------------------- diff --git a/src/settings_pack.cpp b/src/settings_pack.cpp index 14df2fcc6..8bd669a09 100644 --- a/src/settings_pack.cpp +++ b/src/settings_pack.cpp @@ -242,7 +242,7 @@ namespace libtorrent SET(optimistic_unchoke_interval, 30, 0), SET(num_want, 200, 0), SET(initial_picker_threshold, 4, 0), - SET(allowed_fast_set_size, 10, 0), + SET(allowed_fast_set_size, 5, 0), SET(suggest_mode, settings_pack::no_piece_suggestions, 0), SET(max_queued_disk_bytes, 1024 * 1024, &session_impl::update_queued_disk_bytes), SET(handshake_timeout, 10, 0), @@ -283,7 +283,7 @@ namespace libtorrent SET(read_cache_line_size, 32, 0), SET(write_cache_line_size, 16, 0), SET(optimistic_disk_retry, 10 * 60, 0), - SET(max_suggest_pieces, 10, 0), + SET(max_suggest_pieces, 16, 0), SET(local_service_announce_interval, 5 * 60, 0), SET(dht_announce_interval, 15 * 60, &session_impl::update_dht_announce_interval), SET(udp_tracker_token_expiry, 60, 0), diff --git a/src/torrent.cpp b/src/torrent.cpp index 3b32353ef..dc09c4527 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -249,7 +249,6 @@ namespace libtorrent , m_max_uploads((1<<24)-1) , m_save_resume_flags(0) , m_num_uploads(0) - , m_need_suggest_pieces_refresh(false) , m_need_connect_boost(true) , m_lsd_seq(0) , m_magnet_link(false) @@ -1255,8 +1254,10 @@ namespace libtorrent TORRENT_ASSERT(!m_have_all); m_picker.reset(new piece_picker()); - int blocks_per_piece = (m_torrent_file->piece_length() + block_size() - 1) / block_size(); - int blocks_in_last_piece = ((m_torrent_file->total_size() % m_torrent_file->piece_length()) + int const blocks_per_piece + = (m_torrent_file->piece_length() + block_size() - 1) / block_size(); + int const blocks_in_last_piece + = ((m_torrent_file->total_size() % m_torrent_file->piece_length()) + block_size() - 1) / block_size(); m_picker->init(blocks_per_piece, blocks_in_last_piece, m_torrent_file->num_pieces()); @@ -1374,20 +1375,6 @@ namespace libtorrent maybe_done_flushing(); } - void torrent::on_disk_cache_complete(disk_io_job const* j) - { - TORRENT_ASSERT(have_piece(j->piece)); - - dec_refcount("cache_piece"); - - if (j->ret < 0) return; - - // suggest this piece to all peers - for (peer_iterator i = m_connections.begin(); - i != m_connections.end(); ++i) - (*i)->send_suggest(j->piece); - } - void torrent::on_disk_tick_done(disk_io_job const* j) { if (j->ret && m_storage_tick == 0) @@ -4008,6 +3995,21 @@ namespace libtorrent } + void torrent::add_suggest_piece(int const index) + { + TORRENT_ASSERT(settings().get_int(settings_pack::suggest_mode) + == settings_pack::suggest_read_cache); + + // when we care about suggest mode, we keep the piece picker + // around to track piece availability + need_picker(); + int const peers = std::max(num_peers(), 1); + int const availability = m_picker->get_availability(index) * 100 / peers; + + m_suggest_pieces.add_piece(index, availability + , settings().get_int(settings_pack::max_suggest_pieces)); + } + // this is called once we have completely downloaded piece // 'index', its hash has been verified. It's also called // during initial file check when we find a piece whose hash @@ -4083,15 +4085,6 @@ namespace libtorrent p->update_interest(); } - if (settings().get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache) - { - // we just got a new piece. Chances are that it's actually the - // rarest piece (since we're likely to download pieces rarest first) - // if it's rarer than any other piece that we currently suggest, insert - // it in the suggest set and pop the last one out - add_suggest_piece(index); - } - set_need_save_resume(); state_updated(); @@ -4149,6 +4142,16 @@ namespace libtorrent remove_time_critical_piece(index, true); + if (settings().get_int(settings_pack::suggest_mode) + == settings_pack::suggest_read_cache) + { + // we just got a new piece. Chances are that it's actually the + // rarest piece (since we're likely to download pieces rarest first) + // if it's rarer than any other piece that we currently suggest, insert + // it in the suggest set and pop the last one out + add_suggest_piece(index); + } + std::vector downloaders; m_picker->get_downloaders(downloaders, index); @@ -4475,7 +4478,6 @@ namespace libtorrent { torrent_peer* pp = peer->peer_info_struct(); m_picker->inc_refcount(index, pp); - update_suggest_piece(index, 1); } #ifdef TORRENT_DEBUG else @@ -4493,7 +4495,6 @@ namespace libtorrent TORRENT_ASSERT(bits.size() == torrent_file().num_pieces()); torrent_peer* pp = peer->peer_info_struct(); m_picker->inc_refcount(bits, pp); - refresh_suggest_pieces(); } #ifdef TORRENT_DEBUG else @@ -4525,7 +4526,6 @@ namespace libtorrent TORRENT_ASSERT(bits.size() == torrent_file().num_pieces()); torrent_peer* pp = peer->peer_info_struct(); m_picker->dec_refcount(bits, pp); - // TODO: update suggest_piece? } #ifdef TORRENT_DEBUG else @@ -4541,7 +4541,6 @@ namespace libtorrent { torrent_peer* pp = peer->peer_info_struct(); m_picker->dec_refcount(index, pp); - update_suggest_piece(index, -1); } #ifdef TORRENT_DEBUG else @@ -4551,153 +4550,6 @@ namespace libtorrent #endif } - void torrent::add_suggest_piece(int index) - { - // it would be nice if we would keep track of piece - // availability even when we're a seed, for - // the suggest piece feature - if (!has_picker()) return; - - int num_peers = m_picker->get_availability(index); - - TORRENT_ASSERT(has_piece_passed(index)); - - // in order to avoid unnecessary churn in the suggested pieces - // the new piece has to beat the existing piece by at least one - // peer in availability. - // m_suggested_pieces is sorted by rarity, the last element - // should have the most peers (num_peers). - if (m_suggested_pieces.empty() - || num_peers < m_suggested_pieces[m_suggested_pieces.size()-1].num_peers - 1) - { - suggest_piece_t sp; - sp.piece_index = index; - sp.num_peers = num_peers; - - auto range = std::equal_range(m_suggested_pieces.begin(), m_suggested_pieces.end(), sp); - - // make sure this piece isn't already in the suggested set. - // if it is, just ignore it - auto i = std::find_if(range.first, range.second - , [index] (suggest_piece_t const& p) { return p.piece_index == index; }); - if (i != range.second) return; - - m_suggested_pieces.insert(range.second, sp); - if (m_suggested_pieces.size() > 0) - m_suggested_pieces.pop_back(); - - // tell all peers about this new suggested piece - for (peer_iterator p = m_connections.begin() - , end(m_connections.end()); p != end; ++p) - { - (*p)->send_suggest(index); - } - - refresh_suggest_pieces(); - } - } - - void torrent::update_suggest_piece(int index, int change) - { - for (std::vector::iterator i = m_suggested_pieces.begin() - , end(m_suggested_pieces.end()); i != end; ++i) - { - if (i->piece_index != index) continue; - - i->num_peers += change; - if (change > 0) - std::stable_sort(i, end); - else if (change < 0) - std::stable_sort(m_suggested_pieces.begin(), i + 1); - } - - if (!m_suggested_pieces.empty() && m_suggested_pieces[0].num_peers > m_connections.size() * 2 / 3) - { - // the rarest piece we have in the suggest set is not very - // rare anymore. at least 2/3 of the peers has it now. Refresh - refresh_suggest_pieces(); - } - } - - void torrent::refresh_suggest_pieces() - { - m_need_suggest_pieces_refresh = true; - } - - void torrent::do_refresh_suggest_pieces() - { - m_need_suggest_pieces_refresh = false; - - if (settings().get_int(settings_pack::suggest_mode) - == settings_pack::no_piece_suggestions) - return; - - if (!valid_metadata()) return; - - boost::shared_ptr t = shared_from_this(); - TORRENT_ASSERT(t); - cache_status cs; - m_ses.disk_thread().get_cache_info(&cs, m_storage.get() == NULL, m_storage.get()); - - // remove write cache entries - cs.pieces.erase(std::remove_if(cs.pieces.begin(), cs.pieces.end() - , [] (cached_piece_info const& cp) { return cp.kind == cached_piece_info::write_cache; } ) - , cs.pieces.end()); - - std::vector& pieces = m_suggested_pieces; - pieces.clear(); - pieces.reserve(cs.pieces.size()); - - // sort in decending order, to get most recently used first - std::sort(cs.pieces.begin(), cs.pieces.end() - , [] (cached_piece_info const& lhs, cached_piece_info const& rhs) - { return lhs.last_use >= rhs.last_use; }); - - for (std::vector::iterator i = cs.pieces.begin() - , end(cs.pieces.end()); i != end; ++i) - { - TORRENT_ASSERT(i->storage == m_storage.get()); - if (!has_piece_passed(i->piece)) continue; - suggest_piece_t p; - p.piece_index = i->piece; - if (has_picker()) - { - p.num_peers = m_picker->get_availability(i->piece); - } - else - { - // TODO: really, we should just keep the picker around - // in this case to maintain the availability counters - p.num_peers = 0; - for (const_peer_iterator j = m_connections.begin() - , end2(m_connections.end()); j != end2; ++j) - { - peer_connection* peer = *j; - if (peer->has_piece(p.piece_index)) ++p.num_peers; - } - } - pieces.push_back(p); - } - - // sort by rarity (stable, to maintain sort - // by last use) - std::stable_sort(pieces.begin(), pieces.end()); - - // only suggest half of the pieces - pieces.resize(pieces.size() / 2); - - // send new suggests to peers - // the peers will filter out pieces we've - // already suggested to them - for (std::vector::iterator i = pieces.begin() - , end(pieces.end()); i != end; ++i) - { - for (peer_iterator p = m_connections.begin(); - p != m_connections.end(); ++p) - (*p)->send_suggest(i->piece_index); - } - } - void torrent::abort() { TORRENT_ASSERT(is_single_thread()); @@ -7878,7 +7730,8 @@ namespace libtorrent // still need the piece picker, to keep track // of availability counts for pieces if (m_picker->is_seeding() - && settings().get_int(settings_pack::suggest_mode) != settings_pack::suggest_read_cache) + && settings().get_int(settings_pack::suggest_mode) + != settings_pack::suggest_read_cache) { // no need for the piece picker anymore m_picker.reset(); @@ -9511,8 +9364,6 @@ namespace libtorrent return; } - if (m_need_suggest_pieces_refresh) - do_refresh_suggest_pieces(); if (settings().get_bool(settings_pack::rate_limit_ip_overhead)) {