remove requester parameter from disk read jobs
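
The block cache's ARC logic used an opaque requester pointer to decide whether a cache hit should promote a piece from the recently-used list (read_lru1) to the frequently-used list (read_lru2): a hit only counted if it came from a different requester than the one recorded in the piece's last_requester field. That heuristic required threading a void* requester argument through async_read(), async_hash() and every disk_io_job.

This change drops the requester plumbing. Each cached_block_entry instead carries a one-bit cache_hit flag (refcount shrinks from 30 to 29 bits to make room), set the first time the block is copied out of the cache. A piece now counts as frequently used once any of its blocks is read a second time. A minimal sketch of the new promotion rule, simplified from the block_cache::cache_hit() hunk below (not the verbatim code):

    // default: a cache hit promotes the piece to the
    // frequently-used (read_lru2) list
    std::uint16_t target_queue = cached_piece_entry::read_lru2;
    if (p->blocks[block].cache_hit == 0)
    {
        // first read of this block: keep the piece in the
        // recently-used (read_lru1) list instead
        target_queue = cached_piece_entry::read_lru1;
    }
    // copy_from_piece() sets blocks[block].cache_hit = 1 when the
    // block is handed out, so a second read of the same block is
    // what flips a piece to "frequently used"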

arvidn 2017-06-08 00:46:49 +02:00 committed by Arvid Norberg
parent 700befc98a
commit 4de9f6a75b
11 changed files with 38 additions and 51 deletions

View File

@@ -119,14 +119,15 @@ namespace aux {
 cached_block_entry()
 : buf(0)
 , refcount(0)
-, dirty(false)
-, pending(false)
+, dirty(0)
+, pending(0)
+, cache_hit(0)
 {
 }
 char* buf;
-enum { max_refcount = (1 << 30) - 1 };
+enum { max_refcount = (1 << 29) - 1 };
 // the number of references to this buffer. These references
 // might be in outstanding asynchronous requests or in peer
@@ -134,7 +135,7 @@ namespace aux {
 // all references are gone and refcount reaches 0. The buf
 // pointer in this struct doesn't count as a reference and
 // is always the last to be cleared
-std::uint32_t refcount:30;
+std::uint32_t refcount:29;
 // if this is true, this block needs to be written to
 // disk before it's freed. Typically all blocks in a piece
@@ -150,6 +151,11 @@ namespace aux {
 // write job to write this block.
 std::uint32_t pending:1;
+// this is set to 1 if this block has been read at least once. If the same
+// block is read twice, the whole piece is considered *frequently* used,
+// not just recently used.
+std::uint32_t cache_hit:1;
 #if TORRENT_USE_ASSERTS
 // this many of the references are held by hashing operations
 int hashing_count = 0;
@@ -200,10 +206,6 @@ namespace aux {
 // state while we're calculating the hash.
 std::unique_ptr<partial_hash> hash;
-// set to a unique identifier of a peer that last
-// requested from this piece.
-void* last_requester = nullptr;
 // the pointers to the block data. If this is a ghost
 // cache entry, there won't be any data here
 aux::unique_ptr<cached_block_entry[]> blocks;
@@ -394,7 +396,7 @@ namespace aux {
 // called when we're reading and we found the piece we're
 // reading from in the hash table (not necessarily that we
 // hit the block we needed)
-void cache_hit(cached_piece_entry* p, void* requester, bool volatile_read);
+void cache_hit(cached_piece_entry* p, int block, bool volatile_read);
 // free block from piece entry
 void free_block(cached_piece_entry* pe, int block);

View File

@@ -138,13 +138,13 @@ namespace libtorrent {
 virtual void async_read(storage_index_t storage, peer_request const& r
 , std::function<void(disk_buffer_holder block, std::uint32_t flags, storage_error const& se)> handler
-, void* requester, std::uint8_t flags = 0) = 0;
+, std::uint8_t flags = 0) = 0;
 virtual bool async_write(storage_index_t storage, peer_request const& r
 , char const* buf, std::shared_ptr<disk_observer> o
 , std::function<void(storage_error const&)> handler
 , std::uint8_t flags = 0) = 0;
 virtual void async_hash(storage_index_t storage, piece_index_t piece, std::uint8_t flags
-, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler, void* requester) = 0;
+, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler) = 0;
 virtual void async_move_storage(storage_index_t storage, std::string p, move_flags_t flags
 , std::function<void(status_t, std::string const&, storage_error const&)> handler) = 0;
 virtual void async_release_files(storage_index_t storage

View File

@@ -121,9 +121,6 @@ namespace libtorrent {
 // is not dirty anymore
 bool completed(cached_piece_entry const* pe, int block_size);
-// unique identifier for the peer when reading
-void* requester = nullptr;
 // for read and write, this is the disk_buffer_holder
 // for other jobs, it may point to other job-specific types
 // for move_storage and rename_file this is a string

View File

@@ -299,13 +299,13 @@ namespace aux {
 void async_read(storage_index_t storage, peer_request const& r
 , std::function<void(disk_buffer_holder block
-, std::uint32_t flags, storage_error const& se)> handler, void* requester, std::uint8_t flags = 0) override;
+, std::uint32_t flags, storage_error const& se)> handler, std::uint8_t flags = 0) override;
 bool async_write(storage_index_t storage, peer_request const& r
 , char const* buf, std::shared_ptr<disk_observer> o
 , std::function<void(storage_error const&)> handler
 , std::uint8_t flags = 0) override;
 void async_hash(storage_index_t storage, piece_index_t piece, std::uint8_t flags
-, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler, void* requester) override;
+, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler) override;
 void async_move_storage(storage_index_t storage, std::string p, move_flags_t flags
 , std::function<void(status_t, std::string const&, storage_error const&)> handler) override;
 void async_release_files(storage_index_t storage

View File

@@ -375,7 +375,7 @@ int block_cache::try_read(disk_io_job* j, buffer_allocator_interface& allocator
 #if TORRENT_USE_ASSERTS
 p->piece_log.push_back(piece_log_t(j->action, j->d.io.offset / 0x4000));
 #endif
-cache_hit(p, j->requester, (j->flags & disk_interface::volatile_read) != 0);
+cache_hit(p, j->d.io.offset / block_size(), (j->flags & disk_interface::volatile_read) != 0);
 ret = copy_from_piece(p, j, allocator, expect_no_fail);
 if (ret < 0) return ret;
@@ -398,7 +398,7 @@ void block_cache::bump_lru(cached_piece_entry* p)
 // this is called for pieces that we're reading from, when they
 // are in the cache (including the ghost lists)
-void block_cache::cache_hit(cached_piece_entry* p, void* requester, bool volatile_read)
+void block_cache::cache_hit(cached_piece_entry* p, int block, bool volatile_read)
 {
 // this can be pretty expensive
 // INVARIANT_CHECK;
@@ -408,15 +408,12 @@ void block_cache::cache_hit(cached_piece_entry* p, void* requester, bool volatile_read)
 // move the piece into this queue. Whenever we have a cache
 // hit, we move the piece into the lru2 queue (i.e. the most
-// frequently used piece). However, we only do that if the
-// requester is different than the last one. This is to
-// avoid a single requester making it look like a piece is
-// frequently requested, when in fact it's only a single peer
+// frequently used piece).
 std::uint16_t target_queue = cached_piece_entry::read_lru2;
-if (p->last_requester == requester || requester == nullptr)
+if (p->blocks[block].cache_hit == 0)
 {
-// if it's the same requester and the piece isn't in
+// if it's not a duplicate hit and the piece isn't in
 // any of the ghost lists, ignore it
 if (p->cache_state == cached_piece_entry::read_lru1
 || p->cache_state == cached_piece_entry::read_lru2
@@ -438,9 +435,6 @@ void block_cache::cache_hit(cached_piece_entry* p, void* requester, bool volatile_read)
 target_queue = cached_piece_entry::read_lru1;
 }
-if (requester != nullptr)
-p->last_requester = requester;
 // if we have this piece anywhere in L1 or L2, it's a "hit"
 // and it should be bumped to the highest priority in L2
 // i.e. "frequently used"
@@ -626,7 +620,6 @@ cached_piece_entry* block_cache::allocate_piece(disk_io_job const* j, std::uint1
 pe.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
 if (!pe.blocks) return nullptr;
-pe.last_requester = j->requester;
 p = const_cast<cached_piece_entry*>(&*m_pieces.insert(std::move(pe)).first);
 j->storage->add_piece(p);
@@ -1265,7 +1258,7 @@ void block_cache::insert_blocks(cached_piece_entry* pe, int block, span<iovec_t
 TORRENT_ASSERT(pe->in_use);
 TORRENT_PIECE_ASSERT(iov.size() > 0, pe);
-cache_hit(pe, j->requester, (j->flags & disk_interface::volatile_read) != 0);
+cache_hit(pe, j->d.io.offset / block_size(), (j->flags & disk_interface::volatile_read) != 0);
 TORRENT_ASSERT(pe->in_use);
@@ -1722,6 +1715,7 @@ int block_cache::copy_from_piece(cached_piece_entry* const pe
 // refcount, we're handing the ownership of the reference to the calling
 // thread.
 cached_block_entry& bl = pe->blocks[start_block];
+bl.cache_hit = 1;
 // make sure it didn't wrap
 TORRENT_PIECE_ASSERT(pe->refcount > 0, pe);
@@ -1758,6 +1752,7 @@ int block_cache::copy_from_piece(cached_piece_entry* const pe
 + buffer_offset
 , pe->blocks[block].buf + block_offset
 , aux::numeric_cast<std::size_t>(to_copy));
+pe->blocks[block].cache_hit = 1;
 size -= to_copy;
 block_offset = 0;
 buffer_offset += to_copy;

View File

@@ -186,7 +186,7 @@ namespace libtorrent {
 {
 st->iothread.async_hash(st->storage, st->piece_counter
 , disk_interface::sequential_access
-, std::bind(&on_hash, _1, _2, _3, st), nullptr);
+, std::bind(&on_hash, _1, _2, _3, st));
 ++st->piece_counter;
 }
 else
@@ -292,7 +292,7 @@ namespace libtorrent {
 for (piece_index_t i(0); i < piece_index_t(piece_read_ahead); ++i)
 {
 disk_thread.async_hash(st.storage, i, disk_interface::sequential_access
-, std::bind(&on_hash, _1, _2, _3, &st), nullptr);
+, std::bind(&on_hash, _1, _2, _3, &st));
 ++st.piece_counter;
 if (st.piece_counter >= t.files().end_piece()) break;
 }

View File

@@ -1560,7 +1560,7 @@ namespace libtorrent {
 void disk_io_thread::async_read(storage_index_t storage, peer_request const& r
 , std::function<void(disk_buffer_holder block, std::uint32_t const flags
-, storage_error const& se)> handler, void* requester, std::uint8_t const flags)
+, storage_error const& se)> handler, std::uint8_t const flags)
 {
 TORRENT_ASSERT(r.length <= m_disk_cache.block_size());
 TORRENT_ASSERT(r.length <= 16 * 1024);
@@ -1575,7 +1575,6 @@ namespace libtorrent {
 j->d.io.buffer_size = std::uint16_t(r.length);
 j->argument = disk_buffer_holder(*this, nullptr);
 j->flags = flags;
-j->requester = requester;
 j->callback = std::move(handler);
 std::unique_lock<std::mutex> l(m_cache_mutex);
@@ -1767,14 +1766,13 @@ namespace libtorrent {
 void disk_io_thread::async_hash(storage_index_t const storage
 , piece_index_t piece, std::uint8_t flags
-, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler, void* requester)
+, std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler)
 {
 disk_io_job* j = allocate_job(disk_io_job::hash);
 j->storage = m_torrents[storage]->shared_from_this();
 j->piece = piece;
 j->callback = std::move(handler);
 j->flags = flags;
-j->requester = requester;
 int piece_size = j->storage->files().piece_size(piece);
@@ -2212,7 +2210,9 @@ namespace libtorrent {
 #if TORRENT_USE_ASSERTS
 pe->piece_log.push_back(piece_log_t(j->action));
 #endif
-m_disk_cache.cache_hit(pe, j->requester, (j->flags & disk_interface::volatile_read) != 0);
+int const block_size = m_disk_cache.block_size();
+m_disk_cache.cache_hit(pe, j->d.io.offset / block_size
+, (j->flags & disk_interface::volatile_read) != 0);
 TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
 {

View File

@@ -5102,7 +5102,7 @@ namespace libtorrent {
 // verified this piece (r.piece)
 m_disk_thread.async_hash(t->storage(), r.piece, 0
 , std::bind(&peer_connection::on_seed_mode_hashed, self()
-, _1, _2, _3), this);
+, _1, _2, _3));
 t->verifying(r.piece);
 continue;
 }
@@ -5138,7 +5138,7 @@ namespace libtorrent {
 m_disk_thread.async_read(t->storage(), r
 , std::bind(&peer_connection::on_disk_read_complete
-, self(), _1, _2, _3, r, clock_type::now()), this);
+, self(), _1, _2, _3, r, clock_type::now()));
 }
 m_last_sent_payload = clock_type::now();
 m_requests.erase(m_requests.begin() + i);

View File

@@ -95,8 +95,7 @@ namespace {
 {
 m_torrent.session().disk_thread().async_read(m_torrent.storage()
 , r, std::bind(&smart_ban_plugin::on_read_ok_block
-, shared_from_this(), *i, i->second.peer->address(), _1, r.length, _2, _3)
-, reinterpret_cast<void*>(1));
+, shared_from_this(), *i, i->second.peer->address(), _1, r.length, _2, _3));
 i = m_block_hashes.erase(i);
 }
 else
@@ -152,7 +151,6 @@ namespace {
 m_torrent.session().disk_thread().async_read(m_torrent.storage(), r
 , std::bind(&smart_ban_plugin::on_read_failed_block
 , shared_from_this(), pb, i->address(), _1, r.length, _2, _3)
-, reinterpret_cast<torrent_peer*>(1)
 , disk_io_job::force_copy);
 }

View File

@@ -858,7 +858,7 @@ namespace libtorrent {
 r.length = (std::min)(piece_size - r.start, block_size());
 m_ses.disk_thread().async_read(m_storage, r
 , std::bind(&torrent::on_disk_read_complete
-, shared_from_this(), _1, _2, _3, r, rp), reinterpret_cast<void*>(1));
+, shared_from_this(), _1, _2, _3, r, rp));
 }
 }
@@ -2242,7 +2242,7 @@ namespace libtorrent {
 m_ses.disk_thread().async_hash(m_storage, m_checking_piece
 , disk_interface::sequential_access | disk_interface::volatile_read
 , std::bind(&torrent::on_piece_hashed
-, shared_from_this(), _1, _2, _3), reinterpret_cast<void*>(1));
+, shared_from_this(), _1, _2, _3));
 ++m_checking_piece;
 if (m_checking_piece >= m_torrent_file->end_piece()) break;
 }
@@ -2365,7 +2365,7 @@ namespace libtorrent {
 m_ses.disk_thread().async_hash(m_storage, m_checking_piece
 , disk_interface::sequential_access | disk_interface::volatile_read
 , std::bind(&torrent::on_piece_hashed
-, shared_from_this(), _1, _2, _3), reinterpret_cast<void*>(1));
+, shared_from_this(), _1, _2, _3));
 ++m_checking_piece;
 #ifndef TORRENT_DISABLE_LOGGING
 debug_log("on_piece_hashed, m_checking_piece: %d"
@@ -10092,8 +10092,7 @@ namespace libtorrent {
 TORRENT_ASSERT(m_storage);
 m_ses.disk_thread().async_hash(m_storage, piece, 0
-, std::bind(&torrent::on_piece_verified, shared_from_this(), _1, _2, _3)
-, reinterpret_cast<void*>(1));
+, std::bind(&torrent::on_piece_verified, shared_from_this(), _1, _2, _3));
 }
 announce_entry* torrent::find_tracker(std::string const& url)

View File

@@ -149,7 +149,6 @@ static void nop() {}
 rj.d.io.buffer_size = 0x4000; \
 rj.piece = piece_index_t(p); \
 rj.storage = pm; \
-rj.requester = (void*)(r); \
 rj.argument = disk_buffer_holder(alloc, nullptr); \
 ret = bc.try_read(&rj, alloc)
@@ -163,7 +162,6 @@ static void nop() {}
 #define INSERT(p, b) \
 wj.piece = piece_index_t(p); \
-wj.requester = (void*)1; \
 pe = bc.allocate_piece(&wj, cached_piece_entry::read_lru1); \
 ret = bc.allocate_iovec(iov); \
 TEST_EQUAL(ret, 0); \
@@ -408,7 +406,7 @@ void test_arc_unghost()
 // the block is now a ghost. If we cache-hit it,
 // it should be promoted back to the main list
-bc.cache_hit(pe, (void*)1, false);
+bc.cache_hit(pe, 0, false);
 bc.update_stats_counters(c);
 TEST_EQUAL(c[counters::write_cache_blocks], 0);
@@ -446,7 +444,6 @@ void test_unaligned_read()
 rj.d.io.buffer_size = 0x4000;
 rj.piece = piece_index_t(0);
 rj.storage = pm;
-rj.requester = (void*)1;
 rj.argument = disk_buffer_holder(alloc, nullptr);
 ret = bc.try_read(&rj, alloc);
@@ -500,7 +497,6 @@ TORRENT_TEST(delete_piece)
 rj.d.io.buffer_size = 0x4000;
 rj.piece = piece_index_t(0);
 rj.storage = pm;
-rj.requester = (void*)1;
 rj.argument = 0;
 ret = bc.try_read(&rj, alloc);