Improved DHT get_peers code. get_peers now always returns nodes, and announce_peer is always sent to the k closest nodes to the info-hash. This should significantly speed up retrieving peers from the DHT.

This commit is contained in:
Arvid Norberg 2008-12-23 20:04:12 +00:00
parent d323e9bdfd
commit 00d02f7859
9 changed files with 116 additions and 112 deletions

View File

@ -5,6 +5,16 @@ Mainline DHT extensions
libtorrent implements a few extensions to the Mainline DHT protocol. libtorrent implements a few extensions to the Mainline DHT protocol.
get_peers response
------------------
libtorrent always responds with ``nodes`` to a get_peers request. If it has
peers for the specified info-hash, it will return ``values`` as well. This is
because the fact that some peer announced to us doesn't mean that we are
among the 8 closest nodes to the info-hash. libtorrent also keeps traversing
nodes using get_peers until it has found the 8 closest ones, and then announces
to those nodes.
client identification client identification
--------------------- ---------------------

View File

@ -90,8 +90,7 @@ namespace libtorrent { namespace dht
entry state() const; entry state() const;
void announce(sha1_hash const& ih, int listen_port void announce(sha1_hash const& ih, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const& , boost::function<void(std::vector<tcp::endpoint> const&)> f);
, sha1_hash const&)> f);
void dht_status(session_status& s); void dht_status(session_status& s);
void network_stats(int& sent, int& received); void network_stats(int& sent, int& received);

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define FIND_DATA_050323_HPP #define FIND_DATA_050323_HPP
#include <vector> #include <vector>
#include <map>
#include <libtorrent/kademlia/traversal_algorithm.hpp> #include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
@ -58,22 +59,29 @@ class node_impl;
class find_data : public traversal_algorithm class find_data : public traversal_algorithm
{ {
public: public:
typedef boost::function<void(msg const*)> done_callback; typedef boost::function<void(std::vector<tcp::endpoint> const&)> data_callback;
typedef boost::function<void(std::vector<std::pair<node_entry, std::string> > const&)> nodes_callback;
void got_data(msg const* m); void got_data(msg const* m);
void got_write_token(node_id const& n, std::string const& write_token)
{ m_write_tokens[n] = write_token; }
find_data(node_impl& node, node_id target find_data(node_impl& node, node_id target
, done_callback const& callback); , data_callback const& dcallback
, nodes_callback const& ncallback);
virtual char const* name() const { return "get_peers"; } virtual char const* name() const { return "get_peers"; }
node_id const target() const { return m_target; }
private: private:
void done(); void done();
void invoke(node_id const& id, udp::endpoint addr); void invoke(node_id const& id, udp::endpoint addr);
done_callback m_done_callback; data_callback m_data_callback;
boost::shared_ptr<packet_t> m_packet; nodes_callback m_nodes_callback;
std::map<node_id, std::string> m_write_tokens;
node_id const m_target;
bool m_done; bool m_done;
}; };
@ -82,11 +90,9 @@ class find_data_observer : public observer
public: public:
find_data_observer( find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm boost::intrusive_ptr<find_data> const& algorithm
, node_id self , node_id self)
, node_id target)
: observer(algorithm->allocator()) : observer(algorithm->allocator())
, m_algorithm(algorithm) , m_algorithm(algorithm)
, m_target(target)
, m_self(self) , m_self(self)
{} {}
~find_data_observer(); ~find_data_observer();
@ -95,7 +101,7 @@ public:
{ {
m.reply = false; m.reply = false;
m.message_id = messages::get_peers; m.message_id = messages::get_peers;
m.info_hash = m_target; m.info_hash = m_algorithm->target();
} }
void timeout(); void timeout();
@ -104,7 +110,6 @@ public:
private: private:
boost::intrusive_ptr<find_data> m_algorithm; boost::intrusive_ptr<find_data> m_algorithm;
node_id const m_target;
node_id const m_self; node_id const m_self;
}; };

View File

@ -125,46 +125,6 @@ private:
std::string m_token; std::string m_token;
}; };
// RPC observer that chains an announce_peer request onto a reply.
// When reply() is called it sends announce_peer (using the write token
// from the reply) back to the replying node, and then passes the peers
// carried by the reply to the user-supplied callback.
// NOTE(review): presumably attached to outgoing get_peers requests --
// confirm against the call site that constructs this observer.
class get_peers_observer : public observer
{
public:
// info_hash:   the info-hash written into outgoing messages and passed
//              on to the announce_observer and the callback
// listen_port: the port written into outgoing messages and passed on to
//              the announce_observer
// rpc:         rpc_manager used for allocating and invoking the follow-up
//              announce_peer request; held by reference
// f:           callback invoked from reply() with the reply's peers and
//              the info-hash
get_peers_observer(sha1_hash const& info_hash
, int listen_port
, rpc_manager& rpc
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
: observer(rpc.allocator())
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_rpc(rpc)
, m_fun(f)
{}
// fills in the port and info-hash fields of the outgoing message
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
}
// timeouts are ignored; nothing is announced and the callback is not called
void timeout() {}
// handles a reply: announces to the replying node, then reports its peers
void reply(msg const& r)
{
// the announce_observer is placement-constructed in memory obtained
// from the rpc manager's allocator, and is given the write token the
// replying node handed out
observer_ptr o(new (m_rpc.allocator().malloc()) announce_observer(
m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::announce_peer, r.addr, o);
// hand the peers from the reply to the callback (called even if
// r.peers is empty; there is no check here)
m_fun(r.peers, m_info_hash);
}
// aborting is a no-op
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
// reference, not owned
rpc_manager& m_rpc;
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
class node_impl : boost::noncopyable class node_impl : boost::noncopyable
{ {
typedef std::map<node_id, torrent_entry> table_t; typedef std::map<node_id, torrent_entry> table_t;
@ -211,8 +171,7 @@ public:
#endif #endif
void announce(sha1_hash const& info_hash, int listen_port void announce(sha1_hash const& info_hash, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const& , boost::function<void(std::vector<tcp::endpoint> const&)> f);
, sha1_hash const&)> f);
bool verify_token(msg const& m); bool verify_token(msg const& m);
std::string generate_token(msg const& m); std::string generate_token(msg const& m);

View File

@ -42,11 +42,11 @@ replot
gnuplot_scripts += [name] gnuplot_scripts += [name]
gen_stats_gnuplot('dht_routing_table_size', 'nodes', ['active nodes','passive nodes']) gen_stats_gnuplot('dht_routing_table_size', 'nodes', ['active nodes','passive nodes'])
gen_stats_gnuplot('dht_tracker_table_size', '', ['num torrents']) gen_stats_gnuplot('dht_tracker_table_size', '', ['num torrents', 'num peers'])
gen_stats_gnuplot('dht_announces', 'number per minute', ['announces per min', 'failed announces per min']) gen_stats_gnuplot('dht_announces', 'messages per minute', ['announces per min', 'failed announces per min'])
gen_stats_gnuplot('dht_clients', 'number per minute', ['total msgs per min', 'az msgs per min', 'ut msgs per min', 'lt msgs per min', 'mp msgs per min', 'gr msgs per min']) gen_stats_gnuplot('dht_clients', 'messages per minute', ['total msgs per min', 'az msgs per min', 'ut msgs per min', 'lt msgs per min', 'mp msgs per min', 'gr msgs per min'])
gen_stats_gnuplot('dht_rate', 'bytes per second', ['bytes in per sec', 'bytes out per sec']) gen_stats_gnuplot('dht_rate', 'bytes per second', ['bytes in per sec', 'bytes out per sec'])
gen_stats_gnuplot('dht_errors', '', ['error replies sent', 'error queries recvd']) gen_stats_gnuplot('dht_errors', 'messages per minute', ['error replies sent', 'error queries recvd'])
for i in gnuplot_scripts: for i in gnuplot_scripts:
os.system('gnuplot %s.gnuplot' % i); os.system('gnuplot %s.gnuplot' % i);

View File

@ -207,7 +207,7 @@ namespace libtorrent { namespace dht
// turns on and off individual components' logging // turns on and off individual components' logging
rpc_log().enable(false); rpc_log().enable(false);
node_log().enable(false); // node_log().enable(false);
traversal_log().enable(false); traversal_log().enable(false);
// dht_tracker_log.enable(false); // dht_tracker_log.enable(false);
@ -325,7 +325,7 @@ namespace libtorrent { namespace dht
m_last_new_key = now; m_last_new_key = now;
m_dht.new_write_key(); m_dht.new_write_key();
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string() << " new write key"; TORRENT_LOG(dht_tracker) << " *** new write key";
#endif #endif
} }
@ -410,8 +410,7 @@ namespace libtorrent { namespace dht
} }
void dht_tracker::announce(sha1_hash const& ih, int listen_port void dht_tracker::announce(sha1_hash const& ih, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const& , boost::function<void(std::vector<tcp::endpoint> const&)> f)
, sha1_hash const&)> f)
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_mutex);
m_dht.announce(ih, listen_port, f); m_dht.announce(ih, listen_port, f);
@ -609,6 +608,9 @@ namespace libtorrent { namespace dht
std::copy(id->string_ptr(), id->string_ptr() std::copy(id->string_ptr(), id->string_ptr()
+ id->string_length(), m.id.begin()); + id->string_length(), m.id.begin());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " id: " << m.id;
#endif
lazy_entry const* n = r->dict_find_list("values"); lazy_entry const* n = r->dict_find_list("values");
if (n) if (n)
{ {
@ -804,6 +806,7 @@ namespace libtorrent { namespace dht
m.write_token = token->string_value(); m.write_token = token->string_value();
m.message_id = libtorrent::dht::messages::announce_peer; m.message_id = libtorrent::dht::messages::announce_peer;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(m.write_token);
log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash); log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash);
log_line << " p: " << m.port; log_line << " p: " << m.port;
@ -1037,11 +1040,9 @@ namespace libtorrent { namespace dht
} }
case messages::get_peers: case messages::get_peers:
{ {
if (m.peers.empty()) write_nodes_entry(r, m);
{
write_nodes_entry(r, m); if (!m.peers.empty())
}
else
{ {
r["values"] = entry(entry::list_t); r["values"] = entry(entry::list_t);
entry& p = r["values"]; entry& p = r["values"];

View File

@ -55,11 +55,13 @@ void find_data_observer::reply(msg const& m)
return; return;
} }
if (!m.write_token.empty())
m_algorithm->got_write_token(m.id, m.write_token);
if (!m.peers.empty()) if (!m.peers.empty())
{
m_algorithm->got_data(&m); m_algorithm->got_data(&m);
}
else if (!m.nodes.empty())
{ {
for (msg::nodes_t::const_iterator i = m.nodes.begin() for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i) , end(m.nodes.end()); i != end; ++i)
@ -82,9 +84,12 @@ void find_data_observer::timeout()
find_data::find_data( find_data::find_data(
node_impl& node node_impl& node
, node_id target , node_id target
, done_callback const& callback) , data_callback const& dcallback
, nodes_callback const& ncallback)
: traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end()) : traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end())
, m_done_callback(callback) , m_data_callback(dcallback)
, m_nodes_callback(ncallback)
, m_target(target)
, m_done(false) , m_done(false)
{ {
boost::intrusive_ptr<find_data> self(this); boost::intrusive_ptr<find_data> self(this);
@ -100,7 +105,7 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
} }
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id, m_target)); observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif
@ -109,14 +114,26 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
void find_data::got_data(msg const* m) void find_data::got_data(msg const* m)
{ {
m_done = true; m_data_callback(m->peers);
m_done_callback(m);
} }
void find_data::done() void find_data::done()
{ {
if (m_invoke_count != 0) return; if (m_invoke_count != 0) return;
if (!m_done) m_done_callback(0);
std::vector<std::pair<node_entry, std::string> > results;
int num_results = m_node.m_table.bucket_size();
for (std::vector<result>::iterator i = m_results.begin()
, end(m_results.end()); i != end && num_results > 0; ++i)
{
if (i->flags & result::no_id) continue;
if ((i->flags & result::queried) == 0) continue;
std::map<node_id, std::string>::iterator j = m_write_tokens.find(i->id);
if (j == m_write_tokens.end()) continue;
results.push_back(std::make_pair(node_entry(i->id, i->addr), j->second));
--num_results;
}
m_nodes_callback(results);
} }
} } // namespace libtorrent::dht } } // namespace libtorrent::dht

View File

@ -172,12 +172,14 @@ void node_impl::refresh(node_id const& id
void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
, boost::function0<void> f) , boost::function0<void> f)
{ {
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "bootrapping: " << nodes.size(); TORRENT_LOG(node) << "bootrapping: " << nodes.size();
for (std::vector<udp::endpoint>::const_iterator i = nodes.begin() for (std::vector<udp::endpoint>::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i) , end(nodes.end()); i != end; ++i)
TORRENT_LOG(node) << " " << *i; TORRENT_LOG(node) << " " << *i;
#endif #endif
*/
std::vector<node_entry> start; std::vector<node_entry> start;
start.reserve(nodes.size()); start.reserve(nodes.size());
std::copy(nodes.begin(), nodes.end(), std::back_inserter(start)); std::copy(nodes.begin(), nodes.end(), std::back_inserter(start));
@ -255,29 +257,29 @@ void node_impl::incoming(msg const& m)
namespace namespace
{ {
void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc void announce_fun(std::vector<std::pair<node_entry, std::string> > const& v
, int listen_port, sha1_hash const& ih , rpc_manager& rpc, int listen_port, sha1_hash const& ih)
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "announce response [ ih: " << ih TORRENT_LOG(node) << "sending announce_peer [ ih: " << ih
<< " p: " << listen_port << " p: " << listen_port
<< " nodes: " << v.size() << " ]" ; << " nodes: " << v.size() << " ]" ;
#endif #endif
bool nodes = false;
// only store on the first k nodes // store on the first k nodes
for (std::vector<node_entry>::const_iterator i = v.begin() for (std::vector<std::pair<node_entry, std::string> >::const_iterator i = v.begin()
, end(v.end()); i != end; ++i) , end(v.end()); i != end; ++i)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->id)); TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id));
#endif #endif
observer_ptr o(new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f));
observer_ptr o(new (rpc.allocator().malloc()) announce_observer(
rpc.allocator(), ih, listen_port, i->second));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif
rpc.invoke(messages::get_peers, udp::endpoint(i->addr, i->port), o); rpc.invoke(messages::announce_peer, i->first.ep(), o);
nodes = true;
} }
} }
} }
@ -302,15 +304,15 @@ void node_impl::add_node(udp::endpoint node)
} }
void node_impl::announce(sha1_hash const& info_hash, int listen_port void node_impl::announce(sha1_hash const& info_hash, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f) , boost::function<void(std::vector<tcp::endpoint> const&)> f)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "announcing [ ih: " << info_hash << " p: " << listen_port << " ]" ; TORRENT_LOG(node) << "announcing [ ih: " << info_hash << " p: " << listen_port << " ]" ;
#endif #endif
// search for nodes with ids close to id, and then invoke the // search for nodes with ids close to id or with peers
// get_peers and then announce_peer rpc on them. // for info-hash id. then send announce_peer to them.
new closest_nodes(*this, info_hash, boost::bind(&announce_fun, _1, boost::ref(m_rpc) new find_data(*this, info_hash, f, boost::bind(&announce_fun, _1, boost::ref(m_rpc)
, listen_port, info_hash, f)); , listen_port, info_hash));
} }
time_duration node_impl::refresh_timeout() time_duration node_impl::refresh_timeout()
@ -443,7 +445,7 @@ bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint) random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint)
, boost::make_transform_iterator(v.peers.end(), &get_endpoint) , boost::make_transform_iterator(v.peers.end(), &get_endpoint)
, std::back_inserter(peers), num); , std::back_inserter(peers), num);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<tcp::endpoint>::iterator i = peers.begin() for (std::vector<tcp::endpoint>::iterator i = peers.begin()
, end(peers.end()); i != end; ++i) , end(peers.end()); i != end; ++i)
@ -451,6 +453,7 @@ bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
TORRENT_LOG(node) << " " << *i; TORRENT_LOG(node) << " " << *i;
} }
#endif #endif
*/
return true; return true;
} }
@ -471,26 +474,10 @@ void node_impl::incoming_request(msg const& m)
reply.info_hash = m.info_hash; reply.info_hash = m.info_hash;
reply.write_token = generate_token(m); reply.write_token = generate_token(m);
if (!on_find(m, reply.peers)) on_find(m, reply.peers);
{ // always return nodes as well as peers
// we don't have any peers for this info_hash,
// return nodes instead
m_table.find_node(m.info_hash, reply.nodes, 0);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
{
TORRENT_LOG(node) << " " << i->id << " " << i->ep();
}
#endif
}
}
break;
case messages::find_node:
{
reply.info_hash = m.info_hash;
m_table.find_node(m.info_hash, reply.nodes, 0); m_table.find_node(m.info_hash, reply.nodes, 0);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin() for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i) , end(reply.nodes.end()); i != end; ++i)
@ -498,6 +485,23 @@ void node_impl::incoming_request(msg const& m)
TORRENT_LOG(node) << " " << i->id << " " << i->ep(); TORRENT_LOG(node) << " " << i->id << " " << i->ep();
} }
#endif #endif
*/
}
break;
case messages::find_node:
{
reply.info_hash = m.info_hash;
m_table.find_node(m.info_hash, reply.nodes, 0);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
{
TORRENT_LOG(node) << " " << i->id << " " << i->ep();
}
#endif
*/
} }
break; break;
case messages::announce_peer: case messages::announce_peer:

View File

@ -93,7 +93,6 @@ typedef mpl::vector<
closest_nodes_observer closest_nodes_observer
, find_data_observer , find_data_observer
, announce_observer , announce_observer
, get_peers_observer
, refresh_observer , refresh_observer
, ping_observer , ping_observer
, null_observer , null_observer
@ -117,6 +116,16 @@ rpc_manager::rpc_manager(fun const& f, node_id const& our_id
, m_destructing(false) , m_destructing(false)
{ {
std::srand(time(0)); std::srand(time(0));
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Constructing";
TORRENT_LOG(rpc) << " closest_nodes_observer: " << sizeof(closest_nodes_observer);
TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer);
TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer);
TORRENT_LOG(rpc) << " refresh_observer: " << sizeof(refresh_observer);
TORRENT_LOG(rpc) << " ping_observer: " << sizeof(ping_observer);
TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer);
#endif
} }
rpc_manager::~rpc_manager() rpc_manager::~rpc_manager()