From e714e1aeba00c737b78e96f0272dc62680da883a Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 2 Jul 2007 23:48:06 +0000 Subject: [PATCH] improved bandwidth limiter and added a unit test for it --- Jamfile | 1 - include/libtorrent/aux_/session_impl.hpp | 6 +- include/libtorrent/bandwidth_manager.hpp | 301 ++++++++++++++--- include/libtorrent/peer_connection.hpp | 7 +- include/libtorrent/stat.hpp | 2 +- include/libtorrent/torrent.hpp | 18 +- src/bandwidth_manager.cpp | 249 -------------- src/peer_connection.cpp | 5 +- src/session_impl.cpp | 33 +- src/torrent.cpp | 66 ++-- test/Jamfile | 1 + test/setup_transfer.cpp | 2 +- test/test_bandwidth_limiter.cpp | 402 +++++++++++++++++++++++ 13 files changed, 740 insertions(+), 353 deletions(-) delete mode 100644 src/bandwidth_manager.cpp create mode 100644 test/test_bandwidth_limiter.cpp diff --git a/Jamfile b/Jamfile index 632fca572..d96e9e190 100755 --- a/Jamfile +++ b/Jamfile @@ -150,7 +150,6 @@ lib ws2_32 : : ws2_32 ; SOURCES = allocate_resources alert - bandwidth_manager connection_queue entry escape_string diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c270d9ffc..207016898 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -350,8 +350,10 @@ namespace libtorrent // handing out bandwidth to connections that // asks for it, it can also throttle the // rate. - bandwidth_manager m_dl_bandwidth_manager; - bandwidth_manager m_ul_bandwidth_manager; + bandwidth_manager m_download_channel; + bandwidth_manager m_upload_channel; + + bandwidth_manager* m_bandwidth_manager[2]; tracker_manager m_tracker_manager; torrent_map m_torrents; diff --git a/include/libtorrent/bandwidth_manager.hpp b/include/libtorrent/bandwidth_manager.hpp index dbf93aa77..30f99d0cf 100644 --- a/include/libtorrent/bandwidth_manager.hpp +++ b/include/libtorrent/bandwidth_manager.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED #include "libtorrent/socket.hpp" +#include "libtorrent/invariant_check.hpp" #include #include @@ -48,39 +49,39 @@ using boost::shared_ptr; using boost::intrusive_ptr; using boost::bind; -namespace libtorrent +namespace libtorrent { + +// the maximum block of bandwidth quota to +// hand out is 33kB. The block size may +// be smaller on lower limits +enum { - - class peer_connection; - class torrent; - - // the maximum block of bandwidth quota to - // hand out is 33kB. The block size may - // be smaller on lower limits - const int max_bandwidth_block_size = 33000; - const int min_bandwidth_block_size = 4000; - -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING - namespace aux - { - struct session_impl; - } -#endif - -struct history_entry -{ - history_entry(intrusive_ptr p, weak_ptr t - , int a, ptime exp); - ptime expires_at; - int amount; - intrusive_ptr peer; - weak_ptr tor; + max_bandwidth_block_size = 33000, + min_bandwidth_block_size = 400 }; +const time_duration bw_window_size = seconds(1); + +template +struct history_entry +{ + history_entry(intrusive_ptr p, weak_ptr t + , int a, ptime exp) + : expires_at(exp), amount(a), peer(p), tor(t) {} + ptime expires_at; + int amount; + intrusive_ptr peer; + weak_ptr tor; +}; + +template struct bw_queue_entry { - bw_queue_entry(boost::intrusive_ptr const& pe, bool no_prio); - boost::intrusive_ptr peer; + bw_queue_entry(boost::intrusive_ptr const& pe + , int blk, bool no_prio) + : peer(pe), max_block_size(blk), non_prioritized(no_prio) {} + boost::intrusive_ptr peer; + int max_block_size; bool non_prioritized; }; @@ -158,9 +159,25 @@ private: int m_current_rate; }; +template +T clamp(T val, T ceiling, T floor) +{ + assert(ceiling >= floor); + if (val >= ceiling) return ceiling; + else if (val <= floor) return floor; + return val; +} + +template struct bandwidth_manager { - bandwidth_manager(io_service& ios, int channel); + bandwidth_manager(io_service& ios, int channel) + : m_ios(ios) + , m_history_timer(m_ios) + , m_limit(bandwidth_limit::inf) + , m_current_quota(0) + , m_channel(channel) + {} void throttle(int limit) { @@ -178,21 +195,222 @@ struct bandwidth_manager // non prioritized means that, if there's a line for bandwidth, // others will cut in front of the non-prioritized peers. // this is used by web seeds - void request_bandwidth(intrusive_ptr peer - , bool non_prioritized); + void request_bandwidth(intrusive_ptr peer + , int blk + , bool non_prioritized) + { + INVARIANT_CHECK; + assert(blk > 0); + + assert(!peer->ignore_bandwidth_limits()); + + // make sure this peer isn't already in line + // waiting for bandwidth +#ifndef NDEBUG + for (typename queue_t::iterator i = m_queue.begin() + , end(m_queue.end()); i != end; ++i) + { + assert(i->peer < peer || peer < i->peer); + } +#endif + + assert(peer->max_assignable_bandwidth(m_channel) > 0); + boost::shared_ptr t = peer->associated_torrent().lock(); + m_queue.push_back(bw_queue_entry(peer, blk, non_prioritized)); + if (!non_prioritized) + { + typename queue_t::reverse_iterator i = m_queue.rbegin(); + typename queue_t::reverse_iterator j(i); + for (++j; j != m_queue.rend(); ++j) + { + // if the peer's torrent is not the same one + // continue looking for a peer from the same torrent + if (j->peer->associated_torrent().lock() != t) + continue; + // if we found a peer from the same torrent that + // is prioritized, there is no point looking + // any further. + if (!j->non_prioritized) break; + + using std::swap; + swap(*i, *j); + i = j; + } + } + + if (m_queue.size() == 1) hand_out_bandwidth(); + } #ifndef NDEBUG - void check_invariant() const; -#endif -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING - aux::session_impl* m_ses; + void check_invariant() const + { + int current_quota = 0; + for (typename history_t::const_iterator i + = m_history.begin(), end(m_history.end()); i != end; ++i) + { + current_quota += i->amount; + } + + assert(current_quota == m_current_quota); + } #endif private: - void add_history_entry(history_entry const& e); - void on_history_expire(asio::error_code const& e); - void hand_out_bandwidth(); + void add_history_entry(history_entry const& e) try + { + INVARIANT_CHECK; + m_history.push_front(e); + m_current_quota += e.amount; + // in case the size > 1 there is already a timer + // active that will be invoked, no need to set one up + if (m_history.size() > 1) return; + + m_history_timer.expires_at(e.expires_at); + m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); + } + catch (std::exception&) { assert(false); } + + void on_history_expire(asio::error_code const& e) try + { + INVARIANT_CHECK; + + if (e) return; + + assert(!m_history.empty()); + + ptime now(time_now()); + while (!m_history.empty() && m_history.back().expires_at <= now) + { + history_entry e = m_history.back(); + m_history.pop_back(); + m_current_quota -= e.amount; + assert(m_current_quota >= 0); + intrusive_ptr c = e.peer; + shared_ptr t = e.tor.lock(); + if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount); + if (t) t->expire_bandwidth(m_channel, e.amount); + } + + // now, wait for the next chunk to expire + if (!m_history.empty()) + { + m_history_timer.expires_at(m_history.back().expires_at); + m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); + } + + // since some bandwidth just expired, it + // means we can hand out more (in case there + // are still consumers in line) + if (!m_queue.empty()) hand_out_bandwidth(); + } + catch (std::exception&) + { + assert(false); + }; + + void hand_out_bandwidth() try + { + INVARIANT_CHECK; + + ptime now(time_now()); + + mutex_t::scoped_lock l(m_mutex); + int limit = m_limit; + l.unlock(); + + // available bandwidth to hand out + int amount = limit - m_current_quota; + + while (!m_queue.empty() && amount > 0) + { + assert(amount == limit - m_current_quota); + bw_queue_entry qe = m_queue.front(); + m_queue.pop_front(); + + shared_ptr t = qe.peer->associated_torrent().lock(); + if (!t) continue; + if (qe.peer->is_disconnecting()) + { + t->expire_bandwidth(m_channel, qe.max_block_size); + continue; + } + + // at this point, max_assignable may actually be zero. Since + // the bandwidth quota is subtracted once the data has been + // sent. If the peer was added to the queue while the data was + // still being sent, max_assignable may have been > 0 at that time. + int max_assignable = (std::min)( + qe.peer->max_assignable_bandwidth(m_channel) + , t->max_assignable_bandwidth(m_channel)); + if (max_assignable == 0) + { + t->expire_bandwidth(m_channel, qe.max_block_size); + continue; + } + + // this is the limit of the block size. It depends on the throttle + // so that it can be closer to optimal. Larger block sizes will give lower + // granularity to the rate but will be more efficient. At high rates + // the block sizes are bigger and at low rates, the granularity + // is more important and block sizes are smaller + + // the minimum rate that can be given is the block size, so, the + // block size must be smaller for lower rates. This is because + // the history window is one second, and the block will be forgotten + // after one second. + int block_size = (std::min)(qe.max_block_size + , (std::min)(qe.peer->bandwidth_throttle(m_channel) + , m_limit / 10)); + + if (block_size < min_bandwidth_block_size) + { + block_size = min_bandwidth_block_size; + } + else if (block_size > max_bandwidth_block_size) + { + if (m_limit == bandwidth_limit::inf) + { + block_size = max_bandwidth_block_size; + } + else + { + // try to make the block_size a divisor of + // m_limit to make the distributions as fair + // as possible + // TODO: move this calculcation to where the limit + // is changed + block_size = m_limit + / (m_limit / max_bandwidth_block_size); + } + } + + if (amount < block_size / 2) + { + m_queue.push_front(qe); + break; + } + + // don't hand out chunks larger than the throttle + // per second on the torrent + assert(qe.max_block_size <= t->bandwidth_throttle(m_channel)); + + // so, hand out max_assignable, but no more than + // the available bandwidth (amount) and no more + // than the max_bandwidth_block_size + int hand_out_amount = (std::min)((std::min)(block_size, max_assignable) + , amount); + assert(hand_out_amount > 0); + amount -= hand_out_amount; + t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size); + qe.peer->assign_bandwidth(m_channel, hand_out_amount); + add_history_entry(history_entry( + qe.peer, t, hand_out_amount, now + bw_window_size)); + } + } + catch (std::exception& e) + { assert(false); }; + typedef boost::mutex mutex_t; mutable mutex_t m_mutex; @@ -212,11 +430,13 @@ private: int m_current_quota; // these are the consumers that want bandwidth - std::deque m_queue; + typedef std::deque > queue_t; + queue_t m_queue; // these are the consumers that have received bandwidth // that will expire - std::deque m_history; + typedef std::deque > history_t; + history_t m_history; // this is the channel within the consumers // that bandwidth is assigned to (upload or download) @@ -226,3 +446,4 @@ private: } #endif + diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index b25503a9a..b459c491e 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -317,10 +317,11 @@ namespace libtorrent void send_block_requests(); int max_assignable_bandwidth(int channel) const - { - return m_bandwidth_limit[channel].max_assignable(); - } + { return m_bandwidth_limit[channel].max_assignable(); } + int bandwidth_throttle(int channel) const + { return m_bandwidth_limit[channel].throttle(); } + void assign_bandwidth(int channel, int amount); void expire_bandwidth(int channel, int amount); diff --git a/include/libtorrent/stat.hpp b/include/libtorrent/stat.hpp index 8e92c12a1..8ef8da1b4 100755 --- a/include/libtorrent/stat.hpp +++ b/include/libtorrent/stat.hpp @@ -47,8 +47,8 @@ namespace libtorrent class TORRENT_EXPORT stat { friend class invariant_access; - enum { history = 10 }; public: + enum { history = 10 }; stat() : m_downloaded_payload(0) diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 9c9a5d034..1274c46fc 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -225,14 +225,22 @@ namespace libtorrent bandwidth_limit m_bandwidth_limit[2]; void request_bandwidth(int channel - , boost::intrusive_ptr p + , boost::intrusive_ptr const& p + , bool non_prioritized); + + void perform_bandwidth_request(int channel + , boost::intrusive_ptr const& p + , int block_size , bool non_prioritized); void expire_bandwidth(int channel, int amount); - void assign_bandwidth(int channel, int amount); + void assign_bandwidth(int channel, int amount, int blk); int bandwidth_throttle(int channel) const; + int max_assignable_bandwidth(int channel) const + { return m_bandwidth_limit[channel].max_assignable(); } + // -------------------------------------------- // PEER MANAGEMENT @@ -668,10 +676,12 @@ namespace libtorrent boost::scoped_ptr m_picker; // the queue of peer_connections that want more bandwidth - std::deque m_bandwidth_queue[2]; + typedef std::deque > queue_t; + queue_t m_bandwidth_queue[2]; std::vector m_trackers; - // this is an index into m_torrent_file.trackers() + // this is an index into m_trackers + int m_last_working_tracker; int m_currently_trying_tracker; // the number of connection attempts that has diff --git a/src/bandwidth_manager.cpp b/src/bandwidth_manager.cpp deleted file mode 100644 index 19dc6fcf7..000000000 --- a/src/bandwidth_manager.cpp +++ /dev/null @@ -1,249 +0,0 @@ -/* - -Copyright (c) 2007, Arvid Norberg -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - * Neither the name of the author nor the names of its - contributors may be used to endorse or promote products derived - from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -*/ - -#include "libtorrent/pch.hpp" - -#include "libtorrent/invariant_check.hpp" -#include "libtorrent/bandwidth_manager.hpp" -#include "libtorrent/peer_connection.hpp" -#include "libtorrent/time.hpp" - -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING -#include "libtorrent/aux_/session_impl.hpp" -#endif - -namespace libtorrent -{ - namespace - { - const time_duration window_size = seconds(1); - } - - history_entry::history_entry(intrusive_ptr p - , weak_ptr t, int a, ptime exp) - : expires_at(exp), amount(a), peer(p), tor(t) - {} - - bw_queue_entry::bw_queue_entry(intrusive_ptr const& pe - , bool no_prio) - : peer(pe), non_prioritized(no_prio) - {} - - bandwidth_manager::bandwidth_manager(io_service& ios, int channel) - : m_ios(ios) - , m_history_timer(m_ios) - , m_limit(bandwidth_limit::inf) - , m_current_quota(0) - , m_channel(channel) - {} - - void bandwidth_manager::request_bandwidth(intrusive_ptr peer - , bool non_prioritized) - { - INVARIANT_CHECK; - - assert(!peer->ignore_bandwidth_limits()); - - // make sure this peer isn't already in line - // waiting for bandwidth -#ifndef NDEBUG - for (std::deque::iterator i = m_queue.begin() - , end(m_queue.end()); i != end; ++i) - { - assert(i->peer < peer || peer < i->peer); - } -#endif - - assert(peer->max_assignable_bandwidth(m_channel) > 0); - - // if the queue is empty, we have to push the new - // peer at the back of it. If the peer is non-prioritized - // it is not supposed to cut in fron of anybody, so then - // we also just add it at the end - if (m_queue.empty() || non_prioritized) - { - m_queue.push_back(bw_queue_entry(peer, non_prioritized)); - } - else - { - // skip forward in the queue until we find a prioritized peer - // or hit the front of it. - std::deque::reverse_iterator i = m_queue.rbegin(); - while (i != m_queue.rend() && i->non_prioritized) ++i; - m_queue.insert(i.base(), bw_queue_entry(peer, non_prioritized)); - } - if (m_queue.size() == 1) hand_out_bandwidth(); - } - - -#ifndef NDEBUG - void bandwidth_manager::check_invariant() const - { - int current_quota = 0; - for (std::deque::const_iterator i - = m_history.begin(), end(m_history.end()); i != end; ++i) - { - current_quota += i->amount; - } - - assert(current_quota == m_current_quota); - } -#endif - - void bandwidth_manager::add_history_entry(history_entry const& e) try - { - INVARIANT_CHECK; -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING -// (*m_ses->m_logger) << "bw history [" << m_channel << "]\n"; -#endif - - m_history.push_front(e); - m_current_quota += e.amount; - // in case the size > 1 there is already a timer - // active that will be invoked, no need to set one up - if (m_history.size() > 1) return; - - m_history_timer.expires_at(e.expires_at); - m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); - } - catch (std::exception&) { assert(false); } - - void bandwidth_manager::on_history_expire(asio::error_code const& e) try - { - INVARIANT_CHECK; - - if (e) return; - -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING -// (*m_ses->m_logger) << "bw expire [" << m_channel << "]\n"; -#endif - - assert(!m_history.empty()); - - ptime now(time_now()); - while (!m_history.empty() && m_history.back().expires_at <= now) - { - history_entry e = m_history.back(); - m_history.pop_back(); - m_current_quota -= e.amount; - assert(m_current_quota >= 0); - intrusive_ptr c = e.peer; - shared_ptr t = e.tor.lock(); - if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount); - if (t) t->expire_bandwidth(m_channel, e.amount); - } - - // now, wait for the next chunk to expire - if (!m_history.empty()) - { - m_history_timer.expires_at(m_history.back().expires_at); - m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); - } - - // since some bandwidth just expired, it - // means we can hand out more (in case there - // are still consumers in line) - if (!m_queue.empty()) hand_out_bandwidth(); - } - catch (std::exception&) - { - assert(false); - }; - - void bandwidth_manager::hand_out_bandwidth() try - { - INVARIANT_CHECK; -#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING -// (*m_ses->m_logger) << "hand out bw [" << m_channel << "]\n"; -#endif - - ptime now(time_now()); - - mutex_t::scoped_lock l(m_mutex); - int limit = m_limit; - l.unlock(); - - // available bandwidth to hand out - int amount = limit - m_current_quota; - - int bandwidth_block_size_limit = max_bandwidth_block_size; - if (m_queue.size() > 3 && bandwidth_block_size_limit > limit / int(m_queue.size())) - bandwidth_block_size_limit = std::max(max_bandwidth_block_size / int(m_queue.size() - 3) - , min_bandwidth_block_size); - - while (!m_queue.empty() && amount > 0) - { - assert(amount == limit - m_current_quota); - bw_queue_entry qe = m_queue.front(); - m_queue.pop_front(); - - shared_ptr t = qe.peer->associated_torrent().lock(); - if (!t) continue; - if (qe.peer->is_disconnecting()) - { - t->expire_bandwidth(m_channel, -1); - continue; - } - - // at this point, max_assignable may actually be zero. Since - // the bandwidth quota is subtracted once the data has been - // send. If the peer was added to the queue while the data was - // still being sent, max_assignable may have been > 0 at that time. - int max_assignable = qe.peer->max_assignable_bandwidth(m_channel); - if (max_assignable == 0) - { - t->expire_bandwidth(m_channel, -1); - continue; - } - // don't hand out chunks larger than the throttle - // per second on the torrent - if (max_assignable > t->bandwidth_throttle(m_channel)) - max_assignable = t->bandwidth_throttle(m_channel); - - // so, hand out max_assignable, but no more than - // the available bandwidth (amount) and no more - // than the max_bandwidth_block_size - int single_amount = std::min(amount - , std::min(bandwidth_block_size_limit - , max_assignable)); - assert(single_amount > 0); - amount -= single_amount; - qe.peer->assign_bandwidth(m_channel, single_amount); - t->assign_bandwidth(m_channel, single_amount); - add_history_entry(history_entry(qe.peer, t, single_amount, now + window_size)); - } - } - catch (std::exception& e) - { assert(false); }; - -} - diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index b815420c1..8a2ac0ae3 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1907,9 +1907,8 @@ namespace libtorrent if (m_remote_dl_rate == 0) factor = 0.0f; - m_remote_dl_rate = - (m_remote_dl_rate * factor) + - ((m_remote_bytes_dled * (1.0f-factor)) / 60.f); + m_remote_dl_rate = int((m_remote_dl_rate * factor) + + ((m_remote_bytes_dled * (1.0f-factor)) / 60.f)); m_remote_bytes_dled = 0; m_remote_dl_update = now; diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 1a744ab44..f07f48d62 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -482,8 +482,8 @@ namespace detail : m_strand(m_io_service) , m_files(40) , m_half_open(m_io_service) - , m_dl_bandwidth_manager(m_io_service, peer_connection::download_channel) - , m_ul_bandwidth_manager(m_io_service, peer_connection::upload_channel) + , m_download_channel(m_io_service, peer_connection::download_channel) + , m_upload_channel(m_io_service, peer_connection::upload_channel) , m_tracker_manager(m_settings, m_tracker_proxy) , m_listen_port_range(listen_port_range) , m_listen_interface(address::from_string(listen_interface), listen_port_range.first) @@ -501,13 +501,12 @@ namespace detail , m_next_connect_torrent(0) , m_checker_impl(*this) { + m_bandwidth_manager[peer_connection::download_channel] = &m_download_channel; + m_bandwidth_manager[peer_connection::upload_channel] = &m_upload_channel; #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) m_logger = create_log("main_session", listen_port(), false); (*m_logger) << time_now_string() << "\n"; - - m_dl_bandwidth_manager.m_ses = this; - m_ul_bandwidth_manager.m_ses = this; #endif #ifdef TORRENT_STATS @@ -1771,14 +1770,6 @@ namespace detail } #endif - void session_impl::set_download_rate_limit(int bytes_per_second) - { - assert(bytes_per_second > 0 || bytes_per_second == -1); - mutex_t::scoped_lock l(m_mutex); - if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf; - m_dl_bandwidth_manager.throttle(bytes_per_second); - } - bool session_impl::is_listening() const { mutex_t::scoped_lock l(m_mutex); @@ -1861,12 +1852,20 @@ namespace detail m_half_open.limit(limit); } + void session_impl::set_download_rate_limit(int bytes_per_second) + { + assert(bytes_per_second > 0 || bytes_per_second == -1); + mutex_t::scoped_lock l(m_mutex); + if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf; + m_bandwidth_manager[peer_connection::download_channel]->throttle(bytes_per_second); + } + void session_impl::set_upload_rate_limit(int bytes_per_second) { assert(bytes_per_second > 0 || bytes_per_second == -1); mutex_t::scoped_lock l(m_mutex); if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf; - m_ul_bandwidth_manager.throttle(bytes_per_second); + m_bandwidth_manager[peer_connection::upload_channel]->throttle(bytes_per_second); } int session_impl::num_uploads() const @@ -1887,7 +1886,6 @@ namespace detail return m_connections.size(); } - std::auto_ptr session_impl::pop_alert() { mutex_t::scoped_lock l(m_mutex); @@ -1905,15 +1903,14 @@ namespace detail int session_impl::upload_rate_limit() const { mutex_t::scoped_lock l(m_mutex); - return m_ul_bandwidth_manager.throttle(); + return m_bandwidth_manager[peer_connection::upload_channel]->throttle(); } int session_impl::download_rate_limit() const { mutex_t::scoped_lock l(m_mutex); - return m_dl_bandwidth_manager.throttle(); + return m_bandwidth_manager[peer_connection::download_channel]->throttle(); } - void session_impl::start_lsd() { diff --git a/src/torrent.cpp b/src/torrent.cpp index fe1bf8091..d173f8943 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -1946,35 +1946,29 @@ namespace libtorrent } } - bool torrent::request_bandwidth_from_session(int channel) const - { - int max_assignable = m_bandwidth_limit[channel].max_assignable(); - return max_assignable > max_bandwidth_block_size - || (m_bandwidth_limit[channel].throttle() < max_bandwidth_block_size - && max_assignable == m_bandwidth_limit[channel].throttle()); - } - int torrent::bandwidth_throttle(int channel) const { return m_bandwidth_limit[channel].throttle(); } void torrent::request_bandwidth(int channel - , boost::intrusive_ptr p + , boost::intrusive_ptr const& p , bool non_prioritized) { - if (request_bandwidth_from_session(channel)) + int block_size = m_bandwidth_limit[channel].throttle() / 10; + + if (m_bandwidth_limit[channel].max_assignable() > 0) { - if (channel == peer_connection::upload_channel) - m_ses.m_ul_bandwidth_manager.request_bandwidth(p, non_prioritized); - else if (channel == peer_connection::download_channel) - m_ses.m_dl_bandwidth_manager.request_bandwidth(p, non_prioritized); - - m_bandwidth_limit[channel].assign(max_bandwidth_block_size); + perform_bandwidth_request(channel, p, block_size, non_prioritized); } else { - m_bandwidth_queue[channel].push_back(bw_queue_entry(p, non_prioritized)); + // skip forward in the queue until we find a prioritized peer + // or hit the front of it. + queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin(); + while (i != m_bandwidth_queue[channel].rend() && i->non_prioritized) ++i; + m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry( + p, block_size, non_prioritized)); } } @@ -1982,30 +1976,40 @@ namespace libtorrent { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - assert(amount >= -1); - if (amount == -1) amount = max_bandwidth_block_size; + assert(amount > 0); m_bandwidth_limit[channel].expire(amount); - while (!m_bandwidth_queue[channel].empty() - && request_bandwidth_from_session(channel)) + while (!m_bandwidth_queue[channel].empty()) { - bw_queue_entry qe = m_bandwidth_queue[channel].front(); + bw_queue_entry qe = m_bandwidth_queue[channel].front(); + if (m_bandwidth_limit[channel].max_assignable() == 0) + break; m_bandwidth_queue[channel].pop_front(); - if (channel == peer_connection::upload_channel) - m_ses.m_ul_bandwidth_manager.request_bandwidth(qe.peer, qe.non_prioritized); - else if (channel == peer_connection::download_channel) - m_ses.m_dl_bandwidth_manager.request_bandwidth(qe.peer, qe.non_prioritized); - m_bandwidth_limit[channel].assign(max_bandwidth_block_size); + perform_bandwidth_request(channel, qe.peer + , qe.max_block_size, qe.non_prioritized); } } - void torrent::assign_bandwidth(int channel, int amount) + void torrent::perform_bandwidth_request(int channel + , boost::intrusive_ptr const& p + , int block_size + , bool non_prioritized) + { + assert(m_bandwidth_limit[channel].max_assignable() >= block_size); + + m_ses.m_bandwidth_manager[channel]->request_bandwidth(p + , block_size, non_prioritized); + m_bandwidth_limit[channel].assign(block_size); + } + + void torrent::assign_bandwidth(int channel, int amount, int blk) { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - assert(amount >= 0); - if (amount < max_bandwidth_block_size) - expire_bandwidth(channel, max_bandwidth_block_size - amount); + assert(amount > 0); + assert(amount <= blk); + if (amount < blk) + expire_bandwidth(channel, blk - amount); } // called when torrent is finished (all interested pieces downloaded) diff --git a/test/Jamfile b/test/Jamfile index 05b220738..a7cfabf9d 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -26,5 +26,6 @@ test-suite libtorrent : [ run test_swarm.cpp ] [ run test_allocate_resources.cpp ] [ run test_web_seed.cpp ] + [ run test_bandwidth_limiter.cpp ] ; diff --git a/test/setup_transfer.cpp b/test/setup_transfer.cpp index 412ebddc4..8a6a71c63 100644 --- a/test/setup_transfer.cpp +++ b/test/setup_transfer.cpp @@ -52,7 +52,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 t.add_tracker(tracker_url); std::vector piece(16 * 1024); - for (int i = 0; i < piece.size(); ++i) + for (int i = 0; i < int(piece.size()); ++i) piece[i] = (i % 26) + 'A'; // calculate the hash for all pieces diff --git a/test/test_bandwidth_limiter.cpp b/test/test_bandwidth_limiter.cpp new file mode 100644 index 000000000..370a5bf91 --- /dev/null +++ b/test/test_bandwidth_limiter.cpp @@ -0,0 +1,402 @@ +#include "test.hpp" + +#include "libtorrent/bandwidth_manager.hpp" +#include "libtorrent/socket.hpp" +#include "libtorrent/stat.hpp" +#include "libtorrent/time.hpp" + +#include + +struct torrent; +struct peer_connection; + +using namespace libtorrent; + +struct torrent +{ + torrent(bandwidth_manager& m) + : m_bandwidth_manager(m) + {} + + void assign_bandwidth(int channel, int amount, int max_block_size) + { +// std::cerr << time_now_string() +// << ": assign bandwidth, " << amount << " blk: " << max_block_size << std::endl; + assert(amount > 0); + if (amount < max_block_size) + expire_bandwidth(channel, max_block_size - amount); + } + + int bandwidth_throttle(int channel) const + { return m_bandwidth_limit[channel].throttle(); } + + int max_assignable_bandwidth(int channel) const + { return m_bandwidth_limit[channel].max_assignable(); } + + void request_bandwidth(int channel + , boost::intrusive_ptr const& p + , bool non_prioritized) + { +// std::cerr << time_now_string() +// << ": request bandwidth" << std::endl; + + int block_size = m_bandwidth_limit[channel].throttle() / 10; + + if (m_bandwidth_limit[channel].max_assignable() > 0) + { + perform_bandwidth_request(channel, p, block_size, non_prioritized); + } + else + { + // skip forward in the queue until we find a prioritized peer + // or hit the front of it. + queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin(); + while (i != m_bandwidth_queue[channel].rend() && i->non_prioritized) ++i; + m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry( + p, block_size, non_prioritized)); + } + } + + void expire_bandwidth(int channel, int amount) + { +// std::cerr << time_now_string() +// << ": expire bandwidth, " << amount << std::endl; + assert(amount > 0); + m_bandwidth_limit[channel].expire(amount); + while (!m_bandwidth_queue[channel].empty()) + { + bw_queue_entry qe = m_bandwidth_queue[channel].front(); + if (m_bandwidth_limit[channel].max_assignable() == 0) + break; + m_bandwidth_queue[channel].pop_front(); + perform_bandwidth_request(channel, qe.peer + , qe.max_block_size, qe.non_prioritized); + } + } + + void perform_bandwidth_request(int channel + , boost::intrusive_ptr const& p + , int block_size + , bool non_prioritized) + { + m_bandwidth_manager.request_bandwidth(p + , block_size, non_prioritized); + m_bandwidth_limit[channel].assign(block_size); + } + bandwidth_limit m_bandwidth_limit[1]; + typedef std::deque > queue_t; + queue_t m_bandwidth_queue[1]; + bandwidth_manager& m_bandwidth_manager; +}; + +struct peer_connection +{ + peer_connection(io_service& ios, boost::shared_ptr const& t + , bool prio, bool ignore_limits, std::string name) + : m_torrent(t) + , m_prioritized(prio) + , m_ignore_limits(ignore_limits) + , m_abort(false) + , m_ios(ios) + , m_name(name) + , m_refs(0) + {} + + bool ignore_bandwidth_limits() { return m_ignore_limits; } + int max_assignable_bandwidth(int channel) const + { return m_bandwidth_limit[channel].max_assignable(); } + boost::weak_ptr associated_torrent() const + { return m_torrent; } + bool is_disconnecting() const { return m_abort; } + void assign_bandwidth(int channel, int amount) + { +// std::cerr << time_now_string() << ": [" << m_name +// << "] assign bandwidth, " << amount << std::endl; + assert(amount > 0); + m_bandwidth_limit[channel].assign(amount); + boost::shared_ptr t = m_torrent.lock(); + if (!t) return; + m_stats.sent_bytes(amount, 0); + m_ios.post(boost::bind(&torrent::request_bandwidth, t, int(0) + , intrusive_ptr(this), !m_prioritized)); + } + void start() + { + boost::shared_ptr t = m_torrent.lock(); + if (!t) return; + t->request_bandwidth(0, this, !m_prioritized); + } + void stop() { m_abort = true; } + void expire_bandwidth(int channel, int amount) + { + assert(amount > 0); +// std::cerr << time_now_string() << ": [" << m_name +// << "] expire bandwidth, " << amount << std::endl; + m_bandwidth_limit[channel].expire(amount); + } + void tick() + { +// std::cerr << time_now_string() << ": [" << m_name +// << "] tick, rate: " << m_stats.upload_rate() << std::endl; + m_stats.second_tick(1.f); + } + + int bandwidth_throttle(int channel) const + { return m_bandwidth_limit[channel].throttle(); } + + bandwidth_limit m_bandwidth_limit[1]; + boost::weak_ptr m_torrent; + bool m_prioritized; + bool m_ignore_limits; + bool m_abort; + stat m_stats; + io_service& m_ios; + std::string m_name; + int m_refs; +}; + +void intrusive_ptr_add_ref(peer_connection* p) +{ + ++p->m_refs; +} + +void intrusive_ptr_release(peer_connection* p) +{ + if (--p->m_refs == 0) + delete p; +} + +typedef std::vector > connections_t; + +void do_tick(asio::error_code const&e, deadline_timer& tick, connections_t& v) +{ + if (e) return; + std::for_each(v.begin(), v.end() + , boost::bind(&peer_connection::tick, _1)); + tick.expires_from_now(seconds(1)); + tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v))); +} + +void do_stop(deadline_timer& tick, connections_t& v) +{ + tick.cancel(); + std::for_each(v.begin(), v.end() + , boost::bind(&peer_connection::stop, _1)); +} + +void run_test(io_service& ios, connections_t& v) +{ + std::cerr << "-------------\n" << std::endl; + deadline_timer tick(ios); + tick.expires_from_now(seconds(1)); + tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v))); + + deadline_timer complete(ios); + complete.expires_from_now(seconds(stat::history * 2)); + complete.async_wait(boost::bind(&do_stop, boost::ref(tick), boost::ref(v))); + + std::for_each(v.begin(), v.end() + , boost::bind(&peer_connection::start, _1)); + + ios.run(); +} + +bool close_to(float val, float comp, float err) +{ + return fabs(val - comp) <= err; +} + +void spawn_connections(connections_t& v, io_service& ios + , boost::shared_ptr t, int num, char const* prefix) +{ + for (int i = 0; i < num; ++i) + { + v.push_back(new peer_connection(ios, t, true, false + , prefix + boost::lexical_cast(i))); + } +} + +void test_equal_connections(int num, int limit) +{ + std::cerr << "test equal connections " << num << " " << limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + manager.throttle(limit); + + boost::shared_ptr t1(new torrent(manager)); + + connections_t v; + spawn_connections(v, ios, t1, num, "p"); + run_test(ios, v); + + float sum = 0.f; + float err = std::max(limit / num * 0.3f, 1000.f); + for (connections_t::iterator i = v.begin() + , end(v.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + + std::cerr << (*i)->m_stats.upload_rate() + << " target: " << (limit / num) << " eps: " << err << std::endl; + TEST_CHECK(close_to((*i)->m_stats.upload_rate(), limit / num, err)); + } + std::cerr << "sum: " << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 50)); +} + +void test_single_peer(int limit, bool torrent_limit) +{ + std::cerr << "test single peer " << limit << " " << torrent_limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + boost::shared_ptr t1(new torrent(manager)); + + if (torrent_limit) + t1->m_bandwidth_limit[0].throttle(limit); + else + manager.throttle(limit); + + connections_t v; + spawn_connections(v, ios, t1, 1, "p"); + run_test(ios, v); + + float sum = 0.f; + for (connections_t::iterator i = v.begin() + , end(v.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + } + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 1000)); +} + +void test_equal_torrents(int num, int limit) +{ + std::cerr << "test equal torrents " << num << " " << limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + + boost::shared_ptr t1(new torrent(manager)); + boost::shared_ptr t2(new torrent(manager)); + + t1->m_bandwidth_limit[0].throttle(limit); + t2->m_bandwidth_limit[0].throttle(limit); + + connections_t v1; + spawn_connections(v1, ios, t1, num, "t1p"); + connections_t v2; + spawn_connections(v2, ios, t2, num, "t2p"); + connections_t v; + std::copy(v1.begin(), v1.end(), std::back_inserter(v)); + std::copy(v2.begin(), v2.end(), std::back_inserter(v)); + run_test(ios, v); + + float sum = 0.f; + for (connections_t::iterator i = v1.begin() + , end(v1.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + } + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 1000)); + + sum = 0.f; + for (connections_t::iterator i = v2.begin() + , end(v2.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + } + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 1000)); +} + +void test_peer_priority(int limit, bool torrent_limit) +{ + std::cerr << "test peer priority " << limit << " " << torrent_limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + boost::shared_ptr t1(new torrent(manager)); + + if (torrent_limit) + t1->m_bandwidth_limit[0].throttle(limit); + else + manager.throttle(limit); + + connections_t v1; + spawn_connections(v1, ios, t1, 10, "p"); + connections_t v; + std::copy(v1.begin(), v1.end(), std::back_inserter(v)); + boost::intrusive_ptr p( + new peer_connection(ios, t1, false, false, "no-priority")); + v.push_back(p); + run_test(ios, v); + + float sum = 0.f; + for (connections_t::iterator i = v1.begin() + , end(v1.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + } + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 50)); + + std::cerr << "non-prioritized rate: " << p->m_stats.upload_rate() << std::endl; + TEST_CHECK(p->m_stats.upload_rate() < 10); +} + +void test_no_starvation(int limit) +{ + std::cerr << "test no starvation " << limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + boost::shared_ptr t1(new torrent(manager)); + boost::shared_ptr t2(new torrent(manager)); + + manager.throttle(limit); + + const int num_peers = 20; + + connections_t v1; + spawn_connections(v1, ios, t1, num_peers, "p"); + connections_t v; + std::copy(v1.begin(), v1.end(), std::back_inserter(v)); + boost::intrusive_ptr p( + new peer_connection(ios, t2, false, false, "no-priority")); + v.push_back(p); + run_test(ios, v); + + float sum = 0.f; + for (connections_t::iterator i = v.begin() + , end(v.end()); i != end; ++i) + { + sum += (*i)->m_stats.upload_rate(); + } + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(close_to(sum, limit, 50)); + + std::cerr << "non-prioritized rate: " << p->m_stats.upload_rate() << std::endl; + TEST_CHECK(close_to(p->m_stats.upload_rate(), limit / (num_peers + 1), 1000)); +} + +int test_main() +{ + using namespace libtorrent; + + test_equal_connections(2, 20000); + test_equal_connections(3, 20000); + test_equal_connections(5, 20000); + test_equal_connections(7, 20000); + test_equal_connections(33, 60000); + test_equal_connections(33, 500000); + test_equal_torrents(1, 40000); + test_equal_torrents(24, 50000); + test_single_peer(40000, true); + test_single_peer(40000, false); + test_peer_priority(40000, false); + test_peer_priority(40000, true); + test_no_starvation(40000); + + return 0; +} + +