diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 126e80c6b..d12b4d312 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -613,18 +613,13 @@ namespace libtorrent void deferred_submit_jobs() override; - char* allocate_buffer() override; + ses_buffer_holder allocate_buffer() override; torrent_peer* allocate_peer_entry(int type); void free_peer_entry(torrent_peer* p); void free_buffer(char* buf) override; int send_buffer_size() const override { return send_buffer_size_impl; } - // implements buffer_allocator_interface - void free_disk_buffer(char* buf) override; - void reclaim_blocks(span refs) override; - void do_reclaim_blocks(); - // implements dht_observer virtual void set_external_address(address const& ip , address const& source) override; @@ -1139,12 +1134,6 @@ namespace libtorrent // 5 minutes) torrent_map::iterator m_next_lsd_torrent; - // we try to return disk buffers to the disk thread in batches, to - // avoid hammering its mutex. We accrue blocks here and defer returning - // them in a function we post to the io_service - std::vector m_blocks_to_reclaim; - bool m_pending_block_reclaim = false; - #ifndef TORRENT_DISABLE_DHT // torrents are announced on the DHT in a // round-robin fashion. All torrents are cycled through diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 19e6f91de..3878ec4a4 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -118,13 +118,14 @@ namespace libtorrent { namespace aux }; #endif // TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS + struct ses_buffer_holder; + // TODO: 2 make this interface a lot smaller. It could be split up into // several smaller interfaces. Each subsystem could then limit the size // of the mock object to test it. struct TORRENT_EXTRA_EXPORT session_interface - : buffer_allocator_interface #if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS - , session_logger + : session_logger #endif { // TODO: 2 the IP voting mechanism should be factored out @@ -193,7 +194,7 @@ namespace libtorrent { namespace aux virtual void close_connection(peer_connection* p, error_code const& ec) = 0; virtual int num_connections() const = 0; - virtual char* allocate_buffer() = 0; + virtual ses_buffer_holder allocate_buffer() = 0; virtual void free_buffer(char* buf) = 0; virtual int send_buffer_size() const = 0; @@ -321,6 +322,29 @@ namespace libtorrent { namespace aux protected: ~session_interface() {} }; + + struct ses_buffer_holder + { + ses_buffer_holder(session_interface& ses, char* buf) + : m_ses(&ses), m_buf(buf) {} + ~ses_buffer_holder() { if (m_buf) m_ses->free_buffer(m_buf); } + ses_buffer_holder(ses_buffer_holder const&) = delete; + ses_buffer_holder& operator=(ses_buffer_holder const&) = delete; + ses_buffer_holder(ses_buffer_holder&& rhs) noexcept + : m_ses(rhs.m_ses), m_buf(rhs.m_buf) { rhs.m_buf = nullptr; } + ses_buffer_holder& operator=(ses_buffer_holder&& rhs) noexcept + { + if (m_buf) m_ses->free_buffer(m_buf); + m_buf = rhs.m_buf; + m_ses = rhs.m_ses; + rhs.m_buf = nullptr; + return *this; + } + char* get() const noexcept { return m_buf; } + private: + session_interface* m_ses; + char* m_buf; + }; }} #endif diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index b7a0b1dcf..2facf40cd 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -309,17 +309,31 @@ namespace libtorrent void rc4_decrypt(span buf); #endif -public: + public: // these functions encrypt the send buffer if m_rc4_encrypted // is true, otherwise it passes the call to the // peer_connection functions of the same names - virtual void append_const_send_buffer(char const* buffer, int size - , chained_buffer::free_buffer_fun destructor = &nop - , void* userdata = nullptr, aux::block_cache_reference ref - = aux::block_cache_reference()) override; + template + void append_const_send_buffer(Holder buffer, int size) + { +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + if (!m_enc_handler.is_send_plaintext()) + { + // if we're encrypting this buffer, we need to make a copy + // since we'll mutate it + std::unique_ptr buf(new char[size]); + std::memcpy(buf.get(), buffer.get(), size); + append_send_buffer(std::move(buf), size); + } + else +#endif + { + append_send_buffer(std::move(buffer), size); + } + } -private: + private: enum class state_t : std::uint8_t { diff --git a/include/libtorrent/chained_buffer.hpp b/include/libtorrent/chained_buffer.hpp index 4a1fbb223..fa68690b0 100644 --- a/include/libtorrent/chained_buffer.hpp +++ b/include/libtorrent/chained_buffer.hpp @@ -45,6 +45,14 @@ POSSIBILITY OF SUCH DAMAGE. #include #include "libtorrent/aux_/disable_warnings_pop.hpp" +#ifdef _MSC_VER +// visual studio requires the value in a deque to be copyable or movable. C++11 +// only requires that when calling functions with those requirements +#define TORRENT_CPP98_DEQUE 1 +#else +#define TORRENT_CPP98_DEQUE 0 +#endif + namespace libtorrent { // TODO: 2 this type should probably be renamed to send_buffer @@ -58,14 +66,54 @@ namespace libtorrent #endif } - // destructs/frees the buffer (1st arg) with - // 2nd argument as userdata - typedef void (*free_buffer_fun)(char*, void*, aux::block_cache_reference ref); + private: + + // destructs/frees the holder object + using destruct_holder_fun = void (*)(void*); + using move_construct_holder_fun = void (*)(void*, void*); struct buffer_t { - free_buffer_fun free_fun; - void* userdata; + buffer_t() {} +#if TORRENT_CPP98_DEQUE + buffer_t(buffer_t&& rhs) noexcept + { + destruct_holder = rhs.destruct_holder; + move_holder = rhs.move_holder; + buf = rhs.buf; + start = rhs.start; + size = rhs.size; + used_size = rhs.used_size; + move_holder(&holder, &rhs.holder); + } + buffer_t& operator=(buffer_t&& rhs) noexcept + { + destruct_holder(&holder); + destruct_holder = rhs.destruct_holder; + move_holder = rhs.move_holder; + buf = rhs.buf; + start = rhs.start; + size = rhs.size; + used_size = rhs.used_size; + move_holder(&holder, &rhs.holder); + return *this; + } + buffer_t(buffer_t const& rhs) noexcept + : buffer_t(std::move(const_cast(rhs))) {} + buffer_t& operator=(buffer_t const& rhs) noexcept + { return this->operator=(std::move(const_cast(rhs))); } +#else + buffer_t(buffer_t&&) = delete; + buffer_t& operator=(buffer_t&&) = delete; + buffer_t(buffer_t const&) = delete; + buffer_t& operator=(buffer_t const&) = delete; +#endif + + destruct_holder_fun destruct_holder; +#if TORRENT_CPP98_DEQUE + move_construct_holder_fun move_holder; +#endif + std::aligned_storage<32>::type holder; // TODO: 2 the pointers here should probably be const, since // they're not supposed to be mutated once inserted into the send // buffer @@ -73,9 +121,10 @@ namespace libtorrent char* start; // the first byte to send/receive in the buffer int size; // the total size of the buffer int used_size; // this is the number of bytes to send/receive - aux::block_cache_reference ref; }; + public: + bool empty() const { return m_bytes == 0; } int size() const { return m_bytes; } int capacity() const { return m_capacity; } @@ -83,13 +132,25 @@ namespace libtorrent void pop_front(int bytes_to_pop); //TODO: 3 use span<> instead of (buffer,s) - void append_buffer(char* buffer, int s, int used_size - , free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref = aux::block_cache_reference()); + template + void append_buffer(Holder buffer, int s, int used_size) + { + TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(s >= used_size); + m_vec.emplace_back(); + buffer_t& b = m_vec.back(); + init_buffer_entry(b, buffer, s, used_size); + } - void prepend_buffer(char* buffer, int s, int used_size - , free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref = aux::block_cache_reference()); + template + void prepend_buffer(Holder buffer, int s, int used_size) + { + TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(s >= used_size); + m_vec.emplace_front(); + buffer_t& b = m_vec.front(); + init_buffer_entry(b, buffer, s, used_size); + } // returns the number of bytes available at the // end of the last chained buffer. @@ -97,7 +158,7 @@ namespace libtorrent // tries to copy the given buffer to the end of the // last chained buffer. If there's not enough room - // it returns false + // it returns nullptr char* append(char const* buf, int s); // tries to allocate memory from the end @@ -114,6 +175,41 @@ namespace libtorrent ~chained_buffer(); private: + + template + void init_buffer_entry(buffer_t& b, Holder& buffer, int s, int used_size) + { + static_assert(sizeof(Holder) <= sizeof(b.holder), "buffer holder too large"); + + b.buf = buffer.get(); + b.size = s; + b.start = buffer.get(); + b.used_size = used_size; + +#ifdef _MSC_VER +// this appears to be a false positive msvc warning +#pragma warning(push, 1) +#pragma warning(disable : 4100) +#endif + b.destruct_holder = [](void* holder) + { reinterpret_cast(holder)->~Holder(); }; + +#if TORRENT_CPP98_DEQUE + b.move_holder = [](void* dst, void* src) + { new (dst) Holder(std::move(*reinterpret_cast(src))); }; +#endif + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + + new (&b.holder) Holder(std::move(buffer)); + + m_bytes += used_size; + m_capacity += s; + TORRENT_ASSERT(m_bytes <= m_capacity); + } + template void build_vec(int bytes, std::vector& vec); diff --git a/include/libtorrent/disk_interface.hpp b/include/libtorrent/disk_interface.hpp index 725e1ed2b..1aebecf99 100644 --- a/include/libtorrent/disk_interface.hpp +++ b/include/libtorrent/disk_interface.hpp @@ -39,7 +39,9 @@ POSSIBILITY OF SUCH DAMAGE. #include #include "libtorrent/units.hpp" +#include "libtorrent/disk_buffer_holder.hpp" #include "libtorrent/aux_/vector.hpp" +#include "libtorrent/export.hpp" namespace libtorrent { @@ -51,6 +53,9 @@ namespace libtorrent struct cache_status; struct disk_buffer_holder; struct counters; + struct settings_pack; + struct storage_params; + class file_storage; enum class status_t : std::uint8_t { @@ -76,8 +81,8 @@ namespace libtorrent }; virtual void async_read(storage_interface* storage, peer_request const& r - , std::function handler, void* requester, std::uint8_t flags = 0) = 0; + , std::function handler + , void* requester, std::uint8_t flags = 0) = 0; virtual bool async_write(storage_interface* storage, peer_request const& r , char const* buf, std::shared_ptr o , std::function handler diff --git a/include/libtorrent/disk_io_job.hpp b/include/libtorrent/disk_io_job.hpp index cf01132b9..127d5ec27 100644 --- a/include/libtorrent/disk_io_job.hpp +++ b/include/libtorrent/disk_io_job.hpp @@ -57,6 +57,7 @@ namespace libtorrent struct cached_piece_entry; class torrent_info; struct add_torrent_params; + struct buffer_allocator_interface; // disk_io_jobs are allocated in a pool allocator in disk_io_thread // they are always allocated from the network thread, posted @@ -77,7 +78,7 @@ namespace libtorrent disk_io_job(disk_io_job const&) = delete; disk_io_job& operator=(disk_io_job const&) = delete; - void call_callback(); + void call_callback(buffer_allocator_interface&); enum action_t : std::uint8_t { @@ -146,8 +147,7 @@ namespace libtorrent // this is called when operation completes - using read_handler = std::function; + using read_handler = std::function; using write_handler = std::function; using hash_handler = std::function; using move_handler = std::function; diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 640505002..b8828603a 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -292,7 +292,7 @@ namespace libtorrent void abort(bool wait); void async_read(storage_interface* storage, peer_request const& r - , std::function handler, void* requester, std::uint8_t flags = 0) override; bool async_write(storage_interface* storage, peer_request const& r , char const* buf, std::shared_ptr o diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 28be78ae4..159000f1e 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -137,7 +137,6 @@ namespace libtorrent aux::session_interface* ses; aux::session_settings const* sett; counters* stats_counters; - buffer_allocator_interface* allocator; disk_interface* disk_thread; io_service* ios; std::weak_ptr tor; @@ -147,7 +146,13 @@ namespace libtorrent }; // internal - inline void nop(char*, void*, aux::block_cache_reference) {} + struct nop + { + explicit nop(char* b) : m_buf(b) {} + char* get() const { return m_buf; } + private: + char* m_buf; + }; struct TORRENT_EXTRA_EXPORT peer_connection_hot_members { @@ -626,15 +631,12 @@ namespace libtorrent void send_buffer(char const* begin, int size, int flags = 0); void setup_send(); - void append_send_buffer(char* buffer, int size - , chained_buffer::free_buffer_fun destructor = &nop - , void* userdata = nullptr, aux::block_cache_reference ref - = aux::block_cache_reference()); - - virtual void append_const_send_buffer(char const* buffer, int size - , chained_buffer::free_buffer_fun destructor = &nop - , void* userdata = nullptr, aux::block_cache_reference ref - = aux::block_cache_reference()); + template + void append_send_buffer(Holder buffer, int size) + { + TORRENT_ASSERT(is_single_thread()); + m_send_buffer.append_buffer(std::move(buffer), size, size); + } int outstanding_bytes() const { return m_outstanding_bytes; } @@ -758,9 +760,8 @@ namespace libtorrent void do_update_interest(); void fill_send_buffer(); - void on_disk_read_complete(aux::block_cache_reference ref - , char* disk_block, int flags, storage_error const& error, peer_request r - , time_point issue_time); + void on_disk_read_complete(disk_buffer_holder disk_block, int flags + , storage_error const& error, peer_request r, time_point issue_time); void on_disk_write_complete(storage_error const& error , peer_request r, std::shared_ptr t); void on_seed_mode_hashed(piece_index_t piece @@ -832,10 +833,6 @@ namespace libtorrent // the disk thread to use to issue disk jobs to disk_interface& m_disk_thread; - public: - buffer_allocator_interface& m_allocator; - private: - // io service io_service& m_ios; diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 9b0e91262..bb6ea12c7 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -392,8 +392,7 @@ namespace libtorrent error_code error; }; void read_piece(piece_index_t piece); - void on_disk_read_complete(aux::block_cache_reference ref - , char* block, int flags, storage_error const& se + void on_disk_read_complete(disk_buffer_holder block, int flags, storage_error const& se , peer_request r, std::shared_ptr rp); storage_mode_t storage_mode() const; diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 6e03a7275..65b151c36 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -733,38 +733,8 @@ namespace libtorrent m_rc4->decrypt(buf); } - namespace { - void regular_c_free(char* buf, void* /* userdata */ - , aux::block_cache_reference /* ref */) - { - std::free(buf); - } - } - #endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) - void bt_peer_connection::append_const_send_buffer(char const* buffer, int size - , chained_buffer::free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref) - { -#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) - if (!m_enc_handler.is_send_plaintext()) - { - // if we're encrypting this buffer, we need to make a copy - // since we'll mutate it - char* buf = static_cast(std::malloc(size)); - std::memcpy(buf, buffer, size); - append_send_buffer(buf, size, ®ular_c_free, nullptr); - destructor(const_cast(buffer), userdata, ref); - } - else -#endif - { - peer_connection::append_const_send_buffer(buffer, size, destructor - , userdata, ref); - } - } - void bt_peer_connection::write_handshake() { INVARIANT_CHECK; @@ -2433,24 +2403,6 @@ namespace libtorrent #endif } - namespace { - - void buffer_reclaim_block(char* /* buffer */, void* userdata - , aux::block_cache_reference ref) - { - buffer_allocator_interface* buf = static_cast(userdata); - buf->reclaim_blocks(ref); - } - - void buffer_free_disk_buf(char* buffer, void* userdata - , aux::block_cache_reference /* ref */) - { - buffer_allocator_interface* buf = static_cast(userdata); - buf->free_disk_buffer(buffer); - } - - } // anonymous namespace - void bt_peer_connection::write_piece(peer_request const& r, disk_buffer_holder buffer) { INVARIANT_CHECK; @@ -2513,15 +2465,12 @@ namespace libtorrent if (buffer.ref().storage == nullptr) { - append_send_buffer(buffer.get(), r.length - , &buffer_free_disk_buf, &m_allocator); + append_send_buffer(std::move(buffer), r.length); } else { - append_const_send_buffer(buffer.get(), r.length - , &buffer_reclaim_block, &m_allocator, buffer.ref()); + append_const_send_buffer(std::move(buffer), r.length); } - buffer.release(); m_payloads.push_back(range(send_buffer_size() - r.length, r.length)); setup_send(); diff --git a/src/chained_buffer.cpp b/src/chained_buffer.cpp index b71f1997f..b71eca020 100644 --- a/src/chained_buffer.cpp +++ b/src/chained_buffer.cpp @@ -40,6 +40,7 @@ namespace libtorrent void chained_buffer::pop_front(int bytes_to_pop) { TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(!m_destructed); TORRENT_ASSERT(bytes_to_pop <= m_bytes); while (bytes_to_pop > 0 && !m_vec.empty()) { @@ -55,7 +56,7 @@ namespace libtorrent break; } - b.free_fun(b.buf, b.userdata, b.ref); + b.destruct_holder(static_cast(&b.holder)); m_bytes -= b.used_size; m_capacity -= b.size; bytes_to_pop -= b.used_size; @@ -66,54 +67,16 @@ namespace libtorrent } } - void chained_buffer::append_buffer(char* buffer, int s, int used_size - , free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref) - { - TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(s >= used_size); - buffer_t b; - b.buf = buffer; - b.size = s; - b.start = buffer; - b.used_size = used_size; - b.free_fun = destructor; - b.userdata = userdata; - b.ref = ref; - m_vec.push_back(b); - - m_bytes += used_size; - m_capacity += s; - TORRENT_ASSERT(m_bytes <= m_capacity); - } - - void chained_buffer::prepend_buffer(char* buffer, int s, int used_size - , free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref) - { - TORRENT_ASSERT(s >= used_size); - buffer_t b; - b.buf = buffer; - b.size = s; - b.start = buffer; - b.used_size = used_size; - b.free_fun = destructor; - b.userdata = userdata; - b.ref = ref; - m_vec.push_front(b); - - m_bytes += used_size; - m_capacity += s; - TORRENT_ASSERT(m_bytes <= m_capacity); - } - // returns the number of bytes available at the // end of the last chained buffer. int chained_buffer::space_in_last_buffer() { TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(!m_destructed); if (m_vec.empty()) return 0; buffer_t& b = m_vec.back(); + TORRENT_ASSERT(b.start != nullptr); + TORRENT_ASSERT(b.buf != nullptr); return b.size - b.used_size - int(b.start - b.buf); } @@ -123,7 +86,8 @@ namespace libtorrent char* chained_buffer::append(char const* buf, int s) { TORRENT_ASSERT(is_single_thread()); - char* insert = allocate_appendix(s); + TORRENT_ASSERT(!m_destructed); + char* const insert = allocate_appendix(s); if (insert == nullptr) return nullptr; std::memcpy(insert, buf, s); return insert; @@ -135,9 +99,12 @@ namespace libtorrent char* chained_buffer::allocate_appendix(int s) { TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(!m_destructed); if (m_vec.empty()) return nullptr; buffer_t& b = m_vec.back(); - char* insert = b.start + b.used_size; + TORRENT_ASSERT(b.start != nullptr); + TORRENT_ASSERT(b.buf != nullptr); + char* const insert = b.start + b.used_size; if (insert + s > b.buf + b.size) return nullptr; b.used_size += s; m_bytes += s; @@ -148,6 +115,7 @@ namespace libtorrent std::vector const& chained_buffer::build_iovec(int to_send) { TORRENT_ASSERT(is_single_thread()); + TORRENT_ASSERT(!m_destructed); m_tmp_vec.clear(); build_vec(to_send, m_tmp_vec); return m_tmp_vec; @@ -155,15 +123,18 @@ namespace libtorrent void chained_buffer::build_mutable_iovec(int bytes, std::vector> &vec) { + TORRENT_ASSERT(!m_destructed); build_vec(bytes, vec); } template - void chained_buffer::build_vec(int bytes, std::vector &vec) + void chained_buffer::build_vec(int bytes, std::vector& vec) { - for (std::deque::iterator i = m_vec.begin() - , end(m_vec.end()); bytes > 0 && i != end; ++i) + TORRENT_ASSERT(!m_destructed); + for (auto i = m_vec.begin(), end(m_vec.end()); bytes > 0 && i != end; ++i) { + TORRENT_ASSERT(i->start != nullptr); + TORRENT_ASSERT(i->buf != nullptr); if (i->used_size > bytes) { TORRENT_ASSERT(bytes > 0); @@ -178,11 +149,9 @@ namespace libtorrent void chained_buffer::clear() { - for (std::deque::iterator i = m_vec.begin() - , end(m_vec.end()); i != end; ++i) - { - i->free_fun(i->buf, i->userdata, i->ref); - } + TORRENT_ASSERT(!m_destructed); + for (auto& b : m_vec) + b.destruct_holder(static_cast(&b.holder)); m_bytes = 0; m_capacity = 0; m_vec.clear(); @@ -190,14 +159,14 @@ namespace libtorrent chained_buffer::~chained_buffer() { -#if TORRENT_USE_ASSERTS TORRENT_ASSERT(!m_destructed); - m_destructed = true; -#endif TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(m_bytes >= 0); TORRENT_ASSERT(m_capacity >= 0); clear(); +#if TORRENT_USE_ASSERTS + m_destructed = true; +#endif } } diff --git a/src/disk_io_job.cpp b/src/disk_io_job.cpp index 61fd52cad..4c8e4132d 100644 --- a/src/disk_io_job.cpp +++ b/src/disk_io_job.cpp @@ -35,15 +35,19 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { + struct buffer_allocator_interface; + namespace { struct caller_visitor : boost::static_visitor<> { - explicit caller_visitor(disk_io_job& j) : m_job(j) {} + explicit caller_visitor(buffer_allocator_interface& alloc, disk_io_job& j) + : m_job(j), m_alloc(alloc) {} void operator()(disk_io_job::read_handler& h) const { if (!h) return; - h(m_job.d.io.ref, m_job.buffer.disk_block, m_job.flags, m_job.error); + disk_buffer_holder block(m_alloc, m_job.d.io.ref, m_job.buffer.disk_block); + h(std::move(block), m_job.flags, m_job.error); } void operator()(disk_io_job::write_handler& h) const @@ -90,6 +94,7 @@ namespace libtorrent private: disk_io_job& m_job; + buffer_allocator_interface& m_alloc; }; } @@ -111,9 +116,9 @@ namespace libtorrent free(buffer.string); } - void disk_io_job::call_callback() + void disk_io_job::call_callback(buffer_allocator_interface& alloc) { - boost::apply_visitor(caller_visitor(*this), callback); + boost::apply_visitor(caller_visitor(alloc, *this), callback); } bool disk_io_job::completed(cached_piece_entry const* pe, int block_size) diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index d8e77330a..37a19a4d0 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -1515,8 +1515,8 @@ namespace libtorrent } void disk_io_thread::async_read(storage_interface* storage, peer_request const& r - , std::function handler, void* requester, std::uint8_t const flags) + , std::function handler + , void* requester, std::uint8_t const flags) { TORRENT_ASSERT(r.length <= m_disk_cache.block_size()); TORRENT_ASSERT(r.length <= 16 * 1024); @@ -1541,7 +1541,7 @@ namespace libtorrent switch (ret) { case 0: - j->call_callback(); + j->call_callback(*this); free_job(j); break; case 1: @@ -1766,7 +1766,7 @@ namespace libtorrent #endif l.unlock(); - j->call_callback(); + j->call_callback(*this); free_job(j); return; } @@ -1920,7 +1920,7 @@ namespace libtorrent if (m_abort) { j->error.ec = boost::asio::error::operation_aborted; - j->call_callback(); + j->call_callback(*this); free_job(j); return; } @@ -3385,7 +3385,7 @@ namespace libtorrent #if TORRENT_USE_ASSERTS j->callback_called = true; #endif - j->call_callback(); + j->call_callback(*this); to_delete[cnt++] = j; j = next; if (cnt == int(to_delete.size())) diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index b4a19c3cf..3aa3cdf18 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -119,7 +119,6 @@ namespace libtorrent , m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue)) , m_remote(pack.endp) , m_disk_thread(*pack.disk_thread) - , m_allocator(*pack.allocator) , m_ios(*pack.ios) , m_work(m_ios) , m_outstanding_piece_verification(0) @@ -5127,7 +5126,7 @@ namespace libtorrent m_disk_thread.async_read(&t->storage(), r , std::bind(&peer_connection::on_disk_read_complete - , self(), _1, _2, _3, _4, r, clock_type::now()), this); + , self(), _1, _2, _3, r, clock_type::now()), this); } m_last_sent_payload = clock_type::now(); m_requests.erase(m_requests.begin() + i); @@ -5196,8 +5195,8 @@ namespace libtorrent fill_send_buffer(); } - void peer_connection::on_disk_read_complete(aux::block_cache_reference ref - , char* disk_block, int const flags, storage_error const& error + void peer_connection::on_disk_read_complete(disk_buffer_holder buffer + , int const flags, storage_error const& error , peer_request r, time_point issue_time) { TORRENT_ASSERT(is_single_thread()); @@ -5213,7 +5212,7 @@ namespace libtorrent peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE" , "piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us" , static_cast(r.piece), r.start, r.length - , static_cast(disk_block) + , static_cast(buffer.get()) , (flags & disk_interface::cache_hit ? "cache hit" : "cache miss") , error.ec.message().c_str(), disk_rtt); } @@ -5230,7 +5229,7 @@ namespace libtorrent return; } - TORRENT_ASSERT(disk_block == nullptr); + TORRENT_ASSERT(buffer.get() == nullptr); write_dont_have(r.piece); write_reject_request(r); if (t->alerts().should_post()) @@ -5248,11 +5247,6 @@ namespace libtorrent // block, the peer is still useful m_disk_read_failures = 0; - // even if we're disconnecting, we need to free this block - // otherwise the disk thread will hang, waiting for the network - // thread to be done with it - disk_buffer_holder buffer(m_allocator, ref, disk_block); - if (t && m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache) { @@ -5453,8 +5447,7 @@ namespace libtorrent // this const_cast is a here because chained_buffer need to be // fixed. char* ptr = const_cast(i->data()); - m_send_buffer.prepend_buffer(ptr - , size, size, &nop, nullptr); + m_send_buffer.prepend_buffer(nop(ptr), size, size); } set_send_barrier(next_barrier); } @@ -5646,24 +5639,6 @@ namespace libtorrent std::bind(&peer_connection::on_receive_data, self(), _1, _2))); } - void peer_connection::append_send_buffer(char* buffer, int size - , chained_buffer::free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref) - { - TORRENT_ASSERT(is_single_thread()); - m_send_buffer.append_buffer(buffer, size, size, destructor - , userdata, ref); - } - - void peer_connection::append_const_send_buffer(char const* buffer, int size - , chained_buffer::free_buffer_fun destructor, void* userdata - , aux::block_cache_reference ref) - { - TORRENT_ASSERT(is_single_thread()); - m_send_buffer.append_buffer(const_cast(buffer), size, size, destructor - , userdata, ref); - } - piece_block_progress peer_connection::downloading_piece_progress() const { #ifndef TORRENT_DISABLE_LOGGING @@ -5673,14 +5648,6 @@ namespace libtorrent return piece_block_progress(); } - namespace { - void session_free_buffer(char* buffer, void* userdata, aux::block_cache_reference) - { - aux::session_interface* ses = static_cast(userdata); - ses->free_buffer(buffer); - } - } - void peer_connection::send_buffer(char const* buf, int size, int flags) { TORRENT_ASSERT(is_single_thread()); @@ -5704,20 +5671,14 @@ namespace libtorrent int i = 0; while (size > 0) { - char* chain_buf = m_ses.allocate_buffer(); - if (chain_buf == nullptr) - { - disconnect(errors::no_memory, op_alloc_sndbuf); - return; - } + aux::ses_buffer_holder session_buf = m_ses.allocate_buffer(); - const int alloc_buf_size = m_ses.send_buffer_size(); - int buf_size = (std::min)(alloc_buf_size, size); - std::memcpy(chain_buf, buf, buf_size); + int const alloc_buf_size = m_ses.send_buffer_size(); + int const buf_size = std::min(alloc_buf_size, size); + std::memcpy(session_buf.get(), buf, buf_size); buf += buf_size; size -= buf_size; - m_send_buffer.append_buffer(chain_buf, alloc_buf_size, buf_size - , &session_free_buffer, &m_ses); + m_send_buffer.append_buffer(std::move(session_buf), alloc_buf_size, buf_size); ++i; } setup_send(); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 310c0a9cc..0a30d20be 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -2825,7 +2825,6 @@ namespace aux { pack.ses = this; pack.sett = &m_settings; pack.stats_counters = &m_stats_counters; - pack.allocator = this; pack.disk_thread = &m_disk_thread; pack.ios = &m_io_service; pack.tor = std::weak_ptr(); @@ -6664,39 +6663,15 @@ namespace aux { #endif } - // decrement the refcount of the block in the disk cache - // since the network thread doesn't need it anymore - void session_impl::reclaim_blocks(span refs) - { - m_blocks_to_reclaim.insert(m_blocks_to_reclaim.end(), refs.begin(), refs.end()); - if (m_pending_block_reclaim) return; - - m_io_service.post(std::bind(&session_impl::do_reclaim_blocks, this)); - m_pending_block_reclaim = true; - } - - void session_impl::do_reclaim_blocks() - { - TORRENT_ASSERT(m_pending_block_reclaim); - m_pending_block_reclaim = false; - m_disk_thread.reclaim_blocks(m_blocks_to_reclaim); - m_blocks_to_reclaim.clear(); - } - - void session_impl::free_disk_buffer(char* buf) - { - m_disk_thread.free_disk_buffer(buf); - } - - char* session_impl::allocate_buffer() + ses_buffer_holder session_impl::allocate_buffer() { TORRENT_ASSERT(is_single_thread()); #ifdef TORRENT_DISABLE_POOL_ALLOCATOR int num_bytes = send_buffer_size(); - return static_cast(malloc(num_bytes)); + return ses_buffer_holder(*this, static_cast(malloc(num_bytes))); #else - return static_cast(m_send_buffers.malloc()); + return ses_buffer_holder(*this, static_cast(m_send_buffers.malloc())); #endif } diff --git a/src/smart_ban.cpp b/src/smart_ban.cpp index 926980fb8..f5a2b8807 100644 --- a/src/smart_ban.cpp +++ b/src/smart_ban.cpp @@ -95,7 +95,7 @@ namespace { m_torrent.session().disk_thread().async_read(&m_torrent.storage() , r, std::bind(&smart_ban_plugin::on_read_ok_block - , shared_from_this(), *i, i->second.peer->address(), _1, _2, r.length, _3, _4) + , shared_from_this(), *i, i->second.peer->address(), _1, r.length, _2, _3) , reinterpret_cast(1)); m_block_hashes.erase(i++); } @@ -152,7 +152,7 @@ namespace // block read will have been deleted by the time it gets back to the network thread m_torrent.session().disk_thread().async_read(&m_torrent.storage(), r , std::bind(&smart_ban_plugin::on_read_failed_block - , shared_from_this(), pb, (*i)->address(), _1, _2, r.length, _3, _4) + , shared_from_this(), pb, (*i)->address(), _1, r.length, _2, _3) , reinterpret_cast(1) , disk_io_job::force_copy); } @@ -176,18 +176,16 @@ namespace }; void on_read_failed_block(piece_block b, address a - , aux::block_cache_reference ref, char* disk_block, int const block_size, int + , disk_buffer_holder buffer, int const block_size, int , storage_error const& error) { TORRENT_ASSERT(m_torrent.session().is_single_thread()); - disk_buffer_holder buffer(m_torrent.session(), ref, disk_block); - // ignore read errors if (error) return; hasher h; - h.update({disk_block, std::size_t(block_size)}); + h.update({buffer.get(), std::size_t(block_size)}); h.update(reinterpret_cast(&m_salt), sizeof(m_salt)); std::pair const range @@ -260,18 +258,16 @@ namespace } void on_read_ok_block(std::pair b, address a - , aux::block_cache_reference ref, char* disk_block, int const block_size, int + , disk_buffer_holder buffer, int const block_size, int , storage_error const& error) { TORRENT_ASSERT(m_torrent.session().is_single_thread()); - disk_buffer_holder buffer(m_torrent.session(), ref, disk_block); - // ignore read errors if (error) return; hasher h; - h.update({disk_block, std::size_t(block_size)}); + h.update({buffer.get(), std::size_t(block_size)}); h.update(reinterpret_cast(&m_salt), sizeof(m_salt)); sha1_hash const ok_digest = h.final(); diff --git a/src/torrent.cpp b/src/torrent.cpp index ae4c8f278..041762438 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -859,7 +859,7 @@ namespace libtorrent r.length = (std::min)(piece_size - r.start, block_size()); m_ses.disk_thread().async_read(&storage(), r , std::bind(&torrent::on_disk_read_complete - , shared_from_this(), _1, _2, _3, _4, r, rp), reinterpret_cast(1)); + , shared_from_this(), _1, _2, _3, r, rp), reinterpret_cast(1)); } } @@ -1124,15 +1124,13 @@ namespace libtorrent } catch (...) { handle_exception(); } - void torrent::on_disk_read_complete(aux::block_cache_reference ref - , char* block, int, storage_error const& se + void torrent::on_disk_read_complete(disk_buffer_holder buffer + , int, storage_error const& se , peer_request r, std::shared_ptr rp) try { // hold a reference until this function returns TORRENT_ASSERT(is_single_thread()); - disk_buffer_holder buffer(m_ses, ref, block); - --rp->blocks_left; if (se) { @@ -1142,7 +1140,7 @@ namespace libtorrent } else { - std::memcpy(rp->piece_data.get() + r.start, block, r.length); + std::memcpy(rp->piece_data.get() + r.start, buffer.get(), r.length); } if (rp->blocks_left == 0) @@ -5853,7 +5851,6 @@ namespace libtorrent pack.ses = &m_ses; pack.sett = &settings(); pack.stats_counters = &m_ses.stats_counters(); - pack.allocator = &m_ses; pack.disk_thread = &m_ses.disk_thread(); pack.ios = &m_ses.get_io_service(); pack.tor = shared_from_this(); @@ -6536,7 +6533,6 @@ namespace libtorrent pack.ses = &m_ses; pack.sett = &settings(); pack.stats_counters = &m_ses.stats_counters(); - pack.allocator = &m_ses; pack.disk_thread = &m_ses.disk_thread(); pack.ios = &m_ses.get_io_service(); pack.tor = shared_from_this(); diff --git a/src/ut_metadata.cpp b/src/ut_metadata.cpp index 3d29907ad..d43c5b330 100644 --- a/src/ut_metadata.cpp +++ b/src/ut_metadata.cpp @@ -280,7 +280,7 @@ namespace libtorrent { namespace // TODO: we really need to increment the refcounter on the torrent // while this buffer is still in the peer's send buffer if (metadata_piece_size) m_pc.append_const_send_buffer( - metadata, metadata_piece_size); + nop(const_cast(metadata)), metadata_piece_size); m_pc.stats_counters().inc_stats_counter(counters::num_outgoing_extended); m_pc.stats_counters().inc_stats_counter(counters::num_outgoing_metadata); diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp index fbc90bc4e..09b654541 100644 --- a/test/test_buffer.cpp +++ b/test/test_buffer.cpp @@ -140,9 +140,8 @@ TORRENT_TEST(buffer_move_assign) std::set buffer_list; -void free_buffer(char* m, void* userdata, aux::block_cache_reference ref) +void free_buffer(char* m) { - TEST_CHECK(userdata == (void*)0x1337); std::set::iterator i = buffer_list.find(m); TEST_CHECK(i != buffer_list.end()); @@ -181,6 +180,25 @@ bool compare_chained_buffer(chained_buffer& b, char const* mem, int size) return std::memcmp(&flat[0], mem, size) == 0; } +struct holder +{ + explicit holder(char* buf) : m_buf(buf) {} + ~holder() { if (m_buf) free_buffer(m_buf); } + holder(holder const&) = delete; + holder& operator=(holder const&) = delete; + holder(holder&& rhs) : m_buf(rhs.m_buf) { rhs.m_buf = nullptr; } + holder& operator=(holder&& rhs) + { + if (m_buf) free_buffer(m_buf); + m_buf = rhs.m_buf; + rhs.m_buf = nullptr; + return *this; + } + char* get() const { return m_buf; } +private: + char* m_buf; +}; + TORRENT_TEST(chained_buffer) { char data[] = "foobar"; @@ -199,7 +217,7 @@ TORRENT_TEST(chained_buffer) char* b1 = allocate_buffer(512); std::memcpy(b1, data, 6); - b.append_buffer(b1, 512, 6, &free_buffer, (void*)0x1337); + b.append_buffer(holder(b1), 512, 6); TEST_EQUAL(buffer_list.size(), 1); TEST_CHECK(b.capacity() == 512); @@ -229,12 +247,12 @@ TORRENT_TEST(chained_buffer) char* b2 = allocate_buffer(512); std::memcpy(b2, data, 6); - b.append_buffer(b2, 512, 6, free_buffer, (void*)0x1337); + b.append_buffer(holder(b2), 512, 6); TEST_CHECK(buffer_list.size() == 2); char* b3 = allocate_buffer(512); std::memcpy(b3, data, 6); - b.append_buffer(b3, 512, 6, &free_buffer, (void*)0x1337); + b.append_buffer(holder(b3), 512, 6); TEST_CHECK(buffer_list.size() == 3); TEST_CHECK(b.capacity() == 512 * 3); @@ -269,7 +287,7 @@ TORRENT_TEST(chained_buffer) char* b4 = allocate_buffer(20); std::memcpy(b4, data, 6); std::memcpy(b4 + 6, data, 6); - b.append_buffer(b4, 20, 12, &free_buffer, (void*)0x1337); + b.append_buffer(holder(b4), 20, 12); TEST_CHECK(b.space_in_last_buffer() == 8); ret = b.append(data, 6) != nullptr; @@ -282,8 +300,8 @@ TORRENT_TEST(chained_buffer) std::cout << b.space_in_last_buffer() << std::endl; char* b5 = allocate_buffer(20); - std::memcpy(b4, data, 6); - b.append_buffer(b5, 20, 6, &free_buffer, (void*)0x1337); + std::memcpy(b5, data, 6); + b.append_buffer(holder(b5), 20, 6); b.pop_front(22); TEST_CHECK(b.size() == 5);