merged changes from RC_1_0

Arvid Norberg 2014-11-01 22:47:56 +00:00
parent 8bd8426f04
commit 34af25beaa
11 changed files with 222 additions and 129 deletions

View File

@@ -35,6 +35,8 @@
1.0.3 release
* improve DHT maintenance performance (by pinging instead of full lookups)
* fix bug in DHT routing table node-id prefix optimization
* fix incorrect behavior of flag_use_resume_save_path
* fix protocol race-condition in super seeding mode
* support read-only DHT nodes
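
The read-only node support referenced in the last entry is specified in BEP 43: a node that cannot accept unsolicited requests marks every outgoing query with a top-level "ro" key set to 1, and nodes receiving such a query are expected not to add the sender to their routing tables. A minimal sketch of such a query built with libtorrent's entry type (make_readonly_ping is an illustrative helper, not a library function):

	#include "libtorrent/entry.hpp"
	#include <string>

	// sketch: a KRPC ping query marked read-only per BEP 43. the top-level
	// "ro" key is the only thing a read-only node adds to its queries.
	libtorrent::entry make_readonly_ping(std::string const& our_id)
	{
		libtorrent::entry e;
		e["y"] = "q";           // message type: query
		e["q"] = "ping";        // query name
		e["a"]["id"] = our_id;  // 20-byte sender node ID
		e["ro"] = 1;            // read-only: do not add me to routing tables
		return e;
	}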

View File

@@ -1748,8 +1748,8 @@ int main(int argc, char* argv[])
, end(sess_stat.dht_routing_table.end()); i != end; ++i, ++bucket)
{
snprintf(str, sizeof(str)
, "%3d [%2d, %2d] active: %d\n"
, bucket, i->num_nodes, i->num_replacements, i->last_active);
, "%3d [%3d, %d]\n"
, bucket, i->num_nodes, i->num_replacements);
out += str;
}

View File

@@ -204,7 +204,6 @@ public:
virtual ~node_impl() {}
void tick();
void refresh(node_id const& id, find_data::nodes_callback const& f);
void bootstrap(std::vector<udp::endpoint> const& nodes
, find_data::nodes_callback const& f);
void add_router_node(udp::endpoint router);
@@ -317,6 +316,10 @@ private:
ptime m_last_tracker_tick;
// the last time we issued a bootstrap or a refresh on our own ID, to expand
// the routing table buckets close to us.
ptime m_last_self_refresh;
// secret random numbers used to create write tokens
int m_secret[2];
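
m_secret holding two values follows the usual rotating-secret scheme for write tokens: tokens are issued under the current secret but stay valid under the previous one for one rotation interval, as BEP 5 suggests. A hedged sketch of the pattern (token_issuer and the std::hash stand-in are illustrative; libtorrent's generate_token() uses its own hashing):

	#include <cstdint>
	#include <functional>
	#include <string>

	// sketch: rotating-secret write tokens. a token issued under the
	// previous secret is still accepted until the next rotation.
	struct token_issuer
	{
		std::uint32_t secret[2]; // [0] = current, [1] = previous

		void rotate(std::uint32_t fresh)
		{
			secret[1] = secret[0];
			secret[0] = fresh;
		}

		// std::hash stands in for the real hash here (BEP 5 suggests
		// SHA-1 over the requester's IP plus the secret)
		std::size_t make_token(std::string const& ip) const
		{ return std::hash<std::string>()(ip + std::to_string(secret[0])); }

		bool verify(std::string const& ip, std::size_t token) const
		{
			return token == std::hash<std::string>()(ip + std::to_string(secret[0]))
				|| token == std::hash<std::string>()(ip + std::to_string(secret[1]));
		}
	};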

View File

@@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/union_endpoint.hpp"
#include "libtorrent/time.hpp" // for time_now()
#ifdef TORRENT_DHT_VERBOSE_LOGGING
#include "libtorrent/time.hpp"
@@ -47,8 +48,10 @@ namespace libtorrent { namespace dht
struct node_entry
{
node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff, bool pinged = false)
: id(id_)
node_entry(node_id const& id_, udp::endpoint ep, int roundtriptime = 0xffff
, bool pinged = false)
: last_queried(pinged ? time_now() : min_time())
, id(id_)
, endpoint(ep)
, rtt(roundtriptime & 0xffff)
, timeout_count(pinged ? 0 : 0xff)
@@ -59,7 +62,8 @@ struct node_entry
}
node_entry(udp::endpoint ep)
: id(0)
: last_queried(min_time())
, id(0)
, endpoint(ep)
, rtt(0xffff)
, timeout_count(0xff)
@@ -70,7 +74,8 @@ struct node_entry
}
node_entry()
: id(0)
: last_queried(min_time())
, id(0)
, rtt(0xffff)
, timeout_count(0xff)
{
@@ -88,8 +93,11 @@ struct node_entry
bool confirmed() const { return timeout_count == 0; }
void update_rtt(int new_rtt)
{
TORRENT_ASSERT(new_rtt <= 0xffff);
TORRENT_ASSERT(new_rtt >= 0);
if (new_rtt == 0xffff) return;
if (rtt == 0xffff) rtt = new_rtt;
else rtt = int(rtt) / 3 + int(new_rtt) * 2 / 3;
else rtt = int(rtt) * 2 / 3 + int(new_rtt) / 3;
}
address addr() const { return endpoint.address(); }
int port() const { return endpoint.port; }
@@ -98,6 +106,9 @@ struct node_entry
ptime first_seen;
#endif
// the time we last received a response for a request to this peer
ptime last_queried;
node_id id;
union_endpoint endpoint;
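
The reweighted update_rtt() above changes the exponential moving average so the running estimate keeps two thirds of the weight instead of one third, making the estimate less sensitive to a single slow response. A standalone sketch of the same smoothing (smooth_rtt is an illustrative name, not libtorrent API):

	#include <cassert>

	// sketch of the smoothing in node_entry::update_rtt(): the running
	// estimate keeps 2/3 of the weight, the new sample contributes 1/3.
	// 0xffff is the sentinel for "no measurement".
	int smooth_rtt(int rtt, int new_rtt)
	{
		assert(new_rtt >= 0 && new_rtt <= 0xffff);
		if (new_rtt == 0xffff) return rtt; // invalid sample, keep the estimate
		if (rtt == 0xffff) return new_rtt; // first sample seeds the average
		return rtt * 2 / 3 + new_rtt / 3;
	}

For example, an estimate of 90 ms and a new sample of 30 ms now yield 60 + 10 = 70 ms, where the previous weighting yielded 30 + 20 = 50 ms.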

View File

@@ -70,7 +70,6 @@ struct routing_table_node
{
bucket_t replacements;
bucket_t live_nodes;
ptime last_active;
};
// differences in the implementation from the description in
@@ -117,15 +116,14 @@ public:
// the node will be ignored.
void heard_about(node_id const& id, udp::endpoint const& ep);
// if any bucket in the routing table needs to be refreshed
// this function will return true and set the target to an
// appropriate target inside that bucket
bool need_refresh(node_id& target) const;
node_entry const* next_refresh(node_id& target);
enum
{
// nodes that have not been pinged are considered failed by this flag
include_failed = 1
};
// fills the vector with the count nodes from our buckets that
// are nearest to the given id.
void find_node(node_id const& id, std::vector<node_entry>& l
@@ -169,8 +167,6 @@ public:
void print_state(std::ostream& os) const;
#endif
void touch_bucket(node_id const& target);
int bucket_limit(int bucket) const;
#if TORRENT_USE_INVARIANT_CHECKS

View File

@@ -84,6 +84,8 @@ struct traversal_algorithm : boost::noncopyable
int invoke_count() const { return m_invoke_count; }
int branch_factor() const { return m_branch_factor; }
node_impl& node() const { return m_node; }
protected:
// returns true if we're done

View File

@@ -93,8 +93,10 @@ namespace libtorrent
int num_nodes;
int num_replacements;
#ifndef TORRENT_NO_DEPRECATE
// number of seconds since last activity. deprecated, always set to 0
int last_active;
#endif
};
// holds counters and gauges for the uTP sockets

View File

@@ -107,6 +107,7 @@ node_impl::node_impl(alert_dispatcher* alert_disp
, m_rpc(m_id, m_table, sock)
, m_observer(observer)
, m_last_tracker_tick(time_now())
, m_last_self_refresh(min_time())
, m_post_alert(alert_disp)
, m_sock(sock)
, m_counters(cnt)
@@ -174,17 +175,11 @@ std::string node_impl::generate_token(udp::endpoint const& addr, char const* inf
return token;
}
void node_impl::refresh(node_id const& id
, find_data::nodes_callback const& f)
{
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, id, f));
r->start();
}
void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
, find_data::nodes_callback const& f)
{
boost::intrusive_ptr<dht::refresh> r(new dht::bootstrap(*this, m_id, f));
boost::intrusive_ptr<dht::bootstrap> r(new dht::bootstrap(*this, m_id, f));
m_last_self_refresh = time_now();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
int count = 0;
@@ -272,8 +267,7 @@ void node_impl::incoming(msg const& m)
case 'r':
{
node_id id;
if (m_rpc.incoming(m, &id, m_settings))
refresh(id, boost::bind(&nop));
m_rpc.incoming(m, &id, m_settings);
break;
}
case 'q':
@@ -426,12 +420,85 @@ void node_impl::get_item(char const* pk, std::string const& salt
ta->start();
}
struct ping_observer : observer
{
ping_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, udp::endpoint const& ep, node_id const& id)
: observer(algorithm, ep, id)
{}
// parses out "nodes"
virtual void reply(msg const& m)
{
flags |= flag_done;
lazy_entry const* r = m.message.dict_find_dict("r");
if (!r)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "[" << m_algorithm.get()
<< "] missing response dict";
#endif
return;
}
// look for nodes
lazy_entry const* n = r->dict_find_string("nodes");
if (n)
{
char const* nodes = n->string_ptr();
char const* end = nodes + n->string_length();
while (end - nodes >= 26)
{
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
m_algorithm.get()->node().m_table.heard_about(id
, detail::read_v4_endpoint<udp::endpoint>(nodes));
}
}
}
};
void node_impl::tick()
{
// every now and then we refresh our own ID, just to keep
// expanding the routing table buckets closer to us.
ptime now = time_now();
if (m_last_self_refresh + minutes(10) < now)
{
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id
, boost::bind(&nop)));
r->start();
m_last_self_refresh = now;
return;
}
node_id target;
if (m_table.need_refresh(target))
refresh(target, boost::bind(&nop));
node_entry const* ne = m_table.next_refresh(target);
if (ne == NULL) return;
void* ptr = m_rpc.allocate_observer();
if (ptr == 0) return;
// create a dummy traversal_algorithm
// this is unfortunately necessary for the observer
// to free itself from the pool when it's being released
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(*this, (node_id::min)()));
observer_ptr o(new (ptr) ping_observer(algo, ne->ep(), ne->id));
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
entry e;
e["y"] = "q";
e["q"] = "find_node";
entry& a = e["a"];
a["target"] = target.to_string();
m_rpc.invoke(e, ne->ep(), o);
}
time_duration node_impl::connection_timeout()
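
The "nodes" string that ping_observer::reply() walks above is compact node info as defined in BEP 5: 26 bytes per entry, a 20-byte node ID followed by a 4-byte IPv4 address and a 2-byte port, both big-endian. A self-contained sketch of that decoding (compact_node and parse_nodes are illustrative, not libtorrent types):

	#include <cstdint>
	#include <string>
	#include <vector>

	struct compact_node
	{
		std::string id;     // 20-byte node ID
		std::uint32_t addr; // IPv4 address, host byte order after decoding
		std::uint16_t port;
	};

	// walks 26-byte records, same layout as ping_observer::reply()
	std::vector<compact_node> parse_nodes(char const* p, int len)
	{
		std::vector<compact_node> out;
		char const* end = p + len;
		while (end - p >= 26)
		{
			compact_node n;
			n.id.assign(p, 20); p += 20;
			n.addr = std::uint32_t(std::uint8_t(p[0])) << 24
				| std::uint32_t(std::uint8_t(p[1])) << 16
				| std::uint32_t(std::uint8_t(p[2])) << 8
				| std::uint32_t(std::uint8_t(p[3]));
			p += 4;
			n.port = std::uint16_t(std::uint8_t(p[0]) << 8 | std::uint8_t(p[1]));
			p += 2;
			out.push_back(n);
		}
		return out;
	}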

View File

@@ -100,18 +100,15 @@ void routing_table::status(session_status& s) const
boost::tie(s.dht_nodes, s.dht_node_cache) = size();
s.dht_global_nodes = num_global_nodes();
ptime now = time_now();
for (table_t::const_iterator i = m_buckets.begin()
, end(m_buckets.end()); i != end; ++i)
{
dht_routing_bucket b;
b.num_nodes = i->live_nodes.size();
b.num_replacements = i->replacements.size();
if (i->last_active.time_since_epoch().count() < 0)
b.last_active = INT_MAX;
else
b.last_active = int(total_seconds(now - i->last_active));
#ifndef TORRENT_NO_DEPRECATE
b.last_active = 0;
#endif
s.dht_routing_table.push_back(b);
}
}
@@ -213,21 +210,25 @@ void routing_table::print_state(std::ostream& os) const
os << "-";
os << "\n\n";
ptime now = time_now();
os << "nodes:\n";
int bucket_index = 0;
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i, ++bucket_index)
{
// if (i->live_nodes.empty()) continue;
os << "=== BUCKET == " << bucket_index
<< " == " << i->live_nodes.size() << "|" << i->replacements.size();
os << "\n=== BUCKET == " << bucket_index
<< " == " << i->live_nodes.size() << "|" << i->replacements.size()
<< " ===== \n";
if (i->last_active < min_time() + seconds(161))
os << " == -";
int id_shift;
// the last bucket is special, since it hasn't been split yet, it
// includes that top bit as well
if (bucket_index + 1 == m_buckets.size())
id_shift = bucket_index;
else
os << " == " << total_seconds(time_now() - i->last_active);
id_shift = bucket_index + 1;
os << " seconds ago ===== \n";
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
{
@@ -243,14 +244,25 @@ void routing_table::print_state(std::ostream& os) const
top_mask = (0xff << mask_shift) & 0xff;
node_id id = j->id;
id <<= bucket_index + 1;
os << " prefix: " << ((id[0] & top_mask) >> mask_shift)
<< " id: " << j->id
<< " rtt: " << j->rtt
<< " ip: " << j->ep()
<< " fails: " << j->fail_count()
<< " pinged: " << j->pinged()
<< " dist: " << distance_exp(m_id, j->id)
id <<= id_shift;
os << " prefix: " << std::setw(2) << std::hex << ((id[0] & top_mask) >> mask_shift) << std::dec
<< " id: " << j->id;
if (j->rtt == 0xffff)
os << " rtt: ";
else
os << " rtt: " << std::setw(4) << j->rtt;
os << " fail: " << j->fail_count()
<< " ping: " << j->pinged()
<< " dist: " << std::setw(3) << distance_exp(m_id, j->id);
if (j->last_queried == min_time())
os << " query: ";
else
os << " query: " << std::setw(3) << total_seconds(now - j->last_queried);
os << " ip: " << j->ep()
<< "\n";
}
}
@@ -280,12 +292,20 @@ void routing_table::print_state(std::ostream& os) const
TORRENT_ASSERT_VAL(bucket_size_limit <= 256, bucket_size_limit);
bool sub_buckets[256];
memset(sub_buckets, 0, sizeof(sub_buckets));
int id_shift;
// the last bucket is special, since it hasn't been split yet, it
// includes that top bit as well
if (bucket_index + 1 == m_buckets.size())
id_shift = bucket_index;
else
id_shift = bucket_index + 1;
for (bucket_t::const_iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
{
node_id id = j->id;
id <<= bucket_index + 1;
id <<= id_shift;
int b = (id[0] & top_mask) >> mask_shift;
TORRENT_ASSERT(b >= 0 && b < int(sizeof(sub_buckets)/sizeof(sub_buckets[0])));
sub_buckets[b] = true;
@@ -299,72 +319,60 @@ void routing_table::print_state(std::ostream& os) const
#endif
void routing_table::touch_bucket(node_id const& target)
node_entry const* routing_table::next_refresh(node_id& target)
{
table_t::iterator i = find_bucket(target);
i->last_active = time_now();
}
// find the node with the least recent 'last_queried' field, set `target`
// to a random ID inside that node's bucket and return a pointer to the
// node. returns NULL if the routing table is empty
// returns true if lhs is in more need of a refresh than rhs
bool compare_bucket_refresh(routing_table_node const& lhs, routing_table_node const& rhs)
{
// add the number of nodes to prioritize buckets with few nodes in them
return lhs.last_active + seconds(lhs.live_nodes.size() * 5)
< rhs.last_active + seconds(rhs.live_nodes.size() * 5);
}
node_entry* candidate = NULL;
int bucket_idx = -1;
// TODO: instead of refreshing a bucket by using find_nodes,
// ping each node periodically
bool routing_table::need_refresh(node_id& target) const
{
INVARIANT_CHECK;
ptime now = time_now();
// refresh our own bucket once every 15 minutes
if (now - minutes(15) > m_last_self_refresh)
// this will have a bias towards pinging nodes close to us first.
int idx = m_buckets.size() - 1;
for (table_t::reverse_iterator i = m_buckets.rbegin()
, end(m_buckets.rend()); i != end; ++i, --idx)
{
m_last_self_refresh = now;
target = m_id;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "need_refresh [ bucket: self target: " << target << " ]";
#endif
return true;
for (bucket_t::iterator j = i->live_nodes.begin()
, end(i->live_nodes.end()); j != end; ++j)
{
if (j->last_queried == min_time())
{
bucket_idx = idx;
candidate = &*j;
goto out;
}
if (candidate == NULL || j->last_queried < candidate->last_queried)
{
candidate = &*j;
bucket_idx = idx;
}
}
}
out:
// make sure we don't pick the same node again next time we want to refresh
// the routing table
if (candidate)
{
candidate->last_queried = time_now();
// generate a random node_id within the given bucket
// TODO: 2 it would be nice to have a bias towards node-id prefixes that
// are missing in the bucket
target = generate_random_id();
int num_bits = bucket_idx + 1;
node_id mask = generate_prefix_mask(num_bits);
// target = (target & ~mask) | (root & mask)
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
}
if (m_buckets.empty()) return false;
table_t::const_iterator i = std::min_element(m_buckets.begin(), m_buckets.end()
, &compare_bucket_refresh);
if (now - minutes(15) < i->last_active) return false;
if (now - seconds(45) < m_last_refresh) return false;
// generate a random node_id within the given bucket
target = generate_random_id();
int num_bits = std::distance(m_buckets.begin(), i) + 1;
node_id mask = generate_prefix_mask(num_bits);
// target = (target & ~mask) | (root & mask)
node_id root = m_id;
root &= mask;
target &= ~mask;
target |= root;
// make sure this is in another subtree than m_id
// clear the (num_bits - 1) bit and then set it to the
// inverse of m_id's corresponding bit.
target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8));
target[(num_bits - 1) / 8] |=
(~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
TORRENT_ASSERT(distance_exp(m_id, target) == 160 - num_bits);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "need_refresh [ bucket: " << num_bits << " target: " << target << " ]";
#endif
m_last_refresh = now;
return true;
return candidate;
}
void routing_table::replacement_cache(bucket_t& nodes) const
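
The target that next_refresh() hands back is built by pinning the top num_bits bits to our own ID (via generate_prefix_mask()) and leaving the remaining 160 - num_bits bits random. The same construction written out bit by bit, as a sketch (make_refresh_target and rand_byte are illustrative stand-ins for the real helpers):

	#include <array>
	#include <cstdint>

	using node_id = std::array<std::uint8_t, 20>;

	// sketch of the target construction in next_refresh(): the top
	// num_bits bits come from our own ID, the rest are random. num_bits is
	// bucket_idx + 1, i.e. deeper buckets pin more of the prefix.
	node_id make_refresh_target(node_id const& self, int num_bits
		, std::uint8_t (*rand_byte)())
	{
		node_id target;
		for (int i = 0; i < 20; ++i) target[i] = rand_byte();
		for (int i = 0; i < num_bits; ++i)
		{
			std::uint8_t const bit = std::uint8_t(0x80 >> (i % 8));
			target[i / 8] = std::uint8_t((target[i / 8] & ~bit) | (self[i / 8] & bit));
		}
		return target;
	}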
@@ -385,8 +393,6 @@ routing_table::table_t::iterator routing_table::find_bucket(node_id const& id)
if (num_buckets == 0)
{
m_buckets.push_back(routing_table_node());
// add 160 seconds to prioritize higher buckets (i.e. buckets closer to us)
m_buckets.back().last_active = min_time() + seconds(160);
++num_buckets;
}
@@ -509,6 +515,7 @@ bool routing_table::add_node(node_entry e)
// and be done with it
existing->timeout_count = 0;
existing->update_rtt(e.rtt);
existing->last_queried = e.last_queried;
return ret;
}
else if (existing)
@@ -680,7 +687,12 @@ bool routing_table::add_node(node_entry e)
mask = (0xff << mask_shift) & 0xff;
node_id id = e.id;
id <<= bucket_index + 1;
// the last bucket is special, since it hasn't been split yet, it
// includes that top bit as well
if (bucket_index + 1 == m_buckets.size())
id <<= bucket_index;
else
id <<= bucket_index + 1;
// pick out all nodes that have the same prefix as the new node
std::vector<bucket_t::iterator> nodes;
@@ -763,7 +775,8 @@ bool routing_table::add_node(node_entry e)
*j = e;
m_ips.insert(e.addr().to_v4().to_bytes());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id << " " << e.addr();
TORRENT_LOG(table) << "replacing node with higher RTT: " << e.id
<< " " << e.addr();
#endif
return ret;
}
@@ -823,6 +836,8 @@ bool routing_table::add_node(node_entry e)
nb.push_back(e);
else if (int(nrb.size()) < m_bucket_size)
nrb.push_back(e);
else
nb.push_back(e); // trigger another split
m_ips.insert(e.addr().to_v4().to_bytes());
@@ -842,9 +857,6 @@ void routing_table::split_bucket()
// this is the last bucket, and it's full already. Split
// it by adding another bucket
m_buckets.push_back(routing_table_node());
// the extra seconds added to the end is to prioritize
// buckets closer to us when refreshing
m_buckets.back().last_active = min_time() + seconds(160 - m_buckets.size());
bucket_t& new_bucket = m_buckets.back().live_nodes;
bucket_t& new_replacement_bucket = m_buckets.back().replacements;
@@ -980,20 +992,18 @@ void routing_table::add_router_node(udp::endpoint router)
m_router_nodes.insert(router);
}
// we heard from this node, but we don't know if it
// was spoofed or not (i.e. pinged == false)
// we heard from this node, but we don't know if it was spoofed or not (i.e.
// pinged == false)
void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
{
add_node(node_entry(id, ep));
}
// this function is called every time the node sees
// a sign of a node being alive. This node will either
// be inserted in the k-buckets or be moved to the top
// of its bucket.
// the return value indicates if the table needs a refresh.
// if true, the node should refresh the table (i.e. do a find_node
// on its own id)
// this function is called every time the node sees a sign of a node being
// alive. This node will either be inserted in the k-buckets or be moved to the
// top of its bucket. the return value indicates if the table needs a refresh.
// if true, the node should refresh the table (i.e. do a find_node on its own
// id)
bool routing_table::node_seen(node_id const& id, udp::endpoint ep, int rtt)
{
return add_node(node_entry(id, ep, rtt, true));
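
The id_shift adjustment that now appears in print_state() and add_node() reflects how the table is structured: nodes in bucket i share exactly the top i bits with our ID and differ at bit i, so the bits that distinguish nodes within the bucket start at i + 1; the last bucket has not been split yet, so bit i is still significant there. A sketch of the prefix extraction (sub_bucket_prefix is an illustrative helper):

	#include <array>
	#include <cstdint>

	using node_id = std::array<std::uint8_t, 20>;

	// sketch of the sub-bucket prefix computation. prefix_bits must be
	// between 1 and 8 (it is derived from the bucket size limit).
	int sub_bucket_prefix(node_id id, int bucket_index, bool last_bucket
		, int prefix_bits)
	{
		int const id_shift = last_bucket ? bucket_index : bucket_index + 1;
		for (int s = 0; s < id_shift; ++s)
		{
			// shift the whole 160-bit ID left by one bit
			for (int i = 0; i < 19; ++i)
				id[i] = std::uint8_t((id[i] << 1) | (id[i + 1] >> 7));
			id[19] = std::uint8_t(id[19] << 1);
		}
		// the top prefix_bits bits of the shifted ID select the sub-bucket
		return id[0] >> (8 - prefix_bits);
	}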

View File

@@ -476,8 +476,6 @@ void traversal_algorithm::add_router_entries()
void traversal_algorithm::init()
{
// update the last activity of this bucket
m_node.m_table.touch_bucket(m_target);
m_branch_factor = m_node.branch_factor();
m_node.add_traversal_algorithm(this);
}

View File

@@ -3304,8 +3304,10 @@ retry:
if (tor)
session_log("prioritizing DHT announce: \"%s\"", tor->name().c_str());
#endif
// trigger a DHT announce right away if we just
// added a new torrent and there's no back-log
// trigger a DHT announce right away if we just added a new torrent and
// there's no back-log. in the timer handler, as long as there are more
// high priority torrents to be announced to the DHT, it will keep the
// timer interval short until all torrents have been announced.
if (m_dht_torrents.size() == 1)
{
#if defined TORRENT_ASIO_DEBUGGING