made dht requests be allocated in a pool allocator for heap and runtime efficiency. Fixes to DHT error responses

This commit is contained in:
Arvid Norberg 2007-05-23 08:45:12 +00:00
parent 4bcf8b53ff
commit 99a22dcae4
18 changed files with 498 additions and 325 deletions

View File

@ -65,9 +65,11 @@ libtorrent/kademlia/closest_nodes.hpp \
libtorrent/kademlia/dht_tracker.hpp \
libtorrent/kademlia/find_data.hpp \
libtorrent/kademlia/logging.hpp \
libtorrent/kademlia/msg.hpp \
libtorrent/kademlia/node.hpp \
libtorrent/kademlia/node_entry.hpp \
libtorrent/kademlia/node_id.hpp \
libtorrent/kademlia/observer.hpp \
libtorrent/kademlia/packet_iterator.hpp \
libtorrent/kademlia/refresh.hpp \
libtorrent/kademlia/routing_table.hpp \

View File

@ -38,6 +38,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/function.hpp>
@ -80,6 +82,35 @@ private:
done_callback m_done_callback;
};
class closest_nodes_observer : public observer
{
public:
closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~closest_nodes_observer();
void send(msg& p)
{
p.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_target;
node_id const m_self;
};
} } // namespace libtorrent::dht
#endif // CLOSEST_NODES_050323_HPP

View File

@ -40,8 +40,10 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/packet_iterator.hpp>
#include <boost/optional.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/optional.hpp>
#include <boost/function.hpp>
namespace libtorrent { namespace dht
@ -89,6 +91,37 @@ private:
bool m_done;
};
class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~find_data_observer();
void send(msg& m)
{
m.reply = false;
m.message_id = messages::get_peers;
m.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<find_data> m_algorithm;
node_id const m_target;
node_id const m_self;
};
} } // namespace libtorrent::dht
#endif // FIND_DATA_050323_HPP

View File

@ -0,0 +1,102 @@
/*
Copyright (c) 2007, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MSG_HPP
#define MSG_HPP
#include <string>
#include <libtorrent/kademlia/node_id.hpp>
#include "libtorrent/entry.hpp"
#include <asio/ip/udp.hpp>
namespace libtorrent {
namespace dht {
typedef std::vector<char> packet_t;
using asio::ip::udp;
namespace messages
{
enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 };
char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" };
} // namespace messages
struct msg
{
msg() : reply(false), piggy_backed_ping(false)
, message_id(-1), port(0) {}
// true if this message is a reply
bool reply;
// true if this is a reply with a piggy backed ping
bool piggy_backed_ping;
// the kind if message
int message_id;
// if this is a reply, a copy of the transaction id
// from the request. If it's a request, a transaction
// id that should be sent back in the reply
std::string transaction_id;
// if this packet has a piggy backed ping, this
// is the transaction id of that ping
std::string ping_transaction_id;
// the node id of the process sending the message
node_id id;
// the address of the process sending or receiving
// the message.
udp::endpoint addr;
// if this is a nodes response, these are the nodes
typedef std::vector<node_entry> nodes_t;
nodes_t nodes;
typedef std::vector<tcp::endpoint> peers_t;
peers_t peers;
// similar to transaction_id but for write operations.
entry write_token;
// the info has for peer_requests, announce_peer
// and responses
node_id info_hash;
// port for announce_peer messages
int port;
// ERROR MESSAGES
int error_code;
std::string error_msg;
};
} }
#endif

View File

@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp>
#include <libtorrent/session_settings.hpp>
@ -85,6 +86,75 @@ inline bool operator<(peer_entry const& lhs, peer_entry const& rhs)
struct null_type {};
class announce_observer : public observer
{
public:
announce_observer(boost::pool<>& allocator
, sha1_hash const& info_hash
, int listen_port
, entry const& write_token)
: observer(allocator)
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_token(write_token)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
m.write_token = m_token;
}
void timeout() {}
void reply(msg const&) {}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
entry m_token;
};
class get_peers_observer : public observer
{
public:
get_peers_observer(sha1_hash const& info_hash
, int listen_port
, rpc_manager& rpc
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
: observer(rpc.allocator())
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_rpc(rpc)
, m_fun(f)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
}
void timeout() {}
void reply(msg const& r)
{
m_rpc.invoke(messages::announce_peer, r.addr
, observer_ptr(new (m_rpc.allocator().malloc()) announce_observer(
m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token)));
m_fun(r.peers, m_info_hash);
}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
rpc_manager& m_rpc;
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
class node_impl : boost::noncopyable
{
typedef std::map<node_id, torrent_entry> table_t;

View File

@ -0,0 +1,92 @@
/*
Copyright (c) 2007, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef OBSERVER_HPP
#define OBSERVER_HPP
#include <boost/pool/pool.hpp>
#include <boost/detail/atomic_count.hpp>
#include <boost/intrusive_ptr.hpp>
namespace libtorrent {
namespace dht {
struct observer;
struct msg;
// defined in rpc_manager.cpp
TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
TORRENT_EXPORT void intrusive_ptr_release(observer const*);
struct observer : boost::noncopyable
{
friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
friend TORRENT_EXPORT void intrusive_ptr_release(observer const*);
observer(boost::pool<>& p)
: sent(time_now())
, pool_allocator(p)
, m_refs(0)
{}
virtual ~observer() {}
// these two callbacks lets the observer add
// information to the message before it's sent
virtual void send(msg& m) = 0;
// this is called when a reply is received
virtual void reply(msg const& m) = 0;
// this is called when no reply has been received within
// some timeout
virtual void timeout() = 0;
// if this is called the destructor should
// not invoke any new messages, and should
// only clean up. It means the rpc-manager
// is being destructed
virtual void abort() = 0;
udp::endpoint target_addr;
ptime sent;
private:
boost::pool<>& pool_allocator;
// reference counter for intrusive_ptr
mutable boost::detail::atomic_count m_refs;
};
typedef boost::intrusive_ptr<observer> observer_ptr;
} }
#endif

View File

@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/function.hpp>
@ -98,6 +100,59 @@ private:
std::vector<result>::iterator m_leftover_nodes_iterator;
};
class refresh_observer : public observer
{
public:
refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_target(target)
, m_self(self)
, m_algorithm(algorithm)
{}
~refresh_observer();
void send(msg& m)
{
m.info_hash = m_target;
}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_target;
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
class ping_observer : public observer
{
public:
ping_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_self(self)
, m_algorithm(algorithm)
{}
~ping_observer();
void send(msg& p) {}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
template<class InIt>
inline refresh::refresh(
node_id target

View File

@ -40,6 +40,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/noncopyable.hpp>
#include <boost/cstdint.hpp>
#include <boost/array.hpp>
#include <boost/pool/pool.hpp>
#include <libtorrent/socket.hpp>
#include <libtorrent/entry.hpp>
@ -47,96 +48,27 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/node_entry.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include "libtorrent/time.hpp"
namespace libtorrent { namespace dht
{
struct observer;
using asio::ip::udp;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(rpc);
#endif
typedef std::vector<char> packet_t;
namespace messages
struct null_observer : public observer
{
enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 };
char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" };
} // namespace messages
struct msg
{
msg() : reply(false), piggy_backed_ping(false)
, message_id(-1), port(0) {}
// true if this message is a reply
bool reply;
// true if this is a reply with a piggy backed ping
bool piggy_backed_ping;
// the kind if message
int message_id;
// if this is a reply, a copy of the transaction id
// from the request. If it's a request, a transaction
// id that should be sent back in the reply
std::string transaction_id;
// if this packet has a piggy backed ping, this
// is the transaction id of that ping
std::string ping_transaction_id;
// the node id of the process sending the message
node_id id;
// the address of the process sending or receiving
// the message.
udp::endpoint addr;
// if this is a nodes response, these are the nodes
typedef std::vector<node_entry> nodes_t;
nodes_t nodes;
typedef std::vector<tcp::endpoint> peers_t;
peers_t peers;
// similar to transaction_id but for write operations.
entry write_token;
// the info has for peer_requests, announce_peer
// and responses
node_id info_hash;
// port for announce_peer messages
int port;
// ERROR MESSAGES
int error_code;
std::string error_msg;
};
struct observer : boost::noncopyable
{
observer(): sent(time_now())
{}
virtual ~observer() {}
// this two callbacks lets the observer add
// information to the message before it's sent
virtual void send(msg& m) = 0;
// this is called when a reply is received
virtual void reply(msg const& m) = 0;
// this is called when no reply has been received within
// some timeout
virtual void timeout() = 0;
// if this is called the destructor should
// not invoke any new messages, and should
// only clean up. It means the rpc-manager
// is being destructed
virtual void abort() = 0;
udp::endpoint target_addr;
ptime sent;
null_observer(boost::pool<>& allocator): observer(allocator) {}
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
void abort() {}
};
class routing_table;
@ -156,7 +88,7 @@ public:
time_duration tick();
void invoke(int message_id, udp::endpoint target
, boost::shared_ptr<observer> o);
, observer_ptr o);
void reply(msg& m);
void reply_with_ping(msg& m);
@ -165,19 +97,24 @@ public:
void check_invariant() const;
#endif
boost::pool<>& allocator() const
{ return m_pool_allocator; }
private:
enum { max_transactions = 2048 };
unsigned int new_transaction_id(boost::shared_ptr<observer> o);
unsigned int new_transaction_id(observer_ptr o);
void update_oldest_transaction_id();
boost::uint32_t calc_connection_id(udp::endpoint addr);
typedef boost::array<boost::shared_ptr<observer>, max_transactions>
mutable boost::pool<> m_pool_allocator;
typedef boost::array<observer_ptr, max_transactions>
transactions_t;
transactions_t m_transactions;
std::vector<boost::shared_ptr<observer> > m_aborted_transactions;
std::vector<observer_ptr > m_aborted_transactions;
// this is the next transaction id to be used
int m_next_transaction_id;

View File

@ -43,6 +43,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/noncopyable.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/pool/pool.hpp>
namespace libtorrent { namespace dht
{
@ -60,6 +61,7 @@ public:
void finished(node_id const& id);
void failed(node_id const& id, bool prevent_request = false);
virtual ~traversal_algorithm() {}
boost::pool<>& allocator() const;
protected:
template<class InIt>

View File

@ -89,7 +89,7 @@ namespace libtorrent
}
TORRENT_EXPORT void intrusive_ptr_add_ref(peer_connection const*);
TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*);
TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*);
struct TORRENT_EXPORT protocol_error: std::runtime_error
{
@ -470,7 +470,7 @@ namespace libtorrent
// the time when we last got a part of a
// piece packet from this peer
ptime m_last_piece;
ptime m_last_piece;
int m_packet_size;
int m_recv_pos;

View File

@ -41,36 +41,6 @@ namespace libtorrent { namespace dht
using asio::ip::udp;
typedef boost::shared_ptr<observer> observer_ptr;
class closest_nodes_observer : public observer
{
public:
closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self
, node_id target)
: m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~closest_nodes_observer();
void send(msg& p)
{
p.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_target;
node_id const m_self;
};
closest_nodes_observer::~closest_nodes_observer()
{
if (m_algorithm) m_algorithm->failed(m_self, true);
@ -129,8 +99,8 @@ closest_nodes::closest_nodes(
void closest_nodes::invoke(node_id const& id, udp::endpoint addr)
{
observer_ptr p(new closest_nodes_observer(this, id, m_target));
m_rpc.invoke(messages::find_node, addr, p);
observer_ptr o(new (m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target));
m_rpc.invoke(messages::find_node, addr, o);
}
void closest_nodes::done()

View File

@ -628,7 +628,7 @@ namespace libtorrent { namespace dht
m.error_msg = list.back().string();
m.error_code = list.front().integer();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " "
TORRENT_LOG(dht_tracker) << " incoming error: " << m.error_code << " "
<< m.error_msg;
#endif
throw std::runtime_error("DHT error message");
@ -658,9 +658,9 @@ namespace libtorrent { namespace dht
#ifdef TORRENT_DHT_VERBOSE_LOGGING
int current_buffer = (m_buffer + 1) & 1;
std::string msg(m_in_buf[current_buffer].begin()
, m_in_buf[current_buffer].end());
, m_in_buf[current_buffer].begin() + bytes_transferred);
TORRENT_LOG(dht_tracker) << "invalid incoming packet: "
<< e.what() << "\n" << msg.c_str() << "\n";
<< e.what() << "\n" << msg << "\n";
#endif
}
}
@ -794,6 +794,7 @@ namespace libtorrent { namespace dht
using libtorrent::bencode;
using libtorrent::entry;
entry e(entry::dictionary_t);
assert(!m.transaction_id.empty() || m.message_id == messages::error);
e["t"] = m.transaction_id;
static char const version_str[] = {'L', 'T'
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR};
@ -816,7 +817,7 @@ namespace libtorrent { namespace dht
e["e"] = error_list;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string()
<< " error: " << m.error_code << " " << m.error_msg;
<< " outgoing error: " << m.error_code << " " << m.error_msg;
#endif
}
else if (m.reply)

View File

@ -40,38 +40,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent { namespace dht
{
typedef boost::shared_ptr<observer> observer_ptr;
class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm
, node_id self
, node_id target)
: m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~find_data_observer();
void send(msg& m)
{
m.reply = false;
m.message_id = messages::get_peers;
m.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<find_data> m_algorithm;
node_id const m_target;
node_id const m_self;
};
find_data_observer::~find_data_observer()
{
if (m_algorithm) m_algorithm->failed(m_self);
@ -141,8 +109,8 @@ void find_data::invoke(node_id const& id, asio::ip::udp::endpoint addr)
return;
}
observer_ptr p(new find_data_observer(this, id, m_target));
m_rpc.invoke(messages::get_peers, addr, p);
observer_ptr o(new (m_rpc.allocator().malloc()) find_data_observer(this, id, m_target));
m_rpc.invoke(messages::get_peers, addr, o);
}
void find_data::got_data(msg const* m)

View File

@ -63,8 +63,6 @@ namespace
}
#endif
typedef boost::shared_ptr<observer> observer_ptr;
// TODO: configurable?
enum { announce_interval = 30 };
@ -267,70 +265,6 @@ void node_impl::incoming(msg const& m)
namespace
{
class announce_observer : public observer
{
public:
announce_observer(sha1_hash const& info_hash, int listen_port
, entry const& write_token)
: m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_token(write_token)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
m.write_token = m_token;
}
void timeout() {}
void reply(msg const&) {}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
entry m_token;
};
class get_peers_observer : public observer
{
public:
get_peers_observer(sha1_hash const& info_hash, int listen_port
, rpc_manager& rpc
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
: m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_rpc(rpc)
, m_fun(f)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
}
void timeout() {}
void reply(msg const& r)
{
m_rpc.invoke(messages::announce_peer, r.addr
, boost::shared_ptr<observer>(
new announce_observer(m_info_hash, m_listen_port, r.write_token)));
m_fun(r.peers, m_info_hash);
}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
rpc_manager& m_rpc;
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc
, int listen_port, sha1_hash const& ih
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
@ -340,23 +274,11 @@ namespace
for (std::vector<node_entry>::const_iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
rpc.invoke(messages::get_peers, i->addr, boost::shared_ptr<observer>(
new get_peers_observer(ih, listen_port, rpc, f)));
rpc.invoke(messages::get_peers, i->addr, observer_ptr(
new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f)));
nodes = true;
}
}
}
namespace
{
struct dummy_observer : observer
{
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
virtual void abort() {}
};
}
void node_impl::add_router_node(udp::endpoint router)
@ -368,8 +290,8 @@ void node_impl::add_node(udp::endpoint node)
{
// ping the node, and if we get a reply, it
// will be added to the routing table
observer_ptr p(new dummy_observer());
m_rpc.invoke(messages::ping, node, p);
observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator()));
m_rpc.invoke(messages::ping, node, o);
}
void node_impl::announce(sha1_hash const& info_hash, int listen_port

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp>
@ -52,38 +53,6 @@ using asio::ip::udp;
TORRENT_DEFINE_LOG(refresh)
#endif
typedef boost::shared_ptr<observer> observer_ptr;
class refresh_observer : public observer
{
public:
refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
, node_id target
)
: m_target(target)
, m_self(self)
, m_algorithm(algorithm)
{}
~refresh_observer();
void send(msg& m)
{
m.info_hash = m_target;
}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_target;
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
refresh_observer::~refresh_observer()
{
if (m_algorithm) m_algorithm->failed(m_self, true);
@ -112,29 +81,6 @@ void refresh_observer::timeout()
m_algorithm = 0;
}
class ping_observer : public observer
{
public:
ping_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
)
: m_self(self)
, m_algorithm(algorithm)
{}
~ping_observer();
void send(msg& p) {}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
ping_observer::~ping_observer()
{
if (m_algorithm) m_algorithm->ping_timeout(m_self, true);
@ -157,13 +103,10 @@ void ping_observer::timeout()
void refresh::invoke(node_id const& nid, udp::endpoint addr)
{
observer_ptr p(new refresh_observer(
this
, nid
, m_target
));
observer_ptr o(new (m_rpc.allocator().malloc()) refresh_observer(
this, nid, m_target));
m_rpc.invoke(messages::find_node, addr, p);
m_rpc.invoke(messages::find_node, addr, o);
}
void refresh::done()
@ -211,8 +154,9 @@ void refresh::invoke_pings_or_finish(bool prevent_request)
try
{
observer_ptr p(new ping_observer(this, node.id));
m_rpc.invoke(messages::ping, node.addr, p);
observer_ptr o(new (m_rpc.allocator().malloc()) ping_observer(
this, node.id));
m_rpc.invoke(messages::ping, node.addr, o);
++m_active_pings;
++m_leftover_nodes_iterator;
}

View File

@ -34,12 +34,23 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
#include <boost/bind.hpp>
#include <boost/mpl/max_element.hpp>
#include <boost/mpl/vector.hpp>
#include <boost/mpl/sizeof.hpp>
#include <boost/mpl/transform_view.hpp>
#include <boost/mpl/deref.hpp>
#include <boost/lexical_cast.hpp>
#include <libtorrent/io.hpp>
#include <libtorrent/invariant_check.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/hasher.hpp>
#include <fstream>
@ -51,16 +62,51 @@ namespace libtorrent { namespace dht
{
namespace io = libtorrent::detail;
namespace mpl = boost::mpl;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DEFINE_LOG(rpc)
#endif
void intrusive_ptr_add_ref(observer const* o)
{
assert(o->m_refs >= 0);
assert(o != 0);
++o->m_refs;
}
void intrusive_ptr_release(observer const* o)
{
assert(o->m_refs > 0);
assert(o != 0);
if (--o->m_refs == 0)
{
boost::pool<>& p = o->pool_allocator;
o->~observer();
p.ordered_free(const_cast<observer*>(o));
}
}
node_id generate_id();
typedef mpl::vector<
closest_nodes_observer
, find_data_observer
, announce_observer
, get_peers_observer
, refresh_observer
, ping_observer
, null_observer
> observer_types;
typedef mpl::max_element<
mpl::transform_view<observer_types, mpl::sizeof_<mpl::_1> >
>::type max_observer_type_iter;
rpc_manager::rpc_manager(fun const& f, node_id const& our_id
, routing_table& table, send_fun const& sf)
: m_next_transaction_id(rand() % max_transactions)
: m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type))
, m_next_transaction_id(rand() % max_transactions)
, m_oldest_transaction_id(m_next_transaction_id)
, m_incoming(f)
, m_send(sf)
@ -124,12 +170,14 @@ bool rpc_manager::incoming(msg const& m)
<< m.transaction_id.size() << " from " << m.addr;
#endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id, size " + m.transaction_id.size();
reply.error_msg = "reply with invalid transaction id, size "
+ boost::lexical_cast<std::string>(m.transaction_id.size());
reply.addr = m.addr;
reply.transaction_id = "";
m_send(m);
m_send(reply);
return false;
}
@ -144,16 +192,17 @@ bool rpc_manager::incoming(msg const& m)
<< tid << " from " << m.addr;
#endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id";
reply.addr = m.addr;
reply.transaction_id = "";
m_send(m);
m_send(reply);
return false;
}
boost::shared_ptr<observer> o = m_transactions[tid];
observer_ptr o = m_transactions[tid];
if (!o)
{
@ -179,7 +228,7 @@ bool rpc_manager::incoming(msg const& m)
<< std::endl;
#endif
o->reply(m);
m_transactions[tid].reset();
m_transactions[tid] = 0;
if (m.piggy_backed_ping)
{
@ -214,7 +263,7 @@ time_duration rpc_manager::tick()
if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms);
std::vector<shared_ptr<observer> > timeouts;
std::vector<observer_ptr > timeouts;
for (;m_next_transaction_id != m_oldest_transaction_id;
m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
@ -222,7 +271,7 @@ time_duration rpc_manager::tick()
assert(m_oldest_transaction_id >= 0);
assert(m_oldest_transaction_id < max_transactions);
boost::shared_ptr<observer> o = m_transactions[m_oldest_transaction_id];
observer_ptr o = m_transactions[m_oldest_transaction_id];
if (!o) continue;
time_duration diff = o->sent + milliseconds(timeout_ms) - time_now();
@ -234,7 +283,7 @@ time_duration rpc_manager::tick()
try
{
m_transactions[m_oldest_transaction_id].reset();
m_transactions[m_oldest_transaction_id] = 0;
timeouts.push_back(o);
} catch (std::exception) {}
}
@ -245,11 +294,11 @@ time_duration rpc_manager::tick()
// clear the aborted transactions, will likely
// generate new requests. We need to swap, since the
// destrutors may add more observers to the m_aborted_transactions
std::vector<shared_ptr<observer> >().swap(m_aborted_transactions);
std::vector<observer_ptr >().swap(m_aborted_transactions);
return milliseconds(timeout_ms);
}
unsigned int rpc_manager::new_transaction_id(shared_ptr<observer> o)
unsigned int rpc_manager::new_transaction_id(observer_ptr o)
{
INVARIANT_CHECK;
@ -261,7 +310,7 @@ unsigned int rpc_manager::new_transaction_id(shared_ptr<observer> o)
// it will prevent it from spawning new requests right now,
// since that would break the invariant
m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]);
m_transactions[m_next_transaction_id].reset();
m_transactions[m_next_transaction_id] = 0;
assert(m_oldest_transaction_id == m_next_transaction_id);
}
assert(!m_transactions[tid]);
@ -294,7 +343,7 @@ void rpc_manager::update_oldest_transaction_id()
}
void rpc_manager::invoke(int message_id, udp::endpoint target_addr
, shared_ptr<observer> o)
, observer_ptr o)
{
INVARIANT_CHECK;
@ -352,17 +401,6 @@ void rpc_manager::reply(msg& m)
m_send(m);
}
namespace
{
struct dummy_observer : observer
{
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
void abort() {}
};
}
void rpc_manager::reply_with_ping(msg& m)
{
INVARIANT_CHECK;
@ -377,7 +415,7 @@ void rpc_manager::reply_with_ping(msg& m)
std::back_insert_iterator<std::string> out(m.ping_transaction_id);
io::write_uint16(m_next_transaction_id, out);
boost::shared_ptr<observer> o(new dummy_observer);
observer_ptr o(new (allocator().malloc()) null_observer(allocator()));
assert(!m_transactions[m_next_transaction_id]);
o->sent = time_now();
o->target_addr = m.addr;

View File

@ -76,6 +76,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
}
}
boost::pool<>& traversal_algorithm::allocator() const
{
return m_rpc.allocator();
}
void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
{
add_entry(id, addr, 0);

View File

@ -174,6 +174,7 @@ namespace libtorrent
torrent_info const& info = t->torrent_file();
std::string request;
request.reserve(400);
int size = r.length;
const int block_size = t->block_size();