make buffer_allocator_interface an implementation detail of the disk_io_thread (#1463)

make buffer_allocator_interface an implementation detail of the disk_io_thread by using disk_buffer_holder in the disk interface. make chained_buffer use holders instead of free deleter functions and userdata to support putting disk_buffer_holder objects in there. Also make the session buffers be held by holders
This commit is contained in:
Arvid Norberg 2016-12-28 17:47:18 -08:00 committed by GitHub
parent 516e86d0ea
commit af126fe507
19 changed files with 276 additions and 283 deletions

View File

@ -613,18 +613,13 @@ namespace libtorrent
void deferred_submit_jobs() override;
char* allocate_buffer() override;
ses_buffer_holder allocate_buffer() override;
torrent_peer* allocate_peer_entry(int type);
void free_peer_entry(torrent_peer* p);
void free_buffer(char* buf) override;
int send_buffer_size() const override { return send_buffer_size_impl; }
// implements buffer_allocator_interface
void free_disk_buffer(char* buf) override;
void reclaim_blocks(span<block_cache_reference> refs) override;
void do_reclaim_blocks();
// implements dht_observer
virtual void set_external_address(address const& ip
, address const& source) override;
@ -1139,12 +1134,6 @@ namespace libtorrent
// 5 minutes)
torrent_map::iterator m_next_lsd_torrent;
// we try to return disk buffers to the disk thread in batches, to
// avoid hammering its mutex. We accrue blocks here and defer returning
// them in a function we post to the io_service
std::vector<block_cache_reference> m_blocks_to_reclaim;
bool m_pending_block_reclaim = false;
#ifndef TORRENT_DISABLE_DHT
// torrents are announced on the DHT in a
// round-robin fashion. All torrents are cycled through

View File

@ -118,13 +118,14 @@ namespace libtorrent { namespace aux
};
#endif // TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS
struct ses_buffer_holder;
// TODO: 2 make this interface a lot smaller. It could be split up into
// several smaller interfaces. Each subsystem could then limit the size
// of the mock object to test it.
struct TORRENT_EXTRA_EXPORT session_interface
: buffer_allocator_interface
#if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS
, session_logger
: session_logger
#endif
{
// TODO: 2 the IP voting mechanism should be factored out
@ -193,7 +194,7 @@ namespace libtorrent { namespace aux
virtual void close_connection(peer_connection* p, error_code const& ec) = 0;
virtual int num_connections() const = 0;
virtual char* allocate_buffer() = 0;
virtual ses_buffer_holder allocate_buffer() = 0;
virtual void free_buffer(char* buf) = 0;
virtual int send_buffer_size() const = 0;
@ -321,6 +322,29 @@ namespace libtorrent { namespace aux
protected:
~session_interface() {}
};
struct ses_buffer_holder
{
ses_buffer_holder(session_interface& ses, char* buf)
: m_ses(&ses), m_buf(buf) {}
~ses_buffer_holder() { if (m_buf) m_ses->free_buffer(m_buf); }
ses_buffer_holder(ses_buffer_holder const&) = delete;
ses_buffer_holder& operator=(ses_buffer_holder const&) = delete;
ses_buffer_holder(ses_buffer_holder&& rhs) noexcept
: m_ses(rhs.m_ses), m_buf(rhs.m_buf) { rhs.m_buf = nullptr; }
ses_buffer_holder& operator=(ses_buffer_holder&& rhs) noexcept
{
if (m_buf) m_ses->free_buffer(m_buf);
m_buf = rhs.m_buf;
m_ses = rhs.m_ses;
rhs.m_buf = nullptr;
return *this;
}
char* get() const noexcept { return m_buf; }
private:
session_interface* m_ses;
char* m_buf;
};
}}
#endif

View File

@ -309,17 +309,31 @@ namespace libtorrent
void rc4_decrypt(span<char> buf);
#endif
public:
public:
// these functions encrypt the send buffer if m_rc4_encrypted
// is true, otherwise it passes the call to the
// peer_connection functions of the same names
virtual void append_const_send_buffer(char const* buffer, int size
, chained_buffer::free_buffer_fun destructor = &nop
, void* userdata = nullptr, aux::block_cache_reference ref
= aux::block_cache_reference()) override;
template <typename Holder>
void append_const_send_buffer(Holder buffer, int size)
{
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
if (!m_enc_handler.is_send_plaintext())
{
// if we're encrypting this buffer, we need to make a copy
// since we'll mutate it
std::unique_ptr<char[]> buf(new char[size]);
std::memcpy(buf.get(), buffer.get(), size);
append_send_buffer(std::move(buf), size);
}
else
#endif
{
append_send_buffer(std::move(buffer), size);
}
}
private:
private:
enum class state_t : std::uint8_t
{

View File

@ -45,6 +45,14 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/asio/buffer.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
#ifdef _MSC_VER
// visual studio requires the value in a deque to be copyable or movable. C++11
// only requires that when calling functions with those requirements
#define TORRENT_CPP98_DEQUE 1
#else
#define TORRENT_CPP98_DEQUE 0
#endif
namespace libtorrent
{
// TODO: 2 this type should probably be renamed to send_buffer
@ -58,14 +66,54 @@ namespace libtorrent
#endif
}
// destructs/frees the buffer (1st arg) with
// 2nd argument as userdata
typedef void (*free_buffer_fun)(char*, void*, aux::block_cache_reference ref);
private:
// destructs/frees the holder object
using destruct_holder_fun = void (*)(void*);
using move_construct_holder_fun = void (*)(void*, void*);
struct buffer_t
{
free_buffer_fun free_fun;
void* userdata;
buffer_t() {}
#if TORRENT_CPP98_DEQUE
buffer_t(buffer_t&& rhs) noexcept
{
destruct_holder = rhs.destruct_holder;
move_holder = rhs.move_holder;
buf = rhs.buf;
start = rhs.start;
size = rhs.size;
used_size = rhs.used_size;
move_holder(&holder, &rhs.holder);
}
buffer_t& operator=(buffer_t&& rhs) noexcept
{
destruct_holder(&holder);
destruct_holder = rhs.destruct_holder;
move_holder = rhs.move_holder;
buf = rhs.buf;
start = rhs.start;
size = rhs.size;
used_size = rhs.used_size;
move_holder(&holder, &rhs.holder);
return *this;
}
buffer_t(buffer_t const& rhs) noexcept
: buffer_t(std::move(const_cast<buffer_t&>(rhs))) {}
buffer_t& operator=(buffer_t const& rhs) noexcept
{ return this->operator=(std::move(const_cast<buffer_t&>(rhs))); }
#else
buffer_t(buffer_t&&) = delete;
buffer_t& operator=(buffer_t&&) = delete;
buffer_t(buffer_t const&) = delete;
buffer_t& operator=(buffer_t const&) = delete;
#endif
destruct_holder_fun destruct_holder;
#if TORRENT_CPP98_DEQUE
move_construct_holder_fun move_holder;
#endif
std::aligned_storage<32>::type holder;
// TODO: 2 the pointers here should probably be const, since
// they're not supposed to be mutated once inserted into the send
// buffer
@ -73,9 +121,10 @@ namespace libtorrent
char* start; // the first byte to send/receive in the buffer
int size; // the total size of the buffer
int used_size; // this is the number of bytes to send/receive
aux::block_cache_reference ref;
};
public:
bool empty() const { return m_bytes == 0; }
int size() const { return m_bytes; }
int capacity() const { return m_capacity; }
@ -83,13 +132,25 @@ namespace libtorrent
void pop_front(int bytes_to_pop);
//TODO: 3 use span<> instead of (buffer,s)
void append_buffer(char* buffer, int s, int used_size
, free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref = aux::block_cache_reference());
template <typename Holder>
void append_buffer(Holder buffer, int s, int used_size)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(s >= used_size);
m_vec.emplace_back();
buffer_t& b = m_vec.back();
init_buffer_entry<Holder>(b, buffer, s, used_size);
}
void prepend_buffer(char* buffer, int s, int used_size
, free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref = aux::block_cache_reference());
template <typename Holder>
void prepend_buffer(Holder buffer, int s, int used_size)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(s >= used_size);
m_vec.emplace_front();
buffer_t& b = m_vec.front();
init_buffer_entry<Holder>(b, buffer, s, used_size);
}
// returns the number of bytes available at the
// end of the last chained buffer.
@ -97,7 +158,7 @@ namespace libtorrent
// tries to copy the given buffer to the end of the
// last chained buffer. If there's not enough room
// it returns false
// it returns nullptr
char* append(char const* buf, int s);
// tries to allocate memory from the end
@ -114,6 +175,41 @@ namespace libtorrent
~chained_buffer();
private:
template <typename Holder>
void init_buffer_entry(buffer_t& b, Holder& buffer, int s, int used_size)
{
static_assert(sizeof(Holder) <= sizeof(b.holder), "buffer holder too large");
b.buf = buffer.get();
b.size = s;
b.start = buffer.get();
b.used_size = used_size;
#ifdef _MSC_VER
// this appears to be a false positive msvc warning
#pragma warning(push, 1)
#pragma warning(disable : 4100)
#endif
b.destruct_holder = [](void* holder)
{ reinterpret_cast<Holder*>(holder)->~Holder(); };
#if TORRENT_CPP98_DEQUE
b.move_holder = [](void* dst, void* src)
{ new (dst) Holder(std::move(*reinterpret_cast<Holder*>(src))); };
#endif
#ifdef _MSC_VER
#pragma warning(pop)
#endif
new (&b.holder) Holder(std::move(buffer));
m_bytes += used_size;
m_capacity += s;
TORRENT_ASSERT(m_bytes <= m_capacity);
}
template <typename Buffer>
void build_vec(int bytes, std::vector<Buffer>& vec);

View File

@ -39,7 +39,9 @@ POSSIBILITY OF SUCH DAMAGE.
#include <memory>
#include "libtorrent/units.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/aux_/vector.hpp"
#include "libtorrent/export.hpp"
namespace libtorrent
{
@ -51,6 +53,9 @@ namespace libtorrent
struct cache_status;
struct disk_buffer_holder;
struct counters;
struct settings_pack;
struct storage_params;
class file_storage;
enum class status_t : std::uint8_t
{
@ -76,8 +81,8 @@ namespace libtorrent
};
virtual void async_read(storage_interface* storage, peer_request const& r
, std::function<void(aux::block_cache_reference ref, char* block
, int flags, storage_error const& se)> handler, void* requester, std::uint8_t flags = 0) = 0;
, std::function<void(disk_buffer_holder block, int flags, storage_error const& se)> handler
, void* requester, std::uint8_t flags = 0) = 0;
virtual bool async_write(storage_interface* storage, peer_request const& r
, char const* buf, std::shared_ptr<disk_observer> o
, std::function<void(storage_error const&)> handler

View File

@ -57,6 +57,7 @@ namespace libtorrent
struct cached_piece_entry;
class torrent_info;
struct add_torrent_params;
struct buffer_allocator_interface;
// disk_io_jobs are allocated in a pool allocator in disk_io_thread
// they are always allocated from the network thread, posted
@ -77,7 +78,7 @@ namespace libtorrent
disk_io_job(disk_io_job const&) = delete;
disk_io_job& operator=(disk_io_job const&) = delete;
void call_callback();
void call_callback(buffer_allocator_interface&);
enum action_t : std::uint8_t
{
@ -146,8 +147,7 @@ namespace libtorrent
// this is called when operation completes
using read_handler = std::function<void(aux::block_cache_reference ref
, char* block, int flags, storage_error const& se)>;
using read_handler = std::function<void(disk_buffer_holder block, int flags, storage_error const& se)>;
using write_handler = std::function<void(storage_error const&)>;
using hash_handler = std::function<void(piece_index_t, sha1_hash const&, storage_error const&)>;
using move_handler = std::function<void(status_t, std::string const&, storage_error const&)>;

View File

@ -292,7 +292,7 @@ namespace libtorrent
void abort(bool wait);
void async_read(storage_interface* storage, peer_request const& r
, std::function<void(aux::block_cache_reference ref, char* block
, std::function<void(disk_buffer_holder block
, int flags, storage_error const& se)> handler, void* requester, std::uint8_t flags = 0) override;
bool async_write(storage_interface* storage, peer_request const& r
, char const* buf, std::shared_ptr<disk_observer> o

View File

@ -137,7 +137,6 @@ namespace libtorrent
aux::session_interface* ses;
aux::session_settings const* sett;
counters* stats_counters;
buffer_allocator_interface* allocator;
disk_interface* disk_thread;
io_service* ios;
std::weak_ptr<torrent> tor;
@ -147,7 +146,13 @@ namespace libtorrent
};
// internal
inline void nop(char*, void*, aux::block_cache_reference) {}
struct nop
{
explicit nop(char* b) : m_buf(b) {}
char* get() const { return m_buf; }
private:
char* m_buf;
};
struct TORRENT_EXTRA_EXPORT peer_connection_hot_members
{
@ -626,15 +631,12 @@ namespace libtorrent
void send_buffer(char const* begin, int size, int flags = 0);
void setup_send();
void append_send_buffer(char* buffer, int size
, chained_buffer::free_buffer_fun destructor = &nop
, void* userdata = nullptr, aux::block_cache_reference ref
= aux::block_cache_reference());
virtual void append_const_send_buffer(char const* buffer, int size
, chained_buffer::free_buffer_fun destructor = &nop
, void* userdata = nullptr, aux::block_cache_reference ref
= aux::block_cache_reference());
template <typename Holder>
void append_send_buffer(Holder buffer, int size)
{
TORRENT_ASSERT(is_single_thread());
m_send_buffer.append_buffer(std::move(buffer), size, size);
}
int outstanding_bytes() const { return m_outstanding_bytes; }
@ -758,9 +760,8 @@ namespace libtorrent
void do_update_interest();
void fill_send_buffer();
void on_disk_read_complete(aux::block_cache_reference ref
, char* disk_block, int flags, storage_error const& error, peer_request r
, time_point issue_time);
void on_disk_read_complete(disk_buffer_holder disk_block, int flags
, storage_error const& error, peer_request r, time_point issue_time);
void on_disk_write_complete(storage_error const& error
, peer_request r, std::shared_ptr<torrent> t);
void on_seed_mode_hashed(piece_index_t piece
@ -832,10 +833,6 @@ namespace libtorrent
// the disk thread to use to issue disk jobs to
disk_interface& m_disk_thread;
public:
buffer_allocator_interface& m_allocator;
private:
// io service
io_service& m_ios;

View File

@ -392,8 +392,7 @@ namespace libtorrent
error_code error;
};
void read_piece(piece_index_t piece);
void on_disk_read_complete(aux::block_cache_reference ref
, char* block, int flags, storage_error const& se
void on_disk_read_complete(disk_buffer_holder block, int flags, storage_error const& se
, peer_request r, std::shared_ptr<read_piece_struct> rp);
storage_mode_t storage_mode() const;

View File

@ -733,38 +733,8 @@ namespace libtorrent
m_rc4->decrypt(buf);
}
namespace {
void regular_c_free(char* buf, void* /* userdata */
, aux::block_cache_reference /* ref */)
{
std::free(buf);
}
}
#endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
void bt_peer_connection::append_const_send_buffer(char const* buffer, int size
, chained_buffer::free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref)
{
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
if (!m_enc_handler.is_send_plaintext())
{
// if we're encrypting this buffer, we need to make a copy
// since we'll mutate it
char* buf = static_cast<char*>(std::malloc(size));
std::memcpy(buf, buffer, size);
append_send_buffer(buf, size, &regular_c_free, nullptr);
destructor(const_cast<char*>(buffer), userdata, ref);
}
else
#endif
{
peer_connection::append_const_send_buffer(buffer, size, destructor
, userdata, ref);
}
}
void bt_peer_connection::write_handshake()
{
INVARIANT_CHECK;
@ -2433,24 +2403,6 @@ namespace libtorrent
#endif
}
namespace {
void buffer_reclaim_block(char* /* buffer */, void* userdata
, aux::block_cache_reference ref)
{
buffer_allocator_interface* buf = static_cast<buffer_allocator_interface*>(userdata);
buf->reclaim_blocks(ref);
}
void buffer_free_disk_buf(char* buffer, void* userdata
, aux::block_cache_reference /* ref */)
{
buffer_allocator_interface* buf = static_cast<buffer_allocator_interface*>(userdata);
buf->free_disk_buffer(buffer);
}
} // anonymous namespace
void bt_peer_connection::write_piece(peer_request const& r, disk_buffer_holder buffer)
{
INVARIANT_CHECK;
@ -2513,15 +2465,12 @@ namespace libtorrent
if (buffer.ref().storage == nullptr)
{
append_send_buffer(buffer.get(), r.length
, &buffer_free_disk_buf, &m_allocator);
append_send_buffer(std::move(buffer), r.length);
}
else
{
append_const_send_buffer(buffer.get(), r.length
, &buffer_reclaim_block, &m_allocator, buffer.ref());
append_const_send_buffer(std::move(buffer), r.length);
}
buffer.release();
m_payloads.push_back(range(send_buffer_size() - r.length, r.length));
setup_send();

View File

@ -40,6 +40,7 @@ namespace libtorrent
void chained_buffer::pop_front(int bytes_to_pop)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(!m_destructed);
TORRENT_ASSERT(bytes_to_pop <= m_bytes);
while (bytes_to_pop > 0 && !m_vec.empty())
{
@ -55,7 +56,7 @@ namespace libtorrent
break;
}
b.free_fun(b.buf, b.userdata, b.ref);
b.destruct_holder(static_cast<void*>(&b.holder));
m_bytes -= b.used_size;
m_capacity -= b.size;
bytes_to_pop -= b.used_size;
@ -66,54 +67,16 @@ namespace libtorrent
}
}
void chained_buffer::append_buffer(char* buffer, int s, int used_size
, free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(s >= used_size);
buffer_t b;
b.buf = buffer;
b.size = s;
b.start = buffer;
b.used_size = used_size;
b.free_fun = destructor;
b.userdata = userdata;
b.ref = ref;
m_vec.push_back(b);
m_bytes += used_size;
m_capacity += s;
TORRENT_ASSERT(m_bytes <= m_capacity);
}
void chained_buffer::prepend_buffer(char* buffer, int s, int used_size
, free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref)
{
TORRENT_ASSERT(s >= used_size);
buffer_t b;
b.buf = buffer;
b.size = s;
b.start = buffer;
b.used_size = used_size;
b.free_fun = destructor;
b.userdata = userdata;
b.ref = ref;
m_vec.push_front(b);
m_bytes += used_size;
m_capacity += s;
TORRENT_ASSERT(m_bytes <= m_capacity);
}
// returns the number of bytes available at the
// end of the last chained buffer.
int chained_buffer::space_in_last_buffer()
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(!m_destructed);
if (m_vec.empty()) return 0;
buffer_t& b = m_vec.back();
TORRENT_ASSERT(b.start != nullptr);
TORRENT_ASSERT(b.buf != nullptr);
return b.size - b.used_size - int(b.start - b.buf);
}
@ -123,7 +86,8 @@ namespace libtorrent
char* chained_buffer::append(char const* buf, int s)
{
TORRENT_ASSERT(is_single_thread());
char* insert = allocate_appendix(s);
TORRENT_ASSERT(!m_destructed);
char* const insert = allocate_appendix(s);
if (insert == nullptr) return nullptr;
std::memcpy(insert, buf, s);
return insert;
@ -135,9 +99,12 @@ namespace libtorrent
char* chained_buffer::allocate_appendix(int s)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(!m_destructed);
if (m_vec.empty()) return nullptr;
buffer_t& b = m_vec.back();
char* insert = b.start + b.used_size;
TORRENT_ASSERT(b.start != nullptr);
TORRENT_ASSERT(b.buf != nullptr);
char* const insert = b.start + b.used_size;
if (insert + s > b.buf + b.size) return nullptr;
b.used_size += s;
m_bytes += s;
@ -148,6 +115,7 @@ namespace libtorrent
std::vector<boost::asio::const_buffer> const& chained_buffer::build_iovec(int to_send)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(!m_destructed);
m_tmp_vec.clear();
build_vec(to_send, m_tmp_vec);
return m_tmp_vec;
@ -155,15 +123,18 @@ namespace libtorrent
void chained_buffer::build_mutable_iovec(int bytes, std::vector<span<char>> &vec)
{
TORRENT_ASSERT(!m_destructed);
build_vec(bytes, vec);
}
template <typename Buffer>
void chained_buffer::build_vec(int bytes, std::vector<Buffer> &vec)
void chained_buffer::build_vec(int bytes, std::vector<Buffer>& vec)
{
for (std::deque<buffer_t>::iterator i = m_vec.begin()
, end(m_vec.end()); bytes > 0 && i != end; ++i)
TORRENT_ASSERT(!m_destructed);
for (auto i = m_vec.begin(), end(m_vec.end()); bytes > 0 && i != end; ++i)
{
TORRENT_ASSERT(i->start != nullptr);
TORRENT_ASSERT(i->buf != nullptr);
if (i->used_size > bytes)
{
TORRENT_ASSERT(bytes > 0);
@ -178,11 +149,9 @@ namespace libtorrent
void chained_buffer::clear()
{
for (std::deque<buffer_t>::iterator i = m_vec.begin()
, end(m_vec.end()); i != end; ++i)
{
i->free_fun(i->buf, i->userdata, i->ref);
}
TORRENT_ASSERT(!m_destructed);
for (auto& b : m_vec)
b.destruct_holder(static_cast<void*>(&b.holder));
m_bytes = 0;
m_capacity = 0;
m_vec.clear();
@ -190,14 +159,14 @@ namespace libtorrent
chained_buffer::~chained_buffer()
{
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(!m_destructed);
m_destructed = true;
#endif
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_bytes >= 0);
TORRENT_ASSERT(m_capacity >= 0);
clear();
#if TORRENT_USE_ASSERTS
m_destructed = true;
#endif
}
}

View File

@ -35,15 +35,19 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
struct buffer_allocator_interface;
namespace {
struct caller_visitor : boost::static_visitor<>
{
explicit caller_visitor(disk_io_job& j) : m_job(j) {}
explicit caller_visitor(buffer_allocator_interface& alloc, disk_io_job& j)
: m_job(j), m_alloc(alloc) {}
void operator()(disk_io_job::read_handler& h) const
{
if (!h) return;
h(m_job.d.io.ref, m_job.buffer.disk_block, m_job.flags, m_job.error);
disk_buffer_holder block(m_alloc, m_job.d.io.ref, m_job.buffer.disk_block);
h(std::move(block), m_job.flags, m_job.error);
}
void operator()(disk_io_job::write_handler& h) const
@ -90,6 +94,7 @@ namespace libtorrent
private:
disk_io_job& m_job;
buffer_allocator_interface& m_alloc;
};
}
@ -111,9 +116,9 @@ namespace libtorrent
free(buffer.string);
}
void disk_io_job::call_callback()
void disk_io_job::call_callback(buffer_allocator_interface& alloc)
{
boost::apply_visitor(caller_visitor(*this), callback);
boost::apply_visitor(caller_visitor(alloc, *this), callback);
}
bool disk_io_job::completed(cached_piece_entry const* pe, int block_size)

View File

@ -1515,8 +1515,8 @@ namespace libtorrent
}
void disk_io_thread::async_read(storage_interface* storage, peer_request const& r
, std::function<void(aux::block_cache_reference ref, char* block
, int flags, storage_error const& se)> handler, void* requester, std::uint8_t const flags)
, std::function<void(disk_buffer_holder block, int flags, storage_error const& se)> handler
, void* requester, std::uint8_t const flags)
{
TORRENT_ASSERT(r.length <= m_disk_cache.block_size());
TORRENT_ASSERT(r.length <= 16 * 1024);
@ -1541,7 +1541,7 @@ namespace libtorrent
switch (ret)
{
case 0:
j->call_callback();
j->call_callback(*this);
free_job(j);
break;
case 1:
@ -1766,7 +1766,7 @@ namespace libtorrent
#endif
l.unlock();
j->call_callback();
j->call_callback(*this);
free_job(j);
return;
}
@ -1920,7 +1920,7 @@ namespace libtorrent
if (m_abort)
{
j->error.ec = boost::asio::error::operation_aborted;
j->call_callback();
j->call_callback(*this);
free_job(j);
return;
}
@ -3385,7 +3385,7 @@ namespace libtorrent
#if TORRENT_USE_ASSERTS
j->callback_called = true;
#endif
j->call_callback();
j->call_callback(*this);
to_delete[cnt++] = j;
j = next;
if (cnt == int(to_delete.size()))

View File

@ -119,7 +119,6 @@ namespace libtorrent
, m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue))
, m_remote(pack.endp)
, m_disk_thread(*pack.disk_thread)
, m_allocator(*pack.allocator)
, m_ios(*pack.ios)
, m_work(m_ios)
, m_outstanding_piece_verification(0)
@ -5127,7 +5126,7 @@ namespace libtorrent
m_disk_thread.async_read(&t->storage(), r
, std::bind(&peer_connection::on_disk_read_complete
, self(), _1, _2, _3, _4, r, clock_type::now()), this);
, self(), _1, _2, _3, r, clock_type::now()), this);
}
m_last_sent_payload = clock_type::now();
m_requests.erase(m_requests.begin() + i);
@ -5196,8 +5195,8 @@ namespace libtorrent
fill_send_buffer();
}
void peer_connection::on_disk_read_complete(aux::block_cache_reference ref
, char* disk_block, int const flags, storage_error const& error
void peer_connection::on_disk_read_complete(disk_buffer_holder buffer
, int const flags, storage_error const& error
, peer_request r, time_point issue_time)
{
TORRENT_ASSERT(is_single_thread());
@ -5213,7 +5212,7 @@ namespace libtorrent
peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE"
, "piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us"
, static_cast<int>(r.piece), r.start, r.length
, static_cast<void*>(disk_block)
, static_cast<void*>(buffer.get())
, (flags & disk_interface::cache_hit ? "cache hit" : "cache miss")
, error.ec.message().c_str(), disk_rtt);
}
@ -5230,7 +5229,7 @@ namespace libtorrent
return;
}
TORRENT_ASSERT(disk_block == nullptr);
TORRENT_ASSERT(buffer.get() == nullptr);
write_dont_have(r.piece);
write_reject_request(r);
if (t->alerts().should_post<file_error_alert>())
@ -5248,11 +5247,6 @@ namespace libtorrent
// block, the peer is still useful
m_disk_read_failures = 0;
// even if we're disconnecting, we need to free this block
// otherwise the disk thread will hang, waiting for the network
// thread to be done with it
disk_buffer_holder buffer(m_allocator, ref, disk_block);
if (t && m_settings.get_int(settings_pack::suggest_mode)
== settings_pack::suggest_read_cache)
{
@ -5453,8 +5447,7 @@ namespace libtorrent
// this const_cast is a here because chained_buffer need to be
// fixed.
char* ptr = const_cast<char*>(i->data());
m_send_buffer.prepend_buffer(ptr
, size, size, &nop, nullptr);
m_send_buffer.prepend_buffer(nop(ptr), size, size);
}
set_send_barrier(next_barrier);
}
@ -5646,24 +5639,6 @@ namespace libtorrent
std::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
void peer_connection::append_send_buffer(char* buffer, int size
, chained_buffer::free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref)
{
TORRENT_ASSERT(is_single_thread());
m_send_buffer.append_buffer(buffer, size, size, destructor
, userdata, ref);
}
void peer_connection::append_const_send_buffer(char const* buffer, int size
, chained_buffer::free_buffer_fun destructor, void* userdata
, aux::block_cache_reference ref)
{
TORRENT_ASSERT(is_single_thread());
m_send_buffer.append_buffer(const_cast<char*>(buffer), size, size, destructor
, userdata, ref);
}
piece_block_progress peer_connection::downloading_piece_progress() const
{
#ifndef TORRENT_DISABLE_LOGGING
@ -5673,14 +5648,6 @@ namespace libtorrent
return piece_block_progress();
}
namespace {
void session_free_buffer(char* buffer, void* userdata, aux::block_cache_reference)
{
aux::session_interface* ses = static_cast<aux::session_interface*>(userdata);
ses->free_buffer(buffer);
}
}
void peer_connection::send_buffer(char const* buf, int size, int flags)
{
TORRENT_ASSERT(is_single_thread());
@ -5704,20 +5671,14 @@ namespace libtorrent
int i = 0;
while (size > 0)
{
char* chain_buf = m_ses.allocate_buffer();
if (chain_buf == nullptr)
{
disconnect(errors::no_memory, op_alloc_sndbuf);
return;
}
aux::ses_buffer_holder session_buf = m_ses.allocate_buffer();
const int alloc_buf_size = m_ses.send_buffer_size();
int buf_size = (std::min)(alloc_buf_size, size);
std::memcpy(chain_buf, buf, buf_size);
int const alloc_buf_size = m_ses.send_buffer_size();
int const buf_size = std::min(alloc_buf_size, size);
std::memcpy(session_buf.get(), buf, buf_size);
buf += buf_size;
size -= buf_size;
m_send_buffer.append_buffer(chain_buf, alloc_buf_size, buf_size
, &session_free_buffer, &m_ses);
m_send_buffer.append_buffer(std::move(session_buf), alloc_buf_size, buf_size);
++i;
}
setup_send();

View File

@ -2825,7 +2825,6 @@ namespace aux {
pack.ses = this;
pack.sett = &m_settings;
pack.stats_counters = &m_stats_counters;
pack.allocator = this;
pack.disk_thread = &m_disk_thread;
pack.ios = &m_io_service;
pack.tor = std::weak_ptr<torrent>();
@ -6664,39 +6663,15 @@ namespace aux {
#endif
}
// decrement the refcount of the block in the disk cache
// since the network thread doesn't need it anymore
void session_impl::reclaim_blocks(span<block_cache_reference> refs)
{
m_blocks_to_reclaim.insert(m_blocks_to_reclaim.end(), refs.begin(), refs.end());
if (m_pending_block_reclaim) return;
m_io_service.post(std::bind(&session_impl::do_reclaim_blocks, this));
m_pending_block_reclaim = true;
}
void session_impl::do_reclaim_blocks()
{
TORRENT_ASSERT(m_pending_block_reclaim);
m_pending_block_reclaim = false;
m_disk_thread.reclaim_blocks(m_blocks_to_reclaim);
m_blocks_to_reclaim.clear();
}
void session_impl::free_disk_buffer(char* buf)
{
m_disk_thread.free_disk_buffer(buf);
}
char* session_impl::allocate_buffer()
ses_buffer_holder session_impl::allocate_buffer()
{
TORRENT_ASSERT(is_single_thread());
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
int num_bytes = send_buffer_size();
return static_cast<char*>(malloc(num_bytes));
return ses_buffer_holder(*this, static_cast<char*>(malloc(num_bytes)));
#else
return static_cast<char*>(m_send_buffers.malloc());
return ses_buffer_holder(*this, static_cast<char*>(m_send_buffers.malloc()));
#endif
}

View File

@ -95,7 +95,7 @@ namespace
{
m_torrent.session().disk_thread().async_read(&m_torrent.storage()
, r, std::bind(&smart_ban_plugin::on_read_ok_block
, shared_from_this(), *i, i->second.peer->address(), _1, _2, r.length, _3, _4)
, shared_from_this(), *i, i->second.peer->address(), _1, r.length, _2, _3)
, reinterpret_cast<void*>(1));
m_block_hashes.erase(i++);
}
@ -152,7 +152,7 @@ namespace
// block read will have been deleted by the time it gets back to the network thread
m_torrent.session().disk_thread().async_read(&m_torrent.storage(), r
, std::bind(&smart_ban_plugin::on_read_failed_block
, shared_from_this(), pb, (*i)->address(), _1, _2, r.length, _3, _4)
, shared_from_this(), pb, (*i)->address(), _1, r.length, _2, _3)
, reinterpret_cast<torrent_peer*>(1)
, disk_io_job::force_copy);
}
@ -176,18 +176,16 @@ namespace
};
void on_read_failed_block(piece_block b, address a
, aux::block_cache_reference ref, char* disk_block, int const block_size, int
, disk_buffer_holder buffer, int const block_size, int
, storage_error const& error)
{
TORRENT_ASSERT(m_torrent.session().is_single_thread());
disk_buffer_holder buffer(m_torrent.session(), ref, disk_block);
// ignore read errors
if (error) return;
hasher h;
h.update({disk_block, std::size_t(block_size)});
h.update({buffer.get(), std::size_t(block_size)});
h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt));
std::pair<peer_list::iterator, peer_list::iterator> const range
@ -260,18 +258,16 @@ namespace
}
void on_read_ok_block(std::pair<piece_block, block_entry> b, address a
, aux::block_cache_reference ref, char* disk_block, int const block_size, int
, disk_buffer_holder buffer, int const block_size, int
, storage_error const& error)
{
TORRENT_ASSERT(m_torrent.session().is_single_thread());
disk_buffer_holder buffer(m_torrent.session(), ref, disk_block);
// ignore read errors
if (error) return;
hasher h;
h.update({disk_block, std::size_t(block_size)});
h.update({buffer.get(), std::size_t(block_size)});
h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt));
sha1_hash const ok_digest = h.final();

View File

@ -859,7 +859,7 @@ namespace libtorrent
r.length = (std::min)(piece_size - r.start, block_size());
m_ses.disk_thread().async_read(&storage(), r
, std::bind(&torrent::on_disk_read_complete
, shared_from_this(), _1, _2, _3, _4, r, rp), reinterpret_cast<void*>(1));
, shared_from_this(), _1, _2, _3, r, rp), reinterpret_cast<void*>(1));
}
}
@ -1124,15 +1124,13 @@ namespace libtorrent
}
catch (...) { handle_exception(); }
void torrent::on_disk_read_complete(aux::block_cache_reference ref
, char* block, int, storage_error const& se
void torrent::on_disk_read_complete(disk_buffer_holder buffer
, int, storage_error const& se
, peer_request r, std::shared_ptr<read_piece_struct> rp) try
{
// hold a reference until this function returns
TORRENT_ASSERT(is_single_thread());
disk_buffer_holder buffer(m_ses, ref, block);
--rp->blocks_left;
if (se)
{
@ -1142,7 +1140,7 @@ namespace libtorrent
}
else
{
std::memcpy(rp->piece_data.get() + r.start, block, r.length);
std::memcpy(rp->piece_data.get() + r.start, buffer.get(), r.length);
}
if (rp->blocks_left == 0)
@ -5853,7 +5851,6 @@ namespace libtorrent
pack.ses = &m_ses;
pack.sett = &settings();
pack.stats_counters = &m_ses.stats_counters();
pack.allocator = &m_ses;
pack.disk_thread = &m_ses.disk_thread();
pack.ios = &m_ses.get_io_service();
pack.tor = shared_from_this();
@ -6536,7 +6533,6 @@ namespace libtorrent
pack.ses = &m_ses;
pack.sett = &settings();
pack.stats_counters = &m_ses.stats_counters();
pack.allocator = &m_ses;
pack.disk_thread = &m_ses.disk_thread();
pack.ios = &m_ses.get_io_service();
pack.tor = shared_from_this();

View File

@ -280,7 +280,7 @@ namespace libtorrent { namespace
// TODO: we really need to increment the refcounter on the torrent
// while this buffer is still in the peer's send buffer
if (metadata_piece_size) m_pc.append_const_send_buffer(
metadata, metadata_piece_size);
nop(const_cast<char*>(metadata)), metadata_piece_size);
m_pc.stats_counters().inc_stats_counter(counters::num_outgoing_extended);
m_pc.stats_counters().inc_stats_counter(counters::num_outgoing_metadata);

View File

@ -140,9 +140,8 @@ TORRENT_TEST(buffer_move_assign)
std::set<char*> buffer_list;
void free_buffer(char* m, void* userdata, aux::block_cache_reference ref)
void free_buffer(char* m)
{
TEST_CHECK(userdata == (void*)0x1337);
std::set<char*>::iterator i = buffer_list.find(m);
TEST_CHECK(i != buffer_list.end());
@ -181,6 +180,25 @@ bool compare_chained_buffer(chained_buffer& b, char const* mem, int size)
return std::memcmp(&flat[0], mem, size) == 0;
}
struct holder
{
explicit holder(char* buf) : m_buf(buf) {}
~holder() { if (m_buf) free_buffer(m_buf); }
holder(holder const&) = delete;
holder& operator=(holder const&) = delete;
holder(holder&& rhs) : m_buf(rhs.m_buf) { rhs.m_buf = nullptr; }
holder& operator=(holder&& rhs)
{
if (m_buf) free_buffer(m_buf);
m_buf = rhs.m_buf;
rhs.m_buf = nullptr;
return *this;
}
char* get() const { return m_buf; }
private:
char* m_buf;
};
TORRENT_TEST(chained_buffer)
{
char data[] = "foobar";
@ -199,7 +217,7 @@ TORRENT_TEST(chained_buffer)
char* b1 = allocate_buffer(512);
std::memcpy(b1, data, 6);
b.append_buffer(b1, 512, 6, &free_buffer, (void*)0x1337);
b.append_buffer(holder(b1), 512, 6);
TEST_EQUAL(buffer_list.size(), 1);
TEST_CHECK(b.capacity() == 512);
@ -229,12 +247,12 @@ TORRENT_TEST(chained_buffer)
char* b2 = allocate_buffer(512);
std::memcpy(b2, data, 6);
b.append_buffer(b2, 512, 6, free_buffer, (void*)0x1337);
b.append_buffer(holder(b2), 512, 6);
TEST_CHECK(buffer_list.size() == 2);
char* b3 = allocate_buffer(512);
std::memcpy(b3, data, 6);
b.append_buffer(b3, 512, 6, &free_buffer, (void*)0x1337);
b.append_buffer(holder(b3), 512, 6);
TEST_CHECK(buffer_list.size() == 3);
TEST_CHECK(b.capacity() == 512 * 3);
@ -269,7 +287,7 @@ TORRENT_TEST(chained_buffer)
char* b4 = allocate_buffer(20);
std::memcpy(b4, data, 6);
std::memcpy(b4 + 6, data, 6);
b.append_buffer(b4, 20, 12, &free_buffer, (void*)0x1337);
b.append_buffer(holder(b4), 20, 12);
TEST_CHECK(b.space_in_last_buffer() == 8);
ret = b.append(data, 6) != nullptr;
@ -282,8 +300,8 @@ TORRENT_TEST(chained_buffer)
std::cout << b.space_in_last_buffer() << std::endl;
char* b5 = allocate_buffer(20);
std::memcpy(b4, data, 6);
b.append_buffer(b5, 20, 6, &free_buffer, (void*)0x1337);
std::memcpy(b5, data, 6);
b.append_buffer(holder(b5), 20, 6);
b.pop_front(22);
TEST_CHECK(b.size() == 5);