DHT memory optimization

This commit is contained in:
Arvid Norberg 2013-01-28 04:00:23 +00:00
parent ecc42c0a78
commit 7223bf17e5
7 changed files with 112 additions and 74 deletions

View File

@ -1,3 +1,4 @@
* DHT memory optimization
* improve DHT lookup speed
* improve support for windows XP and earlier
* introduce global connection priority for improved swarm performance

View File

@ -1,6 +1,6 @@
/*
Copyright (c) 2006-2012, Arvid Norberg
Copyright (c) 2006-2013, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/union_endpoint.hpp"
namespace libtorrent { namespace dht
{
@ -43,9 +44,8 @@ namespace libtorrent { namespace dht
struct node_entry
{
node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff, bool pinged = false)
: addr(ep.address())
, port(ep.port())
, timeout_count(pinged ? 0 : 0xffff)
: endpoint(ep)
, timeout_count(pinged ? 0 : 0xff)
, rtt(roundtriptime)
, id(id_)
{
@ -55,9 +55,8 @@ struct node_entry
}
node_entry(udp::endpoint ep)
: addr(ep.address())
, port(ep.port())
, timeout_count(0xffff)
: endpoint(ep)
, timeout_count(0xff)
, rtt(0xffff)
, id(0)
{
@ -67,7 +66,7 @@ struct node_entry
}
node_entry()
: timeout_count(0xffff)
: timeout_count(0xff)
, rtt(0xffff)
, id(0)
{
@ -76,27 +75,25 @@ struct node_entry
#endif
}
bool pinged() const { return timeout_count != 0xffff; }
void set_pinged() { if (timeout_count == 0xffff) timeout_count = 0; }
void timed_out() { if (pinged()) ++timeout_count; }
bool pinged() const { return timeout_count != 0xff; }
void set_pinged() { if (timeout_count == 0xff) timeout_count = 0; }
void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; }
int fail_count() const { return pinged() ? timeout_count : 0; }
void reset_fail_count() { if (pinged()) timeout_count = 0; }
udp::endpoint ep() const { return udp::endpoint(addr, port); }
udp::endpoint ep() const { return udp::endpoint(endpoint); }
bool confirmed() const { return timeout_count == 0; }
void update_rtt(int new_rtt)
{
if (rtt == 0xffff) rtt = new_rtt;
else rtt = int(rtt) / 3 + int(new_rtt) * 2 / 3;
}
address addr() const { return endpoint.address(); }
int port() const { return endpoint.port; }
// TODO: 2 replace with a union of address_v4 and address_v6
// to not waste space. This struct is instantiated hundreds of times
// for the routing table
address addr;
boost::uint16_t port;
union_endpoint endpoint;
// the number of times this node has failed to
// respond in a row
boost::uint16_t timeout_count;
boost::uint8_t timeout_count;
// the average RTT of this node
boost::uint16_t rtt;
node_id id;

View File

@ -46,11 +46,41 @@ namespace libtorrent
*this = ep;
}
union_endpoint(udp::endpoint const& ep)
{
*this = ep;
}
union_endpoint()
{
*this = tcp::endpoint();
}
union_endpoint& operator=(udp::endpoint const& ep)
{
#if TORRENT_USE_IPV6
v4 = ep.address().is_v4();
if (v4)
addr.v4 = ep.address().to_v4().to_bytes();
else
addr.v6 = ep.address().to_v6().to_bytes();
#else
addr.v4 = ep.address().to_v4().to_bytes();
#endif
port = ep.port();
return *this;
}
operator udp::endpoint() const
{
#if TORRENT_USE_IPV6
if (v4) return udp::endpoint(address_v4(addr.v4), port);
else return udp::endpoint(address_v6(addr.v6), port);
#else
return udp::endpoint(address_v4(addr.v4), port);
#endif
}
union_endpoint& operator=(tcp::endpoint const& ep)
{
#if TORRENT_USE_IPV6
@ -66,6 +96,16 @@ namespace libtorrent
return *this;
}
libtorrent::address address() const
{
#if TORRENT_USE_IPV6
if (v4) return address_v4(addr.v4);
else return address_v6(addr.v6);
#else
return address_v4(addr.v4);
#endif
}
operator tcp::endpoint() const
{
#if TORRENT_USE_IPV6

View File

@ -568,7 +568,7 @@ namespace libtorrent { namespace dht
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(udp::endpoint(i->addr, i->port), out);
write_endpoint(i->ep(), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())

View File

@ -467,13 +467,13 @@ namespace
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
if (!i->addr.is_v4())
if (!i->addr().is_v4())
{
ipv6_nodes = true;
continue;
}
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(udp::endpoint(i->addr, i->port), out);
write_endpoint(udp::endpoint(i->addr(), i->port()), out);
}
if (ipv6_nodes)
@ -483,12 +483,12 @@ namespace
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
if (!i->addr.is_v6()) continue;
if (!i->addr().is_v6()) continue;
endpoint.resize(18 + 20);
std::string::iterator out = endpoint.begin();
std::copy(i->id.begin(), i->id.end(), out);
out += 20;
write_endpoint(udp::endpoint(i->addr, i->port), out);
write_endpoint(udp::endpoint(i->addr(), i->port()), out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint));
}

View File

@ -322,13 +322,13 @@ routing_table::table_t::iterator routing_table::find_bucket(node_id const& id)
bool compare_ip_cidr(node_entry const& lhs, node_entry const& rhs)
{
TORRENT_ASSERT(lhs.addr.is_v4() == rhs.addr.is_v4());
TORRENT_ASSERT(lhs.addr().is_v4() == rhs.addr().is_v4());
// the number of bits in the IPs that may match. If
// more bits that this matches, something suspicious is
// going on and we shouldn't add the second one to our
// routing table
int cutoff = rhs.addr.is_v4() ? 8 : 64;
int dist = cidr_distance(lhs.addr, rhs.addr);
int cutoff = rhs.addr().is_v4() ? 8 : 64;
int dist = cidr_distance(lhs.addr(), rhs.addr());
return dist <= cutoff;
}
@ -340,16 +340,16 @@ node_entry* routing_table::find_node(udp::endpoint const& ep, routing_table::tab
for (bucket_t::iterator j = i->replacements.begin();
j != i->replacements.end(); ++j)
{
if (j->addr != ep.address()) continue;
if (j->port != ep.port()) continue;
if (j->addr() != ep.address()) continue;
if (j->port() != ep.port()) continue;
*bucket = i;
return &*j;
}
for (bucket_t::iterator j = i->live_nodes.begin();
j != i->live_nodes.end(); ++j)
{
if (j->addr != ep.address()) continue;
if (j->port != ep.port()) continue;
if (j->addr() != ep.address()) continue;
if (j->port() != ep.port()) continue;
*bucket = i;
return &*j;
}
@ -368,7 +368,7 @@ bool routing_table::add_node(node_entry e)
if (e.id == m_id) return ret;
// do we already have this IP in the table?
if (m_ips.find(e.addr.to_v4().to_bytes()) != m_ips.end())
if (m_ips.find(e.addr().to_v4().to_bytes()) != m_ips.end())
{
// this exact IP already exists in the table. It might be the case
// that the node changed IP. If pinged is true, and the port also
@ -389,7 +389,7 @@ bool routing_table::add_node(node_entry e)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "ignoring node (duplicate IP): "
<< e.id << " " << e.addr;
<< e.id << " " << e.addr();
#endif
return ret;
}
@ -412,10 +412,10 @@ bool routing_table::add_node(node_entry e)
for (bucket_t::iterator i = b.begin(), end(b.end());
i != end; ++i)
{
if (i->addr != e.addr || i->port != e.port) continue;
if (i->addr() != e.addr() || i->port() != e.port()) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr;
<< i->id << " " << i->addr();
#endif
b.erase(i);
done = true;
@ -426,10 +426,10 @@ bool routing_table::add_node(node_entry e)
for (bucket_t::iterator i = rb.begin(), end(rb.end());
i != end; ++i)
{
if (i->addr != e.addr || i->port != e.port) continue;
if (i->addr() != e.addr() || i->port() != e.port()) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr;
<< i->id << " " << i->addr();
#endif
rb.erase(i);
done = true;
@ -437,7 +437,7 @@ bool routing_table::add_node(node_entry e)
}
}
TORRENT_ASSERT(done);
m_ips.erase(e.addr.to_v4().to_bytes());
m_ips.erase(e.addr().to_v4().to_bytes());
}
}
@ -457,13 +457,13 @@ bool routing_table::add_node(node_entry e)
{
// a new IP address just claimed this node-ID
// ignore it
if (j->addr != e.addr || j->port != e.port) return ret;
if (j->addr() != e.addr() || j->port() != e.port()) return ret;
// we already have the node in our bucket
TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
j->timeout_count = 0;
j->update_rtt(e.rtt);
// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr;
// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr();
return ret;
}
@ -476,12 +476,12 @@ bool routing_table::add_node(node_entry e)
{
// a new IP address just claimed this node-ID
// ignore it
if (j->addr != e.addr || j->port != e.port) return ret;
if (j->addr() != e.addr() || j->port() != e.port()) return ret;
TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
j->timeout_count = 0;
j->update_rtt(e.rtt);
e = *j;
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
rb.erase(j);
}
@ -495,9 +495,9 @@ bool routing_table::add_node(node_entry e)
// close to this one. We know that it's not the same, because
// it claims a different node-ID. Ignore this to avoid attacks
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "ignoring node: " << e.id << " " << e.addr
TORRENT_LOG(table) << "ignoring node: " << e.id << " " << e.addr()
<< " existing node: "
<< j->id << " " << j->addr;
<< j->id << " " << j->addr();
#endif
return ret;
}
@ -507,9 +507,9 @@ bool routing_table::add_node(node_entry e)
{
// same thing but for the replacement bucket
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "ignoring (replacement) node: " << e.id << " " << e.addr
TORRENT_LOG(table) << "ignoring (replacement) node: " << e.id << " " << e.addr()
<< " existing node: "
<< j->id << " " << j->addr;
<< j->id << " " << j->addr();
#endif
return ret;
}
@ -520,8 +520,8 @@ bool routing_table::add_node(node_entry e)
{
if (b.empty()) b.reserve(bucket_size_limit);
b.push_back(e);
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "inserting node: " << e.id << " " << e.addr;
m_ips.insert(e.addr().to_v4().to_bytes());
// TORRENT_LOG(table) << "inserting node: " << e.id << " " << e.addr();
return ret;
}
@ -551,10 +551,10 @@ bool routing_table::add_node(node_entry e)
{
// j points to a node that has not been pinged.
// Replace it with this new one
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing unpinged node: " << e.id << " " << e.addr;
m_ips.insert(e.addr().to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing unpinged node: " << e.id << " " << e.addr();
return ret;
}
@ -572,10 +572,10 @@ bool routing_table::add_node(node_entry e)
{
// i points to a node that has been marked
// as stale. Replace it with this new one
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr;
m_ips.insert(e.addr().to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr();
return ret;
}
@ -587,10 +587,10 @@ bool routing_table::add_node(node_entry e)
if (j != b.end() && j->rtt > e.rtt)
{
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
*j = e;
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr;
m_ips.insert(e.addr().to_v4().to_bytes());
// TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr();
return ret;
}
}
@ -624,14 +624,14 @@ bool routing_table::add_node(node_entry e)
// less reliable than this one, that has been pinged
j = std::find_if(rb.begin(), rb.end(), boost::bind(&node_entry::pinged, _1) == false);
if (j == rb.end()) j = rb.begin();
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
rb.erase(j);
}
if (rb.empty()) rb.reserve(m_bucket_size);
rb.push_back(e);
m_ips.insert(e.addr.to_v4().to_bytes());
// TORRENT_LOG(table) << "inserting node in replacement cache: " << e.id << " " << e.addr;
m_ips.insert(e.addr().to_v4().to_bytes());
// TORRENT_LOG(table) << "inserting node in replacement cache: " << e.id << " " << e.addr();
return ret;
}
@ -648,7 +648,7 @@ bool routing_table::add_node(node_entry e)
else if (int(nrb.size()) < m_bucket_size)
nrb.push_back(e);
m_ips.insert(e.addr.to_v4().to_bytes());
m_ips.insert(e.addr().to_v4().to_bytes());
while (m_buckets.back().live_nodes.size() > bucket_limit(m_buckets.size()-1))
split_bucket();
@ -773,13 +773,13 @@ void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
// has never responded at all, remove it
if (j->fail_count() >= m_settings.max_fail_count || !j->pinged())
{
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
b.erase(j);
}
return;
}
m_ips.erase(j->addr.to_v4().to_bytes());
m_ips.erase(j->addr().to_v4().to_bytes());
b.erase(j);
// sort by RTT first, to find the node with the lowest

View File

@ -1602,8 +1602,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
@ -1616,8 +1616,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 1);
}
@ -1629,8 +1629,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
@ -1643,8 +1643,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding the same node ID again with a different IP (should be ignored)
@ -1654,8 +1654,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding a node that ends up in the same bucket with an IP
@ -1667,8 +1667,8 @@ int test_main()
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port(), 4);
}
s.restrict_routing_ips = false;