optimize the optimistic unchoke logic. extend the API for extensions to be able to affect the order of optimistic unchokes

This commit is contained in:
arvidn 2016-01-23 13:37:50 -05:00 committed by arvidn
parent fb790aec1a
commit f2ce2284da
7 changed files with 158 additions and 82 deletions

View File

@ -1169,15 +1169,19 @@ namespace libtorrent
typedef std::list<boost::shared_ptr<plugin> > ses_extension_list_t;
ses_extension_list_t m_ses_extensions;
// std::string could be used for the query names if only all common implementations used SSO
// *glares at gcc*
struct extention_dht_query
// the union of all session extensions' implemented_features(). This is
// used to exclude callbacks to the session extensions.
boost::uint32_t m_session_extension_features;
// std::string could be used for the query names if only all common
// implementations used SSO *glares at gcc*
struct extension_dht_query
{
boost::uint8_t query_len;
boost::array<char, max_dht_query_length> query;
dht_extension_handler_t handler;
};
typedef std::vector<extention_dht_query> m_extension_dht_queries_t;
typedef std::vector<extension_dht_query> m_extension_dht_queries_t;
m_extension_dht_queries_t m_extension_dht_queries;
#endif

View File

@ -208,6 +208,27 @@ namespace libtorrent
// hidden
virtual ~plugin() {}
// these are flags that can be returned by implemented_features()
// indicating which callbacks this plugin is interested in
enum feature_flags_t
{
// include this bit if your plugin needs to alter the order of the
// optimistic unchoke of peers. i.e. have the on_optimistic_unchoke()
// callback be called.
optimistic_unchoke_feature = 1,
// include this bit if your plugin needs to have on_tick() called
tick_feature = 2
};
// This function is expected to return a bitmask indicating which features
// this plugin implements. Some callbacks on this object may not be called
// unless the corresponding feature flag is returned here. Note that
// callbacks may still be called even if the corresponding feature is not
// specified in the return value here. See feature_flags_t for possible
// flags to return.
virtual boost::uint32_t implemented_features() { return 0; }
// this is called by the session every time a new torrent is added.
// The ``torrent*`` points to the internal torrent object created
// for the new torrent. The ``void*`` is the userdata pointer as
@ -237,12 +258,13 @@ namespace libtorrent
// called once per second
virtual void on_tick() {}
// called when choosing peers to optimistically unchoke
// peer's will be unchoked in the order they appear in the given
// vector which is initially sorted by when they were last
// optimistically unchoked.
// if the plugin returns true then the ordering provided will be
// used and no other plugin will be allowed to change it.
// called when choosing peers to optimisticallly unchoke. peer's will be
// unchoked in the order they appear in the given vector. if
// the plugin returns true then the ordering provided will be used and no
// other plugin will be allowed to change it. If your plugin expects this
// to be called, make sure to include the flag
// ``optimistic_unchoke_feature`` in the return value from
// implemented_features().
virtual bool on_optimistic_unchoke(std::vector<peer_connection_handle>& /* peers */)
{ return false; }

View File

@ -50,6 +50,7 @@ struct crypto_plugin;
typedef boost::system::error_code error_code;
// hidden
struct TORRENT_EXPORT peer_connection_handle
{
peer_connection_handle(boost::weak_ptr<peer_connection> impl)
@ -113,11 +114,11 @@ struct TORRENT_EXPORT peer_connection_handle
time_point time_of_last_unchoke() const;
bool operator==(peer_connection_handle const& o) const
{ return m_connection.lock() == o.m_connection.lock(); }
{ return !(m_connection < o.m_connection) && !(o.m_connection < m_connection); }
bool operator!=(peer_connection_handle const& o) const
{ return m_connection.lock() != o.m_connection.lock(); }
{ return m_connection < o.m_connection || o.m_connection < m_connection; }
bool operator<(peer_connection_handle const& o) const
{ return m_connection.lock() < o.m_connection.lock(); }
{ return m_connection < o.m_connection; }
boost::shared_ptr<peer_connection> native_handle() const
{

View File

@ -219,7 +219,7 @@ namespace libtorrent { namespace
m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size);
}
void on_piece_pass(int)
void on_piece_pass(int) TORRENT_OVERRIDE
{
// if we became a seed, copy the metadata from
// the torrent before it is deallocated

View File

@ -430,6 +430,9 @@ namespace aux {
, m_download_connect_attempts(0)
, m_next_scrape_torrent(0)
, m_tick_residual(0)
#ifndef TORRENT_DISABLE_EXTENSIONS
, m_session_extension_features(0)
#endif
, m_deferred_submit_disk_jobs(false)
, m_pending_auto_manage(false)
, m_need_auto_manage(false)
@ -921,6 +924,7 @@ namespace aux {
boost::shared_ptr<plugin> p(new session_plugin_wrapper(ext));
m_ses_extensions.push_back(p);
m_session_extension_features |= p->implemented_features();
}
void session_impl::add_ses_extension(boost::shared_ptr<plugin> ext)
@ -931,6 +935,7 @@ namespace aux {
m_ses_extensions.push_back(ext);
m_alerts.add_extension(ext);
ext->added(session_handle(this));
m_session_extension_features |= ext->implemented_features();
// get any DHT queries the plugin would like to handle
// and record them in m_extension_dht_queries for lookup
@ -942,7 +947,7 @@ namespace aux {
{
TORRENT_ASSERT(e->first.size() <= max_dht_query_length);
if (e->first.size() > max_dht_query_length) continue;
extention_dht_query registration;
extension_dht_query registration;
registration.query_len = e->first.size();
std::copy(e->first.begin(), e->first.end(), registration.query.begin());
registration.handler = e->second;
@ -3047,12 +3052,15 @@ retry:
}
#ifndef TORRENT_DISABLE_EXTENSIONS
for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin()
, end(m_ses_extensions.end()); i != end; ++i)
if (m_session_extension_features & plugin::tick_feature)
{
TORRENT_TRY {
(*i)->on_tick();
} TORRENT_CATCH(std::exception&) {}
for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin()
, end(m_ses_extensions.end()); i != end; ++i)
{
TORRENT_TRY {
(*i)->on_tick();
} TORRENT_CATCH(std::exception&) {}
}
}
#endif
@ -3764,24 +3772,32 @@ retry:
}
namespace {
struct last_optimistic_unchoke_cmp
bool last_optimistic_unchoke_cmp(torrent_peer const* const l
, torrent_peer const* const r)
{
bool operator()(peer_connection_handle const& l
, peer_connection_handle const& r)
{
return l.native_handle()->peer_info_struct()->last_optimistically_unchoked
< r.native_handle()->peer_info_struct()->last_optimistically_unchoked;
}
};
return l->last_optimistically_unchoked
< r->last_optimistically_unchoked;
}
}
void session_impl::recalculate_optimistic_unchoke_slots()
{
INVARIANT_CHECK;
TORRENT_ASSERT(is_single_thread());
if (m_stats_counters[counters::num_unchoke_slots] == 0) return;
std::vector<peer_connection_handle> opt_unchoke;
std::vector<torrent_peer*> opt_unchoke;
// collect the currently optimistically unchoked peers here, so we can
// choke them when we've found new optimistic unchoke candidates.
std::vector<torrent_peer*> prev_opt_unchoke;
// TODO: 3 it would probably make sense to have a separate list of peers
// that are eligible for optimistic unchoke, similar to the torrents
// perhaps this could even iterate over the pool allocators of
// torrent_peer objects. It could probably be done in a single pass and
// collect the n best candidates
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
@ -3790,90 +3806,123 @@ retry:
torrent_peer* pi = p->peer_info_struct();
if (!pi) continue;
if (pi->web_seed) continue;
torrent* t = p->associated_torrent().lock().get();
if (!t) continue;
if (t->is_paused()) continue;
if (pi->optimistically_unchoked)
{
TORRENT_ASSERT(!p->is_choked());
opt_unchoke.push_back(peer_connection_handle(*i));
prev_opt_unchoke.push_back(pi);
}
torrent* t = p->associated_torrent().lock().get();
if (!t) continue;
// TODO: 3 peers should know whether their torrent is paused or not,
// instead of having to ask it over and over again
if (t->is_paused()) continue;
if (!p->is_connecting()
&& !p->is_disconnecting()
&& p->is_peer_interested()
&& t->free_upload_slots()
&& p->is_choked()
&& (p->is_choked() || pi->optimistically_unchoked)
&& !p->ignore_unchoke_slots()
&& t->valid_metadata())
{
opt_unchoke.push_back(peer_connection_handle(*i));
opt_unchoke.push_back(pi);
}
}
// find the peers that has been waiting the longest to be optimistically
// unchoked
// avoid having a bias towards peers that happen to be sorted first
std::random_shuffle(opt_unchoke.begin(), opt_unchoke.end(), randint);
// sort all candidates based on when they were last optimistically
// unchoked.
std::sort(opt_unchoke.begin(), opt_unchoke.end(), last_optimistic_unchoke_cmp());
#ifndef TORRENT_DISABLE_EXTENSIONS
for (ses_extension_list_t::iterator i = m_ses_extensions.begin()
, end(m_ses_extensions.end()); i != end; ++i)
{
if ((*i)->on_optimistic_unchoke(opt_unchoke))
break;
}
#endif
int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots];
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5);
if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke =
int(opt_unchoke.size());
// find the n best optimistic unchoke candidates
std::partial_sort(opt_unchoke.begin()
, opt_unchoke.begin() + num_opt_unchoke
, opt_unchoke.end(), &last_optimistic_unchoke_cmp);
#ifndef TORRENT_DISABLE_EXTENSIONS
if (m_session_extension_features & plugin::optimistic_unchoke_feature)
{
// if there is an extension that wants to reorder the optimistic
// unchoke peers, first convert the vector into one containing
// peer_connection_handles, since that's the exported API
std::vector<peer_connection_handle> peers;
peers.reserve(opt_unchoke.size());
for (std::vector<torrent_peer*>::iterator i = opt_unchoke.begin()
, end(opt_unchoke.end()); i != end; ++i)
{
peers.push_back(peer_connection_handle(static_cast<peer_connection*>((*i)->connection)->self()));
}
for (ses_extension_list_t::iterator i = m_ses_extensions.begin()
, end(m_ses_extensions.end()); i != end; ++i)
{
if ((*i)->on_optimistic_unchoke(peers))
break;
}
// then convert back to the internal torrent_peer pointers
opt_unchoke.clear();
for (std::vector<peer_connection_handle>::iterator i = peers.begin()
, end(peers.end()); i != end; ++i)
{
opt_unchoke.push_back(i->native_handle()->peer_info_struct());
}
}
#endif
// unchoke the first num_opt_unchoke peers in the candidate set
// and make sure that the others are choked
for (std::vector<peer_connection_handle>::iterator i = opt_unchoke.begin()
, end(opt_unchoke.end()); i != end; ++i)
std::vector<torrent_peer*>::iterator opt_unchoke_end = opt_unchoke.begin()
+ num_opt_unchoke;
for (std::vector<torrent_peer*>::iterator i = opt_unchoke.begin();
i != opt_unchoke_end; ++i)
{
torrent_peer* pi = i->native_handle()->peer_info_struct();
if (num_opt_unchoke > 0)
torrent_peer* pi = *i;
if (pi->optimistically_unchoked)
{
--num_opt_unchoke;
if (!pi->optimistically_unchoked)
{
peer_connection* p = static_cast<peer_connection*>(pi->connection);
torrent* t = p->associated_torrent().lock().get();
bool ret = t->unchoke_peer(*p, true);
if (ret)
{
pi->optimistically_unchoked = true;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
pi->last_optimistically_unchoked = boost::uint16_t(session_time());
}
else
{
// we failed to unchoke it, increment the count again
++num_opt_unchoke;
}
}
TORRENT_ASSERT(!pi->connection->is_choked());
// remove this peer from prev_opt_unchoke, to prevent us from
// choking it later. This peer gets another round of optimistic
// unchoke
std::vector<torrent_peer*>::iterator existing =
std::find(prev_opt_unchoke.begin(), prev_opt_unchoke.end(), pi);
TORRENT_ASSERT(existing != prev_opt_unchoke.end());
prev_opt_unchoke.erase(existing);
}
else
{
if (pi->optimistically_unchoked)
peer_connection* p = static_cast<peer_connection*>(pi->connection);
TORRENT_ASSERT(p->is_choked());
boost::shared_ptr<torrent> t = p->associated_torrent().lock();
bool ret = t->unchoke_peer(*p, true);
TORRENT_ASSERT(ret);
if (ret)
{
peer_connection* p = static_cast<peer_connection*>(pi->connection);
torrent* t = p->associated_torrent().lock().get();
pi->optimistically_unchoked = false;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->choke_peer(*p);
pi->optimistically_unchoked = true;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
pi->last_optimistically_unchoked = boost::uint16_t(session_time());
}
}
}
// now, choke all the previous optimistically unchoked peers
for (std::vector<torrent_peer*>::iterator i = prev_opt_unchoke.begin()
, end(prev_opt_unchoke.end()); i != end; ++i)
{
torrent_peer* pi = *i;
TORRENT_ASSERT(pi->optimistically_unchoked);
peer_connection* p = static_cast<peer_connection*>(pi->connection);
boost::shared_ptr<torrent> t = p->associated_torrent().lock();
pi->optimistically_unchoked = false;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->choke_peer(*p);
}
}
void session_impl::try_connect_more_peers()

View File

@ -284,7 +284,7 @@ namespace libtorrent
return "";
}
#endif
libtorrent::address torrent_peer::address() const
{
#if TORRENT_USE_IPV6

View File

@ -167,7 +167,7 @@ namespace libtorrent { namespace
m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size);
}
*/
void on_piece_pass(int)
void on_piece_pass(int) TORRENT_OVERRIDE
{
// if we became a seed, copy the metadata from
// the torrent before it is deallocated