merged storage::readv() and storage::writev()

This commit is contained in:
Arvid Norberg 2009-01-17 08:35:48 +00:00
parent ce6c09e070
commit 5b551c1761
4 changed files with 198 additions and 207 deletions

View File

@ -4974,6 +4974,14 @@ this::
virtual bool release_files() = 0;
virtual bool delete_files() = 0;
virtual ~storage_interface() {}
// non virtual functions
disk_io_thread* io_thread();
void set_error(boost::filesystem::path const& file, error_code const& ec) const;
error_code const& error() const;
std::string const& error_file() const;
void clear_error();
};
@ -4991,40 +4999,17 @@ it will also ``ftruncate`` all files to their target size.
Returning ``true`` indicates an error occurred.
readv()
-------
readv() writev()
----------------
::
int readv(file::iovec_t const* buf, int slot, int offset, int num_bufs) = 0;
This function should read the data in the given ``slot`` and at the given ``offset``.
It should read ``num_bufs`` buffers, where the size of each buffer is specified in the
buffer array ``bufs``. The file::iovec_t type has the following members::
struct iovec_t
{
void* iov_base;
size_t iov_len;
};
The return value is the number of bytes actually read.
Every buffer in ``bufs`` can be assumed to be page aligned and be of a page aligned size,
except for the last buffer of the torrent. The buffer can be assumed to fit a fully page
aligned number of bytes though.
writev()
--------
::
int write(const char* buf, int slot, int offset, int size) = 0;
This function should write the data to the given ``slot`` and at the given ``offset``.
It should write ``num_bufs`` buffers, where the size of each buffer is specified in the
buffer array ``bufs``. The file::iovec_t type has the following members::
These functions should read or write the data in or to the given ``slot`` at the given ``offset``.
It should read or write ``num_bufs`` buffers sequentially, where the size of each buffer
is specified in the buffer array ``bufs``. The file::iovec_t type has the following members::
struct iovec_t
{
@ -5032,13 +5017,18 @@ buffer array ``bufs``. The file::iovec_t type has the following members::
size_t iov_len;
};
The return value is the number of bytes actually written.
The return value is the number of bytes actually read or written, or -1 on failure. If
it returns -1, the error code is expected to be set to
Every buffer in ``bufs`` can be assumed to be page aligned and be of a page aligned size,
except for the last buffer of the torrent. The buffer can be assumed to fit a fully page
aligned number of bytes though.
This function should write the data in ``buf`` to the given slot (``slot``) at offset
``offset`` in that slot. The buffer size is ``size``.
except for the last buffer of the torrent. The allocated buffer can be assumed to fit a
fully page aligned number of bytes though. This is useful when reading and writing the
last piece of a file in unbuffered mode.
The ``offset`` is aligned to 16 kiB boundries *most of the time*, but there are rare
exceptions when it's not. Specifically if the read cache is disabled/or full and a
client requests unaligned data, or the file itself is not aligned in the torrent.
Most clients request aligned data.
move_storage()

View File

@ -130,6 +130,14 @@ namespace libtorrent
void close();
bool set_size(size_type size, error_code& ec);
int open_mode() const { return m_open_mode; }
// when opened in unbuffered mode, this is the
// required alignment of file_offsets. i.e.
// any (file_offset & (pos_alignment()-1)) == 0
// is a precondition
int pos_alignment() const;
size_type writev(size_type file_offset, iovec_t const* bufs, int num_bufs, error_code& ec);
size_type readv(size_type file_offset, iovec_t const* bufs, int num_bufs, error_code& ec);
@ -152,6 +160,9 @@ namespace libtorrent
static int m_page_size;
#endif
int m_open_mode;
#if defined TORRENT_WINDOWS || defined TORRENT_LINUX
mutable int m_sector_size;
#endif
};
}

View File

@ -115,6 +115,9 @@ namespace libtorrent
: m_fd(-1)
#endif
, m_open_mode(0)
#if defined TORRENT_WINDOWS || defined TORRENT_LINUX
, m_sector_size(0)
#endif
{}
file::file(fs::path const& path, int mode, error_code& ec)
@ -216,8 +219,51 @@ namespace libtorrent
#endif
}
int file::pos_alignment() const
{
// on linux and windows, file offsets needs
// to be aligned to the disk sector size
#if defined TORRENT_LINUX
if (m_sector_size == 0)
{
struct statvfs fs;
if (fstatvfs(m_fd, &fs) == 0)
m_sector_size = fs.f_bsize;
else
m_sector_size = 4096;
}
return m_sector_size;
#elif defined TORRENT_WINDOWS
if (m_sector_size == 0)
{
DWORD sectors_per_cluster;
DWORD bytes_per_sector;
DWORD free_clusters;
DWORD total_clusters;
#ifdef TORRENT_USE_WPATH
wchar_t backslash = L'\\';
#else
char backslash = '\\';
#endif
if (GetDiskFreeSpace(m_path.substr(0, m_path.find_first_of(backslash)+1).c_str()
, &sectors_per_cluster, &bytes_per_sector
, &free_clusters, &total_clusters))
m_sector_size = bytes_per_sector;
else
m_sector_size = 4096;
}
return m_sector_size;
#else
return 1;
#endif
}
void file::close()
{
#if defined TORRENT_WINDOWS || defined TORRENT_LINUX
m_sector_size = 0;
#endif
#ifdef TORRENT_WINDOWS
if (m_file_handle == INVALID_HANDLE_VALUE) return;
CloseHandle(m_file_handle);
@ -270,7 +316,9 @@ namespace libtorrent
{
bool eof = false;
int size = 0;
TORRENT_ASSERT((file_offset & (m_page_size-1)) == 0);
// when opened in no_buffer mode, the file_offset must
// be aligned to pos_alignment()
TORRENT_ASSERT((file_offset & (pos_alignment()-1)) == 0);
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i < end; ++i)
{
TORRENT_ASSERT((int(i->iov_base) & (m_page_size-1)) == 0);
@ -427,7 +475,9 @@ namespace libtorrent
{
bool eof = false;
int size = 0;
TORRENT_ASSERT((file_offset & (m_page_size-1)) == 0);
// when opened in no_buffer mode, the file_offset must
// be aligned to pos_alignment()
TORRENT_ASSERT((file_offset & (pos_alignment()-1)) == 0);
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i < end; ++i)
{
TORRENT_ASSERT((int(i->iov_base) & (m_page_size-1)) == 0);

View File

@ -445,9 +445,30 @@ namespace libtorrent
bool write_resume_data(entry& rd) const;
sha1_hash hash_for_slot(int slot, partial_hash& ph, int piece_size);
// this identifies a read or write operation
// so that storage::readwrite() knows what to
// do when it's actually touching the file
struct fileop
{
size_type (file::*regular_op)(size_type file_offset
, file::iovec_t const* bufs, int num_bufs, error_code& ec);
size_type (storage::*unaligned_op)(boost::shared_ptr<file> const& f
, size_type file_offset, file::iovec_t const* bufs, int num_bufs
, error_code& ec);
int mode;
};
int readwritev(file::iovec_t const* bufs, int slot, int offset
, int num_bufs, fileop const&);
~storage()
{ m_pool.release(this); }
size_type read_unaligned(boost::shared_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec);
size_type write_unaligned(boost::shared_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec);
file_storage const& files() const { return m_mapped_files?*m_mapped_files:m_files; }
boost::scoped_ptr<file_storage> m_mapped_files;
@ -1020,20 +1041,36 @@ ret:
return r;
}
int storage::readv(
file::iovec_t const* bufs
, int slot
, int offset
int storage::writev(file::iovec_t const* bufs, int slot, int offset
, int num_bufs)
{
fileop op = { &file::writev, &storage::write_unaligned, file::read_write };
return readwritev(bufs, slot, offset, num_bufs, op);
}
int storage::readv(file::iovec_t const* bufs, int slot, int offset
, int num_bufs)
{
fileop op = { &file::readv, &storage::read_unaligned, file::read_only };
return readwritev(bufs, slot, offset, num_bufs, op);
}
// much of what needs to be done when reading and writing
// is buffer management and piece to file mapping. Most
// of that is the same for reading and writing. This function
// is a template, and the fileop decides what to do with the
// file and the buffers.
int storage::readwritev(file::iovec_t const* bufs, int slot, int offset
, int num_bufs, fileop const& op)
{
TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(slot >= 0 && slot < m_files.num_pieces());
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(offset >= 0);
TORRENT_ASSERT(offset < m_files.piece_size(slot));
TORRENT_ASSERT(num_bufs > 0);
int size = bufs_size(bufs, num_bufs);
TORRENT_ASSERT(size > 0);
#ifdef TORRENT_DEBUG
@ -1058,21 +1095,18 @@ ret:
++file_iter;
TORRENT_ASSERT(file_iter != files().end());
}
int buf_pos = 0;
error_code ec;
boost::shared_ptr<file> in;
int left_to_read = size;
boost::shared_ptr<file> file_handle;
int bytes_left = size;
int slot_size = static_cast<int>(m_files.piece_size(slot));
if (offset + left_to_read > slot_size)
left_to_read = slot_size - offset;
if (offset + bytes_left > slot_size)
bytes_left = slot_size - offset;
TORRENT_ASSERT(left_to_read >= 0);
size_type result = left_to_read;
TORRENT_ASSERT(left_to_read == size);
TORRENT_ASSERT(bytes_left >= 0);
#ifdef TORRENT_DEBUG
int counter = 0;
@ -1082,23 +1116,23 @@ ret:
file::iovec_t* current_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs);
copy_bufs(bufs, size, current_buf);
TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs);
int read_bytes;
for (;left_to_read > 0; ++file_iter, left_to_read -= read_bytes
, buf_pos += read_bytes)
int file_bytes_left;
for (;bytes_left > 0; ++file_iter, bytes_left -= file_bytes_left
, buf_pos += file_bytes_left)
{
TORRENT_ASSERT(file_iter != files().end());
TORRENT_ASSERT(buf_pos >= 0);
read_bytes = left_to_read;
if (file_offset + read_bytes > file_iter->size)
read_bytes = (std::max)(static_cast<int>(file_iter->size - file_offset), 0);
file_bytes_left = bytes_left;
if (file_offset + file_bytes_left > file_iter->size)
file_bytes_left = (std::max)(static_cast<int>(file_iter->size - file_offset), 0);
if (read_bytes == 0) continue;
if (file_bytes_left == 0) continue;
#ifdef TORRENT_DEBUG
TORRENT_ASSERT(int(slices.size()) > counter);
size_type slice_size = slices[counter].size;
TORRENT_ASSERT(slice_size == read_bytes);
TORRENT_ASSERT(slice_size == file_bytes_left);
TORRENT_ASSERT(files().at(slices[counter].file_index).path
== file_iter->path);
++counter;
@ -1106,35 +1140,52 @@ ret:
if (file_iter->pad_file)
{
int num_tmp_bufs = copy_bufs(current_buf, read_bytes, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, read_bytes) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
clear_bufs(tmp_bufs, num_tmp_bufs);
advance_bufs(current_buf, read_bytes);
TORRENT_ASSERT(count_bufs(current_buf, left_to_read - read_bytes) <= num_bufs);
if (op.mode == file::read_only)
{
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
clear_bufs(tmp_bufs, num_tmp_bufs);
}
advance_bufs(current_buf, file_bytes_left);
TORRENT_ASSERT(count_bufs(current_buf, bytes_left - file_bytes_left) <= num_bufs);
continue;
}
fs::path path = m_save_path / file_iter->path;
error_code ec;
int mode = file::read_only;
int mode = op.mode;
if (io_thread()
&& io_thread()->no_buffer()
&& ((file_iter->offset + file_iter->file_base) & (m_page_size-1)) == 0)
mode |= file::no_buffer;
in = m_pool.open_file(this, path, mode, ec);
if (!in || ec)
file_handle = m_pool.open_file(this, path, mode, ec);
if (!file_handle || ec)
{
set_error(path, ec);
return -1;
}
int num_tmp_bufs = copy_bufs(current_buf, read_bytes, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, read_bytes) == num_tmp_bufs);
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
int actual_read = int(in->readv(file_iter->file_base
+ file_offset, tmp_bufs, num_tmp_bufs, ec));
int bytes_transferred = 0;
// if the file is opened in no_buffer mode, and the
// read is unaligned, we need to fall back on a slow
// special read that reads aligned buffers and copies
// it into the one supplied
if ((file_handle->open_mode() & file::no_buffer)
&& ((file_iter->file_base + file_offset) & (file_handle->pos_alignment()-1)) != 0)
{
bytes_transferred = (this->*op.unaligned_op)(file_handle, file_iter->file_base
+ file_offset, tmp_bufs, num_tmp_bufs, ec);
}
else
{
bytes_transferred = (int)((*file_handle).*op.regular_op)(file_iter->file_base
+ file_offset, tmp_bufs, num_tmp_bufs, ec);
}
file_offset = 0;
if (ec)
@ -1143,21 +1194,41 @@ ret:
return -1;
}
if (read_bytes != actual_read)
if (file_bytes_left != bytes_transferred)
{
// the file was not big enough
#ifdef TORRENT_WINDOWS
ec = error_code(ERROR_READ_FAULT, get_system_category());
ec = error_code(ERROR_HANDLE_EOF, get_system_category());
#else
ec = error_code(EIO, get_posix_category());
#endif
set_error(m_save_path / file_iter->path, ec);
return -1;
}
advance_bufs(current_buf, actual_read);
TORRENT_ASSERT(count_bufs(current_buf, left_to_read - read_bytes) <= num_bufs);
advance_bufs(current_buf, bytes_transferred);
TORRENT_ASSERT(count_bufs(current_buf, bytes_left - file_bytes_left) <= num_bufs);
}
return result;
return size;
}
// these functions are inefficient, but should be fairly uncommon. The read
// case happens if unaligned files are opened in no_buffer mode or if clients
// makes unaligned requests (and the disk cache is disabled or fully utilized
// for write cache).
// they read an unaligned buffer from a file that requires aligned access
size_type storage::read_unaligned(boost::shared_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec)
{
TORRENT_ASSERT(false); // not implemented
return 0;
}
size_type storage::write_unaligned(boost::shared_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec)
{
TORRENT_ASSERT(false); // not implemented
return 0;
}
int storage::write(
@ -1184,137 +1255,6 @@ ret:
return readv(&b, slot, offset, 1);
}
int storage::writev(
file::iovec_t const* bufs
, int slot
, int offset
, int num_bufs)
{
TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(offset >= 0);
TORRENT_ASSERT(num_bufs > 0);
int size = bufs_size(bufs, num_bufs);
TORRENT_ASSERT(size > 0);
#ifdef TORRENT_DEBUG
std::vector<file_slice> slices
= files().map_block(slot, offset, size);
TORRENT_ASSERT(!slices.empty());
#endif
size_type start = slot * (size_type)m_files.piece_length() + offset;
TORRENT_ASSERT(start + size <= m_files.total_size());
// find the file iterator and file offset
size_type file_offset = start;
std::vector<file_entry>::const_iterator file_iter;
for (file_iter = files().begin();;)
{
if (file_offset < file_iter->size)
break;
file_offset -= file_iter->size;
++file_iter;
TORRENT_ASSERT(file_iter != files().end());
}
int buf_pos = 0;
error_code ec;
boost::shared_ptr<file> out;
int left_to_write = size;
int slot_size = static_cast<int>(m_files.piece_size(slot));
if (offset + left_to_write > slot_size)
left_to_write = slot_size - offset;
TORRENT_ASSERT(left_to_write >= 0);
#ifdef TORRENT_DEBUG
int counter = 0;
#endif
file::iovec_t* tmp_bufs = TORRENT_ALLOCA(file::iovec_t, num_bufs);
file::iovec_t* current_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs);
copy_bufs(bufs, size, current_buf);
TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs);
int write_bytes;
for (;left_to_write > 0; ++file_iter, left_to_write -= write_bytes
, buf_pos += write_bytes)
{
TORRENT_ASSERT(file_iter != files().end());
TORRENT_ASSERT(buf_pos >= 0);
write_bytes = left_to_write;
if (file_offset + write_bytes > file_iter->size)
write_bytes = (std::max)(static_cast<int>(file_iter->size - file_offset), 0);
if (write_bytes == 0) continue;
#ifdef TORRENT_DEBUG
TORRENT_ASSERT(int(slices.size()) > counter);
size_type slice_size = slices[counter].size;
TORRENT_ASSERT(slice_size == write_bytes);
TORRENT_ASSERT(files().at(slices[counter].file_index).path
== file_iter->path);
++counter;
#endif
if (file_iter->pad_file)
{
advance_bufs(current_buf, write_bytes);
TORRENT_ASSERT(count_bufs(current_buf, left_to_write - write_bytes) <= num_bufs);
continue;
}
fs::path path = m_save_path / file_iter->path;
error_code ec;
int mode = file::read_write;
if (io_thread()
&& io_thread()->no_buffer()
&& ((file_iter->offset + file_iter->file_base) & (m_page_size-1)) == 0)
mode |= file::no_buffer;
out = m_pool.open_file(this, path, mode, ec);
if (!out || ec)
{
set_error(path, ec);
return -1;
}
int num_tmp_bufs = copy_bufs(current_buf, write_bytes, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, write_bytes) == num_tmp_bufs);
int actual_written = int(out->writev(file_iter->file_base
+ file_offset, tmp_bufs, num_tmp_bufs, ec));
file_offset = 0;
if (ec)
{
set_error(m_save_path / file_iter->path, ec);
return -1;
}
if (write_bytes != actual_written)
{
// the file was not big enough
#ifdef TORRENT_WINDOWS
ec = error_code(ERROR_READ_FAULT, get_system_category());
#else
ec = error_code(EIO, get_posix_category());
#endif
set_error(m_save_path / file_iter->path, ec);
return -1;
}
advance_bufs(current_buf, actual_written);
}
return size;
}
storage_interface* default_storage_constructor(file_storage const& fs
, fs::path const& path, file_pool& fp)
{