simplify disk thread a little bit (#1504)

simplify disk thread a little bit
This commit is contained in:
Arvid Norberg 2017-01-08 19:22:59 -05:00 committed by GitHub
parent 224db08997
commit 1cf3689578
5 changed files with 27 additions and 41 deletions

View File

@ -351,14 +351,6 @@ namespace libtorrent
{ return m_disk_cache.is_disk_buffer(buffer); } { return m_disk_cache.is_disk_buffer(buffer); }
#endif #endif
enum class thread_type_t : std::uint8_t
{
generic,
hasher
};
void thread_fun(thread_type_t type, io_service::work w);
virtual file_pool& files() override { return m_file_pool; } virtual file_pool& files() override { return m_file_pool; }
int prep_read_job_impl(disk_io_job* j, bool check_fence = true); int prep_read_job_impl(disk_io_job* j, bool check_fence = true);
@ -395,20 +387,24 @@ namespace libtorrent
struct job_queue : pool_thread_interface struct job_queue : pool_thread_interface
{ {
job_queue(disk_io_thread& owner, thread_type_t type) explicit job_queue(disk_io_thread& owner) : m_owner(owner) {}
: m_owner(owner), m_type(type)
{}
virtual void notify_all() override virtual void notify_all() override
{ {
m_job_cond.notify_all(); m_job_cond.notify_all();
} }
virtual void thread_fun(io_service::work work) override void thread_fun(disk_io_thread_pool& pool, io_service::work work) override
{ m_owner.thread_fun(m_type, work); } {
m_owner.thread_fun(*this, pool);
// w's dtor releases the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
// after the dtor has been called, the disk_io_thread object may be destructed
TORRENT_UNUSED(work);
}
disk_io_thread& m_owner; disk_io_thread& m_owner;
thread_type_t const m_type;
// used to wake up the disk IO thread when there are new // used to wake up the disk IO thread when there are new
// jobs on the job queue (m_queued_jobs) // jobs on the job queue (m_queued_jobs)
@ -418,6 +414,8 @@ namespace libtorrent
jobqueue_t m_queued_jobs; jobqueue_t m_queued_jobs;
}; };
void thread_fun(job_queue& queue, disk_io_thread_pool& pool);
// returns true if the thread should exit // returns true if the thread should exit
static bool wait_for_job(job_queue& jobq, disk_io_thread_pool& threads static bool wait_for_job(job_queue& jobq, disk_io_thread_pool& threads
, std::unique_lock<std::mutex>& l); , std::unique_lock<std::mutex>& l);

View File

@ -45,12 +45,14 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent namespace libtorrent
{ {
struct disk_io_thread_pool;
struct pool_thread_interface struct pool_thread_interface
{ {
virtual ~pool_thread_interface() {} virtual ~pool_thread_interface() {}
virtual void notify_all() = 0; virtual void notify_all() = 0;
virtual void thread_fun(io_service::work /* work */) = 0; virtual void thread_fun(disk_io_thread_pool&, io_service::work) = 0;
}; };
// this class implements the policy for creating and destroying I/O threads // this class implements the policy for creating and destroying I/O threads

View File

@ -42,7 +42,7 @@ struct test_threads : lt::pool_thread_interface
test_threads() {} test_threads() {}
void notify_all() override { m_cond.notify_all(); } void notify_all() override { m_cond.notify_all(); }
void thread_fun(lt::io_service::work w) override void thread_fun(lt::disk_io_thread_pool&, lt::io_service::work) override
{ {
std::unique_lock<std::mutex> l(m_mutex); std::unique_lock<std::mutex> l(m_mutex);
for (;;) for (;;)

View File

@ -191,9 +191,9 @@ namespace libtorrent
disk_io_thread::disk_io_thread(io_service& ios disk_io_thread::disk_io_thread(io_service& ios
, counters& cnt , counters& cnt
, int const block_size) , int const block_size)
: m_generic_io_jobs(*this, thread_type_t::generic) : m_generic_io_jobs(*this)
, m_generic_threads(m_generic_io_jobs, ios) , m_generic_threads(m_generic_io_jobs, ios)
, m_hash_io_jobs(*this, thread_type_t::hasher) , m_hash_io_jobs(*this)
, m_hash_threads(m_hash_io_jobs, ios) , m_hash_threads(m_hash_io_jobs, ios)
, m_disk_cache(block_size, ios, std::bind(&disk_io_thread::trigger_cache_trim, this)) , m_disk_cache(block_size, ios, std::bind(&disk_io_thread::trigger_cache_trim, this))
, m_stats_counters(cnt) , m_stats_counters(cnt)
@ -3045,10 +3045,10 @@ namespace libtorrent
return false; return false;
} }
void disk_io_thread::thread_fun(thread_type_t type void disk_io_thread::thread_fun(job_queue& queue
, io_service::work w) , disk_io_thread_pool& pool)
{ {
std::thread::id thread_id = std::this_thread::get_id(); std::thread::id const thread_id = std::this_thread::get_id();
#if DEBUG_DISK_THREAD #if DEBUG_DISK_THREAD
std::stringstream thread_id_str; std::stringstream thread_id_str;
thread_id_str << thread_id; thread_id_str << thread_id;
@ -3065,24 +3065,14 @@ namespace libtorrent
for (;;) for (;;)
{ {
disk_io_job* j = nullptr; disk_io_job* j = nullptr;
if (type == thread_type_t::generic) bool const should_exit = wait_for_job(queue, pool, l);
{
bool const should_exit = wait_for_job(m_generic_io_jobs, m_generic_threads, l);
if (should_exit) break; if (should_exit) break;
j = m_generic_io_jobs.m_queued_jobs.pop_front(); j = queue.m_queued_jobs.pop_front();
}
else if (type == thread_type_t::hasher)
{
bool const should_exit = wait_for_job(m_hash_io_jobs, m_hash_threads, l);
if (should_exit) break;
j = m_hash_io_jobs.m_queued_jobs.pop_front();
}
l.unlock(); l.unlock();
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage); TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
if (thread_id == m_generic_threads.first_thread_id() && type == thread_type_t::generic) if (&pool == &m_generic_threads && thread_id == pool.first_thread_id())
{ {
// there's no need for all threads to be doing this // there's no need for all threads to be doing this
maybe_flush_write_blocks(); maybe_flush_write_blocks();
@ -3147,11 +3137,6 @@ namespace libtorrent
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
COMPLETE_ASYNC("disk_io_thread::work"); COMPLETE_ASYNC("disk_io_thread::work");
// w's dtor releases the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
// after the dtor has been called, the disk_io_thread object may be destructed
TORRENT_UNUSED(w);
} }
void disk_io_thread::abort_jobs() void disk_io_thread::abort_jobs()

View File

@ -173,7 +173,8 @@ namespace libtorrent
// event refers to a disk buffer it will try to free it, but the // event refers to a disk buffer it will try to free it, but the
// buffer pool won't exist anymore, and crash. This prevents that. // buffer pool won't exist anymore, and crash. This prevents that.
m_threads.emplace_back(&pool_thread_interface::thread_fun m_threads.emplace_back(&pool_thread_interface::thread_fun
, &m_thread_iface, io_service::work(m_idle_timer.get_io_service())); , &m_thread_iface, std::ref(*this)
, io_service::work(m_idle_timer.get_io_service()));
} }
} }