only allow pinged nodes into the routing table (#3006)

This is an attempt to prevent bogus nodes entering the routing table and
being propogated to other nodes.
This commit is contained in:
Steven Siloti 2018-05-08 03:47:29 -07:00 committed by Arvid Norberg
parent 81fd09775c
commit 96d215a32a
4 changed files with 67 additions and 53 deletions

View File

@ -1669,6 +1669,7 @@ COLUMN OPTIONS
, short_progress_bar + (8 - std::min(8, n.num_replacements))); , short_progress_bar + (8 - std::min(8, n.num_replacements)));
out += str; out += str;
pos += 1; pos += 1;
++bucket;
} }
for (lt::dht_lookup const& l : dht_active_requests) for (lt::dht_lookup const& l : dht_active_requests)

View File

@ -138,7 +138,7 @@ TORRENT_TEST(dht_bootstrap)
{ {
bootstrap_session({&dht}, ses); bootstrap_session({&dht}, ses);
} }
if (ticks > 2) if (ticks > 500)
{ {
ses.post_session_stats(); ses.post_session_stats();
std::printf("depth: %d nodes: %d\n", routing_table_depth, num_nodes); std::printf("depth: %d nodes: %d\n", routing_table_depth, num_nodes);

View File

@ -263,6 +263,21 @@ node_entry const* routing_table::next_refresh()
candidate = &n; candidate = &n;
} }
} }
if (i == m_buckets.rbegin()
|| int(i->live_nodes.size()) < bucket_limit(int(std::distance(i, end)) - 1))
{
// this bucket isn't full or it can be split
// check for an unpinged replacement
// node which may be eligible for the live bucket if confirmed
auto r = std::find_if(i->replacements.begin(), i->replacements.end()
, [](node_entry const& e) { return !e.pinged() && e.last_queried == min_time(); });
if (r != i->replacements.end())
{
candidate = &*r;
goto out;
}
}
} }
out: out:
@ -367,7 +382,7 @@ void routing_table::fill_from_replacements(table_t::iterator bucket)
while (int(b.size()) < bucket_size && !rb.empty()) while (int(b.size()) < bucket_size && !rb.empty())
{ {
auto j = std::find_if(rb.begin(), rb.end(), std::bind(&node_entry::pinged, _1)); auto j = std::find_if(rb.begin(), rb.end(), std::bind(&node_entry::pinged, _1));
if (j == rb.end()) j = rb.begin(); if (j == rb.end()) break;
b.push_back(*j); b.push_back(*j);
rb.erase(j); rb.erase(j);
} }
@ -487,6 +502,9 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
existing->update_rtt(e.rtt); existing->update_rtt(e.rtt);
existing->last_queried = e.last_queried; existing->last_queried = e.last_queried;
} }
// if this was a replacement node it may be elligible for
// promotion to the live bucket
fill_from_replacements(existing_bucket);
return node_added; return node_added;
} }
else if (existing->id.is_all_zeros()) else if (existing->id.is_all_zeros())
@ -614,11 +632,11 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
ip_ok: ip_ok:
// can we split the bucket? // can we split the bucket?
// only nodes that haven't failed can split the bucket, and we can only // only nodes that have been confirmed can split the bucket, and we can only
// split the last bucket // split the last bucket
bool const can_split = (std::next(i) == m_buckets.end() bool const can_split = (std::next(i) == m_buckets.end()
&& m_buckets.size() < 159) && m_buckets.size() < 159)
&& e.fail_count() == 0 && e.confirmed()
&& (i == m_buckets.begin() || std::prev(i)->live_nodes.size() > 1); && (i == m_buckets.begin() || std::prev(i)->live_nodes.size() > 1);
// if there's room in the main bucket, just insert it // if there's room in the main bucket, just insert it
@ -626,7 +644,7 @@ ip_ok:
// bucket's size limit. This makes use split the low-numbered buckets split // bucket's size limit. This makes use split the low-numbered buckets split
// earlier when we have larger low buckets, to make it less likely that we // earlier when we have larger low buckets, to make it less likely that we
// lose nodes // lose nodes
if (int(b.size()) < (can_split ? next_bucket_size_limit : bucket_size_limit)) if (e.pinged() && int(b.size()) < (can_split ? next_bucket_size_limit : bucket_size_limit))
{ {
if (b.empty()) b.reserve(bucket_size_limit); if (b.empty()) b.reserve(bucket_size_limit);
b.push_back(e); b.push_back(e);
@ -634,9 +652,7 @@ ip_ok:
return node_added; return node_added;
} }
// if there is no room, we look for nodes that are not 'pinged', // if there is no room, we look for nodes marked as stale
// i.e. we haven't confirmed that they respond to messages.
// Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it. // in the k-bucket. If we find one, we can replace it.
// then we look for nodes with the same 3 bit prefix (or however // then we look for nodes with the same 3 bit prefix (or however
// many bits prefix the bucket size warrants). If there is no other // many bits prefix the bucket size warrants). If there is no other
@ -644,24 +660,8 @@ ip_ok:
// as the last replacement strategy, if the node we found matching our // as the last replacement strategy, if the node we found matching our
// bit prefix has higher RTT than the new node, replace it. // bit prefix has higher RTT than the new node, replace it.
if (e.pinged() && e.fail_count() == 0) if (e.confirmed())
{ {
// if the node we're trying to insert is considered pinged,
// we may replace other nodes that aren't pinged
j = std::find_if(b.begin(), b.end()
, [](node_entry const& ne) { return !ne.pinged(); });
if (j != b.end() && !j->pinged())
{
// j points to a node that has not been pinged.
// Replace it with this new one
m_ips.erase(j->addr());
*j = e;
m_ips.insert(e.addr());
return node_added;
}
// A node is considered stale if it has failed at least one // A node is considered stale if it has failed at least one
// time. Here we choose the node that has failed most times. // time. Here we choose the node that has failed most times.
// If we don't find one, place this node in the replacement- // If we don't find one, place this node in the replacement-
@ -899,7 +899,7 @@ void routing_table::split_bucket()
{ {
if (distance_exp(m_id, j->id) >= 159 - bucket_index) if (distance_exp(m_id, j->id) >= 159 - bucket_index)
{ {
if (int(b.size()) >= bucket_size_limit) if (!j->pinged() || int(b.size()) >= bucket_size_limit)
{ {
++j; ++j;
continue; continue;
@ -909,7 +909,7 @@ void routing_table::split_bucket()
else else
{ {
// this entry belongs in the new bucket // this entry belongs in the new bucket
if (int(new_bucket.size()) < new_bucket_size) if (j->pinged() && int(new_bucket.size()) < new_bucket_size)
new_bucket.push_back(*j); new_bucket.push_back(*j);
else else
new_replacement_bucket.push_back(*j); new_replacement_bucket.push_back(*j);
@ -1147,6 +1147,7 @@ void routing_table::check_invariant() const
for (auto const& j : i.live_nodes) for (auto const& j : i.live_nodes)
{ {
TORRENT_ASSERT(j.addr().is_v4() == i.live_nodes.begin()->addr().is_v4()); TORRENT_ASSERT(j.addr().is_v4() == i.live_nodes.begin()->addr().is_v4());
TORRENT_ASSERT(j.pinged());
all_ips.insert(j.addr()); all_ips.insert(j.addr());
} }
} }

View File

@ -1070,7 +1070,9 @@ void test_id_enforcement(address(&rand_addr)())
// verify that we reject invalid node IDs // verify that we reject invalid node IDs
// this is now an invalid node-id for 'source' // this is now an invalid node-id for 'source'
nid[0] = 0x18; nid[0] = 0x18;
int nodes_num = std::get<0>(t.dht_node.size()); // the test nodes don't get pinged so they will only get as far
// as the replacement bucket
int nodes_num = std::get<1>(t.dht_node.size());
send_dht_request(t.dht_node, "find_node", t.source, &response send_dht_request(t.dht_node, "find_node", t.source, &response
, msg_args() , msg_args()
.target(sha1_hash("0101010101010101010101010101010101010101")) .target(sha1_hash("0101010101010101010101010101010101010101"))
@ -1100,7 +1102,7 @@ void test_id_enforcement(address(&rand_addr)())
} }
// a node with invalid node-id shouldn't be added to routing table. // a node with invalid node-id shouldn't be added to routing table.
TEST_EQUAL(std::get<0>(t.dht_node.size()), nodes_num); TEST_EQUAL(std::get<1>(t.dht_node.size()), nodes_num);
// now the node-id is valid. // now the node-id is valid.
if (is_v4(t.source)) if (is_v4(t.source))
@ -1133,7 +1135,7 @@ void test_id_enforcement(address(&rand_addr)())
std::printf(" invalid error response: %s\n", t.error_string); std::printf(" invalid error response: %s\n", t.error_string);
} }
// node with valid node-id should be added to routing table. // node with valid node-id should be added to routing table.
TEST_EQUAL(std::get<0>(t.dht_node.size()), nodes_num + 1); TEST_EQUAL(std::get<1>(t.dht_node.size()), nodes_num + 1);
} }
} // anonymous namespace } // anonymous namespace
@ -1240,14 +1242,14 @@ namespace {
std::array<node_entry, 8> build_nodes() std::array<node_entry, 8> build_nodes()
{ {
std::array<node_entry, 8> nodes = { std::array<node_entry, 8> nodes = {
{ node_entry(items[0].target, udp::endpoint(addr4("1.1.1.1"), 1231)) { node_entry(items[0].target, udp::endpoint(addr4("1.1.1.1"), 1231), 10, true)
, node_entry(items[1].target, udp::endpoint(addr4("2.2.2.2"), 1232)) , node_entry(items[1].target, udp::endpoint(addr4("2.2.2.2"), 1232), 10, true)
, node_entry(items[2].target, udp::endpoint(addr4("3.3.3.3"), 1233)) , node_entry(items[2].target, udp::endpoint(addr4("3.3.3.3"), 1233), 10, true)
, node_entry(items[3].target, udp::endpoint(addr4("4.4.4.4"), 1234)) , node_entry(items[3].target, udp::endpoint(addr4("4.4.4.4"), 1234), 10, true)
, node_entry(items[4].target, udp::endpoint(addr4("5.5.5.5"), 1235)) , node_entry(items[4].target, udp::endpoint(addr4("5.5.5.5"), 1235), 10, true)
, node_entry(items[5].target, udp::endpoint(addr4("6.6.6.6"), 1236)) , node_entry(items[5].target, udp::endpoint(addr4("6.6.6.6"), 1236), 10, true)
, node_entry(items[6].target, udp::endpoint(addr4("7.7.7.7"), 1237)) , node_entry(items[6].target, udp::endpoint(addr4("7.7.7.7"), 1237), 10, true)
, node_entry(items[7].target, udp::endpoint(addr4("8.8.8.8"), 1238)) } , node_entry(items[7].target, udp::endpoint(addr4("8.8.8.8"), 1238), 10, true) }
}; };
return nodes; return nodes;
} }
@ -1255,15 +1257,15 @@ namespace {
std::array<node_entry, 9> build_nodes(sha1_hash target) std::array<node_entry, 9> build_nodes(sha1_hash target)
{ {
std::array<node_entry, 9> nodes = { std::array<node_entry, 9> nodes = {
{ node_entry(target, udp::endpoint(addr4("1.1.1.1"), 1231)) { node_entry(target, udp::endpoint(addr4("1.1.1.1"), 1231), 10, true)
, node_entry(target, udp::endpoint(addr4("2.2.2.2"), 1232)) , node_entry(target, udp::endpoint(addr4("2.2.2.2"), 1232), 10, true)
, node_entry(target, udp::endpoint(addr4("3.3.3.3"), 1233)) , node_entry(target, udp::endpoint(addr4("3.3.3.3"), 1233), 10, true)
, node_entry(target, udp::endpoint(addr4("4.4.4.4"), 1234)) , node_entry(target, udp::endpoint(addr4("4.4.4.4"), 1234), 10, true)
, node_entry(target, udp::endpoint(addr4("5.5.5.5"), 1235)) , node_entry(target, udp::endpoint(addr4("5.5.5.5"), 1235), 10, true)
, node_entry(target, udp::endpoint(addr4("6.6.6.6"), 1236)) , node_entry(target, udp::endpoint(addr4("6.6.6.6"), 1236), 10, true)
, node_entry(target, udp::endpoint(addr4("7.7.7.7"), 1237)) , node_entry(target, udp::endpoint(addr4("7.7.7.7"), 1237), 10, true)
, node_entry(target, udp::endpoint(addr4("8.8.8.8"), 1238)) , node_entry(target, udp::endpoint(addr4("8.8.8.8"), 1238), 10, true)
, node_entry(target, udp::endpoint(addr4("9.9.9.9"), 1239)) } , node_entry(target, udp::endpoint(addr4("9.9.9.9"), 1239), 10, true) }
}; };
return nodes; return nodes;
} }
@ -2099,7 +2101,8 @@ void test_get_peers(address(&rand_addr)())
dht::node_id const target = to_hash("1234876923549721020394873245098347598635"); dht::node_id const target = to_hash("1234876923549721020394873245098347598635");
udp::endpoint const initial_node(rand_addr(), 1234); udp::endpoint const initial_node(rand_addr(), 1234);
t.dht_node.m_table.add_node(node_entry{initial_node}); dht::node_id const initial_node_id = to_hash("1111111111222222222233333333334444444444");
t.dht_node.m_table.add_node(node_entry{initial_node_id, initial_node, 10, true});
t.dht_node.announce(target, 1234, false, get_peers_cb); t.dht_node.announce(target, 1234, false, get_peers_cb);
@ -2234,7 +2237,8 @@ void test_mutable_get(address(&rand_addr)(), bool const with_salt)
g_sent_packets.clear(); g_sent_packets.clear();
udp::endpoint const initial_node(rand_addr(), 1234); udp::endpoint const initial_node(rand_addr(), 1234);
t.dht_node.m_table.add_node(node_entry{initial_node}); dht::node_id const initial_node_id = to_hash("1111111111222222222233333333334444444444");
t.dht_node.m_table.add_node(node_entry{initial_node_id, initial_node, 10, true});
g_put_item.assign(items[0].ent, salt, seq, pk, sk); g_put_item.assign(items[0].ent, salt, seq, pk, sk);
t.dht_node.put_item(pk, std::string() t.dht_node.put_item(pk, std::string()
@ -2333,7 +2337,8 @@ TORRENT_TEST(immutable_get)
g_sent_packets.clear(); g_sent_packets.clear();
udp::endpoint initial_node(addr4("4.4.4.4"), 1234); udp::endpoint initial_node(addr4("4.4.4.4"), 1234);
t.dht_node.m_table.add_node(node_entry{initial_node}); dht::node_id const initial_node_id = to_hash("1111111111222222222233333333334444444444");
t.dht_node.m_table.add_node(node_entry{initial_node_id, initial_node, 10, true});
t.dht_node.get_item(items[0].target, get_immutable_item_cb); t.dht_node.get_item(items[0].target, get_immutable_item_cb);
@ -3246,7 +3251,8 @@ TORRENT_TEST(read_only_node)
bdecode_node parsed[7]; bdecode_node parsed[7];
char error_string[200]; char error_string[200];
udp::endpoint initial_node(addr("4.4.4.4"), 1234); udp::endpoint initial_node(addr("4.4.4.4"), 1234);
node.m_table.add_node(node_entry{initial_node}); dht::node_id const initial_node_id = to_hash("1111111111222222222233333333334444444444");
node.m_table.add_node(node_entry{initial_node_id, initial_node, 10, true});
bdecode_node request; bdecode_node request;
sha1_hash target = generate_next(); sha1_hash target = generate_next();
@ -3272,16 +3278,21 @@ TORRENT_TEST(read_only_node)
// should have one node now, which is 4.4.4.4:1234 // should have one node now, which is 4.4.4.4:1234
TEST_EQUAL(std::get<0>(node.size()), 1); TEST_EQUAL(std::get<0>(node.size()), 1);
// and no replacement nodes
TEST_EQUAL(std::get<1>(node.size()), 0);
// now, disable read_only, try again. // now, disable read_only, try again.
g_sent_packets.clear(); g_sent_packets.clear();
sett.read_only = false; sett.read_only = false;
send_dht_request(node, "get", source, &response); send_dht_request(node, "get", source, &response);
// sender should be added to routing table, there are 2 nodes now. // sender should be added to repacement bucket
TEST_EQUAL(std::get<0>(node.size()), 2); TEST_EQUAL(std::get<1>(node.size()), 1);
g_sent_packets.clear(); g_sent_packets.clear();
#if 0
// TODO: this won't work because the second node isn't pinged so it wont
// be added to the routing table
target = generate_next(); target = generate_next();
node.get_item(target, get_immutable_item_cb); node.get_item(target, get_immutable_item_cb);
@ -3300,6 +3311,7 @@ TORRENT_TEST(read_only_node)
TEST_CHECK(ret); TEST_CHECK(ret);
TEST_CHECK(!parsed[3]); TEST_CHECK(!parsed[3]);
#endif
} }
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING