DHT optimizations

Arvid Norberg 2008-11-10 02:08:42 +00:00
parent 15a8cbcce7
commit 18d269dd62
14 changed files with 189 additions and 145 deletions

View File

@ -1,4 +1,5 @@
* DHT bandwidth usage optimizations
* rate limited DHT send socket
* tracker connections are now also subject to IP filtering
* improved optimistic unchoke logic

View File

@ -55,22 +55,20 @@ namespace messages
struct msg
{
- msg() : reply(false), piggy_backed_ping(false)
- , message_id(-1), port(0) {}
+ msg()
+ : reply(false)
+ , message_id(-1)
+ , port(0)
+ {}
// true if this message is a reply
bool reply;
- // true if this is a reply with a piggy backed ping
- bool piggy_backed_ping;
// the kind of message
int message_id;
// if this is a reply, a copy of the transaction id
// from the request. If it's a request, a transaction
// id that should be sent back in the reply
std::string transaction_id;
- // if this packet has a piggy backed ping, this
- // is the transaction id of that ping
- std::string ping_transaction_id;
// the node id of the process sending the message
node_id id;
// the address of the process sending or receiving

View File

@ -41,30 +41,41 @@ namespace libtorrent { namespace dht
struct node_entry
{
- node_entry(node_id const& id_, udp::endpoint addr_)
- : id(id_)
- , addr(addr_)
- , fail_count(0)
+ node_entry(node_id const& id_, udp::endpoint ep, bool pinged = false)
+ : addr(ep.address())
+ , port(ep.port())
+ , timeout_count(pinged ? 0 : 0xffff)
+ , id(id_)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
first_seen = time_now();
#endif
}
- node_entry(udp::endpoint addr_)
- : id(0)
- , addr(addr_)
- , fail_count(0)
+ node_entry(udp::endpoint ep)
+ : addr(ep.address())
+ , port(ep.port())
+ , timeout_count(0xffff)
+ , id(0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
first_seen = time_now();
#endif
}
- node_id id;
- udp::endpoint addr;
+ bool pinged() const { return timeout_count != 0xffff; }
+ void set_pinged() { if (timeout_count == 0xffff) timeout_count = 0; }
+ void timed_out() { if (pinged()) ++timeout_count; }
+ int fail_count() const { return pinged() ? timeout_count : 0; }
+ void reset_fail_count() { if (pinged()) timeout_count = 0; }
+ udp::endpoint ep() const { return udp::endpoint(addr, port); }
+ bool confirmed() const { return timeout_count == 0; }
+ address addr;
+ boost::uint16_t port;
// the number of times this node has failed to
// respond in a row
- int fail_count;
+ boost::uint16_t timeout_count;
+ node_id id;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
ptime first_seen;
#endif

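The rewritten node_entry above carries the space optimization: the udp::endpoint member is split into an address plus a 16-bit port, and the old int fail_count is folded into a 16-bit timeout_count whose 0xffff value doubles as a "never pinged" sentinel (this is also why the session startup log at the end of this commit begins printing sizeof(node_entry)). A minimal sketch of that sentinel idiom; contact and its members are illustrative names, not libtorrent's:

#include <cassert>
#include <cstdint>

struct contact
{
	// sentinel: the counter value that means "never responded"
	static const std::uint16_t never_pinged = 0xffff;

	explicit contact(bool responded)
		: timeout_count(responded ? 0 : never_pinged) {}

	// has this node ever answered one of our queries?
	bool pinged() const { return timeout_count != never_pinged; }
	// confirmed: has answered, and no outstanding failures
	bool confirmed() const { return timeout_count == 0; }
	void set_pinged() { if (timeout_count == never_pinged) timeout_count = 0; }
	// failures only count for nodes we have actually heard from
	void timed_out() { if (pinged()) ++timeout_count; }
	int fail_count() const { return pinged() ? timeout_count : 0; }

	std::uint16_t timeout_count; // counter and flag in one 16-bit field
};

int main()
{
	contact c(false);          // heard about, never verified
	c.timed_out();             // timeouts are not counted yet
	assert(!c.pinged() && c.fail_count() == 0);
	c.set_pinged();            // it finally responded
	c.timed_out();             // now failures are tracked
	assert(c.pinged() && !c.confirmed() && c.fail_count() == 1);
}
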
View File

@ -183,15 +183,20 @@ public:
// the time from the last activity is more than 15 minutes
ptime next_refresh(int bucket);
+ enum
+ {
+ include_self = 1,
+ include_failed = 2
+ };
// fills the vector with the count nodes from our buckets that
// are nearest to the given id.
void find_node(node_id const& id, std::vector<node_entry>& l
- , bool include_self, int count = 0);
+ , int options, int count = 0);
- // returns true if the given node would be placed in a bucket
- // that is not full. If the node already exists in the table
- // this function returns false
- bool need_node(node_id const& id);
+ // this may add a node to the routing table and mark it as
+ // not pinged. If the bucket the node falls into is full,
+ // the node will be ignored.
+ void heard_about(node_id const& id, udp::endpoint const& ep);
// this will set the given bucket's latest activity
// to the current time

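Replacing the lone bool include_self with an options bitmask lets callers independently ask for the local node and for unconfirmed entries; the refresh paths in this commit pass include_failed, while replies to peers pass 0. A toy illustration of the filtering semantics, with made-up types rather than libtorrent's:

#include <cstddef>
#include <iostream>
#include <vector>

struct toy_node { int id; bool confirmed; };

struct toy_table
{
	enum { include_self = 1, include_failed = 2 };
	std::vector<toy_node> bucket;

	// copy up to count nodes into out, filtering by the options bitmask
	void find_node(std::vector<toy_node>& out, int options, std::size_t count) const
	{
		for (std::size_t i = 0; i < bucket.size() && out.size() < count; ++i)
		{
			if (!(options & include_failed) && !bucket[i].confirmed) continue;
			out.push_back(bucket[i]);
		}
	}
};

int main()
{
	toy_table t;
	toy_node a = {1, true}, b = {2, false};
	t.bucket.push_back(a);
	t.bucket.push_back(b);

	std::vector<toy_node> strict, lax;
	t.find_node(strict, 0, 8);                      // replying to a peer: confirmed only
	t.find_node(lax, toy_table::include_failed, 8); // seeding a refresh: take anything
	std::cout << strict.size() << " " << lax.size() << "\n"; // prints "1 2"
}
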
View File

@ -91,7 +91,6 @@ public:
, observer_ptr o);
void reply(msg& m);
- void reply_with_ping(msg& m);
#ifndef NDEBUG
size_t allocation_size() const;

View File

@ -135,7 +135,7 @@ traversal_algorithm::traversal_algorithm(
for (InIt i = start; i != end; ++i)
{
- add_entry(i->id, i->addr, result::initial);
+ add_entry(i->id, udp::endpoint(i->addr, i->port), result::initial);
}
// in case the routing table is empty, use the

View File

@ -59,7 +59,7 @@ void closest_nodes_observer::reply(msg const& in)
for (msg::nodes_t::const_iterator i = in.nodes.begin()
, end(in.nodes.end()); i != end; ++i)
{
- m_algorithm->traverse(i->id, i->addr);
+ m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port));
}
}
m_algorithm->finished(m_self);

View File

@ -816,7 +816,7 @@ namespace libtorrent { namespace dht
{
std::string node;
std::back_insert_iterator<std::string> out(node);
- write_endpoint(i->addr, out);
+ write_endpoint(udp::endpoint(i->addr, i->port), out);
nodes.list().push_back(entry(node));
}
bucket_t cache;
@ -826,7 +826,7 @@ namespace libtorrent { namespace dht
{
std::string node;
std::back_insert_iterator<std::string> out(node);
- write_endpoint(i->addr, out);
+ write_endpoint(udp::endpoint(i->addr, i->port), out);
nodes.list().push_back(entry(node));
}
if (!nodes.list().empty())
@ -886,13 +886,13 @@ namespace libtorrent { namespace dht
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
- if (!i->addr.address().is_v4())
+ if (!i->addr.is_v4())
{
ipv6_nodes = true;
continue;
}
std::copy(i->id.begin(), i->id.end(), out);
- write_endpoint(i->addr, out);
+ write_endpoint(udp::endpoint(i->addr, i->port), out);
}
if (ipv6_nodes)
@ -902,12 +902,12 @@ namespace libtorrent { namespace dht
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
- if (!i->addr.address().is_v6()) continue;
+ if (!i->addr.is_v6()) continue;
endpoint.resize(18 + 20);
std::string::iterator out = endpoint.begin();
std::copy(i->id.begin(), i->id.end(), out);
out += 20;
- write_endpoint(i->addr, out);
+ write_endpoint(udp::endpoint(i->addr, i->port), out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint));
}
@ -1080,18 +1080,6 @@ namespace libtorrent { namespace dht
#endif
}
TORRENT_LOG(dht_tracker) << log_line.str() << " ]";
- if (!m.piggy_backed_ping) return;
- msg pm;
- pm.reply = false;
- pm.piggy_backed_ping = false;
- pm.message_id = messages::ping;
- pm.transaction_id = m.ping_transaction_id;
- pm.id = m.id;
- pm.addr = m.addr;
- send_packet(pm);
}
}}

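The loops above serialize each contact as a 20-byte node id followed by a compact endpoint: a 4-byte IPv4 address (16 bytes for IPv6) plus a 2-byte big-endian port, hence the resize(18 + 20) in the IPv6 case. A self-contained sketch of that compact encoding; write_compact_v4 and write_uint16_be are illustrative stand-ins for libtorrent's write_endpoint helpers:

#include <cassert>
#include <cstdint>
#include <iterator>
#include <string>

// append a 16-bit value in network (big-endian) byte order
template <class OutIt>
void write_uint16_be(std::uint16_t v, OutIt& out)
{
	*out++ = char(v >> 8);
	*out++ = char(v & 0xff);
}

// append a compact IPv4 endpoint: 4 address bytes then 2 port bytes
template <class OutIt>
void write_compact_v4(std::uint32_t addr, std::uint16_t port, OutIt& out)
{
	for (int shift = 24; shift >= 0; shift -= 8)
		*out++ = char((addr >> shift) & 0xff);
	write_uint16_be(port, out);
}

int main()
{
	std::string node;
	std::back_insert_iterator<std::string> out(node);
	// in the DHT "nodes" list this would be preceded by a 20-byte id
	write_compact_v4(0x7f000001, 6881, out); // 127.0.0.1:6881
	assert(node.size() == 6);
	assert((unsigned char)node[0] == 127);
	assert((unsigned char)node[4] == 0x1a && (unsigned char)node[5] == 0xe1);
}
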
View File

@ -64,7 +64,7 @@ void find_data_observer::reply(msg const& m)
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
- m_algorithm->traverse(i->id, i->addr);
+ m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port));
}
}
m_algorithm->finished(m_self);

View File

@ -81,7 +81,7 @@ void purge_peers(std::set<peer_entry>& peers)
if (i->added + minutes(int(announce_interval * 1.5f)) < time_now())
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
- TORRENT_LOG(node) << "peer timed out at: " << i->addr.address();
+ TORRENT_LOG(node) << "peer timed out at: " << i->addr;
#endif
peers.erase(i++);
}
@ -171,7 +171,7 @@ void node_impl::refresh(node_id const& id
// to start the refresh with
std::vector<node_entry> start;
start.reserve(m_table.bucket_size());
- m_table.find_node(id, start, false);
+ m_table.find_node(id, start, routing_table::include_failed);
new dht::refresh(*this, id, start.begin(), start.end(), f);
}
@ -240,7 +240,7 @@ void node_impl::refresh_bucket(int bucket)
std::vector<node_entry> start;
start.reserve(m_table.bucket_size());
- m_table.find_node(target, start, false, m_table.bucket_size());
+ m_table.find_node(target, start, routing_table::include_failed);
new dht::refresh(*this, target, start.begin(), start.end(), bind(&nop));
m_table.touch_bucket(bucket);
@ -282,7 +282,7 @@ namespace
#ifndef NDEBUG
o->m_in_constructor = false;
#endif
- rpc.invoke(messages::get_peers, i->addr, o);
+ rpc.invoke(messages::get_peers, udp::endpoint(i->addr, i->port), o);
nodes = true;
}
}
@ -481,12 +481,12 @@ void node_impl::incoming_request(msg const& m)
{
// we don't have any peers for this info_hash,
// return nodes instead
- m_table.find_node(m.info_hash, reply.nodes, false);
+ m_table.find_node(m.info_hash, reply.nodes, 0);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
{
- TORRENT_LOG(node) << " " << i->id << " " << i->addr;
+ TORRENT_LOG(node) << " " << i->id << " " << i->addr << ":" << i->port;
}
#endif
}
@ -496,12 +496,12 @@ void node_impl::incoming_request(msg const& m)
{
reply.info_hash = m.info_hash;
- m_table.find_node(m.info_hash, reply.nodes, false);
+ m_table.find_node(m.info_hash, reply.nodes, 0);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
{
- TORRENT_LOG(node) << " " << i->id << " " << i->addr;
+ TORRENT_LOG(node) << " " << i->id << " " << i->addr << ":" << i->port;
}
#endif
}
@ -513,10 +513,8 @@ void node_impl::incoming_request(msg const& m)
TORRENT_ASSERT(false);
};
- if (m_table.need_node(m.id))
- m_rpc.reply_with_ping(reply);
- else
- m_rpc.reply(reply);
+ m_table.heard_about(m.id, m.addr);
+ m_rpc.reply(reply);
}

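This hunk is where the bandwidth saving shows up in the request path: instead of piggy-backing a ping on the reply to verify the requester (the removed reply_with_ping path), incoming_request now records the sender with heard_about, which inserts it unpinged and simply ignores it when the bucket is full; the entry is only promoted later, by node_seen, once the node answers one of our own queries. A toy sketch of that two-step flow, using illustrative names and omitting the replacement cache:

#include <cassert>
#include <cstddef>
#include <vector>

struct toy_node { int id; bool pinged; };

struct toy_bucket
{
	enum { capacity = 8 };
	std::vector<toy_node> nodes;

	// a request arrived from this node: record it cheaply, no ping sent
	void heard_about(int id)
	{
		if (nodes.size() >= std::size_t(capacity)) return; // full: ignore
		toy_node n = { id, false };                        // tentative entry
		nodes.push_back(n);
	}

	// the node answered one of our own queries: promote it
	void node_seen(int id)
	{
		for (std::size_t i = 0; i < nodes.size(); ++i)
			if (nodes[i].id == id) nodes[i].pinged = true;
	}
};

int main()
{
	toy_bucket b;
	b.heard_about(7);            // incoming request; no verification ping
	assert(!b.nodes[0].pinged);  // unpinged: first in line for eviction
	b.node_seen(7);              // it answered regular traffic later
	assert(b.nodes[0].pinged);   // now a first-class contact
}
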
View File

@ -66,7 +66,7 @@ void refresh_observer::reply(msg const& in)
for (msg::nodes_t::const_iterator i = in.nodes.begin()
, end(in.nodes.end()); i != end; ++i)
{
- m_algorithm->traverse(i->id, i->addr);
+ m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port));
}
}
m_algorithm->finished(m_self);

View File

@ -150,8 +150,11 @@ void routing_table::print_state(std::ostream& os) const
for (bucket_t::const_iterator j = i->first.begin()
, end(i->first.end()); j != end; ++j)
{
- os << "ip: " << j->addr << " fails: " << j->fail_count
- << " id: " << j->id << "\n";
+ os << " id: " << j->id
+ << " ip: " << j->ep()
+ << " fails: " << j->fail_count()
+ << " pinged: " << j->pinged()
+ << "\n";
}
}
}
@ -184,7 +187,7 @@ void routing_table::replacement_cache(bucket_t& nodes) const
}
}
- bool routing_table::need_node(node_id const& id)
+ void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
{
int bucket_index = distance_exp(m_id, id);
TORRENT_ASSERT(bucket_index < (int)m_buckets.size());
@ -195,16 +198,26 @@ bool routing_table::need_node(node_id const& id)
// if the replacement cache is full, we don't
// need another node. The table is fine the
// way it is.
- if ((int)rb.size() >= m_bucket_size) return false;
+ if ((int)rb.size() >= m_bucket_size) return;
// if the node already exists, we don't need it
if (std::find_if(b.begin(), b.end(), bind(&node_entry::id, _1) == id)
- != b.end()) return false;
+ != b.end()) return;
if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == id)
- != rb.end()) return false;
+ != rb.end()) return;
- return true;
+ if (b.size() < m_bucket_size)
+ {
+ if (bucket_index < m_lowest_active_bucket
+ && bucket_index > 0)
+ m_lowest_active_bucket = bucket_index;
+ b.push_back(node_entry(id, ep, false));
+ return;
+ }
+ if (rb.size() < m_bucket_size)
+ rb.push_back(node_entry(id, ep, false));
}
void routing_table::node_failed(node_id const& id)
@ -225,17 +238,20 @@ void routing_table::node_failed(node_id const& id)
if (rb.empty())
{
- ++i->fail_count;
+ i->timed_out();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << " NODE FAILED"
" id: " << id <<
- " ip: " << i->addr <<
- " fails: " << i->fail_count <<
+ " ip: " << i->ep() <<
+ " fails: " << i->fail_count() <<
+ " pinged: " << i->pinged() <<
" up-time: " << total_seconds(time_now() - i->first_seen);
#endif
- if (i->fail_count >= m_settings.max_fail_count)
+ // if this node has failed too many times, or if this node
+ // has never responded at all, remove it
+ if (i->fail_count() >= m_settings.max_fail_count || !i->pinged())
{
b.erase(i);
TORRENT_ASSERT(m_lowest_active_bucket <= bucket_index);
@ -249,8 +265,11 @@ void routing_table::node_failed(node_id const& id)
}
b.erase(i);
- b.push_back(rb.back());
- rb.erase(rb.end() - 1);
+ i = find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == true);
+ if (i == rb.end()) i = rb.begin();
+ b.push_back(*i);
+ rb.erase(i);
}
void routing_table::add_router_node(udp::endpoint router)
@ -282,17 +301,14 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
if (i != b.end())
{
// TODO: what do we do if we see a node with
// the same id as a node at a different address?
- // TORRENT_ASSERT(i->addr == addr);
// we already have the node in our bucket
- // just move it to the back since it was
- // the last node we had any contact with
- // in this bucket
- b.erase(i);
- b.push_back(node_entry(id, addr));
- // TORRENT_LOG(table) << "replacing node: " << id << " " << addr;
+ i->set_pinged();
+ i->addr = addr.address();
+ i->port = addr.port();
+ // TORRENT_LOG(table) << "updating node: " << id << " " << addr;
return ret;
}
@ -303,7 +319,7 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
if ((int)b.size() < m_bucket_size)
{
if (b.empty()) b.reserve(m_bucket_size);
- b.push_back(node_entry(id, addr));
+ b.push_back(node_entry(id, addr, true));
// if bucket index is 0, the node is ourselves
// don't update m_lowest_active_bucket
if (bucket_index < m_lowest_active_bucket
@ -313,8 +329,23 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
return ret;
}
- // if there is no room, we look for nodes marked as stale
+ // if there is no room, we look for nodes that are not 'pinged',
+ // i.e. we haven't confirmed that they respond to messages.
+ // Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it.
+ i = std::find_if(b.begin(), b.end(), bind(&node_entry::pinged, _1) == false);
+ if (i != b.end() && !i->pinged())
+ {
+ // i points to a node that has not been pinged.
+ // Replace it with this new one
+ b.erase(i);
+ b.push_back(node_entry(id, addr, true));
+ // TORRENT_LOG(table) << "replacing unpinged node: " << id << " " << addr;
+ return ret;
+ }
// A node is considered stale if it has failed at least one
// time. Here we choose the node that has failed most times.
// If we don't find one, place this node in the replacement-
@ -325,12 +356,12 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
, bind(&node_entry::fail_count, _1)
< bind(&node_entry::fail_count, _2));
- if (i != b.end() && i->fail_count > 0)
+ if (i != b.end() && i->fail_count() > 0)
{
// i points to a node that has been marked
// as stale. Replace it with this new one
b.erase(i);
- b.push_back(node_entry(id, addr));
+ b.push_back(node_entry(id, addr, true));
// TORRENT_LOG(table) << "replacing stale node: " << id << " " << addr;
return ret;
}
@ -347,11 +378,27 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
// if the node is already in the replacement bucket
// just return.
- if (i != rb.end()) return ret;
+ if (i != rb.end())
+ {
+ // make sure we mark this node as pinged
+ // and if its address has changed, update
+ // that as well
+ i->set_pinged();
+ i->addr = addr.address();
+ i->port = addr.port();
+ return ret;
+ }
- if ((int)rb.size() > m_bucket_size) rb.erase(rb.begin());
+ if ((int)rb.size() >= m_bucket_size)
+ {
+ // if the replacement bucket is full, remove the oldest entry
+ // but prefer nodes that haven't been pinged, since they are
+ // less reliable than this one, that has been pinged
+ i = std::find_if(rb.begin(), rb.end(), bind(&node_entry::pinged, _1) == false);
+ rb.erase(i != rb.end() ? i : rb.begin());
+ }
if (rb.empty()) rb.reserve(m_bucket_size);
- rb.push_back(node_entry(id, addr));
+ rb.push_back(node_entry(id, addr, true));
// TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr;
return ret;
}
@ -360,7 +407,7 @@ bool routing_table::need_bootstrap() const
{
for (const_iterator i = begin(); i != end(); ++i)
{
- if (i->fail_count == 0) return false;
+ if (i->confirmed()) return false;
}
return true;
}
@ -378,10 +425,22 @@ DstIter copy_if_n(SrcIter begin, SrcIter end, DstIter target, size_t n, Pred p)
return target;
}
+ template <class SrcIter, class DstIter>
+ DstIter copy_n(SrcIter begin, SrcIter end, DstIter target, size_t n)
+ {
+ for (; n > 0 && begin != end; ++begin)
+ {
+ *target = *begin;
+ --n;
+ ++target;
+ }
+ return target;
+ }
// fills the vector with the k nodes from our buckets that
// are nearest to the given id.
void routing_table::find_node(node_id const& target
- , std::vector<node_entry>& l, bool include_self, int count)
+ , std::vector<node_entry>& l, int options, int count)
{
l.clear();
if (count == 0) count = m_bucket_size;
@ -392,14 +451,24 @@ void routing_table::find_node(node_id const& target
// copy all nodes that haven't failed into the target
// vector.
- copy_if_n(b.begin(), b.end(), std::back_inserter(l)
- , (std::min)(size_t(count), b.size()), bind(&node_entry::fail_count, _1) == 0);
+ if (options & include_failed)
+ {
+ copy_n(b.begin(), b.end(), std::back_inserter(l)
+ , (std::min)(size_t(count), b.size()));
+ }
+ else
+ {
+ copy_if_n(b.begin(), b.end(), std::back_inserter(l)
+ , (std::min)(size_t(count), b.size())
+ , bind(&node_entry::confirmed, _1));
+ }
TORRENT_ASSERT((int)l.size() <= count);
if ((int)l.size() == count)
{
- TORRENT_ASSERT(std::count_if(l.begin(), l.end()
- , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
+ TORRENT_ASSERT((options & include_failed)
+ || std::count_if(l.begin(), l.end()
+ , !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
@ -409,11 +478,18 @@ void routing_table::find_node(node_id const& target
// [0, bucket_index) if we are to include ourself
// or [1, bucket_index) if not.
bucket_t tmpb;
- for (int i = include_self?0:1; i < bucket_index; ++i)
+ for (int i = (options & include_self)?0:1; i < bucket_index; ++i)
{
bucket_t& b = m_buckets[i].first;
- std::remove_copy_if(b.begin(), b.end(), std::back_inserter(tmpb)
- , bind(&node_entry::fail_count, _1));
+ if (options & include_failed)
+ {
+ copy(b.begin(), b.end(), std::back_inserter(tmpb));
+ }
+ else
+ {
+ std::remove_copy_if(b.begin(), b.end(), std::back_inserter(tmpb)
+ , !bind(&node_entry::confirmed, _1));
+ }
}
if (count - l.size() < tmpb.size())
@ -434,8 +510,9 @@ void routing_table::find_node(node_id const& target
// to look in.
if ((int)l.size() == count)
{
- TORRENT_ASSERT(std::count_if(l.begin(), l.end()
- , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
+ TORRENT_ASSERT((options & include_failed)
+ || std::count_if(l.begin(), l.end()
+ , !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
@ -444,20 +521,29 @@ void routing_table::find_node(node_id const& target
bucket_t& b = m_buckets[i].first;
size_t to_copy = (std::min)(count - l.size(), b.size());
- copy_if_n(b.begin(), b.end(), std::back_inserter(l)
- , to_copy, bind(&node_entry::fail_count, _1) == 0);
+ if (options & include_failed)
+ {
+ copy_n(b.begin(), b.end(), std::back_inserter(l), to_copy);
+ }
+ else
+ {
+ copy_if_n(b.begin(), b.end(), std::back_inserter(l)
+ , to_copy, bind(&node_entry::confirmed, _1));
+ }
TORRENT_ASSERT((int)l.size() <= count);
if ((int)l.size() == count)
{
- TORRENT_ASSERT(std::count_if(l.begin(), l.end()
- , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
+ TORRENT_ASSERT((options & include_failed)
+ || std::count_if(l.begin(), l.end()
+ , !boost::bind(&node_entry::confirmed, _1)) == 0);
return;
}
}
TORRENT_ASSERT((int)l.size() <= count);
- TORRENT_ASSERT(std::count_if(l.begin(), l.end()
- , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
+ TORRENT_ASSERT((options & include_failed)
+ || std::count_if(l.begin(), l.end()
+ , !boost::bind(&node_entry::confirmed, _1)) == 0);
}
routing_table::iterator routing_table::begin() const

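Taken together, the node_seen changes define a clear eviction order for a full bucket: replace a node that never responded first, otherwise the stalest node (most recorded failures), and fall back to the replacement cache when every entry is healthy; promotion out of the replacement cache likewise prefers pinged entries. A small sketch of the victim selection, with toy types rather than libtorrent's:

#include <algorithm>
#include <vector>

struct toy_node { bool pinged; int fails; };

bool never_pinged(toy_node const& n) { return !n.pinged; }
bool fewer_fails(toy_node const& a, toy_node const& b)
{ return a.fails < b.fails; }

// returns the bucket entry to evict, or end() when every node is
// healthy and the new contact belongs in the replacement cache
std::vector<toy_node>::iterator pick_victim(std::vector<toy_node>& bucket)
{
	// 1) prefer a node that never responded to us at all
	std::vector<toy_node>::iterator i
		= std::find_if(bucket.begin(), bucket.end(), never_pinged);
	if (i != bucket.end()) return i;

	// 2) otherwise the stalest node: the one with the most failures
	i = std::max_element(bucket.begin(), bucket.end(), fewer_fails);
	if (i != bucket.end() && i->fails > 0) return i;

	return bucket.end();
}

int main()
{
	std::vector<toy_node> b;
	toy_node x = {true, 0}, y = {true, 3};
	b.push_back(x);
	b.push_back(y);
	// no unpinged entry, so the node with three failures is evicted
	return pick_victim(b) == b.begin() + 1 ? 0 : 1;
}
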
View File

@ -273,19 +273,6 @@ bool rpc_manager::incoming(msg const& m)
#endif
o->reply(m);
m_transactions[tid] = 0;
- if (m.piggy_backed_ping)
- {
- // there is a ping request piggy
- // backed in this reply
- msg ph;
- ph.message_id = messages::ping;
- ph.transaction_id = m.ping_transaction_id;
- ph.addr = m.addr;
- ph.reply = true;
- reply(ph);
- }
return m_table.node_seen(m.id, m.addr);
}
else
@ -459,40 +446,10 @@ void rpc_manager::reply(msg& m)
if (m_destructing) return;
TORRENT_ASSERT(m.reply);
- m.piggy_backed_ping = false;
m.id = m_our_id;
m_send(m);
}
- void rpc_manager::reply_with_ping(msg& m)
- {
- INVARIANT_CHECK;
- if (m_destructing) return;
- TORRENT_ASSERT(m.reply);
- m.piggy_backed_ping = true;
- m.id = m_our_id;
- m.ping_transaction_id.clear();
- std::back_insert_iterator<std::string> out(m.ping_transaction_id);
- io::write_uint16(m_next_transaction_id, out);
- TORRENT_ASSERT(allocation_size() >= sizeof(null_observer));
- observer_ptr o(new (allocator().malloc()) null_observer(allocator()));
- #ifndef NDEBUG
- o->m_in_constructor = false;
- #endif
- TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
- o->sent = time_now();
- o->target_addr = m.addr;
- m_send(m);
- new_transaction_id(o);
- }
} } // namespace libtorrent::dht

View File

@ -231,6 +231,7 @@ namespace aux {
(*m_logger) << "sizeof(address_v4): " << sizeof(address_v4) << "\n";
(*m_logger) << "sizeof(address_v6): " << sizeof(address_v6) << "\n";
(*m_logger) << "sizeof(void*): " << sizeof(void*) << "\n";
(*m_logger) << "sizeof(node_entry): " << sizeof(dht::node_entry) << "\n";
#endif
#ifdef TORRENT_STATS