more steps towards isolating peer_connections from the session object. unchoking now happens via the torrent object. hopefully in the future, the session can determine which peers to unchoke without having direct access to them (i.e. with a mutex)

This commit is contained in:
Arvid Norberg 2014-09-22 03:47:43 +00:00
parent 27b7705cf2
commit d7d498c3a3
10 changed files with 203 additions and 132 deletions

View File

@ -495,18 +495,23 @@ namespace libtorrent
int rate_limit(peer_class_t c, int channel) const;
bool preemptive_unchoke() const;
int num_uploads() const { return m_num_unchoked; }
int num_uploads() const
{ return m_stats_counters[counters::num_peers_up_unchoked]; }
int num_connections() const
{ return m_connections.size(); }
int peak_up_rate() const { return m_peak_up_rate; }
void unchoke_peer(peer_connection& c);
void choke_peer(peer_connection& c);
void trigger_unchoke()
{ m_unchoke_time_scaler = 0; }
{
TORRENT_ASSERT(is_single_thread());
m_unchoke_time_scaler = 0;
}
void trigger_optimistic_unchoke()
{ m_optimistic_unchoke_time_scaler = 0; }
{
TORRENT_ASSERT(is_single_thread());
m_optimistic_unchoke_time_scaler = 0;
}
session_status status() const;
void set_peer_id(peer_id const& id);
@ -914,24 +919,24 @@ namespace libtorrent
// this should always be >= m_max_uploads
int m_allowed_upload_slots;
// the number of unchoked peers
int m_num_unchoked;
// this is initialized to the unchoke_interval
// session_setting and decreased every second.
// when it reaches zero, it is reset to the
// unchoke_interval and the unchoke set is
// recomputed.
// TODO: replace this by a proper asio timer
int m_unchoke_time_scaler;
// this is used to decide when to recalculate which
// torrents to keep queued and which to activate
// TODO: replace this by a proper asio timer
int m_auto_manage_time_scaler;
// works like unchoke_time_scaler but it
// is only decresed when the unchoke set
// is recomputed, and when it reaches zero,
// the optimistic unchoke is moved to another peer.
// TODO: replace this by a proper asio timer
int m_optimistic_unchoke_time_scaler;
// works like unchoke_time_scaler. Each time

View File

@ -147,8 +147,6 @@ namespace libtorrent { namespace aux
virtual bool is_aborted() const = 0;
virtual int num_uploads() const = 0;
virtual bool preemptive_unchoke() const = 0;
virtual void unchoke_peer(peer_connection& c) = 0;
virtual void choke_peer(peer_connection& c) = 0;
virtual void trigger_optimistic_unchoke() = 0;
virtual void trigger_unchoke() = 0;

View File

@ -469,6 +469,9 @@ namespace libtorrent
bool is_peer_interested() const { return m_peer_interested; }
bool has_peer_choked() const { return m_peer_choked; }
void choke_this_peer();
void maybe_unchoke_this_peer();
void update_interest();
virtual void get_peer_info(peer_info& p) const;

View File

@ -338,6 +338,8 @@ namespace libtorrent
num_peers_connected,
num_peers_up_interested,
num_peers_down_interested,
num_peers_up_unchoked_all,
num_peers_up_unchoked_optimistic,
num_peers_up_unchoked,
num_peers_down_unchoked,
num_peers_up_requests,

View File

@ -584,6 +584,9 @@ namespace libtorrent
bool choke_peer(peer_connection& c);
bool unchoke_peer(peer_connection& c, bool optimistic = false);
void trigger_unchoke();
void trigger_optimistic_unchoke();
// used by peer_connection to attach itself to a torrent
// since incoming connections don't know what torrent
// they're a part of until they have received an info_hash.

View File

@ -851,10 +851,7 @@ namespace libtorrent
// if the peer is ignoring unchoke slots, or if we have enough
// unused slots, unchoke this peer right away, to save a round-trip
// in case it's interested.
if (ignore_unchoke_slots())
send_unchoke();
else if (m_ses.preemptive_unchoke())
m_ses.unchoke_peer(*this);
maybe_unchoke_this_peer();
}
}
}
@ -3029,10 +3026,7 @@ namespace libtorrent
// if the peer is ignoring unchoke slots, or if we have enough
// unused slots, unchoke this peer right away, to save a round-trip
// in case it's interested.
if (ignore_unchoke_slots())
send_unchoke();
else if (m_ses.preemptive_unchoke())
m_ses.unchoke_peer(*this);
maybe_unchoke_this_peer();
}
}

View File

@ -873,7 +873,11 @@ namespace libtorrent
if (m_peer_interested)
m_counters.inc_stats_counter(counters::num_peers_up_interested, -1);
if (!m_choked)
m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
{
m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1);
if (!ignore_unchoke_slots())
m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
}
if (!m_peer_choked)
m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1);
if (m_connected)
@ -1284,8 +1288,14 @@ namespace libtorrent
return;
}
if (t->is_paused() && (!t->is_auto_managed()
|| !m_settings.get_bool(settings_pack::incoming_starts_queued_torrents)))
if (t->is_paused()
&& m_settings.get_bool(settings_pack::incoming_starts_queued_torrents)
&& !t->is_aborted())
{
t->resume();
}
if (t->is_paused() || t->is_aborted())
{
// paused torrents will not accept
// incoming connections unless they are auto managed
@ -1316,15 +1326,6 @@ namespace libtorrent
TORRENT_ASSERT(m_torrent.expired());
if (t->is_paused()
&& m_settings.get_bool(settings_pack::incoming_starts_queued_torrents)
&& !m_ses.is_paused()
&& !t->is_aborted()
&& !m_ses.is_aborted())
{
t->resume();
}
// check to make sure we don't have another connection with the same
// info_hash and peer_id. If we do. close this connection.
t->attach_peer(this);
@ -1705,44 +1706,7 @@ namespace libtorrent
return;
}
if (is_choked())
{
if (ignore_unchoke_slots())
{
#ifdef TORRENT_VERBOSE_LOGGING
peer_log("ABOUT TO UNCHOKE [ peer ignores unchoke slots ]");
#endif
// if this peer is expempted from the choker
// just unchoke it immediately
send_unchoke();
}
// TODO: 3 we should probably use ses.m_allowed_upload_slots here instead
// to work with auto-unchoke logic
else if (m_ses.num_uploads() < m_settings.get_int(settings_pack::unchoke_slots_limit)
|| m_settings.get_int(settings_pack::unchoke_slots_limit) < 0)
{
// if the peer is choked and we have upload slots left,
// then unchoke it. Another condition that has to be met
// is that the torrent doesn't keep track of the individual
// up/down ratio for each peer (ratio == 0) or (if it does
// keep track) this particular connection isn't a leecher.
// If the peer was choked because it was leeching, don't
// unchoke it again.
// The exception to this last condition is if we're a seed.
// In that case we don't care if people are leeching, they
// can't pay for their downloads anyway.
m_ses.unchoke_peer(*this);
}
#if defined TORRENT_VERBOSE_LOGGING
else
{
peer_log("DID NOT UNCHOKE [ the number of uploads (%d) "
"is more than or equal to the limit (%d) ]"
, m_ses.num_uploads(), m_settings.get_int(settings_pack::unchoke_slots_limit));
}
#endif
}
else
if (!is_choked())
{
// the reason to send an extra unchoke message here is that
// because of the handshake-round-trip optimization, we may
@ -1756,7 +1720,51 @@ namespace libtorrent
peer_log("SENDING REDUNDANT UNCHOKE");
#endif
write_unchoke();
return;
}
maybe_unchoke_this_peer();
}
void peer_connection::maybe_unchoke_this_peer()
{
if (ignore_unchoke_slots())
{
#ifdef TORRENT_VERBOSE_LOGGING
peer_log("ABOUT TO UNCHOKE [ peer ignores unchoke slots ]");
#endif
// if this peer is expempted from the choker
// just unchoke it immediately
send_unchoke();
}
// TODO: 3 we should probably use ses.m_allowed_upload_slots here instead
// to work with auto-unchoke logic
else if (m_ses.preemptive_unchoke())
{
// if the peer is choked and we have upload slots left,
// then unchoke it. Another condition that has to be met
// is that the torrent doesn't keep track of the individual
// up/down ratio for each peer (ratio == 0) or (if it does
// keep track) this particular connection isn't a leecher.
// If the peer was choked because it was leeching, don't
// unchoke it again.
// The exception to this last condition is if we're a seed.
// In that case we don't care if people are leeching, they
// can't pay for their downloads anyway.
boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t);
t->unchoke_peer(*this);
}
#if defined TORRENT_VERBOSE_LOGGING
else
{
peer_log("DID NOT UNCHOKE [ the number of uploads (%d) "
"is more than or equal to the limit (%d) ]"
, m_ses.num_uploads(), m_settings.get_int(settings_pack::unchoke_slots_limit));
}
#endif
}
// -----------------------------
@ -1789,23 +1797,7 @@ namespace libtorrent
boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t);
if (!is_choked())
{
if (ignore_unchoke_slots())
{
send_choke();
}
else
{
if (m_peer_info && m_peer_info->optimistically_unchoked)
{
m_peer_info->optimistically_unchoked = false;
m_ses.trigger_optimistic_unchoke();
}
m_ses.choke_peer(*this);
m_ses.trigger_unchoke();
}
}
choke_this_peer();
if (t->super_seeding())
{
@ -1815,6 +1807,28 @@ namespace libtorrent
}
}
void peer_connection::choke_this_peer()
{
if (is_choked()) return;
if (ignore_unchoke_slots())
{
send_choke();
return;
}
boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t);
if (m_peer_info && m_peer_info->optimistically_unchoked)
{
m_peer_info->optimistically_unchoked = false;
m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->trigger_optimistic_unchoke();
}
t->choke_peer(*this);
t->trigger_unchoke();
}
// -----------------------------
// ----------- HAVE ------------
// -----------------------------
@ -3568,15 +3582,26 @@ namespace libtorrent
{
INVARIANT_CHECK;
if (m_peer_info && m_peer_info->optimistically_unchoked)
m_peer_info->optimistically_unchoked = false;
if (m_choked)
{
TORRENT_ASSERT(m_peer_info == NULL
|| m_peer_info->optimistically_unchoked == false);
return false;
}
if (m_peer_info && m_peer_info->optimistically_unchoked)
{
m_peer_info->optimistically_unchoked = false;
m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
}
if (m_choked) return false;
#ifdef TORRENT_VERBOSE_LOGGING
peer_log("==> CHOKE");
#endif
write_choke();
m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1);
if (!ignore_unchoke_slots())
m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
m_choked = true;
m_last_choke = time_now();
@ -3636,7 +3661,9 @@ namespace libtorrent
m_last_unchoke = time_now();
write_unchoke();
m_counters.inc_stats_counter(counters::num_peers_up_unchoked);
m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all);
if (!ignore_unchoke_slots())
m_counters.inc_stats_counter(counters::num_peers_up_unchoked);
m_choked = false;
m_uploaded_at_last_unchoke = m_statistics.total_payload_upload();

View File

@ -454,7 +454,6 @@ namespace aux {
, m_socks_listen_port(0)
, m_interface_index(0)
, m_allowed_upload_slots(8)
, m_num_unchoked(0)
, m_unchoke_time_scaler(0)
, m_auto_manage_time_scaler(0)
, m_optimistic_unchoke_time_scaler(0)
@ -3112,7 +3111,6 @@ retry:
TORRENT_ASSERT(p->is_disconnecting());
if (!p->is_choked() && !p->ignore_unchoke_slots()) --m_num_unchoked;
TORRENT_ASSERT(sp.use_count() > 0);
connection_map::iterator i = m_connections.find(sp);
@ -3138,24 +3136,6 @@ retry:
m_key = key;
}
void session_impl::unchoke_peer(peer_connection& c)
{
TORRENT_ASSERT(!c.ignore_unchoke_slots());
torrent* t = c.associated_torrent().lock().get();
TORRENT_ASSERT(t);
if (t->unchoke_peer(c))
++m_num_unchoked;
}
void session_impl::choke_peer(peer_connection& c)
{
TORRENT_ASSERT(!c.ignore_unchoke_slots());
torrent* t = c.associated_torrent().lock().get();
TORRENT_ASSERT(t);
if (t->choke_peer(c))
--m_num_unchoked;
}
int session_impl::next_port() const
{
int start = m_settings.get_int(settings_pack::outgoing_port);
@ -4748,7 +4728,7 @@ retry:
if (ret)
{
pi->optimistically_unchoked = true;
++m_num_unchoked;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
pi->last_optimistically_unchoked = boost::uint16_t(session_time());
}
else
@ -4765,8 +4745,8 @@ retry:
peer_connection* p = static_cast<peer_connection*>(pi->connection);
torrent* t = p->associated_torrent().lock().get();
pi->optimistically_unchoked = false;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->choke_peer(*p);
--m_num_unchoked;
}
}
}
@ -4972,6 +4952,7 @@ retry:
if (p->is_choked()) continue;
if (pi && pi->optimistically_unchoked)
{
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
pi->optimistically_unchoked = false;
// force a new optimistic unchoke
m_optimistic_unchoke_time_scaler = 0;
@ -5056,7 +5037,8 @@ retry:
// if our current upload rate is less than 90% of our
// limit
if (m_stat.upload_rate() < upload_limit * 0.9f
&& m_allowed_upload_slots <= m_num_unchoked + 1
&& m_allowed_upload_slots
<= m_stats_counters[counters::num_peers_up_unchoked] + 1
&& m_upload_rate.queue_size() < 2)
{
++m_allowed_upload_slots;
@ -5091,7 +5073,6 @@ retry:
}
}
m_num_unchoked = 0;
// go through all the peers and unchoke the first ones and choke
// all the other ones.
for (std::vector<peer_connection*>::iterator i = peers.begin()
@ -5132,7 +5113,6 @@ retry:
}
--unchoke_set_size;
++m_num_unchoked;
TORRENT_ASSERT(p->peer_info_struct());
if (p->peer_info_struct()->optimistically_unchoked)
@ -5142,6 +5122,7 @@ retry:
// proper unchoke set
m_optimistic_unchoke_time_scaler = 0;
p->peer_info_struct()->optimistically_unchoked = false;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
}
}
else
@ -5150,8 +5131,6 @@ retry:
TORRENT_ASSERT(p->peer_info_struct());
if (!p->is_choked() && !p->peer_info_struct()->optimistically_unchoked)
t->choke_peer(*p);
if (!p->is_choked())
++m_num_unchoked;
}
}
}
@ -6405,7 +6384,7 @@ retry:
s.num_peers = int(m_connections.size());
s.num_dead_peers = int(m_undead_peers.size());
s.num_unchoked = m_num_unchoked;
s.num_unchoked = m_stats_counters[counters::num_peers_up_unchoked_all];
s.allowed_upload_slots = m_allowed_upload_slots;
s.num_torrents = m_torrents.size();
@ -6966,7 +6945,8 @@ retry:
bool session_impl::preemptive_unchoke() const
{
return m_num_unchoked < m_allowed_upload_slots;
return m_stats_counters[counters::num_peers_up_unchoked] < m_allowed_upload_slots
|| m_settings.get_int(settings_pack::unchoke_slots_limit) < 0;
}
void session_impl::update_dht_upload_rate_limit()
@ -7694,6 +7674,7 @@ retry:
if (m_settings.get_int(settings_pack::choking_algorithm) == settings_pack::auto_expand_choker)
TORRENT_ASSERT(m_allowed_upload_slots >= m_settings.get_int(settings_pack::unchoke_slots_limit));
int unchokes = 0;
int unchokes_all = 0;
int num_optimistic = 0;
int disk_queue[2] = {0, 0};
for (connection_map::const_iterator i = m_connections.begin();
@ -7709,8 +7690,17 @@ retry:
peer_connection* p = i->get();
TORRENT_ASSERT(!p->is_disconnecting());
if (p->ignore_unchoke_slots()) continue;
if (!p->is_choked()) ++unchokes;
if (p->ignore_unchoke_slots())
{
if (!p->is_choked()) ++unchokes_all;
continue;
}
if (!p->is_choked())
{
++unchokes;
++unchokes_all;
}
if (p->peer_info_struct()
&& p->peer_info_struct()->optimistically_unchoked)
{
@ -7719,18 +7709,49 @@ retry:
}
}
TORRENT_ASSERT(disk_queue[peer_connection::download_channel] == m_stats_counters[counters::num_peers_down_disk]);
TORRENT_ASSERT(disk_queue[peer_connection::upload_channel] == m_stats_counters[counters::num_peers_up_disk]);
for (std::vector<boost::shared_ptr<peer_connection> >::const_iterator i
= m_undead_peers.begin(); i != m_undead_peers.end(); ++i)
{
peer_connection* p = i->get();
if (p->ignore_unchoke_slots())
{
if (!p->is_choked()) ++unchokes_all;
continue;
}
if (!p->is_choked())
{
++unchokes_all;
++unchokes;
}
if (p->peer_info_struct()
&& p->peer_info_struct()->optimistically_unchoked)
{
++num_optimistic;
TORRENT_ASSERT(!p->is_choked());
}
}
TORRENT_ASSERT(disk_queue[peer_connection::download_channel]
== m_stats_counters[counters::num_peers_down_disk]);
TORRENT_ASSERT(disk_queue[peer_connection::upload_channel]
== m_stats_counters[counters::num_peers_up_disk]);
if (m_settings.get_int(settings_pack::num_optimistic_unchoke_slots))
{
TORRENT_ASSERT(num_optimistic <= m_settings.get_int(settings_pack::num_optimistic_unchoke_slots));
TORRENT_ASSERT(num_optimistic <= m_settings.get_int(
settings_pack::num_optimistic_unchoke_slots));
}
if (m_num_unchoked != unchokes)
{
TORRENT_ASSERT(false);
}
int unchoked_counter_all = m_stats_counters[counters::num_peers_up_unchoked_all];
int unchoked_counter = m_stats_counters[counters::num_peers_up_unchoked];
int unchoked_counter_optimistic
= m_stats_counters[counters::num_peers_up_unchoked_optimistic];
TORRENT_ASSERT_VAL(unchoked_counter_all == unchokes_all, unchokes_all);
TORRENT_ASSERT_VAL(unchoked_counter == unchokes, unchokes);
TORRENT_ASSERT_VAL(unchoked_counter_optimistic == num_optimistic, num_optimistic);
for (torrent_map::const_iterator j
= m_torrents.begin(); j != m_torrents.end(); ++j)
{

View File

@ -106,6 +106,11 @@ namespace libtorrent
// the number of peer connections for each kind of socket.
// these counts include half-open (connecting) peers.
// ``num_peers_up_unchoked_all`` is the total number of unchoked peers,
// whereas ``num_peers_up_unchoked`` only are unchoked peers that count
// against the limit (i.e. excluding peers that are unchoked because the
// limit doesn't apply to them). ``num_peers_up_unchoked_optimistic`` is
// the number of optimistically unchoked peers.
METRIC(peer, num_tcp_peers)
METRIC(peer, num_socks5_peers)
METRIC(peer, num_http_proxy_peers)
@ -120,6 +125,8 @@ namespace libtorrent
METRIC(peer, num_peers_connected)
METRIC(peer, num_peers_up_interested)
METRIC(peer, num_peers_down_interested)
METRIC(peer, num_peers_up_unchoked_all)
METRIC(peer, num_peers_up_unchoked_optimistic)
METRIC(peer, num_peers_up_unchoked)
METRIC(peer, num_peers_down_unchoked)
METRIC(peer, num_peers_up_requests)

View File

@ -5616,6 +5616,18 @@ namespace libtorrent
return true;
}
void torrent::trigger_unchoke()
{
m_ses.get_io_service().dispatch(boost::bind(
&aux::session_interface::trigger_unchoke, boost::ref(m_ses)));
}
void torrent::trigger_optimistic_unchoke()
{
m_ses.get_io_service().dispatch(boost::bind(
&aux::session_interface::trigger_optimistic_unchoke, boost::ref(m_ses)));
}
void torrent::cancel_block(piece_block block)
{
INVARIANT_CHECK;
@ -5763,14 +5775,14 @@ namespace libtorrent
if (!p->is_choked() && !p->ignore_unchoke_slots())
{
--m_num_uploads;
m_ses.trigger_unchoke();
trigger_unchoke();
}
torrent_peer* pp = p->peer_info_struct();
if (pp)
{
if (pp->optimistically_unchoked)
m_ses.trigger_optimistic_unchoke();
trigger_optimistic_unchoke();
TORRENT_ASSERT(pp->prev_amount_upload == 0);
TORRENT_ASSERT(pp->prev_amount_download == 0);
@ -9036,8 +9048,7 @@ namespace libtorrent
// remove any un-sent requests from the queue
p->clear_request_queue();
// don't accept new requests from the peer
if (!p->is_choked() && !p->ignore_unchoke_slots())
m_ses.choke_peer(*p);
p->choke_this_peer();
continue;
}