From 297b8943d0a9f1ff8ff90cd1369b089b3aa1444d Mon Sep 17 00:00:00 2001 From: arvidn Date: Sun, 17 Jan 2016 15:09:27 -0500 Subject: [PATCH 1/3] move the DHT rate limiter into the dht_tracker class and remove the rate_limited_udp_socket type. This further simplifies the udp socket (preparing for moving it into the listen_socket structure) --- include/libtorrent/aux_/session_impl.hpp | 4 +- include/libtorrent/kademlia/dht_tracker.hpp | 8 +++- include/libtorrent/session_settings.hpp | 6 +++ include/libtorrent/settings_pack.hpp | 4 ++ include/libtorrent/udp_socket.hpp | 15 -------- src/kademlia/dht_tracker.cpp | 39 +++++++++++++++++--- src/session.cpp | 4 -- src/session_impl.cpp | 8 +++- src/settings_pack.cpp | 2 +- src/udp_socket.cpp | 41 --------------------- 10 files changed, 59 insertions(+), 72 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 6ebfbffbc..40747feb4 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -635,7 +635,9 @@ namespace libtorrent void update_connection_speed(); void update_queued_disk_bytes(); void update_alert_queue_size(); +#ifndef TORRENT_NO_DEPRECATE void update_dht_upload_rate_limit(); +#endif void update_disk_threads(); void update_network_threads(); void update_cache_buffer_chunk_size(); @@ -1031,7 +1033,7 @@ namespace libtorrent // but for the udp port used by the DHT. int m_external_udp_port; - rate_limited_udp_socket m_udp_socket; + udp_socket m_udp_socket; libtorrent::utp_socket_manager m_utp_socket_manager; #ifdef TORRENT_USE_OPENSSL diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 7474b6204..e2cc65c38 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -71,7 +71,7 @@ namespace libtorrent { namespace dht , udp_socket_observer , boost::enable_shared_from_this { - dht_tracker(dht_observer* observer, rate_limited_udp_socket& sock + dht_tracker(dht_observer* observer, udp_socket& sock , dht_settings const& settings, counters& cnt , dht_storage_constructor_type storage_constructor , entry const& state); @@ -155,7 +155,7 @@ namespace libtorrent { namespace dht counters& m_counters; node m_dht; - rate_limited_udp_socket& m_sock; + udp_socket& m_sock; dht_logger* m_log; std::vector m_send_buf; @@ -170,6 +170,10 @@ namespace libtorrent { namespace dht // used to resolve hostnames for nodes udp::resolver m_host_resolver; + + // state for the send rate limit + int m_send_quota; + time_point m_last_tick; }; }} diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index a65b73826..e69b8208c 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -1406,6 +1406,7 @@ namespace libtorrent , block_ratelimit(5) , read_only(false) , item_lifetime(0) + , upload_rate_limit(8000) {} // the maximum number of peers to send in a reply to ``get_peers`` @@ -1507,6 +1508,11 @@ namespace libtorrent // the number of seconds a immutable/mutable item will be expired. // default is 0, means never expires. int item_lifetime; + + // the number of bytes per second (on average) the DHT is allowed to send. + // If the incoming requests causes to many bytes to be sent in responses, + // incoming requests will be dropped until the quota has been replenished. + int upload_rate_limit; }; diff --git a/include/libtorrent/settings_pack.hpp b/include/libtorrent/settings_pack.hpp index 9531ecb9c..7575ab887 100644 --- a/include/libtorrent/settings_pack.hpp +++ b/include/libtorrent/settings_pack.hpp @@ -1248,11 +1248,15 @@ namespace libtorrent deprecated4, #endif +#ifndef TORRENT_NO_DEPRECATE // ``dht_upload_rate_limit`` sets the rate limit on the DHT. This is // specified in bytes per second and defaults to 4000. For busy boxes // with lots of torrents that requires more DHT traffic, this should // be raised. dht_upload_rate_limit, +#else + deprecated7, +#endif // ``unchoke_slots_limit`` is the max number of unchoked peers in the // session. The number of unchoke slots may be ignored depending on diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 1b21cde01..4293b46b2 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -309,21 +309,6 @@ namespace libtorrent int m_outstanding_socks; #endif }; - - struct rate_limited_udp_socket : public udp_socket - { - rate_limited_udp_socket(io_service& ios); - void set_rate_limit(int limit) { m_rate_limit = limit; } - bool send(udp::endpoint const& ep, char const* p, int len - , error_code& ec, int flags = 0); - bool has_quota(); - - private: - - int m_rate_limit; - int m_quota; - time_point m_last_tick; - }; } #endif diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 2f8d7e67c..98cf7c5be 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/version.hpp" #include "libtorrent/time.hpp" #include "libtorrent/performance_counters.hpp" // for counters +#include "libtorrent/aux_/time.hpp" #include "libtorrent/aux_/disable_warnings_push.hpp" @@ -89,7 +90,7 @@ namespace libtorrent { namespace dht // class that puts the networking and the kademlia node in a single // unit and connecting them together. dht_tracker::dht_tracker(dht_observer* observer - , rate_limited_udp_socket& sock + , udp_socket& sock , dht_settings const& settings , counters& cnt , dht_storage_constructor_type storage_constructor @@ -104,6 +105,8 @@ namespace libtorrent { namespace dht , m_settings(settings) , m_abort(false) , m_host_resolver(sock.get_io_service()) + , m_send_quota(settings.upload_rate_limit) + , m_last_tick(aux::time_now()) { #ifndef TORRENT_DISABLE_LOGGING m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s" @@ -406,10 +409,18 @@ namespace libtorrent { namespace dht bool dht_tracker::has_quota() { - return m_sock.has_quota(); + time_point now = clock_type::now(); + time_duration delta = now - m_last_tick; + m_last_tick = now; + // add any new quota we've accrued since last time + m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) + * total_microseconds(delta) / 1000000; + return m_send_quota > 0; } - bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr, int send_flags) + // TODO: 4 do we need the flags here? + bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr + , int send_flags) { using libtorrent::bencode; using libtorrent::entry; @@ -420,10 +431,26 @@ namespace libtorrent { namespace dht m_send_buf.clear(); bencode(std::back_inserter(m_send_buf), e); - error_code ec; - bool ret = m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, send_flags); - if (!ret || ec) + // update the quota. We won't prevent the packet to be sent if we exceed + // the quota, we'll just (potentially) block the next incoming request. + time_point const now = clock_type::now(); + time_duration const delta = now - m_last_tick; + m_last_tick = now; + + // add any new quota we've accrued since last time + m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) + * total_microseconds(delta) / 1000000; + + // allow 3 seconds worth of burst + if (m_send_quota > 3 * m_settings.upload_rate_limit) + m_send_quota = 3 * m_settings.upload_rate_limit; + + m_send_quota -= m_send_buf.size(); + + error_code ec; + m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, send_flags); + if (ec) { m_counters.inc_stats_counter(counters::dht_messages_out_dropped); #ifndef TORRENT_DISABLE_LOGGING diff --git a/src/session.cpp b/src/session.cpp index adacc0eb5..f89724a69 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -204,10 +204,6 @@ namespace libtorrent // unchoke many peers set.set_int(settings_pack::unchoke_slots_limit, 2000); - // we need more DHT capacity to ping more peers - // candidates before trying to connect - set.set_int(settings_pack::dht_upload_rate_limit, 20000); - // use 1 GB of cache set.set_int(settings_pack::cache_size, 32768 * 2); set.set_bool(settings_pack::use_read_cache, true); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index fc128f79c..9b9fc0ab0 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -438,7 +438,6 @@ namespace aux { #if TORRENT_USE_ASSERTS m_posting_torrent_updates = false; #endif - m_udp_socket.set_rate_limit(m_settings.get_int(settings_pack::dht_upload_rate_limit)); m_udp_socket.subscribe(&m_utp_socket_manager); m_udp_socket.subscribe(this); @@ -6067,10 +6066,15 @@ retry: || m_settings.get_int(settings_pack::unchoke_slots_limit) < 0; } +#ifndef TORRENT_NO_DEPRECATE void session_impl::update_dht_upload_rate_limit() { - m_udp_socket.set_rate_limit(m_settings.get_int(settings_pack::dht_upload_rate_limit)); +#ifndef TORRENT_DISABLE_DHT + m_dht_settings.upload_rate_limit + = m_settings.get_int(settings_pack::dht_upload_rate_limit); +#endif } +#endif void session_impl::update_disk_threads() { diff --git a/src/settings_pack.cpp b/src/settings_pack.cpp index 9d34fa9f6..dabcb5e8b 100644 --- a/src/settings_pack.cpp +++ b/src/settings_pack.cpp @@ -297,7 +297,7 @@ namespace libtorrent SET(download_rate_limit, 0, &session_impl::update_download_rate), DEPRECATED_SET(local_upload_rate_limit, 0, &session_impl::update_local_upload_rate), DEPRECATED_SET(local_download_rate_limit, 0, &session_impl::update_local_download_rate), - SET(dht_upload_rate_limit, 4000, &session_impl::update_dht_upload_rate_limit), + DEPRECATED_SET(dht_upload_rate_limit, 4000, &session_impl::update_dht_upload_rate_limit), SET(unchoke_slots_limit, 8, &session_impl::update_unchoke_limit), DEPRECATED_SET(half_open_limit, 0, 0), SET(connections_limit, 200, &session_impl::update_connections_limit), diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index c3726b5ca..92586fc4e 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -1413,44 +1413,3 @@ void udp_socket::drain_queue() } } -rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios) - : udp_socket(ios) - , m_rate_limit(8000) - , m_quota(8000) - , m_last_tick(aux::time_now()) -{ -} - -bool rate_limited_udp_socket::has_quota() -{ - time_point now = clock_type::now(); - time_duration delta = now - m_last_tick; - m_last_tick = now; - // add any new quota we've accrued since last time - m_quota += boost::uint64_t(m_rate_limit) * total_microseconds(delta) / 1000000; - return m_quota > 0; -} - -bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p - , int len, error_code& ec, int flags) -{ - time_point now = clock_type::now(); - time_duration delta = now - m_last_tick; - m_last_tick = now; - - // add any new quota we've accrued since last time - m_quota += boost::uint64_t(m_rate_limit) * total_microseconds(delta) / 1000000; - - // allow 3 seconds worth of burst - if (m_quota > 3 * m_rate_limit) m_quota = 3 * m_rate_limit; - - // if there's no quota, and it's OK to drop, just - // drop the packet - if (m_quota < 0 && (flags & dont_drop) == 0) return false; - - m_quota -= len; - if (m_quota < 0) m_quota = 0; - udp_socket::send(ep, p, len, ec, flags); - return true; -} - From 06b52f1421bd0d21597a9cf551aaf6c74df597bc Mon Sep 17 00:00:00 2001 From: arvidn Date: Mon, 18 Jan 2016 00:07:21 -0500 Subject: [PATCH 2/3] some cleanup of the dht_socket_interface and fix tests and simulations to build with the dht disabled --- include/libtorrent/kademlia/dht_tracker.hpp | 5 ++--- include/libtorrent/kademlia/node.hpp | 2 +- include/libtorrent/udp_socket.hpp | 7 +++---- simulation/setup_dht.cpp | 5 ++++- simulation/test_dht.cpp | 3 +++ simulation/test_dht_storage.cpp | 6 ++++++ src/kademlia/dht_tracker.cpp | 6 ++---- src/kademlia/node.cpp | 4 ++-- src/kademlia/rpc_manager.cpp | 4 ++-- test/test_dht.cpp | 6 +++--- test/test_direct_dht.cpp | 8 ++++---- 11 files changed, 32 insertions(+), 24 deletions(-) diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index e2cc65c38..c86cf9421 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -144,9 +144,8 @@ namespace libtorrent { namespace dht void refresh_key(error_code const& e); // implements udp_socket_interface - virtual bool has_quota(); - virtual bool send_packet(libtorrent::entry& e, udp::endpoint const& addr - , int send_flags); + virtual bool has_quota() TORRENT_OVERRIDE; + virtual bool send_packet(libtorrent::entry& e, udp::endpoint const& addr) TORRENT_OVERRIDE; // this is the bdecode_node DHT messages are parsed into. It's a member // in order to avoid having to deallocate and re-allocate it for every diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index c55503e98..8fa0ace9b 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -91,7 +91,7 @@ public: struct udp_socket_interface { virtual bool has_quota() = 0; - virtual bool send_packet(entry& e, udp::endpoint const& addr, int flags) = 0; + virtual bool send_packet(entry& e, udp::endpoint const& addr) = 0; protected: ~udp_socket_interface() {} }; diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 4293b46b2..54dc6d755 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -73,10 +73,9 @@ namespace libtorrent ~udp_socket(); enum flags_t { - dont_drop = 1 - , peer_connection = 2 - , tracker_connection = 4 - , dont_queue = 8 + peer_connection = 1 + , tracker_connection = 2 + , dont_queue = 4 }; bool is_open() const diff --git a/simulation/setup_dht.cpp b/simulation/setup_dht.cpp index 64bce3385..6af902673 100644 --- a/simulation/setup_dht.cpp +++ b/simulation/setup_dht.cpp @@ -52,6 +52,8 @@ namespace lt = libtorrent; using namespace sim; using namespace libtorrent; +#ifndef TORRENT_DISABLE_DHT + namespace { lt::time_point start_time; @@ -155,7 +157,7 @@ struct dht_node final : lt::dht::udp_socket_interface } bool has_quota() override { return true; } - bool send_packet(entry& e, udp::endpoint const& addr, int flags) override + bool send_packet(entry& e, udp::endpoint const& addr) override { // since the simulaton is single threaded, we can get away with allocating // just a single send buffer @@ -329,4 +331,5 @@ void dht_network::stop() for (auto& n : m_nodes) n.stop(); } +#endif // TORRENT_DISABLE_DHT diff --git a/simulation/test_dht.cpp b/simulation/test_dht.cpp index b671ece3e..85e20665c 100644 --- a/simulation/test_dht.cpp +++ b/simulation/test_dht.cpp @@ -49,6 +49,7 @@ namespace lt = libtorrent; TORRENT_TEST(dht_bootstrap) { +#ifndef TORRENT_DISABLE_DHT sim::default_config cfg; sim::simulation sim{cfg}; @@ -124,5 +125,7 @@ TORRENT_TEST(dht_bootstrap) sim.run(); +#endif // TORRENT_DISABLE_DHT + } diff --git a/simulation/test_dht_storage.cpp b/simulation/test_dht_storage.cpp index f45f2f4f3..e0b947992 100644 --- a/simulation/test_dht_storage.cpp +++ b/simulation/test_dht_storage.cpp @@ -52,6 +52,8 @@ using namespace sim::asio; using sim::simulation; using sim::default_config; +#ifndef TORRENT_DISABLE_DHT + namespace { dht_settings test_settings() { @@ -98,8 +100,11 @@ void test_expiration(high_resolution_clock::duration const& expiry_time sim.run(ec); } +#endif // TORRENT_DISABLE_DHT + TORRENT_TEST(dht_storage_counters) { +#ifndef TORRENT_DISABLE_DHT dht_settings sett = test_settings(); boost::shared_ptr s(dht_default_storage_constructor(node_id(0), sett)); @@ -151,5 +156,6 @@ TORRENT_TEST(dht_storage_counters) c.immutable_data = 0; c.mutable_data = 0; test_expiration(hours(1), s, c); // test expiration of everything after 3 hours +#endif // TORRENT_DISABLE_DHT } diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 98cf7c5be..c626baec6 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -418,9 +418,7 @@ namespace libtorrent { namespace dht return m_send_quota > 0; } - // TODO: 4 do we need the flags here? - bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr - , int send_flags) + bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr) { using libtorrent::bencode; using libtorrent::entry; @@ -449,7 +447,7 @@ namespace libtorrent { namespace dht m_send_quota -= m_send_buf.size(); error_code ec; - m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, send_flags); + m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, 0); if (ec) { m_counters.inc_stats_counter(counters::dht_messages_out_dropped); diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index efeb77016..9b017fcc6 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -252,7 +252,7 @@ void node::incoming(msg const& m) // want to open up a magnification opportunity // entry e; // incoming_error(e, "missing 'y' entry"); -// m_sock.send_packet(e, m.addr, 0); +// m_sock.send_packet(e, m.addr); return; } @@ -305,7 +305,7 @@ void node::incoming(msg const& m) entry e; incoming_request(m, e); - m_sock->send_packet(e, m.addr, 0); + m_sock->send_packet(e, m.addr); break; } case 'e': diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 63aaa4a8d..7d58c857a 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -297,7 +297,7 @@ bool rpc_manager::incoming(msg const& m, node_id* id) // attack. // entry e; // incoming_error(e, "invalid transaction id"); -// m_sock->send_packet(e, m.addr, 0); +// m_sock->send_packet(e, m.addr); return false; } @@ -473,7 +473,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr , print_endpoint(target_addr).c_str()); #endif - if (m_sock->send_packet(e, target_addr, 1)) + if (m_sock->send_packet(e, target_addr)) { m_transactions.insert(std::make_pair(tid, o)); #if TORRENT_USE_ASSERTS diff --git a/test/test_dht.cpp b/test/test_dht.cpp index f7afa9775..d95a8e4e8 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -95,10 +95,10 @@ static void nop(void* userdata, libtorrent::dht::node_entry const& n) {} std::list > g_sent_packets; -struct mock_socket : udp_socket_interface +struct mock_socket TORRENT_FINAL : udp_socket_interface { - bool has_quota() { return true; } - bool send_packet(entry& msg, udp::endpoint const& ep, int flags) + bool has_quota() TORRENT_OVERRIDE { return true; } + bool send_packet(entry& msg, udp::endpoint const& ep) TORRENT_OVERRIDE { // TODO: ideally the mock_socket would contain this queue of packets, to // make tests independent diff --git a/test/test_direct_dht.cpp b/test/test_direct_dht.cpp index c0887d067..206be5c65 100644 --- a/test/test_direct_dht.cpp +++ b/test/test_direct_dht.cpp @@ -32,7 +32,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "test.hpp" -#ifndef TORRENT_DISABLE_EXTENSIONS +#if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT #include "libtorrent/config.hpp" #include "libtorrent/session.hpp" @@ -85,11 +85,11 @@ dht_direct_response_alert* get_direct_response(lt::session& ses) } -#endif // #ifndef TORRENT_DISABLE_EXTENSIONS +#endif // #if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT TORRENT_TEST(direct_dht_request) { -#ifndef TORRENT_DISABLE_EXTENSIONS +#if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT settings_pack sp; sp.set_bool(settings_pack::enable_lsd, false); sp.set_bool(settings_pack::enable_natpmp, false); @@ -134,5 +134,5 @@ TORRENT_TEST(direct_dht_request) TEST_EQUAL(ra->response().type(), bdecode_node::none_t); TEST_EQUAL(ra->userdata, (void*)123456); } -#endif // #ifndef TORRENT_DISABLE_EXTENSIONS +#endif // #if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT } From d5203c67d9fa1724b33cd56db7aea2e8454ebf14 Mon Sep 17 00:00:00 2001 From: arvidn Date: Mon, 18 Jan 2016 02:21:27 -0500 Subject: [PATCH 3/3] add DHT rate limit test (simulation). introduce a new counter for dropped incoming dht messages. --- include/libtorrent/kademlia/dht_tracker.hpp | 2 +- include/libtorrent/kademlia/dos_blocker.hpp | 2 +- include/libtorrent/performance_counters.hpp | 1 + simulation/Jamfile | 1 + simulation/test_dht_rate_limit.cpp | 185 ++++++++++++++++++++ src/kademlia/dht_tracker.cpp | 26 +-- src/kademlia/node.cpp | 9 +- src/session_stats.cpp | 10 ++ 8 files changed, 220 insertions(+), 16 deletions(-) create mode 100644 simulation/test_dht_rate_limit.cpp diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index c86cf9421..d3e35cf76 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -66,7 +66,7 @@ namespace libtorrent { namespace dht { struct dht_tracker; - struct dht_tracker TORRENT_FINAL + struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL : udp_socket_interface , udp_socket_observer , boost::enable_shared_from_this diff --git a/include/libtorrent/kademlia/dos_blocker.hpp b/include/libtorrent/kademlia/dos_blocker.hpp index 830950202..44c0f8008 100644 --- a/include/libtorrent/kademlia/dos_blocker.hpp +++ b/include/libtorrent/kademlia/dos_blocker.hpp @@ -67,7 +67,7 @@ namespace libtorrent { namespace dht } private: - + // used to ignore abusive dht nodes struct node_ban_entry { diff --git a/include/libtorrent/performance_counters.hpp b/include/libtorrent/performance_counters.hpp index 09c8d0278..e3ee9aafe 100644 --- a/include/libtorrent/performance_counters.hpp +++ b/include/libtorrent/performance_counters.hpp @@ -217,6 +217,7 @@ namespace libtorrent recv_redundant_bytes, dht_messages_in, + dht_messages_in_dropped, dht_messages_out, dht_messages_out_dropped, dht_bytes_in, diff --git a/simulation/Jamfile b/simulation/Jamfile index 3fe4f3b0a..2c56e4852 100644 --- a/simulation/Jamfile +++ b/simulation/Jamfile @@ -37,5 +37,6 @@ alias libtorrent-sims : [ run test_trackers_extension.cpp ] [ run test_tracker.cpp ] [ run test_ip_filter.cpp ] + [ run test_dht_rate_limit.cpp ] ; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp new file mode 100644 index 000000000..fdf1be94c --- /dev/null +++ b/simulation/test_dht_rate_limit.cpp @@ -0,0 +1,185 @@ +/* + +Copyright (c) 2015, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#if !defined TORRENT_DISABLE_DHT + +#include "test.hpp" + +#include "simulator/simulator.hpp" + +#include "libtorrent/udp_socket.hpp" +#include "libtorrent/kademlia/dht_tracker.hpp" +#include "libtorrent/performance_counters.hpp" +#include "libtorrent/entry.hpp" +#include "libtorrent/session_settings.hpp" +#include "libtorrent/kademlia/dht_observer.hpp" + +#include +#include + +using namespace libtorrent; +namespace lt = libtorrent; +using namespace sim; + +struct obs : dht::dht_observer +{ + virtual void set_external_address(address const& addr + , address const& source) TORRENT_OVERRIDE + {} + virtual address external_address() TORRENT_OVERRIDE + { return address_v4::from_string("40.30.20.10"); } + virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE {} + virtual void outgoing_get_peers(sha1_hash const& target + , sha1_hash const& sent_target, udp::endpoint const& ep) TORRENT_OVERRIDE {} + virtual void announce(sha1_hash const& ih, address const& addr, int port) TORRENT_OVERRIDE {} + virtual void log(dht_logger::module_t l, char const* fmt, ...) TORRENT_OVERRIDE + { + va_list v; + va_start(v, fmt); + vprintf(fmt, v); + va_end(v); + puts("\n"); + } + virtual void log_packet(message_direction_t dir, char const* pkt, int len + , udp::endpoint node) TORRENT_OVERRIDE {} + virtual bool on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response) TORRENT_OVERRIDE { return false; } +}; + +#endif // #if !defined TORRENT_DISABLE_DHT + +TORRENT_TEST(dht_rate_limit) +{ +#if !defined TORRENT_DISABLE_DHT + + default_config cfg; + simulation sim(cfg); + asio::io_service dht_ios(sim, address_v4::from_string("40.30.20.10")); + + // receiver (the DHT under test) + lt::udp_socket sock(dht_ios); + obs o; + error_code ec; + sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec); + dht_settings dhtsett; + dhtsett.block_ratelimit = 100000; // disable the DOS blocker + dhtsett.ignore_dark_internet = false; + dhtsett.upload_rate_limit = 400; + float const target_upload_rate = 400; + int const num_packets = 2000; + + counters cnt; + entry state; + boost::shared_ptr dht = boost::make_shared( + &o, sock, dhtsett, cnt, dht::dht_default_storage_constructor, state); + sock.subscribe(dht.get()); + + // sender + int num_packets_sent = 0; + asio::io_service sender_ios(sim, address_v4::from_string("10.20.30.40")); + udp::socket sender_sock(sender_ios); + sender_sock.open(udp::v4()); + sender_sock.bind(udp::endpoint(address_v4(), 4444)); + sender_sock.io_control(udp::socket::non_blocking_io(true)); + asio::high_resolution_timer timer(sender_ios); + std::function sender_tick = [&](error_code const& ec) + { + if (num_packets_sent == num_packets) + { + // we're done. shut down (a second from now, to let the dust settle) + timer.expires_from_now(chrono::seconds(1)); + timer.async_wait([&](error_code const& ec) + { + dht->stop(); + sock.unsubscribe(dht.get()); + sender_sock.close(); + sock.close(); + }); + return; + } + + char const packet[] = "d1:ad2:id20:ababababababababababe1:y1:q1:q4:pinge"; + sender_sock.send_to(asio::const_buffers_1(packet, sizeof(packet)-1) + , udp::endpoint(address_v4::from_string("40.30.20.10"), 8888)); + ++num_packets_sent; + + timer.expires_from_now(chrono::milliseconds(10)); + timer.async_wait(sender_tick); + }; + timer.expires_from_now(chrono::milliseconds(10)); + timer.async_wait(sender_tick); + + udp::endpoint from; + int num_bytes_received = 0; + int num_packets_received = 0; + char buffer[1500]; + std::function on_receive + = [&](error_code const& ec, std::size_t bytes) + { + if (ec) return; + + num_bytes_received += bytes; + ++num_packets_received; + + sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer)) + , from, on_receive); + }; + sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer)) + , from, on_receive); + + // run simulation + lt::clock_type::time_point start = lt::clock_type::now(); + sim.run(); + lt::clock_type::time_point end = lt::clock_type::now(); + + // subtract one target_upload_rate here, since we initialize the quota to one + // full second worth of bandwidth + float const average_upload_rate = (num_bytes_received - target_upload_rate) + / (duration_cast(end - start).count() * 0.001f); + + printf("send %d packets. received %d packets (%d bytes). average rate: %f (target: %f)\n" + , num_packets_sent, num_packets_received, num_bytes_received + , average_upload_rate, target_upload_rate); + + // the actual upload rate should be within 5% of the target + TEST_CHECK(std::abs(average_upload_rate - target_upload_rate) < target_upload_rate * 0.05); + + TEST_EQUAL(cnt[counters::dht_messages_in], num_packets); + + // the number of dropped packets + the number of received pings, should equal + // exactly the number of packets we sent + TEST_EQUAL(cnt[counters::dht_messages_in_dropped] + + cnt[counters::dht_ping_in], num_packets); + +#endif // #if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT +} + diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index c626baec6..529d0d516 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -108,6 +108,8 @@ namespace libtorrent { namespace dht , m_send_quota(settings.upload_rate_limit) , m_last_tick(aux::time_now()) { + m_blocker.set_block_timer(m_settings.block_timeout); + m_blocker.set_rate_limit(m_settings.block_ratelimit); #ifndef TORRENT_DISABLE_LOGGING m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s" , to_hex(m_dht.nid().to_string()).c_str()); @@ -319,11 +321,17 @@ namespace libtorrent { namespace dht int num = sizeof(class_a)/sizeof(class_a[0]); if (std::find(class_a, class_a + num, b[0]) != class_a + num) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); return true; + } } if (!m_blocker.incoming(ep.address(), clock_type::now(), m_log)) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); return true; + } using libtorrent::entry; using libtorrent::bdecode; @@ -335,6 +343,7 @@ namespace libtorrent { namespace dht int ret = bdecode(buf, buf + size, m_msg, err, &pos, 10, 500); if (ret != 0) { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); #ifndef TORRENT_DISABLE_LOGGING m_log->log_packet(dht_logger::incoming_message, buf, size, ep); #endif @@ -412,9 +421,15 @@ namespace libtorrent { namespace dht time_point now = clock_type::now(); time_duration delta = now - m_last_tick; m_last_tick = now; + // add any new quota we've accrued since last time m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) * total_microseconds(delta) / 1000000; + + // allow 3 seconds worth of burst + if (m_send_quota > 3 * m_settings.upload_rate_limit) + m_send_quota = 3 * m_settings.upload_rate_limit; + return m_send_quota > 0; } @@ -432,17 +447,6 @@ namespace libtorrent { namespace dht // update the quota. We won't prevent the packet to be sent if we exceed // the quota, we'll just (potentially) block the next incoming request. - time_point const now = clock_type::now(); - time_duration const delta = now - m_last_tick; - m_last_tick = now; - - // add any new quota we've accrued since last time - m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) - * total_microseconds(delta) / 1000000; - - // allow 3 seconds worth of burst - if (m_send_quota > 3 * m_settings.upload_rate_limit) - m_send_quota = 3 * m_settings.upload_rate_limit; m_send_quota -= m_send_buf.size(); diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 9b017fcc6..debd81c9d 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -303,6 +303,12 @@ void node::incoming(msg const& m) // responds to 'query' messages that it receives. if (m_settings.read_only) break; + if (!m_sock->has_quota()) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); + return; + } + entry e; incoming_request(m, e); m_sock->send_packet(e, m.addr); @@ -775,9 +781,6 @@ void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes) // build response void node::incoming_request(msg const& m, entry& e) { - if (!m_sock->has_quota()) - return; - e = entry(entry::dictionary_t); e["y"] = "r"; e["t"] = m.message.dict_find_string_value("t"); diff --git a/src/session_stats.cpp b/src/session_stats.cpp index 8bcf801a2..d35219d43 100644 --- a/src/session_stats.cpp +++ b/src/session_stats.cpp @@ -422,6 +422,16 @@ namespace libtorrent METRIC(dht, dht_messages_in) METRIC(dht, dht_messages_out) + // the number of incoming DHT requests that were dropped. There are a few + // different reasons why incoming DHT packets may be dropped: + // + // 1. there wasn't enough send quota to respond to them. + // 2. the Denial of service logic kicked in, blocking the peer + // 3. ignore_dark_internet is enabled, and the packet came from a + // non-public IP address + // 4. the bencoding of the message was invalid + METRIC(dht, dht_messages_in_dropped) + // the number of outgoing messages that failed to be // sent METRIC(dht, dht_messages_out_dropped)