IPv6 DHT support

Fixes #110
This commit is contained in:
Steven Siloti 2015-11-13 21:08:57 -08:00
parent 356d2506bd
commit 2abd9867ce
17 changed files with 696 additions and 155 deletions

View File

@ -101,6 +101,7 @@
* added support for asynchronous disk I/O
* almost completely changed the storage interface (for custom storage)
* added support for hashing pieces in multiple threads
* added support for BEP 32, "IPv6 extension for DHT"
* fix division by zero in super-seeding logic

View File

@ -142,7 +142,7 @@ namespace libtorrent { namespace dht
boost::shared_ptr<dht_tracker> self()
{ return shared_from_this(); }
void connection_timeout(error_code const& e);
void connection_timeout(node& n, error_code const& e);
void refresh_timeout(error_code const& e);
void refresh_key(error_code const& e);
@ -157,6 +157,9 @@ namespace libtorrent { namespace dht
counters& m_counters;
node m_dht;
#if TORRENT_USE_IPV6
node m_dht6;
#endif
send_fun_t m_send_fun;
dht_logger* m_log;
@ -165,9 +168,14 @@ namespace libtorrent { namespace dht
deadline_timer m_key_refresh_timer;
deadline_timer m_connection_timer;
#if TORRENT_USE_IPV6
deadline_timer m_connection_timer6;
#endif
deadline_timer m_refresh_timer;
dht_settings const& m_settings;
std::map<std::string, node*> m_nodes;
bool m_abort;
// used to resolve hostnames for nodes

View File

@ -73,6 +73,9 @@ namespace libtorrent { namespace dht
struct traversal_algorithm;
struct dht_observer;
extern char const* address_type_names[num_address_type];
extern char const* address_type_keys[num_address_type];
void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes);
struct null_type {};
@ -99,9 +102,10 @@ protected:
class TORRENT_EXTRA_EXPORT node : boost::noncopyable
{
public:
node(udp_socket_interface* sock
node(address_type at, udp_socket_interface* sock
, libtorrent::dht_settings const& settings, node_id nid
, dht_observer* observer, counters& cnt
, std::map<std::string, node*> const& nodes
, dht_storage_constructor_type storage_constructor = dht_default_storage_constructor);
~node();
@ -204,6 +208,21 @@ public:
counters& stats_counters() const { return m_counters; }
dht_observer* observer() const { return m_observer; }
address_type native_address_type() { return m_address_type; }
char const* native_address_name() { return address_type_names[m_address_type]; }
char const* native_nodes_key() { return address_type_keys[m_address_type]; }
bool native_address(udp::endpoint ep) const
{ return native_address(ep.address()); }
bool native_address(tcp::endpoint ep) const
{ return native_address(ep.address()); }
bool native_address(address addr) const
{
return (addr.is_v4() && m_address_type == ipv4)
|| (addr.is_v6() && m_address_type == ipv6);
}
private:
void send_single_refresh(udp::endpoint const& ep, int bucket
@ -224,15 +243,21 @@ private:
void incoming_request(msg const& h, entry& e);
void write_nodes_entries(sha1_hash const& info_hash
, bdecode_node const& want, entry& r);
node_id m_id;
public:
routing_table m_table;
rpc_manager m_rpc;
std::map<std::string, node*> const& m_nodes;
private:
dht_observer* m_observer;
address_type m_address_type;
time_point m_last_tracker_tick;
// the last time we issued a bootstrap or a refresh on our own ID, to expand

View File

@ -79,15 +79,29 @@ struct ip_set
size_t count(address addr);
void erase(address addr);
void clear()
{
m_ip4s.clear();
#if TORRENT_USE_IPV6
m_ip6s.clear();
#endif
}
bool operator==(ip_set const& rh)
{
#if TORRENT_USE_IPV6
return m_ip4s == rh.m_ip4s && m_ip6s == rh.m_ip6s;
#else
return m_ip4s == rh.m_ip4s;
#endif
}
// these must be multisets because there can be multiple routing table
// entries for a single IP when restrict_routing_ips is set to false
boost::unordered_multiset<address_v4::bytes_type> m_ip4s;
#if TORRENT_USE_IPV6
boost::unordered_multiset<address_v6::bytes_type> m_ip6s;
#endif
};
// differences in the implementation from the description in
@ -118,7 +132,8 @@ public:
// Perhaps replacement nodes should be in a separate vector.
typedef std::vector<routing_table_node> table_t;
routing_table(node_id const& id, int bucket_size
routing_table(node_id const& id, address_type at
, int bucket_size
, dht_settings const& settings
, dht_logger* log);
@ -229,6 +244,12 @@ public:
bool is_full(int bucket) const;
bool native_address(address addr) const
{
return (addr.is_v4() && m_address_type == ipv4)
|| (addr.is_v6() && m_address_type == ipv6);
}
private:
#ifndef TORRENT_DISABLE_LOGGING
@ -256,6 +277,7 @@ private:
table_t m_buckets;
node_id m_id; // our own node id
address_type m_address_type; // address type to be stored
// the last seen depth (i.e. levels in the routing table)
// it's mutable because it's updated by depth(), which is const

View File

@ -113,7 +113,7 @@ protected:
delete p;
}
node & m_node;
node& m_node;
std::vector<observer_ptr> m_results;
node_id const m_target;
boost::uint16_t m_ref_count;
@ -124,8 +124,9 @@ protected:
// the IP addresses of the nodes in m_results
std::set<boost::uint32_t> m_peer4_prefixes;
// no IPv6 support yet anyway
// std::set<boost::uint64_t> m_peer6_prefixes;
#if TORRENT_USE_IPV6
std::set<boost::uint64_t> m_peer6_prefixes;
#endif
};
struct traversal_observer : observer

View File

@ -84,12 +84,12 @@ struct dht_node final : lt::dht::udp_socket_interface
: m_io_service(sim, addr_from_int(idx))
#if LIBSIMULATOR_USE_MOVE
, m_socket(m_io_service)
, m_dht(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt)
, m_dht(ipv4, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, std::map<std::string, lt::dht::node*>())
#else
, m_socket(new asio::ip::udp::socket(m_io_service))
, m_dht(new lt::dht::node(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt))
, m_dht(new lt::dht::node(ipv4, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, std::map<std::string, lt::dht::node*>()))
#endif
, m_add_dead_nodes(flags & add_dead_nodes)
{
@ -99,7 +99,6 @@ struct dht_node final : lt::dht::udp_socket_interface
udp::socket::non_blocking_io ioc(true);
sock().io_control(ioc);
sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer))
, m_ep, boost::bind(&dht_node::on_read, this, _1, _2));
}
@ -117,8 +116,9 @@ struct dht_node final : lt::dht::udp_socket_interface
// reserving space in the vector before emplacing any nodes).
dht_node(dht_node&& n) noexcept
: m_socket(std::move(n.m_socket))
, m_dht(this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters())
, m_dht(ipv4, this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters()
, std::map<std::string, lt::dht::node*>())
{
assert(false && "dht_node is not movable");
throw std::runtime_error("dht_node is not movable");

View File

@ -255,10 +255,14 @@ void setup_swarm(int num_nodes
for (int i = 0; i < num_nodes; ++i)
{
// create a new io_service
std::vector<asio::ip::address> ips;
char ep[30];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff);
ips.push_back(addr(ep));
snprintf(ep, sizeof(ep), "2000::%X%X", (i + 1) >> 8, (i + 1) & 0xff);
ips.push_back(addr(ep));
io_service.push_back(boost::make_shared<sim::asio::io_service>(
boost::ref(sim), addr(ep)));
boost::ref(sim), ips));
lt::settings_pack pack = default_settings;

View File

@ -44,6 +44,10 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket_io.hpp"
#include "setup_swarm.hpp"
#include "setup_dht.hpp"
#include "libtorrent/ed25519.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/kademlia/item.hpp"
#include <boost/bind.hpp>
namespace lt = libtorrent;

View File

@ -56,8 +56,13 @@ struct obs : dht::dht_observer
virtual void set_external_address(address const& addr
, address const& source) TORRENT_OVERRIDE
{}
virtual address external_address() TORRENT_OVERRIDE
{ return address_v4::from_string("40.30.20.10"); }
virtual address external_address(udp proto) TORRENT_OVERRIDE
{
if (proto == udp::v4())
return address_v4::from_string("40.30.20.10");
else
return address_v6();
}
virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE {}
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) TORRENT_OVERRIDE {}

View File

@ -76,10 +76,10 @@ namespace libtorrent { namespace dht
time_duration const key_refresh
= duration_cast<time_duration>(minutes(5));
node_id extract_node_id(entry const& e)
node_id extract_node_id(entry const& e, std::string const& key)
{
if (e.type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e.find_key("node-id");
entry const* nid = e.find_key(key);
if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20)
return (node_id::min)();
return node_id(nid->string().c_str());
@ -97,11 +97,19 @@ namespace libtorrent { namespace dht
, dht_storage_constructor_type storage_constructor
, entry const& state)
: m_counters(cnt)
, m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor)
, m_dht(ipv4, this, settings, extract_node_id(state, "node-id")
, observer, cnt, m_nodes, storage_constructor)
#if TORRENT_USE_IPV6
, m_dht6(ipv6, this, settings, extract_node_id(state, "node-id6")
, observer, cnt, m_nodes, storage_constructor)
#endif
, m_send_fun(send_fun)
, m_log(observer)
, m_key_refresh_timer(ios)
, m_connection_timer(ios)
#if TORRENT_USE_IPV6
, m_connection_timer6(ios)
#endif
, m_refresh_timer(ios)
, m_settings(settings)
, m_abort(false)
@ -111,9 +119,19 @@ namespace libtorrent { namespace dht
{
m_blocker.set_block_timer(m_settings.block_timeout);
m_blocker.set_rate_limit(m_settings.block_ratelimit);
m_nodes.insert(std::make_pair(m_dht.native_address_name(), &m_dht));
#if TORRENT_USE_IPV6
m_nodes.insert(std::make_pair(m_dht6.native_address_name(), &m_dht6));
#endif
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s"
m_log->log(dht_logger::tracker, "starting IPv4 DHT tracker with node id: %s"
, to_hex(m_dht.nid().to_string()).c_str());
#if TORRENT_USE_IPV6
m_log->log(dht_logger::tracker, "starting IPv6 DHT tracker with node id: %s"
, to_hex(m_dht6.nid().to_string()).c_str());
#endif
#endif
}
@ -131,6 +149,9 @@ namespace libtorrent { namespace dht
, find_data::nodes_callback const& f)
{
std::vector<udp::endpoint> initial_nodes;
#if TORRENT_USE_IPV6
std::vector<udp::endpoint> initial_nodes6;
#endif
if (bootstrap.type() == entry::dictionary_t)
{
@ -138,6 +159,12 @@ namespace libtorrent { namespace dht
if (entry const* nodes = bootstrap.find_key("nodes"))
read_endpoint_list<udp::endpoint>(nodes, initial_nodes);
} TORRENT_CATCH(std::exception&) {}
#if TORRENT_USE_IPV6
TORRENT_TRY{
if (entry const* nodes = bootstrap.find_key("nodes6"))
read_endpoint_list<udp::endpoint>(nodes, initial_nodes6);
} TORRENT_CATCH(std::exception&) {}
#endif
}
error_code ec;
@ -145,11 +172,20 @@ namespace libtorrent { namespace dht
m_connection_timer.expires_from_now(seconds(1), ec);
m_connection_timer.async_wait(
boost::bind(&dht_tracker::connection_timeout, self(), _1));
boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht), _1));
#if TORRENT_USE_IPV6
m_connection_timer6.expires_from_now(seconds(1), ec);
m_connection_timer6.async_wait(
boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht6), _1));
#endif
m_refresh_timer.expires_from_now(seconds(5), ec);
m_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_timeout, self(), _1));
m_dht.bootstrap(initial_nodes, f);
#if TORRENT_USE_IPV6
m_dht6.bootstrap(initial_nodes6, f);
#endif
}
void dht_tracker::stop()
@ -158,6 +194,9 @@ namespace libtorrent { namespace dht
error_code ec;
m_key_refresh_timer.cancel(ec);
m_connection_timer.cancel(ec);
#if TORRENT_USE_IPV6
m_connection_timer6.cancel(ec);
#endif
m_refresh_timer.cancel(ec);
m_host_resolver.cancel();
}
@ -180,14 +219,19 @@ namespace libtorrent { namespace dht
m_dht.update_stats_counters(c);
}
void dht_tracker::connection_timeout(error_code const& e)
void dht_tracker::connection_timeout(node& n, error_code const& e)
{
if (e || m_abort) return;
time_duration d = m_dht.connection_timeout();
time_duration d = n.connection_timeout();
error_code ec;
m_connection_timer.expires_from_now(d, ec);
m_connection_timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), _1));
#if TORRENT_USE_IPV6
deadline_timer& timer = n.native_address_type() == ipv4 ? m_connection_timer : m_connection_timer6;
#else
deadline_timer& timer = m_connection_timer;
#endif
timer.expires_from_now(d, ec);
timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(n), _1));
}
void dht_tracker::refresh_timeout(error_code const& e)
@ -195,6 +239,9 @@ namespace libtorrent { namespace dht
if (e || m_abort) return;
m_dht.tick();
#if TORRENT_USE_IPV6
m_dht6.tick();
#endif
// periodically update the DOS blocker's settings from the dht_settings
m_blocker.set_block_timer(m_settings.block_timeout);
@ -215,6 +262,7 @@ namespace libtorrent { namespace dht
m_key_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1));
m_dht.new_write_key();
m_dht6.new_write_key();
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "*** new write key***");
#endif
@ -224,6 +272,11 @@ namespace libtorrent { namespace dht
#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM
std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc);
m_dht.print_state(st);
#if TORRENT_USE_IPV6
std::ofstream st6("dht6_routing_table_state.txt", std::ios_base::trunc);
m_dht6.print_state(st6);
#endif
#endif
*/
@ -231,18 +284,111 @@ namespace libtorrent { namespace dht
, boost::function<void(std::vector<tcp::endpoint> const&)> f)
{
m_dht.get_peers(ih, f, NULL, false);
#if TORRENT_USE_IPV6
m_dht6.get_peers(ih, f, NULL, false);
#endif
}
void dht_tracker::announce(sha1_hash const& ih, int listen_port, int flags
, boost::function<void(std::vector<tcp::endpoint> const&)> f)
{
m_dht.announce(ih, listen_port, flags, f);
#if TORRENT_USE_IPV6
m_dht6.announce(ih, listen_port, flags, f);
#endif
}
namespace {
struct get_immutable_item_ctx
{
get_immutable_item_ctx(int traversals)
: active_traversals(traversals)
, item_posted(false)
{}
int active_traversals;
bool item_posted;
};
// these functions provide a slightly higher level
// interface to the get/put functionality in the DHT
void get_immutable_item_callback(item const& it, boost::shared_ptr<get_immutable_item_ctx> ctx
, boost::function<void(item const&)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(!it.is_mutable());
--ctx->active_traversals;
if (!ctx->item_posted && (!it.empty() || ctx->active_traversals == 0))
{
ctx->item_posted = true;
f(it);
}
}
struct get_mutable_item_ctx
{
get_mutable_item_ctx(int traversals) : active_traversals(traversals) {}
int active_traversals;
item it;
};
bool get_mutable_item_callback(item const& it, bool authoritative
, boost::shared_ptr<get_mutable_item_ctx> ctx
, boost::function<void(item const&, bool)> f)
{
TORRENT_ASSERT(it.is_mutable());
if (authoritative) --ctx->active_traversals;
authoritative = authoritative && ctx->active_traversals == 0;
if ((ctx->it.empty() && !it.empty()) || (ctx->it.seq() < it.seq()))
{
ctx->it = it;
f(it, authoritative);
}
else if (authoritative)
f(it, authoritative);
return false;
}
struct put_item_ctx
{
put_item_ctx(int traversals)
: active_traversals(traversals)
, response_count(0)
{}
int active_traversals;
int response_count;
};
void put_immutable_item_callback(int responses, boost::shared_ptr<put_item_ctx> ctx
, boost::function<void(int)> f)
{
ctx->response_count += responses;
if (--ctx->active_traversals == 0)
f(ctx->response_count);
}
void put_mutable_item_callback(item const& it, int responses, boost::shared_ptr<put_item_ctx> ctx
, boost::function<void(item const&, int)> cb)
{
ctx->response_count += responses;
if (--ctx->active_traversals == 0)
cb(it, ctx->response_count);
}
} // anonymous namespace
void dht_tracker::get_item(sha1_hash const& target
, boost::function<void(item const&)> cb)
{
m_dht.get_item(target, cb);
boost::shared_ptr<get_immutable_item_ctx>
ctx = boost::make_shared<get_immutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb));
#endif
}
// key is a 32-byte binary string, the public key to look up.
@ -251,7 +397,12 @@ namespace libtorrent { namespace dht
, boost::function<void(item const&, bool)> cb
, std::string salt)
{
m_dht.get_item(key, salt, cb);
boost::shared_ptr<get_mutable_item_ctx>
ctx = boost::make_shared<get_mutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#endif
}
void dht_tracker::put_item(entry const& data
@ -262,20 +413,41 @@ namespace libtorrent { namespace dht
sha1_hash target = item_target_id(
std::pair<char const*, int>(flat_data.c_str(), flat_data.size()));
m_dht.put_item(target, data, cb);
boost::shared_ptr<put_item_ctx>
ctx = boost::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(target, data, boost::bind(&put_immutable_item_callback
, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.put_item(target, data, boost::bind(&put_immutable_item_callback
, _1, ctx, cb));
#endif
}
void dht_tracker::put_item(char const* key
, boost::function<void(item const&, int)> cb
, boost::function<void(item&)> data_cb, std::string salt)
{
m_dht.put_item(key, salt, cb, data_cb);
boost::shared_ptr<put_item_ctx>
ctx = boost::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(key, salt, boost::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#if TORRENT_USE_IPV6
m_dht6.put_item(key, salt, boost::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#endif
}
void dht_tracker::direct_request(udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
m_dht.direct_request(ep, e, f);
#if TORRENT_USE_IPV6
if (ep.protocol() == udp::v6())
m_dht6.direct_request(ep, e, f);
else
#endif
m_dht.direct_request(ep, e, f);
}
void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep)
@ -292,6 +464,9 @@ namespace libtorrent { namespace dht
)
{
m_dht.unreachable(ep);
#if TORRENT_USE_IPV6
m_dht6.unreachable(ep);
#endif
}
}
@ -299,8 +474,6 @@ namespace libtorrent { namespace dht
, char const* buf, int size)
{
if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false;
// remove this line/check once the DHT supports IPv6
if (!ep.address().is_v4()) return false;
m_counters.inc_stats_counter(counters::dht_bytes_in, size);
// account for IP and UDP overhead
@ -364,6 +537,9 @@ namespace libtorrent { namespace dht
libtorrent::dht::msg m(m_msg, ep);
m_dht.incoming(m);
#if TORRENT_USE_IPV6
m_dht6.incoming(m);
#endif
return true;
}
@ -378,40 +554,52 @@ namespace libtorrent { namespace dht
n->list().push_back(entry(node));
}
void save_nodes(entry& ret, node const& dht, std::string const& key)
{
entry nodes(entry::list_t);
dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes);
bucket_t cache;
dht.replacement_cache(cache);
for (bucket_t::iterator i(cache.begin())
, end(cache.end()); i != end; ++i)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(i->ep(), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())
ret[key] = nodes;
}
} // anonymous namespace
entry dht_tracker::state() const
{
entry ret(entry::dictionary_t);
{
entry nodes(entry::list_t);
m_dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes);
bucket_t cache;
m_dht.replacement_cache(cache);
for (bucket_t::iterator i(cache.begin())
, end(cache.end()); i != end; ++i)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(i->ep(), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())
ret["nodes"] = nodes;
}
save_nodes(ret, m_dht, "nodes");
ret["node-id"] = m_dht.nid().to_string();
#if TORRENT_USE_IPV6
save_nodes(ret, m_dht6, "nodes6");
ret["node-id6"] = m_dht6.nid().to_string();
#endif
return ret;
}
void dht_tracker::add_node(udp::endpoint node)
{
m_dht.add_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_node(node);
#endif
}
void dht_tracker::add_router_node(udp::endpoint const& node)
{
m_dht.add_router_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_router_node(node);
#endif
}
bool dht_tracker::has_quota()

View File

@ -63,7 +63,8 @@ void get_peers_observer::reply(msg const& m)
if (n)
{
std::vector<tcp::endpoint> peer_list;
if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t)
if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t
&& m.addr.protocol() == udp::v4())
{
// assume it's mainline format
char const* peers = n.list_at(0).string_ptr();

View File

@ -69,6 +69,9 @@ namespace libtorrent { namespace dht
using detail::write_endpoint;
char const* address_type_names[num_address_type] = { "n4", "n6" };
char const* address_type_keys[num_address_type] = { "nodes", "nodes6" };
namespace {
void nop() {}
@ -93,16 +96,19 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer, address_ty
} // anonymous namespace
node::node(udp_socket_interface* sock
node::node(address_type at, udp_socket_interface* sock
, dht_settings const& settings, node_id nid
, dht_observer* observer
, struct counters& cnt
, std::map<std::string, node*> const& nodes
, dht_storage_constructor_type storage_constructor)
: m_settings(settings)
, m_id(calculate_node_id(nid, observer, ipv4))
, m_table(m_id, 8, settings, observer)
, m_id(calculate_node_id(nid, observer, at))
, m_table(m_id, at, 8, settings, observer)
, m_rpc(m_id, m_settings, m_table, sock, observer)
, m_nodes(nodes)
, m_observer(observer)
, m_address_type(at)
, m_last_tracker_tick(aux::time_now())
, m_last_self_refresh(min_time())
, m_sock(sock)
@ -126,7 +132,7 @@ void node::update_node_id()
// it's possible that our external address hasn't actually changed. If our
// current ID is still valid, don't do anything.
if (verify_id(m_id, m_observer->external_address()))
if (verify_id(m_id, m_observer->external_address(m_address_type)))
return;
#ifndef TORRENT_DISABLE_LOGGING
@ -134,7 +140,7 @@ void node::update_node_id()
, "updating node ID (because external IP address changed)");
#endif
m_id = generate_id(m_observer->external_address());
m_id = generate_id(m_observer->external_address(m_address_type));
m_table.update_node_id(m_id);
}
@ -303,6 +309,8 @@ void node::incoming(msg const& m)
// responds to 'query' messages that it receives.
if (m_settings.read_only) break;
if (!native_address(m.addr)) break;
if (!m_sock->has_quota())
{
m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
@ -407,6 +415,7 @@ void node::add_router_node(udp::endpoint router)
void node::add_node(udp::endpoint node)
{
if (!native_address(node)) return;
// ping the node, and if we get a reply, it
// will be added to the routing table
send_single_refresh(node, m_table.num_active_buckets());
@ -600,7 +609,11 @@ struct ping_observer : observer
}
// look for nodes
bdecode_node n = r.dict_find_string("nodes");
#if TORRENT_USE_IPV6
address_type at = algorithm()->get_node().native_address_type();
#endif
char const* nodes_key = algorithm()->get_node().native_nodes_key();
bdecode_node n = r.dict_find_string(nodes_key);
if (n)
{
char const* nodes = n.string_ptr();
@ -611,8 +624,14 @@ struct ping_observer : observer
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
algorithm()->get_node().m_table.heard_about(id
, detail::read_v4_endpoint<udp::endpoint>(nodes));
udp::endpoint ep;
#if TORRENT_USE_IPV6
if (at == ipv6)
ep = detail::read_v6_endpoint<udp::endpoint>(nodes);
else
#endif
ep = detail::read_v4_endpoint<udp::endpoint>(nodes);
algorithm()->get_node().m_table.heard_about(id, ep);
}
}
}
@ -765,14 +784,12 @@ void node::lookup_peers(sha1_hash const& info_hash, entry& reply
m_storage->get_peers(info_hash, noseed, scrape, reply);
}
void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes)
void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& n, nodes_t const& nodes)
{
entry& n = r["nodes"];
std::back_insert_iterator<std::string> out(n.string());
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
if (!i->addr().is_v4()) continue;
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(udp::endpoint(i->addr(), i->port()), out);
}
@ -843,9 +860,10 @@ void node::incoming_request(msg const& m, entry& e)
{"info_hash", bdecode_node::string_t, 20, 0},
{"noseed", bdecode_node::int_t, 0, key_desc_t::optional},
{"scrape", bdecode_node::int_t, 0, key_desc_t::optional},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
bdecode_node msg_keys[3];
bdecode_node msg_keys[4];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string
, sizeof(error_string)))
{
@ -859,10 +877,8 @@ void node::incoming_request(msg const& m, entry& e)
m_counters.inc_stats_counter(counters::dht_get_peers_in);
sha1_hash info_hash(msg_keys[0].string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(info_hash, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(info_hash, msg_keys[3], reply);
bool noseed = false;
bool scrape = false;
@ -881,9 +897,10 @@ void node::incoming_request(msg const& m, entry& e)
{
key_desc_t msg_desc[] = {
{"target", bdecode_node::string_t, 20, 0},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
bdecode_node msg_keys[1];
bdecode_node msg_keys[2];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string, sizeof(error_string)))
{
incoming_error(e, error_string);
@ -893,10 +910,7 @@ void node::incoming_request(msg const& m, entry& e)
m_counters.inc_stats_counter(counters::dht_find_node_in);
sha1_hash target(msg_keys[0].string_ptr());
// TODO: 2 find_node should write directly to the response entry
nodes_t n;
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, msg_keys[1], reply);
}
else if (query_len == 13 && memcmp(query, "announce_peer", 13) == 0)
{
@ -1112,12 +1126,13 @@ void node::incoming_request(msg const& m, entry& e)
key_desc_t msg_desc[] = {
{"seq", bdecode_node::int_t, 0, key_desc_t::optional},
{"target", bdecode_node::string_t, 20, 0},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
// k is not used for now
// attempt to parse the message
bdecode_node msg_keys[2];
bdecode_node msg_keys[3];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string
, sizeof(error_string)))
{
@ -1135,10 +1150,8 @@ void node::incoming_request(msg const& m, entry& e)
reply["token"] = generate_token(m.addr, msg_keys[1].string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, msg_keys[2], reply);
// if the get has a sequence number it must be for a mutable item
// so don't bother searching the immutable table
@ -1173,12 +1186,43 @@ void node::incoming_request(msg const& m, entry& e)
}
sha1_hash target(target_ent.string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, arg_ent.dict_find_list("want"), reply);
return;
}
}
void node::write_nodes_entries(sha1_hash const& info_hash
, bdecode_node const& want, entry& r)
{
// if no wants entry was specified, include a nodes
// entry based on the protocol the request came in with
if (want.type() != bdecode_node::list_t)
{
nodes_t n;
m_table.find_node(info_hash, n, 0);
write_nodes_entry(r[native_nodes_key()], n);
return;
}
// if there is a wants entry then we may need to reach into
// another node's routing table to get nodes of the requested type
// we use a map maintained by the owning dht_tracker to find the
// node associated with each string in the want list, which may
// include this node
for (int i = 0; i < want.list_size(); ++i)
{
bdecode_node wanted = want.list_at(i);
if (wanted.type() != bdecode_node::string_t)
continue;
std::map<std::string, node*>::const_iterator wanted_node
= m_nodes.find(wanted.string_value());
if (wanted_node == m_nodes.end())
continue;
nodes_t n;
wanted_node->second->m_table.find_node(info_hash, n, 0);
write_nodes_entry(r[wanted_node->second->native_nodes_key()], n);
}
}
} } // namespace libtorrent::dht

View File

@ -58,6 +58,20 @@ POSSIBILITY OF SUCH DAMAGE.
using boost::uint8_t;
#if BOOST_VERSION <= 104700
namespace boost {
size_t hash_value(libtorrent::address_v4::bytes_type ip)
{
return boost::hash_value(*reinterpret_cast<boost::uint32_t*>(&ip[0]));
}
size_t hash_value(libtorrent::address_v6::bytes_type ip)
{
return boost::hash_value(*reinterpret_cast<boost::uint64_t*>(&ip[0]));
}
}
#endif
namespace libtorrent { namespace dht
{
namespace
@ -82,29 +96,35 @@ namespace
void ip_set::insert(address addr)
{
if (addr.is_v4())
m_ip4s.insert(addr.to_v4().to_bytes());
else
#if TORRENT_USE_IPV6
if (addr.is_v6())
m_ip6s.insert(addr.to_v6().to_bytes());
else
#endif
m_ip4s.insert(addr.to_v4().to_bytes());
}
size_t ip_set::count(address addr)
{
if (addr.is_v4())
return m_ip4s.count(addr.to_v4().to_bytes());
else
#if TORRENT_USE_IPV6
if (addr.is_v6())
return m_ip6s.count(addr.to_v6().to_bytes());
else
#endif
return m_ip4s.count(addr.to_v4().to_bytes());
}
void ip_set::erase(address addr)
{
if (addr.is_v4())
erase_one(m_ip4s, addr.to_v4().to_bytes());
else
#if TORRENT_USE_IPV6
if (addr.is_v6())
erase_one(m_ip6s, addr.to_v6().to_bytes());
else
#endif
erase_one(m_ip4s, addr.to_v4().to_bytes());
}
routing_table::routing_table(node_id const& id, int bucket_size
routing_table::routing_table(node_id const& id, address_type at, int bucket_size
, dht_settings const& settings
, dht_logger* log)
:
@ -113,6 +133,7 @@ routing_table::routing_table(node_id const& id, int bucket_size
#endif
m_settings(settings)
, m_id(id)
, m_address_type(at)
, m_depth(0)
, m_last_self_refresh(min_time())
, m_bucket_size(bucket_size)
@ -589,6 +610,10 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
// INVARIANT_CHECK;
#endif
// don't add if the address isn't the right type
if (!native_address(e.addr()))
return failed_to_add;
// if we already have this (IP,port), don't do anything
if (m_router_nodes.find(e.ep()) != m_router_nodes.end())
return failed_to_add;

View File

@ -287,8 +287,11 @@ bool rpc_manager::incoming(msg const& m, node_id* id)
if (!o)
{
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s"
, int(transaction_id.size()), print_endpoint(m.addr).c_str());
if (m_table.native_address(m.addr.address()))
{
m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s"
, int(transaction_id.size()), print_endpoint(m.addr).c_str());
}
#endif
// this isn't necessarily because the other end is doing
// something wrong. This can also happen when we restart
@ -464,6 +467,12 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
// places a 'ro' key in the top-level message dictionary and sets its value to 1.
if (m_settings.read_only) e["ro"] = 1;
node& n = o->algorithm()->get_node();
if (!n.native_address(o->target_addr()))
{
a["want"].list().push_back(entry(n.native_address_name()));
}
o->set_target(target_addr);
o->set_transaction_id(tid);

View File

@ -165,31 +165,45 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
if (m_node.settings().restrict_search_ips
&& !(flags & observer::flag_initial))
{
// mask the lower octet
boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong();
prefix4 &= 0xffffff00;
if (m_peer4_prefixes.count(prefix4) > 0)
#if TORRENT_USE_IPV6
if (o->target_addr().is_v6())
{
// we already have a node in this search with an IP very
// close to this one. We know that it's not the same, because
// it claims a different node-ID. Ignore this to avoid attacks
#ifndef TORRENT_DISABLE_LOGGING
if (get_node().observer())
{
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&o->id()[0]), 20, hex_id);
get_node().observer()->log(dht_logger::traversal
, "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s"
, static_cast<void*>(this), hex_id, print_address(o->target_addr()).c_str(), name());
}
address_v6::bytes_type addr_bytes = o->target_addr().to_v6().to_bytes();
address_v6::bytes_type::const_iterator prefix_it = addr_bytes.begin();
boost::uint64_t prefix6 = detail::read_uint64(prefix_it);
if (m_peer6_prefixes.insert(prefix6).second)
goto add_result;
}
else
#endif
return;
{
// mask the lower octet
boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong();
prefix4 &= 0xffffff00;
if (m_peer4_prefixes.insert(prefix4).second)
goto add_result;
}
m_peer4_prefixes.insert(prefix4);
// we already have a node in this search with an IP very
// close to this one. We know that it's not the same, because
// it claims a different node-ID. Ignore this to avoid attacks
#ifndef TORRENT_DISABLE_LOGGING
if (get_node().observer())
{
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&o->id()[0]), 20, hex_id);
get_node().observer()->log(dht_logger::traversal
, "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s"
, static_cast<void*>(this), hex_id, print_address(o->target_addr()).c_str(), name());
}
#endif
return;
}
add_result:
TORRENT_ASSERT((o->flags & observer::flag_no_id)
|| std::find_if(m_results.begin(), m_results.end()
, boost::bind(&observer::id, _1) == id) == m_results.end());
@ -600,8 +614,13 @@ void traversal_observer::reply(msg const& m)
, print_endpoint(target_ep()).c_str(), algorithm()->name());
}
#endif
// look for nodes
bdecode_node n = r.dict_find_string("nodes");
#if TORRENT_USE_IPV6
address_type at = algorithm()->get_node().native_address_type();
#endif
char const* nodes_key = algorithm()->get_node().native_nodes_key();
bdecode_node n = r.dict_find_string(nodes_key);
if (n)
{
char const* nodes = n.string_ptr();
@ -612,7 +631,14 @@ void traversal_observer::reply(msg const& m)
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
algorithm()->traverse(id, read_v4_endpoint<udp::endpoint>(nodes));
udp::endpoint ep;
#if TORRENT_USE_IPV6
if (at == ipv6)
ep = read_v6_endpoint<udp::endpoint>(nodes);
else
#endif
ep = read_v4_endpoint<udp::endpoint>(nodes);
algorithm()->traverse(id, ep);
}
}

View File

@ -6781,10 +6781,18 @@ namespace aux {
address session_impl::external_address(address_type at)
{
if (at == ipv4)
return m_external_ip.external_address(address_v4());
#if !TORRENT_USE_IPV6
TORRENT_UNUSED(at);
#endif
address addr;
#if TORRENT_USE_IPV6
if (at == ipv6)
addr = address_v6();
else
return m_external_ip.external_address(address_v6());
#endif
addr = address_v4();
return m_external_ip.external_address(addr);
}
void session_impl::get_peers(sha1_hash const& ih)

View File

@ -201,7 +201,10 @@ struct msg_args
{ a["want"].list().push_back(w); return *this; }
msg_args& nodes(nodes_t const& n)
{ if (!n.empty()) dht::write_nodes_entry(a, n); return *this; }
{ if (!n.empty()) dht::write_nodes_entry(a["nodes"], n); return *this; }
msg_args& nodes6(nodes_t const& n)
{ if (!n.empty()) dht::write_nodes_entry(a["nodes6"], n); return *this; }
msg_args& peers(std::set<tcp::endpoint> const& p)
{ if (!p.empty()) write_peers(a.dict(), p); return *this; }
@ -512,13 +515,16 @@ dht_settings test_settings()
// TODO: test obfuscated_get_peers
// TODO: 2 split this test up into smaller test cases
TORRENT_TEST(dht)
void do_test_dht(address(&rand_addr)())
{
dht_settings sett = test_settings();
mock_socket s;
obs observer;
counters cnt;
dht::node node(&s, sett, node_id(0), &observer, cnt);
udp::endpoint source(rand_addr(), 20);
std::map<std::string, node*> nodes;
dht::node node(source.protocol() == udp::v4() ? ipv4 : ipv6
, &s, sett, node_id(0), &observer, cnt, nodes);
// DHT should be running on port 48199 now
bdecode_node response;
@ -526,7 +532,6 @@ TORRENT_TEST(dht)
bool ret;
// ====== ping ======
udp::endpoint source(address::from_string("10.0.0.1"), 20);
send_dht_request(node, "ping", source, &response);
dht::key_desc_t pong_desc[] = {
@ -653,7 +658,7 @@ TORRENT_TEST(dht)
// 50 downloaders and 50 seeds
for (int i = 0; i < 100; ++i)
{
source = udp::endpoint(rand_v4(), 6000);
source = udp::endpoint(rand_addr(), 6000);
send_dht_request(node, "get_peers", source, &response
, msg_args().info_hash("01010101010101010101"));
@ -742,10 +747,19 @@ TORRENT_TEST(dht)
// enable node_id enforcement
sett.enforce_node_id = true;
// this is one of the test vectors from:
// http://libtorrent.org/dht_sec.html
source = udp::endpoint(address::from_string("124.31.75.21"), 1);
node_id nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401");
node_id nid;
if (source.protocol() == udp::v4())
{
// this is one of the test vectors from:
// http://libtorrent.org/dht_sec.html
source = udp::endpoint(address::from_string("124.31.75.21"), 1);
nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401");
}
else
{
source = udp::endpoint(address::from_string("2001:b829:2123:be84:e16c:d6ae:5290:49f1"), 1);
nid = to_hash("0a8ad123be84e16cd6ae529049f1f1bbe9ebb304");
}
// verify that we reject invalid node IDs
// this is now an invalid node-id for 'source'
@ -781,7 +795,10 @@ TORRENT_TEST(dht)
TEST_EQUAL(node.size().get<0>(), nodes_num);
// now the node-id is valid.
nid[0] = 0x5f;
if (source.protocol() == udp::v4())
nid[0] = 0x5f;
else
nid[0] = 0x0a;
send_dht_request(node, "find_node", source, &response
, msg_args().target("0101010101010101010101010101010101010101").nid(nid));
@ -861,7 +878,7 @@ TORRENT_TEST(dht)
udp::endpoint eps[1000];
for (int i = 0; i < 1000; ++i)
eps[i] = udp::endpoint(rand_v4(), (rand() % 16534) + 1);
eps[i] = udp::endpoint(rand_addr(), (rand() % 16534) + 1);
announce_item items[] =
{
@ -1248,16 +1265,29 @@ TORRENT_TEST(dht)
// s.restrict_routing_ips = false;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
const int bucket_size = 10;
dht::routing_table table(id, bucket_size, s, &observer);
dht::routing_table table(id, source.protocol() == udp::v4() ? ipv4 : ipv6, bucket_size, s, &observer);
std::vector<node_entry> nodes;
TEST_EQUAL(table.size().get<0>(), 0);
node_id tmp = id;
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
address node_addr;
address node_near_addr;
if (source.protocol() == udp::v4())
{
node_addr = address_v4::from_string("4.4.4.4");
node_near_addr = address_v4::from_string("4.4.4.5");
}
else
{
node_addr = address_v6::from_string("2001:1111:1111:1111:1111:1111:1111:1111");
node_near_addr = address_v6::from_string("2001:1111:1111:1111:eeee:eeee:eeee:eeee");
}
// test a node with the same IP:port changing ID
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(table.size().get<0>(), 1);
@ -1265,13 +1295,13 @@ TORRENT_TEST(dht)
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// set timeout_count to 1
table.node_failed(tmp, udp::endpoint(address_v4::from_string("4.4.4.4"), 4));
table.node_failed(tmp, udp::endpoint(node_addr, 4));
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
@ -1279,58 +1309,58 @@ TORRENT_TEST(dht)
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 1);
}
// add the exact same node again, it should set the timeout_count to 0
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// test adding the same IP:port again with a new node ID (should replace the old one)
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding the same node ID again with a different IP (should be ignored)
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 5), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 5), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding a node that ends up in the same bucket with an IP
// very close to the current one (should be ignored)
// if restrict_routing_ips == true
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.5"), 5), 10);
table.node_seen(tmp, udp::endpoint(node_near_addr, 5), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
@ -1339,12 +1369,12 @@ TORRENT_TEST(dht)
init_rand_address();
add_and_replace(tmp, diff);
table.node_seen(id, udp::endpoint(rand_v4(), rand()), 10);
table.node_seen(id, udp::endpoint(rand_addr(), rand()), 10);
nodes.clear();
for (int i = 0; i < 7000; ++i)
{
table.node_seen(tmp, udp::endpoint(rand_v4(), rand()), 20 + (tmp[19] & 0xff));
table.node_seen(tmp, udp::endpoint(rand_addr(), rand()), 20 + (tmp[19] & 0xff));
add_and_replace(tmp, diff);
}
printf("active buckets: %d\n", table.num_active_buckets());
@ -1485,7 +1515,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
std::vector<udp::endpoint> nodesv;
@ -1557,7 +1587,7 @@ TORRENT_TEST(dht)
do
{
dht::node_id target = to_hash("1234876923549721020394873245098347598635");
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1652,7 +1682,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1698,7 +1728,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1785,7 +1815,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
@ -1885,7 +1915,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
@ -1987,7 +2017,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(ipv4, &s, sett, (node_id::min)(), &observer, cnt, nodes);
sha1_hash target = hasher(public_key, item_pk_len).final();
enum { num_test_nodes = 9 }; // we need K + 1 nodes to create the failing sequence
node_entry nodes[num_test_nodes] =
@ -2067,6 +2097,143 @@ TORRENT_TEST(dht)
} while (false);
}
TORRENT_TEST(dht)
{
do_test_dht(rand_v4);
#if TORRENT_USE_IPV6
if (supports_ipv6())
do_test_dht(rand_v6);
#endif
}
TORRENT_TEST(dht_dual_stack)
{
dht_settings sett = test_settings();
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node4(ipv4, &s, sett, node_id(0), &observer, cnt, nodes);
dht::node node6(ipv6, &s, sett, node_id(0), &observer, cnt, nodes);
nodes.insert(std::make_pair("n4", &node4));
nodes.insert(std::make_pair("n6", &node6));
// DHT should be running on port 48199 now
bdecode_node response;
char error_string[200];
bool ret;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
node4.m_table.node_seen(id, udp::endpoint(address::from_string("4.4.4.4"), 4440), 10);
node6.m_table.node_seen(id, udp::endpoint(address::from_string("4::4"), 4441), 10);
// v4 node requesting v6 nodes
udp::endpoint source(address::from_string("10.0.0.1"), 20);
send_dht_request(node4, "find_node", source, &response
, msg_args().target("0101010101010101010101010101010101010101").want("n6"));
dht::key_desc_t nodes6_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child }
};
bdecode_node nodes6_keys[4];
ret = verify_message(response, nodes6_desc, nodes6_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes6_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v6_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
// v6 node requesting v4 nodes
source.address(address::from_string("10::1"));
send_dht_request(node6, "get_peers", source, &response
, msg_args().info_hash("0101010101010101010101010101010101010101").want("n4"));
dht::key_desc_t nodes_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes", bdecode_node::string_t, 26, key_desc_t::last_child }
};
bdecode_node nodes_keys[4];
ret = verify_message(response, nodes_desc, nodes_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v4_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
// v6 node requesting both v4 and v6 nodes
send_dht_request(node6, "find_nodes", source, &response
, msg_args().info_hash("0101010101010101010101010101010101010101")
.want("n4")
.want("n6"));
dht::key_desc_t nodes46_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes", bdecode_node::string_t, 26, 0 },
{ "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child }
};
bdecode_node nodes46_keys[5];
ret = verify_message(response, nodes46_desc, nodes46_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes46_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v4_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440));
nodes_ptr = nodes46_keys[4].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
rep = detail::read_v6_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
}
void get_test_keypair(char* public_key, char* private_key)
{
from_hex("77ff84905a91936367c01360803104f92432fcd904a43511876df5cdf3e7e548", 64, public_key);
@ -2252,7 +2419,7 @@ TORRENT_TEST(routing_table_uniform)
node_id id = to_hash("1234876923549721020394873245098347598635");
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, ipv4, 8, sett, &observer);
// insert 256 nodes evenly distributed across the ID space.
// we expect to fill the top 5 buckets
@ -2295,7 +2462,7 @@ TORRENT_TEST(routing_table_balance)
sett.extended_routing_table = false;
node_id id = to_hash("1234876923549721020394873245098347598635");
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, ipv4, 8, sett, &observer);
// insert nodes in the routing table that will force it to split
// and make sure we don't end up with a table completely out of balance
@ -2327,7 +2494,7 @@ TORRENT_TEST(routing_table_extended)
for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i);
std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end());
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, ipv4, 8, sett, &observer);
for (int i = 0; i < 256; ++i)
{
add_and_replace(id, diff);
@ -2360,7 +2527,7 @@ TORRENT_TEST(routing_table_set_id)
node_id_prefix.reserve(256);
for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i);
std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end());
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, ipv4, 8, sett, &observer);
for (int i = 0; i < 256; ++i)
{
id[0] = node_id_prefix[i];
@ -2404,8 +2571,9 @@ TORRENT_TEST(read_only_node)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
bdecode_node response;
msg_args args;
@ -2491,8 +2659,9 @@ TORRENT_TEST(invalid_error_msg)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
entry e;
@ -2528,10 +2697,11 @@ TORRENT_TEST(rpc_invalid_error_msg)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::routing_table table(node_id(), 8, sett, &observer);
dht::routing_table table(node_id(), ipv4, 8, sett, &observer);
dht::rpc_manager rpc(node_id(), sett, table, &s, &observer);
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(ipv4, &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
@ -2618,7 +2788,7 @@ TORRENT_TEST(dht_verify_node_address)
s.extended_routing_table = false;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
const int bucket_size = 10;
dht::routing_table table(id, bucket_size, s, &observer);
dht::routing_table table(id, ipv4, bucket_size, s, &observer);
std::vector<node_entry> nodes;
TEST_EQUAL(table.size().get<0>(), 0);