Merge pull request #268 from ssiloti/dht-ipv6

Support DHT over IPv6
This commit is contained in:
Arvid Norberg 2016-04-25 19:19:48 -04:00
commit 3c785b96fc
26 changed files with 1127 additions and 276 deletions

View File

@ -101,6 +101,7 @@
* added support for asynchronous disk I/O
* almost completely changed the storage interface (for custom storage)
* added support for hashing pieces in multiple threads
* added support for BEP 32, "IPv6 extension for DHT"
* fix division by zero in super-seeding logic

View File

@ -604,7 +604,7 @@ namespace libtorrent
// implements dht_observer
virtual void set_external_address(address const& ip
, address const& source) TORRENT_OVERRIDE;
virtual address external_address() TORRENT_OVERRIDE;
virtual address external_address(udp proto) TORRENT_OVERRIDE;
virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE;
virtual void announce(sha1_hash const& ih, address const& addr, int port) TORRENT_OVERRIDE;
virtual void outgoing_get_peers(sha1_hash const& target

View File

@ -68,7 +68,7 @@ namespace libtorrent { namespace dht
{
virtual void set_external_address(address const& addr
, address const& source) = 0;
virtual address external_address() = 0;
virtual address external_address(udp proto) = 0;
virtual void get_peers(sha1_hash const& ih) = 0;
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) = 0;

View File

@ -142,7 +142,7 @@ namespace libtorrent { namespace dht
boost::shared_ptr<dht_tracker> self()
{ return shared_from_this(); }
void connection_timeout(error_code const& e);
void connection_timeout(node& n, error_code const& e);
void refresh_timeout(error_code const& e);
void refresh_key(error_code const& e);
@ -157,6 +157,9 @@ namespace libtorrent { namespace dht
counters& m_counters;
node m_dht;
#if TORRENT_USE_IPV6
node m_dht6;
#endif
send_fun_t m_send_fun;
dht_logger* m_log;
@ -165,9 +168,14 @@ namespace libtorrent { namespace dht
deadline_timer m_key_refresh_timer;
deadline_timer m_connection_timer;
#if TORRENT_USE_IPV6
deadline_timer m_connection_timer6;
#endif
deadline_timer m_refresh_timer;
dht_settings const& m_settings;
std::map<std::string, node*> m_nodes;
bool m_abort;
// used to resolve hostnames for nodes

View File

@ -99,9 +99,10 @@ protected:
class TORRENT_EXTRA_EXPORT node : boost::noncopyable
{
public:
node(udp_socket_interface* sock
node(udp proto, udp_socket_interface* sock
, libtorrent::dht_settings const& settings, node_id nid
, dht_observer* observer, counters& cnt
, std::map<std::string, node*> const& nodes
, dht_storage_constructor_type storage_constructor = dht_default_storage_constructor);
~node();
@ -204,6 +205,21 @@ public:
counters& stats_counters() const { return m_counters; }
dht_observer* observer() const { return m_observer; }
udp protocol() { return m_protocol.protocol; }
char const* protocol_family_name() { return m_protocol.family_name; }
char const* protocol_nodes_key() { return m_protocol.nodes_key; }
bool native_address(udp::endpoint ep) const
{ return ep.protocol().family() == m_protocol.protocol.family(); }
bool native_address(tcp::endpoint ep) const
{ return ep.protocol().family() == m_protocol.protocol.family(); }
bool native_address(address addr) const
{
return (addr.is_v4() && m_protocol.protocol == m_protocol.protocol.v4())
|| (addr.is_v6() && m_protocol.protocol == m_protocol.protocol.v6());
}
private:
void send_single_refresh(udp::endpoint const& ep, int bucket
@ -224,15 +240,30 @@ private:
void incoming_request(msg const& h, entry& e);
void write_nodes_entries(sha1_hash const& info_hash
, bdecode_node const& want, entry& r);
node_id m_id;
public:
routing_table m_table;
rpc_manager m_rpc;
std::map<std::string, node*> const& m_nodes;
private:
struct protocol_descriptor
{
udp protocol;
char const* family_name;
char const* nodes_key;
};
static protocol_descriptor const& map_protocol_to_descriptor(udp protocol);
dht_observer* m_observer;
protocol_descriptor const& m_protocol;
time_point m_last_tracker_tick;
// the last time we issued a bootstrap or a refresh on our own ID, to expand

View File

@ -55,10 +55,10 @@ struct TORRENT_EXTRA_EXPORT node_entry
void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; }
int fail_count() const { return pinged() ? timeout_count : 0; }
void reset_fail_count() { if (pinged()) timeout_count = 0; }
udp::endpoint ep() const { return udp::endpoint(address_v4(a), p); }
udp::endpoint ep() const { return endpoint; }
bool confirmed() const { return timeout_count == 0; }
address addr() const { return address_v4(a); }
int port() const { return p; }
address addr() const { return endpoint.address(); }
int port() const { return endpoint.port; }
#ifndef TORRENT_DISABLE_LOGGING
time_point first_seen;
@ -69,8 +69,7 @@ struct TORRENT_EXTRA_EXPORT node_entry
node_id id;
address_v4::bytes_type a;
boost::uint16_t p;
union_endpoint endpoint;
// the average RTT of this node
boost::uint16_t rtt;

View File

@ -73,6 +73,37 @@ struct routing_table_node
bucket_t live_nodes;
};
struct ip_set
{
void insert(address addr);
size_t count(address addr);
void erase(address addr);
void clear()
{
m_ip4s.clear();
#if TORRENT_USE_IPV6
m_ip6s.clear();
#endif
}
bool operator==(ip_set const& rh)
{
#if TORRENT_USE_IPV6
return m_ip4s == rh.m_ip4s && m_ip6s == rh.m_ip6s;
#else
return m_ip4s == rh.m_ip4s;
#endif
}
// these must be multisets because there can be multiple routing table
// entries for a single IP when restrict_routing_ips is set to false
boost::unordered_multiset<address_v4::bytes_type> m_ip4s;
#if TORRENT_USE_IPV6
boost::unordered_multiset<address_v6::bytes_type> m_ip6s;
#endif
};
// differences in the implementation from the description in
// the paper:
//
@ -101,7 +132,8 @@ public:
// Perhaps replacement nodes should be in a separate vector.
typedef std::vector<routing_table_node> table_t;
routing_table(node_id const& id, int bucket_size
routing_table(node_id const& id, udp proto
, int bucket_size
, dht_settings const& settings
, dht_logger* log);
@ -212,6 +244,15 @@ public:
bool is_full(int bucket) const;
bool native_address(address addr) const
{
return (addr.is_v4() && m_protocol == udp::v4())
|| (addr.is_v6() && m_protocol == udp::v6());
}
bool native_endpoint(udp::endpoint ep) const
{ return ep.protocol() == m_protocol; }
private:
#ifndef TORRENT_DISABLE_LOGGING
@ -239,6 +280,7 @@ private:
table_t m_buckets;
node_id m_id; // our own node id
udp m_protocol; // protocol this table is for
// the last seen depth (i.e. levels in the routing table)
// it's mutable because it's updated by depth(), which is const
@ -256,9 +298,8 @@ private:
// these are all the IPs that are in the routing
// table. It's used to only allow a single entry
// per IP in the whole table. Currently only for
// IPv4
boost::unordered_multiset<address_v4::bytes_type> m_ips;
// per IP in the whole table.
ip_set m_ips;
// constant called k in paper
int m_bucket_size;

View File

@ -113,7 +113,7 @@ protected:
delete p;
}
node & m_node;
node& m_node;
std::vector<observer_ptr> m_results;
node_id const m_target;
boost::uint16_t m_ref_count;
@ -124,8 +124,9 @@ protected:
// the IP addresses of the nodes in m_results
std::set<boost::uint32_t> m_peer4_prefixes;
// no IPv6 support yet anyway
// std::set<boost::uint64_t> m_peer6_prefixes;
#if TORRENT_USE_IPV6
std::set<boost::uint64_t> m_peer6_prefixes;
#endif
};
struct traversal_observer : observer

View File

@ -38,6 +38,71 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
struct union_address
{
union_address()
{
*this = address();
}
union_address(address const& a)
{
*this = a;
}
union_address& operator=(address const& a)
{
#if TORRENT_USE_IPV6
v4 = a.is_v4();
if (v4)
addr.v4 = a.to_v4().to_bytes();
else
addr.v6 = a.to_v6().to_bytes();
#else
addr.v4 = a.to_v4().to_bytes();
#endif
return *this;
}
bool operator==(union_address const& rh) const
{
#if TORRENT_USE_IPV6
if (v4 != rh.v4) return false;
if (v4)
return addr.v4 == rh.addr.v4;
else
return addr.v6 == rh.addr.v6;
#else
return addr.v4 == rh.addr.v4;
#endif
}
bool operator!=(union_address const& rh) const
{
return !(*this == rh);
}
operator address() const
{
#if TORRENT_USE_IPV6
if (v4) return address(address_v4(addr.v4));
else return address(address_v6(addr.v6));
#else
return address(address_v4(addr.v4));
#endif
}
TORRENT_UNION addr_t
{
address_v4::bytes_type v4;
#if TORRENT_USE_IPV6
address_v6::bytes_type v6;
#endif
} addr;
#if TORRENT_USE_IPV6
bool v4:1;
#endif
};
struct union_endpoint
{
@ -58,75 +123,35 @@ namespace libtorrent
union_endpoint& operator=(udp::endpoint const& ep)
{
#if TORRENT_USE_IPV6
v4 = ep.address().is_v4();
if (v4)
addr.v4 = ep.address().to_v4().to_bytes();
else
addr.v6 = ep.address().to_v6().to_bytes();
#else
addr.v4 = ep.address().to_v4().to_bytes();
#endif
addr = ep.address();
port = ep.port();
return *this;
}
operator udp::endpoint() const
{
#if TORRENT_USE_IPV6
if (v4) return udp::endpoint(address_v4(addr.v4), port);
else return udp::endpoint(address_v6(addr.v6), port);
#else
return udp::endpoint(address_v4(addr.v4), port);
#endif
return udp::endpoint(addr, port);
}
union_endpoint& operator=(tcp::endpoint const& ep)
{
#if TORRENT_USE_IPV6
v4 = ep.address().is_v4();
if (v4)
addr.v4 = ep.address().to_v4().to_bytes();
else
addr.v6 = ep.address().to_v6().to_bytes();
#else
addr.v4 = ep.address().to_v4().to_bytes();
#endif
addr = ep.address();
port = ep.port();
return *this;
}
libtorrent::address address() const
{
#if TORRENT_USE_IPV6
if (v4) return address_v4(addr.v4);
else return address_v6(addr.v6);
#else
return address_v4(addr.v4);
#endif
return addr;
}
operator tcp::endpoint() const
{
#if TORRENT_USE_IPV6
if (v4) return tcp::endpoint(address_v4(addr.v4), port);
else return tcp::endpoint(address_v6(addr.v6), port);
#else
return tcp::endpoint(address_v4(addr.v4), port);
#endif
return tcp::endpoint(addr, port);
}
TORRENT_UNION addr_t
{
address_v4::bytes_type v4;
#if TORRENT_USE_IPV6
address_v6::bytes_type v6;
#endif
} addr;
union_address addr;
boost::uint16_t port;
#if TORRENT_USE_IPV6
bool v4:1;
#endif
};
}

View File

@ -64,6 +64,13 @@ namespace {
return asio::ip::address_v4(lt::random());
}
asio::ip::address addr6_from_int(int idx)
{
asio::ip::address_v6::bytes_type bytes;
for (uint8_t& b : bytes) b = uint8_t(lt::random());
return asio::ip::address_v6(bytes);
}
// this is the node ID assigned to node 'idx'
dht::node_id id_from_addr(lt::address const& addr)
{
@ -74,32 +81,30 @@ namespace {
struct dht_node final : lt::dht::udp_socket_interface
{
enum flags_t
{
add_dead_nodes = 1
};
dht_node(sim::simulation& sim, lt::dht_settings const& sett, lt::counters& cnt
, int idx, std::uint32_t flags)
: m_io_service(sim, addr_from_int(idx))
: m_io_service(sim, (flags & dht_network::bind_ipv6) ? addr6_from_int(idx) : addr_from_int(idx))
#if LIBSIMULATOR_USE_MOVE
, m_socket(m_io_service)
, m_dht(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt)
, m_dht((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4()
, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, std::map<std::string, lt::dht::node*>())
#else
, m_socket(new asio::ip::udp::socket(m_io_service))
, m_dht(new lt::dht::node(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt))
, m_dht(new lt::dht::node((flags & dht_network::bind_ipv6) ? udp::v6() : udp::v4()
, this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt, std::map<std::string, lt::dht::node*>()))
#endif
, m_add_dead_nodes(flags & add_dead_nodes)
, m_add_dead_nodes(flags & dht_network::add_dead_nodes)
, m_ipv6(flags & dht_network::bind_ipv6)
{
error_code ec;
sock().open(asio::ip::udp::v4());
sock().bind(asio::ip::udp::endpoint(lt::address_v4::any(), 6881));
sock().open(m_ipv6 ? asio::ip::udp::v6() : asio::ip::udp::v4());
sock().bind(asio::ip::udp::endpoint(
m_ipv6 ? lt::address(lt::address_v6::any()) : lt::address(lt::address_v4::any()), 6881));
udp::socket::non_blocking_io ioc(true);
sock().io_control(ioc);
sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer))
, m_ep, boost::bind(&dht_node::on_read, this, _1, _2));
}
@ -117,8 +122,9 @@ struct dht_node final : lt::dht::udp_socket_interface
// reserving space in the vector before emplacing any nodes).
dht_node(dht_node&& n) noexcept
: m_socket(std::move(n.m_socket))
, m_dht(this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters())
, m_dht(n.m_ipv6 ? udp::v6() : udp::v4(), this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters()
, std::map<std::string, lt::dht::node*>())
{
assert(false && "dht_node is not movable");
throw std::runtime_error("dht_node is not movable");
@ -217,7 +223,8 @@ struct dht_node final : lt::dht::udp_socket_interface
dht::node_id const mask = dht::generate_prefix_mask(bucket + 1);
dht::node_id target = dht::generate_random_id() & ~mask;
target |= id & mask;
dht().m_table.node_seen(target, rand_udp_ep(), (lt::random() % 300) + 10);
dht().m_table.node_seen(target, rand_udp_ep(m_ipv6 ? rand_v6 : rand_v4)
, (lt::random() % 300) + 10);
}
}
/*
@ -256,10 +263,11 @@ private:
#endif
lt::udp::endpoint m_ep;
bool m_add_dead_nodes;
bool m_ipv6;
char m_buffer[1300];
};
dht_network::dht_network(sim::simulation& sim, int num_nodes)
dht_network::dht_network(sim::simulation& sim, int num_nodes, std::uint32_t flags)
{
m_sett.ignore_dark_internet = false;
m_sett.restrict_routing_ips = false;
@ -273,7 +281,7 @@ dht_network::dht_network(sim::simulation& sim, int num_nodes)
for (int i = 0; i < num_nodes; ++i)
{
// node 0 is the one we log
m_nodes.emplace_back(sim, m_sett, m_cnt, i, 0/*, dht_node::add_dead_nodes*/);
m_nodes.emplace_back(sim, m_sett, m_cnt, i, flags);
all_nodes.push_back(m_nodes.back().node_info());
}

View File

@ -56,7 +56,13 @@ void print_routing_table(std::vector<lt::dht_routing_bucket> const& rt);
struct dht_network
{
dht_network(sim::simulation& sim, int num_nodes);
enum flags_t
{
add_dead_nodes = 1,
bind_ipv6 = 2
};
dht_network(sim::simulation& sim, int num_nodes, std::uint32_t flags = 0);
~dht_network();
void stop();

View File

@ -255,10 +255,14 @@ void setup_swarm(int num_nodes
for (int i = 0; i < num_nodes; ++i)
{
// create a new io_service
std::vector<asio::ip::address> ips;
char ep[30];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff);
ips.push_back(addr(ep));
snprintf(ep, sizeof(ep), "2000::%X%X", (i + 1) >> 8, (i + 1) & 0xff);
ips.push_back(addr(ep));
io_service.push_back(boost::make_shared<sim::asio::io_service>(
boost::ref(sim), addr(ep)));
boost::ref(sim), ips));
lt::settings_pack pack = default_settings;

View File

@ -44,9 +44,56 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket_io.hpp"
#include "setup_swarm.hpp"
#include "setup_dht.hpp"
#include "libtorrent/ed25519.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/kademlia/item.hpp"
#include <boost/bind.hpp>
namespace lt = libtorrent;
#ifndef TORRENT_DISABLE_DHT
void bootstrap_session(std::vector<dht_network*> networks, lt::session& ses)
{
lt::dht_settings sett;
sett.ignore_dark_internet = false;
ses.set_dht_settings(sett);
lt::entry state;
for (auto dht : networks)
{
// bootstrap off of 8 of the nodes
auto router_nodes = dht->router_nodes();
char const* nodes_key;
if (router_nodes.front().address().is_v6())
nodes_key = "nodes6";
else
nodes_key = "nodes";
lt::entry::list_type& nodes = state["dht state"][nodes_key].list();
for (auto const& n : router_nodes)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
lt::detail::write_endpoint(n, out);
nodes.push_back(lt::entry(node));
}
}
std::vector<char> buf;
lt::bencode(std::back_inserter(buf), state);
lt::bdecode_node e;
lt::error_code ec;
lt::bdecode(&buf[0], &buf[0] + buf.size(), e, ec);
ses.load_state(e);
lt::settings_pack pack;
pack.set_bool(lt::settings_pack::enable_dht, true);
ses.apply_settings(pack);
}
#endif // TORRENT_DISABLE_DHT
TORRENT_TEST(dht_bootstrap)
{
#ifndef TORRENT_DISABLE_DHT
@ -85,31 +132,7 @@ TORRENT_TEST(dht_bootstrap)
{
if (ticks == 0)
{
lt::dht_settings sett;
sett.ignore_dark_internet = false;
ses.set_dht_settings(sett);
// bootstrap off of 8 of the nodes
lt::entry state;
lt::entry::list_type& nodes = state["dht state"]["nodes"].list();
for (auto const& n : dht.router_nodes())
{
std::string node;
std::back_insert_iterator<std::string> out(node);
lt::detail::write_endpoint(n, out);
nodes.push_back(lt::entry(node));
}
std::vector<char> buf;
lt::bencode(std::back_inserter(buf), state);
lt::bdecode_node e;
lt::error_code ec;
lt::bdecode(&buf[0], &buf[0] + buf.size(), e, ec);
ses.load_state(e);
lt::settings_pack pack;
pack.set_bool(lt::settings_pack::enable_dht, true);
ses.apply_settings(pack);
bootstrap_session({&dht}, ses);
}
if (ticks > 2)
{
@ -129,3 +152,196 @@ TORRENT_TEST(dht_bootstrap)
}
TORRENT_TEST(dht_dual_stack_get_peers)
{
#ifndef TORRENT_DISABLE_DHT
sim::default_config cfg;
sim::simulation sim{ cfg };
dht_network dht(sim, 100);
dht_network dht6(sim, 100, dht_network::bind_ipv6);
lt::sha1_hash const test_ih("01234567890123456789");
bool got_peer_v4 = false, got_peer_v6 = false;
setup_swarm(1, swarm_test::download, sim
// add session
, [](lt::settings_pack& pack) {
}
// add torrent
, [](lt::add_torrent_params& params) {}
// on alert
, [&](lt::alert const* a, lt::session& ses)
{
if (lt::dht_get_peers_reply_alert const* p = lt::alert_cast<lt::dht_get_peers_reply_alert>(a))
{
std::vector<lt::tcp::endpoint> peers;
p->peers(peers);
for (lt::tcp::endpoint const& peer : peers)
{
// TODO: verify that the endpoint matches the session's
got_peer_v4 |= peer.address().is_v4();
got_peer_v6 |= peer.address().is_v6();
}
}
}
// terminate?
, [&](int ticks, lt::session& ses) -> bool
{
if (ticks == 0)
{
bootstrap_session({&dht, &dht6}, ses);
}
if (ticks == 2)
{
ses.dht_announce(test_ih, 6881);
}
if (ticks == 4)
{
ses.dht_get_peers(test_ih);
}
if (ticks == 6)
{
TEST_CHECK(got_peer_v4);
TEST_CHECK(got_peer_v6);
return true;
}
return false;
});
sim.run();
#endif // TORRENT_DISABLE_DHT
}
TORRENT_TEST(dht_dual_stack_immutable_item)
{
#ifndef TORRENT_DISABLE_DHT
sim::default_config cfg;
sim::simulation sim{ cfg };
dht_network dht(sim, 100);
dht_network dht6(sim, 100, dht_network::bind_ipv6);
lt::sha1_hash item_hash;
bool got_item = false;
setup_swarm(1, swarm_test::download, sim
// add session
, [](lt::settings_pack& pack) {
}
// add torrent
, [](lt::add_torrent_params& params) {}
// on alert
, [&](lt::alert const* a, lt::session& ses)
{
if (lt::dht_immutable_item_alert const* p = lt::alert_cast<lt::dht_immutable_item_alert>(a))
{
// we should only get one alert for each request
TEST_CHECK(!got_item);
got_item = p->target == item_hash && p->item.string() == "immutable item";
}
}
// terminate?
, [&](int ticks, lt::session& ses) -> bool
{
if (ticks == 0)
{
bootstrap_session({&dht, &dht6}, ses);
}
if (ticks == 2)
{
item_hash = ses.dht_put_item(lt::entry("immutable item"));
}
if (ticks == 4)
{
ses.dht_get_item(item_hash);
}
if (ticks == 6)
{
TEST_CHECK(got_item);
return true;
}
return false;
});
sim.run();
#endif // TORRENT_DISABLE_DHT
}
TORRENT_TEST(dht_dual_stack_mutable_item)
{
#ifndef TORRENT_DISABLE_DHT
sim::default_config cfg;
sim::simulation sim{ cfg };
dht_network dht(sim, 100);
dht_network dht6(sim, 100, dht_network::bind_ipv6);
boost::array<char, ed25519_private_key_size> sk;
boost::array<char, ed25519_public_key_size> pk;
int put_count = 0;
bool got_item = false;
setup_swarm(1, swarm_test::download, sim
// add session
, [](lt::settings_pack& pack) {
}
// add torrent
, [](lt::add_torrent_params& params) {}
// on alert
, [&](lt::alert const* a, lt::session& ses)
{
if (lt::dht_mutable_item_alert const* p = lt::alert_cast<lt::dht_mutable_item_alert>(a))
{
TEST_CHECK(!got_item);
if (p->authoritative)
got_item = p->key == pk && p->item.string() == "mutable item";
}
}
// terminate?
, [&](int ticks, lt::session& ses) -> bool
{
if (ticks == 0)
{
bootstrap_session({&dht, &dht6}, ses);
}
if (ticks == 2)
{
boost::array<unsigned char, ed25519_seed_size> seed;
ed25519_create_keypair((unsigned char*)pk.data()
, (unsigned char*)sk.data(), seed.data());
ses.dht_put_item(pk, [&](lt::entry& item, boost::array<char, 64>& sig
, boost::uint64_t& seq, std::string const& salt)
{
item = "mutable item";
seq = 1;
std::vector<char> v;
lt::bencode(std::back_inserter(v), item);
lt::dht::sign_mutable_item(
std::make_pair(v.data(), v.size()), std::make_pair(salt.data(), salt.size())
, seq, pk.data(), sk.data(), sig.data());
put_count++;
});
}
if (ticks == 4)
{
// should be one for each stack, ipv4 and ipv6
TEST_EQUAL(put_count, 2);
ses.dht_get_item(pk);
}
if (ticks == 6)
{
TEST_CHECK(got_item);
return true;
}
return false;
});
sim.run();
#endif // TORRENT_DISABLE_DHT
}

View File

@ -56,8 +56,13 @@ struct obs : dht::dht_observer
virtual void set_external_address(address const& addr
, address const& source) TORRENT_OVERRIDE
{}
virtual address external_address() TORRENT_OVERRIDE
{ return address_v4::from_string("40.30.20.10"); }
virtual address external_address(udp proto) TORRENT_OVERRIDE
{
if (proto == udp::v4())
return address_v4::from_string("40.30.20.10");
else
return address_v6();
}
virtual void get_peers(sha1_hash const& ih) TORRENT_OVERRIDE {}
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) TORRENT_OVERRIDE {}

View File

@ -241,7 +241,13 @@ namespace
}
else
{
int num = (std::min)(int(v.peers.size()), m_settings.max_peers_reply);
int max = m_settings.max_peers_reply;
// if these are IPv6 peers their addresses are 4x the size of IPv4
// so reduce the max peers 4 fold to compensate
// max_peers_reply should probably be specified in bytes
if (!v.peers.empty() && v.peers.begin()->addr.protocol() == tcp::v6())
max /= 4;
int num = (std::min)(int(v.peers.size()), max);
std::set<peer_entry>::const_iterator iter = v.peers.begin();
entry::list_type& pe = peers["values"].list();
std::string endpoint;

View File

@ -76,10 +76,10 @@ namespace libtorrent { namespace dht
time_duration const key_refresh
= duration_cast<time_duration>(minutes(5));
node_id extract_node_id(entry const& e)
node_id extract_node_id(entry const& e, std::string const& key)
{
if (e.type() != entry::dictionary_t) return (node_id::min)();
entry const* nid = e.find_key("node-id");
entry const* nid = e.find_key(key);
if (nid == NULL || nid->type() != entry::string_t || nid->string().length() != 20)
return (node_id::min)();
return node_id(nid->string().c_str());
@ -97,11 +97,19 @@ namespace libtorrent { namespace dht
, dht_storage_constructor_type storage_constructor
, entry const& state)
: m_counters(cnt)
, m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor)
, m_dht(udp::v4(), this, settings, extract_node_id(state, "node-id")
, observer, cnt, m_nodes, storage_constructor)
#if TORRENT_USE_IPV6
, m_dht6(udp::v6(), this, settings, extract_node_id(state, "node-id6")
, observer, cnt, m_nodes, storage_constructor)
#endif
, m_send_fun(send_fun)
, m_log(observer)
, m_key_refresh_timer(ios)
, m_connection_timer(ios)
#if TORRENT_USE_IPV6
, m_connection_timer6(ios)
#endif
, m_refresh_timer(ios)
, m_settings(settings)
, m_abort(false)
@ -111,9 +119,19 @@ namespace libtorrent { namespace dht
{
m_blocker.set_block_timer(m_settings.block_timeout);
m_blocker.set_rate_limit(m_settings.block_ratelimit);
m_nodes.insert(std::make_pair(m_dht.protocol_family_name(), &m_dht));
#if TORRENT_USE_IPV6
m_nodes.insert(std::make_pair(m_dht6.protocol_family_name(), &m_dht6));
#endif
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "starting DHT tracker with node id: %s"
m_log->log(dht_logger::tracker, "starting IPv4 DHT tracker with node id: %s"
, to_hex(m_dht.nid().to_string()).c_str());
#if TORRENT_USE_IPV6
m_log->log(dht_logger::tracker, "starting IPv6 DHT tracker with node id: %s"
, to_hex(m_dht6.nid().to_string()).c_str());
#endif
#endif
}
@ -131,6 +149,9 @@ namespace libtorrent { namespace dht
, find_data::nodes_callback const& f)
{
std::vector<udp::endpoint> initial_nodes;
#if TORRENT_USE_IPV6
std::vector<udp::endpoint> initial_nodes6;
#endif
if (bootstrap.type() == entry::dictionary_t)
{
@ -138,6 +159,12 @@ namespace libtorrent { namespace dht
if (entry const* nodes = bootstrap.find_key("nodes"))
read_endpoint_list<udp::endpoint>(nodes, initial_nodes);
} TORRENT_CATCH(std::exception&) {}
#if TORRENT_USE_IPV6
TORRENT_TRY{
if (entry const* nodes = bootstrap.find_key("nodes6"))
read_endpoint_list<udp::endpoint>(nodes, initial_nodes6);
} TORRENT_CATCH(std::exception&) {}
#endif
}
error_code ec;
@ -145,11 +172,20 @@ namespace libtorrent { namespace dht
m_connection_timer.expires_from_now(seconds(1), ec);
m_connection_timer.async_wait(
boost::bind(&dht_tracker::connection_timeout, self(), _1));
boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht), _1));
#if TORRENT_USE_IPV6
m_connection_timer6.expires_from_now(seconds(1), ec);
m_connection_timer6.async_wait(
boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(m_dht6), _1));
#endif
m_refresh_timer.expires_from_now(seconds(5), ec);
m_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_timeout, self(), _1));
m_dht.bootstrap(initial_nodes, f);
#if TORRENT_USE_IPV6
m_dht6.bootstrap(initial_nodes6, f);
#endif
}
void dht_tracker::stop()
@ -158,6 +194,9 @@ namespace libtorrent { namespace dht
error_code ec;
m_key_refresh_timer.cancel(ec);
m_connection_timer.cancel(ec);
#if TORRENT_USE_IPV6
m_connection_timer6.cancel(ec);
#endif
m_refresh_timer.cancel(ec);
m_host_resolver.cancel();
}
@ -180,14 +219,19 @@ namespace libtorrent { namespace dht
m_dht.update_stats_counters(c);
}
void dht_tracker::connection_timeout(error_code const& e)
void dht_tracker::connection_timeout(node& n, error_code const& e)
{
if (e || m_abort) return;
time_duration d = m_dht.connection_timeout();
time_duration d = n.connection_timeout();
error_code ec;
m_connection_timer.expires_from_now(d, ec);
m_connection_timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), _1));
#if TORRENT_USE_IPV6
deadline_timer& timer = n.protocol() == udp::v4() ? m_connection_timer : m_connection_timer6;
#else
deadline_timer& timer = m_connection_timer;
#endif
timer.expires_from_now(d, ec);
timer.async_wait(boost::bind(&dht_tracker::connection_timeout, self(), boost::ref(n), _1));
}
void dht_tracker::refresh_timeout(error_code const& e)
@ -195,6 +239,9 @@ namespace libtorrent { namespace dht
if (e || m_abort) return;
m_dht.tick();
#if TORRENT_USE_IPV6
m_dht6.tick();
#endif
// periodically update the DOS blocker's settings from the dht_settings
m_blocker.set_block_timer(m_settings.block_timeout);
@ -215,6 +262,7 @@ namespace libtorrent { namespace dht
m_key_refresh_timer.async_wait(boost::bind(&dht_tracker::refresh_key, self(), _1));
m_dht.new_write_key();
m_dht6.new_write_key();
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::tracker, "*** new write key***");
#endif
@ -224,6 +272,11 @@ namespace libtorrent { namespace dht
#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM
std::ofstream st("dht_routing_table_state.txt", std::ios_base::trunc);
m_dht.print_state(st);
#if TORRENT_USE_IPV6
std::ofstream st6("dht6_routing_table_state.txt", std::ios_base::trunc);
m_dht6.print_state(st6);
#endif
#endif
*/
@ -231,18 +284,111 @@ namespace libtorrent { namespace dht
, boost::function<void(std::vector<tcp::endpoint> const&)> f)
{
m_dht.get_peers(ih, f, NULL, false);
#if TORRENT_USE_IPV6
m_dht6.get_peers(ih, f, NULL, false);
#endif
}
void dht_tracker::announce(sha1_hash const& ih, int listen_port, int flags
, boost::function<void(std::vector<tcp::endpoint> const&)> f)
{
m_dht.announce(ih, listen_port, flags, f);
#if TORRENT_USE_IPV6
m_dht6.announce(ih, listen_port, flags, f);
#endif
}
namespace {
struct get_immutable_item_ctx
{
get_immutable_item_ctx(int traversals)
: active_traversals(traversals)
, item_posted(false)
{}
int active_traversals;
bool item_posted;
};
// these functions provide a slightly higher level
// interface to the get/put functionality in the DHT
void get_immutable_item_callback(item const& it, boost::shared_ptr<get_immutable_item_ctx> ctx
, boost::function<void(item const&)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(!it.is_mutable());
--ctx->active_traversals;
if (!ctx->item_posted && (!it.empty() || ctx->active_traversals == 0))
{
ctx->item_posted = true;
f(it);
}
}
struct get_mutable_item_ctx
{
get_mutable_item_ctx(int traversals) : active_traversals(traversals) {}
int active_traversals;
item it;
};
bool get_mutable_item_callback(item const& it, bool authoritative
, boost::shared_ptr<get_mutable_item_ctx> ctx
, boost::function<void(item const&, bool)> f)
{
TORRENT_ASSERT(it.is_mutable());
if (authoritative) --ctx->active_traversals;
authoritative = authoritative && ctx->active_traversals == 0;
if ((ctx->it.empty() && !it.empty()) || (ctx->it.seq() < it.seq()))
{
ctx->it = it;
f(it, authoritative);
}
else if (authoritative)
f(it, authoritative);
return false;
}
struct put_item_ctx
{
put_item_ctx(int traversals)
: active_traversals(traversals)
, response_count(0)
{}
int active_traversals;
int response_count;
};
void put_immutable_item_callback(int responses, boost::shared_ptr<put_item_ctx> ctx
, boost::function<void(int)> f)
{
ctx->response_count += responses;
if (--ctx->active_traversals == 0)
f(ctx->response_count);
}
void put_mutable_item_callback(item const& it, int responses, boost::shared_ptr<put_item_ctx> ctx
, boost::function<void(item const&, int)> cb)
{
ctx->response_count += responses;
if (--ctx->active_traversals == 0)
cb(it, ctx->response_count);
}
} // anonymous namespace
void dht_tracker::get_item(sha1_hash const& target
, boost::function<void(item const&)> cb)
{
m_dht.get_item(target, cb);
boost::shared_ptr<get_immutable_item_ctx>
ctx = boost::make_shared<get_immutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(target, boost::bind(&get_immutable_item_callback, _1, ctx, cb));
#endif
}
// key is a 32-byte binary string, the public key to look up.
@ -251,7 +397,12 @@ namespace libtorrent { namespace dht
, boost::function<void(item const&, bool)> cb
, std::string salt)
{
m_dht.get_item(key, salt, cb);
boost::shared_ptr<get_mutable_item_ctx>
ctx = boost::make_shared<get_mutable_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
#endif
}
void dht_tracker::put_item(entry const& data
@ -262,20 +413,41 @@ namespace libtorrent { namespace dht
sha1_hash target = item_target_id(
std::pair<char const*, int>(flat_data.c_str(), flat_data.size()));
m_dht.put_item(target, data, cb);
boost::shared_ptr<put_item_ctx>
ctx = boost::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(target, data, boost::bind(&put_immutable_item_callback
, _1, ctx, cb));
#if TORRENT_USE_IPV6
m_dht6.put_item(target, data, boost::bind(&put_immutable_item_callback
, _1, ctx, cb));
#endif
}
void dht_tracker::put_item(char const* key
, boost::function<void(item const&, int)> cb
, boost::function<void(item&)> data_cb, std::string salt)
{
m_dht.put_item(key, salt, cb, data_cb);
boost::shared_ptr<put_item_ctx>
ctx = boost::make_shared<put_item_ctx>((TORRENT_USE_IPV6) ? 2 : 1);
m_dht.put_item(key, salt, boost::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#if TORRENT_USE_IPV6
m_dht6.put_item(key, salt, boost::bind(&put_mutable_item_callback
, _1, _2, ctx, cb), data_cb);
#endif
}
void dht_tracker::direct_request(udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
m_dht.direct_request(ep, e, f);
#if TORRENT_USE_IPV6
if (ep.protocol() == udp::v6())
m_dht6.direct_request(ep, e, f);
else
#endif
m_dht.direct_request(ep, e, f);
}
void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep)
@ -292,6 +464,9 @@ namespace libtorrent { namespace dht
)
{
m_dht.unreachable(ep);
#if TORRENT_USE_IPV6
m_dht6.unreachable(ep);
#endif
}
}
@ -299,8 +474,6 @@ namespace libtorrent { namespace dht
, char const* buf, int size)
{
if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false;
// remove this line/check once the DHT supports IPv6
if (!ep.address().is_v4()) return false;
m_counters.inc_stats_counter(counters::dht_bytes_in, size);
// account for IP and UDP overhead
@ -364,6 +537,9 @@ namespace libtorrent { namespace dht
libtorrent::dht::msg m(m_msg, ep);
m_dht.incoming(m);
#if TORRENT_USE_IPV6
m_dht6.incoming(m);
#endif
return true;
}
@ -378,40 +554,52 @@ namespace libtorrent { namespace dht
n->list().push_back(entry(node));
}
void save_nodes(entry& ret, node const& dht, std::string const& key)
{
entry nodes(entry::list_t);
dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes);
bucket_t cache;
dht.replacement_cache(cache);
for (bucket_t::iterator i(cache.begin())
, end(cache.end()); i != end; ++i)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(i->ep(), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())
ret[key] = nodes;
}
} // anonymous namespace
entry dht_tracker::state() const
{
entry ret(entry::dictionary_t);
{
entry nodes(entry::list_t);
m_dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes);
bucket_t cache;
m_dht.replacement_cache(cache);
for (bucket_t::iterator i(cache.begin())
, end(cache.end()); i != end; ++i)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(i->ep(), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())
ret["nodes"] = nodes;
}
save_nodes(ret, m_dht, "nodes");
ret["node-id"] = m_dht.nid().to_string();
#if TORRENT_USE_IPV6
save_nodes(ret, m_dht6, "nodes6");
ret["node-id6"] = m_dht6.nid().to_string();
#endif
return ret;
}
void dht_tracker::add_node(udp::endpoint node)
{
m_dht.add_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_node(node);
#endif
}
void dht_tracker::add_router_node(udp::endpoint const& node)
{
m_dht.add_router_node(node);
#if TORRENT_USE_IPV6
m_dht6.add_router_node(node);
#endif
}
bool dht_tracker::has_quota()

View File

@ -63,7 +63,8 @@ void get_peers_observer::reply(msg const& m)
if (n)
{
std::vector<tcp::endpoint> peer_list;
if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t)
if (n.list_size() == 1 && n.list_at(0).type() == bdecode_node::string_t
&& m.addr.protocol() == udp::v4())
{
// assume it's mainline format
char const* peers = n.list_at(0).string_ptr();

View File

@ -73,14 +73,14 @@ namespace {
void nop() {}
node_id calculate_node_id(node_id const& nid, dht_observer* observer)
node_id calculate_node_id(node_id const& nid, dht_observer* observer, udp protocol)
{
address external_address;
if (observer) external_address = observer->external_address();
if (observer) external_address = observer->external_address(protocol);
// if we don't have an observer, don't pretend that external_address is valid
// generating an ID based on 0.0.0.0 would be terrible. random is better
if (!observer || external_address == address())
if (!observer || external_address.is_unspecified())
{
return generate_random_id();
}
@ -93,16 +93,19 @@ node_id calculate_node_id(node_id const& nid, dht_observer* observer)
} // anonymous namespace
node::node(udp_socket_interface* sock
node::node(udp proto, udp_socket_interface* sock
, dht_settings const& settings, node_id nid
, dht_observer* observer
, struct counters& cnt
, std::map<std::string, node*> const& nodes
, dht_storage_constructor_type storage_constructor)
: m_settings(settings)
, m_id(calculate_node_id(nid, observer))
, m_table(m_id, 8, settings, observer)
, m_id(calculate_node_id(nid, observer, proto))
, m_table(m_id, proto, 8, settings, observer)
, m_rpc(m_id, m_settings, m_table, sock, observer)
, m_nodes(nodes)
, m_observer(observer)
, m_protocol(map_protocol_to_descriptor(proto))
, m_last_tracker_tick(aux::time_now())
, m_last_self_refresh(min_time())
, m_sock(sock)
@ -126,7 +129,7 @@ void node::update_node_id()
// it's possible that our external address hasn't actually changed. If our
// current ID is still valid, don't do anything.
if (verify_id(m_id, m_observer->external_address()))
if (verify_id(m_id, m_observer->external_address(protocol())))
return;
#ifndef TORRENT_DISABLE_LOGGING
@ -134,7 +137,7 @@ void node::update_node_id()
, "updating node ID (because external IP address changed)");
#endif
m_id = generate_id(m_observer->external_address());
m_id = generate_id(m_observer->external_address(protocol()));
m_table.update_node_id(m_id);
}
@ -303,6 +306,8 @@ void node::incoming(msg const& m)
// responds to 'query' messages that it receives.
if (m_settings.read_only) break;
if (!native_address(m.addr)) break;
if (!m_sock->has_quota())
{
m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
@ -407,6 +412,7 @@ void node::add_router_node(udp::endpoint router)
void node::add_node(udp::endpoint node)
{
if (!native_address(node)) return;
// ping the node, and if we get a reply, it
// will be added to the routing table
send_single_refresh(node, m_table.num_active_buckets());
@ -600,7 +606,11 @@ struct ping_observer : observer
}
// look for nodes
bdecode_node n = r.dict_find_string("nodes");
#if TORRENT_USE_IPV6
udp protocol = algorithm()->get_node().protocol();
#endif
char const* nodes_key = algorithm()->get_node().protocol_nodes_key();
bdecode_node n = r.dict_find_string(nodes_key);
if (n)
{
char const* nodes = n.string_ptr();
@ -611,8 +621,14 @@ struct ping_observer : observer
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
algorithm()->get_node().m_table.heard_about(id
, detail::read_v4_endpoint<udp::endpoint>(nodes));
udp::endpoint ep;
#if TORRENT_USE_IPV6
if (protocol == udp::v6())
ep = detail::read_v6_endpoint<udp::endpoint>(nodes);
else
#endif
ep = detail::read_v4_endpoint<udp::endpoint>(nodes);
algorithm()->get_node().m_table.heard_about(id, ep);
}
}
}
@ -765,14 +781,12 @@ void node::lookup_peers(sha1_hash const& info_hash, entry& reply
m_storage->get_peers(info_hash, noseed, scrape, reply);
}
void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes)
void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& n, nodes_t const& nodes)
{
entry& n = r["nodes"];
std::back_insert_iterator<std::string> out(n.string());
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
if (!i->addr().is_v4()) continue;
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(udp::endpoint(i->addr(), i->port()), out);
}
@ -843,9 +857,10 @@ void node::incoming_request(msg const& m, entry& e)
{"info_hash", bdecode_node::string_t, 20, 0},
{"noseed", bdecode_node::int_t, 0, key_desc_t::optional},
{"scrape", bdecode_node::int_t, 0, key_desc_t::optional},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
bdecode_node msg_keys[3];
bdecode_node msg_keys[4];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string
, sizeof(error_string)))
{
@ -859,10 +874,8 @@ void node::incoming_request(msg const& m, entry& e)
m_counters.inc_stats_counter(counters::dht_get_peers_in);
sha1_hash info_hash(msg_keys[0].string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(info_hash, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(info_hash, msg_keys[3], reply);
bool noseed = false;
bool scrape = false;
@ -881,9 +894,10 @@ void node::incoming_request(msg const& m, entry& e)
{
key_desc_t msg_desc[] = {
{"target", bdecode_node::string_t, 20, 0},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
bdecode_node msg_keys[1];
bdecode_node msg_keys[2];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string, sizeof(error_string)))
{
incoming_error(e, error_string);
@ -893,10 +907,7 @@ void node::incoming_request(msg const& m, entry& e)
m_counters.inc_stats_counter(counters::dht_find_node_in);
sha1_hash target(msg_keys[0].string_ptr());
// TODO: 2 find_node should write directly to the response entry
nodes_t n;
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, msg_keys[1], reply);
}
else if (query_len == 13 && memcmp(query, "announce_peer", 13) == 0)
{
@ -1112,12 +1123,13 @@ void node::incoming_request(msg const& m, entry& e)
key_desc_t msg_desc[] = {
{"seq", bdecode_node::int_t, 0, key_desc_t::optional},
{"target", bdecode_node::string_t, 20, 0},
{"want", bdecode_node::list_t, 0, key_desc_t::optional},
};
// k is not used for now
// attempt to parse the message
bdecode_node msg_keys[2];
bdecode_node msg_keys[3];
if (!verify_message(arg_ent, msg_desc, msg_keys, error_string
, sizeof(error_string)))
{
@ -1135,10 +1147,8 @@ void node::incoming_request(msg const& m, entry& e)
reply["token"] = generate_token(m.addr, msg_keys[1].string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, msg_keys[2], reply);
// if the get has a sequence number it must be for a mutable item
// so don't bother searching the immutable table
@ -1173,12 +1183,59 @@ void node::incoming_request(msg const& m, entry& e)
}
sha1_hash target(target_ent.string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
write_nodes_entries(target, arg_ent.dict_find_list("want"), reply);
return;
}
}
void node::write_nodes_entries(sha1_hash const& info_hash
, bdecode_node const& want, entry& r)
{
// if no wants entry was specified, include a nodes
// entry based on the protocol the request came in with
if (want.type() != bdecode_node::list_t)
{
nodes_t n;
m_table.find_node(info_hash, n, 0);
write_nodes_entry(r[protocol_nodes_key()], n);
return;
}
// if there is a wants entry then we may need to reach into
// another node's routing table to get nodes of the requested type
// we use a map maintained by the owning dht_tracker to find the
// node associated with each string in the want list, which may
// include this node
for (int i = 0; i < want.list_size(); ++i)
{
bdecode_node wanted = want.list_at(i);
if (wanted.type() != bdecode_node::string_t)
continue;
std::map<std::string, node*>::const_iterator wanted_node
= m_nodes.find(wanted.string_value());
if (wanted_node == m_nodes.end())
continue;
nodes_t n;
wanted_node->second->m_table.find_node(info_hash, n, 0);
write_nodes_entry(r[wanted_node->second->protocol_nodes_key()], n);
}
}
node::protocol_descriptor const& node::map_protocol_to_descriptor(udp protocol)
{
static protocol_descriptor descriptors[] =
{ {udp::v4(), "n4", "nodes"}
, {udp::v6(), "n6", "nodes6"} };
for (int i = 0; i < sizeof(descriptors) / sizeof(protocol_descriptor); ++i)
{
if (descriptors[i].protocol == protocol)
return descriptors[i];
}
TORRENT_ASSERT(false);
throw std::out_of_range("unknown protocol");
}
} } // namespace libtorrent::dht

View File

@ -41,8 +41,7 @@ namespace libtorrent { namespace dht
, bool pinged)
: last_queried(pinged ? aux::time_now() : min_time())
, id(id_)
, a(ep.address().to_v4().to_bytes())
, p(ep.port())
, endpoint(ep)
, rtt(roundtriptime & 0xffff)
, timeout_count(pinged ? 0 : 0xff)
{
@ -54,8 +53,7 @@ namespace libtorrent { namespace dht
node_entry::node_entry(udp::endpoint ep)
: last_queried(min_time())
, id(0)
, a(ep.address().to_v4().to_bytes())
, p(ep.port())
, endpoint(ep)
, rtt(0xffff)
, timeout_count(0xff)
{
@ -67,7 +65,6 @@ namespace libtorrent { namespace dht
node_entry::node_entry()
: last_queried(min_time())
, id(0)
, p(0)
, rtt(0xffff)
, timeout_count(0xff)
{

View File

@ -60,10 +60,15 @@ using boost::uint8_t;
#if BOOST_VERSION <= 104700
namespace boost {
size_t hash_value(libtorrent::address_v4::bytes_type ip)
{
return boost::hash_value(*reinterpret_cast<boost::uint32_t*>(&ip[0]));
}
size_t hash_value(libtorrent::address_v4::bytes_type ip)
{
return boost::hash_value(*reinterpret_cast<boost::uint32_t*>(&ip[0]));
}
size_t hash_value(libtorrent::address_v6::bytes_type ip)
{
return boost::hash_value(*reinterpret_cast<boost::uint64_t*>(&ip[0]));
}
}
#endif
@ -89,7 +94,37 @@ namespace
}
}
routing_table::routing_table(node_id const& id, int bucket_size
void ip_set::insert(address addr)
{
#if TORRENT_USE_IPV6
if (addr.is_v6())
m_ip6s.insert(addr.to_v6().to_bytes());
else
#endif
m_ip4s.insert(addr.to_v4().to_bytes());
}
size_t ip_set::count(address addr)
{
#if TORRENT_USE_IPV6
if (addr.is_v6())
return m_ip6s.count(addr.to_v6().to_bytes());
else
#endif
return m_ip4s.count(addr.to_v4().to_bytes());
}
void ip_set::erase(address addr)
{
#if TORRENT_USE_IPV6
if (addr.is_v6())
erase_one(m_ip6s, addr.to_v6().to_bytes());
else
#endif
erase_one(m_ip4s, addr.to_v4().to_bytes());
}
routing_table::routing_table(node_id const& id, udp proto, int bucket_size
, dht_settings const& settings
, dht_logger* log)
:
@ -98,6 +133,7 @@ routing_table::routing_table(node_id const& id, int bucket_size
#endif
m_settings(settings)
, m_id(id)
, m_protocol(proto)
, m_depth(0)
, m_last_self_refresh(min_time())
, m_bucket_size(bucket_size)
@ -514,8 +550,8 @@ void routing_table::remove_node(node_entry* n
&& n < &bucket->replacements[0] + bucket->replacements.size())
{
int idx = n - &bucket->replacements[0];
TORRENT_ASSERT(m_ips.count(n->a) > 0);
erase_one(m_ips, n->a);
TORRENT_ASSERT(m_ips.count(n->addr()) > 0);
m_ips.erase(n->addr());
bucket->replacements.erase(bucket->replacements.begin() + idx);
}
@ -524,8 +560,8 @@ void routing_table::remove_node(node_entry* n
&& n < &bucket->live_nodes[0] + bucket->live_nodes.size())
{
int idx = n - &bucket->live_nodes[0];
TORRENT_ASSERT(m_ips.count(n->a) > 0);
erase_one(m_ips, n->a);
TORRENT_ASSERT(m_ips.count(n->addr()) > 0);
m_ips.erase(n->addr());
bucket->live_nodes.erase(bucket->live_nodes.begin() + idx);
}
}
@ -574,6 +610,10 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
// INVARIANT_CHECK;
#endif
// don't add if the address isn't the right type
if (!native_endpoint(e.ep()))
return failed_to_add;
// if we already have this (IP,port), don't do anything
if (m_router_nodes.find(e.ep()) != m_router_nodes.end())
return failed_to_add;
@ -582,7 +622,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
if (e.id == m_id) return failed_to_add;
// do we already have this IP in the table?
if (m_ips.count(e.addr().to_v4().to_bytes()) > 0)
if (m_ips.count(e.addr()) > 0)
{
// this exact IP already exists in the table. It might be the case
// that the node changed IP. If pinged is true, and the port also
@ -688,7 +728,7 @@ routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
j->timeout_count = 0;
j->update_rtt(e.rtt);
e = *j;
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
rb.erase(j);
}
@ -737,7 +777,7 @@ ip_ok:
{
if (b.empty()) b.reserve(bucket_size_limit);
b.push_back(e);
m_ips.insert(e.addr().to_v4().to_bytes());
m_ips.insert(e.addr());
return node_added;
}
@ -762,9 +802,9 @@ ip_ok:
{
// j points to a node that has not been pinged.
// Replace it with this new one
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
m_ips.insert(e.addr());
return node_added;
}
@ -783,9 +823,9 @@ ip_ok:
{
// i points to a node that has been marked
// as stale. Replace it with this new one
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
m_ips.insert(e.addr());
return node_added;
}
@ -893,9 +933,9 @@ ip_ok:
if (j != b.end() && (force_replace || j->rtt > e.rtt))
{
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
m_ips.insert(e.addr());
#ifndef TORRENT_DISABLE_LOGGING
if (m_log)
{
@ -940,13 +980,13 @@ ip_ok:
// less reliable than this one, that has been pinged
j = std::find_if(rb.begin(), rb.end(), boost::bind(&node_entry::pinged, _1) == false);
if (j == rb.end()) j = rb.begin();
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
rb.erase(j);
}
if (rb.empty()) rb.reserve(m_bucket_size);
rb.push_back(e);
m_ips.insert(e.addr().to_v4().to_bytes());
m_ips.insert(e.addr());
return node_added;
}
@ -1140,13 +1180,13 @@ void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
// has never responded at all, remove it
if (j->fail_count() >= m_settings.max_fail_count || !j->pinged())
{
erase_one(m_ips, j->addr().to_v4().to_bytes());
m_ips.erase(j->addr());
b.erase(j);
}
return;
}
erase_one(m_ips, j->a);
m_ips.erase(j->addr());
b.erase(j);
// sort by RTT first, to find the node with the lowest
@ -1277,7 +1317,7 @@ void routing_table::find_node(node_id const& target
#if TORRENT_USE_INVARIANT_CHECKS
void routing_table::check_invariant() const
{
boost::unordered_multiset<address_v4::bytes_type> all_ips;
ip_set all_ips;
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
@ -1285,12 +1325,13 @@ void routing_table::check_invariant() const
for (bucket_t::const_iterator j = i->replacements.begin();
j != i->replacements.end(); ++j)
{
all_ips.insert(j->addr().to_v4().to_bytes());
all_ips.insert(j->addr());
}
for (bucket_t::const_iterator j = i->live_nodes.begin();
j != i->live_nodes.end(); ++j)
{
all_ips.insert(j->addr().to_v4().to_bytes());
TORRENT_ASSERT(j->addr().is_v4() == i->live_nodes.begin()->addr().is_v4());
all_ips.insert(j->addr());
}
}

View File

@ -287,8 +287,11 @@ bool rpc_manager::incoming(msg const& m, node_id* id)
if (!o)
{
#ifndef TORRENT_DISABLE_LOGGING
m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s"
, int(transaction_id.size()), print_endpoint(m.addr).c_str());
if (m_table.native_endpoint(m.addr))
{
m_log->log(dht_logger::rpc_manager, "reply with unknown transaction id size: %d from %s"
, int(transaction_id.size()), print_endpoint(m.addr).c_str());
}
#endif
// this isn't necessarily because the other end is doing
// something wrong. This can also happen when we restart
@ -464,6 +467,12 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
// places a 'ro' key in the top-level message dictionary and sets its value to 1.
if (m_settings.read_only) e["ro"] = 1;
node& n = o->algorithm()->get_node();
if (!n.native_address(o->target_addr()))
{
a["want"].list().push_back(entry(n.protocol_family_name()));
}
o->set_target(target_addr);
o->set_transaction_id(tid);

View File

@ -165,31 +165,45 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
if (m_node.settings().restrict_search_ips
&& !(flags & observer::flag_initial))
{
// mask the lower octet
boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong();
prefix4 &= 0xffffff00;
if (m_peer4_prefixes.count(prefix4) > 0)
#if TORRENT_USE_IPV6
if (o->target_addr().is_v6())
{
// we already have a node in this search with an IP very
// close to this one. We know that it's not the same, because
// it claims a different node-ID. Ignore this to avoid attacks
#ifndef TORRENT_DISABLE_LOGGING
if (get_node().observer())
{
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&o->id()[0]), 20, hex_id);
get_node().observer()->log(dht_logger::traversal
, "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s"
, static_cast<void*>(this), hex_id, print_address(o->target_addr()).c_str(), name());
}
address_v6::bytes_type addr_bytes = o->target_addr().to_v6().to_bytes();
address_v6::bytes_type::const_iterator prefix_it = addr_bytes.begin();
boost::uint64_t prefix6 = detail::read_uint64(prefix_it);
if (m_peer6_prefixes.insert(prefix6).second)
goto add_result;
}
else
#endif
return;
{
// mask the lower octet
boost::uint32_t prefix4 = o->target_addr().to_v4().to_ulong();
prefix4 &= 0xffffff00;
if (m_peer4_prefixes.insert(prefix4).second)
goto add_result;
}
m_peer4_prefixes.insert(prefix4);
// we already have a node in this search with an IP very
// close to this one. We know that it's not the same, because
// it claims a different node-ID. Ignore this to avoid attacks
#ifndef TORRENT_DISABLE_LOGGING
if (get_node().observer())
{
char hex_id[41];
to_hex(reinterpret_cast<char const*>(&o->id()[0]), 20, hex_id);
get_node().observer()->log(dht_logger::traversal
, "[%p] traversal DUPLICATE node. id: %s addr: %s type: %s"
, static_cast<void*>(this), hex_id, print_address(o->target_addr()).c_str(), name());
}
#endif
return;
}
add_result:
TORRENT_ASSERT((o->flags & observer::flag_no_id)
|| std::find_if(m_results.begin(), m_results.end()
, boost::bind(&observer::id, _1) == id) == m_results.end());
@ -600,8 +614,13 @@ void traversal_observer::reply(msg const& m)
, print_endpoint(target_ep()).c_str(), algorithm()->name());
}
#endif
// look for nodes
bdecode_node n = r.dict_find_string("nodes");
#if TORRENT_USE_IPV6
udp protocol = algorithm()->get_node().protocol();
#endif
char const* nodes_key = algorithm()->get_node().protocol_nodes_key();
bdecode_node n = r.dict_find_string(nodes_key);
if (n)
{
char const* nodes = n.string_ptr();
@ -612,7 +631,14 @@ void traversal_observer::reply(msg const& m)
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
algorithm()->traverse(id, read_v4_endpoint<udp::endpoint>(nodes));
udp::endpoint ep;
#if TORRENT_USE_IPV6
if (protocol == udp::v6())
ep = read_v6_endpoint<udp::endpoint>(nodes);
else
#endif
ep = read_v4_endpoint<udp::endpoint>(nodes);
algorithm()->traverse(id, ep);
}
}

View File

@ -6779,9 +6779,20 @@ namespace aux {
set_external_address(ip, source_dht, source);
}
address session_impl::external_address()
address session_impl::external_address(udp proto)
{
return m_external_ip.external_address(address_v4());
#if !TORRENT_USE_IPV6
TORRENT_UNUSED(proto);
#endif
address addr;
#if TORRENT_USE_IPV6
if (proto == udp::v6())
addr = address_v6();
else
#endif
addr = address_v4();
return m_external_ip.external_address(addr);
}
void session_impl::get_peers(sha1_hash const& ih)

View File

@ -124,10 +124,10 @@ tcp::endpoint rand_tcp_ep()
return tcp::endpoint(rand_v4(), g_port + 1024);
}
udp::endpoint rand_udp_ep()
udp::endpoint rand_udp_ep(libtorrent::address(&rand_addr)())
{
g_port = (g_port + 1) % 14037;
return udp::endpoint(rand_v4(), g_port + 1024);
return udp::endpoint(rand_addr(), g_port + 1024);
}
std::map<std::string, boost::int64_t> get_counters(libtorrent::session& s)

View File

@ -57,7 +57,7 @@ EXPORT libtorrent::address rand_v4();
EXPORT libtorrent::address rand_v6();
#endif
EXPORT libtorrent::tcp::endpoint rand_tcp_ep();
EXPORT libtorrent::udp::endpoint rand_udp_ep();
EXPORT libtorrent::udp::endpoint rand_udp_ep(libtorrent::address(&rand_addr)() = rand_v4);
EXPORT libtorrent::sha1_hash rand_hash();

View File

@ -201,7 +201,10 @@ struct msg_args
{ a["want"].list().push_back(w); return *this; }
msg_args& nodes(nodes_t const& n)
{ if (!n.empty()) dht::write_nodes_entry(a, n); return *this; }
{ if (!n.empty()) dht::write_nodes_entry(a["nodes"], n); return *this; }
msg_args& nodes6(nodes_t const& n)
{ if (!n.empty()) dht::write_nodes_entry(a["nodes6"], n); return *this; }
msg_args& peers(std::set<tcp::endpoint> const& p)
{ if (!p.empty()) write_peers(a.dict(), p); return *this; }
@ -476,7 +479,7 @@ struct obs : dht::dht_observer
, address const& source) TORRENT_OVERRIDE
{}
virtual address external_address() TORRENT_OVERRIDE
virtual address external_address(udp proto) TORRENT_OVERRIDE
{
return address_v4::from_string("236.0.0.1");
}
@ -512,13 +515,16 @@ dht_settings test_settings()
// TODO: test obfuscated_get_peers
// TODO: 2 split this test up into smaller test cases
TORRENT_TEST(dht)
void do_test_dht(address(&rand_addr)())
{
dht_settings sett = test_settings();
mock_socket s;
obs observer;
counters cnt;
dht::node node(&s, sett, node_id(0), &observer, cnt);
udp::endpoint source(rand_addr(), 20);
std::map<std::string, node*> nodes;
dht::node node(source.protocol(), &s, sett
, node_id(0), &observer, cnt, nodes);
// DHT should be running on port 48199 now
bdecode_node response;
@ -526,7 +532,6 @@ TORRENT_TEST(dht)
bool ret;
// ====== ping ======
udp::endpoint source(address::from_string("10.0.0.1"), 20);
send_dht_request(node, "ping", source, &response);
dht::key_desc_t pong_desc[] = {
@ -653,7 +658,7 @@ TORRENT_TEST(dht)
// 50 downloaders and 50 seeds
for (int i = 0; i < 100; ++i)
{
source = udp::endpoint(rand_v4(), 6000);
source = udp::endpoint(rand_addr(), 6000);
send_dht_request(node, "get_peers", source, &response
, msg_args().info_hash("01010101010101010101"));
@ -742,10 +747,19 @@ TORRENT_TEST(dht)
// enable node_id enforcement
sett.enforce_node_id = true;
// this is one of the test vectors from:
// http://libtorrent.org/dht_sec.html
source = udp::endpoint(address::from_string("124.31.75.21"), 1);
node_id nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401");
node_id nid;
if (source.protocol() == udp::v4())
{
// this is one of the test vectors from:
// http://libtorrent.org/dht_sec.html
source = udp::endpoint(address::from_string("124.31.75.21"), 1);
nid = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401");
}
else
{
source = udp::endpoint(address::from_string("2001:b829:2123:be84:e16c:d6ae:5290:49f1"), 1);
nid = to_hash("0a8ad123be84e16cd6ae529049f1f1bbe9ebb304");
}
// verify that we reject invalid node IDs
// this is now an invalid node-id for 'source'
@ -781,7 +795,10 @@ TORRENT_TEST(dht)
TEST_EQUAL(node.size().get<0>(), nodes_num);
// now the node-id is valid.
nid[0] = 0x5f;
if (source.protocol() == udp::v4())
nid[0] = 0x5f;
else
nid[0] = 0x0a;
send_dht_request(node, "find_node", source, &response
, msg_args().target("0101010101010101010101010101010101010101").nid(nid));
@ -861,7 +878,7 @@ TORRENT_TEST(dht)
udp::endpoint eps[1000];
for (int i = 0; i < 1000; ++i)
eps[i] = udp::endpoint(rand_v4(), (rand() % 16534) + 1);
eps[i] = udp::endpoint(rand_addr(), (rand() % 16534) + 1);
announce_item items[] =
{
@ -1248,16 +1265,29 @@ TORRENT_TEST(dht)
// s.restrict_routing_ips = false;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
const int bucket_size = 10;
dht::routing_table table(id, bucket_size, s, &observer);
dht::routing_table table(id, source.protocol(), bucket_size, s, &observer);
std::vector<node_entry> nodes;
TEST_EQUAL(table.size().get<0>(), 0);
node_id tmp = id;
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
address node_addr;
address node_near_addr;
if (source.protocol() == udp::v4())
{
node_addr = address_v4::from_string("4.4.4.4");
node_near_addr = address_v4::from_string("4.4.4.5");
}
else
{
node_addr = address_v6::from_string("2001:1111:1111:1111:1111:1111:1111:1111");
node_near_addr = address_v6::from_string("2001:1111:1111:1111:eeee:eeee:eeee:eeee");
}
// test a node with the same IP:port changing ID
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(table.size().get<0>(), 1);
@ -1265,13 +1295,13 @@ TORRENT_TEST(dht)
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// set timeout_count to 1
table.node_failed(tmp, udp::endpoint(address_v4::from_string("4.4.4.4"), 4));
table.node_failed(tmp, udp::endpoint(node_addr, 4));
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
@ -1279,58 +1309,58 @@ TORRENT_TEST(dht)
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 1);
}
// add the exact same node again, it should set the timeout_count to 0
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
nodes.clear();
table.for_each_node(node_push_back, nop, &nodes);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
TEST_EQUAL(nodes[0].timeout_count, 0);
}
// test adding the same IP:port again with a new node ID (should replace the old one)
add_and_replace(tmp, diff);
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 4), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 4), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
TEST_EQUAL(nodes.size(), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding the same node ID again with a different IP (should be ignored)
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.4"), 5), 10);
table.node_seen(tmp, udp::endpoint(node_addr, 5), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
// test adding a node that ends up in the same bucket with an IP
// very close to the current one (should be ignored)
// if restrict_routing_ips == true
table.node_seen(tmp, udp::endpoint(address::from_string("4.4.4.5"), 5), 10);
table.node_seen(tmp, udp::endpoint(node_near_addr, 5), 10);
table.find_node(id, nodes, 0, 10);
TEST_EQUAL(table.bucket_size(0), 1);
if (!nodes.empty())
{
TEST_EQUAL(nodes[0].id, tmp);
TEST_EQUAL(nodes[0].addr(), address_v4::from_string("4.4.4.4"));
TEST_EQUAL(nodes[0].addr(), node_addr);
TEST_EQUAL(nodes[0].port(), 4);
}
@ -1339,12 +1369,12 @@ TORRENT_TEST(dht)
init_rand_address();
add_and_replace(tmp, diff);
table.node_seen(id, udp::endpoint(rand_v4(), rand()), 10);
table.node_seen(id, udp::endpoint(rand_addr(), rand()), 10);
nodes.clear();
for (int i = 0; i < 7000; ++i)
{
table.node_seen(tmp, udp::endpoint(rand_v4(), rand()), 20 + (tmp[19] & 0xff));
table.node_seen(tmp, udp::endpoint(rand_addr(), rand()), 20 + (tmp[19] & 0xff));
add_and_replace(tmp, diff);
}
printf("active buckets: %d\n", table.num_active_buckets());
@ -1485,7 +1515,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
std::vector<udp::endpoint> nodesv;
@ -1557,7 +1587,7 @@ TORRENT_TEST(dht)
do
{
dht::node_id target = to_hash("1234876923549721020394873245098347598635");
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1652,7 +1682,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1698,7 +1728,7 @@ TORRENT_TEST(dht)
g_sent_packets.clear();
do
{
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
@ -1785,7 +1815,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
@ -1885,7 +1915,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
@ -1987,7 +2017,7 @@ TORRENT_TEST(dht)
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
dht::node node(udp::v4(), &s, sett, (node_id::min)(), &observer, cnt, nodes);
sha1_hash target = hasher(public_key, item_pk_len).final();
enum { num_test_nodes = 9 }; // we need K + 1 nodes to create the failing sequence
node_entry nodes[num_test_nodes] =
@ -2067,6 +2097,143 @@ TORRENT_TEST(dht)
} while (false);
}
TORRENT_TEST(dht)
{
do_test_dht(rand_v4);
#if TORRENT_USE_IPV6
if (supports_ipv6())
do_test_dht(rand_v6);
#endif
}
TORRENT_TEST(dht_dual_stack)
{
dht_settings sett = test_settings();
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node4(udp::v4(), &s, sett, node_id(0), &observer, cnt, nodes);
dht::node node6(udp::v6(), &s, sett, node_id(0), &observer, cnt, nodes);
nodes.insert(std::make_pair("n4", &node4));
nodes.insert(std::make_pair("n6", &node6));
// DHT should be running on port 48199 now
bdecode_node response;
char error_string[200];
bool ret;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
node4.m_table.node_seen(id, udp::endpoint(address::from_string("4.4.4.4"), 4440), 10);
node6.m_table.node_seen(id, udp::endpoint(address::from_string("4::4"), 4441), 10);
// v4 node requesting v6 nodes
udp::endpoint source(address::from_string("10.0.0.1"), 20);
send_dht_request(node4, "find_node", source, &response
, msg_args().target("0101010101010101010101010101010101010101").want("n6"));
dht::key_desc_t nodes6_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child }
};
bdecode_node nodes6_keys[4];
ret = verify_message(response, nodes6_desc, nodes6_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes6_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v6_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
// v6 node requesting v4 nodes
source.address(address::from_string("10::1"));
send_dht_request(node6, "get_peers", source, &response
, msg_args().info_hash("0101010101010101010101010101010101010101").want("n4"));
dht::key_desc_t nodes_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes", bdecode_node::string_t, 26, key_desc_t::last_child }
};
bdecode_node nodes_keys[4];
ret = verify_message(response, nodes_desc, nodes_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v4_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
// v6 node requesting both v4 and v6 nodes
send_dht_request(node6, "find_nodes", source, &response
, msg_args().info_hash("0101010101010101010101010101010101010101")
.want("n4")
.want("n6"));
dht::key_desc_t nodes46_desc[] = {
{ "y", bdecode_node::string_t, 1, 0 },
{ "r", bdecode_node::dict_t, 0, key_desc_t::parse_children },
{ "id", bdecode_node::string_t, 20, 0 },
{ "nodes", bdecode_node::string_t, 26, 0 },
{ "nodes6", bdecode_node::string_t, 38, key_desc_t::last_child }
};
bdecode_node nodes46_keys[5];
ret = verify_message(response, nodes46_desc, nodes46_keys, error_string
, sizeof(error_string));
if (ret)
{
char const* nodes_ptr = nodes46_keys[3].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
udp::endpoint rep = detail::read_v4_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4.4.4.4"), 4440));
nodes_ptr = nodes46_keys[4].string_ptr();
TEST_CHECK(memcmp(nodes_ptr, id.data(), id.size) == 0);
nodes_ptr += id.size;
rep = detail::read_v6_endpoint<udp::endpoint>(nodes_ptr);
TEST_EQUAL(rep, udp::endpoint(address::from_string("4::4"), 4441));
}
else
{
fprintf(stderr, "find_node response: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
}
}
void get_test_keypair(char* public_key, char* private_key)
{
from_hex("77ff84905a91936367c01360803104f92432fcd904a43511876df5cdf3e7e548", 64, public_key);
@ -2252,7 +2419,7 @@ TORRENT_TEST(routing_table_uniform)
node_id id = to_hash("1234876923549721020394873245098347598635");
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, udp::v4(), 8, sett, &observer);
// insert 256 nodes evenly distributed across the ID space.
// we expect to fill the top 5 buckets
@ -2295,7 +2462,7 @@ TORRENT_TEST(routing_table_balance)
sett.extended_routing_table = false;
node_id id = to_hash("1234876923549721020394873245098347598635");
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, udp::v4(), 8, sett, &observer);
// insert nodes in the routing table that will force it to split
// and make sure we don't end up with a table completely out of balance
@ -2327,7 +2494,7 @@ TORRENT_TEST(routing_table_extended)
for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i);
std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end());
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, udp::v4(), 8, sett, &observer);
for (int i = 0; i < 256; ++i)
{
add_and_replace(id, diff);
@ -2360,7 +2527,7 @@ TORRENT_TEST(routing_table_set_id)
node_id_prefix.reserve(256);
for (int i = 0; i < 256; ++i) node_id_prefix.push_back(i);
std::random_shuffle(node_id_prefix.begin(), node_id_prefix.end());
routing_table tbl(id, 8, sett, &observer);
routing_table tbl(id, udp::v4(), 8, sett, &observer);
for (int i = 0; i < 256; ++i)
{
id[0] = node_id_prefix[i];
@ -2404,8 +2571,9 @@ TORRENT_TEST(read_only_node)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(udp::v4(), &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
bdecode_node response;
msg_args args;
@ -2491,8 +2659,9 @@ TORRENT_TEST(invalid_error_msg)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(udp::v4(), &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
entry e;
@ -2528,10 +2697,11 @@ TORRENT_TEST(rpc_invalid_error_msg)
mock_socket s;
obs observer;
counters cnt;
std::map<std::string, node*> nodes;
dht::routing_table table(node_id(), 8, sett, &observer);
dht::routing_table table(node_id(), udp::v4(), 8, sett, &observer);
dht::rpc_manager rpc(node_id(), sett, table, &s, &observer);
dht::node node(&s, sett, node_id(0), &observer, cnt);
dht::node node(udp::v4(), &s, sett, node_id(0), &observer, cnt, nodes);
udp::endpoint source(address::from_string("10.0.0.1"), 20);
@ -2618,7 +2788,7 @@ TORRENT_TEST(dht_verify_node_address)
s.extended_routing_table = false;
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
const int bucket_size = 10;
dht::routing_table table(id, bucket_size, s, &observer);
dht::routing_table table(id, udp::v4(), bucket_size, s, &observer);
std::vector<node_entry> nodes;
TEST_EQUAL(table.size().get<0>(), 0);