diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 2607b0356..a8363f419 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -94,73 +94,6 @@ namespace libtorrent { struct session_impl; - // this data is shared between the main thread and the - // thread that initialize pieces - struct piece_checker_data - { - piece_checker_data() - : processing(false), progress(0.f), abort(false) {} - - boost::shared_ptr torrent_ptr; - fs::path save_path; - - sha1_hash info_hash; - - void parse_resume_data( - const entry& rd - , const torrent_info& info - , std::string& error); - - std::vector piece_map; - std::vector unfinished_pieces; - std::vector block_info; - std::vector peers; - std::vector banned_peers; - entry resume_data; - - // this is true if this torrent is being processed (checked) - // if it is not being processed, then it can be removed from - // the queue without problems, otherwise the abort flag has - // to be set. - bool processing; - - // is filled in by storage::initialize_pieces() - // and represents the progress. It should be a - // value in the range [0, 1] - float progress; - - // abort defaults to false and is typically - // filled in by torrent_handle when the user - // aborts the torrent - bool abort; - }; - - struct checker_impl: boost::noncopyable - { - checker_impl(session_impl& s): m_ses(s), m_abort(false) {} - void operator()(); - piece_checker_data* find_torrent(const sha1_hash& info_hash); - void remove_torrent(sha1_hash const& info_hash, int options); - -#ifndef NDEBUG - void check_invariant() const; -#endif - - // when the files has been checked - // the torrent is added to the session - session_impl& m_ses; - - mutable boost::mutex m_mutex; - boost::condition m_cond; - - // a list of all torrents that are currently in queue - // or checking their files - std::deque > m_torrents; - std::deque > m_processing; - - bool m_abort; - }; - #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING struct tracker_logger; #endif @@ -284,6 +217,9 @@ namespace libtorrent std::vector get_torrents(); + void check_torrent(boost::shared_ptr const& t); + void done_checking(boost::shared_ptr const& t); + void set_severity_level(alert::severity_t s); std::auto_ptr pop_alert(); @@ -435,6 +371,7 @@ namespace libtorrent tracker_manager m_tracker_manager; torrent_map m_torrents; + std::list > m_queued_for_checking; // this maps sockets to their peer_connection // object. It is the complete list of all connected @@ -625,16 +562,8 @@ namespace libtorrent extension_list_t m_extensions; #endif - // data shared between the main thread - // and the checker thread - checker_impl m_checker_impl; - // the main working thread boost::scoped_ptr m_thread; - - // the thread that calls initialize_pieces() - // on all torrents before they start downloading - boost::scoped_ptr m_checker_thread; }; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index bfdfa59d7..98e8623b6 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -77,6 +77,8 @@ namespace libtorrent , move_storage , release_files , delete_files + , check_fastresume + , check_files }; action_t action; @@ -154,10 +156,6 @@ namespace libtorrent , boost::function const& f = boost::function()); -#ifndef NDEBUG - disk_io_job find_job(boost::intrusive_ptr s - , int action, int piece) const; -#endif // keep track of the number of bytes in the job queue // at any given time. i.e. the sum of all buffer_size. // this is used to slow down the download global download @@ -268,9 +266,6 @@ namespace libtorrent // number of bytes per block. The BitTorrent // protocol defines the block size to 16 KiB. int m_block_size; -#ifndef NDEBUG - disk_io_job m_current; -#endif #ifdef TORRENT_DISK_STATS std::ofstream m_log; diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp index e3949fc51..581975af9 100755 --- a/include/libtorrent/piece_picker.hpp +++ b/include/libtorrent/piece_picker.hpp @@ -138,10 +138,7 @@ namespace libtorrent // the vector tells which pieces we already have // and which we don't have. - void files_checked( - std::vector const& pieces - , std::vector const& unfinished - , std::vector& verify_pieces); + void init(std::vector const& pieces); // increases the peer count for the given piece // (is used when a HAVE message is received) diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index 1d9ab0331..26f0202ac 100755 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -119,12 +119,12 @@ namespace libtorrent // if allocate_files is true. // allocate_files is true if allocation mode // is set to full and sparse files are supported + // false return value indicates an error virtual bool initialize(bool allocate_files) = 0; // negative return value indicates an error virtual size_type read(char* buf, int slot, int offset, int size) = 0; - // may throw file_error if storage for slot hasn't been allocated // negative return value indicates an error virtual size_type write(const char* buf, int slot, int offset, int size) = 0; @@ -194,48 +194,23 @@ namespace libtorrent , fs::path const& path , file_pool& fp , disk_io_thread& io - , storage_constructor_type sc); + , storage_constructor_type sc + , storage_mode_t sm); ~piece_manager(); torrent_info const* info() const { return m_info.get(); } - bool check_fastresume(aux::piece_checker_data& d - , std::vector& pieces, int& num_pieces, storage_mode_t storage_mode - , std::string& error_msg); - std::pair check_files(std::vector& pieces - , int& num_pieces, boost::recursive_mutex& mutex, bool& error); - // frees a buffer that was returned from a read operation void free_buffer(char* buf); - void write_resume_data(entry& rd) const - { m_storage->write_resume_data(rd); } + void write_resume_data(entry& rd, std::vector const& have) const; - bool verify_resume_data(entry const& rd, std::string& error) - { -#ifndef NDEBUG - m_resume_data_verified = true; -#endif - return m_storage->verify_resume_data(rd, error); - } - - bool is_allocating() const - { return m_state == state_expand_pieces; } - - void mark_failed(int index); - - std::string const& error() const { return m_storage->error(); } - void clear_error() { m_storage->clear_error(); } - - unsigned long piece_crc( - int slot_index - , int block_size - , piece_picker::block_info const* bi); - - int slot_for(int piece) const; - int piece_for(int slot) const; + void async_check_fastresume(entry const* resume_data + , boost::function const& handler); + void async_check_files(boost::function const& handler); + void async_read( peer_request const& r , boost::function const& handler @@ -249,8 +224,6 @@ namespace libtorrent void async_hash(int piece, boost::function const& f); - fs::path save_path() const; - void async_release_files( boost::function const& handler = boost::function()); @@ -262,12 +235,44 @@ namespace libtorrent void async_move_storage(fs::path const& p , boost::function const& handler); - // fills the vector that maps all allocated - // slots to the piece that is stored (or - // partially stored) there. -2 is the index - // of unassigned pieces and -1 is unallocated - void export_piece_map(std::vector& pieces - , std::vector const& have) const; + enum return_t + { + // return values from check_fastresume and check_files + no_error = 0, + need_full_check = -1, + fatal_disk_error = -2, + }; + + private: + + fs::path save_path() const; + + bool verify_resume_data(entry const& rd, std::string& error) + { return m_storage->verify_resume_data(rd, error); } + + bool is_allocating() const + { return m_state == state_expand_pieces; } + + void mark_failed(int index); + + std::string const& error() const { return m_storage->error(); } + void clear_error() { m_storage->clear_error(); } + + int slot_for(int piece) const; + int piece_for(int slot) const; + + // helper functions for check_dastresume + int check_no_fastresume(std::string& error); + int check_init_storage(std::string& error); + + // if error is set and return value is 'no_error' or 'need_full_check' + // the error message indicates that the fast resume data was rejected + // if 'fatal_disk_error' is returned, the error message indicates what + // when wrong in the disk access + int check_fastresume(entry const& rd, std::string& error); + + // this function returns true if the checking is complete + int check_files(int& current_slot, int& have_piece, std::string& error); bool compact_allocation() const { return m_storage_mode == storage_mode_compact; } @@ -275,19 +280,9 @@ namespace libtorrent #ifndef NDEBUG std::string name() const { return m_info->name(); } #endif - - private: bool allocate_slots(int num_slots, bool abort_on_disk = false); - int identify_data( - const std::vector& piece_data - , int current_slot - , std::vector& have_pieces - , int& num_pieces - , const std::multimap& hash_to_piece - , boost::recursive_mutex& mutex); - size_type read_impl( char* buf , int piece_index @@ -300,8 +295,10 @@ namespace libtorrent , int offset , int size); - bool check_one_piece(std::vector& pieces, int& num_pieces - , boost::recursive_mutex& mutex); + bool check_one_piece(int& have_piece); + int identify_data( + const std::vector& piece_data + , int current_slot); void switch_to_full_mode(); sha1_hash hash_for_piece_impl(int piece); @@ -357,8 +354,6 @@ namespace libtorrent state_none, // the file checking is complete state_finished, - // creating the directories - state_create_files, // checking the files state_full_check, // move pieces to their final position @@ -403,9 +398,6 @@ namespace libtorrent // the piece_manager destructs. This is because // the torrent_info object is owned by the torrent. boost::shared_ptr m_torrent; -#ifndef NDEBUG - bool m_resume_data_verified; -#endif }; } diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 70d72a4cf..bca4a9202 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -98,20 +98,19 @@ namespace libtorrent torrent( aux::session_impl& ses - , aux::checker_impl& checker , boost::intrusive_ptr tf , fs::path const& save_path , tcp::endpoint const& net_interface , storage_mode_t m_storage_mode , int block_size , storage_constructor_type sc - , bool paused); + , bool paused + , entry const& resume_data); // used with metadata-less torrents // (the metadata is downloaded from the peers) torrent( aux::session_impl& ses - , aux::checker_impl& checker , char const* tracker_url , sha1_hash const& info_hash , char const* name @@ -120,7 +119,8 @@ namespace libtorrent , storage_mode_t m_storage_mode , int block_size , storage_constructor_type sc - , bool paused); + , bool paused + , entry const& resume_data); ~torrent(); @@ -142,6 +142,12 @@ namespace libtorrent // it will initialize the storage and the piece-picker void init(); + void on_resume_data_checked(int ret, disk_io_job const& j); + void on_piece_checked(int ret, disk_io_job const& j); + void files_checked(); + void start_checking(); + + storage_mode_t storage_mode() const { return m_storage_mode; } // this will flag the torrent as aborted. The main // loop in session_impl will check for this state // on all torrents once every second, and take @@ -149,19 +155,12 @@ namespace libtorrent void abort(); bool is_aborted() const { return m_abort; } - // returns true if this torrent is being allocated - // by the checker thread. - bool is_allocating() const; - session_settings const& settings() const; aux::session_impl& session() { return m_ses; } void set_sequential_download(bool sd); - bool verify_resume_data(entry const& rd, std::string& error) - { TORRENT_ASSERT(m_storage); return m_storage->verify_resume_data(rd, error); } - void second_tick(stat& accumulator, float tick_interval); // debug purpose only @@ -169,11 +168,6 @@ namespace libtorrent std::string name() const; - bool check_fastresume(aux::piece_checker_data&); - std::pair check_files(bool& error); - void files_checked(std::vector const& - unfinished_pieces); - stat statistics() const { return m_stat; } size_type bytes_left() const; boost::tuples::tuple bytes_done() const; @@ -705,7 +699,6 @@ namespace libtorrent // a back reference to the session // this torrent belongs to. aux::session_impl& m_ses; - aux::checker_impl& m_checker; boost::scoped_ptr m_picker; @@ -768,6 +761,12 @@ namespace libtorrent // determines the storage state for this torrent. storage_mode_t m_storage_mode; + // the state of this torrent (queued, checking, downloading) + torrent_status::state_t m_state; + float m_progress; + + entry m_resume_data; + // defaults to 16 kiB, but can be set by the user // when creating the torrent const int m_default_block_size; diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index 759944f56..50c583dcf 100755 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -278,7 +278,7 @@ namespace libtorrent friend struct aux::session_impl; friend class torrent; - torrent_handle(): m_ses(0), m_chk(0), m_info_hash(0) {} + torrent_handle(): m_ses(0), m_info_hash(0) {} void get_peer_info(std::vector& v) const; bool send_chat_message(tcp::endpoint ip, std::string message) const; @@ -418,15 +418,12 @@ namespace libtorrent private: - torrent_handle(aux::session_impl* s, - aux::checker_impl* c, - const sha1_hash& h) + torrent_handle(aux::session_impl* s + , const sha1_hash& h) : m_ses(s) - , m_chk(c) , m_info_hash(h) { TORRENT_ASSERT(m_ses != 0); - TORRENT_ASSERT(m_chk != 0); } #ifndef NDEBUG @@ -434,7 +431,6 @@ namespace libtorrent #endif aux::session_impl* m_ses; - aux::checker_impl* m_chk; sha1_hash m_info_hash; }; diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index d4676104e..8adccaf80 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -73,31 +73,6 @@ namespace libtorrent TORRENT_ASSERT(m_abort == true); } -#ifndef NDEBUG - disk_io_job disk_io_thread::find_job(boost::intrusive_ptr s - , int action, int piece) const - { - mutex_t::scoped_lock l(m_mutex); - for (std::list::const_iterator i = m_jobs.begin(); - i != m_jobs.end(); ++i) - { - if (i->storage != s) - continue; - if ((i->action == action || action == -1) && i->piece == piece) - return *i; - } - if ((m_current.action == action || action == -1) - && m_current.piece == piece) - return m_current; - - disk_io_job ret; - ret.action = (disk_io_job::action_t)-1; - ret.piece = -1; - return ret; - } - -#endif - void disk_io_thread::join() { mutex_t::scoped_lock l(m_mutex); @@ -735,19 +710,14 @@ namespace libtorrent m_log << log_time() << " idle" << std::endl; #endif mutex_t::scoped_lock l(m_mutex); -#ifndef NDEBUG - m_current.action = (disk_io_job::action_t)-1; - m_current.piece = -1; -#endif + while (m_jobs.empty() && !m_abort) m_signal.wait(l); if (m_abort && m_jobs.empty()) return; boost::function handler; handler.swap(m_jobs.front().callback); -#ifndef NDEBUG - m_current = m_jobs.front(); -#endif + disk_io_job j = m_jobs.front(); m_jobs.pop_front(); m_queue_buffer_size -= j.buffer_size; @@ -876,8 +846,8 @@ namespace libtorrent j.storage->clear_error(); break; } - j.str.resize(20); - std::memcpy(&j.str[0], &h[0], 20); + ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-1; + if (ret == -1) j.storage->mark_failed(j.piece); break; } case disk_io_job::move_storage: @@ -950,6 +920,47 @@ namespace libtorrent } break; } + case disk_io_job::check_fastresume: + { +#ifdef TORRENT_DISK_STATS + m_log << log_time() << " check fastresume" << std::endl; +#endif + entry const* rd = (entry const*)j.buffer; + TORRENT_ASSERT(rd != 0); + ret = j.storage->check_fastresume(*rd, j.str); + break; + } + case disk_io_job::check_files: + { +#ifdef TORRENT_DISK_STATS + m_log << log_time() << " check files" << std::endl; +#endif + int piece_size = j.storage->info()->piece_length(); + for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size) + { + ret = j.storage->check_files(j.piece, j.offset, j.str); + +#ifndef BOOST_NO_EXCEPTIONS + try { +#endif + TORRENT_ASSERT(handler); + if (handler && ret == piece_manager::need_full_check) + m_ios.post(bind(handler, ret, j)); +#ifndef BOOST_NO_EXCEPTIONS + } catch (std::exception&) {} +#endif + if (ret != piece_manager::need_full_check) break; + } + // if the check is not done, add it at the end of the job queue + if (ret == piece_manager::need_full_check) + { + mutex_t::scoped_lock l(m_mutex); + m_jobs.push_back(j); + m_jobs.back().callback.swap(handler); + continue; + } + break; + } } } #ifndef BOOST_NO_EXCEPTIONS @@ -973,12 +984,6 @@ namespace libtorrent #ifndef BOOST_NO_EXCEPTIONS } catch (std::exception&) {} #endif - - -#ifndef NDEBUG - m_current.storage = 0; - m_current.callback.clear(); -#endif } TORRENT_ASSERT(false); } diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index 736f78937..cdad7425e 100755 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -122,10 +122,7 @@ namespace libtorrent } // pieces is a bitmask with the pieces we have - void piece_picker::files_checked( - std::vector const& pieces - , std::vector const& unfinished - , std::vector& verify_pieces) + void piece_picker::init(std::vector const& pieces) { TORRENT_PIECE_PICKER_INVARIANT_CHECK; #ifndef NDEBUG @@ -153,25 +150,6 @@ namespace libtorrent p.index = 0; } } - - // if we have fast resume info - // use it - if (!unfinished.empty()) - { - for (std::vector::const_iterator i - = unfinished.begin(); i != unfinished.end(); ++i) - { - for (int j = 0; j < m_blocks_per_piece; ++j) - { - if (i->info[j].state == block_info::state_finished) - mark_as_finished(piece_block(i->index, j), 0); - } - if (is_piece_finished(i->index)) - { - verify_pieces.push_back(i->index); - } - } - } } void piece_picker::piece_info(int index, piece_picker::downloading_piece& st) const diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 8dd0a3f8a..89cc122e0 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -116,362 +116,8 @@ namespace detail } - } namespace aux { - // This is the checker thread - // it is looping in an infinite loop - // until the session is aborted. It will - // normally just block in a wait() call, - // waiting for a signal from session that - // there's a new torrent to check. - - void checker_impl::operator()() - { - eh_initializer(); - // if we're currently performing a full file check, - // this is the torrent being processed - boost::shared_ptr processing; - boost::shared_ptr t; - for (;;) - { - // temporary torrent used while checking fastresume data - t.reset(); - { - boost::mutex::scoped_lock l(m_mutex); - - INVARIANT_CHECK; - - // if the job queue is empty and - // we shouldn't abort - // wait for a signal - while (m_torrents.empty() && !m_abort && !processing) - m_cond.wait(l); - - if (m_abort) - { - // no lock is needed here, because the main thread - // has already been shut down by now - processing.reset(); - t.reset(); - std::for_each(m_torrents.begin(), m_torrents.end() - , boost::bind(&torrent::abort - , boost::bind(&shared_ptr::get - , boost::bind(&piece_checker_data::torrent_ptr, _1)))); - m_torrents.clear(); - std::for_each(m_processing.begin(), m_processing.end() - , boost::bind(&torrent::abort - , boost::bind(&shared_ptr::get - , boost::bind(&piece_checker_data::torrent_ptr, _1)))); - m_processing.clear(); - return; - } - - if (!m_torrents.empty()) - { - t = m_torrents.front(); - if (t->abort) - { - // make sure the locking order is - // consistent to avoid dead locks - // we need to lock the session because closing - // torrents assume to have access to it - l.unlock(); - session_impl::mutex_t::scoped_lock l2(m_ses.m_mutex); - l.lock(); - - t->torrent_ptr->abort(); - m_torrents.pop_front(); - continue; - } - } - } - - if (t) - { - std::string error_msg; - t->parse_resume_data(t->resume_data, t->torrent_ptr->torrent_file() - , error_msg); - - // lock the session to add the new torrent - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - - if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning)) - { - m_ses.m_alerts.post_alert(fastresume_rejected_alert( - t->torrent_ptr->get_handle() - , error_msg)); -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_ses.m_logger) << "fastresume data for " - << t->torrent_ptr->torrent_file().name() << " rejected: " - << error_msg << "\n"; -#endif - } - - mutex::scoped_lock l2(m_mutex); - - if (m_torrents.empty() || m_torrents.front() != t) - { - // this means the torrent was removed right after it was - // added. Abort the checking. - t.reset(); - continue; - } - - // clear the resume data now that it has been used - // (the fast resume data is now parsed and stored in t) - t->resume_data = entry(); - bool up_to_date = t->torrent_ptr->check_fastresume(*t); - - if (up_to_date) - { - INVARIANT_CHECK; - - TORRENT_ASSERT(!m_torrents.empty()); - TORRENT_ASSERT(m_torrents.front() == t); - - t->torrent_ptr->files_checked(t->unfinished_pieces); - m_torrents.pop_front(); - - // we cannot add the torrent if the session is aborted. - if (!m_ses.is_aborted()) - { - m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr)); - if (m_ses.m_alerts.should_post(alert::info)) - { - m_ses.m_alerts.post_alert(torrent_checked_alert( - processing->torrent_ptr->get_handle() - , "torrent finished checking")); - } - if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info)) - { - m_ses.m_alerts.post_alert(torrent_finished_alert( - t->torrent_ptr->get_handle() - , "torrent is complete")); - } - - peer_id id; - std::fill(id.begin(), id.end(), 0); - for (std::vector::const_iterator i = t->peers.begin(); - i != t->peers.end(); ++i) - { - t->torrent_ptr->get_policy().peer_from_tracker(*i, id - , peer_info::resume_data, 0); - } - - for (std::vector::const_iterator i = t->banned_peers.begin(); - i != t->banned_peers.end(); ++i) - { - policy::peer* p = t->torrent_ptr->get_policy().peer_from_tracker(*i, id - , peer_info::resume_data, 0); - if (p) p->banned = true; - } - } - else - { - t->torrent_ptr->abort(); - } - t.reset(); - continue; - } - - l.unlock(); - - // move the torrent from - // m_torrents to m_processing - TORRENT_ASSERT(m_torrents.front() == t); - - m_torrents.pop_front(); - m_processing.push_back(t); - if (!processing) - { - processing = t; - processing->processing = true; - t.reset(); - } - } - if (!processing) continue; - - TORRENT_ASSERT(processing); - - bool finished = false; - bool error = false; - float progress = 0.f; - boost::tie(finished, progress) = processing->torrent_ptr->check_files(error); - - if (error) - { - // This will happen if the storage fails to initialize - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - mutex::scoped_lock l2(m_mutex); - - if (!m_processing.empty() - && m_processing.front() == processing) - m_processing.pop_front(); - processing.reset(); - if (!m_processing.empty()) - { - processing = m_processing.front(); - processing->processing = true; - } - continue; - } - - { - mutex::scoped_lock l2(m_mutex); - - INVARIANT_CHECK; - - processing->progress = progress; - if (processing->abort) - { - TORRENT_ASSERT(!m_processing.empty()); - TORRENT_ASSERT(m_processing.front() == processing); - m_processing.pop_front(); - - // make sure the lock order is correct - l2.unlock(); - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - l2.lock(); - processing->torrent_ptr->abort(); - - processing.reset(); - if (!m_processing.empty()) - { - processing = m_processing.front(); - processing->processing = true; - } - continue; - } - } - if (finished) - { - // lock the session to add the new torrent - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - mutex::scoped_lock l2(m_mutex); - - INVARIANT_CHECK; - - TORRENT_ASSERT(!m_processing.empty()); - TORRENT_ASSERT(m_processing.front() == processing); - - // TODO: factor out the adding of torrents to the session - // and to the checker thread to avoid duplicating the - // check for abortion. - if (!m_ses.is_aborted()) - { - processing->torrent_ptr->files_checked(processing->unfinished_pieces); - m_ses.m_torrents.insert(std::make_pair( - processing->info_hash, processing->torrent_ptr)); - if (m_ses.m_alerts.should_post(alert::info)) - { - m_ses.m_alerts.post_alert(torrent_checked_alert( - processing->torrent_ptr->get_handle() - , "torrent finished checking")); - } - if (processing->torrent_ptr->is_seed() - && m_ses.m_alerts.should_post(alert::info)) - { - m_ses.m_alerts.post_alert(torrent_finished_alert( - processing->torrent_ptr->get_handle() - , "torrent is complete")); - } - - peer_id id; - std::fill(id.begin(), id.end(), 0); - for (std::vector::const_iterator i = processing->peers.begin(); - i != processing->peers.end(); ++i) - { - processing->torrent_ptr->get_policy().peer_from_tracker(*i, id - , peer_info::resume_data, 0); - } - - for (std::vector::const_iterator i = processing->banned_peers.begin(); - i != processing->banned_peers.end(); ++i) - { - policy::peer* p = processing->torrent_ptr->get_policy().peer_from_tracker(*i, id - , peer_info::resume_data, 0); - if (p) p->banned = true; - } - } - else - { - processing->torrent_ptr->abort(); - } - processing.reset(); - m_processing.pop_front(); - if (!m_processing.empty()) - { - processing = m_processing.front(); - processing->processing = true; - } - } - } - } - - aux::piece_checker_data* checker_impl::find_torrent(sha1_hash const& info_hash) - { - INVARIANT_CHECK; - for (std::deque >::iterator i - = m_torrents.begin(); i != m_torrents.end(); ++i) - { - if ((*i)->info_hash == info_hash) return i->get(); - } - for (std::deque >::iterator i - = m_processing.begin(); i != m_processing.end(); ++i) - { - if ((*i)->info_hash == info_hash) return i->get(); - } - - return 0; - } - - void checker_impl::remove_torrent(sha1_hash const& info_hash, int options) - { - INVARIANT_CHECK; - for (std::deque >::iterator i - = m_torrents.begin(); i != m_torrents.end(); ++i) - { - if ((*i)->info_hash == info_hash) - { - TORRENT_ASSERT((*i)->processing == false); - if (options & session::delete_files) - (*i)->torrent_ptr->delete_files(); - m_torrents.erase(i); - return; - } - } - for (std::deque >::iterator i - = m_processing.begin(); i != m_processing.end(); ++i) - { - if ((*i)->info_hash == info_hash) - { - TORRENT_ASSERT((*i)->processing == false); - if (options & session::delete_files) - (*i)->torrent_ptr->delete_files(); - m_processing.erase(i); - return; - } - } - - TORRENT_ASSERT(false); - } - -#ifndef NDEBUG - void checker_impl::check_invariant() const - { - for (std::deque >::const_iterator i - = m_torrents.begin(); i != m_torrents.end(); ++i) - { - TORRENT_ASSERT(*i); - TORRENT_ASSERT((*i)->torrent_ptr); - } - for (std::deque >::const_iterator i - = m_processing.begin(); i != m_processing.end(); ++i) - { - TORRENT_ASSERT(*i); - TORRENT_ASSERT((*i)->torrent_ptr); - } - } -#endif +} +namespace aux { struct seed_random_generator { @@ -524,7 +170,6 @@ namespace detail #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING , m_logpath(logpath) #endif - , m_checker_impl(*this) { #ifdef WIN32 // windows XP has a limit on the number of @@ -596,7 +241,6 @@ namespace detail bind(&session_impl::second_tick, this, _1)); m_thread.reset(new boost::thread(boost::ref(*this))); - m_checker_thread.reset(new boost::thread(boost::ref(m_checker_impl))); } #ifndef TORRENT_DISABLE_EXTENSIONS @@ -711,10 +355,6 @@ namespace detail m_download_channel.close(); m_upload_channel.close(); - - mutex::scoped_lock l2(m_checker_impl.m_mutex); - // abort the checker thread - m_checker_impl.m_abort = true; } void session_impl::set_port_filter(port_filter const& f) @@ -1077,25 +717,6 @@ namespace detail #endif }; -/* - namespace - { - struct compare_peer_ptr - { - bool operator()(peer_connection const* lhs - , intrusive_ptr const& rhs) - { - return lhs < rhs.get(); - } - - bool operator()(intrusive_ptr const& lhs - , peer_connection const* rhs) - { - return lhs.get() < rhs; - } - }; - } -*/ void session_impl::close_connection(peer_connection const* p , char const* message) { @@ -1647,40 +1268,21 @@ namespace detail std::vector session_impl::get_torrents() { mutex_t::scoped_lock l(m_mutex); - mutex::scoped_lock l2(m_checker_impl.m_mutex); std::vector ret; - for (std::deque >::iterator i - = m_checker_impl.m_torrents.begin() - , end(m_checker_impl.m_torrents.end()); i != end; ++i) - { - if ((*i)->abort) continue; - ret.push_back(torrent_handle(this, &m_checker_impl - , (*i)->info_hash)); - } - - for (std::deque >::iterator i - = m_checker_impl.m_processing.begin() - , end(m_checker_impl.m_processing.end()); i != end; ++i) - { - if ((*i)->abort) continue; - ret.push_back(torrent_handle(this, &m_checker_impl - , (*i)->info_hash)); - } for (session_impl::torrent_map::iterator i = m_torrents.begin(), end(m_torrents.end()); i != end; ++i) { if (i->second->is_aborted()) continue; - ret.push_back(torrent_handle(this, &m_checker_impl - , i->first)); + ret.push_back(torrent_handle(this, i->first)); } return ret; } torrent_handle session_impl::find_torrent_handle(sha1_hash const& info_hash) { - return torrent_handle(this, &m_checker_impl, info_hash); + return torrent_handle(this, info_hash); } torrent_handle session_impl::add_torrent( @@ -1699,7 +1301,6 @@ namespace detail // lock the session and the checker thread (the order is important!) mutex_t::scoped_lock l(m_mutex); - mutex::scoped_lock l2(m_checker_impl.m_mutex); // INVARIANT_CHECK; @@ -1710,17 +1311,13 @@ namespace detail if (!find_torrent(ti->info_hash()).expired()) throw duplicate_torrent(); - // is the torrent currently being checked? - if (m_checker_impl.find_torrent(ti->info_hash())) - throw duplicate_torrent(); - // create the torrent and the data associated with // the checker thread and store it before starting // the thread boost::shared_ptr torrent_ptr( - new torrent(*this, m_checker_impl, ti, save_path + new torrent(*this, ti, save_path , m_listen_interface, storage_mode, 16 * 1024 - , sc, paused)); + , sc, paused, resume_data)); torrent_ptr->start(); #ifndef TORRENT_DISABLE_EXTENSIONS @@ -1732,13 +1329,6 @@ namespace detail } #endif - boost::shared_ptr d( - new aux::piece_checker_data); - d->torrent_ptr = torrent_ptr; - d->save_path = save_path; - d->info_hash = ti->info_hash(); - d->resume_data = resume_data; - #ifndef TORRENT_DISABLE_DHT if (m_dht) { @@ -1750,13 +1340,23 @@ namespace detail } #endif - // add the torrent to the queue to be checked - m_checker_impl.m_torrents.push_back(d); - // and notify the thread that it got another - // job in its queue - m_checker_impl.m_cond.notify_one(); + m_torrents.insert(std::make_pair(ti->info_hash(), torrent_ptr)); - return torrent_handle(this, &m_checker_impl, ti->info_hash()); + return torrent_handle(this, ti->info_hash()); + } + + void session_impl::check_torrent(boost::shared_ptr const& t) + { + if (m_queued_for_checking.empty()) t->start_checking(); + m_queued_for_checking.push_back(t); + } + + void session_impl::done_checking(boost::shared_ptr const& t) + { + TORRENT_ASSERT(m_queued_for_checking.front() == t); + m_queued_for_checking.pop_front(); + if (!m_queued_for_checking.empty()) + m_queued_for_checking.front()->start_checking(); } torrent_handle session_impl::add_torrent( @@ -1764,7 +1364,7 @@ namespace detail , sha1_hash const& info_hash , char const* name , fs::path const& save_path - , entry const& + , entry const& resume_data , storage_mode_t storage_mode , storage_constructor_type sc , bool paused @@ -1773,14 +1373,6 @@ namespace detail // TODO: support resume data in this case TORRENT_ASSERT(!save_path.empty()); - { - // lock the checker_thread - mutex::scoped_lock l(m_checker_impl.m_mutex); - - // is the torrent currently being checked? - if (m_checker_impl.find_torrent(info_hash)) - throw duplicate_torrent(); - } // lock the session session_impl::mutex_t::scoped_lock l(m_mutex); @@ -1789,7 +1381,11 @@ namespace detail // is the torrent already active? if (!find_torrent(info_hash).expired()) +#ifndef BOOST_NO_EXCEPTIONS throw duplicate_torrent(); +#else + return torrent_handle(); +#endif // you cannot add new torrents to a session that is closing down TORRENT_ASSERT(!is_aborted()); @@ -1798,9 +1394,9 @@ namespace detail // the checker thread and store it before starting // the thread boost::shared_ptr torrent_ptr( - new torrent(*this, m_checker_impl, tracker_url, info_hash, name + new torrent(*this, tracker_url, info_hash, name , save_path, m_listen_interface, storage_mode, 16 * 1024 - , sc, paused)); + , sc, paused, resume_data)); torrent_ptr->start(); #ifndef TORRENT_DISABLE_EXTENSIONS @@ -1812,16 +1408,14 @@ namespace detail } #endif - m_torrents.insert( - std::make_pair(info_hash, torrent_ptr)).first; + m_torrents.insert(std::make_pair(info_hash, torrent_ptr)); - return torrent_handle(this, &m_checker_impl, info_hash); + return torrent_handle(this, info_hash); } void session_impl::remove_torrent(const torrent_handle& h, int options) { if (h.m_ses != this) return; - TORRENT_ASSERT(h.m_chk == &m_checker_impl || h.m_chk == 0); TORRENT_ASSERT(h.m_ses != 0); mutex_t::scoped_lock l(m_mutex); @@ -1871,19 +1465,6 @@ namespace detail TORRENT_ASSERT(m_torrents.find(i_hash) == m_torrents.end()); return; } - - if (h.m_chk) - { - mutex::scoped_lock l(m_checker_impl.m_mutex); - - aux::piece_checker_data* d = m_checker_impl.find_torrent(h.m_info_hash); - if (d != 0) - { - if (d->processing) d->abort = true; - else m_checker_impl.remove_torrent(h.m_info_hash, options); - return; - } - } } bool session_impl::listen_on( @@ -2182,31 +1763,6 @@ namespace detail TORRENT_ASSERT(m_torrents.empty()); - // it's important that the main thread is closed completely before - // the checker thread is terminated. Because all the connections - // have to be closed and removed from the torrents before they - // can be destructed. (because the weak pointers in the - // peer_connections will be invalidated when the torrents are - // destructed and then the invariant will be broken). - - { - mutex::scoped_lock l(m_checker_impl.m_mutex); - // abort the checker thread - m_checker_impl.m_abort = true; - - // abort the currently checking torrent - if (!m_checker_impl.m_torrents.empty()) - { - m_checker_impl.m_torrents.front()->abort = true; - } - m_checker_impl.m_cond.notify_one(); - } - -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " waiting for checker thread\n"; -#endif - m_checker_thread->join(); - #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " waiting for disk io thread\n"; #endif @@ -2476,210 +2032,5 @@ namespace detail } #endif - void piece_checker_data::parse_resume_data( - const entry& resume_data - , const torrent_info& info - , std::string& error) - { - // if we don't have any resume data, return - if (resume_data.type() == entry::undefined_t) return; - - entry rd = resume_data; - - try - { - if (rd["file-format"].string() != "libtorrent resume file") - { - error = "missing file format tag"; - return; - } - - if (rd["file-version"].integer() > 1) - { - error = "incompatible file version " - + boost::lexical_cast(rd["file-version"].integer()); - return; - } - - // verify info_hash - sha1_hash hash = rd["info-hash"].string(); - if (hash != info.info_hash()) - { - error = "mismatching info-hash: " + boost::lexical_cast(hash); - return; - } - - // the peers - - if (entry* peers_entry = rd.find_key("peers")) - { - entry::list_type& peer_list = peers_entry->list(); - - std::vector tmp_peers; - tmp_peers.reserve(peer_list.size()); - for (entry::list_type::iterator i = peer_list.begin(); - i != peer_list.end(); ++i) - { - tcp::endpoint a( - address::from_string((*i)["ip"].string()) - , (unsigned short)(*i)["port"].integer()); - tmp_peers.push_back(a); - } - - peers.swap(tmp_peers); - } - - if (entry* banned_peers_entry = rd.find_key("banned_peers")) - { - entry::list_type& peer_list = banned_peers_entry->list(); - - std::vector tmp_peers; - tmp_peers.reserve(peer_list.size()); - for (entry::list_type::iterator i = peer_list.begin(); - i != peer_list.end(); ++i) - { - tcp::endpoint a( - address::from_string((*i)["ip"].string()) - , (unsigned short)(*i)["port"].integer()); - tmp_peers.push_back(a); - } - - banned_peers.swap(tmp_peers); - } - - // read piece map - const entry::list_type& slots = rd["slots"].list(); - if ((int)slots.size() > info.num_pieces()) - { - error = "file has more slots than torrent (slots: " - + boost::lexical_cast(slots.size()) + " size: " - + boost::lexical_cast(info.num_pieces()) + " )"; - return; - } - - std::vector tmp_pieces; - tmp_pieces.reserve(slots.size()); - for (entry::list_type::const_iterator i = slots.begin(); - i != slots.end(); ++i) - { - int index = (int)i->integer(); - if (index >= info.num_pieces() || index < -2) - { - error = "too high index number in slot map (index: " - + boost::lexical_cast(index) + " size: " - + boost::lexical_cast(info.num_pieces()) + ")"; - return; - } - tmp_pieces.push_back(index); - } - - // only bother to check the partial pieces if we have the same block size - // as in the fast resume data. If the blocksize has changed, then throw - // away all partial pieces. - std::vector tmp_unfinished; - int num_blocks_per_piece = (int)rd["blocks per piece"].integer(); - if (num_blocks_per_piece == info.piece_length() / torrent_ptr->block_size()) - { - // the unfinished pieces - - entry::list_type& unfinished = rd["unfinished"].list(); - int unfinished_size = int(unfinished.size()); - block_info.resize(num_blocks_per_piece * unfinished_size); - tmp_unfinished.reserve(unfinished_size); - int index = 0; - for (entry::list_type::iterator i = unfinished.begin(); - i != unfinished.end(); ++i, ++index) - { - piece_picker::downloading_piece p; - p.info = &block_info[index * num_blocks_per_piece]; - p.index = (int)(*i)["piece"].integer(); - if (p.index < 0 || p.index >= info.num_pieces()) - { - error = "invalid piece index in unfinished piece list (index: " - + boost::lexical_cast(p.index) + " size: " - + boost::lexical_cast(info.num_pieces()) + ")"; - return; - } - - const std::string& bitmask = (*i)["bitmask"].string(); - - const int num_bitmask_bytes = (std::max)(num_blocks_per_piece / 8, 1); - if ((int)bitmask.size() != num_bitmask_bytes) - { - error = "invalid size of bitmask (" + boost::lexical_cast(bitmask.size()) + ")"; - return; - } - for (int j = 0; j < num_bitmask_bytes; ++j) - { - unsigned char bits = bitmask[j]; - int num_bits = (std::min)(num_blocks_per_piece - j*8, 8); - for (int k = 0; k < num_bits; ++k) - { - const int bit = j * 8 + k; - if (bits & (1 << k)) - { - p.info[bit].state = piece_picker::block_info::state_finished; - ++p.finished; - } - } - } - - if (p.finished == 0) continue; - - std::vector::iterator slot_iter - = std::find(tmp_pieces.begin(), tmp_pieces.end(), p.index); - if (slot_iter == tmp_pieces.end()) - { - // this piece is marked as unfinished - // but doesn't have any storage - error = "piece " + boost::lexical_cast(p.index) + " is " - "marked as unfinished, but doesn't have any storage"; - return; - } - - TORRENT_ASSERT(*slot_iter == p.index); - int slot_index = static_cast(slot_iter - tmp_pieces.begin()); - const entry* ad = i->find_key("adler32"); - - if (ad && ad->type() == entry::int_t) - { - unsigned long adler - = torrent_ptr->filesystem().piece_crc( - slot_index - , torrent_ptr->block_size() - , p.info); - - // crc's didn't match, don't use the resume data - if (ad->integer() != entry::integer_type(adler)) - { - error = "checksum mismatch on piece " - + boost::lexical_cast(p.index); - return; - } - } - - tmp_unfinished.push_back(p); - } - } - - if (!torrent_ptr->verify_resume_data(rd, error)) - return; - - piece_map.swap(tmp_pieces); - unfinished_pieces.swap(tmp_unfinished); - } - catch (invalid_encoding&) - { - return; - } - catch (type_error&) - { - return; - } - catch (file_error&) - { - return; - } - } }} diff --git a/src/storage.cpp b/src/storage.cpp index 89df4a4d3..f57af370b 100755 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -462,7 +462,12 @@ namespace libtorrent #endif file(m_save_path / file_iter->path, file::out); #ifndef BOOST_NO_EXCEPTIONS - } catch (std::exception&) {} + } + catch (std::exception& e) + { + m_error = e.what(); + return true; + } #endif continue; } @@ -480,7 +485,12 @@ namespace libtorrent f->set_size(file_iter->size); } #ifndef BOOST_NO_EXCEPTIONS - } catch (std::exception&) {} + } + catch (std::exception& e) + { + m_error = e.what(); + return true; + } #endif } // close files that were opened in write mode @@ -1094,9 +1104,10 @@ namespace libtorrent , fs::path const& save_path , file_pool& fp , disk_io_thread& io - , storage_constructor_type sc) + , storage_constructor_type sc + , storage_mode_t sm) : m_storage(sc(ti, save_path, fp)) - , m_storage_mode(storage_mode_sparse) + , m_storage_mode(sm) , m_info(ti) , m_save_path(complete(save_path)) , m_state(state_none) @@ -1107,9 +1118,6 @@ namespace libtorrent , m_io_thread(io) , m_torrent(torrent) { -#ifndef NDEBUG - m_resume_data_verified = false; -#endif } piece_manager::~piece_manager() @@ -1149,6 +1157,26 @@ namespace libtorrent m_io_thread.add_job(j, handler); } + void piece_manager::async_check_fastresume(entry const* resume_data + , boost::function const& handler) + { + TORRENT_ASSERT(resume_data != 0); + disk_io_job j; + j.storage = this; + j.action = disk_io_job::check_fastresume; + j.buffer = (char*)resume_data; + m_io_thread.add_job(j, handler); + } + + void piece_manager::async_check_files( + boost::function const& handler) + { + disk_io_job j; + j.storage = this; + j.action = disk_io_job::check_files; + m_io_thread.add_job(j, handler); + } + void piece_manager::async_read( peer_request const& r , boost::function const& handler @@ -1238,45 +1266,44 @@ namespace libtorrent return false; } - void piece_manager::export_piece_map( - std::vector& p, std::vector const& have) const + void piece_manager::write_resume_data(entry& rd + , std::vector const& have) const { boost::recursive_mutex::scoped_lock lock(m_mutex); INVARIANT_CHECK; + m_storage->write_resume_data(rd); + + entry::list_type& slots = rd["slots"].list(); if (m_storage_mode == storage_mode_compact) { - p.clear(); - p.reserve(m_info->num_pieces()); + slots.clear(); std::vector::const_reverse_iterator last; for (last = m_slot_to_piece.rbegin(); last != m_slot_to_piece.rend(); ++last) { - if (*last != unallocated && have[*last]) break; + if (*last != unallocated) break; } for (std::vector::const_iterator i = m_slot_to_piece.begin(); i != last.base(); ++i) { - p.push_back((*i >= 0 && have[*i]) ? *i : unassigned); + slots.push_back((*i >= 0) ? *i : unassigned); } } else { - p.reserve(m_info->num_pieces()); for (int i = 0; i < m_info->num_pieces(); ++i) { - p.push_back(have[i] ? i : unassigned); + slots.push_back(have[i] ? i : unassigned); } } } void piece_manager::mark_failed(int piece_index) { - boost::recursive_mutex::scoped_lock lock(m_mutex); - INVARIANT_CHECK; if (m_storage_mode != storage_mode_compact) return; @@ -1290,43 +1317,6 @@ namespace libtorrent m_free_slots.push_back(slot_index); } - unsigned long piece_manager::piece_crc( - int slot_index - , int block_size - , piece_picker::block_info const* bi) - { - TORRENT_ASSERT(slot_index >= 0); - TORRENT_ASSERT(slot_index < m_info->num_pieces()); - TORRENT_ASSERT(block_size > 0); - - adler32_crc crc; - std::vector buf(block_size); - int num_blocks = static_cast(m_info->piece_size(slot_index)) / block_size; - int last_block_size = static_cast(m_info->piece_size(slot_index)) % block_size; - if (last_block_size == 0) last_block_size = block_size; - - for (int i = 0; i < num_blocks-1; ++i) - { - if (bi[i].state != piece_picker::block_info::state_finished) continue; - m_storage->read( - &buf[0] - , slot_index - , i * block_size - , block_size); - crc.update(&buf[0], block_size); - } - if (num_blocks > 0 && bi[num_blocks - 1].state == piece_picker::block_info::state_finished) - { - m_storage->read( - &buf[0] - , slot_index - , block_size * (num_blocks - 1) - , last_block_size); - crc.update(&buf[0], last_block_size); - } - return crc.final(); - } - size_type piece_manager::read_impl( char* buf , int piece_index @@ -1382,16 +1372,10 @@ namespace libtorrent int piece_manager::identify_data( const std::vector& piece_data - , int current_slot - , std::vector& have_pieces - , int& num_pieces - , const std::multimap& hash_to_piece - , boost::recursive_mutex& mutex) + , int current_slot) { // INVARIANT_CHECK; - TORRENT_ASSERT((int)have_pieces.size() == m_info->num_pieces()); - const int piece_size = static_cast(m_info->piece_length()); const int last_piece_size = static_cast(m_info->piece_size( m_info->num_pieces() - 1)); @@ -1421,8 +1405,8 @@ namespace libtorrent map_iter end2; // makes the lookups for the small digest and the large digest - boost::tie(begin1, end1) = hash_to_piece.equal_range(small_hash); - boost::tie(begin2, end2) = hash_to_piece.equal_range(large_hash); + boost::tie(begin1, end1) = m_hash_to_piece.equal_range(small_hash); + boost::tie(begin2, end2) = m_hash_to_piece.equal_range(large_hash); // copy all potential piece indices into this vector std::vector matching_pieces; @@ -1448,15 +1432,11 @@ namespace libtorrent // we will assume that the piece is in the right place const int piece_index = current_slot; - // lock because we're writing to have_pieces - boost::recursive_mutex::scoped_lock l(mutex); - - if (have_pieces[piece_index]) + int other_slot = m_piece_to_slot[piece_index]; + if (other_slot >= 0) { // we have already found a piece with // this index. - int other_slot = m_piece_to_slot[piece_index]; - TORRENT_ASSERT(other_slot >= 0); // take one of the other matching pieces // that hasn't already been assigned @@ -1464,18 +1444,15 @@ namespace libtorrent for (std::vector::iterator i = matching_pieces.begin(); i != matching_pieces.end(); ++i) { - if (have_pieces[*i] || *i == piece_index) continue; + if (m_piece_to_slot[*i] >= 0 || *i == piece_index) continue; other_piece = *i; break; } if (other_piece >= 0) { // replace the old slot with 'other_piece' - TORRENT_ASSERT(have_pieces[other_piece] == false); - have_pieces[other_piece] = true; m_slot_to_piece[other_slot] = other_piece; m_piece_to_slot[other_piece] = other_slot; - ++num_pieces; } else { @@ -1491,19 +1468,9 @@ namespace libtorrent TORRENT_ASSERT(m_piece_to_slot[piece_index] != current_slot); TORRENT_ASSERT(m_piece_to_slot[piece_index] >= 0); m_piece_to_slot[piece_index] = has_no_slot; -#ifndef NDEBUG - // to make the assert happy, a few lines down - have_pieces[piece_index] = false; -#endif - } - else - { - ++num_pieces; } - TORRENT_ASSERT(have_pieces[piece_index] == false); TORRENT_ASSERT(m_piece_to_slot[piece_index] == has_no_slot); - have_pieces[piece_index] = true; return piece_index; } @@ -1514,21 +1481,14 @@ namespace libtorrent for (std::vector::iterator i = matching_pieces.begin(); i != matching_pieces.end(); ++i) { - if (have_pieces[*i]) continue; + if (m_piece_to_slot[*i] >= 0) continue; free_piece = *i; break; } if (free_piece >= 0) { - // lock because we're writing to have_pieces - boost::recursive_mutex::scoped_lock l(mutex); - - TORRENT_ASSERT(have_pieces[free_piece] == false); TORRENT_ASSERT(m_piece_to_slot[free_piece] == has_no_slot); - have_pieces[free_piece] = true; - ++num_pieces; - return free_piece; } else @@ -1538,15 +1498,92 @@ namespace libtorrent } } + int piece_manager::check_no_fastresume(std::string& error) + { + torrent_info::file_iterator i = m_info->begin_files(true); + torrent_info::file_iterator end = m_info->end_files(true); + + for (; i != end; ++i) + { + bool file_exists = false; + fs::path f = m_save_path / i->path; +#ifndef BOOST_NO_EXCEPTIONS + try + { +#endif +#if defined(_WIN32) && defined(UNICODE) && BOOST_VERSION < 103400 + file_exists = exists_win(f); +#elif defined(_WIN32) && defined(UNICODE) + fs::wpath wf = safe_convert(f.string()); + file_exists = exists(wf); +#else + file_exists = exists(f); +#endif +#ifndef BOOST_NO_EXCEPTIONS + } + catch (std::exception& e) + { + error = f.string(); + error += ": "; + error += e.what(); + return fatal_disk_error; + } +#endif + if (file_exists && i->size > 0) + { + m_state = state_full_check; + m_piece_to_slot.clear(); + m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot); + m_slot_to_piece.clear(); + m_slot_to_piece.resize(m_info->num_pieces(), unallocated); + return need_full_check; + } + } + + if (m_storage_mode == storage_mode_compact) + { + // in compact mode without checking, we need to + // populate the unallocated list + TORRENT_ASSERT(m_unallocated_slots.empty()); + for (int i = 0, end(m_info->num_pieces()); i < end; ++i) + m_unallocated_slots.push_back(i); + m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot); + m_slot_to_piece.resize(m_info->num_pieces(), unallocated); + } + + return check_init_storage(error); + } + + int piece_manager::check_init_storage(std::string& error) + { + if (m_storage->initialize(m_storage_mode == storage_mode_allocate)) + { + error = m_storage->error(); + m_storage->clear_error(); + return fatal_disk_error; + } + m_state = state_finished; + buffer().swap(m_scratch_buffer); + buffer().swap(m_scratch_buffer2); + if (m_storage_mode != storage_mode_compact) + { + // if no piece is out of place + // since we're in full allocation mode, we can + // forget the piece allocation tables + std::vector().swap(m_piece_to_slot); + std::vector().swap(m_slot_to_piece); + std::vector().swap(m_free_slots); + std::vector().swap(m_unallocated_slots); + } + return no_error; + } + // check if the fastresume data is up to date // if it is, use it and return true. If it // isn't return false and the full check // will be run - bool piece_manager::check_fastresume( - aux::piece_checker_data& data - , std::vector& pieces - , int& num_pieces, storage_mode_t storage_mode - , std::string& error_msg) + int piece_manager::check_fastresume( + entry const& rd, std::string& error) { boost::recursive_mutex::scoped_lock lock(m_mutex); @@ -1554,7 +1591,144 @@ namespace libtorrent TORRENT_ASSERT(m_info->piece_length() > 0); - m_storage_mode = storage_mode; + // if we don't have any resume data, return + if (rd.type() == entry::undefined_t) return check_no_fastresume(error); + + if (rd.type() != entry::dictionary_t) + { + error = "invalid fastresume data (not a bencoded dictionary)"; + return check_no_fastresume(error); + } + + entry const* file_format = rd.find_key("file-format"); + + if (file_format == 0 || file_format->type() != entry::string_t) + { + error = "missing file format tag"; + return check_no_fastresume(error); + } + + if (file_format->string() != "libtorrent resume file") + { + error = "invalid file format tag"; + return check_no_fastresume(error); + } + + entry const* info_hash = rd.find_key("info-hash"); + if (info_hash == 0 || info_hash->type() != entry::string_t) + { + error = "missing info-hash"; + return check_no_fastresume(error); + } + + if (sha1_hash(info_hash->string()) != m_info->info_hash()) + { + error = "mismatching info-hash"; + return check_no_fastresume(error); + } + + int block_size = (std::min)(16 * 1024, m_info->piece_length()); + entry const* blocks_per_piece_ent = rd.find_key("blocks per piece"); + if (blocks_per_piece_ent != 0 + && blocks_per_piece_ent->type() == entry::int_t + && blocks_per_piece_ent->integer() != m_info->piece_length() / block_size) + { + error = "invalid 'blocks per piece' entry"; + return check_no_fastresume(error); + } + + storage_mode_t storage_mode = storage_mode_compact; + entry const* allocation = rd.find_key("allocation"); + if (allocation != 0 + && allocation->type() == entry::string_t + && allocation->string() != "compact") + storage_mode = storage_mode_sparse; + + // read piece map + entry const* slots = rd.find_key("slots"); + if (slots == 0 || slots->type() != entry::list_t) + { + error = "missing slot list"; + return check_no_fastresume(error); + } + + if ((int)slots->list().size() > m_info->num_pieces()) + { + error = "file has more slots than torrent (slots: " + + boost::lexical_cast(slots->list().size()) + " size: " + + boost::lexical_cast(m_info->num_pieces()) + " )"; + return check_no_fastresume(error); + } + + // assume no piece is out of place (i.e. in a slot + // other than the one it should be in) + bool out_of_place = false; + + if (storage_mode == storage_mode_compact) + { + int num_pieces = int(m_info->num_pieces()); + m_slot_to_piece.resize(num_pieces, unallocated); + m_piece_to_slot.resize(num_pieces, has_no_slot); + int slot = 0; + for (entry::list_type::const_iterator i = slots->list().begin(); + i != slots->list().end(); ++i, ++slot) + { + if (i->type() != entry::int_t) + { + error = "invalid entry type in slot list"; + return check_no_fastresume(error); + } + + int index = int(i->integer()); + if (index >= num_pieces || index < -2) + { + error = "too high index number in slot map (index: " + + boost::lexical_cast(index) + " size: " + + boost::lexical_cast(num_pieces) + ")"; + return check_no_fastresume(error); + } + if (index >= 0) + { + m_slot_to_piece[slot] = index; + m_piece_to_slot[index] = slot; + if (slot != index) out_of_place = true; + } + else if (index == unassigned) + { + if (m_storage_mode == storage_mode_compact) + m_free_slots.push_back(slot); + } + else + { + TORRENT_ASSERT(index == unallocated); + if (m_storage_mode == storage_mode_compact) + m_unallocated_slots.push_back(slot); + } + } + } + else + { + int slot = 0; + for (entry::list_type::const_iterator i = slots->list().begin(); + i != slots->list().end(); ++i, ++slot) + { + if (i->type() != entry::int_t) + { + error = "invalid entry type in slot list"; + return check_no_fastresume(error); + } + + int index = int(i->integer()); + if (index != slot) + { + error = "invalid slot index"; + return check_no_fastresume(error); + } + } + } + + if (!m_storage->verify_resume_data(rd, error)) + return check_no_fastresume(error); // This will corrupt the storage // use while debugging to find @@ -1562,123 +1736,44 @@ namespace libtorrent // by check_pieces. // m_storage->shuffle(); - m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot); - m_slot_to_piece.resize(m_info->num_pieces(), unallocated); - TORRENT_ASSERT(m_free_slots.empty()); - TORRENT_ASSERT(m_unallocated_slots.empty()); - - // assume no piece is out of place (i.e. in a slot - // other than the one it should be in) - bool out_of_place = false; - - pieces.clear(); - pieces.resize(m_info->num_pieces(), false); - num_pieces = 0; - - // if we have fast-resume info - // use it instead of doing the actual checking - if (!data.piece_map.empty() - && int(data.piece_map.size()) <= m_info->num_pieces()) + if (m_storage_mode == storage_mode_compact) { - TORRENT_ASSERT(m_resume_data_verified); - for (int i = 0; i < (int)data.piece_map.size(); ++i) + if (m_unallocated_slots.empty()) switch_to_full_mode(); + } + else + { + TORRENT_ASSERT(m_free_slots.empty()); + TORRENT_ASSERT(m_unallocated_slots.empty()); + + if (out_of_place) { - m_slot_to_piece[i] = data.piece_map[i]; - if (data.piece_map[i] >= 0) - { - if (data.piece_map[i] != i) out_of_place = true; - m_piece_to_slot[data.piece_map[i]] = i; - int found_piece = data.piece_map[i]; - - // if the piece is not in the unfinished list - // we have all of it - if (std::find_if( - data.unfinished_pieces.begin() - , data.unfinished_pieces.end() - , piece_picker::has_index(found_piece)) - == data.unfinished_pieces.end()) - { - ++num_pieces; - pieces[found_piece] = true; - } - } - else if (data.piece_map[i] == unassigned) - { - if (m_storage_mode == storage_mode_compact) - m_free_slots.push_back(i); - } - else - { - TORRENT_ASSERT(data.piece_map[i] == unallocated); - if (m_storage_mode == storage_mode_compact) - m_unallocated_slots.push_back(i); - } + // in this case we're in full allocation mode, but + // we're resuming a compact allocated storage + m_state = state_expand_pieces; + m_current_slot = 0; + error = "pieces needs to be reordered"; + return need_full_check; } - - if (m_storage_mode == storage_mode_compact) - { - m_unallocated_slots.reserve(int(m_info->num_pieces() - data.piece_map.size())); - for (int i = (int)data.piece_map.size(); i < (int)m_info->num_pieces(); ++i) - { - m_unallocated_slots.push_back(i); - } - if (m_unallocated_slots.empty()) - { - switch_to_full_mode(); - } - } - else - { - if (!out_of_place) - { - // if no piece is out of place - // since we're in full allocation mode, we can - // forget the piece allocation tables - - std::vector().swap(m_piece_to_slot); - std::vector().swap(m_slot_to_piece); - m_state = state_create_files; - return false; - } - else - { - // in this case we're in full allocation mode, but - // we're resuming a compact allocated storage - m_state = state_expand_pieces; - m_current_slot = 0; - error_msg = "pieces needs to be reordered"; - return false; - } - } - - m_state = state_create_files; - return false; } - m_state = state_full_check; - return false; + return check_init_storage(error); } /* state chart: - check_fastresume() - - | | - | v + check_fastresume() ----------+ + | + | | | + | v v | +------------+ +---------------+ | | full_check |-->| expand_pieses | | +------------+ +---------------+ | | | | v | | +--------------+ | - +->| create_files | <------+ + +->| finished | <------+ +--------------+ - | - v - +----------+ - | finished | - +----------+ */ @@ -1688,23 +1783,12 @@ namespace libtorrent // the second return value is the progress the // file check is at. 0 is nothing done, and 1 // is finished - std::pair piece_manager::check_files( - std::vector& pieces, int& num_pieces, boost::recursive_mutex& mutex - , bool& error) + int piece_manager::check_files(int& current_slot, int& have_piece, std::string& error) { -#ifndef NDEBUG - boost::recursive_mutex::scoped_lock l_(mutex); - TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true)); - l_.unlock(); -#endif - - if (m_state == state_create_files) - { - m_storage->initialize(m_storage_mode == storage_mode_allocate); - m_state = state_finished; - return std::make_pair(true, 1.f); - } + TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_info->num_pieces()); + current_slot = m_current_slot; + have_piece = -1; if (m_state == state_expand_pieces) { INVARIANT_CHECK; @@ -1724,8 +1808,9 @@ namespace libtorrent if (m_storage->read(&m_scratch_buffer2[0], piece, 0, piece_size) != piece_size) { - error = true; - return std::make_pair(true, (float)m_current_slot / m_info->num_pieces()); + error = m_storage->error(); + m_storage->clear_error(); + return fatal_disk_error; } m_scratch_piece = other_piece; m_piece_to_slot[other_piece] = unassigned; @@ -1736,8 +1821,9 @@ namespace libtorrent int piece_size = m_info->piece_size(piece); if (m_storage->write(&m_scratch_buffer[0], piece, 0, piece_size) != piece_size) { - error = true; - return std::make_pair(true, (float)m_current_slot / m_info->num_pieces()); + error = m_storage->error(); + m_storage->clear_error(); + return fatal_disk_error; } m_piece_to_slot[piece] = piece; m_slot_to_piece[piece] = piece; @@ -1745,7 +1831,7 @@ namespace libtorrent if (other_piece >= 0) m_scratch_buffer.swap(m_scratch_buffer2); - return std::make_pair(false, (float)m_current_slot / m_info->num_pieces()); + return need_full_check; } while (m_current_slot < m_info->num_pieces() @@ -1757,15 +1843,7 @@ namespace libtorrent if (m_current_slot == m_info->num_pieces()) { - m_state = state_create_files; - buffer().swap(m_scratch_buffer); - buffer().swap(m_scratch_buffer2); - if (m_storage_mode != storage_mode_compact) - { - std::vector().swap(m_piece_to_slot); - std::vector().swap(m_slot_to_piece); - } - return std::make_pair(false, 1.f); + return check_init_storage(error); } int piece = m_slot_to_piece[m_current_slot]; @@ -1782,8 +1860,9 @@ namespace libtorrent int piece_size = m_info->piece_size(other_piece); if (m_storage->read(&m_scratch_buffer[0], piece, 0, piece_size) != piece_size) { - error = true; - return std::make_pair(false, (float)m_current_slot / m_info->num_pieces()); + error = m_storage->error(); + m_storage->clear_error(); + return fatal_disk_error; } m_scratch_piece = other_piece; m_piece_to_slot[other_piece] = unassigned; @@ -1796,12 +1875,12 @@ namespace libtorrent m_slot_to_piece[m_current_slot] = unassigned; m_slot_to_piece[piece] = piece; - return std::make_pair(false, (float)m_current_slot / m_info->num_pieces()); + return need_full_check; } TORRENT_ASSERT(m_state == state_full_check); - bool skip = check_one_piece(pieces, num_pieces, mutex); + bool skip = check_one_piece(have_piece); if (skip) { @@ -1837,6 +1916,7 @@ namespace libtorrent } ++m_current_slot; + current_slot = m_current_slot; if (m_current_slot >= m_info->num_pieces()) { @@ -1856,8 +1936,7 @@ namespace libtorrent std::vector().swap(m_piece_to_slot); std::vector().swap(m_slot_to_piece); - m_state = state_create_files; - return std::make_pair(false, 1.f); + return check_init_storage(error); } else { @@ -1865,47 +1944,37 @@ namespace libtorrent // we're resuming a compact allocated storage m_state = state_expand_pieces; m_current_slot = 0; - return std::make_pair(false, 0.f); + current_slot = m_current_slot; + return need_full_check; } } else if (m_unallocated_slots.empty()) { switch_to_full_mode(); } - m_state = state_create_files; - -#ifndef NDEBUG - boost::recursive_mutex::scoped_lock l(mutex); - TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true)); -#endif - return std::make_pair(false, 1.f); + return check_init_storage(error); } - - TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true)); - - return std::make_pair(false, (float)m_current_slot / m_info->num_pieces()); + return need_full_check; } - bool piece_manager::check_one_piece(std::vector& pieces, int& num_pieces - , boost::recursive_mutex& mutex) + bool piece_manager::check_one_piece(int& have_piece) { // ------------------------ // DO THE FULL CHECK // ------------------------ + TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_info->num_pieces()); + TORRENT_ASSERT(int(m_slot_to_piece.size()) == m_info->num_pieces()); + TORRENT_ASSERT(have_piece == -1); + // initialization for the full check if (m_hash_to_piece.empty()) { for (int i = 0; i < m_info->num_pieces(); ++i) - { m_hash_to_piece.insert(std::make_pair(m_info->hash_for_piece(i), i)); - } - boost::recursive_mutex::scoped_lock l(mutex); - std::fill(pieces.begin(), pieces.end(), false); - num_pieces = 0; } - m_piece_data.resize(m_info->piece_length()); + m_piece_data.resize(int(m_info->piece_length())); int piece_size = m_info->piece_size(m_current_slot); int num_read = m_storage->read(&m_piece_data[0] , m_current_slot, 0, piece_size); @@ -1914,14 +1983,14 @@ namespace libtorrent if (num_read != piece_size) return true; - int piece_index = identify_data(m_piece_data, m_current_slot - , pieces, num_pieces, m_hash_to_piece, mutex); + int piece_index = identify_data(m_piece_data, m_current_slot); + + if (piece_index >= 0) have_piece = piece_index; if (piece_index != m_current_slot && piece_index >= 0) m_out_of_place = true; - TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true)); TORRENT_ASSERT(piece_index == unassigned || piece_index >= 0); const bool this_should_move = piece_index >= 0 && m_slot_to_piece[piece_index] != unallocated; @@ -2148,7 +2217,7 @@ namespace libtorrent if (m_storage_mode != storage_mode_compact) return piece_index; -// INVARIANT_CHECK; + INVARIANT_CHECK; TORRENT_ASSERT(piece_index >= 0); TORRENT_ASSERT(piece_index < (int)m_piece_to_slot.size()); @@ -2263,7 +2332,7 @@ namespace libtorrent boost::recursive_mutex::scoped_lock lock(m_mutex); TORRENT_ASSERT(num_slots > 0); -// INVARIANT_CHECK; + INVARIANT_CHECK; TORRENT_ASSERT(!m_unallocated_slots.empty()); TORRENT_ASSERT(m_storage_mode == storage_mode_compact); diff --git a/src/torrent.cpp b/src/torrent.cpp index 5553b9fc0..b08906141 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -150,14 +150,14 @@ namespace libtorrent torrent::torrent( session_impl& ses - , aux::checker_impl& checker , boost::intrusive_ptr tf , fs::path const& save_path , tcp::endpoint const& net_interface , storage_mode_t storage_mode , int block_size , storage_constructor_type sc - , bool paused) + , bool paused + , entry const& resume_data) : m_torrent_file(tf) , m_abort(false) , m_paused(paused) @@ -179,7 +179,6 @@ namespace libtorrent , m_last_dht_announce(time_now() - minutes(15)) #endif , m_ses(ses) - , m_checker(checker) , m_picker(0) , m_trackers(m_torrent_file->trackers()) , m_last_working_tracker(-1) @@ -195,6 +194,9 @@ namespace libtorrent , m_net_interface(net_interface.address(), 0) , m_save_path(complete(save_path)) , m_storage_mode(storage_mode) + , m_state(torrent_status::queued_for_checking) + , m_progress(0.f) + , m_resume_data(resume_data) , m_default_block_size(block_size) , m_connections_initialized(true) , m_settings(ses.settings()) @@ -211,7 +213,6 @@ namespace libtorrent torrent::torrent( session_impl& ses - , aux::checker_impl& checker , char const* tracker_url , sha1_hash const& info_hash , char const* name @@ -220,7 +221,8 @@ namespace libtorrent , storage_mode_t storage_mode , int block_size , storage_constructor_type sc - , bool paused) + , bool paused + , entry const& resume_data) : m_torrent_file(new torrent_info(info_hash)) , m_abort(false) , m_paused(paused) @@ -242,7 +244,6 @@ namespace libtorrent , m_last_dht_announce(time_now() - minutes(15)) #endif , m_ses(ses) - , m_checker(checker) , m_picker(0) , m_last_working_tracker(-1) , m_currently_trying_tracker(0) @@ -257,6 +258,9 @@ namespace libtorrent , m_net_interface(net_interface.address(), 0) , m_save_path(complete(save_path)) , m_storage_mode(storage_mode) + , m_state(torrent_status::queued_for_checking) + , m_progress(0.f) + , m_resume_data(resume_data) , m_default_block_size(block_size) , m_connections_initialized(false) , m_settings(ses.settings()) @@ -399,7 +403,8 @@ namespace libtorrent // the shared_from_this() will create an intentional // cycle of ownership, se the hpp file for description. m_owning_storage = new piece_manager(shared_from_this(), m_torrent_file - , m_save_path, m_ses.m_files, m_ses.m_disk_thread, m_storage_constructor); + , m_save_path, m_ses.m_files, m_ses.m_disk_thread, m_storage_constructor + , m_storage_mode); m_storage = m_owning_storage.get(); m_block_size = calculate_block_size(*m_torrent_file, m_default_block_size); m_picker.reset(new piece_picker( @@ -409,6 +414,233 @@ namespace libtorrent std::vector const& url_seeds = m_torrent_file->url_seeds(); std::copy(url_seeds.begin(), url_seeds.end(), std::inserter(m_web_seeds , m_web_seeds.begin())); + + m_state = torrent_status::queued_for_checking; + + m_storage->async_check_fastresume(&m_resume_data + , bind(&torrent::on_resume_data_checked + , shared_from_this(), _1, _2)); + } + + void torrent::on_resume_data_checked(int ret, disk_io_job const& j) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + if (ret == piece_manager::fatal_disk_error) + { + if (m_ses.m_alerts.should_post(alert::fatal)) + { + m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str)); +#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING + (*m_ses.m_logger) << time_now_string() << ": fatal disk error [" + " error: " << j.str << + " torrent: " << torrent_file().name() << + " ]\n"; +#endif + } + std::fill(m_have_pieces.begin(), m_have_pieces.end(), false); + m_num_pieces = 0; + pause(); + return; + } + + // parse out "peers" from the resume data and add them to the peer list + entry const* peers_entry = m_resume_data.find_key("peers"); + if (peers_entry && peers_entry->type() == entry::list_t) + { + peer_id id; + std::fill(id.begin(), id.end(), 0); + entry::list_type const& peer_list = peers_entry->list(); + + for (entry::list_type::const_iterator i = peer_list.begin(); + i != peer_list.end(); ++i) + { + if (i->type() != entry::dictionary_t) continue; + entry const* ip = i->find_key("ip"); + entry const* port = i->find_key("port"); + if (ip == 0 || port == 0 + || ip->type() != entry::string_t + || port->type() != entry::int_t) + continue; + tcp::endpoint a( + address::from_string(ip->string()) + , (unsigned short)port->integer()); + m_policy.peer_from_tracker(a, id, peer_info::resume_data, 0); + } + } + + // parse out "banned_peers" and add them as banned + entry const* banned_peers_entry = m_resume_data.find_key("banned_peers"); + if (banned_peers_entry != 0 && banned_peers_entry->type() == entry::list_t) + { + peer_id id; + std::fill(id.begin(), id.end(), 0); + entry::list_type const& peer_list = banned_peers_entry->list(); + + for (entry::list_type::const_iterator i = peer_list.begin(); + i != peer_list.end(); ++i) + { + if (i->type() != entry::dictionary_t) continue; + entry const* ip = i->find_key("ip"); + entry const* port = i->find_key("port"); + if (ip == 0 || port == 0 + || ip->type() != entry::string_t + || port->type() != entry::int_t) + continue; + tcp::endpoint a( + address::from_string(ip->string()) + , (unsigned short)port->integer()); + policy::peer* p = m_policy.peer_from_tracker(a, id, peer_info::resume_data, 0); + if (p) p->banned = true; + } + } + + bool fastresume_rejected = !j.str.empty(); + + if (fastresume_rejected && m_ses.m_alerts.should_post(alert::warning)) + { + m_ses.m_alerts.post_alert(fastresume_rejected_alert(get_handle(), j.str)); +#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING + (*m_ses.m_logger) << "fastresume data for " + << torrent_file().name() << " rejected: " + << j.str << "\n"; +#endif + } + + if (ret == 0) + { + // there are either no files for this torrent + // or the resume_data was accepted + + m_num_pieces = 0; + std::fill(m_have_pieces.begin(), m_have_pieces.end(), false); + if (!fastresume_rejected) + { + // parse slots + entry const* slots_ent = m_resume_data.find_key("slots"); + if (slots_ent != 0 && slots_ent->type() == entry::list_t) + { + entry::list_type const& slots = slots_ent->list(); + + for (entry::list_type::const_iterator i = slots.begin(); + i != slots.end(); ++i) + { + if (i->type() != entry::int_t) continue; + int piece_index = int(i->integer()); + if (piece_index < 0 || piece_index >= torrent_file().num_pieces()) + continue; + m_have_pieces[piece_index] = true; + ++m_num_pieces; + } + } + + // parse unfinished pieces + int num_blocks_per_piece = + static_cast(torrent_file().piece_length()) / block_size(); + + entry const* unfinished_ent = m_resume_data.find_key("unfinished"); + if (unfinished_ent != 0 && unfinished_ent->type() == entry::list_t) + { + entry::list_type const& unfinished = unfinished_ent->list(); + int index = 0; + for (entry::list_type::const_iterator i = unfinished.begin(); + i != unfinished.end(); ++i, ++index) + { + if (i->type() != entry::dictionary_t) continue; + entry const* piece = i->find_key("piece"); + if (piece == 0 || piece->type() != entry::int_t) continue; + int piece_index = int(piece->integer()); + if (piece_index < 0 || piece_index >= torrent_file().num_pieces()) + continue; + + // if this assert is hit, the resume data file was corrupt + TORRENT_ASSERT(m_have_pieces[piece_index]); + m_have_pieces[piece_index] = false; + --m_num_pieces; + + entry const* bitmask_ent = i->find_key("bitmask"); + if (bitmask_ent == 0 || bitmask_ent->type() != entry::string_t) break; + std::string const& bitmask = bitmask_ent->string(); + + const int num_bitmask_bytes = (std::max)(num_blocks_per_piece / 8, 1); + if ((int)bitmask.size() != num_bitmask_bytes) continue; + for (int j = 0; j < num_bitmask_bytes; ++j) + { + unsigned char bits = bitmask[j]; + int num_bits = (std::min)(num_blocks_per_piece - j*8, 8); + for (int k = 0; k < num_bits; ++k) + { + const int bit = j * 8 + k; + if (bits & (1 << k)) + { + m_picker->mark_as_finished(piece_block(piece_index, bit), 0); + if (m_picker->is_piece_finished(piece_index)) + async_verify_piece(piece_index, bind(&torrent::piece_finished + , shared_from_this(), piece_index, _1)); + } + } + } + } + } + } + + files_checked(); + } + else + { + // either the fastresume data was rejected or there are + // some files + m_ses.check_torrent(shared_from_this()); + } + } + + void torrent::start_checking() + { + m_state = torrent_status::checking_files; + + m_storage->async_check_files(bind( + &torrent::on_piece_checked + , shared_from_this(), _1, _2)); + } + + void torrent::on_piece_checked(int ret, disk_io_job const& j) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + if (ret == piece_manager::fatal_disk_error) + { + if (m_ses.m_alerts.should_post(alert::fatal)) + { + m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str)); +#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING + (*m_ses.m_logger) << time_now_string() << ": fatal disk error [" + " error: " << j.str << + " torrent: " << torrent_file().name() << + " ]\n"; +#endif + } + std::fill(m_have_pieces.begin(), m_have_pieces.end(), false); + m_num_pieces = 0; + pause(); + m_ses.done_checking(shared_from_this()); + return; + } + + m_progress = j.piece / float(torrent_file().num_pieces()); + + if (j.offset >= 0 && !m_have_pieces[j.offset]) + { + m_have_pieces[j.offset] = true; + ++m_num_pieces; + } + + // we're not done checking yet + // this handler will be called repeatedly until + // we're done, or encounter a failure + if (ret == piece_manager::need_full_check) return; + + m_ses.done_checking(shared_from_this()); + files_checked(); } void torrent::use_interface(const char* net_interface) @@ -1102,7 +1334,6 @@ namespace libtorrent // that has sent the least number of pieces m_picker->restore_piece(index); TORRENT_ASSERT(m_storage); - m_storage->mark_failed(index); TORRENT_ASSERT(m_have_pieces[index] == false); @@ -2151,31 +2382,14 @@ namespace libtorrent return false; } - init(); - - boost::mutex::scoped_lock(m_checker.m_mutex); - - boost::shared_ptr d( - new aux::piece_checker_data); - d->torrent_ptr = shared_from_this(); - d->save_path = m_save_path; - d->info_hash = m_torrent_file->info_hash(); - // add the torrent to the queue to be checked - m_checker.m_torrents.push_back(d); - typedef session_impl::torrent_map torrent_map; - torrent_map::iterator i = m_ses.m_torrents.find( - m_torrent_file->info_hash()); - TORRENT_ASSERT(i != m_ses.m_torrents.end()); - m_ses.m_torrents.erase(i); - // and notify the thread that it got another - // job in its queue - m_checker.m_cond.notify_one(); - if (m_ses.m_alerts.should_post(alert::info)) { m_ses.m_alerts.post_alert(metadata_received_alert( get_handle(), "metadata successfully received from swarm")); } + + init(); + return true; } @@ -2186,6 +2400,13 @@ namespace libtorrent TORRENT_ASSERT(p != 0); TORRENT_ASSERT(!p->is_local()); + if (m_state == torrent_status::queued_for_checking + || m_state == torrent_status::checking_files) + { + p->disconnect("torrent is not ready to accept peers"); + return; + } + if (m_ses.m_connections.find(p) == m_ses.m_connections.end()) { p->disconnect("peer is not properly constructed"); @@ -2246,7 +2467,9 @@ namespace libtorrent { return int(m_connections.size()) < m_max_connections && m_ses.m_half_open.free_slots() - && !m_paused; + && !m_paused + && m_state != torrent_status::checking_files + && m_state != torrent_status::queued_for_checking; } void torrent::disconnect_all() @@ -2380,6 +2603,8 @@ namespace libtorrent , "torrent has finished downloading")); } + m_state = torrent_status::finished; + // disconnect all seeds // TODO: should disconnect all peers that have the pieces we have // not just seeds @@ -2414,6 +2639,7 @@ namespace libtorrent // make the next tracker request // be a completed-event m_event = tracker_request::completed; + m_state = torrent_status::seeding; force_tracker_request(); } @@ -2478,10 +2704,10 @@ namespace libtorrent } - bool torrent::check_fastresume(aux::piece_checker_data& data) +/* + void torrent::check_fastresume() { INVARIANT_CHECK; - TORRENT_ASSERT(valid_metadata()); bool done = true; try @@ -2492,16 +2718,6 @@ namespace libtorrent done = m_storage->check_fastresume(data, m_have_pieces, m_num_pieces , m_storage_mode, error_msg); - if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning)) - { - m_ses.m_alerts.post_alert(fastresume_rejected_alert( - get_handle(), error_msg)); -#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING - (*m_ses.m_logger) << "fastresume data for " - << torrent_file().name() << " rejected: " - << error_msg << "\n"; -#endif - } } catch (std::exception& e) { @@ -2520,7 +2736,7 @@ namespace libtorrent } return done; } - + std::pair torrent::check_files(bool& error) { TORRENT_ASSERT(m_torrent_file->is_valid()); @@ -2551,30 +2767,21 @@ namespace libtorrent return progress; } - - void torrent::files_checked(std::vector const& - unfinished_pieces) +*/ + void torrent::files_checked() { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); TORRENT_ASSERT(m_torrent_file->is_valid()); INVARIANT_CHECK; + m_state = torrent_status::connecting_to_tracker; + if (!is_seed()) { - // this is filled in with pieces that needs to be checked - // against its hashes. - std::vector verify_pieces; - m_picker->files_checked(m_have_pieces, unfinished_pieces, verify_pieces); + m_picker->init(m_have_pieces); if (m_sequential_download) picker().sequential_download(m_sequential_download); - while (!verify_pieces.empty()) - { - int piece = verify_pieces.back(); - verify_pieces.pop_back(); - async_verify_piece(piece, bind(&torrent::piece_finished - , shared_from_this(), piece, _1)); - } } #ifndef TORRENT_DISABLE_EXTENSIONS @@ -2593,6 +2800,7 @@ namespace libtorrent if (is_seed()) { + m_state = torrent_status::seeding; m_picker.reset(); if (m_ses.settings().free_torrent_hashes) m_torrent_file->seed_free(); @@ -2634,10 +2842,7 @@ namespace libtorrent fs::path torrent::save_path() const { - if (m_owning_storage.get()) - return m_owning_storage->save_path(); - else - return m_save_path; + return m_save_path; } void torrent::move_storage(fs::path const& save_path) @@ -2674,7 +2879,7 @@ namespace libtorrent torrent_handle torrent::get_handle() const { - return torrent_handle(&m_ses, &m_checker, m_torrent_file->info_hash()); + return torrent_handle(&m_ses, m_torrent_file->info_hash()); } session_settings const& torrent::settings() const @@ -2779,17 +2984,6 @@ namespace libtorrent complete = false; break; } - // this is no longer valid since the completion event - // may be queued in the io service -/* - if (complete && m_files_checked) - { - disk_io_job ret = m_ses.m_disk_thread.find_job( - m_owning_storage, -1, i->index); - TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write); - TORRENT_ASSERT(ret.piece == i->index); - } -*/ } } @@ -3100,9 +3294,8 @@ namespace libtorrent void torrent::on_piece_verified(int ret, disk_io_job const& j , boost::function f) { - sha1_hash h(j.str); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - f(m_torrent_file->hash_for_piece(j.piece) == h); + f(ret == 0); } const tcp::endpoint& torrent::current_tracker() const @@ -3110,9 +3303,6 @@ namespace libtorrent return m_tracker_address; } - bool torrent::is_allocating() const - { return m_owning_storage.get() && m_owning_storage->is_allocating(); } - void torrent::file_progress(std::vector& fp) const { TORRENT_ASSERT(valid_metadata()); @@ -3217,6 +3407,8 @@ namespace libtorrent st.connections_limit = m_max_connections; // if we don't have any metadata, stop here + st.state = m_state; + if (!valid_metadata()) { if (m_got_tracker_response == false) @@ -3224,14 +3416,8 @@ namespace libtorrent else st.state = torrent_status::downloading_metadata; -// TODO: add a progress member to the torrent that will be used in this case -// and that may be set by a plugin -// if (m_metadata_size == 0) st.progress = 0.f; -// else st.progress = (std::min)(1.f, m_metadata_progress / (float)m_metadata_size); - st.progress = 0.f; - + st.progress = m_progress; st.block_size = 0; - return st; } @@ -3258,31 +3444,14 @@ namespace libtorrent TORRENT_ASSERT(st.total_wanted >= st.total_wanted_done); - if (st.total_wanted == 0) st.progress = 1.f; + if (m_state == torrent_status::checking_files) + st.progress = m_progress; + else if (st.total_wanted == 0) st.progress = 1.f; else st.progress = st.total_wanted_done / static_cast(st.total_wanted); st.pieces = &m_have_pieces; st.num_pieces = m_num_pieces; - - if (m_got_tracker_response == false) - { - st.state = torrent_status::connecting_to_tracker; - } - else if (is_seed()) - { - TORRENT_ASSERT(st.total_done == m_torrent_file->total_size()); - st.state = torrent_status::seeding; - } - else if (st.total_wanted_done == st.total_wanted) - { - st.state = torrent_status::finished; - } - else - { - st.state = torrent_status::downloading; - } - st.num_seeds = num_seeds(); if (m_picker.get()) st.distributed_copies = m_picker->distributed_copies(); diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 9a7dc33df..ee0ecd540 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -82,58 +82,46 @@ using libtorrent::aux::session_impl; #define TORRENT_FORWARD(call) \ if (m_ses == 0) return; \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) return; \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) return; \ t->call #define TORRENT_FORWARD_RETURN(call, def) \ if (m_ses == 0) return def; \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) return def; \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) return def; \ return t->call #define TORRENT_FORWARD_RETURN2(call, def) \ if (m_ses == 0) return def; \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) return def; \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) return def; \ t->call #else #define TORRENT_FORWARD(call) \ if (m_ses == 0) throw_invalid_handle(); \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) throw_invalid_handle(); \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) throw_invalid_handle(); \ t->call #define TORRENT_FORWARD_RETURN(call, def) \ if (m_ses == 0) throw_invalid_handle(); \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) return def; \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) return def; \ return t->call #define TORRENT_FORWARD_RETURN2(call, def) \ if (m_ses == 0) throw_invalid_handle(); \ - TORRENT_ASSERT(m_chk); \ session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \ - mutex::scoped_lock l2(m_chk->m_mutex); \ - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \ - if (t == 0) return def; \ + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); \ + if (!t) return def; \ t->call #endif @@ -150,26 +138,12 @@ namespace libtorrent throw invalid_handle(); } #endif - - torrent* find_torrent( - session_impl* ses - , aux::checker_impl* chk - , sha1_hash const& hash) - { - aux::piece_checker_data* d = chk->find_torrent(hash); - if (d != 0) return d->torrent_ptr.get(); - - boost::shared_ptr t = ses->find_torrent(hash).lock(); - if (t) return t.get(); - return 0; - } } #ifndef NDEBUG void torrent_handle::check_invariant() const { - TORRENT_ASSERT((m_ses == 0 && m_chk == 0) || (m_ses != 0 && m_chk != 0)); } #endif @@ -302,30 +276,8 @@ namespace libtorrent #else throw_invalid_handle(); #endif - TORRENT_ASSERT(m_chk); - + session_impl::mutex_t::scoped_lock l(m_ses->m_mutex); - mutex::scoped_lock l2(m_chk->m_mutex); - - aux::piece_checker_data* d = m_chk->find_torrent(m_info_hash); - if (d != 0) - { - torrent_status st = d->torrent_ptr->status(); - - if (d->processing) - { - if (d->torrent_ptr->is_allocating()) - st.state = torrent_status::allocating; - else - st.state = torrent_status::checking_files; - } - else - st.state = torrent_status::queued_for_checking; - st.progress = d->progress; - st.paused = d->torrent_ptr->is_paused(); - return st; - } - boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); if (t) return t->status(); @@ -464,11 +416,9 @@ namespace libtorrent #else if (m_ses == 0) throw_invalid_handle(); #endif - TORRENT_ASSERT(m_chk); session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); - mutex::scoped_lock l2(m_chk->m_mutex); - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); - if (t == 0 || !t->valid_metadata()) + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); + if (!t || !t->valid_metadata()) #ifdef BOOST_NO_EXCEPTIONS return empty; #else @@ -481,10 +431,8 @@ namespace libtorrent { INVARIANT_CHECK; if (m_ses == 0) return false; - TORRENT_ASSERT(m_chk); session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); - mutex::scoped_lock l2(m_chk->m_mutex); - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); return t; } @@ -498,12 +446,8 @@ namespace libtorrent #else throw_invalid_handle(); #endif - TORRENT_ASSERT(m_chk); - session_impl::mutex_t::scoped_lock l(m_ses->m_mutex); - mutex::scoped_lock l2(m_chk->m_mutex); - - torrent* t = find_torrent(m_ses, m_chk, m_info_hash); + boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); if (!t || !t->valid_metadata()) #ifdef BOOST_NO_EXCEPTIONS return entry(); @@ -518,7 +462,9 @@ namespace libtorrent ret["file-format"] = "libtorrent resume file"; ret["file-version"] = 1; - ret["allocation"] = t->filesystem().compact_allocation()?"compact":"full"; + storage_mode_t sm = t->storage_mode(); + ret["allocation"] = sm == storage_mode_sparse?"sparse" + :sm == storage_mode_allocate?"full":"compact"; const sha1_hash& info_hash = t->torrent_file().info_hash(); ret["info-hash"] = std::string((char*)info_hash.begin(), (char*)info_hash.end()); @@ -569,25 +515,13 @@ namespace libtorrent TORRENT_ASSERT(bits == 8 || j == num_bitmask_bytes - 1); } piece_struct["bitmask"] = bitmask; -/* - TORRENT_ASSERT(t->filesystem().slot_for(i->index) >= 0); - unsigned long adler - = t->filesystem().piece_crc( - t->filesystem().slot_for(i->index) - , t->block_size() - , i->info); - - piece_struct["adler32"] = adler; -*/ // push the struct onto the unfinished-piece list up.push_back(piece_struct); } } std::vector piece_index; - t->filesystem().export_piece_map(piece_index, have_pieces); - entry::list_type& slots = ret["slots"].list(); - std::copy(piece_index.begin(), piece_index.end(), std::back_inserter(slots)); + t->filesystem().write_resume_data(ret, have_pieces); // write local peers @@ -632,8 +566,6 @@ namespace libtorrent peer_list.push_back(peer); } - t->filesystem().write_resume_data(ret); - return ret; } @@ -654,27 +586,16 @@ namespace libtorrent #else throw_invalid_handle(); #endif - TORRENT_ASSERT(m_chk); - session_impl::mutex_t::scoped_lock l(m_ses->m_mutex); boost::shared_ptr t = m_ses->find_torrent(m_info_hash).lock(); if (!t) { - // the torrent is being checked. Add the peer to its - // peer list. The entries in there will be connected - // once the checking is complete. - mutex::scoped_lock l2(m_chk->m_mutex); - - aux::piece_checker_data* d = m_chk->find_torrent(m_info_hash); - if (d == 0) #ifdef BOOST_NO_EXCEPTIONS - return; -#else - throw_invalid_handle(); -#endif - d->peers.push_back(adr); return; +#else + throw_invalid_handle(); +#endif } peer_id id;