From 5cb095f6d29d2271acec5af58d579bf5da9c8a5a Mon Sep 17 00:00:00 2001 From: arvidn Date: Sun, 14 Jun 2015 16:00:04 -0400 Subject: [PATCH] support using 0 disk threads (to perform disk I/O in network thread) --- ChangeLog | 1 + include/libtorrent/aux_/session_impl.hpp | 1 + include/libtorrent/disk_io_thread.hpp | 30 ++-- src/block_cache.cpp | 6 +- src/create_torrent.cpp | 5 +- src/disk_io_thread.cpp | 199 +++++++++++++++++------ src/session_impl.cpp | 13 +- test/test_storage.cpp | 1 + 8 files changed, 180 insertions(+), 76 deletions(-) diff --git a/ChangeLog b/ChangeLog index 0e8ff86ba..0b938ded7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * support using 0 disk threads (to perform disk I/O in network thread) * removed deprecated handle_alert template * enable logging build config by default (but alert mask disabled by default) * deprecated RSS API diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index ea004600c..d997c99c0 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -475,6 +475,7 @@ namespace libtorrent disk_interface& disk_thread() { return m_disk_thread; } void abort(); + void abort_stage2(); torrent_handle find_torrent_handle(sha1_hash const& info_hash); diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 3a44215ec..e1d4c3d25 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -293,6 +293,8 @@ namespace libtorrent void set_settings(settings_pack const* sett, alert_manager& alerts); void set_num_threads(int i, bool wait = true); + void abort(bool wait); + void async_read(piece_manager* storage, peer_request const& r , boost::function<void(disk_io_job const*)> const& handler, void* requester , int flags = 0); @@ -382,7 +384,8 @@ namespace libtorrent hasher_thread }; - void thread_fun(int thread_id, thread_type_t type); + void thread_fun(int thread_id, thread_type_t type + , 
boost::shared_ptr<io_service::work> w); file_pool& files() { return m_file_pool; } @@ -454,8 +457,9 @@ namespace libtorrent void perform_job(disk_io_job* j, tailqueue& completed_jobs); // this queues up another job to be submitted - void add_job(disk_io_job* j); - void add_fence_job(piece_manager* storage, disk_io_job* j); + void add_job(disk_io_job* j, bool user_add = true); + void add_fence_job(piece_manager* storage, disk_io_job* j + , bool user_add = true); // assumes l is locked (cache mutex). // writes out the blocks [start, end) (releases the lock @@ -505,11 +509,19 @@ namespace libtorrent // used to batch reclaiming of blocks to once per cycle void commit_reclaimed_blocks(); + void maybe_flush_write_blocks(); + void execute_job(disk_io_job* j); + void immediate_execute(); + void abort_jobs(); + // this is a counter which is atomically incremented // by each thread as it's started up, in order to // assign a unique id to each thread boost::atomic<int> m_num_threads; + // set to true once we start shutting down + boost::atomic<bool> m_abort; + // this is a counter of how many threads are currently running. // it's used to identify the last thread still running while // shutting down. This last thread is responsible for cleanup @@ -559,18 +571,6 @@ namespace libtorrent // the main thread. io_service& m_ios; - // this keeps the io_service::run() call blocked from - // returning. When shutting down, it's possible that - // the event queue is drained before the disk_io_thread - // has posted its last callback. When this happens, the - // io_service will have a pending callback from the - // disk_io_thread, but the event loop is not running. - // this means that the event is destructed after the - // disk_io_thread. If the event refers to a disk buffer - // it will try to free it, but the buffer pool won't - // exist anymore, and crash. This prevents that. 
- boost::optional<io_service::work> m_work; - // used to wake up the disk IO thread when there are new // jobs on the job queue (m_queued_jobs) condition_variable m_job_cond; diff --git a/src/block_cache.cpp b/src/block_cache.cpp index ce409803d..cdc420452 100644 --- a/src/block_cache.cpp +++ b/src/block_cache.cpp @@ -1721,7 +1721,7 @@ int block_cache::copy_from_piece(cached_piece_entry* pe, disk_io_job* j // the cache, and we're not currently reading it in either // since it's not pending - if (inc_block_refcount(pe, start_block, ref_reading) == false) + if (inc_block_refcount(pe, start_block, ref_reading) == false) { TORRENT_ASSERT(!expect_no_fail); return -1; @@ -1734,7 +1734,9 @@ int block_cache::copy_from_piece(cached_piece_entry* pe, disk_io_job* j { // special case for block aligned request // don't actually copy the buffer, just reference - // the existing block + // the existing block. Which means we don't want to decrement the + // refcount, we're handing the ownership of the reference to the calling + // thread. 
cached_block_entry& bl = pe->blocks[start_block]; // make sure it didn't wrap diff --git a/src/create_torrent.cpp b/src/create_torrent.cpp index 5fa213e94..e069b231d 100644 --- a/src/create_torrent.cpp +++ b/src/create_torrent.cpp @@ -184,7 +184,7 @@ namespace libtorrent } else { - iothread->set_num_threads(0); + iothread->abort(true); } iothread->submit_jobs(); } @@ -211,7 +211,7 @@ namespace libtorrent add_files_impl(fs, parent_path(complete(utf8)) , filename(utf8), default_pred, flags); } - + void set_piece_hashes(create_torrent& t, std::wstring const& p , boost::function<void(int)> f, error_code& ec) { @@ -264,6 +264,7 @@ namespace libtorrent boost::shared_ptr<char> dummy; counters cnt; disk_io_thread disk_thread(ios, cnt, 0); + disk_thread.set_num_threads(1); storage_params params; params.files = &t.files(); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index ff1329d76..2e2909d55 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -89,6 +89,7 @@ namespace libtorrent { #if DEBUG_DISK_THREAD static mutex log_mutex; + static const time_point start = clock_type::now(); va_list v; va_start(v, fmt); @@ -105,7 +106,8 @@ namespace libtorrent } va_end(v); char buf[2300]; - snprintf(buf, sizeof(buf), "%s: [%p] %s", time_now_string(), pthread_self(), usr); + int t = total_milliseconds(clock_type::now() - start); + snprintf(buf, sizeof(buf), "%05d: [%p] %s", t, pthread_self(), usr); prepend_time = (usr[len-1] == '\n'); mutex::scoped_lock l(log_mutex); fputs(buf, stderr); @@ -132,6 +134,7 @@ namespace libtorrent , void* userdata , int block_size) : m_num_threads(0) + , m_abort(false) , m_num_running_threads(0) , m_userdata(userdata) , m_last_cache_expiry(min_time()) @@ -140,7 +143,6 @@ namespace libtorrent , m_disk_cache(block_size, ios, boost::bind(&disk_io_thread::trigger_cache_trim, this)) , m_stats_counters(cnt) , m_ios(ios) - , m_work(io_service::work(m_ios)) , m_last_disk_aio_performance_warning(min_time()) , m_outstanding_reclaim_message(false) #if 
TORRENT_USE_ASSERTS @@ -173,8 +175,6 @@ namespace libtorrent m_file_pool.resize((std::min)(m_file_pool.size_limit(), int(rl.rlim_cur * 2 / 10))); } #endif // TORRENT_USE_RLIMIT - - set_num_threads(1); } disk_io_thread::~disk_io_thread() @@ -203,6 +203,19 @@ namespace libtorrent #endif } + void disk_io_thread::abort(bool wait) + { + m_abort = true; + if (m_num_threads == 0) + { + abort_jobs(); + } + else + { + set_num_threads(0, wait); + } + } + // TODO: 1 it would be nice to have the number of threads be set dynamically void disk_io_thread::set_num_threads(int i, bool wait) { @@ -216,11 +229,23 @@ namespace libtorrent int thread_id = (++m_num_threads) - 1; thread_type_t type = generic_thread; + // this keeps the io_service::run() call blocked from returning. + // When shutting down, it's possible that the event queue is drained + // before the disk_io_thread has posted its last callback. When this + // happens, the io_service will have a pending callback from the + // disk_io_thread, but the event loop is not running. this means + // that the event is destructed after the disk_io_thread. If the + // event refers to a disk buffer it will try to free it, but the + // buffer pool won't exist anymore, and crash. This prevents that. 
+ boost::shared_ptr<io_service::work> work = + boost::make_shared<io_service::work>(m_ios); + // the magic number 3 is also used in add_job() // every 4:th thread is a hasher thread if ((thread_id & 0x3) == 3) type = hasher_thread; m_threads.push_back(boost::shared_ptr<thread>( - new thread(boost::bind(&disk_io_thread::thread_fun, this, thread_id, type)))); + new thread(boost::bind(&disk_io_thread::thread_fun, this + , thread_id, type, work)))); } } else @@ -1337,7 +1362,7 @@ namespace libtorrent TORRENT_PIECE_ASSERT(pe->outstanding_read == 1, pe); // if we're shutting down, just cancel the jobs - if (m_num_threads == 0) + if (m_abort) { fail_jobs_impl(storage_error(boost::asio::error::operation_aborted) , pe->read_jobs, completed_jobs); @@ -1400,7 +1425,7 @@ namespace libtorrent if (next_job) { - add_job(next_job); + add_job(next_job, false); } else { @@ -1419,13 +1444,13 @@ namespace libtorrent file::iovec_t b = { j->buffer.disk_block, size_t(j->d.io.buffer_size) }; int file_flags = file_flags_for_job(j); - + m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1); // the actual write operation int ret = j->storage->get_storage_impl()->writev(&b, 1 , j->piece, j->d.io.offset, file_flags, j->error); - + m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1); if (!j->error.ec) @@ -1767,12 +1792,13 @@ namespace libtorrent #if TORRENT_USE_ASSERTS ++pe->hash_passes; #endif - + l.unlock(); if (handler) handler(j); free_job(j); return; } + l.unlock(); add_job(j); } @@ -1995,7 +2021,7 @@ namespace libtorrent j->piece = piece; j->callback = handler; - if (m_num_threads == 0) + if (m_abort) { j->error.ec = boost::asio::error::operation_aborted; if (handler) handler(j); @@ -2087,7 +2113,7 @@ namespace libtorrent add_fence_job(storage, j); } - void disk_io_thread::clear_piece(piece_manager* storage, int index) + void disk_io_thread::clear_piece(piece_manager* storage, int index) { mutex::scoped_lock l(m_cache_mutex); @@ -3030,12 +3056,13 @@ namespace libtorrent return 
j->storage->get_storage_impl()->tick(); } - void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j) + void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j + , bool user_add) { // if this happens, it means we started to shut down // the disk threads too early. We have to post all jobs // before the disk threads are shut down - TORRENT_ASSERT(m_num_threads > 0); + TORRENT_ASSERT(!m_abort); DLOG("add_fence:job: %s (outstanding: %d)\n" , job_action_name[j->action] @@ -3057,6 +3084,10 @@ namespace libtorrent // discard the flush job free_job(fj); + + if (m_num_threads == 0 && user_add) + immediate_execute(); + return; } @@ -3065,7 +3096,7 @@ namespace libtorrent // flush of all those jobs now. Only write jobs linger, those are the // jobs that needs to be kicked TORRENT_ASSERT(j->blocked); - + if (ret == disk_job_fence::fence_post_flush) { // now, we have to make sure that all outstanding jobs on this @@ -3081,9 +3112,12 @@ namespace libtorrent TORRENT_ASSERT((fj->flags & disk_io_job::in_progress) == 0); TORRENT_ASSERT(fj->blocked); } + + if (m_num_threads == 0 && user_add) + immediate_execute(); } - void disk_io_thread::add_job(disk_io_job* j) + void disk_io_thread::add_job(disk_io_job* j, bool user_add) { TORRENT_ASSERT(m_magic == 0x1337); @@ -3092,7 +3126,7 @@ namespace libtorrent // if this happens, it means we started to shut down // the disk threads too early. We have to post all jobs // before the disk threads are shut down - TORRENT_ASSERT(m_num_threads > 0 + TORRENT_ASSERT(!m_abort || j->action == disk_io_job::flush_piece || j->action == disk_io_job::trim_cache); @@ -3103,6 +3137,16 @@ namespace libtorrent mutex::scoped_lock l(m_job_mutex); TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage); m_queued_jobs.push_back(j); + + // if we literally have 0 disk threads, we have to execute the jobs + // immediately. If add job is called internally by the disk_io_thread, + // we need to defer executing it. 
We only want the top level to loop + // over the job queue (as is done below) + if (m_num_threads == 0 && user_add) + { + l.unlock(); + immediate_execute(); + } return; } @@ -3127,14 +3171,37 @@ namespace libtorrent mutex::scoped_lock l(m_job_mutex); TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage); - + // if there are at least 3 threads, there's a hasher thread // and the hash jobs go into a separate queue // see set_num_threads() if (m_num_threads > 3 && j->action == disk_io_job::hash) + { m_queued_hash_jobs.push_back(j); + } else + { m_queued_jobs.push_back(j); + // if we literally have 0 disk threads, we have to execute the jobs + // immediately. If add job is called internally by the disk_io_thread, + // we need to defer executing it. We only want the top level to loop + // over the job queue (as is done below) + if (m_num_threads == 0 && user_add) + { + l.unlock(); + immediate_execute(); + } + } + } + + void disk_io_thread::immediate_execute() + { + while (!m_queued_jobs.empty()) + { + disk_io_job* j = (disk_io_job*)m_queued_jobs.pop_front(); + maybe_flush_write_blocks(); + execute_job(j); + } } void disk_io_thread::submit_jobs() @@ -3146,7 +3213,33 @@ namespace libtorrent m_hash_job_cond.notify_all(); } - void disk_io_thread::thread_fun(int thread_id, thread_type_t type) + void disk_io_thread::maybe_flush_write_blocks() + { + time_point now = clock_type::now(); + if (now <= m_last_cache_expiry + seconds(5)) return; + + mutex::scoped_lock l(m_cache_mutex); + DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n" + , int(m_stats_counters[counters::blocked_disk_jobs]) + , m_queued_jobs.size(), int(m_num_threads)); + m_last_cache_expiry = now; + tailqueue completed_jobs; + flush_expired_write_blocks(completed_jobs, l); + l.unlock(); + if (completed_jobs.size()) + add_completed_jobs(completed_jobs); + } + + void disk_io_thread::execute_job(disk_io_job* j) + { + tailqueue completed_jobs; + perform_job(j, completed_jobs); + if 
(completed_jobs.size()) + add_completed_jobs(completed_jobs); + } + + void disk_io_thread::thread_fun(int thread_id, thread_type_t type + , boost::shared_ptr<io_service::work> w) { DLOG("started disk thread %d\n", int(thread_id)); @@ -3189,41 +3282,24 @@ namespace libtorrent if (thread_id == 0) { // there's no need for all threads to be doing this - time_point now = clock_type::now(); - if (now > m_last_cache_expiry + seconds(5)) - { - mutex::scoped_lock l2(m_cache_mutex); - DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n" - , int(m_stats_counters[counters::blocked_disk_jobs]) - , m_queued_jobs.size(), int(m_num_threads)); - m_last_cache_expiry = now; - tailqueue completed_jobs; - flush_expired_write_blocks(completed_jobs, l2); - l2.unlock(); - if (completed_jobs.size()) - add_completed_jobs(completed_jobs); - } + maybe_flush_write_blocks(); } - tailqueue completed_jobs; - perform_job(j, completed_jobs); - - mutex::scoped_lock l2(m_cache_mutex); - check_cache_level(l2, completed_jobs); - l2.unlock(); - - if (completed_jobs.size()) - add_completed_jobs(completed_jobs); + execute_job(j); l.lock(); } l.unlock(); - // do cleanup in the last running thread + // do cleanup in the last running thread + // if we're not aborting, that means we just configured the thread pool to + // not have any threads (i.e. perform all disk operations in the network + // thread). In this case, the cleanup will happen in abort(). m_stats_counters.inc_stats_counter(counters::num_running_threads, -1); - if (--m_num_running_threads > 0) + if (--m_num_running_threads > 0 || !m_abort) { - DLOG("exiting disk thread %d. num_threads: %d\n", thread_id, int(m_num_threads)); + DLOG("exiting disk thread %d. num_threads: %d aborting: %d\n" + , thread_id, int(m_num_threads), int(m_abort)); TORRENT_ASSERT(m_magic == 0x1337); return; } @@ -3234,7 +3310,12 @@ namespace libtorrent // to read blocks in the disk cache. 
We need to wait until all // references are removed from other threads before we can go // ahead with the cleanup. + // This is not supposed to happen because the disk thread is now scheduled + // for shut down after all peers have shut down (see + // session_impl::abort_stage2()). mutex::scoped_lock l2(m_cache_mutex); + TORRENT_ASSERT_VAL(m_disk_cache.pinned_blocks() == 0 + , m_disk_cache.pinned_blocks()); while (m_disk_cache.pinned_blocks() > 0) { l2.unlock(); @@ -3245,8 +3326,23 @@ namespace libtorrent DLOG("disk thread %d is the last one alive. cleaning up\n", thread_id); - tailqueue jobs; + abort_jobs(); + // release the io_service to allow the run() call to return + // we do this once we stop posting new callbacks to it. +#if defined TORRENT_ASIO_DEBUGGING + complete_async("disk_io_thread::work"); +#endif + w.reset(); + + TORRENT_ASSERT(m_magic == 0x1337); + } + + void disk_io_thread::abort_jobs() + { + TORRENT_ASSERT(m_magic == 0x1337); + + tailqueue jobs; m_disk_cache.clear(jobs); fail_jobs(storage_error(boost::asio::error::operation_aborted), jobs); @@ -3263,12 +3359,7 @@ namespace libtorrent = m_disk_cache.all_pieces(); TORRENT_ASSERT(pieces.first == pieces.second); #endif - // release the io_service to allow the run() call to return - // we do this once we stop posting new callbacks to it. -#if defined TORRENT_ASIO_DEBUGGING - complete_async("disk_io_thread::work"); -#endif - m_work.reset(); + TORRENT_ASSERT(m_magic == 0x1337); } @@ -3278,7 +3369,7 @@ namespace libtorrent { // we just exceeded the cache size limit. 
Trigger a trim job disk_io_job* j = allocate_job(disk_io_job::trim_cache); - add_job(j); + add_job(j, false); submit_jobs(); } @@ -3451,7 +3542,7 @@ namespace libtorrent while (flush_jobs.size() > 0) { disk_io_job* j = (disk_io_job*)flush_jobs.pop_front(); - add_job(j); + add_job(j, false); } m_job_cond.notify_all(); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 3f0a642d4..b5c83781f 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -1083,10 +1083,17 @@ namespace aux { m_undead_peers.clear(); + // give all the sockets an opportunity to actually have their handlers + // called and cancelled before we continue the shutdown + m_io_service.post(boost::bind(&session_impl::abort_stage2, this)); + } + + void session_impl::abort_stage2() + { // it's OK to detach the threads here. The disk_io_thread // has an internal counter and won't release the network // thread until they're all dead (via m_work). - m_disk_thread.set_num_threads(0, false); + m_disk_thread.abort(false); // now it's OK for the network thread to exit m_work.reset(); @@ -5819,8 +5826,8 @@ retry: void session_impl::update_disk_threads() { - if (m_settings.get_int(settings_pack::aio_threads) < 1) - m_settings.set_int(settings_pack::aio_threads, 1); + if (m_settings.get_int(settings_pack::aio_threads) < 0) + m_settings.set_int(settings_pack::aio_threads, 0); #if !TORRENT_USE_PREAD && !TORRENT_USE_PREADV // if we don't have pread() nor preadv() there's no way diff --git a/test/test_storage.cpp b/test/test_storage.cpp index 32d8ba116..3a8d6a063 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -449,6 +449,7 @@ void test_check_files(std::string const& test_path boost::asio::io_service ios; counters cnt; disk_io_thread io(ios, cnt, NULL); + io.set_num_threads(1); disk_buffer_pool dp(16 * 1024, ios, boost::bind(&nop)); storage_params p; p.files = &fs;