diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp
index 10d3b6ac7..ce9709905 100644
--- a/include/libtorrent/disk_io_thread.hpp
+++ b/include/libtorrent/disk_io_thread.hpp
@@ -550,6 +550,13 @@ namespace libtorrent
 		// disk cache
 		mutable mutex m_cache_mutex;
 		block_cache m_disk_cache;
+		enum
+		{
+			cache_check_idle,
+			cache_check_active,
+			cache_check_reinvoke
+		};
+		int m_cache_check_state;
 
 		// total number of blocks in use by both the read
 		// and the write cache. This is not supposed to
diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp
index f864f5b0f..e36b1e2b3 100644
--- a/src/disk_io_thread.cpp
+++ b/src/disk_io_thread.cpp
@@ -160,6 +160,7 @@ namespace libtorrent
 		, m_last_file_check(clock_type::now())
 		, m_file_pool(40)
 		, m_disk_cache(block_size, ios, boost::bind(&disk_io_thread::trigger_cache_trim, this))
+		, m_cache_check_state(cache_check_idle)
 		, m_stats_counters(cnt)
 		, m_ios(ios)
 		, m_last_disk_aio_performance_warning(min_time())
@@ -939,6 +940,8 @@ namespace libtorrent
 			kick_hasher(pe, l);
 			num -= try_flush_hashed(pe, 1, completed_jobs, l);
 			--pe->piece_refcount;
+
+			m_disk_cache.maybe_free_piece(pe);
 		}
 
 		// when the write cache is under high pressure, it is likely
@@ -1087,18 +1090,18 @@ namespace libtorrent
 		TORRENT_ASSERT(j->next == 0);
 		TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
 
-		mutex::scoped_lock l(m_cache_mutex);
+#if DEBUG_DISK_THREAD
+		{
+			mutex::scoped_lock l(m_cache_mutex);
 
-		check_cache_level(l, completed_jobs);
-
-		DLOG("perform_job job: %s ( %s%s) piece: %d offset: %d outstanding: %d\n"
-			, job_action_name[j->action]
-			, (j->flags & disk_io_job::fence) ? "fence ": ""
-			, (j->flags & disk_io_job::force_copy) ? "force_copy ": ""
-			, j->piece, j->d.io.offset
-			, j->storage ? j->storage->num_outstanding_jobs() : -1);
-
-		l.unlock();
+			DLOG("perform_job job: %s ( %s%s) piece: %d offset: %d outstanding: %d\n"
+				, job_action_name[j->action]
+				, (j->flags & disk_io_job::fence) ? "fence ": ""
+				, (j->flags & disk_io_job::force_copy) ? "force_copy ": ""
+				, j->piece, j->d.io.offset
+				, j->storage ? j->storage->num_outstanding_jobs() : -1);
+		}
+#endif
 
 		boost::shared_ptr<piece_manager> storage = j->storage;
 
@@ -1123,6 +1126,23 @@ namespace libtorrent
 
 		m_stats_counters.inc_stats_counter(counters::num_running_disk_jobs, -1);
 
+		mutex::scoped_lock l(m_cache_mutex);
+		if (m_cache_check_state == cache_check_idle)
+		{
+			m_cache_check_state = cache_check_active;
+			while (m_cache_check_state != cache_check_idle)
+			{
+				check_cache_level(l, completed_jobs);
+				TORRENT_ASSERT(l.locked());
+				--m_cache_check_state;
+			}
+		}
+		else
+		{
+			m_cache_check_state = cache_check_reinvoke;
+		}
+		l.unlock();
+
 		if (ret == retry_job)
 		{
 			mutex::scoped_lock l2(m_job_mutex);
@@ -1454,60 +1474,55 @@ namespace libtorrent
 		INVARIANT_CHECK;
 		TORRENT_ASSERT(j->d.io.buffer_size <= m_disk_cache.block_size());
 
-		// should we put this write job in the cache?
-		// if we don't use the cache we shouldn't.
-		if (j->flags & disk_io_job::use_disk_cache)
+		mutex::scoped_lock l(m_cache_mutex);
+
+		cached_piece_entry* pe = m_disk_cache.find_piece(j);
+		if (pe && pe->hashing_done)
 		{
-			mutex::scoped_lock l(m_cache_mutex);
-
-			cached_piece_entry* pe = m_disk_cache.find_piece(j);
-			if (pe && pe->hashing_done)
-			{
 #if TORRENT_USE_ASSERTS
-				print_piece_log(pe->piece_log);
+			print_piece_log(pe->piece_log);
 #endif
-				TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != j->buffer.disk_block);
-				TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != NULL);
-				j->error.ec = error::operation_aborted;
-				j->error.operation = storage_error::write;
-				return -1;
-			}
+			TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != j->buffer.disk_block);
+			TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != NULL);
+			j->error.ec = error::operation_aborted;
+			j->error.operation = storage_error::write;
+			return -1;
+		}
 
-			pe = m_disk_cache.add_dirty_block(j);
+		pe = m_disk_cache.add_dirty_block(j);
 
-			if (pe)
-			{
+		if (pe)
+		{
 #if TORRENT_USE_ASSERTS
-				pe->piece_log.push_back(piece_log_t(j->action, j->d.io.offset / 0x4000));
+			pe->piece_log.push_back(piece_log_t(j->action, j->d.io.offset / 0x4000));
 #endif
-				if (!pe->hashing_done
-					&& pe->hash == 0
-					&& !m_settings.get_bool(settings_pack::disable_hash_checks))
-				{
-					pe->hash = new partial_hash;
-					m_disk_cache.update_cache_state(pe);
-				}
-
-				TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
-				++pe->piece_refcount;
-
-				// see if we can progress the hash cursor with this new block
-				kick_hasher(pe, l);
-
-				TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
-
-				// flushes the piece to disk in case
-				// it satisfies the condition for a write
-				// piece to be flushed
-				try_flush_hashed(pe, m_settings.get_int(
-					settings_pack::write_cache_line_size), completed_jobs, l);
-
-				--pe->piece_refcount;
-				m_disk_cache.maybe_free_piece(pe);
-
-				return defer_handler;
+			if (!pe->hashing_done
+				&& pe->hash == 0
+				&& !m_settings.get_bool(settings_pack::disable_hash_checks))
+			{
+				pe->hash = new partial_hash;
+				m_disk_cache.update_cache_state(pe);
 			}
+
+			TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
+			++pe->piece_refcount;
+
+			// see if we can progress the hash cursor with this new block
+			kick_hasher(pe, l);
+
+			TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
+
+			// flushes the piece to disk in case
+			// it satisfies the condition for a write
+			// piece to be flushed
+			try_flush_hashed(pe, m_settings.get_int(
+				settings_pack::write_cache_line_size), completed_jobs, l);
+
+			--pe->piece_refcount;
+			m_disk_cache.maybe_free_piece(pe);
+
+			return defer_handler;
 		}
 
 		// ok, we should just perform this job right now.
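
A note on the perform_job() hunks above: check_cache_level() may release and re-acquire m_cache_mutex, so a job completing on another thread while a check is in flight could otherwise have its cache check silently skipped. The idle/active/reinvoke flag lets such a thread request one more pass from the thread that is already checking, instead of recursing or blocking on the lock. Below is a minimal stand-alone sketch of the same pattern (hypothetical names, std::mutex instead of libtorrent's mutex wrapper, assuming the check may unlock the mutex internally):

	#include <mutex>

	enum check_state { state_idle, state_active, state_reinvoke };

	struct cache_checker
	{
		std::mutex m;
		int state = state_idle; // int, so it can be decremented

		// placeholder for check_cache_level(); may unlock and
		// re-lock `l` while trimming the cache
		void run_check(std::unique_lock<std::mutex>&) {}

		// called on every job completion
		void on_job_complete()
		{
			std::unique_lock<std::mutex> l(m);
			if (state == state_idle)
			{
				state = state_active;
				while (state != state_idle)
				{
					run_check(l);
					// if a concurrent completion bumped us to
					// state_reinvoke while the lock was released, this
					// decrement lands on state_active and we loop once
					// more; otherwise it lands on state_idle and we stop
					--state;
				}
			}
			else
			{
				// another thread is checking; ask it for one more pass
				state = state_reinvoke;
			}
		}
	};
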
@@ -1571,57 +1586,66 @@ namespace libtorrent
 	{
 		TORRENT_ASSERT(j->action == disk_io_job::read);
 
-		if (m_settings.get_bool(settings_pack::use_read_cache)
-			&& m_settings.get_int(settings_pack::cache_size) != 0)
+		int ret = m_disk_cache.try_read(j);
+		if (ret >= 0)
 		{
-			int ret = m_disk_cache.try_read(j);
-			if (ret >= 0)
-			{
-				m_stats_counters.inc_stats_counter(counters::num_blocks_cache_hits);
-				DLOG("do_read: cache hit\n");
-				j->flags |= disk_io_job::cache_hit;
-				j->ret = ret;
-				return 0;
-			}
-			else if (ret == -2)
-			{
-				j->error.ec = error::no_memory;
-				j->error.operation = storage_error::alloc_cache_piece;
-				j->ret = disk_io_job::operation_failed;
-				return 0;
-			}
+			m_stats_counters.inc_stats_counter(counters::num_blocks_cache_hits);
+			DLOG("do_read: cache hit\n");
+			j->flags |= disk_io_job::cache_hit;
+			j->ret = ret;
+			return 0;
+		}
+		else if (ret == -2)
+		{
+			j->error.ec = error::no_memory;
+			j->error.operation = storage_error::alloc_cache_piece;
+			j->ret = disk_io_job::operation_failed;
+			return 0;
+		}
 
-			if (check_fence && j->storage->is_blocked(j))
-			{
-				// this means the job was queued up inside storage
-				m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs);
-				DLOG("blocked job: %s (torrent: %d total: %d)\n"
-					, job_action_name[j->action], j->storage ? j->storage->num_blocked() : 0
-					, int(m_stats_counters[counters::blocked_disk_jobs]));
-				return 2;
-			}
+		if (check_fence && j->storage->is_blocked(j))
+		{
+			// this means the job was queued up inside storage
+			m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs);
+			DLOG("blocked job: %s (torrent: %d total: %d)\n"
+				, job_action_name[j->action], j->storage ? j->storage->num_blocked() : 0
+				, int(m_stats_counters[counters::blocked_disk_jobs]));
+			return 2;
+		}
 
-			cached_piece_entry* pe = m_disk_cache.allocate_piece(j, cached_piece_entry::read_lru1);
-			if (pe == NULL)
-			{
-				j->ret = -1;
-				j->error.ec = error::no_memory;
-				j->error.operation = storage_error::read;
-				return 0;
-			}
-			j->flags |= disk_io_job::use_disk_cache;
-			if (pe->outstanding_read)
-			{
-				TORRENT_PIECE_ASSERT(j->piece == pe->piece, pe);
-				pe->read_jobs.push_back(j);
-				return 2;
-			}
+		if (!m_settings.get_bool(settings_pack::use_read_cache)
+			|| m_settings.get_int(settings_pack::cache_size) == 0)
+		{
+			// if the read cache is disabled then we can skip going through the cache
+			// but only if there is no existing piece entry. Otherwise there may be a
+			// partial hit on one or more dirty buffers so we must use the cache
+			// to avoid reading bogus data from storage
+			if (m_disk_cache.find_piece(j) == NULL)
+				return 1;
+		}
+
+		cached_piece_entry* pe = m_disk_cache.allocate_piece(j, cached_piece_entry::read_lru1);
+
+		if (pe == NULL)
+		{
+			j->ret = -1;
+			j->error.ec = error::no_memory;
+			j->error.operation = storage_error::read;
+			return 0;
+		}
+		j->flags |= disk_io_job::use_disk_cache;
+		if (pe->outstanding_read)
+		{
+			TORRENT_PIECE_ASSERT(j->piece == pe->piece, pe);
+			pe->read_jobs.push_back(j);
+			return 2;
+		}
 
 #if TORRENT_USE_ASSERTS
-			pe->piece_log.push_back(piece_log_t(piece_log_t::set_outstanding_jobs));
+		pe->piece_log.push_back(piece_log_t(piece_log_t::set_outstanding_jobs));
 #endif
-			pe->outstanding_read = 1;
-		}
+		pe->outstanding_read = 1;
+		return 1;
 	}
@@ -1685,53 +1709,49 @@ namespace libtorrent
 		TORRENT_ASSERT(m_disk_cache.is_disk_buffer(j->buffer.disk_block));
 		l_.unlock();
 #endif
-		if (m_settings.get_int(settings_pack::cache_size) != 0
-			&& m_settings.get_bool(settings_pack::use_write_cache))
+
+		TORRENT_ASSERT((r.start % m_disk_cache.block_size()) == 0);
+
+		if (storage->is_blocked(j))
 		{
-			TORRENT_ASSERT((r.start % m_disk_cache.block_size()) == 0);
-			j->flags |= disk_io_job::use_disk_cache;
-
-			if (storage->is_blocked(j))
-			{
-				// this means the job was queued up inside storage
-				m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs);
-				DLOG("blocked job: %s (torrent: %d total: %d)\n"
-					, job_action_name[j->action], j->storage ? j->storage->num_blocked() : 0
-					, int(m_stats_counters[counters::blocked_disk_jobs]));
-				// make the holder give up ownership of the buffer
-				// since the job was successfully queued up
-				buffer.release();
-				return;
-			}
-
-			mutex::scoped_lock l(m_cache_mutex);
-			// if we succeed in adding the block to the cache, the job will
-			// be added along with it. we may not free j if so
-			cached_piece_entry* dpe = m_disk_cache.add_dirty_block(j);
-
-			// if the buffer was successfully added to the cache
-			// our holder should no longer own it
-			if (dpe) buffer.release();
-
-			if (dpe && dpe->outstanding_flush == 0)
-			{
-				dpe->outstanding_flush = 1;
-				l.unlock();
-
-				// the block and write job were successfully inserted
-				// into the cache. Now, see if we should trigger a flush
-				j = allocate_job(disk_io_job::flush_hashed);
-				j->storage = storage->shared_from_this();
-				j->piece = r.piece;
-				j->flags = flags;
-				add_job(j);
-			}
-			// if we added the block (regardless of whether we also
-			// issued a flush job or not), we're done.
-			if (dpe) return;
-			l.unlock();
+			// this means the job was queued up inside storage
+			m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs);
+			DLOG("blocked job: %s (torrent: %d total: %d)\n"
+				, job_action_name[j->action], j->storage ? j->storage->num_blocked() : 0
+				, int(m_stats_counters[counters::blocked_disk_jobs]));
+			// make the holder give up ownership of the buffer
+			// since the job was successfully queued up
+			buffer.release();
+			return;
 		}
 
+		mutex::scoped_lock l(m_cache_mutex);
+		// if we succeed in adding the block to the cache, the job will
+		// be added along with it. we may not free j if so
+		cached_piece_entry* dpe = m_disk_cache.add_dirty_block(j);
+
+		// if the buffer was successfully added to the cache
+		// our holder should no longer own it
+		if (dpe) buffer.release();
+
+		if (dpe && dpe->outstanding_flush == 0)
+		{
+			dpe->outstanding_flush = 1;
+			l.unlock();
+
+			// the block and write job were successfully inserted
+			// into the cache. Now, see if we should trigger a flush
+			j = allocate_job(disk_io_job::flush_hashed);
+			j->storage = storage->shared_from_this();
+			j->piece = r.piece;
+			j->flags = flags;
+			add_job(j);
+		}
+		// if we added the block (regardless of whether we also
+		// issued a flush job or not), we're done.
+		if (dpe) return;
+		l.unlock();
+
 		add_job(j);
 		buffer.release();
 	}
@@ -1778,13 +1798,6 @@ namespace libtorrent
 			return;
 		}
 		l.unlock();
-
-		if (m_settings.get_bool(settings_pack::use_read_cache)
-			&& m_settings.get_int(settings_pack::cache_size) != 0)
-		{
-			j->flags |= disk_io_job::use_disk_cache;
-		}
-
 		add_job(j);
 	}
@@ -2287,9 +2300,6 @@ namespace libtorrent
 	{
 		INVARIANT_CHECK;
 
-		if ((j->flags & disk_io_job::use_disk_cache) == 0)
-			return do_uncached_hash(j);
-
 		int const piece_size = j->storage->files()->piece_size(j->piece);
 		int const file_flags = file_flags_for_job(j);
@@ -2905,6 +2915,9 @@ namespace libtorrent
 
 		cached_piece_entry* pe = m_disk_cache.find_piece(j);
 		if (pe == NULL) return 0;
+
+		pe->outstanding_flush = 0;
+
 		if (pe->num_dirty == 0) return 0;
 
 		// if multiple threads are flushing this piece, this assert may fire
@@ -2942,8 +2955,6 @@ namespace libtorrent
 
 		TORRENT_ASSERT(l.locked());
 
-//		TORRENT_PIECE_ASSERT(pe->outstanding_flush == 1, pe);
-		pe->outstanding_flush = 0;
 		--pe->piece_refcount;
 		m_disk_cache.maybe_free_piece(pe);
@@ -3457,9 +3468,7 @@ namespace libtorrent
 		{
 			disk_io_job* j = new_jobs.pop_front();
 
-			if (j->action == disk_io_job::read
-				&& m_settings.get_bool(settings_pack::use_read_cache)
-				&& m_settings.get_int(settings_pack::cache_size) != 0)
+			if (j->action == disk_io_job::read)
 			{
 				int state = prep_read_job_impl(j, false);
 				switch (state)
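
For context on the read-path hunks: reads used to be gated on use_read_cache up front, which also bypassed the cache when it still held dirty, not-yet-flushed write blocks for the same piece. The rewritten prep_read_job_impl() always calls try_read() first, and only skips the cache when find_piece() returns NULL. A minimal sketch of that bypass decision (hypothetical helper name and parameters; only try_read(), find_piece(), and the settings used in the diff are real):

	// hypothetical helper: decide whether a read job may bypass the
	// block cache and go straight to storage
	bool can_bypass_cache(disk_io_job* j, block_cache& cache
		, bool use_read_cache, int cache_size)
	{
		// read cache enabled: always go through the cache
		if (use_read_cache && cache_size != 0) return false;

		// cache disabled: bypassing is safe only when no piece entry
		// exists, since an existing entry may hold dirty blocks that
		// are newer than what is on disk
		return cache.find_piece(j) == NULL;
	}
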