fix uTP NAGLE algorithm some more (seems stable now)

This commit is contained in:
Arvid Norberg 2012-06-28 03:53:52 +00:00
parent a5985f2758
commit a583c2fe88
1 changed files with 68 additions and 105 deletions

View File

@ -297,7 +297,9 @@ struct utp_socket_impl
void defer_ack(); void defer_ack();
void remove_sack_header(packet* p); void remove_sack_header(packet* p);
bool send_pkt(bool ack);
enum packet_flags_t { pkt_ack = 1, pkt_fin = 2 };
bool send_pkt(int flags = 0);
bool resend_packet(packet* p, bool fast_resend = false); bool resend_packet(packet* p, bool fast_resend = false);
void send_reset(utp_header* ph); void send_reset(utp_header* ph);
void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes
@ -690,7 +692,7 @@ void utp_send_ack(utp_socket_impl* s)
{ {
TORRENT_ASSERT(s->m_deferred_ack); TORRENT_ASSERT(s->m_deferred_ack);
s->m_deferred_ack = false; s->m_deferred_ack = false;
s->send_pkt(true); s->send_pkt(utp_socket_impl::pkt_ack);
} }
void utp_socket_impl::update_mtu_limits() void utp_socket_impl::update_mtu_limits()
@ -1033,7 +1035,7 @@ void utp_stream::set_write_handler(handler_t h)
// try to write. send_pkt returns false if there's // try to write. send_pkt returns false if there's
// no more payload to send or if the congestion window // no more payload to send or if the congestion window
// is full and we can't send more packets right now // is full and we can't send more packets right now
while (m_impl->send_pkt(false)); while (m_impl->send_pkt());
// if there was an error in send_pkt(), m_impl may be // if there was an error in send_pkt(), m_impl may be
// 0 at this point // 0 at this point
@ -1261,71 +1263,10 @@ void utp_socket_impl::send_fin()
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT); send_pkt(pkt_fin);
// unless there was an error, we're now
// we need a heap allocated packet in order to stick it // in FIN-SENT state
// in the send buffer, so that we can resend it if (!m_error)
packet* p = (packet*)malloc(sizeof(packet) + sizeof(utp_header));
p->size = sizeof(utp_header);
p->header_size = sizeof(utp_header);
p->num_transmissions = 1;
p->need_resend = false;
utp_header* h = (utp_header*)p->buf;
h->type_ver = (ST_FIN << 4) | 1;
h->extension = 0;
h->connection_id = m_send_id;
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size;
h->seq_nr = m_seq_nr;
h->ack_nr = m_ack_nr;
ptime now = time_now_hires();
p->send_time = now;
h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time()));
error_code ec;
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)h, sizeof(utp_header), ec);
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: sending FIN seq_nr:%d ack_nr:%d type:%s "
"id:%d target:%s size:%d error:%s send_buffer_size:%d\n"
, this, int(h->seq_nr), int(h->ack_nr), packet_type_names[h->get_type()]
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, int(sizeof(utp_header)), ec.message().c_str(), m_write_buffer_size);
#endif
if (ec)
{
m_error = ec;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
free(p);
return;
}
#if !TORRENT_UT_SEQ
// if the other end closed the connection immediately
// our FIN packet will end up having the same sequence
// number as the SYN, so this assert is invalid
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
#endif
packet* old = (packet*)m_outbuf.insert(m_seq_nr, p);
if (old)
{
TORRENT_ASSERT(((utp_header*)old->buf)->seq_nr == m_seq_nr);
if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size;
free(old);
}
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
m_fast_resend_seq_nr = m_seq_nr;
TORRENT_ASSERT(!m_error);
m_state = UTP_STATE_FIN_SENT; m_state = UTP_STATE_FIN_SENT;
#if TORRENT_UTP_LOG #if TORRENT_UTP_LOG
@ -1561,14 +1502,13 @@ void utp_socket_impl::remove_sack_header(packet* p)
// if ack is true, we need to send a packet regardless of if there's // if ack is true, we need to send a packet regardless of if there's
// any data. Returns true if we could send more data (i.e. call // any data. Returns true if we could send more data (i.e. call
// send_pkt() again) // send_pkt() again)
bool utp_socket_impl::send_pkt(bool ack) bool utp_socket_impl::send_pkt(int flags)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
// This assert is bad because we call this function to ack bool force = (flags & pkt_ack) || (flags & pkt_fin);
// received FIN when we're in UTP_STATE_FIN_SENT.
// // TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT || (flags & pkt_ack));
// TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT);
// first see if we need to resend any packets // first see if we need to resend any packets
@ -1580,10 +1520,10 @@ bool utp_socket_impl::send_pkt(bool ack)
if (!resend_packet(p)) if (!resend_packet(p))
{ {
// we couldn't resend the packet. It probably doesn't // we couldn't resend the packet. It probably doesn't
// fit in our cwnd. If ack is set, we need to continue // fit in our cwnd. If force is set, we need to continue
// to send our ack anyway, if we don't have to send an // to send our force anyway, if we don't have to send an
// ack, we might as well return // force, we might as well return
if (!ack) return false; if (!force) return false;
// resend_packet might have failed // resend_packet might have failed
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false; if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false;
break; break;
@ -1617,7 +1557,8 @@ bool utp_socket_impl::send_pkt(bool ack)
// this means there's not enough room in the send window for // this means there's not enough room in the send window for
// another packet. We have to hold off sending this data. // another packet. We have to hold off sending this data.
// we still need to send an ACK though // we still need to send an ACK though
payload_size = 0; // if we're trying to send a FIN, make an exception
if ((flags & pkt_fin) == 0) payload_size = 0;
// we're constrained by the window size // we're constrained by the window size
m_last_cwnd_hit = time_now_hires(); m_last_cwnd_hit = time_now_hires();
@ -1628,7 +1569,7 @@ bool utp_socket_impl::send_pkt(bool ack)
, this, m_write_buffer_size, int(m_cwnd >> 16) , this, m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu); , m_adv_wnd, m_bytes_in_flight, m_mtu);
if (!ack) if (!force)
{ {
#if TORRENT_UTP_LOG #if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d " UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d "
@ -1644,11 +1585,11 @@ bool utp_socket_impl::send_pkt(bool ack)
} }
// if we don't have any data to send, or can't send any data // if we don't have any data to send, or can't send any data
// and we don't have any data to ack, don't send a packet // and we don't have any data to force, don't send a packet
if (payload_size == 0 && !ack && !m_nagle_packet) if (payload_size == 0 && !force && !m_nagle_packet)
{ {
#if TORRENT_UTP_LOG #if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send (no payload and no ack) seq_nr:%d ack_nr:%d " UTP_LOGV("%8p: skipping send (no payload and no force) seq_nr:%d ack_nr:%d "
"id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d " "id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n" "adv_wnd:%d in-flight:%d mtu:%d\n"
, this, int(m_seq_nr), int(m_ack_nr) , this, int(m_seq_nr), int(m_ack_nr)
@ -1661,23 +1602,13 @@ bool utp_socket_impl::send_pkt(bool ack)
int packet_size = header_size + payload_size; int packet_size = header_size + payload_size;
// MTU DISCOVERY
bool use_as_probe = false;
if (m_mtu_seq == 0
&& packet_size > m_mtu_floor
&& m_seq_nr != 0)
{
use_as_probe = true;
m_mtu_seq = m_seq_nr;
}
packet* p = NULL; packet* p = NULL;
boost::uint8_t* ptr = NULL; boost::uint8_t* ptr = NULL;
utp_header* h = NULL; utp_header* h = NULL;
// payload size being zero means we're just sending // payload size being zero means we're just sending
// and ack. We should not pick up the nagle packet // an force. We should not pick up the nagle packet
if (!m_nagle_packet || (payload_size == 0 && ack)) if (!m_nagle_packet || (payload_size == 0 && force))
{ {
// we only need a heap allocation if we have payload and // we only need a heap allocation if we have payload and
// need to keep the packet around (in the outbuf) // need to keep the packet around (in the outbuf)
@ -1694,7 +1625,7 @@ bool utp_socket_impl::send_pkt(bool ack)
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
stack_alloced = true; stack_alloced = true;
#endif #endif
TORRENT_ASSERT(ack); TORRENT_ASSERT(force);
p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size); p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size);
UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size); UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size);
p->allocated = packet_size; p->allocated = packet_size;
@ -1704,7 +1635,6 @@ bool utp_socket_impl::send_pkt(bool ack)
p->header_size = packet_size - payload_size; p->header_size = packet_size - payload_size;
p->num_transmissions = 0; p->num_transmissions = 0;
p->need_resend = false; p->need_resend = false;
p->mtu_probe = use_as_probe;
ptr = p->buf; ptr = p->buf;
h = (utp_header*)ptr; h = (utp_header*)ptr;
ptr += sizeof(utp_header); ptr += sizeof(utp_header);
@ -1726,8 +1656,9 @@ bool utp_socket_impl::send_pkt(bool ack)
ptr = p->buf + sizeof(utp_header); ptr = p->buf + sizeof(utp_header);
h = (utp_header*)p->buf; h = (utp_header*)p->buf;
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
// if the packet has a selective ack header, we'll need // if the packet has a selective force header, we'll need
// to update it // to update it
if (h->extension == 1) if (h->extension == 1)
{ {
@ -1756,7 +1687,7 @@ bool utp_socket_impl::send_pkt(bool ack)
// no bytes in flight // no bytes in flight
if (m_bytes_in_flight > 0 if (m_bytes_in_flight > 0
&& p->size < p->allocated && p->size < p->allocated
&& !ack && !force
&& m_nagle) && m_nagle)
{ {
return false; return false;
@ -1780,8 +1711,8 @@ bool utp_socket_impl::send_pkt(bool ack)
} }
if (m_bytes_in_flight > 0 if (m_bytes_in_flight > 0
&& payload_size < m_mtu - header_size && p->size < p->allocated
&& !ack && !force
&& m_nagle) && m_nagle)
{ {
// this is nagle. If we don't have a full packet // this is nagle. If we don't have a full packet
@ -1794,14 +1725,32 @@ bool utp_socket_impl::send_pkt(bool ack)
, this, m_write_buffer_size, int(m_cwnd >> 16) , this, m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu); , m_adv_wnd, m_bytes_in_flight, m_mtu);
TORRENT_ASSERT(m_nagle_packet == NULL); TORRENT_ASSERT(m_nagle_packet == NULL);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_nagle_packet = p; m_nagle_packet = p;
return false; return false;
} }
// MTU DISCOVERY
if (m_mtu_seq == 0
&& packet_size > m_mtu_floor
&& m_seq_nr != 0)
{
p->mtu_probe = true;
m_mtu_seq = m_seq_nr;
}
else
{
p->mtu_probe = false;
}
h->timestamp_difference_microseconds = m_reply_micro; h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size; h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size;
h->ack_nr = m_ack_nr; h->ack_nr = m_ack_nr;
// if this is a FIN packet, override the type
if (flags & pkt_fin)
h->type_ver = (ST_FIN << 4) | 1;
// fill in the timestamp as late as possible // fill in the timestamp as late as possible
++p->num_transmissions; ++p->num_transmissions;
ptime now = time_now_hires(); ptime now = time_now_hires();
@ -1851,14 +1800,19 @@ bool utp_socket_impl::send_pkt(bool ack)
// if we have payload, we need to save the packet until it's acked // if we have payload, we need to save the packet until it's acked
// and progress m_seq_nr // and progress m_seq_nr
if (payload_size) if (p->size > p->header_size)
{ {
// if we're sending a payload packet, there should not
// be a nagle packet waiting for more data
TORRENT_ASSERT(m_nagle_packet == NULL);
#if !TORRENT_UT_SEQ #if !TORRENT_UT_SEQ
// if the other end closed the connection immediately // if the other end closed the connection immediately
// our FIN packet will end up having the same sequence // our FIN packet will end up having the same sequence
// number as the SYN, so this assert is invalid // number as the SYN, so this assert is invalid
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
#endif #endif
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
packet* old = (packet*)m_outbuf.insert(m_seq_nr, p); packet* old = (packet*)m_outbuf.insert(m_seq_nr, p);
if (old) if (old)
{ {
@ -1871,6 +1825,10 @@ bool utp_socket_impl::send_pkt(bool ack)
TORRENT_ASSERT(payload_size >= 0); TORRENT_ASSERT(payload_size >= 0);
m_bytes_in_flight += p->size - p->header_size; m_bytes_in_flight += p->size - p->header_size;
} }
else
{
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
}
return m_write_buffer_size > 0 && !m_cwnd_full; return m_write_buffer_size > 0 && !m_cwnd_full;
} }
@ -3260,14 +3218,19 @@ void utp_socket_impl::check_invariant() const
i = (i + 1) & ACK_MASK) i = (i + 1) & ACK_MASK)
{ {
packet* p = (packet*)m_outbuf.at(i); packet* p = (packet*)m_outbuf.at(i);
if (m_mtu_seq == i && m_mtu_seq != 0) if (m_mtu_seq == i && m_mtu_seq != 0 && p)
{ {
TORRENT_ASSERT(p); TORRENT_ASSERT(p->mtu_probe);
if (p) TORRENT_ASSERT(p->mtu_probe);
} }
if (!p) continue; if (!p) continue;
TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i); TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i);
} }
if (m_nagle_packet)
{
// if this packet is full, it should have been sent
TORRENT_ASSERT(m_nagle_packet->size < m_nagle_packet->allocated);
}
} }
#endif #endif
} }