the DHT now refreshes buckets properly.

This commit is contained in:
Arvid Norberg 2007-05-12 01:52:25 +00:00
parent 9ffde51404
commit 9f902e7b9f
12 changed files with 517 additions and 462 deletions

View File

@ -20,16 +20,16 @@
</tbody>
</table>
<div class="contents topic" id="table-of-contents">
<p class="topic-title first"><a name="table-of-contents">Table of contents</a></p>
<p class="topic-title first">Table of contents</p>
<ul class="simple">
<li><a class="reference" href="#introduction" id="id6" name="id6">introduction</a></li>
<li><a class="reference" href="#features" id="id7" name="id7">features</a></li>
<li><a class="reference" href="#portability" id="id8" name="id8">portability</a></li>
<li><a class="reference" href="#license" id="id9" name="id9">license</a></li>
<li><a class="reference" href="#introduction" id="id4">introduction</a></li>
<li><a class="reference" href="#features" id="id5">features</a></li>
<li><a class="reference" href="#portability" id="id6">portability</a></li>
<li><a class="reference" href="#license" id="id7">license</a></li>
</ul>
</div>
<div class="section">
<h1><a id="introduction" name="introduction">introduction</a></h1>
<div class="section" id="introduction">
<h1>introduction</h1>
<p>libtorrent is a C++ library that aims to be a good alternative to all the
other bittorrent implementations around. It is a
library and not a full featured client, although it comes with a working
@ -41,8 +41,8 @@ example client.</p>
<li>to be very easy to use</li>
</ul>
</div>
<div class="section">
<h1><a id="features" name="features">features</a></h1>
<div class="section" id="features">
<h1>features</h1>
<p>libtorrent is still being developed, however it is stable. It is an ongoing
project (including this documentation). The current state includes the
following features:</p>
@ -67,7 +67,7 @@ as well as all local peers in a separate fast-resume file.</li>
This means it can download parts of the same piece from different peers.
It will also prefer to download whole pieces from single peers if the
download speed is high enough from that particular peer.</li>
<li>supports the <a class="reference" href="extension_protocol.html">udp-tracker protocol</a> by Olaf van der Spek.</li>
<li>supports the <a class="reference" href="udp_tracker_protocol.html">udp-tracker protocol</a> by Olaf van der Spek.</li>
<li>queues torrents for file check, instead of checking all of them in parallel.</li>
<li>supports http proxies and basic proxy authentication</li>
<li>gzipped tracker-responses</li>
@ -76,7 +76,7 @@ unchoked peers</li>
<li>implements fair trade. User settable trade-ratio, must at least be 1:1,
but one can choose to trade 1 for 2 or any other ratio that isn't unfair
to the other party.</li>
<li>supports an <a class="reference" href="udp_tracker_protocol.html">extension protocol</a>. See <a class="reference" href="manual.html#extensions">extensions</a>.</li>
<li>supports an <a class="reference" href="extension_protocol.html">extension protocol</a>. See <a class="reference" href="manual.html#extensions">extensions</a>.</li>
<li>supports the <tt class="docutils literal"><span class="pre">no_peer_id=1</span></tt> extension that will ease the load off trackers.</li>
<li>possibility to limit the number of connections.</li>
<li>delays have messages if there's no other outgoing traffic to the peer, and
@ -91,8 +91,8 @@ want to download.</li>
being connected</li>
</ul>
</div>
<div class="section">
<h1><a id="portability" name="portability">portability</a></h1>
<div class="section" id="portability">
<h1>portability</h1>
<p>libtorrent is portable at least among Windows, MacOS X and other UNIX-systems.
It uses Boost.Thread, Boost.Filesystem, Boost.Date_time and various other
boost libraries as well as <a class="reference" href="http://www.zlib.org">zlib</a> (shipped) and <a class="reference" href="http://asio.sf.net">asio</a> (shipped). At least version
@ -114,8 +114,8 @@ epoll on linux and kqueue on MacOS X and BSD.</p>
<li>msvc6</li>
</ul>
</div>
<div class="section">
<h1><a id="license" name="license">license</a></h1>
<div class="section" id="license">
<h1>license</h1>
<p>libtorrent is released under the <a class="reference" href="http://www.opensource.org/licenses/bsd-license.php">BSD-license</a>.</p>
<p>This means that you can use the library in your project without having to
release its source code. The only requirement is that you give credit

File diff suppressed because it is too large Load Diff

View File

@ -388,6 +388,7 @@ struct has the following members::
int dht_nodes;
int dht_cache_nodes;
int dht_torrents;
int dht_global_nodes;
};
``has_incoming_connections`` is false as long as no incoming connections have been
@ -414,6 +415,8 @@ becomes unresponsive.
``dht_torrents`` are the number of torrents tracked by the DHT at the moment.
``dht_global_nodes`` is an estimation of the total number of nodes in the DHT
network.
is_listening() listen_port() listen_on()
----------------------------------------

View File

@ -116,6 +116,8 @@ public:
node_id const& nid() const { return m_id; }
boost::tuple<int, int> size() const{ return m_table.size(); }
size_type num_global_nodes() const
{ return m_table.num_global_nodes(); }
data_iterator begin_data() { return m_map.begin(); }
data_iterator end_data() { return m_map.end(); }

View File

@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/node_entry.hpp>
#include <libtorrent/session_settings.hpp>
#include <libtorrent/size_type.hpp>
namespace libtorrent { namespace dht
{
@ -201,16 +202,20 @@ public:
iterator end() const;
boost::tuple<int, int> size() const;
size_type num_global_nodes() const;
// returns true if there are no working nodes
// in the routing table
bool need_bootstrap() const;
int num_active_buckets() const
{ return 160 - m_lowest_active_bucket + 1; }
void replacement_cache(bucket_t& nodes) const;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
// used for debug and monitoring purposes. This will print out
// the state of the routing table to the given stream
void print_state(std::ostream& os) const;
#endif
private:

View File

@ -69,7 +69,7 @@ namespace messages
struct msg
{
msg() : reply(false), piggy_backed_ping(false)
, port(0) {}
, message_id(-1), port(0) {}
// true if this message is a reply
bool reply;
@ -158,8 +158,8 @@ public:
void invoke(int message_id, udp::endpoint target
, boost::shared_ptr<observer> o);
void reply(msg& m, msg const& reply_to);
void reply_with_ping(msg& m, msg const& reply_to);
void reply(msg& m);
void reply_with_ping(msg& m);
#ifndef NDEBUG
void check_invariant() const;

View File

@ -59,6 +59,7 @@ namespace libtorrent
int dht_nodes;
int dht_node_cache;
int dht_torrents;
size_type dht_global_nodes;
#endif
};

View File

@ -207,7 +207,7 @@ namespace libtorrent { namespace dht
m_connection_timer.async_wait(m_strand.wrap(
bind(&dht_tracker::connection_timeout, self(), _1)));
m_refresh_timer.expires_from_now(minutes(15));
m_refresh_timer.expires_from_now(seconds(5));
m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1)));
m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self()));
@ -225,6 +225,7 @@ namespace libtorrent { namespace dht
{
boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size();
s.dht_torrents = m_dht.data_size();
s.dht_global_nodes = m_dht.num_global_nodes();
}
void dht_tracker::connection_timeout(asio::error_code const& e)
@ -442,7 +443,7 @@ namespace libtorrent { namespace dht
}
else
{
TORRENT_LOG(dht_tracker) << " client: generic";
TORRENT_LOG(dht_tracker) << " client: " << client;
}
}
catch (std::exception&)
@ -617,6 +618,7 @@ namespace libtorrent { namespace dht
TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " "
<< m.error_msg;
#endif
throw std::runtime_error("DHT error message");
}
else
{
@ -635,7 +637,7 @@ namespace libtorrent { namespace dht
}
TORRENT_LOG(dht_tracker) << e;
#endif
assert(m.message_id != messages::error);
m_dht.incoming(m);
}
catch (std::exception& e)
@ -774,16 +776,15 @@ namespace libtorrent { namespace dht
}
void dht_tracker::send_packet(msg const& m)
try
{
using libtorrent::bencode;
using libtorrent::entry;
entry e(entry::dictionary_t);
e["t"] = m.transaction_id;
std::string version_str("LT ");
std::string::iterator i = version_str.begin() + 2;
detail::write_uint8(LIBTORRENT_VERSION_MAJOR, i);
detail::write_uint8(LIBTORRENT_VERSION_MINOR, i);
e["v"] = version_str;
static char const version_str[] = {'L', 'T'
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR};
e["v"] = std::string(version_str, version_str + 4);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string()
@ -796,6 +797,7 @@ namespace libtorrent { namespace dht
assert(m.reply);
e["y"] = "e";
entry error_list(entry::list_t);
assert(m.error_code > 200 && m.error_code <= 204);
error_list.list().push_back(entry(m.error_code));
error_list.list().push_back(entry(m.error_msg));
e["e"] = error_list;
@ -914,8 +916,10 @@ namespace libtorrent { namespace dht
m_send_buf.clear();
bencode(std::back_inserter(m_send_buf), e);
asio::error_code ec;
m_socket.send_to(asio::buffer(&m_send_buf[0]
, (int)m_send_buf.size()), m.addr);
, (int)m_send_buf.size()), m.addr, 0, ec);
if (ec) return;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size();
@ -944,6 +948,13 @@ namespace libtorrent { namespace dht
send_packet(pm);
}
catch (std::exception&)
{
// m_send may fail with "no route to host"
// but it shouldn't throw since an error code
// is passed in instead
assert(false);
}
}}

View File

@ -381,17 +381,15 @@ time_duration node_impl::refresh_timeout()
for (int i = 0; i < 160; ++i)
{
ptime r = m_table.next_refresh(i);
if (r <= now)
{
if (refresh == -1) refresh = i;
}
else if (r < next)
if (r <= next)
{
refresh = i;
next = r;
}
}
if (refresh != -1)
if (next < now)
{
assert(refresh > -1);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "refreshing bucket: " << refresh;
#endif
@ -400,8 +398,18 @@ time_duration node_impl::refresh_timeout()
}
catch (std::exception&) {}
if (next < now + seconds(5)) return seconds(5);
return next - now;
time_duration next_refresh = next - now;
time_duration min_next_refresh
= minutes(15) / (m_table.num_active_buckets());
if (next_refresh < min_next_refresh)
next_refresh = min_next_refresh;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds";
#endif
return next_refresh;
}
time_duration node_impl::connection_timeout()
@ -493,6 +501,11 @@ bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
void node_impl::incoming_request(msg const& m)
{
msg reply;
reply.message_id = m.message_id;
reply.addr = m.addr;
reply.reply = true;
reply.transaction_id = m.transaction_id;
switch (m.message_id)
{
case messages::ping:
@ -532,16 +545,16 @@ void node_impl::incoming_request(msg const& m)
}
break;
case messages::announce_peer:
{
on_announce(m, reply);
}
break;
default:
assert(false);
};
if (m_table.need_node(m.id))
m_rpc.reply_with_ping(reply, m);
m_rpc.reply_with_ping(reply);
else
m_rpc.reply(reply, m);
m_rpc.reply(reply);
}

View File

@ -63,7 +63,8 @@ routing_table::routing_table(node_id const& id, int bucket_size
// distribute the refresh times for the buckets in an
// attempt do even out the network load
for (int i = 0; i < 160; ++i)
m_bucket_activity[i] = time_now() - seconds(15*60 - i*5);
m_bucket_activity[i] = time_now() - milliseconds(i*5625);
m_bucket_activity[0] = time_now() - minutes(15);
}
boost::tuple<int, int> routing_table::size() const
@ -79,14 +80,33 @@ boost::tuple<int, int> routing_table::size() const
return boost::make_tuple(nodes, replacements);
}
size_type routing_table::num_global_nodes() const
{
int first_full = m_lowest_active_bucket;
int num_nodes = 1; // we are one of the nodes
for (; first_full < 160 && m_buckets[first_full].first.size() < m_bucket_size;
++first_full)
{
num_nodes += m_buckets[first_full].first.size();
}
return (2 << (160 - first_full)) * num_nodes;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
void routing_table::print_state(std::ostream& os) const
{
os << "kademlia routing table state\n"
<< "bucket_size: " << m_bucket_size << "\n"
<< "global node count: " << num_global_nodes() << "\n"
<< "node_id: " << m_id << "\n\n";
os << "number of nodes per bucket:\n"
"live\n";
os << "number of nodes per bucket:\n-- live ";
for (int i = 8; i < 160; ++i)
os << "-";
os << "\n";
for (int k = 0; k < 8; ++k)
{
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
@ -111,16 +131,20 @@ void routing_table::print_state(std::ostream& os) const
}
os << "\n";
}
os << "cached\n-----------\n";
os << "-- cached ";
for (int i = 10; i < 160; ++i)
os << "-";
os << "\n\n";
os << "nodes:\n";
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i)
{
int bucket_index = int(i - m_buckets.begin());
os << "bucket " << bucket_index << " "
<< " " << (bucket_index >= m_lowest_active_bucket?"active":"inactive")
<< "\n";
os << "=== BUCKET = " << bucket_index
<< " = " << (bucket_index >= m_lowest_active_bucket?"active":"inactive")
<< " = " << total_seconds(time_now() - m_bucket_activity[bucket_index])
<< " s ago ===== \n";
for (bucket_t::const_iterator j = i->first.begin()
, end(i->first.end()); j != end; ++j)
{
@ -130,6 +154,8 @@ void routing_table::print_state(std::ostream& os) const
}
}
#endif
void routing_table::touch_bucket(int bucket)
{
m_bucket_activity[bucket] = time_now();
@ -241,7 +267,7 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
bool ret = need_bootstrap();
m_bucket_activity[bucket_index] = time_now();
//m_bucket_activity[bucket_index] = time_now();
if (i != b.end())
{

View File

@ -133,7 +133,7 @@ bool rpc_manager::incoming(msg const& m)
|| tid < 0)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
TORRENT_LOG(rpc) << "Reply with invalid transaction id: "
<< tid << " from " << m.addr;
#endif
return false;
@ -145,7 +145,7 @@ bool rpc_manager::incoming(msg const& m)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
<< tid << " from " << m.addr;
<< tid << " from " << m.addr << " (possibly timed out)";
#endif
return false;
}
@ -174,17 +174,16 @@ bool rpc_manager::incoming(msg const& m)
msg ph;
ph.message_id = messages::ping;
ph.transaction_id = m.ping_transaction_id;
ph.id = m_our_id;
ph.addr = m.addr;
ph.reply = true;
msg empty;
reply(empty, ph);
reply(ph);
}
return m_table.node_seen(m.id, m.addr);
}
else
{
assert(m.message_id != messages::error);
// this is an incoming request
m_incoming(m);
}
@ -326,19 +325,15 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
}
}
void rpc_manager::reply(msg& m, msg const& reply_to)
void rpc_manager::reply(msg& m)
{
INVARIANT_CHECK;
if (m_destructing) return;
if (m.message_id != messages::error)
m.message_id = reply_to.message_id;
m.addr = reply_to.addr;
m.reply = true;
assert(m.reply);
m.piggy_backed_ping = false;
m.id = m_our_id;
m.transaction_id = reply_to.transaction_id;
m_send(m);
}
@ -354,22 +349,16 @@ namespace
};
}
void rpc_manager::reply_with_ping(msg& m, msg const& reply_to)
void rpc_manager::reply_with_ping(msg& m)
{
INVARIANT_CHECK;
if (m_destructing) return;
assert(m.reply);
if (m.message_id != messages::error)
m.message_id = reply_to.message_id;
m.addr = reply_to.addr;
m.reply = true;
m.piggy_backed_ping = true;
m.id = m_our_id;
m.transaction_id = reply_to.transaction_id;
try
{
m.ping_transaction_id.clear();
std::back_insert_iterator<std::string> out(m.ping_transaction_id);
io::write_uint16(m_next_transaction_id, out);
@ -382,11 +371,6 @@ void rpc_manager::reply_with_ping(msg& m, msg const& reply_to)
m_send(m);
new_transaction_id(o);
}
catch (std::exception& e)
{
// m_send may fail with "no route to host"
}
}

View File

@ -1544,6 +1544,7 @@ namespace libtorrent { namespace detail
s.dht_nodes = 0;
s.dht_node_cache = 0;
s.dht_torrents = 0;
s.dht_global_nodes = 0;
}
#endif