diff --git a/include/libtorrent/alert.hpp b/include/libtorrent/alert.hpp index 96624d350..b43d17589 100644 --- a/include/libtorrent/alert.hpp +++ b/include/libtorrent/alert.hpp @@ -58,6 +58,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/time.hpp" #include "libtorrent/config.hpp" #include "libtorrent/assert.hpp" +#include "libtorrent/socket.hpp" // for io_service #ifndef TORRENT_MAX_ALERT_TYPES #define TORRENT_MAX_ALERT_TYPES 15 @@ -114,7 +115,7 @@ namespace libtorrent { public: enum { queue_size_limit_default = 1000 }; - alert_manager(); + alert_manager(io_service& ios); ~alert_manager(); void post_alert(const alert& alert_); @@ -149,6 +150,7 @@ namespace libtorrent { int m_alert_mask; size_t m_queue_size_limit; boost::function m_dispatch; + io_service& m_ios; }; struct TORRENT_EXPORT unhandled_alert : std::exception diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c40a1f13c..bef6f705f 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -48,7 +48,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#include #include #include @@ -162,7 +161,7 @@ namespace libtorrent // must be locked to access the data // in this struct - typedef boost::recursive_mutex mutex_t; + typedef boost::mutex mutex_t; mutable mutex_t m_mutex; boost::weak_ptr find_torrent(const sha1_hash& info_hash); @@ -183,7 +182,7 @@ namespace libtorrent void start_dht(entry const& startup_state); void stop_dht(); - entry dht_state() const; + entry dht_state(session_impl::mutex_t::scoped_lock& l) const; void maybe_update_udp_mapping(int nat, int local_port, int external_port); #endif @@ -333,9 +332,6 @@ namespace libtorrent m_total_failed_bytes += b; } - // handles delayed alerts - alert_manager m_alerts; - std::pair allocate_buffer(int size); void free_buffer(char* buf, int size); @@ -347,7 +343,7 @@ namespace libtorrent // private: - void dht_state_callback(boost::condition& c + void on_dht_state_callback(boost::condition& c , entry& e, bool& done) const; void on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih); void setup_socket_buffers(socket_type& s); @@ -408,6 +404,9 @@ namespace libtorrent // them mutable io_service m_io_service; + // handles delayed alerts + alert_manager m_alerts; + // handles disk io requests asynchronously // peers have pointers into the disk buffer // pool, and must be destructed before this diff --git a/include/libtorrent/connection_queue.hpp b/include/libtorrent/connection_queue.hpp index abfce1de1..c5b8c8939 100644 --- a/include/libtorrent/connection_queue.hpp +++ b/include/libtorrent/connection_queue.hpp @@ -36,7 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include +#include #include "libtorrent/socket.hpp" #include "libtorrent/time.hpp" @@ -71,8 +71,11 @@ public: private: - void try_connect(); + typedef boost::mutex mutex_t; + + void try_connect(mutex_t::scoped_lock& l); void on_timeout(error_code const& e); + void on_try_connect(); struct entry { @@ -98,7 +101,6 @@ private: deadline_timer m_timer; - typedef boost::recursive_mutex mutex_t; mutable mutex_t m_mutex; #ifdef TORRENT_DEBUG diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index d87042e4e..ca0fd0adf 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -307,7 +307,7 @@ namespace libtorrent ptime connected_time() const { return m_connect; } ptime last_received() const { return m_last_receive; } - void timed_out(); + void on_timeout(); // this will cause this peer_connection to be disconnected. void disconnect(char const* message, int error = 0); bool is_disconnecting() const { return m_disconnecting; } @@ -330,7 +330,7 @@ namespace libtorrent // initiate the tcp connection. This may be postponed until // the library isn't using up the limitation of half-open // tcp connections. - void connect(int ticket); + void on_connect(int ticket); // This is called for every peer right after the upload // bandwidth has been distributed among them diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index ba2fa5dee..075bc7b84 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -152,7 +152,8 @@ namespace libtorrent void on_resume_data_checked(int ret, disk_io_job const& j); void on_force_recheck(int ret, disk_io_job const& j); void on_piece_checked(int ret, disk_io_job const& j); - void files_checked(); + void files_checked_lock(); + void files_checked(aux::session_impl::mutex_t::scoped_lock const&); void start_checking(); void start_announcing(); diff --git a/src/alert.cpp b/src/alert.cpp index 6da97ed0a..103c0b407 100644 --- a/src/alert.cpp +++ b/src/alert.cpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/alert.hpp" #include #include +#include namespace libtorrent { @@ -42,9 +43,10 @@ namespace libtorrent { alert::~alert() {} ptime alert::timestamp() const { return m_timestamp; } - alert_manager::alert_manager() + alert_manager::alert_manager(io_service& ios) : m_alert_mask(alert::error_notification) , m_queue_size_limit(queue_size_limit_default) + , m_ios(ios) {} alert_manager::~alert_manager() @@ -76,9 +78,10 @@ namespace libtorrent { xt.nsec = boost::xtime::xtime_nsec_t(nsec); // apparently this call can be interrupted // prematurely if there are other signals - if (!m_condition.timed_wait(lock, xt)) return 0; - if (m_alerts.empty()) return 0; - return m_alerts.front(); + while (m_condition.timed_wait(lock, xt)) + if (!m_alerts.empty()) return m_alerts.front(); + + return 0; } void alert_manager::set_dispatch_function(boost::function const& fun) @@ -87,14 +90,25 @@ namespace libtorrent { m_dispatch = fun; - while (!m_alerts.empty()) + std::queue alerts = m_alerts; + while (!m_alerts.empty()) m_alerts.pop(); + lock.unlock(); + + while (!alerts.empty()) { - m_dispatch(*m_alerts.front()); - delete m_alerts.front(); - m_alerts.pop(); + m_dispatch(*alerts.front()); + delete alerts.front(); + alerts.pop(); } } + void dispatch_alert(boost::function dispatcher + , alert* alert_) + { + std::auto_ptr holder(alert_); + dispatcher(*alert_); + } + void alert_manager::post_alert(const alert& alert_) { boost::mutex::scoped_lock lock(m_mutex); @@ -102,7 +116,7 @@ namespace libtorrent { if (m_dispatch) { TORRENT_ASSERT(m_alerts.empty()); - m_dispatch(alert_); + m_ios.post(boost::bind(&dispatch_alert, m_dispatch, alert_.clone().release())); return; } diff --git a/src/connection_queue.cpp b/src/connection_queue.cpp index 131ec9725..e6af84726 100644 --- a/src/connection_queue.cpp +++ b/src/connection_queue.cpp @@ -90,7 +90,11 @@ namespace libtorrent e->ticket = m_next_ticket; e->timeout = timeout; ++m_next_ticket; - try_connect(); + + if (m_num_connecting < m_half_open_limit + || m_half_open_limit == 0) + m_timer.get_io_service().post(boost::bind( + &connection_queue::on_try_connect, this)); } void connection_queue::done(int ticket) @@ -108,7 +112,11 @@ namespace libtorrent } if (i->connecting) --m_num_connecting; m_queue.erase(i); - try_connect(); + + if (m_num_connecting < m_half_open_limit + || m_half_open_limit == 0) + m_timer.get_io_service().post(boost::bind( + &connection_queue::on_try_connect, this)); } void connection_queue::close() @@ -126,7 +134,13 @@ namespace libtorrent m_queue.pop_front(); if (e.connecting) --m_num_connecting; l.unlock(); - try { e.on_timeout(); } catch (std::exception&) {} +#ifndef BOOST_NO_EXCEPTIONS + try { +#endif + e.on_timeout(); +#ifndef BOOST_NO_EXCEPTIONS + } catch (std::exception&) {} +#endif l.lock(); } } @@ -155,9 +169,10 @@ namespace libtorrent #endif - void connection_queue::try_connect() + void connection_queue::try_connect(connection_queue::mutex_t::scoped_lock& l) { INVARIANT_CHECK; + TORRENT_ASSERT(l.owns_lock()); #ifdef TORRENT_CONNECTION_LOGGING m_log << log_time() << " " << free_slots() << std::endl; @@ -206,6 +221,8 @@ namespace libtorrent i = std::find_if(i, m_queue.end(), boost::bind(&entry::connecting, _1) == false); } + l.unlock(); + while (!to_connect.empty()) { entry& ent = to_connect.front(); @@ -218,6 +235,7 @@ namespace libtorrent #endif to_connect.pop_front(); } + } #ifdef TORRENT_DEBUG @@ -268,7 +286,13 @@ namespace libtorrent for (std::list::iterator i = timed_out.begin() , end(timed_out.end()); i != end; ++i) { - try { i->on_timeout(); } catch (std::exception&) {} +#ifndef BOOST_NO_EXCEPTIONS + try { +#endif + i->on_timeout(); +#ifndef BOOST_NO_EXCEPTIONS + } catch (std::exception&) {} +#endif } l.lock(); @@ -279,8 +303,13 @@ namespace libtorrent m_timer.expires_at(next_expire, ec); m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1)); } - try_connect(); + try_connect(l); } + void connection_queue::on_try_connect() + { + mutex_t::scoped_lock l(m_mutex); + try_connect(l); + } } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 271ef2b60..c16998324 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -2707,8 +2707,10 @@ namespace libtorrent } } - void peer_connection::timed_out() + void peer_connection::on_timeout() { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + TORRENT_ASSERT(m_connecting); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING error_code ec; @@ -2723,8 +2725,6 @@ namespace libtorrent // 2 protocol error (client sent something invalid) void peer_connection::disconnect(char const* message, int error) { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - #ifdef TORRENT_DEBUG m_disconnect_started = true; #endif @@ -3545,8 +3545,6 @@ namespace libtorrent void peer_connection::assign_bandwidth(int channel, int amount) { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - #ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n"; #endif @@ -3608,8 +3606,6 @@ namespace libtorrent void peer_connection::setup_send() { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - if (m_channel_state[upload_channel] != peer_info::bw_idle) return; shared_ptr t = m_torrent.lock(); @@ -3690,8 +3686,6 @@ namespace libtorrent void peer_connection::setup_receive() { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - INVARIANT_CHECK; if (m_channel_state[download_channel] != peer_info::bw_idle) return; @@ -4098,8 +4092,9 @@ namespace libtorrent return ret; } - void peer_connection::connect(int ticket) + void peer_connection::on_connect(int ticket) { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); #ifdef TORRENT_DEBUG // in case we disconnect here, we need to // keep the connection alive until the diff --git a/src/session.cpp b/src/session.cpp index 187399730..c9cfa4493 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -301,6 +301,7 @@ namespace libtorrent session::~session() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); #ifdef TORRENT_MEMDEBUG stop_malloc_debug(); #endif @@ -315,6 +316,7 @@ namespace libtorrent #ifndef TORRENT_DISABLE_EXTENSIONS void session::add_extension(boost::function(torrent*, void*)> ext) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->add_extension(ext); } #endif @@ -322,11 +324,13 @@ namespace libtorrent #ifndef TORRENT_DISABLE_GEO_IP bool session::load_asnum_db(char const* file) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->load_asnum_db(file); } bool session::load_country_db(char const* file) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->load_country_db(file); } @@ -339,11 +343,13 @@ namespace libtorrent #ifndef BOOST_FILESYSTEM_NARROW_ONLY bool session::load_asnum_db(wchar_t const* file) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->load_asnum_db(file); } bool session::load_country_db(wchar_t const* file) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->load_country_db(file); } #endif @@ -351,57 +357,69 @@ namespace libtorrent void session::load_state(entry const& ses_state) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->load_state(ses_state); } entry session::state() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->state(); } void session::set_ip_filter(ip_filter const& f) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_ip_filter(f); } void session::set_port_filter(port_filter const& f) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_port_filter(f); } void session::set_peer_id(peer_id const& id) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_peer_id(id); } peer_id session::id() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->get_peer_id(); } io_service& session::get_io_service() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->m_io_service; } void session::set_key(int key) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_key(key); } std::vector session::get_torrents() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->get_torrents(); } torrent_handle session::find_torrent(sha1_hash const& info_hash) const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->find_torrent_handle(info_hash); } #ifndef BOOST_NO_EXCEPTIONS torrent_handle session::add_torrent(add_torrent_params const& params) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); + error_code ec; torrent_handle ret = m_impl->add_torrent(params, ec); if (ec) throw libtorrent_exception(ec); @@ -411,6 +429,7 @@ namespace libtorrent torrent_handle session::add_torrent(add_torrent_params const& params, error_code& ec) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->add_torrent(params, ec); } @@ -488,6 +507,7 @@ namespace libtorrent void session::remove_torrent(const torrent_handle& h, int options) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->remove_torrent(h, options); } @@ -495,31 +515,50 @@ namespace libtorrent std::pair const& port_range , const char* net_interface) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->listen_on(port_range, net_interface); } unsigned short session::listen_port() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->listen_port(); } session_status session::status() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->status(); } - void session::pause() { m_impl->pause(); } - void session::resume() { m_impl->resume(); } - bool session::is_paused() const { return m_impl->is_paused(); } + void session::pause() + { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); + m_impl->pause(); + } + + void session::resume() + { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); + m_impl->resume(); + } + + bool session::is_paused() const + { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); + return m_impl->is_paused(); + } void session::get_cache_info(sha1_hash const& ih , std::vector& ret) const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->m_disk_thread.get_cache_info(ih, ret); } cache_status session::get_cache_status() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->m_disk_thread.status(); } @@ -527,31 +566,37 @@ namespace libtorrent void session::start_dht(entry const& startup_state) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->start_dht(startup_state); } void session::stop_dht() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->stop_dht(); } void session::set_dht_settings(dht_settings const& settings) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_dht_settings(settings); } entry session::dht_state() const { - return m_impl->dht_state(); + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); + return m_impl->dht_state(l); } void session::add_dht_node(std::pair const& node) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->add_dht_node(node); } void session::add_dht_router(std::pair const& node) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->add_dht_router(node); } @@ -560,57 +605,68 @@ namespace libtorrent #ifndef TORRENT_DISABLE_ENCRYPTION void session::set_pe_settings(pe_settings const& settings) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_pe_settings(settings); } pe_settings const& session::get_pe_settings() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->get_pe_settings(); } #endif bool session::is_listening() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->is_listening(); } void session::set_settings(session_settings const& s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_settings(s); } session_settings const& session::settings() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->settings(); } void session::set_peer_proxy(proxy_settings const& s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_peer_proxy(s); } void session::set_web_seed_proxy(proxy_settings const& s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_web_seed_proxy(s); } void session::set_tracker_proxy(proxy_settings const& s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_tracker_proxy(s); } proxy_settings const& session::peer_proxy() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->peer_proxy(); } proxy_settings const& session::web_seed_proxy() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->web_seed_proxy(); } proxy_settings const& session::tracker_proxy() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->tracker_proxy(); } @@ -618,42 +674,50 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT void session::set_dht_proxy(proxy_settings const& s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_dht_proxy(s); } proxy_settings const& session::dht_proxy() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->dht_proxy(); } #endif int session::max_uploads() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->max_uploads(); } void session::set_max_uploads(int limit) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_max_uploads(limit); } int session::max_connections() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->max_connections(); } void session::set_max_connections(int limit) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_max_connections(limit); } int session::max_half_open_connections() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->max_half_open_connections(); } void session::set_max_half_open_connections(int limit) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_max_half_open_connections(limit); } @@ -669,11 +733,13 @@ namespace libtorrent int session::upload_rate_limit() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->upload_rate_limit(); } int session::download_rate_limit() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->download_rate_limit(); } @@ -689,26 +755,31 @@ namespace libtorrent void session::set_upload_rate_limit(int bytes_per_second) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_upload_rate_limit(bytes_per_second); } void session::set_download_rate_limit(int bytes_per_second) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_download_rate_limit(bytes_per_second); } int session::num_uploads() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->num_uploads(); } int session::num_connections() const { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->num_connections(); } std::auto_ptr session::pop_alert() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->pop_alert(); } @@ -724,17 +795,20 @@ namespace libtorrent void session::set_alert_mask(int m) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->set_alert_mask(m); } size_t session::set_alert_queue_size_limit(size_t queue_size_limit_) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->set_alert_queue_size_limit(queue_size_limit_); } #ifndef TORRENT_NO_DEPRECATE void session::set_severity_level(alert::severity_t s) { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); int m = 0; switch (s) { @@ -755,36 +829,43 @@ namespace libtorrent void session::start_lsd() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->start_lsd(); } natpmp* session::start_natpmp() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->start_natpmp(); } upnp* session::start_upnp() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->start_upnp(); } void session::stop_lsd() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->stop_lsd(); } void session::stop_natpmp() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->stop_natpmp(); } void session::stop_upnp() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); m_impl->stop_upnp(); } connection_queue& session::get_connection_queue() { + session_impl::mutex_t::scoped_lock l(m_impl->m_mutex); return m_impl->m_half_open; } } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 8a37702b0..f9904547b 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -173,6 +173,7 @@ namespace aux { #endif , m_files(40) , m_io_service() + , m_alerts(m_io_service) , m_disk_thread(m_io_service) , m_half_open(m_io_service) , m_download_rate(peer_connection::download_channel) @@ -379,7 +380,6 @@ namespace aux { bool session_impl::load_asnum_db(char const* file) { - mutex_t::scoped_lock l(m_mutex); if (m_asnum_db) GeoIP_delete(m_asnum_db); m_asnum_db = GeoIP_open(file, GEOIP_STANDARD); return m_asnum_db; @@ -388,7 +388,6 @@ namespace aux { #ifndef BOOST_FILESYSTEM_NARROW_ONLY bool session_impl::load_asnum_db(wchar_t const* file) { - mutex_t::scoped_lock l(m_mutex); if (m_asnum_db) GeoIP_delete(m_asnum_db); std::string utf8; wchar_utf8(file, utf8); @@ -398,7 +397,6 @@ namespace aux { bool session_impl::load_country_db(wchar_t const* file) { - mutex_t::scoped_lock l(m_mutex); if (m_country_db) GeoIP_delete(m_country_db); std::string utf8; wchar_utf8(file, utf8); @@ -409,7 +407,6 @@ namespace aux { bool session_impl::load_country_db(char const* file) { - mutex_t::scoped_lock l(m_mutex); if (m_country_db) GeoIP_delete(m_country_db); m_country_db = GeoIP_open(file, GEOIP_STANDARD); return m_country_db; @@ -420,7 +417,6 @@ namespace aux { void session_impl::load_state(entry const& ses_state) { if (ses_state.type() != entry::dictionary_t) return; - mutex_t::scoped_lock l(m_mutex); #ifndef TORRENT_DISABLE_GEO_IP entry const* as_map = ses_state.find_key("AS map"); if (as_map && as_map->type() == entry::dictionary_t) @@ -440,7 +436,6 @@ namespace aux { entry session_impl::state() const { - mutex_t::scoped_lock l(m_mutex); entry ret; #ifndef TORRENT_DISABLE_GEO_IP entry::dictionary_type& as_map = ret["AS map"].dict(); @@ -484,7 +479,6 @@ namespace aux { void session_impl::pause() { - mutex_t::scoped_lock l(m_mutex); if (m_paused) return; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING (*m_logger) << time_now_string() << " *** session paused ***\n"; @@ -500,7 +494,6 @@ namespace aux { void session_impl::resume() { - mutex_t::scoped_lock l(m_mutex); if (!m_paused) return; m_paused = false; for (torrent_map::iterator i = m_torrents.begin() @@ -513,7 +506,6 @@ namespace aux { void session_impl::abort() { - mutex_t::scoped_lock l(m_mutex); if (m_abort) return; #if defined TORRENT_LOGGING (*m_logger) << time_now_string() << " *** ABORT CALLED ***\n"; @@ -571,7 +563,9 @@ namespace aux { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " aborting all connections (" << m_connections.size() << ")\n"; #endif - m_half_open.close(); + // closing all the connections needs to be done from a callback, + // when the session mutex is not held + m_io_service.post(boost::bind(&connection_queue::close, &m_half_open)); #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " connection queue: " << m_half_open.size() << "\n"; @@ -590,7 +584,6 @@ namespace aux { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " connection queue: " << m_half_open.size() << "\n"; #endif - TORRENT_ASSERT(m_half_open.size() == 0); #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " shutting down connection queue\n"; @@ -602,14 +595,11 @@ namespace aux { void session_impl::set_port_filter(port_filter const& f) { - mutex_t::scoped_lock l(m_mutex); m_port_filter = f; } void session_impl::set_ip_filter(ip_filter const& f) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; m_ip_filter = f; @@ -623,8 +613,6 @@ namespace aux { void session_impl::set_settings(session_settings const& s) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; TORRENT_ASSERT(s.file_pool_size > 0); @@ -922,7 +910,6 @@ namespace aux { if (e == asio::error::operation_aborted) return; - mutex_t::scoped_lock l(m_mutex); if (m_abort) return; error_code ec; @@ -1087,8 +1074,6 @@ namespace aux { void session_impl::close_connection(peer_connection const* p , char const* message) { - mutex_t::scoped_lock l(m_mutex); - // too expensive // INVARIANT_CHECK; @@ -1116,13 +1101,11 @@ namespace aux { void session_impl::set_peer_id(peer_id const& id) { - mutex_t::scoped_lock l(m_mutex); m_peer_id = id; } void session_impl::set_key(int key) { - mutex_t::scoped_lock l(m_mutex); m_key = key; } @@ -1961,9 +1944,10 @@ namespace aux { { eh_initializer(); + if (m_listen_interface.port() != 0) { session_impl::mutex_t::scoped_lock l(m_mutex); - if (m_listen_interface.port() != 0) open_listen_port(); + open_listen_port(); } do @@ -2036,7 +2020,6 @@ namespace aux { std::vector session_impl::get_torrents() { - mutex_t::scoped_lock l(m_mutex); std::vector ret; for (session_impl::torrent_map::iterator i @@ -2065,9 +2048,6 @@ namespace aux { return torrent_handle(); } - // lock the session and the checker thread (the order is important!) - mutex_t::scoped_lock l(m_mutex); - // INVARIANT_CHECK; if (is_aborted()) @@ -2179,8 +2159,6 @@ namespace aux { throw_invalid_handle(); #endif - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; session_impl::torrent_map::iterator i = @@ -2210,8 +2188,6 @@ namespace aux { std::pair const& port_range , const char* net_interface) { - session_impl::mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; tcp::endpoint new_interface; @@ -2267,14 +2243,12 @@ namespace aux { unsigned short session_impl::listen_port() const { - mutex_t::scoped_lock l(m_mutex); if (m_listen_sockets.empty()) return 0; return m_listen_sockets.front().external_port; } void session_impl::announce_lsd(sha1_hash const& ih) { - mutex_t::scoped_lock l(m_mutex); // use internal listen port for local peers if (m_lsd.get()) m_lsd->announce(ih, m_listen_interface.port()); @@ -2353,8 +2327,6 @@ namespace aux { session_status session_impl::status() const { - mutex_t::scoped_lock l(m_mutex); - // INVARIANT_CHECK; session_status s; @@ -2428,8 +2400,6 @@ namespace aux { void session_impl::start_dht(entry const& startup_state) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (m_dht) @@ -2506,7 +2476,6 @@ namespace aux { void session_impl::stop_dht() { - mutex_t::scoped_lock l(m_mutex); if (!m_dht) return; m_dht->stop(); m_dht = 0; @@ -2514,7 +2483,6 @@ namespace aux { void session_impl::set_dht_settings(dht_settings const& settings) { - mutex_t::scoped_lock l(m_mutex); // only change the dht listen port in case the settings // contains a vaiid port, and if it is different from // the current setting @@ -2537,7 +2505,7 @@ namespace aux { m_dht_settings.service_port = m_listen_interface.port(); } - void session_impl::dht_state_callback(boost::condition& c + void session_impl::on_dht_state_callback(boost::condition& c , entry& e, bool& done) const { mutex_t::scoped_lock l(m_mutex); @@ -2546,14 +2514,13 @@ namespace aux { c.notify_all(); } - entry session_impl::dht_state() const + entry session_impl::dht_state(session_impl::mutex_t::scoped_lock& l) const { boost::condition cond; - mutex_t::scoped_lock l(m_mutex); if (!m_dht) return entry(); entry e; bool done = false; - m_io_service.post(boost::bind(&session_impl::dht_state_callback + m_io_service.post(boost::bind(&session_impl::on_dht_state_callback , this, boost::ref(cond), boost::ref(e), boost::ref(done))); while (!done) cond.wait(l); return e; @@ -2562,14 +2529,12 @@ namespace aux { void session_impl::add_dht_node(std::pair const& node) { TORRENT_ASSERT(m_dht); - mutex_t::scoped_lock l(m_mutex); m_dht->add_node(node); } void session_impl::add_dht_router(std::pair const& node) { // router nodes should be added before the DHT is started (and bootstrapped) - mutex_t::scoped_lock l(m_mutex); if (m_dht) m_dht->add_router_node(node); else m_dht_router_nodes.push_back(node); } @@ -2579,25 +2544,26 @@ namespace aux { #ifndef TORRENT_DISABLE_ENCRYPTION void session_impl::set_pe_settings(pe_settings const& settings) { - mutex_t::scoped_lock l(m_mutex); m_pe_settings = settings; } #endif bool session_impl::is_listening() const { - mutex_t::scoped_lock l(m_mutex); return !m_listen_sockets.empty(); } session_impl::~session_impl() { + session_impl::mutex_t::scoped_lock l(m_mutex); + #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << "\n\n *** shutting down session *** \n\n"; #endif abort(); TORRENT_ASSERT(m_connections.empty()); + l.unlock(); // we need to wait for the disk-io thread to // die first, to make sure it won't post any // more messages to the io_service containing references @@ -2629,7 +2595,6 @@ namespace aux { void session_impl::set_max_uploads(int limit) { TORRENT_ASSERT(limit >= 0 || limit == -1); - mutex_t::scoped_lock l(m_mutex); INVARIANT_CHECK; @@ -2641,8 +2606,6 @@ namespace aux { void session_impl::set_max_connections(int limit) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (limit <= 0) @@ -2663,8 +2626,6 @@ namespace aux { void session_impl::set_max_half_open_connections(int limit) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (limit <= 0) limit = (std::numeric_limits::max)(); @@ -2693,8 +2654,6 @@ namespace aux { void session_impl::set_download_rate_limit(int bytes_per_second) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (bytes_per_second <= 0) bytes_per_second = 0; @@ -2703,8 +2662,6 @@ namespace aux { void session_impl::set_upload_rate_limit(int bytes_per_second) { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (bytes_per_second <= 0) bytes_per_second = 0; @@ -2713,14 +2670,11 @@ namespace aux { void session_impl::set_alert_dispatch(boost::function const& fun) { - mutex_t::scoped_lock l(m_mutex); m_alerts.set_dispatch_function(fun); } std::auto_ptr session_impl::pop_alert() { - mutex_t::scoped_lock l(m_mutex); - // too expensive // INVARIANT_CHECK; @@ -2736,13 +2690,11 @@ namespace aux { void session_impl::set_alert_mask(int m) { - mutex_t::scoped_lock l(m_mutex); m_alerts.set_alert_mask(m); } size_t session_impl::set_alert_queue_size_limit(size_t queue_size_limit_) { - mutex_t::scoped_lock l(m_mutex); return m_alerts.set_alert_queue_size_limit(queue_size_limit_); } @@ -2766,14 +2718,11 @@ namespace aux { int session_impl::download_rate_limit() const { - mutex_t::scoped_lock l(m_mutex); return m_download_channel.throttle(); } void session_impl::start_lsd() { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (m_lsd) return; @@ -2785,8 +2734,6 @@ namespace aux { natpmp* session_impl::start_natpmp() { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (m_natpmp) return m_natpmp.get(); @@ -2812,8 +2759,6 @@ namespace aux { upnp* session_impl::start_upnp() { - mutex_t::scoped_lock l(m_mutex); - INVARIANT_CHECK; if (m_upnp) return m_upnp.get(); @@ -2842,7 +2787,6 @@ namespace aux { void session_impl::stop_lsd() { - mutex_t::scoped_lock l(m_mutex); if (m_lsd.get()) m_lsd->close(); m_lsd = 0; @@ -2850,7 +2794,6 @@ namespace aux { void session_impl::stop_natpmp() { - mutex_t::scoped_lock l(m_mutex); if (m_natpmp.get()) m_natpmp->close(); m_natpmp = 0; @@ -2858,7 +2801,6 @@ namespace aux { void session_impl::stop_upnp() { - mutex_t::scoped_lock l(m_mutex); if (m_upnp.get()) { m_upnp->close(); diff --git a/src/torrent.cpp b/src/torrent.cpp index 9e7bebde5..37031931e 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -546,7 +546,7 @@ namespace libtorrent if (m_seed_mode) { - m_ses.m_io_service.post(boost::bind(&torrent::files_checked, shared_from_this())); + m_ses.m_io_service.post(boost::bind(&torrent::files_checked_lock, shared_from_this())); std::vector().swap(m_resume_data); lazy_entry().swap(m_resume_entry); return; @@ -803,7 +803,7 @@ namespace libtorrent } } - files_checked(); + files_checked(l); } else { @@ -894,7 +894,7 @@ namespace libtorrent if (ret == 0) { // if there are no files, just start - files_checked(); + files_checked(l); } else { @@ -954,7 +954,7 @@ namespace libtorrent if (ret == piece_manager::need_full_check) return; dequeue_torrent_check(); - files_checked(); + files_checked(l); } void torrent::use_interface(const char* net_interface) @@ -1645,8 +1645,6 @@ namespace libtorrent // -2: piece failed check void torrent::piece_finished(int index, int passed_hash_check) { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING (*m_ses.m_logger) << time_now_string() << " *** PIECE_FINISHED [ p: " << index << " chk: " << ((passed_hash_check == 0) @@ -2953,8 +2951,8 @@ namespace libtorrent c->start(); m_ses.m_half_open.enqueue( - bind(&peer_connection::connect, c, _1) - , bind(&peer_connection::timed_out, c) + bind(&peer_connection::on_connect, c, _1) + , bind(&peer_connection::on_timeout, c) , seconds(settings().peer_connect_timeout)); #ifndef BOOST_NO_EXCEPTIONS } @@ -3669,8 +3667,8 @@ namespace libtorrent { #endif m_ses.m_half_open.enqueue( - bind(&peer_connection::connect, c, _1) - , bind(&peer_connection::timed_out, c) + bind(&peer_connection::on_connect, c, _1) + , bind(&peer_connection::on_timeout, c) , seconds(timeout)); #ifndef BOOST_NO_EXCEPTIONS } @@ -3823,8 +3821,6 @@ namespace libtorrent void torrent::disconnect_all() { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - // doesn't work with the m_paused -> m_num_peers == 0 condition // INVARIANT_CHECK; @@ -4045,10 +4041,14 @@ namespace libtorrent return index; } - void torrent::files_checked() + void torrent::files_checked_lock() { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - + files_checked(l); + } + + void torrent::files_checked(session_impl::mutex_t::scoped_lock const& l) + { TORRENT_ASSERT(m_torrent_file->is_valid()); if (m_abort) return; @@ -4211,8 +4211,6 @@ namespace libtorrent #ifdef TORRENT_DEBUG void torrent::check_invariant() const { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - if (is_paused()) TORRENT_ASSERT(num_peers() == 0); if (!should_check_files()) diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 40966ed35..69c614ba8 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -183,7 +183,7 @@ namespace libtorrent void tracker_manager::sent_bytes(int bytes) { - aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); +// aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); m_ses.m_stat.sent_tracker_bytes(bytes); } diff --git a/test/test_pex.cpp b/test/test_pex.cpp index 680e96876..16aa2ae15 100644 --- a/test/test_pex.cpp +++ b/test/test_pex.cpp @@ -81,6 +81,11 @@ void test_pex() boost::tie(tor1, tor2, tor3) = setup_transfer(&ses1, &ses2, &ses3, true, false, false, "_pex"); + int mask = alert::all_categories & ~(alert::progress_notification | alert::performance_warning); + ses1.set_alert_mask(mask); + ses2.set_alert_mask(mask); + ses3.set_alert_mask(mask); + test_sleep(1000); // in this test, ses1 is a seed, ses2 is connected to ses1 and ses3. diff --git a/test/test_swarm.cpp b/test/test_swarm.cpp index 8bc5a4a5e..553d476f3 100644 --- a/test/test_swarm.cpp +++ b/test/test_swarm.cpp @@ -54,12 +54,8 @@ void test_swarm(bool super_seeding = false, bool strict = false, bool seed_mode try { remove_all("./tmp3_swarm"); } catch (std::exception&) {} session ses1(fingerprint("LT", 0, 1, 0, 0), std::make_pair(48000, 49000), "0.0.0.0", 0); - ses1.set_alert_mask(alert::all_categories & ~alert::progress_notification); session ses2(fingerprint("LT", 0, 1, 0, 0), std::make_pair(49000, 50000), "0.0.0.0", 0); - ses2.set_alert_mask(alert::all_categories & ~alert::progress_notification); session ses3(fingerprint("LT", 0, 1, 0, 0), std::make_pair(50000, 51000), "0.0.0.0", 0); - ses3.set_alert_mask(alert::all_categories & ~alert::progress_notification); - // this is to avoid everything finish from a single peer // immediately. To make the swarm actually connect all @@ -98,6 +94,11 @@ void test_swarm(bool super_seeding = false, bool strict = false, bool seed_mode boost::tie(tor1, tor2, tor3) = setup_transfer(&ses1, &ses2, &ses3, true , false, true, "_swarm", 32 * 1024, 0, super_seeding, &p); + int mask = alert::all_categories & ~(alert::progress_notification | alert::performance_warning); + ses1.set_alert_mask(mask); + ses2.set_alert_mask(mask); + ses3.set_alert_mask(mask); + if (time_critical) { tor2.set_piece_deadline(2, seconds(0));