diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6ef644926..f2bc1e869 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -106,6 +106,7 @@ set(sources
 	disk_job_pool
 	disk_buffer_pool
 	disk_io_thread
+	disk_io_thread_pool
 	enum_net
 	broadcast_socket
 	magnet_uri
diff --git a/Jamfile b/Jamfile
index 7aae63b4f..eecd7040c 100644
--- a/Jamfile
+++ b/Jamfile
@@ -560,6 +560,8 @@ SOURCES =
 	disk_buffer_holder
 	disk_buffer_pool
 	disk_io_job
+	disk_io_thread
+	disk_io_thread_pool
 	disk_job_pool
 	entry
 	error_code
@@ -627,8 +629,6 @@ SOURCES =
 	utp_stream
 	file_pool
 	lsd
-	disk_buffer_pool
-	disk_io_thread
 	enum_net
 	broadcast_socket
 	magnet_uri
diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am
index d5b4a8d74..09fab32c4 100644
--- a/include/libtorrent/Makefile.am
+++ b/include/libtorrent/Makefile.am
@@ -38,6 +38,7 @@ nobase_include_HEADERS = \
   disk_interface.hpp \
   disk_io_job.hpp \
   disk_io_thread.hpp \
+  disk_io_thread_pool.hpp \
   disk_observer.hpp \
   disk_job_pool.hpp \
   ed25519.hpp \
diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp
index 95de78d18..84ab78e7c 100644
--- a/include/libtorrent/disk_io_thread.hpp
+++ b/include/libtorrent/disk_io_thread.hpp
@@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
 #include "libtorrent/allocator.hpp"
 #include "libtorrent/io_service.hpp"
 #include "libtorrent/sliding_average.hpp"
+#include "libtorrent/disk_io_thread_pool.hpp"
 #include "libtorrent/disk_io_job.hpp"
 #include "libtorrent/disk_job_pool.hpp"
 #include "libtorrent/block_cache.hpp"
@@ -294,7 +295,7 @@ namespace libtorrent
 		~disk_io_thread();

 		void set_settings(settings_pack const* sett, alert_manager& alerts);
-		void set_num_threads(int i, bool wait = true);
+		void set_num_threads(int i);

 		void abort(bool wait);

@@ -385,8 +386,7 @@ namespace libtorrent
 			hasher_thread
 		};

-		void thread_fun(int thread_id, thread_type_t type
-			, boost::shared_ptr<io_service::work> w);
+		void thread_fun(thread_type_t type, io_service::work w);

 		virtual file_pool& files() override { return m_file_pool; }

@@ -434,6 +434,31 @@ namespace libtorrent

 	private:

+		struct job_queue : pool_thread_interface
+		{
+			job_queue(disk_io_thread& owner, thread_type_t type)
+				: m_owner(owner), m_type(type)
+			{}
+
+			virtual void notify_all() override
+			{
+				m_job_cond.notify_all();
+			}
+
+			virtual void thread_fun(io_service::work work) override
+			{ m_owner.thread_fun(m_type, work); }
+
+			disk_io_thread& m_owner;
+			thread_type_t const m_type;
+
+			// used to wake up the disk IO thread when there are new
+			// jobs on the job queue (m_queued_jobs)
+			std::condition_variable m_job_cond;
+
+			// jobs queued for servicing
+			jobqueue_t m_queued_jobs;
+		};
+
 		enum return_value_t
 		{
 			// the do_* functions can return this to indicate the disk
@@ -445,6 +470,10 @@ namespace libtorrent
 			retry_job = -201
 		};

+		// returns true if the thread should exit
+		static bool wait_for_job(job_queue& jobq, disk_io_thread_pool& threads
+			, std::unique_lock<std::mutex>& l);
+
 		void add_completed_job(disk_io_job* j);
 		void add_completed_jobs(jobqueue_t& jobs);
 		void add_completed_jobs_impl(jobqueue_t& jobs
@@ -515,10 +544,11 @@ namespace libtorrent
 		void immediate_execute();
 		void abort_jobs();

-		// this is a counter which is atomically incremented
-		// by each thread as it's started up, in order to
-		// assign a unique id to each thread
-		std::atomic<int> m_num_threads;
+		// returns the maximum number of threads
+		// the actual number of threads may be less
+		int num_threads() const;
+		job_queue& queue_for_job(disk_io_job* j);
+		disk_io_thread_pool& pool_for_job(disk_io_job* j);

 		// set to true once we start shutting down
 		std::atomic<bool> m_abort;
@@ -528,8 +558,16 @@ namespace libtorrent
 		// shutting down. This last thread is responsible for cleanup
 		std::atomic<int> m_num_running_threads;

-		// the actual threads running disk jobs
-		std::vector<std::thread> m_threads;
+		// std::mutex to protect the m_generic_io_jobs and m_hash_io_jobs lists
+		mutable std::mutex m_job_mutex;
+
+		// most jobs are posted to m_generic_io_jobs
+		// but hash jobs are posted to m_hash_io_jobs if m_hash_threads
+		// has a non-zero maximum thread count
+		job_queue m_generic_io_jobs;
+		disk_io_thread_pool m_generic_threads;
+		job_queue m_hash_io_jobs;
+		disk_io_thread_pool m_hash_threads;

 		aux::session_settings m_settings;

@@ -579,22 +617,6 @@ namespace libtorrent
 		// the main thread.
 		io_service& m_ios;

-		// used to wake up the disk IO thread when there are new
-		// jobs on the job queue (m_queued_jobs)
-		std::condition_variable m_job_cond;
-
-		// std::mutex to protect the m_queued_jobs list
-		mutable std::mutex m_job_mutex;
-
-		// jobs queued for servicing
-		jobqueue_t m_queued_jobs;
-
-		// when using more than 2 threads, this is
-		// used for just hashing jobs, just for threads
-		// dedicated to do hashing
-		std::condition_variable m_hash_job_cond;
-		jobqueue_t m_queued_hash_jobs;
-
 		// used to rate limit disk performance warnings
 		time_point m_last_disk_aio_performance_warning;
diff --git a/include/libtorrent/disk_io_thread_pool.hpp b/include/libtorrent/disk_io_thread_pool.hpp
new file mode 100644
index 000000000..0ed76a21a
--- /dev/null
+++ b/include/libtorrent/disk_io_thread_pool.hpp
@@ -0,0 +1,143 @@
+/*
+
+Copyright (c) 2005-2016, Arvid Norberg, Steven Siloti
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in
+      the documentation and/or other materials provided with the distribution.
+    * Neither the name of the author nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+*/
+
+#ifndef TORRENT_DISK_IO_THREAD_POOL
+#define TORRENT_DISK_IO_THREAD_POOL
+
+#include "libtorrent/config.hpp"
+#include "libtorrent/export.hpp"
+#include "libtorrent/deadline_timer.hpp"
+#include "libtorrent/io_service_fwd.hpp"
+#include "libtorrent/error_code.hpp"
+
+#include <atomic>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace libtorrent
+{
+	struct pool_thread_interface
+	{
+		virtual ~pool_thread_interface() {}
+
+		virtual void notify_all() = 0;
+		virtual void thread_fun(io_service::work /* work */) = 0;
+	};
+
+	// this class implements the policy for creating and destroying I/O threads
+	// threads are created when job_queued is called to signal the arrival of
+	// new jobs
+	// once a minute threads are destroyed if at least one thread has been
+	// idle for the entire minute
+	// the pool_thread_interface is used to spawn and notify the worker threads
+	struct TORRENT_EXTRA_EXPORT disk_io_thread_pool
+	{
+		disk_io_thread_pool(pool_thread_interface& thread_iface
+			, io_service& ios);
+		~disk_io_thread_pool();
+
+		// set the maximum number of I/O threads which may be running
+		// the actual number of threads will be <= this number
+		void set_max_threads(int const i);
+		void abort(bool wait);
+		int max_threads() const { return m_max_threads; }
+
+		// thread_idle, thread_active, and job_queued are NOT thread safe
+		// all calls to them must be serialized
+		// it is expected that they will be called while holding the
+		// job queue mutex
+
+		// these functions should be called by the thread_fun to signal its state
+		// threads are considered active when they are started, so thread_idle should
+		// be called first
+		// these calls are not thread safe
+		void thread_idle() { ++m_num_idle_threads; }
+		void thread_active();
+
+		// check if there is an outstanding request for I/O threads to stop
+		// this is a weak check; if it returns true, try_thread_exit may still
+		// return false
+		bool should_exit() { return m_threads_to_exit > 0; }
+		// this should be the last function an I/O thread calls before breaking
+		// out of its service loop
+		// if it returns true then the thread MUST exit
+		// if it returns false the thread should not exit
+		bool try_thread_exit(std::thread::id id);
+
+		// get the thread id of the first thread in the internal vector
+		// since this is the first thread it will remain the same until the first
+		// thread exits
+		// it can be used to trigger maintenance jobs which should only run on one thread
+		std::thread::id first_thread_id();
+		int num_threads()
+		{
+			std::lock_guard<std::mutex> l(m_mutex);
+			return int(m_threads.size());
+		}
+
+		// this should be called whenever new jobs are queued
+		// queue_size is the current size of the job queue
+		// not thread safe
+		void job_queued(int queue_size);
+
+	private:
+		void reap_idle_threads(error_code const& ec);
+
+		// the caller must hold m_mutex
+		void stop_threads(int num_to_stop);
+
+		pool_thread_interface& m_thread_iface;
+
+		std::atomic<int> m_max_threads;
+		// the number of threads the reaper decided should exit
+		std::atomic<int> m_threads_to_exit;
+
+		// must hold m_mutex to access
+		bool m_abort;
+
+		std::atomic<int> m_num_idle_threads;
+		// the minimum number of idle threads seen since the last reaping
+		std::atomic<int> m_min_idle_threads;
+
+		// ensures thread creation/destruction is atomic
+		std::mutex m_mutex;
+
+		// the actual threads running disk jobs
+		std::vector<std::thread> m_threads;
+
+		// timer to check for and reap idle threads
+		deadline_timer m_idle_timer;
+	};
+} // namespace libtorrent
+
+#endif
diff --git a/simulation/Jamfile b/simulation/Jamfile
index 7448c966f..d729cefbd 100644
--- a/simulation/Jamfile
+++ b/simulation/Jamfile
@@ -41,6 +41,7 @@ alias libtorrent-sims :
 	[ run test_metadata_extension.cpp ]
 	[ run test_trackers_extension.cpp ]
 	[ run test_tracker.cpp ]
+	[ run test_thread_pool.cpp ]
 	[ run test_ip_filter.cpp ]
 	[ run test_dht_rate_limit.cpp ]
 	[ run test_fast_extensions.cpp ]
diff --git a/simulation/test_thread_pool.cpp b/simulation/test_thread_pool.cpp
new file mode 100644
index 000000000..919c8a6ea
--- /dev/null
+++ b/simulation/test_thread_pool.cpp
@@ -0,0 +1,215 @@
+/*
+
+Copyright (c) 2016, Steven Siloti
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in
+      the documentation and/or other materials provided with the distribution.
+    * Neither the name of the author nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+*/
+
+#include "test.hpp"
+#include "simulator/simulator.hpp"
+#include "libtorrent/disk_io_thread_pool.hpp"
+#include <condition_variable>
+
+namespace lt = libtorrent;
+
+struct test_threads : lt::pool_thread_interface
+{
+	test_threads()
+		: m_active_threads(0)
+		, m_target_active_threads(0)
+	{}
+
+	virtual void notify_all() override { m_cond.notify_all(); }
+	virtual void thread_fun(lt::io_service::work w) override
+	{
+		std::unique_lock<std::mutex> l(m_mutex);
+		for (;;)
+		{
+			m_pool->thread_idle();
+			while (!m_pool->should_exit() && m_active_threads >= m_target_active_threads)
+				m_cond.wait(l);
+			m_pool->thread_active();
+
+			if (m_pool->try_thread_exit(std::this_thread::get_id()))
+				break;
+
+			if (m_active_threads < m_target_active_threads)
+			{
+				++m_active_threads;
+				while (!m_pool->should_exit() && m_active_threads <= m_target_active_threads)
+					m_cond.wait(l);
+				--m_active_threads;
+			}
+
+			if (m_pool->try_thread_exit(std::this_thread::get_id()))
+				break;
+		}
+
+		l.unlock();
+		m_exit_cond.notify_all();
+	}
+
+	// change the number of active threads and wait for the threads
+	// to settle at the new value
+	void set_active_threads(int target)
+	{
+		std::unique_lock<std::mutex> l(m_mutex);
+		assert(target <= m_pool->num_threads());
+		m_target_active_threads = target;
+		while (m_active_threads != m_target_active_threads)
+		{
+			l.unlock();
+			m_cond.notify_all();
+			std::this_thread::yield();
+			l.lock();
+		}
+	}
+
+	// this is to close a race between a thread exiting and a test checking the
+	// thread count
+	void wait_for_thread_exit(int num_threads)
+	{
+		std::unique_lock<std::mutex> l(m_mutex);
+		m_exit_cond.wait_for(l, std::chrono::seconds(30), [&]()
+		{
+			return m_pool->num_threads() == num_threads;
+		});
+	}
+
+	lt::disk_io_thread_pool* m_pool;
+	std::mutex m_mutex;
+	std::condition_variable m_cond;
+	std::condition_variable m_exit_cond;
+
+	// must hold m_mutex to access
+	int m_active_threads;
+	// must hold m_mutex to access
+	int m_target_active_threads;
+};
+
+TORRENT_TEST(disk_io_thread_pool_idle_reaping)
+{
+	sim::default_config cfg;
+	sim::simulation sim{ cfg };
+
+	test_threads threads;
+	sim::asio::io_service ios(sim);
+	lt::disk_io_thread_pool pool(threads, ios);
+	threads.m_pool = &pool;
+	pool.set_max_threads(3);
+	pool.job_queued(3);
+	TEST_EQUAL(pool.num_threads(), 3);
+	// make sure all the threads are up and settled in the active state
+	threads.set_active_threads(3);
+
+	// first just kill one thread
+	threads.set_active_threads(2);
+	lt::deadline_timer idle_delay(ios);
+	// the thread will be killed the second time the reaper runs and we need
+	// to wait one extra minute to make sure the check runs after the reaper
+	idle_delay.expires_from_now(std::chrono::minutes(3));
+	idle_delay.async_wait([&](lt::error_code ec)
+	{
+		// this is a kludge to work around a race between the thread
+		// exiting and checking the number of threads
+		// in production we only check num_threads from the disk I/O threads
+		// so there are no race problems there
+		threads.wait_for_thread_exit(2);
+		TEST_EQUAL(pool.num_threads(), 2);
+		sim.stop();
+	});
+	sim.run();
+	sim.reset();
+
+	// now kill the rest
+	threads.set_active_threads(0);
+	idle_delay.expires_from_now(std::chrono::minutes(3));
+	idle_delay.async_wait([&](lt::error_code ec)
+	{
+		// see comment above about this kludge
+		threads.wait_for_thread_exit(0);
+		TEST_EQUAL(pool.num_threads(), 0);
+	});
+	sim.run();
+}
+
+TORRENT_TEST(disk_io_thread_pool_abort_wait)
+{
+	sim::default_config cfg;
+	sim::simulation sim{ cfg };
+
+	test_threads threads;
+	sim::asio::io_service ios(sim);
+	lt::disk_io_thread_pool pool(threads, ios);
+	threads.m_pool = &pool;
+	pool.set_max_threads(3);
+	pool.job_queued(3);
+	TEST_EQUAL(pool.num_threads(), 3);
+	pool.abort(true);
+	TEST_EQUAL(pool.num_threads(), 0);
+}
+
+#if 0
+// disabled for now because io_service::work doesn't work under the simulator
+// and we need it to stop this test from exiting prematurely
+TORRENT_TEST(disk_io_thread_pool_abort_no_wait)
+{
+	sim::default_config cfg;
+	sim::simulation sim{ cfg };
+
+	test_threads threads;
+	sim::asio::io_service ios(sim);
+	lt::disk_io_thread_pool pool(threads, ios);
+	threads.m_pool = &pool;
+	pool.set_max_threads(3);
+	pool.job_queued(3);
+	TEST_EQUAL(pool.num_threads(), 3);
+	pool.abort(false);
+	TEST_EQUAL(pool.num_threads(), 0);
+	sim.run();
+}
+#endif
+
+TORRENT_TEST(disk_io_thread_pool_max_threads)
+{
+	sim::default_config cfg;
+	sim::simulation sim{ cfg };
+
+	test_threads threads;
+	sim::asio::io_service ios(sim);
+	lt::disk_io_thread_pool pool(threads, ios);
+	threads.m_pool = &pool;
+	// first check that the thread limit is respected when adding jobs
+	pool.set_max_threads(3);
+	pool.job_queued(4);
+	TEST_EQUAL(pool.num_threads(), 3);
+	// now check that the number of threads is reduced when the max threads is reduced
+	pool.set_max_threads(2);
+	// see comment above about this kludge
+	threads.wait_for_thread_exit(2);
+	TEST_EQUAL(pool.num_threads(), 2);
+}
diff --git a/src/Makefile.am b/src/Makefile.am
index 99eeba21e..aaba9405d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -59,6 +59,7 @@ libtorrent_rasterbar_la_SOURCES = \
   disk_buffer_pool.cpp \
   disk_io_job.cpp \
   disk_io_thread.cpp \
+  disk_io_thread_pool.cpp \
   disk_job_pool.cpp \
   entry.cpp \
   enum_net.cpp \
diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp
index c66c49c7c..7f524f425 100644
--- a/src/disk_io_thread.cpp
+++ b/src/disk_io_thread.cpp
@@ -64,6 +64,7 @@ POSSIBILITY OF SUCH DAMAGE.
 #define DEBUG_DISK_THREAD 0

 #if DEBUG_DISK_THREAD
+#include <sstream>
 #define DLOG(...) debug_log(__VA_ARGS__)
 #else
 #define DLOG(...) do {} while(false)
@@ -141,9 +142,12 @@ namespace libtorrent
 		, counters& cnt
 		, void* userdata
 		, int block_size)
-		: m_num_threads(0)
-		, m_abort(false)
+		: m_abort(false)
 		, m_num_running_threads(0)
+		, m_generic_io_jobs(*this, generic_thread)
+		, m_generic_threads(m_generic_io_jobs, ios)
+		, m_hash_io_jobs(*this, hasher_thread)
+		, m_hash_threads(m_hash_io_jobs, ios)
 		, m_userdata(userdata)
 		, m_last_cache_expiry(min_time())
 		, m_last_file_check(clock_type::now())
@@ -193,63 +197,26 @@ namespace libtorrent
 	void disk_io_thread::abort(bool wait)
 	{
 		m_abort = true;
-		if (m_num_threads == 0)
+		if (num_threads() == 0)
 		{
 			abort_jobs();
 		}
 		else
 		{
-			set_num_threads(0, wait);
+			m_generic_threads.abort(wait);
+			m_hash_threads.abort(wait);
 		}
 	}

 	// TODO: 1 it would be nice to have the number of threads be set dynamically
-	void disk_io_thread::set_num_threads(int const i, bool const wait)
+	void disk_io_thread::set_num_threads(int const i)
 	{
 		TORRENT_ASSERT(m_magic == 0x1337);
-		if (i == m_num_threads) return;

-		if (i > m_num_threads)
-		{
-			while (m_num_threads < i)
-			{
-				int thread_id = (++m_num_threads) - 1;
-				thread_type_t type = generic_thread;
-
-				// this keeps the io_service::run() call blocked from returning.
-				// When shutting down, it's possible that the event queue is drained
-				// before the disk_io_thread has posted its last callback. When this
-				// happens, the io_service will have a pending callback from the
-				// disk_io_thread, but the event loop is not running. this means
-				// that the event is destructed after the disk_io_thread. If the
-				// event refers to a disk buffer it will try to free it, but the
-				// buffer pool won't exist anymore, and crash. This prevents that.
-				boost::shared_ptr<io_service::work> work =
-					boost::make_shared<io_service::work>(boost::ref(m_ios));
-
-				// the magic number 3 is also used in add_job()
-				// every 4:th thread is a hasher thread
-				if ((thread_id & 0x3) == 3) type = hasher_thread;
-				m_threads.emplace_back(&disk_io_thread::thread_fun, this
-					, thread_id, type, work);
-			}
-		}
-		else
-		{
-			while (m_num_threads > i) { --m_num_threads; }
-
-			std::unique_lock<std::mutex> l(m_job_mutex);
-			m_job_cond.notify_all();
-			m_hash_job_cond.notify_all();
-			l.unlock();
-
-			for (int j = m_num_threads; j < m_threads.size(); ++j)
-			{
-				if (wait)
-					m_threads[j].join();
-				else
-					m_threads[j].detach();
-			}
-			m_threads.resize(m_num_threads);
-		}
+		// add one hasher thread for every three generic threads
+		int const num_hash_threads = i / 4;
+		m_generic_threads.set_max_threads(i - num_hash_threads);
+		m_hash_threads.set_max_threads(num_hash_threads);
 	}

 	void disk_io_thread::reclaim_block(block_cache_reference ref)
@@ -1140,6 +1107,8 @@ namespace libtorrent

 		if (ret == retry_job)
 		{
+			job_queue& q = queue_for_job(j);
+
 			std::unique_lock<std::mutex> l2(m_job_mutex);
 			// to avoid busy looping here, give up
 			// our quanta in case there aren't any other
@@ -1151,8 +1120,8 @@ namespace libtorrent

 			TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);

-			bool need_sleep = m_queued_jobs.empty();
-			m_queued_jobs.push_back(j);
+			bool const need_sleep = q.m_queued_jobs.empty();
+			q.m_queued_jobs.push_back(j);

 			l2.unlock();
 			if (need_sleep) std::this_thread::yield();
 			return;
@@ -1818,7 +1787,7 @@ namespace libtorrent

 		// TODO: maybe the tailqueue_iterator should contain a pointer-pointer
 		// instead and have an unlink function
-		disk_io_job* qj = m_queued_jobs.get_all();
+		disk_io_job* qj = m_generic_io_jobs.m_queued_jobs.get_all();
 		jobqueue_t to_abort;

 		while (qj)
@@ -1830,7 +1799,7 @@ namespace libtorrent
 			if (qj->storage.get() == storage)
 				to_abort.push_back(qj);
 			else
-				m_queued_jobs.push_back(qj);
+				m_generic_io_jobs.m_queued_jobs.push_back(qj);
 			qj = next;
 		}
 		l2.unlock();
@@ -1899,7 +1868,7 @@ namespace libtorrent
 		// remove outstanding hash jobs belonging to this torrent
 		std::unique_lock<std::mutex> l2(m_job_mutex);

-		disk_io_job* qj = m_queued_hash_jobs.get_all();
+		disk_io_job* qj = m_hash_io_jobs.m_queued_jobs.get_all();
 		jobqueue_t to_abort;

 		while (qj)
@@ -1911,7 +1880,7 @@ namespace libtorrent
 			if (qj->storage.get() == storage)
 				to_abort.push_back(qj);
 			else
-				m_queued_hash_jobs.push_back(qj);
+				m_hash_io_jobs.m_queued_jobs.push_back(qj);
 			qj = next;
 		}
 		l2.unlock();
@@ -2720,8 +2689,8 @@ namespace libtorrent
 		c.set_value(counters::num_read_jobs, read_jobs_in_use());
 		c.set_value(counters::num_write_jobs, write_jobs_in_use());
 		c.set_value(counters::num_jobs, jobs_in_use());
-		c.set_value(counters::queued_disk_jobs, m_queued_jobs.size()
-			+ m_queued_hash_jobs.size());
+		c.set_value(counters::queued_disk_jobs, m_generic_io_jobs.m_queued_jobs.size()
+			+ m_hash_io_jobs.m_queued_jobs.size());

 		jl.unlock();
@@ -2824,7 +2793,7 @@ namespace libtorrent

 #ifndef TORRENT_NO_DEPRECATE
 		std::unique_lock<std::mutex> jl(m_job_mutex);
-		ret->queued_jobs = m_queued_jobs.size() + m_queued_hash_jobs.size();
+		ret->queued_jobs = m_generic_io_jobs.m_queued_jobs.size() + m_hash_io_jobs.m_queued_jobs.size();
 		jl.unlock();
 #endif
 	}
@@ -3015,13 +2984,13 @@ namespace libtorrent
 			std::unique_lock<std::mutex> l(m_job_mutex);
 			TORRENT_ASSERT((j->flags & disk_io_job::in_progress)
 				|| !j->storage);
 			// prioritize fence jobs since they're blocking other jobs
-			m_queued_jobs.push_front(j);
+			m_generic_io_jobs.m_queued_jobs.push_front(j);
 			l.unlock();

 			// discard the flush job
 			free_job(fj);

-			if (m_num_threads == 0 && user_add)
+			if (num_threads() == 0 && user_add)
 				immediate_execute();

 			return;
@@ -3041,7 +3010,7 @@ namespace libtorrent
 			std::unique_lock<std::mutex> l(m_job_mutex);
 			TORRENT_ASSERT((fj->flags & disk_io_job::in_progress)
 				|| !fj->storage);
-			m_queued_jobs.push_front(fj);
+			m_generic_io_jobs.m_queued_jobs.push_front(fj);
 		}
 		else
 		{
@@ -3049,7 +3018,7 @@ namespace libtorrent
 			TORRENT_ASSERT(fj->blocked);
 		}

-		if (m_num_threads == 0 && user_add)
+		if (num_threads() == 0 && user_add)
 			immediate_execute();
 	}
@@ -3072,13 +3041,13 @@ namespace libtorrent
 		{
 			std::unique_lock<std::mutex> l(m_job_mutex);
 			TORRENT_ASSERT((j->flags & disk_io_job::in_progress)
 				|| !j->storage);
-			m_queued_jobs.push_back(j);
+			m_generic_io_jobs.m_queued_jobs.push_back(j);

 			// if we literally have 0 disk threads, we have to execute the jobs
 			// immediately. If add job is called internally by the disk_io_thread,
 			// we need to defer executing it. We only want the top level to loop
 			// over the job queue (as is done below)
-			if (m_num_threads == 0 && user_add)
+			if (num_threads() == 0 && user_add)
 			{
 				l.unlock();
 				immediate_execute();
@@ -3108,33 +3077,24 @@ namespace libtorrent
 		TORRENT_ASSERT((j->flags & disk_io_job::in_progress)
 			|| !j->storage);

-		// if there are at least 3 threads, there's a hasher thread
-		// and the hash jobs go into a separate queue
-		// see set_num_threads()
-		if (m_num_threads > 3 && j->action == disk_io_job::hash)
+		job_queue& q = queue_for_job(j);
+		q.m_queued_jobs.push_back(j);
+		// if we literally have 0 disk threads, we have to execute the jobs
+		// immediately. If add job is called internally by the disk_io_thread,
+		// we need to defer executing it. We only want the top level to loop
+		// over the job queue (as is done below)
+		if (pool_for_job(j).max_threads() == 0 && user_add)
 		{
-			m_queued_hash_jobs.push_back(j);
-		}
-		else
-		{
-			m_queued_jobs.push_back(j);
-			// if we literally have 0 disk threads, we have to execute the jobs
-			// immediately. If add job is called internally by the disk_io_thread,
-			// we need to defer executing it. We only want the top level to loop
-			// over the job queue (as is done below)
-			if (m_num_threads == 0 && user_add)
-			{
-				l.unlock();
-				immediate_execute();
-			}
+			l.unlock();
+			immediate_execute();
 		}
 	}

 	void disk_io_thread::immediate_execute()
 	{
-		while (!m_queued_jobs.empty())
+		while (!m_generic_io_jobs.m_queued_jobs.empty())
 		{
-			disk_io_job* j = m_queued_jobs.pop_front();
+			disk_io_job* j = m_generic_io_jobs.m_queued_jobs.pop_front();
 			maybe_flush_write_blocks();
 			execute_job(j);
 		}
@@ -3143,10 +3103,16 @@ namespace libtorrent
 	void disk_io_thread::submit_jobs()
 	{
 		std::unique_lock<std::mutex> l(m_job_mutex);
-		if (!m_queued_jobs.empty())
-			m_job_cond.notify_all();
-		if (!m_queued_hash_jobs.empty())
-			m_hash_job_cond.notify_all();
+		if (!m_generic_io_jobs.m_queued_jobs.empty())
+		{
+			m_generic_io_jobs.m_job_cond.notify_all();
+			m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
+		}
+		if (!m_hash_io_jobs.m_queued_jobs.empty())
+		{
+			m_hash_io_jobs.m_job_cond.notify_all();
+			m_hash_threads.job_queued(m_hash_io_jobs.m_queued_jobs.size());
+		}
 	}

 	void disk_io_thread::maybe_flush_write_blocks()
@@ -3157,7 +3123,7 @@ namespace libtorrent
 		std::unique_lock<std::mutex> l(m_cache_mutex);
 		DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n"
 			, int(m_stats_counters[counters::blocked_disk_jobs])
-			, m_queued_jobs.size(), int(m_num_threads));
+			, m_generic_io_jobs.m_queued_jobs.size(), int(num_threads()));
 		m_last_cache_expiry = now;
 		jobqueue_t completed_jobs;
 		flush_expired_write_blocks(completed_jobs, l);
@@ -3174,10 +3140,56 @@ namespace libtorrent
 		add_completed_jobs(completed_jobs);
 	}

-	void disk_io_thread::thread_fun(int thread_id, thread_type_t type
-		, boost::shared_ptr<io_service::work> w)
+	bool disk_io_thread::wait_for_job(job_queue& jobq, disk_io_thread_pool& threads
+		, std::unique_lock<std::mutex>& l)
 	{
-		DLOG("started disk thread %d\n", int(thread_id));
+		TORRENT_ASSERT(l.owns_lock());
+
+		// the thread should only go active if it is exiting or there is work to do
+		// if the thread goes active on every wakeup it causes the minimum idle thread
+		// count to be lower than it should be
+		// for performance reasons we also want to avoid going idle and active again
+		// if there is already work to do
+		if (jobq.m_queued_jobs.empty())
+		{
+			threads.thread_idle();
+
+			do
+			{
+				// if the number of wanted threads is decreased,
+				// we may stop this thread
+				// when we're terminating the last thread, make sure
+				// we finish up all queued jobs first
+				if (threads.should_exit()
+					&& (jobq.m_queued_jobs.empty()
+						|| threads.num_threads() > 1)
+					// try_thread_exit must be the last condition
+					&& threads.try_thread_exit(std::this_thread::get_id()))
+				{
+					// time to exit this thread.
+					threads.thread_active();
+					return true;
+				}
+
+				jobq.m_job_cond.wait(l);
+			} while (jobq.m_queued_jobs.empty());
+
+			threads.thread_active();
+		}
+
+		return false;
+	}
+
+	void disk_io_thread::thread_fun(thread_type_t type
+		, io_service::work w)
+	{
+		std::thread::id thread_id = std::this_thread::get_id();
+#if DEBUG_DISK_THREAD
+		std::stringstream thread_id_str;
+		thread_id_str << thread_id;
+#endif
+
+		DLOG("started disk thread %s\n", thread_id_str.str().c_str());

 		++m_num_running_threads;
 		m_stats_counters.inc_stats_counter(counters::num_running_threads, 1);
@@ -3188,34 +3200,22 @@ namespace libtorrent
 			disk_io_job* j = 0;
 			if (type == generic_thread)
 			{
-				TORRENT_ASSERT(l.owns_lock());
-				while (m_queued_jobs.empty() && thread_id < m_num_threads) m_job_cond.wait(l);
-
-				// if the number of wanted threads is decreased,
-				// we may stop this thread
-				// when we're terminating the last thread (id=0), make sure
-				// we finish up all queued jobs first
-				if (thread_id >= m_num_threads && !(thread_id == 0 && m_queued_jobs.size() > 0))
-				{
-					// time to exit this thread.
-					break;
-				}
-
-				j = m_queued_jobs.pop_front();
+				bool const should_exit = wait_for_job(m_generic_io_jobs, m_generic_threads, l);
+				if (should_exit) break;
+				j = m_generic_io_jobs.m_queued_jobs.pop_front();
 			}
 			else if (type == hasher_thread)
 			{
-				TORRENT_ASSERT(l.owns_lock());
-				while (m_queued_hash_jobs.empty() && thread_id < m_num_threads) m_hash_job_cond.wait(l);
-				if (m_queued_hash_jobs.empty() && thread_id >= m_num_threads) break;
-				j = m_queued_hash_jobs.pop_front();
+				bool const should_exit = wait_for_job(m_hash_io_jobs, m_hash_threads, l);
+				if (should_exit) break;
+				j = m_hash_io_jobs.m_queued_jobs.pop_front();
 			}

 			l.unlock();

 			TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);

-			if (thread_id == 0)
+			if (thread_id == m_generic_threads.first_thread_id() && type == generic_thread)
 			{
 				// there's no need for all threads to be doing this
 				maybe_flush_write_blocks();
@@ -3234,8 +3234,8 @@ namespace libtorrent
 		m_stats_counters.inc_stats_counter(counters::num_running_threads, -1);
 		if (--m_num_running_threads > 0 || !m_abort)
 		{
-			DLOG("exiting disk thread %d. num_threads: %d aborting: %d\n"
-				, thread_id, int(m_num_threads), int(m_abort));
+			DLOG("exiting disk thread %s. num_threads: %d aborting: %d\n"
+				, thread_id_str.str().c_str(), int(num_threads()), int(m_abort));
 			TORRENT_ASSERT(m_magic == 0x1337);
 			return;
 		}
@@ -3260,18 +3260,18 @@ namespace libtorrent
 		}
 		l2.unlock();

-		DLOG("disk thread %d is the last one alive. cleaning up\n", thread_id);
+		DLOG("disk thread %s is the last one alive. cleaning up\n", thread_id_str.str().c_str());

 		abort_jobs();

 		TORRENT_ASSERT(m_magic == 0x1337);

-		// release the io_service to allow the run() call to return
-		// we do this once we stop posting new callbacks to it.
 		COMPLETE_ASYNC("disk_io_thread::work");
-		w.reset();
-		// at this point, the disk_io_thread object may have been destructed.
-		// the call to w.reset() above is what synchronizes with the main thread
+
+		// w's dtor releases the io_service to allow the run() call to return
+		// we do this once we stop posting new callbacks to it.
+		// after the dtor has been called, the disk_io_thread object may be destructed
+		TORRENT_UNUSED(w);
 	}

 	void disk_io_thread::abort_jobs()
@@ -3299,6 +3299,27 @@ namespace libtorrent
 		TORRENT_ASSERT(m_magic == 0x1337);
 	}

+	int disk_io_thread::num_threads() const
+	{
+		return m_generic_threads.max_threads() + m_hash_threads.max_threads();
+	}
+
+	disk_io_thread::job_queue& disk_io_thread::queue_for_job(disk_io_job* j)
+	{
+		if (m_hash_threads.max_threads() > 0 && j->action == disk_io_job::hash)
+			return m_hash_io_jobs;
+		else
+			return m_generic_io_jobs;
+	}
+
+	disk_io_thread_pool& disk_io_thread::pool_for_job(disk_io_job* j)
+	{
+		if (m_hash_threads.max_threads() > 0 && j->action == disk_io_job::hash)
+			return m_hash_threads;
+		else
+			return m_generic_threads;
+	}
+
 	// this is a callback called by the block_cache when
 	// it's exceeding the disk cache size.
 	void disk_io_thread::trigger_cache_trim()
@@ -3469,7 +3490,7 @@ namespace libtorrent
 			l_.unlock();

 			std::unique_lock<std::mutex> l(m_job_mutex);
-			m_queued_jobs.append(other_jobs);
+			m_generic_io_jobs.m_queued_jobs.append(other_jobs);
 			l.unlock();

 			while (flush_jobs.size() > 0)
@@ -3478,7 +3499,10 @@ namespace libtorrent
 				add_job(j, false);
 			}

-			m_job_cond.notify_all();
+			l.lock();
+			m_generic_io_jobs.m_job_cond.notify_all();
+			m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
+			l.unlock();
 		}

 		std::unique_lock<std::mutex> l(m_completed_jobs_mutex);
diff --git a/src/disk_io_thread_pool.cpp b/src/disk_io_thread_pool.cpp
new file mode 100644
index 000000000..fdb4438f2
--- /dev/null
+++ b/src/disk_io_thread_pool.cpp
@@ -0,0 +1,203 @@
+/*
+
+Copyright (c) 2005-2016, Arvid Norberg, Steven Siloti
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in
+      the documentation and/or other materials provided with the distribution.
+    * Neither the name of the author nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+*/
+
+#include "libtorrent/disk_io_thread_pool.hpp"
+#include "libtorrent/disk_io_thread.hpp"
+#include <algorithm>
+
+namespace
+{
+	constexpr std::chrono::seconds reap_idle_threads_interval(60);
+}
+
+namespace libtorrent
+{
+	disk_io_thread_pool::disk_io_thread_pool(pool_thread_interface& thread_iface
+		, io_service& ios)
+		: m_thread_iface(thread_iface)
+		, m_max_threads(0)
+		, m_threads_to_exit(0)
+		, m_abort(false)
+		, m_num_idle_threads(0)
+		, m_min_idle_threads(0)
+		, m_idle_timer(ios)
+	{}
+
+	disk_io_thread_pool::~disk_io_thread_pool()
+	{
+		abort(true);
+	}
+
+	void disk_io_thread_pool::set_max_threads(int const i)
+	{
+		std::lock_guard<std::mutex> l(m_mutex);
+		if (i == m_max_threads) return;
+		m_max_threads = i;
+		if (m_threads.size() < i) return;
+		stop_threads(int(m_threads.size()) - i);
+	}
+
+	void disk_io_thread_pool::abort(bool wait)
+	{
+		std::unique_lock<std::mutex> l(m_mutex);
+		if (m_abort) return;
+		m_max_threads = 0;
+		m_abort = true;
+		m_idle_timer.cancel();
+		stop_threads(int(m_threads.size()));
+		for (auto& t : m_threads)
+		{
+			if (wait)
+			{
+				// must release m_mutex to avoid a deadlock if the thread
+				// tries to acquire it
+				l.unlock();
+				t.join();
+				l.lock();
+			}
+			else
+				t.detach();
+		}
+		m_threads.clear();
+	}
+
+	void disk_io_thread_pool::thread_active()
+	{
+		--m_num_idle_threads;
+		TORRENT_ASSERT(m_num_idle_threads >= 0);
+
+		int current_min = m_min_idle_threads;
+		while (m_num_idle_threads < current_min
+			&& !m_min_idle_threads.compare_exchange_weak(current_min, m_num_idle_threads));
+	}
+
+	bool disk_io_thread_pool::try_thread_exit(std::thread::id id)
+	{
+		int to_exit = m_threads_to_exit;
+		while (to_exit > 0 &&
+			!m_threads_to_exit.compare_exchange_weak(to_exit, to_exit - 1));
+		if (to_exit > 0)
+		{
+			std::unique_lock<std::mutex> l(m_mutex);
+			if (!m_abort)
+			{
+				auto new_end = std::remove_if(m_threads.begin(), m_threads.end()
+					, [id, &l](std::thread& t)
+				{
+					if (t.get_id() == id)
+					{
+						t.detach();
+						return true;
+					}
+					return false;
+				});
+				TORRENT_ASSERT(new_end != m_threads.end());
+				m_threads.erase(new_end, m_threads.end());
+				if (m_threads.empty()) m_idle_timer.cancel();
+			}
+		}
+		return to_exit > 0;
+	}
+
+	std::thread::id disk_io_thread_pool::first_thread_id()
+	{
+		std::lock_guard<std::mutex> l(m_mutex);
+		if (m_threads.empty()) return std::thread::id();
+		return m_threads.front().get_id();
+	}
+
+	void disk_io_thread_pool::job_queued(int queue_size)
+	{
+		// this check is not strictly necessary
+		// but do it to avoid acquiring the mutex in the trivial case
+		if (m_num_idle_threads >= queue_size) return;
+		std::lock_guard<std::mutex> l(m_mutex);
+		if (m_abort) return;
+
+		// reduce the number of threads requested to stop if we're going to need
+		// them for these new jobs
+		int to_exit = m_threads_to_exit;
+		while (to_exit > (std::max)(0, m_num_idle_threads - queue_size) &&
+			!m_threads_to_exit.compare_exchange_weak(to_exit
+				, (std::max)(0, m_num_idle_threads - queue_size)));
+
+		// now start threads until we either have enough to service
+		// all queued jobs without blocking or hit the max
+		for (int i = m_num_idle_threads
+			; i < queue_size && int(m_threads.size()) < m_max_threads
+			; ++i)
+		{
+			// if this is the first thread started, start the reaper timer
+			if (m_threads.empty())
+			{
+				m_idle_timer.expires_from_now(reap_idle_threads_interval);
+				m_idle_timer.async_wait([this](error_code const& ec) { reap_idle_threads(ec); });
+			}
+
+			// work keeps the io_service::run() call blocked from returning.
+			// When shutting down, it's possible that the event queue is drained
+			// before the disk_io_thread has posted its last callback. When this
+			// happens, the io_service will have a pending callback from the
+			// disk_io_thread, but the event loop is not running. this means
+			// that the event is destructed after the disk_io_thread. If the
+			// event refers to a disk buffer it will try to free it, but the
+			// buffer pool won't exist anymore, and crash. This prevents that.
+			m_threads.emplace_back(&pool_thread_interface::thread_fun
+				, &m_thread_iface, io_service::work(m_idle_timer.get_io_service()));
+		}
+	}
+
+	void disk_io_thread_pool::reap_idle_threads(error_code const& ec)
+	{
+		// take the minimum number of idle threads during the last
+		// sample period and request that many threads to exit
+		if (ec) return;
+		std::lock_guard<std::mutex> l(m_mutex);
+		if (m_abort) return;
+		if (m_threads.size() == 0) return;
+		m_idle_timer.expires_from_now(reap_idle_threads_interval);
+		m_idle_timer.async_wait([this](error_code const& e) { reap_idle_threads(e); });
+		int const min_idle = m_min_idle_threads.exchange(m_num_idle_threads);
+		if (min_idle <= 0) return;
+		// stop either the minimum number of idle threads or the number of threads
+		// which must be stopped to get below the max, whichever is larger
+		int const to_stop = (std::max)(min_idle, int(m_threads.size()) - m_max_threads);
+		stop_threads(to_stop);
+	}
+
+	void disk_io_thread_pool::stop_threads(int num_to_stop)
+	{
+		m_threads_to_exit = num_to_stop;
+		m_thread_iface.notify_all();
+	}
+
+} // namespace libtorrent
diff --git a/test/test_storage.cpp b/test/test_storage.cpp
index e6f4ea681..b702e25cf 100644
--- a/test/test_storage.cpp
+++ b/test/test_storage.cpp
@@ -477,7 +477,7 @@ void test_check_files(std::string const& test_path
 	ios.reset();
 	run_until(ios, done);

-	io.set_num_threads(0);
+	io.abort(true);
 }

 // TODO: 2 split this test up into smaller parts