diff --git a/examples/client_test.cpp b/examples/client_test.cpp index ba1a26643..fc7fafbe1 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1391,8 +1391,9 @@ int main(int argc, char* argv[]) for (std::vector::iterator i = sess_stat.active_requests.begin() , end(sess_stat.active_requests.end()); i != end; ++i) { - snprintf(str, sizeof(str), " %s %d (%d) ( timeouts %d responses %d)\n" - , i->type, i->outstanding_requests, i->branch_factor, i->timeouts, i->responses); + snprintf(str, sizeof(str), " %s in flight: %d [limit: %d] timeouts %d responses %d\n" + , i->type, i->outstanding_requests, i->branch_factor, i->timeouts + , i->responses); out += str; } } diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index aafb8623b..53aa424c4 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -165,8 +165,6 @@ namespace libtorrent { namespace dht int m_replies_bytes_sent[5]; int m_queries_bytes_received[5]; int m_counter; - int m_announces; - int m_failed_announces; int m_total_message_input; int m_total_in_bytes; diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 9565004af..39f330e83 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -101,6 +101,7 @@ public: , m_self(self) {} ~find_data_observer(); + void short_timeout(); void timeout(); void reply(msg const&); void abort() { m_algorithm = 0; } diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 15b7e7a16..18d08ddcf 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -164,6 +164,7 @@ public: , m_token(write_token) {} + void short_timeout() {} void timeout() {} void reply(msg const&) {} void abort() {} diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index 7f80d405f..7aa9c109e 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -82,6 +82,10 @@ struct observer : boost::noncopyable // this is called when a reply is received virtual void reply(msg const& m) = 0; + // this is called if no response has been received after + // a few seconds, before the request has timed out + virtual void short_timeout() = 0; + // this is called when no reply has been received within // some timeout virtual void timeout() = 0; diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index ca86bf671..035465640 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -54,8 +54,6 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { namespace dht { -struct observer; - #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DECLARE_LOG(rpc); #endif @@ -64,6 +62,7 @@ struct null_observer : public observer { null_observer(boost::pool<>& allocator): observer(allocator) {} virtual void reply(msg const&) {} + virtual void short_timeout() {} virtual void timeout() {} void abort() {} }; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index d14560bd4..cdcf8b044 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -61,7 +61,9 @@ class traversal_algorithm : boost::noncopyable public: void traverse(node_id const& id, udp::endpoint addr); void finished(node_id const& id); - void failed(node_id const& id, bool prevent_request = false); + + enum flags_t { prevent_request = 1, short_timeout = 2 }; + void failed(node_id const& id, int flags = 0); virtual ~traversal_algorithm(); boost::pool<>& allocator() const; void status(dht_lookup& l); @@ -81,7 +83,7 @@ public: node_id id; // TODO: replace with union of address_v4 and address_v6 and a port udp::endpoint addr; - enum { queried = 1, initial = 2, no_id = 4 }; + enum { queried = 1, initial = 2, no_id = 4, short_timeout = 8 }; unsigned char flags; }; diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 9786bb058..97ca90012 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -110,6 +110,9 @@ namespace libtorrent { namespace dht int g_gr_message_input = 0; int g_mo_message_input = 0; int g_unknown_message_input = 0; + + int g_announces = 0; + int g_failed_announces = 0; #endif void intrusive_ptr_add_ref(dht_tracker const* c) @@ -230,8 +233,8 @@ namespace libtorrent { namespace dht std::fill_n(m_queries_bytes_received, 5, 0); std::fill_n(m_replies_sent, 5, 0); std::fill_n(m_queries_received, 5, 0); - m_announces = 0; - m_failed_announces = 0; + g_announces = 0; + g_failed_announces = 0; m_total_message_input = 0; m_total_in_bytes = 0; m_total_out_bytes = 0; @@ -396,8 +399,8 @@ namespace libtorrent { namespace dht pc << "\t" << torrents << "\t" << peers - << "\t" << m_announces / float(tick_period) - << "\t" << m_failed_announces / float(tick_period) + << "\t" << g_announces / float(tick_period) + << "\t" << g_failed_announces / float(tick_period) << "\t" << (m_total_message_input / float(tick_period)) << "\t" << (g_az_message_input / float(tick_period)) << "\t" << (g_ut_message_input / float(tick_period)) @@ -414,8 +417,8 @@ namespace libtorrent { namespace dht std::fill_n(m_queries_bytes_received, 5, 0); std::fill_n(m_replies_sent, 5, 0); std::fill_n(m_queries_received, 5, 0); - m_announces = 0; - m_failed_announces = 0; + g_announces = 0; + g_failed_announces = 0; m_total_message_input = 0; g_az_message_input = 0; g_ut_message_input = 0; diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 53eb5de47..aaf488860 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -177,6 +177,12 @@ void find_data_observer::reply(msg const& m) #endif } +void find_data_observer::short_timeout() +{ + if (!m_algorithm) return; + m_algorithm->failed(m_self, traversal_algorithm::short_timeout); +} + void find_data_observer::timeout() { if (!m_algorithm) return; @@ -184,7 +190,6 @@ void find_data_observer::timeout() m_algorithm = 0; } - find_data::find_data( node_impl& node , node_id target diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 83608efb6..05474e10f 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -794,6 +794,7 @@ void node_impl::incoming_request(msg const& m, entry& e) } else if (strcmp(query, "announce_peer") == 0) { + extern int g_failed_announces; key_desc_t msg_desc[] = { {"info_hash", lazy_entry::string_t, 20, 0}, {"port", lazy_entry::int_t, 0, 0}, @@ -803,6 +804,7 @@ void node_impl::incoming_request(msg const& m, entry& e) lazy_entry const* msg_keys[3]; if (!verify_message(arg_ent, msg_desc, msg_keys, 3, error_string, sizeof(error_string))) { + ++g_failed_announces; incoming_error(e, error_string); return; } @@ -810,6 +812,7 @@ void node_impl::incoming_request(msg const& m, entry& e) int port = msg_keys[1]->int_value(); if (port < 0 || port >= 65536) { + ++g_failed_announces; incoming_error(e, "invalid 'port' in announce"); return; } @@ -822,6 +825,7 @@ void node_impl::incoming_request(msg const& m, entry& e) if (!verify_token(msg_keys[2]->string_value(), msg_keys[0]->string_ptr(), m.addr)) { + ++g_failed_announces; incoming_error(e, "invalid token in announce"); return; } @@ -838,6 +842,8 @@ void node_impl::incoming_request(msg const& m, entry& e) std::set::iterator i = v.peers.find(e); if (i != v.peers.end()) v.peers.erase(i++); v.peers.insert(i, e); + extern int g_announces; + ++g_announces; } else if (strcmp(query, "find_torrent") == 0) { diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index d73f8a7e2..f94b507ae 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -338,15 +338,17 @@ time_duration rpc_manager::tick() { INVARIANT_CHECK; - const int timeout_ms = 10 * 1000; + const static int short_timeout = 2; + const static int timeout = 10; - // look for observers that has timed out + // look for observers that have timed out - if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); + if (m_next_transaction_id == m_oldest_transaction_id) return seconds(short_timeout); std::vector timeouts; - time_duration ret = milliseconds(timeout_ms); + time_duration ret = seconds(short_timeout); + ptime now = time_now(); for (;m_next_transaction_id != m_oldest_transaction_id; m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) @@ -357,19 +359,14 @@ time_duration rpc_manager::tick() observer_ptr o = m_transactions[m_oldest_transaction_id]; if (!o) continue; - time_duration diff = o->sent() + milliseconds(timeout_ms) - time_now(); - if (diff > seconds(0)) + // if we reach an observer that hasn't timed out + // break, because every observer after this one will + // also not have timed out yet + time_duration diff = now - o->sent(); + if (diff < seconds(timeout)) { - if (diff < seconds(1)) - { - ret = seconds(1); - break; - } - else - { - ret = diff; - break; - } + ret = seconds(timeout) - diff; + break; } #ifndef BOOST_NO_EXCEPTIONS @@ -389,11 +386,35 @@ time_duration rpc_manager::tick() std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1)); timeouts.clear(); - + // clear the aborted transactions, will likely // generate new requests. We need to swap, since the // destrutors may add more observers to the m_aborted_transactions std::vector().swap(m_aborted_transactions); + + for (int i = m_oldest_transaction_id; i != m_next_transaction_id; + i = (i + 1) % max_transactions) + { + observer_ptr o = m_transactions[i]; + if (!o) continue; + + // if we reach an observer that hasn't timed out + // break, because every observer after this one will + // also not have timed out yet + time_duration diff = now - o->sent(); + if (diff < seconds(short_timeout)) + { + ret = seconds(short_timeout) - diff; + break; + } + + // TODO: don't call short_timeout() again if we've + // already called it once + timeouts.push_back(o); + } + + std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::short_timeout, _1)); + return ret; } diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 539f21097..a414344bc 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -108,8 +108,29 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) void traversal_algorithm::finished(node_id const& id) { + std::vector::iterator i = std::find_if( + m_results.begin() + , m_results.end() + , bind( + std::equal_to() + , bind(&result::id, _1) + , id + ) + ); + + TORRENT_ASSERT(i != m_results.end()); + + if (i != m_results.end()) + { + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (i->flags & result::short_timeout) + --m_branch_factor; + } + ++m_responses; --m_invoke_count; + TORRENT_ASSERT(m_invoke_count >= 0); add_requests(); if (m_invoke_count == 0) done(); } @@ -117,9 +138,9 @@ void traversal_algorithm::finished(node_id const& id) // prevent request means that the total number of requests has // overflown. This query failed because it was the oldest one. // So, if this is true, don't make another request -void traversal_algorithm::failed(node_id const& id, bool prevent_request) +void traversal_algorithm::failed(node_id const& id, int flags) { - --m_invoke_count; + TORRENT_ASSERT(m_invoke_count >= 0); TORRENT_ASSERT(!id.is_all_zeros()); std::vector::iterator i = std::find_if( @@ -137,18 +158,44 @@ void traversal_algorithm::failed(node_id const& id, bool prevent_request) if (i != m_results.end()) { TORRENT_ASSERT(i->flags & result::queried); - m_failed.insert(i->addr); + if (flags & short_timeout) + { + // short timeout means that it has been more than + // two seconds since we sent the request, and that + // we'll most likely not get a response. But, in case + // we do get a late response, keep the handler + // around for some more, but open up the slot + // by increasing the branch factor + if ((i->flags & result::short_timeout) == 0) + ++m_branch_factor; + i->flags |= result::short_timeout; + } + else + { + m_failed.insert(i->addr); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; + TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; #endif - // don't tell the routing table about - // node ids that we just generated ourself - if ((i->flags & result::no_id) == 0) - m_node.m_table.node_failed(id); - m_results.erase(i); - ++m_timeouts; + // if this flag is set, it means we increased the + // branch factor for it, and we should restore it + if (i->flags & result::short_timeout) + --m_branch_factor; + + // don't tell the routing table about + // node ids that we just generated ourself + if ((i->flags & result::no_id) == 0) + m_node.m_table.node_failed(id); + m_results.erase(i); + ++m_timeouts; + --m_invoke_count; + } } - if (prevent_request) + else + { + --m_invoke_count; + } + + if (flags & prevent_request) { --m_branch_factor; if (m_branch_factor <= 0) m_branch_factor = 1; @@ -186,13 +233,17 @@ void traversal_algorithm::add_requests() if (i == last_iterator()) break; +#ifndef BOOST_NO_EXCEPTIONS try { +#endif invoke(i->id, i->addr); ++m_invoke_count; i->flags |= result::queried; +#ifndef BOOST_NO_EXCEPTIONS } catch (std::exception& e) {} +#endif } }