fix message posting to work from multiple threads

This commit is contained in:
Arvid Norberg 2010-08-26 17:00:24 +00:00
parent d68719376f
commit 02668e8f75
13 changed files with 219 additions and 43 deletions

View File

@ -40,6 +40,7 @@ set(sources
socks5_stream
stat
storage
thread
time
torrent
torrent_handle

View File

@ -409,6 +409,7 @@ SOURCES =
magnet_uri
parse_url
ConvertUTF
thread
# -- extensions --
metadata_transfer

View File

@ -150,7 +150,7 @@ namespace libtorrent {
private:
std::deque<alert*> m_alerts;
mutable mutex m_mutex;
condition m_condition;
event m_condition;
int m_alert_mask;
size_t m_queue_size_limit;
boost::function<void(std::auto_ptr<alert>)> m_dispatch;

View File

@ -406,7 +406,7 @@ namespace libtorrent
// this mutex only protects m_jobs, m_queue_buffer_size
// and m_abort
mutable mutex m_queue_mutex;
condition m_signal;
event m_signal;
bool m_abort;
bool m_waiting_to_shutdown;
std::list<disk_io_job> m_jobs;

View File

@ -44,26 +44,31 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/asio/detail/mutex.hpp>
#include <boost/asio/detail/event.hpp>
#ifdef TORRENT_BEOS
#include <kernel/OS.h>
#endif
namespace libtorrent
{
typedef boost::asio::detail::thread thread;
typedef boost::asio::detail::mutex mutex;
typedef boost::asio::detail::event condition;
typedef boost::asio::detail::event event;
inline void sleep(int milliseconds)
void sleep(int milliseconds);
struct condition
{
#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
Sleep(milliseconds);
#elif defined TORRENT_BEOS
snooze_until(system_time() + boost::int64_t(milliseconds) * 1000, B_SYSTEM_TIMEBASE);
condition();
~condition();
void wait(mutex::scoped_lock& l);
void signal_all(mutex::scoped_lock& l);
private:
#ifdef BOOST_HAS_PTHREADS
pthread_cond_t m_cond;
#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
HANDLE m_sem;
mutex m_mutex;
int m_num_waiters;
#else
usleep(milliseconds * 1000);
#error not implemented
#endif
}
};
}
#endif

View File

@ -69,6 +69,7 @@ libtorrent_rasterbar_la_SOURCES = \
socks5_stream.cpp \
stat.cpp \
storage.cpp \
thread.cpp \
torrent.cpp \
torrent_handle.cpp \
torrent_info.cpp \

View File

@ -237,7 +237,7 @@ namespace libtorrent
*ret = f();
mutex::scoped_lock l(*m);
*done = true;
e->signal(l);
e->signal_all(l);
}
void fun_wrap(bool* done, condition* e, mutex* m, boost::function<void(void)> f)
@ -245,7 +245,7 @@ namespace libtorrent
f();
mutex::scoped_lock l(*m);
*done = true;
e->signal(l);
e->signal_all(l);
}
#define TORRENT_ASYNC_CALL(x) \
@ -260,55 +260,48 @@ namespace libtorrent
#define TORRENT_SYNC_CALL(x) \
bool done = false; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function<void(void)>(boost::bind(&session_impl:: x, m_impl.get())))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL1(x, a1) \
bool done = false; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function<void(void)>(boost::bind(&session_impl:: x, m_impl.get(), a1)))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL2(x, a1, a2) \
bool done = false; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function<void(void)>(boost::bind(&session_impl:: x, m_impl.get(), a1, a2)))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL_RET(type, x) \
bool done = false; \
type r; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &m_impl->cond, &m_impl->mut, boost::function<type(void)>(boost::bind(&session_impl:: x, m_impl.get())))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL_RET1(type, x, a1) \
bool done = false; \
type r; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &m_impl->cond, &m_impl->mut, boost::function<type(void)>(boost::bind(&session_impl:: x, m_impl.get(), a1)))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL_RET2(type, x, a1, a2) \
bool done = false; \
type r; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &m_impl->cond, &m_impl->mut, boost::function<type(void)>(boost::bind(&session_impl:: x, m_impl.get(), a1, a2)))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
#define TORRENT_SYNC_CALL_RET3(type, x, a1, a2, a3) \
bool done = false; \
type r; \
mutex::scoped_lock l(m_impl->mut); \
m_impl->cond.clear(l); \
m_impl->m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &m_impl->cond, &m_impl->mut, boost::function<type(void)>(boost::bind(&session_impl:: x, m_impl.get(), a1, a2, a3)))); \
do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done)
do { m_impl->cond.wait(l); } while(!done)
session::session(
fingerprint const& id

109
src/thread.cpp Normal file
View File

@ -0,0 +1,109 @@
/*
Copyright (c) 2010, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/thread.hpp"
#include "libtorrent/assert.hpp"
#ifdef TORRENT_BEOS
#include <kernel/OS.h>
#endif
namespace libtorrent
{
void sleep(int milliseconds)
{
#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
Sleep(milliseconds);
#elif defined TORRENT_BEOS
snooze_until(system_time() + boost::int64_t(milliseconds) * 1000, B_SYSTEM_TIMEBASE);
#else
usleep(milliseconds * 1000);
#endif
}
#ifdef BOOST_HAS_PTHREADS
condition::condition()
{
pthread_cond_init(&m_cond, 0);
}
condition::~condition()
{
pthread_cond_destroy(&m_cond);
}
void condition::wait(mutex::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
// wow, this is quite a hack
pthread_cond_wait(&m_cond, (::pthread_mutex_t*)&l.mutex());
}
void condition::signal_all(mutex::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
pthread_cond_broadcast(&m_cond);
}
#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
condition::condition()
: m_num_waiters(0)
{
m_sem = CreateSemaphore(0, 0, INT_MAX, 0);
}
condition::~condition()
{
CloseHandle(m_sem);
}
void condition::wait(mutex::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
l.unlock();
WaitForSingleObject(m_sem, INFINITE);
l.lock();
--m_num_waiters;
}
void condition::signal_all(mutex::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
ReleaseSemaphore(m_sem, m_num_waiters, 0);
}
#else
#error not implemented
#endif
}

View File

@ -82,7 +82,7 @@ namespace libtorrent
*ret = f();
mutex::scoped_lock l(*m);
*done = true;
e->signal(l);
e->signal_all(l);
}
// defined in session.cpp
@ -118,7 +118,6 @@ namespace libtorrent
bool done = false; \
session_impl& ses = t->session(); \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function<void(void)>(boost::bind(&torrent:: x, t)))); \
do { ses.cond.wait(l); } while(!done)
@ -128,7 +127,6 @@ namespace libtorrent
bool done = false; \
session_impl& ses = t->session(); \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function<void(void)>(boost::bind(&torrent:: x, t, a1)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done); }
@ -139,7 +137,6 @@ namespace libtorrent
bool done = false; \
session_impl& ses = t->session(); \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function<void(void)>(boost::bind(&torrent:: x, t, a1, a2)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done); }
@ -150,7 +147,6 @@ namespace libtorrent
bool done = false; \
session_impl& ses = t->session(); \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function<void(void)>(boost::bind(&torrent:: x, t, a1, a2, a3)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done); }
@ -162,7 +158,6 @@ namespace libtorrent
session_impl& ses = t->session(); \
type r; \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &ses.cond, &ses.mut, boost::function<type(void)>(boost::bind(&torrent:: x, t)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done)
@ -174,7 +169,6 @@ namespace libtorrent
session_impl& ses = t->session(); \
type r; \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &ses.cond, &ses.mut, boost::function<type(void)>(boost::bind(&torrent:: x, t, a1)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done)
@ -186,7 +180,6 @@ namespace libtorrent
session_impl& ses = t->session(); \
type r; \
mutex::scoped_lock l(ses.mut); \
ses.cond.clear(l); \
ses.m_io_service.post(boost::bind(&fun_ret<type>, &r, &done, &ses.cond, &ses.mut, boost::function<type(void)>(boost::bind(&torrent:: x, t, a1, a2)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done)
@ -734,7 +727,6 @@ namespace libtorrent
bool done = false;
session_impl& ses = t->session();
mutex::scoped_lock l(ses.mut);
ses.cond.clear(l);
ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond
, &ses.mut, boost::function<void(void)>(boost::bind(
&piece_manager::write_resume_data, &t->filesystem(), boost::ref(ret)))));

View File

@ -17,6 +17,7 @@ project
;
test-suite libtorrent :
[ run test_threads.cpp ]
[ run test_bandwidth_limiter.cpp ]
[ run test_buffer.cpp ]
[ run test_piece_picker.cpp ]

View File

@ -345,7 +345,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3
boost::asio::io_service* tracker_ios = 0;
boost::shared_ptr<libtorrent::thread> tracker_server;
libtorrent::mutex tracker_lock;
libtorrent::condition tracker_initialized;
libtorrent::event tracker_initialized;
bool udp_failed = false;
@ -491,7 +491,7 @@ void udp_tracker_thread(int* port)
boost::asio::io_service* web_ios = 0;
boost::shared_ptr<libtorrent::thread> web_server;
libtorrent::mutex web_lock;
libtorrent::condition web_initialized;
libtorrent::event web_initialized;
void stop_web_server()
{

View File

@ -240,8 +240,8 @@ struct test_storage : storage_interface
}
private:
condition m_ready_condition;
condition m_condition;
event m_ready_condition;
event m_condition;
libtorrent::mutex m_mutex;
bool m_started;
bool m_ready;

73
test/test_threads.cpp Normal file
View File

@ -0,0 +1,73 @@
/*
Copyright (c) 2010, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <boost/bind.hpp>
#include <list>
#include "libtorrent/thread.hpp"
#include "test.hpp"
using namespace libtorrent;
void fun(condition* s, mutex* m, int i)
{
fprintf(stderr, "thread %d waiting\n", i);
mutex::scoped_lock l(*m);
s->wait(l);
fprintf(stderr, "thread %d done\n", i);
}
int test_main()
{
condition cond;
mutex m;
std::list<thread*> threads;
for (int i = 0; i < 20; ++i)
{
threads.push_back(new thread(boost::bind(&fun, &cond, &m, i)));
}
// make sure all threads are waiting on the condition
sleep(10);
mutex::scoped_lock l(m);
cond.signal_all(l);
l.unlock();
for (std::list<thread*>::iterator i = threads.begin(); i != threads.end(); ++i)
{
(*i)->join();
delete *i;
}
return 0;
}