introduce global connection priority for improved swarm performance

This commit is contained in:
Arvid Norberg 2012-12-31 06:54:54 +00:00
parent 5cd4296914
commit ac5a9e9882
17 changed files with 356 additions and 46 deletions

View File

@ -297,6 +297,7 @@ if(build_tests)
test_piece_picker
test_fast_extension
test_pe_crypto
test_peer_priority
test_bencoding
test_bdecode_performance
test_primitives

View File

@ -1,3 +1,4 @@
* introduce global connection priority for improved swarm performance
* make files deleted alert non-discardable
* make built-in sha functions not conflict with libcrypto
* improve web seed hash failure case

View File

@ -1151,6 +1151,7 @@ namespace libtorrent
#ifdef TORRENT_UPNP_LOGGING
std::ofstream m_upnp_log;
#endif
// TODO: factor the IP voting out to its own type, maybe templated by the address type. Have one IPv4 vote and a separate IPv6 vote. Maybe even better, have one per local interface sockets can be bound to
struct external_ip_t
{
external_ip_t(): sources(0), num_votes(0) {}

View File

@ -188,6 +188,19 @@ namespace libtorrent
m_peer_info = pi;
}
// this is called when the peer object is created, in case
// it was let in by the connections limit slack. This means
// the peer needs to, as soon as the handshake is done, either
// disconnect itself or another peer.
void peer_exceeds_limit()
{ m_exceeded_limit = true; }
// this is called if this peer causes another peer
// to be disconnected, in which case it has fulfilled
// its requirement.
void peer_disconnected_other()
{ m_exceeded_limit = false; }
policy::peer* peer_info_struct() const
{ return m_peer_info; }
@ -258,6 +271,8 @@ namespace libtorrent
m_priority = p;
}
boost::uint32_t peer_rank() const;
void fast_reconnect(bool r);
bool fast_reconnect() const { return m_fast_reconnect; }
@ -1172,6 +1187,14 @@ namespace libtorrent
// otherwise.
bool m_has_metadata:1;
// this is set to true if this peer was accepted exceeding
// the connection limit. It means it has to disconnect
// itself, or some other peer, as soon as it's completed
// the handshake. We need to wait for the handshake in
// order to know which torrent it belongs to, to know which
// other peers to compare it to.
bool m_exceeded_limit:1;
template <std::size_t Size>
struct handler_storage
{

View File

@ -101,6 +101,11 @@ namespace libtorrent
free_upload_amount = 4 * 16 * 1024
};
// calculate the priority of a peer based on its address. One of the
// endpoint should be our own. The priority is symmetric, so it doesn't
// matter which is which
TORRENT_EXTRA_EXPORT boost::uint32_t peer_priority(tcp::endpoint e1, tcp::endpoint e2);
void request_a_block(torrent& t, peer_connection& c);
class TORRENT_EXTRA_EXPORT policy
@ -172,6 +177,8 @@ namespace libtorrent
size_type total_download() const;
size_type total_upload() const;
boost::uint32_t rank(tcp::endpoint const& external) const;
libtorrent::address address() const;
char const* dest() const;
@ -198,6 +205,11 @@ namespace libtorrent
// will refer to a valid peer_connection
peer_connection* connection;
// as computed by hashing our IP with the remote
// IP of this peer
// calculated lazily
mutable boost::uint32_t peer_rank;
#ifndef TORRENT_DISABLE_GEO_IP
#ifdef TORRENT_DEBUG
// only used in debug mode to assert that
@ -430,7 +442,7 @@ namespace libtorrent
bool compare_peer_erase(policy::peer const& lhs, policy::peer const& rhs) const;
bool compare_peer(policy::peer const& lhs, policy::peer const& rhs
, address const& external_ip) const;
, tcp::endpoint const& external_ip) const;
iterator find_connect_candidate(int session_time);

View File

@ -802,6 +802,10 @@ namespace libtorrent
// the max number of connections in the session
int connections_limit;
// the number of extra incoming connections allowed
// temporarily, in order to support replacing peers
int connections_slack;
// target delay, milliseconds
int utp_target_delay;

View File

@ -33,6 +33,8 @@ POSSIBILITY OF SUCH DAMAGE.
#ifndef TORRENT_STRING_UTIL_HPP_INCLUDED
#define TORRENT_STRING_UTIL_HPP_INCLUDED
#include "libtorrent/config.hpp"
namespace libtorrent
{
TORRENT_EXTRA_EXPORT bool is_alpha(char c);

View File

@ -136,6 +136,8 @@ namespace libtorrent
void notify_extension_add_peer(tcp::endpoint const& ip, int src, int flags);
#endif
peer_connection* find_lowest_ranking_peer() const;
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
bool has_peer(peer_connection* p) const
{ return m_connections.find(p) != m_connections.end(); }

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector>
#include <boost/limits.hpp>
#include <boost/bind.hpp>
#include <boost/cstdint.hpp>
#include <stdarg.h> // for va_start, va_end
#include "libtorrent/peer_connection.hpp"
@ -193,6 +194,7 @@ namespace libtorrent
, m_ignore_stats(false)
, m_corked(false)
, m_has_metadata(true)
, m_exceeded_limit(false)
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
, m_in_constructor(true)
, m_disconnect_started(false)
@ -1073,6 +1075,26 @@ namespace libtorrent
if (m_disconnecting) return;
m_torrent = wpt;
if (m_exceeded_limit)
{
// find a peer in some torrent (presumably the one with most peers)
// and disconnect the lowest ranking peer
aux::session_impl::torrent_map::iterator i = std::max_element(m_ses.m_torrents.begin(), m_ses.m_torrents.end()
, boost::bind(&torrent::num_peers, boost::bind(&session_impl::torrent_map::value_type::second, _1))
< boost::bind(&torrent::num_peers, boost::bind(&session_impl::torrent_map::value_type::second, _2)));
TORRENT_ASSERT(i != m_ses.m_torrents.end());
if (i->second->num_peers() <= t->num_peers())
{
disconnect(errors::too_many_connections);
return;
}
// find the lowest ranking peer and disconnect that
peer_connection* p = i->second->find_lowest_ranking_peer();
p->disconnect(errors::too_many_connections);
peer_disconnected_other();
}
TORRENT_ASSERT(!m_torrent.expired());
// if the torrent isn't ready to accept
@ -1090,6 +1112,11 @@ namespace libtorrent
TORRENT_ASSERT(!m_torrent.expired());
}
boost::uint32_t peer_connection::peer_rank() const
{
return m_peer_info->rank(tcp::endpoint(m_ses.external_address(), m_ses.listen_port()));
}
// message handlers
// -----------------------------

View File

@ -115,6 +115,97 @@ namespace
namespace libtorrent
{
void apply_mask(boost::uint8_t* b, boost::uint8_t const* mask, int size)
{
for (int i = 0; i < size; ++i)
{
*b &= *mask;
++b;
++mask;
}
}
// 1. if the IP addresses are identical, hash the ports in 16 bit network-order
// binary representation, ordered lowest first.
// 2. if the IPs are in the same /24, hash the IPs ordered, lowest first.
// 3. if the IPs are in the ame /16, mask the IPs by 0xffffff55, hash them
// ordered, lowest first.
// 4. if IPs are not in the same /16, mask the IPs by 0xffff5555, hash them
// ordered, lowest first.
//
// * for IPv6 peers, just use the first 64 bits and widen the masks.
// like this: 0xffff5555 -> 0xffffffff55555555
// the lower 64 bits are always unmasked
//
// * for IPv6 addresses, compare /32 and /48 instead of /16 and /24
//
// * the two IP addresses that are used to calculate the rank must
// always be of the same address family
//
// * all IP addresses are in network byte order when hashed
boost::uint32_t peer_priority(tcp::endpoint e1, tcp::endpoint e2)
{
TORRENT_ASSERT(e1.address().is_v4() == e2.address().is_v4());
using std::swap;
hasher h;
if (e1.address() == e2.address())
{
if (e1.port() > e2.port())
swap(e1, e2);
boost::uint16_t p[2];
p[0] = htons(e1.port());
p[1] = htons(e2.port());
h.update((char const*)&p[0], 4);
}
#if TORRENT_USE_IPV6
else if (e1.address().is_v6())
{
const static boost::uint8_t v6mask[][8] = {
{ 0xff, 0xff, 0xff, 0xff, 0x55, 0x55, 0x55, 0x55 },
{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x55, 0x55 },
{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }
};
if (e1 > e2) swap(e1, e2);
address_v6::bytes_type b1 = e1.address().to_v6().to_bytes();
address_v6::bytes_type b2 = e2.address().to_v6().to_bytes();
int mask = memcmp(&b1[0], &b2[0], 4) ? 0
: memcmp(&b1[0], &b2[0], 6) ? 1 : 2;
apply_mask(&b1[0], v6mask[mask], 8);
apply_mask(&b2[0], v6mask[mask], 8);
h.update((char const*)&b1[0], b1.size());
h.update((char const*)&b2[0], b2.size());
}
#endif
else
{
const static boost::uint8_t v4mask[][4] = {
{ 0xff, 0xff, 0x55, 0x55 },
{ 0xff, 0xff, 0xff, 0x55 },
{ 0xff, 0xff, 0xff, 0xff }
};
if (e1 > e2) swap(e1, e2);
address_v4::bytes_type b1 = e1.address().to_v4().to_bytes();
address_v4::bytes_type b2 = e2.address().to_v4().to_bytes();
int mask = memcmp(&b1[0], &b2[0], 2) ? 0
: memcmp(&b1[0], &b2[0], 3) ? 1 : 2;
apply_mask(&b1[0], v4mask[mask], 4);
apply_mask(&b2[0], v4mask[mask], 4);
h.update((char const*)&b1[0], b1.size());
h.update((char const*)&b2[0], b2.size());
}
boost::uint32_t ret;
sha1_hash digest = h.final();
memcpy(&ret, &digest[0], 4);
return ntohl(ret);
}
// returns the rank of a peer's source. We have an affinity
// to connecting to peers with higher rank. This is to avoid
// problems when our peer list is diluted by stale peers from
@ -651,17 +742,7 @@ namespace libtorrent
TORRENT_ASSERT(m_finished == m_torrent->is_finished());
int min_reconnect_time = m_torrent->settings().min_reconnect_time;
address external_ip = m_torrent->session().external_address();
// don't bias any particular peers when seeding
if (m_finished || external_ip == address())
{
// set external_ip to a random value, to
// radomize which peers we prefer
address_v4::bytes_type bytes;
std::generate(bytes.begin(), bytes.end(), &random);
external_ip = address_v4(bytes);
}
tcp::endpoint external_ip(m_torrent->session().external_address(), m_torrent->session().listen_port());
if (m_round_robin >= int(m_peers.size())) m_round_robin = 0;
@ -750,8 +831,9 @@ namespace libtorrent
(*m_torrent->session().m_logger) << time_now_string()
<< " *** FOUND CONNECTION CANDIDATE ["
" ip: " << m_peers[candidate]->ip() <<
" d: " << cidr_distance(external_ip, m_peers[candidate]->address()) <<
" external: " << external_ip <<
" d: " << cidr_distance(external_ip.address(), m_peers[candidate]->address()) <<
" rank: " << m_peers[candidate]->rank(external_ip) <<
" external: " << external_ip.address() <<
" t: " << (session_time - m_peers[candidate]->last_connected) <<
" ]\n";
}
@ -1778,6 +1860,7 @@ namespace libtorrent
: prev_amount_upload(0)
, prev_amount_download(0)
, connection(0)
, peer_rank(0)
#ifndef TORRENT_DISABLE_GEO_IP
, inet_as(0)
#endif
@ -1819,6 +1902,16 @@ namespace libtorrent
TORRENT_ASSERT((src & 0xff) == src);
}
// TOOD: pass in both an IPv6 and IPv4 address here
boost::uint32_t policy::peer::rank(tcp::endpoint const& external) const
{
//TODO: really, keep track of one external IP per address family
//TODO: how do we deal with our external address changing?
if (peer_rank == 0)
peer_rank = peer_priority(external, tcp::endpoint(this->address(), this->port));
return peer_rank;
}
size_type policy::peer::total_download() const
{
if (connection != 0)
@ -1870,7 +1963,7 @@ namespace libtorrent
// this returns true if lhs is a better connect candidate than rhs
bool policy::compare_peer(policy::peer const& lhs, policy::peer const& rhs
, address const& external_ip) const
, tcp::endpoint const& external_ip) const
{
// prefer peers with lower failcount
if (lhs.failcount != rhs.failcount)
@ -1897,9 +1990,9 @@ namespace libtorrent
if (lhs_as != rhs_as) return lhs_as > rhs_as;
}
#endif
int lhs_distance = cidr_distance(external_ip, lhs.address());
int rhs_distance = cidr_distance(external_ip, rhs.address());
if (lhs_distance < rhs_distance) return true;
boost::uint32_t lhs_peer_rank = lhs.rank(external_ip);
boost::uint32_t rhs_peer_rank = rhs.rank(external_ip);
if (lhs_peer_rank > rhs_peer_rank) return true;
return false;
}
}

View File

@ -1259,6 +1259,7 @@ namespace libtorrent
, unchoke_slots_limit(8)
, half_open_limit(0)
, connections_limit(200)
, connections_slack(10)
, utp_target_delay(100) // milliseconds
, utp_gain_factor(1500) // bytes per rtt
, utp_min_timeout(500) // milliseconds

View File

@ -2765,13 +2765,23 @@ retry:
return;
}
// check if we have any active torrents
// if we don't reject the connection
if (m_torrents.empty())
{
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" There are no torrents, disconnect");
#endif
return;
}
// don't allow more connections than the max setting
bool reject = false;
if (m_settings.ignore_limits_on_local_network && is_local(endp.address()))
reject = m_settings.connections_limit < INT_MAX / 12
&& num_connections() >= m_settings.connections_limit * 12 / 10;
else
reject = num_connections() >= m_settings.connections_limit;
reject = num_connections() >= m_settings.connections_limit + m_settings.connections_slack;
if (reject)
{
@ -2782,23 +2792,13 @@ retry:
, error_code(errors::too_many_connections, get_libtorrent_category())));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << "number of connections limit exceeded (conns: "
<< num_connections() << ", limit: " << m_settings.connections_limit
<< "), connection rejected\n";
session_log("number of connections limit exceeded (conns: %d"
", limit: %d slack: %d), connection rejected\n"
, num_connections(), m_settings.connections_limit, m_settings.connections_slack);
#endif
return;
}
// check if we have any active torrents
// if we don't reject the connection
if (m_torrents.empty())
{
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" There are no torrents, disconnect");
#endif
return;
}
// if we don't have any active torrents, there's no
// point in accepting this connection. If, however,
// the setting to start up queued torrents when they
@ -2835,6 +2835,12 @@ retry:
if (!c->is_disconnecting())
{
// in case we've exceeded the limit, let this peer know that
// as soon as it's received the handshake, it needs to either
// disconnect or pick another peer to disconnect
if (num_connections() >= m_settings.connections_limit)
c->peer_exceeds_limit();
m_connections.insert(c);
c->start();
// update the next disk peer round-robin cursor

View File

@ -1492,6 +1492,17 @@ namespace libtorrent
#endif // TORRENT_OPENSSL
peer_connection* torrent::find_lowest_ranking_peer() const
{
// TODO: filter out peers that are disconnecting
peer_iterator lowest_rank = std::min_element(begin(), end()
, boost::bind(&peer_connection::peer_rank, _1)
< boost::bind(&peer_connection::peer_rank, _2));
if (lowest_rank == end()) return NULL;
return *lowest_rank;
}
// this may not be called from a constructor because of the call to
// shared_from_this()
void torrent::init()
@ -5885,22 +5896,48 @@ namespace libtorrent
// connection attempts that haven't completed yet,
// disconnect one of them and let this incoming
// connection through.
if (m_num_connecting < m_max_connections / 10)
if (m_num_connecting > m_max_connections / 10)
{
p->disconnect(errors::too_many_connections);
return false;
}
// find one of the connecting peers and disconnect it
// TODO: ideally, we would disconnect the oldest connection
// i.e. the one that has waited the longest to connect.
// find any peer that's connecting (i.e. a half-open TCP connection)
// that's also not disconnecting
std::set<peer_connection*>::iterator i = std::find_if(begin(), end()
, boost::bind(&peer_connection::is_connecting, _1)
&& !boost::bind(&peer_connection::is_disconnecting, _1));
// find one of the connecting peers and disconnect it
// TODO: ideally, we would disconnect the oldest connection
// i.e. the one that has waited the longest to connect.
for (std::set<peer_connection*>::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
if (i == end())
{
// this seems odd, but we might as well handle it
p->disconnect(errors::too_many_connections);
return false;
}
(*i)->disconnect(errors::too_many_connections);
// if this peer was let in via connections slack,
// it has done its duty of causing the disconnection
// of another peer
p->peer_disconnected_other();
}
else
{
peer_connection* peer = *i;
if (!peer->is_connecting()) continue;
peer->disconnect(errors::too_many_connections);
break;
// now, find the lowest rank peer and disconnect that
// if it's lower rank than the incoming connection
peer_connection* peer = find_lowest_ranking_peer();
// TODO: if peer is a really good peer, maybe we shouldn't disconnect it
if (peer && peer->peer_rank() < p->peer_rank())
{
peer->disconnect(errors::too_many_connections);
p->peer_disconnected_other();
}
else
{
p->disconnect(errors::too_many_connections);
return false;
}
}
}

View File

@ -39,6 +39,7 @@ project
;
test-suite libtorrent :
[ run test_peer_priority.cpp ]
[ run test_file.cpp ]
[ run test_threads.cpp ]
[ run test_rss.cpp ]

View File

@ -14,6 +14,7 @@ test_programs = \
test_metadata_extension \
test_natpmp \
test_pe_crypto \
test_peer_priority \
test_pex \
test_piece_picker \
test_primitives \
@ -56,6 +57,7 @@ test_ip_filter_SOURCES = test_ip_filter.cpp
test_lsd_SOURCES = test_lsd.cpp
test_metadata_extension_SOURCES = test_metadata_extension.cpp
test_natpmp_SOURCES = test_natpmp.cpp
test_peer_priority_SOURCES = test_peer_priority.cpp
test_pe_crypto_SOURCES = test_pe_crypto.cpp
test_pex_SOURCES = test_pex.cpp
test_piece_picker_SOURCES = test_piece_picker.cpp

View File

@ -0,0 +1,97 @@
/*
Copyright (c) 2012, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/policy.hpp"
#include "libtorrent/hasher.hpp"
#include "test.hpp"
using namespace libtorrent;
boost::uint32_t hash_buffer(char const* buf, int len)
{
hasher h;
h.update(buf, len);
sha1_hash digest = h.final();
boost::uint32_t ret;
memcpy(&ret, &digest[0], 4);
return ntohl(ret);
}
int test_main()
{
// when the IP is the same, we hash the ports, sorted
boost::uint32_t p = peer_priority(
tcp::endpoint(address::from_string("230.12.123.3"), 0x4d2)
, tcp::endpoint(address::from_string("230.12.123.3"), 0x12c));
TEST_EQUAL(p, hash_buffer("\x01\x2c\x04\xd2", 4));
// when we're in the same /24, we just hash the IPs
p = peer_priority(
tcp::endpoint(address::from_string("230.12.123.1"), 0x4d2)
, tcp::endpoint(address::from_string("230.12.123.3"), 0x12c));
TEST_EQUAL(p, hash_buffer("\xe6\x0c\x7b\x01\xe6\x0c\x7b\x03", 8));
// when we're in the same /16, we just hash the IPs masked by
// 0xffffff55
p = peer_priority(
tcp::endpoint(address::from_string("230.12.23.1"), 0x4d2)
, tcp::endpoint(address::from_string("230.12.123.3"), 0x12c));
TEST_EQUAL(p, hash_buffer("\xe6\x0c\x17\x01\xe6\x0c\x7b\x01", 8));
// when we're in different /16, we just hash the IPs masked by
// 0xffff5555
p = peer_priority(
tcp::endpoint(address::from_string("230.120.23.1"), 0x4d2)
, tcp::endpoint(address::from_string("230.12.123.3"), 0x12c));
TEST_EQUAL(p, hash_buffer("\xe6\x0c\x51\x01\xe6\x78\x15\x01", 8));
// IPv6 has a twice as wide mask, and we only care about the top 64 bits
// when the IPs are the same, just hash the ports
p = peer_priority(
tcp::endpoint(address::from_string("ffff:ffff:ffff:ffff::1"), 0x4d2)
, tcp::endpoint(address::from_string("ffff:ffff:ffff:ffff::1"), 0x12c));
TEST_EQUAL(p, hash_buffer("\x01\x2c\x04\xd2", 4));
// these IPs don't belong to the same /32, so apply the full mask
// 0xffffffff55555555
p = peer_priority(
tcp::endpoint(address::from_string("ffff:ffff:ffff:ffff::1"), 0x4d2)
, tcp::endpoint(address::from_string("ffff:0fff:ffff:ffff::1"), 0x12c));
TEST_EQUAL(p, hash_buffer(
"\xff\xff\x0f\xff\x55\x55\x55\x55\x00\x00\x00\x00\x00\x00\x00\x01"
"\xff\xff\xff\xff\x55\x55\x55\x55\x00\x00\x00\x00\x00\x00\x00\x01", 32));
return 0;
}

View File

@ -1,6 +1,6 @@
/*
Copyright (c) 2008, Arvid Norberg
Copyright (c) 2008-2012, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without