From 01e6b9385405788da3908a60e67b3e36ed66ac98 Mon Sep 17 00:00:00 2001 From: arvidn Date: Fri, 1 Jan 2016 09:21:07 -0500 Subject: [PATCH] simplify and improve unit test for distance_exp. make some immutable variables const in the DHT implementation. instead of waking up periodically just to check if it's time to refresh the DHT secret key, set the timer to only wake up to refresh the key. If we don't have a DHT observer (to ask for our external IP) or if we don't know our external IP, don't generate a node ID based on 0.0.0.0, just generate a random ID instead. Simplified and improved node replacement logic in the routing table a little bit --- include/libtorrent/kademlia/dht_tracker.hpp | 7 +- include/libtorrent/kademlia/node.hpp | 3 +- include/libtorrent/kademlia/node_entry.hpp | 2 +- include/libtorrent/kademlia/routing_table.hpp | 3 + include/libtorrent/sha1_hash.hpp | 2 +- src/kademlia/dht_tracker.cpp | 47 ++++----- src/kademlia/node.cpp | 11 ++- src/kademlia/node_entry.cpp | 2 +- src/kademlia/node_id.cpp | 6 +- src/kademlia/routing_table.cpp | 61 +++++++----- src/kademlia/rpc_manager.cpp | 2 +- src/session_impl.cpp | 7 +- test/test_dht.cpp | 97 ++++++++++++++----- 13 files changed, 162 insertions(+), 88 deletions(-) diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 0f2749db3..5732badec 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -66,7 +66,7 @@ namespace libtorrent { namespace dht { struct dht_tracker; - struct dht_tracker + struct dht_tracker TORRENT_FINAL : udp_socket_interface , udp_socket_observer , boost::enable_shared_from_this @@ -74,7 +74,7 @@ namespace libtorrent { namespace dht dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock , dht_settings const& settings, counters& cnt , dht_storage_constructor_type storage_constructor - , entry const* state = 0); + , entry const& state); virtual ~dht_tracker(); void 
start(entry const& bootstrap @@ -137,7 +137,7 @@ namespace libtorrent { namespace dht void connection_timeout(error_code const& e); void refresh_timeout(error_code const& e); - void tick(error_code const& e); + void refresh_key(error_code const& e); // implements udp_socket_interface virtual bool has_quota(); @@ -157,7 +157,6 @@ namespace libtorrent { namespace dht std::vector m_send_buf; dos_blocker m_blocker; - time_point m_last_new_key; deadline_timer m_timer; deadline_timer m_connection_timer; deadline_timer m_refresh_timer; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 2075fb8b4..108c75b1f 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -202,7 +202,7 @@ public: counters& stats_counters() const { return m_counters; } dht_observer* observer() const { return m_observer; } -protected: +private: void send_single_refresh(udp::endpoint const& ep, int bucket , node_id const& id = node_id()); @@ -213,7 +213,6 @@ protected: libtorrent::dht_settings const& m_settings; -private: typedef libtorrent::mutex mutex_t; mutex_t m_mutex; diff --git a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index bf6b32639..ad03f17b2 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -49,7 +49,7 @@ struct TORRENT_EXTRA_EXPORT node_entry node_entry(udp::endpoint ep); node_entry(); void update_rtt(int new_rtt); - + bool pinged() const { return timeout_count != 0xff; } void set_pinged() { if (timeout_count == 0xff) timeout_count = 0; } void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; } diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 0dd8095da..265154863 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -86,6 +86,9 @@ struct routing_table_node class 
TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable { public: + // TODO: 3 to improve memory locality and scanning performance, turn the + // routing table into a single vector with boundaries for the nodes instead. + // Perhaps replacement nodes should be in a separate vector. typedef std::vector table_t; routing_table(node_id const& id, int bucket_size diff --git a/include/libtorrent/sha1_hash.hpp b/include/libtorrent/sha1_hash.hpp index 731d34a5b..3e694e206 100644 --- a/include/libtorrent/sha1_hash.hpp +++ b/include/libtorrent/sha1_hash.hpp @@ -231,7 +231,7 @@ namespace libtorrent } // returns a bit-wise negated copy of the sha1-hash - sha1_hash operator~() + sha1_hash operator~() const { sha1_hash ret; for (int i = 0; i < number_size; ++i) diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 29a511835..12bc0ced2 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -65,27 +65,21 @@ using libtorrent::dht::packet_t; using libtorrent::dht::msg; using libtorrent::detail::write_endpoint; -enum -{ - key_refresh = 5 // generate a new write token key every 5 minutes -}; - -namespace -{ - const int tick_period = 1; // minutes -} - namespace libtorrent { namespace dht { void incoming_error(entry& e, char const* msg); namespace { - node_id extract_node_id(entry const* e) + // generate a new write token key every 5 minutes + time_duration const key_refresh + = duration_cast(minutes(5)); + + node_id extract_node_id(entry const& e) { - if (e == 0 || e->type() != entry::dictionary_t) return (node_id::min)(); - entry const* nid = e->find_key("node-id"); - if (nid == 0 || nid->type() != entry::string_t || nid->string().length() != 20) + if (e.type() != entry::dictionary_t) return (node_id::min)(); + entry const* nid = e.find_key("node-id"); + if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20) return (node_id::min)(); return node_id(nid->string().c_str()); } @@ -99,12 +93,11 @@ namespace 
libtorrent { namespace dht , dht_settings const& settings , counters& cnt , dht_storage_constructor_type storage_constructor - , entry const* state) + , entry const& state) : m_counters(cnt) , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) , m_sock(sock) , m_log(observer) - , m_last_new_key(clock_type::now() - minutes(int(key_refresh))) , m_timer(sock.get_io_service()) , m_connection_timer(sock.get_io_service()) , m_refresh_timer(sock.get_io_service()) @@ -137,8 +130,7 @@ namespace libtorrent { namespace dht } error_code ec; - m_timer.expires_from_now(seconds(1), ec); - m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1)); + refresh_key(ec); m_connection_timer.expires_from_now(seconds(1), ec); m_connection_timer.async_wait( @@ -203,29 +195,26 @@ namespace libtorrent { namespace dht boost::bind(&dht_tracker::refresh_timeout, self(), _1)); } - void dht_tracker::tick(error_code const& e) + void dht_tracker::refresh_key(error_code const& e) { if (e || m_abort) return; error_code ec; - m_timer.expires_from_now(minutes(tick_period), ec); - m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1)); + m_timer.expires_from_now(key_refresh, ec); + m_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1)); - time_point now = clock_type::now(); - if (now - minutes(int(key_refresh)) > m_last_new_key) - { - m_last_new_key = now; - m_dht.new_write_key(); + m_dht.new_write_key(); #ifndef TORRENT_DISABLE_LOGGING - m_log->log(dht_logger::tracker, "*** new write key***"); + m_log->log(dht_logger::tracker, "*** new write key***"); #endif - } + } +/* #if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc); m_dht.print_state(st); #endif - } +*/ void dht_tracker::get_peers(sha1_hash const& ih , boost::function const&)> f) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 20a4f1066..0d9a666e9 100644 --- a/src/kademlia/node.cpp +++ 
b/src/kademlia/node.cpp @@ -77,9 +77,17 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer) { address external_address; if (observer) external_address = observer->external_address(); + + // if we don't have an observer, don't pretend that external_address is valid + // generating an ID based on 0.0.0.0 would be terrible. random is better + if (!observer || external_address == address()) + { + return generate_random_id(); + } + if (nid == (node_id::min)() || !verify_id(nid, external_address)) return generate_id(external_address); - + return nid; } @@ -584,7 +592,6 @@ struct ping_observer : observer } }; - void node::tick() { // every now and then we refresh our own ID, just to keep diff --git a/src/kademlia/node_entry.cpp b/src/kademlia/node_entry.cpp index 5e35883bd..ecfdbcc74 100644 --- a/src/kademlia/node_entry.cpp +++ b/src/kademlia/node_entry.cpp @@ -75,7 +75,7 @@ namespace libtorrent { namespace dht first_seen = aux::time_now(); #endif } - + void node_entry::update_rtt(int new_rtt) { TORRENT_ASSERT(new_rtt <= 0xffff); diff --git a/src/kademlia/node_id.cpp b/src/kademlia/node_id.cpp index 4bb3e7931..52a26119a 100644 --- a/src/kademlia/node_id.cpp +++ b/src/kademlia/node_id.cpp @@ -51,6 +51,7 @@ node_id distance(node_id const& n1, node_id const& n2) { node_id ret; node_id::iterator k = ret.begin(); + // TODO: 3 the XORing should be done at full words instead of bytes for (node_id::const_iterator i = n1.begin(), j = n2.begin() , end(n1.end()); i != end; ++i, ++j, ++k) { @@ -62,6 +63,7 @@ node_id distance(node_id const& n1, node_id const& n2) // returns true if: distance(n1, ref) < distance(n2, ref) bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref) { + // TODO: 3 the XORing should be done at full words instead of bytes for (node_id::const_iterator i = n1.begin(), j = n2.begin() , k = ref.begin(), end(n1.end()); i != end; ++i, ++j, ++k) { @@ -77,6 +79,8 @@ bool compare_ref(node_id const& n1, node_id const& n2, node_id 
const& ref) // useful for finding out which bucket a node belongs to int distance_exp(node_id const& n1, node_id const& n2) { + // TODO: 3 the xoring should be done at full words and __builtin_clz() could + // be used as the last step int byte = node_id::size - 1; for (node_id::const_iterator i = n1.begin(), j = n2.begin() , end(n1.end()); i != end; ++i, ++j, --byte) @@ -87,7 +91,7 @@ int distance_exp(node_id const& n1, node_id const& n2) // we have found the first non-zero byte // return the bit-number of the first bit // that differs - int bit = byte * 8; + int const bit = byte * 8; for (int b = 7; b >= 0; --b) if (t >= (1 << b)) return bit + b; return bit; diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 67c73ef24..4c9eb37fb 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -582,11 +582,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) table_t::iterator existing_bucket; node_entry* existing = find_node(e.ep(), &existing_bucket); - if (!e.pinged() || existing == 0) + if (existing == 0) { - // the new node is not pinged, or it's not an existing node - // we should ignore it, unless we allow duplicate IPs in our - // routing table + // the node we're trying to add is not a match with an existing node. we + // should ignore it, unless we allow duplicate IPs in our routing + // table. There could be a node with the same IP, but with a different + // port. m_ips just contains IP addresses, whereas the lookup we just + // performed was for full endpoints (address, port). if (m_settings.restrict_routing_ips) { #ifndef TORRENT_DISABLE_LOGGING @@ -598,16 +600,27 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) return failed_to_add; } } - else if (existing && existing->id == e.id) + else if (existing->id == e.id) { // if the node ID is the same, just update the failcount - // and be done with it + // and be done with it. 
existing->timeout_count = 0; - existing->update_rtt(e.rtt); - existing->last_queried = e.last_queried; + if (e.pinged()) + { + existing->update_rtt(e.rtt); + existing->last_queried = e.last_queried; + } return node_added; } - else if (existing) + else if (!e.pinged()) + { + // this may be a routing table poison attack. If we haven't confirmed + // that this peer actually exists with this new node ID yet, ignore it. + // we definitely don't want to replace the existing entry with this one + if (m_settings.restrict_routing_ips) + return failed_to_add; + } + else { TORRENT_ASSERT(existing->id != e.id); // this is the same IP and port, but with @@ -620,8 +633,8 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) table_t::iterator i = find_bucket(e.id); bucket_t& b = i->live_nodes; bucket_t& rb = i->replacements; - int bucket_index = std::distance(m_buckets.begin(), i); - int bucket_size_limit = bucket_limit(bucket_index); + int const bucket_index = std::distance(m_buckets.begin(), i); + int const bucket_size_limit = bucket_limit(bucket_index); bucket_t::iterator j; @@ -723,7 +736,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) // can we split the bucket? 
// only nodes that haven't failed can split the bucket, and we can only // split the last bucket - const bool can_split = (boost::next(i) == m_buckets.end() + bool const can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159) && e.fail_count() == 0 && (i == m_buckets.begin() || boost::prior(i)->live_nodes.size() > 1); @@ -874,10 +887,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) *j = e; m_ips.insert(e.addr().to_v4().to_bytes()); #ifndef TORRENT_DISABLE_LOGGING - char hex_id[41]; - to_hex(e.id.data(), sha1_hash::size, hex_id); - m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s" - , hex_id, print_address(e.addr()).c_str()); + if (m_log) + { + char hex_id[41]; + to_hex(e.id.data(), sha1_hash::size, hex_id); + m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s" + , hex_id, print_address(e.addr()).c_str()); + } #endif return node_added; } @@ -931,8 +947,8 @@ void routing_table::split_bucket() { INVARIANT_CHECK; - int bucket_index = m_buckets.size()-1; - int bucket_size_limit = bucket_limit(bucket_index); + int const bucket_index = m_buckets.size()-1; + int const bucket_size_limit = bucket_limit(bucket_index); TORRENT_ASSERT(int(m_buckets.back().live_nodes.size()) >= bucket_size_limit); // this is the last bucket, and it's full already. 
Split @@ -946,10 +962,11 @@ void routing_table::split_bucket() // move any node whose (160 - distane_exp(m_id, id)) >= (i - m_buckets.begin()) // to the new bucket - int new_bucket_size = bucket_limit(bucket_index + 1); + int const new_bucket_size = bucket_limit(bucket_index + 1); for (bucket_t::iterator j = b.begin(); j != b.end();) { - if (distance_exp(m_id, j->id) >= 159 - bucket_index) + int const d = distance_exp(m_id, j->id); + if (d >= 159 - bucket_index) { ++j; continue; @@ -1048,7 +1065,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep) #ifndef TORRENT_DISABLE_LOGGING char hex_id[41]; - to_hex(reinterpret_cast(&nid[0]), 20, hex_id); + to_hex(nid.data(), 20, hex_id); m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d" , hex_id, print_endpoint(j->ep()).c_str() , int(j->fail_count()) @@ -1069,7 +1086,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep) #ifndef TORRENT_DISABLE_LOGGING char hex_id[41]; - to_hex(reinterpret_cast(&nid[0]), 20, hex_id); + to_hex(nid.data(), 20, hex_id); m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d" , hex_id, print_endpoint(j->ep()).c_str() , int(j->fail_count()) diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 613619d44..4f46cad71 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -475,7 +475,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr if (m_sock->send_packet(e, target_addr, 1)) { - m_transactions.insert(std::make_pair(tid,o)); + m_transactions.insert(std::make_pair(tid, o)); #if TORRENT_USE_ASSERTS o->m_was_sent = true; #endif diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 9d31d85d8..18656e1f3 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -5516,7 +5516,7 @@ retry: , boost::ref(m_udp_socket), boost::cref(m_dht_settings) , boost::ref(m_stats_counters) , 
m_dht_storage_constructor - , &startup_state); + , startup_state); for (std::vector::iterator i = m_dht_router_nodes.begin() , end(m_dht_router_nodes.end()); i != end; ++i) @@ -6731,12 +6731,17 @@ retry: // since we have a new external IP now, we need to // restart the DHT with a new node ID + #ifndef TORRENT_DISABLE_DHT // TODO: 1 we only need to do this if our global IPv4 address has changed // since the DHT (currently) only supports IPv4. Since restarting the DHT // is kind of expensive, it would be nice to not do it unnecessarily if (m_dht) { + // TODO: 3 instead of restarting the whole DHT, change the external IP, + // node ID and re-jiggle the routing table in-place. A complete restart + // throws away all outstanding requests, which may be significant + // during bootstrap entry s = m_dht->state(); int cur_state = 0; int prev_state = 0; diff --git a/test/test_dht.cpp b/test/test_dht.cpp index cb1b1cd5f..14a1d8f20 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -1194,32 +1194,57 @@ TORRENT_TEST(dht) // test kademlia functions - // this is a bit too expensive to do under valgrind -#ifndef TORRENT_USE_VALGRIND - for (int i = 0; i < 160; i += 8) - { - for (int j = 0; j < 160; j += 8) - { - node_id a(0); - a[(159-i) / 8] = 1 << (i & 7); - node_id b(0); - b[(159-j) / 8] = 1 << (j & 7); - int dist = distance_exp(a, b); + // distance_exp - TEST_CHECK(dist >= 0 && dist < 160); - TEST_CHECK(dist == ((i == j)?0:(std::max)(i, j))); + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("0000000000000000000000000000000000000000")) + , 159); - for (int k = 0; k < 160; k += 8) - { - node_id c(0); - c[(159-k) / 8] = 1 << (k & 7); + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("7fffffffffffffffffffffffffffffffffffffff")) + , 159); - bool cmp = compare_ref(a, b, c); - TEST_CHECK(cmp == (distance(a, c) < distance(b, c))); - } - } - } -#endif + TEST_EQUAL(distance_exp( + 
to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("ffffffffffffffffffffffffffffffffffffffff")) + , 0); + + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 0); + + TEST_EQUAL(distance_exp( + to_hash("8000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 158); + + TEST_EQUAL(distance_exp( + to_hash("c000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 157); + + TEST_EQUAL(distance_exp( + to_hash("e000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 156); + + TEST_EQUAL(distance_exp( + to_hash("f000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 155); + + TEST_EQUAL(distance_exp( + to_hash("f8f2340985723049587230495872304958703294"), + to_hash("f743589043r890f023980f90e203980d090c3840")) + , 155); + + TEST_EQUAL(distance_exp( + to_hash("ffff740985723049587230495872304958703294"), + to_hash("ffff889043r890f023980f90e203980d090c3840")) + , 159 - 16); { // test kademlia routing table @@ -2500,5 +2525,31 @@ TORRENT_TEST(rpc_invalid_error_msg) TEST_EQUAL(found, true); } +// test bucket distribution +TORRENT_TEST(node_id_bucket_distribution) +{ + int nodes_per_bucket[160] = {0}; + dht::node_id reference_id = generate_id(rand_v4()); + int const num_samples = 100000; + for (int i = 0; i < num_samples; ++i) + { + dht::node_id nid = generate_id(rand_v4()); + int const bucket = 159 - distance_exp(reference_id, nid); + ++nodes_per_bucket[bucket]; + } + + for (int i = 0; i < 25; ++i) + { + printf("%3d ", nodes_per_bucket[i]); + } + printf("\n"); + + int expected = num_samples / 2; + for (int i = 0; i < 25; ++i) + { + TEST_CHECK(std::abs(nodes_per_bucket[i] - expected) < num_samples / 20); + expected /= 2; + } +} #endif