diff --git a/include/libtorrent/alert_manager.hpp b/include/libtorrent/alert_manager.hpp index 8449b09f4..6c7ead1c0 100644 --- a/include/libtorrent/alert_manager.hpp +++ b/include/libtorrent/alert_manager.hpp @@ -47,11 +47,12 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include -#include // for std::forward #include "libtorrent/aux_/disable_warnings_pop.hpp" +#include +#include // for std::forward + #ifdef __GNUC__ // this is to suppress the warnings for using std::auto_ptr #pragma GCC diagnostic push @@ -80,7 +81,7 @@ namespace libtorrent { template void emplace_alert(Args&&... args) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); #ifndef TORRENT_NO_DEPRECATE if (m_dispatch) { @@ -99,7 +100,7 @@ namespace libtorrent { T alert(m_allocations[m_generation], std::forward(args)...); m_alerts[m_generation].push_back(alert); - maybe_notify(&alert, lock); + maybe_notify(&alert); } #else @@ -116,7 +117,7 @@ namespace libtorrent { template bool should_post() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); if (m_alerts[m_generation].size() >= m_queue_size_limit * (1 + T::priority)) { @@ -129,13 +130,13 @@ namespace libtorrent { void set_alert_mask(boost::uint32_t m) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_alert_mask = m; } boost::uint32_t alert_mask() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return m_alert_mask; } @@ -160,9 +161,14 @@ namespace libtorrent { alert_manager(alert_manager const&); alert_manager& operator=(alert_manager const&); - void maybe_notify(alert* a, mutex::scoped_lock& lock); + void maybe_notify(alert* a); - mutable mutex m_mutex; + // this mutex protects everything. Since it's held while executing user + // callbacks (the notify function and extension on_alert()) it must be + // recursive to post new alerts. This is implemented by storing the + // current thread-id in m_mutex_holder, if it matches ours, we don't need + // to lock + mutable recursive_mutex m_mutex; condition_variable m_condition; boost::uint32_t m_alert_mask; int m_queue_size_limit; diff --git a/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp b/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp index 5c360eb40..180632edd 100644 --- a/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp +++ b/include/libtorrent/aux_/alert_manager_variadic_emplace.hpp @@ -24,7 +24,7 @@ BOOST_PP_ENUM_PARAMS(I, typename A)> void emplace_alert(BOOST_PP_ENUM_BINARY_PARAMS(I, A, const& a) ) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); #ifndef TORRENT_NO_DEPRECATE if (m_dispatch) { @@ -46,7 +46,7 @@ BOOST_PP_ENUM_PARAMS(I, a)); m_alerts[m_generation].push_back(alert); - maybe_notify(&alert, lock); + maybe_notify(&alert); } #undef I diff --git a/include/libtorrent/thread.hpp b/include/libtorrent/thread.hpp index abdef22ca..616a2cad6 100644 --- a/include/libtorrent/thread.hpp +++ b/include/libtorrent/thread.hpp @@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include "libtorrent/aux_/disable_warnings_pop.hpp" @@ -66,15 +67,25 @@ namespace libtorrent // internal void sleep(int milliseconds); + struct recursive_mutex; + struct TORRENT_EXTRA_EXPORT condition_variable { condition_variable(); ~condition_variable(); void wait(mutex::scoped_lock& l); - void wait_for(mutex::scoped_lock& l, time_duration rel_time); + void wait_for(mutex::scoped_lock& l, time_duration); + void wait(boost::asio::detail::scoped_lock& l); + void wait_for(boost::asio::detail::scoped_lock&, time_duration); void notify_all(); void notify(); private: + + template + void wait_impl(LockGuard& l); + template + void wait_for_impl(LockGuard& l, time_duration rel_time); + #ifdef BOOST_HAS_PTHREADS pthread_cond_t m_cond; #elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN @@ -89,6 +100,37 @@ namespace libtorrent #error not implemented #endif }; + +#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + typedef DWORD thread_id_t; +#elif defined TORRENT_BEOS + typedef thread_id thread_id_t; +#endif + + // internal + struct recursive_mutex + { + typedef boost::asio::detail::scoped_lock scoped_lock; + + recursive_mutex(); + ~recursive_mutex(); + + void lock(); + void unlock(); + + private: + recursive_mutex(recursive_mutex const&); + recursive_mutex& operator=(recursive_mutex const&); + +#ifdef BOOST_HAS_PTHREADS + ::pthread_mutex_t m_mutex; +#else + mutex m_mutex; + condition_variable m_cond; + thread_id_t m_owner; + int m_count; +#endif + }; } #endif diff --git a/src/alert_manager.cpp b/src/alert_manager.cpp index 13d6a8083..47456d002 100644 --- a/src/alert_manager.cpp +++ b/src/alert_manager.cpp @@ -52,13 +52,13 @@ namespace libtorrent int alert_manager::num_queued_resume() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return m_num_queued_resume; } alert* alert_manager::wait_for_alert(time_duration max_wait) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); if (!m_alerts[m_generation].empty()) return m_alerts[m_generation].front(); @@ -71,7 +71,7 @@ namespace libtorrent return NULL; } - void alert_manager::maybe_notify(alert* a, mutex::scoped_lock& lock) + void alert_manager::maybe_notify(alert* a) { if (a->type() == save_resume_data_failed_alert::alert_type || a->type() == save_resume_data_alert::alert_type) @@ -79,8 +79,6 @@ namespace libtorrent if (m_alerts[m_generation].size() == 1) { - lock.unlock(); - // we just posted to an empty queue. If anyone is waiting for // alerts, we need to notify them. Also (potentially) call the // user supplied m_notify callback to let the client wake up its @@ -91,10 +89,6 @@ namespace libtorrent // > 0 notify them m_condition.notify_all(); } - else - { - lock.unlock(); - } #ifndef TORRENT_DISABLE_EXTENSIONS for (ses_extension_list_t::iterator i = m_ses_extensions.begin() @@ -125,7 +119,7 @@ namespace libtorrent void alert_manager::set_dispatch_function( boost::function)> const& fun) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_dispatch = fun; @@ -151,12 +145,10 @@ namespace libtorrent void alert_manager::set_notify_function(boost::function const& fun) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); m_notify = fun; if (!m_alerts[m_generation].empty()) { - // never call a callback with the lock held! - lock.unlock(); if (m_notify) m_notify(); } } @@ -170,10 +162,11 @@ namespace libtorrent void alert_manager::get_all(std::vector& alerts, int& num_resume) { - mutex::scoped_lock lock(m_mutex); + alerts.clear(); + + recursive_mutex::scoped_lock lock(m_mutex); TORRENT_ASSERT(m_num_queued_resume <= m_alerts[m_generation].size()); - alerts.clear(); if (m_alerts[m_generation].empty()) return; m_alerts[m_generation].get_pointers(alerts); @@ -189,13 +182,13 @@ namespace libtorrent bool alert_manager::pending() const { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); return !m_alerts[m_generation].empty(); } int alert_manager::set_alert_queue_size_limit(int queue_size_limit_) { - mutex::scoped_lock lock(m_mutex); + recursive_mutex::scoped_lock lock(m_mutex); std::swap(m_queue_size_limit, queue_size_limit_); return queue_size_limit_; diff --git a/src/thread.cpp b/src/thread.cpp index 49a8c6ab4..1e12942c5 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -59,7 +59,88 @@ namespace libtorrent } #ifdef BOOST_HAS_PTHREADS + recursive_mutex::recursive_mutex() + { + ::pthread_mutexattr_t attr; + ::pthread_mutexattr_init(&attr); + ::pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + ::pthread_mutex_init(&m_mutex, &attr); + ::pthread_mutexattr_destroy(&attr); + } + recursive_mutex::~recursive_mutex() + { ::pthread_mutex_destroy(&m_mutex); } + + void recursive_mutex::lock() + { ::pthread_mutex_lock(&m_mutex); } + + void recursive_mutex::unlock() + { ::pthread_mutex_unlock(&m_mutex); } +#else + +namespace { +#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + thread_id_t thread_self() + { + return GetCurrentThreadId(); + } +#elif defined TORRENT_BEOS + thread_id_t thread_self() + { + return find_thread(NULL); + } +#endif +} + + recursive_mutex::recursive_mutex() + : m_owner() + , m_count(0) + {} + + recursive_mutex::~recursive_mutex() {} + + void recursive_mutex::lock() + { + thread_id_t const self = thread_self(); + + mutex::scoped_lock lock(m_mutex); + if (m_owner == self) + { + ++m_count; + } + else if (m_count == 0) + { + TORRENT_ASSERT(m_owner == thread_id_t()); + m_owner = self; + m_count = 1; + } + else + { + while (m_count != 0) + m_cond.wait(lock); + TORRENT_ASSERT(m_count == 0); + TORRENT_ASSERT(m_owner == thread_id_t()); + m_owner = self; + m_count = 1; + } + } + + void recursive_mutex::unlock() + { + thread_id_t const self = thread_self(); + mutex::scoped_lock lock(m_mutex); + TORRENT_ASSERT(m_owner == self); + TORRENT_ASSERT(m_count > 0); + + if (--m_count == 0) + { + m_owner = thread_id_t(); + m_cond.notify(); + } + } +#endif + +#ifdef BOOST_HAS_PTHREADS condition_variable::condition_variable() { pthread_cond_init(&m_cond, 0); @@ -70,14 +151,16 @@ namespace libtorrent pthread_cond_destroy(&m_cond); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(LockGuard& l) { TORRENT_ASSERT(l.locked()); // wow, this is quite a hack pthread_cond_wait(&m_cond, reinterpret_cast(&l.mutex())); } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); @@ -117,7 +200,8 @@ namespace libtorrent CloseHandle(m_sem); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(LockGuard& l) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -127,7 +211,8 @@ namespace libtorrent --m_num_waiters; } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -158,7 +243,8 @@ namespace libtorrent delete_sem(m_sem); } - void condition_variable::wait(mutex::scoped_lock& l) + template + void condition_variable::wait_impl(mutex::scoped_lock& l) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -168,7 +254,8 @@ namespace libtorrent --m_num_waiters; } - void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + template + void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time) { TORRENT_ASSERT(l.locked()); ++m_num_waiters; @@ -191,5 +278,19 @@ namespace libtorrent #error not implemented #endif + void condition_variable::wait(mutex::scoped_lock& l) + { wait_impl(l); } + void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time) + { wait_for_impl(l, rel_time); } + void condition_variable::wait(recursive_mutex::scoped_lock& l) + { wait_impl(l); } + void condition_variable::wait_for(recursive_mutex::scoped_lock& l, time_duration rel_time) + { wait_for_impl(l, rel_time); } + + // explicitly instantiate for these locks + template void condition_variable::wait_impl(mutex::scoped_lock&); + template void condition_variable::wait_impl(recursive_mutex::scoped_lock&); + template void condition_variable::wait_for_impl(mutex::scoped_lock&, time_duration); + template void condition_variable::wait_for_impl(recursive_mutex::scoped_lock&, time_duration); } diff --git a/test/test_alert_manager.cpp b/test/test_alert_manager.cpp index c8f97c74b..a1de83797 100644 --- a/test/test_alert_manager.cpp +++ b/test/test_alert_manager.cpp @@ -324,3 +324,32 @@ TORRENT_TEST(alert_mask) TEST_CHECK(!mgr.should_post()); } +#ifndef TORRENT_DISABLE_EXTENSIONS +struct post_plugin : lt::plugin +{ + post_plugin(alert_manager& m) : mgr(m), depth(0) {} + void on_alert(alert const* a) + { + if (++depth > 10) return; + mgr.emplace_alert(torrent_handle(), 0); + } + + alert_manager& mgr; + int depth; +}; + +// make sure the alert manager supports alerts being posted while executing a +// plugin handler +TORRENT_TEST(recursive_alerts) +{ + alert_manager mgr(100, 0xffffffff); + boost::shared_ptr pl = boost::make_shared(boost::ref(mgr)); + mgr.add_extension(pl); + + mgr.emplace_alert(torrent_handle(), 0); + + TEST_EQUAL(pl->depth, 11); +} + +#endif // TORRENT_DISABLE_EXTENSIONS +