diff --git a/include/Makefile.am b/include/Makefile.am index 2d886203e..9a9e650cf 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -65,9 +65,11 @@ libtorrent/kademlia/closest_nodes.hpp \ libtorrent/kademlia/dht_tracker.hpp \ libtorrent/kademlia/find_data.hpp \ libtorrent/kademlia/logging.hpp \ +libtorrent/kademlia/msg.hpp \ libtorrent/kademlia/node.hpp \ libtorrent/kademlia/node_entry.hpp \ libtorrent/kademlia/node_id.hpp \ +libtorrent/kademlia/observer.hpp \ libtorrent/kademlia/packet_iterator.hpp \ libtorrent/kademlia/refresh.hpp \ libtorrent/kademlia/routing_table.hpp \ diff --git a/include/libtorrent/kademlia/closest_nodes.hpp b/include/libtorrent/kademlia/closest_nodes.hpp index d5580b9c9..244e4bb38 100644 --- a/include/libtorrent/kademlia/closest_nodes.hpp +++ b/include/libtorrent/kademlia/closest_nodes.hpp @@ -38,6 +38,8 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include +#include #include @@ -80,6 +82,35 @@ private: done_callback m_done_callback; }; +class closest_nodes_observer : public observer +{ +public: + closest_nodes_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_algorithm(algorithm) + , m_target(target) + , m_self(self) + {} + ~closest_nodes_observer(); + + void send(msg& p) + { + p.info_hash = m_target; + } + + void timeout(); + void reply(msg const&); + void abort() { m_algorithm = 0; } + +private: + boost::intrusive_ptr m_algorithm; + node_id const m_target; + node_id const m_self; +}; + } } // namespace libtorrent::dht #endif // CLOSEST_NODES_050323_HPP diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index bbafcdd77..17d77c9d8 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -40,8 +40,10 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include +#include +#include +#include #include namespace libtorrent { namespace dht @@ -89,6 +91,37 @@ private: bool m_done; }; +class find_data_observer : public observer +{ +public: + find_data_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_algorithm(algorithm) + , m_target(target) + , m_self(self) + {} + ~find_data_observer(); + + void send(msg& m) + { + m.reply = false; + m.message_id = messages::get_peers; + m.info_hash = m_target; + } + + void timeout(); + void reply(msg const&); + void abort() { m_algorithm = 0; } + +private: + boost::intrusive_ptr m_algorithm; + node_id const m_target; + node_id const m_self; +}; + } } // namespace libtorrent::dht #endif // FIND_DATA_050323_HPP diff --git a/include/libtorrent/kademlia/msg.hpp b/include/libtorrent/kademlia/msg.hpp new file mode 100644 index 000000000..a205ce463 --- /dev/null +++ b/include/libtorrent/kademlia/msg.hpp @@ -0,0 +1,102 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef MSG_HPP +#define MSG_HPP + +#include +#include +#include "libtorrent/entry.hpp" +#include + +namespace libtorrent { +namespace dht { + +typedef std::vector packet_t; + +using asio::ip::udp; + +namespace messages +{ + enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 }; + char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" }; +} // namespace messages + +struct msg +{ + msg() : reply(false), piggy_backed_ping(false) + , message_id(-1), port(0) {} + + // true if this message is a reply + bool reply; + // true if this is a reply with a piggy backed ping + bool piggy_backed_ping; + // the kind if message + int message_id; + // if this is a reply, a copy of the transaction id + // from the request. If it's a request, a transaction + // id that should be sent back in the reply + std::string transaction_id; + // if this packet has a piggy backed ping, this + // is the transaction id of that ping + std::string ping_transaction_id; + // the node id of the process sending the message + node_id id; + // the address of the process sending or receiving + // the message. + udp::endpoint addr; + // if this is a nodes response, these are the nodes + typedef std::vector nodes_t; + nodes_t nodes; + + typedef std::vector peers_t; + peers_t peers; + + // similar to transaction_id but for write operations. + entry write_token; + + // the info has for peer_requests, announce_peer + // and responses + node_id info_hash; + + // port for announce_peer messages + int port; + + // ERROR MESSAGES + int error_code; + std::string error_msg; +}; + + +} } + +#endif diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 8752af833..850333043 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -85,6 +86,75 @@ inline bool operator<(peer_entry const& lhs, peer_entry const& rhs) struct null_type {}; +class announce_observer : public observer +{ +public: + announce_observer(boost::pool<>& allocator + , sha1_hash const& info_hash + , int listen_port + , entry const& write_token) + : observer(allocator) + , m_info_hash(info_hash) + , m_listen_port(listen_port) + , m_token(write_token) + {} + + void send(msg& m) + { + m.port = m_listen_port; + m.info_hash = m_info_hash; + m.write_token = m_token; + } + + void timeout() {} + void reply(msg const&) {} + void abort() {} + +private: + sha1_hash m_info_hash; + int m_listen_port; + entry m_token; +}; + +class get_peers_observer : public observer +{ +public: + get_peers_observer(sha1_hash const& info_hash + , int listen_port + , rpc_manager& rpc + , boost::function const&, sha1_hash const&)> f) + : observer(rpc.allocator()) + , m_info_hash(info_hash) + , m_listen_port(listen_port) + , m_rpc(rpc) + , m_fun(f) + {} + + void send(msg& m) + { + m.port = m_listen_port; + m.info_hash = m_info_hash; + } + + void timeout() {} + void reply(msg const& r) + { + m_rpc.invoke(messages::announce_peer, r.addr + , observer_ptr(new (m_rpc.allocator().malloc()) announce_observer( + m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token))); + m_fun(r.peers, m_info_hash); + } + void abort() {} + +private: + sha1_hash m_info_hash; + int m_listen_port; + rpc_manager& m_rpc; + boost::function const&, sha1_hash const&)> m_fun; +}; + + + class node_impl : boost::noncopyable { typedef std::map table_t; diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp new file mode 100644 index 000000000..141460dc0 --- /dev/null +++ b/include/libtorrent/kademlia/observer.hpp @@ -0,0 +1,92 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef OBSERVER_HPP +#define OBSERVER_HPP + +#include +#include +#include + +namespace libtorrent { +namespace dht { + +struct observer; +struct msg; + +// defined in rpc_manager.cpp +TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); +TORRENT_EXPORT void intrusive_ptr_release(observer const*); + +struct observer : boost::noncopyable +{ + friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); + friend TORRENT_EXPORT void intrusive_ptr_release(observer const*); + + observer(boost::pool<>& p) + : sent(time_now()) + , pool_allocator(p) + , m_refs(0) + {} + + virtual ~observer() {} + + // these two callbacks lets the observer add + // information to the message before it's sent + virtual void send(msg& m) = 0; + + // this is called when a reply is received + virtual void reply(msg const& m) = 0; + + // this is called when no reply has been received within + // some timeout + virtual void timeout() = 0; + + // if this is called the destructor should + // not invoke any new messages, and should + // only clean up. It means the rpc-manager + // is being destructed + virtual void abort() = 0; + + udp::endpoint target_addr; + ptime sent; +private: + boost::pool<>& pool_allocator; + // reference counter for intrusive_ptr + mutable boost::detail::atomic_count m_refs; +}; + +typedef boost::intrusive_ptr observer_ptr; + +} } + +#endif diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index 7231b26f2..953c4d871 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include +#include #include @@ -98,6 +100,59 @@ private: std::vector::iterator m_leftover_nodes_iterator; }; +class refresh_observer : public observer +{ +public: + refresh_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_target(target) + , m_self(self) + , m_algorithm(algorithm) + {} + ~refresh_observer(); + + void send(msg& m) + { + m.info_hash = m_target; + } + + void timeout(); + void reply(msg const& m); + void abort() { m_algorithm = 0; } + + +private: + node_id const m_target; + node_id const m_self; + boost::intrusive_ptr m_algorithm; +}; + +class ping_observer : public observer +{ +public: + ping_observer( + boost::intrusive_ptr const& algorithm + , node_id self) + : observer(algorithm->allocator()) + , m_self(self) + , m_algorithm(algorithm) + {} + ~ping_observer(); + + void send(msg& p) {} + void timeout(); + void reply(msg const& m); + void abort() { m_algorithm = 0; } + + +private: + node_id const m_self; + boost::intrusive_ptr m_algorithm; +}; + template inline refresh::refresh( node_id target diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index c00b1b3fb..a7c47f29a 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -40,6 +40,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -47,96 +48,27 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include "libtorrent/time.hpp" namespace libtorrent { namespace dht { +struct observer; + using asio::ip::udp; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DECLARE_LOG(rpc); #endif -typedef std::vector packet_t; - -namespace messages +struct null_observer : public observer { - enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 }; - char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" }; -} // namespace messages - -struct msg -{ - msg() : reply(false), piggy_backed_ping(false) - , message_id(-1), port(0) {} - - // true if this message is a reply - bool reply; - // true if this is a reply with a piggy backed ping - bool piggy_backed_ping; - // the kind if message - int message_id; - // if this is a reply, a copy of the transaction id - // from the request. If it's a request, a transaction - // id that should be sent back in the reply - std::string transaction_id; - // if this packet has a piggy backed ping, this - // is the transaction id of that ping - std::string ping_transaction_id; - // the node id of the process sending the message - node_id id; - // the address of the process sending or receiving - // the message. - udp::endpoint addr; - // if this is a nodes response, these are the nodes - typedef std::vector nodes_t; - nodes_t nodes; - - typedef std::vector peers_t; - peers_t peers; - - // similar to transaction_id but for write operations. - entry write_token; - - // the info has for peer_requests, announce_peer - // and responses - node_id info_hash; - - // port for announce_peer messages - int port; - - // ERROR MESSAGES - int error_code; - std::string error_msg; -}; - -struct observer : boost::noncopyable -{ - observer(): sent(time_now()) - {} - - virtual ~observer() {} - - // this two callbacks lets the observer add - // information to the message before it's sent - virtual void send(msg& m) = 0; - - // this is called when a reply is received - virtual void reply(msg const& m) = 0; - - // this is called when no reply has been received within - // some timeout - virtual void timeout() = 0; - - // if this is called the destructor should - // not invoke any new messages, and should - // only clean up. It means the rpc-manager - // is being destructed - virtual void abort() = 0; - - udp::endpoint target_addr; - ptime sent; + null_observer(boost::pool<>& allocator): observer(allocator) {} + virtual void reply(msg const&) {} + virtual void timeout() {} + virtual void send(msg&) {} + void abort() {} }; class routing_table; @@ -156,7 +88,7 @@ public: time_duration tick(); void invoke(int message_id, udp::endpoint target - , boost::shared_ptr o); + , observer_ptr o); void reply(msg& m); void reply_with_ping(msg& m); @@ -165,19 +97,24 @@ public: void check_invariant() const; #endif + boost::pool<>& allocator() const + { return m_pool_allocator; } + private: enum { max_transactions = 2048 }; - unsigned int new_transaction_id(boost::shared_ptr o); + unsigned int new_transaction_id(observer_ptr o); void update_oldest_transaction_id(); boost::uint32_t calc_connection_id(udp::endpoint addr); - typedef boost::array, max_transactions> + mutable boost::pool<> m_pool_allocator; + + typedef boost::array transactions_t; transactions_t m_transactions; - std::vector > m_aborted_transactions; + std::vector m_aborted_transactions; // this is the next transaction id to be used int m_next_transaction_id; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 6fa647ba4..d51ed5506 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -43,6 +43,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include namespace libtorrent { namespace dht { @@ -60,6 +61,7 @@ public: void finished(node_id const& id); void failed(node_id const& id, bool prevent_request = false); virtual ~traversal_algorithm() {} + boost::pool<>& allocator() const; protected: template diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index ab2a9456b..546d6d3b5 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -89,7 +89,7 @@ namespace libtorrent } TORRENT_EXPORT void intrusive_ptr_add_ref(peer_connection const*); - TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*); + TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*); struct TORRENT_EXPORT protocol_error: std::runtime_error { @@ -470,7 +470,7 @@ namespace libtorrent // the time when we last got a part of a // piece packet from this peer - ptime m_last_piece; + ptime m_last_piece; int m_packet_size; int m_recv_pos; diff --git a/src/kademlia/closest_nodes.cpp b/src/kademlia/closest_nodes.cpp index e461bc453..0c7d9d276 100644 --- a/src/kademlia/closest_nodes.cpp +++ b/src/kademlia/closest_nodes.cpp @@ -41,36 +41,6 @@ namespace libtorrent { namespace dht using asio::ip::udp; -typedef boost::shared_ptr observer_ptr; - -class closest_nodes_observer : public observer -{ -public: - closest_nodes_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target) - : m_algorithm(algorithm) - , m_target(target) - , m_self(self) - {} - ~closest_nodes_observer(); - - void send(msg& p) - { - p.info_hash = m_target; - } - - void timeout(); - void reply(msg const&); - void abort() { m_algorithm = 0; } - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_target; - node_id const m_self; -}; - closest_nodes_observer::~closest_nodes_observer() { if (m_algorithm) m_algorithm->failed(m_self, true); @@ -129,8 +99,8 @@ closest_nodes::closest_nodes( void closest_nodes::invoke(node_id const& id, udp::endpoint addr) { - observer_ptr p(new closest_nodes_observer(this, id, m_target)); - m_rpc.invoke(messages::find_node, addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target)); + m_rpc.invoke(messages::find_node, addr, o); } void closest_nodes::done() diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index d393f25a3..eda6cd864 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -628,7 +628,7 @@ namespace libtorrent { namespace dht m.error_msg = list.back().string(); m.error_code = list.front().integer(); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " " + TORRENT_LOG(dht_tracker) << " incoming error: " << m.error_code << " " << m.error_msg; #endif throw std::runtime_error("DHT error message"); @@ -658,9 +658,9 @@ namespace libtorrent { namespace dht #ifdef TORRENT_DHT_VERBOSE_LOGGING int current_buffer = (m_buffer + 1) & 1; std::string msg(m_in_buf[current_buffer].begin() - , m_in_buf[current_buffer].end()); + , m_in_buf[current_buffer].begin() + bytes_transferred); TORRENT_LOG(dht_tracker) << "invalid incoming packet: " - << e.what() << "\n" << msg.c_str() << "\n"; + << e.what() << "\n" << msg << "\n"; #endif } } @@ -794,6 +794,7 @@ namespace libtorrent { namespace dht using libtorrent::bencode; using libtorrent::entry; entry e(entry::dictionary_t); + assert(!m.transaction_id.empty() || m.message_id == messages::error); e["t"] = m.transaction_id; static char const version_str[] = {'L', 'T' , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR}; @@ -816,7 +817,7 @@ namespace libtorrent { namespace dht e["e"] = error_list; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(dht_tracker) << time_now_string() - << " error: " << m.error_code << " " << m.error_msg; + << " outgoing error: " << m.error_code << " " << m.error_msg; #endif } else if (m.reply) diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 8b47e52cf..4ada42fb3 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -40,38 +40,6 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { namespace dht { -typedef boost::shared_ptr observer_ptr; - -class find_data_observer : public observer -{ -public: - find_data_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target) - : m_algorithm(algorithm) - , m_target(target) - , m_self(self) - {} - ~find_data_observer(); - - void send(msg& m) - { - m.reply = false; - m.message_id = messages::get_peers; - m.info_hash = m_target; - } - - void timeout(); - void reply(msg const&); - void abort() { m_algorithm = 0; } - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_target; - node_id const m_self; -}; - find_data_observer::~find_data_observer() { if (m_algorithm) m_algorithm->failed(m_self); @@ -141,8 +109,8 @@ void find_data::invoke(node_id const& id, asio::ip::udp::endpoint addr) return; } - observer_ptr p(new find_data_observer(this, id, m_target)); - m_rpc.invoke(messages::get_peers, addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) find_data_observer(this, id, m_target)); + m_rpc.invoke(messages::get_peers, addr, o); } void find_data::got_data(msg const* m) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 0ed598ebe..74641ec43 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -63,8 +63,6 @@ namespace } #endif -typedef boost::shared_ptr observer_ptr; - // TODO: configurable? enum { announce_interval = 30 }; @@ -267,70 +265,6 @@ void node_impl::incoming(msg const& m) namespace { - - class announce_observer : public observer - { - public: - announce_observer(sha1_hash const& info_hash, int listen_port - , entry const& write_token) - : m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_token(write_token) - {} - - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - m.write_token = m_token; - } - - void timeout() {} - void reply(msg const&) {} - void abort() {} - - private: - sha1_hash m_info_hash; - int m_listen_port; - entry m_token; - }; - - class get_peers_observer : public observer - { - public: - get_peers_observer(sha1_hash const& info_hash, int listen_port - , rpc_manager& rpc - , boost::function const&, sha1_hash const&)> f) - : m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_rpc(rpc) - , m_fun(f) - {} - - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - } - - void timeout() {} - void reply(msg const& r) - { - m_rpc.invoke(messages::announce_peer, r.addr - , boost::shared_ptr( - new announce_observer(m_info_hash, m_listen_port, r.write_token))); - m_fun(r.peers, m_info_hash); - } - void abort() {} - - private: - sha1_hash m_info_hash; - int m_listen_port; - rpc_manager& m_rpc; - boost::function const&, sha1_hash const&)> m_fun; - }; - - void announce_fun(std::vector const& v, rpc_manager& rpc , int listen_port, sha1_hash const& ih , boost::function const&, sha1_hash const&)> f) @@ -340,23 +274,11 @@ namespace for (std::vector::const_iterator i = v.begin() , end(v.end()); i != end; ++i) { - rpc.invoke(messages::get_peers, i->addr, boost::shared_ptr( - new get_peers_observer(ih, listen_port, rpc, f))); + rpc.invoke(messages::get_peers, i->addr, observer_ptr( + new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f))); nodes = true; } } - -} - -namespace -{ - struct dummy_observer : observer - { - virtual void reply(msg const&) {} - virtual void timeout() {} - virtual void send(msg&) {} - virtual void abort() {} - }; } void node_impl::add_router_node(udp::endpoint router) @@ -368,8 +290,8 @@ void node_impl::add_node(udp::endpoint node) { // ping the node, and if we get a reply, it // will be added to the routing table - observer_ptr p(new dummy_observer()); - m_rpc.invoke(messages::ping, node, p); + observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator())); + m_rpc.invoke(messages::ping, node, o); } void node_impl::announce(sha1_hash const& info_hash, int listen_port diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 9536540d6..ce94ca93b 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include @@ -52,38 +53,6 @@ using asio::ip::udp; TORRENT_DEFINE_LOG(refresh) #endif -typedef boost::shared_ptr observer_ptr; - -class refresh_observer : public observer -{ -public: - refresh_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target - ) - : m_target(target) - , m_self(self) - , m_algorithm(algorithm) - {} - ~refresh_observer(); - - void send(msg& m) - { - m.info_hash = m_target; - } - - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - node_id const m_target; - node_id const m_self; - boost::intrusive_ptr m_algorithm; -}; - refresh_observer::~refresh_observer() { if (m_algorithm) m_algorithm->failed(m_self, true); @@ -112,29 +81,6 @@ void refresh_observer::timeout() m_algorithm = 0; } -class ping_observer : public observer -{ -public: - ping_observer( - boost::intrusive_ptr const& algorithm - , node_id self - ) - : m_self(self) - , m_algorithm(algorithm) - {} - ~ping_observer(); - - void send(msg& p) {} - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - node_id const m_self; - boost::intrusive_ptr m_algorithm; -}; - ping_observer::~ping_observer() { if (m_algorithm) m_algorithm->ping_timeout(m_self, true); @@ -157,13 +103,10 @@ void ping_observer::timeout() void refresh::invoke(node_id const& nid, udp::endpoint addr) { - observer_ptr p(new refresh_observer( - this - , nid - , m_target - )); + observer_ptr o(new (m_rpc.allocator().malloc()) refresh_observer( + this, nid, m_target)); - m_rpc.invoke(messages::find_node, addr, p); + m_rpc.invoke(messages::find_node, addr, o); } void refresh::done() @@ -211,8 +154,9 @@ void refresh::invoke_pings_or_finish(bool prevent_request) try { - observer_ptr p(new ping_observer(this, node.id)); - m_rpc.invoke(messages::ping, node.addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) ping_observer( + this, node.id)); + m_rpc.invoke(messages::ping, node.addr, o); ++m_active_pings; ++m_leftover_nodes_iterator; } diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index dfd939da3..93eac8565 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -34,12 +34,23 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include +#include +#include +#include +#include +#include +#include #include #include #include #include #include +#include +#include +#include +#include +#include #include #include @@ -51,16 +62,51 @@ namespace libtorrent { namespace dht { namespace io = libtorrent::detail; +namespace mpl = boost::mpl; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DEFINE_LOG(rpc) #endif +void intrusive_ptr_add_ref(observer const* o) +{ + assert(o->m_refs >= 0); + assert(o != 0); + ++o->m_refs; +} + +void intrusive_ptr_release(observer const* o) +{ + assert(o->m_refs > 0); + assert(o != 0); + if (--o->m_refs == 0) + { + boost::pool<>& p = o->pool_allocator; + o->~observer(); + p.ordered_free(const_cast(o)); + } +} + node_id generate_id(); +typedef mpl::vector< + closest_nodes_observer + , find_data_observer + , announce_observer + , get_peers_observer + , refresh_observer + , ping_observer + , null_observer + > observer_types; + +typedef mpl::max_element< + mpl::transform_view > + >::type max_observer_type_iter; + rpc_manager::rpc_manager(fun const& f, node_id const& our_id , routing_table& table, send_fun const& sf) - : m_next_transaction_id(rand() % max_transactions) + : m_pool_allocator(sizeof(mpl::deref::type)) + , m_next_transaction_id(rand() % max_transactions) , m_oldest_transaction_id(m_next_transaction_id) , m_incoming(f) , m_send(sf) @@ -124,12 +170,14 @@ bool rpc_manager::incoming(msg const& m) << m.transaction_id.size() << " from " << m.addr; #endif msg reply; + reply.reply = true; reply.message_id = messages::error; reply.error_code = 203; // Protocol error - reply.error_msg = "reply with invalid transaction id, size " + m.transaction_id.size(); + reply.error_msg = "reply with invalid transaction id, size " + + boost::lexical_cast(m.transaction_id.size()); reply.addr = m.addr; reply.transaction_id = ""; - m_send(m); + m_send(reply); return false; } @@ -144,16 +192,17 @@ bool rpc_manager::incoming(msg const& m) << tid << " from " << m.addr; #endif msg reply; + reply.reply = true; reply.message_id = messages::error; reply.error_code = 203; // Protocol error reply.error_msg = "reply with invalid transaction id"; reply.addr = m.addr; reply.transaction_id = ""; - m_send(m); + m_send(reply); return false; } - boost::shared_ptr o = m_transactions[tid]; + observer_ptr o = m_transactions[tid]; if (!o) { @@ -179,7 +228,7 @@ bool rpc_manager::incoming(msg const& m) << std::endl; #endif o->reply(m); - m_transactions[tid].reset(); + m_transactions[tid] = 0; if (m.piggy_backed_ping) { @@ -214,7 +263,7 @@ time_duration rpc_manager::tick() if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); - std::vector > timeouts; + std::vector timeouts; for (;m_next_transaction_id != m_oldest_transaction_id; m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) @@ -222,7 +271,7 @@ time_duration rpc_manager::tick() assert(m_oldest_transaction_id >= 0); assert(m_oldest_transaction_id < max_transactions); - boost::shared_ptr o = m_transactions[m_oldest_transaction_id]; + observer_ptr o = m_transactions[m_oldest_transaction_id]; if (!o) continue; time_duration diff = o->sent + milliseconds(timeout_ms) - time_now(); @@ -234,7 +283,7 @@ time_duration rpc_manager::tick() try { - m_transactions[m_oldest_transaction_id].reset(); + m_transactions[m_oldest_transaction_id] = 0; timeouts.push_back(o); } catch (std::exception) {} } @@ -245,11 +294,11 @@ time_duration rpc_manager::tick() // clear the aborted transactions, will likely // generate new requests. We need to swap, since the // destrutors may add more observers to the m_aborted_transactions - std::vector >().swap(m_aborted_transactions); + std::vector().swap(m_aborted_transactions); return milliseconds(timeout_ms); } -unsigned int rpc_manager::new_transaction_id(shared_ptr o) +unsigned int rpc_manager::new_transaction_id(observer_ptr o) { INVARIANT_CHECK; @@ -261,7 +310,7 @@ unsigned int rpc_manager::new_transaction_id(shared_ptr o) // it will prevent it from spawning new requests right now, // since that would break the invariant m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]); - m_transactions[m_next_transaction_id].reset(); + m_transactions[m_next_transaction_id] = 0; assert(m_oldest_transaction_id == m_next_transaction_id); } assert(!m_transactions[tid]); @@ -294,7 +343,7 @@ void rpc_manager::update_oldest_transaction_id() } void rpc_manager::invoke(int message_id, udp::endpoint target_addr - , shared_ptr o) + , observer_ptr o) { INVARIANT_CHECK; @@ -352,17 +401,6 @@ void rpc_manager::reply(msg& m) m_send(m); } -namespace -{ - struct dummy_observer : observer - { - virtual void reply(msg const&) {} - virtual void timeout() {} - virtual void send(msg&) {} - void abort() {} - }; -} - void rpc_manager::reply_with_ping(msg& m) { INVARIANT_CHECK; @@ -377,7 +415,7 @@ void rpc_manager::reply_with_ping(msg& m) std::back_insert_iterator out(m.ping_transaction_id); io::write_uint16(m_next_transaction_id, out); - boost::shared_ptr o(new dummy_observer); + observer_ptr o(new (allocator().malloc()) null_observer(allocator())); assert(!m_transactions[m_next_transaction_id]); o->sent = time_now(); o->target_addr = m.addr; diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index e149281fe..ceb977f19 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -76,6 +76,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig } } +boost::pool<>& traversal_algorithm::allocator() const +{ + return m_rpc.allocator(); +} + void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) { add_entry(id, addr, 0); diff --git a/src/web_peer_connection.cpp b/src/web_peer_connection.cpp index e71af6a0d..bbe7c6e66 100755 --- a/src/web_peer_connection.cpp +++ b/src/web_peer_connection.cpp @@ -174,6 +174,7 @@ namespace libtorrent torrent_info const& info = t->torrent_file(); std::string request; + request.reserve(400); int size = r.length; const int block_size = t->block_size();