diff --git a/ChangeLog b/ChangeLog
index fe24805a9..411aabbbf 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,5 @@
+	* fixed disk I/O performance of checking hashes and creating torrents
 	* fix race condition in part_file
 	* fix part_file open mode compatibility test
 	* fixed race condition in random number generator
diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp
index 3c0865867..7b69f55fe 100644
--- a/src/disk_io_thread.cpp
+++ b/src/disk_io_thread.cpp
@@ -2374,8 +2374,28 @@ namespace libtorrent
 		}

 		partial_hash* ph = pe->hash;

-		int block_size = m_disk_cache.block_size();
-		int blocks_in_piece = (piece_size + block_size - 1) / block_size;
+		int const block_size = m_disk_cache.block_size();
+		int const blocks_in_piece = (piece_size + block_size - 1) / block_size;
+
+		// we don't care about anything to the left of ph->offset
+		// since those blocks have already been hashed.
+		// we just care about [first_block, first_block + blocks_left]
+		int const first_block = ph->offset / block_size;
+		int const blocks_left = blocks_in_piece - first_block;
+
+		//             ph->offset
+		//             | first_block
+		//             | |
+		//             v v
+		// +---+---+---+---+---+---+
+		// |   |   |   |   |   |   |
+		// +---+---+---+---+---+---+
+		//
+		//             \-----------/
+		//                blocks_left
+		//
+		// \-----------------------/
+		//      blocks_in_piece

 		// keep track of which blocks we have locked by incrementing
 		// their refcounts. This is used to decrement only these blocks
@@ -2387,13 +2407,13 @@ namespace libtorrent
 		// increment the refcounts of all
 		// blocks up front, and then hash them without holding the lock
 		TORRENT_PIECE_ASSERT(ph->offset % block_size == 0, pe);
-		for (int i = ph->offset / block_size; i < blocks_in_piece; ++i)
+		for (int i = 0; i < blocks_left; ++i)
 		{
 			// is the block not in the cache?
-			if (pe->blocks[i].buf == NULL) continue;
+			if (pe->blocks[first_block + i].buf == NULL) continue;

 			// if we fail to lock the block, it's no longer in the cache
-			if (m_disk_cache.inc_block_refcount(pe, i, block_cache::ref_hashing) == false)
+			if (m_disk_cache.inc_block_refcount(pe, first_block + i, block_cache::ref_hashing) == false)
 				continue;

 			locked_blocks[num_locked_blocks++] = i;
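A side note on the hunk above: the first_block / blocks_left split is plain integer arithmetic. Here is a standalone sketch with made-up sizes (a 64 KiB piece, 16 KiB cache blocks, and hashed_offset standing in for ph->offset); only the three divisions mirror the patch itself:

    #include <cassert>

    int main()
    {
        int const piece_size = 65536;    // hypothetical 64 KiB piece
        int const block_size = 16384;    // hypothetical 16 KiB cache blocks
        int const hashed_offset = 32768; // stand-in for ph->offset

        // the same round-up division and range split as the patch
        int const blocks_in_piece = (piece_size + block_size - 1) / block_size;
        int const first_block = hashed_offset / block_size;
        int const blocks_left = blocks_in_piece - first_block;

        assert(blocks_in_piece == 4);
        assert(first_block == 2); // blocks 0 and 1 were hashed by earlier calls
        assert(blocks_left == 2); // only blocks 2 and 3 still need hashing
        return 0;
    }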
@@ -2410,96 +2430,152 @@ namespace libtorrent
 		l.unlock();

+		bool slow_path = true;
 		int ret = 0;
-		int next_locked_block = 0;
-		for (int i = offset / block_size; i < blocks_in_piece; ++i)
+
+		if (num_locked_blocks == 0)
 		{
-			file::iovec_t iov;
-			iov.iov_len = (std::min)(block_size, piece_size - offset);
-
-			if (next_locked_block < num_locked_blocks
-				&& locked_blocks[next_locked_block] == i)
+			// this is the fast path where we don't have any blocks in the
+			// cache. We'll need to read all the (remaining) blocks from disk
+			file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, blocks_left);
+			ret = m_disk_cache.allocate_iovec(iov, blocks_left);
+			if (ret >= 0)
 			{
-				++next_locked_block;
-				TORRENT_PIECE_ASSERT(pe->blocks[i].buf, pe);
-				TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
-				offset += iov.iov_len;
-				ph->h.update(pe->blocks[i].buf, iov.iov_len);
-			}
-			else
-			{
-				iov.iov_base = m_disk_cache.allocate_buffer("hashing");
+				// this is the offset that's aligned to block boundaries
+				boost::int64_t adjusted_offset = j->d.io.offset & ~(block_size-1);

-				if (iov.iov_base == NULL)
-				{
-					l.lock();
-					// TODO: introduce a holder class that automatically increments
-					// and decrements the piece_refcount
-
-					// decrement the refcounts of the blocks we just hashed
-					for (int k = 0; k < num_locked_blocks; ++k)
-						m_disk_cache.dec_block_refcount(pe, locked_blocks[k], block_cache::ref_hashing);
-
-					--pe->piece_refcount;
-					pe->hashing = false;
-					delete pe->hash;
-					pe->hash = NULL;
-
-					m_disk_cache.maybe_free_piece(pe);
-
-					j->error.ec = errors::no_memory;
-					j->error.operation = storage_error::alloc_cache_piece;
-					return -1;
-				}
-
-				DLOG("do_hash: reading (piece: %d block: %d)\n", int(pe->piece), i);
+				// if this is the last piece, adjust the size of the
+				// last buffer to match up
+				iov[blocks_left-1].iov_len = (std::min)(int(piece_size - adjusted_offset)
+					- (blocks_left - 1) * block_size, block_size);
+				TORRENT_ASSERT(iov[blocks_left-1].iov_len > 0);

 				time_point const start_time = clock_type::now();
+				ret = j->storage->get_storage_impl()->readv(iov, blocks_left
+					, j->piece, offset, file_flags, j->error);

-				TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
-				ret = j->storage->get_storage_impl()->readv(&iov, 1, j->piece
-					, offset, file_flags, j->error);
-
-				if (ret < 0)
-				{
-					TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
-					m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
-					l.lock();
-					break;
-				}
-
-				// treat a short read as an error. The hash will be invalid, the
-				// block cannot be cached and the main thread should skip the rest
-				// of this file
-				if (ret != iov.iov_len)
-				{
-					ret = -1;
-					j->error.ec.assign(boost::asio::error::eof
-						, boost::asio::error::get_misc_category());
-					j->error.operation = storage_error::read;
-					m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
-					l.lock();
-					break;
-				}
-
-				if (!j->error.ec)
+				if (ret >= 0)
 				{
 					boost::uint32_t const read_time = total_microseconds(clock_type::now() - start_time);

-					m_stats_counters.inc_stats_counter(counters::num_read_back);
-					m_stats_counters.inc_stats_counter(counters::num_blocks_read);
+					m_stats_counters.inc_stats_counter(counters::num_read_back, blocks_left);
+					m_stats_counters.inc_stats_counter(counters::num_blocks_read, blocks_left);
 					m_stats_counters.inc_stats_counter(counters::num_read_ops);
 					m_stats_counters.inc_stats_counter(counters::disk_read_time, read_time);
 					m_stats_counters.inc_stats_counter(counters::disk_job_time, read_time);
+
+					for (int i = 0; i < blocks_left; ++i)
+					{
+						offset += iov[i].iov_len;
+						ph->h.update(static_cast<char const*>(iov[i].iov_base), iov[i].iov_len);
+					}
+					slow_path = false;
+
+					l.lock();
+					m_disk_cache.insert_blocks(pe, first_block, iov, blocks_left, j);
+					l.unlock();
 				}
+				else
+				{
+					TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
+					m_disk_cache.free_iovec(iov, blocks_left);
+				}
+			}
+		}

-				TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
-				offset += iov.iov_len;
-				ph->h.update(static_cast<char const*>(iov.iov_base), iov.iov_len);
+		if (slow_path)
+		{
+			ret = 0;
+			int next_locked_block = 0;
+			for (int i = 0; i < blocks_left; ++i)
+			{
+				file::iovec_t iov;
+				iov.iov_len = (std::min)(block_size, piece_size - offset);

-				l.lock();
-				m_disk_cache.insert_blocks(pe, i, &iov, 1, j);
-				l.unlock();
+				if (next_locked_block < num_locked_blocks
+					&& locked_blocks[next_locked_block] == i)
+				{
+					++next_locked_block;
+					TORRENT_PIECE_ASSERT(pe->blocks[first_block + i].buf, pe);
+					TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
+					offset += iov.iov_len;
+					ph->h.update(pe->blocks[first_block + i].buf, iov.iov_len);
+				}
+				else
+				{
+					iov.iov_base = m_disk_cache.allocate_buffer("hashing");
+
+					if (iov.iov_base == NULL)
+					{
+						l.lock();
+						// TODO: introduce a holder class that automatically increments
+						// and decrements the piece_refcount
+
+						// decrement the refcounts of the blocks we just hashed
+						for (int k = 0; k < num_locked_blocks; ++k)
+							m_disk_cache.dec_block_refcount(pe, first_block + locked_blocks[k], block_cache::ref_hashing);
+
+						--pe->piece_refcount;
+						pe->hashing = false;
+						delete pe->hash;
+						pe->hash = NULL;
+
+						m_disk_cache.maybe_free_piece(pe);
+
+						j->error.ec = errors::no_memory;
+						j->error.operation = storage_error::alloc_cache_piece;
+						return -1;
+					}
+
+					DLOG("do_hash: reading (piece: %d block: %d)\n", int(pe->piece), first_block + i);
+
+					time_point const start_time = clock_type::now();
+
+					TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
+					ret = j->storage->get_storage_impl()->readv(&iov, 1, j->piece
+						, offset, file_flags, j->error);
+
+					if (ret < 0)
+					{
+						TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
+						m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
+						l.lock();
+						break;
+					}
+
+					// treat a short read as an error. The hash will be invalid, the
+					// block cannot be cached and the main thread should skip the rest
+					// of this file
+					if (ret != iov.iov_len)
+					{
+						ret = -1;
+						j->error.ec.assign(boost::asio::error::eof
+							, boost::asio::error::get_misc_category());
+						j->error.operation = storage_error::read;
+						m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
+						l.lock();
+						break;
+					}
+
+					if (!j->error.ec)
+					{
+						boost::uint32_t const read_time = total_microseconds(clock_type::now() - start_time);
+
+						m_stats_counters.inc_stats_counter(counters::num_read_back);
+						m_stats_counters.inc_stats_counter(counters::num_blocks_read);
+						m_stats_counters.inc_stats_counter(counters::num_read_ops);
+						m_stats_counters.inc_stats_counter(counters::disk_read_time, read_time);
+						m_stats_counters.inc_stats_counter(counters::disk_job_time, read_time);
+					}
+
+					TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
+					offset += iov.iov_len;
+					ph->h.update(static_cast<char const*>(iov.iov_base), iov.iov_len);
+
+					l.lock();
+					m_disk_cache.insert_blocks(pe, first_block + i, &iov, 1, j);
+					l.unlock();
+				}
 			}
 		}
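The point of the fast path in the hunk above is to turn blocks_left single-block reads into one vectored read, so an unhashed piece is fetched from disk in a single sequential pass. Below is a minimal standalone sketch of that I/O pattern using plain POSIX readv() rather than libtorrent's storage interface; the file name and sizes are made up for the example:

    #include <sys/uio.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <cstdio>
    #include <cstdlib>

    int main()
    {
        int const block_size = 16384; // hypothetical cache block size
        int const blocks_left = 4;    // hypothetical number of blocks to read

        int const fd = open("piece.bin", O_RDONLY); // made-up input file
        if (fd < 0) { std::perror("open"); return 1; }

        // one buffer per cache block, analogous to allocate_iovec() above
        iovec iov[blocks_left];
        for (int i = 0; i < blocks_left; ++i)
        {
            iov[i].iov_base = std::malloc(block_size);
            iov[i].iov_len = block_size;
        }

        // a single system call fills every buffer in one sequential pass,
        // instead of blocks_left separate read() calls
        ssize_t const ret = readv(fd, iov, blocks_left);
        std::printf("read %zd bytes in one call\n", ret);

        for (int i = 0; i < blocks_left; ++i) std::free(iov[i].iov_base);
        close(fd);
        return 0;
    }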
@@ -2510,7 +2586,7 @@ namespace libtorrent

 		// decrement the refcounts of the blocks we just hashed
 		for (int i = 0; i < num_locked_blocks; ++i)
-			m_disk_cache.dec_block_refcount(pe, locked_blocks[i], block_cache::ref_hashing);
+			m_disk_cache.dec_block_refcount(pe, first_block + locked_blocks[i], block_cache::ref_hashing);

 		--pe->piece_refcount;
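And for the slow path: with some blocks already locked in the cache, the loop merges two ordered sequences, cached block indices from locked_blocks[] and everything else from disk. Reduced to its control flow, it looks like the standalone sketch below (made-up indices; printf stands in for hashing and reading):

    #include <cstdio>

    int main()
    {
        int const blocks_left = 6;
        int const locked_blocks[] = {1, 4}; // cached block indices, ascending
        int const num_locked_blocks = 2;

        // same merge as the patch: walk the remaining blocks in order and
        // consume locked_blocks[] as each cached index is passed
        int next_locked_block = 0;
        for (int i = 0; i < blocks_left; ++i)
        {
            if (next_locked_block < num_locked_blocks
                && locked_blocks[next_locked_block] == i)
            {
                ++next_locked_block;
                std::printf("block %d: hash straight from the cache\n", i);
            }
            else
            {
                std::printf("block %d: read from disk, hash, insert into cache\n", i);
            }
        }
        return 0;
    }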