remove explicit ticking of storage from the torrent. instead make the disk_io_thread do it automatically

This commit is contained in:
arvidn 2016-11-06 01:39:41 -05:00 committed by Arvid Norberg
parent c72053a75c
commit 3ff89f29d7
9 changed files with 36 additions and 77 deletions

View File

@ -79,8 +79,6 @@ namespace libtorrent
virtual void async_set_file_priority(piece_manager* storage virtual void async_set_file_priority(piece_manager* storage
, std::vector<std::uint8_t> const& prio , std::vector<std::uint8_t> const& prio
, std::function<void(disk_io_job const*)> handler) = 0; , std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_tick_torrent(piece_manager* storage
, std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_clear_piece(piece_manager* storage, int index virtual void async_clear_piece(piece_manager* storage, int index
, std::function<void(disk_io_job const*)> handler) = 0; , std::function<void(disk_io_job const*)> handler) = 0;

View File

@ -92,7 +92,6 @@ namespace libtorrent
, trim_cache , trim_cache
, file_priority , file_priority
, clear_piece , clear_piece
, tick_storage
, resolve_links , resolve_links
, num_job_ids , num_job_ids

View File

@ -317,8 +317,6 @@ namespace libtorrent
void async_set_file_priority(piece_manager* storage void async_set_file_priority(piece_manager* storage
, std::vector<std::uint8_t> const& prio , std::vector<std::uint8_t> const& prio
, std::function<void(disk_io_job const*)> handler) override; , std::function<void(disk_io_job const*)> handler) override;
void async_tick_torrent(piece_manager* storage
, std::function<void(disk_io_job const*)> handler) override;
void async_clear_piece(piece_manager* storage, int index void async_clear_piece(piece_manager* storage, int index
, std::function<void(disk_io_job const*)> handler) override; , std::function<void(disk_io_job const*)> handler) override;
@ -400,7 +398,6 @@ namespace libtorrent
int do_trim_cache(disk_io_job* j, jobqueue_t& completed_jobs); int do_trim_cache(disk_io_job* j, jobqueue_t& completed_jobs);
int do_file_priority(disk_io_job* j, jobqueue_t& completed_jobs); int do_file_priority(disk_io_job* j, jobqueue_t& completed_jobs);
int do_clear_piece(disk_io_job* j, jobqueue_t& completed_jobs); int do_clear_piece(disk_io_job* j, jobqueue_t& completed_jobs);
int do_tick(disk_io_job* j, jobqueue_t& completed_jobs);
int do_resolve_links(disk_io_job* j, jobqueue_t& completed_jobs); int do_resolve_links(disk_io_job* j, jobqueue_t& completed_jobs);
void call_job_handlers(); void call_job_handlers();
@ -598,6 +595,10 @@ namespace libtorrent
std::mutex m_completed_jobs_mutex; std::mutex m_completed_jobs_mutex;
jobqueue_t m_completed_jobs; jobqueue_t m_completed_jobs;
// storages that have had write activity recently and will get ticked
// soon, for deferred actions (say, flushing partfile metadata)
std::vector<std::pair<time_point, std::weak_ptr<piece_manager>>> m_need_tick;
// this is protected by the completed_jobs_mutex. It's true whenever // this is protected by the completed_jobs_mutex. It's true whenever
// there's a call_job_handlers message in-flight to the network thread. We // there's a call_job_handlers message in-flight to the network thread. We
// only ever keep one such message in flight at a time, and coalesce // only ever keep one such message in flight at a time, and coalesce

View File

@ -560,6 +560,19 @@ namespace libtorrent
storage_interface* get_storage_impl() { return m_storage.get(); } storage_interface* get_storage_impl() { return m_storage.get(); }
bool set_need_tick()
{
bool const prev = m_need_tick;
m_need_tick = true;
return prev;
}
void tick()
{
m_need_tick = false;
m_storage->tick();
}
private: private:
// if error is set and return value is 'no_error' or 'need_full_check' // if error is set and return value is 'no_error' or 'need_full_check'
@ -579,6 +592,8 @@ namespace libtorrent
#endif #endif
file_storage const& m_files; file_storage const& m_files;
bool m_need_tick = false;
std::unique_ptr<storage_interface> m_storage; std::unique_ptr<storage_interface> m_storage;
// the reason for this to be a void pointer // the reason for this to be a void pointer

View File

@ -381,8 +381,6 @@ namespace libtorrent
, peer_request p); , peer_request p);
void on_disk_tick_done(disk_io_job const* j); void on_disk_tick_done(disk_io_job const* j);
void schedule_storage_tick();
void set_progress_ppm(int p) { m_progress_ppm = p; } void set_progress_ppm(int p) { m_progress_ppm = p; }
struct read_piece_struct struct read_piece_struct
{ {
@ -1537,14 +1535,6 @@ namespace libtorrent
// the number of bytes of padding files // the number of bytes of padding files
std::uint32_t m_padding:24; std::uint32_t m_padding:24;
// this is a second count-down to when we should tick the
// storage for this torrent. Ticking the storage is used
// to periodically flush the partfile metadata and possibly
// other deferred flushing. Any disk operation starts this
// counter (unless it's already counting down). 0 means no
// ticking is needed.
std::uint32_t m_storage_tick:8;
// ---- // ----
// the scrape data from the tracker response, this // the scrape data from the tracker response, this

View File

@ -201,7 +201,6 @@ const char* const job_action_name[] =
"trim_cache", "trim_cache",
"set_file_priority", "set_file_priority",
"clear_piece", "clear_piece",
"tick_storage",
"resolve_links" "resolve_links"
}; };

View File

@ -638,7 +638,7 @@ namespace libtorrent
for (int i = 1; i <= num_blocks; ++i) for (int i = 1; i <= num_blocks; ++i)
{ {
if (i < num_blocks && flushing[i] == flushing[i - 1] + 1) continue; if (i < num_blocks && flushing[i] == flushing[i - 1] + 1) continue;
int ret = pe->storage->get_storage_impl()->writev( int const ret = pe->storage->get_storage_impl()->writev(
iov_start.first(i - flushing_start) iov_start.first(i - flushing_start)
, piece + flushing[flushing_start] / blocks_in_piece , piece + flushing[flushing_start] / blocks_in_piece
, (flushing[flushing_start] % blocks_in_piece) * block_size , (flushing[flushing_start] % blocks_in_piece) * block_size
@ -650,6 +650,9 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1); m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1);
if (!pe->storage->set_need_tick())
m_need_tick.push_back({aux::time_now() + minutes(2), pe->storage});
if (!failed) if (!failed)
{ {
TORRENT_PIECE_ASSERT(!error, pe); TORRENT_PIECE_ASSERT(!error, pe);
@ -1028,8 +1031,7 @@ namespace libtorrent
&disk_io_thread::do_flush_storage, &disk_io_thread::do_flush_storage,
&disk_io_thread::do_trim_cache, &disk_io_thread::do_trim_cache,
&disk_io_thread::do_file_priority, &disk_io_thread::do_file_priority,
&disk_io_thread::do_clear_piece, &disk_io_thread::do_clear_piece
&disk_io_thread::do_tick,
}; };
} // anonymous namespace } // anonymous namespace
@ -1431,7 +1433,7 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1); m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1);
// the actual write operation // the actual write operation
int ret = j->storage->get_storage_impl()->writev(b int const ret = j->storage->get_storage_impl()->writev(b
, j->piece, j->d.io.offset, file_flags, j->error); , j->piece, j->d.io.offset, file_flags, j->error);
m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1); m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1);
@ -1447,6 +1449,9 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::disk_job_time, write_time); m_stats_counters.inc_stats_counter(counters::disk_job_time, write_time);
} }
if (!j->storage->set_need_tick())
m_need_tick.push_back({aux::time_now() + minutes(2), j->storage});
m_disk_cache.free_buffer(j->buffer.disk_block); m_disk_cache.free_buffer(j->buffer.disk_block);
j->buffer.disk_block = nullptr; j->buffer.disk_block = nullptr;
@ -1934,16 +1939,6 @@ namespace libtorrent
add_fence_job(storage, j); add_fence_job(storage, j);
} }
void disk_io_thread::async_tick_torrent(piece_manager* storage
, std::function<void(disk_io_job const*)> handler)
{
disk_io_job* j = allocate_job(disk_io_job::tick_storage);
j->storage = storage->shared_from_this();
j->callback = std::move(handler);
add_job(j);
}
void disk_io_thread::async_clear_piece(piece_manager* storage, int index void disk_io_thread::async_clear_piece(piece_manager* storage, int index
, std::function<void(disk_io_job const*)> handler) , std::function<void(disk_io_job const*)> handler)
{ {
@ -2752,13 +2747,6 @@ namespace libtorrent
return retry_job; return retry_job;
} }
int disk_io_thread::do_tick(disk_io_job* j, jobqueue_t& /* completed_jobs */ )
{
// true means this storage wants more ticks, false
// disables ticking (until it's enabled again)
return j->storage->get_storage_impl()->tick();
}
void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j void disk_io_thread::add_fence_job(piece_manager* storage, disk_io_job* j
, bool user_add) , bool user_add)
{ {
@ -3019,6 +3007,14 @@ namespace libtorrent
{ {
// there's no need for all threads to be doing this // there's no need for all threads to be doing this
maybe_flush_write_blocks(); maybe_flush_write_blocks();
time_point const now = aux::time_now();
while (!m_need_tick.empty() && m_need_tick.front().first < now)
{
std::shared_ptr<piece_manager> st = m_need_tick.front().second.lock();
m_need_tick.erase(m_need_tick.begin());
if (st) st->tick();
}
} }
execute_job(j); execute_job(j);

View File

@ -2994,8 +2994,6 @@ namespace libtorrent
return; return;
} }
t->schedule_storage_tick();
// in case the outstanding bytes just dropped down // in case the outstanding bytes just dropped down
// to allow to receive more data // to allow to receive more data
setup_receive(); setup_receive();

View File

@ -206,7 +206,6 @@ namespace libtorrent
, m_apply_ip_filter((p.flags & add_torrent_params::flag_apply_ip_filter) != 0) , m_apply_ip_filter((p.flags & add_torrent_params::flag_apply_ip_filter) != 0)
, m_pending_active_change(false) , m_pending_active_change(false)
, m_padding(0) , m_padding(0)
, m_storage_tick(0)
, m_incomplete(0xffffff) , m_incomplete(0xffffff)
, m_announce_to_dht((p.flags & add_torrent_params::flag_paused) == 0) , m_announce_to_dht((p.flags & add_torrent_params::flag_paused) == 0)
, m_in_state_updates(false) , m_in_state_updates(false)
@ -1321,21 +1320,11 @@ namespace libtorrent
picker().dec_refcount(piece, nullptr); picker().dec_refcount(piece, nullptr);
} }
void torrent::schedule_storage_tick()
{
// schedule a disk tick in 2 minutes or so
if (m_storage_tick != 0) return;
m_storage_tick = 120 + random(60);
update_want_tick();
}
void torrent::on_disk_write_complete(disk_io_job const* j void torrent::on_disk_write_complete(disk_io_job const* j
, peer_request p) try , peer_request p) try
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
schedule_storage_tick();
m_stats_counters.inc_stats_counter(counters::queued_write_bytes, -p.length); m_stats_counters.inc_stats_counter(counters::queued_write_bytes, -p.length);
// std::fprintf(stderr, "torrent::on_disk_write_complete ret:%d piece:%d block:%d\n" // std::fprintf(stderr, "torrent::on_disk_write_complete ret:%d piece:%d block:%d\n"
@ -1370,16 +1359,6 @@ namespace libtorrent
} }
catch (...) { handle_exception(); } catch (...) { handle_exception(); }
void torrent::on_disk_tick_done(disk_io_job const* j) try
{
if (j->ret && m_storage_tick == 0)
{
m_storage_tick = 120 + random(20);
update_want_tick();
}
}
catch (...) { handle_exception(); }
bool torrent::add_merkle_nodes(std::map<int, sha1_hash> const& nodes, int piece) bool torrent::add_merkle_nodes(std::map<int, sha1_hash> const& nodes, int piece)
{ {
return m_torrent_file->add_merkle_nodes(nodes, piece); return m_torrent_file->add_merkle_nodes(nodes, piece);
@ -7167,10 +7146,6 @@ namespace libtorrent
if (!m_connections.empty()) return true; if (!m_connections.empty()) return true;
// there's a deferred storage tick waiting
// to happen
if (m_storage_tick) return true;
// we might want to connect web seeds // we might want to connect web seeds
if (!is_finished() && !m_web_seeds.empty() && m_files_checked) if (!is_finished() && !m_web_seeds.empty() && m_files_checked)
return true; return true;
@ -9129,18 +9104,6 @@ namespace libtorrent
set_upload_mode(false); set_upload_mode(false);
} }
if (m_storage_tick > 0)
{
--m_storage_tick;
if (m_storage_tick == 0 && m_storage)
{
m_ses.disk_thread().async_tick_torrent(&storage()
, std::bind(&torrent::on_disk_tick_done
, shared_from_this(), _1));
update_want_tick();
}
}
if (is_paused() && !m_graceful_pause_mode) if (is_paused() && !m_graceful_pause_mode)
{ {
// let the stats fade out to 0 // let the stats fade out to 0