introduce a recursive mutex to protect the alert_manager, and hold the mutex while calling user callbacks and plugin hooks. This is required to ensure the underlying storage for alert allocations (in the stack allocators) is synchronized with the thread that may be accessing it

This commit is contained in:
arvidn 2018-04-30 00:34:41 +02:00 committed by Arvid Norberg
parent 785f173df3
commit 8808eb7cdd
6 changed files with 206 additions and 35 deletions

View File

@ -47,11 +47,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/function/function0.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/config.hpp>
#include <list>
#include <utility> // for std::forward
#include "libtorrent/aux_/disable_warnings_pop.hpp"
#include <list>
#include <utility> // for std::forward
#ifdef __GNUC__
// this is to suppress the warnings for using std::auto_ptr
#pragma GCC diagnostic push
@ -80,7 +81,7 @@ namespace libtorrent {
template <class T, typename... Args>
void emplace_alert(Args&&... args)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
#ifndef TORRENT_NO_DEPRECATE
if (m_dispatch)
{
@ -99,7 +100,7 @@ namespace libtorrent {
T alert(m_allocations[m_generation], std::forward<Args>(args)...);
m_alerts[m_generation].push_back(alert);
maybe_notify(&alert, lock);
maybe_notify(&alert);
}
#else
@ -116,7 +117,7 @@ namespace libtorrent {
template <class T>
bool should_post() const
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
if (m_alerts[m_generation].size() >= m_queue_size_limit
* (1 + T::priority))
{
@ -129,13 +130,13 @@ namespace libtorrent {
void set_alert_mask(boost::uint32_t m)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
m_alert_mask = m;
}
boost::uint32_t alert_mask() const
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
return m_alert_mask;
}
@ -160,9 +161,14 @@ namespace libtorrent {
alert_manager(alert_manager const&);
alert_manager& operator=(alert_manager const&);
void maybe_notify(alert* a, mutex::scoped_lock& lock);
void maybe_notify(alert* a);
mutable mutex m_mutex;
// this mutex protects everything. Since it's held while executing user
// callbacks (the notify function and extension on_alert()) it must be
// recursive to post new alerts. This is implemented by storing the
// current thread-id in m_mutex_holder, if it matches ours, we don't need
// to lock
mutable recursive_mutex m_mutex;
condition_variable m_condition;
boost::uint32_t m_alert_mask;
int m_queue_size_limit;

View File

@ -24,7 +24,7 @@
BOOST_PP_ENUM_PARAMS(I, typename A)>
void emplace_alert(BOOST_PP_ENUM_BINARY_PARAMS(I, A, const& a) )
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
#ifndef TORRENT_NO_DEPRECATE
if (m_dispatch)
{
@ -46,7 +46,7 @@
BOOST_PP_ENUM_PARAMS(I, a));
m_alerts[m_generation].push_back(alert);
maybe_notify(&alert, lock);
maybe_notify(&alert);
}
#undef I

View File

@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/asio/detail/thread.hpp>
#include <boost/asio/detail/mutex.hpp>
#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/scoped_lock.hpp>
#include <boost/cstdint.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
@ -66,15 +67,25 @@ namespace libtorrent
// internal
void sleep(int milliseconds);
struct recursive_mutex;
struct TORRENT_EXTRA_EXPORT condition_variable
{
condition_variable();
~condition_variable();
void wait(mutex::scoped_lock& l);
void wait_for(mutex::scoped_lock& l, time_duration rel_time);
void wait_for(mutex::scoped_lock& l, time_duration);
void wait(boost::asio::detail::scoped_lock<recursive_mutex>& l);
void wait_for(boost::asio::detail::scoped_lock<recursive_mutex>&, time_duration);
void notify_all();
void notify();
private:
template <typename LockGuard>
void wait_impl(LockGuard& l);
template <typename LockGuard>
void wait_for_impl(LockGuard& l, time_duration rel_time);
#ifdef BOOST_HAS_PTHREADS
pthread_cond_t m_cond;
#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
@ -89,6 +100,37 @@ namespace libtorrent
#error not implemented
#endif
};
#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
typedef DWORD thread_id_t;
#elif defined TORRENT_BEOS
typedef thread_id thread_id_t;
#endif
// internal
struct recursive_mutex
{
typedef boost::asio::detail::scoped_lock<recursive_mutex> scoped_lock;
recursive_mutex();
~recursive_mutex();
void lock();
void unlock();
private:
recursive_mutex(recursive_mutex const&);
recursive_mutex& operator=(recursive_mutex const&);
#ifdef BOOST_HAS_PTHREADS
::pthread_mutex_t m_mutex;
#else
mutex m_mutex;
condition_variable m_cond;
thread_id_t m_owner;
int m_count;
#endif
};
}
#endif

View File

@ -52,13 +52,13 @@ namespace libtorrent
int alert_manager::num_queued_resume() const
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
return m_num_queued_resume;
}
alert* alert_manager::wait_for_alert(time_duration max_wait)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
if (!m_alerts[m_generation].empty())
return m_alerts[m_generation].front();
@ -71,7 +71,7 @@ namespace libtorrent
return NULL;
}
void alert_manager::maybe_notify(alert* a, mutex::scoped_lock& lock)
void alert_manager::maybe_notify(alert* a)
{
if (a->type() == save_resume_data_failed_alert::alert_type
|| a->type() == save_resume_data_alert::alert_type)
@ -79,8 +79,6 @@ namespace libtorrent
if (m_alerts[m_generation].size() == 1)
{
lock.unlock();
// we just posted to an empty queue. If anyone is waiting for
// alerts, we need to notify them. Also (potentially) call the
// user supplied m_notify callback to let the client wake up its
@ -91,10 +89,6 @@ namespace libtorrent
// > 0 notify them
m_condition.notify_all();
}
else
{
lock.unlock();
}
#ifndef TORRENT_DISABLE_EXTENSIONS
for (ses_extension_list_t::iterator i = m_ses_extensions.begin()
@ -125,7 +119,7 @@ namespace libtorrent
void alert_manager::set_dispatch_function(
boost::function<void(std::auto_ptr<alert>)> const& fun)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
m_dispatch = fun;
@ -151,12 +145,10 @@ namespace libtorrent
void alert_manager::set_notify_function(boost::function<void()> const& fun)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
m_notify = fun;
if (!m_alerts[m_generation].empty())
{
// never call a callback with the lock held!
lock.unlock();
if (m_notify) m_notify();
}
}
@ -170,10 +162,11 @@ namespace libtorrent
void alert_manager::get_all(std::vector<alert*>& alerts, int& num_resume)
{
mutex::scoped_lock lock(m_mutex);
alerts.clear();
recursive_mutex::scoped_lock lock(m_mutex);
TORRENT_ASSERT(m_num_queued_resume <= m_alerts[m_generation].size());
alerts.clear();
if (m_alerts[m_generation].empty()) return;
m_alerts[m_generation].get_pointers(alerts);
@ -189,13 +182,13 @@ namespace libtorrent
bool alert_manager::pending() const
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
return !m_alerts[m_generation].empty();
}
int alert_manager::set_alert_queue_size_limit(int queue_size_limit_)
{
mutex::scoped_lock lock(m_mutex);
recursive_mutex::scoped_lock lock(m_mutex);
std::swap(m_queue_size_limit, queue_size_limit_);
return queue_size_limit_;

View File

@ -59,7 +59,88 @@ namespace libtorrent
}
#ifdef BOOST_HAS_PTHREADS
recursive_mutex::recursive_mutex()
{
::pthread_mutexattr_t attr;
::pthread_mutexattr_init(&attr);
::pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
::pthread_mutex_init(&m_mutex, &attr);
::pthread_mutexattr_destroy(&attr);
}
recursive_mutex::~recursive_mutex()
{ ::pthread_mutex_destroy(&m_mutex); }
void recursive_mutex::lock()
{ ::pthread_mutex_lock(&m_mutex); }
void recursive_mutex::unlock()
{ ::pthread_mutex_unlock(&m_mutex); }
#else
namespace {
#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
thread_id_t thread_self()
{
return GetCurrentThreadId();
}
#elif defined TORRENT_BEOS
thread_id_t thread_self()
{
return find_thread(NULL);
}
#endif
}
recursive_mutex::recursive_mutex()
: m_owner()
, m_count(0)
{}
recursive_mutex::~recursive_mutex() {}
void recursive_mutex::lock()
{
thread_id_t const self = thread_self();
mutex::scoped_lock lock(m_mutex);
if (m_owner == self)
{
++m_count;
}
else if (m_count == 0)
{
TORRENT_ASSERT(m_owner == thread_id_t());
m_owner = self;
m_count = 1;
}
else
{
while (m_count != 0)
m_cond.wait(lock);
TORRENT_ASSERT(m_count == 0);
TORRENT_ASSERT(m_owner == thread_id_t());
m_owner = self;
m_count = 1;
}
}
void recursive_mutex::unlock()
{
thread_id_t const self = thread_self();
mutex::scoped_lock lock(m_mutex);
TORRENT_ASSERT(m_owner == self);
TORRENT_ASSERT(m_count > 0);
if (--m_count == 0)
{
m_owner = thread_id_t();
m_cond.notify();
}
}
#endif
#ifdef BOOST_HAS_PTHREADS
condition_variable::condition_variable()
{
pthread_cond_init(&m_cond, 0);
@ -70,14 +151,16 @@ namespace libtorrent
pthread_cond_destroy(&m_cond);
}
void condition_variable::wait(mutex::scoped_lock& l)
template <typename LockGuard>
void condition_variable::wait_impl(LockGuard& l)
{
TORRENT_ASSERT(l.locked());
// wow, this is quite a hack
pthread_cond_wait(&m_cond, reinterpret_cast<pthread_mutex_t*>(&l.mutex()));
}
void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time)
template <typename LockGuard>
void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time)
{
TORRENT_ASSERT(l.locked());
@ -117,7 +200,8 @@ namespace libtorrent
CloseHandle(m_sem);
}
void condition_variable::wait(mutex::scoped_lock& l)
template <typename LockGuard>
void condition_variable::wait_impl(LockGuard& l)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
@ -127,7 +211,8 @@ namespace libtorrent
--m_num_waiters;
}
void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time)
template <typename LockGuard>
void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
@ -158,7 +243,8 @@ namespace libtorrent
delete_sem(m_sem);
}
void condition_variable::wait(mutex::scoped_lock& l)
template <typename LockGuard>
void condition_variable::wait_impl(mutex::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
@ -168,7 +254,8 @@ namespace libtorrent
--m_num_waiters;
}
void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time)
template <typename LockGuard>
void condition_variable::wait_for_impl(LockGuard& l, time_duration rel_time)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
@ -191,5 +278,19 @@ namespace libtorrent
#error not implemented
#endif
void condition_variable::wait(mutex::scoped_lock& l)
{ wait_impl(l); }
void condition_variable::wait_for(mutex::scoped_lock& l, time_duration rel_time)
{ wait_for_impl(l, rel_time); }
void condition_variable::wait(recursive_mutex::scoped_lock& l)
{ wait_impl(l); }
void condition_variable::wait_for(recursive_mutex::scoped_lock& l, time_duration rel_time)
{ wait_for_impl(l, rel_time); }
// explicitly instantiate for these locks
template void condition_variable::wait_impl<mutex::scoped_lock>(mutex::scoped_lock&);
template void condition_variable::wait_impl<recursive_mutex::scoped_lock>(recursive_mutex::scoped_lock&);
template void condition_variable::wait_for_impl<mutex::scoped_lock>(mutex::scoped_lock&, time_duration);
template void condition_variable::wait_for_impl<recursive_mutex::scoped_lock>(recursive_mutex::scoped_lock&, time_duration);
}

View File

@ -324,3 +324,32 @@ TORRENT_TEST(alert_mask)
TEST_CHECK(!mgr.should_post<torrent_paused_alert>());
}
#ifndef TORRENT_DISABLE_EXTENSIONS
struct post_plugin : lt::plugin
{
post_plugin(alert_manager& m) : mgr(m), depth(0) {}
void on_alert(alert const* a)
{
if (++depth > 10) return;
mgr.emplace_alert<piece_finished_alert>(torrent_handle(), 0);
}
alert_manager& mgr;
int depth;
};
// make sure the alert manager supports alerts being posted while executing a
// plugin handler
TORRENT_TEST(recursive_alerts)
{
alert_manager mgr(100, 0xffffffff);
boost::shared_ptr<post_plugin> pl = boost::make_shared<post_plugin>(boost::ref(mgr));
mgr.add_extension(pl);
mgr.emplace_alert<piece_finished_alert>(torrent_handle(), 0);
TEST_EQUAL(pl->depth, 11);
}
#endif // TORRENT_DISABLE_EXTENSIONS