remove some questionable code from disk_io_thread when deleting files for a torrent and extend transfer test to cover deleting files while seeding

This commit is contained in:
arvidn 2017-10-22 14:03:49 +02:00 committed by Arvid Norberg
parent 9e69bf3618
commit 80ac90727b
5 changed files with 121 additions and 108 deletions

View File

@ -110,6 +110,11 @@ namespace libtorrent {
// on a cache piece that may be flushed soon
static constexpr disk_job_flags_t in_progress = 2_bit;
// this is set for jobs that we're no longer interested in. Any aborted
// job that's executed should immediately fail with operation_aborted
// instead of executing
static constexpr disk_job_flags_t aborted = 6_bit;
// for write jobs, returns true if its block
// is not dirty anymore
bool completed(cached_piece_entry const* pe, int block_size);

View File

@ -102,6 +102,7 @@ namespace libtorrent {
constexpr disk_job_flags_t disk_io_job::fence;
constexpr disk_job_flags_t disk_io_job::in_progress;
constexpr disk_job_flags_t disk_io_job::aborted;
disk_io_job::disk_io_job()
: argument(remove_flags_t{})

View File

@ -276,8 +276,14 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
std::unique_lock<std::mutex> l(m_job_mutex);
if (m_abort.exchange(true)) return;
bool const no_threads = m_num_running_threads == 0;
// abort outstanding jobs belonging to this torrent
for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next())
i.get()->flags |= disk_io_job::aborted;
l.unlock();
// if there are no disk threads, we can't wait for the jobs here, because
// we'd stall indefinitely
if (no_threads)
{
abort_jobs();
@ -1588,7 +1594,7 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
j->callback = std::move(handler);
std::unique_lock<std::mutex> l(m_cache_mutex);
int ret = prep_read_job_impl(j);
int const ret = prep_read_job_impl(j);
l.unlock();
switch (ret)
@ -1613,11 +1619,11 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
// 1 if it needs to be added to the job queue
// 2 if it was deferred and will be performed later (no need to
// add it to the queue)
int disk_io_thread::prep_read_job_impl(disk_io_job* j, bool check_fence)
int disk_io_thread::prep_read_job_impl(disk_io_job* j, bool const check_fence)
{
TORRENT_ASSERT(j->action == job_action_t::read);
int ret = m_disk_cache.try_read(j, *this);
int const ret = m_disk_cache.try_read(j, *this);
if (ret >= 0)
{
m_stats_counters.inc_stats_counter(counters::num_blocks_cache_hits);
@ -1839,65 +1845,26 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
, remove_flags_t const options
, std::function<void(storage_error const&)> handler)
{
// remove cache blocks belonging to this torrent
jobqueue_t completed_jobs;
// remove outstanding jobs belonging to this torrent
std::unique_lock<std::mutex> l2(m_job_mutex);
// TODO: maybe the tailqueue_iterator<disk_io_job> should contain a pointer-pointer
// instead and have an unlink function
disk_io_job* qj = m_generic_io_jobs.m_queued_jobs.get_all();
jobqueue_t to_abort;
// if we encounter any read jobs in the queue, we need to clear the
// "outstanding_read" flag on its piece, as we abort the job
std::vector<std::pair<storage_interface*, piece_index_t>> pieces;
storage_interface* to_delete = m_torrents[storage].get();
while (qj)
{
disk_io_job* next = qj->next;
#if TORRENT_USE_ASSERTS
qj->next = nullptr;
#endif
if (qj->action == job_action_t::read)
// abort outstanding hash jobs belonging to this torrent
std::unique_lock<std::mutex> l(m_job_mutex);
std::shared_ptr<storage_interface> st
= m_torrents[storage]->shared_from_this();
// hash jobs
for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next())
{
pieces.push_back(std::make_pair(qj->storage.get(), qj->piece));
disk_io_job* j = i.get();
if (j->storage != st) continue;
j->flags |= disk_io_job::aborted;
}
if (qj->storage.get() == to_delete)
to_abort.push_back(qj);
else
m_generic_io_jobs.m_queued_jobs.push_back(qj);
qj = next;
}
l2.unlock();
std::unique_lock<std::mutex> l(m_cache_mutex);
for (auto& p : pieces)
{
cached_piece_entry* pe = m_disk_cache.find_piece(p.first, p.second);
if (pe == nullptr) continue;
TORRENT_ASSERT(pe->outstanding_read == 1);
pe->outstanding_read = 0;
}
flush_cache(to_delete, flush_delete_cache, completed_jobs, l);
l.unlock();
disk_io_job* j = allocate_job(job_action_t::delete_files);
j->storage = m_torrents[storage]->shared_from_this();
j->callback = std::move(handler);
j->argument = options;
add_fence_job(j);
fail_jobs_impl(storage_error(boost::asio::error::operation_aborted)
, to_abort, completed_jobs);
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
}
void disk_io_thread::async_check_files(storage_index_t const storage
@ -1933,38 +1900,24 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
void disk_io_thread::async_stop_torrent(storage_index_t const storage
, std::function<void()> handler)
{
// remove outstanding hash jobs belonging to this torrent
std::unique_lock<std::mutex> l2(m_job_mutex);
std::shared_ptr<storage_interface> st
= m_torrents[storage]->shared_from_this();
disk_io_job* qj = m_hash_io_jobs.m_queued_jobs.get_all();
jobqueue_t to_abort;
while (qj != nullptr)
{
disk_io_job* next = qj->next;
#if TORRENT_USE_ASSERTS
qj->next = nullptr;
#endif
if (qj->storage.get() == st.get())
to_abort.push_back(qj);
else
m_hash_io_jobs.m_queued_jobs.push_back(qj);
qj = next;
}
l2.unlock();
// abort outstanding hash jobs belonging to this torrent
std::unique_lock<std::mutex> l(m_job_mutex);
std::shared_ptr<storage_interface> st
= m_torrents[storage]->shared_from_this();
// hash jobs
for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next())
{
disk_io_job* j = i.get();
if (j->storage != st) continue;
j->flags |= disk_io_job::aborted;
}
}
disk_io_job* j = allocate_job(job_action_t::stop_torrent);
j->storage = st;
j->storage = m_torrents[storage]->shared_from_this();
j->callback = std::move(handler);
add_fence_job(j);
jobqueue_t completed_jobs;
fail_jobs_impl(storage_error(boost::asio::error::operation_aborted)
, to_abort, completed_jobs);
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
}
void disk_io_thread::async_flush_piece(storage_index_t const storage
@ -3024,6 +2977,15 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
void disk_io_thread::execute_job(disk_io_job* j)
{
jobqueue_t completed_jobs;
if (j->flags & disk_io_job::aborted)
{
j->ret = status_t::fatal_disk_error;
j->error = storage_error(boost::asio::error::operation_aborted);
completed_jobs.push_back(j);
add_completed_jobs(completed_jobs);
return;
}
perform_job(j, completed_jobs);
if (completed_jobs.size())
add_completed_jobs(completed_jobs);
@ -3325,7 +3287,7 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
if (j->action == job_action_t::read)
{
int state = prep_read_job_impl(j, false);
int const state = prep_read_job_impl(j, false);
switch (state)
{
case 0:

View File

@ -2322,6 +2322,7 @@ namespace libtorrent {
INVARIANT_CHECK;
if (m_abort) return;
if (m_deleted) return;
state_updated();
@ -3772,6 +3773,7 @@ namespace libtorrent {
TORRENT_ASSERT(is_single_thread());
if (m_abort) return;
if (m_deleted) return;
bool const passed = settings().get_bool(settings_pack::disable_hash_checks)
|| (!error && sha1_hash(piece_hash) == m_torrent_file->hash_for_piece(piece));

View File

@ -120,14 +120,25 @@ storage_interface* test_storage_constructor(storage_params const& params, file_p
return new test_storage(params, pool);
}
struct transfer_tag;
using transfer_flags_t = lt::flags::bitfield_flag<std::uint8_t, transfer_tag>;
constexpr transfer_flags_t disk_full = 1_bit;
constexpr transfer_flags_t delete_files = 2_bit;
constexpr transfer_flags_t move_storage = 3_bit;
void test_transfer(int proxy_type, settings_pack const& sett
, bool test_disk_full = false
, transfer_flags_t flags = {}
, storage_mode_t storage_mode = storage_mode_sparse)
{
char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"};
std::printf("\n\n ==== TESTING %s proxy ==== disk-full: %s\n\n\n"
, test_name[proxy_type], test_disk_full ? "true": "false");
std::printf("\n\n ==== TESTING %s proxy ==== disk-full: %s delete_files: %s move-storage: %s\n\n\n"
, test_name[proxy_type]
, (flags & disk_full) ? "true": "false"
, (flags & delete_files) ? "true": "false"
, (flags & move_storage) ? "true": "false"
);
// in case the previous run was terminated
error_code ec;
@ -246,23 +257,28 @@ void test_transfer(int proxy_type, settings_pack const& sett
// test using piece sizes smaller than 16kB
std::tie(tor1, tor2, ignore) = setup_transfer(&ses1, &ses2, nullptr
, true, false, true, "_transfer", 8 * 1024, &t, false, test_disk_full?&addp:&params);
, true, false, true, "_transfer", 1024 * 1024, &t, false
, (flags & disk_full) ? &addp : &params);
int num_pieces = tor2.torrent_file()->num_pieces();
std::vector<int> priorities(num_pieces, 1);
// also test to move the storage of the downloader and the uploader
// to make sure it can handle switching paths
bool test_move_storage = false;
int upload_mode_timer = 0;
wait_for_downloading(ses2, "ses2");
lt::time_point const start_time = lt::clock_type::now();
for (int i = 0; i < 200; ++i)
for (int i = 0; i < 20000; ++i)
{
torrent_status st1 = tor1.status();
torrent_status st2 = tor2.status();
if (lt::clock_type::now() - start_time > seconds(10))
{
std::cout << "timeout\n";
break;
}
// sleep a bit
ses2.wait_for_alert(lt::milliseconds(100));
torrent_status const st1 = tor1.status();
torrent_status const st2 = tor2.status();
print_alerts(ses1, "ses1", true, true, &on_alert);
print_alerts(ses2, "ses2", true, true, &on_alert);
@ -272,24 +288,34 @@ void test_transfer(int proxy_type, settings_pack const& sett
print_ses_rate(i / 10.f, &st1, &st2);
}
if (!test_move_storage && st2.progress > 0.25f)
std::cout << "progress: " << st2.progress << "\n";
if ((flags & move_storage) && st2.progress > 0.1f)
{
test_move_storage = true;
flags &= ~move_storage;
tor1.move_storage("tmp1_transfer_moved");
tor2.move_storage("tmp2_transfer_moved");
std::cout << "moving storage" << std::endl;
}
if ((flags & delete_files) && st2.progress > 0.1f)
{
ses1.remove_torrent(tor1, session::delete_files);
std::cout << "deleting files" << std::endl;
std::this_thread::sleep_for(lt::seconds(1));
break;
}
// wait 10 loops before we restart the torrent. This lets
// us catch all events that failed (and would put the torrent
// back into upload mode) before we restart it.
// TODO: factor out the disk-full test into its own unit test
if (test_disk_full
if (flags & disk_full
&& !(tor2.flags() & torrent_flags::upload_mode)
&& ++upload_mode_timer > 10)
{
test_disk_full = false;
flags &= ~disk_full;
((test_storage*)tor2.get_storage_impl())->set_limit(16 * 1024 * 1024);
// if we reset the upload mode too soon, there may be more disk
@ -323,23 +349,24 @@ void test_transfer(int proxy_type, settings_pack const& sett
continue;
}
if (!test_disk_full && st2.is_seeding) break;
if (!(flags & disk_full) && st2.is_seeding) break;
TEST_CHECK(st1.state == torrent_status::seeding
|| st1.state == torrent_status::checking_files);
TEST_CHECK(st2.state == torrent_status::downloading
|| st2.state == torrent_status::checking_resume_data
|| (test_disk_full && st2.errc));
|| ((flags & disk_full) && st2.errc));
if (!test_disk_full && peer_disconnects >= 2) break;
if (!(flags & disk_full) && peer_disconnects >= 2) break;
// if nothing is being transferred after 2 seconds, we're failing the test
// if (!test_disk_full && st1.upload_payload_rate == 0 && i > 20) break;
std::this_thread::sleep_for(lt::milliseconds(100));
// if (!(flags & disk_full) && st1.upload_payload_rate == 0 && i > 20) break;
}
TEST_CHECK(tor2.status().is_seeding);
if (!(flags & delete_files))
{
TEST_CHECK(tor2.status().is_seeding);
}
// this allows shutting down the sessions in parallel
p1 = ses1.abort();
@ -405,19 +432,35 @@ TORRENT_TEST(disk_full)
{
using namespace lt;
// test with a (simulated) full disk
test_transfer(0, settings_pack(), true);
test_transfer(0, settings_pack(), disk_full);
cleanup();
}
*/
TORRENT_TEST(move_storage)
{
using namespace lt;
test_transfer(0, settings_pack(), move_storage);
cleanup();
}
TORRENT_TEST(delete_files)
{
using namespace lt;
settings_pack p = settings_pack();
p.set_int(settings_pack::aio_threads, 10);
test_transfer(0, p, delete_files);
cleanup();
}
TORRENT_TEST(allow_fast)
{
using namespace lt;
// test allowed fast
settings_pack p;
p.set_int(settings_pack::allowed_fast_set_size, 2000);
test_transfer(0, p, false);
test_transfer(0, p);
cleanup();
}
@ -429,7 +472,7 @@ TORRENT_TEST(coalesce_reads)
settings_pack p;
p.set_int(settings_pack::read_cache_line_size, 16);
p.set_bool(settings_pack::coalesce_reads, true);
test_transfer(0, p, false);
test_transfer(0, p);
cleanup();
}
@ -440,7 +483,7 @@ TORRENT_TEST(coalesce_writes)
// test allowed fast
settings_pack p;
p.set_bool(settings_pack::coalesce_writes, true);
test_transfer(0, p, false);
test_transfer(0, p);
cleanup();
}
@ -451,7 +494,7 @@ TORRENT_TEST(allocate)
using namespace lt;
// test storage_mode_allocate
std::printf("full allocation mode\n");
test_transfer(0, settings_pack(), false, storage_mode_allocate);
test_transfer(0, settings_pack(), {}, storage_mode_allocate);
cleanup();
}