added support for enumerating the internal DHT live nodes (#1712)

added support for enumerating the internal DHT live nodes
This commit is contained in:
Alden Torres 2017-02-18 23:02:23 -05:00 committed by Arvid Norberg
parent 9dd48e88a4
commit 1e98bf19a5
13 changed files with 219 additions and 40 deletions

View File

@ -1,3 +1,4 @@
* added support for retrieval of DHT live nodes
* complete UNC path support
* add packets pool allocator
* fix last_upload and last_download overflow after 9 hours in past

View File

@ -2510,11 +2510,35 @@ namespace libtorrent
aux::allocation_slot const m_msg_idx;
};
struct TORRENT_EXPORT dht_live_nodes_alert final : alert
{
dht_live_nodes_alert(aux::stack_allocator& alloc
, sha1_hash const& nid
, std::vector<std::pair<sha1_hash, udp::endpoint>> const& nodes);
TORRENT_DEFINE_ALERT(dht_live_nodes_alert, 91)
static const int static_category = alert::dht_notification;
virtual std::string message() const override;
sha1_hash const node_id;
int num_nodes() const;
std::vector<std::pair<sha1_hash, udp::endpoint>> nodes() const;
private:
std::reference_wrapper<aux::stack_allocator> m_alloc;
int m_v4_num_nodes = 0;
int m_v6_num_nodes = 0;
aux::allocation_slot m_v4_nodes_idx;
aux::allocation_slot m_v6_nodes_idx;
};
#undef TORRENT_DEFINE_ALERT_IMPL
#undef TORRENT_DEFINE_ALERT
#undef TORRENT_DEFINE_ALERT_PRIO
enum { num_alert_types = 91 }; // this enum represents "max_alert_index" + 1
enum { num_alert_types = 92 }; // this enum represents "max_alert_index" + 1
}
#endif

View File

@ -394,6 +394,8 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
void dht_live_nodes(sha1_hash const& nid);
void dht_direct_request(udp::endpoint ep, entry& e
, void* userdata = nullptr);

View File

@ -125,6 +125,8 @@ namespace libtorrent { namespace dht
void incoming_error(error_code const& ec, udp::endpoint const& ep);
bool incoming_packet(udp::endpoint const& ep, span<char const> buf);
std::vector<std::pair<node_id, udp::endpoint>> live_nodes(node_id const& nid);
private:
std::shared_ptr<dht_tracker> self()

View File

@ -224,14 +224,11 @@ public:
return int(i->live_nodes.size());
}
template <typename F>
void for_each_node(F f)
{
for_each_node(&impl::forwarder<F>, &impl::forwarder<F>, reinterpret_cast<void*>(&f));
}
void for_each_node(std::function<void(node_entry const&)> live_cb
, std::function<void(node_entry const&)> replacements_cb) const;
void for_each_node(void (*)(void*, node_entry const&)
, void (*)(void*, node_entry const&), void* userdata) const;
void for_each_node(std::function<void(node_entry const&)> f) const
{ for_each_node(f, f); }
int bucket_size() const { return m_bucket_size; }

View File

@ -442,6 +442,13 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
// Retrieve all the live DHT (identified by ``nid``) nodes. All the
// nodes id and endpoint will be returned in the list of nodes in the
// alert ``dht_live_nodes_alert``.
// Since this alert is a response to an explicit call, it will always be
// posted, regardless of the alert mask.
void dht_live_nodes(sha1_hash const& nid);
// Send an arbitrary DHT request directly to the specified endpoint. This
// function is intended for use by plugins. When a response is received
// or the request times out, a dht_direct_response_alert will be posted

View File

@ -2129,4 +2129,85 @@ namespace libtorrent
return buf;
}
dht_live_nodes_alert::dht_live_nodes_alert(aux::stack_allocator& alloc
, sha1_hash const& nid
, std::vector<std::pair<sha1_hash, udp::endpoint>> const& nodes)
: node_id(nid)
, m_alloc(alloc)
{
for (auto const& n : nodes)
{
if (n.second.protocol() == udp::v4())
m_v4_num_nodes++;
#if TORRENT_USE_IPV6
else
m_v6_num_nodes++;
#endif
}
m_v4_nodes_idx = alloc.allocate(m_v4_num_nodes * (20 + 6));
m_v6_nodes_idx = alloc.allocate(m_v6_num_nodes * (20 + 18));
char* v4_ptr = alloc.ptr(m_v4_nodes_idx);
#if TORRENT_USE_IPV6
char* v6_ptr = alloc.ptr(m_v6_nodes_idx);
#endif
for (auto const& n : nodes)
{
udp::endpoint const& endp = n.second;
if (endp.protocol() == udp::v4())
{
detail::write_string(n.first.to_string(), v4_ptr);
detail::write_endpoint(endp, v4_ptr);
}
#if TORRENT_USE_IPV6
else
{
detail::write_string(n.first.to_string(), v6_ptr);
detail::write_endpoint(endp, v6_ptr);
}
#endif
}
}
std::string dht_live_nodes_alert::message() const
{
char msg[200];
std::snprintf(msg, sizeof(msg), "dht live nodes for id: %s, nodes %d"
, aux::to_hex(node_id).c_str(), num_nodes());
return msg;
}
int dht_live_nodes_alert::num_nodes() const
{
return m_v4_num_nodes + m_v6_num_nodes;
}
std::vector<std::pair<sha1_hash, udp::endpoint>> dht_live_nodes_alert::nodes() const
{
aux::vector<std::pair<sha1_hash, udp::endpoint>> nodes;
nodes.reserve(num_nodes());
char const* v4_ptr = m_alloc.get().ptr(m_v4_nodes_idx);
for (int i = 0; i < m_v4_num_nodes; i++)
{
sha1_hash ih;
std::memcpy(ih.data(), v4_ptr, 20);
v4_ptr += 20;
nodes.emplace_back(ih, detail::read_v4_endpoint<udp::endpoint>(v4_ptr));
}
#if TORRENT_USE_IPV6
char const* v6_ptr = m_alloc.get().ptr(m_v6_nodes_idx);
for (int i = 0; i < m_v6_num_nodes; i++)
{
sha1_hash ih;
std::memcpy(ih.data(), v6_ptr, 20);
v6_ptr += 20;
nodes.emplace_back(ih, detail::read_v6_endpoint<udp::endpoint>(v6_ptr));
}
#endif
return nodes;
}
} // namespace libtorrent

View File

@ -536,19 +536,35 @@ namespace libtorrent { namespace dht
return true;
}
namespace {
void add_node_fun(void* userdata, node_entry const& e)
std::vector<std::pair<node_id, udp::endpoint>> dht_tracker::live_nodes(node_id const& nid)
{
auto v = static_cast<std::vector<udp::endpoint>*>(userdata);
v->push_back(e.ep());
std::vector<std::pair<node_id, udp::endpoint>> ret;
// TODO: figure out a better solution when multi-home is implemented
node const* dht = m_dht.nid() == nid ? &m_dht
#if TORRENT_USE_IPV6
: m_dht6.nid() == nid ? &m_dht6 : nullptr;
#else
: nullptr
#endif
if (dht == nullptr) return ret;
dht->m_table.for_each_node([&ret](node_entry const& e)
{ ret.emplace_back(e.id, e.endpoint); }, nullptr);
return ret;
}
namespace {
std::vector<udp::endpoint> save_nodes(node const& dht)
{
std::vector<udp::endpoint> ret;
// TODO: refactor for more use of lambda
dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &ret);
dht.m_table.for_each_node([&ret](node_entry const& e)
{ ret.push_back(e.ep()); });
return ret;
}

View File

@ -946,22 +946,20 @@ void routing_table::update_node_id(node_id const& id)
add_node(n);
}
void routing_table::for_each_node(
void (*fun1)(void*, node_entry const&)
, void (*fun2)(void*, node_entry const&)
, void* userdata) const
void routing_table::for_each_node(std::function<void(node_entry const&)> live_cb
, std::function<void(node_entry const&)> replacements_cb) const
{
for (auto const& i : m_buckets)
{
if (fun1)
if (live_cb)
{
for (auto const& j : i.live_nodes)
fun1(userdata, j);
live_cb(j);
}
if (fun2)
if (replacements_cb)
{
for (auto const& j : i.replacements)
fun2(userdata, j);
replacements_cb(j);
}
}
}

View File

@ -601,6 +601,15 @@ namespace libtorrent
#endif
}
void session_handle::dht_live_nodes(sha1_hash const& nid)
{
#ifndef TORRENT_DISABLE_DHT
async_call(&session_impl::dht_live_nodes, nid);
#else
TORRENT_UNUSED(nid);
#endif
}
void session_handle::dht_direct_request(udp::endpoint ep, entry const& e, void* userdata)
{
#ifndef TORRENT_DISABLE_DHT

View File

@ -5831,6 +5831,13 @@ namespace aux {
m_dht->announce(info_hash, port, flags, std::bind(&on_dht_get_peers, std::ref(m_alerts), info_hash, _1));
}
void session_impl::dht_live_nodes(sha1_hash const& nid)
{
if (!m_dht) return;
auto nodes = m_dht->live_nodes(nid);
m_alerts.emplace_alert<dht_live_nodes_alert>(nid, nodes);
}
void session_impl::dht_direct_request(udp::endpoint ep, entry& e, void* userdata)
{
if (!m_dht) return;

View File

@ -71,3 +71,46 @@ TORRENT_TEST(dht_get_peers_reply_alert)
std::sort(peers.begin(), peers.end());
TEST_CHECK(v == peers);
}
TORRENT_TEST(dht_live_nodes_alert)
{
alert_manager mgr(1, dht_live_nodes_alert::static_category);
TEST_EQUAL(mgr.should_post<dht_live_nodes_alert>(), true);
sha1_hash const ih = rand_hash();
sha1_hash const h1 = rand_hash();
sha1_hash const h2 = rand_hash();
sha1_hash const h3 = rand_hash();
sha1_hash const h4 = rand_hash();
sha1_hash const h5 = rand_hash();
udp::endpoint const ep1 = rand_udp_ep(rand_v4);
udp::endpoint const ep2 = rand_udp_ep(rand_v4);
udp::endpoint const ep3 = rand_udp_ep(rand_v4);
#if TORRENT_USE_IPV6
udp::endpoint const ep4 = rand_udp_ep(rand_v6);
udp::endpoint const ep5 = rand_udp_ep(rand_v6);
#else
udp::endpoint const ep4 = rand_udp_ep(rand_v4);
udp::endpoint const ep5 = rand_udp_ep(rand_v4);
#endif
std::vector<std::pair<sha1_hash, udp::endpoint>> v;
v.emplace_back(h1, ep1);
v.emplace_back(h2, ep2);
v.emplace_back(h3, ep3);
v.emplace_back(h4, ep4);
v.emplace_back(h5, ep5);
mgr.emplace_alert<dht_live_nodes_alert>(ih, v);
auto const* a = alert_cast<dht_live_nodes_alert>(mgr.wait_for_alert(seconds(0)));
TEST_CHECK(a != nullptr);
TEST_EQUAL(a->node_id, ih);
TEST_EQUAL(a->num_nodes(), 5);
auto nodes = a->nodes();
std::sort(v.begin(), v.end());
std::sort(nodes.begin(), nodes.end());
TEST_CHECK(v == nodes);
}

View File

@ -88,25 +88,22 @@ sequence_number next_seq(sequence_number s)
return sequence_number(s.value + 1);
}
void add_and_replace(libtorrent::dht::node_id& dst, libtorrent::dht::node_id const& add)
void add_and_replace(node_id& dst, node_id const& add)
{
bool carry = false;
for (int k = 19; k >= 0; --k)
{
int sum = dst[k] + add[k] + (carry?1:0);
int sum = dst[k] + add[k] + (carry ? 1 : 0);
dst[k] = sum & 255;
carry = sum > 255;
}
}
void node_push_back(void* userdata, libtorrent::dht::node_entry const& n)
void node_push_back(std::vector<node_entry>* nv, node_entry const& n)
{
using namespace libtorrent::dht;
std::vector<node_entry>* nv = (std::vector<node_entry>*)userdata;
nv->push_back(n);
}
void nop(void* userdata, libtorrent::dht::node_entry const& n) {}
void nop_node() {}
std::list<std::pair<udp::endpoint, entry>> g_sent_packets;
@ -1556,7 +1553,7 @@ void test_routing_table(address(&rand_addr)())
table.node_failed(tmp, udp::endpoint(node_addr, 4));
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
table.for_each_node(std::bind(node_push_back, &nodes, _1), nullptr);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
@ -1569,7 +1566,7 @@ void test_routing_table(address(&rand_addr)())
// add the exact same node again, it should set the timeout_count to 0
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
table.for_each_node(std::bind(node_push_back, &nodes, _1), nullptr);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
@ -1631,7 +1628,7 @@ void test_routing_table(address(&rand_addr)())
print_state(std::cout, table);
table.for_each_node(node_push_back, nop, &nodes);
table.for_each_node(std::bind(node_push_back, &nodes, _1), nullptr);
std::printf("nodes: %d\n", int(nodes.size()));
@ -1678,8 +1675,6 @@ void test_routing_table(address(&rand_addr)())
}
}
using namespace libtorrent::dht;
char const* ips[] = {
"124.31.75.21",
"21.75.31.124",
@ -2969,13 +2964,13 @@ TORRENT_TEST(routing_table_for_each)
print_state(std::cout, tbl);
std::vector<node_entry> v;
tbl.for_each_node(node_push_back, nop, &v);
tbl.for_each_node(std::bind(node_push_back, &v, _1), nullptr);
TEST_EQUAL(v.size(), 2);
v.clear();
tbl.for_each_node(nop, node_push_back, &v);
tbl.for_each_node(nullptr, std::bind(node_push_back, &v, _1));
TEST_EQUAL(v.size(), 2);
v.clear();
tbl.for_each_node(node_push_back, node_push_back, &v);
tbl.for_each_node(std::bind(node_push_back, &v, _1));
TEST_EQUAL(v.size(), 4);
}
@ -3300,9 +3295,6 @@ TORRENT_TEST(dht_verify_node_address)
TORRENT_TEST(generate_prefix_mask)
{
// test node-id functions
using namespace libtorrent::dht;
std::vector<std::pair<int, char const*>> const test = {
{ 0, "0000000000000000000000000000000000000000" },
{ 1, "8000000000000000000000000000000000000000" },