collapse logic in piece_manager into storage directly

arvidn 2016-11-12 21:45:30 -05:00 committed by Arvid Norberg
parent 2b58cb9de3
commit 14236993ea
15 changed files with 228 additions and 262 deletions
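The gist of the change: the piece_manager wrapper class is deleted and its duties (the disk-job fence, the per-storage cached-piece set, tick bookkeeping and the torrent keep-alive pointer) move onto storage_interface itself, so disk jobs and the torrent hold the storage directly. A rough before/after sketch, pieced together from the hunks below rather than taken from any single file:

  // before: every disk call hops through the wrapper, and torrent.cpp wraps
  // the storage_interface in a piece_manager
  j->storage->get_storage_impl()->readv(b, j->piece, j->d.io.offset, file_flags, j->error);
  m_storage = std::make_shared<piece_manager>(storage_impl, shared_from_this()
    , const_cast<file_storage*>(&m_torrent_file->files()));

  // after: the job holds a std::shared_ptr<storage_interface> and calls it directly
  j->storage->readv(b, j->piece, j->d.io.offset, file_flags, j->error);
  m_storage.reset(m_storage_constructor(params));
  m_storage->set_files(&m_torrent_file->files());
  m_storage->set_owner(shared_from_this());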

View File

@ -55,7 +55,7 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
struct disk_io_job;
class piece_manager;
struct storage_interface;
struct cache_status;
struct block_cache_reference;
struct counters;
@ -182,7 +182,7 @@ namespace libtorrent
}
// storage this piece belongs to
std::shared_ptr<piece_manager> storage;
std::shared_ptr<storage_interface> storage;
// write jobs hanging off of this piece
tailqueue<disk_io_job> jobs;
@ -413,7 +413,7 @@ namespace libtorrent
// to it, otherwise 0.
cached_piece_entry* find_piece(block_cache_reference const& ref);
cached_piece_entry* find_piece(disk_io_job const* j);
cached_piece_entry* find_piece(piece_manager* st, int piece);
cached_piece_entry* find_piece(storage_interface* st, int piece);
// clear free all buffers marked as dirty with
// refcount of 0.

View File

@ -40,7 +40,7 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
struct disk_io_job;
class piece_manager;
struct storage_interface;
struct peer_request;
struct disk_observer;
struct file_pool;
@ -59,44 +59,44 @@ namespace libtorrent
file_exist = -4
};
virtual void async_read(piece_manager* storage, peer_request const& r
virtual void async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester
, int flags = 0) = 0;
virtual void async_write(piece_manager* storage, peer_request const& r
virtual void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler
, int flags = 0) = 0;
virtual void async_hash(piece_manager* storage, int piece, int flags
virtual void async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester) = 0;
virtual void async_move_storage(piece_manager* storage, std::string const& p, int flags
virtual void async_move_storage(storage_interface* storage, std::string const& p, int flags
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_release_files(piece_manager* storage
virtual void async_release_files(storage_interface* storage
, std::function<void(disk_io_job const*)> handler
= std::function<void(disk_io_job const*)>()) = 0;
virtual void async_check_files(piece_manager* storage
virtual void async_check_files(storage_interface* storage
, add_torrent_params const* resume_data
, std::vector<std::string>& links
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_flush_piece(piece_manager* storage, int piece
virtual void async_flush_piece(storage_interface* storage, int piece
, std::function<void(disk_io_job const*)> handler
= std::function<void(disk_io_job const*)>()) = 0;
virtual void async_stop_torrent(piece_manager* storage
virtual void async_stop_torrent(storage_interface* storage
, std::function<void(disk_io_job const*)> handler)= 0;
virtual void async_rename_file(piece_manager* storage, int index, std::string const& name
virtual void async_rename_file(storage_interface* storage, int index, std::string const& name
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_delete_files(piece_manager* storage, int options
virtual void async_delete_files(storage_interface* storage, int options
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_set_file_priority(piece_manager* storage
virtual void async_set_file_priority(storage_interface* storage
, std::vector<std::uint8_t> const& prio
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_clear_piece(piece_manager* storage, int index
virtual void async_clear_piece(storage_interface* storage, int index
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void clear_piece(piece_manager* storage, int index) = 0;
virtual void clear_piece(storage_interface* storage, int index) = 0;
virtual void update_stats_counters(counters& c) const = 0;
virtual void get_cache_info(cache_status* ret, bool no_pieces = true
, piece_manager const* storage = 0) const = 0;
, storage_interface const* storage = 0) const = 0;
virtual file_pool& files() = 0;
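With piece_manager gone, callers hand the storage_interface pointer straight to these calls. A minimal, hypothetical read request against the interface declared above (disk_thread, st and the 0x4000-byte request size are assumptions; the handler signature is the declared one, and completion handlers are posted back to the network thread):

  peer_request r;
  r.piece = 0;
  r.start = 0;
  r.length = 0x4000;
  disk_thread.async_read(st, r
    , [](disk_io_job const* j) { /* check j->error before using the buffer */ }
    , nullptr /* requester */);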

View File

@ -43,7 +43,8 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
class piece_manager;
struct storage_interface;
using piece_manager = storage_interface;
struct cached_piece_entry;
class torrent_info;
struct add_torrent_params;
@ -148,7 +149,7 @@ namespace libtorrent
} buffer;
// the disk storage this job applies to (if applicable)
std::shared_ptr<piece_manager> storage;
std::shared_ptr<storage_interface> storage;
// this is called when operation completes
std::function<void(disk_io_job const*)> callback;

View File

@ -62,7 +62,7 @@ namespace libtorrent
struct cached_piece_info
{
piece_manager* storage;
storage_interface* storage;
// holds one entry for each block in this piece. ``true`` represents
// the data for that block being in the disk cache and ``false`` means it's not.
@ -286,45 +286,45 @@ namespace libtorrent
void abort(bool wait);
void async_read(piece_manager* storage, peer_request const& r
void async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester
, int flags = 0) override;
void async_write(piece_manager* storage, peer_request const& r
void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler
, int flags = 0) override;
void async_hash(piece_manager* storage, int piece, int flags
void async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester) override;
void async_move_storage(piece_manager* storage, std::string const& p, int flags
void async_move_storage(storage_interface* storage, std::string const& p, int flags
, std::function<void(disk_io_job const*)> handler) override;
void async_release_files(piece_manager* storage
void async_release_files(storage_interface* storage
, std::function<void(disk_io_job const*)> handler
= std::function<void(disk_io_job const*)>()) override;
void async_delete_files(piece_manager* storage, int options
void async_delete_files(storage_interface* storage, int options
, std::function<void(disk_io_job const*)> handler) override;
void async_check_files(piece_manager* storage
void async_check_files(storage_interface* storage
, add_torrent_params const* resume_data
, std::vector<std::string>& links
, std::function<void(disk_io_job const*)> handler) override;
void async_rename_file(piece_manager* storage, int index, std::string const& name
void async_rename_file(storage_interface* storage, int index, std::string const& name
, std::function<void(disk_io_job const*)> handler) override;
void async_stop_torrent(piece_manager* storage
void async_stop_torrent(storage_interface* storage
, std::function<void(disk_io_job const*)> handler) override;
void async_flush_piece(piece_manager* storage, int piece
void async_flush_piece(storage_interface* storage, int piece
, std::function<void(disk_io_job const*)> handler
= std::function<void(disk_io_job const*)>()) override;
void async_set_file_priority(piece_manager* storage
void async_set_file_priority(storage_interface* storage
, std::vector<std::uint8_t> const& prio
, std::function<void(disk_io_job const*)> handler) override;
void async_clear_piece(piece_manager* storage, int index
void async_clear_piece(storage_interface* storage, int index
, std::function<void(disk_io_job const*)> handler) override;
// this is not asynchronous and requires that the piece does not
// have any pending buffers. It's meant to be used for pieces that
// were just read and hashed and failed the hash check.
// there should be no read-operations left, and all buffers should
// be discardable
void clear_piece(piece_manager* storage, int index) override;
void clear_piece(storage_interface* storage, int index) override;
// implements buffer_allocator_interface
void reclaim_blocks(span<block_cache_reference> ref) override;
@ -341,7 +341,7 @@ namespace libtorrent
void update_stats_counters(counters& c) const override;
void get_cache_info(cache_status* ret, bool no_pieces = true
, piece_manager const* storage = 0) const override;
, storage_interface const* storage = 0) const override;
// this submits all queued up jobs to the thread
void submit_jobs();
@ -450,7 +450,7 @@ namespace libtorrent
// this queues up another job to be submitted
void add_job(disk_io_job* j, bool user_add = true);
void add_fence_job(piece_manager* storage, disk_io_job* j
void add_fence_job(storage_interface* storage, disk_io_job* j
, bool user_add = true);
// assumes l is locked (cache std::mutex).
@ -490,7 +490,7 @@ namespace libtorrent
// used for asserts and only applies for fence jobs
flush_expect_clear = 8
};
void flush_cache(piece_manager* storage, std::uint32_t flags, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l);
void flush_cache(storage_interface* storage, std::uint32_t flags, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l);
void flush_expired_write_blocks(jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l);
void flush_piece(cached_piece_entry* pe, int flags, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l);
@ -590,7 +590,7 @@ namespace libtorrent
// storages that have had write activity recently and will get ticked
// soon, for deferred actions (say, flushing partfile metadata)
std::vector<std::pair<time_point, std::weak_ptr<piece_manager>>> m_need_tick;
std::vector<std::pair<time_point, std::weak_ptr<storage_interface>>> m_need_tick;
// this is protected by the completed_jobs_mutex. It's true whenever
// there's a call_job_handlers message in-flight to the network thread. We

View File

@ -147,6 +147,89 @@ namespace libtorrent
TORRENT_EXTRA_EXPORT span<file::iovec_t> advance_bufs(span<file::iovec_t> bufs, int bytes);
TORRENT_EXTRA_EXPORT void clear_bufs(span<file::iovec_t const> bufs);
struct disk_io_thread;
// implements the disk I/O job fence used by the storage_interface
// to provide to the disk thread. Whenever a disk job needs
// exclusive access to the storage for that torrent, it raises
// the fence, blocking all new jobs, until there are no longer
// any outstanding jobs on the torrent, then the fence is lowered
// and it can be performed, along with the backlog of jobs that
// accrued while the fence was up
struct TORRENT_EXTRA_EXPORT disk_job_fence
{
disk_job_fence();
~disk_job_fence()
{
TORRENT_ASSERT(int(m_outstanding_jobs) == 0);
TORRENT_ASSERT(m_blocked_jobs.size() == 0);
}
// returns one of the fence_* enums.
// if there are no outstanding jobs on the
// storage, fence_post_fence is returned, the flush job is expected
// to be discarded by the caller.
// fence_post_flush is returned if the fence job was blocked and queued,
// but the flush job should be posted (i.e. put on the job queue)
// fence_post_none if both the fence and the flush jobs were queued.
enum { fence_post_fence = 0, fence_post_flush = 1, fence_post_none = 2 };
int raise_fence(disk_io_job* fence_job, disk_io_job* flush_job
, counters& cnt);
bool has_fence() const;
// called whenever a job completes and is posted back to the
// main network thread. the tailqueue of jobs will have the
// backed-up jobs prepended to it in case this resulted in the
// fence being lowered.
int job_complete(disk_io_job* j, tailqueue<disk_io_job>& job_queue);
int num_outstanding_jobs() const { return m_outstanding_jobs; }
// if there is a fence up, returns true and adds the job
// to the queue of blocked jobs
bool is_blocked(disk_io_job* j);
// the number of blocked jobs
int num_blocked() const;
private:
// when > 0, this storage is blocked for new async
// operations until all outstanding jobs have completed.
// at that point, the m_blocked_jobs are issued
// the count is the number of fence job currently in the queue
int m_has_fence;
// when there's a fence up, jobs are queued up in here
// until the fence is lowered
tailqueue<disk_io_job> m_blocked_jobs;
// the number of disk_io_job objects there are, belonging
// to this torrent, currently pending, hanging off of
// cached_piece_entry objects. This is used to determine
// when the fence can be lowered
std::atomic<int> m_outstanding_jobs;
// must be held when accessing m_has_fence and
// m_blocked_jobs
mutable std::mutex m_mutex;
};
// this class keeps track of which pieces, belonging to
// a specific storage, are in the cache right now. It's
// used for quickly being able to evict all pieces for a
// specific torrent
struct TORRENT_EXTRA_EXPORT storage_piece_set
{
void add_piece(cached_piece_entry* p);
void remove_piece(cached_piece_entry* p);
bool has_piece(cached_piece_entry const* p) const;
int num_pieces() const { return int(m_cached_pieces.size()); }
std::unordered_set<cached_piece_entry*> const& cached_pieces() const
{ return m_cached_pieces; }
private:
// these are cached pieces belonging to this storage
std::unordered_set<cached_piece_entry*> m_cached_pieces;
};
// flags for async_move_storage
enum move_flags_t
{
@ -194,6 +277,10 @@ namespace libtorrent
// reads garbage. It's useful mostly for benchmarking and profiling purpose.
//
struct TORRENT_EXPORT storage_interface
: public std::enable_shared_from_this<storage_interface>
, public disk_job_fence
, public storage_piece_set
, boost::noncopyable
{
// hidden
storage_interface(): m_settings(0) {}
@ -339,6 +426,24 @@ namespace libtorrent
// off again.
virtual bool tick() { return false; }
file_storage const* files() const { return m_files; }
bool set_need_tick()
{
bool const prev = m_need_tick;
m_need_tick = true;
return prev;
}
void do_tick()
{
m_need_tick = false;
tick();
}
void set_files(file_storage const* f) { m_files = f; }
void set_owner(std::shared_ptr<void> const& tor) { m_torrent = tor; }
// access global session_settings
aux::session_settings const& settings() const { return *m_settings; }
@ -347,12 +452,24 @@ namespace libtorrent
// initialized in disk_io_thread::perform_async_job
aux::session_settings* m_settings;
private:
bool m_need_tick = false;
file_storage const* m_files;
// the reason for this to be a void pointer
// is to avoid creating a dependency on the
// torrent. This shared_ptr is here only
// to keep the torrent object alive until
// the storage_interface destructs. This is because
// the torrent_info object is owned by the torrent.
std::shared_ptr<void> m_torrent;
};
// The default implementation of storage_interface. Behaves as a normal
// bittorrent client. It is possible to derive from this class in order to
// override some of its behavior, when implementing a custom storage.
class TORRENT_EXPORT default_storage : public storage_interface, boost::noncopyable
class TORRENT_EXPORT default_storage : public storage_interface
{
friend struct write_fileop;
friend struct read_fileop;
@ -442,139 +559,6 @@ namespace libtorrent
bool m_allocate_files;
};
struct disk_io_thread;
// implements the disk I/O job fence used by the piece_manager
// to provide to the disk thread. Whenever a disk job needs
// exclusive access to the storage for that torrent, it raises
// the fence, blocking all new jobs, until there are no longer
// any outstanding jobs on the torrent, then the fence is lowered
// and it can be performed, along with the backlog of jobs that
// accrued while the fence was up
struct TORRENT_EXTRA_EXPORT disk_job_fence
{
disk_job_fence();
~disk_job_fence()
{
TORRENT_ASSERT(int(m_outstanding_jobs) == 0);
TORRENT_ASSERT(m_blocked_jobs.size() == 0);
}
// returns one of the fence_* enums.
// if there are no outstanding jobs on the
// storage, fence_post_fence is returned, the flush job is expected
// to be discarded by the caller.
// fence_post_flush is returned if the fence job was blocked and queued,
// but the flush job should be posted (i.e. put on the job queue)
// fence_post_none if both the fence and the flush jobs were queued.
enum { fence_post_fence = 0, fence_post_flush = 1, fence_post_none = 2 };
int raise_fence(disk_io_job* fence_job, disk_io_job* flush_job
, counters& cnt);
bool has_fence() const;
// called whenever a job completes and is posted back to the
// main network thread. the tailqueue of jobs will have the
// backed-up jobs prepended to it in case this resulted in the
// fence being lowered.
int job_complete(disk_io_job* j, tailqueue<disk_io_job>& job_queue);
int num_outstanding_jobs() const { return m_outstanding_jobs; }
// if there is a fence up, returns true and adds the job
// to the queue of blocked jobs
bool is_blocked(disk_io_job* j);
// the number of blocked jobs
int num_blocked() const;
private:
// when > 0, this storage is blocked for new async
// operations until all outstanding jobs have completed.
// at that point, the m_blocked_jobs are issued
// the count is the number of fence job currently in the queue
int m_has_fence;
// when there's a fence up, jobs are queued up in here
// until the fence is lowered
tailqueue<disk_io_job> m_blocked_jobs;
// the number of disk_io_job objects there are, belonging
// to this torrent, currently pending, hanging off of
// cached_piece_entry objects. This is used to determine
// when the fence can be lowered
std::atomic<int> m_outstanding_jobs;
// must be held when accessing m_has_fence and
// m_blocked_jobs
mutable std::mutex m_mutex;
};
// this class keeps track of which pieces, belonging to
// a specific storage, are in the cache right now. It's
// used for quickly being able to evict all pieces for a
// specific torrent
struct TORRENT_EXTRA_EXPORT storage_piece_set
{
void add_piece(cached_piece_entry* p);
void remove_piece(cached_piece_entry* p);
bool has_piece(cached_piece_entry const* p) const;
int num_pieces() const { return int(m_cached_pieces.size()); }
std::unordered_set<cached_piece_entry*> const& cached_pieces() const
{ return m_cached_pieces; }
private:
// these are cached pieces belonging to this storage
std::unordered_set<cached_piece_entry*> m_cached_pieces;
};
class TORRENT_EXTRA_EXPORT piece_manager
: public std::enable_shared_from_this<piece_manager>
, public disk_job_fence
, public storage_piece_set
, boost::noncopyable
{
friend struct disk_io_thread;
public:
piece_manager(
storage_interface* storage_impl
, std::shared_ptr<void> const& torrent
, file_storage* files);
~piece_manager();
file_storage const* files() const { return &m_files; }
storage_interface* get_storage_impl() { return m_storage.get(); }
bool set_need_tick()
{
bool const prev = m_need_tick;
m_need_tick = true;
return prev;
}
void tick()
{
m_need_tick = false;
m_storage->tick();
}
private:
file_storage const& m_files;
bool m_need_tick = false;
std::unique_ptr<storage_interface> m_storage;
// the reason for this to be a void pointer
// is to avoid creating a dependency on the
// torrent. This shared_ptr is here only
// to keep the torrent object alive until
// the piece_manager destructs. This is because
// the torrent_info object is owned by the torrent.
std::shared_ptr<void> m_torrent;
};
// this identifies a read or write operation so that readwritev() knows
// what to do when it's actually touching the file
struct fileop
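Note that disk_job_fence and storage_piece_set are not new code; they moved up from further down in this header (the deleted copies appear earlier in this same file's diff) so that storage_interface can inherit them directly. A hypothetical sketch of how a caller is expected to drive the fence, based only on the comments above (post_fence_sketch and run_queue are illustrative names, not libtorrent API):

  void post_fence_sketch(storage_interface* st, disk_io_job* fence_j
    , disk_io_job* flush_j, counters& cnt, tailqueue<disk_io_job>& run_queue)
  {
    switch (st->raise_fence(fence_j, flush_j, cnt))
    {
      case disk_job_fence::fence_post_fence:
        // no outstanding jobs on this storage: run the fence right away
        // (the flush job is discarded by the caller, per the comment above)
        run_queue.push_back(fence_j);
        break;
      case disk_job_fence::fence_post_flush:
        // the fence was blocked and queued; post only the flush job
        run_queue.push_back(flush_j);
        break;
      case disk_job_fence::fence_post_none:
        // both jobs were queued; nothing to post until job_complete()
        // lowers the fence and hands back the blocked jobs
        break;
    }
  }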

View File

@ -83,7 +83,7 @@ namespace libtorrent
{
class http_parser;
class piece_manager;
struct storage_interface;
struct torrent_plugin;
struct bitfield;
struct announce_entry;
@ -923,7 +923,7 @@ namespace libtorrent
int num_known_peers() const { return m_peer_list ? m_peer_list->num_peers() : 0; }
int num_connect_candidates() const { return m_peer_list ? m_peer_list->num_connect_candidates() : 0; }
piece_manager& storage();
storage_interface& storage();
bool has_storage() const { return m_storage.get() != nullptr; }
torrent_info const& torrent_file() const
@ -1153,21 +1153,21 @@ namespace libtorrent
// if this pointer is 0, the torrent is in
// a state where the metadata hasn't been
// received yet, or during shutdown.
// the piece_manager keeps the torrent object
// the storage_interface keeps the torrent object
// alive by holding a shared_ptr to it and
// the torrent keeps the piece manager alive
// with this shared_ptr. This cycle is
// broken when torrent::abort() is called
// Then the torrent releases the piece_manager
// and when the piece_manager is complete with all
// Then the torrent releases the storage_interface
// and when the storage_interface is complete with all
// outstanding disk io jobs (that keeps
// the piece_manager alive) it will destruct
// the storage_interface alive) it will destruct
// and release the torrent file. The reason for
// this is that the torrent_info is used by
// the piece_manager, and stored in the
// the storage_interface, and stored in the
// torrent, so the torrent cannot destruct
// before the piece_manager.
std::shared_ptr<piece_manager> m_storage;
// before the storage_interface.
std::shared_ptr<storage_interface> m_storage;
#ifdef TORRENT_USE_OPENSSL
std::shared_ptr<boost::asio::ssl::context> m_ssl_ctx;
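The cycle described in the comment above, reduced to a sketch (the three construction lines appear verbatim in the torrent.cpp hunk later in this diff; the release on abort is described in the comment but not shown here):

  m_storage.reset(m_storage_constructor(params));   // torrent -> storage edge
  m_storage->set_files(&m_torrent_file->files());
  m_storage->set_owner(shared_from_this());         // storage -> torrent edge, held as shared_ptr<void>
  // on torrent::abort() the torrent drops its edge (e.g. m_storage.reset());
  // the storage, and with it the torrent and its torrent_info, stay alive until
  // the last outstanding disk_io_job releases its shared_ptr<storage_interface>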

View File

@ -1558,7 +1558,7 @@ void block_cache::check_invariant() const
int cached_read_blocks = 0;
int num_pinned = 0;
std::set<piece_manager*> storages;
std::set<storage_interface*> storages;
for (int i = 0; i < cached_piece_entry::num_lrus; ++i)
{
@ -1819,7 +1819,7 @@ bool block_cache::maybe_free_piece(cached_piece_entry* pe)
cached_piece_entry* block_cache::find_piece(block_cache_reference const& ref)
{
return find_piece(static_cast<piece_manager*>(ref.storage), ref.piece);
return find_piece(static_cast<storage_interface*>(ref.storage), ref.piece);
}
cached_piece_entry* block_cache::find_piece(disk_io_job const* j)
@ -1827,7 +1827,7 @@ cached_piece_entry* block_cache::find_piece(disk_io_job const* j)
return find_piece(j->storage.get(), j->piece);
}
cached_piece_entry* block_cache::find_piece(piece_manager* st, int piece)
cached_piece_entry* block_cache::find_piece(storage_interface* st, int piece)
{
cached_piece_entry model;
model.storage = st->shared_from_this();

View File

@ -160,7 +160,7 @@ namespace libtorrent
}
void on_hash(disk_io_job const* j, create_torrent* t
, std::shared_ptr<piece_manager> storage, disk_io_thread* iothread
, std::shared_ptr<storage_interface> storage, disk_io_thread* iothread
, int* piece_counter, int* completed_piece
, std::function<void(int)> const* f, error_code* ec)
{
@ -262,8 +262,6 @@ namespace libtorrent
return;
}
// dummy torrent object pointer
std::shared_ptr<char> dummy;
counters cnt;
disk_io_thread disk_thread(ios, cnt);
@ -274,10 +272,9 @@ namespace libtorrent
params.pool = &disk_thread.files();
params.mode = storage_mode_sparse;
storage_interface* storage_impl = default_storage_constructor(params);
std::shared_ptr<piece_manager> storage = std::make_shared<piece_manager>(
storage_impl, dummy, const_cast<file_storage*>(&t.files()));
std::shared_ptr<storage_interface> storage(default_storage_constructor(params));
storage->set_files(&t.files());
settings_pack sett;
sett.set_int(settings_pack::cache_size, 0);
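The dummy torrent pointer is gone because nothing in this path calls set_owner(): the storage's internal shared_ptr<void> simply stays empty, and there is no torrent object to keep alive while hashing. The whole setup reduces to the two lines shown above:

  std::shared_ptr<storage_interface> storage(default_storage_constructor(params));
  storage->set_files(&t.files());
  // no set_owner() call: create_torrent hashing runs without a torrent object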

View File

@ -31,7 +31,7 @@ POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/storage.hpp" // for piece_manager
#include "libtorrent/storage.hpp" // for storage_interface
namespace libtorrent
{
@ -63,9 +63,9 @@ namespace libtorrent
TORRENT_ASSERT(m_ref.storage == nullptr || m_ref.piece >= 0);
TORRENT_ASSERT(m_ref.storage == nullptr || m_ref.block >= 0);
TORRENT_ASSERT(m_ref.storage == nullptr
|| m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces());
|| m_ref.piece < static_cast<storage_interface*>(m_ref.storage)->files()->num_pieces());
TORRENT_ASSERT(m_ref.storage == nullptr
|| m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000);
|| m_ref.block <= static_cast<storage_interface*>(m_ref.storage)->files()->piece_length() / 0x4000);
TORRENT_ASSERT(j.action != disk_io_job::rename_file);
TORRENT_ASSERT(j.action != disk_io_job::move_storage);
}
@ -80,8 +80,8 @@ namespace libtorrent
TORRENT_ASSERT(m_ref.piece >= 0);
TORRENT_ASSERT(m_ref.storage != nullptr);
TORRENT_ASSERT(m_ref.block >= 0);
TORRENT_ASSERT(m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces());
TORRENT_ASSERT(m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000);
TORRENT_ASSERT(m_ref.piece < static_cast<storage_interface*>(m_ref.storage)->files()->num_pieces());
TORRENT_ASSERT(m_ref.block <= static_cast<storage_interface*>(m_ref.storage)->files()->piece_length() / 0x4000);
TORRENT_ASSERT(j.action != disk_io_job::rename_file);
TORRENT_ASSERT(j.action != disk_io_job::move_storage);
}

View File

@ -630,7 +630,7 @@ namespace libtorrent
for (int i = 1; i <= num_blocks; ++i)
{
if (i < num_blocks && flushing[i] == flushing[i - 1] + 1) continue;
int const ret = pe->storage->get_storage_impl()->writev(
int const ret = pe->storage->writev(
iov_start.first(i - flushing_start)
, piece + flushing[flushing_start] / blocks_in_piece
, (flushing[flushing_start] % blocks_in_piece) * block_size
@ -812,7 +812,7 @@ namespace libtorrent
}
}
void disk_io_thread::flush_cache(piece_manager* storage, std::uint32_t flags
void disk_io_thread::flush_cache(storage_interface* storage, std::uint32_t flags
, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l)
{
if (storage)
@ -887,7 +887,7 @@ namespace libtorrent
DLOG("try_flush_write_blocks: %d\n", num);
list_iterator<cached_piece_entry> range = m_disk_cache.write_lru_pieces();
std::vector<std::pair<piece_manager*, int>> pieces;
std::vector<std::pair<storage_interface*, int>> pieces;
pieces.reserve(m_disk_cache.num_write_lru_pieces());
for (list_iterator<cached_piece_entry> p = range; p.get() && num > 0; p.next())
@ -1077,14 +1077,14 @@ namespace libtorrent
}
#endif
std::shared_ptr<piece_manager> storage = j->storage;
std::shared_ptr<storage_interface> storage = j->storage;
// TODO: instead of doing this. pass in the settings to each storage_interface
// call. Each disk thread could hold its most recent understanding of the settings
// in a shared_ptr, and update it every time it wakes up from a job. That way
// each access to the settings won't require a std::mutex to be held.
if (storage && storage->get_storage_impl()->m_settings == nullptr)
storage->get_storage_impl()->m_settings = &m_settings;
if (storage && storage->m_settings == nullptr)
storage->m_settings = &m_settings;
TORRENT_ASSERT(j->action < sizeof(job_functions) / sizeof(job_functions[0]));
@ -1187,7 +1187,7 @@ namespace libtorrent
, m_settings.get_bool(settings_pack::coalesce_reads));
file::iovec_t b = { j->buffer.disk_block, size_t(j->d.io.buffer_size) };
int ret = j->storage->get_storage_impl()->readv(b
int ret = j->storage->readv(b
, j->piece, j->d.io.offset, file_flags, j->error);
TORRENT_ASSERT(ret >= 0 || j->error.ec);
@ -1262,7 +1262,7 @@ namespace libtorrent
, m_settings.get_bool(settings_pack::coalesce_reads));
time_point start_time = clock_type::now();
ret = j->storage->get_storage_impl()->readv(iov
ret = j->storage->readv(iov
, j->piece, adjusted_offset, file_flags, j->error);
if (!j->error.ec)
@ -1423,7 +1423,7 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1);
// the actual write operation
int const ret = j->storage->get_storage_impl()->writev(b
int const ret = j->storage->writev(b
, j->piece, j->d.io.offset, file_flags, j->error);
m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1);
@ -1507,7 +1507,7 @@ namespace libtorrent
return do_uncached_write(j);
}
void disk_io_thread::async_read(piece_manager* storage, peer_request const& r
void disk_io_thread::async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester
, int flags)
{
@ -1618,7 +1618,7 @@ namespace libtorrent
return 1;
}
void disk_io_thread::async_write(piece_manager* storage, peer_request const& r
void disk_io_thread::async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler
, int const flags)
@ -1721,7 +1721,7 @@ namespace libtorrent
buffer.release();
}
void disk_io_thread::async_hash(piece_manager* storage, int piece, int flags
void disk_io_thread::async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester)
{
disk_io_job* j = allocate_job(disk_io_job::hash);
@ -1759,7 +1759,7 @@ namespace libtorrent
add_job(j);
}
void disk_io_thread::async_move_storage(piece_manager* storage, std::string const& p, int flags
void disk_io_thread::async_move_storage(storage_interface* storage, std::string const& p, int flags
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::move_storage);
@ -1771,7 +1771,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::async_release_files(piece_manager* storage
void disk_io_thread::async_release_files(storage_interface* storage
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::release_files);
@ -1781,7 +1781,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::async_delete_files(piece_manager* storage
void disk_io_thread::async_delete_files(storage_interface* storage
, int const options
, std::function<void(disk_io_job const*)> handler)
{
@ -1827,7 +1827,7 @@ namespace libtorrent
add_completed_jobs(completed_jobs);
}
void disk_io_thread::async_check_files(piece_manager* storage
void disk_io_thread::async_check_files(storage_interface* storage
, add_torrent_params const* resume_data
, std::vector<std::string>& links
, std::function<void(disk_io_job const*)> handler)
@ -1845,7 +1845,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::async_rename_file(piece_manager* storage, int index, std::string const& name
void disk_io_thread::async_rename_file(storage_interface* storage, int index, std::string const& name
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::rename_file);
@ -1856,7 +1856,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::async_stop_torrent(piece_manager* storage
void disk_io_thread::async_stop_torrent(storage_interface* storage
, std::function<void(disk_io_job const*)> handler)
{
// remove outstanding hash jobs belonging to this torrent
@ -1891,7 +1891,7 @@ namespace libtorrent
add_completed_jobs(completed_jobs);
}
void disk_io_thread::async_flush_piece(piece_manager* storage, int piece
void disk_io_thread::async_flush_piece(storage_interface* storage, int piece
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::flush_piece);
@ -1910,7 +1910,7 @@ namespace libtorrent
add_job(j);
}
void disk_io_thread::async_set_file_priority(piece_manager* storage
void disk_io_thread::async_set_file_priority(storage_interface* storage
, std::vector<std::uint8_t> const& prios
, std::function<void(disk_io_job const*)> handler)
{
@ -1924,7 +1924,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::async_clear_piece(piece_manager* storage, int index
void disk_io_thread::async_clear_piece(storage_interface* storage, int index
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::clear_piece);
@ -1941,7 +1941,7 @@ namespace libtorrent
add_fence_job(storage, j);
}
void disk_io_thread::clear_piece(piece_manager* storage, int index)
void disk_io_thread::clear_piece(storage_interface* storage, int index)
{
std::unique_lock<std::mutex> l(m_cache_mutex);
@ -2103,7 +2103,7 @@ namespace libtorrent
time_point start_time = clock_type::now();
iov.iov_len = (std::min)(block_size, piece_size - offset);
ret = j->storage->get_storage_impl()->readv(iov, j->piece
ret = j->storage->readv(iov, j->piece
, offset, file_flags, j->error);
if (ret < 0) break;
@ -2294,7 +2294,7 @@ namespace libtorrent
time_point start_time = clock_type::now();
TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
ret = j->storage->get_storage_impl()->readv(iov, j->piece
ret = j->storage->readv(iov, j->piece
, offset, file_flags, j->error);
if (ret < 0)
@ -2378,7 +2378,7 @@ namespace libtorrent
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
// if files have to be closed, that's the storage's responsibility
return j->storage->get_storage_impl()->move_storage(j->buffer.string
return j->storage->move_storage(j->buffer.string
, j->flags, j->error);
}
@ -2391,7 +2391,7 @@ namespace libtorrent
flush_cache(j->storage.get(), flush_write_cache, completed_jobs, l);
l.unlock();
j->storage->get_storage_impl()->release_files(j->error);
j->storage->release_files(j->error);
return j->error ? -1 : 0;
}
@ -2408,7 +2408,7 @@ namespace libtorrent
, completed_jobs, l);
l.unlock();
j->storage->get_storage_impl()->delete_files(j->buffer.delete_options, j->error);
j->storage->delete_files(j->buffer.delete_options, j->error);
return j->error ? -1 : 0;
}
@ -2431,7 +2431,7 @@ namespace libtorrent
// torrent. The storage must create hard links (or copy) those files. If
// any file does not exist or is inaccessible, the disk job must fail.
TORRENT_ASSERT(j->storage->m_files.piece_length() > 0);
TORRENT_ASSERT(j->storage->files()->piece_length() > 0);
// if we don't have any resume data, return
// or if error is set and return value is 'no_error' or 'need_full_check'
@ -2440,14 +2440,14 @@ namespace libtorrent
// when wrong in the disk access
storage_error se;
if ((rd->have_pieces.empty()
|| !j->storage->get_storage_impl()->verify_resume_data(*rd
|| !j->storage->verify_resume_data(*rd
, links ? *links : std::vector<std::string>(), j->error))
&& !m_settings.get_bool(settings_pack::no_recheck_incomplete_resume))
{
// j->error may have been set at this point, by verify_resume_data()
// it's important to not have it cleared out subsequent calls, as long
// as they succeed.
bool const has_files = j->storage->get_storage_impl()->has_any_file(se);
bool const has_files = j->storage->has_any_file(se);
if (se)
{
@ -2458,7 +2458,7 @@ namespace libtorrent
if (has_files)
{
// always initialize the storage
j->storage->get_storage_impl()->initialize(se);
j->storage->initialize(se);
if (se)
{
j->error = se;
@ -2468,7 +2468,7 @@ namespace libtorrent
}
}
j->storage->get_storage_impl()->initialize(se);
j->storage->initialize(se);
if (se)
{
j->error = se;
@ -2483,7 +2483,7 @@ namespace libtorrent
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
// if files need to be closed, that's the storage's responsibility
j->storage->get_storage_impl()->rename_file(j->piece, j->buffer.string
j->storage->rename_file(j->piece, j->buffer.string
, j->error);
return j->error ? -1 : 0;
}
@ -2502,7 +2502,7 @@ namespace libtorrent
m_disk_cache.release_memory();
j->storage->get_storage_impl()->release_files(j->error);
j->storage->release_files(j->error);
return j->error ? -1 : 0;
}
@ -2552,7 +2552,7 @@ namespace libtorrent
}
void disk_io_thread::get_cache_info(cache_status* ret, bool no_pieces
, piece_manager const* storage) const
, storage_interface const* storage) const
{
std::unique_lock<std::mutex> l(m_cache_mutex);
@ -2735,7 +2735,7 @@ namespace libtorrent
int disk_io_thread::do_file_priority(disk_io_job* j, jobqueue_t& /* completed_jobs */ )
{
std::unique_ptr<std::vector<std::uint8_t>> p(j->buffer.priorities);
j->storage->get_storage_impl()->set_file_priority(*p, j->error);
j->storage->set_file_priority(*p, j->error);
return 0;
}
@ -2778,7 +2778,7 @@ namespace libtorrent
return retry_job;
}
void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j
void disk_io_thread::add_fence_job(storage_interface* storage, disk_io_job* j
, bool user_add)
{
// if this happens, it means we started to shut down
@ -3042,7 +3042,7 @@ namespace libtorrent
time_point const now = aux::time_now();
while (!m_need_tick.empty() && m_need_tick.front().first < now)
{
std::shared_ptr<piece_manager> st = m_need_tick.front().second.lock();
std::shared_ptr<storage_interface> st = m_need_tick.front().second.lock();
m_need_tick.erase(m_need_tick.begin());
if (st) st->tick();
}
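The deferred-tick bookkeeping that piece_manager used to own is now split between set_need_tick()/do_tick() on storage_interface and the m_need_tick vector of weak_ptrs ticked in the loop above. A sketch of the producer side, assuming the registration looks roughly like this (the two-second delay and the surrounding code are assumptions, not taken from the commit):

  // set_need_tick() returns the previous value of the flag, so a storage that
  // is already scheduled is not pushed into m_need_tick a second time
  if (!storage->set_need_tick())
    m_need_tick.emplace_back(aux::time_now() + seconds(2), storage->shared_from_this());
  // the loop above then pops expired entries and ticks each storage that is
  // still alive (do_tick() in the new API clears the flag and forwards to tick())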

View File

@ -451,7 +451,7 @@ namespace libtorrent
void session_handle::get_cache_info(cache_status* ret
, torrent_handle h, int flags) const
{
piece_manager* st = nullptr;
storage_interface* st = nullptr;
std::shared_ptr<torrent> t = h.m_torrent.lock();
if (t)
{

View File

@ -1353,7 +1353,7 @@ namespace libtorrent
// of normal bittorrent operation, since it will just send garbage
// to peers and throw away all the data it downloads. It would end
// up being banned immediately
class disabled_storage final : public storage_interface, boost::noncopyable
class disabled_storage final : public storage_interface
{
public:
bool has_any_file(storage_error&) override { return false; }
@ -1465,20 +1465,6 @@ namespace libtorrent
#endif
}
// -- piece_manager -----------------------------------------------------
piece_manager::piece_manager(
storage_interface* storage_impl
, std::shared_ptr<void> const& torrent
, file_storage* files)
: m_files(*files)
, m_storage(storage_impl)
, m_torrent(torrent)
{
}
piece_manager::~piece_manager() = default;
// ====== disk_job_fence implementation ========
disk_job_fence::disk_job_fence()

View File

@ -1210,7 +1210,7 @@ namespace libtorrent
storage_interface* torrent::get_storage()
{
if (!m_storage) return nullptr;
return m_storage->get_storage_impl();
return m_storage.get();
}
void torrent::need_picker()
@ -1654,13 +1654,11 @@ namespace libtorrent
params.info = m_torrent_file.get();
TORRENT_ASSERT(m_storage_constructor);
storage_interface* storage_impl = m_storage_constructor(params);
m_storage.reset(m_storage_constructor(params));
m_storage->set_files(&m_torrent_file->files());
// the shared_from_this() will create an intentional
// cycle of ownership, se the hpp file for description.
m_storage = std::make_shared<piece_manager>(
storage_impl, shared_from_this()
, const_cast<file_storage*>(&m_torrent_file->files()));
m_storage->set_owner(shared_from_this());
}
peer_connection* torrent::find_lowest_ranking_peer() const
@ -7844,7 +7842,7 @@ namespace libtorrent
}
catch (...) { handle_exception(); }
piece_manager& torrent::storage()
storage_interface& torrent::storage()
{
TORRENT_ASSERT(m_storage.get());
return *m_storage;

View File

@ -96,12 +96,12 @@ static void nop() {}
fs.add_file("a/test7", 0x4000); \
fs.set_piece_length(0x8000); \
fs.set_num_pieces(5); \
test_storage_impl* st = new test_storage_impl; \
std::shared_ptr<piece_manager> pm \
= std::make_shared<piece_manager>(st, std::shared_ptr<int>(new int), &fs); \
std::shared_ptr<storage_interface> pm \
= std::make_shared<test_storage_impl>(); \
pm->set_files(&fs); \
error_code ec; \
bc.set_settings(sett, ec); \
st->m_settings = &sett; \
pm->m_settings = &sett; \
disk_io_job rj; \
disk_io_job wj; \
INITIALIZE_JOB(rj) \

View File

@ -472,8 +472,8 @@ void test_check_files(std::string const& test_path
p.pool = &fp;
p.mode = storage_mode;
std::shared_ptr<void> dummy;
std::shared_ptr<piece_manager> pm = std::make_shared<piece_manager>(new default_storage(p), dummy, &fs);
std::shared_ptr<storage_interface> pm = std::make_shared<default_storage>(p);
pm->set_files(&fs);
std::mutex lock;
bool done = false;