fix request queue size performance issue

This commit is contained in:
arvidn 2015-07-20 08:20:17 -07:00
parent 9231476ac6
commit 6c5aec3476
6 changed files with 105 additions and 43 deletions

View File

@ -72,6 +72,7 @@
* almost completely changed the storage interface (for custom storage) * almost completely changed the storage interface (for custom storage)
* added support for hashing pieces in multiple threads * added support for hashing pieces in multiple threads
* fix request queue size performance issue
* fix http scrape * fix http scrape
* add missing port mapping functions to python binding * add missing port mapping functions to python binding
* fix bound-checking issue in bdecoder * fix bound-checking issue in bdecoder

View File

@ -127,7 +127,7 @@ namespace libtorrent
// unexpectedly from the peer // unexpectedly from the peer
boost::uint32_t not_wanted:1; boost::uint32_t not_wanted:1;
boost::uint32_t timed_out:1; boost::uint32_t timed_out:1;
// the busy flag is set if the block was // the busy flag is set if the block was
// requested from another peer when this // requested from another peer when this
// request was queued. We only allow a single // request was queued. We only allow a single
@ -207,7 +207,7 @@ namespace libtorrent
// take a raw pointer. torrent objects should always // take a raw pointer. torrent objects should always
// outlive their peers // outlive their peers
boost::weak_ptr<torrent> m_torrent; boost::weak_ptr<torrent> m_torrent;
public: public:
// a back reference to the session // a back reference to the session
@ -216,7 +216,7 @@ namespace libtorrent
// settings that apply to this peer // settings that apply to this peer
aux::session_settings const& m_settings; aux::session_settings const& m_settings;
protected: protected:
// this is true if this connection has been added // this is true if this connection has been added
@ -967,6 +967,12 @@ namespace libtorrent
// for the round-robin unchoke algorithm. // for the round-robin unchoke algorithm.
boost::int64_t m_uploaded_at_last_unchoke; boost::int64_t m_uploaded_at_last_unchoke;
// the number of payload bytes downloaded last second tick
boost::int32_t m_downloaded_last_second;
// the number of payload bytes uploaded last second tick
boost::int32_t m_uploaded_last_second;
// the number of bytes that the other // the number of bytes that the other
// end has to send us in order to respond // end has to send us in order to respond
// to all outstanding piece requests we // to all outstanding piece requests we
@ -1113,6 +1119,7 @@ namespace libtorrent
// the number of requests we should queue up // the number of requests we should queue up
// at the remote end. // at the remote end.
// TODO: 2 rename this target queue size
boost::uint16_t m_desired_queue_size; boost::uint16_t m_desired_queue_size;
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
@ -1229,6 +1236,12 @@ namespace libtorrent
// other peers to compare it to. // other peers to compare it to.
bool m_exceeded_limit:1; bool m_exceeded_limit:1;
// this is slow-start at the bittorrent layer. It affects how we increase
// desired queue size (i.e. the number of outstanding requests we keep).
// While the underlying transport protocol is in slow-start, the number of
// outstanding requests needs to increase at the same pace to keep up.
bool m_slow_start:1;
// TODO: 3 factor this out into its own header and use it for UDP socket // TODO: 3 factor this out into its own header and use it for UDP socket
// and maybe second_timer as well // and maybe second_timer as well
template <class Handler, std::size_t Size> template <class Handler, std::size_t Size>

View File

@ -145,6 +145,8 @@ namespace libtorrent
, m_downloaded_at_last_round(0) , m_downloaded_at_last_round(0)
, m_uploaded_at_last_round(0) , m_uploaded_at_last_round(0)
, m_uploaded_at_last_unchoke(0) , m_uploaded_at_last_unchoke(0)
, m_downloaded_last_second(0)
, m_uploaded_last_second(0)
, m_outstanding_bytes(0) , m_outstanding_bytes(0)
, m_last_seen_complete(0) , m_last_seen_complete(0)
, m_receiving_block(piece_block::invalid) , m_receiving_block(piece_block::invalid)
@ -159,7 +161,7 @@ namespace libtorrent
, m_download_rate_peak(0) , m_download_rate_peak(0)
, m_upload_rate_peak(0) , m_upload_rate_peak(0)
, m_send_barrier(INT_MAX) , m_send_barrier(INT_MAX)
, m_desired_queue_size(2) , m_desired_queue_size(4)
, m_prefer_contiguous_blocks(0) , m_prefer_contiguous_blocks(0)
, m_disk_read_failures(0) , m_disk_read_failures(0)
, m_outstanding_piece_verification(0) , m_outstanding_piece_verification(0)
@ -181,6 +183,7 @@ namespace libtorrent
, m_need_interest_update(false) , m_need_interest_update(false)
, m_has_metadata(true) , m_has_metadata(true)
, m_exceeded_limit(false) , m_exceeded_limit(false)
, m_slow_start(true)
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
, m_in_constructor(true) , m_in_constructor(true)
, m_disconnect_started(false) , m_disconnect_started(false)
@ -2611,6 +2614,10 @@ namespace libtorrent
if (!m_bitfield_received) incoming_have_none(); if (!m_bitfield_received) incoming_have_none();
if (is_disconnecting()) return; if (is_disconnecting()) return;
// slow-start
if (m_slow_start)
m_desired_queue_size += 1;
update_desired_queue_size(); update_desired_queue_size();
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
@ -3687,6 +3694,7 @@ namespace libtorrent
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t->ready_for_connections()) return; if (!t->ready_for_connections()) return;
m_interesting = false; m_interesting = false;
m_slow_start = false;
m_counters.inc_stats_counter(counters::num_peers_down_interested, -1); m_counters.inc_stats_counter(counters::num_peers_down_interested, -1);
disconnect_if_redundant(); disconnect_if_redundant();
@ -3711,7 +3719,7 @@ namespace libtorrent
// don't suggest anything to a peer that isn't interested // don't suggest anything to a peer that isn't interested
if (has_piece(piece) || !m_peer_interested) if (has_piece(piece) || !m_peer_interested)
return; return;
// we cannot suggest a piece we don't have! // we cannot suggest a piece we don't have!
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
@ -4536,23 +4544,30 @@ namespace libtorrent
m_desired_queue_size = 1; m_desired_queue_size = 1;
return; return;
} }
int download_rate = statistics().download_payload_rate(); int download_rate = statistics().download_payload_rate();
// calculate the desired download queue size // the desired download queue size
const int queue_time = m_settings.get_int(settings_pack::request_queue_time); const int queue_time = m_settings.get_int(settings_pack::request_queue_time);
// (if the latency is more than this, the download will stall)
// so, the queue size is queue_time * down_rate / 16 kiB
// (16 kB is the size of each request)
// the minimum number of requests is 2 and the maximum is 48
// the block size doesn't have to be 16. So we first query the
// torrent for it
boost::shared_ptr<torrent> t = m_torrent.lock();
const int block_size = t->block_size();
TORRENT_ASSERT(block_size > 0); // when we're in slow-start mode we increase the desired queue size every
// time we receive a piece, no need to adjust it here (other than
// enforcing the upper limit)
if (!m_slow_start)
{
// (if the latency is more than this, the download will stall)
// so, the queue size is queue_time * down_rate / 16 kiB
// (16 kB is the size of each request)
// the minimum number of requests is 2 and the maximum is 48
// the block size doesn't have to be 16. So we first query the
// torrent for it
boost::shared_ptr<torrent> t = m_torrent.lock();
const int block_size = t->block_size();
m_desired_queue_size = queue_time * download_rate / block_size; TORRENT_ASSERT(block_size > 0);
m_desired_queue_size = queue_time * download_rate / block_size;
}
if (m_desired_queue_size > m_max_out_request_queue) if (m_desired_queue_size > m_max_out_request_queue)
m_desired_queue_size = m_max_out_request_queue; m_desired_queue_size = m_max_out_request_queue;
@ -4561,9 +4576,9 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE"
, "dqs: %d max: %d dl: %d qt: %d snubbed: %d" , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d"
, m_desired_queue_size, m_max_out_request_queue , m_desired_queue_size, m_max_out_request_queue
, download_rate, queue_time, int(m_snubbed)); , download_rate, queue_time, int(m_snubbed), int(m_slow_start));
#endif #endif
} }
@ -4787,6 +4802,23 @@ namespace libtorrent
// if we haven't sent something in too long, send a keep-alive // if we haven't sent something in too long, send a keep-alive
keep_alive(); keep_alive();
// if our download rate isn't increasing significantly anymore, end slow
// start. The 10kB is to have some slack here.
if (m_slow_start && m_downloaded_last_second > 0
&& m_downloaded_last_second + 10000
>= m_statistics.last_payload_downloaded())
{
m_slow_start = false;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "SLOW_START", "exit slow start: "
"prev-dl: %d dl: %d"
, int(m_downloaded_last_second)
, int(m_statistics.last_payload_downloaded()));
#endif
}
m_downloaded_last_second = m_statistics.last_payload_downloaded();
m_uploaded_last_second = m_statistics.last_payload_uploaded();
m_statistics.second_tick(tick_interval_ms); m_statistics.second_tick(tick_interval_ms);
if (m_statistics.upload_payload_rate() > m_upload_rate_peak) if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
@ -4860,6 +4892,7 @@ namespace libtorrent
if (!m_snubbed) if (!m_snubbed)
{ {
m_snubbed = true; m_snubbed = true;
m_slow_start = false;
if (t->alerts().should_post<peer_snubbed_alert>()) if (t->alerts().should_post<peer_snubbed_alert>())
{ {
t->alerts().emplace_alert<peer_snubbed_alert>(t->get_handle() t->alerts().emplace_alert<peer_snubbed_alert>(t->get_handle()
@ -4984,9 +5017,7 @@ namespace libtorrent
// only add new piece-chunks if the send buffer is small enough // only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be! // otherwise there will be no end to how large it will be!
boost::uint64_t upload_rate = m_statistics.upload_rate(); int buffer_size_watermark = int(m_uploaded_last_second
int buffer_size_watermark = int(upload_rate
* m_settings.get_int(settings_pack::send_buffer_watermark_factor) / 100); * m_settings.get_int(settings_pack::send_buffer_watermark_factor) / 100);
if (buffer_size_watermark < m_settings.get_int(settings_pack::send_buffer_low_watermark)) if (buffer_size_watermark < m_settings.get_int(settings_pack::send_buffer_low_watermark))
@ -5000,12 +5031,12 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::outgoing, "SEND_BUFFER_WATERMARK" peer_log(peer_log_alert::outgoing, "SEND_BUFFER_WATERMARK"
, "current watermark: %d max: %d min: %d factor: %d upload-rate: %d B/s" , "current watermark: %d max: %d min: %d factor: %d uploaded: %d B/s"
, buffer_size_watermark , buffer_size_watermark
, m_ses.settings().get_int(settings_pack::send_buffer_watermark) , m_ses.settings().get_int(settings_pack::send_buffer_watermark)
, m_ses.settings().get_int(settings_pack::send_buffer_low_watermark) , m_ses.settings().get_int(settings_pack::send_buffer_low_watermark)
, m_ses.settings().get_int(settings_pack::send_buffer_watermark_factor) , m_ses.settings().get_int(settings_pack::send_buffer_watermark_factor)
, int(upload_rate)); , int(m_uploaded_last_second));
#endif #endif
// don't just pop the front element here, since in seed mode one request may // don't just pop the front element here, since in seed mode one request may

View File

@ -245,7 +245,7 @@ namespace libtorrent
SET(suggest_mode, settings_pack::no_piece_suggestions, 0), SET(suggest_mode, settings_pack::no_piece_suggestions, 0),
SET(max_queued_disk_bytes, 1024 * 1024, &session_impl::update_queued_disk_bytes), SET(max_queued_disk_bytes, 1024 * 1024, &session_impl::update_queued_disk_bytes),
SET(handshake_timeout, 10, 0), SET(handshake_timeout, 10, 0),
SET(send_buffer_low_watermark, 512, 0), SET(send_buffer_low_watermark, 10 * 1024, 0),
SET(send_buffer_watermark, 500 * 1024, 0), SET(send_buffer_watermark, 500 * 1024, 0),
SET(send_buffer_watermark_factor, 50, 0), SET(send_buffer_watermark_factor, 50, 0),
SET(choking_algorithm, settings_pack::fixed_slots_choker, 0), SET(choking_algorithm, settings_pack::fixed_slots_choker, 0),
@ -304,7 +304,7 @@ namespace libtorrent
SET(connections_limit, 200, &session_impl::update_connections_limit), SET(connections_limit, 200, &session_impl::update_connections_limit),
SET(connections_slack, 10, 0), SET(connections_slack, 10, 0),
SET(utp_target_delay, 100, 0), SET(utp_target_delay, 100, 0),
SET(utp_gain_factor, 1500, 0), SET(utp_gain_factor, 3000, 0),
SET(utp_min_timeout, 500, 0), SET(utp_min_timeout, 500, 0),
SET(utp_syn_resends, 2, 0), SET(utp_syn_resends, 2, 0),
SET(utp_fin_resends, 2, 0), SET(utp_fin_resends, 2, 0),

View File

@ -443,7 +443,7 @@ public:
// it can also happen if the other end sends an advertized window // it can also happen if the other end sends an advertized window
// size less than one MSS. // size less than one MSS.
time_point m_timeout; time_point m_timeout;
// the last time we stepped the timestamp history // the last time we stepped the timestamp history
time_point m_last_history_step; time_point m_last_history_step;
@ -648,7 +648,7 @@ public:
// this is done at startup of a socket in order to find its // this is done at startup of a socket in order to find its
// link capacity faster. This behaves similar to TCP slow start // link capacity faster. This behaves similar to TCP slow start
bool m_slow_start:1; bool m_slow_start:1;
// this is true as long as we have as many packets in // this is true as long as we have as many packets in
// flight as allowed by the congestion window (cwnd) // flight as allowed by the congestion window (cwnd)
bool m_cwnd_full:1; bool m_cwnd_full:1;
@ -1699,7 +1699,9 @@ private:
// congestion window, false if there is no more space. // congestion window, false if there is no more space.
bool utp_socket_impl::send_pkt(int flags) bool utp_socket_impl::send_pkt(int flags)
{ {
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK; INVARIANT_CHECK;
#endif
bool force = (flags & pkt_ack) || (flags & pkt_fin); bool force = (flags & pkt_ack) || (flags & pkt_fin);
@ -2254,8 +2256,15 @@ void utp_socket_impl::experienced_loss(int seq_nr)
// less than or equal to. If we experience loss of the // less than or equal to. If we experience loss of the
// same packet again, ignore it. // same packet again, ignore it.
if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return; if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return;
// cut window size in 2
m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16));
m_loss_seq_nr = m_seq_nr;
UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", this, seq_nr);
// if we happen to be in slow-start mode, we need to leave it // if we happen to be in slow-start mode, we need to leave it
// note that we set ssthres to the window size _after_ reducing it. Next slow
// start should end before we overshoot.
if (m_slow_start) if (m_slow_start)
{ {
m_ssthres = m_cwnd >> 16; m_ssthres = m_cwnd >> 16;
@ -2263,14 +2272,9 @@ void utp_socket_impl::experienced_loss(int seq_nr)
UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", this); UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", this);
} }
// cut window size in 2
m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16));
m_loss_seq_nr = m_seq_nr;
UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", this, seq_nr);
// the window size could go below one MSS here, if it does, // the window size could go below one MSS here, if it does,
// we'll get a timeout in about one second // we'll get a timeout in about one second
m_sm->inc_stats_counter(counters::utp_packet_loss); m_sm->inc_stats_counter(counters::utp_packet_loss);
} }
@ -2285,7 +2289,9 @@ void utp_socket_impl::set_state(int s)
void utp_socket_impl::maybe_inc_acked_seq_nr() void utp_socket_impl::maybe_inc_acked_seq_nr()
{ {
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK; INVARIANT_CHECK;
#endif
bool incremented = false; bool incremented = false;
// don't pass m_seq_nr, since we move into sequence // don't pass m_seq_nr, since we move into sequence
@ -2317,7 +2323,9 @@ void utp_socket_impl::maybe_inc_acked_seq_nr()
void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
, boost::uint32_t& min_rtt, boost::uint16_t seq_nr) , boost::uint32_t& min_rtt, boost::uint16_t seq_nr)
{ {
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK; INVARIANT_CHECK;
#endif
TORRENT_ASSERT(p); TORRENT_ASSERT(p);
@ -2364,7 +2372,9 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p
, time_point /* now */) , time_point /* now */)
{ {
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK; INVARIANT_CHECK;
#endif
while (!m_read_buffer.empty()) while (!m_read_buffer.empty())
{ {
@ -3091,7 +3101,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
// it's impossible for delay to be more than the RTT, so make // it's impossible for delay to be more than the RTT, so make
// sure to clamp it as a sanity check // sure to clamp it as a sanity check
if (delay > min_rtt) delay = min_rtt; if (delay > min_rtt) delay = min_rtt;
do_ledbat(acked_bytes, delay, prev_bytes_in_flight); do_ledbat(acked_bytes, delay, prev_bytes_in_flight);
m_send_delay = delay; m_send_delay = delay;
} }
@ -3345,7 +3355,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay
const boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight; const boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight;
const boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay; const boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay;
boost::int64_t scaled_gain; boost::int64_t scaled_gain;
if (delay >= target_delay) if (delay >= target_delay)
{ {
if (m_slow_start) if (m_slow_start)
@ -3642,15 +3652,15 @@ void utp_socket_impl::check_receive_buffers() const
void utp_socket_impl::check_invariant() const void utp_socket_impl::check_invariant() const
{ {
for (int i = m_outbuf.cursor(); for (int i = m_outbuf.cursor();
i != int((m_outbuf.cursor() + m_outbuf.span()) & ACK_MASK); i != int((m_outbuf.cursor() + m_outbuf.span()) & ACK_MASK);
i = (i + 1) & ACK_MASK) i = (i + 1) & ACK_MASK)
{ {
packet* p = (packet*)m_outbuf.at(i); packet* p = (packet*)m_outbuf.at(i);
if (m_mtu_seq == i && m_mtu_seq != 0 && p) if (!p) continue;
if (m_mtu_seq == i && m_mtu_seq != 0)
{ {
TORRENT_ASSERT(p->mtu_probe); TORRENT_ASSERT(p->mtu_probe);
} }
if (!p) continue;
TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i); TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i);
} }

View File

@ -55,9 +55,10 @@ delay_samples = 'points lc rgb "blue"'
delay_base = 'steps lw 2 lc rgb "purple"' delay_base = 'steps lw 2 lc rgb "purple"'
target_delay = 'steps lw 2 lc rgb "red"' target_delay = 'steps lw 2 lc rgb "red"'
off_target = 'dots lc rgb "blue"' off_target = 'dots lc rgb "blue"'
cwnd = 'steps lc rgb "green"' cwnd = 'steps lc rgb "green" lw 2'
window_size = 'steps lc rgb "sea-green"' window_size = 'steps lc rgb "sea-green"'
rtt = 'lines lc rgb "light-blue"' rtt = 'lines lc rgb "light-blue"'
send_buffer = 'lines lc rgb "light-red"'
metrics = { metrics = {
'our_delay':['our delay (ms)', 'x1y2', delay_samples], 'our_delay':['our delay (ms)', 'x1y2', delay_samples],
@ -79,7 +80,7 @@ metrics = {
'their_delay_base':['their delay base (us)', 'x1y1', delay_base], 'their_delay_base':['their delay base (us)', 'x1y1', delay_base],
'their_actual_delay':['their actual delay (us)', 'x1y1', delay_samples], 'their_actual_delay':['their actual delay (us)', 'x1y1', delay_samples],
'actual_delay':['actual_delay (us)', 'x1y1', delay_samples], 'actual_delay':['actual_delay (us)', 'x1y1', delay_samples],
'send_buffer':['send buffer size (B)', 'x1y1', 'lines'], 'send_buffer':['send buffer size (B)', 'x1y1', send_buffer],
'recv_buffer':['receive buffer size (B)', 'x1y1', 'lines'] 'recv_buffer':['receive buffer size (B)', 'x1y1', 'lines']
} }
@ -201,11 +202,17 @@ out.close()
plot = [ plot = [
{ {
'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'], 'data': ['max_window', 'send_buffer', 'cur_window', 'rtt'],
'title': 'send-packet-size', 'title': 'send-packet-size',
'y1': 'Bytes', 'y1': 'Bytes',
'y2': 'Time (ms)' 'y2': 'Time (ms)'
}, },
{
'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'],
'title': 'slow-start',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{ {
'data': ['max_window', 'cur_window', 'our_delay', 'target_delay', 'ssthres'], 'data': ['max_window', 'cur_window', 'our_delay', 'target_delay', 'ssthres'],
'title': 'cwnd', 'title': 'cwnd',