fix to start async operations inside the network thread to maintain thread safety within asio

This commit is contained in:
Arvid Norberg 2010-09-25 20:07:27 +00:00
parent f1a7bc3f01
commit e6d400084e
3 changed files with 62 additions and 7 deletions

View File

@ -146,6 +146,7 @@ namespace libtorrent
#endif #endif
); );
~session_impl(); ~session_impl();
void start();
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
void add_extension(boost::function<boost::shared_ptr<torrent_plugin>( void add_extension(boost::function<boost::shared_ptr<torrent_plugin>(

View File

@ -750,14 +750,26 @@ namespace aux {
url_random((char*)&m_peer_id[print.length()], (char*)&m_peer_id[0] + 20); url_random((char*)&m_peer_id[print.length()], (char*)&m_peer_id[0] + 20);
m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this)));
}
void session_impl::start()
{
// this is where we should set up all async operations. This
// is called from within the network thread as opposed to the
// constructor which is called from the main thread
error_code ec;
m_timer.expires_from_now(milliseconds(m_settings.tick_interval), ec); m_timer.expires_from_now(milliseconds(m_settings.tick_interval), ec);
m_timer.async_wait(boost::bind(&session_impl::on_tick, this, _1)); m_timer.async_wait(boost::bind(&session_impl::on_tick, this, _1));
TORRENT_ASSERT(!ec);
int delay = (std::max)(m_settings.local_service_announce_interval int delay = (std::max)(m_settings.local_service_announce_interval
/ (std::max)(int(m_torrents.size()), 1), 1); / (std::max)(int(m_torrents.size()), 1), 1);
m_lsd_announce_timer.expires_from_now(seconds(delay), ec); m_lsd_announce_timer.expires_from_now(seconds(delay), ec);
m_lsd_announce_timer.async_wait( m_lsd_announce_timer.async_wait(
boost::bind(&session_impl::on_lsd_announce, this, _1)); boost::bind(&session_impl::on_lsd_announce, this, _1));
TORRENT_ASSERT(!ec);
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
delay = (std::max)(m_settings.dht_announce_interval delay = (std::max)(m_settings.dht_announce_interval
@ -765,16 +777,17 @@ namespace aux {
m_dht_announce_timer.expires_from_now(seconds(delay), ec); m_dht_announce_timer.expires_from_now(seconds(delay), ec);
m_dht_announce_timer.async_wait( m_dht_announce_timer.async_wait(
boost::bind(&session_impl::on_dht_announce, this, _1)); boost::bind(&session_impl::on_dht_announce, this, _1));
TORRENT_ASSERT(!ec);
#endif #endif
// no reuse_address // no reuse_address
open_listen_port(false); open_listen_port(false);
m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this)));
} }
void session_impl::save_state(entry* eptr, boost::uint32_t flags) const void session_impl::save_state(entry* eptr, boost::uint32_t flags) const
{ {
TORRENT_ASSERT(is_network_thread());
entry& e = *eptr; entry& e = *eptr;
if (flags & session::save_settings) if (flags & session::save_settings)
@ -836,6 +849,8 @@ namespace aux {
void session_impl::set_proxy(proxy_settings const& s) void session_impl::set_proxy(proxy_settings const& s)
{ {
TORRENT_ASSERT(is_network_thread());
m_proxy = s; m_proxy = s;
// in case we just set a socks proxy, we might have to // in case we just set a socks proxy, we might have to
// open the socks incoming connection // open the socks incoming connection
@ -845,6 +860,8 @@ namespace aux {
void session_impl::load_state(lazy_entry const* e) void session_impl::load_state(lazy_entry const* e)
{ {
TORRENT_ASSERT(is_network_thread());
lazy_entry const* settings; lazy_entry const* settings;
if (e->type() != lazy_entry::dict_t) return; if (e->type() != lazy_entry::dict_t) return;
@ -927,12 +944,16 @@ namespace aux {
char const* session_impl::country_for_ip(address const& a) char const* session_impl::country_for_ip(address const& a)
{ {
TORRENT_ASSERT(is_network_thread());
if (!a.is_v4() || m_country_db == 0) return 0; if (!a.is_v4() || m_country_db == 0) return 0;
return GeoIP_country_code_by_ipnum(m_country_db, a.to_v4().to_ulong()); return GeoIP_country_code_by_ipnum(m_country_db, a.to_v4().to_ulong());
} }
int session_impl::as_for_ip(address const& a) int session_impl::as_for_ip(address const& a)
{ {
TORRENT_ASSERT(is_network_thread());
if (!a.is_v4() || m_asnum_db == 0) return 0; if (!a.is_v4() || m_asnum_db == 0) return 0;
char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong()); char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong());
if (name == 0) return 0; if (name == 0) return 0;
@ -943,6 +964,8 @@ namespace aux {
std::string session_impl::as_name_for_ip(address const& a) std::string session_impl::as_name_for_ip(address const& a)
{ {
TORRENT_ASSERT(is_network_thread());
if (!a.is_v4() || m_asnum_db == 0) return std::string(); if (!a.is_v4() || m_asnum_db == 0) return std::string();
char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong()); char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong());
if (name == 0) return std::string(); if (name == 0) return std::string();
@ -954,6 +977,8 @@ namespace aux {
std::pair<const int, int>* session_impl::lookup_as(int as) std::pair<const int, int>* session_impl::lookup_as(int as)
{ {
TORRENT_ASSERT(is_network_thread());
std::map<int, int>::iterator i = m_as_peak.lower_bound(as); std::map<int, int>::iterator i = m_as_peak.lower_bound(as);
if (i == m_as_peak.end() || i->first != as) if (i == m_as_peak.end() || i->first != as)
@ -966,6 +991,8 @@ namespace aux {
void session_impl::load_asnum_db(std::string file) void session_impl::load_asnum_db(std::string file)
{ {
TORRENT_ASSERT(is_network_thread());
if (m_asnum_db) GeoIP_delete(m_asnum_db); if (m_asnum_db) GeoIP_delete(m_asnum_db);
m_asnum_db = GeoIP_open(file.c_str(), GEOIP_STANDARD); m_asnum_db = GeoIP_open(file.c_str(), GEOIP_STANDARD);
// return m_asnum_db; // return m_asnum_db;
@ -974,6 +1001,8 @@ namespace aux {
#if TORRENT_USE_WSTRING #if TORRENT_USE_WSTRING
void session_impl::load_asnum_dbw(std::wstring file) void session_impl::load_asnum_dbw(std::wstring file)
{ {
TORRENT_ASSERT(is_network_thread());
if (m_asnum_db) GeoIP_delete(m_asnum_db); if (m_asnum_db) GeoIP_delete(m_asnum_db);
std::string utf8; std::string utf8;
wchar_utf8(file, utf8); wchar_utf8(file, utf8);
@ -983,6 +1012,8 @@ namespace aux {
void session_impl::load_country_dbw(std::wstring file) void session_impl::load_country_dbw(std::wstring file)
{ {
TORRENT_ASSERT(is_network_thread());
if (m_country_db) GeoIP_delete(m_country_db); if (m_country_db) GeoIP_delete(m_country_db);
std::string utf8; std::string utf8;
wchar_utf8(file, utf8); wchar_utf8(file, utf8);
@ -993,6 +1024,8 @@ namespace aux {
void session_impl::load_country_db(std::string file) void session_impl::load_country_db(std::string file)
{ {
TORRENT_ASSERT(is_network_thread());
if (m_country_db) GeoIP_delete(m_country_db); if (m_country_db) GeoIP_delete(m_country_db);
m_country_db = GeoIP_open(file.c_str(), GEOIP_STANDARD); m_country_db = GeoIP_open(file.c_str(), GEOIP_STANDARD);
// return m_country_db; // return m_country_db;
@ -1004,6 +1037,7 @@ namespace aux {
void session_impl::add_extension( void session_impl::add_extension(
boost::function<boost::shared_ptr<torrent_plugin>(torrent*, void*)> ext) boost::function<boost::shared_ptr<torrent_plugin>(torrent*, void*)> ext)
{ {
TORRENT_ASSERT(is_network_thread());
TORRENT_ASSERT_VAL(ext, ext); TORRENT_ASSERT_VAL(ext, ext);
typedef boost::shared_ptr<torrent_plugin>(*function_t)(torrent*, void*); typedef boost::shared_ptr<torrent_plugin>(*function_t)(torrent*, void*);
@ -1022,12 +1056,16 @@ namespace aux {
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
void session_impl::add_dht_node(udp::endpoint n) void session_impl::add_dht_node(udp::endpoint n)
{ {
TORRENT_ASSERT(is_network_thread());
if (m_dht) m_dht->add_node(n); if (m_dht) m_dht->add_node(n);
} }
#endif #endif
void session_impl::pause() void session_impl::pause()
{ {
TORRENT_ASSERT(is_network_thread());
if (m_paused) return; if (m_paused) return;
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
(*m_logger) << time_now_string() << " *** session paused ***\n"; (*m_logger) << time_now_string() << " *** session paused ***\n";
@ -1043,6 +1081,8 @@ namespace aux {
void session_impl::resume() void session_impl::resume()
{ {
TORRENT_ASSERT(is_network_thread());
if (!m_paused) return; if (!m_paused) return;
m_paused = false; m_paused = false;
for (torrent_map::iterator i = m_torrents.begin() for (torrent_map::iterator i = m_torrents.begin()
@ -1055,6 +1095,8 @@ namespace aux {
void session_impl::abort() void session_impl::abort()
{ {
TORRENT_ASSERT(is_network_thread());
if (m_abort) return; if (m_abort) return;
#if defined TORRENT_LOGGING #if defined TORRENT_LOGGING
(*m_logger) << time_now_string() << " *** ABORT CALLED ***\n"; (*m_logger) << time_now_string() << " *** ABORT CALLED ***\n";
@ -1449,6 +1491,8 @@ namespace aux {
void session_impl::open_listen_port(bool reuse_address) void session_impl::open_listen_port(bool reuse_address)
{ {
TORRENT_ASSERT(is_network_thread());
// close the open listen sockets // close the open listen sockets
m_listen_sockets.clear(); m_listen_sockets.clear();
m_incoming_connection = false; m_incoming_connection = false;
@ -3028,6 +3072,9 @@ namespace aux {
TORRENT_ASSERT(is_network_thread()); TORRENT_ASSERT(is_network_thread());
eh_initializer(); eh_initializer();
// initialize async operations
start();
bool stop_loop = false; bool stop_loop = false;
while (!stop_loop) while (!stop_loop)
{ {

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/connection_queue.hpp" #include "libtorrent/connection_queue.hpp"
#include "libtorrent/escape_string.hpp" #include "libtorrent/escape_string.hpp"
#include "libtorrent/socket_io.hpp" #include "libtorrent/socket_io.hpp"
#include "libtorrent/error.hpp"
#include <stdlib.h> #include <stdlib.h>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
@ -79,7 +80,7 @@ udp_socket::~udp_socket()
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(!m_callback || !m_started); TORRENT_ASSERT(!m_callback || !m_started);
TORRENT_ASSERT(m_outstanding == 0); TORRENT_ASSERT_VAL(m_outstanding == 0, m_outstanding);
m_magic = 0; m_magic = 0;
#endif #endif
} }
@ -420,11 +421,17 @@ void udp_socket::close()
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
error_code ec; error_code ec;
// if we close the socket here, we can't shut down
// utp connections or NAT-PMP. We need to cancel the
// outstanding operations
m_ipv4_sock.cancel(ec); m_ipv4_sock.cancel(ec);
TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec);
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
m_ipv6_sock.cancel(ec); m_ipv6_sock.cancel(ec);
TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec);
#endif #endif
m_socks5_sock.cancel(ec); m_socks5_sock.cancel(ec);
TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec);
m_resolver.cancel(); m_resolver.cancel();
m_abort = true; m_abort = true;
@ -615,7 +622,7 @@ void udp_socket::on_connected(error_code const& e)
write_uint8(0, p); // no authentication write_uint8(0, p); // no authentication
write_uint8(2, p); // username/password write_uint8(2, p); // username/password
} }
TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf));
asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf)
, boost::bind(&udp_socket::handshake1, this, _1)); , boost::bind(&udp_socket::handshake1, this, _1));
} }
@ -666,7 +673,7 @@ void udp_socket::handshake2(error_code const& e)
write_string(m_proxy_settings.username, p); write_string(m_proxy_settings.username, p);
write_uint8(m_proxy_settings.password.size(), p); write_uint8(m_proxy_settings.password.size(), p);
write_string(m_proxy_settings.password, p); write_string(m_proxy_settings.password, p);
TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf));
asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf)
, boost::bind(&udp_socket::handshake3, this, _1)); , boost::bind(&udp_socket::handshake3, this, _1));
} }
@ -732,7 +739,7 @@ void udp_socket::socks_forward_udp(mutex::scoped_lock& l)
port = m_ipv6_sock.local_endpoint(ec).port(); port = m_ipv6_sock.local_endpoint(ec).port();
#endif #endif
detail::write_uint16(port , p); detail::write_uint16(port , p);
TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf));
asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf)
, boost::bind(&udp_socket::connect1, this, _1)); , boost::bind(&udp_socket::connect1, this, _1));
} }
@ -828,7 +835,7 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
error_code ec; error_code ec;
m_timer.expires_from_now(seconds(1), ec); m_timer.expires_from_now(seconds(1), ec);
m_timer.async_wait(boost::bind(&rate_limited_udp_socket::on_tick, this, _1)); m_timer.async_wait(boost::bind(&rate_limited_udp_socket::on_tick, this, _1));
TORRENT_ASSERT(!ec); TORRENT_ASSERT_VAL(!ec, ec);
} }
bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags) bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags)