serve cache hits without a round-trip to the disk thread for improved seeding performance

This commit is contained in:
Arvid Norberg 2011-11-16 07:09:12 +00:00
parent 4efabcefff
commit 85ef2528c6
3 changed files with 58 additions and 25 deletions

View File

@ -351,7 +351,11 @@ namespace libtorrent
int free_piece(cached_piece_entry& p, mutex::scoped_lock& l);
int drain_piece_bufs(cached_piece_entry& p, std::vector<char*>& buf
, mutex::scoped_lock& l);
int try_read_from_cache(disk_io_job const& j, bool& hit);
enum cache_flags_t {
cache_only = 1
};
int try_read_from_cache(disk_io_job const& j, bool& hit, int flags = 0);
int read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h);
int cache_piece(disk_io_job const& j, cache_piece_index_t::iterator& p
, bool& hit, int options, mutex::scoped_lock& l);

View File

@ -32,6 +32,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/disk_buffer_pool.hpp"
#include "libtorrent/assert.hpp"
#include <algorithm>
#if TORRENT_USE_MLOCK && !defined TORRENT_WINDOWS
#include <sys/mman.h>
@ -99,7 +100,7 @@ namespace libtorrent
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
char* ret = page_aligned_allocator::malloc(m_block_size);
#else
char* ret = (char*)m_pool.ordered_malloc();
char* ret = (char*)m_pool.malloc();
m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
#endif
++m_in_use;

View File

@ -1234,7 +1234,7 @@ namespace libtorrent
return j.buffer_size;
}
int disk_io_thread::try_read_from_cache(disk_io_job const& j, bool& hit)
int disk_io_thread::try_read_from_cache(disk_io_job const& j, bool& hit, int flags)
{
TORRENT_ASSERT(j.buffer);
TORRENT_ASSERT(j.cache_min_time >= 0);
@ -1254,6 +1254,7 @@ namespace libtorrent
// we got a request for.
if (p == idx.end())
{
if (flags & cache_only) return -2;
// if we use an explicit read cache and we
// couldn't find the block in the cache,
// pretend that there's not enough space
@ -1290,13 +1291,28 @@ namespace libtorrent
return m_queue_buffer_size;
}
typedef std::list<std::pair<disk_io_job, int> > job_queue_t;
void completion_queue_handler(job_queue_t* completed_jobs)
{
boost::shared_ptr<job_queue_t> holder(completed_jobs);
for (job_queue_t::iterator i = completed_jobs->begin()
, end(completed_jobs->end()); i != end; ++i)
{
TORRENT_TRY
{
i->first.callback(i->second, i->first);
}
TORRENT_CATCH(std::exception& e)
{}
}
}
int disk_io_thread::add_job(disk_io_job const& j
, mutex::scoped_lock& l
, boost::function<void(int, disk_io_job const&)> const& f)
{
m_jobs.push_back(j);
m_jobs.back().callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
m_jobs.back().start_time = time_now_hires();
const_cast<disk_io_job&>(j).start_time = time_now_hires();
if (j.action == disk_io_job::write)
{
@ -1305,6 +1321,36 @@ namespace libtorrent
&& m_settings.max_queued_disk_bytes > 0)
m_exceeded_write_queue = true;
}
else if (j.action == disk_io_job::read)
{
// if this is a cache hit, return it right away!
// this is OK because the cache is actually protected by
// the m_piece_mutex
bool hit = false;
if (j.buffer == 0)
{
// this is OK because the disk_buffer pool has its
// own mutex to protect the pool allocator
const_cast<disk_io_job&>(j).buffer = allocate_buffer("send buffer");
}
int ret = try_read_from_cache(j, hit, cache_only);
if (hit && ret >= 0)
{
TORRENT_ASSERT(f);
const_cast<disk_io_job&>(j).callback.swap(
const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
job_queue_t* q = new job_queue_t;
q->push_back(std::make_pair(j, ret));
m_ios.post(boost::bind(completion_queue_handler, q));
return m_queue_buffer_size;
}
free_buffer(j.buffer);
const_cast<disk_io_job&>(j).buffer = 0;
}
m_jobs.push_back(j);
m_jobs.back().callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
m_signal.signal(l);
return m_queue_buffer_size;
}
@ -1340,23 +1386,6 @@ namespace libtorrent
return false;
}
typedef std::list<std::pair<disk_io_job, int> > job_queue_t;
void completion_queue_handler(job_queue_t* completed_jobs)
{
boost::shared_ptr<job_queue_t> holder(completed_jobs);
for (job_queue_t::iterator i = completed_jobs->begin()
, end(completed_jobs->end()); i != end; ++i)
{
TORRENT_TRY
{
i->first.callback(i->second, i->first);
}
TORRENT_CATCH(std::exception& e)
{}
}
}
void disk_io_thread::post_callback(disk_io_job const& j, int ret)
{
if (!j.callback) return;
@ -1937,8 +1966,7 @@ namespace libtorrent
m_log << log_time();
#endif
INVARIANT_CHECK;
TORRENT_ASSERT(j.buffer == 0);
j.buffer = allocate_buffer("send buffer");
if (j.buffer == 0) j.buffer = allocate_buffer("send buffer");
TORRENT_ASSERT(j.buffer_size <= m_block_size);
if (j.buffer == 0)
{