packet allocation pool (#1600)

packet allocation pool:
- no more direct malloc/free calls with casts
- reuse allocated memory for the three common allocation cases (with a per-slab limit)
- access the packet_pool through the utp_socket_manager instance
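
As a rough illustration of the new call pattern, here is a minimal sketch (not part of the commit): the function name queue_payload is hypothetical, while acquire_packet()/release_packet() and the packet fields are the ones introduced in the diffs below.

    #include <cstdint>

    #include "libtorrent/utp_socket_manager.hpp"
    #include "libtorrent/packet_pool.hpp"

    // hypothetical caller; in the commit the real callers are the
    // utp_socket_impl send/receive paths, which forward to the manager
    void queue_payload(libtorrent::utp_socket_manager& sm, int payload_size)
    {
        using namespace libtorrent;

        // ask the manager's pool instead of calling std::malloc directly;
        // acquire_packet() records the allocation size in p->allocated
        packet* p = sm.acquire_packet(payload_size);
        p->size = std::uint16_t(payload_size);
        p->header_size = 0;
        // ... fill p->buf and queue the packet ...

        // hand the buffer back to the pool instead of calling free()
        sm.release_packet(p);
    }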
Authored by Andrei Kurushin 2017-02-10 16:25:03 +03:00, committed by Arvid Norberg
parent c33081b075
commit 76f91c3fc9
5 changed files with 228 additions and 68 deletions


@@ -1,3 +1,4 @@
* add packet pool allocator
* fix last_upload and last_download overflow after 9 hours in past
* python binding add more add_torrent_params fields and an invalid key check
* introduce distinct types for peer_class_t, piece_index_t and


@@ -80,6 +80,7 @@ nobase_include_HEADERS = \
netlink.hpp \
operations.hpp \
packet_buffer.hpp \
packet_pool.hpp \
parse_url.hpp \
part_file.hpp \
pe_crypto.hpp \


@@ -0,0 +1,187 @@
/*
Copyright (c) 2005-2017, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_PACKET_POOL_HPP
#define TORRENT_PACKET_POOL_HPP
#include "libtorrent/config.hpp"
#include "libtorrent/aux_/vector.hpp"
namespace libtorrent
{
// used for out-of-order incoming packets
// as well as sent packets that are waiting to be ACKed
struct packet
{
// the last time this packet was sent
time_point send_time;
// the number of bytes actually allocated in 'buf'
std::uint16_t allocated;
// the size of the buffer 'buf' points to
std::uint16_t size;
// this is the offset to the payload inside the buffer
// this is also used as a cursor to describe where the
// next payload that hasn't been consumed yet starts
std::uint16_t header_size;
// the number of times this packet has been sent
std::uint8_t num_transmissions:6;
// true if we need to send this packet again. All
// outstanding packets are marked as needing to be
// resent on timeouts
bool need_resend:1;
// this is set to true for packets that were
// sent with the DF bit set (Don't Fragment)
bool mtu_probe:1;
#if TORRENT_USE_ASSERTS
int num_fast_resend;
#endif
// the actual packet buffer
std::uint8_t buf[1];
};
struct packet_deleter
{
// deleter for std::unique_ptr
void operator()(packet* p) const
{
p->~packet();
std::free(p);
}
};
inline packet* create_packet(int size)
{
packet* p = static_cast<packet*>(std::malloc(sizeof(packet) + size));
new (p) packet();
return p;
}
using packet_ptr = std::unique_ptr<packet, packet_deleter>;
template<int ALLOCATE_SIZE>
struct TORRENT_EXTRA_EXPORT packet_slab
{
static const int allocate_size{ ALLOCATE_SIZE };
explicit packet_slab(std::size_t limit) : m_limit(limit) { m_storage.reserve(m_limit); }
packet_slab(const packet_slab&) = delete;
void try_push_back(packet_ptr &p)
{
if (m_storage.size() < m_limit)
m_storage.push_back(std::move(p));
}
packet_ptr alloc()
{
if (m_storage.empty())
return packet_ptr(create_packet(allocate_size));
else
{
auto ret = std::move(m_storage.back());
m_storage.pop_back();
return ret;
}
}
void flush() { m_storage.clear(); }
private:
const std::size_t m_limit;
aux::vector<packet_ptr, std::size_t> m_storage;
};
// single-threaded packet allocation pool
// handles the common packet sizes with three fixed-size slabs
struct TORRENT_EXTRA_EXPORT packet_pool : private single_threaded
{
packet* acquire(int allocate)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(allocate >= 0);
TORRENT_ASSERT(allocate <= std::numeric_limits<std::uint16_t>::max());
packet_ptr p{ alloc(allocate) };
p->allocated = static_cast<std::uint16_t>(allocate);
return p.release();
}
void release(packet* p)
{
TORRENT_ASSERT(is_single_thread());
if (p == nullptr) return;
packet_ptr pp(p); // take ownership; the packet is freed automatically unless a slab keeps it
std::size_t allocated = pp->allocated;
if (allocated <= m_syn_slab.allocate_size) { m_syn_slab.try_push_back(pp); }
else if (allocated <= m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(pp); }
else if (allocated <= m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(pp); }
}
//TODO: call from utp manager to flush mem
void flush()
{
TORRENT_ASSERT(is_single_thread());
m_syn_slab.flush();
m_mtu_floor_slab.flush();
m_mtu_ceiling_slab.flush();
}
private:
packet_ptr alloc(int allocate)
{
if (allocate <= m_syn_slab.allocate_size) { return m_syn_slab.alloc(); }
else if (allocate <= m_mtu_floor_slab.allocate_size) { return m_mtu_floor_slab.alloc(); }
else if (allocate <= m_mtu_ceiling_slab.allocate_size) { return m_mtu_ceiling_slab.alloc(); }
return packet_ptr(create_packet(allocate));
}
packet_slab<sizeof(utp_header)> m_syn_slab{ 50 };
packet_slab<(TORRENT_INET_MIN_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)> m_mtu_floor_slab{ 100 };
packet_slab<(TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)> m_mtu_ceiling_slab{ 50 };
};
}
#endif // TORRENT_PACKET_POOL_HPP
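
To make the slab behaviour above concrete, here is a minimal usage sketch of the pool on its own (illustration only, not part of the commit; pool_example is a hypothetical function, and it assumes utp_header and the MTU constants are visible through the header's includes, as they are inside libtorrent):

    #include "libtorrent/packet_pool.hpp"

    void pool_example()
    {
        using namespace libtorrent;
        packet_pool pool;

        // a tiny request is served from the syn slab (sizeof(utp_header) bucket)
        packet* syn = pool.acquire(int(sizeof(utp_header)));

        // a larger request falls through to the MTU-floor or MTU-ceiling slab,
        // or to a plain allocation if it exceeds the largest bucket
        packet* payload = pool.acquire(1200);

        // release() caches the buffers (up to each slab's limit) so the next
        // acquire() of a similar size reuses them instead of calling malloc
        pool.release(syn);
        pool.release(payload);

        // drop all cached buffers, e.g. when memory is tight
        pool.flush();
    }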


@@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/enum_net.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/span.hpp"
#include "libtorrent/packet_pool.hpp"
namespace libtorrent
{
@@ -119,6 +120,9 @@ namespace libtorrent
// the counter is the enum from ``counters``.
void inc_stats_counter(int counter, int delta = 1);
packet *acquire_packet(int allocate) { return m_packet_pool.acquire(allocate); }
void release_packet(packet *p) { m_packet_pool.release(p); }
private:
// explicitly disallow assignment, to silence msvc warning
utp_socket_manager& operator=(utp_socket_manager const&);
@@ -188,6 +192,8 @@ namespace libtorrent
// this is passed on to the instantiate connection
// if this is non-nullptr it will create SSL connections over uTP
void* m_ssl_context;
packet_pool m_packet_pool;
};
}


@@ -172,44 +172,6 @@ TORRENT_EXTRA_EXPORT bool compare_less_wrap(std::uint32_t lhs
return dist_up < dist_down;
}
// used for out-of-order incoming packets
// as well as sent packets that are waiting to be ACKed
struct packet
{
// the last time this packet was sent
time_point send_time;
// the number of bytes actually allocated in 'buf'
std::uint16_t allocated;
// the size of the buffer 'buf' points to
std::uint16_t size;
// this is the offset to the payload inside the buffer
// this is also used as a cursor to describe where the
// next payload that hasn't been consumed yet starts
std::uint16_t header_size;
// the number of times this packet has been sent
std::uint8_t num_transmissions:6;
// true if we need to send this packet again. All
// outstanding packets are marked as needing to be
// resent on timeouts
bool need_resend:1;
// this is set to true for packets that were
// sent with the DF bit set (Don't Fragment)
bool mtu_probe:1;
#if TORRENT_USE_ASSERTS
int num_fast_resend;
#endif
// the actual packet buffer
std::uint8_t buf[1];
};
// since the uTP socket state may be needed after the
// utp_stream is closed, it's kept in a separate struct
// whose lifetime is not tied to the lifetime of utp_stream
@@ -341,6 +303,9 @@ struct utp_socket_impl
void set_state(int s);
packet *acquire_packet(int allocate) { return m_sm->acquire_packet(allocate); }
void release_packet(packet *p) { m_sm->release_packet(p); }
private:
// non-copyable
@@ -1074,7 +1039,7 @@ size_t utp_stream::read_some(bool clear_buffers)
// Consumed entire packet
if (p->header_size == p->size)
{
free(p);
m_impl->release_packet(p);
++pop_packets;
*i = nullptr;
++i;
@@ -1167,23 +1132,23 @@ utp_socket_impl::~utp_socket_impl()
i != end; i = (i + 1) & ACK_MASK)
{
packet* p = m_inbuf.remove(i);
free(p);
release_packet(p);
}
for (std::uint16_t i = std::uint16_t(m_outbuf.cursor()), end((m_outbuf.cursor()
+ m_outbuf.capacity()) & ACK_MASK);
i != end; i = (i + 1) & ACK_MASK)
{
packet* p = m_outbuf.remove(i);
free(p);
release_packet(p);
}
for (std::vector<packet*>::iterator i = m_receive_buffer.begin()
, end = m_receive_buffer.end(); i != end; ++i)
{
free(*i);
release_packet(*i);
}
free(m_nagle_packet);
release_packet(m_nagle_packet);
m_nagle_packet = nullptr;
}
@@ -1313,7 +1278,7 @@ void utp_socket_impl::send_syn()
m_ack_nr = 0;
m_fast_resend_seq_nr = m_seq_nr;
packet* p = static_cast<packet*>(malloc(sizeof(packet) + sizeof(utp_header)));
packet* p = acquire_packet(sizeof(utp_header));
p->size = sizeof(utp_header);
p->header_size = sizeof(utp_header);
p->num_transmissions = 0;
@@ -1363,7 +1328,7 @@ void utp_socket_impl::send_syn()
}
else if (ec)
{
free(p);
release_packet(p);
m_error = ec;
set_state(UTP_STATE_ERROR_WAIT);
test_socket_state();
@@ -1672,31 +1637,32 @@ void utp_socket_impl::remove_sack_header(packet* p)
p->size -= std::uint16_t(sack_size + 2);
}
struct holder
struct packet_holder
{
explicit holder(char* buf = nullptr): m_buf(buf) {}
~holder() { free(m_buf); }
explicit packet_holder(utp_socket_manager* sm, packet* p = nullptr) : m_sm(sm), m_packet(p) {}
~packet_holder() { m_sm->release_packet(m_packet); }
void reset(char* buf)
void reset(packet* p)
{
free(m_buf);
m_buf = buf;
m_sm->release_packet(m_packet);
m_packet = p;
}
char* release()
packet* release()
{
char* ret = m_buf;
m_buf = nullptr;
packet* ret = m_packet;
m_packet = nullptr;
return ret;
}
private:
// not copyable
holder(holder const&);
holder& operator=(holder const&);
packet_holder(packet_holder const&);
packet_holder& operator=(packet_holder const&);
char* m_buf;
utp_socket_manager* m_sm;
packet* m_packet;
};
// sends a packet, pulls data from the write buffer (if there's any)
@@ -1838,7 +1804,7 @@ bool utp_socket_impl::send_pkt(int const flags)
// used to free the packet buffer in case we exit the
// function early
holder buf_holder;
packet_holder holder(m_sm);
// payload size being zero means we're just sending
// a force. We should not pick up the nagle packet
@@ -1848,9 +1814,8 @@ bool utp_socket_impl::send_pkt(int const flags)
// need to keep the packet around (in the outbuf)
if (payload_size)
{
p = static_cast<packet*>(std::malloc(sizeof(packet) + effective_mtu));
p->allocated = std::uint16_t(effective_mtu);
buf_holder.reset(reinterpret_cast<char*>(p));
p = acquire_packet(effective_mtu);
holder.reset(p);
m_sm->inc_stats_counter(counters::utp_payload_pkts_out);
}
@@ -1980,7 +1945,7 @@ bool utp_socket_impl::send_pkt(int const flags)
TORRENT_ASSERT(m_nagle_packet == nullptr);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_nagle_packet = p;
buf_holder.release();
holder.release();
return false;
}
@@ -2104,13 +2069,13 @@ bool utp_socket_impl::send_pkt(int const flags)
// release the buffer, we're saving it in the circular
// buffer of outgoing packets
buf_holder.release();
holder.release();
packet* old = m_outbuf.insert(m_seq_nr, p);
if (old)
{
// TORRENT_ASSERT(reinterpret_cast<utp_header*>(old->buf)->seq_nr == m_seq_nr);
if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size;
free(old);
release_packet(old);
}
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
@@ -2393,7 +2358,7 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
m_rtt.add_sample(rtt / 1000);
if (rtt < min_rtt) min_rtt = rtt;
free(p);
release_packet(p);
}
void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
@@ -2433,7 +2398,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
if (size == 0)
{
TORRENT_ASSERT(p == nullptr || p->header_size == p->size);
free(p);
release_packet(p);
return;
}
}
@@ -2443,7 +2408,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
if (!p)
{
TORRENT_ASSERT(buf);
p = static_cast<packet*>(std::malloc(sizeof(packet) + size));
p = acquire_packet(size);
p->size = std::uint16_t(size);
p->header_size = 0;
std::memcpy(p->buf, buf, size);
@@ -2579,7 +2544,7 @@ bool utp_socket_impl::consume_incoming_data(
}
// we don't need to save the packet header, just the payload
packet* p = static_cast<packet*>(malloc(sizeof(packet) + payload_size));
packet* p = acquire_packet(payload_size);
p->size = std::uint16_t(payload_size);
p->header_size = 0;
p->num_transmissions = 0;