diff --git a/ChangeLog b/ChangeLog index 45c84edc9..8f337086b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * fix uTP edge case where udp socket buffer fills up * fix nagle implementation in uTP * fix move_storage bugs diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index f64a1f9f0..6492a17fc 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -56,6 +56,10 @@ namespace libtorrent virtual bool incoming_packet(error_code const& ec , char const* hostname, char const* buf, int size) { return false; } + // called when the socket becomes writeable, after having + // failed with EWOULDBLOCK + virtual void writable() {} + // called every time the socket is drained of packets virtual void socket_drained() {} }; @@ -161,6 +165,9 @@ namespace libtorrent void call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size); void call_handler(error_code const& ec, const char* host, char const* buf, int size); void call_drained_handler(); + void call_writable_handler(); + + void on_writable(error_code const& ec, udp::socket* s); void setup_read(udp::socket* s); void on_read(udp::socket* s); @@ -233,6 +240,11 @@ namespace libtorrent // operations hanging on this socket int m_outstanding_ops; +#if TORRENT_USE_IPV6 + bool m_v6_write_subscribed:1; +#endif + bool m_v4_write_subscribed:1; + #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS bool m_started; int m_magic; diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 94aa024e1..802c6e096 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -59,6 +59,7 @@ namespace libtorrent , char const* p, int size); virtual bool incoming_packet(error_code const& ec, char const* host, char const* p, int size) { return false; } + virtual void writable(); virtual void socket_drained(); @@ -70,6 +71,7 @@ namespace libtorrent enum { dont_fragment = 1 }; void send_packet(udp::endpoint const& ep, char const* p, int len , error_code& ec, int flags = 0); + void subscribe_writable(utp_socket_impl* s); // internal, used by utp_stream void remove_socket(boost::uint16_t id); @@ -104,6 +106,11 @@ namespace libtorrent // have a chance to do that. This is to avoid sending // an ack for every single packet std::vector m_deferred_acks; + + // list of sockets that received EWOULDBLOCK from the + // underlying socket. They are notified when the socket + // becomes writable again + std::vector m_stalled_sockets; // the last socket we received a packet on utp_socket_impl* m_last_socket; diff --git a/include/libtorrent/utp_stream.hpp b/include/libtorrent/utp_stream.hpp index eba35632a..f227b1cf5 100644 --- a/include/libtorrent/utp_stream.hpp +++ b/include/libtorrent/utp_stream.hpp @@ -152,6 +152,7 @@ udp::endpoint utp_remote_endpoint(utp_socket_impl* s); boost::uint16_t utp_receive_id(utp_socket_impl* s); int utp_socket_state(utp_socket_impl const* s); void utp_send_ack(utp_socket_impl* s); +void utp_writable(utp_socket_impl* s); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING int socket_impl_size(); diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index f360c4b2e..ee0f32ab9 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -74,6 +74,10 @@ udp_socket::udp_socket(asio::io_service& ios , m_tunnel_packets(false) , m_abort(false) , m_outstanding_ops(0) +#if TORRENT_USE_IPV6 + , m_v6_write_subscribed(false) +#endif + , m_v4_write_subscribed(false) { #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_magic = 0x1337; @@ -178,13 +182,47 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len } #if TORRENT_USE_IPV6 - if (ep.address().is_v4() && m_ipv4_sock.is_open()) + if (ep.address().is_v6() && m_ipv6_sock.is_open()) + m_ipv6_sock.send_to(asio::buffer(p, len), ep, 0, ec); + else #endif m_ipv4_sock.send_to(asio::buffer(p, len), ep, 0, ec); + + if (ec == error::would_block) + { #if TORRENT_USE_IPV6 - else - m_ipv6_sock.send_to(asio::buffer(p, len), ep, 0, ec); + if (ep.address().is_v6() && m_ipv6_sock.is_open()) + { + if (!m_v6_write_subscribed) + { + m_ipv6_sock.async_send(asio::null_buffers() + , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv6_sock)); + m_v6_write_subscribed = true; + } + } + else #endif + { + if (!m_v4_write_subscribed) + { + m_ipv4_sock.async_send(asio::null_buffers() + , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv4_sock)); + m_v4_write_subscribed = true; + } + } + } +} + +void udp_socket::on_writable(error_code const& ec, udp::socket* s) +{ +#if TORRENT_USE_IPV6 + if (s == &m_ipv6_sock) + m_v6_write_subscribed = false; + else +#endif + m_v4_write_subscribed = false; + + call_writable_handler(); } // called whenever the socket is readable @@ -290,6 +328,26 @@ void udp_socket::call_drained_handler() m_observers_locked = false; } +void udp_socket::call_writable_handler() +{ + m_observers_locked = true; + for (std::vector::iterator i = m_observers.begin(); + i != m_observers.end();) + { + TORRENT_TRY { + (*i)->writable(); + } TORRENT_CATCH (std::exception&) {} + if (*i == NULL) i = m_observers.erase(i); + else ++i; + } + if (!m_added_observers.empty()) + { + m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); + m_added_observers.clear(); + } + m_observers_locked = false; +} + void udp_socket::subscribe(udp_socket_observer* o) { TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end()); diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index 729056ddb..178fec886 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -276,6 +276,25 @@ namespace libtorrent return false; } + void utp_socket_manager::subscribe_writable(utp_socket_impl* s) + { + TORRENT_ASSERT(std::find(m_stalled_sockets.begin(), m_stalled_sockets.end() + , s) == m_stalled_sockets.end()); + m_stalled_sockets.push_back(s); + } + + void utp_socket_manager::writable() + { + std::vector stalled_sockets; + m_stalled_sockets.swap(stalled_sockets); + for (std::vector::iterator i = stalled_sockets.begin() + , end(stalled_sockets.end()); i != end; ++i) + { + utp_socket_impl* s = *i; + utp_writable(s); + } + } + void utp_socket_manager::socket_drained() { // flush all deferred acks diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index be40d3a63..ee92d0d67 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -266,6 +266,7 @@ struct utp_socket_impl , m_slow_start(true) , m_cwnd_full(false) , m_deferred_ack(false) + , m_stalled(false) { TORRENT_ASSERT(m_userdata); for (int i = 0; i != num_delay_hist; ++i) @@ -278,6 +279,8 @@ struct utp_socket_impl void init_mtu(int link_mtu, int utp_mtu); bool incoming_packet(boost::uint8_t const* buf, int size , udp::endpoint const& ep, ptime receive_time); + void writable(); + bool should_delete() const; tcp::endpoint remote_endpoint(error_code& ec) const { @@ -627,6 +630,13 @@ struct utp_socket_impl // burst of incoming UDP packets is all drained, the utp socket // manager will send acks for all sockets on this list. bool m_deferred_ack:1; + + // if this socket tries to send a packet via the utp socket + // manager, and it fails with EWOULDBLOCK, the socket + // is stalled and this is set. It's also added to a list + // of sockets in the utp_socket_manager to be notified of + // the socket being writable again + bool m_stalled:1; }; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING @@ -688,6 +698,13 @@ boost::uint16_t utp_receive_id(utp_socket_impl* s) return s->m_recv_id; } +void utp_writable(utp_socket_impl* s) +{ + TORRENT_ASSERT(s->m_stalled); + s->m_stalled = false; + s->writable(); +} + void utp_send_ack(utp_socket_impl* s) { TORRENT_ASSERT(s->m_deferred_ack); @@ -1196,7 +1213,7 @@ void utp_socket_impl::send_syn() packet* p = (packet*)malloc(sizeof(packet) + sizeof(utp_header)); p->size = sizeof(utp_header); p->header_size = sizeof(utp_header); - p->num_transmissions = 1; + p->num_transmissions = 0; p->need_resend = false; utp_header* h = (utp_header*)p->buf; h->type_ver = (ST_SYN << 4) | 1; @@ -1225,7 +1242,18 @@ void utp_socket_impl::send_syn() m_sm->send_packet(udp::endpoint(m_remote_address, m_port), (char const*)h , sizeof(utp_header), ec); - if (ec) + if (ec == error::would_block) + { +#if TORRENT_UTP_LOG + UTP_LOGV("%8p: socket stalled\n", this); +#endif + if (!m_stalled) + { + m_stalled = true; + m_sm->subscribe_writable(this); + } + } + else if (ec) { free(p); m_error = ec; @@ -1234,6 +1262,9 @@ void utp_socket_impl::send_syn() return; } + if (!m_stalled) + ++p->num_transmissions; + TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); m_outbuf.insert(m_seq_nr, p); TORRENT_ASSERT(h->seq_nr == m_seq_nr); @@ -1248,6 +1279,19 @@ void utp_socket_impl::send_syn() #endif } +// if a send ever failed with EWOULDBLOCK, we +// subscribe to the udp socket and will be +// signalled with this function. +void utp_socket_impl::writable() +{ +#if TORRENT_UTP_LOG + UTP_LOGV("%8p: writable\n", this); +#endif + while(send_pkt()); + + maybe_trigger_send_callback(time_now_hires()); +} + void utp_socket_impl::send_fin() { INVARIANT_CHECK; @@ -1741,7 +1785,6 @@ bool utp_socket_impl::send_pkt(int flags) h->type_ver = (ST_FIN << 4) | 1; // fill in the timestamp as late as possible - ++p->num_transmissions; ptime now = time_now_hires(); p->send_time = now; h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time())); @@ -1770,7 +1813,7 @@ bool utp_socket_impl::send_pkt(int flags) ++m_out_packets; - if (ec == error::message_size && p->mtu_probe) + if (ec == error::message_size) { m_mtu_ceiling = p->size - 1; if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling; @@ -1779,15 +1822,29 @@ bool utp_socket_impl::send_pkt(int flags) // as well, to resend the packet immediately without // it being an MTU probe } + else if (ec == error::would_block) + { +#if TORRENT_UTP_LOG + UTP_LOGV("%8p: socket stalled\n", this); +#endif + if (!m_stalled) + { + m_stalled = true; + m_sm->subscribe_writable(this); + } + } else if (ec) { + if (payload_size) free(p); m_error = ec; m_state = UTP_STATE_ERROR_WAIT; test_socket_state(); - if (payload_size) free(p); return false; } + if (!m_stalled) + ++p->num_transmissions; + // if we have payload, we need to save the packet until it's acked // and progress m_seq_nr if (p->size > p->header_size) @@ -1820,7 +1877,10 @@ bool utp_socket_impl::send_pkt(int flags) TORRENT_ASSERT(h->seq_nr == m_seq_nr); } - return m_write_buffer_size > 0 && !m_cwnd_full; + // if the socket is stalled, always return false, don't + // try to write more packets. We'll keep writing once + // the underlying UDP socket becomes writable + return m_write_buffer_size > 0 && !m_cwnd_full && !m_stalled; } // size is in bytes @@ -1882,7 +1942,6 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) TORRENT_ASSERT(p->size - p->header_size >= 0); if (p->need_resend) m_bytes_in_flight += p->size - p->header_size; - ++p->num_transmissions; p->need_resend = false; utp_header* h = (utp_header*)p->buf; // update packet header @@ -1926,7 +1985,18 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) , boost::uint32_t(h->timestamp_difference_microseconds)); #endif - if (ec) + if (ec == error::would_block) + { +#if TORRENT_UTP_LOG + UTP_LOGV("%8p: socket stalled\n", this); +#endif + if (!m_stalled) + { + m_stalled = true; + m_sm->subscribe_writable(this); + } + } + else if (ec) { m_error = ec; m_state = UTP_STATE_ERROR_WAIT; @@ -1934,7 +2004,10 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) return false; } - return true; + if (!m_stalled) + ++p->num_transmissions; + + return !m_stalled; } void utp_socket_impl::experienced_loss(int seq_nr) @@ -2589,7 +2662,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size if (m_state == UTP_STATE_FIN_SENT) { - send_pkt(true); + send_pkt(pkt_ack); if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true; } else @@ -2718,7 +2791,7 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size // try to send more data as long as we can // if send_pkt returns true - while (send_pkt(false)); + while (send_pkt()); if (has_ack && prev_out_packets == m_out_packets) { @@ -3162,7 +3235,7 @@ void utp_socket_impl::tick(ptime const& now) } else if (m_state < UTP_STATE_FIN_SENT) { - send_pkt(false); + send_pkt(); if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return; } else if (m_state == UTP_STATE_FIN_SENT)