fix nagle implementation in uTP

This commit is contained in:
Arvid Norberg 2012-06-25 06:27:37 +00:00
parent c691205ff7
commit e51e953cb9
2 changed files with 222 additions and 85 deletions

View File

@ -1,3 +1,5 @@
* fix nagle implementation in uTP
* minor uTP tweaks
* fix end-game mode issue when some files are selected to not be downloaded
* improve uTP slow start

View File

@ -136,6 +136,9 @@ struct packet
// the last time this packet was sent
ptime send_time;
// the number of bytes actually allocated in 'buf'
boost::uint16_t allocated;
// the size of the buffer 'buf' points to
boost::uint16_t size;
@ -157,7 +160,7 @@ struct packet
bool mtu_probe:1;
// the actual packet buffer
char buf[];
boost::uint8_t buf[];
};
// since the uTP socket state may be needed after the
@ -214,6 +217,7 @@ struct utp_socket_impl
, void* userdata, utp_socket_manager* sm)
: m_sm(sm)
, m_userdata(userdata)
, m_nagle_packet(NULL)
, m_read_handler(0)
, m_write_handler(0)
, m_connect_handler(0)
@ -271,7 +275,7 @@ struct utp_socket_impl
void tick(ptime const& now);
void init_mtu(int link_mtu, int utp_mtu);
bool incoming_packet(char const* buf, int size
bool incoming_packet(boost::uint8_t const* buf, int size
, udp::endpoint const& ep, ptime receive_time);
bool should_delete() const;
tcp::endpoint remote_endpoint(error_code& ec) const
@ -291,17 +295,18 @@ struct utp_socket_impl
void send_fin();
void defer_ack();
void remove_sack_header(packet* p);
bool send_pkt(bool ack);
bool resend_packet(packet* p, bool fast_resend = false);
void send_reset(utp_header* ph);
void parse_sack(boost::uint16_t packet_ack, char const* ptr, int size, int* acked_bytes
void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes
, ptime const now, boost::uint32_t& min_rtt);
void write_payload(char* ptr, int size);
void write_payload(boost::uint8_t* ptr, int size);
void maybe_inc_acked_seq_nr();
void ack_packet(packet* p, ptime const& receive_time
, boost::uint32_t& min_rtt, boost::uint16_t seq_nr);
void write_sack(char* buf, int size) const;
void incoming(char const* buf, int size, packet* p, ptime now);
void write_sack(boost::uint8_t* buf, int size) const;
void incoming(boost::uint8_t const* buf, int size, packet* p, ptime now);
void do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now);
int packet_timeout() const;
bool test_socket_state();
@ -309,7 +314,7 @@ struct utp_socket_impl
void maybe_trigger_send_callback(ptime now);
bool cancel_handlers(error_code const& ec, bool kill);
bool consume_incoming_data(
utp_header const* ph, char const* ptr, int payload_size, ptime now);
utp_header const* ph, boost::uint8_t const* ptr, int payload_size, ptime now);
void update_mtu_limits();
void experienced_loss(int seq_nr);
@ -350,6 +355,11 @@ struct utp_socket_impl
// buffers. Buffers that are empty are erased from the vector.
std::vector<iovec_t> m_write_buffer;
// if this is non NULL, it's a packet. This packet was held off because
// of NAGLE. We couldn't send it immediately. It's left
// here to accrue more bytes before we send it.
packet* m_nagle_packet;
// the user provided read buffer. If this has a size greater
// than 0, we'll always prefer using it over putting received
// data in the m_receive_buffer. As data is stored in the
@ -651,7 +661,7 @@ void utp_init_mtu(utp_socket_impl* s, int link_mtu, int utp_mtu)
bool utp_incoming_packet(utp_socket_impl* s, char const* p
, int size, udp::endpoint const& ep, ptime receive_time)
{
return s->incoming_packet(p, size, ep, receive_time);
return s->incoming_packet((boost::uint8_t const*)p, size, ep, receive_time);
}
bool utp_match(utp_socket_impl* s, udp::endpoint const& ep, boost::uint16_t id)
@ -1321,7 +1331,7 @@ std::size_t utp_socket_impl::available() const
return m_receive_buffer_size;
}
void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr
void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr
, int size, int* acked_bytes, ptime const now, boost::uint32_t& min_rtt)
{
if (size == 0) return;
@ -1332,7 +1342,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr
#if TORRENT_VERBOSE_UTP_LOG
std::string bitmask;
bitmask.reserve(size);
for (char const* b = ptr, *end = ptr + size; b != end; ++b)
for (boost::uint8_t const* b = ptr, *end = ptr + size; b != end; ++b)
{
unsigned char bitfield = unsigned(*b);
unsigned char mask = 1;
@ -1355,7 +1365,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr
int last_ack = packet_ack;
// for each byte
for (char const* end = ptr + size; ptr != end; ++ptr)
for (boost::uint8_t const* end = ptr + size; ptr != end; ++ptr)
{
unsigned char bitfield = unsigned(*ptr);
unsigned char mask = 1;
@ -1420,7 +1430,7 @@ void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, char const* ptr
// copies data from the write buffer into the packet
// pointed to by ptr
void utp_socket_impl::write_payload(char* ptr, int size)
void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size)
{
#ifdef TORRENT_DEBUG
int write_buffer_size = 0;
@ -1487,6 +1497,28 @@ void utp_socket_impl::defer_ack()
m_sm->defer_ack(this);
}
// Strips the selective-ack (SACK) extension header from packet 'p' in
// place. The packet is expected to carry the SACK extension (extension
// type 1) directly after the fixed utp_header. The payload following the
// extension is moved down to close the gap, and both p->header_size and
// p->size shrink by the size of the removed extension (the SACK bitfield
// plus its two-byte next-extension/length prefix).
void utp_socket_impl::remove_sack_header(packet* p)
{
	// the extension header starts right after the fixed uTP header
	boost::uint8_t* ptr = p->buf + sizeof(utp_header);
	utp_header* h = (utp_header*)p->buf;

	TORRENT_ASSERT(h->extension == 1);

	// the first byte of an extension header is the type of the next
	// extension, the second byte is the length of this one. Unlink the
	// SACK header by making the main header point at whatever followed it
	h->extension = ptr[0];
	int sack_size = ptr[1];
	TORRENT_ASSERT(h->extension == 0);

	UTP_LOGV("%8p: removing SACK header, %d bytes\n"
		, this, sack_size + 2);

	// header_size already includes the SACK extension, so the only
	// requirement on the total size is that it covers the header. The
	// payload may well be smaller than the extension being removed (e.g.
	// a nagle packet that has only accrued a few bytes so far)
	TORRENT_ASSERT(p->size >= p->header_size);
	TORRENT_ASSERT(p->header_size >= sizeof(utp_header) + sack_size + 2);

	// move the payload (size - header_size bytes) down over the extension.
	// The regions may overlap, hence memmove rather than memcpy
	memmove(ptr, ptr + sack_size + 2, p->size - p->header_size);
	p->header_size -= sack_size + 2;
	p->size -= sack_size + 2;
}
// sends a packet, pulls data from the write buffer (if there's any)
// if ack is true, we need to send a packet regardless of if there's
// any data. Returns true if we could send more data (i.e. call
@ -1522,8 +1554,6 @@ bool utp_socket_impl::send_pkt(bool ack)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
}
bool ret = false;
int sack = 0;
if (m_inbuf.size())
{
@ -1536,10 +1566,7 @@ bool utp_socket_impl::send_pkt(bool ack)
int header_size = sizeof(utp_header) + (sack ? sack + 2 : 0);
int payload_size = m_write_buffer_size;
if (m_mtu - header_size < payload_size)
{
payload_size = m_mtu - header_size;
ret = true; // there's more data to send
}
// if we have one MSS worth of data, make sure it fits in our
// congestion window and the advertized receive window from
@ -1556,49 +1583,42 @@ bool utp_socket_impl::send_pkt(bool ack)
m_last_cwnd_hit = time_now_hires();
m_cwnd_full = true;
// there's no more space in the cwnd, no need to
// try to send more right now
ret = false;
UTP_LOGV("%8p: no space in window send_buffer_size:%d cwnd:%d "
"ret:%d adv_wnd:%d in-flight:%d mtu:%d\n"
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, m_write_buffer_size, int(m_cwnd >> 16)
, ret, m_adv_wnd, m_bytes_in_flight, m_mtu);
, m_adv_wnd, m_bytes_in_flight, m_mtu);
if (!ack)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d "
"id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, int(m_seq_nr), int(m_ack_nr)
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
#endif
return false;
}
}
// if we don't have any data to send, or can't send any data
// and we don't have any data to ack, don't send a packet
if (payload_size == 0 && !ack)
if (payload_size == 0 && !ack && !m_nagle_packet)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d "
UTP_LOGV("%8p: skipping send (no payload and no ack) seq_nr:%d ack_nr:%d "
"id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d "
"ret:%d adv_wnd:%d in-flight:%d mtu:%d\n"
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, int(m_seq_nr), int(m_ack_nr)
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, int(ret), m_adv_wnd, m_bytes_in_flight, m_mtu);
, m_adv_wnd, m_bytes_in_flight, m_mtu);
#endif
return false;
}
if (((m_seq_nr - m_acked_seq_nr) & ACK_MASK) > 1
&& payload_size < m_mtu - header_size
&& !ack
&& m_nagle)
{
// this is nagle. If we don't have a full packet
// worth of payload to send AND we have at least
// one outstanding packet, hold off. Once the
// outstanding packet is acked, we'll send this
// payload
UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d "
"ret:%d adv_wnd:%d in-flight:%d mtu:%d\n"
, this, m_write_buffer_size, int(m_cwnd >> 16)
, ret, m_adv_wnd, m_bytes_in_flight, m_mtu);
return false;
}
int packet_size = header_size + payload_size;
// MTU DISCOVERY
@ -1611,31 +1631,104 @@ bool utp_socket_impl::send_pkt(bool ack)
m_mtu_seq = m_seq_nr;
}
packet* p;
// we only need a heap allocation if we have payload and
// need to keep the packet around (in the outbuf)
if (payload_size) p = (packet*)malloc(sizeof(packet) + packet_size);
else p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size);
packet* p = NULL;
boost::uint8_t* ptr = NULL;
utp_header* h = NULL;
p->size = packet_size;
p->header_size = packet_size - payload_size;
p->num_transmissions = 1;
p->need_resend = false;
p->mtu_probe = use_as_probe;
char* ptr = p->buf;
utp_header* h = (utp_header*)ptr;
ptr += sizeof(utp_header);
// payload size being zero means we're just sending
// an ack. We should not pick up the nagle packet
if (!m_nagle_packet || (payload_size == 0 && ack))
{
// we only need a heap allocation if we have payload and
// need to keep the packet around (in the outbuf)
#ifdef TORRENT_DEBUG
bool stack_alloced = false;
#endif
if (payload_size)
{
p = (packet*)malloc(sizeof(packet) + m_mtu);
p->allocated = m_mtu;
}
else
{
#ifdef TORRENT_DEBUG
stack_alloced = true;
#endif
TORRENT_ASSERT(ack);
p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size);
UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size);
p->allocated = packet_size;
}
h->type_ver = ((payload_size ? ST_DATA : ST_STATE) << 4) | 1;
h->extension = sack ? 1 : 0;
h->connection_id = m_send_id;
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size;
// seq_nr is ignored for ST_STATE packets, so it doesn't
// matter that we say this is a sequence number we haven't
// actually sent yet
h->seq_nr = m_seq_nr;
h->ack_nr = m_ack_nr;
p->size = packet_size;
p->header_size = packet_size - payload_size;
p->num_transmissions = 0;
p->need_resend = false;
p->mtu_probe = use_as_probe;
ptr = p->buf;
h = (utp_header*)ptr;
ptr += sizeof(utp_header);
h->extension = sack ? 1 : 0;
h->connection_id = m_send_id;
// seq_nr is ignored for ST_STATE packets, so it doesn't
// matter that we say this is a sequence number we haven't
// actually sent yet
h->seq_nr = m_seq_nr;
h->type_ver = ((payload_size ? ST_DATA : ST_STATE) << 4) | 1;
write_payload(p->buf + p->header_size, payload_size);
}
else
{
// pick up the nagle packet and keep adding bytes to it
p = m_nagle_packet;
ptr = p->buf + sizeof(utp_header);
h = (utp_header*)p->buf;
// if the packet has a selective ack header, we'll need
// to update it
if (h->extension == 1)
{
sack = ptr[1];
if (m_inbuf.size() == 0 && h->ack_nr != m_ack_nr)
{
// we need to remove the sack header
remove_sack_header(p);
sack = 0;
}
}
else
sack = 0;
int size_left = p->allocated - p->size;
TORRENT_ASSERT(size_left > 0);
size_left = (std::min)(size_left, m_write_buffer_size);
write_payload(p->buf + p->size, size_left);
p->size += size_left;
UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n"
, this, size_left, p->size, p->allocated);
// did we fill up the whole mtu?
// if we didn't, we may still send it if there are
// no bytes in flight
if (m_bytes_in_flight > 0
&& p->size < p->allocated
&& !ack
&& m_nagle)
{
return false;
}
// clear the nagle packet pointer and fall through
// sending p
m_nagle_packet = NULL;
packet_size = p->size;
payload_size = p->size - p->header_size;
}
if (sack)
{
@ -1643,11 +1736,34 @@ bool utp_socket_impl::send_pkt(bool ack)
*ptr++ = sack; // bytes for SACK bitfield
write_sack(ptr, sack);
ptr += sack;
TORRENT_ASSERT(ptr <= p->buf + p->header_size);
}
write_payload(ptr, payload_size);
if (m_bytes_in_flight > 0
&& payload_size < m_mtu - header_size
&& !ack
&& m_nagle)
{
// this is nagle. If we don't have a full packet
// worth of payload to send AND we have at least
// one outstanding packet, hold off. Once the
// outstanding packet is acked, we'll send this
// payload
UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
TORRENT_ASSERT(m_nagle_packet == NULL);
m_nagle_packet = p;
return false;
}
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size;
h->ack_nr = m_ack_nr;
// fill in the timestamp as late as possible
++p->num_transmissions;
ptime now = time_now_hires();
p->send_time = now;
h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time()));
@ -1671,7 +1787,7 @@ bool utp_socket_impl::send_pkt(bool ack)
// if ((rand() % 100) > 0)
#endif
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)h, packet_size, ec
, (char const*)h, p->size, ec
, use_as_probe ? utp_socket_manager::dont_fragment : 0);
++m_out_packets;
@ -1711,18 +1827,18 @@ bool utp_socket_impl::send_pkt(bool ack)
}
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
TORRENT_ASSERT(payload_size >= 0);
m_bytes_in_flight += payload_size;
m_bytes_in_flight += p->size - p->header_size;
}
return ret;
return m_write_buffer_size > 0 && !m_cwnd_full;
}
// size is in bytes
void utp_socket_impl::write_sack(char* buf, int size) const
void utp_socket_impl::write_sack(boost::uint8_t* buf, int size) const
{
TORRENT_ASSERT(m_inbuf.size());
int ack_nr = (m_ack_nr + 2) & ACK_MASK;
char* end = buf + size;
boost::uint8_t* end = buf + size;
for (; buf != end; ++buf)
{
@ -1778,14 +1894,27 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
h->timestamp_difference_microseconds = m_reply_micro;
p->send_time = time_now_hires();
h->timestamp_microseconds = boost::uint32_t(total_microseconds(p->send_time - min_time()));
if (h->extension == 0)
// if the packet has a selective ack header, we'll need
// to update it
if (h->extension == 1 && h->ack_nr != m_ack_nr)
{
// if extension != 0, there might be a SACK in the header
// and we can't update the ack field (since the SACK bits
// depend on it). If it's zero however, we can update it.
h->ack_nr = m_ack_nr;
boost::uint8_t* ptr = p->buf + sizeof(utp_header);
int sack_size = ptr[1];
if (m_inbuf.size())
{
// update the sack header
write_sack(ptr + 2, sack_size);
TORRENT_ASSERT(ptr + sack_size + 2 <= p->buf + p->header_size);
}
else
{
remove_sack_header(p);
}
}
h->ack_nr = m_ack_nr;
error_code ec;
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)p->buf, p->size, ec);
@ -1909,7 +2038,7 @@ void utp_socket_impl::ack_packet(packet* p, ptime const& receive_time
free(p);
}
void utp_socket_impl::incoming(char const* buf, int size, packet* p, ptime now)
void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, ptime now)
{
while (!m_read_buffer.empty())
{
@ -1928,7 +2057,7 @@ void utp_socket_impl::incoming(char const* buf, int size, packet* p, ptime now)
UTP_LOGV("%8p: setting read timeout to 100 ms from now\n", this);
}
m_read += to_copy;
target->buf = ((char*)target->buf) + to_copy;
target->buf = ((boost::uint8_t*)target->buf) + to_copy;
target->len -= to_copy;
buf += to_copy;
UTP_LOGV("%8p: copied %d bytes into user receive buffer\n", this, to_copy);
@ -1987,7 +2116,7 @@ bool utp_socket_impl::cancel_handlers(error_code const& ec, bool kill)
}
bool utp_socket_impl::consume_incoming_data(
utp_header const* ph, char const* ptr, int payload_size
utp_header const* ph, boost::uint8_t const* ptr, int payload_size
, ptime now)
{
if (ph->get_type() != ST_DATA) return false;
@ -2136,7 +2265,7 @@ void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu)
}
// return false if this is an invalid packet
bool utp_socket_impl::incoming_packet(char const* buf, int size
bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
, udp::endpoint const& ep, ptime receive_time)
{
utp_header* ph = (utp_header*)buf;
@ -2351,7 +2480,7 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size
}
// look for extended headers
char const* ptr = buf;
boost::uint8_t const* ptr = buf;
ptr += sizeof(utp_header);
unsigned int extension = ph->extension;
@ -2364,8 +2493,14 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size
UTP_LOGV("%8p: invalid extension header\n", this);
return true;
}
int next_extension = unsigned(*ptr++);
int len = unsigned(*ptr++);
int next_extension = *ptr++;
int len = *ptr++;
if (len < 0)
{
UTP_LOGV("%8p: invalid extension length:%d packet:%d\n"
, this, len, int(ptr - buf));
return true;
}
if (ptr - buf + len > size_t(size))
{
UTP_LOGV("%8p: invalid extension header size:%d packet:%d\n"