diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index c86cf9421..d3e35cf76 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -66,7 +66,7 @@ namespace libtorrent { namespace dht { struct dht_tracker; - struct dht_tracker TORRENT_FINAL + struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL : udp_socket_interface , udp_socket_observer , boost::enable_shared_from_this diff --git a/include/libtorrent/kademlia/dos_blocker.hpp b/include/libtorrent/kademlia/dos_blocker.hpp index 830950202..44c0f8008 100644 --- a/include/libtorrent/kademlia/dos_blocker.hpp +++ b/include/libtorrent/kademlia/dos_blocker.hpp @@ -67,7 +67,7 @@ namespace libtorrent { namespace dht } private: - + // used to ignore abusive dht nodes struct node_ban_entry { diff --git a/include/libtorrent/performance_counters.hpp b/include/libtorrent/performance_counters.hpp index 09c8d0278..e3ee9aafe 100644 --- a/include/libtorrent/performance_counters.hpp +++ b/include/libtorrent/performance_counters.hpp @@ -217,6 +217,7 @@ namespace libtorrent recv_redundant_bytes, dht_messages_in, + dht_messages_in_dropped, dht_messages_out, dht_messages_out_dropped, dht_bytes_in, diff --git a/simulation/Jamfile b/simulation/Jamfile index 3fe4f3b0a..2c56e4852 100644 --- a/simulation/Jamfile +++ b/simulation/Jamfile @@ -37,5 +37,6 @@ alias libtorrent-sims : [ run test_trackers_extension.cpp ] [ run test_tracker.cpp ] [ run test_ip_filter.cpp ] + [ run test_dht_rate_limit.cpp ] ; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp new file mode 100644 index 000000000..fdf1be94c --- /dev/null +++ b/simulation/test_dht_rate_limit.cpp @@ -0,0 +1,185 @@ +/* + +Copyright (c) 2015, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#if !defined TORRENT_DISABLE_DHT + +#include "test.hpp" + +#include "simulator/simulator.hpp" + +#include "libtorrent/udp_socket.hpp" +#include "libtorrent/kademlia/dht_tracker.hpp" +#include "libtorrent/performance_counters.hpp" +#include "libtorrent/entry.hpp" +#include "libtorrent/session_settings.hpp" +#include "libtorrent/kademlia/dht_observer.hpp" + +#include +#include + +using namespace libtorrent; +namespace lt = libtorrent; +using namespace sim; + +struct obs : dht::dht_observer +{ + virtual void set_external_address(address const& addr + , address const& source) TORRENT_OVERRIDE + {} + virtual address external_address() TORRENT_OVERRIDE + { return address_v4::from_string("40.30.20.10"); } + virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE {} + virtual void outgoing_get_peers(sha1_hash const& target + , sha1_hash const& sent_target, udp::endpoint const& ep) TORRENT_OVERRIDE {} + virtual void announce(sha1_hash const& ih, address const& addr, int port) TORRENT_OVERRIDE {} + virtual void log(dht_logger::module_t l, char const* fmt, ...) TORRENT_OVERRIDE + { + va_list v; + va_start(v, fmt); + vprintf(fmt, v); + va_end(v); + puts("\n"); + } + virtual void log_packet(message_direction_t dir, char const* pkt, int len + , udp::endpoint node) TORRENT_OVERRIDE {} + virtual bool on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response) TORRENT_OVERRIDE { return false; } +}; + +#endif // #if !defined TORRENT_DISABLE_DHT + +TORRENT_TEST(dht_rate_limit) +{ +#if !defined TORRENT_DISABLE_DHT + + default_config cfg; + simulation sim(cfg); + asio::io_service dht_ios(sim, address_v4::from_string("40.30.20.10")); + + // receiver (the DHT under test) + lt::udp_socket sock(dht_ios); + obs o; + error_code ec; + sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec); + dht_settings dhtsett; + dhtsett.block_ratelimit = 100000; // disable the DOS blocker + dhtsett.ignore_dark_internet = false; + dhtsett.upload_rate_limit = 400; + float const target_upload_rate = 400; + int const num_packets = 2000; + + counters cnt; + entry state; + boost::shared_ptr dht = boost::make_shared( + &o, sock, dhtsett, cnt, dht::dht_default_storage_constructor, state); + sock.subscribe(dht.get()); + + // sender + int num_packets_sent = 0; + asio::io_service sender_ios(sim, address_v4::from_string("10.20.30.40")); + udp::socket sender_sock(sender_ios); + sender_sock.open(udp::v4()); + sender_sock.bind(udp::endpoint(address_v4(), 4444)); + sender_sock.io_control(udp::socket::non_blocking_io(true)); + asio::high_resolution_timer timer(sender_ios); + std::function sender_tick = [&](error_code const& ec) + { + if (num_packets_sent == num_packets) + { + // we're done. shut down (a second from now, to let the dust settle) + timer.expires_from_now(chrono::seconds(1)); + timer.async_wait([&](error_code const& ec) + { + dht->stop(); + sock.unsubscribe(dht.get()); + sender_sock.close(); + sock.close(); + }); + return; + } + + char const packet[] = "d1:ad2:id20:ababababababababababe1:y1:q1:q4:pinge"; + sender_sock.send_to(asio::const_buffers_1(packet, sizeof(packet)-1) + , udp::endpoint(address_v4::from_string("40.30.20.10"), 8888)); + ++num_packets_sent; + + timer.expires_from_now(chrono::milliseconds(10)); + timer.async_wait(sender_tick); + }; + timer.expires_from_now(chrono::milliseconds(10)); + timer.async_wait(sender_tick); + + udp::endpoint from; + int num_bytes_received = 0; + int num_packets_received = 0; + char buffer[1500]; + std::function on_receive + = [&](error_code const& ec, std::size_t bytes) + { + if (ec) return; + + num_bytes_received += bytes; + ++num_packets_received; + + sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer)) + , from, on_receive); + }; + sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer)) + , from, on_receive); + + // run simulation + lt::clock_type::time_point start = lt::clock_type::now(); + sim.run(); + lt::clock_type::time_point end = lt::clock_type::now(); + + // subtract one target_upload_rate here, since we initialize the quota to one + // full second worth of bandwidth + float const average_upload_rate = (num_bytes_received - target_upload_rate) + / (duration_cast(end - start).count() * 0.001f); + + printf("send %d packets. received %d packets (%d bytes). average rate: %f (target: %f)\n" + , num_packets_sent, num_packets_received, num_bytes_received + , average_upload_rate, target_upload_rate); + + // the actual upload rate should be within 5% of the target + TEST_CHECK(std::abs(average_upload_rate - target_upload_rate) < target_upload_rate * 0.05); + + TEST_EQUAL(cnt[counters::dht_messages_in], num_packets); + + // the number of dropped packets + the number of received pings, should equal + // exactly the number of packets we sent + TEST_EQUAL(cnt[counters::dht_messages_in_dropped] + + cnt[counters::dht_ping_in], num_packets); + +#endif // #if !defined TORRENT_DISABLE_EXTENSIONS && !defined TORRENT_DISABLE_DHT +} + diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index c626baec6..529d0d516 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -108,6 +108,8 @@ namespace libtorrent { namespace dht , m_send_quota(settings.upload_rate_limit) , m_last_tick(aux::time_now()) { + m_blocker.set_block_timer(m_settings.block_timeout); + m_blocker.set_rate_limit(m_settings.block_ratelimit); #ifndef TORRENT_DISABLE_LOGGING m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s" , to_hex(m_dht.nid().to_string()).c_str()); @@ -319,11 +321,17 @@ namespace libtorrent { namespace dht int num = sizeof(class_a)/sizeof(class_a[0]); if (std::find(class_a, class_a + num, b[0]) != class_a + num) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); return true; + } } if (!m_blocker.incoming(ep.address(), clock_type::now(), m_log)) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); return true; + } using libtorrent::entry; using libtorrent::bdecode; @@ -335,6 +343,7 @@ namespace libtorrent { namespace dht int ret = bdecode(buf, buf + size, m_msg, err, &pos, 10, 500); if (ret != 0) { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); #ifndef TORRENT_DISABLE_LOGGING m_log->log_packet(dht_logger::incoming_message, buf, size, ep); #endif @@ -412,9 +421,15 @@ namespace libtorrent { namespace dht time_point now = clock_type::now(); time_duration delta = now - m_last_tick; m_last_tick = now; + // add any new quota we've accrued since last time m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) * total_microseconds(delta) / 1000000; + + // allow 3 seconds worth of burst + if (m_send_quota > 3 * m_settings.upload_rate_limit) + m_send_quota = 3 * m_settings.upload_rate_limit; + return m_send_quota > 0; } @@ -432,17 +447,6 @@ namespace libtorrent { namespace dht // update the quota. We won't prevent the packet to be sent if we exceed // the quota, we'll just (potentially) block the next incoming request. - time_point const now = clock_type::now(); - time_duration const delta = now - m_last_tick; - m_last_tick = now; - - // add any new quota we've accrued since last time - m_send_quota += boost::uint64_t(m_settings.upload_rate_limit) - * total_microseconds(delta) / 1000000; - - // allow 3 seconds worth of burst - if (m_send_quota > 3 * m_settings.upload_rate_limit) - m_send_quota = 3 * m_settings.upload_rate_limit; m_send_quota -= m_send_buf.size(); diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 9b017fcc6..debd81c9d 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -303,6 +303,12 @@ void node::incoming(msg const& m) // responds to 'query' messages that it receives. if (m_settings.read_only) break; + if (!m_sock->has_quota()) + { + m_counters.inc_stats_counter(counters::dht_messages_in_dropped); + return; + } + entry e; incoming_request(m, e); m_sock->send_packet(e, m.addr); @@ -775,9 +781,6 @@ void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes) // build response void node::incoming_request(msg const& m, entry& e) { - if (!m_sock->has_quota()) - return; - e = entry(entry::dictionary_t); e["y"] = "r"; e["t"] = m.message.dict_find_string_value("t"); diff --git a/src/session_stats.cpp b/src/session_stats.cpp index 8bcf801a2..d35219d43 100644 --- a/src/session_stats.cpp +++ b/src/session_stats.cpp @@ -422,6 +422,16 @@ namespace libtorrent METRIC(dht, dht_messages_in) METRIC(dht, dht_messages_out) + // the number of incoming DHT requests that were dropped. There are a few + // different reasons why incoming DHT packets may be dropped: + // + // 1. there wasn't enough send quota to respond to them. + // 2. the Denial of service logic kicked in, blocking the peer + // 3. ignore_dark_internet is enabled, and the packet came from a + // non-public IP address + // 4. the bencoding of the message was invalid + METRIC(dht, dht_messages_in_dropped) + // the number of outgoing messages that failed to be // sent METRIC(dht, dht_messages_out_dropped)