Merge pull request #536 from ssiloti/always-cache-writes-1.1

Always cache writes
Commit 6ba5cb7826, authored by Arvid Norberg on 2016-03-15 01:49:16 -04:00.
2 changed files with 176 additions and 160 deletions

View File

@ -550,6 +550,13 @@ namespace libtorrent
// disk cache // disk cache
mutable mutex m_cache_mutex; mutable mutex m_cache_mutex;
block_cache m_disk_cache; block_cache m_disk_cache;
enum
{
cache_check_idle,
cache_check_active,
cache_check_reinvoke
};
int m_cache_check_state;
// total number of blocks in use by both the read // total number of blocks in use by both the read
// and the write cache. This is not supposed to // and the write cache. This is not supposed to

View File

@ -160,6 +160,7 @@ namespace libtorrent
, m_last_file_check(clock_type::now()) , m_last_file_check(clock_type::now())
, m_file_pool(40) , m_file_pool(40)
, m_disk_cache(block_size, ios, boost::bind(&disk_io_thread::trigger_cache_trim, this)) , m_disk_cache(block_size, ios, boost::bind(&disk_io_thread::trigger_cache_trim, this))
, m_cache_check_state(cache_check_idle)
, m_stats_counters(cnt) , m_stats_counters(cnt)
, m_ios(ios) , m_ios(ios)
, m_last_disk_aio_performance_warning(min_time()) , m_last_disk_aio_performance_warning(min_time())
@ -939,6 +940,8 @@ namespace libtorrent
kick_hasher(pe, l); kick_hasher(pe, l);
num -= try_flush_hashed(pe, 1, completed_jobs, l); num -= try_flush_hashed(pe, 1, completed_jobs, l);
--pe->piece_refcount; --pe->piece_refcount;
m_disk_cache.maybe_free_piece(pe);
} }
// when the write cache is under high pressure, it is likely // when the write cache is under high pressure, it is likely
@ -1087,18 +1090,18 @@ namespace libtorrent
TORRENT_ASSERT(j->next == 0); TORRENT_ASSERT(j->next == 0);
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage); TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
#if DEBUG_DISK_THREAD
{
mutex::scoped_lock l(m_cache_mutex); mutex::scoped_lock l(m_cache_mutex);
check_cache_level(l, completed_jobs);
DLOG("perform_job job: %s ( %s%s) piece: %d offset: %d outstanding: %d\n" DLOG("perform_job job: %s ( %s%s) piece: %d offset: %d outstanding: %d\n"
, job_action_name[j->action] , job_action_name[j->action]
, (j->flags & disk_io_job::fence) ? "fence ": "" , (j->flags & disk_io_job::fence) ? "fence ": ""
, (j->flags & disk_io_job::force_copy) ? "force_copy ": "" , (j->flags & disk_io_job::force_copy) ? "force_copy ": ""
, j->piece, j->d.io.offset , j->piece, j->d.io.offset
, j->storage ? j->storage->num_outstanding_jobs() : -1); , j->storage ? j->storage->num_outstanding_jobs() : -1);
}
l.unlock(); #endif
boost::shared_ptr<piece_manager> storage = j->storage; boost::shared_ptr<piece_manager> storage = j->storage;
@ -1123,6 +1126,23 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_running_disk_jobs, -1); m_stats_counters.inc_stats_counter(counters::num_running_disk_jobs, -1);
mutex::scoped_lock l(m_cache_mutex);
if (m_cache_check_state == cache_check_idle)
{
m_cache_check_state = cache_check_active;
while (m_cache_check_state != cache_check_idle)
{
check_cache_level(l, completed_jobs);
TORRENT_ASSERT(l.locked());
--m_cache_check_state;
}
}
else
{
m_cache_check_state = cache_check_reinvoke;
}
l.unlock();
if (ret == retry_job) if (ret == retry_job)
{ {
mutex::scoped_lock l2(m_job_mutex); mutex::scoped_lock l2(m_job_mutex);
@ -1454,10 +1474,6 @@ namespace libtorrent
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(j->d.io.buffer_size <= m_disk_cache.block_size()); TORRENT_ASSERT(j->d.io.buffer_size <= m_disk_cache.block_size());
// should we put this write job in the cache?
// if we don't use the cache we shouldn't.
if (j->flags & disk_io_job::use_disk_cache)
{
mutex::scoped_lock l(m_cache_mutex); mutex::scoped_lock l(m_cache_mutex);
cached_piece_entry* pe = m_disk_cache.find_piece(j); cached_piece_entry* pe = m_disk_cache.find_piece(j);
@ -1508,7 +1524,6 @@ namespace libtorrent
return defer_handler; return defer_handler;
} }
}
// ok, we should just perform this job right now. // ok, we should just perform this job right now.
return do_uncached_write(j); return do_uncached_write(j);
@ -1571,9 +1586,6 @@ namespace libtorrent
{ {
TORRENT_ASSERT(j->action == disk_io_job::read); TORRENT_ASSERT(j->action == disk_io_job::read);
if (m_settings.get_bool(settings_pack::use_read_cache)
&& m_settings.get_int(settings_pack::cache_size) != 0)
{
int ret = m_disk_cache.try_read(j); int ret = m_disk_cache.try_read(j);
if (ret >= 0) if (ret >= 0)
{ {
@ -1601,7 +1613,19 @@ namespace libtorrent
return 2; return 2;
} }
if (!m_settings.get_bool(settings_pack::use_read_cache)
|| m_settings.get_int(settings_pack::cache_size) == 0)
{
// if the read cache is disabled then we can skip going through the cache
// but only if there is no existing piece entry. Otherwise there may be a
// partial hit on one-or-more dirty buffers so we must use the cache
// to avoid reading bogus data from storage
if (m_disk_cache.find_piece(j) == NULL)
return 1;
}
cached_piece_entry* pe = m_disk_cache.allocate_piece(j, cached_piece_entry::read_lru1); cached_piece_entry* pe = m_disk_cache.allocate_piece(j, cached_piece_entry::read_lru1);
if (pe == NULL) if (pe == NULL)
{ {
j->ret = -1; j->ret = -1;
@ -1621,7 +1645,7 @@ namespace libtorrent
pe->piece_log.push_back(piece_log_t(piece_log_t::set_outstanding_jobs)); pe->piece_log.push_back(piece_log_t(piece_log_t::set_outstanding_jobs));
#endif #endif
pe->outstanding_read = 1; pe->outstanding_read = 1;
}
return 1; return 1;
} }
@ -1685,11 +1709,8 @@ namespace libtorrent
TORRENT_ASSERT(m_disk_cache.is_disk_buffer(j->buffer.disk_block)); TORRENT_ASSERT(m_disk_cache.is_disk_buffer(j->buffer.disk_block));
l_.unlock(); l_.unlock();
#endif #endif
if (m_settings.get_int(settings_pack::cache_size) != 0
&& m_settings.get_bool(settings_pack::use_write_cache))
{
TORRENT_ASSERT((r.start % m_disk_cache.block_size()) == 0); TORRENT_ASSERT((r.start % m_disk_cache.block_size()) == 0);
j->flags |= disk_io_job::use_disk_cache;
if (storage->is_blocked(j)) if (storage->is_blocked(j))
{ {
@ -1730,7 +1751,6 @@ namespace libtorrent
// issued a flush job or not), we're done. // issued a flush job or not), we're done.
if (dpe) return; if (dpe) return;
l.unlock(); l.unlock();
}
add_job(j); add_job(j);
buffer.release(); buffer.release();
@ -1778,13 +1798,6 @@ namespace libtorrent
return; return;
} }
l.unlock(); l.unlock();
if (m_settings.get_bool(settings_pack::use_read_cache)
&& m_settings.get_int(settings_pack::cache_size) != 0)
{
j->flags |= disk_io_job::use_disk_cache;
}
add_job(j); add_job(j);
} }
@ -2287,9 +2300,6 @@ namespace libtorrent
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
if ((j->flags & disk_io_job::use_disk_cache) == 0)
return do_uncached_hash(j);
int const piece_size = j->storage->files()->piece_size(j->piece); int const piece_size = j->storage->files()->piece_size(j->piece);
int const file_flags = file_flags_for_job(j); int const file_flags = file_flags_for_job(j);
@ -2905,6 +2915,9 @@ namespace libtorrent
cached_piece_entry* pe = m_disk_cache.find_piece(j); cached_piece_entry* pe = m_disk_cache.find_piece(j);
if (pe == NULL) return 0; if (pe == NULL) return 0;
pe->outstanding_flush = 0;
if (pe->num_dirty == 0) return 0; if (pe->num_dirty == 0) return 0;
// if multiple threads are flushing this piece, this assert may fire // if multiple threads are flushing this piece, this assert may fire
@ -2942,8 +2955,6 @@ namespace libtorrent
TORRENT_ASSERT(l.locked()); TORRENT_ASSERT(l.locked());
// TORRENT_PIECE_ASSERT(pe->outstanding_flush == 1, pe);
pe->outstanding_flush = 0;
--pe->piece_refcount; --pe->piece_refcount;
m_disk_cache.maybe_free_piece(pe); m_disk_cache.maybe_free_piece(pe);
@ -3457,9 +3468,7 @@ namespace libtorrent
{ {
disk_io_job* j = new_jobs.pop_front(); disk_io_job* j = new_jobs.pop_front();
if (j->action == disk_io_job::read if (j->action == disk_io_job::read)
&& m_settings.get_bool(settings_pack::use_read_cache)
&& m_settings.get_int(settings_pack::cache_size) != 0)
{ {
int state = prep_read_job_impl(j, false); int state = prep_read_job_impl(j, false);
switch (state) switch (state)