added more bandwidth monitoring of DHT and TCP/IP overhead. added monitoring of the running DHT lookups

This commit is contained in:
Arvid Norberg 2008-09-20 17:42:25 +00:00
parent 98f9d5826c
commit 0338510b9b
22 changed files with 472 additions and 274 deletions

View File

@ -1,3 +1,8 @@
* added monitoring of the DHT lookups
* added bandwidth reports for estimated TCP/IP overhead and DHT
* includes DHT traffic in the rate limiter
release 0.14
* Removed 'connecting_to_tracker' torrent state
* Fix bug where FAST pieces were cancelled on choke

View File

@ -491,25 +491,42 @@ status()
``status()`` returns session wide-statistics and status. The ``session_status``
struct has the following members::
struct dht_lookup
{
char const* type;
int outstanding_requests;
int timeouts;
int responses;
int branch_factor;
};
struct session_status
{
bool has_incoming_connections;
float upload_rate;
float download_rate;
float payload_upload_rate;
float payload_download_rate;
size_type total_download;
size_type total_upload;
size_type total_redundant_bytes;
size_type total_failed_bytes;
float payload_upload_rate;
float payload_download_rate;
size_type total_payload_download;
size_type total_payload_upload;
float ip_overhead_upload_rate;
float ip_overhead_download_rate;
size_type total_ip_overhead_download;
size_type total_ip_overhead_upload;
float dht_upload_rate;
float dht_download_rate;
size_type total_dht_download;
size_type total_dht_upload;
size_type total_redundant_bytes;
size_type total_failed_bytes;
int num_peers;
int num_unchoked;
int allowed_upload_slots;
@ -518,19 +535,32 @@ struct has the following members::
int dht_cache_nodes;
int dht_torrents;
int dht_global_nodes;
std::vector<dht_lookup> active_requests;
};
``has_incoming_connections`` is false as long as no incoming connections have been
established on the listening socket. Every time you change the listen port, this will
be reset to false.
``upload_rate``, ``download_rate``, ``payload_download_rate`` and ``payload_upload_rate``
are the total download and upload rates accumulated from all torrents. The payload
versions is the payload download only.
``upload_rate``, ``download_rate`` are the total download and upload rates accumulated
from all torrents. This includes bittorrent protocol, DHT and an estimated TCP/IP
protocol overhead.
``total_download`` and ``total_upload`` are the total number of bytes downloaded and
uploaded to and from all torrents. ``total_payload_download`` and ``total_payload_upload``
are the same thing but where only the payload is considered.
uploaded to and from all torrents. This also includes all the protocol overhead.
``payload_download_rate`` and ``payload_upload_rate`` is the rate of the payload
down- and upload only.
``total_payload_download`` and ``total_payload_upload`` is the total transfers of payload
only. The payload does not include the bittorrent protocol overhead, but only parts of the
actual files to be downloaded.
``ip_overhead_upload_rate``, ``ip_overhead_download_rate``, ``total_ip_overhead_download``
and ``total_ip_overhead_upload`` is the estimated TCP/IP overhead in each direction.
``dht_upload_rate``, ``dht_download_rate``, ``total_dht_download`` and ``total_dht_upload``
is the DHT bandwidth usage.
``total_redundant_bytes`` is the number of bytes that has been received more than once.
This can happen if a request from a peer times out and is requested from a different
@ -562,6 +592,9 @@ becomes unresponsive.
``dht_global_nodes`` is an estimation of the total number of nodes in the DHT
network.
``active_requests`` is a vector of the currently running DHT lookups.
get_cache_status()
------------------
@ -3848,6 +3881,7 @@ is its synopsis:
progress_notification = *implementation defined*,
ip_block_notification = *implementation defined*,
performance_warning = *implementation defined*,
dht_notification = *implementation defined*,
all_categories = *implementation defined*
};
@ -4364,6 +4398,36 @@ generating the resume data. ``msg`` describes what went wrong.
std::string msg;
};
dht_announce_alert
------------------
This alert is generated when a DHT node announces to an info-hash on our DHT node. It belongs
to the ``dht_notification`` category.
::
struct dht_announce_alert: alert
{
// ...
address ip;
int port;
sha1_hash info_hash;
};
dht_get_peers_alert
-------------------
This alert is generated when a DHT node sends a ``get_peers`` message to our DHT node.
It belongs to the ``dht_notification`` category.
::
struct dht_get_peers_alert: alert
{
// ...
sha1_hash info_hash;
};
dispatcher
----------

View File

@ -159,6 +159,7 @@ bool print_log = false;
bool print_downloads = false;
bool print_piece_bar = false;
bool print_file_progress = false;
bool show_dht_status = false;
bool sequential_download = false;
bool print_ip = true;
@ -1186,6 +1187,7 @@ int main(int ac, char* av[])
if (c == 'd') print_downloads = !print_downloads;
if (c == 'f') print_file_progress = !print_file_progress;
if (c == 'a') print_piece_bar = !print_piece_bar;
if (c == 'g') show_dht_status = !show_dht_status;
// toggle columns
if (c == '1') print_ip = !print_ip;
if (c == '2') print_as = !print_as;
@ -1229,7 +1231,7 @@ int main(int ac, char* av[])
std::stringstream out;
out << "[q] quit [i] toggle peers [d] toggle downloading pieces [p] toggle paused "
"[a] toggle piece bar [s] toggle download sequential [f] toggle files "
"[j] force recheck [space] toggle session pause [c] clear error [v] scrape\n"
"[j] force recheck [space] toggle session pause [c] clear error [v] scrape [g] show DHT\n"
"[1] toggle IP [2] toggle AS [3] toggle timers [4] toggle block progress "
"[5] toggle peer rate [6] toggle failures [7] toggle send buffers\n";
@ -1360,10 +1362,16 @@ int main(int ac, char* av[])
out << "==== conns: " << sess_stat.num_peers
<< " down: " << esc("32") << add_suffix(sess_stat.download_rate) << "/s" << esc("0")
<< " (" << esc("32") << add_suffix(sess_stat.total_download) << esc("0") << ") "
" up: " << esc("31") << add_suffix(sess_stat.upload_rate) << "/s " << esc("0")
<< " (" << esc("32") << add_suffix(sess_stat.total_download) << esc("0") << ")"
" up: " << esc("31") << add_suffix(sess_stat.upload_rate) << "/s" << esc("0")
<< " (" << esc("31") << add_suffix(sess_stat.total_upload) << esc("0") << ")"
" waste: " << add_suffix(sess_stat.total_redundant_bytes)
" tcp/ip: "
<< esc("32") << add_suffix(sess_stat.ip_overhead_download_rate) << "/s" << esc("0") << " "
<< esc("31") << add_suffix(sess_stat.ip_overhead_upload_rate) << "/s" << esc("0")
<< " DHT: "
<< esc("32") << add_suffix(sess_stat.dht_download_rate) << "/s" << esc("0") << " "
<< esc("31") << add_suffix(sess_stat.dht_upload_rate) << "/s" << esc("0") << " ====\n"
"==== waste: " << add_suffix(sess_stat.total_redundant_bytes)
<< " fail: " << add_suffix(sess_stat.total_failed_bytes)
<< " unchoked: " << sess_stat.num_unchoked << " / " << sess_stat.allowed_upload_slots
<< " bw queues: (" << sess_stat.up_bandwidth_queue
@ -1374,6 +1382,20 @@ int main(int ac, char* av[])
<< " (" << add_suffix(cs.read_cache_size * 16 * 1024) << ")"
" ====" << std::endl;
if (show_dht_status)
{
out << "DHT nodes: " << sess_stat.dht_nodes
<< " DHT cached nodes: " << sess_stat.dht_node_cache
<< " total DHT size: " << sess_stat.dht_global_nodes << std::endl;
for (std::vector<dht_lookup>::iterator i = sess_stat.active_requests.begin()
, end(sess_stat.active_requests.end()); i != end; ++i)
{
out << " " << i->type << " " << i->outstanding_requests << " ("
<< i->branch_factor << ") ( timeouts "
<< i->timeouts << " responses " << i->responses << ")\n";
}
}
if (active_handle.is_valid())
{
torrent_handle h = active_handle;

View File

@ -83,6 +83,7 @@ namespace libtorrent {
progress_notification = 0x80,
ip_block_notification = 0x100,
performance_warning = 0x200,
dht_notification = 0x400,
all_categories = 0xffffffff
};

View File

@ -1122,6 +1122,54 @@ namespace libtorrent
return "blocked peer: " + ip.to_string(ec);
}
};
struct TORRENT_EXPORT dht_announce_alert: alert
{
dht_announce_alert(address const& ip_, int port_
, sha1_hash const& info_hash_)
: ip(ip_)
, port(port_)
, info_hash(info_hash_)
{}
address ip;
int port;
sha1_hash info_hash;
virtual std::auto_ptr<alert> clone() const
{ return std::auto_ptr<alert>(new dht_announce_alert(*this)); }
virtual char const* what() const { return "incoming dht announce"; }
const static int static_category = alert::dht_notification;
virtual int category() const { return static_category; }
virtual std::string message() const
{
error_code ec;
return "incoming dht annonce: " + ip.to_string(ec) + ":"
+ boost::lexical_cast<std::string>(port) + " ("
+ boost::lexical_cast<std::string>(info_hash) + ")";
}
};
struct TORRENT_EXPORT dht_get_peers_alert: alert
{
dht_get_peers_alert(sha1_hash const& info_hash_)
: info_hash(info_hash_)
{}
sha1_hash info_hash;
virtual std::auto_ptr<alert> clone() const
{ return std::auto_ptr<alert>(new dht_get_peers_alert(*this)); }
virtual char const* what() const { return "incoming dht get_peers request"; }
const static int static_category = alert::dht_notification;
virtual int category() const { return static_category; }
virtual std::string message() const
{
error_code ec;
return "incoming dht get_peers: "
+ boost::lexical_cast<std::string>(info_hash);
}
};
}

View File

@ -57,28 +57,18 @@ public:
void(std::vector<node_entry> const&)
> done_callback;
static void initiate(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
closest_nodes(
node_impl& node
, node_id target
, done_callback const& callback
);
virtual char const* name() const { return "closest nodes"; }
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
closest_nodes(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
);
done_callback m_done_callback;
};

View File

@ -55,6 +55,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/socket.hpp"
namespace libtorrent { namespace aux { struct session_impl; } }
namespace libtorrent { namespace dht
{
@ -71,7 +73,8 @@ namespace libtorrent { namespace dht
{
friend void intrusive_ptr_add_ref(dht_tracker const*);
friend void intrusive_ptr_release(dht_tracker const*);
dht_tracker(udp_socket& sock, dht_settings const& settings);
dht_tracker(libtorrent::aux::session_impl& ses, udp_socket& sock
, dht_settings const& settings);
void start(entry const& bootstrap);
void stop();
@ -110,6 +113,7 @@ namespace libtorrent { namespace dht
void send_packet(msg const& m);
node_impl m_dht;
libtorrent::aux::session_impl& m_ses;
udp_socket& m_sock;
std::vector<char> m_send_buf;

View File

@ -51,6 +51,7 @@ namespace libtorrent { namespace dht
typedef std::vector<char> packet_t;
class rpc_manager;
class node_impl;
// -------- find data -----------
@ -59,30 +60,18 @@ class find_data : public traversal_algorithm
public:
typedef boost::function<void(msg const*)> done_callback;
static void initiate(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
);
void got_data(msg const* m);
find_data(node_impl& node, node_id target
, done_callback const& callback);
virtual char const* name() const { return "get_peers"; }
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
find_data(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
);
done_callback m_done_callback;
boost::shared_ptr<packet_t> m_packet;
bool m_done;

View File

@ -53,6 +53,13 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
namespace libtorrent {
namespace aux { struct session_impl; }
struct session_status;
}
namespace libtorrent { namespace dht
{
@ -60,6 +67,8 @@ namespace libtorrent { namespace dht
TORRENT_DECLARE_LOG(node);
#endif
struct traversal_algorithm;
// this is the entry for every peer
// the timestamp is there to make it possible
// to remove stale peers
@ -154,13 +163,11 @@ private:
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
class node_impl : boost::noncopyable
{
typedef std::map<node_id, torrent_entry> table_t;
public:
node_impl(boost::function<void(msg const&)> const& f
node_impl(libtorrent::aux::session_impl& ses, boost::function<void(msg const&)> const& f
, dht_settings const& settings);
virtual ~node_impl() {}
@ -225,6 +232,16 @@ public:
void replacement_cache(bucket_t& nodes) const
{ m_table.replacement_cache(nodes); }
int branch_factor() const { return m_settings.search_branching; }
void add_traversal_algorithm(traversal_algorithm* a)
{ m_running_requests.insert(a); }
void remove_traversal_algorithm(traversal_algorithm* a)
{ m_running_requests.erase(a); }
void status(libtorrent::session_status& s);
protected:
// is called when a find data request is received. Should
// return false if the data is not stored on this node. If
@ -243,17 +260,27 @@ protected:
int m_max_peers_reply;
private:
// this list must be destructed after the rpc manager
// since it might have references to it
std::set<traversal_algorithm*> m_running_requests;
void incoming_request(msg const& h);
node_id m_id;
public:
routing_table m_table;
rpc_manager m_rpc;
private:
table_t m_map;
ptime m_last_tracker_tick;
// secret random numbers used to create write tokens
int m_secret[2];
libtorrent::aux::session_impl& m_ses;
};

View File

@ -57,35 +57,16 @@ class refresh : public traversal_algorithm
public:
typedef boost::function<void()> done_callback;
template<class InIt>
static void initiate(
node_id target
, int branch_factor
, int max_active_pings
, int max_results
, routing_table& table
, InIt first
, InIt last
, rpc_manager& rpc
, done_callback const& callback
);
void ping_reply(node_id id);
void ping_timeout(node_id id, bool prevent_request = false);
private:
template<class InIt>
refresh(
node_id target
, int branch_factor
, int max_active_pings
, int max_results
, routing_table& table
, InIt first
, InIt last
, rpc_manager& rpc
, done_callback const& callback
);
refresh(node_impl& node, node_id target, InIt first, InIt last
, done_callback const& callback);
virtual char const* name() const { return "refresh"; }
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
@ -155,26 +136,13 @@ private:
template<class InIt>
inline refresh::refresh(
node_id target
, int branch_factor
, int max_active_pings
, int max_results
, routing_table& table
node_impl& node
, node_id target
, InIt first
, InIt last
, rpc_manager& rpc
, done_callback const& callback
)
: traversal_algorithm(
target
, branch_factor
, max_results
, table
, rpc
, first
, last
)
, m_max_active_pings(max_active_pings)
, done_callback const& callback)
: traversal_algorithm(node, target, first, last)
, m_max_active_pings(10)
, m_active_pings(0)
, m_done_callback(callback)
{
@ -182,32 +150,6 @@ inline refresh::refresh(
add_requests();
}
template<class InIt>
inline void refresh::initiate(
node_id target
, int branch_factor
, int max_active_pings
, int max_results
, routing_table& table
, InIt first
, InIt last
, rpc_manager& rpc
, done_callback const& callback
)
{
new refresh(
target
, branch_factor
, max_active_pings
, max_results
, table
, first
, last
, rpc
, callback
);
}
} } // namespace libtorrent::dht
#endif // REFRESH_050324_HPP

View File

@ -45,6 +45,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/bind.hpp>
#include <boost/pool/pool.hpp>
namespace libtorrent { struct dht_lookup; }
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -52,6 +53,7 @@ TORRENT_DECLARE_LOG(traversal);
#endif
class rpc_manager;
class node_impl;
// this class may not be instantiated as a stack object
class traversal_algorithm : boost::noncopyable
@ -60,23 +62,21 @@ public:
void traverse(node_id const& id, udp::endpoint addr);
void finished(node_id const& id);
void failed(node_id const& id, bool prevent_request = false);
virtual ~traversal_algorithm() {}
virtual ~traversal_algorithm();
boost::pool<>& allocator() const;
void status(dht_lookup& l);
virtual char const* name() const { return "traversal_algorithm"; }
protected:
template<class InIt>
traversal_algorithm(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, InIt start
, InIt end
);
traversal_algorithm(node_impl& node, node_id target, InIt start, InIt end);
void add_requests();
void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags);
void add_router_entries();
void init();
virtual void done() = 0;
virtual void invoke(node_id const& id, udp::endpoint addr) = 0;
@ -107,33 +107,29 @@ protected:
int m_ref_count;
node_impl& m_node;
node_id m_target;
int m_branch_factor;
int m_max_results;
std::vector<result> m_results;
std::set<udp::endpoint> m_failed;
routing_table& m_table;
rpc_manager& m_rpc;
int m_invoke_count;
int m_branch_factor;
int m_responses;
int m_timeouts;
};
template<class InIt>
traversal_algorithm::traversal_algorithm(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
node_impl& node
, node_id target
, InIt start // <- nodes to initiate traversal with
, InIt end
)
, InIt end)
: m_ref_count(0)
, m_node(node)
, m_target(target)
, m_branch_factor(branch_factor)
, m_max_results(max_results)
, m_table(table)
, m_rpc(rpc)
, m_invoke_count(0)
, m_branch_factor(3)
, m_responses(0)
, m_timeouts(0)
{
using boost::bind;
@ -144,15 +140,8 @@ traversal_algorithm::traversal_algorithm(
// in case the routing table is empty, use the
// router nodes in the table
if (start == end)
{
for (routing_table::router_iterator i = table.router_begin()
, end(table.router_end()); i != end; ++i)
{
add_entry(node_id(0), *i, result::initial);
}
}
if (start == end) add_router_entries();
init();
}
} } // namespace libtorrent::dht

View File

@ -37,22 +37,43 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
#ifndef TORRENT_DISABLE_DHT
struct dht_lookup
{
char const* type;
int outstanding_requests;
int timeouts;
int responses;
int branch_factor;
};
#endif
struct TORRENT_EXPORT session_status
{
bool has_incoming_connections;
float upload_rate;
float download_rate;
float payload_upload_rate;
float payload_download_rate;
size_type total_download;
size_type total_upload;
float payload_upload_rate;
float payload_download_rate;
size_type total_payload_download;
size_type total_payload_upload;
float ip_overhead_upload_rate;
float ip_overhead_download_rate;
size_type total_ip_overhead_download;
size_type total_ip_overhead_upload;
float dht_upload_rate;
float dht_download_rate;
size_type total_dht_download;
size_type total_dht_upload;
size_type total_redundant_bytes;
size_type total_failed_bytes;
@ -68,6 +89,7 @@ namespace libtorrent
int dht_node_cache;
int dht_torrents;
size_type dht_global_nodes;
std::vector<dht_lookup> active_requests;
#endif
};

View File

@ -122,6 +122,18 @@ namespace libtorrent
m_stat[i] += s.m_stat[i];
}
void received_dht_bytes(int bytes)
{
TORRENT_ASSERT(bytes >= 0);
m_stat[download_dht_protocol].add(bytes);
}
void sent_dht_bytes(int bytes)
{
TORRENT_ASSERT(bytes >= 0);
m_stat[upload_dht_protocol].add(bytes);
}
void received_bytes(int bytes_payload, int bytes_protocol)
{
TORRENT_ASSERT(bytes_payload >= 0);
@ -162,6 +174,8 @@ namespace libtorrent
int upload_ip_overhead() const { return m_stat[upload_ip_protocol].counter(); }
int download_ip_overhead() const { return m_stat[download_ip_protocol].counter(); }
int upload_dht() const { return m_stat[upload_dht_protocol].counter(); }
int download_dht() const { return m_stat[download_dht_protocol].counter(); }
// should be called once every second
void second_tick(float tick_interval)
@ -174,7 +188,8 @@ namespace libtorrent
{
return (m_stat[upload_payload].rate_sum()
+ m_stat[upload_protocol].rate_sum()
+ m_stat[upload_ip_protocol].rate_sum())
+ m_stat[upload_ip_protocol].rate_sum()
+ m_stat[upload_dht_protocol].rate_sum())
/ float(stat_channel::history);
}
@ -182,13 +197,29 @@ namespace libtorrent
{
return (m_stat[download_payload].rate_sum()
+ m_stat[download_protocol].rate_sum()
+ m_stat[download_ip_protocol].rate_sum())
+ m_stat[download_ip_protocol].rate_sum()
+ m_stat[download_dht_protocol].rate_sum())
/ float(stat_channel::history);
}
size_type total_upload() const
{
return m_stat[upload_payload].total()
+ m_stat[upload_protocol].total()
+ m_stat[upload_ip_protocol].total()
+ m_stat[upload_dht_protocol].total();
}
size_type total_download() const
{
return m_stat[download_payload].total()
+ m_stat[download_protocol].total()
+ m_stat[download_ip_protocol].total()
+ m_stat[download_dht_protocol].total();
}
float upload_payload_rate() const
{ return m_stat[upload_payload].rate(); }
float download_payload_rate() const
{ return m_stat[download_payload].rate(); }
@ -202,6 +233,11 @@ namespace libtorrent
size_type total_protocol_download() const
{ return m_stat[download_protocol].total(); }
size_type total_transfer(int channel) const
{ return m_stat[channel].total(); }
float transfer_rate(int channel) const
{ return m_stat[channel].rate(); }
// this is used to offset the statistics when a
// peer_connection is opened and have some previous
// transfers from earlier connections.
@ -218,20 +254,22 @@ namespace libtorrent
size_type last_payload_uploaded() const
{ return m_stat[upload_payload].counter(); }
private:
// these are the channels we keep stats for
enum
{
upload_payload,
upload_protocol,
upload_ip_protocol,
upload_dht_protocol,
download_payload,
download_protocol,
download_ip_protocol,
download_dht_protocol,
num_channels
};
private:
stat_channel m_stat[num_channels];
};

View File

@ -230,6 +230,8 @@ namespace libtorrent
bool is_auto_managed() const { return m_auto_managed; }
void auto_managed(bool a);
bool should_check_files() const;
void delete_files();
// ============ start deprecation =============

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node.hpp>
#include "libtorrent/assert.hpp"
namespace libtorrent { namespace dht
@ -72,24 +73,11 @@ void closest_nodes_observer::timeout()
m_algorithm = 0;
}
closest_nodes::closest_nodes(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
)
: traversal_algorithm(
target
, branch_factor
, max_results
, table
, rpc
, table.begin()
, table.end()
)
node_impl& node
, node_id target
, done_callback const& callback)
: traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end())
, m_done_callback(callback)
{
boost::intrusive_ptr<closest_nodes> self(this);
@ -98,18 +86,18 @@ closest_nodes::closest_nodes(
void closest_nodes::invoke(node_id const& id, udp::endpoint addr)
{
TORRENT_ASSERT(m_rpc.allocation_size() >= sizeof(closest_nodes_observer));
observer_ptr o(new (m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target));
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(closest_nodes_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target));
#ifndef NDEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::find_node, addr, o);
m_node.m_rpc.invoke(messages::find_node, addr, o);
}
void closest_nodes::done()
{
std::vector<node_entry> results;
int num_results = m_max_results;
int num_results = m_node.m_table.bucket_size();
for (std::vector<result>::iterator i = m_results.begin()
, end(m_results.end()); i != end && num_results > 0; ++i)
{
@ -121,17 +109,5 @@ void closest_nodes::done()
m_done_callback(results);
}
void closest_nodes::initiate(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
)
{
new closest_nodes(target, branch_factor, max_results, table, rpc, callback);
}
} } // namespace libtorrent::dht

View File

@ -47,6 +47,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/traversal_algorithm.hpp"
#include "libtorrent/kademlia/dht_tracker.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/io.hpp"
@ -127,8 +128,10 @@ namespace libtorrent { namespace dht
// class that puts the networking and the kademlia node in a single
// unit and connecting them together.
dht_tracker::dht_tracker(udp_socket& sock, dht_settings const& settings)
: m_dht(bind(&dht_tracker::send_packet, this, _1), settings)
dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, udp_socket& sock
, dht_settings const& settings)
: m_dht(ses, bind(&dht_tracker::send_packet, this, _1), settings)
, m_ses(ses)
, m_sock(sock)
, m_last_new_key(time_now() - minutes(key_refresh))
, m_timer(sock.get_io_service())
@ -217,6 +220,7 @@ namespace libtorrent { namespace dht
boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size();
s.dht_torrents = m_dht.data_size();
s.dht_global_nodes = m_dht.num_global_nodes();
m_dht.status(s);
}
void dht_tracker::connection_timeout(error_code const& e)
@ -377,6 +381,11 @@ namespace libtorrent { namespace dht
void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred)
try
{
{
libtorrent::aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.m_stat.received_dht_bytes(bytes_transferred);
}
node_ban_entry* match = 0;
node_ban_entry* min = m_ban_nodes;
ptime now = time_now();
@ -957,6 +966,11 @@ namespace libtorrent { namespace dht
error_code ec;
m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec);
{
libtorrent::aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.m_stat.sent_dht_bytes(m_send_buf.size());
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size();

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/io.hpp>
#include <libtorrent/socket.hpp>
@ -79,22 +80,10 @@ void find_data_observer::timeout()
find_data::find_data(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
)
: traversal_algorithm(
target
, branch_factor
, max_results
, table
, rpc
, table.begin()
, table.end()
)
node_impl& node
, node_id target
, done_callback const& callback)
: traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end())
, m_done_callback(callback)
, m_done(false)
{
@ -110,12 +99,12 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
return;
}
TORRENT_ASSERT(m_rpc.allocation_size() >= sizeof(find_data_observer));
observer_ptr o(new (m_rpc.allocator().malloc()) find_data_observer(this, id, m_target));
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) find_data_observer(this, id, m_target));
#ifndef NDEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::get_peers, addr, o);
m_node.m_rpc.invoke(messages::get_peers, addr, o);
}
void find_data::got_data(msg const* m)
@ -130,18 +119,5 @@ void find_data::done()
if (!m_done) m_done_callback(0);
}
void find_data::initiate(
node_id target
, int branch_factor
, int max_results
, routing_table& table
, rpc_manager& rpc
, done_callback const& callback
)
{
std::cerr << "find_data::initiate, key: " << target << "\n";
new find_data(target, branch_factor, max_results, table, rpc, callback);
}
} } // namespace libtorrent::dht

View File

@ -41,6 +41,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/io.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/random_sample.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/kademlia/rpc_manager.hpp"
#include "libtorrent/kademlia/routing_table.hpp"
@ -90,7 +92,8 @@ void purge_peers(std::set<peer_entry>& peers)
void nop() {}
node_impl::node_impl(boost::function<void(msg const&)> const& f
node_impl::node_impl(libtorrent::aux::session_impl& ses
, boost::function<void(msg const&)> const& f
, dht_settings const& settings)
: m_settings(settings)
, m_id(generate_id())
@ -98,6 +101,7 @@ node_impl::node_impl(boost::function<void(msg const&)> const& f
, m_rpc(bind(&node_impl::incoming_request, this, _1)
, m_id, m_table, f)
, m_last_tracker_tick(time_now())
, m_ses(ses)
{
m_secret[0] = std::rand();
m_secret[1] = std::rand();
@ -164,8 +168,7 @@ void node_impl::refresh(node_id const& id
std::vector<node_entry> start;
start.reserve(m_table.bucket_size());
m_table.find_node(id, start, false);
refresh::initiate(id, m_settings.search_branching, 10, m_table.bucket_size()
, m_table, start.begin(), start.end(), m_rpc, f);
new dht::refresh(*this, id, start.begin(), start.end(), f);
}
void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
@ -180,8 +183,7 @@ void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
std::vector<node_entry> start;
start.reserve(nodes.size());
std::copy(nodes.begin(), nodes.end(), std::back_inserter(start));
refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
, m_table, start.begin(), start.end(), m_rpc, f);
new dht::refresh(*this, m_id, start.begin(), start.end(), f);
}
void node_impl::refresh()
@ -190,8 +192,7 @@ void node_impl::refresh()
start.reserve(m_table.size().get<0>());
std::copy(m_table.begin(), m_table.end(), std::back_inserter(start));
refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
, m_table, start.begin(), start.end(), m_rpc, bind(&nop));
new dht::refresh(*this, m_id, start.begin(), start.end(), bind(&nop));
}
int node_impl::bucket_size(int bucket)
@ -237,8 +238,7 @@ void node_impl::refresh_bucket(int bucket) try
start.reserve(m_table.bucket_size());
m_table.find_node(target, start, false, m_table.bucket_size());
refresh::initiate(target, m_settings.search_branching, 10, m_table.bucket_size()
, m_table, start.begin(), start.end(), m_rpc, bind(&nop));
new dht::refresh(*this, target, start.begin(), start.end(), bind(&nop));
m_table.touch_bucket(bucket);
}
catch (std::exception&) {}
@ -312,10 +312,8 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
#endif
// search for nodes with ids close to id, and then invoke the
// get_peers and then announce_peer rpc on them.
closest_nodes::initiate(info_hash, m_settings.search_branching
, m_table.bucket_size(), m_table, m_rpc
, boost::bind(&announce_fun, _1, boost::ref(m_rpc), listen_port
, info_hash, f));
new closest_nodes(*this, info_hash, boost::bind(&announce_fun, _1, boost::ref(m_rpc)
, listen_port, info_hash, f));
}
time_duration node_impl::refresh_timeout()
@ -338,7 +336,7 @@ time_duration node_impl::refresh_timeout()
{
TORRENT_ASSERT(refresh > -1);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "refreshing bucket: " << refresh;
TORRENT_LOG(node) << "refreshing bucket: " << refresh;
#endif
refresh_bucket(refresh);
}
@ -393,6 +391,10 @@ time_duration node_impl::connection_timeout()
void node_impl::on_announce(msg const& m, msg& reply)
{
if (m_ses.m_alerts.should_post<dht_announce_alert>())
m_ses.m_alerts.post_alert(dht_announce_alert(
m.addr.address(), m.port, m.info_hash));
if (!verify_token(m))
{
reply.message_id = messages::error;
@ -423,8 +425,23 @@ namespace
}
}
void node_impl::status(session_status& s)
{
s.active_requests.clear();
for (std::set<traversal_algorithm*>::iterator i = m_running_requests.begin()
, end(m_running_requests.end()); i != end; ++i)
{
s.active_requests.push_back(dht_lookup());
dht_lookup& l = s.active_requests.back();
(*i)->status(l);
}
}
bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
{
if (m_ses.m_alerts.should_post<dht_get_peers_alert>())
m_ses.m_alerts.post_alert(dht_get_peers_alert(m.info_hash));
table_t::const_iterator i = m_map.find(m.info_hash);
if (i == m_map.end()) return false;

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp>
@ -101,20 +102,21 @@ void ping_observer::timeout()
void refresh::invoke(node_id const& nid, udp::endpoint addr)
{
TORRENT_ASSERT(m_rpc.allocation_size() >= sizeof(refresh_observer));
observer_ptr o(new (m_rpc.allocator().malloc()) refresh_observer(
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(refresh_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) refresh_observer(
this, nid, m_target));
#ifndef NDEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::find_node, addr, o);
m_node.m_rpc.invoke(messages::find_node, addr, o);
}
void refresh::done()
{
m_leftover_nodes_iterator = (int)m_results.size() > m_max_results ?
m_results.begin() + m_max_results : m_results.end();
int max_results = m_node.m_table.bucket_size();
m_leftover_nodes_iterator = (int)m_results.size() > max_results ?
m_results.begin() + max_results : m_results.end();
invoke_pings_or_finish();
}
@ -156,13 +158,13 @@ void refresh::invoke_pings_or_finish(bool prevent_request)
try
{
TORRENT_ASSERT(m_rpc.allocation_size() >= sizeof(ping_observer));
observer_ptr o(new (m_rpc.allocator().malloc()) ping_observer(
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(ping_observer));
observer_ptr o(new (m_node.m_rpc.allocator().malloc()) ping_observer(
this, node.id));
#ifndef NDEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::ping, node.addr, o);
m_node.m_rpc.invoke(messages::ping, node.addr, o);
++m_active_pings;
++m_leftover_nodes_iterator;
}

View File

@ -35,6 +35,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/session_status.hpp>
#include <boost/bind.hpp>
@ -82,7 +84,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
boost::pool<>& traversal_algorithm::allocator() const
{
return m_rpc.allocator();
return m_node.m_rpc.allocator();
}
void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
@ -96,6 +98,7 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
void traversal_algorithm::finished(node_id const& id)
{
++m_responses;
--m_invoke_count;
add_requests();
if (m_invoke_count == 0) done();
@ -106,7 +109,7 @@ void traversal_algorithm::finished(node_id const& id)
// So, if this is true, don't make another request
void traversal_algorithm::failed(node_id const& id, bool prevent_request)
{
m_invoke_count--;
--m_invoke_count;
TORRENT_ASSERT(!id.is_all_zeros());
std::vector<result>::iterator i = std::find_if(
@ -131,8 +134,9 @@ void traversal_algorithm::failed(node_id const& id, bool prevent_request)
// don't tell the routing table about
// node ids that we just generated ourself
if ((i->flags & result::no_id) == 0)
m_table.node_failed(id);
m_node.m_table.node_failed(id);
m_results.erase(i);
++m_timeouts;
}
if (prevent_request)
{
@ -182,11 +186,40 @@ void traversal_algorithm::add_requests()
}
}
void traversal_algorithm::add_router_entries()
{
for (routing_table::router_iterator i = m_node.m_table.router_begin()
, end(m_node.m_table.router_end()); i != end; ++i)
{
add_entry(node_id(0), *i, result::initial);
}
}
void traversal_algorithm::init()
{
m_branch_factor = m_node.branch_factor();
m_node.add_traversal_algorithm(this);
}
traversal_algorithm::~traversal_algorithm()
{
m_node.remove_traversal_algorithm(this);
}
void traversal_algorithm::status(dht_lookup& l)
{
l.timeouts = m_timeouts;
l.responses = m_responses;
l.outstanding_requests = m_invoke_count;
l.branch_factor = m_branch_factor;
l.type = name();
}
std::vector<traversal_algorithm::result>::iterator traversal_algorithm::last_iterator()
{
return (int)m_results.size() >= m_max_results ?
m_results.begin() + m_max_results
: m_results.end();
int max_results = m_node.m_table.bucket_size();
return (int)m_results.size() >= max_results ?
m_results.begin() + max_results : m_results.end();
}
} } // namespace libtorrent::dht

View File

@ -1084,8 +1084,8 @@ namespace aux {
}
// drain the IP overhead from the bandwidth limiters
m_download_channel.drain(m_stat.download_ip_overhead());
m_upload_channel.drain(m_stat.upload_ip_overhead());
m_download_channel.drain(m_stat.download_ip_overhead() + m_stat.download_dht());
m_upload_channel.drain(m_stat.upload_ip_overhead() + m_stat.upload_dht());
m_stat.second_tick(tick_interval);
@ -1773,7 +1773,11 @@ namespace aux {
void session_impl::check_torrent(boost::shared_ptr<torrent> const& t)
{
if (m_abort) return;
TORRENT_ASSERT(!t->is_paused() || t->is_auto_managed());
TORRENT_ASSERT(t->state() == torrent_status::checking_files);
if (m_queued_for_checking.empty()) t->start_checking();
TORRENT_ASSERT(std::find(m_queued_for_checking.begin()
, m_queued_for_checking.end(), t) == m_queued_for_checking.end());
m_queued_for_checking.push_back(t);
}
@ -1978,20 +1982,29 @@ namespace aux {
s.has_incoming_connections = m_incoming_connection;
// total
s.download_rate = m_stat.download_rate();
s.total_upload = m_stat.total_upload();
s.upload_rate = m_stat.upload_rate();
s.total_download = m_stat.total_download();
s.payload_download_rate = m_stat.download_payload_rate();
s.payload_upload_rate = m_stat.upload_payload_rate();
// payload
s.payload_download_rate = m_stat.transfer_rate(stat::download_payload);
s.total_payload_download = m_stat.total_transfer(stat::download_payload);
s.payload_upload_rate = m_stat.transfer_rate(stat::upload_payload);
s.total_payload_upload = m_stat.total_transfer(stat::upload_payload);
s.total_download = m_stat.total_protocol_download()
+ m_stat.total_payload_download();
// IP-overhead
s.ip_overhead_download_rate = m_stat.transfer_rate(stat::download_ip_protocol);
s.total_ip_overhead_download = m_stat.total_transfer(stat::download_ip_protocol);
s.ip_overhead_upload_rate = m_stat.transfer_rate(stat::upload_ip_protocol);
s.total_ip_overhead_upload = m_stat.total_transfer(stat::upload_ip_protocol);
s.total_upload = m_stat.total_protocol_upload()
+ m_stat.total_payload_upload();
s.total_payload_download = m_stat.total_payload_download();
s.total_payload_upload = m_stat.total_payload_upload();
// DHT protocol
s.dht_download_rate = m_stat.transfer_rate(stat::download_dht_protocol);
s.total_dht_download = m_stat.total_transfer(stat::download_dht_protocol);
s.dht_upload_rate = m_stat.transfer_rate(stat::upload_dht_protocol);
s.total_dht_upload = m_stat.total_transfer(stat::upload_dht_protocol);
#ifndef TORRENT_DISABLE_DHT
if (m_dht)
@ -2050,7 +2063,7 @@ namespace aux {
, m_dht_settings.service_port
, m_dht_settings.service_port);
}
m_dht = new dht::dht_tracker(m_dht_socket, m_dht_settings);
m_dht = new dht::dht_tracker(*this, m_dht_socket, m_dht_settings);
if (!m_dht_socket.is_open() || m_dht_socket.local_port() != m_dht_settings.service_port)
{
m_dht_socket.bind(m_dht_settings.service_port);

View File

@ -655,7 +655,8 @@ namespace libtorrent
{
// either the fastresume data was rejected or there are
// some files
m_ses.check_torrent(shared_from_this());
if (!is_torrent_paused() || is_auto_managed())
m_ses.check_torrent(shared_from_this());
}
std::vector<char>().swap(m_resume_data);
@ -712,7 +713,8 @@ namespace libtorrent
pause();
return;
}
m_ses.check_torrent(shared_from_this());
if (!is_torrent_paused() || is_auto_managed())
m_ses.check_torrent(shared_from_this());
}
void torrent::start_checking()
@ -3726,10 +3728,18 @@ namespace libtorrent
INVARIANT_CHECK;
if (m_auto_managed == a) return;
bool checking_files = should_check_files();
m_auto_managed = a;
// recalculate which torrents should be
// paused
m_ses.m_auto_manage_time_scaler = 0;
if (!checking_files && should_check_files())
m_ses.check_torrent(shared_from_this());
else if (checking_files && !should_check_files())
{
// TODO: pause checking
}
}
// the higher seed rank, the more important to seed
@ -3822,6 +3832,12 @@ namespace libtorrent
}
}
bool torrent::should_check_files() const
{
return m_state == torrent_status::checking_files
&& (!is_paused() || m_auto_managed);
}
bool torrent::is_paused() const
{
return m_paused || m_ses.is_paused();
@ -3832,9 +3848,14 @@ namespace libtorrent
INVARIANT_CHECK;
if (m_paused) return;
bool checking_files = should_check_files();
m_paused = true;
if (m_ses.is_paused()) return;
do_pause();
if (checking_files && !should_check_files())
{
// TODO: pause checking
}
}
void torrent::do_pause()
@ -3887,8 +3908,11 @@ namespace libtorrent
INVARIANT_CHECK;
if (!m_paused) return;
bool checking_files = should_check_files();
m_paused = false;
do_resume();
if (!checking_files && should_check_files())
m_ses.check_torrent(shared_from_this());
}
void torrent::do_resume()