support using 0 disk threads (to perform disk I/O in network thread)

This commit is contained in:
arvidn 2015-06-14 16:00:04 -04:00
parent db76ba759d
commit 5cb095f6d2
8 changed files with 180 additions and 76 deletions

View File

@ -1,3 +1,4 @@
* support using 0 disk threads (to perform disk I/O in network thread)
* removed deprecated handle_alert template
* enable logging build config by default (but alert mask disabled by default)
* deprecated RSS API

View File

@ -475,6 +475,7 @@ namespace libtorrent
disk_interface& disk_thread() { return m_disk_thread; }
void abort();
void abort_stage2();
torrent_handle find_torrent_handle(sha1_hash const& info_hash);

View File

@ -293,6 +293,8 @@ namespace libtorrent
void set_settings(settings_pack const* sett, alert_manager& alerts);
void set_num_threads(int i, bool wait = true);
void abort(bool wait);
void async_read(piece_manager* storage, peer_request const& r
, boost::function<void(disk_io_job const*)> const& handler, void* requester
, int flags = 0);
@ -382,7 +384,8 @@ namespace libtorrent
hasher_thread
};
void thread_fun(int thread_id, thread_type_t type);
void thread_fun(int thread_id, thread_type_t type
, boost::shared_ptr<io_service::work> w);
file_pool& files() { return m_file_pool; }
@ -454,8 +457,9 @@ namespace libtorrent
void perform_job(disk_io_job* j, tailqueue& completed_jobs);
// this queues up another job to be submitted
void add_job(disk_io_job* j);
void add_fence_job(piece_manager* storage, disk_io_job* j);
void add_job(disk_io_job* j, bool user_add = true);
void add_fence_job(piece_manager* storage, disk_io_job* j
, bool user_add = true);
// assumes l is locked (cache mutex).
// writes out the blocks [start, end) (releases the lock
@ -505,11 +509,19 @@ namespace libtorrent
// used to batch reclaiming of blocks to once per cycle
void commit_reclaimed_blocks();
void maybe_flush_write_blocks();
void execute_job(disk_io_job* j);
void immediate_execute();
void abort_jobs();
// this is a counter which is atomically incremented
// by each thread as it's started up, in order to
// assign a unique id to each thread
boost::atomic<int> m_num_threads;
// set to true once we start shutting down
boost::atomic<bool> m_abort;
// this is a counter of how many threads are currently running.
// it's used to identify the last thread still running while
// shutting down. This last thread is responsible for cleanup
@ -559,18 +571,6 @@ namespace libtorrent
// the main thread.
io_service& m_ios;
// this keeps the io_service::run() call blocked from
// returning. When shutting down, it's possible that
// the event queue is drained before the disk_io_thread
// has posted its last callback. When this happens, the
// io_service will have a pending callback from the
// disk_io_thread, but the event loop is not running.
// this means that the event is destructed after the
// disk_io_thread. If the event refers to a disk buffer
// it will try to free it, but the buffer pool won't
// exist anymore, and crash. This prevents that.
boost::optional<io_service::work> m_work;
// used to wake up the disk IO thread when there are new
// jobs on the job queue (m_queued_jobs)
condition_variable m_job_cond;

View File

@ -1721,7 +1721,7 @@ int block_cache::copy_from_piece(cached_piece_entry* pe, disk_io_job* j
// the cache, and we're not currently reading it in either
// since it's not pending
if (inc_block_refcount(pe, start_block, ref_reading) == false)
if (inc_block_refcount(pe, start_block, ref_reading) == false)
{
TORRENT_ASSERT(!expect_no_fail);
return -1;
@ -1734,7 +1734,9 @@ int block_cache::copy_from_piece(cached_piece_entry* pe, disk_io_job* j
{
// special case for block aligned request
// don't actually copy the buffer, just reference
// the existing block
// the existing block. Which means we don't want to decrement the
// refcount, we're handing the ownership of the reference to the calling
// thread.
cached_block_entry& bl = pe->blocks[start_block];
// make sure it didn't wrap

View File

@ -184,7 +184,7 @@ namespace libtorrent
}
else
{
iothread->set_num_threads(0);
iothread->abort(true);
}
iothread->submit_jobs();
}
@ -211,7 +211,7 @@ namespace libtorrent
add_files_impl(fs, parent_path(complete(utf8))
, filename(utf8), default_pred, flags);
}
void set_piece_hashes(create_torrent& t, std::wstring const& p
, boost::function<void(int)> f, error_code& ec)
{
@ -264,6 +264,7 @@ namespace libtorrent
boost::shared_ptr<char> dummy;
counters cnt;
disk_io_thread disk_thread(ios, cnt, 0);
disk_thread.set_num_threads(1);
storage_params params;
params.files = &t.files();

View File

@ -89,6 +89,7 @@ namespace libtorrent
{
#if DEBUG_DISK_THREAD
static mutex log_mutex;
static const time_point start = clock_type::now();
va_list v;
va_start(v, fmt);
@ -105,7 +106,8 @@ namespace libtorrent
}
va_end(v);
char buf[2300];
snprintf(buf, sizeof(buf), "%s: [%p] %s", time_now_string(), pthread_self(), usr);
int t = total_milliseconds(clock_type::now() - start);
snprintf(buf, sizeof(buf), "%05d: [%p] %s", t, pthread_self(), usr);
prepend_time = (usr[len-1] == '\n');
mutex::scoped_lock l(log_mutex);
fputs(buf, stderr);
@ -132,6 +134,7 @@ namespace libtorrent
, void* userdata
, int block_size)
: m_num_threads(0)
, m_abort(false)
, m_num_running_threads(0)
, m_userdata(userdata)
, m_last_cache_expiry(min_time())
@ -140,7 +143,6 @@ namespace libtorrent
, m_disk_cache(block_size, ios, boost::bind(&disk_io_thread::trigger_cache_trim, this))
, m_stats_counters(cnt)
, m_ios(ios)
, m_work(io_service::work(m_ios))
, m_last_disk_aio_performance_warning(min_time())
, m_outstanding_reclaim_message(false)
#if TORRENT_USE_ASSERTS
@ -173,8 +175,6 @@ namespace libtorrent
m_file_pool.resize((std::min)(m_file_pool.size_limit(), int(rl.rlim_cur * 2 / 10)));
}
#endif // TORRENT_USE_RLIMIT
set_num_threads(1);
}
disk_io_thread::~disk_io_thread()
@ -203,6 +203,19 @@ namespace libtorrent
#endif
}
void disk_io_thread::abort(bool wait)
{
m_abort = true;
if (m_num_threads == 0)
{
abort_jobs();
}
else
{
set_num_threads(0, wait);
}
}
// TODO: 1 it would be nice to have the number of threads be set dynamically
void disk_io_thread::set_num_threads(int i, bool wait)
{
@ -216,11 +229,23 @@ namespace libtorrent
int thread_id = (++m_num_threads) - 1;
thread_type_t type = generic_thread;
// this keeps the io_service::run() call blocked from returning.
// When shutting down, it's possible that the event queue is drained
// before the disk_io_thread has posted its last callback. When this
// happens, the io_service will have a pending callback from the
// disk_io_thread, but the event loop is not running. this means
// that the event is destructed after the disk_io_thread. If the
// event refers to a disk buffer it will try to free it, but the
// buffer pool won't exist anymore, and crash. This prevents that.
boost::shared_ptr<io_service::work> work =
boost::make_shared<io_service::work>(m_ios);
// the magic number 3 is also used in add_job()
// every 4:th thread is a hasher thread
if ((thread_id & 0x3) == 3) type = hasher_thread;
m_threads.push_back(boost::shared_ptr<thread>(
new thread(boost::bind(&disk_io_thread::thread_fun, this, thread_id, type))));
new thread(boost::bind(&disk_io_thread::thread_fun, this
, thread_id, type, work))));
}
}
else
@ -1337,7 +1362,7 @@ namespace libtorrent
TORRENT_PIECE_ASSERT(pe->outstanding_read == 1, pe);
// if we're shutting down, just cancel the jobs
if (m_num_threads == 0)
if (m_abort)
{
fail_jobs_impl(storage_error(boost::asio::error::operation_aborted)
, pe->read_jobs, completed_jobs);
@ -1400,7 +1425,7 @@ namespace libtorrent
if (next_job)
{
add_job(next_job);
add_job(next_job, false);
}
else
{
@ -1419,13 +1444,13 @@ namespace libtorrent
file::iovec_t b = { j->buffer.disk_block, size_t(j->d.io.buffer_size) };
int file_flags = file_flags_for_job(j);
m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1);
// the actual write operation
int ret = j->storage->get_storage_impl()->writev(&b, 1
, j->piece, j->d.io.offset, file_flags, j->error);
m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1);
if (!j->error.ec)
@ -1767,12 +1792,13 @@ namespace libtorrent
#if TORRENT_USE_ASSERTS
++pe->hash_passes;
#endif
l.unlock();
if (handler) handler(j);
free_job(j);
return;
}
l.unlock();
add_job(j);
}
@ -1995,7 +2021,7 @@ namespace libtorrent
j->piece = piece;
j->callback = handler;
if (m_num_threads == 0)
if (m_abort)
{
j->error.ec = boost::asio::error::operation_aborted;
if (handler) handler(j);
@ -2087,7 +2113,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::clear_piece(piece_manager* storage, int index)
void disk_io_thread::clear_piece(piece_manager* storage, int index)
{
mutex::scoped_lock l(m_cache_mutex);
@ -3030,12 +3056,13 @@ namespace libtorrent
return j->storage->get_storage_impl()->tick();
}
void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j)
void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j
, bool user_add)
{
// if this happens, it means we started to shut down
// the disk threads too early. We have to post all jobs
// before the disk threads are shut down
TORRENT_ASSERT(m_num_threads > 0);
TORRENT_ASSERT(!m_abort);
DLOG("add_fence:job: %s (outstanding: %d)\n"
, job_action_name[j->action]
@ -3057,6 +3084,10 @@ namespace libtorrent
// discard the flush job
free_job(fj);
if (m_num_threads == 0 && user_add)
immediate_execute();
return;
}
@ -3065,7 +3096,7 @@ namespace libtorrent
// flush of all those jobs now. Only write jobs linger, those are the
// jobs that needs to be kicked
TORRENT_ASSERT(j->blocked);
if (ret == disk_job_fence::fence_post_flush)
{
// now, we have to make sure that all outstanding jobs on this
@ -3081,9 +3112,12 @@ namespace libtorrent
TORRENT_ASSERT((fj->flags & disk_io_job::in_progress) == 0);
TORRENT_ASSERT(fj->blocked);
}
if (m_num_threads == 0 && user_add)
immediate_execute();
}
void disk_io_thread::add_job(disk_io_job* j)
void disk_io_thread::add_job(disk_io_job* j, bool user_add)
{
TORRENT_ASSERT(m_magic == 0x1337);
@ -3092,7 +3126,7 @@ namespace libtorrent
// if this happens, it means we started to shut down
// the disk threads too early. We have to post all jobs
// before the disk threads are shut down
TORRENT_ASSERT(m_num_threads > 0
TORRENT_ASSERT(!m_abort
|| j->action == disk_io_job::flush_piece
|| j->action == disk_io_job::trim_cache);
@ -3103,6 +3137,16 @@ namespace libtorrent
mutex::scoped_lock l(m_job_mutex);
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
m_queued_jobs.push_back(j);
// if we literally have 0 disk threads, we have to execute the jobs
// immediately. If add job is called internally by the disk_io_thread,
// we need to defer executing it. We only want the top level to loop
// over the job queue (as is done below)
if (m_num_threads == 0 && user_add)
{
l.unlock();
immediate_execute();
}
return;
}
@ -3127,14 +3171,37 @@ namespace libtorrent
mutex::scoped_lock l(m_job_mutex);
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
// if there are at least 3 threads, there's a hasher thread
// and the hash jobs go into a separate queue
// see set_num_threads()
if (m_num_threads > 3 && j->action == disk_io_job::hash)
{
m_queued_hash_jobs.push_back(j);
}
else
{
m_queued_jobs.push_back(j);
// if we literally have 0 disk threads, we have to execute the jobs
// immediately. If add job is called internally by the disk_io_thread,
// we need to defer executing it. We only want the top level to loop
// over the job queue (as is done below)
if (m_num_threads == 0 && user_add)
{
l.unlock();
immediate_execute();
}
}
}
void disk_io_thread::immediate_execute()
{
while (!m_queued_jobs.empty())
{
disk_io_job* j = (disk_io_job*)m_queued_jobs.pop_front();
maybe_flush_write_blocks();
execute_job(j);
}
}
void disk_io_thread::submit_jobs()
@ -3146,7 +3213,33 @@ namespace libtorrent
m_hash_job_cond.notify_all();
}
void disk_io_thread::thread_fun(int thread_id, thread_type_t type)
void disk_io_thread::maybe_flush_write_blocks()
{
time_point now = clock_type::now();
if (now <= m_last_cache_expiry + seconds(5)) return;
mutex::scoped_lock l(m_cache_mutex);
DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n"
, int(m_stats_counters[counters::blocked_disk_jobs])
, m_queued_jobs.size(), int(m_num_threads));
m_last_cache_expiry = now;
tailqueue completed_jobs;
flush_expired_write_blocks(completed_jobs, l);
l.unlock();
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
}
void disk_io_thread::execute_job(disk_io_job* j)
{
tailqueue completed_jobs;
perform_job(j, completed_jobs);
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
}
void disk_io_thread::thread_fun(int thread_id, thread_type_t type
, boost::shared_ptr<io_service::work> w)
{
DLOG("started disk thread %d\n", int(thread_id));
@ -3189,41 +3282,24 @@ namespace libtorrent
if (thread_id == 0)
{
// there's no need for all threads to be doing this
time_point now = clock_type::now();
if (now > m_last_cache_expiry + seconds(5))
{
mutex::scoped_lock l2(m_cache_mutex);
DLOG("blocked_jobs: %d queued_jobs: %d num_threads %d\n"
, int(m_stats_counters[counters::blocked_disk_jobs])
, m_queued_jobs.size(), int(m_num_threads));
m_last_cache_expiry = now;
tailqueue completed_jobs;
flush_expired_write_blocks(completed_jobs, l2);
l2.unlock();
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
}
maybe_flush_write_blocks();
}
tailqueue completed_jobs;
perform_job(j, completed_jobs);
mutex::scoped_lock l2(m_cache_mutex);
check_cache_level(l2, completed_jobs);
l2.unlock();
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
execute_job(j);
l.lock();
}
l.unlock();
// do cleanup in the last running thread
// do cleanup in the last running thread
// if we're not aborting, that means we just configured the thread pool to
// not have any threads (i.e. perform all disk operations in the network
// thread). In this case, the cleanup will happen in abort().
m_stats_counters.inc_stats_counter(counters::num_running_threads, -1);
if (--m_num_running_threads > 0)
if (--m_num_running_threads > 0 || !m_abort)
{
DLOG("exiting disk thread %d. num_threads: %d\n", thread_id, int(m_num_threads));
DLOG("exiting disk thread %d. num_threads: %d aborting: %d\n"
, thread_id, int(m_num_threads), int(m_abort));
TORRENT_ASSERT(m_magic == 0x1337);
return;
}
@ -3234,7 +3310,12 @@ namespace libtorrent
// to read blocks in the disk cache. We need to wait until all
// references are removed from other threads before we can go
// ahead with the cleanup.
// This is not supposed to happen because the disk thread is now scheduled
// for shut down after all peers have shut down (see
// session_impl::abort_stage2()).
mutex::scoped_lock l2(m_cache_mutex);
TORRENT_ASSERT_VAL(m_disk_cache.pinned_blocks() == 0
, m_disk_cache.pinned_blocks());
while (m_disk_cache.pinned_blocks() > 0)
{
l2.unlock();
@ -3245,8 +3326,23 @@ namespace libtorrent
DLOG("disk thread %d is the last one alive. cleaning up\n", thread_id);
tailqueue jobs;
abort_jobs();
// release the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
#if defined TORRENT_ASIO_DEBUGGING
complete_async("disk_io_thread::work");
#endif
w.reset();
TORRENT_ASSERT(m_magic == 0x1337);
}
void disk_io_thread::abort_jobs()
{
TORRENT_ASSERT(m_magic == 0x1337);
tailqueue jobs;
m_disk_cache.clear(jobs);
fail_jobs(storage_error(boost::asio::error::operation_aborted), jobs);
@ -3263,12 +3359,7 @@ namespace libtorrent
= m_disk_cache.all_pieces();
TORRENT_ASSERT(pieces.first == pieces.second);
#endif
// release the io_service to allow the run() call to return
// we do this once we stop posting new callbacks to it.
#if defined TORRENT_ASIO_DEBUGGING
complete_async("disk_io_thread::work");
#endif
m_work.reset();
TORRENT_ASSERT(m_magic == 0x1337);
}
@ -3278,7 +3369,7 @@ namespace libtorrent
{
// we just exceeded the cache size limit. Trigger a trim job
disk_io_job* j = allocate_job(disk_io_job::trim_cache);
add_job(j);
add_job(j, false);
submit_jobs();
}
@ -3451,7 +3542,7 @@ namespace libtorrent
while (flush_jobs.size() > 0)
{
disk_io_job* j = (disk_io_job*)flush_jobs.pop_front();
add_job(j);
add_job(j, false);
}
m_job_cond.notify_all();

View File

@ -1083,10 +1083,17 @@ namespace aux {
m_undead_peers.clear();
// give all the sockets an opportunity to actually have their handlers
// called and cancelled before we continue the shutdown
m_io_service.post(boost::bind(&session_impl::abort_stage2, this));
}
void session_impl::abort_stage2()
{
// it's OK to detach the threads here. The disk_io_thread
// has an internal counter and won't release the network
// thread until they're all dead (via m_work).
m_disk_thread.set_num_threads(0, false);
m_disk_thread.abort(false);
// now it's OK for the network thread to exit
m_work.reset();
@ -5819,8 +5826,8 @@ retry:
void session_impl::update_disk_threads()
{
if (m_settings.get_int(settings_pack::aio_threads) < 1)
m_settings.set_int(settings_pack::aio_threads, 1);
if (m_settings.get_int(settings_pack::aio_threads) < 0)
m_settings.set_int(settings_pack::aio_threads, 0);
#if !TORRENT_USE_PREAD && !TORRENT_USE_PREADV
// if we don't have pread() nor preadv() there's no way

View File

@ -449,6 +449,7 @@ void test_check_files(std::string const& test_path
boost::asio::io_service ios;
counters cnt;
disk_io_thread io(ios, cnt, NULL);
io.set_num_threads(1);
disk_buffer_pool dp(16 * 1024, ios, boost::bind(&nop));
storage_params p;
p.files = &fs;