From 3742fd2699e29b2cdc242601d51849c1ac9ec8df Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 2 Nov 2014 09:41:29 +0000 Subject: [PATCH] merged changes from RC_1_0 --- examples/client_test.cpp | 11 +- include/libtorrent/kademlia/node.hpp | 4 +- include/libtorrent/kademlia/node_entry.hpp | 16 ++- include/libtorrent/kademlia/routing_table.hpp | 5 +- src/kademlia/node.cpp | 55 +++++---- src/kademlia/refresh.cpp | 8 +- src/kademlia/routing_table.cpp | 113 ++++++++---------- src/kademlia/traversal_algorithm.cpp | 14 ++- test/test_dht.cpp | 31 +++-- 9 files changed, 144 insertions(+), 113 deletions(-) diff --git a/examples/client_test.cpp b/examples/client_test.cpp index a18a2e8b9..546fde1ed 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1747,9 +1747,16 @@ int main(int argc, char* argv[]) for (std::vector::iterator i = sess_stat.dht_routing_table.begin() , end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket) { + char const* progress_bar = + "################################" + "################################" + "################################" + "################################"; snprintf(str, sizeof(str) - , "%3d [%3d, %d]\n" - , bucket, i->num_nodes, i->num_replacements); + , "%3d [%3d, %d] %s%s\n" + , bucket, i->num_nodes, i->num_replacements + , progress_bar + (128 - i->num_nodes) + , "--------" + (8 - i->num_replacements)); out += str; } diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 992d51955..31140a70e 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -223,7 +223,7 @@ public: node_id const& nid() const { return m_id; } - boost::tuple size() const{ return m_table.size(); } + boost::tuple size() const { return m_table.size(); } size_type num_global_nodes() const { return m_table.num_global_nodes(); } @@ -284,6 +284,8 @@ public: protected: + void send_single_refresh(udp::endpoint const& ep, int bucket + , node_id const& id = node_id()); void lookup_peers(sha1_hash const& info_hash, entry& reply , bool noseed, bool scrape) const; bool lookup_torrents(sha1_hash const& target, entry& reply diff --git a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index 800f900dd..6bc5acada 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -52,7 +52,8 @@ struct node_entry , bool pinged = false) : last_queried(pinged ? time_now() : min_time()) , id(id_) - , endpoint(ep) + , a(ep.address().to_v4().to_bytes()) + , p(ep.port()) , rtt(roundtriptime & 0xffff) , timeout_count(pinged ? 0 : 0xff) { @@ -64,7 +65,8 @@ struct node_entry node_entry(udp::endpoint ep) : last_queried(min_time()) , id(0) - , endpoint(ep) + , a(ep.address().to_v4().to_bytes()) + , p(ep.port()) , rtt(0xffff) , timeout_count(0xff) { @@ -76,6 +78,7 @@ struct node_entry node_entry() : last_queried(min_time()) , id(0) + , p(0) , rtt(0xffff) , timeout_count(0xff) { @@ -89,7 +92,7 @@ struct node_entry void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; } int fail_count() const { return pinged() ? timeout_count : 0; } void reset_fail_count() { if (pinged()) timeout_count = 0; } - udp::endpoint ep() const { return udp::endpoint(endpoint); } + udp::endpoint ep() const { return udp::endpoint(address_v4(a), p); } bool confirmed() const { return timeout_count == 0; } void update_rtt(int new_rtt) { @@ -99,8 +102,8 @@ struct node_entry if (rtt == 0xffff) rtt = new_rtt; else rtt = int(rtt) * 2 / 3 + int(new_rtt) / 3; } - address addr() const { return endpoint.address(); } - int port() const { return endpoint.port; } + address addr() const { return address_v4(a); } + int port() const { return p; } #ifdef TORRENT_DHT_VERBOSE_LOGGING ptime first_seen; @@ -111,7 +114,8 @@ struct node_entry node_id id; - union_endpoint endpoint; + address_v4::bytes_type a; + boost::uint16_t p; // the average RTT of this node boost::uint16_t rtt; diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 6bc1b008f..476d083ad 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -82,7 +83,7 @@ struct routing_table_node // bucket has failed, then it is put in the replacement // cache (just like in the paper). -class TORRENT_EXTRA_EXPORT routing_table +class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable { public: typedef std::vector table_t; @@ -116,7 +117,7 @@ public: // the node will be ignored. void heard_about(node_id const& id, udp::endpoint const& ep); - node_entry const* next_refresh(node_id& target); + node_entry const* next_refresh(); enum { diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 0647d2f06..044d3dcad 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -350,23 +350,7 @@ void node_impl::add_node(udp::endpoint node) { // ping the node, and if we get a reply, it // will be added to the routing table - void* ptr = m_rpc.allocate_observer(); - if (ptr == 0) return; - - // create a dummy traversal_algorithm - // this is unfortunately necessary for the observer - // to free itself from the pool when it's being released - boost::intrusive_ptr algo( - new traversal_algorithm(*this, (node_id::min)())); - observer_ptr o(new (ptr) null_observer(algo, node, node_id(0))); -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS - o->m_in_constructor = false; -#endif - entry e; - e["y"] = "q"; - e["q"] = "ping"; - m_counters.inc_stats_counter(counters::dht_get_peers_out); - m_rpc.invoke(e, node, o); + send_single_refresh(node, m_table.num_active_buckets()); } void node_impl::announce(sha1_hash const& info_hash, int listen_port, int flags @@ -477,28 +461,53 @@ void node_impl::tick() return; } - node_id target; - node_entry const* ne = m_table.next_refresh(target); + node_entry const* ne = m_table.next_refresh(); if (ne == NULL) return; + int bucket = 159 - distance_exp(m_id, ne->id); + send_single_refresh(ne->ep(), bucket, ne->id); +} + +void node_impl::send_single_refresh(udp::endpoint const& ep, int bucket + , node_id const& id) +{ void* ptr = m_rpc.allocate_observer(); if (ptr == 0) return; + // generate a random node_id within the given bucket + // TODO: 2 it would be nice to have a bias towards node-id prefixes that + // are missing in the bucket + node_id target = generate_random_id(); + node_id mask = generate_prefix_mask(bucket + 1); + + // target = (target & ~mask) | (root & mask) + node_id root = m_id; + root &= mask; + target &= ~mask; + target |= root; + // create a dummy traversal_algorithm // this is unfortunately necessary for the observer // to free itself from the pool when it's being released boost::intrusive_ptr algo( new traversal_algorithm(*this, (node_id::min)())); - observer_ptr o(new (ptr) ping_observer(algo, ne->ep(), ne->id)); + observer_ptr o(new (ptr) ping_observer(algo, ep, id)); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS o->m_in_constructor = false; #endif entry e; e["y"] = "q"; - e["q"] = "find_node"; entry& a = e["a"]; - a["target"] = target.to_string(); - m_rpc.invoke(e, ne->ep(), o); + + // use get_peers instead of find_node. We'll get nodes in the response + // either way. + e["q"] = "get_peers"; + a["info_hash"] = target.to_string(); + m_counters.inc_stats_counter(counters::dht_get_peers_out); + +// e["q"] = "find_node"; +// a["target"] = target.to_string(); + m_rpc.invoke(e, ep, o); } time_duration node_impl::connection_timeout() diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 09a81b16e..06c58e7c6 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -71,9 +71,13 @@ bool refresh::invoke(observer_ptr o) { entry e; e["y"] = "q"; - e["q"] = "find_node"; entry& a = e["a"]; - a["target"] = target().to_string(); + + e["q"] = "get_peers"; + a["info_hash"] = target().to_string(); + +// e["q"] = "find_node"; +// a["target"] = target().to_string(); m_node.stats_counters().inc_stats_counter(counters::dht_find_node_out); return m_node.m_rpc.invoke(e, o->target_ep(), o); } diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 02fa3feab..602b8400b 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -178,41 +178,24 @@ void routing_table::print_state(std::ostream& os) const << "global node count: " << num_global_nodes() << "\n" << "node_id: " << m_id << "\n\n"; - os << "number of nodes per bucket:\n-- live "; - for (int i = 8; i < 160; ++i) - os << "-"; - os << "\n"; + os << "number of nodes per bucket:\n"; - int max_size = bucket_limit(0); - for (int k = 0; k < max_size; ++k) + int idx = 0; + + for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); + i != end; ++i, ++idx) { - for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); - i != end; ++i) - { - os << (int(i->live_nodes.size()) > (max_size - 1 - k) ? "|" : " "); - } + os << std::setw(2) << idx << ": "; + for (int k = 0; k < int(i->live_nodes.size()); ++k) + os << "#"; + for (int k = 0; k < int(i->replacements.size()); ++k) + os << "-"; os << "\n"; } - for (int i = 0; i < 160; ++i) os << "+"; - os << "\n"; - - for (int k = 0; k < m_bucket_size; ++k) - { - for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); - i != end; ++i) - { - os << (int(i->replacements.size()) > k ? "|" : " "); - } - os << "\n"; - } - os << "-- cached "; - for (int i = 10; i < 160; ++i) - os << "-"; - os << "\n\n"; ptime now = time_now(); - os << "nodes:\n"; + os << "\nnodes:"; int bucket_index = 0; for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i, ++bucket_index) @@ -267,7 +250,7 @@ void routing_table::print_state(std::ostream& os) const } } - os << "node spread per bucket:\n"; + os << "\nnode spread per bucket:\n"; bucket_index = 0; for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i, ++bucket_index) @@ -311,7 +294,9 @@ void routing_table::print_state(std::ostream& os) const sub_buckets[b] = true; } - os << bucket_index << " mask:" << (top_mask >> mask_shift) << ": ["; + os << std::dec << std::setw(2) << bucket_index << " mask: " << std::setw(2) + << std::hex << (top_mask >> mask_shift) << ": ["; + for (int i = 0; i < bucket_size_limit; ++i) os << (sub_buckets[i] ? "X" : " "); os << "]\n"; } @@ -319,7 +304,7 @@ void routing_table::print_state(std::ostream& os) const #endif -node_entry const* routing_table::next_refresh(node_id& target) +node_entry const* routing_table::next_refresh() { // find the node with the least recent 'last_queried' field. if it's too // recent, return false. Otherwise return a random target ID that's close to @@ -355,23 +340,8 @@ out: // make sure we don't pick the same node again next time we want to refresh // the routing table if (candidate) - { candidate->last_queried = time_now(); - // generate a random node_id within the given bucket - // TODO: 2 it would be nice to have a bias towards node-id prefixes that - // are missing in the bucket - target = generate_random_id(); - int num_bits = bucket_idx + 1; - node_id mask = generate_prefix_mask(num_bits); - - // target = (target & ~mask) | (root & mask) - node_id root = m_id; - root &= mask; - target &= ~mask; - target |= root; - } - return candidate; } @@ -454,8 +424,8 @@ void routing_table::remove_node(node_entry* n && n < &bucket->replacements[0] + bucket->replacements.size()) { int idx = n - &bucket->replacements[0]; - TORRENT_ASSERT(m_ips.count(n->endpoint.address().to_v4().to_bytes()) > 0); - erase_one(m_ips, n->endpoint.address().to_v4().to_bytes()); + TORRENT_ASSERT(m_ips.count(n->a) > 0); + erase_one(m_ips, n->a); bucket->replacements.erase(bucket->replacements.begin() + idx); } @@ -464,8 +434,8 @@ void routing_table::remove_node(node_entry* n && n < &bucket->live_nodes[0] + bucket->live_nodes.size()) { int idx = n - &bucket->live_nodes[0]; - TORRENT_ASSERT(m_ips.count(n->endpoint.address().to_v4().to_bytes()) > 0); - erase_one(m_ips, n->endpoint.address().to_v4().to_bytes()); + TORRENT_ASSERT(m_ips.count(n->a) > 0); + erase_one(m_ips, n->a); bucket->live_nodes.erase(bucket->live_nodes.begin() + idx); } } @@ -623,15 +593,13 @@ bool routing_table::add_node(node_entry e) // bit prefix has higher RTT than the new node, replace it. // can we split the bucket? - bool can_split = false; + // only nodes that haven't failed can split the bucket, and we can only + // split the last bucket + bool can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159) + && e.fail_count() == 0; if (e.pinged() && e.fail_count() == 0) { - // only nodes that are pinged and haven't failed - // can split the bucket, and we can only split - // the last bucket - can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159); - // if the node we're trying to insert is considered pinged, // we may replace other nodes that aren't pinged @@ -929,21 +897,40 @@ void routing_table::for_each_node( } } -void routing_table::node_failed(node_id const& id, udp::endpoint const& ep) +void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep) { INVARIANT_CHECK; // if messages to ourself fails, ignore it - if (id == m_id) return; + if (nid == m_id) return; - table_t::iterator i = find_bucket(id); + table_t::iterator i = find_bucket(nid); bucket_t& b = i->live_nodes; bucket_t& rb = i->replacements; bucket_t::iterator j = std::find_if(b.begin(), b.end() - , boost::bind(&node_entry::id, _1) == id); + , boost::bind(&node_entry::id, _1) == nid); - if (j == b.end()) return; + if (j == b.end()) + { + j = std::find_if(rb.begin(), rb.end() + , boost::bind(&node_entry::id, _1) == nid); + + if (j == rb.end() + || j->ep() != ep) return; + + j->timed_out(); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(table) << " NODE FAILED" + " id: " << nid << + " ip: " << j->ep() << + " fails: " << j->fail_count() << + " pinged: " << j->pinged() << + " up-time: " << total_seconds(time_now() - j->first_seen); +#endif + return; + } // if the endpoint doesn't match, it's a different node // claiming the same ID. The node we have in our routing @@ -956,7 +943,7 @@ void routing_table::node_failed(node_id const& id, udp::endpoint const& ep) #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(table) << " NODE FAILED" - " id: " << id << + " id: " << nid << " ip: " << j->ep() << " fails: " << j->fail_count() << " pinged: " << j->pinged() << @@ -973,7 +960,7 @@ void routing_table::node_failed(node_id const& id, udp::endpoint const& ep) return; } - erase_one(m_ips, j->addr().to_v4().to_bytes()); + erase_one(m_ips, j->a); b.erase(j); // sort by RTT first, to find the node with the lowest diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 0fc618dcb..6011df64f 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -245,6 +245,10 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING node returned a list which included a node with id 0"; } #endif + + // let the routing table know this node may exist + m_node.m_table.heard_about(id, addr); + add_entry(id, addr, 0); } @@ -282,6 +286,11 @@ void traversal_algorithm::failed(observer_ptr o, int flags) { TORRENT_ASSERT(m_invoke_count >= 0); + // don't tell the routing table about + // node ids that we just generated ourself + if ((o->flags & observer::flag_no_id) == 0) + m_node.m_table.node_failed(o->id(), o->target_ep()); + if (m_results.empty()) return; TORRENT_ASSERT(o->flags & observer::flag_queried); @@ -325,10 +334,7 @@ void traversal_algorithm::failed(observer_ptr o, int flags) << " type: " << name() ; #endif - // don't tell the routing table about - // node ids that we just generated ourself - if ((o->flags & observer::flag_no_id) == 0) - m_node.m_table.node_failed(o->id(), o->target_ep()); + ++m_timeouts; --m_invoke_count; TORRENT_ASSERT(m_invoke_count >= 0); diff --git a/test/test_dht.cpp b/test/test_dht.cpp index cd3b96a9c..31c23607d 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -308,7 +308,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps { "y", lazy_entry::string_t, 1, 0}, }; - lazy_entry const* parsed[5]; + lazy_entry const* parsed[6]; char error_string[200]; // fprintf(stderr, "msg: %s\n", print_entry(response).c_str()); @@ -1375,7 +1375,8 @@ int test_main() {"q", lazy_entry::string_t, 9, 0}, {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, {"id", lazy_entry::string_t, 20, 0}, - {"target", lazy_entry::string_t, 20, key_desc_t::last_child}, + {"target", lazy_entry::string_t, 20, key_desc_t::optional}, + {"info_hash", lazy_entry::string_t, 20, key_desc_t::optional | key_desc_t::last_child}, }; dht::key_desc_t get_peers_desc[] = { @@ -1398,6 +1399,7 @@ int test_main() // bootstrap + g_sent_packets.clear(); do { dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt); @@ -1412,12 +1414,16 @@ int test_main() TEST_EQUAL(g_sent_packets.front().first, initial_node); lazy_from_entry(g_sent_packets.front().second, response); - ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string)); + ret = verify_message(&response, find_node_desc, parsed, 7, error_string, sizeof(error_string)); if (ret) { TEST_EQUAL(parsed[0]->string_value(), "q"); - TEST_EQUAL(parsed[2]->string_value(), "find_node"); - if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break; + TEST_CHECK(parsed[2]->string_value() == "find_node" + || parsed[2]->string_value() == "get_peers"); + + if (parsed[0]->string_value() != "q" + || (parsed[2]->string_value() != "find_node" + && parsed[2]->string_value() != "get_peers")) break; } else { @@ -1437,12 +1443,14 @@ int test_main() TEST_EQUAL(g_sent_packets.front().first, found_node); lazy_from_entry(g_sent_packets.front().second, response); - ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string)); + ret = verify_message(&response, find_node_desc, parsed, 7, error_string, sizeof(error_string)); if (ret) { TEST_EQUAL(parsed[0]->string_value(), "q"); - TEST_EQUAL(parsed[2]->string_value(), "find_node"); - if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break; + TEST_CHECK(parsed[2]->string_value() == "find_node" + || parsed[2]->string_value() == "get_peers"); + if (parsed[0]->string_value() != "q" || (parsed[2]->string_value() != "find_node" + && parsed[2]->string_value() == "get_peers")) break; } else { @@ -1460,6 +1468,7 @@ int test_main() // get_peers + g_sent_packets.clear(); do { dht::node_id target = to_hash("1234876923549721020394873245098347598635"); @@ -1551,6 +1560,7 @@ int test_main() // immutable get + g_sent_packets.clear(); do { dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt); @@ -1595,6 +1605,7 @@ int test_main() // mutable get + g_sent_packets.clear(); do { dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt); @@ -1669,7 +1680,7 @@ int test_main() }; // immutable put - + g_sent_packets.clear(); do { dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt); @@ -1749,7 +1760,7 @@ int test_main() } while (false); // mutable put - + g_sent_packets.clear(); do { dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);