diff --git a/ChangeLog b/ChangeLog index 4eb998f83..4d6e01233 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * add packets pool allocator * fix last_upload and last_download overflow after 9 hours in past * python binding add more add_torrent_params fields and an invalid key check * introduce introduce distinct types for peer_class_t, piece_index_t and diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index bff16eda3..11d5de8d3 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -80,6 +80,7 @@ nobase_include_HEADERS = \ netlink.hpp \ operations.hpp \ packet_buffer.hpp \ + packet_pool.hpp \ parse_url.hpp \ part_file.hpp \ pe_crypto.hpp \ diff --git a/include/libtorrent/packet_pool.hpp b/include/libtorrent/packet_pool.hpp new file mode 100644 index 000000000..bd8c57b22 --- /dev/null +++ b/include/libtorrent/packet_pool.hpp @@ -0,0 +1,187 @@ +/* + +Copyright (c) 2005-2017, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_PACKET_POOL_HPP +#define TORRENT_PACKET_POOL_HPP + +#include "libtorrent/config.hpp" + +#include "libtorrent/aux_/vector.hpp" + +namespace libtorrent +{ + + // used for out-of-order incoming packets + // as well as sent packets that are waiting to be ACKed + struct packet + { + // the last time this packet was sent + time_point send_time; + + // the number of bytes actually allocated in 'buf' + std::uint16_t allocated; + + // the size of the buffer 'buf' points to + std::uint16_t size; + + // this is the offset to the payload inside the buffer + // this is also used as a cursor to describe where the + // next payload that hasn't been consumed yet starts + std::uint16_t header_size; + + // the number of times this packet has been sent + std::uint8_t num_transmissions:6; + + // true if we need to send this packet again. All + // outstanding packets are marked as needing to be + // resent on timeouts + bool need_resend:1; + + // this is set to true for packets that were + // sent with the DF bit set (Don't Fragment) + bool mtu_probe:1; + + #if TORRENT_USE_ASSERTS + int num_fast_resend; + #endif + + // the actual packet buffer + std::uint8_t buf[1]; + }; + + + struct packet_deleter + { + // deleter for std::unique_ptr + void operator()(packet* p) const + { + p->~packet(); + std::free(p); + } + }; + + inline packet* create_packet(int size) + { + packet* p = static_cast(std::malloc(sizeof(packet) + size)); + new (p) packet(); + return p; + } + + using packet_ptr = std::unique_ptr; + + template + struct TORRENT_EXTRA_EXPORT packet_slab + { + static const int allocate_size{ ALLOCATE_SIZE }; + + explicit packet_slab(std::size_t limit) : m_limit(limit) { m_storage.reserve(m_limit); } + + packet_slab(const packet_slab&) = delete; + + void try_push_back(packet_ptr &p) + { + if (m_storage.size() < m_limit) + m_storage.push_back(std::move(p)); + } + + packet_ptr alloc() + { + if (m_storage.empty()) + return packet_ptr(create_packet(allocate_size)); + else + { + auto ret = std::move(m_storage.back()); + m_storage.pop_back(); + return ret; + } + } + + void flush() { m_storage.clear(); } + private: + const std::size_t m_limit; + aux::vector m_storage; + }; + + // single thread packet allocation packet pool + // can handle common cases of packet size by 3 pools + struct TORRENT_EXTRA_EXPORT packet_pool : private single_threaded + { + packet* acquire(int allocate) + { + TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(allocate >= 0); + TORRENT_ASSERT(allocate <= std::numeric_limits::max()); + + packet_ptr p{ alloc(allocate) }; + p->allocated = static_cast(allocate); + return p.release(); + } + + void release(packet* p) + { + TORRENT_ASSERT(is_single_thread()); + + if (p == nullptr) return; + + packet_ptr pp(p); // takes ownership and may auto free if slab container does not move it + std::size_t allocated = pp->allocated; + + if (allocated <= m_syn_slab.allocate_size) { m_syn_slab.try_push_back(pp); } + else if (allocated <= m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(pp); } + else if (allocated <= m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(pp); } + } + + //TODO: call from utp manager to flush mem + void flush() + { + TORRENT_ASSERT(is_single_thread()); + + m_syn_slab.flush(); + m_mtu_floor_slab.flush(); + m_mtu_ceiling_slab.flush(); + } + + private: + packet_ptr alloc(int allocate) + { + if (allocate <= m_syn_slab.allocate_size) { return m_syn_slab.alloc(); } + else if (allocate <= m_mtu_floor_slab.allocate_size) { return m_mtu_floor_slab.alloc(); } + else if (allocate <= m_mtu_ceiling_slab.allocate_size) { return m_mtu_ceiling_slab.alloc(); } + return packet_ptr(create_packet(allocate)); + } + packet_slab m_syn_slab{ 50 }; + packet_slab<(TORRENT_INET_MIN_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)> m_mtu_floor_slab{ 100 }; + packet_slab<(TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)> m_mtu_ceiling_slab{ 50 }; + }; +} + +#endif // TORRENT_PACKET_POOL_HPP diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index a7f70faec..9d6dc6a5d 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/enum_net.hpp" #include "libtorrent/aux_/session_settings.hpp" #include "libtorrent/span.hpp" +#include "libtorrent/packet_pool.hpp" namespace libtorrent { @@ -119,6 +120,9 @@ namespace libtorrent // the counter is the enum from ``counters``. void inc_stats_counter(int counter, int delta = 1); + packet *acquire_packet(int allocate) { return m_packet_pool.acquire(allocate); } + void release_packet(packet *p) { m_packet_pool.release(p); } + private: // explicitly disallow assignment, to silence msvc warning utp_socket_manager& operator=(utp_socket_manager const&); @@ -188,6 +192,8 @@ namespace libtorrent // this is passed on to the instantiate connection // if this is non-nullptr it will create SSL connections over uTP void* m_ssl_context; + + packet_pool m_packet_pool; }; } diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index de0a1d92b..0bd46e396 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -172,44 +172,6 @@ TORRENT_EXTRA_EXPORT bool compare_less_wrap(std::uint32_t lhs return dist_up < dist_down; } -// used for out-of-order incoming packets -// as well as sent packets that are waiting to be ACKed -struct packet -{ - // the last time this packet was sent - time_point send_time; - - // the number of bytes actually allocated in 'buf' - std::uint16_t allocated; - - // the size of the buffer 'buf' points to - std::uint16_t size; - - // this is the offset to the payload inside the buffer - // this is also used as a cursor to describe where the - // next payload that hasn't been consumed yet starts - std::uint16_t header_size; - - // the number of times this packet has been sent - std::uint8_t num_transmissions:6; - - // true if we need to send this packet again. All - // outstanding packets are marked as needing to be - // resent on timeouts - bool need_resend:1; - - // this is set to true for packets that were - // sent with the DF bit set (Don't Fragment) - bool mtu_probe:1; - -#if TORRENT_USE_ASSERTS - int num_fast_resend; -#endif - - // the actual packet buffer - std::uint8_t buf[1]; -}; - // since the uTP socket state may be needed after the // utp_stream is closed, it's kept in a separate struct // whose lifetime is not tied to the lifetime of utp_stream @@ -341,6 +303,9 @@ struct utp_socket_impl void set_state(int s); + packet *acquire_packet(int allocate) { return m_sm->acquire_packet(allocate); } + void release_packet(packet *p) { m_sm->release_packet(p); } + private: // non-copyable @@ -1074,7 +1039,7 @@ size_t utp_stream::read_some(bool clear_buffers) // Consumed entire packet if (p->header_size == p->size) { - free(p); + m_impl->release_packet(p); ++pop_packets; *i = nullptr; ++i; @@ -1167,23 +1132,23 @@ utp_socket_impl::~utp_socket_impl() i != end; i = (i + 1) & ACK_MASK) { packet* p = m_inbuf.remove(i); - free(p); + release_packet(p); } for (std::uint16_t i = std::uint16_t(m_outbuf.cursor()), end((m_outbuf.cursor() + m_outbuf.capacity()) & ACK_MASK); i != end; i = (i + 1) & ACK_MASK) { packet* p = m_outbuf.remove(i); - free(p); + release_packet(p); } for (std::vector::iterator i = m_receive_buffer.begin() , end = m_receive_buffer.end(); i != end; ++i) { - free(*i); + release_packet(*i); } - free(m_nagle_packet); + release_packet(m_nagle_packet); m_nagle_packet = nullptr; } @@ -1313,7 +1278,7 @@ void utp_socket_impl::send_syn() m_ack_nr = 0; m_fast_resend_seq_nr = m_seq_nr; - packet* p = static_cast(malloc(sizeof(packet) + sizeof(utp_header))); + packet* p = acquire_packet(sizeof(utp_header)); p->size = sizeof(utp_header); p->header_size = sizeof(utp_header); p->num_transmissions = 0; @@ -1363,7 +1328,7 @@ void utp_socket_impl::send_syn() } else if (ec) { - free(p); + release_packet(p); m_error = ec; set_state(UTP_STATE_ERROR_WAIT); test_socket_state(); @@ -1672,31 +1637,32 @@ void utp_socket_impl::remove_sack_header(packet* p) p->size -= std::uint16_t(sack_size + 2); } -struct holder +struct packet_holder { - explicit holder(char* buf = nullptr): m_buf(buf) {} - ~holder() { free(m_buf); } + explicit packet_holder(utp_socket_manager* sm, packet* p = nullptr) : m_sm(sm), m_packet(p) {} + ~packet_holder() { m_sm->release_packet(m_packet); } - void reset(char* buf) + void reset(packet* p) { - free(m_buf); - m_buf = buf; + m_sm->release_packet(m_packet); + m_packet = p; } - char* release() + packet* release() { - char* ret = m_buf; - m_buf = nullptr; + packet* ret = m_packet; + m_packet = nullptr; return ret; } private: // not copyable - holder(holder const&); - holder& operator=(holder const&); + packet_holder(packet_holder const&); + packet_holder& operator=(packet_holder const&); - char* m_buf; + utp_socket_manager* m_sm; + packet* m_packet; }; // sends a packet, pulls data from the write buffer (if there's any) @@ -1838,7 +1804,7 @@ bool utp_socket_impl::send_pkt(int const flags) // used to free the packet buffer in case we exit the // function early - holder buf_holder; + packet_holder holder(m_sm); // payload size being zero means we're just sending // an force. We should not pick up the nagle packet @@ -1848,9 +1814,8 @@ bool utp_socket_impl::send_pkt(int const flags) // need to keep the packet around (in the outbuf) if (payload_size) { - p = static_cast(std::malloc(sizeof(packet) + effective_mtu)); - p->allocated = std::uint16_t(effective_mtu); - buf_holder.reset(reinterpret_cast(p)); + p = acquire_packet(effective_mtu); + holder.reset(p); m_sm->inc_stats_counter(counters::utp_payload_pkts_out); } @@ -1980,7 +1945,7 @@ bool utp_socket_impl::send_pkt(int const flags) TORRENT_ASSERT(m_nagle_packet == nullptr); TORRENT_ASSERT(h->seq_nr == m_seq_nr); m_nagle_packet = p; - buf_holder.release(); + holder.release(); return false; } @@ -2104,13 +2069,13 @@ bool utp_socket_impl::send_pkt(int const flags) // release the buffer, we're saving it in the circular // buffer of outgoing packets - buf_holder.release(); + holder.release(); packet* old = m_outbuf.insert(m_seq_nr, p); if (old) { // TORRENT_ASSERT(reinterpret_cast(old->buf)->seq_nr == m_seq_nr); if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size; - free(old); + release_packet(old); } TORRENT_ASSERT(h->seq_nr == m_seq_nr); m_seq_nr = (m_seq_nr + 1) & ACK_MASK; @@ -2393,7 +2358,7 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time m_rtt.add_sample(rtt / 1000); if (rtt < min_rtt) min_rtt = rtt; - free(p); + release_packet(p); } void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p @@ -2433,7 +2398,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p if (size == 0) { TORRENT_ASSERT(p == nullptr || p->header_size == p->size); - free(p); + release_packet(p); return; } } @@ -2443,7 +2408,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p if (!p) { TORRENT_ASSERT(buf); - p = static_cast(std::malloc(sizeof(packet) + size)); + p = acquire_packet(size); p->size = std::uint16_t(size); p->header_size = 0; std::memcpy(p->buf, buf, size); @@ -2579,7 +2544,7 @@ bool utp_socket_impl::consume_incoming_data( } // we don't need to save the packet header, just the payload - packet* p = static_cast(malloc(sizeof(packet) + payload_size)); + packet* p = acquire_packet(payload_size); p->size = std::uint16_t(payload_size); p->header_size = 0; p->num_transmissions = 0;