diff --git a/docs/manual.html b/docs/manual.html
index cd8ec16bb..e55cb2aaa 100644
--- a/docs/manual.html
+++ b/docs/manual.html
@@ -660,7 +660,11 @@ struct cache_status
{
size_type blocks_written;
size_type writes;
- int write_size;
+ size_type blocks_read;
+ size_type blocks_read_hit;
+ size_type reads;
+ int cache_size;
+ int read_cache_size;
};
@@ -671,7 +675,14 @@ session was started.
The ratio (blocks_written - writes) / blocks_written represents
the number of saved write operations per total write operations. i.e. a kind
of cache hit ratio for the write cahe.
-write_size is the number of 16 KiB blocks currently in the write cache.
+blocks_read is the number of blocks that were requested from the
+bittorrent engine (from peers), that were served from disk or cache.
+blocks_read_hit is the number of blocks that were served from cache.
+The ratio blocks_read_hit / blocks_read is the cache hit ratio
+for the read cache.
+cache_size is the number of 16 KiB blocks currently in the disk cache.
+This includes both read and write cache.
+read_cache_size is the number of 16KiB blocks in the read cache.
@@ -690,14 +701,14 @@ struct cached_piece_info
{
int piece;
std::vector<bool> blocks;
- ptime last_write;
+ ptime last_use;
};
piece is the piece index for this cache entry.
blocks has one entry for each block in this piece. true represents
the data for that block being in the disk cache and false means it's not.
-
last_write is the time when a block was last written to this piece. The older
+
last_use is the time when a block was last written to this piece. The older
a piece is, the more likely it is to be flushed to disk.
diff --git a/docs/manual.rst b/docs/manual.rst
index 716a76190..c38df951b 100644
--- a/docs/manual.rst
+++ b/docs/manual.rst
@@ -506,7 +506,11 @@ Returns status of the disk cache for this session.
{
size_type blocks_written;
size_type writes;
- int write_size;
+ size_type blocks_read;
+ size_type blocks_read_hit;
+ size_type reads;
+ int cache_size;
+ int read_cache_size;
};
``blocks_written`` is the total number of 16 KiB blocks written to disk
@@ -519,7 +523,18 @@ The ratio (``blocks_written`` - ``writes``) / ``blocks_written`` represents
the number of saved write operations per total write operations. i.e. a kind
of cache hit ratio for the write cahe.
-``write_size`` is the number of 16 KiB blocks currently in the write cache.
+``blocks_read`` is the number of blocks that were requested from the
+bittorrent engine (from peers), that were served from disk or cache.
+
+``blocks_read_hit`` is the number of blocks that were served from cache.
+
+The ratio ``blocks_read_hit`` / ``blocks_read`` is the cache hit ratio
+for the read cache.
+
+``cache_size`` is the number of 16 KiB blocks currently in the disk cache.
+This includes both read and write cache.
+
+``read_cache_size`` is the number of 16KiB blocks in the read cache.
get_cache_info()
----------------
@@ -539,7 +554,7 @@ specified info-hash (``ih``).
{
int piece;
std::vector
blocks;
- ptime last_write;
+ ptime last_use;
};
``piece`` is the piece index for this cache entry.
@@ -547,7 +562,7 @@ specified info-hash (``ih``).
``blocks`` has one entry for each block in this piece. ``true`` represents
the data for that block being in the disk cache and ``false`` means it's not.
-``last_write`` is the time when a block was last written to this piece. The older
+``last_use`` is the time when a block was last written to this piece. The older
a piece is, the more likely it is to be flushed to disk.
is_listening() listen_port() listen_on()
diff --git a/examples/client_test.cpp b/examples/client_test.cpp
index 2d11596de..c3724f2a5 100644
--- a/examples/client_test.cpp
+++ b/examples/client_test.cpp
@@ -1199,7 +1199,7 @@ int main(int ac, char* av[])
}
char* piece_state[4] = {"", "slow", "medium", "fast"};
out << "] " << piece_state[i->piece_state];
- if (cp) out << (i->piece_state > 0?" | ":"") << "cache age: " << total_seconds(time_now() - cp->last_write);
+ if (cp) out << (i->piece_state > 0?" | ":"") << "cache age: " << total_seconds(time_now() - cp->last_use);
out << "\n";
}
@@ -1229,6 +1229,8 @@ int main(int ac, char* av[])
}
cache_status cs = ses.get_cache_status();
+ if (cs.blocks_read < 1) cs.blocks_read = 1;
+ if (cs.blocks_written < 1) cs.blocks_written = 1;
out << "==== conns: " << sess_stat.num_peers
<< " down: " << esc("32") << add_suffix(sess_stat.download_rate) << "/s" << esc("0")
@@ -1239,8 +1241,10 @@ int main(int ac, char* av[])
<< " bw queues: (" << sess_stat.up_bandwidth_queue
<< " | " << sess_stat.down_bandwidth_queue << ") "
" write cache hits: " << ((cs.blocks_written - cs.writes) * 100 / cs.blocks_written) << "% "
- " cache size: " << add_suffix(cs.write_size * 16 * 1024)
- << " ====" << std::endl;
+ " read cache hits: " << (cs.blocks_read_hit * 100 / cs.blocks_read) << "% "
+ " cache size: " << add_suffix(cs.cache_size * 16 * 1024)
+ << " (" << add_suffix(cs.read_cache_size * 16 * 1024) << ")"
+ " ====" << std::endl;
if (print_log)
{
diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp
index 417be5943..9f5d97e6b 100644
--- a/include/libtorrent/disk_io_thread.hpp
+++ b/include/libtorrent/disk_io_thread.hpp
@@ -45,6 +45,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include
#include
#include
+#include
#include "libtorrent/config.hpp"
namespace libtorrent
@@ -54,7 +55,7 @@ namespace libtorrent
{
int piece;
std::vector blocks;
- ptime last_write;
+ ptime last_use;
};
struct disk_io_job
@@ -101,13 +102,36 @@ namespace libtorrent
struct cache_status
{
+ cache_status()
+ : blocks_written(0)
+ , writes(0)
+ , blocks_read(0)
+ , blocks_read_hit(0)
+ , reads(0)
+ , cache_size(0)
+ , read_cache_size(0)
+ {}
+
// the number of 16kB blocks written
size_type blocks_written;
// the number of write operations used
size_type writes;
// (blocks_written - writes) / blocks_written represents the
// "cache hit" ratio in the write cache
- int write_size;
+ // the number of blocks read
+
+ // the number of blocks passed back to the bittorrent engine
+ size_type blocks_read;
+ // the number of blocks that was just copied from the read cache
+ size_type blocks_read_hit;
+ // the number of read operations used
+ size_type reads;
+
+ // the number of blocks in the cache (both read and write)
+ int cache_size;
+
+ // the number of blocks in the cache used for read cache
+ int read_cache_size;
};
// this is a singleton consisting of the thread and a queue
@@ -153,6 +177,10 @@ namespace libtorrent
char* allocate_buffer();
void free_buffer(char* buf);
+#ifndef NDEBUG
+ void check_invariant() const;
+#endif
+
private:
typedef boost::recursive_mutex mutex_t;
@@ -163,7 +191,7 @@ namespace libtorrent
// storage this piece belongs to
boost::intrusive_ptr storage;
// the last time a block was writting to this piece
- ptime last_write;
+ ptime last_use;
// the number of blocks in the cache for this piece
int num_blocks;
// the pointers to the block data
@@ -173,33 +201,70 @@ namespace libtorrent
char* allocate_buffer(mutex_t::scoped_lock& l);
void free_buffer(char* buf, mutex_t::scoped_lock& l);
+ // cache operations
std::vector::iterator find_cached_piece(
- disk_io_job const& j, mutex_t::scoped_lock& l);
+ std::vector& cache, disk_io_job const& j
+ , mutex_t::scoped_lock& l);
+
+ // write cache operations
void flush_oldest_piece(mutex_t::scoped_lock& l);
void flush_expired_pieces(mutex_t::scoped_lock& l);
void flush_and_remove(std::vector::iterator i, mutex_t::scoped_lock& l);
void flush(std::vector::iterator i, mutex_t::scoped_lock& l);
void cache_block(disk_io_job& j, mutex_t::scoped_lock& l);
+ // read cache operations
+ bool clear_oldest_read_piece(mutex_t::scoped_lock& l);
+ int read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l);
+ int cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l);
+ void free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l);
+ bool make_room(int num_blocks, mutex_t::scoped_lock& l);
+ int try_read_from_cache(disk_io_job const& j, mutex_t::scoped_lock& l);
+
mutable mutex_t m_mutex;
boost::condition m_signal;
bool m_abort;
std::deque m_jobs;
size_type m_queue_buffer_size;
+ // write cache
std::vector m_pieces;
+
+ // read cache
+ std::vector m_read_pieces;
+
+ // total number of blocks in use by both the read
+ // and the write cache. This is not supposed to
+ // exceed m_cache_size
+ cache_status m_cache_stats;
int m_num_cached_blocks;
+
// in (16kB) blocks
int m_cache_size;
+
// expiration time of cache entries in seconds
int m_cache_expiry;
+ // if set to true, each piece flush will allocate
+ // one piece worth of temporary memory on the heap
+ // and copy the block data into it, and then perform
+ // a single write operation from that buffer.
+ // if memory is constrained, that temporary buffer
+ // might is avoided by setting this to false.
+ // in case the allocation fails, the piece flush
+ // falls back to writing each block separately.
+ bool m_coalesce_writes;
+ bool m_coalesce_reads;
+ bool m_use_read_cache;
+
// memory pool for read and write operations
// and disk cache
boost::pool<> m_pool;
-#ifndef NDEBUG
+ // number of bytes per block. The BitTorrent
+ // protocol defines the block size to 16 KiB.
int m_block_size;
+#ifndef NDEBUG
disk_io_job m_current;
#endif
diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp
index 4b8e424dc..fdeabe659 100644
--- a/src/disk_io_thread.cpp
+++ b/src/disk_io_thread.cpp
@@ -33,6 +33,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/storage.hpp"
#include
#include "libtorrent/disk_io_thread.hpp"
+#include
#ifdef _WIN32
#include
@@ -49,15 +50,13 @@ namespace libtorrent
disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
: m_abort(false)
, m_queue_buffer_size(0)
- , m_num_cached_blocks(0)
, m_cache_size(512) // 512 * 16kB = 8MB
, m_cache_expiry(60) // 1 minute
+ , m_coalesce_writes(true)
+ , m_coalesce_reads(true)
+ , m_use_read_cache(true)
, m_pool(block_size)
-#ifndef NDEBUG
, m_block_size(block_size)
-#endif
- , m_writes(0)
- , m_blocks_written(0)
, m_ios(ios)
, m_disk_io_thread(boost::ref(*this))
{
@@ -121,8 +120,8 @@ namespace libtorrent
if (ti.info_hash() != ih) continue;
cached_piece_info info;
info.piece = i->piece;
- info.last_write = i->last_write;
- int blocks_in_piece = (ti.piece_size(i->piece) + (16 * 1024) - 1) / (16 * 1024);
+ info.last_use = i->last_use;
+ int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
info.blocks.resize(blocks_in_piece);
for (int b = 0; b < blocks_in_piece; ++b)
if (i->blocks[b]) info.blocks[b] = true;
@@ -133,11 +132,7 @@ namespace libtorrent
cache_status disk_io_thread::status() const
{
mutex_t::scoped_lock l(m_mutex);
- cache_status st;
- st.blocks_written = m_blocks_written;
- st.writes = m_writes;
- st.write_size = m_num_cached_blocks;
- return st;
+ return m_cache_stats;
}
void disk_io_thread::set_cache_size(int s)
@@ -187,7 +182,7 @@ namespace libtorrent
namespace
{
// The semantic of this operator is:
- // shouls lhs come before rhs in the job queue
+ // should lhs come before rhs in the job queue
bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
{
// NOTE: comparison inverted to make higher priority
@@ -206,16 +201,17 @@ namespace libtorrent
}
std::vector::iterator disk_io_thread::find_cached_piece(
- disk_io_job const& j, mutex_t::scoped_lock& l)
+ std::vector& cache
+ , disk_io_job const& j, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
- for (std::vector::iterator i = m_pieces.begin()
- , end(m_pieces.end()); i != end; ++i)
+ for (std::vector::iterator i = cache.begin()
+ , end(cache.end()); i != end; ++i)
{
if (i->storage != j.storage || i->piece != j.piece) continue;
return i;
}
- return m_pieces.end();
+ return cache.end();
}
void disk_io_thread::flush_expired_pieces(mutex_t::scoped_lock& l)
@@ -223,26 +219,66 @@ namespace libtorrent
ptime now = time_now();
TORRENT_ASSERT(l.locked());
+ INVARIANT_CHECK;
for (;;)
{
std::vector::iterator i = std::min_element(
m_pieces.begin(), m_pieces.end()
- , bind(&cached_piece_entry::last_write, _1)
- < bind(&cached_piece_entry::last_write, _2));
+ , bind(&cached_piece_entry::last_use, _1)
+ < bind(&cached_piece_entry::last_use, _2));
if (i == m_pieces.end()) return;
- int age = total_seconds(now - i->last_write);
+ int age = total_seconds(now - i->last_use);
if (age < m_cache_expiry) return;
flush_and_remove(i, l);
}
}
+ void disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
+ {
+ TORRENT_ASSERT(l.locked());
+ int piece_size = p.storage->info()->piece_size(p.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+
+ for (int i = 0; i < blocks_in_piece; ++i)
+ {
+ if (p.blocks[i] == 0) continue;
+ free_buffer(p.blocks[i], l);
+ p.blocks[i] = 0;
+ --p.num_blocks;
+ --m_cache_stats.cache_size;
+ --m_cache_stats.read_cache_size;
+ }
+ }
+
+ bool disk_io_thread::clear_oldest_read_piece(mutex_t::scoped_lock& l)
+ {
+ INVARIANT_CHECK;
+
+ std::vector::iterator i = std::min_element(
+ m_read_pieces.begin(), m_read_pieces.end()
+ , bind(&cached_piece_entry::last_use, _1)
+ < bind(&cached_piece_entry::last_use, _2));
+ if (i != m_read_pieces.end())
+ {
+ free_piece(*i, l);
+ m_read_pieces.erase(i);
+ return true;
+ }
+ return false;
+ }
+
void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
+ INVARIANT_CHECK;
+ // first look if there are any read cache entries that can
+ // be cleared
+ if (clear_oldest_read_piece(l)) return;
+
std::vector::iterator i = std::min_element(
m_pieces.begin(), m_pieces.end()
- , bind(&cached_piece_entry::last_write, _1)
- < bind(&cached_piece_entry::last_write, _2));
+ , bind(&cached_piece_entry::last_use, _1)
+ < bind(&cached_piece_entry::last_use, _2));
if (i == m_pieces.end()) return;
flush_and_remove(i, l);
}
@@ -258,18 +294,17 @@ namespace libtorrent
, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
+ INVARIANT_CHECK;
cached_piece_entry& p = *e;
int piece_size = p.storage->info()->piece_size(p.piece);
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " flushing " << piece_size << std::endl;
#endif
TORRENT_ASSERT(piece_size > 0);
-// char* buf = (char*)alloca(piece_size);
- std::vector temp(piece_size);
- char* buf = &temp[0];
- TORRENT_ASSERT(buf != 0);
+ boost::scoped_array buf;
+ if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
- int blocks_in_piece = (piece_size + (16 * 1024) - 1) / (16 * 1024);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
int buffer_size = 0;
int offset = 0;
for (int i = 0; i <= blocks_in_piece; ++i)
@@ -277,30 +312,44 @@ namespace libtorrent
if (i == blocks_in_piece || p.blocks[i] == 0)
{
if (buffer_size == 0) continue;
+ TORRENT_ASSERT(buf);
- TORRENT_ASSERT(buffer_size <= i * 16 * 1024);
+ TORRENT_ASSERT(buffer_size <= i * m_block_size);
l.unlock();
- p.storage->write_impl(buf, p.piece, (std::min)(i * 16 * 1024, piece_size) - buffer_size, buffer_size);
+ p.storage->write_impl(buf.get(), p.piece, (std::min)(
+ i * m_block_size, piece_size) - buffer_size, buffer_size);
l.lock();
- ++m_writes;
+ ++m_cache_stats.writes;
// std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
buffer_size = 0;
offset = 0;
continue;
}
- int block_size = (std::min)(piece_size - offset, 16 * 1024);
+ int block_size = (std::min)(piece_size - i * m_block_size, m_block_size);
TORRENT_ASSERT(offset + block_size <= piece_size);
TORRENT_ASSERT(offset + block_size > 0);
- std::memcpy(buf + offset, p.blocks[i], block_size);
- offset += 16 * 1024;
+ if (!buf)
+ {
+ l.unlock();
+ p.storage->write_impl(p.blocks[i], p.piece, i * m_block_size, block_size);
+ l.lock();
+ ++m_cache_stats.writes;
+ }
+ else
+ {
+ std::memcpy(buf.get() + offset, p.blocks[i], block_size);
+ offset += m_block_size;
+ buffer_size += block_size;
+ }
free_buffer(p.blocks[i], l);
p.blocks[i] = 0;
- buffer_size += block_size;
- ++m_blocks_written;
- --m_num_cached_blocks;
+ TORRENT_ASSERT(p.num_blocks > 0);
+ --p.num_blocks;
+ ++m_cache_stats.blocks_written;
+ --m_cache_stats.cache_size;
}
TORRENT_ASSERT(buffer_size == 0);
-// std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_num_cached_blocks << std::endl;
+// std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
#ifndef NDEBUG
for (int i = 0; i < blocks_in_piece; ++i)
TORRENT_ASSERT(p.blocks[i] == 0);
@@ -310,39 +359,277 @@ namespace libtorrent
void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
- TORRENT_ASSERT(find_cached_piece(j, l) == m_pieces.end());
+ INVARIANT_CHECK;
+ TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
cached_piece_entry p;
int piece_size = j.storage->info()->piece_size(j.piece);
- int blocks_in_piece = (piece_size + (16 * 1024) - 1) / (16 * 1024);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
p.piece = j.piece;
p.storage = j.storage;
- p.last_write = time_now();
+ p.last_use = time_now();
p.num_blocks = 1;
p.blocks.reset(new char*[blocks_in_piece]);
std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
- int block = j.offset / (16 * 1024);
-// std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_num_cached_blocks << std::endl;
+ int block = j.offset / m_block_size;
+// std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
p.blocks[block] = j.buffer;
- ++m_num_cached_blocks;
+ ++m_cache_stats.cache_size;
m_pieces.push_back(p);
}
+ // fills a piece with data from disk, returns the total number of bytes
+ // read or -1 if there was an error
+ int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l)
+ {
+ TORRENT_ASSERT(l.locked());
+
+ int piece_size = p.storage->info()->piece_size(p.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+
+ int end_block = start_block;
+ for (int i = start_block; i < blocks_in_piece
+ && m_cache_stats.cache_size < m_cache_size; ++i)
+ {
+ // this is a block that is already allocated
+ // stop allocating and don't read more than
+ // what we've allocated now
+ if (p.blocks[i]) break;
+ p.blocks[i] = allocate_buffer(l);
+
+ // the allocation failed, break
+ if (p.blocks[i] == 0) break;
+ ++p.num_blocks;
+ ++m_cache_stats.cache_size;
+ ++m_cache_stats.read_cache_size;
+ ++end_block;
+ }
+
+ if (end_block == start_block) return -2;
+
+ int buffer_size = piece_size - (end_block - 1) * m_block_size + (end_block - start_block - 1) * m_block_size;
+ TORRENT_ASSERT(buffer_size <= piece_size);
+ TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);
+ boost::scoped_array buf;
+ if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
+ int ret = 0;
+ if (buf)
+ {
+ l.unlock();
+ ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
+ l.lock();
+ ++m_cache_stats.reads;
+ }
+
+ int piece_offset = start_block * m_block_size;
+ int offset = 0;
+ for (int i = start_block; i < end_block; ++i)
+ {
+ int block_size = (std::min)(piece_size - piece_offset, m_block_size);
+ if (p.blocks[i] == 0) break;
+ TORRENT_ASSERT(offset <= buffer_size);
+ TORRENT_ASSERT(piece_offset <= piece_size);
+ if (buf)
+ {
+ std::memcpy(p.blocks[i], buf.get() + offset, block_size);
+ }
+ else
+ {
+ l.unlock();
+ ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
+ l.lock();
+ ++m_cache_stats.reads;
+ }
+ offset += m_block_size;
+ piece_offset += m_block_size;
+ }
+ TORRENT_ASSERT(ret <= buffer_size);
+ return (ret != buffer_size) ? -1 : ret;
+ }
+
+ bool disk_io_thread::make_room(int num_blocks, mutex_t::scoped_lock& l)
+ {
+ TORRENT_ASSERT(l.locked());
+
+ if (m_cache_size - m_cache_stats.cache_size < num_blocks)
+ {
+ // there's not enough room in the cache, clear a piece
+ // from the read cache
+ if (!clear_oldest_read_piece(l)) return false;
+ }
+
+ return m_cache_size - m_cache_stats.cache_size >= num_blocks;
+ }
+
+ // returns -1 on read error, -2 if there isn't any space in the cache
+ // or the number of bytes read
+ int disk_io_thread::cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l)
+ {
+ TORRENT_ASSERT(l.locked());
+
+ INVARIANT_CHECK;
+
+ int piece_size = j.storage->info()->piece_size(j.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+
+ int start_block = j.offset / m_block_size;
+
+ if (!make_room(blocks_in_piece - start_block, l)) return -2;
+
+ cached_piece_entry p;
+ p.piece = j.piece;
+ p.storage = j.storage;
+ p.last_use = time_now();
+ p.num_blocks = 0;
+ p.blocks.reset(new char*[blocks_in_piece]);
+ std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
+ int ret = read_into_piece(p, start_block, l);
+
+ if (ret == -1)
+ free_piece(p, l);
+ else
+ m_read_pieces.push_back(p);
+
+ return ret;
+ }
+
+#ifndef NDEBUG
+ void disk_io_thread::check_invariant() const
+ {
+ int cached_write_blocks = 0;
+ for (std::vector::const_iterator i = m_pieces.begin()
+ , end(m_pieces.end()); i != end; ++i)
+ {
+ cached_piece_entry const& p = *i;
+ TORRENT_ASSERT(p.blocks);
+
+ int piece_size = p.storage->info()->piece_size(p.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+ int blocks = 0;
+ for (int k = 0; k < blocks_in_piece; ++k)
+ {
+ if (p.blocks[k])
+ {
+ TORRENT_ASSERT(m_pool.is_from(p.blocks[k]));
+ ++blocks;
+ }
+ }
+// TORRENT_ASSERT(blocks == p.num_blocks);
+ cached_write_blocks += blocks;
+ }
+
+ int cached_read_blocks = 0;
+ for (std::vector::const_iterator i = m_read_pieces.begin()
+ , end(m_read_pieces.end()); i != end; ++i)
+ {
+ cached_piece_entry const& p = *i;
+ TORRENT_ASSERT(p.blocks);
+
+ int piece_size = p.storage->info()->piece_size(p.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+ int blocks = 0;
+ for (int k = 0; k < blocks_in_piece; ++k)
+ {
+ if (p.blocks[k])
+ {
+ TORRENT_ASSERT(m_pool.is_from(p.blocks[k]));
+ ++blocks;
+ }
+ }
+// TORRENT_ASSERT(blocks == p.num_blocks);
+ cached_read_blocks += blocks;
+ }
+
+ TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
+ TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);
+
+ // when writing, there may be a one block difference, right before an old piece
+ // is flushed
+ TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
+ }
+#endif
+
+ int disk_io_thread::try_read_from_cache(disk_io_job const& j, mutex_t::scoped_lock& l)
+ {
+ TORRENT_ASSERT(l.locked());
+ TORRENT_ASSERT(j.buffer);
+
+ if (!m_use_read_cache) return -2;
+
+ std::vector::iterator p
+ = find_cached_piece(m_read_pieces, j, l);
+
+ bool hit = true;
+ int ret = 0;
+
+ // if the piece cannot be found in the cache,
+ // read the whole piece starting at the block
+ // we got a request for.
+ if (p == m_read_pieces.end())
+ {
+ ret = cache_read_block(j, l);
+ hit = false;
+ if (ret < 0) return ret;
+ p = m_read_pieces.end() - 1;
+ TORRENT_ASSERT(p->piece == j.piece);
+ TORRENT_ASSERT(p->storage == j.storage);
+ }
+
+ if (p != m_read_pieces.end())
+ {
+ // copy from the cache and update the last use timestamp
+ int block = j.offset / m_block_size;
+ int block_offset = j.offset % m_block_size;
+ int buffer_offset = 0;
+ int size = j.buffer_size;
+ if (p->blocks[block] == 0)
+ {
+ int piece_size = j.storage->info()->piece_size(j.piece);
+ int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+ int end_block = block;
+ while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
+ if (!make_room(end_block - block, l)) return -2;
+ ret = read_into_piece(*p, block, l);
+ hit = false;
+ if (ret == -1) return ret;
+ }
+
+ p->last_use = time_now();
+ while (size > 0)
+ {
+ TORRENT_ASSERT(p->blocks[block]);
+ int to_copy = (std::min)(m_block_size
+ - block_offset, size);
+ std::memcpy(j.buffer + buffer_offset
+ , p->blocks[block] + block_offset
+ , to_copy);
+ size -= to_copy;
+ block_offset = 0;
+ buffer_offset += to_copy;
+ }
+ ret = j.buffer_size;
+ ++m_cache_stats.blocks_read;
+ if (hit) ++m_cache_stats.blocks_read_hit;
+ }
+ return ret;
+ }
+
void disk_io_thread::add_job(disk_io_job const& j
, boost::function const& f)
{
TORRENT_ASSERT(!j.callback);
TORRENT_ASSERT(j.storage);
- TORRENT_ASSERT(j.buffer_size <= 16 * 1024);
+ TORRENT_ASSERT(j.buffer_size <= m_block_size);
mutex_t::scoped_lock l(m_mutex);
#ifndef NDEBUG
if (j.action == disk_io_job::write)
{
- std::vector::iterator p = find_cached_piece(j, l);
+ std::vector::iterator p
+ = find_cached_piece(m_pieces, j, l);
if (p != m_pieces.end())
{
- int block = j.offset / (16 * 1024);
+ int block = j.offset / m_block_size;
char const* buffer = p->blocks[block];
TORRENT_ASSERT(buffer == 0);
}
@@ -460,7 +747,7 @@ namespace libtorrent
int ret = 0;
- bool free_current_buffer = true;
+ bool allocated_buffer = false;
TORRENT_ASSERT(j.storage);
#ifdef TORRENT_DISK_STATS
ptime start = time_now();
@@ -485,10 +772,11 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
- free_current_buffer = false;
+ mutex_t::scoped_lock l(m_mutex);
if (j.buffer == 0)
{
j.buffer = allocate_buffer();
+ allocated_buffer = true;
TORRENT_ASSERT(j.buffer_size <= m_block_size);
if (j.buffer == 0)
{
@@ -497,40 +785,65 @@ namespace libtorrent
break;
}
}
- ret = j.storage->read_impl(j.buffer, j.piece, j.offset
- , j.buffer_size);
- if (ret < 0)
+
+ ret = try_read_from_cache(j, l);
+
+ // -2 means there's no space in the read cache
+ // or that the read cache is disabled
+ if (ret == -1)
{
+ if (allocated_buffer) free_buffer(j.buffer, l);
+ j.buffer = 0;
j.str = j.storage->error();
j.storage->clear_error();
+ break;
+ }
+ else if (ret == -2)
+ {
+ l.unlock();
+ ret = j.storage->read_impl(j.buffer, j.piece, j.offset
+ , j.buffer_size);
+ if (ret < 0)
+ {
+ if (allocated_buffer) free_buffer(j.buffer);
+ j.str = j.storage->error();
+ j.storage->clear_error();
+ break;
+ }
+ l.lock();
+ ++m_cache_stats.blocks_read;
}
break;
}
case disk_io_job::write:
{
- mutex_t::scoped_lock l(m_mutex);
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
- std::vector::iterator p = find_cached_piece(j, l);
- int block = j.offset / (16 * 1024);
+ mutex_t::scoped_lock l(m_mutex);
+ std::vector::iterator p
+ = find_cached_piece(m_pieces, j, l);
+ int block = j.offset / m_block_size;
TORRENT_ASSERT(j.buffer);
TORRENT_ASSERT(j.buffer_size <= m_block_size);
if (p != m_pieces.end())
{
TORRENT_ASSERT(p->blocks[block] == 0);
- if (p->blocks[block]) free_buffer(p->blocks[block]);
+ if (p->blocks[block])
+ {
+ free_buffer(p->blocks[block]);
+ --p->num_blocks;
+ }
p->blocks[block] = j.buffer;
- ++m_num_cached_blocks;
+ ++m_cache_stats.cache_size;
++p->num_blocks;
- p->last_write = time_now();
+ p->last_use = time_now();
}
else
{
cache_block(j, l);
}
- free_current_buffer = false;
- if (m_num_cached_blocks >= m_cache_size)
+ if (m_cache_stats.cache_size >= m_cache_size)
flush_oldest_piece(l);
break;
}
@@ -540,7 +853,8 @@ namespace libtorrent
m_log << log_time() << " hash" << std::endl;
#endif
mutex_t::scoped_lock l(m_mutex);
- std::vector::iterator i = find_cached_piece(j, l);
+ std::vector::iterator i
+ = find_cached_piece(m_pieces, j, l);
if (i != m_pieces.end()) flush_and_remove(i, l);
l.unlock();
sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
@@ -607,7 +921,7 @@ namespace libtorrent
for (std::vector::iterator k = i; k != m_pieces.end(); ++k)
{
torrent_info const& ti = *k->storage->info();
- int blocks_in_piece = (ti.piece_size(k->piece) + (16 * 1024) - 1) / (16 * 1024);
+ int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
for (int j = 0; j < blocks_in_piece; ++j)
{
if (k->blocks[j] == 0) continue;
@@ -655,8 +969,6 @@ namespace libtorrent
m_current.storage = 0;
m_current.callback.clear();
#endif
-
- if (j.buffer && free_current_buffer) free_buffer(j.buffer);
}
TORRENT_ASSERT(false);
}