Merge pull request #128 from thomas-yuan/patch2

Add read-only support in dht_settings and outgoing query messages.
This commit is contained in:
Arvid Norberg 2015-09-08 19:47:18 -04:00
commit abe994c191
7 changed files with 215 additions and 7 deletions

View File

@ -290,6 +290,9 @@ void bind_session_settings()
.def_readwrite("privacy_lookups", &dht_settings::privacy_lookups)
.def_readwrite("enforce_node_id", &dht_settings::enforce_node_id)
.def_readwrite("ignore_dark_internet", &dht_settings::ignore_dark_internet)
.def_readwrite("block_timeout", &dht_settings::block_timeout)
.def_readwrite("block_ratelimit", &dht_settings::block_ratelimit)
.def_readwrite("read_only", &dht_settings::read_only)
;
#endif

View File

@ -81,6 +81,7 @@ class TORRENT_EXTRA_EXPORT rpc_manager
public:
rpc_manager(node_id const& our_id
, dht_settings const& settings
, routing_table& table
, udp_socket_interface* sock
, dht_logger* log);
@ -90,7 +91,7 @@ public:
// returns true if the node needs a refresh
// if so, id is assigned the node id to refresh
bool incoming(msg const&, node_id* id, libtorrent::dht_settings const& settings);
bool incoming(msg const&, node_id* id);
time_duration tick();
bool invoke(entry& e, udp::endpoint target
@ -125,6 +126,7 @@ private:
udp_socket_interface* m_sock;
dht_logger* m_log;
dht_settings const& m_settings;
routing_table& m_table;
time_point m_timer;
node_id m_our_id;

View File

@ -1403,6 +1403,7 @@ namespace libtorrent
, ignore_dark_internet(true)
, block_timeout(5 * 60)
, block_ratelimit(5)
, read_only(false)
{}
// the maximum number of peers to send in a reply to ``get_peers``
@ -1488,6 +1489,15 @@ namespace libtorrent
// the max number of packets per second a DHT node is allowed to send
// without getting banned.
int block_ratelimit;
// when set, the other nodes won't keep this node in their routing
// tables, it's meant for low-power and/or ephemeral devices that
// cannot support the DHT, it is also useful for mobile devices which
// are sensitive to network traffic and battery life.
// this node no longer responds to 'query' messages, and will place a
// 'ro' key (value = 1) in the top-level message dictionary of outgoing
// query messages.
bool read_only;
};

View File

@ -109,7 +109,7 @@ node::node(udp_socket_interface* sock
: m_settings(settings)
, m_id(calculate_node_id(nid, observer))
, m_table(m_id, 8, settings, observer)
, m_rpc(m_id, m_table, sock, observer)
, m_rpc(m_id, m_settings, m_table, sock, observer)
, m_observer(observer)
, m_last_tracker_tick(aux::time_now())
, m_last_self_refresh(min_time())
@ -274,12 +274,16 @@ void node::incoming(msg const& m)
case 'r':
{
node_id id;
m_rpc.incoming(m, &id, m_settings);
m_rpc.incoming(m, &id);
break;
}
case 'q':
{
TORRENT_ASSERT(m.message.dict_find_string_value("y") == "q");
// When a DHT node enters the read-only state, it no longer
// responds to 'query' messages that it receives.
if (m_settings.read_only) break;
entry e;
incoming_request(m, e);
m_sock->send_packet(e, m.addr, 0);
@ -296,7 +300,7 @@ void node::incoming(msg const& m)
}
#endif
node_id id;
m_rpc.incoming(m, &id, m_settings);
m_rpc.incoming(m, &id);
break;
}
}

View File

@ -163,11 +163,13 @@ enum { observer_size = max3<
};
rpc_manager::rpc_manager(node_id const& our_id
, dht_settings const& settings
, routing_table& table, udp_socket_interface* sock
, dht_logger* log)
: m_pool_allocator(observer_size, 10)
, m_sock(sock)
, m_log(log)
, m_settings(settings)
, m_table(table)
, m_timer(aux::time_now())
, m_our_id(our_id)
@ -244,8 +246,7 @@ void rpc_manager::unreachable(udp::endpoint const& ep)
}
}
bool rpc_manager::incoming(msg const& m, node_id* id
, libtorrent::dht_settings const& settings)
bool rpc_manager::incoming(msg const& m, node_id* id)
{
INVARIANT_CHECK;
@ -335,7 +336,7 @@ bool rpc_manager::incoming(msg const& m, node_id* id
}
node_id nid = node_id(node_id_ent.string_ptr());
if (settings.enforce_node_id && !verify_id(nid, m.addr.address()))
if (m_settings.enforce_node_id && !verify_id(nid, m.addr.address()))
{
o->timeout();
return false;
@ -439,6 +440,10 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
io::write_uint16(tid, out);
e["t"] = transaction_id;
// When a DHT node enters the read-only state, in each outgoing query message,
// places a 'ro' key in the top-level message dictionary and sets its value to 1.
if (m_settings.read_only) e["ro"] = 1;
o->set_target(target_addr);
o->set_transaction_id(tid);

View File

@ -704,7 +704,15 @@ namespace aux {
dht_sett["max_dht_items"] = m_dht_settings.max_dht_items;
dht_sett["max_torrent_search_reply"] = m_dht_settings.max_torrent_search_reply;
dht_sett["restrict_routing_ips"] = m_dht_settings.restrict_routing_ips;
dht_sett["restrict_search_ips"] = m_dht_settings.restrict_search_ips;
dht_sett["extended_routing_table"] = m_dht_settings.extended_routing_table;
dht_sett["aggressive_lookups"] = m_dht_settings.aggressive_lookups;
dht_sett["privacy_lookups"] = m_dht_settings.privacy_lookups;
dht_sett["enforce_node_id"] = m_dht_settings.enforce_node_id;
dht_sett["ignore_dark_internet"] = m_dht_settings.ignore_dark_internet;
dht_sett["block_timeout"] = m_dht_settings.block_timeout;
dht_sett["block_ratelimit"] = m_dht_settings.block_ratelimit;
dht_sett["read_only"] = m_dht_settings.read_only;
}
if (m_dht && (flags & session::save_dht_state))
@ -769,8 +777,24 @@ namespace aux {
if (val) m_dht_settings.max_torrent_search_reply = val.int_value();
val = settings.dict_find_int("restrict_routing_ips");
if (val) m_dht_settings.restrict_routing_ips = val.int_value();
val = settings.dict_find_int("restrict_search_ips");
if (val) m_dht_settings.restrict_search_ips = val.int_value();
val = settings.dict_find_int("extended_routing_table");
if (val) m_dht_settings.extended_routing_table = val.int_value();
val = settings.dict_find_int("aggressive_lookups");
if (val) m_dht_settings.aggressive_lookups = val.int_value();
val = settings.dict_find_int("privacy_lookups");
if (val) m_dht_settings.privacy_lookups = val.int_value();
val = settings.dict_find_int("enforce_node_id");
if (val) m_dht_settings.enforce_node_id = val.int_value();
val = settings.dict_find_int("ignore_dark_internet");
if (val) m_dht_settings.ignore_dark_internet = val.int_value();
val = settings.dict_find_int("block_timeout");
if (val) m_dht_settings.block_timeout = val.int_value();
val = settings.dict_find_int("block_ratelimit");
if (val) m_dht_settings.block_ratelimit = val.int_value();
val = settings.dict_find_int("read_only");
if (val) m_dht_settings.read_only = val.int_value();
}
#endif

View File

@ -136,6 +136,57 @@ void lazy_from_entry(entry const& e, bdecode_node& l)
TEST_CHECK(ret == 0);
}
void send_simple_dht_request(node& node, char const* msg, udp::endpoint const& ep
, bdecode_node* reply, entry const& args,
char const* t = "10", bool has_response = true)
{
reply->clear();
entry e;
e["q"] = msg;
e["t"] = t;
e["y"] = "q";
e["a"] = args;
char msg_buf[1500];
int size = bencode(msg_buf, e);
#ifdef TORRENT_USE_VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(msg_buf, size);
#endif
bdecode_node decoded;
error_code ec;
bdecode(msg_buf, msg_buf + size, decoded, ec);
TEST_CHECK(!ec);
dht::msg m(decoded, ep);
node.incoming(m);
// If the request is supposed to get a response, by now the node should have
// invoked the send function and put the response in g_sent_packets
std::list<std::pair<udp::endpoint, entry> >::iterator i = find_packet(ep);
if (has_response)
{
if (i == g_sent_packets.end())
{
TEST_ERROR("not response from DHT node");
return;
}
lazy_from_entry(i->second, *reply);
g_sent_packets.erase(i);
return;
}
// this request suppose won't be responsed.
if (i != g_sent_packets.end())
{
TEST_ERROR("shouldn't have response from DHT node");
return;
}
}
void send_dht_request(node& node, char const* msg, udp::endpoint const& ep
, bdecode_node* reply, char const* t = "10", char const* info_hash = 0
, char const* name = 0, std::string const token = std::string(), int port = 0
@ -451,6 +502,16 @@ dht_settings test_settings()
return sett;
}
entry test_args(sha1_hash const* nid = NULL)
{
entry a;
if (nid == NULL) a["id"] = generate_next().to_string();
else a["id"] = nid->to_string();
return a;
}
// TODO: test obfuscated_get_peers
// TODO: 2 split this test up into smaller test cases
TORRENT_TEST(dht)
@ -1334,6 +1395,7 @@ TORRENT_TEST(dht)
{"target", bdecode_node::string_t, 20, key_desc_t::last_child},
};
// bootstrap
g_sent_packets.clear();
@ -2059,5 +2121,103 @@ TORRENT_TEST(routing_table_extended)
#endif
}
TORRENT_TEST(read_only_node)
{
dht_settings sett = test_settings();
sett.read_only = true;
mock_socket s;
obs observer;
counters cnt;
dht::node node(&s, sett, node_id(0), &observer, cnt);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
bdecode_node response;
entry args = test_args();
// for incoming requests, read_only node won't response.
send_simple_dht_request(node, "ping", source, &response, args, "10", false);
TEST_EQUAL(response.type(), bdecode_node::none_t);
args["target"] = "01010101010101010101";
send_simple_dht_request(node, "get", source, &response, args, "10", false);
TEST_EQUAL(response.type(), bdecode_node::none_t);
// also, the sender shouldn't be added to routing table.
boost::tuple<int, int, int> nums = node.size();
TEST_EQUAL(nums.get<0>(), 0);
TEST_EQUAL(nums.get<1>(), 0);
TEST_EQUAL(nums.get<2>(), 0);
// for outgoing requests, read_only node will add 'ro' key (value == 1)
// in top-level of request.
bdecode_node parsed[7];
char error_string[200];
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
bdecode_node request;
sha1_hash target = generate_next();
node.get_item(target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
TEST_EQUAL(g_sent_packets.front().first, initial_node);
dht::key_desc_t get_item_desc[] = {
{"y", bdecode_node::string_t, 1, 0},
{"t", bdecode_node::string_t, 2, 0},
{"q", bdecode_node::string_t, 3, 0},
{"ro", bdecode_node::int_t, 4, key_desc_t::optional},
{"a", bdecode_node::dict_t, 0, key_desc_t::parse_children},
{"id", bdecode_node::string_t, 20, 0},
{"target", bdecode_node::string_t, 20, key_desc_t::last_child},
};
lazy_from_entry(g_sent_packets.front().second, request);
bool ret = verify_message(request, get_item_desc, parsed, 7, error_string
, sizeof(error_string));
TEST_CHECK(ret);
TEST_EQUAL(parsed[3].int_value(), 1);
// should have one node now, whichi is 4.4.4.4:1234
nums = node.size();
TEST_EQUAL(nums.get<0>(), 1);
TEST_EQUAL(nums.get<1>(), 0);
TEST_EQUAL(nums.get<2>(), 0);
// now, disable read_only, try again.
g_sent_packets.clear();
sett.read_only = false;
send_simple_dht_request(node, "get", source, &response, args, "10", true);
// sender should be added to routing table
nums = node.size();
TEST_EQUAL(nums.get<0>(), 2);
TEST_EQUAL(nums.get<1>(), 0);
TEST_EQUAL(nums.get<2>(), 0);
// now, request shouldn't have 'ro' key anymore
g_sent_packets.clear();
target = generate_next();
node.get_item(target, get_item_cb);
// since we have 2 nodes, we should have two packets.
TEST_EQUAL(g_sent_packets.size(), 2);
// both of them shouldn't a 'ro' key.
lazy_from_entry(g_sent_packets.front().second, request);
ret = verify_message(request, get_item_desc, parsed, 7, error_string
, sizeof(error_string));
TEST_CHECK(ret);
TEST_CHECK(!parsed[3]);
lazy_from_entry(g_sent_packets.back().second, request);
ret = verify_message(request, get_item_desc, parsed, 7, error_string
, sizeof(error_string));
TEST_CHECK(ret);
TEST_CHECK(!parsed[3]);
}
#endif