Simplify and improve the unit test for distance_exp. Make some immutable variables const in the DHT implementation. Instead of waking up periodically just to check whether it's time to refresh the DHT secret key, set the timer to wake up only when the key needs to be refreshed. If we don't have a DHT observer (to ask for our external IP), or if we don't know our external IP, don't generate a node ID based on 0.0.0.0; generate a random ID instead. Simplify and improve the node replacement logic in the routing table a little.

This commit is contained in:
arvidn 2016-01-01 09:21:07 -05:00
parent fb8d40a50c
commit 01e6b93854
13 changed files with 162 additions and 88 deletions

View File

@ -66,7 +66,7 @@ namespace libtorrent { namespace dht
{
struct dht_tracker;
struct dht_tracker
struct dht_tracker TORRENT_FINAL
: udp_socket_interface
, udp_socket_observer
, boost::enable_shared_from_this<dht_tracker>
@ -74,7 +74,7 @@ namespace libtorrent { namespace dht
dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock
, dht_settings const& settings, counters& cnt
, dht_storage_constructor_type storage_constructor
, entry const* state = 0);
, entry const& state);
virtual ~dht_tracker();
void start(entry const& bootstrap
@ -137,7 +137,7 @@ namespace libtorrent { namespace dht
void connection_timeout(error_code const& e);
void refresh_timeout(error_code const& e);
void tick(error_code const& e);
void refresh_key(error_code const& e);
// implements udp_socket_interface
virtual bool has_quota();
@ -157,7 +157,6 @@ namespace libtorrent { namespace dht
std::vector<char> m_send_buf;
dos_blocker m_blocker;
time_point m_last_new_key;
deadline_timer m_timer;
deadline_timer m_connection_timer;
deadline_timer m_refresh_timer;

View File

@ -202,7 +202,7 @@ public:
counters& stats_counters() const { return m_counters; }
dht_observer* observer() const { return m_observer; }
protected:
private:
void send_single_refresh(udp::endpoint const& ep, int bucket
, node_id const& id = node_id());
@ -213,7 +213,6 @@ protected:
libtorrent::dht_settings const& m_settings;
private:
typedef libtorrent::mutex mutex_t;
mutex_t m_mutex;

View File

@ -49,7 +49,7 @@ struct TORRENT_EXTRA_EXPORT node_entry
node_entry(udp::endpoint ep);
node_entry();
void update_rtt(int new_rtt);
bool pinged() const { return timeout_count != 0xff; }
void set_pinged() { if (timeout_count == 0xff) timeout_count = 0; }
void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; }

View File

@ -86,6 +86,9 @@ struct routing_table_node
class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable
{
public:
// TODO: 3 to improve memory locality and scanning performance, turn the
// routing table into a single vector with boundaries for the nodes instead.
// Perhaps replacement nodes should be in a separate vector.
typedef std::vector<routing_table_node> table_t;
routing_table(node_id const& id, int bucket_size

View File

@ -231,7 +231,7 @@ namespace libtorrent
}
// returns a bit-wise negated copy of the sha1-hash
sha1_hash operator~()
sha1_hash operator~() const
{
sha1_hash ret;
for (int i = 0; i < number_size; ++i)

View File

@ -65,27 +65,21 @@ using libtorrent::dht::packet_t;
using libtorrent::dht::msg;
using libtorrent::detail::write_endpoint;
enum
{
key_refresh = 5 // generate a new write token key every 5 minutes
};
namespace
{
const int tick_period = 1; // minutes
}
namespace libtorrent { namespace dht
{
void incoming_error(entry& e, char const* msg);
namespace {
node_id extract_node_id(entry const* e)
// generate a new write token key every 5 minutes
time_duration const key_refresh
= duration_cast<time_duration>(minutes(5));
node_id extract_node_id(entry const& e)
{
if (e == 0 || e->type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e->find_key("node-id");
if (nid == 0 || nid->type() != entry::string_t || nid->string().length() != 20)
if (e.type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e.find_key("node-id");
if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20)
return (node_id::min)();
return node_id(nid->string().c_str());
}
@ -99,12 +93,11 @@ namespace libtorrent { namespace dht
, dht_settings const& settings
, counters& cnt
, dht_storage_constructor_type storage_constructor
, entry const* state)
, entry const& state)
: m_counters(cnt)
, m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor)
, m_sock(sock)
, m_log(observer)
, m_last_new_key(clock_type::now() - minutes(int(key_refresh)))
, m_timer(sock.get_io_service())
, m_connection_timer(sock.get_io_service())
, m_refresh_timer(sock.get_io_service())
@ -137,8 +130,7 @@ namespace libtorrent { namespace dht
}
error_code ec;
m_timer.expires_from_now(seconds(1), ec);
m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1));
refresh_key(ec);
m_connection_timer.expires_from_now(seconds(1), ec);
m_connection_timer.async_wait(
@ -203,29 +195,26 @@ namespace libtorrent { namespace dht
boost::bind(&dht_tracker::refresh_timeout, self(), _1));
}
void dht_tracker::tick(error_code const& e)
void dht_tracker::refresh_key(error_code const& e)
{
if (e || m_abort) return;
error_code ec;
m_timer.expires_from_now(minutes(tick_period), ec);
m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1));
m_timer.expires_from_now(key_refresh, ec);
m_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1));
time_point now = clock_type::now();
if (now - minutes(int(key_refresh)) > m_last_new_key)
{
m_last_new_key = now;
m_dht.new_write_key();
m_dht.new_write_key();
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "*** new write key***");
m_log->log(dht_logger::tracker, "*** new write key***");
#endif
}
}
/*
#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM
std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc);
m_dht.print_state(st);
#endif
}
*/
void dht_tracker::get_peers(sha1_hash const& ih
, boost::function<void(std::vector<tcp::endpoint> const&)> f)

View File

@ -77,9 +77,17 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer)
{
address external_address;
if (observer) external_address = observer->external_address();
// if we don't have an observer, don't pretend that external_address is valid
// generating an ID based on 0.0.0.0 would be terrible. random is better
if (!observer || external_address == address())
{
return generate_random_id();
}
if (nid == (node_id::min)() || !verify_id(nid, external_address))
return generate_id(external_address);
return nid;
}
@ -584,7 +592,6 @@ struct ping_observer : observer
}
};
void node::tick()
{
// every now and then we refresh our own ID, just to keep

View File

@ -75,7 +75,7 @@ namespace libtorrent { namespace dht
first_seen = aux::time_now();
#endif
}
void node_entry::update_rtt(int new_rtt)
{
TORRENT_ASSERT(new_rtt <= 0xffff);

View File

@ -51,6 +51,7 @@ node_id distance(node_id const& n1, node_id const& n2)
{
node_id ret;
node_id::iterator k = ret.begin();
// TODO: 3 the XORing should be done at full words instead of bytes
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, end(n1.end()); i != end; ++i, ++j, ++k)
{
@ -62,6 +63,7 @@ node_id distance(node_id const& n1, node_id const& n2)
// returns true if: distance(n1, ref) < distance(n2, ref)
bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref)
{
// TODO: 3 the XORing should be done at full words instead of bytes
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, k = ref.begin(), end(n1.end()); i != end; ++i, ++j, ++k)
{
@ -77,6 +79,8 @@ bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref)
// useful for finding out which bucket a node belongs to
int distance_exp(node_id const& n1, node_id const& n2)
{
// TODO: 3 the XORing should be done at full words and __builtin_clz() could
// be used as the last step
int byte = node_id::size - 1;
for (node_id::const_iterator i = n1.begin(), j = n2.begin()
, end(n1.end()); i != end; ++i, ++j, --byte)
@ -87,7 +91,7 @@ int distance_exp(node_id const& n1, node_id const& n2)
// we have found the first non-zero byte
// return the bit-number of the first bit
// that differs
int bit = byte * 8;
int const bit = byte * 8;
for (int b = 7; b >= 0; --b)
if (t >= (1 << b)) return bit + b;
return bit;

View File

@ -582,11 +582,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
table_t::iterator existing_bucket;
node_entry* existing = find_node(e.ep(), &existing_bucket);
if (!e.pinged() || existing == 0)
if (existing == 0)
{
// the new node is not pinged, or it's not an existing node
// we should ignore it, unless we allow duplicate IPs in our
// routing table
// the node we're trying to add is not a match with an existing node. we
// should ignore it, unless we allow duplicate IPs in our routing
// table. There could be a node with the same IP, but with a different
// port. m_ips just contains IP addresses, whereas the lookup we just
// performed was for full endpoints (address, port).
if (m_settings.restrict_routing_ips)
{
#ifndef TORRENT_DISABLE_LOGGING
@ -598,16 +600,27 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
return failed_to_add;
}
}
else if (existing && existing->id == e.id)
else if (existing->id == e.id)
{
// if the node ID is the same, just update the failcount
// and be done with it
// and be done with it.
existing->timeout_count = 0;
existing->update_rtt(e.rtt);
existing->last_queried = e.last_queried;
if (e.pinged())
{
existing->update_rtt(e.rtt);
existing->last_queried = e.last_queried;
}
return node_added;
}
else if (existing)
else if (!e.pinged())
{
// this may be a routing table poison attack. If we haven't confirmed
// that this peer actually exists with this new node ID yet, ignore it.
// we definitely don't want to replace the existing entry with this one
if (m_settings.restrict_routing_ips)
return failed_to_add;
}
else
{
TORRENT_ASSERT(existing->id != e.id);
// this is the same IP and port, but with
@ -620,8 +633,8 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
table_t::iterator i = find_bucket(e.id);
bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements;
int bucket_index = std::distance(m_buckets.begin(), i);
int bucket_size_limit = bucket_limit(bucket_index);
int const bucket_index = std::distance(m_buckets.begin(), i);
int const bucket_size_limit = bucket_limit(bucket_index);
bucket_t::iterator j;
@ -723,7 +736,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
// can we split the bucket?
// only nodes that haven't failed can split the bucket, and we can only
// split the last bucket
const bool can_split = (boost::next(i) == m_buckets.end()
bool const can_split = (boost::next(i) == m_buckets.end()
&& m_buckets.size() < 159)
&& e.fail_count() == 0
&& (i == m_buckets.begin() || boost::prior(i)->live_nodes.size() > 1);
@ -874,10 +887,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
#ifndef TORRENT_DISABLE_LOGGING
char hex_id[41];
to_hex(e.id.data(), sha1_hash::size, hex_id);
m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s"
, hex_id, print_address(e.addr()).c_str());
if (m_log)
{
char hex_id[41];
to_hex(e.id.data(), sha1_hash::size, hex_id);
m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s"
, hex_id, print_address(e.addr()).c_str());
}
#endif
return node_added;
}
@ -931,8 +947,8 @@ void routing_table::split_bucket()
{
INVARIANT_CHECK;
int bucket_index = m_buckets.size()-1;
int bucket_size_limit = bucket_limit(bucket_index);
int const bucket_index = m_buckets.size()-1;
int const bucket_size_limit = bucket_limit(bucket_index);
TORRENT_ASSERT(int(m_buckets.back().live_nodes.size()) >= bucket_size_limit);
// this is the last bucket, and it's full already. Split
@ -946,10 +962,11 @@ void routing_table::split_bucket()
// move any node whose (160 - distance_exp(m_id, id)) >= (i - m_buckets.begin())
// to the new bucket
int new_bucket_size = bucket_limit(bucket_index + 1);
int const new_bucket_size = bucket_limit(bucket_index + 1);
for (bucket_t::iterator j = b.begin(); j != b.end();)
{
if (distance_exp(m_id, j->id) >= 159 - bucket_index)
int const d = distance_exp(m_id, j->id);
if (d >= 159 - bucket_index)
{
++j;
continue;
@ -1048,7 +1065,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
#ifndef TORRENT_DISABLE_LOGGING
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&nid[0]), 20, hex_id);
to_hex(nid.data(), 20, hex_id);
m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d"
, hex_id, print_endpoint(j->ep()).c_str()
, int(j->fail_count())
@ -1069,7 +1086,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
#ifndef TORRENT_DISABLE_LOGGING
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&nid[0]), 20, hex_id);
to_hex(nid.data(), 20, hex_id);
m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d"
, hex_id, print_endpoint(j->ep()).c_str()
, int(j->fail_count())

View File

@ -475,7 +475,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
if (m_sock->send_packet(e, target_addr, 1))
{
m_transactions.insert(std::make_pair(tid,o));
m_transactions.insert(std::make_pair(tid, o));
#if TORRENT_USE_ASSERTS
o->m_was_sent = true;
#endif

View File

@ -5516,7 +5516,7 @@ retry:
, boost::ref(m_udp_socket), boost::cref(m_dht_settings)
, boost::ref(m_stats_counters)
, m_dht_storage_constructor
, &startup_state);
, startup_state);
for (std::vector<udp::endpoint>::iterator i = m_dht_router_nodes.begin()
, end(m_dht_router_nodes.end()); i != end; ++i)
@ -6731,12 +6731,17 @@ retry:
// since we have a new external IP now, we need to
// restart the DHT with a new node ID
#ifndef TORRENT_DISABLE_DHT
// TODO: 1 we only need to do this if our global IPv4 address has changed
// since the DHT (currently) only supports IPv4. Since restarting the DHT
// is kind of expensive, it would be nice to not do it unnecessarily
if (m_dht)
{
// TODO: 3 instead of restarting the whole DHT, change the external IP,
// node ID and re-jiggle the routing table in-place. A complete restart
// throws away all outstanding requests, which may be significant
// during bootstrap
entry s = m_dht->state();
int cur_state = 0;
int prev_state = 0;

View File

@ -1194,32 +1194,57 @@ TORRENT_TEST(dht)
// test kademlia functions
// this is a bit too expensive to do under valgrind
#ifndef TORRENT_USE_VALGRIND
for (int i = 0; i < 160; i += 8)
{
for (int j = 0; j < 160; j += 8)
{
node_id a(0);
a[(159-i) / 8] = 1 << (i & 7);
node_id b(0);
b[(159-j) / 8] = 1 << (j & 7);
int dist = distance_exp(a, b);
// distance_exp
TEST_CHECK(dist >= 0 && dist < 160);
TEST_CHECK(dist == ((i == j)?0:(std::max)(i, j)));
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("0000000000000000000000000000000000000000"))
, 159);
for (int k = 0; k < 160; k += 8)
{
node_id c(0);
c[(159-k) / 8] = 1 << (k & 7);
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("7fffffffffffffffffffffffffffffffffffffff"))
, 159);
bool cmp = compare_ref(a, b, c);
TEST_CHECK(cmp == (distance(a, c) < distance(b, c)));
}
}
}
#endif
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("ffffffffffffffffffffffffffffffffffffffff"))
, 0);
TEST_EQUAL(distance_exp(
to_hash("ffffffffffffffffffffffffffffffffffffffff"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 0);
TEST_EQUAL(distance_exp(
to_hash("8000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 158);
TEST_EQUAL(distance_exp(
to_hash("c000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 157);
TEST_EQUAL(distance_exp(
to_hash("e000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 156);
TEST_EQUAL(distance_exp(
to_hash("f000000000000000000000000000000000000000"),
to_hash("fffffffffffffffffffffffffffffffffffffffe"))
, 155);
TEST_EQUAL(distance_exp(
to_hash("f8f2340985723049587230495872304958703294"),
to_hash("f743589043r890f023980f90e203980d090c3840"))
, 155);
TEST_EQUAL(distance_exp(
to_hash("ffff740985723049587230495872304958703294"),
to_hash("ffff889043r890f023980f90e203980d090c3840"))
, 159 - 16);
{
// test kademlia routing table
@ -2500,5 +2525,31 @@ TORRENT_TEST(rpc_invalid_error_msg)
TEST_EQUAL(found, true);
}
// test bucket distribution
TORRENT_TEST(node_id_bucket_distribution)
{
	// Samples a large number of node IDs generated from random IPv4
	// addresses and counts how many of them land in each routing table
	// bucket relative to one fixed reference ID. Every bucket is expected to
	// receive about half as many IDs as the one before it.
	int const num_samples = 100000;
	int nodes_per_bucket[160] = {0};

	// the reference ID is drawn first, followed by all of the samples
	dht::node_id const reference_id = generate_id(rand_v4());

	for (int sample = 0; sample != num_samples; ++sample)
	{
		dht::node_id const candidate = generate_id(rand_v4());
		// the bucket index is 159 minus the distance exponent
		++nodes_per_bucket[159 - distance_exp(reference_id, candidate)];
	}

	// print the population of the first 25 buckets on a single line
	int const* const last_printed = nodes_per_bucket + 25;
	for (int const* count = nodes_per_bucket; count != last_printed; ++count)
		printf("%3d ", *count);
	printf("\n");

	// bucket 0 is expected to hold half of all samples, and the expectation
	// is halved for each following bucket. Every count must be within 5% of
	// the total number of samples from its expected value
	for (int i = 0, expected = num_samples / 2; i < 25; ++i, expected /= 2)
	{
		TEST_CHECK(std::abs(nodes_per_bucket[i] - expected) < num_samples / 20);
	}
}
#endif