From 192ef4962b4847dff381d034e4856d0bc7a94d95 Mon Sep 17 00:00:00 2001 From: arvidn Date: Sun, 25 Dec 2016 23:38:05 -0800 Subject: [PATCH] simplify the buffer_allocator_interface --- include/libtorrent/aux_/session_impl.hpp | 4 -- include/libtorrent/disk_buffer_holder.hpp | 4 -- include/libtorrent/disk_interface.hpp | 5 ++- include/libtorrent/disk_io_thread.hpp | 13 +----- include/libtorrent/peer_connection.hpp | 1 - src/disk_io_thread.cpp | 22 +++++---- src/peer_connection.cpp | 55 ++++++++--------------- src/session_impl.cpp | 12 ----- src/torrent.cpp | 37 ++++++++++----- 9 files changed, 59 insertions(+), 94 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index f66837761..126e80c6b 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -622,10 +622,6 @@ namespace libtorrent // implements buffer_allocator_interface void free_disk_buffer(char* buf) override; - disk_buffer_holder allocate_disk_buffer(char const* category) override; - disk_buffer_holder allocate_disk_buffer(bool& exceeded - , std::shared_ptr o - , char const* category) override; void reclaim_blocks(span refs) override; void do_reclaim_blocks(); diff --git a/include/libtorrent/disk_buffer_holder.hpp b/include/libtorrent/disk_buffer_holder.hpp index 130b0e7e2..d4c4c0792 100644 --- a/include/libtorrent/disk_buffer_holder.hpp +++ b/include/libtorrent/disk_buffer_holder.hpp @@ -50,10 +50,6 @@ namespace libtorrent { virtual void free_disk_buffer(char* b) = 0; virtual void reclaim_blocks(span refs) = 0; - virtual disk_buffer_holder allocate_disk_buffer(char const* category) = 0; - virtual disk_buffer_holder allocate_disk_buffer(bool& exceeded - , std::shared_ptr o - , char const* category) = 0; protected: ~buffer_allocator_interface() {} }; diff --git a/include/libtorrent/disk_interface.hpp b/include/libtorrent/disk_interface.hpp index c60ca9e6a..725e1ed2b 100644 --- a/include/libtorrent/disk_interface.hpp +++ b/include/libtorrent/disk_interface.hpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/bdecode.hpp" #include +#include #include "libtorrent/units.hpp" #include "libtorrent/aux_/vector.hpp" @@ -77,8 +78,8 @@ namespace libtorrent virtual void async_read(storage_interface* storage, peer_request const& r , std::function handler, void* requester, std::uint8_t flags = 0) = 0; - virtual void async_write(storage_interface* storage, peer_request const& r - , disk_buffer_holder buffer + virtual bool async_write(storage_interface* storage, peer_request const& r + , char const* buf, std::shared_ptr o , std::function handler , std::uint8_t flags = 0) = 0; virtual void async_hash(storage_interface* storage, piece_index_t piece, std::uint8_t flags diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 21baa7c80..640505002 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -294,8 +294,8 @@ namespace libtorrent void async_read(storage_interface* storage, peer_request const& r , std::function handler, void* requester, std::uint8_t flags = 0) override; - void async_write(storage_interface* storage, peer_request const& r - , disk_buffer_holder buffer + bool async_write(storage_interface* storage, peer_request const& r + , char const* buf, std::shared_ptr o , std::function handler , std::uint8_t flags = 0) override; void async_hash(storage_interface* storage, piece_index_t piece, std::uint8_t flags @@ -332,16 +332,7 @@ namespace libtorrent // implements buffer_allocator_interface void reclaim_blocks(span ref) override; void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); } - disk_buffer_holder allocate_disk_buffer(char const* category) override - { - bool exceed = false; - return allocate_disk_buffer(exceed, std::shared_ptr(), category); - } - void trigger_cache_trim(); - disk_buffer_holder allocate_disk_buffer(bool& exceeded, std::shared_ptr o - , char const* category) override; - void update_stats_counters(counters& c) const override; void get_cache_info(cache_status* ret, bool no_pieces = true , storage_interface const* storage = 0) const override; diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 8c825099b..28be78ae4 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -550,7 +550,6 @@ namespace libtorrent void incoming_dont_have(piece_index_t piece_index); void incoming_bitfield(typed_bitfield const& bits); void incoming_request(peer_request const& r); - void incoming_piece(peer_request const& p, disk_buffer_holder data); void incoming_piece(peer_request const& p, char const* data); void incoming_piece_fragment(int bytes); void start_receive_piece(peer_request const& r); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 8a9309dfb..f49af2e04 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -1625,14 +1625,19 @@ namespace libtorrent return 1; } - void disk_io_thread::async_write(storage_interface* storage, peer_request const& r - , disk_buffer_holder buffer + bool disk_io_thread::async_write(storage_interface* storage, peer_request const& r + , char const* buf, std::shared_ptr o , std::function handler , std::uint8_t const flags) { TORRENT_ASSERT(r.length <= m_disk_cache.block_size()); TORRENT_ASSERT(r.length <= 16 * 1024); + bool exceeded = false; + disk_buffer_holder buffer(*this, m_disk_cache.allocate_buffer(exceeded, o, "receive buffer")); + if (!buffer) throw std::bad_alloc(); + std::memcpy(buffer.get(), buf, r.length); + disk_io_job* j = allocate_job(disk_io_job::write); j->storage = storage->shared_from_this(); j->piece = r.piece; @@ -1690,7 +1695,7 @@ namespace libtorrent // make the holder give up ownership of the buffer // since the job was successfully queued up buffer.release(); - return; + return exceeded; } std::unique_lock l(m_cache_mutex); @@ -1720,12 +1725,13 @@ namespace libtorrent // if we added the block (regardless of whether we also // issued a flush job or not), we're done. - return; + return exceeded; } l.unlock(); add_job(j); buffer.release(); + return exceeded; } void disk_io_thread::async_hash(storage_interface* storage @@ -3176,14 +3182,6 @@ namespace libtorrent submit_jobs(); } - disk_buffer_holder disk_io_thread::allocate_disk_buffer(bool& exceeded - , std::shared_ptr o - , char const* category) - { - char* ret = m_disk_cache.allocate_buffer(exceeded, o, category); - return disk_buffer_holder(*this, ret); - } - void disk_io_thread::add_completed_jobs(jobqueue_t& jobs) { jobqueue_t new_completed_jobs; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 3b2d69c4e..b4a19c3cf 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -2588,39 +2588,6 @@ namespace libtorrent // ----------------------------- void peer_connection::incoming_piece(peer_request const& p, char const* data) - { - TORRENT_ASSERT(is_single_thread()); - bool exceeded = false; - disk_buffer_holder buffer - = m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer"); - - if (!buffer) - { - disconnect(errors::no_memory, op_alloc_recvbuf); - return; - } - - // every peer is entitled to have two disk blocks allocated at any given - // time, regardless of whether the cache size is exceeded or not. If this - // was not the case, when the cache size setting is very small, most peers - // would be blocked most of the time, because the disk cache would - // continuously be in exceeded state. Only rarely would it actually drop - // down to 0 and unblock all peers. - if (exceeded && m_outstanding_writing_bytes > 0) - { - if ((m_channel_state[download_channel] & peer_info::bw_disk) == 0) - m_counters.inc_stats_counter(counters::num_peers_down_disk); - m_channel_state[download_channel] |= peer_info::bw_disk; -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark"); -#endif - } - - std::memcpy(buffer.get(), data, p.length); - incoming_piece(p, std::move(buffer)); - } - - void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder data) { TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; @@ -2636,7 +2603,7 @@ namespace libtorrent if (m_remote.address().is_v4() && (m_remote.address().to_v4().to_ulong() & 0xf) == 0) { - data.get()[0] = ~data.get()[0]; + data[0] = ~data[0]; } #endif @@ -2654,7 +2621,7 @@ namespace libtorrent #ifndef TORRENT_DISABLE_EXTENSIONS for (auto const& e : m_extensions) { - if (e->on_piece(p, {data.get(), size_t(p.length)})) + if (e->on_piece(p, {data, size_t(p.length)})) { #if TORRENT_USE_ASSERTS TORRENT_ASSERT(m_received_in_piece == p.length); @@ -2828,10 +2795,26 @@ namespace libtorrent if (t->is_deleted()) return; - m_disk_thread.async_write(&t->storage(), p, std::move(data) + bool const exceeded = m_disk_thread.async_write(&t->storage(), p, data, self() , std::bind(&peer_connection::on_disk_write_complete , self(), _1, p, t)); + // every peer is entitled to have two disk blocks allocated at any given + // time, regardless of whether the cache size is exceeded or not. If this + // was not the case, when the cache size setting is very small, most peers + // would be blocked most of the time, because the disk cache would + // continuously be in exceeded state. Only rarely would it actually drop + // down to 0 and unblock all peers. + if (exceeded && m_outstanding_writing_bytes > 0) + { + if ((m_channel_state[download_channel] & peer_info::bw_disk) == 0) + m_counters.inc_stats_counter(counters::num_peers_down_disk); + m_channel_state[download_channel] |= peer_info::bw_disk; +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark"); +#endif + } + std::int64_t const write_queue_size = m_counters.inc_stats_counter( counters::queued_write_bytes, p.length); m_outstanding_writing_bytes += p.length; diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 69f049d6c..310c0a9cc 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -6683,23 +6683,11 @@ namespace aux { m_blocks_to_reclaim.clear(); } - disk_buffer_holder session_impl::allocate_disk_buffer(char const* category) - { - return m_disk_thread.allocate_disk_buffer(category); - } - void session_impl::free_disk_buffer(char* buf) { m_disk_thread.free_disk_buffer(buf); } - disk_buffer_holder session_impl::allocate_disk_buffer(bool& exceeded - , std::shared_ptr o - , char const* category) - { - return m_disk_thread.allocate_disk_buffer(exceeded, o, category); - } - char* session_impl::allocate_buffer() { TORRENT_ASSERT(is_single_thread()); diff --git a/src/torrent.cpp b/src/torrent.cpp index d4f5c74f8..d8daae1bb 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -1213,6 +1213,28 @@ namespace libtorrent } } + struct piece_refcount + { + piece_refcount(piece_picker& p, piece_index_t piece) + : m_picker(p) + , m_piece(piece) + { + m_picker.inc_refcount(m_piece, nullptr); + } + + piece_refcount(piece_refcount const&) = delete; + piece_refcount& operator=(piece_refcount const&) = delete; + + ~piece_refcount() + { + m_picker.dec_refcount(m_piece, nullptr); + } + + private: + piece_picker& m_picker; + piece_index_t m_piece; + }; + // TODO: 3 there's some duplication between this function and // peer_connection::incoming_piece(). is there a way to merge something? void torrent::add_piece(piece_index_t const piece, char const* data, int const flags) @@ -1235,25 +1257,17 @@ namespace libtorrent peer_request p; p.piece = piece; p.start = 0; - picker().inc_refcount(piece, nullptr); + piece_refcount refcount{picker(), piece}; for (int i = 0; i < blocks_in_piece; ++i, p.start += block_size()) { if (picker().is_finished(piece_block(piece, i)) && (flags & torrent::overwrite_existing) == 0) continue; - p.length = (std::min)(piece_size - p.start, int(block_size())); - disk_buffer_holder buffer = m_ses.allocate_disk_buffer("add piece"); - // out of memory - if (!buffer) - { - picker().dec_refcount(piece, nullptr); - return; - } - std::memcpy(buffer.get(), data + p.start, p.length); + p.length = std::min(piece_size - p.start, int(block_size())); m_stats_counters.inc_stats_counter(counters::queued_write_bytes, p.length); - m_ses.disk_thread().async_write(&storage(), p, std::move(buffer) + m_ses.disk_thread().async_write(&storage(), p, data + p.start, nullptr , std::bind(&torrent::on_disk_write_complete , shared_from_this(), _1, p)); @@ -1274,7 +1288,6 @@ namespace libtorrent verify_piece(p.piece); } } - picker().dec_refcount(piece, nullptr); } void torrent::on_disk_write_complete(storage_error const& error