scalable thread pool (#770)

Steven Siloti 2016-06-15 17:49:28 -07:00 committed by Arvid Norberg
parent c0fb61c5d7
commit 2fed4103f8
11 changed files with 758 additions and 147 deletions

View File

@ -106,6 +106,7 @@ set(sources
disk_job_pool
disk_buffer_pool
disk_io_thread
disk_io_thread_pool
enum_net
broadcast_socket
magnet_uri

View File

@ -560,6 +560,8 @@ SOURCES =
disk_buffer_holder
disk_buffer_pool
disk_io_job
disk_io_thread
disk_io_thread_pool
disk_job_pool
entry
error_code
@ -627,8 +629,6 @@ SOURCES =
utp_stream
file_pool
lsd
disk_buffer_pool
disk_io_thread
enum_net
broadcast_socket
magnet_uri

View File

@ -38,6 +38,7 @@ nobase_include_HEADERS = \
disk_interface.hpp \
disk_io_job.hpp \
disk_io_thread.hpp \
disk_io_thread_pool.hpp \
disk_observer.hpp \
disk_job_pool.hpp \
ed25519.hpp \

View File

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/allocator.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/sliding_average.hpp"
#include "libtorrent/disk_io_thread_pool.hpp"
#include "libtorrent/disk_io_job.hpp"
#include "libtorrent/disk_job_pool.hpp"
#include "libtorrent/block_cache.hpp"
@ -294,7 +295,7 @@ namespace libtorrent
~disk_io_thread();
void set_settings(settings_pack const* sett, alert_manager& alerts);
void set_num_threads(int i, bool wait = true);
void set_num_threads(int i);
void abort(bool wait);
@ -385,8 +386,7 @@ namespace libtorrent
hasher_thread
};
void thread_fun(int thread_id, thread_type_t type
, boost::shared_ptr<io_service::work> w);
void thread_fun(thread_type_t type, io_service::work w);
virtual file_pool& files() override { return m_file_pool; }
@ -434,6 +434,31 @@ namespace libtorrent
private:
struct job_queue : pool_thread_interface
{
job_queue(disk_io_thread& owner, thread_type_t type)
: m_owner(owner), m_type(type)
{}
virtual void notify_all() override
{
m_job_cond.notify_all();
}
virtual void thread_fun(io_service::work work) override
{ m_owner.thread_fun(m_type, work); }
disk_io_thread& m_owner;
thread_type_t const m_type;
// used to wake up the disk IO thread when there are new
// jobs on the job queue (m_queued_jobs)
std::condition_variable m_job_cond;
// jobs queued for servicing
jobqueue_t m_queued_jobs;
};
enum return_value_t
{
// the do_* functions can return this to indicate the disk
@ -445,6 +470,10 @@ namespace libtorrent
retry_job = -201
};
// returns true if the thread should exit
static bool wait_for_job(job_queue& jobq, disk_io_thread_pool& threads
, std::unique_lock<std::mutex>& l);
void add_completed_job(disk_io_job* j);
void add_completed_jobs(jobqueue_t& jobs);
void add_completed_jobs_impl(jobqueue_t& jobs
@ -515,10 +544,11 @@ namespace libtorrent
void immediate_execute();
void abort_jobs();
// this is a counter which is atomically incremented
// by each thread as it's started up, in order to
// assign a unique id to each thread
std::atomic<int> m_num_threads;
// returns the maximum number of threads
// the actual number of running threads may be lower
int num_threads() const;
job_queue& queue_for_job(disk_io_job* j);
disk_io_thread_pool& pool_for_job(disk_io_job* j);
// set to true once we start shutting down
std::atomic<bool> m_abort;
@ -528,8 +558,16 @@ namespace libtorrent
// shutting down. This last thread is responsible for cleanup
std::atomic<int> m_num_running_threads;
// the actual threads running disk jobs
std::vector<std::thread> m_threads;
// std::mutex to protect the m_generic_io_jobs and m_hash_io_jobs lists
mutable std::mutex m_job_mutex;
// most jobs are posted to m_generic_io_jobs
// but hash jobs are posted to m_hash_io_jobs if m_hash_threads
// has a non-zero maximum thread count
job_queue m_generic_io_jobs;
disk_io_thread_pool m_generic_threads;
job_queue m_hash_io_jobs;
disk_io_thread_pool m_hash_threads;
aux::session_settings m_settings;
@ -579,22 +617,6 @@ namespace libtorrent
// the main thread.
io_service& m_ios;
// used to wake up the disk IO thread when there are new
// jobs on the job queue (m_queued_jobs)
std::condition_variable m_job_cond;
// std::mutex to protect the m_queued_jobs list
mutable std::mutex m_job_mutex;
// jobs queued for servicing
jobqueue_t m_queued_jobs;
// when using more than 2 threads, this is
// used for just hashing jobs, just for threads
// dedicated to do hashing
std::condition_variable m_hash_job_cond;
jobqueue_t m_queued_hash_jobs;
// used to rate limit disk performance warnings
time_point m_last_disk_aio_performance_warning;

View File

@ -0,0 +1,143 @@
/*
Copyright (c) 2005-2016, Arvid Norberg, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_DISK_IO_THREAD_POOL
#define TORRENT_DISK_IO_THREAD_POOL
#include "libtorrent/config.hpp"
#include "libtorrent/export.hpp"
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/io_service_fwd.hpp"
#include "libtorrent/error_code.hpp"
#include <thread>
#include <mutex>
#include <atomic>
#include <functional>
namespace libtorrent
{
struct pool_thread_interface
{
virtual ~pool_thread_interface() {}
virtual void notify_all() = 0;
virtual void thread_fun(io_service::work /* work */) = 0;
};
// this class implements the policy for creating and destroying I/O threads
// threads are created when job_queued is called to signal the arrival of
// new jobs
// once a minute threads are destroyed if at least one thread has been
// idle for the entire minute
// the pool_thread_interface is used to spawn and notify the worker threads
struct TORRENT_EXTRA_EXPORT disk_io_thread_pool
{
disk_io_thread_pool(pool_thread_interface& thread_iface
, io_service& ios);
~disk_io_thread_pool();
// set the maximum number of I/O threads which may be running
// the actual number of threads will be <= this number
void set_max_threads(int const i);
void abort(bool wait);
int max_threads() const { return m_max_threads; }
// thread_idle, thread_active, and job_queued are NOT thread safe
// all calls to them must be serialized
// it is expected that they will be called while holding the
// job queue mutex
// these functions should be called by the thread_fun to signal its state
// threads are considered active when they are started so thread_idle should
// be called first
// these calls are not thread safe
void thread_idle() { ++m_num_idle_threads; }
void thread_active();
// check if there is an outstanding request for I/O threads to stop
// this is a weak check; if it returns true, try_thread_exit may still
// return false
bool should_exit() { return m_threads_to_exit > 0; }
// this should be the last function an I/O thread calls before breaking
// out of its service loop
// if it returns true then the thread MUST exit
// if it returns false the thread should not exit
bool try_thread_exit(std::thread::id id);
// get the thread id of the first thread in the internal vector
// since this is the first thread it will remain the same until the first
// thread exits
// it can be used to trigger maintenance jobs which should only run on one thread
std::thread::id first_thread_id();
int num_threads()
{
std::lock_guard<std::mutex> l(m_mutex);
return int(m_threads.size());
}
// this should be called whenever new jobs are queued
// queue_size is the current size of the job queue
// not thread safe
void job_queued(int queue_size);
private:
void reap_idle_threads(error_code const& ec);
// the caller must hold m_mutex
void stop_threads(int num_to_stop);
pool_thread_interface& m_thread_iface;
std::atomic<int> m_max_threads;
// the number of threads the reaper decided should exit
std::atomic<int> m_threads_to_exit;
// must hold m_mutex to access
bool m_abort;
std::atomic<int> m_num_idle_threads;
// the minimum number of idle threads seen since the last reaping
std::atomic<int> m_min_idle_threads;
// ensures thread creation/destruction is atomic
std::mutex m_mutex;
// the actual threads running disk jobs
std::vector<std::thread> m_threads;
// timer to check for and reap idle threads
deadline_timer m_idle_timer;
};
} // namespace libtorrent
#endif
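
The header above only defines the policy object; the worker loop itself lives in whatever implements pool_thread_interface (disk_io_thread in this commit, test_threads in the simulation test below). The following is a minimal sketch, not part of this commit, of such an implementation draining an illustrative queue of std::function jobs. It follows the calling convention described in the comments above: thread_idle() before blocking, thread_active() after waking, and try_thread_exit() as the last check before leaving the loop. The example_threads name and its job queue are made up for illustration.

#include "libtorrent/disk_io_thread_pool.hpp"
#include "libtorrent/io_service.hpp"

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

struct example_threads : libtorrent::pool_thread_interface
{
	virtual void notify_all() override { m_cond.notify_all(); }

	virtual void thread_fun(libtorrent::io_service::work w) override
	{
		// w keeps the io_service::run() call from returning while this
		// thread is still alive
		std::unique_lock<std::mutex> l(m_mutex);
		for (;;)
		{
			if (m_jobs.empty())
			{
				// threads start out active; go idle before blocking
				m_pool->thread_idle();
				while (m_jobs.empty() && !m_pool->should_exit())
					m_cond.wait(l);
				m_pool->thread_active();
			}

			// try_thread_exit must be the last check before leaving the loop
			if (m_pool->should_exit()
				&& m_pool->try_thread_exit(std::this_thread::get_id()))
				return;

			if (m_jobs.empty()) continue;
			std::function<void()> job = std::move(m_jobs.front());
			m_jobs.pop_front();
			l.unlock();
			job();
			l.lock();
		}
	}

	libtorrent::disk_io_thread_pool* m_pool = nullptr;
	std::mutex m_mutex;
	std::condition_variable m_cond;
	std::deque<std::function<void()>> m_jobs;
};

A producer would push a job while holding m_mutex, call m_cond.notify_all() to wake existing threads, then call pool.job_queued(int(m_jobs.size())) so the pool can spawn new ones if they are all busy, mirroring submit_jobs() in disk_io_thread.cpp. Unlike disk_io_thread::wait_for_job(), this sketch does not bother draining the remaining jobs before the last thread exits.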

View File

@ -41,6 +41,7 @@ alias libtorrent-sims :
[ run test_metadata_extension.cpp ]
[ run test_trackers_extension.cpp ]
[ run test_tracker.cpp ]
[ run test_thread_pool.cpp ]
[ run test_ip_filter.cpp ]
[ run test_dht_rate_limit.cpp ]
[ run test_fast_extensions.cpp ]

View File

@ -0,0 +1,215 @@
/*
Copyright (c) 2016, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.hpp"
#include "simulator/simulator.hpp"
#include "libtorrent/disk_io_thread_pool.hpp"
#include <condition_variable>
namespace lt = libtorrent;
struct test_threads : lt::pool_thread_interface
{
test_threads()
: m_active_threads(0)
, m_target_active_threads(0)
{}
virtual void notify_all() override { m_cond.notify_all(); }
virtual void thread_fun(lt::io_service::work w) override
{
std::unique_lock<std::mutex> l(m_mutex);
for (;;)
{
m_pool->thread_idle();
while (!m_pool->should_exit() && m_active_threads >= m_target_active_threads)
m_cond.wait(l);
m_pool->thread_active();
if (m_pool->try_thread_exit(std::this_thread::get_id()))
break;
if (m_active_threads < m_target_active_threads)
{
++m_active_threads;
while (!m_pool->should_exit() && m_active_threads <= m_target_active_threads)
m_cond.wait(l);
--m_active_threads;
}
if (m_pool->try_thread_exit(std::this_thread::get_id()))
break;
}
l.unlock();
m_exit_cond.notify_all();
}
// change the number of active threads and wait for the threads
// to settle at the new value
void set_active_threads(int target)
{
std::unique_lock<std::mutex> l(m_mutex);
assert(target <= m_pool->num_threads());
m_target_active_threads = target;
while (m_active_threads != m_target_active_threads)
{
l.unlock();
m_cond.notify_all();
std::this_thread::yield();
l.lock();
}
}
// this is to close a race between a thread exiting and a test checking the
// thread count
void wait_for_thread_exit(int num_threads)
{
std::unique_lock<std::mutex> l(m_mutex);
m_exit_cond.wait_for(l, std::chrono::seconds(30), [&]()
{
return m_pool->num_threads() == num_threads;
});
}
lt::disk_io_thread_pool* m_pool;
std::mutex m_mutex;
std::condition_variable m_cond;
std::condition_variable m_exit_cond;
// must hold m_mutex to access
int m_active_threads;
// must hold m_mutex to access
int m_target_active_threads;
};
TORRENT_TEST(disk_io_thread_pool_idle_reaping)
{
sim::default_config cfg;
sim::simulation sim{ cfg };
test_threads threads;
sim::asio::io_service ios(sim);
lt::disk_io_thread_pool pool(threads, ios);
threads.m_pool = &pool;
pool.set_max_threads(3);
pool.job_queued(3);
TEST_EQUAL(pool.num_threads(), 3);
// make sure all the threads are up and settled in the active state
threads.set_active_threads(3);
// first just kill one thread
threads.set_active_threads(2);
lt::deadline_timer idle_delay(ios);
// the thread will be killed the second time the reaper runs and we need
// to wait one extra minute to make sure the check runs after the reaper
idle_delay.expires_from_now(std::chrono::minutes(3));
idle_delay.async_wait([&](lt::error_code ec)
{
// this is a kludge to work around a race between the thread
// exiting and checking the number of threads
// in production we only check num_threads from the disk I/O threads
// so there are no race problems there
threads.wait_for_thread_exit(2);
TEST_EQUAL(pool.num_threads(), 2);
sim.stop();
});
sim.run();
sim.reset();
// now kill the rest
threads.set_active_threads(0);
idle_delay.expires_from_now(std::chrono::minutes(3));
idle_delay.async_wait([&](lt::error_code ec)
{
// see comment above about this kludge
threads.wait_for_thread_exit(0);
TEST_EQUAL(pool.num_threads(), 0);
});
sim.run();
}
TORRENT_TEST(disk_io_thread_pool_abort_wait)
{
sim::default_config cfg;
sim::simulation sim{ cfg };
test_threads threads;
sim::asio::io_service ios(sim);
lt::disk_io_thread_pool pool(threads, ios);
threads.m_pool = &pool;
pool.set_max_threads(3);
pool.job_queued(3);
TEST_EQUAL(pool.num_threads(), 3);
pool.abort(true);
TEST_EQUAL(pool.num_threads(), 0);
}
#if 0
// disabled for now because io_service::work doesn't work under the simulator
// and we need it to stop this test from exiting prematurely
TORRENT_TEST(disk_io_thread_pool_abort_no_wait)
{
sim::default_config cfg;
sim::simulation sim{ cfg };
test_threads threads;
sim::asio::io_service ios(sim);
lt::disk_io_thread_pool pool(threads, ios);
threads.m_pool = &pool;
pool.set_max_threads(3);
pool.job_queued(3);
TEST_EQUAL(pool.num_threads(), 3);
pool.abort(false);
TEST_EQUAL(pool.num_threads(), 0);
sim.run();
}
#endif
TORRENT_TEST(disk_io_thread_pool_max_threads)
{
sim::default_config cfg;
sim::simulation sim{ cfg };
test_threads threads;
sim::asio::io_service ios(sim);
lt::disk_io_thread_pool pool(threads, ios);
threads.m_pool = &pool;
// first check that the thread limit is respected when adding jobs
pool.set_max_threads(3);
pool.job_queued(4);
TEST_EQUAL(pool.num_threads(), 3);
// now check that the number of threads is reduced when the max threads is reduced
pool.set_max_threads(2);
// see comment above about this kludge
threads.wait_for_thread_exit(2);
TEST_EQUAL(pool.num_threads(), 2);
}

View File

@ -59,6 +59,7 @@ libtorrent_rasterbar_la_SOURCES = \
disk_buffer_pool.cpp \
disk_io_job.cpp \
disk_io_thread.cpp \
disk_io_thread_pool.cpp \
disk_job_pool.cpp \
entry.cpp \
enum_net.cpp \

View File

@ -64,6 +64,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define DEBUG_DISK_THREAD 0
#if DEBUG_DISK_THREAD
#include <cstdarg>
#define DLOG(...) debug_log(__VA_ARGS__)
#else
#define DLOG(...) do {} while(false)
@ -141,9 +142,12 @@ namespace libtorrent
, counters& cnt
, void* userdata
, int block_size)
: m_num_threads(0)
, m_abort(false)
: m_abort(false)
, m_num_running_threads(0)
, m_generic_io_jobs(*this, generic_thread)
, m_generic_threads(m_generic_io_jobs, ios)
, m_hash_io_jobs(*this, hasher_thread)
, m_hash_threads(m_hash_io_jobs, ios)
, m_userdata(userdata)
, m_last_cache_expiry(min_time())
, m_last_file_check(clock_type::now())
@ -193,63 +197,26 @@ namespace libtorrent
void disk_io_thread::abort(bool wait)
{
m_abort = true;
if (m_num_threads == 0)
if (num_threads() == 0)
{
abort_jobs();
}
else
{
set_num_threads(0, wait);
m_generic_threads.abort(wait);
m_hash_threads.abort(wait);
}
}
// TODO: 1 it would be nice to have the number of threads be set dynamically
void disk_io_thread::set_num_threads(int const i, bool const wait)
void disk_io_thread::set_num_threads(int const i)
{
TORRENT_ASSERT(m_magic == 0x1337);
if (i == m_num_threads) return;
if (i > m_num_threads)
{
while (m_num_threads < i)
{
int thread_id = (++m_num_threads) - 1;
thread_type_t type = generic_thread;
// this keeps the io_service::run() call blocked from returning.
// When shutting down, it's possible that the event queue is drained
// before the disk_io_thread has posted its last callback. When this
// happens, the io_service will have a pending callback from the
// disk_io_thread, but the event loop is not running. this means
// that the event is destructed after the disk_io_thread. If the
// event refers to a disk buffer it will try to free it, but the
// buffer pool won't exist anymore, and crash. This prevents that.
boost::shared_ptr<io_service::work> work =
boost::make_shared<io_service::work>(boost::ref(m_ios));
// the magic number 3 is also used in add_job()
// every 4:th thread is a hasher thread
if ((thread_id & 0x3) == 3) type = hasher_thread;
m_threads.emplace_back(&disk_io_thread::thread_fun, this
, thread_id, type, work);
}
}
else
{
while (m_num_threads > i) { --m_num_threads; }
std::unique_lock<std::mutex> l(m_job_mutex);
m_job_cond.notify_all();
m_hash_job_cond.notify_all();
l.unlock();
for (int j = m_num_threads; j < m_threads.size(); ++j)
{
if (wait)
m_threads[j].join();
else
m_threads[j].detach();
}
m_threads.resize(m_num_threads);
}
// add one hasher thread for every three generic threads
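// (e.g. set_num_threads(8) gives 6 generic threads and 2 hasher threads)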
int const num_hash_threads = i / 4;
m_generic_threads.set_max_threads(i - num_hash_threads);
m_hash_threads.set_max_threads(num_hash_threads);
}
void disk_io_thread::reclaim_block(block_cache_reference ref)
@ -1140,6 +1107,8 @@ namespace libtorrent
if (ret == retry_job)
{
job_queue& q = queue_for_job(j);
std::unique_lock<std::mutex> l2(m_job_mutex);
// to avoid busy looping here, give up
// our quanta in case there aren't any other
@ -1151,8 +1120,8 @@ namespace libtorrent
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
bool need_sleep = m_queued_jobs.empty();
m_queued_jobs.push_back(j);
bool const need_sleep = q.m_queued_jobs.empty();
q.m_queued_jobs.push_back(j);
l2.unlock();
if (need_sleep) std::this_thread::yield();
return;
@ -1818,7 +1787,7 @@ namespace libtorrent
// TODO: maybe the tailqueue_iterator<disk_io_job> should contain a pointer-pointer
// instead and have an unlink function
disk_io_job* qj = m_queued_jobs.get_all();
disk_io_job* qj = m_generic_io_jobs.m_queued_jobs.get_all();
jobqueue_t to_abort;
while (qj)
@ -1830,7 +1799,7 @@ namespace libtorrent
if (qj->storage.get() == storage)
to_abort.push_back(qj);
else
m_queued_jobs.push_back(qj);
m_generic_io_jobs.m_queued_jobs.push_back(qj);
qj = next;
}
l2.unlock();
@ -1899,7 +1868,7 @@ namespace libtorrent
// remove outstanding hash jobs belonging to this torrent
std::unique_lock<std::mutex> l2(m_job_mutex);
disk_io_job* qj = m_queued_hash_jobs.get_all();
disk_io_job* qj = m_hash_io_jobs.m_queued_jobs.get_all();
jobqueue_t to_abort;
while (qj)
@ -1911,7 +1880,7 @@ namespace libtorrent
if (qj->storage.get() == storage)
to_abort.push_back(qj);
else
m_queued_hash_jobs.push_back(qj);
m_hash_io_jobs.m_queued_jobs.push_back(qj);
qj = next;
}
l2.unlock();
@ -2720,8 +2689,8 @@ namespace libtorrent
c.set_value(counters::num_read_jobs, read_jobs_in_use());
c.set_value(counters::num_write_jobs, write_jobs_in_use());
c.set_value(counters::num_jobs, jobs_in_use());
c.set_value(counters::queued_disk_jobs, m_queued_jobs.size()
+ m_queued_hash_jobs.size());
c.set_value(counters::queued_disk_jobs, m_generic_io_jobs.m_queued_jobs.size()
+ m_hash_io_jobs.m_queued_jobs.size());
jl.unlock();
@ -2824,7 +2793,7 @@ namespace libtorrent
#ifndef TORRENT_NO_DEPRECATE
std::unique_lock<std::mutex> jl(m_job_mutex);
ret->queued_jobs = m_queued_jobs.size() + m_queued_hash_jobs.size();
ret->queued_jobs = m_generic_io_jobs.m_queued_jobs.size() + m_hash_io_jobs.m_queued_jobs.size();
jl.unlock();
#endif
}
@ -3015,13 +2984,13 @@ namespace libtorrent
std::unique_lock<std::mutex> l(m_job_mutex);
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
// prioritize fence jobs since they're blocking other jobs
m_queued_jobs.push_front(j);
m_generic_io_jobs.m_queued_jobs.push_front(j);
l.unlock();
// discard the flush job
free_job(fj);
if (m_num_threads == 0 && user_add)
if (num_threads() == 0 && user_add)
immediate_execute();
return;
@ -3041,7 +3010,7 @@ namespace libtorrent
std::unique_lock<std::mutex> l(m_job_mutex);
TORRENT_ASSERT((fj->flags & disk_io_job::in_progress) || !fj->storage);
m_queued_jobs.push_front(fj);
m_generic_io_jobs.m_queued_jobs.push_front(fj);
}
else
{
@ -3049,7 +3018,7 @@ namespace libtorrent
TORRENT_ASSERT(fj->blocked);
}
if (m_num_threads == 0 && user_add)
if (num_threads() == 0 && user_add)
immediate_execute();
}
@ -3072,13 +3041,13 @@ namespace libtorrent
{
std::unique_lock<std::mutex> l(m_job_mutex);
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
m_queued_jobs.push_back(j);
m_generic_io_jobs.m_queued_jobs.push_back(j);
// if we literally have 0 disk threads, we have to execute the jobs
// immediately. If add job is called internally by the disk_io_thread,
// we need to defer executing it. We only want the top level to loop
// over the job queue (as is done below)
if (m_num_threads == 0 && user_add)
if (num_threads() == 0 && user_add)
{
l.unlock();
immediate_execute();
@ -3108,33 +3077,24 @@ namespace libtorrent
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
// if there are at least 3 threads, there's a hasher thread
// and the hash jobs go into a separate queue
// see set_num_threads()
if (m_num_threads > 3 && j->action == disk_io_job::hash)
job_queue& q = queue_for_job(j);
q.m_queued_jobs.push_back(j);
// if we literally have 0 disk threads, we have to execute the jobs
// immediately. If add job is called internally by the disk_io_thread,
// we need to defer executing it. We only want the top level to loop
// over the job queue (as is done below)
if (pool_for_job(j).max_threads() == 0 && user_add)
{
m_queued_hash_jobs.push_back(j);
}
else
{
m_queued_jobs.push_back(j);
// if we literally have 0 disk threads, we have to execute the jobs
// immediately. If add job is called internally by the disk_io_thread,
// we need to defer executing it. We only want the top level to loop
// over the job queue (as is done below)
if (m_num_threads == 0 && user_add)
{
l.unlock();
immediate_execute();
}
l.unlock();
immediate_execute();
}
}
void disk_io_thread::immediate_execute()
{
while (!m_queued_jobs.empty())
while (!m_generic_io_jobs.m_queued_jobs.empty())
{
disk_io_job* j = m_queued_jobs.pop_front();
disk_io_job* j = m_generic_io_jobs.m_queued_jobs.pop_front();
maybe_flush_write_blocks();
execute_job(j);
}
@ -3143,10 +3103,16 @@ namespace libtorrent
void disk_io_thread::submit_jobs()
{
std::unique_lock<std::mutex> l(m_job_mutex);
if (!m_queued_jobs.empty())
m_job_cond.notify_all();
if (!m_queued_hash_jobs.empty())
m_hash_job_cond.notify_all();
if (!m_generic_io_jobs.m_queued_jobs.empty())
{
m_generic_io_jobs.m_job_cond.notify_all();
m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
}
if (!m_hash_io_jobs.m_queued_jobs.empty())
{
m_hash_io_jobs.m_job_cond.notify_all();
m_hash_threads.job_queued(m_hash_io_jobs.m_queued_jobs.size());
}
}
void disk_io_thread::maybe_flush_write_blocks()
@ -3157,7 +3123,7 @@ namespace libtorrent
std::unique_lock<std::mutex> l(m_cache_mutex);
DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n"
, int(m_stats_counters[counters::blocked_disk_jobs])
, m_queued_jobs.size(), int(m_num_threads));
, m_generic_io_jobs.m_queued_jobs.size(), int(num_threads()));
m_last_cache_expiry = now;
jobqueue_t completed_jobs;
flush_expired_write_blocks(completed_jobs, l);
@ -3174,10 +3140,56 @@ namespace libtorrent
add_completed_jobs(completed_jobs);
}
void disk_io_thread::thread_fun(int thread_id, thread_type_t type
, boost::shared_ptr<io_service::work> w)
bool disk_io_thread::wait_for_job(job_queue& jobq, disk_io_thread_pool& threads
, std::unique_lock<std::mutex>& l)
{
DLOG("started disk thread %d\n", int(thread_id));
TORRENT_ASSERT(l.owns_lock());
// the thread should only go active if it is exiting or there is work to do
// if the thread goes active on every wakeup it causes the minimum idle thread
// count to be lower than it should be
// for performance reasons we also want to avoid going idle and active again
// if there is already work to do
if (jobq.m_queued_jobs.empty())
{
threads.thread_idle();
do
{
// if the number of wanted threads is decreased,
// we may stop this thread
// when we're terminating the last thread, make sure
// we finish up all queued jobs first
if (threads.should_exit()
&& (jobq.m_queued_jobs.empty()
|| threads.num_threads() > 1)
// try_thread_exit must be the last condition
&& threads.try_thread_exit(std::this_thread::get_id()))
{
// time to exit this thread.
threads.thread_active();
return true;
}
jobq.m_job_cond.wait(l);
} while (jobq.m_queued_jobs.empty());
threads.thread_active();
}
return false;
}
void disk_io_thread::thread_fun(thread_type_t type
, io_service::work w)
{
std::thread::id thread_id = std::this_thread::get_id();
#if DEBUG_DISK_THREAD
std::stringstream thread_id_str;
thread_id_str << thread_id;
#endif
DLOG("started disk thread %s\n", thread_id_str.str().c_str());
++m_num_running_threads;
m_stats_counters.inc_stats_counter(counters::num_running_threads, 1);
@ -3188,34 +3200,22 @@ namespace libtorrent
disk_io_job* j = 0;
if (type == generic_thread)
{
TORRENT_ASSERT(l.owns_lock());
while (m_queued_jobs.empty() && thread_id < m_num_threads) m_job_cond.wait(l);
// if the number of wanted threads is decreased,
// we may stop this thread
// when we're terminating the last thread (id=0), make sure
// we finish up all queued jobs first
if (thread_id >= m_num_threads && !(thread_id == 0 && m_queued_jobs.size() > 0))
{
// time to exit this thread.
break;
}
j = m_queued_jobs.pop_front();
bool const should_exit = wait_for_job(m_generic_io_jobs, m_generic_threads, l);
if (should_exit) break;
j = m_generic_io_jobs.m_queued_jobs.pop_front();
}
else if (type == hasher_thread)
{
TORRENT_ASSERT(l.owns_lock());
while (m_queued_hash_jobs.empty() && thread_id < m_num_threads) m_hash_job_cond.wait(l);
if (m_queued_hash_jobs.empty() && thread_id >= m_num_threads) break;
j = m_queued_hash_jobs.pop_front();
bool const should_exit = wait_for_job(m_hash_io_jobs, m_hash_threads, l);
if (should_exit) break;
j = m_hash_io_jobs.m_queued_jobs.pop_front();
}
l.unlock();
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
if (thread_id == 0)
if (thread_id == m_generic_threads.first_thread_id() && type == generic_thread)
{
// there's no need for all threads to be doing this
maybe_flush_write_blocks();
@ -3234,8 +3234,8 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_running_threads, -1);
if (--m_num_running_threads > 0 || !m_abort)
{
DLOG("exiting disk thread %d. num_threads: %d aborting: %d\n"
, thread_id, int(m_num_threads), int(m_abort));
DLOG("exiting disk thread %s. num_threads: %d aborting: %d\n"
, thread_id_str.str().c_str(), int(num_threads()), int(m_abort));
TORRENT_ASSERT(m_magic == 0x1337);
return;
}
@ -3260,18 +3260,18 @@ namespace libtorrent
}
l2.unlock();
DLOG("disk thread %d is the last one alive. cleaning up\n", thread_id);
DLOG("disk thread %s is the last one alive. cleaning up\n", thread_id_str.str().c_str());
abort_jobs();
TORRENT_ASSERT(m_magic == 0x1337);
// release the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
COMPLETE_ASYNC("disk_io_thread::work");
w.reset();
// at this point, the disk_io_thread object may have been destructed.
// the call to w.reset() above is what synchronizes with the main thread
// w's dtor releases the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
// after the dtor has been called, the disk_io_thread object may be destructed
TORRENT_UNUSED(w);
}
void disk_io_thread::abort_jobs()
@ -3299,6 +3299,27 @@ namespace libtorrent
TORRENT_ASSERT(m_magic == 0x1337);
}
int disk_io_thread::num_threads() const
{
return m_generic_threads.max_threads() + m_hash_threads.max_threads();
}
disk_io_thread::job_queue& disk_io_thread::queue_for_job(disk_io_job* j)
{
if (m_hash_threads.max_threads() > 0 && j->action == disk_io_job::hash)
return m_hash_io_jobs;
else
return m_generic_io_jobs;
}
disk_io_thread_pool& disk_io_thread::pool_for_job(disk_io_job* j)
{
if (m_hash_threads.max_threads() > 0 && j->action == disk_io_job::hash)
return m_hash_threads;
else
return m_generic_threads;
}
// this is a callback called by the block_cache when
// it's exceeding the disk cache size.
void disk_io_thread::trigger_cache_trim()
@ -3469,7 +3490,7 @@ namespace libtorrent
l_.unlock();
std::unique_lock<std::mutex> l(m_job_mutex);
m_queued_jobs.append(other_jobs);
m_generic_io_jobs.m_queued_jobs.append(other_jobs);
l.unlock();
while (flush_jobs.size() > 0)
@ -3478,7 +3499,10 @@ namespace libtorrent
add_job(j, false);
}
m_job_cond.notify_all();
l.lock();
m_generic_io_jobs.m_job_cond.notify_all();
m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
l.unlock();
}
std::unique_lock<std::mutex> l(m_completed_jobs_mutex);

src/disk_io_thread_pool.cpp (new file, 203 lines)
View File

@ -0,0 +1,203 @@
/*
Copyright (c) 2005-2016, Arvid Norberg, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/disk_io_thread_pool.hpp"
#include "libtorrent/disk_io_thread.hpp"
#include <functional>
namespace
{
constexpr std::chrono::seconds reap_idle_threads_interval(60);
}
namespace libtorrent
{
disk_io_thread_pool::disk_io_thread_pool(pool_thread_interface& thread_iface
, io_service& ios)
: m_thread_iface(thread_iface)
, m_max_threads(0)
, m_threads_to_exit(0)
, m_abort(false)
, m_num_idle_threads(0)
, m_min_idle_threads(0)
, m_idle_timer(ios)
{}
disk_io_thread_pool::~disk_io_thread_pool()
{
abort(true);
}
void disk_io_thread_pool::set_max_threads(int const i)
{
std::lock_guard<std::mutex> l(m_mutex);
if (i == m_max_threads) return;
m_max_threads = i;
if (m_threads.size() < i) return;
stop_threads(int(m_threads.size()) - i);
}
void disk_io_thread_pool::abort(bool wait)
{
std::unique_lock<std::mutex> l(m_mutex);
if (m_abort) return;
m_max_threads = 0;
m_abort = true;
m_idle_timer.cancel();
stop_threads(int(m_threads.size()));
for (auto& t : m_threads)
{
if (wait)
{
// must release m_mutex to avoid a deadlock if the thread
// tries to acquire it
l.unlock();
t.join();
l.lock();
}
else
t.detach();
}
m_threads.clear();
}
void disk_io_thread_pool::thread_active()
{
--m_num_idle_threads;
TORRENT_ASSERT(m_num_idle_threads >= 0);
int current_min = m_min_idle_threads;
while (m_num_idle_threads < current_min
&& !m_min_idle_threads.compare_exchange_weak(current_min, m_num_idle_threads));
}
bool disk_io_thread_pool::try_thread_exit(std::thread::id id)
{
int to_exit = m_threads_to_exit;
while (to_exit > 0 &&
!m_threads_to_exit.compare_exchange_weak(to_exit, to_exit - 1));
if (to_exit > 0)
{
std::unique_lock<std::mutex> l(m_mutex);
if (!m_abort)
{
auto new_end = std::remove_if(m_threads.begin(), m_threads.end()
, [id, &l](std::thread& t)
{
if (t.get_id() == id)
{
t.detach();
return true;
}
return false;
});
TORRENT_ASSERT(new_end != m_threads.end());
m_threads.erase(new_end, m_threads.end());
if (m_threads.empty()) m_idle_timer.cancel();
}
}
return to_exit > 0;
}
std::thread::id disk_io_thread_pool::first_thread_id()
{
std::lock_guard<std::mutex> l(m_mutex);
if (m_threads.empty()) return std::thread::id();
return m_threads.front().get_id();
}
void disk_io_thread_pool::job_queued(int queue_size)
{
// this check is not strictly necessary
// but do it to avoid acquiring the mutex in the trivial case
if (m_num_idle_threads >= queue_size) return;
std::lock_guard<std::mutex> l(m_mutex);
if (m_abort) return;
// reduce the number of threads requested to stop if we're going to need
// them for these new jobs
int to_exit = m_threads_to_exit;
while (to_exit > (std::max)(0, m_num_idle_threads - queue_size) &&
!m_threads_to_exit.compare_exchange_weak(to_exit
, (std::max)(0, m_num_idle_threads - queue_size)));
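// e.g. with 1 idle thread, 4 queued jobs and 2 pending exit requests the
// target becomes (std::max)(0, 1 - 4) == 0, cancelling both exit requests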
// now start threads until we either have enough to service
// all queued jobs without blocking or hit the max
for (int i = m_num_idle_threads
; i < queue_size && int(m_threads.size()) < m_max_threads
; ++i)
{
// if this is the first thread started, start the reaper timer
if (m_threads.empty())
{
m_idle_timer.expires_from_now(reap_idle_threads_interval);
m_idle_timer.async_wait([this](error_code const& ec) { reap_idle_threads(ec); });
}
// work keeps the io_service::run() call blocked from returning.
// When shutting down, it's possible that the event queue is drained
// before the disk_io_thread has posted its last callback. When this
// happens, the io_service will have a pending callback from the
// disk_io_thread, but the event loop is not running. this means
// that the event is destructed after the disk_io_thread. If the
// event refers to a disk buffer it will try to free it, but the
// buffer pool won't exist anymore, and crash. This prevents that.
m_threads.emplace_back(&pool_thread_interface::thread_fun
, &m_thread_iface, io_service::work(m_idle_timer.get_io_service()));
}
}
void disk_io_thread_pool::reap_idle_threads(error_code const& ec)
{
// take the minimum number of idle threads during the last
// sample period and request that many threads to exit
if (ec) return;
std::lock_guard<std::mutex> l(m_mutex);
if (m_abort) return;
if (m_threads.size() == 0) return;
m_idle_timer.expires_from_now(reap_idle_threads_interval);
m_idle_timer.async_wait([this](error_code const& e) { reap_idle_threads(e); });
int const min_idle = m_min_idle_threads.exchange(m_num_idle_threads);
if (min_idle <= 0) return;
// stop either the minimum number of idle threads or the number of threads
// which must be stopped to get below the max, whichever is larger
int const to_stop = (std::max)(min_idle, int(m_threads.size()) - m_max_threads);
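// e.g. 5 running threads, max_threads lowered to 3 and at least 1 thread
// idle for the whole minute: to_stop = (std::max)(1, 5 - 3) == 2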
stop_threads(to_stop);
}
void disk_io_thread_pool::stop_threads(int num_to_stop)
{
m_threads_to_exit = num_to_stop;
m_thread_iface.notify_all();
}
} // namespace libtorrent
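
thread_active(), try_thread_exit() and job_queued() all use the same lock-free idiom: loop on compare_exchange_weak() until the atomic either takes the desired value or the condition for changing it no longer holds. Below is a standalone sketch of that idiom, not part of this commit, with a hypothetical update_minimum() helper doing what thread_active() does to m_min_idle_threads.

#include <atomic>

// atomically lower `minimum` to `candidate` if candidate is smaller,
// without ever raising it
inline void update_minimum(std::atomic<int>& minimum, int candidate)
{
	int current = minimum.load();
	while (candidate < current
		&& !minimum.compare_exchange_weak(current, candidate))
	{
		// on failure compare_exchange_weak reloads `current`, so the loop
		// re-tests against the latest value until either the store succeeds
		// or the stored minimum is already <= candidate
	}
}

In thread_active() the candidate is the freshly decremented m_num_idle_threads; once a minute reap_idle_threads() exchanges the tracked minimum back to the current idle count and asks that many threads to exit.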

View File

@ -477,7 +477,7 @@ void test_check_files(std::string const& test_path
ios.reset();
run_until(ios, done);
io.set_num_threads(0);
io.abort(true);
}
// TODO: 2 split this test up into smaller parts