simplified the disk-io mutexes and made it more efficient. separated the disk io mutex into a buffer mutex, queue mutex and the piece cache mutex.

This commit is contained in:
Arvid Norberg 2008-06-12 04:40:37 +00:00
parent 933e3c8b54
commit 52f6204288
2 changed files with 82 additions and 104 deletions

View File

@ -207,20 +207,11 @@ namespace libtorrent
int num_blocks; int num_blocks;
// the pointers to the block data // the pointers to the block data
boost::shared_array<char*> blocks; boost::shared_array<char*> blocks;
#ifndef NDEBUG
~cached_piece_entry()
{
TORRENT_ASSERT(storage == 0);
}
#endif
}; };
typedef boost::recursive_mutex mutex_t; typedef boost::recursive_mutex mutex_t;
typedef std::list<cached_piece_entry> cache_t; typedef std::list<cached_piece_entry> cache_t;
char* allocate_buffer(mutex_t::scoped_lock& l);
void free_buffer(char* buf, mutex_t::scoped_lock& l);
// cache operations // cache operations
cache_t::iterator find_cached_piece( cache_t::iterator find_cached_piece(
cache_t& cache, disk_io_job const& j cache_t& cache, disk_io_job const& j
@ -228,7 +219,7 @@ namespace libtorrent
// write cache operations // write cache operations
void flush_oldest_piece(mutex_t::scoped_lock& l); void flush_oldest_piece(mutex_t::scoped_lock& l);
void flush_expired_pieces(mutex_t::scoped_lock& l); void flush_expired_pieces();
void flush_and_remove(cache_t::iterator i, mutex_t::scoped_lock& l); void flush_and_remove(cache_t::iterator i, mutex_t::scoped_lock& l);
void flush(cache_t::iterator i, mutex_t::scoped_lock& l); void flush(cache_t::iterator i, mutex_t::scoped_lock& l);
void cache_block(disk_io_job& j, mutex_t::scoped_lock& l); void cache_block(disk_io_job& j, mutex_t::scoped_lock& l);
@ -242,14 +233,18 @@ namespace libtorrent
bool make_room(int num_blocks bool make_room(int num_blocks
, cache_t::iterator ignore , cache_t::iterator ignore
, mutex_t::scoped_lock& l); , mutex_t::scoped_lock& l);
int try_read_from_cache(disk_io_job const& j, mutex_t::scoped_lock& l); int try_read_from_cache(disk_io_job const& j);
mutable mutex_t m_mutex; // this mutex only protects m_jobs, m_queue_buffer_size
// and m_abort
mutable mutex_t m_queue_mutex;
boost::condition m_signal; boost::condition m_signal;
bool m_abort; bool m_abort;
std::list<disk_io_job> m_jobs; std::list<disk_io_job> m_jobs;
size_type m_queue_buffer_size; size_type m_queue_buffer_size;
// this protects the piece cache and related members
mutable mutex_t m_piece_mutex;
// write cache // write cache
cache_t m_pieces; cache_t m_pieces;
@ -280,6 +275,8 @@ namespace libtorrent
bool m_coalesce_reads; bool m_coalesce_reads;
bool m_use_read_cache; bool m_use_read_cache;
// this only protects the pool allocator
mutable mutex_t m_pool_mutex;
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
// memory pool for read and write operations // memory pool for read and write operations
// and disk cache // and disk cache

View File

@ -78,7 +78,7 @@ namespace libtorrent
void disk_io_thread::join() void disk_io_thread::join()
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_queue_mutex);
disk_io_job j; disk_io_job j;
j.action = disk_io_job::abort_thread; j.action = disk_io_job::abort_thread;
m_jobs.insert(m_jobs.begin(), j); m_jobs.insert(m_jobs.begin(), j);
@ -90,7 +90,7 @@ namespace libtorrent
void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
ret.clear(); ret.clear();
ret.reserve(m_pieces.size()); ret.reserve(m_pieces.size());
for (cache_t::const_iterator i = m_pieces.begin() for (cache_t::const_iterator i = m_pieces.begin()
@ -111,20 +111,20 @@ namespace libtorrent
cache_status disk_io_thread::status() const cache_status disk_io_thread::status() const
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
return m_cache_stats; return m_cache_stats;
} }
void disk_io_thread::set_cache_size(int s) void disk_io_thread::set_cache_size(int s)
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
TORRENT_ASSERT(s >= 0); TORRENT_ASSERT(s >= 0);
m_cache_size = s; m_cache_size = s;
} }
void disk_io_thread::set_cache_expiry(int ex) void disk_io_thread::set_cache_expiry(int ex)
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
TORRENT_ASSERT(ex > 0); TORRENT_ASSERT(ex > 0);
m_cache_expiry = ex; m_cache_expiry = ex;
} }
@ -132,7 +132,7 @@ namespace libtorrent
// aborts read operations // aborts read operations
void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s) void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_queue_mutex);
// read jobs are aborted, write and move jobs are syncronized // read jobs are aborted, write and move jobs are syncronized
for (std::list<disk_io_job>::iterator i = m_jobs.begin(); for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();) i != m_jobs.end();)
@ -200,10 +200,12 @@ namespace libtorrent
return cache.end(); return cache.end();
} }
void disk_io_thread::flush_expired_pieces(mutex_t::scoped_lock& l) void disk_io_thread::flush_expired_pieces()
{ {
ptime now = time_now(); ptime now = time_now();
mutex_t::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
for (;;) for (;;)
{ {
@ -226,15 +228,12 @@ namespace libtorrent
for (int i = 0; i < blocks_in_piece; ++i) for (int i = 0; i < blocks_in_piece; ++i)
{ {
if (p.blocks[i] == 0) continue; if (p.blocks[i] == 0) continue;
free_buffer(p.blocks[i], l); free_buffer(p.blocks[i]);
p.blocks[i] = 0; p.blocks[i] = 0;
--p.num_blocks; --p.num_blocks;
--m_cache_stats.cache_size; --m_cache_stats.cache_size;
--m_cache_stats.read_cache_size; --m_cache_stats.read_cache_size;
} }
l.unlock();
p.storage = 0;
l.lock();
} }
bool disk_io_thread::clear_oldest_read_piece( bool disk_io_thread::clear_oldest_read_piece(
@ -330,7 +329,7 @@ namespace libtorrent
offset += m_block_size; offset += m_block_size;
buffer_size += block_size; buffer_size += block_size;
} }
free_buffer(p.blocks[i], l); free_buffer(p.blocks[i]);
p.blocks[i] = 0; p.blocks[i] = 0;
TORRENT_ASSERT(p.num_blocks > 0); TORRENT_ASSERT(p.num_blocks > 0);
--p.num_blocks; --p.num_blocks;
@ -343,9 +342,6 @@ namespace libtorrent
for (int i = 0; i < blocks_in_piece; ++i) for (int i = 0; i < blocks_in_piece; ++i)
TORRENT_ASSERT(p.blocks[i] == 0); TORRENT_ASSERT(p.blocks[i] == 0);
#endif #endif
l.unlock();
p.storage = 0;
l.lock();
} }
void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l) void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
@ -368,9 +364,6 @@ namespace libtorrent
p.blocks[block] = j.buffer; p.blocks[block] = j.buffer;
++m_cache_stats.cache_size; ++m_cache_stats.cache_size;
m_pieces.push_back(p); m_pieces.push_back(p);
#ifndef NDEBUG
p.storage = 0;
#endif
} }
// fills a piece with data from disk, returns the total number of bytes // fills a piece with data from disk, returns the total number of bytes
@ -388,7 +381,7 @@ namespace libtorrent
// stop allocating and don't read more than // stop allocating and don't read more than
// what we've allocated now // what we've allocated now
if (p.blocks[i]) break; if (p.blocks[i]) break;
p.blocks[i] = allocate_buffer(l); p.blocks[i] = allocate_buffer();
// the allocation failed, break // the allocation failed, break
if (p.blocks[i] == 0) break; if (p.blocks[i] == 0) break;
@ -484,9 +477,6 @@ namespace libtorrent
else else
m_read_pieces.push_back(p); m_read_pieces.push_back(p);
#ifndef NDEBUG
p.storage = 0;
#endif
return ret; return ret;
} }
@ -509,7 +499,7 @@ namespace libtorrent
if (p.blocks[k]) if (p.blocks[k])
{ {
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
TORRENT_ASSERT(m_pool.is_from(p.blocks[k])); TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif #endif
++blocks; ++blocks;
} }
@ -533,7 +523,7 @@ namespace libtorrent
if (p.blocks[k]) if (p.blocks[k])
{ {
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
TORRENT_ASSERT(m_pool.is_from(p.blocks[k])); TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif #endif
++blocks; ++blocks;
} }
@ -551,10 +541,11 @@ namespace libtorrent
} }
#endif #endif
int disk_io_thread::try_read_from_cache(disk_io_job const& j, mutex_t::scoped_lock& l) int disk_io_thread::try_read_from_cache(disk_io_job const& j)
{ {
TORRENT_ASSERT(j.buffer); TORRENT_ASSERT(j.buffer);
mutex_t::scoped_lock l(m_piece_mutex);
if (!m_use_read_cache) return -2; if (!m_use_read_cache) return -2;
cache_t::iterator p cache_t::iterator p
@ -624,7 +615,7 @@ namespace libtorrent
TORRENT_ASSERT(!j.callback); TORRENT_ASSERT(!j.callback);
TORRENT_ASSERT(j.storage); TORRENT_ASSERT(j.storage);
TORRENT_ASSERT(j.buffer_size <= m_block_size); TORRENT_ASSERT(j.buffer_size <= m_block_size);
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_queue_mutex);
#ifndef NDEBUG #ifndef NDEBUG
if (j.action == disk_io_job::write) if (j.action == disk_io_job::write)
{ {
@ -696,7 +687,7 @@ namespace libtorrent
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return true; return true;
#else #else
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_pool_mutex);
return m_pool.is_from(buffer); return m_pool.is_from(buffer);
#endif #endif
} }
@ -704,18 +695,7 @@ namespace libtorrent
char* disk_io_thread::allocate_buffer() char* disk_io_thread::allocate_buffer()
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_pool_mutex);
return allocate_buffer(l);
}
void disk_io_thread::free_buffer(char* buf)
{
mutex_t::scoped_lock l(m_mutex);
free_buffer(buf, l);
}
char* disk_io_thread::allocate_buffer(mutex_t::scoped_lock& l)
{
#ifdef TORRENT_STATS #ifdef TORRENT_STATS
++m_allocations; ++m_allocations;
#endif #endif
@ -726,8 +706,9 @@ namespace libtorrent
#endif #endif
} }
void disk_io_thread::free_buffer(char* buf, mutex_t::scoped_lock& l) void disk_io_thread::free_buffer(char* buf)
{ {
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS #ifdef TORRENT_STATS
--m_allocations; --m_allocations;
#endif #endif
@ -745,12 +726,15 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_log << log_time() << " idle" << std::endl; m_log << log_time() << " idle" << std::endl;
#endif #endif
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock jl(m_queue_mutex);
while (m_jobs.empty() && !m_abort) while (m_jobs.empty() && !m_abort)
m_signal.wait(l); m_signal.wait(jl);
if (m_abort && m_jobs.empty()) if (m_abort && m_jobs.empty())
{ {
jl.unlock();
mutex_t::scoped_lock l(m_piece_mutex);
// flush all disk caches // flush all disk caches
for (cache_t::iterator i = m_pieces.begin() for (cache_t::iterator i = m_pieces.begin()
, end(m_pieces.end()); i != end; ++i) , end(m_pieces.end()); i != end; ++i)
@ -758,10 +742,8 @@ namespace libtorrent
for (cache_t::iterator i = m_read_pieces.begin() for (cache_t::iterator i = m_read_pieces.begin()
, end(m_read_pieces.end()); i != end; ++i) , end(m_read_pieces.end()); i != end; ++i)
free_piece(*i, l); free_piece(*i, l);
l.unlock();
m_pieces.clear(); m_pieces.clear();
m_read_pieces.clear(); m_read_pieces.clear();
l.lock();
return; return;
} }
@ -777,40 +759,14 @@ namespace libtorrent
disk_io_job j = m_jobs.front(); disk_io_job j = m_jobs.front();
m_jobs.pop_front(); m_jobs.pop_front();
if (j.action == disk_io_job::abort_thread)
{
m_abort = true;
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();)
{
if (i->action == disk_io_job::read)
{
if (i->callback) m_ios.post(bind(i->callback, -1, *i));
m_jobs.erase(i++);
continue;
}
if (i->action == disk_io_job::check_files)
{
if (i->callback) m_ios.post(bind(i->callback
, piece_manager::disk_check_aborted, *i));
m_jobs.erase(i++);
continue;
}
++i;
}
continue;
}
m_queue_buffer_size -= j.buffer_size; m_queue_buffer_size -= j.buffer_size;
jl.unlock();
flush_expired_pieces(l); flush_expired_pieces();
l.unlock();
int ret = 0; int ret = 0;
TORRENT_ASSERT(j.storage); TORRENT_ASSERT(j.storage || j.action == disk_io_job::abort_thread);
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
ptime start = time_now(); ptime start = time_now();
#endif #endif
@ -820,6 +776,31 @@ namespace libtorrent
switch (j.action) switch (j.action)
{ {
case disk_io_job::abort_thread:
{
mutex_t::scoped_lock jl(m_queue_mutex);
m_abort = true;
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();)
{
if (i->action == disk_io_job::read)
{
if (i->callback) m_ios.post(bind(i->callback, -1, *i));
m_jobs.erase(i++);
continue;
}
if (i->action == disk_io_job::check_files)
{
if (i->callback) m_ios.post(bind(i->callback
, piece_manager::disk_check_aborted, *i));
m_jobs.erase(i++);
continue;
}
++i;
}
break;
}
case disk_io_job::read: case disk_io_job::read:
{ {
std::string const& error_string = j.storage->error(); std::string const& error_string = j.storage->error();
@ -837,7 +818,6 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_log << log_time() << " read " << j.buffer_size << std::endl; m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif #endif
mutex_t::scoped_lock l(m_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(j.buffer == 0); TORRENT_ASSERT(j.buffer == 0);
j.buffer = allocate_buffer(); j.buffer = allocate_buffer();
@ -850,7 +830,7 @@ namespace libtorrent
} }
disk_buffer_holder read_holder(*this, j.buffer); disk_buffer_holder read_holder(*this, j.buffer);
ret = try_read_from_cache(j, l); ret = try_read_from_cache(j);
// -2 means there's no space in the read cache // -2 means there's no space in the read cache
// or that the read cache is disabled // or that the read cache is disabled
@ -864,7 +844,6 @@ namespace libtorrent
} }
else if (ret == -2) else if (ret == -2)
{ {
l.unlock();
ret = j.storage->read_impl(j.buffer, j.piece, j.offset ret = j.storage->read_impl(j.buffer, j.piece, j.offset
, j.buffer_size); , j.buffer_size);
if (ret < 0) if (ret < 0)
@ -874,7 +853,6 @@ namespace libtorrent
j.storage->clear_error(); j.storage->clear_error();
break; break;
} }
l.lock();
++m_cache_stats.blocks_read; ++m_cache_stats.blocks_read;
} }
read_holder.release(); read_holder.release();
@ -897,7 +875,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_log << log_time() << " write " << j.buffer_size << std::endl; m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif #endif
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator p cache_t::iterator p
= find_cached_piece(m_pieces, j, l); = find_cached_piece(m_pieces, j, l);
@ -934,8 +912,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_log << log_time() << " hash" << std::endl; m_log << log_time() << " hash" << std::endl;
#endif #endif
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator i cache_t::iterator i
@ -993,8 +970,8 @@ namespace libtorrent
m_log << log_time() << " release" << std::endl; m_log << log_time() << " release" << std::endl;
#endif #endif
TORRENT_ASSERT(j.buffer == 0); TORRENT_ASSERT(j.buffer == 0);
mutex_t::scoped_lock l(m_mutex);
mutex_t::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();) for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
@ -1009,10 +986,13 @@ namespace libtorrent
++i; ++i;
} }
} }
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
m_pool.release_memory();
#endif
l.unlock(); l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
{
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
}
#endif
ret = j.storage->release_files_impl(); ret = j.storage->release_files_impl();
if (ret != 0) if (ret != 0)
{ {
@ -1028,8 +1008,8 @@ namespace libtorrent
m_log << log_time() << " delete" << std::endl; m_log << log_time() << " delete" << std::endl;
#endif #endif
TORRENT_ASSERT(j.buffer == 0); TORRENT_ASSERT(j.buffer == 0);
mutex_t::scoped_lock l(m_mutex);
mutex_t::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator i = std::remove_if( cache_t::iterator i = std::remove_if(
@ -1042,15 +1022,18 @@ namespace libtorrent
for (int j = 0; j < blocks_in_piece; ++j) for (int j = 0; j < blocks_in_piece; ++j)
{ {
if (k->blocks[j] == 0) continue; if (k->blocks[j] == 0) continue;
free_buffer(k->blocks[j], l); free_buffer(k->blocks[j]);
k->blocks[j] = 0; k->blocks[j] = 0;
} }
} }
m_pieces.erase(i, m_pieces.end()); m_pieces.erase(i, m_pieces.end());
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
m_pool.release_memory();
#endif
l.unlock(); l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
{
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
}
#endif
ret = j.storage->delete_files_impl(); ret = j.storage->delete_files_impl();
if (ret != 0) if (ret != 0)
{ {
@ -1094,9 +1077,7 @@ namespace libtorrent
// if the check is not done, add it at the end of the job queue // if the check is not done, add it at the end of the job queue
if (ret == piece_manager::need_full_check) if (ret == piece_manager::need_full_check)
{ {
mutex_t::scoped_lock l(m_mutex); add_job(j, handler);
m_jobs.push_back(j);
m_jobs.back().callback.swap(handler);
continue; continue;
} }
break; break;