diff --git a/include/libtorrent/disk_io_job.hpp b/include/libtorrent/disk_io_job.hpp index d76333584..1b0751573 100644 --- a/include/libtorrent/disk_io_job.hpp +++ b/include/libtorrent/disk_io_job.hpp @@ -110,6 +110,11 @@ namespace libtorrent { // on a cache piece that may be flushed soon static constexpr disk_job_flags_t in_progress = 2_bit; + // this is set for jobs that we're no longer interested in. Any aborted + // job that's executed should immediately fail with operation_aborted + // instead of executing + static constexpr disk_job_flags_t aborted = 6_bit; + // for write jobs, returns true if its block // is not dirty anymore bool completed(cached_piece_entry const* pe, int block_size); diff --git a/src/disk_io_job.cpp b/src/disk_io_job.cpp index 3f53e99a9..d0c6e751d 100644 --- a/src/disk_io_job.cpp +++ b/src/disk_io_job.cpp @@ -102,6 +102,7 @@ namespace libtorrent { constexpr disk_job_flags_t disk_io_job::fence; constexpr disk_job_flags_t disk_io_job::in_progress; + constexpr disk_job_flags_t disk_io_job::aborted; disk_io_job::disk_io_job() : argument(remove_flags_t{}) diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 5617dd94a..b1a3cab3a 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -276,8 +276,14 @@ constexpr disk_job_flags_t disk_interface::cache_hit; std::unique_lock l(m_job_mutex); if (m_abort.exchange(true)) return; bool const no_threads = m_num_running_threads == 0; + // abort outstanding jobs belonging to this torrent + + for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next()) + i.get()->flags |= disk_io_job::aborted; l.unlock(); + // if there are no disk threads, we can't wait for the jobs here, because + // we'd stall indefinitely if (no_threads) { abort_jobs(); @@ -1588,7 +1594,7 @@ constexpr disk_job_flags_t disk_interface::cache_hit; j->callback = std::move(handler); std::unique_lock l(m_cache_mutex); - int ret = prep_read_job_impl(j); + int const ret = prep_read_job_impl(j); l.unlock(); switch (ret) @@ -1613,11 +1619,11 @@ constexpr disk_job_flags_t disk_interface::cache_hit; // 1 if it needs to be added to the job queue // 2 if it was deferred and will be performed later (no need to // add it to the queue) - int disk_io_thread::prep_read_job_impl(disk_io_job* j, bool check_fence) + int disk_io_thread::prep_read_job_impl(disk_io_job* j, bool const check_fence) { TORRENT_ASSERT(j->action == job_action_t::read); - int ret = m_disk_cache.try_read(j, *this); + int const ret = m_disk_cache.try_read(j, *this); if (ret >= 0) { m_stats_counters.inc_stats_counter(counters::num_blocks_cache_hits); @@ -1839,65 +1845,26 @@ constexpr disk_job_flags_t disk_interface::cache_hit; , remove_flags_t const options , std::function handler) { - // remove cache blocks belonging to this torrent - jobqueue_t completed_jobs; - - // remove outstanding jobs belonging to this torrent - std::unique_lock l2(m_job_mutex); - - // TODO: maybe the tailqueue_iterator should contain a pointer-pointer - // instead and have an unlink function - disk_io_job* qj = m_generic_io_jobs.m_queued_jobs.get_all(); - jobqueue_t to_abort; - - // if we encounter any read jobs in the queue, we need to clear the - // "outstanding_read" flag on its piece, as we abort the job - std::vector> pieces; - - storage_interface* to_delete = m_torrents[storage].get(); - - while (qj) { - disk_io_job* next = qj->next; -#if TORRENT_USE_ASSERTS - qj->next = nullptr; -#endif - if (qj->action == job_action_t::read) + // abort outstanding hash jobs belonging to this torrent + std::unique_lock l(m_job_mutex); + + std::shared_ptr st + = m_torrents[storage]->shared_from_this(); + // hash jobs + for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next()) { - pieces.push_back(std::make_pair(qj->storage.get(), qj->piece)); + disk_io_job* j = i.get(); + if (j->storage != st) continue; + j->flags |= disk_io_job::aborted; } - - if (qj->storage.get() == to_delete) - to_abort.push_back(qj); - else - m_generic_io_jobs.m_queued_jobs.push_back(qj); - qj = next; } - l2.unlock(); - - std::unique_lock l(m_cache_mutex); - for (auto& p : pieces) - { - cached_piece_entry* pe = m_disk_cache.find_piece(p.first, p.second); - if (pe == nullptr) continue; - TORRENT_ASSERT(pe->outstanding_read == 1); - pe->outstanding_read = 0; - } - - flush_cache(to_delete, flush_delete_cache, completed_jobs, l); - l.unlock(); disk_io_job* j = allocate_job(job_action_t::delete_files); j->storage = m_torrents[storage]->shared_from_this(); j->callback = std::move(handler); j->argument = options; add_fence_job(j); - - fail_jobs_impl(storage_error(boost::asio::error::operation_aborted) - , to_abort, completed_jobs); - - if (completed_jobs.size()) - add_completed_jobs(completed_jobs); } void disk_io_thread::async_check_files(storage_index_t const storage @@ -1933,38 +1900,24 @@ constexpr disk_job_flags_t disk_interface::cache_hit; void disk_io_thread::async_stop_torrent(storage_index_t const storage , std::function handler) { - // remove outstanding hash jobs belonging to this torrent - std::unique_lock l2(m_job_mutex); - - std::shared_ptr st - = m_torrents[storage]->shared_from_this(); - disk_io_job* qj = m_hash_io_jobs.m_queued_jobs.get_all(); - jobqueue_t to_abort; - - while (qj != nullptr) { - disk_io_job* next = qj->next; -#if TORRENT_USE_ASSERTS - qj->next = nullptr; -#endif - if (qj->storage.get() == st.get()) - to_abort.push_back(qj); - else - m_hash_io_jobs.m_queued_jobs.push_back(qj); - qj = next; - } - l2.unlock(); + // abort outstanding hash jobs belonging to this torrent + std::unique_lock l(m_job_mutex); + std::shared_ptr st + = m_torrents[storage]->shared_from_this(); + // hash jobs + for (auto i = m_hash_io_jobs.m_queued_jobs.iterate(); i.get(); i.next()) + { + disk_io_job* j = i.get(); + if (j->storage != st) continue; + j->flags |= disk_io_job::aborted; + } + } disk_io_job* j = allocate_job(job_action_t::stop_torrent); - j->storage = st; + j->storage = m_torrents[storage]->shared_from_this(); j->callback = std::move(handler); add_fence_job(j); - - jobqueue_t completed_jobs; - fail_jobs_impl(storage_error(boost::asio::error::operation_aborted) - , to_abort, completed_jobs); - if (completed_jobs.size()) - add_completed_jobs(completed_jobs); } void disk_io_thread::async_flush_piece(storage_index_t const storage @@ -3024,6 +2977,15 @@ constexpr disk_job_flags_t disk_interface::cache_hit; void disk_io_thread::execute_job(disk_io_job* j) { jobqueue_t completed_jobs; + if (j->flags & disk_io_job::aborted) + { + j->ret = status_t::fatal_disk_error; + j->error = storage_error(boost::asio::error::operation_aborted); + completed_jobs.push_back(j); + add_completed_jobs(completed_jobs); + return; + } + perform_job(j, completed_jobs); if (completed_jobs.size()) add_completed_jobs(completed_jobs); @@ -3325,7 +3287,7 @@ constexpr disk_job_flags_t disk_interface::cache_hit; if (j->action == job_action_t::read) { - int state = prep_read_job_impl(j, false); + int const state = prep_read_job_impl(j, false); switch (state) { case 0: diff --git a/src/torrent.cpp b/src/torrent.cpp index b01d9de0f..cf0ceea93 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -2322,6 +2322,7 @@ namespace libtorrent { INVARIANT_CHECK; if (m_abort) return; + if (m_deleted) return; state_updated(); @@ -3772,6 +3773,7 @@ namespace libtorrent { TORRENT_ASSERT(is_single_thread()); if (m_abort) return; + if (m_deleted) return; bool const passed = settings().get_bool(settings_pack::disable_hash_checks) || (!error && sha1_hash(piece_hash) == m_torrent_file->hash_for_piece(piece)); diff --git a/test/test_transfer.cpp b/test/test_transfer.cpp index a77041115..76dcdf559 100644 --- a/test/test_transfer.cpp +++ b/test/test_transfer.cpp @@ -120,14 +120,25 @@ storage_interface* test_storage_constructor(storage_params const& params, file_p return new test_storage(params, pool); } +struct transfer_tag; +using transfer_flags_t = lt::flags::bitfield_flag; + +constexpr transfer_flags_t disk_full = 1_bit; +constexpr transfer_flags_t delete_files = 2_bit; +constexpr transfer_flags_t move_storage = 3_bit; + void test_transfer(int proxy_type, settings_pack const& sett - , bool test_disk_full = false + , transfer_flags_t flags = {} , storage_mode_t storage_mode = storage_mode_sparse) { char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"}; - std::printf("\n\n ==== TESTING %s proxy ==== disk-full: %s\n\n\n" - , test_name[proxy_type], test_disk_full ? "true": "false"); + std::printf("\n\n ==== TESTING %s proxy ==== disk-full: %s delete_files: %s move-storage: %s\n\n\n" + , test_name[proxy_type] + , (flags & disk_full) ? "true": "false" + , (flags & delete_files) ? "true": "false" + , (flags & move_storage) ? "true": "false" + ); // in case the previous run was terminated error_code ec; @@ -246,23 +257,28 @@ void test_transfer(int proxy_type, settings_pack const& sett // test using piece sizes smaller than 16kB std::tie(tor1, tor2, ignore) = setup_transfer(&ses1, &ses2, nullptr - , true, false, true, "_transfer", 8 * 1024, &t, false, test_disk_full?&addp:¶ms); + , true, false, true, "_transfer", 1024 * 1024, &t, false + , (flags & disk_full) ? &addp : ¶ms); int num_pieces = tor2.torrent_file()->num_pieces(); std::vector priorities(num_pieces, 1); - // also test to move the storage of the downloader and the uploader - // to make sure it can handle switching paths - bool test_move_storage = false; - int upload_mode_timer = 0; - wait_for_downloading(ses2, "ses2"); + lt::time_point const start_time = lt::clock_type::now(); - for (int i = 0; i < 200; ++i) + for (int i = 0; i < 20000; ++i) { - torrent_status st1 = tor1.status(); - torrent_status st2 = tor2.status(); + if (lt::clock_type::now() - start_time > seconds(10)) + { + std::cout << "timeout\n"; + break; + } + // sleep a bit + ses2.wait_for_alert(lt::milliseconds(100)); + + torrent_status const st1 = tor1.status(); + torrent_status const st2 = tor2.status(); print_alerts(ses1, "ses1", true, true, &on_alert); print_alerts(ses2, "ses2", true, true, &on_alert); @@ -272,24 +288,34 @@ void test_transfer(int proxy_type, settings_pack const& sett print_ses_rate(i / 10.f, &st1, &st2); } - if (!test_move_storage && st2.progress > 0.25f) + std::cout << "progress: " << st2.progress << "\n"; + if ((flags & move_storage) && st2.progress > 0.1f) { - test_move_storage = true; + flags &= ~move_storage; tor1.move_storage("tmp1_transfer_moved"); tor2.move_storage("tmp2_transfer_moved"); std::cout << "moving storage" << std::endl; } + if ((flags & delete_files) && st2.progress > 0.1f) + { + ses1.remove_torrent(tor1, session::delete_files); + std::cout << "deleting files" << std::endl; + + std::this_thread::sleep_for(lt::seconds(1)); + break; + } + // wait 10 loops before we restart the torrent. This lets // us catch all events that failed (and would put the torrent // back into upload mode) before we restart it. // TODO: factor out the disk-full test into its own unit test - if (test_disk_full + if (flags & disk_full && !(tor2.flags() & torrent_flags::upload_mode) && ++upload_mode_timer > 10) { - test_disk_full = false; + flags &= ~disk_full; ((test_storage*)tor2.get_storage_impl())->set_limit(16 * 1024 * 1024); // if we reset the upload mode too soon, there may be more disk @@ -323,23 +349,24 @@ void test_transfer(int proxy_type, settings_pack const& sett continue; } - if (!test_disk_full && st2.is_seeding) break; + if (!(flags & disk_full) && st2.is_seeding) break; TEST_CHECK(st1.state == torrent_status::seeding || st1.state == torrent_status::checking_files); TEST_CHECK(st2.state == torrent_status::downloading || st2.state == torrent_status::checking_resume_data - || (test_disk_full && st2.errc)); + || ((flags & disk_full) && st2.errc)); - if (!test_disk_full && peer_disconnects >= 2) break; + if (!(flags & disk_full) && peer_disconnects >= 2) break; // if nothing is being transferred after 2 seconds, we're failing the test -// if (!test_disk_full && st1.upload_payload_rate == 0 && i > 20) break; - - std::this_thread::sleep_for(lt::milliseconds(100)); +// if (!(flags & disk_full) && st1.upload_payload_rate == 0 && i > 20) break; } - TEST_CHECK(tor2.status().is_seeding); + if (!(flags & delete_files)) + { + TEST_CHECK(tor2.status().is_seeding); + } // this allows shutting down the sessions in parallel p1 = ses1.abort(); @@ -405,19 +432,35 @@ TORRENT_TEST(disk_full) { using namespace lt; // test with a (simulated) full disk - test_transfer(0, settings_pack(), true); + test_transfer(0, settings_pack(), disk_full); cleanup(); } */ +TORRENT_TEST(move_storage) +{ + using namespace lt; + test_transfer(0, settings_pack(), move_storage); + cleanup(); +} + +TORRENT_TEST(delete_files) +{ + using namespace lt; + settings_pack p = settings_pack(); + p.set_int(settings_pack::aio_threads, 10); + test_transfer(0, p, delete_files); + cleanup(); +} + TORRENT_TEST(allow_fast) { using namespace lt; // test allowed fast settings_pack p; p.set_int(settings_pack::allowed_fast_set_size, 2000); - test_transfer(0, p, false); + test_transfer(0, p); cleanup(); } @@ -429,7 +472,7 @@ TORRENT_TEST(coalesce_reads) settings_pack p; p.set_int(settings_pack::read_cache_line_size, 16); p.set_bool(settings_pack::coalesce_reads, true); - test_transfer(0, p, false); + test_transfer(0, p); cleanup(); } @@ -440,7 +483,7 @@ TORRENT_TEST(coalesce_writes) // test allowed fast settings_pack p; p.set_bool(settings_pack::coalesce_writes, true); - test_transfer(0, p, false); + test_transfer(0, p); cleanup(); } @@ -451,7 +494,7 @@ TORRENT_TEST(allocate) using namespace lt; // test storage_mode_allocate std::printf("full allocation mode\n"); - test_transfer(0, settings_pack(), false, storage_mode_allocate); + test_transfer(0, settings_pack(), {}, storage_mode_allocate); cleanup(); }