optimized DHT routing table memory usage

This commit is contained in:
Arvid Norberg 2010-01-03 11:08:39 +00:00
parent cbf2526a8c
commit 11cd7af4e7
11 changed files with 437 additions and 483 deletions

View File

@ -5,6 +5,7 @@
* cleaned up usage of MAX_PATH and related macros
* made it possible to build libtorrent without RTTI support
* added support to build with libgcrypt and a shipped version of libtommath
* optimized DHT routing table memory usage
0.15 release

View File

@ -184,6 +184,7 @@ public:
virtual ~node_impl() {}
void tick();
void refresh(node_id const& id, find_data::nodes_callback const& f);
void bootstrap(std::vector<udp::endpoint> const& nodes
, find_data::nodes_callback const& f);
@ -200,15 +201,8 @@ public:
return ret;
}
void refresh();
void refresh_bucket(int bucket);
int bucket_size(int bucket);
typedef routing_table::iterator iterator;
iterator begin() const { return m_table.begin(); }
iterator end() const { return m_table.end(); }
node_id const& nid() const { return m_id; }
boost::tuple<int, int> size() const{ return m_table.size(); }
@ -233,7 +227,6 @@ public:
// the returned time is the delay until connection_timeout()
// should be called again the next time
time_duration connection_timeout();
time_duration refresh_timeout();
// generates a new secret number used to generate write tokens
void new_write_key();

View File

@ -52,6 +52,7 @@ struct node_entry
first_seen = time_now();
#endif
}
node_entry(udp::endpoint ep)
: addr(ep.address())
, port(ep.port())

View File

@ -36,8 +36,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector>
#include <boost/cstdint.hpp>
#include <boost/iterator/iterator_facade.hpp>
#include <boost/iterator/iterator_categories.hpp>
#include <boost/utility.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/array.hpp>
@ -67,11 +65,16 @@ TORRENT_DECLARE_LOG(table);
typedef std::vector<node_entry> bucket_t;
struct routing_table_node
{
bucket_t replacements;
bucket_t live_nodes;
ptime last_active;
};
// differences in the implementation from the description in
// the paper:
//
// * The routing table tree is not allocated dynamically, there
// are always 160 buckets.
// * Nodes are not marked as being stale, they keep a counter
// that tells how many times in a row they have failed. When
// a new node is to be inserted, the node that has failed
@ -79,89 +82,9 @@ typedef std::vector<node_entry> bucket_t;
// bucket has failed, then it is put in the replacement
// cache (just like in the paper).
class routing_table;
namespace aux
{
// Iterates over a flattened routing_table structure.
class routing_table_iterator
: public boost::iterator_facade<
routing_table_iterator
, node_entry const
, boost::forward_traversal_tag
>
{
public:
routing_table_iterator()
{
}
private:
friend class libtorrent::dht::routing_table;
friend class boost::iterator_core_access;
typedef boost::array<std::pair<bucket_t, bucket_t>, 160>::const_iterator
bucket_iterator_t;
routing_table_iterator(
bucket_iterator_t begin
, bucket_iterator_t end)
: m_bucket_iterator(begin)
, m_bucket_end(end)
{
if (m_bucket_iterator == m_bucket_end) return;
m_iterator = begin->first.begin();
while (m_iterator == m_bucket_iterator->first.end())
{
if (++m_bucket_iterator == m_bucket_end)
break;
m_iterator = m_bucket_iterator->first.begin();
}
}
bool equal(routing_table_iterator const& other) const
{
return m_bucket_iterator == other.m_bucket_iterator
&& (m_bucket_iterator == m_bucket_end
|| *m_iterator == other.m_iterator);
}
void increment()
{
TORRENT_ASSERT(m_bucket_iterator != m_bucket_end);
++*m_iterator;
while (*m_iterator == m_bucket_iterator->first.end())
{
if (++m_bucket_iterator == m_bucket_end)
break;
m_iterator = m_bucket_iterator->first.begin();
}
}
node_entry const& dereference() const
{
TORRENT_ASSERT(m_bucket_iterator != m_bucket_end);
return **m_iterator;
}
bucket_iterator_t m_bucket_iterator;
bucket_iterator_t m_bucket_end;
// when debug iterators are enabled, default constructed
// iterators are not allowed to be copied. In the case
// where the routing table is empty, m_iterator would be
// default constructed and not copyable.
boost::optional<bucket_t::const_iterator> m_iterator;
};
} // namespace aux
class TORRENT_EXPORT routing_table
{
public:
typedef aux::routing_table_iterator iterator;
typedef iterator const_iterator;
routing_table(node_id const& id, int bucket_size
, dht_settings const& settings);
@ -178,46 +101,46 @@ public:
router_iterator router_begin() const { return m_router_nodes.begin(); }
router_iterator router_end() const { return m_router_nodes.end(); }
bool add_node(node_entry const& e);
// this function is called every time the node sees
// a sign of a node being alive. This node will either
// be inserted in the k-buckets or be moved to the top
// of its bucket.
bool node_seen(node_id const& id, udp::endpoint addr);
// returns time when the given bucket needs another refresh.
// if the given bucket is empty but there are nodes
// in a bucket closer to us, or if the bucket is non-empty and
// the time from the last activity is more than 15 minutes
ptime next_refresh(int bucket);
enum
{
include_self = 1,
include_failed = 2
};
// fills the vector with the count nodes from our buckets that
// are nearest to the given id.
void find_node(node_id const& id, std::vector<node_entry>& l
, int options, int count = 0);
bool node_seen(node_id const& id, udp::endpoint ep);
// this may add a node to the routing table and mark it as
// not pinged. If the bucket the node falls into is full,
// the node will be ignored.
void heard_about(node_id const& id, udp::endpoint const& ep);
// this will set the given bucket's latest activity
// to the current time
void touch_bucket(int bucket);
// if any bucket in the routing table needs to be refreshed
// this function will return true and set the target to an
// appropriate target inside that bucket
bool need_refresh(node_id& target) const;
enum
{
include_failed = 1
};
// fills the vector with the count nodes from our buckets that
// are nearest to the given id.
void find_node(node_id const& id, std::vector<node_entry>& l
, int options, int count = 0);
int bucket_size(int bucket)
{
TORRENT_ASSERT(bucket >= 0 && bucket < 160);
return (int)m_buckets[bucket].first.size();
int num_buckets = m_buckets.size();
if (bucket < num_buckets) bucket = num_buckets - 1;
table_t::iterator i = m_buckets.begin();
std::advance(i, bucket);
return (int)i->live_nodes.size();
}
int bucket_size() const { return m_bucket_size; }
iterator begin() const;
iterator end() const;
void for_each_node(void (*)(void*, node_entry const&)
, void (*)(void*, node_entry const&), void* userdata) const;
int bucket_size() const { return m_bucket_size; }
boost::tuple<int, int> size() const;
size_type num_global_nodes() const;
@ -225,11 +148,11 @@ public:
// returns true if there are no working nodes
// in the routing table
bool need_bootstrap() const;
int num_active_buckets() const
{ return 160 - m_lowest_active_bucket + 1; }
int num_active_buckets() const { return m_buckets.size(); }
void replacement_cache(bucket_t& nodes) const;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG
// used for debug and monitoring purposes. This will print out
// the state of the routing table to the given stream
void print_state(std::ostream& os) const;
@ -237,17 +160,23 @@ public:
private:
typedef std::list<routing_table_node> table_t;
table_t::iterator find_bucket(node_id const& id);
// constant called k in paper
int m_bucket_size;
dht_settings const& m_settings;
// 160 (k-bucket, replacement cache) pairs
typedef boost::array<std::pair<bucket_t, bucket_t>, 160> table_t;
// (k-bucket, replacement cache) pairs
// the first entry is the bucket the furthest
// away from our own ID. Each time the bucket
// closest to us (m_buckets.back()) has more than
// bucket size nodes in it, another bucket is
// added to the end and it's split up between them
table_t m_buckets;
// timestamps of the last activity in each bucket
typedef boost::array<ptime, 160> table_activity_t;
table_activity_t m_bucket_activity;
node_id m_id; // our own node id
// this is a set of all the endpoints that have
@ -255,9 +184,6 @@ private:
// be used in searches, but they will never
// be added to the routing table.
std::set<udp::endpoint> m_router_nodes;
// this is the lowest bucket index with nodes in it
int m_lowest_active_bucket;
};
} } // namespace libtorrent::dht

View File

@ -74,7 +74,8 @@ public:
void unreachable(udp::endpoint const& ep);
// returns true if the node needs a refresh
bool incoming(msg const&);
// if so, id is assigned the node id to refresh
bool incoming(msg const&, node_id* id);
time_duration tick();
bool invoke(entry& e, udp::endpoint target

View File

@ -322,9 +322,9 @@ namespace libtorrent { namespace dht
mutex_t::scoped_lock l(m_mutex);
if (e || m_abort) return;
time_duration d = m_dht.refresh_timeout();
m_dht.tick();
error_code ec;
m_refresh_timer.expires_from_now(d, ec);
m_refresh_timer.expires_from_now(seconds(5), ec);
m_refresh_timer.async_wait(
bind(&dht_tracker::refresh_timeout, self(), _1));
}
@ -542,20 +542,22 @@ namespace libtorrent { namespace dht
m_dht.incoming(m);
}
void add_node_fun(void* userdata, node_entry const& e)
{
entry* n = (entry*)userdata;
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(e.ep(), out);
n->list().push_back(entry(node));
}
entry dht_tracker::state() const
{
mutex_t::scoped_lock l(m_mutex);
entry ret(entry::dictionary_t);
{
entry nodes(entry::list_t);
for (node_impl::iterator i(m_dht.begin())
, end(m_dht.end()); i != end; ++i)
{
std::string node;
std::back_insert_iterator<std::string> out(node);
write_endpoint(udp::endpoint(i->addr, i->port), out);
nodes.list().push_back(entry(node));
}
m_dht.m_table.for_each_node(&add_node_fun, &add_node_fun, &nodes);
bucket_t cache;
m_dht.replacement_cache(cache);
for (bucket_t::iterator i(cache.begin())

View File

@ -169,6 +169,12 @@ void find_data_observer::reply(msg const& m)
done();
}
void add_entry_fun(void* userdata, node_entry const& e)
{
traversal_algorithm* f = (traversal_algorithm*)userdata;
f->add_entry(e.id, e.ep(), traversal_algorithm::result::initial);
}
find_data::find_data(
node_impl& node
, node_id target
@ -181,11 +187,7 @@ find_data::find_data(
, m_done(false)
, m_got_peers(false)
{
for (routing_table::const_iterator i = node.m_table.begin()
, end(node.m_table.end()); i != end; ++i)
{
add_entry(i->id, i->ep(), result::initial);
}
node.m_table.for_each_node(&add_entry_fun, 0, (traversal_algorithm*)this);
}
bool find_data::invoke(udp::endpoint addr)

View File

@ -267,13 +267,13 @@ void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
r->start();
}
/*
void node_impl::refresh()
{
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id, boost::bind(&nop)));
r->start();
}
*/
int node_impl::bucket_size(int bucket)
{
return m_table.bucket_size(bucket);
@ -285,39 +285,6 @@ void node_impl::new_write_key()
m_secret[0] = std::rand();
}
void node_impl::refresh_bucket(int bucket)
{
TORRENT_ASSERT(bucket >= 0 && bucket < 160);
// generate a random node_id within the given bucket
node_id target = generate_id();
int num_bits = 160 - bucket;
node_id mask(0);
for (int i = 0; i < num_bits; ++i)
{
int byte = i / 8;
mask[byte] |= 0x80 >> (i % 8);
}
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
// make sure this is in another subtree than m_id
// clear the (num_bits - 1) bit and then set it to the
// inverse of m_id's corresponding bit.
target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8));
target[(num_bits - 1) / 8] |=
(~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
TORRENT_ASSERT(distance_exp(m_id, target) == bucket);
boost::intrusive_ptr<dht::refresh> ta(new dht::refresh(*this, target, bind(&nop)));
ta->start();
m_table.touch_bucket(bucket);
}
void node_impl::unreachable(udp::endpoint const& ep)
{
m_rpc.unreachable(ep);
@ -341,7 +308,9 @@ void node_impl::incoming(msg const& m)
{
case 'r':
{
if (m_rpc.incoming(m)) refresh();
node_id id;
if (m_rpc.incoming(m, &id))
refresh(id, boost::bind(&nop));
break;
}
case 'q':
@ -451,43 +420,11 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
ta->start();
}
time_duration node_impl::refresh_timeout()
void node_impl::tick()
{
int refresh = -1;
ptime now = time_now();
ptime next = now + minutes(15);
for (int i = 0; i < 160; ++i)
{
ptime r = m_table.next_refresh(i);
if (r <= next)
{
refresh = i;
next = r;
}
}
if (next < now)
{
TORRENT_ASSERT(refresh > -1);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "refreshing bucket: " << refresh;
#endif
refresh_bucket(refresh);
}
time_duration next_refresh = next - now;
time_duration min_next_refresh
= minutes(15) / m_table.num_active_buckets();
if (min_next_refresh > seconds(40))
min_next_refresh = seconds(40);
if (next_refresh < min_next_refresh)
next_refresh = min_next_refresh;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds";
#endif
return next_refresh;
node_id target;
if (m_table.need_refresh(target))
refresh(target, boost::bind(&nop));
}
time_duration node_impl::connection_timeout()

View File

@ -60,13 +60,7 @@ routing_table::routing_table(node_id const& id, int bucket_size
: m_bucket_size(bucket_size)
, m_settings(settings)
, m_id(id)
, m_lowest_active_bucket(160)
{
// distribute the refresh times for the buckets in an
// attempt to even out the network load
for (int i = 0; i < 160; ++i)
m_bucket_activity[i] = time_now() - milliseconds(i*5625);
m_bucket_activity[0] = time_now() - minutes(15);
}
void routing_table::status(session_status& s) const
@ -82,27 +76,25 @@ boost::tuple<int, int> routing_table::size() const
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
nodes += i->first.size();
replacements += i->second.size();
nodes += i->live_nodes.size();
replacements += i->replacements.size();
}
return boost::make_tuple(nodes, replacements);
}
size_type routing_table::num_global_nodes() const
{
int first_full = m_lowest_active_bucket;
int num_nodes = 1; // we are one of the nodes
for (; first_full < 160
&& int(m_buckets[first_full].first.size()) < m_bucket_size;
++first_full)
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
num_nodes += m_buckets[first_full].first.size();
num_nodes += i->live_nodes.size();
}
return (2 << (160 - first_full)) * num_nodes;
return (2 << m_buckets.size()) * num_nodes;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG
void routing_table::print_state(std::ostream& os) const
{
@ -116,27 +108,24 @@ void routing_table::print_state(std::ostream& os) const
os << "-";
os << "\n";
for (int k = 0; k < 8; ++k)
for (int k = 0; k < m_bucket_size; ++k)
{
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
os << (int(i->first.size()) > (7 - k) ? "|" : " ");
os << (int(i->live_nodes.size()) > (m_bucket_size - 1 - k) ? "|" : " ");
}
os << "\n";
}
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
os << "+";
}
for (int i = 0; i < 160; ++i) os << "+";
os << "\n";
for (int k = 0; k < 8; ++k)
for (int k = 0; k < m_bucket_size; ++k)
{
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
os << (int(i->second.size()) > k ? "|" : " ");
os << (int(i->replacements.size()) > k ? "|" : " ");
}
os << "\n";
}
@ -146,34 +135,68 @@ void routing_table::print_state(std::ostream& os) const
os << "\n\n";
os << "nodes:\n";
int bucket_index = 0;
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
i != end; ++i, ++bucket_index)
{
if (i->first.empty()) continue;
int bucket_index = int(i - m_buckets.begin());
// if (i->live_nodes.empty()) continue;
os << "=== BUCKET = " << bucket_index
<< " = " << (bucket_index >= m_lowest_active_bucket?"active":"inactive")
<< " = " << total_seconds(time_now() - m_bucket_activity[bucket_index])
<< " = " << total_seconds(time_now() - i->last_active)
<< " seconds ago ===== \n";
for (bucket_t::const_iterator j = i->first.begin()
, end(i->first.end()); j != end; ++j)
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
{
os << " id: " << j->id
<< " ip: " << j->ep()
<< " fails: " << j->fail_count()
<< " pinged: " << j->pinged()
<< " dist: " << distance_exp(m_id, j->id)
<< "\n";
}
}
}
#endif
/*
void routing_table::touch_bucket(int bucket)
{
m_bucket_activity[bucket] = time_now();
}
*/
bool routing_table::need_refresh(node_id& target) const
{
if (m_buckets.empty()) return false;
table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end()
, boost::bind(&routing_table_node::last_active, _1)
< boost::bind(&routing_table_node::last_active, _2));
if (i->last_active > time_now() - minutes(15)) return false;
// generate a random node_id within the given bucket
target = generate_id();
int num_bits = std::distance(m_buckets.begin(), i);
node_id mask(0);
for (int i = 0; i < num_bits; ++i) mask[i/8] |= 0x80 >> (i&7);
// target = (target & ~mask) | (root & mask)
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
// make sure this is in another subtree than m_id
// clear the (num_bits - 1) bit and then set it to the
// inverse of m_id's corresponding bit.
target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8));
target[(num_bits - 1) / 8] |=
(~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
TORRENT_ASSERT(distance_exp(m_id, target) == 160 - std::distance(m_buckets.begin(), i));
return true;
}
/*
ptime routing_table::next_refresh(int bucket)
{
TORRENT_ASSERT(bucket < 160);
@ -184,69 +207,265 @@ ptime routing_table::next_refresh(int bucket)
return time_now() + minutes(15);
return m_bucket_activity[bucket] + minutes(15);
}
*/
void routing_table::replacement_cache(bucket_t& nodes) const
{
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
std::copy(i->second.begin(), i->second.end()
std::copy(i->replacements.begin(), i->replacements.end()
, std::back_inserter(nodes));
}
}
void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
routing_table::table_t::iterator routing_table::find_bucket(node_id const& id)
{
int bucket_index = distance_exp(m_id, id);
TORRENT_ASSERT(bucket_index < (int)m_buckets.size());
// TORRENT_ASSERT(id != m_id);
int num_buckets = m_buckets.size();
if (num_buckets == 0)
{
m_buckets.push_back(routing_table_node());
++num_buckets;
}
int bucket_index = (std::min)(159 - distance_exp(m_id, id), num_buckets - 1);
TORRENT_ASSERT(bucket_index < m_buckets.size());
TORRENT_ASSERT(bucket_index >= 0);
bucket_t& b = m_buckets[bucket_index].first;
bucket_t& rb = m_buckets[bucket_index].second;
table_t::iterator i = m_buckets.begin();
std::advance(i, bucket_index);
return i;
}
bool routing_table::add_node(node_entry const& e)
{
if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false;
bool ret = need_bootstrap();
// don't add ourself
if (e.id == m_id) return ret;
table_t::iterator i = find_bucket(e.id);
bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements;
// if the replacement cache is full, we don't
// need another node. The table is fine the
// way it is.
if ((int)rb.size() >= m_bucket_size) return;
if ((int)rb.size() >= m_bucket_size) return ret;
// if the node already exists, we don't need it
if (std::find_if(b.begin(), b.end(), bind(&node_entry::id, _1) == id)
!= b.end()) return;
bucket_t::iterator j = std::find_if(b.begin(), b.end()
, bind(&node_entry::id, _1) == e.id);
if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == id)
!= rb.end()) return;
if (b.size() < m_bucket_size)
if (j != b.end())
{
if (bucket_index < m_lowest_active_bucket
&& bucket_index > 0)
m_lowest_active_bucket = bucket_index;
b.push_back(node_entry(id, ep, false));
return;
// we already have the node in our bucket
// just move it to the back since it was
// the last node we had any contact with
// in this bucket
*j = e;
// TORRENT_LOG(table) << "updating node: " << i->id << " " << i->addr;
return ret;
}
if (rb.size() < m_bucket_size)
rb.push_back(node_entry(id, ep, false));
if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == e.id)
!= rb.end()) return ret;
// if the node was not present in our list
// we will only insert it if there is room
// for it, or if some of our nodes have gone
// offline
if (b.size() < m_bucket_size)
{
if (b.empty()) b.reserve(m_bucket_size);
b.push_back(e);
// TORRENT_LOG(table) << "inserting node: " << e.id << " " << e.addr;
return ret;
}
// if there is no room, we look for nodes that are not 'pinged',
// i.e. we haven't confirmed that they respond to messages.
// Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it.
// can we split the bucket?
bool can_split = false;
if (e.pinged() && e.fail_count() == 0)
{
// only nodes that are pinged and haven't failed
// can split the bucket, and we can only split
// the last bucket
can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 160);
// if the node we're trying to insert is considered pinged,
// we may replace other nodes that aren't pinged
j = std::find_if(b.begin(), b.end(), bind(&node_entry::pinged, _1) == false);
if (j != b.end() && !j->pinged())
{
// j points to a node that has not been pinged.
// Replace it with this new one
b.erase(j);
b.push_back(e);
// TORRENT_LOG(table) << "replacing unpinged node: " << e.id << " " << e.addr;
return ret;
}
// A node is considered stale if it has failed at least one
// time. Here we choose the node that has failed most times.
// If we don't find one, place this node in the replacement-
// cache and replace any nodes that will fail in the future
// with nodes from that cache.
j = std::max_element(b.begin(), b.end()
, bind(&node_entry::fail_count, _1)
< bind(&node_entry::fail_count, _2));
if (j != b.end() && j->fail_count() > 0)
{
// i points to a node that has been marked
// as stale. Replace it with this new one
b.erase(j);
b.push_back(e);
// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr;
return ret;
}
}
// if we can't split, try to insert into the replacement bucket
if (!can_split)
{
// if we don't have any identified stale nodes in
// the bucket, and the bucket is full, we have to
// cache this node and wait until some node fails
// and then replace it.
j = std::find_if(rb.begin(), rb.end()
, bind(&node_entry::id, _1) == e.id);
// if the node is already in the replacement bucket
// just return.
if (j != rb.end())
{
// make sure we mark this node as pinged
// and if its address has changed, update
// that as well
*j = e;
return ret;
}
if ((int)rb.size() >= m_bucket_size)
{
// if the replacement bucket is full, remove the oldest entry
// but prefer nodes that haven't been pinged, since they are
// less reliable than this one, that has been pinged
j = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == false);
rb.erase(j != rb.end() ? j : rb.begin());
}
if (rb.empty()) rb.reserve(m_bucket_size);
rb.push_back(e);
// TORRENT_LOG(table) << "inserting node in replacement cache: " << e.id << " " << e.addr;
return ret;
}
// this is the last bucket, and it's full already. Split
// it by adding another bucket
m_buckets.push_back(routing_table_node());
bucket_t& new_bucket = m_buckets.back().live_nodes;
bucket_t& new_replacement_bucket = m_buckets.back().replacements;
// move any node whose (160 - distane_exp(m_id, id)) >= (i - m_buckets.begin())
// to the new bucket
int bucket_index = std::distance(m_buckets.begin(), i);
for (bucket_t::iterator j = b.begin(); j != b.end();)
{
if (distance_exp(m_id, j->id) >= 159 - bucket_index)
{
++j;
continue;
}
// this entry belongs in the new bucket
new_bucket.push_back(*j);
j = b.erase(j);
}
for (bucket_t::iterator j = rb.begin(); j != rb.end();)
{
if (distance_exp(m_id, j->id) >= 159 - bucket_index)
{
++j;
continue;
}
// this entry belongs in the new bucket
new_replacement_bucket.push_back(*j);
j = rb.erase(j);
}
// now insert the new node in the appropriate bucket
if (distance_exp(m_id, e.id) >= 159 - bucket_index)
{
if (b.size() < m_bucket_size)
b.push_back(e);
else if (rb.size() < m_bucket_size)
rb.push_back(e);
}
else
{
if (new_bucket.size() < m_bucket_size)
new_bucket.push_back(e);
else if (new_replacement_bucket.size() < m_bucket_size)
new_replacement_bucket.push_back(e);
}
return ret;
}
void routing_table::for_each_node(
void (*fun1)(void*, node_entry const&)
, void (*fun2)(void*, node_entry const&)
, void* userdata) const
{
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
if (fun1)
{
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
fun1(userdata, *j);
}
if (fun2)
{
for (bucket_t::const_iterator j = i->replacements.begin()
, end(i->replacements.end()); j != end; ++j)
fun2(userdata, *j);
}
}
}
void routing_table::node_failed(node_id const& id)
{
int bucket_index = distance_exp(m_id, id);
TORRENT_ASSERT(bucket_index < (int)m_buckets.size());
TORRENT_ASSERT(bucket_index >= 0);
bucket_t& b = m_buckets[bucket_index].first;
bucket_t& rb = m_buckets[bucket_index].second;
// if messages to ourself fails, ignore it
if (id == m_id) return;
bucket_t::iterator i = std::find_if(b.begin(), b.end()
table_t::iterator i = find_bucket(id);
bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements;
bucket_t::iterator j = std::find_if(b.begin(), b.end()
, bind(&node_entry::id, _1) == id);
if (i == b.end()) return;
// if messages to ourself fails, ignore it
if (bucket_index == 0) return;
if (j == b.end()) return;
if (rb.empty())
{
i->timed_out();
j->timed_out();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << " NODE FAILED"
@ -259,25 +478,17 @@ void routing_table::node_failed(node_id const& id)
// if this node has failed too many times, or if this node
// has never responded at all, remove it
if (i->fail_count() >= m_settings.max_fail_count || !i->pinged())
{
b.erase(i);
TORRENT_ASSERT(m_lowest_active_bucket <= bucket_index);
while (m_lowest_active_bucket < 160
&& m_buckets[m_lowest_active_bucket].first.empty())
{
++m_lowest_active_bucket;
}
}
if (j->fail_count() >= m_settings.max_fail_count || !j->pinged())
b.erase(j);
return;
}
b.erase(i);
b.erase(j);
i = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == true);
if (i == rb.end()) i = rb.begin();
b.push_back(*i);
rb.erase(i);
j = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == true);
if (j == rb.end()) j = rb.begin();
b.push_back(*j);
rb.erase(j);
}
void routing_table::add_router_node(udp::endpoint router)
@ -285,6 +496,13 @@ void routing_table::add_router_node(udp::endpoint router)
m_router_nodes.insert(router);
}
// we heard from this node, but we don't know if it
// was spoofed or not (i.e. pinged == false)
void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
{
add_node(node_entry(id, ep, false));
}
// this function is called every time the node sees
// a sign of a node being alive. This node will either
// be inserted in the k-buckets or be moved to the top
@ -292,132 +510,21 @@ void routing_table::add_router_node(udp::endpoint router)
// the return value indicates if the table needs a refresh.
// if true, the node should refresh the table (i.e. do a find_node
// on its own id)
bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
bool routing_table::node_seen(node_id const& id, udp::endpoint ep)
{
if (m_router_nodes.find(addr) != m_router_nodes.end()) return false;
int bucket_index = distance_exp(m_id, id);
TORRENT_ASSERT(bucket_index < (int)m_buckets.size());
TORRENT_ASSERT(bucket_index >= 0);
bucket_t& b = m_buckets[bucket_index].first;
bucket_t::iterator i = std::find_if(b.begin(), b.end()
, bind(&node_entry::id, _1) == id);
bool ret = need_bootstrap();
//m_bucket_activity[bucket_index] = time_now();
if (i != b.end())
{
// we already have the node in our bucket
// just move it to the back since it was
// the last node we had any contact with
// in this bucket
i->set_pinged();
i->reset_fail_count();
i->addr = addr.address();
i->port = addr.port();
// TORRENT_LOG(table) << "updating node: " << id << " " << addr;
return ret;
}
// if the node was not present in our list
// we will only insert it if there is room
// for it, or if some of our nodes have gone
// offline
if ((int)b.size() < m_bucket_size)
{
if (b.empty()) b.reserve(m_bucket_size);
b.push_back(node_entry(id, addr, true));
// if bucket index is 0, the node is ourselves
// don't updated m_lowest_active_bucket
if (bucket_index < m_lowest_active_bucket
&& bucket_index > 0)
m_lowest_active_bucket = bucket_index;
// TORRENT_LOG(table) << "inserting node: " << id << " " << addr;
return ret;
}
// if there is no room, we look for nodes that are not 'pinged',
// i.e. we haven't confirmed that they respond to messages.
// Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it.
i = std::find_if(b.begin(), b.end(), bind(&node_entry::pinged, _1) == false);
if (i != b.end() && !i->pinged())
{
// i points to a node that has not been pinged.
// Replace it with this new one
b.erase(i);
b.push_back(node_entry(id, addr, true));
// TORRENT_LOG(table) << "replacing unpinged node: " << id << " " << addr;
return ret;
}
// A node is considered stale if it has failed at least one
// time. Here we choose the node that has failed most times.
// If we don't find one, place this node in the replacement-
// cache and replace any nodes that will fail in the future
// with nodes from that cache.
i = std::max_element(b.begin(), b.end()
, bind(&node_entry::fail_count, _1)
< bind(&node_entry::fail_count, _2));
if (i != b.end() && i->fail_count() > 0)
{
// i points to a node that has been marked
// as stale. Replace it with this new one
b.erase(i);
b.push_back(node_entry(id, addr, true));
// TORRENT_LOG(table) << "replacing stale node: " << id << " " << addr;
return ret;
}
// if we don't have any identified stale nodes in
// the bucket, and the bucket is full, we have to
// cache this node and wait until some node fails
// and then replace it.
bucket_t& rb = m_buckets[bucket_index].second;
i = std::find_if(rb.begin(), rb.end()
, bind(&node_entry::id, _1) == id);
// if the node is already in the replacement bucket
// just return.
if (i != rb.end())
{
// make sure we mark this node as pinged
// and if its address has changed, update
// that as well
i->set_pinged();
i->reset_fail_count();
i->addr = addr.address();
i->port = addr.port();
return ret;
}
if ((int)rb.size() >= m_bucket_size)
{
// if the replacement bucket is full, remove the oldest entry
// but prefer nodes that haven't been pinged, since they are
// less reliable than this one, that has been pinged
i = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == false);
rb.erase(i != rb.end() ? i : rb.begin());
}
if (rb.empty()) rb.reserve(m_bucket_size);
rb.push_back(node_entry(id, addr, true));
// TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr;
return ret;
return add_node(node_entry(id, ep, true));
}
bool routing_table::need_bootstrap() const
{
for (const_iterator i = begin(); i != end(); ++i)
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
if (i->confirmed()) return false;
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++i)
{
if (j->confirmed()) return false;
}
}
return true;
}
@ -456,8 +563,8 @@ void routing_table::find_node(node_id const& target
if (count == 0) count = m_bucket_size;
l.reserve(count);
int bucket_index = distance_exp(m_id, target);
bucket_t& b = m_buckets[bucket_index].first;
table_t::iterator i = find_bucket(target);
bucket_t& b = i->live_nodes;
// copy all nodes that hasn't failed into the target
// vector.
@ -474,61 +581,43 @@ void routing_table::find_node(node_id const& target
}
TORRENT_ASSERT((int)l.size() <= count);
if (int(l.size()) == count)
{
TORRENT_ASSERT((options & include_failed)
|| std::count_if(l.begin(), l.end()
, !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
if (int(l.size()) >= count) return;
// if we didn't have enough nodes in that bucket
// we have to reply with nodes from buckets closer
// to us. i.e. all the buckets in the range
// [0, bucket_index) if we are to include ourself
// or [1, bucket_index) if not.
bucket_t tmpb;
for (int i = (options & include_self)?0:1; i < bucket_index; ++i)
// to us.
table_t::iterator j = i;
++j;
for (; j != m_buckets.end() && l.size() < count; ++j)
{
bucket_t& b = m_buckets[i].first;
bucket_t& b = j->live_nodes;
size_t to_copy = (std::min)(count - l.size(), b.size());
if (options & include_failed)
{
copy(b.begin(), b.end(), std::back_inserter(tmpb));
copy(b.begin(), b.begin() + to_copy
, std::back_inserter(l));
}
else
{
std::remove_copy_if(b.begin(), b.end(), std::back_inserter(tmpb)
std::remove_copy_if(b.begin(), b.begin() + to_copy
, std::back_inserter(l)
, !bind(&node_entry::confirmed, _1));
}
}
if (count - l.size() < tmpb.size())
{
std::random_shuffle(tmpb.begin(), tmpb.end());
size_t to_copy = count - l.size();
std::copy(tmpb.begin(), tmpb.begin() + to_copy, std::back_inserter(l));
}
else
{
std::copy(tmpb.begin(), tmpb.end(), std::back_inserter(l));
}
if (int(l.size()) >= count) return;
TORRENT_ASSERT((int)l.size() <= count);
// if we still don't have enough nodes, copy nodes
// further away from us
// return if we have enough nodes or if the bucket index
// is the biggest index available (there are no more buckets)
// to look in.
if (int(l.size()) == count)
{
TORRENT_ASSERT((options & include_failed)
|| std::count_if(l.begin(), l.end()
, !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
if (i == m_buckets.begin()) return;
j = i;
--j;
for (size_t i = bucket_index + 1; i < m_buckets.size(); ++i)
do
{
bucket_t& b = m_buckets[i].first;
bucket_t& b = j->live_nodes;
size_t to_copy = (std::min)(count - l.size(), b.size());
if (options & include_failed)
@ -540,22 +629,10 @@ void routing_table::find_node(node_id const& target
copy_if_n(b.begin(), b.end(), std::back_inserter(l)
, to_copy, bind(&node_entry::confirmed, _1));
}
TORRENT_ASSERT((int)l.size() <= count);
if (int(l.size()) == count)
{
TORRENT_ASSERT((options & include_failed)
|| std::count_if(l.begin(), l.end()
, !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
while (j != m_buckets.begin() && l.size() < count);
}
TORRENT_ASSERT((int)l.size() <= count);
TORRENT_ASSERT((options & include_failed)
|| std::count_if(l.begin(), l.end()
, !boost::bind(&node_entry::confirmed, _1)) == 0);
}
/*
routing_table::iterator routing_table::begin() const
{
// +1 to avoid ourself
@ -566,6 +643,6 @@ routing_table::iterator routing_table::end() const
{
return iterator(m_buckets.end(), m_buckets.end());
}
*/
} } // namespace libtorrent::dht

View File

@ -271,7 +271,7 @@ void rpc_manager::unreachable(udp::endpoint const& ep)
// defined in node.cpp
void incoming_error(entry& e, char const* msg);
bool rpc_manager::incoming(msg const& m)
bool rpc_manager::incoming(msg const& m, node_id* id)
{
INVARIANT_CHECK;
@ -342,7 +342,11 @@ bool rpc_manager::incoming(msg const& m)
<< tid << " from " << m.addr;
#endif
o->reply(m);
return m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr);
*id = node_id(node_id_ent->string_ptr());
// we found an observer for this reply, hence the node is not spoofing
// add it to the routing table
return m_table.node_seen(*id, m.addr);
}
time_duration rpc_manager::tick()

View File

@ -138,6 +138,16 @@ void add_and_replace(libtorrent::dht::node_id& dst, libtorrent::dht::node_id con
carry = sum > 255;
}
}
void node_push_back(void* userdata, libtorrent::dht::node_entry const& n)
{
using namespace libtorrent::dht;
std::vector<node_entry>* nv = (std::vector<node_entry>*)userdata;
nv->push_back(n);
}
void nop(void* userdata, libtorrent::dht::node_entry const& n) {}
#endif
char upnp_xml[] =
@ -1021,39 +1031,39 @@ int test_main()
// test kademlia routing table
dht_settings s;
node_id id = to_hash("6123456789abcdef01232456789abcdef0123456");
node_id id = to_hash("3123456789abcdef01232456789abcdef0123456");
dht::routing_table table(id, 10, s);
table.node_seen(id, udp::endpoint(address_v4::any(), rand()));
node_id tmp;
node_id diff = to_hash("00001f7459456a9453f8719b09547c11d5f34064");
node_id tmp = id;
node_id diff = to_hash("15764f7459456a9453f8719b09547c11d5f34061");
std::vector<node_entry> nodes;
for (int i = 0; i < 1000; ++i)
for (int i = 0; i < 7000; ++i)
{
table.node_seen(tmp, udp::endpoint(address_v4::any(), rand()));
add_and_replace(tmp, diff);
}
TEST_EQUAL(table.num_active_buckets(), 11);
std::copy(table.begin(), table.end(), std::back_inserter(nodes));
#if defined TORRENT_DHT_VERBOSE_LOGGING || defined TORRENT_DEBUG
table.print_state(std::cerr);
#endif
table.for_each_node(node_push_back, nop, &nodes);
std::cout << "nodes: " << nodes.size() << std::endl;
std::vector<node_entry> temp;
std::generate(tmp.begin(), tmp.end(), &std::rand);
table.find_node(tmp, temp, 0, nodes.size() + 1);
table.find_node(tmp, temp, 0, nodes.size() * 2);
std::cout << "returned: " << temp.size() << std::endl;
TEST_CHECK(temp.size() == nodes.size());
std::generate(tmp.begin(), tmp.end(), &std::rand);
table.find_node(tmp, temp, routing_table::include_self, nodes.size() + 1);
std::cout << "returned: " << temp.size() << std::endl;
TEST_CHECK(temp.size() == nodes.size() + 1);
TEST_EQUAL(temp.size(), nodes.size());
std::generate(tmp.begin(), tmp.end(), &std::rand);
table.find_node(tmp, temp, 0, 7);
std::cout << "returned: " << temp.size() << std::endl;
TEST_CHECK(temp.size() == 7);
TEST_EQUAL(temp.size(), 7);
std::sort(nodes.begin(), nodes.end(), bind(&compare_ref
, bind(&node_entry::id, _1)
@ -1073,7 +1083,7 @@ int test_main()
std::generate(tmp.begin(), tmp.end(), &std::rand);
table.find_node(tmp, temp, 0, 15);
std::cout << "returned: " << temp.size() << std::endl;
TEST_CHECK(temp.size() == (std::min)(15, int(nodes.size())));
TEST_EQUAL(temp.size(), (std::min)(15, int(nodes.size())));
std::sort(nodes.begin(), nodes.end(), bind(&compare_ref
, bind(&node_entry::id, _1)