Merge pull request #356 from arvidn/dht-simulator

overhaul the dht simulator
This commit is contained in:
Arvid Norberg 2016-01-09 11:24:11 -05:00
commit 641b6e51f8
5 changed files with 380 additions and 226 deletions

@ -1 +1 @@
Subproject commit aaa1e30060301d6382a0898890a9b965b3a9e1bd
Subproject commit 277389e87cfddd1747da7a8c7287e52ef592d345

View File

@ -30,170 +30,308 @@ POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/session.hpp"
#include "libtorrent/session_settings.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/settings_pack.hpp"
#include "libtorrent/ip_filter.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/kademlia/node.hpp"
#include "libtorrent/kademlia/dht_observer.hpp"
#include "setup_transfer.hpp"
#include <boost/bind.hpp>
#include <memory> // for unique_ptr
#include <random>
#include "libtorrent/socket_io.hpp" // print_endpoint
#include "libtorrent/random.hpp"
#include "libtorrent/crc32c.hpp"
#include "libtorrent/alert_types.hpp" // for dht_routing_bucket
#include "setup_dht.hpp"
namespace lt = libtorrent;
using namespace sim;
using namespace libtorrent;
namespace {
struct network
{
network(int num_nodes, network_setup_provider& config)
: m_config(config)
, m_ios(m_sim, asio::ip::address_v4::from_string("10.0.0.1"))
, m_start_time(lt::clock_type::now())
, m_timer(m_ios)
, m_shutting_down(false)
lt::time_point start_time;
// this is the IP address assigned to node 'idx'
asio::ip::address addr_from_int(int idx)
{
for (int i = 0; i < num_nodes; ++i)
{
// create a new io_service
char ep[30];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff);
m_io_service.push_back(boost::make_shared<sim::asio::io_service>(
boost::ref(m_sim), asio::ip::address_v4::from_string(ep)));
lt::settings_pack pack = m_config.add_session(i);
boost::shared_ptr<lt::session> ses = boost::make_shared<lt::session>(pack
, boost::ref(*m_io_service.back()));
m_nodes.push_back(ses);
m_config.setup_session(*ses, i);
ses->set_alert_notify(boost::bind(&network::on_alert_notify, this, i));
}
m_timer.expires_from_now(lt::seconds(1));
m_timer.async_wait(boost::bind(&network::on_tick, this, _1));
sim::dump_network_graph(m_sim, "../dht-sim.dot");
return asio::ip::address_v4(lt::random());
}
void on_tick(lt::error_code const& ec)
// this is the node ID assigned to node 'idx'
dht::node_id id_from_addr(lt::address const& addr)
{
if (ec || m_shutting_down) return;
if (m_config.on_tick())
{
terminate();
return;
}
m_timer.expires_from_now(lt::seconds(1));
m_timer.async_wait(boost::bind(&network::on_tick, this, _1));
return dht::generate_id(addr);
}
void on_alert_notify(int session_index)
{
// this function is called inside libtorrent and we cannot perform work
// immediately in it. We have to notify the outside to pull all the alerts
m_io_service[session_index]->post(boost::bind(&network::on_alerts, this, session_index));
}
void on_alerts(int session_index)
{
std::vector<lt::alert*> alerts;
lt::session* ses = m_nodes[session_index].get();
// when shutting down, we may have destructed the session
if (ses == NULL) return;
bool term = false;
ses->pop_alerts(&alerts);
for (std::vector<lt::alert*>::iterator i = alerts.begin();
i != alerts.end(); ++i)
{
lt::alert* a = *i;
if (session_index == 0)
{
// only log the experience of node 0
lt::time_duration d = a->timestamp() - m_start_time;
boost::uint32_t millis = lt::duration_cast<lt::milliseconds>(d).count();
printf("%4d.%03d: [%02d] %s\n", millis / 1000, millis % 1000,
session_index, a->message().c_str());
}
if (m_config.on_alert(a, session_index))
term = true;
if (lt::alert_cast<lt::listen_succeeded_alert>(a))
{
// add a single DHT node to bootstrap from. Make everyone bootstrap
// from the node added 3 steps earlier (this makes the distribution a
// bit unrealisticly uniform).
int dht_bootstrap = (std::max)(0, session_index - 3);
char ep[50];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (dht_bootstrap + 1) >> 8, (dht_bootstrap + 1) & 0xff);
ses->add_dht_node(std::pair<std::string, int>(
ep, m_nodes[dht_bootstrap]->listen_port()));
}
}
if (term) terminate();
}
void run()
{
m_sim.run();
printf("simulation::run() returned\n");
}
void terminate()
{
printf("TERMINATING\n");
m_config.on_exit();
// terminate simulation
for (int i = 0; i < int(m_nodes.size()); ++i)
{
m_zombies.push_back(m_nodes[i]->abort());
m_nodes[i].reset();
}
m_shutting_down = true;
}
private:
network_setup_provider& m_config;
sim::default_config cfg;
sim::simulation m_sim{cfg};
asio::io_service m_ios;
lt::time_point m_start_time;
std::vector<boost::shared_ptr<lt::session> > m_nodes;
std::vector<boost::shared_ptr<sim::asio::io_service> > m_io_service;
std::vector<lt::session_proxy> m_zombies;
lt::deadline_timer m_timer;
bool m_shutting_down;
};
} // anonymous namespace
void setup_dht(int num_nodes, network_setup_provider& cfg)
struct dht_node final : lt::dht::udp_socket_interface
{
network s(num_nodes, cfg);
s.run();
enum flags_t
{
add_dead_nodes = 1
};
dht_node(sim::simulation& sim, lt::dht_settings const& sett, lt::counters& cnt
, int idx, std::uint32_t flags)
: m_io_service(sim, addr_from_int(idx))
#if LIBSIMULATOR_USE_MOVE
, m_socket(m_io_service)
, m_dht(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt)
#else
, m_socket(new asio::ip::udp::socket(m_io_service))
, m_dht(new lt::dht::node(this, sett, id_from_addr(m_io_service.get_ips().front())
, nullptr, cnt))
#endif
, m_add_dead_nodes(flags & add_dead_nodes)
{
error_code ec;
sock().open(asio::ip::udp::v4());
sock().bind(asio::ip::udp::endpoint(lt::address_v4::any(), 6881));
udp::socket::non_blocking_io ioc(true);
sock().io_control(ioc);
sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer))
, m_ep, boost::bind(&dht_node::on_read, this, _1, _2));
}
#if LIBSIMULATOR_USE_MOVE
// This type is not copyable, because the socket and the dht node is not
// copyable.
dht_node(dht_node const&) = delete;
dht_node& operator=(dht_node const&) = delete;
// it's also not movable, because it passes in its this-pointer to the async
// receive function, which pins this object down. However, std::vector cannot
// hold non-movable and non-copyable types. Instead, pretend that it's
// movable and make sure it never needs to be moved (for instance, by
// reserving space in the vector before emplacing any nodes).
dht_node(dht_node&& n) noexcept
: m_socket(std::move(n.m_socket))
, m_dht(this, n.m_dht.settings(), n.m_dht.nid()
, n.m_dht.observer(), n.m_dht.stats_counters())
{
assert(false && "dht_node is not movable");
throw std::runtime_error("dht_node is not movable");
}
dht_node& operator=(dht_node&&)
noexcept
{
assert(false && "dht_node is not movable");
throw std::runtime_error("dht_node is not movable");
}
#endif
void on_read(lt::error_code const& ec, std::size_t bytes_transferred)
{
if (ec) return;
using libtorrent::entry;
using libtorrent::bdecode;
int pos;
error_code err;
// since the simulation is single threaded, we can get away with just
// allocating a single of these
static bdecode_node msg;
int ret = bdecode(m_buffer, m_buffer + bytes_transferred, msg, err, &pos, 10, 500);
if (ret != 0) return;
if (msg.type() != bdecode_node::dict_t) return;
libtorrent::dht::msg m(msg, m_ep);
dht().incoming(m);
sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer))
, m_ep, boost::bind(&dht_node::on_read, this, _1, _2));
}
bool has_quota() override { return true; }
bool send_packet(entry& e, udp::endpoint const& addr, int flags) override
{
// since the simulaton is single threaded, we can get away with allocating
// just a single send buffer
static std::vector<char> send_buf;
send_buf.clear();
bencode(std::back_inserter(send_buf), e);
error_code ec;
sock().send_to(boost::asio::const_buffers_1(send_buf.data(), int(send_buf.size())), addr);
return true;
}
// the node_id and IP address of this node
std::pair<dht::node_id, lt::udp::endpoint> node_info() const
{
return std::make_pair(dht().nid(), lt::udp::endpoint(m_io_service.get_ips().front(), 6881));
}
void bootstrap(std::vector<std::pair<dht::node_id, lt::udp::endpoint>> const& nodes)
{
// we don't want to tell every node about every other node. That's way too
// expensive. instead. pick a random subset of nodes proportionate to the
// bucket it would fall into
dht::node_id id = dht().nid();
// the number of slots left per bucket
std::array<int, 160> nodes_per_bucket;
nodes_per_bucket.fill(8);
// when we use the larger routing table, the low buckets are larger
nodes_per_bucket[0] = 128;
nodes_per_bucket[1] = 64;
nodes_per_bucket[2] = 32;
nodes_per_bucket[3] = 16;
for (auto const& n : nodes)
{
if (n.first == id) continue;
int const bucket = 159 - dht::distance_exp(id, n.first);
/* printf("%s ^ %s = %s %d\n"
, to_hex(id.to_string()).c_str()
, to_hex(n.first.to_string()).c_str()
, to_hex(dht::distance(id, n.first).to_string()).c_str()
, bucket);
*/
// there are no more slots in this bucket, just move ont
if (nodes_per_bucket[bucket] == 0) continue;
--nodes_per_bucket[bucket];
bool const added = dht().m_table.node_seen(n.first, n.second, (lt::random() % 300) + 10);
TORRENT_ASSERT(added);
if (m_add_dead_nodes)
{
// generate a random node ID that would fall in `bucket`
dht::node_id const mask = dht::generate_prefix_mask(bucket + 1);
dht::node_id target = dht::generate_random_id() & ~mask;
target |= id & mask;
dht().m_table.node_seen(target, rand_udp_ep(), (lt::random() % 300) + 10);
}
}
/*
for (int i = 0; i < 40; ++i)
{
printf("%d ", nodes_per_bucket[i]);
}
printf("\n");
*/
//#error add invalid IPs as well, to simulate churn
}
void stop()
{
sock().close();
}
#if LIBSIMULATOR_USE_MOVE
lt::dht::node& dht() { return m_dht; }
lt::dht::node const& dht() const { return m_dht; }
#else
lt::dht::node& dht() { return *m_dht; }
lt::dht::node const& dht() const { return *m_dht; }
#endif
private:
asio::io_service m_io_service;
#if LIBSIMULATOR_USE_MOVE
lt::udp::socket m_socket;
lt::udp::socket& sock() { return m_socket; }
lt::dht::node m_dht;
#else
std::shared_ptr<lt::udp::socket> m_socket;
lt::udp::socket& sock() { return *m_socket; }
std::shared_ptr<lt::dht::node> m_dht;
#endif
lt::udp::endpoint m_ep;
bool m_add_dead_nodes;
// since the simulation is single-threaded, only one socket at a time will
// actually be receiving a packet, so we can get away with just allocating
// one receive buffer, shared by all nodes
static char m_buffer[1300];
};
char dht_node::m_buffer[1300];
dht_network::dht_network(sim::simulation& sim, int num_nodes)
{
m_sett.ignore_dark_internet = false;
m_sett.restrict_routing_ips = false;
m_nodes.reserve(num_nodes);
// TODO: how can we introduce churn among peers?
std::vector<std::pair<dht::node_id, lt::udp::endpoint>> all_nodes;
all_nodes.reserve(num_nodes);
for (int i = 0; i < num_nodes; ++i)
{
// node 0 is the one we log
m_nodes.emplace_back(sim, m_sett, m_cnt, i, 0/*, dht_node::add_dead_nodes*/);
all_nodes.push_back(m_nodes.back().node_info());
}
int cnt = 0;
for (auto& n : m_nodes)
{
n.bootstrap(all_nodes);
if (++cnt == 50)
{
// every now and then, shuffle all_nodes to make the
// routing tables more randomly distributed
std::random_shuffle(all_nodes.begin(), all_nodes.end());
cnt = 0;
}
}
}
dht_network::~dht_network() {}
void print_routing_table(std::vector<lt::dht_routing_bucket> const& rt)
{
int bucket = 0;
for (std::vector<lt::dht_routing_bucket>::const_iterator i = rt.begin()
, end(rt.end()); i != end; ++i, ++bucket)
{
char const* progress_bar =
"################################"
"################################"
"################################"
"################################";
char const* short_progress_bar = "--------";
printf("%3d [%3d, %d] %s%s\n"
, bucket, i->num_nodes, i->num_replacements
, progress_bar + (128 - i->num_nodes)
, short_progress_bar + (8 - (std::min)(8, i->num_replacements)));
}
}
std::vector<lt::udp::endpoint> dht_network::router_nodes() const
{
int idx = 0;
std::vector<lt::udp::endpoint> ret;
ret.reserve(8);
for (auto const& n : m_nodes)
{
if (idx >= 8) break;
++idx;
ret.push_back(n.node_info().second);
}
return ret;
}
void dht_network::stop()
{
for (auto& n : m_nodes) n.stop();
}

View File

@ -31,32 +31,44 @@ POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/io_service.hpp"
#ifndef TORRENT_SETUP_DHT_HPP_INCLUDED
#define TORRENT_SETUP_DHT_HPP_INCLUDED
namespace libtorrent {
class alert;
struct settings_pack;
struct add_torrent_params;
class session;
#include <vector>
#include "libtorrent/session_settings.hpp" // for dht_settings
#include "libtorrent/performance_counters.hpp" // for counters
namespace lt = libtorrent;
namespace sim
{
struct simulation;
}
struct network_setup_provider
namespace libtorrent
{
// can be used to check expected end conditions
virtual void on_exit() {}
struct dht_routing_bucket;
}
// called for every alert. if the simulation is done, return true
virtual bool on_alert(libtorrent::alert const* alert, int session_idx)
{ return false; }
struct dht_node;
virtual bool on_tick() = 0;
void print_routing_table(std::vector<lt::dht_routing_bucket> const& rt);
// called for every session that's added
virtual libtorrent::settings_pack add_session(int idx) = 0;
struct dht_network
{
dht_network(sim::simulation& sim, int num_nodes);
~dht_network();
// called for a session right after it has been created
virtual void setup_session(libtorrent::session& ses, int idx) = 0;
void stop();
std::vector<lt::udp::endpoint> router_nodes() const;
private:
// used for all the nodes in the network
lt::counters m_cnt;
lt::dht_settings m_sett;
std::vector<dht_node> m_nodes;
};
void setup_dht(int num_nodes, network_setup_provider& config);
#endif

View File

@ -40,85 +40,89 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/session_settings.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/socket_io.hpp"
#include "setup_swarm.hpp"
#include "setup_dht.hpp"
namespace lt = libtorrent;
struct network_config : network_setup_provider
TORRENT_TEST(dht_bootstrap)
{
network_config()
: m_start_time(lt::clock_type::now())
, m_ticks(0)
{}
sim::default_config cfg;
sim::simulation sim{cfg};
virtual void on_exit() override final {}
dht_network dht(sim, 1000);
// called for every alert. if the simulation is done, return true
virtual bool on_alert(lt::alert const* alert
, int session_idx) override final
{
if (lt::dht_stats_alert const* p = lt::alert_cast<lt::dht_stats_alert>(alert))
int routing_table_depth = 0;
int num_nodes = 0;
setup_swarm(1, swarm_test::download, sim
// add session
, [](lt::settings_pack& pack) {
}
// add torrent
, [](lt::add_torrent_params& params) {}
// on alert
, [&](lt::alert const* a, lt::session& ses)
{
int bucket = 0;
for (std::vector<lt::dht_routing_bucket>::const_iterator i = p->routing_table.begin()
, end(p->routing_table.end()); i != end; ++i, ++bucket)
if (lt::dht_stats_alert const* p = lt::alert_cast<lt::dht_stats_alert>(a))
{
char const* progress_bar =
"################################"
"################################"
"################################"
"################################";
char const* short_progress_bar = "--------";
printf("%3d [%3d, %d] %s%s\n"
, bucket, i->num_nodes, i->num_replacements
, progress_bar + (128 - i->num_nodes)
, short_progress_bar + (8 - (std::min)(8, i->num_replacements)));
routing_table_depth = p->routing_table.size();
int c = 0;
for (auto const& b : p->routing_table)
{
c += b.num_nodes;
c += b.num_replacements;
}
num_nodes = c;
print_routing_table(p->routing_table);
}
}
// terminate?
, [&](int ticks, lt::session& ses) -> bool
{
if (ticks == 0)
{
lt::dht_settings sett;
sett.ignore_dark_internet = false;
ses.set_dht_settings(sett);
return false;
}
// bootstrap off of 8 of the nodes
lt::entry state;
lt::entry::list_type& nodes = state["dht state"]["nodes"].list();
for (auto const& n : dht.router_nodes())
{
std::string node;
std::back_insert_iterator<std::string> out(node);
lt::detail::write_endpoint(n, out);
nodes.push_back(lt::entry(node));
}
bool on_tick() override final
{
m_first_session->post_dht_stats();
if (++m_ticks > 80) return true;
return false;
}
std::vector<char> buf;
lt::bencode(std::back_inserter(buf), state);
lt::bdecode_node e;
lt::error_code ec;
lt::bdecode(&buf[0], &buf[0] + buf.size(), e, ec);
// called for every session that's added
virtual lt::settings_pack add_session(int idx) override final
{
lt::settings_pack pack = settings();
ses.load_state(e);
lt::settings_pack pack;
pack.set_bool(lt::settings_pack::enable_dht, true);
ses.apply_settings(pack);
}
if (ticks > 2)
{
printf("depth: %d nodes: %d\n", routing_table_depth, num_nodes);
TEST_CHECK(routing_table_depth >= 8);
TEST_CHECK(num_nodes >= 74);
dht.stop();
return true;
}
ses.post_dht_stats();
return false;
});
pack.set_bool(lt::settings_pack::enable_dht, true);
sim.run();
return pack;
}
virtual void setup_session(lt::session& ses, int idx) override final
{
if (idx == 0) m_first_session = &ses;
// we have to do this since all of our simulated IP addresses are close to
// each other
lt::dht_settings sett;
sett.restrict_routing_ips = false;
sett.restrict_search_ips = false;
sett.privacy_lookups = false;
sett.extended_routing_table = false;
ses.set_dht_settings(sett);
}
private:
lt::time_point m_start_time;
boost::shared_ptr<lt::torrent_info> m_ti;
lt::session* m_first_session;
int m_ticks;
};
TORRENT_TEST(dht)
{
network_config cfg;
setup_dht(10, cfg);
}

View File

@ -559,7 +559,7 @@ bool routing_table::add_node(node_entry e)
routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK;
// INVARIANT_CHECK;
#endif
// if we already have this (IP,port), don't do anything