Merge pull request #371 from arvidn/dht-set-nodeid

improve DHT bootstrap performance
This commit is contained in:
Arvid Norberg 2016-01-12 18:45:56 -05:00
commit ceed5329f4
12 changed files with 155 additions and 27 deletions

View File

@ -1,3 +1,5 @@
* improve DHT performance when changing external IP (primarily affects
bootstrapping).
* add feature to stop torrents immediately after checking files is done
* make all non-auto managed torrents exempt from queuing logic, including
checking torrents.

View File

@ -81,6 +81,10 @@ namespace libtorrent { namespace dht
, find_data::nodes_callback const& f);
void stop();
// tell the node to recalculate its node id based on the current
// understanding of its external address (which may have changed)
void update_node_id();
void add_node(udp::endpoint node);
void add_router_node(udp::endpoint const& node);

View File

@ -104,7 +104,9 @@ public:
, dht_observer* observer, counters& cnt
, dht_storage_constructor_type storage_constructor = dht_default_storage_constructor);
virtual ~node();
~node();
void update_node_id();
void tick();
void bootstrap(std::vector<udp::endpoint> const& nodes

View File

@ -83,6 +83,16 @@ struct routing_table_node
// bucket has failed, then it is put in the replacement
// cache (just like in the paper).
namespace impl
{
template <typename F>
inline void forwarder(void* userdata, node_entry const& node)
{
F* f = reinterpret_cast<F*>(userdata);
(*f)(node);
}
}
class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable
{
public:
@ -132,6 +142,10 @@ public:
// the node will be ignored.
void heard_about(node_id const& id, udp::endpoint const& ep);
// change our node ID. This can be expensive since nodes must be moved around
// and potentially dropped
void update_node_id(node_id id);
node_entry const* next_refresh();
enum
@ -157,6 +171,12 @@ public:
return int(i->live_nodes.size());
}
template <typename F>
void for_each_node(F f)
{
for_each_node(&impl::forwarder<F>, &impl::forwarder<F>, reinterpret_cast<void*>(&f));
}
void for_each_node(void (*)(void*, node_entry const&)
, void (*)(void*, node_entry const&), void* userdata) const;

View File

@ -113,8 +113,8 @@ TORRENT_TEST(dht_bootstrap)
if (ticks > 2)
{
printf("depth: %d nodes: %d\n", routing_table_depth, num_nodes);
TEST_CHECK(routing_table_depth >= 8);
TEST_CHECK(num_nodes >= 74);
TEST_CHECK(routing_table_depth >= 9);
TEST_CHECK(num_nodes >= 115);
dht.stop();
return true;
}

View File

@ -113,8 +113,13 @@ namespace libtorrent { namespace dht
dht_tracker::~dht_tracker() {}
void dht_tracker::update_node_id()
{
m_dht.update_node_id();
}
// defined in node.cpp
extern void nop();
void nop();
void dht_tracker::start(entry const& bootstrap
, find_data::nodes_callback const& f)

View File

@ -115,8 +115,28 @@ node::node(udp_socket_interface* sock
TORRENT_ASSERT(m_storage.get() != NULL);
}
node::~node()
node::~node() {}
void node::update_node_id()
{
// if we don't have an observer, we can't ask for the external IP (and our
// current node ID is likely not generated from an external address), so we
// can just stop here in that case.
if (!m_observer) return;
// it's possible that our external address hasn't actually changed. If our
// current ID is still valid, don't do anything.
if (verify_id(m_id, m_observer->external_address()))
return;
#ifndef TORRENT_DISABLE_LOGGING
if (m_observer) m_observer->log(dht_logger::node
, "updating node ID (because external IP address changed)");
#endif
m_id = generate_id(m_observer->external_address());
m_table.update_node_id(m_id);
}
bool node::verify_token(std::string const& token, char const* info_hash

View File

@ -58,10 +58,15 @@ bool bootstrap::invoke(observer_ptr o)
entry& a = e["a"];
e["q"] = "get_peers";
a["info_hash"] = target().to_string();
// in case our node id changes during the bootstrap, make sure to always use
// the current node id (rather than the target stored in the traversal
// algorithm)
node_id target = get_node().nid();
make_id_secret(target);
a["info_hash"] = target.to_string();
// e["q"] = "find_node";
// a["target"] = target().to_string();
// a["target"] = target.to_string();
m_node.stats_counters().inc_stats_counter(counters::dht_get_peers_out);
return m_node.m_rpc.invoke(e, o->target_ep(), o);
}

View File

@ -1024,6 +1024,38 @@ void routing_table::split_bucket()
}
}
void routing_table::update_node_id(node_id id)
{
m_id = id;
m_ips.clear();
// pull all nodes out of the routing table, effectively emptying it
table_t old_buckets;
old_buckets.swap(m_buckets);
// then add them all back. First add the main nodes, then the replacement
// nodes
for (int i = 0; i < old_buckets.size(); ++i)
{
bucket_t const& bucket = old_buckets[i].live_nodes;
for (int j = 0; j < bucket.size(); ++j)
{
add_node(bucket[j]);
}
}
// now add back the replacement nodes
for (int i = 0; i < old_buckets.size(); ++i)
{
bucket_t const& bucket = old_buckets[i].replacements;
for (int j = 0; j < bucket.size(); ++j)
{
add_node(bucket[j]);
}
}
}
void routing_table::for_each_node(
void (*fun1)(void*, node_entry const&)
, void (*fun2)(void*, node_entry const&)

View File

@ -180,7 +180,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&o->id()[0]), 20, hex_id);
get_node().observer()->log(dht_logger::traversal
, "[%p] IGNORING result id: %s addr: %s type: %s"
, "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s"
, static_cast<void*>(this), hex_id, print_address(o->target_addr()).c_str(), name());
}
#endif

View File

@ -6736,25 +6736,7 @@ retry:
// restart the DHT with a new node ID
#ifndef TORRENT_DISABLE_DHT
// TODO: 1 we only need to do this if our global IPv4 address has changed
// since the DHT (currently) only supports IPv4. Since restarting the DHT
// is kind of expensive, it would be nice to not do it unnecessarily
if (m_dht)
{
// TODO: 3 instead of restarting the whole DHT, change the external IP,
// node ID and re-jiggle the routing table in-place. A complete restart
// throws away all outstanding requests, which may be significant
// during bootstrap
entry s = m_dht->state();
int cur_state = 0;
int prev_state = 0;
entry* nodes1 = s.find_key("nodes");
if (nodes1 && nodes1->type() == entry::list_t) cur_state = nodes1->list().size();
entry* nodes2 = m_dht_state.find_key("nodes");
if (nodes2 && nodes2->type() == entry::list_t) prev_state = nodes2->list().size();
if (cur_state > prev_state) m_dht_state = s;
start_dht(m_dht_state);
}
if (m_dht) m_dht->update_node_id();
#endif
}

View File

@ -2346,6 +2346,62 @@ TORRENT_TEST(routing_table_extended)
#endif
}
void inserter(std::set<node_id>* nodes, node_entry const& ne)
{
nodes->insert(nodes->begin(), ne.id);
}
TORRENT_TEST(routing_table_set_id)
{
dht_settings sett = test_settings();
sett.enforce_node_id = false;
sett.extended_routing_table = false;
obs observer;
node_id id = to_hash("0000000000000000000000000000000000000000");
// we can't add the nodes in straight 0,1,2,3 order. That way the routing
// table would get unbalanced and intermediate nodes would be dropped
std::vector<boost::uint8_t> node_id_prefix;
node_id_prefix.reserve(256);
for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i);
std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end());
routing_table tbl(id, 8, sett, &observer);
for (int i = 0; i < 256; ++i)
{
id[0] = node_id_prefix[i];
tbl.node_seen(id, rand_udp_ep(), 20 + (id[19] & 0xff));
}
TEST_EQUAL(tbl.num_active_buckets(), 6);
std::set<node_id> original_nodes;
tbl.for_each_node(boost::bind(&inserter, &original_nodes, _1));
#if defined TORRENT_DEBUG
tbl.print_state(std::cerr);
#endif
id = to_hash("ffffffffffffffffffffffffffffffffffffffff");
tbl.update_node_id(id);
TEST_CHECK(tbl.num_active_buckets() <= 4);
std::set<node_id> remaining_nodes;
tbl.for_each_node(boost::bind(&inserter, &remaining_nodes, _1));
std::set<node_id> intersection;
std::set_intersection(remaining_nodes.begin(), remaining_nodes.end()
, original_nodes.begin(), original_nodes.end()
, std::inserter(intersection, intersection.begin()));
// all remaining nodes also exist in the original nodes
TEST_EQUAL(intersection.size(), remaining_nodes.size());
#if defined TORRENT_DEBUG
tbl.print_state(std::cerr);
#endif
}
TORRENT_TEST(read_only_node)
{
dht_settings sett = test_settings();