From 2178d1ad2bbd5166054a843de76302a06e831c12 Mon Sep 17 00:00:00 2001 From: arvidn Date: Sat, 12 Mar 2016 01:07:17 -0500 Subject: [PATCH] improve robustness and performance of uTP PMTU discovery. fix duplicate ACK issue in uTP. demote an invariant check to 'expensive' --- ChangeLog | 2 + include/libtorrent/utp_socket_manager.hpp | 14 +++ src/block_cache.cpp | 2 + src/bt_peer_connection.cpp | 2 +- src/peer_connection.cpp | 10 +- src/utp_socket_manager.cpp | 15 ++- src/utp_stream.cpp | 131 +++++++++++++++------- 7 files changed, 121 insertions(+), 55 deletions(-) diff --git a/ChangeLog b/ChangeLog index 798542c40..e28359fe2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,8 @@ 1.1.0 release + * improve robustness and performance of uTP PMTU discovery + * fix duplicate ACK issue in uTP * support filtering which parts of session state are loaded by load_state() * deprecate support for adding torrents by HTTP URL * allow specifying which tracker to scrape in scrape_tracker diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index b622701f1..ea7298072 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -95,6 +95,17 @@ namespace libtorrent void defer_ack(utp_socket_impl* s); void subscribe_drained(utp_socket_impl* s); + void restrict_mtu(int mtu) + { + m_restrict_mtu[m_mtu_idx] = mtu; + m_mtu_idx = (m_mtu_idx + 1) % m_restrict_mtu.size(); + } + + int restrict_mtu() const + { + return *std::max_element(m_restrict_mtu.begin(), m_restrict_mtu.end()); + } + // used to keep stats of uTP events // the counter is the enum from ``counters``. void inc_stats_counter(int counter, int delta = 1); @@ -154,6 +165,9 @@ namespace libtorrent // stats counters counters& m_counters; + boost::array m_restrict_mtu; + int m_mtu_idx; + // this is passed on to the instantiate connection // if this is non-null it will create SSL connections over uTP void* m_ssl_context; diff --git a/src/block_cache.cpp b/src/block_cache.cpp index 93a46a957..dcc4a2cb7 100644 --- a/src/block_cache.cpp +++ b/src/block_cache.cpp @@ -1216,7 +1216,9 @@ int block_cache::pad_job(disk_io_job const* j, int blocks_in_piece void block_cache::insert_blocks(cached_piece_entry* pe, int block, file::iovec_t *iov , int iov_len, disk_io_job* j, int flags) { +#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS INVARIANT_CHECK; +#endif TORRENT_ASSERT(pe); TORRENT_ASSERT(pe->in_use); diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 113a85d6b..1826e7de8 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -3578,7 +3578,7 @@ namespace libtorrent int next_barrier = m_enc_handler.encrypt(iovec); #ifndef TORRENT_DISABLE_LOGGING if (next_barrier != 0) - peer_log(peer_log_alert::outgoing_message, "SEND_BARRIER" + peer_log(peer_log_alert::outgoing, "SEND_BARRIER" , "encrypted block s = %d", next_barrier); #endif return next_barrier; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index db8f14d79..53251dcf7 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1709,15 +1709,7 @@ namespace libtorrent else if (m_ses.preemptive_unchoke()) { // if the peer is choked and we have upload slots left, - // then unchoke it. Another condition that has to be met - // is that the torrent doesn't keep track of the individual - // up/down ratio for each peer (ratio == 0) or (if it does - // keep track) this particular connection isn't a leecher. - // If the peer was choked because it was leeching, don't - // unchoke it again. - // The exception to this last condition is if we're a seed. - // In that case we don't care if people are leeching, they - // can't pay for their downloads anyway. + // then unchoke it. boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index c6cb1ee4b..58d534827 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/utp_socket_manager.hpp" #include "libtorrent/instantiate_connection.hpp" #include "libtorrent/socket_io.hpp" +#include "libtorrent/socket.hpp" // for TORRENT_HAS_DONT_FRAGMENT #include "libtorrent/broadcast_socket.hpp" // for is_teredo #include "libtorrent/random.hpp" #include "libtorrent/performance_counters.hpp" @@ -59,8 +60,11 @@ namespace libtorrent , m_last_if_update(min_time()) , m_sock_buf_size(0) , m_counters(cnt) + , m_mtu_idx(0) , m_ssl_context(ssl_context) - {} + { + m_restrict_mtu.fill(65536); + } utp_socket_manager::~utp_socket_manager() { @@ -128,7 +132,6 @@ namespace libtorrent // the address field in the SOCKS header if (addr.is_v4()) mtu -= 4; else mtu -= 16; - } else { @@ -136,7 +139,7 @@ namespace libtorrent else mtu -= TORRENT_IPV6_HEADER; } - utp_mtu = mtu; + utp_mtu = (std::min)(mtu, restrict_mtu()); } void utp_socket_manager::send_packet(udp::endpoint const& ep, char const* p @@ -159,12 +162,18 @@ namespace libtorrent #ifdef TORRENT_HAS_DONT_FRAGMENT error_code tmp; if (flags & utp_socket_manager::dont_fragment) + { m_sock.set_option(libtorrent::dont_fragment(true), tmp); + TORRENT_ASSERT_VAL(!tmp, tmp.message()); + } #endif m_sock.send(ep, p, len, ec); #ifdef TORRENT_HAS_DONT_FRAGMENT if (flags & utp_socket_manager::dont_fragment) + { m_sock.set_option(libtorrent::dont_fragment(false), tmp); + TORRENT_ASSERT_VAL(!tmp, tmp.message()); + } #endif } diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 318bd8972..aa0a16b52 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -65,10 +65,7 @@ static struct utp_logger FILE* utp_log_file; mutex utp_log_mutex; - utp_logger() : utp_log_file(0) - { - utp_log_file = NULL; - } + utp_logger() : utp_log_file(NULL) {} ~utp_logger() { if (utp_log_file) fclose(utp_log_file); @@ -806,7 +803,8 @@ void utp_socket_impl::update_mtu_limits() { INVARIANT_CHECK; - TORRENT_ASSERT(m_mtu_floor <= m_mtu_ceiling); + if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling; + m_mtu = (m_mtu_floor + m_mtu_ceiling) / 2; if ((m_cwnd >> 16) < m_mtu) m_cwnd = boost::int64_t(m_mtu) << 16; @@ -1370,6 +1368,7 @@ void utp_socket_impl::send_syn() p->size = sizeof(utp_header); p->header_size = sizeof(utp_header); p->num_transmissions = 0; + p->mtu_probe = false; #ifdef TORRENT_DEBUG p->num_fast_resend = 0; #endif @@ -1492,6 +1491,12 @@ void utp_socket_impl::send_reset(utp_header const* ph) error_code ec; m_sm->send_packet(udp::endpoint(m_remote_address, m_port) , reinterpret_cast(&h), sizeof(h), ec); + if (ec) + { + UTP_LOGV("%8p: socket error: %s\n" + , static_cast(this) + , ec.message().c_str()); + } } std::size_t utp_socket_impl::available() const @@ -1755,7 +1760,7 @@ bool utp_socket_impl::send_pkt(int flags) INVARIANT_CHECK; #endif - bool force = (flags & pkt_ack) || (flags & pkt_fin); + bool const force = (flags & pkt_ack) || (flags & pkt_fin); // TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT || (flags & pkt_ack)); @@ -1794,14 +1799,29 @@ bool utp_socket_impl::send_pkt(int flags) if (sack > 32) sack = 32; } - boost::uint32_t close_reason = m_close_reason; + boost::uint32_t const close_reason = m_close_reason; - int header_size = sizeof(utp_header) + // MTU DISCOVERY + + // under these conditions, the next packet we send should be an MTU probe. + // MTU probes get to use the mid-point packet size, whereas other packets + // use a conservative packet size of the largest known to work. The reason + // for the cwnd condition is to make sure the probe is surrounded by non- + // probes, to be able to distinguish a loss of the probe vs. just loss in + // general. + bool const mtu_probe = (m_mtu_seq == 0 + && m_write_buffer_size >= m_mtu_floor * 3 + && m_seq_nr != 0 + && (m_cwnd >> 16) > m_mtu_floor * 3); + + int const header_size = sizeof(utp_header) + (sack ? sack + 2 : 0) + (close_reason ? 6 : 0); - int payload_size = m_write_buffer_size; - if (m_mtu - header_size < payload_size) - payload_size = m_mtu - header_size; + + // for non MTU-probes, use the conservative packet size + int const effective_mtu = mtu_probe ? m_mtu : m_mtu_floor; + int payload_size = (std::min)(m_write_buffer_size + , effective_mtu - header_size); // if we have one MSS worth of data, make sure it fits in our // congestion window and the advertized receive window from @@ -1828,11 +1848,11 @@ bool utp_socket_impl::send_pkt(int flags) #if TORRENT_UTP_LOG UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d " "id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d " - "adv_wnd:%d in-flight:%d mtu:%d\n" + "adv_wnd:%d in-flight:%d mtu:%d effective-mtu:%d\n" , static_cast(this), int(m_seq_nr), int(m_ack_nr) , m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str() , header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16) - , m_adv_wnd, m_bytes_in_flight, m_mtu); + , m_adv_wnd, m_bytes_in_flight, m_mtu, effective_mtu); #endif return false; } @@ -1876,8 +1896,8 @@ bool utp_socket_impl::send_pkt(int flags) // need to keep the packet around (in the outbuf) if (payload_size) { - p = static_cast(malloc(sizeof(packet) + m_mtu)); - p->allocated = m_mtu; + p = static_cast(malloc(sizeof(packet) + effective_mtu)); + p->allocated = effective_mtu; buf_holder.reset(reinterpret_cast(p)); m_sm->inc_stats_counter(counters::utp_payload_pkts_out); @@ -1946,9 +1966,9 @@ bool utp_socket_impl::send_pkt(int flags) else sack = 0; - boost::int32_t size_left = p->allocated - p->size; - TORRENT_ASSERT(size_left > 0); - size_left = (std::min)(size_left, m_write_buffer_size); + boost::int32_t const size_left = (std::min)(p->allocated - p->size + , m_write_buffer_size); + write_payload(p->buf + p->size, size_left); p->size += size_left; @@ -2001,9 +2021,9 @@ bool utp_socket_impl::send_pkt(int flags) // outstanding packet is acked, we'll send this // payload UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d " - "adv_wnd:%d in-flight:%d mtu:%d\n" + "adv_wnd:%d in-flight:%d mtu:%d effective_mtu:%d\n" , static_cast(this), m_write_buffer_size, int(m_cwnd >> 16) - , m_adv_wnd, m_bytes_in_flight, m_mtu); + , m_adv_wnd, m_bytes_in_flight, m_mtu, effective_mtu); TORRENT_ASSERT(m_nagle_packet == NULL); TORRENT_ASSERT(h->seq_nr == m_seq_nr); m_nagle_packet = p; @@ -2011,10 +2031,9 @@ bool utp_socket_impl::send_pkt(int flags) return false; } - // MTU DISCOVERY - if (m_mtu_seq == 0 - && p->size > m_mtu_floor - && m_seq_nr != 0) + // for ST_DATA packets, payload size is 0. Such packets do not have unique + // sequence numbers and should never be used as mtu probes + if ((mtu_probe || p->mtu_probe) && payload_size > 0) { p->mtu_probe = true; m_mtu_seq = m_seq_nr; @@ -2063,7 +2082,9 @@ bool utp_socket_impl::send_pkt(int flags) if (ec == error::message_size) { #if TORRENT_UTP_LOG - UTP_LOGV("%8p: error sending packet: %s\n", static_cast(this), ec.message().c_str()); + UTP_LOGV("%8p: error sending packet: %s\n" + , static_cast(this) + , ec.message().c_str()); #endif // if we fail even though this is not a probe, we're screwed // since we'd have to repacketize @@ -2074,8 +2095,7 @@ bool utp_socket_impl::send_pkt(int flags) // resend the packet immediately without // it being an MTU probe p->mtu_probe = false; - if (m_mtu_seq == m_ack_nr) - m_mtu_seq = 0; + m_mtu_seq = 0; ec.clear(); #if TORRENT_UTP_LOG @@ -2124,6 +2144,11 @@ bool utp_socket_impl::send_pkt(int flags) #endif TORRENT_ASSERT(h->seq_nr == m_seq_nr); + // 0 is a special sequence number, since it's also used as "uninitialized". + // we never send an mtu probe for sequence number 0 + TORRENT_ASSERT(p->mtu_probe == (m_seq_nr == m_mtu_seq) + || m_seq_nr == 0); + // release the buffer, we're saving it in the circular // buffer of outgoing packets buf_holder.release(); @@ -2290,10 +2315,15 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) return !m_stalled; } -void utp_socket_impl::experienced_loss(int seq_nr) +void utp_socket_impl::experienced_loss(int const seq_nr) { INVARIANT_CHECK; + // the window size could go below one MMS here, if it does, + // we'll get a timeout in about one second + + m_sm->inc_stats_counter(counters::utp_packet_loss); + // since loss often comes in bursts, we only cut the // window in half once per RTT. This is implemented // by limiting which packets can cause us to cut the @@ -2319,11 +2349,6 @@ void utp_socket_impl::experienced_loss(int seq_nr) m_slow_start = false; UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", static_cast(this)); } - - // the window size could go below one MMS here, if it does, - // we'll get a timeout in about one second - - m_sm->inc_stats_counter(counters::utp_packet_loss); } void utp_socket_impl::set_state(int s) @@ -2484,7 +2509,7 @@ bool utp_socket_impl::cancel_handlers(error_code const& ec, bool kill) TORRENT_ASSERT(ec); bool ret = m_read_handler || m_write_handler || m_connect_handler; - + // calling the callbacks with m_userdata being 0 will just crash TORRENT_ASSERT((ret && bool(m_userdata)) || !ret); @@ -2665,11 +2690,9 @@ void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu) // set the ceiling to what we found out from the interface m_mtu_ceiling = utp_mtu; - // however, start the search from a more conservative MTU - int overhead = link_mtu - utp_mtu; - m_mtu = TORRENT_ETHERNET_MTU - overhead; + // start in the middle of the PMTU search space + m_mtu = (m_mtu_ceiling + m_mtu_floor) / 2; if (m_mtu > m_mtu_ceiling) m_mtu = m_mtu_ceiling; - if (m_mtu_floor > utp_mtu) m_mtu_floor = utp_mtu; // if the window size is smaller than one packet size @@ -2886,8 +2909,13 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size // if we get an ack for the same sequence number as // was last ACKed, and we have outstanding packets, - // it counts as a duplicate ack - if (ph->ack_nr == m_acked_seq_nr && m_outbuf.size()) + // it counts as a duplicate ack. The reason to not count ST_DATA packets as + // duplicate ACKs is because we may be receiving a stream of those + // regardless of our outgoing traffic, which makes their ACK number not + // indicative of a dropped packet + if (ph->ack_nr == m_acked_seq_nr + && m_outbuf.size() + && ph->get_type() == ST_STATE) { ++m_duplicate_acks; } @@ -3548,6 +3576,15 @@ void utp_socket_impl::tick(time_point now) if (m_outbuf.size()) ++m_num_timeouts; + UTP_LOGV("%8p: timeout num-timeouts: %d max-resends: %d confirmed: %d " + " acked-seq-num: %d mtu-seq: %d\n" + , static_cast(this) + , m_num_timeouts + , m_sm->num_resends() + , m_confirmed + , m_acked_seq_nr + , m_mtu_seq); + // a socket that has not been confirmed to actually have a live remote end // (the IP may have been spoofed) fail on the first timeout. If we had // heard anything from this peer, it would have been confirmed. @@ -3590,7 +3627,7 @@ void utp_socket_impl::tick(time_point now) m_timeout = now + milliseconds(packet_timeout()); - UTP_LOGV("%8p: timeout resetting cwnd:%d\n" + UTP_LOGV("%8p: resetting cwnd:%d\n" , static_cast(this), int(m_cwnd >> 16)); // we dropped all packets, that includes the mtu probe @@ -3606,7 +3643,7 @@ void utp_socket_impl::tick(time_point now) // we're very likely to have an ssthres set, which will make us leave // slow start before inducing more delay or loss. m_slow_start = true; - UTP_LOGV("%8p: timeout slow_start -> 1\n", static_cast(this)); + UTP_LOGV("%8p: slow_start -> 1\n", static_cast(this)); // we need to go one past m_seq_nr to cover the case // where we just sent a SYN packet and then adjusted for @@ -3639,6 +3676,16 @@ void utp_socket_impl::tick(time_point now) , static_cast(this), p->num_transmissions, socket_state_names[m_state]); #endif + if (p->size > m_mtu_floor) + { + // the packet that caused the connection to fail was an mtu probe + // (note that the mtu_probe field won't be set at this point because + // it's cleared when the packet is re-sent). This suggests that + // perhaps our network throws away oversized packets without + // fragmenting them. Tell the socket manager to be more conservative + // about mtu ceiling in the future + m_sm->restrict_mtu(m_mtu); + } // the connection is dead m_error = boost::asio::error::timed_out; set_state(UTP_STATE_ERROR_WAIT);