diff --git a/simulation/libsimulator b/simulation/libsimulator index aaa1e3006..277389e87 160000 --- a/simulation/libsimulator +++ b/simulation/libsimulator @@ -1 +1 @@ -Subproject commit aaa1e30060301d6382a0898890a9b965b3a9e1bd +Subproject commit 277389e87cfddd1747da7a8c7287e52ef592d345 diff --git a/simulation/setup_dht.cpp b/simulation/setup_dht.cpp index a5f4835ad..bf80ed874 100644 --- a/simulation/setup_dht.cpp +++ b/simulation/setup_dht.cpp @@ -30,170 +30,308 @@ POSSIBILITY OF SUCH DAMAGE. */ -#include "libtorrent/session.hpp" #include "libtorrent/session_settings.hpp" #include "libtorrent/io_service.hpp" #include "libtorrent/deadline_timer.hpp" #include "libtorrent/address.hpp" #include "libtorrent/time.hpp" -#include "libtorrent/settings_pack.hpp" -#include "libtorrent/ip_filter.hpp" -#include "libtorrent/alert_types.hpp" +#include "libtorrent/kademlia/node.hpp" +#include "libtorrent/kademlia/dht_observer.hpp" +#include "setup_transfer.hpp" #include +#include // for unique_ptr +#include +#include "libtorrent/socket_io.hpp" // print_endpoint +#include "libtorrent/random.hpp" +#include "libtorrent/crc32c.hpp" +#include "libtorrent/alert_types.hpp" // for dht_routing_bucket #include "setup_dht.hpp" namespace lt = libtorrent; using namespace sim; +using namespace libtorrent; namespace { -struct network -{ - network(int num_nodes, network_setup_provider& config) - : m_config(config) - , m_ios(m_sim, asio::ip::address_v4::from_string("10.0.0.1")) - , m_start_time(lt::clock_type::now()) - , m_timer(m_ios) - , m_shutting_down(false) + lt::time_point start_time; + + // this is the IP address assigned to node 'idx' + asio::ip::address addr_from_int(int idx) { - - for (int i = 0; i < num_nodes; ++i) - { - // create a new io_service - char ep[30]; - snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff); - m_io_service.push_back(boost::make_shared( - boost::ref(m_sim), asio::ip::address_v4::from_string(ep))); - - lt::settings_pack pack = m_config.add_session(i); - - boost::shared_ptr ses = boost::make_shared(pack - , boost::ref(*m_io_service.back())); - m_nodes.push_back(ses); - - m_config.setup_session(*ses, i); - - ses->set_alert_notify(boost::bind(&network::on_alert_notify, this, i)); - } - - m_timer.expires_from_now(lt::seconds(1)); - m_timer.async_wait(boost::bind(&network::on_tick, this, _1)); - - sim::dump_network_graph(m_sim, "../dht-sim.dot"); + return asio::ip::address_v4(lt::random()); } - void on_tick(lt::error_code const& ec) + // this is the node ID assigned to node 'idx' + dht::node_id id_from_addr(lt::address const& addr) { - if (ec || m_shutting_down) return; - - if (m_config.on_tick()) - { - terminate(); - return; - } - - m_timer.expires_from_now(lt::seconds(1)); - m_timer.async_wait(boost::bind(&network::on_tick, this, _1)); + return dht::generate_id(addr); } - void on_alert_notify(int session_index) - { - // this function is called inside libtorrent and we cannot perform work - // immediately in it. We have to notify the outside to pull all the alerts - m_io_service[session_index]->post(boost::bind(&network::on_alerts, this, session_index)); - } - - void on_alerts(int session_index) - { - std::vector alerts; - - lt::session* ses = m_nodes[session_index].get(); - - // when shutting down, we may have destructed the session - if (ses == NULL) return; - - bool term = false; - ses->pop_alerts(&alerts); - - for (std::vector::iterator i = alerts.begin(); - i != alerts.end(); ++i) - { - lt::alert* a = *i; - - if (session_index == 0) - { - // only log the experience of node 0 - lt::time_duration d = a->timestamp() - m_start_time; - boost::uint32_t millis = lt::duration_cast(d).count(); - printf("%4d.%03d: [%02d] %s\n", millis / 1000, millis % 1000, - session_index, a->message().c_str()); - } - - if (m_config.on_alert(a, session_index)) - term = true; - - if (lt::alert_cast(a)) - { - // add a single DHT node to bootstrap from. Make everyone bootstrap - // from the node added 3 steps earlier (this makes the distribution a - // bit unrealisticly uniform). - int dht_bootstrap = (std::max)(0, session_index - 3); - - char ep[50]; - snprintf(ep, sizeof(ep), "50.0.%d.%d", (dht_bootstrap + 1) >> 8, (dht_bootstrap + 1) & 0xff); - ses->add_dht_node(std::pair( - ep, m_nodes[dht_bootstrap]->listen_port())); - } - } - - if (term) terminate(); - } - - void run() - { - m_sim.run(); - printf("simulation::run() returned\n"); - } - - void terminate() - { - printf("TERMINATING\n"); - - m_config.on_exit(); - - // terminate simulation - for (int i = 0; i < int(m_nodes.size()); ++i) - { - m_zombies.push_back(m_nodes[i]->abort()); - m_nodes[i].reset(); - } - - m_shutting_down = true; - } - -private: - - network_setup_provider& m_config; - - sim::default_config cfg; - sim::simulation m_sim{cfg}; - asio::io_service m_ios; - lt::time_point m_start_time; - - std::vector > m_nodes; - std::vector > m_io_service; - std::vector m_zombies; - lt::deadline_timer m_timer; - bool m_shutting_down; -}; - } // anonymous namespace -void setup_dht(int num_nodes, network_setup_provider& cfg) +struct dht_node final : lt::dht::udp_socket_interface { - network s(num_nodes, cfg); - s.run(); + enum flags_t + { + add_dead_nodes = 1 + }; + + dht_node(sim::simulation& sim, lt::dht_settings const& sett, lt::counters& cnt + , int idx, std::uint32_t flags) + : m_io_service(sim, addr_from_int(idx)) +#if LIBSIMULATOR_USE_MOVE + , m_socket(m_io_service) + , m_dht(this, sett, id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt) +#else + , m_socket(new asio::ip::udp::socket(m_io_service)) + , m_dht(new lt::dht::node(this, sett, id_from_addr(m_io_service.get_ips().front()) + , nullptr, cnt)) +#endif + , m_add_dead_nodes(flags & add_dead_nodes) + { + error_code ec; + sock().open(asio::ip::udp::v4()); + sock().bind(asio::ip::udp::endpoint(lt::address_v4::any(), 6881)); + + udp::socket::non_blocking_io ioc(true); + sock().io_control(ioc); + + sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer)) + , m_ep, boost::bind(&dht_node::on_read, this, _1, _2)); + } + +#if LIBSIMULATOR_USE_MOVE + // This type is not copyable, because the socket and the dht node is not + // copyable. + dht_node(dht_node const&) = delete; + dht_node& operator=(dht_node const&) = delete; + + // it's also not movable, because it passes in its this-pointer to the async + // receive function, which pins this object down. However, std::vector cannot + // hold non-movable and non-copyable types. Instead, pretend that it's + // movable and make sure it never needs to be moved (for instance, by + // reserving space in the vector before emplacing any nodes). + dht_node(dht_node&& n) noexcept + : m_socket(std::move(n.m_socket)) + , m_dht(this, n.m_dht.settings(), n.m_dht.nid() + , n.m_dht.observer(), n.m_dht.stats_counters()) + { + assert(false && "dht_node is not movable"); + throw std::runtime_error("dht_node is not movable"); + } + dht_node& operator=(dht_node&&) + noexcept + { + assert(false && "dht_node is not movable"); + throw std::runtime_error("dht_node is not movable"); + } +#endif + + void on_read(lt::error_code const& ec, std::size_t bytes_transferred) + { + if (ec) return; + + using libtorrent::entry; + using libtorrent::bdecode; + + int pos; + error_code err; + + // since the simulation is single threaded, we can get away with just + // allocating a single of these + static bdecode_node msg; + int ret = bdecode(m_buffer, m_buffer + bytes_transferred, msg, err, &pos, 10, 500); + if (ret != 0) return; + + if (msg.type() != bdecode_node::dict_t) return; + + libtorrent::dht::msg m(msg, m_ep); + dht().incoming(m); + + sock().async_receive_from(asio::mutable_buffers_1(m_buffer, sizeof(m_buffer)) + , m_ep, boost::bind(&dht_node::on_read, this, _1, _2)); + } + + bool has_quota() override { return true; } + bool send_packet(entry& e, udp::endpoint const& addr, int flags) override + { + // since the simulaton is single threaded, we can get away with allocating + // just a single send buffer + static std::vector send_buf; + + send_buf.clear(); + bencode(std::back_inserter(send_buf), e); + error_code ec; + + sock().send_to(boost::asio::const_buffers_1(send_buf.data(), int(send_buf.size())), addr); + return true; + } + + // the node_id and IP address of this node + std::pair node_info() const + { + return std::make_pair(dht().nid(), lt::udp::endpoint(m_io_service.get_ips().front(), 6881)); + } + + void bootstrap(std::vector> const& nodes) + { + // we don't want to tell every node about every other node. That's way too + // expensive. instead. pick a random subset of nodes proportionate to the + // bucket it would fall into + + dht::node_id id = dht().nid(); + + // the number of slots left per bucket + std::array nodes_per_bucket; + nodes_per_bucket.fill(8); + + // when we use the larger routing table, the low buckets are larger + nodes_per_bucket[0] = 128; + nodes_per_bucket[1] = 64; + nodes_per_bucket[2] = 32; + nodes_per_bucket[3] = 16; + + for (auto const& n : nodes) + { + if (n.first == id) continue; + int const bucket = 159 - dht::distance_exp(id, n.first); + +/* printf("%s ^ %s = %s %d\n" + , to_hex(id.to_string()).c_str() + , to_hex(n.first.to_string()).c_str() + , to_hex(dht::distance(id, n.first).to_string()).c_str() + , bucket); +*/ + // there are no more slots in this bucket, just move ont + if (nodes_per_bucket[bucket] == 0) continue; + --nodes_per_bucket[bucket]; + bool const added = dht().m_table.node_seen(n.first, n.second, (lt::random() % 300) + 10); + TORRENT_ASSERT(added); + if (m_add_dead_nodes) + { + // generate a random node ID that would fall in `bucket` + dht::node_id const mask = dht::generate_prefix_mask(bucket + 1); + dht::node_id target = dht::generate_random_id() & ~mask; + target |= id & mask; + dht().m_table.node_seen(target, rand_udp_ep(), (lt::random() % 300) + 10); + } + } +/* + for (int i = 0; i < 40; ++i) + { + printf("%d ", nodes_per_bucket[i]); + } + printf("\n"); +*/ +//#error add invalid IPs as well, to simulate churn + } + + void stop() + { + sock().close(); + } + +#if LIBSIMULATOR_USE_MOVE + lt::dht::node& dht() { return m_dht; } + lt::dht::node const& dht() const { return m_dht; } +#else + lt::dht::node& dht() { return *m_dht; } + lt::dht::node const& dht() const { return *m_dht; } +#endif + +private: + asio::io_service m_io_service; +#if LIBSIMULATOR_USE_MOVE + lt::udp::socket m_socket; + lt::udp::socket& sock() { return m_socket; } + lt::dht::node m_dht; +#else + std::shared_ptr m_socket; + lt::udp::socket& sock() { return *m_socket; } + std::shared_ptr m_dht; +#endif + lt::udp::endpoint m_ep; + bool m_add_dead_nodes; + // since the simulation is single-threaded, only one socket at a time will + // actually be receiving a packet, so we can get away with just allocating + // one receive buffer, shared by all nodes + static char m_buffer[1300]; +}; + +char dht_node::m_buffer[1300]; + +dht_network::dht_network(sim::simulation& sim, int num_nodes) +{ + m_sett.ignore_dark_internet = false; + m_sett.restrict_routing_ips = false; + m_nodes.reserve(num_nodes); + +// TODO: how can we introduce churn among peers? + + std::vector> all_nodes; + all_nodes.reserve(num_nodes); + + for (int i = 0; i < num_nodes; ++i) + { + // node 0 is the one we log + m_nodes.emplace_back(sim, m_sett, m_cnt, i, 0/*, dht_node::add_dead_nodes*/); + all_nodes.push_back(m_nodes.back().node_info()); + } + + int cnt = 0; + for (auto& n : m_nodes) + { + n.bootstrap(all_nodes); + if (++cnt == 50) + { + // every now and then, shuffle all_nodes to make the + // routing tables more randomly distributed + std::random_shuffle(all_nodes.begin(), all_nodes.end()); + cnt = 0; + } + } +} + +dht_network::~dht_network() {} + +void print_routing_table(std::vector const& rt) +{ + int bucket = 0; + for (std::vector::const_iterator i = rt.begin() + , end(rt.end()); i != end; ++i, ++bucket) + { + char const* progress_bar = + "################################" + "################################" + "################################" + "################################"; + char const* short_progress_bar = "--------"; + printf("%3d [%3d, %d] %s%s\n" + , bucket, i->num_nodes, i->num_replacements + , progress_bar + (128 - i->num_nodes) + , short_progress_bar + (8 - (std::min)(8, i->num_replacements))); + } +} + +std::vector dht_network::router_nodes() const +{ + int idx = 0; + std::vector ret; + ret.reserve(8); + for (auto const& n : m_nodes) + { + if (idx >= 8) break; + ++idx; + ret.push_back(n.node_info().second); + } + return ret; +} + +void dht_network::stop() +{ + for (auto& n : m_nodes) n.stop(); } diff --git a/simulation/setup_dht.hpp b/simulation/setup_dht.hpp index 943bae7be..dcf428c9b 100644 --- a/simulation/setup_dht.hpp +++ b/simulation/setup_dht.hpp @@ -31,32 +31,44 @@ POSSIBILITY OF SUCH DAMAGE. */ -#include "libtorrent/io_service.hpp" +#ifndef TORRENT_SETUP_DHT_HPP_INCLUDED +#define TORRENT_SETUP_DHT_HPP_INCLUDED -namespace libtorrent { - class alert; - struct settings_pack; - struct add_torrent_params; - class session; +#include +#include "libtorrent/session_settings.hpp" // for dht_settings +#include "libtorrent/performance_counters.hpp" // for counters + +namespace lt = libtorrent; + +namespace sim +{ + struct simulation; } -struct network_setup_provider +namespace libtorrent { - // can be used to check expected end conditions - virtual void on_exit() {} + struct dht_routing_bucket; +} - // called for every alert. if the simulation is done, return true - virtual bool on_alert(libtorrent::alert const* alert, int session_idx) - { return false; } +struct dht_node; - virtual bool on_tick() = 0; +void print_routing_table(std::vector const& rt); - // called for every session that's added - virtual libtorrent::settings_pack add_session(int idx) = 0; +struct dht_network +{ + dht_network(sim::simulation& sim, int num_nodes); + ~dht_network(); - // called for a session right after it has been created - virtual void setup_session(libtorrent::session& ses, int idx) = 0; + void stop(); + std::vector router_nodes() const; + +private: + + // used for all the nodes in the network + lt::counters m_cnt; + lt::dht_settings m_sett; + std::vector m_nodes; }; -void setup_dht(int num_nodes, network_setup_provider& config); +#endif diff --git a/simulation/test_dht.cpp b/simulation/test_dht.cpp index 1278facb8..a2c5e3560 100644 --- a/simulation/test_dht.cpp +++ b/simulation/test_dht.cpp @@ -40,85 +40,89 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/session_settings.hpp" #include "libtorrent/session.hpp" #include "libtorrent/alert_types.hpp" +#include "libtorrent/deadline_timer.hpp" +#include "libtorrent/socket_io.hpp" +#include "setup_swarm.hpp" +#include "setup_dht.hpp" namespace lt = libtorrent; -struct network_config : network_setup_provider +TORRENT_TEST(dht_bootstrap) { - network_config() - : m_start_time(lt::clock_type::now()) - , m_ticks(0) - {} + sim::default_config cfg; + sim::simulation sim{cfg}; - virtual void on_exit() override final {} + dht_network dht(sim, 1000); - // called for every alert. if the simulation is done, return true - virtual bool on_alert(lt::alert const* alert - , int session_idx) override final - { - if (lt::dht_stats_alert const* p = lt::alert_cast(alert)) + int routing_table_depth = 0; + int num_nodes = 0; + + setup_swarm(1, swarm_test::download, sim + // add session + , [](lt::settings_pack& pack) { + } + // add torrent + , [](lt::add_torrent_params& params) {} + // on alert + , [&](lt::alert const* a, lt::session& ses) { - int bucket = 0; - for (std::vector::const_iterator i = p->routing_table.begin() - , end(p->routing_table.end()); i != end; ++i, ++bucket) + if (lt::dht_stats_alert const* p = lt::alert_cast(a)) { - char const* progress_bar = - "################################" - "################################" - "################################" - "################################"; - char const* short_progress_bar = "--------"; - printf("%3d [%3d, %d] %s%s\n" - , bucket, i->num_nodes, i->num_replacements - , progress_bar + (128 - i->num_nodes) - , short_progress_bar + (8 - (std::min)(8, i->num_replacements))); + routing_table_depth = p->routing_table.size(); + int c = 0; + for (auto const& b : p->routing_table) + { + c += b.num_nodes; + c += b.num_replacements; + } + num_nodes = c; + print_routing_table(p->routing_table); } } + // terminate? + , [&](int ticks, lt::session& ses) -> bool + { + if (ticks == 0) + { + lt::dht_settings sett; + sett.ignore_dark_internet = false; + ses.set_dht_settings(sett); - return false; - } + // bootstrap off of 8 of the nodes + lt::entry state; + lt::entry::list_type& nodes = state["dht state"]["nodes"].list(); + for (auto const& n : dht.router_nodes()) + { + std::string node; + std::back_insert_iterator out(node); + lt::detail::write_endpoint(n, out); + nodes.push_back(lt::entry(node)); + } - bool on_tick() override final - { - m_first_session->post_dht_stats(); - if (++m_ticks > 80) return true; - return false; - } + std::vector buf; + lt::bencode(std::back_inserter(buf), state); + lt::bdecode_node e; + lt::error_code ec; + lt::bdecode(&buf[0], &buf[0] + buf.size(), e, ec); - // called for every session that's added - virtual lt::settings_pack add_session(int idx) override final - { - lt::settings_pack pack = settings(); + ses.load_state(e); + lt::settings_pack pack; + pack.set_bool(lt::settings_pack::enable_dht, true); + ses.apply_settings(pack); + } + if (ticks > 2) + { + printf("depth: %d nodes: %d\n", routing_table_depth, num_nodes); + TEST_CHECK(routing_table_depth >= 8); + TEST_CHECK(num_nodes >= 74); + dht.stop(); + return true; + } + ses.post_dht_stats(); + return false; + }); - pack.set_bool(lt::settings_pack::enable_dht, true); + sim.run(); - return pack; - } - - virtual void setup_session(lt::session& ses, int idx) override final - { - if (idx == 0) m_first_session = &ses; - - // we have to do this since all of our simulated IP addresses are close to - // each other - lt::dht_settings sett; - sett.restrict_routing_ips = false; - sett.restrict_search_ips = false; - sett.privacy_lookups = false; - sett.extended_routing_table = false; - ses.set_dht_settings(sett); - } - -private: - lt::time_point m_start_time; - boost::shared_ptr m_ti; - lt::session* m_first_session; - int m_ticks; -}; - -TORRENT_TEST(dht) -{ - network_config cfg; - setup_dht(10, cfg); } diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index c14940b62..d1c958806 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -559,7 +559,7 @@ bool routing_table::add_node(node_entry e) routing_table::add_node_status_t routing_table::add_node_impl(node_entry e) { #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS - INVARIANT_CHECK; +// INVARIANT_CHECK; #endif // if we already have this (IP,port), don't do anything