adds invariant check to make sure pieces that are complete are also checked. adds a debug function in the disk_io_thread. Fixes a problem with a failing disk write that would not clear the block for re-download. Adds some extra session logging

This commit is contained in:
Arvid Norberg 2007-10-18 00:32:16 +00:00
parent d0ef85e802
commit b032c26bf0
4 changed files with 125 additions and 13 deletions

View File

@ -106,6 +106,10 @@ namespace libtorrent
, boost::function<void(int, disk_io_job const&)> const& f , boost::function<void(int, disk_io_job const&)> const& f
= boost::function<void(int, disk_io_job const&)>()); = boost::function<void(int, disk_io_job const&)>());
#ifndef NDEBUG
disk_io_job find_job(boost::intrusive_ptr<piece_manager> s
, int action, int piece) const;
#endif
// keep track of the number of bytes in the job queue // keep track of the number of bytes in the job queue
// at any given time. i.e. the sum of all buffer_size. // at any given time. i.e. the sum of all buffer_size.
// this is used to slow down the download global download // this is used to slow down the download global download
@ -120,7 +124,7 @@ namespace libtorrent
private: private:
boost::mutex m_mutex; mutable boost::mutex m_mutex;
boost::condition m_signal; boost::condition m_signal;
bool m_abort; bool m_abort;
std::deque<disk_io_job> m_jobs; std::deque<disk_io_job> m_jobs;
@ -131,6 +135,7 @@ namespace libtorrent
#ifndef NDEBUG #ifndef NDEBUG
int m_block_size; int m_block_size;
disk_io_job m_current;
#endif #endif
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS

View File

@ -70,6 +70,31 @@ namespace libtorrent
m_disk_io_thread.join(); m_disk_io_thread.join();
} }
#ifndef NDEBUG
disk_io_job disk_io_thread::find_job(boost::intrusive_ptr<piece_manager> s
, int action, int piece) const
{
boost::mutex::scoped_lock l(m_mutex);
for (std::deque<disk_io_job>::const_iterator i = m_jobs.begin();
i != m_jobs.end(); ++i)
{
if (i->storage != s)
continue;
if ((i->action == action || action == -1) && i->piece == piece)
return *i;
}
if ((m_current.action == action || action == -1)
&& m_current.piece == piece)
return m_current;
disk_io_job ret;
ret.action = (disk_io_job::action_t)-1;
ret.piece = -1;
return ret;
}
#endif
// aborts read operations // aborts read operations
void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s) void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
{ {
@ -205,12 +230,19 @@ namespace libtorrent
m_log << log_time() << " idle" << std::endl; m_log << log_time() << " idle" << std::endl;
#endif #endif
boost::mutex::scoped_lock l(m_mutex); boost::mutex::scoped_lock l(m_mutex);
#ifndef NDEBUG
m_current.action = (disk_io_job::action_t)-1;
m_current.piece = -1;
#endif
while (m_jobs.empty() && !m_abort) while (m_jobs.empty() && !m_abort)
m_signal.wait(l); m_signal.wait(l);
if (m_abort && m_jobs.empty()) return; if (m_abort && m_jobs.empty()) return;
boost::function<void(int, disk_io_job const&)> handler; boost::function<void(int, disk_io_job const&)> handler;
handler.swap(m_jobs.front().callback); handler.swap(m_jobs.front().callback);
#ifndef NDEBUG
m_current = m_jobs.front();
#endif
disk_io_job j = m_jobs.front(); disk_io_job j = m_jobs.front();
m_jobs.pop_front(); m_jobs.pop_front();
m_queue_buffer_size -= j.buffer_size; m_queue_buffer_size -= j.buffer_size;

View File

@ -1373,18 +1373,25 @@ namespace libtorrent
{ {
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
INVARIANT_CHECK;
m_outstanding_writing_bytes -= p.length; m_outstanding_writing_bytes -= p.length;
TORRENT_ASSERT(m_outstanding_writing_bytes >= 0); TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
#ifdef TORRENT_VERBOSE_LOGGING #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << " *** on_disk_write_complete() " << p.length << "\n"; (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
<< p.piece << " o: " << p.start << " ]\n";
#endif #endif
// in case the outstanding bytes just dropped down // in case the outstanding bytes just dropped down
// to allow to receive more data // to allow to receive more data
setup_receive(); setup_receive();
piece_block block_finished(p.piece, p.start / t->block_size());
if (ret == -1 || !t) if (ret == -1 || !t)
{ {
if (t->has_picker()) t->picker().abort_download(block_finished);
if (!t) if (!t)
{ {
m_ses.connection_failed(m_socket, remote(), j.str.c_str()); m_ses.connection_failed(m_socket, remote(), j.str.c_str());
@ -1406,7 +1413,6 @@ namespace libtorrent
TORRENT_ASSERT(p.piece == j.piece); TORRENT_ASSERT(p.piece == j.piece);
TORRENT_ASSERT(p.start == j.offset); TORRENT_ASSERT(p.start == j.offset);
piece_block block_finished(p.piece, p.start / t->block_size());
picker.mark_as_finished(block_finished, peer_info_struct()); picker.mark_as_finished(block_finished, peer_info_struct());
if (t->alerts().should_post(alert::debug)) if (t->alerts().should_post(alert::debug))
{ {
@ -1414,13 +1420,6 @@ namespace libtorrent
block_finished.block_index, block_finished.piece_index, "block finished")); block_finished.block_index, block_finished.piece_index, "block finished"));
} }
if (!t->is_seed() && !m_torrent.expired())
{
// this is a free function defined in policy.cpp
request_a_block(*t, *this);
send_block_requests();
}
#ifndef NDEBUG #ifndef NDEBUG
try try
{ {
@ -1444,6 +1443,14 @@ namespace libtorrent
TORRENT_ASSERT(false); TORRENT_ASSERT(false);
} }
#endif #endif
if (!t->is_seed() && !m_torrent.expired())
{
// this is a free function defined in policy.cpp
request_a_block(*t, *this);
send_block_requests();
}
} }
// ----------------------------- // -----------------------------
@ -2891,6 +2898,35 @@ namespace libtorrent
TORRENT_ASSERT(false); TORRENT_ASSERT(false);
} }
if (t->has_picker())
{
// make sure that pieces that have completed the download
// of all their blocks are in the disk io thread's queue
// to be checked.
const std::vector<piece_picker::downloading_piece>& dl_queue
= t->picker().get_download_queue();
for (std::vector<piece_picker::downloading_piece>::const_iterator i =
dl_queue.begin(); i != dl_queue.end(); ++i)
{
const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
bool complete = true;
for (int j = 0; j < blocks_per_piece; ++j)
{
if (i->info[j].state == piece_picker::block_info::state_finished)
continue;
complete = false;
break;
}
if (complete)
{
disk_io_job ret = m_ses.m_disk_thread.find_job(
&t->filesystem(), -1, i->index);
TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
TORRENT_ASSERT(ret.piece == i->index);
}
}
}
// expensive when using checked iterators // expensive when using checked iterators
/* /*
if (t->valid_metadata()) if (t->valid_metadata())

View File

@ -819,6 +819,11 @@ namespace libtorrent
{ {
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_ses.m_logger) << time_now_string() << " *** PIECE_FINISHED [ p: "
<< index << " chk: " << (passed_hash_check?"passed":"failed") << " ]\n";
#endif
bool was_seed = is_seed(); bool was_seed = is_seed();
bool was_finished = m_picker->num_filtered() + num_pieces() bool was_finished = m_picker->num_filtered() + num_pieces()
== torrent_file().num_pieces(); == torrent_file().num_pieces();
@ -2112,7 +2117,8 @@ namespace libtorrent
expire_bandwidth(channel, blk - amount); expire_bandwidth(channel, blk - amount);
} }
// called when torrent is finished (all interested pieces downloaded) // called when torrent is finished (all interesting
// pieces have been downloaded)
void torrent::finished() void torrent::finished()
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -2476,6 +2482,36 @@ namespace libtorrent
TORRENT_ASSERT(total_done == 0); TORRENT_ASSERT(total_done == 0);
} }
if (m_picker)
{
// make sure that pieces that have completed the download
// of all their blocks are in the disk io thread's queue
// to be checked.
const std::vector<piece_picker::downloading_piece>& dl_queue
= m_picker->get_download_queue();
for (std::vector<piece_picker::downloading_piece>::const_iterator i =
dl_queue.begin(); i != dl_queue.end(); ++i)
{
const int blocks_per_piece = m_picker->blocks_in_piece(i->index);
bool complete = true;
for (int j = 0; j < blocks_per_piece; ++j)
{
if (i->info[j].state == piece_picker::block_info::state_finished)
continue;
complete = false;
break;
}
if (complete)
{
disk_io_job ret = m_ses.m_disk_thread.find_job(
m_owning_storage, -1, i->index);
TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
TORRENT_ASSERT(ret.piece == i->index);
}
}
}
// This check is very expensive. // This check is very expensive.
TORRENT_ASSERT(m_num_pieces TORRENT_ASSERT(m_num_pieces
== std::count(m_have_pieces.begin(), m_have_pieces.end(), true)); == std::count(m_have_pieces.begin(), m_have_pieces.end(), true));
@ -2733,7 +2769,7 @@ namespace libtorrent
void torrent::async_verify_piece(int piece_index, boost::function<void(bool)> const& f) void torrent::async_verify_piece(int piece_index, boost::function<void(bool)> const& f)
{ {
INVARIANT_CHECK; // INVARIANT_CHECK;
TORRENT_ASSERT(m_storage); TORRENT_ASSERT(m_storage);
TORRENT_ASSERT(m_storage->refcount() > 0); TORRENT_ASSERT(m_storage->refcount() > 0);
@ -2743,6 +2779,9 @@ namespace libtorrent
m_storage->async_hash(piece_index, bind(&torrent::on_piece_verified m_storage->async_hash(piece_index, bind(&torrent::on_piece_verified
, shared_from_this(), _1, _2, f)); , shared_from_this(), _1, _2, f));
#ifndef NDEBUG
check_invariant();
#endif
} }
void torrent::on_piece_verified(int ret, disk_io_job const& j void torrent::on_piece_verified(int ret, disk_io_job const& j