improve DHT routing table to return better nodes (lower RTT and closer to target)

This commit is contained in:
Arvid Norberg 2013-09-09 04:16:52 +00:00
parent 7c66c2911f
commit 8ab8172cb7
5 changed files with 146 additions and 45 deletions

View File

@ -1,3 +1,4 @@
* improve DHT routing table to return better nodes (lower RTT and closer to target)
* don't use pointers to resume_data and file_priorities in add_torrent_params
* allow moving files to absolute paths, out of the download directory
* make move_storage more generic to allow both overwriting files as well as taking existing ones

View File

@ -43,6 +43,8 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent { namespace dht
{
struct node_entry;
typedef libtorrent::sha1_hash node_id;
// returns the distance between the two nodes
@ -61,6 +63,7 @@ node_id TORRENT_EXTRA_EXPORT generate_random_id();
node_id TORRENT_EXTRA_EXPORT generate_id_impl(address const& ip_, boost::uint32_t r);
bool TORRENT_EXTRA_EXPORT verify_id(node_id const& nid, address const& source_ip);
bool TORRENT_EXTRA_EXPORT matching_prefix(node_entry const& n, int mask, int prefix, int bucket_index);
} } // namespace libtorrent::dht

View File

@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/crc.hpp>
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/kademlia/node_entry.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/broadcast_socket.hpp" // for is_local et.al
@ -173,5 +174,12 @@ node_id generate_id(address const& ip)
return generate_id_impl(ip, random());
}
// Returns true when the node's ID, once the bits already fixed by the
// bucket are shifted away, carries the given prefix in its leading byte.
//
// n            - the node whose ID is examined
// mask         - bit mask applied to the first byte of the shifted ID
// prefix       - the masked value the first byte is compared against
// bucket_index - the ID is shifted left by (bucket_index + 1) bits before
//                the comparison
bool matching_prefix(node_entry const& n, int mask, int prefix, int bucket_index)
{
	// work on a copy, the shift must not alter the entry's own ID
	node_id shifted = n.id;
	shifted <<= bucket_index + 1;

	int const leading_bits = shifted[0] & mask;
	return leading_bits == prefix;
}
} } // namespace libtorrent::dht

View File

@ -44,7 +44,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/broadcast_socket.hpp" // for cidr_distance
#include "libtorrent/session_status.hpp"
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/session_settings.hpp"
#include "libtorrent/time.hpp"
using boost::uint8_t;
@ -181,7 +180,21 @@ void routing_table::print_state(std::ostream& os) const
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
{
os << " id: " << j->id
int bucket_size_limit = bucket_limit(bucket_index);
boost::uint32_t top_mask = bucket_size_limit - 1;
int mask_shift = 0;
TORRENT_ASSERT_VAL(bucket_size_limit > 0, bucket_size_limit);
while ((top_mask & 0x80) == 0)
{
top_mask <<= 1;
++mask_shift;
}
top_mask = (0xff << mask_shift) & 0xff;
node_id id = j->id;
id <<= bucket_index + 1;
os << " prefix: " << ((id[0] & top_mask) >> mask_shift)
<< " id: " << j->id
<< " rtt: " << j->rtt
<< " ip: " << j->ep()
<< " fails: " << j->fail_count()
@ -196,11 +209,25 @@ void routing_table::print_state(std::ostream& os) const
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i, ++bucket_index)
{
// mask out the first 8 bits
node_id mask(0);
mask[0] = 0xe0;
const int mask_shift = 5;
bool sub_buckets[8];
int bucket_size_limit = bucket_limit(bucket_index);
// mask out the first 3 bits, or more depending
// on the bucket_size_limit
// we have all the lower bits set in (bucket_size_limit-1)
// but we want the left-most bits to be set. Shift it
// until the MSB is set
boost::uint32_t top_mask = bucket_size_limit - 1;
int mask_shift = 0;
TORRENT_ASSERT_VAL(bucket_size_limit > 0, bucket_size_limit);
while ((top_mask & 0x80) == 0)
{
top_mask <<= 1;
++mask_shift;
}
top_mask = (0xff << mask_shift) & 0xff;
bucket_size_limit = (top_mask >> mask_shift) + 1;
TORRENT_ASSERT_VAL(bucket_size_limit <= 256, bucket_size_limit);
bool sub_buckets[256];
memset(sub_buckets, 0, sizeof(sub_buckets));
for (bucket_t::const_iterator j = i->live_nodes.begin()
@ -208,15 +235,13 @@ void routing_table::print_state(std::ostream& os) const
{
node_id id = j->id;
id <<= bucket_index + 1;
id &= mask;
id >>= mask_shift;
int b = id[0];
TORRENT_ASSERT(b >= 0 && b < 8);
int b = (id[0] & top_mask) >> mask_shift;
TORRENT_ASSERT(b >= 0 && b < sizeof(sub_buckets));
sub_buckets[b] = true;
}
os << bucket_index << ": [";
for (int i = 0; i < 8; ++i) os << (sub_buckets[i] ? "X" : " ");
os << bucket_index << " mask:" << (top_mask >> mask_shift) << ": [";
for (int i = 0; i < bucket_size_limit; ++i) os << (sub_buckets[i] ? "X" : " ");
os << "]\n";
}
}
@ -237,6 +262,8 @@ bool compare_bucket_refresh(routing_table_node const& lhs, routing_table_node co
< rhs.last_active + seconds(rhs.live_nodes.size() * 5);
}
// TODO: instead of refreshing a bucket by using find_nodes,
// ping each node periodically
bool routing_table::need_refresh(node_id& target) const
{
ptime now = time_now();
@ -360,6 +387,7 @@ node_entry* routing_table::find_node(udp::endpoint const& ep, routing_table::tab
bool routing_table::add_node(node_entry e)
{
// if we already have this (IP,port), don't do anything
if (m_router_nodes.find(e.ep()) != m_router_nodes.end()) return false;
bool ret = need_bootstrap();
@ -408,39 +436,30 @@ bool routing_table::add_node(node_entry e)
// delete the current entry before we insert the new one
bucket_t& b = existing_bucket->live_nodes;
bucket_t& rb = existing_bucket->replacements;
bool done = false;
for (bucket_t::iterator i = b.begin(), end(b.end());
i != end; ++i)
bucket_t::iterator i = std::find_if(b.begin(), b.end()
, boost::bind(&node_entry::ep, _1) == e.ep());
if (i != b.end())
{
if (i->addr() != e.addr() || i->port() != e.port()) continue;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr();
#endif
b.erase(i);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
done = true;
#endif
break;
}
if (!done)
else
{
for (bucket_t::iterator i = rb.begin(), end(rb.end());
i != end; ++i)
{
if (i->addr() != e.addr() || i->port() != e.port()) continue;
i = std::find_if(rb.begin(), rb.end()
, boost::bind(&node_entry::ep, _1) == e.ep());
// this must hold because existing != NULL
TORRENT_ASSERT(i != rb.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr();
TORRENT_LOG(table) << "node ID changed, deleting old entry: "
<< i->id << " " << i->addr();
#endif
rb.erase(i);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
done = true;
#endif
break;
}
rb.erase(i);
}
TORRENT_ASSERT(done);
m_ips.erase(e.addr().to_v4().to_bytes());
}
}
@ -533,8 +552,11 @@ bool routing_table::add_node(node_entry e)
// i.e. we haven't confirmed that they respond to messages.
// Then we look for nodes marked as stale
// in the k-bucket. If we find one, we can replace it.
// as the last replacement strategy, we look for nodes with the
// highest RTT, and if it's higher than the new node, we replace it
// then we look for nodes with the same 3 bit prefix (or however
// many bits prefix the bucket size warrants). If there is no other
// node with this prefix, remove the duplicate with the highest RTT.
// as the last replacement strategy, if the node we found matching our
// bit prefix has higher RTT than the new node, replace it.
// can we split the bucket?
bool can_split = false;
@ -571,8 +593,9 @@ bool routing_table::add_node(node_entry e)
j = std::max_element(b.begin(), b.end()
, boost::bind(&node_entry::fail_count, _1)
< boost::bind(&node_entry::fail_count, _2));
TORRENT_ASSERT(j != b.end());
if (j != b.end() && j->fail_count() > 0)
if (j->fail_count() > 0)
{
// i points to a node that has been marked
// as stale. Replace it with this new one
@ -582,12 +605,76 @@ bool routing_table::add_node(node_entry e)
// TORRENT_LOG(table) << "replacing stale node: " << e.id << " " << e.addr();
return ret;
}
// in order to provide as few lookups as possible before finding
// the data someone is looking for, make sure there is an affinity
// towards having a good spread of node IDs in each bucket
// in order to keep lookup times small, prefer nodes with low RTTs
boost::uint32_t mask = bucket_size_limit - 1;
int mask_shift = 0;
TORRENT_ASSERT_VAL(mask > 0, mask);
while ((mask & 0x80) == 0)
{
mask <<= 1;
++mask_shift;
}
j = std::max_element(b.begin(), b.end()
, boost::bind(&node_entry::rtt, _1)
< boost::bind(&node_entry::rtt, _2));
// in case bucket_size_limit is not an even power of 2
mask = (0xff << mask_shift) & 0xff;
// TODO: 2 we should really find the node with a matching prefix _and_ the highest
// RTT of those (if there are more than one)
node_id id = e.id;
id <<= bucket_index + 1;
j = std::find_if(b.begin(), b.end(), boost::bind(&matching_prefix, _1, mask, id[0] & mask, bucket_index));
if (j == b.end())
{
// there is no node in this prefix-slot, there must be some
// nodes sharing a prefix. Find all nodes that do not
// have a unique prefix
std::sort(b.begin(), b.end(), boost::bind(&node_entry::id, _1) < boost::bind(&node_entry::id, _2));
std::vector<bucket_t::iterator> nodes;
int last_prefix = -1;
nodes.reserve(b.size());
for (j = b.begin(); j != b.end(); ++j)
{
node_id id = j->id;
id <<= bucket_index + 1;
int this_prefix = id[0] & mask;
if (this_prefix != last_prefix)
{
last_prefix = this_prefix;
continue;
}
if (nodes.empty() || nodes.back() != j-1)
nodes.push_back(j-1);
nodes.push_back(j);
}
if (!nodes.empty())
{
// from these nodes, pick the one with the highest RTT
// and replace it
std::vector<bucket_t::iterator>::iterator k = std::max_element(nodes.begin(), nodes.end()
, boost::bind(&node_entry::rtt, boost::bind(&bucket_t::iterator::operator*, _1))
< boost::bind(&node_entry::rtt, boost::bind(&bucket_t::iterator::operator*, _2)));
// in this case, we would really rather replace the node even if
// they have the same RTT. so, to emulate >=
e.rtt = (std::max)(0, e.rtt - 1);
j = *k;
}
else
{
j = std::max_element(b.begin(), b.end()
, boost::bind(&node_entry::rtt, _1)
< boost::bind(&node_entry::rtt, _2));
}
}
if (j != b.end() && j->rtt > e.rtt)
{
@ -597,6 +684,8 @@ bool routing_table::add_node(node_entry e)
// TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr();
return ret;
}
// in order to keep lookup times small, prefer nodes with low RTTs
}
// if we can't split, try to insert into the replacement bucket

View File

@ -597,12 +597,12 @@ setup_transfer(session* ses1, session* ses2, session* ses3
if (connect_peers)
{
std::auto_ptr<alert> a;
do
/* do
{
a = wait_for_alert(*ses2, state_changed_alert::alert_type, "ses2");
} while (static_cast<state_changed_alert*>(a.get())->state != torrent_status::downloading);
wait_for_alert(*ses1, torrent_finished_alert::alert_type, "ses1");
*/
// wait_for_alert(*ses1, torrent_finished_alert::alert_type, "ses1");
error_code ec;
if (use_ssl_ports)