transition tracker_connections to use shared_ptr instead of intrusive_ptr. optimize udp tracker connection lookups by using unordered map instead of linear search.

This commit is contained in:
Arvid Norberg 2014-10-20 20:44:05 +00:00
parent 22f054e2ff
commit ffb66ec156
7 changed files with 174 additions and 91 deletions

View File

@ -1,3 +1,4 @@
* optimize UDP tracker packet handling
* support SSL over uTP connections
* support web seeds that resolve to multiple IPs
* added auto-sequential feature. download well-seeded torrents in-order

View File

@ -83,8 +83,11 @@ namespace libtorrent
private:
boost::intrusive_ptr<http_tracker_connection> self()
{ return boost::intrusive_ptr<http_tracker_connection>(this); }
boost::shared_ptr<http_tracker_connection> shared_from_this()
{
return boost::static_pointer_cast<http_tracker_connection>(
tracker_connection::shared_from_this());
}
void on_filter(http_connection& c, std::vector<tcp::endpoint>& endpoints);
void on_connect(http_connection& c);

View File

@ -44,10 +44,11 @@ POSSIBILITY OF SUCH DAMAGE.
#endif
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cstdint.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/unordered_map.hpp>
#ifdef _MSC_VER
#pragma warning(pop)
@ -59,7 +60,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_id.hpp"
#include "libtorrent/peer.hpp" // peer_entry
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/size_type.hpp"
#include "libtorrent/union_endpoint.hpp"
#include "libtorrent/udp_socket.hpp" // for udp_socket_observer
@ -73,6 +73,8 @@ namespace libtorrent
class tracker_manager;
struct timeout_handler;
struct tracker_connection;
class udp_tracker_connection;
class http_tracker_connection;
namespace aux { struct session_impl; }
// returns -1 if gzip header is invalid or the header size in bytes
@ -222,7 +224,7 @@ namespace libtorrent
};
struct TORRENT_EXTRA_EXPORT timeout_handler
: intrusive_ptr_base<timeout_handler>
: boost::enable_shared_from_this<timeout_handler>
, boost::noncopyable
{
timeout_handler(io_service& str);
@ -241,9 +243,6 @@ namespace libtorrent
void timeout_callback(error_code const&);
boost::intrusive_ptr<timeout_handler> self()
{ return boost::intrusive_ptr<timeout_handler>(this); }
int m_completion_timeout;
typedef mutex mutex_t;
@ -264,6 +263,7 @@ namespace libtorrent
bool m_abort;
};
// TODO: 2 this class probably doesn't need to have virtual functions.
struct TORRENT_EXTRA_EXPORT tracker_connection
: timeout_handler
{
@ -272,6 +272,9 @@ namespace libtorrent
, io_service& ios
, boost::weak_ptr<request_callback> r);
void update_transaction_id(boost::shared_ptr<udp_tracker_connection> c
, boost::uint64_t tid);
boost::shared_ptr<request_callback> requester() const;
virtual ~tracker_connection() {}
@ -290,8 +293,11 @@ namespace libtorrent
, char const* /* hostname */
, char const* /* buf */, int /* size */) { return false; }
boost::intrusive_ptr<tracker_connection> self()
{ return boost::intrusive_ptr<tracker_connection>(this); }
boost::shared_ptr<tracker_connection> shared_from_this()
{
return boost::static_pointer_cast<tracker_connection>(
timeout_handler::shared_from_this());
}
private:
@ -307,7 +313,9 @@ namespace libtorrent
tracker_manager& m_man;
};
class TORRENT_EXTRA_EXPORT tracker_manager: public udp_socket_observer, boost::noncopyable
class TORRENT_EXTRA_EXPORT tracker_manager
: public udp_socket_observer
, boost::noncopyable
{
public:
@ -339,14 +347,23 @@ namespace libtorrent
virtual bool incoming_packet(error_code const& e, char const* hostname
, char const* buf, int size);
void update_transaction_id(
boost::shared_ptr<udp_tracker_connection> c
, boost::uint64_t tid);
private:
typedef mutex mutex_t;
mutable mutex_t m_mutex;
typedef std::list<boost::intrusive_ptr<tracker_connection> >
tracker_connections_t;
tracker_connections_t m_connections;
// maps transactionid to the udp_tracker_connection
// TODO: 2 this should be unique_ptr in the future
typedef boost::unordered_map<boost::uint32_t, boost::shared_ptr<udp_tracker_connection> > udp_conns_t;
udp_conns_t m_udp_conns;
typedef std::vector<boost::shared_ptr<http_tracker_connection> > http_conns_t;
http_conns_t m_http_conns;
aux::session_impl& m_ses;
bool m_abort;
};

View File

@ -77,6 +77,8 @@ namespace libtorrent
void start();
void close();
boost::uint32_t transaction_id() const { return m_transaction_id; }
private:
enum action_t
@ -87,8 +89,13 @@ namespace libtorrent
action_error
};
boost::intrusive_ptr<udp_tracker_connection> self()
{ return boost::intrusive_ptr<udp_tracker_connection>(this); }
boost::shared_ptr<udp_tracker_connection> shared_from_this()
{
return boost::static_pointer_cast<udp_tracker_connection>(
tracker_connection::shared_from_this());
}
void update_transaction_id();
void name_lookup(error_code const& error
, std::vector<address> const& addresses, int port);
@ -116,6 +123,7 @@ namespace libtorrent
udp::endpoint pick_target_endpoint() const;
std::string m_hostname;
// TODO: 3 this should be a vector
std::list<tcp::endpoint> m_endpoints;
aux::session_impl& m_ses;
@ -133,7 +141,7 @@ namespace libtorrent
udp::endpoint m_target;
int m_transaction_id;
boost::uint32_t m_transaction_id;
int m_attempts;
// action_t

View File

@ -205,10 +205,10 @@ namespace libtorrent
}
m_tracker_connection.reset(new http_connection(m_ios, m_ses.m_host_resolver
, boost::bind(&http_tracker_connection::on_response, self(), _1, _2, _3, _4)
, boost::bind(&http_tracker_connection::on_response, shared_from_this(), _1, _2, _3, _4)
, true, settings.get_int(settings_pack::max_http_recv_buffer_size)
, boost::bind(&http_tracker_connection::on_connect, self(), _1)
, boost::bind(&http_tracker_connection::on_filter, self(), _1, _2)
, boost::bind(&http_tracker_connection::on_connect, shared_from_this(), _1)
, boost::bind(&http_tracker_connection::on_filter, shared_from_this(), _1, _2)
#ifdef TORRENT_USE_OPENSSL
, tracker_req().ssl_ctx
#endif
@ -296,7 +296,7 @@ namespace libtorrent
, http_parser const& parser, char const* data, int size)
{
// keep this alive
boost::intrusive_ptr<http_tracker_connection> me(this);
boost::shared_ptr<http_tracker_connection> me(shared_from_this());
if (ec && ec != asio::error::eof)
{

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <cctype>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include "libtorrent/tracker_manager.hpp"
#include "libtorrent/http_tracker_connection.hpp"
@ -89,7 +90,7 @@ namespace libtorrent
error_code ec;
m_timeout.expires_at(m_read_time + seconds(timeout), ec);
m_timeout.async_wait(boost::bind(
&timeout_handler::timeout_callback, self(), _1));
&timeout_handler::timeout_callback, shared_from_this(), _1));
}
void timeout_handler::restart_read_timeout()
@ -140,7 +141,7 @@ namespace libtorrent
error_code ec;
m_timeout.expires_at(m_read_time + seconds(timeout), ec);
m_timeout.async_wait(
boost::bind(&timeout_handler::timeout_callback, self(), _1));
boost::bind(&timeout_handler::timeout_callback, shared_from_this(), _1));
}
tracker_connection::tracker_connection(
@ -164,7 +165,7 @@ namespace libtorrent
{
// we need to post the error to avoid deadlock
get_io_service().post(boost::bind(&tracker_connection::fail_impl
, self(), ec, code, std::string(msg), interval, min_interval));
, shared_from_this(), ec, code, std::string(msg), interval, min_interval));
}
void tracker_connection::fail_impl(error_code const& ec, int code
@ -176,6 +177,7 @@ namespace libtorrent
close();
}
// TODO: 3 replace this with performance counters. remove depedency on session
void tracker_connection::sent_bytes(int bytes)
{
m_man.sent_bytes(bytes);
@ -214,11 +216,32 @@ namespace libtorrent
{
mutex_t::scoped_lock l(m_mutex);
tracker_connections_t::iterator i = std::find(m_connections.begin()
, m_connections.end(), boost::intrusive_ptr<const tracker_connection>(c));
if (i == m_connections.end()) return;
http_conns_t::iterator i = std::find_if(m_http_conns.begin()
, m_http_conns.end()
, boost::bind(&boost::shared_ptr<http_tracker_connection>::get, _1) == c);
if (i != m_http_conns.end())
{
m_http_conns.erase(i);
return;
}
m_connections.erase(i);
udp_conns_t::iterator j = std::find_if(m_udp_conns.begin()
, m_udp_conns.end()
, boost::bind(&boost::shared_ptr<udp_tracker_connection>::get
, boost::bind(&udp_conns_t::value_type::second, _1)) == c);
if (j != m_udp_conns.end())
{
m_udp_conns.erase(j);
return;
}
}
void tracker_manager::update_transaction_id(
boost::shared_ptr<udp_tracker_connection> c
, boost::uint64_t tid)
{
m_udp_conns.erase(c->transaction_id());
m_udp_conns[tid] = c;
}
void tracker_manager::queue_request(
@ -240,91 +263,108 @@ namespace libtorrent
std::string protocol = req.url.substr(0, req.url.find(':'));
boost::intrusive_ptr<tracker_connection> con;
#ifdef TORRENT_USE_OPENSSL
if (protocol == "http" || protocol == "https")
#else
if (protocol == "http")
#endif
{
con = new http_tracker_connection(
boost::shared_ptr<http_tracker_connection> con
= boost::make_shared<http_tracker_connection>(
ios, *this, req, c
, m_ses, auth
#if TORRENT_USE_I2P
, &m_ses.m_i2p_conn
#endif
);
m_http_conns.push_back(con);
con->start();
return;
}
else if (protocol == "udp")
{
con = new udp_tracker_connection(
ios, *this, req , c, m_ses, m_ses.proxy());
}
else
{
// we need to post the error to avoid deadlock
if (boost::shared_ptr<request_callback> r = c.lock())
ios.post(boost::bind(&request_callback::tracker_request_error, r, req
, -1, error_code(errors::unsupported_url_protocol)
, "", 0));
boost::shared_ptr<udp_tracker_connection> con
= boost::make_shared<udp_tracker_connection>(
ios, *this, req , c, m_ses, m_ses.proxy());
m_udp_conns[con->transaction_id()] = con;
con->start();
return;
}
m_connections.push_back(con);
boost::shared_ptr<request_callback> cb = con->requester();
con->start();
// we need to post the error to avoid deadlock
if (boost::shared_ptr<request_callback> r = c.lock())
ios.post(boost::bind(&request_callback::tracker_request_error, r, req
, -1, error_code(errors::unsupported_url_protocol)
, "", 0));
}
bool tracker_manager::incoming_packet(error_code const& e
, udp::endpoint const& ep, char const* buf, int size)
{
// m_ses.m_stat.received_tracker_bytes(len + 28);
for (tracker_connections_t::iterator i = m_connections.begin();
i != m_connections.end();)
{
boost::intrusive_ptr<tracker_connection> p = *i;
++i;
// on_receive() may remove the tracker connection from the list
if (p->on_receive(e, ep, buf, size)) return true;
}
return false;
// ignore packets smaller than 8 bytes
if (size < 8) return false;
const char* ptr = buf + 4;
boost::uint32_t transaction = detail::read_uint32(ptr);
udp_conns_t::iterator i = m_udp_conns.find(transaction);
if (i == m_udp_conns.end()) return false;
boost::shared_ptr<tracker_connection> p = i->second;
// on_receive() may remove the tracker connection from the list
return p->on_receive(e, ep, buf, size);
}
bool tracker_manager::incoming_packet(error_code const& e
, char const* hostname, char const* buf, int size)
{
// m_ses.m_stat.received_tracker_bytes(len + 28);
for (tracker_connections_t::iterator i = m_connections.begin();
i != m_connections.end();)
{
boost::intrusive_ptr<tracker_connection> p = *i;
++i;
// on_receive() may remove the tracker connection from the list
if (p->on_receive_hostname(e, hostname, buf, size)) return true;
}
return false;
// ignore packets smaller than 8 bytes
if (size < 8) return false;
const char* ptr = buf + 4;
boost::uint32_t transaction = detail::read_uint32(ptr);
udp_conns_t::iterator i = m_udp_conns.find(transaction);
if (i == m_udp_conns.end()) return false;
boost::shared_ptr<tracker_connection> p = i->second;
// on_receive() may remove the tracker connection from the list
return p->on_receive_hostname(e, hostname, buf, size);
}
void tracker_manager::abort_all_requests(bool all)
{
// removes all connections from m_connections
// except 'event=stopped'-requests
// removes all connections except 'event=stopped'-requests
mutex_t::scoped_lock l(m_mutex);
m_abort = true;
tracker_connections_t close_connections;
http_conns_t close_http_connections;
std::vector<boost::shared_ptr<udp_tracker_connection> > close_udp_connections;
for (tracker_connections_t::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
for (http_conns_t::iterator i = m_http_conns.begin()
, end(m_http_conns.end()); i != end; ++i)
{
boost::intrusive_ptr<tracker_connection> c = *i;
http_tracker_connection* c = i->get();
tracker_request const& req = c->tracker_req();
if (req.event == tracker_request::stopped && !all)
continue;
close_connections.push_back(c);
close_http_connections.push_back(*i);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
boost::shared_ptr<request_callback> rc = c->requester();
if (rc) rc->debug_log("aborting: %s", req.url.c_str());
#endif
}
for (udp_conns_t::iterator i = m_udp_conns.begin()
, end(m_udp_conns.end()); i != end; ++i)
{
boost::shared_ptr<udp_tracker_connection> c = i->second;
tracker_request const& req = c->tracker_req();
if (req.event == tracker_request::stopped && !all)
continue;
close_udp_connections.push_back(c);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
boost::shared_ptr<request_callback> rc = c->requester();
@ -333,8 +373,15 @@ namespace libtorrent
}
l.unlock();
for (tracker_connections_t::iterator i = close_connections.begin()
, end(close_connections.end()); i != end; ++i)
for (http_conns_t::iterator i = close_http_connections.begin()
, end(close_http_connections.end()); i != end; ++i)
{
(*i)->close();
}
for (std::vector<boost::shared_ptr<udp_tracker_connection> >::iterator i
= close_udp_connections.begin()
, end(close_udp_connections.end()); i != end; ++i)
{
(*i)->close();
}
@ -343,12 +390,12 @@ namespace libtorrent
bool tracker_manager::empty() const
{
mutex_t::scoped_lock l(m_mutex);
return m_connections.empty();
return m_http_conns.empty() && m_udp_conns.empty();
}
int tracker_manager::num_requests() const
{
mutex_t::scoped_lock l(m_mutex);
return m_connections.size();
return m_http_conns.size() + m_udp_conns.size();
}
}

View File

@ -77,6 +77,7 @@ namespace libtorrent
, m_state(action_error)
, m_abort(false)
{
update_transaction_id();
}
void udp_tracker_connection::start()
@ -120,7 +121,7 @@ namespace libtorrent
? resolver_interface::prefer_cache
: 0
, boost::bind(&udp_tracker_connection::name_lookup
, self(), _1, _2, port));
, shared_from_this(), _1, _2, port));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
boost::shared_ptr<request_callback> cb = requester();
@ -165,7 +166,7 @@ namespace libtorrent
, m_hostname.c_str(), print_endpoint(m_target).c_str());
#endif
m_ses.m_io_service.post(boost::bind(
&udp_tracker_connection::start_announce, self()));
&udp_tracker_connection::start_announce, shared_from_this()));
}
void udp_tracker_connection::name_lookup(error_code const& error
@ -221,7 +222,7 @@ namespace libtorrent
}
}
// if all endpoints were filtered by the IP filter, we can't connect
// ir all endpoints were filtered by the IP filter, we can't connect
if (m_endpoints.empty())
{
fail(error_code(errors::banned_by_ip_filter));
@ -357,7 +358,7 @@ namespace libtorrent
const char* ptr = buf;
int action = detail::read_int32(ptr);
int transaction = detail::read_int32(ptr);
boost::uint32_t transaction = detail::read_uint32(ptr);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
if (cb)
@ -400,6 +401,20 @@ namespace libtorrent
}
return false;
}
void udp_tracker_connection::update_transaction_id()
{
boost::uint32_t new_tid;
// don't use 0, because that has special meaning (unintialized)
do {
new_tid = random();
} while (new_tid == 0);
if (m_transaction_id != 0)
m_man.update_transaction_id(shared_from_this(), new_tid);
m_transaction_id = new_tid;
}
bool udp_tracker_connection::on_connect_response(char const* buf, int size)
{
@ -410,8 +425,7 @@ namespace libtorrent
buf += 8; // skip header
// reset transaction
m_transaction_id = 0;
m_attempts = 0;
update_transaction_id();
boost::uint64_t connection_id = detail::read_int64(buf);
mutex::scoped_lock l(m_cache_mutex);
@ -442,8 +456,7 @@ namespace libtorrent
char buf[16];
char* ptr = buf;
if (m_transaction_id == 0)
m_transaction_id = random() ^ (random() << 16);
TORRENT_ASSERT(m_transaction_id != 0);
detail::write_uint32(0x417, ptr);
detail::write_uint32(0x27101980, ptr); // connection_id
@ -472,9 +485,6 @@ namespace libtorrent
void udp_tracker_connection::send_udp_scrape()
{
if (m_transaction_id == 0)
m_transaction_id = random() ^ (random() << 16);
if (m_abort) return;
std::map<address, connection_cache_entry>::iterator i
@ -579,7 +589,7 @@ namespace libtorrent
{
restart_read_timeout();
int action = detail::read_int32(buf);
int transaction = detail::read_int32(buf);
boost::uint32_t transaction = detail::read_uint32(buf);
if (transaction != m_transaction_id)
{
@ -625,9 +635,6 @@ namespace libtorrent
void udp_tracker_connection::send_udp_announce()
{
if (m_transaction_id == 0)
m_transaction_id = random() ^ (random() << 16);
if (m_abort) return;
char buf[800];