diff --git a/ChangeLog b/ChangeLog
index 6489a6c51..286f391ec 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,4 @@
+	* fixed up some edge cases in the DHT routing table and improved its unit test
 	* added error category and error codes for HTTP errors
 	* made the DHT implementation slightly more robust against routing table poisoning and node ID spoofing
 	* support chunked encoding in http downloads (http_connection)
diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp
index 893e28323..f6bfe9d74 100644
--- a/include/libtorrent/kademlia/routing_table.hpp
+++ b/include/libtorrent/kademlia/routing_table.hpp
@@ -91,7 +91,7 @@ public:
 
 	void status(session_status& s) const;
 
-	void node_failed(node_id const& id);
+	void node_failed(node_id const& id, udp::endpoint const& ep);
 
 	// adds an endpoint that will never be added to
 	// the routing table
@@ -132,6 +132,7 @@ public:
 	int bucket_size(int bucket)
 	{
 		int num_buckets = m_buckets.size();
+		if (num_buckets == 0) return 0;
 		if (bucket < num_buckets) bucket = num_buckets - 1;
 		table_t::iterator i = m_buckets.begin();
 		std::advance(i, bucket);
@@ -167,6 +168,11 @@ private:
 
 	table_t::iterator find_bucket(node_id const& id);
 
+	// returns a pointer to the node_entry with the given endpoint
+	// or 0 if we don't have such a node. Both the address and the
+	// port have to match
+	node_entry* find_node(udp::endpoint const& ep, routing_table::table_t::iterator* bucket);
+
 	// constant called k in paper
 	int m_bucket_size;
 
diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp
index b1ce649db..7c2b90e85 100644
--- a/src/kademlia/routing_table.cpp
+++ b/src/kademlia/routing_table.cpp
@@ -290,61 +290,115 @@ bool compare_ip_cidr(node_entry const& lhs, node_entry const& rhs)
 	return dist <= cutoff;
 }
 
+node_entry* routing_table::find_node(udp::endpoint const& ep, routing_table::table_t::iterator* bucket)
+{
+	for (table_t::iterator i = m_buckets.begin()
+		, end(m_buckets.end()); i != end; ++i)
+	{
+		for (bucket_t::iterator j = i->replacements.begin();
+			j != i->replacements.end(); ++j)
+		{
+			if (j->addr != ep.address()) continue;
+			if (j->port != ep.port()) continue;
+			*bucket = i;
+			return &*j;
+		}
+		for (bucket_t::iterator j = i->live_nodes.begin();
+			j != i->live_nodes.end(); ++j)
+		{
+			if (j->addr != ep.address()) continue;
+			if (j->port != ep.port()) continue;
+			*bucket = i;
+			return &*j;
+		}
+	}
+	*bucket = m_buckets.end();
+	return 0;
+}
+
 bool routing_table::add_node(node_entry const& e)
 {
 	if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false;
 
 	bool ret = need_bootstrap();
 
-	// if we're restricting IPs, check if we already have this IP in the table
-	if (m_settings.restrict_routing_ips
-		&& m_ips.find(e.addr.to_v4().to_bytes()) != m_ips.end())
-	{
-#ifdef TORRENT_DHT_VERBOSE_LOGGING
-		bool id_match = false;
-		bool found = false;
-		node_id other_id;
-		for (table_t::iterator i = m_buckets.begin()
-			, end(m_buckets.end()); i != end; ++i)
-		{
-			for (bucket_t::iterator j = i->replacements.begin();
-				j != i->replacements.end(); ++j)
-			{
-				if (j->addr != e.addr) continue;
-				found = true;
-				other_id = j->id;
-				if (j->id != e.id) continue;
-				id_match = true;
-				break;
-			}
-			if (id_match) break;
-			for (bucket_t::iterator j = i->live_nodes.begin();
-				j != i->live_nodes.end(); ++j)
-			{
-				if (j->addr != e.addr) continue;
-				found = true;
-				other_id = j->id;
-				if (j->id != e.id) continue;
-				id_match = true;
-				break;
-			}
-			if (id_match) break;
-		}
-		TORRENT_ASSERT(found);
-		if (!id_match)
-		{
-			TORRENT_LOG(table) << "ignoring node (duplicate IP): "
-				<< e.id << " " << e.addr
-				<< " existing: " << other_id;
-		}
-#endif
-		return ret;
-	}
-
 	// don't add ourself
 	if (e.id == m_id) return ret;
 
-	table_t::iterator i = find_bucket(e.id);
+	// do we already have this IP in the table?
+	if (m_ips.find(e.addr.to_v4().to_bytes()) != m_ips.end())
+	{
+		// this exact IP already exists in the table. It might be the case
+		// that the node changed IP. If pinged is true, and the port also
+		// matches, then we assume it's in fact the same node, and just update
+		// the routing table
+		// pinged means that we have sent a message to the IP, port and received
+		// a response with a correct transaction ID, i.e. it is verified to not
+		// be the result of a poisoned routing table
+
+		node_entry* existing = 0;
+		table_t::iterator existing_bucket;
+		if (!e.pinged() || (existing = find_node(e.ep(), &existing_bucket)) == 0)
+		{
+			// the new node is not pinged, or it's not an existing node
+			// we should ignore it, unless we allow duplicate IPs in our
+			// routing table
+			if (m_settings.restrict_routing_ips)
+			{
+#ifdef TORRENT_DHT_VERBOSE_LOGGING
+				TORRENT_LOG(table) << "ignoring node (duplicate IP): "
+					<< e.id << " " << e.addr;
+#endif
+				return ret;
+			}
+		}
+		if (e.pinged() && existing)
+		{
+			// if the node ID is the same, just update the failcount
+			// and be done with it
+			if (existing->id == e.id)
+			{
+				existing->timeout_count = 0;
+				return ret;
+			}
+
+			// delete the current entry before we insert the new one
+			bucket_t& b = existing_bucket->live_nodes;
+			bucket_t& rb = existing_bucket->replacements;
+			bool done = false;
+			for (bucket_t::iterator i = b.begin(), end(b.end());
+				i != end; ++i)
+			{
+				if (i->addr != e.addr || i->port != e.port) continue;
+				#ifdef TORRENT_DHT_VERBOSE_LOGGING
+				TORRENT_LOG(table) << "node ID changed, deleting old entry: "
+					<< i->id << " " << i->addr;
+				#endif
+				b.erase(i);
+				done = true;
+				break;
+			}
+			if (!done)
+			{
+				for (bucket_t::iterator i = rb.begin(), end(rb.end());
+					i != end; ++i)
+				{
+					if (i->addr != e.addr || i->port != e.port) continue;
+					#ifdef TORRENT_DHT_VERBOSE_LOGGING
+					TORRENT_LOG(table) << "node ID changed, deleting old entry: "
+						<< i->id << " " << i->addr;
+					#endif
+					rb.erase(i);
+					done = true;
+					break;
+				}
+			}
+			TORRENT_ASSERT(done);
+			m_ips.erase(e.addr.to_v4().to_bytes());
+		}
+	}
+
+	table_t::iterator i = find_bucket(e.id);
 	bucket_t& b = i->live_nodes;
 	bucket_t& rb = i->replacements;
 
@@ -364,7 +418,8 @@ bool routing_table::add_node(node_entry const& e)
 		// just move it to the back since it was
 		// the last node we had any contact with
 		// in this bucket
-		*j = e;
+		TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
+		j->timeout_count = 0;
 //		TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr;
 		return ret;
 	}
@@ -614,7 +669,7 @@ void routing_table::for_each_node(
 	}
 }
 
-void routing_table::node_failed(node_id const& id)
+void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
{
 	// if messages to ourself fails, ignore it
 	if (id == m_id) return;
@@ -627,6 +682,11 @@ void routing_table::node_failed(node_id const& id)
 		, boost::bind(&node_entry::id, _1) == id);
 
 	if (j == b.end()) return;
+
+	// if the endpoint doesn't match, it's a different node
+	// claiming the same ID. The node we have in our routing
+	// table is not necessarily stale
+	if (j->ep() != ep) return;
 
 	if (rb.empty())
 	{
diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp
index 7da9991d2..3ed742b92 100644
--- a/src/kademlia/traversal_algorithm.cpp
+++ b/src/kademlia/traversal_algorithm.cpp
@@ -255,7 +255,7 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
 		// don't tell the routing table about
 		// node ids that we just generated ourself
 		if ((o->flags & observer::flag_no_id) == 0)
-			m_node.m_table.node_failed(o->id());
+			m_node.m_table.node_failed(o->id(), o->target_ep());
 		++m_timeouts;
 		--m_invoke_count;
 		TORRENT_ASSERT(m_invoke_count >= 0);
diff --git a/test/test_primitives.cpp b/test/test_primitives.cpp
index c85c3ab23..d219f50d7 100644
--- a/test/test_primitives.cpp
+++ b/test/test_primitives.cpp
@@ -1355,20 +1355,109 @@ int test_main()
 
 	// test kademlia routing table
 	dht_settings s;
-	s.restrict_routing_ips = false;
+//	s.restrict_routing_ips = false;
 	node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
 	dht::routing_table table(id, 10, s);
-	table.node_seen(id, udp::endpoint(address_v4::any(), rand()));
+	std::vector<node_entry> nodes;
+	TEST_EQUAL(table.size().get<0>(), 0);
 	node_id tmp = id;
 	node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
-	std::vector<node_entry> nodes;
+
+	// test a node with the same IP:port changing ID
+	add_and_replace(tmp, diff);
+	table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
+	table.find_node(id, nodes, 0, 10);
+	TEST_EQUAL(table.bucket_size(0), 1);
+	TEST_EQUAL(table.size().get<0>(), 1);
+	TEST_EQUAL(nodes.size(), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+		TEST_EQUAL(nodes[0].timeout_count, 0);
+	}
+
+	// set timeout_count to 1
+	table.node_failed(tmp, udp::endpoint(address_v4::from_string("4.4.4.4"), 4));
+
+	nodes.clear();
+	table.for_each_node(node_push_back, nop, &nodes);
+	TEST_EQUAL(nodes.size(), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+		TEST_EQUAL(nodes[0].timeout_count, 1);
+	}
+
+	// add the exact same node again, it should set the timeout_count to 0
+	table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
+	nodes.clear();
+	table.for_each_node(node_push_back, nop, &nodes);
+	TEST_EQUAL(nodes.size(), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+		TEST_EQUAL(nodes[0].timeout_count, 0);
+	}
+
+	// test adding the same IP:port again with a new node ID (should replace the old one)
+	add_and_replace(tmp, diff);
+	table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4));
+	table.find_node(id, nodes, 0, 10);
+	TEST_EQUAL(table.bucket_size(0), 1);
+	TEST_EQUAL(nodes.size(), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+	}
+
+	// test adding the same node ID again with a different port (should be ignored)
+	table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 5));
+	table.find_node(id, nodes, 0, 10);
+	TEST_EQUAL(table.bucket_size(0), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+	}
+
+	// test adding a node that ends up in the same bucket with an IP
+	// very close to the current one (should be ignored)
+	// if restrict_routing_ips == true
+	table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.5"), 5));
+	table.find_node(id, nodes, 0, 10);
+	TEST_EQUAL(table.bucket_size(0), 1);
+	if (!nodes.empty())
+	{
+		TEST_EQUAL(nodes[0].id, tmp);
+		TEST_EQUAL(nodes[0].addr, address_v4::from_string("4.4.4.4"));
+		TEST_EQUAL(nodes[0].port, 4);
+	}
+
+	s.restrict_routing_ips = false;
+
+	add_and_replace(tmp, diff);
+	table.node_seen(id, udp::endpoint(rand_v4(), rand()));
+
+	nodes.clear();
 
 	for (int i = 0; i < 7000; ++i)
 	{
-		table.node_seen(tmp, udp::endpoint(address_v4::any(), rand()));
+		table.node_seen(tmp, udp::endpoint(rand_v4(), rand()));
 		add_and_replace(tmp, diff);
 	}
 	TEST_EQUAL(table.num_active_buckets(), 11);
+	TEST_CHECK(table.size().get<0>() > 10 * 10);
+//#error test num_global_nodes
+//#error test need_refresh
 
 #if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG
 	table.print_state(std::cerr);
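
For reference, the policy the routing_table changes above implement, distilled into a standalone sketch. The entry and table types below are simplified stand-ins invented for illustration, not the libtorrent API; bucket handling, the m_ips set and replacement lists are left out. The idea: a verified (pinged) observation whose endpoint is already in the table may refresh or replace the stored entry, an unverified observation of an IP we already track is dropped while restrict_routing_ips is set, and a failure only counts against an entry when the failing endpoint matches the one on record.

#include <cstddef>
#include <string>
#include <vector>

// simplified stand-in for libtorrent's node_entry (illustration only)
struct entry
{
	std::string id;      // node ID
	std::string addr;    // IP address as text
	int port;
	bool pinged;         // true if we received a reply with a valid transaction ID
	int timeout_count;
};

// simplified stand-in for the routing table (single flat list, no buckets)
struct table
{
	std::vector<entry> nodes;
	bool restrict_routing_ips;

	table() : restrict_routing_ips(true) {}

	// roughly mirrors the duplicate-IP handling added to routing_table::add_node()
	bool add_node(entry const& e)
	{
		entry* existing = 0;
		bool dup_ip = false;
		for (std::size_t k = 0; k < nodes.size(); ++k)
		{
			if (nodes[k].addr != e.addr) continue;
			dup_ip = true;
			if (nodes[k].port == e.port) existing = &nodes[k];
		}

		if (e.pinged && existing)
		{
			// same ID at the same endpoint: just reset the fail counter
			if (existing->id == e.id) { existing->timeout_count = 0; return true; }
			// verified node ID change at this endpoint: replace the old entry
			*existing = e;
			return true;
		}
		// unverified observation of an IP we already track: ignore it
		if (dup_ip && restrict_routing_ips) return false;

		nodes.push_back(e);
		return true;
	}

	// roughly mirrors routing_table::node_failed(): only penalize the entry
	// if the failing endpoint matches the one we have on record
	void node_failed(std::string const& id, std::string const& addr, int port)
	{
		for (std::size_t k = 0; k < nodes.size(); ++k)
		{
			if (nodes[k].id != id) continue;
			if (nodes[k].addr != addr || nodes[k].port != port) return;
			++nodes[k].timeout_count;
			return;
		}
	}
};

This matches the comment added to node_failed above: a different node claiming the same ID at another endpoint should not cause the entry we actually verified to be counted as stale.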