clean up uTP implementation by not using any timers

This commit is contained in:
Arvid Norberg 2013-02-06 04:38:30 +00:00
parent 5b48490005
commit b3b1180069
5 changed files with 82 additions and 61 deletions

View File

@ -93,6 +93,7 @@ namespace libtorrent
int num_sockets() const { return m_utp_sockets.size(); }
void defer_ack(utp_socket_impl* s);
void subscribe_drained(utp_socket_impl* s);
private:
udp_socket& m_sock;
@ -107,6 +108,13 @@ namespace libtorrent
// have a chance to do that. This is to avoid sending
// an ack for every single packet
std::vector<utp_socket_impl*> m_deferred_acks;
// sockets that have received or sent packets this
// round, may subscribe to the event of draining the
// UDP socket. At that point they may call the
// user callback function to indicate bytes have been
// sent or received.
std::vector<utp_socket_impl*> m_drained_event;
// list of sockets that received EWOULDBLOCK from the
// underlying socket. They are notified when the socket

View File

@ -152,6 +152,7 @@ udp::endpoint utp_remote_endpoint(utp_socket_impl* s);
boost::uint16_t utp_receive_id(utp_socket_impl* s);
int utp_socket_state(utp_socket_impl const* s);
void utp_send_ack(utp_socket_impl* s);
void utp_socket_drained(utp_socket_impl* s);
void utp_writable(utp_socket_impl* s);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING

View File

@ -366,6 +366,15 @@ namespace libtorrent
utp_socket_impl* s = *i;
utp_send_ack(s);
}
std::vector<utp_socket_impl*> drained_event;
m_drained_event.swap(drained_event);
for (std::vector<utp_socket_impl*>::iterator i = drained_event.begin()
, end(drained_event.end()); i != end; ++i)
{
utp_socket_impl* s = *i;
utp_socket_drained(s);
}
}
void utp_socket_manager::defer_ack(utp_socket_impl* s)
@ -375,6 +384,13 @@ namespace libtorrent
m_deferred_acks.push_back(s);
}
void utp_socket_manager::subscribe_drained(utp_socket_impl* s)
{
TORRENT_ASSERT(std::find(m_drained_event.begin(), m_drained_event.end(), s)
== m_drained_event.end());
m_drained_event.push_back(s);
}
void utp_socket_manager::remove_socket(boost::uint16_t id)
{
socket_map_t::iterator i = m_utp_sockets.find(id);

View File

@ -223,8 +223,6 @@ struct utp_socket_impl
, m_write_handler(0)
, m_connect_handler(0)
, m_remote_address()
, m_read_timeout()
, m_write_timeout()
, m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout()))
, m_last_cwnd_hit(time_now())
, m_last_history_step(time_now_hires())
@ -266,6 +264,7 @@ struct utp_socket_impl
, m_slow_start(true)
, m_cwnd_full(false)
, m_deferred_ack(false)
, m_subscribe_drained(false)
, m_stalled(false)
{
TORRENT_ASSERT(m_userdata);
@ -298,6 +297,7 @@ struct utp_socket_impl
void send_syn();
void send_fin();
void subscribe_drained();
void defer_ack();
void remove_sack_header(packet* p);
@ -316,8 +316,8 @@ struct utp_socket_impl
void do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now);
int packet_timeout() const;
bool test_socket_state();
void maybe_trigger_receive_callback(ptime now);
void maybe_trigger_send_callback(ptime now);
void maybe_trigger_receive_callback();
void maybe_trigger_send_callback();
bool cancel_handlers(error_code const& ec, bool kill);
bool consume_incoming_data(
utp_header const* ph, boost::uint8_t const* ptr, int payload_size, ptime now);
@ -406,15 +406,6 @@ struct utp_socket_impl
packet_buffer m_inbuf;
packet_buffer m_outbuf;
// timers when we should trigger the read and
// write callbacks (unless the buffers fill up
// before)
// TODO: 3 remove the read timeout concept. This should not be necessary, use nagle-like semantics instead
ptime m_read_timeout;
// TODO: 3 remove the write timeout concept. This should not be necessary, use nagle-like semantics instead
ptime m_write_timeout;
// the time when the last packet we sent times out. Including re-sends.
// if we ever end up not having sent anything in one second (
// or one mean rtt + 2 average deviations, whichever is greater)
@ -635,6 +626,10 @@ struct utp_socket_impl
// manager will send acks for all sockets on this list.
bool m_deferred_ack:1;
// this is true if this socket has subscribed to be notified
// when this receive round is done
bool m_subscribe_drained:1;
// if this socket tries to send a packet via the utp socket
// manager, and it fails with EWOULDBLOCK, the socket
// is stalled and this is set. It's also added to a list
@ -716,6 +711,19 @@ void utp_send_ack(utp_socket_impl* s)
s->send_pkt(utp_socket_impl::pkt_ack);
}
void utp_socket_drained(utp_socket_impl* s)
{
s->m_subscribe_drained = false;
// at this point, we know we won't receive any
// more packets this round. So, we may want to
// call the receive callback function to
// let the user consume it
s->maybe_trigger_receive_callback();
s->maybe_trigger_send_callback();
}
void utp_socket_impl::update_mtu_limits()
{
INVARIANT_CHECK;
@ -945,7 +953,7 @@ void utp_stream::set_read_handler(handler_t h)
// client's buffer right away
m_impl->m_read += read_some(false);
m_impl->maybe_trigger_receive_callback(time_now_hires());
m_impl->maybe_trigger_receive_callback();
}
size_t utp_stream::read_some(bool clear_buffers)
@ -1054,7 +1062,7 @@ void utp_stream::set_write_handler(handler_t h)
// if there was an error in send_pkt(), m_impl may be
// 0 at this point
if (m_impl) m_impl->maybe_trigger_send_callback(time_now_hires());
if (m_impl) m_impl->maybe_trigger_send_callback();
}
void utp_stream::do_connect(tcp::endpoint const& ep, utp_stream::connect_handler_t handler)
@ -1137,41 +1145,35 @@ bool utp_socket_impl::should_delete() const
return ret;
}
void utp_socket_impl::maybe_trigger_receive_callback(ptime now)
void utp_socket_impl::maybe_trigger_receive_callback()
{
INVARIANT_CHECK;
// nothing has been read or there's no outstanding read operation
if (m_read == 0 || m_read_handler == 0) return;
if (m_read > m_read_buffer_size / 2 || now >= m_read_timeout)
{
UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read);
m_read_handler(m_userdata, m_read, m_error, false);
m_read_handler = 0;
m_read = 0;
m_read_buffer_size = 0;
m_read_buffer.clear();
}
UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read);
m_read_handler(m_userdata, m_read, m_error, false);
m_read_handler = 0;
m_read = 0;
m_read_buffer_size = 0;
m_read_buffer.clear();
}
void utp_socket_impl::maybe_trigger_send_callback(ptime now)
void utp_socket_impl::maybe_trigger_send_callback()
{
INVARIANT_CHECK;
// nothing has been written or there's no outstanding write operation
if (m_written == 0 || m_write_handler == 0) return;
if (m_written > m_write_buffer_size * 2 / 3 || now >= m_write_timeout)
{
UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written);
UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written);
m_write_handler(m_userdata, m_written, m_error, false);
m_write_handler = 0;
m_written = 0;
m_write_buffer_size = 0;
m_write_buffer.clear();
}
m_write_handler(m_userdata, m_written, m_error, false);
m_write_handler = 0;
m_written = 0;
m_write_buffer_size = 0;
m_write_buffer.clear();
}
bool utp_socket_impl::destroy()
@ -1311,7 +1313,7 @@ void utp_socket_impl::writable()
while(send_pkt());
maybe_trigger_send_callback(time_now_hires());
maybe_trigger_send_callback();
}
void utp_socket_impl::send_fin()
@ -1488,11 +1490,6 @@ void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size)
TORRENT_ASSERT(to_copy < INT_MAX / 2 && m_written < INT_MAX / 2);
memcpy(ptr, static_cast<char const*>(i->buf), to_copy);
size -= to_copy;
if (m_written == 0)
{
m_write_timeout = now + milliseconds(300);
UTP_LOGV("%8p: setting write timeout to 300 ms from now\n", this);
}
m_written += to_copy;
ptr += to_copy;
i->len -= to_copy;
@ -1518,6 +1515,17 @@ void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size)
#endif
}
void utp_socket_impl::subscribe_drained()
{
INVARIANT_CHECK;
if (m_subscribe_drained) return;
UTP_LOGV("%8p: socket drained\n", this);
m_subscribe_drained = true;
m_sm->subscribe_drained(this);
}
void utp_socket_impl::defer_ack()
{
INVARIANT_CHECK;
@ -2158,11 +2166,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p
int to_copy = (std::min)(size, int(target->len));
memcpy(target->buf, buf, to_copy);
if (m_read == 0)
{
m_read_timeout = now + milliseconds(100);
UTP_LOGV("%8p: setting read timeout to 100 ms from now\n", this);
}
m_read += to_copy;
target->buf = ((boost::uint8_t*)target->buf) + to_copy;
target->len -= to_copy;
@ -2182,7 +2185,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p
{
TORRENT_ASSERT(p == 0 || p->header_size == p->size);
free(p);
maybe_trigger_receive_callback(now);
return;
}
}
@ -2197,7 +2199,6 @@ void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, p
p->header_size = 0;
memcpy(p->buf, buf, size);
}
if (m_receive_buffer_size == 0) m_read_timeout = now + milliseconds(100);
// save this packet until the client issues another read
m_receive_buffer.push_back(p);
m_receive_buffer_size += p->size - p->header_size;
@ -2271,9 +2272,6 @@ bool utp_socket_impl::consume_incoming_data(
UTP_LOGV("%8p: reordered remove inbuf: %d (%d)\n"
, this, m_ack_nr, int(m_inbuf.size()));
}
// should we trigger the read handler?
maybe_trigger_receive_callback(now);
}
else
{
@ -2381,7 +2379,7 @@ void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu)
// set it to one
if ((m_cwnd >> 16) < m_mtu) m_cwnd = boost::int64_t(m_mtu) << 16;
UTP_LOGV("%8p: intializing MTU to: %d [%d, %d]\n"
UTP_LOGV("%8p: initializing MTU to: %d [%d, %d]\n"
, this, m_mtu, m_mtu_floor, m_mtu_ceiling);
}
@ -2843,7 +2841,10 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
defer_ack();
}
maybe_trigger_send_callback(receive_time);
// we may want to call the user callback function at the end
// of this round. Subscribe to that event
subscribe_drained();
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
// Everything up to the FIN has been receieved, respond with a FIN
@ -3159,14 +3160,6 @@ void utp_socket_impl::tick(ptime const& now)
TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1);
// don't hang on to received data for too long, and don't
// wait too long telling the client we've sent some data.
// these functions will trigger time callback if we have
// a reason to and it's been long enough since we sent or
// received the data
maybe_trigger_receive_callback(now);
maybe_trigger_send_callback(now);
// if we're already in an error state, we're just waiting for the
// client to perform an operation so that we can communicate the
// error. No need to do anything else with this socket

View File

@ -68,6 +68,9 @@ void test_transfer()
// make sure we announce to both http and udp trackers
sett.prefer_udp_trackers = false;
// speed up loopback connections (by using the full MTU)
sett.utp_dynamic_sock_buf = true;
// for performance testing
// sett.disable_hash_checks = true;
// sett.utp_delayed_ack = 0;