From 11cd7af4e79711d4b040d9945b21987496febe36 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 3 Jan 2010 11:08:39 +0000 Subject: [PATCH] optimized DHT routing table memory usage --- ChangeLog | 1 + include/libtorrent/kademlia/node.hpp | 9 +- include/libtorrent/kademlia/node_entry.hpp | 1 + include/libtorrent/kademlia/routing_table.hpp | 160 ++--- include/libtorrent/kademlia/rpc_manager.hpp | 3 +- src/kademlia/dht_tracker.cpp | 22 +- src/kademlia/find_data.cpp | 12 +- src/kademlia/node.cpp | 81 +-- src/kademlia/routing_table.cpp | 585 ++++++++++-------- src/kademlia/rpc_manager.cpp | 8 +- test/test_primitives.cpp | 38 +- 11 files changed, 437 insertions(+), 483 deletions(-) diff --git a/ChangeLog b/ChangeLog index ccc41d593..4bab24e27 100644 --- a/ChangeLog +++ b/ChangeLog @@ -5,6 +5,7 @@ * cleaned up usage of MAX_PATH and related macros * made it possible to build libtorrent without RTTI support * added support to build with libgcrypt and a shipped version of libtommath + * optimized DHT routing table memory usage 0.15 release diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index a41a7d971..d3c7ee02d 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -184,6 +184,7 @@ public: virtual ~node_impl() {} + void tick(); void refresh(node_id const& id, find_data::nodes_callback const& f); void bootstrap(std::vector const& nodes , find_data::nodes_callback const& f); @@ -200,15 +201,8 @@ public: return ret; } - void refresh(); - void refresh_bucket(int bucket); int bucket_size(int bucket); - typedef routing_table::iterator iterator; - - iterator begin() const { return m_table.begin(); } - iterator end() const { return m_table.end(); } - node_id const& nid() const { return m_id; } boost::tuple size() const{ return m_table.size(); } @@ -233,7 +227,6 @@ public: // the returned time is the delay until connection_timeout() // should be called again the next time time_duration 
connection_timeout(); - time_duration refresh_timeout(); // generates a new secret number used to generate write tokens void new_write_key(); diff --git a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index def471d75..d636d9234 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -52,6 +52,7 @@ struct node_entry first_seen = time_now(); #endif } + node_entry(udp::endpoint ep) : addr(ep.address()) , port(ep.port()) diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 160a720c2..28184ca2d 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -36,8 +36,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#include -#include #include #include #include @@ -67,11 +65,16 @@ TORRENT_DECLARE_LOG(table); typedef std::vector bucket_t; +struct routing_table_node +{ + bucket_t replacements; + bucket_t live_nodes; + ptime last_active; +}; + // differences in the implementation from the description in // the paper: // -// * The routing table tree is not allocated dynamically, there -// are always 160 buckets. // * Nodes are not marked as being stale, they keep a counter // that tells how many times in a row they have failed. When // a new node is to be inserted, the node that has failed @@ -79,89 +82,9 @@ typedef std::vector bucket_t; // bucket has failed, then it is put in the replacement // cache (just like in the paper). -class routing_table; - -namespace aux -{ - - // Iterates over a flattened routing_table structure. 
- class routing_table_iterator - : public boost::iterator_facade< - routing_table_iterator - , node_entry const - , boost::forward_traversal_tag - > - { - public: - routing_table_iterator() - { - } - - private: - friend class libtorrent::dht::routing_table; - friend class boost::iterator_core_access; - - typedef boost::array, 160>::const_iterator - bucket_iterator_t; - - routing_table_iterator( - bucket_iterator_t begin - , bucket_iterator_t end) - : m_bucket_iterator(begin) - , m_bucket_end(end) - { - if (m_bucket_iterator == m_bucket_end) return; - m_iterator = begin->first.begin(); - while (m_iterator == m_bucket_iterator->first.end()) - { - if (++m_bucket_iterator == m_bucket_end) - break; - m_iterator = m_bucket_iterator->first.begin(); - } - } - - bool equal(routing_table_iterator const& other) const - { - return m_bucket_iterator == other.m_bucket_iterator - && (m_bucket_iterator == m_bucket_end - || *m_iterator == other.m_iterator); - } - - void increment() - { - TORRENT_ASSERT(m_bucket_iterator != m_bucket_end); - ++*m_iterator; - while (*m_iterator == m_bucket_iterator->first.end()) - { - if (++m_bucket_iterator == m_bucket_end) - break; - m_iterator = m_bucket_iterator->first.begin(); - } - } - - node_entry const& dereference() const - { - TORRENT_ASSERT(m_bucket_iterator != m_bucket_end); - return **m_iterator; - } - - bucket_iterator_t m_bucket_iterator; - bucket_iterator_t m_bucket_end; - // when debug iterators are enabled, default constructed - // iterators are not allowed to be copied. In the case - // where the routing table is empty, m_iterator would be - // default constructed and not copyable. 
- boost::optional m_iterator; - }; - -} // namespace aux - class TORRENT_EXPORT routing_table { public: - typedef aux::routing_table_iterator iterator; - typedef iterator const_iterator; - routing_table(node_id const& id, int bucket_size , dht_settings const& settings); @@ -178,46 +101,46 @@ public: router_iterator router_begin() const { return m_router_nodes.begin(); } router_iterator router_end() const { return m_router_nodes.end(); } + bool add_node(node_entry const& e); + // this function is called every time the node sees // a sign of a node being alive. This node will either // be inserted in the k-buckets or be moved to the top // of its bucket. - bool node_seen(node_id const& id, udp::endpoint addr); + bool node_seen(node_id const& id, udp::endpoint ep); + + // this may add a node to the routing table and mark it as + // not pinged. If the bucket the node falls into is full, + // the node will be ignored. + void heard_about(node_id const& id, udp::endpoint const& ep); - // returns time when the given bucket needs another refresh. - // if the given bucket is empty but there are nodes - // in a bucket closer to us, or if the bucket is non-empty and - // the time from the last activity is more than 15 minutes - ptime next_refresh(int bucket); + // if any bucket in the routing table needs to be refreshed + // this function will return true and set the target to an + // appropriate target inside that bucket + bool need_refresh(node_id& target) const; enum { - include_self = 1, - include_failed = 2 + include_failed = 1 }; // fills the vector with the count nodes from our buckets that // are nearest to the given id. void find_node(node_id const& id, std::vector& l , int options, int count = 0); - // this may add a node to the routing table and mark it as - // not pinged. If the bucket the node falls into is full, - // the node will be ignored. 
- void heard_about(node_id const& id, udp::endpoint const& ep); - - // this will set the given bucket's latest activity - // to the current time - void touch_bucket(int bucket); - int bucket_size(int bucket) { - TORRENT_ASSERT(bucket >= 0 && bucket < 160); - return (int)m_buckets[bucket].first.size(); + int num_buckets = m_buckets.size(); + if (bucket >= num_buckets) bucket = num_buckets - 1; + table_t::iterator i = m_buckets.begin(); + std::advance(i, bucket); + return (int)i->live_nodes.size(); } - int bucket_size() const { return m_bucket_size; } - iterator begin() const; - iterator end() const; + void for_each_node(void (*)(void*, node_entry const&) + , void (*)(void*, node_entry const&), void* userdata) const; + + int bucket_size() const { return m_bucket_size; } boost::tuple size() const; size_type num_global_nodes() const; @@ -225,11 +148,11 @@ public: // returns true if there are no working nodes // in the routing table bool need_bootstrap() const; - int num_active_buckets() const - { return 160 - m_lowest_active_bucket + 1; } + int num_active_buckets() const { return m_buckets.size(); } void replacement_cache(bucket_t& nodes) const; -#ifdef TORRENT_DHT_VERBOSE_LOGGING + +#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG // used for debug and monitoring purposes. This will print out // the state of the routing table to the given stream void print_state(std::ostream& os) const; @@ -237,17 +160,23 @@ private: + typedef std::list table_t; + + table_t::iterator find_bucket(node_id const& id); + // constant called k in paper int m_bucket_size; dht_settings const& m_settings; - // 160 (k-bucket, replacement cache) pairs - typedef boost::array, 160> table_t; + // (k-bucket, replacement cache) pairs + // the first entry is the bucket the furthest + // away from our own ID. 
Each time the bucket + // closest to us (m_buckets.back()) has more than + // bucket size nodes in it, another bucket is + // added to the end and it's split up between them table_t m_buckets; - // timestamps of the last activity in each bucket - typedef boost::array table_activity_t; - table_activity_t m_bucket_activity; + node_id m_id; // our own node id // this is a set of all the endpoints that have @@ -255,9 +184,6 @@ private: // be used in searches, but they will never // be added to the routing table. std::set m_router_nodes; - - // this is the lowest bucket index with nodes in it - int m_lowest_active_bucket; }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index ba59a2a96..2089d4634 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -74,7 +74,8 @@ public: void unreachable(udp::endpoint const& ep); // returns true if the node needs a refresh - bool incoming(msg const&); + // if so, id is assigned the node id to refresh + bool incoming(msg const&, node_id* id); time_duration tick(); bool invoke(entry& e, udp::endpoint target diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 0bd7530db..34e1064eb 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -322,9 +322,9 @@ namespace libtorrent { namespace dht mutex_t::scoped_lock l(m_mutex); if (e || m_abort) return; - time_duration d = m_dht.refresh_timeout(); + m_dht.tick(); error_code ec; - m_refresh_timer.expires_from_now(d, ec); + m_refresh_timer.expires_from_now(seconds(5), ec); m_refresh_timer.async_wait( bind(&dht_tracker::refresh_timeout, self(), _1)); } @@ -542,20 +542,22 @@ namespace libtorrent { namespace dht m_dht.incoming(m); } + void add_node_fun(void* userdata, node_entry const& e) + { + entry* n = (entry*)userdata; + std::string node; + std::back_insert_iterator out(node); + write_endpoint(e.ep(), 
out); + n->list().push_back(entry(node)); + } + entry dht_tracker::state() const { mutex_t::scoped_lock l(m_mutex); entry ret(entry::dictionary_t); { entry nodes(entry::list_t); - for (node_impl::iterator i(m_dht.begin()) - , end(m_dht.end()); i != end; ++i) - { - std::string node; - std::back_insert_iterator out(node); - write_endpoint(udp::endpoint(i->addr, i->port), out); - nodes.list().push_back(entry(node)); - } + m_dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes); bucket_t cache; m_dht.replacement_cache(cache); for (bucket_t::iterator i(cache.begin()) diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index b30c704d3..0e93ebb9a 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -169,6 +169,12 @@ void find_data_observer::reply(msg const& m) done(); } +void add_entry_fun(void* userdata, node_entry const& e) +{ + traversal_algorithm* f = (traversal_algorithm*)userdata; + f->add_entry(e.id, e.ep(), traversal_algorithm::result::initial); +} + find_data::find_data( node_impl& node , node_id target @@ -181,11 +187,7 @@ find_data::find_data( , m_done(false) , m_got_peers(false) { - for (routing_table::const_iterator i = node.m_table.begin() - , end(node.m_table.end()); i != end; ++i) - { - add_entry(i->id, i->ep(), result::initial); - } + node.m_table.for_each_node(&add_entry_fun, 0, (traversal_algorithm*)this); } bool find_data::invoke(udp::endpoint addr) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 24e96338e..6cdc12e3d 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -267,13 +267,13 @@ void node_impl::bootstrap(std::vector const& nodes r->start(); } - +/* void node_impl::refresh() { boost::intrusive_ptr r(new dht::refresh(*this, m_id, boost::bind(&nop))); r->start(); } - +*/ int node_impl::bucket_size(int bucket) { return m_table.bucket_size(bucket); @@ -285,39 +285,6 @@ void node_impl::new_write_key() m_secret[0] = std::rand(); } -void node_impl::refresh_bucket(int 
bucket) -{ - TORRENT_ASSERT(bucket >= 0 && bucket < 160); - - // generate a random node_id within the given bucket - node_id target = generate_id(); - int num_bits = 160 - bucket; - node_id mask(0); - for (int i = 0; i < num_bits; ++i) - { - int byte = i / 8; - mask[byte] |= 0x80 >> (i % 8); - } - - node_id root = m_id; - root &= mask; - target &= ~mask; - target |= root; - - // make sure this is in another subtree than m_id - // clear the (num_bits - 1) bit and then set it to the - // inverse of m_id's corresponding bit. - target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8)); - target[(num_bits - 1) / 8] |= - (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8)); - - TORRENT_ASSERT(distance_exp(m_id, target) == bucket); - - boost::intrusive_ptr ta(new dht::refresh(*this, target, bind(&nop))); - ta->start(); - m_table.touch_bucket(bucket); -} - void node_impl::unreachable(udp::endpoint const& ep) { m_rpc.unreachable(ep); @@ -341,7 +308,9 @@ void node_impl::incoming(msg const& m) { case 'r': { - if (m_rpc.incoming(m)) refresh(); + node_id id; + if (m_rpc.incoming(m, &id)) + refresh(id, boost::bind(&nop)); break; } case 'q': @@ -451,43 +420,11 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port ta->start(); } -time_duration node_impl::refresh_timeout() +void node_impl::tick() { - int refresh = -1; - ptime now = time_now(); - ptime next = now + minutes(15); - for (int i = 0; i < 160; ++i) - { - ptime r = m_table.next_refresh(i); - if (r <= next) - { - refresh = i; - next = r; - } - } - if (next < now) - { - TORRENT_ASSERT(refresh > -1); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << "refreshing bucket: " << refresh; -#endif - refresh_bucket(refresh); - } - - time_duration next_refresh = next - now; - time_duration min_next_refresh - = minutes(15) / m_table.num_active_buckets(); - if (min_next_refresh > seconds(40)) - min_next_refresh = seconds(40); - - if (next_refresh < min_next_refresh) - next_refresh = 
min_next_refresh; - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds"; -#endif - - return next_refresh; + node_id target; + if (m_table.need_refresh(target)) + refresh(target, boost::bind(&nop)); } time_duration node_impl::connection_timeout() diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index f0e91a695..bcc963530 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -60,13 +60,7 @@ routing_table::routing_table(node_id const& id, int bucket_size : m_bucket_size(bucket_size) , m_settings(settings) , m_id(id) - , m_lowest_active_bucket(160) { - // distribute the refresh times for the buckets in an - // attempt to even out the network load - for (int i = 0; i < 160; ++i) - m_bucket_activity[i] = time_now() - milliseconds(i*5625); - m_bucket_activity[0] = time_now() - minutes(15); } void routing_table::status(session_status& s) const @@ -82,27 +76,25 @@ boost::tuple routing_table::size() const for (table_t::const_iterator i = m_buckets.begin() , end(m_buckets.end()); i != end; ++i) { - nodes += i->first.size(); - replacements += i->second.size(); + nodes += i->live_nodes.size(); + replacements += i->replacements.size(); } return boost::make_tuple(nodes, replacements); } size_type routing_table::num_global_nodes() const { - int first_full = m_lowest_active_bucket; int num_nodes = 1; // we are one of the nodes - for (; first_full < 160 - && int(m_buckets[first_full].first.size()) < m_bucket_size; - ++first_full) + for (table_t::const_iterator i = m_buckets.begin() + , end(m_buckets.end()); i != end; ++i) { - num_nodes += m_buckets[first_full].first.size(); + num_nodes += i->live_nodes.size(); } - return (2 << (160 - first_full)) * num_nodes; + return (2 << m_buckets.size()) * num_nodes; } -#ifdef TORRENT_DHT_VERBOSE_LOGGING +#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG void routing_table::print_state(std::ostream& os) 
const { @@ -116,27 +108,24 @@ void routing_table::print_state(std::ostream& os) const os << "-"; os << "\n"; - for (int k = 0; k < 8; ++k) + for (int k = 0; k < m_bucket_size; ++k) { for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i) { - os << (int(i->first.size()) > (7 - k) ? "|" : " "); + os << (int(i->live_nodes.size()) > (m_bucket_size - 1 - k) ? "|" : " "); } os << "\n"; } - for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); - i != end; ++i) - { - os << "+"; - } + for (int i = 0; i < 160; ++i) os << "+"; os << "\n"; - for (int k = 0; k < 8; ++k) + + for (int k = 0; k < m_bucket_size; ++k) { for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i) { - os << (int(i->second.size()) > k ? "|" : " "); + os << (int(i->replacements.size()) > k ? "|" : " "); } os << "\n"; } @@ -146,34 +135,68 @@ void routing_table::print_state(std::ostream& os) const os << "\n\n"; os << "nodes:\n"; + int bucket_index = 0; for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); - i != end; ++i) + i != end; ++i, ++bucket_index) { - if (i->first.empty()) continue; - int bucket_index = int(i - m_buckets.begin()); +// if (i->live_nodes.empty()) continue; os << "=== BUCKET = " << bucket_index - << " = " << (bucket_index >= m_lowest_active_bucket?"active":"inactive") - << " = " << total_seconds(time_now() - m_bucket_activity[bucket_index]) + << " = " << total_seconds(time_now() - i->last_active) << " seconds ago ===== \n"; - for (bucket_t::const_iterator j = i->first.begin() - , end(i->first.end()); j != end; ++j) + for (bucket_t::const_iterator j = i->live_nodes.begin() + , end(i->live_nodes.end()); j != end; ++j) { os << " id: " << j->id << " ip: " << j->ep() << " fails: " << j->fail_count() << " pinged: " << j->pinged() + << " dist: " << distance_exp(m_id, j->id) << "\n"; } } } #endif - +/* void routing_table::touch_bucket(int bucket) { m_bucket_activity[bucket] = time_now(); } 
+*/ +bool routing_table::need_refresh(node_id& target) const +{ + if (m_buckets.empty()) return false; + + table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end() + , boost::bind(&routing_table_node::last_active, _1) + < boost::bind(&routing_table_node::last_active, _2)); + + if (i->last_active > time_now() - minutes(15)) return false; + + // generate a random node_id within the given bucket + target = generate_id(); + int num_bits = std::distance(m_buckets.begin(), i) + 1; + node_id mask(0); + for (int i = 0; i < num_bits; ++i) mask[i/8] |= 0x80 >> (i&7); + + // target = (target & ~mask) | (root & mask) + node_id root = m_id; + root &= mask; + target &= ~mask; + target |= root; + + // make sure this is in another subtree than m_id + // clear the (num_bits - 1) bit and then set it to the + // inverse of m_id's corresponding bit. + target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8)); + target[(num_bits - 1) / 8] |= + (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8)); + + TORRENT_ASSERT(distance_exp(m_id, target) == 159 - std::distance(m_buckets.begin(), i)); + return true; +} +/* ptime routing_table::next_refresh(int bucket) { TORRENT_ASSERT(bucket < 160); @@ -184,69 +207,265 @@ ptime routing_table::next_refresh(int bucket) return time_now() + minutes(15); return m_bucket_activity[bucket] + minutes(15); } +*/ void routing_table::replacement_cache(bucket_t& nodes) const { for (table_t::const_iterator i = m_buckets.begin() , end(m_buckets.end()); i != end; ++i) { - std::copy(i->second.begin(), i->second.end() + std::copy(i->replacements.begin(), i->replacements.end() , std::back_inserter(nodes)); } } -void routing_table::heard_about(node_id const& id, udp::endpoint const& ep) +routing_table::table_t::iterator routing_table::find_bucket(node_id const& id) { - int bucket_index = distance_exp(m_id, id); - TORRENT_ASSERT(bucket_index < (int)m_buckets.size()); +// TORRENT_ASSERT(id != m_id); + + int num_buckets = m_buckets.size(); 
+ if (num_buckets == 0) + { + m_buckets.push_back(routing_table_node()); + ++num_buckets; + } + + int bucket_index = (std::min)(159 - distance_exp(m_id, id), num_buckets - 1); + TORRENT_ASSERT(bucket_index < m_buckets.size()); TORRENT_ASSERT(bucket_index >= 0); - bucket_t& b = m_buckets[bucket_index].first; - bucket_t& rb = m_buckets[bucket_index].second; + + table_t::iterator i = m_buckets.begin(); + std::advance(i, bucket_index); + return i; +} + +bool routing_table::add_node(node_entry const& e) +{ + if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false; + + bool ret = need_bootstrap(); + + // don't add ourself + if (e.id == m_id) return ret; + + table_t::iterator i = find_bucket(e.id); + bucket_t& b = i->live_nodes; + bucket_t& rb = i->replacements; // if the replacement cache is full, we don't // need another node. The table is fine the // way it is. - if ((int)rb.size() >= m_bucket_size) return; + if ((int)rb.size() >= m_bucket_size) return ret; // if the node already exists, we don't need it - if (std::find_if(b.begin(), b.end(), bind(&node_entry::id, _1) == id) - != b.end()) return; + bucket_t::iterator j = std::find_if(b.begin(), b.end() + , bind(&node_entry::id, _1) == e.id); - if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == id) - != rb.end()) return; - - if (b.size() < m_bucket_size) + if (j != b.end()) { - if (bucket_index < m_lowest_active_bucket - && bucket_index > 0) - m_lowest_active_bucket = bucket_index; - b.push_back(node_entry(id, ep, false)); - return; + // we already have the node in our bucket + // just move it to the back since it was + // the last node we had any contact with + // in this bucket + *j = e; +// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr; + return ret; } - if (rb.size() < m_bucket_size) - rb.push_back(node_entry(id, ep, false)); + if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == e.id) + != rb.end()) return ret; + + // if the node was not present in 
our list + // we will only insert it if there is room + // for it, or if some of our nodes have gone + // offline + if (b.size() < m_bucket_size) + { + if (b.empty()) b.reserve(m_bucket_size); + b.push_back(e); +// TORRENT_LOG(table) << "inserting node: " << e.id << " " << e.addr; + return ret; + } + + // if there is no room, we look for nodes that are not 'pinged', + // i.e. we haven't confirmed that they respond to messages. + // Then we look for nodes marked as stale + // in the k-bucket. If we find one, we can replace it. + + // can we split the bucket? + bool can_split = false; + + if (e.pinged() && e.fail_count() == 0) + { + // only nodes that are pinged and haven't failed + // can split the bucket, and we can only split + // the last bucket + can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 160); + + // if the node we're trying to insert is considered pinged, + // we may replace other nodes that aren't pinged + + j = std::find_if(b.begin(), b.end(), bind(&node_entry::pinged, _1) == false); + + if (j != b.end() && !j->pinged()) + { + // j points to a node that has not been pinged. + // Replace it with this new one + b.erase(j); + b.push_back(e); +// TORRENT_LOG(table) << "replacing unpinged node: " << e.id << " " << e.addr; + return ret; + } + + // A node is considered stale if it has failed at least one + // time. Here we choose the node that has failed most times. + // If we don't find one, place this node in the replacement- + // cache and replace any nodes that will fail in the future + // with nodes from that cache. + + j = std::max_element(b.begin(), b.end() + , bind(&node_entry::fail_count, _1) + < bind(&node_entry::fail_count, _2)); + + if (j != b.end() && j->fail_count() > 0) + { + // i points to a node that has been marked + // as stale. 
Replace it with this new one + b.erase(j); + b.push_back(e); +// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr; + return ret; + } + } + + // if we can't split, try to insert into the replacement bucket + + if (!can_split) + { + // if we don't have any identified stale nodes in + // the bucket, and the bucket is full, we have to + // cache this node and wait until some node fails + // and then replace it. + + j = std::find_if(rb.begin(), rb.end() + , bind(&node_entry::id, _1) == e.id); + + // if the node is already in the replacement bucket + // just return. + if (j != rb.end()) + { + // make sure we mark this node as pinged + // and if its address has changed, update + // that as well + *j = e; + return ret; + } + + if ((int)rb.size() >= m_bucket_size) + { + // if the replacement bucket is full, remove the oldest entry + // but prefer nodes that haven't been pinged, since they are + // less reliable than this one, that has been pinged + j = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == false); + rb.erase(j != rb.end() ? j : rb.begin()); + } + + if (rb.empty()) rb.reserve(m_bucket_size); + rb.push_back(e); +// TORRENT_LOG(table) << "inserting node in replacement cache: " << e.id << " " << e.addr; + return ret; + } + + // this is the last bucket, and it's full already. 
Split + // it by adding another bucket + m_buckets.push_back(routing_table_node()); + bucket_t& new_bucket = m_buckets.back().live_nodes; + bucket_t& new_replacement_bucket = m_buckets.back().replacements; + + // move any node whose (160 - distane_exp(m_id, id)) >= (i - m_buckets.begin()) + // to the new bucket + int bucket_index = std::distance(m_buckets.begin(), i); + for (bucket_t::iterator j = b.begin(); j != b.end();) + { + if (distance_exp(m_id, j->id) >= 159 - bucket_index) + { + ++j; + continue; + } + // this entry belongs in the new bucket + new_bucket.push_back(*j); + j = b.erase(j); + } + for (bucket_t::iterator j = rb.begin(); j != rb.end();) + { + if (distance_exp(m_id, j->id) >= 159 - bucket_index) + { + ++j; + continue; + } + // this entry belongs in the new bucket + new_replacement_bucket.push_back(*j); + j = rb.erase(j); + } + + // now insert the new node in the appropriate bucket + if (distance_exp(m_id, e.id) >= 159 - bucket_index) + { + if (b.size() < m_bucket_size) + b.push_back(e); + else if (rb.size() < m_bucket_size) + rb.push_back(e); + } + else + { + if (new_bucket.size() < m_bucket_size) + new_bucket.push_back(e); + else if (new_replacement_bucket.size() < m_bucket_size) + new_replacement_bucket.push_back(e); + } + return ret; +} + +void routing_table::for_each_node( + void (*fun1)(void*, node_entry const&) + , void (*fun2)(void*, node_entry const&) + , void* userdata) const +{ + for (table_t::const_iterator i = m_buckets.begin() + , end(m_buckets.end()); i != end; ++i) + { + if (fun1) + { + for (bucket_t::const_iterator j = i->live_nodes.begin() + , end(i->live_nodes.end()); j != end; ++j) + fun1(userdata, *j); + } + if (fun2) + { + for (bucket_t::const_iterator j = i->replacements.begin() + , end(i->replacements.end()); j != end; ++j) + fun2(userdata, *j); + } + } } void routing_table::node_failed(node_id const& id) { - int bucket_index = distance_exp(m_id, id); - TORRENT_ASSERT(bucket_index < (int)m_buckets.size()); - 
TORRENT_ASSERT(bucket_index >= 0); - bucket_t& b = m_buckets[bucket_index].first; - bucket_t& rb = m_buckets[bucket_index].second; + // if messages to ourself fails, ignore it + if (id == m_id) return; - bucket_t::iterator i = std::find_if(b.begin(), b.end() + table_t::iterator i = find_bucket(id); + bucket_t& b = i->live_nodes; + bucket_t& rb = i->replacements; + + bucket_t::iterator j = std::find_if(b.begin(), b.end() , bind(&node_entry::id, _1) == id); - if (i == b.end()) return; + if (j == b.end()) return; - // if messages to ourself fails, ignore it - if (bucket_index == 0) return; - if (rb.empty()) { - i->timed_out(); + j->timed_out(); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(table) << " NODE FAILED" @@ -259,25 +478,17 @@ void routing_table::node_failed(node_id const& id) // if this node has failed too many times, or if this node // has never responded at all, remove it - if (i->fail_count() >= m_settings.max_fail_count || !i->pinged()) - { - b.erase(i); - TORRENT_ASSERT(m_lowest_active_bucket <= bucket_index); - while (m_lowest_active_bucket < 160 - && m_buckets[m_lowest_active_bucket].first.empty()) - { - ++m_lowest_active_bucket; - } - } + if (j->fail_count() >= m_settings.max_fail_count || !j->pinged()) + b.erase(j); return; } - b.erase(i); + b.erase(j); - i = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == true); - if (i == rb.end()) i = rb.begin(); - b.push_back(*i); - rb.erase(i); + j = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == true); + if (j == rb.end()) j = rb.begin(); + b.push_back(*j); + rb.erase(j); } void routing_table::add_router_node(udp::endpoint router) @@ -285,6 +496,13 @@ void routing_table::add_router_node(udp::endpoint router) m_router_nodes.insert(router); } +// we heard from this node, but we don't know if it +// was spoofed or not (i.e. 
pinged == false) +void routing_table::heard_about(node_id const& id, udp::endpoint const& ep) +{ + add_node(node_entry(id, ep, false)); +} + // this function is called every time the node sees // a sign of a node being alive. This node will either // be inserted in the k-buckets or be moved to the top @@ -292,132 +510,21 @@ void routing_table::add_router_node(udp::endpoint router) // the return value indicates if the table needs a refresh. // if true, the node should refresh the table (i.e. do a find_node // on its own id) -bool routing_table::node_seen(node_id const& id, udp::endpoint addr) +bool routing_table::node_seen(node_id const& id, udp::endpoint ep) { - if (m_router_nodes.find(addr) != m_router_nodes.end()) return false; - int bucket_index = distance_exp(m_id, id); - TORRENT_ASSERT(bucket_index < (int)m_buckets.size()); - TORRENT_ASSERT(bucket_index >= 0); - bucket_t& b = m_buckets[bucket_index].first; - - bucket_t::iterator i = std::find_if(b.begin(), b.end() - , bind(&node_entry::id, _1) == id); - - bool ret = need_bootstrap(); - - //m_bucket_activity[bucket_index] = time_now(); - - if (i != b.end()) - { - // we already have the node in our bucket - // just move it to the back since it was - // the last node we had any contact with - // in this bucket - i->set_pinged(); - i->reset_fail_count(); - i->addr = addr.address(); - i->port = addr.port(); -// TORRENT_LOG(table) << "updating node: " << id << " " << addr; - return ret; - } - - // if the node was not present in our list - // we will only insert it if there is room - // for it, or if some of our nodes have gone - // offline - if ((int)b.size() < m_bucket_size) - { - if (b.empty()) b.reserve(m_bucket_size); - b.push_back(node_entry(id, addr, true)); - // if bucket index is 0, the node is ourselves - // don't updated m_lowest_active_bucket - if (bucket_index < m_lowest_active_bucket - && bucket_index > 0) - m_lowest_active_bucket = bucket_index; -// TORRENT_LOG(table) << "inserting node: " << id << " " 
<< addr; - return ret; - } - - // if there is no room, we look for nodes that are not 'pinged', - // i.e. we haven't confirmed that they respond to messages. - // Then we look for nodes marked as stale - // in the k-bucket. If we find one, we can replace it. - - i = std::find_if(b.begin(), b.end(), bind(&node_entry::pinged, _1) == false); - - if (i != b.end() && !i->pinged()) - { - // i points to a node that has not been pinged. - // Replace it with this new one - b.erase(i); - b.push_back(node_entry(id, addr, true)); -// TORRENT_LOG(table) << "replacing unpinged node: " << id << " " << addr; - return ret; - } - - // A node is considered stale if it has failed at least one - // time. Here we choose the node that has failed most times. - // If we don't find one, place this node in the replacement- - // cache and replace any nodes that will fail in the future - // with nodes from that cache. - - i = std::max_element(b.begin(), b.end() - , bind(&node_entry::fail_count, _1) - < bind(&node_entry::fail_count, _2)); - - if (i != b.end() && i->fail_count() > 0) - { - // i points to a node that has been marked - // as stale. Replace it with this new one - b.erase(i); - b.push_back(node_entry(id, addr, true)); -// TORRENT_LOG(table) << "replacing stale node: " << id << " " << addr; - return ret; - } - - // if we don't have any identified stale nodes in - // the bucket, and the bucket is full, we have to - // cache this node and wait until some node fails - // and then replace it. - - bucket_t& rb = m_buckets[bucket_index].second; - - i = std::find_if(rb.begin(), rb.end() - , bind(&node_entry::id, _1) == id); - - // if the node is already in the replacement bucket - // just return. 
- if (i != rb.end()) - { - // make sure we mark this node as pinged - // and if its address has changed, update - // that as well - i->set_pinged(); - i->reset_fail_count(); - i->addr = addr.address(); - i->port = addr.port(); - return ret; - } - - if ((int)rb.size() >= m_bucket_size) - { - // if the replacement bucket is full, remove the oldest entry - // but prefer nodes that haven't been pinged, since they are - // less reliable than this one, that has been pinged - i = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == false); - rb.erase(i != rb.end() ? i : rb.begin()); - } - if (rb.empty()) rb.reserve(m_bucket_size); - rb.push_back(node_entry(id, addr, true)); -// TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr; - return ret; + return add_node(node_entry(id, ep, true)); } bool routing_table::need_bootstrap() const { - for (const_iterator i = begin(); i != end(); ++i) + for (table_t::const_iterator i = m_buckets.begin() + , end(m_buckets.end()); i != end; ++i) { - if (i->confirmed()) return false; + for (bucket_t::const_iterator j = i->live_nodes.begin() + , end(i->live_nodes.end()); j != end; ++j) + { + if (j->confirmed()) return false; + } } return true; } @@ -456,8 +563,8 @@ void routing_table::find_node(node_id const& target if (count == 0) count = m_bucket_size; l.reserve(count); - int bucket_index = distance_exp(m_id, target); - bucket_t& b = m_buckets[bucket_index].first; + table_t::iterator i = find_bucket(target); + bucket_t& b = i->live_nodes; // copy all nodes that hasn't failed into the target // vector. 
@@ -474,61 +581,43 @@ void routing_table::find_node(node_id const& target } TORRENT_ASSERT((int)l.size() <= count); - if (int(l.size()) == count) - { - TORRENT_ASSERT((options & include_failed) - || std::count_if(l.begin(), l.end() - , !boost::bind(&node_entry::confirmed, _1)) == 0); - return; - } + if (int(l.size()) >= count) return; // if we didn't have enough nodes in that bucket // we have to reply with nodes from buckets closer - // to us. i.e. all the buckets in the range - // [0, bucket_index) if we are to include ourself - // or [1, bucket_index) if not. - bucket_t tmpb; - for (int i = (options & include_self)?0:1; i < bucket_index; ++i) + // to us. + table_t::iterator j = i; + ++j; + + for (; j != m_buckets.end() && l.size() < count; ++j) { - bucket_t& b = m_buckets[i].first; + bucket_t& b = j->live_nodes; + size_t to_copy = (std::min)(count - l.size(), b.size()); if (options & include_failed) { - copy(b.begin(), b.end(), std::back_inserter(tmpb)); + copy(b.begin(), b.begin() + to_copy + , std::back_inserter(l)); } else { - std::remove_copy_if(b.begin(), b.end(), std::back_inserter(tmpb) + std::remove_copy_if(b.begin(), b.begin() + to_copy + , std::back_inserter(l) , !bind(&node_entry::confirmed, _1)); } } - if (count - l.size() < tmpb.size()) - { - std::random_shuffle(tmpb.begin(), tmpb.end()); - size_t to_copy = count - l.size(); - std::copy(tmpb.begin(), tmpb.begin() + to_copy, std::back_inserter(l)); - } - else - { - std::copy(tmpb.begin(), tmpb.end(), std::back_inserter(l)); - } - - TORRENT_ASSERT((int)l.size() <= count); + if (int(l.size()) >= count) return; - // return if we have enough nodes or if the bucket index - // is the biggest index available (there are no more buckets) - // to look in. 
- if (int(l.size()) == count) - { - TORRENT_ASSERT((options & include_failed) - || std::count_if(l.begin(), l.end() - , !boost::bind(&node_entry::confirmed, _1)) == 0); - return; - } + // if we still don't have enough nodes, copy nodes + // further away from us - for (size_t i = bucket_index + 1; i < m_buckets.size(); ++i) + if (i == m_buckets.begin()) return; + j = i; + + do { - bucket_t& b = m_buckets[i].first; + --j; + bucket_t& b = j->live_nodes; size_t to_copy = (std::min)(count - l.size(), b.size()); if (options & include_failed) @@ -540,22 +629,10 @@ void routing_table::find_node(node_id const& target copy_if_n(b.begin(), b.end(), std::back_inserter(l) , to_copy, bind(&node_entry::confirmed, _1)); } - TORRENT_ASSERT((int)l.size() <= count); - if (int(l.size()) == count) - { - TORRENT_ASSERT((options & include_failed) - || std::count_if(l.begin(), l.end() - , !boost::bind(&node_entry::confirmed, _1)) == 0); - return; - } } - TORRENT_ASSERT((int)l.size() <= count); - - TORRENT_ASSERT((options & include_failed) - || std::count_if(l.begin(), l.end() - , !boost::bind(&node_entry::confirmed, _1)) == 0); + while (j != m_buckets.begin() && l.size() < count); } - +/* routing_table::iterator routing_table::begin() const { // +1 to avoid ourself @@ -566,6 +643,6 @@ routing_table::iterator routing_table::end() const { return iterator(m_buckets.end(), m_buckets.end()); } - +*/ } } // namespace libtorrent::dht diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index a5c1cfed9..5fd80a2de 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -271,7 +271,7 @@ void rpc_manager::unreachable(udp::endpoint const& ep) // defined in node.cpp void incoming_error(entry& e, char const* msg); -bool rpc_manager::incoming(msg const& m) +bool rpc_manager::incoming(msg const& m, node_id* id) { INVARIANT_CHECK; @@ -342,7 +342,11 @@ bool rpc_manager::incoming(msg const& m) << tid << " from " << m.addr; #endif o->reply(m); - return
m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr); + *id = node_id(node_id_ent->string_ptr()); + + // we found an observer for this reply, hence the node is not spoofing + // add it to the routing table + return m_table.node_seen(*id, m.addr); } time_duration rpc_manager::tick() diff --git a/test/test_primitives.cpp b/test/test_primitives.cpp index 50e937012..65b1d276e 100644 --- a/test/test_primitives.cpp +++ b/test/test_primitives.cpp @@ -138,6 +138,16 @@ void add_and_replace(libtorrent::dht::node_id& dst, libtorrent::dht::node_id con carry = sum > 255; } } + +void node_push_back(void* userdata, libtorrent::dht::node_entry const& n) +{ + using namespace libtorrent::dht; + std::vector* nv = (std::vector*)userdata; + nv->push_back(n); +} + +void nop(void* userdata, libtorrent::dht::node_entry const& n) {} + #endif char upnp_xml[] = @@ -1021,39 +1031,39 @@ int test_main() // test kademlia routing table dht_settings s; - node_id id = to_hash("6123456789abcdef01232456789abcdef0123456"); + node_id id = to_hash("3123456789abcdef01232456789abcdef0123456"); dht::routing_table table(id, 10, s); table.node_seen(id, udp::endpoint(address_v4::any(), rand())); - node_id tmp; - node_id diff = to_hash("00001f7459456a9453f8719b09547c11d5f34064"); + node_id tmp = id; + node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061"); std::vector nodes; - for (int i = 0; i < 1000; ++i) + for (int i = 0; i < 7000; ++i) { table.node_seen(tmp, udp::endpoint(address_v4::any(), rand())); add_and_replace(tmp, diff); } + TEST_EQUAL(table.num_active_buckets(), 11); - std::copy(table.begin(), table.end(), std::back_inserter(nodes)); +#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG + table.print_state(std::cerr); +#endif + + table.for_each_node(node_push_back, nop, &nodes); std::cout << "nodes: " << nodes.size() << std::endl; std::vector temp; std::generate(tmp.begin(), tmp.end(), &std::rand); - table.find_node(tmp, temp, 0, nodes.size() + 1); + 
table.find_node(tmp, temp, 0, nodes.size() * 2); std::cout << "returned: " << temp.size() << std::endl; - TEST_CHECK(temp.size() == nodes.size()); - - std::generate(tmp.begin(), tmp.end(), &std::rand); - table.find_node(tmp, temp, routing_table::include_self, nodes.size() + 1); - std::cout << "returned: " << temp.size() << std::endl; - TEST_CHECK(temp.size() == nodes.size() + 1); + TEST_EQUAL(temp.size(), nodes.size()); std::generate(tmp.begin(), tmp.end(), &std::rand); table.find_node(tmp, temp, 0, 7); std::cout << "returned: " << temp.size() << std::endl; - TEST_CHECK(temp.size() == 7); + TEST_EQUAL(temp.size(), 7); std::sort(nodes.begin(), nodes.end(), bind(&compare_ref , bind(&node_entry::id, _1) @@ -1073,7 +1083,7 @@ int test_main() std::generate(tmp.begin(), tmp.end(), &std::rand); table.find_node(tmp, temp, 0, 15); std::cout << "returned: " << temp.size() << std::endl; - TEST_CHECK(temp.size() == (std::min)(15, int(nodes.size()))); + TEST_EQUAL(temp.size(), (std::min)(15, int(nodes.size()))); std::sort(nodes.begin(), nodes.end(), bind(&compare_ref , bind(&node_entry::id, _1)