From ffb66ec156e6671ec34f0a908455afc4cbda3aa8 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 20 Oct 2014 20:44:05 +0000 Subject: [PATCH] transition tracker_connections to use shared_ptr instead of intrusive_ptr. optimize udp tracker connection lookups by using unordered map instead of linear search. --- ChangeLog | 1 + .../libtorrent/http_tracker_connection.hpp | 7 +- include/libtorrent/tracker_manager.hpp | 41 +++-- include/libtorrent/udp_tracker_connection.hpp | 14 +- src/http_tracker_connection.cpp | 8 +- src/tracker_manager.cpp | 157 ++++++++++++------ src/udp_tracker_connection.cpp | 37 +++-- 7 files changed, 174 insertions(+), 91 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8ac70d50f..54690fc3d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * optimize UDP tracker packet handling * support SSL over uTP connections * support web seeds that resolve to multiple IPs * added auto-sequential feature. download well-seeded torrents in-order diff --git a/include/libtorrent/http_tracker_connection.hpp b/include/libtorrent/http_tracker_connection.hpp index 96eba7496..6b6a0d40e 100644 --- a/include/libtorrent/http_tracker_connection.hpp +++ b/include/libtorrent/http_tracker_connection.hpp @@ -83,8 +83,11 @@ namespace libtorrent private: - boost::intrusive_ptr self() - { return boost::intrusive_ptr(this); } + boost::shared_ptr shared_from_this() + { + return boost::static_pointer_cast( + tracker_connection::shared_from_this()); + } void on_filter(http_connection& c, std::vector& endpoints); void on_connect(http_connection& c); diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index f75ab3f05..7a96f6a63 100644 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -44,10 +44,11 @@ POSSIBILITY OF SUCH DAMAGE. #endif #include +#include #include #include -#include #include +#include #ifdef _MSC_VER #pragma warning(pop) @@ -59,7 +60,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_id.hpp" #include "libtorrent/peer.hpp" // peer_entry #include "libtorrent/deadline_timer.hpp" -#include "libtorrent/intrusive_ptr_base.hpp" #include "libtorrent/size_type.hpp" #include "libtorrent/union_endpoint.hpp" #include "libtorrent/udp_socket.hpp" // for udp_socket_observer @@ -73,6 +73,8 @@ namespace libtorrent class tracker_manager; struct timeout_handler; struct tracker_connection; + class udp_tracker_connection; + class http_tracker_connection; namespace aux { struct session_impl; } // returns -1 if gzip header is invalid or the header size in bytes @@ -222,7 +224,7 @@ namespace libtorrent }; struct TORRENT_EXTRA_EXPORT timeout_handler - : intrusive_ptr_base + : boost::enable_shared_from_this , boost::noncopyable { timeout_handler(io_service& str); @@ -241,9 +243,6 @@ namespace libtorrent void timeout_callback(error_code const&); - boost::intrusive_ptr self() - { return boost::intrusive_ptr(this); } - int m_completion_timeout; typedef mutex mutex_t; @@ -264,6 +263,7 @@ namespace libtorrent bool m_abort; }; + // TODO: 2 this class probably doesn't need to have virtual functions. struct TORRENT_EXTRA_EXPORT tracker_connection : timeout_handler { @@ -272,6 +272,9 @@ namespace libtorrent , io_service& ios , boost::weak_ptr r); + void update_transaction_id(boost::shared_ptr c + , boost::uint64_t tid); + boost::shared_ptr requester() const; virtual ~tracker_connection() {} @@ -290,8 +293,11 @@ namespace libtorrent , char const* /* hostname */ , char const* /* buf */, int /* size */) { return false; } - boost::intrusive_ptr self() - { return boost::intrusive_ptr(this); } + boost::shared_ptr shared_from_this() + { + return boost::static_pointer_cast( + timeout_handler::shared_from_this()); + } private: @@ -307,7 +313,9 @@ namespace libtorrent tracker_manager& m_man; }; - class TORRENT_EXTRA_EXPORT tracker_manager: public udp_socket_observer, boost::noncopyable + class TORRENT_EXTRA_EXPORT tracker_manager + : public udp_socket_observer + , boost::noncopyable { public: @@ -339,14 +347,23 @@ namespace libtorrent virtual bool incoming_packet(error_code const& e, char const* hostname , char const* buf, int size); + void update_transaction_id( + boost::shared_ptr c + , boost::uint64_t tid); + private: typedef mutex mutex_t; mutable mutex_t m_mutex; - typedef std::list > - tracker_connections_t; - tracker_connections_t m_connections; + // maps transactionid to the udp_tracker_connection + // TODO: 2 this should be unique_ptr in the future + typedef boost::unordered_map > udp_conns_t; + udp_conns_t m_udp_conns; + + typedef std::vector > http_conns_t; + http_conns_t m_http_conns; + aux::session_impl& m_ses; bool m_abort; }; diff --git a/include/libtorrent/udp_tracker_connection.hpp b/include/libtorrent/udp_tracker_connection.hpp index ee24b5232..7e8837c59 100644 --- a/include/libtorrent/udp_tracker_connection.hpp +++ b/include/libtorrent/udp_tracker_connection.hpp @@ -77,6 +77,8 @@ namespace libtorrent void start(); void close(); + boost::uint32_t transaction_id() const { return m_transaction_id; } + private: enum action_t @@ -87,8 +89,13 @@ namespace libtorrent action_error }; - boost::intrusive_ptr self() - { return boost::intrusive_ptr(this); } + boost::shared_ptr shared_from_this() + { + return boost::static_pointer_cast( + tracker_connection::shared_from_this()); + } + + void update_transaction_id(); void name_lookup(error_code const& error , std::vector
const& addresses, int port); @@ -116,6 +123,7 @@ namespace libtorrent udp::endpoint pick_target_endpoint() const; std::string m_hostname; + // TODO: 3 this should be a vector std::list m_endpoints; aux::session_impl& m_ses; @@ -133,7 +141,7 @@ namespace libtorrent udp::endpoint m_target; - int m_transaction_id; + boost::uint32_t m_transaction_id; int m_attempts; // action_t diff --git a/src/http_tracker_connection.cpp b/src/http_tracker_connection.cpp index 4d88fe47b..4f51d3030 100644 --- a/src/http_tracker_connection.cpp +++ b/src/http_tracker_connection.cpp @@ -205,10 +205,10 @@ namespace libtorrent } m_tracker_connection.reset(new http_connection(m_ios, m_ses.m_host_resolver - , boost::bind(&http_tracker_connection::on_response, self(), _1, _2, _3, _4) + , boost::bind(&http_tracker_connection::on_response, shared_from_this(), _1, _2, _3, _4) , true, settings.get_int(settings_pack::max_http_recv_buffer_size) - , boost::bind(&http_tracker_connection::on_connect, self(), _1) - , boost::bind(&http_tracker_connection::on_filter, self(), _1, _2) + , boost::bind(&http_tracker_connection::on_connect, shared_from_this(), _1) + , boost::bind(&http_tracker_connection::on_filter, shared_from_this(), _1, _2) #ifdef TORRENT_USE_OPENSSL , tracker_req().ssl_ctx #endif @@ -296,7 +296,7 @@ namespace libtorrent , http_parser const& parser, char const* data, int size) { // keep this alive - boost::intrusive_ptr me(this); + boost::shared_ptr me(shared_from_this()); if (ec && ec != asio::error::eof) { diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 4aa5dd0b9..78913f902 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include #include "libtorrent/tracker_manager.hpp" #include "libtorrent/http_tracker_connection.hpp" @@ -89,7 +90,7 @@ namespace libtorrent error_code ec; m_timeout.expires_at(m_read_time + seconds(timeout), ec); m_timeout.async_wait(boost::bind( - &timeout_handler::timeout_callback, self(), _1)); + &timeout_handler::timeout_callback, shared_from_this(), _1)); } void timeout_handler::restart_read_timeout() @@ -140,7 +141,7 @@ namespace libtorrent error_code ec; m_timeout.expires_at(m_read_time + seconds(timeout), ec); m_timeout.async_wait( - boost::bind(&timeout_handler::timeout_callback, self(), _1)); + boost::bind(&timeout_handler::timeout_callback, shared_from_this(), _1)); } tracker_connection::tracker_connection( @@ -164,7 +165,7 @@ namespace libtorrent { // we need to post the error to avoid deadlock get_io_service().post(boost::bind(&tracker_connection::fail_impl - , self(), ec, code, std::string(msg), interval, min_interval)); + , shared_from_this(), ec, code, std::string(msg), interval, min_interval)); } void tracker_connection::fail_impl(error_code const& ec, int code @@ -176,6 +177,7 @@ namespace libtorrent close(); } + // TODO: 3 replace this with performance counters. remove depedency on session void tracker_connection::sent_bytes(int bytes) { m_man.sent_bytes(bytes); @@ -214,11 +216,32 @@ namespace libtorrent { mutex_t::scoped_lock l(m_mutex); - tracker_connections_t::iterator i = std::find(m_connections.begin() - , m_connections.end(), boost::intrusive_ptr(c)); - if (i == m_connections.end()) return; + http_conns_t::iterator i = std::find_if(m_http_conns.begin() + , m_http_conns.end() + , boost::bind(&boost::shared_ptr::get, _1) == c); + if (i != m_http_conns.end()) + { + m_http_conns.erase(i); + return; + } - m_connections.erase(i); + udp_conns_t::iterator j = std::find_if(m_udp_conns.begin() + , m_udp_conns.end() + , boost::bind(&boost::shared_ptr::get + , boost::bind(&udp_conns_t::value_type::second, _1)) == c); + if (j != m_udp_conns.end()) + { + m_udp_conns.erase(j); + return; + } + } + + void tracker_manager::update_transaction_id( + boost::shared_ptr c + , boost::uint64_t tid) + { + m_udp_conns.erase(c->transaction_id()); + m_udp_conns[tid] = c; } void tracker_manager::queue_request( @@ -240,91 +263,108 @@ namespace libtorrent std::string protocol = req.url.substr(0, req.url.find(':')); - boost::intrusive_ptr con; - #ifdef TORRENT_USE_OPENSSL if (protocol == "http" || protocol == "https") #else if (protocol == "http") #endif { - con = new http_tracker_connection( + boost::shared_ptr con + = boost::make_shared( ios, *this, req, c , m_ses, auth #if TORRENT_USE_I2P , &m_ses.m_i2p_conn #endif ); + m_http_conns.push_back(con); + con->start(); + return; } else if (protocol == "udp") { - con = new udp_tracker_connection( - ios, *this, req , c, m_ses, m_ses.proxy()); - } - else - { - // we need to post the error to avoid deadlock - if (boost::shared_ptr r = c.lock()) - ios.post(boost::bind(&request_callback::tracker_request_error, r, req - , -1, error_code(errors::unsupported_url_protocol) - , "", 0)); + boost::shared_ptr con + = boost::make_shared( + ios, *this, req , c, m_ses, m_ses.proxy()); + m_udp_conns[con->transaction_id()] = con; + con->start(); return; } - m_connections.push_back(con); - - boost::shared_ptr cb = con->requester(); - con->start(); + // we need to post the error to avoid deadlock + if (boost::shared_ptr r = c.lock()) + ios.post(boost::bind(&request_callback::tracker_request_error, r, req + , -1, error_code(errors::unsupported_url_protocol) + , "", 0)); } bool tracker_manager::incoming_packet(error_code const& e , udp::endpoint const& ep, char const* buf, int size) { - // m_ses.m_stat.received_tracker_bytes(len + 28); - for (tracker_connections_t::iterator i = m_connections.begin(); - i != m_connections.end();) - { - boost::intrusive_ptr p = *i; - ++i; - // on_receive() may remove the tracker connection from the list - if (p->on_receive(e, ep, buf, size)) return true; - } - return false; + // ignore packets smaller than 8 bytes + if (size < 8) return false; + + const char* ptr = buf + 4; + boost::uint32_t transaction = detail::read_uint32(ptr); + udp_conns_t::iterator i = m_udp_conns.find(transaction); + + if (i == m_udp_conns.end()) return false; + + boost::shared_ptr p = i->second; + // on_receive() may remove the tracker connection from the list + return p->on_receive(e, ep, buf, size); } bool tracker_manager::incoming_packet(error_code const& e , char const* hostname, char const* buf, int size) { - // m_ses.m_stat.received_tracker_bytes(len + 28); - for (tracker_connections_t::iterator i = m_connections.begin(); - i != m_connections.end();) - { - boost::intrusive_ptr p = *i; - ++i; - // on_receive() may remove the tracker connection from the list - if (p->on_receive_hostname(e, hostname, buf, size)) return true; - } - return false; + // ignore packets smaller than 8 bytes + if (size < 8) return false; + + const char* ptr = buf + 4; + boost::uint32_t transaction = detail::read_uint32(ptr); + udp_conns_t::iterator i = m_udp_conns.find(transaction); + + if (i == m_udp_conns.end()) return false; + + boost::shared_ptr p = i->second; + // on_receive() may remove the tracker connection from the list + return p->on_receive_hostname(e, hostname, buf, size); } void tracker_manager::abort_all_requests(bool all) { - // removes all connections from m_connections - // except 'event=stopped'-requests + // removes all connections except 'event=stopped'-requests mutex_t::scoped_lock l(m_mutex); m_abort = true; - tracker_connections_t close_connections; + http_conns_t close_http_connections; + std::vector > close_udp_connections; - for (tracker_connections_t::iterator i = m_connections.begin() - , end(m_connections.end()); i != end; ++i) + for (http_conns_t::iterator i = m_http_conns.begin() + , end(m_http_conns.end()); i != end; ++i) { - boost::intrusive_ptr c = *i; + http_tracker_connection* c = i->get(); tracker_request const& req = c->tracker_req(); if (req.event == tracker_request::stopped && !all) continue; - close_connections.push_back(c); + close_http_connections.push_back(*i); + +#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING + boost::shared_ptr rc = c->requester(); + if (rc) rc->debug_log("aborting: %s", req.url.c_str()); +#endif + } + for (udp_conns_t::iterator i = m_udp_conns.begin() + , end(m_udp_conns.end()); i != end; ++i) + { + boost::shared_ptr c = i->second; + tracker_request const& req = c->tracker_req(); + if (req.event == tracker_request::stopped && !all) + continue; + + close_udp_connections.push_back(c); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING boost::shared_ptr rc = c->requester(); @@ -333,8 +373,15 @@ namespace libtorrent } l.unlock(); - for (tracker_connections_t::iterator i = close_connections.begin() - , end(close_connections.end()); i != end; ++i) + for (http_conns_t::iterator i = close_http_connections.begin() + , end(close_http_connections.end()); i != end; ++i) + { + (*i)->close(); + } + + for (std::vector >::iterator i + = close_udp_connections.begin() + , end(close_udp_connections.end()); i != end; ++i) { (*i)->close(); } @@ -343,12 +390,12 @@ namespace libtorrent bool tracker_manager::empty() const { mutex_t::scoped_lock l(m_mutex); - return m_connections.empty(); + return m_http_conns.empty() && m_udp_conns.empty(); } int tracker_manager::num_requests() const { mutex_t::scoped_lock l(m_mutex); - return m_connections.size(); + return m_http_conns.size() + m_udp_conns.size(); } } diff --git a/src/udp_tracker_connection.cpp b/src/udp_tracker_connection.cpp index e72449335..c752f5972 100644 --- a/src/udp_tracker_connection.cpp +++ b/src/udp_tracker_connection.cpp @@ -77,6 +77,7 @@ namespace libtorrent , m_state(action_error) , m_abort(false) { + update_transaction_id(); } void udp_tracker_connection::start() @@ -120,7 +121,7 @@ namespace libtorrent ? resolver_interface::prefer_cache : 0 , boost::bind(&udp_tracker_connection::name_lookup - , self(), _1, _2, port)); + , shared_from_this(), _1, _2, port)); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING boost::shared_ptr cb = requester(); @@ -165,7 +166,7 @@ namespace libtorrent , m_hostname.c_str(), print_endpoint(m_target).c_str()); #endif m_ses.m_io_service.post(boost::bind( - &udp_tracker_connection::start_announce, self())); + &udp_tracker_connection::start_announce, shared_from_this())); } void udp_tracker_connection::name_lookup(error_code const& error @@ -221,7 +222,7 @@ namespace libtorrent } } - // if all endpoints were filtered by the IP filter, we can't connect + // ir all endpoints were filtered by the IP filter, we can't connect if (m_endpoints.empty()) { fail(error_code(errors::banned_by_ip_filter)); @@ -357,7 +358,7 @@ namespace libtorrent const char* ptr = buf; int action = detail::read_int32(ptr); - int transaction = detail::read_int32(ptr); + boost::uint32_t transaction = detail::read_uint32(ptr); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING if (cb) @@ -400,6 +401,20 @@ namespace libtorrent } return false; } + + void udp_tracker_connection::update_transaction_id() + { + boost::uint32_t new_tid; + + // don't use 0, because that has special meaning (unintialized) + do { + new_tid = random(); + } while (new_tid == 0); + + if (m_transaction_id != 0) + m_man.update_transaction_id(shared_from_this(), new_tid); + m_transaction_id = new_tid; + } bool udp_tracker_connection::on_connect_response(char const* buf, int size) { @@ -410,8 +425,7 @@ namespace libtorrent buf += 8; // skip header // reset transaction - m_transaction_id = 0; - m_attempts = 0; + update_transaction_id(); boost::uint64_t connection_id = detail::read_int64(buf); mutex::scoped_lock l(m_cache_mutex); @@ -442,8 +456,7 @@ namespace libtorrent char buf[16]; char* ptr = buf; - if (m_transaction_id == 0) - m_transaction_id = random() ^ (random() << 16); + TORRENT_ASSERT(m_transaction_id != 0); detail::write_uint32(0x417, ptr); detail::write_uint32(0x27101980, ptr); // connection_id @@ -472,9 +485,6 @@ namespace libtorrent void udp_tracker_connection::send_udp_scrape() { - if (m_transaction_id == 0) - m_transaction_id = random() ^ (random() << 16); - if (m_abort) return; std::map::iterator i @@ -579,7 +589,7 @@ namespace libtorrent { restart_read_timeout(); int action = detail::read_int32(buf); - int transaction = detail::read_int32(buf); + boost::uint32_t transaction = detail::read_uint32(buf); if (transaction != m_transaction_id) { @@ -625,9 +635,6 @@ namespace libtorrent void udp_tracker_connection::send_udp_announce() { - if (m_transaction_id == 0) - m_transaction_id = random() ^ (random() << 16); - if (m_abort) return; char buf[800];