DHT fixes. fixed bucket refresh issue. re-introduced refreshing our own bucket regularly. added more DHT instrumentation in session_status. added alert for when bootstrap is complete. Fixed bootstrap to ping remaining nodes when search is complete. Completed recent DHT observer rewrite to remove some redundant fields

This commit is contained in:
Arvid Norberg 2010-12-12 03:17:08 +00:00
parent 67d1c57b5e
commit b56a1638ae
16 changed files with 231 additions and 73 deletions

View File

@ -616,6 +616,16 @@ struct has the following members::
int timeouts;
int responses;
int branch_factor;
int nodes_left;
int last_sent;
int first_timeout;
};
struct dht_routing_bucket
{
int num_nodes;
int num_replacements;
int last_active;
};
struct utp_status
@ -671,6 +681,7 @@ struct has the following members::
int dht_torrents;
size_type dht_global_nodes;
std::vector<dht_lookup> active_requests;
std::vector<dht_routing_table> dht_routing_table;
int dht_total_allocations;
utp_status utp_stats;
@ -737,6 +748,9 @@ network.
``active_requests`` is a vector of the currently running DHT lookups.
``dht_routing_table`` contains information about every bucket in the DHT routing
table.
``dht_total_allocations`` is the number of nodes allocated dynamically for a
particular DHT lookup. This represents roughly the amount of memory used
by the DHT.
@ -5480,6 +5494,9 @@ is a bitmask with the following bits:
| | These alerts contain all statistics counters for the interval since |
| | the lasts stats alert. |
+--------------------------------+---------------------------------------------------------------------+
| ``dht_notification`` | Alerts on events in the DHT node. For incoming searches or |
| | bootstrapping being done etc. |
+--------------------------------+---------------------------------------------------------------------+
| ``all_categories`` | The full bitmask, representing all available categories. |
+--------------------------------+---------------------------------------------------------------------+
@ -5520,6 +5537,7 @@ is its synopsis:
ip_block_notification = *implementation defined*,
performance_warning = *implementation defined*,
dht_notification = *implementation defined*,
stats_notification = *implementation defined*,
all_categories = *implementation defined*
};
@ -5851,23 +5869,6 @@ the DHT.
The ``num_peers`` tells how many peers were returned from the tracker. This is
not necessarily all new peers, some of them may already be connected.
dht_reply_alert
---------------
This alert is generated each time the DHT receives peers from a node. ``num_peers``
is the number of peers we received in this packet. Typically these packets are
received from multiple DHT nodes, and so the alerts are typically generated
a few at a time.
::
struct dht_reply_alert: tracker_alert
{
// ...
int num_peers;
};
tracker_warning_alert
---------------------
@ -6559,6 +6560,35 @@ It belongs to the ``dht_notification`` category.
sha1_hash info_hash;
};
dht_reply_alert
---------------
This alert is generated each time the DHT receives peers from a node. ``num_peers``
is the number of peers we received in this packet. Typically these packets are
received from multiple DHT nodes, and so the alerts are typically generated
a few at a time.
::
struct dht_reply_alert: tracker_alert
{
// ...
int num_peers;
};
dht_bootstrap_alert
-------------------
This alert is posted when the initial DHT bootstrap is done. There's no any other
relevant information associated with this alert.
::
struct dht_bootstrap_alert: alert
{
// ...
};
anonymous_mode_alert
--------------------

View File

@ -1556,18 +1556,30 @@ int main(int argc, char* argv[])
#ifndef TORRENT_DISABLE_DHT
if (show_dht_status)
{
snprintf(str, sizeof(str), "DHT nodes: %d DHT cached nodes: %d total DHT size: %"PRId64" total observers: %d\n"
snprintf(str, sizeof(str), "DHT nodes: %d DHT cached nodes: %d "
"total DHT size: %"PRId64" total observers: %d\n"
, sess_stat.dht_nodes, sess_stat.dht_node_cache, sess_stat.dht_global_nodes
, sess_stat.dht_total_allocations);
out += str;
int bucket = 0;
for (std::vector<dht_routing_bucket>::iterator i = sess_stat.dht_routing_table.begin()
, end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket)
{
snprintf(str, sizeof(str)
, "%3d [%2d, %2d] active: %d\n"
, bucket, i->num_nodes, i->num_replacements, i->last_active);
out += str;
}
for (std::vector<dht_lookup>::iterator i = sess_stat.active_requests.begin()
, end(sess_stat.active_requests.end()); i != end; ++i)
{
snprintf(str, sizeof(str)
, " %s in flight: %d [limit: %d] timeouts %d responses %d left %d\n"
, " %s in flight: %d [limit: %d] timeouts: %d responses: %d "
"left: %d last_sent: %d 1st-timeout: %d\n"
, i->type, i->outstanding_requests, i->branch_factor, i->timeouts
, i->responses, i->nodes_left);
, i->responses, i->nodes_left, i->last_sent, i->first_timeout);
out += str;
}
}

View File

@ -1184,6 +1184,16 @@ namespace libtorrent
std::string trackerid;
};
struct TORRENT_EXPORT dht_bootstrap_alert: alert
{
dht_bootstrap_alert() {}
TORRENT_DEFINE_ALERT(dht_bootstrap_alert);
const static int static_category = alert::dht_notification;
virtual std::string message() const;
};
}

View File

@ -160,7 +160,7 @@ public:
: observer(algo, ep, id)
{}
void reply(msg const&) { m_done = true; }
void reply(msg const&) { flags |= flag_done; }
};
struct count_peers

View File

@ -58,7 +58,7 @@ TORRENT_EXPORT void intrusive_ptr_release(observer const*);
// 16 4 4 pool_allocator
// 20 16 4 m_addr
// 36 2 2 m_port
// 38 1 1 m_is_v6, m_short_timeout, m_in_constructor, m_was_sent
// 38 1 1 flags
// 39 1 1 <padding>
// 40
@ -69,14 +69,11 @@ struct observer : boost::noncopyable
observer(boost::intrusive_ptr<traversal_algorithm> const& a
, udp::endpoint const& ep, node_id const& id)
: flags(0)
, m_sent()
: m_sent()
, m_refs(0)
, m_algorithm(a)
, m_id(id)
, m_is_v6(false)
, m_short_timeout(false)
, m_done(false)
, flags(0)
{
TORRENT_ASSERT(a);
#ifdef TORRENT_DEBUG
@ -95,7 +92,7 @@ struct observer : boost::noncopyable
// a few seconds, before the request has timed out
void short_timeout();
bool has_short_timeout() const { return m_short_timeout; }
bool has_short_timeout() const { return flags & flag_short_timeout; }
// this is called when no reply has been received within
// some timeout
@ -129,9 +126,9 @@ struct observer : boost::noncopyable
flag_short_timeout = 8,
flag_failed = 16,
flag_ipv6_address = 32,
flag_alive = 64
flag_alive = 64,
flag_done = 128
};
unsigned char flags;
#ifndef TORRENT_DHT_VERBOSE_LOGGING
protected:
@ -160,15 +157,10 @@ protected:
// the transaction ID for this call
boost::uint16_t m_transaction_id;
bool m_is_v6:1;
bool m_short_timeout:1;
// when true, this observer has reported
// back to the traversal algorithm already
bool m_done:1;
public:
unsigned char flags;
#ifdef TORRENT_DEBUG
public:
bool m_in_constructor:1;
bool m_was_sent:1;
#endif

View File

@ -59,6 +59,20 @@ protected:
virtual bool invoke(observer_ptr o);
};
class bootstrap : public refresh
{
public:
bootstrap(node_impl& node, node_id target
, done_callback const& callback);
virtual char const* name() const;
protected:
virtual void done();
};
} } // namespace libtorrent::dht
#endif // REFRESH_050324_HPP

View File

@ -184,6 +184,15 @@ private:
// the last time need_bootstrap() returned true
mutable ptime m_last_bootstrap;
// the last time the routing table was refreshed.
// this is used to stagger buckets needing refresh
// to be at least 45 seconds apart.
mutable ptime m_last_refresh;
// the last time we refreshed our own bucket
// refreshed every 15 minutes
mutable ptime m_last_self_refresh;
// this is a set of all the endpoints that have
// been identified as router nodes. They will

View File

@ -59,7 +59,7 @@ struct null_observer : public observer
{
null_observer(boost::intrusive_ptr<traversal_algorithm> const& a
, udp::endpoint const& ep, node_id const& id): observer(a, ep, id) {}
virtual void reply(msg const&) { m_done = true; }
virtual void reply(msg const&) { flags |= flag_done; }
};
class routing_table;

View File

@ -50,6 +50,24 @@ namespace libtorrent
int responses;
int branch_factor;
int nodes_left;
// the number of seconds ago the
// last message was sent that's still
// outstanding
int last_sent;
// the number of outstanding requests
// that have exceeded the short timeout
// and are considered timed out in the
// sense that they increased the branch
// factor
int first_timeout;
};
struct dht_routing_bucket
{
int num_nodes;
int num_replacements;
// number of seconds since last activity
int last_active;
};
#endif
@ -114,6 +132,7 @@ namespace libtorrent
int dht_torrents;
size_type dht_global_nodes;
std::vector<dht_lookup> active_requests;
std::vector<dht_routing_bucket> dht_routing_table;
int dht_total_allocations;
#endif

View File

@ -491,5 +491,10 @@ namespace libtorrent {
return "trackerid received: " + trackerid;
}
std::string dht_bootstrap_alert::message() const
{
return "DHT bootstrap complete";
}
} // namespace libtorrent

View File

@ -608,7 +608,9 @@ namespace libtorrent { namespace dht
}
void dht_tracker::on_bootstrap(std::vector<std::pair<node_entry, std::string> > const&)
{}
{
// #error post an alert
}
bool dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags)
{

View File

@ -255,7 +255,7 @@ void node_impl::refresh(node_id const& id
void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
, find_data::nodes_callback const& f)
{
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id, f));
boost::intrusive_ptr<dht::refresh> r(new dht::bootstrap(*this, m_id, f));
#ifdef TORRENT_DHT_VERBOSE_LOGGING
int count = 0;
@ -275,13 +275,7 @@ void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
#endif
r->start();
}
/*
void node_impl::refresh()
{
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id, boost::bind(&nop)));
r->start();
}
*/
int node_impl::bucket_size(int bucket)
{
return m_table.bucket_size(bucket);

View File

@ -79,5 +79,32 @@ bool refresh::invoke(observer_ptr o)
return true;
}
bootstrap::bootstrap(
node_impl& node
, node_id target
, done_callback const& callback)
: refresh(node, target, callback)
{
}
char const* bootstrap::name() const { return "bootstrap"; }
void bootstrap::done()
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "]"
<< " bootstrap done, pinging remaining nodes";
#endif
for (std::vector<observer_ptr>::iterator i = m_results.begin()
, end(m_results.end()); i != end; ++i)
{
if ((*i)->flags & observer::flag_queried) continue;
// this will send a ping
m_node.add_node((*i)->target_ep());
}
refresh::done();
}
} } // namespace libtorrent::dht

View File

@ -61,6 +61,8 @@ routing_table::routing_table(node_id const& id, int bucket_size
, m_settings(settings)
, m_id(id)
, m_last_bootstrap(min_time())
, m_last_refresh(min_time())
, m_last_self_refresh(min_time())
{
}
@ -68,6 +70,18 @@ void routing_table::status(session_status& s) const
{
boost::tie(s.dht_nodes, s.dht_node_cache) = size();
s.dht_global_nodes = num_global_nodes();
ptime now = time_now();
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
dht_routing_bucket b;
b.num_nodes = i->live_nodes.size();
b.num_replacements = i->replacements.size();
b.last_active = total_seconds(now - i->last_active);
s.dht_routing_table.push_back(b);
}
}
boost::tuple<int, int> routing_table::size() const
@ -141,8 +155,8 @@ void routing_table::print_state(std::ostream& os) const
i != end; ++i, ++bucket_index)
{
// if (i->live_nodes.empty()) continue;
os << "=== BUCKET = " << bucket_index
<< " = " << total_seconds(time_now() - i->last_active)
os << "=== BUCKET == " << bucket_index
<< " == " << total_seconds(time_now() - i->last_active)
<< " seconds ago ===== \n";
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
@ -167,13 +181,24 @@ void routing_table::touch_bucket(node_id const& target)
bool routing_table::need_refresh(node_id& target) const
{
ptime now = time_now();
// refresh our own bucket once every 15 minutes
if (now - m_last_self_refresh < minutes(15))
{
m_last_self_refresh = now;
target = m_id;
return true;
}
if (m_buckets.empty()) return false;
table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end()
, boost::bind(&routing_table_node::last_active, _1)
< boost::bind(&routing_table_node::last_active, _2));
if (time_now() - i->last_active < minutes(15)) return false;
if (now - i->last_active < minutes(15)) return false;
if (now - m_last_refresh < seconds(45)) return false;
// generate a random node_id within the given bucket
target = generate_id(address());
@ -195,6 +220,9 @@ bool routing_table::need_refresh(node_id& target) const
(~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
TORRENT_ASSERT(distance_exp(m_id, target) == 160 - num_bits);
TORRENT_LOG(table) << "need_refresh [ bucket: " << num_bits << " target: " << target << " ]";
m_last_refresh = now;
return true;
}
@ -216,6 +244,7 @@ routing_table::table_t::iterator routing_table::find_bucket(node_id const& id)
if (num_buckets == 0)
{
m_buckets.push_back(routing_table_node());
m_buckets.back().last_active = min_time();
++num_buckets;
}
@ -368,6 +397,7 @@ bool routing_table::add_node(node_entry const& e)
// this is the last bucket, and it's full already. Split
// it by adding another bucket
m_buckets.push_back(routing_table_node());
m_buckets.back().last_active = min_time();
bucket_t& new_bucket = m_buckets.back().live_nodes;
bucket_t& new_replacement_bucket = m_buckets.back().replacements;

View File

@ -97,13 +97,13 @@ void observer::set_target(udp::endpoint const& ep)
#if TORRENT_USE_IPV6
if (ep.address().is_v6())
{
m_is_v6 = true;
flags |= flag_ipv6_address;
m_addr.v6 = ep.address().to_v6().to_bytes();
}
else
#endif
{
m_is_v6 = false;
flags &= ~flag_ipv6_address;
m_addr.v4 = ep.address().to_v4().to_bytes();
}
}
@ -111,7 +111,7 @@ void observer::set_target(udp::endpoint const& ep)
address observer::target_addr() const
{
#if TORRENT_USE_IPV6
if (m_is_v6)
if (flags & flag_ipv6_address)
return address_v6(m_addr.v6);
else
#endif
@ -125,23 +125,21 @@ udp::endpoint observer::target_ep() const
void observer::abort()
{
if (m_done) return;
m_done = true;
if (flags & flag_done) return;
flags |= flag_done;
m_algorithm->failed(observer_ptr(this), traversal_algorithm::prevent_request);
}
void observer::done()
{
if (m_done) return;
m_done = true;
if (flags & flag_done) return;
flags |= flag_done;
m_algorithm->finished(observer_ptr(this));
}
void observer::short_timeout()
{
if (m_short_timeout) return;
TORRENT_ASSERT(m_short_timeout == false);
m_short_timeout = true;
if (flags & flag_short_timeout) return;
m_algorithm->failed(observer_ptr(this), traversal_algorithm::short_timeout);
}
@ -149,8 +147,8 @@ void observer::short_timeout()
// some timeout
void observer::timeout()
{
if (m_done) return;
m_done = true;
if (flags & flag_done) return;
flags |= flag_done;
m_algorithm->failed(observer_ptr(this));
}
@ -487,7 +485,7 @@ observer::~observer()
// reported back to the traversal_algorithm as
// well. If it wasn't sent, it cannot have been
// reported back
TORRENT_ASSERT(m_was_sent == m_done);
TORRENT_ASSERT(m_was_sent == bool(flags & flag_done));
TORRENT_ASSERT(!m_in_constructor);
}

View File

@ -32,6 +32,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/pch.hpp"
#include "libtorrent/time.hpp" // for total_seconds
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
@ -63,8 +65,8 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
if (ptr == 0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "[" << this << "] failed to "
"allocate memory for observer. aborting!";
TORRENT_LOG(traversal) << "[" << this << ":" << name()
<< "] failed to allocate memory for observer. aborting!";
#endif
done();
return;
@ -95,7 +97,8 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end()
, boost::bind(&observer::id, _1) == id) == m_results.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr;
TORRENT_LOG(traversal) << "[" << this << ":" << name()
<< "] adding result: " << id << " " << addr;
#endif
i = m_results.insert(i, o);
}
@ -126,8 +129,8 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
if (id.is_all_zeros())
TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING: "
"node returned a list which included a node with id 0";
TORRENT_LOG(traversal) << time_now_string() << "[" << this << ":" << name()
<< "] WARNING: node returned a list which included a node with id 0";
#endif
add_entry(id, addr, 0);
}
@ -146,6 +149,7 @@ void traversal_algorithm::finished(observer_ptr o)
if (o->flags & observer::flag_short_timeout)
--m_branch_factor;
TORRENT_ASSERT(o->flags & observer::flag_queried);
o->flags |= observer::flag_alive;
++m_responses;
@ -177,7 +181,8 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
++m_branch_factor;
o->flags |= observer::flag_short_timeout;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "] first chance timeout: "
TORRENT_LOG(traversal) << " [" << this << ":" << name()
<< "] first chance timeout: "
<< o->id() << " " << o->target_ep()
<< " branch-factor: " << m_branch_factor
<< " invoke-count: " << m_invoke_count;
@ -192,8 +197,8 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
--m_branch_factor;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "] failed: "
<< o->id() << " " << o->target_ep()
TORRENT_LOG(traversal) << " [" << this << ":" << name()
<< "] failed: " << o->id() << " " << o->target_ep()
<< " branch-factor: " << m_branch_factor
<< " invoke-count: " << m_invoke_count;
#endif
@ -243,7 +248,7 @@ void traversal_algorithm::add_requests()
if ((*i)->flags & observer::flag_queried) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "]"
TORRENT_LOG(traversal) << " [" << this << ":" << name() << "]"
<< " nodes-left: " << (m_results.end() - i)
<< " invoke-count: " << m_invoke_count
<< " branch-factor: " << m_branch_factor;
@ -292,12 +297,23 @@ void traversal_algorithm::status(dht_lookup& l)
l.branch_factor = m_branch_factor;
l.type = name();
l.nodes_left = 0;
l.first_timeout = 0;
int last_sent = INT_MAX;
ptime now = time_now();
for (std::vector<observer_ptr>::iterator i = m_results.begin()
, end(m_results.end()); i != end; ++i)
{
if ((*i)->flags & observer::flag_queried) continue;
observer& o = **i;
if (o.flags & observer::flag_queried)
{
last_sent = (std::min)(last_sent, total_seconds(now - o.sent()));
if (o.has_short_timeout()) ++l.first_timeout;
continue;
}
++l.nodes_left;
}
l.last_sent = last_sent;
}
} } // namespace libtorrent::dht