optimized memory usage in the DHT, and added some handling for out-of-memory cases

This commit is contained in:
Arvid Norberg 2009-05-13 22:18:41 +00:00
parent bdd7e21831
commit 00ccf9064e
10 changed files with 82 additions and 28 deletions

View File

@ -77,18 +77,16 @@ class closest_nodes_observer : public observer
public: public:
closest_nodes_observer( closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self , node_id self)
, node_id target)
: observer(algorithm->allocator()) : observer(algorithm->allocator())
, m_algorithm(algorithm) , m_algorithm(algorithm)
, m_target(target)
, m_self(self) , m_self(self)
{} {}
~closest_nodes_observer(); ~closest_nodes_observer();
void send(msg& p) void send(msg& p)
{ {
p.info_hash = m_target; p.info_hash = m_algorithm->target();
} }
void timeout(); void timeout();
@ -97,7 +95,6 @@ public:
private: private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm; boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_target;
node_id const m_self; node_id const m_self;
}; };

View File

@ -84,7 +84,13 @@ struct observer : boost::noncopyable
// is being destructed // is being destructed
virtual void abort() = 0; virtual void abort() = 0;
udp::endpoint target_addr; #if TORRENT_USE_IPV6
address target_addr;
#else
address_v4 target_addr;
#endif
uint16_t port;
udp::endpoint target_ep() const { return udp::endpoint(target_addr, port); }
ptime sent; ptime sent;
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
bool m_in_constructor; bool m_in_constructor;

View File

@ -86,18 +86,16 @@ class refresh_observer : public observer
public: public:
refresh_observer( refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm boost::intrusive_ptr<refresh> const& algorithm
, node_id self , node_id self)
, node_id target)
: observer(algorithm->allocator()) : observer(algorithm->allocator())
, m_target(target)
, m_self(self)
, m_algorithm(algorithm) , m_algorithm(algorithm)
, m_self(self)
{} {}
~refresh_observer(); ~refresh_observer();
void send(msg& m) void send(msg& m)
{ {
m.info_hash = m_target; m.info_hash = m_algorithm->target();
} }
void timeout(); void timeout();
@ -106,9 +104,8 @@ public:
private: private:
node_id const m_target;
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm; boost::intrusive_ptr<refresh> m_algorithm;
node_id const m_self;
}; };
class ping_observer : public observer class ping_observer : public observer

View File

@ -68,6 +68,8 @@ public:
virtual char const* name() const { return "traversal_algorithm"; } virtual char const* name() const { return "traversal_algorithm"; }
node_id const& target() const { return m_target; }
protected: protected:
template<class InIt> template<class InIt>

View File

@ -59,7 +59,7 @@ void closest_nodes_observer::reply(msg const& in)
for (msg::nodes_t::const_iterator i = in.nodes.begin() for (msg::nodes_t::const_iterator i = in.nodes.begin()
, end(in.nodes.end()); i != end; ++i) , end(in.nodes.end()); i != end; ++i)
{ {
m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port)); m_algorithm->traverse(i->id, i->ep());
} }
} }
m_algorithm->finished(m_self); m_algorithm->finished(m_self);
@ -87,7 +87,14 @@ closest_nodes::closest_nodes(
void closest_nodes::invoke(node_id const& id, udp::endpoint addr) void closest_nodes::invoke(node_id const& id, udp::endpoint addr)
{ {
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(closest_nodes_observer)); TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(closest_nodes_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target)); void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
done();
return;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) closest_nodes_observer(this, id));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif

View File

@ -105,7 +105,14 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
} }
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id)); void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
done();
return;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) find_data_observer(this, id));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif

View File

@ -274,7 +274,10 @@ namespace
TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id)); TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id));
#endif #endif
observer_ptr o(new (rpc.allocator().malloc()) announce_observer( void* ptr = rpc.allocator().malloc();
if (ptr == 0) return;
rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) announce_observer(
rpc.allocator(), ih, listen_port, i->second)); rpc.allocator(), ih, listen_port, i->second));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
@ -296,7 +299,10 @@ void node_impl::add_node(udp::endpoint node)
{ {
// ping the node, and if we get a reply, it // ping the node, and if we get a reply, it
// will be added to the routing table // will be added to the routing table
observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator())); void* ptr = m_rpc.allocator().malloc();
if (ptr == 0) return;
m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) null_observer(m_rpc.allocator()));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif

View File

@ -103,8 +103,14 @@ void ping_observer::timeout()
void refresh::invoke(node_id const& nid, udp::endpoint addr) void refresh::invoke(node_id const& nid, udp::endpoint addr)
{ {
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(refresh_observer)); TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(refresh_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) refresh_observer( void* ptr = m_node.m_rpc.allocator().malloc();
this, nid, m_target)); if (ptr == 0)
{
done();
return;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) refresh_observer( this, nid));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif
@ -156,19 +162,25 @@ void refresh::invoke_pings_or_finish(bool prevent_request)
continue; continue;
} }
#ifndef BOOST_NO_EXCEPTIONS
try try
{ {
#endif
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(ping_observer)); TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(ping_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) ping_observer( void* ptr = m_node.m_rpc.allocator().malloc();
this, node.id)); if (ptr == 0) return;
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) ping_observer(this, node.id));
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
o->m_in_constructor = false; o->m_in_constructor = false;
#endif #endif
m_node.m_rpc.invoke(messages::ping, node.addr, o); m_node.m_rpc.invoke(messages::ping, node.addr, o);
++m_active_pings; ++m_active_pings;
++m_leftover_nodes_iterator; ++m_leftover_nodes_iterator;
#ifndef BOOST_NO_EXCEPTIONS
} }
catch (std::exception& e) {} catch (std::exception& e) {}
#endif
} }
} }

View File

@ -104,7 +104,7 @@ typedef mpl::max_element<
rpc_manager::rpc_manager(fun const& f, node_id const& our_id rpc_manager::rpc_manager(fun const& f, node_id const& our_id
, routing_table& table, send_fun const& sf) , routing_table& table, send_fun const& sf)
: m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type)) : m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type), 10)
, m_next_transaction_id(std::rand() % max_transactions) , m_next_transaction_id(std::rand() % max_transactions)
, m_oldest_transaction_id(m_next_transaction_id) , m_oldest_transaction_id(m_next_transaction_id)
, m_incoming(f) , m_incoming(f)
@ -184,7 +184,7 @@ void rpc_manager::unreachable(udp::endpoint const& ep)
if (tid >= max_transactions) tid = 0; if (tid >= max_transactions) tid = 0;
observer_ptr const& o = m_transactions[tid]; observer_ptr const& o = m_transactions[tid];
if (!o) continue; if (!o) continue;
if (o->target_addr != ep) continue; if (o->target_ep() != ep) continue;
observer_ptr ptr = m_transactions[tid]; observer_ptr ptr = m_transactions[tid];
m_transactions[tid] = 0; m_transactions[tid] = 0;
if (tid == m_oldest_transaction_id) if (tid == m_oldest_transaction_id)
@ -262,7 +262,7 @@ bool rpc_manager::incoming(msg const& m)
return false; return false;
} }
if (m.addr.address() != o->target_addr.address()) if (m.addr.address() != o->target_addr)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: " TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
@ -336,7 +336,7 @@ time_duration rpc_manager::tick()
m_transactions[m_oldest_transaction_id] = 0; m_transactions[m_oldest_transaction_id] = 0;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Timing out transaction id: " TORRENT_LOG(rpc) << "Timing out transaction id: "
<< m_oldest_transaction_id << " from " << o->target_addr; << m_oldest_transaction_id << " from " << o->target_ep();
#endif #endif
timeouts.push_back(o); timeouts.push_back(o);
} catch (std::exception) {} } catch (std::exception) {}
@ -367,7 +367,7 @@ unsigned int rpc_manager::new_transaction_id(observer_ptr o)
m_aborted_transactions.push_back(o); m_aborted_transactions.push_back(o);
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: " TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: "
<< m_next_transaction_id << " sent to " << o->target_addr << m_next_transaction_id << " sent to " << o->target_ep()
<< " " << total_seconds(time_now() - o->sent) << " seconds ago"; << " " << total_seconds(time_now() - o->sent) << " seconds ago";
#endif #endif
m_transactions[m_next_transaction_id] = 0; m_transactions[m_next_transaction_id] = 0;
@ -431,7 +431,8 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
o->send(m); o->send(m);
o->sent = time_now(); o->sent = time_now();
o->target_addr = target_addr; o->target_addr = target_addr.address();
o->port = target_addr.port();
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id] TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id]

View File

@ -54,6 +54,19 @@ POSSIBILITY OF SUCH DAMAGE.
#pragma warning(pop) #pragma warning(pop)
#endif #endif
// for logging the size of DHT structures
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
#ifndef TORRENT_DISABLE_DHT
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
#endif
#endif
#include "libtorrent/peer_id.hpp" #include "libtorrent/peer_id.hpp"
#include "libtorrent/torrent_info.hpp" #include "libtorrent/torrent_info.hpp"
#include "libtorrent/tracker_manager.hpp" #include "libtorrent/tracker_manager.hpp"
@ -261,6 +274,12 @@ namespace aux {
PRINT_OFFSETOF(policy::peer, port) PRINT_OFFSETOF(policy::peer, port)
PRINT_OFFSETOF(policy::peer, hashfails) PRINT_OFFSETOF(policy::peer, hashfails)
PRINT_SIZEOF(dht::closest_nodes_observer)
PRINT_SIZEOF(dht::find_data_observer)
PRINT_SIZEOF(dht::announce_observer)
PRINT_SIZEOF(dht::refresh_observer)
PRINT_SIZEOF(dht::ping_observer)
PRINT_SIZEOF(dht::null_observer)
#undef PRINT_OFFSETOF #undef PRINT_OFFSETOF
#undef PRINT_SIZEOF #undef PRINT_SIZEOF