From d3a8916f022001ce102f8f4a85e221cf01a4b77a Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 5 Nov 2010 19:06:50 +0000 Subject: [PATCH] further simplify the DHT by getting rid of the result type and just using observers instead. This might save some memory and should fix the assert that would be triggered before --- include/libtorrent/kademlia/find_data.hpp | 8 +- include/libtorrent/kademlia/node.hpp | 5 +- include/libtorrent/kademlia/observer.hpp | 24 ++- include/libtorrent/kademlia/refresh.hpp | 3 +- include/libtorrent/kademlia/rpc_manager.hpp | 3 +- .../kademlia/traversal_algorithm.hpp | 70 ++----- parse_dht_log.py | 3 + src/kademlia/dht_tracker.cpp | 2 +- src/kademlia/find_data.cpp | 49 ++--- src/kademlia/node.cpp | 8 +- src/kademlia/refresh.cpp | 24 +-- src/kademlia/rpc_manager.cpp | 16 +- src/kademlia/traversal_algorithm.cpp | 186 ++++++++++-------- 13 files changed, 197 insertions(+), 204 deletions(-) diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 016bb875f..af7db2840 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -79,7 +79,8 @@ public: protected: void done(); - virtual bool invoke(udp::endpoint addr); + observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); private: @@ -95,8 +96,9 @@ class find_data_observer : public observer { public: find_data_observer( - boost::intrusive_ptr const& algorithm) - : observer(algorithm) + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id) + : observer(algorithm, ep, id) {} void reply(msg const&); }; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index d3c7ee02d..4f8089123 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -154,8 +154,9 @@ struct null_type {}; class announce_observer : public observer { public: - announce_observer(boost::intrusive_ptr const& algo) - : observer(algo) + announce_observer(boost::intrusive_ptr const& algo + , udp::endpoint const& ep, node_id const& id) + : observer(algo, ep, id) {} void reply(msg const&) { m_done = true; } diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index f6f8db707..a1cfe1bfe 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -67,10 +67,13 @@ struct observer : boost::noncopyable friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); friend TORRENT_EXPORT void intrusive_ptr_release(observer const*); - observer(boost::intrusive_ptr const& a) - : m_sent() + observer(boost::intrusive_ptr const& a + , udp::endpoint const& ep, node_id const& id) + : flags(0) + , m_sent() , m_refs(0) , m_algorithm(a) + , m_id(id) , m_is_v6(false) , m_short_timeout(false) , m_done(false) @@ -80,6 +83,7 @@ struct observer : boost::noncopyable m_in_constructor = true; m_was_sent = false; #endif + set_target(ep); } virtual ~observer(); @@ -109,12 +113,26 @@ struct observer : boost::noncopyable address target_addr() const; udp::endpoint target_ep() const; + void set_id(node_id const& id) { m_id = id; } + node_id const& id() const { return m_id; } + void set_transaction_id(boost::uint16_t tid) { m_transaction_id = tid; } boost::uint16_t transaction_id() const { return m_transaction_id; } + enum { + flag_queried = 1, + flag_initial = 2, + flag_no_id = 4, + flag_short_timeout = 8, + flag_failed = 16, + flag_ipv6_address = 32, + flag_alive = 64 + }; + unsigned char flags; + #ifndef TORRENT_DHT_VERBOSE_LOGGING protected: #endif @@ -128,6 +146,8 @@ protected: const boost::intrusive_ptr m_algorithm; + node_id m_id; + TORRENT_UNION addr_t { #if TORRENT_USE_IPV6 diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index fc29f3014..c30476a32 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -55,7 +55,8 @@ public: protected: - virtual bool invoke(udp::endpoint addr); + observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 2089d4634..47e2a157f 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -55,7 +55,8 @@ TORRENT_DECLARE_LOG(rpc); struct null_observer : public observer { - null_observer(boost::intrusive_ptr& a): observer(a) {} + null_observer(boost::intrusive_ptr const& a + , udp::endpoint const& ep, node_id const& id): observer(a, ep, id) {} virtual void reply(msg const&) { m_done = true; } }; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 0cb0f2cd5..6ad7cde8d 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -60,10 +61,10 @@ class node_impl; struct traversal_algorithm : boost::noncopyable { void traverse(node_id const& id, udp::endpoint addr); - void finished(udp::endpoint const& ep); + void finished(observer_ptr o); enum flags_t { prevent_request = 1, short_timeout = 2 }; - void failed(udp::endpoint const& ep, int flags = 0); + void failed(observer_ptr o, int flags = 0); virtual ~traversal_algorithm(); boost::pool<>& allocator() const; void status(dht_lookup& l); @@ -75,60 +76,6 @@ struct traversal_algorithm : boost::noncopyable void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags); - struct result - { - result(node_id const& id, udp::endpoint ep, unsigned char f = 0) - : id(id), flags(f) - { -#if TORRENT_USE_IPV6 - if (ep.address().is_v6()) - { - flags |= ipv6_address; - addr.v6 = ep.address().to_v6().to_bytes(); - } - else -#endif - { - flags &= ~ipv6_address; - addr.v4 = ep.address().to_v4().to_bytes(); - } - port = ep.port(); - } - - udp::endpoint endpoint() const - { -#if TORRENT_USE_IPV6 - if (flags & ipv6_address) - return udp::endpoint(address_v6(addr.v6), port); - else -#endif - return udp::endpoint(address_v4(addr.v4), port); - } - - node_id id; - - TORRENT_UNION addr_t - { - address_v4::bytes_type v4; -#if TORRENT_USE_IPV6 - address_v6::bytes_type v6; -#endif - } addr; - - boost::uint16_t port; - - enum { - queried = 1, - initial = 2, - no_id = 4, - short_timeout = 8, - failed = 16, - ipv6_address = 32, - alive = 64 - }; - unsigned char flags; - }; - traversal_algorithm( node_impl& node , node_id target) @@ -151,8 +98,13 @@ protected: void add_router_entries(); void init(); - virtual void done() {} - virtual bool invoke(udp::endpoint addr) { return false; } + virtual void done(); + // should construct an algorithm dependent + // observer in ptr. + virtual observer_ptr new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id); + + virtual bool invoke(observer_ptr o) { return false; } friend void intrusive_ptr_add_ref(traversal_algorithm* p) { @@ -169,7 +121,7 @@ protected: node_impl& m_node; node_id m_target; - std::vector m_results; + std::vector m_results; int m_invoke_count; int m_branch_factor; int m_responses; diff --git a/parse_dht_log.py b/parse_dht_log.py index 9303e8c86..0e6063d06 100755 --- a/parse_dht_log.py +++ b/parse_dht_log.py @@ -37,11 +37,14 @@ for line in f: print line.split(' ') out = open('dht_announce_distribution.dat', 'w+') +print 'announce distribution items: %d' % len(announce_histogram) for k,v in announce_histogram.items(): print >>out, '%d %d' % (k, v) + print '%d %d' % (k, v) out.close() out = open('dht_node_uptime_distribution.dat', 'w+') +print 'node uptimes: %d' % len(node_uptime_histogram) for k,v in node_uptime_histogram.items(): print >>out, '%d %d' % (k + up_time_quanta/2, v) out.close() diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 85d3e23d5..e1e6cb170 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -623,7 +623,7 @@ namespace libtorrent { namespace dht #ifdef TORRENT_DHT_VERBOSE_LOGGING std::stringstream log_line; lazy_entry print; - int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, 0, ec); + int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, ec); TORRENT_ASSERT(ret == 0); log_line << print_entry(print, true); #endif diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 0e93ebb9a..31ba7ec71 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -172,7 +172,7 @@ void find_data_observer::reply(msg const& m) void add_entry_fun(void* userdata, node_entry const& e) { traversal_algorithm* f = (traversal_algorithm*)userdata; - f->add_entry(e.id, e.ep(), traversal_algorithm::result::initial); + f->add_entry(e.id, e.ep(), observer::flag_initial); } find_data::find_data( @@ -190,7 +190,17 @@ find_data::find_data( node.m_table.for_each_node(&add_entry_fun, 0, (traversal_algorithm*)this); } -bool find_data::invoke(udp::endpoint addr) +observer_ptr find_data::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) find_data_observer(this, ep, id)); +#ifdef TORRENT_DEBUG + o->m_in_constructor = false; +#endif + return o; +} + +bool find_data::invoke(observer_ptr o) { if (m_done) { @@ -198,28 +208,12 @@ bool find_data::invoke(udp::endpoint addr) return false; } - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] failed to " - "allocate memory for observer. aborting!"; -#endif - done(); - return false; - } - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) find_data_observer(this)); -#ifdef TORRENT_DEBUG - o->m_in_constructor = false; -#endif entry e; e["y"] = "q"; e["q"] = "get_peers"; entry& a = e["a"]; a["info_hash"] = m_target.to_string(); - return m_node.m_rpc.invoke(e, addr, o); + return m_node.m_rpc.invoke(e, o->target_ep(), o); } void find_data::got_peers(std::vector const& peers) @@ -234,19 +228,26 @@ void find_data::done() m_done = true; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] get_peers DONE"; +#endif + std::vector > results; int num_results = m_node.m_table.bucket_size(); - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end && num_results > 0; ++i) { - if (i->flags & result::no_id) continue; - if ((i->flags & result::queried) == 0) continue; - std::map::iterator j = m_write_tokens.find(i->id); + observer_ptr const& o = *i; + if (o->flags & observer::flag_no_id) continue; + if ((o->flags & observer::flag_queried) == 0) continue; + std::map::iterator j = m_write_tokens.find(o->id()); if (j == m_write_tokens.end()) continue; - results.push_back(std::make_pair(node_entry(i->id, i->endpoint()), j->second)); + results.push_back(std::make_pair(node_entry(o->id(), o->target_ep()), j->second)); --num_results; } m_nodes_callback(results, m_got_peers); + + traversal_algorithm::done(); } } } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 17836d3f3..bbe03f456 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -267,7 +267,7 @@ void node_impl::bootstrap(std::vector const& nodes #ifdef TORRENT_DHT_VERBOSE_LOGGING ++count; #endif - r->add_entry(node_id(0), *i, traversal_algorithm::result::initial); + r->add_entry(node_id(0), *i, observer::flag_initial); } #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -369,7 +369,7 @@ namespace void* ptr = node.m_rpc.allocator().malloc(); if (ptr == 0) return; node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) announce_observer(algo)); + observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif @@ -402,9 +402,11 @@ void node_impl::add_node(udp::endpoint node) m_rpc.allocator().set_next_size(10); // create a dummy traversal_algorithm + // this is unfortunately necessary for the observer + // to free itself from the pool when it's being released boost::intrusive_ptr algo( new traversal_algorithm(*this, (node_id::min)())); - observer_ptr o(new (ptr) null_observer(algo)); + observer_ptr o(new (ptr) null_observer(algo, node, node_id(0))); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 07717f35a..54363189a 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -58,30 +58,24 @@ char const* refresh::name() const return "refresh"; } -bool refresh::invoke(udp::endpoint addr) +observer_ptr refresh::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) { - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] failed to " - "allocate memory for observer. aborting!"; -#endif - done(); - return false; - } - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) find_data_observer(this)); + observer_ptr o(new (ptr) find_data_observer(this, ep, id)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif + return o; +} + +bool refresh::invoke(observer_ptr o) +{ entry e; e["y"] = "q"; e["q"] = "find_node"; entry& a = e["a"]; a["target"] = target().to_string(); - m_node.m_rpc.invoke(e, addr, o); + m_node.m_rpc.invoke(e, o->target_ep(), o); return true; } diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 97f909d11..62ddc600c 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -125,14 +125,14 @@ void observer::abort() { if (m_done) return; m_done = true; - m_algorithm->failed(target_ep(), traversal_algorithm::prevent_request); + m_algorithm->failed(observer_ptr(this), traversal_algorithm::prevent_request); } void observer::done() { if (m_done) return; m_done = true; - m_algorithm->finished(target_ep()); + m_algorithm->finished(observer_ptr(this)); } void observer::short_timeout() @@ -140,7 +140,7 @@ void observer::short_timeout() if (m_short_timeout) return; TORRENT_ASSERT(m_short_timeout == false); m_short_timeout = true; - m_algorithm->failed(target_ep(), traversal_algorithm::short_timeout); + m_algorithm->failed(observer_ptr(this), traversal_algorithm::short_timeout); } // this is called when no reply has been received within @@ -149,7 +149,7 @@ void observer::timeout() { if (m_done) return; m_done = true; - m_algorithm->failed(target_ep()); + m_algorithm->failed(observer_ptr(this)); } node_id generate_id(); @@ -182,9 +182,11 @@ rpc_manager::rpc_manager(node_id const& our_id #define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y TORRENT_LOG(rpc) << " observer: " << sizeof(observer); + PRINT_OFFSETOF(observer, flags); PRINT_OFFSETOF(observer, m_sent); PRINT_OFFSETOF(observer, m_refs); PRINT_OFFSETOF(observer, m_algorithm); + PRINT_OFFSETOF(observer, m_id); PRINT_OFFSETOF(observer, m_addr); PRINT_OFFSETOF(observer, m_port); PRINT_OFFSETOF(observer, m_transaction_id); @@ -193,12 +195,6 @@ rpc_manager::rpc_manager(node_id const& our_id TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); - TORRENT_LOG(rpc) << " traversal_algorithm::result: " << sizeof(traversal_algorithm::result); - PRINT_OFFSETOF(traversal_algorithm::result, id); - PRINT_OFFSETOF(traversal_algorithm::result, addr); - PRINT_OFFSETOF(traversal_algorithm::result, port); - PRINT_OFFSETOF(traversal_algorithm::result, flags); - #undef PRINT_OFFSETOF #endif diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index eaaa0f906..b0e028e9a 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -46,36 +46,62 @@ namespace libtorrent { namespace dht TORRENT_DEFINE_LOG(traversal) #endif +observer_ptr traversal_algorithm::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) null_observer(boost::intrusive_ptr(this), ep, id)); +#ifdef TORRENT_DEBUG + o->m_in_constructor = false; +#endif + return o; +} + void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsigned char flags) { - result entry(id, addr, flags); - if (entry.id.is_all_zeros()) + TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); + m_node.m_rpc.allocator().set_next_size(10); + void* ptr = m_node.m_rpc.allocator().malloc(); + if (ptr == 0) { - entry.id = generate_id(); - entry.flags |= result::no_id; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << "[" << this << "] failed to " + "allocate memory for observer. aborting!"; +#endif + done(); + return; + } + observer_ptr o = new_observer(ptr, addr, id); + if (id.is_all_zeros()) + { + o->set_id(generate_id()); + o->flags |= observer::flag_no_id; } - std::vector::iterator i = std::lower_bound( + o->flags |= flags; + + std::vector::iterator i = std::lower_bound( m_results.begin() , m_results.end() - , entry + , o , boost::bind( compare_ref - , boost::bind(&result::id, _1) - , boost::bind(&result::id, _2) + , boost::bind(&observer::id, _1) + , boost::bind(&observer::id, _2) , m_target ) ); - if (i == m_results.end() || i->id != id) + if (i == m_results.end() || (*i)->id() != id) { TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end() - , boost::bind(&result::id, _1) == id) == m_results.end()); + , boost::bind(&observer::id, _1) == id) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr; #endif - m_results.insert(i, entry); + i = m_results.insert(i, o); } + + if (m_results.size() > 100) m_results.resize(100); } void traversal_algorithm::start() @@ -102,29 +128,21 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) add_entry(id, addr, 0); } -void traversal_algorithm::finished(udp::endpoint const& ep) +void traversal_algorithm::finished(observer_ptr o) { - std::vector::iterator i = std::find_if( - m_results.begin() - , m_results.end() - , boost::bind( - std::equal_to() - , boost::bind(&result::endpoint, _1) - , ep - ) - ); +#ifdef TORRENT_DEBUG + std::vector::iterator i = std::find( + m_results.begin(), m_results.end(), o); TORRENT_ASSERT(i != m_results.end()); +#endif - if (i != m_results.end()) - { - // if this flag is set, it means we increased the - // branch factor for it, and we should restore it - if (i->flags & result::short_timeout) - --m_branch_factor; - } + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (o->flags & observer::flag_short_timeout) + --m_branch_factor; - i->flags |= result::alive; + o->flags |= observer::flag_alive; ++m_responses; --m_invoke_count; @@ -136,59 +154,52 @@ void traversal_algorithm::finished(udp::endpoint const& ep) // prevent request means that the total number of requests has // overflown. This query failed because it was the oldest one. // So, if this is true, don't make another request -void traversal_algorithm::failed(udp::endpoint const& ep, int flags) +void traversal_algorithm::failed(observer_ptr o, int flags) { TORRENT_ASSERT(m_invoke_count >= 0); if (m_results.empty()) return; - std::vector::iterator i = std::find_if( - m_results.begin() - , m_results.end() - , boost::bind( - std::equal_to() - , boost::bind(&result::endpoint, _1) - , ep - ) - ); - - TORRENT_ASSERT(i != m_results.end()); - - if (i != m_results.end()) + TORRENT_ASSERT(o->flags & observer::flag_queried); + if (flags & short_timeout) { - TORRENT_ASSERT(i->flags & result::queried); - if (flags & short_timeout) - { - // short timeout means that it has been more than - // two seconds since we sent the request, and that - // we'll most likely not get a response. But, in case - // we do get a late response, keep the handler - // around for some more, but open up the slot - // by increasing the branch factor - if ((i->flags & result::short_timeout) == 0) - ++m_branch_factor; - i->flags |= result::short_timeout; - } - else - { - i->flags |= result::failed; + // short timeout means that it has been more than + // two seconds since we sent the request, and that + // we'll most likely not get a response. But, in case + // we do get a late response, keep the handler + // around for some more, but open up the slot + // by increasing the branch factor + if ((o->flags & observer::flag_short_timeout) == 0) + ++m_branch_factor; + o->flags |= observer::flag_short_timeout; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] failed: " - << i->id << " " << i->endpoint(); + TORRENT_LOG(traversal) << " [" << this << "] first chance timeout: " + << o->id() << " " << o->target_ep() + << " branch-factor: " << m_branch_factor + << " invoke-count: " << m_invoke_count; #endif - // if this flag is set, it means we increased the - // branch factor for it, and we should restore it - if (i->flags & result::short_timeout) - --m_branch_factor; + } + else + { + o->flags |= observer::flag_failed; + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (o->flags & observer::flag_short_timeout) + --m_branch_factor; - // don't tell the routing table about - // node ids that we just generated ourself - if ((i->flags & result::no_id) == 0) - m_node.m_table.node_failed(i->id); - ++m_timeouts; - --m_invoke_count; - TORRENT_ASSERT(m_invoke_count >= 0); - } +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << " [" << this << "] failed: " + << o->id() << " " << o->target_ep() + << " branch-factor: " << m_branch_factor + << " invoke-count: " << m_invoke_count; +#endif + // don't tell the routing table about + // node ids that we just generated ourself + if ((o->flags & observer::flag_no_id) == 0) + m_node.m_table.node_failed(o->id()); + ++m_timeouts; + --m_invoke_count; + TORRENT_ASSERT(m_invoke_count >= 0); } if (flags & prevent_request) @@ -200,6 +211,13 @@ void traversal_algorithm::failed(udp::endpoint const& ep, int flags) if (m_invoke_count == 0) done(); } +void traversal_algorithm::done() +{ + // delete all our references to the observer objects so + // they will in turn release the traversal algorithm + m_results.clear(); +} + namespace { bool bitwise_nand(unsigned char lhs, unsigned char rhs) @@ -213,23 +231,25 @@ void traversal_algorithm::add_requests() int results_target = m_node.m_table.bucket_size(); // Find the first node that hasn't already been queried. - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end && results_target > 0 && m_invoke_count < m_branch_factor; ++i) { - if (i->flags & result::alive) --results_target; - if (i->flags & result::queried) continue; + if ((*i)->flags & observer::flag_alive) --results_target; + if ((*i)->flags & observer::flag_queried) continue; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] nodes left: " - << (m_results.end() - i); + TORRENT_LOG(traversal) << " [" << this << "]" + << " nodes-left: " << (m_results.end() - i) + << " invoke-count: " << m_invoke_count + << " branch-factor: " << m_branch_factor; #endif - if (invoke(i->endpoint())) + if (invoke(*i)) { TORRENT_ASSERT(m_invoke_count >= 0); ++m_invoke_count; - i->flags |= result::queried; + (*i)->flags |= observer::flag_queried; } } } @@ -243,7 +263,7 @@ void traversal_algorithm::add_router_entries() for (routing_table::router_iterator i = m_node.m_table.router_begin() , end(m_node.m_table.router_end()); i != end; ++i) { - add_entry(node_id(0), *i, result::initial); + add_entry(node_id(0), *i, observer::flag_initial); } } @@ -268,10 +288,10 @@ void traversal_algorithm::status(dht_lookup& l) l.branch_factor = m_branch_factor; l.type = name(); l.nodes_left = 0; - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end; ++i) { - if (i->flags & result::queried) continue; + if ((*i)->flags & observer::flag_queried) continue; ++l.nodes_left; } }