fixed up some edge cases in DHT routing table and improved unit test of it. Primarily fixed issues where an IP could take over someone elses node ID and where an IP would change its node ID

This commit is contained in:
Arvid Norberg 2011-01-17 07:49:44 +00:00
parent baffe32942
commit eec2672d26
5 changed files with 210 additions and 54 deletions

View File

@ -1,3 +1,4 @@
* fixed up some edge cases in DHT routing table and improved unit test of it
* added error category and error codes for HTTP errors * added error category and error codes for HTTP errors
* made the DHT implementation slightly more robust against routing table poisoning and node ID spoofing * made the DHT implementation slightly more robust against routing table poisoning and node ID spoofing
* support chunked encoding in http downloads (http_connection) * support chunked encoding in http downloads (http_connection)

View File

@ -91,7 +91,7 @@ public:
void status(session_status& s) const; void status(session_status& s) const;
void node_failed(node_id const& id); void node_failed(node_id const& id, udp::endpoint const& ep);
// adds an endpoint that will never be added to // adds an endpoint that will never be added to
// the routing table // the routing table
@ -132,6 +132,7 @@ public:
int bucket_size(int bucket) int bucket_size(int bucket)
{ {
int num_buckets = m_buckets.size(); int num_buckets = m_buckets.size();
if (num_buckets == 0) return 0;
if (bucket < num_buckets) bucket = num_buckets - 1; if (bucket < num_buckets) bucket = num_buckets - 1;
table_t::iterator i = m_buckets.begin(); table_t::iterator i = m_buckets.begin();
std::advance(i, bucket); std::advance(i, bucket);
@ -167,6 +168,11 @@ private:
table_t::iterator find_bucket(node_id const& id); table_t::iterator find_bucket(node_id const& id);
// return a pointer the node_entry with the given endpoint
// or 0 if we don't have such a node. Both the address and the
// port has to match
node_entry* find_node(udp::endpoint const& ep, routing_table::table_t::iterator* bucket);
// constant called k in paper // constant called k in paper
int m_bucket_size; int m_bucket_size;

View File

@ -290,61 +290,115 @@ bool compare_ip_cidr(node_entry const& lhs, node_entry const& rhs)
return dist <= cutoff; return dist <= cutoff;
} }
node_entry* routing_table::find_node(udp::endpoint const& ep, routing_table::table_t::iterator* bucket)
{
for (table_t::iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
for (bucket_t::iterator j = i->replacements.begin();
j != i->replacements.end(); ++j)
{
if (j->addr != ep.address()) continue;
if (j->port != ep.port()) continue;
*bucket = i;
return &*j;
}
for (bucket_t::iterator j = i->live_nodes.begin();
j != i->live_nodes.end(); ++j)
{
if (j->addr != ep.address()) continue;
if (j->port != ep.port()) continue;
*bucket = i;
return &*j;
}
}
*bucket = m_buckets.end();
return 0;
}
bool routing_table::add_node(node_entry const& e) bool routing_table::add_node(node_entry const& e)
{ {
if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false; if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false;
bool ret = need_bootstrap(); bool ret = need_bootstrap();
// if we're restricting IPs, check if we already have this IP in the table
if (m_settings.restrict_routing_ips
&& m_ips.find(e.addr.to_v4().to_bytes()) != m_ips.end())
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
bool id_match = false;
bool found = false;
node_id other_id;
for (table_t::iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
for (bucket_t::iterator j = i->replacements.begin();
j != i->replacements.end(); ++j)
{
if (j->addr != e.addr) continue;
found = true;
other_id = j->id;
if (j->id != e.id) continue;
id_match = true;
break;
}
if (id_match) break;
for (bucket_t::iterator j = i->live_nodes.begin();
j != i->live_nodes.end(); ++j)
{
if (j->addr != e.addr) continue;
found = true;
other_id = j->id;
if (j->id != e.id) continue;
id_match = true;
break;
}
if (id_match) break;
}
TORRENT_ASSERT(found);
if (!id_match)
{
TORRENT_LOG(table) << "ignoring node (duplicate IP): "
<< e.id << " " << e.addr
<< " existing: " << other_id;
}
#endif
return ret;
}
// don't add ourself // don't add ourself
if (e.id == m_id) return ret; if (e.id == m_id) return ret;
table_t::iterator i = find_bucket(e.id); // do we already have this IP in the table?
if (m_ips.find(e.addr.to_v4().to_bytes()) != m_ips.end())
{
// this exact IP already exists in the table. It might be the case
// that the node changed IP. If pinged is true, and the port also
// matches the we assume it's in fact the same node, and just update
// the routing table
// pinged means that we have sent a message to the IP, port and received
// a response with a correct transaction ID, i.e. it is verified to not
// be the result of a poioned routing table
node_entry* existing = 0;
table_t::iterator existing_bucket;
if (!e.pinged() || (existing = find_node(e.ep(), &existing_bucket)) == 0)
{
// the new node is not pinged, or it's not an existing node
// we should ignore it, unless we allow duplicate IPs in our
// routing table
if (m_settings.restrict_routing_ips)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "ignoring node (duplicate IP): "
<< e.id << " " << e.addr;
#endif
return ret;
}
}
if (e.pinged() && existing)
{
// if the node ID is the same, just update the failcount
// and be done with it
if (existing->id == e.id)
{
existing->timeout_count = 0;
return ret;
}
// delete the current entry before we instert the new one
bucket_t& b = existing_bucket->live_nodes;
bucket_t& rb = existing_bucket->replacements;
bool done = false;
for (bucket_t::iterator i = b.begin(), end(b.end());
i != end; ++i)
{
if (i->addr != e.addr || i->port != e.port) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr;
#endif
b.erase(i);
done = true;
break;
}
if (!done)
{
for (bucket_t::iterator i = rb.begin(), end(rb.end());
i != end; ++i)
{
if (i->addr != e.addr || i->port != e.port) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr;
#endif
rb.erase(i);
done = true;
break;
}
}
TORRENT_ASSERT(done);
m_ips.erase(e.addr.to_v4().to_bytes());
}
}
table_t::iterator i = find_bucket(e.id);
bucket_t& b = i->live_nodes; bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements; bucket_t& rb = i->replacements;
@ -364,7 +418,8 @@ bool routing_table::add_node(node_entry const& e)
// just move it to the back since it was // just move it to the back since it was
// the last node we had any contact with // the last node we had any contact with
// in this bucket // in this bucket
*j = e; TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
j->timeout_count = 0;
// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr; // TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr;
return ret; return ret;
} }
@ -614,7 +669,7 @@ void routing_table::for_each_node(
} }
} }
void routing_table::node_failed(node_id const& id) void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
{ {
// if messages to ourself fails, ignore it // if messages to ourself fails, ignore it
if (id == m_id) return; if (id == m_id) return;
@ -627,6 +682,11 @@ void routing_table::node_failed(node_id const& id)
, boost::bind(&node_entry::id, _1) == id); , boost::bind(&node_entry::id, _1) == id);
if (j == b.end()) return; if (j == b.end()) return;
// if the endpoint doesn't match, it's a different node
// claiming the same ID. The node we have in our routing
// table is not necessarily stale
if (j->ep() != ep) return;
if (rb.empty()) if (rb.empty())
{ {

View File

@ -255,7 +255,7 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
// don't tell the routing table about // don't tell the routing table about
// node ids that we just generated ourself // node ids that we just generated ourself
if ((o->flags & observer::flag_no_id) == 0) if ((o->flags & observer::flag_no_id) == 0)
m_node.m_table.node_failed(o->id()); m_node.m_table.node_failed(o->id(), o->target_ep());
++m_timeouts; ++m_timeouts;
--m_invoke_count; --m_invoke_count;
TORRENT_ASSERT(m_invoke_count >= 0); TORRENT_ASSERT(m_invoke_count >= 0);

View File

@ -1355,20 +1355,109 @@ int test_main()
// test kademlia routing table // test kademlia routing table
dht_settings s; dht_settings s;
s.restrict_routing_ips = false; // s.restrict_routing_ips = false;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456"); node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
dht::routing_table table(id, 10, s); dht::routing_table table(id, 10, s);
table.node_seen(id, udp::endpoint(address_v4::any(), rand())); std::vector<node_entry> nodes;
TEST_EQUAL(table.size().get<0>(), 0);
node_id tmp = id; node_id tmp = id;
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061"); node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
std::vector<node_entry> nodes;
// test a node with the same IP:port changing ID
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(table.size().get<0>(), 1);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// set timeout_count to 1
table.node_failed(tmp, udp::endpoint(address_v4::from_string("4.4.4.4"), 4));
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].timeout_count, 1);
}
// add the exact same node again, it should set the timeout_count to 0
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// test adding the same IP:port again with a new node ID (should replace the old one)
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
}
// test adding the same node ID again with a different IP (should be ignored)
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 5));
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
}
// test adding a node that ends up in the same bucket with an IP
// very close to the current one (should be ignored)
// if restrict_routing_ips == true
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.5"), 5));
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].port, 4);
}
s.restrict_routing_ips = false;
add_and_replace(tmp, diff);
table.node_seen(id, udp::endpoint(rand_v4(), rand()));
nodes.clear();
for (int i = 0; i < 7000; ++i) for (int i = 0; i < 7000; ++i)
{ {
table.node_seen(tmp, udp::endpoint(address_v4::any(), rand())); table.node_seen(tmp, udp::endpoint(rand_v4(), rand()));
add_and_replace(tmp, diff); add_and_replace(tmp, diff);
} }
TEST_EQUAL(table.num_active_buckets(), 11); TEST_EQUAL(table.num_active_buckets(), 11);
TEST_CHECK(table.size().get<0>() > 10 * 10);
//#error test num_global_nodes
//#error test need_refresh
#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG #if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG
table.print_state(std::cerr); table.print_state(std::cerr);