Merge pull request #351 from arvidn/dht-patch

DHT polishing
This commit is contained in:
Arvid Norberg 2016-01-01 17:20:16 -05:00
commit c6b75fa79e
13 changed files with 165 additions and 91 deletions

View File

@ -66,7 +66,7 @@ namespace libtorrent { namespace dht
{
struct dht_tracker;
struct dht_tracker
struct dht_tracker TORRENT_FINAL
: udp_socket_interface
, udp_socket_observer
, boost::enable_shared_from_this<dht_tracker>
@ -74,7 +74,7 @@ namespace libtorrent { namespace dht
dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock
, dht_settings const& settings, counters& cnt
, dht_storage_constructor_type storage_constructor
, entry const* state = 0);
, entry const& state);
virtual ~dht_tracker();
void start(entry const& bootstrap
@ -137,7 +137,7 @@ namespace libtorrent { namespace dht
void connection_timeout(error_code const& e);
void refresh_timeout(error_code const& e);
void tick(error_code const& e);
void refresh_key(error_code const& e);
// implements udp_socket_interface
virtual bool has_quota();
@ -157,8 +157,7 @@ namespace libtorrent { namespace dht
std::vector<char> m_send_buf;
dos_blocker m_blocker;
time_point m_last_new_key;
deadline_timer m_timer;
deadline_timer m_key_refresh_timer;
deadline_timer m_connection_timer;
deadline_timer m_refresh_timer;
dht_settings const& m_settings;

View File

@ -202,7 +202,7 @@ public:
counters& stats_counters() const { return m_counters; }
dht_observer* observer() const { return m_observer; }
protected:
private:
void send_single_refresh(udp::endpoint const& ep, int bucket
, node_id const& id = node_id());
@ -213,7 +213,6 @@ protected:
libtorrent::dht_settings const& m_settings;
private:
typedef libtorrent::mutex mutex_t;
mutex_t m_mutex;

View File

@ -86,6 +86,9 @@ struct routing_table_node
class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable
{
public:
// TODO: 3 to improve memory locality and scanning performance, turn the
// routing table into a single vector with boundaries for the nodes instead.
// Perhaps replacement nodes should be in a separate vector.
typedef std::vector<routing_table_node> table_t;
routing_table(node_id const& id, int bucket_size

View File

@ -231,7 +231,7 @@ namespace libtorrent
}
// returns a bit-wise negated copy of the sha1-hash
sha1_hash operator~()
sha1_hash operator~() const
{
sha1_hash ret;
for (int i = 0; i < number_size; ++i)

View File

@ -65,27 +65,21 @@ using libtorrent::dht::packet_t;
using libtorrent::dht::msg;
using libtorrent::detail::write_endpoint;
enum
{
key_refresh = 5 // generate a new write token key every 5 minutes
};
namespace
{
const int tick_period = 1; // minutes
}
namespace libtorrent { namespace dht
{
void incoming_error(entry& e, char const* msg);
namespace {
node_id extract_node_id(entry const* e)
// generate a new write token key every 5 minutes
time_duration const key_refresh
= duration_cast<time_duration>(minutes(5));
node_id extract_node_id(entry const& e)
{
if (e == 0 || e->type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e->find_key("node-id");
if (nid == 0 || nid->type() != entry::string_t || nid->string().length() != 20)
if (e.type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e.find_key("node-id");
if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20)
return (node_id::min)();
return node_id(nid->string().c_str());
}
@ -99,13 +93,12 @@ namespace libtorrent { namespace dht
, dht_settings const& settings
, counters& cnt
, dht_storage_constructor_type storage_constructor
, entry const* state)
, entry const& state)
: m_counters(cnt)
, m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor)
, m_sock(sock)
, m_log(observer)
, m_last_new_key(clock_type::now() - minutes(int(key_refresh)))
, m_timer(sock.get_io_service())
, m_key_refresh_timer(sock.get_io_service())
, m_connection_timer(sock.get_io_service())
, m_refresh_timer(sock.get_io_service())
, m_settings(settings)
@ -137,8 +130,7 @@ namespace libtorrent { namespace dht
}
error_code ec;
m_timer.expires_from_now(seconds(1), ec);
m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1));
refresh_key(ec);
m_connection_timer.expires_from_now(seconds(1), ec);
m_connection_timer.async_wait(
@ -153,7 +145,7 @@ namespace libtorrent { namespace dht
{
m_abort = true;
error_code ec;
m_timer.cancel(ec);
m_key_refresh_timer.cancel(ec);
m_connection_timer.cancel(ec);
m_refresh_timer.cancel(ec);
m_host_resolver.cancel();
@ -203,29 +195,26 @@ namespace libtorrent { namespace dht
boost::bind(&dht_tracker::refresh_timeout, self(), _1));
}
void dht_tracker::tick(error_code const& e)
void dht_tracker::refresh_key(error_code const& e)
{
if (e || m_abort) return;
error_code ec;
m_timer.expires_from_now(minutes(tick_period), ec);
m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1));
m_key_refresh_timer.expires_from_now(key_refresh, ec);
m_key_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1));
time_point now = clock_type::now();
if (now - minutes(int(key_refresh)) > m_last_new_key)
{
m_last_new_key = now;
m_dht.new_write_key();
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "*** new write key***");
#endif
}
/*
#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM
std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc);
m_dht.print_state(st);
#endif
}
*/
void dht_tracker::get_peers(sha1_hash const& ih
, boost::function<void(std::vector<tcp::endpoint> const&)> f)

View File

@ -77,6 +77,14 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer)
{
address external_address;
if (observer) external_address = observer->external_address();
// if we don't have an observer, don't pretend that external_address is valid
// generating an ID based on 0.0.0.0 would be terrible. random is better
if (!observer || external_address == address())
{
return generate_random_id();
}
if (nid == (node_id::min)() || !verify_id(nid, external_address))
return generate_id(external_address);
@ -584,7 +592,6 @@ struct ping_observer : observer
}
};
void node::tick()
{
// every now and then we refresh our own ID, just to keep

View File

@ -51,6 +51,7 @@ node_id distance(node_id const& n1, node_id const& n2)
{
node_id ret;
node_id::iterator k = ret.begin();
// TODO: 3 the XORing should be done at full words instead of bytes
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, end(n1.end()); i != end; ++i, ++j, ++k)
{
@ -62,6 +63,7 @@ node_id distance(node_id const& n1, node_id const& n2)
// returns true if: distance(n1, ref) < distance(n2, ref)
bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref)
{
// TODO: 3 the XORing should be done at full words instead of bytes
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, k = ref.begin(), end(n1.end()); i != end; ++i, ++j, ++k)
{
@ -77,6 +79,8 @@ bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref)
// useful for finding out which bucket a node belongs to
int distance_exp(node_id const& n1, node_id const& n2)
{
// TODO: 3 the xoring should be done at full words and _builtin_clz() could
// be used as the last step
int byte = node_id::size - 1;
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, end(n1.end()); i != end; ++i, ++j, --byte)
@ -87,7 +91,7 @@ int distance_exp(node_id const& n1, node_id const& n2)
// we have found the first non-zero byte
// return the bit-number of the first bit
// that differs
int bit = byte * 8;
int const bit = byte * 8;
for (int b = 7; b >= 0; --b)
if (t >= (1 << b)) return bit + b;
return bit;

View File

@ -582,11 +582,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
table_t::iterator existing_bucket;
node_entry* existing = find_node(e.ep(), &existing_bucket);
if (!e.pinged() || existing == 0)
if (existing == 0)
{
// the new node is not pinged, or it's not an existing node
// we should ignore it, unless we allow duplicate IPs in our
// routing table
// the node we're trying to add is not a match with an existing node. we
// should ignore it, unless we allow duplicate IPs in our routing
// table. There could be a node with the same IP, but with a different
// port. m_ips just contain IP addresses, whereas the lookup we just
// performed was for full endpoints (address, port).
if (m_settings.restrict_routing_ips)
{
#ifndef TORRENT_DISABLE_LOGGING
@ -598,16 +600,27 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
return failed_to_add;
}
}
else if (existing && existing->id == e.id)
else if (existing->id == e.id)
{
// if the node ID is the same, just update the failcount
// and be done with it
// and be done with it.
existing->timeout_count = 0;
if (e.pinged())
{
existing->update_rtt(e.rtt);
existing->last_queried = e.last_queried;
}
return node_added;
}
else if (existing)
else if (!e.pinged())
{
// this may be a routing table poison attack. If we haven't confirmed
// that this peer actually exist with this new node ID yet, ignore it.
// we definitely don't want to replace the existing entry with this one
if (m_settings.restrict_routing_ips)
return failed_to_add;
}
else
{
TORRENT_ASSERT(existing->id != e.id);
// this is the same IP and port, but with
@ -620,8 +633,8 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
table_t::iterator i = find_bucket(e.id);
bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements;
int bucket_index = std::distance(m_buckets.begin(), i);
int bucket_size_limit = bucket_limit(bucket_index);
int const bucket_index = std::distance(m_buckets.begin(), i);
int const bucket_size_limit = bucket_limit(bucket_index);
bucket_t::iterator j;
@ -723,7 +736,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
// can we split the bucket?
// only nodes that haven't failed can split the bucket, and we can only
// split the last bucket
const bool can_split = (boost::next(i) == m_buckets.end()
bool const can_split = (boost::next(i) == m_buckets.end()
&& m_buckets.size() < 159)
&& e.fail_count() == 0
&& (i == m_buckets.begin() || boost::prior(i)->live_nodes.size() > 1);
@ -874,10 +887,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
#ifndef TORRENT_DISABLE_LOGGING
if (m_log)
{
char hex_id[41];
to_hex(e.id.data(), sha1_hash::size, hex_id);
m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s"
, hex_id, print_address(e.addr()).c_str());
}
#endif
return node_added;
}
@ -931,8 +947,8 @@ void routing_table::split_bucket()
{
INVARIANT_CHECK;
int bucket_index = m_buckets.size()-1;
int bucket_size_limit = bucket_limit(bucket_index);
int const bucket_index = m_buckets.size()-1;
int const bucket_size_limit = bucket_limit(bucket_index);
TORRENT_ASSERT(int(m_buckets.back().live_nodes.size()) >= bucket_size_limit);
// this is the last bucket, and it's full already. Split
@ -946,10 +962,11 @@ void routing_table::split_bucket()
// move any node whose (160 - distane_exp(m_id, id)) >= (i - m_buckets.begin())
// to the new bucket
int new_bucket_size = bucket_limit(bucket_index + 1);
int const new_bucket_size = bucket_limit(bucket_index + 1);
for (bucket_t::iterator j = b.begin(); j != b.end();)
{
if (distance_exp(m_id, j->id) >= 159 - bucket_index)
int const d = distance_exp(m_id, j->id);
if (d >= 159 - bucket_index)
{
++j;
continue;
@ -1048,7 +1065,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
#ifndef TORRENT_DISABLE_LOGGING
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&nid[0]), 20, hex_id);
to_hex(nid.data(), 20, hex_id);
m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d"
, hex_id, print_endpoint(j->ep()).c_str()
, int(j->fail_count())
@ -1069,7 +1086,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
#ifndef TORRENT_DISABLE_LOGGING
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&nid[0]), 20, hex_id);
to_hex(nid.data(), 20, hex_id);
m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d"
, hex_id, print_endpoint(j->ep()).c_str()
, int(j->fail_count())

View File

@ -475,7 +475,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
if (m_sock->send_packet(e, target_addr, 1))
{
m_transactions.insert(std::make_pair(tid,o));
m_transactions.insert(std::make_pair(tid, o));
#if TORRENT_USE_ASSERTS
o->m_was_sent = true;
#endif

View File

@ -5516,7 +5516,7 @@ retry:
, boost::ref(m_udp_socket), boost::cref(m_dht_settings)
, boost::ref(m_stats_counters)
, m_dht_storage_constructor
, &startup_state);
, startup_state);
for (std::vector<udp::endpoint>::iterator i = m_dht_router_nodes.begin()
, end(m_dht_router_nodes.end()); i != end; ++i)
@ -6731,12 +6731,17 @@ retry:
// since we have a new external IP now, we need to
// restart the DHT with a new node ID
#ifndef TORRENT_DISABLE_DHT
// TODO: 1 we only need to do this if our global IPv4 address has changed
// since the DHT (currently) only supports IPv4. Since restarting the DHT
// is kind of expensive, it would be nice to not do it unnecessarily
if (m_dht)
{
// TODO: 3 instead of restarting the whole DHT, change the external IP,
// node ID and re-jiggle the routing table in-place. A complete restart
// throws away all outstanding requests, which may be significant
// during bootstrap
entry s = m_dht->state();
int cur_state = 0;
int prev_state = 0;

View File

@ -1194,32 +1194,57 @@ TORRENT_TEST(dht)
// test kademlia functions
// this is a bit too expensive to do under valgrind
#ifndef TORRENT_USE_VALGRIND
for (int i = 0; i < 160; i += 8)
{
for (int j = 0; j < 160; j += 8)
{
node_id a(0);
a[(159-i) / 8] = 1 << (i & 7);
node_id b(0);
b[(159-j) / 8] = 1 << (j & 7);
int dist = distance_exp(a, b);
// distance_exp
TEST_CHECK(dist >= 0 && dist < 160);
TEST_CHECK(dist == ((i == j)?0:(std::max)(i, j)));
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("0000000000000000000000000000000000000000"))
, 159);
for (int k = 0; k < 160; k += 8)
{
node_id c(0);
c[(159-k) / 8] = 1 << (k & 7);
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("7fffffffffffffffffffffffffffffffffffffff"))
, 159);
bool cmp = compare_ref(a, b, c);
TEST_CHECK(cmp == (distance(a, c) < distance(b, c)));
}
}
}
#endif
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("ffffffffffffffffffffffffffffffffffffffff"))
, 0);
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 0);
TEST_EQUAL(distance_exp(
to_hash("8000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 158);
TEST_EQUAL(distance_exp(
to_hash("c000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 157);
TEST_EQUAL(distance_exp(
to_hash("e000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 156);
TEST_EQUAL(distance_exp(
to_hash("f000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 155);
TEST_EQUAL(distance_exp(
to_hash("f8f2340985723049587230495872304958703294"),
to_hash("f743589043r890f023980f90e203980d090c3840"))
, 155);
TEST_EQUAL(distance_exp(
to_hash("ffff740985723049587230495872304958703294"),
to_hash("ffff889043r890f023980f90e203980d090c3840"))
, 159 - 16);
{
// test kademlia routing table
@ -2500,5 +2525,31 @@ TORRENT_TEST(rpc_invalid_error_msg)
TEST_EQUAL(found, true);
}
// test bucket distribution
TORRENT_TEST(node_id_bucket_distribution)
{
int nodes_per_bucket[160] = {0};
dht::node_id reference_id = generate_id(rand_v4());
int const num_samples = 100000;
for (int i = 0; i < num_samples; ++i)
{
dht::node_id nid = generate_id(rand_v4());
int const bucket = 159 - distance_exp(reference_id, nid);
++nodes_per_bucket[bucket];
}
for (int i = 0; i < 25; ++i)
{
printf("%3d ", nodes_per_bucket[i]);
}
printf("\n");
int expected = num_samples / 2;
for (int i = 0; i < 25; ++i)
{
TEST_CHECK(std::abs(nodes_per_bucket[i] - expected) < num_samples / 20);
expected /= 2;
}
}
#endif