greatly simplified the rate limiter and generalized all rate limits (peer, torrent, global) to allow for arbitrary rate limit configurations

This commit is contained in:
Arvid Norberg 2009-04-26 00:21:59 +00:00
parent f7ebd88bec
commit 7109b86566
15 changed files with 551 additions and 1043 deletions

View File

@ -382,11 +382,9 @@ void print_peer_info(std::ostream& out, std::vector<libtorrent::peer_info> const
<< ((i->flags & peer_info::seed)?'s':'.')
<< ((i->flags & peer_info::on_parole)?'p':'.')
<< ((i->flags & peer_info::optimistic_unchoke)?'O':'.')
<< ((i->read_state == peer_info::bw_torrent)?'t':
(i->read_state == peer_info::bw_global)?'r':
<< ((i->read_state == peer_info::bw_limit)?'r':
(i->read_state == peer_info::bw_network)?'R':'.')
<< ((i->write_state == peer_info::bw_torrent)?'t':
(i->write_state == peer_info::bw_global)?'w':
<< ((i->write_state == peer_info::bw_limit)?'w':
(i->write_state == peer_info::bw_network)?'W':'.')
<< ((i->flags & peer_info::snubbed)?'S':'.')
<< ((i->flags & peer_info::upload_only)?'U':'D')

View File

@ -387,10 +387,13 @@ namespace libtorrent
// handing out bandwidth to connections that
// asks for it, it can also throttle the
// rate.
bandwidth_manager<peer_connection, torrent> m_download_channel;
bandwidth_manager<peer_connection, torrent> m_upload_channel;
bandwidth_manager<peer_connection> m_download_rate;
bandwidth_manager<peer_connection> m_upload_rate;
bandwidth_manager<peer_connection, torrent>* m_bandwidth_manager[2];
bandwidth_channel m_download_channel;
bandwidth_channel m_upload_channel;
bandwidth_channel* m_bandwidth_channel[2];
tracker_manager m_tracker_manager;
torrent_map m_torrents;
@ -530,13 +533,15 @@ namespace libtorrent
// NAT or not.
bool m_incoming_connection;
void second_tick(error_code const& e);
void on_tick(error_code const& e);
void recalculate_auto_managed_torrents();
void recalculate_unchoke_slots(int congested_torrents
, int uncongested_torrents);
void recalculate_optimistic_unchoke_slot();
ptime m_last_tick;
ptime m_last_second_tick;
// the last time we went through the peers
// to decide which ones to choke/unchoke
@ -581,11 +586,11 @@ namespace libtorrent
int m_tcp_mapping[2];
int m_udp_mapping[2];
// the timer used to fire the second_tick
// the timer used to fire the tick
deadline_timer m_timer;
// the index of the torrent that will be offered to
// connect to a peer next time second_tick is called.
// connect to a peer next time on_tick is called.
// This implements a round robin.
int m_next_connect_torrent;
#ifdef TORRENT_DEBUG

View File

@ -30,8 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED
#define TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED
#ifndef TORRENT_BANDWIDTH_CHANNEL_HPP_INCLUDED
#define TORRENT_BANDWIDTH_CHANNEL_HPP_INCLUDED
#include <boost/integer_traits.hpp>
@ -40,78 +40,77 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent {
// member of peer_connection
struct bandwidth_limit
struct bandwidth_channel
{
static const int inf = boost::integer_traits<int>::const_max;
bandwidth_limit()
bandwidth_channel()
: m_quota_left(0)
, m_local_limit(inf)
, m_current_rate(0)
, m_limit(0)
{}
// 0 means infinite
void throttle(int limit)
{
TORRENT_ASSERT(limit > 0);
m_local_limit = limit;
TORRENT_ASSERT(limit >= 0);
// if the throttle is more than this, we might overflow
TORRENT_ASSERT(limit < INT_MAX / 31);
m_limit = limit;
}
int throttle() const
{
return m_local_limit;
return m_limit;
}
void assign(int amount)
int quota_left() const
{
if (m_limit == 0) return inf;
return (std::max)(m_quota_left, 0);
}
void update_quota(int dt_milliseconds)
{
if (m_limit == 0) return;
m_quota_left += (m_limit * dt_milliseconds + 500) / 1000;
if (m_quota_left > m_limit * 3) m_quota_left = m_limit * 3;
distribute_quota = (std::max)(m_quota_left, 0);
// fprintf(stderr, "%p: [%d]: + %d limit: %d\n", this, dt_milliseconds, (m_limit * dt_milliseconds + 500) / 1000, m_limit);
}
// this is used when connections disconnect with
// some quota left. It's returned to its bandwidth
// channels.
void return_quota(int amount)
{
TORRENT_ASSERT(amount >= 0);
m_current_rate += amount;
if (m_limit == 0) return;
m_quota_left += amount;
}
void use_quota(int amount)
{
TORRENT_ASSERT(amount <= m_quota_left);
TORRENT_ASSERT(amount >= 0);
if (m_limit == 0) return;
m_quota_left -= amount;
}
int quota_left() const
{
return (std::max)(m_quota_left, 0);
}
// used as temporary storage while distributing
// bandwidth
int tmp;
void expire(int amount)
{
TORRENT_ASSERT(amount >= 0);
m_current_rate -= amount;
}
int max_assignable() const
{
if (m_local_limit == inf) return inf;
if (m_local_limit <= m_current_rate) return 0;
return m_local_limit - m_current_rate;
}
// this is the number of bytes to distribute this round
int distribute_quota;
private:
// this is the amount of bandwidth we have
// been assigned without using yet. i.e.
// the bandwidth that we use up every time
// we receive or send a message. Once this
// hits zero, we need to request more
// bandwidth from the torrent which
// in turn will request bandwidth from
// the bandwidth manager
// been assigned without using yet.
int m_quota_left;
// the local limit is the number of bytes
// per window size we are allowed to use.
int m_local_limit;
// the current rate is the number of
// bytes we have been assigned within
// the window size.
int m_current_rate;
// the limit is the number of bytes
// per second we are allowed to use.
int m_limit;
};
}

View File

@ -38,7 +38,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/integer_traits.hpp>
#include <boost/thread/mutex.hpp>
#include <deque>
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
@ -59,65 +58,16 @@ using boost::bind;
namespace libtorrent {
// the maximum block of bandwidth quota to
// hand out is 33kB. The block size may
// be smaller on lower limits
enum
{
max_bandwidth_block_size = 33000,
min_bandwidth_block_size = 400
};
const time_duration bw_window_size = seconds(1);
template<class PeerConnection, class Torrent>
struct history_entry
{
history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
, int a, ptime exp)
: expires_at(exp), amount(a), peer(p), tor(t) {}
history_entry(int a, ptime exp)
: expires_at(exp), amount(a), peer(), tor() {}
ptime expires_at;
int amount;
intrusive_ptr<PeerConnection> peer;
weak_ptr<Torrent> tor;
};
template<class T>
T clamp(T val, T ceiling, T floor)
{
TORRENT_ASSERT(ceiling >= floor);
if (val >= ceiling) return ceiling;
else if (val <= floor) return floor;
return val;
}
template<class T>
struct assign_at_exit
{
assign_at_exit(T& var, T val): var_(var), val_(val) {}
~assign_at_exit() { var_ = val_; }
T& var_;
T val_;
};
template<class PeerConnection, class Torrent>
template<class PeerConnection>
struct bandwidth_manager
{
bandwidth_manager(io_service& ios, int channel
bandwidth_manager(int channel
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
, bool log = false
#endif
)
: m_ios(ios)
, m_history_timer(m_ios)
, m_limit(bandwidth_limit::inf)
, m_drain_quota(0)
, m_current_quota(0)
, m_queued_bytes(0)
: m_queued_bytes(0)
, m_channel(channel)
, m_in_hand_out_bandwidth(false)
, m_abort(false)
{
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
@ -127,47 +77,16 @@ struct bandwidth_manager
#endif
}
void drain(int bytes)
{
mutex_t::scoped_lock l(m_mutex);
TORRENT_ASSERT(bytes >= 0);
m_drain_quota += bytes;
if (m_drain_quota > m_limit * 5) m_drain_quota = m_limit * 5;
}
void throttle(int limit)
{
mutex_t::scoped_lock l(m_mutex);
TORRENT_ASSERT(limit >= 0);
m_limit = limit;
}
int throttle() const
{
mutex_t::scoped_lock l(m_mutex);
return m_limit;
}
void close()
{
mutex_t::scoped_lock l(m_mutex);
m_abort = true;
m_queue.clear();
m_queued_bytes = 0;
m_history.clear();
m_current_quota = 0;
error_code ec;
m_history_timer.cancel(ec);
}
#ifdef TORRENT_DEBUG
bool is_queued(PeerConnection const* peer) const
{
mutex_t::scoped_lock l(m_mutex);
return is_queued(peer);
}
bool is_queued(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
{
for (typename queue_t::const_iterator i = m_queue.begin()
, end(m_queue.end()); i != end; ++i)
@ -176,33 +95,15 @@ struct bandwidth_manager
}
return false;
}
bool is_in_history(PeerConnection const* peer) const
{
mutex_t::scoped_lock l(m_mutex);
return is_in_history(peer, l);
}
bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
{
for (typename history_t::const_iterator i
= m_history.begin(), end(m_history.end()); i != end; ++i)
{
if (i->peer.get() == peer) return true;
}
return false;
}
#endif
int queue_size() const
{
mutex_t::scoped_lock l(m_mutex);
return m_queue.size();
}
int queued_bytes() const
{
mutex_t::scoped_lock l(m_mutex);
return m_queued_bytes;
}
@ -210,297 +111,144 @@ struct bandwidth_manager
// others will cut in front of the non-prioritized peers.
// this is used by web seeds
void request_bandwidth(intrusive_ptr<PeerConnection> const& peer
, int blk, int priority)
, int blk, int priority
, bandwidth_channel* chan1 = 0
, bandwidth_channel* chan2 = 0
, bandwidth_channel* chan3 = 0
, bandwidth_channel* chan4 = 0
, bandwidth_channel* chan5 = 0
)
{
mutex_t::scoped_lock l(m_mutex);
INVARIANT_CHECK;
if (m_abort) return;
TORRENT_ASSERT(blk > 0);
TORRENT_ASSERT(!is_queued(peer.get(), l));
TORRENT_ASSERT(priority > 0);
TORRENT_ASSERT(!is_queued(peer.get()));
// make sure this peer isn't already in line
// waiting for bandwidth
TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
typename queue_t::reverse_iterator i(m_queue.rbegin());
while (i != m_queue.rend() && priority > i->priority)
bw_request<PeerConnection> bwr(peer, blk, priority);
int i = 0;
if (chan1 && chan1->throttle() > 0) bwr.channel[i++] = chan1;
if (chan2 && chan2->throttle() > 0) bwr.channel[i++] = chan2;
if (chan3 && chan3->throttle() > 0) bwr.channel[i++] = chan3;
if (chan4 && chan4->throttle() > 0) bwr.channel[i++] = chan4;
if (chan5 && chan5->throttle() > 0) bwr.channel[i++] = chan5;
if (i == 0)
{
++i->priority;
++i;
// the connection is not rate limited by any of its
// bandwidth channels, or it doesn't belong to any
// channels. There's no point in adding it to
// the queue, just satisfy the request immediately
bwr.peer->assign_bandwidth(m_channel, blk);
return;
}
m_queue.insert(i.base(), bw_queue_entry<PeerConnection, Torrent>(peer, blk, priority));
m_queued_bytes += blk;
if (!m_queue.empty()) hand_out_bandwidth(l);
m_queue.push_back(bwr);
}
#ifdef TORRENT_DEBUG
void check_invariant() const
{
int current_quota = 0;
for (typename history_t::const_iterator i
= m_history.begin(), end(m_history.end()); i != end; ++i)
{
current_quota += i->amount;
}
TORRENT_ASSERT(current_quota == m_current_quota);
int bytes = 0;
typename queue_t::const_iterator j = m_queue.begin();
if (j != m_queue.end())
{
bytes += j->max_block_size;
++j;
for (typename queue_t::const_iterator i = m_queue.begin()
, end(m_queue.end()); i != end && j != end; ++i, ++j)
{
TORRENT_ASSERT(i->priority >= j->priority);
bytes += j->max_block_size;
}
}
TORRENT_ASSERT(bytes == m_queued_bytes);
}
#endif
private:
void add_history_entry(history_entry<PeerConnection, Torrent> const& e)
void update_quotas(time_duration const& dt)
{
INVARIANT_CHECK;
m_history.push_front(e);
m_current_quota += e.amount;
// in case the size > 1 there is already a timer
// active that will be invoked, no need to set one up
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " + "
" queue: " << std::setw(3) << m_queue.size()
<< " used: " << std::setw(7) << m_current_quota
<< " limit: " << std::setw(7) << m_limit
<< " history: " << std::setw(3) << m_history.size()
<< std::endl;
#endif
if (m_history.size() > 1) return;
if (m_abort) return;
error_code ec;
TORRENT_ASSERT(e.expires_at > time_now());
m_history_timer.expires_at(e.expires_at, ec);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
void on_history_expire(error_code const& e)
{
if (e) return;
mutex_t::scoped_lock l(m_mutex);
if (m_abort) return;
INVARIANT_CHECK;
TORRENT_ASSERT(!m_history.empty());
ptime now(time_now());
while (!m_history.empty() && m_history.back().expires_at <= now)
{
history_entry<PeerConnection, Torrent> e = m_history.back();
m_history.pop_back();
m_current_quota -= e.amount;
TORRENT_ASSERT(m_current_quota >= 0);
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " - "
" queue: " << std::setw(3) << m_queue.size()
<< " used: " << std::setw(7) << m_current_quota
<< " limit: " << std::setw(7) << m_limit
<< " history: " << std::setw(3) << m_history.size()
<< std::endl;
#endif
intrusive_ptr<PeerConnection> c = e.peer;
if (!c) continue;
shared_ptr<Torrent> t = e.tor.lock();
l.unlock();
if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
if (t) t->expire_bandwidth(m_channel, e.amount);
l.lock();
}
// now, wait for the next chunk to expire
if (!m_history.empty() && !m_abort)
{
error_code ec;
TORRENT_ASSERT(m_history.back().expires_at > now);
m_history_timer.expires_at(m_history.back().expires_at, ec);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
// since some bandwidth just expired, it
// means we can hand out more (in case there
// are still consumers in line)
if (!m_queue.empty()) hand_out_bandwidth(l);
}
void hand_out_bandwidth(boost::mutex::scoped_lock& l)
{
// if we're already handing out bandwidth, just return back
// to the loop further down on the callstack
if (m_in_hand_out_bandwidth) return;
m_in_hand_out_bandwidth = true;
// set it to false when exiting function
assign_at_exit<bool> sg(m_in_hand_out_bandwidth, false);
if (m_queue.empty()) return;
INVARIANT_CHECK;
ptime now(time_now());
int dt_milliseconds = total_milliseconds(dt);
if (dt_milliseconds > 3000) dt_milliseconds = 3000;
int limit = m_limit;
// for each bandwidth channel, call update_quota(dt)
// available bandwidth to hand out
int amount = limit - m_current_quota;
std::vector<bandwidth_channel*> channels;
if (amount <= 0) return;
if (m_drain_quota > 0)
for (typename queue_t::iterator i = m_queue.begin();
i != m_queue.end();)
{
int drain_amount = (std::min)(m_drain_quota, amount);
m_drain_quota -= drain_amount;
amount -= drain_amount;
add_history_entry(history_entry<PeerConnection, Torrent>(
drain_amount, now + bw_window_size));
}
queue_t tmp;
while (!m_queue.empty() && amount > 0)
{
bw_queue_entry<PeerConnection, Torrent> qe = m_queue.front();
TORRENT_ASSERT(qe.max_block_size > 0);
m_queued_bytes -= qe.max_block_size;
m_queue.pop_front();
shared_ptr<Torrent> t = qe.torrent.lock();
if (!t) continue;
if (qe.peer->is_disconnecting())
if (i->peer->is_disconnecting())
{
l.unlock();
t->expire_bandwidth(m_channel, qe.max_block_size);
l.lock();
m_queued_bytes -= i->request_size - i->assigned;
// return all assigned quota to all the
// bandwidth channels this peer belongs to
for (int j = 0; j < 5 && i->channel[j]; ++j)
{
bandwidth_channel* bwc = i->channel[j];
bwc->return_quota(i->assigned);
}
i = m_queue.erase(i);
continue;
}
// at this point, max_assignable may actually be zero. Since
// the rate limit of the peer might have changed while it
// was in the queue.
int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
if (max_assignable == 0)
for (int j = 0; j < 5 && i->channel[j]; ++j)
{
TORRENT_ASSERT(is_in_history(qe.peer.get(), l));
m_queued_bytes += qe.max_block_size;
tmp.push_back(qe);
continue;
bandwidth_channel* bwc = i->channel[j];
bwc->tmp = 0;
}
// this is the limit of the block size. It depends on the throttle
// so that it can be closer to optimal. Larger block sizes will give lower
// granularity to the rate but will be more efficient. At high rates
// the block sizes are bigger and at low rates, the granularity
// is more important and block sizes are smaller
// the minimum rate that can be given is the block size, so, the
// block size must be smaller for lower rates. This is because
// the history window is one second, and the block will be forgotten
// after one second.
int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel)
, limit / 10);
if (block_size < min_bandwidth_block_size)
{
block_size = (std::min)(int(min_bandwidth_block_size), limit);
}
else if (block_size > max_bandwidth_block_size)
{
if (limit == bandwidth_limit::inf)
{
block_size = max_bandwidth_block_size;
}
else
{
// try to make the block_size a divisor of
// m_limit to make the distributions as fair
// as possible
// TODO: move this calculcation to where the limit
// is changed
block_size = limit
/ (limit / max_bandwidth_block_size);
}
}
if (block_size > qe.max_block_size) block_size = qe.max_block_size;
if (amount < block_size / 4)
{
m_queued_bytes += qe.max_block_size;
tmp.push_back(qe);
// m_queue.push_front(qe);
break;
}
// so, hand out max_assignable, but no more than
// the available bandwidth (amount) and no more
// than the max_bandwidth_block_size
int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
, amount);
TORRENT_ASSERT(hand_out_amount > 0);
amount -= hand_out_amount;
TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
l.unlock();
t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
qe.peer->assign_bandwidth(m_channel, hand_out_amount);
l.lock();
add_history_entry(history_entry<PeerConnection, Torrent>(
qe.peer, t, hand_out_amount, now + bw_window_size));
++i;
}
for (typename queue_t::iterator i = m_queue.begin()
, end(m_queue.end()); i != end; ++i)
{
for (int j = 0; j < 5 && i->channel[j]; ++j)
{
bandwidth_channel* bwc = i->channel[j];
if (bwc->tmp == 0) channels.push_back(bwc);
bwc->tmp += i->priority;
TORRENT_ASSERT(i->priority > 0);
}
}
for (std::vector<bandwidth_channel*>::iterator i = channels.begin()
, end(channels.end()); i != end; ++i)
{
(*i)->update_quota(dt_milliseconds);
}
queue_t tm;
for (typename queue_t::iterator i = m_queue.begin();
i != m_queue.end();)
{
int a = i->assign_bandwidth();
if (i->assigned == i->request_size
|| (i->ttl <= 0 && i->assigned > 0))
{
TORRENT_ASSERT(i->assigned <= i->request_size);
tm.push_back(*i);
i = m_queue.erase(i);
}
else
{
++i;
}
m_queued_bytes -= a;
}
while (!tm.empty())
{
bw_request<PeerConnection>& bwr = tm.back();
bwr.peer->assign_bandwidth(m_channel, bwr.assigned);
tm.pop_back();
}
if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end());
}
typedef boost::mutex mutex_t;
mutable mutex_t m_mutex;
// the io_service used for the timer
io_service& m_ios;
// the timer that is waiting for the entries
// in the history queue to expire (slide out
// of the history window)
deadline_timer m_history_timer;
// the rate limit (bytes per second)
int m_limit;
// bytes to drain without handing out to a peer
// used to deduct the IP overhead
int m_drain_quota;
// the sum of all recently handed out bandwidth blocks
int m_current_quota;
// these are the consumers that want bandwidth
typedef std::deque<bw_queue_entry<PeerConnection, Torrent> > queue_t;
typedef std::vector<bw_request<PeerConnection> > queue_t;
queue_t m_queue;
// the number of bytes all the requests in queue are for
int m_queued_bytes;
// these are the consumers that have received bandwidth
// that will expire
typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
history_t m_history;
// this is the channel within the consumers
// that bandwidth is assigned to (upload or download)
int m_channel;
// this is true while we're in the hand_out_bandwidth loop
// to prevent recursive invocations to interfere
bool m_in_hand_out_bandwidth;
bool m_abort;
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT

View File

@ -34,20 +34,61 @@ POSSIBILITY OF SUCH DAMAGE.
#define TORRENT_BANDWIDTH_QUEUE_ENTRY_HPP_INCLUDED
#include <boost/intrusive_ptr.hpp>
#include "libtorrent/bandwidth_limit.hpp"
namespace libtorrent {
template<class PeerConnection, class Torrent>
struct bw_queue_entry
template<class PeerConnection>
struct bw_request
{
bw_queue_entry(boost::intrusive_ptr<PeerConnection> const& pe
bw_request(boost::intrusive_ptr<PeerConnection> const& pe
, int blk, int prio)
: peer(pe), torrent(peer->associated_torrent())
, max_block_size(blk), priority(prio) {}
: peer(pe)
, priority(prio)
, assigned(0)
, request_size(blk)
, ttl(20)
{
TORRENT_ASSERT(priority > 0);
std::memset(channel, 0, sizeof(channel));
}
boost::intrusive_ptr<PeerConnection> peer;
boost::weak_ptr<Torrent> torrent;
int max_block_size;
int priority; // 0 is low prio
// 1 is normal prio
int priority;
// the number of bytes assigned to this request so far
int assigned;
// once assigned reaches this, we dispatch the request function
int request_size;
// the max number of rounds for this request to survive
// this ensures that requests gets responses at very low
// rate limits, when the requested size would take a long
// time to satisfy
int ttl;
// loops over the bandwidth channels and assigns bandwidth
// from the most limiting one
int assign_bandwidth()
{
TORRENT_ASSERT(assigned < request_size);
int quota = request_size - assigned;
TORRENT_ASSERT(quota >= 0);
for (int j = 0; j < 5 && channel[j]; ++j)
{
if (channel[j]->throttle() == 0) continue;
quota = (std::min)(channel[j]->distribute_quota * priority / channel[j]->tmp, quota);
}
assigned += quota;
for (int j = 0; j < 5 && channel[j]; ++j)
channel[j]->use_quota(quota);
TORRENT_ASSERT(assigned <= request_size);
--ttl;
TORRENT_ASSERT(assigned <= request_size);
return quota;
}
bandwidth_channel* channel[5];
};
}

View File

@ -224,7 +224,10 @@ namespace libtorrent
void no_download(bool b) { m_no_download = b; }
void set_priority(int p)
{ m_priority = p; }
{
TORRENT_ASSERT(p > 0);
m_priority = p;
}
void fast_reconnect(bool r);
bool fast_reconnect() const { return m_fast_reconnect; }
@ -433,14 +436,10 @@ namespace libtorrent
void cancel_request(piece_block const& b);
void send_block_requests();
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
{ return m_bandwidth_channel[channel].throttle(); }
void assign_bandwidth(int channel, int amount);
void expire_bandwidth(int channel, int amount);
#ifdef TORRENT_DEBUG
void check_invariant() const;
@ -578,7 +577,10 @@ namespace libtorrent
// the bandwidth channels, upload and download
// keeps track of the current quotas
bandwidth_limit m_bandwidth_limit[num_channels];
bandwidth_channel m_bandwidth_channel[num_channels];
// number of bytes this peer can send and receive
int m_quota[2];
// statistics about upload and download speeds
// and total amount of uploads and downloads for

View File

@ -84,7 +84,10 @@ namespace libtorrent
// bw_global: the channel is waiting for global quota
// bw_network: the channel is waiting for an async write
// for read operation to complete
enum bw_state { bw_idle, bw_torrent, bw_global, bw_network };
enum bw_state { bw_idle, bw_limit, bw_network };
#ifndef TORRENT_NO_DEPRECATE
enum bw_state_deprecated { bw_torrent = bw_limit, bw_global = bw_limit };
#endif
char read_state;
char write_state;

View File

@ -70,6 +70,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/hasher.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/bitfield.hpp"
#include "libtorrent/aux_/session_impl.hpp"
namespace libtorrent
{
@ -288,26 +289,10 @@ namespace libtorrent
// --------------------------------------------
// BANDWIDTH MANAGEMENT
bandwidth_limit m_bandwidth_limit[2];
bandwidth_channel m_bandwidth_channel[2];
void request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int max_block_size, int priority);
void perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size, int priority);
void expire_bandwidth(int channel, int amount);
void assign_bandwidth(int channel, int amount, int blk);
int bandwidth_throttle(int channel) const;
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
int bandwidth_queue_size(int channel) const;
// --------------------------------------------
// PEER MANAGEMENT
@ -845,10 +830,6 @@ namespace libtorrent
boost::scoped_ptr<piece_picker> m_picker;
// the queue of peer_connections that want more bandwidth
typedef std::deque<bw_queue_entry<peer_connection, torrent> > queue_t;
queue_t m_bandwidth_queue[2];
std::vector<announce_entry> m_trackers;
// this is an index into m_trackers

View File

@ -157,11 +157,11 @@ namespace libtorrent
// connection, we have to give it some initial bandwidth
// to send the handshake.
#ifndef TORRENT_DISABLE_ENCRYPTION
m_bandwidth_limit[download_channel].assign(2048);
m_bandwidth_limit[upload_channel].assign(2048);
m_quota[download_channel] = 2048;
m_quota[upload_channel] = 2048;
#else
m_bandwidth_limit[download_channel].assign(80);
m_bandwidth_limit[upload_channel].assign(80);
m_quota[download_channel] = 80;
m_quota[upload_channel] = 80;
#endif
#ifdef TORRENT_DEBUG

View File

@ -76,7 +76,7 @@ namespace libtorrent
prefer_whole_pieces(1);
// we only want left-over bandwidth
set_priority(0);
set_priority(1);
shared_ptr<torrent> tor = t.lock();
TORRENT_ASSERT(tor);
int blocks_per_piece = tor->torrent_file().piece_length() / tor->block_size();

View File

@ -105,8 +105,8 @@ namespace libtorrent
, m_reading_bytes(0)
, m_num_invalid_requests(0)
, m_priority(1)
, m_upload_limit(bandwidth_limit::inf)
, m_download_limit(bandwidth_limit::inf)
, m_upload_limit(0)
, m_download_limit(0)
, m_peer_info(peerinfo)
, m_speed(slow)
, m_connection_ticket(-1)
@ -148,6 +148,9 @@ namespace libtorrent
m_channel_state[upload_channel] = peer_info::bw_idle;
m_channel_state[download_channel] = peer_info::bw_idle;
m_quota[0] = 0;
m_quota[1] = 0;
TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false);
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
std::fill(m_country, m_country + 2, 0);
@ -222,8 +225,8 @@ namespace libtorrent
, m_reading_bytes(0)
, m_num_invalid_requests(0)
, m_priority(1)
, m_upload_limit(bandwidth_limit::inf)
, m_download_limit(bandwidth_limit::inf)
, m_upload_limit(0)
, m_download_limit(0)
, m_peer_info(peerinfo)
, m_speed(slow)
, m_connection_ticket(-1)
@ -265,6 +268,9 @@ namespace libtorrent
m_channel_state[upload_channel] = peer_info::bw_idle;
m_channel_state[download_channel] = peer_info::bw_idle;
m_quota[0] = 0;
m_quota[1] = 0;
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
std::fill(m_country, m_country + 2, 0);
#ifndef TORRENT_DISABLE_GEO_IP
@ -2796,19 +2802,19 @@ namespace libtorrent
void peer_connection::set_upload_limit(int limit)
{
TORRENT_ASSERT(limit >= -1);
if (limit == -1) limit = (std::numeric_limits<int>::max)();
if (limit < 10) limit = 10;
if (limit < 0) limit = 0;
if (limit < 10 && limit > 0) limit = 10;
m_upload_limit = limit;
m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
m_bandwidth_channel[upload_channel].throttle(m_upload_limit);
}
void peer_connection::set_download_limit(int limit)
{
TORRENT_ASSERT(limit >= -1);
if (limit == -1) limit = (std::numeric_limits<int>::max)();
if (limit < 10) limit = 10;
if (limit < 0) limit = 0;
if (limit < 10 && limit > 0) limit = 10;
m_download_limit = limit;
m_bandwidth_limit[download_channel].throttle(m_download_limit);
m_bandwidth_channel[download_channel].throttle(m_download_limit);
}
size_type peer_connection::share_diff() const
@ -2862,8 +2868,8 @@ namespace libtorrent
p.pid = pid();
p.ip = remote();
p.pending_disk_bytes = m_outstanding_writing_bytes;
p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
p.send_quota = m_quota[upload_channel];
p.receive_quota = m_quota[download_channel];
p.num_pieces = m_num_pieces;
if (m_download_queue.empty()) p.request_timeout = -1;
else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
@ -2883,15 +2889,15 @@ namespace libtorrent
p.total_download = statistics().total_payload_download();
p.total_upload = statistics().total_payload_upload();
if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
if (m_bandwidth_channel[upload_channel].throttle() == 0)
p.upload_limit = -1;
else
p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
p.upload_limit = m_bandwidth_channel[upload_channel].throttle();
if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
if (m_bandwidth_channel[download_channel].throttle() == 0)
p.download_limit = -1;
else
p.download_limit = m_bandwidth_limit[download_channel].throttle();
p.download_limit = m_bandwidth_channel[download_channel].throttle();
p.load_balancing = total_free_upload();
@ -3068,6 +3074,43 @@ namespace libtorrent
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
// drain the IP overhead from the bandwidth limiters
if (m_ses.m_settings.rate_limit_ip_overhead)
{
int download_overhead = m_statistics.download_ip_overhead();
int upload_overhead = m_statistics.upload_ip_overhead();
m_bandwidth_channel[download_channel].use_quota(download_overhead);
m_bandwidth_channel[upload_channel].use_quota(upload_overhead);
int up_limit = m_bandwidth_channel[upload_channel].throttle();
int down_limit = m_bandwidth_channel[download_channel].throttle();
if (t)
{
t->m_bandwidth_channel[download_channel].use_quota(download_overhead);
t->m_bandwidth_channel[upload_channel].use_quota(upload_overhead);
if (down_limit > 0
&& download_overhead >= down_limit
&& t->alerts().should_post<performance_alert>())
{
t->alerts().post_alert(performance_alert(t->get_handle()
, performance_alert::download_limit_too_low));
}
if (up_limit > 0
&& upload_overhead >= up_limit
&& t->alerts().should_post<performance_alert>())
{
t->alerts().post_alert(performance_alert(t->get_handle()
, performance_alert::upload_limit_too_low));
}
}
m_ses.m_download_channel.use_quota(download_overhead);
m_ses.m_upload_channel.use_quota(upload_overhead);
}
if (!t || m_disconnecting)
{
m_ses.m_half_open.done(m_connection_ticket);
@ -3258,7 +3301,7 @@ namespace libtorrent
// if we have downloaded more than one piece more
// than we have uploaded OR if we are a seed
// have an unlimited upload rate
m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
m_bandwidth_channel[upload_channel].throttle(m_upload_limit);
}
else
{
@ -3275,14 +3318,17 @@ namespace libtorrent
if (t->ratio() != 1.f)
soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
+ bias) / break_even_time, double(m_upload_limit));
double upload_speed_limit = (soon_downloaded - have_uploaded
+ bias) / break_even_time;
if (m_upload_limit > 0 && m_upload_limit < upload_speed_limit)
upload_speed_limit = m_upload_limit;
upload_speed_limit = (std::min)(upload_speed_limit,
(double)(std::numeric_limits<int>::max)());
m_bandwidth_limit[upload_channel].throttle(
(std::min)((std::max)((int)upload_speed_limit, 20)
m_bandwidth_channel[upload_channel].throttle(
(std::min)((std::max)((int)upload_speed_limit, 10)
, m_upload_limit));
}
@ -3485,8 +3531,9 @@ namespace libtorrent
(*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
#endif
m_bandwidth_limit[channel].assign(amount);
TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global);
TORRENT_ASSERT(amount > 0);
m_quota[channel] += amount;
TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_limit);
m_channel_state[channel] = peer_info::bw_idle;
if (channel == upload_channel)
{
@ -3498,21 +3545,6 @@ namespace libtorrent
}
}
void peer_connection::expire_bandwidth(int channel, int amount)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_bandwidth_limit[channel].expire(amount);
if (channel == upload_channel)
{
setup_send();
}
else if (channel == download_channel)
{
setup_receive();
}
}
void peer_connection::setup_send()
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
@ -3521,7 +3553,7 @@ namespace libtorrent
shared_ptr<torrent> t = m_torrent.lock();
if (m_bandwidth_limit[upload_channel].quota_left() == 0
if (m_quota[upload_channel] == 0
&& !m_send_buffer.empty()
&& !m_connecting
&& t
@ -3531,18 +3563,19 @@ namespace libtorrent
// bandwidth. So, we simply request bandwidth
// from the torrent
TORRENT_ASSERT(t);
if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
{
int priority = is_interesting() * 2 + m_requests_in_buffer.size();
// peers that we are not interested in are non-prioritized
m_channel_state[upload_channel] = peer_info::bw_torrent;
t->request_bandwidth(upload_channel, self()
, m_send_buffer.size(), priority);
int priority = 1 + is_interesting() * 2 + m_requests_in_buffer.size();
// peers that we are not interested in are non-prioritized
m_channel_state[upload_channel] = peer_info::bw_limit;
m_ses.m_upload_rate.request_bandwidth(self()
, m_send_buffer.size(), priority
, &m_ses.m_upload_channel
, &t->m_bandwidth_channel[upload_channel]
, &m_bandwidth_channel[upload_channel]);
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
<< priority << "]\n";
(*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ "
"upload: " << m_send_buffer.size()
<< " prio: " << priority << "]\n";
#endif
}
return;
}
@ -3552,7 +3585,7 @@ namespace libtorrent
if (m_send_buffer.empty())
{
(*m_logger) << time_now_string() << " *** SEND BUFFER DEPLETED ["
" quota: " << m_bandwidth_limit[upload_channel].quota_left() <<
" quota: " << m_quota[upload_channel] <<
" ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
" buf: " << m_send_buffer.size() <<
" connecting: " << (m_connecting?"yes":"no") <<
@ -3561,7 +3594,7 @@ namespace libtorrent
else
{
(*m_logger) << time_now_string() << " *** CANNOT WRITE ["
" quota: " << m_bandwidth_limit[download_channel].quota_left() <<
" quota: " << m_quota[upload_channel] <<
" ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
" buf: " << m_send_buffer.size() <<
" connecting: " << (m_connecting?"yes":"no") <<
@ -3575,7 +3608,7 @@ namespace libtorrent
if (!m_send_buffer.empty())
{
int amount_to_send = m_send_buffer.size();
int quota_left = m_bandwidth_limit[upload_channel].quota_left();
int quota_left = m_quota[upload_channel];
if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
amount_to_send = quota_left;
@ -3601,21 +3634,23 @@ namespace libtorrent
shared_ptr<torrent> t = m_torrent.lock();
if (m_bandwidth_limit[download_channel].quota_left() == 0
if (m_quota[download_channel] == 0
&& !m_connecting
&& t
&& !m_ignore_bandwidth_limits)
{
if (m_bandwidth_limit[download_channel].max_assignable() > 0)
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
(*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ "
"download: " << (m_download_queue.size() * 16 * 1024 + 30)
<< " prio: " << m_priority << " ]\n";
#endif
TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
m_channel_state[download_channel] = peer_info::bw_torrent;
t->request_bandwidth(download_channel, self()
, m_download_queue.size() * 16 * 1024 + 30, m_priority);
}
TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
m_channel_state[download_channel] = peer_info::bw_limit;
m_ses.m_download_rate.request_bandwidth(self()
, m_outstanding_bytes + 30, m_priority
, &m_ses.m_download_channel
, &t->m_bandwidth_channel[download_channel]
, &m_bandwidth_channel[download_channel]);
return;
}
@ -3623,7 +3658,7 @@ namespace libtorrent
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** CANNOT READ ["
" quota: " << m_bandwidth_limit[download_channel].quota_left() <<
" quota: " << m_quota[download_channel] <<
" ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
" outstanding: " << m_outstanding_writing_bytes <<
" outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
@ -3637,7 +3672,7 @@ namespace libtorrent
if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0;
if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos)
max_receive = m_soft_packet_size - m_recv_pos;
int quota_left = m_bandwidth_limit[download_channel].quota_left();
int quota_left = m_quota[download_channel];
if (!m_ignore_bandwidth_limits && max_receive > quota_left)
max_receive = quota_left;
@ -3862,7 +3897,10 @@ namespace libtorrent
#endif
// correct the dl quota usage, if not all of the buffer was actually read
if (!m_ignore_bandwidth_limits)
m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
{
TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]);
m_quota[download_channel] -= bytes_transferred;
}
if (m_disconnecting)
{
@ -3904,7 +3942,7 @@ namespace libtorrent
max_receive = m_packet_size - m_recv_pos;
if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos)
max_receive = m_soft_packet_size - m_recv_pos;
int quota_left = m_bandwidth_limit[download_channel].quota_left();
int quota_left = m_quota[download_channel];
if (!m_ignore_bandwidth_limits && max_receive > quota_left)
max_receive = quota_left;
@ -3968,14 +4006,14 @@ namespace libtorrent
// if we have requests or pending data to be sent or announcements to be made
// we want to send data
return !m_send_buffer.empty()
&& (m_bandwidth_limit[upload_channel].quota_left() > 0
&& (m_quota[upload_channel] > 0
|| m_ignore_bandwidth_limits)
&& !m_connecting;
}
bool peer_connection::can_read() const
{
bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
bool ret = (m_quota[download_channel] > 0
|| m_ignore_bandwidth_limits)
&& !m_connecting
&& m_outstanding_writing_bytes <
@ -4159,7 +4197,10 @@ namespace libtorrent
m_channel_state[upload_channel] = peer_info::bw_idle;
if (!m_ignore_bandwidth_limits)
m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
{
TORRENT_ASSERT(bytes_transferred <= m_quota[upload_channel]);
m_quota[upload_channel] -= bytes_transferred;
}
m_statistics.trancieve_ip_packet(bytes_transferred, m_remote.address().is_v6());
@ -4205,6 +4246,9 @@ namespace libtorrent
{
TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
TORRENT_ASSERT(m_upload_limit >= 0);
TORRENT_ASSERT(m_download_limit >= 0);
boost::shared_ptr<torrent> t = m_torrent.lock();
if (m_disconnecting)
{
@ -4251,24 +4295,10 @@ namespace libtorrent
TORRENT_ASSERT(m_outstanding_bytes == outstanding_bytes);
}
/*
// this assertion correct most of the time, but sometimes right when the
// limit is changed it might break
for (int i = 0; i < 2; ++i)
{
// this peer is in the bandwidth history iff max_assignable < limit
TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
== m_ses.m_bandwidth_manager[i]->is_in_history(this)
|| m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
}
*/
if (m_channel_state[download_channel] == peer_info::bw_torrent
|| m_channel_state[download_channel] == peer_info::bw_global)
TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0);
if (m_channel_state[upload_channel] == peer_info::bw_torrent
|| m_channel_state[upload_channel] == peer_info::bw_global)
TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
if (m_channel_state[download_channel] == peer_info::bw_limit)
TORRENT_ASSERT(m_quota[download_channel] == 0);
if (m_channel_state[upload_channel] == peer_info::bw_limit)
TORRENT_ASSERT(m_quota[upload_channel] == 0);
std::set<piece_block> unique;
std::transform(m_download_queue.begin(), m_download_queue.end()

View File

@ -157,11 +157,11 @@ namespace aux {
, m_io_service()
, m_disk_thread(m_io_service)
, m_half_open(m_io_service)
, m_download_channel(m_io_service, peer_connection::download_channel)
, m_download_rate(peer_connection::download_channel)
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
, m_upload_channel(m_io_service, peer_connection::upload_channel, true)
, m_upload_rate(peer_connection::upload_channel, true)
#else
, m_upload_channel(m_io_service, peer_connection::upload_channel)
, m_upload_rate(peer_connection::upload_channel)
#endif
, m_tracker_manager(*this, m_tracker_proxy)
, m_listen_port_retries(listen_port_range.second - listen_port_range.first)
@ -178,6 +178,7 @@ namespace aux {
, m_auto_scrape_time_scaler(180)
, m_incoming_connection(false)
, m_last_tick(time_now())
, m_last_second_tick(m_last_tick)
, m_last_choke(m_last_tick)
#ifndef TORRENT_DISABLE_DHT
, m_dht_same_port(true)
@ -222,8 +223,8 @@ namespace aux {
}
#endif
m_bandwidth_manager[peer_connection::download_channel] = &m_download_channel;
m_bandwidth_manager[peer_connection::upload_channel] = &m_upload_channel;
m_bandwidth_channel[peer_connection::download_channel] = &m_download_channel;
m_bandwidth_channel[peer_connection::upload_channel] = &m_upload_channel;
#ifdef TORRENT_UPNP_LOGGING
m_upnp_log.open("upnp.log", std::ios::in | std::ios::out | std::ios::trunc);
@ -278,9 +279,9 @@ namespace aux {
*i = printable[rand() % (sizeof(printable)-1)];
}
m_timer.expires_from_now(seconds(1), ec);
m_timer.expires_from_now(milliseconds(100), ec);
m_timer.async_wait(
bind(&session_impl::second_tick, this, _1));
bind(&session_impl::on_tick, this, _1));
m_thread.reset(new boost::thread(boost::ref(*this)));
}
@ -554,8 +555,8 @@ namespace aux {
(*m_logger) << time_now_string() << " shutting down connection queue\n";
#endif
m_download_channel.close();
m_upload_channel.close();
m_download_rate.close();
m_upload_rate.close();
}
void session_impl::set_port_filter(port_filter const& f)
@ -1086,7 +1087,7 @@ namespace aux {
return port;
}
void session_impl::second_tick(error_code const& e)
void session_impl::on_tick(error_code const& e)
{
session_impl::mutex_t::scoped_lock l(m_mutex);
@ -1105,13 +1106,22 @@ namespace aux {
}
ptime now = time_now();
float tick_interval = total_microseconds(now - m_last_tick) / 1000000.f;
m_last_tick = now;
error_code ec;
m_timer.expires_at(now + seconds(1), ec);
m_timer.expires_at(now + milliseconds(100), ec);
m_timer.async_wait(
bind(&session_impl::second_tick, this, _1));
bind(&session_impl::on_tick, this, _1));
m_download_rate.update_quotas(now - m_last_tick);
m_upload_rate.update_quotas(now - m_last_tick);
m_last_tick = now;
// only tick the rest once every
if (now - m_last_second_tick < seconds(1)) return;
float tick_interval = total_microseconds(now - m_last_second_tick) / 1000000.f;
m_last_second_tick = now;
#ifdef TORRENT_STATS
++m_second_counter;
@ -1198,7 +1208,7 @@ namespace aux {
{
torrent& t = *i->second;
TORRENT_ASSERT(!t.is_aborted());
if (t.bandwidth_queue_size(peer_connection::upload_channel))
if (t.statistics().upload_rate() > t.upload_limit() * 0.9f)
++congested_torrents;
else
++uncongested_torrents;
@ -1237,31 +1247,31 @@ namespace aux {
m_stat.received_dht_bytes(dht_down);
}
// drain the IP overhead from the bandwidth limiters
if (m_settings.rate_limit_ip_overhead)
{
m_download_channel.drain(
m_stat.download_ip_overhead()
+ m_stat.download_dht()
m_download_channel.use_quota(m_stat.download_dht()
+ m_stat.download_tracker());
if (m_stat.download_ip_overhead() >= m_download_channel.throttle()
&& m_alerts.should_post<performance_alert>())
m_upload_channel.use_quota(m_stat.upload_dht()
+ m_stat.upload_tracker());
int up_limit = m_upload_channel.throttle();
int down_limit = m_download_channel.throttle();
if (down_limit > 0
&& m_stat.download_ip_overhead() >= down_limit
&& m_alerts.should_post<performance_alert>())
{
m_alerts.post_alert(performance_alert(torrent_handle()
, performance_alert::download_limit_too_low));
, performance_alert::download_limit_too_low));
}
m_upload_channel.drain(
m_stat.upload_ip_overhead()
+ m_stat.upload_dht()
+ m_stat.upload_tracker());
if (m_stat.upload_ip_overhead() >= m_upload_channel.throttle()
&& m_alerts.should_post<performance_alert>())
if (up_limit > 0
&& m_stat.upload_ip_overhead() >= up_limit
&& m_alerts.should_post<performance_alert>())
{
m_alerts.post_alert(performance_alert(torrent_handle()
, performance_alert::upload_limit_too_low));
, performance_alert::upload_limit_too_low));
}
}
@ -1663,7 +1673,7 @@ namespace aux {
// assume a reasonable rate is 3 kB/s, unless there's an upload limit and
// a max number of slots, in which case we assume each upload slot gets
// roughly the same amount of bandwidth
if (m_upload_channel.throttle() != bandwidth_limit::inf && m_max_uploads > 0)
if (m_upload_channel.throttle() != bandwidth_channel::inf && m_max_uploads > 0)
rate = (std::max)(m_upload_channel.throttle() / m_max_uploads, 1);
// the time it takes to download one piece at this rate (in seconds)
@ -1752,10 +1762,10 @@ namespace aux {
, bind(&peer_connection::reset_choke_counters, _1));
// auto unchoke
int upload_limit = m_bandwidth_manager[peer_connection::upload_channel]->throttle();
int upload_limit = m_bandwidth_channel[peer_connection::upload_channel]->throttle();
if (!m_settings.auto_upload_slots_rate_based
&& m_settings.auto_upload_slots
&& upload_limit != bandwidth_limit::inf)
&& upload_limit > 0)
{
// if our current upload rate is less than 90% of our
// limit AND most torrents are not "congested", i.e.
@ -1764,11 +1774,11 @@ namespace aux {
if (m_stat.upload_rate() < upload_limit * 0.9f
&& m_allowed_upload_slots <= m_num_unchoked + 1
&& congested_torrents < uncongested_torrents
&& m_upload_channel.queue_size() < 2)
&& m_upload_rate.queue_size() < 2)
{
++m_allowed_upload_slots;
}
else if (m_upload_channel.queue_size() > 1
else if (m_upload_rate.queue_size() > 1
&& m_allowed_upload_slots > m_max_uploads)
{
--m_allowed_upload_slots;
@ -2235,11 +2245,11 @@ namespace aux {
s.total_redundant_bytes = m_total_redundant_bytes;
s.total_failed_bytes = m_total_failed_bytes;
s.up_bandwidth_queue = m_upload_channel.queue_size();
s.down_bandwidth_queue = m_download_channel.queue_size();
s.up_bandwidth_queue = m_upload_rate.queue_size();
s.down_bandwidth_queue = m_download_rate.queue_size();
s.up_bandwidth_bytes_queue = m_upload_channel.queued_bytes();
s.down_bandwidth_bytes_queue = m_download_channel.queued_bytes();
s.up_bandwidth_bytes_queue = m_upload_rate.queued_bytes();
s.down_bandwidth_bytes_queue = m_download_rate.queued_bytes();
s.has_incoming_connections = m_incoming_connection;
@ -2543,8 +2553,8 @@ namespace aux {
INVARIANT_CHECK;
if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf;
m_bandwidth_manager[peer_connection::download_channel]->throttle(bytes_per_second);
if (bytes_per_second <= 0) bytes_per_second = bandwidth_channel::inf;
m_bandwidth_channel[peer_connection::download_channel]->throttle(bytes_per_second);
}
void session_impl::set_upload_rate_limit(int bytes_per_second)
@ -2553,8 +2563,8 @@ namespace aux {
INVARIANT_CHECK;
if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf;
m_bandwidth_manager[peer_connection::upload_channel]->throttle(bytes_per_second);
if (bytes_per_second <= 0) bytes_per_second = bandwidth_channel::inf;
m_bandwidth_channel[peer_connection::upload_channel]->throttle(bytes_per_second);
}
void session_impl::set_alert_dispatch(boost::function<void(alert const&)> const& fun)
@ -2598,14 +2608,14 @@ namespace aux {
INVARIANT_CHECK;
int ret = m_bandwidth_manager[peer_connection::upload_channel]->throttle();
int ret = m_bandwidth_channel[peer_connection::upload_channel]->throttle();
return ret == (std::numeric_limits<int>::max)() ? -1 : ret;
}
int session_impl::download_rate_limit() const
{
mutex_t::scoped_lock l(m_mutex);
int ret = m_bandwidth_manager[peer_connection::download_channel]->throttle();
int ret = m_bandwidth_channel[peer_connection::download_channel]->throttle();
return ret == (std::numeric_limits<int>::max)() ? -1 : ret;
}

View File

@ -2598,18 +2598,6 @@ namespace libtorrent
p->set_peer_info(0);
TORRENT_ASSERT(i != m_connections.end());
m_connections.erase(i);
// remove from bandwidth request-queue
for (int c = 0; c < 2; ++c)
{
for (queue_t::iterator i = m_bandwidth_queue[c].begin()
, end(m_bandwidth_queue[c].end()); i != end; ++i)
{
if (i->peer != p) continue;
m_bandwidth_queue[c].erase(i);
break;
}
}
}
void torrent::connect_to_url_seed(web_seed_entry const& web)
@ -3823,92 +3811,7 @@ namespace libtorrent
int torrent::bandwidth_throttle(int channel) const
{
return m_bandwidth_limit[channel].throttle();
}
int torrent::bandwidth_queue_size(int channel) const
{
return (int)m_bandwidth_queue[channel].size();
}
void torrent::request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int max_block_size, int priority)
{
TORRENT_ASSERT(max_block_size > 0);
TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0);
TORRENT_ASSERT(p->max_assignable_bandwidth(channel) > 0);
TORRENT_ASSERT(p->m_channel_state[channel] == peer_info::bw_torrent);
int block_size = (std::min)(m_bandwidth_limit[channel].throttle() / 10
, max_block_size);
if (block_size <= 0) block_size = 1;
if (m_bandwidth_limit[channel].max_assignable() > 0)
{
perform_bandwidth_request(channel, p, block_size, priority);
}
else
{
// skip forward in the queue until we find a prioritized peer
// or hit the front of it.
queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin();
while (i != m_bandwidth_queue[channel].rend() && priority > i->priority)
{
++i->priority;
++i;
}
m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry<peer_connection, torrent>(
p, block_size, priority));
}
}
void torrent::expire_bandwidth(int channel, int amount)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
INVARIANT_CHECK;
TORRENT_ASSERT(amount > 0);
m_bandwidth_limit[channel].expire(amount);
queue_t tmp;
while (!m_bandwidth_queue[channel].empty())
{
bw_queue_entry<peer_connection, torrent> qe = m_bandwidth_queue[channel].front();
if (m_bandwidth_limit[channel].max_assignable() == 0)
break;
m_bandwidth_queue[channel].pop_front();
if (qe.peer->max_assignable_bandwidth(channel) <= 0)
{
TORRENT_ASSERT(m_ses.m_bandwidth_manager[channel]->is_in_history(qe.peer.get()));
if (!qe.peer->is_disconnecting()) tmp.push_back(qe);
continue;
}
perform_bandwidth_request(channel, qe.peer
, qe.max_block_size, qe.priority);
}
m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end());
}
void torrent::perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size
, int priority)
{
TORRENT_ASSERT(p->m_channel_state[channel] == peer_info::bw_torrent);
p->m_channel_state[channel] = peer_info::bw_global;
m_ses.m_bandwidth_manager[channel]->request_bandwidth(p
, block_size, priority);
m_bandwidth_limit[channel].assign(block_size);
}
void torrent::assign_bandwidth(int channel, int amount, int blk)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
TORRENT_ASSERT(amount > 0);
TORRENT_ASSERT(amount <= blk);
if (amount < blk)
expire_bandwidth(channel, blk - amount);
return m_bandwidth_channel[channel].throttle();
}
// called when torrent is finished (all interesting
@ -4215,19 +4118,6 @@ namespace libtorrent
TORRENT_ASSERT(m_resume_entry.type() == lazy_entry::dict_t
|| m_resume_entry.type() == lazy_entry::none_t);
TORRENT_ASSERT(m_bandwidth_queue[0].size() <= m_connections.size());
TORRENT_ASSERT(m_bandwidth_queue[1].size() <= m_connections.size());
for (int c = 0; c < 2; ++c)
{
queue_t::const_iterator j = m_bandwidth_queue[c].begin();
if (j == m_bandwidth_queue[c].end()) continue;
++j;
for (queue_t::const_iterator i = m_bandwidth_queue[c].begin()
, end(m_bandwidth_queue[c].end()); i != end && j != end; ++i, ++j)
TORRENT_ASSERT(i->priority >= j->priority);
}
int num_uploads = 0;
std::map<piece_block, int> num_requests;
for (const_peer_iterator i = begin(); i != end(); ++i)
@ -4437,14 +4327,14 @@ namespace libtorrent
void torrent::set_upload_limit(int limit)
{
TORRENT_ASSERT(limit >= -1);
if (limit <= 0) limit = (std::numeric_limits<int>::max)();
if (limit <= 0) limit = 0;
if (limit < num_peers() * 10) limit = num_peers() * 10;
m_bandwidth_limit[peer_connection::upload_channel].throttle(limit);
m_bandwidth_channel[peer_connection::upload_channel].throttle(limit);
}
int torrent::upload_limit() const
{
int limit = m_bandwidth_limit[peer_connection::upload_channel].throttle();
int limit = m_bandwidth_channel[peer_connection::upload_channel].throttle();
if (limit == (std::numeric_limits<int>::max)()) limit = -1;
return limit;
}
@ -4452,14 +4342,14 @@ namespace libtorrent
void torrent::set_download_limit(int limit)
{
TORRENT_ASSERT(limit >= -1);
if (limit <= 0) limit = (std::numeric_limits<int>::max)();
if (limit <= 0) limit = 0;
if (limit < num_peers() * 10) limit = num_peers() * 10;
m_bandwidth_limit[peer_connection::download_channel].throttle(limit);
m_bandwidth_channel[peer_connection::download_channel].throttle(limit);
}
int torrent::download_limit() const
{
int limit = m_bandwidth_limit[peer_connection::download_channel].throttle();
int limit = m_bandwidth_channel[peer_connection::download_channel].throttle();
if (limit == (std::numeric_limits<int>::max)()) limit = -1;
return limit;
}
@ -4848,6 +4738,28 @@ namespace libtorrent
return;
}
if (m_settings.rate_limit_ip_overhead)
{
int up_limit = m_bandwidth_channel[peer_connection::upload_channel].throttle();
int down_limit = m_bandwidth_channel[peer_connection::download_channel].throttle();
if (down_limit > 0
&& m_stat.download_ip_overhead() >= down_limit
&& alerts().should_post<performance_alert>())
{
alerts().post_alert(performance_alert(get_handle()
, performance_alert::download_limit_too_low));
}
if (up_limit > 0
&& m_stat.upload_ip_overhead() >= up_limit
&& alerts().should_post<performance_alert>())
{
alerts().post_alert(performance_alert(get_handle()
, performance_alert::upload_limit_too_low));
}
}
time_duration since_last_tick = microsec(tick_interval * 1000000L);
if (is_seed()) m_seeding_time += since_last_tick;
m_active_time += since_last_tick;
@ -5340,8 +5252,8 @@ namespace libtorrent
{
st.last_scrape = total_seconds(now - m_last_scrape);
}
st.up_bandwidth_queue = (int)m_bandwidth_queue[peer_connection::upload_channel].size();
st.down_bandwidth_queue = (int)m_bandwidth_queue[peer_connection::download_channel].size();
st.up_bandwidth_queue = 0;
st.down_bandwidth_queue = 0;
st.num_peers = (int)std::count_if(m_connections.begin(), m_connections.end()
, !boost::bind(&peer_connection::is_connecting, _1));

View File

@ -76,7 +76,7 @@ namespace libtorrent
set_upload_only(true);
// we only want left-over bandwidth
set_priority(0);
set_priority(1);
shared_ptr<torrent> tor = t.lock();
TORRENT_ASSERT(tor);
int blocks_per_piece = tor->torrent_file().piece_length() / tor->block_size();

View File

@ -47,320 +47,112 @@ struct peer_connection;
using namespace libtorrent;
const float sample_time = 6.f; // seconds
const float sample_time = 20.f; // seconds
//#define VERBOSE_LOGGING
bandwidth_channel global_bwc;
struct peer_connection: intrusive_ptr_base<peer_connection>
{
typedef torrent torrent_type;
peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
, int prio, bool ignore_limits, std::string name);
bool ignore_bandwidth_limits() { return m_ignore_limits; }
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
boost::weak_ptr<torrent> associated_torrent() const
{ return m_torrent; }
bool is_disconnecting() const { return m_abort; }
void assign_bandwidth(int channel, int amount);
void on_transfer(int channel, int amount);
void start();
void stop() { m_abort = true; }
void expire_bandwidth(int channel, int amount);
void tick();
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
void throttle(int limit) { m_bandwidth_limit[0].throttle(limit); }
bandwidth_limit m_bandwidth_limit[1];
boost::weak_ptr<torrent> m_torrent;
int m_priority;
bool m_ignore_limits;
bool m_abort;
libtorrent::stat m_stats;
io_service& m_ios;
std::string m_name;
bool m_writing;
};
struct torrent
{
torrent(bandwidth_manager<peer_connection, torrent>& m)
: m_bandwidth_manager(m)
peer_connection(bandwidth_manager<peer_connection>& bwm
, bandwidth_channel& torrent_bwc, int prio, bool ignore_limits, std::string name)
: m_bwm(bwm)
, m_torrent_bandwidth_channel(torrent_bwc)
, m_priority(prio)
, m_ignore_limits(ignore_limits)
, m_name(name)
, m_quota(0)
{}
void assign_bandwidth(int channel, int amount, int max_block_size)
{
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string()
<< ": assign bandwidth, " << amount << " blk: " << max_block_size << std::endl;
#endif
TEST_CHECK(amount > 0);
TEST_CHECK(amount <= max_block_size);
if (amount < max_block_size)
expire_bandwidth(channel, max_block_size - amount);
}
bool is_disconnecting() const { return false; }
bool ignore_bandwidth_limits() { return m_ignore_limits; }
void assign_bandwidth(int channel, int amount);
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
void throttle(int limit) { m_bandwidth_channel.throttle(limit); }
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
void start();
void request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int max_block_size
, int priority)
{
TORRENT_ASSERT(max_block_size > 0);
TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0);
int block_size = (std::min)(m_bandwidth_limit[channel].throttle() / 10
, max_block_size);
if (block_size <= 0) block_size = 1;
bandwidth_manager<peer_connection>& m_bwm;
if (m_bandwidth_limit[channel].max_assignable() > 0)
{
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string()
<< ": request bandwidth " << block_size << std::endl;
#endif
perform_bandwidth_request(channel, p, block_size, priority);
}
else
{
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string()
<< ": queue bandwidth request" << block_size << std::endl;
#endif
// skip forward in the queue until we find a prioritized peer
// or hit the front of it.
queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin();
while (i != m_bandwidth_queue[channel].rend() && priority > i->priority)
{
++i->priority;
++i;
}
m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry<peer_connection, torrent>(
p, block_size, priority));
}
}
bandwidth_channel m_bandwidth_channel;
bandwidth_channel& m_torrent_bandwidth_channel;
void expire_bandwidth(int channel, int amount);
void perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size
, int priority)
{
m_bandwidth_manager.request_bandwidth(p
, block_size, priority);
m_bandwidth_limit[channel].assign(block_size);
}
bandwidth_limit m_bandwidth_limit[1];
typedef std::deque<bw_queue_entry<peer_connection, torrent> > queue_t;
queue_t m_bandwidth_queue[1];
bandwidth_manager<peer_connection, torrent>& m_bandwidth_manager;
int m_priority;
bool m_ignore_limits;
std::string m_name;
int m_quota;
};
peer_connection::peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
, int prio, bool ignore_limits, std::string name)
: m_torrent(t)
, m_priority(prio)
, m_ignore_limits(ignore_limits)
, m_abort(false)
, m_ios(ios)
, m_name(name)
, m_writing(false)
{}
void peer_connection::assign_bandwidth(int channel, int amount)
{
TEST_CHECK(m_writing);
m_quota += amount;
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string() << ": [" << m_name
std::cerr << " [" << m_name
<< "] assign bandwidth, " << amount << std::endl;
#endif
TEST_CHECK(amount > 0);
m_bandwidth_limit[channel].assign(amount);
m_ios.post(boost::bind(&peer_connection::on_transfer, self(), channel, amount));
}
void peer_connection::on_transfer(int channel, int amount)
{
TEST_CHECK(m_writing);
m_writing = false;
m_stats.sent_bytes(amount, 0);
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) return;
if (m_bandwidth_limit[channel].max_assignable() > 0)
{
m_writing = true;
t->request_bandwidth(0, this, 32 * 1024, m_priority);
}
start();
}
void peer_connection::start()
{
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) return;
m_writing = true;
t->request_bandwidth(0, this, 32 * 1024, m_priority);
m_bwm.request_bandwidth(self(), 150000, m_priority
, &m_bandwidth_channel
, &m_torrent_bandwidth_channel
, &global_bwc);
}
void peer_connection::expire_bandwidth(int channel, int amount)
{
TEST_CHECK(amount > 0);
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string() << ": [" << m_name
<< "] expire bandwidth, " << amount << std::endl;
#endif
m_bandwidth_limit[channel].expire(amount);
if (!m_writing && m_bandwidth_limit[channel].max_assignable() > 0)
{
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) return;
m_writing = true;
t->request_bandwidth(0, this, 32 * 1024, m_priority);
}
}
void peer_connection::tick()
{
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string() << ": [" << m_name
<< "] tick, rate: " << m_stats.upload_rate() << std::endl;
#endif
m_stats.second_tick(1.f);
}
void torrent::expire_bandwidth(int channel, int amount)
{
#ifdef VERBOSE_LOGGING
std::cerr << time_now_string()
<< ": expire bandwidth, " << amount << std::endl;
#endif
TEST_CHECK(amount > 0);
m_bandwidth_limit[channel].expire(amount);
queue_t tmp;
while (!m_bandwidth_queue[channel].empty())
{
bw_queue_entry<peer_connection, torrent> qe = m_bandwidth_queue[channel].front();
if (m_bandwidth_limit[channel].max_assignable() == 0)
break;
m_bandwidth_queue[channel].pop_front();
if (qe.peer->max_assignable_bandwidth(channel) <= 0)
{
if (!qe.peer->is_disconnecting()) tmp.push_back(qe);
continue;
}
perform_bandwidth_request(channel, qe.peer
, qe.max_block_size, qe.priority);
}
m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end());
}
typedef std::vector<boost::intrusive_ptr<peer_connection> > connections_t;
bool abort_tick = false;
void do_tick(error_code const&e, deadline_timer& tick, connections_t& v)
void do_change_rate(bandwidth_channel& t1, bandwidth_channel& t2, int limit)
{
if (e || abort_tick)
{
std::cerr << " tick aborted" << std::endl;
return;
}
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::tick, _1));
error_code ec;
tick.expires_from_now(seconds(1), ec);
tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v)));
}
void do_stop(deadline_timer& tick, connections_t& v)
{
abort_tick = true;
error_code ec;
tick.cancel(ec);
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::stop, _1));
std::cerr << " stopping..." << std::endl;
}
void do_change_rate(error_code const&e, deadline_timer& tick
, boost::shared_ptr<torrent> t1
, boost::shared_ptr<torrent> t2
, int limit
, int counter)
{
TEST_CHECK(!e);
if (e) return;
static int counter = 10;
--counter;
if (counter == 0)
{
t1->m_bandwidth_limit[0].throttle(limit);
t2->m_bandwidth_limit[0].throttle(limit);
t1.throttle(limit);
t2.throttle(limit);
return;
}
t1->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?-1:1));
t2->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?1:-1));
error_code ec;
tick.expires_from_now(milliseconds(1600), ec);
tick.async_wait(boost::bind(&do_change_rate, _1, boost::ref(tick), t1, t2, limit, counter-1));
t1.throttle(limit + limit / 2 * ((counter & 1)?-1:1));
t2.throttle(limit + limit / 2 * ((counter & 1)?1:-1));
}
void do_change_peer_rate(error_code const&e, deadline_timer& tick
, connections_t& v
, int limit
, int counter)
void do_change_peer_rate(connections_t& v, int limit)
{
TEST_CHECK(!e);
if (e) return;
if (counter == 0)
static int count = 10;
--count;
if (count == 0)
{
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::throttle, _1, limit));
return;
}
int c = counter;
int c = count;
for (connections_t::iterator i = v.begin(); i != v.end(); ++i, ++c)
i->get()->throttle(limit + limit / 2 * ((c & 1)?-1:1));
error_code ec;
tick.expires_from_now(milliseconds(1100), ec);
tick.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(tick), boost::ref(v), limit, counter-1));
}
void run_test(io_service& ios, connections_t& v)
void nop() {}
void run_test(connections_t& v
, bandwidth_manager<peer_connection>& manager
, boost::function<void()> f = &nop)
{
abort_tick = false;
std::cerr << "-------------" << std::endl;
deadline_timer tick(ios);
error_code ec;
tick.expires_from_now(seconds(1), ec);
tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v)));
deadline_timer complete(ios);
complete.expires_from_now(milliseconds(int(sample_time * 1000)), ec);
complete.async_wait(boost::bind(&do_stop, boost::ref(tick), boost::ref(v)));
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::start, _1));
ios.run(ec);
for (int i = 0; i < int(sample_time * 10); ++i)
{
manager.update_quotas(milliseconds(100));
if ((i % 15) == 0) f();
}
}
bool close_to(float val, float comp, float err)
@ -368,12 +160,12 @@ bool close_to(float val, float comp, float err)
return fabs(val - comp) <= err;
}
void spawn_connections(connections_t& v, io_service& ios
, boost::shared_ptr<torrent> t, int num, char const* prefix)
void spawn_connections(connections_t& v, bandwidth_manager<peer_connection>& bwm
, bandwidth_channel& bwc, int num, char const* prefix)
{
for (int i = 0; i < num; ++i)
{
v.push_back(new peer_connection(ios, t, 200, false
v.push_back(new peer_connection(bwm, bwc, 200, false
, prefix + boost::lexical_cast<std::string>(i)));
}
}
@ -381,26 +173,25 @@ void spawn_connections(connections_t& v, io_service& ios
void test_equal_connections(int num, int limit)
{
std::cerr << "\ntest equal connections " << num << " " << limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
manager.throttle(limit);
bandwidth_manager<peer_connection> manager(0);
global_bwc.throttle(limit);
boost::shared_ptr<torrent> t1(new torrent(manager));
bandwidth_channel t1;
connections_t v;
spawn_connections(v, ios, t1, num, "p");
run_test(ios, v);
spawn_connections(v, manager, t1, num, "p");
run_test(v, manager);
float sum = 0.f;
float err = (std::max)(limit / num * 0.3f, 1000.f);
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
std::cerr << (*i)->m_stats.total_payload_upload() / sample_time
std::cerr << (*i)->m_quota / sample_time
<< " target: " << (limit / num) << " eps: " << err << std::endl;
TEST_CHECK(close_to((*i)->m_stats.total_payload_upload() / sample_time, limit / num, err));
TEST_CHECK(close_to((*i)->m_quota / sample_time, limit / num, err));
}
sum /= sample_time;
std::cerr << "sum: " << sum << " target: " << limit << std::endl;
@ -414,24 +205,20 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit)
<< " l: " << limit
<< " t: " << torrent_limit
<< std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
bandwidth_manager<peer_connection> manager(0);
global_bwc.throttle(0);
boost::shared_ptr<torrent> t1(new torrent(manager));
bandwidth_channel t1;
if (torrent_limit)
t1->m_bandwidth_limit[0].throttle(torrent_limit);
t1.throttle(torrent_limit);
connections_t v;
spawn_connections(v, ios, t1, num, "p");
spawn_connections(v, manager, t1, num, "p");
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::throttle, _1, limit));
error_code ec;
deadline_timer change_rate(ios);
change_rate.expires_from_now(milliseconds(1600), ec);
change_rate.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(change_rate)
, boost::ref(v), limit, 9));
run_test(ios, v);
run_test(v, manager, boost::bind(&do_change_peer_rate
, boost::ref(v), limit));
if (torrent_limit > 0 && limit * num > torrent_limit)
limit = torrent_limit / num;
@ -441,11 +228,11 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit)
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
std::cerr << (*i)->m_stats.total_payload_upload() / sample_time
std::cerr << (*i)->m_quota / sample_time
<< " target: " << limit << " eps: " << err << std::endl;
TEST_CHECK(close_to((*i)->m_stats.total_payload_upload() / sample_time, limit, err));
TEST_CHECK(close_to((*i)->m_quota / sample_time, limit, err));
}
sum /= sample_time;
std::cerr << "sum: " << sum << " target: " << (limit * num) << std::endl;
@ -456,24 +243,24 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit)
void test_single_peer(int limit, bool torrent_limit)
{
std::cerr << "\ntest single peer " << limit << " " << torrent_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
bandwidth_manager<peer_connection> manager(0);
bandwidth_channel t1;
global_bwc.throttle(0);
if (torrent_limit)
t1->m_bandwidth_limit[0].throttle(limit);
t1.throttle(limit);
else
manager.throttle(limit);
global_bwc.throttle(limit);
connections_t v;
spawn_connections(v, ios, t1, 1, "p");
run_test(ios, v);
spawn_connections(v, manager, t1, 1, "p");
run_test(v, manager);
float sum = 0.f;
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit << std::endl;
@ -487,25 +274,23 @@ void test_torrents(int num, int limit1, int limit2, int global_limit)
<< " l1: " << limit1
<< " l2: " << limit2
<< " g: " << global_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
if (global_limit > 0)
manager.throttle(global_limit);
bandwidth_manager<peer_connection> manager(0);
global_bwc.throttle(global_limit);
boost::shared_ptr<torrent> t1(new torrent(manager));
boost::shared_ptr<torrent> t2(new torrent(manager));
bandwidth_channel t1;
bandwidth_channel t2;
t1->m_bandwidth_limit[0].throttle(limit1);
t2->m_bandwidth_limit[0].throttle(limit2);
t1.throttle(limit1);
t2.throttle(limit2);
connections_t v1;
spawn_connections(v1, ios, t1, num, "t1p");
spawn_connections(v1, manager, t1, num, "t1p");
connections_t v2;
spawn_connections(v2, ios, t2, num, "t2p");
spawn_connections(v2, manager, t2, num, "t2p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
std::copy(v2.begin(), v2.end(), std::back_inserter(v));
run_test(ios, v);
run_test(v, manager);
if (global_limit > 0 && global_limit < limit1 + limit2)
{
@ -516,7 +301,7 @@ void test_torrents(int num, int limit1, int limit2, int global_limit)
for (connections_t::iterator i = v1.begin()
, end(v1.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit1 << std::endl;
@ -527,7 +312,7 @@ void test_torrents(int num, int limit1, int limit2, int global_limit)
for (connections_t::iterator i = v2.begin()
, end(v2.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit2 << std::endl;
@ -540,31 +325,24 @@ void test_torrents_variable_rate(int num, int limit, int global_limit)
std::cerr << "\ntest torrents variable rate" << num
<< " l: " << limit
<< " g: " << global_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
if (global_limit > 0)
manager.throttle(global_limit);
bandwidth_manager<peer_connection> manager(0);
global_bwc.throttle(global_limit);
boost::shared_ptr<torrent> t1(new torrent(manager));
boost::shared_ptr<torrent> t2(new torrent(manager));
bandwidth_channel t1;
bandwidth_channel t2;
t1->m_bandwidth_limit[0].throttle(limit);
t2->m_bandwidth_limit[0].throttle(limit);
t1.throttle(limit);
t2.throttle(limit);
connections_t v1;
spawn_connections(v1, ios, t1, num, "t1p");
spawn_connections(v1, manager, t1, num, "t1p");
connections_t v2;
spawn_connections(v2, ios, t2, num, "t2p");
spawn_connections(v2, manager, t2, num, "t2p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
std::copy(v2.begin(), v2.end(), std::back_inserter(v));
error_code ec;
deadline_timer change_rate(ios);
change_rate.expires_from_now(milliseconds(1100), ec);
change_rate.async_wait(boost::bind(&do_change_rate, _1, boost::ref(change_rate), t1, t2, limit, 9));
run_test(ios, v);
run_test(v, manager, boost::bind(&do_change_rate, boost::ref(t1), boost::ref(t2), limit));
if (global_limit > 0 && global_limit < 2 * limit)
limit = global_limit / 2;
@ -573,7 +351,7 @@ void test_torrents_variable_rate(int num, int limit, int global_limit)
for (connections_t::iterator i = v1.begin()
, end(v1.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit << std::endl;
@ -584,7 +362,7 @@ void test_torrents_variable_rate(int num, int limit, int global_limit)
for (connections_t::iterator i = v2.begin()
, end(v2.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit << std::endl;
@ -595,73 +373,74 @@ void test_torrents_variable_rate(int num, int limit, int global_limit)
void test_peer_priority(int limit, bool torrent_limit)
{
std::cerr << "\ntest peer priority " << limit << " " << torrent_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
bandwidth_manager<peer_connection> manager(0);
bandwidth_channel t1;
global_bwc.throttle(0);
if (torrent_limit)
t1->m_bandwidth_limit[0].throttle(limit);
t1.throttle(limit);
else
manager.throttle(limit);
global_bwc.throttle(limit);
connections_t v1;
spawn_connections(v1, ios, t1, 10, "p");
spawn_connections(v1, manager, t1, 10, "p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
boost::intrusive_ptr<peer_connection> p(
new peer_connection(ios, t1, 0, false, "no-priority"));
new peer_connection(manager, t1, 1, false, "no-priority"));
v.push_back(p);
run_test(ios, v);
run_test(v, manager);
float sum = 0.f;
for (connections_t::iterator i = v1.begin()
, end(v1.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(sum > 0);
TEST_CHECK(close_to(sum, limit, 50));
std::cerr << "non-prioritized rate: " << p->m_stats.total_payload_upload() / sample_time << std::endl;
TEST_CHECK(p->m_stats.total_payload_upload() / sample_time < 10);
std::cerr << "non-prioritized rate: " << p->m_quota / sample_time
<< " target: " << (limit / 200 / 10) << std::endl;
TEST_CHECK(close_to(p->m_quota / sample_time, limit / 200 / 10, 5));
}
void test_no_starvation(int limit)
{
std::cerr << "\ntest no starvation " << limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
boost::shared_ptr<torrent> t2(new torrent(manager));
bandwidth_manager<peer_connection> manager(0);
bandwidth_channel t1;
bandwidth_channel t2;
manager.throttle(limit);
global_bwc.throttle(limit);
const int num_peers = 20;
connections_t v1;
spawn_connections(v1, ios, t1, num_peers, "p");
spawn_connections(v1, manager, t1, num_peers, "p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
boost::intrusive_ptr<peer_connection> p(
new peer_connection(ios, t2, 0, false, "no-priority"));
new peer_connection(manager, t2, 1, false, "no-priority"));
v.push_back(p);
run_test(ios, v);
run_test(v, manager);
float sum = 0.f;
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.total_payload_upload();
sum += (*i)->m_quota;
}
sum /= sample_time;
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(sum > 0);
TEST_CHECK(close_to(sum, limit, 50));
std::cerr << "non-prioritized rate: " << p->m_stats.total_payload_upload() / sample_time << std::endl;
TEST_CHECK(close_to(p->m_stats.total_payload_upload() / sample_time, limit / (num_peers + 1), 1000));
std::cerr << "non-prioritized rate: " << p->m_quota / sample_time
<< " target: " << (limit / 200 / num_peers) << std::endl;
TEST_CHECK(close_to(p->m_quota / sample_time, limit / 200 / num_peers, 5));
}
int test_main()