remove outgoing udp sockets and replace with listen_socket_t

This commit is contained in:
arvidn 2020-01-07 17:53:45 +01:00 committed by Arvid Norberg
parent 4ceb2ea467
commit 8be7ab559a
13 changed files with 67 additions and 339 deletions

View File

@ -338,7 +338,6 @@ set(sources
session_handle
session_impl
session_settings
session_udp_sockets
proxy_settings
session_stats
settings_pack

View File

@ -665,7 +665,6 @@ SOURCES =
session_handle
session_impl
session_call
session_udp_sockets
settings_pack
sha1
sha1_hash

View File

@ -77,6 +77,8 @@ namespace libtorrent { namespace aux {
listen_socket_t* get() const;
std::weak_ptr<listen_socket_t> get_ptr() const { return m_sock; }
private:
std::weak_ptr<listen_socket_t> m_sock;
};

View File

@ -137,7 +137,7 @@ namespace aux {
int port = 0;
};
struct TORRENT_EXTRA_EXPORT listen_socket_t
struct TORRENT_EXTRA_EXPORT listen_socket_t : utp_socket_interface
{
static constexpr listen_socket_flags_t accept_incoming = 0_bit;
static constexpr listen_socket_flags_t has_gateway = 1_bit;
@ -154,6 +154,13 @@ namespace aux {
listen_socket_t& operator=(listen_socket_t const&) = delete;
listen_socket_t& operator=(listen_socket_t&&) = delete;
udp::endpoint get_local_endpoint() override
{
error_code ec;
if (udp_sock) return udp_sock->sock.local_endpoint(ec);
return {local_endpoint.address(), local_endpoint.port()};
}
// returns true if this listen socket/interface can reach and be reached
// by the given address. This is useful to know whether it should be
// annoucned to a tracker (given the tracker's IP) or whether it should
@ -737,9 +744,8 @@ namespace aux {
mutable std::condition_variable cond;
// implements session_interface
bool has_udp_outgoing_sockets() const override;
tcp::endpoint bind_outgoing_socket(socket_type& s, address
const& remote_address, error_code& ec) const override;
tcp::endpoint bind_outgoing_socket(socket_type& s
, address const& remote_address, error_code& ec) const override;
bool verify_incoming_interface(address const& addr);
bool verify_bound_address(address const& addr, bool utp
, error_code& ec) override;
@ -984,8 +990,6 @@ namespace aux {
// we might need more than one listen socket
std::vector<std::shared_ptr<listen_socket_t>> m_listen_sockets;
outgoing_sockets m_outgoing_sockets;
#if TORRENT_USE_I2P
i2p_connection m_i2p_conn;
std::shared_ptr<socket_type> m_i2p_listen_socket;
@ -1147,7 +1151,7 @@ namespace aux {
ec = boost::asio::error::bad_descriptor;
return;
}
send_udp_packet_hostname(s->udp_sock, hostname, port, p, ec, flags);
send_udp_packet_hostname(sock.get_ptr(), hostname, port, p, ec, flags);
}
void send_udp_packet(std::weak_ptr<utp_socket_interface> sock
@ -1168,7 +1172,7 @@ namespace aux {
ec = boost::asio::error::bad_descriptor;
return;
}
send_udp_packet(s->udp_sock, ep, p, ec, flags);
send_udp_packet(sock.get_ptr(), ep, p, ec, flags);
}
void on_udp_writeable(std::weak_ptr<session_udp_socket> s, error_code const& ec);

View File

@ -208,7 +208,6 @@ namespace aux {
virtual void for_each_listen_socket(std::function<void(aux::listen_socket_handle const&)> f) = 0;
// ask for which interface and port to bind outgoing peer connections on
virtual bool has_udp_outgoing_sockets() const = 0;
virtual tcp::endpoint bind_outgoing_socket(socket_type& s, address const&
remote_address, error_code& ec) const = 0;
virtual bool verify_bound_address(address const& addr, bool utp

View File

@ -52,12 +52,12 @@ namespace aux {
enum class transport : std::uint8_t { plaintext, ssl };
struct session_udp_socket : utp_socket_interface
struct session_udp_socket
{
explicit session_udp_socket(io_service& ios, listen_socket_handle ls)
: sock(ios, std::move(ls)) {}
udp::endpoint local_endpoint() override { return sock.local_endpoint(); }
udp::endpoint local_endpoint() { return sock.local_endpoint(); }
udp_socket sock;
@ -72,44 +72,6 @@ namespace aux {
bool write_blocked = false;
};
struct outgoing_udp_socket final : session_udp_socket
{
outgoing_udp_socket(io_service& ios, std::string const& dev, transport ssl_)
: session_udp_socket(ios, listen_socket_handle{}), device(dev), ssl(ssl_) {}
// the name of the device the socket is bound to, may be empty
// if the socket is not bound to a device
std::string const device;
// set to true if this is an SSL socket
transport const ssl;
};
// sockets used for outgoing utp connections
struct TORRENT_EXTRA_EXPORT outgoing_sockets
{
// partitions sockets based on whether they match one of the given endpoints
// all matched sockets are ordered before unmatched sockets
// matched endpoints are removed from the vector
// returns an iterator to the first unmatched socket
std::vector<std::shared_ptr<outgoing_udp_socket>>::iterator
partition_outgoing_sockets(std::vector<listen_endpoint_t>& eps);
tcp::endpoint bind(socket_type& s, address const& remote_address
, error_code& ec) const;
void update_proxy(proxy_settings const& settings, alert_manager& alerts);
// close all sockets
void close();
std::vector<std::shared_ptr<outgoing_udp_socket>> sockets;
private:
// round-robin index into sockets
// one dimension for IPv4/IPv6 and a second for SSL/non-SSL
mutable std::array<std::array<std::uint8_t, 2>, 2> index = {{ {{0, 0}}, {{0, 0}} }};
};
} }
#endif

View File

@ -52,7 +52,7 @@ namespace libtorrent {
// interface/handle to the underlying udp socket
struct TORRENT_EXTRA_EXPORT utp_socket_interface
{
virtual udp::endpoint local_endpoint() = 0;
virtual udp::endpoint get_local_endpoint() = 0;
protected:
virtual ~utp_socket_interface() = default;
};

View File

@ -278,8 +278,6 @@ TORRENT_TEST(socks5_udp_retry)
// number of UDP ASSOCIATE commands invoked on the socks proxy
// We run for 60 seconds. The sokcks5 retry interval is expected to be 5
// seconds, meaning there should have been 12 connection attempts since we
// have an outgoing_udp_socket as well, it will also attempt to establish a
// SOCKS5 UDP tunnel, so it will bring it to 24 attempts
TEST_EQUAL(socks5.cmd_counts()[2], 24);
// seconds, meaning there should have been 12 connection attempts
TEST_EQUAL(socks5.cmd_counts()[2], 12);
}

View File

@ -120,7 +120,6 @@ libtorrent_rasterbar_la_SOURCES = \
session_handle.cpp \
session_impl.cpp \
session_settings.cpp \
session_udp_sockets.cpp \
proxy_settings.cpp \
settings_pack.cpp \
sha1_hash.cpp \

View File

@ -333,7 +333,7 @@ namespace aux {
bool listen_socket_t::can_route(address const& addr) const
{
if (local_endpoint.address().is_v4() != addr.is_v4()) return false;
if (is_v4(local_endpoint) != addr.is_v4()) return false;
if (local_endpoint.address().is_v6()
&& local_endpoint.address().to_v6().scope_id() != addr.to_v6().scope_id())
@ -654,7 +654,6 @@ namespace aux {
// apply all m_settings to this session
run_all_updates(*this);
reopen_listen_sockets(false);
reopen_outgoing_sockets();
#if TORRENT_USE_INVARIANT_CHECKS
check_invariant();
@ -978,8 +977,6 @@ namespace aux {
}
}
m_outgoing_sockets.close();
// we need to give all the sockets an opportunity to actually have their handlers
// called and cancelled before we continue the shutdown. This is a bit
// complicated, if there are no "undead" peers, it's safe to resume the
@ -1345,11 +1342,6 @@ namespace aux {
&& pack.get_str(settings_pack::listen_interfaces)
!= m_settings.get_str(settings_pack::listen_interfaces));
bool const reopen_outgoing_port =
(pack.has_val(settings_pack::outgoing_interfaces)
&& pack.get_str(settings_pack::outgoing_interfaces)
!= m_settings.get_str(settings_pack::outgoing_interfaces));
#ifndef TORRENT_DISABLE_LOGGING
session_log("applying settings pack, reopen_listen_port=%s"
, reopen_listen_port ? "true" : "false");
@ -1369,9 +1361,6 @@ namespace aux {
{
reopen_listen_sockets();
}
if (reopen_outgoing_port)
reopen_outgoing_sockets();
}
std::shared_ptr<listen_socket_t> session_impl::setup_listener(
@ -2030,139 +2019,9 @@ namespace aux {
#endif
}
void session_impl::reopen_outgoing_sockets()
{
// first build a list of endpoints we should be listening on
// we need to remove any unneeded sockets first to avoid the possibility
// of a new socket failing to bind due to a conflict with a stale socket
std::vector<listen_endpoint_t> eps;
for (auto const& iface : m_outgoing_interfaces)
{
interface_to_endpoints(iface, 0, transport::plaintext, listen_socket_t::accept_incoming, eps);
#ifdef TORRENT_USE_OPENSSL
interface_to_endpoints(iface, 0, transport::ssl, listen_socket_t::accept_incoming, eps);
#endif
}
// if no outgoing interfaces are specified, create sockets to use
// any interface
if (eps.empty())
{
eps.emplace_back(address_v4(), 0, "", transport::plaintext, listen_socket_flags_t{});
eps.emplace_back(address_v6(), 0, "", transport::plaintext, listen_socket_flags_t{});
#ifdef TORRENT_USE_OPENSSL
eps.emplace_back(address_v4(), 0, "", transport::ssl, listen_socket_flags_t{});
eps.emplace_back(address_v6(), 0, "", transport::ssl, listen_socket_flags_t{});
#endif
}
auto remove_iter = m_outgoing_sockets.partition_outgoing_sockets(eps);
for (auto i = remove_iter; i != m_outgoing_sockets.sockets.end(); ++i)
{
auto& remove_sock = *i;
m_utp_socket_manager.remove_udp_socket(remove_sock);
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log("Closing outgoing UDP socket for %s on device \"%s\""
, print_endpoint(remove_sock->local_endpoint()).c_str()
, remove_sock->device.c_str());
}
#endif
remove_sock->sock.close();
}
m_outgoing_sockets.sockets.erase(remove_iter, m_outgoing_sockets.sockets.end());
// open new sockets on any endpoints that didn't match with
// an existing socket
for (auto const& ep : eps)
{
error_code ec;
udp::endpoint const udp_bind_ep(ep.addr, 0);
auto udp_sock = std::make_shared<outgoing_udp_socket>(m_io_service, ep.device, ep.ssl);
udp_sock->sock.open(udp_bind_ep.protocol(), ec);
if (ec)
{
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log("failed to open UDP socket: %s: %s"
, ep.device.c_str(), ec.message().c_str());
}
#endif
if (m_alerts.should_post<udp_error_alert>())
m_alerts.emplace_alert<udp_error_alert>(udp_bind_ep
, operation_t::sock_open, ec);
continue;
}
#if TORRENT_HAS_BINDTODEVICE
if (!ep.device.empty())
{
udp_sock->sock.set_option(bind_to_device(ep.device.c_str()), ec);
#ifndef TORRENT_DISABLE_LOGGING
if (ec && should_log())
{
session_log("bind to device failed (device: %s): %s"
, ep.device.c_str(), ec.message().c_str());
}
#endif // TORRENT_DISABLE_LOGGING
ec.clear();
}
#endif
udp_sock->sock.bind(udp_bind_ep, ec);
if (ec)
{
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
session_log("failed to bind UDP socket: %s: %s"
, ep.device.c_str(), ec.message().c_str());
}
#endif
if (m_alerts.should_post<udp_error_alert>())
m_alerts.emplace_alert<udp_error_alert>(udp_bind_ep
, operation_t::sock_bind, ec);
continue;
}
error_code err;
set_socket_buffer_size(udp_sock->sock, m_settings, err);
if (err)
{
if (m_alerts.should_post<udp_error_alert>())
m_alerts.emplace_alert<udp_error_alert>(udp_sock->sock.local_endpoint(ec)
, operation_t::alloc_recvbuf, err);
}
// this call is necessary here because, unless the settings actually
// change after the session is up and listening, at no other point
// set_proxy_settings is called with the correct proxy configuration,
// internally, this method handle the SOCKS5's connection logic
udp_sock->sock.set_proxy_settings(proxy(), m_alerts);
ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet");
udp_sock->sock.async_read(aux::make_handler(std::bind(&session_impl::on_udp_packet
, this, udp_sock, std::weak_ptr<listen_socket_t>(), ep.ssl, _1)
, udp_sock->udp_handler_storage, *this));
if (!ec && udp_sock)
{
m_outgoing_sockets.sockets.push_back(udp_sock);
}
}
}
void session_impl::reopen_network_sockets(reopen_network_flags_t const options)
{
reopen_listen_sockets(bool(options & session_handle::reopen_map_ports));
reopen_outgoing_sockets();
}
namespace {
@ -2333,7 +2192,7 @@ namespace aux {
return;
}
auto s = std::static_pointer_cast<session_udp_socket>(si);
auto s = std::static_pointer_cast<aux::listen_socket_t>(si)->udp_sock;
s->sock.send_hostname(hostname, port, p, ec, flags);
@ -2360,7 +2219,7 @@ namespace aux {
return;
}
auto s = std::static_pointer_cast<session_udp_socket>(si);
auto s = std::static_pointer_cast<aux::listen_socket_t>(si)->udp_sock;
TORRENT_ASSERT(s->sock.is_closed() || s->sock.local_endpoint().protocol() == ep.protocol());
@ -2474,7 +2333,7 @@ namespace aux {
// give the uTP socket manager first dibs on the packet. Presumably
// the majority of packets are uTP packets.
if (!mgr.incoming_packet(socket, packet.from, buf))
if (!mgr.incoming_packet(ls, packet.from, buf))
{
// if it wasn't a uTP packet, try the other users of the UDP
// socket
@ -5031,13 +4890,8 @@ namespace aux {
#endif
}
bool session_impl::has_udp_outgoing_sockets() const
{
return !m_outgoing_sockets.sockets.empty();
}
tcp::endpoint session_impl::bind_outgoing_socket(socket_type& s, address
const& remote_address, error_code& ec) const
tcp::endpoint session_impl::bind_outgoing_socket(socket_type& s
, address const& remote_address, error_code& ec) const
{
tcp::endpoint bind_ep(address_v4(), 0);
if (m_settings.get_int(settings_pack::outgoing_port) > 0)
@ -5060,9 +4914,46 @@ namespace aux {
if (is_utp(s))
{
auto const ep = m_outgoing_sockets.bind(s, remote_address, ec);
if (ep.port() != 0 || ec)
return ep;
// TODO: factor out this logic into a separate function for unit
// testing
utp_socket_impl* impl = nullptr;
transport ssl = transport::plaintext;
#ifdef TORRENT_USE_OPENSSL
if (s.get<ssl_stream<utp_stream>>() != nullptr)
{
impl = s.get<ssl_stream<utp_stream>>()->next_layer().get_impl();
ssl = transport::ssl;
}
else
#endif
impl = s.get<utp_stream>()->get_impl();
std::vector<std::shared_ptr<listen_socket_t>> with_gateways;
std::shared_ptr<listen_socket_t> match;
for (auto& ls : m_listen_sockets)
{
if (is_v4(ls->local_endpoint) != remote_address.is_v4()) continue;
if (ls->ssl != ssl) continue;
if (ls->flags & listen_socket_t::has_gateway)
with_gateways.push_back(ls);
if (match_addr_mask(ls->local_endpoint.address(), remote_address, ls->netmask))
{
// is this better than the previous match?
match = ls;
}
}
if (!match && !with_gateways.empty())
match = with_gateways[random(with_gateways.size() - 1)];
if (match)
{
utp_init_socket(impl, match);
return match->local_endpoint;
}
ec.assign(boost::system::errc::not_supported, generic_category());
return {};
}
if (!m_outgoing_interfaces.empty())
@ -5333,7 +5224,6 @@ namespace aux {
{
for (auto& i : m_listen_sockets)
i->udp_sock->sock.set_proxy_settings(proxy(), m_alerts);
m_outgoing_sockets.update_proxy(proxy(), m_alerts);
}
void session_impl::update_ip_notifier()

View File

@ -1,123 +0,0 @@
/*
Copyright (c) 2017, Arvid Norberg, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/aux_/session_udp_sockets.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/error_code.hpp"
namespace libtorrent { namespace aux {
std::vector<std::shared_ptr<outgoing_udp_socket>>::iterator
outgoing_sockets::partition_outgoing_sockets(std::vector<listen_endpoint_t>& eps)
{
return std::partition(sockets.begin(), sockets.end()
, [&eps](std::shared_ptr<outgoing_udp_socket> const& sock)
{
auto match = std::find_if(eps.begin(), eps.end()
, [&sock](listen_endpoint_t const& ep)
{
return ep.device == sock->device
&& ep.addr == sock->sock.local_endpoint().address()
&& ep.ssl == sock->ssl;
});
if (match != eps.end())
{
// remove the matched endpoint to signal the caller that it
// doesn't need to create a socket for the endpoint
eps.erase(match);
return true;
}
else
{
return false;
}
});
}
tcp::endpoint outgoing_sockets::bind(socket_type& s
, address const& remote_address, error_code& ec) const
{
if (sockets.empty())
{
ec.assign(boost::system::errc::not_supported, generic_category());
return tcp::endpoint();
}
utp_socket_impl* impl = nullptr;
transport ssl = transport::plaintext;
#ifdef TORRENT_USE_OPENSSL
if (s.get<ssl_stream<utp_stream>>() != nullptr)
{
impl = s.get<ssl_stream<utp_stream>>()->next_layer().get_impl();
ssl = transport::ssl;
}
else
#endif
impl = s.get<utp_stream>()->get_impl();
auto& idx = index[remote_address.is_v4() ? 0 : 1][ssl == transport::ssl ? 1 : 0];
auto const index_begin = idx;
for (;;)
{
if (++idx >= sockets.size())
idx = 0;
if (is_v4(sockets[idx]->local_endpoint()) != remote_address.is_v4()
|| sockets[idx]->ssl != ssl)
{
if (idx == index_begin) break;
continue;
}
utp_init_socket(impl, sockets[idx]);
auto udp_ep = sockets[idx]->local_endpoint();
return tcp::endpoint(udp_ep.address(), udp_ep.port());
}
return tcp::endpoint();
}
void outgoing_sockets::update_proxy(proxy_settings const& settings, alert_manager& alerts)
{
for (auto const& i : sockets)
i->sock.set_proxy_settings(settings, alerts);
}
void outgoing_sockets::close()
{
for (auto const& l : sockets)
l->sock.close();
}
} }

View File

@ -6617,8 +6617,7 @@ bool is_downloading_state(int const st)
if (settings().get_bool(settings_pack::enable_outgoing_utp)
&& (!settings().get_bool(settings_pack::enable_outgoing_tcp)
|| peerinfo->supports_utp
|| peerinfo->confirmed_supports_utp)
&& m_ses.has_udp_outgoing_sockets())
|| peerinfo->confirmed_supports_utp))
{
sm = m_ses.utp_socket_manager();
}

View File

@ -821,7 +821,7 @@ utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const
return endpoint_type();
}
udp::endpoint ep = s->local_endpoint();
udp::endpoint ep = s->get_local_endpoint();
return endpoint_type(ep.address(), ep.port());
}