From f1585d8317ec21b55e16db0e0861499c8820355e Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sat, 18 Jun 2016 13:53:23 -0700 Subject: [PATCH] Fix abort_jobs being called multiple times (#831) disk_io_thread::abort_jobs must only be called once, but it was possible for concurrent calls to be made from abort() and thread_fun() if the max threads had been set to zero but there were still disk I/O threads running. m_num_running_threads doesn't need to be atomic anymore --- include/libtorrent/disk_io_thread.hpp | 4 +++- src/create_torrent.cpp | 2 +- src/disk_io_thread.cpp | 34 ++++++++++++++++++++------- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 1b206c0e5..5486a94a4 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -556,7 +556,8 @@ namespace libtorrent // this is a counter of how many threads are currently running. // it's used to identify the last thread still running while // shutting down. This last thread is responsible for cleanup - std::atomic m_num_running_threads; + // must hold the job mutex to access + int m_num_running_threads; // std::mutex to protect the m_generic_io_jobs and m_hash_io_jobs lists mutable std::mutex m_job_mutex; @@ -639,6 +640,7 @@ namespace libtorrent bool m_outstanding_reclaim_message; #if TORRENT_USE_ASSERTS int m_magic; + std::atomic m_jobs_aborted; #endif }; } diff --git a/src/create_torrent.cpp b/src/create_torrent.cpp index 150545be4..512428ad9 100644 --- a/src/create_torrent.cpp +++ b/src/create_torrent.cpp @@ -174,7 +174,7 @@ namespace libtorrent { // on error *ec = j->error.ec; - iothread->set_num_threads(0); + iothread->abort(true); return; } t->set_hash(j->piece, sha1_hash(j->d.piece_hash)); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index cb242147f..63f40ae93 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -160,6 +160,7 @@ namespace libtorrent , m_outstanding_reclaim_message(false) #if TORRENT_USE_ASSERTS , m_magic(0x1337) + , m_jobs_aborted(false) #endif { ADD_OUTSTANDING_ASYNC("disk_io_thread::work"); @@ -196,16 +197,23 @@ namespace libtorrent void disk_io_thread::abort(bool wait) { - m_abort = true; - if (num_threads() == 0) + // abuse the job mutex to make setting m_abort and checking the thread count atomic + // see also the comment in thread_fun + std::unique_lock l(m_job_mutex); + if (m_abort.exchange(true)) return; + bool const no_threads = m_num_running_threads == 0; + l.unlock(); + + if (no_threads) { abort_jobs(); } - else - { - m_generic_threads.abort(wait); - m_hash_threads.abort(wait); - } + + // even if there are no threads it doesn't hurt to abort the pools + // it prevents threads from being started after an abort which is a good + // defensive programming measure + m_generic_threads.abort(wait); + m_hash_threads.abort(wait); } // TODO: 1 it would be nice to have the number of threads be set dynamically @@ -3195,10 +3203,12 @@ namespace libtorrent DLOG("started disk thread %s\n", thread_id_str.str().c_str()); + std::unique_lock l(m_job_mutex); + if (m_abort) return; + ++m_num_running_threads; m_stats_counters.inc_stats_counter(counters::num_running_threads, 1); - std::unique_lock l(m_job_mutex); for (;;) { disk_io_job* j = 0; @@ -3229,7 +3239,6 @@ namespace libtorrent l.lock(); } - l.unlock(); // do cleanup in the last running thread // if we're not aborting, that means we just configured the thread pool to @@ -3244,6 +3253,12 @@ namespace libtorrent return; } + // it is important to hold the job mutex while calling try_thread_exit() + // and continue to hold it until checking m_abort above so that abort() + // doesn't inadvertently trigger the code below when it thinks there are no + // more disk I/O threads running + l.unlock(); + // at this point, there are no queued jobs left. However, main // thread is still running and may still have peer_connections // that haven't fully destructed yet, reclaiming their references @@ -3281,6 +3296,7 @@ namespace libtorrent void disk_io_thread::abort_jobs() { TORRENT_ASSERT(m_magic == 0x1337); + TORRENT_ASSERT(!m_jobs_aborted.exchange(true)); jobqueue_t jobs; m_disk_cache.clear(jobs);