diff --git a/include/libtorrent/kademlia/dht_storage.hpp b/include/libtorrent/kademlia/dht_storage.hpp index 6ec1ed4c2..cec7448b7 100644 --- a/include/libtorrent/kademlia/dht_storage.hpp +++ b/include/libtorrent/kademlia/dht_storage.hpp @@ -206,6 +206,21 @@ namespace dht , span salt , address const& addr) = 0; + // This function retrieves a sample of infohashes. + // + // For implementers: + // The infohashes should be stored in item["samples"] (N × 20 bytes). + // The following keys should also be filled: + // item["interval"] - the subset refresh interval in seconds. + // item["num"] - number of infohashes in storage. + // + // Internally, this function is allowed to lazily evaluate, cache + // and modify the actual sample to put in ``item``. + // + // Returns the number of infohashes in the sample. + // + virtual int get_infohashes_sample(entry& item) = 0; + // This function is called periodically (non-constant frequency). // // For implementers: diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index 48e8b3ede..468d334ab 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -189,6 +189,17 @@ namespace libtorrent // If the incoming requests causes to many bytes to be sent in responses, // incoming requests will be dropped until the quota has been replenished. int upload_rate_limit; + + // The infohashes sample recomputation interval (in seconds). + // The node will precompute a subset of the tracked infohashes and return + // that instead of calculating it upon each request. The permissible range + // is between 0 and 21600 seconds (inclusive); the default storage clamps values outside it and recomputes on every request when this is 0. + int sample_infohashes_interval = 21600; + + // The maximum number of elements in the sampled subset of infohashes. 
+ // If this number is too big, expect the DHT storage implementations + // to clamp it in order to allow UDP packets to go through (the default storage caps it at 20). + int max_infohashes_sample_count = 20; }; diff --git a/simulation/test_dht_storage.cpp b/simulation/test_dht_storage.cpp index eadbf1d81..6e91ffd0c 100644 --- a/simulation/test_dht_storage.cpp +++ b/simulation/test_dht_storage.cpp @@ -162,4 +162,54 @@ TORRENT_TEST(dht_storage_counters) test_expiration(hours(1), s, c); // test expiration of everything after 3 hours } +TORRENT_TEST(dht_storage_infohashes_sample) +{ + dht_settings sett = test_settings(); + sett.max_torrents = 5; + sett.sample_infohashes_interval = 30; + sett.max_infohashes_sample_count = 2; + std::unique_ptr s(create_default_dht_storage(sett)); + + TEST_CHECK(s.get() != nullptr); + + sha1_hash const n1 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401"); + sha1_hash const n2 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee402"); + sha1_hash const n3 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee403"); + sha1_hash const n4 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee404"); + + tcp::endpoint const p1 = ep("124.31.75.21", 1); + tcp::endpoint const p2 = ep("124.31.75.22", 1); + tcp::endpoint const p3 = ep("124.31.75.23", 1); + tcp::endpoint const p4 = ep("124.31.75.24", 1); + + s->announce_peer(n1, p1, "torrent_name1", false); + s->announce_peer(n2, p2, "torrent_name2", false); + s->announce_peer(n3, p3, "torrent_name3", false); + s->announce_peer(n4, p4, "torrent_name4", false); + + entry item; + int r = s->get_infohashes_sample(item); + TEST_EQUAL(r, 2); + + default_config cfg; + simulation sim(cfg); + sim::asio::io_service ios(sim, addr("10.0.0.1")); + + sim::asio::high_resolution_timer timer(ios); + timer.expires_from_now(hours(1)); // expiration of torrents + timer.async_wait([&s](boost::system::error_code const& ec) + { + libtorrent::aux::update_time_now(); + // tick here to trigger the torrents expiration + s->tick(); + + 
entry item; + int r = 
s->get_infohashes_sample(item); + TEST_EQUAL(r, 0); + }); + + boost::system::error_code ec; + sim.run(ec); +} + #endif // TORRENT_DISABLE_DHT diff --git a/src/kademlia/dht_storage.cpp b/src/kademlia/dht_storage.cpp index 31cd71b38..92dd48d01 100644 --- a/src/kademlia/dht_storage.cpp +++ b/src/kademlia/dht_storage.cpp @@ -171,6 +171,23 @@ namespace , immutable_item_comparator(node_ids)); } + constexpr int sample_infohashes_interval_max = 21600; + constexpr int infohashes_sample_count_max = 20; + + struct infohashes_sample + { + std::vector samples; + time_point created = min_time(); + + int count() const { return int(samples.size()); } + }; + + int clamp(int v, int lo, int hi) + { + TORRENT_ASSERT(lo <= hi); + return (v < lo) ? lo : (hi < v) ? hi : v; + } + class dht_default_storage final : public dht_storage_interface, boost::noncopyable { public: @@ -453,6 +470,21 @@ namespace touch_item(i->second, addr); } + int get_infohashes_sample(entry& item) override + { + item["interval"] = clamp(m_settings.sample_infohashes_interval + , 0, sample_infohashes_interval_max); + item["num"] = int(m_map.size()); + + refresh_infohashes_sample(); + + std::vector const& samples = m_infohashes_sample.samples; + item["samples"] = span( + reinterpret_cast(samples.data()), samples.size() * 20); + + return m_infohashes_sample.count(); + } + void tick() override { // look through all peers and see if any have timed out @@ -517,6 +549,8 @@ namespace std::map m_immutable_table; std::map m_mutable_table; + infohashes_sample m_infohashes_sample; + void purge_peers(std::vector& peers) { auto now = aux::time_now(); @@ -532,6 +566,48 @@ namespace if (!peers.empty() && peers.capacity() / peers.size() >= 4u) peers.shrink_to_fit(); } + + void refresh_infohashes_sample() + { + time_point const now = aux::time_now(); + int const interval = clamp(m_settings.sample_infohashes_interval + , 0, sample_infohashes_interval_max); + + int const max_count = clamp(m_settings.max_infohashes_sample_count + , 0, 
infohashes_sample_count_max); + int const count = std::min(max_count, int(m_map.size())); + + if (interval > 0 + && m_infohashes_sample.created + seconds(interval) > now + && m_infohashes_sample.count() >= max_count) + return; + + std::vector& samples = m_infohashes_sample.samples; + samples.clear(); + samples.reserve(count); + + int to_pick = count; + int candidates = int(m_map.size()); + + for (auto const& t : m_map) + { + if (to_pick == 0) + break; + + TORRENT_ASSERT(candidates >= to_pick); + + // pick this key with probability of roughly to_pick / candidates, + // i.e. (keys left to pick) / (keys left to visit) + if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick)) + continue; + + samples.push_back(t.first); + --to_pick; + } + + TORRENT_ASSERT(int(samples.size()) == count); + m_infohashes_sample.created = now; + } }; } diff --git a/test/test_dht_storage.cpp b/test/test_dht_storage.cpp index c928891bf..526563c07 100644 --- a/test/test_dht_storage.cpp +++ b/test/test_dht_storage.cpp @@ -437,4 +437,76 @@ TORRENT_TEST(update_node_ids) TEST_CHECK(r); } +TORRENT_TEST(infohashes_sample) +{ + dht_settings sett = test_settings(); + sett.max_torrents = 5; + sett.sample_infohashes_interval = 10; + sett.max_infohashes_sample_count = 2; + std::unique_ptr s(create_default_dht_storage(sett)); + + tcp::endpoint const p1 = ep("124.31.75.21", 1); + tcp::endpoint const p2 = ep("124.31.75.22", 1); + tcp::endpoint const p3 = ep("124.31.75.23", 1); + tcp::endpoint const p4 = ep("124.31.75.24", 1); + + s->announce_peer(n1, p1, "torrent_name1", false); + s->announce_peer(n2, p2, "torrent_name2", false); + s->announce_peer(n3, p3, "torrent_name3", false); + s->announce_peer(n4, p4, "torrent_name4", false); + + entry item; + int r = s->get_infohashes_sample(item); + TEST_EQUAL(r, 2); + TEST_EQUAL(item["interval"].integer(), 10) + TEST_EQUAL(item["num"].integer(), 4); + TEST_EQUAL(item["samples"].string().size(), 2 * 20); + + // raise the cap above the number of announced torrents to get all of them + sett.max_infohashes_sample_count = 5; + + item = entry(); + r = s->get_infohashes_sample(item); + 
TEST_EQUAL(r, 4); + TEST_EQUAL(item["interval"].integer(), 10) + TEST_EQUAL(item["num"].integer(), 4); + TEST_EQUAL(item["samples"].string().size(), 4 * 20); + + std::string const samples = item["samples"].to_string(); + TEST_CHECK(samples.find(aux::to_hex(n1)) != std::string::npos); + TEST_CHECK(samples.find(aux::to_hex(n2)) != std::string::npos); + TEST_CHECK(samples.find(aux::to_hex(n3)) != std::string::npos); + TEST_CHECK(samples.find(aux::to_hex(n4)) != std::string::npos); +} + +TORRENT_TEST(infohashes_sample_dist) +{ + dht_settings sett = test_settings(); + sett.max_torrents = 1000; + sett.sample_infohashes_interval = 0; // need this to force refresh every call + sett.max_infohashes_sample_count = 1; + std::unique_ptr s(create_default_dht_storage(sett)); + + for (int i = 0; i < 1000; ++i) + { + s->announce_peer(rand_hash(), tcp::endpoint(rand_v4(), std::uint16_t(i)) + , "torrent_name", false); + } + + std::set infohash_set; + for (int i = 0; i < 1000; ++i) + { + entry item; + int r = s->get_infohashes_sample(item); + TEST_EQUAL(r, 1); + TEST_EQUAL(item["interval"].integer(), 0) + TEST_EQUAL(item["num"].integer(), 1000); + TEST_EQUAL(item["samples"].string().size(), 20); + + infohash_set.insert(sha1_hash(item["samples"].string())); + } + std::printf("infohashes set size: %d\n", int(infohash_set.size())); + TEST_CHECK(infohash_set.size() > 500); +} + #endif