diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 88d75eb21..51483978c 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -93,6 +93,7 @@ namespace libtorrent int num_sockets() const { return m_utp_sockets.size(); } void defer_ack(utp_socket_impl* s); + void subscribe_drained(utp_socket_impl* s); private: udp_socket& m_sock; @@ -107,6 +108,13 @@ namespace libtorrent // have a chance to do that. This is to avoid sending // an ack for every single packet std::vector m_deferred_acks; + + // sockets that have received or sent packets this + // round, may subscribe to the event of draining the + // UDP socket. At that point they may call the + // user callback function to indicate bytes have been + // sent or received. + std::vector m_drained_event; // list of sockets that received EWOULDBLOCK from the // underlying socket. They are notified when the socket diff --git a/include/libtorrent/utp_stream.hpp b/include/libtorrent/utp_stream.hpp index 387d26029..8d3118a63 100644 --- a/include/libtorrent/utp_stream.hpp +++ b/include/libtorrent/utp_stream.hpp @@ -152,6 +152,7 @@ udp::endpoint utp_remote_endpoint(utp_socket_impl* s); boost::uint16_t utp_receive_id(utp_socket_impl* s); int utp_socket_state(utp_socket_impl const* s); void utp_send_ack(utp_socket_impl* s); +void utp_socket_drained(utp_socket_impl* s); void utp_writable(utp_socket_impl* s); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index e7f0b3d05..94534ecd1 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -366,6 +366,15 @@ namespace libtorrent utp_socket_impl* s = *i; utp_send_ack(s); } + + std::vector drained_event; + m_drained_event.swap(drained_event); + for (std::vector::iterator i = drained_event.begin() + , end(drained_event.end()); i != end; ++i) + { + utp_socket_impl* s = *i; + utp_socket_drained(s); + } } void utp_socket_manager::defer_ack(utp_socket_impl* s) @@ -375,6 +384,13 @@ namespace libtorrent m_deferred_acks.push_back(s); } + void utp_socket_manager::subscribe_drained(utp_socket_impl* s) + { + TORRENT_ASSERT(std::find(m_drained_event.begin(), m_drained_event.end(), s) + == m_drained_event.end()); + m_drained_event.push_back(s); + } + void utp_socket_manager::remove_socket(boost::uint16_t id) { socket_map_t::iterator i = m_utp_sockets.find(id); diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index e34e71f97..cab9658eb 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -223,8 +223,6 @@ struct utp_socket_impl , m_write_handler(0) , m_connect_handler(0) , m_remote_address() - , m_read_timeout() - , m_write_timeout() , m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout())) , m_last_cwnd_hit(time_now()) , m_last_history_step(time_now_hires()) @@ -266,6 +264,7 @@ struct utp_socket_impl , m_slow_start(true) , m_cwnd_full(false) , m_deferred_ack(false) + , m_subscribe_drained(false) , m_stalled(false) { TORRENT_ASSERT(m_userdata); @@ -298,6 +297,7 @@ struct utp_socket_impl void send_syn(); void send_fin(); + void subscribe_drained(); void defer_ack(); void remove_sack_header(packet* p); @@ -316,8 +316,8 @@ struct utp_socket_impl void do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now); int packet_timeout() const; bool test_socket_state(); - void maybe_trigger_receive_callback(ptime now); - void maybe_trigger_send_callback(ptime now); + void maybe_trigger_receive_callback(); + void maybe_trigger_send_callback(); bool cancel_handlers(error_code const& ec, bool kill); bool consume_incoming_data( utp_header const* ph, boost::uint8_t const* ptr, int payload_size, ptime now); @@ -406,15 +406,6 @@ struct utp_socket_impl packet_buffer m_inbuf; packet_buffer m_outbuf; - // timers when we should trigger the read and - // write callbacks (unless the buffers fill up - // before) - // TODO: 3 remove the read timeout concept. This should not be necessary, use nagle-like semantics instead - ptime m_read_timeout; - - // TODO: 3 remove the write timeout concept. This should not be necessary, use nagle-like semantics instead - ptime m_write_timeout; - // the time when the last packet we sent times out. Including re-sends. // if we ever end up not having sent anything in one second ( // or one mean rtt + 2 average deviations, whichever is greater) @@ -635,6 +626,10 @@ struct utp_socket_impl // manager will send acks for all sockets on this list. bool m_deferred_ack:1; + // this is true if this socket has subscribed to be notified + // when this receive round is done + bool m_subscribe_drained:1; + // if this socket tries to send a packet via the utp socket // manager, and it fails with EWOULDBLOCK, the socket // is stalled and this is set. It's also added to a list @@ -716,6 +711,19 @@ void utp_send_ack(utp_socket_impl* s) s->send_pkt(utp_socket_impl::pkt_ack); } +void utp_socket_drained(utp_socket_impl* s) +{ + s->m_subscribe_drained = false; + + // at this point, we know we won't receive any + // more packets this round. So, we may want to + // call the receive callback function to + // let the user consume it + + s->maybe_trigger_receive_callback(); + s->maybe_trigger_send_callback(); +} + void utp_socket_impl::update_mtu_limits() { INVARIANT_CHECK; @@ -945,7 +953,7 @@ void utp_stream::set_read_handler(handler_t h) // client's buffer right away m_impl->m_read += read_some(false); - m_impl->maybe_trigger_receive_callback(time_now_hires()); + m_impl->maybe_trigger_receive_callback(); } size_t utp_stream::read_some(bool clear_buffers) @@ -1054,7 +1062,7 @@ void utp_stream::set_write_handler(handler_t h) // if there was an error in send_pkt(), m_impl may be // 0 at this point - if (m_impl) m_impl->maybe_trigger_send_callback(time_now_hires()); + if (m_impl) m_impl->maybe_trigger_send_callback(); } void utp_stream::do_connect(tcp::endpoint const& ep, utp_stream::connect_handler_t handler) @@ -1137,41 +1145,35 @@ bool utp_socket_impl::should_delete() const return ret; } -void utp_socket_impl::maybe_trigger_receive_callback(ptime now) +void utp_socket_impl::maybe_trigger_receive_callback() { INVARIANT_CHECK; // nothing has been read or there's no outstanding read operation if (m_read == 0 || m_read_handler == 0) return; - if (m_read > m_read_buffer_size / 2 || now >= m_read_timeout) - { - UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read); - m_read_handler(m_userdata, m_read, m_error, false); - m_read_handler = 0; - m_read = 0; - m_read_buffer_size = 0; - m_read_buffer.clear(); - } + UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read); + m_read_handler(m_userdata, m_read, m_error, false); + m_read_handler = 0; + m_read = 0; + m_read_buffer_size = 0; + m_read_buffer.clear(); } -void utp_socket_impl::maybe_trigger_send_callback(ptime now) +void utp_socket_impl::maybe_trigger_send_callback() { INVARIANT_CHECK; // nothing has been written or there's no outstanding write operation if (m_written == 0 || m_write_handler == 0) return; - if (m_written > m_write_buffer_size * 2 / 3 || now >= m_write_timeout) - { - UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written); + UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written); - m_write_handler(m_userdata, m_written, m_error, false); - m_write_handler = 0; - m_written = 0; - m_write_buffer_size = 0; - m_write_buffer.clear(); - } + m_write_handler(m_userdata, m_written, m_error, false); + m_write_handler = 0; + m_written = 0; + m_write_buffer_size = 0; + m_write_buffer.clear(); } bool utp_socket_impl::destroy() @@ -1311,7 +1313,7 @@ void utp_socket_impl::writable() while(send_pkt()); - maybe_trigger_send_callback(time_now_hires()); + maybe_trigger_send_callback(); } void utp_socket_impl::send_fin() @@ -1488,11 +1490,6 @@ void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size) TORRENT_ASSERT(to_copy < INT_MAX / 2 && m_written < INT_MAX / 2); memcpy(ptr, static_cast(i->buf), to_copy); size -= to_copy; - if (m_written == 0) - { - m_write_timeout = now + milliseconds(300); - UTP_LOGV("%8p: setting write timeout to 300 ms from now\n", this); - } m_written += to_copy; ptr += to_copy; i->len -= to_copy; @@ -1518,6 +1515,17 @@ void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size) #endif } +void utp_socket_impl::subscribe_drained() +{ + INVARIANT_CHECK; + + if (m_subscribe_drained) return; + + UTP_LOGV("%8p: socket drained\n", this); + m_subscribe_drained = true; + m_sm->subscribe_drained(this); +} + void utp_socket_impl::defer_ack() { INVARIANT_CHECK; @@ -2158,11 +2166,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p int to_copy = (std::min)(size, int(target->len)); memcpy(target->buf, buf, to_copy); - if (m_read == 0) - { - m_read_timeout = now + milliseconds(100); - UTP_LOGV("%8p: setting read timeout to 100 ms from now\n", this); - } m_read += to_copy; target->buf = ((boost::uint8_t*)target->buf) + to_copy; target->len -= to_copy; @@ -2182,7 +2185,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p { TORRENT_ASSERT(p == 0 || p->header_size == p->size); free(p); - maybe_trigger_receive_callback(now); return; } } @@ -2197,7 +2199,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p p->header_size = 0; memcpy(p->buf, buf, size); } - if (m_receive_buffer_size == 0) m_read_timeout = now + milliseconds(100); // save this packet until the client issues another read m_receive_buffer.push_back(p); m_receive_buffer_size += p->size - p->header_size; @@ -2271,9 +2272,6 @@ bool utp_socket_impl::consume_incoming_data( UTP_LOGV("%8p: reordered remove inbuf: %d (%d)\n" , this, m_ack_nr, int(m_inbuf.size())); } - - // should we trigger the read handler? - maybe_trigger_receive_callback(now); } else { @@ -2381,7 +2379,7 @@ void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu) // set it to one if ((m_cwnd >> 16) < m_mtu) m_cwnd = boost::int64_t(m_mtu) << 16; - UTP_LOGV("%8p: intializing MTU to: %d [%d, %d]\n" + UTP_LOGV("%8p: initializing MTU to: %d [%d, %d]\n" , this, m_mtu, m_mtu_floor, m_mtu_ceiling); } @@ -2843,7 +2841,10 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size defer_ack(); } - maybe_trigger_send_callback(receive_time); + // we may want to call the user callback function at the end + // of this round. Subscribe to that event + subscribe_drained(); + if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true; // Everything up to the FIN has been receieved, respond with a FIN @@ -3159,14 +3160,6 @@ void utp_socket_impl::tick(ptime const& now) TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1); - // don't hang on to received data for too long, and don't - // wait too long telling the client we've sent some data. - // these functions will trigger time callback if we have - // a reason to and it's been long enough since we sent or - // received the data - maybe_trigger_receive_callback(now); - maybe_trigger_send_callback(now); - // if we're already in an error state, we're just waiting for the // client to perform an operation so that we can communicate the // error. No need to do anything else with this socket diff --git a/test/test_utp.cpp b/test/test_utp.cpp index 19cbec869..7444ac94f 100644 --- a/test/test_utp.cpp +++ b/test/test_utp.cpp @@ -68,6 +68,9 @@ void test_transfer() // make sure we announce to both http and udp trackers sett.prefer_udp_trackers = false; + // speed up loopback connections (by using the full MTU) + sett.utp_dynamic_sock_buf = true; + // for performance testing // sett.disable_hash_checks = true; // sett.utp_delayed_ack = 0;