From cdd50be859a470433e7dee6f90fc29c429129660 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Thu, 20 Apr 2017 21:45:43 -0700 Subject: [PATCH] create a separate DHT node for each listen socket --- include/libtorrent/Makefile.am | 2 + include/libtorrent/aux_/session_impl.hpp | 50 ++- include/libtorrent/aux_/session_interface.hpp | 4 + .../libtorrent/aux_/session_listen_socket.hpp | 60 +++ .../libtorrent/aux_/session_udp_sockets.hpp | 61 ++++ include/libtorrent/kademlia/dht_observer.hpp | 15 +- include/libtorrent/kademlia/dht_state.hpp | 11 +- include/libtorrent/kademlia/dht_tracker.hpp | 55 ++- include/libtorrent/kademlia/node.hpp | 19 +- include/libtorrent/kademlia/rpc_manager.hpp | 10 +- include/libtorrent/settings_pack.hpp | 5 - simulation/setup_dht.cpp | 57 ++- simulation/test_dht.cpp | 8 +- simulation/test_dht_rate_limit.cpp | 33 +- src/kademlia/dht_state.cpp | 60 ++- src/kademlia/dht_tracker.cpp | 321 ++++++++-------- src/kademlia/node.cpp | 42 ++- src/kademlia/rpc_manager.cpp | 7 +- src/session_handle.cpp | 6 +- src/session_impl.cpp | 344 +++++++++++------- test/test_dht.cpp | 100 +++-- test/test_session_params.cpp | 12 +- 22 files changed, 812 insertions(+), 470 deletions(-) create mode 100644 include/libtorrent/aux_/session_listen_socket.hpp create mode 100644 include/libtorrent/aux_/session_udp_sockets.hpp diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 4a1ba4fef..e748972fd 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -177,7 +177,9 @@ nobase_include_HEADERS = \ aux_/merkle.hpp \ aux_/session_call.hpp \ aux_/session_impl.hpp \ + aux_/session_listen_socket.hpp \ aux_/session_settings.hpp \ + aux_/session_udp_sockets.hpp \ aux_/proxy_settings.hpp \ aux_/session_interface.hpp \ aux_/suggest_piece.hpp \ diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index b7038eae3..9a080e908 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/config.hpp" #include "libtorrent/aux_/session_settings.hpp" #include "libtorrent/aux_/session_interface.hpp" +#include "libtorrent/aux_/session_udp_sockets.hpp" #include "libtorrent/linked_list.hpp" #include "libtorrent/torrent_peer.hpp" #include "libtorrent/torrent_peer_allocator.hpp" @@ -117,8 +118,14 @@ namespace dht { class item; } - struct listen_socket_t + struct listen_socket_t final : aux::session_listen_socket { + address get_external_address() override + { return external_address.external_address(); } + + tcp::endpoint get_local_endpoint() override + { return local_endpoint; } + listen_socket_t() { tcp_port_mapping[0] = -1; @@ -160,19 +167,13 @@ namespace dht { // set to true if this is an SSL listen socket bool ssl = false; - // this is true when the udp socket send() has failed with EAGAIN or - // EWOULDBLOCK. i.e. we're currently waiting for the socket to become - // writeable again. Once it is, we'll set it to false and notify the utp - // socket manager - bool udp_write_blocked = false; - // the actual sockets (TCP listen socket and UDP socket) // An entry does not necessarily have a UDP or TCP socket. One of these // pointers may be nullptr! // These must be shared_ptr to avoid a dangling reference if an // incoming packet is in the event queue when the socket is erased std::shared_ptr sock; - std::shared_ptr udp_sock; + std::shared_ptr udp_sock; }; namespace aux { @@ -557,7 +558,9 @@ namespace aux { void set_peer_id(peer_id const& id); void set_key(std::uint32_t key); std::uint16_t listen_port() const override; + std::uint16_t listen_port(listen_socket_t* sock) const; std::uint16_t ssl_listen_port() const override; + std::uint16_t ssl_listen_port(listen_socket_t* sock) const; alert_manager& alerts() override { return m_alerts; } disk_interface& disk_thread() override { return m_disk_thread; } @@ -623,9 +626,8 @@ namespace aux { int send_buffer_size() const override { return send_buffer_size_impl; } // implements dht_observer - virtual void set_external_address(address const& ip - , address const& source) override; - virtual address external_address(udp proto) override; + virtual void set_external_address(aux::session_listen_socket* iface + , address const& ip, address const& source) override; virtual void get_peers(sha1_hash const& ih) override; virtual void announce(sha1_hash const& ih, address const& addr, int port) override; virtual void outgoing_get_peers(sha1_hash const& target @@ -651,6 +653,9 @@ namespace aux { void set_external_address(address const& ip , int source_type, address const& source) override; + void set_external_address(tcp::endpoint const& local_endpoint + , address const& ip + , int source_type, address const& source) override; virtual external_ip external_address() const override; // used when posting synchronous function @@ -743,6 +748,12 @@ namespace aux { void on_lsd_peer(tcp::endpoint const& peer, sha1_hash const& ih) override; void setup_socket_buffers(socket_type& s) override; + void set_external_address(listen_socket_t& sock, address const& ip + , int const source_type, address const& source); + + void interface_to_endpoints(std::string const& device, int const port + , bool const ssl, std::vector& eps); + // the settings for the client aux::session_settings m_settings; @@ -1054,15 +1065,28 @@ namespace aux { , error_code& ec , int flags); + void send_udp_packet_hostname_listen(aux::session_listen_socket* sock + , char const* hostname + , int port + , span p + , error_code& ec + , int flags); + void send_udp_packet(bool ssl , udp::endpoint const& ep , span p , error_code& ec , int flags); - void on_udp_writeable(std::weak_ptr s, error_code const& ec); + void send_udp_packet_listen(aux::session_listen_socket* sock + , udp::endpoint const& ep + , span p + , error_code& ec + , int flags); - void on_udp_packet(std::weak_ptr const& s + void on_udp_writeable(std::weak_ptr s, error_code const& ec); + + void on_udp_packet(std::weak_ptr s , bool ssl, error_code const& ec); libtorrent::utp_socket_manager m_utp_socket_manager; diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 565110024..f2189ee40 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -42,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/error_code.hpp" #include "libtorrent/socket.hpp" // for tcp::endpoint #include "libtorrent/aux_/vector.hpp" +#include "libtorrent/aux_/session_listen_socket.hpp" #include #include @@ -138,6 +139,9 @@ namespace libtorrent { namespace aux { virtual void set_external_address(address const& ip , int source_type, address const& source) = 0; + virtual void set_external_address(tcp::endpoint const& local_endpoint + , address const& ip + , int source_type, address const& source) = 0; virtual external_ip external_address() const = 0; virtual disk_interface& disk_thread() = 0; diff --git a/include/libtorrent/aux_/session_listen_socket.hpp b/include/libtorrent/aux_/session_listen_socket.hpp new file mode 100644 index 000000000..46bf9c978 --- /dev/null +++ b/include/libtorrent/aux_/session_listen_socket.hpp @@ -0,0 +1,60 @@ +/* + +Copyright (c) 2017, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +* Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in +the documentation and/or other materials provided with the distribution. +* Neither the name of the author nor the names of its +contributors may be used to endorse or promote products derived +from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_SESSION_LISTEN_SOCKET_HPP_INCLUDED +#define TORRENT_SESSION_LISTEN_SOCKET_HPP_INCLUDED + +#include "libtorrent/address.hpp" +#include "libtorrent/socket.hpp" // for tcp::endpoint + +namespace libtorrent { namespace aux { + + // abstract interface for a listen socket owned by session_impl + // pointers to this type serve as a handle for the listen socket + // use a separate abstract type to prohibit outside access to private fields of listen_socket_t + // and because some users of these handles should not be coupled to session_impl + struct TORRENT_EXTRA_EXPORT session_listen_socket + { + virtual address get_external_address() = 0; + virtual tcp::endpoint get_local_endpoint() = 0; + + session_listen_socket() = default; + + protected: + session_listen_socket(session_listen_socket const&) = default; + session_listen_socket& operator=(session_listen_socket const&) = default; + ~session_listen_socket() = default; + }; + +} } + +#endif diff --git a/include/libtorrent/aux_/session_udp_sockets.hpp b/include/libtorrent/aux_/session_udp_sockets.hpp new file mode 100644 index 000000000..43a7c0567 --- /dev/null +++ b/include/libtorrent/aux_/session_udp_sockets.hpp @@ -0,0 +1,61 @@ +/* + +Copyright (c) 2017, Arvid Norberg, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +* Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in +the documentation and/or other materials provided with the distribution. +* Neither the name of the author nor the names of its +contributors may be used to endorse or promote products derived +from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_SESSION_UDP_SOCKETS_HPP_INCLUDED +#define TORRENT_SESSION_UDP_SOCKETS_HPP_INCLUDED + +#include "libtorrent/udp_socket.hpp" +#include "libtorrent/config.hpp" +#include +#include + +namespace libtorrent { namespace aux { + + struct session_udp_socket + { + explicit session_udp_socket(io_service& ios) + : sock(ios) {} + + udp::endpoint local_endpoint() { return sock.local_endpoint(); } + + udp_socket sock; + + // this is true when the udp socket send() has failed with EAGAIN or + // EWOULDBLOCK. i.e. we're currently waiting for the socket to become + // writeable again. Once it is, we'll set it to false and notify the utp + // socket manager + bool write_blocked = false; + }; + +} } + +#endif diff --git a/include/libtorrent/kademlia/dht_observer.hpp b/include/libtorrent/kademlia/dht_observer.hpp index d09b3f569..bff8823e4 100644 --- a/include/libtorrent/kademlia/dht_observer.hpp +++ b/include/libtorrent/kademlia/dht_observer.hpp @@ -37,7 +37,11 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/address.hpp" #include "libtorrent/kademlia/msg.hpp" -namespace libtorrent { namespace dht { +namespace libtorrent { + +namespace aux { struct session_listen_socket; } + +namespace dht { struct TORRENT_EXTRA_EXPORT dht_logger { @@ -64,14 +68,13 @@ namespace libtorrent { namespace dht { #endif protected: - ~dht_logger() {} + ~dht_logger() = default; }; struct TORRENT_EXTRA_EXPORT dht_observer : dht_logger { - virtual void set_external_address(address const& addr - , address const& source) = 0; - virtual address external_address(udp proto) = 0; + virtual void set_external_address(aux::session_listen_socket* iface + , address const& addr, address const& source) = 0; virtual void get_peers(sha1_hash const& ih) = 0; virtual void outgoing_get_peers(sha1_hash const& target , sha1_hash const& sent_target, udp::endpoint const& ep) = 0; @@ -80,7 +83,7 @@ namespace libtorrent { namespace dht { , dht::msg const& request, entry& response) = 0; protected: - ~dht_observer() {} + ~dht_observer() = default; }; }} diff --git a/include/libtorrent/kademlia/dht_state.hpp b/include/libtorrent/kademlia/dht_state.hpp index b5ce2d70b..e882fcd1b 100644 --- a/include/libtorrent/kademlia/dht_state.hpp +++ b/include/libtorrent/kademlia/dht_state.hpp @@ -40,6 +40,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include namespace libtorrent { @@ -47,6 +48,8 @@ namespace libtorrent { } namespace libtorrent { namespace dht { + + using node_ids_t = std::vector>; // This structure helps to store and load the state // of the ``dht_tracker``. // At this moment the library is only a dual stack @@ -55,12 +58,9 @@ namespace libtorrent { namespace dht { // .. _BEP32: http://bittorrent.org/beps/bep_0032.html struct TORRENT_EXPORT dht_state { - // the id of the IPv4 node - node_id nid; - // the id of the IPv6 node - node_id nid6; + node_ids_t nids; - // the bootstrap nodes saved from the IPv4 buckets node + // the bootstrap nodes saved from the buckets node std::vector nodes; // the bootstrap nodes saved from the IPv6 buckets node std::vector nodes6; @@ -68,6 +68,7 @@ namespace libtorrent { namespace dht { void clear(); }; + TORRENT_EXTRA_EXPORT node_ids_t extract_node_ids(bdecode_node const& e, string_view key); TORRENT_EXTRA_EXPORT dht_state read_dht_state(bdecode_node const& e); TORRENT_EXTRA_EXPORT entry save_dht_state(dht_state const& state); }} diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 3827b0ea9..94ed7c6b1 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include #include #include #include @@ -56,10 +57,11 @@ namespace libtorrent { namespace libtorrent { namespace dht { struct TORRENT_EXTRA_EXPORT dht_tracker final - : udp_socket_interface + : socket_manager , std::enable_shared_from_this { - using send_fun_t = std::function, error_code&, int)>; dht_tracker(dht_observer* observer @@ -69,14 +71,23 @@ namespace libtorrent { namespace dht { , counters& cnt , dht_storage_interface& storage , dht_state state); - virtual ~dht_tracker(); + +#if defined(_MSC_VER) && _MSC_VER < 1910 + // workaround for a bug in msvc 14.0 + // it attempts to generate a copy constructor for some strange reason + // and fails because tracker_node is not copyable + dht_tracker(dht_tracker const&) = delete; +#endif void start(find_data::nodes_callback const& f); void stop(); // tell the node to recalculate its node id based on the current // understanding of its external address (which may have changed) - void update_node_id(); + void update_node_id(aux::session_listen_socket* s); + + void new_socket(aux::session_listen_socket* s); + void delete_socket(aux::session_listen_socket* s); void add_node(udp::endpoint const& node); void add_router_node(udp::endpoint const& node); @@ -128,18 +139,33 @@ namespace libtorrent { namespace dht { std::vector> live_nodes(node_id const& nid); private: + struct tracker_node + { + tracker_node(io_service& ios + , aux::session_listen_socket* s, socket_manager* sock + , libtorrent::dht_settings const& settings + , node_id const& nid + , dht_observer* observer, counters& cnt + , get_foreign_node_t get_foreign_node + , dht_storage_interface& storage); + + node dht; + deadline_timer connection_timer; + }; + using tracker_nodes_t = std::map; std::shared_ptr self() { return shared_from_this(); } - void connection_timeout(node& n, error_code const& e); + void connection_timeout(tracker_node& n, error_code const& e); void refresh_timeout(error_code const& e); void refresh_key(error_code const& e); void update_storage_node_ids(); + node* get_node(node_id const& id, std::string const& family_name); - // implements udp_socket_interface + // implements socket_manager virtual bool has_quota() override; - virtual bool send_packet(entry& e, udp::endpoint const& addr) override; + virtual bool send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) override; // this is the bdecode_node DHT messages are parsed into. It's a member // in order to avoid having to deallocate and re-allocate it for every @@ -149,27 +175,18 @@ namespace libtorrent { namespace dht { counters& m_counters; dht_storage_interface& m_storage; dht_state m_state; // to be used only once - node m_dht; -#if TORRENT_USE_IPV6 - node m_dht6; -#endif + tracker_nodes_t m_nodes; send_fun_t m_send_fun; - dht_logger* m_log; + dht_observer* m_log; std::vector m_send_buf; dos_blocker m_blocker; deadline_timer m_key_refresh_timer; - deadline_timer m_connection_timer; -#if TORRENT_USE_IPV6 - deadline_timer m_connection_timer6; -#endif deadline_timer m_refresh_timer; dht_settings const& m_settings; - std::map m_nodes; - - bool m_abort; + bool m_running; // used to resolve hostnames for nodes udp::resolver m_host_resolver; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index b95f59378..e253b9e9b 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -54,6 +54,7 @@ namespace libtorrent { struct counters; struct dht_routing_bucket; struct dht_settings; + namespace aux { struct session_listen_socket; } } namespace libtorrent { namespace dht { @@ -75,22 +76,25 @@ public: void reply(msg const&) { flags |= flag_done; } }; -struct udp_socket_interface +struct socket_manager { virtual bool has_quota() = 0; - virtual bool send_packet(entry& e, udp::endpoint const& addr) = 0; + virtual bool send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) = 0; protected: - ~udp_socket_interface() {} + ~socket_manager() = default; }; +// get the closest node to the id with the given family_name +using get_foreign_node_t = std::function; + class TORRENT_EXTRA_EXPORT node : boost::noncopyable { public: - node(udp proto, udp_socket_interface* sock + node(aux::session_listen_socket* sock, socket_manager* sock_man , libtorrent::dht_settings const& settings , node_id const& nid , dht_observer* observer, counters& cnt - , std::map const& nodes + , get_foreign_node_t get_foreign_node , dht_storage_interface& storage); ~node(); @@ -241,7 +245,7 @@ private: static protocol_descriptor const& map_protocol_to_descriptor(udp protocol); - std::map const& m_nodes; + get_foreign_node_t m_get_foreign_node; dht_observer* m_observer; @@ -256,7 +260,8 @@ private: // secret random numbers used to create write tokens std::uint32_t m_secret[2]; - udp_socket_interface* m_sock; + socket_manager* m_sock_man; + aux::session_listen_socket* m_sock; counters& m_counters; dht_storage_interface& m_storage; diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 88467d1a7..57a4dc50a 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -45,12 +45,12 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -namespace libtorrent { struct dht_settings; class entry; } +namespace libtorrent { struct dht_settings; class entry; namespace aux { struct session_listen_socket; } } namespace libtorrent { namespace dht { struct dht_logger; -struct udp_socket_interface; +struct socket_manager; struct TORRENT_EXTRA_EXPORT null_observer : public observer { @@ -68,7 +68,8 @@ public: rpc_manager(node_id const& our_id , dht_settings const& settings , routing_table& table - , udp_socket_interface* sock + , aux::session_listen_socket* sock + , socket_manager* sock_man , dht_logger* log); ~rpc_manager(); @@ -118,7 +119,8 @@ private: std::unordered_multimap m_transactions; - udp_socket_interface* m_sock; + aux::session_listen_socket* m_sock; + socket_manager* m_sock_man; #ifndef TORRENT_DISABLE_LOGGING dht_logger* m_log; #endif diff --git a/include/libtorrent/settings_pack.hpp b/include/libtorrent/settings_pack.hpp index f47cc8c90..7bce78359 100644 --- a/include/libtorrent/settings_pack.hpp +++ b/include/libtorrent/settings_pack.hpp @@ -199,11 +199,6 @@ namespace libtorrent { // GUID must be uppercased string embraced in curly brackets. // ``{E4F0B674-0DFC-48BB-98A5-2AA730BDB6D6}::7777`` - will accept // connections on port 7777 on adapter with this GUID. - // - // .. note:: - // The current support for opening arbitrary UDP sockets is limited. - // In this version of libtorrent, there will only ever be two UDP - // sockets, one for IPv4 and one for IPv6. listen_interfaces, // when using a poxy, this is the hostname where the proxy is running diff --git a/simulation/setup_dht.cpp b/simulation/setup_dht.cpp index 3e34af040..5c74a74ec 100644 --- a/simulation/setup_dht.cpp +++ b/simulation/setup_dht.cpp @@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/random.hpp" #include "libtorrent/crc32c.hpp" #include "libtorrent/alert_types.hpp" // for dht_routing_bucket +#include "libtorrent/aux_/session_listen_socket.hpp" #include "setup_dht.hpp" @@ -78,28 +79,30 @@ namespace { } // anonymous namespace -struct dht_node final : lt::dht::udp_socket_interface +struct dht_node final : lt::dht::socket_manager, lt::aux::session_listen_socket { dht_node(sim::simulation& sim, lt::dht_settings const& sett, lt::counters& cnt , int const idx, std::uint32_t const flags) : m_io_service(sim, (flags & dht_network::bind_ipv6) ? addr6_from_int(idx) : addr_from_int(idx)) , m_dht_storage(lt::dht::dht_default_storage_constructor(sett)) -#if LIBSIMULATOR_USE_MOVE - , m_socket(m_io_service) - , m_dht((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4() - , this, sett, id_from_addr(m_io_service.get_ips().front()) - , nullptr, cnt, m_nodes, *m_dht_storage) -#else - , m_socket(new asio::ip::udp::socket(m_io_service)) - , m_dht(new lt::dht::node((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4() - , this, sett, id_from_addr(m_io_service.get_ips().front()) - , nullptr, cnt, m_nodes, *m_dht_storage)) -#endif , m_add_dead_nodes((flags & dht_network::add_dead_nodes) != 0) , m_ipv6((flags & dht_network::bind_ipv6) != 0) +#if LIBSIMULATOR_USE_MOVE + , m_socket(m_io_service) + , m_dht(this, this, sett, id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt + , [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; } + , *m_dht_storage) +#else + , m_socket(new asio::ip::udp::socket(m_io_service)) + , m_dht(new lt::dht::node(this, this, sett + , id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt + , [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; } + , *m_dht_storage)) +#endif { m_dht_storage->update_node_ids({id_from_addr(m_io_service.get_ips().front())}); - m_nodes.insert(std::make_pair(dht().protocol_family_name(), &dht())); error_code ec; sock().open(m_ipv6 ? asio::ip::udp::v6() : asio::ip::udp::v4()); sock().bind(asio::ip::udp::endpoint( @@ -124,10 +127,13 @@ struct dht_node final : lt::dht::udp_socket_interface // movable and make sure it never needs to be moved (for instance, by // reserving space in the vector before emplacing any nodes). dht_node(dht_node&& n) noexcept - : m_socket(std::move(n.m_socket)) - , m_dht(n.m_ipv6 ? udp::v6() : udp::v4(), this, n.m_dht.settings(), n.m_dht.nid() + : m_add_dead_nodes(n.m_add_dead_nodes) + , m_ipv6(n.m_ipv6) + , m_socket(std::move(n.m_socket)) + , m_dht(this, this, n.m_dht.settings(), n.m_dht.nid() , n.m_dht.observer(), n.m_dht.stats_counters() - , std::map(), *n.m_dht_storage) + , [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; } + , *n.m_dht_storage) { assert(false && "dht_node is not movable"); throw std::runtime_error("dht_node is not movable"); @@ -167,7 +173,7 @@ struct dht_node final : lt::dht::udp_socket_interface } bool has_quota() override { return true; } - bool send_packet(entry& e, udp::endpoint const& addr) override + bool send_packet(lt::aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) override { // since the simulaton is single threaded, we can get away with allocating // just a single send buffer @@ -181,6 +187,18 @@ struct dht_node final : lt::dht::udp_socket_interface return true; } + address get_external_address() override + { + return get_local_endpoint().address(); + } + + tcp::endpoint get_local_endpoint() override + { + if (sock().is_open()) return tcp::endpoint(sock().local_endpoint().address(), sock().local_endpoint().port()); + if (m_ipv6) return tcp::endpoint(address_v6(), 0); + return tcp::endpoint(address_v4(), 0); + } + // the node_id and IP address of this node std::pair node_info() const { @@ -257,7 +275,8 @@ struct dht_node final : lt::dht::udp_socket_interface private: asio::io_service m_io_service; std::shared_ptr m_dht_storage; - std::map m_nodes; + bool const m_add_dead_nodes; + bool const m_ipv6; #if LIBSIMULATOR_USE_MOVE lt::udp::socket m_socket; lt::udp::socket& sock() { return m_socket; } @@ -268,8 +287,6 @@ private: std::shared_ptr m_dht; #endif lt::udp::endpoint m_ep; - bool m_add_dead_nodes; - bool m_ipv6; char m_buffer[1300]; }; diff --git a/simulation/test_dht.cpp b/simulation/test_dht.cpp index 8ace908f5..55a4d8c6c 100644 --- a/simulation/test_dht.cpp +++ b/simulation/test_dht.cpp @@ -65,13 +65,7 @@ void bootstrap_session(std::vector networks, lt::session& ses) // bootstrap off of 8 of the nodes auto router_nodes = dht->router_nodes(); - char const* nodes_key; - if (router_nodes.front().address().is_v6()) - nodes_key = "nodes6"; - else - nodes_key = "nodes"; - - lt::entry::list_type& nodes = state["dht state"][nodes_key].list(); + lt::entry::list_type& nodes = state["dht state"]["nodes"].list(); for (auto const& n : router_nodes) { std::string node; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp index e5ea1b9dc..4c352eb6b 100644 --- a/simulation/test_dht_rate_limit.cpp +++ b/simulation/test_dht_rate_limit.cpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "simulator/simulator.hpp" +#include "libtorrent/aux_/session_listen_socket.hpp" #include "libtorrent/udp_socket.hpp" #include "libtorrent/kademlia/dht_tracker.hpp" #include "libtorrent/kademlia/dht_state.hpp" @@ -56,16 +57,9 @@ using namespace std::placeholders; struct obs : dht::dht_observer { - void set_external_address(address const& /* addr */ + void set_external_address(lt::aux::session_listen_socket*, address const& /* addr */ , address const& /* source */) override {} - address external_address(udp proto) override - { - if (proto == udp::v4()) - return address_v4::from_string("40.30.20.10"); - else - return address_v6(); - } void get_peers(sha1_hash const&) override {} void outgoing_get_peers(sha1_hash const& /* target */ , sha1_hash const& /* sent_target */, udp::endpoint const& /* ep */) override {} @@ -91,6 +85,25 @@ struct obs : dht::dht_observer #endif }; +struct mock_socket : lt::aux::session_listen_socket +{ + address get_external_address() override + { + return get_local_endpoint().address(); + } + + tcp::endpoint get_local_endpoint() override + { + return tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888); + } +}; + +void send_packet(lt::udp_socket& sock, lt::aux::session_listen_socket*, udp::endpoint const& ep + , span p, error_code& ec, int flags) +{ + sock.send(ep, p, ec, flags); +} + #endif // #if !defined TORRENT_DISABLE_DHT TORRENT_TEST(dht_rate_limit) @@ -104,6 +117,7 @@ TORRENT_TEST(dht_rate_limit) // receiver (the DHT under test) lt::udp_socket sock(dht_ios); obs o; + mock_socket ds; error_code ec; sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec); dht_settings dhtsett; @@ -117,8 +131,9 @@ TORRENT_TEST(dht_rate_limit) dht::dht_state state; std::unique_ptr dht_storage(dht::dht_default_storage_constructor(dhtsett)); auto dht = std::make_shared( - &o, dht_ios, std::bind(&udp_socket::send, &sock, _1, _2, _3, _4) + &o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5) , dhtsett, cnt, *dht_storage, state); + dht->new_socket(&ds); bool stop = false; std::function on_read diff --git a/src/kademlia/dht_state.cpp b/src/kademlia/dht_state.cpp index 37471ab49..3b868d32a 100644 --- a/src/kademlia/dht_state.cpp +++ b/src/kademlia/dht_state.cpp @@ -36,16 +36,44 @@ POSSIBILITY OF SUCH DAMAGE. #include namespace libtorrent { namespace dht { -namespace { - node_id extract_node_id(bdecode_node const& e, string_view key) + node_ids_t extract_node_ids(bdecode_node const& e, string_view key) { - if (e.type() != bdecode_node::dict_t) return node_id(); - auto nid = e.dict_find_string_value(key); - if (nid.size() != 20) return node_id(); - return node_id(nid); + if (e.type() != bdecode_node::dict_t) return node_ids_t(); + node_ids_t ret; + // first look for an old-style nid + auto old_nid = e.dict_find_string_value(key); + if (old_nid.size() == 20) + { + ret.emplace_back(address(), node_id(old_nid)); + return ret; + } + auto nids = e.dict_find_list(key); + if (!nids) return ret; + for (int i = 0; i < nids.list_size(); i++) + { + bdecode_node nid = nids.list_at(i); + if (nid.type() != bdecode_node::string_t) continue; + if (nid.string_length() < 20) continue; + char const* in = nid.string_ptr(); + node_id id(in); + in += id.size(); + address addr; + if (nid.string_length() == 24) + addr = detail::read_v4_address(in); +#if TORRENT_USE_IPV6 + else if (nid.string_length() == 36) + addr = detail::read_v6_address(in); +#endif + else + continue; + ret.emplace_back(addr, id); + } + + return ret; } +namespace { entry save_nodes(std::vector const& nodes) { entry ret(entry::list_t); @@ -63,8 +91,8 @@ namespace { void dht_state::clear() { - nid.clear(); - nid6.clear(); + nids.clear(); + nids.shrink_to_fit(); nodes.clear(); nodes.shrink_to_fit(); @@ -78,10 +106,7 @@ namespace { if (e.type() != bdecode_node::dict_t) return ret; - ret.nid = extract_node_id(e, "node-id"); -#if TORRENT_USE_IPV6 - ret.nid6 = extract_node_id(e, "node-id6"); -#endif + ret.nids = extract_node_ids(e, "node-id"); if (bdecode_node const nodes = e.dict_find_list("nodes")) ret.nodes = detail::read_endpoint_list(nodes); @@ -89,18 +114,23 @@ namespace { if (bdecode_node const nodes = e.dict_find_list("nodes6")) ret.nodes6 = detail::read_endpoint_list(nodes); #endif - return ret; } entry save_dht_state(dht_state const& state) { entry ret(entry::dictionary_t); - ret["node-id"] = state.nid.to_string(); + auto& nids = ret["node-id"].list(); + for (auto const& n : state.nids) + { + std::string nid; + std::copy(n.second.begin(), n.second.end(), std::back_inserter(nid)); + detail::write_address(n.first, std::back_inserter(nid)); + nids.emplace_back(std::move(nid)); + } entry const nodes = save_nodes(state.nodes); if (!nodes.list().empty()) ret["nodes"] = nodes; #if TORRENT_USE_IPV6 - ret["node-id6"] = state.nid6.to_string(); entry const nodes6 = save_nodes(state.nodes6); if (!nodes6.list().empty()) ret["nodes6"] = nodes6; #endif diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index e86c0a379..85203dcf3 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include // for is_local #ifndef TORRENT_DISABLE_LOGGING #include // to_hex @@ -91,95 +92,105 @@ namespace libtorrent { namespace dht { : m_counters(cnt) , m_storage(storage) , m_state(std::move(state)) - , m_dht(udp::v4(), this, settings, m_state.nid - , observer, cnt, m_nodes, storage) -#if TORRENT_USE_IPV6 - , m_dht6(udp::v6(), this, settings, m_state.nid6 - , observer, cnt, m_nodes, storage) -#endif , m_send_fun(send_fun) , m_log(observer) , m_key_refresh_timer(ios) - , m_connection_timer(ios) -#if TORRENT_USE_IPV6 - , m_connection_timer6(ios) -#endif , m_refresh_timer(ios) , m_settings(settings) - , m_abort(false) + , m_running(false) , m_host_resolver(ios) , m_send_quota(settings.upload_rate_limit) , m_last_tick(aux::time_now()) { m_blocker.set_block_timer(m_settings.block_timeout); m_blocker.set_rate_limit(m_settings.block_ratelimit); + } - m_nodes.insert(std::make_pair(m_dht.protocol_family_name(), &m_dht)); -#if TORRENT_USE_IPV6 - m_nodes.insert(std::make_pair(m_dht6.protocol_family_name(), &m_dht6)); -#endif - + void dht_tracker::update_node_id(aux::session_listen_socket* s) + { + auto n = m_nodes.find(s); + if (n != m_nodes.end()) + n->second.dht.update_node_id(); update_storage_node_ids(); + } + + void dht_tracker::new_socket(aux::session_listen_socket* s) + { + address local_address = s->get_local_endpoint().address(); +#if TORRENT_USE_IPV6 + // don't try to start dht nodes on non-global IPv6 addresses + // with IPv4 the interface might be behind NAT so we can't skip them based on the scope of the local address + // and we might not have the external address yet + if (local_address.is_v6() && is_local(local_address)) + return; +#endif + auto stored_nid = std::find_if(m_state.nids.begin(), m_state.nids.end() + , [&](node_ids_t::value_type const& nid) { return nid.first == local_address; }); + node_id const nid = stored_nid != m_state.nids.end() ? stored_nid->second : node_id(); + // must use piecewise construction because tracker_node::connection_timer + // is neither copyable nor movable + auto n = m_nodes.emplace(std::piecewise_construct_t(), std::forward_as_tuple(s) + , std::forward_as_tuple(m_key_refresh_timer.get_io_service() + , s, this, m_settings, nid, m_log, m_counters + , std::bind(&dht_tracker::get_node, this, _1, _2) + , m_storage)); #ifndef TORRENT_DISABLE_LOGGING if (m_log->should_log(dht_logger::tracker)) { - m_log->log(dht_logger::tracker, "starting IPv4 DHT tracker with node id: %s" - , aux::to_hex(m_dht.nid()).c_str()); -#if TORRENT_USE_IPV6 - m_log->log(dht_logger::tracker, "starting IPv6 DHT tracker with node id: %s" - , aux::to_hex(m_dht6.nid()).c_str()); -#endif + m_log->log(dht_logger::tracker, "starting %s DHT tracker with node id: %s" + , local_address.is_v4() ? "IPv4" : "IPv6" + , aux::to_hex(n.first->second.dht.nid()).c_str()); } #endif + + if (m_running && n.second) + { + error_code ec; + n.first->second.connection_timer.expires_from_now(seconds(1), ec); + n.first->second.connection_timer.async_wait( + std::bind(&dht_tracker::connection_timeout, self(), std::ref(n.first->second), _1)); + n.first->second.dht.bootstrap(std::vector(), find_data::nodes_callback()); + } } - dht_tracker::~dht_tracker() = default; - - void dht_tracker::update_node_id() + void dht_tracker::delete_socket(aux::session_listen_socket* s) { - m_dht.update_node_id(); -#if TORRENT_USE_IPV6 - m_dht6.update_node_id(); -#endif - update_storage_node_ids(); + m_nodes.erase(s); } void dht_tracker::start(find_data::nodes_callback const& f) { + m_running = true; error_code ec; refresh_key(ec); - m_connection_timer.expires_from_now(seconds(1), ec); - m_connection_timer.async_wait( - std::bind(&dht_tracker::connection_timeout, self(), std::ref(m_dht), _1)); - + for (auto& n : m_nodes) + { + n.second.connection_timer.expires_from_now(seconds(1), ec); + n.second.connection_timer.async_wait( + std::bind(&dht_tracker::connection_timeout, self(), std::ref(n.second), _1)); #if TORRENT_USE_IPV6 - m_connection_timer6.expires_from_now(seconds(1), ec); - m_connection_timer6.async_wait( - std::bind(&dht_tracker::connection_timeout, self(), std::ref(m_dht6), _1)); + if (n.first->get_local_endpoint().protocol() == tcp::v6()) + n.second.dht.bootstrap(concat(m_state.nodes6, m_state.nodes), f); + else #endif + n.second.dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f); + } m_refresh_timer.expires_from_now(seconds(5), ec); m_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_timeout, self(), _1)); - // bootstrap with mix of IP protocols via want/nodes/nodes6 - m_dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f); -#if TORRENT_USE_IPV6 - m_dht6.bootstrap(concat(m_state.nodes6, m_state.nodes), f); -#endif m_state.clear(); } void dht_tracker::stop() { - m_abort = true; + m_running = false; error_code ec; m_key_refresh_timer.cancel(ec); - m_connection_timer.cancel(ec); -#if TORRENT_USE_IPV6 - m_connection_timer6.cancel(ec); -#endif + for (auto& n : m_nodes) + n.second.connection_timer.cancel(ec); m_refresh_timer.cancel(ec); m_host_resolver.cancel(); } @@ -196,20 +207,16 @@ namespace libtorrent { namespace dht { s.active_requests.clear(); s.dht_total_allocations = 0; - m_dht.status(s); -#if TORRENT_USE_IPV6 - m_dht6.status(s); -#endif + for (auto& n : m_nodes) + n.second.dht.status(s); } #endif void dht_tracker::dht_status(std::vector& table , std::vector& requests) { - m_dht.status(table, requests); -#if TORRENT_USE_IPV6 - m_dht6.status(table, requests); -#endif + for (auto& n : m_nodes) + n.second.dht.status(table, requests); } void dht_tracker::update_stats_counters(counters& c) const @@ -224,35 +231,27 @@ namespace libtorrent { namespace dht { c.set_value(counters::dht_node_cache, 0); c.set_value(counters::dht_allocated_observers, 0); - add_dht_counters(m_dht, c); -#if TORRENT_USE_IPV6 - add_dht_counters(m_dht6, c); -#endif + for (auto& n : m_nodes) + add_dht_counters(n.second.dht, c); } - void dht_tracker::connection_timeout(node& n, error_code const& e) + void dht_tracker::connection_timeout(tracker_node& n, error_code const& e) { - if (e || m_abort) return; + if (e || !m_running) return; - time_duration d = n.connection_timeout(); + time_duration d = n.dht.connection_timeout(); error_code ec; -#if TORRENT_USE_IPV6 - deadline_timer& timer = n.protocol() == udp::v4() ? m_connection_timer : m_connection_timer6; -#else - deadline_timer& timer = m_connection_timer; -#endif + deadline_timer& timer = n.connection_timer; timer.expires_from_now(d, ec); timer.async_wait(std::bind(&dht_tracker::connection_timeout, self(), std::ref(n), _1)); } void dht_tracker::refresh_timeout(error_code const& e) { - if (e || m_abort) return; + if (e || !m_running) return; - m_dht.tick(); -#if TORRENT_USE_IPV6 - m_dht6.tick(); -#endif + for (auto& n : m_nodes) + n.second.dht.tick(); // periodically update the DOS blocker's settings from the dht_settings m_blocker.set_block_timer(m_settings.block_timeout); @@ -266,16 +265,15 @@ namespace libtorrent { namespace dht { void dht_tracker::refresh_key(error_code const& e) { - if (e || m_abort) return; + if (e || !m_running) return; error_code ec; m_key_refresh_timer.expires_from_now(key_refresh, ec); m_key_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_key, self(), _1)); - m_dht.new_write_key(); -#if TORRENT_USE_IPV6 - m_dht6.new_write_key(); -#endif + for (auto& n : m_nodes) + n.second.dht.new_write_key(); + #ifndef TORRENT_DISABLE_LOGGING m_log->log(dht_logger::tracker, "*** new write key***"); #endif @@ -284,30 +282,37 @@ namespace libtorrent { namespace dht { void dht_tracker::update_storage_node_ids() { std::vector ids; - ids.push_back(m_dht.nid()); -#if TORRENT_USE_IPV6 - ids.push_back(m_dht6.nid()); -#endif + for (auto& n : m_nodes) + ids.push_back(n.second.dht.nid()); m_storage.update_node_ids(ids); } + node* dht_tracker::get_node(node_id const& id, std::string const& family_name) + { + TORRENT_UNUSED(id); + for (auto& n : m_nodes) + { + // TODO: pick the closest node rather than the first + if (n.second.dht.protocol_family_name() == family_name) + return &n.second.dht; + } + + return nullptr; + } + void dht_tracker::get_peers(sha1_hash const& ih , std::function const&)> f) { std::function> const&)> empty; - m_dht.get_peers(ih, f, empty, false); -#if TORRENT_USE_IPV6 - m_dht6.get_peers(ih, f, empty, false); -#endif + for (auto& n : m_nodes) + n.second.dht.get_peers(ih, f, empty, false); } void dht_tracker::announce(sha1_hash const& ih, int listen_port, int flags , std::function const&)> f) { - m_dht.announce(ih, listen_port, flags, f); -#if TORRENT_USE_IPV6 - m_dht6.announce(ih, listen_port, flags, f); -#endif + for (auto& n : m_nodes) + n.second.dht.announce(ih, listen_port, flags, f); } namespace { @@ -393,11 +398,9 @@ namespace libtorrent { namespace dht { void dht_tracker::get_item(sha1_hash const& target , std::function cb) { - auto ctx = std::make_shared((TORRENT_USE_IPV6) ? 2 : 1); - m_dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb)); -#if TORRENT_USE_IPV6 - m_dht6.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb)); -#endif + auto ctx = std::make_shared(int(m_nodes.size())); + for (auto& n : m_nodes) + n.second.dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb)); } // key is a 32-byte binary string, the public key to look up. @@ -406,11 +409,9 @@ namespace libtorrent { namespace dht { , std::function cb , std::string salt) { - auto ctx = std::make_shared((TORRENT_USE_IPV6) ? 2 : 1); - m_dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); -#if TORRENT_USE_IPV6 - m_dht6.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); -#endif + auto ctx = std::make_shared(int(m_nodes.size())); + for (auto& n : m_nodes) + n.second.dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); } void dht_tracker::put_item(entry const& data @@ -420,37 +421,32 @@ namespace libtorrent { namespace dht { bencode(std::back_inserter(flat_data), data); sha1_hash const target = item_target_id(flat_data); - auto ctx = std::make_shared((TORRENT_USE_IPV6) ? 2 : 1); - m_dht.put_item(target, data, std::bind(&put_immutable_item_callback + auto ctx = std::make_shared(int(m_nodes.size())); + for (auto& n : m_nodes) + n.second.dht.put_item(target, data, std::bind(&put_immutable_item_callback , _1, ctx, cb)); -#if TORRENT_USE_IPV6 - m_dht6.put_item(target, data, std::bind(&put_immutable_item_callback - , _1, ctx, cb)); -#endif } void dht_tracker::put_item(public_key const& key , std::function cb , std::function data_cb, std::string salt) { - auto ctx = std::make_shared((TORRENT_USE_IPV6) ? 2 : 1); - m_dht.put_item(key, salt, std::bind(&put_mutable_item_callback - , _1, _2, ctx, cb), data_cb); -#if TORRENT_USE_IPV6 - m_dht6.put_item(key, salt, std::bind(&put_mutable_item_callback - , _1, _2, ctx, cb), data_cb); -#endif + auto ctx = std::make_shared(int(m_nodes.size())); + for (auto& n : m_nodes) + n.second.dht.put_item(key, salt, std::bind(&put_mutable_item_callback + , _1, _2, ctx, cb), data_cb); } void dht_tracker::direct_request(udp::endpoint const& ep, entry& e , std::function f) { -#if TORRENT_USE_IPV6 - if (ep.protocol() == udp::v6()) - m_dht6.direct_request(ep, e, f); - else -#endif - m_dht.direct_request(ep, e, f); + for (auto& n : m_nodes) + { + if (ep.protocol() != (n.first->get_external_address().is_v4() ? udp::v4() : udp::v6())) + continue; + n.second.dht.direct_request(ep, e, f); + break; + } } void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep) @@ -466,10 +462,8 @@ namespace libtorrent { namespace dht { #endif ) { - m_dht.unreachable(ep); -#if TORRENT_USE_IPV6 - m_dht6.unreachable(ep); -#endif + for (auto& n : m_nodes) + n.second.dht.unreachable(ep); } } @@ -538,29 +532,34 @@ namespace libtorrent { namespace dht { #endif libtorrent::dht::msg m(m_msg, ep); - m_dht.incoming(m); -#if TORRENT_USE_IPV6 - m_dht6.incoming(m); -#endif + for (auto& n : m_nodes) + n.second.dht.incoming(m); return true; } + dht_tracker::tracker_node::tracker_node(io_service& ios + , aux::session_listen_socket* s, socket_manager* sock + , libtorrent::dht_settings const& settings + , node_id const& nid + , dht_observer* observer, counters& cnt + , get_foreign_node_t get_foreign_node + , dht_storage_interface& storage) + : dht(s, sock, settings, nid, observer, cnt, get_foreign_node, storage) + , connection_timer(ios) + {} + std::vector> dht_tracker::live_nodes(node_id const& nid) { std::vector> ret; - // TODO: figure out a better solution when multi-home is implemented - node const* dht = m_dht.nid() == nid ? &m_dht -#if TORRENT_USE_IPV6 - : m_dht6.nid() == nid ? &m_dht6 : nullptr; -#else - : nullptr; -#endif + auto n = std::find_if(m_nodes.begin(), m_nodes.end() + , [&](tracker_nodes_t::value_type const& v) { return v.second.dht.nid() == nid; }); - if (dht == nullptr) return ret; - - dht->m_table.for_each_node([&ret](node_entry const& e) - { ret.emplace_back(e.id, e.endpoint); }, nullptr); + if (n != m_nodes.end()) + { + n->second.dht.m_table.for_each_node([&ret](node_entry const& e) + { ret.emplace_back(e.id, e.endpoint); }, nullptr); + } return ret; } @@ -582,29 +581,27 @@ namespace libtorrent { namespace dht { dht_state dht_tracker::state() const { dht_state ret; - ret.nid = m_dht.nid(); - ret.nodes = save_nodes(m_dht); -#if TORRENT_USE_IPV6 - ret.nid6 = m_dht6.nid(); - ret.nodes6 = save_nodes(m_dht6); -#endif + for (auto& n : m_nodes) + { + // use the local rather than external address because if the user is behind NAT + // we won't know the external IP on startup + ret.nids.push_back(std::make_pair(n.first->get_local_endpoint().address(), n.second.dht.nid())); + auto nodes = save_nodes(n.second.dht); + ret.nodes.insert(ret.nodes.end(), nodes.begin(), nodes.end()); + } return ret; } void dht_tracker::add_node(udp::endpoint const& node) { - m_dht.add_node(node); -#if TORRENT_USE_IPV6 - m_dht6.add_node(node); -#endif + for (auto& n : m_nodes) + n.second.dht.add_node(node); } void dht_tracker::add_router_node(udp::endpoint const& node) { - m_dht.add_router_node(node); -#if TORRENT_USE_IPV6 - m_dht6.add_router_node(node); -#endif + for (auto& n : m_nodes) + n.second.dht.add_router_node(node); } bool dht_tracker::has_quota() @@ -624,7 +621,7 @@ namespace libtorrent { namespace dht { return m_send_quota > 0; } - bool dht_tracker::send_packet(entry& e, udp::endpoint const& addr) + bool dht_tracker::send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) { static char const version_str[] = {'L', 'T' , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR}; @@ -639,7 +636,25 @@ namespace libtorrent { namespace dht { m_send_quota -= int(m_send_buf.size()); error_code ec; - m_send_fun(addr, m_send_buf, ec, 0); + if (s->get_local_endpoint().protocol().family() != addr.protocol().family()) + { + // the node is trying to send a packet to a different address family + // than its socket, this can happen during bootstrap + // pick a node with the right address family and use its socket + auto n = std::find_if(m_nodes.begin(), m_nodes.end() + , [&](tracker_nodes_t::value_type const& v) + { return v.first->get_local_endpoint().protocol().family() == addr.protocol().family(); }); + + if (n != m_nodes.end()) + m_send_fun(n->first, addr, m_send_buf, ec, 0); + else + ec = boost::asio::error::address_family_not_supported; + } + else + { + m_send_fun(s, addr, m_send_buf, ec, 0); + } + if (ec) { m_counters.inc_stats_counter(counters::dht_messages_out_dropped); diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index e06efe670..0d83febc4 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -48,6 +48,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/hasher.hpp" #include "libtorrent/random.hpp" #include +#include "libtorrent/aux_/session_listen_socket.hpp" #include #include "libtorrent/aux_/throw.hpp" #include "libtorrent/alert_types.hpp" // for dht_lookup @@ -72,14 +73,14 @@ namespace { void nop() {} -node_id calculate_node_id(node_id const& nid, dht_observer* observer, udp protocol) +node_id calculate_node_id(node_id const& nid, aux::session_listen_socket* sock) { address external_address; - if (observer != nullptr) external_address = observer->external_address(protocol); + external_address = sock->get_external_address(); // if we don't have an observer, don't pretend that external_address is valid // generating an ID based on 0.0.0.0 would be terrible. random is better - if (observer == nullptr || external_address.is_unspecified()) + if (external_address.is_unspecified()) { return generate_random_id(); } @@ -101,22 +102,23 @@ void incoming_error(entry& e, char const* msg, int error_code = 203) } // anonymous namespace -node::node(udp proto, udp_socket_interface* sock +node::node(aux::session_listen_socket* sock, socket_manager* sock_man , dht_settings const& settings , node_id const& nid , dht_observer* observer , counters& cnt - , std::map const& nodes + , get_foreign_node_t get_foreign_node , dht_storage_interface& storage) : m_settings(settings) - , m_id(calculate_node_id(nid, observer, proto)) - , m_table(m_id, proto, 8, settings, observer) - , m_rpc(m_id, m_settings, m_table, sock, observer) - , m_nodes(nodes) + , m_id(calculate_node_id(nid, sock)) + , m_table(m_id, sock->get_local_endpoint().protocol() == tcp::v4() ? udp::v4() : udp::v6(), 8, settings, observer) + , m_rpc(m_id, m_settings, m_table, sock, sock_man, observer) + , m_get_foreign_node(get_foreign_node) , m_observer(observer) - , m_protocol(map_protocol_to_descriptor(proto)) + , m_protocol(map_protocol_to_descriptor(sock->get_local_endpoint().protocol() == tcp::v4() ? udp::v4() : udp::v6())) , m_last_tracker_tick(aux::time_now()) , m_last_self_refresh(min_time()) + , m_sock_man(sock_man) , m_sock(sock) , m_counters(cnt) , m_storage(storage) @@ -136,7 +138,7 @@ void node::update_node_id() // it's possible that our external address hasn't actually changed. If our // current ID is still valid, don't do anything. - if (verify_id(m_id, m_observer->external_address(protocol()))) + if (verify_id(m_id, m_sock->get_external_address())) return; #ifndef TORRENT_DISABLE_LOGGING @@ -144,7 +146,7 @@ void node::update_node_id() , "updating node ID (because external IP address changed)"); #endif - m_id = generate_id(m_observer->external_address(protocol())); + m_id = generate_id(m_sock->get_external_address()); m_table.update_node_id(m_id); m_rpc.update_node_id(m_id); @@ -285,7 +287,7 @@ void node::incoming(msg const& m) address_v6::bytes_type b; std::memcpy(&b[0], ext_ip.string_ptr(), 16); if (m_observer != nullptr) - m_observer->set_external_address(address_v6(b) + m_observer->set_external_address(m_sock, address_v6(b) , m.addr.address()); } else #endif @@ -294,7 +296,7 @@ void node::incoming(msg const& m) address_v4::bytes_type b; std::memcpy(&b[0], ext_ip.string_ptr(), 4); if (m_observer != nullptr) - m_observer->set_external_address(address_v4(b) + m_observer->set_external_address(m_sock, address_v4(b) , m.addr.address()); } @@ -315,7 +317,7 @@ void node::incoming(msg const& m) if (!native_address(m.addr)) break; - if (!m_sock->has_quota()) + if (!m_sock_man->has_quota()) { m_counters.inc_stats_counter(counters::dht_messages_in_dropped); return; @@ -323,7 +325,7 @@ void node::incoming(msg const& m) entry e; incoming_request(m, e); - m_sock->send_packet(e, m.addr); + m_sock_man->send_packet(m_sock, e, m.addr); break; } case 'e': @@ -1164,11 +1166,11 @@ void node::write_nodes_entries(sha1_hash const& info_hash bdecode_node wanted = want.list_at(i); if (wanted.type() != bdecode_node::string_t) continue; - auto wanted_node = m_nodes.find(wanted.string_value().to_string()); - if (wanted_node == m_nodes.end()) continue; + node* wanted_node = m_get_foreign_node(info_hash, wanted.string_value().to_string()); + if (!wanted_node) continue; std::vector n; - wanted_node->second->m_table.find_node(info_hash, n, 0); - r[wanted_node->second->protocol_nodes_key()] = write_nodes_entry(n); + wanted_node->m_table.find_node(info_hash, n, 0); + r[wanted_node->protocol_nodes_key()] = write_nodes_entry(n); } } diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 3e655ad9d..a871b5d0e 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -148,10 +148,13 @@ using observer_storage = aux::aligned_union<1 rpc_manager::rpc_manager(node_id const& our_id , dht_settings const& settings - , routing_table& table, udp_socket_interface* sock + , routing_table& table + , aux::session_listen_socket* sock + , socket_manager* sock_man , dht_logger* log) : m_pool_allocator(sizeof(observer_storage), 10) , m_sock(sock) + , m_sock_man(sock_man) #ifndef TORRENT_DISABLE_LOGGING , m_log(log) #endif @@ -480,7 +483,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint const& target_addr } #endif - if (m_sock->send_packet(e, target_addr)) + if (m_sock_man->send_packet(m_sock, e, target_addr)) { m_transactions.insert(std::make_pair(tid, o)); #if TORRENT_USE_ASSERTS diff --git a/src/session_handle.cpp b/src/session_handle.cpp index da3eab4d7..f43e57059 100644 --- a/src/session_handle.cpp +++ b/src/session_handle.cpp @@ -760,12 +760,14 @@ namespace { unsigned short session_handle::listen_port() const { - return sync_call_ret(&session_impl::listen_port); + return sync_call_ret + (&session_impl::listen_port); } unsigned short session_handle::ssl_listen_port() const { - return sync_call_ret(&session_impl::ssl_listen_port); + return sync_call_ret + (&session_impl::ssl_listen_port); } bool session_handle::is_listening() const diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 07c607190..3f4e4f332 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -939,7 +939,7 @@ namespace aux { // the uTP connections cannot be closed gracefully if (l.udp_sock) { - l.udp_sock->close(); + l.udp_sock->sock.close(); } } @@ -1160,6 +1160,7 @@ namespace { if (req.ssl_ctx) req.listen_port = ssl_listen_port(); req.ssl_ctx = &m_ssl_ctx; #endif + #if TORRENT_USE_I2P if (!m_settings.get_str(settings_pack::i2p_hostname).empty()) { @@ -1168,7 +1169,6 @@ namespace { #endif //TODO: should there be an option to announce once per listen interface? - m_tracker_manager.queue_request(get_io_service(), req, c); } @@ -1634,8 +1634,8 @@ namespace { : socket_type_t::udp; udp::endpoint const udp_bind_ep(bind_ep.address(), bind_ep.port()); - ret.udp_sock = std::make_shared(m_io_service); - ret.udp_sock->open(udp_bind_ep.protocol(), ec); + ret.udp_sock = std::make_shared(m_io_service); + ret.udp_sock->sock.open(udp_bind_ep.protocol(), ec); if (ec) { #ifndef TORRENT_DISABLE_LOGGING @@ -1657,7 +1657,7 @@ namespace { #if TORRENT_HAS_BINDTODEVICE if (!device.empty()) { - ret.udp_sock->set_option(bind_to_device(device.c_str()), ec); + ret.udp_sock->sock.set_option(bind_to_device(device.c_str()), ec); if (ec) { #ifndef TORRENT_DISABLE_LOGGING @@ -1678,7 +1678,7 @@ namespace { } } #endif - ret.udp_sock->bind(udp_bind_ep, ec); + ret.udp_sock->sock.bind(udp_bind_ep, ec); last_op = listen_failed_alert::bind; if (ec) @@ -1697,27 +1697,27 @@ namespace { return ret; } - ret.udp_external_port = ret.udp_sock->local_port(); + ret.udp_external_port = ret.udp_sock->sock.local_port(); error_code err; - set_socket_buffer_size(*ret.udp_sock, m_settings, err); + set_socket_buffer_size(ret.udp_sock->sock, m_settings, err); if (err) { if (m_alerts.should_post()) - m_alerts.emplace_alert(ret.udp_sock->local_endpoint(ec), err); + m_alerts.emplace_alert(ret.udp_sock->sock.local_endpoint(ec), err); } - ret.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + ret.udp_sock->sock.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); // this call is necessary here because, unless the settings actually // change after the session is up and listening, at no other point // set_proxy_settings is called with the correct proxy configuration, // internally, this method handle the SOCKS5's connection logic - ret.udp_sock->set_proxy_settings(proxy()); + ret.udp_sock->sock.set_proxy_settings(proxy()); // TODO: 2 use a handler allocator here ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); - ret.udp_sock->async_read(std::bind(&session_impl::on_udp_packet - , this, std::weak_ptr(ret.udp_sock), ret.ssl, _1)); + ret.udp_sock->sock.async_read(std::bind(&session_impl::on_udp_packet + , this, ret.udp_sock, ret.ssl, _1)); #ifndef TORRENT_DISABLE_LOGGING if (should_log()) @@ -1763,6 +1763,55 @@ namespace { reopen_listen_sockets(); } + void session_impl::interface_to_endpoints(std::string const& device, int const port + , bool const ssl, std::vector& eps) + { + // First, check to see if it's an IP address + error_code err; + address const adr = address::from_string(device.c_str(), err); + if (!err) + { +#if !TORRENT_USE_IPV6 + if (adr.is_v4()) +#endif + eps.emplace_back(adr, port, std::string(), ssl); + } + else + { + // this is the case where device names a network device. We need to + // enumerate all IPs associated with this device + + // TODO: 3 only run this once in the caller + std::vector const ifs = enum_net_interfaces(m_io_service, err); + if (err) + { +#ifndef TORRENT_DISABLE_LOGGING + if (should_log()) + { + session_log("failed to enumerate IPs on device: \"%s\": %s" + , device.c_str(), err.message().c_str()); + } +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device + , listen_failed_alert::enum_if, err + , socket_type_t::tcp); + } + return; + } + + for (auto const& ipface : ifs) + { + // we're looking for a specific interface, and its address + // (which must be of the same family as the address we're + // connecting to) + if (device != ipface.name) continue; + eps.emplace_back(ipface.interface_address, port, device, ssl); + } + } + } + void session_impl::reopen_listen_sockets() { #ifndef TORRENT_DISABLE_LOGGING @@ -1813,55 +1862,18 @@ namespace { // IP address or a device name. In case it's a device name, we want to // (potentially) end up binding a socket for each IP address associated // with that device. - - // First, check to see if it's an IP address - error_code err; - address const adr = address::from_string(device.c_str(), err); - if (!err) - { - eps.emplace_back(adr, port, std::string(), ssl); - } - else - { - // this is the case where device names a network device. We need to - // enumerate all IPs associated with this device - - // TODO: 3 only run this once, not every turn through the loop - std::vector const ifs = enum_net_interfaces(m_io_service, ec); - if (ec) - { -#ifndef TORRENT_DISABLE_LOGGING - if (should_log()) - { - session_log("failed to enumerate IPs on device: \"%s\": %s" - , device.c_str(), ec.message().c_str()); - } -#endif - if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device - , listen_failed_alert::enum_if, ec - , socket_type_t::tcp); - } - continue; - } - - for (auto const& ipface : ifs) - { - // we're looking for a specific interface, and its address - // (which must be of the same family as the address we're - // connecting to) - if (device != ipface.name) continue; - eps.emplace_back(ipface.interface_address, port, device, ssl); - } - } + interface_to_endpoints(device, port, ssl, eps); } auto remove_iter = partition_listen_sockets(eps, m_listen_sockets); while (remove_iter != m_listen_sockets.end()) { - // TODO notify interested parties of this socket's demise +#ifndef TORRENT_DISABLE_DHT + if (m_dht) + m_dht->delete_socket(&*remove_iter); +#endif + #ifndef TORRENT_DISABLE_LOGGING if (should_log()) { @@ -1871,7 +1883,7 @@ namespace { } #endif if (remove_iter->sock) remove_iter->sock->close(ec); - if (remove_iter->udp_sock) remove_iter->udp_sock->close(); + if (remove_iter->udp_sock) remove_iter->udp_sock->sock.close(); remove_iter = m_listen_sockets.erase(remove_iter); } @@ -1885,8 +1897,12 @@ namespace { if (!ec && (s.sock || s.udp_sock)) { - // TODO notify interested parties of this socket's creation m_listen_sockets.push_back(s); + +#ifndef TORRENT_DISABLE_DHT + if (m_dht) + m_dht->new_socket(&m_listen_sockets.back()); +#endif } } @@ -1923,8 +1939,8 @@ namespace { if (l.udp_sock) { - udp::endpoint const udp_ep = l.udp_sock->local_endpoint(err); - if (!err && l.udp_sock->is_open()) + udp::endpoint const udp_ep = l.udp_sock->sock.local_endpoint(err); + if (!err && l.udp_sock->sock.is_open()) { socket_type_t const socket_type = l.ssl @@ -1975,7 +1991,7 @@ namespace { , listen_socket_t& s) { tcp::endpoint const tcp_ep = s.sock ? s.sock->local_endpoint() : tcp::endpoint(); - udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->local_endpoint() : udp::endpoint(); + udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->sock.local_endpoint() : udp::endpoint(); if ((mask & remap_natpmp) && m_natpmp) { @@ -2092,29 +2108,39 @@ namespace { , int const flags) { // for now, just pick the first socket with a matching address family - // TODO: 3 for proper multi-homed support, we may want to do something - // else here. Probably let the caller decide which interface to send over + // TODO: remove this function once all callers are updated to specify a socket for (auto& i : m_listen_sockets) { if (!i.udp_sock) continue; if (i.ssl) continue; - i.udp_sock->send_hostname(hostname, port, p, ec, flags); - - if ((ec == error::would_block - || ec == error::try_again) - && !i.udp_write_blocked) - { - i.udp_write_blocked = true; - ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); - i.udp_sock->async_write(std::bind(&session_impl::on_udp_writeable - , this, std::weak_ptr(i.udp_sock), _1)); - } + send_udp_packet_hostname_listen(&i, hostname, port, p, ec, flags); return; } ec = boost::asio::error::operation_not_supported; } + void session_impl::send_udp_packet_hostname_listen(aux::session_listen_socket* sock + , char const* hostname + , int const port + , span p + , error_code& ec + , int const flags) + { + auto s = static_cast(sock)->udp_sock; + + s->sock.send_hostname(hostname, port, p, ec, flags); + + if ((ec == error::would_block || ec == error::try_again) + && !s->write_blocked) + { + s->write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + s->sock.async_write(std::bind(&session_impl::on_udp_writeable + , this, s, _1)); + } + } + void session_impl::send_udp_packet(bool const ssl , udp::endpoint const& ep , span p @@ -2131,42 +2157,55 @@ namespace { if (i.local_endpoint.address().is_v4() != ep.address().is_v4()) continue; - i.udp_sock->send(ep, p, ec, flags); - - if ((ec == error::would_block - || ec == error::try_again) - && !i.udp_write_blocked) - { - i.udp_write_blocked = true; - ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); - i.udp_sock->async_write(std::bind(&session_impl::on_udp_writeable - , this, std::weak_ptr(i.udp_sock), _1)); - } + send_udp_packet_listen(&i, ep, p, ec, flags); return; } ec = boost::asio::error::operation_not_supported; } - void session_impl::on_udp_writeable(std::weak_ptr s, error_code const& ec) + void session_impl::send_udp_packet_listen(aux::session_listen_socket* sock + , udp::endpoint const& ep + , span p + , error_code& ec + , int const flags) + { + auto s = static_cast(sock)->udp_sock; + + TORRENT_ASSERT(s->sock.local_endpoint().protocol() == ep.protocol()); + + s->sock.send(ep, p, ec, flags); + + if ((ec == error::would_block + || ec == error::try_again) + && !s->write_blocked) + { + s->write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + s->sock.async_write(std::bind(&session_impl::on_udp_writeable + , this, s, _1)); + } + } + + void session_impl::on_udp_writeable(std::weak_ptr sock, error_code const& ec) { COMPLETE_ASYNC("session_impl::on_udp_writeable"); if (ec) return; - std::shared_ptr sock = s.lock(); - if (!sock) return; + auto s = sock.lock(); + if (!s) return; + s->write_blocked = false; + +#ifdef TORRENT_USE_OPENSSL std::list::iterator i = std::find_if( m_listen_sockets.begin(), m_listen_sockets.end() - , [&sock] (listen_socket_t const& ls) { return ls.udp_sock == sock; }); - - if (i == m_listen_sockets.end()) return; - - i->udp_write_blocked = false; + , [&s] (listen_socket_t const& ls) { return ls.udp_sock == s; }); +#endif // notify the utp socket manager it can start sending on the socket again struct utp_socket_manager& mgr = #ifdef TORRENT_USE_OPENSSL - i->ssl ? m_ssl_utp_socket_manager : + (i != m_listen_sockets.end() && i->ssl) ? m_ssl_utp_socket_manager : #endif m_utp_socket_manager; @@ -2174,13 +2213,13 @@ namespace { } - void session_impl::on_udp_packet(std::weak_ptr const& socket + void session_impl::on_udp_packet(std::weak_ptr socket , bool const ssl, error_code const& ec) { COMPLETE_ASYNC("session_impl::on_udp_packet"); if (ec) { - std::shared_ptr s = socket.lock(); + std::shared_ptr s = socket.lock(); udp::endpoint ep; if (s) ep = s->local_endpoint(); @@ -2204,7 +2243,7 @@ namespace { m_stats_counters.inc_stats_counter(counters::on_udp_counter); - std::shared_ptr s = socket.lock(); + std::shared_ptr s = socket.lock(); if (!s) return; struct utp_socket_manager& mgr = @@ -2217,7 +2256,7 @@ namespace { { aux::array p; error_code err; - int const num_packets = s->read(p, err); + int const num_packets = s->sock.read(p, err); for (int i = 0; i < num_packets; ++i) { @@ -2316,7 +2355,7 @@ namespace { mgr.socket_drained(); ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); - s->async_read(std::bind(&session_impl::on_udp_packet + s->sock.async_read(std::bind(&session_impl::on_udp_packet , this, socket, ssl, _1)); } @@ -5057,9 +5096,7 @@ namespace { void session_impl::update_proxy() { for (auto& i : m_listen_sockets) - { - i.udp_sock->set_proxy_settings(proxy()); - } + i.udp_sock->sock.set_proxy_settings(proxy()); } void session_impl::update_upnp() @@ -5132,20 +5169,33 @@ namespace { } std::uint16_t session_impl::listen_port() const + { + return listen_port(nullptr); + } + + std::uint16_t session_impl::listen_port(listen_socket_t* sock) const { // if not, don't tell the tracker anything if we're in force_proxy // mode. We don't want to leak our listen port since it can // potentially identify us if it is leaked elsewhere if (m_settings.get_bool(settings_pack::force_proxy)) return 0; if (m_listen_sockets.empty()) return 0; + if (sock) return std::uint16_t(sock->tcp_external_port); return std::uint16_t(m_listen_sockets.front().tcp_external_port); } // TODO: 2 this function should be removed and users need to deal with the // more generic case of having multiple ssl ports std::uint16_t session_impl::ssl_listen_port() const + { + return ssl_listen_port(nullptr); + } + + std::uint16_t session_impl::ssl_listen_port(listen_socket_t* sock) const { #ifdef TORRENT_USE_OPENSSL + if (sock) return std::uint16_t(sock->tcp_external_port); + // if not, don't tell the tracker anything if we're in force_proxy // mode. We don't want to leak our listen port since it can // potentially identify us if it is leaked elsewhere @@ -5154,6 +5204,8 @@ namespace { { if (s.ssl) return std::uint16_t(s.tcp_external_port); } +#else + TORRENT_UNUSED(sock); #endif return 0; } @@ -5423,12 +5475,20 @@ namespace { m_dht = std::make_shared( static_cast(this) , m_io_service - , std::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , [=](aux::session_listen_socket* sock + , udp::endpoint const& ep + , span p + , error_code& ec + , int flags) + { send_udp_packet_listen(sock, ep, p, ec, flags); } , m_dht_settings , m_stats_counters , *m_dht_storage , std::move(m_dht_state)); + for (auto& s : m_listen_sockets) + m_dht->new_socket(&s); + for (auto const& n : m_dht_router_nodes) { m_dht->add_router_node(n); @@ -5447,6 +5507,7 @@ namespace { if (m_alerts.should_post()) m_alerts.emplace_alert(); }; + m_dht->start(cb); } @@ -5861,17 +5922,18 @@ namespace { } #endif } + if (l.udp_sock) { error_code ec; - set_tos(*l.udp_sock, tos, ec); + set_tos(l.udp_sock->sock, tos, ec); #ifndef TORRENT_DISABLE_LOGGING if (should_log()) { session_log(">>> SET_TOS [ udp (%s %d) tos: %x e: %s ]" - , l.udp_sock->local_endpoint().address().to_string().c_str() - , l.udp_sock->local_port() + , l.udp_sock->sock.local_endpoint().address().to_string().c_str() + , l.udp_sock->sock.local_port() , tos, ec.message().c_str()); } #endif @@ -6019,14 +6081,14 @@ namespace { for (auto const& l : m_listen_sockets) { error_code ec; - set_socket_buffer_size(*l.udp_sock, m_settings, ec); + set_socket_buffer_size(l.udp_sock->sock, m_settings, ec); #ifndef TORRENT_DISABLE_LOGGING if (ec && should_log()) { error_code err; session_log("socket buffer size [ udp %s %d]: (%d) %s" - , l.udp_sock->local_endpoint().address().to_string(err).c_str() - , l.udp_sock->local_port(), ec.value(), ec.message().c_str()); + , l.udp_sock->sock.local_endpoint().address().to_string(err).c_str() + , l.udp_sock->sock.local_port(), ec.value(), ec.message().c_str()); } #endif ec.clear(); @@ -6091,7 +6153,7 @@ namespace { { for (auto& i : m_listen_sockets) { - i.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + i.udp_sock->sock.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); // close the TCP listen sockets if (i.sock) @@ -6426,28 +6488,11 @@ namespace { } // this is the DHT observer version. DHT is the implied source - void session_impl::set_external_address(address const& ip + void session_impl::set_external_address(aux::session_listen_socket* iface, address const& ip , address const& source) { - set_external_address(ip, source_dht, source); - } - - // TODO 3 pass in a specific listen socket rather than an address family - address session_impl::external_address(udp proto) - { -#if !TORRENT_USE_IPV6 - TORRENT_UNUSED(proto); -#endif - - address addr; -#if TORRENT_USE_IPV6 - if (proto == udp::v6()) - addr = address_v6(); - else -#endif - addr = address_v4(); - addr = external_address().external_address(addr); - return addr; + TORRENT_ASSERT(iface); + set_external_address(*static_cast(iface), ip, source_dht, source); } void session_impl::get_peers(sha1_hash const& ih) @@ -6547,6 +6592,32 @@ namespace { void session_impl::set_external_address(address const& ip , int const source_type, address const& source) { + // for now, just pick the first socket with a matching address family + // TODO: remove this function once all callers are updated to specify a listen socket + for (auto& i : m_listen_sockets) + { + if (i.local_endpoint.address().is_v4() != ip.is_v4()) + continue; + + set_external_address(i, ip, source_type, source); + break; + } + } + + void session_impl::set_external_address( + tcp::endpoint const& local_endpoint, address const& ip + , int const source_type, address const& source) + { + auto sock = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end() + , [&](listen_socket_t const& v) { return v.local_endpoint == local_endpoint; }); + + if (sock != m_listen_sockets.end()) + set_external_address(*sock, ip, source_type, source); + } + + void session_impl::set_external_address(listen_socket_t& sock + , address const& ip, int const source_type, address const& source) + { #ifndef TORRENT_DISABLE_LOGGING if (should_log()) { @@ -6555,16 +6626,7 @@ namespace { } #endif - // for now, just pick the first socket with a matching address family - // TODO: 3 allow the caller to select which listen socket to update - for (auto& i : m_listen_sockets) - { - if (i.local_endpoint.address().is_v4() != ip.is_v4()) - continue; - - if (!i.external_address.cast_vote(ip, source_type, source)) return; - break; - } + if (!sock.external_address.cast_vote(ip, source_type, source)) return; #ifndef TORRENT_DISABLE_LOGGING session_log(" external IP updated"); @@ -6582,7 +6644,7 @@ namespace { // restart the DHT with a new node ID #ifndef TORRENT_DISABLE_DHT - if (m_dht) m_dht->update_node_id(); + if (m_dht) m_dht->update_node_id(&sock); #endif } diff --git a/test/test_dht.cpp b/test/test_dht.cpp index 2018bd3f4..647aaac8a 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/bloom_filter.hpp" #include "libtorrent/hasher.hpp" #include "libtorrent/aux_/time.hpp" +#include "libtorrent/aux_/session_listen_socket.hpp" #include "libtorrent/kademlia/node_id.hpp" #include "libtorrent/kademlia/routing_table.hpp" @@ -108,10 +109,10 @@ void nop_node() {} std::list> g_sent_packets; -struct mock_socket final : udp_socket_interface +struct mock_socket final : socket_manager { bool has_quota() override { return true; } - bool send_packet(entry& msg, udp::endpoint const& ep) override + bool send_packet(aux::session_listen_socket* s, entry& msg, udp::endpoint const& ep) override { // TODO: 3 ideally the mock_socket would contain this queue of packets, to // make tests independent @@ -120,6 +121,32 @@ struct mock_socket final : udp_socket_interface } }; +struct mock_dht_socket final : aux::session_listen_socket +{ + mock_dht_socket() : m_external_address(addr4("236.0.0.1")), m_local_endpoint(addr4("192.168.4.1"), 6881) {} + explicit mock_dht_socket(address ep) : m_external_address(ep), m_local_endpoint(ep, 6881) {} + + address get_external_address() override { return m_external_address; } + tcp::endpoint get_local_endpoint() override { return m_local_endpoint; } + + address m_external_address; + tcp::endpoint m_local_endpoint; +}; + +struct mock_dht_socket6 final : aux::session_listen_socket +{ + address get_external_address() override { return m_external_address; } + tcp::endpoint get_local_endpoint() override { return m_local_endpoint; } + + address m_external_address = addr6("2002::1"); + tcp::endpoint m_local_endpoint = tcp::endpoint(addr6("2002::1"), 6881); +}; + +node* get_foreign_node_stub(node_id const&, std::string const&) +{ + return nullptr; +} + sha1_hash generate_next() { sha1_hash ret; @@ -468,16 +495,12 @@ void put_immutable_item_cb(int num, int expect) struct obs : dht::dht_observer { - void set_external_address(address const& addr + void set_external_address(aux::session_listen_socket* s, address const& addr , address const& source) override { - m_external_address = addr; + static_cast(s)->m_external_address = addr; } - address external_address(udp proto) override - { - return m_external_address; - } void get_peers(sha1_hash const& ih) override {} void outgoing_get_peers(sha1_hash const& target , sha1_hash const& sent_target, udp::endpoint const& ep) override {} @@ -500,8 +523,6 @@ struct obs : dht::dht_observer bool on_dht_request(string_view query , dht::msg const& request, entry& response) override { return false; } - address m_external_address = addr4("236.0.0.1"); - #ifndef TORRENT_DISABLE_LOGGING std::vector m_log; #endif @@ -520,20 +541,22 @@ struct dht_test_setup { explicit dht_test_setup(udp::endpoint src) : sett(test_settings()) + , ds(src.address()) , dht_storage(dht_default_storage_constructor(sett)) , source(src) - , dht_node(src.protocol(), &s, sett - , node_id(nullptr), &observer, cnt, nodes, *dht_storage) + , dht_node(&ds, &s, sett + , node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage) { dht_storage->update_node_ids({node_id::min()}); } + dht_settings sett; mock_socket s; + mock_dht_socket ds; obs observer; counters cnt; std::unique_ptr dht_storage; udp::endpoint source; - std::map nodes; dht::node dht_node; char error_string[200]; }; @@ -2577,15 +2600,24 @@ TORRENT_TEST(dht_dual_stack) // TODO: 3 use dht_test_setup class to simplify the node setup dht_settings sett = test_settings(); mock_socket s; + mock_dht_socket sock4; + mock_dht_socket6 sock6; obs observer; counters cnt; - std::map nodes; + node* node4p = nullptr, *node6p = nullptr; + auto get_foreign_node = [&](node_id const&, std::string const& family) + { + if (family == "n4") return node4p; + if (family == "n6") return node6p; + TEST_CHECK(false); + return (node*)nullptr; + }; std::unique_ptr dht_storage(dht_default_storage_constructor(sett)); dht_storage->update_node_ids({node_id(nullptr)}); - dht::node node4(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage); - dht::node node6(udp::v6(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage); - nodes.insert(std::make_pair("n4", &node4)); - nodes.insert(std::make_pair("n6", &node6)); + dht::node node4(&sock4, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node, *dht_storage); + dht::node node6(&sock6, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node, *dht_storage); + node4p = &node4; + node6p = &node6; // DHT should be running on port 48199 now bdecode_node response; @@ -3051,7 +3083,7 @@ TORRENT_TEST(node_set_id) { dht_test_setup t(udp::endpoint(rand_v4(), 20)); node_id old_nid = t.dht_node.nid(); - t.observer.set_external_address(addr4("237.0.0.1"), rand_v4()); + t.observer.set_external_address(&t.ds, addr4("237.0.0.1"), rand_v4()); t.dht_node.update_node_id(); TEST_CHECK(old_nid != t.dht_node.nid()); // now that we've changed the node's id, make sure the id sent in outgoing messages @@ -3080,13 +3112,13 @@ TORRENT_TEST(read_only_node) dht_settings sett = test_settings(); sett.read_only = true; mock_socket s; + mock_dht_socket ds; obs observer; counters cnt; - std::map nodes; std::unique_ptr dht_storage(dht_default_storage_constructor(sett)); dht_storage->update_node_ids({node_id(nullptr)}); - dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage); + dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage); udp::endpoint source(addr("10.0.0.1"), 20); bdecode_node response; msg_args args; @@ -3169,13 +3201,13 @@ TORRENT_TEST(invalid_error_msg) // TODO: 3 use dht_test_setup class to simplify the node setup dht_settings sett = test_settings(); mock_socket s; + mock_dht_socket ds; obs observer; counters cnt; - std::map nodes; std::unique_ptr dht_storage(dht_default_storage_constructor(sett)); dht_storage->update_node_ids({node_id(nullptr)}); - dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage); + dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage); udp::endpoint source(addr("10.0.0.1"), 20); entry e; @@ -3210,15 +3242,15 @@ TORRENT_TEST(rpc_invalid_error_msg) // TODO: 3 use dht_test_setup class to simplify the node setup dht_settings sett = test_settings(); mock_socket s; + mock_dht_socket ds; obs observer; counters cnt; - std::map nodes; dht::routing_table table(node_id(), udp::v4(), 8, sett, &observer); - dht::rpc_manager rpc(node_id(), sett, table, &s, &observer); + dht::rpc_manager rpc(node_id(), sett, table, &ds, &s, &observer); std::unique_ptr dht_storage(dht_default_storage_constructor(sett)); dht_storage->update_node_ids({node_id(nullptr)}); - dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage); + dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage); udp::endpoint source(addr("10.0.0.1"), 20); @@ -3480,13 +3512,13 @@ TORRENT_TEST(dht_state) { dht_state s; - s.nid = to_hash("0000000000000000000000000000000000000001"); + s.nids.emplace_back(address::from_string("1.1.1.1"), to_hash("0000000000000000000000000000000000000001")); s.nodes.push_back(uep("1.1.1.1", 1)); s.nodes.push_back(uep("2.2.2.2", 2)); // not important that IPv6 is disabled here - s.nid6 = to_hash("0000000000000000000000000000000000000002"); - s.nodes6.push_back(uep("3.3.3.3", 3)); - s.nodes6.push_back(uep("4.4.4.4", 4)); + s.nids.emplace_back(address::from_string("1::1"), to_hash("0000000000000000000000000000000000000002")); + s.nodes.push_back(uep("1::1", 3)); + s.nodes.push_back(uep("2::2", 4)); entry const e = save_dht_state(s); @@ -3499,18 +3531,14 @@ TORRENT_TEST(dht_state) TEST_CHECK(!r); dht_state const s1 = read_dht_state(n); - TEST_EQUAL(s1.nid, s.nid); + TEST_CHECK(s1.nids == s.nids); TEST_CHECK(s1.nodes == s.nodes); - TEST_EQUAL(s1.nid6, s.nid6); - TEST_CHECK(s1.nodes6 == s.nodes6); // empty bdecode_node n1; dht_state const s2 = read_dht_state(n1); - TEST_EQUAL(s2.nid, node_id()); + TEST_CHECK(s2.nids.empty()); TEST_CHECK(s2.nodes.empty()); - TEST_EQUAL(s2.nid6, node_id()); - TEST_CHECK(s2.nodes6.empty()); } // TODO: test obfuscated_get_peers diff --git a/test/test_session_params.cpp b/test/test_session_params.cpp index de0657875..f77c30777 100644 --- a/test/test_session_params.cpp +++ b/test/test_session_params.cpp @@ -110,13 +110,11 @@ TORRENT_TEST(dht_state) sett.max_peers = 20000; dht_state s; - s.nid = to_hash("0000000000000000000000000000000000000001"); + s.nids.emplace_back(addr4("0.0.0.0"), to_hash("0000000000000000000000000000000000000001")); s.nodes.push_back(uep("1.1.1.1", 1)); s.nodes.push_back(uep("2.2.2.2", 2)); // not important that IPv6 is disabled here - s.nid6 = to_hash("0000000000000000000000000000000000000002"); - s.nodes6.push_back(uep("3.3.3.3", 3)); - s.nodes6.push_back(uep("4.4.4.4", 4)); + s.nids.emplace_back(addr6("::"), to_hash("0000000000000000000000000000000000000002")); session_params params(p); params.dht_settings = sett; @@ -140,9 +138,11 @@ TORRENT_TEST(dht_state) TEST_EQUAL(params1.dht_settings.max_peers, 20000); // not a chance the nid will be the fake initial ones - TEST_CHECK(params1.dht_state.nid != s.nid); + TEST_CHECK(params1.dht_state.nids[0].second != s.nids[0].second); #if TORRENT_USE_IPV6 - TEST_CHECK(params1.dht_state.nid6 != s.nid6); + // the host machine may not have IPv6 support in which case there will only be one entry + if (params1.dht_state.nids.size() > 1) + TEST_CHECK(params1.dht_state.nids[1].second != s.nids[1].second); #endif } #endif