From c52de084210643d13bd213bda2a70da19be8f269 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 19 Nov 2006 15:29:58 +0000 Subject: [PATCH] added more logging and modified the resource allocation algorithm. A new, looser, upload and download bandwidth limiter is now used, in order to be able to use more of the available bandwidth --- .../aux_/allocate_resources_impl.hpp | 105 ++++++++++++++++-- include/libtorrent/torrent.hpp | 6 + src/allocate_resources.cpp | 2 +- src/bt_peer_connection.cpp | 4 +- src/kademlia/dht_tracker.cpp | 2 + src/peer_connection.cpp | 24 +++- src/session_impl.cpp | 2 +- src/torrent.cpp | 56 +++++++++- 8 files changed, 178 insertions(+), 23 deletions(-) diff --git a/include/libtorrent/aux_/allocate_resources_impl.hpp b/include/libtorrent/aux_/allocate_resources_impl.hpp index 4be3d8a1e..066314b03 100644 --- a/include/libtorrent/aux_/allocate_resources_impl.hpp +++ b/include/libtorrent/aux_/allocate_resources_impl.hpp @@ -74,6 +74,11 @@ namespace libtorrent return accepted; } + inline int div_round_up(int numerator, int denominator) + { + return (numerator + denominator - 1) / denominator; + } + #ifndef NDEBUG template @@ -120,7 +125,11 @@ namespace libtorrent sum_max = saturated_add(sum_max, ((*i).*m_res).max); sum_min = saturated_add(sum_min, ((*i).*m_res).min); } - assert(sum_given == (std::min)(std::max(m_resources, sum_min), sum_max)); + if (sum_given != (std::min)(std::max(m_resources, sum_min), sum_max)) + { + std::cerr << sum_given << " " << m_resources << " " << sum_min << " " << sum_max << std::endl; + assert(false); + } } }; @@ -157,19 +166,89 @@ namespace libtorrent int sum_max = 0; int sum_min = 0; + // the number of consumer that saturated their + // quota last time slice + int num_saturated = 0; + // the total resources those that saturated their + // quota used. This is used to calculate the mean + // of the saturating consumers, in order to + // balance their quotas for the next time slice. + size_type saturated_sum = 0; for (It i = start; i != end; ++i) { - sum_max = saturated_add(sum_max, ((*i).*res).max); - assert(((*i).*res).min < resource_request::inf); - assert(((*i).*res).min >= 0); - assert(((*i).*res).min <= ((*i).*res).max); - sum_min += ((*i).*res).min; - ((*i).*res).given = ((*i).*res).min; + resource_request& r = (*i).*res; + sum_max = saturated_add(sum_max, r.max); + assert(r.min < resource_request::inf); + assert(r.min >= 0); + assert(r.min <= r.max); + sum_min += r.min; + + // a consumer that uses 95% or more of its assigned + // quota is considered saturating + size_type used = r.used; + if (used * 20 / r.given >= 19) + { + ++num_saturated; + saturated_sum += r.given; + } } if (resources == 0 || sum_max == 0) return; + if (sum_max <= resources) + { + // it turns out that there's no competition for resources + // after all. + for (It i = start; i != end; ++i) + { + ((*i).*res).given = ((*i).*res).max; + } + return; + } + + if (sum_min >= resources) + { + // the amount of resources is smaller than + // the minimum resources to distribute, so + // give everyone the minimum + for (It i = start; i != end; ++i) + { + ((*i).*res).given = ((*i).*res).min; + } + return; + } + + // now, the "used" field will be used as a target value. + // the algorithm following this loop will then scale the + // used values to fit the available resources and store + // the scaled values as given. So, the ratios of the + // used values will be maintained. + for (It i = start; i != end; ++i) + { + resource_request& r = (*i).*res; + + int target; + size_type used = r.used; + if (used * 20 / r.given >= 19) + { + assert(num_saturated > 0); + target = div_round_up(saturated_sum, num_saturated); + target += div_round_up(target, 10); + } + else + { + target = r.used; + } + if (target > r.max) target = r.max; + else if (target < r.min) target = r.min; + + // move 50% towards the the target value + r.used = r.given + div_round_up(target - r.given, 2); + r.given = r.min; + } + + resources = (std::max)(resources, sum_min); int resources_to_distribute = (std::min)(resources, sum_max) - sum_min; assert(resources_to_distribute >= 0); @@ -178,12 +257,14 @@ namespace libtorrent #endif while (resources_to_distribute > 0) { + // in order to scale, we need to calculate the sum of + // all the used values. size_type total_used = 0; size_type max_used = 0; for (It i = start; i != end; ++i) { resource_request& r = (*i).*res; - if(r.given == r.max) continue; + if (r.given == r.max) continue; assert(r.given < r.max); @@ -191,19 +272,18 @@ namespace libtorrent total_used += (size_type)r.used + 1; } + size_type kNumer = resources_to_distribute; size_type kDenom = total_used; assert(kNumer >= 0); assert(kDenom >= 0); assert(kNumer <= (std::numeric_limits::max)()); - assert(total_used < (std::numeric_limits::max)()); if (kNumer * max_used <= kDenom) { kNumer = 1; kDenom = max_used; assert(kDenom >= 0); - assert(kDenom <= (std::numeric_limits::max)()); } for (It i = start; i != end && resources_to_distribute > 0; ++i) @@ -220,7 +300,11 @@ namespace libtorrent to_give = resources_to_distribute; assert(to_give >= 0); assert(to_give <= resources_to_distribute); +#ifndef NDEBUG + int tmp = resources_to_distribute; +#endif resources_to_distribute -= give(r, (int)to_give); + assert(resources_to_distribute <= tmp); assert(resources_to_distribute >= 0); } @@ -230,6 +314,7 @@ namespace libtorrent prev_resources_to_distribute = resources_to_distribute; #endif } + assert(resources_to_distribute == 0); } } // namespace libtorrent::aux diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index fe38f9c4e..eb06f2b59 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -567,6 +567,12 @@ namespace libtorrent // can upload per second int m_upload_bandwidth_limit; int m_download_bandwidth_limit; + + // the accumulated excess upload and download + // bandwidth used. Used to balance out the + // bandwidth to match the limit over time + int m_excess_ul; + int m_excess_dl; boost::filesystem::path m_save_path; diff --git a/src/allocate_resources.cpp b/src/allocate_resources.cpp index 7d08b036d..94d124328 100644 --- a/src/allocate_resources.cpp +++ b/src/allocate_resources.cpp @@ -1,6 +1,6 @@ /* -Copyright (c) 2003, Magnus Jonsson, Arvid Norberg +Copyright (c) 2006, Magnus Jonsson, Arvid Norberg All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index add52b311..efee77553 100755 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -742,13 +742,13 @@ namespace libtorrent } catch (std::exception& exc) { -#ifdef TORRENT_VERBOSE_LOGGIGN +#ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << "invalid extended handshake: " << exc.what() << "\n"; #endif return; } -#ifdef TORRENT_VERBOSE_LOGGIGN +#ifdef TORRENT_VERBOSE_LOGGING std::stringstream ext; root.print(ext); (*m_logger) << "<== EXTENDED HANDSHAKE: \n" << ext.str(); diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 1b98d1710..c11ddb0e2 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -218,6 +218,8 @@ namespace libtorrent { namespace dht } catch (std::exception& exc) { + std::cerr << "exception-type: " << typeid(exc).name() << std::endl; + std::cerr << "what: " << exc.what() << std::endl; assert(false); }; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 506f0fc30..8404176ca 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -351,7 +351,18 @@ namespace libtorrent // optimization, don't send have messages // to peers that already have the piece if (has_piece(index)) return; + +#ifdef TORRENT_VERBOSE_LOGGING + using namespace boost::posix_time; + (*m_logger) << to_simple_string(second_clock::universal_time()) + << " ==> HAVE [ piece: " << index << "]\n"; +#endif write_have(index); +#ifndef NDEBUG + boost::shared_ptr t = m_torrent.lock(); + assert(t); + assert(t->have_piece(index)); +#endif } bool peer_connection::has_piece(int i) const @@ -866,6 +877,11 @@ namespace libtorrent && r.length + r.start <= t->torrent_file().piece_size(r.piece) && m_peer_interested) { +#ifdef TORRENT_VERBOSE_LOGGING + using namespace boost::posix_time; + (*m_logger) << to_simple_string(second_clock::universal_time()) + << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; +#endif // if we have choked the client // ignore the request if (m_choked) @@ -873,11 +889,6 @@ namespace libtorrent m_requests.push_back(r); fill_send_buffer(); -#ifdef TORRENT_VERBOSE_LOGGING - using namespace boost::posix_time; - (*m_logger) << to_simple_string(second_clock::universal_time()) - << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; -#endif } else { @@ -890,7 +901,8 @@ namespace libtorrent "l: " << r.length << " | " "i: " << m_peer_interested << " | " "t: " << (int)t->torrent_file().piece_size(r.piece) << " | " - "n: " << t->torrent_file().num_pieces() << " ]\n"; + "n: " << t->torrent_file().num_pieces() << " | " + "h: " << t->have_piece(r.piece) << " ]\n"; #endif ++m_num_invalid_requests; diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 0e9179548..485d9724e 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -732,7 +732,7 @@ namespace libtorrent { namespace detail #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << endp << " <== INCOMING CONNECTION\n"; #endif - if (m_ip_filter.access(endp.address().to_v4()) & ip_filter::blocked) + if (m_ip_filter.access(endp.address()) & ip_filter::blocked) { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << "filtered blocked ip\n"; diff --git a/src/torrent.cpp b/src/torrent.cpp index 8e214914d..2bb8fe696 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -233,6 +233,8 @@ namespace libtorrent , m_net_interface(net_interface.address(), 0) , m_upload_bandwidth_limit(std::numeric_limits::max()) , m_download_bandwidth_limit(std::numeric_limits::max()) + , m_excess_ul(0) + , m_excess_dl(0) , m_save_path(complete(save_path)) , m_compact_mode(compact_mode) , m_default_block_size(block_size) @@ -331,6 +333,8 @@ namespace libtorrent , m_net_interface(net_interface.address(), 0) , m_upload_bandwidth_limit(std::numeric_limits::max()) , m_download_bandwidth_limit(std::numeric_limits::max()) + , m_excess_ul(0) + , m_excess_dl(0) , m_save_path(complete(save_path)) , m_compact_mode(compact_mode) , m_default_block_size(block_size) @@ -520,7 +524,7 @@ namespace libtorrent session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); m_failed_trackers = 0; - // less than 5 minutes announce intervals + // announce intervals less than 5 minutes // are insane. if (interval < 60 * 5) interval = 60 * 5; @@ -1125,6 +1129,13 @@ namespace libtorrent boost::shared_ptr s(new stream_socket(m_ses.m_selector)); boost::intrusive_ptr c(new web_peer_connection( m_ses, shared_from_this(), s, a, url)); + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "add url seed (" << c->m_dl_bandwidth_quota.given << ", " + << c->m_dl_bandwidth_quota.used << ") "; + (*m_ses.m_logger) << "\n"; +#endif + #ifndef NDEBUG c->m_in_constructor = false; #endif @@ -1176,6 +1187,14 @@ namespace libtorrent boost::shared_ptr s(new stream_socket(m_ses.m_selector)); boost::intrusive_ptr c(new bt_peer_connection( m_ses, shared_from_this(), s, a)); + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "connect_to_peer (" << c->m_dl_bandwidth_quota.given << ", " + << c->m_dl_bandwidth_quota.used << ") "; + (*m_ses.m_logger) << "\n"; +#endif + + #ifndef NDEBUG c->m_in_constructor = false; #endif @@ -1797,13 +1816,44 @@ namespace libtorrent assert(m_ul_bandwidth_quota.given >= 0); assert(m_dl_bandwidth_quota.given >= 0); + int ul_used = 0; + int dl_used = 0; + for (peer_iterator i = m_connections.begin(); + i != m_connections.end(); ++i) + { + ul_used += i->second->m_ul_bandwidth_quota.used; + dl_used += i->second->m_dl_bandwidth_quota.used; + } + + m_excess_ul += ul_used - m_ul_bandwidth_quota.given; + m_excess_dl += dl_used - m_dl_bandwidth_quota.given; + + m_excess_ul = std::max(m_excess_ul, 0); + m_excess_dl = std::max(m_excess_dl, 0); + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) +/* for (peer_iterator i = m_connections.begin(); + i != m_connections.end(); ++i) + { + (*m_ses.m_logger) << i->second->remote() << " (" << i->second->m_dl_bandwidth_quota.given << ", " + << i->second->m_dl_bandwidth_quota.used << ") "; + } +*/ + (*m_ses.m_logger) << m_excess_ul << " " << m_excess_dl << "\n"; +#endif + + int ul_to_distribute = int((m_ul_bandwidth_quota.given + - m_excess_ul) * 1.3f); + int dl_to_distribute = int((m_dl_bandwidth_quota.given + - m_excess_dl) * 1.3f); + // distribute allowed upload among the peers - allocate_resources(m_ul_bandwidth_quota.given + allocate_resources(ul_to_distribute , m_connections , &peer_connection::m_ul_bandwidth_quota); // distribute allowed download among the peers - allocate_resources(m_dl_bandwidth_quota.given + allocate_resources(dl_to_distribute , m_connections , &peer_connection::m_dl_bandwidth_quota);