merged DHT optimization from libtorrent_aio

This commit is contained in:
Arvid Norberg 2012-09-22 18:15:29 +00:00
parent 330aac4acb
commit 055f8a0598
7 changed files with 71 additions and 29 deletions

View File

@ -1,3 +1,4 @@
* improve DHT lookup times
* uTP path MTU discovery improvements
* optimized the torrent creator optimizer to scale significantly better with more files
* fix uTP edge case where udp socket buffer fills up

View File

@ -42,10 +42,11 @@ namespace libtorrent { namespace dht
struct node_entry
{
node_entry(node_id const& id_, udp::endpoint ep, bool pinged = false)
node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff, bool pinged = false)
: addr(ep.address())
, port(ep.port())
, timeout_count(pinged ? 0 : 0xffff)
, rtt(roundtriptime)
, id(id_)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -57,6 +58,7 @@ struct node_entry
: addr(ep.address())
, port(ep.port())
, timeout_count(0xffff)
, rtt(0xffff)
, id(0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -66,6 +68,7 @@ struct node_entry
node_entry()
: timeout_count(0xffff)
, rtt(0xffff)
, id(0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -80,6 +83,11 @@ struct node_entry
void reset_fail_count() { if (pinged()) timeout_count = 0; }
udp::endpoint ep() const { return udp::endpoint(addr, port); }
bool confirmed() const { return timeout_count == 0; }
void update_rtt(int new_rtt)
{
if (rtt == 0xffff) rtt = new_rtt;
else rtt = int(rtt) / 3 + int(new_rtt) * 2 / 3;
}
// TODO: replace with a union of address_v4 and address_v6
address addr;
@ -87,6 +95,8 @@ struct node_entry
// the number of times this node has failed to
// respond in a row
boost::uint16_t timeout_count;
// the average RTT of this node
boost::uint16_t rtt;
node_id id;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
ptime first_seen;

View File

@ -102,13 +102,13 @@ public:
router_iterator router_begin() const { return m_router_nodes.begin(); }
router_iterator router_end() const { return m_router_nodes.end(); }
bool add_node(node_entry const& e);
bool add_node(node_entry e);
// this function is called every time the node sees
// a sign of a node being alive. This node will either
// be inserted in the k-buckets or be moved to the top
// of its bucket.
bool node_seen(node_id const& id, udp::endpoint ep);
bool node_seen(node_id const& id, udp::endpoint ep, int rtt);
// this may add a node to the routing table and mark it as
// not pinged. If the bucket the node falls into is full,

View File

@ -232,9 +232,9 @@ namespace libtorrent { namespace dht
// turns on and off individual components' logging
// rpc_log().enable(false);
// node_log().enable(false);
// traversal_log().enable(false);
rpc_log().enable(false);
node_log().enable(false);
traversal_log().enable(false);
// dht_tracker_log.enable(false);
TORRENT_LOG(dht_tracker) << "starting DHT tracker with node id: " << m_dht.nid();

View File

@ -742,7 +742,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
// the token was correct. That means this
// node is not spoofing its address. So, let
// the table get a chance to add it.
m_table.node_seen(id, m.addr);
m_table.node_seen(id, m.addr, 0xffff);
if (!m_map.empty() && int(m_map.size()) >= m_settings.max_torrents)
{
@ -950,7 +950,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
f = &i->second;
}
m_table.node_seen(id, m.addr);
m_table.node_seen(id, m.addr, 0xffff);
f->last_seen = time_now();

View File

@ -170,6 +170,7 @@ void routing_table::print_state(std::ostream& os) const
, end(i->live_nodes.end()); j != end; ++j)
{
os << " id: " << j->id
<< " rtt: " << j->rtt
<< " ip: " << j->ep()
<< " fails: " << j->fail_count()
<< " pinged: " << j->pinged()
@ -316,7 +317,7 @@ node_entry* routing_table::find_node(udp::endpoint const& ep, routing_table::tab
return 0;
}
bool routing_table::add_node(node_entry const& e)
bool routing_table::add_node(node_entry e)
{
if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false;
@ -359,6 +360,7 @@ bool routing_table::add_node(node_entry const& e)
if (existing->id == e.id)
{
existing->timeout_count = 0;
existing->update_rtt(e.rtt);
return ret;
}
@ -415,17 +417,30 @@ bool routing_table::add_node(node_entry const& e)
if (j->addr != e.addr || j->port != e.port) return ret;
// we already have the node in our bucket
// just move it to the back since it was
// the last node we had any contact with
// in this bucket
TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
j->timeout_count = 0;
j->update_rtt(e.rtt);
// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr;
return ret;
}
if (std::find_if(rb.begin(), rb.end(), boost::bind(&node_entry::id, _1) == e.id)
!= rb.end()) return ret;
// if this node exists in the replacement bucket. update it and
// pull it out from there. We may add it back to the replacement
// bucket, but we may also replace a node in the main bucket, now
// that we have an updated RTT
j = std::find_if(rb.begin(), rb.end(), boost::bind(&node_entry::id, _1) == e.id);
if (j != rb.end())
{
// a new IP address just claimed this node-ID
// ignore it
if (j->addr != e.addr || j->port != e.port) return ret;
TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
j->timeout_count = 0;
j->update_rtt(e.rtt);
e = *j;
m_ips.erase(j->addr.to_v4().to_bytes());
rb.erase(j);
}
if (m_settings.restrict_routing_ips)
{
@ -447,7 +462,7 @@ bool routing_table::add_node(node_entry const& e)
j = std::find_if(rb.begin(), rb.end(), boost::bind(&compare_ip_cidr, _1, e));
if (j != rb.end())
{
// same thing bug for the replacement bucket
// same thing but for the replacement bucket
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "ignoring (replacement) node: " << e.id << " " << e.addr
<< " existing node: "
@ -457,10 +472,7 @@ bool routing_table::add_node(node_entry const& e)
}
}
// if the node was not present in our list
// we will only insert it if there is room
// for it, or if some of our nodes have gone
// offline
// if there's room in the main bucket, just insert it
if (int(b.size()) < m_bucket_size)
{
if (b.empty()) b.reserve(m_bucket_size);
@ -474,6 +486,8 @@ bool routing_table::add_node(node_entry const& e)
// i.e. we haven't confirmed that they respond to messages.
// Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it.
// as the last replacement strategy, we look for nodes with the
// highest RTT, and if it's higher than the new node, we replace it
// can we split the bucket?
bool can_split = false;
@ -483,7 +497,7 @@ bool routing_table::add_node(node_entry const& e)
// only nodes that are pinged and haven't failed
// can split the bucket, and we can only split
// the last bucket
can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 160);
can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159);
// if the node we're trying to insert is considered pinged,
// we may replace other nodes that aren't pinged
@ -495,8 +509,7 @@ bool routing_table::add_node(node_entry const& e)
// j points to a node that has not been pinged.
// Replace it with this new one
m_ips.erase(j->addr.to_v4().to_bytes());
b.erase(j);
b.push_back(e);
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing unpinged node: " << e.id << " " << e.addr;
return ret;
@ -517,12 +530,26 @@ bool routing_table::add_node(node_entry const& e)
// i points to a node that has been marked
// as stale. Replace it with this new one
m_ips.erase(j->addr.to_v4().to_bytes());
b.erase(j);
b.push_back(e);
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr;
return ret;
}
// in order to keep lookup times small, prefer nodes with low RTTs
j = std::max_element(b.begin(), b.end()
, boost::bind(&node_entry::rtt, _1)
< boost::bind(&node_entry::rtt, _2));
if (j != b.end() && j->rtt > e.rtt)
{
m_ips.erase(j->addr.to_v4().to_bytes());
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr;
return ret;
}
}
// if we can't split, try to insert into the replacement bucket
@ -729,7 +756,7 @@ void routing_table::add_router_node(udp::endpoint router)
// was spoofed or not (i.e. pinged == false)
void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
{
add_node(node_entry(id, ep, false));
add_node(node_entry(id, ep));
}
// this function is called every time the node sees
@ -739,9 +766,9 @@ void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
// the return value indicates if the table needs a refresh.
// if true, the node should refresh the table (i.e. do a find_node
// on its own id)
bool routing_table::node_seen(node_id const& id, udp::endpoint ep)
bool routing_table::node_seen(node_id const& id, udp::endpoint ep, int rtt)
{
return add_node(node_entry(id, ep, true));
return add_node(node_entry(id, ep, rtt, true));
}
bool routing_table::need_bootstrap() const

View File

@ -313,9 +313,11 @@ bool rpc_manager::incoming(msg const& m, node_id* id)
return false;
}
ptime now = time_now_hires();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::ofstream reply_stats("round_trip_ms.log", std::ios::app);
reply_stats << m.addr << "\t" << total_milliseconds(time_now_hires() - o->sent())
reply_stats << m.addr << "\t" << total_milliseconds(now - o->sent())
<< std::endl;
#endif
@ -366,9 +368,11 @@ bool rpc_manager::incoming(msg const& m, node_id* id)
o->reply(m);
*id = node_id(node_id_ent->string_ptr());
int rtt = total_milliseconds(now - o->sent());
// we found an observer for this reply, hence the node is not spoofing
// add it to the routing table
return m_table.node_seen(*id, m.addr);
return m_table.node_seen(*id, m.addr, rtt);
}
time_duration rpc_manager::tick()