fixed bug in allocate_resources, made the refresh timers more accurate and tries to even out the load bu not refreshing more than one bucket at a time

This commit is contained in:
Arvid Norberg 2006-08-06 16:36:00 +00:00
parent d17082aa1a
commit 2daaa9f8d2
8 changed files with 108 additions and 43 deletions

View File

@ -77,7 +77,8 @@ namespace libtorrent { namespace dht
void on_name_lookup(asio::error const& e
, udp::resolver::iterator host);
void second_tick(asio::error const& e);
void connection_timeout(asio::error const& e);
void refresh_timeout(asio::error const& e);
void tick(asio::error const& e);
// translate bittorrent kademlia message into the generice kademlia message
@ -100,7 +101,8 @@ namespace libtorrent { namespace dht
boost::posix_time::ptime m_last_refresh;
deadline_timer m_timer;
deadline_timer m_second_timer;
deadline_timer m_connection_timer;
deadline_timer m_refresh_timer;
dht_settings const& m_settings;
int m_refresh_bucket;
@ -121,6 +123,11 @@ namespace libtorrent { namespace dht
int m_lt_message_input;
int m_mp_message_input;
int m_gr_message_input;
int m_total_in_bytes;
int m_total_out_bytes;
int m_queries_out_bytes;
#endif
};
}}

View File

@ -129,13 +129,11 @@ public:
bool verify_token(msg const& m);
entry generate_token(msg const& m);
// the returned time is the delay until tick should be called
// again the next time
boost::posix_time::time_duration tick();
// checks the buckets for any that needs refreshing
void check_refresh();
// the returned time is the delay until connection_timeout()
// should be called again the next time
boost::posix_time::time_duration connection_timeout();
boost::posix_time::time_duration refresh_timeout();
// generates a new secret number used to generate write tokens
void new_write_key();

View File

@ -163,10 +163,11 @@ public:
// of its bucket.
bool node_seen(node_id const& id, udp::endpoint addr);
// returns true if the given bucket is empty but there are nodes
// returns time when the given bucket needs another refresh.
// if the given bucket is empty but there are nodes
// in a bucket closer to us, or if the bucket is non-empty and
// the time from the last activity is more than 15 minutes
bool should_refresh(int bucket);
boost::posix_time::ptime next_refresh(int bucket);
// fills the vector with the count nodes from our buckets that
// are nearest to the given id.
@ -178,6 +179,10 @@ public:
// this function returns false
bool need_node(node_id const& id);
// this will set the given bucket's latest activity
// to the current time
void touch_bucket(int bucket);
int bucket_size(int bucket)
{
assert(bucket >= 0 && bucket < 160);

View File

@ -232,9 +232,10 @@ namespace libtorrent
size_type used = (size_type)r.used + 1;
if (used < 1) used = 1;
size_type to_give = used * kNumer / kDenom;
if(to_give > resource_request::inf)
to_give = resource_request::inf;
if (to_give > resources_to_distribute)
to_give = resources_to_distribute;
assert(to_give >= 0);
assert(to_give <= resources_to_distribute);
resources_to_distribute -= give(r, (int)to_give);
assert(resources_to_distribute >= 0);
}

View File

@ -75,7 +75,7 @@ typedef asio::ip::address_v4 address;
namespace
{
const int tick_period = 5; // minutes
const int tick_period = 1; // minutes
struct count_peers
{
@ -123,7 +123,8 @@ namespace libtorrent { namespace dht
, m_buffer(0)
, m_last_refresh(second_clock::universal_time() - hours(1))
, m_timer(m_demuxer)
, m_second_timer(m_demuxer)
, m_connection_timer(m_demuxer)
, m_refresh_timer(m_demuxer)
, m_settings(settings)
, m_refresh_bucket(160)
, m_host_resolver(d)
@ -145,6 +146,9 @@ namespace libtorrent { namespace dht
m_lt_message_input = 0;
m_mp_message_input = 0;
m_gr_message_input = 0;
m_total_in_bytes = 0;
m_total_out_bytes = 0;
m_queries_out_bytes = 0;
// turns on and off individual components' logging
@ -178,17 +182,33 @@ namespace libtorrent { namespace dht
m_timer.expires_from_now(seconds(1));
m_timer.async_wait(bind(&dht_tracker::tick, this, _1));
m_second_timer.expires_from_now(seconds(10));
m_second_timer.async_wait(bind(&dht_tracker::second_tick, this, _1));
m_connection_timer.expires_from_now(seconds(10));
m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1));
m_refresh_timer.expires_from_now(minutes(15));
m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1));
}
void dht_tracker::second_tick(asio::error const& e)
void dht_tracker::connection_timeout(asio::error const& e)
try
{
if (e) return;
time_duration d = m_dht.tick();
m_second_timer.expires_from_now(d);
m_second_timer.async_wait(bind(&dht_tracker::second_tick, this, _1));
time_duration d = m_dht.connection_timeout();
m_connection_timer.expires_from_now(d);
m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, this, _1));
}
catch (std::exception&)
{
assert(false);
};
void dht_tracker::refresh_timeout(asio::error const& e)
try
{
if (e) return;
time_duration d = m_dht.refresh_timeout();
m_refresh_timer.expires_from_now(d);
m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, this, _1));
}
catch (std::exception&)
{
@ -203,7 +223,6 @@ namespace libtorrent { namespace dht
m_timer.async_wait(bind(&dht_tracker::tick, this, _1));
m_dht.new_write_key();
m_dht.check_refresh();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
static bool first = true;
@ -243,7 +262,8 @@ namespace libtorrent { namespace dht
":num torrents:num peers:announces per min"
":failed announces per min:total msgs per min"
":ut msgs per min:lt msgs per min:mp msgs per min"
":gr msgs per min\n\n";
":gr msgs per min:bytes in per sec:bytes out per sec"
":queries out bytes per sec\n\n";
}
int active;
@ -267,6 +287,9 @@ namespace libtorrent { namespace dht
<< "\t" << (m_lt_message_input / float(tick_period))
<< "\t" << (m_mp_message_input / float(tick_period))
<< "\t" << (m_gr_message_input / float(tick_period))
<< "\t" << (m_total_in_bytes / float(tick_period*60))
<< "\t" << (m_total_out_bytes / float(tick_period*60))
<< "\t" << (m_queries_out_bytes / float(tick_period*60))
<< std::endl;
++m_counter;
std::fill_n(m_replies_bytes_sent, 5, 0);
@ -278,6 +301,9 @@ namespace libtorrent { namespace dht
m_total_message_input = 0;
m_ut_message_input = 0;
m_lt_message_input = 0;
m_total_in_bytes = 0;
m_total_out_bytes = 0;
m_queries_out_bytes = 0;
#endif
}
catch (std::exception&)
@ -309,6 +335,7 @@ namespace libtorrent { namespace dht
#ifdef TORRENT_DHT_VERBOSE_LOGGING
++m_total_message_input;
m_total_in_bytes += bytes_transferred;
#endif
try
@ -751,11 +778,17 @@ namespace libtorrent { namespace dht
, (int)m_send_buf.size()), m.addr);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size();
if (m.reply)
{
++m_replies_sent[m.message_id];
m_replies_bytes_sent[m.message_id] += int(m_send_buf.size());
}
else
{
m_queries_out_bytes += m_send_buf.size();
}
TORRENT_LOG(dht_tracker) << e;
#endif

View File

@ -167,17 +167,6 @@ entry node_impl::generate_token(msg const& m)
return entry(token);
}
void node_impl::check_refresh()
{
for (int i = 0; i < 160; ++i)
{
if (m_table.should_refresh(i))
{
refresh_bucket(i);
}
}
}
void node_impl::refresh(node_id const& id
, boost::function0<void> f)
{
@ -256,6 +245,7 @@ void node_impl::refresh_bucket(int bucket)
refresh::initiate(target, m_settings.search_branching, 10, m_table.bucket_size()
, m_table, start.begin(), start.end(), m_rpc, bind(&nop));
m_table.touch_bucket(bucket);
}
@ -377,7 +367,35 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
, info_hash, f));
}
time_duration node_impl::tick()
time_duration node_impl::refresh_timeout()
{
int refresh = -1;
ptime now = second_clock::universal_time();
ptime next = now + minutes(15);
for (int i = 0; i < 160; ++i)
{
ptime r = m_table.next_refresh(i);
if (r <= now)
{
if (refresh == -1) refresh = i;
}
else if (r < next)
{
next = r;
}
}
if (refresh != -1)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "refreshing bucket: " << refresh;
#endif
refresh_bucket(refresh);
}
if (next < now + seconds(5)) return seconds(5);
return next - now;
}
time_duration node_impl::connection_timeout()
{
time_duration d = m_rpc.tick();

View File

@ -135,17 +135,20 @@ void routing_table::print_state(std::ostream& os) const
}
}
bool routing_table::should_refresh(int bucket)
void routing_table::touch_bucket(int bucket)
{
m_bucket_activity[bucket] = second_clock::universal_time();
}
boost::posix_time::ptime routing_table::next_refresh(int bucket)
{
assert(bucket < 160);
assert(bucket >= 0);
// lower than or equal to since a refresh of bucket 0 will
// effectively refresh the lowest active bucket as well
if (bucket <= m_lowest_active_bucket && bucket > 0) return false;
if (m_bucket_activity[bucket] + minutes(15)
> second_clock::universal_time())
return false;
return true;
if (bucket <= m_lowest_active_bucket && bucket > 0)
return second_clock::universal_time() + minutes(15);
return m_bucket_activity[bucket] + minutes(15);
}
void routing_table::replacement_cache(bucket_t& nodes) const

View File

@ -643,8 +643,7 @@ namespace libtorrent
{
os << "trackers:\n";
for (std::vector<announce_entry>::const_iterator i = trackers().begin();
i != trackers().end();
++i)
i != trackers().end(); ++i)
{
os << i->tier << ": " << i->url << "\n";
}
@ -652,6 +651,7 @@ namespace libtorrent
os << "comment: " << m_comment << "\n";
if (m_creation_date != ptime(date(not_a_date_time)))
os << "creation date: " << to_simple_string(m_creation_date) << "\n";
os << "private: " << (m_private?"yes":"no") << "\n";
os << "number of pieces: " << num_pieces() << "\n";
os << "piece length: " << piece_length() << "\n";
os << "files:\n";