From 6c5aec3476818335663b1f8661b9af2da27cbf72 Mon Sep 17 00:00:00 2001 From: arvidn Date: Mon, 20 Jul 2015 08:20:17 -0700 Subject: [PATCH] fix request queue size performance issue --- ChangeLog | 1 + include/libtorrent/peer_connection.hpp | 19 +++++-- src/peer_connection.cpp | 73 ++++++++++++++++++-------- src/settings_pack.cpp | 4 +- src/utp_stream.cpp | 38 +++++++++----- tools/parse_utp_log.py | 13 +++-- 6 files changed, 105 insertions(+), 43 deletions(-) diff --git a/ChangeLog b/ChangeLog index 90ec3de2c..a180f3391 100644 --- a/ChangeLog +++ b/ChangeLog @@ -72,6 +72,7 @@ * almost completely changed the storage interface (for custom storage) * added support for hashing pieces in multiple threads + * fix request queue size performance issue * fix http scrape * add missing port mapping functions to python binding * fix bound-checking issue in bdecoder diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index e9f2b3496..dc1595304 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -127,7 +127,7 @@ namespace libtorrent // unexpectedly from the peer boost::uint32_t not_wanted:1; boost::uint32_t timed_out:1; - + // the busy flag is set if the block was // requested from another peer when this // request was queued. We only allow a single @@ -207,7 +207,7 @@ namespace libtorrent // take a raw pointer. torrent objects should always // outlive their peers boost::weak_ptr m_torrent; - + public: // a back reference to the session @@ -216,7 +216,7 @@ namespace libtorrent // settings that apply to this peer aux::session_settings const& m_settings; - + protected: // this is true if this connection has been added @@ -967,6 +967,12 @@ namespace libtorrent // for the round-robin unchoke algorithm. 
boost::int64_t m_uploaded_at_last_unchoke; + // the number of payload bytes downloaded last second tick + boost::int32_t m_downloaded_last_second; + + // the number of payload bytes uploaded last second tick + boost::int32_t m_uploaded_last_second; + // the number of bytes that the other // end has to send us in order to respond // to all outstanding piece requests we @@ -1113,6 +1119,7 @@ namespace libtorrent // the number of request we should queue up // at the remote end. + // TODO: 2 rename this target queue size boost::uint16_t m_desired_queue_size; #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES @@ -1229,6 +1236,12 @@ namespace libtorrent // other peers to compare it to. bool m_exceeded_limit:1; + // this is slow-start at the bittorrent layer. It affects how we increase + // desired queue size (i.e. the number of outstanding requests we keep). + // While the underlying transport protocol is in slow-start, the number of + // outstanding requests needs to increase at the same pace to keep up. 
+ bool m_slow_start:1; + // TODO: 3 factor this out into its own header and use it for UDP socket // and maybe second_timer as well template diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index e1d2c7722..1b6110709 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -145,6 +145,8 @@ namespace libtorrent , m_downloaded_at_last_round(0) , m_uploaded_at_last_round(0) , m_uploaded_at_last_unchoke(0) + , m_downloaded_last_second(0) + , m_uploaded_last_second(0) , m_outstanding_bytes(0) , m_last_seen_complete(0) , m_receiving_block(piece_block::invalid) @@ -159,7 +161,7 @@ namespace libtorrent , m_download_rate_peak(0) , m_upload_rate_peak(0) , m_send_barrier(INT_MAX) - , m_desired_queue_size(2) + , m_desired_queue_size(4) , m_prefer_contiguous_blocks(0) , m_disk_read_failures(0) , m_outstanding_piece_verification(0) @@ -181,6 +183,7 @@ namespace libtorrent , m_need_interest_update(false) , m_has_metadata(true) , m_exceeded_limit(false) + , m_slow_start(true) #if TORRENT_USE_ASSERTS , m_in_constructor(true) , m_disconnect_started(false) @@ -2611,6 +2614,10 @@ namespace libtorrent if (!m_bitfield_received) incoming_have_none(); if (is_disconnecting()) return; + // slow-start + if (m_slow_start) + m_desired_queue_size += 1; + update_desired_queue_size(); #ifndef TORRENT_DISABLE_EXTENSIONS @@ -3687,6 +3694,7 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); if (!t->ready_for_connections()) return; m_interesting = false; + m_slow_start = false; m_counters.inc_stats_counter(counters::num_peers_down_interested, -1); disconnect_if_redundant(); @@ -3711,7 +3719,7 @@ namespace libtorrent // don't suggest anything to a peer that isn't interested if (has_piece(piece) || !m_peer_interested) return; - + // we cannot suggest a piece we don't have! 
#if TORRENT_USE_ASSERTS boost::shared_ptr t = m_torrent.lock(); @@ -4536,23 +4544,30 @@ namespace libtorrent m_desired_queue_size = 1; return; } - + int download_rate = statistics().download_payload_rate(); - // calculate the desired download queue size + // the desired download queue size const int queue_time = m_settings.get_int(settings_pack::request_queue_time); - // (if the latency is more than this, the download will stall) - // so, the queue size is queue_time * down_rate / 16 kiB - // (16 kB is the size of each request) - // the minimum number of requests is 2 and the maximum is 48 - // the block size doesn't have to be 16. So we first query the - // torrent for it - boost::shared_ptr t = m_torrent.lock(); - const int block_size = t->block_size(); - TORRENT_ASSERT(block_size > 0); + // when we're in slow-start mode we increase the desired queue size every + // time we receive a piece, no need to adjust it here (other than + // enforcing the upper limit) + if (!m_slow_start) + { + // (if the latency is more than this, the download will stall) + // so, the queue size is queue_time * down_rate / 16 kiB + // (16 kB is the size of each request) + // the minimum number of requests is 2 and the maximum is 48 + // the block size doesn't have to be 16. 
So we first query the + // torrent for it + boost::shared_ptr t = m_torrent.lock(); + const int block_size = t->block_size(); - m_desired_queue_size = queue_time * download_rate / block_size; + TORRENT_ASSERT(block_size > 0); + + m_desired_queue_size = queue_time * download_rate / block_size; + } if (m_desired_queue_size > m_max_out_request_queue) m_desired_queue_size = m_max_out_request_queue; @@ -4561,9 +4576,9 @@ namespace libtorrent #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" - , "dqs: %d max: %d dl: %d qt: %d snubbed: %d" + , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" , m_desired_queue_size, m_max_out_request_queue - , download_rate, queue_time, int(m_snubbed)); + , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); #endif } @@ -4787,6 +4802,23 @@ namespace libtorrent // if we haven't sent something in too long, send a keep-alive keep_alive(); + // if our download rate isn't increasing significantly anymore, end slow + // start. The 10kB is to have some slack here. 
+ if (m_slow_start && m_downloaded_last_second > 0 + && m_downloaded_last_second + 10000 + >= m_statistics.last_payload_downloaded()) + { + m_slow_start = false; +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::info, "SLOW_START", "exit slow start: " + "prev-dl: %d dl: %d" + , int(m_downloaded_last_second) + , int(m_statistics.last_payload_downloaded())); +#endif + } + m_downloaded_last_second = m_statistics.last_payload_downloaded(); + m_uploaded_last_second = m_statistics.last_payload_uploaded(); + m_statistics.second_tick(tick_interval_ms); if (m_statistics.upload_payload_rate() > m_upload_rate_peak) @@ -4860,6 +4892,7 @@ namespace libtorrent if (!m_snubbed) { m_snubbed = true; + m_slow_start = false; if (t->alerts().should_post()) { t->alerts().emplace_alert(t->get_handle() @@ -4984,9 +5017,7 @@ namespace libtorrent // only add new piece-chunks if the send buffer is small enough // otherwise there will be no end to how large it will be! - boost::uint64_t upload_rate = m_statistics.upload_rate(); - - int buffer_size_watermark = int(upload_rate + int buffer_size_watermark = int(m_uploaded_last_second * m_settings.get_int(settings_pack::send_buffer_watermark_factor) / 100); if (buffer_size_watermark < m_settings.get_int(settings_pack::send_buffer_low_watermark)) @@ -5000,12 +5031,12 @@ namespace libtorrent #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::outgoing, "SEND_BUFFER_WATERMARK" - , "current watermark: %d max: %d min: %d factor: %d upload-rate: %d B/s" + , "current watermark: %d max: %d min: %d factor: %d uploaded: %d B/s" , buffer_size_watermark , m_ses.settings().get_int(settings_pack::send_buffer_watermark) , m_ses.settings().get_int(settings_pack::send_buffer_low_watermark) , m_ses.settings().get_int(settings_pack::send_buffer_watermark_factor) - , int(upload_rate)); + , int(m_uploaded_last_second)); #endif // don't just pop the front element here, since in seed mode one request may diff --git a/src/settings_pack.cpp 
b/src/settings_pack.cpp index d0eb5dfd1..5f8c1504f 100644 --- a/src/settings_pack.cpp +++ b/src/settings_pack.cpp @@ -245,7 +245,7 @@ namespace libtorrent SET(suggest_mode, settings_pack::no_piece_suggestions, 0), SET(max_queued_disk_bytes, 1024 * 1024, &session_impl::update_queued_disk_bytes), SET(handshake_timeout, 10, 0), - SET(send_buffer_low_watermark, 512, 0), + SET(send_buffer_low_watermark, 10 * 1024, 0), SET(send_buffer_watermark, 500 * 1024, 0), SET(send_buffer_watermark_factor, 50, 0), SET(choking_algorithm, settings_pack::fixed_slots_choker, 0), @@ -304,7 +304,7 @@ namespace libtorrent SET(connections_limit, 200, &session_impl::update_connections_limit), SET(connections_slack, 10, 0), SET(utp_target_delay, 100, 0), - SET(utp_gain_factor, 1500, 0), + SET(utp_gain_factor, 3000, 0), SET(utp_min_timeout, 500, 0), SET(utp_syn_resends, 2, 0), SET(utp_fin_resends, 2, 0), diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 9d1fc3b5d..136ea9640 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -443,7 +443,7 @@ public: // it can also happen if the other end sends an advertized window // size less than one MSS. time_point m_timeout; - + // the last time we stepped the timestamp history time_point m_last_history_step; @@ -648,7 +648,7 @@ public: // this is done at startup of a socket in order to find its // link capacity faster. This behaves similar to TCP slow start bool m_slow_start:1; - + // this is true as long as we have as many packets in // flight as allowed by the congestion window (cwnd) bool m_cwnd_full:1; @@ -1699,7 +1699,9 @@ private: // congestion window, false if there is no more space. bool utp_socket_impl::send_pkt(int flags) { +#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS INVARIANT_CHECK; +#endif bool force = (flags & pkt_ack) || (flags & pkt_fin); @@ -2254,8 +2256,15 @@ void utp_socket_impl::experienced_loss(int seq_nr) // less than or equal to. If we experience loss of the // same packet again, ignore it. 
if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return; - + + // cut window size in 2 + m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16)); + m_loss_seq_nr = m_seq_nr; + UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", this, seq_nr); + // if we happen to be in slow-start mode, we need to leave it + // note that we set ssthres to the window size _after_ reducing it. Next slow + // start should end before we overshoot. if (m_slow_start) { m_ssthres = m_cwnd >> 16; @@ -2263,14 +2272,9 @@ void utp_socket_impl::experienced_loss(int seq_nr) UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", this); } - // cut window size in 2 - m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16)); - m_loss_seq_nr = m_seq_nr; - UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", this, seq_nr); - // the window size could go below one MMS here, if it does, // we'll get a timeout in about one second - + m_sm->inc_stats_counter(counters::utp_packet_loss); } @@ -2285,7 +2289,9 @@ void utp_socket_impl::set_state(int s) void utp_socket_impl::maybe_inc_acked_seq_nr() { +#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS INVARIANT_CHECK; +#endif bool incremented = false; // don't pass m_seq_nr, since we move into sequence @@ -2317,7 +2323,9 @@ void utp_socket_impl::maybe_inc_acked_seq_nr() void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time , boost::uint32_t& min_rtt, boost::uint16_t seq_nr) { +#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS INVARIANT_CHECK; +#endif TORRENT_ASSERT(p); @@ -2364,7 +2372,9 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p , time_point /* now */) { +#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS INVARIANT_CHECK; +#endif while (!m_read_buffer.empty()) { @@ -3091,7 +3101,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size // it's impossible for delay 
to be more than the RTT, so make // sure to clamp it as a sanity check if (delay > min_rtt) delay = min_rtt; - + do_ledbat(acked_bytes, delay, prev_bytes_in_flight); m_send_delay = delay; } @@ -3345,7 +3355,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay const boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight; const boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay; boost::int64_t scaled_gain; - + if (delay >= target_delay) { if (m_slow_start) @@ -3642,15 +3652,15 @@ void utp_socket_impl::check_receive_buffers() const void utp_socket_impl::check_invariant() const { for (int i = m_outbuf.cursor(); - i != int((m_outbuf.cursor() + m_outbuf.span()) & ACK_MASK); + i != int((m_outbuf.cursor() + m_outbuf.span()) & ACK_MASK); i = (i + 1) & ACK_MASK) { packet* p = (packet*)m_outbuf.at(i); - if (m_mtu_seq == i && m_mtu_seq != 0 && p) + if (!p) continue; + if (m_mtu_seq == i && m_mtu_seq != 0) { TORRENT_ASSERT(p->mtu_probe); } - if (!p) continue; TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i); } diff --git a/tools/parse_utp_log.py b/tools/parse_utp_log.py index 788ddc261..179a03e13 100755 --- a/tools/parse_utp_log.py +++ b/tools/parse_utp_log.py @@ -55,9 +55,10 @@ delay_samples = 'points lc rgb "blue"' delay_base = 'steps lw 2 lc rgb "purple"' target_delay = 'steps lw 2 lc rgb "red"' off_target = 'dots lc rgb "blue"' -cwnd = 'steps lc rgb "green"' +cwnd = 'steps lc rgb "green" lw 2' window_size = 'steps lc rgb "sea-green"' rtt = 'lines lc rgb "light-blue"' +send_buffer = 'lines lc rgb "light-red"' metrics = { 'our_delay':['our delay (ms)', 'x1y2', delay_samples], @@ -79,7 +80,7 @@ metrics = { 'their_delay_base':['their delay base (us)', 'x1y1', delay_base], 'their_actual_delay':['their actual delay (us)', 'x1y1', delay_samples], 'actual_delay':['actual_delay (us)', 'x1y1', delay_samples], - 'send_buffer':['send buffer size (B)', 'x1y1', 'lines'], + 'send_buffer':['send buffer 
size (B)', 'x1y1', send_buffer], 'recv_buffer':['receive buffer size (B)', 'x1y1', 'lines'] } @@ -201,11 +202,17 @@ out.close() plot = [ { - 'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'], + 'data': ['max_window', 'send_buffer', 'cur_window', 'rtt'], 'title': 'send-packet-size', 'y1': 'Bytes', 'y2': 'Time (ms)' }, + { + 'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'], + 'title': 'slow-start', + 'y1': 'Bytes', + 'y2': 'Time (ms)' + }, { 'data': ['max_window', 'cur_window', 'our_delay', 'target_delay', 'ssthres'], 'title': 'cwnd',