merge RC_1_1 into master

This commit is contained in:
arvidn 2019-01-02 17:59:01 +01:00
commit 02d7d71c5f
5 changed files with 219 additions and 86 deletions

View File

@ -93,6 +93,8 @@
* resume data no longer has timestamps of files
* require C++11 to build libtorrent
* uTP performance fixes
1.1.11 release
* fix move_storage with save_path with a trailing slash

View File

@ -147,11 +147,10 @@ namespace libtorrent {
using socket_vector_t = std::vector<utp_socket_impl*>;
// this is a list of sockets that needs to send an ack.
// once the UDP socket is drained, all of these will
// have a chance to do that. This is to avoid sending
// an ack for every single packet
socket_vector_t m_deferred_acks;
// if this is set, it means this socket still needs to send an ACK. Once
// we exit the loop processing packets, or switch to processing packets
// for a different socket, issue the ACK packet and clear this.
utp_socket_impl* m_deferred_ack = nullptr;
// storage used for saving cpu time on "push_back"
// by using already pre-allocated vector

View File

@ -177,6 +177,12 @@ namespace libtorrent {
return utp_incoming_packet(m_last_socket, p, ep, receive_time);
}
if (m_deferred_ack)
{
utp_send_ack(m_deferred_ack);
m_deferred_ack = nullptr;
}
auto r = m_utp_sockets.equal_range(id);
for (; r.first != r.second; ++r.first)
@ -227,6 +233,7 @@ namespace libtorrent {
utp_init_socket(str->get_impl(), std::move(socket));
bool ret = utp_incoming_packet(str->get_impl(), p, ep, receive_time);
if (!ret) return false;
m_last_socket = str->get_impl();
m_cb(c);
// the connection most likely changed its connection ID here
// we need to move it to the correct ID
@ -262,16 +269,11 @@ namespace libtorrent {
void utp_socket_manager::socket_drained()
{
// flush all deferred acks
if (!m_deferred_acks.empty())
if (m_deferred_ack)
{
m_temp_sockets.clear();
m_deferred_acks.swap(m_temp_sockets);
for (auto const &s : m_temp_sockets)
{
utp_send_ack(s);
}
utp_socket_impl* s = m_deferred_ack;
m_deferred_ack = nullptr;
utp_send_ack(s);
}
if (!m_drained_event.empty())
@ -287,9 +289,8 @@ namespace libtorrent {
void utp_socket_manager::defer_ack(utp_socket_impl* s)
{
TORRENT_ASSERT(std::find(m_deferred_acks.begin(), m_deferred_acks.end(), s)
== m_deferred_acks.end());
m_deferred_acks.push_back(s);
TORRENT_ASSERT(m_deferred_ack == NULL || m_deferred_ack == s);
m_deferred_ack = s;
}
void utp_socket_manager::subscribe_drained(utp_socket_impl* s)
@ -316,6 +317,7 @@ namespace libtorrent {
if (i == m_utp_sockets.end()) return;
delete_utp_impl(i->second);
if (m_last_socket == i->second) m_last_socket = nullptr;
if (m_deferred_ack == i->second) m_deferred_ack = nullptr;
m_utp_sockets.erase(i);
}

View File

@ -48,10 +48,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <numeric> // for accumulate
#endif
// the behavior of the sequence numbers as implemented by uTorrent is not
// particularly regular. This switch indicates the odd parts.
#define TORRENT_UT_SEQ 1
#if TORRENT_UTP_LOG
#include <cstdarg>
#include <cinttypes> // for PRId64 et.al.
@ -280,7 +276,7 @@ struct utp_socket_impl
void parse_close_reason(std::uint8_t const* ptr, int size);
void write_payload(std::uint8_t* ptr, int size);
void maybe_inc_acked_seq_nr();
std::uint32_t ack_packet(packet_ptr p, time_point const& receive_time
std::uint32_t ack_packet(packet_ptr p, time_point receive_time
, std::uint16_t seq_nr);
void write_sack(std::uint8_t* buf, int size) const;
void incoming(std::uint8_t const* buf, int size, packet_ptr p, time_point now);
@ -1067,8 +1063,8 @@ std::size_t utp_stream::read_some(bool const clear_buffers)
if (m_impl->m_receive_buffer_size == 0)
{
UTP_LOGV(" Didn't fill entire target: %d bytes left in buffer\n"
, m_impl->m_receive_buffer_size);
UTP_LOGV("%8p: Didn't fill entire target: %d bytes left in buffer\n"
, static_cast<void*>(m_impl), m_impl->m_receive_buffer_size);
break;
}
}
@ -1452,14 +1448,14 @@ void utp_socket_impl::parse_close_reason(std::uint8_t const* ptr, int const size
// returns (rtt, acked_bytes)
std::pair<std::uint32_t, int> utp_socket_impl::parse_sack(std::uint16_t const packet_ack
, std::uint8_t const* ptr, int size, time_point const now)
, std::uint8_t const* ptr, int const size, time_point const now)
{
INVARIANT_CHECK;
if (size == 0) return { 0u, 0 };
// this is the sequence number the current bit represents
std::uint32_t ack_nr = (packet_ack + 2) & ACK_MASK;
std::uint16_t ack_nr = (packet_ack + 2) & ACK_MASK;
#if TORRENT_VERBOSE_UTP_LOG
std::string bitmask;
@ -1475,35 +1471,34 @@ std::pair<std::uint32_t, int> utp_socket_impl::parse_sack(std::uint16_t const pa
mask <<= 1;
}
}
UTP_LOGV("%8p: got SACK first:%d %s our_seq_nr:%u\n"
, static_cast<void*>(this), ack_nr, bitmask.c_str(), m_seq_nr);
UTP_LOGV("%8p: got SACK first:%d %s our_seq_nr:%u fast_resend_seq_nr:%d\n"
, static_cast<void*>(this), ack_nr, bitmask.c_str(), m_seq_nr, m_fast_resend_seq_nr);
#endif
// the number of acked packets past the fast re-send sequence number
// this is used to determine if we should trigger more fast re-sends
int dups = 0;
// the sequence number of the last ACKed packet
std::uint32_t last_ack = packet_ack;
aux::array<std::uint16_t, 5> resend;
int num_to_resend = 0;
int acked_bytes = 0;
std::uint32_t min_rtt = std::numeric_limits<std::uint32_t>::max();
// this was implicitly lost
if (!compare_less_wrap((packet_ack + 1) & ACK_MASK, m_fast_resend_seq_nr, ACK_MASK))
{
resend[num_to_resend++] = (packet_ack + 1) & ACK_MASK;
}
// for each byte
for (std::uint8_t const* end = ptr + size; ptr != end; ++ptr)
std::uint8_t const* const start = ptr;
std::uint8_t const* const end = ptr + size;
for (; ptr != end; ++ptr)
{
std::uint8_t bitfield = *ptr;
unsigned char mask = 1;
std::uint8_t mask = 1;
// for each bit
for (int i = 0; i < 8; ++i)
{
if (mask & bitfield)
{
last_ack = ack_nr;
if (m_fast_resend_seq_nr == ack_nr)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
if (compare_less_wrap(m_fast_resend_seq_nr, ack_nr, ACK_MASK)) ++dups;
// this bit was set, ack_nr was received
packet_ptr p = m_outbuf.remove(aux::numeric_cast<packet_buffer::index_type>(ack_nr));
if (p)
@ -1521,6 +1516,11 @@ std::pair<std::uint32_t, int> utp_socket_impl::parse_sack(std::uint16_t const pa
maybe_inc_acked_seq_nr();
}
}
else if (!compare_less_wrap(ack_nr, m_fast_resend_seq_nr, ACK_MASK)
&& num_to_resend < int(resend.size()))
{
resend[num_to_resend++] = ack_nr;
}
mask <<= 1;
ack_nr = (ack_nr + 1) & ACK_MASK;
@ -1533,30 +1533,74 @@ std::pair<std::uint32_t, int> utp_socket_impl::parse_sack(std::uint16_t const pa
if (ack_nr == m_seq_nr) break;
}
// now, scan the bits in reverse, and count the number of ACKed packets. Only
// lost packets followed by 'dup_ack_limit' packets may be resent
// start with the sequence number represented by the last bit in the SACK
// bitmask
std::uint16_t last_resend = (packet_ack + 1 + size * 8) & ACK_MASK;
// the number of acked packets past the fast re-send sequence number
// this is used to determine if we should trigger more fast re-sends
int dups = 0;
for (std::uint8_t const* i = end; i != start; --i)
{
std::uint8_t const bitfield = i[-1];
std::uint8_t mask = 0x80;
// for each bit
for (int k = 0; k < 8; ++k)
{
if (mask & bitfield) ++dups;
if (dups > dup_ack_limit) break;
last_resend = (last_resend - 1) & ACK_MASK;
mask >>= 1;
}
if (dups > dup_ack_limit) break;
}
// we did not get enough packets acked in this message to warrant a resend
if (dups <= dup_ack_limit)
{
UTP_LOGV("%8p: only %d ACKs in SACK, requires more than %d to trigger fast retransmit\n"
, static_cast<void*>(this), dups, dup_ack_limit);
num_to_resend = 0;
}
// now we need to (likely) prune the tail of the resend list, since all
// "unacked" packets that weren't followed by an acked one, don't count
while (num_to_resend > 0 && !compare_less_wrap(resend[num_to_resend - 1], last_resend, ACK_MASK))
{
--num_to_resend;
}
TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1);
// we received more than dup_ack_limit ACKs in this SACK message.
// trigger fast re-send
if (dups >= dup_ack_limit && compare_less_wrap(m_fast_resend_seq_nr, last_ack, ACK_MASK))
{
experienced_loss(m_fast_resend_seq_nr);
int num_resent = 0;
bool cut_cwnd = true;
// only re-sending a single packet per sack
// appears to improve performance by making it
// less likely to loose the re-sent packet. Because
// when that happens, we must time-out in order
// to continue, which takes a long time.
if (m_fast_resend_seq_nr != last_ack)
// we received more than dup_ack_limit ACKs in this SACK message.
// trigger fast re-send. This is not an equal check because 3 identical ACKS
// are only 2 duplicates
for (int i = 0; i < num_to_resend; ++i)
{
std::uint16_t const pkt_seq = resend[i];
packet* p = m_outbuf.at(pkt_seq);
UTP_LOGV("%8p: Packet %d lost. (fast_resend_seq_nr:%d trigger fast-resend)\n"
, static_cast<void*>(this), pkt_seq, m_fast_resend_seq_nr);
if (p)
{
packet* p = m_outbuf.at(m_fast_resend_seq_nr);
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
if (p)
if (resend_packet(p, true))
{
++num_resent;
if (resend_packet(p, true)) m_duplicate_acks = 0;
m_duplicate_acks = 0;
m_fast_resend_seq_nr = (pkt_seq + 1) & ACK_MASK;
}
}
if (cut_cwnd)
{
experienced_loss(pkt_seq);
cut_cwnd = false;
}
}
return { min_rtt, acked_bytes };
@ -1869,8 +1913,11 @@ bool utp_socket_impl::send_pkt(int const flags)
write_payload(p->buf + p->size, size_left);
p->size += std::uint16_t(size_left);
UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n"
, static_cast<void*>(this), size_left, p->size, p->allocated);
if (size_left > 0)
{
UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n"
, static_cast<void*>(this), size_left, p->size, p->allocated);
}
// did we fill up the whole mtu?
// if we didn't, we may still send it if there's
@ -2031,12 +2078,6 @@ bool utp_socket_impl::send_pkt(int const flags)
// be a nagle packet waiting for more data
TORRENT_ASSERT(!m_nagle_packet);
#if !TORRENT_UT_SEQ
// if the other end closed the connection immediately
// our FIN packet will end up having the same sequence
// number as the SYN, so this assert is invalid
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
#endif
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
// 0 is a special sequence number, since it's also used as "uninitialized".
@ -2243,7 +2284,8 @@ void utp_socket_impl::experienced_loss(std::uint32_t const seq_nr)
{
m_ssthres = std::int32_t(m_cwnd >> 16);
m_slow_start = false;
UTP_LOGV("%8p: experienced loss, slow_start -> 0\n", static_cast<void*>(this));
UTP_LOGV("%8p: experienced loss, slow_start -> 0 ssthres:%d\n"
, static_cast<void*>(this), m_ssthres);
}
}
@ -2290,7 +2332,7 @@ void utp_socket_impl::maybe_inc_acked_seq_nr()
}
// returns RTT
std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const& receive_time
std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const receive_time
, std::uint16_t seq_nr)
{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
@ -2314,7 +2356,6 @@ std::uint32_t utp_socket_impl::ack_packet(packet_ptr p, time_point const& receiv
TORRENT_ASSERT(p->mtu_probe);
// our mtu probe was acked!
m_mtu_floor = std::max(m_mtu_floor, p->size);
if (m_mtu_ceiling < m_mtu_floor) m_mtu_ceiling = m_mtu_floor;
update_mtu_limits();
}
@ -2703,11 +2744,10 @@ bool utp_socket_impl::incoming_packet(span<std::uint8_t const> buf
// something is wrong.
// If our state is state_none, this packet must be a syn packet
// and the ack_nr should be ignored
std::uint16_t cmp_seq_nr = (m_seq_nr - 1) & ACK_MASK;
#if TORRENT_UT_SEQ
if (m_state == UTP_STATE_SYN_SENT && ph->get_type() == ST_STATE)
cmp_seq_nr = m_seq_nr;
#endif
std::uint16_t const cmp_seq_nr =
(m_state == UTP_STATE_SYN_SENT && ph->get_type() == ST_STATE)
? m_seq_nr : (m_seq_nr - 1) & ACK_MASK;
if ((m_state != UTP_STATE_NONE || ph->get_type() != ST_SYN)
&& (compare_less_wrap(cmp_seq_nr, ph->ack_nr, ACK_MASK)
|| compare_less_wrap(ph->ack_nr, m_acked_seq_nr
@ -2745,7 +2785,9 @@ bool utp_socket_impl::incoming_packet(span<std::uint8_t const> buf
// if the socket is closing, always ignore any packet
// with a higher sequence number than the FIN sequence number
if (m_eof && compare_less_wrap(m_eof_seq_nr, ph->seq_nr, ACK_MASK))
// ST_STATE messages always include the next seqnr.
if (m_eof && (compare_less_wrap(m_eof_seq_nr, ph->seq_nr, ACK_MASK)
|| (m_eof_seq_nr == ph->seq_nr && ph->get_type() != ST_STATE)))
{
#if TORRENT_UTP_LOG
UTP_LOG("%8p: ERROR: incoming packet type: %s seq_nr:%d eof_seq_nr:%d (ignored)\n"
@ -3430,13 +3472,15 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay
, static_cast<void*>(this), m_mtu, in_flight, int(m_adv_wnd), int(m_cwnd >> 16), acked_bytes);
m_cwnd_full = false;
}
/*
if ((m_cwnd >> 16) >= m_adv_wnd)
{
m_slow_start = false;
m_ssthres = (m_cwnd >> 16);
UTP_LOGV("%8p: cwnd > advertized wnd (%u) slow_start -> 0\n"
, static_cast<void*>(this), m_adv_wnd);
}
*/
}
void utp_stream::bind(endpoint_type const&, error_code&) { }
@ -3531,7 +3575,6 @@ void utp_socket_impl::tick(time_point const now)
// we had was the probe. Assume it was dropped
// because it was too big
m_mtu_ceiling = m_mtu - 1;
if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling;
update_mtu_limits();
}

View File

@ -82,13 +82,23 @@ metrics = {
'get_microseconds': ['clock (us)', 'x1y1', 'steps'],
'wnduser': ['advertised window size (B)', 'x1y1', 'steps'],
'ssthres': ['slow-start threshold (B)', 'x1y1', 'steps'],
'timeout': ['until next timeout (ms)', 'x1y2', 'steps'],
'rto': ['current timeout (ms)', 'x1y2', 'steps'],
'delay_base': ['delay base (us)', 'x1y1', delay_base],
'their_delay_base': ['their delay base (us)', 'x1y1', delay_base],
'their_actual_delay': ['their actual delay (us)', 'x1y1', delay_samples],
'actual_delay': ['actual_delay (us)', 'x1y1', delay_samples],
'send_buffer': ['send buffer size (B)', 'x1y1', send_buffer],
'recv_buffer': ['receive buffer size (B)', 'x1y1', 'lines']
'recv_buffer': ['receive buffer size (B)', 'x1y1', 'lines'],
'packet_loss': ['packet lost', 'x1y2', 'steps'],
'packet_timeout': ['packet timed out', 'x1y2', 'steps'],
'acked_bytes': ['Bytes ACKed by packet', 'x1y2', 'steps'],
'bytes_sent': ['cumulative bytes sent', 'x1y2', 'steps'],
'bytes_resent': ['cumulative bytes resent', 'x1y2', 'steps'],
'written': ['reported written bytes', 'x1y2', 'steps'],
'ack_nr': ['acked sequence number', 'x1y2', 'steps'],
'seq_nr': ['sent sequence number', 'x1y2', 'steps'],
}
histogram_quantization = 1.0
@ -101,10 +111,16 @@ begin = None
title = "-"
packet_loss = 0
packet_timeout = 0
num_acked = 0
delay_histogram = {}
packet_size_histogram = {}
window_size = {'0': 0, '1': 0}
bytes_sent = 0
bytes_resent = 0
written = 0
ack_nr = 0
seq_nr = 0
# [35301484] 0x00ec1190: actual_delay:1021583 our_delay:102 their_delay:-1021345 off_target:297 max_window:2687
# upload_rate:18942 delay_base:1021481154 delay_sum:-1021242 target_delay:400 acked_bytes:1441 cur_window:2882
@ -140,15 +156,37 @@ for line in file:
print("\r%d " % counter, end=' ')
if "lost." in line:
packet_loss = packet_loss + 1
packet_loss += 1
continue
if "Packet timeout" in line:
packet_timeout = packet_timeout + 1
if "lost (timeout)" in line:
packet_timeout += 1
continue
if "acked packet " in line:
num_acked += 1
if "sending packet" in line:
v = line.split('size:')[1].split(' ')[0]
packet_size_histogram[v] = 1 + packet_size_histogram.get(v, 0)
bytes_sent += int(v)
if "re-sending packet" in line:
v = line.split('size:')[1].split(' ')[0]
bytes_resent += int(v)
if 'calling write handler' in line:
v = line.split('written:')[1].split(' ')[0]
written += int(v)
if "incoming packet" in line \
and "ERROR" not in line \
and "seq_nr:" in line \
and "type:ST_SYN" not in line:
if "ack_nr:" not in line:
print(line)
ack_nr = int(line.split('ack_nr:')[1].split(' ')[0])
seq_nr = int(line.split('seq_nr:')[1].split(' ')[0])
if "our_delay:" not in line:
continue
@ -194,9 +232,16 @@ for line in file:
print('%f\t' % int(reduce(lambda a, b: a + b, list(window_size.values()))), end=' ', file=out)
else:
print('%f\t' % v, end=' ', file=out)
print(float(packet_loss * 8000), float(packet_timeout * 8000), file=out)
if fill_columns:
columns += ['packet_loss', 'packet_timeout', 'bytes_sent', 'ack_nr',
'seq_nr', 'bytes_resent', 'written']
print(float(packet_loss), float(packet_timeout), float(bytes_sent),
ack_nr, seq_nr, float(bytes_resent), written, file=out)
packet_loss = 0
packet_timeout = 0
num_acked = 0
written = 0
out.close()
@ -217,6 +262,12 @@ plot = [
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['max_window', 'send_buffer', 'cur_window', 'written'],
'title': 'bytes-written',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'],
'title': 'slow-start',
@ -229,6 +280,30 @@ plot = [
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['max_window', 'cur_window', 'packet_loss'],
'title': 'packet-loss',
'y1': 'Bytes',
'y2': 'count'
},
{
'data': ['max_window', 'cur_window', 'packet_timeout'],
'title': 'packet-timeout',
'y1': 'Bytes',
'y2': 'count'
},
{
'data': ['max_window', 'cur_window', 'bytes_sent', 'bytes_resent'],
'title': 'cumulative-bytes-sent',
'y1': 'Bytes',
'y2': 'Cumulative Bytes'
},
{
'data': ['max_window', 'cur_window', 'rto', 'timeout'],
'title': 'connection-timeout',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'wnduser', 'cur_window_packets'],
'title': 'uploading',
@ -237,19 +312,19 @@ plot = [
},
{
'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'send_buffer'],
'title': 'uploading_packets',
'title': 'uploading-packets',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['their_delay', 'target_delay', 'rtt'],
'title': 'their_delay',
'title': 'their-delay',
'y1': '',
'y2': 'Time (ms)'
},
{
'data': ['their_actual_delay', 'their_delay_base'],
'title': 'their_delay_base',
'title': 'their-delay-base',
'y1': 'Time (us)',
'y2': ''
},
@ -261,9 +336,21 @@ plot = [
},
{
'data': ['actual_delay', 'delay_base'],
'title': 'our_delay_base',
'title': 'our-delay-base',
'y1': 'Time (us)',
'y2': ''
},
{
'data': ['ack_nr', 'seq_nr'],
'title': 'sequence-numbers',
'y1': 'Bytes',
'y2': 'seqnr'
},
{
'data': ['max_window', 'cur_window', 'acked_bytes'],
'title': 'ack-rate',
'y1': 'Bytes',
'y2': 'Bytes'
}
]