use boost::variant for disk_io_job in/out parameter, to support holding a proper disk io buffer handle and string

arvidn 2017-04-16 16:37:39 -04:00 committed by Arvid Norberg
parent 7d09dba14e
commit df299fcb00
13 changed files with 145 additions and 156 deletions
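The core of the change: disk_io_job's in/out parameter used to be a raw union (char* disk_block, char* string, a couple of pointers and an int), which required manual lifetime handling such as free()ing malloc'd strings in the job destructor. A boost::variant remembers which alternative is active and destroys it automatically, so owning types like disk_buffer_holder and std::string can be stored in the job directly and read back with boost::get<>. A minimal sketch of the pattern with simplified, hypothetical alternatives (not libtorrent's real types):

#include <boost/variant.hpp>
#include <boost/variant/get.hpp>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct job
{
    // simplified stand-ins for disk_buffer_holder, std::string,
    // aux::vector<std::uint8_t, file_index_t> and the delete_options int
    boost::variant<std::vector<char>, std::string
        , std::vector<std::uint8_t>, int> argument;
};

int main()
{
    job j;

    j.argument = std::string("/new/save/path");     // move_storage-style payload
    std::cout << boost::get<std::string>(j.argument) << "\n";

    j.argument = 3;                                  // delete_files-style payload
    std::cout << boost::get<int>(j.argument) << "\n";

    // whichever alternative is active is destroyed with the variant;
    // no manual free() and no custom destructor needed
}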

View File

@ -40,6 +40,10 @@ namespace libtorrent { namespace aux {
struct block_cache_reference
{
block_cache_reference() = default;
block_cache_reference(storage_index_t const idx, std::int32_t const c)
: storage(idx), cookie(c) {}
// if the cookie is set to this value, it doesn't refer to anything in the
// cache (and the buffer is mutable)
constexpr static std::int32_t none = 0x7fffffff;
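As an aside, a hedged sketch of how a cookie of `none` is typically interpreted: a buffer whose reference cookie equals `none` is a private, mutable allocation and can simply be freed, while any other cookie means the buffer aliases a block held in the cache and has to be reclaimed through the disk thread. Simplified stand-in types, not the actual libtorrent code:

#include <cstdint>

struct block_cache_reference_sketch
{
    static constexpr std::int32_t none = 0x7fffffff;
    std::int32_t cookie = none;
};

// hypothetical helper: does this read buffer alias a cached block?
bool references_cache(block_cache_reference_sketch const& ref)
{
    // cookie == none: privately owned, mutable buffer -> free it directly
    // otherwise:      points into the cache -> must be reclaimed via the disk thread
    return ref.cookie != block_cache_reference_sketch::none;
}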

View File

@ -376,7 +376,8 @@ namespace aux {
// returns the number of bytes read on success (cache hit)
// -1 on cache miss
int try_read(disk_io_job* j, bool expect_no_fail = false);
int try_read(disk_io_job* j, buffer_allocator_interface& allocator
, bool expect_no_fail = false);
// called when we're reading and we found the piece we're
// reading from in the hash table (not necessarily that we
@ -465,7 +466,8 @@ namespace aux {
// returns number of bytes read on success, -1 on cache miss
// (just because the piece is in the cache, doesn't mean all
// the blocks are there)
int copy_from_piece(cached_piece_entry* p, disk_io_job* j, bool expect_no_fail = false);
int copy_from_piece(cached_piece_entry* p, disk_io_job* j
, buffer_allocator_interface& allocator, bool expect_no_fail = false);
void free_piece(cached_piece_entry* p);
int drain_piece_bufs(cached_piece_entry& p, std::vector<char*>& buf);

View File

@ -145,7 +145,7 @@ namespace libtorrent {
, std::uint8_t flags = 0) = 0;
virtual void async_hash(storage_index_t storage, piece_index_t piece, std::uint8_t flags
, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler, void* requester) = 0;
virtual void async_move_storage(storage_index_t storage, std::string const& p, std::uint8_t flags
virtual void async_move_storage(storage_index_t storage, std::string p, std::uint8_t flags
, std::function<void(status_t, std::string const&, storage_error const&)> handler) = 0;
virtual void async_release_files(storage_index_t storage
, std::function<void()> handler = std::function<void()>()) = 0;
@ -158,12 +158,12 @@ namespace libtorrent {
virtual void async_stop_torrent(storage_index_t storage
, std::function<void()> handler = std::function<void()>()) = 0;
virtual void async_rename_file(storage_index_t storage
, file_index_t index, std::string const& name
, file_index_t index, std::string name
, std::function<void(std::string const&, file_index_t, storage_error const&)> handler) = 0;
virtual void async_delete_files(storage_index_t storage, int options
, std::function<void(storage_error const&)> handler) = 0;
virtual void async_set_file_priority(storage_index_t storage
, aux::vector<std::uint8_t, file_index_t> const& prio
, aux::vector<std::uint8_t, file_index_t> prio
, std::function<void(storage_error const&)> handler) = 0;
virtual void async_clear_piece(storage_index_t storage, piece_index_t index

View File

@ -57,7 +57,6 @@ namespace libtorrent {
struct cached_piece_entry;
class torrent_info;
struct add_torrent_params;
struct buffer_allocator_interface;
// disk_io_jobs are allocated in a pool allocator in disk_io_thread
// they are always allocated from the network thread, posted
@ -74,11 +73,10 @@ namespace libtorrent {
struct TORRENT_EXTRA_EXPORT disk_io_job : tailqueue_node<disk_io_job>
{
disk_io_job();
~disk_io_job();
disk_io_job(disk_io_job const&) = delete;
disk_io_job& operator=(disk_io_job const&) = delete;
void call_callback(buffer_allocator_interface&);
void call_callback();
enum action_t : std::uint8_t
{
@ -126,21 +124,14 @@ namespace libtorrent {
// unique identifier for the peer when reading
void* requester = nullptr;
// for write, this points to the data to write,
// for read, the data read is returned here
// for read and write, this is the disk_buffer_holder
// for other jobs, it may point to other job-specific types
// for move_storage and rename_file this is a string allocated
// with malloc()
// for get_cache_info this points to a cache_status object which
// is filled in
union
{
char* disk_block;
char* string;
add_torrent_params const* check_resume_data;
aux::vector<std::uint8_t, file_index_t>* priorities;
int delete_options;
} buffer;
// for move_storage and rename_file this is a string
boost::variant<disk_buffer_holder
, std::string
, add_torrent_params const*
, aux::vector<std::uint8_t, file_index_t>
, int> argument;
// the disk storage this job applies to (if applicable)
std::shared_ptr<storage_interface> storage;
@ -184,16 +175,6 @@ namespace libtorrent {
struct io_args
{
// if this is set, the read operation is required to
// release the block references once it's done sending
// the buffer. For aligned block requests (by far the
// most common) the buffers are not actually copied
// into the send buffer, but simply referenced. When this
// is set in a response to a read, the buffer needs to
// be de-referenced by sending a reclaim_block message
// back to the disk thread
aux::block_cache_reference ref;
// for read and write, the offset into the piece
// the read or write should start
// for hash jobs, this is the first block the hash

View File

@ -306,7 +306,7 @@ namespace aux {
, std::uint8_t flags = 0) override;
void async_hash(storage_index_t storage, piece_index_t piece, std::uint8_t flags
, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler, void* requester) override;
void async_move_storage(storage_index_t storage, std::string const& p, std::uint8_t flags
void async_move_storage(storage_index_t storage, std::string p, std::uint8_t flags
, std::function<void(status_t, std::string const&, storage_error const&)> handler) override;
void async_release_files(storage_index_t storage
, std::function<void()> handler = std::function<void()>()) override;
@ -316,14 +316,14 @@ namespace aux {
, add_torrent_params const* resume_data
, aux::vector<std::string, file_index_t>& links
, std::function<void(status_t, storage_error const&)> handler) override;
void async_rename_file(storage_index_t storage, file_index_t index, std::string const& name
void async_rename_file(storage_index_t storage, file_index_t index, std::string name
, std::function<void(std::string const&, file_index_t, storage_error const&)> handler) override;
void async_stop_torrent(storage_index_t storage
, std::function<void()> handler) override;
void async_flush_piece(storage_index_t storage, piece_index_t piece
, std::function<void()> handler = std::function<void()>()) override;
void async_set_file_priority(storage_index_t storage
, aux::vector<std::uint8_t, file_index_t> const& prio
, aux::vector<std::uint8_t, file_index_t> prio
, std::function<void(storage_error const&)> handler) override;
void async_clear_piece(storage_index_t storage, piece_index_t index

View File

@ -1023,7 +1023,7 @@ namespace libtorrent {
// renames the file with the given index to the new name
// the name may include a directory path
// returns false on failure
void rename_file(file_index_t index, std::string const& name);
void rename_file(file_index_t index, std::string name);
// unless this returns true, new connections must wait
// with their initialization.

View File

@ -44,6 +44,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/aux_/block_cache_reference.hpp"
#include "libtorrent/aux_/numeric_cast.hpp"
#include <boost/variant/get.hpp>
/*
The disk cache mimics ARC (adaptive replacement cache).
@ -354,11 +356,12 @@ block_cache::block_cache(int block_size, io_service& ios
// returns:
// -1: not in cache
// -2: no memory
int block_cache::try_read(disk_io_job* j, bool expect_no_fail)
int block_cache::try_read(disk_io_job* j, buffer_allocator_interface& allocator
, bool expect_no_fail)
{
INVARIANT_CHECK;
TORRENT_ASSERT(j->buffer.disk_block == nullptr);
TORRENT_ASSERT(!boost::get<disk_buffer_holder>(j->argument));
cached_piece_entry* p = find_piece(j);
@ -374,7 +377,7 @@ int block_cache::try_read(disk_io_job* j, bool expect_no_fail)
#endif
cache_hit(p, j->requester, (j->flags & disk_interface::volatile_read) != 0);
ret = copy_from_piece(p, j, expect_no_fail);
ret = copy_from_piece(p, j, allocator, expect_no_fail);
if (ret < 0) return ret;
ret = j->d.io.buffer_size;
@ -711,13 +714,13 @@ cached_piece_entry* block_cache::allocate_piece(disk_io_job const* j, std::uint1
cached_piece_entry* block_cache::add_dirty_block(disk_io_job* j)
{
#if !defined TORRENT_DISABLE_POOL_ALLOCATOR
TORRENT_ASSERT(is_disk_buffer(j->buffer.disk_block));
TORRENT_ASSERT(is_disk_buffer(boost::get<disk_buffer_holder>(j->argument).get()));
#endif
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
INVARIANT_CHECK;
#endif
TORRENT_ASSERT(j->buffer.disk_block);
TORRENT_ASSERT(boost::get<disk_buffer_holder>(j->argument));
TORRENT_ASSERT(m_write_cache_size + m_read_cache_size + 1 <= in_use());
cached_piece_entry* pe = allocate_piece(j, cached_piece_entry::write_lru);
@ -747,26 +750,25 @@ cached_piece_entry* block_cache::add_dirty_block(disk_io_job* j)
cached_block_entry& b = pe->blocks[block];
TORRENT_PIECE_ASSERT(b.buf != j->buffer.disk_block, pe);
TORRENT_PIECE_ASSERT(b.buf != boost::get<disk_buffer_holder>(j->argument).get(), pe);
// we might have a left-over read block from
// hash checking
// we might also have a previous dirty block which
// we're still waiting for to be written
if (b.buf != nullptr && b.buf != j->buffer.disk_block)
if (b.buf != nullptr && b.buf != boost::get<disk_buffer_holder>(j->argument).get())
{
TORRENT_PIECE_ASSERT(b.refcount == 0 && !b.pending, pe);
free_block(pe, block);
TORRENT_PIECE_ASSERT(b.dirty == 0, pe);
}
b.buf = j->buffer.disk_block;
b.buf = boost::get<disk_buffer_holder>(j->argument).release();
b.dirty = true;
++pe->num_blocks;
++pe->num_dirty;
++m_write_cache_size;
j->buffer.disk_block = nullptr;
TORRENT_PIECE_ASSERT(j->piece == pe->piece, pe);
TORRENT_PIECE_ASSERT(j->flags & disk_io_job::in_progress, pe);
TORRENT_PIECE_ASSERT(j->piece == pe->piece, pe);
@ -1697,13 +1699,13 @@ void block_cache::check_invariant() const
// -2: out of memory
int block_cache::copy_from_piece(cached_piece_entry* const pe
, disk_io_job* const j
, disk_io_job* const j, buffer_allocator_interface& allocator
, bool const expect_no_fail)
{
INVARIANT_CHECK;
TORRENT_UNUSED(expect_no_fail);
TORRENT_PIECE_ASSERT(j->buffer.disk_block == nullptr, pe);
TORRENT_PIECE_ASSERT(!boost::get<disk_buffer_holder>(j->argument).get(), pe);
TORRENT_PIECE_ASSERT(pe->in_use, pe);
// copy from the cache and update the last use timestamp
@ -1746,9 +1748,10 @@ int block_cache::copy_from_piece(cached_piece_entry* const pe
// make sure it didn't wrap
TORRENT_PIECE_ASSERT(pe->refcount > 0, pe);
int const blocks_per_piece = (j->storage->files().piece_length() + block_size() - 1) / block_size();
j->d.io.ref.storage = j->storage->storage_index();
j->d.io.ref.cookie = static_cast<int>(pe->piece) * blocks_per_piece + start_block;
j->buffer.disk_block = bl.buf + (j->d.io.offset & (block_size() - 1));
j->argument = disk_buffer_holder(allocator
, aux::block_cache_reference{ j->storage->storage_index()
, static_cast<int>(pe->piece) * blocks_per_piece + start_block}
, bl.buf + (j->d.io.offset & (block_size() - 1)));
j->storage->inc_refcount();
++m_send_buffer_blocks;
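In sketch form, the two branches of copy_from_piece now differ only in how the job's argument is populated: an aligned cache hit wraps a pointer into the cached block plus a reference cookie into the holder (zero copy, the block stays pinned until reclaimed), while the unaligned fallback allocates a private send buffer and copies into it. The following uses simplified stand-in types, not the real disk_buffer_holder, and omits cleanup:

#include <boost/variant.hpp>
#include <cstdint>
#include <cstring>
#include <string>

struct ref_sketch { std::int32_t cookie; };            // stand-in for aux::block_cache_reference

struct holder_sketch                                   // stand-in for disk_buffer_holder
{
    char* buf = nullptr;
    ref_sketch ref{0x7fffffff};                        // "none" by default
    holder_sketch() = default;
    explicit holder_sketch(char* b) : buf(b) {}        // owns a private, mutable buffer
    holder_sketch(ref_sketch r, char* b) : buf(b), ref(r) {}  // aliases a cached block
};

boost::variant<holder_sketch, std::string> argument;   // the job's in/out slot

// aligned cache hit: no copy, just point into the cached block and record the cookie
void on_cache_hit(char* cached_block, int offset, std::int32_t cookie)
{
    argument = holder_sketch(ref_sketch{cookie}, cached_block + offset);
}

// unaligned request: copy the bytes into a freshly allocated send buffer
void on_cache_copy(char const* src, std::size_t len)
{
    char* b = new char[len];                           // sketch only; ownership/cleanup omitted
    std::memcpy(b, src, len);
    argument = holder_sketch(b);
}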
@ -1763,15 +1766,17 @@ int block_cache::copy_from_piece(cached_piece_entry* const pe
return -1;
}
j->buffer.disk_block = allocate_buffer("send buffer");
if (j->buffer.disk_block == nullptr) return -2;
j->argument = disk_buffer_holder(allocator
, allocate_buffer("send buffer"));
if (!boost::get<disk_buffer_holder>(j->argument)) return -2;
while (size > 0)
{
TORRENT_PIECE_ASSERT(pe->blocks[block].buf, pe);
int to_copy = (std::min)(block_size()
- block_offset, size);
std::memcpy(j->buffer.disk_block + buffer_offset
std::memcpy(boost::get<disk_buffer_holder>(j->argument).get()
+ buffer_offset
, pe->blocks[block].buf + block_offset
, aux::numeric_cast<std::size_t>(to_copy));
size -= to_copy;

View File

@ -37,7 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent {
disk_buffer_holder::disk_buffer_holder(buffer_allocator_interface& alloc, char* buf) noexcept
: m_allocator(&alloc), m_buf(buf)
: m_allocator(&alloc), m_buf(buf), m_ref()
{}
disk_buffer_holder& disk_buffer_holder::operator=(disk_buffer_holder&& h) noexcept
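For reference, a hedged sketch of what a move-only buffer holder of this shape looks like: it frees the buffer through the allocator when it goes out of scope, release() hands raw ownership back (as add_dirty_block does above when it stores the block in the cache), and it is movable but not copyable. This is a simplification; the real disk_buffer_holder also carries the block_cache_reference initialised here.

struct buffer_allocator_interface_sketch           // trimmed-down stand-in
{
    virtual void free_disk_buffer(char* b) = 0;
    virtual ~buffer_allocator_interface_sketch() = default;
};

struct buffer_holder_sketch                        // stand-in for disk_buffer_holder
{
    buffer_holder_sketch(buffer_allocator_interface_sketch& alloc, char* buf) noexcept
        : m_allocator(&alloc), m_buf(buf) {}
    buffer_holder_sketch(buffer_holder_sketch&& h) noexcept
        : m_allocator(h.m_allocator), m_buf(h.release()) {}
    buffer_holder_sketch& operator=(buffer_holder_sketch&& h) noexcept
    {
        if (this != &h) { reset(); m_allocator = h.m_allocator; m_buf = h.release(); }
        return *this;
    }
    buffer_holder_sketch(buffer_holder_sketch const&) = delete;
    buffer_holder_sketch& operator=(buffer_holder_sketch const&) = delete;
    ~buffer_holder_sketch() { reset(); }

    char* get() const noexcept { return m_buf; }
    // give up ownership, e.g. when the buffer is handed to the block cache
    char* release() noexcept { char* b = m_buf; m_buf = nullptr; return b; }
    void reset() { if (m_buf) m_allocator->free_disk_buffer(m_buf); m_buf = nullptr; }
    explicit operator bool() const noexcept { return m_buf != nullptr; }

private:
    buffer_allocator_interface_sketch* m_allocator;
    char* m_buf;
};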

View File

@ -32,22 +32,23 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/disk_io_job.hpp"
#include "libtorrent/block_cache.hpp" // for cached_piece_entry
#include "libtorrent/disk_buffer_holder.hpp"
#include <boost/variant/get.hpp>
namespace libtorrent {
struct buffer_allocator_interface;
namespace {
struct caller_visitor : boost::static_visitor<>
{
explicit caller_visitor(buffer_allocator_interface& alloc, disk_io_job& j)
: m_job(j), m_alloc(alloc) {}
explicit caller_visitor(disk_io_job& j)
: m_job(j) {}
void operator()(disk_io_job::read_handler& h) const
{
if (!h) return;
disk_buffer_holder block(m_alloc, m_job.d.io.ref, m_job.buffer.disk_block);
h(std::move(block), m_job.flags, m_job.error);
h(std::move(boost::get<disk_buffer_holder>(m_job.argument))
, m_job.flags, m_job.error);
}
void operator()(disk_io_job::write_handler& h) const
@ -65,7 +66,8 @@ namespace libtorrent {
void operator()(disk_io_job::move_handler& h) const
{
if (!h) return;
h(m_job.ret, std::string(m_job.buffer.string), m_job.error);
h(m_job.ret, std::move(boost::get<std::string>(m_job.argument))
, m_job.error);
}
void operator()(disk_io_job::release_handler& h) const
@ -83,7 +85,8 @@ namespace libtorrent {
void operator()(disk_io_job::rename_handler& h) const
{
if (!h) return;
h(m_job.buffer.string, m_job.file_index, m_job.error);
h(std::move(boost::get<std::string>(m_job.argument))
, m_job.file_index, m_job.error);
}
void operator()(disk_io_job::clear_piece_handler& h) const
@ -94,29 +97,21 @@ namespace libtorrent {
private:
disk_io_job& m_job;
buffer_allocator_interface& m_alloc;
};
}
disk_io_job::disk_io_job()
: piece(0)
: argument(0)
, piece(0)
, action(read)
{
buffer.disk_block = nullptr;
d.io.offset = 0;
d.io.buffer_size = 0;
d.io.ref.cookie = aux::block_cache_reference::none;
}
disk_io_job::~disk_io_job()
void disk_io_job::call_callback()
{
if (action == rename_file || action == move_storage)
free(buffer.string);
}
void disk_io_job::call_callback(buffer_allocator_interface& alloc)
{
boost::apply_visitor(caller_visitor(alloc, *this), callback);
boost::apply_visitor(caller_visitor(*this), callback);
}
bool disk_io_job::completed(cached_piece_entry const* pe, int block_size)
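call_callback() dispatches on whichever handler type is stored in the job's `callback` variant. A minimal, hedged sketch of the boost::static_visitor / boost::apply_visitor mechanism used above, with made-up handler types rather than libtorrent's:

#include <boost/variant.hpp>
#include <functional>
#include <iostream>
#include <string>

using read_handler = std::function<void(std::string)>;
using move_handler = std::function<void(int)>;

struct caller : boost::static_visitor<>
{
    // one operator() per alternative; apply_visitor picks the matching one
    void operator()(read_handler& h) const { if (h) h("read done"); }
    void operator()(move_handler& h) const { if (h) h(0); }
};

int main()
{
    boost::variant<read_handler, move_handler> callback;
    callback = read_handler([](std::string s) { std::cout << s << "\n"; });
    boost::apply_visitor(caller(), callback);   // calls the read_handler overload
}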

View File

@ -55,6 +55,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <functional>
#include <boost/variant/get.hpp>
#define DEBUG_DISK_THREAD 0
#if DEBUG_DISK_THREAD
@ -1213,8 +1215,9 @@ namespace libtorrent {
status_t disk_io_thread::do_uncached_read(disk_io_job* j)
{
j->buffer.disk_block = m_disk_cache.allocate_buffer("send buffer");
if (j->buffer.disk_block == nullptr)
j->argument = disk_buffer_holder(*this, m_disk_cache.allocate_buffer("send buffer"));
auto& buffer = boost::get<disk_buffer_holder>(j->argument);
if (buffer.get() == nullptr)
{
j->error.ec = error::no_memory;
j->error.operation = storage_error::alloc_cache_piece;
@ -1225,7 +1228,7 @@ namespace libtorrent {
std::uint32_t const file_flags = file_flags_for_job(j
, m_settings.get_bool(settings_pack::coalesce_reads));
iovec_t b = {j->buffer.disk_block, std::size_t(j->d.io.buffer_size)};
iovec_t b = {buffer.get(), std::size_t(j->d.io.buffer_size)};
int ret = j->storage->readv(b
, j->piece, j->d.io.offset, file_flags, j->error);
@ -1356,7 +1359,7 @@ namespace libtorrent {
TORRENT_ASSERT(pe->blocks[block].buf);
int tmp = m_disk_cache.try_read(j, true);
int tmp = m_disk_cache.try_read(j, *this, true);
// This should always succeed because we just checked to see there is a
// buffer for this block
@ -1406,7 +1409,7 @@ namespace libtorrent {
disk_io_job* j = stalled_jobs.pop_front();
TORRENT_ASSERT(j->flags & disk_io_job::in_progress);
int ret = m_disk_cache.try_read(j);
int ret = m_disk_cache.try_read(j, *this);
if (ret >= 0)
{
// cache-hit
@ -1456,8 +1459,9 @@ namespace libtorrent {
status_t disk_io_thread::do_uncached_write(disk_io_job* j)
{
time_point const start_time = clock_type::now();
auto buffer = std::move(boost::get<disk_buffer_holder>(j->argument));
iovec_t const b = {j->buffer.disk_block, std::size_t(j->d.io.buffer_size)};
iovec_t const b = { buffer.get(), std::size_t(j->d.io.buffer_size)};
std::uint32_t const file_flags = file_flags_for_job(j
, m_settings.get_bool(settings_pack::coalesce_writes));
@ -1483,9 +1487,6 @@ namespace libtorrent {
if (!j->storage->set_need_tick())
m_need_tick.push_back({aux::time_now() + minutes(2), j->storage});
m_disk_cache.free_buffer(j->buffer.disk_block);
j->buffer.disk_block = nullptr;
return ret != j->d.io.buffer_size
? status_t::fatal_disk_error : status_t::no_error;
}
@ -1502,7 +1503,8 @@ namespace libtorrent {
#if TORRENT_USE_ASSERTS
print_piece_log(pe->piece_log);
#endif
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != j->buffer.disk_block);
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf
!= boost::get<disk_buffer_holder>(j->argument).get());
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != nullptr);
j->error.ec = error::operation_aborted;
j->error.operation = storage_error::write;
@ -1564,7 +1566,7 @@ namespace libtorrent {
j->piece = r.piece;
j->d.io.offset = r.start;
j->d.io.buffer_size = std::uint16_t(r.length);
j->buffer.disk_block = nullptr;
j->argument = disk_buffer_holder(*this, nullptr);
j->flags = flags;
j->requester = requester;
j->callback = std::move(handler);
@ -1576,7 +1578,7 @@ namespace libtorrent {
switch (ret)
{
case 0:
j->call_callback(*this);
j->call_callback();
free_job(j);
break;
case 1:
@ -1599,7 +1601,7 @@ namespace libtorrent {
{
TORRENT_ASSERT(j->action == disk_io_job::read);
int ret = m_disk_cache.try_read(j);
int ret = m_disk_cache.try_read(j, *this);
if (ret >= 0)
{
m_stats_counters.inc_stats_counter(counters::num_blocks_cache_hits);
@ -1679,7 +1681,7 @@ namespace libtorrent {
j->piece = r.piece;
j->d.io.offset = r.start;
j->d.io.buffer_size = std::uint16_t(r.length);
j->buffer.disk_block = buffer.get();
j->argument = std::move(buffer);
j->callback = std::move(handler);
j->flags = flags;
@ -1708,17 +1710,11 @@ namespace libtorrent {
int piece_size = p.storage->files().piece_size(p.piece);
int blocks_in_piece = (piece_size + bs - 1) / bs;
for (int k = 0; k < blocks_in_piece; ++k)
TORRENT_PIECE_ASSERT(p.blocks[k].buf != j->buffer.disk_block, &p);
TORRENT_PIECE_ASSERT(p.blocks[k].buf != boost::get<disk_buffer_holder>(j->argument).get(), &p);
}
l2_.unlock();
#endif
#if !defined TORRENT_DISABLE_POOL_ALLOCATOR && TORRENT_USE_ASSERTS
std::unique_lock<std::mutex> l_(m_cache_mutex);
TORRENT_ASSERT(m_disk_cache.is_disk_buffer(j->buffer.disk_block));
l_.unlock();
#endif
TORRENT_ASSERT((r.start % m_disk_cache.block_size()) == 0);
if (j->storage->is_blocked(j))
@ -1728,9 +1724,6 @@ namespace libtorrent {
DLOG("blocked job: %s (torrent: %d total: %d)\n"
, job_action_name[j->action], j->storage ? j->storage->num_blocked() : 0
, int(m_stats_counters[counters::blocked_disk_jobs]));
// make the holder give up ownership of the buffer
// since the job was successfully queued up
buffer.release();
return exceeded;
}
@ -1739,12 +1732,8 @@ namespace libtorrent {
// be added along with it. we may not free j if so
cached_piece_entry* dpe = m_disk_cache.add_dirty_block(j);
// if the buffer was successfully added to the cache
// our holder should no longer own it
if (dpe)
{
buffer.release();
if (dpe->outstanding_flush == 0)
{
dpe->outstanding_flush = 1;
@ -1766,7 +1755,6 @@ namespace libtorrent {
l.unlock();
add_job(j);
buffer.release();
return exceeded;
}
@ -1801,7 +1789,7 @@ namespace libtorrent {
#endif
l.unlock();
j->call_callback(*this);
j->call_callback();
free_job(j);
return;
}
@ -1810,12 +1798,12 @@ namespace libtorrent {
}
void disk_io_thread::async_move_storage(storage_index_t const storage
, std::string const& p, std::uint8_t const flags
, std::string p, std::uint8_t const flags
, std::function<void(status_t, std::string const&, storage_error const&)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::move_storage);
j->storage = m_torrents[storage]->shared_from_this();
j->buffer.string = allocate_string_copy(p.c_str());
j->argument = std::move(p);
j->callback = std::move(handler);
j->flags = flags;
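The signature change from `std::string const&` to `std::string` throughout this commit is the usual sink-argument idiom: since the string ends up stored in the job anyway, taking it by value lets the caller move it all the way into the variant with no extra copy. A hedged sketch with a hypothetical job type and function name (not libtorrent's real signatures):

#include <boost/variant.hpp>
#include <string>
#include <utility>

struct fake_job { boost::variant<int, std::string> argument; };

// hypothetical sink: take the path by value so callers can move into it
void submit_move_storage(fake_job& j, std::string p)
{
    j.argument = std::move(p);   // moved into the variant, no copy
}

int main()
{
    fake_job j;
    std::string path = "/mnt/data/torrents";
    submit_move_storage(j, std::move(path));  // caller moves; one allocation end to end
}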
@ -1869,7 +1857,7 @@ namespace libtorrent {
disk_io_job* j = allocate_job(disk_io_job::delete_files);
j->storage = m_torrents[storage]->shared_from_this();
j->callback = std::move(handler);
j->buffer.delete_options = options;
j->argument = options;
add_fence_job(j);
fail_jobs_impl(storage_error(boost::asio::error::operation_aborted)
@ -1890,7 +1878,7 @@ namespace libtorrent {
disk_io_job* j = allocate_job(disk_io_job::check_fastresume);
j->storage = m_torrents[storage]->shared_from_this();
j->buffer.check_resume_data = resume_data;
j->argument = resume_data;
j->d.links = links_vector;
j->callback = std::move(handler);
@ -1898,13 +1886,13 @@ namespace libtorrent {
}
void disk_io_thread::async_rename_file(storage_index_t const storage
, file_index_t index, std::string const& name
, file_index_t index, std::string name
, std::function<void(std::string const&, file_index_t, storage_error const&)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::rename_file);
j->storage = m_torrents[storage]->shared_from_this();
j->file_index = index;
j->buffer.string = allocate_string_copy(name.c_str());
j->argument = std::move(name);
j->callback = std::move(handler);
add_fence_job(j);
}
@ -1958,7 +1946,7 @@ namespace libtorrent {
if (m_abort)
{
j->error.ec = boost::asio::error::operation_aborted;
j->call_callback(*this);
j->call_callback();
free_job(j);
return;
}
@ -1967,14 +1955,12 @@ namespace libtorrent {
}
void disk_io_thread::async_set_file_priority(storage_index_t const storage
, aux::vector<std::uint8_t, file_index_t> const& prios
, aux::vector<std::uint8_t, file_index_t> prios
, std::function<void(storage_error const&)> handler)
{
aux::vector<std::uint8_t, file_index_t>* p = new aux::vector<std::uint8_t, file_index_t>(prios);
disk_io_job* j = allocate_job(disk_io_job::file_priority);
j->storage = m_torrents[storage]->shared_from_this();
j->buffer.priorities = p;
j->argument = std::move(prios);
j->callback = std::move(handler);
add_fence_job(j);
@ -2439,7 +2425,7 @@ namespace libtorrent {
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
// if files have to be closed, that's the storage's responsibility
return j->storage->move_storage(j->buffer.string
return j->storage->move_storage(boost::get<std::string>(j->argument)
, j->flags, j->error);
}
@ -2458,7 +2444,7 @@ namespace libtorrent {
status_t disk_io_thread::do_delete_files(disk_io_job* j, jobqueue_t& completed_jobs)
{
TORRENT_ASSERT(j->buffer.delete_options != 0);
TORRENT_ASSERT(boost::get<int>(j->argument) != 0);
// if this assert fails, something's wrong with the fence logic
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
@ -2469,7 +2455,7 @@ namespace libtorrent {
, completed_jobs, l);
l.unlock();
j->storage->delete_files(j->buffer.delete_options, j->error);
j->storage->delete_files(boost::get<int>(j->argument), j->error);
return j->error ? status_t::fatal_disk_error : status_t::no_error;
}
@ -2478,7 +2464,7 @@ namespace libtorrent {
// if this assert fails, something's wrong with the fence logic
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
add_torrent_params const* rd = j->buffer.check_resume_data;
add_torrent_params const* rd = boost::get<add_torrent_params const*>(j->argument);
add_torrent_params tmp;
if (rd == nullptr) rd = &tmp;
@ -2544,7 +2530,7 @@ namespace libtorrent {
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
// if files need to be closed, that's the storage's responsibility
j->storage->rename_file(j->file_index, j->buffer.string
j->storage->rename_file(j->file_index, boost::get<std::string>(j->argument)
, j->error);
return j->error ? status_t::fatal_disk_error : status_t::no_error;
}
@ -2797,8 +2783,9 @@ namespace libtorrent {
status_t disk_io_thread::do_file_priority(disk_io_job* j, jobqueue_t& /* completed_jobs */ )
{
std::unique_ptr<aux::vector<std::uint8_t, file_index_t>> p(j->buffer.priorities);
j->storage->set_file_priority(*p, j->error);
j->storage->set_file_priority(
boost::get<aux::vector<std::uint8_t, file_index_t>>(j->argument)
, j->error);
return status_t::no_error;
}
@ -3282,17 +3269,16 @@ namespace libtorrent {
disk_io_job const* j = static_cast<disk_io_job const*>(i.get());
TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage);
if (j->action == disk_io_job::write)
{
std::unique_lock<std::mutex> l(m_cache_mutex);
cached_piece_entry* pe = m_disk_cache.find_piece(j);
if (pe)
{
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf != j->buffer.disk_block);
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf == nullptr);
TORRENT_ASSERT(!pe->hashing_done);
}
}
if (j->action != disk_io_job::write) continue;
std::unique_lock<std::mutex> l(m_cache_mutex);
cached_piece_entry* pe = m_disk_cache.find_piece(j);
if (!pe) continue;
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf
!= boost::get<disk_buffer_holder>(j->argument).get());
TORRENT_ASSERT(pe->blocks[j->d.io.offset / 16 / 1024].buf == nullptr);
TORRENT_ASSERT(!pe->hashing_done);
}
#endif
jobqueue_t other_jobs;
@ -3419,7 +3405,7 @@ namespace libtorrent {
#if TORRENT_USE_ASSERTS
j->callback_called = true;
#endif
j->call_callback(*this);
j->call_callback();
to_delete[cnt++] = j;
j = next;
if (cnt == int(to_delete.size()))

View File

@ -7410,7 +7410,7 @@ namespace libtorrent {
return m_save_path;
}
void torrent::rename_file(file_index_t const index, std::string const& name)
void torrent::rename_file(file_index_t const index, std::string name)
{
INVARIANT_CHECK;
@ -7428,7 +7428,7 @@ namespace libtorrent {
return;
}
m_ses.disk_thread().async_rename_file(m_storage, index, name
m_ses.disk_thread().async_rename_file(m_storage, index, std::move(name)
, std::bind(&torrent::on_file_renamed, shared_from_this(), _1, _2, _3));
return;
}
@ -7462,9 +7462,9 @@ namespace libtorrent {
#if TORRENT_USE_UNC_PATHS
std::string path = canonicalize_path(save_path);
#else
std::string const& path = save_path;
std::string path = save_path;
#endif
m_ses.disk_thread().async_move_storage(m_storage, path, std::uint8_t(flags)
m_ses.disk_thread().async_move_storage(m_storage, std::move(path), std::uint8_t(flags)
, std::bind(&torrent::on_storage_moved, shared_from_this(), _1, _2, _3));
m_moving_storage = true;
}

View File

@ -74,6 +74,24 @@ struct test_storage_impl : storage_interface
void delete_files(int, storage_error& ec) override {}
};
struct allocator : buffer_allocator_interface
{
allocator(block_cache& bc, storage_interface* st)
: m_cache(bc), m_storage(st) {}
void free_disk_buffer(char* b) override
{ m_cache.free_buffer(b); }
void reclaim_blocks(span<aux::block_cache_reference> refs) override
{
for (auto ref : refs)
m_cache.reclaim_block(m_storage, ref);
}
private:
block_cache& m_cache;
storage_interface* m_storage;
};
static void nop() {}
#if TORRENT_USE_ASSERTS
@ -99,6 +117,7 @@ static void nop() {}
fs.set_num_pieces(5); \
std::shared_ptr<storage_interface> pm \
= std::make_shared<test_storage_impl>(fs); \
allocator alloc(bc, pm.get()); \
bc.set_settings(sett); \
pm->m_settings = &sett; \
disk_io_job rj; \
@ -120,7 +139,7 @@ static void nop() {}
wj.d.io.offset = (b) * 0x4000; \
wj.d.io.buffer_size = 0x4000; \
wj.piece = piece_index_t(p); \
wj.buffer.disk_block = bc.allocate_buffer("write-test"); \
wj.argument = disk_buffer_holder(alloc, bc.allocate_buffer("write-test")); \
pe = bc.add_dirty_block(&wj)
#define READ_BLOCK(p, b, r) \
@ -130,13 +149,8 @@ static void nop() {}
rj.piece = piece_index_t(p); \
rj.storage = pm; \
rj.requester = (void*)(r); \
rj.buffer.disk_block = 0; \
ret = bc.try_read(&rj)
#define RETURN_BUFFER \
if (rj.d.io.ref.cookie != aux::block_cache_reference::none) bc.reclaim_block(pm.get(), rj.d.io.ref); \
else if (rj.buffer.disk_block) bc.free_buffer(rj.buffer.disk_block); \
rj.d.io.ref.cookie = aux::block_cache_reference::none
rj.argument = disk_buffer_holder(alloc, nullptr); \
ret = bc.try_read(&rj, alloc)
#define FLUSH(flushing) \
for (int i = 0; i < int(sizeof(flushing)/sizeof((flushing)[0])); ++i) \
@ -183,7 +197,8 @@ void test_write()
TEST_CHECK(ret >= 0);
// return the reference to the buffer we just read
RETURN_BUFFER;
rj.argument = 0;
TEST_EQUAL(bc.pinned_blocks(), 0);
bc.update_stats_counters(c);
TEST_EQUAL(c[counters::pinned_blocks], 0);
@ -197,9 +212,7 @@ void test_write()
bc.update_stats_counters(c);
TEST_EQUAL(c[counters::pinned_blocks], 0);
// just in case it wasn't we're supposed to return the reference
// to the buffer
RETURN_BUFFER;
rj.argument = 0;
tailqueue<disk_io_job> jobs;
bc.clear(jobs);
@ -322,7 +335,7 @@ void test_arc_promote()
// it's supposed to be a cache hit
TEST_CHECK(ret >= 0);
// return the reference to the buffer we just read
RETURN_BUFFER;
rj.argument = 0;
bc.update_stats_counters(c);
TEST_EQUAL(c[counters::write_cache_blocks], 0);
@ -343,7 +356,7 @@ void test_arc_promote()
// it's supposed to be a cache hit
TEST_CHECK(ret >= 0);
// return the reference to the buffer we just read
RETURN_BUFFER;
rj.argument = 0;
bc.update_stats_counters(c);
TEST_EQUAL(c[counters::write_cache_blocks], 0);
@ -433,8 +446,8 @@ void test_unaligned_read()
rj.piece = piece_index_t(0);
rj.storage = pm;
rj.requester = (void*)1;
rj.buffer.disk_block = nullptr;
ret = bc.try_read(&rj);
rj.argument = disk_buffer_holder(alloc, nullptr);
ret = bc.try_read(&rj, alloc);
// unaligned reads copies the data into a new buffer
// rather than
@ -446,7 +459,7 @@ void test_unaligned_read()
// it's supposed to be a cache hit
TEST_CHECK(ret >= 0);
// return the reference to the buffer we just read
RETURN_BUFFER;
rj.argument = 0;
tailqueue<disk_io_job> jobs;
bc.clear(jobs);

View File

@ -50,6 +50,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <iostream>
#include <fstream>
#include <boost/variant/get.hpp>
using namespace std::placeholders;
using namespace libtorrent;
namespace lt = libtorrent;
@ -67,7 +69,8 @@ void on_read_piece(int ret, disk_io_job const& j, char const* data, int size)
{
std::cout << time_now_string() << " on_read_piece piece: " << j.piece << std::endl;
TEST_EQUAL(ret, size);
if (ret > 0) TEST_CHECK(std::equal(j.buffer.disk_block, j.buffer.disk_block + ret, data));
auto& buffer = boost::get<disk_buffer_holder>(j.argument);
if (ret > 0) TEST_CHECK(std::equal(buffer.get(), buffer.get() + ret, data));
}
void on_check_resume_data(status_t const status, storage_error const& error, bool* done)