From 7109b8656655588942c671f621024d88aba19f17 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 26 Apr 2009 00:21:59 +0000 Subject: [PATCH] greatly simplified the rate limiter and generalized all rate limits (peer, torrent, global) to allow for arbitrary rate limit configurations --- examples/client_test.cpp | 6 +- include/libtorrent/aux_/session_impl.hpp | 17 +- include/libtorrent/bandwidth_limit.hpp | 85 ++-- include/libtorrent/bandwidth_manager.hpp | 450 ++++-------------- include/libtorrent/bandwidth_queue_entry.hpp | 57 ++- include/libtorrent/peer_connection.hpp | 16 +- include/libtorrent/peer_info.hpp | 5 +- include/libtorrent/torrent.hpp | 23 +- src/bt_peer_connection.cpp | 8 +- src/http_seed_connection.cpp | 2 +- src/peer_connection.cpp | 202 ++++---- src/session_impl.cpp | 100 ++-- src/torrent.cpp | 150 ++---- src/web_peer_connection.cpp | 2 +- test/test_bandwidth_limiter.cpp | 471 +++++-------------- 15 files changed, 551 insertions(+), 1043 deletions(-) diff --git a/examples/client_test.cpp b/examples/client_test.cpp index e190755ae..190ea5af9 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -382,11 +382,9 @@ void print_peer_info(std::ostream& out, std::vector const << ((i->flags & peer_info::seed)?'s':'.') << ((i->flags & peer_info::on_parole)?'p':'.') << ((i->flags & peer_info::optimistic_unchoke)?'O':'.') - << ((i->read_state == peer_info::bw_torrent)?'t': - (i->read_state == peer_info::bw_global)?'r': + << ((i->read_state == peer_info::bw_limit)?'r': (i->read_state == peer_info::bw_network)?'R':'.') - << ((i->write_state == peer_info::bw_torrent)?'t': - (i->write_state == peer_info::bw_global)?'w': + << ((i->write_state == peer_info::bw_limit)?'w': (i->write_state == peer_info::bw_network)?'W':'.') << ((i->flags & peer_info::snubbed)?'S':'.') << ((i->flags & peer_info::upload_only)?'U':'D') diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c0197fdb0..7c67b72ce 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -387,10 +387,13 @@ namespace libtorrent // handing out bandwidth to connections that // asks for it, it can also throttle the // rate. - bandwidth_manager m_download_channel; - bandwidth_manager m_upload_channel; + bandwidth_manager m_download_rate; + bandwidth_manager m_upload_rate; - bandwidth_manager* m_bandwidth_manager[2]; + bandwidth_channel m_download_channel; + bandwidth_channel m_upload_channel; + + bandwidth_channel* m_bandwidth_channel[2]; tracker_manager m_tracker_manager; torrent_map m_torrents; @@ -530,13 +533,15 @@ namespace libtorrent // NAT or not. bool m_incoming_connection; - void second_tick(error_code const& e); + void on_tick(error_code const& e); + void recalculate_auto_managed_torrents(); void recalculate_unchoke_slots(int congested_torrents , int uncongested_torrents); void recalculate_optimistic_unchoke_slot(); ptime m_last_tick; + ptime m_last_second_tick; // the last time we went through the peers // to decide which ones to choke/unchoke @@ -581,11 +586,11 @@ namespace libtorrent int m_tcp_mapping[2]; int m_udp_mapping[2]; - // the timer used to fire the second_tick + // the timer used to fire the tick deadline_timer m_timer; // the index of the torrent that will be offered to - // connect to a peer next time second_tick is called. + // connect to a peer next time on_tick is called. // This implements a round robin. int m_next_connect_torrent; #ifdef TORRENT_DEBUG diff --git a/include/libtorrent/bandwidth_limit.hpp b/include/libtorrent/bandwidth_limit.hpp index e0675aa31..e3b54bf65 100644 --- a/include/libtorrent/bandwidth_limit.hpp +++ b/include/libtorrent/bandwidth_limit.hpp @@ -30,8 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ -#ifndef TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED -#define TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED +#ifndef TORRENT_BANDWIDTH_CHANNEL_HPP_INCLUDED +#define TORRENT_BANDWIDTH_CHANNEL_HPP_INCLUDED #include @@ -40,78 +40,77 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { // member of peer_connection -struct bandwidth_limit +struct bandwidth_channel { static const int inf = boost::integer_traits::const_max; - bandwidth_limit() + bandwidth_channel() : m_quota_left(0) - , m_local_limit(inf) - , m_current_rate(0) + , m_limit(0) {} + // 0 means infinite void throttle(int limit) { - TORRENT_ASSERT(limit > 0); - m_local_limit = limit; + TORRENT_ASSERT(limit >= 0); + // if the throttle is more than this, we might overflow + TORRENT_ASSERT(limit < INT_MAX / 31); + m_limit = limit; } int throttle() const { - return m_local_limit; + return m_limit; } - void assign(int amount) + int quota_left() const + { + if (m_limit == 0) return inf; + return (std::max)(m_quota_left, 0); + } + + void update_quota(int dt_milliseconds) + { + if (m_limit == 0) return; + m_quota_left += (m_limit * dt_milliseconds + 500) / 1000; + if (m_quota_left > m_limit * 3) m_quota_left = m_limit * 3; + distribute_quota = (std::max)(m_quota_left, 0); +// fprintf(stderr, "%p: [%d]: + %d limit: %d\n", this, dt_milliseconds, (m_limit * dt_milliseconds + 500) / 1000, m_limit); + } + + // this is used when connections disconnect with + // some quota left. It's returned to its bandwidth + // channels. + void return_quota(int amount) { TORRENT_ASSERT(amount >= 0); - m_current_rate += amount; + if (m_limit == 0) return; m_quota_left += amount; } void use_quota(int amount) { - TORRENT_ASSERT(amount <= m_quota_left); + TORRENT_ASSERT(amount >= 0); + if (m_limit == 0) return; m_quota_left -= amount; } - int quota_left() const - { - return (std::max)(m_quota_left, 0); - } + // used as temporary storage while distributing + // bandwidth + int tmp; - void expire(int amount) - { - TORRENT_ASSERT(amount >= 0); - m_current_rate -= amount; - } - - int max_assignable() const - { - if (m_local_limit == inf) return inf; - if (m_local_limit <= m_current_rate) return 0; - return m_local_limit - m_current_rate; - } + // this is the number of bytes to distribute this round + int distribute_quota; private: // this is the amount of bandwidth we have - // been assigned without using yet. i.e. - // the bandwidth that we use up every time - // we receive or send a message. Once this - // hits zero, we need to request more - // bandwidth from the torrent which - // in turn will request bandwidth from - // the bandwidth manager + // been assigned without using yet. int m_quota_left; - // the local limit is the number of bytes - // per window size we are allowed to use. - int m_local_limit; - - // the current rate is the number of - // bytes we have been assigned within - // the window size. - int m_current_rate; + // the limit is the number of bytes + // per second we are allowed to use. + int m_limit; }; } diff --git a/include/libtorrent/bandwidth_manager.hpp b/include/libtorrent/bandwidth_manager.hpp index 535f29608..9c222abe6 100644 --- a/include/libtorrent/bandwidth_manager.hpp +++ b/include/libtorrent/bandwidth_manager.hpp @@ -38,7 +38,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT @@ -59,65 +58,16 @@ using boost::bind; namespace libtorrent { -// the maximum block of bandwidth quota to -// hand out is 33kB. The block size may -// be smaller on lower limits -enum -{ - max_bandwidth_block_size = 33000, - min_bandwidth_block_size = 400 -}; - -const time_duration bw_window_size = seconds(1); - -template -struct history_entry -{ - history_entry(intrusive_ptr p, weak_ptr t - , int a, ptime exp) - : expires_at(exp), amount(a), peer(p), tor(t) {} - history_entry(int a, ptime exp) - : expires_at(exp), amount(a), peer(), tor() {} - ptime expires_at; - int amount; - intrusive_ptr peer; - weak_ptr tor; -}; - -template -T clamp(T val, T ceiling, T floor) -{ - TORRENT_ASSERT(ceiling >= floor); - if (val >= ceiling) return ceiling; - else if (val <= floor) return floor; - return val; -} - -template -struct assign_at_exit -{ - assign_at_exit(T& var, T val): var_(var), val_(val) {} - ~assign_at_exit() { var_ = val_; } - T& var_; - T val_; -}; - -template +template struct bandwidth_manager { - bandwidth_manager(io_service& ios, int channel + bandwidth_manager(int channel #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT , bool log = false #endif ) - : m_ios(ios) - , m_history_timer(m_ios) - , m_limit(bandwidth_limit::inf) - , m_drain_quota(0) - , m_current_quota(0) - , m_queued_bytes(0) + : m_queued_bytes(0) , m_channel(channel) - , m_in_hand_out_bandwidth(false) , m_abort(false) { #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT @@ -127,47 +77,16 @@ struct bandwidth_manager #endif } - void drain(int bytes) - { - mutex_t::scoped_lock l(m_mutex); - TORRENT_ASSERT(bytes >= 0); - m_drain_quota += bytes; - if (m_drain_quota > m_limit * 5) m_drain_quota = m_limit * 5; - } - - void throttle(int limit) - { - mutex_t::scoped_lock l(m_mutex); - TORRENT_ASSERT(limit >= 0); - m_limit = limit; - } - - int throttle() const - { - mutex_t::scoped_lock l(m_mutex); - return m_limit; - } - void close() { - mutex_t::scoped_lock l(m_mutex); m_abort = true; m_queue.clear(); m_queued_bytes = 0; - m_history.clear(); - m_current_quota = 0; error_code ec; - m_history_timer.cancel(ec); } #ifdef TORRENT_DEBUG bool is_queued(PeerConnection const* peer) const - { - mutex_t::scoped_lock l(m_mutex); - return is_queued(peer); - } - - bool is_queued(PeerConnection const* peer, boost::mutex::scoped_lock& l) const { for (typename queue_t::const_iterator i = m_queue.begin() , end(m_queue.end()); i != end; ++i) @@ -176,33 +95,15 @@ struct bandwidth_manager } return false; } - - bool is_in_history(PeerConnection const* peer) const - { - mutex_t::scoped_lock l(m_mutex); - return is_in_history(peer, l); - } - - bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const - { - for (typename history_t::const_iterator i - = m_history.begin(), end(m_history.end()); i != end; ++i) - { - if (i->peer.get() == peer) return true; - } - return false; - } #endif int queue_size() const { - mutex_t::scoped_lock l(m_mutex); return m_queue.size(); } int queued_bytes() const { - mutex_t::scoped_lock l(m_mutex); return m_queued_bytes; } @@ -210,297 +111,144 @@ struct bandwidth_manager // others will cut in front of the non-prioritized peers. // this is used by web seeds void request_bandwidth(intrusive_ptr const& peer - , int blk, int priority) + , int blk, int priority + , bandwidth_channel* chan1 = 0 + , bandwidth_channel* chan2 = 0 + , bandwidth_channel* chan3 = 0 + , bandwidth_channel* chan4 = 0 + , bandwidth_channel* chan5 = 0 + ) { - mutex_t::scoped_lock l(m_mutex); INVARIANT_CHECK; if (m_abort) return; + TORRENT_ASSERT(blk > 0); - TORRENT_ASSERT(!is_queued(peer.get(), l)); + TORRENT_ASSERT(priority > 0); + TORRENT_ASSERT(!is_queued(peer.get())); - // make sure this peer isn't already in line - // waiting for bandwidth - TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0); - - typename queue_t::reverse_iterator i(m_queue.rbegin()); - while (i != m_queue.rend() && priority > i->priority) + bw_request bwr(peer, blk, priority); + int i = 0; + if (chan1 && chan1->throttle() > 0) bwr.channel[i++] = chan1; + if (chan2 && chan2->throttle() > 0) bwr.channel[i++] = chan2; + if (chan3 && chan3->throttle() > 0) bwr.channel[i++] = chan3; + if (chan4 && chan4->throttle() > 0) bwr.channel[i++] = chan4; + if (chan5 && chan5->throttle() > 0) bwr.channel[i++] = chan5; + if (i == 0) { - ++i->priority; - ++i; + // the connection is not rate limited by any of its + // bandwidth channels, or it doesn't belong to any + // channels. There's no point in adding it to + // the queue, just satisfy the request immediately + bwr.peer->assign_bandwidth(m_channel, blk); + return; } - m_queue.insert(i.base(), bw_queue_entry(peer, blk, priority)); m_queued_bytes += blk; - if (!m_queue.empty()) hand_out_bandwidth(l); + m_queue.push_back(bwr); } #ifdef TORRENT_DEBUG void check_invariant() const { - int current_quota = 0; - for (typename history_t::const_iterator i - = m_history.begin(), end(m_history.end()); i != end; ++i) - { - current_quota += i->amount; - } - TORRENT_ASSERT(current_quota == m_current_quota); - - int bytes = 0; - typename queue_t::const_iterator j = m_queue.begin(); - if (j != m_queue.end()) - { - bytes += j->max_block_size; - ++j; - for (typename queue_t::const_iterator i = m_queue.begin() - , end(m_queue.end()); i != end && j != end; ++i, ++j) - { - TORRENT_ASSERT(i->priority >= j->priority); - bytes += j->max_block_size; - } - } - TORRENT_ASSERT(bytes == m_queued_bytes); } #endif -private: - - void add_history_entry(history_entry const& e) + void update_quotas(time_duration const& dt) { - INVARIANT_CHECK; - - m_history.push_front(e); - m_current_quota += e.amount; - // in case the size > 1 there is already a timer - // active that will be invoked, no need to set one up - -#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT - m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " + " - " queue: " << std::setw(3) << m_queue.size() - << " used: " << std::setw(7) << m_current_quota - << " limit: " << std::setw(7) << m_limit - << " history: " << std::setw(3) << m_history.size() - << std::endl; -#endif - if (m_history.size() > 1) return; - if (m_abort) return; - - error_code ec; - TORRENT_ASSERT(e.expires_at > time_now()); - m_history_timer.expires_at(e.expires_at, ec); - m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); - } - - void on_history_expire(error_code const& e) - { - if (e) return; - - mutex_t::scoped_lock l(m_mutex); - if (m_abort) return; - INVARIANT_CHECK; - - TORRENT_ASSERT(!m_history.empty()); - - ptime now(time_now()); - while (!m_history.empty() && m_history.back().expires_at <= now) - { - history_entry e = m_history.back(); - m_history.pop_back(); - m_current_quota -= e.amount; - TORRENT_ASSERT(m_current_quota >= 0); - -#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT - m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " - " - " queue: " << std::setw(3) << m_queue.size() - << " used: " << std::setw(7) << m_current_quota - << " limit: " << std::setw(7) << m_limit - << " history: " << std::setw(3) << m_history.size() - << std::endl; -#endif - intrusive_ptr c = e.peer; - if (!c) continue; - shared_ptr t = e.tor.lock(); - l.unlock(); - if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount); - if (t) t->expire_bandwidth(m_channel, e.amount); - l.lock(); - } - - // now, wait for the next chunk to expire - if (!m_history.empty() && !m_abort) - { - error_code ec; - TORRENT_ASSERT(m_history.back().expires_at > now); - m_history_timer.expires_at(m_history.back().expires_at, ec); - m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); - } - - // since some bandwidth just expired, it - // means we can hand out more (in case there - // are still consumers in line) - if (!m_queue.empty()) hand_out_bandwidth(l); - } - - void hand_out_bandwidth(boost::mutex::scoped_lock& l) - { - // if we're already handing out bandwidth, just return back - // to the loop further down on the callstack - if (m_in_hand_out_bandwidth) return; - m_in_hand_out_bandwidth = true; - // set it to false when exiting function - assign_at_exit sg(m_in_hand_out_bandwidth, false); + if (m_queue.empty()) return; INVARIANT_CHECK; - ptime now(time_now()); + int dt_milliseconds = total_milliseconds(dt); + if (dt_milliseconds > 3000) dt_milliseconds = 3000; - int limit = m_limit; + // for each bandwidth channel, call update_quota(dt) - // available bandwidth to hand out - int amount = limit - m_current_quota; + std::vector channels; - if (amount <= 0) return; - - if (m_drain_quota > 0) + for (typename queue_t::iterator i = m_queue.begin(); + i != m_queue.end();) { - int drain_amount = (std::min)(m_drain_quota, amount); - m_drain_quota -= drain_amount; - amount -= drain_amount; - add_history_entry(history_entry( - drain_amount, now + bw_window_size)); - } - - queue_t tmp; - while (!m_queue.empty() && amount > 0) - { - bw_queue_entry qe = m_queue.front(); - TORRENT_ASSERT(qe.max_block_size > 0); - m_queued_bytes -= qe.max_block_size; - m_queue.pop_front(); - - shared_ptr t = qe.torrent.lock(); - if (!t) continue; - if (qe.peer->is_disconnecting()) + if (i->peer->is_disconnecting()) { - l.unlock(); - t->expire_bandwidth(m_channel, qe.max_block_size); - l.lock(); + m_queued_bytes -= i->request_size - i->assigned; + + // return all assigned quota to all the + // bandwidth channels this peer belongs to + for (int j = 0; j < 5 && i->channel[j]; ++j) + { + bandwidth_channel* bwc = i->channel[j]; + bwc->return_quota(i->assigned); + } + + i = m_queue.erase(i); continue; } - - // at this point, max_assignable may actually be zero. Since - // the rate limit of the peer might have changed while it - // was in the queue. - int max_assignable = qe.peer->max_assignable_bandwidth(m_channel); - if (max_assignable == 0) + for (int j = 0; j < 5 && i->channel[j]; ++j) { - TORRENT_ASSERT(is_in_history(qe.peer.get(), l)); - m_queued_bytes += qe.max_block_size; - tmp.push_back(qe); - continue; + bandwidth_channel* bwc = i->channel[j]; + bwc->tmp = 0; } - - // this is the limit of the block size. It depends on the throttle - // so that it can be closer to optimal. Larger block sizes will give lower - // granularity to the rate but will be more efficient. At high rates - // the block sizes are bigger and at low rates, the granularity - // is more important and block sizes are smaller - - // the minimum rate that can be given is the block size, so, the - // block size must be smaller for lower rates. This is because - // the history window is one second, and the block will be forgotten - // after one second. - int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel) - , limit / 10); - - if (block_size < min_bandwidth_block_size) - { - block_size = (std::min)(int(min_bandwidth_block_size), limit); - } - else if (block_size > max_bandwidth_block_size) - { - if (limit == bandwidth_limit::inf) - { - block_size = max_bandwidth_block_size; - } - else - { - // try to make the block_size a divisor of - // m_limit to make the distributions as fair - // as possible - // TODO: move this calculcation to where the limit - // is changed - block_size = limit - / (limit / max_bandwidth_block_size); - } - } - if (block_size > qe.max_block_size) block_size = qe.max_block_size; - - if (amount < block_size / 4) - { - m_queued_bytes += qe.max_block_size; - tmp.push_back(qe); -// m_queue.push_front(qe); - break; - } - - // so, hand out max_assignable, but no more than - // the available bandwidth (amount) and no more - // than the max_bandwidth_block_size - int hand_out_amount = (std::min)((std::min)(block_size, max_assignable) - , amount); - TORRENT_ASSERT(hand_out_amount > 0); - amount -= hand_out_amount; - TORRENT_ASSERT(hand_out_amount <= qe.max_block_size); - l.unlock(); - t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size); - qe.peer->assign_bandwidth(m_channel, hand_out_amount); - l.lock(); - add_history_entry(history_entry( - qe.peer, t, hand_out_amount, now + bw_window_size)); + ++i; + } + + for (typename queue_t::iterator i = m_queue.begin() + , end(m_queue.end()); i != end; ++i) + { + for (int j = 0; j < 5 && i->channel[j]; ++j) + { + bandwidth_channel* bwc = i->channel[j]; + if (bwc->tmp == 0) channels.push_back(bwc); + bwc->tmp += i->priority; + TORRENT_ASSERT(i->priority > 0); + } + } + + for (std::vector::iterator i = channels.begin() + , end(channels.end()); i != end; ++i) + { + (*i)->update_quota(dt_milliseconds); + } + + queue_t tm; + + for (typename queue_t::iterator i = m_queue.begin(); + i != m_queue.end();) + { + int a = i->assign_bandwidth(); + if (i->assigned == i->request_size + || (i->ttl <= 0 && i->assigned > 0)) + { + TORRENT_ASSERT(i->assigned <= i->request_size); + tm.push_back(*i); + i = m_queue.erase(i); + } + else + { + ++i; + } + m_queued_bytes -= a; + } + + while (!tm.empty()) + { + bw_request& bwr = tm.back(); + bwr.peer->assign_bandwidth(m_channel, bwr.assigned); + tm.pop_back(); } - if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end()); } - typedef boost::mutex mutex_t; - mutable mutex_t m_mutex; - - // the io_service used for the timer - io_service& m_ios; - - // the timer that is waiting for the entries - // in the history queue to expire (slide out - // of the history window) - deadline_timer m_history_timer; - - // the rate limit (bytes per second) - int m_limit; - - // bytes to drain without handing out to a peer - // used to deduct the IP overhead - int m_drain_quota; - - // the sum of all recently handed out bandwidth blocks - int m_current_quota; - // these are the consumers that want bandwidth - typedef std::deque > queue_t; + typedef std::vector > queue_t; queue_t m_queue; + // the number of bytes all the requests in queue are for int m_queued_bytes; - // these are the consumers that have received bandwidth - // that will expire - typedef std::deque > history_t; - history_t m_history; - // this is the channel within the consumers // that bandwidth is assigned to (upload or download) int m_channel; - // this is true while we're in the hand_out_bandwidth loop - // to prevent recursive invocations to interfere - bool m_in_hand_out_bandwidth; - bool m_abort; #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT diff --git a/include/libtorrent/bandwidth_queue_entry.hpp b/include/libtorrent/bandwidth_queue_entry.hpp index 54f669062..e3dfbe467 100644 --- a/include/libtorrent/bandwidth_queue_entry.hpp +++ b/include/libtorrent/bandwidth_queue_entry.hpp @@ -34,20 +34,61 @@ POSSIBILITY OF SUCH DAMAGE. #define TORRENT_BANDWIDTH_QUEUE_ENTRY_HPP_INCLUDED #include +#include "libtorrent/bandwidth_limit.hpp" namespace libtorrent { -template -struct bw_queue_entry +template +struct bw_request { - bw_queue_entry(boost::intrusive_ptr const& pe + bw_request(boost::intrusive_ptr const& pe , int blk, int prio) - : peer(pe), torrent(peer->associated_torrent()) - , max_block_size(blk), priority(prio) {} + : peer(pe) + , priority(prio) + , assigned(0) + , request_size(blk) + , ttl(20) + { + TORRENT_ASSERT(priority > 0); + std::memset(channel, 0, sizeof(channel)); + } + boost::intrusive_ptr peer; - boost::weak_ptr torrent; - int max_block_size; - int priority; // 0 is low prio + // 1 is normal prio + int priority; + // the number of bytes assigned to this request so far + int assigned; + // once assigned reaches this, we dispatch the request function + int request_size; + + // the max number of rounds for this request to survive + // this ensures that requests gets responses at very low + // rate limits, when the requested size would take a long + // time to satisfy + int ttl; + + // loops over the bandwidth channels and assigns bandwidth + // from the most limiting one + int assign_bandwidth() + { + TORRENT_ASSERT(assigned < request_size); + int quota = request_size - assigned; + TORRENT_ASSERT(quota >= 0); + for (int j = 0; j < 5 && channel[j]; ++j) + { + if (channel[j]->throttle() == 0) continue; + quota = (std::min)(channel[j]->distribute_quota * priority / channel[j]->tmp, quota); + } + assigned += quota; + for (int j = 0; j < 5 && channel[j]; ++j) + channel[j]->use_quota(quota); + TORRENT_ASSERT(assigned <= request_size); + --ttl; + TORRENT_ASSERT(assigned <= request_size); + return quota; + } + + bandwidth_channel* channel[5]; }; } diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 9e3679c4a..c53859342 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -224,7 +224,10 @@ namespace libtorrent void no_download(bool b) { m_no_download = b; } void set_priority(int p) - { m_priority = p; } + { + TORRENT_ASSERT(p > 0); + m_priority = p; + } void fast_reconnect(bool r); bool fast_reconnect() const { return m_fast_reconnect; } @@ -433,14 +436,10 @@ namespace libtorrent void cancel_request(piece_block const& b); void send_block_requests(); - int max_assignable_bandwidth(int channel) const - { return m_bandwidth_limit[channel].max_assignable(); } - int bandwidth_throttle(int channel) const - { return m_bandwidth_limit[channel].throttle(); } + { return m_bandwidth_channel[channel].throttle(); } void assign_bandwidth(int channel, int amount); - void expire_bandwidth(int channel, int amount); #ifdef TORRENT_DEBUG void check_invariant() const; @@ -578,7 +577,10 @@ namespace libtorrent // the bandwidth channels, upload and download // keeps track of the current quotas - bandwidth_limit m_bandwidth_limit[num_channels]; + bandwidth_channel m_bandwidth_channel[num_channels]; + + // number of bytes this peer can send and receive + int m_quota[2]; // statistics about upload and download speeds // and total amount of uploads and downloads for diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index 1a0dc1a3a..1fc251823 100644 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -84,7 +84,10 @@ namespace libtorrent // bw_global: the channel is waiting for global quota // bw_network: the channel is waiting for an async write // for read operation to complete - enum bw_state { bw_idle, bw_torrent, bw_global, bw_network }; + enum bw_state { bw_idle, bw_limit, bw_network }; +#ifndef TORRENT_NO_DEPRECATE + enum bw_state_deprecated { bw_torrent = bw_limit, bw_global = bw_limit }; +#endif char read_state; char write_state; diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 2436fd0b8..31fad4605 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -70,6 +70,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/hasher.hpp" #include "libtorrent/assert.hpp" #include "libtorrent/bitfield.hpp" +#include "libtorrent/aux_/session_impl.hpp" namespace libtorrent { @@ -288,26 +289,10 @@ namespace libtorrent // -------------------------------------------- // BANDWIDTH MANAGEMENT - bandwidth_limit m_bandwidth_limit[2]; + bandwidth_channel m_bandwidth_channel[2]; - void request_bandwidth(int channel - , boost::intrusive_ptr const& p - , int max_block_size, int priority); - - void perform_bandwidth_request(int channel - , boost::intrusive_ptr const& p - , int block_size, int priority); - - void expire_bandwidth(int channel, int amount); - void assign_bandwidth(int channel, int amount, int blk); - int bandwidth_throttle(int channel) const; - int max_assignable_bandwidth(int channel) const - { return m_bandwidth_limit[channel].max_assignable(); } - - int bandwidth_queue_size(int channel) const; - // -------------------------------------------- // PEER MANAGEMENT @@ -845,10 +830,6 @@ namespace libtorrent boost::scoped_ptr m_picker; - // the queue of peer_connections that want more bandwidth - typedef std::deque > queue_t; - queue_t m_bandwidth_queue[2]; - std::vector m_trackers; // this is an index into m_trackers diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 35a0af8e3..860211447 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -157,11 +157,11 @@ namespace libtorrent // connection, we have to give it some initial bandwidth // to send the handshake. #ifndef TORRENT_DISABLE_ENCRYPTION - m_bandwidth_limit[download_channel].assign(2048); - m_bandwidth_limit[upload_channel].assign(2048); + m_quota[download_channel] = 2048; + m_quota[upload_channel] = 2048; #else - m_bandwidth_limit[download_channel].assign(80); - m_bandwidth_limit[upload_channel].assign(80); + m_quota[download_channel] = 80; + m_quota[upload_channel] = 80; #endif #ifdef TORRENT_DEBUG diff --git a/src/http_seed_connection.cpp b/src/http_seed_connection.cpp index 26f2f5dfd..a554ca3e9 100644 --- a/src/http_seed_connection.cpp +++ b/src/http_seed_connection.cpp @@ -76,7 +76,7 @@ namespace libtorrent prefer_whole_pieces(1); // we only want left-over bandwidth - set_priority(0); + set_priority(1); shared_ptr tor = t.lock(); TORRENT_ASSERT(tor); int blocks_per_piece = tor->torrent_file().piece_length() / tor->block_size(); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 40943d201..0c3c85061 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -105,8 +105,8 @@ namespace libtorrent , m_reading_bytes(0) , m_num_invalid_requests(0) , m_priority(1) - , m_upload_limit(bandwidth_limit::inf) - , m_download_limit(bandwidth_limit::inf) + , m_upload_limit(0) + , m_download_limit(0) , m_peer_info(peerinfo) , m_speed(slow) , m_connection_ticket(-1) @@ -148,6 +148,9 @@ namespace libtorrent m_channel_state[upload_channel] = peer_info::bw_idle; m_channel_state[download_channel] = peer_info::bw_idle; + m_quota[0] = 0; + m_quota[1] = 0; + TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false); #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES std::fill(m_country, m_country + 2, 0); @@ -222,8 +225,8 @@ namespace libtorrent , m_reading_bytes(0) , m_num_invalid_requests(0) , m_priority(1) - , m_upload_limit(bandwidth_limit::inf) - , m_download_limit(bandwidth_limit::inf) + , m_upload_limit(0) + , m_download_limit(0) , m_peer_info(peerinfo) , m_speed(slow) , m_connection_ticket(-1) @@ -265,6 +268,9 @@ namespace libtorrent m_channel_state[upload_channel] = peer_info::bw_idle; m_channel_state[download_channel] = peer_info::bw_idle; + m_quota[0] = 0; + m_quota[1] = 0; + #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES std::fill(m_country, m_country + 2, 0); #ifndef TORRENT_DISABLE_GEO_IP @@ -2796,19 +2802,19 @@ namespace libtorrent void peer_connection::set_upload_limit(int limit) { TORRENT_ASSERT(limit >= -1); - if (limit == -1) limit = (std::numeric_limits::max)(); - if (limit < 10) limit = 10; + if (limit < 0) limit = 0; + if (limit < 10 && limit > 0) limit = 10; m_upload_limit = limit; - m_bandwidth_limit[upload_channel].throttle(m_upload_limit); + m_bandwidth_channel[upload_channel].throttle(m_upload_limit); } void peer_connection::set_download_limit(int limit) { TORRENT_ASSERT(limit >= -1); - if (limit == -1) limit = (std::numeric_limits::max)(); - if (limit < 10) limit = 10; + if (limit < 0) limit = 0; + if (limit < 10 && limit > 0) limit = 10; m_download_limit = limit; - m_bandwidth_limit[download_channel].throttle(m_download_limit); + m_bandwidth_channel[download_channel].throttle(m_download_limit); } size_type peer_connection::share_diff() const @@ -2862,8 +2868,8 @@ namespace libtorrent p.pid = pid(); p.ip = remote(); p.pending_disk_bytes = m_outstanding_writing_bytes; - p.send_quota = m_bandwidth_limit[upload_channel].quota_left(); - p.receive_quota = m_bandwidth_limit[download_channel].quota_left(); + p.send_quota = m_quota[upload_channel]; + p.receive_quota = m_quota[download_channel]; p.num_pieces = m_num_pieces; if (m_download_queue.empty()) p.request_timeout = -1; else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout @@ -2883,15 +2889,15 @@ namespace libtorrent p.total_download = statistics().total_payload_download(); p.total_upload = statistics().total_payload_upload(); - if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf) + if (m_bandwidth_channel[upload_channel].throttle() == 0) p.upload_limit = -1; else - p.upload_limit = m_bandwidth_limit[upload_channel].throttle(); + p.upload_limit = m_bandwidth_channel[upload_channel].throttle(); - if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf) + if (m_bandwidth_channel[download_channel].throttle() == 0) p.download_limit = -1; else - p.download_limit = m_bandwidth_limit[download_channel].throttle(); + p.download_limit = m_bandwidth_channel[download_channel].throttle(); p.load_balancing = total_free_upload(); @@ -3068,6 +3074,43 @@ namespace libtorrent INVARIANT_CHECK; boost::shared_ptr t = m_torrent.lock(); + + // drain the IP overhead from the bandwidth limiters + if (m_ses.m_settings.rate_limit_ip_overhead) + { + int download_overhead = m_statistics.download_ip_overhead(); + int upload_overhead = m_statistics.upload_ip_overhead(); + m_bandwidth_channel[download_channel].use_quota(download_overhead); + m_bandwidth_channel[upload_channel].use_quota(upload_overhead); + + int up_limit = m_bandwidth_channel[upload_channel].throttle(); + int down_limit = m_bandwidth_channel[download_channel].throttle(); + + if (t) + { + t->m_bandwidth_channel[download_channel].use_quota(download_overhead); + t->m_bandwidth_channel[upload_channel].use_quota(upload_overhead); + + if (down_limit > 0 + && download_overhead >= down_limit + && t->alerts().should_post()) + { + t->alerts().post_alert(performance_alert(t->get_handle() + , performance_alert::download_limit_too_low)); + } + + if (up_limit > 0 + && upload_overhead >= up_limit + && t->alerts().should_post()) + { + t->alerts().post_alert(performance_alert(t->get_handle() + , performance_alert::upload_limit_too_low)); + } + } + m_ses.m_download_channel.use_quota(download_overhead); + m_ses.m_upload_channel.use_quota(upload_overhead); + } + if (!t || m_disconnecting) { m_ses.m_half_open.done(m_connection_ticket); @@ -3258,7 +3301,7 @@ namespace libtorrent // if we have downloaded more than one piece more // than we have uploaded OR if we are a seed // have an unlimited upload rate - m_bandwidth_limit[upload_channel].throttle(m_upload_limit); + m_bandwidth_channel[upload_channel].throttle(m_upload_limit); } else { @@ -3275,14 +3318,17 @@ namespace libtorrent if (t->ratio() != 1.f) soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio()); - double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded - + bias) / break_even_time, double(m_upload_limit)); + double upload_speed_limit = (soon_downloaded - have_uploaded + + bias) / break_even_time; + + if (m_upload_limit > 0 && m_upload_limit < upload_speed_limit) + upload_speed_limit = m_upload_limit; upload_speed_limit = (std::min)(upload_speed_limit, (double)(std::numeric_limits::max)()); - m_bandwidth_limit[upload_channel].throttle( - (std::min)((std::max)((int)upload_speed_limit, 20) + m_bandwidth_channel[upload_channel].throttle( + (std::min)((std::max)((int)upload_speed_limit, 10) , m_upload_limit)); } @@ -3485,8 +3531,9 @@ namespace libtorrent (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n"; #endif - m_bandwidth_limit[channel].assign(amount); - TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global); + TORRENT_ASSERT(amount > 0); + m_quota[channel] += amount; + TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_limit); m_channel_state[channel] = peer_info::bw_idle; if (channel == upload_channel) { @@ -3498,21 +3545,6 @@ namespace libtorrent } } - void peer_connection::expire_bandwidth(int channel, int amount) - { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - - m_bandwidth_limit[channel].expire(amount); - if (channel == upload_channel) - { - setup_send(); - } - else if (channel == download_channel) - { - setup_receive(); - } - } - void peer_connection::setup_send() { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); @@ -3521,7 +3553,7 @@ namespace libtorrent shared_ptr t = m_torrent.lock(); - if (m_bandwidth_limit[upload_channel].quota_left() == 0 + if (m_quota[upload_channel] == 0 && !m_send_buffer.empty() && !m_connecting && t @@ -3531,18 +3563,19 @@ namespace libtorrent // bandwidth. So, we simply request bandwidth // from the torrent TORRENT_ASSERT(t); - if (m_bandwidth_limit[upload_channel].max_assignable() > 0) - { - int priority = is_interesting() * 2 + m_requests_in_buffer.size(); - // peers that we are not interested in are non-prioritized - m_channel_state[upload_channel] = peer_info::bw_torrent; - t->request_bandwidth(upload_channel, self() - , m_send_buffer.size(), priority); + int priority = 1 + is_interesting() * 2 + m_requests_in_buffer.size(); + // peers that we are not interested in are non-prioritized + m_channel_state[upload_channel] = peer_info::bw_limit; + m_ses.m_upload_rate.request_bandwidth(self() + , m_send_buffer.size(), priority + , &m_ses.m_upload_channel + , &t->m_bandwidth_channel[upload_channel] + , &m_bandwidth_channel[upload_channel]); #ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: " - << priority << "]\n"; + (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ " + "upload: " << m_send_buffer.size() + << " prio: " << priority << "]\n"; #endif - } return; } @@ -3552,7 +3585,7 @@ namespace libtorrent if (m_send_buffer.empty()) { (*m_logger) << time_now_string() << " *** SEND BUFFER DEPLETED [" - " quota: " << m_bandwidth_limit[upload_channel].quota_left() << + " quota: " << m_quota[upload_channel] << " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") << " buf: " << m_send_buffer.size() << " connecting: " << (m_connecting?"yes":"no") << @@ -3561,7 +3594,7 @@ namespace libtorrent else { (*m_logger) << time_now_string() << " *** CANNOT WRITE [" - " quota: " << m_bandwidth_limit[download_channel].quota_left() << + " quota: " << m_quota[upload_channel] << " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") << " buf: " << m_send_buffer.size() << " connecting: " << (m_connecting?"yes":"no") << @@ -3575,7 +3608,7 @@ namespace libtorrent if (!m_send_buffer.empty()) { int amount_to_send = m_send_buffer.size(); - int quota_left = m_bandwidth_limit[upload_channel].quota_left(); + int quota_left = m_quota[upload_channel]; if (!m_ignore_bandwidth_limits && amount_to_send > quota_left) amount_to_send = quota_left; @@ -3601,21 +3634,23 @@ namespace libtorrent shared_ptr t = m_torrent.lock(); - if (m_bandwidth_limit[download_channel].quota_left() == 0 + if (m_quota[download_channel] == 0 && !m_connecting && t && !m_ignore_bandwidth_limits) { - if (m_bandwidth_limit[download_channel].max_assignable() > 0) - { #ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n"; + (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ " + "download: " << (m_download_queue.size() * 16 * 1024 + 30) + << " prio: " << m_priority << " ]\n"; #endif - TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle); - m_channel_state[download_channel] = peer_info::bw_torrent; - t->request_bandwidth(download_channel, self() - , m_download_queue.size() * 16 * 1024 + 30, m_priority); - } + TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle); + m_channel_state[download_channel] = peer_info::bw_limit; + m_ses.m_download_rate.request_bandwidth(self() + , m_outstanding_bytes + 30, m_priority + , &m_ses.m_download_channel + , &t->m_bandwidth_channel[download_channel] + , &m_bandwidth_channel[download_channel]); return; } @@ -3623,7 +3658,7 @@ namespace libtorrent { #ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << time_now_string() << " *** CANNOT READ [" - " quota: " << m_bandwidth_limit[download_channel].quota_left() << + " quota: " << m_quota[download_channel] << " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") << " outstanding: " << m_outstanding_writing_bytes << " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection << @@ -3637,7 +3672,7 @@ namespace libtorrent if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos) max_receive = m_soft_packet_size - m_recv_pos; - int quota_left = m_bandwidth_limit[download_channel].quota_left(); + int quota_left = m_quota[download_channel]; if (!m_ignore_bandwidth_limits && max_receive > quota_left) max_receive = quota_left; @@ -3862,7 +3897,10 @@ namespace libtorrent #endif // correct the dl quota usage, if not all of the buffer was actually read if (!m_ignore_bandwidth_limits) - m_bandwidth_limit[download_channel].use_quota(bytes_transferred); + { + TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]); + m_quota[download_channel] -= bytes_transferred; + } if (m_disconnecting) { @@ -3904,7 +3942,7 @@ namespace libtorrent max_receive = m_packet_size - m_recv_pos; if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos) max_receive = m_soft_packet_size - m_recv_pos; - int quota_left = m_bandwidth_limit[download_channel].quota_left(); + int quota_left = m_quota[download_channel]; if (!m_ignore_bandwidth_limits && max_receive > quota_left) max_receive = quota_left; @@ -3968,14 +4006,14 @@ namespace libtorrent // if we have requests or pending data to be sent or announcements to be made // we want to send data return !m_send_buffer.empty() - && (m_bandwidth_limit[upload_channel].quota_left() > 0 + && (m_quota[upload_channel] > 0 || m_ignore_bandwidth_limits) && !m_connecting; } bool peer_connection::can_read() const { - bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0 + bool ret = (m_quota[download_channel] > 0 || m_ignore_bandwidth_limits) && !m_connecting && m_outstanding_writing_bytes < @@ -4159,7 +4197,10 @@ namespace libtorrent m_channel_state[upload_channel] = peer_info::bw_idle; if (!m_ignore_bandwidth_limits) - m_bandwidth_limit[upload_channel].use_quota(bytes_transferred); + { + TORRENT_ASSERT(bytes_transferred <= m_quota[upload_channel]); + m_quota[upload_channel] -= bytes_transferred; + } m_statistics.trancieve_ip_packet(bytes_transferred, m_remote.address().is_v6()); @@ -4205,6 +4246,9 @@ namespace libtorrent { TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0)); + TORRENT_ASSERT(m_upload_limit >= 0); + TORRENT_ASSERT(m_download_limit >= 0); + boost::shared_ptr t = m_torrent.lock(); if (m_disconnecting) { @@ -4251,24 +4295,10 @@ namespace libtorrent TORRENT_ASSERT(m_outstanding_bytes == outstanding_bytes); } -/* - // this assertion correct most of the time, but sometimes right when the - // limit is changed it might break - for (int i = 0; i < 2; ++i) - { - // this peer is in the bandwidth history iff max_assignable < limit - TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle()) - == m_ses.m_bandwidth_manager[i]->is_in_history(this) - || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf); - } -*/ - - if (m_channel_state[download_channel] == peer_info::bw_torrent - || m_channel_state[download_channel] == peer_info::bw_global) - TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0); - if (m_channel_state[upload_channel] == peer_info::bw_torrent - || m_channel_state[upload_channel] == peer_info::bw_global) - TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0); + if (m_channel_state[download_channel] == peer_info::bw_limit) + TORRENT_ASSERT(m_quota[download_channel] == 0); + if (m_channel_state[upload_channel] == peer_info::bw_limit) + TORRENT_ASSERT(m_quota[upload_channel] == 0); std::set unique; std::transform(m_download_queue.begin(), m_download_queue.end() diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 10be683da..e73a35e0d 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -157,11 +157,11 @@ namespace aux { , m_io_service() , m_disk_thread(m_io_service) , m_half_open(m_io_service) - , m_download_channel(m_io_service, peer_connection::download_channel) + , m_download_rate(peer_connection::download_channel) #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT - , m_upload_channel(m_io_service, peer_connection::upload_channel, true) + , m_upload_rate(peer_connection::upload_channel, true) #else - , m_upload_channel(m_io_service, peer_connection::upload_channel) + , m_upload_rate(peer_connection::upload_channel) #endif , m_tracker_manager(*this, m_tracker_proxy) , m_listen_port_retries(listen_port_range.second - listen_port_range.first) @@ -178,6 +178,7 @@ namespace aux { , m_auto_scrape_time_scaler(180) , m_incoming_connection(false) , m_last_tick(time_now()) + , m_last_second_tick(m_last_tick) , m_last_choke(m_last_tick) #ifndef TORRENT_DISABLE_DHT , m_dht_same_port(true) @@ -222,8 +223,8 @@ namespace aux { } #endif - m_bandwidth_manager[peer_connection::download_channel] = &m_download_channel; - m_bandwidth_manager[peer_connection::upload_channel] = &m_upload_channel; + m_bandwidth_channel[peer_connection::download_channel] = &m_download_channel; + m_bandwidth_channel[peer_connection::upload_channel] = &m_upload_channel; #ifdef TORRENT_UPNP_LOGGING m_upnp_log.open("upnp.log", std::ios::in | std::ios::out | std::ios::trunc); @@ -278,9 +279,9 @@ namespace aux { *i = printable[rand() % (sizeof(printable)-1)]; } - m_timer.expires_from_now(seconds(1), ec); + m_timer.expires_from_now(milliseconds(100), ec); m_timer.async_wait( - bind(&session_impl::second_tick, this, _1)); + bind(&session_impl::on_tick, this, _1)); m_thread.reset(new boost::thread(boost::ref(*this))); } @@ -554,8 +555,8 @@ namespace aux { (*m_logger) << time_now_string() << " shutting down connection queue\n"; #endif - m_download_channel.close(); - m_upload_channel.close(); + m_download_rate.close(); + m_upload_rate.close(); } void session_impl::set_port_filter(port_filter const& f) @@ -1086,7 +1087,7 @@ namespace aux { return port; } - void session_impl::second_tick(error_code const& e) + void session_impl::on_tick(error_code const& e) { session_impl::mutex_t::scoped_lock l(m_mutex); @@ -1105,13 +1106,22 @@ namespace aux { } ptime now = time_now(); - float tick_interval = total_microseconds(now - m_last_tick) / 1000000.f; - m_last_tick = now; error_code ec; - m_timer.expires_at(now + seconds(1), ec); + m_timer.expires_at(now + milliseconds(100), ec); m_timer.async_wait( - bind(&session_impl::second_tick, this, _1)); + bind(&session_impl::on_tick, this, _1)); + + m_download_rate.update_quotas(now - m_last_tick); + m_upload_rate.update_quotas(now - m_last_tick); + + m_last_tick = now; + + // only tick the rest once every + if (now - m_last_second_tick < seconds(1)) return; + + float tick_interval = total_microseconds(now - m_last_second_tick) / 1000000.f; + m_last_second_tick = now; #ifdef TORRENT_STATS ++m_second_counter; @@ -1198,7 +1208,7 @@ namespace aux { { torrent& t = *i->second; TORRENT_ASSERT(!t.is_aborted()); - if (t.bandwidth_queue_size(peer_connection::upload_channel)) + if (t.statistics().upload_rate() > t.upload_limit() * 0.9f) ++congested_torrents; else ++uncongested_torrents; @@ -1237,31 +1247,31 @@ namespace aux { m_stat.received_dht_bytes(dht_down); } - // drain the IP overhead from the bandwidth limiters if (m_settings.rate_limit_ip_overhead) { - m_download_channel.drain( - m_stat.download_ip_overhead() - + m_stat.download_dht() + m_download_channel.use_quota(m_stat.download_dht() + m_stat.download_tracker()); - if (m_stat.download_ip_overhead() >= m_download_channel.throttle() - && m_alerts.should_post()) + m_upload_channel.use_quota(m_stat.upload_dht() + + m_stat.upload_tracker()); + + int up_limit = m_upload_channel.throttle(); + int down_limit = m_download_channel.throttle(); + + if (down_limit > 0 + && m_stat.download_ip_overhead() >= down_limit + && m_alerts.should_post()) { m_alerts.post_alert(performance_alert(torrent_handle() - , performance_alert::download_limit_too_low)); + , performance_alert::download_limit_too_low)); } - m_upload_channel.drain( - m_stat.upload_ip_overhead() - + m_stat.upload_dht() - + m_stat.upload_tracker()); - - if (m_stat.upload_ip_overhead() >= m_upload_channel.throttle() - && m_alerts.should_post()) + if (up_limit > 0 + && m_stat.upload_ip_overhead() >= up_limit + && m_alerts.should_post()) { m_alerts.post_alert(performance_alert(torrent_handle() - , performance_alert::upload_limit_too_low)); + , performance_alert::upload_limit_too_low)); } } @@ -1663,7 +1673,7 @@ namespace aux { // assume a reasonable rate is 3 kB/s, unless there's an upload limit and // a max number of slots, in which case we assume each upload slot gets // roughly the same amount of bandwidth - if (m_upload_channel.throttle() != bandwidth_limit::inf && m_max_uploads > 0) + if (m_upload_channel.throttle() != bandwidth_channel::inf && m_max_uploads > 0) rate = (std::max)(m_upload_channel.throttle() / m_max_uploads, 1); // the time it takes to download one piece at this rate (in seconds) @@ -1752,10 +1762,10 @@ namespace aux { , bind(&peer_connection::reset_choke_counters, _1)); // auto unchoke - int upload_limit = m_bandwidth_manager[peer_connection::upload_channel]->throttle(); + int upload_limit = m_bandwidth_channel[peer_connection::upload_channel]->throttle(); if (!m_settings.auto_upload_slots_rate_based && m_settings.auto_upload_slots - && upload_limit != bandwidth_limit::inf) + && upload_limit > 0) { // if our current upload rate is less than 90% of our // limit AND most torrents are not "congested", i.e. @@ -1764,11 +1774,11 @@ namespace aux { if (m_stat.upload_rate() < upload_limit * 0.9f && m_allowed_upload_slots <= m_num_unchoked + 1 && congested_torrents < uncongested_torrents - && m_upload_channel.queue_size() < 2) + && m_upload_rate.queue_size() < 2) { ++m_allowed_upload_slots; } - else if (m_upload_channel.queue_size() > 1 + else if (m_upload_rate.queue_size() > 1 && m_allowed_upload_slots > m_max_uploads) { --m_allowed_upload_slots; @@ -2235,11 +2245,11 @@ namespace aux { s.total_redundant_bytes = m_total_redundant_bytes; s.total_failed_bytes = m_total_failed_bytes; - s.up_bandwidth_queue = m_upload_channel.queue_size(); - s.down_bandwidth_queue = m_download_channel.queue_size(); + s.up_bandwidth_queue = m_upload_rate.queue_size(); + s.down_bandwidth_queue = m_download_rate.queue_size(); - s.up_bandwidth_bytes_queue = m_upload_channel.queued_bytes(); - s.down_bandwidth_bytes_queue = m_download_channel.queued_bytes(); + s.up_bandwidth_bytes_queue = m_upload_rate.queued_bytes(); + s.down_bandwidth_bytes_queue = m_download_rate.queued_bytes(); s.has_incoming_connections = m_incoming_connection; @@ -2543,8 +2553,8 @@ namespace aux { INVARIANT_CHECK; - if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf; - m_bandwidth_manager[peer_connection::download_channel]->throttle(bytes_per_second); + if (bytes_per_second <= 0) bytes_per_second = bandwidth_channel::inf; + m_bandwidth_channel[peer_connection::download_channel]->throttle(bytes_per_second); } void session_impl::set_upload_rate_limit(int bytes_per_second) @@ -2553,8 +2563,8 @@ namespace aux { INVARIANT_CHECK; - if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf; - m_bandwidth_manager[peer_connection::upload_channel]->throttle(bytes_per_second); + if (bytes_per_second <= 0) bytes_per_second = bandwidth_channel::inf; + m_bandwidth_channel[peer_connection::upload_channel]->throttle(bytes_per_second); } void session_impl::set_alert_dispatch(boost::function const& fun) @@ -2598,14 +2608,14 @@ namespace aux { INVARIANT_CHECK; - int ret = m_bandwidth_manager[peer_connection::upload_channel]->throttle(); + int ret = m_bandwidth_channel[peer_connection::upload_channel]->throttle(); return ret == (std::numeric_limits::max)() ? -1 : ret; } int session_impl::download_rate_limit() const { mutex_t::scoped_lock l(m_mutex); - int ret = m_bandwidth_manager[peer_connection::download_channel]->throttle(); + int ret = m_bandwidth_channel[peer_connection::download_channel]->throttle(); return ret == (std::numeric_limits::max)() ? -1 : ret; } diff --git a/src/torrent.cpp b/src/torrent.cpp index 5e6631029..d25cb88ea 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -2598,18 +2598,6 @@ namespace libtorrent p->set_peer_info(0); TORRENT_ASSERT(i != m_connections.end()); m_connections.erase(i); - - // remove from bandwidth request-queue - for (int c = 0; c < 2; ++c) - { - for (queue_t::iterator i = m_bandwidth_queue[c].begin() - , end(m_bandwidth_queue[c].end()); i != end; ++i) - { - if (i->peer != p) continue; - m_bandwidth_queue[c].erase(i); - break; - } - } } void torrent::connect_to_url_seed(web_seed_entry const& web) @@ -3823,92 +3811,7 @@ namespace libtorrent int torrent::bandwidth_throttle(int channel) const { - return m_bandwidth_limit[channel].throttle(); - } - - int torrent::bandwidth_queue_size(int channel) const - { - return (int)m_bandwidth_queue[channel].size(); - } - - void torrent::request_bandwidth(int channel - , boost::intrusive_ptr const& p - , int max_block_size, int priority) - { - TORRENT_ASSERT(max_block_size > 0); - TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0); - TORRENT_ASSERT(p->max_assignable_bandwidth(channel) > 0); - TORRENT_ASSERT(p->m_channel_state[channel] == peer_info::bw_torrent); - int block_size = (std::min)(m_bandwidth_limit[channel].throttle() / 10 - , max_block_size); - if (block_size <= 0) block_size = 1; - - if (m_bandwidth_limit[channel].max_assignable() > 0) - { - perform_bandwidth_request(channel, p, block_size, priority); - } - else - { - // skip forward in the queue until we find a prioritized peer - // or hit the front of it. - queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin(); - while (i != m_bandwidth_queue[channel].rend() && priority > i->priority) - { - ++i->priority; - ++i; - } - m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry( - p, block_size, priority)); - } - } - - void torrent::expire_bandwidth(int channel, int amount) - { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - - INVARIANT_CHECK; - - TORRENT_ASSERT(amount > 0); - m_bandwidth_limit[channel].expire(amount); - queue_t tmp; - while (!m_bandwidth_queue[channel].empty()) - { - bw_queue_entry qe = m_bandwidth_queue[channel].front(); - if (m_bandwidth_limit[channel].max_assignable() == 0) - break; - m_bandwidth_queue[channel].pop_front(); - if (qe.peer->max_assignable_bandwidth(channel) <= 0) - { - TORRENT_ASSERT(m_ses.m_bandwidth_manager[channel]->is_in_history(qe.peer.get())); - if (!qe.peer->is_disconnecting()) tmp.push_back(qe); - continue; - } - perform_bandwidth_request(channel, qe.peer - , qe.max_block_size, qe.priority); - } - m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end()); - } - - void torrent::perform_bandwidth_request(int channel - , boost::intrusive_ptr const& p - , int block_size - , int priority) - { - TORRENT_ASSERT(p->m_channel_state[channel] == peer_info::bw_torrent); - p->m_channel_state[channel] = peer_info::bw_global; - m_ses.m_bandwidth_manager[channel]->request_bandwidth(p - , block_size, priority); - m_bandwidth_limit[channel].assign(block_size); - } - - void torrent::assign_bandwidth(int channel, int amount, int blk) - { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - - TORRENT_ASSERT(amount > 0); - TORRENT_ASSERT(amount <= blk); - if (amount < blk) - expire_bandwidth(channel, blk - amount); + return m_bandwidth_channel[channel].throttle(); } // called when torrent is finished (all interesting @@ -4215,19 +4118,6 @@ namespace libtorrent TORRENT_ASSERT(m_resume_entry.type() == lazy_entry::dict_t || m_resume_entry.type() == lazy_entry::none_t); - TORRENT_ASSERT(m_bandwidth_queue[0].size() <= m_connections.size()); - TORRENT_ASSERT(m_bandwidth_queue[1].size() <= m_connections.size()); - - for (int c = 0; c < 2; ++c) - { - queue_t::const_iterator j = m_bandwidth_queue[c].begin(); - if (j == m_bandwidth_queue[c].end()) continue; - ++j; - for (queue_t::const_iterator i = m_bandwidth_queue[c].begin() - , end(m_bandwidth_queue[c].end()); i != end && j != end; ++i, ++j) - TORRENT_ASSERT(i->priority >= j->priority); - } - int num_uploads = 0; std::map num_requests; for (const_peer_iterator i = begin(); i != end(); ++i) @@ -4437,14 +4327,14 @@ namespace libtorrent void torrent::set_upload_limit(int limit) { TORRENT_ASSERT(limit >= -1); - if (limit <= 0) limit = (std::numeric_limits::max)(); + if (limit <= 0) limit = 0; if (limit < num_peers() * 10) limit = num_peers() * 10; - m_bandwidth_limit[peer_connection::upload_channel].throttle(limit); + m_bandwidth_channel[peer_connection::upload_channel].throttle(limit); } int torrent::upload_limit() const { - int limit = m_bandwidth_limit[peer_connection::upload_channel].throttle(); + int limit = m_bandwidth_channel[peer_connection::upload_channel].throttle(); if (limit == (std::numeric_limits::max)()) limit = -1; return limit; } @@ -4452,14 +4342,14 @@ namespace libtorrent void torrent::set_download_limit(int limit) { TORRENT_ASSERT(limit >= -1); - if (limit <= 0) limit = (std::numeric_limits::max)(); + if (limit <= 0) limit = 0; if (limit < num_peers() * 10) limit = num_peers() * 10; - m_bandwidth_limit[peer_connection::download_channel].throttle(limit); + m_bandwidth_channel[peer_connection::download_channel].throttle(limit); } int torrent::download_limit() const { - int limit = m_bandwidth_limit[peer_connection::download_channel].throttle(); + int limit = m_bandwidth_channel[peer_connection::download_channel].throttle(); if (limit == (std::numeric_limits::max)()) limit = -1; return limit; } @@ -4848,6 +4738,28 @@ namespace libtorrent return; } + if (m_settings.rate_limit_ip_overhead) + { + int up_limit = m_bandwidth_channel[peer_connection::upload_channel].throttle(); + int down_limit = m_bandwidth_channel[peer_connection::download_channel].throttle(); + + if (down_limit > 0 + && m_stat.download_ip_overhead() >= down_limit + && alerts().should_post()) + { + alerts().post_alert(performance_alert(get_handle() + , performance_alert::download_limit_too_low)); + } + + if (up_limit > 0 + && m_stat.upload_ip_overhead() >= up_limit + && alerts().should_post()) + { + alerts().post_alert(performance_alert(get_handle() + , performance_alert::upload_limit_too_low)); + } + } + time_duration since_last_tick = microsec(tick_interval * 1000000L); if (is_seed()) m_seeding_time += since_last_tick; m_active_time += since_last_tick; @@ -5340,8 +5252,8 @@ namespace libtorrent { st.last_scrape = total_seconds(now - m_last_scrape); } - st.up_bandwidth_queue = (int)m_bandwidth_queue[peer_connection::upload_channel].size(); - st.down_bandwidth_queue = (int)m_bandwidth_queue[peer_connection::download_channel].size(); + st.up_bandwidth_queue = 0; + st.down_bandwidth_queue = 0; st.num_peers = (int)std::count_if(m_connections.begin(), m_connections.end() , !boost::bind(&peer_connection::is_connecting, _1)); diff --git a/src/web_peer_connection.cpp b/src/web_peer_connection.cpp index 9a63229e0..cabdf9323 100644 --- a/src/web_peer_connection.cpp +++ b/src/web_peer_connection.cpp @@ -76,7 +76,7 @@ namespace libtorrent set_upload_only(true); // we only want left-over bandwidth - set_priority(0); + set_priority(1); shared_ptr tor = t.lock(); TORRENT_ASSERT(tor); int blocks_per_piece = tor->torrent_file().piece_length() / tor->block_size(); diff --git a/test/test_bandwidth_limiter.cpp b/test/test_bandwidth_limiter.cpp index 190f86b8e..2202c99a1 100644 --- a/test/test_bandwidth_limiter.cpp +++ b/test/test_bandwidth_limiter.cpp @@ -47,320 +47,112 @@ struct peer_connection; using namespace libtorrent; -const float sample_time = 6.f; // seconds +const float sample_time = 20.f; // seconds //#define VERBOSE_LOGGING +bandwidth_channel global_bwc; struct peer_connection: intrusive_ptr_base { - typedef torrent torrent_type; - - peer_connection(io_service& ios, boost::shared_ptr const& t - , int prio, bool ignore_limits, std::string name); - - bool ignore_bandwidth_limits() { return m_ignore_limits; } - int max_assignable_bandwidth(int channel) const - { return m_bandwidth_limit[channel].max_assignable(); } - boost::weak_ptr associated_torrent() const - { return m_torrent; } - bool is_disconnecting() const { return m_abort; } - void assign_bandwidth(int channel, int amount); - void on_transfer(int channel, int amount); - void start(); - void stop() { m_abort = true; } - void expire_bandwidth(int channel, int amount); - void tick(); - - int bandwidth_throttle(int channel) const - { return m_bandwidth_limit[channel].throttle(); } - - void throttle(int limit) { m_bandwidth_limit[0].throttle(limit); } - - bandwidth_limit m_bandwidth_limit[1]; - boost::weak_ptr m_torrent; - int m_priority; - bool m_ignore_limits; - bool m_abort; - libtorrent::stat m_stats; - io_service& m_ios; - std::string m_name; - bool m_writing; -}; - -struct torrent -{ - torrent(bandwidth_manager& m) - : m_bandwidth_manager(m) + peer_connection(bandwidth_manager& bwm + , bandwidth_channel& torrent_bwc, int prio, bool ignore_limits, std::string name) + : m_bwm(bwm) + , m_torrent_bandwidth_channel(torrent_bwc) + , m_priority(prio) + , m_ignore_limits(ignore_limits) + , m_name(name) + , m_quota(0) {} - void assign_bandwidth(int channel, int amount, int max_block_size) - { -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() - << ": assign bandwidth, " << amount << " blk: " << max_block_size << std::endl; -#endif - TEST_CHECK(amount > 0); - TEST_CHECK(amount <= max_block_size); - if (amount < max_block_size) - expire_bandwidth(channel, max_block_size - amount); - } + bool is_disconnecting() const { return false; } + bool ignore_bandwidth_limits() { return m_ignore_limits; } + void assign_bandwidth(int channel, int amount); - int bandwidth_throttle(int channel) const - { return m_bandwidth_limit[channel].throttle(); } + void throttle(int limit) { m_bandwidth_channel.throttle(limit); } - int max_assignable_bandwidth(int channel) const - { return m_bandwidth_limit[channel].max_assignable(); } + void start(); - void request_bandwidth(int channel - , boost::intrusive_ptr const& p - , int max_block_size - , int priority) - { - TORRENT_ASSERT(max_block_size > 0); - TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0); - int block_size = (std::min)(m_bandwidth_limit[channel].throttle() / 10 - , max_block_size); - if (block_size <= 0) block_size = 1; + bandwidth_manager& m_bwm; - if (m_bandwidth_limit[channel].max_assignable() > 0) - { -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() - << ": request bandwidth " << block_size << std::endl; -#endif - perform_bandwidth_request(channel, p, block_size, priority); - } - else - { -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() - << ": queue bandwidth request" << block_size << std::endl; -#endif - // skip forward in the queue until we find a prioritized peer - // or hit the front of it. - queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin(); - while (i != m_bandwidth_queue[channel].rend() && priority > i->priority) - { - ++i->priority; - ++i; - } - m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry( - p, block_size, priority)); - } - } + bandwidth_channel m_bandwidth_channel; + bandwidth_channel& m_torrent_bandwidth_channel; - void expire_bandwidth(int channel, int amount); - - void perform_bandwidth_request(int channel - , boost::intrusive_ptr const& p - , int block_size - , int priority) - { - m_bandwidth_manager.request_bandwidth(p - , block_size, priority); - m_bandwidth_limit[channel].assign(block_size); - } - bandwidth_limit m_bandwidth_limit[1]; - typedef std::deque > queue_t; - queue_t m_bandwidth_queue[1]; - bandwidth_manager& m_bandwidth_manager; + int m_priority; + bool m_ignore_limits; + std::string m_name; + int m_quota; }; -peer_connection::peer_connection(io_service& ios, boost::shared_ptr const& t - , int prio, bool ignore_limits, std::string name) - : m_torrent(t) - , m_priority(prio) - , m_ignore_limits(ignore_limits) - , m_abort(false) - , m_ios(ios) - , m_name(name) - , m_writing(false) -{} - void peer_connection::assign_bandwidth(int channel, int amount) { - TEST_CHECK(m_writing); + m_quota += amount; #ifdef VERBOSE_LOGGING - std::cerr << time_now_string() << ": [" << m_name + std::cerr << " [" << m_name << "] assign bandwidth, " << amount << std::endl; #endif TEST_CHECK(amount > 0); - m_bandwidth_limit[channel].assign(amount); - m_ios.post(boost::bind(&peer_connection::on_transfer, self(), channel, amount)); -} - -void peer_connection::on_transfer(int channel, int amount) -{ - TEST_CHECK(m_writing); - m_writing = false; - m_stats.sent_bytes(amount, 0); - - boost::shared_ptr t = m_torrent.lock(); - if (!t) return; - if (m_bandwidth_limit[channel].max_assignable() > 0) - { - m_writing = true; - t->request_bandwidth(0, this, 32 * 1024, m_priority); - } + start(); } void peer_connection::start() { - boost::shared_ptr t = m_torrent.lock(); - if (!t) return; - m_writing = true; - t->request_bandwidth(0, this, 32 * 1024, m_priority); + m_bwm.request_bandwidth(self(), 150000, m_priority + , &m_bandwidth_channel + , &m_torrent_bandwidth_channel + , &global_bwc); } -void peer_connection::expire_bandwidth(int channel, int amount) -{ - TEST_CHECK(amount > 0); -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() << ": [" << m_name - << "] expire bandwidth, " << amount << std::endl; -#endif - m_bandwidth_limit[channel].expire(amount); - - if (!m_writing && m_bandwidth_limit[channel].max_assignable() > 0) - { - boost::shared_ptr t = m_torrent.lock(); - if (!t) return; - m_writing = true; - t->request_bandwidth(0, this, 32 * 1024, m_priority); - } -} - -void peer_connection::tick() -{ -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() << ": [" << m_name - << "] tick, rate: " << m_stats.upload_rate() << std::endl; -#endif - m_stats.second_tick(1.f); -} - - -void torrent::expire_bandwidth(int channel, int amount) -{ -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() - << ": expire bandwidth, " << amount << std::endl; -#endif - TEST_CHECK(amount > 0); - m_bandwidth_limit[channel].expire(amount); - queue_t tmp; - while (!m_bandwidth_queue[channel].empty()) - { - bw_queue_entry qe = m_bandwidth_queue[channel].front(); - if (m_bandwidth_limit[channel].max_assignable() == 0) - break; - m_bandwidth_queue[channel].pop_front(); - if (qe.peer->max_assignable_bandwidth(channel) <= 0) - { - if (!qe.peer->is_disconnecting()) tmp.push_back(qe); - continue; - } - perform_bandwidth_request(channel, qe.peer - , qe.max_block_size, qe.priority); - } - m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end()); -} typedef std::vector > connections_t; -bool abort_tick = false; - -void do_tick(error_code const&e, deadline_timer& tick, connections_t& v) +void do_change_rate(bandwidth_channel& t1, bandwidth_channel& t2, int limit) { - if (e || abort_tick) - { - std::cerr << " tick aborted" << std::endl; - return; - } - std::for_each(v.begin(), v.end() - , boost::bind(&peer_connection::tick, _1)); - error_code ec; - tick.expires_from_now(seconds(1), ec); - tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v))); -} - -void do_stop(deadline_timer& tick, connections_t& v) -{ - abort_tick = true; - error_code ec; - tick.cancel(ec); - std::for_each(v.begin(), v.end() - , boost::bind(&peer_connection::stop, _1)); - std::cerr << " stopping..." << std::endl; -} - -void do_change_rate(error_code const&e, deadline_timer& tick - , boost::shared_ptr t1 - , boost::shared_ptr t2 - , int limit - , int counter) -{ - TEST_CHECK(!e); - if (e) return; - + static int counter = 10; + --counter; if (counter == 0) { - t1->m_bandwidth_limit[0].throttle(limit); - t2->m_bandwidth_limit[0].throttle(limit); + t1.throttle(limit); + t2.throttle(limit); return; } - t1->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?-1:1)); - t2->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?1:-1)); - - error_code ec; - tick.expires_from_now(milliseconds(1600), ec); - tick.async_wait(boost::bind(&do_change_rate, _1, boost::ref(tick), t1, t2, limit, counter-1)); + t1.throttle(limit + limit / 2 * ((counter & 1)?-1:1)); + t2.throttle(limit + limit / 2 * ((counter & 1)?1:-1)); } -void do_change_peer_rate(error_code const&e, deadline_timer& tick - , connections_t& v - , int limit - , int counter) +void do_change_peer_rate(connections_t& v, int limit) { - TEST_CHECK(!e); - if (e) return; - - if (counter == 0) + static int count = 10; + --count; + if (count == 0) { std::for_each(v.begin(), v.end() , boost::bind(&peer_connection::throttle, _1, limit)); return; } - int c = counter; + int c = count; for (connections_t::iterator i = v.begin(); i != v.end(); ++i, ++c) i->get()->throttle(limit + limit / 2 * ((c & 1)?-1:1)); - - error_code ec; - tick.expires_from_now(milliseconds(1100), ec); - tick.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(tick), boost::ref(v), limit, counter-1)); } -void run_test(io_service& ios, connections_t& v) +void nop() {} + +void run_test(connections_t& v + , bandwidth_manager& manager + , boost::function f = &nop) { - abort_tick = false; std::cerr << "-------------" << std::endl; - deadline_timer tick(ios); - error_code ec; - tick.expires_from_now(seconds(1), ec); - tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v))); - - deadline_timer complete(ios); - complete.expires_from_now(milliseconds(int(sample_time * 1000)), ec); - complete.async_wait(boost::bind(&do_stop, boost::ref(tick), boost::ref(v))); - std::for_each(v.begin(), v.end() , boost::bind(&peer_connection::start, _1)); - ios.run(ec); + for (int i = 0; i < int(sample_time * 10); ++i) + { + manager.update_quotas(milliseconds(100)); + if ((i % 15) == 0) f(); + } } bool close_to(float val, float comp, float err) @@ -368,12 +160,12 @@ bool close_to(float val, float comp, float err) return fabs(val - comp) <= err; } -void spawn_connections(connections_t& v, io_service& ios - , boost::shared_ptr t, int num, char const* prefix) +void spawn_connections(connections_t& v, bandwidth_manager& bwm + , bandwidth_channel& bwc, int num, char const* prefix) { for (int i = 0; i < num; ++i) { - v.push_back(new peer_connection(ios, t, 200, false + v.push_back(new peer_connection(bwm, bwc, 200, false , prefix + boost::lexical_cast(i))); } } @@ -381,26 +173,25 @@ void spawn_connections(connections_t& v, io_service& ios void test_equal_connections(int num, int limit) { std::cerr << "\ntest equal connections " << num << " " << limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - manager.throttle(limit); + bandwidth_manager manager(0); + global_bwc.throttle(limit); - boost::shared_ptr t1(new torrent(manager)); + bandwidth_channel t1; connections_t v; - spawn_connections(v, ios, t1, num, "p"); - run_test(ios, v); + spawn_connections(v, manager, t1, num, "p"); + run_test(v, manager); float sum = 0.f; float err = (std::max)(limit / num * 0.3f, 1000.f); for (connections_t::iterator i = v.begin() , end(v.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; - std::cerr << (*i)->m_stats.total_payload_upload() / sample_time + std::cerr << (*i)->m_quota / sample_time << " target: " << (limit / num) << " eps: " << err << std::endl; - TEST_CHECK(close_to((*i)->m_stats.total_payload_upload() / sample_time, limit / num, err)); + TEST_CHECK(close_to((*i)->m_quota / sample_time, limit / num, err)); } sum /= sample_time; std::cerr << "sum: " << sum << " target: " << limit << std::endl; @@ -414,24 +205,20 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit) << " l: " << limit << " t: " << torrent_limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); + bandwidth_manager manager(0); + global_bwc.throttle(0); - boost::shared_ptr t1(new torrent(manager)); + bandwidth_channel t1; if (torrent_limit) - t1->m_bandwidth_limit[0].throttle(torrent_limit); + t1.throttle(torrent_limit); connections_t v; - spawn_connections(v, ios, t1, num, "p"); + spawn_connections(v, manager, t1, num, "p"); std::for_each(v.begin(), v.end() , boost::bind(&peer_connection::throttle, _1, limit)); - error_code ec; - deadline_timer change_rate(ios); - change_rate.expires_from_now(milliseconds(1600), ec); - change_rate.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(change_rate) - , boost::ref(v), limit, 9)); - run_test(ios, v); + run_test(v, manager, boost::bind(&do_change_peer_rate + , boost::ref(v), limit)); if (torrent_limit > 0 && limit * num > torrent_limit) limit = torrent_limit / num; @@ -441,11 +228,11 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit) for (connections_t::iterator i = v.begin() , end(v.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; - std::cerr << (*i)->m_stats.total_payload_upload() / sample_time + std::cerr << (*i)->m_quota / sample_time << " target: " << limit << " eps: " << err << std::endl; - TEST_CHECK(close_to((*i)->m_stats.total_payload_upload() / sample_time, limit, err)); + TEST_CHECK(close_to((*i)->m_quota / sample_time, limit, err)); } sum /= sample_time; std::cerr << "sum: " << sum << " target: " << (limit * num) << std::endl; @@ -456,24 +243,24 @@ void test_connections_variable_rate(int num, int limit, int torrent_limit) void test_single_peer(int limit, bool torrent_limit) { std::cerr << "\ntest single peer " << limit << " " << torrent_limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - boost::shared_ptr t1(new torrent(manager)); + bandwidth_manager manager(0); + bandwidth_channel t1; + global_bwc.throttle(0); if (torrent_limit) - t1->m_bandwidth_limit[0].throttle(limit); + t1.throttle(limit); else - manager.throttle(limit); + global_bwc.throttle(limit); connections_t v; - spawn_connections(v, ios, t1, 1, "p"); - run_test(ios, v); + spawn_connections(v, manager, t1, 1, "p"); + run_test(v, manager); float sum = 0.f; for (connections_t::iterator i = v.begin() , end(v.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit << std::endl; @@ -487,25 +274,23 @@ void test_torrents(int num, int limit1, int limit2, int global_limit) << " l1: " << limit1 << " l2: " << limit2 << " g: " << global_limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - if (global_limit > 0) - manager.throttle(global_limit); + bandwidth_manager manager(0); + global_bwc.throttle(global_limit); - boost::shared_ptr t1(new torrent(manager)); - boost::shared_ptr t2(new torrent(manager)); + bandwidth_channel t1; + bandwidth_channel t2; - t1->m_bandwidth_limit[0].throttle(limit1); - t2->m_bandwidth_limit[0].throttle(limit2); + t1.throttle(limit1); + t2.throttle(limit2); connections_t v1; - spawn_connections(v1, ios, t1, num, "t1p"); + spawn_connections(v1, manager, t1, num, "t1p"); connections_t v2; - spawn_connections(v2, ios, t2, num, "t2p"); + spawn_connections(v2, manager, t2, num, "t2p"); connections_t v; std::copy(v1.begin(), v1.end(), std::back_inserter(v)); std::copy(v2.begin(), v2.end(), std::back_inserter(v)); - run_test(ios, v); + run_test(v, manager); if (global_limit > 0 && global_limit < limit1 + limit2) { @@ -516,7 +301,7 @@ void test_torrents(int num, int limit1, int limit2, int global_limit) for (connections_t::iterator i = v1.begin() , end(v1.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit1 << std::endl; @@ -527,7 +312,7 @@ void test_torrents(int num, int limit1, int limit2, int global_limit) for (connections_t::iterator i = v2.begin() , end(v2.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit2 << std::endl; @@ -540,31 +325,24 @@ void test_torrents_variable_rate(int num, int limit, int global_limit) std::cerr << "\ntest torrents variable rate" << num << " l: " << limit << " g: " << global_limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - if (global_limit > 0) - manager.throttle(global_limit); + bandwidth_manager manager(0); + global_bwc.throttle(global_limit); - boost::shared_ptr t1(new torrent(manager)); - boost::shared_ptr t2(new torrent(manager)); + bandwidth_channel t1; + bandwidth_channel t2; - t1->m_bandwidth_limit[0].throttle(limit); - t2->m_bandwidth_limit[0].throttle(limit); + t1.throttle(limit); + t2.throttle(limit); connections_t v1; - spawn_connections(v1, ios, t1, num, "t1p"); + spawn_connections(v1, manager, t1, num, "t1p"); connections_t v2; - spawn_connections(v2, ios, t2, num, "t2p"); + spawn_connections(v2, manager, t2, num, "t2p"); connections_t v; std::copy(v1.begin(), v1.end(), std::back_inserter(v)); std::copy(v2.begin(), v2.end(), std::back_inserter(v)); - error_code ec; - deadline_timer change_rate(ios); - change_rate.expires_from_now(milliseconds(1100), ec); - change_rate.async_wait(boost::bind(&do_change_rate, _1, boost::ref(change_rate), t1, t2, limit, 9)); - - run_test(ios, v); + run_test(v, manager, boost::bind(&do_change_rate, boost::ref(t1), boost::ref(t2), limit)); if (global_limit > 0 && global_limit < 2 * limit) limit = global_limit / 2; @@ -573,7 +351,7 @@ void test_torrents_variable_rate(int num, int limit, int global_limit) for (connections_t::iterator i = v1.begin() , end(v1.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit << std::endl; @@ -584,7 +362,7 @@ void test_torrents_variable_rate(int num, int limit, int global_limit) for (connections_t::iterator i = v2.begin() , end(v2.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit << std::endl; @@ -595,73 +373,74 @@ void test_torrents_variable_rate(int num, int limit, int global_limit) void test_peer_priority(int limit, bool torrent_limit) { std::cerr << "\ntest peer priority " << limit << " " << torrent_limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - boost::shared_ptr t1(new torrent(manager)); + bandwidth_manager manager(0); + bandwidth_channel t1; + global_bwc.throttle(0); if (torrent_limit) - t1->m_bandwidth_limit[0].throttle(limit); + t1.throttle(limit); else - manager.throttle(limit); + global_bwc.throttle(limit); connections_t v1; - spawn_connections(v1, ios, t1, 10, "p"); + spawn_connections(v1, manager, t1, 10, "p"); connections_t v; std::copy(v1.begin(), v1.end(), std::back_inserter(v)); boost::intrusive_ptr p( - new peer_connection(ios, t1, 0, false, "no-priority")); + new peer_connection(manager, t1, 1, false, "no-priority")); v.push_back(p); - run_test(ios, v); + run_test(v, manager); float sum = 0.f; for (connections_t::iterator i = v1.begin() , end(v1.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit << std::endl; TEST_CHECK(sum > 0); TEST_CHECK(close_to(sum, limit, 50)); - std::cerr << "non-prioritized rate: " << p->m_stats.total_payload_upload() / sample_time << std::endl; - TEST_CHECK(p->m_stats.total_payload_upload() / sample_time < 10); + std::cerr << "non-prioritized rate: " << p->m_quota / sample_time + << " target: " << (limit / 200 / 10) << std::endl; + TEST_CHECK(close_to(p->m_quota / sample_time, limit / 200 / 10, 5)); } void test_no_starvation(int limit) { std::cerr << "\ntest no starvation " << limit << std::endl; - io_service ios; - bandwidth_manager manager(ios, 0); - boost::shared_ptr t1(new torrent(manager)); - boost::shared_ptr t2(new torrent(manager)); + bandwidth_manager manager(0); + bandwidth_channel t1; + bandwidth_channel t2; - manager.throttle(limit); + global_bwc.throttle(limit); const int num_peers = 20; connections_t v1; - spawn_connections(v1, ios, t1, num_peers, "p"); + spawn_connections(v1, manager, t1, num_peers, "p"); connections_t v; std::copy(v1.begin(), v1.end(), std::back_inserter(v)); boost::intrusive_ptr p( - new peer_connection(ios, t2, 0, false, "no-priority")); + new peer_connection(manager, t2, 1, false, "no-priority")); v.push_back(p); - run_test(ios, v); + run_test(v, manager); float sum = 0.f; for (connections_t::iterator i = v.begin() , end(v.end()); i != end; ++i) { - sum += (*i)->m_stats.total_payload_upload(); + sum += (*i)->m_quota; } sum /= sample_time; std::cerr << sum << " target: " << limit << std::endl; TEST_CHECK(sum > 0); TEST_CHECK(close_to(sum, limit, 50)); - std::cerr << "non-prioritized rate: " << p->m_stats.total_payload_upload() / sample_time << std::endl; - TEST_CHECK(close_to(p->m_stats.total_payload_upload() / sample_time, limit / (num_peers + 1), 1000)); + std::cerr << "non-prioritized rate: " << p->m_quota / sample_time + << " target: " << (limit / 200 / num_peers) << std::endl; + TEST_CHECK(close_to(p->m_quota / sample_time, limit / 200 / num_peers, 5)); } int test_main()