fully support unbuffered I/O

This commit is contained in:
Arvid Norberg 2011-02-22 02:53:26 +00:00
parent f22cf00aa4
commit 97a40a45cd
8 changed files with 99 additions and 124 deletions

View File

@ -83,6 +83,7 @@
incoming connection incoming connection
* added more detailed instrumentation of the disk I/O thread * added more detailed instrumentation of the disk I/O thread
* implemented unaligned write (for unbuffered I/O)
* fixed broadcast_lsd option * fixed broadcast_lsd option
* fixed udp-socket race condition when using a proxy * fixed udp-socket race condition when using a proxy
* end-game mode optimizations * end-game mode optimizations

View File

@ -50,6 +50,27 @@ namespace libtorrent
static void free(char* const block); static void free(char* const block);
}; };
struct TORRENT_EXPORT aligned_holder
{
aligned_holder(): m_buf(0) {}
aligned_holder(int size): m_buf(page_aligned_allocator::malloc(size)) {}
~aligned_holder() { if (m_buf) page_aligned_allocator::free(m_buf); }
char* get() const { return m_buf; }
void reset(char* buf = 0)
{
if (m_buf) page_aligned_allocator::free(m_buf);
m_buf = buf;
}
void swap(aligned_holder& h)
{
char* tmp = m_buf;
m_buf = h.m_buf;
h.m_buf = tmp;
}
private:
char* m_buf;
};
} }
#endif #endif

View File

@ -47,11 +47,10 @@ namespace libtorrent
{ {
disk_buffer_holder(aux::session_impl& ses, char* buf); disk_buffer_holder(aux::session_impl& ses, char* buf);
disk_buffer_holder(disk_buffer_pool& disk_pool, char* buf); disk_buffer_holder(disk_buffer_pool& disk_pool, char* buf);
disk_buffer_holder(disk_buffer_pool& disk_pool, char* buf, int num_blocks);
~disk_buffer_holder(); ~disk_buffer_holder();
char* release(); char* release();
char* get() const { return m_buf; } char* get() const { return m_buf; }
void reset(char* buf = 0, int num_blocks = 1); void reset(char* buf = 0);
void swap(disk_buffer_holder& h) void swap(disk_buffer_holder& h)
{ {
TORRENT_ASSERT(&h.m_disk_pool == &m_disk_pool); TORRENT_ASSERT(&h.m_disk_pool == &m_disk_pool);
@ -65,7 +64,6 @@ namespace libtorrent
private: private:
disk_buffer_pool& m_disk_pool; disk_buffer_pool& m_disk_pool;
char* m_buf; char* m_buf;
int m_num_blocks;
}; };
} }

View File

@ -222,9 +222,6 @@ namespace libtorrent
void free_buffer(char* buf); void free_buffer(char* buf);
void free_multiple_buffers(char** bufvec, int numbufs); void free_multiple_buffers(char** bufvec, int numbufs);
char* allocate_buffers(int blocks, char const* category);
void free_buffers(char* buf, int blocks);
int block_size() const { return m_block_size; } int block_size() const { return m_block_size; }
#ifdef TORRENT_STATS #ifdef TORRENT_STATS

View File

@ -62,6 +62,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/disk_buffer_holder.hpp" #include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/thread.hpp" #include "libtorrent/thread.hpp"
#include "libtorrent/storage_defs.hpp" #include "libtorrent/storage_defs.hpp"
#include "libtorrent/allocator.hpp"
namespace libtorrent namespace libtorrent
{ {
@ -415,8 +416,8 @@ namespace libtorrent
// used to move pieces while expanding // used to move pieces while expanding
// the storage from compact allocation // the storage from compact allocation
// to full allocation // to full allocation
disk_buffer_holder m_scratch_buffer; aligned_holder m_scratch_buffer;
disk_buffer_holder m_scratch_buffer2; aligned_holder m_scratch_buffer2;
// the piece that is in the scratch buffer // the piece that is in the scratch buffer
int m_scratch_piece; int m_scratch_piece;

View File

@ -38,32 +38,21 @@ namespace libtorrent
{ {
disk_buffer_holder::disk_buffer_holder(aux::session_impl& ses, char* buf) disk_buffer_holder::disk_buffer_holder(aux::session_impl& ses, char* buf)
: m_disk_pool(ses.m_disk_thread), m_buf(buf), m_num_blocks(1) : m_disk_pool(ses.m_disk_thread), m_buf(buf)
{ {
TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf));
} }
disk_buffer_holder::disk_buffer_holder(disk_buffer_pool& iothread, char* buf) disk_buffer_holder::disk_buffer_holder(disk_buffer_pool& iothread, char* buf)
: m_disk_pool(iothread), m_buf(buf), m_num_blocks(1) : m_disk_pool(iothread), m_buf(buf)
{ {
TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf));
} }
disk_buffer_holder::disk_buffer_holder(disk_buffer_pool& iothread, char* buf, int num_blocks) void disk_buffer_holder::reset(char* buf)
: m_disk_pool(iothread), m_buf(buf), m_num_blocks(num_blocks)
{ {
TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); if (m_buf) m_disk_pool.free_buffer(m_buf);
}
void disk_buffer_holder::reset(char* buf, int num_blocks)
{
if (m_buf)
{
if (m_num_blocks == 1) m_disk_pool.free_buffer(m_buf);
else m_disk_pool.free_buffers(m_buf, m_num_blocks);
}
m_buf = buf; m_buf = buf;
m_num_blocks = num_blocks;
} }
char* disk_buffer_holder::release() char* disk_buffer_holder::release()
@ -75,11 +64,7 @@ namespace libtorrent
disk_buffer_holder::~disk_buffer_holder() disk_buffer_holder::~disk_buffer_holder()
{ {
if (m_buf) if (m_buf) m_disk_pool.free_buffer(m_buf);
{
if (m_num_blocks == 1) m_disk_pool.free_buffer(m_buf);
else m_disk_pool.free_buffers(m_buf, m_num_blocks);
}
} }
} }

View File

@ -225,80 +225,11 @@ namespace libtorrent
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf); page_aligned_allocator::free(buf);
#else #else
m_pool.ordered_free(buf); m_pool.free(buf);
#endif #endif
--m_in_use; --m_in_use;
} }
// Allocates ``num_blocks`` blocks as one buffer and returns a pointer to
// its start. ``category`` is only used for the TORRENT_DISK_STATS
// bookkeeping. The whole operation runs with m_pool_mutex held.
//
// NOTE(review): the final assert allows ``ret`` to be 0, yet m_in_use,
// the statistics counters and the mlock/VirtualLock call are all applied
// unconditionally -- confirm whether a failed allocation can reach here.
char* disk_buffer_pool::allocate_buffers(int num_blocks, char const* category)
{
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
// no pool: a single allocation large enough for all the blocks
char* ret = page_aligned_allocator::malloc(m_block_size * num_blocks);
#else
// pool: ask for num_blocks chunks in one request, then set the pool's
// next-size from the cache_buffer_chunk_size setting
char* ret = (char*)m_pool.ordered_malloc(num_blocks);
m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
#endif
m_in_use += num_blocks;
#if TORRENT_USE_MLOCK
// when lock_disk_cache is set, lock the whole range into memory
if (m_settings.lock_disk_cache)
{
#ifdef TORRENT_WINDOWS
VirtualLock(ret, m_block_size * num_blocks);
#else
mlock(ret, m_block_size * num_blocks);
#endif
}
#endif
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
m_allocations += num_blocks;
#endif
#ifdef TORRENT_DISK_STATS
// remember which category this buffer was charged to, so that
// free_buffers() can subtract it again, and log the new total
m_categories[category] += num_blocks;
m_buf_to_category[ret] = category;
m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
#endif
TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l));
return ret;
}
// Releases a buffer of ``num_blocks`` blocks starting at ``buf``.
// ``buf`` must be non-null and must be recognised by is_disk_buffer();
// ``num_blocks`` must be at least 1. Both are only checked by asserts.
// Presumably ``num_blocks`` has to match the count passed to
// allocate_buffers() for this buffer -- nothing here verifies that.
// The whole operation runs with m_pool_mutex held.
void disk_buffer_pool::free_buffers(char* buf, int num_blocks)
{
TORRENT_ASSERT(buf);
TORRENT_ASSERT(num_blocks >= 1);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(is_disk_buffer(buf, l));
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
m_allocations -= num_blocks;
#endif
#ifdef TORRENT_DISK_STATS
// look up the category recorded for this buffer, subtract the blocks
// from it, log the new total and forget the buffer
TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
!= m_categories.end());
std::string const& category = m_buf_to_category[buf];
m_categories[category] -= num_blocks;
m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
m_buf_to_category.erase(buf);
#endif
#if TORRENT_USE_MLOCK
// undo the memory locking over the whole range before freeing it
if (m_settings.lock_disk_cache)
{
#ifdef TORRENT_WINDOWS
VirtualUnlock(buf, m_block_size * num_blocks);
#else
munlock(buf, m_block_size * num_blocks);
#endif
}
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf);
#else
m_pool.ordered_free(buf, num_blocks);
#endif
m_in_use -= num_blocks;
}
void disk_buffer_pool::release_memory() void disk_buffer_pool::release_memory()
{ {
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);

View File

@ -1285,28 +1285,76 @@ ret:
const size_type aligned_start = file_offset - start_adjust; const size_type aligned_start = file_offset - start_adjust;
const int aligned_size = ((size+start_adjust) & size_align) const int aligned_size = ((size+start_adjust) & size_align)
? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust; ? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust;
const int num_blocks = (aligned_size + block_size - 1) / block_size;
TORRENT_ASSERT((aligned_size & size_align) == 0); TORRENT_ASSERT((aligned_size & size_align) == 0);
disk_buffer_holder tmp_buf(*disk_pool(), disk_pool()->allocate_buffers(num_blocks, "read scratch"), num_blocks); // allocate a temporary, aligned, buffer
file::iovec_t b = {tmp_buf.get(), aligned_size}; aligned_holder aligned_buf(aligned_size);
file::iovec_t b = {aligned_buf.get(), aligned_size};
size_type ret = file_handle->readv(aligned_start, &b, 1, ec); size_type ret = file_handle->readv(aligned_start, &b, 1, ec);
if (ret < 0) return ret; if (ret < 0)
char* read_buf = tmp_buf.get() + start_adjust; {
TORRENT_ASSERT(ec);
return ret;
}
if (ret < aligned_size) return (std::max)(size - (start_adjust - ret), size_type(0));
char* read_buf = aligned_buf.get() + start_adjust;
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i) for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i)
{ {
memcpy(i->iov_base, read_buf, i->iov_len); memcpy(i->iov_base, read_buf, i->iov_len);
read_buf += i->iov_len; read_buf += i->iov_len;
} }
if (ret < size + start_adjust) return ret - start_adjust;
return size; return size;
} }
// this is the really expensive one. To write unaligned, we need to read
// an aligned block, overlay the unaligned buffer, and then write it back.
//
// file_handle  the file to write to; its pos_alignment() and
//              size_alignment() define the alignment requirements
// file_offset  the (possibly unaligned) offset to start writing at
// bufs         array of ``num_bufs`` buffers holding the payload
// ec           set by the underlying readv()/writev() call on failure
//
// returns the number of payload bytes written (at most the total size
// of ``bufs``), or the negative value returned by readv()/writev() when
// one of them fails.
size_type storage::write_unaligned(boost::intrusive_ptr<file> const& file_handle
	, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec)
{
	const int pos_align = file_handle->pos_alignment()-1;
	const int size_align = file_handle->size_alignment()-1;

	const int size = bufs_size(bufs, num_bufs);

	// distance from the preceding aligned position to file_offset
	const int start_adjust = file_offset & pos_align;
	TORRENT_ASSERT(start_adjust == (file_offset % file_handle->pos_alignment()));
	const size_type aligned_start = file_offset - start_adjust;

	// payload plus leading slack, rounded up to the size alignment
	const int aligned_size = ((size+start_adjust) & size_align)
		? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust;
	TORRENT_ASSERT((aligned_size & size_align) == 0);

	// allocate a temporary, aligned, buffer
	aligned_holder aligned_buf(aligned_size);
	file::iovec_t b = {aligned_buf.get(), aligned_size};

	// fetch what is currently on disk in the aligned region, so that the
	// bytes surrounding the payload are preserved when writing it back
	size_type ret = file_handle->readv(aligned_start, &b, 1, ec);
	if (ret < 0)
	{
		TORRENT_ASSERT(ec);
		return ret;
	}

	// a short read is not a failure here: presumably the aligned region
	// just extends past the current end of the file (e.g. when appending).
	// Bailing out would mean the payload is never written. Zero the part
	// that was not read, so no uninitialized memory ends up on disk.
	if (ret < aligned_size)
		memset(aligned_buf.get() + ret, 0, size_t(aligned_size - ret));

	// OK, we read the portion of the file. Now, overlay the buffer we're writing
	char* write_buf = aligned_buf.get() + start_adjust;
	for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i)
	{
		memcpy(write_buf, i->iov_base, i->iov_len);
		write_buf += i->iov_len;
	}

	// write the buffer back to disk
	ret = file_handle->writev(aligned_start, &b, 1, ec);
	if (ret < 0)
	{
		TORRENT_ASSERT(ec);
		return ret;
	}

	// short write: the first start_adjust bytes are not payload, so only
	// what was written beyond them counts. Clamp to [0, size] since the
	// trailing alignment padding is not payload either.
	if (ret < aligned_size)
		return (std::min)((std::max)(ret - start_adjust, size_type(0)), size_type(size));

	return size;
}
int storage::write( int storage::write(
@ -1451,8 +1499,6 @@ ret:
, m_state(state_none) , m_state(state_none)
, m_current_slot(0) , m_current_slot(0)
, m_out_of_place(false) , m_out_of_place(false)
, m_scratch_buffer(io, 0)
, m_scratch_buffer2(io, 0)
, m_scratch_piece(-1) , m_scratch_piece(-1)
, m_last_piece(-1) , m_last_piece(-1)
, m_storage_constructor(sc) , m_storage_constructor(sc)
@ -2274,17 +2320,12 @@ ret:
if (other_piece >= 0) if (other_piece >= 0)
{ {
if (!m_scratch_buffer2) if (!m_scratch_buffer2.get())
{ m_scratch_buffer2.reset(page_aligned_allocator::malloc(m_files.piece_length()));
int blocks_per_piece = (std::max)(m_files.piece_length()
/ m_io_thread.block_size(), 1);
m_scratch_buffer2.reset(m_io_thread.allocate_buffers(
blocks_per_piece, "check scratch"), blocks_per_piece);
}
int piece_size = m_files.piece_size(other_piece); int piece_size = m_files.piece_size(other_piece);
if (m_storage->read(m_scratch_buffer2.get(), piece, 0, piece_size) file::iovec_t b = {m_scratch_buffer2.get(), piece_size};
!= piece_size) if (m_storage->readv(&b, piece, 0, 1) != piece_size)
{ {
error = m_storage->error(); error = m_storage->error();
TORRENT_ASSERT(error); TORRENT_ASSERT(error);
@ -2297,7 +2338,8 @@ ret:
// the slot where this piece belongs is // the slot where this piece belongs is
// free. Just move the piece there. // free. Just move the piece there.
int piece_size = m_files.piece_size(piece); int piece_size = m_files.piece_size(piece);
if (m_storage->write(m_scratch_buffer.get(), piece, 0, piece_size) != piece_size) file::iovec_t b = {m_scratch_buffer.get(), piece_size};
if (m_storage->writev(&b, piece, 0, 1) != piece_size)
{ {
error = m_storage->error(); error = m_storage->error();
TORRENT_ASSERT(error); TORRENT_ASSERT(error);
@ -2306,8 +2348,7 @@ ret:
m_piece_to_slot[piece] = piece; m_piece_to_slot[piece] = piece;
m_slot_to_piece[piece] = piece; m_slot_to_piece[piece] = piece;
if (other_piece >= 0) if (other_piece >= 0) m_scratch_buffer.swap(m_scratch_buffer2);
m_scratch_buffer.swap(m_scratch_buffer2);
TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_files.num_pieces()); TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_files.num_pieces());
return need_full_check; return need_full_check;
@ -2335,15 +2376,15 @@ ret:
// there is another piece in the slot // there is another piece in the slot
// where this one goes. Store it in the scratch // where this one goes. Store it in the scratch
// buffer until next iteration. // buffer until next iteration.
if (!m_scratch_buffer) if (!m_scratch_buffer.get())
{ {
int blocks_per_piece = (std::max)(m_files.piece_length() / m_io_thread.block_size(), 1); int blocks_per_piece = (std::max)(m_files.piece_length() / m_io_thread.block_size(), 1);
m_scratch_buffer.reset(m_io_thread.allocate_buffers( m_scratch_buffer.reset(page_aligned_allocator::malloc(m_files.piece_length()));
blocks_per_piece, "check scratch"), blocks_per_piece);
} }
int piece_size = m_files.piece_size(other_piece); int piece_size = m_files.piece_size(other_piece);
if (m_storage->read(m_scratch_buffer.get(), piece, 0, piece_size) != piece_size) file::iovec_t b = {m_scratch_buffer.get(), piece_size};
if (m_storage->readv(&b, piece, 0, piece_size) != piece_size)
{ {
error = m_storage->error(); error = m_storage->error();
TORRENT_ASSERT(error); TORRENT_ASSERT(error);