From 1aa47fc13ec1d7433891985ccdff8d2e2b6d1db8 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sat, 14 Sep 2013 10:06:48 +0000 Subject: [PATCH] add more utp statistics and make the congestion controller less aggressive when the up-link is not saturated --- include/libtorrent/session_status.hpp | 15 ++++ include/libtorrent/utp_socket_manager.hpp | 24 ++++++ src/session_impl.cpp | 26 +++++++ src/utp_socket_manager.cpp | 20 +++++ src/utp_stream.cpp | 91 +++++++++++++++-------- 5 files changed, 144 insertions(+), 32 deletions(-) diff --git a/include/libtorrent/session_status.hpp b/include/libtorrent/session_status.hpp index 66420dd4f..6777699ef 100644 --- a/include/libtorrent/session_status.hpp +++ b/include/libtorrent/session_status.hpp @@ -70,11 +70,26 @@ namespace libtorrent struct utp_status { + // gauges int num_idle; int num_syn_sent; int num_connected; int num_fin_sent; int num_close_wait; + + // counters + boost::uint64_t packet_loss; + boost::uint64_t timeout; + boost::uint64_t packets_in; + boost::uint64_t packets_out; + boost::uint64_t fast_retransmit; + boost::uint64_t packet_resend; + boost::uint64_t samples_above_target; + boost::uint64_t samples_below_target; + boost::uint64_t payload_pkts_in; + boost::uint64_t payload_pkts_out; + boost::uint64_t invalid_pkts_in; + boost::uint64_t redundant_pkts_in; }; struct TORRENT_EXPORT session_status diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 51483978c..c2f396f06 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -95,6 +95,27 @@ namespace libtorrent void defer_ack(utp_socket_impl* s); void subscribe_drained(utp_socket_impl* s); + enum counter_t + { + packet_loss = 0, + timeout, + packets_in, + packets_out, + fast_retransmit, + packet_resend, + samples_above_target, + samples_below_target, + payload_pkts_in, + payload_pkts_out, + invalid_pkts_in, + redundant_pkts_in, + + num_counters, + }; + + // used to keep stats of uTP events + void inc_stats_counter(int counter); + private: udp_socket& m_sock; incoming_utp_callback_t m_cb; @@ -143,6 +164,9 @@ namespace libtorrent // the buffer size of the socket. This is used // to now lower the buffer size int m_sock_buf_size; + + // stats counters + boost::uint64_t m_counters[num_counters]; }; } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 879a8f47b..8b7a6f3a6 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -1328,6 +1328,19 @@ namespace aux { ":peers up send buffer" + ":packet_loss" + ":timeout" + ":packets_in" + ":packets_out" + ":fast_retransmit" + ":packet_resend" + ":samples_above_target" + ":samples_below_target" + ":payload_pkts_in" + ":payload_pkts_out" + ":invalid_pkts_in" + ":redundant_pkts_in" + "\n\n", m_stats_logger); } #endif @@ -4100,6 +4113,19 @@ retry: STAT_LOG(d, peers_up_send_buffer); + STAT_LOG(PRId64, sst.utp_stats.packet_loss); + STAT_LOG(PRId64, sst.utp_stats.timeout); + STAT_LOG(PRId64, sst.utp_stats.packets_in); + STAT_LOG(PRId64, sst.utp_stats.packets_out); + STAT_LOG(PRId64, sst.utp_stats.fast_retransmit); + STAT_LOG(PRId64, sst.utp_stats.packet_resend); + STAT_LOG(PRId64, sst.utp_stats.samples_above_target); + STAT_LOG(PRId64, sst.utp_stats.samples_below_target); + STAT_LOG(PRId64, sst.utp_stats.payload_pkts_in); + STAT_LOG(PRId64, sst.utp_stats.payload_pkts_out); + STAT_LOG(PRId64, sst.utp_stats.invalid_pkts_in); + STAT_LOG(PRId64, sst.utp_stats.redundant_pkts_in); + fprintf(m_stats_logger, "\n"); #undef STAT_LOG diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index 0c8e05190..8a2f6e347 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -72,6 +72,19 @@ namespace libtorrent s.num_fin_sent = 0; s.num_close_wait = 0; + s.packet_loss = m_counters[packet_loss]; + s.timeout = m_counters[timeout]; + s.packets_in = m_counters[packets_in]; + s.packets_out = m_counters[packets_out]; + s.fast_retransmit = m_counters[fast_retransmit]; + s.packet_resend = m_counters[packet_resend]; + s.samples_above_target = m_counters[samples_above_target]; + s.samples_below_target = m_counters[samples_below_target]; + s.payload_pkts_in = m_counters[payload_pkts_in]; + s.payload_pkts_out = m_counters[payload_pkts_out]; + s.invalid_pkts_in = m_counters[invalid_pkts_in]; + s.redundant_pkts_in = m_counters[redundant_pkts_in]; + for (socket_map_t::const_iterator i = m_utp_sockets.begin() , end(m_utp_sockets.end()); i != end; ++i) { @@ -432,6 +445,13 @@ namespace libtorrent m_sock_buf_size = size; } + void utp_socket_manager::inc_stats_counter(int counter) + { + TORRENT_ASSERT(counter >= 0); + TORRENT_ASSERT(counter < num_counters); + ++m_counters[counter]; + } + utp_socket_impl* utp_socket_manager::new_utp_socket(utp_stream* str) { boost::uint16_t send_id = 0; diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 7ff1a7333..5a0f38c01 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -229,7 +229,6 @@ struct utp_socket_impl , m_connect_handler(0) , m_remote_address() , m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout())) - , m_last_cwnd_hit(time_now()) , m_last_history_step(time_now_hires()) , m_cwnd(TORRENT_ETHERNET_MTU << 16) , m_buffered_incoming_bytes(0) @@ -421,12 +420,6 @@ struct utp_socket_impl // size less than one MSS. ptime m_timeout; - // the last time we wanted to send more data, but couldn't because - // it would bring the number of outstanding bytes above the cwnd. - // this is used to restrict increasing the cwnd size when we're - // not sending fast enough to need it bigger - ptime m_last_cwnd_hit; - // the last time we stepped the timestamp history ptime m_last_history_step; @@ -1524,7 +1517,7 @@ void utp_socket_impl::subscribe_drained() if (m_subscribe_drained) return; - UTP_LOGV("%8p: socket drained\n", this); + UTP_LOGV("%8p: subscribe drained\n", this); m_subscribe_drained = true; m_sm->subscribe_drained(this); } @@ -1578,6 +1571,7 @@ bool utp_socket_impl::send_pkt(int flags) // first see if we need to resend any packets + // TODO: this loop may not be very efficient for (int i = (m_acked_seq_nr + 1) & ACK_MASK; i != m_seq_nr; i = (i + 1) & ACK_MASK) { packet* p = (packet*)m_outbuf.at(i); @@ -1587,8 +1581,8 @@ bool utp_socket_impl::send_pkt(int flags) { // we couldn't resend the packet. It probably doesn't // fit in our cwnd. If force is set, we need to continue - // to send our force anyway, if we don't have to send an - // force, we might as well return + // to send our packet anyway, if we don't have force set, + // we might as well return if (!force) return false; // resend_packet might have failed if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false; @@ -1627,7 +1621,6 @@ bool utp_socket_impl::send_pkt(int flags) if ((flags & pkt_fin) == 0) payload_size = 0; // we're constrained by the window size - m_last_cwnd_hit = time_now_hires(); m_cwnd_full = true; UTP_LOGV("%8p: no space in window send_buffer_size:%d cwnd:%d " @@ -1686,6 +1679,8 @@ bool utp_socket_impl::send_pkt(int flags) { p = (packet*)malloc(sizeof(packet) + m_mtu); p->allocated = m_mtu; + + m_sm->inc_stats_counter(utp_socket_manager::payload_pkts_out); } else { @@ -1852,6 +1847,7 @@ bool utp_socket_impl::send_pkt(int flags) , p->mtu_probe ? utp_socket_manager::dont_fragment : 0); ++m_out_packets; + m_sm->inc_stats_counter(utp_socket_manager::packets_out); if (ec == error::message_size) { @@ -1993,7 +1989,6 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) && p->size - p->header_size > window_size_left && m_bytes_in_flight > 0) { - m_last_cwnd_hit = time_now_hires(); m_cwnd_full = true; return false; } @@ -2005,6 +2000,9 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) TORRENT_ASSERT(p->size - p->header_size >= 0); if (p->need_resend) m_bytes_in_flight += p->size - p->header_size; + m_sm->inc_stats_counter(utp_socket_manager::packet_resend); + if (fast_resend) m_sm->inc_stats_counter(utp_socket_manager::fast_retransmit); + p->need_resend = false; utp_header* h = (utp_header*)p->buf; // update packet header @@ -2036,6 +2034,8 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) m_sm->send_packet(udp::endpoint(m_remote_address, m_port) , (char const*)p->buf, p->size, ec); ++m_out_packets; + m_sm->inc_stats_counter(utp_socket_manager::packets_out); + #if TORRENT_UTP_LOG UTP_LOGV("%8p: re-sending packet seq_nr:%d ack_nr:%d type:%s " @@ -2083,8 +2083,10 @@ void utp_socket_impl::experienced_loss(int seq_nr) // window size. The first packet that's lost will // update the limit to the last sequence number we sent. // i.e. only packet sent after this loss can cause another - // window size cut - if (compare_less_wrap(seq_nr, m_loss_seq_nr, ACK_MASK)) return; + // window size cut. The +1 is to turn the comparison into + // less than or equal to. If we experience loss of the + // same packet again, ignore it. + if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return; // cut window size in 2 m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16)); @@ -2096,6 +2098,7 @@ void utp_socket_impl::experienced_loss(int seq_nr) // if we happen to be in slow-start mode, we need to leave it m_slow_start = false; + m_sm->inc_stats_counter(utp_socket_manager::packet_loss); } void utp_socket_impl::maybe_inc_acked_seq_nr() @@ -2416,10 +2419,13 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size utp_header* ph = (utp_header*)buf; + m_sm->inc_stats_counter(utp_socket_manager::packets_in); + if (ph->get_version() != 1) { UTP_LOGV("%8p: incoming packet version:%d (ignored)\n" , this, int(ph->get_version())); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return false; } @@ -2428,6 +2434,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size { UTP_LOGV("%8p: incoming packet id:%d expected:%d (ignored)\n" , this, int(ph->connection_id), int(m_recv_id)); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return false; } @@ -2435,6 +2442,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size { UTP_LOGV("%8p: incoming packet type:%d (ignored)\n" , this, int(ph->get_type())); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return false; } @@ -2447,6 +2455,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size if (m_state != UTP_STATE_NONE && ph->get_type() == ST_SYN) { UTP_LOGV("%8p: incoming packet type:ST_SYN (ignored)\n", this); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return true; } @@ -2482,6 +2491,8 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size , this, m_reply_micro, prev_base ? base_change : 0); } + // TODO: 2 sequence number, source IP and connection ID should be + // verified before accepting a reset packet if (ph->get_type() == ST_RESET) { UTP_LOGV("%8p: incoming packet type:RESET\n", this); @@ -2511,6 +2522,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size { UTP_LOGV("%8p: incoming packet ack_nr:%d our seq_nr:%d (ignored)\n" , this, int(ph->ack_nr), m_seq_nr); + m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in); return true; } @@ -2532,6 +2544,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size // we've already received this packet UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n" , this, int(ph->seq_nr), m_ack_nr); + m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in); return true; } */ @@ -2545,6 +2558,9 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size } + if (ph->get_type() == ST_DATA) + m_sm->inc_stats_counter(utp_socket_manager::payload_pkts_in); + if (m_state != UTP_STATE_NONE && m_state != UTP_STATE_SYN_SENT && compare_less_wrap((m_ack_nr + max_packets_reorder) & ACK_MASK, ph->seq_nr, ACK_MASK)) @@ -2556,6 +2572,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size // to drop the timestamp information. UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n" , this, int(ph->seq_nr), m_ack_nr); + m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in); return true; } @@ -2637,6 +2654,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size if (ptr - buf + 2 > size) { UTP_LOGV("%8p: invalid extension header\n", this); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return true; } int next_extension = *ptr++; @@ -2645,12 +2663,14 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size { UTP_LOGV("%8p: invalid extension length:%d packet:%d\n" , this, len, int(ptr - buf)); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return true; } if (ptr - buf + len > size_t(size)) { UTP_LOGV("%8p: invalid extension header size:%d packet:%d\n" , this, len, int(ptr - buf)); + m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in); return true; } switch(extension) @@ -3077,6 +3097,10 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime int target_delay = m_sm->target_delay(); + // true if the upper layer is pushing enough data down the socket to be + // limited by the cwnd. If this is not the case, we should not adjust cwnd. + bool cwnd_saturated = (m_bytes_in_flight + acked_bytes + m_mtu > (m_cwnd << 16)); + // all of these are fixed points with 16 bits fraction portion boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight; boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay; @@ -3085,39 +3109,41 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime if (delay >= target_delay) { UTP_LOGV("%8p: off_target: %d slow_start -> 0\n", this, target_delay - delay); + m_sm->inc_stats_counter(utp_socket_manager::samples_above_target); m_slow_start = false; } + else + { + m_sm->inc_stats_counter(utp_socket_manager::samples_below_target); + } boost::int64_t linear_gain = (window_factor * delay_factor) >> 16; linear_gain *= boost::int64_t(m_sm->gain_factor()); - if (m_slow_start) + // if the user is not saturating the link (i.e. not filling the + // congestion window), don't adjust it at all. + if (cwnd_saturated) { - // mimic TCP slow-start by adding the number of acked - // bytes to cwnd - scaled_gain = (std::max)(boost::int64_t(acked_bytes) << 16, linear_gain); + if (m_slow_start) + { + // mimic TCP slow-start by adding the number of acked + // bytes to cwnd + scaled_gain = (std::max)(boost::int64_t(acked_bytes) << 16, linear_gain); + } + else + { + scaled_gain = linear_gain; + } } else { - scaled_gain = linear_gain; + scaled_gain = 0; } // make sure we don't wrap the cwnd if (scaled_gain >= INT64_MAX - m_cwnd) scaled_gain = INT64_MAX - m_cwnd - 1; - if (scaled_gain > 0 && !m_cwnd_full - && m_last_cwnd_hit + milliseconds(50) < now) - { - UTP_LOGV("%8p: last_cwnd_hit:%d full_cwnd:%d scaled_gain -> 0, slow_start -> 0\n", this - , total_milliseconds(now - m_last_cwnd_hit), int(m_cwnd_full)); - // we haven't bumped into the cwnd limit size in the last second - // this probably means we have a send rate limit, so we shouldn't make - // the cwnd size any larger - scaled_gain = 0; - m_slow_start = false; - } - UTP_LOGV("%8p: do_ledbat delay:%d off_target: %d window_factor:%f target_factor:%f " "scaled_gain:%f cwnd:%d slow_start:%d\n" , this, delay, target_delay - delay, window_factor / float(1 << 16) @@ -3143,7 +3169,6 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime { UTP_LOGV("%8p: mtu:%d in_flight:%d adv_wnd:%d cwnd:%d acked_bytes:%d cwnd_full -> 0\n" , this, m_mtu, in_flight, int(m_adv_wnd), int(m_cwnd >> 16), acked_bytes); - if (m_cwnd_full) m_last_cwnd_hit = time_now_hires(); m_cwnd_full = false; } @@ -3194,6 +3219,8 @@ void utp_socket_impl::tick(ptime const& now) // TIMEOUT! // set cwnd to 1 MSS + m_sm->inc_stats_counter(utp_socket_manager::timeout); + if (m_outbuf.size()) ++m_num_timeouts; if (m_num_timeouts > m_sm->num_resends())