From a1299c3a101f909c893a21666cdfa7fb347c4275 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 12 Jan 2010 01:56:48 +0000 Subject: [PATCH] optimize disk I/O elevator algorithm to spend less time picking job --- examples/Jamfile | 1 + examples/client_test.cpp | 33 +-- include/libtorrent/disk_io_thread.hpp | 10 +- parse_disk_log.py | 14 +- src/disk_io_thread.cpp | 277 +++++++++++++------------- src/time.cpp | 2 +- 6 files changed, 174 insertions(+), 163 deletions(-) diff --git a/examples/Jamfile b/examples/Jamfile index 3314bb802..c2a6ae6e2 100644 --- a/examples/Jamfile +++ b/examples/Jamfile @@ -22,4 +22,5 @@ exe simple_client : simple_client.cpp ; exe dump_torrent : dump_torrent.cpp ; exe make_torrent : make_torrent.cpp ; exe enum_if : enum_if.cpp ; +exe connection_tester : connection_tester.cpp ; diff --git a/examples/client_test.cpp b/examples/client_test.cpp index b97d9425f..26960391a 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -499,6 +499,17 @@ void print_peer_info(std::string& out, std::vector const& typedef std::multimap handles_t; +int listen_port = 6881; +float preferred_ratio = 0.f; +std::string allocation_mode = "sparse"; +std::string save_path("."); +int torrent_upload_limit = 0; +int torrent_download_limit = 0; +std::string monitor_dir; +std::string bind_to_interface = ""; +int poll_interval = 5; +int max_connections_per_torrent = 50; + using boost::bind; // monitored_dir is true if this torrent is added because @@ -548,7 +559,7 @@ void add_torrent(libtorrent::session& ses handles.insert(std::pair( monitored_dir?std::string(torrent):std::string(), h)); - h.set_max_connections(50); + h.set_max_connections(max_connections_per_torrent); h.set_max_uploads(-1); h.set_ratio(preferred_ratio); h.set_upload_limit(torrent_upload_limit); @@ -675,7 +686,7 @@ void handle_alert(libtorrent::session& ses, libtorrent::alert* a if (torrent_finished_alert* p = alert_cast(a)) { - p->handle.set_max_connections(30); + 
p->handle.set_max_connections(max_connections_per_torrent / 2); // write resume data for the finished torrent // the alert handler for save_resume_data_alert @@ -736,6 +747,7 @@ int main(int argc, char* argv[]) " -t sets the scan interval of the monitor dir\n" " -x loads an emule IP-filter file\n" " -c sets the max number of connections\n" + " -T sets the max number of connections per torrent\n" #if TORRENT_USE_I2P " -i the hostname to an I2P SAM bridge to use\n" #endif @@ -743,6 +755,7 @@ int main(int argc, char* argv[]) " -F sets the UI refresh rate. This is the number of\n" " seconds between screen refreshes.\n" " -n announce to trackers in all tiers\n" + " -h allow multiple connections from the same IP\n" "\n\n" "TORRENT is a path to a .torrent file\n" "MAGNETURL is a magnet: url\n") @@ -812,16 +825,6 @@ int main(int argc, char* argv[]) ses.load_country_db("GeoIP.dat"); #endif - int listen_port = 6881; - float preferred_ratio = 0.f; - std::string allocation_mode = "sparse"; - std::string save_path("."); - int torrent_upload_limit = 0; - int torrent_download_limit = 0; - std::string monitor_dir; - std::string bind_to_interface = ""; - int poll_interval = 5; - // load the torrents given on the commandline for (int i = 1; i < argc; ++i) @@ -848,7 +851,7 @@ int main(int argc, char* argv[]) handles.insert(std::pair(std::string(), h)); - h.set_max_connections(50); + h.set_max_connections(max_connections_per_torrent); h.set_max_uploads(-1); h.set_ratio(preferred_ratio); h.set_upload_limit(torrent_upload_limit); @@ -883,7 +886,7 @@ int main(int argc, char* argv[]) handles.insert(std::pair(std::string(), h)); - h.set_max_connections(50); + h.set_max_connections(max_connections_per_torrent); h.set_max_uploads(-1); h.set_ratio(preferred_ratio); h.set_upload_limit(torrent_upload_limit); @@ -905,6 +908,7 @@ int main(int argc, char* argv[]) { case 'f': g_log_file = fopen(arg, "w+"); break; case 'o': ses.set_max_half_open_connections(atoi(arg)); break; + case 'h': 
settings.allow_multiple_connections_per_ip = true; --i; break; case 'p': listen_port = atoi(arg); break; case 'r': preferred_ratio = atoi(arg); @@ -954,6 +958,7 @@ int main(int argc, char* argv[]) } break; case 'c': ses.set_max_connections(atoi(arg)); break; + case 'T': max_connections_per_torrent = atoi(arg); break; #if TORRENT_USE_I2P case 'i': { diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index c4fa0926e..6115710b9 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -72,7 +72,6 @@ namespace libtorrent , buffer_size(0) , piece(0) , offset(0) - , phys_offset(-1) , priority(0) {} @@ -103,7 +102,6 @@ namespace libtorrent boost::intrusive_ptr storage; // arguments used for read and write int piece, offset; - size_type phys_offset; // used for move_storage and rename_file. On errors, this is set // to the error message std::string str; @@ -127,12 +125,6 @@ namespace libtorrent boost::function callback; }; - // returns true if the disk job requires ordering - // some jobs may not be processed until all jobs - // ahead of it in the queue have been processed - // jobs that require this are fence operation - bool is_fence_operation(disk_io_job const& j); - // returns true if the fundamental operation // of the given disk job is a read operation bool is_read_operation(disk_io_job const& j); @@ -312,6 +304,8 @@ namespace libtorrent boost::shared_array blocks; }; + // TODO: turn this into a multi-index list + // sorted by piece and last use time typedef std::list cache_t; private: diff --git a/parse_disk_log.py b/parse_disk_log.py index cad9096e0..b3d36432c 100755 --- a/parse_disk_log.py +++ b/parse_disk_log.py @@ -11,7 +11,11 @@ if len(sys.argv) < 2: print "usage: parse_disk_log.py logfile [seconds]" sys.exit(1) -keys = ['write', 'read', 'hash', 'move', 'release', 'idle', 'delete', 'check_fastresume', 'check_files', 'clear-cache', 'abort_thread', 'abort_torrent', 'save_resume_data', 
'rename_file', 'flushing', 'update_settings'] +keys = ['write', 'read', 'hash', 'move', 'release', 'idle', \ + 'delete', 'check_fastresume', 'check_files', 'clear-cache', \ + 'abort_thread', 'abort_torrent', 'save_resume_data', 'rename_file', \ + 'flushing', 'update_settings', 'finalize_file', 'sorting_job', \ + 'check_cache_hit'] throughput_keys = ['write', 'read'] # logfile format: @@ -21,9 +25,9 @@ throughput_keys = ['write', 'read'] # 34722 write if len(sys.argv) > 2: - quantization = long(sys.argv[2]) * 1000 + quantization = long(sys.argv[2]) * 1000000 else: - quantization = 5000 + quantization = 500000 out = open('disk_io.dat', 'wb') out2 = open('disk_throughput.dat', 'wb') @@ -86,7 +90,7 @@ out = open('disk_io.gnuplot', 'wb') print >>out, "set term png size 1200,700" print >>out, 'set output "disk_throughput.png"' -print >>out, 'set title "disk throughput per %s second(s)"' % (quantization / 1000) +print >>out, 'set title "disk throughput per %f second(s)"' % (quantization / 1000000.0) print >>out, 'set ylabel "throughput (kB/s)"' print >>out, 'plot', i = 0 @@ -98,7 +102,7 @@ print >>out, 'x=0' print >>out, 'set output "disk_io.png"' print >>out, 'set ylabel "utilization (%)"' print >>out, 'set xrange [0:*]' -print >>out, 'set title "disk io utilization per %s second(s)"' % (quantization / 1000) +print >>out, 'set title "disk io utilization per %f second(s)"' % (quantization / 1000000.0) print >>out, "set key box" print >>out, "set style data histogram" print >>out, "set style histogram rowstacked" diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index e8d01a26a..927ce4218 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -52,6 +52,10 @@ POSSIBILITY OF SUCH DAMAGE. 
namespace libtorrent { + bool should_cancel_on_abort(disk_io_job const& j); + bool is_read_operation(disk_io_job const& j); + bool operation_has_buffer(disk_io_job const& j); + disk_buffer_pool::disk_buffer_pool(int block_size) : m_block_size(block_size) , m_in_use(0) @@ -371,15 +375,9 @@ namespace libtorrent ++i; continue; } - if (i->action == disk_io_job::read) + if (should_cancel_on_abort(*i)) { - post_callback(i->callback, *i, -1); - m_jobs.erase(i++); - continue; - } - if (i->action == disk_io_job::check_files) - { - post_callback(i->callback, *i, piece_manager::disk_check_aborted); + post_callback(i->callback, *i, -3); m_jobs.erase(i++); continue; } @@ -1320,34 +1318,34 @@ namespace libtorrent enum action_flags_t { read_operation = 1 - , fence_operation = 2 - , buffer_operation = 4 + , buffer_operation = 2 + , cancel_on_abort = 4 }; static const boost::uint8_t action_flags[] = { - read_operation + buffer_operation // read + read_operation + buffer_operation + cancel_on_abort // read , buffer_operation // write , 0 // hash - , fence_operation // move_storage - , fence_operation // release_files - , fence_operation // delete_files - , fence_operation // check_fastresume - , read_operation // check_files - , fence_operation // save_resume_data - , fence_operation // rename_file - , fence_operation // abort_thread - , fence_operation // clear_read_cache - , fence_operation // abort_torrent - , 0 // update_settings - , read_operation // read_and_hash + , 0 // move_storage + , 0 // release_files + , 0 // delete_files + , 0 // check_fastresume + , read_operation + cancel_on_abort // check_files + , 0 // save_resume_data + , 0 // rename_file + , 0 // abort_thread + , 0 // clear_read_cache + , 0 // abort_torrent + , cancel_on_abort // update_settings + , read_operation + cancel_on_abort // read_and_hash , 0 // finalize_file }; - bool is_fence_operation(disk_io_job const& j) + bool should_cancel_on_abort(disk_io_job const& j) { TORRENT_ASSERT(j.action >= 0 && j.action 
< sizeof(action_flags)); - return action_flags[j.action] & fence_operation; + return action_flags[j.action] & cancel_on_abort; } bool is_read_operation(disk_io_job const& j) @@ -1364,9 +1362,13 @@ namespace libtorrent void disk_io_thread::thread_fun() { - size_type elevator_position = 0; + // 1 = forward in list, -1 = backwards in list int elevator_direction = 1; + typedef std::multimap read_jobs_t; + read_jobs_t sorted_read_jobs; + read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin(); + for (;;) { #ifdef TORRENT_DISK_STATS @@ -1374,7 +1376,7 @@ namespace libtorrent #endif mutex::scoped_lock jl(m_queue_mutex); - while (m_jobs.empty() && !m_abort) + while (m_jobs.empty() && sorted_read_jobs.empty() && !m_abort) { // if there hasn't been an event in one second // see if we should flush the cache @@ -1404,111 +1406,96 @@ namespace libtorrent return; } - std::list::iterator selected_job = m_jobs.begin(); + disk_io_job j; - if (m_settings.allow_reordered_disk_operations - && is_read_operation(*selected_job)) + if (!m_jobs.empty()) { - // Before reading the current block, read any - // blocks between the read head and the queued - // block, elevator style + // we have a job in the job queue. If it's + // a read operation and we are allowed to + // reorder jobs, sort it into the read job + // list and continue, otherwise just pop it + // and use it later + j = m_jobs.front(); + m_jobs.pop_front(); + jl.unlock(); - std::list::iterator best_job, i; - size_type score, best_score = (size_type) -1; + bool defer = false; - for (;;) + if (is_read_operation(j)) { - for (i = m_jobs.begin(); i != m_jobs.end(); ++i) + defer = true; + + // at this point the operation we're looking + // at is a read operation. 
If this read operation + // can be fully satisfied by the read cache, handle + // it immediately + if (m_settings.use_read_cache) { - // ignore fence_operations - if (is_fence_operation(*i)) - continue; - - // always prioritize all disk-I/O jobs - // that are not read operations - if (!is_read_operation(*i)) - { - best_job = i; - best_score = 0; - break; - } - - // at this point the operation we're looking - // at is a read operation. If this read operation - // can be fully satisfied by the read cache, handle - // it immediately - if (m_settings.use_read_cache) - { - // unfortunately we need to lock the cache - // if the cache querying function would be - // made asyncronous, this would not be - // necessary anymore - mutex::scoped_lock l(m_piece_mutex); - cache_t::iterator p - = find_cached_piece(m_read_pieces, *i, l); - - // if it's a cache hit, process the job immediately - if (p != m_read_pieces.end() && is_cache_hit(p, *i, l)) - { - best_job = i; - best_score = 0; - break; - } - } - - // we only need to query for physical offset - // for read operations, since those are - // the only ones we re-order - if (i->phys_offset == -1) - i->phys_offset = i->storage->physical_offset(i->piece, i->offset); - - if (elevator_direction > 0) - { - score = i->phys_offset - elevator_position; - if (i->phys_offset >= elevator_position - && (score < best_score - || best_score == (size_type)-1)) - { - best_score = score; - best_job = i; - } - } - else - { - score = elevator_position - i->phys_offset; - if (i->phys_offset <= elevator_position - && (score < best_score - || best_score == (size_type)-1)) - { - best_score = score; - best_job = i; - } - } +#ifdef TORRENT_DISK_STATS + m_log << log_time() << " check_cache_hit" << std::endl; +#endif + // unfortunately we need to lock the cache + // if the cache querying function would be + // made asyncronous, this would not be + // necessary anymore + mutex::scoped_lock l(m_piece_mutex); + cache_t::iterator p + = 
find_cached_piece(m_read_pieces, j, l); + + // if it's a cache hit, process the job immediately + if (p != m_read_pieces.end() && is_cache_hit(p, j, l)) + defer = false; } - - if (best_score != (size_type) -1) - break; - - elevator_direction = -elevator_direction; } - selected_job = best_job; - // only update the elevator position for read jobs - if (is_read_operation(*selected_job)) - elevator_position = selected_job->phys_offset; + if (m_settings.allow_reordered_disk_operations && defer) + { +#ifdef TORRENT_DISK_STATS + m_log << log_time() << " sorting_job" << std::endl; +#endif + size_type phys_off = j.storage->physical_offset(j.piece, j.offset); + sorted_read_jobs.insert(std::pair(phys_off, j)); + continue; + } + } + else + { + // the job queue is empty, pick the next read job + // from the sorted job list. So we don't need the + // job queue lock anymore + jl.unlock(); + + TORRENT_ASSERT(!sorted_read_jobs.empty()); + + // if we've reached the end, change the elevator direction + if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1) + { + elevator_direction = -1; + --elevator_job_pos; + } + + j = elevator_job_pos->second; + read_jobs_t::iterator to_erase = elevator_job_pos; + + // if we've reached the begining of the sorted list, + // change the elvator direction + if (elevator_job_pos == sorted_read_jobs.begin() && elevator_direction == -1) + elevator_direction = 1; + + // move the elevator before erasing the job we're processing + // to keep the iterator valid + if (elevator_direction > 0) ++elevator_job_pos; + else --elevator_job_pos; + + sorted_read_jobs.erase(to_erase); } // if there's a buffer in this job, it will be freed // when this holder is destructed, unless it has been // released. disk_buffer_holder holder(*this - , operation_has_buffer(*selected_job) ? selected_job->buffer : 0); + , operation_has_buffer(j) ? 
j.buffer : 0); - boost::function handler; - handler.swap(selected_job->callback); - - disk_io_job j = *selected_job; - m_jobs.erase(selected_job); if (j.action == disk_io_job::write) { TORRENT_ASSERT(m_queue_buffer_size >= j.buffer_size); @@ -1526,7 +1513,6 @@ namespace libtorrent // can trigger all the connections waiting for this event post = true; } - jl.unlock(); if (post) m_ios.post(m_queue_callback); @@ -1576,14 +1562,27 @@ namespace libtorrent ++i; continue; } - if (i->action == disk_io_job::check_files) + if (should_cancel_on_abort(*i)) { - post_callback(i->callback, *i, piece_manager::disk_check_aborted); + post_callback(i->callback, *i, -3); m_jobs.erase(i++); continue; } ++i; } + jl.unlock(); + // now clear all the read jobs + for (read_jobs_t::iterator i = sorted_read_jobs.begin(); + i != sorted_read_jobs.end();) + { + if (i->second.storage != j.storage) + { + ++i; + continue; + } + post_callback(i->second.callback, i->second, -3); + sorted_read_jobs.erase(i++); + } break; } case disk_io_job::abort_thread: @@ -1591,25 +1590,33 @@ namespace libtorrent #ifdef TORRENT_DISK_STATS m_log << log_time() << " abort_thread " << std::endl; #endif + // clear all read jobs mutex::scoped_lock jl(m_queue_mutex); for (std::list::iterator i = m_jobs.begin(); i != m_jobs.end();) { - if (i->action == disk_io_job::read) + if (should_cancel_on_abort(*i)) { - post_callback(i->callback, *i, -1); - m_jobs.erase(i++); - continue; - } - if (i->action == disk_io_job::check_files) - { - post_callback(i->callback, *i, piece_manager::disk_check_aborted); + post_callback(i->callback, *i, -3); m_jobs.erase(i++); continue; } ++i; } + jl.unlock(); + + for (read_jobs_t::iterator i = sorted_read_jobs.begin(); + i != sorted_read_jobs.end();) + { + if (i->second.storage != j.storage) + { + ++i; + continue; + } + post_callback(i->second.callback, i->second, -3); + sorted_read_jobs.erase(i++); + } m_abort = true; break; @@ -1777,7 +1784,7 @@ namespace libtorrent --p->num_blocks; } 
p->blocks[block].buf = j.buffer; - p->blocks[block].callback.swap(handler); + p->blocks[block].callback.swap(j.callback); #ifdef TORRENT_DISK_STATS rename_buffer(j.buffer, "write cache"); #endif @@ -1790,7 +1797,7 @@ namespace libtorrent } else { - if (cache_block(j, handler, l) < 0) + if (cache_block(j, j.callback, l) < 0) { l.unlock(); file::iovec_t iov = {j.buffer, j.buffer_size}; @@ -1998,9 +2005,9 @@ namespace libtorrent #ifndef BOOST_NO_EXCEPTIONS try { #endif - TORRENT_ASSERT(handler); - if (handler && ret == piece_manager::need_full_check) - post_callback(handler, j, ret); + TORRENT_ASSERT(j.callback); + if (j.callback && ret == piece_manager::need_full_check) + post_callback(j.callback, j, ret); #ifndef BOOST_NO_EXCEPTIONS } catch (std::exception&) {} #endif @@ -2016,7 +2023,7 @@ namespace libtorrent // if the check is not done, add it at the end of the job queue if (ret == piece_manager::need_full_check) { - add_job(j, handler); + add_job(j, j.callback); continue; } break; @@ -2057,7 +2064,7 @@ namespace libtorrent } #endif -// if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl; +// if (!j.callback) std::cerr << "DISK THREAD: no callback specified" << std::endl; // else std::cerr << "DISK THREAD: invoking callback" << std::endl; #ifndef BOOST_NO_EXCEPTIONS try { @@ -2069,7 +2076,7 @@ namespace libtorrent && j.buffer != 0) rename_buffer(j.buffer, "posted send buffer"); #endif - post_callback(handler, j, ret); + post_callback(j.callback, j, ret); #ifndef BOOST_NO_EXCEPTIONS } catch (std::exception&) { diff --git a/src/time.cpp b/src/time.cpp index e49f897db..bbac4a6f8 100644 --- a/src/time.cpp +++ b/src/time.cpp @@ -82,7 +82,7 @@ namespace libtorrent { static const ptime start = time_now_hires(); char ret[200]; - snprintf(ret, sizeof(ret), "%d", total_milliseconds(time_now_hires() - start)); + snprintf(ret, sizeof(ret), "%lld", (long long)total_microseconds(time_now_hires() - start)); return ret; } }