remove option to disable contiguous receive buffers (#818)

removed option to disable contiguous receive buffers
This commit is contained in:
Arvid Norberg 2016-06-15 08:04:53 -04:00 committed by GitHub
parent db73946564
commit ce161a9d62
14 changed files with 59 additions and 468 deletions

View File

@ -1,3 +1,4 @@
* removed option to disable contiguous receive buffers
* deprecated public to_hex() and from_hex() functions * deprecated public to_hex() and from_hex() functions
* separated address and port fields in listen alerts * separated address and port fields in listen alerts
* added support for parsing new x.pe parameter from BEP 9 * added support for parsing new x.pe parameter from BEP 9

View File

@ -748,8 +748,6 @@ namespace libtorrent
virtual int hit_send_barrier(std::vector<boost::asio::mutable_buffer>&) virtual int hit_send_barrier(std::vector<boost::asio::mutable_buffer>&)
{ return INT_MAX; } { return INT_MAX; }
bool allocate_disk_receive_buffer(int disk_buffer_size);
void attach_to_torrent(sha1_hash const& ih); void attach_to_torrent(sha1_hash const& ih);
bool verify_piece(peer_request const& p) const; bool verify_piece(peer_request const& p) const;

View File

@ -44,14 +44,12 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
{ {
friend struct crypto_receive_buffer; friend struct crypto_receive_buffer;
receive_buffer(buffer_allocator_interface& allocator) receive_buffer()
: m_recv_start(0) : m_recv_start(0)
, m_recv_end(0) , m_recv_end(0)
, m_recv_pos(0) , m_recv_pos(0)
, m_packet_size(0) , m_packet_size(0)
, m_soft_packet_size(0) , m_soft_packet_size(0)
, m_disk_recv_buffer_size(0)
, m_disk_recv_buffer(allocator, 0)
{} {}
int packet_size() const { return m_packet_size; } int packet_size() const { return m_packet_size; }
@ -66,18 +64,15 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
bool packet_finished() const { return m_packet_size <= m_recv_pos; } bool packet_finished() const { return m_packet_size <= m_recv_pos; }
int pos() const { return m_recv_pos; } int pos() const { return m_recv_pos; }
int capacity() const { return int(m_recv_buffer.capacity()) + m_disk_recv_buffer_size; } int capacity() const { return int(m_recv_buffer.capacity()); }
int regular_buffer_size() const int regular_buffer_size() const
{ {
TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(m_packet_size > 0);
return m_packet_size - m_disk_recv_buffer_size; return m_packet_size;
} }
// regular buffer only
boost::asio::mutable_buffer reserve(int size); boost::asio::mutable_buffer reserve(int size);
// with possible disk buffer usage
int reserve(std::array<boost::asio::mutable_buffer, 2>& vec, int size);
// tell the buffer we just received more bytes at the end of it. This will // tell the buffer we just received more bytes at the end of it. This will
// advance the end cursor // advance the end cursor
@ -85,8 +80,7 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
{ {
TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(m_packet_size > 0);
m_recv_end += bytes_transferred; m_recv_end += bytes_transferred;
TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size() TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()));
+ m_disk_recv_buffer_size));
} }
// tell the buffer we consumed some bytes of it. This will advance the read // tell the buffer we consumed some bytes of it. This will advance the read
@ -114,26 +108,11 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
// returns the entire regular buffer // returns the entire regular buffer
// should only be used during the handshake // should only be used during the handshake
buffer::interval mutable_buffer(); buffer::interval mutable_buffer();
// returns the last 'bytes' from the receive buffer // returns the last 'bytes' from the receive buffer
void mutable_buffers(std::vector<boost::asio::mutable_buffer>& vec, int bytes); boost::asio::mutable_buffer mutable_buffers(int bytes);
#endif #endif
void free_disk_buffer()
{
m_disk_recv_buffer.reset();
m_disk_recv_buffer_size = 0;
}
bool has_disk_buffer() const { return m_disk_recv_buffer; }
void assert_no_disk_buffer() const
{
TORRENT_ASSERT(!m_disk_recv_buffer);
TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
}
void assign_disk_buffer(char* buffer, int size);
char* release_disk_buffer();
// the purpose of this function is to free up and cut off all messages // the purpose of this function is to free up and cut off all messages
// in the receive buffer that have been parsed and processed. // in the receive buffer that have been parsed and processed.
void normalize(); void normalize();
@ -141,13 +120,10 @@ struct TORRENT_EXTRA_EXPORT receive_buffer
void reset(int packet_size); void reset(int packet_size);
bool can_recv_contiguous(int /*size*/) const { return true; }
#if TORRENT_USE_INVARIANT_CHECKS #if TORRENT_USE_INVARIANT_CHECKS
void check_invariant() const void check_invariant() const
{ {
TORRENT_ASSERT(m_recv_end >= m_recv_start); TORRENT_ASSERT(m_recv_end >= m_recv_start);
TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
} }
#endif #endif
@ -175,12 +151,6 @@ private:
// beyond this point is garbage) // beyond this point is garbage)
// m_recv_buffer // m_recv_buffer
// when not using contiguous receive buffers, there
// may be a disk_recv_buffer in the mix as well. Whenever
// m_disk_recv_buffer_size > 0 (and presumably also
// m_disk_recv_buffer != NULL) the disk buffer is imagined
// to be appended to the receive buffer right after m_recv_end.
// the start of the logical receive buffer // the start of the logical receive buffer
int m_recv_start; int m_recv_start;
@ -202,15 +172,7 @@ private:
// have sent to it // have sent to it
int m_soft_packet_size; int m_soft_packet_size;
int m_disk_recv_buffer_size;
buffer m_recv_buffer; buffer m_recv_buffer;
// if this peer is receiving a piece, this
// points to a disk buffer that the data is
// read into. This eliminates a memcopy from
// the receive buffer into the disk buffer
disk_buffer_holder m_disk_recv_buffer;
}; };
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
@ -229,9 +191,6 @@ struct crypto_receive_buffer
{} {}
buffer::interval mutable_buffer() { return m_connection_buffer.mutable_buffer(); } buffer::interval mutable_buffer() { return m_connection_buffer.mutable_buffer(); }
char* release_disk_buffer() { return m_connection_buffer.release_disk_buffer(); }
bool has_disk_buffer() const { return m_connection_buffer.has_disk_buffer(); }
void assert_no_disk_buffer() const { m_connection_buffer.assert_no_disk_buffer(); }
bool packet_finished() const; bool packet_finished() const;
@ -267,17 +226,7 @@ struct crypto_receive_buffer
buffer::const_interval get() const; buffer::const_interval get() const;
bool can_recv_contiguous(int /*size*/) const boost::asio::mutable_buffer mutable_buffers(std::size_t bytes);
{
// TODO: Detect when the start of the next crpyto packet is aligned
// with the start of piece data and the crpyto packet is at least
// as large as the piece data. With a little extra work
// we could receive directly into a disk buffer in that case.
return m_recv_pos == INT_MAX;
}
void mutable_buffers(std::vector<boost::asio::mutable_buffer>& vec
, std::size_t bytes_transfered);
private: private:
// explicitly disallow assignment, to silence msvc warning // explicitly disallow assignment, to silence msvc warning

View File

@ -134,13 +134,6 @@ namespace libtorrent
// (``cache_size``). Any existing file with the same name will be // (``cache_size``). Any existing file with the same name will be
// replaced. // replaced.
// //
// Since this setting sets a hard upper limit on cache usage, it
// cannot be combined with
// ``settings_pack::contiguous_recv_buffer``, since that feature
// treats the ``cache_size`` setting as a soft (but still pretty hard)
// limit. The result of combining the two is peers being disconnected
// after failing to allocate more disk buffers.
//
// This feature requires the ``mmap`` system call, on systems that // This feature requires the ``mmap`` system call, on systems that
// don't have ``mmap`` this setting is ignored. // don't have ``mmap`` this setting is ignored.
mmap_cache, mmap_cache,
@ -546,6 +539,7 @@ namespace libtorrent
// feet. // feet.
lock_files, lock_files,
#ifndef TORRENT_NO_DEPRECATE
// ``contiguous_recv_buffer`` determines whether or not libtorrent // ``contiguous_recv_buffer`` determines whether or not libtorrent
// should receive data from peers into a contiguous intermediate // should receive data from peers into a contiguous intermediate
// buffer, to then copy blocks into disk buffers from, or to make many // buffer, to then copy blocks into disk buffers from, or to make many
@ -558,6 +552,9 @@ namespace libtorrent
// seeding to peers, since that's when it provides performance // seeding to peers, since that's when it provides performance
// improvements. // improvements.
contiguous_recv_buffer, contiguous_recv_buffer,
#else
deprecated15,
#endif
// when true, web seeds sending bad data will be banned // when true, web seeds sending bad data will be banned
ban_web_seeds, ban_web_seeds,

View File

@ -1161,43 +1161,19 @@ namespace libtorrent
disconnect(errors::packet_too_large, op_bittorrent, 2); disconnect(errors::packet_too_large, op_bittorrent, 2);
return; return;
} }
m_recv_buffer.assert_no_disk_buffer();
if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer) &&
m_recv_buffer.can_recv_contiguous(m_recv_buffer.packet_size() - 13 - list_size))
{
if (!allocate_disk_receive_buffer(m_recv_buffer.packet_size() - 13 - list_size))
{
received_bytes(0, received);
return;
}
}
} }
} }
else else
{ {
if (recv_pos == 1) if (recv_pos == 1)
{ {
m_recv_buffer.assert_no_disk_buffer();
if (m_recv_buffer.packet_size() - 9 > t->block_size()) if (m_recv_buffer.packet_size() - 9 > t->block_size())
{ {
disconnect(errors::packet_too_large, op_bittorrent, 2); disconnect(errors::packet_too_large, op_bittorrent, 2);
return; return;
} }
if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer) &&
m_recv_buffer.can_recv_contiguous(m_recv_buffer.packet_size() - 9))
{
if (!allocate_disk_receive_buffer(m_recv_buffer.packet_size() - 9))
{
received_bytes(0, received);
return;
} }
} }
}
}
TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || m_recv_buffer.has_disk_buffer() || m_recv_buffer.packet_size() == 9);
// classify the received data as protocol chatter // classify the received data as protocol chatter
// or data payload for the statistics // or data payload for the statistics
int piece_bytes = 0; int piece_bytes = 0;
@ -1269,8 +1245,6 @@ namespace libtorrent
if (is_disconnecting()) return; if (is_disconnecting()) return;
} }
TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || m_recv_buffer.has_disk_buffer() || m_recv_buffer.packet_size() == header_size);
incoming_piece_fragment(piece_bytes); incoming_piece_fragment(piece_bytes);
if (!m_recv_buffer.packet_finished()) return; if (!m_recv_buffer.packet_finished()) return;
@ -1317,17 +1291,8 @@ namespace libtorrent
} }
} }
char* disk_buffer = m_recv_buffer.release_disk_buffer();
if (disk_buffer)
{
disk_buffer_holder holder(m_allocator, disk_buffer);
incoming_piece(p, holder);
}
else
{
incoming_piece(p, recv_buffer.begin + header_size); incoming_piece(p, recv_buffer.begin + header_size);
} }
}
// ----------------------------- // -----------------------------
// ---------- CANCEL ----------- // ---------- CANCEL -----------

View File

@ -179,7 +179,7 @@ namespace libtorrent
if (recv_buffer.crypto_packet_finished()) if (recv_buffer.crypto_packet_finished())
{ {
std::vector<boost::asio::mutable_buffer> wr_buf; std::vector<boost::asio::mutable_buffer> wr_buf;
recv_buffer.mutable_buffers(wr_buf, bytes_transferred); wr_buf.push_back(recv_buffer.mutable_buffers(bytes_transferred));
int packet_size = 0; int packet_size = 0;
int produce = int(bytes_transferred); int produce = int(bytes_transferred);
m_dec_handler->decrypt(wr_buf, consume, produce, packet_size); m_dec_handler->decrypt(wr_buf, consume, produce, packet_size);

View File

@ -123,7 +123,6 @@ namespace libtorrent
, m_peer_info(pack.peerinfo) , m_peer_info(pack.peerinfo)
, m_counters(*pack.stats_counters) , m_counters(*pack.stats_counters)
, m_num_pieces(0) , m_num_pieces(0)
, m_recv_buffer(*pack.allocator)
, m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue)) , m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue))
, m_remote(pack.endp) , m_remote(pack.endp)
, m_disk_thread(*pack.disk_thread) , m_disk_thread(*pack.disk_thread)
@ -2644,8 +2643,6 @@ namespace libtorrent
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
m_recv_buffer.assert_no_disk_buffer();
// we're not receiving any block right now // we're not receiving any block right now
m_receiving_block = piece_block::invalid; m_receiving_block = piece_block::invalid;
@ -4114,7 +4111,6 @@ namespace libtorrent
// make sure we free up all send buffers that are owned // make sure we free up all send buffers that are owned
// by the disk thread // by the disk thread
m_send_buffer.clear(); m_send_buffer.clear();
m_recv_buffer.free_disk_buffer();
} }
// we cannot do this in a constructor // we cannot do this in a constructor
@ -4515,59 +4511,6 @@ namespace libtorrent
p.local_endpoint = get_socket()->local_endpoint(ec); p.local_endpoint = get_socket()->local_endpoint(ec);
} }
// allocates a disk buffer of size 'disk_buffer_size' and replaces the
// end of the current receive buffer with it. i.e. the receive pos
// must be <= packet_size - disk_buffer_size
// the disk buffer can be accessed through release_disk_receive_buffer()
// when it is queried, the responsibility to free it is transferred
// to the caller
bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
m_recv_buffer.assert_no_disk_buffer();
TORRENT_ASSERT(m_recv_buffer.pos() <= m_recv_buffer.packet_size() - disk_buffer_size);
TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
if (disk_buffer_size == 0) return true;
if (disk_buffer_size > 16 * 1024)
{
disconnect(errors::invalid_piece_size, op_bittorrent, 2);
return false;
}
// first free the old buffer
m_recv_buffer.free_disk_buffer();
// then allocate a new one
bool exceeded = false;
m_recv_buffer.assign_disk_buffer(
m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer")
, disk_buffer_size);
if (!m_recv_buffer.has_disk_buffer())
{
disconnect(errors::no_memory, op_alloc_recvbuf);
return false;
}
// to understand why m_outstanding_writing_bytes is here, see comment by
// the other call to allocate_disk_buffer()
if (exceeded && m_outstanding_writing_bytes > 0)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark");
#endif
if ((m_channel_state[download_channel] & peer_info::bw_disk) == 0)
m_counters.inc_stats_counter(counters::num_peers_down_disk);
m_channel_state[download_channel] |= peer_info::bw_disk;
}
return true;
}
void peer_connection::superseed_piece(int replace_piece, int new_piece) void peer_connection::superseed_piece(int replace_piece, int new_piece)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@ -5750,13 +5693,10 @@ namespace libtorrent
int max_receive = m_recv_buffer.max_receive(); int max_receive = m_recv_buffer.max_receive();
std::array<boost::asio::mutable_buffer, 2> vec;
int num_bufs = 0;
// only apply the contiguous receive buffer when we don't have any // only apply the contiguous receive buffer when we don't have any
// outstanding requests. When we're likely to receive pieces, we'll // outstanding requests. When we're likely to receive pieces, we'll
// save more time from avoiding copying data from the socket // save more time from avoiding copying data from the socket
if ((m_settings.get_bool(settings_pack::contiguous_recv_buffer) if (m_download_queue.empty())
|| m_download_queue.empty()) && !m_recv_buffer.has_disk_buffer())
{ {
if (s == read_sync) if (s == read_sync)
{ {
@ -5788,7 +5728,7 @@ namespace libtorrent
return 0; return 0;
} }
num_bufs = m_recv_buffer.reserve(vec, max_receive); boost::asio::mutable_buffer const vec = m_recv_buffer.reserve(max_receive);
if (s == read_async) if (s == read_async)
{ {
@ -5801,33 +5741,14 @@ namespace libtorrent
// utp sockets aren't thread safe... // utp sockets aren't thread safe...
ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data"); ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data");
if (num_bufs == 1)
{
TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0);
m_socket->async_read_some( m_socket->async_read_some(
boost::asio::mutable_buffers_1(vec[0]), make_read_handler( boost::asio::mutable_buffers_1(vec), make_read_handler(
std::bind(&peer_connection::on_receive_data, self(), _1, _2))); std::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
else
{
TORRENT_ASSERT(boost::asio::buffer_size(vec[0])
+ boost::asio::buffer_size(vec[1])> 0);
m_socket->async_read_some(
vec, make_read_handler(
std::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
return 0; return 0;
} }
size_t ret = 0; size_t const ret = m_socket->read_some(boost::asio::mutable_buffers_1(vec), ec);
if (num_bufs == 1)
{
ret = m_socket->read_some(boost::asio::mutable_buffers_1(vec[0]), ec);
}
else
{
ret = m_socket->read_some(vec, ec);
}
// this is weird. You would imagine read_some() would do this // this is weird. You would imagine read_some() would do this
if (ret == 0 && !ec) ec = boost::asio::error::eof; if (ret == 0 && !ec) ec = boost::asio::error::eof;
@ -6454,7 +6375,6 @@ namespace libtorrent
// make sure we free up all send buffers that are owned // make sure we free up all send buffers that are owned
// by the disk thread // by the disk thread
m_send_buffer.clear(); m_send_buffer.clear();
m_recv_buffer.free_disk_buffer();
return; return;
} }

View File

@ -53,69 +53,16 @@ int receive_buffer::max_receive()
boost::asio::mutable_buffer receive_buffer::reserve(int size) boost::asio::mutable_buffer receive_buffer::reserve(int size)
{ {
TORRENT_ASSERT(size > 0); TORRENT_ASSERT(size > 0);
TORRENT_ASSERT(!m_disk_recv_buffer); TORRENT_ASSERT(m_recv_pos >= 0);
// this is unintuitive, but we used to use m_recv_pos in this function when // this is unintuitive, but we used to use m_recv_pos in this function when
// we should have used m_recv_end. perhaps they always happen to be equal // we should have used m_recv_end. perhaps they always happen to be equal
TORRENT_ASSERT(m_recv_pos == m_recv_end); TORRENT_ASSERT(m_recv_pos == m_recv_end);
m_recv_buffer.resize(m_recv_end + size);
return boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size);
}
int receive_buffer::reserve(std::array<boost::asio::mutable_buffer, 2>& vec, int size)
{
TORRENT_ASSERT(size > 0);
TORRENT_ASSERT(m_recv_pos >= 0);
TORRENT_ASSERT(m_packet_size > 0);
// normalize() must be called before receiving more data // normalize() must be called before receiving more data
TORRENT_ASSERT(m_recv_start == 0); TORRENT_ASSERT(m_recv_start == 0);
// this is unintuitive, but we used to use m_recv_pos in this function when m_recv_buffer.resize(m_recv_end + size);
// we should have used m_recv_end. perhaps they always happen to be equal return boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size);
TORRENT_ASSERT(m_recv_pos == m_recv_end);
int num_bufs = -1;
int const regular_buf_size = regular_buffer_size();
if (int(m_recv_buffer.size()) < regular_buf_size)
m_recv_buffer.resize(round_up8(regular_buf_size));
if (!m_disk_recv_buffer || regular_buf_size >= m_recv_end + size)
{
// only receive into regular buffer
TORRENT_ASSERT(m_recv_end + size <= int(m_recv_buffer.size()));
vec[0] = boost::asio::buffer(&m_recv_buffer[0] + m_recv_end, size);
TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0);
num_bufs = 1;
}
else if (m_recv_end >= regular_buf_size)
{
// only receive into disk buffer
TORRENT_ASSERT(m_recv_end - regular_buf_size >= 0);
TORRENT_ASSERT(m_recv_end - regular_buf_size + size <= m_disk_recv_buffer_size);
vec[0] = boost::asio::buffer(m_disk_recv_buffer.get() + m_recv_end - regular_buf_size, size);
TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0);
num_bufs = 1;
}
else
{
// receive into both regular and disk buffer
TORRENT_ASSERT(size + m_recv_end > regular_buf_size);
TORRENT_ASSERT(m_recv_end < regular_buf_size);
TORRENT_ASSERT(size - regular_buf_size
+ m_recv_end <= m_disk_recv_buffer_size);
vec[0] = boost::asio::buffer(&m_recv_buffer[0] + m_recv_end
, regular_buf_size - m_recv_end);
vec[1] = boost::asio::buffer(m_disk_recv_buffer.get()
, size - regular_buf_size + m_recv_end);
TORRENT_ASSERT(boost::asio::buffer_size(vec[0])
+ boost::asio::buffer_size(vec[1])> 0);
num_bufs = 2;
}
return num_bufs;
} }
int receive_buffer::advance_pos(int bytes) int receive_buffer::advance_pos(int bytes)
@ -199,84 +146,26 @@ buffer::interval receive_buffer::mutable_buffer()
TORRENT_ASSERT(m_recv_pos == 0); TORRENT_ASSERT(m_recv_pos == 0);
return buffer::interval(0,0); return buffer::interval(0,0);
} }
TORRENT_ASSERT(!m_disk_recv_buffer); int const rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size()));
TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size()));
return buffer::interval(&m_recv_buffer[0] + m_recv_start return buffer::interval(&m_recv_buffer[0] + m_recv_start
, &m_recv_buffer[0] + m_recv_start + rcv_pos); , &m_recv_buffer[0] + m_recv_start + rcv_pos);
} }
// TODO: 2 should this take a std::array<..., 2> instead? it could return the boost::asio::mutable_buffer receive_buffer::mutable_buffers(int const bytes)
// number of buffers added, just like reserve.
void receive_buffer::mutable_buffers(std::vector<boost::asio::mutable_buffer>& vec, int const bytes)
{ {
namespace asio = boost::asio; namespace asio = boost::asio;
// bytes is the number of bytes we just received, and m_recv_pos has // bytes is the number of bytes we just received, and m_recv_pos has
// already been adjusted for these bytes. The receive pos immediately // already been adjusted for these bytes. The receive pos immediately
// before we received these bytes was (m_recv_pos - bytes) // before we received these bytes was (m_recv_pos - bytes)
int const last_recv_pos = m_recv_pos - bytes; int const last_recv_pos = m_recv_pos - bytes;
TORRENT_ASSERT(bytes <= m_recv_pos); TORRENT_ASSERT(bytes <= m_recv_pos);
// the number of bytes in the current packet that are being received into a return asio::mutable_buffer(&m_recv_buffer[0] + m_recv_start
// regular receive buffer (as opposed to a disk cache buffer) + last_recv_pos, bytes);
int const regular_buf_size = regular_buffer_size();
TORRENT_ASSERT(regular_buf_size >= 0);
if (!m_disk_recv_buffer || regular_buf_size >= m_recv_pos)
{
// we just received into a regular disk buffer
vec.push_back(asio::mutable_buffer(&m_recv_buffer[0] + m_recv_start
+ last_recv_pos, bytes));
}
else if (last_recv_pos >= regular_buf_size)
{
// we only received into a disk buffer
vec.push_back(asio::mutable_buffer(m_disk_recv_buffer.get()
+ last_recv_pos - regular_buf_size, bytes));
}
else
{
// we received into a regular and a disk buffer
TORRENT_ASSERT(last_recv_pos < regular_buf_size);
TORRENT_ASSERT(m_recv_pos > regular_buf_size);
vec.push_back(asio::mutable_buffer(&m_recv_buffer[0] + m_recv_start + last_recv_pos
, regular_buf_size - last_recv_pos));
vec.push_back(asio::mutable_buffer(m_disk_recv_buffer.get()
, m_recv_pos - regular_buf_size));
}
#if TORRENT_USE_ASSERTS
int vec_bytes = 0;
for (std::vector<asio::mutable_buffer>::iterator i = vec.begin();
i != vec.end(); ++i)
vec_bytes += int(boost::asio::buffer_size(*i));
TORRENT_ASSERT(vec_bytes == bytes);
#endif
} }
#endif #endif
void receive_buffer::assign_disk_buffer(char* buffer, int size)
{
TORRENT_ASSERT(m_packet_size > 0);
assert_no_disk_buffer();
m_disk_recv_buffer.reset(buffer);
if (m_disk_recv_buffer)
m_disk_recv_buffer_size = size;
}
char* receive_buffer::release_disk_buffer()
{
if (!m_disk_recv_buffer) return 0;
TORRENT_ASSERT(m_disk_recv_buffer_size <= m_recv_end);
TORRENT_ASSERT(m_recv_start <= m_recv_end - m_disk_recv_buffer_size);
m_recv_end -= m_disk_recv_buffer_size;
m_disk_recv_buffer_size = 0;
return m_disk_recv_buffer.release();
}
// the purpose of this function is to free up and cut off all messages // the purpose of this function is to free up and cut off all messages
// in the receive buffer that have been parsed and processed. // in the receive buffer that have been parsed and processed.
void receive_buffer::normalize() void receive_buffer::normalize()
@ -370,7 +259,6 @@ void crypto_receive_buffer::crypto_reset(int packet_size)
TORRENT_ASSERT(crypto_packet_finished()); TORRENT_ASSERT(crypto_packet_finished());
TORRENT_ASSERT(m_recv_pos == INT_MAX || m_recv_pos == m_connection_buffer.pos()); TORRENT_ASSERT(m_recv_pos == INT_MAX || m_recv_pos == m_connection_buffer.pos());
TORRENT_ASSERT(m_recv_pos == INT_MAX || m_connection_buffer.pos_at_end()); TORRENT_ASSERT(m_recv_pos == INT_MAX || m_connection_buffer.pos_at_end());
TORRENT_ASSERT(!m_connection_buffer.has_disk_buffer());
if (packet_size == 0) if (packet_size == 0)
{ {
@ -417,16 +305,13 @@ buffer::const_interval crypto_receive_buffer::get() const
return recv_buffer; return recv_buffer;
} }
void crypto_receive_buffer::mutable_buffers( boost::asio::mutable_buffer crypto_receive_buffer::mutable_buffers(
std::vector<boost::asio::mutable_buffer>& vec std::size_t const bytes)
, std::size_t bytes_transfered)
{ {
int pending_decryption = int(bytes_transfered); int const pending_decryption = (m_recv_pos != INT_MAX)
if (m_recv_pos != INT_MAX) ? m_connection_buffer.packet_size() - m_recv_pos
{ : int(bytes);
pending_decryption = m_connection_buffer.packet_size() - m_recv_pos; return m_connection_buffer.mutable_buffers(pending_decryption);
}
m_connection_buffer.mutable_buffers(vec, pending_decryption);
} }
#endif // TORRENT_DISABLE_ENCRYPTION #endif // TORRENT_DISABLE_ENCRYPTION

View File

@ -77,10 +77,12 @@ namespace libtorrent
{ {
TORRENT_EXPORT void min_memory_usage(settings_pack& set) TORRENT_EXPORT void min_memory_usage(settings_pack& set)
{ {
#ifndef TORRENT_NO_DEPRECATE
// receive data directly into disk buffers // receive data directly into disk buffers
// this yields more system calls to read() and // this yields more system calls to read() and
// kqueue(), but saves RAM. // kqueue(), but saves RAM.
set.set_bool(settings_pack::contiguous_recv_buffer, false); set.set_bool(settings_pack::contiguous_recv_buffer, false);
#endif
set.set_int(settings_pack::disk_io_write_mode, settings_pack::disable_os_cache); set.set_int(settings_pack::disk_io_write_mode, settings_pack::disable_os_cache);
set.set_int(settings_pack::disk_io_read_mode, settings_pack::disable_os_cache); set.set_int(settings_pack::disk_io_read_mode, settings_pack::disable_os_cache);

View File

@ -195,7 +195,7 @@ namespace libtorrent
SET(apply_ip_filter_to_trackers, true, 0), SET(apply_ip_filter_to_trackers, true, 0),
SET(use_disk_read_ahead, true, 0), SET(use_disk_read_ahead, true, 0),
SET(lock_files, false, 0), SET(lock_files, false, 0),
SET(contiguous_recv_buffer, true, 0), DEPRECATED_SET(contiguous_recv_buffer, true, 0),
SET(ban_web_seeds, true, 0), SET(ban_web_seeds, true, 0),
SET_NOPREV(allow_partial_disk_writes, true, 0), SET_NOPREV(allow_partial_disk_writes, true, 0),
SET(force_proxy, false, &session_impl::update_force_proxy), SET(force_proxy, false, &session_impl::update_force_proxy),

View File

@ -141,7 +141,6 @@ void web_peer_connection::disconnect(error_code const& ec
// prevent the peer from trying to send anything more // prevent the peer from trying to send anything more
m_send_buffer.clear(); m_send_buffer.clear();
m_recv_buffer.free_disk_buffer();
// when the web server closed our write-end of the socket (i.e. its // when the web server closed our write-end of the socket (i.e. its
// read-end), if it's an HTTP 1.0 server. we will stop sending more // read-end), if it's an HTTP 1.0 server. we will stop sending more

View File

@ -36,22 +36,9 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent; using namespace libtorrent;
struct allocator : buffer_allocator_interface
{
void free_disk_buffer(char*) {}
char* allocate_disk_buffer(char const*) { TORRENT_ASSERT_FAIL(); return NULL; }
char* allocate_disk_buffer(bool&
, boost::shared_ptr<disk_observer>
, char const*) { TORRENT_ASSERT_FAIL(); return NULL; }
char* async_allocate_disk_buffer(char const*
, boost::function<void(char*)> const&) { TORRENT_ASSERT_FAIL(); return NULL; }
void reclaim_block(block_cache_reference ref) {}
};
TORRENT_TEST(recv_buffer_init) TORRENT_TEST(recv_buffer_init)
{ {
allocator a; receive_buffer b;
receive_buffer b(a);
b.cut(0, 10); b.cut(0, 10);
@ -64,17 +51,12 @@ TORRENT_TEST(recv_buffer_init)
TORRENT_TEST(recv_buffer_pos_at_end_false) TORRENT_TEST(recv_buffer_pos_at_end_false)
{ {
allocator a; receive_buffer b;
receive_buffer b(a);
b.cut(0, 1000); b.cut(0, 1000);
// allocate some space to receive into // allocate some space to receive into
std::array<boost::asio::mutable_buffer, 2> vec; boost::asio::mutable_buffer vec
int num_bufs = b.reserve(vec, 1000); = b.reserve(1000);
// since we don't have a disk buffer, there should only be a single
// range/buffer
TEST_EQUAL(num_bufs, 1);
b.received(1000); b.received(1000);
b.advance_pos(999); b.advance_pos(999);
@ -84,13 +66,10 @@ TORRENT_TEST(recv_buffer_pos_at_end_false)
TORRENT_TEST(recv_buffer_pos_at_end_true) TORRENT_TEST(recv_buffer_pos_at_end_true)
{ {
allocator a; receive_buffer b;
receive_buffer b(a);
b.cut(0, 1000); b.cut(0, 1000);
b.reserve(1000); b.reserve(1000);
std::array<boost::asio::mutable_buffer, 2> vec; boost::asio::mutable_buffer vec = b.reserve(1000);
int num_bufs = b.reserve(vec, 1000);
TEST_EQUAL(num_bufs, 1);
b.received(1000); b.received(1000);
b.advance_pos(1000); b.advance_pos(1000);
TEST_EQUAL(b.pos_at_end(), true); TEST_EQUAL(b.pos_at_end(), true);
@ -98,14 +77,11 @@ TORRENT_TEST(recv_buffer_pos_at_end_true)
TORRENT_TEST(recv_buffer_packet_finished) TORRENT_TEST(recv_buffer_packet_finished)
{ {
allocator a; receive_buffer b;
receive_buffer b(a);
// packet_size = 10 // packet_size = 10
b.cut(0, 10); b.cut(0, 10);
b.reserve(1000); b.reserve(1000);
std::array<boost::asio::mutable_buffer, 2> vec; boost::asio::mutable_buffer vec = b.reserve(1000);
int num_bufs = b.reserve(vec, 1000);
TEST_EQUAL(num_bufs, 1);
b.received(1000); b.received(1000);
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
@ -116,39 +92,11 @@ TORRENT_TEST(recv_buffer_packet_finished)
TEST_EQUAL(b.packet_finished(), true); TEST_EQUAL(b.packet_finished(), true);
} }
TORRENT_TEST(recv_buffer_disk_buffer)
{
char disk_buffer; // fake disk buffer pointer
allocator a;
receive_buffer b(a);
b.reserve(1000);
b.cut(0, 1000); // packet size = 1000
std::array<boost::asio::mutable_buffer, 2> vec;
b.assign_disk_buffer(&disk_buffer, 137);
int const num_bufs = b.reserve(vec, 1000);
TEST_EQUAL(num_bufs, 2);
// regular buffer disk buffer
// -----------------======
//
// |----------------------| 1000
// |-----| 137
// |----------------| 863
TEST_EQUAL(boost::asio::buffer_size(vec[0]), 863);
// cppcheck-suppress arrayIndexOutOfBounds
TEST_EQUAL(boost::asio::buffer_size(vec[1]), 137);
}
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
TORRENT_TEST(recv_buffer_mutable_buffers_regular_and_disk) TORRENT_TEST(recv_buffer_mutable_buffers)
{ {
char disk_buffer; // fake disk buffer pointer receive_buffer b;
allocator a;
receive_buffer b(a);
b.reserve(1100); b.reserve(1100);
b.cut(0, 100); // packet size = 100 b.cut(0, 100); // packet size = 100
b.received(1100); b.received(1100);
@ -157,99 +105,22 @@ TORRENT_TEST(recv_buffer_mutable_buffers_regular_and_disk)
TEST_EQUAL(packet_transferred, 100); TEST_EQUAL(packet_transferred, 100);
// the next packet is 1000, and we're done with the first 100 bytes now // the next packet is 1000, and we're done with the first 100 bytes now
b.cut(100, 1000); // packet size = 1000 b.cut(100, 1000); // packet size = 1000
// and it has a disk buffer
b.assign_disk_buffer(&disk_buffer, 137);
std::vector<boost::asio::mutable_buffer> vec;
packet_transferred = b.advance_pos(999); packet_transferred = b.advance_pos(999);
TEST_EQUAL(packet_transferred, 999); TEST_EQUAL(packet_transferred, 999);
b.mutable_buffers(vec, 999); boost::asio::mutable_buffer vec = b.mutable_buffers(999);
TEST_EQUAL(vec.size(), 2);
// previous packet // previous packet
// | // |
// v regular buffer disk buffer // v buffer
// - - - -----------------======
// ^
// |
// m_recv_start
// |----------------------| 1000 packet size
// |-----| 137 disk buffer
// |----------------| 863 regular buffer
TEST_EQUAL(boost::asio::buffer_size(vec[0]), 863);
TEST_EQUAL(boost::asio::buffer_size(vec[1]), 137 - 1);
TEST_EQUAL(boost::asio::buffer_size(vec[0])
+ boost::asio::buffer_size(vec[1]), 999);
}
TORRENT_TEST(recv_buffer_mutable_buffers_regular_only)
{
allocator a;
receive_buffer b(a);
b.reserve(1100);
b.cut(0, 100); // packet size = 100
b.received(1100);
int packet_transferred = b.advance_pos(1100);
// this is just the first packet
TEST_EQUAL(packet_transferred, 100);
// the next packet is 1000, and we're done with the first 100 bytes now
b.cut(100, 1000); // packet size = 1000
std::vector<boost::asio::mutable_buffer> vec;
packet_transferred = b.advance_pos(999);
TEST_EQUAL(packet_transferred, 999);
b.mutable_buffers(vec, 999);
TEST_EQUAL(vec.size(), 1);
// previous packet
// |
// v regular buffer
// - - - ----------------------- // - - - -----------------------
// ^ // ^
// | // |
// m_recv_start // m_recv_start
// |----------------------| 1000 packet size // |----------------------| 1000 packet size
// |---------------------| 999 regular buffer // |---------------------| 999 buffer
TEST_EQUAL(boost::asio::buffer_size(vec[0]), 999); TEST_EQUAL(boost::asio::buffer_size(vec), 999);
}
TORRENT_TEST(recv_buffer_mutable_buffers_disk)
{
char disk_buffer; // fake disk buffer pointer
allocator a;
receive_buffer b(a);
b.reserve(1100);
b.cut(0, 100); // packet size = 100
b.received(1100);
int packet_transferred = b.advance_pos(1100);
// this is just the first packet
TEST_EQUAL(packet_transferred, 100);
// the next packet is 1000, and we're done with the first 100 bytes now
b.cut(100, 1000); // packet size = 1000
// and it has a disk buffer
b.assign_disk_buffer(&disk_buffer, 1000);
std::vector<boost::asio::mutable_buffer> vec;
packet_transferred = b.advance_pos(999);
TEST_EQUAL(packet_transferred, 999);
b.mutable_buffers(vec, 999);
TEST_EQUAL(vec.size(), 1);
// previous packet
// |
// v disk buffer
// - - - =======================
// ^
// |
// m_recv_start
// |----------------------| 1000 packet size
// |----------------------| 999 disk buffer
TEST_EQUAL(boost::asio::buffer_size(vec[0]), 999);
TEST_EQUAL(boost::asio::buffer_cast<char*>(vec[0]), &disk_buffer);
} }
#endif #endif

View File

@ -97,7 +97,9 @@ TORRENT_TEST(test_name)
TEST_EQUAL(setting_by_name(#n), settings_pack:: n) \ TEST_EQUAL(setting_by_name(#n), settings_pack:: n) \
TEST_CHECK(strcmp(name_for_setting(settings_pack:: n), #n) == 0) TEST_CHECK(strcmp(name_for_setting(settings_pack:: n), #n) == 0)
#ifndef TORRENT_NO_DEPRECATE
TEST_NAME(contiguous_recv_buffer); TEST_NAME(contiguous_recv_buffer);
#endif
TEST_NAME(choking_algorithm); TEST_NAME(choking_algorithm);
TEST_NAME(seeding_piece_quota); TEST_NAME(seeding_piece_quota);
#ifndef TORRENT_NO_DEPRECATE #ifndef TORRENT_NO_DEPRECATE

View File

@ -357,6 +357,7 @@ void cleanup()
remove_all("tmp2_transfer_moved", ec); remove_all("tmp2_transfer_moved", ec);
} }
#ifndef TORRENT_NO_DEPRECATE
TORRENT_TEST(no_contiguous_buffers) TORRENT_TEST(no_contiguous_buffers)
{ {
using namespace libtorrent; using namespace libtorrent;
@ -368,6 +369,7 @@ TORRENT_TEST(no_contiguous_buffers)
cleanup(); cleanup();
} }
#endif
// test with all kinds of proxies // test with all kinds of proxies
TORRENT_TEST(socks5_pw) TORRENT_TEST(socks5_pw)