diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index bd6d5e1ba..b93ea8b75 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -106,6 +106,10 @@ namespace libtorrent , boost::function const& f = boost::function()); +#ifndef NDEBUG + disk_io_job find_job(boost::intrusive_ptr s + , int action, int piece) const; +#endif // keep track of the number of bytes in the job queue // at any given time. i.e. the sum of all buffer_size. // this is used to slow down the download global download @@ -120,7 +124,7 @@ namespace libtorrent private: - boost::mutex m_mutex; + mutable boost::mutex m_mutex; boost::condition m_signal; bool m_abort; std::deque m_jobs; @@ -131,6 +135,7 @@ namespace libtorrent #ifndef NDEBUG int m_block_size; + disk_io_job m_current; #endif #ifdef TORRENT_DISK_STATS diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 22ee12179..1bec3b76b 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -70,6 +70,31 @@ namespace libtorrent m_disk_io_thread.join(); } +#ifndef NDEBUG + disk_io_job disk_io_thread::find_job(boost::intrusive_ptr s + , int action, int piece) const + { + boost::mutex::scoped_lock l(m_mutex); + for (std::deque::const_iterator i = m_jobs.begin(); + i != m_jobs.end(); ++i) + { + if (i->storage != s) + continue; + if ((i->action == action || action == -1) && i->piece == piece) + return *i; + } + if ((m_current.action == action || action == -1) + && m_current.piece == piece) + return m_current; + + disk_io_job ret; + ret.action = (disk_io_job::action_t)-1; + ret.piece = -1; + return ret; + } + +#endif + // aborts read operations void disk_io_thread::stop(boost::intrusive_ptr s) { @@ -205,12 +230,19 @@ namespace libtorrent m_log << log_time() << " idle" << std::endl; #endif boost::mutex::scoped_lock l(m_mutex); +#ifndef NDEBUG + m_current.action = (disk_io_job::action_t)-1; + m_current.piece = -1; +#endif while (m_jobs.empty() && !m_abort) m_signal.wait(l); if (m_abort && m_jobs.empty()) return; boost::function handler; handler.swap(m_jobs.front().callback); +#ifndef NDEBUG + m_current = m_jobs.front(); +#endif disk_io_job j = m_jobs.front(); m_jobs.pop_front(); m_queue_buffer_size -= j.buffer_size; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 625b8eacd..4433889da 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1373,18 +1373,25 @@ namespace libtorrent { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + INVARIANT_CHECK; + m_outstanding_writing_bytes -= p.length; TORRENT_ASSERT(m_outstanding_writing_bytes >= 0); -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << " *** on_disk_write_complete() " << p.length << "\n"; +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: " + << p.piece << " o: " << p.start << " ]\n"; #endif // in case the outstanding bytes just dropped down // to allow to receive more data setup_receive(); + piece_block block_finished(p.piece, p.start / t->block_size()); + if (ret == -1 || !t) { + if (t->has_picker()) t->picker().abort_download(block_finished); + if (!t) { m_ses.connection_failed(m_socket, remote(), j.str.c_str()); @@ -1406,7 +1413,6 @@ namespace libtorrent TORRENT_ASSERT(p.piece == j.piece); TORRENT_ASSERT(p.start == j.offset); - piece_block block_finished(p.piece, p.start / t->block_size()); picker.mark_as_finished(block_finished, peer_info_struct()); if (t->alerts().should_post(alert::debug)) { @@ -1414,13 +1420,6 @@ namespace libtorrent block_finished.block_index, block_finished.piece_index, "block finished")); } - if (!t->is_seed() && !m_torrent.expired()) - { - // this is a free function defined in policy.cpp - request_a_block(*t, *this); - send_block_requests(); - } - #ifndef NDEBUG try { @@ -1444,6 +1443,14 @@ namespace libtorrent TORRENT_ASSERT(false); } #endif + + if (!t->is_seed() && !m_torrent.expired()) + { + // this is a free function defined in policy.cpp + request_a_block(*t, *this); + send_block_requests(); + } + } // ----------------------------- @@ -2891,6 +2898,35 @@ namespace libtorrent TORRENT_ASSERT(false); } + if (t->has_picker()) + { + // make sure that pieces that have completed the download + // of all their blocks are in the disk io thread's queue + // to be checked. + const std::vector& dl_queue + = t->picker().get_download_queue(); + for (std::vector::const_iterator i = + dl_queue.begin(); i != dl_queue.end(); ++i) + { + const int blocks_per_piece = t->picker().blocks_in_piece(i->index); + + bool complete = true; + for (int j = 0; j < blocks_per_piece; ++j) + { + if (i->info[j].state == piece_picker::block_info::state_finished) + continue; + complete = false; + break; + } + if (complete) + { + disk_io_job ret = m_ses.m_disk_thread.find_job( + &t->filesystem(), -1, i->index); + TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write); + TORRENT_ASSERT(ret.piece == i->index); + } + } + } // expensive when using checked iterators /* if (t->valid_metadata()) diff --git a/src/torrent.cpp b/src/torrent.cpp index 840e488ba..9a439e68f 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -819,6 +819,11 @@ namespace libtorrent { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << time_now_string() << " *** PIECE_FINISHED [ p: " + << index << " chk: " << (passed_hash_check?"passed":"failed") << " ]\n"; +#endif + bool was_seed = is_seed(); bool was_finished = m_picker->num_filtered() + num_pieces() == torrent_file().num_pieces(); @@ -2112,7 +2117,8 @@ namespace libtorrent expire_bandwidth(channel, blk - amount); } - // called when torrent is finished (all interested pieces downloaded) + // called when torrent is finished (all interesting + // pieces have been downloaded) void torrent::finished() { INVARIANT_CHECK; @@ -2476,6 +2482,36 @@ namespace libtorrent TORRENT_ASSERT(total_done == 0); } + if (m_picker) + { + // make sure that pieces that have completed the download + // of all their blocks are in the disk io thread's queue + // to be checked. + const std::vector& dl_queue + = m_picker->get_download_queue(); + for (std::vector::const_iterator i = + dl_queue.begin(); i != dl_queue.end(); ++i) + { + const int blocks_per_piece = m_picker->blocks_in_piece(i->index); + + bool complete = true; + for (int j = 0; j < blocks_per_piece; ++j) + { + if (i->info[j].state == piece_picker::block_info::state_finished) + continue; + complete = false; + break; + } + if (complete) + { + disk_io_job ret = m_ses.m_disk_thread.find_job( + m_owning_storage, -1, i->index); + TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write); + TORRENT_ASSERT(ret.piece == i->index); + } + } + } + // This check is very expensive. TORRENT_ASSERT(m_num_pieces == std::count(m_have_pieces.begin(), m_have_pieces.end(), true)); @@ -2733,7 +2769,7 @@ namespace libtorrent void torrent::async_verify_piece(int piece_index, boost::function const& f) { - INVARIANT_CHECK; +// INVARIANT_CHECK; TORRENT_ASSERT(m_storage); TORRENT_ASSERT(m_storage->refcount() > 0); @@ -2743,6 +2779,9 @@ namespace libtorrent m_storage->async_hash(piece_index, bind(&torrent::on_piece_verified , shared_from_this(), _1, _2, f)); +#ifndef NDEBUG + check_invariant(); +#endif } void torrent::on_piece_verified(int ret, disk_io_job const& j