improved bandwidth limiter and added a unit test for it

This commit is contained in:
Arvid Norberg 2007-07-02 23:48:06 +00:00
parent 8aa37dfa39
commit e714e1aeba
13 changed files with 740 additions and 353 deletions

View File

@ -150,7 +150,6 @@ lib ws2_32 : : <name>ws2_32 ;
SOURCES =
allocate_resources
alert
bandwidth_manager
connection_queue
entry
escape_string

View File

@ -350,8 +350,10 @@ namespace libtorrent
// handing out bandwidth to connections that
// asks for it, it can also throttle the
// rate.
bandwidth_manager m_dl_bandwidth_manager;
bandwidth_manager m_ul_bandwidth_manager;
bandwidth_manager<peer_connection, torrent> m_download_channel;
bandwidth_manager<peer_connection, torrent> m_upload_channel;
bandwidth_manager<peer_connection, torrent>* m_bandwidth_manager[2];
tracker_manager m_tracker_manager;
torrent_map m_torrents;

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
#include "libtorrent/socket.hpp"
#include "libtorrent/invariant_check.hpp"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@ -48,39 +49,39 @@ using boost::shared_ptr;
using boost::intrusive_ptr;
using boost::bind;
namespace libtorrent
namespace libtorrent {
// the maximum block of bandwidth quota to
// hand out is 33kB. The block size may
// be smaller on lower limits
enum
{
class peer_connection;
class torrent;
// the maximum block of bandwidth quota to
// hand out is 33kB. The block size may
// be smaller on lower limits
const int max_bandwidth_block_size = 33000;
const int min_bandwidth_block_size = 4000;
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
namespace aux
{
struct session_impl;
}
#endif
struct history_entry
{
history_entry(intrusive_ptr<peer_connection> p, weak_ptr<torrent> t
, int a, ptime exp);
ptime expires_at;
int amount;
intrusive_ptr<peer_connection> peer;
weak_ptr<torrent> tor;
max_bandwidth_block_size = 33000,
min_bandwidth_block_size = 400
};
const time_duration bw_window_size = seconds(1);
template<class PeerConnection, class Torrent>
struct history_entry
{
history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
, int a, ptime exp)
: expires_at(exp), amount(a), peer(p), tor(t) {}
ptime expires_at;
int amount;
intrusive_ptr<PeerConnection> peer;
weak_ptr<Torrent> tor;
};
template<class PeerConnection>
struct bw_queue_entry
{
bw_queue_entry(boost::intrusive_ptr<peer_connection> const& pe, bool no_prio);
boost::intrusive_ptr<peer_connection> peer;
bw_queue_entry(boost::intrusive_ptr<PeerConnection> const& pe
, int blk, bool no_prio)
: peer(pe), max_block_size(blk), non_prioritized(no_prio) {}
boost::intrusive_ptr<PeerConnection> peer;
int max_block_size;
bool non_prioritized;
};
@ -158,9 +159,25 @@ private:
int m_current_rate;
};
template<class T>
T clamp(T val, T ceiling, T floor)
{
assert(ceiling >= floor);
if (val >= ceiling) return ceiling;
else if (val <= floor) return floor;
return val;
}
template<class PeerConnection, class Torrent>
struct bandwidth_manager
{
bandwidth_manager(io_service& ios, int channel);
bandwidth_manager(io_service& ios, int channel)
: m_ios(ios)
, m_history_timer(m_ios)
, m_limit(bandwidth_limit::inf)
, m_current_quota(0)
, m_channel(channel)
{}
void throttle(int limit)
{
@ -178,21 +195,222 @@ struct bandwidth_manager
// non prioritized means that, if there's a line for bandwidth,
// others will cut in front of the non-prioritized peers.
// this is used by web seeds
void request_bandwidth(intrusive_ptr<peer_connection> peer
, bool non_prioritized);
void request_bandwidth(intrusive_ptr<PeerConnection> peer
, int blk
, bool non_prioritized)
{
INVARIANT_CHECK;
assert(blk > 0);
assert(!peer->ignore_bandwidth_limits());
// make sure this peer isn't already in line
// waiting for bandwidth
#ifndef NDEBUG
for (typename queue_t::iterator i = m_queue.begin()
, end(m_queue.end()); i != end; ++i)
{
assert(i->peer < peer || peer < i->peer);
}
#endif
assert(peer->max_assignable_bandwidth(m_channel) > 0);
boost::shared_ptr<Torrent> t = peer->associated_torrent().lock();
m_queue.push_back(bw_queue_entry<PeerConnection>(peer, blk, non_prioritized));
if (!non_prioritized)
{
typename queue_t::reverse_iterator i = m_queue.rbegin();
typename queue_t::reverse_iterator j(i);
for (++j; j != m_queue.rend(); ++j)
{
// if the peer's torrent is not the same one
// continue looking for a peer from the same torrent
if (j->peer->associated_torrent().lock() != t)
continue;
// if we found a peer from the same torrent that
// is prioritized, there is no point looking
// any further.
if (!j->non_prioritized) break;
using std::swap;
swap(*i, *j);
i = j;
}
}
if (m_queue.size() == 1) hand_out_bandwidth();
}
#ifndef NDEBUG
void check_invariant() const;
#endif
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
aux::session_impl* m_ses;
void check_invariant() const
{
int current_quota = 0;
for (typename history_t::const_iterator i
= m_history.begin(), end(m_history.end()); i != end; ++i)
{
current_quota += i->amount;
}
assert(current_quota == m_current_quota);
}
#endif
private:
void add_history_entry(history_entry const& e);
void on_history_expire(asio::error_code const& e);
void hand_out_bandwidth();
void add_history_entry(history_entry<PeerConnection, Torrent> const& e) try
{
INVARIANT_CHECK;
m_history.push_front(e);
m_current_quota += e.amount;
// in case the size > 1 there is already a timer
// active that will be invoked, no need to set one up
if (m_history.size() > 1) return;
m_history_timer.expires_at(e.expires_at);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
catch (std::exception&) { assert(false); }
void on_history_expire(asio::error_code const& e) try
{
INVARIANT_CHECK;
if (e) return;
assert(!m_history.empty());
ptime now(time_now());
while (!m_history.empty() && m_history.back().expires_at <= now)
{
history_entry<PeerConnection, Torrent> e = m_history.back();
m_history.pop_back();
m_current_quota -= e.amount;
assert(m_current_quota >= 0);
intrusive_ptr<PeerConnection> c = e.peer;
shared_ptr<Torrent> t = e.tor.lock();
if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
if (t) t->expire_bandwidth(m_channel, e.amount);
}
// now, wait for the next chunk to expire
if (!m_history.empty())
{
m_history_timer.expires_at(m_history.back().expires_at);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
// since some bandwidth just expired, it
// means we can hand out more (in case there
// are still consumers in line)
if (!m_queue.empty()) hand_out_bandwidth();
}
catch (std::exception&)
{
assert(false);
};
void hand_out_bandwidth() try
{
INVARIANT_CHECK;
ptime now(time_now());
mutex_t::scoped_lock l(m_mutex);
int limit = m_limit;
l.unlock();
// available bandwidth to hand out
int amount = limit - m_current_quota;
while (!m_queue.empty() && amount > 0)
{
assert(amount == limit - m_current_quota);
bw_queue_entry<PeerConnection> qe = m_queue.front();
m_queue.pop_front();
shared_ptr<Torrent> t = qe.peer->associated_torrent().lock();
if (!t) continue;
if (qe.peer->is_disconnecting())
{
t->expire_bandwidth(m_channel, qe.max_block_size);
continue;
}
// at this point, max_assignable may actually be zero. Since
// the bandwidth quota is subtracted once the data has been
// sent. If the peer was added to the queue while the data was
// still being sent, max_assignable may have been > 0 at that time.
int max_assignable = (std::min)(
qe.peer->max_assignable_bandwidth(m_channel)
, t->max_assignable_bandwidth(m_channel));
if (max_assignable == 0)
{
t->expire_bandwidth(m_channel, qe.max_block_size);
continue;
}
// this is the limit of the block size. It depends on the throttle
// so that it can be closer to optimal. Larger block sizes will give lower
// granularity to the rate but will be more efficient. At high rates
// the block sizes are bigger and at low rates, the granularity
// is more important and block sizes are smaller
// the minimum rate that can be given is the block size, so, the
// block size must be smaller for lower rates. This is because
// the history window is one second, and the block will be forgotten
// after one second.
int block_size = (std::min)(qe.max_block_size
, (std::min)(qe.peer->bandwidth_throttle(m_channel)
, m_limit / 10));
if (block_size < min_bandwidth_block_size)
{
block_size = min_bandwidth_block_size;
}
else if (block_size > max_bandwidth_block_size)
{
if (m_limit == bandwidth_limit::inf)
{
block_size = max_bandwidth_block_size;
}
else
{
// try to make the block_size a divisor of
// m_limit to make the distributions as fair
// as possible
// TODO: move this calculcation to where the limit
// is changed
block_size = m_limit
/ (m_limit / max_bandwidth_block_size);
}
}
if (amount < block_size / 2)
{
m_queue.push_front(qe);
break;
}
// don't hand out chunks larger than the throttle
// per second on the torrent
assert(qe.max_block_size <= t->bandwidth_throttle(m_channel));
// so, hand out max_assignable, but no more than
// the available bandwidth (amount) and no more
// than the max_bandwidth_block_size
int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
, amount);
assert(hand_out_amount > 0);
amount -= hand_out_amount;
t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
qe.peer->assign_bandwidth(m_channel, hand_out_amount);
add_history_entry(history_entry<PeerConnection, Torrent>(
qe.peer, t, hand_out_amount, now + bw_window_size));
}
}
catch (std::exception& e)
{ assert(false); };
typedef boost::mutex mutex_t;
mutable mutex_t m_mutex;
@ -212,11 +430,13 @@ private:
int m_current_quota;
// these are the consumers that want bandwidth
std::deque<bw_queue_entry> m_queue;
typedef std::deque<bw_queue_entry<PeerConnection> > queue_t;
queue_t m_queue;
// these are the consumers that have received bandwidth
// that will expire
std::deque<history_entry> m_history;
typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
history_t m_history;
// this is the channel within the consumers
// that bandwidth is assigned to (upload or download)
@ -226,3 +446,4 @@ private:
}
#endif

View File

@ -317,10 +317,11 @@ namespace libtorrent
void send_block_requests();
int max_assignable_bandwidth(int channel) const
{
return m_bandwidth_limit[channel].max_assignable();
}
{ return m_bandwidth_limit[channel].max_assignable(); }
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
void assign_bandwidth(int channel, int amount);
void expire_bandwidth(int channel, int amount);

View File

@ -47,8 +47,8 @@ namespace libtorrent
class TORRENT_EXPORT stat
{
friend class invariant_access;
enum { history = 10 };
public:
enum { history = 10 };
stat()
: m_downloaded_payload(0)

View File

@ -225,14 +225,22 @@ namespace libtorrent
bandwidth_limit m_bandwidth_limit[2];
void request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> p
, boost::intrusive_ptr<peer_connection> const& p
, bool non_prioritized);
void perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size
, bool non_prioritized);
void expire_bandwidth(int channel, int amount);
void assign_bandwidth(int channel, int amount);
void assign_bandwidth(int channel, int amount, int blk);
int bandwidth_throttle(int channel) const;
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
// --------------------------------------------
// PEER MANAGEMENT
@ -668,10 +676,12 @@ namespace libtorrent
boost::scoped_ptr<piece_picker> m_picker;
// the queue of peer_connections that want more bandwidth
std::deque<bw_queue_entry> m_bandwidth_queue[2];
typedef std::deque<bw_queue_entry<peer_connection> > queue_t;
queue_t m_bandwidth_queue[2];
std::vector<announce_entry> m_trackers;
// this is an index into m_torrent_file.trackers()
// this is an index into m_trackers
int m_last_working_tracker;
int m_currently_trying_tracker;
// the number of connection attempts that has

View File

@ -1,249 +0,0 @@
/*
Copyright (c) 2007, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/pch.hpp"
#include "libtorrent/invariant_check.hpp"
#include "libtorrent/bandwidth_manager.hpp"
#include "libtorrent/peer_connection.hpp"
#include "libtorrent/time.hpp"
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
#include "libtorrent/aux_/session_impl.hpp"
#endif
namespace libtorrent
{
namespace
{
const time_duration window_size = seconds(1);
}
history_entry::history_entry(intrusive_ptr<peer_connection> p
, weak_ptr<torrent> t, int a, ptime exp)
: expires_at(exp), amount(a), peer(p), tor(t)
{}
bw_queue_entry::bw_queue_entry(intrusive_ptr<peer_connection> const& pe
, bool no_prio)
: peer(pe), non_prioritized(no_prio)
{}
bandwidth_manager::bandwidth_manager(io_service& ios, int channel)
: m_ios(ios)
, m_history_timer(m_ios)
, m_limit(bandwidth_limit::inf)
, m_current_quota(0)
, m_channel(channel)
{}
void bandwidth_manager::request_bandwidth(intrusive_ptr<peer_connection> peer
, bool non_prioritized)
{
INVARIANT_CHECK;
assert(!peer->ignore_bandwidth_limits());
// make sure this peer isn't already in line
// waiting for bandwidth
#ifndef NDEBUG
for (std::deque<bw_queue_entry>::iterator i = m_queue.begin()
, end(m_queue.end()); i != end; ++i)
{
assert(i->peer < peer || peer < i->peer);
}
#endif
assert(peer->max_assignable_bandwidth(m_channel) > 0);
// if the queue is empty, we have to push the new
// peer at the back of it. If the peer is non-prioritized
// it is not supposed to cut in fron of anybody, so then
// we also just add it at the end
if (m_queue.empty() || non_prioritized)
{
m_queue.push_back(bw_queue_entry(peer, non_prioritized));
}
else
{
// skip forward in the queue until we find a prioritized peer
// or hit the front of it.
std::deque<bw_queue_entry>::reverse_iterator i = m_queue.rbegin();
while (i != m_queue.rend() && i->non_prioritized) ++i;
m_queue.insert(i.base(), bw_queue_entry(peer, non_prioritized));
}
if (m_queue.size() == 1) hand_out_bandwidth();
}
#ifndef NDEBUG
void bandwidth_manager::check_invariant() const
{
int current_quota = 0;
for (std::deque<history_entry>::const_iterator i
= m_history.begin(), end(m_history.end()); i != end; ++i)
{
current_quota += i->amount;
}
assert(current_quota == m_current_quota);
}
#endif
void bandwidth_manager::add_history_entry(history_entry const& e) try
{
INVARIANT_CHECK;
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
// (*m_ses->m_logger) << "bw history [" << m_channel << "]\n";
#endif
m_history.push_front(e);
m_current_quota += e.amount;
// in case the size > 1 there is already a timer
// active that will be invoked, no need to set one up
if (m_history.size() > 1) return;
m_history_timer.expires_at(e.expires_at);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
catch (std::exception&) { assert(false); }
void bandwidth_manager::on_history_expire(asio::error_code const& e) try
{
INVARIANT_CHECK;
if (e) return;
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
// (*m_ses->m_logger) << "bw expire [" << m_channel << "]\n";
#endif
assert(!m_history.empty());
ptime now(time_now());
while (!m_history.empty() && m_history.back().expires_at <= now)
{
history_entry e = m_history.back();
m_history.pop_back();
m_current_quota -= e.amount;
assert(m_current_quota >= 0);
intrusive_ptr<peer_connection> c = e.peer;
shared_ptr<torrent> t = e.tor.lock();
if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
if (t) t->expire_bandwidth(m_channel, e.amount);
}
// now, wait for the next chunk to expire
if (!m_history.empty())
{
m_history_timer.expires_at(m_history.back().expires_at);
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
}
// since some bandwidth just expired, it
// means we can hand out more (in case there
// are still consumers in line)
if (!m_queue.empty()) hand_out_bandwidth();
}
catch (std::exception&)
{
assert(false);
};
void bandwidth_manager::hand_out_bandwidth() try
{
INVARIANT_CHECK;
#if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
// (*m_ses->m_logger) << "hand out bw [" << m_channel << "]\n";
#endif
ptime now(time_now());
mutex_t::scoped_lock l(m_mutex);
int limit = m_limit;
l.unlock();
// available bandwidth to hand out
int amount = limit - m_current_quota;
int bandwidth_block_size_limit = max_bandwidth_block_size;
if (m_queue.size() > 3 && bandwidth_block_size_limit > limit / int(m_queue.size()))
bandwidth_block_size_limit = std::max(max_bandwidth_block_size / int(m_queue.size() - 3)
, min_bandwidth_block_size);
while (!m_queue.empty() && amount > 0)
{
assert(amount == limit - m_current_quota);
bw_queue_entry qe = m_queue.front();
m_queue.pop_front();
shared_ptr<torrent> t = qe.peer->associated_torrent().lock();
if (!t) continue;
if (qe.peer->is_disconnecting())
{
t->expire_bandwidth(m_channel, -1);
continue;
}
// at this point, max_assignable may actually be zero. Since
// the bandwidth quota is subtracted once the data has been
// send. If the peer was added to the queue while the data was
// still being sent, max_assignable may have been > 0 at that time.
int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
if (max_assignable == 0)
{
t->expire_bandwidth(m_channel, -1);
continue;
}
// don't hand out chunks larger than the throttle
// per second on the torrent
if (max_assignable > t->bandwidth_throttle(m_channel))
max_assignable = t->bandwidth_throttle(m_channel);
// so, hand out max_assignable, but no more than
// the available bandwidth (amount) and no more
// than the max_bandwidth_block_size
int single_amount = std::min(amount
, std::min(bandwidth_block_size_limit
, max_assignable));
assert(single_amount > 0);
amount -= single_amount;
qe.peer->assign_bandwidth(m_channel, single_amount);
t->assign_bandwidth(m_channel, single_amount);
add_history_entry(history_entry(qe.peer, t, single_amount, now + window_size));
}
}
catch (std::exception& e)
{ assert(false); };
}

View File

@ -1907,9 +1907,8 @@ namespace libtorrent
if (m_remote_dl_rate == 0) factor = 0.0f;
m_remote_dl_rate =
(m_remote_dl_rate * factor) +
((m_remote_bytes_dled * (1.0f-factor)) / 60.f);
m_remote_dl_rate = int((m_remote_dl_rate * factor) +
((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
m_remote_bytes_dled = 0;
m_remote_dl_update = now;

View File

@ -482,8 +482,8 @@ namespace detail
: m_strand(m_io_service)
, m_files(40)
, m_half_open(m_io_service)
, m_dl_bandwidth_manager(m_io_service, peer_connection::download_channel)
, m_ul_bandwidth_manager(m_io_service, peer_connection::upload_channel)
, m_download_channel(m_io_service, peer_connection::download_channel)
, m_upload_channel(m_io_service, peer_connection::upload_channel)
, m_tracker_manager(m_settings, m_tracker_proxy)
, m_listen_port_range(listen_port_range)
, m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
@ -501,13 +501,12 @@ namespace detail
, m_next_connect_torrent(0)
, m_checker_impl(*this)
{
m_bandwidth_manager[peer_connection::download_channel] = &m_download_channel;
m_bandwidth_manager[peer_connection::upload_channel] = &m_upload_channel;
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
m_logger = create_log("main_session", listen_port(), false);
(*m_logger) << time_now_string() << "\n";
m_dl_bandwidth_manager.m_ses = this;
m_ul_bandwidth_manager.m_ses = this;
#endif
#ifdef TORRENT_STATS
@ -1771,14 +1770,6 @@ namespace detail
}
#endif
void session_impl::set_download_rate_limit(int bytes_per_second)
{
assert(bytes_per_second > 0 || bytes_per_second == -1);
mutex_t::scoped_lock l(m_mutex);
if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf;
m_dl_bandwidth_manager.throttle(bytes_per_second);
}
bool session_impl::is_listening() const
{
mutex_t::scoped_lock l(m_mutex);
@ -1861,12 +1852,20 @@ namespace detail
m_half_open.limit(limit);
}
void session_impl::set_download_rate_limit(int bytes_per_second)
{
assert(bytes_per_second > 0 || bytes_per_second == -1);
mutex_t::scoped_lock l(m_mutex);
if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf;
m_bandwidth_manager[peer_connection::download_channel]->throttle(bytes_per_second);
}
void session_impl::set_upload_rate_limit(int bytes_per_second)
{
assert(bytes_per_second > 0 || bytes_per_second == -1);
mutex_t::scoped_lock l(m_mutex);
if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf;
m_ul_bandwidth_manager.throttle(bytes_per_second);
m_bandwidth_manager[peer_connection::upload_channel]->throttle(bytes_per_second);
}
int session_impl::num_uploads() const
@ -1887,7 +1886,6 @@ namespace detail
return m_connections.size();
}
std::auto_ptr<alert> session_impl::pop_alert()
{
mutex_t::scoped_lock l(m_mutex);
@ -1905,15 +1903,14 @@ namespace detail
int session_impl::upload_rate_limit() const
{
mutex_t::scoped_lock l(m_mutex);
return m_ul_bandwidth_manager.throttle();
return m_bandwidth_manager[peer_connection::upload_channel]->throttle();
}
int session_impl::download_rate_limit() const
{
mutex_t::scoped_lock l(m_mutex);
return m_dl_bandwidth_manager.throttle();
return m_bandwidth_manager[peer_connection::download_channel]->throttle();
}
void session_impl::start_lsd()
{

View File

@ -1946,35 +1946,29 @@ namespace libtorrent
}
}
bool torrent::request_bandwidth_from_session(int channel) const
{
int max_assignable = m_bandwidth_limit[channel].max_assignable();
return max_assignable > max_bandwidth_block_size
|| (m_bandwidth_limit[channel].throttle() < max_bandwidth_block_size
&& max_assignable == m_bandwidth_limit[channel].throttle());
}
int torrent::bandwidth_throttle(int channel) const
{
return m_bandwidth_limit[channel].throttle();
}
void torrent::request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> p
, boost::intrusive_ptr<peer_connection> const& p
, bool non_prioritized)
{
if (request_bandwidth_from_session(channel))
int block_size = m_bandwidth_limit[channel].throttle() / 10;
if (m_bandwidth_limit[channel].max_assignable() > 0)
{
if (channel == peer_connection::upload_channel)
m_ses.m_ul_bandwidth_manager.request_bandwidth(p, non_prioritized);
else if (channel == peer_connection::download_channel)
m_ses.m_dl_bandwidth_manager.request_bandwidth(p, non_prioritized);
m_bandwidth_limit[channel].assign(max_bandwidth_block_size);
perform_bandwidth_request(channel, p, block_size, non_prioritized);
}
else
{
m_bandwidth_queue[channel].push_back(bw_queue_entry(p, non_prioritized));
// skip forward in the queue until we find a prioritized peer
// or hit the front of it.
queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin();
while (i != m_bandwidth_queue[channel].rend() && i->non_prioritized) ++i;
m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry<peer_connection>(
p, block_size, non_prioritized));
}
}
@ -1982,30 +1976,40 @@ namespace libtorrent
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
assert(amount >= -1);
if (amount == -1) amount = max_bandwidth_block_size;
assert(amount > 0);
m_bandwidth_limit[channel].expire(amount);
while (!m_bandwidth_queue[channel].empty()
&& request_bandwidth_from_session(channel))
while (!m_bandwidth_queue[channel].empty())
{
bw_queue_entry qe = m_bandwidth_queue[channel].front();
bw_queue_entry<peer_connection> qe = m_bandwidth_queue[channel].front();
if (m_bandwidth_limit[channel].max_assignable() == 0)
break;
m_bandwidth_queue[channel].pop_front();
if (channel == peer_connection::upload_channel)
m_ses.m_ul_bandwidth_manager.request_bandwidth(qe.peer, qe.non_prioritized);
else if (channel == peer_connection::download_channel)
m_ses.m_dl_bandwidth_manager.request_bandwidth(qe.peer, qe.non_prioritized);
m_bandwidth_limit[channel].assign(max_bandwidth_block_size);
perform_bandwidth_request(channel, qe.peer
, qe.max_block_size, qe.non_prioritized);
}
}
void torrent::assign_bandwidth(int channel, int amount)
void torrent::perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size
, bool non_prioritized)
{
assert(m_bandwidth_limit[channel].max_assignable() >= block_size);
m_ses.m_bandwidth_manager[channel]->request_bandwidth(p
, block_size, non_prioritized);
m_bandwidth_limit[channel].assign(block_size);
}
void torrent::assign_bandwidth(int channel, int amount, int blk)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
assert(amount >= 0);
if (amount < max_bandwidth_block_size)
expire_bandwidth(channel, max_bandwidth_block_size - amount);
assert(amount > 0);
assert(amount <= blk);
if (amount < blk)
expire_bandwidth(channel, blk - amount);
}
// called when torrent is finished (all interested pieces downloaded)

View File

@ -26,5 +26,6 @@ test-suite libtorrent :
[ run test_swarm.cpp ]
[ run test_allocate_resources.cpp ]
[ run test_web_seed.cpp ]
[ run test_bandwidth_limiter.cpp ]
;

View File

@ -52,7 +52,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3
t.add_tracker(tracker_url);
std::vector<char> piece(16 * 1024);
for (int i = 0; i < piece.size(); ++i)
for (int i = 0; i < int(piece.size()); ++i)
piece[i] = (i % 26) + 'A';
// calculate the hash for all pieces

View File

@ -0,0 +1,402 @@
#include "test.hpp"
#include "libtorrent/bandwidth_manager.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/stat.hpp"
#include "libtorrent/time.hpp"
#include <boost/lexical_cast.hpp>
struct torrent;
struct peer_connection;
using namespace libtorrent;
struct torrent
{
torrent(bandwidth_manager<peer_connection, torrent>& m)
: m_bandwidth_manager(m)
{}
void assign_bandwidth(int channel, int amount, int max_block_size)
{
// std::cerr << time_now_string()
// << ": assign bandwidth, " << amount << " blk: " << max_block_size << std::endl;
assert(amount > 0);
if (amount < max_block_size)
expire_bandwidth(channel, max_block_size - amount);
}
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
void request_bandwidth(int channel
, boost::intrusive_ptr<peer_connection> const& p
, bool non_prioritized)
{
// std::cerr << time_now_string()
// << ": request bandwidth" << std::endl;
int block_size = m_bandwidth_limit[channel].throttle() / 10;
if (m_bandwidth_limit[channel].max_assignable() > 0)
{
perform_bandwidth_request(channel, p, block_size, non_prioritized);
}
else
{
// skip forward in the queue until we find a prioritized peer
// or hit the front of it.
queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin();
while (i != m_bandwidth_queue[channel].rend() && i->non_prioritized) ++i;
m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry<peer_connection>(
p, block_size, non_prioritized));
}
}
void expire_bandwidth(int channel, int amount)
{
// std::cerr << time_now_string()
// << ": expire bandwidth, " << amount << std::endl;
assert(amount > 0);
m_bandwidth_limit[channel].expire(amount);
while (!m_bandwidth_queue[channel].empty())
{
bw_queue_entry<peer_connection> qe = m_bandwidth_queue[channel].front();
if (m_bandwidth_limit[channel].max_assignable() == 0)
break;
m_bandwidth_queue[channel].pop_front();
perform_bandwidth_request(channel, qe.peer
, qe.max_block_size, qe.non_prioritized);
}
}
void perform_bandwidth_request(int channel
, boost::intrusive_ptr<peer_connection> const& p
, int block_size
, bool non_prioritized)
{
m_bandwidth_manager.request_bandwidth(p
, block_size, non_prioritized);
m_bandwidth_limit[channel].assign(block_size);
}
bandwidth_limit m_bandwidth_limit[1];
typedef std::deque<bw_queue_entry<peer_connection> > queue_t;
queue_t m_bandwidth_queue[1];
bandwidth_manager<peer_connection, torrent>& m_bandwidth_manager;
};
struct peer_connection
{
peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
, bool prio, bool ignore_limits, std::string name)
: m_torrent(t)
, m_prioritized(prio)
, m_ignore_limits(ignore_limits)
, m_abort(false)
, m_ios(ios)
, m_name(name)
, m_refs(0)
{}
bool ignore_bandwidth_limits() { return m_ignore_limits; }
int max_assignable_bandwidth(int channel) const
{ return m_bandwidth_limit[channel].max_assignable(); }
boost::weak_ptr<torrent> associated_torrent() const
{ return m_torrent; }
bool is_disconnecting() const { return m_abort; }
void assign_bandwidth(int channel, int amount)
{
// std::cerr << time_now_string() << ": [" << m_name
// << "] assign bandwidth, " << amount << std::endl;
assert(amount > 0);
m_bandwidth_limit[channel].assign(amount);
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) return;
m_stats.sent_bytes(amount, 0);
m_ios.post(boost::bind(&torrent::request_bandwidth, t, int(0)
, intrusive_ptr<peer_connection>(this), !m_prioritized));
}
void start()
{
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) return;
t->request_bandwidth(0, this, !m_prioritized);
}
void stop() { m_abort = true; }
void expire_bandwidth(int channel, int amount)
{
assert(amount > 0);
// std::cerr << time_now_string() << ": [" << m_name
// << "] expire bandwidth, " << amount << std::endl;
m_bandwidth_limit[channel].expire(amount);
}
void tick()
{
// std::cerr << time_now_string() << ": [" << m_name
// << "] tick, rate: " << m_stats.upload_rate() << std::endl;
m_stats.second_tick(1.f);
}
int bandwidth_throttle(int channel) const
{ return m_bandwidth_limit[channel].throttle(); }
bandwidth_limit m_bandwidth_limit[1];
boost::weak_ptr<torrent> m_torrent;
bool m_prioritized;
bool m_ignore_limits;
bool m_abort;
stat m_stats;
io_service& m_ios;
std::string m_name;
int m_refs;
};
void intrusive_ptr_add_ref(peer_connection* p)
{
++p->m_refs;
}
void intrusive_ptr_release(peer_connection* p)
{
if (--p->m_refs == 0)
delete p;
}
typedef std::vector<boost::intrusive_ptr<peer_connection> > connections_t;
void do_tick(asio::error_code const&e, deadline_timer& tick, connections_t& v)
{
if (e) return;
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::tick, _1));
tick.expires_from_now(seconds(1));
tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v)));
}
void do_stop(deadline_timer& tick, connections_t& v)
{
tick.cancel();
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::stop, _1));
}
void run_test(io_service& ios, connections_t& v)
{
std::cerr << "-------------\n" << std::endl;
deadline_timer tick(ios);
tick.expires_from_now(seconds(1));
tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v)));
deadline_timer complete(ios);
complete.expires_from_now(seconds(stat::history * 2));
complete.async_wait(boost::bind(&do_stop, boost::ref(tick), boost::ref(v)));
std::for_each(v.begin(), v.end()
, boost::bind(&peer_connection::start, _1));
ios.run();
}
bool close_to(float val, float comp, float err)
{
return fabs(val - comp) <= err;
}
void spawn_connections(connections_t& v, io_service& ios
, boost::shared_ptr<torrent> t, int num, char const* prefix)
{
for (int i = 0; i < num; ++i)
{
v.push_back(new peer_connection(ios, t, true, false
, prefix + boost::lexical_cast<std::string>(i)));
}
}
void test_equal_connections(int num, int limit)
{
std::cerr << "test equal connections " << num << " " << limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
manager.throttle(limit);
boost::shared_ptr<torrent> t1(new torrent(manager));
connections_t v;
spawn_connections(v, ios, t1, num, "p");
run_test(ios, v);
float sum = 0.f;
float err = std::max(limit / num * 0.3f, 1000.f);
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
std::cerr << (*i)->m_stats.upload_rate()
<< " target: " << (limit / num) << " eps: " << err << std::endl;
TEST_CHECK(close_to((*i)->m_stats.upload_rate(), limit / num, err));
}
std::cerr << "sum: " << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 50));
}
void test_single_peer(int limit, bool torrent_limit)
{
std::cerr << "test single peer " << limit << " " << torrent_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
if (torrent_limit)
t1->m_bandwidth_limit[0].throttle(limit);
else
manager.throttle(limit);
connections_t v;
spawn_connections(v, ios, t1, 1, "p");
run_test(ios, v);
float sum = 0.f;
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
}
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 1000));
}
void test_equal_torrents(int num, int limit)
{
std::cerr << "test equal torrents " << num << " " << limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
boost::shared_ptr<torrent> t2(new torrent(manager));
t1->m_bandwidth_limit[0].throttle(limit);
t2->m_bandwidth_limit[0].throttle(limit);
connections_t v1;
spawn_connections(v1, ios, t1, num, "t1p");
connections_t v2;
spawn_connections(v2, ios, t2, num, "t2p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
std::copy(v2.begin(), v2.end(), std::back_inserter(v));
run_test(ios, v);
float sum = 0.f;
for (connections_t::iterator i = v1.begin()
, end(v1.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
}
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 1000));
sum = 0.f;
for (connections_t::iterator i = v2.begin()
, end(v2.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
}
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 1000));
}
void test_peer_priority(int limit, bool torrent_limit)
{
std::cerr << "test peer priority " << limit << " " << torrent_limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
if (torrent_limit)
t1->m_bandwidth_limit[0].throttle(limit);
else
manager.throttle(limit);
connections_t v1;
spawn_connections(v1, ios, t1, 10, "p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
boost::intrusive_ptr<peer_connection> p(
new peer_connection(ios, t1, false, false, "no-priority"));
v.push_back(p);
run_test(ios, v);
float sum = 0.f;
for (connections_t::iterator i = v1.begin()
, end(v1.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
}
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 50));
std::cerr << "non-prioritized rate: " << p->m_stats.upload_rate() << std::endl;
TEST_CHECK(p->m_stats.upload_rate() < 10);
}
void test_no_starvation(int limit)
{
std::cerr << "test no starvation " << limit << std::endl;
io_service ios;
bandwidth_manager<peer_connection, torrent> manager(ios, 0);
boost::shared_ptr<torrent> t1(new torrent(manager));
boost::shared_ptr<torrent> t2(new torrent(manager));
manager.throttle(limit);
const int num_peers = 20;
connections_t v1;
spawn_connections(v1, ios, t1, num_peers, "p");
connections_t v;
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
boost::intrusive_ptr<peer_connection> p(
new peer_connection(ios, t2, false, false, "no-priority"));
v.push_back(p);
run_test(ios, v);
float sum = 0.f;
for (connections_t::iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
sum += (*i)->m_stats.upload_rate();
}
std::cerr << sum << " target: " << limit << std::endl;
TEST_CHECK(close_to(sum, limit, 50));
std::cerr << "non-prioritized rate: " << p->m_stats.upload_rate() << std::endl;
TEST_CHECK(close_to(p->m_stats.upload_rate(), limit / (num_peers + 1), 1000));
}
int test_main()
{
using namespace libtorrent;
test_equal_connections(2, 20000);
test_equal_connections(3, 20000);
test_equal_connections(5, 20000);
test_equal_connections(7, 20000);
test_equal_connections(33, 60000);
test_equal_connections(33, 500000);
test_equal_torrents(1, 40000);
test_equal_torrents(24, 50000);
test_single_peer(40000, true);
test_single_peer(40000, false);
test_peer_priority(40000, false);
test_peer_priority(40000, true);
test_no_starvation(40000);
return 0;
}