create a separate DHT node for each listen socket

This commit is contained in:
Steven Siloti 2017-04-20 21:45:43 -07:00 committed by Arvid Norberg
parent 9519fd4441
commit cdd50be859
22 changed files with 812 additions and 470 deletions

View File

@ -177,7 +177,9 @@ nobase_include_HEADERS = \
aux_/merkle.hpp \
aux_/session_call.hpp \
aux_/session_impl.hpp \
aux_/session_listen_socket.hpp \
aux_/session_settings.hpp \
aux_/session_udp_sockets.hpp \
aux_/proxy_settings.hpp \
aux_/session_interface.hpp \
aux_/suggest_piece.hpp \

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/config.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/aux_/session_interface.hpp"
#include "libtorrent/aux_/session_udp_sockets.hpp"
#include "libtorrent/linked_list.hpp"
#include "libtorrent/torrent_peer.hpp"
#include "libtorrent/torrent_peer_allocator.hpp"
@ -117,8 +118,14 @@ namespace dht {
class item;
}
struct listen_socket_t
struct listen_socket_t final : aux::session_listen_socket
{
address get_external_address() override
{ return external_address.external_address(); }
tcp::endpoint get_local_endpoint() override
{ return local_endpoint; }
listen_socket_t()
{
tcp_port_mapping[0] = -1;
@ -160,19 +167,13 @@ namespace dht {
// set to true if this is an SSL listen socket
bool ssl = false;
// this is true when the udp socket send() has failed with EAGAIN or
// EWOULDBLOCK. i.e. we're currently waiting for the socket to become
// writeable again. Once it is, we'll set it to false and notify the utp
// socket manager
bool udp_write_blocked = false;
// the actual sockets (TCP listen socket and UDP socket)
// An entry does not necessarily have a UDP or TCP socket. One of these
// pointers may be nullptr!
// These must be shared_ptr to avoid a dangling reference if an
// incoming packet is in the event queue when the socket is erased
std::shared_ptr<tcp::acceptor> sock;
std::shared_ptr<udp_socket> udp_sock;
std::shared_ptr<aux::session_udp_socket> udp_sock;
};
namespace aux {
@ -557,7 +558,9 @@ namespace aux {
void set_peer_id(peer_id const& id);
void set_key(std::uint32_t key);
std::uint16_t listen_port() const override;
std::uint16_t listen_port(listen_socket_t* sock) const;
std::uint16_t ssl_listen_port() const override;
std::uint16_t ssl_listen_port(listen_socket_t* sock) const;
alert_manager& alerts() override { return m_alerts; }
disk_interface& disk_thread() override { return m_disk_thread; }
@ -623,9 +626,8 @@ namespace aux {
int send_buffer_size() const override { return send_buffer_size_impl; }
// implements dht_observer
virtual void set_external_address(address const& ip
, address const& source) override;
virtual address external_address(udp proto) override;
virtual void set_external_address(aux::session_listen_socket* iface
, address const& ip, address const& source) override;
virtual void get_peers(sha1_hash const& ih) override;
virtual void announce(sha1_hash const& ih, address const& addr, int port) override;
virtual void outgoing_get_peers(sha1_hash const& target
@ -651,6 +653,9 @@ namespace aux {
void set_external_address(address const& ip
, int source_type, address const& source) override;
void set_external_address(tcp::endpoint const& local_endpoint
, address const& ip
, int source_type, address const& source) override;
virtual external_ip external_address() const override;
// used when posting synchronous function
@ -743,6 +748,12 @@ namespace aux {
void on_lsd_peer(tcp::endpoint const& peer, sha1_hash const& ih) override;
void setup_socket_buffers(socket_type& s) override;
void set_external_address(listen_socket_t& sock, address const& ip
, int const source_type, address const& source);
void interface_to_endpoints(std::string const& device, int const port
, bool const ssl, std::vector<listen_endpoint_t>& eps);
// the settings for the client
aux::session_settings m_settings;
@ -1054,15 +1065,28 @@ namespace aux {
, error_code& ec
, int flags);
void send_udp_packet_hostname_listen(aux::session_listen_socket* sock
, char const* hostname
, int port
, span<char const> p
, error_code& ec
, int flags);
void send_udp_packet(bool ssl
, udp::endpoint const& ep
, span<char const> p
, error_code& ec
, int flags);
void on_udp_writeable(std::weak_ptr<udp_socket> s, error_code const& ec);
void send_udp_packet_listen(aux::session_listen_socket* sock
, udp::endpoint const& ep
, span<char const> p
, error_code& ec
, int flags);
void on_udp_packet(std::weak_ptr<udp_socket> const& s
void on_udp_writeable(std::weak_ptr<session_udp_socket> s, error_code const& ec);
void on_udp_packet(std::weak_ptr<session_udp_socket> s
, bool ssl, error_code const& ec);
libtorrent::utp_socket_manager m_utp_socket_manager;

View File

@ -42,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/error_code.hpp"
#include "libtorrent/socket.hpp" // for tcp::endpoint
#include "libtorrent/aux_/vector.hpp"
#include "libtorrent/aux_/session_listen_socket.hpp"
#include <functional>
#include <memory>
@ -138,6 +139,9 @@ namespace libtorrent { namespace aux {
virtual void set_external_address(address const& ip
, int source_type, address const& source) = 0;
virtual void set_external_address(tcp::endpoint const& local_endpoint
, address const& ip
, int source_type, address const& source) = 0;
virtual external_ip external_address() const = 0;
virtual disk_interface& disk_thread() = 0;

View File

@ -0,0 +1,60 @@
/*
Copyright (c) 2017, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_SESSION_LISTEN_SOCKET_HPP_INCLUDED
#define TORRENT_SESSION_LISTEN_SOCKET_HPP_INCLUDED
#include "libtorrent/address.hpp"
#include "libtorrent/socket.hpp" // for tcp::endpoint
namespace libtorrent { namespace aux {
// abstract interface for a listen socket owned by session_impl
// pointers to this type serve as a handle for the listen socket
// use a separate abstract type to prohibit outside access to private fields of listen_socket_t
// and because some users of these handles should not be coupled to session_impl
struct TORRENT_EXTRA_EXPORT session_listen_socket
{
virtual address get_external_address() = 0;
virtual tcp::endpoint get_local_endpoint() = 0;
session_listen_socket() = default;
protected:
session_listen_socket(session_listen_socket const&) = default;
session_listen_socket& operator=(session_listen_socket const&) = default;
~session_listen_socket() = default;
};
} }
#endif

View File

@ -0,0 +1,61 @@
/*
Copyright (c) 2017, Arvid Norberg, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_SESSION_UDP_SOCKETS_HPP_INCLUDED
#define TORRENT_SESSION_UDP_SOCKETS_HPP_INCLUDED
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/config.hpp"
#include <boost/asio/io_service.hpp>
#include <vector>
namespace libtorrent { namespace aux {
struct session_udp_socket
{
explicit session_udp_socket(io_service& ios)
: sock(ios) {}
udp::endpoint local_endpoint() { return sock.local_endpoint(); }
udp_socket sock;
// this is true when the udp socket send() has failed with EAGAIN or
// EWOULDBLOCK. i.e. we're currently waiting for the socket to become
// writeable again. Once it is, we'll set it to false and notify the utp
// socket manager
bool write_blocked = false;
};
} }
#endif

View File

@ -37,7 +37,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/address.hpp"
#include "libtorrent/kademlia/msg.hpp"
namespace libtorrent { namespace dht {
namespace libtorrent {
namespace aux { struct session_listen_socket; }
namespace dht {
struct TORRENT_EXTRA_EXPORT dht_logger
{
@ -64,14 +68,13 @@ namespace libtorrent { namespace dht {
#endif
protected:
~dht_logger() {}
~dht_logger() = default;
};
struct TORRENT_EXTRA_EXPORT dht_observer : dht_logger
{
virtual void set_external_address(address const& addr
, address const& source) = 0;
virtual address external_address(udp proto) = 0;
virtual void set_external_address(aux::session_listen_socket* iface
, address const& addr, address const& source) = 0;
virtual void get_peers(sha1_hash const& ih) = 0;
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) = 0;
@ -80,7 +83,7 @@ namespace libtorrent { namespace dht {
, dht::msg const& request, entry& response) = 0;
protected:
~dht_observer() {}
~dht_observer() = default;
};
}}

View File

@ -40,6 +40,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp>
#include <vector>
#include <utility>
namespace libtorrent {
@ -47,6 +48,8 @@ namespace libtorrent {
}
namespace libtorrent { namespace dht {
using node_ids_t = std::vector<std::pair<address, node_id>>;
// This structure helps to store and load the state
// of the ``dht_tracker``.
// At this moment the library is only a dual stack
@ -55,12 +58,9 @@ namespace libtorrent { namespace dht {
// .. _BEP32: http://bittorrent.org/beps/bep_0032.html
struct TORRENT_EXPORT dht_state
{
// the id of the IPv4 node
node_id nid;
// the id of the IPv6 node
node_id nid6;
node_ids_t nids;
// the bootstrap nodes saved from the IPv4 buckets node
// the bootstrap nodes saved from the buckets node
std::vector<udp::endpoint> nodes;
// the bootstrap nodes saved from the IPv6 buckets node
std::vector<udp::endpoint> nodes6;
@ -68,6 +68,7 @@ namespace libtorrent { namespace dht {
void clear();
};
TORRENT_EXTRA_EXPORT node_ids_t extract_node_ids(bdecode_node const& e, string_view key);
TORRENT_EXTRA_EXPORT dht_state read_dht_state(bdecode_node const& e);
TORRENT_EXTRA_EXPORT entry save_dht_state(dht_state const& state);
}}

View File

@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/dos_blocker.hpp>
#include <libtorrent/kademlia/dht_state.hpp>
#include <libtorrent/aux_/session_listen_socket.hpp>
#include <libtorrent/socket.hpp>
#include <libtorrent/deadline_timer.hpp>
#include <libtorrent/span.hpp>
@ -56,10 +57,11 @@ namespace libtorrent {
namespace libtorrent { namespace dht {
struct TORRENT_EXTRA_EXPORT dht_tracker final
: udp_socket_interface
: socket_manager
, std::enable_shared_from_this<dht_tracker>
{
using send_fun_t = std::function<void(udp::endpoint const&
using send_fun_t = std::function<void(
aux::session_listen_socket*, udp::endpoint const&
, span<char const>, error_code&, int)>;
dht_tracker(dht_observer* observer
@ -69,14 +71,23 @@ namespace libtorrent { namespace dht {
, counters& cnt
, dht_storage_interface& storage
, dht_state state);
virtual ~dht_tracker();
#if defined(_MSC_VER) && _MSC_VER < 1910
// workaround for a bug in msvc 14.0
// it attempts to generate a copy constructor for some strange reason
// and fails because tracker_node is not copyable
dht_tracker(dht_tracker const&) = delete;
#endif
void start(find_data::nodes_callback const& f);
void stop();
// tell the node to recalculate its node id based on the current
// understanding of its external address (which may have changed)
void update_node_id();
void update_node_id(aux::session_listen_socket* s);
void new_socket(aux::session_listen_socket* s);
void delete_socket(aux::session_listen_socket* s);
void add_node(udp::endpoint const& node);
void add_router_node(udp::endpoint const& node);
@ -128,18 +139,33 @@ namespace libtorrent { namespace dht {
std::vector<std::pair<node_id, udp::endpoint>> live_nodes(node_id const& nid);
private:
struct tracker_node
{
tracker_node(io_service& ios
, aux::session_listen_socket* s, socket_manager* sock
, libtorrent::dht_settings const& settings
, node_id const& nid
, dht_observer* observer, counters& cnt
, get_foreign_node_t get_foreign_node
, dht_storage_interface& storage);
node dht;
deadline_timer connection_timer;
};
using tracker_nodes_t = std::map<aux::session_listen_socket*, tracker_node>;
std::shared_ptr<dht_tracker> self()
{ return shared_from_this(); }
void connection_timeout(node& n, error_code const& e);
void connection_timeout(tracker_node& n, error_code const& e);
void refresh_timeout(error_code const& e);
void refresh_key(error_code const& e);
void update_storage_node_ids();
node* get_node(node_id const& id, std::string const& family_name);
// implements udp_socket_interface
// implements socket_manager
virtual bool has_quota() override;
virtual bool send_packet(entry& e, udp::endpoint const& addr) override;
virtual bool send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) override;
// this is the bdecode_node DHT messages are parsed into. It's a member
// in order to avoid having to deallocate and re-allocate it for every
@ -149,27 +175,18 @@ namespace libtorrent { namespace dht {
counters& m_counters;
dht_storage_interface& m_storage;
dht_state m_state; // to be used only once
node m_dht;
#if TORRENT_USE_IPV6
node m_dht6;
#endif
tracker_nodes_t m_nodes;
send_fun_t m_send_fun;
dht_logger* m_log;
dht_observer* m_log;
std::vector<char> m_send_buf;
dos_blocker m_blocker;
deadline_timer m_key_refresh_timer;
deadline_timer m_connection_timer;
#if TORRENT_USE_IPV6
deadline_timer m_connection_timer6;
#endif
deadline_timer m_refresh_timer;
dht_settings const& m_settings;
std::map<std::string, node*> m_nodes;
bool m_abort;
bool m_running;
// used to resolve hostnames for nodes
udp::resolver m_host_resolver;

View File

@ -54,6 +54,7 @@ namespace libtorrent {
struct counters;
struct dht_routing_bucket;
struct dht_settings;
namespace aux { struct session_listen_socket; }
}
namespace libtorrent { namespace dht {
@ -75,22 +76,25 @@ public:
void reply(msg const&) { flags |= flag_done; }
};
struct udp_socket_interface
struct socket_manager
{
virtual bool has_quota() = 0;
virtual bool send_packet(entry& e, udp::endpoint const& addr) = 0;
virtual bool send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) = 0;
protected:
~udp_socket_interface() {}
~socket_manager() = default;
};
// get the closest node to the id with the given family_name
using get_foreign_node_t = std::function<node*(node_id const&, std::string const&)>;
class TORRENT_EXTRA_EXPORT node : boost::noncopyable
{
public:
node(udp proto, udp_socket_interface* sock
node(aux::session_listen_socket* sock, socket_manager* sock_man
, libtorrent::dht_settings const& settings
, node_id const& nid
, dht_observer* observer, counters& cnt
, std::map<std::string, node*> const& nodes
, get_foreign_node_t get_foreign_node
, dht_storage_interface& storage);
~node();
@ -241,7 +245,7 @@ private:
static protocol_descriptor const& map_protocol_to_descriptor(udp protocol);
std::map<std::string, node*> const& m_nodes;
get_foreign_node_t m_get_foreign_node;
dht_observer* m_observer;
@ -256,7 +260,8 @@ private:
// secret random numbers used to create write tokens
std::uint32_t m_secret[2];
udp_socket_interface* m_sock;
socket_manager* m_sock_man;
aux::session_listen_socket* m_sock;
counters& m_counters;
dht_storage_interface& m_storage;

View File

@ -45,12 +45,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/observer.hpp>
namespace libtorrent { struct dht_settings; class entry; }
namespace libtorrent { struct dht_settings; class entry; namespace aux { struct session_listen_socket; } }
namespace libtorrent { namespace dht {
struct dht_logger;
struct udp_socket_interface;
struct socket_manager;
struct TORRENT_EXTRA_EXPORT null_observer : public observer
{
@ -68,7 +68,8 @@ public:
rpc_manager(node_id const& our_id
, dht_settings const& settings
, routing_table& table
, udp_socket_interface* sock
, aux::session_listen_socket* sock
, socket_manager* sock_man
, dht_logger* log);
~rpc_manager();
@ -118,7 +119,8 @@ private:
std::unordered_multimap<int, observer_ptr> m_transactions;
udp_socket_interface* m_sock;
aux::session_listen_socket* m_sock;
socket_manager* m_sock_man;
#ifndef TORRENT_DISABLE_LOGGING
dht_logger* m_log;
#endif

View File

@ -199,11 +199,6 @@ namespace libtorrent {
// GUID must be uppercased string embraced in curly brackets.
// ``{E4F0B674-0DFC-48BB-98A5-2AA730BDB6D6}::7777`` - will accept
// connections on port 7777 on adapter with this GUID.
//
// .. note::
// The current support for opening arbitrary UDP sockets is limited.
// In this version of libtorrent, there will only ever be two UDP
// sockets, one for IPv4 and one for IPv6.
listen_interfaces,
// when using a poxy, this is the hostname where the proxy is running

View File

@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/random.hpp"
#include "libtorrent/crc32c.hpp"
#include "libtorrent/alert_types.hpp" // for dht_routing_bucket
#include "libtorrent/aux_/session_listen_socket.hpp"
#include "setup_dht.hpp"
@ -78,28 +79,30 @@ namespace {
} // anonymous namespace
struct dht_node final : lt::dht::udp_socket_interface
struct dht_node final : lt::dht::socket_manager, lt::aux::session_listen_socket
{
dht_node(sim::simulation& sim, lt::dht_settings const& sett, lt::counters& cnt
, int const idx, std::uint32_t const flags)
: m_io_service(sim, (flags & dht_network::bind_ipv6) ? addr6_from_int(idx) : addr_from_int(idx))
, m_dht_storage(lt::dht::dht_default_storage_constructor(sett))
#if LIBSIMULATOR_USE_MOVE
, m_socket(m_io_service)
, m_dht((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4()
, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, m_nodes, *m_dht_storage)
#else
, m_socket(new asio::ip::udp::socket(m_io_service))
, m_dht(new lt::dht::node((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4()
, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, m_nodes, *m_dht_storage))
#endif
, m_add_dead_nodes((flags & dht_network::add_dead_nodes) != 0)
, m_ipv6((flags & dht_network::bind_ipv6) != 0)
#if LIBSIMULATOR_USE_MOVE
, m_socket(m_io_service)
, m_dht(this, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt
, [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; }
, *m_dht_storage)
#else
, m_socket(new asio::ip::udp::socket(m_io_service))
, m_dht(new lt::dht::node(this, this, sett
, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt
, [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; }
, *m_dht_storage))
#endif
{
m_dht_storage->update_node_ids({id_from_addr(m_io_service.get_ips().front())});
m_nodes.insert(std::make_pair(dht().protocol_family_name(), &dht()));
error_code ec;
sock().open(m_ipv6 ? asio::ip::udp::v6() : asio::ip::udp::v4());
sock().bind(asio::ip::udp::endpoint(
@ -124,10 +127,13 @@ struct dht_node final : lt::dht::udp_socket_interface
// movable and make sure it never needs to be moved (for instance, by
// reserving space in the vector before emplacing any nodes).
dht_node(dht_node&& n) noexcept
: m_socket(std::move(n.m_socket))
, m_dht(n.m_ipv6 ? udp::v6() : udp::v4(), this, n.m_dht.settings(), n.m_dht.nid()
: m_add_dead_nodes(n.m_add_dead_nodes)
, m_ipv6(n.m_ipv6)
, m_socket(std::move(n.m_socket))
, m_dht(this, this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters()
, std::map<std::string, lt::dht::node*>(), *n.m_dht_storage)
, [](lt::dht::node_id const&, std::string const&) -> lt::dht::node* { return nullptr; }
, *n.m_dht_storage)
{
assert(false && "dht_node is not movable");
throw std::runtime_error("dht_node is not movable");
@ -167,7 +173,7 @@ struct dht_node final : lt::dht::udp_socket_interface
}
bool has_quota() override { return true; }
bool send_packet(entry& e, udp::endpoint const& addr) override
bool send_packet(lt::aux::session_listen_socket* s, entry& e, udp::endpoint const& addr) override
{
// since the simulaton is single threaded, we can get away with allocating
// just a single send buffer
@ -181,6 +187,18 @@ struct dht_node final : lt::dht::udp_socket_interface
return true;
}
address get_external_address() override
{
return get_local_endpoint().address();
}
tcp::endpoint get_local_endpoint() override
{
if (sock().is_open()) return tcp::endpoint(sock().local_endpoint().address(), sock().local_endpoint().port());
if (m_ipv6) return tcp::endpoint(address_v6(), 0);
return tcp::endpoint(address_v4(), 0);
}
// the node_id and IP address of this node
std::pair<dht::node_id, lt::udp::endpoint> node_info() const
{
@ -257,7 +275,8 @@ struct dht_node final : lt::dht::udp_socket_interface
private:
asio::io_service m_io_service;
std::shared_ptr<dht::dht_storage_interface> m_dht_storage;
std::map<std::string, lt::dht::node*> m_nodes;
bool const m_add_dead_nodes;
bool const m_ipv6;
#if LIBSIMULATOR_USE_MOVE
lt::udp::socket m_socket;
lt::udp::socket& sock() { return m_socket; }
@ -268,8 +287,6 @@ private:
std::shared_ptr<lt::dht::node> m_dht;
#endif
lt::udp::endpoint m_ep;
bool m_add_dead_nodes;
bool m_ipv6;
char m_buffer[1300];
};

View File

@ -65,13 +65,7 @@ void bootstrap_session(std::vector<dht_network*> networks, lt::session& ses)
// bootstrap off of 8 of the nodes
auto router_nodes = dht->router_nodes();
char const* nodes_key;
if (router_nodes.front().address().is_v6())
nodes_key = "nodes6";
else
nodes_key = "nodes";
lt::entry::list_type& nodes = state["dht state"][nodes_key].list();
lt::entry::list_type& nodes = state["dht state"]["nodes"].list();
for (auto const& n : router_nodes)
{
std::string node;

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "simulator/simulator.hpp"
#include "libtorrent/aux_/session_listen_socket.hpp"
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/kademlia/dht_tracker.hpp"
#include "libtorrent/kademlia/dht_state.hpp"
@ -56,16 +57,9 @@ using namespace std::placeholders;
struct obs : dht::dht_observer
{
void set_external_address(address const& /* addr */
void set_external_address(lt::aux::session_listen_socket*, address const& /* addr */
, address const& /* source */) override
{}
address external_address(udp proto) override
{
if (proto == udp::v4())
return address_v4::from_string("40.30.20.10");
else
return address_v6();
}
void get_peers(sha1_hash const&) override {}
void outgoing_get_peers(sha1_hash const& /* target */
, sha1_hash const& /* sent_target */, udp::endpoint const& /* ep */) override {}
@ -91,6 +85,25 @@ struct obs : dht::dht_observer
#endif
};
struct mock_socket : lt::aux::session_listen_socket
{
address get_external_address() override
{
return get_local_endpoint().address();
}
tcp::endpoint get_local_endpoint() override
{
return tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888);
}
};
void send_packet(lt::udp_socket& sock, lt::aux::session_listen_socket*, udp::endpoint const& ep
, span<char const> p, error_code& ec, int flags)
{
sock.send(ep, p, ec, flags);
}
#endif // #if !defined TORRENT_DISABLE_DHT
TORRENT_TEST(dht_rate_limit)
@ -104,6 +117,7 @@ TORRENT_TEST(dht_rate_limit)
// receiver (the DHT under test)
lt::udp_socket sock(dht_ios);
obs o;
mock_socket ds;
error_code ec;
sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec);
dht_settings dhtsett;
@ -117,8 +131,9 @@ TORRENT_TEST(dht_rate_limit)
dht::dht_state state;
std::unique_ptr<lt::dht::dht_storage_interface> dht_storage(dht::dht_default_storage_constructor(dhtsett));
auto dht = std::make_shared<lt::dht::dht_tracker>(
&o, dht_ios, std::bind(&udp_socket::send, &sock, _1, _2, _3, _4)
&o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5)
, dhtsett, cnt, *dht_storage, state);
dht->new_socket(&ds);
bool stop = false;
std::function<void(error_code const&, size_t)> on_read

View File

@ -36,16 +36,44 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/socket_io.hpp>
namespace libtorrent { namespace dht {
namespace {
node_id extract_node_id(bdecode_node const& e, string_view key)
node_ids_t extract_node_ids(bdecode_node const& e, string_view key)
{
if (e.type() != bdecode_node::dict_t) return node_id();
auto nid = e.dict_find_string_value(key);
if (nid.size() != 20) return node_id();
return node_id(nid);
if (e.type() != bdecode_node::dict_t) return node_ids_t();
node_ids_t ret;
// first look for an old-style nid
auto old_nid = e.dict_find_string_value(key);
if (old_nid.size() == 20)
{
ret.emplace_back(address(), node_id(old_nid));
return ret;
}
auto nids = e.dict_find_list(key);
if (!nids) return ret;
for (int i = 0; i < nids.list_size(); i++)
{
bdecode_node nid = nids.list_at(i);
if (nid.type() != bdecode_node::string_t) continue;
if (nid.string_length() < 20) continue;
char const* in = nid.string_ptr();
node_id id(in);
in += id.size();
address addr;
if (nid.string_length() == 24)
addr = detail::read_v4_address(in);
#if TORRENT_USE_IPV6
else if (nid.string_length() == 36)
addr = detail::read_v6_address(in);
#endif
else
continue;
ret.emplace_back(addr, id);
}
return ret;
}
namespace {
entry save_nodes(std::vector<udp::endpoint> const& nodes)
{
entry ret(entry::list_t);
@ -63,8 +91,8 @@ namespace {
void dht_state::clear()
{
nid.clear();
nid6.clear();
nids.clear();
nids.shrink_to_fit();
nodes.clear();
nodes.shrink_to_fit();
@ -78,10 +106,7 @@ namespace {
if (e.type() != bdecode_node::dict_t) return ret;
ret.nid = extract_node_id(e, "node-id");
#if TORRENT_USE_IPV6
ret.nid6 = extract_node_id(e, "node-id6");
#endif
ret.nids = extract_node_ids(e, "node-id");
if (bdecode_node const nodes = e.dict_find_list("nodes"))
ret.nodes = detail::read_endpoint_list<udp::endpoint>(nodes);
@ -89,18 +114,23 @@ namespace {
if (bdecode_node const nodes = e.dict_find_list("nodes6"))
ret.nodes6 = detail::read_endpoint_list<udp::endpoint>(nodes);
#endif
return ret;
}
entry save_dht_state(dht_state const& state)
{
entry ret(entry::dictionary_t);
ret["node-id"] = state.nid.to_string();
auto& nids = ret["node-id"].list();
for (auto const& n : state.nids)
{
std::string nid;
std::copy(n.second.begin(), n.second.end(), std::back_inserter(nid));
detail::write_address(n.first, std::back_inserter(nid));
nids.emplace_back(std::move(nid));
}
entry const nodes = save_nodes(state.nodes);
if (!nodes.list().empty()) ret["nodes"] = nodes;
#if TORRENT_USE_IPV6
ret["node-id6"] = state.nid6.to_string();
entry const nodes6 = save_nodes(state.nodes6);
if (!nodes6.list().empty()) ret["nodes6"] = nodes6;
#endif

View File

@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/aux_/time.hpp>
#include <libtorrent/session_status.hpp>
#include <libtorrent/session_settings.hpp>
#include <libtorrent/broadcast_socket.hpp> // for is_local
#ifndef TORRENT_DISABLE_LOGGING
#include <libtorrent/hex.hpp> // to_hex
@ -91,95 +92,105 @@ namespace libtorrent { namespace dht {
: m_counters(cnt)
, m_storage(storage)
, m_state(std::move(state))
, m_dht(udp::v4(), this, settings, m_state.nid
, observer, cnt, m_nodes, storage)
#if TORRENT_USE_IPV6
, m_dht6(udp::v6(), this, settings, m_state.nid6
, observer, cnt, m_nodes, storage)
#endif
, m_send_fun(send_fun)
, m_log(observer)
, m_key_refresh_timer(ios)
, m_connection_timer(ios)
#if TORRENT_USE_IPV6
, m_connection_timer6(ios)
#endif
, m_refresh_timer(ios)
, m_settings(settings)
, m_abort(false)
, m_running(false)
, m_host_resolver(ios)
, m_send_quota(settings.upload_rate_limit)
, m_last_tick(aux::time_now())
{
m_blocker.set_block_timer(m_settings.block_timeout);
m_blocker.set_rate_limit(m_settings.block_ratelimit);
}
m_nodes.insert(std::make_pair(m_dht.protocol_family_name(), &m_dht));
#if TORRENT_USE_IPV6
m_nodes.insert(std::make_pair(m_dht6.protocol_family_name(), &m_dht6));
#endif
void dht_tracker::update_node_id(aux::session_listen_socket* s)
{
auto n = m_nodes.find(s);
if (n != m_nodes.end())
n->second.dht.update_node_id();
update_storage_node_ids();
}
void dht_tracker::new_socket(aux::session_listen_socket* s)
{
address local_address = s->get_local_endpoint().address();
#if TORRENT_USE_IPV6
// don't try to start dht nodes on non-global IPv6 addresses
// with IPv4 the interface might be behind NAT so we can't skip them based on the scope of the local address
// and we might not have the external address yet
if (local_address.is_v6() && is_local(local_address))
return;
#endif
auto stored_nid = std::find_if(m_state.nids.begin(), m_state.nids.end()
, [&](node_ids_t::value_type const& nid) { return nid.first == local_address; });
node_id const nid = stored_nid != m_state.nids.end() ? stored_nid->second : node_id();
// must use piecewise construction because tracker_node::connection_timer
// is neither copyable nor movable
auto n = m_nodes.emplace(std::piecewise_construct_t(), std::forward_as_tuple(s)
, std::forward_as_tuple(m_key_refresh_timer.get_io_service()
, s, this, m_settings, nid, m_log, m_counters
, std::bind(&dht_tracker::get_node, this, _1, _2)
, m_storage));
#ifndef TORRENT_DISABLE_LOGGING
if (m_log->should_log(dht_logger::tracker))
{
m_log->log(dht_logger::tracker, "starting IPv4 DHT tracker with node id: %s"
, aux::to_hex(m_dht.nid()).c_str());
#if TORRENT_USE_IPV6
m_log->log(dht_logger::tracker, "starting IPv6 DHT tracker with node id: %s"
, aux::to_hex(m_dht6.nid()).c_str());
#endif
m_log->log(dht_logger::tracker, "starting %s DHT tracker with node id: %s"
, local_address.is_v4() ? "IPv4" : "IPv6"
, aux::to_hex(n.first->second.dht.nid()).c_str());
}
#endif
if (m_running && n.second)
{
error_code ec;
n.first->second.connection_timer.expires_from_now(seconds(1), ec);
n.first->second.connection_timer.async_wait(
std::bind(&dht_tracker::connection_timeout, self(), std::ref(n.first->second), _1));
n.first->second.dht.bootstrap(std::vector<udp::endpoint>(), find_data::nodes_callback());
}
}
dht_tracker::~dht_tracker() = default;
void dht_tracker::update_node_id()
void dht_tracker::delete_socket(aux::session_listen_socket* s)
{
m_dht.update_node_id();
#if TORRENT_USE_IPV6
m_dht6.update_node_id();
#endif
update_storage_node_ids();
m_nodes.erase(s);
}
void dht_tracker::start(find_data::nodes_callback const& f)
{
m_running = true;
error_code ec;
refresh_key(ec);
m_connection_timer.expires_from_now(seconds(1), ec);
m_connection_timer.async_wait(
std::bind(&dht_tracker::connection_timeout, self(), std::ref(m_dht), _1));
for (auto& n : m_nodes)
{
n.second.connection_timer.expires_from_now(seconds(1), ec);
n.second.connection_timer.async_wait(
std::bind(&dht_tracker::connection_timeout, self(), std::ref(n.second), _1));
#if TORRENT_USE_IPV6
m_connection_timer6.expires_from_now(seconds(1), ec);
m_connection_timer6.async_wait(
std::bind(&dht_tracker::connection_timeout, self(), std::ref(m_dht6), _1));
if (n.first->get_local_endpoint().protocol() == tcp::v6())
n.second.dht.bootstrap(concat(m_state.nodes6, m_state.nodes), f);
else
#endif
n.second.dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f);
}
m_refresh_timer.expires_from_now(seconds(5), ec);
m_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_timeout, self(), _1));
// bootstrap with mix of IP protocols via want/nodes/nodes6
m_dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f);
#if TORRENT_USE_IPV6
m_dht6.bootstrap(concat(m_state.nodes6, m_state.nodes), f);
#endif
m_state.clear();
}
void dht_tracker::stop()
{
m_abort = true;
m_running = false;
error_code ec;
m_key_refresh_timer.cancel(ec);
m_connection_timer.cancel(ec);
#if TORRENT_USE_IPV6
m_connection_timer6.cancel(ec);
#endif
for (auto& n : m_nodes)
n.second.connection_timer.cancel(ec);
m_refresh_timer.cancel(ec);
m_host_resolver.cancel();
}
@ -196,20 +207,16 @@ namespace libtorrent { namespace dht {
s.active_requests.clear();
s.dht_total_allocations = 0;
m_dht.status(s);
#if TORRENT_USE_IPV6
m_dht6.status(s);
#endif
for (auto& n : m_nodes)
n.second.dht.status(s);
}
#endif
void dht_tracker::dht_status(std::vector<dht_routing_bucket>& table
, std::vector<dht_lookup>& requests)
{
m_dht.status(table, requests);
#if TORRENT_USE_IPV6
m_dht6.status(table, requests);
#endif
for (auto& n : m_nodes)
n.second.dht.status(table, requests);
}
void dht_tracker::update_stats_counters(counters& c) const
@ -224,35 +231,27 @@ namespace libtorrent { namespace dht {
c.set_value(counters::dht_node_cache, 0);
c.set_value(counters::dht_allocated_observers, 0);
add_dht_counters(m_dht, c);
#if TORRENT_USE_IPV6
add_dht_counters(m_dht6, c);
#endif
for (auto& n : m_nodes)
add_dht_counters(n.second.dht, c);
}
void dht_tracker::connection_timeout(node& n, error_code const& e)
void dht_tracker::connection_timeout(tracker_node& n, error_code const& e)
{
if (e || m_abort) return;
if (e || !m_running) return;
time_duration d = n.connection_timeout();
time_duration d = n.dht.connection_timeout();
error_code ec;
#if TORRENT_USE_IPV6
deadline_timer& timer = n.protocol() == udp::v4() ? m_connection_timer : m_connection_timer6;
#else
deadline_timer& timer = m_connection_timer;
#endif
deadline_timer& timer = n.connection_timer;
timer.expires_from_now(d, ec);
timer.async_wait(std::bind(&dht_tracker::connection_timeout, self(), std::ref(n), _1));
}
void dht_tracker::refresh_timeout(error_code const& e)
{
if (e || m_abort) return;
if (e || !m_running) return;
m_dht.tick();
#if TORRENT_USE_IPV6
m_dht6.tick();
#endif
for (auto& n : m_nodes)
n.second.dht.tick();
// periodically update the DOS blocker's settings from the dht_settings
m_blocker.set_block_timer(m_settings.block_timeout);
@ -266,16 +265,15 @@ namespace libtorrent { namespace dht {
void dht_tracker::refresh_key(error_code const& e)
{
if (e || m_abort) return;
if (e || !m_running) return;
error_code ec;
m_key_refresh_timer.expires_from_now(key_refresh, ec);
m_key_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_key, self(), _1));
m_dht.new_write_key();
#if TORRENT_USE_IPV6
m_dht6.new_write_key();
#endif
for (auto& n : m_nodes)
n.second.dht.new_write_key();
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "*** new write key***");
#endif
@ -284,30 +282,37 @@ namespace libtorrent { namespace dht {
void dht_tracker::update_storage_node_ids()
{
std::vector<sha1_hash> ids;
ids.push_back(m_dht.nid());
#if TORRENT_USE_IPV6
ids.push_back(m_dht6.nid());
#endif
for (auto& n : m_nodes)
ids.push_back(n.second.dht.nid());
m_storage.update_node_ids(ids);
}
node* dht_tracker::get_node(node_id const& id, std::string const& family_name)
{
TORRENT_UNUSED(id);
for (auto& n : m_nodes)
{
// TODO: pick the closest node rather than the first
if (n.second.dht.protocol_family_name() == family_name)
return &n.second.dht;
}
return nullptr;
}
void dht_tracker::get_peers(sha1_hash const& ih
, std::function<void(std::vector<tcp::endpoint> const&)> f)
{
std::function<void(std::vector<std::pair<node_entry, std::string>> const&)> empty;
m_dht.get_peers(ih, f, empty, false);
#if TORRENT_USE_IPV6
m_dht6.get_peers(ih, f, empty, false);
#endif
for (auto& n : m_nodes)
n.second.dht.get_peers(ih, f, empty, false);
}
void dht_tracker::announce(sha1_hash const& ih, int listen_port, int flags
, std::function<void(std::vector<tcp::endpoint> const&)> f)
{
m_dht.announce(ih, listen_port, flags, f);
#if TORRENT_USE_IPV6
m_dht6.announce(ih, listen_port, flags, f);
#endif
for (auto& n : m_nodes)
n.second.dht.announce(ih, listen_port, flags, f);
}
namespace {
@ -393,11 +398,9 @@ namespace libtorrent { namespace dht {
void dht_tracker::get_item(sha1_hash const& target
, std::function<void(item const&)> cb)
{
auto ctx = std::make_shared<get_immutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb));
#endif
auto ctx = std::make_shared<get_immutable_item_ctx>(int(m_nodes.size()));
for (auto& n : m_nodes)
n.second.dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb));
}
// key is a 32-byte binary string, the public key to look up.
@ -406,11 +409,9 @@ namespace libtorrent { namespace dht {
, std::function<void(item const&, bool)> cb
, std::string salt)
{
auto ctx = std::make_shared<get_mutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#endif
auto ctx = std::make_shared<get_mutable_item_ctx>(int(m_nodes.size()));
for (auto& n : m_nodes)
n.second.dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
}
void dht_tracker::put_item(entry const& data
@ -420,37 +421,32 @@ namespace libtorrent { namespace dht {
bencode(std::back_inserter(flat_data), data);
sha1_hash const target = item_target_id(flat_data);
auto ctx = std::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(target, data, std::bind(&put_immutable_item_callback
auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size()));
for (auto& n : m_nodes)
n.second.dht.put_item(target, data, std::bind(&put_immutable_item_callback
, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.put_item(target, data, std::bind(&put_immutable_item_callback
, _1, ctx, cb));
#endif
}
void dht_tracker::put_item(public_key const& key
, std::function<void(item const&, int)> cb
, std::function<void(item&)> data_cb, std::string salt)
{
auto ctx = std::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(key, salt, std::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#if TORRENT_USE_IPV6
m_dht6.put_item(key, salt, std::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#endif
auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size()));
for (auto& n : m_nodes)
n.second.dht.put_item(key, salt, std::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
}
void dht_tracker::direct_request(udp::endpoint const& ep, entry& e
, std::function<void(msg const&)> f)
{
#if TORRENT_USE_IPV6
if (ep.protocol() == udp::v6())
m_dht6.direct_request(ep, e, f);
else
#endif
m_dht.direct_request(ep, e, f);
for (auto& n : m_nodes)
{
if (ep.protocol() != (n.first->get_external_address().is_v4() ? udp::v4() : udp::v6()))
continue;
n.second.dht.direct_request(ep, e, f);
break;
}
}
void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep)
@ -466,10 +462,8 @@ namespace libtorrent { namespace dht {
#endif
)
{
m_dht.unreachable(ep);
#if TORRENT_USE_IPV6
m_dht6.unreachable(ep);
#endif
for (auto& n : m_nodes)
n.second.dht.unreachable(ep);
}
}
@ -538,29 +532,34 @@ namespace libtorrent { namespace dht {
#endif
libtorrent::dht::msg m(m_msg, ep);
m_dht.incoming(m);
#if TORRENT_USE_IPV6
m_dht6.incoming(m);
#endif
for (auto& n : m_nodes)
n.second.dht.incoming(m);
return true;
}
dht_tracker::tracker_node::tracker_node(io_service& ios
, aux::session_listen_socket* s, socket_manager* sock
, libtorrent::dht_settings const& settings
, node_id const& nid
, dht_observer* observer, counters& cnt
, get_foreign_node_t get_foreign_node
, dht_storage_interface& storage)
: dht(s, sock, settings, nid, observer, cnt, get_foreign_node, storage)
, connection_timer(ios)
{}
std::vector<std::pair<node_id, udp::endpoint>> dht_tracker::live_nodes(node_id const& nid)
{
std::vector<std::pair<node_id, udp::endpoint>> ret;
// TODO: figure out a better solution when multi-home is implemented
node const* dht = m_dht.nid() == nid ? &m_dht
#if TORRENT_USE_IPV6
: m_dht6.nid() == nid ? &m_dht6 : nullptr;
#else
: nullptr;
#endif
auto n = std::find_if(m_nodes.begin(), m_nodes.end()
, [&](tracker_nodes_t::value_type const& v) { return v.second.dht.nid() == nid; });
if (dht == nullptr) return ret;
dht->m_table.for_each_node([&ret](node_entry const& e)
{ ret.emplace_back(e.id, e.endpoint); }, nullptr);
if (n != m_nodes.end())
{
n->second.dht.m_table.for_each_node([&ret](node_entry const& e)
{ ret.emplace_back(e.id, e.endpoint); }, nullptr);
}
return ret;
}
@ -582,29 +581,27 @@ namespace libtorrent { namespace dht {
dht_state dht_tracker::state() const
{
dht_state ret;
ret.nid = m_dht.nid();
ret.nodes = save_nodes(m_dht);
#if TORRENT_USE_IPV6
ret.nid6 = m_dht6.nid();
ret.nodes6 = save_nodes(m_dht6);
#endif
for (auto& n : m_nodes)
{
// use the local rather than external address because if the user is behind NAT
// we won't know the external IP on startup
ret.nids.push_back(std::make_pair(n.first->get_local_endpoint().address(), n.second.dht.nid()));
auto nodes = save_nodes(n.second.dht);
ret.nodes.insert(ret.nodes.end(), nodes.begin(), nodes.end());
}
return ret;
}
void dht_tracker::add_node(udp::endpoint const& node)
{
m_dht.add_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_node(node);
#endif
for (auto& n : m_nodes)
n.second.dht.add_node(node);
}
void dht_tracker::add_router_node(udp::endpoint const& node)
{
m_dht.add_router_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_router_node(node);
#endif
for (auto& n : m_nodes)
n.second.dht.add_router_node(node);
}
bool dht_tracker::has_quota()
@ -624,7 +621,7 @@ namespace libtorrent { namespace dht {
return m_send_quota > 0;
}
bool dht_tracker::send_packet(entry& e, udp::endpoint const& addr)
bool dht_tracker::send_packet(aux::session_listen_socket* s, entry& e, udp::endpoint const& addr)
{
static char const version_str[] = {'L', 'T'
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR};
@ -639,7 +636,25 @@ namespace libtorrent { namespace dht {
m_send_quota -= int(m_send_buf.size());
error_code ec;
m_send_fun(addr, m_send_buf, ec, 0);
if (s->get_local_endpoint().protocol().family() != addr.protocol().family())
{
// the node is trying to send a packet to a different address family
// than its socket, this can happen during bootstrap
// pick a node with the right address family and use its socket
auto n = std::find_if(m_nodes.begin(), m_nodes.end()
, [&](tracker_nodes_t::value_type const& v)
{ return v.first->get_local_endpoint().protocol().family() == addr.protocol().family(); });
if (n != m_nodes.end())
m_send_fun(n->first, addr, m_send_buf, ec, 0);
else
ec = boost::asio::error::address_family_not_supported;
}
else
{
m_send_fun(s, addr, m_send_buf, ec, 0);
}
if (ec)
{
m_counters.inc_stats_counter(counters::dht_messages_out_dropped);

View File

@ -48,6 +48,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/hasher.hpp"
#include "libtorrent/random.hpp"
#include <libtorrent/assert.hpp>
#include "libtorrent/aux_/session_listen_socket.hpp"
#include <libtorrent/aux_/time.hpp>
#include "libtorrent/aux_/throw.hpp"
#include "libtorrent/alert_types.hpp" // for dht_lookup
@ -72,14 +73,14 @@ namespace {
void nop() {}
node_id calculate_node_id(node_id const& nid, dht_observer* observer, udp protocol)
node_id calculate_node_id(node_id const& nid, aux::session_listen_socket* sock)
{
address external_address;
if (observer != nullptr) external_address = observer->external_address(protocol);
external_address = sock->get_external_address();
// if we don't have an observer, don't pretend that external_address is valid
// generating an ID based on 0.0.0.0 would be terrible. random is better
if (observer == nullptr || external_address.is_unspecified())
if (external_address.is_unspecified())
{
return generate_random_id();
}
@ -101,22 +102,23 @@ void incoming_error(entry& e, char const* msg, int error_code = 203)
} // anonymous namespace
node::node(udp proto, udp_socket_interface* sock
node::node(aux::session_listen_socket* sock, socket_manager* sock_man
, dht_settings const& settings
, node_id const& nid
, dht_observer* observer
, counters& cnt
, std::map<std::string, node*> const& nodes
, get_foreign_node_t get_foreign_node
, dht_storage_interface& storage)
: m_settings(settings)
, m_id(calculate_node_id(nid, observer, proto))
, m_table(m_id, proto, 8, settings, observer)
, m_rpc(m_id, m_settings, m_table, sock, observer)
, m_nodes(nodes)
, m_id(calculate_node_id(nid, sock))
, m_table(m_id, sock->get_local_endpoint().protocol() == tcp::v4() ? udp::v4() : udp::v6(), 8, settings, observer)
, m_rpc(m_id, m_settings, m_table, sock, sock_man, observer)
, m_get_foreign_node(get_foreign_node)
, m_observer(observer)
, m_protocol(map_protocol_to_descriptor(proto))
, m_protocol(map_protocol_to_descriptor(sock->get_local_endpoint().protocol() == tcp::v4() ? udp::v4() : udp::v6()))
, m_last_tracker_tick(aux::time_now())
, m_last_self_refresh(min_time())
, m_sock_man(sock_man)
, m_sock(sock)
, m_counters(cnt)
, m_storage(storage)
@ -136,7 +138,7 @@ void node::update_node_id()
// it's possible that our external address hasn't actually changed. If our
// current ID is still valid, don't do anything.
if (verify_id(m_id, m_observer->external_address(protocol())))
if (verify_id(m_id, m_sock->get_external_address()))
return;
#ifndef TORRENT_DISABLE_LOGGING
@ -144,7 +146,7 @@ void node::update_node_id()
, "updating node ID (because external IP address changed)");
#endif
m_id = generate_id(m_observer->external_address(protocol()));
m_id = generate_id(m_sock->get_external_address());
m_table.update_node_id(m_id);
m_rpc.update_node_id(m_id);
@ -285,7 +287,7 @@ void node::incoming(msg const& m)
address_v6::bytes_type b;
std::memcpy(&b[0], ext_ip.string_ptr(), 16);
if (m_observer != nullptr)
m_observer->set_external_address(address_v6(b)
m_observer->set_external_address(m_sock, address_v6(b)
, m.addr.address());
} else
#endif
@ -294,7 +296,7 @@ void node::incoming(msg const& m)
address_v4::bytes_type b;
std::memcpy(&b[0], ext_ip.string_ptr(), 4);
if (m_observer != nullptr)
m_observer->set_external_address(address_v4(b)
m_observer->set_external_address(m_sock, address_v4(b)
, m.addr.address());
}
@ -315,7 +317,7 @@ void node::incoming(msg const& m)
if (!native_address(m.addr)) break;
if (!m_sock->has_quota())
if (!m_sock_man->has_quota())
{
m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
return;
@ -323,7 +325,7 @@ void node::incoming(msg const& m)
entry e;
incoming_request(m, e);
m_sock->send_packet(e, m.addr);
m_sock_man->send_packet(m_sock, e, m.addr);
break;
}
case 'e':
@ -1164,11 +1166,11 @@ void node::write_nodes_entries(sha1_hash const& info_hash
bdecode_node wanted = want.list_at(i);
if (wanted.type() != bdecode_node::string_t)
continue;
auto wanted_node = m_nodes.find(wanted.string_value().to_string());
if (wanted_node == m_nodes.end()) continue;
node* wanted_node = m_get_foreign_node(info_hash, wanted.string_value().to_string());
if (!wanted_node) continue;
std::vector<node_entry> n;
wanted_node->second->m_table.find_node(info_hash, n, 0);
r[wanted_node->second->protocol_nodes_key()] = write_nodes_entry(n);
wanted_node->m_table.find_node(info_hash, n, 0);
r[wanted_node->protocol_nodes_key()] = write_nodes_entry(n);
}
}

View File

@ -148,10 +148,13 @@ using observer_storage = aux::aligned_union<1
rpc_manager::rpc_manager(node_id const& our_id
, dht_settings const& settings
, routing_table& table, udp_socket_interface* sock
, routing_table& table
, aux::session_listen_socket* sock
, socket_manager* sock_man
, dht_logger* log)
: m_pool_allocator(sizeof(observer_storage), 10)
, m_sock(sock)
, m_sock_man(sock_man)
#ifndef TORRENT_DISABLE_LOGGING
, m_log(log)
#endif
@ -480,7 +483,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint const& target_addr
}
#endif
if (m_sock->send_packet(e, target_addr))
if (m_sock_man->send_packet(m_sock, e, target_addr))
{
m_transactions.insert(std::make_pair(tid, o));
#if TORRENT_USE_ASSERTS

View File

@ -760,12 +760,14 @@ namespace {
unsigned short session_handle::listen_port() const
{
return sync_call_ret<unsigned short>(&session_impl::listen_port);
return sync_call_ret<unsigned short, unsigned short(session_impl::*)() const>
(&session_impl::listen_port);
}
unsigned short session_handle::ssl_listen_port() const
{
return sync_call_ret<unsigned short>(&session_impl::ssl_listen_port);
return sync_call_ret<unsigned short, unsigned short(session_impl::*)() const>
(&session_impl::ssl_listen_port);
}
bool session_handle::is_listening() const

View File

@ -939,7 +939,7 @@ namespace aux {
// the uTP connections cannot be closed gracefully
if (l.udp_sock)
{
l.udp_sock->close();
l.udp_sock->sock.close();
}
}
@ -1160,6 +1160,7 @@ namespace {
if (req.ssl_ctx) req.listen_port = ssl_listen_port();
req.ssl_ctx = &m_ssl_ctx;
#endif
#if TORRENT_USE_I2P
if (!m_settings.get_str(settings_pack::i2p_hostname).empty())
{
@ -1168,7 +1169,6 @@ namespace {
#endif
//TODO: should there be an option to announce once per listen interface?
m_tracker_manager.queue_request(get_io_service(), req, c);
}
@ -1634,8 +1634,8 @@ namespace {
: socket_type_t::udp;
udp::endpoint const udp_bind_ep(bind_ep.address(), bind_ep.port());
ret.udp_sock = std::make_shared<udp_socket>(m_io_service);
ret.udp_sock->open(udp_bind_ep.protocol(), ec);
ret.udp_sock = std::make_shared<session_udp_socket>(m_io_service);
ret.udp_sock->sock.open(udp_bind_ep.protocol(), ec);
if (ec)
{
#ifndef TORRENT_DISABLE_LOGGING
@ -1657,7 +1657,7 @@ namespace {
#if TORRENT_HAS_BINDTODEVICE
if (!device.empty())
{
ret.udp_sock->set_option(bind_to_device(device.c_str()), ec);
ret.udp_sock->sock.set_option(bind_to_device(device.c_str()), ec);
if (ec)
{
#ifndef TORRENT_DISABLE_LOGGING
@ -1678,7 +1678,7 @@ namespace {
}
}
#endif
ret.udp_sock->bind(udp_bind_ep, ec);
ret.udp_sock->sock.bind(udp_bind_ep, ec);
last_op = listen_failed_alert::bind;
if (ec)
@ -1697,27 +1697,27 @@ namespace {
return ret;
}
ret.udp_external_port = ret.udp_sock->local_port();
ret.udp_external_port = ret.udp_sock->sock.local_port();
error_code err;
set_socket_buffer_size(*ret.udp_sock, m_settings, err);
set_socket_buffer_size(ret.udp_sock->sock, m_settings, err);
if (err)
{
if (m_alerts.should_post<udp_error_alert>())
m_alerts.emplace_alert<udp_error_alert>(ret.udp_sock->local_endpoint(ec), err);
m_alerts.emplace_alert<udp_error_alert>(ret.udp_sock->sock.local_endpoint(ec), err);
}
ret.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy));
ret.udp_sock->sock.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy));
// this call is necessary here because, unless the settings actually
// change after the session is up and listening, at no other point
// set_proxy_settings is called with the correct proxy configuration,
// internally, this method handle the SOCKS5's connection logic
ret.udp_sock->set_proxy_settings(proxy());
ret.udp_sock->sock.set_proxy_settings(proxy());
// TODO: 2 use a handler allocator here
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet");
ret.udp_sock->async_read(std::bind(&session_impl::on_udp_packet
, this, std::weak_ptr<udp_socket>(ret.udp_sock), ret.ssl, _1));
ret.udp_sock->sock.async_read(std::bind(&session_impl::on_udp_packet
, this, ret.udp_sock, ret.ssl, _1));
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
@ -1763,6 +1763,55 @@ namespace {
reopen_listen_sockets();
}
void session_impl::interface_to_endpoints(std::string const& device, int const port
, bool const ssl, std::vector<listen_endpoint_t>& eps)
{
// First, check to see if it's an IP address
error_code err;
address const adr = address::from_string(device.c_str(), err);
if (!err)
{
#if !TORRENT_USE_IPV6
if (adr.is_v4())
#endif
eps.emplace_back(adr, port, std::string(), ssl);
}
else
{
// this is the case where device names a network device. We need to
// enumerate all IPs associated with this device
// TODO: 3 only run this once in the caller
std::vector<ip_interface> const ifs = enum_net_interfaces(m_io_service, err);
if (err)
{
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log("failed to enumerate IPs on device: \"%s\": %s"
, device.c_str(), err.message().c_str());
}
#endif
if (m_alerts.should_post<listen_failed_alert>())
{
m_alerts.emplace_alert<listen_failed_alert>(device
, listen_failed_alert::enum_if, err
, socket_type_t::tcp);
}
return;
}
for (auto const& ipface : ifs)
{
// we're looking for a specific interface, and its address
// (which must be of the same family as the address we're
// connecting to)
if (device != ipface.name) continue;
eps.emplace_back(ipface.interface_address, port, device, ssl);
}
}
}
void session_impl::reopen_listen_sockets()
{
#ifndef TORRENT_DISABLE_LOGGING
@ -1813,55 +1862,18 @@ namespace {
// IP address or a device name. In case it's a device name, we want to
// (potentially) end up binding a socket for each IP address associated
// with that device.
// First, check to see if it's an IP address
error_code err;
address const adr = address::from_string(device.c_str(), err);
if (!err)
{
eps.emplace_back(adr, port, std::string(), ssl);
}
else
{
// this is the case where device names a network device. We need to
// enumerate all IPs associated with this device
// TODO: 3 only run this once, not every turn through the loop
std::vector<ip_interface> const ifs = enum_net_interfaces(m_io_service, ec);
if (ec)
{
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log("failed to enumerate IPs on device: \"%s\": %s"
, device.c_str(), ec.message().c_str());
}
#endif
if (m_alerts.should_post<listen_failed_alert>())
{
m_alerts.emplace_alert<listen_failed_alert>(device
, listen_failed_alert::enum_if, ec
, socket_type_t::tcp);
}
continue;
}
for (auto const& ipface : ifs)
{
// we're looking for a specific interface, and its address
// (which must be of the same family as the address we're
// connecting to)
if (device != ipface.name) continue;
eps.emplace_back(ipface.interface_address, port, device, ssl);
}
}
interface_to_endpoints(device, port, ssl, eps);
}
auto remove_iter = partition_listen_sockets(eps, m_listen_sockets);
while (remove_iter != m_listen_sockets.end())
{
// TODO notify interested parties of this socket's demise
#ifndef TORRENT_DISABLE_DHT
if (m_dht)
m_dht->delete_socket(&*remove_iter);
#endif
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
@ -1871,7 +1883,7 @@ namespace {
}
#endif
if (remove_iter->sock) remove_iter->sock->close(ec);
if (remove_iter->udp_sock) remove_iter->udp_sock->close();
if (remove_iter->udp_sock) remove_iter->udp_sock->sock.close();
remove_iter = m_listen_sockets.erase(remove_iter);
}
@ -1885,8 +1897,12 @@ namespace {
if (!ec && (s.sock || s.udp_sock))
{
// TODO notify interested parties of this socket's creation
m_listen_sockets.push_back(s);
#ifndef TORRENT_DISABLE_DHT
if (m_dht)
m_dht->new_socket(&m_listen_sockets.back());
#endif
}
}
@ -1923,8 +1939,8 @@ namespace {
if (l.udp_sock)
{
udp::endpoint const udp_ep = l.udp_sock->local_endpoint(err);
if (!err && l.udp_sock->is_open())
udp::endpoint const udp_ep = l.udp_sock->sock.local_endpoint(err);
if (!err && l.udp_sock->sock.is_open())
{
socket_type_t const socket_type
= l.ssl
@ -1975,7 +1991,7 @@ namespace {
, listen_socket_t& s)
{
tcp::endpoint const tcp_ep = s.sock ? s.sock->local_endpoint() : tcp::endpoint();
udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->local_endpoint() : udp::endpoint();
udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->sock.local_endpoint() : udp::endpoint();
if ((mask & remap_natpmp) && m_natpmp)
{
@ -2092,29 +2108,39 @@ namespace {
, int const flags)
{
// for now, just pick the first socket with a matching address family
// TODO: 3 for proper multi-homed support, we may want to do something
// else here. Probably let the caller decide which interface to send over
// TODO: remove this function once all callers are updated to specify a socket
for (auto& i : m_listen_sockets)
{
if (!i.udp_sock) continue;
if (i.ssl) continue;
i.udp_sock->send_hostname(hostname, port, p, ec, flags);
if ((ec == error::would_block
|| ec == error::try_again)
&& !i.udp_write_blocked)
{
i.udp_write_blocked = true;
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
i.udp_sock->async_write(std::bind(&session_impl::on_udp_writeable
, this, std::weak_ptr<udp_socket>(i.udp_sock), _1));
}
send_udp_packet_hostname_listen(&i, hostname, port, p, ec, flags);
return;
}
ec = boost::asio::error::operation_not_supported;
}
void session_impl::send_udp_packet_hostname_listen(aux::session_listen_socket* sock
, char const* hostname
, int const port
, span<char const> p
, error_code& ec
, int const flags)
{
auto s = static_cast<listen_socket_t*>(sock)->udp_sock;
s->sock.send_hostname(hostname, port, p, ec, flags);
if ((ec == error::would_block || ec == error::try_again)
&& !s->write_blocked)
{
s->write_blocked = true;
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
s->sock.async_write(std::bind(&session_impl::on_udp_writeable
, this, s, _1));
}
}
void session_impl::send_udp_packet(bool const ssl
, udp::endpoint const& ep
, span<char const> p
@ -2131,42 +2157,55 @@ namespace {
if (i.local_endpoint.address().is_v4() != ep.address().is_v4())
continue;
i.udp_sock->send(ep, p, ec, flags);
if ((ec == error::would_block
|| ec == error::try_again)
&& !i.udp_write_blocked)
{
i.udp_write_blocked = true;
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
i.udp_sock->async_write(std::bind(&session_impl::on_udp_writeable
, this, std::weak_ptr<udp_socket>(i.udp_sock), _1));
}
send_udp_packet_listen(&i, ep, p, ec, flags);
return;
}
ec = boost::asio::error::operation_not_supported;
}
void session_impl::on_udp_writeable(std::weak_ptr<udp_socket> s, error_code const& ec)
void session_impl::send_udp_packet_listen(aux::session_listen_socket* sock
, udp::endpoint const& ep
, span<char const> p
, error_code& ec
, int const flags)
{
auto s = static_cast<listen_socket_t*>(sock)->udp_sock;
TORRENT_ASSERT(s->sock.local_endpoint().protocol() == ep.protocol());
s->sock.send(ep, p, ec, flags);
if ((ec == error::would_block
|| ec == error::try_again)
&& !s->write_blocked)
{
s->write_blocked = true;
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
s->sock.async_write(std::bind(&session_impl::on_udp_writeable
, this, s, _1));
}
}
void session_impl::on_udp_writeable(std::weak_ptr<session_udp_socket> sock, error_code const& ec)
{
COMPLETE_ASYNC("session_impl::on_udp_writeable");
if (ec) return;
std::shared_ptr<udp_socket> sock = s.lock();
if (!sock) return;
auto s = sock.lock();
if (!s) return;
s->write_blocked = false;
#ifdef TORRENT_USE_OPENSSL
std::list<listen_socket_t>::iterator i = std::find_if(
m_listen_sockets.begin(), m_listen_sockets.end()
, [&sock] (listen_socket_t const& ls) { return ls.udp_sock == sock; });
if (i == m_listen_sockets.end()) return;
i->udp_write_blocked = false;
, [&s] (listen_socket_t const& ls) { return ls.udp_sock == s; });
#endif
// notify the utp socket manager it can start sending on the socket again
struct utp_socket_manager& mgr =
#ifdef TORRENT_USE_OPENSSL
i->ssl ? m_ssl_utp_socket_manager :
(i != m_listen_sockets.end() && i->ssl) ? m_ssl_utp_socket_manager :
#endif
m_utp_socket_manager;
@ -2174,13 +2213,13 @@ namespace {
}
void session_impl::on_udp_packet(std::weak_ptr<udp_socket> const& socket
void session_impl::on_udp_packet(std::weak_ptr<session_udp_socket> socket
, bool const ssl, error_code const& ec)
{
COMPLETE_ASYNC("session_impl::on_udp_packet");
if (ec)
{
std::shared_ptr<udp_socket> s = socket.lock();
std::shared_ptr<session_udp_socket> s = socket.lock();
udp::endpoint ep;
if (s) ep = s->local_endpoint();
@ -2204,7 +2243,7 @@ namespace {
m_stats_counters.inc_stats_counter(counters::on_udp_counter);
std::shared_ptr<udp_socket> s = socket.lock();
std::shared_ptr<session_udp_socket> s = socket.lock();
if (!s) return;
struct utp_socket_manager& mgr =
@ -2217,7 +2256,7 @@ namespace {
{
aux::array<udp_socket::packet, 50> p;
error_code err;
int const num_packets = s->read(p, err);
int const num_packets = s->sock.read(p, err);
for (int i = 0; i < num_packets; ++i)
{
@ -2316,7 +2355,7 @@ namespace {
mgr.socket_drained();
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet");
s->async_read(std::bind(&session_impl::on_udp_packet
s->sock.async_read(std::bind(&session_impl::on_udp_packet
, this, socket, ssl, _1));
}
@ -5057,9 +5096,7 @@ namespace {
void session_impl::update_proxy()
{
for (auto& i : m_listen_sockets)
{
i.udp_sock->set_proxy_settings(proxy());
}
i.udp_sock->sock.set_proxy_settings(proxy());
}
void session_impl::update_upnp()
@ -5132,20 +5169,33 @@ namespace {
}
std::uint16_t session_impl::listen_port() const
{
return listen_port(nullptr);
}
std::uint16_t session_impl::listen_port(listen_socket_t* sock) const
{
// if not, don't tell the tracker anything if we're in force_proxy
// mode. We don't want to leak our listen port since it can
// potentially identify us if it is leaked elsewhere
if (m_settings.get_bool(settings_pack::force_proxy)) return 0;
if (m_listen_sockets.empty()) return 0;
if (sock) return std::uint16_t(sock->tcp_external_port);
return std::uint16_t(m_listen_sockets.front().tcp_external_port);
}
// TODO: 2 this function should be removed and users need to deal with the
// more generic case of having multiple ssl ports
std::uint16_t session_impl::ssl_listen_port() const
{
return ssl_listen_port(nullptr);
}
std::uint16_t session_impl::ssl_listen_port(listen_socket_t* sock) const
{
#ifdef TORRENT_USE_OPENSSL
if (sock) return std::uint16_t(sock->tcp_external_port);
// if not, don't tell the tracker anything if we're in force_proxy
// mode. We don't want to leak our listen port since it can
// potentially identify us if it is leaked elsewhere
@ -5154,6 +5204,8 @@ namespace {
{
if (s.ssl) return std::uint16_t(s.tcp_external_port);
}
#else
TORRENT_UNUSED(sock);
#endif
return 0;
}
@ -5423,12 +5475,20 @@ namespace {
m_dht = std::make_shared<dht::dht_tracker>(
static_cast<dht::dht_observer*>(this)
, m_io_service
, std::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4)
, [=](aux::session_listen_socket* sock
, udp::endpoint const& ep
, span<char const> p
, error_code& ec
, int flags)
{ send_udp_packet_listen(sock, ep, p, ec, flags); }
, m_dht_settings
, m_stats_counters
, *m_dht_storage
, std::move(m_dht_state));
for (auto& s : m_listen_sockets)
m_dht->new_socket(&s);
for (auto const& n : m_dht_router_nodes)
{
m_dht->add_router_node(n);
@ -5447,6 +5507,7 @@ namespace {
if (m_alerts.should_post<dht_bootstrap_alert>())
m_alerts.emplace_alert<dht_bootstrap_alert>();
};
m_dht->start(cb);
}
@ -5861,17 +5922,18 @@ namespace {
}
#endif
}
if (l.udp_sock)
{
error_code ec;
set_tos(*l.udp_sock, tos, ec);
set_tos(l.udp_sock->sock, tos, ec);
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log(">>> SET_TOS [ udp (%s %d) tos: %x e: %s ]"
, l.udp_sock->local_endpoint().address().to_string().c_str()
, l.udp_sock->local_port()
, l.udp_sock->sock.local_endpoint().address().to_string().c_str()
, l.udp_sock->sock.local_port()
, tos, ec.message().c_str());
}
#endif
@ -6019,14 +6081,14 @@ namespace {
for (auto const& l : m_listen_sockets)
{
error_code ec;
set_socket_buffer_size(*l.udp_sock, m_settings, ec);
set_socket_buffer_size(l.udp_sock->sock, m_settings, ec);
#ifndef TORRENT_DISABLE_LOGGING
if (ec && should_log())
{
error_code err;
session_log("socket buffer size [ udp %s %d]: (%d) %s"
, l.udp_sock->local_endpoint().address().to_string(err).c_str()
, l.udp_sock->local_port(), ec.value(), ec.message().c_str());
, l.udp_sock->sock.local_endpoint().address().to_string(err).c_str()
, l.udp_sock->sock.local_port(), ec.value(), ec.message().c_str());
}
#endif
ec.clear();
@ -6091,7 +6153,7 @@ namespace {
{
for (auto& i : m_listen_sockets)
{
i.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy));
i.udp_sock->sock.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy));
// close the TCP listen sockets
if (i.sock)
@ -6426,28 +6488,11 @@ namespace {
}
// this is the DHT observer version. DHT is the implied source
void session_impl::set_external_address(address const& ip
void session_impl::set_external_address(aux::session_listen_socket* iface, address const& ip
, address const& source)
{
set_external_address(ip, source_dht, source);
}
// TODO 3 pass in a specific listen socket rather than an address family
address session_impl::external_address(udp proto)
{
#if !TORRENT_USE_IPV6
TORRENT_UNUSED(proto);
#endif
address addr;
#if TORRENT_USE_IPV6
if (proto == udp::v6())
addr = address_v6();
else
#endif
addr = address_v4();
addr = external_address().external_address(addr);
return addr;
TORRENT_ASSERT(iface);
set_external_address(*static_cast<listen_socket_t*>(iface), ip, source_dht, source);
}
void session_impl::get_peers(sha1_hash const& ih)
@ -6547,6 +6592,32 @@ namespace {
void session_impl::set_external_address(address const& ip
, int const source_type, address const& source)
{
// for now, just pick the first socket with a matching address family
// TODO: remove this function once all callers are updated to specify a listen socket
for (auto& i : m_listen_sockets)
{
if (i.local_endpoint.address().is_v4() != ip.is_v4())
continue;
set_external_address(i, ip, source_type, source);
break;
}
}
void session_impl::set_external_address(
tcp::endpoint const& local_endpoint, address const& ip
, int const source_type, address const& source)
{
auto sock = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
, [&](listen_socket_t const& v) { return v.local_endpoint == local_endpoint; });
if (sock != m_listen_sockets.end())
set_external_address(*sock, ip, source_type, source);
}
void session_impl::set_external_address(listen_socket_t& sock
, address const& ip, int const source_type, address const& source)
{
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
@ -6555,16 +6626,7 @@ namespace {
}
#endif
// for now, just pick the first socket with a matching address family
// TODO: 3 allow the caller to select which listen socket to update
for (auto& i : m_listen_sockets)
{
if (i.local_endpoint.address().is_v4() != ip.is_v4())
continue;
if (!i.external_address.cast_vote(ip, source_type, source)) return;
break;
}
if (!sock.external_address.cast_vote(ip, source_type, source)) return;
#ifndef TORRENT_DISABLE_LOGGING
session_log(" external IP updated");
@ -6582,7 +6644,7 @@ namespace {
// restart the DHT with a new node ID
#ifndef TORRENT_DISABLE_DHT
if (m_dht) m_dht->update_node_id();
if (m_dht) m_dht->update_node_id(&sock);
#endif
}

View File

@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/bloom_filter.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/aux_/time.hpp"
#include "libtorrent/aux_/session_listen_socket.hpp"
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/kademlia/routing_table.hpp"
@ -108,10 +109,10 @@ void nop_node() {}
std::list<std::pair<udp::endpoint, entry>> g_sent_packets;
struct mock_socket final : udp_socket_interface
struct mock_socket final : socket_manager
{
bool has_quota() override { return true; }
bool send_packet(entry& msg, udp::endpoint const& ep) override
bool send_packet(aux::session_listen_socket* s, entry& msg, udp::endpoint const& ep) override
{
// TODO: 3 ideally the mock_socket would contain this queue of packets, to
// make tests independent
@ -120,6 +121,32 @@ struct mock_socket final : udp_socket_interface
}
};
struct mock_dht_socket final : aux::session_listen_socket
{
mock_dht_socket() : m_external_address(addr4("236.0.0.1")), m_local_endpoint(addr4("192.168.4.1"), 6881) {}
explicit mock_dht_socket(address ep) : m_external_address(ep), m_local_endpoint(ep, 6881) {}
address get_external_address() override { return m_external_address; }
tcp::endpoint get_local_endpoint() override { return m_local_endpoint; }
address m_external_address;
tcp::endpoint m_local_endpoint;
};
struct mock_dht_socket6 final : aux::session_listen_socket
{
address get_external_address() override { return m_external_address; }
tcp::endpoint get_local_endpoint() override { return m_local_endpoint; }
address m_external_address = addr6("2002::1");
tcp::endpoint m_local_endpoint = tcp::endpoint(addr6("2002::1"), 6881);
};
node* get_foreign_node_stub(node_id const&, std::string const&)
{
return nullptr;
}
sha1_hash generate_next()
{
sha1_hash ret;
@ -468,16 +495,12 @@ void put_immutable_item_cb(int num, int expect)
struct obs : dht::dht_observer
{
void set_external_address(address const& addr
void set_external_address(aux::session_listen_socket* s, address const& addr
, address const& source) override
{
m_external_address = addr;
static_cast<mock_dht_socket*>(s)->m_external_address = addr;
}
address external_address(udp proto) override
{
return m_external_address;
}
void get_peers(sha1_hash const& ih) override {}
void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) override {}
@ -500,8 +523,6 @@ struct obs : dht::dht_observer
bool on_dht_request(string_view query
, dht::msg const& request, entry& response) override { return false; }
address m_external_address = addr4("236.0.0.1");
#ifndef TORRENT_DISABLE_LOGGING
std::vector<std::string> m_log;
#endif
@ -520,20 +541,22 @@ struct dht_test_setup
{
explicit dht_test_setup(udp::endpoint src)
: sett(test_settings())
, ds(src.address())
, dht_storage(dht_default_storage_constructor(sett))
, source(src)
, dht_node(src.protocol(), &s, sett
, node_id(nullptr), &observer, cnt, nodes, *dht_storage)
, dht_node(&ds, &s, sett
, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage)
{
dht_storage->update_node_ids({node_id::min()});
}
dht_settings sett;
mock_socket s;
mock_dht_socket ds;
obs observer;
counters cnt;
std::unique_ptr<dht_storage_interface> dht_storage;
udp::endpoint source;
std::map<std::string, node*> nodes;
dht::node dht_node;
char error_string[200];
};
@ -2577,15 +2600,24 @@ TORRENT_TEST(dht_dual_stack)
// TODO: 3 use dht_test_setup class to simplify the node setup
dht_settings sett = test_settings();
mock_socket s;
mock_dht_socket sock4;
mock_dht_socket6 sock6;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
node* node4p = nullptr, *node6p = nullptr;
auto get_foreign_node = [&](node_id const&, std::string const& family)
{
if (family == "n4") return node4p;
if (family == "n6") return node6p;
TEST_CHECK(false);
return (node*)nullptr;
};
std::unique_ptr<dht_storage_interface> dht_storage(dht_default_storage_constructor(sett));
dht_storage->update_node_ids({node_id(nullptr)});
dht::node node4(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage);
dht::node node6(udp::v6(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage);
nodes.insert(std::make_pair("n4", &node4));
nodes.insert(std::make_pair("n6", &node6));
dht::node node4(&sock4, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node, *dht_storage);
dht::node node6(&sock6, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node, *dht_storage);
node4p = &node4;
node6p = &node6;
// DHT should be running on port 48199 now
bdecode_node response;
@ -3051,7 +3083,7 @@ TORRENT_TEST(node_set_id)
{
dht_test_setup t(udp::endpoint(rand_v4(), 20));
node_id old_nid = t.dht_node.nid();
t.observer.set_external_address(addr4("237.0.0.1"), rand_v4());
t.observer.set_external_address(&t.ds, addr4("237.0.0.1"), rand_v4());
t.dht_node.update_node_id();
TEST_CHECK(old_nid != t.dht_node.nid());
// now that we've changed the node's id, make sure the id sent in outgoing messages
@ -3080,13 +3112,13 @@ TORRENT_TEST(read_only_node)
dht_settings sett = test_settings();
sett.read_only = true;
mock_socket s;
mock_dht_socket ds;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
std::unique_ptr<dht_storage_interface> dht_storage(dht_default_storage_constructor(sett));
dht_storage->update_node_ids({node_id(nullptr)});
dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage);
dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage);
udp::endpoint source(addr("10.0.0.1"), 20);
bdecode_node response;
msg_args args;
@ -3169,13 +3201,13 @@ TORRENT_TEST(invalid_error_msg)
// TODO: 3 use dht_test_setup class to simplify the node setup
dht_settings sett = test_settings();
mock_socket s;
mock_dht_socket ds;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
std::unique_ptr<dht_storage_interface> dht_storage(dht_default_storage_constructor(sett));
dht_storage->update_node_ids({node_id(nullptr)});
dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage);
dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage);
udp::endpoint source(addr("10.0.0.1"), 20);
entry e;
@ -3210,15 +3242,15 @@ TORRENT_TEST(rpc_invalid_error_msg)
// TODO: 3 use dht_test_setup class to simplify the node setup
dht_settings sett = test_settings();
mock_socket s;
mock_dht_socket ds;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::routing_table table(node_id(), udp::v4(), 8, sett, &observer);
dht::rpc_manager rpc(node_id(), sett, table, &s, &observer);
dht::rpc_manager rpc(node_id(), sett, table, &ds, &s, &observer);
std::unique_ptr<dht_storage_interface> dht_storage(dht_default_storage_constructor(sett));
dht_storage->update_node_ids({node_id(nullptr)});
dht::node node(udp::v4(), &s, sett, node_id(nullptr), &observer, cnt, nodes, *dht_storage);
dht::node node(&ds, &s, sett, node_id(nullptr), &observer, cnt, get_foreign_node_stub, *dht_storage);
udp::endpoint source(addr("10.0.0.1"), 20);
@ -3480,13 +3512,13 @@ TORRENT_TEST(dht_state)
{
dht_state s;
s.nid = to_hash("0000000000000000000000000000000000000001");
s.nids.emplace_back(address::from_string("1.1.1.1"), to_hash("0000000000000000000000000000000000000001"));
s.nodes.push_back(uep("1.1.1.1", 1));
s.nodes.push_back(uep("2.2.2.2", 2));
// not important that IPv6 is disabled here
s.nid6 = to_hash("0000000000000000000000000000000000000002");
s.nodes6.push_back(uep("3.3.3.3", 3));
s.nodes6.push_back(uep("4.4.4.4", 4));
s.nids.emplace_back(address::from_string("1::1"), to_hash("0000000000000000000000000000000000000002"));
s.nodes.push_back(uep("1::1", 3));
s.nodes.push_back(uep("2::2", 4));
entry const e = save_dht_state(s);
@ -3499,18 +3531,14 @@ TORRENT_TEST(dht_state)
TEST_CHECK(!r);
dht_state const s1 = read_dht_state(n);
TEST_EQUAL(s1.nid, s.nid);
TEST_CHECK(s1.nids == s.nids);
TEST_CHECK(s1.nodes == s.nodes);
TEST_EQUAL(s1.nid6, s.nid6);
TEST_CHECK(s1.nodes6 == s.nodes6);
// empty
bdecode_node n1;
dht_state const s2 = read_dht_state(n1);
TEST_EQUAL(s2.nid, node_id());
TEST_CHECK(s2.nids.empty());
TEST_CHECK(s2.nodes.empty());
TEST_EQUAL(s2.nid6, node_id());
TEST_CHECK(s2.nodes6.empty());
}
// TODO: test obfuscated_get_peers

View File

@ -110,13 +110,11 @@ TORRENT_TEST(dht_state)
sett.max_peers = 20000;
dht_state s;
s.nid = to_hash("0000000000000000000000000000000000000001");
s.nids.emplace_back(addr4("0.0.0.0"), to_hash("0000000000000000000000000000000000000001"));
s.nodes.push_back(uep("1.1.1.1", 1));
s.nodes.push_back(uep("2.2.2.2", 2));
// not important that IPv6 is disabled here
s.nid6 = to_hash("0000000000000000000000000000000000000002");
s.nodes6.push_back(uep("3.3.3.3", 3));
s.nodes6.push_back(uep("4.4.4.4", 4));
s.nids.emplace_back(addr6("::"), to_hash("0000000000000000000000000000000000000002"));
session_params params(p);
params.dht_settings = sett;
@ -140,9 +138,11 @@ TORRENT_TEST(dht_state)
TEST_EQUAL(params1.dht_settings.max_peers, 20000);
// not a chance the nid will be the fake initial ones
TEST_CHECK(params1.dht_state.nid != s.nid);
TEST_CHECK(params1.dht_state.nids[0].second != s.nids[0].second);
#if TORRENT_USE_IPV6
TEST_CHECK(params1.dht_state.nid6 != s.nid6);
// the host machine may not have IPv6 support in which case there will only be one entry
if (params1.dht_state.nids.size() > 1)
TEST_CHECK(params1.dht_state.nids[1].second != s.nids[1].second);
#endif
}
#endif