optimized disk cache to work with large caches

This commit is contained in:
Arvid Norberg 2010-01-27 04:25:45 +00:00
parent 5cc3cbd8b1
commit f6c0d5af35
3 changed files with 222 additions and 188 deletions

View File

@ -10,6 +10,7 @@
* made it possible to build libtorrent without RTTI support * made it possible to build libtorrent without RTTI support
* added support to build with libgcrypt and a shipped version of libtommath * added support to build with libgcrypt and a shipped version of libtommath
* optimized DHT routing table memory usage * optimized DHT routing table memory usage
* optimized disk cache to work with large caches
0.15 release 0.15 release

View File

@ -53,8 +53,20 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/session_settings.hpp" #include "libtorrent/session_settings.hpp"
#include "libtorrent/thread.hpp" #include "libtorrent/thread.hpp"
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
namespace libtorrent namespace libtorrent
{ {
using boost::multi_index::multi_index_container;
using boost::multi_index::ordered_non_unique;
using boost::multi_index::ordered_unique;
using boost::multi_index::indexed_by;
using boost::multi_index::member;
using boost::multi_index::const_mem_fun;
struct cached_piece_info struct cached_piece_info
{ {
int piece; int piece;
@ -304,11 +316,22 @@ namespace libtorrent
int num_blocks; int num_blocks;
// the pointers to the block data // the pointers to the block data
boost::shared_array<cached_block_entry> blocks; boost::shared_array<cached_block_entry> blocks;
std::pair<void*, int> storage_piece_pair() const
{ return std::pair<void*, int>(storage.get(), piece); }
}; };
// TODO: turn this into a multi-index list typedef multi_index_container<
// sorted by piece and last use time cached_piece_entry, indexed_by<
typedef std::list<cached_piece_entry> cache_t; ordered_unique<const_mem_fun<cached_piece_entry, std::pair<void*, int>
, &cached_piece_entry::storage_piece_pair> >
, ordered_non_unique<member<cached_piece_entry, ptime
, &cached_piece_entry::last_use>, std::greater<ptime> >
>
> cache_t;
typedef cache_t::nth_index<0>::type cache_piece_index_t;
typedef cache_t::nth_index<1>::type cache_lru_index_t;
private: private:
@ -322,30 +345,28 @@ namespace libtorrent
, disk_io_job const& j, int ret); , disk_io_job const& j, int ret);
// cache operations // cache operations
cache_t::iterator find_cached_piece( cache_piece_index_t::iterator find_cached_piece(
cache_t& cache, disk_io_job const& j cache_t& cache, disk_io_job const& j
, mutex::scoped_lock& l); , mutex::scoped_lock& l);
bool is_cache_hit(cache_t::iterator p bool is_cache_hit(cached_piece_entry& p
, disk_io_job const& j, mutex::scoped_lock& l); , disk_io_job const& j, mutex::scoped_lock& l);
int copy_from_piece(cache_t::iterator p, bool& hit int copy_from_piece(cached_piece_entry& p, bool& hit
, disk_io_job const& j, mutex::scoped_lock& l); , disk_io_job const& j, mutex::scoped_lock& l);
// write cache operations // write cache operations
enum options_t { dont_flush_write_blocks = 1, ignore_cache_size = 2 }; enum options_t { dont_flush_write_blocks = 1, ignore_cache_size = 2 };
int flush_cache_blocks(mutex::scoped_lock& l int flush_cache_blocks(mutex::scoped_lock& l
, int blocks, cache_t::iterator ignore , int blocks, int ignore = -1, int options = 0);
, int options = 0);
void flush_expired_pieces(); void flush_expired_pieces();
int flush_and_remove(cache_t::iterator i, mutex::scoped_lock& l); int flush_contiguous_blocks(cached_piece_entry& p
int flush_contiguous_blocks(disk_io_thread::cache_t::iterator e
, mutex::scoped_lock& l, int lower_limit = 0); , mutex::scoped_lock& l, int lower_limit = 0);
int flush_range(cache_t::iterator i, int start, int end, mutex::scoped_lock& l); int flush_range(cached_piece_entry& p, int start, int end, mutex::scoped_lock& l);
int cache_block(disk_io_job& j int cache_block(disk_io_job& j
, boost::function<void(int,disk_io_job const&)>& handler , boost::function<void(int,disk_io_job const&)>& handler
, mutex::scoped_lock& l); , mutex::scoped_lock& l);
// read cache operations // read cache operations
int clear_oldest_read_piece(int num_blocks, cache_t::iterator ignore int clear_oldest_read_piece(int num_blocks, int ignore
, mutex::scoped_lock& l); , mutex::scoped_lock& l);
int read_into_piece(cached_piece_entry& p, int start_block int read_into_piece(cached_piece_entry& p, int start_block
, int options, int num_blocks, mutex::scoped_lock& l); , int options, int num_blocks, mutex::scoped_lock& l);
@ -353,7 +374,7 @@ namespace libtorrent
int free_piece(cached_piece_entry& p, mutex::scoped_lock& l); int free_piece(cached_piece_entry& p, mutex::scoped_lock& l);
int try_read_from_cache(disk_io_job const& j); int try_read_from_cache(disk_io_job const& j);
int read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h); int read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h);
int cache_piece(disk_io_job const& j, cache_t::iterator& p int cache_piece(disk_io_job const& j, cache_piece_index_t::iterator& p
, bool& hit, int options, mutex::scoped_lock& l); , bool& hit, int options, mutex::scoped_lock& l);
// this mutex only protects m_jobs, m_queue_buffer_size // this mutex only protects m_jobs, m_queue_buffer_size

View File

@ -419,17 +419,24 @@ namespace libtorrent
} }
} }
disk_io_thread::cache_t::iterator disk_io_thread::find_cached_piece( struct update_last_use
{
void operator()(disk_io_thread::cached_piece_entry& p)
{
TORRENT_ASSERT(p.storage);
p.last_use = time_now();
}
};
disk_io_thread::cache_piece_index_t::iterator disk_io_thread::find_cached_piece(
disk_io_thread::cache_t& cache disk_io_thread::cache_t& cache
, disk_io_job const& j, mutex::scoped_lock& l) , disk_io_job const& j, mutex::scoped_lock& l)
{ {
for (cache_t::iterator i = cache.begin() cache_piece_index_t& idx = cache.get<0>();
, end(cache.end()); i != end; ++i) cache_piece_index_t::iterator i
{ = idx.find(std::pair<void*, int>(j.storage.get(), j.piece));
if (i->storage != j.storage || i->piece != j.piece) continue; TORRENT_ASSERT(i == idx.end() || (i->storage == j.storage && i->piece == j.piece));
return i; return i;
}
return cache.end();
} }
void disk_io_thread::flush_expired_pieces() void disk_io_thread::flush_expired_pieces()
@ -440,33 +447,23 @@ namespace libtorrent
INVARIANT_CHECK; INVARIANT_CHECK;
// flush write cache // flush write cache
for (;;) cache_lru_index_t& widx = m_pieces.get<1>();
cache_lru_index_t::iterator i = widx.begin();
time_duration cut_off = seconds(m_settings.cache_expiry);
while (i != widx.end() && now - i->last_use > cut_off)
{ {
cache_t::iterator i = std::min_element( TORRENT_ASSERT(i->storage);
m_pieces.begin(), m_pieces.end() flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
, bind(&cached_piece_entry::last_use, _1) widx.erase(i++);
< bind(&cached_piece_entry::last_use, _2));
if (i == m_pieces.end()) break;
int age = total_seconds(now - i->last_use);
if (age < m_settings.cache_expiry) break;
flush_and_remove(i, l);
} }
if (m_settings.explicit_read_cache) return; if (m_settings.explicit_read_cache) return;
// flush read cache // flush read cache
for (;;) cache_lru_index_t& ridx = m_read_pieces.get<1>();
{ i = ridx.begin();
cache_t::iterator i = std::min_element( while (i != ridx.end() && now - i->last_use > cut_off)
m_read_pieces.begin(), m_read_pieces.end() ridx.erase(i++);
, bind(&cached_piece_entry::last_use, _1)
< bind(&cached_piece_entry::last_use, _2));
if (i == m_read_pieces.end()) break;
int age = total_seconds(now - i->last_use);
if (age < m_settings.cache_expiry) break;
free_piece(*i, l);
m_read_pieces.erase(i);
}
} }
// returns the number of blocks that were freed // returns the number of blocks that were freed
@ -491,61 +488,61 @@ namespace libtorrent
// returns the number of blocks that were freed // returns the number of blocks that were freed
int disk_io_thread::clear_oldest_read_piece( int disk_io_thread::clear_oldest_read_piece(
int num_blocks int num_blocks, int ignore, mutex::scoped_lock& l)
, cache_t::iterator ignore
, mutex::scoped_lock& l)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator i = std::min_element( cache_lru_index_t& idx = m_read_pieces.get<1>();
m_read_pieces.begin(), m_read_pieces.end() if (idx.empty()) return 0;
, bind(&cached_piece_entry::last_use, _1)
< bind(&cached_piece_entry::last_use, _2)); cache_lru_index_t::iterator i = idx.begin();
if (i != m_read_pieces.end() && i != ignore) if (i->piece == ignore)
{ {
// don't replace an entry that is less than one second old ++i;
if (time_now() - i->last_use < seconds(1)) return 0; if (i == idx.end()) return 0;
int blocks = 0;
if (num_blocks >= i->num_blocks)
{
blocks = free_piece(*i, l);
}
else
{
// delete blocks from the start and from the end
// until num_blocks have been freed
int end = (i->storage->info()->piece_size(i->piece) + m_block_size - 1) / m_block_size - 1;
int start = 0;
while (num_blocks)
{
while (i->blocks[start].buf == 0 && start <= end) ++start;
if (start > end) break;
free_buffer(i->blocks[start].buf);
i->blocks[start].buf = 0;
++blocks;
--i->num_blocks;
--m_cache_stats.cache_size;
--m_cache_stats.read_cache_size;
--num_blocks;
if (!num_blocks) break;
while (i->blocks[end].buf == 0 && start <= end) --end;
if (start > end) break;
free_buffer(i->blocks[end].buf);
i->blocks[end].buf = 0;
++blocks;
--i->num_blocks;
--m_cache_stats.cache_size;
--m_cache_stats.read_cache_size;
--num_blocks;
}
}
if (i->num_blocks == 0) m_read_pieces.erase(i);
return blocks;
} }
return 0;
// don't replace an entry that is less than one second old
if (time_now() - i->last_use < seconds(1)) return 0;
int blocks = 0;
if (num_blocks >= i->num_blocks)
{
blocks = free_piece(const_cast<cached_piece_entry&>(*i), l);
}
else
{
// delete blocks from the start and from the end
// until num_blocks have been freed
int end = (i->storage->info()->piece_size(i->piece) + m_block_size - 1) / m_block_size - 1;
int start = 0;
while (num_blocks)
{
while (i->blocks[start].buf == 0 && start <= end) ++start;
if (start > end) break;
free_buffer(i->blocks[start].buf);
i->blocks[start].buf = 0;
++blocks;
--const_cast<cached_piece_entry&>(*i).num_blocks;
--m_cache_stats.cache_size;
--m_cache_stats.read_cache_size;
--num_blocks;
if (!num_blocks) break;
while (i->blocks[end].buf == 0 && start <= end) --end;
if (start > end) break;
free_buffer(i->blocks[end].buf);
i->blocks[end].buf = 0;
++blocks;
--const_cast<cached_piece_entry&>(*i).num_blocks;
--m_cache_stats.cache_size;
--m_cache_stats.read_cache_size;
--num_blocks;
}
}
if (i->num_blocks == 0) idx.erase(i);
return blocks;
} }
int contiguous_blocks(disk_io_thread::cached_piece_entry const& b) int contiguous_blocks(disk_io_thread::cached_piece_entry const& b)
@ -566,7 +563,7 @@ namespace libtorrent
return ret; return ret;
} }
int disk_io_thread::flush_contiguous_blocks(disk_io_thread::cache_t::iterator e int disk_io_thread::flush_contiguous_blocks(cached_piece_entry& p
, mutex::scoped_lock& l, int lower_limit) , mutex::scoped_lock& l, int lower_limit)
{ {
// first find the largest range of contiguous blocks // first find the largest range of contiguous blocks
@ -574,11 +571,11 @@ namespace libtorrent
int current = 0; int current = 0;
int pos = 0; int pos = 0;
int start = 0; int start = 0;
int blocks_in_piece = (e->storage->info()->piece_size(e->piece) int blocks_in_piece = (p.storage->info()->piece_size(p.piece)
+ m_block_size - 1) / m_block_size; + m_block_size - 1) / m_block_size;
for (int i = 0; i < blocks_in_piece; ++i) for (int i = 0; i < blocks_in_piece; ++i)
{ {
if (e->blocks[i].buf) ++current; if (p.blocks[i].buf) ++current;
else else
{ {
if (current > len) if (current > len)
@ -597,14 +594,13 @@ namespace libtorrent
} }
if (len < lower_limit || len <= 0) return 0; if (len < lower_limit || len <= 0) return 0;
len = flush_range(e, pos, pos + len, l); len = flush_range(p, pos, pos + len, l);
if (e->num_blocks == 0) m_pieces.erase(e);
return len; return len;
} }
// flushes 'blocks' blocks from the cache // flushes 'blocks' blocks from the cache
int disk_io_thread::flush_cache_blocks(mutex::scoped_lock& l int disk_io_thread::flush_cache_blocks(mutex::scoped_lock& l
, int blocks, cache_t::iterator ignore, int options) , int blocks, int ignore, int options)
{ {
// first look if there are any read cache entries that can // first look if there are any read cache entries that can
// be cleared // be cleared
@ -620,28 +616,29 @@ namespace libtorrent
if (m_settings.disk_cache_algorithm == session_settings::lru) if (m_settings.disk_cache_algorithm == session_settings::lru)
{ {
cache_lru_index_t& idx = m_read_pieces.get<1>();
while (blocks > 0) while (blocks > 0)
{ {
cache_t::iterator i = std::min_element( cache_lru_index_t::iterator i = idx.begin();
m_pieces.begin(), m_pieces.end() if (i == idx.end()) return ret;
, bind(&cached_piece_entry::last_use, _1) tmp = flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
< bind(&cached_piece_entry::last_use, _2)); idx.erase(i);
if (i == m_pieces.end()) return ret;
tmp = flush_and_remove(i, l);
blocks -= tmp; blocks -= tmp;
ret += tmp; ret += tmp;
} }
} }
else if (m_settings.disk_cache_algorithm == session_settings::largest_contiguous) else if (m_settings.disk_cache_algorithm == session_settings::largest_contiguous)
{ {
cache_lru_index_t& idx = m_read_pieces.get<1>();
while (blocks > 0) while (blocks > 0)
{ {
cache_t::iterator i = std::max_element( cache_lru_index_t::iterator i =
m_pieces.begin(), m_pieces.end() std::max_element(idx.begin(), idx.end()
, bind(&contiguous_blocks, _1) , bind(&contiguous_blocks, _1)
< bind(&contiguous_blocks, _2)); < bind(&contiguous_blocks, _2));
if (i == m_pieces.end()) return ret; if (i == idx.end()) return ret;
tmp = flush_contiguous_blocks(i, l); tmp = flush_contiguous_blocks(const_cast<cached_piece_entry&>(*i), l);
if (i->num_blocks == 0) idx.erase(i);
blocks -= tmp; blocks -= tmp;
ret += tmp; ret += tmp;
} }
@ -649,23 +646,13 @@ namespace libtorrent
return ret; return ret;
} }
int disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e int disk_io_thread::flush_range(cached_piece_entry& p
, mutex::scoped_lock& l)
{
int ret = flush_range(e, 0, INT_MAX, l);
m_pieces.erase(e);
return ret;
}
int disk_io_thread::flush_range(disk_io_thread::cache_t::iterator e
, int start, int end, mutex::scoped_lock& l) , int start, int end, mutex::scoped_lock& l)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(start < end); TORRENT_ASSERT(start < end);
// TODO: copy *e and unlink it before unlocking
cached_piece_entry& p = *e;
int piece_size = p.storage->info()->piece_size(p.piece); int piece_size = p.storage->info()->piece_size(p.piece);
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_log << log_time() << " flushing " << piece_size << std::endl; m_log << log_time() << " flushing " << piece_size << std::endl;
@ -792,7 +779,9 @@ namespace libtorrent
p.blocks[block].buf = j.buffer; p.blocks[block].buf = j.buffer;
p.blocks[block].callback.swap(handler); p.blocks[block].callback.swap(handler);
++m_cache_stats.cache_size; ++m_cache_stats.cache_size;
m_pieces.push_back(p); cache_lru_index_t& idx = m_pieces.get<1>();
TORRENT_ASSERT(p.storage);
idx.insert(p);
return 0; return 0;
} }
@ -939,7 +928,8 @@ namespace libtorrent
// this function will create a new cached_piece_entry // this function will create a new cached_piece_entry
// and requires that it doesn't already exist // and requires that it doesn't already exist
TORRENT_ASSERT(find_cached_piece(m_read_pieces, j, l) == m_read_pieces.end()); cache_piece_index_t& idx = m_read_pieces.get<0>();
TORRENT_ASSERT(find_cached_piece(m_read_pieces, j, l) == idx.end());
int piece_size = j.storage->info()->piece_size(j.piece); int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
@ -954,7 +944,7 @@ namespace libtorrent
if (in_use() + blocks_to_read > m_settings.cache_size) if (in_use() + blocks_to_read > m_settings.cache_size)
{ {
int clear = in_use() + blocks_to_read - m_settings.cache_size; int clear = in_use() + blocks_to_read - m_settings.cache_size;
if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear) if (flush_cache_blocks(l, clear, j.piece, dont_flush_write_blocks) < clear)
return -2; return -2;
} }
@ -965,9 +955,11 @@ namespace libtorrent
p.num_blocks = 0; p.num_blocks = 0;
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
if (!p.blocks) return -1; if (!p.blocks) return -1;
int ret = read_into_piece(p, start_block, 0, blocks_to_read, l); int ret = read_into_piece(p, start_block, 0, blocks_to_read, l);
if (ret >= 0) m_read_pieces.push_back(p); TORRENT_ASSERT(p.storage);
if (ret >= 0) idx.insert(p);
return ret; return ret;
} }
@ -976,13 +968,14 @@ namespace libtorrent
void disk_io_thread::check_invariant() const void disk_io_thread::check_invariant() const
{ {
int cached_write_blocks = 0; int cached_write_blocks = 0;
for (cache_t::const_iterator i = m_pieces.begin() cache_piece_index_t const& idx = m_pieces.get<0>();
, end(m_pieces.end()); i != end; ++i) for (cache_piece_index_t::const_iterator i = idx.begin()
, end(idx.end()); i != end; ++i)
{ {
cached_piece_entry const& p = *i; cached_piece_entry const& p = *i;
TORRENT_ASSERT(p.blocks); TORRENT_ASSERT(p.blocks);
if (!p.storage) continue; TORRENT_ASSERT(p.storage);
int piece_size = p.storage->info()->piece_size(p.piece); int piece_size = p.storage->info()->piece_size(p.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
int blocks = 0; int blocks = 0;
@ -1024,8 +1017,8 @@ namespace libtorrent
cached_read_blocks += blocks; cached_read_blocks += blocks;
} }
TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size); TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);
TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
int read_allocs = m_categories.find(std::string("read cache"))->second; int read_allocs = m_categories.find(std::string("read cache"))->second;
@ -1043,11 +1036,12 @@ namespace libtorrent
// reads the full piece specified by j into the read cache // reads the full piece specified by j into the read cache
// returns the iterator to it and whether or not it already // returns the iterator to it and whether or not it already
// was in the cache (hit). // was in the cache (hit).
int disk_io_thread::cache_piece(disk_io_job const& j, cache_t::iterator& p int disk_io_thread::cache_piece(disk_io_job const& j, cache_piece_index_t::iterator& p
, bool& hit, int options, mutex::scoped_lock& l) , bool& hit, int options, mutex::scoped_lock& l)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
cache_piece_index_t& idx = m_read_pieces.get<0>();
p = find_cached_piece(m_read_pieces, j, l); p = find_cached_piece(m_read_pieces, j, l);
hit = true; hit = true;
@ -1060,9 +1054,11 @@ namespace libtorrent
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
// we have the piece in the cache, but not all of the blocks // we have the piece in the cache, but not all of the blocks
ret = read_into_piece(*p, 0, options, blocks_in_piece, l); ret = read_into_piece(const_cast<cached_piece_entry&>(*p), 0
, options, blocks_in_piece, l);
hit = false; hit = false;
if (ret < 0) return ret; if (ret < 0) return ret;
idx.modify(p, update_last_use());
} }
else if (p == m_read_pieces.end()) else if (p == m_read_pieces.end())
{ {
@ -1080,12 +1076,15 @@ namespace libtorrent
if (!pe.blocks) return -1; if (!pe.blocks) return -1;
ret = read_into_piece(pe, 0, options, INT_MAX, l); ret = read_into_piece(pe, 0, options, INT_MAX, l);
if (ret >= 0) m_read_pieces.push_back(pe);
hit = false; hit = false;
p = m_read_pieces.end(); if (ret < 0) return ret;
--p; TORRENT_ASSERT(pe.storage);
p = idx.insert(pe).first;
}
else
{
idx.modify(p, update_last_use());
} }
p->last_use = time_now();
TORRENT_ASSERT(!m_read_pieces.empty()); TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece); TORRENT_ASSERT(p->piece == j.piece);
TORRENT_ASSERT(p->storage == j.storage); TORRENT_ASSERT(p->storage == j.storage);
@ -1099,7 +1098,7 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
cache_t::iterator p; cache_piece_index_t::iterator p;
bool hit; bool hit;
int ret = cache_piece(j, p, hit, ignore_cache_size, l); int ret = cache_piece(j, p, hit, ignore_cache_size, l);
if (ret < 0) return ret; if (ret < 0) return ret;
@ -1117,9 +1116,11 @@ namespace libtorrent
} }
h = ctx.final(); h = ctx.final();
ret = copy_from_piece(p, hit, j, l); ret = copy_from_piece(const_cast<cached_piece_entry&>(*p), hit, j, l);
TORRENT_ASSERT(ret > 0); TORRENT_ASSERT(ret > 0);
if (ret < 0) return ret; if (ret < 0) return ret;
cache_piece_index_t& idx = m_read_pieces.get<0>();
idx.modify(p, update_last_use());
// if read cache is disabled or we exceeded the // if read cache is disabled or we exceeded the
// limit, remove this piece from the cache // limit, remove this piece from the cache
@ -1135,7 +1136,7 @@ namespace libtorrent
TORRENT_ASSERT(p->storage == j.storage); TORRENT_ASSERT(p->storage == j.storage);
if (p != m_read_pieces.end()) if (p != m_read_pieces.end())
{ {
free_piece(*p, l); free_piece(const_cast<cached_piece_entry&>(*p), l);
m_read_pieces.erase(p); m_read_pieces.erase(p);
} }
} }
@ -1152,7 +1153,7 @@ namespace libtorrent
// this is similar to copy_from_piece() but it // this is similar to copy_from_piece() but it
// doesn't do anything but determining if it's a // doesn't do anything but determining if it's a
// cache hit or not // cache hit or not
bool disk_io_thread::is_cache_hit(cache_t::iterator p bool disk_io_thread::is_cache_hit(cached_piece_entry& p
, disk_io_job const& j, mutex::scoped_lock& l) , disk_io_job const& j, mutex::scoped_lock& l)
{ {
int block = j.offset / m_block_size; int block = j.offset / m_block_size;
@ -1164,13 +1165,13 @@ namespace libtorrent
// if we have to read more than one block, and // if we have to read more than one block, and
// the first block is there, make sure we test // the first block is there, make sure we test
// for the second block // for the second block
if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1) if (p.blocks[start_block].buf != 0 && min_blocks_to_read > 1)
++start_block; ++start_block;
return p->blocks[start_block].buf != 0; return p.blocks[start_block].buf != 0;
} }
int disk_io_thread::copy_from_piece(cache_t::iterator p, bool& hit int disk_io_thread::copy_from_piece(cached_piece_entry& p, bool& hit
, disk_io_job const& j, mutex::scoped_lock& l) , disk_io_job const& j, mutex::scoped_lock& l)
{ {
TORRENT_ASSERT(j.buffer); TORRENT_ASSERT(j.buffer);
@ -1183,12 +1184,12 @@ namespace libtorrent
int min_blocks_to_read = block_offset > 0 ? 2 : 1; int min_blocks_to_read = block_offset > 0 ? 2 : 1;
TORRENT_ASSERT(size <= m_block_size); TORRENT_ASSERT(size <= m_block_size);
int start_block = block; int start_block = block;
if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1) if (p.blocks[start_block].buf != 0 && min_blocks_to_read > 1)
++start_block; ++start_block;
// if block_offset > 0, we need to read two blocks, and then // if block_offset > 0, we need to read two blocks, and then
// copy parts of both, because it's not aligned to the block // copy parts of both, because it's not aligned to the block
// boundaries // boundaries
if (p->blocks[start_block].buf == 0) if (p.blocks[start_block].buf == 0)
{ {
// if we use an explicit read cache, pretend there's no // if we use an explicit read cache, pretend there's no
// space to force hitting disk without caching anything // space to force hitting disk without caching anything
@ -1197,7 +1198,7 @@ namespace libtorrent
int piece_size = j.storage->info()->piece_size(j.piece); int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
int end_block = start_block; int end_block = start_block;
while (end_block < blocks_in_piece && p->blocks[end_block].buf == 0) ++end_block; while (end_block < blocks_in_piece && p.blocks[end_block].buf == 0) ++end_block;
int blocks_to_read = end_block - block; int blocks_to_read = end_block - block;
blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size
@ -1209,25 +1210,24 @@ namespace libtorrent
if (in_use() + blocks_to_read > m_settings.cache_size) if (in_use() + blocks_to_read > m_settings.cache_size)
{ {
int clear = in_use() + blocks_to_read - m_settings.cache_size; int clear = in_use() + blocks_to_read - m_settings.cache_size;
if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear) if (flush_cache_blocks(l, clear, p.piece, dont_flush_write_blocks) < clear)
return -2; return -2;
} }
int ret = read_into_piece(*p, block, 0, blocks_to_read, l); int ret = read_into_piece(p, block, 0, blocks_to_read, l);
hit = false; hit = false;
if (ret < 0) return ret; if (ret < 0) return ret;
if (ret < size + block_offset) return -2; if (ret < size + block_offset) return -2;
TORRENT_ASSERT(p->blocks[block].buf); TORRENT_ASSERT(p.blocks[block].buf);
} }
p->last_use = time_now();
while (size > 0) while (size > 0)
{ {
TORRENT_ASSERT(p->blocks[block].buf); TORRENT_ASSERT(p.blocks[block].buf);
int to_copy = (std::min)(m_block_size int to_copy = (std::min)(m_block_size
- block_offset, size); - block_offset, size);
std::memcpy(j.buffer + buffer_offset std::memcpy(j.buffer + buffer_offset
, p->blocks[block].buf + block_offset , p.blocks[block].buf + block_offset
, to_copy); , to_copy);
size -= to_copy; size -= to_copy;
block_offset = 0; block_offset = 0;
@ -1244,7 +1244,8 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
if (!m_settings.use_read_cache) return -2; if (!m_settings.use_read_cache) return -2;
cache_t::iterator p cache_piece_index_t& idx = m_read_pieces.get<0>();
cache_piece_index_t::iterator p
= find_cached_piece(m_read_pieces, j, l); = find_cached_piece(m_read_pieces, j, l);
bool hit = true; bool hit = true;
@ -1253,7 +1254,7 @@ namespace libtorrent
// if the piece cannot be found in the cache, // if the piece cannot be found in the cache,
// read the whole piece starting at the block // read the whole piece starting at the block
// we got a request for. // we got a request for.
if (p == m_read_pieces.end()) if (p == idx.end())
{ {
// if we use an explicit read cache and we // if we use an explicit read cache and we
// couldn't find the block in the cache, // couldn't find the block in the cache,
@ -1265,17 +1266,18 @@ namespace libtorrent
ret = cache_read_block(j, l); ret = cache_read_block(j, l);
hit = false; hit = false;
if (ret < 0) return ret; if (ret < 0) return ret;
p = m_read_pieces.end();
--p; p = find_cached_piece(m_read_pieces, j, l);
TORRENT_ASSERT(!m_read_pieces.empty()); TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece); TORRENT_ASSERT(p->piece == j.piece);
TORRENT_ASSERT(p->storage == j.storage); TORRENT_ASSERT(p->storage == j.storage);
} }
if (p == m_read_pieces.end()) return ret; TORRENT_ASSERT(p != idx.end());
ret = copy_from_piece(p, hit, j, l); ret = copy_from_piece(const_cast<cached_piece_entry&>(*p), hit, j, l);
if (ret < 0) return ret; if (ret < 0) return ret;
idx.modify(p, update_last_use());
ret = j.buffer_size; ret = j.buffer_size;
++m_cache_stats.blocks_read; ++m_cache_stats.blocks_read;
@ -1419,12 +1421,16 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
// flush all disk caches // flush all disk caches
for (cache_t::iterator i = m_pieces.begin() cache_piece_index_t& widx = m_pieces.get<0>();
, end(m_pieces.end()); i != end; ++i) for (cache_piece_index_t::iterator i = widx.begin()
flush_range(i, 0, INT_MAX, l); , end(widx.end()); i != end; ++i)
for (cache_t::iterator i = m_read_pieces.begin() flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
, end(m_read_pieces.end()); i != end; ++i)
free_piece(*i, l); cache_piece_index_t& idx = m_read_pieces.get<0>();
for (cache_piece_index_t::iterator i = idx.begin()
, end(idx.end()); i != end; ++i)
free_piece(const_cast<cached_piece_entry&>(*i), l);
m_pieces.clear(); m_pieces.clear();
m_read_pieces.clear(); m_read_pieces.clear();
// release the io_service to allow the run() call to return // release the io_service to allow the run() call to return
@ -1466,11 +1472,12 @@ namespace libtorrent
// made asynchronous, this would not be // made asynchronous, this would not be
// necessary anymore // necessary anymore
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
cache_t::iterator p cache_piece_index_t::iterator p
= find_cached_piece(m_read_pieces, j, l); = find_cached_piece(m_read_pieces, j, l);
cache_piece_index_t& idx = m_read_pieces.get<0>();
// if it's a cache hit, process the job immediately // if it's a cache hit, process the job immediately
if (p != m_read_pieces.end() && is_cache_hit(p, j, l)) if (p != idx.end() && is_cache_hit(const_cast<cached_piece_entry&>(*p), j, l))
defer = false; defer = false;
} }
} }
@ -1809,21 +1816,21 @@ namespace libtorrent
INVARIANT_CHECK; INVARIANT_CHECK;
if (in_use() >= m_settings.cache_size) if (in_use() >= m_settings.cache_size)
flush_cache_blocks(l, in_use() - m_settings.cache_size + 1, m_read_pieces.end()); flush_cache_blocks(l, in_use() - m_settings.cache_size + 1);
cache_t::iterator p cache_piece_index_t& idx = m_pieces.get<0>();
= find_cached_piece(m_pieces, j, l); cache_piece_index_t::iterator p = find_cached_piece(m_pieces, j, l);
int block = j.offset / m_block_size; int block = j.offset / m_block_size;
TORRENT_ASSERT(j.buffer); TORRENT_ASSERT(j.buffer);
TORRENT_ASSERT(j.buffer_size <= m_block_size); TORRENT_ASSERT(j.buffer_size <= m_block_size);
if (p != m_pieces.end()) if (p != idx.end())
{ {
TORRENT_ASSERT(p->blocks[block].buf == 0); TORRENT_ASSERT(p->blocks[block].buf == 0);
if (p->blocks[block].buf) if (p->blocks[block].buf)
{ {
free_buffer(p->blocks[block].buf); free_buffer(p->blocks[block].buf);
--m_cache_stats.cache_size; --m_cache_stats.cache_size;
--p->num_blocks; --const_cast<cached_piece_entry&>(*p).num_blocks;
} }
p->blocks[block].buf = j.buffer; p->blocks[block].buf = j.buffer;
p->blocks[block].callback.swap(j.callback); p->blocks[block].callback.swap(j.callback);
@ -1831,11 +1838,13 @@ namespace libtorrent
rename_buffer(j.buffer, "write cache"); rename_buffer(j.buffer, "write cache");
#endif #endif
++m_cache_stats.cache_size; ++m_cache_stats.cache_size;
++p->num_blocks; ++const_cast<cached_piece_entry&>(*p).num_blocks;
p->last_use = time_now(); idx.modify(p, update_last_use());
// we might just have created a contiguous range // we might just have created a contiguous range
// that meets the requirement to be flushed. try it // that meets the requirement to be flushed. try it
flush_contiguous_blocks(p, l, m_settings.write_cache_line_size); flush_contiguous_blocks(const_cast<cached_piece_entry&>(*p)
, l, m_settings.write_cache_line_size);
if (p->num_blocks == 0) idx.erase(p);
} }
else else
{ {
@ -1859,7 +1868,7 @@ namespace libtorrent
holder.release(); holder.release();
if (in_use() > m_settings.cache_size) if (in_use() > m_settings.cache_size)
flush_cache_blocks(l, in_use() - m_settings.cache_size, m_read_pieces.end()); flush_cache_blocks(l, in_use() - m_settings.cache_size);
break; break;
} }
@ -1878,7 +1887,7 @@ namespace libtorrent
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(j.buffer == 0); TORRENT_ASSERT(j.buffer == 0);
cache_t::iterator p; cache_piece_index_t::iterator p;
bool hit; bool hit;
ret = cache_piece(j, p, hit, 0, l); ret = cache_piece(j, p, hit, 0, l);
if (ret == -2) ret = -1; if (ret == -2) ret = -1;
@ -1894,11 +1903,13 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator i cache_piece_index_t& idx = m_pieces.get<0>();
= find_cached_piece(m_pieces, j, l); cache_piece_index_t::iterator i = find_cached_piece(m_pieces, j, l);
if (i != m_pieces.end()) if (i != idx.end())
{ {
flush_and_remove(i, l); TORRENT_ASSERT(i->storage);
int ret = flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
idx.erase(i);
if (test_error(j)) if (test_error(j))
{ {
ret = -1; ret = -1;
@ -1953,7 +1964,7 @@ namespace libtorrent
{ {
if (i->storage == j.storage) if (i->storage == j.storage)
{ {
flush_range(i, 0, INT_MAX, l); flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
i = m_pieces.erase(i); i = m_pieces.erase(i);
} }
else else
@ -1983,7 +1994,7 @@ namespace libtorrent
{ {
if (i->storage == j.storage) if (i->storage == j.storage)
{ {
free_piece(*i, l); free_piece(const_cast<cached_piece_entry&>(*i), l);
i = m_read_pieces.erase(i); i = m_read_pieces.erase(i);
} }
else else
@ -2006,22 +2017,23 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex); mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK; INVARIANT_CHECK;
cache_t::iterator i = std::remove_if( cache_piece_index_t& idx = m_pieces.get<0>();
m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage); cache_piece_index_t::iterator start = idx.lower_bound(std::pair<void*, int>(j.storage.get(), 0));
cache_piece_index_t::iterator end = idx.upper_bound(std::pair<void*, int>(j.storage.get(), INT_MAX));
for (cache_t::iterator k = i; k != m_pieces.end(); ++k) for (cache_piece_index_t::iterator i = start; i != end; ++i)
{ {
torrent_info const& ti = *k->storage->info(); torrent_info const& ti = *i->storage->info();
int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size; int blocks_in_piece = (ti.piece_size(i->piece) + m_block_size - 1) / m_block_size;
for (int j = 0; j < blocks_in_piece; ++j) for (int j = 0; j < blocks_in_piece; ++j)
{ {
if (k->blocks[j].buf == 0) continue; if (i->blocks[j].buf == 0) continue;
free_buffer(k->blocks[j].buf); free_buffer(i->blocks[j].buf);
k->blocks[j].buf = 0; i->blocks[j].buf = 0;
--m_cache_stats.cache_size; --m_cache_stats.cache_size;
} }
} }
m_pieces.erase(i, m_pieces.end()); idx.erase(start, end);
l.unlock(); l.unlock();
release_memory(); release_memory();