diff --git a/docs/manual.rst b/docs/manual.rst index 2b3fc810c..3dc64d1d4 100644 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -616,6 +616,16 @@ struct has the following members:: int timeouts; int responses; int branch_factor; + int nodes_left; + int last_sent; + int first_timeout; + }; + + struct dht_routing_bucket + { + int num_nodes; + int num_replacements; + int last_active; }; struct utp_status @@ -671,6 +681,7 @@ struct has the following members:: int dht_torrents; size_type dht_global_nodes; std::vector active_requests; + std::vector dht_routing_table; int dht_total_allocations; utp_status utp_stats; @@ -737,6 +748,9 @@ network. ``active_requests`` is a vector of the currently running DHT lookups. +``dht_routing_table`` contains information about every bucket in the DHT routing +table. + ``dht_total_allocations`` is the number of nodes allocated dynamically for a particular DHT lookup. This represents roughly the amount of memory used by the DHT. @@ -5480,6 +5494,9 @@ is a bitmask with the following bits: | | These alerts contain all statistics counters for the interval since | | | the lasts stats alert. | +--------------------------------+---------------------------------------------------------------------+ +| ``dht_notification`` | Alerts on events in the DHT node. For incoming searches or | +| | bootstrapping being done etc. | ++--------------------------------+---------------------------------------------------------------------+ | ``all_categories`` | The full bitmask, representing all available categories. | +--------------------------------+---------------------------------------------------------------------+ @@ -5520,6 +5537,7 @@ is its synopsis: ip_block_notification = *implementation defined*, performance_warning = *implementation defined*, dht_notification = *implementation defined*, + stats_notification = *implementation defined*, all_categories = *implementation defined* }; @@ -5851,23 +5869,6 @@ the DHT. The ``num_peers`` tells how many peers were returned from the tracker. This is not necessarily all new peers, some of them may already be connected. -dht_reply_alert ---------------- - -This alert is generated each time the DHT receives peers from a node. ``num_peers`` -is the number of peers we received in this packet. Typically these packets are -received from multiple DHT nodes, and so the alerts are typically generated -a few at a time. - -:: - - struct dht_reply_alert: tracker_alert - { - // ... - int num_peers; - }; - - tracker_warning_alert --------------------- @@ -6559,6 +6560,35 @@ It belongs to the ``dht_notification`` category. sha1_hash info_hash; }; +dht_reply_alert +--------------- + +This alert is generated each time the DHT receives peers from a node. ``num_peers`` +is the number of peers we received in this packet. Typically these packets are +received from multiple DHT nodes, and so the alerts are typically generated +a few at a time. + +:: + + struct dht_reply_alert: tracker_alert + { + // ... + int num_peers; + }; + +dht_bootstrap_alert +------------------- + +This alert is posted when the initial DHT bootstrap is done. There's no any other +relevant information associated with this alert. + +:: + + struct dht_bootstrap_alert: alert + { + // ... + }; + anonymous_mode_alert -------------------- diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 1355ec196..d65ee89ea 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1556,18 +1556,30 @@ int main(int argc, char* argv[]) #ifndef TORRENT_DISABLE_DHT if (show_dht_status) { - snprintf(str, sizeof(str), "DHT nodes: %d DHT cached nodes: %d total DHT size: %"PRId64" total observers: %d\n" + snprintf(str, sizeof(str), "DHT nodes: %d DHT cached nodes: %d " + "total DHT size: %"PRId64" total observers: %d\n" , sess_stat.dht_nodes, sess_stat.dht_node_cache, sess_stat.dht_global_nodes , sess_stat.dht_total_allocations); out += str; + int bucket = 0; + for (std::vector::iterator i = sess_stat.dht_routing_table.begin() + , end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket) + { + snprintf(str, sizeof(str) + , "%3d [%2d, %2d] active: %d\n" + , bucket, i->num_nodes, i->num_replacements, i->last_active); + out += str; + } + for (std::vector::iterator i = sess_stat.active_requests.begin() , end(sess_stat.active_requests.end()); i != end; ++i) { snprintf(str, sizeof(str) - , " %s in flight: %d [limit: %d] timeouts %d responses %d left %d\n" + , " %s in flight: %d [limit: %d] timeouts: %d responses: %d " + "left: %d last_sent: %d 1st-timeout: %d\n" , i->type, i->outstanding_requests, i->branch_factor, i->timeouts - , i->responses, i->nodes_left); + , i->responses, i->nodes_left, i->last_sent, i->first_timeout); out += str; } } diff --git a/include/libtorrent/alert_types.hpp b/include/libtorrent/alert_types.hpp index 212e23aca..1902fcc72 100644 --- a/include/libtorrent/alert_types.hpp +++ b/include/libtorrent/alert_types.hpp @@ -1184,6 +1184,16 @@ namespace libtorrent std::string trackerid; }; + struct TORRENT_EXPORT dht_bootstrap_alert: alert + { + dht_bootstrap_alert() {} + + TORRENT_DEFINE_ALERT(dht_bootstrap_alert); + + const static int static_category = alert::dht_notification; + virtual std::string message() const; + }; + } diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 5ba4e48fa..7bca2a4bc 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -160,7 +160,7 @@ public: : observer(algo, ep, id) {} - void reply(msg const&) { m_done = true; } + void reply(msg const&) { flags |= flag_done; } }; struct count_peers diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index a1cfe1bfe..9739eab7d 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -58,7 +58,7 @@ TORRENT_EXPORT void intrusive_ptr_release(observer const*); // 16 4 4 pool_allocator // 20 16 4 m_addr // 36 2 2 m_port -// 38 1 1 m_is_v6, m_short_timeout, m_in_constructor, m_was_sent +// 38 1 1 flags // 39 1 1 // 40 @@ -69,14 +69,11 @@ struct observer : boost::noncopyable observer(boost::intrusive_ptr const& a , udp::endpoint const& ep, node_id const& id) - : flags(0) - , m_sent() + : m_sent() , m_refs(0) , m_algorithm(a) , m_id(id) - , m_is_v6(false) - , m_short_timeout(false) - , m_done(false) + , flags(0) { TORRENT_ASSERT(a); #ifdef TORRENT_DEBUG @@ -95,7 +92,7 @@ struct observer : boost::noncopyable // a few seconds, before the request has timed out void short_timeout(); - bool has_short_timeout() const { return m_short_timeout; } + bool has_short_timeout() const { return flags & flag_short_timeout; } // this is called when no reply has been received within // some timeout @@ -129,9 +126,9 @@ struct observer : boost::noncopyable flag_short_timeout = 8, flag_failed = 16, flag_ipv6_address = 32, - flag_alive = 64 + flag_alive = 64, + flag_done = 128 }; - unsigned char flags; #ifndef TORRENT_DHT_VERBOSE_LOGGING protected: @@ -160,15 +157,10 @@ protected: // the transaction ID for this call boost::uint16_t m_transaction_id; - - bool m_is_v6:1; - bool m_short_timeout:1; - // when true, this observer has reported - // back to the traversal algorithm already - bool m_done:1; +public: + unsigned char flags; #ifdef TORRENT_DEBUG -public: bool m_in_constructor:1; bool m_was_sent:1; #endif diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index c30476a32..80bec806b 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -59,6 +59,20 @@ protected: virtual bool invoke(observer_ptr o); }; +class bootstrap : public refresh +{ +public: + bootstrap(node_impl& node, node_id target + , done_callback const& callback); + + virtual char const* name() const; + +protected: + + virtual void done(); + +}; + } } // namespace libtorrent::dht #endif // REFRESH_050324_HPP diff --git a/include/libtorrent/kademlia/routing_table.hpp b/include/libtorrent/kademlia/routing_table.hpp index 3ed4a4f70..c3f2cb06f 100644 --- a/include/libtorrent/kademlia/routing_table.hpp +++ b/include/libtorrent/kademlia/routing_table.hpp @@ -184,6 +184,15 @@ private: // the last time need_bootstrap() returned true mutable ptime m_last_bootstrap; + + // the last time the routing table was refreshed. + // this is used to stagger buckets needing refresh + // to be at least 45 seconds apart. + mutable ptime m_last_refresh; + + // the last time we refreshed our own bucket + // refreshed every 15 minutes + mutable ptime m_last_self_refresh; // this is a set of all the endpoints that have // been identified as router nodes. They will diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 7fcfad494..a1d190f27 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -59,7 +59,7 @@ struct null_observer : public observer { null_observer(boost::intrusive_ptr const& a , udp::endpoint const& ep, node_id const& id): observer(a, ep, id) {} - virtual void reply(msg const&) { m_done = true; } + virtual void reply(msg const&) { flags |= flag_done; } }; class routing_table; diff --git a/include/libtorrent/session_status.hpp b/include/libtorrent/session_status.hpp index 188951503..347d54be2 100644 --- a/include/libtorrent/session_status.hpp +++ b/include/libtorrent/session_status.hpp @@ -50,6 +50,24 @@ namespace libtorrent int responses; int branch_factor; int nodes_left; + // the number of seconds ago the + // last message was sent that's still + // outstanding + int last_sent; + // the number of outstanding requests + // that have exceeded the short timeout + // and are considered timed out in the + // sense that they increased the branch + // factor + int first_timeout; + }; + + struct dht_routing_bucket + { + int num_nodes; + int num_replacements; + // number of seconds since last activity + int last_active; }; #endif @@ -114,6 +132,7 @@ namespace libtorrent int dht_torrents; size_type dht_global_nodes; std::vector active_requests; + std::vector dht_routing_table; int dht_total_allocations; #endif diff --git a/src/alert.cpp b/src/alert.cpp index 938a19529..c3a4bb193 100644 --- a/src/alert.cpp +++ b/src/alert.cpp @@ -491,5 +491,10 @@ namespace libtorrent { return "trackerid received: " + trackerid; } + std::string dht_bootstrap_alert::message() const + { + return "DHT bootstrap complete"; + } + } // namespace libtorrent diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index e26791263..8897a396b 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -608,7 +608,9 @@ namespace libtorrent { namespace dht } void dht_tracker::on_bootstrap(std::vector > const&) - {} + { + // #error post an alert + } bool dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags) { diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 3bd5c9cbd..4acc02b07 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -255,7 +255,7 @@ void node_impl::refresh(node_id const& id void node_impl::bootstrap(std::vector const& nodes , find_data::nodes_callback const& f) { - boost::intrusive_ptr r(new dht::refresh(*this, m_id, f)); + boost::intrusive_ptr r(new dht::bootstrap(*this, m_id, f)); #ifdef TORRENT_DHT_VERBOSE_LOGGING int count = 0; @@ -275,13 +275,7 @@ void node_impl::bootstrap(std::vector const& nodes #endif r->start(); } -/* -void node_impl::refresh() -{ - boost::intrusive_ptr r(new dht::refresh(*this, m_id, boost::bind(&nop))); - r->start(); -} -*/ + int node_impl::bucket_size(int bucket) { return m_table.bucket_size(bucket); diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 54363189a..fda33f0c3 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -79,5 +79,32 @@ bool refresh::invoke(observer_ptr o) return true; } +bootstrap::bootstrap( + node_impl& node + , node_id target + , done_callback const& callback) + : refresh(node, target, callback) +{ +} + +char const* bootstrap::name() const { return "bootstrap"; } + +void bootstrap::done() +{ +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << " [" << this << "]" + << " bootstrap done, pinging remaining nodes"; +#endif + + for (std::vector::iterator i = m_results.begin() + , end(m_results.end()); i != end; ++i) + { + if ((*i)->flags & observer::flag_queried) continue; + // this will send a ping + m_node.add_node((*i)->target_ep()); + } + refresh::done(); +} + } } // namespace libtorrent::dht diff --git a/src/kademlia/routing_table.cpp b/src/kademlia/routing_table.cpp index 40d27206f..580326554 100644 --- a/src/kademlia/routing_table.cpp +++ b/src/kademlia/routing_table.cpp @@ -61,6 +61,8 @@ routing_table::routing_table(node_id const& id, int bucket_size , m_settings(settings) , m_id(id) , m_last_bootstrap(min_time()) + , m_last_refresh(min_time()) + , m_last_self_refresh(min_time()) { } @@ -68,6 +70,18 @@ void routing_table::status(session_status& s) const { boost::tie(s.dht_nodes, s.dht_node_cache) = size(); s.dht_global_nodes = num_global_nodes(); + + ptime now = time_now(); + + for (table_t::const_iterator i = m_buckets.begin() + , end(m_buckets.end()); i != end; ++i) + { + dht_routing_bucket b; + b.num_nodes = i->live_nodes.size(); + b.num_replacements = i->replacements.size(); + b.last_active = total_seconds(now - i->last_active); + s.dht_routing_table.push_back(b); + } } boost::tuple routing_table::size() const @@ -141,8 +155,8 @@ void routing_table::print_state(std::ostream& os) const i != end; ++i, ++bucket_index) { // if (i->live_nodes.empty()) continue; - os << "=== BUCKET = " << bucket_index - << " = " << total_seconds(time_now() - i->last_active) + os << "=== BUCKET == " << bucket_index + << " == " << total_seconds(time_now() - i->last_active) << " seconds ago ===== \n"; for (bucket_t::const_iterator j = i->live_nodes.begin() , end(i->live_nodes.end()); j != end; ++j) @@ -167,13 +181,24 @@ void routing_table::touch_bucket(node_id const& target) bool routing_table::need_refresh(node_id& target) const { + ptime now = time_now(); + + // refresh our own bucket once every 15 minutes + if (now - m_last_self_refresh < minutes(15)) + { + m_last_self_refresh = now; + target = m_id; + return true; + } + if (m_buckets.empty()) return false; table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end() , boost::bind(&routing_table_node::last_active, _1) < boost::bind(&routing_table_node::last_active, _2)); - if (time_now() - i->last_active < minutes(15)) return false; + if (now - i->last_active < minutes(15)) return false; + if (now - m_last_refresh < seconds(45)) return false; // generate a random node_id within the given bucket target = generate_id(address()); @@ -195,6 +220,9 @@ bool routing_table::need_refresh(node_id& target) const (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8)); TORRENT_ASSERT(distance_exp(m_id, target) == 160 - num_bits); + + TORRENT_LOG(table) << "need_refresh [ bucket: " << num_bits << " target: " << target << " ]"; + m_last_refresh = now; return true; } @@ -216,6 +244,7 @@ routing_table::table_t::iterator routing_table::find_bucket(node_id const& id) if (num_buckets == 0) { m_buckets.push_back(routing_table_node()); + m_buckets.back().last_active = min_time(); ++num_buckets; } @@ -368,6 +397,7 @@ bool routing_table::add_node(node_entry const& e) // this is the last bucket, and it's full already. Split // it by adding another bucket m_buckets.push_back(routing_table_node()); + m_buckets.back().last_active = min_time(); bucket_t& new_bucket = m_buckets.back().live_nodes; bucket_t& new_replacement_bucket = m_buckets.back().replacements; diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 3d30903b0..bd91dcb2b 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -97,13 +97,13 @@ void observer::set_target(udp::endpoint const& ep) #if TORRENT_USE_IPV6 if (ep.address().is_v6()) { - m_is_v6 = true; + flags |= flag_ipv6_address; m_addr.v6 = ep.address().to_v6().to_bytes(); } else #endif { - m_is_v6 = false; + flags &= ~flag_ipv6_address; m_addr.v4 = ep.address().to_v4().to_bytes(); } } @@ -111,7 +111,7 @@ void observer::set_target(udp::endpoint const& ep) address observer::target_addr() const { #if TORRENT_USE_IPV6 - if (m_is_v6) + if (flags & flag_ipv6_address) return address_v6(m_addr.v6); else #endif @@ -125,23 +125,21 @@ udp::endpoint observer::target_ep() const void observer::abort() { - if (m_done) return; - m_done = true; + if (flags & flag_done) return; + flags |= flag_done; m_algorithm->failed(observer_ptr(this), traversal_algorithm::prevent_request); } void observer::done() { - if (m_done) return; - m_done = true; + if (flags & flag_done) return; + flags |= flag_done; m_algorithm->finished(observer_ptr(this)); } void observer::short_timeout() { - if (m_short_timeout) return; - TORRENT_ASSERT(m_short_timeout == false); - m_short_timeout = true; + if (flags & flag_short_timeout) return; m_algorithm->failed(observer_ptr(this), traversal_algorithm::short_timeout); } @@ -149,8 +147,8 @@ void observer::short_timeout() // some timeout void observer::timeout() { - if (m_done) return; - m_done = true; + if (flags & flag_done) return; + flags |= flag_done; m_algorithm->failed(observer_ptr(this)); } @@ -487,7 +485,7 @@ observer::~observer() // reported back to the traversal_algorithm as // well. If it wasn't sent, it cannot have been // reported back - TORRENT_ASSERT(m_was_sent == m_done); + TORRENT_ASSERT(m_was_sent == bool(flags & flag_done)); TORRENT_ASSERT(!m_in_constructor); } diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 7e5c45ce4..96befd7a3 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -32,6 +32,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/pch.hpp" +#include "libtorrent/time.hpp" // for total_seconds + #include #include #include @@ -63,8 +65,8 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig if (ptr == 0) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] failed to " - "allocate memory for observer. aborting!"; + TORRENT_LOG(traversal) << "[" << this << ":" << name() + << "] failed to allocate memory for observer. aborting!"; #endif done(); return; @@ -95,7 +97,8 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end() , boost::bind(&observer::id, _1) == id) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr; + TORRENT_LOG(traversal) << "[" << this << ":" << name() + << "] adding result: " << id << " " << addr; #endif i = m_results.insert(i, o); } @@ -126,8 +129,8 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) { #ifdef TORRENT_DHT_VERBOSE_LOGGING if (id.is_all_zeros()) - TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING: " - "node returned a list which included a node with id 0"; + TORRENT_LOG(traversal) << time_now_string() << "[" << this << ":" << name() + << "] WARNING: node returned a list which included a node with id 0"; #endif add_entry(id, addr, 0); } @@ -146,6 +149,7 @@ void traversal_algorithm::finished(observer_ptr o) if (o->flags & observer::flag_short_timeout) --m_branch_factor; + TORRENT_ASSERT(o->flags & observer::flag_queried); o->flags |= observer::flag_alive; ++m_responses; @@ -177,7 +181,8 @@ void traversal_algorithm::failed(observer_ptr o, int flags) ++m_branch_factor; o->flags |= observer::flag_short_timeout; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] first chance timeout: " + TORRENT_LOG(traversal) << " [" << this << ":" << name() + << "] first chance timeout: " << o->id() << " " << o->target_ep() << " branch-factor: " << m_branch_factor << " invoke-count: " << m_invoke_count; @@ -192,8 +197,8 @@ void traversal_algorithm::failed(observer_ptr o, int flags) --m_branch_factor; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] failed: " - << o->id() << " " << o->target_ep() + TORRENT_LOG(traversal) << " [" << this << ":" << name() + << "] failed: " << o->id() << " " << o->target_ep() << " branch-factor: " << m_branch_factor << " invoke-count: " << m_invoke_count; #endif @@ -243,7 +248,7 @@ void traversal_algorithm::add_requests() if ((*i)->flags & observer::flag_queried) continue; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "]" + TORRENT_LOG(traversal) << " [" << this << ":" << name() << "]" << " nodes-left: " << (m_results.end() - i) << " invoke-count: " << m_invoke_count << " branch-factor: " << m_branch_factor; @@ -292,12 +297,23 @@ void traversal_algorithm::status(dht_lookup& l) l.branch_factor = m_branch_factor; l.type = name(); l.nodes_left = 0; + l.first_timeout = 0; + + int last_sent = INT_MAX; + ptime now = time_now(); for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end; ++i) { - if ((*i)->flags & observer::flag_queried) continue; + observer& o = **i; + if (o.flags & observer::flag_queried) + { + last_sent = (std::min)(last_sent, total_seconds(now - o.sent())); + if (o.has_short_timeout()) ++l.first_timeout; + continue; + } ++l.nodes_left; } + l.last_sent = last_sent; } } } // namespace libtorrent::dht