merged changes from RC_1_0

This commit is contained in:
Arvid Norberg 2014-11-02 09:41:29 +00:00
parent 34af25beaa
commit 3742fd2699
9 changed files with 144 additions and 113 deletions

View File

@ -1747,9 +1747,16 @@ int main(int argc, char* argv[])
for (std::vector<dht_routing_bucket>::iterator i = sess_stat.dht_routing_table.begin()
, end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket)
{
char const* progress_bar =
"################################"
"################################"
"################################"
"################################";
snprintf(str, sizeof(str)
, "%3d [%3d, %d]\n"
, bucket, i->num_nodes, i->num_replacements);
, "%3d [%3d, %d] %s%s\n"
, bucket, i->num_nodes, i->num_replacements
, progress_bar + (128 - i->num_nodes)
, "--------" + (8 - i->num_replacements));
out += str;
}

View File

@ -223,7 +223,7 @@ public:
node_id const& nid() const { return m_id; }
boost::tuple<int, int> size() const{ return m_table.size(); }
boost::tuple<int, int> size() const { return m_table.size(); }
size_type num_global_nodes() const
{ return m_table.num_global_nodes(); }
@ -284,6 +284,8 @@ public:
protected:
void send_single_refresh(udp::endpoint const& ep, int bucket
, node_id const& id = node_id());
void lookup_peers(sha1_hash const& info_hash, entry& reply
, bool noseed, bool scrape) const;
bool lookup_torrents(sha1_hash const& target, entry& reply

View File

@ -52,7 +52,8 @@ struct node_entry
, bool pinged = false)
: last_queried(pinged ? time_now() : min_time())
, id(id_)
, endpoint(ep)
, a(ep.address().to_v4().to_bytes())
, p(ep.port())
, rtt(roundtriptime & 0xffff)
, timeout_count(pinged ? 0 : 0xff)
{
@ -64,7 +65,8 @@ struct node_entry
node_entry(udp::endpoint ep)
: last_queried(min_time())
, id(0)
, endpoint(ep)
, a(ep.address().to_v4().to_bytes())
, p(ep.port())
, rtt(0xffff)
, timeout_count(0xff)
{
@ -76,6 +78,7 @@ struct node_entry
node_entry()
: last_queried(min_time())
, id(0)
, p(0)
, rtt(0xffff)
, timeout_count(0xff)
{
@ -89,7 +92,7 @@ struct node_entry
void timed_out() { if (pinged() && timeout_count < 0xfe) ++timeout_count; }
int fail_count() const { return pinged() ? timeout_count : 0; }
void reset_fail_count() { if (pinged()) timeout_count = 0; }
udp::endpoint ep() const { return udp::endpoint(endpoint); }
udp::endpoint ep() const { return udp::endpoint(address_v4(a), p); }
bool confirmed() const { return timeout_count == 0; }
void update_rtt(int new_rtt)
{
@ -99,8 +102,8 @@ struct node_entry
if (rtt == 0xffff) rtt = new_rtt;
else rtt = int(rtt) * 2 / 3 + int(new_rtt) / 3;
}
address addr() const { return endpoint.address(); }
int port() const { return endpoint.port; }
address addr() const { return address_v4(a); }
int port() const { return p; }
#ifdef TORRENT_DHT_VERBOSE_LOGGING
ptime first_seen;
@ -111,7 +114,8 @@ struct node_entry
node_id id;
union_endpoint endpoint;
address_v4::bytes_type a;
boost::uint16_t p;
// the average RTT of this node
boost::uint16_t rtt;

View File

@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/utility.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <set>
#include <libtorrent/kademlia/logging.hpp>
@ -82,7 +83,7 @@ struct routing_table_node
// bucket has failed, then it is put in the replacement
// cache (just like in the paper).
class TORRENT_EXTRA_EXPORT routing_table
class TORRENT_EXTRA_EXPORT routing_table : boost::noncopyable
{
public:
typedef std::vector<routing_table_node> table_t;
@ -116,7 +117,7 @@ public:
// the node will be ignored.
void heard_about(node_id const& id, udp::endpoint const& ep);
node_entry const* next_refresh(node_id& target);
node_entry const* next_refresh();
enum
{

View File

@ -350,23 +350,7 @@ void node_impl::add_node(udp::endpoint node)
{
// ping the node, and if we get a reply, it
// will be added to the routing table
void* ptr = m_rpc.allocate_observer();
if (ptr == 0) return;
// create a dummy traversal_algorithm
// this is unfortunately necessary for the observer
// to free itself from the pool when it's being released
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(*this, (node_id::min)()));
observer_ptr o(new (ptr) null_observer(algo, node, node_id(0)));
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
entry e;
e["y"] = "q";
e["q"] = "ping";
m_counters.inc_stats_counter(counters::dht_get_peers_out);
m_rpc.invoke(e, node, o);
send_single_refresh(node, m_table.num_active_buckets());
}
void node_impl::announce(sha1_hash const& info_hash, int listen_port, int flags
@ -477,28 +461,53 @@ void node_impl::tick()
return;
}
node_id target;
node_entry const* ne = m_table.next_refresh(target);
node_entry const* ne = m_table.next_refresh();
if (ne == NULL) return;
int bucket = 159 - distance_exp(m_id, ne->id);
send_single_refresh(ne->ep(), bucket, ne->id);
}
void node_impl::send_single_refresh(udp::endpoint const& ep, int bucket
, node_id const& id)
{
void* ptr = m_rpc.allocate_observer();
if (ptr == 0) return;
// generate a random node_id within the given bucket
// TODO: 2 it would be nice to have a bias towards node-id prefixes that
// are missing in the bucket
node_id target = generate_random_id();
node_id mask = generate_prefix_mask(bucket + 1);
// target = (target & ~mask) | (root & mask)
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
// create a dummy traversal_algorithm
// this is unfortunately necessary for the observer
// to free itself from the pool when it's being released
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(*this, (node_id::min)()));
observer_ptr o(new (ptr) ping_observer(algo, ne->ep(), ne->id));
observer_ptr o(new (ptr) ping_observer(algo, ep, id));
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
entry e;
e["y"] = "q";
e["q"] = "find_node";
entry& a = e["a"];
a["target"] = target.to_string();
m_rpc.invoke(e, ne->ep(), o);
// use get_peers instead of find_node. We'll get nodes in the response
// either way.
e["q"] = "get_peers";
a["info_hash"] = target.to_string();
m_counters.inc_stats_counter(counters::dht_get_peers_out);
// e["q"] = "find_node";
// a["target"] = target.to_string();
m_rpc.invoke(e, ep, o);
}
time_duration node_impl::connection_timeout()

View File

@ -71,9 +71,13 @@ bool refresh::invoke(observer_ptr o)
{
entry e;
e["y"] = "q";
e["q"] = "find_node";
entry& a = e["a"];
a["target"] = target().to_string();
e["q"] = "get_peers";
a["info_hash"] = target().to_string();
// e["q"] = "find_node";
// a["target"] = target().to_string();
m_node.stats_counters().inc_stats_counter(counters::dht_find_node_out);
return m_node.m_rpc.invoke(e, o->target_ep(), o);
}

View File

@ -178,41 +178,24 @@ void routing_table::print_state(std::ostream& os) const
<< "global node count: " << num_global_nodes() << "\n"
<< "node_id: " << m_id << "\n\n";
os << "number of nodes per bucket:\n-- live ";
for (int i = 8; i < 160; ++i)
os << "-";
os << "\n";
os << "number of nodes per bucket:\n";
int max_size = bucket_limit(0);
for (int k = 0; k < max_size; ++k)
int idx = 0;
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i, ++idx)
{
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
os << (int(i->live_nodes.size()) > (max_size - 1 - k) ? "|" : " ");
}
os << std::setw(2) << idx << ": ";
for (int k = 0; k < int(i->live_nodes.size()); ++k)
os << "#";
for (int k = 0; k < int(i->replacements.size()); ++k)
os << "-";
os << "\n";
}
for (int i = 0; i < 160; ++i) os << "+";
os << "\n";
for (int k = 0; k < m_bucket_size; ++k)
{
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
os << (int(i->replacements.size()) > k ? "|" : " ");
}
os << "\n";
}
os << "-- cached ";
for (int i = 10; i < 160; ++i)
os << "-";
os << "\n\n";
ptime now = time_now();
os << "nodes:\n";
os << "\nnodes:";
int bucket_index = 0;
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i, ++bucket_index)
@ -267,7 +250,7 @@ void routing_table::print_state(std::ostream& os) const
}
}
os << "node spread per bucket:\n";
os << "\nnode spread per bucket:\n";
bucket_index = 0;
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i, ++bucket_index)
@ -311,7 +294,9 @@ void routing_table::print_state(std::ostream& os) const
sub_buckets[b] = true;
}
os << bucket_index << " mask:" << (top_mask >> mask_shift) << ": [";
os << std::dec << std::setw(2) << bucket_index << " mask: " << std::setw(2)
<< std::hex << (top_mask >> mask_shift) << ": [";
for (int i = 0; i < bucket_size_limit; ++i) os << (sub_buckets[i] ? "X" : " ");
os << "]\n";
}
@ -319,7 +304,7 @@ void routing_table::print_state(std::ostream& os) const
#endif
node_entry const* routing_table::next_refresh(node_id& target)
node_entry const* routing_table::next_refresh()
{
// find the node with the least recent 'last_queried' field. if it's too
// recent, return false. Otherwise return a random target ID that's close to
@ -355,23 +340,8 @@ out:
// make sure we don't pick the same node again next time we want to refresh
// the routing table
if (candidate)
{
candidate->last_queried = time_now();
// generate a random node_id within the given bucket
// TODO: 2 it would be nice to have a bias towards node-id prefixes that
// are missing in the bucket
target = generate_random_id();
int num_bits = bucket_idx + 1;
node_id mask = generate_prefix_mask(num_bits);
// target = (target & ~mask) | (root & mask)
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
}
return candidate;
}
@ -454,8 +424,8 @@ void routing_table::remove_node(node_entry* n
&& n < &bucket->replacements[0] + bucket->replacements.size())
{
int idx = n - &bucket->replacements[0];
TORRENT_ASSERT(m_ips.count(n->endpoint.address().to_v4().to_bytes()) > 0);
erase_one(m_ips, n->endpoint.address().to_v4().to_bytes());
TORRENT_ASSERT(m_ips.count(n->a) > 0);
erase_one(m_ips, n->a);
bucket->replacements.erase(bucket->replacements.begin() + idx);
}
@ -464,8 +434,8 @@ void routing_table::remove_node(node_entry* n
&& n < &bucket->live_nodes[0] + bucket->live_nodes.size())
{
int idx = n - &bucket->live_nodes[0];
TORRENT_ASSERT(m_ips.count(n->endpoint.address().to_v4().to_bytes()) > 0);
erase_one(m_ips, n->endpoint.address().to_v4().to_bytes());
TORRENT_ASSERT(m_ips.count(n->a) > 0);
erase_one(m_ips, n->a);
bucket->live_nodes.erase(bucket->live_nodes.begin() + idx);
}
}
@ -623,15 +593,13 @@ bool routing_table::add_node(node_entry e)
// bit prefix has higher RTT than the new node, replace it.
// can we split the bucket?
bool can_split = false;
// only nodes that haven't failed can split the bucket, and we can only
// split the last bucket
bool can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159)
&& e.fail_count() == 0;
if (e.pinged() && e.fail_count() == 0)
{
// only nodes that are pinged and haven't failed
// can split the bucket, and we can only split
// the last bucket
can_split = (boost::next(i) == m_buckets.end() && m_buckets.size() < 159);
// if the node we're trying to insert is considered pinged,
// we may replace other nodes that aren't pinged
@ -929,21 +897,40 @@ void routing_table::for_each_node(
}
}
void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
{
INVARIANT_CHECK;
// if messages to ourself fails, ignore it
if (id == m_id) return;
if (nid == m_id) return;
table_t::iterator i = find_bucket(id);
table_t::iterator i = find_bucket(nid);
bucket_t& b = i->live_nodes;
bucket_t& rb = i->replacements;
bucket_t::iterator j = std::find_if(b.begin(), b.end()
, boost::bind(&node_entry::id, _1) == id);
, boost::bind(&node_entry::id, _1) == nid);
if (j == b.end()) return;
if (j == b.end())
{
j = std::find_if(rb.begin(), rb.end()
, boost::bind(&node_entry::id, _1) == nid);
if (j == rb.end()
|| j->ep() != ep) return;
j->timed_out();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << " NODE FAILED"
" id: " << nid <<
" ip: " << j->ep() <<
" fails: " << j->fail_count() <<
" pinged: " << j->pinged() <<
" up-time: " << total_seconds(time_now() - j->first_seen);
#endif
return;
}
// if the endpoint doesn't match, it's a different node
// claiming the same ID. The node we have in our routing
@ -956,7 +943,7 @@ void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << " NODE FAILED"
" id: " << id <<
" id: " << nid <<
" ip: " << j->ep() <<
" fails: " << j->fail_count() <<
" pinged: " << j->pinged() <<
@ -973,7 +960,7 @@ void routing_table::node_failed(node_id const& id, udp::endpoint const& ep)
return;
}
erase_one(m_ips, j->addr().to_v4().to_bytes());
erase_one(m_ips, j->a);
b.erase(j);
// sort by RTT first, to find the node with the lowest

View File

@ -245,6 +245,10 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING node returned a list which included a node with id 0";
}
#endif
// let the routing table know this node may exist
m_node.m_table.heard_about(id, addr);
add_entry(id, addr, 0);
}
@ -282,6 +286,11 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
{
TORRENT_ASSERT(m_invoke_count >= 0);
// don't tell the routing table about
// node ids that we just generated ourself
if ((o->flags & observer::flag_no_id) == 0)
m_node.m_table.node_failed(o->id(), o->target_ep());
if (m_results.empty()) return;
TORRENT_ASSERT(o->flags & observer::flag_queried);
@ -325,10 +334,7 @@ void traversal_algorithm::failed(observer_ptr o, int flags)
<< " type: " << name()
;
#endif
// don't tell the routing table about
// node ids that we just generated ourself
if ((o->flags & observer::flag_no_id) == 0)
m_node.m_table.node_failed(o->id(), o->target_ep());
++m_timeouts;
--m_invoke_count;
TORRENT_ASSERT(m_invoke_count >= 0);

View File

@ -308,7 +308,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps
{ "y", lazy_entry::string_t, 1, 0},
};
lazy_entry const* parsed[5];
lazy_entry const* parsed[6];
char error_string[200];
// fprintf(stderr, "msg: %s\n", print_entry(response).c_str());
@ -1375,7 +1375,8 @@ int test_main()
{"q", lazy_entry::string_t, 9, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"target", lazy_entry::string_t, 20, key_desc_t::last_child},
{"target", lazy_entry::string_t, 20, key_desc_t::optional},
{"info_hash", lazy_entry::string_t, 20, key_desc_t::optional | key_desc_t::last_child},
};
dht::key_desc_t get_peers_desc[] = {
@ -1398,6 +1399,7 @@ int test_main()
// bootstrap
g_sent_packets.clear();
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);
@ -1412,12 +1414,16 @@ int test_main()
TEST_EQUAL(g_sent_packets.front().first, initial_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string));
ret = verify_message(&response, find_node_desc, parsed, 7, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "find_node");
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break;
TEST_CHECK(parsed[2]->string_value() == "find_node"
|| parsed[2]->string_value() == "get_peers");
if (parsed[0]->string_value() != "q"
|| (parsed[2]->string_value() != "find_node"
&& parsed[2]->string_value() != "get_peers")) break;
}
else
{
@ -1437,12 +1443,14 @@ int test_main()
TEST_EQUAL(g_sent_packets.front().first, found_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string));
ret = verify_message(&response, find_node_desc, parsed, 7, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "find_node");
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break;
TEST_CHECK(parsed[2]->string_value() == "find_node"
|| parsed[2]->string_value() == "get_peers");
if (parsed[0]->string_value() != "q" || (parsed[2]->string_value() != "find_node"
&& parsed[2]->string_value() != "get_peers")) break;
}
else
{
@ -1460,6 +1468,7 @@ int test_main()
// get_peers
g_sent_packets.clear();
do
{
dht::node_id target = to_hash("1234876923549721020394873245098347598635");
@ -1551,6 +1560,7 @@ int test_main()
// immutable get
g_sent_packets.clear();
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);
@ -1595,6 +1605,7 @@ int test_main()
// mutable get
g_sent_packets.clear();
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);
@ -1669,7 +1680,7 @@ int test_main()
};
// immutable put
g_sent_packets.clear();
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);
@ -1749,7 +1760,7 @@ int test_main()
} while (false);
// mutable put
g_sent_packets.clear();
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0, cnt);