first pass at rate limited udp socket (for DHT)
This commit is contained in:
parent
4e9124019b
commit
9bba20cdb4
|
@ -1,4 +1,5 @@
|
|||
|
||||
* rate limited DHT send socket
|
||||
* tracker connections are now also subject to IP filtering
|
||||
* improved optimistic unchoke logic
|
||||
* added monitoring of the DHT lookups
|
||||
|
|
|
@ -549,7 +549,7 @@ namespace libtorrent
|
|||
// but for the udp port used by the DHT.
|
||||
int m_external_udp_port;
|
||||
|
||||
udp_socket m_dht_socket;
|
||||
rate_limited_udp_socket m_dht_socket;
|
||||
|
||||
// these are used when starting the DHT
|
||||
// (and bootstrapping it), and then erased
|
||||
|
|
|
@ -73,7 +73,7 @@ namespace libtorrent { namespace dht
|
|||
{
|
||||
friend void intrusive_ptr_add_ref(dht_tracker const*);
|
||||
friend void intrusive_ptr_release(dht_tracker const*);
|
||||
dht_tracker(libtorrent::aux::session_impl& ses, udp_socket& sock
|
||||
dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
|
||||
, dht_settings const& settings);
|
||||
|
||||
void start(entry const& bootstrap);
|
||||
|
@ -115,7 +115,7 @@ namespace libtorrent { namespace dht
|
|||
|
||||
node_impl m_dht;
|
||||
libtorrent::aux::session_impl& m_ses;
|
||||
udp_socket& m_sock;
|
||||
rate_limited_udp_socket& m_sock;
|
||||
|
||||
std::vector<char> m_send_buf;
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
|
|||
|
||||
#include "libtorrent/socket.hpp"
|
||||
#include "libtorrent/session_settings.hpp"
|
||||
#include "libtorrent/buffer.hpp"
|
||||
|
||||
#include <vector>
|
||||
#include <boost/function.hpp>
|
||||
|
@ -112,6 +113,29 @@ namespace libtorrent
|
|||
int m_magic;
|
||||
#endif
|
||||
};
|
||||
|
||||
struct rate_limited_udp_socket : public udp_socket
|
||||
{
|
||||
rate_limited_udp_socket(io_service& ios, callback_t const& c, connection_queue& cc);
|
||||
void set_rate_limit(int limit) { m_rate_limit = limit; }
|
||||
bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; }
|
||||
bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec);
|
||||
|
||||
private:
|
||||
struct queued_packet
|
||||
{
|
||||
udp::endpoint ep;
|
||||
buffer buf;
|
||||
};
|
||||
void on_tick(error_code const& e);
|
||||
|
||||
deadline_timer m_timer;
|
||||
int m_queue_size_limit;
|
||||
int m_rate_limit;
|
||||
int m_quota;
|
||||
ptime m_last_tick;
|
||||
std::list<queued_packet> m_queue;
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -130,7 +130,7 @@ namespace libtorrent { namespace dht
|
|||
|
||||
// class that puts the networking and the kademlia node in a single
|
||||
// unit and connecting them together.
|
||||
dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, udp_socket& sock
|
||||
dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
|
||||
, dht_settings const& settings)
|
||||
: m_dht(ses, bind(&dht_tracker::send_packet, this, _1), settings)
|
||||
, m_ses(ses)
|
||||
|
|
|
@ -532,3 +532,59 @@ void udp_socket::connect2(error_code const& e)
|
|||
m_tunnel_packets = true;
|
||||
}
|
||||
|
||||
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
|
||||
, callback_t const& c, connection_queue& cc)
|
||||
: udp_socket(ios, c, cc)
|
||||
, m_timer(ios)
|
||||
, m_queue_size_limit(20)
|
||||
, m_rate_limit(2000)
|
||||
, m_quota(2000)
|
||||
, m_last_tick(time_now())
|
||||
{
|
||||
error_code ec;
|
||||
m_timer.expires_from_now(seconds(1), ec);
|
||||
m_timer.async_wait(boost::bind(&rate_limited_udp_socket::on_tick, this, _1));
|
||||
TORRENT_ASSERT(!ec);
|
||||
}
|
||||
|
||||
bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec)
|
||||
{
|
||||
if (m_quota < len)
|
||||
{
|
||||
if (int(m_queue.size()) >= m_queue_size_limit) return false;
|
||||
m_queue.push_back(queued_packet());
|
||||
queued_packet& qp = m_queue.back();
|
||||
qp.ep = ep;
|
||||
qp.buf.insert(qp.buf.begin(), p, p + len);
|
||||
return true;
|
||||
}
|
||||
|
||||
m_quota -= len;
|
||||
send(ep, p, len, ec);
|
||||
return true;
|
||||
}
|
||||
|
||||
void rate_limited_udp_socket::on_tick(error_code const& e)
|
||||
{
|
||||
if (e) return;
|
||||
error_code ec;
|
||||
ptime now = time_now();
|
||||
m_timer.expires_at(now + seconds(1), ec);
|
||||
m_timer.async_wait(boost::bind(&rate_limited_udp_socket::on_tick, this, _1));
|
||||
|
||||
time_duration delta = now - m_last_tick;
|
||||
m_last_tick = now;
|
||||
if (m_quota < m_rate_limit) m_quota += m_rate_limit * total_milliseconds(delta) / 1000.f;
|
||||
|
||||
if (m_queue.empty()) return;
|
||||
|
||||
while (!m_queue.empty() && int(m_queue.front().buf.size()) >= m_quota)
|
||||
{
|
||||
queued_packet const& p = m_queue.front();
|
||||
m_quota -= p.buf.size();
|
||||
error_code ec;
|
||||
send(p.ep, &p.buf[0], p.buf.size(), ec);
|
||||
m_queue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue