move the DHT rate limiter into the dht_tracker class and remove the rate_limited_udp_socket type. This further simplifies the udp socket (preparing for moving it into the listen_socket structure)

This commit is contained in:
arvidn 2016-01-17 15:09:27 -05:00
parent e2392017bc
commit 297b8943d0
10 changed files with 59 additions and 72 deletions

View File

@ -635,7 +635,9 @@ namespace libtorrent
void update_connection_speed(); void update_connection_speed();
void update_queued_disk_bytes(); void update_queued_disk_bytes();
void update_alert_queue_size(); void update_alert_queue_size();
#ifndef TORRENT_NO_DEPRECATE
void update_dht_upload_rate_limit(); void update_dht_upload_rate_limit();
#endif
void update_disk_threads(); void update_disk_threads();
void update_network_threads(); void update_network_threads();
void update_cache_buffer_chunk_size(); void update_cache_buffer_chunk_size();
@ -1031,7 +1033,7 @@ namespace libtorrent
// but for the udp port used by the DHT. // but for the udp port used by the DHT.
int m_external_udp_port; int m_external_udp_port;
rate_limited_udp_socket m_udp_socket; udp_socket m_udp_socket;
libtorrent::utp_socket_manager m_utp_socket_manager; libtorrent::utp_socket_manager m_utp_socket_manager;
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL

View File

@ -71,7 +71,7 @@ namespace libtorrent { namespace dht
, udp_socket_observer , udp_socket_observer
, boost::enable_shared_from_this<dht_tracker> , boost::enable_shared_from_this<dht_tracker>
{ {
dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock dht_tracker(dht_observer* observer, udp_socket& sock
, dht_settings const& settings, counters& cnt , dht_settings const& settings, counters& cnt
, dht_storage_constructor_type storage_constructor , dht_storage_constructor_type storage_constructor
, entry const& state); , entry const& state);
@ -155,7 +155,7 @@ namespace libtorrent { namespace dht
counters& m_counters; counters& m_counters;
node m_dht; node m_dht;
rate_limited_udp_socket& m_sock; udp_socket& m_sock;
dht_logger* m_log; dht_logger* m_log;
std::vector<char> m_send_buf; std::vector<char> m_send_buf;
@ -170,6 +170,10 @@ namespace libtorrent { namespace dht
// used to resolve hostnames for nodes // used to resolve hostnames for nodes
udp::resolver m_host_resolver; udp::resolver m_host_resolver;
// state for the send rate limit
int m_send_quota;
time_point m_last_tick;
}; };
}} }}

View File

@ -1406,6 +1406,7 @@ namespace libtorrent
, block_ratelimit(5) , block_ratelimit(5)
, read_only(false) , read_only(false)
, item_lifetime(0) , item_lifetime(0)
, upload_rate_limit(8000)
{} {}
// the maximum number of peers to send in a reply to ``get_peers`` // the maximum number of peers to send in a reply to ``get_peers``
@ -1507,6 +1508,11 @@ namespace libtorrent
// the number of seconds a immutable/mutable item will be expired. // the number of seconds a immutable/mutable item will be expired.
// default is 0, means never expires. // default is 0, means never expires.
int item_lifetime; int item_lifetime;
// the number of bytes per second (on average) the DHT is allowed to send.
// If the incoming requests causes to many bytes to be sent in responses,
// incoming requests will be dropped until the quota has been replenished.
int upload_rate_limit;
}; };

View File

@ -1248,11 +1248,15 @@ namespace libtorrent
deprecated4, deprecated4,
#endif #endif
#ifndef TORRENT_NO_DEPRECATE
// ``dht_upload_rate_limit`` sets the rate limit on the DHT. This is // ``dht_upload_rate_limit`` sets the rate limit on the DHT. This is
// specified in bytes per second and defaults to 4000. For busy boxes // specified in bytes per second and defaults to 4000. For busy boxes
// with lots of torrents that requires more DHT traffic, this should // with lots of torrents that requires more DHT traffic, this should
// be raised. // be raised.
dht_upload_rate_limit, dht_upload_rate_limit,
#else
deprecated7,
#endif
// ``unchoke_slots_limit`` is the max number of unchoked peers in the // ``unchoke_slots_limit`` is the max number of unchoked peers in the
// session. The number of unchoke slots may be ignored depending on // session. The number of unchoke slots may be ignored depending on

View File

@ -309,21 +309,6 @@ namespace libtorrent
int m_outstanding_socks; int m_outstanding_socks;
#endif #endif
}; };
struct rate_limited_udp_socket : public udp_socket
{
rate_limited_udp_socket(io_service& ios);
void set_rate_limit(int limit) { m_rate_limit = limit; }
bool send(udp::endpoint const& ep, char const* p, int len
, error_code& ec, int flags = 0);
bool has_quota();
private:
int m_rate_limit;
int m_quota;
time_point m_last_tick;
};
} }
#endif #endif

View File

@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/version.hpp" #include "libtorrent/version.hpp"
#include "libtorrent/time.hpp" #include "libtorrent/time.hpp"
#include "libtorrent/performance_counters.hpp" // for counters #include "libtorrent/performance_counters.hpp" // for counters
#include "libtorrent/aux_/time.hpp"
#include "libtorrent/aux_/disable_warnings_push.hpp" #include "libtorrent/aux_/disable_warnings_push.hpp"
@ -89,7 +90,7 @@ namespace libtorrent { namespace dht
// class that puts the networking and the kademlia node in a single // class that puts the networking and the kademlia node in a single
// unit and connecting them together. // unit and connecting them together.
dht_tracker::dht_tracker(dht_observer* observer dht_tracker::dht_tracker(dht_observer* observer
, rate_limited_udp_socket& sock , udp_socket& sock
, dht_settings const& settings , dht_settings const& settings
, counters& cnt , counters& cnt
, dht_storage_constructor_type storage_constructor , dht_storage_constructor_type storage_constructor
@ -104,6 +105,8 @@ namespace libtorrent { namespace dht
, m_settings(settings) , m_settings(settings)
, m_abort(false) , m_abort(false)
, m_host_resolver(sock.get_io_service()) , m_host_resolver(sock.get_io_service())
, m_send_quota(settings.upload_rate_limit)
, m_last_tick(aux::time_now())
{ {
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s" m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s"
@ -406,10 +409,18 @@ namespace libtorrent { namespace dht
bool dht_tracker::has_quota() bool dht_tracker::has_quota()
{ {
return m_sock.has_quota(); time_point now = clock_type::now();
time_duration delta = now - m_last_tick;
m_last_tick = now;
// add any new quota we've accrued since last time
m_send_quota += boost::uint64_t(m_settings.upload_rate_limit)
* total_microseconds(delta) / 1000000;
return m_send_quota > 0;
} }
bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr, int send_flags) // TODO: 4 do we need the flags here?
bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr
, int send_flags)
{ {
using libtorrent::bencode; using libtorrent::bencode;
using libtorrent::entry; using libtorrent::entry;
@ -420,10 +431,26 @@ namespace libtorrent { namespace dht
m_send_buf.clear(); m_send_buf.clear();
bencode(std::back_inserter(m_send_buf), e); bencode(std::back_inserter(m_send_buf), e);
error_code ec;
bool ret = m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, send_flags); // update the quota. We won't prevent the packet to be sent if we exceed
if (!ret || ec) // the quota, we'll just (potentially) block the next incoming request.
time_point const now = clock_type::now();
time_duration const delta = now - m_last_tick;
m_last_tick = now;
// add any new quota we've accrued since last time
m_send_quota += boost::uint64_t(m_settings.upload_rate_limit)
* total_microseconds(delta) / 1000000;
// allow 3 seconds worth of burst
if (m_send_quota > 3 * m_settings.upload_rate_limit)
m_send_quota = 3 * m_settings.upload_rate_limit;
m_send_quota -= m_send_buf.size();
error_code ec;
m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, send_flags);
if (ec)
{ {
m_counters.inc_stats_counter(counters::dht_messages_out_dropped); m_counters.inc_stats_counter(counters::dht_messages_out_dropped);
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING

View File

@ -204,10 +204,6 @@ namespace libtorrent
// unchoke many peers // unchoke many peers
set.set_int(settings_pack::unchoke_slots_limit, 2000); set.set_int(settings_pack::unchoke_slots_limit, 2000);
// we need more DHT capacity to ping more peers
// candidates before trying to connect
set.set_int(settings_pack::dht_upload_rate_limit, 20000);
// use 1 GB of cache // use 1 GB of cache
set.set_int(settings_pack::cache_size, 32768 * 2); set.set_int(settings_pack::cache_size, 32768 * 2);
set.set_bool(settings_pack::use_read_cache, true); set.set_bool(settings_pack::use_read_cache, true);

View File

@ -438,7 +438,6 @@ namespace aux {
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
m_posting_torrent_updates = false; m_posting_torrent_updates = false;
#endif #endif
m_udp_socket.set_rate_limit(m_settings.get_int(settings_pack::dht_upload_rate_limit));
m_udp_socket.subscribe(&m_utp_socket_manager); m_udp_socket.subscribe(&m_utp_socket_manager);
m_udp_socket.subscribe(this); m_udp_socket.subscribe(this);
@ -6067,10 +6066,15 @@ retry:
|| m_settings.get_int(settings_pack::unchoke_slots_limit) < 0; || m_settings.get_int(settings_pack::unchoke_slots_limit) < 0;
} }
#ifndef TORRENT_NO_DEPRECATE
void session_impl::update_dht_upload_rate_limit() void session_impl::update_dht_upload_rate_limit()
{ {
m_udp_socket.set_rate_limit(m_settings.get_int(settings_pack::dht_upload_rate_limit)); #ifndef TORRENT_DISABLE_DHT
m_dht_settings.upload_rate_limit
= m_settings.get_int(settings_pack::dht_upload_rate_limit);
#endif
} }
#endif
void session_impl::update_disk_threads() void session_impl::update_disk_threads()
{ {

View File

@ -297,7 +297,7 @@ namespace libtorrent
SET(download_rate_limit, 0, &session_impl::update_download_rate), SET(download_rate_limit, 0, &session_impl::update_download_rate),
DEPRECATED_SET(local_upload_rate_limit, 0, &session_impl::update_local_upload_rate), DEPRECATED_SET(local_upload_rate_limit, 0, &session_impl::update_local_upload_rate),
DEPRECATED_SET(local_download_rate_limit, 0, &session_impl::update_local_download_rate), DEPRECATED_SET(local_download_rate_limit, 0, &session_impl::update_local_download_rate),
SET(dht_upload_rate_limit, 4000, &session_impl::update_dht_upload_rate_limit), DEPRECATED_SET(dht_upload_rate_limit, 4000, &session_impl::update_dht_upload_rate_limit),
SET(unchoke_slots_limit, 8, &session_impl::update_unchoke_limit), SET(unchoke_slots_limit, 8, &session_impl::update_unchoke_limit),
DEPRECATED_SET(half_open_limit, 0, 0), DEPRECATED_SET(half_open_limit, 0, 0),
SET(connections_limit, 200, &session_impl::update_connections_limit), SET(connections_limit, 200, &session_impl::update_connections_limit),

View File

@ -1413,44 +1413,3 @@ void udp_socket::drain_queue()
} }
} }
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios)
: udp_socket(ios)
, m_rate_limit(8000)
, m_quota(8000)
, m_last_tick(aux::time_now())
{
}
bool rate_limited_udp_socket::has_quota()
{
time_point now = clock_type::now();
time_duration delta = now - m_last_tick;
m_last_tick = now;
// add any new quota we've accrued since last time
m_quota += boost::uint64_t(m_rate_limit) * total_microseconds(delta) / 1000000;
return m_quota > 0;
}
bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p
, int len, error_code& ec, int flags)
{
time_point now = clock_type::now();
time_duration delta = now - m_last_tick;
m_last_tick = now;
// add any new quota we've accrued since last time
m_quota += boost::uint64_t(m_rate_limit) * total_microseconds(delta) / 1000000;
// allow 3 seconds worth of burst
if (m_quota > 3 * m_rate_limit) m_quota = 3 * m_rate_limit;
// if there's no quota, and it's OK to drop, just
// drop the packet
if (m_quota < 0 && (flags & dont_drop) == 0) return false;
m_quota -= len;
if (m_quota < 0) m_quota = 0;
udp_socket::send(ep, p, len, ec, flags);
return true;
}