From e51e953cb9bf47e51ec8a66552412bb7709074b5 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 25 Jun 2012 06:27:37 +0000 Subject: [PATCH] fix nagle implementation in uTP --- ChangeLog | 2 + src/utp_stream.cpp | 305 ++++++++++++++++++++++++++++++++------------- 2 files changed, 222 insertions(+), 85 deletions(-) diff --git a/ChangeLog b/ChangeLog index 28748bd66..459e34b32 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,5 @@ + * fix nagle implementation in uTP + * minor uTP tweaks * fix end-game mode issue when some files are selected to not be downloaded * improve uTP slow start diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 072bb7ada..5b94f187d 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -136,6 +136,9 @@ struct packet // the last time this packet was sent ptime send_time; + // the number of bytes actually allocated in 'buf' + boost::uint16_t allocated; + // the size of the buffer 'buf' points to boost::uint16_t size; @@ -157,7 +160,7 @@ struct packet bool mtu_probe:1; // the actual packet buffer - char buf[]; + boost::uint8_t buf[]; }; // since the uTP socket state may be needed after the @@ -214,6 +217,7 @@ struct utp_socket_impl , void* userdata, utp_socket_manager* sm) : m_sm(sm) , m_userdata(userdata) + , m_nagle_packet(NULL) , m_read_handler(0) , m_write_handler(0) , m_connect_handler(0) @@ -271,7 +275,7 @@ struct utp_socket_impl void tick(ptime const& now); void init_mtu(int link_mtu, int utp_mtu); - bool incoming_packet(char const* buf, int size + bool incoming_packet(boost::uint8_t const* buf, int size , udp::endpoint const& ep, ptime receive_time); bool should_delete() const; tcp::endpoint remote_endpoint(error_code& ec) const @@ -291,17 +295,18 @@ struct utp_socket_impl void send_fin(); void defer_ack(); + void remove_sack_header(packet* p); bool send_pkt(bool ack); bool resend_packet(packet* p, bool fast_resend = false); void send_reset(utp_header* ph); - void parse_sack(boost::uint16_t packet_ack, char const* ptr, 
int size, int* acked_bytes + void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes , ptime const now, boost::uint32_t& min_rtt); - void write_payload(char* ptr, int size); + void write_payload(boost::uint8_t* ptr, int size); void maybe_inc_acked_seq_nr(); void ack_packet(packet* p, ptime const& receive_time , boost::uint32_t& min_rtt, boost::uint16_t seq_nr); - void write_sack(char* buf, int size) const; - void incoming(char const* buf, int size, packet* p, ptime now); + void write_sack(boost::uint8_t* buf, int size) const; + void incoming(boost::uint8_t const* buf, int size, packet* p, ptime now); void do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now); int packet_timeout() const; bool test_socket_state(); @@ -309,7 +314,7 @@ struct utp_socket_impl void maybe_trigger_send_callback(ptime now); bool cancel_handlers(error_code const& ec, bool kill); bool consume_incoming_data( - utp_header const* ph, char const* ptr, int payload_size, ptime now); + utp_header const* ph, boost::uint8_t const* ptr, int payload_size, ptime now); void update_mtu_limits(); void experienced_loss(int seq_nr); @@ -350,6 +355,11 @@ struct utp_socket_impl // buffers. Buffers that empty are erased from the vector. std::vector m_write_buffer; + // if this is non NULL, it's a packet. This packet was held off because + // of NAGLE. We couldn't send it immediately. It's left + // here to accrue more bytes before we send it. + packet* m_nagle_packet; + // the user provided read buffer. If this has a size greater // than 0, we'll always prefer using it over putting received // data in the m_receive_buffer. 
As data is stored in the @@ -651,7 +661,7 @@ void utp_init_mtu(utp_socket_impl* s, int link_mtu, int utp_mtu) bool utp_incoming_packet(utp_socket_impl* s, char const* p , int size, udp::endpoint const& ep, ptime receive_time) { - return s->incoming_packet(p, size, ep, receive_time); + return s->incoming_packet((boost::uint8_t const*)p, size, ep, receive_time); } bool utp_match(utp_socket_impl* s, udp::endpoint const& ep, boost::uint16_t id) @@ -1321,7 +1331,7 @@ std::size_t utp_socket_impl::available() const return m_receive_buffer_size; } -void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr +void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr , int size, int* acked_bytes, ptime const now, boost::uint32_t& min_rtt) { if (size == 0) return; @@ -1332,7 +1342,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr #if TORRENT_VERBOSE_UTP_LOG std::string bitmask; bitmask.reserve(size); - for (char const* b = ptr, *end = ptr + size; b != end; ++b) + for (boost::uint8_t const* b = ptr, *end = ptr + size; b != end; ++b) { unsigned char bitfield = unsigned(*b); unsigned char mask = 1; @@ -1355,7 +1365,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr int last_ack = packet_ack; // for each byte - for (char const* end = ptr + size; ptr != end; ++ptr) + for (boost::uint8_t const* end = ptr + size; ptr != end; ++ptr) { unsigned char bitfield = unsigned(*ptr); unsigned char mask = 1; @@ -1420,7 +1430,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr // copies data from the write buffer into the packet // pointed to by ptr -void utp_socket_impl::write_payload(char* ptr, int size) +void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size) { #ifdef TORRENT_DEBUG int write_buffer_size = 0; @@ -1487,6 +1497,28 @@ void utp_socket_impl::defer_ack() m_sm->defer_ack(this); } +void utp_socket_impl::remove_sack_header(packet* p) +{ 
+ // remove the sack header + boost::uint8_t* ptr = p->buf + sizeof(utp_header); + utp_header* h = (utp_header*)p->buf; + + TORRENT_ASSERT(h->extension == 1); + + h->extension = ptr[0]; + int sack_size = ptr[1]; + TORRENT_ASSERT(h->extension == 0); + + UTP_LOGV("%8p: removing SACK header, %d bytes\n" + , this, sack_size + 2); + + TORRENT_ASSERT(p->size >= p->header_size + sack_size + 2); + TORRENT_ASSERT(p->header_size >= sizeof(utp_header) + sack_size + 2); + memmove(ptr, ptr + sack_size + 2, p->size - p->header_size); + p->header_size -= sack_size + 2; + p->size -= sack_size + 2; +} + // sends a packet, pulls data from the write buffer (if there's any) // if ack is true, we need to send a packet regardless of if there's // any data. Returns true if we could send more data (i.e. call @@ -1522,8 +1554,6 @@ bool utp_socket_impl::send_pkt(bool ack) m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK; } - bool ret = false; - int sack = 0; if (m_inbuf.size()) { @@ -1536,10 +1566,7 @@ bool utp_socket_impl::send_pkt(bool ack) int header_size = sizeof(utp_header) + (sack ? 
sack + 2 : 0); int payload_size = m_write_buffer_size; if (m_mtu - header_size < payload_size) - { payload_size = m_mtu - header_size; - ret = true; // there's more data to send - } // if we have one MSS worth of data, make sure it fits in our // congestion window and the advertized receive window from @@ -1556,49 +1583,42 @@ bool utp_socket_impl::send_pkt(bool ack) m_last_cwnd_hit = time_now_hires(); m_cwnd_full = true; - // there's no more space in the cwnd, no need to - // try to send more right now - ret = false; - UTP_LOGV("%8p: no space in window send_buffer_size:%d cwnd:%d " - "ret:%d adv_wnd:%d in-flight:%d mtu:%d\n" + "adv_wnd:%d in-flight:%d mtu:%d\n" , this, m_write_buffer_size, int(m_cwnd >> 16) - , ret, m_adv_wnd, m_bytes_in_flight, m_mtu); + , m_adv_wnd, m_bytes_in_flight, m_mtu); + + if (!ack) + { +#if TORRENT_UTP_LOG + UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d " + "id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d " + "adv_wnd:%d in-flight:%d mtu:%d\n" + , this, int(m_seq_nr), int(m_ack_nr) + , m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str() + , header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16) + , m_adv_wnd, m_bytes_in_flight, m_mtu); +#endif + return false; + } } // if we don't have any data to send, or can't send any data // and we don't have any data to ack, don't send a packet - if (payload_size == 0 && !ack) + if (payload_size == 0 && !ack && !m_nagle_packet) { #if TORRENT_UTP_LOG - UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d " + UTP_LOGV("%8p: skipping send (no payload and no ack) seq_nr:%d ack_nr:%d " "id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d " - "ret:%d adv_wnd:%d in-flight:%d mtu:%d\n" + "adv_wnd:%d in-flight:%d mtu:%d\n" , this, int(m_seq_nr), int(m_ack_nr) , m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str() , header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16) - , int(ret), 
m_adv_wnd, m_bytes_in_flight, m_mtu); + , m_adv_wnd, m_bytes_in_flight, m_mtu); #endif return false; } - if (((m_seq_nr - m_acked_seq_nr) & ACK_MASK) > 1 - && payload_size < m_mtu - header_size - && !ack - && m_nagle) - { - // this is nagle. If we don't have a full packet - // worth of payload to send AND we have at least - // one outstanding packet, hold off. Once the - // outstanding packet is acked, we'll send this - // payload - UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d " - "ret:%d adv_wnd:%d in-flight:%d mtu:%d\n" - , this, m_write_buffer_size, int(m_cwnd >> 16) - , ret, m_adv_wnd, m_bytes_in_flight, m_mtu); - return false; - } - int packet_size = header_size + payload_size; // MTU DISCOVERY @@ -1611,31 +1631,104 @@ bool utp_socket_impl::send_pkt(bool ack) m_mtu_seq = m_seq_nr; } - packet* p; - // we only need a heap allocation if we have payload and - // need to keep the packet around (in the outbuf) - if (payload_size) p = (packet*)malloc(sizeof(packet) + packet_size); - else p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size); + packet* p = NULL; + boost::uint8_t* ptr = NULL; + utp_header* h = NULL; - p->size = packet_size; - p->header_size = packet_size - payload_size; - p->num_transmissions = 1; - p->need_resend = false; - p->mtu_probe = use_as_probe; - char* ptr = p->buf; - utp_header* h = (utp_header*)ptr; - ptr += sizeof(utp_header); + // payload size being zero means we're just sending + // an ack. 
We should not pick up the nagle packet + if (!m_nagle_packet || (payload_size == 0 && ack)) + { + // we only need a heap allocation if we have payload and + // need to keep the packet around (in the outbuf) +#ifdef TORRENT_DEBUG + bool stack_alloced = false; +#endif + if (payload_size) + { + p = (packet*)malloc(sizeof(packet) + m_mtu); + p->allocated = m_mtu; + } + else + { +#ifdef TORRENT_DEBUG + stack_alloced = true; +#endif + TORRENT_ASSERT(ack); + p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size); + UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size); + p->allocated = packet_size; + } - h->type_ver = ((payload_size ? ST_DATA : ST_STATE) << 4) | 1; - h->extension = sack ? 1 : 0; - h->connection_id = m_send_id; - h->timestamp_difference_microseconds = m_reply_micro; - h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size; - // seq_nr is ignored for ST_STATE packets, so it doesn't - // matter that we say this is a sequence number we haven't - // actually sent yet - h->seq_nr = m_seq_nr; - h->ack_nr = m_ack_nr; + p->size = packet_size; + p->header_size = packet_size - payload_size; + p->num_transmissions = 0; + p->need_resend = false; + p->mtu_probe = use_as_probe; + ptr = p->buf; + h = (utp_header*)ptr; + ptr += sizeof(utp_header); + + h->extension = sack ? 1 : 0; + h->connection_id = m_send_id; + // seq_nr is ignored for ST_STATE packets, so it doesn't + // matter that we say this is a sequence number we haven't + // actually sent yet + h->seq_nr = m_seq_nr; + h->type_ver = ((payload_size ? 
ST_DATA : ST_STATE) << 4) | 1; + + write_payload(p->buf + p->header_size, payload_size); + } + else + { + // pick up the nagle packet and keep adding bytes to it + p = m_nagle_packet; + + ptr = p->buf + sizeof(utp_header); + h = (utp_header*)p->buf; + + // if the packet has a selective ack header, we'll need + // to update it + if (h->extension == 1) + { + sack = ptr[1]; + if (m_inbuf.size() == 0 && h->ack_nr != m_ack_nr) + { + // we need to remove the sack header + remove_sack_header(p); + sack = 0; + } + } + else + sack = 0; + + int size_left = p->allocated - p->size; + TORRENT_ASSERT(size_left > 0); + size_left = (std::min)(size_left, m_write_buffer_size); + write_payload(p->buf + p->size, size_left); + p->size += size_left; + + UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n" + , this, size_left, p->size, p->allocated); + + // did we fill up the whole mtu? + // if we didn't, we may still send it if there's + // no bytes in flight + if (m_bytes_in_flight > 0 + && p->size < p->allocated + && !ack + && m_nagle) + { + return false; + } + + // clear the nagle packet pointer and fall through + // sending p + m_nagle_packet = NULL; + + packet_size = p->size; + payload_size = p->size - p->header_size; + } if (sack) { @@ -1643,11 +1736,34 @@ bool utp_socket_impl::send_pkt(bool ack) *ptr++ = sack; // bytes for SACK bitfield write_sack(ptr, sack); ptr += sack; + TORRENT_ASSERT(ptr <= p->buf + p->header_size); } - write_payload(ptr, payload_size); + if (m_bytes_in_flight > 0 + && payload_size < m_mtu - header_size + && !ack + && m_nagle) + { + // this is nagle. If we don't have a full packet + // worth of payload to send AND we have at least + // one outstanding packet, hold off. 
Once the + // outstanding packet is acked, we'll send this + // payload + UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d " + "adv_wnd:%d in-flight:%d mtu:%d\n" + , this, m_write_buffer_size, int(m_cwnd >> 16) + , m_adv_wnd, m_bytes_in_flight, m_mtu); + TORRENT_ASSERT(m_nagle_packet == NULL); + m_nagle_packet = p; + return false; + } + + h->timestamp_difference_microseconds = m_reply_micro; + h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size; + h->ack_nr = m_ack_nr; // fill in the timestamp as late as possible + ++p->num_transmissions; ptime now = time_now_hires(); p->send_time = now; h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time())); @@ -1671,7 +1787,7 @@ bool utp_socket_impl::send_pkt(bool ack) // if ((rand() % 100) > 0) #endif m_sm->send_packet(udp::endpoint(m_remote_address, m_port) - , (char const*)h, packet_size, ec + , (char const*)h, p->size, ec , use_as_probe ? utp_socket_manager::dont_fragment : 0); ++m_out_packets; @@ -1711,18 +1827,18 @@ bool utp_socket_impl::send_pkt(bool ack) } m_seq_nr = (m_seq_nr + 1) & ACK_MASK; TORRENT_ASSERT(payload_size >= 0); - m_bytes_in_flight += payload_size; + m_bytes_in_flight += p->size - p->header_size; } - return ret; + return m_write_buffer_size > 0 && !m_cwnd_full; } // size is in bytes -void utp_socket_impl::write_sack(char* buf, int size) const +void utp_socket_impl::write_sack(boost::uint8_t* buf, int size) const { TORRENT_ASSERT(m_inbuf.size()); int ack_nr = (m_ack_nr + 2) & ACK_MASK; - char* end = buf + size; + boost::uint8_t* end = buf + size; for (; buf != end; ++buf) { @@ -1778,14 +1894,27 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) h->timestamp_difference_microseconds = m_reply_micro; p->send_time = time_now_hires(); h->timestamp_microseconds = boost::uint32_t(total_microseconds(p->send_time - min_time())); - if (h->extension == 0) + + // if the packet has a selective ack header, we'll need + // to update 
it + if (h->extension == 1 && h->ack_nr != m_ack_nr) { - // if extension != 0, there might be a SACK in the header - // and we can't update the ack field (since the SACK bits - // depend on it). If it's zero however, we can update it. - h->ack_nr = m_ack_nr; + boost::uint8_t* ptr = p->buf + sizeof(utp_header); + int sack_size = ptr[1]; + if (m_inbuf.size()) + { + // update the sack header + write_sack(ptr + 2, sack_size); + TORRENT_ASSERT(ptr + sack_size + 2 <= p->buf + p->header_size); + } + else + { + remove_sack_header(p); + } } + h->ack_nr = m_ack_nr; + error_code ec; m_sm->send_packet(udp::endpoint(m_remote_address, m_port) , (char const*)p->buf, p->size, ec); @@ -1909,7 +2038,7 @@ void utp_socket_impl::ack_packet(packet* p, ptime const& receive_time free(p); } -void utp_socket_impl::incoming(char const* buf, int size, packet* p, ptime now) +void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, ptime now) { while (!m_read_buffer.empty()) { @@ -1928,7 +2057,7 @@ void utp_socket_impl::incoming(char const* buf, int size, packet* p, ptime now) UTP_LOGV("%8p: setting read timeout to 100 ms from now\n", this); } m_read += to_copy; - target->buf = ((char*)target->buf) + to_copy; + target->buf = ((boost::uint8_t*)target->buf) + to_copy; target->len -= to_copy; buf += to_copy; UTP_LOGV("%8p: copied %d bytes into user receive buffer\n", this, to_copy); @@ -1987,7 +2116,7 @@ bool utp_socket_impl::cancel_handlers(error_code const& ec, bool kill) } bool utp_socket_impl::consume_incoming_data( - utp_header const* ph, char const* ptr, int payload_size + utp_header const* ph, boost::uint8_t const* ptr, int payload_size , ptime now) { if (ph->get_type() != ST_DATA) return false; @@ -2136,7 +2265,7 @@ void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu) } // return false if this is an invalid packet -bool utp_socket_impl::incoming_packet(char const* buf, int size +bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size , 
udp::endpoint const& ep, ptime receive_time) { utp_header* ph = (utp_header*)buf; @@ -2351,7 +2480,7 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size } // look for extended headers - char const* ptr = buf; + boost::uint8_t const* ptr = buf; ptr += sizeof(utp_header); unsigned int extension = ph->extension; @@ -2364,8 +2493,14 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size UTP_LOGV("%8p: invalid extension header\n", this); return true; } - int next_extension = unsigned(*ptr++); - int len = unsigned(*ptr++); + int next_extension = *ptr++; + int len = *ptr++; + if (len < 0) + { + UTP_LOGV("%8p: invalid extension length:%d packet:%d\n" + , this, len, int(ptr - buf)); + return true; + } if (ptr - buf + len > size_t(size)) { UTP_LOGV("%8p: invalid extension header size:%d packet:%d\n"