From 8be7ab559aafd037041d94481d20051f47151f69 Mon Sep 17 00:00:00 2001 From: arvidn Date: Tue, 7 Jan 2020 17:53:45 +0100 Subject: [PATCH] remove outgoing udp sockets and replace with listen_socket_t --- CMakeLists.txt | 1 - Jamfile | 1 - .../libtorrent/aux_/listen_socket_handle.hpp | 2 + include/libtorrent/aux_/session_impl.hpp | 20 +- include/libtorrent/aux_/session_interface.hpp | 1 - .../libtorrent/aux_/session_udp_sockets.hpp | 42 +--- include/libtorrent/utp_socket_manager.hpp | 2 +- simulation/test_socks5.cpp | 6 +- src/Makefile.am | 1 - src/session_impl.cpp | 202 ++++-------------- src/session_udp_sockets.cpp | 123 ----------- src/torrent.cpp | 3 +- src/utp_stream.cpp | 2 +- 13 files changed, 67 insertions(+), 339 deletions(-) delete mode 100644 src/session_udp_sockets.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ffe2f71ed..3b60dbd0e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -338,7 +338,6 @@ set(sources session_handle session_impl session_settings - session_udp_sockets proxy_settings session_stats settings_pack diff --git a/Jamfile b/Jamfile index 8a39bf9ba..1bcca03ae 100644 --- a/Jamfile +++ b/Jamfile @@ -665,7 +665,6 @@ SOURCES = session_handle session_impl session_call - session_udp_sockets settings_pack sha1 sha1_hash diff --git a/include/libtorrent/aux_/listen_socket_handle.hpp b/include/libtorrent/aux_/listen_socket_handle.hpp index 88f7b44d9..776ebf709 100644 --- a/include/libtorrent/aux_/listen_socket_handle.hpp +++ b/include/libtorrent/aux_/listen_socket_handle.hpp @@ -77,6 +77,8 @@ namespace libtorrent { namespace aux { listen_socket_t* get() const; + std::weak_ptr get_ptr() const { return m_sock; } + private: std::weak_ptr m_sock; }; diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 0a45f6ea3..9e67616f4 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -137,7 +137,7 @@ namespace aux { int port = 0; }; - struct TORRENT_EXTRA_EXPORT listen_socket_t + struct TORRENT_EXTRA_EXPORT listen_socket_t : utp_socket_interface { static constexpr listen_socket_flags_t accept_incoming = 0_bit; static constexpr listen_socket_flags_t has_gateway = 1_bit; @@ -154,6 +154,13 @@ namespace aux { listen_socket_t& operator=(listen_socket_t const&) = delete; listen_socket_t& operator=(listen_socket_t&&) = delete; + udp::endpoint get_local_endpoint() override + { + error_code ec; + if (udp_sock) return udp_sock->sock.local_endpoint(ec); + return {local_endpoint.address(), local_endpoint.port()}; + } + // returns true if this listen socket/interface can reach and be reached // by the given address. This is useful to know whether it should be // annoucned to a tracker (given the tracker's IP) or whether it should @@ -737,9 +744,8 @@ namespace aux { mutable std::condition_variable cond; // implements session_interface - bool has_udp_outgoing_sockets() const override; - tcp::endpoint bind_outgoing_socket(socket_type& s, address - const& remote_address, error_code& ec) const override; + tcp::endpoint bind_outgoing_socket(socket_type& s + , address const& remote_address, error_code& ec) const override; bool verify_incoming_interface(address const& addr); bool verify_bound_address(address const& addr, bool utp , error_code& ec) override; @@ -984,8 +990,6 @@ namespace aux { // we might need more than one listen socket std::vector> m_listen_sockets; - outgoing_sockets m_outgoing_sockets; - #if TORRENT_USE_I2P i2p_connection m_i2p_conn; std::shared_ptr m_i2p_listen_socket; @@ -1147,7 +1151,7 @@ namespace aux { ec = boost::asio::error::bad_descriptor; return; } - send_udp_packet_hostname(s->udp_sock, hostname, port, p, ec, flags); + send_udp_packet_hostname(sock.get_ptr(), hostname, port, p, ec, flags); } void send_udp_packet(std::weak_ptr sock @@ -1168,7 +1172,7 @@ namespace aux { ec = boost::asio::error::bad_descriptor; return; } - send_udp_packet(s->udp_sock, ep, p, ec, flags); + send_udp_packet(sock.get_ptr(), ep, p, ec, flags); } void on_udp_writeable(std::weak_ptr s, error_code const& ec); diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 294e44834..9d8a2912b 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -208,7 +208,6 @@ namespace aux { virtual void for_each_listen_socket(std::function f) = 0; // ask for which interface and port to bind outgoing peer connections on - virtual bool has_udp_outgoing_sockets() const = 0; virtual tcp::endpoint bind_outgoing_socket(socket_type& s, address const& remote_address, error_code& ec) const = 0; virtual bool verify_bound_address(address const& addr, bool utp diff --git a/include/libtorrent/aux_/session_udp_sockets.hpp b/include/libtorrent/aux_/session_udp_sockets.hpp index 02956ea8b..5a8c7a947 100644 --- a/include/libtorrent/aux_/session_udp_sockets.hpp +++ b/include/libtorrent/aux_/session_udp_sockets.hpp @@ -52,12 +52,12 @@ namespace aux { enum class transport : std::uint8_t { plaintext, ssl }; - struct session_udp_socket : utp_socket_interface + struct session_udp_socket { explicit session_udp_socket(io_service& ios, listen_socket_handle ls) : sock(ios, std::move(ls)) {} - udp::endpoint local_endpoint() override { return sock.local_endpoint(); } + udp::endpoint local_endpoint() { return sock.local_endpoint(); } udp_socket sock; @@ -72,44 +72,6 @@ namespace aux { bool write_blocked = false; }; - struct outgoing_udp_socket final : session_udp_socket - { - outgoing_udp_socket(io_service& ios, std::string const& dev, transport ssl_) - : session_udp_socket(ios, listen_socket_handle{}), device(dev), ssl(ssl_) {} - - // the name of the device the socket is bound to, may be empty - // if the socket is not bound to a device - std::string const device; - - // set to true if this is an SSL socket - transport const ssl; - }; - - // sockets used for outgoing utp connections - struct TORRENT_EXTRA_EXPORT outgoing_sockets - { - // partitions sockets based on whether they match one of the given endpoints - // all matched sockets are ordered before unmatched sockets - // matched endpoints are removed from the vector - // returns an iterator to the first unmatched socket - std::vector>::iterator - partition_outgoing_sockets(std::vector& eps); - - tcp::endpoint bind(socket_type& s, address const& remote_address - , error_code& ec) const; - - void update_proxy(proxy_settings const& settings, alert_manager& alerts); - - // close all sockets - void close(); - - std::vector> sockets; - private: - // round-robin index into sockets - // one dimension for IPv4/IPv6 and a second for SSL/non-SSL - mutable std::array, 2> index = {{ {{0, 0}}, {{0, 0}} }}; - }; - } } #endif diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 09883448e..ba301f845 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -52,7 +52,7 @@ namespace libtorrent { // interface/handle to the underlying udp socket struct TORRENT_EXTRA_EXPORT utp_socket_interface { - virtual udp::endpoint local_endpoint() = 0; + virtual udp::endpoint get_local_endpoint() = 0; protected: virtual ~utp_socket_interface() = default; }; diff --git a/simulation/test_socks5.cpp b/simulation/test_socks5.cpp index 71122f67c..73085c3d8 100644 --- a/simulation/test_socks5.cpp +++ b/simulation/test_socks5.cpp @@ -278,8 +278,6 @@ TORRENT_TEST(socks5_udp_retry) // number of UDP ASSOCIATE commands invoked on the socks proxy // We run for 60 seconds. The sokcks5 retry interval is expected to be 5 - // seconds, meaning there should have been 12 connection attempts since we - // have an outgoing_udp_socket as well, it will also attempt to establish a - // SOCKS5 UDP tunnel, so it will bring it to 24 attempts - TEST_EQUAL(socks5.cmd_counts()[2], 24); + // seconds, meaning there should have been 12 connection attempts + TEST_EQUAL(socks5.cmd_counts()[2], 12); } diff --git a/src/Makefile.am b/src/Makefile.am index dacdf806f..7afead306 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,7 +120,6 @@ libtorrent_rasterbar_la_SOURCES = \ session_handle.cpp \ session_impl.cpp \ session_settings.cpp \ - session_udp_sockets.cpp \ proxy_settings.cpp \ settings_pack.cpp \ sha1_hash.cpp \ diff --git a/src/session_impl.cpp b/src/session_impl.cpp index e0d6839e7..b349d80e0 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -333,7 +333,7 @@ namespace aux { bool listen_socket_t::can_route(address const& addr) const { - if (local_endpoint.address().is_v4() != addr.is_v4()) return false; + if (is_v4(local_endpoint) != addr.is_v4()) return false; if (local_endpoint.address().is_v6() && local_endpoint.address().to_v6().scope_id() != addr.to_v6().scope_id()) @@ -654,7 +654,6 @@ namespace aux { // apply all m_settings to this session run_all_updates(*this); reopen_listen_sockets(false); - reopen_outgoing_sockets(); #if TORRENT_USE_INVARIANT_CHECKS check_invariant(); @@ -978,8 +977,6 @@ namespace aux { } } - m_outgoing_sockets.close(); - // we need to give all the sockets an opportunity to actually have their handlers // called and cancelled before we continue the shutdown. This is a bit // complicated, if there are no "undead" peers, it's safe to resume the @@ -1345,11 +1342,6 @@ namespace aux { && pack.get_str(settings_pack::listen_interfaces) != m_settings.get_str(settings_pack::listen_interfaces)); - bool const reopen_outgoing_port = - (pack.has_val(settings_pack::outgoing_interfaces) - && pack.get_str(settings_pack::outgoing_interfaces) - != m_settings.get_str(settings_pack::outgoing_interfaces)); - #ifndef TORRENT_DISABLE_LOGGING session_log("applying settings pack, reopen_listen_port=%s" , reopen_listen_port ? "true" : "false"); @@ -1369,9 +1361,6 @@ namespace aux { { reopen_listen_sockets(); } - - if (reopen_outgoing_port) - reopen_outgoing_sockets(); } std::shared_ptr session_impl::setup_listener( @@ -2030,139 +2019,9 @@ namespace aux { #endif } - void session_impl::reopen_outgoing_sockets() - { - // first build a list of endpoints we should be listening on - // we need to remove any unneeded sockets first to avoid the possibility - // of a new socket failing to bind due to a conflict with a stale socket - std::vector eps; - - for (auto const& iface : m_outgoing_interfaces) - { - interface_to_endpoints(iface, 0, transport::plaintext, listen_socket_t::accept_incoming, eps); -#ifdef TORRENT_USE_OPENSSL - interface_to_endpoints(iface, 0, transport::ssl, listen_socket_t::accept_incoming, eps); -#endif - } - - // if no outgoing interfaces are specified, create sockets to use - // any interface - if (eps.empty()) - { - eps.emplace_back(address_v4(), 0, "", transport::plaintext, listen_socket_flags_t{}); - eps.emplace_back(address_v6(), 0, "", transport::plaintext, listen_socket_flags_t{}); -#ifdef TORRENT_USE_OPENSSL - eps.emplace_back(address_v4(), 0, "", transport::ssl, listen_socket_flags_t{}); - eps.emplace_back(address_v6(), 0, "", transport::ssl, listen_socket_flags_t{}); -#endif - } - - auto remove_iter = m_outgoing_sockets.partition_outgoing_sockets(eps); - - for (auto i = remove_iter; i != m_outgoing_sockets.sockets.end(); ++i) - { - auto& remove_sock = *i; - m_utp_socket_manager.remove_udp_socket(remove_sock); - -#ifndef TORRENT_DISABLE_LOGGING - if (should_log()) - { - session_log("Closing outgoing UDP socket for %s on device \"%s\"" - , print_endpoint(remove_sock->local_endpoint()).c_str() - , remove_sock->device.c_str()); - } -#endif - remove_sock->sock.close(); - } - - m_outgoing_sockets.sockets.erase(remove_iter, m_outgoing_sockets.sockets.end()); - - // open new sockets on any endpoints that didn't match with - // an existing socket - for (auto const& ep : eps) - { - error_code ec; - udp::endpoint const udp_bind_ep(ep.addr, 0); - - auto udp_sock = std::make_shared(m_io_service, ep.device, ep.ssl); - udp_sock->sock.open(udp_bind_ep.protocol(), ec); - if (ec) - { -#ifndef TORRENT_DISABLE_LOGGING - if (should_log()) - { - session_log("failed to open UDP socket: %s: %s" - , ep.device.c_str(), ec.message().c_str()); - } -#endif - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp_bind_ep - , operation_t::sock_open, ec); - continue; - } - -#if TORRENT_HAS_BINDTODEVICE - if (!ep.device.empty()) - { - udp_sock->sock.set_option(bind_to_device(ep.device.c_str()), ec); -#ifndef TORRENT_DISABLE_LOGGING - if (ec && should_log()) - { - session_log("bind to device failed (device: %s): %s" - , ep.device.c_str(), ec.message().c_str()); - } -#endif // TORRENT_DISABLE_LOGGING - ec.clear(); - } -#endif - udp_sock->sock.bind(udp_bind_ep, ec); - - if (ec) - { -#ifndef TORRENT_DISABLE_LOGGING - if (should_log()) - { - session_log("failed to bind UDP socket: %s: %s" - , ep.device.c_str(), ec.message().c_str()); - } -#endif - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp_bind_ep - , operation_t::sock_bind, ec); - continue; - } - - error_code err; - set_socket_buffer_size(udp_sock->sock, m_settings, err); - if (err) - { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp_sock->sock.local_endpoint(ec) - , operation_t::alloc_recvbuf, err); - } - - // this call is necessary here because, unless the settings actually - // change after the session is up and listening, at no other point - // set_proxy_settings is called with the correct proxy configuration, - // internally, this method handle the SOCKS5's connection logic - udp_sock->sock.set_proxy_settings(proxy(), m_alerts); - - ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); - udp_sock->sock.async_read(aux::make_handler(std::bind(&session_impl::on_udp_packet - , this, udp_sock, std::weak_ptr(), ep.ssl, _1) - , udp_sock->udp_handler_storage, *this)); - - if (!ec && udp_sock) - { - m_outgoing_sockets.sockets.push_back(udp_sock); - } - } - } - void session_impl::reopen_network_sockets(reopen_network_flags_t const options) { reopen_listen_sockets(bool(options & session_handle::reopen_map_ports)); - reopen_outgoing_sockets(); } namespace { @@ -2333,7 +2192,7 @@ namespace aux { return; } - auto s = std::static_pointer_cast(si); + auto s = std::static_pointer_cast(si)->udp_sock; s->sock.send_hostname(hostname, port, p, ec, flags); @@ -2360,7 +2219,7 @@ namespace aux { return; } - auto s = std::static_pointer_cast(si); + auto s = std::static_pointer_cast(si)->udp_sock; TORRENT_ASSERT(s->sock.is_closed() || s->sock.local_endpoint().protocol() == ep.protocol()); @@ -2474,7 +2333,7 @@ namespace aux { // give the uTP socket manager first dibs on the packet. Presumably // the majority of packets are uTP packets. - if (!mgr.incoming_packet(socket, packet.from, buf)) + if (!mgr.incoming_packet(ls, packet.from, buf)) { // if it wasn't a uTP packet, try the other users of the UDP // socket @@ -5031,13 +4890,8 @@ namespace aux { #endif } - bool session_impl::has_udp_outgoing_sockets() const - { - return !m_outgoing_sockets.sockets.empty(); - } - - tcp::endpoint session_impl::bind_outgoing_socket(socket_type& s, address - const& remote_address, error_code& ec) const + tcp::endpoint session_impl::bind_outgoing_socket(socket_type& s + , address const& remote_address, error_code& ec) const { tcp::endpoint bind_ep(address_v4(), 0); if (m_settings.get_int(settings_pack::outgoing_port) > 0) @@ -5060,9 +4914,46 @@ namespace aux { if (is_utp(s)) { - auto const ep = m_outgoing_sockets.bind(s, remote_address, ec); - if (ep.port() != 0 || ec) - return ep; + // TODO: factor out this logic into a separate function for unit + // testing + + utp_socket_impl* impl = nullptr; + transport ssl = transport::plaintext; +#ifdef TORRENT_USE_OPENSSL + if (s.get>() != nullptr) + { + impl = s.get>()->next_layer().get_impl(); + ssl = transport::ssl; + } + else +#endif + impl = s.get()->get_impl(); + + std::vector> with_gateways; + std::shared_ptr match; + for (auto& ls : m_listen_sockets) + { + if (is_v4(ls->local_endpoint) != remote_address.is_v4()) continue; + if (ls->ssl != ssl) continue; + if (ls->flags & listen_socket_t::has_gateway) + with_gateways.push_back(ls); + + if (match_addr_mask(ls->local_endpoint.address(), remote_address, ls->netmask)) + { + // is this better than the previous match? + match = ls; + } + } + if (!match && !with_gateways.empty()) + match = with_gateways[random(with_gateways.size() - 1)]; + + if (match) + { + utp_init_socket(impl, match); + return match->local_endpoint; + } + ec.assign(boost::system::errc::not_supported, generic_category()); + return {}; } if (!m_outgoing_interfaces.empty()) @@ -5333,7 +5224,6 @@ namespace aux { { for (auto& i : m_listen_sockets) i->udp_sock->sock.set_proxy_settings(proxy(), m_alerts); - m_outgoing_sockets.update_proxy(proxy(), m_alerts); } void session_impl::update_ip_notifier() diff --git a/src/session_udp_sockets.cpp b/src/session_udp_sockets.cpp deleted file mode 100644 index 9b314c064..000000000 --- a/src/session_udp_sockets.cpp +++ /dev/null @@ -1,123 +0,0 @@ -/* - -Copyright (c) 2017, Arvid Norberg, Steven Siloti -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - -* Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. -* Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer in -the documentation and/or other materials provided with the distribution. -* Neither the name of the author nor the names of its -contributors may be used to endorse or promote products derived -from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -*/ - -#include "libtorrent/aux_/session_udp_sockets.hpp" -#include "libtorrent/aux_/session_impl.hpp" -#include "libtorrent/error_code.hpp" - -namespace libtorrent { namespace aux { - - std::vector>::iterator - outgoing_sockets::partition_outgoing_sockets(std::vector& eps) - { - return std::partition(sockets.begin(), sockets.end() - , [&eps](std::shared_ptr const& sock) - { - auto match = std::find_if(eps.begin(), eps.end() - , [&sock](listen_endpoint_t const& ep) - { - return ep.device == sock->device - && ep.addr == sock->sock.local_endpoint().address() - && ep.ssl == sock->ssl; - }); - - if (match != eps.end()) - { - // remove the matched endpoint to signal the caller that it - // doesn't need to create a socket for the endpoint - eps.erase(match); - return true; - } - else - { - return false; - } - }); - } - - tcp::endpoint outgoing_sockets::bind(socket_type& s - , address const& remote_address, error_code& ec) const - { - if (sockets.empty()) - { - ec.assign(boost::system::errc::not_supported, generic_category()); - return tcp::endpoint(); - } - - utp_socket_impl* impl = nullptr; - transport ssl = transport::plaintext; -#ifdef TORRENT_USE_OPENSSL - if (s.get>() != nullptr) - { - impl = s.get>()->next_layer().get_impl(); - ssl = transport::ssl; - } - else -#endif - impl = s.get()->get_impl(); - - auto& idx = index[remote_address.is_v4() ? 0 : 1][ssl == transport::ssl ? 1 : 0]; - auto const index_begin = idx; - - for (;;) - { - if (++idx >= sockets.size()) - idx = 0; - - if (is_v4(sockets[idx]->local_endpoint()) != remote_address.is_v4() - || sockets[idx]->ssl != ssl) - { - if (idx == index_begin) break; - continue; - } - - utp_init_socket(impl, sockets[idx]); - auto udp_ep = sockets[idx]->local_endpoint(); - return tcp::endpoint(udp_ep.address(), udp_ep.port()); - } - - return tcp::endpoint(); - } - - void outgoing_sockets::update_proxy(proxy_settings const& settings, alert_manager& alerts) - { - for (auto const& i : sockets) - i->sock.set_proxy_settings(settings, alerts); - } - - void outgoing_sockets::close() - { - for (auto const& l : sockets) - l->sock.close(); - } - -} } diff --git a/src/torrent.cpp b/src/torrent.cpp index 5f6cff268..719da35e9 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -6617,8 +6617,7 @@ bool is_downloading_state(int const st) if (settings().get_bool(settings_pack::enable_outgoing_utp) && (!settings().get_bool(settings_pack::enable_outgoing_tcp) || peerinfo->supports_utp - || peerinfo->confirmed_supports_utp) - && m_ses.has_udp_outgoing_sockets()) + || peerinfo->confirmed_supports_utp)) { sm = m_ses.utp_socket_manager(); } diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 160fdd85e..82225b308 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -821,7 +821,7 @@ utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const return endpoint_type(); } - udp::endpoint ep = s->local_endpoint(); + udp::endpoint ep = s->get_local_endpoint(); return endpoint_type(ep.address(), ep.port()); }