added a strand for all async operations for thread safety
This commit is contained in:
parent
c567d06406
commit
13facbdb83
|
@ -273,7 +273,8 @@ namespace libtorrent
|
|||
// this is where all active sockets are stored.
|
||||
// the selector can sleep while there's no activity on
|
||||
// them
|
||||
demuxer m_selector;
|
||||
io_service m_io_service;
|
||||
asio::strand m_strand;
|
||||
|
||||
tracker_manager m_tracker_manager;
|
||||
torrent_map m_torrents;
|
||||
|
|
|
@ -110,7 +110,7 @@ namespace libtorrent
|
|||
public:
|
||||
|
||||
http_tracker_connection(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_manager& man
|
||||
, tracker_request const& req
|
||||
, std::string const& hostname
|
||||
|
@ -145,6 +145,7 @@ namespace libtorrent
|
|||
tracker_manager& m_man;
|
||||
http_parser m_parser;
|
||||
|
||||
asio::strand& m_strand;
|
||||
tcp::resolver m_name_lookup;
|
||||
int m_port;
|
||||
boost::shared_ptr<stream_socket> m_socket;
|
||||
|
|
|
@ -62,7 +62,7 @@ namespace libtorrent { namespace dht
|
|||
|
||||
struct dht_tracker
|
||||
{
|
||||
dht_tracker(asio::io_service& d, dht_settings const& settings
|
||||
dht_tracker(asio::io_service& ios, dht_settings const& settings
|
||||
, asio::ip::address listen_interface, entry const& bootstrap);
|
||||
|
||||
void add_node(udp::endpoint node);
|
||||
|
@ -95,7 +95,7 @@ namespace libtorrent { namespace dht
|
|||
void on_bootstrap();
|
||||
void send_packet(msg const& m);
|
||||
|
||||
asio::io_service& m_demuxer;
|
||||
asio::strand m_strand;
|
||||
asio::ip::udp::socket m_socket;
|
||||
|
||||
node_impl m_dht;
|
||||
|
|
|
@ -50,6 +50,7 @@ POSSIBILITY OF SUCH DAMAGE.
|
|||
#include <asio/io_service.hpp>
|
||||
#include <asio/deadline_timer.hpp>
|
||||
#include <asio/write.hpp>
|
||||
#include <asio/strand.hpp>
|
||||
|
||||
#ifdef __OBJC__
|
||||
#undef Protocol
|
||||
|
@ -71,7 +72,7 @@ namespace libtorrent
|
|||
using boost::asio::stream_socket;
|
||||
using boost::asio::datagram_socket;
|
||||
using boost::asio::socket_acceptor;
|
||||
using boost::asio::demuxer;
|
||||
using boost::asio::io_service;
|
||||
using boost::asio::ipv4::host_resolver;
|
||||
using boost::asio::async_write;
|
||||
using boost::asio::ipv4::host;
|
||||
|
@ -87,7 +88,7 @@ namespace libtorrent
|
|||
typedef asio::ip::address_v6 address_v6;
|
||||
typedef asio::ip::udp::socket datagram_socket;
|
||||
typedef asio::ip::tcp::acceptor socket_acceptor;
|
||||
typedef asio::io_service demuxer;
|
||||
typedef asio::io_service io_service;
|
||||
|
||||
using asio::async_write;
|
||||
using asio::deadline_timer;
|
||||
|
|
|
@ -158,7 +158,7 @@ namespace libtorrent
|
|||
friend void intrusive_ptr_add_ref(timeout_handler const*);
|
||||
friend void intrusive_ptr_release(timeout_handler const*);
|
||||
|
||||
timeout_handler(demuxer& d);
|
||||
timeout_handler(asio::strand& str);
|
||||
|
||||
void set_timeout(int completion_timeout, int read_timeout);
|
||||
void restart_read_timeout();
|
||||
|
@ -174,7 +174,7 @@ namespace libtorrent
|
|||
boost::intrusive_ptr<timeout_handler> self()
|
||||
{ return boost::intrusive_ptr<timeout_handler>(this); }
|
||||
|
||||
demuxer& m_demuxer;
|
||||
asio::strand& m_strand;
|
||||
// used for timeouts
|
||||
// this is set when the request has been sent
|
||||
boost::posix_time::ptime m_start_time;
|
||||
|
@ -196,7 +196,7 @@ namespace libtorrent
|
|||
{
|
||||
tracker_connection(tracker_manager& man
|
||||
, tracker_request req
|
||||
, demuxer& d
|
||||
, asio::strand& str
|
||||
, boost::weak_ptr<request_callback> r);
|
||||
|
||||
request_callback& requester();
|
||||
|
@ -224,7 +224,7 @@ namespace libtorrent
|
|||
: m_settings(s) {}
|
||||
|
||||
void queue_request(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_request r
|
||||
, std::string const& auth
|
||||
, boost::weak_ptr<request_callback> c
|
||||
|
|
|
@ -66,7 +66,7 @@ namespace libtorrent
|
|||
public:
|
||||
|
||||
udp_tracker_connection(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_manager& man
|
||||
, tracker_request const& req
|
||||
, std::string const& hostname
|
||||
|
@ -103,6 +103,7 @@ namespace libtorrent
|
|||
|
||||
tracker_manager& m_man;
|
||||
|
||||
asio::strand& m_strand;
|
||||
tcp::resolver m_name_lookup;
|
||||
int m_port;
|
||||
boost::shared_ptr<datagram_socket> m_socket;
|
||||
|
|
|
@ -217,7 +217,7 @@ namespace libtorrent
|
|||
}
|
||||
|
||||
http_tracker_connection::http_tracker_connection(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_manager& man
|
||||
, tracker_request const& req
|
||||
, std::string const& hostname
|
||||
|
@ -226,9 +226,10 @@ namespace libtorrent
|
|||
, boost::weak_ptr<request_callback> c
|
||||
, session_settings const& stn
|
||||
, std::string const& auth)
|
||||
: tracker_connection(man, req, d, c)
|
||||
: tracker_connection(man, req, str, c)
|
||||
, m_man(man)
|
||||
, m_name_lookup(d)
|
||||
, m_strand(str)
|
||||
, m_name_lookup(m_strand.io_service())
|
||||
, m_port(port)
|
||||
, m_recv_pos(0)
|
||||
, m_buffer(http_buffer_size)
|
||||
|
@ -368,8 +369,8 @@ namespace libtorrent
|
|||
|
||||
tcp::resolver::query q(*connect_to_host
|
||||
, boost::lexical_cast<std::string>(m_port));
|
||||
m_name_lookup.async_resolve(q
|
||||
, boost::bind(&http_tracker_connection::name_lookup, self(), _1, _2));
|
||||
m_name_lookup.async_resolve(q, m_strand.wrap(
|
||||
boost::bind(&http_tracker_connection::name_lookup, self(), _1, _2)));
|
||||
set_timeout(m_settings.tracker_completion_timeout
|
||||
, m_settings.tracker_receive_timeout);
|
||||
}
|
||||
|
@ -572,7 +573,7 @@ namespace libtorrent
|
|||
else
|
||||
req.url.assign(location.begin(), location.begin() + i);
|
||||
|
||||
m_man.queue_request(m_socket->io_service(), req
|
||||
m_man.queue_request(m_strand, req
|
||||
, m_password, m_requester);
|
||||
close();
|
||||
return;
|
||||
|
|
|
@ -132,20 +132,20 @@ namespace libtorrent { namespace dht
|
|||
|
||||
// class that puts the networking and the kademlia node in a single
|
||||
// unit and connecting them together.
|
||||
dht_tracker::dht_tracker(asio::io_service& d, dht_settings const& settings
|
||||
dht_tracker::dht_tracker(asio::io_service& ios, dht_settings const& settings
|
||||
, asio::ip::address listen_interface, entry const& bootstrap)
|
||||
: m_demuxer(d)
|
||||
, m_socket(m_demuxer, udp::endpoint(listen_interface, settings.service_port))
|
||||
: m_strand(ios)
|
||||
, m_socket(ios, udp::endpoint(listen_interface, settings.service_port))
|
||||
, m_dht(bind(&dht_tracker::send_packet, this, _1), settings
|
||||
, read_id(bootstrap))
|
||||
, m_buffer(0)
|
||||
, m_last_refresh(second_clock::universal_time() - hours(1))
|
||||
, m_timer(m_demuxer)
|
||||
, m_connection_timer(m_demuxer)
|
||||
, m_refresh_timer(m_demuxer)
|
||||
, m_timer(ios)
|
||||
, m_connection_timer(ios)
|
||||
, m_refresh_timer(ios)
|
||||
, m_settings(settings)
|
||||
, m_refresh_bucket(160)
|
||||
, m_host_resolver(d)
|
||||
, m_host_resolver(ios)
|
||||
{
|
||||
using boost::bind;
|
||||
|
||||
|
@ -191,15 +191,16 @@ namespace libtorrent { namespace dht
|
|||
|
||||
m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0]
|
||||
, m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer]
|
||||
, bind(&dht_tracker::on_receive, this, _1, _2));
|
||||
, m_strand.wrap(bind(&dht_tracker::on_receive, this, _1, _2)));
|
||||
m_timer.expires_from_now(seconds(1));
|
||||
m_timer.async_wait(bind(&dht_tracker::tick, this, _1));
|
||||
m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1)));
|
||||
|
||||
m_connection_timer.expires_from_now(seconds(10));
|
||||
m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1));
|
||||
m_connection_timer.async_wait(m_strand.wrap(
|
||||
bind(&dht_tracker::connection_timeout, this, _1)));
|
||||
|
||||
m_refresh_timer.expires_from_now(minutes(15));
|
||||
m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1));
|
||||
m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, this, _1)));
|
||||
}
|
||||
|
||||
void dht_tracker::dht_status(session_status& s)
|
||||
|
@ -214,7 +215,7 @@ namespace libtorrent { namespace dht
|
|||
if (e) return;
|
||||
time_duration d = m_dht.connection_timeout();
|
||||
m_connection_timer.expires_from_now(d);
|
||||
m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1));
|
||||
m_connection_timer.async_wait(m_strand.wrap(bind(&dht_tracker::connection_timeout, this, _1)));
|
||||
}
|
||||
catch (std::exception& exc)
|
||||
{
|
||||
|
@ -229,7 +230,8 @@ namespace libtorrent { namespace dht
|
|||
if (e) return;
|
||||
time_duration d = m_dht.refresh_timeout();
|
||||
m_refresh_timer.expires_from_now(d);
|
||||
m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1));
|
||||
m_refresh_timer.async_wait(m_strand.wrap(
|
||||
bind(&dht_tracker::refresh_timeout, this, _1)));
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
|
@ -248,7 +250,7 @@ namespace libtorrent { namespace dht
|
|||
{
|
||||
if (e) return;
|
||||
m_timer.expires_from_now(minutes(tick_period));
|
||||
m_timer.async_wait(bind(&dht_tracker::tick, this, _1));
|
||||
m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1)));
|
||||
|
||||
m_dht.new_write_key();
|
||||
|
||||
|
@ -357,7 +359,7 @@ namespace libtorrent { namespace dht
|
|||
m_buffer = (m_buffer + 1) & 1;
|
||||
m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0]
|
||||
, m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer]
|
||||
, bind(&dht_tracker::on_receive, this, _1, _2));
|
||||
, m_strand.wrap(bind(&dht_tracker::on_receive, this, _1, _2)));
|
||||
|
||||
if (error) return;
|
||||
|
||||
|
@ -650,8 +652,8 @@ namespace libtorrent { namespace dht
|
|||
void dht_tracker::add_node(std::pair<std::string, int> const& node)
|
||||
{
|
||||
udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
|
||||
m_host_resolver.async_resolve(q, bind(&dht_tracker::on_name_lookup
|
||||
, this, _1, _2));
|
||||
m_host_resolver.async_resolve(q, m_strand.wrap(
|
||||
bind(&dht_tracker::on_name_lookup, this, _1, _2)));
|
||||
}
|
||||
|
||||
void dht_tracker::on_name_lookup(asio::error_code const& e
|
||||
|
@ -668,8 +670,8 @@ namespace libtorrent { namespace dht
|
|||
void dht_tracker::add_router_node(std::pair<std::string, int> const& node)
|
||||
{
|
||||
udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
|
||||
m_host_resolver.async_resolve(q, bind(&dht_tracker::on_router_name_lookup
|
||||
, this, _1, _2));
|
||||
m_host_resolver.async_resolve(q, m_strand.wrap(
|
||||
bind(&dht_tracker::on_router_name_lookup, this, _1, _2)));
|
||||
}
|
||||
|
||||
void dht_tracker::on_router_name_lookup(asio::error_code const& e
|
||||
|
|
|
@ -615,13 +615,10 @@ namespace libtorrent
|
|||
// be requested from other peers
|
||||
t->picker().abort_download(*i);
|
||||
}
|
||||
|
||||
m_download_queue.clear();
|
||||
m_request_queue.clear();
|
||||
}
|
||||
|
||||
assert(m_download_queue.empty());
|
||||
assert(m_request_queue.empty());
|
||||
m_download_queue.clear();
|
||||
m_request_queue.clear();
|
||||
|
||||
#ifndef NDEBUG
|
||||
// t->picker().integrity_check(m_torrent);
|
||||
|
@ -1474,7 +1471,7 @@ namespace libtorrent
|
|||
|
||||
if (m_disconnecting) return;
|
||||
m_disconnecting = true;
|
||||
m_ses.m_selector.post(boost::bind(&close_socket_ignore_error, m_socket));
|
||||
m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket));
|
||||
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
|
||||
|
|
|
@ -474,7 +474,8 @@ namespace libtorrent { namespace detail
|
|||
std::pair<int, int> listen_port_range
|
||||
, fingerprint const& cl_fprint
|
||||
, char const* listen_interface)
|
||||
: m_tracker_manager(m_settings)
|
||||
: m_strand(m_io_service)
|
||||
, m_tracker_manager(m_settings)
|
||||
, m_listen_port_range(listen_port_range)
|
||||
, m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
|
||||
, m_abort(false)
|
||||
|
@ -486,7 +487,7 @@ namespace libtorrent { namespace detail
|
|||
, m_incoming_connection(false)
|
||||
, m_files(40)
|
||||
, m_last_tick(microsec_clock::universal_time())
|
||||
, m_timer(m_selector)
|
||||
, m_timer(m_io_service)
|
||||
, m_checker_impl(*this)
|
||||
{
|
||||
|
||||
|
@ -530,7 +531,8 @@ namespace libtorrent { namespace detail
|
|||
}
|
||||
|
||||
m_timer.expires_from_now(seconds(1));
|
||||
m_timer.async_wait(bind(&session_impl::second_tick, this, _1));
|
||||
m_timer.async_wait(m_strand.wrap(
|
||||
bind(&session_impl::second_tick, this, _1)));
|
||||
|
||||
m_thread.reset(new boost::thread(boost::ref(*this)));
|
||||
m_checker_thread.reset(new boost::thread(boost::ref(m_checker_impl)));
|
||||
|
@ -557,7 +559,7 @@ namespace libtorrent { namespace detail
|
|||
assert(!m_abort);
|
||||
// abort the main thread
|
||||
m_abort = true;
|
||||
m_selector.interrupt();
|
||||
m_io_service.interrupt();
|
||||
l.unlock();
|
||||
|
||||
mutex::scoped_lock l2(m_checker_impl.m_mutex);
|
||||
|
@ -606,7 +608,7 @@ namespace libtorrent { namespace detail
|
|||
try
|
||||
{
|
||||
// create listener socket
|
||||
m_listen_socket = boost::shared_ptr<socket_acceptor>(new socket_acceptor(m_selector));
|
||||
m_listen_socket = boost::shared_ptr<socket_acceptor>(new socket_acceptor(m_io_service));
|
||||
|
||||
for(;;)
|
||||
{
|
||||
|
@ -704,7 +706,7 @@ namespace libtorrent { namespace detail
|
|||
|
||||
void session_impl::async_accept()
|
||||
{
|
||||
shared_ptr<stream_socket> c(new stream_socket(m_selector));
|
||||
shared_ptr<stream_socket> c(new stream_socket(m_io_service));
|
||||
m_listen_socket->async_accept(*c
|
||||
, bind(&session_impl::on_incoming_connection, this, c
|
||||
, weak_ptr<socket_acceptor>(m_listen_socket), _1));
|
||||
|
@ -892,7 +894,7 @@ namespace libtorrent { namespace detail
|
|||
(*m_logger) << "*** SECOND TIMER FAILED " << e.message() << "\n";
|
||||
#endif
|
||||
m_abort = true;
|
||||
m_selector.interrupt();
|
||||
m_io_service.interrupt();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -902,7 +904,8 @@ namespace libtorrent { namespace detail
|
|||
m_last_tick = microsec_clock::universal_time();
|
||||
|
||||
m_timer.expires_from_now(seconds(1));
|
||||
m_timer.async_wait(bind(&session_impl::second_tick, this, _1));
|
||||
m_timer.async_wait(m_strand.wrap(
|
||||
bind(&session_impl::second_tick, this, _1)));
|
||||
|
||||
// do the second_tick() on each connection
|
||||
// this will update their statistics (download and upload speeds)
|
||||
|
@ -952,7 +955,7 @@ namespace libtorrent { namespace detail
|
|||
tracker_request req = t.generate_tracker_request();
|
||||
req.listen_port = m_listen_interface.port();
|
||||
req.key = m_key;
|
||||
m_tracker_manager.queue_request(m_selector, req, t.tracker_login()
|
||||
m_tracker_manager.queue_request(m_strand, req, t.tracker_login()
|
||||
, i->second);
|
||||
|
||||
if (m_alerts.should_post(alert::info))
|
||||
|
@ -1062,7 +1065,7 @@ namespace libtorrent { namespace detail
|
|||
{
|
||||
try
|
||||
{
|
||||
m_selector.run();
|
||||
m_io_service.run();
|
||||
assert(m_abort == true);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
|
@ -1076,7 +1079,7 @@ namespace libtorrent { namespace detail
|
|||
}
|
||||
while (!m_abort);
|
||||
|
||||
deadline_timer tracker_timer(m_selector);
|
||||
deadline_timer tracker_timer(m_io_service);
|
||||
|
||||
session_impl::mutex_t::scoped_lock l(m_mutex);
|
||||
|
||||
|
@ -1094,9 +1097,9 @@ namespace libtorrent { namespace detail
|
|||
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
|
||||
boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
|
||||
m_tracker_loggers.push_back(tl);
|
||||
m_tracker_manager.queue_request(m_selector, req, login, tl);
|
||||
m_tracker_manager.queue_request(m_strand, req, login, tl);
|
||||
#else
|
||||
m_tracker_manager.queue_request(m_selector, req, login);
|
||||
m_tracker_manager.queue_request(m_strand, req, login);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
@ -1109,10 +1112,11 @@ namespace libtorrent { namespace detail
|
|||
&& !m_tracker_manager.empty())
|
||||
{
|
||||
tracker_timer.expires_from_now(boost::posix_time::milliseconds(100));
|
||||
tracker_timer.async_wait(bind(&demuxer::interrupt, &m_selector));
|
||||
tracker_timer.async_wait(m_strand.wrap(
|
||||
bind(&io_service::interrupt, &m_io_service)));
|
||||
|
||||
m_selector.reset();
|
||||
m_selector.run();
|
||||
m_io_service.reset();
|
||||
m_io_service.run();
|
||||
}
|
||||
|
||||
l.lock();
|
||||
|
@ -1380,10 +1384,10 @@ namespace libtorrent { namespace detail
|
|||
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
|
||||
boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
|
||||
m_tracker_loggers.push_back(tl);
|
||||
m_tracker_manager.queue_request(m_selector, req
|
||||
m_tracker_manager.queue_request(m_strand, req
|
||||
, t.tracker_login(), tl);
|
||||
#else
|
||||
m_tracker_manager.queue_request(m_selector, req
|
||||
m_tracker_manager.queue_request(m_strand, req
|
||||
, t.tracker_login());
|
||||
#endif
|
||||
|
||||
|
@ -1514,7 +1518,7 @@ namespace libtorrent { namespace detail
|
|||
{
|
||||
mutex_t::scoped_lock l(m_mutex);
|
||||
m_dht.reset();
|
||||
m_dht.reset(new dht::dht_tracker(m_selector
|
||||
m_dht.reset(new dht::dht_tracker(m_io_service
|
||||
, m_dht_settings, m_listen_interface.address()
|
||||
, startup_state));
|
||||
}
|
||||
|
@ -1579,7 +1583,7 @@ namespace libtorrent { namespace detail
|
|||
// lock the main thread and abort it
|
||||
mutex_t::scoped_lock l(m_mutex);
|
||||
m_abort = true;
|
||||
m_selector.interrupt();
|
||||
m_io_service.interrupt();
|
||||
}
|
||||
m_thread->join();
|
||||
|
||||
|
|
|
@ -238,9 +238,9 @@ namespace libtorrent
|
|||
, m_duration(1800)
|
||||
, m_complete(-1)
|
||||
, m_incomplete(-1)
|
||||
, m_host_resolver(ses.m_selector)
|
||||
, m_host_resolver(ses.m_io_service)
|
||||
#ifndef TORRENT_DISABLE_DHT
|
||||
, m_dht_announce_timer(ses.m_selector)
|
||||
, m_dht_announce_timer(ses.m_io_service)
|
||||
#endif
|
||||
, m_policy()
|
||||
, m_ses(ses)
|
||||
|
@ -328,7 +328,8 @@ namespace libtorrent
|
|||
if (!tf.priv())
|
||||
{
|
||||
m_dht_announce_timer.expires_from_now(seconds(10));
|
||||
m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1));
|
||||
m_dht_announce_timer.async_wait(m_ses.m_strand.wrap(
|
||||
bind(&torrent::on_dht_announce, this, _1)));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -355,9 +356,9 @@ namespace libtorrent
|
|||
, m_duration(1800)
|
||||
, m_complete(-1)
|
||||
, m_incomplete(-1)
|
||||
, m_host_resolver(ses.m_selector)
|
||||
, m_host_resolver(ses.m_io_service)
|
||||
#ifndef TORRENT_DISABLE_DHT
|
||||
, m_dht_announce_timer(ses.m_selector)
|
||||
, m_dht_announce_timer(ses.m_io_service)
|
||||
#endif
|
||||
, m_policy()
|
||||
, m_ses(ses)
|
||||
|
@ -443,7 +444,8 @@ namespace libtorrent
|
|||
m_torrent_file.add_tracker(tracker_url);
|
||||
#ifndef TORRENT_DISABLE_DHT
|
||||
m_dht_announce_timer.expires_from_now(seconds(10));
|
||||
m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1));
|
||||
m_dht_announce_timer.async_wait(m_ses.m_strand.wrap(
|
||||
bind(&torrent::on_dht_announce, this, _1)));
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -523,14 +525,15 @@ namespace libtorrent
|
|||
{
|
||||
if (e) return;
|
||||
m_dht_announce_timer.expires_from_now(boost::posix_time::minutes(30));
|
||||
m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1));
|
||||
m_dht_announce_timer.async_wait(m_ses.m_strand.wrap(
|
||||
bind(&torrent::on_dht_announce, this, _1)));
|
||||
if (!m_ses.m_dht) return;
|
||||
// TODO: There should be a way to abort an announce operation on the dht.
|
||||
// when the torrent is destructed
|
||||
boost::weak_ptr<torrent> self(shared_from_this());
|
||||
m_ses.m_dht->announce(m_torrent_file.info_hash()
|
||||
, m_ses.m_listen_interface.port()
|
||||
, bind(&torrent::on_dht_announce_response_disp, self, _1));
|
||||
, m_ses.m_strand.wrap(bind(&torrent::on_dht_announce_response_disp, self, _1)));
|
||||
}
|
||||
|
||||
void torrent::on_dht_announce_response(std::vector<tcp::endpoint> const& peers)
|
||||
|
@ -1176,16 +1179,16 @@ namespace libtorrent
|
|||
if (m_ses.settings().proxy_ip.empty())
|
||||
{
|
||||
tcp::resolver::query q(hostname, boost::lexical_cast<std::string>(port));
|
||||
m_host_resolver.async_resolve(q, bind(&torrent::on_name_lookup
|
||||
, shared_from_this(), _1, _2, url));
|
||||
m_host_resolver.async_resolve(q, m_ses.m_strand.wrap(
|
||||
bind(&torrent::on_name_lookup, shared_from_this(), _1, _2, url)));
|
||||
}
|
||||
else
|
||||
{
|
||||
// use proxy
|
||||
tcp::resolver::query q(m_ses.settings().proxy_ip
|
||||
, boost::lexical_cast<std::string>(m_ses.settings().proxy_port));
|
||||
m_host_resolver.async_resolve(q, bind(&torrent::on_name_lookup
|
||||
, shared_from_this(), _1, _2, url));
|
||||
m_host_resolver.async_resolve(q, m_ses.m_strand.wrap(
|
||||
bind(&torrent::on_name_lookup, shared_from_this(), _1, _2, url)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1234,7 +1237,7 @@ namespace libtorrent
|
|||
return;
|
||||
}
|
||||
|
||||
boost::shared_ptr<stream_socket> s(new stream_socket(m_ses.m_selector));
|
||||
boost::shared_ptr<stream_socket> s(new stream_socket(m_ses.m_io_service));
|
||||
boost::intrusive_ptr<peer_connection> c(new web_peer_connection(
|
||||
m_ses, shared_from_this(), s, a, url));
|
||||
|
||||
|
@ -1295,7 +1298,7 @@ namespace libtorrent
|
|||
if (m_connections.find(a) != m_connections.end())
|
||||
throw protocol_error("already connected to peer");
|
||||
|
||||
boost::shared_ptr<stream_socket> s(new stream_socket(m_ses.m_selector));
|
||||
boost::shared_ptr<stream_socket> s(new stream_socket(m_ses.m_io_service));
|
||||
boost::intrusive_ptr<peer_connection> c(new bt_peer_connection(
|
||||
m_ses, shared_from_this(), s, a));
|
||||
|
||||
|
|
|
@ -313,11 +313,11 @@ namespace libtorrent
|
|||
}
|
||||
|
||||
|
||||
timeout_handler::timeout_handler(demuxer& d)
|
||||
: m_demuxer(d)
|
||||
timeout_handler::timeout_handler(asio::strand& str)
|
||||
: m_strand(str)
|
||||
, m_start_time(second_clock::universal_time())
|
||||
, m_read_time(second_clock::universal_time())
|
||||
, m_timeout(d)
|
||||
, m_timeout(str.io_service())
|
||||
, m_completion_timeout(0)
|
||||
, m_read_timeout(0)
|
||||
, m_refs(0)
|
||||
|
@ -333,7 +333,8 @@ namespace libtorrent
|
|||
m_timeout.expires_at(std::min(
|
||||
m_read_time + seconds(m_read_timeout)
|
||||
, m_start_time + seconds(m_completion_timeout)));
|
||||
m_timeout.async_wait(bind(&timeout_handler::timeout_callback, self(), _1));
|
||||
m_timeout.async_wait(m_strand.wrap(bind(
|
||||
&timeout_handler::timeout_callback, self(), _1)));
|
||||
}
|
||||
|
||||
void timeout_handler::restart_read_timeout()
|
||||
|
@ -368,7 +369,8 @@ namespace libtorrent
|
|||
m_timeout.expires_at(std::min(
|
||||
m_read_time + seconds(m_read_timeout)
|
||||
, m_start_time + seconds(m_completion_timeout)));
|
||||
m_timeout.async_wait(bind(&timeout_handler::timeout_callback, self(), _1));
|
||||
m_timeout.async_wait(m_strand.wrap(
|
||||
bind(&timeout_handler::timeout_callback, self(), _1)));
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
|
@ -378,9 +380,9 @@ namespace libtorrent
|
|||
tracker_connection::tracker_connection(
|
||||
tracker_manager& man
|
||||
, tracker_request req
|
||||
, demuxer& d
|
||||
, asio::strand& str
|
||||
, boost::weak_ptr<request_callback> r)
|
||||
: timeout_handler(d)
|
||||
: timeout_handler(str)
|
||||
, m_requester(r)
|
||||
, m_man(man)
|
||||
, m_req(req)
|
||||
|
@ -478,7 +480,7 @@ namespace libtorrent
|
|||
}
|
||||
|
||||
void tracker_manager::queue_request(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_request req
|
||||
, std::string const& auth
|
||||
, boost::weak_ptr<request_callback> c)
|
||||
|
@ -503,7 +505,7 @@ namespace libtorrent
|
|||
if (protocol == "http")
|
||||
{
|
||||
con = new http_tracker_connection(
|
||||
d
|
||||
str
|
||||
, *this
|
||||
, req
|
||||
, hostname
|
||||
|
@ -516,7 +518,7 @@ namespace libtorrent
|
|||
else if (protocol == "udp")
|
||||
{
|
||||
con = new udp_tracker_connection(
|
||||
d
|
||||
str
|
||||
, *this
|
||||
, req
|
||||
, hostname
|
||||
|
|
|
@ -73,26 +73,28 @@ namespace libtorrent
|
|||
{
|
||||
|
||||
udp_tracker_connection::udp_tracker_connection(
|
||||
demuxer& d
|
||||
asio::strand& str
|
||||
, tracker_manager& man
|
||||
, tracker_request const& req
|
||||
, std::string const& hostname
|
||||
, unsigned short port
|
||||
, boost::weak_ptr<request_callback> c
|
||||
, session_settings const& stn)
|
||||
: tracker_connection(man, req, d, c)
|
||||
: tracker_connection(man, req, str, c)
|
||||
, m_man(man)
|
||||
, m_name_lookup(d)
|
||||
, m_strand(str)
|
||||
, m_name_lookup(m_strand.io_service())
|
||||
, m_port(port)
|
||||
, m_transaction_id(0)
|
||||
, m_connection_id(0)
|
||||
, m_settings(stn)
|
||||
, m_attempts(0)
|
||||
{
|
||||
m_socket.reset(new datagram_socket(d));
|
||||
m_socket.reset(new datagram_socket(m_strand.io_service()));
|
||||
tcp::resolver::query q(hostname, "0");
|
||||
m_name_lookup.async_resolve(q
|
||||
, boost::bind(&udp_tracker_connection::name_lookup, self(), _1, _2));
|
||||
, m_strand.wrap(boost::bind(
|
||||
&udp_tracker_connection::name_lookup, self(), _1, _2)));
|
||||
set_timeout(m_settings.tracker_completion_timeout
|
||||
, m_settings.tracker_receive_timeout);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue