factor out readwritev to a free function, to allow unit testing of it

This commit is contained in:
arvidn 2015-11-15 22:36:55 -05:00
parent 12b5404970
commit 964031c070
4 changed files with 263 additions and 186 deletions

View File

@ -398,6 +398,8 @@ namespace libtorrent
// override some of its behavior, when implementing a custom storage. // override some of its behavior, when implementing a custom storage.
class TORRENT_EXPORT default_storage : public storage_interface, boost::noncopyable class TORRENT_EXPORT default_storage : public storage_interface, boost::noncopyable
{ {
friend struct write_fileop;
friend struct read_fileop;
public: public:
// constructs the default_storage based on the give file_storage (fs). // constructs the default_storage based on the give file_storage (fs).
// ``mapped`` is an optional argument (it may be NULL). If non-NULL it // ``mapped`` is an optional argument (it may be NULL). If non-NULL it
@ -452,23 +454,7 @@ namespace libtorrent
int sparse_end(int start) const; int sparse_end(int start) const;
// this identifies a read or write operation
// so that default_storage::readwritev() knows what to
// do when it's actually touching the file
struct fileop
{
// file operation
boost::int64_t (file::*op)(boost::int64_t file_offset
, file::iovec_t const* bufs, int num_bufs, error_code& ec, int flags);
// file open mode (file::read_only, file::write_only etc.)
// this is used to open the file, but also passed along as the
// flags argument to the file operation (readv or writev)
int mode;
};
void delete_one_file(std::string const& p, error_code& ec); void delete_one_file(std::string const& p, error_code& ec);
int readwritev(file::iovec_t const* bufs, int piece, int offset
, int num_bufs, fileop const& op, storage_error& ec);
void need_partfile(); void need_partfile();
@ -501,7 +487,7 @@ namespace libtorrent
// whose bit is 0, we set the file size, to make the file allocated // whose bit is 0, we set the file size, to make the file allocated
// on disk (in full allocation mode) and just sparsely allocated in // on disk (in full allocation mode) and just sparsely allocated in
// case of sparse allocation mode // case of sparse allocation mode
bitfield m_file_created; mutable bitfield m_file_created;
bool m_allocate_files; bool m_allocate_files;
}; };
@ -716,6 +702,21 @@ namespace libtorrent
boost::shared_ptr<void> m_torrent; boost::shared_ptr<void> m_torrent;
}; };
// this identifies a read or write operation so that readwritev() knows
// what to do when it's actually touching the file
struct fileop
{
virtual int file_op(int file_index, boost::int64_t file_offset, int size
, file::iovec_t const* bufs, storage_error& ec) = 0;
};
// this function is responsible for turning read and write operations in the
// torrent space (pieces) into read and write operations in the filesystem
// space (files on disk).
TORRENT_EXTRA_EXPORT int readwritev(file_storage const& files
, file::iovec_t const* bufs, int piece, int offset, int num_bufs
, fileop& op, storage_error& ec);
} }
#endif // TORRENT_STORAGE_HPP_INCLUDED #endif // TORRENT_STORAGE_HPP_INCLUDED

View File

@ -263,8 +263,7 @@ TORRENT_EXPORT void assert_fail(char const* expr, int line
char const* message = "assertion failed. Please file a bugreport at " char const* message = "assertion failed. Please file a bugreport at "
"https://github.com/arvidn/libtorrent/issues\n" "https://github.com/arvidn/libtorrent/issues\n"
"Please include the following information:\n\n" "Please include the following information:\n\n"
"version: " LIBTORRENT_VERSION "\n" "version: " LIBTORRENT_VERSION "-" LIBTORRENT_REVISION "\n";
LIBTORRENT_REVISION "\n";
switch (kind) switch (kind)
{ {

View File

@ -166,7 +166,6 @@ namespace libtorrent
namespace { namespace {
#if TORRENT_USE_ASSERTS
int count_bufs(file::iovec_t const* bufs, int bytes) int count_bufs(file::iovec_t const* bufs, int bytes)
{ {
int size = 0; int size = 0;
@ -175,11 +174,9 @@ namespace libtorrent
for (file::iovec_t const* i = bufs;; ++i, ++count) for (file::iovec_t const* i = bufs;; ++i, ++count)
{ {
size += i->iov_len; size += i->iov_len;
TORRENT_ASSERT(size <= bytes);
if (size >= bytes) return count; if (size >= bytes) return count;
} }
} }
#endif
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
static boost::atomic<int> event_id; static boost::atomic<int> event_id;
@ -223,8 +220,185 @@ namespace libtorrent
} }
} }
#endif #endif
} // anonymous namespace } // anonymous namespace
struct write_fileop : fileop
{
write_fileop(default_storage& st, int flags)
: m_storage(st)
, m_flags(flags)
{}
int file_op(int file_index, boost::int64_t file_offset, int size
, file::iovec_t const* bufs, storage_error& ec)
TORRENT_OVERRIDE TORRENT_FINAL
{
if (m_storage.files().pad_file_at(file_index))
{
// writing to a pad-file is a no-op
return size;
}
int num_bufs = count_bufs(bufs, size);
if (file_index < int(m_storage.m_file_priority.size())
&& m_storage.m_file_priority[file_index] == 0)
{
m_storage.need_partfile();
error_code e;
peer_request map = m_storage.files().map_file(file_index
, file_offset, 0);
int ret = m_storage.m_part_file->writev(bufs, num_bufs
, map.piece, map.start, e);
if (e)
{
ec.ec = e;
ec.file = file_index;
ec.operation = storage_error::partfile_write;
return -1;
}
return ret;
}
// invalidate our stat cache for this file, since
// we're writing to it
m_storage.m_stat_cache.set_dirty(file_index);
file_handle handle = m_storage.open_file(file_index
, file::read_write, ec);
if (ec) return -1;
// please ignore the adjusted_offset. It's just file_offset.
boost::int64_t adjusted_offset =
#ifndef TORRENT_NO_DEPRECATE
m_storage.files().file_base_deprecated(file_index) +
#endif
file_offset;
#ifdef TORRENT_DISK_STATS
write_access_log(adjusted_offset, handle->file_id(), op_start | op_write, clock_type::now());
#endif
error_code e;
int ret = handle->writev(adjusted_offset
, bufs, num_bufs, e, m_flags);
// set this unconditionally in case the upper layer would like to treat
// short reads as errors
ec.operation = storage_error::write;
// we either get an error or 0 or more bytes read
TORRENT_ASSERT(e || ret >= 0);
#ifdef TORRENT_DISK_STATS
write_access_log(adjusted_offset + ret , handle->file_id(), op_end | op_write, clock_type::now());
#endif
TORRENT_ASSERT(ret <= bufs_size(bufs, num_bufs));
if (e)
{
ec.ec = e;
ec.file = file_index;
return -1;
}
return ret;
}
private:
default_storage& m_storage;
int m_flags;
};
struct read_fileop : fileop
{
read_fileop(default_storage& st, int flags)
: m_storage(st)
, m_flags(flags)
{}
int file_op(int file_index, boost::int64_t file_offset, int size
, file::iovec_t const* bufs, storage_error& ec)
TORRENT_OVERRIDE TORRENT_FINAL
{
int num_bufs = count_bufs(bufs, size);
if (m_storage.files().pad_file_at(file_index))
{
// reading from a pad file yields zeroes
clear_bufs(bufs, num_bufs);
return size;
}
if (file_index < int(m_storage.m_file_priority.size())
&& m_storage.m_file_priority[file_index] == 0)
{
m_storage.need_partfile();
error_code e;
peer_request map = m_storage.files().map_file(file_index
, file_offset, 0);
int ret = m_storage.m_part_file->readv(bufs, num_bufs
, map.piece, map.start, e);
if (e)
{
ec.ec = e;
ec.file = file_index;
ec.operation = storage_error::partfile_read;
return -1;
}
return ret;
}
file_handle handle = m_storage.open_file(file_index
, file::read_only | m_flags, ec);
if (ec) return -1;
// please ignore the adjusted_offset. It's just file_offset.
boost::int64_t adjusted_offset =
#ifndef TORRENT_NO_DEPRECATE
m_storage.files().file_base_deprecated(file_index) +
#endif
file_offset;
#ifdef TORRENT_DISK_STATS
write_access_log(adjusted_offset, handle->file_id(), op_start | op_read, clock_type::now());
#endif
error_code e;
int ret = handle->readv(adjusted_offset
, bufs, num_bufs, e, m_flags);
// set this unconditionally in case the upper layer would like to treat
// short reads as errors
ec.operation = storage_error::read;
// we either get an error or 0 or more bytes read
TORRENT_ASSERT(e || ret >= 0);
#ifdef TORRENT_DISK_STATS
write_access_log(adjusted_offset + ret , handle->file_id(), op_end | op_read, clock_type::now());
#endif
TORRENT_ASSERT(ret <= bufs_size(bufs, num_bufs));
if (e)
{
ec.ec = e;
ec.file = file_index;
return -1;
}
return ret;
}
private:
default_storage& m_storage;
int m_flags;
};
default_storage::default_storage(storage_params const& params) default_storage::default_storage(storage_params const& params)
: m_files(*params.files) : m_files(*params.files)
, m_pool(*params.pool) , m_pool(*params.pool)
@ -1098,50 +1272,46 @@ namespace libtorrent
int default_storage::readv(file::iovec_t const* bufs, int num_bufs int default_storage::readv(file::iovec_t const* bufs, int num_bufs
, int piece, int offset, int flags, storage_error& ec) , int piece, int offset, int flags, storage_error& ec)
{ {
fileop op = { &file::readv read_fileop op(*this, flags);
, file::read_only | flags };
#ifdef TORRENT_SIMULATE_SLOW_READ #ifdef TORRENT_SIMULATE_SLOW_READ
boost::thread::sleep(boost::get_system_time() boost::thread::sleep(boost::get_system_time()
+ boost::posix_time::milliseconds(1000)); + boost::posix_time::milliseconds(1000));
#endif #endif
return readwritev(bufs, piece, offset, num_bufs, op, ec); return readwritev(files(), bufs, piece, offset, num_bufs, op, ec);
} }
int default_storage::writev(file::iovec_t const* bufs, int num_bufs int default_storage::writev(file::iovec_t const* bufs, int num_bufs
, int piece, int offset, int flags, storage_error& ec) , int piece, int offset, int flags, storage_error& ec)
{ {
fileop op = { &file::writev write_fileop op(*this, flags);
, file::read_write | flags }; return readwritev(files(), bufs, piece, offset, num_bufs, op, ec);
return readwritev(bufs, piece, offset, num_bufs, op, ec);
} }
// much of what needs to be done when reading and writing // much of what needs to be done when reading and writing is buffer
// is buffer management and piece to file mapping. Most // management and piece to file mapping. Most of that is the same for reading
// of that is the same for reading and writing. This function // and writing. This function is a template, and the fileop decides what to
// is a template, and the fileop decides what to do with the // do with the file and the buffers.
// file and the buffers. int readwritev(file_storage const& files, file::iovec_t const* const bufs
int default_storage::readwritev(file::iovec_t const* const bufs , const int piece, const int offset, const int num_bufs, fileop& op
, const int piece, const int offset , storage_error& ec)
, const int num_bufs, fileop const& op, storage_error& ec)
{ {
TORRENT_ASSERT(bufs != 0); TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(piece >= 0); TORRENT_ASSERT(piece >= 0);
TORRENT_ASSERT(piece < m_files.num_pieces()); TORRENT_ASSERT(piece < files.num_pieces());
TORRENT_ASSERT(offset >= 0); TORRENT_ASSERT(offset >= 0);
TORRENT_ASSERT(num_bufs > 0); TORRENT_ASSERT(num_bufs > 0);
const int size = bufs_size(bufs, num_bufs); const int size = bufs_size(bufs, num_bufs);
TORRENT_ASSERT(size > 0); TORRENT_ASSERT(size > 0);
TORRENT_ASSERT(files().is_loaded()); TORRENT_ASSERT(files.is_loaded());
// find the file iterator and file offset // find the file iterator and file offset
boost::uint64_t torrent_offset = piece * boost::uint64_t(m_files.piece_length()) + offset; boost::uint64_t torrent_offset = piece * boost::uint64_t(files.piece_length()) + offset;
int file_index = files().file_index_at_offset(torrent_offset); int file_index = files.file_index_at_offset(torrent_offset);
TORRENT_ASSERT(torrent_offset >= files().file_offset(file_index)); TORRENT_ASSERT(torrent_offset >= files.file_offset(file_index));
TORRENT_ASSERT(torrent_offset < files().file_offset(file_index) + files().file_size(file_index)); TORRENT_ASSERT(torrent_offset < files.file_offset(file_index) + files.file_size(file_index));
boost::int64_t file_offset = torrent_offset - files().file_offset(file_index); boost::int64_t file_offset = torrent_offset - files.file_offset(file_index);
file_handle handle;
// the number of bytes left before this read or write operation is // the number of bytes left before this read or write operation is
// completely satisfied. // completely satisfied.
@ -1149,188 +1319,69 @@ namespace libtorrent
TORRENT_ASSERT(bytes_left >= 0); TORRENT_ASSERT(bytes_left >= 0);
file::iovec_t* tmp_bufs = TORRENT_ALLOCA(file::iovec_t, num_bufs); // copy the iovec array so we can use it to keep track of our current
// location by updating the head base pointer and size. (see
// advance_bufs())
file::iovec_t* current_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs); file::iovec_t* current_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs);
copy_bufs(bufs, size, current_buf); copy_bufs(bufs, size, current_buf);
TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs); TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs);
file::iovec_t* tmp_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs);
// the number of bytes left to read in the current file (specified by // the number of bytes left to read in the current file (specified by
// file_index). This is the minimum of (file_size - file_offset) and // file_index). This is the minimum of (file_size - file_offset) and
// bytes_left. // bytes_left.
int file_bytes_left; int file_bytes_left;
// this is set to true if we advance the file_index. If so, we need to
// re-open the file handle below, before we start the read or write to
// it.
bool need_reopen_file = true;
while (bytes_left > 0) while (bytes_left > 0)
{ {
file_bytes_left = bytes_left; file_bytes_left = bytes_left;
if (file_offset + file_bytes_left > files().file_size(file_index)) if (file_offset + file_bytes_left > files.file_size(file_index))
file_bytes_left = (std::max)(static_cast<int>(files().file_size(file_index) - file_offset), 0); file_bytes_left = (std::max)(static_cast<int>(files.file_size(file_index) - file_offset), 0);
// there are no bytes left in this file, move to the next one // there are no bytes left in this file, move to the next one
// this loop skips over empty files // this loop skips over empty files
while (file_bytes_left == 0) while (file_bytes_left == 0)
{ {
need_reopen_file = true;
++file_index; ++file_index;
file_offset = 0; file_offset = 0;
TORRENT_ASSERT(file_index < files().num_files()); TORRENT_ASSERT(file_index < files.num_files());
// this should not happen. bytes_left should be clamped by the total // this should not happen. bytes_left should be clamped by the total
// size of the torrent, so we should never run off the end of it // size of the torrent, so we should never run off the end of it
if (file_index >= files().num_files()) return size; if (file_index >= files.num_files()) return size;
file_bytes_left = bytes_left; file_bytes_left = bytes_left;
if (file_offset + file_bytes_left > files().file_size(file_index)) if (file_offset + file_bytes_left > files.file_size(file_index))
file_bytes_left = (std::max)(static_cast<int>(files().file_size(file_index) - file_offset), 0); file_bytes_left = (std::max)(static_cast<int>(files.file_size(file_index) - file_offset), 0);
} }
if (files().pad_file_at(file_index)) // make a copy of the iovec array that _just_ covers the next
{ // file_bytes_left bytes, i.e. just this one operation
if ((op.mode & file::rw_mask) == file::read_only) copy_bufs(current_buf, file_bytes_left, tmp_buf);
{
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
clear_bufs(tmp_bufs, num_tmp_bufs);
}
advance_bufs(current_buf, file_bytes_left);
bytes_left -= file_bytes_left;
file_offset += file_bytes_left;
TORRENT_ASSERT(count_bufs(current_buf, bytes_left) <= num_bufs);
continue;
}
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs); int bytes_transferred = op.file_op(file_index, file_offset,
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs); file_bytes_left, tmp_buf, ec);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs); if (ec) return -1;
int bytes_transferred = 0;
error_code e;
if ((op.mode & file::rw_mask) == file::read_write)
{
// invalidate our stat cache for this file, since
// we're writing to it
m_stat_cache.set_dirty(file_index);
}
if ((file_index < int(m_file_priority.size())
&& m_file_priority[file_index] == 0)
|| files().pad_file_at(file_index))
{
need_partfile();
if ((op.mode & file::rw_mask) == file::read_write)
{
// TODO: 3 which one of these are called should be determined by
// the fileop object.
// write
bytes_transferred = m_part_file->writev(tmp_bufs, num_tmp_bufs
, piece, offset, e);
}
else
{
// read
bytes_transferred = m_part_file->readv(tmp_bufs, num_tmp_bufs
, piece, offset, e);
}
if (e)
{
ec.ec = e;
ec.file = file_index;
ec.operation = (op.mode & file::rw_mask) == file::read_only
? storage_error::partfile_read : storage_error::partfile_write;
return -1;
}
}
else
{
if (need_reopen_file)
{
handle = open_file(file_index, op.mode, ec);
if (ec) return -1;
need_reopen_file = false;
if (m_allocate_files && (op.mode & file::rw_mask) != file::read_only)
{
if (m_file_created.size() != files().num_files())
m_file_created.resize(files().num_files(), false);
TORRENT_ASSERT(int(m_file_created.size()) == files().num_files());
TORRENT_ASSERT(file_index < m_file_created.size());
// if this is the first time we open this file for writing,
// and we have m_allocate_files enabled, set the final size of
// the file right away, to allocate it on the filesystem.
if (m_file_created[file_index] == false)
{
handle->set_size(files().file_size(file_index), e);
m_file_created.set_bit(file_index);
if (e)
{
ec.ec = e;
ec.file = file_index;
ec.operation = storage_error::fallocate;
return -1;
}
}
}
}
// please ignore the adjusted_offset. It's just file_offset.
boost::int64_t adjusted_offset =
#ifndef TORRENT_NO_DEPRECATE
files().file_base_deprecated(file_index) +
#endif
file_offset;
#ifdef TORRENT_DISK_STATS
int flags = ((op.mode & file::rw_mask) == file::read_only) ? op_read : op_write;
write_access_log(adjusted_offset, handle->file_id(), op_start | flags, clock_type::now());
#endif
bytes_transferred = int(((*handle).*op.op)(adjusted_offset
, tmp_bufs, num_tmp_bufs, e, op.mode));
// we either get an error or 0 or more bytes read
TORRENT_ASSERT(e || bytes_transferred >= 0);
#ifdef TORRENT_DISK_STATS
write_access_log(adjusted_offset + bytes_transferred, handle->file_id(), op_end | flags, clock_type::now());
#endif
TORRENT_ASSERT(bytes_transferred <= bufs_size(tmp_bufs, num_tmp_bufs));
}
if (e)
{
ec.ec = e;
ec.file = file_index;
ec.operation = (op.mode & file::rw_mask) == file::read_only
? storage_error::read : storage_error::write;
return -1;
}
// advance our position in the iovec array and the file offset.
advance_bufs(current_buf, bytes_transferred);
bytes_left -= bytes_transferred; bytes_left -= bytes_transferred;
file_offset += bytes_transferred; file_offset += bytes_transferred;
TORRENT_ASSERT(count_bufs(current_buf, bytes_left) <= num_bufs);
// if the file operation returned 0, we've hit end-of-file. We're done // if the file operation returned 0, we've hit end-of-file. We're done
if (bytes_transferred == 0) if (bytes_transferred == 0)
{ {
if (file_bytes_left > 0 ) if (file_bytes_left > 0 )
{ {
// fill in this information in case the caller wants to treat // fill in this information in case the caller wants to treat
// this as an error // a short-read as an error
ec.file = file_index; ec.file = file_index;
ec.operation = (op.mode & file::rw_mask) == file::read_only
? storage_error::read : storage_error::write;
} }
return size - bytes_left; return size - bytes_left;
} }
advance_bufs(current_buf, bytes_transferred);
TORRENT_ASSERT(count_bufs(current_buf, bytes_left) <= num_bufs);
} }
return size; return size;
} }
@ -1366,6 +1417,31 @@ namespace libtorrent
return file_handle(); return file_handle();
} }
TORRENT_ASSERT(h); TORRENT_ASSERT(h);
if (m_allocate_files && (mode & file::rw_mask) != file::read_only)
{
if (m_file_created.size() != files().num_files())
m_file_created.resize(files().num_files(), false);
TORRENT_ASSERT(int(m_file_created.size()) == files().num_files());
TORRENT_ASSERT(file < m_file_created.size());
// if this is the first time we open this file for writing,
// and we have m_allocate_files enabled, set the final size of
// the file right away, to allocate it on the filesystem.
if (m_file_created[file] == false)
{
error_code e;
h->set_size(files().file_size(file), e);
m_file_created.set_bit(file);
if (e)
{
ec.ec = e;
ec.file = file;
ec.operation = storage_error::fallocate;
return h;
}
}
}
return h; return h;
} }

View File

@ -478,6 +478,7 @@ void test_check_files(std::string const& test_path
#define storage_mode_compact storage_mode_sparse #define storage_mode_compact storage_mode_sparse
#endif #endif
// TODO: 2 split this test up into smaller parts
void run_test(bool unbuffered) void run_test(bool unbuffered)
{ {
std::string test_path = current_working_directory(); std::string test_path = current_working_directory();