post disk job completions in batches to save CPU in the network thread

This commit is contained in:
Arvid Norberg 2011-10-19 05:46:49 +00:00
parent d3fb55e1da
commit b430152174
2 changed files with 52 additions and 20 deletions

View File

@ -318,8 +318,7 @@ namespace libtorrent
= boost::function<void(int, disk_io_job const&)>());
bool test_error(disk_io_job& j);
void post_callback(boost::function<void(int, disk_io_job const&)> const& handler
, disk_io_job const& j, int ret);
void post_callback(disk_io_job const& j, int ret);
// cache operations
cache_piece_index_t::iterator find_cached_piece(
@ -376,7 +375,7 @@ namespace libtorrent
// read cache
cache_t m_read_pieces;
void flip_stats();
void flip_stats(ptime now);
// total number of blocks in use by both the read
// and the write cache. This is not supposed to
@ -441,6 +440,10 @@ namespace libtorrent
// the session_impl object
file_pool& m_file_pool;
// when completion notifications are queued, they're stuck
// in this list
std::list<std::pair<disk_io_job, int> > m_queued_completions;
// thread for performing blocking disk io operations
thread m_disk_io_thread;
};

View File

@ -124,7 +124,7 @@ namespace libtorrent
return !m_exceeded_write_queue;
}
void disk_io_thread::flip_stats()
void disk_io_thread::flip_stats(ptime now)
{
// calling mean() will actually reset the accumulators
m_cache_stats.average_queue_time = m_queue_time.mean();
@ -133,7 +133,8 @@ namespace libtorrent
m_cache_stats.average_hash_time = m_hash_time.mean();
m_cache_stats.average_job_time = m_job_time.mean();
m_cache_stats.average_sort_time = m_sort_time.mean();
m_last_stats_flip = time_now();
m_last_stats_flip = now;
}
void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
@ -209,7 +210,7 @@ namespace libtorrent
TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
m_queue_buffer_size -= i->buffer_size;
}
post_callback(i->callback, *i, -3);
post_callback(*i, -3);
i = m_jobs.erase(i);
continue;
}
@ -663,8 +664,9 @@ namespace libtorrent
j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size);
int result = j.error ? -1 : j.buffer_size;
j.offset = i * m_block_size;
j.callback = p.blocks[i].callback;
buffers.push_back(p.blocks[i].buf);
post_callback(p.blocks[i].callback, j, result);
post_callback(j, result);
p.blocks[i].callback.clear();
p.blocks[i].buf = 0;
++ret;
@ -1338,13 +1340,27 @@ namespace libtorrent
return false;
}
void disk_io_thread::post_callback(
boost::function<void(int, disk_io_job const&)> const& handler
, disk_io_job const& j, int ret)
typedef std::list<std::pair<disk_io_job, int> > job_queue_t;
void completion_queue_handler(job_queue_t* completed_jobs)
{
if (!handler) return;
boost::shared_ptr<job_queue_t> holder(completed_jobs);
m_ios.post(boost::bind(handler, ret, j));
for (job_queue_t::iterator i = completed_jobs->begin()
, end(completed_jobs->end()); i != end; ++i)
{
TORRENT_TRY
{
i->first.callback(i->second, i->first);
}
TORRENT_CATCH(std::exception& e)
{}
}
}
void disk_io_thread::post_callback(disk_io_job const& j, int ret)
{
if (!j.callback) return;
m_queued_completions.push_back(std::make_pair(j, ret));
}
enum action_flags_t
@ -1446,8 +1462,19 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " idle" << std::endl;
#endif
mutex::scoped_lock jl(m_queue_mutex);
if (m_queued_completions.size() >= 30 || (m_jobs.empty() && !m_queued_completions.empty()))
{
job_queue_t* q = new job_queue_t;
q->swap(m_queued_completions);
m_ios.post(boost::bind(completion_queue_handler, q));
}
ptime job_start;
while (m_jobs.empty() && m_sorted_read_jobs.empty() && !m_abort)
{
// if there hasn't been an event in one second
@ -1457,7 +1484,8 @@ namespace libtorrent
m_signal.wait(jl);
m_signal.clear(jl);
if (time_now() > m_last_stats_flip + seconds(1)) flip_stats();
job_start = time_now();
if (job_start >= m_last_stats_flip + seconds(1)) flip_stats(job_start);
}
if (m_abort && m_jobs.empty())
@ -1494,7 +1522,6 @@ namespace libtorrent
disk_io_job j;
ptime now = time_now_hires();
m_queue_time.add_sample(total_microseconds(now - j.start_time));
ptime operation_start = now;
// make sure we don't starve out the read queue by just issuing
@ -1654,6 +1681,8 @@ namespace libtorrent
m_sorted_read_jobs.erase(to_erase);
}
m_queue_time.add_sample(total_microseconds(now - j.start_time));
// if there's a buffer in this job, it will be freed
// when this holder is destructed, unless it has been
// released.
@ -1744,7 +1773,7 @@ namespace libtorrent
TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
m_queue_buffer_size -= i->buffer_size;
}
post_callback(i->callback, *i, -3);
post_callback(*i, -3);
i = m_jobs.erase(i);
continue;
}
@ -1759,7 +1788,7 @@ namespace libtorrent
++i;
continue;
}
post_callback(i->second.callback, i->second, -3);
post_callback(i->second, -3);
if (elevator_job_pos == i) ++elevator_job_pos;
m_sorted_read_jobs.erase(i++);
}
@ -1806,7 +1835,7 @@ namespace libtorrent
TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
m_queue_buffer_size -= i->buffer_size;
}
post_callback(i->callback, *i, -3);
post_callback(*i, -3);
i = m_jobs.erase(i);
continue;
}
@ -1822,7 +1851,7 @@ namespace libtorrent
++i;
continue;
}
post_callback(i->second.callback, i->second, -3);
post_callback(i->second, -3);
if (elevator_job_pos == i) ++elevator_job_pos;
m_sorted_read_jobs.erase(i++);
}
@ -2328,7 +2357,7 @@ namespace libtorrent
TORRENT_TRY {
TORRENT_ASSERT(j.callback);
if (j.callback && ret == piece_manager::need_full_check)
post_callback(j.callback, j, ret);
post_callback(j, ret);
} TORRENT_CATCH(std::exception&) {}
if (ret != piece_manager::need_full_check) break;
}
@ -2399,7 +2428,7 @@ namespace libtorrent
&& j.buffer != 0)
rename_buffer(j.buffer, "posted send buffer");
#endif
post_callback(j.callback, j, ret);
post_callback(j, ret);
} TORRENT_CATCH(std::exception&) {
TORRENT_ASSERT(false);
}