more DHT fixes and simplifications

This commit is contained in:
Arvid Norberg 2009-10-09 02:34:25 +00:00
parent fdb3b355ad
commit 474566fa21
8 changed files with 52 additions and 43 deletions

View File

@ -1394,9 +1394,10 @@ int main(int argc, char* argv[])
for (std::vector<dht_lookup>::iterator i = sess_stat.active_requests.begin()
, end(sess_stat.active_requests.end()); i != end; ++i)
{
snprintf(str, sizeof(str), " %s in flight: %d [limit: %d] timeouts %d responses %d\n"
snprintf(str, sizeof(str)
, " %s in flight: %d [limit: %d] timeouts %d responses %d left %d\n"
, i->type, i->outstanding_requests, i->branch_factor, i->timeouts
, i->responses);
, i->responses, i->nodes_left);
out += str;
}
}

View File

@ -78,7 +78,7 @@ public:
protected:
void done();
virtual bool invoke(node_id const& id, udp::endpoint addr);
virtual bool invoke(udp::endpoint addr);
private:
@ -94,8 +94,7 @@ class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self)
boost::intrusive_ptr<traversal_algorithm> const& algorithm)
: observer(algorithm)
{}
void reply(msg const&);

View File

@ -57,7 +57,7 @@ public:
protected:
virtual bool invoke(node_id const& id, udp::endpoint addr);
virtual bool invoke(udp::endpoint addr);
};
} } // namespace libtorrent::dht

View File

@ -117,7 +117,8 @@ public:
no_id = 4,
short_timeout = 8,
failed = 16,
ipv6_address = 32
ipv6_address = 32,
alive = 64
};
unsigned char flags;
};
@ -145,9 +146,7 @@ protected:
void init();
virtual void done() {}
virtual bool invoke(node_id const& id, udp::endpoint addr) { return false; }
std::vector<result>::iterator last_iterator();
virtual bool invoke(udp::endpoint addr) { return false; }
friend void intrusive_ptr_add_ref(traversal_algorithm* p)
{

View File

@ -48,6 +48,7 @@ namespace libtorrent
int timeouts;
int responses;
int branch_factor;
int nodes_left;
};
#endif

View File

@ -45,7 +45,7 @@ namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(dht_tracker);
TORRENT_DECLARE_LOG(traversal);
#endif
using detail::read_v4_endpoint;
@ -63,7 +63,7 @@ void find_data_observer::reply(msg const& m)
if (!r)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] missing response dict";
TORRENT_LOG(traversal) << "[" << m_algorithm.get() << "] missing response dict";
#endif
return;
}
@ -72,7 +72,7 @@ void find_data_observer::reply(msg const& m)
if (!id || id->string_length() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] invalid id in response";
TORRENT_LOG(traversal) << "[" << m_algorithm.get() << "] invalid id in response";
#endif
return;
}
@ -162,7 +162,7 @@ void find_data_observer::reply(msg const& m)
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ]";
TORRENT_LOG(dht_tracker) << log_line.str();
TORRENT_LOG(traversal) << log_line.str();
#endif
done();
}
@ -186,7 +186,7 @@ find_data::find_data(
}
}
bool find_data::invoke(node_id const& id, udp::endpoint addr)
bool find_data::invoke(udp::endpoint addr)
{
if (m_done)
{
@ -198,11 +198,15 @@ bool find_data::invoke(node_id const& id, udp::endpoint addr)
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "[" << this << "] failed to "
"allocate memory for observer. aborting!";
#endif
done();
return false;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) find_data_observer(this, id));
observer_ptr o(new (ptr) find_data_observer(this));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
@ -210,7 +214,7 @@ bool find_data::invoke(node_id const& id, udp::endpoint addr)
e["y"] = "q";
e["q"] = "get_peers";
entry& a = e["a"];
a["info_hash"] = id.to_string();
a["info_hash"] = m_target.to_string();
return m_node.m_rpc.invoke(e, addr, o);
}

View File

@ -41,6 +41,10 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(traversal);
#endif
refresh::refresh(
node_impl& node
, node_id target
@ -54,17 +58,21 @@ char const* refresh::name() const
return "refresh";
}
bool refresh::invoke(node_id const& nid, udp::endpoint addr)
bool refresh::invoke(udp::endpoint addr)
{
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "[" << this << "] failed to "
"allocate memory for observer. aborting!";
#endif
done();
return false;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) find_data_observer(this, nid));
observer_ptr o(new (ptr) find_data_observer(this));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif

View File

@ -126,6 +126,8 @@ void traversal_algorithm::finished(udp::endpoint const& ep)
--m_branch_factor;
}
i->flags |= result::alive;
++m_responses;
--m_invoke_count;
TORRENT_ASSERT(m_invoke_count >= 0);
@ -210,27 +212,22 @@ namespace
void traversal_algorithm::add_requests()
{
while (m_invoke_count < m_branch_factor)
int results_target = m_node.m_table.bucket_size();
// Find the first node that hasn't already been queried.
for (std::vector<result>::iterator i = m_results.begin()
, end(m_results.end()); i != end
&& results_target > 0 && m_invoke_count < m_branch_factor; ++i)
{
// Find the first node that hasn't already been queried.
// TODO: Better heuristic
std::vector<result>::iterator i = std::find_if(
m_results.begin()
, last_iterator()
, bind(
&bitwise_nand
, bind(&result::flags, _1)
, (unsigned char)result::queried
)
);
if (i->flags & result::alive) --results_target;
if (i->flags & result::queried) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "] nodes left ("
<< this << "): " << (last_iterator() - i);
TORRENT_LOG(traversal) << " [" << this << "] nodes left: "
<< (m_results.end() - i);
#endif
if (i == last_iterator()) break;
if (invoke(i->id, i->endpoint()))
if (invoke(i->endpoint()))
{
TORRENT_ASSERT(m_invoke_count >= 0);
++m_invoke_count;
@ -266,13 +263,13 @@ void traversal_algorithm::status(dht_lookup& l)
l.outstanding_requests = m_invoke_count;
l.branch_factor = m_branch_factor;
l.type = name();
}
std::vector<traversal_algorithm::result>::iterator traversal_algorithm::last_iterator()
{
int max_results = m_node.m_table.bucket_size();
return (int)m_results.size() >= max_results ?
m_results.begin() + max_results : m_results.end();
l.nodes_left = 0;
for (std::vector<result>::iterator i = m_results.begin()
, end(m_results.end()); i != end; ++i)
{
if (i->flags & result::queried) continue;
++l.nodes_left;
}
}
} } // namespace libtorrent::dht