added support for infohashes sample in dht storage (#1586)

added support for infohashes sample in dht storage
This commit is contained in:
Alden Torres 2017-01-28 20:24:53 -05:00 committed by Arvid Norberg
parent 17ee28d176
commit deecdb6cfa
5 changed files with 224 additions and 0 deletions

View File

@ -206,6 +206,21 @@ namespace dht
, span<char const> salt , span<char const> salt
, address const& addr) = 0; , address const& addr) = 0;
// This function retrieves a sample infohashes
//
// For implementers:
// The infohashes should be stored in ["samples"] (N × 20 bytes).
// the following keys should be filled
// item["interval"] - the subset refresh interval in seconds.
// item["num"] - number of infohashes in storage.
//
// Internally, this function is allowed to lazily evaluate, cache
// and modify the actual sample to put in ``item``
//
// returns the number of infohashes in the sample.
//
virtual int get_infohashes_sample(entry& item) = 0;
// This function is called periodically (non-constant frequency). // This function is called periodically (non-constant frequency).
// //
// For implementers: // For implementers:

View File

@ -189,6 +189,17 @@ namespace libtorrent
// If the incoming requests causes to many bytes to be sent in responses, // If the incoming requests causes to many bytes to be sent in responses,
// incoming requests will be dropped until the quota has been replenished. // incoming requests will be dropped until the quota has been replenished.
int upload_rate_limit; int upload_rate_limit;
// the infohashes sample recomputation interval (in seconds).
// The node will precompute a subset of the tracked infohashes and return
// that instead of calculating it upon each request. The permissible range
// is between 0 and 21600 seconds (inclusive).
int sample_infohashes_interval = 21600;
// the maximum number of elements in the sampled subset of infohashes.
// If this number is too big, expect the DHT storage implementations
// to clamp it in order to allow UDP packets go through
int max_infohashes_sample_count = 20;
}; };

View File

@ -162,4 +162,54 @@ TORRENT_TEST(dht_storage_counters)
test_expiration(hours(1), s, c); // test expiration of everything after 3 hours test_expiration(hours(1), s, c); // test expiration of everything after 3 hours
} }
TORRENT_TEST(dht_storage_infohashes_sample)
{
dht_settings sett = test_settings();
sett.max_torrents = 5;
sett.sample_infohashes_interval = 30;
sett.max_infohashes_sample_count = 2;
std::unique_ptr<dht_storage_interface> s(create_default_dht_storage(sett));
TEST_CHECK(s.get() != nullptr);
sha1_hash const n1 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401");
sha1_hash const n2 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee402");
sha1_hash const n3 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee403");
sha1_hash const n4 = to_hash("5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee404");
tcp::endpoint const p1 = ep("124.31.75.21", 1);
tcp::endpoint const p2 = ep("124.31.75.22", 1);
tcp::endpoint const p3 = ep("124.31.75.23", 1);
tcp::endpoint const p4 = ep("124.31.75.24", 1);
s->announce_peer(n1, p1, "torrent_name1", false);
s->announce_peer(n2, p2, "torrent_name2", false);
s->announce_peer(n3, p3, "torrent_name3", false);
s->announce_peer(n4, p4, "torrent_name4", false);
entry item;
int r = s->get_infohashes_sample(item);
TEST_EQUAL(r, 2);
default_config cfg;
simulation sim(cfg);
sim::asio::io_service ios(sim, addr("10.0.0.1"));
sim::asio::high_resolution_timer timer(ios);
timer.expires_from_now(hours(1)); // expiration of torrents
timer.async_wait([&s](boost::system::error_code const& ec)
{
libtorrent::aux::update_time_now();
// tick here to trigger the torrents expiration
s->tick();
entry item;
int r = s->get_infohashes_sample(item);
TEST_EQUAL(r, 0);
});
boost::system::error_code ec;
sim.run(ec);
}
#endif // TORRENT_DISABLE_DHT #endif // TORRENT_DISABLE_DHT

View File

@ -171,6 +171,23 @@ namespace
, immutable_item_comparator(node_ids)); , immutable_item_comparator(node_ids));
} }
constexpr int sample_infohashes_interval_max = 21600;
constexpr int infohashes_sample_count_max = 20;
struct infohashes_sample
{
std::vector<sha1_hash> samples;
time_point created = min_time();
int count() const { return int(samples.size()); }
};
int clamp(int v, int lo, int hi)
{
TORRENT_ASSERT(lo <= hi);
return (v < lo) ? lo : (hi < v) ? hi : v;
}
class dht_default_storage final : public dht_storage_interface, boost::noncopyable class dht_default_storage final : public dht_storage_interface, boost::noncopyable
{ {
public: public:
@ -453,6 +470,21 @@ namespace
touch_item(i->second, addr); touch_item(i->second, addr);
} }
int get_infohashes_sample(entry& item) override
{
item["interval"] = clamp(m_settings.sample_infohashes_interval
, 0, sample_infohashes_interval_max);
item["num"] = int(m_map.size());
refresh_infohashes_sample();
std::vector<sha1_hash> const& samples = m_infohashes_sample.samples;
item["samples"] = span<char const>(
reinterpret_cast<char const*>(samples.data()), samples.size() * 20);
return m_infohashes_sample.count();
}
void tick() override void tick() override
{ {
// look through all peers and see if any have timed out // look through all peers and see if any have timed out
@ -517,6 +549,8 @@ namespace
std::map<node_id, dht_immutable_item> m_immutable_table; std::map<node_id, dht_immutable_item> m_immutable_table;
std::map<node_id, dht_mutable_item> m_mutable_table; std::map<node_id, dht_mutable_item> m_mutable_table;
infohashes_sample m_infohashes_sample;
void purge_peers(std::vector<peer_entry>& peers) void purge_peers(std::vector<peer_entry>& peers)
{ {
auto now = aux::time_now(); auto now = aux::time_now();
@ -532,6 +566,48 @@ namespace
if (!peers.empty() && peers.capacity() / peers.size() >= 4u) if (!peers.empty() && peers.capacity() / peers.size() >= 4u)
peers.shrink_to_fit(); peers.shrink_to_fit();
} }
void refresh_infohashes_sample()
{
time_point const now = aux::time_now();
int const interval = clamp(m_settings.sample_infohashes_interval
, 0, sample_infohashes_interval_max);
int const max_count = clamp(m_settings.max_infohashes_sample_count
, 0, infohashes_sample_count_max);
int const count = std::min(max_count, int(m_map.size()));
if (interval > 0
&& m_infohashes_sample.created + seconds(interval) > now
&& m_infohashes_sample.count() >= max_count)
return;
std::vector<sha1_hash>& samples = m_infohashes_sample.samples;
samples.clear();
samples.reserve(count);
int to_pick = count;
int candidates = int(m_map.size());
for (auto const& t : m_map)
{
if (to_pick == 0)
break;
TORRENT_ASSERT(candidates >= to_pick);
// pick this key with probability
// <keys left to pick> / <keys left in the set>
if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick))
continue;
samples.push_back(t.first);
--to_pick;
}
TORRENT_ASSERT(int(samples.size()) == count);
m_infohashes_sample.created = now;
}
}; };
} }

View File

@ -437,4 +437,76 @@ TORRENT_TEST(update_node_ids)
TEST_CHECK(r); TEST_CHECK(r);
} }
TORRENT_TEST(infohashes_sample)
{
dht_settings sett = test_settings();
sett.max_torrents = 5;
sett.sample_infohashes_interval = 10;
sett.max_infohashes_sample_count = 2;
std::unique_ptr<dht_storage_interface> s(create_default_dht_storage(sett));
tcp::endpoint const p1 = ep("124.31.75.21", 1);
tcp::endpoint const p2 = ep("124.31.75.22", 1);
tcp::endpoint const p3 = ep("124.31.75.23", 1);
tcp::endpoint const p4 = ep("124.31.75.24", 1);
s->announce_peer(n1, p1, "torrent_name1", false);
s->announce_peer(n2, p2, "torrent_name2", false);
s->announce_peer(n3, p3, "torrent_name3", false);
s->announce_peer(n4, p4, "torrent_name4", false);
entry item;
int r = s->get_infohashes_sample(item);
TEST_EQUAL(r, 2);
TEST_EQUAL(item["interval"].integer(), 10)
TEST_EQUAL(item["num"].integer(), 4);
TEST_EQUAL(item["samples"].string().size(), 2 * 20);
// get all of them
sett.max_infohashes_sample_count = 5;
item = entry();
r = s->get_infohashes_sample(item);
TEST_EQUAL(r, 4);
TEST_EQUAL(item["interval"].integer(), 10)
TEST_EQUAL(item["num"].integer(), 4);
TEST_EQUAL(item["samples"].string().size(), 4 * 20);
std::string const samples = item["samples"].to_string();
TEST_CHECK(samples.find(aux::to_hex(n1)) != std::string::npos);
TEST_CHECK(samples.find(aux::to_hex(n2)) != std::string::npos);
TEST_CHECK(samples.find(aux::to_hex(n3)) != std::string::npos);
TEST_CHECK(samples.find(aux::to_hex(n4)) != std::string::npos);
}
TORRENT_TEST(infohashes_sample_dist)
{
dht_settings sett = test_settings();
sett.max_torrents = 1000;
sett.sample_infohashes_interval = 0; // need this to force refresh every call
sett.max_infohashes_sample_count = 1;
std::unique_ptr<dht_storage_interface> s(create_default_dht_storage(sett));
for (int i = 0; i < 1000; ++i)
{
s->announce_peer(rand_hash(), tcp::endpoint(rand_v4(), std::uint16_t(i))
, "torrent_name", false);
}
std::set<sha1_hash> infohash_set;
for (int i = 0; i < 1000; ++i)
{
entry item;
int r = s->get_infohashes_sample(item);
TEST_EQUAL(r, 1);
TEST_EQUAL(item["interval"].integer(), 0)
TEST_EQUAL(item["num"].integer(), 1000);
TEST_EQUAL(item["samples"].string().size(), 20);
infohash_set.insert(sha1_hash(item["samples"].string()));
}
std::printf("infohashes set size: %d\n", int(infohash_set.size()));
TEST_CHECK(infohash_set.size() > 500);
}
#endif #endif