diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 0f2749db3..ed8be35b4 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -66,7 +66,7 @@ namespace libtorrent { namespace dht { struct dht_tracker; - struct dht_tracker + struct dht_tracker TORRENT_FINAL : udp_socket_interface , udp_socket_observer , boost::enable_shared_from_this @@ -74,7 +74,7 @@ namespace libtorrent { namespace dht dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock , dht_settings const& settings, counters& cnt , dht_storage_constructor_type storage_constructor - , entry const* state = 0); + , entry const& state); virtual ~dht_tracker(); void start(entry const& bootstrap @@ -137,7 +137,7 @@ namespace libtorrent { namespace dht void connection_timeout(error_code const& e); void refresh_timeout(error_code const& e); - void tick(error_code const& e); + void refresh_key(error_code const& e); // implements udp_socket_interface virtual bool has_quota(); @@ -157,8 +157,7 @@ namespace libtorrent { namespace dht std::vector m_send_buf; dos_blocker m_blocker; - time_point m_last_new_key; - deadline_timer m_timer; + deadline_timer m_key_refresh_timer; deadline_timer m_connection_timer; deadline_timer m_refresh_timer; dht_settings const& m_settings; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 2075fb8b4..108c75b1f 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -202,7 +202,7 @@ public: counters& stats_counters() const { return m_counters; } dht_observer* observer() const { return m_observer; } -protected: +private: void send_single_refresh(udp::endpoint const& ep, int bucket , node_id const& id = node_id()); @@ -213,7 +213,6 @@ protected: libtorrent::dht_settings const& m_settings; -private: typedef libtorrent::mutex mutex_t; mutex_t m_mutex; diff --git 
a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index bf6b32639..ad03f17b2 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -49,7 +49,7 @@ struct TORRENT_EXTRA_EXPORT node_entry node_entry(udp::endpoint ep); node_entry(); void update_rtt(int new_rtt); - + bool pinged() const { return timeout_count != 0xff; } void set_pinged() { if (timeout_count == 0xff) timeout_count = 0; } void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; } diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 0dd8095da..265154863 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -86,6 +86,9 @@ struct routing_table_node class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable { public: + // TODO: 3 to improve memory locality and scanning performance, turn the + // routing table into a single vector with boundaries for the nodes instead. + // Perhaps replacement nodes should be in a separate vector. 
typedef std::vector table_t; routing_table(node_id const& id, int bucket_size diff --git a/include/libtorrent/sha1_hash.hpp b/include/libtorrent/sha1_hash.hpp index 731d34a5b..3e694e206 100644 --- a/include/libtorrent/sha1_hash.hpp +++ b/include/libtorrent/sha1_hash.hpp @@ -231,7 +231,7 @@ namespace libtorrent } // returns a bit-wise negated copy of the sha1-hash - sha1_hash operator~() + sha1_hash operator~() const { sha1_hash ret; for (int i = 0; i < number_size; ++i) diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 29a511835..ecdd6c745 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -65,27 +65,21 @@ using libtorrent::dht::packet_t; using libtorrent::dht::msg; using libtorrent::detail::write_endpoint; -enum -{ - key_refresh = 5 // generate a new write token key every 5 minutes -}; - -namespace -{ - const int tick_period = 1; // minutes -} - namespace libtorrent { namespace dht { void incoming_error(entry& e, char const* msg); namespace { - node_id extract_node_id(entry const* e) + // generate a new write token key every 5 minutes + time_duration const key_refresh + = duration_cast(minutes(5)); + + node_id extract_node_id(entry const& e) { - if (e == 0 || e->type() != entry::dictionary_t) return (node_id::min)(); - entry const* nid = e->find_key("node-id"); - if (nid == 0 || nid->type() != entry::string_t || nid->string().length() != 20) + if (e.type() != entry::dictionary_t) return (node_id::min)(); + entry const* nid = e.find_key("node-id"); + if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20) return (node_id::min)(); return node_id(nid->string().c_str()); } @@ -99,13 +93,12 @@ namespace libtorrent { namespace dht , dht_settings const& settings , counters& cnt , dht_storage_constructor_type storage_constructor - , entry const* state) + , entry const& state) : m_counters(cnt) , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) , 
m_sock(sock) , m_log(observer) - , m_last_new_key(clock_type::now() - minutes(int(key_refresh))) - , m_timer(sock.get_io_service()) + , m_key_refresh_timer(sock.get_io_service()) , m_connection_timer(sock.get_io_service()) , m_refresh_timer(sock.get_io_service()) , m_settings(settings) @@ -137,8 +130,7 @@ namespace libtorrent { namespace dht } error_code ec; - m_timer.expires_from_now(seconds(1), ec); - m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1)); + refresh_key(ec); m_connection_timer.expires_from_now(seconds(1), ec); m_connection_timer.async_wait( @@ -153,7 +145,7 @@ namespace libtorrent { namespace dht { m_abort = true; error_code ec; - m_timer.cancel(ec); + m_key_refresh_timer.cancel(ec); m_connection_timer.cancel(ec); m_refresh_timer.cancel(ec); m_host_resolver.cancel(); @@ -203,29 +195,26 @@ namespace libtorrent { namespace dht boost::bind(&dht_tracker::refresh_timeout, self(), _1)); } - void dht_tracker::tick(error_code const& e) + void dht_tracker::refresh_key(error_code const& e) { if (e || m_abort) return; error_code ec; - m_timer.expires_from_now(minutes(tick_period), ec); - m_timer.async_wait(boost::bind(&dht_tracker::tick, self(), _1)); + m_key_refresh_timer.expires_from_now(key_refresh, ec); + m_key_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1)); - time_point now = clock_type::now(); - if (now - minutes(int(key_refresh)) > m_last_new_key) - { - m_last_new_key = now; - m_dht.new_write_key(); + m_dht.new_write_key(); #ifndef TORRENT_DISABLE_LOGGING - m_log->log(dht_logger::tracker, "*** new write key***"); + m_log->log(dht_logger::tracker, "*** new write key***"); #endif - } + } +/* #if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc); m_dht.print_state(st); #endif - } +*/ void dht_tracker::get_peers(sha1_hash const& ih , boost::function const&)> f) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 20a4f1066..0d9a666e9 
100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -77,9 +77,17 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer) { address external_address; if (observer) external_address = observer->external_address(); + + // if we don't have an observer (or it reports an unspecified address), external_address is not valid. + // Generating an ID based on 0.0.0.0 would be terrible; a random ID is better. + if (!observer || external_address == address()) + { + return generate_random_id(); + } + if (nid == (node_id::min)() || !verify_id(nid, external_address)) return generate_id(external_address); - + return nid; } @@ -584,7 +592,6 @@ struct ping_observer : observer } }; - void node::tick() { // every now and then we refresh our own ID, just to keep diff --git a/src/kademlia/node_entry.cpp b/src/kademlia/node_entry.cpp index 5e35883bd..ecfdbcc74 100644 --- a/src/kademlia/node_entry.cpp +++ b/src/kademlia/node_entry.cpp @@ -75,7 +75,7 @@ namespace libtorrent { namespace dht first_seen = aux::time_now(); #endif } - + void node_entry::update_rtt(int new_rtt) { TORRENT_ASSERT(new_rtt <= 0xffff); diff --git a/src/kademlia/node_id.cpp b/src/kademlia/node_id.cpp index 4bb3e7931..52a26119a 100644 --- a/src/kademlia/node_id.cpp +++ b/src/kademlia/node_id.cpp @@ -51,6 +51,7 @@ node_id distance(node_id const& n1, node_id const& n2) { node_id ret; node_id::iterator k = ret.begin(); + // TODO: 3 the XORing should be done at full words instead of bytes for (node_id::const_iterator i = n1.begin(), j = n2.begin() , end(n1.end()); i != end; ++i, ++j, ++k) { @@ -62,6 +63,7 @@ node_id distance(node_id const& n1, node_id const& n2) // returns true if: distance(n1, ref) < distance(n2, ref) bool compare_ref(node_id const& n1, node_id const& n2, node_id const& ref) { + // TODO: 3 the XORing should be done at full words instead of bytes for (node_id::const_iterator i = n1.begin(), j = n2.begin() , k = ref.begin(), end(n1.end()); i != end; ++i, ++j, ++k) { @@ -77,6 +79,8 @@ bool compare_ref(node_id 
const& n1, node_id const& n2, node_id const& ref) // useful for finding out which bucket a node belongs to int distance_exp(node_id const& n1, node_id const& n2) { + // TODO: 3 the XORing should be done at full words and __builtin_clz() could + // be used as the last step int byte = node_id::size - 1; for (node_id::const_iterator i = n1.begin(), j = n2.begin() , end(n1.end()); i != end; ++i, ++j, --byte) @@ -87,7 +91,7 @@ int distance_exp(node_id const& n1, node_id const& n2) // we have found the first non-zero byte // return the bit-number of the first bit // that differs - int bit = byte * 8; + int const bit = byte * 8; for (int b = 7; b >= 0; --b) if (t >= (1 << b)) return bit + b; return bit; diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 67c73ef24..4c9eb37fb 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -582,11 +582,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) table_t::iterator existing_bucket; node_entry* existing = find_node(e.ep(), &existing_bucket); - if (!e.pinged() || existing == 0) + if (existing == 0) { - // the new node is not pinged, or it's not an existing node - // we should ignore it, unless we allow duplicate IPs in our - // routing table + // the node we're trying to add is not a match with an existing node. we + // should ignore it, unless we allow duplicate IPs in our routing + // table. There could be a node with the same IP, but with a different + // port. m_ips just contains IP addresses, whereas the lookup we just + // performed was for full endpoints (address, port). 
if (m_settings.restrict_routing_ips) { #ifndef TORRENT_DISABLE_LOGGING @@ -598,16 +600,27 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) return failed_to_add; } } - else if (existing && existing->id == e.id) + else if (existing->id == e.id) { // if the node ID is the same, just update the failcount - // and be done with it + // and be done with it. existing->timeout_count = 0; - existing->update_rtt(e.rtt); - existing->last_queried = e.last_queried; + if (e.pinged()) + { + existing->update_rtt(e.rtt); + existing->last_queried = e.last_queried; + } return node_added; } - else if (existing) + else if (!e.pinged()) + { + // this may be a routing table poison attack. If we haven't confirmed + // that this peer actually exists with this new node ID yet, ignore it. + // we definitely don't want to replace the existing entry with this one + if (m_settings.restrict_routing_ips) + return failed_to_add; + } + else { TORRENT_ASSERT(existing->id != e.id); // this is the same IP and port, but with @@ -620,8 +633,8 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) table_t::iterator i = find_bucket(e.id); bucket_t& b = i->live_nodes; bucket_t& rb = i->replacements; - int bucket_index = std::distance(m_buckets.begin(), i); - int bucket_size_limit = bucket_limit(bucket_index); + int const bucket_index = std::distance(m_buckets.begin(), i); + int const bucket_size_limit = bucket_limit(bucket_index); bucket_t::iterator j; @@ -723,7 +736,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) // can we split the bucket? 
// only nodes that haven't failed can split the bucket, and we can only // split the last bucket - const bool can_split = (boost::next(i) == m_buckets.end() + bool const can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159) && e.fail_count() == 0 && (i == m_buckets.begin() || boost::prior(i)->live_nodes.size() > 1); @@ -874,10 +887,13 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) *j = e; m_ips.insert(e.addr().to_v4().to_bytes()); #ifndef TORRENT_DISABLE_LOGGING - char hex_id[41]; - to_hex(e.id.data(), sha1_hash::size, hex_id); - m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s" - , hex_id, print_address(e.addr()).c_str()); + if (m_log) + { + char hex_id[41]; + to_hex(e.id.data(), sha1_hash::size, hex_id); + m_log->log(dht_logger::routing_table, "replacing node with higher RTT: %s %s" + , hex_id, print_address(e.addr()).c_str()); + } #endif return node_added; } @@ -931,8 +947,8 @@ void routing_table::split_bucket() { INVARIANT_CHECK; - int bucket_index = m_buckets.size()-1; - int bucket_size_limit = bucket_limit(bucket_index); + int const bucket_index = m_buckets.size()-1; + int const bucket_size_limit = bucket_limit(bucket_index); TORRENT_ASSERT(int(m_buckets.back().live_nodes.size()) >= bucket_size_limit); // this is the last bucket, and it's full already. 
Split @@ -946,10 +962,11 @@ void routing_table::split_bucket() // move any node whose (160 - distane_exp(m_id, id)) >= (i - m_buckets.begin()) // to the new bucket - int new_bucket_size = bucket_limit(bucket_index + 1); + int const new_bucket_size = bucket_limit(bucket_index + 1); for (bucket_t::iterator j = b.begin(); j != b.end();) { - if (distance_exp(m_id, j->id) >= 159 - bucket_index) + int const d = distance_exp(m_id, j->id); + if (d >= 159 - bucket_index) { ++j; continue; @@ -1048,7 +1065,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep) #ifndef TORRENT_DISABLE_LOGGING char hex_id[41]; - to_hex(reinterpret_cast(&nid[0]), 20, hex_id); + to_hex(nid.data(), 20, hex_id); m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d" , hex_id, print_endpoint(j->ep()).c_str() , int(j->fail_count()) @@ -1069,7 +1086,7 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep) #ifndef TORRENT_DISABLE_LOGGING char hex_id[41]; - to_hex(reinterpret_cast(&nid[0]), 20, hex_id); + to_hex(nid.data(), 20, hex_id); m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d" , hex_id, print_endpoint(j->ep()).c_str() , int(j->fail_count()) diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 613619d44..4f46cad71 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -475,7 +475,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr if (m_sock->send_packet(e, target_addr, 1)) { - m_transactions.insert(std::make_pair(tid,o)); + m_transactions.insert(std::make_pair(tid, o)); #if TORRENT_USE_ASSERTS o->m_was_sent = true; #endif diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 9d31d85d8..18656e1f3 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -5516,7 +5516,7 @@ retry: , boost::ref(m_udp_socket), boost::cref(m_dht_settings) , boost::ref(m_stats_counters) , 
m_dht_storage_constructor - , &startup_state); + , startup_state); for (std::vector::iterator i = m_dht_router_nodes.begin() , end(m_dht_router_nodes.end()); i != end; ++i) @@ -6731,12 +6731,17 @@ retry: // since we have a new external IP now, we need to // restart the DHT with a new node ID + #ifndef TORRENT_DISABLE_DHT // TODO: 1 we only need to do this if our global IPv4 address has changed // since the DHT (currently) only supports IPv4. Since restarting the DHT // is kind of expensive, it would be nice to not do it unnecessarily if (m_dht) { + // TODO: 3 instead of restarting the whole DHT, change the external IP, + // node ID and re-jiggle the routing table in-place. A complete restart + // throws away all outstanding requests, which may be significant + // during bootstrap entry s = m_dht->state(); int cur_state = 0; int prev_state = 0; diff --git a/test/test_dht.cpp b/test/test_dht.cpp index cb1b1cd5f..14a1d8f20 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -1194,32 +1194,57 @@ TORRENT_TEST(dht) // test kademlia functions - // this is a bit too expensive to do under valgrind -#ifndef TORRENT_USE_VALGRIND - for (int i = 0; i < 160; i += 8) - { - for (int j = 0; j < 160; j += 8) - { - node_id a(0); - a[(159-i) / 8] = 1 << (i & 7); - node_id b(0); - b[(159-j) / 8] = 1 << (j & 7); - int dist = distance_exp(a, b); + // distance_exp - TEST_CHECK(dist >= 0 && dist < 160); - TEST_CHECK(dist == ((i == j)?0:(std::max)(i, j))); + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("0000000000000000000000000000000000000000")) + , 159); - for (int k = 0; k < 160; k += 8) - { - node_id c(0); - c[(159-k) / 8] = 1 << (k & 7); + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("7fffffffffffffffffffffffffffffffffffffff")) + , 159); - bool cmp = compare_ref(a, b, c); - TEST_CHECK(cmp == (distance(a, c) < distance(b, c))); - } - } - } -#endif + TEST_EQUAL(distance_exp( + 
to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("ffffffffffffffffffffffffffffffffffffffff")) + , 0); + + TEST_EQUAL(distance_exp( + to_hash("ffffffffffffffffffffffffffffffffffffffff"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 0); + + TEST_EQUAL(distance_exp( + to_hash("8000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 158); + + TEST_EQUAL(distance_exp( + to_hash("c000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 157); + + TEST_EQUAL(distance_exp( + to_hash("e000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 156); + + TEST_EQUAL(distance_exp( + to_hash("f000000000000000000000000000000000000000"), + to_hash("fffffffffffffffffffffffffffffffffffffffe")) + , 155); + + TEST_EQUAL(distance_exp( + to_hash("f8f2340985723049587230495872304958703294"), + to_hash("f743589043r890f023980f90e203980d090c3840")) + , 155); + + TEST_EQUAL(distance_exp( + to_hash("ffff740985723049587230495872304958703294"), + to_hash("ffff889043r890f023980f90e203980d090c3840")) + , 159 - 16); { // test kademlia routing table @@ -2500,5 +2525,31 @@ TORRENT_TEST(rpc_invalid_error_msg) TEST_EQUAL(found, true); } +// test bucket distribution +TORRENT_TEST(node_id_bucket_distribution) +{ + int nodes_per_bucket[160] = {0}; + dht::node_id reference_id = generate_id(rand_v4()); + int const num_samples = 100000; + for (int i = 0; i < num_samples; ++i) + { + dht::node_id nid = generate_id(rand_v4()); + int const bucket = 159 - distance_exp(reference_id, nid); + ++nodes_per_bucket[bucket]; + } + + for (int i = 0; i < 25; ++i) + { + printf("%3d ", nodes_per_bucket[i]); + } + printf("\n"); + + int expected = num_samples / 2; + for (int i = 0; i < 25; ++i) + { + TEST_CHECK(std::abs(nodes_per_bucket[i] - expected) < num_samples / 20); + expected /= 2; + } +} #endif