From d7d498c3a31f9779b135c3ceeb2ebe4d7e1d3ea2 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 22 Sep 2014 03:47:43 +0000 Subject: [PATCH] more steps towards isolating peer_connections from the session object. unchoking now happens via the torrent object. hopefully in the future, the session can determine which peers to unchoke without having direct access to them (i.e. with a mutex) --- include/libtorrent/aux_/session_impl.hpp | 21 ++- include/libtorrent/aux_/session_interface.hpp | 2 - include/libtorrent/peer_connection.hpp | 3 + include/libtorrent/performance_counters.hpp | 2 + include/libtorrent/torrent.hpp | 3 + src/bt_peer_connection.cpp | 10 +- src/peer_connection.cpp | 171 ++++++++++-------- src/session_impl.cpp | 97 ++++++---- src/session_stats.cpp | 7 + src/torrent.cpp | 19 +- 10 files changed, 203 insertions(+), 132 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 901fdc1bd..c2397fa12 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -495,18 +495,23 @@ namespace libtorrent int rate_limit(peer_class_t c, int channel) const; bool preemptive_unchoke() const; - int num_uploads() const { return m_num_unchoked; } + int num_uploads() const + { return m_stats_counters[counters::num_peers_up_unchoked]; } int num_connections() const { return m_connections.size(); } int peak_up_rate() const { return m_peak_up_rate; } - void unchoke_peer(peer_connection& c); - void choke_peer(peer_connection& c); void trigger_unchoke() - { m_unchoke_time_scaler = 0; } + { + TORRENT_ASSERT(is_single_thread()); + m_unchoke_time_scaler = 0; + } void trigger_optimistic_unchoke() - { m_optimistic_unchoke_time_scaler = 0; } + { + TORRENT_ASSERT(is_single_thread()); + m_optimistic_unchoke_time_scaler = 0; + } session_status status() const; void set_peer_id(peer_id const& id); @@ -914,24 +919,24 @@ namespace libtorrent // this should always be >= m_max_uploads int m_allowed_upload_slots; - // the number of unchoked peers - int m_num_unchoked; - // this is initialized to the unchoke_interval // session_setting and decreased every second. // when it reaches zero, it is reset to the // unchoke_interval and the unchoke set is // recomputed. + // TODO: replace this by a proper asio timer int m_unchoke_time_scaler; // this is used to decide when to recalculate which // torrents to keep queued and which to activate + // TODO: replace this by a proper asio timer int m_auto_manage_time_scaler; // works like unchoke_time_scaler but it // is only decresed when the unchoke set // is recomputed, and when it reaches zero, // the optimistic unchoke is moved to another peer. + // TODO: replace this by a proper asio timer int m_optimistic_unchoke_time_scaler; // works like unchoke_time_scaler. Each time diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 90cca527a..5b46dd04e 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -147,8 +147,6 @@ namespace libtorrent { namespace aux virtual bool is_aborted() const = 0; virtual int num_uploads() const = 0; virtual bool preemptive_unchoke() const = 0; - virtual void unchoke_peer(peer_connection& c) = 0; - virtual void choke_peer(peer_connection& c) = 0; virtual void trigger_optimistic_unchoke() = 0; virtual void trigger_unchoke() = 0; diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index f6e53e597..d0b89a088 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -469,6 +469,9 @@ namespace libtorrent bool is_peer_interested() const { return m_peer_interested; } bool has_peer_choked() const { return m_peer_choked; } + void choke_this_peer(); + void maybe_unchoke_this_peer(); + void update_interest(); virtual void get_peer_info(peer_info& p) const; diff --git a/include/libtorrent/performance_counters.hpp b/include/libtorrent/performance_counters.hpp index b6e7c74ec..4bb8cd9a3 100644 --- a/include/libtorrent/performance_counters.hpp +++ b/include/libtorrent/performance_counters.hpp @@ -338,6 +338,8 @@ namespace libtorrent num_peers_connected, num_peers_up_interested, num_peers_down_interested, + num_peers_up_unchoked_all, + num_peers_up_unchoked_optimistic, num_peers_up_unchoked, num_peers_down_unchoked, num_peers_up_requests, diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 5dc4c9e26..1e250cb62 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -584,6 +584,9 @@ namespace libtorrent bool choke_peer(peer_connection& c); bool unchoke_peer(peer_connection& c, bool optimistic = false); + void trigger_unchoke(); + void trigger_optimistic_unchoke(); + // used by peer_connection to attach itself to a torrent // since incoming connections don't know what torrent // they're a part of until they have received an info_hash. diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 89ce9bbb7..83ce5576c 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -851,10 +851,7 @@ namespace libtorrent // if the peer is ignoring unchoke slots, or if we have enough // unused slots, unchoke this peer right away, to save a round-trip // in case it's interested. - if (ignore_unchoke_slots()) - send_unchoke(); - else if (m_ses.preemptive_unchoke()) - m_ses.unchoke_peer(*this); + maybe_unchoke_this_peer(); } } } @@ -3029,10 +3026,7 @@ namespace libtorrent // if the peer is ignoring unchoke slots, or if we have enough // unused slots, unchoke this peer right away, to save a round-trip // in case it's interested. - if (ignore_unchoke_slots()) - send_unchoke(); - else if (m_ses.preemptive_unchoke()) - m_ses.unchoke_peer(*this); + maybe_unchoke_this_peer(); } } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 96033d393..97f5bc88d 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -873,7 +873,11 @@ namespace libtorrent if (m_peer_interested) m_counters.inc_stats_counter(counters::num_peers_up_interested, -1); if (!m_choked) - m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); + { + m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1); + if (!ignore_unchoke_slots()) + m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); + } if (!m_peer_choked) m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1); if (m_connected) @@ -1284,8 +1288,14 @@ namespace libtorrent return; } - if (t->is_paused() && (!t->is_auto_managed() - || !m_settings.get_bool(settings_pack::incoming_starts_queued_torrents))) + if (t->is_paused() + && m_settings.get_bool(settings_pack::incoming_starts_queued_torrents) + && !t->is_aborted()) + { + t->resume(); + } + + if (t->is_paused() || t->is_aborted()) { // paused torrents will not accept // incoming connections unless they are auto managed @@ -1316,15 +1326,6 @@ namespace libtorrent TORRENT_ASSERT(m_torrent.expired()); - if (t->is_paused() - && m_settings.get_bool(settings_pack::incoming_starts_queued_torrents) - && !m_ses.is_paused() - && !t->is_aborted() - && !m_ses.is_aborted()) - { - t->resume(); - } - // check to make sure we don't have another connection with the same // info_hash and peer_id. If we do. close this connection. t->attach_peer(this); @@ -1705,44 +1706,7 @@ namespace libtorrent return; } - if (is_choked()) - { - if (ignore_unchoke_slots()) - { -#ifdef TORRENT_VERBOSE_LOGGING - peer_log("ABOUT TO UNCHOKE [ peer ignores unchoke slots ]"); -#endif - // if this peer is expempted from the choker - // just unchoke it immediately - send_unchoke(); - } - // TODO: 3 we should probably use ses.m_allowed_upload_slots here instead - // to work with auto-unchoke logic - else if (m_ses.num_uploads() < m_settings.get_int(settings_pack::unchoke_slots_limit) - || m_settings.get_int(settings_pack::unchoke_slots_limit) < 0) - { - // if the peer is choked and we have upload slots left, - // then unchoke it. Another condition that has to be met - // is that the torrent doesn't keep track of the individual - // up/down ratio for each peer (ratio == 0) or (if it does - // keep track) this particular connection isn't a leecher. - // If the peer was choked because it was leeching, don't - // unchoke it again. - // The exception to this last condition is if we're a seed. - // In that case we don't care if people are leeching, they - // can't pay for their downloads anyway. - m_ses.unchoke_peer(*this); - } -#if defined TORRENT_VERBOSE_LOGGING - else - { - peer_log("DID NOT UNCHOKE [ the number of uploads (%d) " - "is more than or equal to the limit (%d) ]" - , m_ses.num_uploads(), m_settings.get_int(settings_pack::unchoke_slots_limit)); - } -#endif - } - else + if (!is_choked()) { // the reason to send an extra unchoke message here is that // because of the handshake-round-trip optimization, we may @@ -1756,7 +1720,51 @@ namespace libtorrent peer_log("SENDING REDUNDANT UNCHOKE"); #endif write_unchoke(); + return; } + + maybe_unchoke_this_peer(); + } + + void peer_connection::maybe_unchoke_this_peer() + { + if (ignore_unchoke_slots()) + { +#ifdef TORRENT_VERBOSE_LOGGING + peer_log("ABOUT TO UNCHOKE [ peer ignores unchoke slots ]"); +#endif + // if this peer is expempted from the choker + // just unchoke it immediately + send_unchoke(); + } + // TODO: 3 we should probably use ses.m_allowed_upload_slots here instead + // to work with auto-unchoke logic + else if (m_ses.preemptive_unchoke()) + { + // if the peer is choked and we have upload slots left, + // then unchoke it. Another condition that has to be met + // is that the torrent doesn't keep track of the individual + // up/down ratio for each peer (ratio == 0) or (if it does + // keep track) this particular connection isn't a leecher. + // If the peer was choked because it was leeching, don't + // unchoke it again. + // The exception to this last condition is if we're a seed. + // In that case we don't care if people are leeching, they + // can't pay for their downloads anyway. + + boost::shared_ptr t = m_torrent.lock(); + TORRENT_ASSERT(t); + + t->unchoke_peer(*this); + } +#if defined TORRENT_VERBOSE_LOGGING + else + { + peer_log("DID NOT UNCHOKE [ the number of uploads (%d) " + "is more than or equal to the limit (%d) ]" + , m_ses.num_uploads(), m_settings.get_int(settings_pack::unchoke_slots_limit)); + } +#endif } // ----------------------------- @@ -1789,23 +1797,7 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); - if (!is_choked()) - { - if (ignore_unchoke_slots()) - { - send_choke(); - } - else - { - if (m_peer_info && m_peer_info->optimistically_unchoked) - { - m_peer_info->optimistically_unchoked = false; - m_ses.trigger_optimistic_unchoke(); - } - m_ses.choke_peer(*this); - m_ses.trigger_unchoke(); - } - } + choke_this_peer(); if (t->super_seeding()) { @@ -1815,6 +1807,28 @@ namespace libtorrent } } + void peer_connection::choke_this_peer() + { + if (is_choked()) return; + if (ignore_unchoke_slots()) + { + send_choke(); + return; + } + + boost::shared_ptr t = m_torrent.lock(); + TORRENT_ASSERT(t); + + if (m_peer_info && m_peer_info->optimistically_unchoked) + { + m_peer_info->optimistically_unchoked = false; + m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); + t->trigger_optimistic_unchoke(); + } + t->choke_peer(*this); + t->trigger_unchoke(); + } + // ----------------------------- // ----------- HAVE ------------ // ----------------------------- @@ -3568,15 +3582,26 @@ namespace libtorrent { INVARIANT_CHECK; - if (m_peer_info && m_peer_info->optimistically_unchoked) - m_peer_info->optimistically_unchoked = false; + if (m_choked) + { + TORRENT_ASSERT(m_peer_info == NULL + || m_peer_info->optimistically_unchoked == false); + return false; + } + + if (m_peer_info && m_peer_info->optimistically_unchoked) + { + m_peer_info->optimistically_unchoked = false; + m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); + } - if (m_choked) return false; #ifdef TORRENT_VERBOSE_LOGGING peer_log("==> CHOKE"); #endif write_choke(); - m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); + m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1); + if (!ignore_unchoke_slots()) + m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); m_choked = true; m_last_choke = time_now(); @@ -3636,7 +3661,9 @@ namespace libtorrent m_last_unchoke = time_now(); write_unchoke(); - m_counters.inc_stats_counter(counters::num_peers_up_unchoked); + m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all); + if (!ignore_unchoke_slots()) + m_counters.inc_stats_counter(counters::num_peers_up_unchoked); m_choked = false; m_uploaded_at_last_unchoke = m_statistics.total_payload_upload(); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index ad23a7015..74ee26110 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -454,7 +454,6 @@ namespace aux { , m_socks_listen_port(0) , m_interface_index(0) , m_allowed_upload_slots(8) - , m_num_unchoked(0) , m_unchoke_time_scaler(0) , m_auto_manage_time_scaler(0) , m_optimistic_unchoke_time_scaler(0) @@ -3112,7 +3111,6 @@ retry: TORRENT_ASSERT(p->is_disconnecting()); - if (!p->is_choked() && !p->ignore_unchoke_slots()) --m_num_unchoked; TORRENT_ASSERT(sp.use_count() > 0); connection_map::iterator i = m_connections.find(sp); @@ -3138,24 +3136,6 @@ retry: m_key = key; } - void session_impl::unchoke_peer(peer_connection& c) - { - TORRENT_ASSERT(!c.ignore_unchoke_slots()); - torrent* t = c.associated_torrent().lock().get(); - TORRENT_ASSERT(t); - if (t->unchoke_peer(c)) - ++m_num_unchoked; - } - - void session_impl::choke_peer(peer_connection& c) - { - TORRENT_ASSERT(!c.ignore_unchoke_slots()); - torrent* t = c.associated_torrent().lock().get(); - TORRENT_ASSERT(t); - if (t->choke_peer(c)) - --m_num_unchoked; - } - int session_impl::next_port() const { int start = m_settings.get_int(settings_pack::outgoing_port); @@ -4748,7 +4728,7 @@ retry: if (ret) { pi->optimistically_unchoked = true; - ++m_num_unchoked; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); pi->last_optimistically_unchoked = boost::uint16_t(session_time()); } else @@ -4765,8 +4745,8 @@ retry: peer_connection* p = static_cast(pi->connection); torrent* t = p->associated_torrent().lock().get(); pi->optimistically_unchoked = false; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); t->choke_peer(*p); - --m_num_unchoked; } } } @@ -4972,6 +4952,7 @@ retry: if (p->is_choked()) continue; if (pi && pi->optimistically_unchoked) { + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); pi->optimistically_unchoked = false; // force a new optimistic unchoke m_optimistic_unchoke_time_scaler = 0; @@ -5056,7 +5037,8 @@ retry: // if our current upload rate is less than 90% of our // limit if (m_stat.upload_rate() < upload_limit * 0.9f - && m_allowed_upload_slots <= m_num_unchoked + 1 + && m_allowed_upload_slots + <= m_stats_counters[counters::num_peers_up_unchoked] + 1 && m_upload_rate.queue_size() < 2) { ++m_allowed_upload_slots; @@ -5091,7 +5073,6 @@ retry: } } - m_num_unchoked = 0; // go through all the peers and unchoke the first ones and choke // all the other ones. for (std::vector::iterator i = peers.begin() @@ -5132,7 +5113,6 @@ retry: } --unchoke_set_size; - ++m_num_unchoked; TORRENT_ASSERT(p->peer_info_struct()); if (p->peer_info_struct()->optimistically_unchoked) @@ -5142,6 +5122,7 @@ retry: // proper unchoke set m_optimistic_unchoke_time_scaler = 0; p->peer_info_struct()->optimistically_unchoked = false; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); } } else @@ -5150,8 +5131,6 @@ retry: TORRENT_ASSERT(p->peer_info_struct()); if (!p->is_choked() && !p->peer_info_struct()->optimistically_unchoked) t->choke_peer(*p); - if (!p->is_choked()) - ++m_num_unchoked; } } } @@ -6405,7 +6384,7 @@ retry: s.num_peers = int(m_connections.size()); s.num_dead_peers = int(m_undead_peers.size()); - s.num_unchoked = m_num_unchoked; + s.num_unchoked = m_stats_counters[counters::num_peers_up_unchoked_all]; s.allowed_upload_slots = m_allowed_upload_slots; s.num_torrents = m_torrents.size(); @@ -6966,7 +6945,8 @@ retry: bool session_impl::preemptive_unchoke() const { - return m_num_unchoked < m_allowed_upload_slots; + return m_stats_counters[counters::num_peers_up_unchoked] < m_allowed_upload_slots + || m_settings.get_int(settings_pack::unchoke_slots_limit) < 0; } void session_impl::update_dht_upload_rate_limit() @@ -7694,6 +7674,7 @@ retry: if (m_settings.get_int(settings_pack::choking_algorithm) == settings_pack::auto_expand_choker) TORRENT_ASSERT(m_allowed_upload_slots >= m_settings.get_int(settings_pack::unchoke_slots_limit)); int unchokes = 0; + int unchokes_all = 0; int num_optimistic = 0; int disk_queue[2] = {0, 0}; for (connection_map::const_iterator i = m_connections.begin(); @@ -7709,8 +7690,17 @@ retry: peer_connection* p = i->get(); TORRENT_ASSERT(!p->is_disconnecting()); - if (p->ignore_unchoke_slots()) continue; - if (!p->is_choked()) ++unchokes; + if (p->ignore_unchoke_slots()) + { + if (!p->is_choked()) ++unchokes_all; + continue; + } + if (!p->is_choked()) + { + ++unchokes; + ++unchokes_all; + } + if (p->peer_info_struct() && p->peer_info_struct()->optimistically_unchoked) { @@ -7719,18 +7709,49 @@ retry: } } - TORRENT_ASSERT(disk_queue[peer_connection::download_channel] == m_stats_counters[counters::num_peers_down_disk]); - TORRENT_ASSERT(disk_queue[peer_connection::upload_channel] == m_stats_counters[counters::num_peers_up_disk]); + for (std::vector >::const_iterator i + = m_undead_peers.begin(); i != m_undead_peers.end(); ++i) + { + peer_connection* p = i->get(); + if (p->ignore_unchoke_slots()) + { + if (!p->is_choked()) ++unchokes_all; + continue; + } + if (!p->is_choked()) + { + ++unchokes_all; + ++unchokes; + } + + if (p->peer_info_struct() + && p->peer_info_struct()->optimistically_unchoked) + { + ++num_optimistic; + TORRENT_ASSERT(!p->is_choked()); + } + } + + TORRENT_ASSERT(disk_queue[peer_connection::download_channel] + == m_stats_counters[counters::num_peers_down_disk]); + TORRENT_ASSERT(disk_queue[peer_connection::upload_channel] + == m_stats_counters[counters::num_peers_up_disk]); if (m_settings.get_int(settings_pack::num_optimistic_unchoke_slots)) { - TORRENT_ASSERT(num_optimistic <= m_settings.get_int(settings_pack::num_optimistic_unchoke_slots)); + TORRENT_ASSERT(num_optimistic <= m_settings.get_int( + settings_pack::num_optimistic_unchoke_slots)); } - if (m_num_unchoked != unchokes) - { - TORRENT_ASSERT(false); - } + int unchoked_counter_all = m_stats_counters[counters::num_peers_up_unchoked_all]; + int unchoked_counter = m_stats_counters[counters::num_peers_up_unchoked]; + int unchoked_counter_optimistic + = m_stats_counters[counters::num_peers_up_unchoked_optimistic]; + + TORRENT_ASSERT_VAL(unchoked_counter_all == unchokes_all, unchokes_all); + TORRENT_ASSERT_VAL(unchoked_counter == unchokes, unchokes); + TORRENT_ASSERT_VAL(unchoked_counter_optimistic == num_optimistic, num_optimistic); + for (torrent_map::const_iterator j = m_torrents.begin(); j != m_torrents.end(); ++j) { diff --git a/src/session_stats.cpp b/src/session_stats.cpp index 078d7350e..a6cab026d 100644 --- a/src/session_stats.cpp +++ b/src/session_stats.cpp @@ -106,6 +106,11 @@ namespace libtorrent // the number of peer connections for each kind of socket. // these counts include half-open (connecting) peers. + // ``num_peers_up_unchoked_all`` is the total number of unchoked peers, + // whereas ``num_peers_up_unchoked`` only are unchoked peers that count + // against the limit (i.e. excluding peers that are unchoked because the + // limit doesn't apply to them). ``num_peers_up_unchoked_optimistic`` is + // the number of optimistically unchoked peers. METRIC(peer, num_tcp_peers) METRIC(peer, num_socks5_peers) METRIC(peer, num_http_proxy_peers) @@ -120,6 +125,8 @@ namespace libtorrent METRIC(peer, num_peers_connected) METRIC(peer, num_peers_up_interested) METRIC(peer, num_peers_down_interested) + METRIC(peer, num_peers_up_unchoked_all) + METRIC(peer, num_peers_up_unchoked_optimistic) METRIC(peer, num_peers_up_unchoked) METRIC(peer, num_peers_down_unchoked) METRIC(peer, num_peers_up_requests) diff --git a/src/torrent.cpp b/src/torrent.cpp index 22e0da4f8..e88d42cc0 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -5616,6 +5616,18 @@ namespace libtorrent return true; } + void torrent::trigger_unchoke() + { + m_ses.get_io_service().dispatch(boost::bind( + &aux::session_interface::trigger_unchoke, boost::ref(m_ses))); + } + + void torrent::trigger_optimistic_unchoke() + { + m_ses.get_io_service().dispatch(boost::bind( + &aux::session_interface::trigger_optimistic_unchoke, boost::ref(m_ses))); + } + void torrent::cancel_block(piece_block block) { INVARIANT_CHECK; @@ -5763,14 +5775,14 @@ namespace libtorrent if (!p->is_choked() && !p->ignore_unchoke_slots()) { --m_num_uploads; - m_ses.trigger_unchoke(); + trigger_unchoke(); } torrent_peer* pp = p->peer_info_struct(); if (pp) { if (pp->optimistically_unchoked) - m_ses.trigger_optimistic_unchoke(); + trigger_optimistic_unchoke(); TORRENT_ASSERT(pp->prev_amount_upload == 0); TORRENT_ASSERT(pp->prev_amount_download == 0); @@ -9036,8 +9048,7 @@ namespace libtorrent // remove any un-sent requests from the queue p->clear_request_queue(); // don't accept new requests from the peer - if (!p->is_choked() && !p->ignore_unchoke_slots()) - m_ses.choke_peer(*p); + p->choke_this_peer(); continue; }