improved disk read performance and fixed a bug in storage_interface's backwards compatibility functions

Arvid Norberg 2009-09-05 07:21:10 +00:00
parent c7b1d7e7d6
commit bec481acdf
14 changed files with 484 additions and 24 deletions

View File

@ -244,6 +244,9 @@ feature.compose <disk-stats>on : <define>TORRENT_DISK_STATS ;
feature memdebug : off on : composite propagated ;
feature.compose <memdebug>on : <define>TORRENT_MEMDEBUG ;
feature simulate-slow-read : off on : composite propagated ;
feature.compose <simulate-slow-read>on : <define>TORRENT_SIMULATE_SLOW_READ ;
feature logging : none default errors verbose : composite propagated link-incompatible ;
feature.compose <logging>default : <define>TORRENT_LOGGING ;
feature.compose <logging>errors : <define>TORRENT_ERROR_LOGGING ;

Binary file not shown (new image added, 5.6 KiB)

Binary file not shown (new image added, 5.2 KiB)

View File

@ -174,6 +174,34 @@ The graph to the right shows the same download but with the new optimized disk c
algorithm. It clearly shows an increased utilization, which means higher read hit rates
or smaller caches with maintained hit rate.
high performance disk subsystem
-------------------------------
In some circumstances, the disk cache may not suffice to provide maximum performance.
One such example is high performance seeding to a large number of peers over a fast
up-link. In such a case, the amount of RAM may simply not be enough to cache disk
reads. When that happens, disk throughput typically degrades to the same poor level
as with no cache at all, with the majority of the time spent waiting for the disk
head to seek.
To solve this problem, libtorrent sorts read requests by their physical offset on the
disk, and services them by having the disk read head sweep back and forth over the
drive, elevator style. This makes libtorrent very suitable for large scale,
high-throughput seeding.
.. image:: disk_access_no_elevator.png
:width: 49%
.. image:: disk_access_elevator.png
:width: 49%
These plots illustrate the physical disk offset of reads over time. The left plot
is from a run where disk operation re-ordering is turned off and the right one is
from a run where it's turned on. The right plot has a relatively smooth sine wave
shape whereas the left one is more random and involves much longer seeks back and
forth over the disk.
True physical disk offset queries are only supported on newer Linux kernels and Mac OS X.
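To make the elevator sweep described above concrete, here is a small self-contained C++ sketch of the idea. It is a simplified stand-in, not code from this patch: the plain integer offsets stand in for physical disk offsets, and the real selection loop in disk_io_thread.cpp additionally has to skip fence jobs and prioritize non-read jobs.

// Simplified elevator (SCAN) ordering: among the pending offsets, pick the
// one closest to the read head in the current sweep direction, and reverse
// the sweep when nothing is left ahead of the head.
#include <cstdint>
#include <list>
#include <iostream>

int64_t pick_next(std::list<int64_t>& pending, int64_t& head, int& direction)
{
    // precondition: pending is not empty
    for (;;)
    {
        std::list<int64_t>::iterator best = pending.end();
        int64_t best_distance = 0;
        for (std::list<int64_t>::iterator i = pending.begin(); i != pending.end(); ++i)
        {
            // only consider offsets that lie in the current sweep direction
            if (direction > 0 ? *i < head : *i > head) continue;
            int64_t distance = direction > 0 ? *i - head : head - *i;
            if (best == pending.end() || distance < best_distance)
            {
                best = i;
                best_distance = distance;
            }
        }
        if (best != pending.end())
        {
            head = *best; // move the simulated read head
            pending.erase(best);
            return head;
        }
        // nothing ahead of the head in this direction: reverse the sweep
        direction = -direction;
    }
}

int main()
{
    std::list<int64_t> jobs;
    jobs.push_back(700); jobs.push_back(100); jobs.push_back(400); jobs.push_back(900);
    int64_t head = 300;
    int direction = 1;
    while (!jobs.empty())
        std::cout << pick_next(jobs, head, direction) << "\n"; // prints 400 700 900 100
}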
network buffers
---------------

View File

@ -72,6 +72,7 @@ namespace libtorrent
, buffer_size(0)
, piece(0)
, offset(0)
, phys_offset(-1)
, priority(0)
{}
@ -101,6 +102,7 @@ namespace libtorrent
boost::intrusive_ptr<piece_manager> storage;
// arguments used for read and write
int piece, offset;
size_type phys_offset;
// used for move_storage and rename_file. On errors, this is set
// to the error message
std::string str;
@ -124,6 +126,20 @@ namespace libtorrent
boost::function<void(int, disk_io_job const&)> callback;
};
// returns true if the disk job requires ordering
// some jobs may not be processed until all jobs
// ahead of them in the queue have been processed;
// jobs that require this are fence operations
bool is_fence_operation(disk_io_job const& j);
// returns true if the fundamental operation
// of the given disk job is a read operation
bool is_read_operation(disk_io_job const& j);
// this is true if the buffer field in the disk_io_job
// points to a disk buffer
bool operation_has_buffer(disk_io_job const& j);
struct cache_status
{
cache_status()

View File

@ -157,6 +157,8 @@ namespace libtorrent
// belongs to a data-region
size_type sparse_end(size_type start) const;
size_type phys_offset(size_type offset);
private:
#ifdef TORRENT_WINDOWS

View File

@ -178,6 +178,7 @@ namespace libtorrent
, write_cache_line_size(32)
, optimistic_disk_retry(10 * 60)
, disable_hash_checks(false)
, allow_reordered_disk_operations(true)
, allow_i2p_mixed(false)
, max_suggest_pieces(10)
{}
@ -626,6 +627,16 @@ namespace libtorrent
// disabled_storage)
bool disable_hash_checks;
// if this is true, disk read operations may
// be re-ordered based on their physical disk
// read offset. This greatly improves throughput
// when uploading to many peers. This assumes
// a traditional hard drive with a read head
// and spinning platters. If your storage medium
// is a solid state drive, this optimization
// doesn't give you any benefit
bool allow_reordered_disk_operations;
// if this is true, i2p torrents are allowed
// to also get peers from other sources than
// the tracker, and connect to regular IPs,
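As a usage note, the new flag is toggled like any other session_settings field. A minimal sketch, assuming the 0.15-era session::settings() / session::set_settings() API:

#include <libtorrent/session.hpp>
#include <libtorrent/session_settings.hpp>

int main()
{
    libtorrent::session ses;
    libtorrent::session_settings sett = ses.settings();
    // turn the elevator off, e.g. when seeding from an SSD where
    // physical-offset ordering provides no benefit
    sett.allow_reordered_disk_operations = false;
    ses.set_settings(sett);
}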

View File

@ -131,6 +131,8 @@ namespace libtorrent
// negative return value indicates an error
virtual int write(const char* buf, int slot, int offset, int size) = 0;
virtual size_type physical_offset(int slot, int offset) = 0;
// returns the end of the sparse region the slot 'start'
// resides in i.e. the next slot with content. If start
// is not in a sparse region, start itself is returned
@ -342,6 +344,8 @@ namespace libtorrent
, int offset
, int num_bufs);
size_type physical_offset(int piece_index, int offset);
// returns the number of pieces left in the
// file currently being checked
int skip_file() const;
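Since physical_offset() is added as a pure virtual on storage_interface, existing custom storage implementations have to be updated. A minimal sketch of such an override for a storage with no meaningful physical layout, mirroring the fallback the default storage uses (the piece_size value here is illustrative, not part of the interface):

// part of a hypothetical custom storage_interface subclass; when there is
// no real physical mapping, returning the logical offset keeps the elevator
// ordering well-defined (this is also what the default storage falls back
// to when FIEMAP / F_LOG2PHYS is unavailable)
size_type physical_offset(int slot, int offset)
{
    int const piece_size = 16 * 1024; // illustrative piece size
    return size_type(slot) * piece_size + offset;
}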

View File

@ -11,9 +11,9 @@ lines = open(sys.argv[1], 'rb').readlines()
keys = ['read', 'write', 'head movement', 'seek per read byte', 'seek per written byte']
colors = ['70e070', 'e07070', '3030f0', '10a010', 'a01010']
style = ['points pointtype 1', 'points pointtype 2', 'lines', 'lines', 'lines']
style = ['linespoints', 'points pointtype 2', 'lines', 'lines', 'lines']
axis = ['x1y1', 'x1y1', 'x1y2', 'x1y2', 'x1y2']
plot = [True, True, False, True, True]
plot = [True, False, False, False, False]
out = open('disk_access_log.dat', 'w+')
@ -65,16 +65,16 @@ for l in lines:
out.close()
out = open('disk_access.gnuplot', 'wb')
print >>out, "set term png size 1200,700"
print >>out, "set term png size 600,300"
print >>out, 'set output "disk_access.png"'
print >>out, 'set xrange [0:*]'
print >>out, 'set y2range [0:*]'
print >>out, 'set xrange [*:*]'
#print >>out, 'set y2range [0:*]'
print >>out, 'set xlabel "time (ms)"'
print >>out, 'set ylabel "file position"'
print >>out, 'set y2label "bytes / %d second(s)"' % (time / 1000)
#print >>out, 'set y2label "bytes / %d second(s)"' % (time / 1000)
print >>out, "set key box"
print >>out, "set tics nomirror"
print >>out, "set y2tics 100"
#print >>out, "set y2tics 100"
print >>out, 'plot',
count = 1
for k in keys:

View File

@ -30,6 +30,10 @@ POSSIBILITY OF SUCH DAMAGE.
*/
/*
Disk queue elevator patch by Morten Husveit
*/
#include "libtorrent/storage.hpp"
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
@ -1292,8 +1296,55 @@ namespace libtorrent
m_ios.post(bind(handler, ret, j));
}
enum action_flags_t
{
read_operation = 1
, fence_operation = 2
, buffer_operation = 4
};
static const uint8_t action_flags[] =
{
read_operation + buffer_operation // read
, buffer_operation // write
, 0 // hash
, fence_operation // move_storage
, fence_operation // release_files
, fence_operation // delete_files
, fence_operation // check_fastresume
, read_operation // check_files
, fence_operation // save_resume_data
, fence_operation // rename_file
, fence_operation // abort_thread
, fence_operation // clear_read_cache
, fence_operation // abort_torrent
, 0 // update_settings
, read_operation // read_and_hash
};
bool is_fence_operation(disk_io_job const& j)
{
TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
return action_flags[j.action] & fence_operation;
}
bool is_read_operation(disk_io_job const& j)
{
TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
return action_flags[j.action] & read_operation;
}
bool operation_has_buffer(disk_io_job const& j)
{
TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
return action_flags[j.action] & buffer_operation;
}
void disk_io_thread::operator()()
{
size_type elevator_position = 0;
int elevator_direction = 1;
for (;;)
{
#ifdef TORRENT_DISK_STATS
@ -1330,19 +1381,88 @@ namespace libtorrent
return;
}
std::list<disk_io_job>::iterator selected_job = m_jobs.begin();
if (m_settings.allow_reordered_disk_operations
&& is_read_operation(*selected_job))
{
// Before reading the current block, read any
// blocks between the read head and the queued
// block, elevator style
std::list<disk_io_job>::iterator best_job, i;
size_type score, best_score = (size_type) -1;
for (;;)
{
for (i = m_jobs.begin(); i != m_jobs.end(); ++i)
{
// ignore fence_operations
if (is_fence_operation(*i))
continue;
// always prioritize all disk-I/O jobs
// that are not read operations
if (!is_read_operation(*i))
{
best_job = i;
best_score = 0;
break;
}
// we only need to query for physical offset
// for read operations, since those are
// the only ones we re-order
if (i->phys_offset == -1)
i->phys_offset = i->storage->physical_offset(i->piece, i->offset);
if (elevator_direction > 0)
{
score = i->phys_offset - elevator_position;
if (i->phys_offset >= elevator_position
&& (score < best_score
|| best_score == (size_type)-1))
{
best_score = score;
best_job = i;
}
}
else
{
score = elevator_position - i->phys_offset;
if (i->phys_offset <= elevator_position
&& (score < best_score
|| best_score == (size_type)-1))
{
best_score = score;
best_job = i;
}
}
}
if (best_score != (size_type) -1)
break;
elevator_direction = -elevator_direction;
}
selected_job = best_job;
// only update the elevator position for read jobs
if (is_read_operation(*selected_job))
elevator_position = selected_job->phys_offset;
}
// if there's a buffer in this job, it will be freed
// when this holder is destructed, unless it has been
// released.
disk_buffer_holder holder(*this
, m_jobs.front().action != disk_io_job::check_fastresume
&& m_jobs.front().action != disk_io_job::update_settings
? m_jobs.front().buffer : 0);
, operation_has_buffer(*selected_job) ? selected_job->buffer : 0);
boost::function<void(int, disk_io_job const&)> handler;
handler.swap(m_jobs.front().callback);
handler.swap(selected_job->callback);
disk_io_job j = m_jobs.front();
m_jobs.pop_front();
disk_io_job j = *selected_job;
m_jobs.erase(selected_job);
if (j.action == disk_io_job::write)
{
TORRENT_ASSERT(m_queue_buffer_size >= j.buffer_size);

View File

@ -30,6 +30,10 @@ POSSIBILITY OF SUCH DAMAGE.
*/
/*
Physical file offset patch by Morten Husveit
*/
#include "libtorrent/pch.hpp"
#include "libtorrent/config.hpp"
#include "libtorrent/alloca.hpp"
@ -51,6 +55,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include <sys/types.h>
#include <sys/statvfs.h>
#include <errno.h>
#ifdef HAVE_FIEMAP
#include <sys/ioctl.h>
#include <linux/fs.h> // for FS_IOC_FIEMAP
#include <linux/fiemap.h>
#endif
#include <fcntl.h> // for F_LOG2PHYS
#include <boost/static_assert.hpp>
// make sure the _FILE_OFFSET_BITS define worked
@ -760,6 +770,48 @@ namespace libtorrent
#endif // TORRENT_WINDOWS
}
size_type file::phys_offset(size_type offset)
{
#ifdef HAVE_FIEMAP
// for documentation of this feature
// http://lwn.net/Articles/297696/
struct
{
struct fiemap fiemap;
struct fiemap_extent extent;
} fm;
memset(&fm, 0, sizeof(fm));
fm.fiemap.fm_start = offset;
fm.fiemap.fm_length = size_alignment();
// this sounds expensive
fm.fiemap.fm_flags = FIEMAP_FLAG_SYNC;
fm.fiemap.fm_extent_count = 1;
if (ioctl(m_fd, FS_IOC_FIEMAP, &fm) == -1)
return 0;
if (fm.fiemap.fm_extents[0].fe_flags & FIEMAP_EXTENT_UNKNOWN)
return 0;
// the returned extent is not guaranteed to start
// at the requested offset, adjust for that in
// case they differ
return fm.fiemap.fm_extents[0].fe_physical + (offset - fm.fiemap.fm_extents[0].fe_logical);
#elif defined F_LOG2PHYS
// for documentation of this feature
// http://developer.apple.com/mac/library/documentation/Darwin/Reference/ManPages/man2/fcntl.2.html
log2phys l;
size_type ret = lseek(m_fd, offset, SEEK_SET);
if (ret < 0) return 0;
if (fcntl(m_fd, F_LOG2PHYS, &l) != -1)
return l.l2p_devoffset;
#endif
return 0;
}
bool file::set_size(size_type s, error_code& ec)
{
TORRENT_ASSERT(is_open());

View File

@ -294,7 +294,7 @@ namespace libtorrent
int ret = 0;
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i < end; ++i)
{
int r = write((char const*)i->iov_base, slot, offset, i->iov_len);
int r = read((char*)i->iov_base, slot, offset, i->iov_len);
offset += i->iov_len;
if (r == -1) return -1;
ret += r;
@ -308,7 +308,7 @@ namespace libtorrent
int ret = 0;
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i < end; ++i)
{
int r = read((char*)i->iov_base, slot, offset, i->iov_len);
int r = write((char const*)i->iov_base, slot, offset, i->iov_len);
offset += i->iov_len;
if (r == -1) return -1;
ret += r;
@ -414,6 +414,7 @@ namespace libtorrent
int sparse_end(int start) const;
int readv(file::iovec_t const* bufs, int slot, int offset, int num_bufs);
int writev(file::iovec_t const* buf, int slot, int offset, int num_bufs);
size_type physical_offset(int slot, int offset);
bool move_slot(int src_slot, int dst_slot);
bool swap_slots(int slot1, int slot2);
bool swap_slots3(int slot1, int slot2, int slot3);
@ -1181,7 +1182,7 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " write "
<< (size_type(slot) * m_files.piece_length() + offset) << std::endl;
<< physical_offset(slot, offset) << std::endl;
}
#endif
fileop op = { &file::writev, &storage::write_unaligned
@ -1191,14 +1192,48 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " write_end "
<< (size_type(slot) * m_files.piece_length() + offset + ret) << std::endl;
<< (physical_offset(slot, offset) + ret) << std::endl;
}
return ret;
#else
return readwritev(bufs, slot, offset, num_bufs, op);
return readwritev(bufs, slot, offset, num_bufs, op);
#endif
}
size_type storage::physical_offset(int slot, int offset)
{
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(offset >= 0);
// find the file and the offset within that file
size_type tor_off = size_type(slot)
* files().piece_length() + offset;
file_storage::iterator file_iter = files().file_at_offset(tor_off);
size_type file_offset = tor_off - file_iter->offset;
TORRENT_ASSERT(file_offset >= 0);
fs::path p(m_save_path / file_iter->path);
error_code ec;
// open the file read only to avoid re-opening
// it in case it's already opened in read-only mode
boost::shared_ptr<file> f = m_pool.open_file(
this, p, file::read_only, ec);
size_type ret = 0;
if (f && !ec) ret = f->phys_offset(file_offset);
if (ret == 0)
{
// this means we don't support true physical offset
// just make something up
return size_type(slot) * files().piece_length() + offset;
}
return ret;
}
int storage::readv(file::iovec_t const* bufs, int slot, int offset
, int num_bufs)
{
@ -1207,17 +1242,21 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " read "
<< (size_type(slot) * m_files.piece_length() + offset) << std::endl;
<< physical_offset(slot, offset) << std::endl;
}
#endif
fileop op = { &file::readv, &storage::read_unaligned
, m_settings ? settings().disk_io_read_mode : 0, file::read_only };
#ifdef TORRENT_SIMULATE_SLOW_READ
boost::thread::sleep(boost::get_system_time()
+ boost::posix_time::milliseconds(1000));
#endif
#ifdef TORRENT_DISK_STATS
int ret = readwritev(bufs, slot, offset, num_bufs, op);
if (pool)
{
pool->m_disk_access_log << log_time() << " read_end "
<< (size_type(slot) * m_files.piece_length() + offset + ret) << std::endl;
<< (physical_offset(slot, offset) + ret) << std::endl;
}
return ret;
#else
@ -1467,6 +1506,7 @@ ret:
bool move_storage(fs::path save_path) { return true; }
int read(char* buf, int slot, int offset, int size) { return size; }
int write(char const* buf, int slot, int offset, int size) { return size; }
size_type physical_offset(int slot, int offset) { return 0; }
int readv(file::iovec_t const* bufs, int slot, int offset, int num_bufs)
{
#ifdef TORRENT_DISK_STATS
@ -1474,7 +1514,7 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " read "
<< (size_type(slot) * m_piece_size + offset) << std::endl;
<< physical_offset(slot, offset) << std::endl;
}
#endif
int ret = 0;
@ -1484,7 +1524,7 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " read_end "
<< (size_type(slot) * m_piece_size + offset + ret) << std::endl;
<< (physical_offset(slot, offset) + ret) << std::endl;
}
#endif
return ret;
@ -1496,7 +1536,7 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " write "
<< (size_type(slot) * m_piece_size + offset) << std::endl;
<< physical_offset(slot, offset) << std::endl;
}
#endif
int ret = 0;
@ -1506,7 +1546,7 @@ ret:
if (pool)
{
pool->m_disk_access_log << log_time() << " write_end "
<< (size_type(slot) * m_piece_size + offset + ret) << std::endl;
<< (physical_offset(slot, offset) + ret) << std::endl;
}
#endif
return ret;
@ -1921,6 +1961,20 @@ ret:
return ret;
}
size_type piece_manager::physical_offset(
int piece_index
, int offset)
{
TORRENT_ASSERT(offset >= 0);
TORRENT_ASSERT(piece_index >= 0 && piece_index < m_files.num_pieces());
int slot = slot_for(piece_index);
// we may not have a slot for this piece yet.
// assume there is no re-mapping of slots
if (slot < 0) slot = piece_index;
return m_storage->physical_offset(slot, offset);
}
int piece_manager::identify_data(
sha1_hash const& large_hash
, sha1_hash const& small_hash

View File

@ -121,6 +121,170 @@ void print_error(int ret, boost::scoped_ptr<storage_interface> const& s)
<< std::endl;
}
int bufs_size(file::iovec_t const* bufs, int num_bufs);
// simulates a very slow read for the first job of each test run
// (slot 0 going up, slot 5999 going down)
struct test_storage : storage_interface
{
test_storage() {}
virtual bool initialize(bool allocate_files) { return true; }
virtual bool has_any_file() { return true; }
int write(
const char* buf
, int slot
, int offset
, int size)
{
return size;
}
int read(
char* buf
, int slot
, int offset
, int size)
{
if (slot == 0 || slot == 5999)
{
boost::thread::sleep(boost::get_system_time()
+ boost::posix_time::seconds(2));
std::cerr << "--- starting ---\n" << std::endl;
}
return size;
}
size_type physical_offset(int slot, int offset)
{ return slot * 16 * 1024 + offset; }
virtual int sparse_end(int start) const
{ return start; }
virtual bool move_storage(fs::path save_path)
{ return false; }
virtual bool verify_resume_data(lazy_entry const& rd, error_code& error)
{ return false; }
virtual bool write_resume_data(entry& rd) const
{ return false; }
virtual bool move_slot(int src_slot, int dst_slot)
{ return false; }
virtual bool swap_slots(int slot1, int slot2)
{ return false; }
virtual bool swap_slots3(int slot1, int slot2, int slot3)
{ return false; }
virtual bool release_files() { return false; }
virtual bool rename_file(int index, std::string const& new_filename)
{ return false; }
virtual bool delete_files() { return false; }
virtual ~test_storage() {}
};
storage_interface* create_test_storage(file_storage const& fs
, file_storage const* mapped, fs::path const& path, file_pool& fp)
{
return new test_storage;
}
void nop() {}
int job_counter = 0;
void callback_up(int ret, disk_io_job const& j)
{
static int last_job = 0;
TEST_CHECK(last_job <= j.piece);
last_job = j.piece;
std::cerr << "completed job #" << j.piece << std::endl;
--job_counter;
}
void callback_down(int ret, disk_io_job const& j)
{
static int last_job = 6000;
TEST_CHECK(last_job >= j.piece);
last_job = j.piece;
std::cerr << "completed job #" << j.piece << std::endl;
--job_counter;
}
void add_job_up(disk_io_thread& dio, int piece, boost::intrusive_ptr<piece_manager>& pm)
{
disk_io_job j;
j.action = disk_io_job::read;
j.storage = pm;
j.piece = piece;
++job_counter;
dio.add_job(j, boost::bind(&callback_up, _1, _2));
}
void add_job_down(disk_io_thread& dio, int piece, boost::intrusive_ptr<piece_manager>& pm)
{
disk_io_job j;
j.action = disk_io_job::read;
j.storage = pm;
j.piece = piece;
++job_counter;
dio.add_job(j, boost::bind(&callback_down, _1, _2));
}
void run_elevator_test()
{
io_service ios;
file_pool fp;
boost::intrusive_ptr<torrent_info> ti = ::create_torrent(0, 16, 6000);
{
disk_io_thread dio(ios, &nop);
boost::intrusive_ptr<piece_manager> pm(new piece_manager(boost::shared_ptr<void>(), ti, ""
, fp, dio, &create_test_storage, storage_mode_sparse));
// test the elevator going up
add_job_up(dio, 0, pm);
uint32_t p = 1234513;
for (int i = 0; i < 100; ++i)
{
p *= 123;
int job = (p % 5999) + 1;
std::cerr << "starting job #" << job << std::endl;
add_job_up(dio, job, pm);
}
for (int i = 0; i < 101; ++i)
ios.run_one();
TEST_CHECK(job_counter == 0);
// test the elevator going down
add_job_down(dio, 5999, pm);
for (int i = 0; i < 100; ++i)
{
p *= 123;
int job = (p % 5999) + 1;
std::cerr << "starting job #" << job << std::endl;
add_job_down(dio, job, pm);
}
for (int i = 0; i < 101; ++i)
ios.run_one();
TEST_CHECK(job_counter == 0);
dio.join();
}
}
void run_storage_tests(boost::intrusive_ptr<torrent_info> info
, file_storage& fs
, path const& test_path
@ -657,6 +821,9 @@ void test_rename_file_in_fastresume(path const& test_path)
int test_main()
{
run_elevator_test();
// initialize test pieces
for (char* p = piece0, *end(piece0 + piece_size); p < end; ++p)
*p = rand();

View File

@ -147,6 +147,9 @@ struct test_storage : storage_interface
return ret;
}
virtual size_type physical_offset(int piece_index, int offset)
{ return m_lower_layer->physical_offset(piece_index, offset); }
virtual int read(char* buf, int slot, int offset, int size)
{ return m_lower_layer->read(buf, slot, offset, size); }