diff --git a/include/libtorrent/bandwidth_limit.hpp b/include/libtorrent/bandwidth_limit.hpp new file mode 100644 index 000000000..e0675aa31 --- /dev/null +++ b/include/libtorrent/bandwidth_limit.hpp @@ -0,0 +1,120 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED +#define TORRENT_BANDWIDTH_LIMIT_HPP_INCLUDED + +#include + +#include "libtorrent/assert.hpp" + +namespace libtorrent { + +// member of peer_connection +struct bandwidth_limit +{ + static const int inf = boost::integer_traits::const_max; + + bandwidth_limit() + : m_quota_left(0) + , m_local_limit(inf) + , m_current_rate(0) + {} + + void throttle(int limit) + { + TORRENT_ASSERT(limit > 0); + m_local_limit = limit; + } + + int throttle() const + { + return m_local_limit; + } + + void assign(int amount) + { + TORRENT_ASSERT(amount >= 0); + m_current_rate += amount; + m_quota_left += amount; + } + + void use_quota(int amount) + { + TORRENT_ASSERT(amount <= m_quota_left); + m_quota_left -= amount; + } + + int quota_left() const + { + return (std::max)(m_quota_left, 0); + } + + void expire(int amount) + { + TORRENT_ASSERT(amount >= 0); + m_current_rate -= amount; + } + + int max_assignable() const + { + if (m_local_limit == inf) return inf; + if (m_local_limit <= m_current_rate) return 0; + return m_local_limit - m_current_rate; + } + +private: + + // this is the amount of bandwidth we have + // been assigned without using yet. i.e. + // the bandwidth that we use up every time + // we receive or send a message. Once this + // hits zero, we need to request more + // bandwidth from the torrent which + // in turn will request bandwidth from + // the bandwidth manager + int m_quota_left; + + // the local limit is the number of bytes + // per window size we are allowed to use. + int m_local_limit; + + // the current rate is the number of + // bytes we have been assigned within + // the window size. + int m_current_rate; +}; + +} + +#endif + diff --git a/include/libtorrent/bandwidth_manager.hpp b/include/libtorrent/bandwidth_manager.hpp index 53735dbd7..adba2d453 100644 --- a/include/libtorrent/bandwidth_manager.hpp +++ b/include/libtorrent/bandwidth_manager.hpp @@ -44,6 +44,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/invariant_check.hpp" #include "libtorrent/assert.hpp" +#include "libtorrent/bandwidth_limit.hpp" +#include "libtorrent/bandwidth_queue_entry.hpp" using boost::weak_ptr; using boost::shared_ptr; @@ -77,91 +79,6 @@ struct history_entry weak_ptr tor; }; -template -struct bw_queue_entry -{ - bw_queue_entry(boost::intrusive_ptr const& pe - , int blk, bool no_prio) - : peer(pe), max_block_size(blk), non_prioritized(no_prio) {} - boost::intrusive_ptr peer; - int max_block_size; - bool non_prioritized; -}; - -// member of peer_connection -struct bandwidth_limit -{ - static const int inf = boost::integer_traits::const_max; - - bandwidth_limit() throw() - : m_quota_left(0) - , m_local_limit(inf) - , m_current_rate(0) - {} - - void throttle(int limit) throw() - { - m_local_limit = limit; - } - - int throttle() const throw() - { - return m_local_limit; - } - - void assign(int amount) throw() - { - TORRENT_ASSERT(amount >= 0); - m_current_rate += amount; - m_quota_left += amount; - } - - void use_quota(int amount) throw() - { - TORRENT_ASSERT(amount <= m_quota_left); - m_quota_left -= amount; - } - - int quota_left() const throw() - { - return (std::max)(m_quota_left, 0); - } - - void expire(int amount) throw() - { - TORRENT_ASSERT(amount >= 0); - m_current_rate -= amount; - } - - int max_assignable() const throw() - { - if (m_local_limit == inf) return inf; - if (m_local_limit <= m_current_rate) return 0; - return m_local_limit - m_current_rate; - } - -private: - - // this is the amount of bandwidth we have - // been assigned without using yet. i.e. - // the bandwidth that we use up every time - // we receive or send a message. Once this - // hits zero, we need to request more - // bandwidth from the torrent which - // in turn will request bandwidth from - // the bandwidth manager - int m_quota_left; - - // the local limit is the number of bytes - // per window size we are allowed to use. - int m_local_limit; - - // the current rate is the number of - // bytes we have been assigned within - // the window size. - int m_current_rate; -}; - template T clamp(T val, T ceiling, T floor) throw() { @@ -204,15 +121,25 @@ struct bandwidth_manager } #ifndef NDEBUG - bool is_queued(PeerConnection const* peer) + bool is_queued(PeerConnection const* peer) const { - for (typename queue_t::iterator i = m_queue.begin() + for (typename queue_t::const_iterator i = m_queue.begin() , end(m_queue.end()); i != end; ++i) { if (i->peer.get() == peer) return true; } return false; } + + bool is_in_history(PeerConnection const* peer) const + { + for (typename history_t::const_iterator i + = m_history.begin(), end(m_history.end()); i != end; ++i) + { + if (i->peer.get() == peer) return true; + } + return false; + } #endif @@ -231,15 +158,9 @@ struct bandwidth_manager // make sure this peer isn't already in line // waiting for bandwidth TORRENT_ASSERT(!is_queued(peer.get())); + TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0); boost::shared_ptr t = peer->associated_torrent().lock(); - - if (peer->max_assignable_bandwidth(m_channel) == 0) - { - t->expire_bandwidth(m_channel, blk); - peer->assign_bandwidth(m_channel, 0); - return; - } m_queue.push_back(bw_queue_entry(peer, blk, non_prioritized)); if (!non_prioritized) { @@ -360,12 +281,21 @@ private: << " m_current_quota = " << m_current_quota << std::endl; #endif - while (!m_queue.empty() && amount > 0) + if (amount <= 0) + { + m_in_hand_out_bandwidth = false; + return; + } + + queue_t q; + queue_t tmp; + m_queue.swap(q); + while (!q.empty() && amount > 0) { TORRENT_ASSERT(amount == limit - m_current_quota); - bw_queue_entry qe = m_queue.front(); + bw_queue_entry qe = q.front(); TORRENT_ASSERT(qe.max_block_size > 0); - m_queue.pop_front(); + q.pop_front(); shared_ptr t = qe.peer->associated_torrent().lock(); if (!t) continue; @@ -377,17 +307,13 @@ private: } // at this point, max_assignable may actually be zero. Since - // the bandwidth quota is subtracted once the data has been - // sent. If the peer was added to the queue while the data was - // still being sent, max_assignable may have been > 0 at that time. - int max_assignable = (std::min)( - qe.peer->max_assignable_bandwidth(m_channel) - , t->max_assignable_bandwidth(m_channel)); + // the rate limit of the peer might have changed while it + // was in the queue. + int max_assignable = qe.peer->max_assignable_bandwidth(m_channel); if (max_assignable == 0) { - t->expire_bandwidth(m_channel, qe.max_block_size); - qe.peer->assign_bandwidth(m_channel, 0); - TORRENT_ASSERT(amount == limit - m_current_quota); + TORRENT_ASSERT(is_in_history(qe.peer.get())); + tmp.push_back(qe); continue; } @@ -432,7 +358,7 @@ private: #endif if (amount < block_size / 2) { - m_queue.push_front(qe); + tmp.push_back(qe); break; } @@ -451,6 +377,8 @@ private: qe.peer, t, hand_out_amount, now + bw_window_size)); TORRENT_ASSERT(amount == limit - m_current_quota); } + if (!q.empty()) m_queue.insert(m_queue.begin(), q.begin(), q.end()); + if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end()); m_in_hand_out_bandwidth = false; } diff --git a/include/libtorrent/bandwidth_queue_entry.hpp b/include/libtorrent/bandwidth_queue_entry.hpp new file mode 100644 index 000000000..76c119d96 --- /dev/null +++ b/include/libtorrent/bandwidth_queue_entry.hpp @@ -0,0 +1,54 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_BANDWIDTH_QUEUE_ENTRY_HPP_INCLUDED +#define TORRENT_BANDWIDTH_QUEUE_ENTRY_HPP_INCLUDED + +#include + +namespace libtorrent { + +template +struct bw_queue_entry +{ + bw_queue_entry(boost::intrusive_ptr const& pe + , int blk, bool no_prio) + : peer(pe), max_block_size(blk), non_prioritized(no_prio) {} + boost::intrusive_ptr peer; + int max_block_size; + bool non_prioritized; +}; + +} + +#endif + diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 1a9c73d94..927bdeb47 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -69,7 +69,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/piece_block_progress.hpp" #include "libtorrent/config.hpp" #include "libtorrent/session.hpp" -#include "libtorrent/bandwidth_manager.hpp" +#include "libtorrent/bandwidth_limit.hpp" #include "libtorrent/policy.hpp" #include "libtorrent/socket_type.hpp" #include "libtorrent/intrusive_ptr_base.hpp" diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 1f193fb4a..033644987 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -65,7 +65,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/piece_picker.hpp" #include "libtorrent/config.hpp" #include "libtorrent/escape_string.hpp" -#include "libtorrent/bandwidth_manager.hpp" +#include "libtorrent/bandwidth_limit.hpp" +#include "libtorrent/bandwidth_queue_entry.hpp" #include "libtorrent/storage.hpp" #include "libtorrent/hasher.hpp" #include "libtorrent/assert.hpp" diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index e9eb949f0..176d07dec 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -2916,6 +2916,12 @@ namespace libtorrent #ifndef NDEBUG void peer_connection::check_invariant() const { + for (int i = 0; i < 2; ++i) + { + // this peer is in the bandwidth history iff max_assignable < limit + TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle()) + == m_ses.m_bandwidth_manager[i]->is_in_history(this)); + } if (m_peer_info) { TORRENT_ASSERT(m_peer_info->connection == this diff --git a/src/torrent.cpp b/src/torrent.cpp index 3ddbb1e58..bae5187d6 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -2264,6 +2264,7 @@ namespace libtorrent , bool non_prioritized) { TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0); + TORRENT_ASSERT(p->max_assignable_bandwidth(channel) > 0); int block_size = m_bandwidth_limit[channel].throttle() / 10; if (block_size <= 0) block_size = 1; @@ -2288,16 +2289,23 @@ namespace libtorrent TORRENT_ASSERT(amount > 0); m_bandwidth_limit[channel].expire(amount); - + queue_t tmp; while (!m_bandwidth_queue[channel].empty()) { bw_queue_entry qe = m_bandwidth_queue[channel].front(); if (m_bandwidth_limit[channel].max_assignable() == 0) break; m_bandwidth_queue[channel].pop_front(); + if (qe.peer->max_assignable_bandwidth(channel) <= 0) + { + TORRENT_ASSERT(m_ses.m_bandwidth_manager[channel]->is_in_history(qe.peer.get())); + if (!qe.peer->is_disconnecting()) tmp.push_back(qe); + continue; + } perform_bandwidth_request(channel, qe.peer , qe.max_block_size, qe.non_prioritized); } + m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end()); } void torrent::perform_bandwidth_request(int channel diff --git a/test/test_bandwidth_limiter.cpp b/test/test_bandwidth_limiter.cpp index 97f659b6a..ba9b01bb0 100644 --- a/test/test_bandwidth_limiter.cpp +++ b/test/test_bandwidth_limiter.cpp @@ -4,6 +4,7 @@ #include "libtorrent/socket.hpp" #include "libtorrent/stat.hpp" #include "libtorrent/time.hpp" +#include "libtorrent/intrusive_ptr_base.hpp" #include @@ -28,8 +29,8 @@ struct torrent std::cerr << time_now_string() << ": assign bandwidth, " << amount << " blk: " << max_block_size << std::endl; #endif - assert(amount > 0); - assert(amount <= max_block_size); + TEST_CHECK(amount > 0); + TEST_CHECK(amount <= max_block_size); if (amount < max_block_size) expire_bandwidth(channel, max_block_size - amount); } @@ -44,7 +45,7 @@ struct torrent , boost::intrusive_ptr const& p , bool non_prioritized) { - assert(m_bandwidth_limit[channel].throttle() > 0); + TEST_CHECK(m_bandwidth_limit[channel].throttle() > 0); int block_size = m_bandwidth_limit[channel].throttle() / 10; if (block_size <= 0) block_size = 1; @@ -54,7 +55,6 @@ struct torrent std::cerr << time_now_string() << ": request bandwidth " << block_size << std::endl; #endif - perform_bandwidth_request(channel, p, block_size, non_prioritized); } else @@ -73,24 +73,7 @@ struct torrent } } - void expire_bandwidth(int channel, int amount) - { -#ifdef VERBOSE_LOGGING - std::cerr << time_now_string() - << ": expire bandwidth, " << amount << std::endl; -#endif - assert(amount > 0); - m_bandwidth_limit[channel].expire(amount); - while (!m_bandwidth_queue[channel].empty()) - { - bw_queue_entry qe = m_bandwidth_queue[channel].front(); - if (m_bandwidth_limit[channel].max_assignable() == 0) - break; - m_bandwidth_queue[channel].pop_front(); - perform_bandwidth_request(channel, qe.peer - , qe.max_block_size, qe.non_prioritized); - } - } + void expire_bandwidth(int channel, int amount); void perform_bandwidth_request(int channel , boost::intrusive_ptr const& p @@ -107,7 +90,7 @@ struct torrent bandwidth_manager& m_bandwidth_manager; }; -struct peer_connection +struct peer_connection: intrusive_ptr_base { peer_connection(io_service& ios, boost::shared_ptr const& t , bool prio, bool ignore_limits, std::string name) @@ -117,7 +100,7 @@ struct peer_connection , m_abort(false) , m_ios(ios) , m_name(name) - , m_refs(0) + , m_writing(false) {} bool ignore_bandwidth_limits() { return m_ignore_limits; } @@ -128,33 +111,53 @@ struct peer_connection bool is_disconnecting() const { return m_abort; } void assign_bandwidth(int channel, int amount) { + TEST_CHECK(m_writing); #ifdef VERBOSE_LOGGING std::cerr << time_now_string() << ": [" << m_name << "] assign bandwidth, " << amount << std::endl; #endif - assert(amount > 0); + TEST_CHECK(amount > 0); m_bandwidth_limit[channel].assign(amount); + m_ios.post(boost::bind(&peer_connection::on_transfer, self(), channel, amount)); + } + void on_transfer(int channel, int amount) + { + TEST_CHECK(m_writing); + m_writing = false; + m_stats.sent_bytes(amount, 0); + boost::shared_ptr t = m_torrent.lock(); if (!t) return; - m_stats.sent_bytes(amount, 0); - m_ios.post(boost::bind(&torrent::request_bandwidth, t, int(0) - , intrusive_ptr(this), !m_prioritized)); + if (m_bandwidth_limit[channel].max_assignable() > 0) + { + m_writing = true; + t->request_bandwidth(0, this, !m_prioritized); + } } void start() { boost::shared_ptr t = m_torrent.lock(); if (!t) return; + m_writing = true; t->request_bandwidth(0, this, !m_prioritized); } void stop() { m_abort = true; } void expire_bandwidth(int channel, int amount) { - assert(amount > 0); + TEST_CHECK(amount > 0); #ifdef VERBOSE_LOGGING std::cerr << time_now_string() << ": [" << m_name << "] expire bandwidth, " << amount << std::endl; #endif m_bandwidth_limit[channel].expire(amount); + + if (!m_writing && m_bandwidth_limit[channel].max_assignable() > 0) + { + boost::shared_ptr t = m_torrent.lock(); + if (!t) return; + m_writing = true; + t->request_bandwidth(0, this, !m_prioritized); + } } void tick() { @@ -168,6 +171,8 @@ struct peer_connection int bandwidth_throttle(int channel) const { return m_bandwidth_limit[channel].throttle(); } + void throttle(int limit) { m_bandwidth_limit[0].throttle(limit); } + bandwidth_limit m_bandwidth_limit[1]; boost::weak_ptr m_torrent; bool m_prioritized; @@ -176,18 +181,34 @@ struct peer_connection libtorrent::stat m_stats; io_service& m_ios; std::string m_name; - int m_refs; + bool m_writing; }; -void intrusive_ptr_add_ref(peer_connection* p) +void torrent::expire_bandwidth(int channel, int amount) { - ++p->m_refs; -} - -void intrusive_ptr_release(peer_connection* p) -{ - if (--p->m_refs == 0) - delete p; +#ifdef VERBOSE_LOGGING + std::cerr << time_now_string() + << ": expire bandwidth, " << amount << std::endl; +#endif + TEST_CHECK(amount > 0); + m_bandwidth_limit[channel].expire(amount); + queue_t tmp; + while (!m_bandwidth_queue[channel].empty()) + { + bw_queue_entry qe = m_bandwidth_queue[channel].front(); + if (m_bandwidth_limit[channel].max_assignable() == 0) + break; + m_bandwidth_queue[channel].pop_front(); + if (qe.peer->max_assignable_bandwidth(channel) <= 0) + { + TORRENT_ASSERT(m_bandwidth_manager.is_in_history(qe.peer.get())); + if (!qe.peer->is_disconnecting()) tmp.push_back(qe); + continue; + } + perform_bandwidth_request(channel, qe.peer + , qe.max_block_size, qe.non_prioritized); + } + m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end()); } typedef std::vector > connections_t; @@ -216,10 +237,56 @@ void do_stop(deadline_timer& tick, connections_t& v) std::cerr << " stopping..." << std::endl; } +void do_change_rate(asio::error_code const&e, deadline_timer& tick + , boost::shared_ptr t1 + , boost::shared_ptr t2 + , int limit + , int counter) +{ + TEST_CHECK(!e); + if (e) return; + + if (counter == 0) + { + t1->m_bandwidth_limit[0].throttle(limit); + t2->m_bandwidth_limit[0].throttle(limit); + return; + } + + t1->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?-1:1)); + t2->m_bandwidth_limit[0].throttle(limit + limit / 2 * ((counter & 1)?1:-1)); + + tick.expires_from_now(milliseconds(1600)); + tick.async_wait(boost::bind(&do_change_rate, _1, boost::ref(tick), t1, t2, limit, counter-1)); +} + +void do_change_peer_rate(asio::error_code const&e, deadline_timer& tick + , connections_t& v + , int limit + , int counter) +{ + TEST_CHECK(!e); + if (e) return; + + if (counter == 0) + { + std::for_each(v.begin(), v.end() + , boost::bind(&peer_connection::throttle, _1, limit)); + return; + } + + int c = counter; + for (connections_t::iterator i = v.begin(); i != v.end(); ++i, ++c) + i->get()->throttle(limit + limit / 2 * ((c & 1)?-1:1)); + + tick.expires_from_now(milliseconds(1100)); + tick.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(tick), boost::ref(v), limit, counter-1)); +} + void run_test(io_service& ios, connections_t& v) { abort_tick = false; - std::cerr << "-------------\n" << std::endl; + std::cerr << "-------------" << std::endl; deadline_timer tick(ios); tick.expires_from_now(seconds(1)); tick.async_wait(boost::bind(&do_tick, _1, boost::ref(tick), boost::ref(v))); @@ -251,7 +318,7 @@ void spawn_connections(connections_t& v, io_service& ios void test_equal_connections(int num, int limit) { - std::cerr << "test equal connections " << num << " " << limit << std::endl; + std::cerr << "\ntest equal connections " << num << " " << limit << std::endl; io_service ios; bandwidth_manager manager(ios, 0); manager.throttle(limit); @@ -279,9 +346,53 @@ void test_equal_connections(int num, int limit) TEST_CHECK(close_to(sum, limit, 50)); } +void test_connections_variable_rate(int num, int limit, int torrent_limit) +{ + std::cerr << "\ntest connections variable rate" << num + << " l: " << limit + << " t: " << torrent_limit + << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + + boost::shared_ptr t1(new torrent(manager)); + if (torrent_limit) + t1->m_bandwidth_limit[0].throttle(torrent_limit); + + connections_t v; + spawn_connections(v, ios, t1, num, "p"); + std::for_each(v.begin(), v.end() + , boost::bind(&peer_connection::throttle, _1, limit)); + + deadline_timer change_rate(ios); + change_rate.expires_from_now(milliseconds(1600)); + change_rate.async_wait(boost::bind(&do_change_peer_rate, _1, boost::ref(change_rate) + , boost::ref(v), limit, 9)); + run_test(ios, v); + + if (torrent_limit > 0 && limit * num > torrent_limit) + limit = torrent_limit / num; + + float sum = 0.f; + float err = limit * 0.3f; + for (connections_t::iterator i = v.begin() + , end(v.end()); i != end; ++i) + { + sum += (*i)->m_stats.total_payload_upload(); + + std::cerr << (*i)->m_stats.total_payload_upload() / sample_time + << " target: " << limit << " eps: " << err << std::endl; + TEST_CHECK(close_to((*i)->m_stats.total_payload_upload() / sample_time, limit, err)); + } + sum /= sample_time; + std::cerr << "sum: " << sum << " target: " << (limit * num) << std::endl; + TEST_CHECK(sum > 0); + TEST_CHECK(close_to(sum, limit * num, limit * 0.3f * num)); +} + void test_single_peer(int limit, bool torrent_limit) { - std::cerr << "test single peer " << limit << " " << torrent_limit << std::endl; + std::cerr << "\ntest single peer " << limit << " " << torrent_limit << std::endl; io_service ios; bandwidth_manager manager(ios, 0); boost::shared_ptr t1(new torrent(manager)); @@ -309,7 +420,10 @@ void test_single_peer(int limit, bool torrent_limit) void test_torrents(int num, int limit1, int limit2, int global_limit) { - std::cerr << "test equal torrents " << num << " " << limit1 << " " << limit2 << std::endl; + std::cerr << "\ntest equal torrents " << num + << " l1: " << limit1 + << " l2: " << limit2 + << " g: " << global_limit << std::endl; io_service ios; bandwidth_manager manager(ios, 0); if (global_limit > 0) @@ -330,6 +444,11 @@ void test_torrents(int num, int limit1, int limit2, int global_limit) std::copy(v2.begin(), v2.end(), std::back_inserter(v)); run_test(ios, v); + if (global_limit > 0 && global_limit < limit1 + limit2) + { + limit1 = (std::min)(limit1, global_limit / 2); + limit2 = global_limit - limit1; + } float sum = 0.f; for (connections_t::iterator i = v1.begin() , end(v1.end()); i != end; ++i) @@ -353,9 +472,65 @@ void test_torrents(int num, int limit1, int limit2, int global_limit) TEST_CHECK(close_to(sum, limit2, 1000)); } +void test_torrents_variable_rate(int num, int limit, int global_limit) +{ + std::cerr << "\ntest torrents variable rate" << num + << " l: " << limit + << " g: " << global_limit << std::endl; + io_service ios; + bandwidth_manager manager(ios, 0); + if (global_limit > 0) + manager.throttle(global_limit); + + boost::shared_ptr t1(new torrent(manager)); + boost::shared_ptr t2(new torrent(manager)); + + t1->m_bandwidth_limit[0].throttle(limit); + t2->m_bandwidth_limit[0].throttle(limit); + + connections_t v1; + spawn_connections(v1, ios, t1, num, "t1p"); + connections_t v2; + spawn_connections(v2, ios, t2, num, "t2p"); + connections_t v; + std::copy(v1.begin(), v1.end(), std::back_inserter(v)); + std::copy(v2.begin(), v2.end(), std::back_inserter(v)); + + deadline_timer change_rate(ios); + change_rate.expires_from_now(milliseconds(1100)); + change_rate.async_wait(boost::bind(&do_change_rate, _1, boost::ref(change_rate), t1, t2, limit, 9)); + + run_test(ios, v); + + if (global_limit > 0 && global_limit < 2 * limit) + limit = global_limit / 2; + + float sum = 0.f; + for (connections_t::iterator i = v1.begin() + , end(v1.end()); i != end; ++i) + { + sum += (*i)->m_stats.total_payload_upload(); + } + sum /= sample_time; + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(sum > 0); + TEST_CHECK(close_to(sum, limit, 1000)); + + sum = 0.f; + for (connections_t::iterator i = v2.begin() + , end(v2.end()); i != end; ++i) + { + sum += (*i)->m_stats.total_payload_upload(); + } + sum /= sample_time; + std::cerr << sum << " target: " << limit << std::endl; + TEST_CHECK(sum > 0); + TEST_CHECK(close_to(sum, limit, 1000)); +} + void test_peer_priority(int limit, bool torrent_limit) { - std::cerr << "test peer priority " << limit << " " << torrent_limit << std::endl; + std::cerr << "\ntest peer priority " << limit << " " << torrent_limit << std::endl; io_service ios; bandwidth_manager manager(ios, 0); boost::shared_ptr t1(new torrent(manager)); @@ -391,7 +566,7 @@ void test_peer_priority(int limit, bool torrent_limit) void test_no_starvation(int limit) { - std::cerr << "test no starvation " << limit << std::endl; + std::cerr << "\ntest no starvation " << limit << std::endl; io_service ios; bandwidth_manager manager(ios, 0); boost::shared_ptr t1(new torrent(manager)); @@ -437,11 +612,21 @@ int test_main() test_equal_connections(7, 20000); test_equal_connections(33, 60000); test_equal_connections(33, 500000); + test_connections_variable_rate(2, 20, 0); + test_connections_variable_rate(5, 20000, 0); + test_connections_variable_rate(3, 2000, 6000); + test_connections_variable_rate(5, 2000, 30000); + test_connections_variable_rate(33, 500000, 0); test_torrents(2, 400, 400, 0); test_torrents(2, 100, 500, 0); test_torrents(2, 3000, 3000, 6000); test_torrents(1, 40000, 40000, 0); test_torrents(24, 50000, 50000, 0); + test_torrents(5, 6000, 6000, 3000); + test_torrents(5, 6000, 5000, 4000); + test_torrents(5, 20000, 20000, 30000); + test_torrents_variable_rate(5, 6000, 3000); + test_torrents_variable_rate(5, 20000, 30000); test_single_peer(40000, true); test_single_peer(40000, false); test_peer_priority(40000, false);