diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 016bb875f..af7db2840 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -79,7 +79,8 @@ public: protected: void done(); - virtual bool invoke(udp::endpoint addr); + observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); private: @@ -95,8 +96,9 @@ class find_data_observer : public observer { public: find_data_observer( - boost::intrusive_ptr const& algorithm) - : observer(algorithm) + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id) + : observer(algorithm, ep, id) {} void reply(msg const&); }; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index d3c7ee02d..4f8089123 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -154,8 +154,9 @@ struct null_type {}; class announce_observer : public observer { public: - announce_observer(boost::intrusive_ptr const& algo) - : observer(algo) + announce_observer(boost::intrusive_ptr const& algo + , udp::endpoint const& ep, node_id const& id) + : observer(algo, ep, id) {} void reply(msg const&) { m_done = true; } diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index f6f8db707..a1cfe1bfe 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -67,10 +67,13 @@ struct observer : boost::noncopyable friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); friend TORRENT_EXPORT void intrusive_ptr_release(observer const*); - observer(boost::intrusive_ptr const& a) - : m_sent() + observer(boost::intrusive_ptr const& a + , udp::endpoint const& ep, node_id const& id) + : flags(0) + , m_sent() , m_refs(0) , m_algorithm(a) + , m_id(id) , m_is_v6(false) , m_short_timeout(false) , m_done(false) @@ -80,6 +83,7 @@ struct observer : boost::noncopyable m_in_constructor = true; m_was_sent = false; #endif + set_target(ep); } virtual ~observer(); @@ -109,12 +113,26 @@ struct observer : boost::noncopyable address target_addr() const; udp::endpoint target_ep() const; + void set_id(node_id const& id) { m_id = id; } + node_id const& id() const { return m_id; } + void set_transaction_id(boost::uint16_t tid) { m_transaction_id = tid; } boost::uint16_t transaction_id() const { return m_transaction_id; } + enum { + flag_queried = 1, + flag_initial = 2, + flag_no_id = 4, + flag_short_timeout = 8, + flag_failed = 16, + flag_ipv6_address = 32, + flag_alive = 64 + }; + unsigned char flags; + #ifndef TORRENT_DHT_VERBOSE_LOGGING protected: #endif @@ -128,6 +146,8 @@ protected: const boost::intrusive_ptr m_algorithm; + node_id m_id; + TORRENT_UNION addr_t { #if TORRENT_USE_IPV6 diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index fc29f3014..c30476a32 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -55,7 +55,8 @@ public: protected: - virtual bool invoke(udp::endpoint addr); + observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 2089d4634..47e2a157f 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -55,7 +55,8 @@ TORRENT_DECLARE_LOG(rpc); struct null_observer : public observer { - null_observer(boost::intrusive_ptr& a): observer(a) {} + null_observer(boost::intrusive_ptr const& a + , udp::endpoint const& ep, node_id const& id): observer(a, ep, id) {} virtual void reply(msg const&) { m_done = true; } }; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 0cb0f2cd5..6ad7cde8d 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -60,10 +61,10 @@ class node_impl; struct traversal_algorithm : boost::noncopyable { void traverse(node_id const& id, udp::endpoint addr); - void finished(udp::endpoint const& ep); + void finished(observer_ptr o); enum flags_t { prevent_request = 1, short_timeout = 2 }; - void failed(udp::endpoint const& ep, int flags = 0); + void failed(observer_ptr o, int flags = 0); virtual ~traversal_algorithm(); boost::pool<>& allocator() const; void status(dht_lookup& l); @@ -75,60 +76,6 @@ struct traversal_algorithm : boost::noncopyable void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags); - struct result - { - result(node_id const& id, udp::endpoint ep, unsigned char f = 0) - : id(id), flags(f) - { -#if TORRENT_USE_IPV6 - if (ep.address().is_v6()) - { - flags |= ipv6_address; - addr.v6 = ep.address().to_v6().to_bytes(); - } - else -#endif - { - flags &= ~ipv6_address; - addr.v4 = ep.address().to_v4().to_bytes(); - } - port = ep.port(); - } - - udp::endpoint endpoint() const - { -#if TORRENT_USE_IPV6 - if (flags & ipv6_address) - return udp::endpoint(address_v6(addr.v6), port); - else -#endif - return udp::endpoint(address_v4(addr.v4), port); - } - - node_id id; - - TORRENT_UNION addr_t - { - address_v4::bytes_type v4; -#if TORRENT_USE_IPV6 - address_v6::bytes_type v6; -#endif - } addr; - - boost::uint16_t port; - - enum { - queried = 1, - initial = 2, - no_id = 4, - short_timeout = 8, - failed = 16, - ipv6_address = 32, - alive = 64 - }; - unsigned char flags; - }; - traversal_algorithm( node_impl& node , node_id target) @@ -151,8 +98,13 @@ protected: void add_router_entries(); void init(); - virtual void done() {} - virtual bool invoke(udp::endpoint addr) { return false; } + virtual void done(); + // should construct an algorithm dependent + // observer in ptr. + virtual observer_ptr new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id); + + virtual bool invoke(observer_ptr o) { return false; } friend void intrusive_ptr_add_ref(traversal_algorithm* p) { @@ -169,7 +121,7 @@ protected: node_impl& m_node; node_id m_target; - std::vector m_results; + std::vector m_results; int m_invoke_count; int m_branch_factor; int m_responses; diff --git a/parse_dht_log.py b/parse_dht_log.py index 9303e8c86..0e6063d06 100755 --- a/parse_dht_log.py +++ b/parse_dht_log.py @@ -37,11 +37,14 @@ for line in f: print line.split(' ') out = open('dht_announce_distribution.dat', 'w+') +print 'announce distribution items: %d' % len(announce_histogram) for k,v in announce_histogram.items(): print >>out, '%d %d' % (k, v) + print '%d %d' % (k, v) out.close() out = open('dht_node_uptime_distribution.dat', 'w+') +print 'node uptimes: %d' % len(node_uptime_histogram) for k,v in node_uptime_histogram.items(): print >>out, '%d %d' % (k + up_time_quanta/2, v) out.close() diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 85d3e23d5..e1e6cb170 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -623,7 +623,7 @@ namespace libtorrent { namespace dht #ifdef TORRENT_DHT_VERBOSE_LOGGING std::stringstream log_line; lazy_entry print; - int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, 0, ec); + int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, ec); TORRENT_ASSERT(ret == 0); log_line << print_entry(print, true); #endif diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 0e93ebb9a..31ba7ec71 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -172,7 +172,7 @@ void find_data_observer::reply(msg const& m) void add_entry_fun(void* userdata, node_entry const& e) { traversal_algorithm* f = (traversal_algorithm*)userdata; - f->add_entry(e.id, e.ep(), traversal_algorithm::result::initial); + f->add_entry(e.id, e.ep(), observer::flag_initial); } find_data::find_data( @@ -190,7 +190,17 @@ find_data::find_data( node.m_table.for_each_node(&add_entry_fun, 0, (traversal_algorithm*)this); } -bool find_data::invoke(udp::endpoint addr) +observer_ptr find_data::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) find_data_observer(this, ep, id)); +#ifdef TORRENT_DEBUG + o->m_in_constructor = false; +#endif + return o; +} + +bool find_data::invoke(observer_ptr o) { if (m_done) { @@ -198,28 +208,12 @@ bool find_data::invoke(udp::endpoint addr) return false; } - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] failed to " - "allocate memory for observer. aborting!"; -#endif - done(); - return false; - } - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) find_data_observer(this)); -#ifdef TORRENT_DEBUG - o->m_in_constructor = false; -#endif entry e; e["y"] = "q"; e["q"] = "get_peers"; entry& a = e["a"]; a["info_hash"] = m_target.to_string(); - return m_node.m_rpc.invoke(e, addr, o); + return m_node.m_rpc.invoke(e, o->target_ep(), o); } void find_data::got_peers(std::vector const& peers) @@ -234,19 +228,26 @@ void find_data::done() m_done = true; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] get_peers DONE"; +#endif + std::vector > results; int num_results = m_node.m_table.bucket_size(); - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end && num_results > 0; ++i) { - if (i->flags & result::no_id) continue; - if ((i->flags & result::queried) == 0) continue; - std::map::iterator j = m_write_tokens.find(i->id); + observer_ptr const& o = *i; + if (o->flags & observer::flag_no_id) continue; + if ((o->flags & observer::flag_queried) == 0) continue; + std::map::iterator j = m_write_tokens.find(o->id()); if (j == m_write_tokens.end()) continue; - results.push_back(std::make_pair(node_entry(i->id, i->endpoint()), j->second)); + results.push_back(std::make_pair(node_entry(o->id(), o->target_ep()), j->second)); --num_results; } m_nodes_callback(results, m_got_peers); + + traversal_algorithm::done(); } } } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 17836d3f3..bbe03f456 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -267,7 +267,7 @@ void node_impl::bootstrap(std::vector const& nodes #ifdef TORRENT_DHT_VERBOSE_LOGGING ++count; #endif - r->add_entry(node_id(0), *i, traversal_algorithm::result::initial); + r->add_entry(node_id(0), *i, observer::flag_initial); } #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -369,7 +369,7 @@ namespace void* ptr = node.m_rpc.allocator().malloc(); if (ptr == 0) return; node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) announce_observer(algo)); + observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif @@ -402,9 +402,11 @@ void node_impl::add_node(udp::endpoint node) m_rpc.allocator().set_next_size(10); // create a dummy traversal_algorithm + // this is unfortunately necessary for the observer + // to free itself from the pool when it's being released boost::intrusive_ptr algo( new traversal_algorithm(*this, (node_id::min)())); - observer_ptr o(new (ptr) null_observer(algo)); + observer_ptr o(new (ptr) null_observer(algo, node, node_id(0))); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 07717f35a..54363189a 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -58,30 +58,24 @@ char const* refresh::name() const return "refresh"; } -bool refresh::invoke(udp::endpoint addr) +observer_ptr refresh::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) { - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] failed to " - "allocate memory for observer. aborting!"; -#endif - done(); - return false; - } - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) find_data_observer(this)); + observer_ptr o(new (ptr) find_data_observer(this, ep, id)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif + return o; +} + +bool refresh::invoke(observer_ptr o) +{ entry e; e["y"] = "q"; e["q"] = "find_node"; entry& a = e["a"]; a["target"] = target().to_string(); - m_node.m_rpc.invoke(e, addr, o); + m_node.m_rpc.invoke(e, o->target_ep(), o); return true; } diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 97f909d11..62ddc600c 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -125,14 +125,14 @@ void observer::abort() { if (m_done) return; m_done = true; - m_algorithm->failed(target_ep(), traversal_algorithm::prevent_request); + m_algorithm->failed(observer_ptr(this), traversal_algorithm::prevent_request); } void observer::done() { if (m_done) return; m_done = true; - m_algorithm->finished(target_ep()); + m_algorithm->finished(observer_ptr(this)); } void observer::short_timeout() @@ -140,7 +140,7 @@ void observer::short_timeout() if (m_short_timeout) return; TORRENT_ASSERT(m_short_timeout == false); m_short_timeout = true; - m_algorithm->failed(target_ep(), traversal_algorithm::short_timeout); + m_algorithm->failed(observer_ptr(this), traversal_algorithm::short_timeout); } // this is called when no reply has been received within @@ -149,7 +149,7 @@ void observer::timeout() { if (m_done) return; m_done = true; - m_algorithm->failed(target_ep()); + m_algorithm->failed(observer_ptr(this)); } node_id generate_id(); @@ -182,9 +182,11 @@ rpc_manager::rpc_manager(node_id const& our_id #define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y TORRENT_LOG(rpc) << " observer: " << sizeof(observer); + PRINT_OFFSETOF(observer, flags); PRINT_OFFSETOF(observer, m_sent); PRINT_OFFSETOF(observer, m_refs); PRINT_OFFSETOF(observer, m_algorithm); + PRINT_OFFSETOF(observer, m_id); PRINT_OFFSETOF(observer, m_addr); PRINT_OFFSETOF(observer, m_port); PRINT_OFFSETOF(observer, m_transaction_id); @@ -193,12 +195,6 @@ rpc_manager::rpc_manager(node_id const& our_id TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); - TORRENT_LOG(rpc) << " traversal_algorithm::result: " << sizeof(traversal_algorithm::result); - PRINT_OFFSETOF(traversal_algorithm::result, id); - PRINT_OFFSETOF(traversal_algorithm::result, addr); - PRINT_OFFSETOF(traversal_algorithm::result, port); - PRINT_OFFSETOF(traversal_algorithm::result, flags); - #undef PRINT_OFFSETOF #endif diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index eaaa0f906..b0e028e9a 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -46,36 +46,62 @@ namespace libtorrent { namespace dht TORRENT_DEFINE_LOG(traversal) #endif +observer_ptr traversal_algorithm::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) null_observer(boost::intrusive_ptr(this), ep, id)); +#ifdef TORRENT_DEBUG + o->m_in_constructor = false; +#endif + return o; +} + void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsigned char flags) { - result entry(id, addr, flags); - if (entry.id.is_all_zeros()) + TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); + m_node.m_rpc.allocator().set_next_size(10); + void* ptr = m_node.m_rpc.allocator().malloc(); + if (ptr == 0) { - entry.id = generate_id(); - entry.flags |= result::no_id; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << "[" << this << "] failed to " + "allocate memory for observer. aborting!"; +#endif + done(); + return; + } + observer_ptr o = new_observer(ptr, addr, id); + if (id.is_all_zeros()) + { + o->set_id(generate_id()); + o->flags |= observer::flag_no_id; } - std::vector::iterator i = std::lower_bound( + o->flags |= flags; + + std::vector::iterator i = std::lower_bound( m_results.begin() , m_results.end() - , entry + , o , boost::bind( compare_ref - , boost::bind(&result::id, _1) - , boost::bind(&result::id, _2) + , boost::bind(&observer::id, _1) + , boost::bind(&observer::id, _2) , m_target ) ); - if (i == m_results.end() || i->id != id) + if (i == m_results.end() || (*i)->id() != id) { TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end() - , boost::bind(&result::id, _1) == id) == m_results.end()); + , boost::bind(&observer::id, _1) == id) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr; #endif - m_results.insert(i, entry); + i = m_results.insert(i, o); } + + if (m_results.size() > 100) m_results.resize(100); } void traversal_algorithm::start() @@ -102,29 +128,21 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) add_entry(id, addr, 0); } -void traversal_algorithm::finished(udp::endpoint const& ep) +void traversal_algorithm::finished(observer_ptr o) { - std::vector::iterator i = std::find_if( - m_results.begin() - , m_results.end() - , boost::bind( - std::equal_to() - , boost::bind(&result::endpoint, _1) - , ep - ) - ); +#ifdef TORRENT_DEBUG + std::vector::iterator i = std::find( + m_results.begin(), m_results.end(), o); TORRENT_ASSERT(i != m_results.end()); +#endif - if (i != m_results.end()) - { - // if this flag is set, it means we increased the - // branch factor for it, and we should restore it - if (i->flags & result::short_timeout) - --m_branch_factor; - } + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (o->flags & observer::flag_short_timeout) + --m_branch_factor; - i->flags |= result::alive; + o->flags |= observer::flag_alive; ++m_responses; --m_invoke_count; @@ -136,59 +154,52 @@ void traversal_algorithm::finished(udp::endpoint const& ep) // prevent request means that the total number of requests has // overflown. This query failed because it was the oldest one. // So, if this is true, don't make another request -void traversal_algorithm::failed(udp::endpoint const& ep, int flags) +void traversal_algorithm::failed(observer_ptr o, int flags) { TORRENT_ASSERT(m_invoke_count >= 0); if (m_results.empty()) return; - std::vector::iterator i = std::find_if( - m_results.begin() - , m_results.end() - , boost::bind( - std::equal_to() - , boost::bind(&result::endpoint, _1) - , ep - ) - ); - - TORRENT_ASSERT(i != m_results.end()); - - if (i != m_results.end()) + TORRENT_ASSERT(o->flags & observer::flag_queried); + if (flags & short_timeout) { - TORRENT_ASSERT(i->flags & result::queried); - if (flags & short_timeout) - { - // short timeout means that it has been more than - // two seconds since we sent the request, and that - // we'll most likely not get a response. But, in case - // we do get a late response, keep the handler - // around for some more, but open up the slot - // by increasing the branch factor - if ((i->flags & result::short_timeout) == 0) - ++m_branch_factor; - i->flags |= result::short_timeout; - } - else - { - i->flags |= result::failed; + // short timeout means that it has been more than + // two seconds since we sent the request, and that + // we'll most likely not get a response. But, in case + // we do get a late response, keep the handler + // around for some more, but open up the slot + // by increasing the branch factor + if ((o->flags & observer::flag_short_timeout) == 0) + ++m_branch_factor; + o->flags |= observer::flag_short_timeout; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] failed: " - << i->id << " " << i->endpoint(); + TORRENT_LOG(traversal) << " [" << this << "] first chance timeout: " + << o->id() << " " << o->target_ep() + << " branch-factor: " << m_branch_factor + << " invoke-count: " << m_invoke_count; #endif - // if this flag is set, it means we increased the - // branch factor for it, and we should restore it - if (i->flags & result::short_timeout) - --m_branch_factor; + } + else + { + o->flags |= observer::flag_failed; + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (o->flags & observer::flag_short_timeout) + --m_branch_factor; - // don't tell the routing table about - // node ids that we just generated ourself - if ((i->flags & result::no_id) == 0) - m_node.m_table.node_failed(i->id); - ++m_timeouts; - --m_invoke_count; - TORRENT_ASSERT(m_invoke_count >= 0); - } +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << " [" << this << "] failed: " + << o->id() << " " << o->target_ep() + << " branch-factor: " << m_branch_factor + << " invoke-count: " << m_invoke_count; +#endif + // don't tell the routing table about + // node ids that we just generated ourself + if ((o->flags & observer::flag_no_id) == 0) + m_node.m_table.node_failed(o->id()); + ++m_timeouts; + --m_invoke_count; + TORRENT_ASSERT(m_invoke_count >= 0); } if (flags & prevent_request) @@ -200,6 +211,13 @@ void traversal_algorithm::failed(udp::endpoint const& ep, int flags) if (m_invoke_count == 0) done(); } +void traversal_algorithm::done() +{ + // delete all our references to the observer objects so + // they will in turn release the traversal algorithm + m_results.clear(); +} + namespace { bool bitwise_nand(unsigned char lhs, unsigned char rhs) @@ -213,23 +231,25 @@ void traversal_algorithm::add_requests() int results_target = m_node.m_table.bucket_size(); // Find the first node that hasn't already been queried. - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end && results_target > 0 && m_invoke_count < m_branch_factor; ++i) { - if (i->flags & result::alive) --results_target; - if (i->flags & result::queried) continue; + if ((*i)->flags & observer::flag_alive) --results_target; + if ((*i)->flags & observer::flag_queried) continue; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "] nodes left: " - << (m_results.end() - i); + TORRENT_LOG(traversal) << " [" << this << "]" + << " nodes-left: " << (m_results.end() - i) + << " invoke-count: " << m_invoke_count + << " branch-factor: " << m_branch_factor; #endif - if (invoke(i->endpoint())) + if (invoke(*i)) { TORRENT_ASSERT(m_invoke_count >= 0); ++m_invoke_count; - i->flags |= result::queried; + (*i)->flags |= observer::flag_queried; } } } @@ -243,7 +263,7 @@ void traversal_algorithm::add_router_entries() for (routing_table::router_iterator i = m_node.m_table.router_begin() , end(m_node.m_table.router_end()); i != end; ++i) { - add_entry(node_id(0), *i, result::initial); + add_entry(node_id(0), *i, observer::flag_initial); } } @@ -268,10 +288,10 @@ void traversal_algorithm::status(dht_lookup& l) l.branch_factor = m_branch_factor; l.type = name(); l.nodes_left = 0; - for (std::vector::iterator i = m_results.begin() + for (std::vector::iterator i = m_results.begin() , end(m_results.end()); i != end; ++i) { - if (i->flags & result::queried) continue; + if ((*i)->flags & observer::flag_queried) continue; ++l.nodes_left; } }