simplify the local service discovery logic to only deal with a single network. Instantiate LSD once per listen_socket_t instead of just once

This commit is contained in:
arvidn 2020-01-15 08:04:51 +01:00 committed by Arvid Norberg
parent 819eea722b
commit 2c8cf4834b
4 changed files with 135 additions and 62 deletions

View File

@ -237,6 +237,8 @@ namespace aux {
std::shared_ptr<natpmp> natpmp_mapper;
std::shared_ptr<upnp> upnp_mapper;
std::shared_ptr<struct lsd> lsd;
// set to true when we receive an incoming connection from this listen
// socket
bool incoming_connection = false;
@ -751,7 +753,7 @@ namespace aux {
bool verify_bound_address(address const& addr, bool utp
, error_code& ec) override;
bool has_lsd() const override { return m_lsd.get() != nullptr; }
bool has_lsd() const override;
std::vector<block_info>& block_info_storage() override { return m_block_info_storage; }
@ -1195,8 +1197,6 @@ namespace aux {
// this is deducted from the connect speed
int m_boost_connections = 0;
std::shared_ptr<lsd> m_lsd;
#if TORRENT_ABI_VERSION == 1
struct work_thread_t
{

View File

@ -43,7 +43,8 @@ namespace libtorrent {
struct lsd : std::enable_shared_from_this<lsd>
{
lsd(io_service& ios, aux::lsd_callback& cb);
lsd(io_service& ios, aux::lsd_callback& cb
, address const& listen_address, address const& netmask);
~lsd();
void start(error_code& ec);
@ -55,18 +56,18 @@ private:
std::shared_ptr<lsd> self() { return shared_from_this(); }
void announce_impl(sha1_hash const& ih, int listen_port
, int retry_count);
void announce_impl(sha1_hash const& ih, int listen_port, int retry_count);
void resend_announce(error_code const& e, sha1_hash const& ih
, int listen_port, int retry_count);
void on_announce(udp::endpoint const& from, span<char const> buffer);
void on_announce(error_code const& ec);
aux::lsd_callback& m_callback;
// the udp socket used to send and receive
// multicast messages on
broadcast_socket m_socket;
broadcast_socket m_socket6;
address m_listen_address;
address m_netmask;
udp::socket m_socket;
#ifndef TORRENT_DISABLE_LOGGING
bool should_log() const;
void debug_log(char const* fmt, ...) const TORRENT_FORMAT(2, 3);
@ -84,7 +85,6 @@ private:
int m_cookie;
bool m_disabled = false;
bool m_disabled6 = false;
};
}

View File

@ -43,6 +43,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/debug.hpp"
#include "libtorrent/hex.hpp" // to_hex, from_hex
#include "libtorrent/aux_/numeric_cast.hpp"
#include "libtorrent/enum_net.hpp"
#include "libtorrent/aux_/disable_warnings_push.hpp"
#include <boost/asio/ip/multicast.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
using namespace std::placeholders;
@ -66,10 +71,12 @@ int render_lsd_packet(char* dst, int const len, int const listen_port
static error_code dummy;
lsd::lsd(io_service& ios, aux::lsd_callback& cb)
lsd::lsd(io_service& ios, aux::lsd_callback& cb
, address const& listen_address, address const& netmask)
: m_callback(cb)
, m_socket(udp::endpoint(make_address_v4("239.192.152.143", dummy), 6771))
, m_socket6(udp::endpoint(make_address_v6("ff15::efc0:988f", dummy), 6771))
, m_listen_address(listen_address)
, m_netmask(netmask)
, m_socket(ios)
, m_broadcast_timer(ios)
, m_cookie((random(0x7fffffff) ^ std::uintptr_t(this)) & 0x7fffffff)
{
@ -95,14 +102,39 @@ void lsd::debug_log(char const* fmt, ...) const
}
#endif
namespace {
address const lsd_multicast_addr4 = make_address_v4("239.192.152.143");
address const lsd_multicast_addr6 = make_address_v6("ff15::efc0:988f");
int const lsd_port = 6771;
}
void lsd::start(error_code& ec)
{
m_socket.open(std::bind(&lsd::on_announce, self(), _1, _2)
, lt::get_io_service(m_broadcast_timer), ec);
using namespace boost::asio::ip::multicast;
bool const v4 = m_listen_address.is_v4();
m_socket.open(v4 ? udp::v4() : udp::v6(), ec);
if (ec) return;
m_socket6.open(std::bind(&lsd::on_announce, self(), _1, _2)
, lt::get_io_service(m_broadcast_timer), ec);
m_socket.set_option(udp::socket::reuse_address(true), ec);
if (ec) return;
m_socket.bind(udp::endpoint(address_v4::any(), lsd_port), ec);
if (ec) return;
m_socket.set_option(join_group(v4 ? lsd_multicast_addr4 : lsd_multicast_addr6), ec);
if (ec) return;
m_socket.set_option(hops(32), ec);
if (ec) return;
m_socket.set_option(enable_loopback(true), ec);
if (ec) return;
if (v4)
{
m_socket.set_option(outbound_interface(m_listen_address.to_v4()), ec);
if (ec) return;
}
ADD_OUTSTANDING_ASYNC("lsd::on_announce");
m_socket.async_receive(boost::asio::null_buffers{}
, std::bind(&lsd::on_announce, self(), _1));
}
lsd::~lsd() = default;
@ -115,20 +147,30 @@ void lsd::announce(sha1_hash const& ih, int listen_port)
void lsd::announce_impl(sha1_hash const& ih, int const listen_port
, int retry_count)
{
if (m_disabled && m_disabled6) return;
if (m_disabled) return;
char msg[200];
#ifndef TORRENT_DISABLE_LOGGING
debug_log("==> LSD: ih: %s port: %u\n", aux::to_hex(ih).c_str(), listen_port);
#endif
error_code ec;
if (!m_disabled)
{
bool const v4 = m_listen_address.is_v4();
char const* v4_address = "239.192.152.143";
char const* v6_address = "[ff15::efc0:988f]";
int const msg_len = render_lsd_packet(msg, sizeof(msg), listen_port, aux::to_hex(ih).c_str()
, m_cookie, "239.192.152.143");
m_socket.send(msg, msg_len, ec);
, m_cookie, v4 ? v4_address : v6_address);
udp::endpoint const to(v4 ? lsd_multicast_addr4 : lsd_multicast_addr6
, lsd_port);
#ifndef TORRENT_DISABLE_LOGGING
debug_log("==> LSD: ih: %s port: %u [iface: %s]", aux::to_hex(ih).c_str()
, listen_port, m_listen_address.to_string().c_str());
#endif
m_socket.send_to(boost::asio::buffer(msg, static_cast<std::size_t>(msg_len))
, to, {}, ec);
if (ec)
{
m_disabled = true;
@ -142,28 +184,10 @@ void lsd::announce_impl(sha1_hash const& ih, int const listen_port
}
}
if (!m_disabled6)
{
int const msg_len = render_lsd_packet(msg, sizeof(msg), listen_port, aux::to_hex(ih).c_str()
, m_cookie, "[ff15::efc0:988f]");
m_socket6.send(msg, msg_len, ec);
if (ec)
{
m_disabled6 = true;
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
debug_log("*** LSD: failed to send message6: (%d) %s", ec.value()
, ec.message().c_str());
}
#endif
}
}
++retry_count;
if (retry_count >= 3) return;
if (m_disabled && m_disabled6) return;
if (m_disabled) return;
ADD_OUTSTANDING_ASYNC("lsd::resend_announce");
m_broadcast_timer.expires_from_now(seconds(2 * retry_count), ec);
@ -180,12 +204,43 @@ void lsd::resend_announce(error_code const& e, sha1_hash const& info_hash
announce_impl(info_hash, listen_port, retry_count);
}
void lsd::on_announce(udp::endpoint const& from, span<char const> buf)
void lsd::on_announce(error_code const& ec)
{
COMPLETE_ASYNC("lsd::on_announce");
if (ec) return;
std::array<char, 1500> buffer;
udp::endpoint from;
error_code err;
int const len = static_cast<int>(m_socket.receive_from(
boost::asio::buffer(buffer), from, {}, err));
ADD_OUTSTANDING_ASYNC("lsd::on_announce");
m_socket.async_receive(boost::asio::null_buffers{}
, std::bind(&lsd::on_announce, self(), _1));
if (!match_addr_mask(from.address(), m_listen_address, m_netmask))
{
// we don't care about this network. Ignore this packet
#ifndef TORRENT_DISABLE_LOGGING
debug_log("<== LSD: receive from out of network: %s"
, from.address().to_string().c_str());
#endif
return;
}
if (err)
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("<== LSD: receive error: %s", err.message().c_str());
#endif
return;
}
http_parser p;
bool error = false;
p.incoming(buf, error);
p.incoming(span<char const>{buffer.data(), len}, error);
if (!p.header_finished() || error)
{
@ -273,12 +328,10 @@ void lsd::on_announce(udp::endpoint const& from, span<char const> buf)
void lsd::close()
{
m_socket.close();
m_socket6.close();
error_code ec;
m_socket.close(ec);
m_broadcast_timer.cancel(ec);
m_disabled = true;
m_disabled6 = true;
}
} // libtorrent namespace

View File

@ -2004,6 +2004,8 @@ namespace aux {
remap_ports(remap_natpmp_and_upnp, *s);
}
update_lsd();
#if TORRENT_USE_I2P
open_new_incoming_i2p_connection();
#endif
@ -5022,6 +5024,12 @@ namespace aux {
, [&device](std::string const& s) { return s == device; });
}
bool session_impl::has_lsd() const
{
return std::any_of(m_listen_sockets.begin(), m_listen_sockets.end()
, [](std::shared_ptr<listen_socket_t> const& s) { return bool(s->lsd); });
}
void session_impl::remove_torrent(const torrent_handle& h
, remove_flags_t const options)
{
@ -5425,8 +5433,10 @@ namespace aux {
void session_impl::announce_lsd(sha1_hash const& ih, int port)
{
// use internal listen port for local peers
if (m_lsd)
m_lsd->announce(ih, port);
for (auto const& s : m_listen_sockets)
{
if (s->lsd) s->lsd->announce(ih, port);
}
}
void session_impl::on_lsd_peer(tcp::endpoint const& peer, sha1_hash const& ih)
@ -6638,13 +6648,20 @@ namespace aux {
{
INVARIANT_CHECK;
if (m_lsd) return;
m_lsd = std::make_shared<lsd>(m_io_service, *this);
error_code ec;
m_lsd->start(ec);
if (ec && m_alerts.should_post<lsd_error_alert>())
m_alerts.emplace_alert<lsd_error_alert>(ec);
for (auto& s : m_listen_sockets)
{
if (s->lsd) continue;
s->lsd = std::make_shared<lsd>(m_io_service, *this, s->local_endpoint.address()
, s->netmask);
error_code ec;
s->lsd->start(ec);
if (ec)
{
if (m_alerts.should_post<lsd_error_alert>())
m_alerts.emplace_alert<lsd_error_alert>(ec);
s->lsd.reset();
}
}
}
void session_impl::start_natpmp()
@ -6726,9 +6743,12 @@ namespace aux {
void session_impl::stop_lsd()
{
if (m_lsd)
m_lsd->close();
m_lsd.reset();
for (auto& s : m_listen_sockets)
{
if (!s->lsd) continue;
s->lsd->close();
s->lsd.reset();
}
}
void session_impl::stop_natpmp()