From 2c8cf4834bfb41ce82fc6f6575b668bee23064cd Mon Sep 17 00:00:00 2001 From: arvidn Date: Wed, 15 Jan 2020 08:04:51 +0100 Subject: [PATCH] simplify the local service discovery logic to only deal with a single network. Instantiate LSD once per listen_socket_t instead of just once --- include/libtorrent/aux_/session_impl.hpp | 6 +- include/libtorrent/lsd.hpp | 18 ++-- src/lsd.cpp | 129 ++++++++++++++++------- src/session_impl.cpp | 44 +++++--- 4 files changed, 135 insertions(+), 62 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 43979c90e..117e9adfd 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -237,6 +237,8 @@ namespace aux { std::shared_ptr natpmp_mapper; std::shared_ptr upnp_mapper; + std::shared_ptr lsd; + // set to true when we receive an incoming connection from this listen // socket bool incoming_connection = false; @@ -751,7 +753,7 @@ namespace aux { bool verify_bound_address(address const& addr, bool utp , error_code& ec) override; - bool has_lsd() const override { return m_lsd.get() != nullptr; } + bool has_lsd() const override; std::vector& block_info_storage() override { return m_block_info_storage; } @@ -1195,8 +1197,6 @@ namespace aux { // this is deducted from the connect speed int m_boost_connections = 0; - std::shared_ptr m_lsd; - #if TORRENT_ABI_VERSION == 1 struct work_thread_t { diff --git a/include/libtorrent/lsd.hpp b/include/libtorrent/lsd.hpp index 66effd542..39c036c92 100644 --- a/include/libtorrent/lsd.hpp +++ b/include/libtorrent/lsd.hpp @@ -43,7 +43,8 @@ namespace libtorrent { struct lsd : std::enable_shared_from_this { - lsd(io_service& ios, aux::lsd_callback& cb); + lsd(io_service& ios, aux::lsd_callback& cb + , address const& listen_address, address const& netmask); ~lsd(); void start(error_code& ec); @@ -55,18 +56,18 @@ private: std::shared_ptr self() { return shared_from_this(); } - void announce_impl(sha1_hash const& ih, int listen_port - , int retry_count); + void announce_impl(sha1_hash const& ih, int listen_port, int retry_count); void resend_announce(error_code const& e, sha1_hash const& ih , int listen_port, int retry_count); - void on_announce(udp::endpoint const& from, span buffer); + void on_announce(error_code const& ec); aux::lsd_callback& m_callback; - // the udp socket used to send and receive - // multicast messages on - broadcast_socket m_socket; - broadcast_socket m_socket6; + address m_listen_address; + address m_netmask; + + udp::socket m_socket; + #ifndef TORRENT_DISABLE_LOGGING bool should_log() const; void debug_log(char const* fmt, ...) const TORRENT_FORMAT(2, 3); @@ -84,7 +85,6 @@ private: int m_cookie; bool m_disabled = false; - bool m_disabled6 = false; }; } diff --git a/src/lsd.cpp b/src/lsd.cpp index 14a0aa767..5d41d47c6 100644 --- a/src/lsd.cpp +++ b/src/lsd.cpp @@ -43,6 +43,11 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/debug.hpp" #include "libtorrent/hex.hpp" // to_hex, from_hex #include "libtorrent/aux_/numeric_cast.hpp" +#include "libtorrent/enum_net.hpp" + +#include "libtorrent/aux_/disable_warnings_push.hpp" +#include +#include "libtorrent/aux_/disable_warnings_pop.hpp" using namespace std::placeholders; @@ -66,10 +71,12 @@ int render_lsd_packet(char* dst, int const len, int const listen_port static error_code dummy; -lsd::lsd(io_service& ios, aux::lsd_callback& cb) +lsd::lsd(io_service& ios, aux::lsd_callback& cb + , address const& listen_address, address const& netmask) : m_callback(cb) - , m_socket(udp::endpoint(make_address_v4("239.192.152.143", dummy), 6771)) - , m_socket6(udp::endpoint(make_address_v6("ff15::efc0:988f", dummy), 6771)) + , m_listen_address(listen_address) + , m_netmask(netmask) + , m_socket(ios) , m_broadcast_timer(ios) , m_cookie((random(0x7fffffff) ^ std::uintptr_t(this)) & 0x7fffffff) { @@ -95,14 +102,39 @@ void lsd::debug_log(char const* fmt, ...) const } #endif +namespace { + address const lsd_multicast_addr4 = make_address_v4("239.192.152.143"); + address const lsd_multicast_addr6 = make_address_v6("ff15::efc0:988f"); + int const lsd_port = 6771; +} + void lsd::start(error_code& ec) { - m_socket.open(std::bind(&lsd::on_announce, self(), _1, _2) - , lt::get_io_service(m_broadcast_timer), ec); + using namespace boost::asio::ip::multicast; + bool const v4 = m_listen_address.is_v4(); + m_socket.open(v4 ? udp::v4() : udp::v6(), ec); if (ec) return; - m_socket6.open(std::bind(&lsd::on_announce, self(), _1, _2) - , lt::get_io_service(m_broadcast_timer), ec); + m_socket.set_option(udp::socket::reuse_address(true), ec); + if (ec) return; + + m_socket.bind(udp::endpoint(address_v4::any(), lsd_port), ec); + if (ec) return; + m_socket.set_option(join_group(v4 ? lsd_multicast_addr4 : lsd_multicast_addr6), ec); + if (ec) return; + m_socket.set_option(hops(32), ec); + if (ec) return; + m_socket.set_option(enable_loopback(true), ec); + if (ec) return; + if (v4) + { + m_socket.set_option(outbound_interface(m_listen_address.to_v4()), ec); + if (ec) return; + } + + ADD_OUTSTANDING_ASYNC("lsd::on_announce"); + m_socket.async_receive(boost::asio::null_buffers{} + , std::bind(&lsd::on_announce, self(), _1)); } lsd::~lsd() = default; @@ -115,20 +147,30 @@ void lsd::announce(sha1_hash const& ih, int listen_port) void lsd::announce_impl(sha1_hash const& ih, int const listen_port , int retry_count) { - if (m_disabled && m_disabled6) return; + if (m_disabled) return; char msg[200]; -#ifndef TORRENT_DISABLE_LOGGING - debug_log("==> LSD: ih: %s port: %u\n", aux::to_hex(ih).c_str(), listen_port); -#endif - error_code ec; if (!m_disabled) { + bool const v4 = m_listen_address.is_v4(); + char const* v4_address = "239.192.152.143"; + char const* v6_address = "[ff15::efc0:988f]"; + int const msg_len = render_lsd_packet(msg, sizeof(msg), listen_port, aux::to_hex(ih).c_str() - , m_cookie, "239.192.152.143"); - m_socket.send(msg, msg_len, ec); + , m_cookie, v4 ? v4_address : v6_address); + + udp::endpoint const to(v4 ? lsd_multicast_addr4 : lsd_multicast_addr6 + , lsd_port); + +#ifndef TORRENT_DISABLE_LOGGING + debug_log("==> LSD: ih: %s port: %u [iface: %s]", aux::to_hex(ih).c_str() + , listen_port, m_listen_address.to_string().c_str()); +#endif + + m_socket.send_to(boost::asio::buffer(msg, static_cast(msg_len)) + , to, {}, ec); if (ec) { m_disabled = true; @@ -142,28 +184,10 @@ void lsd::announce_impl(sha1_hash const& ih, int const listen_port } } - if (!m_disabled6) - { - int const msg_len = render_lsd_packet(msg, sizeof(msg), listen_port, aux::to_hex(ih).c_str() - , m_cookie, "[ff15::efc0:988f]"); - m_socket6.send(msg, msg_len, ec); - if (ec) - { - m_disabled6 = true; -#ifndef TORRENT_DISABLE_LOGGING - if (should_log()) - { - debug_log("*** LSD: failed to send message6: (%d) %s", ec.value() - , ec.message().c_str()); - } -#endif - } - } - ++retry_count; if (retry_count >= 3) return; - if (m_disabled && m_disabled6) return; + if (m_disabled) return; ADD_OUTSTANDING_ASYNC("lsd::resend_announce"); m_broadcast_timer.expires_from_now(seconds(2 * retry_count), ec); @@ -180,12 +204,43 @@ void lsd::resend_announce(error_code const& e, sha1_hash const& info_hash announce_impl(info_hash, listen_port, retry_count); } -void lsd::on_announce(udp::endpoint const& from, span buf) +void lsd::on_announce(error_code const& ec) { + COMPLETE_ASYNC("lsd::on_announce"); + if (ec) return; + + std::array buffer; + udp::endpoint from; + error_code err; + int const len = static_cast(m_socket.receive_from( + boost::asio::buffer(buffer), from, {}, err)); + + ADD_OUTSTANDING_ASYNC("lsd::on_announce"); + m_socket.async_receive(boost::asio::null_buffers{} + , std::bind(&lsd::on_announce, self(), _1)); + + if (!match_addr_mask(from.address(), m_listen_address, m_netmask)) + { + // we don't care about this network. Ignore this packet +#ifndef TORRENT_DISABLE_LOGGING + debug_log("<== LSD: receive from out of network: %s" + , from.address().to_string().c_str()); +#endif + return; + } + + if (err) + { +#ifndef TORRENT_DISABLE_LOGGING + debug_log("<== LSD: receive error: %s", err.message().c_str()); +#endif + return; + } + http_parser p; bool error = false; - p.incoming(buf, error); + p.incoming(span{buffer.data(), len}, error); if (!p.header_finished() || error) { @@ -273,12 +328,10 @@ void lsd::on_announce(udp::endpoint const& from, span buf) void lsd::close() { - m_socket.close(); - m_socket6.close(); error_code ec; + m_socket.close(ec); m_broadcast_timer.cancel(ec); m_disabled = true; - m_disabled6 = true; } } // libtorrent namespace diff --git a/src/session_impl.cpp b/src/session_impl.cpp index bedb33c92..24ae737c1 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -2004,6 +2004,8 @@ namespace aux { remap_ports(remap_natpmp_and_upnp, *s); } + update_lsd(); + #if TORRENT_USE_I2P open_new_incoming_i2p_connection(); #endif @@ -5022,6 +5024,12 @@ namespace aux { , [&device](std::string const& s) { return s == device; }); } + bool session_impl::has_lsd() const + { + return std::any_of(m_listen_sockets.begin(), m_listen_sockets.end() + , [](std::shared_ptr const& s) { return bool(s->lsd); }); + } + void session_impl::remove_torrent(const torrent_handle& h , remove_flags_t const options) { @@ -5425,8 +5433,10 @@ namespace aux { void session_impl::announce_lsd(sha1_hash const& ih, int port) { // use internal listen port for local peers - if (m_lsd) - m_lsd->announce(ih, port); + for (auto const& s : m_listen_sockets) + { + if (s->lsd) s->lsd->announce(ih, port); + } } void session_impl::on_lsd_peer(tcp::endpoint const& peer, sha1_hash const& ih) @@ -6638,13 +6648,20 @@ namespace aux { { INVARIANT_CHECK; - if (m_lsd) return; - - m_lsd = std::make_shared(m_io_service, *this); - error_code ec; - m_lsd->start(ec); - if (ec && m_alerts.should_post()) - m_alerts.emplace_alert(ec); + for (auto& s : m_listen_sockets) + { + if (s->lsd) continue; + s->lsd = std::make_shared(m_io_service, *this, s->local_endpoint.address() + , s->netmask); + error_code ec; + s->lsd->start(ec); + if (ec) + { + if (m_alerts.should_post()) + m_alerts.emplace_alert(ec); + s->lsd.reset(); + } + } } void session_impl::start_natpmp() @@ -6726,9 +6743,12 @@ namespace aux { void session_impl::stop_lsd() { - if (m_lsd) - m_lsd->close(); - m_lsd.reset(); + for (auto& s : m_listen_sockets) + { + if (!s->lsd) continue; + s->lsd->close(); + s->lsd.reset(); + } } void session_impl::stop_natpmp()