Merge pull request #1133 from ssiloti/peer-picker

fix DHT peer picking algorithm. The old code was always picking the first to_pick peers from the set.

* use a sorted vector to store peer announcements

Given the frequency of linear scans being done, std::set is clearly
sub-optimal for storing announced peers. A std::vector is the obvious
choice, which I also decided to make sorted. A sorted vector trades
better performance in announce_peer for slower purging, the latter
being mitigated by batching.

* shrink peers vectors with too much excess capacity
This commit is contained in:
Arvid Norberg 2016-09-23 13:48:00 -07:00 committed by GitHub
commit 24b271b7c1
2 changed files with 103 additions and 52 deletions

View File

@ -73,7 +73,8 @@ namespace
struct torrent_entry struct torrent_entry
{ {
std::string name; std::string name;
std::set<peer_entry> peers; std::vector<peer_entry> peers4;
std::vector<peer_entry> peers6;
}; };
// TODO: 2 make this configurable in dht_settings // TODO: 2 make this configurable in dht_settings
@ -176,7 +177,7 @@ namespace
{ {
size_t ret = 0; size_t ret = 0;
for (auto const& t : m_map) for (auto const& t : m_map)
ret += t.second.peers.size(); ret += t.second.peers4.size() + t.second.peers6.size();
return ret; return ret;
} }
#endif #endif
@ -193,6 +194,7 @@ namespace
if (i == m_map.end()) return int(m_map.size()) >= m_settings.max_torrents; if (i == m_map.end()) return int(m_map.size()) >= m_settings.max_torrents;
torrent_entry const& v = i->second; torrent_entry const& v = i->second;
auto const& peersv = requester.is_v4() ? v.peers4 : v.peers6;
if (!v.name.empty()) peers["n"] = v.name; if (!v.name.empty()) peers["n"] = v.name;
@ -201,7 +203,7 @@ namespace
bloom_filter<256> downloaders; bloom_filter<256> downloaders;
bloom_filter<256> seeds; bloom_filter<256> seeds;
for (auto const& p : v.peers) for (auto const& p : peersv)
{ {
sha1_hash const iphash = hash_address(p.addr.address()); sha1_hash const iphash = hash_address(p.addr.address());
if (p.seed) seeds.set(iphash); if (p.seed) seeds.set(iphash);
@ -214,51 +216,45 @@ namespace
else else
{ {
tcp const protocol = requester.is_v4() ? tcp::v4() : tcp::v6(); tcp const protocol = requester.is_v4() ? tcp::v4() : tcp::v6();
int max = m_settings.max_peers_reply; int to_pick = m_settings.max_peers_reply;
// if these are IPv6 peers their addresses are 4x the size of IPv4 // if these are IPv6 peers their addresses are 4x the size of IPv4
// so reduce the max peers 4 fold to compensate // so reduce the max peers 4 fold to compensate
// max_peers_reply should probably be specified in bytes // max_peers_reply should probably be specified in bytes
if (!v.peers.empty() && protocol == tcp::v6()) if (!peersv.empty() && protocol == tcp::v6())
max /= 4; to_pick /= 4;
// we're picking "to_pick" from a list of "num" at random.
int const to_pick = std::min(int(v.peers.size()), max);
std::set<peer_entry>::const_iterator iter = v.peers.begin();
entry::list_type& pe = peers["values"].list(); entry::list_type& pe = peers["values"].list();
for (int t = 0, m = 0; m < to_pick && iter != v.peers.end(); ++iter) int candidates = int(std::count_if(peersv.begin(), peersv.end()
, [=](peer_entry const& e) { return !(noseed && e.seed); }));
to_pick = (std::min)(to_pick, candidates);
for (auto iter = peersv.begin(); to_pick > 0; ++iter)
{ {
// if the node asking for peers is a seed, skip seeds from the // if the node asking for peers is a seed, skip seeds from the
// peer list // peer list
if (noseed && iter->seed) continue; if (noseed && iter->seed) continue;
// only include peers with the right address family TORRENT_ASSERT(candidates >= to_pick);
if (iter->addr.protocol() != protocol)
// pick this peer with probability
// <peers left to pick> / <peers left in the set>
if (random(uint32_t(candidates--)) > to_pick)
continue; continue;
++t; pe.push_back(entry());
std::string* str; std::string& str = pe.back().string();
if (t <= to_pick)
{
pe.push_back(entry());
str = &pe.back().string();
}
else
{
// maybe replace an item we've already picked
if (random(t - 1) >= to_pick) continue;
str = &pe[random(to_pick - 1)].string();
}
str->resize(18); str.resize(18);
std::string::iterator out = str->begin(); std::string::iterator out = str.begin();
detail::write_endpoint(iter->addr, out); detail::write_endpoint(iter->addr, out);
str->resize(out - str->begin()); str.resize(out - str.begin());
++m; --to_pick;
} }
} }
if (int(i->second.peers.size()) < m_settings.max_peers) if (int(peersv.size()) < m_settings.max_peers)
return false; return false;
// we're at the max peers stored for this torrent // we're at the max peers stored for this torrent
@ -267,8 +263,8 @@ namespace
// a different port than the one it is using to send DHT messages // a different port than the one it is using to send DHT messages
peer_entry requester_entry; peer_entry requester_entry;
requester_entry.addr.address(requester); requester_entry.addr.address(requester);
auto requester_iter = i->second.peers.lower_bound(requester_entry); auto requester_iter = std::lower_bound(peersv.begin(), peersv.end(), requester_entry);
return requester_iter == i->second.peers.end() return requester_iter == peersv.end()
|| requester_iter->addr.address() != requester; || requester_iter->addr.address() != requester;
} }
@ -301,23 +297,27 @@ namespace
v->name = name.substr(0, 100).to_string(); v->name = name.substr(0, 100).to_string();
} }
auto& peersv = endp.protocol() == tcp::v4() ? v->peers4 : v->peers6;
peer_entry peer; peer_entry peer;
peer.addr = endp; peer.addr = endp;
peer.added = aux::time_now(); peer.added = aux::time_now();
peer.seed = seed; peer.seed = seed;
auto i = v->peers.find(peer); auto i = std::lower_bound(peersv.begin(), peersv.end(), peer);
if (i != v->peers.end()) if (i != peersv.end() && i->addr == endp)
{ {
v->peers.erase(i++); *i = peer;
m_counters.peers -= 1;
} }
else if (v->peers.size() >= m_settings.max_peers) else if (peersv.size() >= m_settings.max_peers)
{ {
// we're at capacity, drop the announce // we're at capacity, drop the announce
return; return;
} }
v->peers.insert(i, peer); else
m_counters.peers += 1; {
peersv.insert(i, peer);
m_counters.peers += 1;
}
} }
bool get_immutable_item(sha1_hash const& target bool get_immutable_item(sha1_hash const& target
@ -457,9 +457,10 @@ namespace
for (auto i = m_map.begin(), end(m_map.end()); i != end;) for (auto i = m_map.begin(), end(m_map.end()); i != end;)
{ {
torrent_entry& t = i->second; torrent_entry& t = i->second;
purge_peers(t.peers); purge_peers(t.peers4);
purge_peers(t.peers6);
if (!t.peers.empty()) if (!t.peers4.empty() || !t.peers6.empty())
{ {
++i; ++i;
continue; continue;
@ -513,19 +514,20 @@ namespace
std::map<node_id, dht_immutable_item> m_immutable_table; std::map<node_id, dht_immutable_item> m_immutable_table;
std::map<node_id, dht_mutable_item> m_mutable_table; std::map<node_id, dht_mutable_item> m_mutable_table;
void purge_peers(std::set<peer_entry>& peers) void purge_peers(std::vector<peer_entry>& peers)
{ {
for (auto i = peers.begin(), end(peers.end()); i != end;) auto now = aux::time_now();
auto new_end = std::remove_if(peers.begin(), peers.end()
, [=](peer_entry const& e)
{ {
// the peer has timed out return e.added + minutes(int(announce_interval * 3 / 2)) < now;
if (i->added + minutes(int(announce_interval * 3 / 2)) < aux::time_now()) });
{
peers.erase(i++); m_counters.peers -= std::distance(new_end, peers.end());
m_counters.peers -= 1; peers.erase(new_end, peers.end());
} // if we're using less than 1/4 of the capacity free up the excess
else if (!peers.empty() && peers.capacity() / peers.size() >= 4u)
++i; peers.shrink_to_fit();
}
} }
}; };
} }

View File

@ -338,6 +338,55 @@ TORRENT_TEST(mutable_item_limit)
TEST_EQUAL(cnt.mutable_data, 42); TEST_EQUAL(cnt.mutable_data, 42);
} }
TORRENT_TEST(get_peers_dist)
{
// test that get_peers returns reasonably disjoint sets of peers with each call
// take two samples of 100 peers from 1000 and make sure there aren't too many
// peers found in both lists
dht_settings sett = test_settings();
sett.max_peers = 2000;
sett.max_peers_reply = 100;
std::unique_ptr<dht_storage_interface> s(create_default_dht_storage(sett));
address addr = rand_v4();
for (int i = 0; i < 1000; ++i)
{
s->announce_peer(n1, tcp::endpoint(addr, uint16_t(i))
, "torrent_name", false);
}
std::set<int> peer_set;
int duplicates = 0;
for (int i = 0; i < 2; ++i)
{
entry peers;
s->get_peers(n1, false, false, address(), peers);
TEST_EQUAL(peers["values"].list().size(), 100);
for (auto const& p : peers["values"].list())
{
int port = detail::read_v4_endpoint<tcp::endpoint>(p.string().begin()).port();
if (!peer_set.insert(port).second)
++duplicates;
}
}
std::printf("duplicate peers found: %d\n", duplicates);
TEST_CHECK(duplicates < 20);
// add 1000 seeds to the mix and make sure we still pick the desired number
// of peers if we select only non-seeds
for (int i = 1000; i < 2000; ++i)
{
s->announce_peer(n1, tcp::endpoint(addr, uint16_t(i))
, "torrent_name", true);
}
{
entry peers;
s->get_peers(n1, true, false, address(), peers);
TEST_EQUAL(peers["values"].list().size(), 100);
}
}
TORRENT_TEST(update_node_ids) TORRENT_TEST(update_node_ids)
{ {
dht_settings sett = test_settings(); dht_settings sett = test_settings();