From 34af25beaaf0fdeb7e4448484bf472863f4ea9fb Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sat, 1 Nov 2014 22:47:56 +0000 Subject: [PATCH] merged changes from RC_1_0 --- ChangeLog | 2 + examples/client_test.cpp | 4 +- include/libtorrent/kademlia/node.hpp | 5 +- include/libtorrent/kademlia/node_entry.hpp | 21 +- include/libtorrent/kademlia/routing_table.hpp | 10 +- .../kademlia/traversal_algorithm.hpp | 2 + include/libtorrent/session_status.hpp | 2 + src/kademlia/node.cpp | 91 +++++++- src/kademlia/routing_table.cpp | 206 +++++++++--------- src/kademlia/traversal_algorithm.cpp | 2 - src/session_impl.cpp | 6 +- 11 files changed, 222 insertions(+), 129 deletions(-) diff --git a/ChangeLog b/ChangeLog index 65692cfed..374fb8b7c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -35,6 +35,8 @@ 1.0.3 release + * improve DHT maintenance performance (by pinging instead of full lookups) + * fix bug in DHT routing table node-id prefix optimization * fix incorrect behavior of flag_use_resume_save_path * fix protocol race-condition in super seeding mode * support read-only DHT nodes diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 5c336b828..a18a2e8b9 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1748,8 +1748,8 @@ int main(int argc, char* argv[]) , end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket) { snprintf(str, sizeof(str) - , "%3d [%2d, %2d] active: %d\n" - , bucket, i->num_nodes, i->num_replacements, i->last_active); + , "%3d [%3d, %d]\n" + , bucket, i->num_nodes, i->num_replacements); out += str; } diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 795645bf8..992d51955 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -204,7 +204,6 @@ public: virtual ~node_impl() {} void tick(); - void refresh(node_id const& id, find_data::nodes_callback const& f); void bootstrap(std::vector const& nodes , find_data::nodes_callback const& 
f); void add_router_node(udp::endpoint router); @@ -317,6 +316,10 @@ private: ptime m_last_tracker_tick; + // the last time we issued a bootstrap or a refresh on our own ID, to expand + // the routing table buckets close to us. + ptime m_last_self_refresh; + // secret random numbers used to create write tokens int m_secret[2]; diff --git a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index b07dc8e1a..800f900dd 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/address.hpp" #include "libtorrent/union_endpoint.hpp" +#include "libtorrent/time.hpp" // for time_now() #ifdef TORRENT_DHT_VERBOSE_LOGGING #include "libtorrent/time.hpp" @@ -47,8 +48,10 @@ namespace libtorrent { namespace dht struct node_entry { - node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff, bool pinged = false) - : id(id_) + node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff + , bool pinged = false) + : last_queried(pinged ? time_now() : min_time()) + , id(id_) , endpoint(ep) , rtt(roundtriptime & 0xffff) , timeout_count(pinged ? 
0 : 0xff) @@ -59,7 +62,8 @@ struct node_entry } node_entry(udp::endpoint ep) - : id(0) + : last_queried(min_time()) + , id(0) , endpoint(ep) , rtt(0xffff) , timeout_count(0xff) @@ -70,7 +74,8 @@ struct node_entry } node_entry() - : id(0) + : last_queried(min_time()) + , id(0) , rtt(0xffff) , timeout_count(0xff) { @@ -88,8 +93,11 @@ struct node_entry bool confirmed() const { return timeout_count == 0; } void update_rtt(int new_rtt) { + TORRENT_ASSERT(new_rtt <= 0xffff); + TORRENT_ASSERT(new_rtt >= 0); + if (new_rtt == 0xffff) return; if (rtt == 0xffff) rtt = new_rtt; - else rtt = int(rtt) / 3 + int(new_rtt) * 2 / 3; + else rtt = int(rtt) * 2 / 3 + int(new_rtt) / 3; } address addr() const { return endpoint.address(); } int port() const { return endpoint.port; } @@ -98,6 +106,9 @@ struct node_entry ptime first_seen; #endif + // the time we last received a response for a request to this peer + ptime last_queried; + node_id id; union_endpoint endpoint; diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 9938eaed8..6bc1b008f 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -70,7 +70,6 @@ struct routing_table_node { bucket_t replacements; bucket_t live_nodes; - ptime last_active; }; // differences in the implementation from the description in @@ -117,15 +116,14 @@ public: // the node will be ignored. void heard_about(node_id const& id, udp::endpoint const& ep); - // if any bucket in the routing table needs to be refreshed - // this function will return true and set the target to an - // appropriate target inside that bucket - bool need_refresh(node_id& target) const; + node_entry const* next_refresh(node_id& target); enum { + // nodes that have not been pinged are considered failed by this flag include_failed = 1 }; + // fills the vector with the count nodes from our buckets that // are nearest to the given id. 
void find_node(node_id const& id, std::vector& l @@ -169,8 +167,6 @@ public: void print_state(std::ostream& os) const; #endif - void touch_bucket(node_id const& target); - int bucket_limit(int bucket) const; #if TORRENT_USE_INVARIANT_CHECKS diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index fd0e34eac..996635f68 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -84,6 +84,8 @@ struct traversal_algorithm : boost::noncopyable int invoke_count() const { return m_invoke_count; } int branch_factor() const { return m_branch_factor; } + node_impl& node() const { return m_node; } + protected: // returns true if we're done diff --git a/include/libtorrent/session_status.hpp b/include/libtorrent/session_status.hpp index 09884acc1..9783247a4 100644 --- a/include/libtorrent/session_status.hpp +++ b/include/libtorrent/session_status.hpp @@ -93,8 +93,10 @@ namespace libtorrent int num_nodes; int num_replacements; +#ifndef TORRENT_NO_DEPRECATE // number of seconds since last activity int last_active; +#endif }; // holds counters and gauges for the uTP sockets diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 1261e8042..0647d2f06 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -107,6 +107,7 @@ node_impl::node_impl(alert_dispatcher* alert_disp , m_rpc(m_id, m_table, sock) , m_observer(observer) , m_last_tracker_tick(time_now()) + , m_last_self_refresh(min_time()) , m_post_alert(alert_disp) , m_sock(sock) , m_counters(cnt) @@ -174,17 +175,11 @@ std::string node_impl::generate_token(udp::endpoint const& addr, char const* inf return token; } -void node_impl::refresh(node_id const& id - , find_data::nodes_callback const& f) -{ - boost::intrusive_ptr r(new dht::refresh(*this, id, f)); - r->start(); -} - void node_impl::bootstrap(std::vector const& nodes , find_data::nodes_callback const& f) { - 
boost::intrusive_ptr r(new dht::bootstrap(*this, m_id, f)); + boost::intrusive_ptr r(new dht::bootstrap(*this, m_id, f)); + m_last_self_refresh = time_now(); #ifdef TORRENT_DHT_VERBOSE_LOGGING int count = 0; @@ -272,8 +267,7 @@ void node_impl::incoming(msg const& m) case 'r': { node_id id; - if (m_rpc.incoming(m, &id, m_settings)) - refresh(id, boost::bind(&nop)); + m_rpc.incoming(m, &id, m_settings); break; } case 'q': @@ -426,12 +420,85 @@ void node_impl::get_item(char const* pk, std::string const& salt ta->start(); } +struct ping_observer : observer +{ + ping_observer( + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id) + : observer(algorithm, ep, id) + {} + + // parses out "nodes" + virtual void reply(msg const& m) + { + flags |= flag_done; + + lazy_entry const* r = m.message.dict_find_dict("r"); + if (!r) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << "[" << m_algorithm.get() + << "] missing response dict"; +#endif + return; + } + + // look for nodes + lazy_entry const* n = r->dict_find_string("nodes"); + if (n) + { + char const* nodes = n->string_ptr(); + char const* end = nodes + n->string_length(); + + while (end - nodes >= 26) + { + node_id id; + std::copy(nodes, nodes + 20, id.begin()); + nodes += 20; + m_algorithm.get()->node().m_table.heard_about(id + , detail::read_v4_endpoint(nodes)); + } + } + } +}; + void node_impl::tick() { + // every now and then we refresh our own ID, just to keep + // expanding the routing table buckets closer to us. 
+ ptime now = time_now(); + if (m_last_self_refresh + minutes(10) < now) + { + boost::intrusive_ptr r(new dht::refresh(*this, m_id + , boost::bind(&nop))); + r->start(); + m_last_self_refresh = now; + return; + } + node_id target; - if (m_table.need_refresh(target)) - refresh(target, boost::bind(&nop)); + node_entry const* ne = m_table.next_refresh(target); + if (ne == NULL) return; + + void* ptr = m_rpc.allocate_observer(); + if (ptr == 0) return; + + // create a dummy traversal_algorithm + // this is unfortunately necessary for the observer + // to free itself from the pool when it's being released + boost::intrusive_ptr algo( + new traversal_algorithm(*this, (node_id::min)())); + observer_ptr o(new (ptr) ping_observer(algo, ne->ep(), ne->id)); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + entry e; + e["y"] = "q"; + e["q"] = "find_node"; + entry& a = e["a"]; + a["target"] = target.to_string(); + m_rpc.invoke(e, ne->ep(), o); } time_duration node_impl::connection_timeout() diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 1b9e9c3d3..02fa3feab 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -100,18 +100,15 @@ void routing_table::status(session_status& s) const boost::tie(s.dht_nodes, s.dht_node_cache) = size(); s.dht_global_nodes = num_global_nodes(); - ptime now = time_now(); - for (table_t::const_iterator i = m_buckets.begin() , end(m_buckets.end()); i != end; ++i) { dht_routing_bucket b; b.num_nodes = i->live_nodes.size(); b.num_replacements = i->replacements.size(); - if (i->last_active.time_since_epoch().count() < 0) - b.last_active = INT_MAX; - else - b.last_active = int(total_seconds(now - i->last_active)); +#ifndef TORRENT_NO_DEPRECATE + b.last_active = 0; +#endif s.dht_routing_table.push_back(b); } } @@ -213,21 +210,25 @@ void routing_table::print_state(std::ostream& os) const os << "-"; os << "\n\n"; + ptime now = time_now(); + os << 
"nodes:\n"; int bucket_index = 0; for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i, ++bucket_index) { -// if (i->live_nodes.empty()) continue; - os << "=== BUCKET == " << bucket_index - << " == " << i->live_nodes.size() << "|" << i->replacements.size(); + os << "\n=== BUCKET == " << bucket_index + << " == " << i->live_nodes.size() << "|" << i->replacements.size() + << " ===== \n"; - if (i->last_active < min_time() + seconds(161)) - os << " == -"; + int id_shift; + // the last bucket is special, since it hasn't been split yet, it + // includes that top bit as well + if (bucket_index + 1 == m_buckets.size()) + id_shift = bucket_index; else - os << " == " << total_seconds(time_now() - i->last_active); + id_shift = bucket_index + 1; - os << " seconds ago ===== \n"; for (bucket_t::const_iterator j = i->live_nodes.begin() , end(i->live_nodes.end()); j != end; ++j) { @@ -243,14 +244,25 @@ void routing_table::print_state(std::ostream& os) const top_mask = (0xff << mask_shift) & 0xff; node_id id = j->id; - id <<= bucket_index + 1; - os << " prefix: " << ((id[0] & top_mask) >> mask_shift) - << " id: " << j->id - << " rtt: " << j->rtt - << " ip: " << j->ep() - << " fails: " << j->fail_count() - << " pinged: " << j->pinged() - << " dist: " << distance_exp(m_id, j->id) + id <<= id_shift; + + os << " prefx: " << std::setw(2) << std::hex << ((id[0] & top_mask) >> mask_shift) << std::dec + << " id: " << j->id; + if (j->rtt == 0xffff) + os << " rtt: "; + else + os << " rtt: " << std::setw(4) << j->rtt; + + os << " fail: " << j->fail_count() + << " ping: " << j->pinged() + << " dist: " << std::setw(3) << distance_exp(m_id, j->id); + + if (j->last_queried == min_time()) + os << " query: "; + else + os << " query: " << std::setw(3) << total_seconds(now - j->last_queried); + + os << " ip: " << j->ep() << "\n"; } } @@ -280,12 +292,20 @@ void routing_table::print_state(std::ostream& os) const TORRENT_ASSERT_VAL(bucket_size_limit <= 256, 
bucket_size_limit); bool sub_buckets[256]; memset(sub_buckets, 0, sizeof(sub_buckets)); + + int id_shift; + // the last bucket is special, since it hasn't been split yet, it + // includes that top bit as well + if (bucket_index + 1 == m_buckets.size()) + id_shift = bucket_index; + else + id_shift = bucket_index + 1; for (bucket_t::const_iterator j = i->live_nodes.begin() , end(i->live_nodes.end()); j != end; ++j) { node_id id = j->id; - id <<= bucket_index + 1; + id <<= id_shift; int b = (id[0] & top_mask) >> mask_shift; TORRENT_ASSERT(b >= 0 && b < int(sizeof(sub_buckets)/sizeof(sub_buckets[0]))); sub_buckets[b] = true; @@ -299,72 +319,60 @@ void routing_table::print_state(std::ostream& os) const #endif -void routing_table::touch_bucket(node_id const& target) +node_entry const* routing_table::next_refresh(node_id& target) { - table_t::iterator i = find_bucket(target); - i->last_active = time_now(); -} + // find the node with the least recent 'last_queried' field. if it's too + // recent, return false. Otherwise return a random target ID that's close to + // a missing prefix for that bucket -// returns true if lhs is in more need of a refresh than rhs -bool compare_bucket_refresh(routing_table_node const& lhs, routing_table_node const& rhs) -{ - // add the number of nodes to prioritize buckets with few nodes in them - return lhs.last_active + seconds(lhs.live_nodes.size() * 5) - < rhs.last_active + seconds(rhs.live_nodes.size() * 5); -} + node_entry* candidate = NULL; + int bucket_idx = -1; -// TODO: instad of refreshing a bucket by using find_nodes, -// ping each node periodically -bool routing_table::need_refresh(node_id& target) const -{ - INVARIANT_CHECK; - - ptime now = time_now(); - - // refresh our own bucket once every 15 minutes - if (now - minutes(15) > m_last_self_refresh) + // this will have a bias towards pinging nodes close to us first. 
+ int idx = m_buckets.size() - 1; + for (table_t::reverse_iterator i = m_buckets.rbegin() + , end(m_buckets.rend()); i != end; ++i, --idx) { - m_last_self_refresh = now; - target = m_id; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(table) << "need_refresh [ bucket: self target: " << target << " ]"; -#endif - return true; + for (bucket_t::iterator j = i->live_nodes.begin() + , end(i->live_nodes.end()); j != end; ++j) + { + if (j->last_queried == min_time()) + { + bucket_idx = idx; + candidate = &*j; + goto out; + } + + if (candidate == NULL || j->last_queried < candidate->last_queried) + { + candidate = &*j; + bucket_idx = idx; + } + } + } +out: + + // make sure we don't pick the same node again next time we want to refresh + // the routing table + if (candidate) + { + candidate->last_queried = time_now(); + + // generate a random node_id within the given bucket + // TODO: 2 it would be nice to have a bias towards node-id prefixes that + // are missing in the bucket + target = generate_random_id(); + int num_bits = bucket_idx + 1; + node_id mask = generate_prefix_mask(num_bits); + + // target = (target & ~mask) | (root & mask) + node_id root = m_id; + root &= mask; + target &= ~mask; + target |= root; } - if (m_buckets.empty()) return false; - - table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end() - , &compare_bucket_refresh); - - if (now - minutes(15) < i->last_active) return false; - if (now - seconds(45) < m_last_refresh) return false; - - // generate a random node_id within the given bucket - target = generate_random_id(); - int num_bits = std::distance(m_buckets.begin(), i) + 1; - node_id mask = generate_prefix_mask(num_bits); - - // target = (target & ~mask) | (root & mask) - node_id root = m_id; - root &= mask; - target &= ~mask; - target |= root; - - // make sure this is in another subtree than m_id - // clear the (num_bits - 1) bit and then set it to the - // inverse of m_id's corresponding bit. 
- target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8)); - target[(num_bits - 1) / 8] |= - (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8)); - - TORRENT_ASSERT(distance_exp(m_id, target) == 160 - num_bits); - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(table) << "need_refresh [ bucket: " << num_bits << " target: " << target << " ]"; -#endif - m_last_refresh = now; - return true; + return candidate; } void routing_table::replacement_cache(bucket_t& nodes) const @@ -385,8 +393,6 @@ routing_table::table_t::iterator routing_table::find_bucket(node_id const& id) if (num_buckets == 0) { m_buckets.push_back(routing_table_node()); - // add 160 seconds to prioritize higher buckets (i.e. buckets closer to us) - m_buckets.back().last_active = min_time() + seconds(160); ++num_buckets; } @@ -509,6 +515,7 @@ bool routing_table::add_node(node_entry e) // and be done with it existing->timeout_count = 0; existing->update_rtt(e.rtt); + existing->last_queried = e.last_queried; return ret; } else if (existing) @@ -680,7 +687,12 @@ bool routing_table::add_node(node_entry e) mask = (0xff << mask_shift) & 0xff; node_id id = e.id; - id <<= bucket_index + 1; + // the last bucket is special, since it hasn't been split yet, it + // includes that top bit as well + if (bucket_index + 1 == m_buckets.size()) + id <<= bucket_index; + else + id <<= bucket_index + 1; // pick out all nodes that have the same prefix as the new node std::vector nodes; @@ -763,7 +775,8 @@ bool routing_table::add_node(node_entry e) *j = e; m_ips.insert(e.addr().to_v4().to_bytes()); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr(); + TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id + << " " << e.addr(); #endif return ret; } @@ -823,6 +836,8 @@ bool routing_table::add_node(node_entry e) nb.push_back(e); else if (int(nrb.size()) < m_bucket_size) nrb.push_back(e); + else + nb.push_back(e); // trigger another 
split m_ips.insert(e.addr().to_v4().to_bytes()); @@ -842,9 +857,6 @@ void routing_table::split_bucket() // this is the last bucket, and it's full already. Split // it by adding another bucket m_buckets.push_back(routing_table_node()); - // the extra seconds added to the end is to prioritize - // buckets closer to us when refreshing - m_buckets.back().last_active = min_time() + seconds(160 - m_buckets.size()); bucket_t& new_bucket = m_buckets.back().live_nodes; bucket_t& new_replacement_bucket = m_buckets.back().replacements; @@ -980,20 +992,18 @@ void routing_table::add_router_node(udp::endpoint router) m_router_nodes.insert(router); } -// we heard from this node, but we don't know if it -// was spoofed or not (i.e. pinged == false) +// we heard from this node, but we don't know if it was spoofed or not (i.e. +// pinged == false) void routing_table::heard_about(node_id const& id, udp::endpoint const& ep) { add_node(node_entry(id, ep)); } -// this function is called every time the node sees -// a sign of a node being alive. This node will either -// be inserted in the k-buckets or be moved to the top -// of its bucket. -// the return value indicates if the table needs a refresh. -// if true, the node should refresh the table (i.e. do a find_node -// on its own id) +// this function is called every time the node sees a sign of a node being +// alive. This node will either be inserted in the k-buckets or be moved to the +// top of its bucket. the return value indicates if the table needs a refresh. +// if true, the node should refresh the table (i.e. 
do a find_node on its own +// id) bool routing_table::node_seen(node_id const& id, udp::endpoint ep, int rtt) { return add_node(node_entry(id, ep, rtt, true)); diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 0e14796a6..0fc618dcb 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -476,8 +476,6 @@ void traversal_algorithm::add_router_entries() void traversal_algorithm::init() { - // update the last activity of this bucket - m_node.m_table.touch_bucket(m_target); m_branch_factor = m_node.branch_factor(); m_node.add_traversal_algorithm(this); } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 8ccccb461..ef88d6dde 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -3304,8 +3304,10 @@ retry: if (tor) session_log("prioritizing DHT announce: \"%s\"", tor->name().c_str()); #endif - // trigger a DHT announce right away if we just - // added a new torrent and there's no back-log + // trigger a DHT announce right away if we just added a new torrent and + // there's no back-log. in the timer handler, as long as there are more + // high priority torrents to be announced to the DHT, it will keep the + // timer interval short until all torrents have been announced. if (m_dht_torrents.size() == 1) { #if defined TORRENT_ASIO_DEBUGGING