From a583c2fe88a668e228170ff71ec2387f0b5f9e09 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 28 Jun 2012 03:53:52 +0000 Subject: [PATCH] fix uTP NAGLE algorithm some more (seems stable now) --- src/utp_stream.cpp | 173 ++++++++++++++++++--------------------------- 1 file changed, 68 insertions(+), 105 deletions(-) diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 96ae017d2..93ad5b821 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -297,7 +297,9 @@ struct utp_socket_impl void defer_ack(); void remove_sack_header(packet* p); - bool send_pkt(bool ack); + + enum packet_flags_t { pkt_ack = 1, pkt_fin = 2 }; + bool send_pkt(int flags = 0); bool resend_packet(packet* p, bool fast_resend = false); void send_reset(utp_header* ph); void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes @@ -690,7 +692,7 @@ void utp_send_ack(utp_socket_impl* s) { TORRENT_ASSERT(s->m_deferred_ack); s->m_deferred_ack = false; - s->send_pkt(true); + s->send_pkt(utp_socket_impl::pkt_ack); } void utp_socket_impl::update_mtu_limits() @@ -1033,7 +1035,7 @@ void utp_stream::set_write_handler(handler_t h) // try to write. 
send_pkt returns false if there's // no more payload to send or if the congestion window // is full and we can't send more packets right now - while (m_impl->send_pkt(false)); + while (m_impl->send_pkt()); // if there was an error in send_pkt(), m_impl may be // 0 at this point @@ -1261,72 +1263,11 @@ void utp_socket_impl::send_fin() { INVARIANT_CHECK; - TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT); - - // we need a heap allocated packet in order to stick it - // in the send buffer, so that we can resend it - packet* p = (packet*)malloc(sizeof(packet) + sizeof(utp_header)); - - p->size = sizeof(utp_header); - p->header_size = sizeof(utp_header); - p->num_transmissions = 1; - p->need_resend = false; - utp_header* h = (utp_header*)p->buf; - - h->type_ver = (ST_FIN << 4) | 1; - h->extension = 0; - h->connection_id = m_send_id; - h->timestamp_difference_microseconds = m_reply_micro; - h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size; - h->seq_nr = m_seq_nr; - h->ack_nr = m_ack_nr; - - ptime now = time_now_hires(); - p->send_time = now; - h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time())); - - error_code ec; - m_sm->send_packet(udp::endpoint(m_remote_address, m_port) - , (char const*)h, sizeof(utp_header), ec); - -#if TORRENT_UTP_LOG - UTP_LOGV("%8p: sending FIN seq_nr:%d ack_nr:%d type:%s " - "id:%d target:%s size:%d error:%s send_buffer_size:%d\n" - , this, int(h->seq_nr), int(h->ack_nr), packet_type_names[h->get_type()] - , m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str() - , int(sizeof(utp_header)), ec.message().c_str(), m_write_buffer_size); -#endif - - if (ec) - { - m_error = ec; - m_state = UTP_STATE_ERROR_WAIT; - test_socket_state(); - free(p); - return; - } - -#if !TORRENT_UT_SEQ - // if the other end closed the connection immediately - // our FIN packet will end up having the same sequence - // number as the SYN, so this assert is invalid - 
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); -#endif - - packet* old = (packet*)m_outbuf.insert(m_seq_nr, p); - if (old) - { - TORRENT_ASSERT(((utp_header*)old->buf)->seq_nr == m_seq_nr); - if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size; - free(old); - } - TORRENT_ASSERT(h->seq_nr == m_seq_nr); - - m_seq_nr = (m_seq_nr + 1) & ACK_MASK; - m_fast_resend_seq_nr = m_seq_nr; - - TORRENT_ASSERT(!m_error); - m_state = UTP_STATE_FIN_SENT; + send_pkt(pkt_fin); + // unless there was an error, we're now + // in FIN-SENT state + if (!m_error) + m_state = UTP_STATE_FIN_SENT; #if TORRENT_UTP_LOG UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]); @@ -1561,14 +1502,13 @@ void utp_socket_impl::remove_sack_header(packet* p) // if ack is true, we need to send a packet regardless of if there's // any data. Returns true if we could send more data (i.e. call // send_pkt() again) -bool utp_socket_impl::send_pkt(bool ack) +bool utp_socket_impl::send_pkt(int flags) { INVARIANT_CHECK; - // This assert is bad because we call this function to ack - // received FIN when we're in UTP_STATE_FIN_SENT. - // - // TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT); + bool force = (flags & pkt_ack) || (flags & pkt_fin); + +// TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT || (flags & pkt_ack)); // first see if we need to resend any packets @@ -1580,10 +1520,10 @@ bool utp_socket_impl::send_pkt(bool ack) if (!resend_packet(p)) { // we couldn't resend the packet. It probably doesn't - // fit in our cwnd. If ack is set, we need to continue - // to send our ack anyway, if we don't have to send an - // ack, we might as well return - if (!ack) return false; + // fit in our cwnd. 
If force is set, we need to continue + // to send the forced packet anyway; if nothing is being + // forced, we might as well return + if (!force) return false; // resend_packet might have failed if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false; break; @@ -1617,7 +1557,8 @@ bool utp_socket_impl::send_pkt(bool ack) // this means there's not enough room in the send window for // another packet. We have to hold off sending this data. // we still need to send an ACK though - payload_size = 0; + // if we're trying to send a FIN, make an exception + if ((flags & pkt_fin) == 0) payload_size = 0; // we're constrained by the window size m_last_cwnd_hit = time_now_hires(); @@ -1628,7 +1569,7 @@ bool utp_socket_impl::send_pkt(bool ack) , this, m_write_buffer_size, int(m_cwnd >> 16) , m_adv_wnd, m_bytes_in_flight, m_mtu); - if (!ack) + if (!force) { #if TORRENT_UTP_LOG UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d " @@ -1644,11 +1585,11 @@ bool utp_socket_impl::send_pkt(bool ack) } // if we don't have any data to send, or can't send any data - // and we don't have any data to ack, don't send a packet - if (payload_size == 0 && !ack && !m_nagle_packet) + // and we aren't forced to send (ACK or FIN), don't send a packet + if (payload_size == 0 && !force && !m_nagle_packet) { #if TORRENT_UTP_LOG - UTP_LOGV("%8p: skipping send (no payload and no ack) seq_nr:%d ack_nr:%d " + UTP_LOGV("%8p: skipping send (no payload and no force) seq_nr:%d ack_nr:%d " "id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d " "adv_wnd:%d in-flight:%d mtu:%d\n" , this, int(m_seq_nr), int(m_ack_nr) @@ -1661,23 +1602,13 @@ bool utp_socket_impl::send_pkt(bool ack) int packet_size = header_size + payload_size; - // MTU DISCOVERY - bool use_as_probe = false; - if (m_mtu_seq == 0 - && packet_size > m_mtu_floor - && m_seq_nr != 0) - { - use_as_probe = true; - m_mtu_seq = m_seq_nr; - } - packet* p = NULL; boost::uint8_t* ptr = NULL; utp_header* h = NULL; // payload
size being zero means we're just sending - // and ack. We should not pick up the nagle packet - if (!m_nagle_packet || (payload_size == 0 && ack)) + // a forced packet (ACK or FIN). We should not pick up the nagle packet + if (!m_nagle_packet || (payload_size == 0 && force)) { // we only need a heap allocation if we have payload and // need to keep the packet around (in the outbuf) @@ -1694,7 +1625,7 @@ bool utp_socket_impl::send_pkt(bool ack) #ifdef TORRENT_DEBUG stack_alloced = true; #endif - TORRENT_ASSERT(ack); + TORRENT_ASSERT(force); p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size); UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size); p->allocated = packet_size; @@ -1704,7 +1635,6 @@ bool utp_socket_impl::send_pkt(bool ack) p->header_size = packet_size - payload_size; p->num_transmissions = 0; p->need_resend = false; - p->mtu_probe = use_as_probe; ptr = p->buf; h = (utp_header*)ptr; ptr += sizeof(utp_header); @@ -1726,8 +1656,9 @@ bool utp_socket_impl::send_pkt(bool ack) ptr = p->buf + sizeof(utp_header); h = (utp_header*)p->buf; + TORRENT_ASSERT(h->seq_nr == m_seq_nr); - // if the packet has a selective ack header, we'll need + // if the packet has a selective ack header, we'll need // to update it if (h->extension == 1) { @@ -1756,7 +1687,7 @@ bool utp_socket_impl::send_pkt(bool ack) // no bytes in flight if (m_bytes_in_flight > 0 && p->size < p->allocated - && !ack + && !force && m_nagle) { return false; @@ -1780,8 +1711,8 @@ bool utp_socket_impl::send_pkt(bool ack) } if (m_bytes_in_flight > 0 - && payload_size < m_mtu - header_size - && !ack + && p->size < p->allocated + && !force && m_nagle) { // this is nagle.
If we don't have a full packet @@ -1794,14 +1725,32 @@ bool utp_socket_impl::send_pkt(bool ack) , this, m_write_buffer_size, int(m_cwnd >> 16) , m_adv_wnd, m_bytes_in_flight, m_mtu); TORRENT_ASSERT(m_nagle_packet == NULL); + TORRENT_ASSERT(h->seq_nr == m_seq_nr); m_nagle_packet = p; return false; } + // MTU DISCOVERY + if (m_mtu_seq == 0 + && packet_size > m_mtu_floor + && m_seq_nr != 0) + { + p->mtu_probe = true; + m_mtu_seq = m_seq_nr; + } + else + { + p->mtu_probe = false; + } + h->timestamp_difference_microseconds = m_reply_micro; h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size; h->ack_nr = m_ack_nr; + // if this is a FIN packet, override the type + if (flags & pkt_fin) + h->type_ver = (ST_FIN << 4) | 1; + // fill in the timestamp as late as possible ++p->num_transmissions; ptime now = time_now_hires(); @@ -1851,14 +1800,19 @@ bool utp_socket_impl::send_pkt(bool ack) // if we have payload, we need to save the packet until it's acked // and progress m_seq_nr - if (payload_size) + if (p->size > p->header_size) { + // if we're sending a payload packet, there should not + // be a nagle packet waiting for more data + TORRENT_ASSERT(m_nagle_packet == NULL); + #if !TORRENT_UT_SEQ // if the other end closed the connection immediately // our FIN packet will end up having the same sequence // number as the SYN, so this assert is invalid TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); #endif + TORRENT_ASSERT(h->seq_nr == m_seq_nr); packet* old = (packet*)m_outbuf.insert(m_seq_nr, p); if (old) { @@ -1871,6 +1825,10 @@ bool utp_socket_impl::send_pkt(bool ack) TORRENT_ASSERT(payload_size >= 0); m_bytes_in_flight += p->size - p->header_size; } + else + { + TORRENT_ASSERT(h->seq_nr == m_seq_nr); + } return m_write_buffer_size > 0 && !m_cwnd_full; } @@ -3260,14 +3218,19 @@ void utp_socket_impl::check_invariant() const i = (i + 1) & ACK_MASK) { packet* p = (packet*)m_outbuf.at(i); - if (m_mtu_seq == i && m_mtu_seq != 0) + if (m_mtu_seq == i && m_mtu_seq 
!= 0 && p) { - TORRENT_ASSERT(p); - if (p) TORRENT_ASSERT(p->mtu_probe); + TORRENT_ASSERT(p->mtu_probe); } if (!p) continue; TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i); } + + if (m_nagle_packet) + { + // if this packet is full, it should have been sent + TORRENT_ASSERT(m_nagle_packet->size < m_nagle_packet->allocated); + } } #endif }