fixed deadlock in dht code

This commit is contained in:
Arvid Norberg 2008-11-02 10:01:04 +00:00
parent 0d8f74cf78
commit f2c9e02994
4 changed files with 36 additions and 12 deletions

View File

@ -90,6 +90,7 @@ namespace libtorrent { namespace dht
, sha1_hash const&)> f); , sha1_hash const&)> f);
void dht_status(session_status& s); void dht_status(session_status& s);
void network_stats(int& sent, int& received);
// translate bittorrent kademlia message into the generic kademlia message // translate bittorrent kademlia message into the generic kademlia message
// used by the library // used by the library
@ -134,6 +135,10 @@ namespace libtorrent { namespace dht
// used to resolve hostnames for nodes // used to resolve hostnames for nodes
udp::resolver m_host_resolver; udp::resolver m_host_resolver;
// sent and received bytes since queried last time
int m_sent_bytes;
int m_received_bytes;
// used to ignore abusive dht nodes // used to ignore abusive dht nodes
struct node_ban_entry struct node_ban_entry
{ {

View File

@ -146,16 +146,12 @@ namespace libtorrent
{ {
TORRENT_ASSERT(bytes >= 0); TORRENT_ASSERT(bytes >= 0);
m_stat[download_dht_protocol].add(bytes); m_stat[download_dht_protocol].add(bytes);
// assuming IPv4 and UDP headers
m_stat[download_ip_protocol].add(28);
} }
void sent_dht_bytes(int bytes) void sent_dht_bytes(int bytes)
{ {
TORRENT_ASSERT(bytes >= 0); TORRENT_ASSERT(bytes >= 0);
m_stat[upload_dht_protocol].add(bytes); m_stat[upload_dht_protocol].add(bytes);
// assuming IPv4 and UDP headers
m_stat[upload_ip_protocol].add(28);
} }
void received_tracker_bytes(int bytes) void received_tracker_bytes(int bytes)

View File

@ -179,6 +179,7 @@ namespace libtorrent { namespace dht
{ {
std::vector<udp::endpoint> initial_nodes; std::vector<udp::endpoint> initial_nodes;
mutex_t::scoped_lock l(m_mutex);
if (bootstrap.type() == entry::dictionary_t) if (bootstrap.type() == entry::dictionary_t)
{ {
try try
@ -221,12 +222,22 @@ namespace libtorrent { namespace dht
void dht_tracker::dht_status(session_status& s) void dht_tracker::dht_status(session_status& s)
{ {
mutex_t::scoped_lock l(m_mutex);
boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size(); boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size();
s.dht_torrents = m_dht.data_size(); s.dht_torrents = m_dht.data_size();
s.dht_global_nodes = m_dht.num_global_nodes(); s.dht_global_nodes = m_dht.num_global_nodes();
m_dht.status(s); m_dht.status(s);
} }
void dht_tracker::network_stats(int& sent, int& received)
{
mutex_t::scoped_lock l(m_mutex);
sent = m_sent_bytes;
received = m_received_bytes;
m_sent_bytes = 0;
m_received_bytes = 0;
}
void dht_tracker::connection_timeout(error_code const& e) void dht_tracker::connection_timeout(error_code const& e)
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_mutex);
@ -355,12 +366,14 @@ namespace libtorrent { namespace dht
, boost::function<void(std::vector<tcp::endpoint> const& , boost::function<void(std::vector<tcp::endpoint> const&
, sha1_hash const&)> f) , sha1_hash const&)> f)
{ {
mutex_t::scoped_lock l(m_mutex);
m_dht.announce(ih, listen_port, f); m_dht.announce(ih, listen_port, f);
} }
void dht_tracker::on_unreachable(udp::endpoint const& ep) void dht_tracker::on_unreachable(udp::endpoint const& ep)
{ {
mutex_t::scoped_lock l(m_mutex);
m_dht.unreachable(ep); m_dht.unreachable(ep);
} }
@ -368,10 +381,9 @@ namespace libtorrent { namespace dht
// used by the library // used by the library
void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred) void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred)
{ {
{ mutex_t::scoped_lock l(m_mutex);
libtorrent::aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); // account for IP and UDP overhead
m_ses.m_stat.received_dht_bytes(bytes_transferred); m_received_bytes += bytes_transferred + 28;
}
node_ban_entry* match = 0; node_ban_entry* match = 0;
node_ban_entry* min = m_ban_nodes; node_ban_entry* min = m_ban_nodes;
@ -759,6 +771,7 @@ namespace libtorrent { namespace dht
entry dht_tracker::state() const entry dht_tracker::state() const
{ {
mutex_t::scoped_lock l(m_mutex);
entry ret(entry::dictionary_t); entry ret(entry::dictionary_t);
{ {
entry nodes(entry::list_t); entry nodes(entry::list_t);
@ -790,11 +803,13 @@ namespace libtorrent { namespace dht
void dht_tracker::add_node(udp::endpoint node) void dht_tracker::add_node(udp::endpoint node)
{ {
mutex_t::scoped_lock l(m_mutex);
m_dht.add_node(node); m_dht.add_node(node);
} }
void dht_tracker::add_node(std::pair<std::string, int> const& node) void dht_tracker::add_node(std::pair<std::string, int> const& node)
{ {
mutex_t::scoped_lock l(m_mutex);
udp::resolver::query q(node.first, lexical_cast<std::string>(node.second)); udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
m_host_resolver.async_resolve(q, m_host_resolver.async_resolve(q,
bind(&dht_tracker::on_name_lookup, self(), _1, _2)); bind(&dht_tracker::on_name_lookup, self(), _1, _2));
@ -809,6 +824,7 @@ namespace libtorrent { namespace dht
void dht_tracker::add_router_node(std::pair<std::string, int> const& node) void dht_tracker::add_router_node(std::pair<std::string, int> const& node)
{ {
mutex_t::scoped_lock l(m_mutex);
udp::resolver::query q(node.first, lexical_cast<std::string>(node.second)); udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
m_host_resolver.async_resolve(q, m_host_resolver.async_resolve(q,
bind(&dht_tracker::on_router_name_lookup, self(), _1, _2)); bind(&dht_tracker::on_router_name_lookup, self(), _1, _2));
@ -1003,10 +1019,8 @@ namespace libtorrent { namespace dht
error_code ec; error_code ec;
m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec); m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec);
{ // account for IP and UDP overhead
libtorrent::aux::session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); m_sent_bytes += m_send_buf.size() + 28;
m_ses.m_stat.sent_dht_bytes(m_send_buf.size());
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size(); m_total_out_bytes += m_send_buf.size();

View File

@ -1108,6 +1108,15 @@ namespace aux {
++i; ++i;
} }
if (m_dht)
{
int dht_down;
int dht_up;
m_dht->network_stats(dht_up, dht_down);
m_stat.sent_dht_bytes(dht_up);
m_stat.received_dht_bytes(dht_down);
}
// drain the IP overhead from the bandwidth limiters // drain the IP overhead from the bandwidth limiters
m_download_channel.drain( m_download_channel.drain(
m_stat.download_ip_overhead() m_stat.download_ip_overhead()