diff --git a/include/libtorrent/receive_buffer.hpp b/include/libtorrent/receive_buffer.hpp index b35a6cdd0..db377ae9d 100644 --- a/include/libtorrent/receive_buffer.hpp +++ b/include/libtorrent/receive_buffer.hpp @@ -40,7 +40,7 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { -struct receive_buffer +struct TORRENT_EXTRA_EXPORT receive_buffer { friend struct crypto_receive_buffer; @@ -58,6 +58,7 @@ struct receive_buffer int packet_bytes_remaining() const { TORRENT_ASSERT(m_recv_start == 0); + TORRENT_ASSERT(m_packet_size > 0); return m_packet_size - m_recv_pos; } @@ -67,13 +68,19 @@ struct receive_buffer int pos() const { return m_recv_pos; } int capacity() const { return m_recv_buffer.capacity() + m_disk_recv_buffer_size; } - int regular_buffer_size() const { return m_packet_size - m_disk_recv_buffer_size; } + int regular_buffer_size() const + { + TORRENT_ASSERT(m_packet_size > 0); + return m_packet_size - m_disk_recv_buffer_size; + } // regular buffer only boost::asio::mutable_buffer reserve(int size); // with possible disk buffer usage int reserve(boost::array& vec, int size); + // tell the buffer we just receved more bytes at the end of it. This will + // advance the end cursor void received(int bytes_transferred) { TORRENT_ASSERT(m_packet_size > 0); @@ -82,9 +89,14 @@ struct receive_buffer + m_disk_recv_buffer_size)); } + // tell the buffer we consumed some bytes of it. This will advance the read + // cursor int advance_pos(int bytes); + + // has the read cursor reached the end cursor? bool pos_at_end() { return m_recv_pos == m_recv_end; } + // make the buffer size dividible by 8 bytes (RC4 block size) void clamp_size(); void set_soft_packet_size(int size) { m_soft_packet_size = size; } @@ -94,6 +106,8 @@ struct receive_buffer // offset = the offset into the receive buffer where to remove `size` bytes void cut(int size, int packet_size, int offset = 0); + // return the interval between the start of the buffer to the read cursor. + // This is the "current" packet. buffer::const_interval get() const; #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 729a37041..113a85d6b 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -3284,7 +3284,6 @@ namespace libtorrent if (!m_recv_buffer.packet_finished()) return; recv_buffer = m_recv_buffer.get(); - #ifndef TORRENT_DISABLE_LOGGING std::string extensions; extensions.resize(8 * 8); diff --git a/src/receive_buffer.cpp b/src/receive_buffer.cpp index c9cb16190..d5e86a67d 100644 --- a/src/receive_buffer.cpp +++ b/src/receive_buffer.cpp @@ -54,8 +54,12 @@ boost::asio::mutable_buffer receive_buffer::reserve(int size) { TORRENT_ASSERT(size > 0); TORRENT_ASSERT(!m_disk_recv_buffer); - m_recv_buffer.resize(m_recv_pos + size); - return boost::asio::buffer(&m_recv_buffer[m_recv_pos], size); + // this is unintuitive, but we used to use m_recv_pos in this function when + // we should have used m_recv_end. perhaps they always happen to be equal + TORRENT_ASSERT(m_recv_pos == m_recv_end); + + m_recv_buffer.resize(m_recv_end + size); + return boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size); } int receive_buffer::reserve(boost::array& vec, int size) @@ -64,41 +68,48 @@ int receive_buffer::reserve(boost::array& vec, i TORRENT_ASSERT(m_recv_pos >= 0); TORRENT_ASSERT(m_packet_size > 0); + // normalize() must be called before receiving more data + TORRENT_ASSERT(m_recv_start == 0); + + // this is unintuitive, but we used to use m_recv_pos in this function when + // we should have used m_recv_end. perhaps they always happen to be equal + TORRENT_ASSERT(m_recv_pos == m_recv_end); + int num_bufs; - int regular_buf_size = regular_buffer_size(); + int const regular_buf_size = regular_buffer_size(); if (int(m_recv_buffer.size()) < regular_buf_size) m_recv_buffer.resize(round_up8(regular_buf_size)); - if (!m_disk_recv_buffer || regular_buf_size >= m_recv_pos + size) + if (!m_disk_recv_buffer || regular_buf_size >= m_recv_end + size) { // only receive into regular buffer - TORRENT_ASSERT(m_recv_pos + size <= int(m_recv_buffer.size())); - vec[0] = boost::asio::buffer(&m_recv_buffer[m_recv_pos], size); + TORRENT_ASSERT(m_recv_end + size <= int(m_recv_buffer.size())); + vec[0] = boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size); TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); num_bufs = 1; } - else if (m_recv_pos >= regular_buf_size) + else if (m_recv_end >= regular_buf_size) { // only receive into disk buffer - TORRENT_ASSERT(m_recv_pos - regular_buf_size >= 0); - TORRENT_ASSERT(m_recv_pos - regular_buf_size + size <= m_disk_recv_buffer_size); - vec[0] = boost::asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buf_size, size); + TORRENT_ASSERT(m_recv_end - regular_buf_size >= 0); + TORRENT_ASSERT(m_recv_end - regular_buf_size + size <= m_disk_recv_buffer_size); + vec[0] = boost::asio::buffer(m_disk_recv_buffer.get() + m_recv_end - regular_buf_size, size); TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); num_bufs = 1; } else { // receive into both regular and disk buffer - TORRENT_ASSERT(size + m_recv_pos > regular_buf_size); - TORRENT_ASSERT(m_recv_pos < regular_buf_size); + TORRENT_ASSERT(size + m_recv_end > regular_buf_size); + TORRENT_ASSERT(m_recv_end < regular_buf_size); TORRENT_ASSERT(size - regular_buf_size - + m_recv_pos <= m_disk_recv_buffer_size); + + m_recv_end <= m_disk_recv_buffer_size); - vec[0] = boost::asio::buffer(&m_recv_buffer[m_recv_pos] - , regular_buf_size - m_recv_pos); + vec[0] = boost::asio::buffer(&m_recv_buffer[0] + m_recv_end + , regular_buf_size - m_recv_end); vec[1] = boost::asio::buffer(m_disk_recv_buffer.get() - , size - regular_buf_size + m_recv_pos); + , size - regular_buf_size + m_recv_end); TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) + boost::asio::buffer_size(vec[1])> 0); num_bufs = 2; @@ -109,9 +120,9 @@ int receive_buffer::reserve(boost::array& vec, i int receive_buffer::advance_pos(int bytes) { - int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; - int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; - int sub_transferred = (std::min)(bytes, limit); + int const packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; + int const limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; + int const sub_transferred = (std::min)(bytes, limit); m_recv_pos += sub_transferred; if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; return sub_transferred; @@ -174,7 +185,8 @@ buffer::const_interval receive_buffer::get() const TORRENT_ASSERT(m_recv_pos == 0); return buffer::interval(0,0); } - int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size())); + + int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size()) - m_recv_start); return buffer::const_interval(&m_recv_buffer[0] + m_recv_start , &m_recv_buffer[0] + m_recv_start + rcv_pos); } @@ -194,6 +206,8 @@ buffer::interval receive_buffer::mutable_buffer() , &m_recv_buffer[0] + m_recv_start + rcv_pos); } +// TODO: 2 should this take a boost::array<..., 2> instead? it could return the +// number of buffers added, just like reserve. void receive_buffer::mutable_buffers(std::vector& vec, int const bytes) { namespace asio = boost::asio; @@ -205,8 +219,8 @@ void receive_buffer::mutable_buffers(std::vector& v int const last_recv_pos = m_recv_pos - bytes; TORRENT_ASSERT(bytes <= m_recv_pos); - // the number of bytes in the current packet that are received into - // a regular receive buffer (as opposed to a disk cache buffer) + // the number of bytes in the current packet that are being received into a + // regular receive buffer (as opposed to a disk cache buffer) int const regular_buf_size = regular_buffer_size(); TORRENT_ASSERT(regular_buf_size >= 0); @@ -233,7 +247,7 @@ void receive_buffer::mutable_buffers(std::vector& v , m_recv_pos - regular_buf_size)); } -#if defined TORRENT_DEBUG || defined TORRENT_RELEASE_ASSERTS +#if TORRENT_USE_ASSERTS int vec_bytes = 0; for (std::vector::iterator i = vec.begin(); i != vec.end(); ++i) diff --git a/test/Jamfile b/test/Jamfile index 1281f4735..4d89d2e09 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -154,6 +154,7 @@ test-suite libtorrent : test_linked_list.cpp test_file_progress.cpp ] + [ run test_receive_buffer.cpp ] [ run test_alert_manager.cpp ] [ run test_direct_dht.cpp ] [ run test_magnet.cpp ] diff --git a/test/Makefile.am b/test/Makefile.am index 4e6ae74b2..35b3a3258 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -18,6 +18,7 @@ test_programs = \ test_pe_crypto \ test_pex \ test_read_piece \ + test_receive_buffer \ test_resume \ test_ssl \ test_storage \ diff --git a/test/test_receive_buffer.cpp b/test/test_receive_buffer.cpp new file mode 100644 index 000000000..7059864a0 --- /dev/null +++ b/test/test_receive_buffer.cpp @@ -0,0 +1,255 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/config.hpp" +#include "test.hpp" +#include "libtorrent/receive_buffer.hpp" + +using namespace libtorrent; + +struct allocator : buffer_allocator_interface +{ + void free_disk_buffer(char*) {} + char* allocate_disk_buffer(char const*) { TORRENT_ASSERT(false); return NULL; } + char* allocate_disk_buffer(bool& + , boost::shared_ptr + , char const*) { TORRENT_ASSERT(false); return NULL; } + char* async_allocate_disk_buffer(char const* + , boost::function const&) { TORRENT_ASSERT(false); return NULL; } + void reclaim_block(block_cache_reference ref) {} +}; + +TORRENT_TEST(recv_buffer_init) +{ + allocator a; + receive_buffer b(a); + + b.cut(0, 10); + + TEST_EQUAL(b.packet_size(), 10); + TEST_EQUAL(b.packet_bytes_remaining(), 10); + TEST_EQUAL(b.packet_finished(), false); + TEST_EQUAL(b.pos(), 0); + TEST_EQUAL(b.capacity(), 0); +} + +TORRENT_TEST(recv_buffer_pos_at_end_false) +{ + allocator a; + receive_buffer b(a); + + b.cut(0, 1000); + // allocate some space to receive into + boost::array vec; + int num_bufs = b.reserve(vec, 1000); + + // since we don't have a disk buffer, there should only be a single + // range/buffer + TEST_EQUAL(num_bufs, 1); + + b.received(1000); + b.advance_pos(999); + + TEST_EQUAL(b.pos_at_end(), false); +} + +TORRENT_TEST(recv_buffer_pos_at_end_true) +{ + allocator a; + receive_buffer b(a); + b.cut(0, 1000); + b.reserve(1000); + boost::array vec; + int num_bufs = b.reserve(vec, 1000); + TEST_EQUAL(num_bufs, 1); + b.received(1000); + b.advance_pos(1000); + TEST_EQUAL(b.pos_at_end(), true); +} + +TORRENT_TEST(recv_buffer_packet_finished) +{ + allocator a; + receive_buffer b(a); + // packet_size = 10 + b.cut(0, 10); + b.reserve(1000); + boost::array vec; + int num_bufs = b.reserve(vec, 1000); + TEST_EQUAL(num_bufs, 1); + b.received(1000); + + for (int i = 0; i < 10; ++i) + { + TEST_EQUAL(b.packet_finished(), false); + b.advance_pos(1); + } + TEST_EQUAL(b.packet_finished(), true); +} + +TORRENT_TEST(recv_buffer_disk_buffer) +{ + char disk_buffer; // fake disk buffer pointer + + allocator a; + receive_buffer b(a); + b.reserve(1000); + b.cut(0, 1000); // packet size = 1000 + boost::array vec; + b.assign_disk_buffer(&disk_buffer, 137); + int num_bufs = b.reserve(vec, 1000); + TEST_EQUAL(num_bufs, 2); + + // regular buffer disk buffer + // -----------------====== + // + // |----------------------| 1000 + // |-----| 137 + // |----------------| 863 + + TEST_EQUAL(boost::asio::buffer_size(vec[0]), 863); + TEST_EQUAL(boost::asio::buffer_size(vec[1]), 137); +} + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + +TORRENT_TEST(recv_buffer_mutable_buffers_regular_and_disk) +{ + char disk_buffer; // fake disk buffer pointer + + allocator a; + receive_buffer b(a); + b.reserve(1100); + b.cut(0, 100); // packet size = 100 + b.received(1100); + int packet_transferred = b.advance_pos(1100); + // this is just the first packet + TEST_EQUAL(packet_transferred, 100); + // the next packet is 1000, and we're done with the first 100 bytes now + b.cut(100, 1000); // packet size = 1000 + // and it has a disk buffer + b.assign_disk_buffer(&disk_buffer, 137); + std::vector vec; + packet_transferred = b.advance_pos(999); + TEST_EQUAL(packet_transferred, 999); + b.mutable_buffers(vec, 999); + TEST_EQUAL(vec.size(), 2); + + // previous packet + // | + // v regular buffer disk buffer + // - - - -----------------====== + // ^ + // | + // m_recv_start + + // |----------------------| 1000 packet size + // |-----| 137 disk buffer + // |----------------| 863 regular buffer + + TEST_EQUAL(boost::asio::buffer_size(vec[0]), 863); + TEST_EQUAL(boost::asio::buffer_size(vec[1]), 137 - 1); + TEST_EQUAL(boost::asio::buffer_size(vec[0]) + + boost::asio::buffer_size(vec[1]), 999); +} + +TORRENT_TEST(recv_buffer_mutable_buffers_regular_only) +{ + allocator a; + receive_buffer b(a); + b.reserve(1100); + b.cut(0, 100); // packet size = 100 + b.received(1100); + int packet_transferred = b.advance_pos(1100); + // this is just the first packet + TEST_EQUAL(packet_transferred, 100); + // the next packet is 1000, and we're done with the first 100 bytes now + b.cut(100, 1000); // packet size = 1000 + std::vector vec; + packet_transferred = b.advance_pos(999); + TEST_EQUAL(packet_transferred, 999); + b.mutable_buffers(vec, 999); + TEST_EQUAL(vec.size(), 1); + + // previous packet + // | + // v regular buffer + // - - - ----------------------- + // ^ + // | + // m_recv_start + + // |----------------------| 1000 packet size + // |---------------------| 999 regular buffer + + TEST_EQUAL(boost::asio::buffer_size(vec[0]), 999); +} + +TORRENT_TEST(recv_buffer_mutable_buffers_disk) +{ + char disk_buffer; // fake disk buffer pointer + + allocator a; + receive_buffer b(a); + b.reserve(1100); + b.cut(0, 100); // packet size = 100 + b.received(1100); + int packet_transferred = b.advance_pos(1100); + // this is just the first packet + TEST_EQUAL(packet_transferred, 100); + // the next packet is 1000, and we're done with the first 100 bytes now + b.cut(100, 1000); // packet size = 1000 + // and it has a disk buffer + b.assign_disk_buffer(&disk_buffer, 1000); + std::vector vec; + packet_transferred = b.advance_pos(999); + TEST_EQUAL(packet_transferred, 999); + b.mutable_buffers(vec, 999); + TEST_EQUAL(vec.size(), 1); + + // previous packet + // | + // v disk buffer + // - - - ======================= + // ^ + // | + // m_recv_start + + // |----------------------| 1000 packet size + // |----------------------| 999 disk buffer + + TEST_EQUAL(boost::asio::buffer_size(vec[0]), 999); + TEST_EQUAL(boost::asio::buffer_cast(vec[0]), &disk_buffer); +} + +#endif +