further simplify the DHT by getting rid of the result type and just using observers instead. This might save some memory and should fix the assert that would be triggered before
This commit is contained in:
parent
23ac52c164
commit
d3a8916f02
|
@ -79,7 +79,8 @@ public:
|
|||
protected:
|
||||
|
||||
void done();
|
||||
virtual bool invoke(udp::endpoint addr);
|
||||
observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id);
|
||||
virtual bool invoke(observer_ptr o);
|
||||
|
||||
private:
|
||||
|
||||
|
@ -95,8 +96,9 @@ class find_data_observer : public observer
|
|||
{
|
||||
public:
|
||||
find_data_observer(
|
||||
boost::intrusive_ptr<traversal_algorithm> const& algorithm)
|
||||
: observer(algorithm)
|
||||
boost::intrusive_ptr<traversal_algorithm> const& algorithm
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
: observer(algorithm, ep, id)
|
||||
{}
|
||||
void reply(msg const&);
|
||||
};
|
||||
|
|
|
@ -154,8 +154,9 @@ struct null_type {};
|
|||
class announce_observer : public observer
|
||||
{
|
||||
public:
|
||||
announce_observer(boost::intrusive_ptr<traversal_algorithm> const& algo)
|
||||
: observer(algo)
|
||||
announce_observer(boost::intrusive_ptr<traversal_algorithm> const& algo
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
: observer(algo, ep, id)
|
||||
{}
|
||||
|
||||
void reply(msg const&) { m_done = true; }
|
||||
|
|
|
@ -67,10 +67,13 @@ struct observer : boost::noncopyable
|
|||
friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
|
||||
friend TORRENT_EXPORT void intrusive_ptr_release(observer const*);
|
||||
|
||||
observer(boost::intrusive_ptr<traversal_algorithm> const& a)
|
||||
: m_sent()
|
||||
observer(boost::intrusive_ptr<traversal_algorithm> const& a
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
: flags(0)
|
||||
, m_sent()
|
||||
, m_refs(0)
|
||||
, m_algorithm(a)
|
||||
, m_id(id)
|
||||
, m_is_v6(false)
|
||||
, m_short_timeout(false)
|
||||
, m_done(false)
|
||||
|
@ -80,6 +83,7 @@ struct observer : boost::noncopyable
|
|||
m_in_constructor = true;
|
||||
m_was_sent = false;
|
||||
#endif
|
||||
set_target(ep);
|
||||
}
|
||||
|
||||
virtual ~observer();
|
||||
|
@ -109,12 +113,26 @@ struct observer : boost::noncopyable
|
|||
address target_addr() const;
|
||||
udp::endpoint target_ep() const;
|
||||
|
||||
void set_id(node_id const& id) { m_id = id; }
|
||||
node_id const& id() const { return m_id; }
|
||||
|
||||
void set_transaction_id(boost::uint16_t tid)
|
||||
{ m_transaction_id = tid; }
|
||||
|
||||
boost::uint16_t transaction_id() const
|
||||
{ return m_transaction_id; }
|
||||
|
||||
enum {
|
||||
flag_queried = 1,
|
||||
flag_initial = 2,
|
||||
flag_no_id = 4,
|
||||
flag_short_timeout = 8,
|
||||
flag_failed = 16,
|
||||
flag_ipv6_address = 32,
|
||||
flag_alive = 64
|
||||
};
|
||||
unsigned char flags;
|
||||
|
||||
#ifndef TORRENT_DHT_VERBOSE_LOGGING
|
||||
protected:
|
||||
#endif
|
||||
|
@ -128,6 +146,8 @@ protected:
|
|||
|
||||
const boost::intrusive_ptr<traversal_algorithm> m_algorithm;
|
||||
|
||||
node_id m_id;
|
||||
|
||||
TORRENT_UNION addr_t
|
||||
{
|
||||
#if TORRENT_USE_IPV6
|
||||
|
|
|
@ -55,7 +55,8 @@ public:
|
|||
|
||||
protected:
|
||||
|
||||
virtual bool invoke(udp::endpoint addr);
|
||||
observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id);
|
||||
virtual bool invoke(observer_ptr o);
|
||||
};
|
||||
|
||||
} } // namespace libtorrent::dht
|
||||
|
|
|
@ -55,7 +55,8 @@ TORRENT_DECLARE_LOG(rpc);
|
|||
|
||||
struct null_observer : public observer
|
||||
{
|
||||
null_observer(boost::intrusive_ptr<traversal_algorithm>& a): observer(a) {}
|
||||
null_observer(boost::intrusive_ptr<traversal_algorithm> const& a
|
||||
, udp::endpoint const& ep, node_id const& id): observer(a, ep, id) {}
|
||||
virtual void reply(msg const&) { m_done = true; }
|
||||
};
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE.
|
|||
#include <libtorrent/kademlia/node_id.hpp>
|
||||
#include <libtorrent/kademlia/routing_table.hpp>
|
||||
#include <libtorrent/kademlia/logging.hpp>
|
||||
#include <libtorrent/kademlia/observer.hpp>
|
||||
#include <libtorrent/address.hpp>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
@ -60,10 +61,10 @@ class node_impl;
|
|||
struct traversal_algorithm : boost::noncopyable
|
||||
{
|
||||
void traverse(node_id const& id, udp::endpoint addr);
|
||||
void finished(udp::endpoint const& ep);
|
||||
void finished(observer_ptr o);
|
||||
|
||||
enum flags_t { prevent_request = 1, short_timeout = 2 };
|
||||
void failed(udp::endpoint const& ep, int flags = 0);
|
||||
void failed(observer_ptr o, int flags = 0);
|
||||
virtual ~traversal_algorithm();
|
||||
boost::pool<>& allocator() const;
|
||||
void status(dht_lookup& l);
|
||||
|
@ -75,60 +76,6 @@ struct traversal_algorithm : boost::noncopyable
|
|||
|
||||
void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags);
|
||||
|
||||
struct result
|
||||
{
|
||||
result(node_id const& id, udp::endpoint ep, unsigned char f = 0)
|
||||
: id(id), flags(f)
|
||||
{
|
||||
#if TORRENT_USE_IPV6
|
||||
if (ep.address().is_v6())
|
||||
{
|
||||
flags |= ipv6_address;
|
||||
addr.v6 = ep.address().to_v6().to_bytes();
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
flags &= ~ipv6_address;
|
||||
addr.v4 = ep.address().to_v4().to_bytes();
|
||||
}
|
||||
port = ep.port();
|
||||
}
|
||||
|
||||
udp::endpoint endpoint() const
|
||||
{
|
||||
#if TORRENT_USE_IPV6
|
||||
if (flags & ipv6_address)
|
||||
return udp::endpoint(address_v6(addr.v6), port);
|
||||
else
|
||||
#endif
|
||||
return udp::endpoint(address_v4(addr.v4), port);
|
||||
}
|
||||
|
||||
node_id id;
|
||||
|
||||
TORRENT_UNION addr_t
|
||||
{
|
||||
address_v4::bytes_type v4;
|
||||
#if TORRENT_USE_IPV6
|
||||
address_v6::bytes_type v6;
|
||||
#endif
|
||||
} addr;
|
||||
|
||||
boost::uint16_t port;
|
||||
|
||||
enum {
|
||||
queried = 1,
|
||||
initial = 2,
|
||||
no_id = 4,
|
||||
short_timeout = 8,
|
||||
failed = 16,
|
||||
ipv6_address = 32,
|
||||
alive = 64
|
||||
};
|
||||
unsigned char flags;
|
||||
};
|
||||
|
||||
traversal_algorithm(
|
||||
node_impl& node
|
||||
, node_id target)
|
||||
|
@ -151,8 +98,13 @@ protected:
|
|||
void add_router_entries();
|
||||
void init();
|
||||
|
||||
virtual void done() {}
|
||||
virtual bool invoke(udp::endpoint addr) { return false; }
|
||||
virtual void done();
|
||||
// should construct an algorithm dependent
|
||||
// observer in ptr.
|
||||
virtual observer_ptr new_observer(void* ptr
|
||||
, udp::endpoint const& ep, node_id const& id);
|
||||
|
||||
virtual bool invoke(observer_ptr o) { return false; }
|
||||
|
||||
friend void intrusive_ptr_add_ref(traversal_algorithm* p)
|
||||
{
|
||||
|
@ -169,7 +121,7 @@ protected:
|
|||
|
||||
node_impl& m_node;
|
||||
node_id m_target;
|
||||
std::vector<result> m_results;
|
||||
std::vector<observer_ptr> m_results;
|
||||
int m_invoke_count;
|
||||
int m_branch_factor;
|
||||
int m_responses;
|
||||
|
|
|
@ -37,11 +37,14 @@ for line in f:
|
|||
print line.split(' ')
|
||||
|
||||
out = open('dht_announce_distribution.dat', 'w+')
|
||||
print 'announce distribution items: %d' % len(announce_histogram)
|
||||
for k,v in announce_histogram.items():
|
||||
print >>out, '%d %d' % (k, v)
|
||||
print '%d %d' % (k, v)
|
||||
out.close()
|
||||
|
||||
out = open('dht_node_uptime_distribution.dat', 'w+')
|
||||
print 'node uptimes: %d' % len(node_uptime_histogram)
|
||||
for k,v in node_uptime_histogram.items():
|
||||
print >>out, '%d %d' % (k + up_time_quanta/2, v)
|
||||
out.close()
|
||||
|
|
|
@ -623,7 +623,7 @@ namespace libtorrent { namespace dht
|
|||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
std::stringstream log_line;
|
||||
lazy_entry print;
|
||||
int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, 0, ec);
|
||||
int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print, ec);
|
||||
TORRENT_ASSERT(ret == 0);
|
||||
log_line << print_entry(print, true);
|
||||
#endif
|
||||
|
|
|
@ -172,7 +172,7 @@ void find_data_observer::reply(msg const& m)
|
|||
void add_entry_fun(void* userdata, node_entry const& e)
|
||||
{
|
||||
traversal_algorithm* f = (traversal_algorithm*)userdata;
|
||||
f->add_entry(e.id, e.ep(), traversal_algorithm::result::initial);
|
||||
f->add_entry(e.id, e.ep(), observer::flag_initial);
|
||||
}
|
||||
|
||||
find_data::find_data(
|
||||
|
@ -190,7 +190,17 @@ find_data::find_data(
|
|||
node.m_table.for_each_node(&add_entry_fun, 0, (traversal_algorithm*)this);
|
||||
}
|
||||
|
||||
bool find_data::invoke(udp::endpoint addr)
|
||||
observer_ptr find_data::new_observer(void* ptr
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
{
|
||||
observer_ptr o(new (ptr) find_data_observer(this, ep, id));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
return o;
|
||||
}
|
||||
|
||||
bool find_data::invoke(observer_ptr o)
|
||||
{
|
||||
if (m_done)
|
||||
{
|
||||
|
@ -198,28 +208,12 @@ bool find_data::invoke(udp::endpoint addr)
|
|||
return false;
|
||||
}
|
||||
|
||||
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
|
||||
void* ptr = m_node.m_rpc.allocator().malloc();
|
||||
if (ptr == 0)
|
||||
{
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << "[" << this << "] failed to "
|
||||
"allocate memory for observer. aborting!";
|
||||
#endif
|
||||
done();
|
||||
return false;
|
||||
}
|
||||
m_node.m_rpc.allocator().set_next_size(10);
|
||||
observer_ptr o(new (ptr) find_data_observer(this));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
entry e;
|
||||
e["y"] = "q";
|
||||
e["q"] = "get_peers";
|
||||
entry& a = e["a"];
|
||||
a["info_hash"] = m_target.to_string();
|
||||
return m_node.m_rpc.invoke(e, addr, o);
|
||||
return m_node.m_rpc.invoke(e, o->target_ep(), o);
|
||||
}
|
||||
|
||||
void find_data::got_peers(std::vector<tcp::endpoint> const& peers)
|
||||
|
@ -234,19 +228,26 @@ void find_data::done()
|
|||
|
||||
m_done = true;
|
||||
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] get_peers DONE";
|
||||
#endif
|
||||
|
||||
std::vector<std::pair<node_entry, std::string> > results;
|
||||
int num_results = m_node.m_table.bucket_size();
|
||||
for (std::vector<result>::iterator i = m_results.begin()
|
||||
for (std::vector<observer_ptr>::iterator i = m_results.begin()
|
||||
, end(m_results.end()); i != end && num_results > 0; ++i)
|
||||
{
|
||||
if (i->flags & result::no_id) continue;
|
||||
if ((i->flags & result::queried) == 0) continue;
|
||||
std::map<node_id, std::string>::iterator j = m_write_tokens.find(i->id);
|
||||
observer_ptr const& o = *i;
|
||||
if (o->flags & observer::flag_no_id) continue;
|
||||
if ((o->flags & observer::flag_queried) == 0) continue;
|
||||
std::map<node_id, std::string>::iterator j = m_write_tokens.find(o->id());
|
||||
if (j == m_write_tokens.end()) continue;
|
||||
results.push_back(std::make_pair(node_entry(i->id, i->endpoint()), j->second));
|
||||
results.push_back(std::make_pair(node_entry(o->id(), o->target_ep()), j->second));
|
||||
--num_results;
|
||||
}
|
||||
m_nodes_callback(results, m_got_peers);
|
||||
|
||||
traversal_algorithm::done();
|
||||
}
|
||||
|
||||
} } // namespace libtorrent::dht
|
||||
|
|
|
@ -267,7 +267,7 @@ void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
|
|||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
++count;
|
||||
#endif
|
||||
r->add_entry(node_id(0), *i, traversal_algorithm::result::initial);
|
||||
r->add_entry(node_id(0), *i, observer::flag_initial);
|
||||
}
|
||||
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
|
@ -369,7 +369,7 @@ namespace
|
|||
void* ptr = node.m_rpc.allocator().malloc();
|
||||
if (ptr == 0) return;
|
||||
node.m_rpc.allocator().set_next_size(10);
|
||||
observer_ptr o(new (ptr) announce_observer(algo));
|
||||
observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
|
@ -402,9 +402,11 @@ void node_impl::add_node(udp::endpoint node)
|
|||
m_rpc.allocator().set_next_size(10);
|
||||
|
||||
// create a dummy traversal_algorithm
|
||||
// this is unfortunately necessary for the observer
|
||||
// to free itself from the pool when it's being released
|
||||
boost::intrusive_ptr<traversal_algorithm> algo(
|
||||
new traversal_algorithm(*this, (node_id::min)()));
|
||||
observer_ptr o(new (ptr) null_observer(algo));
|
||||
observer_ptr o(new (ptr) null_observer(algo, node, node_id(0)));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
|
|
|
@ -58,30 +58,24 @@ char const* refresh::name() const
|
|||
return "refresh";
|
||||
}
|
||||
|
||||
bool refresh::invoke(udp::endpoint addr)
|
||||
observer_ptr refresh::new_observer(void* ptr
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
{
|
||||
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
|
||||
void* ptr = m_node.m_rpc.allocator().malloc();
|
||||
if (ptr == 0)
|
||||
{
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << "[" << this << "] failed to "
|
||||
"allocate memory for observer. aborting!";
|
||||
#endif
|
||||
done();
|
||||
return false;
|
||||
}
|
||||
m_node.m_rpc.allocator().set_next_size(10);
|
||||
observer_ptr o(new (ptr) find_data_observer(this));
|
||||
observer_ptr o(new (ptr) find_data_observer(this, ep, id));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
return o;
|
||||
}
|
||||
|
||||
bool refresh::invoke(observer_ptr o)
|
||||
{
|
||||
entry e;
|
||||
e["y"] = "q";
|
||||
e["q"] = "find_node";
|
||||
entry& a = e["a"];
|
||||
a["target"] = target().to_string();
|
||||
m_node.m_rpc.invoke(e, addr, o);
|
||||
m_node.m_rpc.invoke(e, o->target_ep(), o);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -125,14 +125,14 @@ void observer::abort()
|
|||
{
|
||||
if (m_done) return;
|
||||
m_done = true;
|
||||
m_algorithm->failed(target_ep(), traversal_algorithm::prevent_request);
|
||||
m_algorithm->failed(observer_ptr(this), traversal_algorithm::prevent_request);
|
||||
}
|
||||
|
||||
void observer::done()
|
||||
{
|
||||
if (m_done) return;
|
||||
m_done = true;
|
||||
m_algorithm->finished(target_ep());
|
||||
m_algorithm->finished(observer_ptr(this));
|
||||
}
|
||||
|
||||
void observer::short_timeout()
|
||||
|
@ -140,7 +140,7 @@ void observer::short_timeout()
|
|||
if (m_short_timeout) return;
|
||||
TORRENT_ASSERT(m_short_timeout == false);
|
||||
m_short_timeout = true;
|
||||
m_algorithm->failed(target_ep(), traversal_algorithm::short_timeout);
|
||||
m_algorithm->failed(observer_ptr(this), traversal_algorithm::short_timeout);
|
||||
}
|
||||
|
||||
// this is called when no reply has been received within
|
||||
|
@ -149,7 +149,7 @@ void observer::timeout()
|
|||
{
|
||||
if (m_done) return;
|
||||
m_done = true;
|
||||
m_algorithm->failed(target_ep());
|
||||
m_algorithm->failed(observer_ptr(this));
|
||||
}
|
||||
|
||||
node_id generate_id();
|
||||
|
@ -182,9 +182,11 @@ rpc_manager::rpc_manager(node_id const& our_id
|
|||
#define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y
|
||||
|
||||
TORRENT_LOG(rpc) << " observer: " << sizeof(observer);
|
||||
PRINT_OFFSETOF(observer, flags);
|
||||
PRINT_OFFSETOF(observer, m_sent);
|
||||
PRINT_OFFSETOF(observer, m_refs);
|
||||
PRINT_OFFSETOF(observer, m_algorithm);
|
||||
PRINT_OFFSETOF(observer, m_id);
|
||||
PRINT_OFFSETOF(observer, m_addr);
|
||||
PRINT_OFFSETOF(observer, m_port);
|
||||
PRINT_OFFSETOF(observer, m_transaction_id);
|
||||
|
@ -193,12 +195,6 @@ rpc_manager::rpc_manager(node_id const& our_id
|
|||
TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer);
|
||||
TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer);
|
||||
|
||||
TORRENT_LOG(rpc) << " traversal_algorithm::result: " << sizeof(traversal_algorithm::result);
|
||||
PRINT_OFFSETOF(traversal_algorithm::result, id);
|
||||
PRINT_OFFSETOF(traversal_algorithm::result, addr);
|
||||
PRINT_OFFSETOF(traversal_algorithm::result, port);
|
||||
PRINT_OFFSETOF(traversal_algorithm::result, flags);
|
||||
|
||||
#undef PRINT_OFFSETOF
|
||||
#endif
|
||||
|
||||
|
|
|
@ -46,36 +46,62 @@ namespace libtorrent { namespace dht
|
|||
TORRENT_DEFINE_LOG(traversal)
|
||||
#endif
|
||||
|
||||
observer_ptr traversal_algorithm::new_observer(void* ptr
|
||||
, udp::endpoint const& ep, node_id const& id)
|
||||
{
|
||||
observer_ptr o(new (ptr) null_observer(boost::intrusive_ptr<traversal_algorithm>(this), ep, id));
|
||||
#ifdef TORRENT_DEBUG
|
||||
o->m_in_constructor = false;
|
||||
#endif
|
||||
return o;
|
||||
}
|
||||
|
||||
void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsigned char flags)
|
||||
{
|
||||
result entry(id, addr, flags);
|
||||
if (entry.id.is_all_zeros())
|
||||
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
|
||||
m_node.m_rpc.allocator().set_next_size(10);
|
||||
void* ptr = m_node.m_rpc.allocator().malloc();
|
||||
if (ptr == 0)
|
||||
{
|
||||
entry.id = generate_id();
|
||||
entry.flags |= result::no_id;
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << "[" << this << "] failed to "
|
||||
"allocate memory for observer. aborting!";
|
||||
#endif
|
||||
done();
|
||||
return;
|
||||
}
|
||||
observer_ptr o = new_observer(ptr, addr, id);
|
||||
if (id.is_all_zeros())
|
||||
{
|
||||
o->set_id(generate_id());
|
||||
o->flags |= observer::flag_no_id;
|
||||
}
|
||||
|
||||
std::vector<result>::iterator i = std::lower_bound(
|
||||
o->flags |= flags;
|
||||
|
||||
std::vector<observer_ptr>::iterator i = std::lower_bound(
|
||||
m_results.begin()
|
||||
, m_results.end()
|
||||
, entry
|
||||
, o
|
||||
, boost::bind(
|
||||
compare_ref
|
||||
, boost::bind(&result::id, _1)
|
||||
, boost::bind(&result::id, _2)
|
||||
, boost::bind(&observer::id, _1)
|
||||
, boost::bind(&observer::id, _2)
|
||||
, m_target
|
||||
)
|
||||
);
|
||||
|
||||
if (i == m_results.end() || i->id != id)
|
||||
if (i == m_results.end() || (*i)->id() != id)
|
||||
{
|
||||
TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end()
|
||||
, boost::bind(&result::id, _1) == id) == m_results.end());
|
||||
, boost::bind(&observer::id, _1) == id) == m_results.end());
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr;
|
||||
#endif
|
||||
m_results.insert(i, entry);
|
||||
i = m_results.insert(i, o);
|
||||
}
|
||||
|
||||
if (m_results.size() > 100) m_results.resize(100);
|
||||
}
|
||||
|
||||
void traversal_algorithm::start()
|
||||
|
@ -102,29 +128,21 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
|
|||
add_entry(id, addr, 0);
|
||||
}
|
||||
|
||||
void traversal_algorithm::finished(udp::endpoint const& ep)
|
||||
void traversal_algorithm::finished(observer_ptr o)
|
||||
{
|
||||
std::vector<result>::iterator i = std::find_if(
|
||||
m_results.begin()
|
||||
, m_results.end()
|
||||
, boost::bind(
|
||||
std::equal_to<udp::endpoint>()
|
||||
, boost::bind(&result::endpoint, _1)
|
||||
, ep
|
||||
)
|
||||
);
|
||||
#ifdef TORRENT_DEBUG
|
||||
std::vector<observer_ptr>::iterator i = std::find(
|
||||
m_results.begin(), m_results.end(), o);
|
||||
|
||||
TORRENT_ASSERT(i != m_results.end());
|
||||
#endif
|
||||
|
||||
if (i != m_results.end())
|
||||
{
|
||||
// if this flag is set, it means we increased the
|
||||
// branch factor for it, and we should restore it
|
||||
if (i->flags & result::short_timeout)
|
||||
--m_branch_factor;
|
||||
}
|
||||
// if this flag is set, it means we increased the
|
||||
// branch factor for it, and we should restore it
|
||||
if (o->flags & observer::flag_short_timeout)
|
||||
--m_branch_factor;
|
||||
|
||||
i->flags |= result::alive;
|
||||
o->flags |= observer::flag_alive;
|
||||
|
||||
++m_responses;
|
||||
--m_invoke_count;
|
||||
|
@ -136,59 +154,52 @@ void traversal_algorithm::finished(udp::endpoint const& ep)
|
|||
// prevent request means that the total number of requests has
|
||||
// overflown. This query failed because it was the oldest one.
|
||||
// So, if this is true, don't make another request
|
||||
void traversal_algorithm::failed(udp::endpoint const& ep, int flags)
|
||||
void traversal_algorithm::failed(observer_ptr o, int flags)
|
||||
{
|
||||
TORRENT_ASSERT(m_invoke_count >= 0);
|
||||
|
||||
if (m_results.empty()) return;
|
||||
|
||||
std::vector<result>::iterator i = std::find_if(
|
||||
m_results.begin()
|
||||
, m_results.end()
|
||||
, boost::bind(
|
||||
std::equal_to<udp::endpoint>()
|
||||
, boost::bind(&result::endpoint, _1)
|
||||
, ep
|
||||
)
|
||||
);
|
||||
|
||||
TORRENT_ASSERT(i != m_results.end());
|
||||
|
||||
if (i != m_results.end())
|
||||
TORRENT_ASSERT(o->flags & observer::flag_queried);
|
||||
if (flags & short_timeout)
|
||||
{
|
||||
TORRENT_ASSERT(i->flags & result::queried);
|
||||
if (flags & short_timeout)
|
||||
{
|
||||
// short timeout means that it has been more than
|
||||
// two seconds since we sent the request, and that
|
||||
// we'll most likely not get a response. But, in case
|
||||
// we do get a late response, keep the handler
|
||||
// around for some more, but open up the slot
|
||||
// by increasing the branch factor
|
||||
if ((i->flags & result::short_timeout) == 0)
|
||||
++m_branch_factor;
|
||||
i->flags |= result::short_timeout;
|
||||
}
|
||||
else
|
||||
{
|
||||
i->flags |= result::failed;
|
||||
// short timeout means that it has been more than
|
||||
// two seconds since we sent the request, and that
|
||||
// we'll most likely not get a response. But, in case
|
||||
// we do get a late response, keep the handler
|
||||
// around for some more, but open up the slot
|
||||
// by increasing the branch factor
|
||||
if ((o->flags & observer::flag_short_timeout) == 0)
|
||||
++m_branch_factor;
|
||||
o->flags |= observer::flag_short_timeout;
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << " [" << this << "] failed: "
|
||||
<< i->id << " " << i->endpoint();
|
||||
TORRENT_LOG(traversal) << " [" << this << "] first chance timeout: "
|
||||
<< o->id() << " " << o->target_ep()
|
||||
<< " branch-factor: " << m_branch_factor
|
||||
<< " invoke-count: " << m_invoke_count;
|
||||
#endif
|
||||
// if this flag is set, it means we increased the
|
||||
// branch factor for it, and we should restore it
|
||||
if (i->flags & result::short_timeout)
|
||||
--m_branch_factor;
|
||||
}
|
||||
else
|
||||
{
|
||||
o->flags |= observer::flag_failed;
|
||||
// if this flag is set, it means we increased the
|
||||
// branch factor for it, and we should restore it
|
||||
if (o->flags & observer::flag_short_timeout)
|
||||
--m_branch_factor;
|
||||
|
||||
// don't tell the routing table about
|
||||
// node ids that we just generated ourself
|
||||
if ((i->flags & result::no_id) == 0)
|
||||
m_node.m_table.node_failed(i->id);
|
||||
++m_timeouts;
|
||||
--m_invoke_count;
|
||||
TORRENT_ASSERT(m_invoke_count >= 0);
|
||||
}
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << " [" << this << "] failed: "
|
||||
<< o->id() << " " << o->target_ep()
|
||||
<< " branch-factor: " << m_branch_factor
|
||||
<< " invoke-count: " << m_invoke_count;
|
||||
#endif
|
||||
// don't tell the routing table about
|
||||
// node ids that we just generated ourself
|
||||
if ((o->flags & observer::flag_no_id) == 0)
|
||||
m_node.m_table.node_failed(o->id());
|
||||
++m_timeouts;
|
||||
--m_invoke_count;
|
||||
TORRENT_ASSERT(m_invoke_count >= 0);
|
||||
}
|
||||
|
||||
if (flags & prevent_request)
|
||||
|
@ -200,6 +211,13 @@ void traversal_algorithm::failed(udp::endpoint const& ep, int flags)
|
|||
if (m_invoke_count == 0) done();
|
||||
}
|
||||
|
||||
void traversal_algorithm::done()
|
||||
{
|
||||
// delete all our references to the observer objects so
|
||||
// they will in turn release the traversal algorithm
|
||||
m_results.clear();
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
bool bitwise_nand(unsigned char lhs, unsigned char rhs)
|
||||
|
@ -213,23 +231,25 @@ void traversal_algorithm::add_requests()
|
|||
int results_target = m_node.m_table.bucket_size();
|
||||
|
||||
// Find the first node that hasn't already been queried.
|
||||
for (std::vector<result>::iterator i = m_results.begin()
|
||||
for (std::vector<observer_ptr>::iterator i = m_results.begin()
|
||||
, end(m_results.end()); i != end
|
||||
&& results_target > 0 && m_invoke_count < m_branch_factor; ++i)
|
||||
{
|
||||
if (i->flags & result::alive) --results_target;
|
||||
if (i->flags & result::queried) continue;
|
||||
if ((*i)->flags & observer::flag_alive) --results_target;
|
||||
if ((*i)->flags & observer::flag_queried) continue;
|
||||
|
||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||
TORRENT_LOG(traversal) << " [" << this << "] nodes left: "
|
||||
<< (m_results.end() - i);
|
||||
TORRENT_LOG(traversal) << " [" << this << "]"
|
||||
<< " nodes-left: " << (m_results.end() - i)
|
||||
<< " invoke-count: " << m_invoke_count
|
||||
<< " branch-factor: " << m_branch_factor;
|
||||
#endif
|
||||
|
||||
if (invoke(i->endpoint()))
|
||||
if (invoke(*i))
|
||||
{
|
||||
TORRENT_ASSERT(m_invoke_count >= 0);
|
||||
++m_invoke_count;
|
||||
i->flags |= result::queried;
|
||||
(*i)->flags |= observer::flag_queried;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -243,7 +263,7 @@ void traversal_algorithm::add_router_entries()
|
|||
for (routing_table::router_iterator i = m_node.m_table.router_begin()
|
||||
, end(m_node.m_table.router_end()); i != end; ++i)
|
||||
{
|
||||
add_entry(node_id(0), *i, result::initial);
|
||||
add_entry(node_id(0), *i, observer::flag_initial);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,10 +288,10 @@ void traversal_algorithm::status(dht_lookup& l)
|
|||
l.branch_factor = m_branch_factor;
|
||||
l.type = name();
|
||||
l.nodes_left = 0;
|
||||
for (std::vector<result>::iterator i = m_results.begin()
|
||||
for (std::vector<observer_ptr>::iterator i = m_results.begin()
|
||||
, end(m_results.end()); i != end; ++i)
|
||||
{
|
||||
if (i->flags & result::queried) continue;
|
||||
if ((*i)->flags & observer::flag_queried) continue;
|
||||
++l.nodes_left;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue