more DHT simplifications

This commit is contained in:
Arvid Norberg 2009-10-07 20:51:02 +00:00
parent ba922defc6
commit cee42ff5a1
15 changed files with 305 additions and 375 deletions

View File

@ -783,7 +783,10 @@ int main(int argc, char* argv[])
handles_t handles;
session ses(fingerprint("LT", LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR, 0, 0)
, session::add_default_plugins
, alert::all_categories & (~alert::dht_notification));
, alert::all_categories
& ~(alert::dht_notification
+ alert::progress_notification
+ alert::debug_notification));
std::vector<char> in;
if (load_file(".ses_state", in) == 0)

View File

@ -77,7 +77,7 @@ namespace libtorrent { namespace dht
{
friend void intrusive_ptr_add_ref(dht_tracker const*);
friend void intrusive_ptr_release(dht_tracker const*);
friend void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags);
friend bool send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags);
dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
, dht_settings const& settings, entry const* state = 0);
@ -115,7 +115,7 @@ namespace libtorrent { namespace dht
void tick(error_code const& e);
void on_bootstrap(std::vector<std::pair<node_entry, std::string> > const&);
void send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags);
bool send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags);
node_impl m_dht;
libtorrent::aux::session_impl& m_ses;

View File

@ -61,7 +61,7 @@ class find_data : public traversal_algorithm
{
public:
typedef boost::function<void(std::vector<tcp::endpoint> const&)> data_callback;
typedef boost::function<void(std::vector<std::pair<node_entry, std::string> > const&)> nodes_callback;
typedef boost::function<void(std::vector<std::pair<node_entry, std::string> > const&, bool)> nodes_callback;
void got_peers(std::vector<tcp::endpoint> const& peers);
void got_write_token(node_id const& n, std::string const& write_token)
@ -78,46 +78,27 @@ public:
protected:
void done();
virtual bool invoke(node_id const& id, udp::endpoint addr);
private:
virtual void invoke(node_id const& id, udp::endpoint addr);
data_callback m_data_callback;
nodes_callback m_nodes_callback;
std::map<node_id, std::string> m_write_tokens;
node_id const m_target;
bool m_done;
bool m_done:1;
bool m_got_peers:1;
};
class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_self(self)
: observer(algorithm)
{}
~find_data_observer();
void short_timeout();
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
// with verbose logging, we log the size and
// offset of this structs members, so we need
// access to all of them
#ifndef TORRENT_DHT_VERBOSE_LOGGING
private:
#endif
boost::intrusive_ptr<find_data> m_algorithm;
// the node this observer sent a message to
// this is used to mark the result in the right
// node when we get a response
// TODO: replace this with the observer this-pointer
node_id const m_self;
};
} } // namespace libtorrent::dht

View File

@ -154,25 +154,11 @@ struct null_type {};
class announce_observer : public observer
{
public:
announce_observer(boost::pool<>& allocator
, sha1_hash const& info_hash
, int listen_port
, std::string const& write_token)
: observer(allocator)
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_token(write_token)
announce_observer(boost::intrusive_ptr<traversal_algorithm> const& algo)
: observer(algo)
{}
void short_timeout() {}
void timeout() {}
void reply(msg const&) {}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
std::string m_token;
void reply(msg const&) { m_done = true; }
};
struct count_peers
@ -192,7 +178,7 @@ typedef std::map<node_id, torrent_entry> table_t;
typedef std::map<std::pair<node_id, sha1_hash>, search_torrent_entry> search_table_t;
public:
node_impl(libtorrent::aux::session_impl& ses
, void (*f)(void*, entry const&, udp::endpoint const&, int)
, bool (*f)(void*, entry const&, udp::endpoint const&, int)
, dht_settings const& settings, boost::optional<node_id> nid
, void* userdata);
@ -317,7 +303,7 @@ private:
int m_secret[2];
libtorrent::aux::session_impl& m_ses;
void (*m_send)(void*, entry const&, udp::endpoint const&, int);
bool (*m_send)(void*, entry const&, udp::endpoint const&, int);
void* m_userdata;
};

View File

@ -43,6 +43,7 @@ namespace dht {
struct observer;
struct msg;
struct traversal_algorithm;
// defined in rpc_manager.cpp
TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
@ -55,7 +56,7 @@ TORRENT_EXPORT void intrusive_ptr_release(observer const*);
// 16 4 4 pool_allocator
// 20 16 4 m_addr
// 36 2 2 m_port
// 38 1 1 m_is_v6, m_in_constructor
// 38 1 1 m_is_v6, m_short_timeout, m_in_constructor, m_was_sent
// 39 1 1 <padding>
// 40
@ -64,37 +65,41 @@ struct observer : boost::noncopyable
friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
friend TORRENT_EXPORT void intrusive_ptr_release(observer const*);
observer(boost::pool<>& p)
observer(boost::intrusive_ptr<traversal_algorithm> const& a)
: m_sent()
, m_refs(0)
, pool_allocator(p)
, m_algorithm(a)
, m_is_v6(false)
, m_short_timeout(false)
, m_done(false)
{
TORRENT_ASSERT(a);
#ifdef TORRENT_DEBUG
m_in_constructor = true;
m_was_sent = false;
#endif
}
virtual ~observer()
{
TORRENT_ASSERT(!m_in_constructor);
}
virtual ~observer();
// this is called when a reply is received
virtual void reply(msg const& m) = 0;
// this is called if no response has been received after
// a few seconds, before the request has timed out
virtual void short_timeout() = 0;
void short_timeout();
bool has_short_timeout() const { return m_short_timeout; }
// this is called when no reply has been received within
// some timeout
virtual void timeout() = 0;
void timeout();
// if this is called the destructor should
// not invoke any new messages, and should
// only clean up. It means the rpc-manager
// is being destructed
virtual void abort() = 0;
void abort();
ptime sent() const { return m_sent; }
@ -102,18 +107,25 @@ struct observer : boost::noncopyable
address target_addr() const;
udp::endpoint target_ep() const;
// with verbose logging, we log the size and
// offset of this structs members, so we need
// access to all of them
void set_transaction_id(boost::uint16_t tid)
{ m_transaction_id = tid; }
boost::uint16_t transaction_id() const
{ return m_transaction_id; }
#ifndef TORRENT_DHT_VERBOSE_LOGGING
private:
protected:
#endif
void done();
ptime m_sent;
// reference counter for intrusive_ptr
mutable boost::detail::atomic_count m_refs;
boost::pool<>& pool_allocator;
const boost::intrusive_ptr<traversal_algorithm> m_algorithm;
union addr_t
{
#if TORRENT_USE_IPV6
@ -124,10 +136,19 @@ private:
boost::uint16_t m_port;
// the transaction ID for this call
boost::uint16_t m_transaction_id;
bool m_is_v6:1;
bool m_short_timeout:1;
// when true, this observer has reported
// back to the traversal algorithm already
bool m_done:1;
#ifdef TORRENT_DEBUG
public:
bool m_in_constructor:1;
bool m_was_sent:1;
#endif
};

View File

@ -55,9 +55,9 @@ public:
virtual char const* name() const;
private:
protected:
void invoke(node_id const& id, udp::endpoint addr);
virtual bool invoke(node_id const& id, udp::endpoint addr);
};
} } // namespace libtorrent::dht

View File

@ -60,11 +60,8 @@ TORRENT_DECLARE_LOG(rpc);
struct null_observer : public observer
{
null_observer(boost::pool<>& allocator): observer(allocator) {}
virtual void reply(msg const&) {}
virtual void short_timeout() {}
virtual void timeout() {}
void abort() {}
null_observer(boost::intrusive_ptr<traversal_algorithm>& a): observer(a) {}
virtual void reply(msg const&) { m_done = true; }
};
class routing_table;
@ -72,7 +69,7 @@ class routing_table;
class rpc_manager
{
public:
typedef void (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int);
typedef bool (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int);
rpc_manager(node_id const& our_id
, routing_table& table, send_fun const& sf
@ -85,7 +82,7 @@ public:
bool incoming(msg const&);
time_duration tick();
void invoke(entry& e, udp::endpoint target
bool invoke(entry& e, udp::endpoint target
, observer_ptr o);
void add_our_id(entry& e);
@ -100,27 +97,17 @@ public:
private:
enum { max_transactions = 2048 };
unsigned int new_transaction_id(observer_ptr o);
void update_oldest_transaction_id();
enum { max_transaction_id = 0x10000 };
boost::uint32_t calc_connection_id(udp::endpoint addr);
mutable boost::pool<> m_pool_allocator;
typedef boost::array<observer_ptr, max_transactions>
transactions_t;
typedef std::list<observer_ptr> transactions_t;
transactions_t m_transactions;
std::vector<observer_ptr> m_aborted_transactions;
// this is the next transaction id to be used
int m_next_transaction_id;
// this is the oldest transaction id still
// (possibly) in use. This is the transaction
// that will time out first, the one we are
// waiting for to time out
int m_oldest_transaction_id;
send_fun m_send;
void* m_userdata;

View File

@ -60,10 +60,10 @@ class traversal_algorithm : boost::noncopyable
{
public:
void traverse(node_id const& id, udp::endpoint addr);
void finished(node_id const& id);
void finished(udp::endpoint const& ep);
enum flags_t { prevent_request = 1, short_timeout = 2 };
void failed(node_id const& id, int flags = 0);
void failed(udp::endpoint const& ep, int flags = 0);
virtual ~traversal_algorithm();
boost::pool<>& allocator() const;
void status(dht_lookup& l);
@ -77,18 +77,51 @@ public:
struct result
{
result(node_id const& id, udp::endpoint addr, unsigned char f = 0)
: id(id), addr(addr), flags(f) {}
result(node_id const& id, udp::endpoint ep, unsigned char f = 0)
: id(id), flags(f)
{
if (ep.address().is_v6())
{
flags |= ipv6_address;
addr.v6 = ep.address().to_v6().to_bytes();
}
else
{
flags &= ~ipv6_address;
addr.v4 = ep.address().to_v4().to_bytes();
}
port = ep.port();
}
udp::endpoint endpoint() const
{
if (flags & ipv6_address)
return udp::endpoint(address_v6(addr.v6), port);
else
return udp::endpoint(address_v4(addr.v4), port);
}
node_id id;
// TODO: replace with union of address_v4 and address_v6 and a port
udp::endpoint addr;
enum { queried = 1, initial = 2, no_id = 4, short_timeout = 8 };
union addr_t
{
address_v4::bytes_type v4;
address_v6::bytes_type v6;
} addr;
boost::uint16_t port;
enum {
queried = 1,
initial = 2,
no_id = 4,
short_timeout = 8,
failed = 16,
ipv6_address = 32
};
unsigned char flags;
};
protected:
traversal_algorithm(
node_impl& node
, node_id target)
@ -99,14 +132,20 @@ protected:
, m_branch_factor(3)
, m_responses(0)
, m_timeouts(0)
{}
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << " [" << this << "] new traversal process";
#endif
}
protected:
void add_requests();
void add_router_entries();
void init();
virtual void done() = 0;
virtual void invoke(node_id const& id, udp::endpoint addr) = 0;
virtual void done() {}
virtual bool invoke(node_id const& id, udp::endpoint addr) { return false; }
std::vector<result>::iterator last_iterator();
@ -126,7 +165,6 @@ protected:
node_impl& m_node;
node_id m_target;
std::vector<result> m_results;
std::set<udp::endpoint> m_failed;
int m_invoke_count;
int m_branch_factor;
int m_responses;

View File

@ -308,12 +308,7 @@ namespace libtorrent
{ none, requested, writing, finished };
private:
#ifdef __SUNPRO_CC
// sunpro is strict about POD types in unions
struct
#else
union
#endif
union addr_t
{
address_v4::bytes_type v4;
address_v6::bytes_type v6;

View File

@ -200,10 +200,10 @@ namespace libtorrent { namespace dht
return boost::optional<node_id>(node_id(nid->string().c_str()));
}
void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags)
bool send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags)
{
dht_tracker* self = (dht_tracker*)userdata;
self->send_packet(e, addr, flags);
return self->send_packet(e, addr, flags);
}
// class that puts the networking and the kademlia node in a single
@ -244,7 +244,7 @@ namespace libtorrent { namespace dht
// rpc_log().enable(false);
// node_log().enable(false);
traversal_log().enable(false);
// traversal_log().enable(false);
// dht_tracker_log.enable(false);
TORRENT_LOG(dht_tracker) << "starting DHT tracker with node id: " << m_dht.nid();
@ -617,7 +617,7 @@ namespace libtorrent { namespace dht
void dht_tracker::on_bootstrap(std::vector<std::pair<node_entry, std::string> > const&)
{}
void dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags)
bool dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags)
{
using libtorrent::bencode;
using libtorrent::entry;
@ -636,6 +636,8 @@ namespace libtorrent { namespace dht
error_code ec;
if (m_sock.send(addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags))
{
if (ec) return false;
// account for IP and UDP overhead
m_sent_bytes += m_send_buf.size() + (addr.address().is_v6() ? 48 : 28);
@ -654,13 +656,15 @@ namespace libtorrent { namespace dht
}
TORRENT_LOG(dht_tracker) << "==> " << addr << " " << log_line.str();
#endif
return true;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
else
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "==> " << addr << " DROPPED " << log_line.str();
}
#endif
return false;
}
}
}}

View File

@ -52,29 +52,18 @@ using detail::read_v4_endpoint;
using detail::read_v6_endpoint;
using detail::read_endpoint_list;
find_data_observer::~find_data_observer()
{
if (m_algorithm) m_algorithm->failed(m_self);
}
void find_data_observer::reply(msg const& m)
{
if (!m_algorithm)
{
TORRENT_ASSERT(false);
return;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::stringstream log_line;
log_line << " incoming get_peer response [ ";
log_line << "[" << m_algorithm.get() << "] incoming get_peer response [ ";
#endif
lazy_entry const* r = m.message.dict_find_dict("r");
if (!r)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " missing response dict";
TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] missing response dict";
#endif
return;
}
@ -83,7 +72,7 @@ void find_data_observer::reply(msg const& m)
if (!id || id->string_length() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " invalid id in response";
TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] invalid id in response";
#endif
return;
}
@ -91,7 +80,9 @@ void find_data_observer::reply(msg const& m)
lazy_entry const* token = r->dict_find_string("token");
if (token)
{
m_algorithm->got_write_token(node_id(id->string_ptr()), token->string_value());
static_cast<find_data*>(m_algorithm.get())->got_write_token(
node_id(id->string_ptr()), token->string_value());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(token->string_value());
#endif
@ -122,7 +113,7 @@ void find_data_observer::reply(msg const& m)
log_line << " p: " << n->list_size();
#endif
}
m_algorithm->got_peers(peer_list);
static_cast<find_data*>(m_algorithm.get())->got_peers(peer_list);
}
// look for nodes
@ -169,25 +160,11 @@ void find_data_observer::reply(msg const& m)
#endif
}
}
m_algorithm->finished(m_self);
m_algorithm = 0;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ]";
TORRENT_LOG(dht_tracker) << log_line.str();
#endif
}
void find_data_observer::short_timeout()
{
if (!m_algorithm) return;
m_algorithm->failed(m_self, traversal_algorithm::short_timeout);
}
void find_data_observer::timeout()
{
if (!m_algorithm) return;
m_algorithm->failed(m_self);
m_algorithm = 0;
done();
}
find_data::find_data(
@ -200,6 +177,7 @@ find_data::find_data(
, m_nodes_callback(ncallback)
, m_target(target)
, m_done(false)
, m_got_peers(false)
{
for (routing_table::const_iterator i = node.m_table.begin()
, end(node.m_table.end()); i != end; ++i)
@ -208,12 +186,12 @@ find_data::find_data(
}
}
void find_data::invoke(node_id const& id, udp::endpoint addr)
bool find_data::invoke(node_id const& id, udp::endpoint addr)
{
if (m_done)
{
m_invoke_count = -1;
return;
return false;
}
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
@ -221,7 +199,7 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
if (ptr == 0)
{
done();
return;
return false;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) find_data_observer(this, id));
@ -233,11 +211,12 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
e["q"] = "get_peers";
entry& a = e["a"];
a["info_hash"] = id.to_string();
m_node.m_rpc.invoke(e, addr, o);
return m_node.m_rpc.invoke(e, addr, o);
}
void find_data::got_peers(std::vector<tcp::endpoint> const& peers)
{
if (!peers.empty()) m_got_peers = true;
m_data_callback(peers);
}
@ -245,6 +224,8 @@ void find_data::done()
{
if (m_invoke_count != 0) return;
m_done = true;
std::vector<std::pair<node_entry, std::string> > results;
int num_results = m_node.m_table.bucket_size();
for (std::vector<result>::iterator i = m_results.begin()
@ -254,10 +235,10 @@ void find_data::done()
if ((i->flags & result::queried) == 0) continue;
std::map<node_id, std::string>::iterator j = m_write_tokens.find(i->id);
if (j == m_write_tokens.end()) continue;
results.push_back(std::make_pair(node_entry(i->id, i->addr), j->second));
results.push_back(std::make_pair(node_entry(i->id, i->endpoint()), j->second));
--num_results;
}
m_nodes_callback(results);
m_nodes_callback(results, m_got_peers);
}
} } // namespace libtorrent::dht

View File

@ -183,7 +183,7 @@ void nop() {}
// instead, and make the dht_tracker less dependent on session_impl
// which would make it simpler to unit test
node_impl::node_impl(libtorrent::aux::session_impl& ses
, void (*f)(void*, entry const&, udp::endpoint const&, int)
, bool (*f)(void*, entry const&, udp::endpoint const&, int)
, dht_settings const& settings
, boost::optional<node_id> nid
, void* userdata)
@ -372,7 +372,7 @@ void node_impl::incoming(msg const& m)
namespace
{
void announce_fun(std::vector<std::pair<node_entry, std::string> > const& v
, rpc_manager& rpc, int listen_port, sha1_hash const& ih)
, node_impl& node, int listen_port, sha1_hash const& ih)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "sending announce_peer [ ih: " << ih
@ -380,6 +380,10 @@ namespace
<< " nodes: " << v.size() << " ]" ;
#endif
// create a dummy traversal_algorithm
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(node, node_id::min()));
// store on the first k nodes
for (std::vector<std::pair<node_entry, std::string> >::const_iterator i = v.begin()
, end(v.end()); i != end; ++i)
@ -388,11 +392,10 @@ namespace
TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id));
#endif
void* ptr = rpc.allocator().malloc();
void* ptr = node.m_rpc.allocator().malloc();
if (ptr == 0) return;
rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) announce_observer(
rpc.allocator(), ih, listen_port, i->second));
node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) announce_observer(algo));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
@ -400,9 +403,10 @@ namespace
e["y"] = "q";
e["q"] = "announce_peer";
entry& a = e["a"];
a["info_hash"] = ih.to_string();
a["port"] = listen_port;
a["token"] = i->second;
rpc.invoke(e, i->first.ep(), o);
node.m_rpc.invoke(e, i->first.ep(), o);
}
}
}
@ -422,7 +426,11 @@ void node_impl::add_node(udp::endpoint node)
void* ptr = m_rpc.allocator().malloc();
if (ptr == 0) return;
m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) null_observer(m_rpc.allocator()));
// create a dummy traversal_algorithm
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(*this, node_id::min()));
observer_ptr o(new (ptr) null_observer(algo));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
@ -441,7 +449,7 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
// search for nodes with ids close to id or with peers
// for info-hash id. then send announce_peer to them.
boost::intrusive_ptr<find_data> ta(new find_data(*this, info_hash, f
, boost::bind(&announce_fun, _1, boost::ref(m_rpc)
, boost::bind(&announce_fun, _1, boost::ref(*this)
, listen_port, info_hash)));
ta->start();
}
@ -583,6 +591,7 @@ bool node_impl::lookup_peers(sha1_hash const& info_hash, entry& reply) const
if (i == m_map.end()) return false;
torrent_entry const& v = i->second;
if (v.peers.empty()) return false;
int num = (std::min)((int)v.peers.size(), m_settings.max_peers_reply);
int t = 0;
@ -767,9 +776,9 @@ void node_impl::incoming_request(msg const& m, entry& e)
m_table.find_node(info_hash, n, 0);
write_nodes_entry(reply, n);
lookup_peers(info_hash, reply);
bool ret = lookup_peers(info_hash, reply);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << " values: " << reply["values"].list().size();
if (ret) TORRENT_LOG(node) << " values: " << reply["values"].list().size();
#endif
}
else if (strcmp(query, "find_node") == 0)

View File

@ -54,14 +54,14 @@ char const* refresh::name() const
return "refresh";
}
void refresh::invoke(node_id const& nid, udp::endpoint addr)
bool refresh::invoke(node_id const& nid, udp::endpoint addr)
{
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
done();
return;
return false;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) find_data_observer(this, nid));
@ -74,6 +74,7 @@ void refresh::invoke(node_id const& nid, udp::endpoint addr)
entry& a = e["a"];
a["target"] = target().to_string();
m_node.m_rpc.invoke(e, addr, o);
return true;
}
} } // namespace libtorrent::dht

View File

@ -80,7 +80,7 @@ void intrusive_ptr_release(observer const* o)
TORRENT_ASSERT(o != 0);
if (--o->m_refs == 0)
{
boost::pool<>& p = o->pool_allocator;
boost::pool<>& p = o->m_algorithm->allocator();
(const_cast<observer*>(o))->~observer();
p.free(const_cast<observer*>(o));
}
@ -123,6 +123,36 @@ udp::endpoint observer::target_ep() const
return udp::endpoint(target_addr(), m_port);
}
void observer::abort()
{
if (m_done) return;
m_done = true;
m_algorithm->failed(target_ep(), traversal_algorithm::prevent_request);
}
void observer::done()
{
if (m_done) return;
m_done = true;
m_algorithm->finished(target_ep());
}
void observer::short_timeout()
{
if (m_short_timeout) return;
TORRENT_ASSERT(m_short_timeout == false);
m_short_timeout = true;
m_algorithm->failed(target_ep(), traversal_algorithm::short_timeout);
}
// this is called when no reply has been received within
// some timeout
void observer::timeout()
{
if (m_done) return;
m_done = true;
m_algorithm->failed(target_ep());
}
node_id generate_id();
@ -140,8 +170,7 @@ rpc_manager::rpc_manager(node_id const& our_id
, routing_table& table, send_fun const& sf
, void* userdata)
: m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type), 10)
, m_next_transaction_id(std::rand() % max_transactions)
, m_oldest_transaction_id(m_next_transaction_id)
, m_next_transaction_id(std::rand() % max_transaction_id)
, m_send(sf)
, m_userdata(userdata)
, m_our_id(our_id)
@ -154,24 +183,30 @@ rpc_manager::rpc_manager(node_id const& our_id
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Constructing";
TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer);
TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer);
#define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y
TORRENT_LOG(rpc) << " observer: " << sizeof(observer);
PRINT_OFFSETOF(observer, pool_allocator);
PRINT_OFFSETOF(observer, m_sent);
PRINT_OFFSETOF(observer, m_refs);
PRINT_OFFSETOF(observer, m_algorithm);
PRINT_OFFSETOF(observer, m_addr);
PRINT_OFFSETOF(observer, m_port);
PRINT_OFFSETOF(observer, m_transaction_id);
TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer);
TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer);
TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer);
PRINT_OFFSETOF(find_data_observer, m_algorithm);
PRINT_OFFSETOF(find_data_observer, m_self);
TORRENT_LOG(rpc) << " traversal_algorithm::result: " << sizeof(traversal_algorithm::result);
PRINT_OFFSETOF(traversal_algorithm::result, id);
PRINT_OFFSETOF(traversal_algorithm::result, addr);
PRINT_OFFSETOF(traversal_algorithm::result, port);
PRINT_OFFSETOF(traversal_algorithm::result, flags);
#undef PRINT_OFFSETOF
#endif
}
rpc_manager::~rpc_manager()
@ -181,13 +216,11 @@ rpc_manager::~rpc_manager()
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Destructing";
#endif
std::for_each(m_aborted_transactions.begin(), m_aborted_transactions.end()
, bind(&observer::abort, _1));
for (transactions_t::iterator i = m_transactions.begin()
, end(m_transactions.end()); i != end; ++i)
{
if (*i) (*i)->abort();
(*i)->abort();
}
}
@ -200,16 +233,13 @@ size_t rpc_manager::allocation_size() const
void rpc_manager::check_invariant() const
{
TORRENT_ASSERT(m_oldest_transaction_id >= 0);
TORRENT_ASSERT(m_oldest_transaction_id < max_transactions);
TORRENT_ASSERT(m_next_transaction_id >= 0);
TORRENT_ASSERT(m_next_transaction_id < max_transactions);
TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
TORRENT_ASSERT(m_next_transaction_id < max_transaction_id);
for (int i = (m_next_transaction_id + 1) % max_transactions;
i != m_oldest_transaction_id; i = (i + 1) % max_transactions)
for (transactions_t::const_iterator i = m_transactions.begin()
, end(m_transactions.end()); i != end; ++i)
{
TORRENT_ASSERT(!m_transactions[i]);
TORRENT_ASSERT(*i);
}
}
#endif
@ -219,31 +249,20 @@ void rpc_manager::unreachable(udp::endpoint const& ep)
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << time_now_string() << " PORT_UNREACHABLE [ ip: " << ep << " ]";
#endif
int num_active = m_oldest_transaction_id < m_next_transaction_id
? m_next_transaction_id - m_oldest_transaction_id
: max_transactions - m_oldest_transaction_id + m_next_transaction_id;
TORRENT_ASSERT((m_oldest_transaction_id + num_active) % max_transactions
== m_next_transaction_id);
int tid = m_oldest_transaction_id;
for (int i = 0; i < num_active; ++i, ++tid)
for (transactions_t::iterator i = m_transactions.begin();
i != m_transactions.end();)
{
if (tid >= max_transactions) tid = 0;
observer_ptr const& o = m_transactions[tid];
if (!o) continue;
TORRENT_ASSERT(*i);
observer_ptr const& o = *i;
if (o->target_ep() != ep) continue;
observer_ptr ptr = m_transactions[tid];
m_transactions[tid] = 0;
if (tid == m_oldest_transaction_id)
{
++m_oldest_transaction_id;
if (m_oldest_transaction_id >= max_transactions)
m_oldest_transaction_id = 0;
}
observer_ptr ptr = *i;
m_transactions.erase(i++);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << " found transaction [ tid: " << tid << " ]";
TORRENT_LOG(rpc) << " found transaction [ tid: " << ptr->transaction_id() << " ]";
#endif
ptr->timeout();
return;
break;
}
}
@ -269,7 +288,18 @@ bool rpc_manager::incoming(msg const& m)
observer_ptr o;
if (tid >= (int)m_transactions.size() || tid < 0)
for (transactions_t::iterator i = m_transactions.begin()
, end(m_transactions.end()); i != end; ++i)
{
TORRENT_ASSERT(*i);
if ((*i)->transaction_id() != tid) continue;
if (m.addr.address() != (*i)->target_addr()) continue;
o = *i;
m_transactions.erase(i);
break;
}
if (!o)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
@ -281,26 +311,6 @@ bool rpc_manager::incoming(msg const& m)
return false;
}
o = m_transactions[tid];
if (!o)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply to a timed out request "
<< tid << " from " << m.addr;
#endif
return false;
}
if (m.addr.address() != o->target_addr())
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
<< tid << " from " << m.addr << " expected: " << o->target_addr();
#endif
return false;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::ofstream reply_stats("round_trip_ms.log", std::ios::app);
reply_stats << m.addr << "\t" << total_milliseconds(time_now_hires() - o->sent())
@ -326,11 +336,10 @@ bool rpc_manager::incoming(msg const& m)
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with transaction id: "
TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] Reply with transaction id: "
<< tid << " from " << m.addr;
#endif
o->reply(m);
m_transactions[tid] = 0;
return m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr);
}
@ -338,26 +347,22 @@ time_duration rpc_manager::tick()
{
INVARIANT_CHECK;
const static int short_timeout = 2;
const static int timeout = 10;
const static int short_timeout = 3;
const static int timeout = 20;
// look for observers that have timed out
if (m_next_transaction_id == m_oldest_transaction_id) return seconds(short_timeout);
if (m_transactions.empty()) return seconds(short_timeout);
std::vector<observer_ptr> timeouts;
std::list<observer_ptr> timeouts;
time_duration ret = seconds(short_timeout);
ptime now = time_now();
for (;m_next_transaction_id != m_oldest_transaction_id;
m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
for (transactions_t::iterator i = m_transactions.begin();
i != m_transactions.end();)
{
TORRENT_ASSERT(m_oldest_transaction_id >= 0);
TORRENT_ASSERT(m_oldest_transaction_id < max_transactions);
observer_ptr o = m_transactions[m_oldest_transaction_id];
if (!o) continue;
observer_ptr o = *i;
// if we reach an observer that hasn't timed out
// break, because every observer after this one will
@ -369,34 +374,21 @@ time_duration rpc_manager::tick()
break;
}
#ifndef BOOST_NO_EXCEPTIONS
try
{
#endif
m_transactions[m_oldest_transaction_id] = 0;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Timing out transaction id: "
<< m_oldest_transaction_id << " from " << o->target_ep();
TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] Timing out transaction id: "
<< (*i)->transaction_id() << " from " << o->target_ep();
#endif
m_transactions.erase(i++);
timeouts.push_back(o);
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception) {}
#endif
}
std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1));
timeouts.clear();
// clear the aborted transactions, will likely
// generate new requests. We need to swap, since the
// destrutors may add more observers to the m_aborted_transactions
std::vector<observer_ptr>().swap(m_aborted_transactions);
for (int i = m_oldest_transaction_id; i != m_next_transaction_id;
i = (i + 1) % max_transactions)
for (transactions_t::iterator i = m_transactions.begin();
i != m_transactions.end(); ++i)
{
observer_ptr o = m_transactions[i];
if (!o) continue;
observer_ptr o = *i;
// if we reach an observer that hasn't timed out
// break, because every observer after this one will
@ -408,6 +400,8 @@ time_duration rpc_manager::tick()
break;
}
if (o->has_short_timeout()) continue;
// TODO: don't call short_timeout() again if we've
// already called it once
timeouts.push_back(o);
@ -418,84 +412,22 @@ time_duration rpc_manager::tick()
return ret;
}
unsigned int rpc_manager::new_transaction_id(observer_ptr o)
{
INVARIANT_CHECK;
unsigned int tid = m_next_transaction_id;
m_next_transaction_id = (m_next_transaction_id + 1) % max_transactions;
if (m_transactions[m_next_transaction_id])
{
// moving the observer into the set of aborted transactions
// it will prevent it from spawning new requests right now,
// since that would break the invariant
observer_ptr o = m_transactions[m_next_transaction_id];
m_aborted_transactions.push_back(o);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: "
<< m_next_transaction_id << " sent to " << o->target_ep()
<< " " << total_seconds(time_now() - o->sent()) << " seconds ago";
#endif
m_transactions[m_next_transaction_id] = 0;
TORRENT_ASSERT(m_oldest_transaction_id == m_next_transaction_id);
}
TORRENT_ASSERT(!m_transactions[tid]);
m_transactions[tid] = o;
if (m_oldest_transaction_id == m_next_transaction_id)
{
m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "WARNING: transaction limit reached! Too many concurrent"
" messages! limit: " << (int)max_transactions;
#endif
update_oldest_transaction_id();
}
return tid;
}
void rpc_manager::update_oldest_transaction_id()
{
INVARIANT_CHECK;
TORRENT_ASSERT(m_oldest_transaction_id != m_next_transaction_id);
while (!m_transactions[m_oldest_transaction_id])
{
m_oldest_transaction_id = (m_oldest_transaction_id + 1)
% max_transactions;
if (m_oldest_transaction_id == m_next_transaction_id)
break;
}
}
void rpc_manager::add_our_id(entry& e)
{
e["id"] = m_our_id.to_string();
}
void rpc_manager::invoke(entry& e, udp::endpoint target_addr
bool rpc_manager::invoke(entry& e, udp::endpoint target_addr
, observer_ptr o)
{
INVARIANT_CHECK;
if (m_destructing)
{
o->abort();
return;
}
if (m_destructing) return false;
e["y"] = "q";
entry& a = e["a"];
add_our_id(a);
TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
#ifdef TORRENT_DEBUG
int potential_new_id = m_next_transaction_id;
#endif
#ifndef BOOST_NO_EXCEPTIONS
try
{
#endif
std::string transaction_id;
transaction_id.resize(2);
char* out = &transaction_id[0];
@ -503,34 +435,34 @@ void rpc_manager::invoke(entry& e, udp::endpoint target_addr
e["t"] = transaction_id;
o->set_target(target_addr);
o->set_transaction_id(m_next_transaction_id);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Invoking " << e["q"].string() << " -> " << target_addr;
TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] invoking "
<< e["q"].string() << " -> " << target_addr;
#endif
m_send(m_userdata, e, target_addr, 1);
new_transaction_id(o);
#ifndef BOOST_NO_EXCEPTIONS
}
catch (std::exception& e)
if (m_send(m_userdata, e, target_addr, 1))
{
// m_send may fail with "no route to host"
TORRENT_ASSERT(potential_new_id == m_next_transaction_id);
o->abort();
}
m_transactions.push_back(o);
++m_next_transaction_id;
m_next_transaction_id %= max_transaction_id;
#ifdef TORRENT_DEBUG
o->m_was_sent = true;
#endif
}
/*
void rpc_manager::reply(msg& m)
{
INVARIANT_CHECK;
if (m_destructing) return;
TORRENT_ASSERT(m.reply);
m.id = m_our_id;
m_send(m);
return true;
}
*/
observer::~observer()
{
// if the message was sent, it must have been
// reported back to the traversal_algorithm as
// well. If it wasn't sent, it cannot have been
// reported back
TORRENT_ASSERT(m_was_sent == m_done);
TORRENT_ASSERT(!m_in_constructor);
}
} } // namespace libtorrent::dht

View File

@ -50,8 +50,6 @@ TORRENT_DEFINE_LOG(traversal)
void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsigned char flags)
{
if (m_failed.find(addr) != m_failed.end()) return;
result entry(id, addr, flags);
if (entry.id.is_all_zeros())
{
@ -76,7 +74,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end()
, bind(&result::id, _1) == id) == m_results.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "adding result: " << id << " " << addr;
TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr;
#endif
m_results.insert(i, entry);
}
@ -84,12 +82,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
void traversal_algorithm::start()
{
add_requests();
// in case the routing table is empty, use the
// router nodes in the table
if (m_results.empty()) add_router_entries();
init();
add_requests();
}
boost::pool<>& traversal_algorithm::allocator() const
@ -101,20 +98,21 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
if (id.is_all_zeros())
TORRENT_LOG(traversal) << time_now_string() << " WARNING: node returned a list which included a node with id 0";
TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING: "
"node returned a list which included a node with id 0";
#endif
add_entry(id, addr, 0);
}
void traversal_algorithm::finished(node_id const& id)
void traversal_algorithm::finished(udp::endpoint const& ep)
{
std::vector<result>::iterator i = std::find_if(
m_results.begin()
, m_results.end()
, bind(
std::equal_to<node_id>()
, bind(&result::id, _1)
, id
std::equal_to<udp::endpoint>()
, bind(&result::endpoint, _1)
, ep
)
);
@ -138,18 +136,19 @@ void traversal_algorithm::finished(node_id const& id)
// prevent request means that the total number of requests has
// overflown. This query failed because it was the oldest one.
// So, if this is true, don't make another request
void traversal_algorithm::failed(node_id const& id, int flags)
void traversal_algorithm::failed(udp::endpoint const& ep, int flags)
{
TORRENT_ASSERT(m_invoke_count >= 0);
TORRENT_ASSERT(!id.is_all_zeros());
if (m_results.empty()) return;
std::vector<result>::iterator i = std::find_if(
m_results.begin()
, m_results.end()
, bind(
std::equal_to<node_id>()
, bind(&result::id, _1)
, id
std::equal_to<udp::endpoint>()
, bind(&result::endpoint, _1)
, ep
)
);
@ -172,9 +171,10 @@ void traversal_algorithm::failed(node_id const& id, int flags)
}
else
{
m_failed.insert(i->addr);
i->flags |= result::failed;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr;
TORRENT_LOG(traversal) << " [" << this << "] failed: "
<< i->id << " " << i->endpoint();
#endif
// if this flag is set, it means we increased the
// branch factor for it, and we should restore it
@ -184,16 +184,12 @@ void traversal_algorithm::failed(node_id const& id, int flags)
// don't tell the routing table about
// node ids that we just generated ourself
if ((i->flags & result::no_id) == 0)
m_node.m_table.node_failed(id);
m_results.erase(i);
m_node.m_table.node_failed(i->id);
++m_timeouts;
--m_invoke_count;
TORRENT_ASSERT(m_invoke_count >= 0);
}
}
else
{
--m_invoke_count;
}
if (flags & prevent_request)
{
@ -228,22 +224,18 @@ void traversal_algorithm::add_requests()
)
);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "nodes left (" << this << "): " << (last_iterator() - i);
TORRENT_LOG(traversal) << " [" << this << "] nodes left ("
<< this << "): " << (last_iterator() - i);
#endif
if (i == last_iterator()) break;
#ifndef BOOST_NO_EXCEPTIONS
try
if (invoke(i->id, i->endpoint()))
{
#endif
invoke(i->id, i->addr);
TORRENT_ASSERT(m_invoke_count >= 0);
++m_invoke_count;
i->flags |= result::queried;
#ifndef BOOST_NO_EXCEPTIONS
}
catch (std::exception& e) {}
#endif
}
}