fixed race condition in bandwidth limiter

This commit is contained in:
Arvid Norberg 2008-01-03 18:53:07 +00:00
parent 29ccb9220b
commit a49cb42345
1 changed files with 14 additions and 6 deletions

View File

@ -123,6 +123,7 @@ struct bandwidth_manager
#ifndef NDEBUG #ifndef NDEBUG
bool is_queued(PeerConnection const* peer) const bool is_queued(PeerConnection const* peer) const
{ {
mutex_t::scoped_lock l(m_mutex);
for (typename queue_t::const_iterator i = m_queue.begin() for (typename queue_t::const_iterator i = m_queue.begin()
, end(m_queue.end()); i != end; ++i) , end(m_queue.end()); i != end; ++i)
{ {
@ -133,6 +134,7 @@ struct bandwidth_manager
bool is_in_history(PeerConnection const* peer) const bool is_in_history(PeerConnection const* peer) const
{ {
mutex_t::scoped_lock l(m_mutex);
for (typename history_t::const_iterator i for (typename history_t::const_iterator i
= m_history.begin(), end(m_history.end()); i != end; ++i) = m_history.begin(), end(m_history.end()); i != end; ++i)
{ {
@ -152,12 +154,13 @@ struct bandwidth_manager
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(blk > 0); TORRENT_ASSERT(blk > 0);
TORRENT_ASSERT(!is_queued(peer.get()));
mutex_t::scoped_lock l(m_mutex);
TORRENT_ASSERT(!peer->ignore_bandwidth_limits()); TORRENT_ASSERT(!peer->ignore_bandwidth_limits());
// make sure this peer isn't already in line // make sure this peer isn't already in line
// waiting for bandwidth // waiting for bandwidth
TORRENT_ASSERT(!is_queued(peer.get()));
TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0); TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
boost::shared_ptr<Torrent> t = peer->associated_torrent().lock(); boost::shared_ptr<Torrent> t = peer->associated_torrent().lock();
@ -185,7 +188,7 @@ struct bandwidth_manager
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
std::cerr << " req_bandwidht. m_queue.size() = " << m_queue.size() << std::endl; std::cerr << " req_bandwidht. m_queue.size() = " << m_queue.size() << std::endl;
#endif #endif
if (!m_queue.empty()) hand_out_bandwidth(); if (!m_queue.empty()) hand_out_bandwidth(l);
} }
#ifndef NDEBUG #ifndef NDEBUG
@ -229,6 +232,7 @@ private:
TORRENT_ASSERT(!m_history.empty()); TORRENT_ASSERT(!m_history.empty());
mutex_t::scoped_lock l(m_mutex);
ptime now(time_now()); ptime now(time_now());
while (!m_history.empty() && m_history.back().expires_at <= now) while (!m_history.empty() && m_history.back().expires_at <= now)
{ {
@ -238,8 +242,10 @@ private:
TORRENT_ASSERT(m_current_quota >= 0); TORRENT_ASSERT(m_current_quota >= 0);
intrusive_ptr<PeerConnection> c = e.peer; intrusive_ptr<PeerConnection> c = e.peer;
shared_ptr<Torrent> t = e.tor.lock(); shared_ptr<Torrent> t = e.tor.lock();
l.unlock();
if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount); if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
if (t) t->expire_bandwidth(m_channel, e.amount); if (t) t->expire_bandwidth(m_channel, e.amount);
l.lock();
} }
// now, wait for the next chunk to expire // now, wait for the next chunk to expire
@ -253,10 +259,10 @@ private:
// since some bandwidth just expired, it // since some bandwidth just expired, it
// means we can hand out more (in case there // means we can hand out more (in case there
// are still consumers in line) // are still consumers in line)
if (!m_queue.empty()) hand_out_bandwidth(); if (!m_queue.empty()) hand_out_bandwidth(l);
} }
void hand_out_bandwidth() void hand_out_bandwidth(boost::mutex::scoped_lock& l)
{ {
// if we're already handing out bandwidth, just return back // if we're already handing out bandwidth, just return back
// to the loop further down on the callstack // to the loop further down on the callstack
@ -267,9 +273,7 @@ private:
ptime now(time_now()); ptime now(time_now());
mutex_t::scoped_lock l(m_mutex);
int limit = m_limit; int limit = m_limit;
l.unlock();
// available bandwidth to hand out // available bandwidth to hand out
int amount = limit - m_current_quota; int amount = limit - m_current_quota;
@ -301,7 +305,9 @@ private:
if (!t) continue; if (!t) continue;
if (qe.peer->is_disconnecting()) if (qe.peer->is_disconnecting())
{ {
l.unlock();
t->expire_bandwidth(m_channel, qe.max_block_size); t->expire_bandwidth(m_channel, qe.max_block_size);
l.lock();
TORRENT_ASSERT(amount == limit - m_current_quota); TORRENT_ASSERT(amount == limit - m_current_quota);
continue; continue;
} }
@ -371,8 +377,10 @@ private:
TORRENT_ASSERT(amount == limit - m_current_quota); TORRENT_ASSERT(amount == limit - m_current_quota);
amount -= hand_out_amount; amount -= hand_out_amount;
TORRENT_ASSERT(hand_out_amount <= qe.max_block_size); TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
l.unlock();
t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size); t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
qe.peer->assign_bandwidth(m_channel, hand_out_amount); qe.peer->assign_bandwidth(m_channel, hand_out_amount);
l.lock();
add_history_entry(history_entry<PeerConnection, Torrent>( add_history_entry(history_entry<PeerConnection, Torrent>(
qe.peer, t, hand_out_amount, now + bw_window_size)); qe.peer, t, hand_out_amount, now + bw_window_size));
TORRENT_ASSERT(amount == limit - m_current_quota); TORRENT_ASSERT(amount == limit - m_current_quota);