From 2b9b2a188adfc256d610d674a8a4db37dc9253dd Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 21 Jun 2012 15:05:57 +0000 Subject: [PATCH] remove uTP delayed ack and instead send acks when the udp socket has been drained. simplify the udp socket to use null_buffers and allocate less memory for buffers. this also eliminated the race condition when resizing the udp socket receive buffer which greatly simplified it --- include/libtorrent/aux_/session_impl.hpp | 2 + include/libtorrent/session_settings.hpp | 2 + include/libtorrent/udp_socket.hpp | 35 ++- include/libtorrent/utp_socket_manager.hpp | 10 +- include/libtorrent/utp_stream.hpp | 1 + src/session.cpp | 2 + src/session_impl.cpp | 9 + src/udp_socket.cpp | 282 +++++++--------------- src/utp_socket_manager.cpp | 21 ++ src/utp_stream.cpp | 77 +++--- 10 files changed, 189 insertions(+), 252 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 3934f8457..f7875a15d 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -890,6 +890,8 @@ namespace libtorrent void on_receive_udp_hostname(error_code const& e , char const* hostname, char const* buf, int len); + void on_udp_socket_drained(); + // see m_external_listen_port. This is the same // but for the udp port used by the DHT. int m_external_udp_port; diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index 66b4218b3..bbbd65aaa 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -825,8 +825,10 @@ namespace libtorrent // initial timeout for uTP SYN packets int utp_connect_timeout; +#ifndef TORRENT_NO_DEPRECATE // number of milliseconds of delaying ACKing packets the most int utp_delayed_ack; +#endif // set to true if the uTP socket buffer size is allowed to increase // dynamically based on the NIC MTU setting. This is true by default diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index dea2af217..156d4045b 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -51,12 +51,15 @@ namespace libtorrent class udp_socket { public: + // TODO: instead of these callbacks, support observers typedef boost::function callback_t; typedef boost::function callback2_t; + typedef boost::function drain_callback_t; - udp_socket(io_service& ios, callback_t const& c, callback2_t const& c2, connection_queue& cc); + udp_socket(io_service& ios, callback_t const& c, callback2_t const& c2 + , drain_callback_t const& dc, connection_queue& cc); ~udp_socket(); enum flags_t { dont_drop = 1, peer_connection = 2 }; @@ -143,7 +146,13 @@ namespace libtorrent // name as source callback2_t m_callback2; - void on_read(udp::socket* sock, error_code const& e, std::size_t bytes_transferred); + // called every time we drain the udp sockets + drain_callback_t m_drained_callback; + + void setup_read(udp::socket* s); + void on_read(udp::socket* s); + void on_read_impl(udp::socket* sock, udp::endpoint const& ep + , error_code const& e, std::size_t bytes_transferred); void on_name_lookup(error_code const& e, tcp::resolver::iterator i); void on_timeout(); void on_connect(int ticket); @@ -161,7 +170,6 @@ namespace libtorrent void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); void unwrap(error_code const& e, char const* buf, int size); - void maybe_realloc_buffers(int which = 3); bool maybe_clear_callback(); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS @@ -180,25 +188,11 @@ namespace libtorrent #endif udp::socket m_ipv4_sock; - udp::endpoint m_v4_ep; - int m_v4_buf_size; - char* m_v4_buf; - // this is set to true to indicate that the - // m_v4_buf should be reallocated to the size - // of the buffer size members the next time their - // read handler gets triggered - bool m_reallocate_buffer4; + int m_buf_size; + char* m_buf; #if TORRENT_USE_IPV6 udp::socket m_ipv6_sock; - udp::endpoint m_v6_ep; - int m_v6_buf_size; - char* m_v6_buf; - // this is set to true to indicate that the - // m_v6_buf should be reallocated to the size - // of the buffer size members the next time their - // read handler gets triggered - bool m_reallocate_buffer6; #endif boost::uint16_t m_bind_port; @@ -235,7 +229,8 @@ namespace libtorrent struct rate_limited_udp_socket : public udp_socket { - rate_limited_udp_socket(io_service& ios, callback_t const& c, callback2_t const& c2, connection_queue& cc); + rate_limited_udp_socket(io_service& ios, callback_t const& c + , callback2_t const& c2, drain_callback_t const& dc, connection_queue& cc); void set_rate_limit(int limit) { m_rate_limit = limit; } bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; } bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0); diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 939e22e06..d25e499f5 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -56,6 +56,7 @@ namespace libtorrent // return false if this is not a uTP packet bool incoming_packet(char const* p, int size, udp::endpoint const& ep); + void socket_drained(); void tick(ptime now); @@ -76,7 +77,6 @@ namespace libtorrent int fin_resends() const { return m_sett.utp_fin_resends; } int num_resends() const { return m_sett.utp_num_resends; } int connect_timeout() const { return m_sett.utp_connect_timeout; } - int delayed_ack() const { return m_sett.utp_delayed_ack; } int min_timeout() const { return m_sett.utp_min_timeout; } int loss_multiplier() const { return m_sett.utp_loss_multiplier; } bool allow_dynamic_sock_buf() const { return m_sett.utp_dynamic_sock_buf; } @@ -85,6 +85,8 @@ namespace libtorrent void set_sock_buf(int size); int num_sockets() const { return m_utp_sockets.size(); } + void defer_ack(utp_socket_impl* s); + private: udp_socket& m_sock; incoming_utp_callback_t m_cb; @@ -93,6 +95,12 @@ namespace libtorrent typedef std::multimap socket_map_t; socket_map_t m_utp_sockets; + // this is a list of sockets that needs to send an ack. + // once the UDP socket is drained, all of these will + // have a chance to do that. This is to avoid sending + // an ack for every single packet + std::vector m_deferred_acks; + // the last socket we received a packet on utp_socket_impl* m_last_socket; diff --git a/include/libtorrent/utp_stream.hpp b/include/libtorrent/utp_stream.hpp index 4dd5ae077..eba35632a 100644 --- a/include/libtorrent/utp_stream.hpp +++ b/include/libtorrent/utp_stream.hpp @@ -151,6 +151,7 @@ bool utp_match(utp_socket_impl* s, udp::endpoint const& ep, boost::uint16_t id); udp::endpoint utp_remote_endpoint(utp_socket_impl* s); boost::uint16_t utp_receive_id(utp_socket_impl* s); int utp_socket_state(utp_socket_impl const* s); +void utp_send_ack(utp_socket_impl* s); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING int socket_impl_size(); diff --git a/src/session.cpp b/src/session.cpp index b05cc67c7..e47e6dfad 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -1257,7 +1257,9 @@ namespace libtorrent , utp_fin_resends(2) , utp_num_resends(6) , utp_connect_timeout(3000) // milliseconds +#ifndef TORRENT_NO_DEPRECATE , utp_delayed_ack(0) // milliseconds +#endif , utp_dynamic_sock_buf(true) , utp_loss_multiplier(50) // specified in percent , mixed_mode_algorithm(peer_proportional) diff --git a/src/session_impl.cpp b/src/session_impl.cpp index d5cbbac06..9690189f0 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -644,6 +644,7 @@ namespace aux { , m_udp_socket(m_io_service , boost::bind(&session_impl::on_receive_udp, this, _1, _2, _3, _4) , boost::bind(&session_impl::on_receive_udp_hostname, this, _1, _2, _3, _4) + , boost::bind(&session_impl::on_udp_socket_drained, this) , m_half_open) , m_utp_socket_manager(m_settings, m_udp_socket , boost::bind(&session_impl::incoming_connection, this, _1)) @@ -2500,6 +2501,14 @@ namespace aux { } } + // this is called every time all packets have been read from + // the udp socket. The utp_socket_manager uses this event to + // trigger a flush of deferred ACKs + void session_impl::on_udp_socket_drained() + { + m_utp_socket_manager.socket_drained(); + } + void session_impl::async_accept(boost::shared_ptr const& listener, bool ssl) { TORRENT_ASSERT(!m_abort); diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 6077e966a..57bedb16b 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -55,18 +55,16 @@ using namespace libtorrent; udp_socket::udp_socket(asio::io_service& ios , udp_socket::callback_t const& c , udp_socket::callback2_t const& c2 + , udp_socket::drain_callback_t const& dc , connection_queue& cc) : m_callback(c) , m_callback2(c2) + , m_drained_callback(dc) , m_ipv4_sock(ios) - , m_v4_buf_size(0) - , m_v4_buf(0) - , m_reallocate_buffer4(false) + , m_buf_size(0) + , m_buf(0) #if TORRENT_USE_IPV6 , m_ipv6_sock(ios) - , m_v6_buf_size(0) - , m_v6_buf(0) - , m_reallocate_buffer6(false) #endif , m_bind_port(0) , m_v4_outstanding(0) @@ -91,19 +89,14 @@ udp_socket::udp_socket(asio::io_service& ios #endif #endif - m_v4_buf_size = 2000; - m_v4_buf = (char*)malloc(m_v4_buf_size); -#if TORRENT_USE_IPV6 - m_v6_buf_size = 2000; - m_v6_buf = (char*)malloc(m_v6_buf_size); -#endif + m_buf_size = 2000; + m_buf = (char*)malloc(m_buf_size); } udp_socket::~udp_socket() { - free(m_v4_buf); + free(m_buf); #if TORRENT_USE_IPV6 - free(m_v6_buf); TORRENT_ASSERT_VAL(m_v6_outstanding == 0, m_v6_outstanding); #endif TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding); @@ -214,45 +207,8 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len #endif } -void udp_socket::maybe_realloc_buffers(int which) -{ - TORRENT_ASSERT(is_single_thread()); - bool no_mem = false; - if (m_reallocate_buffer4 && (which & 1) && m_v4_outstanding == 0) - { - TORRENT_ASSERT(m_v4_outstanding == 0); - void* tmp = realloc(m_v4_buf, m_v4_buf_size); - if (tmp != 0) m_v4_buf = (char*)tmp; - else no_mem = true; - m_reallocate_buffer4 = false; - } -#if TORRENT_USE_IPV6 - if (m_reallocate_buffer6 && (which & 2) && m_v6_outstanding == 0) - { - TORRENT_ASSERT(m_v6_outstanding == 0); - void* tmp = realloc(m_v6_buf, m_v6_buf_size); - if (tmp != 0) m_v6_buf = (char*)tmp; - else no_mem = true; - m_reallocate_buffer6 = false; - } -#endif - - if (no_mem) - { - free(m_v4_buf); - m_v4_buf = 0; - m_v4_buf_size = 0; -#if TORRENT_USE_IPV6 - free(m_v6_buf); - m_v6_buf = 0; - m_v6_buf_size = 0; -#endif - if (m_callback) m_callback(error::no_memory, m_v4_ep, 0, 0); - close(); - } -} - -void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred) +// called whenever the socket is readable +void udp_socket::on_read(udp::socket* s) { #if defined TORRENT_ASIO_DEBUGGING complete_async("udp_socket::on_read"); @@ -283,16 +239,34 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_ CHECK_MAGIC; if (!m_callback) return; + for (;;) + { + error_code ec; + udp::endpoint ep; + size_t bytes_transferred = s->receive_from(asio::buffer(m_buf, m_buf_size), ep, 0, ec); + if (ec == asio::error::would_block) break; + on_read_impl(s, ep, ec, bytes_transferred); + } + if (m_drained_callback) m_drained_callback(); + setup_read(s); +} + +void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep + , error_code const& e, std::size_t bytes_transferred) +{ + TORRENT_ASSERT(m_magic == 0x1337); + TORRENT_ASSERT(is_single_thread()); + if (e) { TORRENT_TRY { #if TORRENT_USE_IPV6 if (s == &m_ipv6_sock) - m_callback(e, m_v6_ep, 0, 0); + m_callback(e, ep, 0, 0); else #endif - m_callback(e, m_v4_ep, 0, 0); + m_callback(e, ep, 0, 0); } TORRENT_CATCH (std::exception&) {} @@ -315,106 +289,42 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_ if (m_abort) return; -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock && num_outstanding() == 0) - { - maybe_realloc_buffers(2); - if (m_abort) return; - ++m_v6_outstanding; - s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) - , m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); - } - else -#endif - if (m_v4_outstanding == 0) - { - maybe_realloc_buffers(1); - if (m_abort) return; - ++m_v4_outstanding; - s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) - , m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); - } - -#ifdef TORRENT_DEBUG - m_started = true; -#endif return; } + TORRENT_TRY { + + if (m_tunnel_packets) + { + // if the source IP doesn't match the proxy's, ignore the packet + if (ep == m_proxy_addr) + unwrap(e, m_buf, bytes_transferred); + } + else + { + m_callback(e, ep, m_buf, bytes_transferred); + } + + } TORRENT_CATCH (std::exception&) {} +} + +void udp_socket::setup_read(udp::socket* s) +{ + if (m_abort) return; + #if TORRENT_USE_IPV6 if (s == &m_ipv6_sock) - { - TORRENT_TRY { - - if (m_tunnel_packets) - { - // if the source IP doesn't match the proxy's, ignore the packet - if (m_v6_ep == m_proxy_addr) - unwrap(e, m_v6_buf, bytes_transferred); - } - else - { - m_callback(e, m_v6_ep, m_v6_buf, bytes_transferred); - } - - } TORRENT_CATCH (std::exception&) {} - - if (m_abort) return; - - if (num_outstanding() == 0) - { - maybe_realloc_buffers(2); - if (m_abort) return; - -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif - ++m_v6_outstanding; - s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) - , m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); - } - } + ++m_v6_outstanding; else -#endif // TORRENT_USE_IPV6 - { - - TORRENT_TRY { - - if (m_tunnel_packets) - { - // if the source IP doesn't match the proxy's, ignore the packet - if (m_v4_ep == m_proxy_addr) - unwrap(e, m_v4_buf, bytes_transferred); - } - else - { - m_callback(e, m_v4_ep, m_v4_buf, bytes_transferred); - } - - } TORRENT_CATCH (std::exception&) {} - - if (m_abort) return; - - if (m_v4_outstanding == 0) - { - maybe_realloc_buffers(1); - if (m_abort) return; +#endif + ++m_v4_outstanding; #if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif - ++m_v4_outstanding; - s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) - , m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); - } - } - -#ifdef TORRENT_DEBUG - m_started = true; + add_outstanding_async("udp_socket::on_read"); #endif + udp::endpoint ep; + s->async_receive_from(asio::null_buffers() + , ep, boost::bind(&udp_socket::on_read, this, s)); } void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec) @@ -575,14 +485,26 @@ void udp_socket::close() void udp_socket::set_buf_size(int s) { TORRENT_ASSERT(is_single_thread()); - if (s > m_v4_buf_size) + bool no_mem = false; + void* tmp = realloc(m_buf, s); + if (tmp != 0) { - m_v4_buf_size = s; - m_reallocate_buffer4 = true; -#if TORRENT_USE_IPV6 - m_v6_buf_size = s; - m_reallocate_buffer6 = true; -#endif + m_buf = (char*)tmp; + m_buf_size = s; + } + else + { + no_mem = true; + } + + if (no_mem) + { + free(m_buf); + m_buf = 0; + m_buf_size = 0; + udp::endpoint ep; + if (m_callback) m_callback(error::no_memory, ep, 0, 0); + close(); } } @@ -605,18 +527,11 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec) if (ec) return; m_ipv4_sock.bind(ep, ec); if (ec) return; + udp::socket::non_blocking_io ioc(true); + m_ipv4_sock.io_control(ioc, ec); + if (ec) return; if (m_v4_outstanding == 0) - { - maybe_realloc_buffers(1); - if (m_abort) return; -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif - ++m_v4_outstanding; - m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) - , m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock - , _1, _2)); - } + setup_read(&m_ipv4_sock); } #if TORRENT_USE_IPV6 else @@ -625,18 +540,11 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec) if (ec) return; m_ipv6_sock.bind(ep, ec); if (ec) return; + udp::socket::non_blocking_io ioc(true); + m_ipv6_sock.io_control(ioc, ec); + if (ec) return; if (m_v6_outstanding == 0) - { - maybe_realloc_buffers(2); - if (m_abort) return; -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif - ++m_v6_outstanding; - m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) - , m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock - , _1, _2)); - } + setup_read(&m_ipv6_sock); } #endif #ifdef TORRENT_DEBUG @@ -660,41 +568,24 @@ void udp_socket::bind(int port) if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec); #endif - maybe_realloc_buffers(); if (m_abort) return; m_ipv4_sock.open(udp::v4(), ec); if (!ec) { -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif m_ipv4_sock.bind(udp::endpoint(address_v4::any(), port), ec); if (m_v4_outstanding == 0) - { - ++m_v4_outstanding; - m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) - , m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock - , _1, _2)); - } + setup_read(&m_ipv4_sock); } #if TORRENT_USE_IPV6 m_ipv6_sock.open(udp::v6(), ec); if (!ec) { -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("udp_socket::on_read"); -#endif m_ipv6_sock.set_option(v6only(true), ec); m_ipv6_sock.bind(udp::endpoint(address_v6::any(), port), ec); if (m_v6_outstanding == 0) - { - ++m_v6_outstanding; - m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) - , m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock - , _1, _2)); - } + setup_read(&m_ipv6_sock); } #endif // TORRENT_USE_IPV6 @@ -1189,8 +1080,9 @@ void udp_socket::hung_up(error_code const& e) rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios , callback_t const& c , callback2_t const& c2 + , drain_callback_t const& dc , connection_queue& cc) - : udp_socket(ios, c, c2, cc) + : udp_socket(ios, c, c2, dc, cc) , m_timer(ios) , m_queue_size_limit(200) , m_rate_limit(4000) diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index 6865d86b7..3b5a6efd1 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -275,6 +275,27 @@ namespace libtorrent return false; } + void utp_socket_manager::socket_drained() + { + // flush all deferred acks + + std::vector deferred_acks; + m_deferred_acks.swap(deferred_acks); + for (std::vector::iterator i = deferred_acks.begin() + , end(deferred_acks.end()); i != end; ++i) + { + utp_socket_impl* s = *i; + utp_send_ack(s); + } + } + + void utp_socket_manager::defer_ack(utp_socket_impl* s) + { + TORRENT_ASSERT(std::find(m_deferred_acks.begin(), m_deferred_acks.end(), s) + == m_deferred_acks.end()); + m_deferred_acks.push_back(s); + } + void utp_socket_manager::remove_socket(boost::uint16_t id) { socket_map_t::iterator i = m_utp_sockets.find(id); diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 5e72c5de0..7531a2788 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -39,8 +39,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/random.hpp" #include -#define TORRENT_UTP_LOG 0 -#define TORRENT_VERBOSE_UTP_LOG 0 +#define TORRENT_UTP_LOG 1 +#define TORRENT_VERBOSE_UTP_LOG 1 #define TORRENT_UT_SEQ 1 #if TORRENT_UTP_LOG @@ -221,7 +221,6 @@ struct utp_socket_impl , m_write_timeout() , m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout())) , m_last_cwnd_hit(time_now()) - , m_ack_timer(time_now() + minutes(10)) , m_last_history_step(time_now_hires()) , m_cwnd(TORRENT_ETHERNET_MTU << 16) , m_buffered_incoming_bytes(0) @@ -258,8 +257,10 @@ struct utp_socket_impl , m_eof(false) , m_attached(true) , m_nagle(true) + // TODO: make slow start work , m_slow_start(false) , m_cwnd_full(false) + , m_deferred_ack(false) { TORRENT_ASSERT(m_userdata); for (int i = 0; i != num_delay_hist; ++i) @@ -289,6 +290,7 @@ struct utp_socket_impl void send_syn(); void send_fin(); + void defer_ack(); bool send_pkt(bool ack); bool resend_packet(packet* p, bool fast_resend = false); void send_reset(utp_header* ph); @@ -403,11 +405,6 @@ struct utp_socket_impl // not sending fast enough to need it bigger ptime m_last_cwnd_hit; - // the next time we need to send an ACK the latest - // updated every time we send an ACK and every time we - // put off sending an ACK for a received packet - ptime m_ack_timer; - // the last time we stepped the timestamp history ptime m_last_history_step; @@ -601,6 +598,12 @@ struct utp_socket_impl // this is true as long as we have as many packets in // flight as allowed by the congestion window (cwnd) bool m_cwnd_full:1; + + // this is set to true when this socket has added itself to + // the utp socket manager's list of deferred acks. Once the + // burst of incoming UDP packets is all drained, the utp socket + // manager will send acks for all sockets on this list. + bool m_deferred_ack:1; }; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING @@ -662,6 +665,13 @@ boost::uint16_t utp_receive_id(utp_socket_impl* s) return s->m_recv_id; } +void utp_send_ack(utp_socket_impl* s) +{ + TORRENT_ASSERT(s->m_deferred_ack); + s->m_deferred_ack = false; + s->send_pkt(true); +} + void utp_socket_impl::update_mtu_limits() { TORRENT_ASSERT(m_mtu_floor <= m_mtu_ceiling); @@ -1026,6 +1036,7 @@ void utp_stream::do_connect(tcp::endpoint const& ep, utp_stream::connect_handler utp_socket_impl::~utp_socket_impl() { TORRENT_ASSERT(!m_attached); + TORRENT_ASSERT(!m_deferred_ack); UTP_LOGV("%8p: destroying utp socket state\n", this); @@ -1460,6 +1471,13 @@ void utp_socket_impl::write_payload(char* ptr, int size) #endif } +void utp_socket_impl::defer_ack() +{ + if (m_deferred_ack) return; + m_deferred_ack = true; + m_sm->defer_ack(this); +} + // sends a packet, pulls data from the write buffer (if there's any) // if ack is true, we need to send a packet regardless of if there's // any data. Returns true if we could send more data (i.e. call @@ -1667,11 +1685,6 @@ bool utp_socket_impl::send_pkt(bool ack) return false; } - // we just sent a packet. this means we just ACKed the last received - // packet as well. So, we can now reset the delayed ack timer to - // not trigger for a long time - m_ack_timer = now + minutes(10); - // if we have payload, we need to save the packet until it's acked // and progress m_seq_nr if (payload_size) @@ -2465,7 +2478,7 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size TORRENT_ASSERT(m_send_id == ph->connection_id); TORRENT_ASSERT(m_recv_id == ((m_send_id + 1) & 0xffff)); - send_pkt(true); + defer_ack(); return true; } @@ -2543,23 +2556,23 @@ bool utp_socket_impl::incoming_packet(char const* buf, int size // (i.e. ST_STATE) we're not ACKing anything. If we just // received a FIN packet, we need to ack that as well bool has_ack = ph->get_type() == ST_DATA || ph->get_type() == ST_FIN || ph->get_type() == ST_SYN; - int delayed_ack = m_sm->delayed_ack(); - if (has_ack && delayed_ack && m_ack_timer > receive_time) + int prev_out_packets = m_out_packets; + + // try to send more data as long as we can + // if send_pkt returns true + while (send_pkt(false)); + + if (has_ack && prev_out_packets == m_out_packets) { - // we have data to ACK, and delayed ACKs are enabled. - // update the ACK timer and clear the flag, to pretend - // like we don't have anything to ACK - m_ack_timer = (std::min)(m_ack_timer, receive_time + milliseconds(delayed_ack)); - has_ack = false; - UTP_LOGV("%8p: delaying ack. timer triggers in %d milliseconds\n" - , this, int(total_milliseconds(m_ack_timer - time_now_hires()))); + // we need to ack some data we received, and we didn't + // end up sending any payload packets in the loop + // above (becasue m_out_packets would have been incremented + // in that case). This means we need to send an ack. + // don't do it right away, because we may still receive + // more packets. defer the ack to send as few acks as possible + defer_ack(); } - if (send_pkt(has_ack)) - { - // try to send more data as long as we can - while (send_pkt(false)); - } maybe_trigger_send_callback(receive_time); if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true; @@ -2989,14 +3002,6 @@ void utp_socket_impl::tick(ptime const& now) } } - if (now > m_ack_timer) - { - UTP_LOGV("%8p: ack timer expired, sending ACK\n", this); - // we need to send an ACK now! - send_pkt(true); - if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return; - } - switch (m_state) { case UTP_STATE_NONE: