use a condition variable in alert_manager to trigger wait_for_alert threads

This commit is contained in:
Arvid Norberg 2013-01-21 02:29:36 +00:00
parent 652bf8323b
commit f440bca2c4
5 changed files with 108 additions and 86 deletions

View File

@ -25,61 +25,7 @@
<span style="color: #3c3">9 relevant</span> <span style="color: #3c3">9 relevant</span>
<span style="color: #77f">16 feasible</span> <span style="color: #77f">16 feasible</span>
<span style="color: #999">33 notes</span> <span style="color: #999">33 notes</span>
<table width="100%" border="1" style="border-collapse: collapse;"><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(0)">src/alert.cpp:370</a></td><td>change this to use a timed wait on a condition variable problem is, that's not necessarily portable. But it should be used where available. This implementation can be left the way it is for more primitive platforms</td></tr><tr id="0" style="display: none;" colspan="3"><td colspan="3"><h2>change this to use a timed wait on a condition variable <table width="100%" border="1" style="border-collapse: collapse;"><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(0)">src/storage.cpp:989</a></td><td>use binary search to find the file entry</td></tr><tr id="0" style="display: none;" colspan="3"><td colspan="3"><h2>use binary search to find the file entry</h2><h4>src/storage.cpp:989</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> size_type ret = 0;
problem is, that's not necessarily portable. But it should be used
where available. This implementation can be left the way it is for
more primitive platforms</h2><h4>src/alert.cpp:370</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> delete m_alerts.front();
m_alerts.pop_front();
}
}
alert const* alert_manager::wait_for_alert(time_duration max_wait)
{
mutex::scoped_lock lock(m_mutex);
if (!m_alerts.empty()) return m_alerts.front();
// system_time end = get_system_time()
// + boost::posix_time::microseconds(total_microseconds(max_wait));
// this call can be interrupted prematurely by other signals
// while (m_condition.timed_wait(lock, end))
// if (!m_alerts.empty()) return m_alerts.front();
ptime start = time_now_hires();
<div style="background: #ffff00" width="100%"> while (m_alerts.empty())
</div> {
lock.unlock();
sleep(50);
lock.lock();
if (time_now_hires() - start &gt;= max_wait) return 0;
}
return m_alerts.front();
}
void alert_manager::set_dispatch_function(boost::function&lt;void(std::auto_ptr&lt;alert&gt;)&gt; const&amp; fun)
{
mutex::scoped_lock lock(m_mutex);
m_dispatch = fun;
std::deque&lt;alert*&gt; alerts;
m_alerts.swap(alerts);
lock.unlock();
while (!alerts.empty())
{
TORRENT_TRY {
m_dispatch(std::auto_ptr&lt;alert&gt;(alerts.front()));
} TORRENT_CATCH(std::exception&amp;) {}
alerts.pop_front();
}
}
void dispatch_alert(boost::function&lt;void(alert const&amp;)&gt; dispatcher
, alert* alert_)
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(1)">src/storage.cpp:989</a></td><td>use binary search to find the file entry</td></tr><tr id="1" style="display: none;" colspan="3"><td colspan="3"><h2>use binary search to find the file entry</h2><h4>src/storage.cpp:989</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> size_type ret = 0;
if (f &amp;&amp; !ec) ret = f-&gt;phys_offset(file_offset); if (f &amp;&amp; !ec) ret = f-&gt;phys_offset(file_offset);
if (ret == 0) if (ret == 0)
@ -130,7 +76,7 @@ more primitive platforms</h2><h4>src/alert.cpp:370</h4><pre style="background: #
if (file_bytes_left == 0) continue; if (file_bytes_left == 0) continue;
if (file_iter-&gt;pad_file) continue; if (file_iter-&gt;pad_file) continue;
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(2)">src/storage.cpp:1095</a></td><td>use binary search to find the file entry</td></tr><tr id="2" style="display: none;" colspan="3"><td colspan="3"><h2>use binary search to find the file entry</h2><h4>src/storage.cpp:1095</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> TORRENT_ASSERT(offset &gt;= 0); </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(1)">src/storage.cpp:1095</a></td><td>use binary search to find the file entry</td></tr><tr id="1" style="display: none;" colspan="3"><td colspan="3"><h2>use binary search to find the file entry</h2><h4>src/storage.cpp:1095</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> TORRENT_ASSERT(offset &gt;= 0);
TORRENT_ASSERT(offset &lt; m_files.piece_size(slot)); TORRENT_ASSERT(offset &lt; m_files.piece_size(slot));
TORRENT_ASSERT(num_bufs &gt; 0); TORRENT_ASSERT(num_bufs &gt; 0);
@ -181,7 +127,7 @@ more primitive platforms</h2><h4>src/alert.cpp:370</h4><pre style="background: #
copy_bufs(bufs, size, current_buf); copy_bufs(bufs, size, current_buf);
TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs); TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs);
int file_bytes_left; int file_bytes_left;
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(3)">src/torrent.cpp:1505</a></td><td>filter out peers that are disconnecting</td></tr><tr id="3" style="display: none;" colspan="3"><td colspan="3"><h2>filter out peers that are disconnecting</h2><h4>src/torrent.cpp:1505</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> snprintf(filename, sizeof(filename), "/tmp/%d.pem", rand()); </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(2)">src/torrent.cpp:1505</a></td><td>filter out peers that are disconnecting</td></tr><tr id="2" style="display: none;" colspan="3"><td colspan="3"><h2>filter out peers that are disconnecting</h2><h4>src/torrent.cpp:1505</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> snprintf(filename, sizeof(filename), "/tmp/%d.pem", rand());
FILE* f = fopen(filename, "w+"); FILE* f = fopen(filename, "w+");
fwrite(cert.c_str(), cert.size(), 1, f); fwrite(cert.c_str(), cert.size(), 1, f);
fclose(f); fclose(f);
@ -232,7 +178,7 @@ more primitive platforms</h2><h4>src/alert.cpp:370</h4><pre style="background: #
{ {
set_error(errors::too_many_pieces_in_torrent, ""); set_error(errors::too_many_pieces_in_torrent, "");
pause(); pause();
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(4)">src/torrent.cpp:6004</a></td><td>if peer is a really good peer, maybe we shouldn't disconnect it</td></tr><tr id="4" style="display: none;" colspan="3"><td colspan="3"><h2>if peer is a really good peer, maybe we shouldn't disconnect it</h2><h4>src/torrent.cpp:6004</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> return false; </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(3)">src/torrent.cpp:6004</a></td><td>if peer is a really good peer, maybe we shouldn't disconnect it</td></tr><tr id="3" style="display: none;" colspan="3"><td colspan="3"><h2>if peer is a really good peer, maybe we shouldn't disconnect it</h2><h4>src/torrent.cpp:6004</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> return false;
} }
TORRENT_ASSERT(m_connections.find(p) == m_connections.end()); TORRENT_ASSERT(m_connections.find(p) == m_connections.end());
m_connections.insert(p); m_connections.insert(p);
@ -283,7 +229,7 @@ more primitive platforms</h2><h4>src/alert.cpp:370</h4><pre style="background: #
return m_connections.size() &lt; m_max_connections return m_connections.size() &lt; m_max_connections
&amp;&amp; !is_paused() &amp;&amp; !is_paused()
&amp;&amp; ((m_state != torrent_status::checking_files &amp;&amp; ((m_state != torrent_status::checking_files
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(5)">src/torrent.cpp:8718</a></td><td>with 110 as response codes, we should just consider the tracker as a failure and not retry it anymore</td></tr><tr id="5" style="display: none;" colspan="3"><td colspan="3"><h2>with 110 as response codes, we should just consider </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(4)">src/torrent.cpp:8718</a></td><td>with 110 as response codes, we should just consider the tracker as a failure and not retry it anymore</td></tr><tr id="4" style="display: none;" colspan="3"><td colspan="3"><h2>with 110 as response codes, we should just consider
the tracker as a failure and not retry the tracker as a failure and not retry
it anymore</h2><h4>src/torrent.cpp:8718</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> TORRENT_ASSERT(m_ses.is_network_thread()); it anymore</h2><h4>src/torrent.cpp:8718</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> TORRENT_ASSERT(m_ses.is_network_thread());
TORRENT_ASSERT(b &gt; 0); TORRENT_ASSERT(b &gt; 0);
@ -336,7 +282,7 @@ it anymore</h2><h4>src/torrent.cpp:8718</h4><pre style="background: #f6f6f6; bor
, ae?ae-&gt;fails:0, response_code, r.url, ec, msg)); , ae?ae-&gt;fails:0, response_code, r.url, ec, msg));
} }
} }
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(6)">src/utp_stream.cpp:412</a></td><td>remove the read timeout concept. This should not be necessary</td></tr><tr id="6" style="display: none;" colspan="3"><td colspan="3"><h2>remove the read timeout concept. This should not be necessary</h2><h4>src/utp_stream.cpp:412</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> // these are the callbacks made into the utp_stream object </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(5)">src/utp_stream.cpp:412</a></td><td>remove the read timeout concept. This should not be necessary</td></tr><tr id="5" style="display: none;" colspan="3"><td colspan="3"><h2>remove the read timeout concept. This should not be necessary</h2><h4>src/utp_stream.cpp:412</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> // these are the callbacks made into the utp_stream object
// on read/write/connect events // on read/write/connect events
utp_stream::handler_t m_read_handler; utp_stream::handler_t m_read_handler;
utp_stream::handler_t m_write_handler; utp_stream::handler_t m_write_handler;
@ -358,7 +304,7 @@ it anymore</h2><h4>src/torrent.cpp:8718</h4><pre style="background: #f6f6f6; bor
// before) // before)
<div style="background: #ffff00" width="100%"> ptime m_read_timeout; <div style="background: #ffff00" width="100%"> ptime m_read_timeout;
</div> </div>
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(7)">src/utp_stream.cpp:415</a></td><td>remove the write timeout concept. This should not be necessary</td></tr><tr id="7" style="display: none;" colspan="3"><td colspan="3"><h2>remove the write timeout concept. This should not be necessary</h2><h4>src/utp_stream.cpp:415</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> // these are the callbacks made into the utp_stream object </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(6)">src/utp_stream.cpp:415</a></td><td>remove the write timeout concept. This should not be necessary</td></tr><tr id="6" style="display: none;" colspan="3"><td colspan="3"><h2>remove the write timeout concept. This should not be necessary</h2><h4>src/utp_stream.cpp:415</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> // these are the callbacks made into the utp_stream object
// on read/write/connect events // on read/write/connect events
utp_stream::handler_t m_read_handler; utp_stream::handler_t m_read_handler;
utp_stream::handler_t m_write_handler; utp_stream::handler_t m_write_handler;
@ -409,7 +355,7 @@ it anymore</h2><h4>src/torrent.cpp:8718</h4><pre style="background: #f6f6f6; bor
timestamp_history m_their_delay_hist; timestamp_history m_their_delay_hist;
// the number of bytes we have buffered in m_inbuf // the number of bytes we have buffered in m_inbuf
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(8)">src/kademlia/rpc_manager.cpp:36</a></td><td>remove this dependency by having the dht observer have its own flags</td></tr><tr id="8" style="display: none;" colspan="3"><td colspan="3"><h2>remove this dependency by having the dht observer </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(7)">src/kademlia/rpc_manager.cpp:36</a></td><td>remove this dependency by having the dht observer have its own flags</td></tr><tr id="7" style="display: none;" colspan="3"><td colspan="3"><h2>remove this dependency by having the dht observer
have its own flags</h2><h4>src/kademlia/rpc_manager.cpp:36</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> contributors may be used to endorse or promote products derived have its own flags</h2><h4>src/kademlia/rpc_manager.cpp:36</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> contributors may be used to endorse or promote products derived
from this software without specific prior written permission. from this software without specific prior written permission.
@ -461,6 +407,53 @@ namespace io = libtorrent::detail;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DEFINE_LOG(rpc) TORRENT_DEFINE_LOG(rpc)
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(8)">include/libtorrent/thread.hpp:61</a></td><td>make this interface compatible with c++11 to allow for smooth transition for platforms with support</td></tr><tr id="8" style="display: none;" colspan="3"><td colspan="3"><h2>make this interface compatible with c++11
to allow for smooth transition for platforms with support</h2><h4>include/libtorrent/thread.hpp:61</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;">#endif
#if defined TORRENT_BEOS
#include &lt;kernel/OS.h&gt;
#endif
#include &lt;memory&gt; // for auto_ptr required by asio
#include &lt;boost/asio/detail/thread.hpp&gt;
#include &lt;boost/asio/detail/mutex.hpp&gt;
#include &lt;boost/asio/detail/event.hpp&gt;
namespace libtorrent
{
typedef boost::asio::detail::thread thread;
typedef boost::asio::detail::mutex mutex;
typedef boost::asio::detail::event event;
TORRENT_EXPORT void sleep(int milliseconds);
<div style="background: #ffff00" width="100%"> struct TORRENT_EXTRA_EXPORT condition
</div> {
condition();
~condition();
void wait(mutex::scoped_lock&amp; l);
void timed_wait(mutex::scoped_lock&amp; l, int sleep_ms);
void signal_all(mutex::scoped_lock&amp; l);
private:
#ifdef BOOST_HAS_PTHREADS
pthread_cond_t m_cond;
#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN
HANDLE m_sem;
mutex m_mutex;
int m_num_waiters;
#elif defined TORRENT_BEOS
sem_id m_sem;
mutex m_mutex;
int m_num_waiters;
#else
#error not implemented
#endif
};
}
#endif
</pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(9)">include/libtorrent/kademlia/dht_tracker.hpp:79</a></td><td>take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core</td></tr><tr id="9" style="display: none;" colspan="3"><td colspan="3"><h2>take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core</h2><h4>include/libtorrent/kademlia/dht_tracker.hpp:79</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> struct lazy_entry; </pre></td></tr><tr style="background: #fcc"><td>relevance&nbsp;3</td><td><a href="javascript:expand(9)">include/libtorrent/kademlia/dht_tracker.hpp:79</a></td><td>take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core</td></tr><tr id="9" style="display: none;" colspan="3"><td colspan="3"><h2>take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core</h2><h4>include/libtorrent/kademlia/dht_tracker.hpp:79</h4><pre style="background: #f6f6f6; border: solid 1px #ddd;"> struct lazy_entry;
} }

View File

@ -171,11 +171,11 @@ namespace libtorrent {
#endif #endif
private: private:
void post_impl(std::auto_ptr<alert>& alert_); void post_impl(std::auto_ptr<alert>& alert_, mutex::scoped_lock& l);
std::deque<alert*> m_alerts; std::deque<alert*> m_alerts;
mutable mutex m_mutex; mutable mutex m_mutex;
// event m_condition; condition m_condition;
boost::uint32_t m_alert_mask; boost::uint32_t m_alert_mask;
size_t m_queue_size_limit; size_t m_queue_size_limit;
boost::function<void(std::auto_ptr<alert>)> m_dispatch; boost::function<void(std::auto_ptr<alert>)> m_dispatch;

View File

@ -58,11 +58,14 @@ namespace libtorrent
TORRENT_EXPORT void sleep(int milliseconds); TORRENT_EXPORT void sleep(int milliseconds);
// TODO: 3 make this interface compatible with c++11
// to allow for smooth transition for platforms with support
struct TORRENT_EXTRA_EXPORT condition struct TORRENT_EXTRA_EXPORT condition
{ {
condition(); condition();
~condition(); ~condition();
void wait(mutex::scoped_lock& l); void wait(mutex::scoped_lock& l);
void timed_wait(mutex::scoped_lock& l, int sleep_ms);
void signal_all(mutex::scoped_lock& l); void signal_all(mutex::scoped_lock& l);
private: private:
#ifdef BOOST_HAS_PTHREADS #ifdef BOOST_HAS_PTHREADS

View File

@ -358,27 +358,11 @@ namespace libtorrent {
if (!m_alerts.empty()) return m_alerts.front(); if (!m_alerts.empty()) return m_alerts.front();
// system_time end = get_system_time()
// + boost::posix_time::microseconds(total_microseconds(max_wait));
// this call can be interrupted prematurely by other signals // this call can be interrupted prematurely by other signals
// while (m_condition.timed_wait(lock, end)) m_condition.timed_wait(lock, total_milliseconds(max_wait));
// if (!m_alerts.empty()) return m_alerts.front(); if (!m_alerts.empty()) return m_alerts.front();
ptime start = time_now_hires(); return NULL;
// TODO: 3 change this to use a timed wait on a condition variable
// problem is, that's not necessarily portable. But it should be used
// where available. This implementation can be left the way it is for
// more primitive platforms
while (m_alerts.empty())
{
lock.unlock();
sleep(50);
lock.lock();
if (time_now_hires() - start >= max_wait) return 0;
}
return m_alerts.front();
} }
void alert_manager::set_dispatch_function(boost::function<void(std::auto_ptr<alert>)> const& fun) void alert_manager::set_dispatch_function(boost::function<void(std::auto_ptr<alert>)> const& fun)
@ -422,7 +406,7 @@ namespace libtorrent {
#endif #endif
mutex::scoped_lock lock(m_mutex); mutex::scoped_lock lock(m_mutex);
post_impl(a); post_impl(a, lock);
} }
void alert_manager::post_alert(const alert& alert_) void alert_manager::post_alert(const alert& alert_)
@ -440,10 +424,10 @@ namespace libtorrent {
#endif #endif
mutex::scoped_lock lock(m_mutex); mutex::scoped_lock lock(m_mutex);
post_impl(a); post_impl(a, lock);
} }
void alert_manager::post_impl(std::auto_ptr<alert>& alert_) void alert_manager::post_impl(std::auto_ptr<alert>& alert_, mutex::scoped_lock& l)
{ {
if (m_dispatch) if (m_dispatch)
{ {
@ -455,6 +439,8 @@ namespace libtorrent {
else if (m_alerts.size() < m_queue_size_limit || !alert_->discardable()) else if (m_alerts.size() < m_queue_size_limit || !alert_->discardable())
{ {
m_alerts.push_back(alert_.release()); m_alerts.push_back(alert_.release());
if (m_alerts.size() == 1)
m_condition.signal_all(l);
} }
} }

View File

@ -37,6 +37,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include <kernel/OS.h> #include <kernel/OS.h>
#endif #endif
#ifdef BOOST_HAS_PTHREADS
#include <sys/time.h> // for gettimeofday()
#include <boost/cstdint.hpp>
#endif
namespace libtorrent namespace libtorrent
{ {
void sleep(int milliseconds) void sleep(int milliseconds)
@ -69,6 +74,21 @@ namespace libtorrent
pthread_cond_wait(&m_cond, (::pthread_mutex_t*)&l.mutex()); pthread_cond_wait(&m_cond, (::pthread_mutex_t*)&l.mutex());
} }
void condition::timed_wait(mutex::scoped_lock& l, int sleep_ms)
{
TORRENT_ASSERT(l.locked());
struct timeval tv;
struct timespec ts;
gettimeofday(&tv, NULL);
boost::uint64_t microseconds = tv.tv_usec + boost::uint64_t(sleep_ms % 1000) * 1000;
ts.tv_nsec = (microseconds % 1000000) * 1000;
ts.tv_sec = tv.tv_sec + sleep_ms / 1000 + microseconds / 1000000;
// wow, this is quite a hack
pthread_cond_timedwait(&m_cond, (::pthread_mutex_t*)&l.mutex(), &ts);
}
void condition::signal_all(mutex::scoped_lock& l) void condition::signal_all(mutex::scoped_lock& l)
{ {
TORRENT_ASSERT(l.locked()); TORRENT_ASSERT(l.locked());
@ -96,6 +116,16 @@ namespace libtorrent
--m_num_waiters; --m_num_waiters;
} }
void condition::timed_wait(mutex::scoped_lock& l, int sleep_ms)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
l.unlock();
WaitForSingleObject(m_sem, sleep_ms);
l.lock();
--m_num_waiters;
}
void condition::signal_all(mutex::scoped_lock& l) void condition::signal_all(mutex::scoped_lock& l)
{ {
TORRENT_ASSERT(l.locked()); TORRENT_ASSERT(l.locked());
@ -122,6 +152,16 @@ namespace libtorrent
l.lock(); l.lock();
--m_num_waiters; --m_num_waiters;
} }
void condition::timed_wait(mutex::scoped_lock& l, int sleep_ms)
{
TORRENT_ASSERT(l.locked());
++m_num_waiters;
l.unlock();
acquire_sem_etc(m_sem, 1, B_RELATIVE_TIMEOUT, bigtime_t(sleep_ms) * 1000);
l.lock();
--m_num_waiters;
}
void condition::signal_all(mutex::scoped_lock& l) void condition::signal_all(mutex::scoped_lock& l)
{ {