From ee2688b83af277189ddec918aafd04647baefc25 Mon Sep 17 00:00:00 2001 From: arvidn Date: Sat, 11 Feb 2017 14:21:48 -0500 Subject: [PATCH] use unique_ptr for packets in utp_stream --- include/libtorrent/packet_buffer.hpp | 40 +--- include/libtorrent/packet_pool.hpp | 54 +++--- include/libtorrent/utp_socket_manager.hpp | 4 +- include/libtorrent/utp_stream.hpp | 14 -- src/packet_buffer.cpp | 39 ++-- src/utp_stream.cpp | 217 ++++++++-------------- test/test_packet_buffer.cpp | 106 ++++++----- 7 files changed, 199 insertions(+), 275 deletions(-) diff --git a/include/libtorrent/packet_buffer.hpp b/include/libtorrent/packet_buffer.hpp index a4bdea8f0..8cd1d83eb 100644 --- a/include/libtorrent/packet_buffer.hpp +++ b/include/libtorrent/packet_buffer.hpp @@ -35,11 +35,15 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/config.hpp" #include "libtorrent/aux_/unique_ptr.hpp" +#include "libtorrent/packet_pool.hpp" // for packet_ptr/packet_deleter #include #include +#include // for unique_ptr namespace libtorrent { + struct packet; + // this is a circular buffer that automatically resizes // itself as elements are inserted. Elements are indexed // by integers and are assumed to be sequential. Unless the @@ -67,12 +71,12 @@ namespace libtorrent // whenever the element at the cursor is removed, the // cursor is bumped to the next occupied element - class TORRENT_EXTRA_EXPORT packet_buffer_impl + class TORRENT_EXTRA_EXPORT packet_buffer { public: - typedef std::uint32_t index_type; + using index_type = std::uint32_t; - void* insert(index_type idx, void* value); + packet_ptr insert(index_type idx, packet_ptr value); int size() const { return m_size; } @@ -80,9 +84,9 @@ namespace libtorrent int capacity() const { return m_capacity; } - void* at(index_type idx) const; + packet* at(index_type idx) const; - void* remove(index_type idx); + packet_ptr remove(index_type idx); void reserve(int size); @@ -95,7 +99,7 @@ namespace libtorrent #endif private: - aux::unique_ptr m_storage; + aux::unique_ptr m_storage; int m_capacity = 0; // this is the total number of elements that are occupied @@ -108,30 +112,6 @@ namespace libtorrent index_type m_last{0}; }; - template - class packet_buffer : packet_buffer_impl - { - public: - - using packet_buffer_impl::index_type; - using packet_buffer_impl::size; - using packet_buffer_impl::capacity; - using packet_buffer_impl::reserve; - using packet_buffer_impl::cursor; - using packet_buffer_impl::span; - - T* insert(index_type i, T* p) - { - return static_cast(packet_buffer_impl::insert(i, p)); - } - - T* at(index_type idx) const - { return static_cast(packet_buffer_impl::at(idx)); } - - T* remove(index_type idx) - { return static_cast(packet_buffer_impl::remove(idx)); } - }; - } #endif // TORRENT_PACKET_BUFFER_HPP_INCLUDED diff --git a/include/libtorrent/packet_pool.hpp b/include/libtorrent/packet_pool.hpp index 9405c86b2..2ef15937d 100644 --- a/include/libtorrent/packet_pool.hpp +++ b/include/libtorrent/packet_pool.hpp @@ -38,10 +38,23 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/aux_/throw.hpp" #include "libtorrent/aux_/vector.hpp" #include "libtorrent/aux_/numeric_cast.hpp" +#include "libtorrent/time.hpp" #include "libtorrent/assert.hpp" +#include "libtorrent/debug.hpp" // for single_threaded namespace libtorrent { + // internal: some MTU and protocol header sizes constants + constexpr int TORRENT_IPV4_HEADER = 20; + constexpr int TORRENT_IPV6_HEADER = 40; + constexpr int TORRENT_UDP_HEADER = 8; + constexpr int TORRENT_UTP_HEADER = 20; + constexpr int TORRENT_SOCKS5_HEADER = 6; // plus the size of the destination address + + constexpr int TORRENT_ETHERNET_MTU = 1500; + constexpr int TORRENT_TEREDO_MTU = 1280; + constexpr int TORRENT_INET_MIN_MTU = 576; + constexpr int TORRENT_INET_MAX_MTU = 0xffff; // used for out-of-order incoming packets // as well as sent packets that are waiting to be ACKed @@ -81,7 +94,6 @@ namespace libtorrent std::uint8_t buf[1]; }; - struct packet_deleter { // deleter for std::unique_ptr @@ -93,22 +105,22 @@ namespace libtorrent } }; - inline packet* create_packet(int const size) + using packet_ptr = std::unique_ptr; + + inline packet_ptr create_packet(int const size) { packet* p = static_cast(std::malloc(sizeof(packet) + size)); if (p == nullptr) aux::throw_ex(); new (p) packet(); p->allocated = aux::numeric_cast(size); - return p; + return packet_ptr(p); } - using packet_ptr = std::unique_ptr; - struct TORRENT_EXTRA_EXPORT packet_slab { int const allocate_size; - explicit packet_slab(std::size_t const limit, int const alloc_size) + explicit packet_slab(int const alloc_size, std::size_t const limit = 10) : allocate_size(alloc_size) , m_limit(limit) { @@ -125,7 +137,7 @@ namespace libtorrent packet_ptr alloc() { - if (m_storage.empty()) return packet_ptr(create_packet(allocate_size)); + if (m_storage.empty()) return create_packet(allocate_size); auto ret = std::move(m_storage.back()); m_storage.pop_back(); return ret; @@ -139,35 +151,33 @@ namespace libtorrent private: const std::size_t m_limit; - aux::vector m_storage; + std::vector m_storage; }; // single thread packet allocation packet pool // can handle common cases of packet size by 3 pools struct TORRENT_EXTRA_EXPORT packet_pool : private single_threaded { - packet* acquire(int allocate) + packet_ptr acquire(int const allocate) { TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(allocate >= 0); TORRENT_ASSERT(allocate <= std::numeric_limits::max()); - packet_ptr p{ alloc(allocate) }; - return p.release(); + return alloc(allocate); } - void release(packet* p) + void release(packet_ptr p) { TORRENT_ASSERT(is_single_thread()); - if (p == nullptr) return; + if (p.get() == nullptr) return; - packet_ptr pp(p); // takes ownership and may auto free if slab container does not move it - int const allocated = pp->allocated; + int const allocated = p->allocated; - if (allocated == m_syn_slab.allocate_size) { m_syn_slab.try_push_back(pp); } - else if (allocated == m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(pp); } - else if (allocated == m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(pp); } + if (allocated == m_syn_slab.allocate_size) { m_syn_slab.try_push_back(p); } + else if (allocated == m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(p); } + else if (allocated == m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(p); } } // periodically free up some of the cached packets @@ -186,13 +196,13 @@ namespace libtorrent if (allocate <= m_syn_slab.allocate_size) { return m_syn_slab.alloc(); } else if (allocate <= m_mtu_floor_slab.allocate_size) { return m_mtu_floor_slab.alloc(); } else if (allocate <= m_mtu_ceiling_slab.allocate_size) { return m_mtu_ceiling_slab.alloc(); } - return packet_ptr(create_packet(allocate)); + return create_packet(allocate); } static int const mtu_floor_size = TORRENT_INET_MIN_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER; static int const mtu_ceiling_size = TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER; - packet_slab m_syn_slab{ 10, sizeof(utp_header) }; - packet_slab m_mtu_floor_slab{ 10, mtu_floor_size }; - packet_slab m_mtu_ceiling_slab{ 10 , mtu_ceiling_size }; + packet_slab m_syn_slab{ TORRENT_UTP_HEADER }; + packet_slab m_mtu_floor_slab{ mtu_floor_size }; + packet_slab m_mtu_ceiling_slab{ mtu_ceiling_size }; }; } diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index 4c2085494..b50b84e1d 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -120,8 +120,8 @@ namespace libtorrent // the counter is the enum from ``counters``. void inc_stats_counter(int counter, int delta = 1); - packet *acquire_packet(int allocate) { return m_packet_pool.acquire(allocate); } - void release_packet(packet *p) { m_packet_pool.release(p); } + packet_ptr acquire_packet(int const allocate) { return m_packet_pool.acquire(allocate); } + void release_packet(packet_ptr p) { m_packet_pool.release(std::move(p)); } void decay() { m_packet_pool.decay(); } private: diff --git a/include/libtorrent/utp_stream.hpp b/include/libtorrent/utp_stream.hpp index 107e3735b..a85a880cd 100644 --- a/include/libtorrent/utp_stream.hpp +++ b/include/libtorrent/utp_stream.hpp @@ -73,20 +73,6 @@ namespace libtorrent struct utp_socket_manager; - // internal: some MTU and protocol header sizes constants - enum - { - TORRENT_IPV4_HEADER = 20, - TORRENT_IPV6_HEADER = 40, - TORRENT_UDP_HEADER = 8, - TORRENT_SOCKS5_HEADER = 6, // plus the size of the destination address - - TORRENT_ETHERNET_MTU = 1500, - TORRENT_TEREDO_MTU = 1280, - TORRENT_INET_MIN_MTU = 576, - TORRENT_INET_MAX_MTU = 0xffff - }; - // internal: the point of the bif_endian_int is two-fold // one purpose is to not have any alignment requirements // so that any buffer received from the network can be cast diff --git a/src/packet_buffer.cpp b/src/packet_buffer.cpp index c734a5963..e2112b2f9 100644 --- a/src/packet_buffer.cpp +++ b/src/packet_buffer.cpp @@ -42,7 +42,7 @@ namespace libtorrent { , std::uint32_t mask); #if TORRENT_USE_INVARIANT_CHECKS - void packet_buffer_impl::check_invariant() const + void packet_buffer::check_invariant() const { int count = 0; for (int i = 0; i < int(m_capacity); ++i) @@ -53,7 +53,7 @@ namespace libtorrent { } #endif - void* packet_buffer_impl::insert(index_type idx, void* value) + packet_ptr packet_buffer::insert(index_type idx, packet_ptr value) { INVARIANT_CHECK; @@ -61,7 +61,7 @@ namespace libtorrent { // you're not allowed to insert NULLs! TORRENT_ASSERT(value); - if (value == nullptr) return remove(idx); + if (!value) return remove(idx); if (m_size != 0) { @@ -108,34 +108,32 @@ namespace libtorrent { if (m_capacity == 0) reserve(16); - void* old_value = m_storage[idx & (m_capacity - 1)]; - m_storage[idx & (m_capacity - 1)] = value; + packet_ptr old_value = std::move(m_storage[idx & (m_capacity - 1)]); + m_storage[idx & (m_capacity - 1)] = std::move(value); if (m_size == 0) m_first = idx; // if we're just replacing an old value, the number // of elements in the buffer doesn't actually increase - if (old_value == nullptr) ++m_size; + if (!old_value) ++m_size; TORRENT_ASSERT_VAL(m_first <= 0xffff, m_first); return old_value; } - void* packet_buffer_impl::at(index_type idx) const + packet* packet_buffer::at(index_type idx) const { INVARIANT_CHECK; if (idx >= m_first + m_capacity) return nullptr; if (compare_less_wrap(idx, m_first, 0xffff)) - { return nullptr; - } std::size_t const mask = m_capacity - 1; - return m_storage[idx & mask]; + return m_storage[idx & mask].get(); } - void packet_buffer_impl::reserve(int size) + void packet_buffer::reserve(int size) { INVARIANT_CHECK; TORRENT_ASSERT_VAL(size <= 0xffff, size); @@ -144,31 +142,28 @@ namespace libtorrent { while (new_size < size) new_size <<= 1; - aux::unique_ptr new_storage(new void*[new_size]); - - for (index_type i = 0; i < std::uint32_t(new_size); ++i) - new_storage[i] = nullptr; + aux::unique_ptr new_storage(new packet_ptr[new_size]); for (index_type i = m_first; i < (m_first + m_capacity); ++i) - new_storage[i & (new_size - 1)] = m_storage[i & (m_capacity - 1)]; + new_storage[i & (new_size - 1)] = std::move(m_storage[i & (m_capacity - 1)]); m_storage = std::move(new_storage); m_capacity = new_size; } - void* packet_buffer_impl::remove(index_type idx) + packet_ptr packet_buffer::remove(index_type idx) { INVARIANT_CHECK; // TODO: use compare_less_wrap for this comparison as well if (idx >= m_first + m_capacity) - return nullptr; + return packet_ptr(); if (compare_less_wrap(idx, m_first, 0xffff)) - return nullptr; + return packet_ptr(); std::size_t const mask = m_capacity - 1; - void* old_value = m_storage[idx & mask]; - m_storage[idx & mask] = nullptr; + packet_ptr old_value = std::move(m_storage[idx & mask]); + m_storage[idx & mask].reset(); if (old_value) { @@ -196,5 +191,5 @@ namespace libtorrent { TORRENT_ASSERT_VAL(m_first <= 0xffff, m_first); return old_value; } - } + diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index 0bd46e396..a69d7eecc 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -286,10 +286,10 @@ struct utp_socket_impl void parse_close_reason(std::uint8_t const* ptr, int size); void write_payload(std::uint8_t* ptr, int size); void maybe_inc_acked_seq_nr(); - void ack_packet(packet* p, time_point const& receive_time + void ack_packet(packet_ptr p, time_point const& receive_time , std::uint32_t& min_rtt, std::uint16_t seq_nr); void write_sack(std::uint8_t* buf, int size) const; - void incoming(std::uint8_t const* buf, int size, packet* p, time_point now); + void incoming(std::uint8_t const* buf, int size, packet_ptr p, time_point now); void do_ledbat(int acked_bytes, int delay, int in_flight); int packet_timeout() const; bool test_socket_state(); @@ -303,8 +303,8 @@ struct utp_socket_impl void set_state(int s); - packet *acquire_packet(int allocate) { return m_sm->acquire_packet(allocate); } - void release_packet(packet *p) { m_sm->release_packet(p); } + packet_ptr acquire_packet(int const allocate) { return m_sm->acquire_packet(allocate); } + void release_packet(packet_ptr p) { m_sm->release_packet(std::move(p)); } private: @@ -359,7 +359,7 @@ public: // if this is non nullptr, it's a packet. This packet was held off because // of NAGLE. We couldn't send it immediately. It's left // here to accrue more bytes before we send it. - packet* m_nagle_packet = nullptr; + packet_ptr m_nagle_packet; // the user provided read buffer. If this has a size greater // than 0, we'll always prefer using it over putting received @@ -372,7 +372,7 @@ public: // packets we've received without a read operation // active. Store them here until the client triggers // an async_read_some - std::vector m_receive_buffer; + std::vector m_receive_buffer; // this is the error on this socket. If m_state is // set to UTP_STATE_ERROR_WAIT, this error should be @@ -391,8 +391,8 @@ public: // the send and receive buffers // maps packet sequence numbers - packet_buffer m_inbuf; - packet_buffer m_outbuf; + packet_buffer m_inbuf; + packet_buffer m_outbuf; // the time when the last packet we sent times out. Including re-sends. // if we ever end up not having sent anything in one second ( @@ -1005,7 +1005,7 @@ size_t utp_stream::read_some(bool clear_buffers) size_t ret = 0; int pop_packets = 0; - for (std::vector::iterator i = m_impl->m_receive_buffer.begin() + for (auto i = m_impl->m_receive_buffer.begin() , end(m_impl->m_receive_buffer.end()); i != end;) { if (target == m_impl->m_read_buffer.end()) @@ -1018,8 +1018,8 @@ size_t utp_stream::read_some(bool clear_buffers) m_impl->check_receive_buffers(); - packet* p = *i; - int to_copy = (std::min)(p->size - p->header_size, int(target->len)); + packet* p = i->get(); + int to_copy = std::min(p->size - p->header_size, int(target->len)); TORRENT_ASSERT(to_copy >= 0); memcpy(target->buf, p->buf + p->header_size, to_copy); ret += to_copy; @@ -1039,9 +1039,9 @@ size_t utp_stream::read_some(bool clear_buffers) // Consumed entire packet if (p->header_size == p->size) { - m_impl->release_packet(p); + m_impl->release_packet(std::move(*i)); + i->reset(); ++pop_packets; - *i = nullptr; ++i; } @@ -1131,25 +1131,22 @@ utp_socket_impl::~utp_socket_impl() + m_inbuf.capacity()) & ACK_MASK); i != end; i = (i + 1) & ACK_MASK) { - packet* p = m_inbuf.remove(i); - release_packet(p); + packet_ptr p = m_inbuf.remove(i); + release_packet(std::move(p)); } for (std::uint16_t i = std::uint16_t(m_outbuf.cursor()), end((m_outbuf.cursor() + m_outbuf.capacity()) & ACK_MASK); i != end; i = (i + 1) & ACK_MASK) { - packet* p = m_outbuf.remove(i); - release_packet(p); + packet_ptr p = m_outbuf.remove(i); + release_packet(std::move(p)); } - for (std::vector::iterator i = m_receive_buffer.begin() - , end = m_receive_buffer.end(); i != end; ++i) - { - release_packet(*i); - } + for (auto& p : m_receive_buffer) + release_packet(std::move(p)); - release_packet(m_nagle_packet); - m_nagle_packet = nullptr; + release_packet(std::move(m_nagle_packet)); + m_nagle_packet.reset(); } bool utp_socket_impl::should_delete() const @@ -1278,7 +1275,7 @@ void utp_socket_impl::send_syn() m_ack_nr = 0; m_fast_resend_seq_nr = m_seq_nr; - packet* p = acquire_packet(sizeof(utp_header)); + packet_ptr p = acquire_packet(sizeof(utp_header)); p->size = sizeof(utp_header); p->header_size = sizeof(utp_header); p->num_transmissions = 0; @@ -1328,7 +1325,7 @@ void utp_socket_impl::send_syn() } else if (ec) { - release_packet(p); + release_packet(std::move(p)); m_error = ec; set_state(UTP_STATE_ERROR_WAIT); test_socket_state(); @@ -1339,9 +1336,9 @@ void utp_socket_impl::send_syn() ++p->num_transmissions; TORRENT_ASSERT(!m_outbuf.at(m_seq_nr)); - m_outbuf.insert(m_seq_nr, p); TORRENT_ASSERT(h->seq_nr == m_seq_nr); TORRENT_ASSERT(p->buf == reinterpret_cast(h)); + m_outbuf.insert(m_seq_nr, std::move(p)); m_seq_nr = (m_seq_nr + 1) & ACK_MASK; @@ -1484,14 +1481,14 @@ void utp_socket_impl::parse_sack(std::uint16_t packet_ack, std::uint8_t const* p if (compare_less_wrap(m_fast_resend_seq_nr, ack_nr, ACK_MASK)) ++dups; // this bit was set, ack_nr was received - packet* p = m_outbuf.remove(ack_nr); + packet_ptr p = m_outbuf.remove(ack_nr); if (p) { *acked_bytes += p->size - p->header_size; // each ACKed packet counts as a duplicate ack UTP_LOGV("%8p: duplicate_acks:%u fast_resend_seq_nr:%u\n" , static_cast(this), m_duplicate_acks, m_fast_resend_seq_nr); - ack_packet(p, now, min_rtt, std::uint16_t(ack_nr)); + ack_packet(std::move(p), now, min_rtt, std::uint16_t(ack_nr)); } else { @@ -1559,7 +1556,7 @@ void utp_socket_impl::write_payload(std::uint8_t* ptr, int size) while (size > 0) { // i points to the iovec we'll start copying from - int to_copy = (std::min)(size, int(i->len)); + int to_copy = std::min(size, int(i->len)); TORRENT_ASSERT(to_copy >= 0); TORRENT_ASSERT(to_copy < INT_MAX / 2 && m_written < INT_MAX / 2); memcpy(ptr, static_cast(i->buf), to_copy); @@ -1637,34 +1634,6 @@ void utp_socket_impl::remove_sack_header(packet* p) p->size -= std::uint16_t(sack_size + 2); } -struct packet_holder -{ - explicit packet_holder(utp_socket_manager* sm, packet* p = nullptr) : m_sm(sm), m_packet(p) {} - ~packet_holder() { m_sm->release_packet(m_packet); } - - void reset(packet* p) - { - m_sm->release_packet(m_packet); - m_packet = p; - } - - packet* release() - { - packet* ret = m_packet; - m_packet = nullptr; - return ret; - } - -private: - - // not copyable - packet_holder(packet_holder const&); - packet_holder& operator=(packet_holder const&); - - utp_socket_manager* m_sm; - packet* m_packet; -}; - // sends a packet, pulls data from the write buffer (if there's any) // if ack is true, we need to send a packet regardless of if there's // any data. Returns true if we could send more data (i.e. call @@ -1737,14 +1706,14 @@ bool utp_socket_impl::send_pkt(int const flags) // for non MTU-probes, use the conservative packet size int const effective_mtu = mtu_probe ? m_mtu : m_mtu_floor; - int payload_size = (std::min)(m_write_buffer_size + int payload_size = std::min(m_write_buffer_size , effective_mtu - header_size); TORRENT_ASSERT(payload_size >= 0); // if we have one MSS worth of data, make sure it fits in our // congestion window and the advertised receive window from // the other end. - if (m_bytes_in_flight + payload_size > (std::min)(int(m_cwnd >> 16) + if (m_bytes_in_flight + payload_size > std::min(int(m_cwnd >> 16) , int(m_adv_wnd - m_bytes_in_flight))) { // this means there's not enough room in the send window for @@ -1794,46 +1763,20 @@ bool utp_socket_impl::send_pkt(int const flags) int packet_size = header_size + payload_size; - packet* p = nullptr; + packet_ptr p; std::uint8_t* ptr = nullptr; utp_header* h = nullptr; -#if TORRENT_USE_ASSERTS - bool stack_alloced = false; -#endif - - // used to free the packet buffer in case we exit the - // function early - packet_holder holder(m_sm); - // payload size being zero means we're just sending // an force. We should not pick up the nagle packet if (!m_nagle_packet || (payload_size == 0 && force)) { - // we only need a heap allocation if we have payload and - // need to keep the packet around (in the outbuf) + p = acquire_packet(effective_mtu); + if (payload_size) { - p = acquire_packet(effective_mtu); - holder.reset(p); - m_sm->inc_stats_counter(counters::utp_payload_pkts_out); } - else - { -#if TORRENT_USE_ASSERTS - stack_alloced = true; -#endif - TORRENT_ASSERT(force); - // this alloca() statement won't necessarily produce - // correctly aligned memory. That's why we ask for 7 more bytes - // and adjust our pointer to be aligned later - TORRENT_ALLOCA(ps, char, sizeof(packet) + packet_size + sizeof(packet*) - 1); - p = reinterpret_cast(ps.data()); - p = reinterpret_cast(align_pointer(p)); - UTP_LOGV("%8p: allocating %d bytes on the stack\n", static_cast(this), packet_size); - p->allocated = std::uint16_t(packet_size); - } p->size = std::uint16_t(packet_size); p->header_size = std::uint16_t(packet_size - payload_size); @@ -1861,7 +1804,8 @@ bool utp_socket_impl::send_pkt(int const flags) else { // pick up the nagle packet and keep adding bytes to it - p = m_nagle_packet; + p = std::move(m_nagle_packet); + m_nagle_packet.reset(); ptr = p->buf + sizeof(utp_header); h = reinterpret_cast(p->buf); @@ -1877,14 +1821,14 @@ bool utp_socket_impl::send_pkt(int const flags) if (m_inbuf.size() == 0) { // we need to remove the sack header - remove_sack_header(p); + remove_sack_header(p.get()); sack = 0; } } else sack = 0; - std::int32_t const size_left = (std::min)(p->allocated - p->size + std::int32_t const size_left = std::min(p->allocated - p->size , m_write_buffer_size); write_payload(p->buf + p->size, size_left); @@ -1897,17 +1841,16 @@ bool utp_socket_impl::send_pkt(int const flags) // if we didn't, we may still send it if there's // no bytes in flight if (m_bytes_in_flight > 0 - && p->size < p->allocated + && p->size < std::min(p->allocated, m_mtu_floor) && !force && m_nagle) { + // the packet is still not a full MSS, so put it back into the nagle + // packet + m_nagle_packet = std::move(p); return false; } - // clear the nagle packet pointer and fall through - // sending p - m_nagle_packet = nullptr; - packet_size = p->size; payload_size = p->size - p->header_size; } @@ -1942,10 +1885,9 @@ bool utp_socket_impl::send_pkt(int const flags) "adv_wnd:%d in-flight:%d mtu:%d effective_mtu:%d\n" , static_cast(this), m_write_buffer_size, int(m_cwnd >> 16) , m_adv_wnd, m_bytes_in_flight, m_mtu, effective_mtu); - TORRENT_ASSERT(m_nagle_packet == nullptr); + TORRENT_ASSERT(!m_nagle_packet); TORRENT_ASSERT(h->seq_nr == m_seq_nr); - m_nagle_packet = p; - holder.release(); + m_nagle_packet = std::move(p); return false; } @@ -1962,7 +1904,7 @@ bool utp_socket_impl::send_pkt(int const flags) } h->timestamp_difference_microseconds = m_reply_micro; - h->wnd_size = (std::max)(m_in_buf_size - m_buffered_incoming_bytes + h->wnd_size = std::max(m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size, std::int32_t(0)); h->ack_nr = m_ack_nr; @@ -2036,10 +1978,10 @@ bool utp_socket_impl::send_pkt(int const flags) } else if (ec) { - TORRENT_ASSERT(stack_alloced != bool(payload_size != 0)); m_error = ec; set_state(UTP_STATE_ERROR_WAIT); test_socket_state(); + release_packet(std::move(p)); return false; } @@ -2052,7 +1994,7 @@ bool utp_socket_impl::send_pkt(int const flags) { // if we're sending a payload packet, there should not // be a nagle packet waiting for more data - TORRENT_ASSERT(m_nagle_packet == nullptr); + TORRENT_ASSERT(!m_nagle_packet); #if !TORRENT_UT_SEQ // if the other end closed the connection immediately @@ -2069,18 +2011,18 @@ bool utp_socket_impl::send_pkt(int const flags) // release the buffer, we're saving it in the circular // buffer of outgoing packets - holder.release(); - packet* old = m_outbuf.insert(m_seq_nr, p); + int const new_in_flight = p->size - p->header_size; + packet_ptr old = m_outbuf.insert(m_seq_nr, std::move(p)); if (old) { // TORRENT_ASSERT(reinterpret_cast(old->buf)->seq_nr == m_seq_nr); if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size; - release_packet(old); + release_packet(std::move(old)); } TORRENT_ASSERT(h->seq_nr == m_seq_nr); m_seq_nr = (m_seq_nr + 1) & ACK_MASK; TORRENT_ASSERT(payload_size >= 0); - m_bytes_in_flight += p->size - p->header_size; + m_bytes_in_flight += new_in_flight; } else { @@ -2140,7 +2082,7 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend) // since we can't re-packetize, some packets that are // larger than the congestion window must be allowed through // but only if we don't have any outstanding bytes - int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - m_bytes_in_flight; + int window_size_left = std::min(int(m_cwnd >> 16), int(m_adv_wnd)) - m_bytes_in_flight; if (!fast_resend && p->size - p->header_size > window_size_left && m_bytes_in_flight > 0) @@ -2254,7 +2196,7 @@ void utp_socket_impl::experienced_loss(int const seq_nr) if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return; // cut window size in 2 - m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100 + m_cwnd = std::max(m_cwnd * m_sm->loss_multiplier() / 100 , std::int64_t(m_mtu) * (1 << 16)); m_loss_seq_nr = m_seq_nr; UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", static_cast(this), seq_nr); @@ -2312,7 +2254,7 @@ void utp_socket_impl::maybe_inc_acked_seq_nr() m_duplicate_acks = 0; } -void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time +void utp_socket_impl::ack_packet(packet_ptr p, time_point const& receive_time , std::uint32_t& min_rtt, std::uint16_t seq_nr) { #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS @@ -2335,7 +2277,7 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time { TORRENT_ASSERT(p->mtu_probe); // our mtu probe was acked! - m_mtu_floor = (std::max)(m_mtu_floor, p->size); + m_mtu_floor = std::max(m_mtu_floor, p->size); if (m_mtu_ceiling < m_mtu_floor) m_mtu_ceiling = m_mtu_floor; update_mtu_limits(); } @@ -2358,10 +2300,10 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time m_rtt.add_sample(rtt / 1000); if (rtt < min_rtt) min_rtt = rtt; - release_packet(p); + release_packet(std::move(p)); } -void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p +void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet_ptr p , time_point /* now */) { #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS @@ -2378,7 +2320,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p } iovec_t* target = &m_read_buffer.front(); - int to_copy = (std::min)(size, int(target->len)); + int const to_copy = std::min(size, int(target->len)); std::memcpy(target->buf, buf, to_copy); m_read += to_copy; target->buf = reinterpret_cast(target->buf) + to_copy; @@ -2397,8 +2339,8 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p if (size == 0) { - TORRENT_ASSERT(p == nullptr || p->header_size == p->size); - release_packet(p); + TORRENT_ASSERT(!p || p->header_size == p->size); + release_packet(std::move(p)); return; } } @@ -2414,8 +2356,8 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p std::memcpy(p->buf, buf, size); } // save this packet until the client issues another read - m_receive_buffer.push_back(p); m_receive_buffer_size += p->size - p->header_size; + m_receive_buffer.emplace_back(std::move(p)); UTP_LOGV("%8p: incoming: saving packet in receive buffer (%d)\n", static_cast(this), m_receive_buffer_size); @@ -2477,7 +2419,7 @@ bool utp_socket_impl::consume_incoming_data( if (ph->seq_nr == ((m_ack_nr + 1) & ACK_MASK)) { - TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == nullptr); + TORRENT_ASSERT(!m_inbuf.at(m_ack_nr)); if (m_buffered_incoming_bytes + m_receive_buffer_size + payload_size > m_in_buf_size) { @@ -2487,12 +2429,12 @@ bool utp_socket_impl::consume_incoming_data( } // we received a packet in order - incoming(ptr, payload_size, nullptr, now); + incoming(ptr, payload_size, packet_ptr(), now); m_ack_nr = (m_ack_nr + 1) & ACK_MASK; // If this packet was previously in the reorder buffer // it would have been acked when m_ack_nr-1 was acked. - TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == nullptr); + TORRENT_ASSERT(!m_inbuf.at(m_ack_nr)); UTP_LOGV("%8p: remove inbuf: %d (%d)\n" , static_cast(this), m_ack_nr, int(m_inbuf.size())); @@ -2501,12 +2443,12 @@ bool utp_socket_impl::consume_incoming_data( { int const next_ack_nr = (m_ack_nr + 1) & ACK_MASK; - packet* p = m_inbuf.remove(next_ack_nr); + packet_ptr p = m_inbuf.remove(next_ack_nr); if (!p) break; m_buffered_incoming_bytes -= p->size - p->header_size; - incoming(nullptr, p->size - p->header_size, p, now); + incoming(nullptr, p->size - p->header_size, std::move(p), now); m_ack_nr = std::uint16_t(next_ack_nr); @@ -2544,7 +2486,7 @@ bool utp_socket_impl::consume_incoming_data( } // we don't need to save the packet header, just the payload - packet* p = acquire_packet(payload_size); + packet_ptr p = acquire_packet(payload_size); p->size = std::uint16_t(payload_size); p->header_size = 0; p->num_transmissions = 0; @@ -2553,8 +2495,8 @@ bool utp_socket_impl::consume_incoming_data( #endif p->need_resend = false; std::memcpy(p->buf, ptr, payload_size); - m_inbuf.insert(ph->seq_nr, p); m_buffered_incoming_bytes += p->size; + m_inbuf.insert(ph->seq_nr, std::move(p)); UTP_LOGV("%8p: out of order. insert inbuf: %d (%d) m_ack_nr: %d\n" , static_cast(this), int(ph->seq_nr), int(m_inbuf.size()), m_ack_nr); @@ -2859,12 +2801,12 @@ bool utp_socket_impl::incoming_packet(span buf { if (m_fast_resend_seq_nr == ack_nr) m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK; - packet* p = m_outbuf.remove(ack_nr); + packet_ptr p = m_outbuf.remove(ack_nr); if (!p) continue; acked_bytes += p->size - p->header_size; - ack_packet(p, receive_time, min_rtt, std::uint16_t(ack_nr)); + ack_packet(std::move(p), receive_time, min_rtt, std::uint16_t(ack_nr)); } maybe_inc_acked_seq_nr(); @@ -3089,7 +3031,7 @@ bool utp_socket_impl::incoming_packet(span buf m_send_delay = delay; } - m_recv_delay = (std::min)(their_delay, min_rtt); + m_recv_delay = std::min(their_delay, min_rtt); consume_incoming_data(ph, ptr, payload_size, receive_time); @@ -3334,7 +3276,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay TORRENT_ASSERT(in_flight > 0); TORRENT_ASSERT(acked_bytes > 0); - const int target_delay = (std::max)(1, m_sm->target_delay()); + const int target_delay = std::max(1, m_sm->target_delay()); // true if the upper layer is pushing enough data down the socket to be // limited by the cwnd. If this is not the case, we should not adjust cwnd. @@ -3387,7 +3329,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay } else { - scaled_gain = (std::max)(exponential_gain, linear_gain); + scaled_gain = std::max(exponential_gain, linear_gain); } } else @@ -3424,7 +3366,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay TORRENT_ASSERT(m_cwnd >= 0); - int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - in_flight + acked_bytes; + int window_size_left = std::min(int(m_cwnd >> 16), int(m_adv_wnd)) - in_flight + acked_bytes; if (window_size_left >= m_mtu) { UTP_LOGV("%8p: mtu:%d in_flight:%d adv_wnd:%d cwnd:%d acked_bytes:%d cwnd_full -> 0\n" @@ -3461,7 +3403,7 @@ int utp_socket_impl::packet_timeout() const // avoid overflow by simply capping based on number of timeouts as well if (m_num_timeouts >= 7) return 60000; - int timeout = (std::max)(m_sm->min_timeout(), m_rtt.mean() + m_rtt.avg_deviation() * 2); + int timeout = std::max(m_sm->min_timeout(), m_rtt.mean() + m_rtt.avg_deviation() * 2); if (m_num_timeouts > 0) timeout += (1 << (int(m_num_timeouts) - 1)) * 1000; // timeouts over 1 minute are capped @@ -3657,16 +3599,11 @@ void utp_socket_impl::check_receive_buffers() const { INVARIANT_CHECK; - std::size_t size = 0; + int size = 0; + for (auto const& p : m_receive_buffer) + size += p ? p->size - p->header_size : 0; - for (std::vector::const_iterator i = m_receive_buffer.begin() - , end(m_receive_buffer.end()); i != end; ++i) - { - if (packet const* p = *i) - size += p->size - p->header_size; - } - - TORRENT_ASSERT(int(size) == m_receive_buffer_size); + TORRENT_ASSERT(size == m_receive_buffer_size); } #if TORRENT_USE_INVARIANT_CHECKS diff --git a/test/test_packet_buffer.cpp b/test/test_packet_buffer.cpp index c4bdd9849..25250dc13 100644 --- a/test/test_packet_buffer.cpp +++ b/test/test_packet_buffer.cpp @@ -32,73 +32,86 @@ POSSIBILITY OF SUCH DAMAGE. #include "test.hpp" #include "libtorrent/packet_buffer.hpp" +#include "libtorrent/packet_pool.hpp" using libtorrent::packet_buffer; +using libtorrent::packet_ptr; +using libtorrent::packet_pool; +using libtorrent::packet; + +packet_ptr make_pkt(packet_pool& pool, int const val) +{ + packet_ptr ret = pool.acquire(20); + *reinterpret_cast(ret->buf) = val; + return ret; +} + +int get_val(packet* pkt) +{ + TORRENT_ASSERT(pkt != nullptr); + return *reinterpret_cast(pkt->buf); +} // test packet_buffer TORRENT_TEST(insert) { - packet_buffer pb; - - int a123 = 123; - int a125 = 125; - int a500 = 500; - int a501 = 501; + packet_pool pool; + packet_buffer pb; TEST_EQUAL(pb.capacity(), 0); TEST_EQUAL(pb.size(), 0); TEST_EQUAL(pb.span(), 0); - pb.insert(123, &a123); - TEST_EQUAL(pb.at(123 + 16), 0); + pb.insert(123, make_pkt(pool, 123)); + TEST_CHECK(pb.at(123 + 16) == nullptr); - TEST_CHECK(pb.at(123) == &a123); + TEST_EQUAL(get_val(pb.at(123)), 123); TEST_CHECK(pb.capacity() > 0); TEST_EQUAL(pb.size(), 1); TEST_EQUAL(pb.span(), 1); TEST_EQUAL(pb.cursor(), 123); - pb.insert(125, &a125); + pb.insert(125, make_pkt(pool, 125)); - TEST_CHECK(pb.at(125) == &a125); + TEST_EQUAL(get_val(pb.at(125)), 125); TEST_EQUAL(pb.size(), 2); TEST_EQUAL(pb.span(), 3); TEST_EQUAL(pb.cursor(), 123); - pb.insert(500, &a500); + pb.insert(500, make_pkt(pool, 4)); TEST_EQUAL(pb.size(), 3); TEST_EQUAL(pb.span(), 501 - 123); TEST_EQUAL(pb.capacity(), 512); - pb.insert(500, &a501); + pb.insert(500, make_pkt(pool, 5)); TEST_EQUAL(pb.size(), 3); - pb.insert(500, &a500); + TEST_EQUAL(get_val(pb.insert(500, make_pkt(pool, 4)).get()), 5); TEST_EQUAL(pb.size(), 3); - TEST_CHECK(pb.remove(123) == &a123); + TEST_EQUAL(get_val(pb.remove(123).get()), 123); TEST_EQUAL(pb.size(), 2); TEST_EQUAL(pb.span(), 501 - 125); TEST_EQUAL(pb.cursor(), 125); - TEST_CHECK(pb.remove(125) == &a125); + TEST_EQUAL(get_val(pb.remove(125).get()), 125); TEST_EQUAL(pb.size(), 1); TEST_EQUAL(pb.span(), 1); TEST_EQUAL(pb.cursor(), 500); - TEST_CHECK(pb.remove(500) == &a500); + TEST_EQUAL(get_val(pb.remove(500).get()), 4); TEST_EQUAL(pb.size(), 0); TEST_EQUAL(pb.span(), 0); for (int i = 0; i < 0xff; ++i) { int index = (i + 0xfff0) & 0xffff; - pb.insert(index, reinterpret_cast(size_t(index) + 1)); + pb.insert(index, make_pkt(pool, index + 1)); std::printf("insert: %u (mask: %x)\n", index, int(pb.capacity() - 1)); TEST_EQUAL(pb.capacity(), 512); if (i >= 14) { index = (index - 14) & 0xffff; std::printf("remove: %u\n", index); - TEST_CHECK(pb.remove(index) == reinterpret_cast(size_t(index) + 1)); + TEST_EQUAL(get_val(pb.remove(index).get()), std::uint8_t(index + 1)); TEST_EQUAL(pb.size(), 14); } } @@ -107,60 +120,63 @@ TORRENT_TEST(insert) TORRENT_TEST(wrap) { // test wrapping the indices - packet_buffer pb; + packet_pool pool; + packet_buffer pb; TEST_EQUAL(pb.size(), 0); - pb.insert(0xfffe, (void*)1); - TEST_CHECK(pb.at(0xfffe) == (void*)1); + pb.insert(0xfffe, make_pkt(pool, 1)); + TEST_EQUAL(get_val(pb.at(0xfffe)), 1); - pb.insert(2, (void*)2); - TEST_CHECK(pb.at(2) == (void*)2); + pb.insert(2, make_pkt(pool, 2)); + TEST_EQUAL(get_val(pb.at(2)), 2); pb.remove(0xfffe); - TEST_CHECK(pb.at(0xfffe) == (void*)nullptr); - TEST_CHECK(pb.at(2) == (void*)2); + TEST_CHECK(pb.at(0xfffe) == nullptr); + TEST_EQUAL(get_val(pb.at(2)), 2); } TORRENT_TEST(wrap2) { // test wrapping the indices - packet_buffer pb; + packet_pool pool; + packet_buffer pb; TEST_EQUAL(pb.size(), 0); - pb.insert(0xfff3, (void*)1); - TEST_CHECK(pb.at(0xfff3) == (void*)1); + pb.insert(0xfff3, make_pkt(pool, 1)); + TEST_EQUAL(get_val(pb.at(0xfff3)), 1); int new_index = (0xfff3 + pb.capacity()) & 0xffff; - pb.insert(new_index, (void*)2); - TEST_CHECK(pb.at(new_index) == (void*)2); + pb.insert(new_index, make_pkt(pool, 2)); + TEST_EQUAL(get_val(pb.at(new_index)), 2); - void* old = pb.remove(0xfff3); - TEST_CHECK(old == (void*)1); - TEST_CHECK(pb.at(0xfff3) == (void*)nullptr); - TEST_CHECK(pb.at(new_index) == (void*)2); + packet_ptr old = pb.remove(0xfff3); + TEST_CHECK(get_val(old.get()) == 1); + TEST_CHECK(pb.at(0xfff3) == nullptr); + TEST_EQUAL(get_val(pb.at(new_index)), 2); } TORRENT_TEST(reverse_wrap) { // test wrapping the indices backwards - packet_buffer pb; + packet_pool pool; + packet_buffer pb; TEST_EQUAL(pb.size(), 0); - pb.insert(0xfff3, (void*)1); - TEST_CHECK(pb.at(0xfff3) == (void*)1); + pb.insert(0xfff3, make_pkt(pool, 1)); + TEST_EQUAL(get_val(pb.at(0xfff3)), 1); int new_index = (0xfff3 + pb.capacity()) & 0xffff; - pb.insert(new_index, (void*)2); - TEST_CHECK(pb.at(new_index) == (void*)2); + pb.insert(new_index, make_pkt(pool, 2)); + TEST_EQUAL(get_val(pb.at(new_index)), 2); - void* old = pb.remove(0xfff3); - TEST_CHECK(old == (void*)1); - TEST_CHECK(pb.at(0xfff3) == (void*)nullptr); - TEST_CHECK(pb.at(new_index) == (void*)2); + packet_ptr old = pb.remove(0xfff3); + TEST_CHECK(get_val(old.get()) == 1); + TEST_CHECK(pb.at(0xfff3) == nullptr); + TEST_EQUAL(get_val(pb.at(new_index)), 2); - pb.insert(0xffff, (void*)0xffff); + pb.insert(0xffff, make_pkt(pool, 3)); }