From 2abd9867cea6ca2fd6cca7877147751814087b1d Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Fri, 13 Nov 2015 21:08:57 -0800 Subject: [PATCH] IPv6 DHT support Fixes #110 --- ChangeLog | 1 + include/libtorrent/kademlia/dht_tracker.hpp | 10 +- include/libtorrent/kademlia/node.hpp | 27 +- include/libtorrent/kademlia/routing_table.hpp | 24 +- .../kademlia/traversal_algorithm.hpp | 7 +- simulation/setup_dht.cpp | 14 +- simulation/setup_swarm.cpp | 6 +- simulation/test_dht.cpp | 4 + simulation/test_dht_rate_limit.cpp | 9 +- src/kademlia/dht_tracker.cpp | 254 +++++++++++++++--- src/kademlia/get_peers.cpp | 3 +- src/kademlia/node.cpp | 98 +++++-- src/kademlia/routing_table.cpp | 45 +++- src/kademlia/rpc_manager.cpp | 13 +- src/kademlia/traversal_algorithm.cpp | 68 +++-- src/session_impl.cpp | 14 +- test/test_dht.cpp | 254 +++++++++++++++--- 17 files changed, 696 insertions(+), 155 deletions(-) diff --git a/ChangeLog b/ChangeLog index 251f4d53f..f7c3627b3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -101,6 +101,7 @@ * added support for asynchronous disk I/O * almost completely changed the storage interface (for custom storage) * added support for hashing pieces in multiple threads + * added support for BEP 32, "IPv6 extension for DHT" * fix division by zero in super-seeding logic diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index f0be60acd..d2f44cf82 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -142,7 +142,7 @@ namespace libtorrent { namespace dht boost::shared_ptr self() { return shared_from_this(); } - void connection_timeout(error_code const& e); + void connection_timeout(node& n, error_code const& e); void refresh_timeout(error_code const& e); void refresh_key(error_code const& e); @@ -157,6 +157,9 @@ namespace libtorrent { namespace dht counters& m_counters; node m_dht; +#if TORRENT_USE_IPV6 + node m_dht6; +#endif send_fun_t m_send_fun; dht_logger* m_log; @@ -165,9 +168,14 @@ namespace libtorrent { namespace dht deadline_timer m_key_refresh_timer; deadline_timer m_connection_timer; +#if TORRENT_USE_IPV6 + deadline_timer m_connection_timer6; +#endif deadline_timer m_refresh_timer; dht_settings const& m_settings; + std::map m_nodes; + bool m_abort; // used to resolve hostnames for nodes diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 8fa0ace9b..046a52cb3 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -73,6 +73,9 @@ namespace libtorrent { namespace dht struct traversal_algorithm; struct dht_observer; +extern char const* address_type_names[num_address_type]; +extern char const* address_type_keys[num_address_type]; + void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes); struct null_type {}; @@ -99,9 +102,10 @@ protected: class TORRENT_EXTRA_EXPORT node : boost::noncopyable { public: - node(udp_socket_interface* sock + node(address_type at, udp_socket_interface* sock , libtorrent::dht_settings const& settings, node_id nid , dht_observer* observer, counters& cnt + , std::map const& nodes , dht_storage_constructor_type storage_constructor = dht_default_storage_constructor); ~node(); @@ -204,6 +208,21 @@ public: counters& stats_counters() const { return m_counters; } dht_observer* observer() const { return m_observer; } + + address_type native_address_type() { return m_address_type; } + char const* native_address_name() { return address_type_names[m_address_type]; } + char const* native_nodes_key() { return address_type_keys[m_address_type]; } + + bool native_address(udp::endpoint ep) const + { return native_address(ep.address()); } + bool native_address(tcp::endpoint ep) const + { return native_address(ep.address()); } + bool native_address(address addr) const + { + return (addr.is_v4() && m_address_type == ipv4) + || (addr.is_v6() && m_address_type == ipv6); + } + private: void send_single_refresh(udp::endpoint const& ep, int bucket @@ -224,15 +243,21 @@ private: void incoming_request(msg const& h, entry& e); + void write_nodes_entries(sha1_hash const& info_hash + , bdecode_node const& want, entry& r); + node_id m_id; public: routing_table m_table; rpc_manager m_rpc; + std::map const& m_nodes; private: dht_observer* m_observer; + address_type m_address_type; + time_point m_last_tracker_tick; // the last time we issued a bootstrap or a refresh on our own ID, to expand diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index e4d38fb7a..c0d2e1b4a 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -79,15 +79,29 @@ struct ip_set size_t count(address addr); void erase(address addr); + void clear() + { + m_ip4s.clear(); +#if TORRENT_USE_IPV6 + m_ip6s.clear(); +#endif + } + bool operator==(ip_set const& rh) { +#if TORRENT_USE_IPV6 return m_ip4s == rh.m_ip4s && m_ip6s == rh.m_ip6s; +#else + return m_ip4s == rh.m_ip4s; +#endif } // these must be multisets because there can be multiple routing table // entries for a single IP when restrict_routing_ips is set to false boost::unordered_multiset m_ip4s; +#if TORRENT_USE_IPV6 boost::unordered_multiset m_ip6s; +#endif }; // differences in the implementation from the description in @@ -118,7 +132,8 @@ public: // Perhaps replacement nodes should be in a separate vector. typedef std::vector table_t; - routing_table(node_id const& id, int bucket_size + routing_table(node_id const& id, address_type at + , int bucket_size , dht_settings const& settings , dht_logger* log); @@ -229,6 +244,12 @@ public: bool is_full(int bucket) const; + bool native_address(address addr) const + { + return (addr.is_v4() && m_address_type == ipv4) + || (addr.is_v6() && m_address_type == ipv6); + } + private: #ifndef TORRENT_DISABLE_LOGGING @@ -256,6 +277,7 @@ private: table_t m_buckets; node_id m_id; // our own node id + address_type m_address_type; // address type to be stored // the last seen depth (i.e. levels in the routing table) // it's mutable because it's updated by depth(), which is const diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index d04828619..48ff6a433 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -113,7 +113,7 @@ protected: delete p; } - node & m_node; + node& m_node; std::vector m_results; node_id const m_target; boost::uint16_t m_ref_count; @@ -124,8 +124,9 @@ protected: // the IP addresses of the nodes in m_results std::set m_peer4_prefixes; -// no IPv6 support yet anyway -// std::set m_peer6_prefixes; +#if TORRENT_USE_IPV6 + std::set m_peer6_prefixes; +#endif }; struct traversal_observer : observer diff --git a/simulation/setup_dht.cpp b/simulation/setup_dht.cpp index 6af902673..1aca24861 100644 --- a/simulation/setup_dht.cpp +++ b/simulation/setup_dht.cpp @@ -84,12 +84,12 @@ struct dht_node final : lt::dht::udp_socket_interface : m_io_service(sim, addr_from_int(idx)) #if LIBSIMULATOR_USE_MOVE , m_socket(m_io_service) - , m_dht(this, sett, id_from_addr(m_io_service.get_ips().front()) - , nullptr, cnt) + , m_dht(ipv4, this, sett, id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt, std::map()) #else , m_socket(new asio::ip::udp::socket(m_io_service)) - , m_dht(new lt::dht::node(this, sett, id_from_addr(m_io_service.get_ips().front()) - , nullptr, cnt)) + , m_dht(new lt::dht::node(ipv4, this, sett, id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt, std::map())) #endif , m_add_dead_nodes(flags & add_dead_nodes) { @@ -99,7 +99,6 @@ struct dht_node final : lt::dht::udp_socket_interface udp::socket::non_blocking_io ioc(true); sock().io_control(ioc); - sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer)) , m_ep, boost::bind(&dht_node::on_read, this, _1, _2)); } @@ -117,8 +116,9 @@ struct dht_node final : lt::dht::udp_socket_interface // reserving space in the vector before emplacing any nodes). dht_node(dht_node&& n) noexcept : m_socket(std::move(n.m_socket)) - , m_dht(this, n.m_dht.settings(), n.m_dht.nid() - , n.m_dht.observer(), n.m_dht.stats_counters()) + , m_dht(ipv4, this, n.m_dht.settings(), n.m_dht.nid() + , n.m_dht.observer(), n.m_dht.stats_counters() + , std::map()) { assert(false && "dht_node is not movable"); throw std::runtime_error("dht_node is not movable"); diff --git a/simulation/setup_swarm.cpp b/simulation/setup_swarm.cpp index 7eca2fbd0..2c808d922 100644 --- a/simulation/setup_swarm.cpp +++ b/simulation/setup_swarm.cpp @@ -255,10 +255,14 @@ void setup_swarm(int num_nodes for (int i = 0; i < num_nodes; ++i) { // create a new io_service + std::vector ips; char ep[30]; snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff); + ips.push_back(addr(ep)); + snprintf(ep, sizeof(ep), "2000::%X%X", (i + 1) >> 8, (i + 1) & 0xff); + ips.push_back(addr(ep)); io_service.push_back(boost::make_shared( - boost::ref(sim), addr(ep))); + boost::ref(sim), ips)); lt::settings_pack pack = default_settings; diff --git a/simulation/test_dht.cpp b/simulation/test_dht.cpp index 85e20665c..631fb09d7 100644 --- a/simulation/test_dht.cpp +++ b/simulation/test_dht.cpp @@ -44,6 +44,10 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket_io.hpp" #include "setup_swarm.hpp" #include "setup_dht.hpp" +#include "libtorrent/ed25519.hpp" +#include "libtorrent/bencode.hpp" +#include "libtorrent/kademlia/item.hpp" +#include namespace lt = libtorrent; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp index 458a864ee..bff180f5f 100644 --- a/simulation/test_dht_rate_limit.cpp +++ b/simulation/test_dht_rate_limit.cpp @@ -56,8 +56,13 @@ struct obs : dht::dht_observer virtual void set_external_address(address const& addr , address const& source) TORRENT_OVERRIDE {} - virtual address external_address() TORRENT_OVERRIDE - { return address_v4::from_string("40.30.20.10"); } + virtual address external_address(udp proto) TORRENT_OVERRIDE + { + if (proto == udp::v4()) + return address_v4::from_string("40.30.20.10"); + else + return address_v6(); + } virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE {} virtual void outgoing_get_peers(sha1_hash const& target , sha1_hash const& sent_target, udp::endpoint const& ep) TORRENT_OVERRIDE {} diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 8b3ce2f08..5a21dc965 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -76,10 +76,10 @@ namespace libtorrent { namespace dht time_duration const key_refresh = duration_cast(minutes(5)); - node_id extract_node_id(entry const& e) + node_id extract_node_id(entry const& e, std::string const& key) { if (e.type() != entry::dictionary_t) return (node_id::min)(); - entry const* nid = e.find_key("node-id"); + entry const* nid = e.find_key(key); if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20) return (node_id::min)(); return node_id(nid->string().c_str()); @@ -97,11 +97,19 @@ namespace libtorrent { namespace dht , dht_storage_constructor_type storage_constructor , entry const& state) : m_counters(cnt) - , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) + , m_dht(ipv4, this, settings, extract_node_id(state, "node-id") + , observer, cnt, m_nodes, storage_constructor) +#if TORRENT_USE_IPV6 + , m_dht6(ipv6, this, settings, extract_node_id(state, "node-id6") + , observer, cnt, m_nodes, storage_constructor) +#endif , m_send_fun(send_fun) , m_log(observer) , m_key_refresh_timer(ios) , m_connection_timer(ios) +#if TORRENT_USE_IPV6 + , m_connection_timer6(ios) +#endif , m_refresh_timer(ios) , m_settings(settings) , m_abort(false) @@ -111,9 +119,19 @@ namespace libtorrent { namespace dht { m_blocker.set_block_timer(m_settings.block_timeout); m_blocker.set_rate_limit(m_settings.block_ratelimit); + + m_nodes.insert(std::make_pair(m_dht.native_address_name(), &m_dht)); +#if TORRENT_USE_IPV6 + m_nodes.insert(std::make_pair(m_dht6.native_address_name(), &m_dht6)); +#endif + #ifndef TORRENT_DISABLE_LOGGING - m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s" + m_log->log(dht_logger::tracker, "starting IPv4 DHT tracker with node id: %s" , to_hex(m_dht.nid().to_string()).c_str()); + #if TORRENT_USE_IPV6 + m_log->log(dht_logger::tracker, "starting IPv6 DHT tracker with node id: %s" + , to_hex(m_dht6.nid().to_string()).c_str()); + #endif #endif } @@ -131,6 +149,9 @@ namespace libtorrent { namespace dht , find_data::nodes_callback const& f) { std::vector initial_nodes; +#if TORRENT_USE_IPV6 + std::vector initial_nodes6; +#endif if (bootstrap.type() == entry::dictionary_t) { @@ -138,6 +159,12 @@ namespace libtorrent { namespace dht if (entry const* nodes = bootstrap.find_key("nodes")) read_endpoint_list(nodes, initial_nodes); } TORRENT_CATCH(std::exception&) {} +#if TORRENT_USE_IPV6 + TORRENT_TRY{ + if (entry const* nodes = bootstrap.find_key("nodes6")) + read_endpoint_list(nodes, initial_nodes6); + } TORRENT_CATCH(std::exception&) {} +#endif } error_code ec; @@ -145,11 +172,20 @@ namespace libtorrent { namespace dht m_connection_timer.expires_from_now(seconds(1), ec); m_connection_timer.async_wait( - boost::bind(&dht_tracker::connection_timeout, self(), _1)); + boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht), _1)); + +#if TORRENT_USE_IPV6 + m_connection_timer6.expires_from_now(seconds(1), ec); + m_connection_timer6.async_wait( + boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht6), _1)); +#endif m_refresh_timer.expires_from_now(seconds(5), ec); m_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_timeout, self(), _1)); m_dht.bootstrap(initial_nodes, f); +#if TORRENT_USE_IPV6 + m_dht6.bootstrap(initial_nodes6, f); +#endif } void dht_tracker::stop() @@ -158,6 +194,9 @@ namespace libtorrent { namespace dht error_code ec; m_key_refresh_timer.cancel(ec); m_connection_timer.cancel(ec); +#if TORRENT_USE_IPV6 + m_connection_timer6.cancel(ec); +#endif m_refresh_timer.cancel(ec); m_host_resolver.cancel(); } @@ -180,14 +219,19 @@ namespace libtorrent { namespace dht m_dht.update_stats_counters(c); } - void dht_tracker::connection_timeout(error_code const& e) + void dht_tracker::connection_timeout(node& n, error_code const& e) { if (e || m_abort) return; - time_duration d = m_dht.connection_timeout(); + time_duration d = n.connection_timeout(); error_code ec; - m_connection_timer.expires_from_now(d, ec); - m_connection_timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), _1)); +#if TORRENT_USE_IPV6 + deadline_timer& timer = n.native_address_type() == ipv4 ? m_connection_timer : m_connection_timer6; +#else + deadline_timer& timer = m_connection_timer; +#endif + timer.expires_from_now(d, ec); + timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(n), _1)); } void dht_tracker::refresh_timeout(error_code const& e) @@ -195,6 +239,9 @@ namespace libtorrent { namespace dht if (e || m_abort) return; m_dht.tick(); +#if TORRENT_USE_IPV6 + m_dht6.tick(); +#endif // periodically update the DOS blocker's settings from the dht_settings m_blocker.set_block_timer(m_settings.block_timeout); @@ -215,6 +262,7 @@ namespace libtorrent { namespace dht m_key_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1)); m_dht.new_write_key(); + m_dht6.new_write_key(); #ifndef TORRENT_DISABLE_LOGGING m_log->log(dht_logger::tracker, "*** new write key***"); #endif @@ -224,6 +272,11 @@ namespace libtorrent { namespace dht #if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc); m_dht.print_state(st); + + #if TORRENT_USE_IPV6 + std::ofstream st6("dht6_routing_table_state.txt", std::ios_base::trunc); + m_dht6.print_state(st6); + #endif #endif */ @@ -231,18 +284,111 @@ namespace libtorrent { namespace dht , boost::function const&)> f) { m_dht.get_peers(ih, f, NULL, false); +#if TORRENT_USE_IPV6 + m_dht6.get_peers(ih, f, NULL, false); +#endif } void dht_tracker::announce(sha1_hash const& ih, int listen_port, int flags , boost::function const&)> f) { m_dht.announce(ih, listen_port, flags, f); +#if TORRENT_USE_IPV6 + m_dht6.announce(ih, listen_port, flags, f); +#endif } + namespace { + + struct get_immutable_item_ctx + { + get_immutable_item_ctx(int traversals) + : active_traversals(traversals) + , item_posted(false) + {} + int active_traversals; + bool item_posted; + }; + + // these functions provide a slightly higher level + // interface to the get/put functionality in the DHT + void get_immutable_item_callback(item const& it, boost::shared_ptr ctx + , boost::function f) + { + // the reason to wrap here is to control the return value + // since it controls whether we re-put the content + TORRENT_ASSERT(!it.is_mutable()); + --ctx->active_traversals; + if (!ctx->item_posted && (!it.empty() || ctx->active_traversals == 0)) + { + ctx->item_posted = true; + f(it); + } + } + + struct get_mutable_item_ctx + { + get_mutable_item_ctx(int traversals) : active_traversals(traversals) {} + int active_traversals; + item it; + }; + + bool get_mutable_item_callback(item const& it, bool authoritative + , boost::shared_ptr ctx + , boost::function f) + { + TORRENT_ASSERT(it.is_mutable()); + if (authoritative) --ctx->active_traversals; + authoritative = authoritative && ctx->active_traversals == 0; + if ((ctx->it.empty() && !it.empty()) || (ctx->it.seq() < it.seq())) + { + ctx->it = it; + f(it, authoritative); + } + else if (authoritative) + f(it, authoritative); + + return false; + } + + struct put_item_ctx + { + put_item_ctx(int traversals) + : active_traversals(traversals) + , response_count(0) + {} + + int active_traversals; + int response_count; + }; + + void put_immutable_item_callback(int responses, boost::shared_ptr ctx + , boost::function f) + { + ctx->response_count += responses; + if (--ctx->active_traversals == 0) + f(ctx->response_count); + } + + void put_mutable_item_callback(item const& it, int responses, boost::shared_ptr ctx + , boost::function cb) + { + ctx->response_count += responses; + if (--ctx->active_traversals == 0) + cb(it, ctx->response_count); + } + + } // anonymous namespace + void dht_tracker::get_item(sha1_hash const& target , boost::function cb) { - m_dht.get_item(target, cb); + boost::shared_ptr + ctx = boost::make_shared((TORRENT_USE_IPV6) ? 2 : 1); + m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb)); +#if TORRENT_USE_IPV6 + m_dht6.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb)); +#endif } // key is a 32-byte binary string, the public key to look up. @@ -251,7 +397,12 @@ namespace libtorrent { namespace dht , boost::function cb , std::string salt) { - m_dht.get_item(key, salt, cb); + boost::shared_ptr + ctx = boost::make_shared((TORRENT_USE_IPV6) ? 2 : 1); + m_dht.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); +#if TORRENT_USE_IPV6 + m_dht6.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); +#endif } void dht_tracker::put_item(entry const& data @@ -262,20 +413,41 @@ namespace libtorrent { namespace dht sha1_hash target = item_target_id( std::pair(flat_data.c_str(), flat_data.size())); - m_dht.put_item(target, data, cb); + boost::shared_ptr + ctx = boost::make_shared((TORRENT_USE_IPV6) ? 2 : 1); + m_dht.put_item(target, data, boost::bind(&put_immutable_item_callback + , _1, ctx, cb)); +#if TORRENT_USE_IPV6 + m_dht6.put_item(target, data, boost::bind(&put_immutable_item_callback + , _1, ctx, cb)); +#endif } void dht_tracker::put_item(char const* key , boost::function cb , boost::function data_cb, std::string salt) { - m_dht.put_item(key, salt, cb, data_cb); + boost::shared_ptr + ctx = boost::make_shared((TORRENT_USE_IPV6) ? 2 : 1); + + m_dht.put_item(key, salt, boost::bind(&put_mutable_item_callback + , _1, _2, ctx, cb), data_cb); +#if TORRENT_USE_IPV6 + m_dht6.put_item(key, salt, boost::bind(&put_mutable_item_callback + , _1, _2, ctx, cb), data_cb); +#endif } void dht_tracker::direct_request(udp::endpoint ep, entry& e , boost::function f) { - m_dht.direct_request(ep, e, f); +#if TORRENT_USE_IPV6 + if (ep.protocol() == udp::v6()) + m_dht6.direct_request(ep, e, f); + else +#endif + m_dht.direct_request(ep, e, f); + } void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep) @@ -292,6 +464,9 @@ namespace libtorrent { namespace dht ) { m_dht.unreachable(ep); +#if TORRENT_USE_IPV6 + m_dht6.unreachable(ep); +#endif } } @@ -299,8 +474,6 @@ namespace libtorrent { namespace dht , char const* buf, int size) { if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false; - // remove this line/check once the DHT supports IPv6 - if (!ep.address().is_v4()) return false; m_counters.inc_stats_counter(counters::dht_bytes_in, size); // account for IP and UDP overhead @@ -364,6 +537,9 @@ namespace libtorrent { namespace dht libtorrent::dht::msg m(m_msg, ep); m_dht.incoming(m); +#if TORRENT_USE_IPV6 + m_dht6.incoming(m); +#endif return true; } @@ -378,40 +554,52 @@ namespace libtorrent { namespace dht n->list().push_back(entry(node)); } + void save_nodes(entry& ret, node const& dht, std::string const& key) + { + entry nodes(entry::list_t); + dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes); + bucket_t cache; + dht.replacement_cache(cache); + for (bucket_t::iterator i(cache.begin()) + , end(cache.end()); i != end; ++i) + { + std::string node; + std::back_insert_iterator out(node); + write_endpoint(i->ep(), out); + nodes.list().push_back(entry(node)); + } + if (!nodes.list().empty()) + ret[key] = nodes; + } + } // anonymous namespace entry dht_tracker::state() const { entry ret(entry::dictionary_t); - { - entry nodes(entry::list_t); - m_dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes); - bucket_t cache; - m_dht.replacement_cache(cache); - for (bucket_t::iterator i(cache.begin()) - , end(cache.end()); i != end; ++i) - { - std::string node; - std::back_insert_iterator out(node); - write_endpoint(i->ep(), out); - nodes.list().push_back(entry(node)); - } - if (!nodes.list().empty()) - ret["nodes"] = nodes; - } - + save_nodes(ret, m_dht, "nodes"); ret["node-id"] = m_dht.nid().to_string(); +#if TORRENT_USE_IPV6 + save_nodes(ret, m_dht6, "nodes6"); + ret["node-id6"] = m_dht6.nid().to_string(); +#endif return ret; } void dht_tracker::add_node(udp::endpoint node) { m_dht.add_node(node); +#if TORRENT_USE_IPV6 + m_dht6.add_node(node); +#endif } void dht_tracker::add_router_node(udp::endpoint const& node) { m_dht.add_router_node(node); +#if TORRENT_USE_IPV6 + m_dht6.add_router_node(node); +#endif } bool dht_tracker::has_quota() diff --git a/src/kademlia/get_peers.cpp b/src/kademlia/get_peers.cpp index 05a72375d..c4717dd98 100644 --- a/src/kademlia/get_peers.cpp +++ b/src/kademlia/get_peers.cpp @@ -63,7 +63,8 @@ void get_peers_observer::reply(msg const& m) if (n) { std::vector peer_list; - if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t) + if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t + && m.addr.protocol() == udp::v4()) { // assume it's mainline format char const* peers = n.list_at(0).string_ptr(); diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 1dca84d28..1f4744b33 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -69,6 +69,9 @@ namespace libtorrent { namespace dht using detail::write_endpoint; +char const* address_type_names[num_address_type] = { "n4", "n6" }; +char const* address_type_keys[num_address_type] = { "nodes", "nodes6" }; + namespace { void nop() {} @@ -93,16 +96,19 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer, address_ty } // anonymous namespace -node::node(udp_socket_interface* sock +node::node(address_type at, udp_socket_interface* sock , dht_settings const& settings, node_id nid , dht_observer* observer , struct counters& cnt + , std::map const& nodes , dht_storage_constructor_type storage_constructor) : m_settings(settings) - , m_id(calculate_node_id(nid, observer, ipv4)) - , m_table(m_id, 8, settings, observer) + , m_id(calculate_node_id(nid, observer, at)) + , m_table(m_id, at, 8, settings, observer) , m_rpc(m_id, m_settings, m_table, sock, observer) + , m_nodes(nodes) , m_observer(observer) + , m_address_type(at) , m_last_tracker_tick(aux::time_now()) , m_last_self_refresh(min_time()) , m_sock(sock) @@ -126,7 +132,7 @@ void node::update_node_id() // it's possible that our external address hasn't actually changed. If our // current ID is still valid, don't do anything. - if (verify_id(m_id, m_observer->external_address())) + if (verify_id(m_id, m_observer->external_address(m_address_type))) return; #ifndef TORRENT_DISABLE_LOGGING @@ -134,7 +140,7 @@ void node::update_node_id() , "updating node ID (because external IP address changed)"); #endif - m_id = generate_id(m_observer->external_address()); + m_id = generate_id(m_observer->external_address(m_address_type)); m_table.update_node_id(m_id); } @@ -303,6 +309,8 @@ void node::incoming(msg const& m) // responds to 'query' messages that it receives. if (m_settings.read_only) break; + if (!native_address(m.addr)) break; + if (!m_sock->has_quota()) { m_counters.inc_stats_counter(counters::dht_messages_in_dropped); @@ -407,6 +415,7 @@ void node::add_router_node(udp::endpoint router) void node::add_node(udp::endpoint node) { + if (!native_address(node)) return; // ping the node, and if we get a reply, it // will be added to the routing table send_single_refresh(node, m_table.num_active_buckets()); @@ -600,7 +609,11 @@ struct ping_observer : observer } // look for nodes - bdecode_node n = r.dict_find_string("nodes"); +#if TORRENT_USE_IPV6 + address_type at = algorithm()->get_node().native_address_type(); +#endif + char const* nodes_key = algorithm()->get_node().native_nodes_key(); + bdecode_node n = r.dict_find_string(nodes_key); if (n) { char const* nodes = n.string_ptr(); @@ -611,8 +624,14 @@ struct ping_observer : observer node_id id; std::copy(nodes, nodes + 20, id.begin()); nodes += 20; - algorithm()->get_node().m_table.heard_about(id - , detail::read_v4_endpoint(nodes)); + udp::endpoint ep; +#if TORRENT_USE_IPV6 + if (at == ipv6) + ep = detail::read_v6_endpoint(nodes); + else +#endif + ep = detail::read_v4_endpoint(nodes); + algorithm()->get_node().m_table.heard_about(id, ep); } } } @@ -765,14 +784,12 @@ void node::lookup_peers(sha1_hash const& info_hash, entry& reply m_storage->get_peers(info_hash, noseed, scrape, reply); } -void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes) +void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& n, nodes_t const& nodes) { - entry& n = r["nodes"]; std::back_insert_iterator out(n.string()); for (nodes_t::const_iterator i = nodes.begin() , end(nodes.end()); i != end; ++i) { - if (!i->addr().is_v4()) continue; std::copy(i->id.begin(), i->id.end(), out); write_endpoint(udp::endpoint(i->addr(), i->port()), out); } @@ -843,9 +860,10 @@ void node::incoming_request(msg const& m, entry& e) {"info_hash", bdecode_node::string_t, 20, 0}, {"noseed", bdecode_node::int_t, 0, key_desc_t::optional}, {"scrape", bdecode_node::int_t, 0, key_desc_t::optional}, + {"want", bdecode_node::list_t, 0, key_desc_t::optional}, }; - bdecode_node msg_keys[3]; + bdecode_node msg_keys[4]; if (!verify_message(arg_ent, msg_desc, msg_keys, error_string , sizeof(error_string))) { @@ -859,10 +877,8 @@ void node::incoming_request(msg const& m, entry& e) m_counters.inc_stats_counter(counters::dht_get_peers_in); sha1_hash info_hash(msg_keys[0].string_ptr()); - nodes_t n; // always return nodes as well as peers - m_table.find_node(info_hash, n, 0); - write_nodes_entry(reply, n); + write_nodes_entries(info_hash, msg_keys[3], reply); bool noseed = false; bool scrape = false; @@ -881,9 +897,10 @@ void node::incoming_request(msg const& m, entry& e) { key_desc_t msg_desc[] = { {"target", bdecode_node::string_t, 20, 0}, + {"want", bdecode_node::list_t, 0, key_desc_t::optional}, }; - bdecode_node msg_keys[1]; + bdecode_node msg_keys[2]; if (!verify_message(arg_ent, msg_desc, msg_keys, error_string, sizeof(error_string))) { incoming_error(e, error_string); @@ -893,10 +910,7 @@ void node::incoming_request(msg const& m, entry& e) m_counters.inc_stats_counter(counters::dht_find_node_in); sha1_hash target(msg_keys[0].string_ptr()); - // TODO: 2 find_node should write directly to the response entry - nodes_t n; - m_table.find_node(target, n, 0); - write_nodes_entry(reply, n); + write_nodes_entries(target, msg_keys[1], reply); } else if (query_len == 13 && memcmp(query, "announce_peer", 13) == 0) { @@ -1112,12 +1126,13 @@ void node::incoming_request(msg const& m, entry& e) key_desc_t msg_desc[] = { {"seq", bdecode_node::int_t, 0, key_desc_t::optional}, {"target", bdecode_node::string_t, 20, 0}, + {"want", bdecode_node::list_t, 0, key_desc_t::optional}, }; // k is not used for now // attempt to parse the message - bdecode_node msg_keys[2]; + bdecode_node msg_keys[3]; if (!verify_message(arg_ent, msg_desc, msg_keys, error_string , sizeof(error_string))) { @@ -1135,10 +1150,8 @@ void node::incoming_request(msg const& m, entry& e) reply["token"] = generate_token(m.addr, msg_keys[1].string_ptr()); - nodes_t n; // always return nodes as well as peers - m_table.find_node(target, n, 0); - write_nodes_entry(reply, n); + write_nodes_entries(target, msg_keys[2], reply); // if the get has a sequence number it must be for a mutable item // so don't bother searching the immutable table @@ -1173,12 +1186,43 @@ void node::incoming_request(msg const& m, entry& e) } sha1_hash target(target_ent.string_ptr()); - nodes_t n; // always return nodes as well as peers - m_table.find_node(target, n, 0); - write_nodes_entry(reply, n); + write_nodes_entries(target, arg_ent.dict_find_list("want"), reply); return; } } +void node::write_nodes_entries(sha1_hash const& info_hash + , bdecode_node const& want, entry& r) +{ + // if no wants entry was specified, include a nodes + // entry based on the protocol the request came in with + if (want.type() != bdecode_node::list_t) + { + nodes_t n; + m_table.find_node(info_hash, n, 0); + write_nodes_entry(r[native_nodes_key()], n); + return; + } + + // if there is a wants entry then we may need to reach into + // another node's routing table to get nodes of the requested type + // we use a map maintained by the owning dht_tracker to find the + // node associated with each string in the want list, which may + // include this node + for (int i = 0; i < want.list_size(); ++i) + { + bdecode_node wanted = want.list_at(i); + if (wanted.type() != bdecode_node::string_t) + continue; + std::map::const_iterator wanted_node + = m_nodes.find(wanted.string_value()); + if (wanted_node == m_nodes.end()) + continue; + nodes_t n; + wanted_node->second->m_table.find_node(info_hash, n, 0); + write_nodes_entry(r[wanted_node->second->native_nodes_key()], n); + } +} + } } // namespace libtorrent::dht diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 09896eee0..ec31cca25 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -58,6 +58,20 @@ POSSIBILITY OF SUCH DAMAGE. using boost::uint8_t; +#if BOOST_VERSION <= 104700 +namespace boost { + size_t hash_value(libtorrent::address_v4::bytes_type ip) + { + return boost::hash_value(*reinterpret_cast(&ip[0])); + } + + size_t hash_value(libtorrent::address_v6::bytes_type ip) + { + return boost::hash_value(*reinterpret_cast(&ip[0])); + } +} +#endif + namespace libtorrent { namespace dht { namespace @@ -82,29 +96,35 @@ namespace void ip_set::insert(address addr) { - if (addr.is_v4()) - m_ip4s.insert(addr.to_v4().to_bytes()); - else +#if TORRENT_USE_IPV6 + if (addr.is_v6()) m_ip6s.insert(addr.to_v6().to_bytes()); + else +#endif + m_ip4s.insert(addr.to_v4().to_bytes()); } size_t ip_set::count(address addr) { - if (addr.is_v4()) - return m_ip4s.count(addr.to_v4().to_bytes()); - else +#if TORRENT_USE_IPV6 + if (addr.is_v6()) return m_ip6s.count(addr.to_v6().to_bytes()); + else +#endif + return m_ip4s.count(addr.to_v4().to_bytes()); } void ip_set::erase(address addr) { - if (addr.is_v4()) - erase_one(m_ip4s, addr.to_v4().to_bytes()); - else +#if TORRENT_USE_IPV6 + if (addr.is_v6()) erase_one(m_ip6s, addr.to_v6().to_bytes()); + else +#endif + erase_one(m_ip4s, addr.to_v4().to_bytes()); } -routing_table::routing_table(node_id const& id, int bucket_size +routing_table::routing_table(node_id const& id, address_type at, int bucket_size , dht_settings const& settings , dht_logger* log) : @@ -113,6 +133,7 @@ routing_table::routing_table(node_id const& id, int bucket_size #endif m_settings(settings) , m_id(id) + , m_address_type(at) , m_depth(0) , m_last_self_refresh(min_time()) , m_bucket_size(bucket_size) @@ -589,6 +610,10 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) // INVARIANT_CHECK; #endif + // don't add if the address isn't the right type + if (!native_address(e.addr())) + return failed_to_add; + // if we already have this (IP,port), don't do anything if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return failed_to_add; diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 0a5d1e5fa..7a23fc18b 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -287,8 +287,11 @@ bool rpc_manager::incoming(msg const& m, node_id* id) if (!o) { #ifndef TORRENT_DISABLE_LOGGING - m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s" - , int(transaction_id.size()), print_endpoint(m.addr).c_str()); + if (m_table.native_address(m.addr.address())) + { + m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s" + , int(transaction_id.size()), print_endpoint(m.addr).c_str()); + } #endif // this isn't necessarily because the other end is doing // something wrong. This can also happen when we restart @@ -464,6 +467,12 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr // places a 'ro' key in the top-level message dictionary and sets its value to 1. if (m_settings.read_only) e["ro"] = 1; + node& n = o->algorithm()->get_node(); + if (!n.native_address(o->target_addr())) + { + a["want"].list().push_back(entry(n.native_address_name())); + } + o->set_target(target_addr); o->set_transaction_id(tid); diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 40f4a8cd7..da80bcd37 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -165,31 +165,45 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig if (m_node.settings().restrict_search_ips && !(flags & observer::flag_initial)) { - // mask the lower octet - boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong(); - prefix4 &= 0xffffff00; - - if (m_peer4_prefixes.count(prefix4) > 0) +#if TORRENT_USE_IPV6 + if (o->target_addr().is_v6()) { - // we already have a node in this search with an IP very - // close to this one. We know that it's not the same, because - // it claims a different node-ID. Ignore this to avoid attacks -#ifndef TORRENT_DISABLE_LOGGING - if (get_node().observer()) - { - char hex_id[41]; - to_hex(reinterpret_cast(&o->id()[0]), 20, hex_id); - get_node().observer()->log(dht_logger::traversal - , "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s" - , static_cast(this), hex_id, print_address(o->target_addr()).c_str(), name()); - } + address_v6::bytes_type addr_bytes = o->target_addr().to_v6().to_bytes(); + address_v6::bytes_type::const_iterator prefix_it = addr_bytes.begin(); + boost::uint64_t prefix6 = detail::read_uint64(prefix_it); + + if (m_peer6_prefixes.insert(prefix6).second) + goto add_result; + } + else #endif - return; + { + // mask the lower octet + boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong(); + prefix4 &= 0xffffff00; + + if (m_peer4_prefixes.insert(prefix4).second) + goto add_result; } - m_peer4_prefixes.insert(prefix4); + // we already have a node in this search with an IP very + // close to this one. We know that it's not the same, because + // it claims a different node-ID. Ignore this to avoid attacks +#ifndef TORRENT_DISABLE_LOGGING + if (get_node().observer()) + { + char hex_id[41]; + to_hex(reinterpret_cast(&o->id()[0]), 20, hex_id); + get_node().observer()->log(dht_logger::traversal + , "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s" + , static_cast(this), hex_id, print_address(o->target_addr()).c_str(), name()); + } +#endif + return; } + add_result: + TORRENT_ASSERT((o->flags & observer::flag_no_id) || std::find_if(m_results.begin(), m_results.end() , boost::bind(&observer::id, _1) == id) == m_results.end()); @@ -600,8 +614,13 @@ void traversal_observer::reply(msg const& m) , print_endpoint(target_ep()).c_str(), algorithm()->name()); } #endif + // look for nodes - bdecode_node n = r.dict_find_string("nodes"); +#if TORRENT_USE_IPV6 + address_type at = algorithm()->get_node().native_address_type(); +#endif + char const* nodes_key = algorithm()->get_node().native_nodes_key(); + bdecode_node n = r.dict_find_string(nodes_key); if (n) { char const* nodes = n.string_ptr(); @@ -612,7 +631,14 @@ void traversal_observer::reply(msg const& m) node_id id; std::copy(nodes, nodes + 20, id.begin()); nodes += 20; - algorithm()->traverse(id, read_v4_endpoint(nodes)); + udp::endpoint ep; +#if TORRENT_USE_IPV6 + if (at == ipv6) + ep = read_v6_endpoint(nodes); + else +#endif + ep = read_v4_endpoint(nodes); + algorithm()->traverse(id, ep); } } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 9662bcf48..e6c0cb5b0 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -6781,10 +6781,18 @@ namespace aux { address session_impl::external_address(address_type at) { - if (at == ipv4) - return m_external_ip.external_address(address_v4()); +#if !TORRENT_USE_IPV6 + TORRENT_UNUSED(at); +#endif + + address addr; +#if TORRENT_USE_IPV6 + if (at == ipv6) + addr = address_v6(); else - return m_external_ip.external_address(address_v6()); +#endif + addr = address_v4(); + return m_external_ip.external_address(addr); } void session_impl::get_peers(sha1_hash const& ih) diff --git a/test/test_dht.cpp b/test/test_dht.cpp index 8643af567..b35db16b9 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -201,7 +201,10 @@ struct msg_args { a["want"].list().push_back(w); return *this; } msg_args& nodes(nodes_t const& n) - { if (!n.empty()) dht::write_nodes_entry(a, n); return *this; } + { if (!n.empty()) dht::write_nodes_entry(a["nodes"], n); return *this; } + + msg_args& nodes6(nodes_t const& n) + { if (!n.empty()) dht::write_nodes_entry(a["nodes6"], n); return *this; } msg_args& peers(std::set const& p) { if (!p.empty()) write_peers(a.dict(), p); return *this; } @@ -512,13 +515,16 @@ dht_settings test_settings() // TODO: test obfuscated_get_peers // TODO: 2 split this test up into smaller test cases -TORRENT_TEST(dht) +void do_test_dht(address(&rand_addr)()) { dht_settings sett = test_settings(); mock_socket s; obs observer; counters cnt; - dht::node node(&s, sett, node_id(0), &observer, cnt); + udp::endpoint source(rand_addr(), 20); + std::map nodes; + dht::node node(source.protocol() == udp::v4() ? ipv4 : ipv6 + , &s, sett, node_id(0), &observer, cnt, nodes); // DHT should be running on port 48199 now bdecode_node response; @@ -526,7 +532,6 @@ TORRENT_TEST(dht) bool ret; // ====== ping ====== - udp::endpoint source(address::from_string("10.0.0.1"), 20); send_dht_request(node, "ping", source, &response); dht::key_desc_t pong_desc[] = { @@ -653,7 +658,7 @@ TORRENT_TEST(dht) // 50 downloaders and 50 seeds for (int i = 0; i < 100; ++i) { - source = udp::endpoint(rand_v4(), 6000); + source = udp::endpoint(rand_addr(), 6000); send_dht_request(node, "get_peers", source, &response , msg_args().info_hash("01010101010101010101")); @@ -742,10 +747,19 @@ TORRENT_TEST(dht) // enable node_id enforcement sett.enforce_node_id = true; - // this is one of the test vectors from: - // http://libtorrent.org/dht_sec.html - source = udp::endpoint(address::from_string("124.31.75.21"), 1); - node_id nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401"); + node_id nid; + if (source.protocol() == udp::v4()) + { + // this is one of the test vectors from: + // http://libtorrent.org/dht_sec.html + source = udp::endpoint(address::from_string("124.31.75.21"), 1); + nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401"); + } + else + { + source = udp::endpoint(address::from_string("2001:b829:2123:be84:e16c:d6ae:5290:49f1"), 1); + nid = to_hash("0a8ad123be84e16cd6ae529049f1f1bbe9ebb304"); + } // verify that we reject invalid node IDs // this is now an invalid node-id for 'source' @@ -781,7 +795,10 @@ TORRENT_TEST(dht) TEST_EQUAL(node.size().get<0>(), nodes_num); // now the node-id is valid. - nid[0] = 0x5f; + if (source.protocol() == udp::v4()) + nid[0] = 0x5f; + else + nid[0] = 0x0a; send_dht_request(node, "find_node", source, &response , msg_args().target("0101010101010101010101010101010101010101").nid(nid)); @@ -861,7 +878,7 @@ TORRENT_TEST(dht) udp::endpoint eps[1000]; for (int i = 0; i < 1000; ++i) - eps[i] = udp::endpoint(rand_v4(), (rand() % 16534) + 1); + eps[i] = udp::endpoint(rand_addr(), (rand() % 16534) + 1); announce_item items[] = { @@ -1248,16 +1265,29 @@ TORRENT_TEST(dht) // s.restrict_routing_ips = false; node_id id = to_hash("3123456789abcdef01232456789abcdef0123456"); const int bucket_size = 10; - dht::routing_table table(id, bucket_size, s, &observer); + dht::routing_table table(id, source.protocol() == udp::v4() ? ipv4 : ipv6, bucket_size, s, &observer); std::vector nodes; TEST_EQUAL(table.size().get<0>(), 0); node_id tmp = id; node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061"); + address node_addr; + address node_near_addr; + if (source.protocol() == udp::v4()) + { + node_addr = address_v4::from_string("4.4.4.4"); + node_near_addr = address_v4::from_string("4.4.4.5"); + } + else + { + node_addr = address_v6::from_string("2001:1111:1111:1111:1111:1111:1111:1111"); + node_near_addr = address_v6::from_string("2001:1111:1111:1111:eeee:eeee:eeee:eeee"); + } + // test a node with the same IP:port changing ID add_and_replace(tmp, diff); - table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10); + table.node_seen(tmp, udp::endpoint(node_addr, 4), 10); table.find_node(id, nodes, 0, 10); TEST_EQUAL(table.bucket_size(0), 1); TEST_EQUAL(table.size().get<0>(), 1); @@ -1265,13 +1295,13 @@ TORRENT_TEST(dht) if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); TEST_EQUAL(nodes[0].timeout_count, 0); } // set timeout_count to 1 - table.node_failed(tmp, udp::endpoint(address_v4::from_string("4.4.4.4"), 4)); + table.node_failed(tmp, udp::endpoint(node_addr, 4)); nodes.clear(); table.for_each_node(node_push_back, nop, &nodes); @@ -1279,58 +1309,58 @@ TORRENT_TEST(dht) if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); TEST_EQUAL(nodes[0].timeout_count, 1); } // add the exact same node again, it should set the timeout_count to 0 - table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10); + table.node_seen(tmp, udp::endpoint(node_addr, 4), 10); nodes.clear(); table.for_each_node(node_push_back, nop, &nodes); TEST_EQUAL(nodes.size(), 1); if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); TEST_EQUAL(nodes[0].timeout_count, 0); } // test adding the same IP:port again with a new node ID (should replace the old one) add_and_replace(tmp, diff); - table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10); + table.node_seen(tmp, udp::endpoint(node_addr, 4), 10); table.find_node(id, nodes, 0, 10); TEST_EQUAL(table.bucket_size(0), 1); TEST_EQUAL(nodes.size(), 1); if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); } // test adding the same node ID again with a different IP (should be ignored) - table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 5), 10); + table.node_seen(tmp, udp::endpoint(node_addr, 5), 10); table.find_node(id, nodes, 0, 10); TEST_EQUAL(table.bucket_size(0), 1); if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); } // test adding a node that ends up in the same bucket with an IP // very close to the current one (should be ignored) // if restrict_routing_ips == true - table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.5"), 5), 10); + table.node_seen(tmp, udp::endpoint(node_near_addr, 5), 10); table.find_node(id, nodes, 0, 10); TEST_EQUAL(table.bucket_size(0), 1); if (!nodes.empty()) { TEST_EQUAL(nodes[0].id, tmp); - TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4")); + TEST_EQUAL(nodes[0].addr(), node_addr); TEST_EQUAL(nodes[0].port(), 4); } @@ -1339,12 +1369,12 @@ TORRENT_TEST(dht) init_rand_address(); add_and_replace(tmp, diff); - table.node_seen(id, udp::endpoint(rand_v4(), rand()), 10); + table.node_seen(id, udp::endpoint(rand_addr(), rand()), 10); nodes.clear(); for (int i = 0; i < 7000; ++i) { - table.node_seen(tmp, udp::endpoint(rand_v4(), rand()), 20 + (tmp[19] & 0xff)); + table.node_seen(tmp, udp::endpoint(rand_addr(), rand()), 20 + (tmp[19] & 0xff)); add_and_replace(tmp, diff); } printf("active buckets: %d\n", table.num_active_buckets()); @@ -1485,7 +1515,7 @@ TORRENT_TEST(dht) g_sent_packets.clear(); do { - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); std::vector nodesv; @@ -1557,7 +1587,7 @@ TORRENT_TEST(dht) do { dht::node_id target = to_hash("1234876923549721020394873245098347598635"); - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); node.m_table.add_node(initial_node); @@ -1652,7 +1682,7 @@ TORRENT_TEST(dht) g_sent_packets.clear(); do { - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); node.m_table.add_node(initial_node); @@ -1698,7 +1728,7 @@ TORRENT_TEST(dht) g_sent_packets.clear(); do { - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); node.m_table.add_node(initial_node); @@ -1785,7 +1815,7 @@ TORRENT_TEST(dht) // set the branching factor to k to make this a little easier int old_branching = sett.search_branching; sett.search_branching = 8; - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); enum { num_test_nodes = 8 }; node_entry nodes[num_test_nodes] = { node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231)) @@ -1885,7 +1915,7 @@ TORRENT_TEST(dht) // set the branching factor to k to make this a little easier int old_branching = sett.search_branching; sett.search_branching = 8; - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); enum { num_test_nodes = 8 }; node_entry nodes[num_test_nodes] = { node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231)) @@ -1987,7 +2017,7 @@ TORRENT_TEST(dht) // set the branching factor to k to make this a little easier int old_branching = sett.search_branching; sett.search_branching = 8; - dht::node node(&s, sett, (node_id::min)(), &observer, cnt); + dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes); sha1_hash target = hasher(public_key, item_pk_len).final(); enum { num_test_nodes = 9 }; // we need K + 1 nodes to create the failing sequence node_entry nodes[num_test_nodes] = @@ -2067,6 +2097,143 @@ TORRENT_TEST(dht) } while (false); } +TORRENT_TEST(dht) +{ + do_test_dht(rand_v4); +#if TORRENT_USE_IPV6 + if (supports_ipv6()) + do_test_dht(rand_v6); +#endif +} + +TORRENT_TEST(dht_dual_stack) +{ + dht_settings sett = test_settings(); + mock_socket s; + obs observer; + counters cnt; + std::map nodes; + dht::node node4(ipv4, &s, sett, node_id(0), &observer, cnt, nodes); + dht::node node6(ipv6, &s, sett, node_id(0), &observer, cnt, nodes); + nodes.insert(std::make_pair("n4", &node4)); + nodes.insert(std::make_pair("n6", &node6)); + + // DHT should be running on port 48199 now + bdecode_node response; + char error_string[200]; + bool ret; + + node_id id = to_hash("3123456789abcdef01232456789abcdef0123456"); + node4.m_table.node_seen(id, udp::endpoint(address::from_string("4.4.4.4"), 4440), 10); + node6.m_table.node_seen(id, udp::endpoint(address::from_string("4::4"), 4441), 10); + + // v4 node requesting v6 nodes + + udp::endpoint source(address::from_string("10.0.0.1"), 20); + + send_dht_request(node4, "find_node", source, &response + , msg_args().target("0101010101010101010101010101010101010101").want("n6")); + + dht::key_desc_t nodes6_desc[] = { + { "y", bdecode_node::string_t, 1, 0 }, + { "r", bdecode_node::dict_t, 0, key_desc_t::parse_children }, + { "id", bdecode_node::string_t, 20, 0 }, + { "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child } + }; + + bdecode_node nodes6_keys[4]; + + ret = verify_message(response, nodes6_desc, nodes6_keys, error_string + , sizeof(error_string)); + + if (ret) + { + char const* nodes_ptr = nodes6_keys[3].string_ptr(); + TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0); + nodes_ptr += id.size; + udp::endpoint rep = detail::read_v6_endpoint(nodes_ptr); + TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441)); + } + else + { + fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + } + + // v6 node requesting v4 nodes + + source.address(address::from_string("10::1")); + + send_dht_request(node6, "get_peers", source, &response + , msg_args().info_hash("0101010101010101010101010101010101010101").want("n4")); + + dht::key_desc_t nodes_desc[] = { + { "y", bdecode_node::string_t, 1, 0 }, + { "r", bdecode_node::dict_t, 0, key_desc_t::parse_children }, + { "id", bdecode_node::string_t, 20, 0 }, + { "nodes", bdecode_node::string_t, 26, key_desc_t::last_child } + }; + + bdecode_node nodes_keys[4]; + + ret = verify_message(response, nodes_desc, nodes_keys, error_string + , sizeof(error_string)); + + if (ret) + { + char const* nodes_ptr = nodes_keys[3].string_ptr(); + TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0); + nodes_ptr += id.size; + udp::endpoint rep = detail::read_v4_endpoint(nodes_ptr); + TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440)); + } + else + { + fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + } + + // v6 node requesting both v4 and v6 nodes + + send_dht_request(node6, "find_nodes", source, &response + , msg_args().info_hash("0101010101010101010101010101010101010101") + .want("n4") + .want("n6")); + + dht::key_desc_t nodes46_desc[] = { + { "y", bdecode_node::string_t, 1, 0 }, + { "r", bdecode_node::dict_t, 0, key_desc_t::parse_children }, + { "id", bdecode_node::string_t, 20, 0 }, + { "nodes", bdecode_node::string_t, 26, 0 }, + { "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child } + }; + + bdecode_node nodes46_keys[5]; + + ret = verify_message(response, nodes46_desc, nodes46_keys, error_string + , sizeof(error_string)); + + if (ret) + { + char const* nodes_ptr = nodes46_keys[3].string_ptr(); + TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0); + nodes_ptr += id.size; + udp::endpoint rep = detail::read_v4_endpoint(nodes_ptr); + TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440)); + + nodes_ptr = nodes46_keys[4].string_ptr(); + TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0); + nodes_ptr += id.size; + rep = detail::read_v6_endpoint(nodes_ptr); + TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441)); + } + else + { + fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + } +} + void get_test_keypair(char* public_key, char* private_key) { from_hex("77ff84905a91936367c01360803104f92432fcd904a43511876df5cdf3e7e548", 64, public_key); @@ -2252,7 +2419,7 @@ TORRENT_TEST(routing_table_uniform) node_id id = to_hash("1234876923549721020394873245098347598635"); node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061"); - routing_table tbl(id, 8, sett, &observer); + routing_table tbl(id, ipv4, 8, sett, &observer); // insert 256 nodes evenly distributed across the ID space. // we expect to fill the top 5 buckets @@ -2295,7 +2462,7 @@ TORRENT_TEST(routing_table_balance) sett.extended_routing_table = false; node_id id = to_hash("1234876923549721020394873245098347598635"); - routing_table tbl(id, 8, sett, &observer); + routing_table tbl(id, ipv4, 8, sett, &observer); // insert nodes in the routing table that will force it to split // and make sure we don't end up with a table completely out of balance @@ -2327,7 +2494,7 @@ TORRENT_TEST(routing_table_extended) for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i); std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end()); - routing_table tbl(id, 8, sett, &observer); + routing_table tbl(id, ipv4, 8, sett, &observer); for (int i = 0; i < 256; ++i) { add_and_replace(id, diff); @@ -2360,7 +2527,7 @@ TORRENT_TEST(routing_table_set_id) node_id_prefix.reserve(256); for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i); std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end()); - routing_table tbl(id, 8, sett, &observer); + routing_table tbl(id, ipv4, 8, sett, &observer); for (int i = 0; i < 256; ++i) { id[0] = node_id_prefix[i]; @@ -2404,8 +2571,9 @@ TORRENT_TEST(read_only_node) mock_socket s; obs observer; counters cnt; + std::map nodes; - dht::node node(&s, sett, node_id(0), &observer, cnt); + dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes); udp::endpoint source(address::from_string("10.0.0.1"), 20); bdecode_node response; msg_args args; @@ -2491,8 +2659,9 @@ TORRENT_TEST(invalid_error_msg) mock_socket s; obs observer; counters cnt; + std::map nodes; - dht::node node(&s, sett, node_id(0), &observer, cnt); + dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes); udp::endpoint source(address::from_string("10.0.0.1"), 20); entry e; @@ -2528,10 +2697,11 @@ TORRENT_TEST(rpc_invalid_error_msg) mock_socket s; obs observer; counters cnt; + std::map nodes; - dht::routing_table table(node_id(), 8, sett, &observer); + dht::routing_table table(node_id(), ipv4, 8, sett, &observer); dht::rpc_manager rpc(node_id(), sett, table, &s, &observer); - dht::node node(&s, sett, node_id(0), &observer, cnt); + dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes); udp::endpoint source(address::from_string("10.0.0.1"), 20); @@ -2618,7 +2788,7 @@ TORRENT_TEST(dht_verify_node_address) s.extended_routing_table = false; node_id id = to_hash("3123456789abcdef01232456789abcdef0123456"); const int bucket_size = 10; - dht::routing_table table(id, bucket_size, s, &observer); + dht::routing_table table(id, ipv4, bucket_size, s, &observer); std::vector nodes; TEST_EQUAL(table.size().get<0>(), 0);