From 00d02f7859eecab8e6744f9c827c5e50a5667ff5 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 23 Dec 2008 20:04:12 +0000 Subject: [PATCH] improved DHT get_peers code. get_peers always returns nodes now, and announce_peer is always sent to the k closest nodes to the info-hash. This should significantly speed up retrieving peers from the DHT --- docs/dht_extensions.rst | 10 +++ include/libtorrent/kademlia/dht_tracker.hpp | 3 +- include/libtorrent/kademlia/find_data.hpp | 23 ++++--- include/libtorrent/kademlia/node.hpp | 43 +----------- parse_dht_stats.py | 8 +-- src/kademlia/dht_tracker.cpp | 19 +++--- src/kademlia/find_data.cpp | 35 +++++++--- src/kademlia/node.cpp | 76 +++++++++++---------- src/kademlia/rpc_manager.cpp | 11 ++- 9 files changed, 116 insertions(+), 112 deletions(-) diff --git a/docs/dht_extensions.rst b/docs/dht_extensions.rst index 6d64a8c18..9761a7675 100644 --- a/docs/dht_extensions.rst +++ b/docs/dht_extensions.rst @@ -5,6 +5,16 @@ Mainline DHT extensions libtorrent implements a few extensions to the Mainline DHT protocol. +get_peers response +------------------ + +libtorrent always responds with ``nodes`` to a get_peers request. If it has +peers for the specified info-hash, it will return ``values`` as well. This is +because just because some peer announced to us, doesn't mean that we are +among the 8 closest nodes of the info hash. libtorrent also keeps traversing +nodes using get_peers until it has found the 8 closest ones, and then announces +to those nodes. + client identification --------------------- diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 60b1a7b7f..74eae2b63 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -90,8 +90,7 @@ namespace libtorrent { namespace dht entry state() const; void announce(sha1_hash const& ih, int listen_port - , boost::function const& - , sha1_hash const&)> f); + , boost::function const&)> f); void dht_status(session_status& s); void network_stats(int& sent, int& received); diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 5321ca1d1..37f62157b 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define FIND_DATA_050323_HPP #include +#include #include #include @@ -58,22 +59,29 @@ class node_impl; class find_data : public traversal_algorithm { public: - typedef boost::function done_callback; + typedef boost::function const&)> data_callback; + typedef boost::function > const&)> nodes_callback; void got_data(msg const* m); + void got_write_token(node_id const& n, std::string const& write_token) + { m_write_tokens[n] = write_token; } find_data(node_impl& node, node_id target - , done_callback const& callback); + , data_callback const& dcallback + , nodes_callback const& ncallback); virtual char const* name() const { return "get_peers"; } + node_id const target() const { return m_target; } private: void done(); void invoke(node_id const& id, udp::endpoint addr); - done_callback m_done_callback; - boost::shared_ptr m_packet; + data_callback m_data_callback; + nodes_callback m_nodes_callback; + std::map m_write_tokens; + node_id const m_target; bool m_done; }; @@ -82,11 +90,9 @@ class find_data_observer : public observer public: find_data_observer( boost::intrusive_ptr const& algorithm - , node_id self - , node_id target) + , node_id self) : observer(algorithm->allocator()) , m_algorithm(algorithm) - , m_target(target) , m_self(self) {} ~find_data_observer(); @@ -95,7 +101,7 @@ public: { m.reply = false; m.message_id = messages::get_peers; - m.info_hash = m_target; + m.info_hash = m_algorithm->target(); } void timeout(); @@ -104,7 +110,6 @@ public: private: boost::intrusive_ptr m_algorithm; - node_id const m_target; node_id const m_self; }; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 9f4d3c62f..d385c3422 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -125,46 +125,6 @@ private: std::string m_token; }; -class get_peers_observer : public observer -{ -public: - get_peers_observer(sha1_hash const& info_hash - , int listen_port - , rpc_manager& rpc - , boost::function const&, sha1_hash const&)> f) - : observer(rpc.allocator()) - , m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_rpc(rpc) - , m_fun(f) - {} - - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - } - - void timeout() {} - void reply(msg const& r) - { - observer_ptr o(new (m_rpc.allocator().malloc()) announce_observer( - m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token)); -#ifdef TORRENT_DEBUG - o->m_in_constructor = false; -#endif - m_rpc.invoke(messages::announce_peer, r.addr, o); - m_fun(r.peers, m_info_hash); - } - void abort() {} - -private: - sha1_hash m_info_hash; - int m_listen_port; - rpc_manager& m_rpc; - boost::function const&, sha1_hash const&)> m_fun; -}; - class node_impl : boost::noncopyable { typedef std::map table_t; @@ -211,8 +171,7 @@ public: #endif void announce(sha1_hash const& info_hash, int listen_port - , boost::function const& - , sha1_hash const&)> f); + , boost::function const&)> f); bool verify_token(msg const& m); std::string generate_token(msg const& m); diff --git a/parse_dht_stats.py b/parse_dht_stats.py index e530e39d5..a442701a5 100644 --- a/parse_dht_stats.py +++ b/parse_dht_stats.py @@ -42,11 +42,11 @@ replot gnuplot_scripts += [name] gen_stats_gnuplot('dht_routing_table_size', 'nodes', ['active nodes','passive nodes']) -gen_stats_gnuplot('dht_tracker_table_size', '', ['num torrents']) -gen_stats_gnuplot('dht_announces', 'number per minute', ['announces per min', 'failed announces per min']) -gen_stats_gnuplot('dht_clients', 'number per minute', ['total msgs per min', 'az msgs per min', 'ut msgs per min', 'lt msgs per min', 'mp msgs per min', 'gr msgs per min']) +gen_stats_gnuplot('dht_tracker_table_size', '', ['num torrents', 'num peers']) +gen_stats_gnuplot('dht_announces', 'messages per minute', ['announces per min', 'failed announces per min']) +gen_stats_gnuplot('dht_clients', 'messages per minute', ['total msgs per min', 'az msgs per min', 'ut msgs per min', 'lt msgs per min', 'mp msgs per min', 'gr msgs per min']) gen_stats_gnuplot('dht_rate', 'bytes per second', ['bytes in per sec', 'bytes out per sec']) -gen_stats_gnuplot('dht_errors', '', ['error replies sent', 'error queries recvd']) +gen_stats_gnuplot('dht_errors', 'messages per minute', ['error replies sent', 'error queries recvd']) for i in gnuplot_scripts: os.system('gnuplot %s.gnuplot' % i); diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 403fb8e49..6b559f9c5 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -207,7 +207,7 @@ namespace libtorrent { namespace dht // turns on and off individual components' logging rpc_log().enable(false); - node_log().enable(false); +// node_log().enable(false); traversal_log().enable(false); // dht_tracker_log.enable(false); @@ -325,7 +325,7 @@ namespace libtorrent { namespace dht m_last_new_key = now; m_dht.new_write_key(); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << time_now_string() << " new write key"; + TORRENT_LOG(dht_tracker) << " *** new write key"; #endif } @@ -410,8 +410,7 @@ namespace libtorrent { namespace dht } void dht_tracker::announce(sha1_hash const& ih, int listen_port - , boost::function const& - , sha1_hash const&)> f) + , boost::function const&)> f) { mutex_t::scoped_lock l(m_mutex); m_dht.announce(ih, listen_port, f); @@ -609,6 +608,9 @@ namespace libtorrent { namespace dht std::copy(id->string_ptr(), id->string_ptr() + id->string_length(), m.id.begin()); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " id: " << m.id; +#endif lazy_entry const* n = r->dict_find_list("values"); if (n) { @@ -804,6 +806,7 @@ namespace libtorrent { namespace dht m.write_token = token->string_value(); m.message_id = libtorrent::dht::messages::announce_peer; #ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " token: " << to_hex(m.write_token); log_line << " ih: " << boost::lexical_cast(m.info_hash); log_line << " p: " << m.port; @@ -1037,11 +1040,9 @@ namespace libtorrent { namespace dht } case messages::get_peers: { - if (m.peers.empty()) - { - write_nodes_entry(r, m); - } - else + write_nodes_entry(r, m); + + if (!m.peers.empty()) { r["values"] = entry(entry::list_t); entry& p = r["values"]; diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index fe2e7e5bc..50085345a 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -55,11 +55,13 @@ void find_data_observer::reply(msg const& m) return; } + if (!m.write_token.empty()) + m_algorithm->got_write_token(m.id, m.write_token); + if (!m.peers.empty()) - { m_algorithm->got_data(&m); - } - else + + if (!m.nodes.empty()) { for (msg::nodes_t::const_iterator i = m.nodes.begin() , end(m.nodes.end()); i != end; ++i) @@ -82,9 +84,12 @@ void find_data_observer::timeout() find_data::find_data( node_impl& node , node_id target - , done_callback const& callback) + , data_callback const& dcallback + , nodes_callback const& ncallback) : traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end()) - , m_done_callback(callback) + , m_data_callback(dcallback) + , m_nodes_callback(ncallback) + , m_target(target) , m_done(false) { boost::intrusive_ptr self(this); @@ -100,7 +105,7 @@ void find_data::invoke(node_id const& id, udp::endpoint addr) } TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); - observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id, m_target)); + observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif @@ -109,14 +114,26 @@ void find_data::invoke(node_id const& id, udp::endpoint addr) void find_data::got_data(msg const* m) { - m_done = true; - m_done_callback(m); + m_data_callback(m->peers); } void find_data::done() { if (m_invoke_count != 0) return; - if (!m_done) m_done_callback(0); + + std::vector > results; + int num_results = m_node.m_table.bucket_size(); + for (std::vector::iterator i = m_results.begin() + , end(m_results.end()); i != end && num_results > 0; ++i) + { + if (i->flags & result::no_id) continue; + if ((i->flags & result::queried) == 0) continue; + std::map::iterator j = m_write_tokens.find(i->id); + if (j == m_write_tokens.end()) continue; + results.push_back(std::make_pair(node_entry(i->id, i->addr), j->second)); + --num_results; + } + m_nodes_callback(results); } } } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index a9a705252..eb53ce47a 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -172,12 +172,14 @@ void node_impl::refresh(node_id const& id void node_impl::bootstrap(std::vector const& nodes , boost::function0 f) { +/* #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "bootrapping: " << nodes.size(); for (std::vector::const_iterator i = nodes.begin() , end(nodes.end()); i != end; ++i) TORRENT_LOG(node) << " " << *i; #endif +*/ std::vector start; start.reserve(nodes.size()); std::copy(nodes.begin(), nodes.end(), std::back_inserter(start)); @@ -255,29 +257,29 @@ void node_impl::incoming(msg const& m) namespace { - void announce_fun(std::vector const& v, rpc_manager& rpc - , int listen_port, sha1_hash const& ih - , boost::function const&, sha1_hash const&)> f) + void announce_fun(std::vector > const& v + , rpc_manager& rpc, int listen_port, sha1_hash const& ih) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << "announce response [ ih: " << ih + TORRENT_LOG(node) << "sending announce_peer [ ih: " << ih << " p: " << listen_port << " nodes: " << v.size() << " ]" ; #endif - bool nodes = false; - // only store on the first k nodes - for (std::vector::const_iterator i = v.begin() + + // store on the first k nodes + for (std::vector >::const_iterator i = v.begin() , end(v.end()); i != end; ++i) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->id)); + TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id)); #endif - observer_ptr o(new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f)); + + observer_ptr o(new (rpc.allocator().malloc()) announce_observer( + rpc.allocator(), ih, listen_port, i->second)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif - rpc.invoke(messages::get_peers, udp::endpoint(i->addr, i->port), o); - nodes = true; + rpc.invoke(messages::announce_peer, i->first.ep(), o); } } } @@ -302,15 +304,15 @@ void node_impl::add_node(udp::endpoint node) } void node_impl::announce(sha1_hash const& info_hash, int listen_port - , boost::function const&, sha1_hash const&)> f) + , boost::function const&)> f) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "announcing [ ih: " << info_hash << " p: " << listen_port << " ]" ; #endif - // search for nodes with ids close to id, and then invoke the - // get_peers and then announce_peer rpc on them. - new closest_nodes(*this, info_hash, boost::bind(&announce_fun, _1, boost::ref(m_rpc) - , listen_port, info_hash, f)); + // search for nodes with ids close to id or with peers + // for info-hash id. then send announce_peer to them. + new find_data(*this, info_hash, f, boost::bind(&announce_fun, _1, boost::ref(m_rpc) + , listen_port, info_hash)); } time_duration node_impl::refresh_timeout() @@ -443,7 +445,7 @@ bool node_impl::on_find(msg const& m, std::vector& peers) const random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint) , boost::make_transform_iterator(v.peers.end(), &get_endpoint) , std::back_inserter(peers), num); - +/* #ifdef TORRENT_DHT_VERBOSE_LOGGING for (std::vector::iterator i = peers.begin() , end(peers.end()); i != end; ++i) @@ -451,6 +453,7 @@ bool node_impl::on_find(msg const& m, std::vector& peers) const TORRENT_LOG(node) << " " << *i; } #endif +*/ return true; } @@ -471,26 +474,10 @@ void node_impl::incoming_request(msg const& m) reply.info_hash = m.info_hash; reply.write_token = generate_token(m); - if (!on_find(m, reply.peers)) - { - // we don't have any peers for this info_hash, - // return nodes instead - m_table.find_node(m.info_hash, reply.nodes, 0); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - for (std::vector::iterator i = reply.nodes.begin() - , end(reply.nodes.end()); i != end; ++i) - { - TORRENT_LOG(node) << " " << i->id << " " << i->ep(); - } -#endif - } - } - break; - case messages::find_node: - { - reply.info_hash = m.info_hash; - + on_find(m, reply.peers); + // always return nodes as well as peers m_table.find_node(m.info_hash, reply.nodes, 0); +/* #ifdef TORRENT_DHT_VERBOSE_LOGGING for (std::vector::iterator i = reply.nodes.begin() , end(reply.nodes.end()); i != end; ++i) @@ -498,6 +485,23 @@ void node_impl::incoming_request(msg const& m) TORRENT_LOG(node) << " " << i->id << " " << i->ep(); } #endif +*/ + } + break; + case messages::find_node: + { + reply.info_hash = m.info_hash; + + m_table.find_node(m.info_hash, reply.nodes, 0); +/* +#ifdef TORRENT_DHT_VERBOSE_LOGGING + for (std::vector::iterator i = reply.nodes.begin() + , end(reply.nodes.end()); i != end; ++i) + { + TORRENT_LOG(node) << " " << i->id << " " << i->ep(); + } +#endif +*/ } break; case messages::announce_peer: diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 4dcdb8793..5dc59729c 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -93,7 +93,6 @@ typedef mpl::vector< closest_nodes_observer , find_data_observer , announce_observer - , get_peers_observer , refresh_observer , ping_observer , null_observer @@ -117,6 +116,16 @@ rpc_manager::rpc_manager(fun const& f, node_id const& our_id , m_destructing(false) { std::srand(time(0)); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(rpc) << "Constructing"; + TORRENT_LOG(rpc) << " closest_nodes_observer: " << sizeof(closest_nodes_observer); + TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); + TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer); + TORRENT_LOG(rpc) << " refresh_observer: " << sizeof(refresh_observer); + TORRENT_LOG(rpc) << " ping_observer: " << sizeof(ping_observer); + TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); +#endif } rpc_manager::~rpc_manager()