simplify peer_connection's read handlers and the receive buffer. (#853)

removed the concept of soft packet size. removed the secondary null-buffer receive path. removed try_read(). simplified the buffer class to no longer be resizable. receive_buffer now shrinks its allocation when under-used, growing the receive buffer is capped, and buffer sizes are always divisible by 8.
Arvid Norberg 2016-06-26 20:41:03 -04:00 committed by GitHub
parent 263bd31782
commit c567a66f5e
18 changed files with 576 additions and 623 deletions

View File

@ -389,7 +389,7 @@ int print_peer_info(std::string& out
if (print_ip) out += "IP ";
out += "progress down (total | peak ) up (total | peak ) sent-req tmo bsy rcv flags dn up source ";
if (print_fails) out += "fail hshf ";
if (print_send_bufs) out += "rq sndb rcvb q-bytes ";
if (print_send_bufs) out += "rq sndb (recvb |alloc | wmrk ) q-bytes ";
if (print_timers) out += "inactive wait timeout q-time ";
out += " v disk ^ rtt ";
if (print_block) out += "block-progress ";
@ -471,9 +471,11 @@ int print_peer_info(std::string& out
}
if (print_send_bufs)
{
std::snprintf(str, sizeof(str), "%2d %6d %6d%5dkB "
std::snprintf(str, sizeof(str), "%2d %6d %6d|%6d|%6d%5dkB "
, i->requests_in_buffer, i->used_send_buffer
, i->used_receive_buffer
, i->receive_buffer_size
, i->receive_buffer_watermark
, i->queue_bytes / 1000);
out += str;
}

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define TORRENT_ARRAY_VIEW_HPP_INCLUDED
#include <vector>
#include <array>
#include <type_traits> // for std::is_convertible
#include "libtorrent/assert.hpp"

View File

@ -107,10 +107,6 @@ struct suggest_piece
}
m_priority_pieces.push_back(index);
std::printf("SUGGEST: ");
for (int p : m_priority_pieces) std::printf(" %d", p);
std::printf("\n");
}
private:

View File

@ -40,12 +40,34 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/invariant_check.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/aux_/array_view.hpp"
#if defined __GLIBC__
#include <malloc.h>
#elif defined _MSC_VER
#include <malloc.h>
#elif defined TORRENT_BSD
#include <malloc/malloc.h>
#endif
namespace libtorrent {
namespace {
inline std::size_t round_up8(std::size_t const v)
{
return (v + 7) & (~std::size_t(0x7));
}
}
// the buffer is allocated once and cannot be resized. The size() may be
// larger than requested, in case the underlying allocator over-allocated. In
// order to "grow" an allocation, create a new buffer, initialize it with
// the range of bytes from the existing one, and move-assign the new one
// over the old.
class buffer
{
public:
struct interval
{
interval()
@ -110,31 +132,58 @@ public:
char const* end;
};
buffer(std::size_t n = 0)
: m_begin(0)
, m_size(0)
, m_capacity(0)
// allocate an uninitialized buffer of the specified size
buffer(std::size_t size = 0)
{
if (n) resize(n);
TORRENT_ASSERT(size < std::size_t(std::numeric_limits<std::int32_t>::max()));
if (size == 0) return;
size = round_up8(size);
m_begin = static_cast<char*>(std::malloc(size));
if (m_begin == nullptr)
{
#ifndef BOOST_NO_EXCEPTIONS
throw std::bad_alloc();
#else
std::terminate();
#endif
}
// the actual allocation may be larger than we requested. If so, let the
// user take advantage of every single byte
#if defined __GLIBC__
m_size = malloc_usable_size(m_begin);
#elif defined _MSC_VER
m_size = _msize(m_begin);
#elif defined TORRENT_BSD
m_size = malloc_size(m_begin);
#else
m_size = size;
#endif
}
buffer(buffer const& b)
: m_begin(0)
, m_size(0)
, m_capacity(0)
// allocate an uninitialized buffer of the specified size
// and copy the initialization range into the start of the buffer
buffer(std::size_t const size, aux::array_view<char const> initialize)
: buffer(size)
{
if (b.size() == 0) return;
resize(b.size());
std::memcpy(m_begin, b.begin(), b.size());
TORRENT_ASSERT(initialize.size() <= size);
if (initialize.size() > 0)
{
memcpy(m_begin, initialize.data(), std::min(initialize.size(), size));
}
}
buffer(buffer const& b) = delete;
buffer(buffer&& b)
: m_begin(b.m_begin)
, m_size(b.m_size)
, m_capacity(b.m_capacity)
{
b.m_begin = nullptr;
b.m_size = b.m_capacity = 0;
b.m_size = 0;
}
buffer& operator=(buffer&& b)
@ -143,86 +192,30 @@ public:
std::free(m_begin);
m_begin = b.m_begin;
m_size = b.m_size;
m_capacity = b.m_capacity;
b.m_begin = nullptr;
b.m_size = b.m_capacity = 0;
b.m_size = 0;
return *this;
}
buffer& operator=(buffer const& b)
{
if (&b == this) return *this;
resize(b.size());
if (b.size() == 0) return *this;
std::memcpy(m_begin, b.begin(), b.size());
return *this;
}
buffer& operator=(buffer const& b) = delete;
~buffer()
{
std::free(m_begin);
}
~buffer() { std::free(m_begin); }
// TODO: 3 fix the naming convention here
char* ptr() { return m_begin; }
char const* ptr() const { return m_begin; }
buffer::interval data()
{ return interval(m_begin, m_begin + m_size); }
buffer::const_interval data() const
{ return const_interval(m_begin, m_begin + m_size); }
{ return interval(m_begin, m_begin + m_size); }
void resize(std::size_t n)
{
TORRENT_ASSERT(n < std::numeric_limits<std::uint32_t>::max());
reserve(n);
m_size = std::uint32_t(n);
}
operator aux::array_view<char>()
{ return aux::array_view<char>(m_begin, int(m_size)); }
operator aux::array_view<char const>() const
{ return aux::array_view<char const>(m_begin, int(m_size)); }
void insert(char* point, char const* first, char const* last)
{
std::size_t p = point - m_begin;
if (point == m_begin + m_size)
{
resize(size() + last - first);
std::memcpy(m_begin + p, first, last - first);
return;
}
resize(size() + last - first);
std::memmove(m_begin + p + (last - first), m_begin + p, last - first);
std::memcpy(m_begin + p, first, last - first);
}
void erase(char* b, char* e)
{
TORRENT_ASSERT(e <= m_begin + m_size);
TORRENT_ASSERT(b >= m_begin);
TORRENT_ASSERT(b <= e);
if (e == m_begin + m_size)
{
resize(b - m_begin);
return;
}
std::memmove(b, e, m_begin + m_size - e);
TORRENT_ASSERT(e >= b);
TORRENT_ASSERT(uintptr_t(e - b) <= std::numeric_limits<std::uint32_t>::max());
TORRENT_ASSERT(uintptr_t(e - b) <= m_size);
m_size -= std::uint32_t(e - b);
}
void clear() { m_size = 0; }
std::size_t size() const { return m_size; }
std::size_t capacity() const { return m_capacity; }
void reserve(std::size_t n)
{
if (n <= capacity()) return;
TORRENT_ASSERT(n > 0);
TORRENT_ASSERT(n < 0xffffffffu);
char* tmp = static_cast<char*>(std::realloc(m_begin, n));
#ifndef BOOST_NO_EXCEPTIONS
if (tmp == nullptr) throw std::bad_alloc();
#endif
m_begin = tmp;
m_capacity = std::uint32_t(n);
}
bool empty() const { return m_size == 0; }
char& operator[](std::size_t i) { TORRENT_ASSERT(i < size()); return m_begin[i]; }
@ -238,15 +231,13 @@ public:
using std::swap;
swap(m_begin, b.m_begin);
swap(m_size, b.m_size);
swap(m_capacity, b.m_capacity);
}
private:
char* m_begin;
std::uint32_t m_size;
std::uint32_t m_capacity;
char* m_begin = nullptr;
// m_begin points to an allocation of this size.
std::size_t m_size = 0;
};
}
#endif // TORRENT_BUFFER_HPP_INCLUDED
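Since the buffer can no longer be resized in place, growing one means allocating a new buffer initialized from the live bytes of the old one and move-assigning it over, exactly as the class comment above describes. A minimal sketch of that pattern; grow_to() is a hypothetical helper, not part of this commit:

#include "libtorrent/buffer.hpp"
#include "libtorrent/aux_/array_view.hpp"
#include <utility> // for std::move

// hypothetical helper: copy the first `used` bytes of `buf` into a larger
// allocation and move-assign it over the old buffer, freeing the old one
void grow_to(libtorrent::buffer& buf, int const used, std::size_t const new_size)
{
	libtorrent::buffer larger(new_size
		, libtorrent::aux::array_view<char const>(buf.ptr(), used));
	buf = std::move(larger);
}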

View File

@ -719,8 +719,6 @@ namespace libtorrent
protected:
size_t try_read(sync_t s, error_code& ec);
virtual void get_specific_peer_info(peer_info& p) const = 0;
virtual void write_choke() = 0;
@ -742,6 +740,7 @@ namespace libtorrent
virtual void on_connected() = 0;
virtual void on_tick() {}
// implemented by concrete connection classes
virtual void on_receive(error_code const& error
, std::size_t bytes_transferred) = 0;
virtual void on_sent(error_code const& error
@ -752,7 +751,10 @@ namespace libtorrent
virtual
std::tuple<int, aux::array_view<boost::asio::const_buffer>>
hit_send_barrier(aux::array_view<boost::asio::mutable_buffer> /* iovec */)
{ return std::make_tuple(INT_MAX, aux::array_view<boost::asio::const_buffer>()); }
{
return std::make_tuple(INT_MAX
, aux::array_view<boost::asio::const_buffer>());
}
void attach_to_torrent(sha1_hash const& ih);
@ -760,21 +762,6 @@ namespace libtorrent
void update_desired_queue_size();
// called from the main loop when this connection has any
// work to do.
void on_send_data(error_code const& error
, std::size_t bytes_transferred);
void on_receive_data(error_code const& error
, std::size_t bytes_transferred);
// _nb means null_buffers. i.e. we just know the socket is
// readable at this point, we don't know how much has been received
void on_receive_data_nb(error_code const& error
, std::size_t bytes_transferred);
void receive_data_impl(error_code const& error
, std::size_t bytes_transferred, int read_loops);
void set_send_barrier(int bytes)
{
TORRENT_ASSERT(bytes == INT_MAX || bytes <= send_buffer_size());
@ -788,6 +775,15 @@ namespace libtorrent
io_service& get_io_service() { return m_ios; }
private:
// callbacks for data being sent or received
void on_send_data(error_code const& error
, std::size_t bytes_transferred);
void on_receive_data(error_code const& error
, std::size_t bytes_transferred);
void account_received_bytes(int bytes_transferred);
// explicitly disallow assignment, to silence msvc warning
peer_connection& operator=(peer_connection const&);

View File

@ -246,6 +246,7 @@ namespace libtorrent
// allocated and used as receive buffer, respectively.
int receive_buffer_size;
int used_receive_buffer;
int receive_buffer_watermark;
// the number of pieces this peer has participated in sending us that
// turned out to fail the hash check.

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/buffer.hpp>
#include <libtorrent/disk_buffer_holder.hpp>
#include <libtorrent/sliding_average.hpp>
#include <boost/asio/buffer.hpp>
namespace libtorrent {
@ -43,14 +44,6 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
{
friend struct crypto_receive_buffer;
receive_buffer()
: m_recv_start(0)
, m_recv_end(0)
, m_recv_pos(0)
, m_packet_size(0)
, m_soft_packet_size(0)
{}
int packet_size() const { return m_packet_size; }
int packet_bytes_remaining() const
{
@ -59,19 +52,15 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
return m_packet_size - m_recv_pos;
}
int max_receive();
int max_receive() const;
bool packet_finished() const { return m_packet_size <= m_recv_pos; }
int pos() const { return m_recv_pos; }
int capacity() const { return int(m_recv_buffer.capacity()); }
int regular_buffer_size() const
{
TORRENT_ASSERT(m_packet_size > 0);
return m_packet_size;
}
int capacity() const { return int(m_recv_buffer.size()); }
int watermark() const { return m_watermark.mean(); }
boost::asio::mutable_buffer reserve(int size);
void grow(int limit);
// tell the buffer we just received more bytes at the end of it. This will
// advance the end cursor
@ -89,11 +78,6 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
// has the read cursor reached the end cursor?
bool pos_at_end() { return m_recv_pos == m_recv_end; }
// make the buffer size divisible by 8 bytes (RC4 block size)
void clamp_size();
void set_soft_packet_size(int size) { m_soft_packet_size = size; }
// size = the packet size to remove from the receive buffer
// packet_size = the next packet size to receive in the buffer
// offset = the offset into the receive buffer where to remove `size` bytes
@ -104,8 +88,7 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
buffer::const_interval get() const;
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
// returns the entire regular buffer
// should only be used during the handshake
// returns the entire buffer
buffer::interval mutable_buffer();
// returns the last 'bytes' from the receive buffer
@ -114,7 +97,7 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
// the purpose of this function is to free up and cut off all messages
// in the receive buffer that have been parsed and processed.
void normalize();
void normalize(int force_shrink = 0);
bool normalized() const { return m_recv_start == 0; }
void reset(int packet_size);
@ -130,16 +113,15 @@ private:
// explicitly disallow assignment, to silence msvc warning
receive_buffer& operator=(receive_buffer const&);
// recv_buf.begin (start of actual receive buffer)
// m_recv_buffer.data() (start of actual receive buffer)
// |
// | m_recv_start (logical start of current
// | | receive buffer, as perceived by upper layers)
// | m_recv_start (start of current packet)
// | |
// | | m_recv_pos (number of bytes consumed
// | | | by upper layer, from logical receive buffer)
// | | |
// | x---------x
// | | | recv_buf.end (end of actual receive buffer)
// | | | m_recv_buffer.size() (end of actual receive buffer)
// | | | |
// v v v v
// *------==========---------
@ -151,25 +133,23 @@ private:
// m_recv_buffer
// the start of the logical receive buffer
int m_recv_start;
int m_recv_start = 0;
// the number of valid, received bytes in m_recv_buffer
int m_recv_end;
int m_recv_end = 0;
// the byte offset in m_recv_buffer that we have
// are passing on to the upper layer. This is
// always <= m_recv_end
int m_recv_pos;
int m_recv_pos = 0;
// the size (in bytes) of the bittorrent message
// we're currently receiving
int m_packet_size;
int m_packet_size = 0;
// the number of bytes that the other
// end has to send us in order to respond
// to all outstanding piece requests we
// have sent to it
int m_soft_packet_size;
// keep track of how much of the receive buffer we use. If we're not
// using enough of it, we shrink it
sliding_average<20> m_watermark;
buffer m_recv_buffer;
};
@ -183,10 +163,7 @@ private:
struct crypto_receive_buffer
{
crypto_receive_buffer(receive_buffer& next)
: m_recv_pos(INT_MAX)
, m_packet_size(0)
, m_soft_packet_size(0)
, m_connection_buffer(next)
: m_connection_buffer(next)
{}
buffer::interval mutable_buffer() { return m_connection_buffer.mutable_buffer(); }
@ -219,8 +196,6 @@ struct crypto_receive_buffer
void reset(int packet_size);
void crypto_reset(int packet_size);
void set_soft_packet_size(int size);
int advance_pos(int bytes);
buffer::const_interval get() const;
@ -231,9 +206,8 @@ private:
// explicitly disallow assignment, to silence msvc warning
crypto_receive_buffer& operator=(crypto_receive_buffer const&);
int m_recv_pos;
int m_packet_size;
int m_soft_packet_size;
int m_recv_pos = std::numeric_limits<int>::max();
int m_packet_size = 0;
receive_buffer& m_connection_buffer;
};
#endif // TORRENT_DISABLE_ENCRYPTION
@ -241,3 +215,4 @@ private:
} // namespace libtorrent
#endif // #ifndef TORRENT_RECEIVE_BUFFER_HPP_INCLUDED
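Taken together, the API above gives each receive round the shape: reserve space, record what arrived, hand it to the upper layer, then normalize. A minimal sketch of one round against this internal (TORRENT_EXTRA_EXPORT) class, the way the unit tests further down exercise it; the socket read is elided and 68 bytes is an arbitrary packet size:

#include "libtorrent/receive_buffer.hpp"
#include "libtorrent/assert.hpp"

void receive_round_sketch()
{
	libtorrent::receive_buffer b;
	b.reset(68); // expect a 68-byte packet next
	boost::asio::mutable_buffer dst = b.reserve(68);
	// ... read 68 bytes from the socket into dst ...
	b.received(68); // advance the end cursor past the new bytes
	int const consumed = b.advance_pos(68); // hand them to the upper layer
	TORRENT_ASSERT(consumed == 68);
	TORRENT_ASSERT(b.packet_finished());
	b.cut(68, 68); // drop the parsed packet, expect another 68-byte one
	b.normalize(); // free up the parsed range, update the watermark
}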

View File

@ -1098,7 +1098,6 @@ namespace libtorrent
// and is disconnected.
max_rejects,
// ``recv_socket_buffer_size`` and ``send_socket_buffer_size``
// specifies the buffer sizes set on peer sockets. 0 (which is the
// default) means the OS default (i.e. don't change the buffer sizes).
// The socket buffer sizes are changed using setsockopt() with
@ -1106,6 +1105,10 @@ namespace libtorrent
recv_socket_buffer_size,
send_socket_buffer_size,
// the max number of bytes a single peer connection's receive buffer is
// allowed to grow to.
max_peer_recv_buffer_size,
// ``file_checks_delay_per_block`` is the number of milliseconds to
// sleep in between disk read operations when checking torrents. This
// defaults to 0, but can be set to higher numbers to slow down the
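As a usage sketch of the new setting, capping each peer connection's receive buffer at 1 MiB could look like this (the lt::session object `ses` is an assumption of the example):

#include "libtorrent/session.hpp"
#include "libtorrent/settings_pack.hpp"

void cap_peer_recv_buffer(libtorrent::session& ses)
{
	libtorrent::settings_pack pack;
	// no peer's receive buffer may grow beyond 1 MiB
	pack.set_int(libtorrent::settings_pack::max_peer_recv_buffer_size
		, 1024 * 1024);
	ses.apply_settings(pack);
}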

View File

@ -45,6 +45,8 @@ template <int inverted_gain>
struct sliding_average
{
sliding_average(): m_mean(0), m_average_deviation(0), m_num_samples(0) {}
sliding_average(sliding_average const&) = default;
sliding_average& operator=(sliding_average const&) = default;
void add_sample(int s)
{
@ -75,19 +77,16 @@ struct sliding_average
private:
// both of these are fixed point values (* 64)
int m_mean;
int m_average_deviation;
int m_mean = 0;
int m_average_deviation = 0;
// the number of samples we have received, but no more than inverted_gain
// this is the effective inverted_gain
int m_num_samples;
int m_num_samples = 0;
};
struct average_accumulator
{
average_accumulator()
: m_num_samples(0)
, m_sample_sum(0)
{}
average_accumulator() {}
void add_sample(int s)
{
@ -108,8 +107,10 @@ struct average_accumulator
return ret;
}
int m_num_samples;
std::uint64_t m_sample_sum;
private:
int m_num_samples = 0;
std::uint64_t m_sample_sum = 0;
};
}
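The receive buffer's new watermark (see receive_buffer.hpp above) is one of these: a fixed-point exponential moving average whose effective gain becomes 1/inverted_gain once enough samples have arrived. A minimal usage sketch with made-up sample values:

#include "libtorrent/sliding_average.hpp"

int watermark_sketch()
{
	libtorrent::sliding_average<20> watermark;
	watermark.add_sample(16000); // one round that used a lot of the buffer
	for (int i = 0; i < 15; ++i)
		watermark.add_sample(160); // many small rounds
	// mean() has drifted down toward the recent small samples
	return watermark.mean();
}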

View File

@ -42,6 +42,10 @@ namespace libtorrent { namespace aux
{
stack_allocator() {}
// non-copyable
stack_allocator(stack_allocator const&) = delete;
stack_allocator& operator=(stack_allocator const&) = delete;
int copy_string(std::string const& str)
{
int ret = int(m_storage.size());
@ -100,11 +104,7 @@ namespace libtorrent { namespace aux
private:
// non-copyable
stack_allocator(stack_allocator const&);
stack_allocator& operator=(stack_allocator const&);
buffer m_storage;
std::vector<char> m_storage;
};
} }

View File

@ -1105,12 +1105,11 @@ namespace libtorrent
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
bool merkle = static_cast<std::uint8_t>(recv_buffer.begin[0]) == 250;
bool const merkle = static_cast<std::uint8_t>(recv_buffer.begin[0]) == 250;
if (merkle)
{
if (recv_pos == 1)
{
m_recv_buffer.set_soft_packet_size(13);
received_bytes(0, received);
return;
}
@ -1119,13 +1118,10 @@ namespace libtorrent
received_bytes(0, received);
return;
}
if (recv_pos == 13)
if (recv_pos >= 13)
{
const char* ptr = recv_buffer.begin + 9;
int list_size = detail::read_int32(ptr);
// now we know how long the bencoded hash list is
// and we can allocate the disk buffer and receive
// into it
char const* ptr = recv_buffer.begin + 9;
int const list_size = detail::read_int32(ptr);
if (list_size > m_recv_buffer.packet_size() - 13)
{

View File

@ -91,12 +91,13 @@ namespace libtorrent { namespace aux
m_have_pieces.set_bit(piece);
#endif
int size = (std::min)(std::uint64_t(piece_size), total_size - off);
TORRENT_ASSERT(total_size >= off);
std::int64_t size = (std::min)(std::uint64_t(piece_size), total_size - off);
TORRENT_ASSERT(size >= 0);
while (size)
{
int add = (std::min)(std::int64_t(size), fs.file_size(file_index) - file_offset);
std::int64_t add = (std::min)(size, fs.file_size(file_index) - file_offset);
TORRENT_ASSERT(add >= 0);
m_file_progress[file_index] += add;

View File

@ -4485,6 +4485,7 @@ namespace libtorrent
p.used_send_buffer = m_send_buffer.size();
p.receive_buffer_size = m_recv_buffer.capacity();
p.used_receive_buffer = m_recv_buffer.pos();
p.receive_buffer_watermark = m_recv_buffer.watermark();
p.write_state = m_channel_state[upload_channel];
p.read_state = m_channel_state[download_channel];
@ -5409,10 +5410,11 @@ namespace libtorrent
}
}
int peer_connection::request_bandwidth(int channel, int bytes)
int peer_connection::request_bandwidth(int const channel, int bytes)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
// we can only have one outstanding bandwidth request at a time
if (m_channel_state[channel] & peer_info::bw_limit) return 0;
@ -5652,8 +5654,15 @@ namespace libtorrent
if (m_disconnecting) return;
if (m_recv_buffer.capacity() < 100
&& m_recv_buffer.max_receive() == 0)
{
m_recv_buffer.reserve(100);
}
// we may want to request more quota at this point
request_bandwidth(download_channel);
int const buffer_size = m_recv_buffer.max_receive();
request_bandwidth(download_channel, buffer_size);
if (m_channel_state[download_channel] & peer_info::bw_network) return;
@ -5680,94 +5689,27 @@ namespace libtorrent
// from being at or exceeding the limit down to below the limit
return;
}
error_code ec;
try_read(read_async, ec);
}
size_t peer_connection::try_read(sync_t s, error_code& ec)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_connected);
if (m_quota[download_channel] == 0) return;
if (m_quota[download_channel] == 0)
{
ec = boost::asio::error::would_block;
return 0;
}
int const quota_left = m_quota[download_channel];
int const max_receive = (std::min)(buffer_size, quota_left);
if (!can_read())
{
ec = boost::asio::error::would_block;
return 0;
}
int max_receive = m_recv_buffer.max_receive();
// only apply the contiguous receive buffer when we don't have any
// outstanding requests. When we're likely to receive pieces, we'll
// save more time from avoiding copying data from the socket
if (m_download_queue.empty())
{
if (s == read_sync)
{
ec = boost::asio::error::would_block;
return 0;
}
TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0);
m_channel_state[download_channel] |= peer_info::bw_network;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "ASYNC_READ");
#endif
ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data_nb");
m_socket->async_read_some(null_buffers(), make_read_handler(
std::bind(&peer_connection::on_receive_data_nb, self(), _1, _2)));
return 0;
}
TORRENT_ASSERT(max_receive >= 0);
int quota_left = m_quota[download_channel];
if (max_receive > quota_left)
max_receive = quota_left;
if (max_receive == 0)
{
ec = boost::asio::error::would_block;
return 0;
}
if (max_receive == 0) return;
boost::asio::mutable_buffer const vec = m_recv_buffer.reserve(max_receive);
if (s == read_async)
{
TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0);
m_channel_state[download_channel] |= peer_info::bw_network;
TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0);
m_channel_state[download_channel] |= peer_info::bw_network;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "ASYNC_READ"
, "max: %d bytes", max_receive);
peer_log(peer_log_alert::incoming, "ASYNC_READ"
, "max: %d bytes", max_receive);
#endif
// utp sockets aren't thread safe...
ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data");
m_socket->async_read_some(
boost::asio::mutable_buffers_1(vec), make_read_handler(
std::bind(&peer_connection::on_receive_data, self(), _1, _2)));
return 0;
}
size_t const ret = m_socket->read_some(boost::asio::mutable_buffers_1(vec), ec);
// this is weird. You would imagine read_some() would do this
if (ret == 0 && !ec) ec = boost::asio::error::eof;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s"
, max_receive, int(ret), ec ? ec.message().c_str() : "");
#endif
return ret;
// utp sockets aren't thread safe...
ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data");
m_socket->async_read_some(
boost::asio::mutable_buffers_1(vec), make_read_handler(
std::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
void peer_connection::append_send_buffer(char* buffer, int size
@ -5859,112 +5801,31 @@ namespace libtorrent
bool m_cond;
};
void peer_connection::on_receive_data_nb(const error_code& error
, std::size_t bytes_transferred)
{
TORRENT_ASSERT(is_single_thread());
COMPLETE_ASYNC("peer_connection::on_receive_data_nb");
// leave this bit set until we're done looping, reading from the socket.
// that way we don't trigger any async read calls until the end of this
// function.
TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
// nb is short for null_buffers. In this mode we don't actually
// allocate a receive buffer up-front, but get notified when
// we can read from the socket, and then determine how much there
// is to read.
if (error)
{
TORRENT_ASSERT_VAL(error.value() != 0, error.value());
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "ERROR"
, "in peer_connection::on_receive_data_nb error: (%s:%d) %s"
, error.category().name(), error.value()
, error.message().c_str());
#endif
on_receive(error, bytes_transferred);
disconnect(error, op_sock_read);
return;
}
error_code ec;
std::size_t buffer_size = m_socket->available(ec);
if (ec)
{
disconnect(ec, op_available);
return;
}
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "READ_AVAILABLE"
, "bytes: %d", int(buffer_size));
#endif
// at this point the ioctl told us the socket doesn't have any
// pending bytes. This probably means some error happened.
// in order to find out though, we need to initiate a read
// operation
if (buffer_size == 0)
{
// try to read one byte. The socket is non-blocking anyway
// so worst case, we'll fail with EWOULDBLOCK
buffer_size = 1;
}
else
{
if (buffer_size > m_quota[download_channel])
{
request_bandwidth(download_channel, int(buffer_size));
buffer_size = m_quota[download_channel];
}
// we're already waiting to get some more
// quota from the bandwidth manager
if (buffer_size == 0)
{
// allow reading from the socket again
TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
m_channel_state[download_channel] &= ~peer_info::bw_network;
return;
}
}
if (buffer_size > 2097152) buffer_size = 2097152;
boost::asio::mutable_buffer buffer = m_recv_buffer.reserve(int(buffer_size));
TORRENT_ASSERT(m_recv_buffer.normalized());
bytes_transferred = m_socket->read_some(boost::asio::mutable_buffers_1(buffer), ec);
if (ec)
{
if (ec == boost::asio::error::try_again
|| ec == boost::asio::error::would_block)
{
// allow reading from the socket again
TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
m_channel_state[download_channel] &= ~peer_info::bw_network;
setup_receive();
return;
}
disconnect(ec, op_sock_read);
return;
}
receive_data_impl(error, bytes_transferred, 0);
}
// --------------------------
// RECEIVE DATA
// --------------------------
// nb is true if this callback is due to a null_buffers()
// invocation of async_read_some(). In that case, we need
// to disregard bytes_transferred.
// at all exit points of this function, one of the following MUST hold:
// 1. the socket is disconnecting
// 2. m_channel_state[download_channel] & peer_info::bw_network == 0
void peer_connection::account_received_bytes(int const bytes_transferred)
{
// tell the receive buffer we just fed it this many bytes of incoming data
TORRENT_ASSERT(bytes_transferred > 0);
m_recv_buffer.received(bytes_transferred);
// update the dl quota
TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]);
m_quota[download_channel] -= bytes_transferred;
// account receiver buffer size stats to the session
m_ses.received_buffer(bytes_transferred);
// estimate transport protocol overhead
trancieve_ip_packet(bytes_transferred, m_remote.address().is_v6());
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "READ"
, "%d bytes", int(bytes_transferred));
#endif
}
void peer_connection::on_receive_data(const error_code& error
, std::size_t bytes_transferred)
@ -5972,6 +5833,13 @@ namespace libtorrent
TORRENT_ASSERT(is_single_thread());
COMPLETE_ASYNC("peer_connection::on_receive_data");
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA"
, "bytes: %d error: (%s:%d) %s"
, int(bytes_transferred), error.category().name(), error.value()
, error.message().c_str());
#endif
// leave this bit set until we're done looping, reading from the socket.
// that way we don't trigger any async read calls until the end of this
// function.
@ -5979,19 +5847,23 @@ namespace libtorrent
TORRENT_ASSERT(bytes_transferred > 0 || error);
receive_data_impl(error, bytes_transferred, 10);
}
m_counters.inc_stats_counter(counters::on_read_counter);
void peer_connection::receive_data_impl(const error_code& error
, std::size_t bytes_transferred, int read_loops)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
if (error)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA"
, "bytes: %d error: (%s:%d) %s"
, int(bytes_transferred), error.category().name(), error.value()
, error.message().c_str());
peer_log(peer_log_alert::info, "ERROR"
, "in peer_connection::on_receive_data_impl error: %s"
, error.message().c_str());
#endif
on_receive(error, bytes_transferred);
disconnect(error, op_sock_read);
return;
}
m_last_receive = aux::time_now();
// submit all disk jobs later
m_ses.deferred_submit_jobs();
@ -6005,27 +5877,14 @@ namespace libtorrent
// flush the send buffer at the end of this function
cork _c(*this);
INVARIANT_CHECK;
int bytes_in_loop = int(bytes_transferred);
if (error)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "ERROR"
, "in peer_connection::on_receive_data_impl error: %s"
, error.message().c_str());
#endif
trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
on_receive(error, bytes_transferred);
disconnect(error, op_sock_read);
return;
}
TORRENT_ASSERT(bytes_transferred > 0);
m_counters.inc_stats_counter(counters::on_read_counter);
m_ses.received_buffer(int(bytes_transferred));
// if we received exactly as many bytes as we provided a receive buffer
// for, there are most likely more bytes to read, and we should grow our
// receive buffer.
TORRENT_ASSERT(bytes_transferred <= m_recv_buffer.max_receive());
bool const grow_buffer = (bytes_transferred == m_recv_buffer.max_receive());
account_received_bytes(bytes_transferred);
if (m_extension_outstanding_bytes > 0)
m_extension_outstanding_bytes -= (std::min)(m_extension_outstanding_bytes, int(bytes_transferred));
@ -6033,79 +5892,91 @@ namespace libtorrent
check_graceful_pause();
if (m_disconnecting) return;
int num_loops = 0;
do
// this is the case where we try to grow the receive buffer and try to
// drain the socket
if (grow_buffer)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "READ"
, "%d bytes", int(bytes_transferred));
#endif
// correct the dl quota usage, if not all of the buffer was actually read
TORRENT_ASSERT(int(bytes_transferred) <= m_quota[download_channel]);
m_quota[download_channel] -= int(bytes_transferred);
if (m_disconnecting)
{
trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
return;
}
TORRENT_ASSERT(bytes_transferred > 0);
m_recv_buffer.received(int(bytes_transferred));
int bytes = int(bytes_transferred);
int sub_transferred = 0;
do {
// TODO: The stats checks can not be honored when authenticated encryption is in use
// because we may have encrypted data which we cannot authenticate yet
#if 0
std::int64_t cur_payload_dl = m_statistics.last_payload_downloaded();
std::int64_t cur_protocol_dl = m_statistics.last_protocol_downloaded();
#endif
sub_transferred = m_recv_buffer.advance_pos(bytes);
on_receive(error, sub_transferred);
bytes -= sub_transferred;
TORRENT_ASSERT(sub_transferred > 0);
#if 0
TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0);
TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0);
std::int64_t stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl +
m_statistics.last_protocol_downloaded() - cur_protocol_dl;
TORRENT_ASSERT(stats_diff == int(sub_transferred));
#endif
if (m_disconnecting) return;
} while (bytes > 0 && sub_transferred > 0);
m_recv_buffer.normalize();
TORRENT_ASSERT(m_recv_buffer.pos_at_end());
TORRENT_ASSERT(m_recv_buffer.packet_size() > 0);
if (m_peer_choked)
{
m_recv_buffer.clamp_size();
}
if (num_loops > read_loops) break;
error_code ec;
bytes_transferred = try_read(read_sync, ec);
TORRENT_ASSERT(bytes_transferred > 0 || ec);
if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again) break;
std::size_t buffer_size = m_socket->available(ec);
if (ec)
{
trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
disconnect(ec, op_sock_read);
disconnect(ec, op_available);
return;
}
bytes_in_loop += int(bytes_transferred);
++num_loops;
}
while (bytes_transferred > 0);
m_last_receive = aux::time_now();
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "AVAILABLE"
, "%d bytes", int(buffer_size));
#endif
request_bandwidth(download_channel, buffer_size);
int const quota_left = m_quota[download_channel];
if (buffer_size > quota_left) buffer_size = quota_left;
if (buffer_size > 0)
{
boost::asio::mutable_buffer const vec = m_recv_buffer.reserve(buffer_size);
size_t bytes = m_socket->read_some(boost::asio::mutable_buffers_1(vec), ec);
// this is weird. You would imagine read_some() would do this
if (bytes == 0 && !ec) ec = boost::asio::error::eof;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s"
, int(buffer_size), int(bytes), ec ? ec.message().c_str() : "");
#endif
TORRENT_ASSERT(bytes > 0 || ec);
if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again)
{
bytes = 0;
}
else if (ec)
{
disconnect(ec, op_sock_read);
return;
}
account_received_bytes(bytes);
bytes_transferred += bytes;
}
}
// feed bytes in receive buffer to upper layer by calling on_receive()
bool const prev_choked = m_peer_choked;
int bytes = int(bytes_transferred);
int sub_transferred = 0;
do {
sub_transferred = m_recv_buffer.advance_pos(bytes);
on_receive(error, sub_transferred);
bytes -= sub_transferred;
TORRENT_ASSERT(sub_transferred > 0);
if (m_disconnecting) return;
} while (bytes > 0 && sub_transferred > 0);
// if the peer went from unchoked to choked, suggest to the receive
// buffer that it shrinks to 100 bytes
int const force_shrink = (m_peer_choked && !prev_choked)
? 100 : 0;
m_recv_buffer.normalize(force_shrink);
if (m_recv_buffer.max_receive() == 0)
{
// the message we're receiving is larger than our receive
// buffer; we must grow it.
int const buffer_size_limit
= m_settings.get_int(settings_pack::max_peer_recv_buffer_size);
m_recv_buffer.grow(buffer_size_limit);
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming, "GROW_BUFFER", "%d bytes"
, m_recv_buffer.capacity());
#endif
}
TORRENT_ASSERT(m_recv_buffer.pos_at_end());
TORRENT_ASSERT(m_recv_buffer.packet_size() > 0);
if (is_seed())
{
@ -6113,8 +5984,6 @@ namespace libtorrent
if (t) t->seen_complete();
}
trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
// allow reading from the socket again
TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
m_channel_state[download_channel] &= ~peer_info::bw_network;
@ -6345,16 +6214,14 @@ namespace libtorrent
time_point now = clock_type::now();
for (std::vector<pending_block>::iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i)
for (auto& block : m_download_queue)
{
if (i->send_buffer_offset == pending_block::not_in_buffer) continue;
std::int32_t offset = i->send_buffer_offset;
offset -= int(bytes_transferred);
if (offset < 0)
i->send_buffer_offset = pending_block::not_in_buffer;
if (block.send_buffer_offset == pending_block::not_in_buffer)
continue;
if (block.send_buffer_offset < int(bytes_transferred))
block.send_buffer_offset = pending_block::not_in_buffer;
else
i->send_buffer_offset = offset;
block.send_buffer_offset -= int(bytes_transferred);
}
m_channel_state[upload_channel] &= ~peer_info::bw_network;
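The chunking loop in the new on_receive_data() (the do/while over advance_pos() above) hands bytes to the upper layer one packet-bounded piece at a time. A stand-alone sketch of the same mechanics with made-up sizes; note that in the real code it is the concrete connection's message parser, not the loop itself, that calls cut():

#include "libtorrent/receive_buffer.hpp"

void chunking_sketch()
{
	libtorrent::receive_buffer b;
	b.reset(10); // expect 10-byte packets
	b.reserve(25);
	b.received(25); // 2.5 packets worth of bytes arrived at once

	int bytes = 25;
	int chunk = 0;
	do {
		// advance_pos() never crosses a packet boundary, so this
		// yields 10, 10 and finally 5 bytes
		chunk = b.advance_pos(bytes);
		bytes -= chunk;
		if (b.packet_finished())
			b.cut(10, 10); // drop the parsed packet, expect the next
	} while (bytes > 0 && chunk > 0);
	b.normalize();
}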

View File

@ -34,61 +34,65 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent {
namespace {
int round_up8(int v)
{
return ((v & 7) == 0) ? v : v + (8 - (v & 7));
}
}
int receive_buffer::max_receive()
int receive_buffer::max_receive() const
{
int max = packet_bytes_remaining();
if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0;
if (m_soft_packet_size && max > m_soft_packet_size - m_recv_pos)
max = m_soft_packet_size - m_recv_pos;
return max;
return int(m_recv_buffer.size() - m_recv_end);
}
boost::asio::mutable_buffer receive_buffer::reserve(int size)
{
TORRENT_ASSERT(size > 0);
TORRENT_ASSERT(m_recv_pos >= 0);
// this is unintuitive, but we used to use m_recv_pos in this function when
// we should have used m_recv_end. perhaps they always happen to be equal
TORRENT_ASSERT(m_recv_pos == m_recv_end);
// normalize() must be called before receiving more data
TORRENT_ASSERT(m_recv_start == 0);
m_recv_buffer.resize(m_recv_end + size);
if (m_recv_buffer.size() < m_recv_end + size)
{
int const new_size = std::max(m_recv_end + size, m_packet_size);
buffer new_buffer(new_size
, aux::array_view<char const>(m_recv_buffer.ptr(), m_recv_end));
m_recv_buffer = std::move(new_buffer);
// since we just increased the size of the buffer, reset the watermark to
// start at our new size (avoid flapping the buffer size)
m_watermark = sliding_average<20>();
}
return boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size);
}
void receive_buffer::grow(int const limit)
{
int const current_size = int(m_recv_buffer.size());
TORRENT_ASSERT(current_size < std::numeric_limits<int>::max() / 3);
// first grow to one piece message, then grow by 50% each time
int const new_size = (current_size < m_packet_size)
? m_packet_size : std::min(current_size * 3 / 2, limit);
// re-allocate the buffer and copy over the part of it that's used
buffer new_buffer(new_size
, aux::array_view<char const>(m_recv_buffer.ptr(), m_recv_end));
m_recv_buffer = std::move(new_buffer);
// since we just increased the size of the buffer, reset the watermark to
// start at our new size (avoid flapping the buffer size)
m_watermark = sliding_average<20>();
}
int receive_buffer::advance_pos(int bytes)
{
int const packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size;
int const limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size;
int const limit = m_packet_size > m_recv_pos ? m_packet_size - m_recv_pos : m_packet_size;
int const sub_transferred = (std::min)(bytes, limit);
m_recv_pos += sub_transferred;
if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0;
return sub_transferred;
}
void receive_buffer::clamp_size()
{
if (m_recv_pos == 0
&& (m_recv_buffer.capacity() - m_packet_size) > 128)
{
// round up to an even 8 bytes since that's the RC4 blocksize
buffer(round_up8(m_packet_size)).swap(m_recv_buffer);
}
}
// size = the packet size to remove from the receive buffer
// packet_size = the next packet size to receive in the buffer
// offset = the offset into the receive buffer where to remove `size` bytes
void receive_buffer::cut(int size, int packet_size, int offset)
void receive_buffer::cut(int const size, int const packet_size, int const offset)
{
TORRENT_ASSERT(packet_size > 0);
TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
@ -104,9 +108,11 @@ void receive_buffer::cut(int size, int packet_size, int offset)
TORRENT_ASSERT(m_recv_start - size <= m_recv_end);
if (size > 0)
{
std::memmove(&m_recv_buffer[0] + m_recv_start + offset
, &m_recv_buffer[0] + m_recv_start + offset + size
, m_recv_end - m_recv_start - size - offset);
}
m_recv_pos -= size;
m_recv_end -= size;
@ -168,13 +174,41 @@ boost::asio::mutable_buffer receive_buffer::mutable_buffer(int const bytes)
// the purpose of this function is to free up and cut off all messages
// in the receive buffer that have been parsed and processed.
void receive_buffer::normalize()
// it may also shrink the size of the buffer allocation if we haven't been using
// enough of it lately.
void receive_buffer::normalize(int force_shrink)
{
TORRENT_ASSERT(m_recv_end >= m_recv_start);
if (m_recv_start == 0) return;
if (m_recv_end > m_recv_start)
std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + m_recv_start, m_recv_end - m_recv_start);
m_watermark.add_sample(std::max(m_recv_end, m_packet_size));
// if the running average drops below half of the current buffer size,
// reallocate a smaller one.
bool const shrink_buffer = m_recv_buffer.size() / 2 > m_watermark.mean()
&& m_watermark.mean() > (m_recv_end - m_recv_start);
aux::array_view<char const> bytes_to_shift(
m_recv_buffer.ptr() + m_recv_start
, m_recv_end - m_recv_start);
if (force_shrink)
{
const int target_size = std::max(std::max(force_shrink
, int(bytes_to_shift.size())), m_packet_size);
buffer new_buffer(target_size, bytes_to_shift);
m_recv_buffer = std::move(new_buffer);
}
else if (shrink_buffer)
{
buffer new_buffer(m_watermark.mean(), bytes_to_shift);
m_recv_buffer = std::move(new_buffer);
}
else if (m_recv_end > m_recv_start
&& m_recv_start > 0)
{
std::memmove(m_recv_buffer.ptr(), bytes_to_shift.data()
, bytes_to_shift.size());
}
m_recv_end -= m_recv_start;
m_recv_start = 0;
@ -276,24 +310,14 @@ void crypto_receive_buffer::crypto_reset(int packet_size)
}
}
void crypto_receive_buffer::set_soft_packet_size(int size)
{
if (m_recv_pos == INT_MAX)
m_connection_buffer.set_soft_packet_size(size);
else
m_soft_packet_size = size;
}
int crypto_receive_buffer::advance_pos(int bytes)
{
if (m_recv_pos == INT_MAX) return bytes;
int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size;
int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size;
int sub_transferred = (std::min)(bytes, limit);
int const limit = m_packet_size > m_recv_pos ? m_packet_size - m_recv_pos : m_packet_size;
int const sub_transferred = (std::min)(bytes, limit);
m_recv_pos += sub_transferred;
m_connection_buffer.cut(0, m_connection_buffer.packet_size() + sub_transferred);
if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0;
return sub_transferred;
}
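The growth policy implemented by grow() above is therefore: below one full packet, jump straight to the packet size; after that, grow by 50% per step, never exceeding the configured limit. The same arithmetic in isolation (a sketch, not code from this commit):

#include <algorithm>

// mirrors receive_buffer::grow(): the next allocation size given the
// current size, the current packet size and the configured cap
int next_size(int const current, int const packet_size, int const limit)
{
	return current < packet_size
		? packet_size
		: (std::min)(current * 3 / 2, limit);
}
// e.g. next_size(200, 0, 100000) == 300, next_size(300, 0, 100000) == 450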

View File

@ -84,6 +84,8 @@ namespace libtorrent
set.set_bool(settings_pack::contiguous_recv_buffer, false);
#endif
set.set_int(settings_pack::max_peer_recv_buffer_size, 32 * 1024 + 200);
set.set_int(settings_pack::disk_io_write_mode, settings_pack::disable_os_cache);
set.set_int(settings_pack::disk_io_read_mode, settings_pack::disable_os_cache);
@ -171,6 +173,8 @@ namespace libtorrent
set.set_int(settings_pack::max_out_request_queue, 1500);
set.set_int(settings_pack::max_allowed_in_request_queue, 2000);
set.set_int(settings_pack::max_peer_recv_buffer_size, 5 * 1024 * 1024);
// we will probably see a high rate of alerts, so make it less
// likely to lose alerts
set.set_int(settings_pack::alert_queue_size, 10000);

View File

@ -279,6 +279,7 @@ namespace libtorrent
SET(max_rejects, 50, 0),
SET(recv_socket_buffer_size, 0, &session_impl::update_socket_buffer_size),
SET(send_socket_buffer_size, 0, &session_impl::update_socket_buffer_size),
SET_NOPREV(max_peer_recv_buffer_size, 2 * 1024 * 1024, 0),
SET(file_checks_delay_per_block, 0, 0),
SET(read_cache_line_size, 32, 0),
SET(write_cache_line_size, 16, 0),

View File

@ -43,125 +43,97 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent;
/*
template<class T>
T const& min_(T const& x, T const& y)
{
return x < y ? x : y;
}
void test_speed()
{
buffer b;
char data[32];
srand(0);
boost::timer t;
int const iterations = 5000000;
int const step = iterations / 20;
for (int i = 0; i < iterations; ++i)
{
int x = rand();
if (i % step == 0) std::cerr << ".";
std::size_t n = rand() % 32;
n = 32;
if (x % 2)
{
b.insert(data, data + n);
}
else
{
b.erase(min_(b.size(), n));
}
}
float t1 = t.elapsed();
std::cerr << "buffer elapsed: " << t.elapsed() << "\n";
std::vector<char> v;
srand(0);
t.restart();
for (int i = 0; i < iterations; ++i)
{
int x = rand();
if (i % step == 0) std::cerr << ".";
std::size_t n = rand() % 32;
n = 32;
if (x % 2)
{
v.insert(v.end(), data, data + n);
}
else
{
v.erase(v.begin(), v.begin() + min_(v.size(), n));
}
}
float t2 = t.elapsed();
std::cerr << "std::vector elapsed: " << t.elapsed() << "\n";
assert(t1 < t2);
}
*/
// -- test buffer --
TORRENT_TEST(buffer)
static char const data[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
TORRENT_TEST(buffer_constructor)
{
char data[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
buffer b;
{
buffer b;
TEST_CHECK(b.size() == 0);
TEST_CHECK(b.empty());
}
TEST_CHECK(b.size() == 0);
TEST_CHECK(b.capacity() == 0);
TEST_CHECK(b.empty());
{
buffer b(10);
TEST_CHECK(b.size() >= 10);
}
b.resize(10);
TEST_CHECK(b.size() == 10);
TEST_CHECK(b.capacity() == 10);
{
buffer b(50, data);
TEST_CHECK(std::memcmp(b.ptr(), data, 10) == 0);
TEST_CHECK(b.size() >= 50);
}
}
std::memcpy(b.begin(), data, 10);
b.reserve(50);
TEST_CHECK(std::memcmp(b.begin(), data, 10) == 0);
TEST_CHECK(b.capacity() == 50);
TORRENT_TEST(buffer_swap)
{
buffer b1;
TEST_CHECK(b1.size() == 0);
buffer b2(10, data);
std::size_t const b2_size = b2.size();
TEST_CHECK(b2_size >= 10);
b.erase(b.begin() + 6, b.end());
TEST_CHECK(std::memcmp(b.begin(), data, 6) == 0);
TEST_CHECK(b.capacity() == 50);
TEST_CHECK(b.size() == 6);
b1.swap(b2);
b.insert(b.begin(), data + 5, data + 10);
TEST_CHECK(b.capacity() == 50);
TEST_CHECK(b.size() == 11);
TEST_CHECK(std::memcmp(b.begin(), data + 5, 5) == 0);
TEST_CHECK(b2.size() == 0);
TEST_CHECK(b1.size() == b2_size);
TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0);
}
b.clear();
TEST_CHECK(b.size() == 0);
TEST_CHECK(b.capacity() == 50);
TORRENT_TEST(buffer_subscript)
{
buffer b(50, data);
TEST_CHECK(std::memcmp(b.ptr(), data, 10) == 0);
TEST_CHECK(b.size() >= 50);
b.insert(b.end(), data, data + 10);
TEST_CHECK(b.size() == 10);
TEST_CHECK(std::memcmp(b.begin(), data, 10) == 0);
for (int i = 0; i < int(sizeof(data)/sizeof(data[0])); ++i)
TEST_CHECK(b[i] == data[i]);
}
b.erase(b.begin(), b.end());
TEST_CHECK(b.capacity() == 50);
TEST_CHECK(b.size() == 0);
TORRENT_TEST(buffer_subscript2)
{
buffer b(1);
TEST_CHECK(b.size() >= 1);
buffer().swap(b);
TEST_CHECK(b.capacity() == 0);
for (int i = 0; i < int(b.size()); ++i)
b[i] = i & 0xff;
for (int i = 0; i < int(b.size()); ++i)
TEST_CHECK(b[i] == (i & 0xff));
}
TORRENT_TEST(buffer_move_construct)
{
buffer b1(50, data);
TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0);
TEST_CHECK(b1.size() >= 50);
buffer b2(std::move(b1));
TEST_CHECK(b1.size() == 0);
TEST_CHECK(std::memcmp(b2.ptr(), data, 10) == 0);
TEST_CHECK(b2.size() >= 50);
}
TORRENT_TEST(buffer_move_assign)
{
buffer b1(50, data);
TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0);
TEST_CHECK(b1.size() >= 50);
buffer b2;
TEST_CHECK(b2.size() == 0);
b2 = std::move(b1);
TEST_CHECK(b1.size() == 0);
TEST_CHECK(std::memcmp(b2.ptr(), data, 10) == 0);
TEST_CHECK(b2.size() >= 50);
}
// -- test chained buffer --

View File

@ -92,6 +92,128 @@ TORRENT_TEST(recv_buffer_packet_finished)
TEST_EQUAL(b.packet_finished(), true);
}
TORRENT_TEST(recv_buffer_grow_floor)
{
receive_buffer b;
b.reset(1337);
b.grow(100000);
// the exact size depends on the OS allocator. Technically there's no upper
// bound, but it's likely within some reasonable size
TEST_CHECK(b.capacity() >= 1337);
TEST_CHECK(b.capacity() < 1337 + 1000);
}
TORRENT_TEST(recv_buffer_grow)
{
receive_buffer b;
b.reserve(200);
b.grow(100000);
// grow by 50%
TEST_CHECK(b.capacity() >= 300);
TEST_CHECK(b.capacity() < 300 + 500);
}
TORRENT_TEST(recv_buffer_grow_limit)
{
receive_buffer b;
b.reserve(2000);
b.grow(2100);
// grow by 50%, but capped by 2100 bytes
TEST_CHECK(b.capacity() >= 2100);
TEST_CHECK(b.capacity() < 2100 + 500);
printf("capacity: %d\n", b.capacity());
}
TORRENT_TEST(recv_buffer_reserve_minimum_grow)
{
receive_buffer b;
b.reset(1337);
b.reserve(20);
// we only asked for 20 more bytes, but since the message size was set to
// 1337, that's the minimum size to grow to
TEST_CHECK(b.capacity() >= 1337);
TEST_CHECK(b.capacity() < 1337 + 1000);
}
TORRENT_TEST(recv_buffer_reserve_grow)
{
receive_buffer b;
b.reserve(20);
TEST_CHECK(b.capacity() >= 20);
TEST_CHECK(b.capacity() < 20 + 500);
}
TORRENT_TEST(recv_buffer_reserve)
{
receive_buffer b;
auto range1 = b.reserve(100);
int const capacity = b.capacity();
b.reset(20);
b.received(20);
TEST_EQUAL(b.capacity(), capacity);
auto range2 = b.reserve(50);
using namespace boost::asio;
TEST_EQUAL(b.capacity(), capacity);
TEST_EQUAL(buffer_cast<char*>(range1) + 20, buffer_cast<char*>(range2));
TEST_CHECK(buffer_size(range1) >= 20);
TEST_CHECK(buffer_size(range2) >= 50);
}
TORRENT_TEST(receive_buffer_normalize)
{
receive_buffer b;
b.reset(16000);
// receive one large packet, to allocate a large receive buffer
for (int i = 0; i < 16; ++i)
{
b.reserve(1000);
b.received(1000);
b.normalize();
}
TEST_CHECK(b.capacity() >= 16000);
int const start_capacity = b.capacity();
// then receive lots of small packets. We should eventually re-allocate down
// to a smaller buffer
for (int i = 0; i < 15; ++i)
{
b.reset(160);
b.reserve(160);
b.received(160);
b.normalize();
printf("capacity: %d watermark: %d\n", b.capacity(), b.watermark());
}
TEST_CHECK(b.capacity() <= start_capacity / 2);
printf("capacity: %d\n", b.capacity());
}
TORRENT_TEST(receive_buffer_max_receive)
{
receive_buffer b;
b.reset(2000);
b.reserve(2000);
b.received(2000);
b.normalize();
b.reset(20);
int const max_receive = b.max_receive();
TEST_CHECK(max_receive >= 2000);
b.received(20);
TEST_EQUAL(b.max_receive(), max_receive - 20);
}
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
TORRENT_TEST(recv_buffer_mutable_buffers)