diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 3c51429d3..c7d512f13 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -389,7 +389,7 @@ int print_peer_info(std::string& out if (print_ip) out += "IP "; out += "progress down (total | peak ) up (total | peak ) sent-req tmo bsy rcv flags dn up source "; if (print_fails) out += "fail hshf "; - if (print_send_bufs) out += "rq sndb rcvb q-bytes "; + if (print_send_bufs) out += "rq sndb (recvb |alloc | wmrk ) q-bytes "; if (print_timers) out += "inactive wait timeout q-time "; out += " v disk ^ rtt "; if (print_block) out += "block-progress "; @@ -471,9 +471,11 @@ int print_peer_info(std::string& out } if (print_send_bufs) { - std::snprintf(str, sizeof(str), "%2d %6d %6d%5dkB " + std::snprintf(str, sizeof(str), "%2d %6d %6d|%6d|%6d%5dkB " , i->requests_in_buffer, i->used_send_buffer , i->used_receive_buffer + , i->receive_buffer_size + , i->receive_buffer_watermark , i->queue_bytes / 1000); out += str; } diff --git a/include/libtorrent/aux_/array_view.hpp b/include/libtorrent/aux_/array_view.hpp index 2c9b438dc..f833860d7 100644 --- a/include/libtorrent/aux_/array_view.hpp +++ b/include/libtorrent/aux_/array_view.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define TORRENT_ARRAY_VIEW_HPP_INCLUDED #include +#include #include // for std::is_convertible #include "libtorrent/assert.hpp" diff --git a/include/libtorrent/aux_/suggest_piece.hpp b/include/libtorrent/aux_/suggest_piece.hpp index bfe4f4d63..6ff363cdf 100644 --- a/include/libtorrent/aux_/suggest_piece.hpp +++ b/include/libtorrent/aux_/suggest_piece.hpp @@ -107,10 +107,6 @@ struct suggest_piece } m_priority_pieces.push_back(index); - - std::printf("SUGGEST: "); - for (int p : m_priority_pieces) std::printf(" %d", p); - std::printf("\n"); } private: diff --git a/include/libtorrent/buffer.hpp b/include/libtorrent/buffer.hpp index f1f26b727..192560849 100644 --- a/include/libtorrent/buffer.hpp +++ b/include/libtorrent/buffer.hpp @@ -40,12 +40,34 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/invariant_check.hpp" #include "libtorrent/assert.hpp" +#include "libtorrent/aux_/array_view.hpp" + +#if defined __GLIBC__ +#include +#elif defined _MSC_VER +#include +#elif defined TORRENT_BSD +#include +#endif namespace libtorrent { + namespace { + inline std::size_t round_up8(std::size_t const v) + { + return (v + 7) & (~std::size_t(0x7)); + } + } + +// the buffer is allocated once and cannot be resized. The size() may be +// larger than requested, in case the underlying allocator over allocated. In +// order to "grow" an allocation, create a new buffer and initialize it by +// the range of bytes from the existing, and move-assign the new over the +// old. class buffer { public: + struct interval { interval() @@ -110,31 +132,58 @@ public: char const* end; }; - buffer(std::size_t n = 0) - : m_begin(0) - , m_size(0) - , m_capacity(0) + // allocate an uninitialized buffer of the specified size + buffer(std::size_t size = 0) { - if (n) resize(n); + TORRENT_ASSERT(size < std::size_t(std::numeric_limits::max())); + + if (size == 0) return; + + size = round_up8(size); + + m_begin = static_cast(std::malloc(size)); + if (m_begin == nullptr) + { +#ifndef BOOST_NO_EXCEPTIONS + throw std::bad_alloc(); +#else + std::terminate(); +#endif + } + + // the actual allocation may be larger than we requested. If so, let the + // user take advantage of every single byte +#if defined __GLIBC__ + m_size = malloc_usable_size(m_begin); +#elif defined _MSC_VER + m_size = _msize(m_begin); +#elif defined TORRENT_BSD + m_size = malloc_size(m_begin); +#else + m_size = size; +#endif } - buffer(buffer const& b) - : m_begin(0) - , m_size(0) - , m_capacity(0) + // allocate an uninitialized buffer of the specified size + // and copy the initialization range into the start of the buffer + buffer(std::size_t const size, aux::array_view initialize) + : buffer(size) { - if (b.size() == 0) return; - resize(b.size()); - std::memcpy(m_begin, b.begin(), b.size()); + TORRENT_ASSERT(initialize.size() <= size); + if (initialize.size() > 0) + { + memcpy(m_begin, initialize.data(), std::min(initialize.size(), size)); + } } + buffer(buffer const& b) = delete; + buffer(buffer&& b) : m_begin(b.m_begin) , m_size(b.m_size) - , m_capacity(b.m_capacity) { b.m_begin = nullptr; - b.m_size = b.m_capacity = 0; + b.m_size = 0; } buffer& operator=(buffer&& b) @@ -143,86 +192,30 @@ public: std::free(m_begin); m_begin = b.m_begin; m_size = b.m_size; - m_capacity = b.m_capacity; b.m_begin = nullptr; - b.m_size = b.m_capacity = 0; + b.m_size = 0; return *this; } - buffer& operator=(buffer const& b) - { - if (&b == this) return *this; - resize(b.size()); - if (b.size() == 0) return *this; - std::memcpy(m_begin, b.begin(), b.size()); - return *this; - } + buffer& operator=(buffer const& b) = delete; - ~buffer() - { - std::free(m_begin); - } + ~buffer() { std::free(m_begin); } + + // TODO: 3 fix the naming convention here + char* ptr() { return m_begin; } + char const* ptr() const { return m_begin; } buffer::interval data() { return interval(m_begin, m_begin + m_size); } buffer::const_interval data() const - { return const_interval(m_begin, m_begin + m_size); } + { return interval(m_begin, m_begin + m_size); } - void resize(std::size_t n) - { - TORRENT_ASSERT(n < std::numeric_limits::max()); - reserve(n); - m_size = std::uint32_t(n); - } + operator aux::array_view() + { return aux::array_view(m_begin, int(m_size)); } + operator aux::array_view() const + { return aux::array_view(m_begin, int(m_size)); } - void insert(char* point, char const* first, char const* last) - { - std::size_t p = point - m_begin; - if (point == m_begin + m_size) - { - resize(size() + last - first); - std::memcpy(m_begin + p, first, last - first); - return; - } - - resize(size() + last - first); - std::memmove(m_begin + p + (last - first), m_begin + p, last - first); - std::memcpy(m_begin + p, first, last - first); - } - - void erase(char* b, char* e) - { - TORRENT_ASSERT(e <= m_begin + m_size); - TORRENT_ASSERT(b >= m_begin); - TORRENT_ASSERT(b <= e); - if (e == m_begin + m_size) - { - resize(b - m_begin); - return; - } - std::memmove(b, e, m_begin + m_size - e); - TORRENT_ASSERT(e >= b); - TORRENT_ASSERT(uintptr_t(e - b) <= std::numeric_limits::max()); - TORRENT_ASSERT(uintptr_t(e - b) <= m_size); - m_size -= std::uint32_t(e - b); - } - - void clear() { m_size = 0; } std::size_t size() const { return m_size; } - std::size_t capacity() const { return m_capacity; } - void reserve(std::size_t n) - { - if (n <= capacity()) return; - TORRENT_ASSERT(n > 0); - TORRENT_ASSERT(n < 0xffffffffu); - - char* tmp = static_cast(std::realloc(m_begin, n)); -#ifndef BOOST_NO_EXCEPTIONS - if (tmp == nullptr) throw std::bad_alloc(); -#endif - m_begin = tmp; - m_capacity = std::uint32_t(n); - } bool empty() const { return m_size == 0; } char& operator[](std::size_t i) { TORRENT_ASSERT(i < size()); return m_begin[i]; } @@ -238,15 +231,13 @@ public: using std::swap; swap(m_begin, b.m_begin); swap(m_size, b.m_size); - swap(m_capacity, b.m_capacity); } private: - char* m_begin; - std::uint32_t m_size; - std::uint32_t m_capacity; + char* m_begin = nullptr; + // m_begin points to an allocation of this size. + std::size_t m_size = 0; }; - } #endif // BTORRENT_BUFFER_HPP_INCLUDED diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index ffc28a047..2f87d7fe0 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -719,8 +719,6 @@ namespace libtorrent protected: - size_t try_read(sync_t s, error_code& ec); - virtual void get_specific_peer_info(peer_info& p) const = 0; virtual void write_choke() = 0; @@ -742,6 +740,7 @@ namespace libtorrent virtual void on_connected() = 0; virtual void on_tick() {} + // implemented by concrete connection classes virtual void on_receive(error_code const& error , std::size_t bytes_transferred) = 0; virtual void on_sent(error_code const& error @@ -752,7 +751,10 @@ namespace libtorrent virtual std::tuple> hit_send_barrier(aux::array_view /* iovec */) - { return std::make_tuple(INT_MAX, aux::array_view()); } + { + return std::make_tuple(INT_MAX + , aux::array_view()); + } void attach_to_torrent(sha1_hash const& ih); @@ -760,21 +762,6 @@ namespace libtorrent void update_desired_queue_size(); - // called from the main loop when this connection has any - // work to do. - void on_send_data(error_code const& error - , std::size_t bytes_transferred); - void on_receive_data(error_code const& error - , std::size_t bytes_transferred); - - // _nb means null_buffers. i.e. we just know the socket is - // readable at this point, we don't know how much has been received - void on_receive_data_nb(error_code const& error - , std::size_t bytes_transferred); - - void receive_data_impl(error_code const& error - , std::size_t bytes_transferred, int read_loops); - void set_send_barrier(int bytes) { TORRENT_ASSERT(bytes == INT_MAX || bytes <= send_buffer_size()); @@ -788,6 +775,15 @@ namespace libtorrent io_service& get_io_service() { return m_ios; } private: + + // callbacks for data being sent or received + void on_send_data(error_code const& error + , std::size_t bytes_transferred); + void on_receive_data(error_code const& error + , std::size_t bytes_transferred); + + void account_received_bytes(int bytes_transferred); + // explicitly disallow assignment, to silence msvc warning peer_connection& operator=(peer_connection const&); diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index e372b056a..5a5f371fd 100644 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -246,6 +246,7 @@ namespace libtorrent // allocated and used as receive buffer, respectively. int receive_buffer_size; int used_receive_buffer; + int receive_buffer_watermark; // the number of pieces this peer has participated in sending us that // turned out to fail the hash check. diff --git a/include/libtorrent/receive_buffer.hpp b/include/libtorrent/receive_buffer.hpp index a186171b3..b73ff84ce 100644 --- a/include/libtorrent/receive_buffer.hpp +++ b/include/libtorrent/receive_buffer.hpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include #include namespace libtorrent { @@ -43,14 +44,6 @@ struct TORRENT_EXTRA_EXPORT receive_buffer { friend struct crypto_receive_buffer; - receive_buffer() - : m_recv_start(0) - , m_recv_end(0) - , m_recv_pos(0) - , m_packet_size(0) - , m_soft_packet_size(0) - {} - int packet_size() const { return m_packet_size; } int packet_bytes_remaining() const { @@ -59,19 +52,15 @@ struct TORRENT_EXTRA_EXPORT receive_buffer return m_packet_size - m_recv_pos; } - int max_receive(); + int max_receive() const; bool packet_finished() const { return m_packet_size <= m_recv_pos; } int pos() const { return m_recv_pos; } - int capacity() const { return int(m_recv_buffer.capacity()); } - - int regular_buffer_size() const - { - TORRENT_ASSERT(m_packet_size > 0); - return m_packet_size; - } + int capacity() const { return int(m_recv_buffer.size()); } + int watermark() const { return m_watermark.mean(); } boost::asio::mutable_buffer reserve(int size); + void grow(int limit); // tell the buffer we just received more bytes at the end of it. This will // advance the end cursor @@ -89,11 +78,6 @@ struct TORRENT_EXTRA_EXPORT receive_buffer // has the read cursor reached the end cursor? bool pos_at_end() { return m_recv_pos == m_recv_end; } - // make the buffer size divisible by 8 bytes (RC4 block size) - void clamp_size(); - - void set_soft_packet_size(int size) { m_soft_packet_size = size; } - // size = the packet size to remove from the receive buffer // packet_size = the next packet size to receive in the buffer // offset = the offset into the receive buffer where to remove `size` bytes @@ -104,8 +88,7 @@ struct TORRENT_EXTRA_EXPORT receive_buffer buffer::const_interval get() const; #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) - // returns the entire regular buffer - // should only be used during the handshake + // returns the entire buffer buffer::interval mutable_buffer(); // returns the last 'bytes' from the receive buffer @@ -114,7 +97,7 @@ struct TORRENT_EXTRA_EXPORT receive_buffer // the purpose of this function is to free up and cut off all messages // in the receive buffer that have been parsed and processed. - void normalize(); + void normalize(int force_shrink = 0); bool normalized() const { return m_recv_start == 0; } void reset(int packet_size); @@ -130,16 +113,15 @@ private: // explicitly disallow assignment, to silence msvc warning receive_buffer& operator=(receive_buffer const&); - // recv_buf.begin (start of actual receive buffer) + // m_recv_buffer.data() (start of actual receive buffer) // | - // | m_recv_start (logical start of current - // | | receive buffer, as perceived by upper layers) + // | m_recv_start (tart of current packet) // | | // | | m_recv_pos (number of bytes consumed // | | | by upper layer, from logical receive buffer) // | | | // | x---------x - // | | | recv_buf.end (end of actual receive buffer) + // | | | m_recv_buffer.size() (end of actual receive buffer) // | | | | // v v v v // *------==========--------- @@ -151,25 +133,23 @@ private: // m_recv_buffer // the start of the logical receive buffer - int m_recv_start; + int m_recv_start = 0; // the number of valid, received bytes in m_recv_buffer - int m_recv_end; + int m_recv_end = 0; // the byte offset in m_recv_buffer that we have // are passing on to the upper layer. This is // always <= m_recv_end - int m_recv_pos; + int m_recv_pos = 0; // the size (in bytes) of the bittorrent message // we're currently receiving - int m_packet_size; + int m_packet_size = 0; - // the number of bytes that the other - // end has to send us in order to respond - // to all outstanding piece requests we - // have sent to it - int m_soft_packet_size; + // keep track of how much of the receive buffer we use, if we're not using + // enuogh of it we shrink it + sliding_average<20> m_watermark; buffer m_recv_buffer; }; @@ -183,10 +163,7 @@ private: struct crypto_receive_buffer { crypto_receive_buffer(receive_buffer& next) - : m_recv_pos(INT_MAX) - , m_packet_size(0) - , m_soft_packet_size(0) - , m_connection_buffer(next) + : m_connection_buffer(next) {} buffer::interval mutable_buffer() { return m_connection_buffer.mutable_buffer(); } @@ -219,8 +196,6 @@ struct crypto_receive_buffer void reset(int packet_size); void crypto_reset(int packet_size); - void set_soft_packet_size(int size); - int advance_pos(int bytes); buffer::const_interval get() const; @@ -231,9 +206,8 @@ private: // explicitly disallow assignment, to silence msvc warning crypto_receive_buffer& operator=(crypto_receive_buffer const&); - int m_recv_pos; - int m_packet_size; - int m_soft_packet_size; + int m_recv_pos = std::numeric_limits::max(); + int m_packet_size = 0; receive_buffer& m_connection_buffer; }; #endif // TORRENT_DISABLE_ENCRYPTION @@ -241,3 +215,4 @@ private: } // namespace libtorrent #endif // #ifndef TORRENT_RECEIVE_BUFFER_HPP_INCLUDED + diff --git a/include/libtorrent/settings_pack.hpp b/include/libtorrent/settings_pack.hpp index 0fb6751f3..159a641e7 100644 --- a/include/libtorrent/settings_pack.hpp +++ b/include/libtorrent/settings_pack.hpp @@ -1098,7 +1098,6 @@ namespace libtorrent // and is disconnected. max_rejects, - // ``recv_socket_buffer_size`` and ``send_socket_buffer_size`` // specifies the buffer sizes set on peer sockets. 0 (which is the // default) means the OS default (i.e. don't change the buffer sizes). // The socket buffer sizes are changed using setsockopt() with @@ -1106,6 +1105,10 @@ namespace libtorrent recv_socket_buffer_size, send_socket_buffer_size, + // the max number of bytes a single peer connection's receive buffer is + // allowed to grow to. + max_peer_recv_buffer_size, + // ``file_checks_delay_per_block`` is the number of milliseconds to // sleep in between disk read operations when checking torrents. This // defaults to 0, but can be set to higher numbers to slow down the diff --git a/include/libtorrent/sliding_average.hpp b/include/libtorrent/sliding_average.hpp index ecd342e3d..b3b32518a 100644 --- a/include/libtorrent/sliding_average.hpp +++ b/include/libtorrent/sliding_average.hpp @@ -45,6 +45,8 @@ template struct sliding_average { sliding_average(): m_mean(0), m_average_deviation(0), m_num_samples(0) {} + sliding_average(sliding_average const&) = default; + sliding_average& operator=(sliding_average const&) = default; void add_sample(int s) { @@ -75,19 +77,16 @@ struct sliding_average private: // both of these are fixed point values (* 64) - int m_mean; - int m_average_deviation; + int m_mean = 0; + int m_average_deviation = 0; // the number of samples we have received, but no more than inverted_gain // this is the effective inverted_gain - int m_num_samples; + int m_num_samples = 0; }; struct average_accumulator { - average_accumulator() - : m_num_samples(0) - , m_sample_sum(0) - {} + average_accumulator() {} void add_sample(int s) { @@ -108,8 +107,10 @@ struct average_accumulator return ret; } - int m_num_samples; - std::uint64_t m_sample_sum; +private: + + int m_num_samples = 0; + std::uint64_t m_sample_sum = 0; }; } diff --git a/include/libtorrent/stack_allocator.hpp b/include/libtorrent/stack_allocator.hpp index 692ec09c9..8b1468b71 100644 --- a/include/libtorrent/stack_allocator.hpp +++ b/include/libtorrent/stack_allocator.hpp @@ -42,6 +42,10 @@ namespace libtorrent { namespace aux { stack_allocator() {} + // non-copyable + stack_allocator(stack_allocator const&) = delete; + stack_allocator& operator=(stack_allocator const&) = delete; + int copy_string(std::string const& str) { int ret = int(m_storage.size()); @@ -100,11 +104,7 @@ namespace libtorrent { namespace aux private: - // non-copyable - stack_allocator(stack_allocator const&); - stack_allocator& operator=(stack_allocator const&); - - buffer m_storage; + std::vector m_storage; }; } } diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 6cbf6b926..117ef3d68 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -1105,12 +1105,11 @@ namespace libtorrent boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); - bool merkle = static_cast(recv_buffer.begin[0]) == 250; + bool const merkle = static_cast(recv_buffer.begin[0]) == 250; if (merkle) { if (recv_pos == 1) { - m_recv_buffer.set_soft_packet_size(13); received_bytes(0, received); return; } @@ -1119,13 +1118,10 @@ namespace libtorrent received_bytes(0, received); return; } - if (recv_pos == 13) + if (recv_pos >= 13) { - const char* ptr = recv_buffer.begin + 9; - int list_size = detail::read_int32(ptr); - // now we know how long the bencoded hash list is - // and we can allocate the disk buffer and receive - // into it + char const* ptr = recv_buffer.begin + 9; + int const list_size = detail::read_int32(ptr); if (list_size > m_recv_buffer.packet_size() - 13) { diff --git a/src/file_progress.cpp b/src/file_progress.cpp index 9fa15aeca..c8c67cccc 100644 --- a/src/file_progress.cpp +++ b/src/file_progress.cpp @@ -91,12 +91,13 @@ namespace libtorrent { namespace aux m_have_pieces.set_bit(piece); #endif - int size = (std::min)(std::uint64_t(piece_size), total_size - off); + TORRENT_ASSERT(total_size >= off); + std::int64_t size = (std::min)(std::uint64_t(piece_size), total_size - off); TORRENT_ASSERT(size >= 0); while (size) { - int add = (std::min)(std::int64_t(size), fs.file_size(file_index) - file_offset); + std::int64_t add = (std::min)(size, fs.file_size(file_index) - file_offset); TORRENT_ASSERT(add >= 0); m_file_progress[file_index] += add; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 8d1acdf50..9da90d46d 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -4485,6 +4485,7 @@ namespace libtorrent p.used_send_buffer = m_send_buffer.size(); p.receive_buffer_size = m_recv_buffer.capacity(); p.used_receive_buffer = m_recv_buffer.pos(); + p.receive_buffer_watermark = m_recv_buffer.watermark(); p.write_state = m_channel_state[upload_channel]; p.read_state = m_channel_state[download_channel]; @@ -5409,10 +5410,11 @@ namespace libtorrent } } - int peer_connection::request_bandwidth(int channel, int bytes) + int peer_connection::request_bandwidth(int const channel, int bytes) { TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; + // we can only have one outstanding bandwidth request at a time if (m_channel_state[channel] & peer_info::bw_limit) return 0; @@ -5652,8 +5654,15 @@ namespace libtorrent if (m_disconnecting) return; + if (m_recv_buffer.capacity() < 100 + && m_recv_buffer.max_receive() == 0) + { + m_recv_buffer.reserve(100); + } + // we may want to request more quota at this point - request_bandwidth(download_channel); + int const buffer_size = m_recv_buffer.max_receive(); + request_bandwidth(download_channel, buffer_size); if (m_channel_state[download_channel] & peer_info::bw_network) return; @@ -5680,94 +5689,27 @@ namespace libtorrent // from being at or exceeding the limit down to below the limit return; } - error_code ec; - - try_read(read_async, ec); - } - - size_t peer_connection::try_read(sync_t s, error_code& ec) - { - TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(m_connected); + if (m_quota[download_channel] == 0) return; - if (m_quota[download_channel] == 0) - { - ec = boost::asio::error::would_block; - return 0; - } + int const quota_left = m_quota[download_channel]; + int const max_receive = (std::min)(buffer_size, quota_left); - if (!can_read()) - { - ec = boost::asio::error::would_block; - return 0; - } - - int max_receive = m_recv_buffer.max_receive(); - - // only apply the contiguous receive buffer when we don't have any - // outstanding requests. When we're likely to receive pieces, we'll - // save more time from avoiding copying data from the socket - if (m_download_queue.empty()) - { - if (s == read_sync) - { - ec = boost::asio::error::would_block; - return 0; - } - - TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0); - m_channel_state[download_channel] |= peer_info::bw_network; -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "ASYNC_READ"); -#endif - - ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data_nb"); - m_socket->async_read_some(null_buffers(), make_read_handler( - std::bind(&peer_connection::on_receive_data_nb, self(), _1, _2))); - return 0; - } - - TORRENT_ASSERT(max_receive >= 0); - - int quota_left = m_quota[download_channel]; - if (max_receive > quota_left) - max_receive = quota_left; - - if (max_receive == 0) - { - ec = boost::asio::error::would_block; - return 0; - } + if (max_receive == 0) return; boost::asio::mutable_buffer const vec = m_recv_buffer.reserve(max_receive); - - if (s == read_async) - { - TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0); - m_channel_state[download_channel] |= peer_info::bw_network; + TORRENT_ASSERT((m_channel_state[download_channel] & peer_info::bw_network) == 0); + m_channel_state[download_channel] |= peer_info::bw_network; #ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "ASYNC_READ" - , "max: %d bytes", max_receive); + peer_log(peer_log_alert::incoming, "ASYNC_READ" + , "max: %d bytes", max_receive); #endif - // utp sockets aren't thread safe... - ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data"); - m_socket->async_read_some( - boost::asio::mutable_buffers_1(vec), make_read_handler( - std::bind(&peer_connection::on_receive_data, self(), _1, _2))); - return 0; - } - - size_t const ret = m_socket->read_some(boost::asio::mutable_buffers_1(vec), ec); - - // this is weird. You would imagine read_some() would do this - if (ret == 0 && !ec) ec = boost::asio::error::eof; - -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s" - , max_receive, int(ret), ec ? ec.message().c_str() : ""); -#endif - return ret; + // utp sockets aren't thread safe... + ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data"); + m_socket->async_read_some( + boost::asio::mutable_buffers_1(vec), make_read_handler( + std::bind(&peer_connection::on_receive_data, self(), _1, _2))); } void peer_connection::append_send_buffer(char* buffer, int size @@ -5859,112 +5801,31 @@ namespace libtorrent bool m_cond; }; - void peer_connection::on_receive_data_nb(const error_code& error - , std::size_t bytes_transferred) - { - TORRENT_ASSERT(is_single_thread()); - COMPLETE_ASYNC("peer_connection::on_receive_data_nb"); - - // leave this bit set until we're done looping, reading from the socket. - // that way we don't trigger any async read calls until the end of this - // function. - TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); - - // nb is short for null_buffers. In this mode we don't actually - // allocate a receive buffer up-front, but get notified when - // we can read from the socket, and then determine how much there - // is to read. - - if (error) - { - TORRENT_ASSERT_VAL(error.value() != 0, error.value()); -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "ERROR" - , "in peer_connection::on_receive_data_nb error: (%s:%d) %s" - , error.category().name(), error.value() - , error.message().c_str()); -#endif - on_receive(error, bytes_transferred); - disconnect(error, op_sock_read); - return; - } - - error_code ec; - std::size_t buffer_size = m_socket->available(ec); - if (ec) - { - disconnect(ec, op_available); - return; - } - -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "READ_AVAILABLE" - , "bytes: %d", int(buffer_size)); -#endif - - // at this point the ioctl told us the socket doesn't have any - // pending bytes. This probably means some error happened. - // in order to find out though, we need to initiate a read - // operation - if (buffer_size == 0) - { - // try to read one byte. The socket is non-blocking anyway - // so worst case, we'll fail with EWOULDBLOCK - buffer_size = 1; - } - else - { - if (buffer_size > m_quota[download_channel]) - { - request_bandwidth(download_channel, int(buffer_size)); - buffer_size = m_quota[download_channel]; - } - // we're already waiting to get some more - // quota from the bandwidth manager - if (buffer_size == 0) - { - // allow reading from the socket again - TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); - m_channel_state[download_channel] &= ~peer_info::bw_network; - return; - } - } - - if (buffer_size > 2097152) buffer_size = 2097152; - - boost::asio::mutable_buffer buffer = m_recv_buffer.reserve(int(buffer_size)); - TORRENT_ASSERT(m_recv_buffer.normalized()); - - bytes_transferred = m_socket->read_some(boost::asio::mutable_buffers_1(buffer), ec); - - if (ec) - { - if (ec == boost::asio::error::try_again - || ec == boost::asio::error::would_block) - { - // allow reading from the socket again - TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); - m_channel_state[download_channel] &= ~peer_info::bw_network; - setup_receive(); - return; - } - disconnect(ec, op_sock_read); - return; - } - - receive_data_impl(error, bytes_transferred, 0); - } - // -------------------------- // RECEIVE DATA // -------------------------- - // nb is true if this callback is due to a null_buffers() - // invocation of async_read_some(). In that case, we need - // to disregard bytes_transferred. - // at all exit points of this function, one of the following MUST hold: - // 1. the socket is disconnecting - // 2. m_channel_state[download_channel] & peer_info::bw_network == 0 + void peer_connection::account_received_bytes(int const bytes_transferred) + { + // tell the receive buffer we just fed it this many bytes of incoming data + TORRENT_ASSERT(bytes_transferred > 0); + m_recv_buffer.received(bytes_transferred); + + // update the dl quota + TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]); + m_quota[download_channel] -= bytes_transferred; + + // account receiver buffer size stats to the session + m_ses.received_buffer(bytes_transferred); + + // estimage transport protocol overhead + trancieve_ip_packet(bytes_transferred, m_remote.address().is_v6()); + +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::incoming, "READ" + , "%d bytes", int(bytes_transferred)); +#endif + } void peer_connection::on_receive_data(const error_code& error , std::size_t bytes_transferred) @@ -5972,6 +5833,13 @@ namespace libtorrent TORRENT_ASSERT(is_single_thread()); COMPLETE_ASYNC("peer_connection::on_receive_data"); +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA" + , "bytes: %d error: (%s:%d) %s" + , int(bytes_transferred), error.category().name(), error.value() + , error.message().c_str()); +#endif + // leave this bit set until we're done looping, reading from the socket. // that way we don't trigger any async read calls until the end of this // function. @@ -5979,19 +5847,23 @@ namespace libtorrent TORRENT_ASSERT(bytes_transferred > 0 || error); - receive_data_impl(error, bytes_transferred, 10); - } + m_counters.inc_stats_counter(counters::on_read_counter); - void peer_connection::receive_data_impl(const error_code& error - , std::size_t bytes_transferred, int read_loops) - { - TORRENT_ASSERT(is_single_thread()); + INVARIANT_CHECK; + + if (error) + { #ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA" - , "bytes: %d error: (%s:%d) %s" - , int(bytes_transferred), error.category().name(), error.value() - , error.message().c_str()); + peer_log(peer_log_alert::info, "ERROR" + , "in peer_connection::on_receive_data_impl error: %s" + , error.message().c_str()); #endif + on_receive(error, bytes_transferred); + disconnect(error, op_sock_read); + return; + } + + m_last_receive = aux::time_now(); // submit all disk jobs later m_ses.deferred_submit_jobs(); @@ -6005,27 +5877,14 @@ namespace libtorrent // flush the send buffer at the end of this function cork _c(*this); - INVARIANT_CHECK; - - int bytes_in_loop = int(bytes_transferred); - - if (error) - { -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "ERROR" - , "in peer_connection::on_receive_data_impl error: %s" - , error.message().c_str()); -#endif - trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); - on_receive(error, bytes_transferred); - disconnect(error, op_sock_read); - return; - } - TORRENT_ASSERT(bytes_transferred > 0); - m_counters.inc_stats_counter(counters::on_read_counter); - m_ses.received_buffer(int(bytes_transferred)); + // if we received exactly as many bytes as we provided a receive buffer + // for. There most likely are more bytes to read, and we should grow our + // receive buffer. + TORRENT_ASSERT(bytes_transferred <= m_recv_buffer.max_receive()); + bool const grow_buffer = (bytes_transferred == m_recv_buffer.max_receive()); + account_received_bytes(bytes_transferred); if (m_extension_outstanding_bytes > 0) m_extension_outstanding_bytes -= (std::min)(m_extension_outstanding_bytes, int(bytes_transferred)); @@ -6033,79 +5892,91 @@ namespace libtorrent check_graceful_pause(); if (m_disconnecting) return; - int num_loops = 0; - do + // this is the case where we try to grow the receive buffer and try to + // drain the socket + if (grow_buffer) { -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::incoming, "READ" - , "%d bytes", int(bytes_transferred)); -#endif - // correct the dl quota usage, if not all of the buffer was actually read - TORRENT_ASSERT(int(bytes_transferred) <= m_quota[download_channel]); - m_quota[download_channel] -= int(bytes_transferred); - - if (m_disconnecting) - { - trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); - return; - } - - TORRENT_ASSERT(bytes_transferred > 0); - m_recv_buffer.received(int(bytes_transferred)); - - int bytes = int(bytes_transferred); - int sub_transferred = 0; - do { -// TODO: The stats checks can not be honored when authenticated encryption is in use -// because we may have encrypted data which we cannot authenticate yet -#if 0 - std::int64_t cur_payload_dl = m_statistics.last_payload_downloaded(); - std::int64_t cur_protocol_dl = m_statistics.last_protocol_downloaded(); -#endif - sub_transferred = m_recv_buffer.advance_pos(bytes); - on_receive(error, sub_transferred); - bytes -= sub_transferred; - TORRENT_ASSERT(sub_transferred > 0); - -#if 0 - TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0); - TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0); - std::int64_t stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl + - m_statistics.last_protocol_downloaded() - cur_protocol_dl; - TORRENT_ASSERT(stats_diff == int(sub_transferred)); -#endif - if (m_disconnecting) return; - - } while (bytes > 0 && sub_transferred > 0); - - m_recv_buffer.normalize(); - - TORRENT_ASSERT(m_recv_buffer.pos_at_end()); - TORRENT_ASSERT(m_recv_buffer.packet_size() > 0); - - if (m_peer_choked) - { - m_recv_buffer.clamp_size(); - } - - if (num_loops > read_loops) break; - error_code ec; - bytes_transferred = try_read(read_sync, ec); - TORRENT_ASSERT(bytes_transferred > 0 || ec); - if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again) break; + std::size_t buffer_size = m_socket->available(ec); if (ec) { - trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); - disconnect(ec, op_sock_read); + disconnect(ec, op_available); return; } - bytes_in_loop += int(bytes_transferred); - ++num_loops; - } - while (bytes_transferred > 0); - m_last_receive = aux::time_now(); +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::incoming, "AVAILABLE" + , "%d bytes", int(buffer_size)); +#endif + + request_bandwidth(download_channel, buffer_size); + + int const quota_left = m_quota[download_channel]; + if (buffer_size > quota_left) buffer_size = quota_left; + if (buffer_size > 0) + { + boost::asio::mutable_buffer const vec = m_recv_buffer.reserve(buffer_size); + size_t bytes = m_socket->read_some(boost::asio::mutable_buffers_1(vec), ec); + + // this is weird. You would imagine read_some() would do this + if (bytes == 0 && !ec) ec = boost::asio::error::eof; + +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s" + , int(buffer_size), int(bytes), ec ? ec.message().c_str() : ""); +#endif + + TORRENT_ASSERT(bytes > 0 || ec); + if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again) + { + bytes = 0; + } + else if (ec) + { + disconnect(ec, op_sock_read); + return; + } + + account_received_bytes(bytes); + + bytes_transferred += bytes; + } + } + + // feed bytes in receive buffer to upper layer by calling on_receive() + + bool const prev_choked = m_peer_choked; + int bytes = int(bytes_transferred); + int sub_transferred = 0; + do { + sub_transferred = m_recv_buffer.advance_pos(bytes); + on_receive(error, sub_transferred); + bytes -= sub_transferred; + TORRENT_ASSERT(sub_transferred > 0); + if (m_disconnecting) return; + } while (bytes > 0 && sub_transferred > 0); + + // if the peer went from unchoked to choked, suggest to the receive + // buffer that it shrinks to 100 bytes + int const force_shrink = (m_peer_choked && !prev_choked) + ? 100 : 0; + m_recv_buffer.normalize(force_shrink); + + if (m_recv_buffer.max_receive() == 0) + { + // the message we're receiving is larger than our receive + // buffer, we must grow. + int const buffer_size_limit + = m_settings.get_int(settings_pack::max_peer_recv_buffer_size); + m_recv_buffer.grow(buffer_size_limit); +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::incoming, "GROW_BUFFER", "%d bytes" + , m_recv_buffer.capacity()); +#endif + } + + TORRENT_ASSERT(m_recv_buffer.pos_at_end()); + TORRENT_ASSERT(m_recv_buffer.packet_size() > 0); if (is_seed()) { @@ -6113,8 +5984,6 @@ namespace libtorrent if (t) t->seen_complete(); } - trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); - // allow reading from the socket again TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); m_channel_state[download_channel] &= ~peer_info::bw_network; @@ -6345,16 +6214,14 @@ namespace libtorrent time_point now = clock_type::now(); - for (std::vector::iterator i = m_download_queue.begin() - , end(m_download_queue.end()); i != end; ++i) + for (auto& block : m_download_queue) { - if (i->send_buffer_offset == pending_block::not_in_buffer) continue; - std::int32_t offset = i->send_buffer_offset; - offset -= int(bytes_transferred); - if (offset < 0) - i->send_buffer_offset = pending_block::not_in_buffer; + if (block.send_buffer_offset == pending_block::not_in_buffer) + continue; + if (block.send_buffer_offset < int(bytes_transferred)) + block.send_buffer_offset = pending_block::not_in_buffer; else - i->send_buffer_offset = offset; + block.send_buffer_offset -= int(bytes_transferred); } m_channel_state[upload_channel] &= ~peer_info::bw_network; diff --git a/src/receive_buffer.cpp b/src/receive_buffer.cpp index 3ff1cb8ad..05395590d 100644 --- a/src/receive_buffer.cpp +++ b/src/receive_buffer.cpp @@ -34,61 +34,65 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { - namespace { - int round_up8(int v) - { - return ((v & 7) == 0) ? v : v + (8 - (v & 7)); - } - } - -int receive_buffer::max_receive() +int receive_buffer::max_receive() const { - int max = packet_bytes_remaining(); - if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; - if (m_soft_packet_size && max > m_soft_packet_size - m_recv_pos) - max = m_soft_packet_size - m_recv_pos; - return max; + return int(m_recv_buffer.size() - m_recv_end); } boost::asio::mutable_buffer receive_buffer::reserve(int size) { TORRENT_ASSERT(size > 0); TORRENT_ASSERT(m_recv_pos >= 0); - // this is unintuitive, but we used to use m_recv_pos in this function when - // we should have used m_recv_end. perhaps they always happen to be equal - TORRENT_ASSERT(m_recv_pos == m_recv_end); // normalize() must be called before receiving more data TORRENT_ASSERT(m_recv_start == 0); - m_recv_buffer.resize(m_recv_end + size); + if (m_recv_buffer.size() < m_recv_end + size) + { + int const new_size = std::max(m_recv_end + size, m_packet_size); + buffer new_buffer(new_size + , aux::array_view(m_recv_buffer.ptr(), m_recv_end)); + m_recv_buffer = std::move(new_buffer); + + // since we just increased the size of the buffer, reset the watermark to + // start at our new size (avoid flapping the buffer size) + m_watermark = sliding_average<20>(); + } + return boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size); } +void receive_buffer::grow(int const limit) +{ + int const current_size = int(m_recv_buffer.size()); + TORRENT_ASSERT(current_size < std::numeric_limits::max() / 3); + + // first grow to one piece message, then grow by 50% each time + int const new_size = (current_size < m_packet_size) + ? m_packet_size : std::min(current_size * 3 / 2, limit); + + // re-allcoate the buffer and copy over the part of it that's used + buffer new_buffer(new_size + , aux::array_view(m_recv_buffer.ptr(), m_recv_end)); + m_recv_buffer = std::move(new_buffer); + + // since we just increased the size of the buffer, reset the watermark to + // start at our new size (avoid flapping the buffer size) + m_watermark = sliding_average<20>(); +} + int receive_buffer::advance_pos(int bytes) { - int const packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; - int const limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; + int const limit = m_packet_size > m_recv_pos ? m_packet_size - m_recv_pos : m_packet_size; int const sub_transferred = (std::min)(bytes, limit); m_recv_pos += sub_transferred; - if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; return sub_transferred; } -void receive_buffer::clamp_size() -{ - if (m_recv_pos == 0 - && (m_recv_buffer.capacity() - m_packet_size) > 128) - { - // round up to an even 8 bytes since that's the RC4 blocksize - buffer(round_up8(m_packet_size)).swap(m_recv_buffer); - } -} - // size = the packet size to remove from the receive buffer // packet_size = the next packet size to receive in the buffer // offset = the offset into the receive buffer where to remove `size` bytes -void receive_buffer::cut(int size, int packet_size, int offset) +void receive_buffer::cut(int const size, int const packet_size, int const offset) { TORRENT_ASSERT(packet_size > 0); TORRENT_ASSERT(int(m_recv_buffer.size()) >= size); @@ -104,9 +108,11 @@ void receive_buffer::cut(int size, int packet_size, int offset) TORRENT_ASSERT(m_recv_start - size <= m_recv_end); if (size > 0) + { std::memmove(&m_recv_buffer[0] + m_recv_start + offset , &m_recv_buffer[0] + m_recv_start + offset + size , m_recv_end - m_recv_start - size - offset); + } m_recv_pos -= size; m_recv_end -= size; @@ -168,13 +174,41 @@ boost::asio::mutable_buffer receive_buffer::mutable_buffer(int const bytes) // the purpose of this function is to free up and cut off all messages // in the receive buffer that have been parsed and processed. -void receive_buffer::normalize() +// it may also shrink the size of the buffer allocation if we haven't been using +// enough of it lately. +void receive_buffer::normalize(int force_shrink) { TORRENT_ASSERT(m_recv_end >= m_recv_start); - if (m_recv_start == 0) return; - if (m_recv_end > m_recv_start) - std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + m_recv_start, m_recv_end - m_recv_start); + m_watermark.add_sample(std::max(m_recv_end, m_packet_size)); + + // if the running average drops below half of the current buffer size, + // reallocate a smaller one. + bool const shrink_buffer = m_recv_buffer.size() / 2 > m_watermark.mean() + && m_watermark.mean() > (m_recv_end - m_recv_start); + + aux::array_view bytes_to_shift( + m_recv_buffer.ptr() + m_recv_start + , m_recv_end - m_recv_start); + + if (force_shrink) + { + const int target_size = std::max(std::max(force_shrink + , int(bytes_to_shift.size())), m_packet_size); + buffer new_buffer(target_size, bytes_to_shift); + m_recv_buffer = std::move(new_buffer); + } + else if (shrink_buffer) + { + buffer new_buffer(m_watermark.mean(), bytes_to_shift); + m_recv_buffer = std::move(new_buffer); + } + else if (m_recv_end > m_recv_start + && m_recv_start > 0) + { + std::memmove(m_recv_buffer.ptr(), bytes_to_shift.data() + , bytes_to_shift.size()); + } m_recv_end -= m_recv_start; m_recv_start = 0; @@ -276,24 +310,14 @@ void crypto_receive_buffer::crypto_reset(int packet_size) } } -void crypto_receive_buffer::set_soft_packet_size(int size) -{ - if (m_recv_pos == INT_MAX) - m_connection_buffer.set_soft_packet_size(size); - else - m_soft_packet_size = size; -} - int crypto_receive_buffer::advance_pos(int bytes) { if (m_recv_pos == INT_MAX) return bytes; - int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; - int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; - int sub_transferred = (std::min)(bytes, limit); + int const limit = m_packet_size > m_recv_pos ? m_packet_size - m_recv_pos : m_packet_size; + int const sub_transferred = (std::min)(bytes, limit); m_recv_pos += sub_transferred; m_connection_buffer.cut(0, m_connection_buffer.packet_size() + sub_transferred); - if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; return sub_transferred; } diff --git a/src/session.cpp b/src/session.cpp index 7493c9805..f741df06c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -84,6 +84,8 @@ namespace libtorrent set.set_bool(settings_pack::contiguous_recv_buffer, false); #endif + set.set_int(settings_pack::max_peer_recv_buffer_size, 32 * 1024 + 200); + set.set_int(settings_pack::disk_io_write_mode, settings_pack::disable_os_cache); set.set_int(settings_pack::disk_io_read_mode, settings_pack::disable_os_cache); @@ -171,6 +173,8 @@ namespace libtorrent set.set_int(settings_pack::max_out_request_queue, 1500); set.set_int(settings_pack::max_allowed_in_request_queue, 2000); + set.set_int(settings_pack::max_peer_recv_buffer_size, 5 * 1024 * 1024); + // we will probably see a high rate of alerts, make it less // likely to loose alerts set.set_int(settings_pack::alert_queue_size, 10000); diff --git a/src/settings_pack.cpp b/src/settings_pack.cpp index 6529639ef..4a01ba1b7 100644 --- a/src/settings_pack.cpp +++ b/src/settings_pack.cpp @@ -279,6 +279,7 @@ namespace libtorrent SET(max_rejects, 50, 0), SET(recv_socket_buffer_size, 0, &session_impl::update_socket_buffer_size), SET(send_socket_buffer_size, 0, &session_impl::update_socket_buffer_size), + SET_NOPREV(max_peer_recv_buffer_size, 2 * 1024 * 1024, 0), SET(file_checks_delay_per_block, 0, 0), SET(read_cache_line_size, 32, 0), SET(write_cache_line_size, 16, 0), diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp index 39252210f..20a54a0c5 100644 --- a/test/test_buffer.cpp +++ b/test/test_buffer.cpp @@ -43,125 +43,97 @@ POSSIBILITY OF SUCH DAMAGE. using namespace libtorrent; -/* -template -T const& min_(T const& x, T const& y) -{ - return x < y ? x : y; -} - -void test_speed() -{ - buffer b; - - char data[32]; - - srand(0); - - boost::timer t; - - int const iterations = 5000000; - int const step = iterations / 20; - - for (int i = 0; i < iterations; ++i) - { - int x = rand(); - - if (i % step == 0) std::cerr << "."; - - std::size_t n = rand() % 32; - n = 32; - - if (x % 2) - { - b.insert(data, data + n); - } - else - { - b.erase(min_(b.size(), n)); - } - } - - float t1 = t.elapsed(); - std::cerr << "buffer elapsed: " << t.elapsed() << "\n"; - - std::vector v; - - srand(0); - t.restart(); - - for (int i = 0; i < iterations; ++i) - { - int x = rand(); - - if (i % step == 0) std::cerr << "."; - - std::size_t n = rand() % 32; - n = 32; - - if (x % 2) - { - v.insert(v.end(), data, data + n); - } - else - { - v.erase(v.begin(), v.begin() + min_(v.size(), n)); - } - } - - float t2 = t.elapsed(); - std::cerr << "std::vector elapsed: " << t.elapsed() << "\n"; - - assert(t1 < t2); -} -*/ - // -- test buffer -- -TORRENT_TEST(buffer) +static char const data[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + +TORRENT_TEST(buffer_constructor) { - char data[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - buffer b; + { + buffer b; + TEST_CHECK(b.size() == 0); + TEST_CHECK(b.empty()); + } - TEST_CHECK(b.size() == 0); - TEST_CHECK(b.capacity() == 0); - TEST_CHECK(b.empty()); + { + buffer b(10); + TEST_CHECK(b.size() >= 10); + } - b.resize(10); - TEST_CHECK(b.size() == 10); - TEST_CHECK(b.capacity() == 10); + { + buffer b(50, data); + TEST_CHECK(std::memcmp(b.ptr(), data, 10) == 0); + TEST_CHECK(b.size() >= 50); + } +} - std::memcpy(b.begin(), data, 10); - b.reserve(50); - TEST_CHECK(std::memcmp(b.begin(), data, 10) == 0); - TEST_CHECK(b.capacity() == 50); +TORRENT_TEST(buffer_swap) +{ + buffer b1; + TEST_CHECK(b1.size() == 0); + buffer b2(10, data); + std::size_t const b2_size = b2.size(); + TEST_CHECK(b2_size >= 10); - b.erase(b.begin() + 6, b.end()); - TEST_CHECK(std::memcmp(b.begin(), data, 6) == 0); - TEST_CHECK(b.capacity() == 50); - TEST_CHECK(b.size() == 6); + b1.swap(b2); - b.insert(b.begin(), data + 5, data + 10); - TEST_CHECK(b.capacity() == 50); - TEST_CHECK(b.size() == 11); - TEST_CHECK(std::memcmp(b.begin(), data + 5, 5) == 0); + TEST_CHECK(b2.size() == 0); + TEST_CHECK(b1.size() == b2_size); + TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0); +} - b.clear(); - TEST_CHECK(b.size() == 0); - TEST_CHECK(b.capacity() == 50); +TORRENT_TEST(buffer_subscript) +{ + buffer b(50, data); + TEST_CHECK(std::memcmp(b.ptr(), data, 10) == 0); + TEST_CHECK(b.size() >= 50); - b.insert(b.end(), data, data + 10); - TEST_CHECK(b.size() == 10); - TEST_CHECK(std::memcmp(b.begin(), data, 10) == 0); + for (int i = 0; i < int(sizeof(data)/sizeof(data[0])); ++i) + TEST_CHECK(b[i] == data[i]); +} - b.erase(b.begin(), b.end()); - TEST_CHECK(b.capacity() == 50); - TEST_CHECK(b.size() == 0); +TORRENT_TEST(buffer_subscript2) +{ + buffer b(1); + TEST_CHECK(b.size() >= 1); - buffer().swap(b); - TEST_CHECK(b.capacity() == 0); + for (int i = 0; i < int(b.size()); ++i) + b[i] = i & 0xff; + for (int i = 0; i < int(b.size()); ++i) + TEST_CHECK(b[i] == (i & 0xff)); +} + +TORRENT_TEST(buffer_move_construct) +{ + buffer b1(50, data); + TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0); + TEST_CHECK(b1.size() >= 50); + + buffer b2(std::move(b1)); + + TEST_CHECK(b1.size() == 0); + + TEST_CHECK(std::memcmp(b2.ptr(), data, 10) == 0); + TEST_CHECK(b2.size() >= 50); +} + +TORRENT_TEST(buffer_move_assign) +{ + buffer b1(50, data); + TEST_CHECK(std::memcmp(b1.ptr(), data, 10) == 0); + TEST_CHECK(b1.size() >= 50); + + buffer b2; + TEST_CHECK(b2.size() == 0); + + b2 = std::move(b1); + + TEST_CHECK(b1.size() == 0); + + TEST_CHECK(std::memcmp(b2.ptr(), data, 10) == 0); + TEST_CHECK(b2.size() >= 50); } // -- test chained buffer -- diff --git a/test/test_receive_buffer.cpp b/test/test_receive_buffer.cpp index 5ec624122..781f1898c 100644 --- a/test/test_receive_buffer.cpp +++ b/test/test_receive_buffer.cpp @@ -92,6 +92,128 @@ TORRENT_TEST(recv_buffer_packet_finished) TEST_EQUAL(b.packet_finished(), true); } +TORRENT_TEST(recv_buffer_grow_floor) +{ + receive_buffer b; + b.reset(1337); + b.grow(100000); + + // the exact size depends on the OS allocator. Technically there's no upper + // bound, but it's likely withint some reasonable size + TEST_CHECK(b.capacity() >= 1337); + TEST_CHECK(b.capacity() < 1337 + 1000); +} + +TORRENT_TEST(recv_buffer_grow) +{ + receive_buffer b; + b.reserve(200); + b.grow(100000); + // grow by 50% + TEST_CHECK(b.capacity() >= 300); + TEST_CHECK(b.capacity() < 300 + 500); +} + +TORRENT_TEST(recv_buffer_grow_limit) +{ + receive_buffer b; + b.reserve(2000); + b.grow(2100); + // grow by 50%, but capped by 2100 bytes + TEST_CHECK(b.capacity() >= 2100); + TEST_CHECK(b.capacity() < 2100 + 500); + printf("capacity: %d\n", b.capacity()); +} + +TORRENT_TEST(recv_buffer_reserve_minimum_grow) +{ + receive_buffer b; + b.reset(1337); + b.reserve(20); + + // we only asked for 20 more bytes, but since the message size was set to + // 1337, that's the minimum size to grow to + TEST_CHECK(b.capacity() >= 1337); + TEST_CHECK(b.capacity() < 1337 + 1000); +} + +TORRENT_TEST(recv_buffer_reserve_grow) +{ + receive_buffer b; + b.reserve(20); + + TEST_CHECK(b.capacity() >= 20); + TEST_CHECK(b.capacity() < 20 + 500); +} + +TORRENT_TEST(recv_buffer_reserve) +{ + receive_buffer b; + auto range1 = b.reserve(100); + + int const capacity = b.capacity(); + + b.reset(20); + b.received(20); + + TEST_EQUAL(b.capacity(), capacity); + + auto range2 = b.reserve(50); + + using namespace boost::asio; + + TEST_EQUAL(b.capacity(), capacity); + TEST_EQUAL(buffer_cast(range1) + 20, buffer_cast(range2)); + TEST_CHECK(buffer_size(range1) >= 20); + TEST_CHECK(buffer_size(range2) >= 50); +} + +TORRENT_TEST(receive_buffer_normalize) +{ + receive_buffer b; + b.reset(16000); + + // receive one large packet, to allocate a large receive buffer + for (int i = 0; i < 16; ++i) + { + b.reserve(1000); + b.received(1000); + b.normalize(); + } + + TEST_CHECK(b.capacity() >= 16000); + int const start_capacity = b.capacity(); + + // then receive lots of small packets. We should eventually re-allocate down + // to a smaller buffer + for (int i = 0; i < 15; ++i) + { + b.reset(160); + b.reserve(160); + b.received(160); + b.normalize(); + printf("capacity: %d watermark: %d\n", b.capacity(), b.watermark()); + } + + TEST_CHECK(b.capacity() <= start_capacity / 2); + printf("capacity: %d\n", b.capacity()); +} + +TORRENT_TEST(receive_buffer_max_receive) +{ + receive_buffer b; + b.reset(2000); + b.reserve(2000); + b.received(2000); + b.normalize(); + + b.reset(20); + int const max_receive = b.max_receive(); + TEST_CHECK(max_receive >= 2000); + b.received(20); + TEST_EQUAL(b.max_receive(), max_receive - 20); +} + #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) TORRENT_TEST(recv_buffer_mutable_buffers)