From b43015217486c306e09478e3a264d6892a0f0af0 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 19 Oct 2011 05:46:49 +0000 Subject: [PATCH] post disk job completions in batches to save CPU in the network thread --- include/libtorrent/disk_io_thread.hpp | 9 ++-- src/disk_io_thread.cpp | 63 +++++++++++++++++++-------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index dd5540e6a..38ef5a5bf 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -318,8 +318,7 @@ namespace libtorrent = boost::function()); bool test_error(disk_io_job& j); - void post_callback(boost::function const& handler - , disk_io_job const& j, int ret); + void post_callback(disk_io_job const& j, int ret); // cache operations cache_piece_index_t::iterator find_cached_piece( @@ -376,7 +375,7 @@ namespace libtorrent // read cache cache_t m_read_pieces; - void flip_stats(); + void flip_stats(ptime now); // total number of blocks in use by both the read // and the write cache. This is not supposed to @@ -441,6 +440,10 @@ namespace libtorrent // the session_impl object file_pool& m_file_pool; + // when completion notifications are queued, they're stuck + // in this list + std::list > m_queued_completions; + // thread for performing blocking disk io operations thread m_disk_io_thread; }; diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index bd0a13d4d..0e5ac40d9 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -124,7 +124,7 @@ namespace libtorrent return !m_exceeded_write_queue; } - void disk_io_thread::flip_stats() + void disk_io_thread::flip_stats(ptime now) { // calling mean() will actually reset the accumulators m_cache_stats.average_queue_time = m_queue_time.mean(); @@ -133,7 +133,8 @@ namespace libtorrent m_cache_stats.average_hash_time = m_hash_time.mean(); m_cache_stats.average_job_time = m_job_time.mean(); m_cache_stats.average_sort_time = m_sort_time.mean(); - m_last_stats_flip = time_now(); + + m_last_stats_flip = now; } void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector& ret) const @@ -209,7 +210,7 @@ namespace libtorrent TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size); m_queue_buffer_size -= i->buffer_size; } - post_callback(i->callback, *i, -3); + post_callback(*i, -3); i = m_jobs.erase(i); continue; } @@ -663,8 +664,9 @@ namespace libtorrent j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size); int result = j.error ? -1 : j.buffer_size; j.offset = i * m_block_size; + j.callback = p.blocks[i].callback; buffers.push_back(p.blocks[i].buf); - post_callback(p.blocks[i].callback, j, result); + post_callback(j, result); p.blocks[i].callback.clear(); p.blocks[i].buf = 0; ++ret; @@ -1338,13 +1340,27 @@ namespace libtorrent return false; } - void disk_io_thread::post_callback( - boost::function const& handler - , disk_io_job const& j, int ret) + typedef std::list > job_queue_t; + void completion_queue_handler(job_queue_t* completed_jobs) { - if (!handler) return; + boost::shared_ptr holder(completed_jobs); - m_ios.post(boost::bind(handler, ret, j)); + for (job_queue_t::iterator i = completed_jobs->begin() + , end(completed_jobs->end()); i != end; ++i) + { + TORRENT_TRY + { + i->first.callback(i->second, i->first); + } + TORRENT_CATCH(std::exception& e) + {} + } + } + + void disk_io_thread::post_callback(disk_io_job const& j, int ret) + { + if (!j.callback) return; + m_queued_completions.push_back(std::make_pair(j, ret)); } enum action_flags_t @@ -1446,8 +1462,19 @@ namespace libtorrent #ifdef TORRENT_DISK_STATS m_log << log_time() << " idle" << std::endl; #endif + + mutex::scoped_lock jl(m_queue_mutex); + if (m_queued_completions.size() >= 30 || (m_jobs.empty() && !m_queued_completions.empty())) + { + job_queue_t* q = new job_queue_t; + q->swap(m_queued_completions); + m_ios.post(boost::bind(completion_queue_handler, q)); + } + + + ptime job_start; while (m_jobs.empty() && m_sorted_read_jobs.empty() && !m_abort) { // if there hasn't been an event in one second @@ -1457,7 +1484,8 @@ namespace libtorrent m_signal.wait(jl); m_signal.clear(jl); - if (time_now() > m_last_stats_flip + seconds(1)) flip_stats(); + job_start = time_now(); + if (job_start >= m_last_stats_flip + seconds(1)) flip_stats(job_start); } if (m_abort && m_jobs.empty()) @@ -1494,7 +1522,6 @@ namespace libtorrent disk_io_job j; ptime now = time_now_hires(); - m_queue_time.add_sample(total_microseconds(now - j.start_time)); ptime operation_start = now; // make sure we don't starve out the read queue by just issuing @@ -1654,6 +1681,8 @@ namespace libtorrent m_sorted_read_jobs.erase(to_erase); } + m_queue_time.add_sample(total_microseconds(now - j.start_time)); + // if there's a buffer in this job, it will be freed // when this holder is destructed, unless it has been // released. @@ -1744,7 +1773,7 @@ namespace libtorrent TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size); m_queue_buffer_size -= i->buffer_size; } - post_callback(i->callback, *i, -3); + post_callback(*i, -3); i = m_jobs.erase(i); continue; } @@ -1759,7 +1788,7 @@ namespace libtorrent ++i; continue; } - post_callback(i->second.callback, i->second, -3); + post_callback(i->second, -3); if (elevator_job_pos == i) ++elevator_job_pos; m_sorted_read_jobs.erase(i++); } @@ -1806,7 +1835,7 @@ namespace libtorrent TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size); m_queue_buffer_size -= i->buffer_size; } - post_callback(i->callback, *i, -3); + post_callback(*i, -3); i = m_jobs.erase(i); continue; } @@ -1822,7 +1851,7 @@ namespace libtorrent ++i; continue; } - post_callback(i->second.callback, i->second, -3); + post_callback(i->second, -3); if (elevator_job_pos == i) ++elevator_job_pos; m_sorted_read_jobs.erase(i++); } @@ -2328,7 +2357,7 @@ namespace libtorrent TORRENT_TRY { TORRENT_ASSERT(j.callback); if (j.callback && ret == piece_manager::need_full_check) - post_callback(j.callback, j, ret); + post_callback(j, ret); } TORRENT_CATCH(std::exception&) {} if (ret != piece_manager::need_full_check) break; } @@ -2399,7 +2428,7 @@ namespace libtorrent && j.buffer != 0) rename_buffer(j.buffer, "posted send buffer"); #endif - post_callback(j.callback, j, ret); + post_callback(j, ret); } TORRENT_CATCH(std::exception&) { TORRENT_ASSERT(false); }