slight performance fix in disk elevator algorithm

This commit is contained in:
Arvid Norberg 2010-04-24 21:53:45 +00:00
parent 54c451e513
commit fb28835452
3 changed files with 62 additions and 11 deletions

View File

@ -28,6 +28,7 @@
incoming connection incoming connection
* added more detailed instrumentation of the disk I/O thread * added more detailed instrumentation of the disk I/O thread
* slight performance fix in disk elevator algorithm
* fixed potential issue where a piece could be checked twice * fixed potential issue where a piece could be checked twice
* fixed build issue on windows related to GetCompressedSize() * fixed build issue on windows related to GetCompressedSize()
* fixed deadlock when starting torrents with certain invalid tracker URLs * fixed deadlock when starting torrents with certain invalid tracker URLs

View File

@ -1467,6 +1467,8 @@ namespace libtorrent
typedef std::multimap<size_type, disk_io_job> read_jobs_t; typedef std::multimap<size_type, disk_io_job> read_jobs_t;
read_jobs_t sorted_read_jobs; read_jobs_t sorted_read_jobs;
read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin(); read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin();
size_type last_elevator_pos = 0;
bool need_update_elevator_pos = false;
for (;;) for (;;)
{ {
@ -1565,11 +1567,8 @@ namespace libtorrent
m_log << log_time() << " sorting_job" << std::endl; m_log << log_time() << " sorting_job" << std::endl;
#endif #endif
size_type phys_off = j.storage->physical_offset(j.piece, j.offset); size_type phys_off = j.storage->physical_offset(j.piece, j.offset);
bool update_pos = sorted_read_jobs.empty(); need_update_elevator_pos = need_update_elevator_pos || sorted_read_jobs.empty();
sorted_read_jobs.insert(std::pair<size_type, disk_io_job>(phys_off, j)); sorted_read_jobs.insert(std::pair<size_type, disk_io_job>(phys_off, j));
// if sorted_read_jobs used to be empty,
// we need to update the elevator position
if (update_pos) elevator_job_pos = sorted_read_jobs.begin();
continue; continue;
} }
} }
@ -1582,6 +1581,14 @@ namespace libtorrent
TORRENT_ASSERT(!sorted_read_jobs.empty()); TORRENT_ASSERT(!sorted_read_jobs.empty());
// if sorted_read_jobs used to be empty,
// we need to update the elevator position
if (need_update_elevator_pos)
{
elevator_job_pos = sorted_read_jobs.lower_bound(last_elevator_pos);
need_update_elevator_pos = false;
}
// if we've reached the end, change the elevator direction // if we've reached the end, change the elevator direction
if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1) if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1)
{ {
@ -1605,6 +1612,7 @@ namespace libtorrent
else --elevator_job_pos; else --elevator_job_pos;
TORRENT_ASSERT(to_erase != elevator_job_pos); TORRENT_ASSERT(to_erase != elevator_job_pos);
last_elevator_pos = to_erase->first;
sorted_read_jobs.erase(to_erase); sorted_read_jobs.erase(to_erase);
} }

View File

@ -125,11 +125,12 @@ int bufs_size(file::iovec_t const* bufs, int num_bufs);
// simulate a very slow first read // simulate a very slow first read
struct test_storage : storage_interface struct test_storage : storage_interface
{ {
test_storage() {} test_storage(): m_started(false), m_ready(false) {}
virtual bool initialize(bool allocate_files) { return true; } virtual bool initialize(bool allocate_files) { return true; }
virtual bool has_any_file() { return true; } virtual bool has_any_file() { return true; }
int write( int write(
const char* buf const char* buf
, int slot , int slot
@ -147,7 +148,18 @@ struct test_storage : storage_interface
{ {
if (slot == 0 || slot == 5999) if (slot == 0 || slot == 5999)
{ {
sleep(2000); mutex::scoped_lock l(m_mutex);
std::cerr << "--- starting job " << slot << " waiting for main thread ---\n" << std::endl;
m_ready = true;
m_ready_condition.signal(l);
while (!m_started)
m_condition.wait(l);
m_condition.clear(l);
m_ready_condition.clear(l);
m_ready = false;
m_started = false;
std::cerr << "--- starting ---\n" << std::endl; std::cerr << "--- starting ---\n" << std::endl;
} }
return size; return size;
@ -185,6 +197,28 @@ struct test_storage : storage_interface
virtual bool delete_files() { return false; } virtual bool delete_files() { return false; }
virtual ~test_storage() {} virtual ~test_storage() {}
void wait_for_ready()
{
mutex::scoped_lock l(m_mutex);
while (!m_ready)
m_ready_condition.wait(l);
}
void start()
{
mutex::scoped_lock l(m_mutex);
m_started = true;
m_condition.signal(l);
}
private:
condition m_ready_condition;
condition m_condition;
mutex m_mutex;
bool m_started;
bool m_ready;
}; };
storage_interface* create_test_storage(file_storage const& fs storage_interface* create_test_storage(file_storage const& fs
@ -260,11 +294,12 @@ void run_elevator_test()
dio.add_job(j); dio.add_job(j);
// test the elevator going up // test the elevator going up
turns = 0;
direction = 1; direction = 1;
last_job = 0; last_job = 0;
add_job(dio, 0, pm); // trigger delay in storage add_job(dio, 0, pm); // trigger delay in storage
// make sure the job is processed // make sure the job is processed
sleep(200); ((test_storage*)pm->get_storage_impl())->wait_for_ready();
boost::uint32_t p = 1234513; boost::uint32_t p = 1234513;
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
@ -275,22 +310,25 @@ void run_elevator_test()
add_job(dio, job, pm); add_job(dio, job, pm);
} }
((test_storage*)pm->get_storage_impl())->start();
for (int i = 0; i < 101; ++i) for (int i = 0; i < 101; ++i)
{ {
ios.run_one(ec); ios.run_one(ec);
if (ec) std::cerr << "run_one: " << ec.message() << std::endl; if (ec) std::cerr << "run_one: " << ec.message() << std::endl;
} }
TEST_CHECK(turns < 2); TEST_CHECK(turns == 0);
TEST_EQUAL(job_counter, 0); TEST_EQUAL(job_counter, 0);
std::cerr << "number of elevator turns: " << turns << std::endl; std::cerr << "number of elevator turns: " << turns << std::endl;
// test the elevator going down // test the elevator going down
turns = 0;
direction = -1; direction = -1;
last_job = 6000; last_job = 6000;
add_job(dio, 5999, pm); // trigger delay in storage add_job(dio, 5999, pm); // trigger delay in storage
// make sure the job is processed // make sure the job is processed
sleep(200); ((test_storage*)pm->get_storage_impl())->wait_for_ready();
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
{ {
@ -300,13 +338,15 @@ void run_elevator_test()
add_job(dio, job, pm); add_job(dio, job, pm);
} }
((test_storage*)pm->get_storage_impl())->start();
for (int i = 0; i < 101; ++i) for (int i = 0; i < 101; ++i)
{ {
ios.run_one(ec); ios.run_one(ec);
if (ec) std::cerr << "run_one: " << ec.message() << std::endl; if (ec) std::cerr << "run_one: " << ec.message() << std::endl;
} }
TEST_CHECK(turns < 2); TEST_CHECK(turns == 0);
TEST_EQUAL(job_counter, 0); TEST_EQUAL(job_counter, 0);
std::cerr << "number of elevator turns: " << turns << std::endl; std::cerr << "number of elevator turns: " << turns << std::endl;
@ -320,7 +360,7 @@ void run_elevator_test()
direction = 0; direction = 0;
add_job(dio, 0, pm); // trigger delay in storage add_job(dio, 0, pm); // trigger delay in storage
// make sure the job is processed // make sure the job is processed
sleep(200); ((test_storage*)pm->get_storage_impl())->wait_for_ready();
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
{ {
@ -330,6 +370,8 @@ void run_elevator_test()
add_job(dio, job, pm); add_job(dio, job, pm);
} }
((test_storage*)pm->get_storage_impl())->start();
for (int i = 0; i < 101; ++i) for (int i = 0; i < 101; ++i)
{ {
ios.run_one(ec); ios.run_one(ec);