From 8808eb7cdd6f3394b76e2b58df3fa37c1841f6d1 Mon Sep 17 00:00:00 2001 From: arvidn Date: Mon, 30 Apr 2018 00:34:41 +0200 Subject: [PATCH] introduce a recursive mutex to protect the alert_manager, and hold the mutex while calling user callbacks and plugin hooks. This is required to ensure the underlying storage for alert allocations (in the stack allocators) is synchronized with the thread that may be accessing it --- include/libtorrent/alert_manager.hpp | 24 ++-- .../aux_/alert_manager_variadic_emplace.hpp | 4 +- include/libtorrent/thread.hpp | 44 ++++++- src/alert_manager.cpp | 27 ++--- src/thread.cpp | 113 +++++++++++++++++- test/test_alert_manager.cpp | 29 +++++ 6 files changed, 206 insertions(+), 35 deletions(-) diff --git a/include/libtorrent/alert_manager.hpp b/include/libtorrent/alert_manager.hpp index 8449b09f4..6c7ead1c0 100644 --- a/include/libtorrent/alert_manager.hpp +++ b/include/libtorrent/alert_manager.hpp @@ -47,11 +47,12 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include -#include // for std::forward #include "libtorrent/aux_/disable_warnings_pop.hpp" +#include +#include // for std::forward + #ifdef __GNUC__ // this is to suppress the warnings for using std::auto_ptr #pragma GCC diagnostic push @@ -80,7 +81,7 @@ namespace libtorrent { template void emplace_alert(Args&&... args) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); #ifndef TORRENT_NO_DEPRECATE if (m_dispatch) { @@ -99,7 +100,7 @@ namespace libtorrent { T alert(m_allocations[m_generation], std::forward(args)...); m_alerts[m_generation].push_back(alert); - maybe_notify(&alert, lock); + maybe_notify(&alert); } #else @@ -116,7 +117,7 @@ namespace libtorrent { template bool should_post() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); if (m_alerts[m_generation].size() >= m_queue_size_limit * (1 + T::priority)) { @@ -129,13 +130,13 @@ namespace libtorrent { void set_alert_mask(boost::uint32_t m) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_alert_mask = m; } boost::uint32_t alert_mask() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return m_alert_mask; } @@ -160,9 +161,14 @@ namespace libtorrent { alert_manager(alert_manager const&); alert_manager& operator=(alert_manager const&); - void maybe_notify(alert* a, mutex::scoped_lock& lock); + void maybe_notify(alert* a); - mutable mutex m_mutex; + // this mutex protects everything. Since it's held while executing user + // callbacks (the notify function and extension on_alert()) it must be + // recursive to post new alerts. This is implemented by storing the + // current thread-id in m_mutex_holder, if it matches ours, we don't need + // to lock + mutable recursive_mutex m_mutex; condition_variable m_condition; boost::uint32_t m_alert_mask; int m_queue_size_limit; diff --git a/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp b/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp index 5c360eb40..180632edd 100644 --- a/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp +++ b/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp @@ -24,7 +24,7 @@ BOOST_PP_ENUM_PARAMS(I, typename A)> void emplace_alert(BOOST_PP_ENUM_BINARY_PARAMS(I, A, const& a) ) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); #ifndef TORRENT_NO_DEPRECATE if (m_dispatch) { @@ -46,7 +46,7 @@ BOOST_PP_ENUM_PARAMS(I, a)); m_alerts[m_generation].push_back(alert); - maybe_notify(&alert, lock); + maybe_notify(&alert); } #undef I diff --git a/include/libtorrent/thread.hpp b/include/libtorrent/thread.hpp index abdef22ca..616a2cad6 100644 --- a/include/libtorrent/thread.hpp +++ b/include/libtorrent/thread.hpp @@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include "libtorrent/aux_/disable_warnings_pop.hpp" @@ -66,15 +67,25 @@ namespace libtorrent // internal void sleep(int milliseconds); + struct recursive_mutex; + struct TORRENT_EXTRA_EXPORT condition_variable { condition_variable(); ~condition_variable(); void wait(mutex::scoped_lock& l); - void wait_for(mutex::scoped_lock& l, time_duration rel_time); + void wait_for(mutex::scoped_lock& l, time_duration); + void wait(boost::asio::detail::scoped_lock& l); + void wait_for(boost::asio::detail::scoped_lock&, time_duration); void notify_all(); void notify(); private: + + template + void wait_impl(LockGuard& l); + template + void wait_for_impl(LockGuard& l, time_duration rel_time); + #ifdef BOOST_HAS_PTHREADS pthread_cond_t m_cond; #elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN @@ -89,6 +100,37 @@ namespace libtorrent #error not implemented #endif }; + +#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + typedef DWORD thread_id_t; +#elif defined TORRENT_BEOS + typedef thread_id thread_id_t; +#endif + + // internal + struct recursive_mutex + { + typedef boost::asio::detail::scoped_lock scoped_lock; + + recursive_mutex(); + ~recursive_mutex(); + + void lock(); + void unlock(); + + private: + recursive_mutex(recursive_mutex const&); + recursive_mutex& operator=(recursive_mutex const&); + +#ifdef BOOST_HAS_PTHREADS + ::pthread_mutex_t m_mutex; +#else + mutex m_mutex; + condition_variable m_cond; + thread_id_t m_owner; + int m_count; +#endif + }; } #endif diff --git a/src/alert_manager.cpp b/src/alert_manager.cpp index 13d6a8083..47456d002 100644 --- a/src/alert_manager.cpp +++ b/src/alert_manager.cpp @@ -52,13 +52,13 @@ namespace libtorrent int alert_manager::num_queued_resume() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return m_num_queued_resume; } alert* alert_manager::wait_for_alert(time_duration max_wait) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); if (!m_alerts[m_generation].empty()) return m_alerts[m_generation].front(); @@ -71,7 +71,7 @@ namespace libtorrent return NULL; } - void alert_manager::maybe_notify(alert* a, mutex::scoped_lock& lock) + void alert_manager::maybe_notify(alert* a) { if (a->type() == save_resume_data_failed_alert::alert_type || a->type() == save_resume_data_alert::alert_type) @@ -79,8 +79,6 @@ namespace libtorrent if (m_alerts[m_generation].size() == 1) { - lock.unlock(); - // we just posted to an empty queue. If anyone is waiting for // alerts, we need to notify them. Also (potentially) call the // user supplied m_notify callback to let the client wake up its @@ -91,10 +89,6 @@ namespace libtorrent // > 0 notify them m_condition.notify_all(); } - else - { - lock.unlock(); - } #ifndef TORRENT_DISABLE_EXTENSIONS for (ses_extension_list_t::iterator i = m_ses_extensions.begin() @@ -125,7 +119,7 @@ namespace libtorrent void alert_manager::set_dispatch_function( boost::function)> const& fun) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_dispatch = fun; @@ -151,12 +145,10 @@ namespace libtorrent void alert_manager::set_notify_function(boost::function const& fun) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_notify = fun; if (!m_alerts[m_generation].empty()) { - // never call a callback with the lock held! - lock.unlock(); if (m_notify) m_notify(); } } @@ -170,10 +162,11 @@ namespace libtorrent void alert_manager::get_all(std::vector& alerts, int& num_resume) { - mutex::scoped_lock lock(m_mutex); + alerts.clear(); + + recursive_mutex::scoped_lock lock(m_mutex); TORRENT_ASSERT(m_num_queued_resume <= m_alerts[m_generation].size()); - alerts.clear(); if (m_alerts[m_generation].empty()) return; m_alerts[m_generation].get_pointers(alerts); @@ -189,13 +182,13 @@ namespace libtorrent bool alert_manager::pending() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return !m_alerts[m_generation].empty(); } int alert_manager::set_alert_queue_size_limit(int queue_size_limit_) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); std::swap(m_queue_size_limit, queue_size_limit_); return queue_size_limit_; diff --git a/src/thread.cpp b/src/thread.cpp index 49a8c6ab4..1e12942c5 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -59,7 +59,88 @@ namespace libtorrent } #ifdef BOOST_HAS_PTHREADS + recursive_mutex::recursive_mutex() + { + ::pthread_mutexattr_t attr; + ::pthread_mutexattr_init(&attr); + ::pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + ::pthread_mutex_init(&m_mutex, &attr); + ::pthread_mutexattr_destroy(&attr); + } + recursive_mutex::~recursive_mutex() + { ::pthread_mutex_destroy(&m_mutex); } + + void recursive_mutex::lock() + { ::pthread_mutex_lock(&m_mutex); } + + void recursive_mutex::unlock() + { ::pthread_mutex_unlock(&m_mutex); } +#else + +namespace { +#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + thread_id_t thread_self() + { + return GetCurrentThreadId(); + } +#elif defined TORRENT_BEOS + thread_id_t thread_self() + { + return find_thread(NULL); + } +#endif +} + + recursive_mutex::recursive_mutex() + : m_owner() + , m_count(0) + {} + + recursive_mutex::~recursive_mutex() {} + + void recursive_mutex::lock() + { + thread_id_t const self = thread_self(); + + mutex::scoped_lock lock(m_mutex); + if (m_owner == self) + { + ++m_count; + } + else if (m_count == 0) + { + TORRENT_ASSERT(m_owner == thread_id_t()); + m_owner = self; + m_count = 1; + } + else + { + while (m_count != 0) + m_cond.wait(lock); + TORRENT_ASSERT(m_count == 0); + TORRENT_ASSERT(m_owner == thread_id_t()); + m_owner = self; + m_count = 1; + } + } + + void recursive_mutex::unlock() + { + thread_id_t const self = thread_self(); + mutex::scoped_lock lock(m_mutex); + TORRENT_ASSERT(m_owner == self); + TORRENT_ASSERT(m_count > 0); + + if (--m_count == 0) + { + m_owner = thread_id_t(); + m_cond.notify(); + } + } +#endif + +#ifdef BOOST_HAS_PTHREADS condition_variable::condition_variable() { pthread_cond_init(&m_cond, 0); @@ -70,14 +151,16 @@ namespace libtorrent pthread_cond_destroy(&m_cond); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(LockGuard& l) { TORRENT_ASSERT(l.locked()); // wow, this is quite a hack pthread_cond_wait(&m_cond, reinterpret_cast(&l.mutex())); } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); @@ -117,7 +200,8 @@ namespace libtorrent CloseHandle(m_sem); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(LockGuard& l) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -127,7 +211,8 @@ namespace libtorrent --m_num_waiters; } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -158,7 +243,8 @@ namespace libtorrent delete_sem(m_sem); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(mutex::scoped_lock& l) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -168,7 +254,8 @@ namespace libtorrent --m_num_waiters; } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -191,5 +278,19 @@ namespace libtorrent #error not implemented #endif + void condition_variable::wait(mutex::scoped_lock& l) + { wait_impl(l); } + void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + { wait_for_impl(l, rel_time); } + void condition_variable::wait(recursive_mutex::scoped_lock& l) + { wait_impl(l); } + void condition_variable::wait_for(recursive_mutex::scoped_lock& l, time_duration rel_time) + { wait_for_impl(l, rel_time); } + + // explicitly instantiate for these locks + template void condition_variable::wait_impl(mutex::scoped_lock&); + template void condition_variable::wait_impl(recursive_mutex::scoped_lock&); + template void condition_variable::wait_for_impl(mutex::scoped_lock&, time_duration); + template void condition_variable::wait_for_impl(recursive_mutex::scoped_lock&, time_duration); } diff --git a/test/test_alert_manager.cpp b/test/test_alert_manager.cpp index c8f97c74b..a1de83797 100644 --- a/test/test_alert_manager.cpp +++ b/test/test_alert_manager.cpp @@ -324,3 +324,32 @@ TORRENT_TEST(alert_mask) TEST_CHECK(!mgr.should_post()); } +#ifndef TORRENT_DISABLE_EXTENSIONS +struct post_plugin : lt::plugin +{ + post_plugin(alert_manager& m) : mgr(m), depth(0) {} + void on_alert(alert const* a) + { + if (++depth > 10) return; + mgr.emplace_alert(torrent_handle(), 0); + } + + alert_manager& mgr; + int depth; +}; + +// make sure the alert manager supports alerts being posted while executing a +// plugin handler +TORRENT_TEST(recursive_alerts) +{ + alert_manager mgr(100, 0xffffffff); + boost::shared_ptr pl = boost::make_shared(boost::ref(mgr)); + mgr.add_extension(pl); + + mgr.emplace_alert(torrent_handle(), 0); + + TEST_EQUAL(pl->depth, 11); +} + +#endif // TORRENT_DISABLE_EXTENSIONS +