DHT rate limiter fixes

This commit is contained in:
Arvid Norberg 2008-11-09 00:37:03 +00:00
parent 3ed38059aa
commit ce544e2300
3 changed files with 72 additions and 26 deletions

View File

@ -119,7 +119,7 @@ namespace libtorrent
rate_limited_udp_socket(io_service& ios, callback_t const& c, connection_queue& cc);
void set_rate_limit(int limit) { m_rate_limit = limit; }
bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; }
bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec);
bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0);
void close();
private:

View File

@ -447,7 +447,7 @@ namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string msg(buf, buf + bytes_transferred);
TORRENT_LOG(dht_tracker) << "invalid incoming packet\n";
TORRENT_LOG(dht_tracker) << "invalid incoming packet\n" << msg << "\n";
#endif
return;
}
@ -461,7 +461,8 @@ namespace libtorrent { namespace dht
if (e.type() != entry::dictionary_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet";
std::string msg(buf, buf + bytes_transferred);
TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (not a dictionary)\n" << msg << "\n";
#endif
return;
}
@ -472,7 +473,13 @@ namespace libtorrent { namespace dht
entry const* transaction = e.find_key("t");
if (!transaction || transaction->type() != entry::string_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string msg(buf, buf + bytes_transferred);
TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (missing or invalid transaction id)";
#endif
return;
}
m.transaction_id = transaction->string();
@ -523,7 +530,12 @@ namespace libtorrent { namespace dht
entry const* y = e.find_key("y");
if (!y || y->type() != entry::string_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " RECEIVED invalid dht packet (missing or invalid type)";
#endif
return;
}
std::string const& msg_type = y->string();
@ -649,7 +661,12 @@ namespace libtorrent { namespace dht
std::string const& target = target_ent->string();
if (target.size() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "size of 'target' is not 20 bytes: " << target.size();
#endif
return;
}
std::copy(target.begin(), target.end(), m.info_hash.begin());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " t: " << boost::lexical_cast<std::string>(m.info_hash);
@ -661,11 +678,21 @@ namespace libtorrent { namespace dht
{
entry const* ih_ent = a->find_key("info_hash");
if (!ih_ent || ih_ent->type() != entry::string_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "missing 'info_hash' in get_peers query";
#endif
return;
}
std::string const& info_hash = ih_ent->string();
if (info_hash.size() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "size of 'info_hash' is not 20 bytes: " << info_hash.size();
#endif
return;
}
std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin());
m.message_id = libtorrent::dht::messages::get_peers;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -679,11 +706,21 @@ namespace libtorrent { namespace dht
#endif
entry const* ih_ent = a->find_key("info_hash");
if (!ih_ent || ih_ent->type() != entry::string_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "missing 'info_hash' in announce_peer query";
#endif
return;
}
std::string const& info_hash = ih_ent->string();
if (info_hash.size() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "size of 'info_hash' is not 20 bytes: " << info_hash.size();
#endif
return;
}
std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin());
entry const* port_ent = a->find_key("port");
if (!port_ent || port_ent->type() != entry::int_t)
@ -750,9 +787,8 @@ namespace libtorrent { namespace dht
else
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string msg(buf, buf + bytes_transferred);
TORRENT_LOG(dht_tracker) << "invalid incoming packet: "
<< e.what() << "\n" << msg << "\n";
TORRENT_LOG(dht_tracker) << " *** UNSUPPORTED REQUEST *** : "
<< msg_type;
#endif
return;
}
@ -883,6 +919,7 @@ namespace libtorrent { namespace dht
{
using libtorrent::bencode;
using libtorrent::entry;
int send_flags = 0;
entry e(entry::dictionary_t);
TORRENT_ASSERT(!m.transaction_id.empty() || m.message_id == messages::error);
e["t"] = m.transaction_id;
@ -968,6 +1005,9 @@ namespace libtorrent { namespace dht
}
else
{
// set bit 1 of send_flags to indicate that
// this packet should not be dropped by the
// rate limiter.
e["y"] = "q";
e["a"] = entry(entry::dictionary_t);
entry& a = e["a"];
@ -986,6 +1026,7 @@ namespace libtorrent { namespace dht
{
case messages::find_node:
{
send_flags = 1;
a["target"] = std::string(m.info_hash.begin(), m.info_hash.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " target: " << boost::lexical_cast<std::string>(m.info_hash);
@ -994,6 +1035,7 @@ namespace libtorrent { namespace dht
}
case messages::get_peers:
{
send_flags = 1;
a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash);
@ -1001,6 +1043,7 @@ namespace libtorrent { namespace dht
break;
}
case messages::announce_peer:
send_flags = 1;
a["port"] = m.port;
a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end());
a["token"] = m.write_token;
@ -1017,22 +1060,23 @@ namespace libtorrent { namespace dht
m_send_buf.clear();
bencode(std::back_inserter(m_send_buf), e);
error_code ec;
m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec);
// account for IP and UDP overhead
m_sent_bytes += m_send_buf.size() + 28;
if (m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags))
{
// account for IP and UDP overhead
m_sent_bytes += m_send_buf.size() + 28;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size();
m_total_out_bytes += m_send_buf.size();
if (m.reply)
{
++m_replies_sent[m.message_id];
m_replies_bytes_sent[m.message_id] += int(m_send_buf.size());
}
else
{
m_queries_out_bytes += m_send_buf.size();
if (m.reply)
{
++m_replies_sent[m.message_id];
m_replies_bytes_sent[m.message_id] += int(m_send_buf.size());
}
else
{
m_queries_out_bytes += m_send_buf.size();
}
}
TORRENT_LOG(dht_tracker) << log_line.str() << " ]";
#endif

View File

@ -536,9 +536,9 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
, callback_t const& c, connection_queue& cc)
: udp_socket(ios, c, cc)
, m_timer(ios)
, m_queue_size_limit(20)
, m_rate_limit(2000)
, m_quota(2000)
, m_queue_size_limit(200)
, m_rate_limit(4000)
, m_quota(4000)
, m_last_tick(time_now())
{
error_code ec;
@ -547,11 +547,13 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
TORRENT_ASSERT(!ec);
}
bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec)
bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags)
{
if (m_quota < len)
{
if (int(m_queue.size()) >= m_queue_size_limit) return false;
// bit 1 of flags means "don't drop"
if (int(m_queue.size()) >= m_queue_size_limit && (flags & 1) == 0)
return false;
m_queue.push_back(queued_packet());
queued_packet& qp = m_queue.back();
qp.ep = ep;
@ -560,7 +562,7 @@ bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int l
}
m_quota -= len;
send(ep, p, len, ec);
udp_socket::send(ep, p, len, ec);
return true;
}
@ -583,7 +585,7 @@ void rate_limited_udp_socket::on_tick(error_code const& e)
queued_packet const& p = m_queue.front();
m_quota -= p.buf.size();
error_code ec;
send(p.ep, &p.buf[0], p.buf.size(), ec);
udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec);
m_queue.pop_front();
}
}