From 13facbdb8336ab0afcd2c8070a89b9c9ced3a7e8 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 15 Dec 2006 17:47:21 +0000 Subject: [PATCH] added a strand for all async operations for thread safety --- include/libtorrent/aux_/session_impl.hpp | 3 +- .../libtorrent/http_tracker_connection.hpp | 3 +- include/libtorrent/kademlia/dht_tracker.hpp | 4 +- include/libtorrent/socket.hpp | 5 ++- include/libtorrent/tracker_manager.hpp | 8 ++-- include/libtorrent/udp_tracker_connection.hpp | 3 +- src/http_tracker_connection.cpp | 13 +++--- src/kademlia/dht_tracker.cpp | 40 +++++++++-------- src/peer_connection.cpp | 9 ++-- src/session_impl.cpp | 44 ++++++++++--------- src/torrent.cpp | 31 +++++++------ src/tracker_manager.cpp | 22 +++++----- src/udp_tracker_connection.cpp | 12 ++--- 13 files changed, 106 insertions(+), 91 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 66ff276d1..06c690ac2 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -273,7 +273,8 @@ namespace libtorrent // this is where all active sockets are stored. // the selector can sleep while there's no activity on // them - demuxer m_selector; + io_service m_io_service; + asio::strand m_strand; tracker_manager m_tracker_manager; torrent_map m_torrents; diff --git a/include/libtorrent/http_tracker_connection.hpp b/include/libtorrent/http_tracker_connection.hpp index f5596f5dd..9dffbe049 100755 --- a/include/libtorrent/http_tracker_connection.hpp +++ b/include/libtorrent/http_tracker_connection.hpp @@ -110,7 +110,7 @@ namespace libtorrent public: http_tracker_connection( - demuxer& d + asio::strand& str , tracker_manager& man , tracker_request const& req , std::string const& hostname @@ -145,6 +145,7 @@ namespace libtorrent tracker_manager& m_man; http_parser m_parser; + asio::strand& m_strand; tcp::resolver m_name_lookup; int m_port; boost::shared_ptr m_socket; diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index f879ece82..e97b692af 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -62,7 +62,7 @@ namespace libtorrent { namespace dht struct dht_tracker { - dht_tracker(asio::io_service& d, dht_settings const& settings + dht_tracker(asio::io_service& ios, dht_settings const& settings , asio::ip::address listen_interface, entry const& bootstrap); void add_node(udp::endpoint node); @@ -95,7 +95,7 @@ namespace libtorrent { namespace dht void on_bootstrap(); void send_packet(msg const& m); - asio::io_service& m_demuxer; + asio::strand m_strand; asio::ip::udp::socket m_socket; node_impl m_dht; diff --git a/include/libtorrent/socket.hpp b/include/libtorrent/socket.hpp index 394c2a133..5df7a4579 100755 --- a/include/libtorrent/socket.hpp +++ b/include/libtorrent/socket.hpp @@ -50,6 +50,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #ifdef __OBJC__ #undef Protocol @@ -71,7 +72,7 @@ namespace libtorrent using boost::asio::stream_socket; using boost::asio::datagram_socket; using boost::asio::socket_acceptor; - using boost::asio::demuxer; + using boost::asio::io_service; using boost::asio::ipv4::host_resolver; using boost::asio::async_write; using boost::asio::ipv4::host; @@ -87,7 +88,7 @@ namespace libtorrent typedef asio::ip::address_v6 address_v6; typedef asio::ip::udp::socket datagram_socket; typedef asio::ip::tcp::acceptor socket_acceptor; - typedef asio::io_service demuxer; + typedef asio::io_service io_service; using asio::async_write; using asio::deadline_timer; diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index c7734f414..1b34bc09e 100755 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -158,7 +158,7 @@ namespace libtorrent friend void intrusive_ptr_add_ref(timeout_handler const*); friend void intrusive_ptr_release(timeout_handler const*); - timeout_handler(demuxer& d); + timeout_handler(asio::strand& str); void set_timeout(int completion_timeout, int read_timeout); void restart_read_timeout(); @@ -174,7 +174,7 @@ namespace libtorrent boost::intrusive_ptr self() { return boost::intrusive_ptr(this); } - demuxer& m_demuxer; + asio::strand& m_strand; // used for timeouts // this is set when the request has been sent boost::posix_time::ptime m_start_time; @@ -196,7 +196,7 @@ namespace libtorrent { tracker_connection(tracker_manager& man , tracker_request req - , demuxer& d + , asio::strand& str , boost::weak_ptr r); request_callback& requester(); @@ -224,7 +224,7 @@ namespace libtorrent : m_settings(s) {} void queue_request( - demuxer& d + asio::strand& str , tracker_request r , std::string const& auth , boost::weak_ptr c diff --git a/include/libtorrent/udp_tracker_connection.hpp b/include/libtorrent/udp_tracker_connection.hpp index dfdd4ebf5..977c41691 100755 --- a/include/libtorrent/udp_tracker_connection.hpp +++ b/include/libtorrent/udp_tracker_connection.hpp @@ -66,7 +66,7 @@ namespace libtorrent public: udp_tracker_connection( - demuxer& d + asio::strand& str , tracker_manager& man , tracker_request const& req , std::string const& hostname @@ -103,6 +103,7 @@ namespace libtorrent tracker_manager& m_man; + asio::strand& m_strand; tcp::resolver m_name_lookup; int m_port; boost::shared_ptr m_socket; diff --git a/src/http_tracker_connection.cpp b/src/http_tracker_connection.cpp index fca98febf..fb90ab67b 100755 --- a/src/http_tracker_connection.cpp +++ b/src/http_tracker_connection.cpp @@ -217,7 +217,7 @@ namespace libtorrent } http_tracker_connection::http_tracker_connection( - demuxer& d + asio::strand& str , tracker_manager& man , tracker_request const& req , std::string const& hostname @@ -226,9 +226,10 @@ namespace libtorrent , boost::weak_ptr c , session_settings const& stn , std::string const& auth) - : tracker_connection(man, req, d, c) + : tracker_connection(man, req, str, c) , m_man(man) - , m_name_lookup(d) + , m_strand(str) + , m_name_lookup(m_strand.io_service()) , m_port(port) , m_recv_pos(0) , m_buffer(http_buffer_size) @@ -368,8 +369,8 @@ namespace libtorrent tcp::resolver::query q(*connect_to_host , boost::lexical_cast(m_port)); - m_name_lookup.async_resolve(q - , boost::bind(&http_tracker_connection::name_lookup, self(), _1, _2)); + m_name_lookup.async_resolve(q, m_strand.wrap( + boost::bind(&http_tracker_connection::name_lookup, self(), _1, _2))); set_timeout(m_settings.tracker_completion_timeout , m_settings.tracker_receive_timeout); } @@ -572,7 +573,7 @@ namespace libtorrent else req.url.assign(location.begin(), location.begin() + i); - m_man.queue_request(m_socket->io_service(), req + m_man.queue_request(m_strand, req , m_password, m_requester); close(); return; diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 7c3137dfe..526b94b37 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -132,20 +132,20 @@ namespace libtorrent { namespace dht // class that puts the networking and the kademlia node in a single // unit and connecting them together. - dht_tracker::dht_tracker(asio::io_service& d, dht_settings const& settings + dht_tracker::dht_tracker(asio::io_service& ios, dht_settings const& settings , asio::ip::address listen_interface, entry const& bootstrap) - : m_demuxer(d) - , m_socket(m_demuxer, udp::endpoint(listen_interface, settings.service_port)) + : m_strand(ios) + , m_socket(ios, udp::endpoint(listen_interface, settings.service_port)) , m_dht(bind(&dht_tracker::send_packet, this, _1), settings , read_id(bootstrap)) , m_buffer(0) , m_last_refresh(second_clock::universal_time() - hours(1)) - , m_timer(m_demuxer) - , m_connection_timer(m_demuxer) - , m_refresh_timer(m_demuxer) + , m_timer(ios) + , m_connection_timer(ios) + , m_refresh_timer(ios) , m_settings(settings) , m_refresh_bucket(160) - , m_host_resolver(d) + , m_host_resolver(ios) { using boost::bind; @@ -191,15 +191,16 @@ namespace libtorrent { namespace dht m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0] , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer] - , bind(&dht_tracker::on_receive, this, _1, _2)); + , m_strand.wrap(bind(&dht_tracker::on_receive, this, _1, _2))); m_timer.expires_from_now(seconds(1)); - m_timer.async_wait(bind(&dht_tracker::tick, this, _1)); + m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1))); m_connection_timer.expires_from_now(seconds(10)); - m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1)); + m_connection_timer.async_wait(m_strand.wrap( + bind(&dht_tracker::connection_timeout, this, _1))); m_refresh_timer.expires_from_now(minutes(15)); - m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1)); + m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, this, _1))); } void dht_tracker::dht_status(session_status& s) @@ -214,7 +215,7 @@ namespace libtorrent { namespace dht if (e) return; time_duration d = m_dht.connection_timeout(); m_connection_timer.expires_from_now(d); - m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1)); + m_connection_timer.async_wait(m_strand.wrap(bind(&dht_tracker::connection_timeout, this, _1))); } catch (std::exception& exc) { @@ -229,7 +230,8 @@ namespace libtorrent { namespace dht if (e) return; time_duration d = m_dht.refresh_timeout(); m_refresh_timer.expires_from_now(d); - m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1)); + m_refresh_timer.async_wait(m_strand.wrap( + bind(&dht_tracker::refresh_timeout, this, _1))); } catch (std::exception&) { @@ -248,7 +250,7 @@ namespace libtorrent { namespace dht { if (e) return; m_timer.expires_from_now(minutes(tick_period)); - m_timer.async_wait(bind(&dht_tracker::tick, this, _1)); + m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1))); m_dht.new_write_key(); @@ -357,7 +359,7 @@ namespace libtorrent { namespace dht m_buffer = (m_buffer + 1) & 1; m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0] , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer] - , bind(&dht_tracker::on_receive, this, _1, _2)); + , m_strand.wrap(bind(&dht_tracker::on_receive, this, _1, _2))); if (error) return; @@ -650,8 +652,8 @@ namespace libtorrent { namespace dht void dht_tracker::add_node(std::pair const& node) { udp::resolver::query q(node.first, lexical_cast(node.second)); - m_host_resolver.async_resolve(q, bind(&dht_tracker::on_name_lookup - , this, _1, _2)); + m_host_resolver.async_resolve(q, m_strand.wrap( + bind(&dht_tracker::on_name_lookup, this, _1, _2))); } void dht_tracker::on_name_lookup(asio::error_code const& e @@ -668,8 +670,8 @@ namespace libtorrent { namespace dht void dht_tracker::add_router_node(std::pair const& node) { udp::resolver::query q(node.first, lexical_cast(node.second)); - m_host_resolver.async_resolve(q, bind(&dht_tracker::on_router_name_lookup - , this, _1, _2)); + m_host_resolver.async_resolve(q, m_strand.wrap( + bind(&dht_tracker::on_router_name_lookup, this, _1, _2))); } void dht_tracker::on_router_name_lookup(asio::error_code const& e diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index e7c2718cb..d69046fbb 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -615,13 +615,10 @@ namespace libtorrent // be requested from other peers t->picker().abort_download(*i); } - - m_download_queue.clear(); - m_request_queue.clear(); } - assert(m_download_queue.empty()); - assert(m_request_queue.empty()); + m_download_queue.clear(); + m_request_queue.clear(); #ifndef NDEBUG // t->picker().integrity_check(m_torrent); @@ -1474,7 +1471,7 @@ namespace libtorrent if (m_disconnecting) return; m_disconnecting = true; - m_ses.m_selector.post(boost::bind(&close_socket_ignore_error, m_socket)); + m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket)); boost::shared_ptr t = m_torrent.lock(); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 79ad7c60b..4dedf0302 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -474,7 +474,8 @@ namespace libtorrent { namespace detail std::pair listen_port_range , fingerprint const& cl_fprint , char const* listen_interface) - : m_tracker_manager(m_settings) + : m_strand(m_io_service) + , m_tracker_manager(m_settings) , m_listen_port_range(listen_port_range) , m_listen_interface(address::from_string(listen_interface), listen_port_range.first) , m_abort(false) @@ -486,7 +487,7 @@ namespace libtorrent { namespace detail , m_incoming_connection(false) , m_files(40) , m_last_tick(microsec_clock::universal_time()) - , m_timer(m_selector) + , m_timer(m_io_service) , m_checker_impl(*this) { @@ -530,7 +531,8 @@ namespace libtorrent { namespace detail } m_timer.expires_from_now(seconds(1)); - m_timer.async_wait(bind(&session_impl::second_tick, this, _1)); + m_timer.async_wait(m_strand.wrap( + bind(&session_impl::second_tick, this, _1))); m_thread.reset(new boost::thread(boost::ref(*this))); m_checker_thread.reset(new boost::thread(boost::ref(m_checker_impl))); @@ -557,7 +559,7 @@ namespace libtorrent { namespace detail assert(!m_abort); // abort the main thread m_abort = true; - m_selector.interrupt(); + m_io_service.interrupt(); l.unlock(); mutex::scoped_lock l2(m_checker_impl.m_mutex); @@ -606,7 +608,7 @@ namespace libtorrent { namespace detail try { // create listener socket - m_listen_socket = boost::shared_ptr(new socket_acceptor(m_selector)); + m_listen_socket = boost::shared_ptr(new socket_acceptor(m_io_service)); for(;;) { @@ -704,7 +706,7 @@ namespace libtorrent { namespace detail void session_impl::async_accept() { - shared_ptr c(new stream_socket(m_selector)); + shared_ptr c(new stream_socket(m_io_service)); m_listen_socket->async_accept(*c , bind(&session_impl::on_incoming_connection, this, c , weak_ptr(m_listen_socket), _1)); @@ -892,7 +894,7 @@ namespace libtorrent { namespace detail (*m_logger) << "*** SECOND TIMER FAILED " << e.message() << "\n"; #endif m_abort = true; - m_selector.interrupt(); + m_io_service.interrupt(); return; } @@ -902,7 +904,8 @@ namespace libtorrent { namespace detail m_last_tick = microsec_clock::universal_time(); m_timer.expires_from_now(seconds(1)); - m_timer.async_wait(bind(&session_impl::second_tick, this, _1)); + m_timer.async_wait(m_strand.wrap( + bind(&session_impl::second_tick, this, _1))); // do the second_tick() on each connection // this will update their statistics (download and upload speeds) @@ -952,7 +955,7 @@ namespace libtorrent { namespace detail tracker_request req = t.generate_tracker_request(); req.listen_port = m_listen_interface.port(); req.key = m_key; - m_tracker_manager.queue_request(m_selector, req, t.tracker_login() + m_tracker_manager.queue_request(m_strand, req, t.tracker_login() , i->second); if (m_alerts.should_post(alert::info)) @@ -1062,7 +1065,7 @@ namespace libtorrent { namespace detail { try { - m_selector.run(); + m_io_service.run(); assert(m_abort == true); } catch (std::exception& e) @@ -1076,7 +1079,7 @@ namespace libtorrent { namespace detail } while (!m_abort); - deadline_timer tracker_timer(m_selector); + deadline_timer tracker_timer(m_io_service); session_impl::mutex_t::scoped_lock l(m_mutex); @@ -1094,9 +1097,9 @@ namespace libtorrent { namespace detail #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) boost::shared_ptr tl(new tracker_logger(*this)); m_tracker_loggers.push_back(tl); - m_tracker_manager.queue_request(m_selector, req, login, tl); + m_tracker_manager.queue_request(m_strand, req, login, tl); #else - m_tracker_manager.queue_request(m_selector, req, login); + m_tracker_manager.queue_request(m_strand, req, login); #endif } } @@ -1109,10 +1112,11 @@ namespace libtorrent { namespace detail && !m_tracker_manager.empty()) { tracker_timer.expires_from_now(boost::posix_time::milliseconds(100)); - tracker_timer.async_wait(bind(&demuxer::interrupt, &m_selector)); + tracker_timer.async_wait(m_strand.wrap( + bind(&io_service::interrupt, &m_io_service))); - m_selector.reset(); - m_selector.run(); + m_io_service.reset(); + m_io_service.run(); } l.lock(); @@ -1380,10 +1384,10 @@ namespace libtorrent { namespace detail #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) boost::shared_ptr tl(new tracker_logger(*this)); m_tracker_loggers.push_back(tl); - m_tracker_manager.queue_request(m_selector, req + m_tracker_manager.queue_request(m_strand, req , t.tracker_login(), tl); #else - m_tracker_manager.queue_request(m_selector, req + m_tracker_manager.queue_request(m_strand, req , t.tracker_login()); #endif @@ -1514,7 +1518,7 @@ namespace libtorrent { namespace detail { mutex_t::scoped_lock l(m_mutex); m_dht.reset(); - m_dht.reset(new dht::dht_tracker(m_selector + m_dht.reset(new dht::dht_tracker(m_io_service , m_dht_settings, m_listen_interface.address() , startup_state)); } @@ -1579,7 +1583,7 @@ namespace libtorrent { namespace detail // lock the main thread and abort it mutex_t::scoped_lock l(m_mutex); m_abort = true; - m_selector.interrupt(); + m_io_service.interrupt(); } m_thread->join(); diff --git a/src/torrent.cpp b/src/torrent.cpp index 3ccf572a6..ed5f18b17 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -238,9 +238,9 @@ namespace libtorrent , m_duration(1800) , m_complete(-1) , m_incomplete(-1) - , m_host_resolver(ses.m_selector) + , m_host_resolver(ses.m_io_service) #ifndef TORRENT_DISABLE_DHT - , m_dht_announce_timer(ses.m_selector) + , m_dht_announce_timer(ses.m_io_service) #endif , m_policy() , m_ses(ses) @@ -328,7 +328,8 @@ namespace libtorrent if (!tf.priv()) { m_dht_announce_timer.expires_from_now(seconds(10)); - m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1)); + m_dht_announce_timer.async_wait(m_ses.m_strand.wrap( + bind(&torrent::on_dht_announce, this, _1))); } #endif } @@ -355,9 +356,9 @@ namespace libtorrent , m_duration(1800) , m_complete(-1) , m_incomplete(-1) - , m_host_resolver(ses.m_selector) + , m_host_resolver(ses.m_io_service) #ifndef TORRENT_DISABLE_DHT - , m_dht_announce_timer(ses.m_selector) + , m_dht_announce_timer(ses.m_io_service) #endif , m_policy() , m_ses(ses) @@ -443,7 +444,8 @@ namespace libtorrent m_torrent_file.add_tracker(tracker_url); #ifndef TORRENT_DISABLE_DHT m_dht_announce_timer.expires_from_now(seconds(10)); - m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1)); + m_dht_announce_timer.async_wait(m_ses.m_strand.wrap( + bind(&torrent::on_dht_announce, this, _1))); #endif } @@ -523,14 +525,15 @@ namespace libtorrent { if (e) return; m_dht_announce_timer.expires_from_now(boost::posix_time::minutes(30)); - m_dht_announce_timer.async_wait(bind(&torrent::on_dht_announce, this, _1)); + m_dht_announce_timer.async_wait(m_ses.m_strand.wrap( + bind(&torrent::on_dht_announce, this, _1))); if (!m_ses.m_dht) return; // TODO: There should be a way to abort an announce operation on the dht. // when the torrent is destructed boost::weak_ptr self(shared_from_this()); m_ses.m_dht->announce(m_torrent_file.info_hash() , m_ses.m_listen_interface.port() - , bind(&torrent::on_dht_announce_response_disp, self, _1)); + , m_ses.m_strand.wrap(bind(&torrent::on_dht_announce_response_disp, self, _1))); } void torrent::on_dht_announce_response(std::vector const& peers) @@ -1176,16 +1179,16 @@ namespace libtorrent if (m_ses.settings().proxy_ip.empty()) { tcp::resolver::query q(hostname, boost::lexical_cast(port)); - m_host_resolver.async_resolve(q, bind(&torrent::on_name_lookup - , shared_from_this(), _1, _2, url)); + m_host_resolver.async_resolve(q, m_ses.m_strand.wrap( + bind(&torrent::on_name_lookup, shared_from_this(), _1, _2, url))); } else { // use proxy tcp::resolver::query q(m_ses.settings().proxy_ip , boost::lexical_cast(m_ses.settings().proxy_port)); - m_host_resolver.async_resolve(q, bind(&torrent::on_name_lookup - , shared_from_this(), _1, _2, url)); + m_host_resolver.async_resolve(q, m_ses.m_strand.wrap( + bind(&torrent::on_name_lookup, shared_from_this(), _1, _2, url))); } } @@ -1234,7 +1237,7 @@ namespace libtorrent return; } - boost::shared_ptr s(new stream_socket(m_ses.m_selector)); + boost::shared_ptr s(new stream_socket(m_ses.m_io_service)); boost::intrusive_ptr c(new web_peer_connection( m_ses, shared_from_this(), s, a, url)); @@ -1295,7 +1298,7 @@ namespace libtorrent if (m_connections.find(a) != m_connections.end()) throw protocol_error("already connected to peer"); - boost::shared_ptr s(new stream_socket(m_ses.m_selector)); + boost::shared_ptr s(new stream_socket(m_ses.m_io_service)); boost::intrusive_ptr c(new bt_peer_connection( m_ses, shared_from_this(), s, a)); diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index d234a3b2b..4724e764a 100755 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -313,11 +313,11 @@ namespace libtorrent } - timeout_handler::timeout_handler(demuxer& d) - : m_demuxer(d) + timeout_handler::timeout_handler(asio::strand& str) + : m_strand(str) , m_start_time(second_clock::universal_time()) , m_read_time(second_clock::universal_time()) - , m_timeout(d) + , m_timeout(str.io_service()) , m_completion_timeout(0) , m_read_timeout(0) , m_refs(0) @@ -333,7 +333,8 @@ namespace libtorrent m_timeout.expires_at(std::min( m_read_time + seconds(m_read_timeout) , m_start_time + seconds(m_completion_timeout))); - m_timeout.async_wait(bind(&timeout_handler::timeout_callback, self(), _1)); + m_timeout.async_wait(m_strand.wrap(bind( + &timeout_handler::timeout_callback, self(), _1))); } void timeout_handler::restart_read_timeout() @@ -368,7 +369,8 @@ namespace libtorrent m_timeout.expires_at(std::min( m_read_time + seconds(m_read_timeout) , m_start_time + seconds(m_completion_timeout))); - m_timeout.async_wait(bind(&timeout_handler::timeout_callback, self(), _1)); + m_timeout.async_wait(m_strand.wrap( + bind(&timeout_handler::timeout_callback, self(), _1))); } catch (std::exception& e) { @@ -378,9 +380,9 @@ namespace libtorrent tracker_connection::tracker_connection( tracker_manager& man , tracker_request req - , demuxer& d + , asio::strand& str , boost::weak_ptr r) - : timeout_handler(d) + : timeout_handler(str) , m_requester(r) , m_man(man) , m_req(req) @@ -478,7 +480,7 @@ namespace libtorrent } void tracker_manager::queue_request( - demuxer& d + asio::strand& str , tracker_request req , std::string const& auth , boost::weak_ptr c) @@ -503,7 +505,7 @@ namespace libtorrent if (protocol == "http") { con = new http_tracker_connection( - d + str , *this , req , hostname @@ -516,7 +518,7 @@ namespace libtorrent else if (protocol == "udp") { con = new udp_tracker_connection( - d + str , *this , req , hostname diff --git a/src/udp_tracker_connection.cpp b/src/udp_tracker_connection.cpp index e07d1368e..94e57d5c5 100755 --- a/src/udp_tracker_connection.cpp +++ b/src/udp_tracker_connection.cpp @@ -73,26 +73,28 @@ namespace libtorrent { udp_tracker_connection::udp_tracker_connection( - demuxer& d + asio::strand& str , tracker_manager& man , tracker_request const& req , std::string const& hostname , unsigned short port , boost::weak_ptr c , session_settings const& stn) - : tracker_connection(man, req, d, c) + : tracker_connection(man, req, str, c) , m_man(man) - , m_name_lookup(d) + , m_strand(str) + , m_name_lookup(m_strand.io_service()) , m_port(port) , m_transaction_id(0) , m_connection_id(0) , m_settings(stn) , m_attempts(0) { - m_socket.reset(new datagram_socket(d)); + m_socket.reset(new datagram_socket(m_strand.io_service())); tcp::resolver::query q(hostname, "0"); m_name_lookup.async_resolve(q - , boost::bind(&udp_tracker_connection::name_lookup, self(), _1, _2)); + , m_strand.wrap(boost::bind( + &udp_tracker_connection::name_lookup, self(), _1, _2))); set_timeout(m_settings.tracker_completion_timeout , m_settings.tracker_receive_timeout); }