From 97a40a45cd3cccd84a88459a82e4a879dabfe947 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 22 Feb 2011 02:53:26 +0000 Subject: [PATCH] fully support unbuffered I/O --- ChangeLog | 1 + include/libtorrent/allocator.hpp | 21 +++++ include/libtorrent/disk_buffer_holder.hpp | 4 +- include/libtorrent/disk_io_thread.hpp | 3 - include/libtorrent/storage.hpp | 5 +- src/disk_buffer_holder.cpp | 25 ++---- src/disk_io_thread.cpp | 71 +---------------- src/storage.cpp | 93 ++++++++++++++++------- 8 files changed, 99 insertions(+), 124 deletions(-) diff --git a/ChangeLog b/ChangeLog index b46890030..657516e01 100644 --- a/ChangeLog +++ b/ChangeLog @@ -83,6 +83,7 @@ incoming connection * added more detailed instrumentation of the disk I/O thread + * implemented unaligned write (for unbuffered I/O) * fixed broadcast_lsd option * fixed udp-socket race condition when using a proxy * end-game mode optimizations diff --git a/include/libtorrent/allocator.hpp b/include/libtorrent/allocator.hpp index 0d481531e..2a52320ce 100644 --- a/include/libtorrent/allocator.hpp +++ b/include/libtorrent/allocator.hpp @@ -50,6 +50,27 @@ namespace libtorrent static void free(char* const block); }; + struct TORRENT_EXPORT aligned_holder + { + aligned_holder(): m_buf(0) {} + aligned_holder(int size): m_buf(page_aligned_allocator::malloc(size)) {} + ~aligned_holder() { if (m_buf) page_aligned_allocator::free(m_buf); } + char* get() const { return m_buf; } + void reset(char* buf = 0) + { + if (m_buf) page_aligned_allocator::free(m_buf); + m_buf = buf; + } + void swap(aligned_holder& h) + { + char* tmp = m_buf; + m_buf = h.m_buf; + h.m_buf = tmp; + } + private: + char* m_buf; + }; + } #endif diff --git a/include/libtorrent/disk_buffer_holder.hpp b/include/libtorrent/disk_buffer_holder.hpp index d5536cedf..e342f1185 100644 --- a/include/libtorrent/disk_buffer_holder.hpp +++ b/include/libtorrent/disk_buffer_holder.hpp @@ -47,11 +47,10 @@ namespace libtorrent { disk_buffer_holder(aux::session_impl& ses, char* buf); disk_buffer_holder(disk_buffer_pool& disk_pool, char* buf); - disk_buffer_holder(disk_buffer_pool& disk_pool, char* buf, int num_blocks); ~disk_buffer_holder(); char* release(); char* get() const { return m_buf; } - void reset(char* buf = 0, int num_blocks = 1); + void reset(char* buf = 0); void swap(disk_buffer_holder& h) { TORRENT_ASSERT(&h.m_disk_pool == &m_disk_pool); @@ -65,7 +64,6 @@ namespace libtorrent private: disk_buffer_pool& m_disk_pool; char* m_buf; - int m_num_blocks; }; } diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 8da16d6e5..6e83ba664 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -222,9 +222,6 @@ namespace libtorrent void free_buffer(char* buf); void free_multiple_buffers(char** bufvec, int numbufs); - char* allocate_buffers(int blocks, char const* category); - void free_buffers(char* buf, int blocks); - int block_size() const { return m_block_size; } #ifdef TORRENT_STATS diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index 19469d38e..6ef03491a 100644 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -62,6 +62,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/disk_buffer_holder.hpp" #include "libtorrent/thread.hpp" #include "libtorrent/storage_defs.hpp" +#include "libtorrent/allocator.hpp" namespace libtorrent { @@ -415,8 +416,8 @@ namespace libtorrent // used to move pieces while expanding // the storage from compact allocation // to full allocation - disk_buffer_holder m_scratch_buffer; - disk_buffer_holder m_scratch_buffer2; + aligned_holder m_scratch_buffer; + aligned_holder m_scratch_buffer2; // the piece that is in the scratch buffer int m_scratch_piece; diff --git a/src/disk_buffer_holder.cpp b/src/disk_buffer_holder.cpp index 4effe02d5..d3588f839 100644 --- a/src/disk_buffer_holder.cpp +++ b/src/disk_buffer_holder.cpp @@ -38,32 +38,21 @@ namespace libtorrent { disk_buffer_holder::disk_buffer_holder(aux::session_impl& ses, char* buf) - : m_disk_pool(ses.m_disk_thread), m_buf(buf), m_num_blocks(1) + : m_disk_pool(ses.m_disk_thread), m_buf(buf) { TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); } disk_buffer_holder::disk_buffer_holder(disk_buffer_pool& iothread, char* buf) - : m_disk_pool(iothread), m_buf(buf), m_num_blocks(1) + : m_disk_pool(iothread), m_buf(buf) { TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); } - disk_buffer_holder::disk_buffer_holder(disk_buffer_pool& iothread, char* buf, int num_blocks) - : m_disk_pool(iothread), m_buf(buf), m_num_blocks(num_blocks) + void disk_buffer_holder::reset(char* buf) { - TORRENT_ASSERT(buf == 0 || m_disk_pool.is_disk_buffer(buf)); - } - - void disk_buffer_holder::reset(char* buf, int num_blocks) - { - if (m_buf) - { - if (m_num_blocks == 1) m_disk_pool.free_buffer(m_buf); - else m_disk_pool.free_buffers(m_buf, m_num_blocks); - } + if (m_buf) m_disk_pool.free_buffer(m_buf); m_buf = buf; - m_num_blocks = num_blocks; } char* disk_buffer_holder::release() @@ -75,11 +64,7 @@ namespace libtorrent disk_buffer_holder::~disk_buffer_holder() { - if (m_buf) - { - if (m_num_blocks == 1) m_disk_pool.free_buffer(m_buf); - else m_disk_pool.free_buffers(m_buf, m_num_blocks); - } + if (m_buf) m_disk_pool.free_buffer(m_buf); } } diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index d8a8a08cf..d3bb3e594 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -225,80 +225,11 @@ namespace libtorrent #ifdef TORRENT_DISABLE_POOL_ALLOCATOR page_aligned_allocator::free(buf); #else - m_pool.ordered_free(buf); + m_pool.free(buf); #endif --m_in_use; } - char* disk_buffer_pool::allocate_buffers(int num_blocks, char const* category) - { - mutex::scoped_lock l(m_pool_mutex); - TORRENT_ASSERT(m_magic == 0x1337); -#ifdef TORRENT_DISABLE_POOL_ALLOCATOR - char* ret = page_aligned_allocator::malloc(m_block_size * num_blocks); -#else - char* ret = (char*)m_pool.ordered_malloc(num_blocks); - m_pool.set_next_size(m_settings.cache_buffer_chunk_size); -#endif - m_in_use += num_blocks; -#if TORRENT_USE_MLOCK - if (m_settings.lock_disk_cache) - { -#ifdef TORRENT_WINDOWS - VirtualLock(ret, m_block_size * num_blocks); -#else - mlock(ret, m_block_size * num_blocks); -#endif - } -#endif -#if defined TORRENT_DISK_STATS || defined TORRENT_STATS - m_allocations += num_blocks; -#endif -#ifdef TORRENT_DISK_STATS - m_categories[category] += num_blocks; - m_buf_to_category[ret] = category; - m_log << log_time() << " " << category << ": " << m_categories[category] << "\n"; -#endif - TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l)); - return ret; - } - - void disk_buffer_pool::free_buffers(char* buf, int num_blocks) - { - TORRENT_ASSERT(buf); - TORRENT_ASSERT(num_blocks >= 1); - mutex::scoped_lock l(m_pool_mutex); - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(is_disk_buffer(buf, l)); -#if defined TORRENT_DISK_STATS || defined TORRENT_STATS - m_allocations -= num_blocks; -#endif -#ifdef TORRENT_DISK_STATS - TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf]) - != m_categories.end()); - std::string const& category = m_buf_to_category[buf]; - m_categories[category] -= num_blocks; - m_log << log_time() << " " << category << ": " << m_categories[category] << "\n"; - m_buf_to_category.erase(buf); -#endif -#if TORRENT_USE_MLOCK - if (m_settings.lock_disk_cache) - { -#ifdef TORRENT_WINDOWS - VirtualUnlock(buf, m_block_size * num_blocks); -#else - munlock(buf, m_block_size * num_blocks); -#endif - } -#endif -#ifdef TORRENT_DISABLE_POOL_ALLOCATOR - page_aligned_allocator::free(buf); -#else - m_pool.ordered_free(buf, num_blocks); -#endif - m_in_use -= num_blocks; - } - void disk_buffer_pool::release_memory() { TORRENT_ASSERT(m_magic == 0x1337); diff --git a/src/storage.cpp b/src/storage.cpp index e1b13bd16..66322b5bf 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -1285,28 +1285,76 @@ ret: const size_type aligned_start = file_offset - start_adjust; const int aligned_size = ((size+start_adjust) & size_align) ? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust; - const int num_blocks = (aligned_size + block_size - 1) / block_size; TORRENT_ASSERT((aligned_size & size_align) == 0); - disk_buffer_holder tmp_buf(*disk_pool(), disk_pool()->allocate_buffers(num_blocks, "read scratch"), num_blocks); - file::iovec_t b = {tmp_buf.get(), aligned_size}; + // allocate a temporary, aligned, buffer + aligned_holder aligned_buf(aligned_size); + file::iovec_t b = {aligned_buf.get(), aligned_size}; size_type ret = file_handle->readv(aligned_start, &b, 1, ec); - if (ret < 0) return ret; - char* read_buf = tmp_buf.get() + start_adjust; + if (ret < 0) + { + TORRENT_ASSERT(ec); + return ret; + } + if (ret < aligned_size) return (std::max)(size - (start_adjust - ret), size_type(0)); + + char* read_buf = aligned_buf.get() + start_adjust; for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i) { memcpy(i->iov_base, read_buf, i->iov_len); read_buf += i->iov_len; } - if (ret < size + start_adjust) return ret - start_adjust; + return size; } + // this is the really expensive one. To write unaligned, we need to read + // an aligned block, overlay the unaligned buffer, and then write it back size_type storage::write_unaligned(boost::intrusive_ptr const& file_handle , size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec) { - TORRENT_ASSERT(false); // not implemented - return 0; + const int pos_align = file_handle->pos_alignment()-1; + const int size_align = file_handle->size_alignment()-1; + const int block_size = disk_pool()->block_size(); + + const int size = bufs_size(bufs, num_bufs); + const int start_adjust = file_offset & pos_align; + TORRENT_ASSERT(start_adjust == (file_offset % file_handle->pos_alignment())); + const size_type aligned_start = file_offset - start_adjust; + const int aligned_size = ((size+start_adjust) & size_align) + ? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust; + TORRENT_ASSERT((aligned_size & size_align) == 0); + + // allocate a temporary, aligned, buffer + aligned_holder aligned_buf(aligned_size); + file::iovec_t b = {aligned_buf.get(), aligned_size}; + size_type ret = file_handle->readv(aligned_start, &b, 1, ec); + if (ret < 0) + { + TORRENT_ASSERT(ec); + return ret; + } + if (ret < aligned_size) return (std::max)(size - (start_adjust - ret), size_type(0)); + + // OK, we read the portion of the file. Now, overlay the buffer we're writing + + char* write_buf = aligned_buf.get() + start_adjust; + for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i) + { + memcpy(write_buf, i->iov_base, i->iov_len); + write_buf += i->iov_len; + } + + // write the buffer back to disk + ret = file_handle->writev(aligned_start, &b, 1, ec); + + if (ret < 0) + { + TORRENT_ASSERT(ec); + return ret; + } + if (ret < aligned_size) return (std::max)(size - (start_adjust - ret), size_type(0)); + return size; } int storage::write( @@ -1451,8 +1499,6 @@ ret: , m_state(state_none) , m_current_slot(0) , m_out_of_place(false) - , m_scratch_buffer(io, 0) - , m_scratch_buffer2(io, 0) , m_scratch_piece(-1) , m_last_piece(-1) , m_storage_constructor(sc) @@ -2274,17 +2320,12 @@ ret: if (other_piece >= 0) { - if (!m_scratch_buffer2) - { - int blocks_per_piece = (std::max)(m_files.piece_length() - / m_io_thread.block_size(), 1); - m_scratch_buffer2.reset(m_io_thread.allocate_buffers( - blocks_per_piece, "check scratch"), blocks_per_piece); - } + if (!m_scratch_buffer2.get()) + m_scratch_buffer2.reset(page_aligned_allocator::malloc(m_files.piece_length())); int piece_size = m_files.piece_size(other_piece); - if (m_storage->read(m_scratch_buffer2.get(), piece, 0, piece_size) - != piece_size) + file::iovec_t b = {m_scratch_buffer2.get(), piece_size}; + if (m_storage->readv(&b, piece, 0, 1) != piece_size) { error = m_storage->error(); TORRENT_ASSERT(error); @@ -2297,7 +2338,8 @@ ret: // the slot where this piece belongs is // free. Just move the piece there. int piece_size = m_files.piece_size(piece); - if (m_storage->write(m_scratch_buffer.get(), piece, 0, piece_size) != piece_size) + file::iovec_t b = {m_scratch_buffer.get(), piece_size}; + if (m_storage->writev(&b, piece, 0, 1) != piece_size) { error = m_storage->error(); TORRENT_ASSERT(error); @@ -2306,8 +2348,7 @@ ret: m_piece_to_slot[piece] = piece; m_slot_to_piece[piece] = piece; - if (other_piece >= 0) - m_scratch_buffer.swap(m_scratch_buffer2); + if (other_piece >= 0) m_scratch_buffer.swap(m_scratch_buffer2); TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_files.num_pieces()); return need_full_check; @@ -2335,15 +2376,15 @@ ret: // there is another piece in the slot // where this one goes. Store it in the scratch // buffer until next iteration. - if (!m_scratch_buffer) + if (!m_scratch_buffer.get()) { int blocks_per_piece = (std::max)(m_files.piece_length() / m_io_thread.block_size(), 1); - m_scratch_buffer.reset(m_io_thread.allocate_buffers( - blocks_per_piece, "check scratch"), blocks_per_piece); + m_scratch_buffer.reset(page_aligned_allocator::malloc(m_files.piece_length())); } int piece_size = m_files.piece_size(other_piece); - if (m_storage->read(m_scratch_buffer.get(), piece, 0, piece_size) != piece_size) + file::iovec_t b = {m_scratch_buffer.get(), piece_size}; + if (m_storage->readv(&b, piece, 0, piece_size) != piece_size) { error = m_storage->error(); TORRENT_ASSERT(error);