move coalescing of reclaim block from disk_io_thread to session_impl (#1281)

This commit is contained in:
Arvid Norberg 2016-10-31 23:48:30 -04:00 committed by GitHub
parent 9ed948b3b1
commit 1c2a8fb96e
7 changed files with 34 additions and 36 deletions

View File

@ -595,7 +595,8 @@ namespace libtorrent
disk_buffer_holder allocate_disk_buffer(bool& exceeded disk_buffer_holder allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o , std::shared_ptr<disk_observer> o
, char const* category) override; , char const* category) override;
void reclaim_block(block_cache_reference ref) override; void reclaim_blocks(span<block_cache_reference> refs) override;
void do_reclaim_blocks();
bool exceeded_cache_use() const bool exceeded_cache_use() const
{ return m_disk_thread.exceeded_cache_use(); } { return m_disk_thread.exceeded_cache_use(); }
@ -1087,6 +1088,12 @@ namespace libtorrent
// 5 minutes) // 5 minutes)
torrent_map::iterator m_next_lsd_torrent; torrent_map::iterator m_next_lsd_torrent;
// we try to return disk buffers to the disk thread in batches, to
// avoid hammering its mutex. We accrue blocks here and defer returning
// them in a function we post to the io_service
std::vector<block_cache_reference> m_blocks_to_reclaim;
bool m_pending_block_reclaim = false;
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
// torrents are announced on the DHT in a // torrents are announced on the DHT in a
// round-robin fashion. All torrents are cycled through // round-robin fashion. All torrents are cycled through

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/config.hpp" #include "libtorrent/config.hpp"
#include "libtorrent/assert.hpp" #include "libtorrent/assert.hpp"
#include "libtorrent/disk_io_job.hpp" // for block_cache_reference #include "libtorrent/disk_io_job.hpp" // for block_cache_reference
#include "libtorrent/span.hpp"
#include <memory> #include <memory>
@ -48,7 +49,7 @@ namespace libtorrent
struct TORRENT_EXTRA_EXPORT buffer_allocator_interface struct TORRENT_EXTRA_EXPORT buffer_allocator_interface
{ {
virtual void free_disk_buffer(char* b) = 0; virtual void free_disk_buffer(char* b) = 0;
virtual void reclaim_block(block_cache_reference ref) = 0; virtual void reclaim_blocks(span<block_cache_reference> refs) = 0;
virtual disk_buffer_holder allocate_disk_buffer(char const* category) = 0; virtual disk_buffer_holder allocate_disk_buffer(char const* category) = 0;
virtual disk_buffer_holder allocate_disk_buffer(bool& exceeded virtual disk_buffer_holder allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o , std::shared_ptr<disk_observer> o

View File

@ -340,7 +340,7 @@ namespace libtorrent
void clear_piece(piece_manager* storage, int index) override; void clear_piece(piece_manager* storage, int index) override;
// implements buffer_allocator_interface // implements buffer_allocator_interface
void reclaim_block(block_cache_reference ref) override; void reclaim_blocks(span<block_cache_reference> ref) override;
void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); } void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); }
disk_buffer_holder allocate_disk_buffer(char const* category) override disk_buffer_holder allocate_disk_buffer(char const* category) override
{ {
@ -523,9 +523,6 @@ namespace libtorrent
void try_flush_write_blocks(int num, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); void try_flush_write_blocks(int num, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l);
// used to batch reclaiming of blocks to once per cycle
void commit_reclaimed_blocks();
void maybe_flush_write_blocks(); void maybe_flush_write_blocks();
void execute_job(disk_io_job* j); void execute_job(disk_io_job* j);
void immediate_execute(); void immediate_execute();
@ -622,15 +619,6 @@ namespace libtorrent
// completion callbacks in m_completed jobs // completion callbacks in m_completed jobs
bool m_job_completions_in_flight = false; bool m_job_completions_in_flight = false;
// these are blocks that have been returned by the main thread
// but they haven't been freed yet. This is used to batch
// reclaiming of blocks, to only need one std::mutex lock per cycle
std::vector<block_cache_reference> m_blocks_to_reclaim;
// when this is true, there is an outstanding message in the
// message queue that will reclaim all blocks in
// m_blocks_to_reclaim, there's no need to send another one
bool m_outstanding_reclaim_message = false;
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
int m_magic = 0x1337; int m_magic = 0x1337;
std::atomic<bool> m_jobs_aborted{false}; std::atomic<bool> m_jobs_aborted{false};

View File

@ -2436,7 +2436,7 @@ namespace libtorrent
, block_cache_reference ref) , block_cache_reference ref)
{ {
buffer_allocator_interface* buf = static_cast<buffer_allocator_interface*>(userdata); buffer_allocator_interface* buf = static_cast<buffer_allocator_interface*>(userdata);
buf->reclaim_block(ref); buf->reclaim_blocks(ref);
} }
void buffer_free_disk_buf(char* buffer, void* userdata void buffer_free_disk_buf(char* buffer, void* userdata

View File

@ -72,7 +72,7 @@ namespace libtorrent
void disk_buffer_holder::reset(disk_io_job const& j) void disk_buffer_holder::reset(disk_io_job const& j)
{ {
if (m_ref.storage) m_allocator->reclaim_block(m_ref); if (m_ref.storage) m_allocator->reclaim_blocks(m_ref);
else if (m_buf) m_allocator->free_disk_buffer(m_buf); else if (m_buf) m_allocator->free_disk_buffer(m_buf);
m_buf = j.buffer.disk_block; m_buf = j.buffer.disk_block;
m_ref = j.d.io.ref; m_ref = j.d.io.ref;
@ -88,7 +88,7 @@ namespace libtorrent
void disk_buffer_holder::reset(char* const buf) void disk_buffer_holder::reset(char* const buf)
{ {
if (m_ref.storage) m_allocator->reclaim_block(m_ref); if (m_ref.storage) m_allocator->reclaim_blocks(m_ref);
else if (m_buf) m_allocator->free_disk_buffer(m_buf); else if (m_buf) m_allocator->free_disk_buffer(m_buf);
m_buf = buf; m_buf = buf;
m_ref.storage = nullptr; m_ref.storage = nullptr;

View File

@ -251,26 +251,16 @@ namespace libtorrent
m_hash_threads.set_max_threads(num_hash_threads); m_hash_threads.set_max_threads(num_hash_threads);
} }
void disk_io_thread::reclaim_block(block_cache_reference ref) void disk_io_thread::reclaim_blocks(span<block_cache_reference> refs)
{ {
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(ref.storage);
m_blocks_to_reclaim.push_back(ref);
if (m_outstanding_reclaim_message) return;
m_ios.post(std::bind(&disk_io_thread::commit_reclaimed_blocks, this));
m_outstanding_reclaim_message = true;
}
void disk_io_thread::commit_reclaimed_blocks()
{
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(m_outstanding_reclaim_message);
m_outstanding_reclaim_message = false;
std::unique_lock<std::mutex> l(m_cache_mutex); std::unique_lock<std::mutex> l(m_cache_mutex);
for (int i = 0; i < m_blocks_to_reclaim.size(); ++i) for (auto ref : refs)
m_disk_cache.reclaim_block(m_blocks_to_reclaim[i]); {
m_blocks_to_reclaim.clear(); TORRENT_ASSERT(ref.storage);
m_disk_cache.reclaim_block(ref);
}
} }
void disk_io_thread::set_settings(settings_pack const* pack, alert_manager& alerts) void disk_io_thread::set_settings(settings_pack const* pack, alert_manager& alerts)

View File

@ -6565,9 +6565,21 @@ namespace aux {
// decrement the refcount of the block in the disk cache // decrement the refcount of the block in the disk cache
// since the network thread doesn't need it anymore // since the network thread doesn't need it anymore
void session_impl::reclaim_block(block_cache_reference ref) void session_impl::reclaim_blocks(span<block_cache_reference> refs)
{ {
m_disk_thread.reclaim_block(ref); m_blocks_to_reclaim.insert(m_blocks_to_reclaim.end(), refs.begin(), refs.end());
if (m_pending_block_reclaim) return;
m_io_service.post(std::bind(&session_impl::do_reclaim_blocks, this));
m_pending_block_reclaim = true;
}
void session_impl::do_reclaim_blocks()
{
TORRENT_ASSERT(m_pending_block_reclaim);
m_pending_block_reclaim = false;
m_disk_thread.reclaim_blocks(m_blocks_to_reclaim);
m_blocks_to_reclaim.clear();
} }
disk_buffer_holder session_impl::allocate_disk_buffer(char const* category) disk_buffer_holder session_impl::allocate_disk_buffer(char const* category)