add more utp statistics and make the congestion controller less aggressive when the up-link is not saturated

This commit is contained in:
Arvid Norberg 2013-09-14 10:06:48 +00:00
parent 84fcc0baf0
commit 1aa47fc13e
5 changed files with 144 additions and 32 deletions

View File

@ -70,11 +70,26 @@ namespace libtorrent
struct utp_status
{
// gauges
int num_idle;
int num_syn_sent;
int num_connected;
int num_fin_sent;
int num_close_wait;
// counters
boost::uint64_t packet_loss;
boost::uint64_t timeout;
boost::uint64_t packets_in;
boost::uint64_t packets_out;
boost::uint64_t fast_retransmit;
boost::uint64_t packet_resend;
boost::uint64_t samples_above_target;
boost::uint64_t samples_below_target;
boost::uint64_t payload_pkts_in;
boost::uint64_t payload_pkts_out;
boost::uint64_t invalid_pkts_in;
boost::uint64_t redundant_pkts_in;
};
struct TORRENT_EXPORT session_status

View File

@ -95,6 +95,27 @@ namespace libtorrent
void defer_ack(utp_socket_impl* s);
void subscribe_drained(utp_socket_impl* s);
enum counter_t
{
packet_loss = 0,
timeout,
packets_in,
packets_out,
fast_retransmit,
packet_resend,
samples_above_target,
samples_below_target,
payload_pkts_in,
payload_pkts_out,
invalid_pkts_in,
redundant_pkts_in,
num_counters,
};
// used to keep stats of uTP events
void inc_stats_counter(int counter);
private:
udp_socket& m_sock;
incoming_utp_callback_t m_cb;
@ -143,6 +164,9 @@ namespace libtorrent
// the buffer size of the socket. This is used
// to now lower the buffer size
int m_sock_buf_size;
// stats counters
boost::uint64_t m_counters[num_counters];
};
}

View File

@ -1328,6 +1328,19 @@ namespace aux {
":peers up send buffer"
":packet_loss"
":timeout"
":packets_in"
":packets_out"
":fast_retransmit"
":packet_resend"
":samples_above_target"
":samples_below_target"
":payload_pkts_in"
":payload_pkts_out"
":invalid_pkts_in"
":redundant_pkts_in"
"\n\n", m_stats_logger);
}
#endif
@ -4100,6 +4113,19 @@ retry:
STAT_LOG(d, peers_up_send_buffer);
STAT_LOG(PRId64, sst.utp_stats.packet_loss);
STAT_LOG(PRId64, sst.utp_stats.timeout);
STAT_LOG(PRId64, sst.utp_stats.packets_in);
STAT_LOG(PRId64, sst.utp_stats.packets_out);
STAT_LOG(PRId64, sst.utp_stats.fast_retransmit);
STAT_LOG(PRId64, sst.utp_stats.packet_resend);
STAT_LOG(PRId64, sst.utp_stats.samples_above_target);
STAT_LOG(PRId64, sst.utp_stats.samples_below_target);
STAT_LOG(PRId64, sst.utp_stats.payload_pkts_in);
STAT_LOG(PRId64, sst.utp_stats.payload_pkts_out);
STAT_LOG(PRId64, sst.utp_stats.invalid_pkts_in);
STAT_LOG(PRId64, sst.utp_stats.redundant_pkts_in);
fprintf(m_stats_logger, "\n");
#undef STAT_LOG

View File

@ -72,6 +72,19 @@ namespace libtorrent
s.num_fin_sent = 0;
s.num_close_wait = 0;
s.packet_loss = m_counters[packet_loss];
s.timeout = m_counters[timeout];
s.packets_in = m_counters[packets_in];
s.packets_out = m_counters[packets_out];
s.fast_retransmit = m_counters[fast_retransmit];
s.packet_resend = m_counters[packet_resend];
s.samples_above_target = m_counters[samples_above_target];
s.samples_below_target = m_counters[samples_below_target];
s.payload_pkts_in = m_counters[payload_pkts_in];
s.payload_pkts_out = m_counters[payload_pkts_out];
s.invalid_pkts_in = m_counters[invalid_pkts_in];
s.redundant_pkts_in = m_counters[redundant_pkts_in];
for (socket_map_t::const_iterator i = m_utp_sockets.begin()
, end(m_utp_sockets.end()); i != end; ++i)
{
@ -432,6 +445,13 @@ namespace libtorrent
m_sock_buf_size = size;
}
void utp_socket_manager::inc_stats_counter(int counter)
{
TORRENT_ASSERT(counter >= 0);
TORRENT_ASSERT(counter < num_counters);
++m_counters[counter];
}
utp_socket_impl* utp_socket_manager::new_utp_socket(utp_stream* str)
{
boost::uint16_t send_id = 0;

View File

@ -229,7 +229,6 @@ struct utp_socket_impl
, m_connect_handler(0)
, m_remote_address()
, m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout()))
, m_last_cwnd_hit(time_now())
, m_last_history_step(time_now_hires())
, m_cwnd(TORRENT_ETHERNET_MTU << 16)
, m_buffered_incoming_bytes(0)
@ -421,12 +420,6 @@ struct utp_socket_impl
// size less than one MSS.
ptime m_timeout;
// the last time we wanted to send more data, but couldn't because
// it would bring the number of outstanding bytes above the cwnd.
// this is used to restrict increasing the cwnd size when we're
// not sending fast enough to need it bigger
ptime m_last_cwnd_hit;
// the last time we stepped the timestamp history
ptime m_last_history_step;
@ -1524,7 +1517,7 @@ void utp_socket_impl::subscribe_drained()
if (m_subscribe_drained) return;
UTP_LOGV("%8p: socket drained\n", this);
UTP_LOGV("%8p: subscribe drained\n", this);
m_subscribe_drained = true;
m_sm->subscribe_drained(this);
}
@ -1578,6 +1571,7 @@ bool utp_socket_impl::send_pkt(int flags)
// first see if we need to resend any packets
// TODO: this loop may not be very efficient
for (int i = (m_acked_seq_nr + 1) & ACK_MASK; i != m_seq_nr; i = (i + 1) & ACK_MASK)
{
packet* p = (packet*)m_outbuf.at(i);
@ -1587,8 +1581,8 @@ bool utp_socket_impl::send_pkt(int flags)
{
// we couldn't resend the packet. It probably doesn't
// fit in our cwnd. If force is set, we need to continue
// to send our force anyway, if we don't have to send an
// force, we might as well return
// to send our packet anyway, if we don't have force set,
// we might as well return
if (!force) return false;
// resend_packet might have failed
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false;
@ -1627,7 +1621,6 @@ bool utp_socket_impl::send_pkt(int flags)
if ((flags & pkt_fin) == 0) payload_size = 0;
// we're constrained by the window size
m_last_cwnd_hit = time_now_hires();
m_cwnd_full = true;
UTP_LOGV("%8p: no space in window send_buffer_size:%d cwnd:%d "
@ -1686,6 +1679,8 @@ bool utp_socket_impl::send_pkt(int flags)
{
p = (packet*)malloc(sizeof(packet) + m_mtu);
p->allocated = m_mtu;
m_sm->inc_stats_counter(utp_socket_manager::payload_pkts_out);
}
else
{
@ -1852,6 +1847,7 @@ bool utp_socket_impl::send_pkt(int flags)
, p->mtu_probe ? utp_socket_manager::dont_fragment : 0);
++m_out_packets;
m_sm->inc_stats_counter(utp_socket_manager::packets_out);
if (ec == error::message_size)
{
@ -1993,7 +1989,6 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
&& p->size - p->header_size > window_size_left
&& m_bytes_in_flight > 0)
{
m_last_cwnd_hit = time_now_hires();
m_cwnd_full = true;
return false;
}
@ -2005,6 +2000,9 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
TORRENT_ASSERT(p->size - p->header_size >= 0);
if (p->need_resend) m_bytes_in_flight += p->size - p->header_size;
m_sm->inc_stats_counter(utp_socket_manager::packet_resend);
if (fast_resend) m_sm->inc_stats_counter(utp_socket_manager::fast_retransmit);
p->need_resend = false;
utp_header* h = (utp_header*)p->buf;
// update packet header
@ -2036,6 +2034,8 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)p->buf, p->size, ec);
++m_out_packets;
m_sm->inc_stats_counter(utp_socket_manager::packets_out);
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: re-sending packet seq_nr:%d ack_nr:%d type:%s "
@ -2083,8 +2083,10 @@ void utp_socket_impl::experienced_loss(int seq_nr)
// window size. The first packet that's lost will
// update the limit to the last sequence number we sent.
// i.e. only packet sent after this loss can cause another
// window size cut
if (compare_less_wrap(seq_nr, m_loss_seq_nr, ACK_MASK)) return;
// window size cut. The +1 is to turn the comparison into
// less than or equal to. If we experience loss of the
// same packet again, ignore it.
if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return;
// cut window size in 2
m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16));
@ -2096,6 +2098,7 @@ void utp_socket_impl::experienced_loss(int seq_nr)
// if we happen to be in slow-start mode, we need to leave it
m_slow_start = false;
m_sm->inc_stats_counter(utp_socket_manager::packet_loss);
}
void utp_socket_impl::maybe_inc_acked_seq_nr()
@ -2416,10 +2419,13 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
utp_header* ph = (utp_header*)buf;
m_sm->inc_stats_counter(utp_socket_manager::packets_in);
if (ph->get_version() != 1)
{
UTP_LOGV("%8p: incoming packet version:%d (ignored)\n"
, this, int(ph->get_version()));
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return false;
}
@ -2428,6 +2434,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
{
UTP_LOGV("%8p: incoming packet id:%d expected:%d (ignored)\n"
, this, int(ph->connection_id), int(m_recv_id));
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return false;
}
@ -2435,6 +2442,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
{
UTP_LOGV("%8p: incoming packet type:%d (ignored)\n"
, this, int(ph->get_type()));
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return false;
}
@ -2447,6 +2455,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
if (m_state != UTP_STATE_NONE && ph->get_type() == ST_SYN)
{
UTP_LOGV("%8p: incoming packet type:ST_SYN (ignored)\n", this);
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return true;
}
@ -2482,6 +2491,8 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
, this, m_reply_micro, prev_base ? base_change : 0);
}
// TODO: 2 sequence number, source IP and connection ID should be
// verified before accepting a reset packet
if (ph->get_type() == ST_RESET)
{
UTP_LOGV("%8p: incoming packet type:RESET\n", this);
@ -2511,6 +2522,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
{
UTP_LOGV("%8p: incoming packet ack_nr:%d our seq_nr:%d (ignored)\n"
, this, int(ph->ack_nr), m_seq_nr);
m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in);
return true;
}
@ -2532,6 +2544,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
// we've already received this packet
UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n"
, this, int(ph->seq_nr), m_ack_nr);
m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in);
return true;
}
*/
@ -2545,6 +2558,9 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
}
if (ph->get_type() == ST_DATA)
m_sm->inc_stats_counter(utp_socket_manager::payload_pkts_in);
if (m_state != UTP_STATE_NONE
&& m_state != UTP_STATE_SYN_SENT
&& compare_less_wrap((m_ack_nr + max_packets_reorder) & ACK_MASK, ph->seq_nr, ACK_MASK))
@ -2556,6 +2572,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
// to drop the timestamp information.
UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n"
, this, int(ph->seq_nr), m_ack_nr);
m_sm->inc_stats_counter(utp_socket_manager::redundant_pkts_in);
return true;
}
@ -2637,6 +2654,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
if (ptr - buf + 2 > size)
{
UTP_LOGV("%8p: invalid extension header\n", this);
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return true;
}
int next_extension = *ptr++;
@ -2645,12 +2663,14 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
{
UTP_LOGV("%8p: invalid extension length:%d packet:%d\n"
, this, len, int(ptr - buf));
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return true;
}
if (ptr - buf + len > size_t(size))
{
UTP_LOGV("%8p: invalid extension header size:%d packet:%d\n"
, this, len, int(ptr - buf));
m_sm->inc_stats_counter(utp_socket_manager::invalid_pkts_in);
return true;
}
switch(extension)
@ -3077,6 +3097,10 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime
int target_delay = m_sm->target_delay();
// true if the upper layer is pushing enough data down the socket to be
// limited by the cwnd. If this is not the case, we should not adjust cwnd.
bool cwnd_saturated = (m_bytes_in_flight + acked_bytes + m_mtu > (m_cwnd << 16));
// all of these are fixed points with 16 bits fraction portion
boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight;
boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay;
@ -3085,39 +3109,41 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime
if (delay >= target_delay)
{
UTP_LOGV("%8p: off_target: %d slow_start -> 0\n", this, target_delay - delay);
m_sm->inc_stats_counter(utp_socket_manager::samples_above_target);
m_slow_start = false;
}
else
{
m_sm->inc_stats_counter(utp_socket_manager::samples_below_target);
}
boost::int64_t linear_gain = (window_factor * delay_factor) >> 16;
linear_gain *= boost::int64_t(m_sm->gain_factor());
if (m_slow_start)
// if the user is not saturating the link (i.e. not filling the
// congestion window), don't adjust it at all.
if (cwnd_saturated)
{
// mimic TCP slow-start by adding the number of acked
// bytes to cwnd
scaled_gain = (std::max)(boost::int64_t(acked_bytes) << 16, linear_gain);
if (m_slow_start)
{
// mimic TCP slow-start by adding the number of acked
// bytes to cwnd
scaled_gain = (std::max)(boost::int64_t(acked_bytes) << 16, linear_gain);
}
else
{
scaled_gain = linear_gain;
}
}
else
{
scaled_gain = linear_gain;
scaled_gain = 0;
}
// make sure we don't wrap the cwnd
if (scaled_gain >= INT64_MAX - m_cwnd)
scaled_gain = INT64_MAX - m_cwnd - 1;
if (scaled_gain > 0 && !m_cwnd_full
&& m_last_cwnd_hit + milliseconds(50) < now)
{
UTP_LOGV("%8p: last_cwnd_hit:%d full_cwnd:%d scaled_gain -> 0, slow_start -> 0\n", this
, total_milliseconds(now - m_last_cwnd_hit), int(m_cwnd_full));
// we haven't bumped into the cwnd limit size in the last second
// this probably means we have a send rate limit, so we shouldn't make
// the cwnd size any larger
scaled_gain = 0;
m_slow_start = false;
}
UTP_LOGV("%8p: do_ledbat delay:%d off_target: %d window_factor:%f target_factor:%f "
"scaled_gain:%f cwnd:%d slow_start:%d\n"
, this, delay, target_delay - delay, window_factor / float(1 << 16)
@ -3143,7 +3169,6 @@ void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime
{
UTP_LOGV("%8p: mtu:%d in_flight:%d adv_wnd:%d cwnd:%d acked_bytes:%d cwnd_full -> 0\n"
, this, m_mtu, in_flight, int(m_adv_wnd), int(m_cwnd >> 16), acked_bytes);
if (m_cwnd_full) m_last_cwnd_hit = time_now_hires();
m_cwnd_full = false;
}
@ -3194,6 +3219,8 @@ void utp_socket_impl::tick(ptime const& now)
// TIMEOUT!
// set cwnd to 1 MSS
m_sm->inc_stats_counter(utp_socket_manager::timeout);
if (m_outbuf.size()) ++m_num_timeouts;
if (m_num_timeouts > m_sm->num_resends())