From f2ce2284da3f87e33da892f90dc79458681f2e2f Mon Sep 17 00:00:00 2001 From: arvidn Date: Sat, 23 Jan 2016 13:37:50 -0500 Subject: [PATCH] optimize the optimistic unchoke logic. extend the API for extensions to be able to affect the order of optimistic unchokes --- include/libtorrent/aux_/session_impl.hpp | 12 +- include/libtorrent/extensions.hpp | 34 +++- include/libtorrent/peer_connection_handle.hpp | 7 +- src/metadata_transfer.cpp | 2 +- src/session_impl.cpp | 181 +++++++++++------- src/torrent_peer.cpp | 2 +- src/ut_metadata.cpp | 2 +- 7 files changed, 158 insertions(+), 82 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 336d4d676..7d9b89d61 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -1169,15 +1169,19 @@ namespace libtorrent typedef std::list > ses_extension_list_t; ses_extension_list_t m_ses_extensions; - // std::string could be used for the query names if only all common implementations used SSO - // *glares at gcc* - struct extention_dht_query + // the union of all session extensions' implemented_features(). This is + // used to exclude callbacks to the session extensions. + boost::uint32_t m_session_extension_features; + + // std::string could be used for the query names if only all common + // implementations used SSO *glares at gcc* + struct extension_dht_query { boost::uint8_t query_len; boost::array query; dht_extension_handler_t handler; }; - typedef std::vector m_extension_dht_queries_t; + typedef std::vector m_extension_dht_queries_t; m_extension_dht_queries_t m_extension_dht_queries; #endif diff --git a/include/libtorrent/extensions.hpp b/include/libtorrent/extensions.hpp index 66b775e49..6f7f23b8c 100644 --- a/include/libtorrent/extensions.hpp +++ b/include/libtorrent/extensions.hpp @@ -208,6 +208,27 @@ namespace libtorrent // hidden virtual ~plugin() {} + // these are flags that can be returned by implemented_features() + // indicating which callbacks this plugin is interested in + enum feature_flags_t + { + // include this bit if your plugin needs to alter the order of the + // optimistic unchoke of peers. i.e. have the on_optimistic_unchoke() + // callback be called. + optimistic_unchoke_feature = 1, + + // include this bit if your plugin needs to have on_tick() called + tick_feature = 2 + }; + + // This function is expected to return a bitmask indicating which features + // this plugin implements. Some callbacks on this object may not be called + // unless the corresponding feature flag is returned here. Note that + // callbacks may still be called even if the corresponding feature is not + // specified in the return value here. See feature_flags_t for possible + // flags to return. + virtual boost::uint32_t implemented_features() { return 0; } + // this is called by the session every time a new torrent is added. // The ``torrent*`` points to the internal torrent object created // for the new torrent. The ``void*`` is the userdata pointer as @@ -237,12 +258,13 @@ namespace libtorrent // called once per second virtual void on_tick() {} - // called when choosing peers to optimistically unchoke - // peer's will be unchoked in the order they appear in the given - // vector which is initially sorted by when they were last - // optimistically unchoked. - // if the plugin returns true then the ordering provided will be - // used and no other plugin will be allowed to change it. + // called when choosing peers to optimisticallly unchoke. peer's will be + // unchoked in the order they appear in the given vector. if + // the plugin returns true then the ordering provided will be used and no + // other plugin will be allowed to change it. If your plugin expects this + // to be called, make sure to include the flag + // ``optimistic_unchoke_feature`` in the return value from + // implemented_features(). virtual bool on_optimistic_unchoke(std::vector& /* peers */) { return false; } diff --git a/include/libtorrent/peer_connection_handle.hpp b/include/libtorrent/peer_connection_handle.hpp index 3037c6938..a2b8d5c1a 100644 --- a/include/libtorrent/peer_connection_handle.hpp +++ b/include/libtorrent/peer_connection_handle.hpp @@ -50,6 +50,7 @@ struct crypto_plugin; typedef boost::system::error_code error_code; +// hidden struct TORRENT_EXPORT peer_connection_handle { peer_connection_handle(boost::weak_ptr impl) @@ -113,11 +114,11 @@ struct TORRENT_EXPORT peer_connection_handle time_point time_of_last_unchoke() const; bool operator==(peer_connection_handle const& o) const - { return m_connection.lock() == o.m_connection.lock(); } + { return !(m_connection < o.m_connection) && !(o.m_connection < m_connection); } bool operator!=(peer_connection_handle const& o) const - { return m_connection.lock() != o.m_connection.lock(); } + { return m_connection < o.m_connection || o.m_connection < m_connection; } bool operator<(peer_connection_handle const& o) const - { return m_connection.lock() < o.m_connection.lock(); } + { return m_connection < o.m_connection; } boost::shared_ptr native_handle() const { diff --git a/src/metadata_transfer.cpp b/src/metadata_transfer.cpp index 153a90483..d67148379 100644 --- a/src/metadata_transfer.cpp +++ b/src/metadata_transfer.cpp @@ -219,7 +219,7 @@ namespace libtorrent { namespace m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); } - void on_piece_pass(int) + void on_piece_pass(int) TORRENT_OVERRIDE { // if we became a seed, copy the metadata from // the torrent before it is deallocated diff --git a/src/session_impl.cpp b/src/session_impl.cpp index cd1bbe2b0..af46b81d6 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -430,6 +430,9 @@ namespace aux { , m_download_connect_attempts(0) , m_next_scrape_torrent(0) , m_tick_residual(0) +#ifndef TORRENT_DISABLE_EXTENSIONS + , m_session_extension_features(0) +#endif , m_deferred_submit_disk_jobs(false) , m_pending_auto_manage(false) , m_need_auto_manage(false) @@ -921,6 +924,7 @@ namespace aux { boost::shared_ptr p(new session_plugin_wrapper(ext)); m_ses_extensions.push_back(p); + m_session_extension_features |= p->implemented_features(); } void session_impl::add_ses_extension(boost::shared_ptr ext) @@ -931,6 +935,7 @@ namespace aux { m_ses_extensions.push_back(ext); m_alerts.add_extension(ext); ext->added(session_handle(this)); + m_session_extension_features |= ext->implemented_features(); // get any DHT queries the plugin would like to handle // and record them in m_extension_dht_queries for lookup @@ -942,7 +947,7 @@ namespace aux { { TORRENT_ASSERT(e->first.size() <= max_dht_query_length); if (e->first.size() > max_dht_query_length) continue; - extention_dht_query registration; + extension_dht_query registration; registration.query_len = e->first.size(); std::copy(e->first.begin(), e->first.end(), registration.query.begin()); registration.handler = e->second; @@ -3047,12 +3052,15 @@ retry: } #ifndef TORRENT_DISABLE_EXTENSIONS - for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin() - , end(m_ses_extensions.end()); i != end; ++i) + if (m_session_extension_features & plugin::tick_feature) { - TORRENT_TRY { - (*i)->on_tick(); - } TORRENT_CATCH(std::exception&) {} + for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin() + , end(m_ses_extensions.end()); i != end; ++i) + { + TORRENT_TRY { + (*i)->on_tick(); + } TORRENT_CATCH(std::exception&) {} + } } #endif @@ -3764,24 +3772,32 @@ retry: } namespace { - struct last_optimistic_unchoke_cmp + bool last_optimistic_unchoke_cmp(torrent_peer const* const l + , torrent_peer const* const r) { - bool operator()(peer_connection_handle const& l - , peer_connection_handle const& r) - { - return l.native_handle()->peer_info_struct()->last_optimistically_unchoked - < r.native_handle()->peer_info_struct()->last_optimistically_unchoked; - } - }; + return l->last_optimistically_unchoked + < r->last_optimistically_unchoked; + } } void session_impl::recalculate_optimistic_unchoke_slots() { + INVARIANT_CHECK; + TORRENT_ASSERT(is_single_thread()); if (m_stats_counters[counters::num_unchoke_slots] == 0) return; - std::vector opt_unchoke; + std::vector opt_unchoke; + // collect the currently optimistically unchoked peers here, so we can + // choke them when we've found new optimistic unchoke candidates. + std::vector prev_opt_unchoke; + + // TODO: 3 it would probably make sense to have a separate list of peers + // that are eligible for optimistic unchoke, similar to the torrents + // perhaps this could even iterate over the pool allocators of + // torrent_peer objects. It could probably be done in a single pass and + // collect the n best candidates for (connection_map::iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { @@ -3790,90 +3806,123 @@ retry: torrent_peer* pi = p->peer_info_struct(); if (!pi) continue; if (pi->web_seed) continue; - torrent* t = p->associated_torrent().lock().get(); - if (!t) continue; - if (t->is_paused()) continue; if (pi->optimistically_unchoked) { - TORRENT_ASSERT(!p->is_choked()); - opt_unchoke.push_back(peer_connection_handle(*i)); + prev_opt_unchoke.push_back(pi); } + torrent* t = p->associated_torrent().lock().get(); + if (!t) continue; + + // TODO: 3 peers should know whether their torrent is paused or not, + // instead of having to ask it over and over again + if (t->is_paused()) continue; + if (!p->is_connecting() && !p->is_disconnecting() && p->is_peer_interested() && t->free_upload_slots() - && p->is_choked() + && (p->is_choked() || pi->optimistically_unchoked) && !p->ignore_unchoke_slots() && t->valid_metadata()) { - opt_unchoke.push_back(peer_connection_handle(*i)); + opt_unchoke.push_back(pi); } } // find the peers that has been waiting the longest to be optimistically // unchoked - // avoid having a bias towards peers that happen to be sorted first - std::random_shuffle(opt_unchoke.begin(), opt_unchoke.end(), randint); - - // sort all candidates based on when they were last optimistically - // unchoked. - std::sort(opt_unchoke.begin(), opt_unchoke.end(), last_optimistic_unchoke_cmp()); - -#ifndef TORRENT_DISABLE_EXTENSIONS - for (ses_extension_list_t::iterator i = m_ses_extensions.begin() - , end(m_ses_extensions.end()); i != end; ++i) - { - if ((*i)->on_optimistic_unchoke(opt_unchoke)) - break; - } -#endif - int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots]; if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5); + if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke = + int(opt_unchoke.size()); + + // find the n best optimistic unchoke candidates + std::partial_sort(opt_unchoke.begin() + , opt_unchoke.begin() + num_opt_unchoke + , opt_unchoke.end(), &last_optimistic_unchoke_cmp); + + +#ifndef TORRENT_DISABLE_EXTENSIONS + if (m_session_extension_features & plugin::optimistic_unchoke_feature) + { + // if there is an extension that wants to reorder the optimistic + // unchoke peers, first convert the vector into one containing + // peer_connection_handles, since that's the exported API + std::vector peers; + peers.reserve(opt_unchoke.size()); + for (std::vector::iterator i = opt_unchoke.begin() + , end(opt_unchoke.end()); i != end; ++i) + { + peers.push_back(peer_connection_handle(static_cast((*i)->connection)->self())); + } + for (ses_extension_list_t::iterator i = m_ses_extensions.begin() + , end(m_ses_extensions.end()); i != end; ++i) + { + if ((*i)->on_optimistic_unchoke(peers)) + break; + } + // then convert back to the internal torrent_peer pointers + opt_unchoke.clear(); + for (std::vector::iterator i = peers.begin() + , end(peers.end()); i != end; ++i) + { + opt_unchoke.push_back(i->native_handle()->peer_info_struct()); + } + } +#endif // unchoke the first num_opt_unchoke peers in the candidate set // and make sure that the others are choked - for (std::vector::iterator i = opt_unchoke.begin() - , end(opt_unchoke.end()); i != end; ++i) + std::vector::iterator opt_unchoke_end = opt_unchoke.begin() + + num_opt_unchoke; + + for (std::vector::iterator i = opt_unchoke.begin(); + i != opt_unchoke_end; ++i) { - torrent_peer* pi = i->native_handle()->peer_info_struct(); - if (num_opt_unchoke > 0) + torrent_peer* pi = *i; + if (pi->optimistically_unchoked) { - --num_opt_unchoke; - if (!pi->optimistically_unchoked) - { - peer_connection* p = static_cast(pi->connection); - torrent* t = p->associated_torrent().lock().get(); - bool ret = t->unchoke_peer(*p, true); - if (ret) - { - pi->optimistically_unchoked = true; - m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); - pi->last_optimistically_unchoked = boost::uint16_t(session_time()); - } - else - { - // we failed to unchoke it, increment the count again - ++num_opt_unchoke; - } - } + TORRENT_ASSERT(!pi->connection->is_choked()); + // remove this peer from prev_opt_unchoke, to prevent us from + // choking it later. This peer gets another round of optimistic + // unchoke + std::vector::iterator existing = + std::find(prev_opt_unchoke.begin(), prev_opt_unchoke.end(), pi); + TORRENT_ASSERT(existing != prev_opt_unchoke.end()); + prev_opt_unchoke.erase(existing); } else { - if (pi->optimistically_unchoked) + peer_connection* p = static_cast(pi->connection); + TORRENT_ASSERT(p->is_choked()); + boost::shared_ptr t = p->associated_torrent().lock(); + bool ret = t->unchoke_peer(*p, true); + TORRENT_ASSERT(ret); + if (ret) { - peer_connection* p = static_cast(pi->connection); - torrent* t = p->associated_torrent().lock().get(); - pi->optimistically_unchoked = false; - m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); - t->choke_peer(*p); + pi->optimistically_unchoked = true; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); + pi->last_optimistically_unchoked = boost::uint16_t(session_time()); } } } + + // now, choke all the previous optimistically unchoked peers + for (std::vector::iterator i = prev_opt_unchoke.begin() + , end(prev_opt_unchoke.end()); i != end; ++i) + { + torrent_peer* pi = *i; + TORRENT_ASSERT(pi->optimistically_unchoked); + peer_connection* p = static_cast(pi->connection); + boost::shared_ptr t = p->associated_torrent().lock(); + pi->optimistically_unchoked = false; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); + t->choke_peer(*p); + } } void session_impl::try_connect_more_peers() diff --git a/src/torrent_peer.cpp b/src/torrent_peer.cpp index a90933e88..705541cf2 100644 --- a/src/torrent_peer.cpp +++ b/src/torrent_peer.cpp @@ -284,7 +284,7 @@ namespace libtorrent return ""; } #endif - + libtorrent::address torrent_peer::address() const { #if TORRENT_USE_IPV6 diff --git a/src/ut_metadata.cpp b/src/ut_metadata.cpp index 25a534372..19c66b302 100644 --- a/src/ut_metadata.cpp +++ b/src/ut_metadata.cpp @@ -167,7 +167,7 @@ namespace libtorrent { namespace m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); } */ - void on_piece_pass(int) + void on_piece_pass(int) TORRENT_OVERRIDE { // if we became a seed, copy the metadata from // the torrent before it is deallocated