From ce544e2300f7cf78b414e4793c23ac5ad707dbb4 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 9 Nov 2008 00:37:03 +0000 Subject: [PATCH] DHT rate limiter fixes --- include/libtorrent/udp_socket.hpp | 2 +- src/kademlia/dht_tracker.cpp | 80 ++++++++++++++++++++++++------- src/udp_socket.cpp | 16 ++++--- 3 files changed, 72 insertions(+), 26 deletions(-) diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 0b7c447ae..9a3b2a06b 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -119,7 +119,7 @@ namespace libtorrent rate_limited_udp_socket(io_service& ios, callback_t const& c, connection_queue& cc); void set_rate_limit(int limit) { m_rate_limit = limit; } bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; } - bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec); + bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0); void close(); private: diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 07b44ae5d..77af1f793 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -447,7 +447,7 @@ namespace libtorrent { namespace dht { #ifdef TORRENT_DHT_VERBOSE_LOGGING std::string msg(buf, buf + bytes_transferred); - TORRENT_LOG(dht_tracker) << "invalid incoming packet\n"; + TORRENT_LOG(dht_tracker) << "invalid incoming packet\n" << msg << "\n"; #endif return; } @@ -461,7 +461,8 @@ namespace libtorrent { namespace dht if (e.type() != entry::dictionary_t) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet"; + std::string msg(buf, buf + bytes_transferred); + TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (not a dictionary)\n" << msg << "\n"; #endif return; } @@ -472,7 +473,13 @@ namespace libtorrent { namespace dht entry const* transaction = e.find_key("t"); if (!transaction || transaction->type() != entry::string_t) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + std::string msg(buf, buf + bytes_transferred); + TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (missing or invalid transaction id)"; +#endif return; + } m.transaction_id = transaction->string(); @@ -523,7 +530,12 @@ namespace libtorrent { namespace dht entry const* y = e.find_key("y"); if (!y || y->type() != entry::string_t) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (missing or invalid type)"; +#endif return; + } std::string const& msg_type = y->string(); @@ -649,7 +661,12 @@ namespace libtorrent { namespace dht std::string const& target = target_ent->string(); if (target.size() != 20) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << "size of 'target' is not 20 bytes: " << target.size(); +#endif return; + } std::copy(target.begin(), target.end(), m.info_hash.begin()); #ifdef TORRENT_DHT_VERBOSE_LOGGING log_line << " t: " << boost::lexical_cast(m.info_hash); @@ -661,11 +678,21 @@ namespace libtorrent { namespace dht { entry const* ih_ent = a->find_key("info_hash"); if (!ih_ent || ih_ent->type() != entry::string_t) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << "missing 'info_hash' in get_peers query"; +#endif return; + } std::string const& info_hash = ih_ent->string(); if (info_hash.size() != 20) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << "size of 'info_hash' is not 20 bytes: " << info_hash.size(); +#endif return; + } std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin()); m.message_id = libtorrent::dht::messages::get_peers; #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -679,11 +706,21 @@ namespace libtorrent { namespace dht #endif entry const* ih_ent = a->find_key("info_hash"); if (!ih_ent || ih_ent->type() != entry::string_t) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << "missing 'info_hash' in announce_peer query"; +#endif return; + } std::string const& info_hash = ih_ent->string(); if (info_hash.size() != 20) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << "size of 'info_hash' is not 20 bytes: " << info_hash.size(); +#endif return; + } std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin()); entry const* port_ent = a->find_key("port"); if (!port_ent || port_ent->type() != entry::int_t) @@ -750,9 +787,8 @@ namespace libtorrent { namespace dht else { #ifdef TORRENT_DHT_VERBOSE_LOGGING - std::string msg(buf, buf + bytes_transferred); - TORRENT_LOG(dht_tracker) << "invalid incoming packet: " - << e.what() << "\n" << msg << "\n"; + TORRENT_LOG(dht_tracker) << " *** UNSUPPORTED REQUEST *** : " + << msg_type; #endif return; } @@ -883,6 +919,7 @@ namespace libtorrent { namespace dht { using libtorrent::bencode; using libtorrent::entry; + int send_flags = 0; entry e(entry::dictionary_t); TORRENT_ASSERT(!m.transaction_id.empty() || m.message_id == messages::error); e["t"] = m.transaction_id; @@ -968,6 +1005,9 @@ namespace libtorrent { namespace dht } else { + // set bit 1 of send_flags to indicate that + // this packet should not be dropped by the + // rate limiter. e["y"] = "q"; e["a"] = entry(entry::dictionary_t); entry& a = e["a"]; @@ -986,6 +1026,7 @@ namespace libtorrent { namespace dht { case messages::find_node: { + send_flags = 1; a["target"] = std::string(m.info_hash.begin(), m.info_hash.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING log_line << " target: " << boost::lexical_cast(m.info_hash); @@ -994,6 +1035,7 @@ namespace libtorrent { namespace dht } case messages::get_peers: { + send_flags = 1; a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING log_line << " ih: " << boost::lexical_cast(m.info_hash); @@ -1001,6 +1043,7 @@ namespace libtorrent { namespace dht break; } case messages::announce_peer: + send_flags = 1; a["port"] = m.port; a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end()); a["token"] = m.write_token; @@ -1017,22 +1060,23 @@ namespace libtorrent { namespace dht m_send_buf.clear(); bencode(std::back_inserter(m_send_buf), e); error_code ec; - m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec); - - // account for IP and UDP overhead - m_sent_bytes += m_send_buf.size() + 28; + if (m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags)) + { + // account for IP and UDP overhead + m_sent_bytes += m_send_buf.size() + 28; #ifdef TORRENT_DHT_VERBOSE_LOGGING - m_total_out_bytes += m_send_buf.size(); + m_total_out_bytes += m_send_buf.size(); - if (m.reply) - { - ++m_replies_sent[m.message_id]; - m_replies_bytes_sent[m.message_id] += int(m_send_buf.size()); - } - else - { - m_queries_out_bytes += m_send_buf.size(); + if (m.reply) + { + ++m_replies_sent[m.message_id]; + m_replies_bytes_sent[m.message_id] += int(m_send_buf.size()); + } + else + { + m_queries_out_bytes += m_send_buf.size(); + } } TORRENT_LOG(dht_tracker) << log_line.str() << " ]"; #endif diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index e29d8d755..3e3407d49 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -536,9 +536,9 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios , callback_t const& c, connection_queue& cc) : udp_socket(ios, c, cc) , m_timer(ios) - , m_queue_size_limit(20) - , m_rate_limit(2000) - , m_quota(2000) + , m_queue_size_limit(200) + , m_rate_limit(4000) + , m_quota(4000) , m_last_tick(time_now()) { error_code ec; @@ -547,11 +547,13 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios TORRENT_ASSERT(!ec); } -bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec) +bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags) { if (m_quota < len) { - if (int(m_queue.size()) >= m_queue_size_limit) return false; + // bit 1 of flags means "don't drop" + if (int(m_queue.size()) >= m_queue_size_limit && (flags & 1) == 0) + return false; m_queue.push_back(queued_packet()); queued_packet& qp = m_queue.back(); qp.ep = ep; @@ -560,7 +562,7 @@ bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int l } m_quota -= len; - send(ep, p, len, ec); + udp_socket::send(ep, p, len, ec); return true; } @@ -583,7 +585,7 @@ void rate_limited_udp_socket::on_tick(error_code const& e) queued_packet const& p = m_queue.front(); m_quota -= p.buf.size(); error_code ec; - send(p.ep, &p.buf[0], p.buf.size(), ec); + udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec); m_queue.pop_front(); } }