use unique_ptr for packets in utp_stream

arvidn 2017-02-11 14:21:48 -05:00 committed by Arvid Norberg
parent 4a80e2667c
commit ee2688b83a
7 changed files with 199 additions and 275 deletions

View File

@ -35,11 +35,15 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/config.hpp"
#include "libtorrent/aux_/unique_ptr.hpp"
#include "libtorrent/packet_pool.hpp" // for packet_ptr/packet_deleter
#include <cstdint>
#include <cstddef>
#include <memory> // for unique_ptr
namespace libtorrent
{
struct packet;
// this is a circular buffer that automatically resizes
// itself as elements are inserted. Elements are indexed
// by integers and are assumed to be sequential. Unless the
@ -67,12 +71,12 @@ namespace libtorrent
// whenever the element at the cursor is removed, the
// cursor is bumped to the next occupied element
class TORRENT_EXTRA_EXPORT packet_buffer_impl
class TORRENT_EXTRA_EXPORT packet_buffer
{
public:
typedef std::uint32_t index_type;
using index_type = std::uint32_t;
void* insert(index_type idx, void* value);
packet_ptr insert(index_type idx, packet_ptr value);
int size() const
{ return m_size; }
@ -80,9 +84,9 @@ namespace libtorrent
int capacity() const
{ return m_capacity; }
void* at(index_type idx) const;
packet* at(index_type idx) const;
void* remove(index_type idx);
packet_ptr remove(index_type idx);
void reserve(int size);
@ -95,7 +99,7 @@ namespace libtorrent
#endif
private:
aux::unique_ptr<void*[], index_type> m_storage;
aux::unique_ptr<packet_ptr[], index_type> m_storage;
int m_capacity = 0;
// this is the total number of elements that are occupied
@ -108,30 +112,6 @@ namespace libtorrent
index_type m_last{0};
};
template <typename T>
class packet_buffer : packet_buffer_impl
{
public:
using packet_buffer_impl::index_type;
using packet_buffer_impl::size;
using packet_buffer_impl::capacity;
using packet_buffer_impl::reserve;
using packet_buffer_impl::cursor;
using packet_buffer_impl::span;
T* insert(index_type i, T* p)
{
return static_cast<T*>(packet_buffer_impl::insert(i, p));
}
T* at(index_type idx) const
{ return static_cast<T*>(packet_buffer_impl::at(idx)); }
T* remove(index_type idx)
{ return static_cast<T*>(packet_buffer_impl::remove(idx)); }
};
}
#endif // TORRENT_PACKET_BUFFER_HPP_INCLUDED
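To make the new interface concrete, here is a minimal usage sketch (not part of the commit): with the template layer gone, packet_buffer stores owning packet_ptr values directly, insert() and remove() transfer ownership, and at() only observes. The function name sketch() is illustrative; the sketch assumes the two headers above are on the include path.

#include "libtorrent/packet_buffer.hpp"
#include "libtorrent/packet_pool.hpp"

using namespace libtorrent;

void sketch()
{
	packet_pool pool;
	packet_buffer buf;

	// insert() takes ownership; whatever was stored at index 10 before
	// comes back as an owning packet_ptr and is freed at end of scope
	packet_ptr evicted = buf.insert(10, pool.acquire(100));

	// at() is non-owning: the buffer still holds the packet
	packet const* p = buf.at(10);
	(void)p;

	// remove() hands ownership back, and release() returns it to the pool
	pool.release(buf.remove(10));
}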

View File

@ -38,10 +38,23 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/aux_/throw.hpp"
#include "libtorrent/aux_/vector.hpp"
#include "libtorrent/aux_/numeric_cast.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/debug.hpp" // for single_threaded
namespace libtorrent
{
// internal: some MTU and protocol header size constants
constexpr int TORRENT_IPV4_HEADER = 20;
constexpr int TORRENT_IPV6_HEADER = 40;
constexpr int TORRENT_UDP_HEADER = 8;
constexpr int TORRENT_UTP_HEADER = 20;
constexpr int TORRENT_SOCKS5_HEADER = 6; // plus the size of the destination address
constexpr int TORRENT_ETHERNET_MTU = 1500;
constexpr int TORRENT_TEREDO_MTU = 1280;
constexpr int TORRENT_INET_MIN_MTU = 576;
constexpr int TORRENT_INET_MAX_MTU = 0xffff;
// used for out-of-order incoming packets
// as well as sent packets that are waiting to be ACKed
@ -81,7 +94,6 @@ namespace libtorrent
std::uint8_t buf[1];
};
struct packet_deleter
{
// deleter for std::unique_ptr
@ -93,22 +105,22 @@ namespace libtorrent
}
};
inline packet* create_packet(int const size)
using packet_ptr = std::unique_ptr<packet, packet_deleter>;
inline packet_ptr create_packet(int const size)
{
packet* p = static_cast<packet*>(std::malloc(sizeof(packet) + size));
if (p == nullptr) aux::throw_ex<std::bad_alloc>();
new (p) packet();
p->allocated = aux::numeric_cast<std::uint16_t>(size);
return p;
return packet_ptr(p);
}
using packet_ptr = std::unique_ptr<packet, packet_deleter>;
struct TORRENT_EXTRA_EXPORT packet_slab
{
int const allocate_size;
explicit packet_slab(std::size_t const limit, int const alloc_size)
explicit packet_slab(int const alloc_size, std::size_t const limit = 10)
: allocate_size(alloc_size)
, m_limit(limit)
{
@ -125,7 +137,7 @@ namespace libtorrent
packet_ptr alloc()
{
if (m_storage.empty()) return packet_ptr(create_packet(allocate_size));
if (m_storage.empty()) return create_packet(allocate_size);
auto ret = std::move(m_storage.back());
m_storage.pop_back();
return ret;
@ -139,35 +151,33 @@ namespace libtorrent
private:
const std::size_t m_limit;
aux::vector<packet_ptr, std::size_t> m_storage;
std::vector<packet_ptr> m_storage;
};
// single-threaded packet allocation pool
// handles the common packet sizes with three internal slabs
struct TORRENT_EXTRA_EXPORT packet_pool : private single_threaded
{
packet* acquire(int allocate)
packet_ptr acquire(int const allocate)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(allocate >= 0);
TORRENT_ASSERT(allocate <= std::numeric_limits<std::uint16_t>::max());
packet_ptr p{ alloc(allocate) };
return p.release();
return alloc(allocate);
}
void release(packet* p)
void release(packet_ptr p)
{
TORRENT_ASSERT(is_single_thread());
if (p == nullptr) return;
if (p.get() == nullptr) return;
packet_ptr pp(p); // takes ownership and may auto free if slab container does not move it
int const allocated = pp->allocated;
int const allocated = p->allocated;
if (allocated == m_syn_slab.allocate_size) { m_syn_slab.try_push_back(pp); }
else if (allocated == m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(pp); }
else if (allocated == m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(pp); }
if (allocated == m_syn_slab.allocate_size) { m_syn_slab.try_push_back(p); }
else if (allocated == m_mtu_floor_slab.allocate_size) { m_mtu_floor_slab.try_push_back(p); }
else if (allocated == m_mtu_ceiling_slab.allocate_size) { m_mtu_ceiling_slab.try_push_back(p); }
}
// periodically free up some of the cached packets
@ -186,13 +196,13 @@ namespace libtorrent
if (allocate <= m_syn_slab.allocate_size) { return m_syn_slab.alloc(); }
else if (allocate <= m_mtu_floor_slab.allocate_size) { return m_mtu_floor_slab.alloc(); }
else if (allocate <= m_mtu_ceiling_slab.allocate_size) { return m_mtu_ceiling_slab.alloc(); }
return packet_ptr(create_packet(allocate));
return create_packet(allocate);
}
static int const mtu_floor_size = TORRENT_INET_MIN_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER;
static int const mtu_ceiling_size = TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER;
packet_slab m_syn_slab{ 10, sizeof(utp_header) };
packet_slab m_mtu_floor_slab{ 10, mtu_floor_size };
packet_slab m_mtu_ceiling_slab{ 10 , mtu_ceiling_size };
packet_slab m_syn_slab{ TORRENT_UTP_HEADER };
packet_slab m_mtu_floor_slab{ mtu_floor_size };
packet_slab m_mtu_ceiling_slab{ mtu_ceiling_size };
};
}
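For reference, a minimal round-trip sketch of the pool after this change (illustrative only; round_trip() is not a function in the commit, and it assumes libtorrent/packet_pool.hpp is on the include path). acquire() hands out an owning packet_ptr from one of three slabs sized for a bare header (TORRENT_UTP_HEADER = 20), the minimum-MTU payload (576 - 20 - 8 = 548) and the Ethernet-MTU payload (1500 - 20 - 8 = 1472); release() moves ownership back so the matching slab can cache the allocation, up to its default limit of 10.

#include "libtorrent/packet_pool.hpp"
#include <utility> // for std::move

using namespace libtorrent;

void round_trip()
{
	packet_pool pool;

	// 600 bytes is larger than the 548-byte floor slab, so this draws a
	// 1472-byte allocation from the ceiling slab
	packet_ptr p = pool.acquire(600);
	p->size = std::uint16_t(600);
	p->header_size = std::uint16_t(20);

	// ownership moves back into the pool; a cached packet of the same
	// allocated size can be handed out again without another malloc
	pool.release(std::move(p));

	// periodically shrink the per-slab caches
	pool.decay();
}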

View File

@ -120,8 +120,8 @@ namespace libtorrent
// the counter is the enum from ``counters``.
void inc_stats_counter(int counter, int delta = 1);
packet *acquire_packet(int allocate) { return m_packet_pool.acquire(allocate); }
void release_packet(packet *p) { m_packet_pool.release(p); }
packet_ptr acquire_packet(int const allocate) { return m_packet_pool.acquire(allocate); }
void release_packet(packet_ptr p) { m_packet_pool.release(std::move(p)); }
void decay() { m_packet_pool.decay(); }
private:

View File

@ -73,20 +73,6 @@ namespace libtorrent
struct utp_socket_manager;
// internal: some MTU and protocol header size constants
enum
{
TORRENT_IPV4_HEADER = 20,
TORRENT_IPV6_HEADER = 40,
TORRENT_UDP_HEADER = 8,
TORRENT_SOCKS5_HEADER = 6, // plus the size of the destination address
TORRENT_ETHERNET_MTU = 1500,
TORRENT_TEREDO_MTU = 1280,
TORRENT_INET_MIN_MTU = 576,
TORRENT_INET_MAX_MTU = 0xffff
};
// internal: the point of the big_endian_int is two-fold
// one purpose is to not have any alignment requirements
// so that any buffer received from the network can be cast
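The removed constants above now live in packet_pool.hpp (shown earlier in this commit). For the surrounding comment, here is an illustrative sketch of the idea it describes; be_int_sketch is a made-up name, not libtorrent's actual big_endian_int, and it only shows why storing the value as raw bytes removes any alignment requirement and fixes the byte order, so a header laid over a received network buffer can be read directly.

#include <cstdint>

template <typename T>
struct be_int_sketch
{
	// raw big-endian bytes: the alignment of this type is 1, so a buffer
	// received from the network can be cast to a struct made of these fields
	std::uint8_t m_storage[sizeof(T)];

	operator T() const
	{
		T ret = 0;
		for (std::uint8_t const b : m_storage)
			ret = T((ret << 8) | b);
		return ret;
	}
};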

View File

@ -42,7 +42,7 @@ namespace libtorrent {
, std::uint32_t mask);
#if TORRENT_USE_INVARIANT_CHECKS
void packet_buffer_impl::check_invariant() const
void packet_buffer::check_invariant() const
{
int count = 0;
for (int i = 0; i < int(m_capacity); ++i)
@ -53,7 +53,7 @@ namespace libtorrent {
}
#endif
void* packet_buffer_impl::insert(index_type idx, void* value)
packet_ptr packet_buffer::insert(index_type idx, packet_ptr value)
{
INVARIANT_CHECK;
@ -61,7 +61,7 @@ namespace libtorrent {
// you're not allowed to insert NULLs!
TORRENT_ASSERT(value);
if (value == nullptr) return remove(idx);
if (!value) return remove(idx);
if (m_size != 0)
{
@ -108,34 +108,32 @@ namespace libtorrent {
if (m_capacity == 0) reserve(16);
void* old_value = m_storage[idx & (m_capacity - 1)];
m_storage[idx & (m_capacity - 1)] = value;
packet_ptr old_value = std::move(m_storage[idx & (m_capacity - 1)]);
m_storage[idx & (m_capacity - 1)] = std::move(value);
if (m_size == 0) m_first = idx;
// if we're just replacing an old value, the number
// of elements in the buffer doesn't actually increase
if (old_value == nullptr) ++m_size;
if (!old_value) ++m_size;
TORRENT_ASSERT_VAL(m_first <= 0xffff, m_first);
return old_value;
}
void* packet_buffer_impl::at(index_type idx) const
packet* packet_buffer::at(index_type idx) const
{
INVARIANT_CHECK;
if (idx >= m_first + m_capacity)
return nullptr;
if (compare_less_wrap(idx, m_first, 0xffff))
{
return nullptr;
}
std::size_t const mask = m_capacity - 1;
return m_storage[idx & mask];
return m_storage[idx & mask].get();
}
void packet_buffer_impl::reserve(int size)
void packet_buffer::reserve(int size)
{
INVARIANT_CHECK;
TORRENT_ASSERT_VAL(size <= 0xffff, size);
@ -144,31 +142,28 @@ namespace libtorrent {
while (new_size < size)
new_size <<= 1;
aux::unique_ptr<void*[], index_type> new_storage(new void*[new_size]);
for (index_type i = 0; i < std::uint32_t(new_size); ++i)
new_storage[i] = nullptr;
aux::unique_ptr<packet_ptr[], index_type> new_storage(new packet_ptr[new_size]);
for (index_type i = m_first; i < (m_first + m_capacity); ++i)
new_storage[i & (new_size - 1)] = m_storage[i & (m_capacity - 1)];
new_storage[i & (new_size - 1)] = std::move(m_storage[i & (m_capacity - 1)]);
m_storage = std::move(new_storage);
m_capacity = new_size;
}
void* packet_buffer_impl::remove(index_type idx)
packet_ptr packet_buffer::remove(index_type idx)
{
INVARIANT_CHECK;
// TODO: use compare_less_wrap for this comparison as well
if (idx >= m_first + m_capacity)
return nullptr;
return packet_ptr();
if (compare_less_wrap(idx, m_first, 0xffff))
return nullptr;
return packet_ptr();
std::size_t const mask = m_capacity - 1;
void* old_value = m_storage[idx & mask];
m_storage[idx & mask] = nullptr;
packet_ptr old_value = std::move(m_storage[idx & mask]);
m_storage[idx & mask].reset();
if (old_value)
{
@ -196,5 +191,5 @@ namespace libtorrent {
TORRENT_ASSERT_VAL(m_first <= 0xffff, m_first);
return old_value;
}
}
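A small worked sketch of the indexing scheme implemented above (illustrative, not part of the commit): the capacity is always a power of two, so a 16-bit sequence number maps to a slot with a single bit-mask, and reserve() re-homes each live entry under the wider mask; with packet_ptr as the element type that re-homing is now a move instead of a raw pointer copy.

#include <cstdint>
#include <cassert>

int main()
{
	std::uint32_t const capacity = 16;       // always a power of two
	std::uint32_t const mask = capacity - 1;

	std::uint32_t const first = 0xfffe;      // cursor close to the 16-bit wrap
	assert((first & mask) == 14);            // 0xfffe lands in slot 14
	assert(((first + 3) & mask) == 1);       // 0x10001 wraps into slot 1

	// after reserve(32) the same indices are re-mapped with a wider mask
	std::uint32_t const new_mask = 32 - 1;
	assert((first & new_mask) == 30);
	return 0;
}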

View File

@ -286,10 +286,10 @@ struct utp_socket_impl
void parse_close_reason(std::uint8_t const* ptr, int size);
void write_payload(std::uint8_t* ptr, int size);
void maybe_inc_acked_seq_nr();
void ack_packet(packet* p, time_point const& receive_time
void ack_packet(packet_ptr p, time_point const& receive_time
, std::uint32_t& min_rtt, std::uint16_t seq_nr);
void write_sack(std::uint8_t* buf, int size) const;
void incoming(std::uint8_t const* buf, int size, packet* p, time_point now);
void incoming(std::uint8_t const* buf, int size, packet_ptr p, time_point now);
void do_ledbat(int acked_bytes, int delay, int in_flight);
int packet_timeout() const;
bool test_socket_state();
@ -303,8 +303,8 @@ struct utp_socket_impl
void set_state(int s);
packet *acquire_packet(int allocate) { return m_sm->acquire_packet(allocate); }
void release_packet(packet *p) { m_sm->release_packet(p); }
packet_ptr acquire_packet(int const allocate) { return m_sm->acquire_packet(allocate); }
void release_packet(packet_ptr p) { m_sm->release_packet(std::move(p)); }
private:
@ -359,7 +359,7 @@ public:
// if this is non-null, it's a packet. This packet was held off because
// of NAGLE. We couldn't send it immediately. It's left
// here to accrue more bytes before we send it.
packet* m_nagle_packet = nullptr;
packet_ptr m_nagle_packet;
// the user provided read buffer. If this has a size greater
// than 0, we'll always prefer using it over putting received
@ -372,7 +372,7 @@ public:
// packets we've received without a read operation
// active. Store them here until the client triggers
// an async_read_some
std::vector<packet*> m_receive_buffer;
std::vector<packet_ptr> m_receive_buffer;
// this is the error on this socket. If m_state is
// set to UTP_STATE_ERROR_WAIT, this error should be
@ -391,8 +391,8 @@ public:
// the send and receive buffers
// maps packet sequence numbers
packet_buffer<packet> m_inbuf;
packet_buffer<packet> m_outbuf;
packet_buffer m_inbuf;
packet_buffer m_outbuf;
// the time when the last packet we sent times out. Including re-sends.
// if we ever end up not having sent anything in one second (
@ -1005,7 +1005,7 @@ size_t utp_stream::read_some(bool clear_buffers)
size_t ret = 0;
int pop_packets = 0;
for (std::vector<packet*>::iterator i = m_impl->m_receive_buffer.begin()
for (auto i = m_impl->m_receive_buffer.begin()
, end(m_impl->m_receive_buffer.end()); i != end;)
{
if (target == m_impl->m_read_buffer.end())
@ -1018,8 +1018,8 @@ size_t utp_stream::read_some(bool clear_buffers)
m_impl->check_receive_buffers();
packet* p = *i;
int to_copy = (std::min)(p->size - p->header_size, int(target->len));
packet* p = i->get();
int to_copy = std::min(p->size - p->header_size, int(target->len));
TORRENT_ASSERT(to_copy >= 0);
memcpy(target->buf, p->buf + p->header_size, to_copy);
ret += to_copy;
@ -1039,9 +1039,9 @@ size_t utp_stream::read_some(bool clear_buffers)
// Consumed entire packet
if (p->header_size == p->size)
{
m_impl->release_packet(p);
m_impl->release_packet(std::move(*i));
i->reset();
++pop_packets;
*i = nullptr;
++i;
}
@ -1131,25 +1131,22 @@ utp_socket_impl::~utp_socket_impl()
+ m_inbuf.capacity()) & ACK_MASK);
i != end; i = (i + 1) & ACK_MASK)
{
packet* p = m_inbuf.remove(i);
release_packet(p);
packet_ptr p = m_inbuf.remove(i);
release_packet(std::move(p));
}
for (std::uint16_t i = std::uint16_t(m_outbuf.cursor()), end((m_outbuf.cursor()
+ m_outbuf.capacity()) & ACK_MASK);
i != end; i = (i + 1) & ACK_MASK)
{
packet* p = m_outbuf.remove(i);
release_packet(p);
packet_ptr p = m_outbuf.remove(i);
release_packet(std::move(p));
}
for (std::vector<packet*>::iterator i = m_receive_buffer.begin()
, end = m_receive_buffer.end(); i != end; ++i)
{
release_packet(*i);
}
for (auto& p : m_receive_buffer)
release_packet(std::move(p));
release_packet(m_nagle_packet);
m_nagle_packet = nullptr;
release_packet(std::move(m_nagle_packet));
m_nagle_packet.reset();
}
bool utp_socket_impl::should_delete() const
@ -1278,7 +1275,7 @@ void utp_socket_impl::send_syn()
m_ack_nr = 0;
m_fast_resend_seq_nr = m_seq_nr;
packet* p = acquire_packet(sizeof(utp_header));
packet_ptr p = acquire_packet(sizeof(utp_header));
p->size = sizeof(utp_header);
p->header_size = sizeof(utp_header);
p->num_transmissions = 0;
@ -1328,7 +1325,7 @@ void utp_socket_impl::send_syn()
}
else if (ec)
{
release_packet(p);
release_packet(std::move(p));
m_error = ec;
set_state(UTP_STATE_ERROR_WAIT);
test_socket_state();
@ -1339,9 +1336,9 @@ void utp_socket_impl::send_syn()
++p->num_transmissions;
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
m_outbuf.insert(m_seq_nr, p);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
TORRENT_ASSERT(p->buf == reinterpret_cast<std::uint8_t*>(h));
m_outbuf.insert(m_seq_nr, std::move(p));
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
@ -1484,14 +1481,14 @@ void utp_socket_impl::parse_sack(std::uint16_t packet_ack, std::uint8_t const* p
if (compare_less_wrap(m_fast_resend_seq_nr, ack_nr, ACK_MASK)) ++dups;
// this bit was set, ack_nr was received
packet* p = m_outbuf.remove(ack_nr);
packet_ptr p = m_outbuf.remove(ack_nr);
if (p)
{
*acked_bytes += p->size - p->header_size;
// each ACKed packet counts as a duplicate ack
UTP_LOGV("%8p: duplicate_acks:%u fast_resend_seq_nr:%u\n"
, static_cast<void*>(this), m_duplicate_acks, m_fast_resend_seq_nr);
ack_packet(p, now, min_rtt, std::uint16_t(ack_nr));
ack_packet(std::move(p), now, min_rtt, std::uint16_t(ack_nr));
}
else
{
@ -1559,7 +1556,7 @@ void utp_socket_impl::write_payload(std::uint8_t* ptr, int size)
while (size > 0)
{
// i points to the iovec we'll start copying from
int to_copy = (std::min)(size, int(i->len));
int to_copy = std::min(size, int(i->len));
TORRENT_ASSERT(to_copy >= 0);
TORRENT_ASSERT(to_copy < INT_MAX / 2 && m_written < INT_MAX / 2);
memcpy(ptr, static_cast<char const*>(i->buf), to_copy);
@ -1637,34 +1634,6 @@ void utp_socket_impl::remove_sack_header(packet* p)
p->size -= std::uint16_t(sack_size + 2);
}
struct packet_holder
{
explicit packet_holder(utp_socket_manager* sm, packet* p = nullptr) : m_sm(sm), m_packet(p) {}
~packet_holder() { m_sm->release_packet(m_packet); }
void reset(packet* p)
{
m_sm->release_packet(m_packet);
m_packet = p;
}
packet* release()
{
packet* ret = m_packet;
m_packet = nullptr;
return ret;
}
private:
// not copyable
packet_holder(packet_holder const&);
packet_holder& operator=(packet_holder const&);
utp_socket_manager* m_sm;
packet* m_packet;
};
// sends a packet, pulls data from the write buffer (if there's any)
// if ack is true, we need to send a packet regardless of if there's
// any data. Returns true if we could send more data (i.e. call
@ -1737,14 +1706,14 @@ bool utp_socket_impl::send_pkt(int const flags)
// for non MTU-probes, use the conservative packet size
int const effective_mtu = mtu_probe ? m_mtu : m_mtu_floor;
int payload_size = (std::min)(m_write_buffer_size
int payload_size = std::min(m_write_buffer_size
, effective_mtu - header_size);
TORRENT_ASSERT(payload_size >= 0);
// if we have one MSS worth of data, make sure it fits in our
// congestion window and the advertised receive window from
// the other end.
if (m_bytes_in_flight + payload_size > (std::min)(int(m_cwnd >> 16)
if (m_bytes_in_flight + payload_size > std::min(int(m_cwnd >> 16)
, int(m_adv_wnd - m_bytes_in_flight)))
{
// this means there's not enough room in the send window for
@ -1794,46 +1763,20 @@ bool utp_socket_impl::send_pkt(int const flags)
int packet_size = header_size + payload_size;
packet* p = nullptr;
packet_ptr p;
std::uint8_t* ptr = nullptr;
utp_header* h = nullptr;
#if TORRENT_USE_ASSERTS
bool stack_alloced = false;
#endif
// used to free the packet buffer in case we exit the
// function early
packet_holder holder(m_sm);
// payload size being zero means we're just sending
// a forced packet. We should not pick up the nagle packet
if (!m_nagle_packet || (payload_size == 0 && force))
{
// we only need a heap allocation if we have payload and
// need to keep the packet around (in the outbuf)
p = acquire_packet(effective_mtu);
if (payload_size)
{
p = acquire_packet(effective_mtu);
holder.reset(p);
m_sm->inc_stats_counter(counters::utp_payload_pkts_out);
}
else
{
#if TORRENT_USE_ASSERTS
stack_alloced = true;
#endif
TORRENT_ASSERT(force);
// this alloca() statement won't necessarily produce
// correctly aligned memory. That's why we ask for 7 more bytes
// and adjust our pointer to be aligned later
TORRENT_ALLOCA(ps, char, sizeof(packet) + packet_size + sizeof(packet*) - 1);
p = reinterpret_cast<packet*>(ps.data());
p = reinterpret_cast<packet*>(align_pointer(p));
UTP_LOGV("%8p: allocating %d bytes on the stack\n", static_cast<void*>(this), packet_size);
p->allocated = std::uint16_t(packet_size);
}
p->size = std::uint16_t(packet_size);
p->header_size = std::uint16_t(packet_size - payload_size);
@ -1861,7 +1804,8 @@ bool utp_socket_impl::send_pkt(int const flags)
else
{
// pick up the nagle packet and keep adding bytes to it
p = m_nagle_packet;
p = std::move(m_nagle_packet);
m_nagle_packet.reset();
ptr = p->buf + sizeof(utp_header);
h = reinterpret_cast<utp_header*>(p->buf);
@ -1877,14 +1821,14 @@ bool utp_socket_impl::send_pkt(int const flags)
if (m_inbuf.size() == 0)
{
// we need to remove the sack header
remove_sack_header(p);
remove_sack_header(p.get());
sack = 0;
}
}
else
sack = 0;
std::int32_t const size_left = (std::min)(p->allocated - p->size
std::int32_t const size_left = std::min(p->allocated - p->size
, m_write_buffer_size);
write_payload(p->buf + p->size, size_left);
@ -1897,17 +1841,16 @@ bool utp_socket_impl::send_pkt(int const flags)
// if we didn't, we may still send it if there's
// no bytes in flight
if (m_bytes_in_flight > 0
&& p->size < p->allocated
&& p->size < std::min(p->allocated, m_mtu_floor)
&& !force
&& m_nagle)
{
// the packet is still not a full MSS, so put it back into the nagle
// packet
m_nagle_packet = std::move(p);
return false;
}
// clear the nagle packet pointer and fall through
// sending p
m_nagle_packet = nullptr;
packet_size = p->size;
payload_size = p->size - p->header_size;
}
@ -1942,10 +1885,9 @@ bool utp_socket_impl::send_pkt(int const flags)
"adv_wnd:%d in-flight:%d mtu:%d effective_mtu:%d\n"
, static_cast<void*>(this), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu, effective_mtu);
TORRENT_ASSERT(m_nagle_packet == nullptr);
TORRENT_ASSERT(!m_nagle_packet);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_nagle_packet = p;
holder.release();
m_nagle_packet = std::move(p);
return false;
}
@ -1962,7 +1904,7 @@ bool utp_socket_impl::send_pkt(int const flags)
}
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = (std::max)(m_in_buf_size - m_buffered_incoming_bytes
h->wnd_size = std::max(m_in_buf_size - m_buffered_incoming_bytes
- m_receive_buffer_size, std::int32_t(0));
h->ack_nr = m_ack_nr;
@ -2036,10 +1978,10 @@ bool utp_socket_impl::send_pkt(int const flags)
}
else if (ec)
{
TORRENT_ASSERT(stack_alloced != bool(payload_size != 0));
m_error = ec;
set_state(UTP_STATE_ERROR_WAIT);
test_socket_state();
release_packet(std::move(p));
return false;
}
@ -2052,7 +1994,7 @@ bool utp_socket_impl::send_pkt(int const flags)
{
// if we're sending a payload packet, there should not
// be a nagle packet waiting for more data
TORRENT_ASSERT(m_nagle_packet == nullptr);
TORRENT_ASSERT(!m_nagle_packet);
#if !TORRENT_UT_SEQ
// if the other end closed the connection immediately
@ -2069,18 +2011,18 @@ bool utp_socket_impl::send_pkt(int const flags)
// release the buffer, we're saving it in the circular
// buffer of outgoing packets
holder.release();
packet* old = m_outbuf.insert(m_seq_nr, p);
int const new_in_flight = p->size - p->header_size;
packet_ptr old = m_outbuf.insert(m_seq_nr, std::move(p));
if (old)
{
// TORRENT_ASSERT(reinterpret_cast<utp_header*>(old->buf)->seq_nr == m_seq_nr);
if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size;
release_packet(old);
release_packet(std::move(old));
}
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
TORRENT_ASSERT(payload_size >= 0);
m_bytes_in_flight += p->size - p->header_size;
m_bytes_in_flight += new_in_flight;
}
else
{
@ -2140,7 +2082,7 @@ bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
// since we can't re-packetize, some packets that are
// larger than the congestion window must be allowed through
// but only if we don't have any outstanding bytes
int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - m_bytes_in_flight;
int window_size_left = std::min(int(m_cwnd >> 16), int(m_adv_wnd)) - m_bytes_in_flight;
if (!fast_resend
&& p->size - p->header_size > window_size_left
&& m_bytes_in_flight > 0)
@ -2254,7 +2196,7 @@ void utp_socket_impl::experienced_loss(int const seq_nr)
if (compare_less_wrap(seq_nr, m_loss_seq_nr + 1, ACK_MASK)) return;
// cut window size in 2
m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100
m_cwnd = std::max(m_cwnd * m_sm->loss_multiplier() / 100
, std::int64_t(m_mtu) * (1 << 16));
m_loss_seq_nr = m_seq_nr;
UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", static_cast<void*>(this), seq_nr);
@ -2312,7 +2254,7 @@ void utp_socket_impl::maybe_inc_acked_seq_nr()
m_duplicate_acks = 0;
}
void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
void utp_socket_impl::ack_packet(packet_ptr p, time_point const& receive_time
, std::uint32_t& min_rtt, std::uint16_t seq_nr)
{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
@ -2335,7 +2277,7 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
{
TORRENT_ASSERT(p->mtu_probe);
// our mtu probe was acked!
m_mtu_floor = (std::max)(m_mtu_floor, p->size);
m_mtu_floor = std::max(m_mtu_floor, p->size);
if (m_mtu_ceiling < m_mtu_floor) m_mtu_ceiling = m_mtu_floor;
update_mtu_limits();
}
@ -2358,10 +2300,10 @@ void utp_socket_impl::ack_packet(packet* p, time_point const& receive_time
m_rtt.add_sample(rtt / 1000);
if (rtt < min_rtt) min_rtt = rtt;
release_packet(p);
release_packet(std::move(p));
}
void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet_ptr p
, time_point /* now */)
{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
@ -2378,7 +2320,7 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
}
iovec_t* target = &m_read_buffer.front();
int to_copy = (std::min)(size, int(target->len));
int const to_copy = std::min(size, int(target->len));
std::memcpy(target->buf, buf, to_copy);
m_read += to_copy;
target->buf = reinterpret_cast<std::uint8_t*>(target->buf) + to_copy;
@ -2397,8 +2339,8 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
if (size == 0)
{
TORRENT_ASSERT(p == nullptr || p->header_size == p->size);
release_packet(p);
TORRENT_ASSERT(!p || p->header_size == p->size);
release_packet(std::move(p));
return;
}
}
@ -2414,8 +2356,8 @@ void utp_socket_impl::incoming(std::uint8_t const* buf, int size, packet* p
std::memcpy(p->buf, buf, size);
}
// save this packet until the client issues another read
m_receive_buffer.push_back(p);
m_receive_buffer_size += p->size - p->header_size;
m_receive_buffer.emplace_back(std::move(p));
UTP_LOGV("%8p: incoming: saving packet in receive buffer (%d)\n", static_cast<void*>(this), m_receive_buffer_size);
@ -2477,7 +2419,7 @@ bool utp_socket_impl::consume_incoming_data(
if (ph->seq_nr == ((m_ack_nr + 1) & ACK_MASK))
{
TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == nullptr);
TORRENT_ASSERT(!m_inbuf.at(m_ack_nr));
if (m_buffered_incoming_bytes + m_receive_buffer_size + payload_size > m_in_buf_size)
{
@ -2487,12 +2429,12 @@ bool utp_socket_impl::consume_incoming_data(
}
// we received a packet in order
incoming(ptr, payload_size, nullptr, now);
incoming(ptr, payload_size, packet_ptr(), now);
m_ack_nr = (m_ack_nr + 1) & ACK_MASK;
// If this packet was previously in the reorder buffer
// it would have been acked when m_ack_nr-1 was acked.
TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == nullptr);
TORRENT_ASSERT(!m_inbuf.at(m_ack_nr));
UTP_LOGV("%8p: remove inbuf: %d (%d)\n"
, static_cast<void*>(this), m_ack_nr, int(m_inbuf.size()));
@ -2501,12 +2443,12 @@ bool utp_socket_impl::consume_incoming_data(
{
int const next_ack_nr = (m_ack_nr + 1) & ACK_MASK;
packet* p = m_inbuf.remove(next_ack_nr);
packet_ptr p = m_inbuf.remove(next_ack_nr);
if (!p) break;
m_buffered_incoming_bytes -= p->size - p->header_size;
incoming(nullptr, p->size - p->header_size, p, now);
incoming(nullptr, p->size - p->header_size, std::move(p), now);
m_ack_nr = std::uint16_t(next_ack_nr);
@ -2544,7 +2486,7 @@ bool utp_socket_impl::consume_incoming_data(
}
// we don't need to save the packet header, just the payload
packet* p = acquire_packet(payload_size);
packet_ptr p = acquire_packet(payload_size);
p->size = std::uint16_t(payload_size);
p->header_size = 0;
p->num_transmissions = 0;
@ -2553,8 +2495,8 @@ bool utp_socket_impl::consume_incoming_data(
#endif
p->need_resend = false;
std::memcpy(p->buf, ptr, payload_size);
m_inbuf.insert(ph->seq_nr, p);
m_buffered_incoming_bytes += p->size;
m_inbuf.insert(ph->seq_nr, std::move(p));
UTP_LOGV("%8p: out of order. insert inbuf: %d (%d) m_ack_nr: %d\n"
, static_cast<void*>(this), int(ph->seq_nr), int(m_inbuf.size()), m_ack_nr);
@ -2859,12 +2801,12 @@ bool utp_socket_impl::incoming_packet(span<std::uint8_t const> buf
{
if (m_fast_resend_seq_nr == ack_nr)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
packet* p = m_outbuf.remove(ack_nr);
packet_ptr p = m_outbuf.remove(ack_nr);
if (!p) continue;
acked_bytes += p->size - p->header_size;
ack_packet(p, receive_time, min_rtt, std::uint16_t(ack_nr));
ack_packet(std::move(p), receive_time, min_rtt, std::uint16_t(ack_nr));
}
maybe_inc_acked_seq_nr();
@ -3089,7 +3031,7 @@ bool utp_socket_impl::incoming_packet(span<std::uint8_t const> buf
m_send_delay = delay;
}
m_recv_delay = (std::min)(their_delay, min_rtt);
m_recv_delay = std::min(their_delay, min_rtt);
consume_incoming_data(ph, ptr, payload_size, receive_time);
@ -3334,7 +3276,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay
TORRENT_ASSERT(in_flight > 0);
TORRENT_ASSERT(acked_bytes > 0);
const int target_delay = (std::max)(1, m_sm->target_delay());
const int target_delay = std::max(1, m_sm->target_delay());
// true if the upper layer is pushing enough data down the socket to be
// limited by the cwnd. If this is not the case, we should not adjust cwnd.
@ -3387,7 +3329,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay
}
else
{
scaled_gain = (std::max)(exponential_gain, linear_gain);
scaled_gain = std::max(exponential_gain, linear_gain);
}
}
else
@ -3424,7 +3366,7 @@ void utp_socket_impl::do_ledbat(const int acked_bytes, const int delay
TORRENT_ASSERT(m_cwnd >= 0);
int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - in_flight + acked_bytes;
int window_size_left = std::min(int(m_cwnd >> 16), int(m_adv_wnd)) - in_flight + acked_bytes;
if (window_size_left >= m_mtu)
{
UTP_LOGV("%8p: mtu:%d in_flight:%d adv_wnd:%d cwnd:%d acked_bytes:%d cwnd_full -> 0\n"
@ -3461,7 +3403,7 @@ int utp_socket_impl::packet_timeout() const
// avoid overflow by simply capping based on number of timeouts as well
if (m_num_timeouts >= 7) return 60000;
int timeout = (std::max)(m_sm->min_timeout(), m_rtt.mean() + m_rtt.avg_deviation() * 2);
int timeout = std::max(m_sm->min_timeout(), m_rtt.mean() + m_rtt.avg_deviation() * 2);
if (m_num_timeouts > 0) timeout += (1 << (int(m_num_timeouts) - 1)) * 1000;
// timeouts over 1 minute are capped
@ -3657,16 +3599,11 @@ void utp_socket_impl::check_receive_buffers() const
{
INVARIANT_CHECK;
std::size_t size = 0;
int size = 0;
for (auto const& p : m_receive_buffer)
size += p ? p->size - p->header_size : 0;
for (std::vector<packet*>::const_iterator i = m_receive_buffer.begin()
, end(m_receive_buffer.end()); i != end; ++i)
{
if (packet const* p = *i)
size += p->size - p->header_size;
}
TORRENT_ASSERT(int(size) == m_receive_buffer_size);
TORRENT_ASSERT(size == m_receive_buffer_size);
}
#if TORRENT_USE_INVARIANT_CHECKS
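Taken together, the utp_stream.cpp changes converge on one ownership pattern, sketched below (illustrative only; send_sketch() is not a function in the commit, and the real send_pkt() also fills in the uTP header and handles nagle and MTU probing). A packet lives in exactly one owner at a time: the pool, a local packet_ptr, the nagle slot, or the inbuf/outbuf circular buffers; every hand-off is spelled std::move, and release_packet()/packet_pool::release() is the single sink, which is what lets the commit delete the packet_holder helper.

#include "libtorrent/packet_buffer.hpp"
#include "libtorrent/packet_pool.hpp"
#include <utility> // for std::move

using namespace libtorrent;

void send_sketch(packet_pool& pool, packet_buffer& outbuf, std::uint16_t const seq_nr)
{
	// was: a raw packet* plus a packet_holder for early-exit cleanup
	packet_ptr p = pool.acquire(1472);

	// ... write the uTP header and payload into p->buf here ...

	// the circular buffer takes ownership; anything previously stored at
	// seq_nr comes back out and is returned to the pool
	packet_ptr old = outbuf.insert(seq_nr, std::move(p));
	if (old) pool.release(std::move(old));

	// on ACK (or teardown) the packet is moved back out and released
	pool.release(outbuf.remove(seq_nr));
}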

View File

@ -32,73 +32,86 @@ POSSIBILITY OF SUCH DAMAGE.
#include "test.hpp"
#include "libtorrent/packet_buffer.hpp"
#include "libtorrent/packet_pool.hpp"
using libtorrent::packet_buffer;
using libtorrent::packet_ptr;
using libtorrent::packet_pool;
using libtorrent::packet;
packet_ptr make_pkt(packet_pool& pool, int const val)
{
packet_ptr ret = pool.acquire(20);
*reinterpret_cast<std::uint8_t*>(ret->buf) = val;
return ret;
}
int get_val(packet* pkt)
{
TORRENT_ASSERT(pkt != nullptr);
return *reinterpret_cast<std::uint8_t*>(pkt->buf);
}
// test packet_buffer
TORRENT_TEST(insert)
{
packet_buffer<int> pb;
int a123 = 123;
int a125 = 125;
int a500 = 500;
int a501 = 501;
packet_pool pool;
packet_buffer pb;
TEST_EQUAL(pb.capacity(), 0);
TEST_EQUAL(pb.size(), 0);
TEST_EQUAL(pb.span(), 0);
pb.insert(123, &a123);
TEST_EQUAL(pb.at(123 + 16), 0);
pb.insert(123, make_pkt(pool, 123));
TEST_CHECK(pb.at(123 + 16) == nullptr);
TEST_CHECK(pb.at(123) == &a123);
TEST_EQUAL(get_val(pb.at(123)), 123);
TEST_CHECK(pb.capacity() > 0);
TEST_EQUAL(pb.size(), 1);
TEST_EQUAL(pb.span(), 1);
TEST_EQUAL(pb.cursor(), 123);
pb.insert(125, &a125);
pb.insert(125, make_pkt(pool, 125));
TEST_CHECK(pb.at(125) == &a125);
TEST_EQUAL(get_val(pb.at(125)), 125);
TEST_EQUAL(pb.size(), 2);
TEST_EQUAL(pb.span(), 3);
TEST_EQUAL(pb.cursor(), 123);
pb.insert(500, &a500);
pb.insert(500, make_pkt(pool, 4));
TEST_EQUAL(pb.size(), 3);
TEST_EQUAL(pb.span(), 501 - 123);
TEST_EQUAL(pb.capacity(), 512);
pb.insert(500, &a501);
pb.insert(500, make_pkt(pool, 5));
TEST_EQUAL(pb.size(), 3);
pb.insert(500, &a500);
TEST_EQUAL(get_val(pb.insert(500, make_pkt(pool, 4)).get()), 5);
TEST_EQUAL(pb.size(), 3);
TEST_CHECK(pb.remove(123) == &a123);
TEST_EQUAL(get_val(pb.remove(123).get()), 123);
TEST_EQUAL(pb.size(), 2);
TEST_EQUAL(pb.span(), 501 - 125);
TEST_EQUAL(pb.cursor(), 125);
TEST_CHECK(pb.remove(125) == &a125);
TEST_EQUAL(get_val(pb.remove(125).get()), 125);
TEST_EQUAL(pb.size(), 1);
TEST_EQUAL(pb.span(), 1);
TEST_EQUAL(pb.cursor(), 500);
TEST_CHECK(pb.remove(500) == &a500);
TEST_EQUAL(get_val(pb.remove(500).get()), 4);
TEST_EQUAL(pb.size(), 0);
TEST_EQUAL(pb.span(), 0);
for (int i = 0; i < 0xff; ++i)
{
int index = (i + 0xfff0) & 0xffff;
pb.insert(index, reinterpret_cast<int*>(size_t(index) + 1));
pb.insert(index, make_pkt(pool, index + 1));
std::printf("insert: %u (mask: %x)\n", index, int(pb.capacity() - 1));
TEST_EQUAL(pb.capacity(), 512);
if (i >= 14)
{
index = (index - 14) & 0xffff;
std::printf("remove: %u\n", index);
TEST_CHECK(pb.remove(index) == reinterpret_cast<int*>(size_t(index) + 1));
TEST_EQUAL(get_val(pb.remove(index).get()), std::uint8_t(index + 1));
TEST_EQUAL(pb.size(), 14);
}
}
@ -107,60 +120,63 @@ TORRENT_TEST(insert)
TORRENT_TEST(wrap)
{
// test wrapping the indices
packet_buffer<void> pb;
packet_pool pool;
packet_buffer pb;
TEST_EQUAL(pb.size(), 0);
pb.insert(0xfffe, (void*)1);
TEST_CHECK(pb.at(0xfffe) == (void*)1);
pb.insert(0xfffe, make_pkt(pool, 1));
TEST_EQUAL(get_val(pb.at(0xfffe)), 1);
pb.insert(2, (void*)2);
TEST_CHECK(pb.at(2) == (void*)2);
pb.insert(2, make_pkt(pool, 2));
TEST_EQUAL(get_val(pb.at(2)), 2);
pb.remove(0xfffe);
TEST_CHECK(pb.at(0xfffe) == (void*)nullptr);
TEST_CHECK(pb.at(2) == (void*)2);
TEST_CHECK(pb.at(0xfffe) == nullptr);
TEST_EQUAL(get_val(pb.at(2)), 2);
}
TORRENT_TEST(wrap2)
{
// test wrapping the indices
packet_buffer<void> pb;
packet_pool pool;
packet_buffer pb;
TEST_EQUAL(pb.size(), 0);
pb.insert(0xfff3, (void*)1);
TEST_CHECK(pb.at(0xfff3) == (void*)1);
pb.insert(0xfff3, make_pkt(pool, 1));
TEST_EQUAL(get_val(pb.at(0xfff3)), 1);
int new_index = (0xfff3 + pb.capacity()) & 0xffff;
pb.insert(new_index, (void*)2);
TEST_CHECK(pb.at(new_index) == (void*)2);
pb.insert(new_index, make_pkt(pool, 2));
TEST_EQUAL(get_val(pb.at(new_index)), 2);
void* old = pb.remove(0xfff3);
TEST_CHECK(old == (void*)1);
TEST_CHECK(pb.at(0xfff3) == (void*)nullptr);
TEST_CHECK(pb.at(new_index) == (void*)2);
packet_ptr old = pb.remove(0xfff3);
TEST_CHECK(get_val(old.get()) == 1);
TEST_CHECK(pb.at(0xfff3) == nullptr);
TEST_EQUAL(get_val(pb.at(new_index)), 2);
}
TORRENT_TEST(reverse_wrap)
{
// test wrapping the indices backwards
packet_buffer<void> pb;
packet_pool pool;
packet_buffer pb;
TEST_EQUAL(pb.size(), 0);
pb.insert(0xfff3, (void*)1);
TEST_CHECK(pb.at(0xfff3) == (void*)1);
pb.insert(0xfff3, make_pkt(pool, 1));
TEST_EQUAL(get_val(pb.at(0xfff3)), 1);
int new_index = (0xfff3 + pb.capacity()) & 0xffff;
pb.insert(new_index, (void*)2);
TEST_CHECK(pb.at(new_index) == (void*)2);
pb.insert(new_index, make_pkt(pool, 2));
TEST_EQUAL(get_val(pb.at(new_index)), 2);
void* old = pb.remove(0xfff3);
TEST_CHECK(old == (void*)1);
TEST_CHECK(pb.at(0xfff3) == (void*)nullptr);
TEST_CHECK(pb.at(new_index) == (void*)2);
packet_ptr old = pb.remove(0xfff3);
TEST_CHECK(get_val(old.get()) == 1);
TEST_CHECK(pb.at(0xfff3) == nullptr);
TEST_EQUAL(get_val(pb.at(new_index)), 2);
pb.insert(0xffff, (void*)0xffff);
pb.insert(0xffff, make_pkt(pool, 3));
}