diff --git a/ChangeLog b/ChangeLog index 5585f1c31..ea8045159 100644 --- a/ChangeLog +++ b/ChangeLog @@ -93,6 +93,8 @@ * resume data no longer has timestamps of files * require C++11 to build libtorrent + * uTP performance fixes + 1.1.11 release * fix move_storage with save_path with a trailing slash diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 2110f02b5..849598ac4 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -147,11 +147,10 @@ namespace libtorrent { using socket_vector_t = std::vector; - // this is a list of sockets that needs to send an ack. - // once the UDP socket is drained, all of these will - // have a chance to do that. This is to avoid sending - // an ack for every single packet - socket_vector_t m_deferred_acks; + // if this is set, it means this socket still needs to send an ACK. Once + // we exit the loop processing packets, or switch to processing packets + // for a different socket, issue the ACK packet and clear this. + utp_socket_impl* m_deferred_ack = nullptr; // storage used for saving cpu time on "push_back" // by using already pre-allocated vector diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index 3cb80cb54..2a3947a48 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -177,6 +177,12 @@ namespace libtorrent { return utp_incoming_packet(m_last_socket, p, ep, receive_time); } + if (m_deferred_ack) + { + utp_send_ack(m_deferred_ack); + m_deferred_ack = nullptr; + } + auto r = m_utp_sockets.equal_range(id); for (; r.first != r.second; ++r.first) @@ -227,6 +233,7 @@ namespace libtorrent { utp_init_socket(str->get_impl(), std::move(socket)); bool ret = utp_incoming_packet(str->get_impl(), p, ep, receive_time); if (!ret) return false; + m_last_socket = str->get_impl(); m_cb(c); // the connection most likely changed its connection ID here // we need to move it to the correct ID @@ -262,16 +269,11 @@ namespace libtorrent { void utp_socket_manager::socket_drained() { - // flush all deferred acks - - if (!m_deferred_acks.empty()) + if (m_deferred_ack) { - m_temp_sockets.clear(); - m_deferred_acks.swap(m_temp_sockets); - for (auto const &s : m_temp_sockets) - { - utp_send_ack(s); - } + utp_socket_impl* s = m_deferred_ack; + m_deferred_ack = nullptr; + utp_send_ack(s); } if (!m_drained_event.empty()) @@ -287,9 +289,8 @@ namespace libtorrent { void utp_socket_manager::defer_ack(utp_socket_impl* s) { - TORRENT_ASSERT(std::find(m_deferred_acks.begin(), m_deferred_acks.end(), s) - == m_deferred_acks.end()); - m_deferred_acks.push_back(s); + TORRENT_ASSERT(m_deferred_ack == NULL || m_deferred_ack == s); + m_deferred_ack = s; } void utp_socket_manager::subscribe_drained(utp_socket_impl* s) @@ -316,6 +317,7 @@ namespace libtorrent { if (i == m_utp_sockets.end()) return; delete_utp_impl(i->second); if (m_last_socket == i->second) m_last_socket = nullptr; + if (m_deferred_ack == i->second) m_deferred_ack = nullptr; m_utp_sockets.erase(i); } diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 4e397ec7b..746b4c7a8 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -48,10 +48,6 @@ POSSIBILITY OF SUCH DAMAGE. #include // for accumulate #endif -// the behavior of the sequence numbers as implemented by uTorrent is not -// particularly regular. This switch indicates the odd parts. -#define TORRENT_UT_SEQ 1 - #if TORRENT_UTP_LOG #include #include // for PRId64 et.al. @@ -280,7 +276,7 @@ struct utp_socket_impl void parse_close_reason(std::uint8_t const* ptr, int size); void write_payload(std::uint8_t* ptr, int size); void maybe_inc_acked_seq_nr(); - std::uint32_t ack_packet(packet_ptr p, time_point const& receive_time + std::uint32_t ack_packet(packet_ptr p, time_point receive_time , std::uint16_t seq_nr); void write_sack(std::uint8_t* buf, int size) const; void incoming(std::uint8_t const* buf, int size, packet_ptr p, time_point now); @@ -1067,8 +1063,8 @@ std::size_t utp_stream::read_some(bool const clear_buffers) if (m_impl->m_receive_buffer_size == 0) { - UTP_LOGV(" Didn't fill entire target: %d bytes left in buffer\n" - , m_impl->m_receive_buffer_size); + UTP_LOGV("%8p: Didn't fill entire target: %d bytes left in buffer\n" + , static_cast(m_impl), m_impl->m_receive_buffer_size); break; } } @@ -1452,14 +1448,14 @@ void utp_socket_impl::parse_close_reason(std::uint8_t const* ptr, int const size // returns (rtt, acked_bytes) std::pair utp_socket_impl::parse_sack(std::uint16_t const packet_ack - , std::uint8_t const* ptr, int size, time_point const now) + , std::uint8_t const* ptr, int const size, time_point const now) { INVARIANT_CHECK; if (size == 0) return { 0u, 0 }; // this is the sequence number the current bit represents - std::uint32_t ack_nr = (packet_ack + 2) & ACK_MASK; + std::uint16_t ack_nr = (packet_ack + 2) & ACK_MASK; #if TORRENT_VERBOSE_UTP_LOG std::string bitmask; @@ -1475,35 +1471,34 @@ std::pair utp_socket_impl::parse_sack(std::uint16_t const pa mask <<= 1; } } - UTP_LOGV("%8p: got SACK first:%d %s our_seq_nr:%u\n" - , static_cast(this), ack_nr, bitmask.c_str(), m_seq_nr); + UTP_LOGV("%8p: got SACK first:%d %s our_seq_nr:%u fast_resend_seq_nr:%d\n" + , static_cast(this), ack_nr, bitmask.c_str(), m_seq_nr, m_fast_resend_seq_nr); #endif - // the number of acked packets past the fast re-send sequence number - // this is used to determine if we should trigger more fast re-sends - int dups = 0; - - // the sequence number of the last ACKed packet - std::uint32_t last_ack = packet_ack; + aux::array resend; + int num_to_resend = 0; int acked_bytes = 0; std::uint32_t min_rtt = std::numeric_limits::max(); + // this was implicitly lost + if (!compare_less_wrap((packet_ack + 1) & ACK_MASK, m_fast_resend_seq_nr, ACK_MASK)) + { + resend[num_to_resend++] = (packet_ack + 1) & ACK_MASK; + } + // for each byte - for (std::uint8_t const* end = ptr + size; ptr != end; ++ptr) + std::uint8_t const* const start = ptr; + std::uint8_t const* const end = ptr + size; + for (; ptr != end; ++ptr) { std::uint8_t bitfield = *ptr; - unsigned char mask = 1; + std::uint8_t mask = 1; // for each bit for (int i = 0; i < 8; ++i) { if (mask & bitfield) { - last_ack = ack_nr; - if (m_fast_resend_seq_nr == ack_nr) - m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK; - - if (compare_less_wrap(m_fast_resend_seq_nr, ack_nr, ACK_MASK)) ++dups; // this bit was set, ack_nr was received packet_ptr p = m_outbuf.remove(aux::numeric_cast(ack_nr)); if (p) @@ -1521,6 +1516,11 @@ std::pair utp_socket_impl::parse_sack(std::uint16_t const pa maybe_inc_acked_seq_nr(); } } + else if (!compare_less_wrap(ack_nr, m_fast_resend_seq_nr, ACK_MASK) + && num_to_resend < int(resend.size())) + { + resend[num_to_resend++] = ack_nr; + } mask <<= 1; ack_nr = (ack_nr + 1) & ACK_MASK; @@ -1533,30 +1533,74 @@ std::pair utp_socket_impl::parse_sack(std::uint16_t const pa if (ack_nr == m_seq_nr) break; } + // now, scan the bits in reverse, and count the number of ACKed packets. Only + // lost packets followed by 'dup_ack_limit' packets may be resent + // start with the sequence number represented by the last bit in the SACK + // bitmask + std::uint16_t last_resend = (packet_ack + 1 + size * 8) & ACK_MASK; + + // the number of acked packets past the fast re-send sequence number + // this is used to determine if we should trigger more fast re-sends + int dups = 0; + + for (std::uint8_t const* i = end; i != start; --i) + { + std::uint8_t const bitfield = i[-1]; + std::uint8_t mask = 0x80; + // for each bit + for (int k = 0; k < 8; ++k) + { + if (mask & bitfield) ++dups; + if (dups > dup_ack_limit) break; + last_resend = (last_resend - 1) & ACK_MASK; + mask >>= 1; + } + if (dups > dup_ack_limit) break; + } + + // we did not get enough packets acked in this message to warrant a resend + if (dups <= dup_ack_limit) + { + UTP_LOGV("%8p: only %d ACKs in SACK, requires more than %d to trigger fast retransmit\n" + , static_cast(this), dups, dup_ack_limit); + num_to_resend = 0; + } + + // now we need to (likely) prune the tail of the resend list, since all + // "unacked" packets that weren't followed by an acked one, don't count + while (num_to_resend > 0 && !compare_less_wrap(resend[num_to_resend - 1], last_resend, ACK_MASK)) + { + --num_to_resend; + } + TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1); - // we received more than dup_ack_limit ACKs in this SACK message. - // trigger fast re-send - if (dups >= dup_ack_limit && compare_less_wrap(m_fast_resend_seq_nr, last_ack, ACK_MASK)) - { - experienced_loss(m_fast_resend_seq_nr); - int num_resent = 0; + bool cut_cwnd = true; - // only re-sending a single packet per sack - // appears to improve performance by making it - // less likely to loose the re-sent packet. Because - // when that happens, we must time-out in order - // to continue, which takes a long time. - if (m_fast_resend_seq_nr != last_ack) + // we received more than dup_ack_limit ACKs in this SACK message. + // trigger fast re-send. This is not an equal check because 3 identical ACKS + // are only 2 duplicates + for (int i = 0; i < num_to_resend; ++i) + { + std::uint16_t const pkt_seq = resend[i]; + + packet* p = m_outbuf.at(pkt_seq); + UTP_LOGV("%8p: Packet %d lost. (fast_resend_seq_nr:%d trigger fast-resend)\n" + , static_cast(this), pkt_seq, m_fast_resend_seq_nr); + if (p) { - packet* p = m_outbuf.at(m_fast_resend_seq_nr); - m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK; - if (p) + if (resend_packet(p, true)) { - ++num_resent; - if (resend_packet(p, true)) m_duplicate_acks = 0; + m_duplicate_acks = 0; + m_fast_resend_seq_nr = (pkt_seq + 1) & ACK_MASK; } } + + if (cut_cwnd) + { + experienced_loss(pkt_seq); + cut_cwnd = false; + } } return { min_rtt, acked_bytes }; @@ -1869,8 +1913,11 @@ bool utp_socket_impl::send_pkt(int const flags) write_payload(p->buf + p->size, size_left); p->size += std::uint16_t(size_left); - UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n" - , static_cast(this), size_left, p->size, p->allocated); + if (size_left > 0) + { + UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n" + , static_cast(this), size_left, p->size, p->allocated); + } // did we fill up the whole mtu? // if we didn't, we may still send it if there's @@ -2031,12 +2078,6 @@ bool utp_socket_impl::send_pkt(int const flags) // be a nagle packet waiting for more data TORRENT_ASSERT(!m_nagle_packet); -#if !TORRENT_UT_SEQ - // if the other end closed the connection immediately - // our FIN packet will end up having the same sequence - // number as the SYN, so this assert is invalid - TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); -#endif TORRENT_ASSERT(h->seq_nr == m_seq_nr); // 0 is a special sequence number, since it's also used as "uninitialized". @@ -2243,7 +2284,8 @@ void utp_socket_impl::experienced_loss(std::uint32_t const seq_nr) { m_ssthres = std::int32_t(m_cwnd >> 16); m_slow_start = false; - UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", static_cast(this)); + UTP_LOGV("%8p: experienced loss, slow_start -> 0 ssthres:%d\n" + , static_cast(this), m_ssthres); } } @@ -2290,7 +2332,7 @@ void utp_socket_impl::maybe_inc_acked_seq_nr() } // returns RTT -std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const& receive_time +std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const receive_time , std::uint16_t seq_nr) { #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS @@ -2314,7 +2356,6 @@ std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const& receiv TORRENT_ASSERT(p->mtu_probe); // our mtu probe was acked! m_mtu_floor = std::max(m_mtu_floor, p->size); - if (m_mtu_ceiling < m_mtu_floor) m_mtu_ceiling = m_mtu_floor; update_mtu_limits(); } @@ -2703,11 +2744,10 @@ bool utp_socket_impl::incoming_packet(span buf // something is wrong. // If our state is state_none, this packet must be a syn packet // and the ack_nr should be ignored - std::uint16_t cmp_seq_nr = (m_seq_nr - 1) & ACK_MASK; -#if TORRENT_UT_SEQ - if (m_state == UTP_STATE_SYN_SENT && ph->get_type() == ST_STATE) - cmp_seq_nr = m_seq_nr; -#endif + std::uint16_t const cmp_seq_nr = + (m_state == UTP_STATE_SYN_SENT && ph->get_type() == ST_STATE) + ? m_seq_nr : (m_seq_nr - 1) & ACK_MASK; + if ((m_state != UTP_STATE_NONE || ph->get_type() != ST_SYN) && (compare_less_wrap(cmp_seq_nr, ph->ack_nr, ACK_MASK) || compare_less_wrap(ph->ack_nr, m_acked_seq_nr @@ -2745,7 +2785,9 @@ bool utp_socket_impl::incoming_packet(span buf // if the socket is closing, always ignore any packet // with a higher sequence number than the FIN sequence number - if (m_eof && compare_less_wrap(m_eof_seq_nr, ph->seq_nr, ACK_MASK)) + // ST_STATE messages always include the next seqnr. + if (m_eof && (compare_less_wrap(m_eof_seq_nr, ph->seq_nr, ACK_MASK) + || (m_eof_seq_nr == ph->seq_nr && ph->get_type() != ST_STATE))) { #if TORRENT_UTP_LOG UTP_LOG("%8p: ERROR: incoming packet type: %s seq_nr:%d eof_seq_nr:%d (ignored)\n" @@ -3430,13 +3472,15 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay , static_cast(this), m_mtu, in_flight, int(m_adv_wnd), int(m_cwnd >> 16), acked_bytes); m_cwnd_full = false; } - +/* if ((m_cwnd >> 16) >= m_adv_wnd) { m_slow_start = false; + m_ssthres = (m_cwnd >> 16); UTP_LOGV("%8p: cwnd > advertized wnd (%u) slow_start -> 0\n" , static_cast(this), m_adv_wnd); } +*/ } void utp_stream::bind(endpoint_type const&, error_code&) { } @@ -3531,7 +3575,6 @@ void utp_socket_impl::tick(time_point const now) // we had was the probe. Assume it was dropped // because it was too big m_mtu_ceiling = m_mtu - 1; - if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling; update_mtu_limits(); } diff --git a/tools/parse_utp_log.py b/tools/parse_utp_log.py index 0531a88b4..2e5f0b514 100755 --- a/tools/parse_utp_log.py +++ b/tools/parse_utp_log.py @@ -82,13 +82,23 @@ metrics = { 'get_microseconds': ['clock (us)', 'x1y1', 'steps'], 'wnduser': ['advertised window size (B)', 'x1y1', 'steps'], 'ssthres': ['slow-start threshold (B)', 'x1y1', 'steps'], + 'timeout': ['until next timeout (ms)', 'x1y2', 'steps'], + 'rto': ['current timeout (ms)', 'x1y2', 'steps'], 'delay_base': ['delay base (us)', 'x1y1', delay_base], 'their_delay_base': ['their delay base (us)', 'x1y1', delay_base], 'their_actual_delay': ['their actual delay (us)', 'x1y1', delay_samples], 'actual_delay': ['actual_delay (us)', 'x1y1', delay_samples], 'send_buffer': ['send buffer size (B)', 'x1y1', send_buffer], - 'recv_buffer': ['receive buffer size (B)', 'x1y1', 'lines'] + 'recv_buffer': ['receive buffer size (B)', 'x1y1', 'lines'], + 'packet_loss': ['packet lost', 'x1y2', 'steps'], + 'packet_timeout': ['packet timed out', 'x1y2', 'steps'], + 'acked_bytes': ['Bytes ACKed by packet', 'x1y2', 'steps'], + 'bytes_sent': ['cumulative bytes sent', 'x1y2', 'steps'], + 'bytes_resent': ['cumulative bytes resent', 'x1y2', 'steps'], + 'written': ['reported written bytes', 'x1y2', 'steps'], + 'ack_nr': ['acked sequence number', 'x1y2', 'steps'], + 'seq_nr': ['sent sequence number', 'x1y2', 'steps'], } histogram_quantization = 1.0 @@ -101,10 +111,16 @@ begin = None title = "-" packet_loss = 0 packet_timeout = 0 +num_acked = 0 delay_histogram = {} packet_size_histogram = {} window_size = {'0': 0, '1': 0} +bytes_sent = 0 +bytes_resent = 0 +written = 0 +ack_nr = 0 +seq_nr = 0 # [35301484] 0x00ec1190: actual_delay:1021583 our_delay:102 their_delay:-1021345 off_target:297 max_window:2687 # upload_rate:18942 delay_base:1021481154 delay_sum:-1021242 target_delay:400 acked_bytes:1441 cur_window:2882 @@ -140,15 +156,37 @@ for line in file: print("\r%d " % counter, end=' ') if "lost." in line: - packet_loss = packet_loss + 1 + packet_loss += 1 continue - if "Packet timeout" in line: - packet_timeout = packet_timeout + 1 + + if "lost (timeout)" in line: + packet_timeout += 1 continue + if "acked packet " in line: + num_acked += 1 + if "sending packet" in line: v = line.split('size:')[1].split(' ')[0] packet_size_histogram[v] = 1 + packet_size_histogram.get(v, 0) + bytes_sent += int(v) + + if "re-sending packet" in line: + v = line.split('size:')[1].split(' ')[0] + bytes_resent += int(v) + + if 'calling write handler' in line: + v = line.split('written:')[1].split(' ')[0] + written += int(v) + + if "incoming packet" in line \ + and "ERROR" not in line \ + and "seq_nr:" in line \ + and "type:ST_SYN" not in line: + if "ack_nr:" not in line: + print(line) + ack_nr = int(line.split('ack_nr:')[1].split(' ')[0]) + seq_nr = int(line.split('seq_nr:')[1].split(' ')[0]) if "our_delay:" not in line: continue @@ -194,9 +232,16 @@ for line in file: print('%f\t' % int(reduce(lambda a, b: a + b, list(window_size.values()))), end=' ', file=out) else: print('%f\t' % v, end=' ', file=out) - print(float(packet_loss * 8000), float(packet_timeout * 8000), file=out) + + if fill_columns: + columns += ['packet_loss', 'packet_timeout', 'bytes_sent', 'ack_nr', + 'seq_nr', 'bytes_resent', 'written'] + print(float(packet_loss), float(packet_timeout), float(bytes_sent), + ack_nr, seq_nr, float(bytes_resent), written, file=out) packet_loss = 0 packet_timeout = 0 + num_acked = 0 + written = 0 out.close() @@ -217,6 +262,12 @@ plot = [ 'y1': 'Bytes', 'y2': 'Time (ms)' }, + { + 'data': ['max_window', 'send_buffer', 'cur_window', 'written'], + 'title': 'bytes-written', + 'y1': 'Bytes', + 'y2': 'Time (ms)' + }, { 'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'], 'title': 'slow-start', @@ -229,6 +280,30 @@ plot = [ 'y1': 'Bytes', 'y2': 'Time (ms)' }, + { + 'data': ['max_window', 'cur_window', 'packet_loss'], + 'title': 'packet-loss', + 'y1': 'Bytes', + 'y2': 'count' + }, + { + 'data': ['max_window', 'cur_window', 'packet_timeout'], + 'title': 'packet-timeout', + 'y1': 'Bytes', + 'y2': 'count' + }, + { + 'data': ['max_window', 'cur_window', 'bytes_sent', 'bytes_resent'], + 'title': 'cumulative-bytes-sent', + 'y1': 'Bytes', + 'y2': 'Cumulative Bytes' + }, + { + 'data': ['max_window', 'cur_window', 'rto', 'timeout'], + 'title': 'connection-timeout', + 'y1': 'Bytes', + 'y2': 'Time (ms)' + }, { 'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'wnduser', 'cur_window_packets'], 'title': 'uploading', @@ -237,19 +312,19 @@ plot = [ }, { 'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'send_buffer'], - 'title': 'uploading_packets', + 'title': 'uploading-packets', 'y1': 'Bytes', 'y2': 'Time (ms)' }, { 'data': ['their_delay', 'target_delay', 'rtt'], - 'title': 'their_delay', + 'title': 'their-delay', 'y1': '', 'y2': 'Time (ms)' }, { 'data': ['their_actual_delay', 'their_delay_base'], - 'title': 'their_delay_base', + 'title': 'their-delay-base', 'y1': 'Time (us)', 'y2': '' }, @@ -261,9 +336,21 @@ plot = [ }, { 'data': ['actual_delay', 'delay_base'], - 'title': 'our_delay_base', + 'title': 'our-delay-base', 'y1': 'Time (us)', 'y2': '' + }, + { + 'data': ['ack_nr', 'seq_nr'], + 'title': 'sequence-numbers', + 'y1': 'Bytes', + 'y2': 'seqnr' + }, + { + 'data': ['max_window', 'cur_window', 'acked_bytes'], + 'title': 'ack-rate', + 'y1': 'Bytes', + 'y2': 'Bytes' } ]