From 3b8670626a37dbd2d38ea45cac575f343c71b8b0 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 10 Jun 2007 20:46:09 +0000 Subject: [PATCH] merged back async_io branch into trunk --- Jamfile | 1 + examples/client_test.cpp | 16 +- include/libtorrent/aux_/session_impl.hpp | 12 +- include/libtorrent/bt_peer_connection.hpp | 2 +- include/libtorrent/debug.hpp | 13 +- include/libtorrent/disk_io_thread.hpp | 119 +++++ include/libtorrent/entry.hpp | 2 + include/libtorrent/file.hpp | 5 +- include/libtorrent/hasher.hpp | 1 + include/libtorrent/intrusive_ptr_base.hpp | 71 +++ include/libtorrent/peer_connection.hpp | 23 +- include/libtorrent/peer_id.hpp | 1 + include/libtorrent/piece_picker.hpp | 27 +- include/libtorrent/policy.hpp | 2 - include/libtorrent/session.hpp | 8 +- include/libtorrent/storage.hpp | 208 ++++++-- include/libtorrent/torrent.hpp | 45 +- include/libtorrent/torrent_handle.hpp | 21 +- include/libtorrent/torrent_info.hpp | 10 +- include/libtorrent/tracker_manager.hpp | 11 +- include/libtorrent/web_peer_connection.hpp | 2 +- src/bt_peer_connection.cpp | 6 +- src/disk_io_thread.cpp | 241 +++++++++ src/entry.cpp | 6 + src/file.cpp | 8 +- src/logger.cpp | 15 +- src/metadata_transfer.cpp | 2 - src/peer_connection.cpp | 251 ++++----- src/piece_picker.cpp | 190 +++++-- src/policy.cpp | 14 +- src/session.cpp | 4 +- src/session_impl.cpp | 16 +- src/storage.cpp | 571 ++++++++------------- src/torrent.cpp | 227 +++++--- src/torrent_handle.cpp | 22 +- src/torrent_info.cpp | 44 +- src/tracker_manager.cpp | 23 - test/setup_transfer.cpp | 21 +- test/setup_transfer.hpp | 3 +- test/test_metadata_extension.cpp | 4 +- test/test_pe_crypto.cpp | 4 +- test/test_piece_picker.cpp | 41 +- test/test_storage.cpp | 41 +- test/test_swarm.cpp | 2 +- test/test_web_seed.cpp | 7 +- 45 files changed, 1532 insertions(+), 831 deletions(-) create mode 100644 include/libtorrent/disk_io_thread.hpp create mode 100644 include/libtorrent/intrusive_ptr_base.hpp create mode 100644 src/disk_io_thread.cpp diff --git a/Jamfile b/Jamfile index 33755e186..3b3a8659c 100755 --- a/Jamfile +++ b/Jamfile @@ -86,6 +86,7 @@ SOURCES = logger file_pool lsd + disk_io_thread ; KADEMLIA_SOURCES = diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 240f1785a..67838b09c 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -42,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE. #endif #include +#include #include #include #include @@ -899,6 +900,12 @@ int main(int ac, char* av[]) { event_string << "(" << p->ip << ") " << p->msg(); } + else if (torrent_alert* p = dynamic_cast(a.get())) + { + std::string name; + try { name = p->handle.name(); } catch (std::exception&) {} + event_string << "(" << name << ") " << p->msg(); + } else { event_string << a->msg(); @@ -1006,13 +1013,15 @@ int main(int ac, char* av[]) if (print_downloads && s.state != torrent_status::seeding) { h.get_download_queue(queue); + std::sort(queue.begin(), queue.end(), bind(&partial_piece_info::piece_index, _1) + < bind(&partial_piece_info::piece_index, _2)); for (std::vector::iterator i = queue.begin(); i != queue.end(); ++i) { out << to_string(i->piece_index, 4) << ": ["; for (int j = 0; j < i->blocks_in_piece; ++j) { - int index = peer_index(i->peer[j], peers); + int index = peer_index(i->blocks[j].peer, peers); char str[] = "+"; bool currently_downloading = false; if (index >= 0) @@ -1025,8 +1034,9 @@ int main(int ac, char* av[]) #ifdef ANSI_TERMINAL_COLORS if (currently_downloading) out << esc("33;7") << str << esc("0"); - else if (i->finished_blocks[j]) out << esc("32;7") << str << esc("0"); - else if (i->requested_blocks[j]) out << str; + else if (i->blocks[j].state == block_info::finished) out << esc("32;7") << str << esc("0"); + else if (i->blocks[j].state == block_info::writing) out << esc("35;7") << str << esc("0"); + else if (i->blocks[j].state == block_info::requested) out << str; else out << " "; #else if (i->finished_blocks[j]) out << "#"; diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 4759d4a18..c270d9ffc 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -82,10 +82,13 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/lsd.hpp" #include "libtorrent/socket_type.hpp" #include "libtorrent/connection_queue.hpp" +#include "libtorrent/disk_io_thread.hpp" namespace libtorrent { + namespace fs = boost::filesystem; + namespace aux { struct session_impl; @@ -98,7 +101,7 @@ namespace libtorrent : processing(false), progress(0.f), abort(false) {} boost::shared_ptr torrent_ptr; - boost::filesystem::path save_path; + fs::path save_path; sha1_hash info_hash; @@ -238,7 +241,7 @@ namespace libtorrent torrent_handle add_torrent( torrent_info const& ti - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data , bool compact_mode , int block_size @@ -248,7 +251,7 @@ namespace libtorrent char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data , bool compact_mode , int block_size @@ -334,6 +337,9 @@ namespace libtorrent // when they are destructed. file_pool m_files; + // handles disk io requests asynchronously + disk_io_thread m_disk_thread; + // this is a list of half-open tcp connections // (only outgoing connections) // this has to be one of the last diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index 2a5000011..0f5e58e9d 100755 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -192,7 +192,7 @@ namespace libtorrent void write_cancel(peer_request const& r); void write_bitfield(std::vector const& bitfield); void write_have(int index); - void write_piece(peer_request const& r); + void write_piece(peer_request const& r, char const* buffer); void write_handshake(); #ifndef TORRENT_DISABLE_EXTENSIONS void write_extensions(); diff --git a/include/libtorrent/debug.hpp b/include/libtorrent/debug.hpp index 39c4b0222..436b695f6 100755 --- a/include/libtorrent/debug.hpp +++ b/include/libtorrent/debug.hpp @@ -53,15 +53,16 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { // DEBUG API + + namespace fs = boost::filesystem; struct logger { - logger(boost::filesystem::path const& filename, int instance, bool append = true) + logger(fs::path const& filename, int instance, bool append = true) { - using namespace boost::filesystem; - path dir(complete("libtorrent_logs" + boost::lexical_cast(instance))); - if (!exists(dir)) create_directories(dir); - m_file.open(dir / filename, std::ios_base::out | (append ? std::ios_base::app : std::ios_base::out)); + fs::path dir(fs::complete("libtorrent_logs" + boost::lexical_cast(instance))); + if (!fs::exists(dir)) fs::create_directories(dir); + m_file.open((dir / filename).string().c_str(), std::ios_base::out | (append ? std::ios_base::app : std::ios_base::out)); *this << "\n\n\n*** starting log ***\n"; } @@ -73,7 +74,7 @@ namespace libtorrent return *this; } - boost::filesystem::ofstream m_file; + std::ofstream m_file; }; } diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp new file mode 100644 index 000000000..aff0930e4 --- /dev/null +++ b/include/libtorrent/disk_io_thread.hpp @@ -0,0 +1,119 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/storage.hpp" +#include +#include +#include +#include +#include +#include +#include "libtorrent/config.hpp" + +namespace libtorrent +{ + + struct disk_io_job + { + disk_io_job() + : action(read) + , buffer(0) + , buffer_size(0) + , piece(0) + , offset(0) + {} + + enum action_t + { + read + , write + , hash + , move_storage + , release_files + }; + + action_t action; + + char* buffer; + size_type buffer_size; + boost::intrusive_ptr storage; + // arguments used for read and write + int piece, offset; + // used for move_storage. On errors, this is set + // to the error message + std::string str; + + // this is called when operation completes + boost::function callback; + }; + + // this is a singleton consisting of the thread and a queue + // of disk io jobs + struct disk_io_thread : boost::noncopyable + { + disk_io_thread(int block_size = 16 * 1024); + ~disk_io_thread(); + + // aborts read operations + void stop(boost::intrusive_ptr s); + void add_job(disk_io_job const& j + , boost::function const& f + = boost::function()); + + // keep track of the number of bytes in the job queue + // at any given time. i.e. the sum of all buffer_size. + // this is used to slow down the download global download + // speed when the queue buffer size is too big. + size_type queue_buffer_size() const + { return m_queue_buffer_size; } + + void operator()(); + + char* allocate_buffer(); + + private: + + boost::mutex m_mutex; + boost::condition m_signal; + bool m_abort; + std::deque m_jobs; + size_type m_queue_buffer_size; + + // memory pool for read and write operations + boost::pool<> m_pool; + + // thread for performing blocking disk io operations + boost::thread m_disk_io_thread; + }; + +} + diff --git a/include/libtorrent/entry.hpp b/include/libtorrent/entry.hpp index 31a78b972..a1eba5324 100755 --- a/include/libtorrent/entry.hpp +++ b/include/libtorrent/entry.hpp @@ -155,6 +155,8 @@ namespace libtorrent dictionary_type& dict(); const dictionary_type& dict() const; + void swap(entry& e); + // these functions requires that the entry // is a dictionary, otherwise they will throw entry& operator[](char const* key); diff --git a/include/libtorrent/file.hpp b/include/libtorrent/file.hpp index d11496b28..bd0d03539 100755 --- a/include/libtorrent/file.hpp +++ b/include/libtorrent/file.hpp @@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { + namespace fs = boost::filesystem; struct TORRENT_EXPORT file_error: std::runtime_error { @@ -105,10 +106,10 @@ namespace libtorrent static const open_mode out; file(); - file(boost::filesystem::path const& p, open_mode m); + file(fs::path const& p, open_mode m); ~file(); - void open(boost::filesystem::path const& p, open_mode m); + void open(fs::path const& p, open_mode m); void close(); void set_size(size_type size); diff --git a/include/libtorrent/hasher.hpp b/include/libtorrent/hasher.hpp index d1743527a..932f2b100 100755 --- a/include/libtorrent/hasher.hpp +++ b/include/libtorrent/hasher.hpp @@ -119,3 +119,4 @@ namespace libtorrent } #endif // TORRENT_HASHER_HPP_INCLUDED + diff --git a/include/libtorrent/intrusive_ptr_base.hpp b/include/libtorrent/intrusive_ptr_base.hpp new file mode 100644 index 000000000..a432bc350 --- /dev/null +++ b/include/libtorrent/intrusive_ptr_base.hpp @@ -0,0 +1,71 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_INTRUSIVE_PTR_BASE +#define TORRENT_INTRUSIVE_PTR_BASE + +#include +#include +#include "libtorrent/config.hpp" + +namespace libtorrent +{ + template + struct intrusive_ptr_base + { + friend void intrusive_ptr_add_ref(intrusive_ptr_base const* s) + { + assert(s->m_refs >= 0); + assert(s != 0); + ++s->m_refs; + } + + friend void intrusive_ptr_release(intrusive_ptr_base const* s) + { + assert(s->m_refs > 0); + assert(s != 0); + if (--s->m_refs == 0) + delete static_cast(s); + } + + int refcount() const { return m_refs; } + + intrusive_ptr_base(): m_refs(0) {} + private: + // reference counter for intrusive_ptr + mutable boost::detail::atomic_count m_refs; + }; + +} + +#endif + diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 64460661f..54b912ebe 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -51,7 +51,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #ifdef _MSC_VER #pragma warning(pop) @@ -73,6 +72,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/bandwidth_manager.hpp" #include "libtorrent/policy.hpp" #include "libtorrent/socket_type.hpp" +#include "libtorrent/intrusive_ptr_base.hpp" // TODO: each time a block is 'taken over' // from another peer. That peer must be given @@ -88,20 +88,16 @@ namespace libtorrent struct session_impl; } - TORRENT_EXPORT void intrusive_ptr_add_ref(peer_connection const*); - TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*); - struct TORRENT_EXPORT protocol_error: std::runtime_error { protocol_error(const std::string& msg): std::runtime_error(msg) {}; }; class TORRENT_EXPORT peer_connection - : public boost::noncopyable + : public intrusive_ptr_base + , public boost::noncopyable { friend class invariant_access; - friend TORRENT_EXPORT void intrusive_ptr_add_ref(peer_connection const*); - friend TORRENT_EXPORT void intrusive_ptr_release(peer_connection const*); public: enum channels @@ -378,7 +374,7 @@ namespace libtorrent virtual void write_cancel(peer_request const& r) = 0; virtual void write_have(int index) = 0; virtual void write_keepalive() = 0; - virtual void write_piece(peer_request const& r) = 0; + virtual void write_piece(peer_request const& r, char const* buffer) = 0; virtual void on_connected() = 0; virtual void on_tick() {} @@ -474,6 +470,9 @@ namespace libtorrent private: void fill_send_buffer(); + void on_disk_read_complete(int ret, disk_io_job const& j, peer_request r); + void on_disk_write_complete(int ret, disk_io_job const& j + , peer_request r, boost::shared_ptr t); // the timeout in seconds int m_timeout; @@ -503,6 +502,11 @@ namespace libtorrent // (m_current_send_buffer + 1) % 2 is the // buffer we're currently waiting for. int m_current_send_buffer; + + // the number of bytes we are currently reading + // from disk, that will be added to the send + // buffer as soon as they complete + int m_reading_bytes; // if the sending buffer doesn't finish in one send // operation, this is the position within that buffer @@ -660,9 +664,6 @@ namespace libtorrent // the left-over bandwidth (suitable for web seeds). bool m_non_prioritized; - // reference counter for intrusive_ptr - mutable boost::detail::atomic_count m_refs; - int m_upload_limit; int m_download_limit; diff --git a/include/libtorrent/peer_id.hpp b/include/libtorrent/peer_id.hpp index 23a5eb463..b66c1d4bc 100755 --- a/include/libtorrent/peer_id.hpp +++ b/include/libtorrent/peer_id.hpp @@ -58,6 +58,7 @@ namespace libtorrent big_number(std::string const& s) { + assert(s.size() >= 20); int sl = int(s.size()) < size ? int(s.size()) : size; std::memcpy(m_number, &s[0], sl); } diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp index ecbe2df4b..c52521d0a 100755 --- a/include/libtorrent/piece_picker.hpp +++ b/include/libtorrent/piece_picker.hpp @@ -90,14 +90,14 @@ namespace libtorrent struct block_info { - block_info(): num_downloads(0), requested(0), finished(0) {} + block_info(): num_downloads(0), state(state_none) {} // the peer this block was requested or // downloaded from tcp::endpoint peer; // the number of times this block has been downloaded unsigned num_downloads:14; - unsigned requested:1; - unsigned finished:1; + enum { state_none, state_requested, state_writing, state_finished }; + unsigned state:2; }; // the peers that are downloading this piece @@ -109,7 +109,7 @@ namespace libtorrent struct downloading_piece { - downloading_piece(): finished(0), requested(0) {} + downloading_piece(): finished(0), writing(0), requested(0) {} piece_state_t state; // the index of the piece @@ -118,8 +118,12 @@ namespace libtorrent // this is a pointer into the m_block_info // vector owned by the piece_picker block_info* info; - boost::uint16_t finished; - boost::uint16_t requested; + // the number of blocks in the finished state + boost::int16_t finished; + // the number of blocks in the writing state + boost::int16_t writing; + // the number of blocks in the requested state + boost::int16_t requested; }; piece_picker(int blocks_per_piece @@ -132,8 +136,9 @@ namespace libtorrent // the vector tells which pieces we already have // and which we don't have. void files_checked( - const std::vector& pieces - , const std::vector& unfinished); + std::vector const& pieces + , std::vector const& unfinished + , std::vector& verify_pieces); // increases the peer count for the given piece // (is used when a HAVE or BITFIELD message is received) @@ -190,12 +195,16 @@ namespace libtorrent // returns true if any client is currently downloading this // piece-block, or if it's queued for downloading by some client // or if it already has been successfully downloaded - bool is_downloading(piece_block block) const; + bool is_requested(piece_block block) const; + // returns true if the block has been downloaded + bool is_downloaded(piece_block block) const; + // returns true if the block has been downloaded and written to disk bool is_finished(piece_block block) const; // marks this piece-block as queued for downloading void mark_as_downloading(piece_block block, tcp::endpoint const& peer , piece_state_t s); + void mark_as_writing(piece_block block, tcp::endpoint const& peer); void mark_as_finished(piece_block block, tcp::endpoint const& peer); // if a piece had a hash-failure, it must be restored and diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index 935c193f2..fffc3bfa2 100755 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -99,8 +99,6 @@ namespace libtorrent void piece_finished(int index, bool successfully_verified); - void block_finished(peer_connection& c, piece_block b); - // the peer choked us void choked(peer_connection& c); diff --git a/include/libtorrent/session.hpp b/include/libtorrent/session.hpp index f70dea73c..52ec62cdb 100755 --- a/include/libtorrent/session.hpp +++ b/include/libtorrent/session.hpp @@ -75,6 +75,8 @@ namespace libtorrent class port_filter; class connection_queue; + namespace fs = boost::filesystem; + namespace aux { // workaround for microsofts @@ -136,7 +138,7 @@ namespace libtorrent // all torrent_handles must be destructed before the session is destructed! torrent_handle add_torrent( torrent_info const& ti - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data = entry() , bool compact_mode = true , int block_size = 16 * 1024 @@ -145,7 +147,7 @@ namespace libtorrent // TODO: deprecated, this is for backwards compatibility only torrent_handle add_torrent( entry const& e - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data = entry() , bool compact_mode = true , int block_size = 16 * 1024 @@ -159,7 +161,7 @@ namespace libtorrent char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data = entry() , bool compact_mode = true , int block_size = 16 * 1024 diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index ccb17cb71..70b64d8f8 100755 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -41,9 +41,9 @@ POSSIBILITY OF SUCH DAMAGE. #endif #include -#include #include #include +#include #ifdef _MSC_VER #pragma warning(pop) @@ -52,6 +52,9 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/torrent_info.hpp" #include "libtorrent/piece_picker.hpp" +#include "libtorrent/intrusive_ptr_base.hpp" +#include "libtorrent/peer_request.hpp" +#include "libtorrent/hasher.hpp" #include "libtorrent/config.hpp" namespace libtorrent @@ -61,8 +64,11 @@ namespace libtorrent struct piece_checker_data; } + namespace fs = boost::filesystem; + class session; struct file_pool; + struct disk_io_job; #if defined(_WIN32) && defined(UNICODE) @@ -72,11 +78,11 @@ namespace libtorrent TORRENT_EXPORT std::vector > get_filesizes( torrent_info const& t - , boost::filesystem::path p); + , fs::path p); TORRENT_EXPORT bool match_filesizes( torrent_info const& t - , boost::filesystem::path p + , fs::path p , std::vector > const& sizes , bool compact_mode , std::string* error = 0); @@ -89,6 +95,15 @@ namespace libtorrent std::string m_msg; }; + struct TORRENT_EXPORT partial_hash + { + partial_hash(): offset(0) {} + // the number of bytes in the piece that has been hashed + int offset; + // the sha-1 context + hasher h; + }; + struct TORRENT_EXPORT storage_interface { // create directories and set file sizes @@ -103,7 +118,7 @@ namespace libtorrent // may throw file_error if storage for slot hasn't been allocated virtual void write(const char* buf, int slot, int offset, int size) = 0; - virtual bool move_storage(boost::filesystem::path save_path) = 0; + virtual bool move_storage(fs::path save_path) = 0; // verify storage dependent fast resume entries virtual bool verify_resume_data(entry& rd, std::string& error) = 0; @@ -121,6 +136,9 @@ namespace libtorrent // in slot3 and the data in slot3 in slot1 virtual void swap_slots3(int slot1, int slot2, int slot3) = 0; + // returns the sha1-hash for the data at the given slot + virtual sha1_hash hash_for_slot(int slot, partial_hash& h, int piece_size) = 0; + // this will close all open files that are opened for // writing. This is called when a torrent has finished // downloading. @@ -129,24 +147,32 @@ namespace libtorrent }; typedef storage_interface* (&storage_constructor_type)( - torrent_info const&, boost::filesystem::path const& + torrent_info const&, fs::path const& , file_pool&); TORRENT_EXPORT storage_interface* default_storage_constructor(torrent_info const& ti - , boost::filesystem::path const& path, file_pool& fp); + , fs::path const& path, file_pool& fp); // returns true if the filesystem the path relies on supports // sparse files or automatic zero filling of files. - TORRENT_EXPORT bool supports_sparse_files(boost::filesystem::path const& p); + TORRENT_EXPORT bool supports_sparse_files(fs::path const& p); - class TORRENT_EXPORT piece_manager : boost::noncopyable + struct disk_io_thread; + + class TORRENT_EXPORT piece_manager + : public intrusive_ptr_base + , boost::noncopyable { + friend class invariant_access; + friend struct disk_io_thread; public: piece_manager( - const torrent_info& info - , const boost::filesystem::path& path + boost::shared_ptr const& torrent + , torrent_info const& ti + , fs::path const& path , file_pool& fp + , disk_io_thread& io , storage_constructor_type sc); ~piece_manager(); @@ -156,35 +182,36 @@ namespace libtorrent std::pair check_files(std::vector& pieces , int& num_pieces, boost::recursive_mutex& mutex); - void release_files(); - void write_resume_data(entry& rd) const; bool verify_resume_data(entry& rd, std::string& error); - bool is_allocating() const; - bool allocate_slots(int num_slots, bool abort_on_disk = false); + bool is_allocating() const + { return m_state == state_allocating; } + void mark_failed(int index); unsigned long piece_crc( int slot_index , int block_size , piece_picker::block_info const* bi); + int slot_for_piece(int piece_index) const; - size_type read( - char* buf - , int piece_index - , int offset - , int size); + void async_read( + peer_request const& r + , boost::function const& handler); - void write( - const char* buf - , int piece_index - , int offset - , int size); + void async_write( + peer_request const& r + , char const* buffer + , boost::function const& f); - boost::filesystem::path const& save_path() const; - bool move_storage(boost::filesystem::path const&); + void async_hash(int piece, boost::function const& f); + + fs::path save_path() const; + + void async_release_files(); + void async_move_storage(fs::path const& p); // fills the vector that maps all allocated // slots to the piece that is stored (or @@ -192,11 +219,134 @@ namespace libtorrent // of unassigned pieces and -1 is unallocated void export_piece_map(std::vector& pieces) const; - bool compact_allocation() const; + bool compact_allocation() const + { return m_compact_mode; } +#ifndef NDEBUG + std::string name() const { return m_info.name(); } +#endif + private: - class impl; - std::auto_ptr m_pimpl; + + bool allocate_slots(int num_slots, bool abort_on_disk = false); + + int identify_data( + const std::vector& piece_data + , int current_slot + , std::vector& have_pieces + , int& num_pieces + , const std::multimap& hash_to_piece + , boost::recursive_mutex& mutex); + + size_type read_impl( + char* buf + , int piece_index + , int offset + , int size); + + void write_impl( + const char* buf + , int piece_index + , int offset + , int size); + + sha1_hash hash_for_piece_impl(int piece); + + void release_files_impl(); + + bool move_storage_impl(fs::path const& save_path); + + int allocate_slot_for_piece(int piece_index); +#ifndef NDEBUG + void check_invariant() const; +#ifdef TORRENT_STORAGE_DEBUG + void debug_log() const; +#endif +#endif + boost::scoped_ptr m_storage; + + // if this is true, pieces are always allocated at the + // lowest possible slot index. If it is false, pieces + // are always written to their final place immediately + bool m_compact_mode; + + // if this is true, pieces that haven't been downloaded + // will be filled with zeroes. Not filling with zeroes + // will not work in some cases (where a seek cannot pass + // the end of the file). + bool m_fill_mode; + + // a bitmask representing the pieces we have + std::vector m_have_piece; + + torrent_info const& m_info; + + // slots that haven't had any file storage allocated + std::vector m_unallocated_slots; + // slots that have file storage, but isn't assigned to a piece + std::vector m_free_slots; + + enum + { + has_no_slot = -3 // the piece has no storage + }; + + // maps piece indices to slots. If a piece doesn't + // have any storage, it is set to 'has_no_slot' + std::vector m_piece_to_slot; + + enum + { + unallocated = -1, // the slot is unallocated + unassigned = -2 // the slot is allocated but not assigned to a piece + }; + + // maps slots to piece indices, if a slot doesn't have a piece + // it can either be 'unassigned' or 'unallocated' + std::vector m_slot_to_piece; + + fs::path m_save_path; + + mutable boost::recursive_mutex m_mutex; + + bool m_allocating; + boost::mutex m_allocating_monitor; + boost::condition m_allocating_condition; + + // these states are used while checking/allocating the torrent + + enum { + // the default initial state + state_none, + // the file checking is complete + state_finished, + // creating the directories + state_create_files, + // checking the files + state_full_check, + // allocating files (in non-compact mode) + state_allocating + } m_state; + int m_current_slot; + + std::vector m_piece_data; + + // this maps a piece hash to piece index. It will be + // build the first time it is used (to save time if it + // isn't needed) + std::multimap m_hash_to_piece; + + std::map m_piece_hasher; + + disk_io_thread& m_io_thread; + + // the reason for this to be a void pointer + // is to avoid creating a dependency on the + // torrent. This shared_ptr is here only + // to keep the torrent object alive until + // the piece_manager destructs. This is because + // the torrent_info object is owned by the torrent. + boost::shared_ptr m_torrent; }; } diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index e643cf31a..0ff2f5221 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -68,6 +68,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/escape_string.hpp" #include "libtorrent/bandwidth_manager.hpp" #include "libtorrent/storage.hpp" +#include "libtorrent/hasher.hpp" namespace libtorrent { @@ -84,6 +85,8 @@ namespace libtorrent struct piece_checker_data; } + namespace fs = boost::filesystem; + // a torrent is a class that holds information // for a specific download. It updates itself against // the tracker @@ -96,7 +99,7 @@ namespace libtorrent aux::session_impl& ses , aux::checker_impl& checker , torrent_info const& tf - , boost::filesystem::path const& save_path + , fs::path const& save_path , tcp::endpoint const& net_interface , bool compact_mode , int block_size @@ -111,7 +114,7 @@ namespace libtorrent , char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , tcp::endpoint const& net_interface , bool compact_mode , int block_size @@ -424,11 +427,12 @@ namespace libtorrent // completed() is called immediately after it. void finished(); - bool verify_piece(int piece_index); + void async_verify_piece(int piece_index, boost::function const&); // this is called from the peer_connection // each time a piece has failed the hash // test + void piece_finished(int index, bool passed_hash_check); void piece_failed(int index); void received_redundant_data(int num_bytes) { assert(num_bytes > 0); m_total_redundant_bytes += num_bytes; } @@ -448,7 +452,7 @@ namespace libtorrent - m_num_pieces - m_picker->num_filtered() == 0; } - boost::filesystem::path save_path() const; + fs::path save_path() const; alert_manager& alerts() const; piece_picker& picker() { @@ -458,7 +462,7 @@ namespace libtorrent } bool has_picker() const { - assert((valid_metadata() && !is_seed()) == bool(m_picker.get() != 0)); + assert((m_storage && !is_seed()) == bool(m_picker.get() != 0)); return m_picker.get() != 0; } policy& get_policy() @@ -505,14 +509,14 @@ namespace libtorrent void set_max_uploads(int limit); void set_max_connections(int limit); - bool move_storage(boost::filesystem::path const& save_path); + void move_storage(fs::path const& save_path); // unless this returns true, new connections must wait // with their initialization. bool ready_for_connections() const { return m_connections_initialized; } bool valid_metadata() const - { return m_storage.get() != 0; } + { return m_torrent_file.is_valid(); } // parses the info section from the given // bencoded tree and moves the torrent @@ -521,6 +525,9 @@ namespace libtorrent void set_metadata(entry const&); private: + + void on_piece_verified(int ret, disk_io_job const& j + , boost::function f); void try_next_tracker(); int prioritize_tracker(int tracker_index); @@ -554,7 +561,27 @@ namespace libtorrent // if this pointer is 0, the torrent is in // a state where the metadata hasn't been // received yet. - boost::scoped_ptr m_storage; + // the piece_manager keeps the torrent object + // alive by holding a shared_ptr to it and + // the torrent keeps the piece manager alive + // with this intrusive_ptr. This cycle is + // broken when torrent::abort() is called + // Then the torrent releases the piece_manager + // and when the piece_manager is complete with all + // outstanding disk io jobs (that keeps + // the piece_manager alive) it will destruct + // and release the torrent file. The reason for + // this is that the torrent_info is used by + // the piece_manager, and stored in the + // torrent, so the torrent cannot destruct + // before the piece_manager. + boost::intrusive_ptr m_owning_storage; + + // this is a weak (non owninig) pointer to + // the piece_manager. This is used after the torrent + // has been aborted, and it can no longer own + // the object. + piece_manager* m_storage; // the time of next tracker request ptime m_next_request; @@ -691,7 +718,7 @@ namespace libtorrent // are opened through tcp::endpoint m_net_interface; - boost::filesystem::path m_save_path; + fs::path m_save_path; // determines the storage state for this torrent. const bool m_compact_mode; diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index a4dbbb12b..b5e6bdc17 100755 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -54,6 +54,8 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { + namespace fs = boost::filesystem; + namespace aux { struct session_impl; @@ -202,15 +204,22 @@ namespace libtorrent int block_size; }; + struct TORRENT_EXPORT block_info + { + enum block_state_t + { none, requested, writing, finished }; + + tcp::endpoint peer; + unsigned state:2; + unsigned num_downloads:14; + }; + struct TORRENT_EXPORT partial_piece_info { enum { max_blocks_per_piece = 256 }; int piece_index; int blocks_in_piece; - std::bitset requested_blocks; - std::bitset finished_blocks; - tcp::endpoint peer[max_blocks_per_piece]; - int num_downloads[max_blocks_per_piece]; + block_info blocks[max_blocks_per_piece]; enum state_t { none, slow, medium, fast }; state_t piece_state; }; @@ -323,7 +332,7 @@ namespace libtorrent // the ratio is uploaded / downloaded. less than 1 is not allowed void set_ratio(float up_down_ratio) const; - boost::filesystem::path save_path() const; + fs::path save_path() const; // -1 means unlimited unchokes void set_max_uploads(int max_uploads) const; @@ -335,7 +344,7 @@ namespace libtorrent , std::string const& password) const; // post condition: save_path() == save_path if true is returned - bool move_storage(boost::filesystem::path const& save_path) const; + void move_storage(fs::path const& save_path) const; const sha1_hash& info_hash() const { return m_info_hash; } diff --git a/include/libtorrent/torrent_info.hpp b/include/libtorrent/torrent_info.hpp index d26f9f1f2..e52024255 100755 --- a/include/libtorrent/torrent_info.hpp +++ b/include/libtorrent/torrent_info.hpp @@ -62,16 +62,18 @@ namespace libtorrent namespace pt = boost::posix_time; namespace gr = boost::gregorian; + namespace fs = boost::filesystem; + struct TORRENT_EXPORT file_entry { - boost::filesystem::path path; + fs::path path; size_type offset; // the offset of this file inside the torrent size_type size; // the size of this file // if the path was incorrectly encoded, this is // the origianal corrupt encoded string. It is // preserved in order to be able to reproduce // the correct info-hash - boost::shared_ptr orig_path; + boost::shared_ptr orig_path; }; struct TORRENT_EXPORT file_slice @@ -109,7 +111,7 @@ namespace libtorrent void set_piece_size(int size); void set_hash(int index, sha1_hash const& h); void add_tracker(std::string const& url, int tier = 0); - void add_file(boost::filesystem::path file, size_type size); + void add_file(fs::path file, size_type size); void add_url_seed(std::string const& url); std::vector map_block(int piece, size_type offset, int size) const; @@ -192,6 +194,8 @@ namespace libtorrent // used by seeds void seed_free(); + void swap(torrent_info& ti); + private: void read_torrent_info(const entry& libtorrent); diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index a4d24f751..1435ceda6 100755 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -62,6 +62,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/config.hpp" #include "libtorrent/time.hpp" #include "libtorrent/connection_queue.hpp" +#include "libtorrent/intrusive_ptr_base.hpp" namespace libtorrent { @@ -148,15 +149,10 @@ namespace libtorrent , request_callback* requester , int maximum_tracker_response_length); - TORRENT_EXPORT void intrusive_ptr_add_ref(timeout_handler const*); - TORRENT_EXPORT void intrusive_ptr_release(timeout_handler const*); - struct TORRENT_EXPORT timeout_handler - : boost::noncopyable + : intrusive_ptr_base + , boost::noncopyable { - friend TORRENT_EXPORT void intrusive_ptr_add_ref(timeout_handler const*); - friend TORRENT_EXPORT void intrusive_ptr_release(timeout_handler const*); - timeout_handler(asio::strand& str); void set_timeout(int completion_timeout, int read_timeout); @@ -187,7 +183,6 @@ namespace libtorrent typedef boost::mutex mutex_t; mutable mutex_t m_mutex; - mutable int m_refs; }; struct TORRENT_EXPORT tracker_connection diff --git a/include/libtorrent/web_peer_connection.hpp b/include/libtorrent/web_peer_connection.hpp index a84f2e6ff..ba7450c0a 100755 --- a/include/libtorrent/web_peer_connection.hpp +++ b/include/libtorrent/web_peer_connection.hpp @@ -123,7 +123,7 @@ namespace libtorrent void write_request(peer_request const& r); void write_cancel(peer_request const& r) {} void write_have(int index) {} - void write_piece(peer_request const& r) {} + void write_piece(peer_request const& r, char const* buffer) { assert(false); } void write_keepalive() {} void on_connected(); diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 904d0dc26..55eec74c2 100755 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -1335,7 +1335,7 @@ namespace libtorrent send_buffer(msg, msg + packet_size); } - void bt_peer_connection::write_piece(peer_request const& r) + void bt_peer_connection::write_piece(peer_request const& r, char const* buffer) { INVARIANT_CHECK; @@ -1350,9 +1350,7 @@ namespace libtorrent detail::write_uint8(msg_piece, i.begin); detail::write_int32(r.piece, i.begin); detail::write_int32(r.start, i.begin); - - t->filesystem().read( - i.begin, r.piece, r.start, r.length); + std::memcpy(i.begin, buffer, r.length); assert(i.begin + r.length == i.end); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp new file mode 100644 index 000000000..6f97ce7df --- /dev/null +++ b/src/disk_io_thread.cpp @@ -0,0 +1,241 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/storage.hpp" +#include +#include "libtorrent/disk_io_thread.hpp" + +namespace libtorrent +{ + + disk_io_thread::disk_io_thread(int block_size) + : m_abort(false) + , m_queue_buffer_size(0) + , m_pool(block_size) + , m_disk_io_thread(boost::ref(*this)) + {} + + disk_io_thread::~disk_io_thread() + { + boost::mutex::scoped_lock l(m_mutex); + m_abort = true; + m_signal.notify_all(); + l.unlock(); + + m_disk_io_thread.join(); + } + + // aborts read operations + void disk_io_thread::stop(boost::intrusive_ptr s) + { + boost::mutex::scoped_lock l(m_mutex); + // read jobs are aborted, write and move jobs are syncronized + for (std::deque::iterator i = m_jobs.begin(); + i != m_jobs.end();) + { + if (i->storage != s) + { + ++i; + continue; + } + if (i->action == disk_io_job::read) + { + i->callback(-1, *i); + m_jobs.erase(i++); + continue; + } + ++i; + } + m_signal.notify_all(); + } + + bool range_overlap(int start1, int length1, int start2, int length2) + { + return (start1 <= start2 && start1 + length1 > start2) + || (start2 <= start1 && start2 + length2 > start1); + } + + namespace + { + bool operator<(disk_io_job const& lhs, disk_io_job const& rhs) + { + if (lhs.storage.get() < rhs.storage.get()) return true; + if (lhs.storage.get() > rhs.storage.get()) return false; + if (lhs.piece < rhs.piece) return true; + if (lhs.piece > rhs.piece) return false; + if (lhs.offset < rhs.offset) return true; +// if (lhs.offset > rhs.offset) return false; + return false; + } + } + + void disk_io_thread::add_job(disk_io_job const& j + , boost::function const& f) + { + assert(!j.callback); + boost::mutex::scoped_lock l(m_mutex); + + std::deque::reverse_iterator i = m_jobs.rbegin(); + if (j.action == disk_io_job::read) + { + // when we're reading, we may not skip + // ahead of any write operation that overlaps + // the region we're reading + for (; i != m_jobs.rend(); ++i) + { + if (i->action == disk_io_job::read && *i < j) + break; + if (i->action == disk_io_job::write + && i->storage == j.storage + && i->piece == j.piece + && range_overlap(i->offset, i->buffer_size + , j.offset, j.buffer_size)) + { + // we have to stop, and we haven't + // found a suitable place for this job + // so just queue it up at the end + i = m_jobs.rbegin(); + break; + } + } + } + else if (j.action == disk_io_job::write) + { + for (; i != m_jobs.rend(); ++i) + { + if (i->action == disk_io_job::write && *i < j) + { + if (i != m_jobs.rbegin() + && i.base()->storage.get() != j.storage.get()) + i = m_jobs.rbegin(); + break; + } + } + } + + if (i == m_jobs.rend()) i = m_jobs.rbegin(); + + std::deque::iterator k = m_jobs.insert(i.base(), j); + k->callback.swap(const_cast&>(f)); + if (j.action == disk_io_job::write) + m_queue_buffer_size += j.buffer_size; + assert(j.storage.get()); + m_signal.notify_all(); + } + + char* disk_io_thread::allocate_buffer() + { + boost::mutex::scoped_lock l(m_mutex); + return (char*)m_pool.ordered_malloc(); + } + + void disk_io_thread::operator()() + { + for (;;) + { + boost::mutex::scoped_lock l(m_mutex); + while (m_jobs.empty() && !m_abort) + m_signal.wait(l); + if (m_abort && m_jobs.empty()) return; + + boost::function handler; + handler.swap(m_jobs.front().callback); + disk_io_job j = m_jobs.front(); + m_jobs.pop_front(); + m_queue_buffer_size -= j.buffer_size; + l.unlock(); + + int ret = 0; + + try + { +// std::cerr << "DISK THREAD: executing job: " << j.action << std::endl; + switch (j.action) + { + case disk_io_job::read: + l.lock(); + j.buffer = (char*)m_pool.ordered_malloc(); + l.unlock(); + if (j.buffer == 0) + { + ret = -1; + j.str = "out of memory"; + } + else + { + ret = j.storage->read_impl(j.buffer, j.piece, j.offset + , j.buffer_size); + + // simulates slow drives + // usleep(300); + } + break; + case disk_io_job::write: + assert(j.buffer); + j.storage->write_impl(j.buffer, j.piece, j.offset + , j.buffer_size); + + // simulates a slow drive + // usleep(300); + break; + case disk_io_job::hash: + { + sha1_hash h = j.storage->hash_for_piece_impl(j.piece); + j.str.resize(20); + std::memcpy(&j.str[0], &h[0], 20); + } + break; + case disk_io_job::move_storage: + ret = j.storage->move_storage_impl(j.str) ? 1 : 0; + break; + case disk_io_job::release_files: + j.storage->release_files_impl(); + break; + } + } + catch (std::exception& e) + { +// std::cerr << "DISK THREAD: exception: " << e.what() << std::endl; + j.str = e.what(); + ret = -1; + } + +// if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl; +// else std::cerr << "DISK THREAD: invoking callback" << std::endl; + try { if (handler) handler(ret, j); } + catch (std::exception&) {} + + if (j.buffer) m_pool.ordered_free(j.buffer); + } + } +} + diff --git a/src/entry.cpp b/src/entry.cpp index 6506ed4c2..16dffc275 100755 --- a/src/entry.cpp +++ b/src/entry.cpp @@ -278,6 +278,12 @@ namespace libtorrent } } + void entry::swap(entry& e) + { + // not implemented + assert(false); + } + void entry::print(std::ostream& os, int indent) const { assert(indent >= 0); diff --git a/src/file.cpp b/src/file.cpp index a1352ae03..515406a46 100755 --- a/src/file.cpp +++ b/src/file.cpp @@ -82,8 +82,6 @@ BOOST_STATIC_ASSERT(sizeof(lseek(0, 0, 0)) >= 8); #endif -namespace fs = boost::filesystem; - namespace { enum { mode_in = 1, mode_out = 2 }; @@ -130,6 +128,8 @@ namespace namespace libtorrent { + namespace fs = boost::filesystem; + const file::open_mode file::in(mode_in); const file::open_mode file::out(mode_out); @@ -303,13 +303,13 @@ namespace libtorrent file::file() : m_impl(new impl()) {} - file::file(boost::filesystem::path const& p, file::open_mode m) + file::file(fs::path const& p, file::open_mode m) : m_impl(new impl(p, m.m_mask)) {} file::~file() {} - void file::open(boost::filesystem::path const& p, file::open_mode m) + void file::open(fs::path const& p, file::open_mode m) { m_impl->open(p, m.m_mask); } diff --git a/src/logger.cpp b/src/logger.cpp index 6881c5e7b..b33816a59 100644 --- a/src/logger.cpp +++ b/src/logger.cpp @@ -53,17 +53,20 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_request.hpp" #include "libtorrent/peer_connection.hpp" -namespace libtorrent { namespace +namespace libtorrent { + +namespace fs = boost::filesystem; + +namespace { struct logger_peer_plugin : peer_plugin { logger_peer_plugin(std::string const& filename) { - using namespace boost::filesystem; - path dir(complete("libtorrent_ext_logs")); - if (!exists(dir)) create_directories(dir); - m_file.open(dir / filename, std::ios_base::out | std::ios_base::out); + fs::path dir(fs::complete("libtorrent_ext_logs")); + if (!fs::exists(dir)) fs::create_directories(dir); + m_file.open((dir / filename).string().c_str(), std::ios_base::out | std::ios_base::out); m_file << "\n\n\n"; log_timestamp(); m_file << "*** starting log ***\n"; @@ -201,7 +204,7 @@ namespace libtorrent { namespace } private: - boost::filesystem::ofstream m_file; + std::ofstream m_file; }; struct logger_plugin : torrent_plugin diff --git a/src/metadata_transfer.cpp b/src/metadata_transfer.cpp index 624ec1c20..97635cdb9 100644 --- a/src/metadata_transfer.cpp +++ b/src/metadata_transfer.cpp @@ -38,8 +38,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#include -#include #ifdef _MSC_VER #pragma warning(pop) diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 21bbdac25..c2fc912a2 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -59,21 +59,6 @@ using libtorrent::aux::session_impl; namespace libtorrent { - void intrusive_ptr_add_ref(peer_connection const* c) - { - assert(c->m_refs >= 0); - assert(c != 0); - ++c->m_refs; - } - - void intrusive_ptr_release(peer_connection const* c) - { - assert(c->m_refs > 0); - assert(c != 0); - if (--c->m_refs == 0) - delete c; - } - // outbound connection peer_connection::peer_connection( session_impl& ses @@ -94,6 +79,7 @@ namespace libtorrent , m_packet_size(0) , m_recv_pos(0) , m_current_send_buffer(0) + , m_reading_bytes(0) , m_write_pos(0) , m_last_receive(time_now()) , m_last_sent(time_now()) @@ -122,7 +108,6 @@ namespace libtorrent , m_prefer_whole_pieces(false) , m_request_large_blocks(false) , m_non_prioritized(false) - , m_refs(0) , m_upload_limit(resource_request::inf) , m_download_limit(resource_request::inf) , m_peer_info(peerinfo) @@ -169,6 +154,7 @@ namespace libtorrent , m_packet_size(0) , m_recv_pos(0) , m_current_send_buffer(0) + , m_reading_bytes(0) , m_write_pos(0) , m_last_receive(time_now()) , m_last_sent(time_now()) @@ -195,7 +181,6 @@ namespace libtorrent , m_prefer_whole_pieces(false) , m_request_large_blocks(false) , m_non_prioritized(false) - , m_refs(0) , m_upload_limit(resource_request::inf) , m_download_limit(resource_request::inf) , m_peer_info(peerinfo) @@ -1019,7 +1004,7 @@ namespace libtorrent for (std::vector::const_iterator i = dl_queue.begin(); i != dl_queue.end(); ++i) { - assert(i->finished < blocks_per_piece); + assert(i->finished <= blocks_per_piece); } } } @@ -1083,7 +1068,6 @@ namespace libtorrent piece_picker& picker = t->picker(); piece_manager& fs = t->filesystem(); - policy& pol = t->get_policy(); std::vector finished_blocks; piece_block block_finished(p.piece, p.start / t->block_size()); @@ -1132,13 +1116,12 @@ namespace libtorrent } else { - // cancel the block from the +/* // cancel the block from the // peer that has taken over it. boost::optional peer = t->picker().get_downloader(block_finished); - if (peer) + if (peer && t->picker().is_requested(block_finished)) { - assert(!t->picker().is_finished(block_finished)); peer_connection* pc = t->connection_for(*peer); if (pc && pc != this) { @@ -1147,7 +1130,7 @@ namespace libtorrent } } else - { +*/ { if (t->alerts().should_post(alert::debug)) { t->alerts().post_alert( @@ -1160,15 +1143,28 @@ namespace libtorrent (*m_logger) << " *** The block we just got was not in the " "request queue ***\n"; #endif + t->received_redundant_data(p.length); + if (!has_peer_choked()) + { + request_a_block(*t, *this); + send_block_requests(); + } + return; } } + assert(picker.is_requested(block_finished)); + // if the block we got is already finished, then ignore it - if (picker.is_finished(block_finished)) + if (picker.is_downloaded(block_finished)) { - t->received_redundant_data(t->block_size()); - pol.block_finished(*this, block_finished); - send_block_requests(); + t->received_redundant_data(p.length); + + if (!has_peer_choked()) + { + request_a_block(*t, *this); + send_block_requests(); + } if (request_peer && !request_peer->has_peer_choked() && !t->is_seed()) { @@ -1178,107 +1174,73 @@ namespace libtorrent return; } - fs.write(data, p.piece, p.start, p.length); - - picker.mark_as_finished(block_finished, m_remote); - - try - { - pol.block_finished(*this, block_finished); - send_block_requests(); - } - catch (std::exception const&) {} - + fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete + , self(), _1, _2, p, t)); + picker.mark_as_writing(block_finished, m_remote); + if (request_peer && !request_peer->has_peer_choked() && !t->is_seed()) { request_a_block(*t, *request_peer); request_peer->send_block_requests(); } + } + + void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j + , peer_request p, boost::shared_ptr t) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + if (ret == -1 || !t) + { + if (!t) + { + m_ses.connection_failed(m_socket, remote(), j.str.c_str()); + return; + } + + if (t->alerts().should_post(alert::fatal)) + { + std::string err = "torrent paused: disk write error, " + j.str; + t->alerts().post_alert(file_error_alert(t->get_handle(), err)); + } + t->pause(); + return; + } + + if (t->is_seed()) return; + + piece_picker& picker = t->picker(); + + assert(p.piece == j.piece); + assert(p.start == j.offset); + piece_block block_finished(p.piece, p.start / t->block_size()); + picker.mark_as_finished(block_finished, m_remote); + + if (!has_peer_choked() && !t->is_seed() && !m_torrent.expired()) + { + // this is a free function defined in policy.cpp + request_a_block(*t, *this); + try + { + send_block_requests(); + } + catch (std::exception const&) {} + } + #ifndef NDEBUG try { #endif - bool was_seed = t->is_seed(); - bool was_finished = picker.num_filtered() + t->num_pieces() - == t->torrent_file().num_pieces(); - // did we just finish the piece? if (picker.is_piece_finished(p.piece)) { #ifndef NDEBUG check_postcondition post_checker2_(t, false); #endif - bool verified = t->verify_piece(p.piece); - if (verified) - { - // the following call may cause picker to become invalid - // in case we just became a seed - t->announce_piece(p.piece); - assert(t->valid_metadata()); - // if we just became a seed, picker is now invalid, since it - // is deallocated by the torrent once it starts seeding - if (!was_finished - && (t->is_seed() - || picker.num_filtered() + t->num_pieces() - == t->torrent_file().num_pieces())) - { - // torrent finished - // i.e. all the pieces we're interested in have - // been downloaded. Release the files (they will open - // in read only mode if needed) - try { t->finished(); } - catch (std::exception& e) - { -#ifndef NDEBUG - std::cerr << e.what() << std::endl; - assert(false); -#endif - } - } - } - else - { - t->piece_failed(p.piece); - } - -#ifndef NDEBUG - try - { -#endif - - pol.piece_finished(p.piece, verified); - -#ifndef NDEBUG - } - catch (std::exception const& e) - { - std::cerr << e.what() << std::endl; - assert(false); - } -#endif - -#ifndef NDEBUG - try - { -#endif - - if (!was_seed && t->is_seed()) - { - assert(verified); - t->completed(); - } - -#ifndef NDEBUG - } - catch (std::exception const& e) - { - std::cerr << e.what() << std::endl; - assert(false); - } -#endif - + t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t + , p.piece, _1)); } #ifndef NDEBUG @@ -1357,7 +1319,7 @@ namespace libtorrent assert(block.piece_index < t->torrent_file().num_pieces()); assert(block.block_index >= 0); assert(block.block_index < t->torrent_file().piece_size(block.piece_index)); - assert(!t->picker().is_downloading(block)); + assert(!t->picker().is_requested(block)); piece_picker::piece_state_t state; peer_speed_t speed = peer_speed(); @@ -1383,7 +1345,7 @@ namespace libtorrent assert(block.piece_index < t->torrent_file().num_pieces()); assert(block.block_index >= 0); assert(block.block_index < t->torrent_file().piece_size(block.piece_index)); - assert(t->picker().is_downloading(block)); + assert(t->picker().is_requested(block)); t->picker().abort_download(block); @@ -1610,6 +1572,8 @@ namespace libtorrent void peer_connection::disconnect() { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + boost::intrusive_ptr me(this); INVARIANT_CHECK; @@ -1877,8 +1841,11 @@ namespace libtorrent m_assume_fifo = true; - request_a_block(*t, *this); - send_block_requests(); + if (!has_peer_choked()) + { + request_a_block(*t, *this); + send_block_requests(); + } } } @@ -1997,7 +1964,7 @@ namespace libtorrent else if (buffer_size_watermark > 80 * 1024) buffer_size_watermark = 80 * 1024; while (!m_requests.empty() - && (send_buffer_size() < buffer_size_watermark) + && (send_buffer_size() + m_reading_bytes < buffer_size_watermark) && !m_choked) { assert(t->valid_metadata()); @@ -2009,16 +1976,12 @@ namespace libtorrent assert(r.start + r.length <= t->torrent_file().piece_size(r.piece)); assert(r.length > 0 && r.start >= 0); - write_piece(r); - -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << time_now_string() - << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start - << " | l: " << r.length << " ]\n"; -#endif + t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete + , self(), _1, _2, r)); + m_reading_bytes += r.length; m_requests.erase(m_requests.begin()); - +/* if (m_requests.empty() && m_num_invalid_requests > 0 && is_peer_interested() @@ -2031,9 +1994,49 @@ namespace libtorrent send_choke(); send_unchoke(); } +*/ } } + void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + m_reading_bytes -= r.length; + + if (ret != r.length || m_torrent.expired()) + { + boost::shared_ptr t = m_torrent.lock(); + if (!t) + { + m_ses.connection_failed(m_socket, remote(), j.str.c_str()); + return; + } + + if (t->alerts().should_post(alert::fatal)) + { + std::string err = "torrent paused: disk read error"; + if (!j.str.empty()) + { + err += ", "; + err += j.str; + } + t->alerts().post_alert(file_error_alert(t->get_handle(), err)); + } + t->pause(); + return; + } + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start + << " | l: " << r.length << " ]\n"; +#endif + + write_piece(r, j.buffer); + setup_send(); + } + void peer_connection::assign_bandwidth(int channel, int amount) { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index 2bb1637ee..bd568210f 100755 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -88,8 +88,9 @@ namespace libtorrent // pieces is a bitmask with the pieces we have void piece_picker::files_checked( - const std::vector& pieces - , const std::vector& unfinished) + std::vector const& pieces + , std::vector const& unfinished + , std::vector& verify_pieces) { #ifndef NDEBUG m_files_checked_called = true; @@ -118,14 +119,12 @@ namespace libtorrent tcp::endpoint peer; for (int j = 0; j < m_blocks_per_piece; ++j) { - if (i->info[j].finished) + if (i->info[j].state == block_info::state_finished) mark_as_finished(piece_block(i->index, j), peer); } if (is_piece_finished(i->index)) { - // TODO: handle this case by verifying the - // piece and either accept it or discard it - assert(false); + verify_pieces.push_back(i->index); } } } @@ -212,8 +211,7 @@ namespace libtorrent for (int i = 0; i < m_blocks_per_piece; ++i) { ret.info[i].num_downloads = 0; - ret.info[i].requested = 0; - ret.info[i].finished = 0; + ret.info[i].state = block_info::state_none; ret.info[i].peer = tcp::endpoint(); } return ret; @@ -258,25 +256,28 @@ namespace libtorrent int num_blocks = blocks_in_piece(i->index); int num_requested = 0; int num_finished = 0; + int num_writing = 0; for (int k = 0; k < num_blocks; ++k) { - if (i->info[k].finished) + if (i->info[k].state == block_info::state_finished) { ++num_finished; - assert(i->info[k].requested); - ++num_requested; continue; } - if (i->info[k].requested) + if (i->info[k].state == block_info::state_requested) { ++num_requested; blocks_requested = true; } + if (i->info[k].state == block_info::state_writing) + { + ++num_writing; + } } assert(blocks_requested == (i->state != none)); assert(num_requested == i->requested); + assert(num_writing == i->writing); assert(num_finished == i->finished); - assert(num_finished <= num_requested); } @@ -1070,8 +1071,7 @@ namespace libtorrent for (int j = 0; j < num_blocks_in_piece; ++j) { piece_picker::block_info const& info = p.info[j]; - if ((info.finished == 1 - || info.requested == 1) + if (info.state != piece_picker::block_info::state_none && info.peer != peer && info.peer != tcp::endpoint()) { @@ -1089,6 +1089,9 @@ namespace libtorrent , int num_blocks, bool prefer_whole_pieces , tcp::endpoint peer, piece_state_t speed) const { + // if we have less than 1% of the pieces, ignore speed priorities and just try + // to finish any downloading piece + bool ignore_speed_categories = (m_num_have * 100 / m_piece_map.size()) < 1; for (std::vector::const_iterator i = piece_list.begin(); i != piece_list.end(); ++i) { @@ -1098,10 +1101,6 @@ namespace libtorrent // if the peer doesn't have the piece // skip it if (!pieces[*i]) continue; - - // if we have less than 1% of the pieces, ignore speed priorities and just try - // to finish any downloading piece - bool ignore_speed_categories = (m_num_have * 100 / m_piece_map.size()) < 1; int num_blocks_in_piece = blocks_in_piece(*i); @@ -1130,8 +1129,10 @@ namespace libtorrent for (int j = 0; j < num_blocks_in_piece; ++j) { block_info const& info = p->info[j]; - if (info.finished) continue; - if (info.requested + if (info.state == block_info::state_finished + || info.state == block_info::state_writing) + continue; + if (info.state == block_info::state_requested && info.peer == peer) continue; backup_blocks.push_back(piece_block(*i, j)); } @@ -1142,9 +1143,13 @@ namespace libtorrent { // ignore completed blocks block_info const& info = p->info[j]; - if (info.finished) continue; + if (info.state == block_info::state_finished + || info.state == block_info::state_writing) + continue; // ignore blocks requested from this peer already - if (info.requested && info.peer == peer) continue; + if (info.state == block_info::state_requested + && info.peer == peer) + continue; // if the piece is fast and the peer is slow, or vice versa, // add the block as a backup. // override this behavior if all the other blocks @@ -1169,7 +1174,7 @@ namespace libtorrent // blocks that have not been requested from any // other peer. interesting_blocks.push_back(piece_block(*i, j)); - if (p->info[j].requested == 0) + if (p->info[j].state == block_info::state_none) { // we have found a block that's free to download num_blocks--; @@ -1217,11 +1222,18 @@ namespace libtorrent int max_blocks = blocks_in_piece(index); if ((int)i->finished < max_blocks) return false; - assert((int)i->requested == max_blocks); +#ifndef NDEBUG + for (int k = 0; k < max_blocks; ++k) + { + assert(i->info[k].state == block_info::state_finished); + } +#endif + + assert((int)i->finished == max_blocks); return true; } - bool piece_picker::is_downloading(piece_block block) const + bool piece_picker::is_requested(piece_block block) const { assert(block.piece_index >= 0); assert(block.block_index >= 0); @@ -1235,7 +1247,22 @@ namespace libtorrent , has_index(block.piece_index)); assert(i != m_downloads.end()); - return i->info[block.block_index].requested; + return i->info[block.block_index].state == block_info::state_requested; + } + + bool piece_picker::is_downloaded(piece_block block) const + { + assert(block.piece_index >= 0); + assert(block.block_index >= 0); + assert(block.piece_index < (int)m_piece_map.size()); + + if (m_piece_map[block.piece_index].index == piece_pos::we_have_index) return true; + if (m_piece_map[block.piece_index].downloading == 0) return false; + std::vector::const_iterator i + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); + assert(i != m_downloads.end()); + return i->info[block.block_index].state == block_info::state_finished + || i->info[block.block_index].state == block_info::state_writing; } bool piece_picker::is_finished(piece_block block) const @@ -1249,7 +1276,7 @@ namespace libtorrent std::vector::const_iterator i = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); - return i->info[block.block_index].finished; + return i->info[block.block_index].state == block_info::state_finished; } @@ -1274,7 +1301,7 @@ namespace libtorrent dp.state = state; dp.index = block.piece_index; block_info& info = dp.info[block.block_index]; - info.requested = 1; + info.state = block_info::state_requested; info.peer = peer; ++dp.requested; } @@ -1284,9 +1311,9 @@ namespace libtorrent = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); block_info& info = i->info[block.block_index]; - assert(info.requested == 0); + assert(info.state == block_info::state_none); info.peer = peer; - info.requested = 1; + info.state = block_info::state_requested; ++i->requested; if (i->state == none) i->state = state; } @@ -1303,7 +1330,7 @@ namespace libtorrent *j = i->peer_count; } - void piece_picker::mark_as_finished(piece_block block, const tcp::endpoint& peer) + void piece_picker::mark_as_writing(piece_block block, tcp::endpoint const& peer) { TORRENT_PIECE_PICKER_INVARIANT_CHECK; @@ -1313,10 +1340,11 @@ namespace libtorrent assert(block.block_index < blocks_in_piece(block.piece_index)); piece_pos& p = m_piece_map[block.piece_index]; - int prio = p.priority(m_sequenced_download_threshold); + assert(p.downloading); - if (p.downloading == 0) +/* if (p.downloading == 0) { + int prio = p.priority(m_sequenced_download_threshold); p.downloading = 1; if (prio > 0) move(prio, p.index); else assert(p.priority(m_sequenced_download_threshold) == 0); @@ -1325,25 +1353,84 @@ namespace libtorrent dp.state = none; dp.index = block.piece_index; block_info& info = dp.info[block.block_index]; - info.requested = 1; - info.finished = 1; - ++dp.requested; - ++dp.finished; - dp.info[block.block_index].peer = peer; + info.peer = peer; + if (info.state == block_info::state_requested) --dp.requested; + assert(dp.requested >= 0); + assert (info.state != block_info::state_finished); + assert (info.state != block_info::state_writing); + if (info.state != block_info::state_requested) ++dp.writing; + info.state = block_info::state_writing; + } + else +*/ { + std::vector::iterator i + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); + assert(i != m_downloads.end()); + block_info& info = i->info[block.block_index]; + info.peer == peer; + assert(info.state == block_info::state_requested); + if (info.state == block_info::state_requested) --i->requested; + assert(i->requested >= 0); + assert (info.state != block_info::state_writing); + ++i->writing; + info.state = block_info::state_writing; + + if (i->requested == 0) + { + // there are no blocks requested in this piece. + // remove the fast/slow state from it + i->state = none; + } + } + } + + void piece_picker::mark_as_finished(piece_block block, tcp::endpoint const& peer) + { + assert(block.piece_index >= 0); + assert(block.block_index >= 0); + assert(block.piece_index < (int)m_piece_map.size()); + assert(block.block_index < blocks_in_piece(block.piece_index)); + + piece_pos& p = m_piece_map[block.piece_index]; + + if (p.downloading == 0) + { + TORRENT_PIECE_PICKER_INVARIANT_CHECK; + + assert(peer == tcp::endpoint()); + int prio = p.priority(m_sequenced_download_threshold); + p.downloading = 1; + if (prio > 0) move(prio, p.index); + else assert(p.priority(m_sequenced_download_threshold) == 0); + + downloading_piece& dp = add_download_piece(); + dp.state = none; + dp.index = block.piece_index; + block_info& info = dp.info[block.block_index]; + info.peer = peer; + assert(info.state == block_info::state_none); +// if (info.state == block_info::state_writing) --dp.writing; +// assert(dp.writing >= 0); + if (info.state != block_info::state_finished) ++dp.finished; + info.state = block_info::state_finished; } else { + TORRENT_PIECE_PICKER_INVARIANT_CHECK; + std::vector::iterator i = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); block_info& info = i->info[block.block_index]; info.peer = peer; - if (!info.requested) ++i->requested; - info.requested = 1; - if (!info.finished) ++i->finished; - info.finished = 1; + assert(info.state == block_info::state_writing + || peer == tcp::endpoint()); + if (info.state == block_info::state_writing) --i->writing; + assert(i->writing >= 0); + ++i->finished; + info.state = block_info::state_finished; - if (i->requested == i->finished) + if (i->requested == 0) { // there are no blocks requested in this piece. // remove the fast/slow state from it @@ -1378,8 +1465,7 @@ namespace libtorrent assert(block.block_index >= 0); - if (i->info[block.block_index].requested == false - || i->info[block.block_index].requested == true) + if (i->info[block.block_index].state == block_info::state_none) return boost::optional(); return boost::optional(i->info[block.block_index].peer); @@ -1405,17 +1491,17 @@ namespace libtorrent , m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); - if (i->info[block.block_index].finished) + if (i->info[block.block_index].state == block_info::state_finished + || i->info[block.block_index].state == block_info::state_writing) { - assert(i->info[block.block_index].requested); return; } assert(block.block_index < blocks_in_piece(block.piece_index)); - assert(i->info[block.block_index].requested); + assert(i->info[block.block_index].state == block_info::state_requested); // clear this block as being downloaded - i->info[block.block_index].requested = false; + i->info[block.block_index].state = block_info::state_none; --i->requested; // clear the downloader of this block @@ -1423,7 +1509,7 @@ namespace libtorrent // if there are no other blocks in this piece // that's being downloaded, remove it from the list - if (i->requested == 0) + if (i->requested + i->finished + i->writing == 0) { erase_download_piece(i); piece_pos& p = m_piece_map[block.piece_index]; @@ -1434,7 +1520,7 @@ namespace libtorrent assert(std::find_if(m_downloads.begin(), m_downloads.end() , has_index(block.piece_index)) == m_downloads.end()); } - else if (i->requested == i->finished) + else if (i->requested == 0) { // there are no blocks requested in this piece. // remove the fast/slow state from it diff --git a/src/policy.cpp b/src/policy.cpp index fe119abc9..eca5ba613 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -246,7 +246,7 @@ namespace libtorrent for (std::vector::iterator i = interesting_pieces.begin(); i != interesting_pieces.end(); ++i) { - if (p.is_downloading(*i)) + if (p.is_requested(*i)) { busy_pieces.push_back(*i); continue; @@ -1149,18 +1149,6 @@ namespace libtorrent } } - // TODO: we must be able to get interested - // in a peer again, if a piece fails that - // this peer has. - void policy::block_finished(peer_connection& c, piece_block) - { - INVARIANT_CHECK; - - // if the peer hasn't choked us, ask for another piece - if (!c.has_peer_choked() && !m_torrent->is_seed()) - request_a_block(*m_torrent, c); - } - // this is called when we are unchoked by a peer // i.e. a peer lets us know that we will receive // data from now on diff --git a/src/session.cpp b/src/session.cpp index 40959f3ad..485c90d62 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -177,7 +177,7 @@ namespace libtorrent // if the torrent already exists, this will throw duplicate_torrent torrent_handle session::add_torrent( torrent_info const& ti - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data , bool compact_mode , int block_size @@ -191,7 +191,7 @@ namespace libtorrent char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& e , bool compact_mode , int block_size diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 82b5e86e9..022e4f184 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -81,7 +81,11 @@ using boost::bind; using boost::mutex; using libtorrent::aux::session_impl; -namespace libtorrent { namespace detail +namespace libtorrent { + +namespace fs = boost::filesystem; + +namespace detail { std::string generate_auth_string(std::string const& user @@ -412,7 +416,6 @@ namespace libtorrent { namespace detail for (std::deque >::iterator i = m_processing.begin(); i != m_processing.end(); ++i) { - if ((*i)->info_hash == info_hash) return i->get(); } @@ -1276,7 +1279,7 @@ namespace libtorrent { namespace detail torrent_handle session_impl::add_torrent( torrent_info const& ti - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& resume_data , bool compact_mode , int block_size @@ -1367,7 +1370,7 @@ namespace libtorrent { namespace detail char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , entry const& , bool compact_mode , int block_size @@ -2108,12 +2111,13 @@ namespace libtorrent { namespace detail for (int j = 0; j < num_bitmask_bytes; ++j) { unsigned char bits = bitmask[j]; - for (int k = 0; k < 8; ++k) + int num_bits = std::min(num_blocks_per_piece - j*8, 8); + for (int k = 0; k < num_bits; ++k) { const int bit = j * 8 + k; if (bits & (1 << k)) { - p.info[bit].finished = true; + p.info[bit].state = piece_picker::block_info::state_finished; ++p.finished; } } diff --git a/src/storage.cpp b/src/storage.cpp index 2f77067ab..c7967283c 100755 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -127,7 +127,7 @@ namespace using namespace boost::filesystem; // based on code from Boost.Fileystem - bool create_directories_win(const path& ph) + bool create_directories_win(const fs::path& ph) { if (ph.empty() || exists(ph)) { @@ -147,7 +147,7 @@ namespace return true; } - bool exists_win( const path & ph ) + bool exists_win( const fs::path & ph ) { std::wstring wpath(safe_convert(ph.string())); if(::GetFileAttributes( wpath.c_str() ) == 0xFFFFFFFF) @@ -167,7 +167,7 @@ namespace return true; } - boost::intmax_t file_size_win( const path & ph ) + boost::intmax_t file_size_win( const fs::path & ph ) { std::wstring wpath(safe_convert(ph.string())); // by now, intmax_t is 64-bits on all Windows compilers @@ -187,7 +187,7 @@ namespace + fad.nFileSizeLow; } - std::time_t last_write_time_win( const path & ph ) + std::time_t last_write_time_win( const fs::path & ph ) { struct _stat path_stat; std::wstring wph(safe_convert(ph.native_file_string())); @@ -198,8 +198,8 @@ namespace return path_stat.st_mtime; } - void rename_win( const path & old_path, - const path & new_path ) + void rename_win( const fs::path & old_path, + const fs::path & new_path ) { std::wstring wold_path(safe_convert(old_path.string())); std::wstring wnew_path(safe_convert(new_path.string())); @@ -214,14 +214,13 @@ namespace #endif #if BOOST_VERSION < 103200 -bool operator<(boost::filesystem::path const& lhs - , boost::filesystem::path const& rhs) +bool operator<(fs::path const& lhs, fs::path const& rhs) { return lhs.string() < rhs.string(); } #endif -using namespace boost::filesystem; +namespace fs = boost::filesystem; using boost::bind; using namespace ::boost::multi_index; using boost::multi_index::multi_index_container; @@ -244,7 +243,7 @@ namespace libtorrent { std::vector > get_filesizes( - torrent_info const& t, path p) + torrent_info const& t, fs::path p) { p = complete(p); std::vector > sizes; @@ -255,7 +254,7 @@ namespace libtorrent std::time_t time = 0; try { - path f = p / i->path; + fs::path f = p / i->path; #if defined(_WIN32) && defined(UNICODE) && BOOST_VERSION < 103400 size = file_size_win(f); time = last_write_time_win(f); @@ -278,7 +277,7 @@ namespace libtorrent // still be a correct subset of the actual data on disk. bool match_filesizes( torrent_info const& t - , path p + , fs::path p , std::vector > const& sizes , bool compact_mode , std::string* error) @@ -299,7 +298,7 @@ namespace libtorrent std::time_t time = 0; try { - path f = p / i->path; + fs::path f = p / i->path; #if defined(_WIN32) && defined(UNICODE) && BOOST_VERSION < 103400 size = file_size_win(f); time = last_write_time_win(f); @@ -371,19 +370,19 @@ namespace libtorrent class storage : public storage_interface, thread_safe_storage, boost::noncopyable { public: - storage(torrent_info const& info, path const& path, file_pool& fp) + storage(torrent_info const& info, fs::path const& path, file_pool& fp) : thread_safe_storage(info.num_pieces()) , m_info(info) , m_files(fp) { assert(info.begin_files() != info.end_files()); - m_save_path = complete(path); + m_save_path = fs::complete(path); assert(m_save_path.is_complete()); } void release_files(); void initialize(bool allocate_files); - bool move_storage(path save_path); + bool move_storage(fs::path save_path); size_type read(char* buf, int slot, int offset, int size); void write(const char* buf, int slot, int offset, int size); void move_slot(int src_slot, int dst_slot); @@ -391,6 +390,7 @@ namespace libtorrent void swap_slots3(int slot1, int slot2, int slot3); bool verify_resume_data(entry& rd, std::string& error); void write_resume_data(entry& rd) const; + sha1_hash hash_for_slot(int slot, partial_hash& ph, int piece_size); size_type read_impl(char* buf, int slot, int offset, int size, bool fill_zero); @@ -400,7 +400,7 @@ namespace libtorrent } torrent_info const& m_info; - path m_save_path; + fs::path m_save_path; // the file pool is typically stored in // the session, to make all storage // instances use the same pool @@ -410,14 +410,38 @@ namespace libtorrent std::vector m_scratch_buffer; }; + sha1_hash storage::hash_for_slot(int slot, partial_hash& ph, int piece_size) + { +#ifndef NDEBUG + hasher partial; + hasher whole; + int slot_size1 = piece_size; + m_scratch_buffer.resize(slot_size1); + read_impl(&m_scratch_buffer[0], slot, 0, slot_size1, true); + if (ph.offset > 0) + partial.update(&m_scratch_buffer[0], ph.offset); + whole.update(&m_scratch_buffer[0], slot_size1); + hasher partial_copy = ph.h; + assert(ph.offset == 0 || partial_copy.final() == partial.final()); +#endif + int slot_size = piece_size - ph.offset; + if (slot_size == 0) return ph.h.final(); + m_scratch_buffer.resize(slot_size); + read_impl(&m_scratch_buffer[0], slot, ph.offset, slot_size, true); + ph.h.update(&m_scratch_buffer[0], slot_size); + sha1_hash ret = ph.h.final(); + assert(whole.final() == ret); + return ret; + } + void storage::initialize(bool allocate_files) { // first, create all missing directories - path last_path; + fs::path last_path; for (torrent_info::file_iterator file_iter = m_info.begin_files(), end_iter = m_info.end_files(); file_iter != end_iter; ++file_iter) { - path dir = (m_save_path / file_iter->path).branch_path(); + fs::path dir = (m_save_path / file_iter->path).branch_path(); if (dir != last_path) { @@ -537,10 +561,10 @@ namespace libtorrent } // returns true on success - bool storage::move_storage(path save_path) + bool storage::move_storage(fs::path save_path) { - path old_path; - path new_path; + fs::path old_path; + fs::path new_path; save_path = complete(save_path); @@ -772,7 +796,7 @@ namespace libtorrent // this file was empty, don't increment the slice counter if (read_bytes > 0) ++counter; #endif - path path = m_save_path / file_iter->path; + fs::path path = m_save_path / file_iter->path; file_offset = 0; in = m_files.open_file( @@ -820,7 +844,7 @@ namespace libtorrent assert(file_iter != m_info.end_files()); } - path p(m_save_path / file_iter->path); + fs::path p(m_save_path / file_iter->path); boost::shared_ptr out = m_files.open_file( this, p, file::out | file::in); @@ -890,7 +914,7 @@ namespace libtorrent ++file_iter; assert(file_iter != m_info.end_files()); - path p = m_save_path / file_iter->path; + fs::path p = m_save_path / file_iter->path; file_offset = 0; out = m_files.open_file( this, p, file::out | file::in); @@ -901,12 +925,12 @@ namespace libtorrent } storage_interface* default_storage_constructor(torrent_info const& ti - , boost::filesystem::path const& path, file_pool& fp) + , fs::path const& path, file_pool& fp) { return new storage(ti, path, fp); } - bool supports_sparse_files(path const& p) + bool supports_sparse_files(fs::path const& p) { assert(p.is_complete()); #if defined(_WIN32) @@ -930,7 +954,7 @@ namespace libtorrent #if defined(__APPLE__) || defined(__linux__) // find the last existing directory of the save path - path query_path = p; + fs::path query_path = p; while (!query_path.empty() && !exists(query_path)) query_path = query_path.branch_path(); #endif @@ -1004,215 +1028,139 @@ namespace libtorrent // -- piece_manager ----------------------------------------------------- - class piece_manager::impl - { - friend class invariant_access; - public: - - impl( - torrent_info const& info - , path const& path - , file_pool& fp - , storage_constructor_type sc); - - bool check_fastresume( - aux::piece_checker_data& d - , std::vector& pieces - , int& num_pieces - , bool compact_mode); - - std::pair check_files( - std::vector& pieces - , int& num_pieces, boost::recursive_mutex& mutex); - - void release_files(); - - bool allocate_slots(int num_slots, bool abort_on_disk = false); - void mark_failed(int index); - unsigned long piece_crc( - int slot_index - , int block_size - , piece_picker::block_info const* bi); - - int slot_for_piece(int piece_index) const; - - size_type read( - char* buf - , int piece_index - , int offset - , int size); - - void write( - const char* buf - , int piece_index - , int offset - , int size); - - path const& save_path() const - { return m_save_path; } - - bool move_storage(path save_path) - { - if (m_storage->move_storage(save_path)) - { - m_save_path = complete(save_path); - return true; - } - return false; - } - - void export_piece_map(std::vector& p) const; - - // returns the slot currently associated with the given - // piece or assigns the given piece_index to a free slot - - int identify_data( - const std::vector& piece_data - , int current_slot - , std::vector& have_pieces - , int& num_pieces - , const std::multimap& hash_to_piece - , boost::recursive_mutex& mutex); - - int allocate_slot_for_piece(int piece_index); -#ifndef NDEBUG - void check_invariant() const; -#ifdef TORRENT_STORAGE_DEBUG - void debug_log() const; -#endif -#endif - boost::scoped_ptr m_storage; - - // if this is true, pieces are always allocated at the - // lowest possible slot index. If it is false, pieces - // are always written to their final place immediately - bool m_compact_mode; - - // if this is true, pieces that haven't been downloaded - // will be filled with zeroes. Not filling with zeroes - // will not work in some cases (where a seek cannot pass - // the end of the file). - bool m_fill_mode; - - // a bitmask representing the pieces we have - std::vector m_have_piece; - - torrent_info const& m_info; - - // slots that haven't had any file storage allocated - std::vector m_unallocated_slots; - // slots that have file storage, but isn't assigned to a piece - std::vector m_free_slots; - - enum - { - has_no_slot = -3 // the piece has no storage - }; - - // maps piece indices to slots. If a piece doesn't - // have any storage, it is set to 'has_no_slot' - std::vector m_piece_to_slot; - - enum - { - unallocated = -1, // the slot is unallocated - unassigned = -2 // the slot is allocated but not assigned to a piece - }; - - // maps slots to piece indices, if a slot doesn't have a piece - // it can either be 'unassigned' or 'unallocated' - std::vector m_slot_to_piece; - - path m_save_path; - - mutable boost::recursive_mutex m_mutex; - - bool m_allocating; - boost::mutex m_allocating_monitor; - boost::condition m_allocating_condition; - - // these states are used while checking/allocating the torrent - - enum { - // the default initial state - state_none, - // the file checking is complete - state_finished, - // creating the directories - state_create_files, - // checking the files - state_full_check, - // allocating files (in non-compact mode) - state_allocating - } m_state; - int m_current_slot; - - std::vector m_piece_data; - - // this maps a piece hash to piece index. It will be - // build the first time it is used (to save time if it - // isn't needed) - std::multimap m_hash_to_piece; - }; - - piece_manager::impl::impl( - torrent_info const& info - , path const& save_path + piece_manager::piece_manager( + boost::shared_ptr const& torrent + , torrent_info const& ti + , fs::path const& save_path , file_pool& fp + , disk_io_thread& io , storage_constructor_type sc) - : m_storage(sc(info, save_path, fp)) + : m_storage(sc(ti, save_path, fp)) , m_compact_mode(false) , m_fill_mode(true) - , m_info(info) + , m_info(ti) , m_save_path(complete(save_path)) , m_allocating(false) + , m_io_thread(io) + , m_torrent(torrent) { m_fill_mode = !supports_sparse_files(save_path); } - piece_manager::piece_manager( - torrent_info const& info - , path const& save_path - , file_pool& fp - , storage_constructor_type sc) - : m_pimpl(new impl(info, save_path, fp, sc)) - { - } - piece_manager::~piece_manager() { } void piece_manager::write_resume_data(entry& rd) const { - m_pimpl->m_storage->write_resume_data(rd); + m_storage->write_resume_data(rd); } bool piece_manager::verify_resume_data(entry& rd, std::string& error) { - return m_pimpl->m_storage->verify_resume_data(rd, error); + return m_storage->verify_resume_data(rd, error); } - void piece_manager::release_files() + void piece_manager::async_release_files() { - m_pimpl->release_files(); + disk_io_job j; + j.storage = this; + j.action = disk_io_job::release_files; + m_io_thread.add_job(j); } - void piece_manager::impl::release_files() + void piece_manager::async_move_storage(fs::path const& p) { - // synchronization ------------------------------------------------------ - boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- + disk_io_job j; + j.storage = this; + j.action = disk_io_job::move_storage; + j.str = p.string(); + m_io_thread.add_job(j); + } + void piece_manager::async_read( + peer_request const& r + , boost::function const& handler) + { + disk_io_job j; + j.storage = this; + j.action = disk_io_job::read; + j.piece = r.piece; + j.offset = r.start; + j.buffer_size = r.length; + m_io_thread.add_job(j, handler); + } + + void piece_manager::async_write( + peer_request const& r + , char const* buffer + , boost::function const& handler) + { + assert(r.length <= 16 * 1024); + + disk_io_job j; + j.storage = this; + j.action = disk_io_job::write; + j.piece = r.piece; + j.offset = r.start; + j.buffer_size = r.length; + j.buffer = m_io_thread.allocate_buffer(); + if (j.buffer == 0) throw file_error("out of memory"); + std::memcpy(j.buffer, buffer, j.buffer_size); + m_io_thread.add_job(j, handler); + } + + void piece_manager::async_hash(int piece + , boost::function const& handler) + { + disk_io_job j; + j.storage = this; + j.action = disk_io_job::hash; + j.piece = piece; + + m_io_thread.add_job(j, handler); + } + + fs::path piece_manager::save_path() const + { + boost::recursive_mutex::scoped_lock l(m_mutex); + return m_save_path; + } + + sha1_hash piece_manager::hash_for_piece_impl(int piece) + { + partial_hash ph; + + std::map::iterator i = m_piece_hasher.find(piece); + if (i != m_piece_hasher.end()) + { + ph = i->second; + m_piece_hasher.erase(i); + } + + int slot = m_piece_to_slot[piece]; + assert(slot != has_no_slot); + return m_storage->hash_for_slot(slot, ph, m_info.piece_size(piece)); + } + + void piece_manager::release_files_impl() + { m_storage->release_files(); } - void piece_manager::impl::export_piece_map( + bool piece_manager::move_storage_impl(fs::path const& save_path) + { + if (m_storage->move_storage(save_path)) + { + m_save_path = fs::complete(save_path); + return true; + } + return false; + } + void piece_manager::export_piece_map( std::vector& p) const { - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- INVARIANT_CHECK; @@ -1232,20 +1180,9 @@ namespace libtorrent } } - bool piece_manager::compact_allocation() const - { return m_pimpl->m_compact_mode; } - - void piece_manager::export_piece_map( - std::vector& p) const + void piece_manager::mark_failed(int piece_index) { - m_pimpl->export_piece_map(p); - } - - void piece_manager::impl::mark_failed(int piece_index) - { - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- INVARIANT_CHECK; @@ -1261,37 +1198,13 @@ namespace libtorrent m_free_slots.push_back(slot_index); } - void piece_manager::mark_failed(int index) - { - m_pimpl->mark_failed(index); - } - - bool piece_manager::is_allocating() const - { - return m_pimpl->m_state - == impl::state_allocating; - } - int piece_manager::slot_for_piece(int piece_index) const - { - return m_pimpl->slot_for_piece(piece_index); - } - - int piece_manager::impl::slot_for_piece(int piece_index) const { assert(piece_index >= 0 && piece_index < m_info.num_pieces()); return m_piece_to_slot[piece_index]; } unsigned long piece_manager::piece_crc( - int index - , int block_size - , piece_picker::block_info const* bi) - { - return m_pimpl->piece_crc(index, block_size, bi); - } - - unsigned long piece_manager::impl::piece_crc( int slot_index , int block_size , piece_picker::block_info const* bi) @@ -1309,7 +1222,7 @@ namespace libtorrent for (int i = 0; i < num_blocks-1; ++i) { - if (!bi[i].finished) continue; + if (!bi[i].state == piece_picker::block_info::state_finished) continue; m_storage->read( &buf[0] , slot_index @@ -1317,7 +1230,7 @@ namespace libtorrent , block_size); crc.update(&buf[0], block_size); } - if (bi[num_blocks - 1].finished) + if (bi[num_blocks - 1].state == piece_picker::block_info::state_finished) { m_storage->read( &buf[0] @@ -1333,7 +1246,7 @@ namespace libtorrent return 0; } - size_type piece_manager::impl::read( + size_type piece_manager::read_impl( char* buf , int piece_index , int offset @@ -1350,16 +1263,7 @@ namespace libtorrent return m_storage->read(buf, slot, offset, size); } - size_type piece_manager::read( - char* buf - , int piece_index - , int offset - , int size) - { - return m_pimpl->read(buf, piece_index, offset, size); - } - - void piece_manager::impl::write( + void piece_manager::write_impl( const char* buf , int piece_index , int offset @@ -1369,21 +1273,34 @@ namespace libtorrent assert(offset >= 0); assert(size > 0); assert(piece_index >= 0 && piece_index < (int)m_piece_to_slot.size()); + + if (offset == 0) + { + partial_hash& ph = m_piece_hasher[piece_index]; + assert(ph.offset == 0); + ph.offset = size; + ph.h.update(buf, size); + } + else + { + std::map::iterator i = m_piece_hasher.find(piece_index); + if (i != m_piece_hasher.end()) + { + assert(i->second.offset > 0); + if (offset == i->second.offset) + { + i->second.offset += size; + i->second.h.update(buf, size); + } + } + } + int slot = allocate_slot_for_piece(piece_index); assert(slot >= 0 && slot < (int)m_slot_to_piece.size()); m_storage->write(buf, slot, offset, size); } - void piece_manager::write( - const char* buf - , int piece_index - , int offset - , int size) - { - m_pimpl->write(buf, piece_index, offset, size); - } - - int piece_manager::impl::identify_data( + int piece_manager::identify_data( const std::vector& piece_data , int current_slot , std::vector& have_pieces @@ -1447,6 +1364,8 @@ namespace libtorrent , matching_pieces.end() , current_slot) != matching_pieces.end()) { + // the current slot is among the matching pieces, so + // we will assume that the piece is in the right place const int piece_index = current_slot; // lock because we're writing to have_pieces @@ -1542,18 +1461,17 @@ namespace libtorrent // if it is, use it and return true. If it // isn't return false and the full check // will be run - bool piece_manager::impl::check_fastresume( + bool piece_manager::check_fastresume( aux::piece_checker_data& data , std::vector& pieces , int& num_pieces, bool compact_mode) { - assert(m_info.piece_length() > 0); - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- INVARIANT_CHECK; + assert(m_info.piece_length() > 0); + m_compact_mode = compact_mode; // This will corrupt the storage @@ -1613,20 +1531,25 @@ namespace libtorrent m_unallocated_slots.push_back(i); } - if (m_compact_mode || m_unallocated_slots.empty()) + if (m_unallocated_slots.empty()) + { + m_state = state_create_files; + return false; + } + + if (m_compact_mode) { m_state = state_create_files; return false; } } - m_current_slot = 0; m_state = state_full_check; return false; } /* - state chart: + state chart: check_fastresume() @@ -1659,7 +1582,7 @@ namespace libtorrent // the second return value is the progress the // file check is at. 0 is nothing done, and 1 // is finished - std::pair piece_manager::impl::check_files( + std::pair piece_manager::check_files( std::vector& pieces, int& num_pieces, boost::recursive_mutex& mutex) { assert(num_pieces == std::count(pieces.begin(), pieces.end(), true)); @@ -1712,7 +1635,6 @@ namespace libtorrent if (!m_unallocated_slots.empty() && !m_compact_mode) { assert(!m_fill_mode); - assert(!m_compact_mode); std::vector().swap(m_unallocated_slots); std::fill(m_slot_to_piece.begin(), m_slot_to_piece.end(), int(unassigned)); m_free_slots.resize(m_info.num_pieces()); @@ -1732,6 +1654,16 @@ namespace libtorrent try { + // initialization for the full check + if (m_hash_to_piece.empty()) + { + m_current_slot = 0; + for (int i = 0; i < m_info.num_pieces(); ++i) + { + m_hash_to_piece.insert(std::make_pair(m_info.hash_for_piece(i), i)); + } + std::fill(pieces.begin(), pieces.end(), false); + } m_piece_data.resize(int(m_info.piece_length())); int piece_size = int(m_info.piece_size(m_current_slot)); @@ -1742,14 +1674,6 @@ namespace libtorrent if (num_read != piece_size) throw file_error(""); - if (m_hash_to_piece.empty()) - { - for (int i = 0; i < m_info.num_pieces(); ++i) - { - m_hash_to_piece.insert(std::make_pair(m_info.hash_for_piece(i), i)); - } - } - int piece_index = identify_data(m_piece_data, m_current_slot , pieces, num_pieces, m_hash_to_piece, mutex); @@ -1986,26 +1910,9 @@ namespace libtorrent return std::make_pair(false, (float)m_current_slot / m_info.num_pieces()); } - bool piece_manager::check_fastresume( - aux::piece_checker_data& d, std::vector& pieces - , int& num_pieces, bool compact_mode) + int piece_manager::allocate_slot_for_piece(int piece_index) { - return m_pimpl->check_fastresume(d, pieces, num_pieces, compact_mode); - } - - std::pair piece_manager::check_files( - std::vector& pieces - , int& num_pieces - , boost::recursive_mutex& mutex) - { - return m_pimpl->check_files(pieces, num_pieces, mutex); - } - - int piece_manager::impl::allocate_slot_for_piece(int piece_index) - { - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- // INVARIANT_CHECK; @@ -2112,56 +2019,11 @@ namespace libtorrent return slot_index; } - namespace - { - // this is used to notify potential other - // threads that the allocation-function has exited - struct allocation_syncronization - { - allocation_syncronization( - bool& flag - , boost::condition& cond - , boost::mutex& monitor) - : m_flag(flag) - , m_cond(cond) - , m_monitor(monitor) - { - boost::mutex::scoped_lock lock(m_monitor); - - while (m_flag) - m_cond.wait(lock); - - m_flag = true; - } - - ~allocation_syncronization() - { - boost::mutex::scoped_lock lock(m_monitor); - m_flag = false; - m_cond.notify_one(); - } - - bool& m_flag; - boost::condition& m_cond; - boost::mutex& m_monitor; - }; - - } - - bool piece_manager::impl::allocate_slots(int num_slots, bool abort_on_disk) + bool piece_manager::allocate_slots(int num_slots, bool abort_on_disk) { assert(num_slots > 0); - // this object will syncronize the allocation with - // potential other threads - allocation_syncronization sync_obj( - m_allocating - , m_allocating_condition - , m_allocating_monitor); - - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- // INVARIANT_CHECK; @@ -2212,27 +2074,10 @@ namespace libtorrent return written; } - bool piece_manager::allocate_slots(int num_slots, bool abort_on_disk) - { - return m_pimpl->allocate_slots(num_slots, abort_on_disk); - } - - path const& piece_manager::save_path() const - { - return m_pimpl->save_path(); - } - - bool piece_manager::move_storage(path const& save_path) - { - return m_pimpl->move_storage(save_path); - } - #ifndef NDEBUG - void piece_manager::impl::check_invariant() const + void piece_manager::check_invariant() const { - // synchronization ------------------------------------------------------ boost::recursive_mutex::scoped_lock lock(m_mutex); - // ---------------------------------------------------------------------- if (m_piece_to_slot.empty()) return; assert((int)m_piece_to_slot.size() == m_info.num_pieces()); @@ -2340,7 +2185,7 @@ namespace libtorrent } #ifdef TORRENT_STORAGE_DEBUG - void piece_manager::impl::debug_log() const + void piece_manager::debug_log() const { std::stringstream s; diff --git a/src/torrent.cpp b/src/torrent.cpp index 90cb687de..e4589392f 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -77,7 +77,6 @@ using namespace libtorrent; using boost::tuples::tuple; using boost::tuples::get; using boost::tuples::make_tuple; -using boost::filesystem::complete; using boost::bind; using boost::mutex; using libtorrent::aux::session_impl; @@ -147,11 +146,12 @@ namespace namespace libtorrent { + torrent::torrent( session_impl& ses , aux::checker_impl& checker , torrent_info const& tf - , boost::filesystem::path const& save_path + , fs::path const& save_path , tcp::endpoint const& net_interface , bool compact_mode , int block_size @@ -204,8 +204,6 @@ namespace libtorrent m_initial_done = 0; #endif - INVARIANT_CHECK; - m_uploads_quota.min = 2; m_connections_quota.min = 2; // this will be corrected the next time the main session @@ -214,7 +212,6 @@ namespace libtorrent m_uploads_quota.max = std::numeric_limits::max(); m_connections_quota.max = std::numeric_limits::max(); m_policy.reset(new policy(this)); - init(); } @@ -224,7 +221,7 @@ namespace libtorrent , char const* tracker_url , sha1_hash const& info_hash , char const* name - , boost::filesystem::path const& save_path + , fs::path const& save_path , tcp::endpoint const& net_interface , bool compact_mode , int block_size @@ -299,6 +296,7 @@ namespace libtorrent void torrent::start() { boost::weak_ptr self(shared_from_this()); + if (m_torrent_file.is_valid()) init(); m_announce_timer.expires_from_now(seconds(1)); m_announce_timer.async_wait(m_ses.m_strand.wrap( bind(&torrent::on_announce_disp, self, _1))); @@ -331,8 +329,7 @@ namespace libtorrent INVARIANT_CHECK; - if (m_ses.is_aborted()) - m_abort = true; + assert(m_abort); if (!m_connections.empty()) disconnect_all(); } @@ -351,17 +348,20 @@ namespace libtorrent } #endif + // this may not be called from a constructor because of the call to + // shared_from_this() void torrent::init() { - INVARIANT_CHECK; - assert(m_torrent_file.is_valid()); assert(m_torrent_file.num_files() > 0); assert(m_torrent_file.total_size() >= 0); m_have_pieces.resize(m_torrent_file.num_pieces(), false); - m_storage.reset(new piece_manager(m_torrent_file, m_save_path - , m_ses.m_files, m_storage_constructor)); + // the shared_from_this() will create an intentional + // cycle of ownership, se the hpp file for description. + m_owning_storage = new piece_manager(shared_from_this(), m_torrent_file + , m_save_path, m_ses.m_files, m_ses.m_disk_thread, m_storage_constructor); + m_storage = m_owning_storage.get(); m_block_size = calculate_block_size(m_torrent_file, m_default_block_size); m_picker.reset(new piece_picker( static_cast(m_torrent_file.piece_length() / m_block_size) @@ -690,7 +690,7 @@ namespace libtorrent int corr = 0; int index = i->index; assert(!m_have_pieces[index]); - assert(i->finished < m_picker->blocks_in_piece(index)); + assert(i->finished <= m_picker->blocks_in_piece(index)); #ifndef NDEBUG for (std::vector::const_iterator j = boost::next(i); @@ -702,17 +702,17 @@ namespace libtorrent for (int j = 0; j < blocks_per_piece; ++j) { - assert(i->info[j].finished == 0 || i->info[j].finished == 1); - assert(m_picker->is_finished(piece_block(index, j)) == i->info[j].finished); - corr += i->info[j].finished * m_block_size; + assert(m_picker->is_finished(piece_block(index, j)) == (i->info[j].state == piece_picker::block_info::state_finished)); + corr += (i->info[j].state == piece_picker::block_info::state_finished) * m_block_size; assert(index != last_piece || j < m_picker->blocks_in_last_piece() - || i->info[j].finished == 0); + || i->info[j].state != piece_picker::block_info::state_finished); } // correction if this was the last piece // and if we have the last block if (i->index == last_piece - && i->info[m_picker->blocks_in_last_piece()-1].finished) + && i->info[m_picker->blocks_in_last_piece()-1].state + == piece_picker::block_info::state_finished) { corr -= m_block_size; corr += m_torrent_file.piece_size(last_piece) % m_block_size; @@ -722,8 +722,8 @@ namespace libtorrent wanted_done += corr; } - assert(total_done < m_torrent_file.total_size()); - assert(wanted_done < m_torrent_file.total_size()); + assert(total_done <= m_torrent_file.total_size()); + assert(wanted_done <= m_torrent_file.total_size()); std::map downloading_piece; for (const_peer_iterator i = begin(); i != end(); ++i) @@ -787,7 +787,7 @@ namespace libtorrent std::cerr << " " << i->index << " "; for (int j = 0; j < blocks_per_piece; ++j) { - std::cerr << i->info[j].finished; + std::cerr << (i->info[j].state == piece_picker::block_info::state_finished ? "1" : "0"); } std::cerr << std::endl; } @@ -803,8 +803,8 @@ namespace libtorrent } - assert(total_done < m_torrent_file.total_size()); - assert(wanted_done < m_torrent_file.total_size()); + assert(total_done <= m_torrent_file.total_size()); + assert(wanted_done <= m_torrent_file.total_size()); #endif @@ -812,6 +812,84 @@ namespace libtorrent return make_tuple(total_done, wanted_done); } + void torrent::piece_finished(int index, bool passed_hash_check) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + bool was_seed = is_seed(); + bool was_finished = m_picker->num_filtered() + num_pieces() + == torrent_file().num_pieces(); + + if (passed_hash_check) + { + // the following call may cause picker to become invalid + // in case we just became a seed + announce_piece(index); + assert(valid_metadata()); + // if we just became a seed, picker is now invalid, since it + // is deallocated by the torrent once it starts seeding + if (!was_finished + && (is_seed() + || m_picker->num_filtered() + num_pieces() + == torrent_file().num_pieces())) + { + // torrent finished + // i.e. all the pieces we're interested in have + // been downloaded. Release the files (they will open + // in read only mode if needed) + try { finished(); } + catch (std::exception& e) + { +#ifndef NDEBUG + std::cerr << e.what() << std::endl; + assert(false); +#endif + } + } + } + else + { + piece_failed(index); + } + +#ifndef NDEBUG + try + { +#endif + + m_policy->piece_finished(index, passed_hash_check); + +#ifndef NDEBUG + } + catch (std::exception const& e) + { + std::cerr << e.what() << std::endl; + assert(false); + } +#endif + +#ifndef NDEBUG + try + { +#endif + + if (!was_seed && is_seed()) + { + assert(passed_hash_check); + completed(); + } + +#ifndef NDEBUG + } + catch (std::exception const& e) + { + std::cerr << e.what() << std::endl; + assert(false); + } +#endif + + } + void torrent::piece_failed(int index) { // if the last piece fails the peer connection will still @@ -821,7 +899,8 @@ namespace libtorrent // (total_done == m_torrent_file.total_size()) => is_seed() // INVARIANT_CHECK; - assert(m_storage.get()); + assert(m_storage); + assert(m_storage->refcount() > 0); assert(m_picker.get()); assert(index >= 0); assert(index < m_torrent_file.num_pieces()); @@ -910,6 +989,7 @@ namespace libtorrent // start with redownloading the pieces that the client // that has sent the least number of pieces m_picker->restore_piece(index); + assert(m_storage); m_storage->mark_failed(index); assert(m_have_pieces[index] == false); @@ -927,7 +1007,8 @@ namespace libtorrent // disconnect all peers and close all // files belonging to the torrents disconnect_all(); - if (m_storage.get()) m_storage->release_files(); + if (m_owning_storage.get()) m_storage->async_release_files(); + m_owning_storage = 0; } void torrent::announce_piece(int index) @@ -1728,8 +1809,13 @@ namespace libtorrent void torrent::set_metadata(entry const& metadata) { + INVARIANT_CHECK; + + assert(!m_torrent_file.is_valid()); m_torrent_file.parse_info_section(metadata); + init(); + boost::mutex::scoped_lock(m_checker.m_mutex); boost::shared_ptr d( @@ -1943,7 +2029,8 @@ namespace libtorrent std::for_each(seeds.begin(), seeds.end() , bind(&peer_connection::disconnect, _1)); - m_storage->release_files(); + assert(m_storage); + m_storage->async_release_files(); } // called when torrent is complete (all pieces downloaded) @@ -2019,17 +2106,12 @@ namespace libtorrent { INVARIANT_CHECK; - if (!m_storage.get()) - { - // this means we have received the metadata through the - // metadata extension, and we have to initialize - init(); - } - - assert(m_storage.get()); + assert(valid_metadata()); bool done = true; try { + assert(m_storage); + assert(m_owning_storage.get()); done = m_storage->check_fastresume(data, m_have_pieces, m_num_pieces , m_compact_mode); } @@ -2058,11 +2140,12 @@ namespace libtorrent { INVARIANT_CHECK; - assert(m_storage.get()); + assert(m_owning_storage.get()); std::pair progress(true, 1.f); try { + assert(m_storage); progress = m_storage->check_files(m_have_pieces, m_num_pieces , m_ses.m_mutex); } @@ -2097,9 +2180,19 @@ namespace libtorrent if (!is_seed()) { - m_picker->files_checked(m_have_pieces, unfinished_pieces); + // this is filled in with pieces that needs to be checked + // against its hashes. + std::vector verify_pieces; + m_picker->files_checked(m_have_pieces, unfinished_pieces, verify_pieces); if (m_sequenced_download_threshold > 0) picker().set_sequenced_download_threshold(m_sequenced_download_threshold); + while (!verify_pieces.empty()) + { + int piece = verify_pieces.back(); + verify_pieces.pop_back(); + async_verify_piece(piece, bind(&torrent::piece_finished + , shared_from_this(), piece, _1)); + } } #ifndef TORRENT_DISABLE_EXTENSIONS @@ -2152,34 +2245,34 @@ namespace libtorrent return m_ses.m_alerts; } - boost::filesystem::path torrent::save_path() const + fs::path torrent::save_path() const { - return m_save_path; + if (m_owning_storage.get()) + return m_owning_storage->save_path(); + else + return m_save_path; } - bool torrent::move_storage(boost::filesystem::path const& save_path) + void torrent::move_storage(fs::path const& save_path) { INVARIANT_CHECK; - bool ret = true; - if (m_storage.get()) + if (m_owning_storage.get()) { - ret = m_storage->move_storage(save_path); - m_save_path = m_storage->save_path(); + m_owning_storage->async_move_storage(save_path); } else { m_save_path = save_path; } - return ret; } piece_manager& torrent::filesystem() { INVARIANT_CHECK; - assert(m_storage.get()); - return *m_storage; + assert(m_owning_storage.get()); + return *m_owning_storage; } @@ -2213,11 +2306,11 @@ namespace libtorrent if (valid_metadata()) { - assert(int(m_have_pieces.size()) == m_torrent_file.num_pieces()); + assert(m_abort || int(m_have_pieces.size()) == m_torrent_file.num_pieces()); } else { - assert(m_have_pieces.empty()); + assert(m_abort || m_have_pieces.empty()); } size_type total_done = quantized_bytes_done(); @@ -2336,7 +2429,13 @@ namespace libtorrent m_just_paused = true; // this will make the storage close all // files and flush all cached data - if (m_storage.get()) m_storage->release_files(); + if (m_owning_storage.get()) + { + assert(m_storage); + // TOOD: add a callback which posts + // an alert for the client to sync. with + m_storage->async_release_files(); + } } void torrent::resume() @@ -2468,28 +2567,26 @@ namespace libtorrent } } - bool torrent::verify_piece(int piece_index) + void torrent::async_verify_piece(int piece_index, boost::function const& f) { -// INVARIANT_CHECK; + INVARIANT_CHECK; - assert(m_storage.get()); + assert(m_storage); + assert(m_storage->refcount() > 0); assert(piece_index >= 0); assert(piece_index < m_torrent_file.num_pieces()); assert(piece_index < (int)m_have_pieces.size()); - int size = static_cast(m_torrent_file.piece_size(piece_index)); - std::vector buffer(size); - assert(size > 0); - m_storage->read(&buffer[0], piece_index, 0, size); + m_storage->async_hash(piece_index, bind(&torrent::on_piece_verified + , shared_from_this(), _1, _2, f)); + } - hasher h; - h.update(&buffer[0], size); - sha1_hash digest = h.final(); - - if (m_torrent_file.hash_for_piece(piece_index) != digest) - return false; - - return true; + void torrent::on_piece_verified(int ret, disk_io_job const& j + , boost::function f) + { + sha1_hash h(j.str); + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + f(m_torrent_file.hash_for_piece(j.piece) == h); } const tcp::endpoint& torrent::current_tracker() const @@ -2498,7 +2595,7 @@ namespace libtorrent } bool torrent::is_allocating() const - { return m_storage.get() && m_storage->is_allocating(); } + { return m_owning_storage.get() && m_owning_storage->is_allocating(); } void torrent::file_progress(std::vector& fp) const { diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index b8c307b88..c9ade14e9 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -80,6 +80,8 @@ using libtorrent::aux::session_impl; namespace libtorrent { + namespace fs = boost::filesystem; + namespace { void throw_invalid_handle() @@ -205,12 +207,12 @@ namespace libtorrent , bind(&torrent::download_limit, _1)); } - bool torrent_handle::move_storage( - boost::filesystem::path const& save_path) const + void torrent_handle::move_storage( + fs::path const& save_path) const { INVARIANT_CHECK; - return call_member(m_ses, m_chk, m_info_hash + call_member(m_ses, m_chk, m_info_hash , bind(&torrent::move_storage, _1, save_path)); } @@ -566,7 +568,8 @@ namespace libtorrent unsigned char v = 0; int bits = std::min(num_blocks_per_piece - j*8, 8); for (int k = 0; k < bits; ++k) - v |= i->info[j*8+k].finished?(1 << k):0; + v |= (i->info[j*8+k].state == piece_picker::block_info::state_finished) + ? (1 << k) : 0; bitmask.insert(bitmask.end(), v); assert(bits == 8 || j == num_bitmask_bytes - 1); } @@ -618,11 +621,11 @@ namespace libtorrent } - boost::filesystem::path torrent_handle::save_path() const + fs::path torrent_handle::save_path() const { INVARIANT_CHECK; - return call_member(m_ses, m_chk, m_info_hash + return call_member(m_ses, m_chk, m_info_hash , bind(&torrent::save_path, _1)); } @@ -770,10 +773,9 @@ namespace libtorrent pi.blocks_in_piece = p.blocks_in_piece(i->index); for (int j = 0; j < pi.blocks_in_piece; ++j) { - pi.peer[j] = i->info[j].peer; - pi.num_downloads[j] = i->info[j].num_downloads; - pi.finished_blocks[j] = i->info[j].finished; - pi.requested_blocks[j] = i->info[j].requested; + pi.blocks[j].peer = i->info[j].peer; + pi.blocks[j].num_downloads = i->info[j].num_downloads; + pi.blocks[j].state = i->info[j].state; } pi.piece_index = i->index; queue.push_back(pi); diff --git a/src/torrent_info.cpp b/src/torrent_info.cpp index e546a1243..4ea09aefd 100755 --- a/src/torrent_info.cpp +++ b/src/torrent_info.cpp @@ -62,10 +62,12 @@ namespace pt = boost::posix_time; namespace gr = boost::gregorian; using namespace libtorrent; -using namespace boost::filesystem; namespace { + + namespace fs = boost::filesystem; + void convert_to_utf8(std::string& str, unsigned char chr) { str += 0xc0 | ((chr & 0xff) >> 6); @@ -153,7 +155,7 @@ namespace // encoded string if (!valid_encoding) { - target.orig_path.reset(new path(target.path)); + target.orig_path.reset(new fs::path(target.path)); target.path = tmp_path; } } @@ -203,8 +205,8 @@ namespace offset += target.back().size; } } - - void remove_dir(path& p) +/* + void remove_dir(fs::path& p) { assert(p.begin() != p.end()); path tmp; @@ -212,6 +214,7 @@ namespace tmp /= *i; p = tmp; } +*/ } namespace libtorrent @@ -277,6 +280,29 @@ namespace libtorrent torrent_info::~torrent_info() {} + void torrent_info::swap(torrent_info& ti) + { + using std::swap; + m_urls.swap(ti.m_urls); + m_url_seeds.swap(ti.m_url_seeds); + swap(m_piece_length, ti.m_piece_length); + m_piece_hash.swap(ti.m_piece_hash); + m_files.swap(ti.m_files); + m_nodes.swap(ti.m_nodes); + swap(m_num_pieces, ti.m_num_pieces); + swap(m_info_hash, ti.m_info_hash); + m_name.swap(ti.m_name); + swap(m_creation_date, ti.m_creation_date); + m_comment.swap(ti.m_comment); + m_created_by.swap(ti.m_created_by); + swap(m_multifile, ti.m_multifile); + swap(m_private, ti.m_private); + m_extra_info.swap(ti.m_extra_info); +#ifndef NDEBUG + swap(m_half_metadata, ti.m_half_metadata); +#endif + } + void torrent_info::set_piece_size(int size) { // make sure the size is an even power of 2 @@ -323,7 +349,7 @@ namespace libtorrent else { m_name = info["name"].string(); } - path tmp = m_name; + fs::path tmp = m_name; if (tmp.is_complete()) throw std::runtime_error("torrent contains " "a file with an absolute path: '" + m_name + "'"); if (tmp.has_branch_path()) throw std::runtime_error( @@ -526,9 +552,9 @@ namespace libtorrent , bind(&announce_entry::tier, _1), bind(&announce_entry::tier, _2))); } - void torrent_info::add_file(boost::filesystem::path file, size_type size) + void torrent_info::add_file(fs::path file, size_type size) { - assert(file.begin() != file.end()); +// assert(file.begin() != file.end()); if (!file.has_branch_path()) { @@ -589,8 +615,6 @@ namespace libtorrent entry torrent_info::create_info_metadata() const { - namespace fs = boost::filesystem; - // you have to add files to the torrent first assert(!m_files.empty()); @@ -650,8 +674,6 @@ namespace libtorrent { assert(m_piece_length > 0); - namespace fs = boost::filesystem; - if ((m_urls.empty() && m_nodes.empty()) || m_files.empty()) { // TODO: throw something here diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 4cf83b3ca..7bd511588 100755 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -289,28 +289,6 @@ namespace libtorrent return ret; } - void intrusive_ptr_add_ref(timeout_handler const* c) - { - assert(c != 0); - assert(c->m_refs >= 0); - timeout_handler::mutex_t::scoped_lock l(c->m_mutex); - ++c->m_refs; - } - - void intrusive_ptr_release(timeout_handler const* c) - { - assert(c != 0); - assert(c->m_refs > 0); - timeout_handler::mutex_t::scoped_lock l(c->m_mutex); - --c->m_refs; - if (c->m_refs == 0) - { - l.unlock(); - delete c; - } - } - - timeout_handler::timeout_handler(asio::strand& str) : m_strand(str) , m_start_time(time_now()) @@ -318,7 +296,6 @@ namespace libtorrent , m_timeout(str.io_service()) , m_completion_timeout(0) , m_read_timeout(0) - , m_refs(0) {} void timeout_handler::set_timeout(int completion_timeout, int read_timeout) diff --git a/test/setup_transfer.cpp b/test/setup_transfer.cpp index abc56de4d..2b32a7f19 100644 --- a/test/setup_transfer.cpp +++ b/test/setup_transfer.cpp @@ -11,11 +11,20 @@ using boost::filesystem::remove_all; using boost::filesystem::create_directory; -void sleep(int msec) +void test_sleep(int millisec) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); - xt.nsec += msec * 1000000; + xt.nsec += millisec * 1000000; + boost::uint64_t nanosec = (millisec % 1000) * 1000000 + xt.nsec; + int sec = millisec / 1000; + if (nanosec > 1000000000) + { + nanosec -= 1000000000; + sec++; + } + xt.nsec = nanosec; + xt.sec += sec; boost::thread::sleep(xt); } @@ -70,6 +79,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 // they should not use the same save dir, because the // file pool will complain if two torrents are trying to // use the same files + sha1_hash info_hash = t.info_hash(); torrent_handle tor1 = ses1->add_torrent(t, "./tmp1"); torrent_handle tor2; torrent_handle tor3; @@ -81,7 +91,10 @@ setup_transfer(session* ses1, session* ses2, session* ses3 else tor2 = ses2->add_torrent(t, "./tmp2"); - sleep(100); + assert(ses1->get_torrents().size() == 1); + assert(ses2->get_torrents().size() == 1); + + test_sleep(100); std::cerr << "connecting peer\n"; tor1.connect_peer(tcp::endpoint(address::from_string("127.0.0.1") @@ -91,7 +104,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 { // give the other peers some time to get an initial // set of pieces before they start sharing with each-other - sleep(10000); + test_sleep(10000); tor3.connect_peer(tcp::endpoint( address::from_string("127.0.0.1") , ses2->listen_port())); diff --git a/test/setup_transfer.hpp b/test/setup_transfer.hpp index 5221a09cc..c679bb600 100644 --- a/test/setup_transfer.hpp +++ b/test/setup_transfer.hpp @@ -5,7 +5,8 @@ #include -void sleep(int msec); +void test_sleep(int millisec); + boost::tuple setup_transfer(libtorrent::session* ses1, libtorrent::session* ses2 diff --git a/test/test_metadata_extension.cpp b/test/test_metadata_extension.cpp index dcef62741..8cb172ed8 100644 --- a/test/test_metadata_extension.cpp +++ b/test/test_metadata_extension.cpp @@ -40,7 +40,7 @@ void test_transfer(bool clear_files = true, bool disconnect = false) if (disconnect && tor2.is_valid()) ses2.remove_torrent(tor2); if (!disconnect && tor2.has_metadata()) break; - sleep(100); + test_sleep(100); } if (disconnect) return; @@ -52,7 +52,7 @@ void test_transfer(bool clear_files = true, bool disconnect = false) { tor2.status(); if (tor2.is_seed()) break; - sleep(100); + test_sleep(100); } TEST_CHECK(tor2.is_seed()); diff --git a/test/test_pe_crypto.cpp b/test/test_pe_crypto.cpp index cd0d881b4..a7da88edd 100644 --- a/test/test_pe_crypto.cpp +++ b/test/test_pe_crypto.cpp @@ -75,7 +75,6 @@ void test_transfer(libtorrent::pe_settings::enc_policy policy, bool pref_rc4 = false) { using namespace libtorrent; - using boost::tuples::ignore; using std::cerr; session ses1(fingerprint("LT", 0, 1, 0, 0), std::make_pair(48000, 49000)); @@ -104,6 +103,7 @@ void test_transfer(libtorrent::pe_settings::enc_policy policy, torrent_handle tor1; torrent_handle tor2; + using boost::tuples::ignore; boost::tie(tor1, tor2, ignore) = setup_transfer(&ses1, &ses2, 0, true, false); std::cerr << "waiting for transfer to complete\n"; @@ -121,7 +121,7 @@ void test_transfer(libtorrent::pe_settings::enc_policy policy, std::cerr << "ses2: " << a->msg() << "\n"; if (tor2.is_seed()) break; - sleep(100); + test_sleep(100); } TEST_CHECK(tor2.is_seed()); diff --git a/test/test_piece_picker.cpp b/test/test_piece_picker.cpp index bf7173cfc..9c5a6c447 100644 --- a/test/test_piece_picker.cpp +++ b/test/test_piece_picker.cpp @@ -26,13 +26,14 @@ int test_main() partial.index = 1; partial.info = blocks; - partial.info[0].finished = true; - partial.info[2].finished = true; + partial.info[0].state = piece_picker::block_info::state_finished; + partial.info[2].state = piece_picker::block_info::state_finished; unfinished.push_back(partial); - p.files_checked(have, unfinished); - TEST_CHECK(p.is_downloading(piece_block(1, 0))); - TEST_CHECK(p.is_downloading(piece_block(1, 2))); + std::vector verify_pieces; + p.files_checked(have, unfinished, verify_pieces); + TEST_CHECK(p.is_finished(piece_block(1, 0))); + TEST_CHECK(p.is_finished(piece_block(1, 2))); p.set_piece_priority(4, 0); @@ -151,27 +152,21 @@ int test_main() std::vector const& downloads = p.get_download_queue(); TEST_CHECK(downloads.size() == 2); TEST_CHECK(downloads[0].index == 1); - TEST_CHECK(downloads[0].info[0].finished == 1); - TEST_CHECK(downloads[0].info[1].finished == 0); - TEST_CHECK(downloads[0].info[2].finished == 1); - TEST_CHECK(downloads[0].info[3].finished == 0); - TEST_CHECK(downloads[0].info[1].requested == 1); - TEST_CHECK(downloads[0].info[3].requested == 1); + TEST_CHECK(downloads[0].info[0].state == piece_picker::block_info::state_finished); + TEST_CHECK(downloads[0].info[1].state == piece_picker::block_info::state_requested); + TEST_CHECK(downloads[0].info[2].state == piece_picker::block_info::state_finished); + TEST_CHECK(downloads[0].info[3].state == piece_picker::block_info::state_requested); TEST_CHECK(downloads[1].index == 2); - TEST_CHECK(downloads[1].info[0].finished == 0); - TEST_CHECK(downloads[1].info[1].finished == 0); - TEST_CHECK(downloads[1].info[2].finished == 0); - TEST_CHECK(downloads[1].info[3].finished == 0); - TEST_CHECK(downloads[1].info[0].requested == 1); - TEST_CHECK(downloads[1].info[1].requested == 0); - TEST_CHECK(downloads[1].info[2].requested == 0); - TEST_CHECK(downloads[1].info[3].requested == 0); + TEST_CHECK(downloads[1].info[0].state == piece_picker::block_info::state_requested); + TEST_CHECK(downloads[1].info[1].state == piece_picker::block_info::state_none); + TEST_CHECK(downloads[1].info[2].state == piece_picker::block_info::state_none); + TEST_CHECK(downloads[1].info[3].state == piece_picker::block_info::state_none); - TEST_CHECK(p.is_downloading(piece_block(1, 1))); - TEST_CHECK(p.is_downloading(piece_block(1, 3))); - TEST_CHECK(p.is_downloading(piece_block(2, 0))); - TEST_CHECK(!p.is_downloading(piece_block(2, 1))); + TEST_CHECK(p.is_requested(piece_block(1, 1))); + TEST_CHECK(p.is_requested(piece_block(1, 3))); + TEST_CHECK(p.is_requested(piece_block(2, 0))); + TEST_CHECK(!p.is_requested(piece_block(2, 1))); picked.clear(); p.pick_pieces(peer1, picked, 1, false, tcp::endpoint(), piece_picker::fast); diff --git a/test/test_storage.cpp b/test/test_storage.cpp index b065d92ec..0471f7cbd 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -10,12 +10,19 @@ #include #include "test.hpp" +#include "setup_transfer.hpp" using namespace libtorrent; using namespace boost::filesystem; const int piece_size = 16; +void on_read_piece(int ret, disk_io_job const& j, char const* data, int size) +{ + TEST_CHECK(ret == size); + TEST_CHECK(std::equal(j.buffer, j.buffer + ret, data)); +} + void run_storage_tests(torrent_info& info, bool compact_allocation = true) { const int half = piece_size / 2; @@ -75,41 +82,47 @@ void run_storage_tests(torrent_info& info, bool compact_allocation = true) // make sure the piece_manager can identify the pieces { file_pool fp; - piece_manager pm(info, initial_path(), fp, default_storage_constructor); + disk_io_thread io; + boost::shared_ptr dummy(new int); + boost::intrusive_ptr pm = new piece_manager(dummy, info + , initial_path(), fp, io, default_storage_constructor); boost::mutex lock; libtorrent::aux::piece_checker_data d; std::vector pieces; num_pieces = 0; - TEST_CHECK(pm.check_fastresume(d, pieces, num_pieces + TEST_CHECK(pm->check_fastresume(d, pieces, num_pieces , compact_allocation) == false); bool finished = false; float progress; num_pieces = 0; boost::recursive_mutex mutex; while (!finished) - boost::tie(finished, progress) = pm.check_files(pieces, num_pieces, mutex); + boost::tie(finished, progress) = pm->check_files(pieces, num_pieces, mutex); TEST_CHECK(num_pieces == std::count(pieces.begin(), pieces.end() , true)); TEST_CHECK(exists("temp_storage")); - pm.move_storage("temp_storage2"); + pm->async_move_storage("temp_storage2"); + test_sleep(2000); TEST_CHECK(!exists("temp_storage")); TEST_CHECK(exists("temp_storage2/temp_storage")); - pm.move_storage("."); + pm->async_move_storage("."); + test_sleep(2000); TEST_CHECK(!exists("temp_storage2/temp_storage")); remove_all("temp_storage2"); - TEST_CHECK(pm.read(piece, 0, 0, piece_size) == piece_size); - TEST_CHECK(std::equal(piece, piece + piece_size, piece0)); - - TEST_CHECK(pm.read(piece, 1, 0, piece_size) == piece_size); - TEST_CHECK(std::equal(piece, piece + piece_size, piece1)); - - TEST_CHECK(pm.read(piece, 2, 0, piece_size) == piece_size); - TEST_CHECK(std::equal(piece, piece + piece_size, piece2)); - pm.release_files(); + peer_request r; + r.piece = 0; + r.start = 0; + r.length = piece_size; + pm->async_read(r, bind(&on_read_piece, _1, _2, piece0, piece_size)); + r.piece = 1; + pm->async_read(r, bind(&on_read_piece, _1, _2, piece1, piece_size)); + r.piece = 2; + pm->async_read(r, bind(&on_read_piece, _1, _2, piece2, piece_size)); + pm->async_release_files(); } } diff --git a/test/test_swarm.cpp b/test/test_swarm.cpp index 33defbf45..7ddbbecd8 100644 --- a/test/test_swarm.cpp +++ b/test/test_swarm.cpp @@ -92,7 +92,7 @@ void test_swarm() << std::endl; if (tor2.is_seed() && tor3.is_seed()) break; - sleep(1000); + test_sleep(1000); } TEST_CHECK(tor2.is_seed()); diff --git a/test/test_web_seed.cpp b/test/test_web_seed.cpp index 72a44b1cc..a92b1a3be 100644 --- a/test/test_web_seed.cpp +++ b/test/test_web_seed.cpp @@ -58,21 +58,22 @@ void test_transfer() torrent_file.create_torrent(); session ses; + ses.set_severity_level(alert::debug); ses.listen_on(std::make_pair(49000, 50000)); remove_all("./tmp1"); torrent_handle th = ses.add_torrent(torrent_file, "./tmp1"); - for (int i = 0; i < 70; ++i) + for (int i = 0; i < 30; ++i) { torrent_status s = th.status(); - std::cerr << s.progress << " " << (s.download_rate / 1000.f) << "\r"; + std::cerr << s.progress << " " << (s.download_rate / 1000.f) << std::endl; std::auto_ptr a; a = ses.pop_alert(); if (a.get()) std::cerr << a->msg() << "\n"; if (th.is_seed()) break; - sleep(999); + test_sleep(1000); } TEST_CHECK(th.is_seed());