diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 1204eef6f..f98b12d59 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -291,8 +291,8 @@ namespace libtorrent peer_id m_peer_id; // the pieces that we are sending and receiving - piece_file m_sending_piece; - piece_file m_receiving_piece; +// piece_file m_sending_piece; +// piece_file m_receiving_piece; // other side says that it's interested in downloading // from us. diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index e90609df1..1e17a5722 100755 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -75,7 +75,7 @@ namespace libtorrent virtual ~file_allocation_failed() throw() {} std::string m_msg; }; - +/* // wraps access to pieces with a file-like interface class piece_file { @@ -151,6 +151,7 @@ namespace libtorrent friend class piece_file; friend class piece_sorter; public: + typedef entry::integer_type size_type; void initialize_pieces(torrent* t, const boost::filesystem::path& path, @@ -169,6 +170,9 @@ namespace libtorrent entry::integer_type piece_storage(int piece); void allocate_pieces(int num); + size_type read(char* buf, int slot, size_type offset, size_type size); + void write(const char* buf, int slot, size_type offset, size_type size); + private: void libtorrent::storage::invariant() const; @@ -202,23 +206,83 @@ namespace libtorrent boost::condition m_unlocked_pieces; std::vector m_locked_pieces; - boost::recursive_mutex m_mutex; + mutable boost::recursive_mutex m_mutex; torrent* m_torrent; }; - - class piece_sorter +*/ + class storage { public: - void operator()(); + storage( + const torrent_info& info + , const boost::filesystem::path& path); + + typedef entry::integer_type size_type; + + size_type read(char* buf, int slot, size_type offset, size_type size); + void write(const char* buf, int slot, size_type offset, size_type size); private: - typedef std::vector pieces_type; + const torrent_info& m_info; + const boost::filesystem::path m_save_path; + }; - storage* m_storage; - boost::mutex m_monitor; - boost::condition m_more_pieces; - pieces_type m_pieces; + class piece_manager + { + public: + + typedef entry::integer_type size_type; + + piece_manager( + const torrent_info& info + , const boost::filesystem::path& path); + + void check_pieces(boost::mutex& mutex, detail::piece_checker_data& data); + + void allocate_slots(int num_slots); + + size_type read(char* buf, int piece_index, size_type offset, size_type size); + size_type write(const char* buf, int piece_index, size_type offset, size_type size); + + private: + + // returns the slot currently associated with the given + // piece or assigns the given piece_index to a free slot + int slot_for_piece(int piece_index); + + void check_invariant() const; + + storage m_storage; + + // total number of bytes left to be downloaded + size_type m_bytes_left; + + // a bitmask representing the pieces we have + std::vector m_have_piece; + + const torrent_info& m_info; + + // maps piece index to slot index. -1 means the piece + // doesn't exist + std::vector m_piece_to_slot; + // slots that hasn't had any file storage allocated + std::vector m_unallocated_slots; + // slots that has file storage, but isn't assigned to a piece + std::vector m_free_slots; + + // index here is a slot number in the file + // -1 : the slot is unallocated + // -2 : the slot is allocated but not assigned to a piece + // * : the slot is assigned to this piece + std::vector m_slot_to_piece; + + // synchronization + boost::mutex m_locked_pieces_monitor; + boost::condition m_unlocked_pieces; + std::vector m_locked_pieces; + + mutable boost::recursive_mutex m_mutex; }; } diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 42508094d..cd9c6799b 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -110,7 +110,25 @@ namespace libtorrent int bytes_downloaded() const { return m_bytes_downloaded; } int bytes_uploaded() const { return m_bytes_uploaded; } - int bytes_left() const { return m_storage.bytes_left(); } + int bytes_left() const + { + const std::vector& p = m_storage.pieces(); + int num_pieces = std::accumulate(p.begin(), p.end(), 0); + int total_blocks + = (m_torrent_file.total_size()+m_block_size-1)/m_block_size; + int blocks_per_piece + = m_torrent_file.piece_length() / m_block_size; + int unverified_blocks = m_picker.unverified_blocks(); + int blocks_we_have = num_pieces * blocks_per_piece; + const int last_piece = m_torrent_file.num_pieces()-1; + if (p[last_piece]) + { + blocks_we_have += m_picker.blocks_in_piece(last_piece) + - blocks_per_piece; + } + return m_torrent_file.total_size() + - (blocks_we_have + unverified_blocks) * m_block_size; + } torrent_status status() const; @@ -262,7 +280,7 @@ namespace libtorrent torrent_info m_torrent_file; - storage m_storage; + piece_manager m_storage; // the time of next tracker request boost::posix_time::ptime m_next_request; diff --git a/src/storage.cpp b/src/storage.cpp index 9a11f9690..b920c4199 100755 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -54,7 +54,7 @@ POSSIBILITY OF SUCH DAMAGE. #if defined(_MSC_VER) #define for if (false) {} else for #endif - +/* using namespace libtorrent; //#define SUPER_VERBOSE_LOGGING @@ -202,20 +202,6 @@ void libtorrent::piece_file::open(storage* s, int index, open_mode o, int seek_o , this , false)); -/* std::vector& locked_pieces = s->m_locked_pieces; - boost::mutex::scoped_lock piece_lock(s->m_locked_pieces_monitor); - - while (locked_pieces[index]) - s->m_unlocked_pieces.wait(piece_lock); - - defered notifier = defered_action( - boost::bind( - &boost::condition::notify_all - , boost::ref(s->m_unlocked_pieces) - )); - - defered piece_locker = defered_bitvector_value( - locked_pieces, index, true, false);*/ // ---------------------------------------------------------------------- assert(index >= 0 && index < s->m_torrent_file->num_pieces() && "internal error"); @@ -255,76 +241,6 @@ void libtorrent::piece_file::open(storage* s, int index, open_mode o, int seek_o if (m_mode == in) m_file.seekg(m_file_offset, std::ios_base::beg); else m_file.seekp(m_file_offset, std::ios_base::beg); - -#if 0 // old implementation - - open_mode old_mode = m_mode; - storage* old_storage = m_storage; - std::vector::const_iterator old_file_iter = m_file_iter; - const file* old_file_info = &(*m_file_iter); - - m_mode = o; - m_piece_index = index; - m_storage = s; - m_piece_size = m_storage->m_torrent_file->piece_size(m_piece_index); - - assert(index < m_storage->m_torrent_file->num_pieces() && "internal error"); - - m_piece_offset = seek_offset; - int piece_byte_offset = index * m_storage->m_torrent_file->piece_length() - + m_piece_offset; - - entry::integer_type file_byte_offset = 0; - for (m_file_iter = m_storage->m_torrent_file->begin_files(); - m_file_iter != m_storage->m_torrent_file->end_files(); - ++m_file_iter) - { - if (file_byte_offset + m_file_iter->size > piece_byte_offset) break; - file_byte_offset += m_file_iter->size; - } - // m_file_iter now refers to the first file this piece is stored in - - // if we're still in the same file, don't reopen it - if ((m_mode == out && !(m_file_mode & std::ios_base::out)) - || old_file_iter != m_file_iter - || !m_file.is_open() - || m_file.fail() - || old_storage != m_storage) - { - std::ios_base::openmode m; - if (m_mode == out) m = std::ios_base::out | std::ios_base::in | std::ios_base::binary; - else m = std::ios_base::in | std::ios_base::binary; - - const file& f = *m_file_iter; - boost::filesystem::path p = m_storage->m_save_path; - p /= f.path; - p /= f.filename; - m_file.close(); - m_file.clear(); - - m_file_mode = m; - m_file.open(p, m_file_mode); -// std::cout << "opening file: '" << p.native_file_string() << "'\n"; - if (m_file.fail()) - { - // TODO: try to recover! create a new file? - assert(!m_file.fail()); - } - } - assert(!m_file.fail()); - - m_file_offset = piece_byte_offset - file_byte_offset; - if (m_mode == in) m_file.seekg(m_file_offset, std::ios_base::beg); - else m_file.seekp(m_file_offset, std::ios_base::beg); - -#ifndef NDEBUG - int gpos = m_file.tellg(); - int ppos = m_file.tellp(); - assert(m_mode == out || m_file_offset == gpos && "internal error"); - assert(m_mode == in || m_file_offset == ppos && "internal error"); -#endif - -#endif } int libtorrent::piece_file::read(char* buf, int size, bool lock_) @@ -334,21 +250,6 @@ int libtorrent::piece_file::read(char* buf, int size, bool lock_) assert(m_file.is_open()); // synchronization ------------------------------------------------------ -/* std::vector& locked_pieces = m_storage->m_locked_pieces; - boost::mutex::scoped_lock piece_lock(m_storage->m_locked_pieces_monitor); - - while (locked_pieces[m_piece_index]) - m_storage->m_unlocked_pieces.wait(piece_lock); - - defered notifier = defered_action( - boost::bind( - &boost::condition::notify_all - , boost::ref(m_storage->m_unlocked_pieces) - )); - - defered piece_locker = defered_bitvector_value( - locked_pieces, m_piece_index, true, false);*/ - lock(); defered unlock = defered_action( boost::bind( @@ -401,72 +302,6 @@ int libtorrent::piece_file::read(char* buf, int size, bool lock_) } return result; - -#if 0 // old implementation - - assert(m_file_offset == (long)m_file.tellg() && "internal error"); -// std::cout << std::clock() << "read " << m_piece_index << "\n"; - - assert(m_mode == in); - assert(!m_file.fail()); - assert(m_file.is_open()); - int left_to_read = size; - - // make sure we don't read more than what belongs to this piece - if (m_piece_offset + left_to_read > m_piece_size) - left_to_read = m_piece_size - m_piece_offset; - - int buf_pos = 0; - int read_total = 0; - do - { - assert(m_file_iter != m_storage->m_torrent_file->end_files() && "internal error, please report!"); - int available = std::min(static_cast(m_file_iter->size - m_file_offset), - static_cast(left_to_read)); - - assert(buf_pos >= 0); - m_file.read(buf + buf_pos, available); - int read = m_file.gcount(); - assert(read > 0); - left_to_read -= read; - read_total += read; - buf_pos += read; - assert(buf_pos >= 0); - m_file_offset += read; - m_piece_offset += read; - - if (m_file_offset == m_file_iter->size && m_piece_offset < m_piece_size) - { - ++m_file_iter; - assert(m_file_iter != m_storage->m_torrent_file->end_files() && "internal error, please report!"); - boost::filesystem::path path = m_storage->m_save_path; - path /= m_file_iter->path; - path /= m_file_iter->filename; - - m_file_offset = 0; - m_file.close(); - m_file.clear(); - m_file.open(path, m_file_mode); -// std::cout << "opening file: '" << path.native_file_string() << "'\n"; - if (m_file.fail()) - { - // TODO: try to recover! create a new file? - assert(!m_file.fail()); - } - } - else - { - assert(read != 0 && "internal error"); - } - } while (left_to_read > 0); - -#ifndef NDEBUG - int gpos = m_file.tellg(); - assert(m_file_offset == gpos && "internal error"); -#endif - return read_total; - -#endif } void libtorrent::piece_file::write(const char* buf, int size, bool lock_) @@ -476,20 +311,6 @@ void libtorrent::piece_file::write(const char* buf, int size, bool lock_) assert(m_file.is_open()); // synchronization ------------------------------------------------------ -/* std::vector& locked_pieces = m_storage->m_locked_pieces; - boost::mutex::scoped_lock piece_lock(m_storage->m_locked_pieces_monitor); - - while (locked_pieces[m_piece_index]) - m_storage->m_unlocked_pieces.wait(piece_lock); - - defered notifier = defered_action( - boost::bind( - &boost::condition::notify_all - , boost::ref(m_storage->m_unlocked_pieces) - )); - - defered piece_locker = defered_bitvector_value( - locked_pieces, m_piece_index, true, false);*/ lock(); defered unlock = defered_action( boost::bind( @@ -536,7 +357,6 @@ void libtorrent::piece_file::write(const char* buf, int size, bool lock_) ++m_file_iter; assert(m_file_iter != m_storage->m_torrent_file->end_files()); -// assert(m_file_iter->size > m_file_offset); BOGUS ASSERT? boost::filesystem::path path = m_storage->m_save_path / m_file_iter->path / m_file_iter->filename; @@ -547,50 +367,6 @@ void libtorrent::piece_file::write(const char* buf, int size, bool lock_) m_file.open(path, m_file_mode); } } - -#if 0 // old implementation - /* - assert(m_mode == out); - int left_to_write = size; - - // make sure we don't write more than what belongs to this piece - if (m_piece_offset + left_to_write > m_piece_size) - left_to_write = m_piece_size - m_piece_offset; - - int buf_pos = 0; - do - { - assert(m_file_iter != m_storage->m_torrent_file->end_files() && "internal error, please report!"); - int this_write = std::min(static_cast(m_file_iter->size - m_file_offset), - static_cast(left_to_write)); - - m_file.write(buf + buf_pos, this_write); - left_to_write -= this_write; - buf_pos += this_write; - m_file_offset += this_write; - m_piece_offset += this_write; - - if (m_file_offset == m_file_iter->size && m_piece_offset < m_piece_size) - { - ++m_file_iter; - assert(m_file_iter != m_storage->m_torrent_file->end_files() && "internal error, please report!"); - boost::filesystem::path path = m_storage->m_save_path; - path /= m_file_iter->path; - path /= m_file_iter->filename; - - m_file_offset = 0; - m_file.close(); - m_file.open(path, m_file_mode); -// std::cout << "opening file: '" << path.native_file_string() << "'\n"; - if (m_file.fail()) - { - // TODO: try to recover! create a new file? - assert(!m_file.fail()); - } - } - } while (left_to_write > 0); -*/ -#endif } void libtorrent::piece_file::seek_forward(int step, bool lock_) @@ -690,9 +466,9 @@ bool libtorrent::storage::verify_piece(piece_file& file) } return false; } - +*/ namespace { - +/* struct equal_hash { bool operator()(const sha1_hash& lhs, const sha1_hash& rhs) const @@ -700,7 +476,7 @@ namespace { return std::equal(lhs.begin(), lhs.end(), rhs.begin()); } }; - +*/ struct lazy_hash { mutable sha1_hash digest; @@ -734,7 +510,7 @@ namespace { } } // namespace unnamed - +/* void libtorrent::storage::allocate_pieces(int num) { // synchronization ------------------------------------------------------ @@ -786,8 +562,6 @@ void libtorrent::storage::allocate_pieces(int num) while (bytes_left > 0) { -// fs::ofstream out(fs::path("foo.bar"), std::ios_base::binary | std::ios_base::in); - fs::path path(m_save_path / file_iter->path / file_iter->filename); fs::ofstream out; @@ -797,9 +571,6 @@ void libtorrent::storage::allocate_pieces(int num) else out.open(path, std::ios_base::binary); -// std::ofstream out((m_save_path / file_iter->path / file_iter->filename).native_file_string().c_str() -// , std::ios_base::binary | std::ios_base::in); - out.seekp(pos, std::ios_base::beg); assert((entry::integer_type)out.tellp() == pos); @@ -1356,163 +1127,746 @@ void libtorrent::storage::invariant() const } } -#if 0 // OLD STORAGE -/* - -// allocate files will create all files that are missing -// if there are some files that already exists, it checks -// that they have the correct filesize -// data is the structure that is shared between the -// thread where this function is run in and the -// main thread. It is used to communicate progress -// and abortion information. -void libtorrent::storage::initialize_pieces(torrent* t, - const boost::filesystem::path& path, - detail::piece_checker_data* data, - boost::mutex& mutex) +libtorrent::storage::size_type libtorrent::storage::read( + char* buf + , int slot + , size_type offset + , size_type size) { - m_save_path = path; - m_torrent_file = &t->torrent_file(); + namespace fs = boost::filesystem; - // we don't know of any piece we have right now. Initialize - // it to say we don't have anything and fill it in later on. - m_have_piece.resize(m_torrent_file->num_pieces()); - std::fill(m_have_piece.begin(), m_have_piece.end(), false); + size_type start = slot * m_torrent_file->piece_length() + offset; - m_bytes_left = m_torrent_file->total_size(); + // find the file iterator and file offset + size_type file_offset = start; + std::vector::const_iterator file_iter; -#ifndef NDEBUG - std::size_t sum = 0; - for (int i = 0; i < m_torrent_file->num_pieces(); ++i) - sum += m_torrent_file->piece_size(i); - assert(sum == m_bytes_left); -#endif - - // this will be set to true if some file already exists - // in which case another pass will be made to check - // the hashes of all pieces to know which pieces we - // have - bool resume = false; - - unsigned int total_bytes = m_torrent_file->total_size(); - unsigned int progress = 0; - - // the buffersize of the file writes - const int chunksize = 8192; - char zeros[chunksize]; - std::fill(zeros, zeros+chunksize, 0); - - // remember which directories we have created, so - // we don't have to ask the filesystem all the time - std::set created_directories; - for (torrent_info::file_iterator i = m_torrent_file->begin_files(); i != m_torrent_file->end_files(); ++i) + for (file_iter = m_torrent_file->begin_files();;) { - boost::filesystem::path path = m_save_path; - path /= i->path; - if (created_directories.find(i->path) == created_directories.end()) + if (file_offset < file_iter->size) + break; + + file_offset -= file_iter->size; + ++file_iter; + } + + fs::ifstream in( + m_save_path / file_iter->path / file_iter->filename + , std::ios_base::binary + ); + + assert(file_offset < file_iter->size); + + in.seekg(std::ios_base::beg, file_offset); + + size_type left_to_read = size; + size_type slot_size = m_torrent_file->piece_size(slot); + + if (offset + left_to_read > slot_size) + left_to_read = slot_size - offset; + + assert(left_to_read >= 0); + + int result = left_to_read; + int buf_pos = 0; + + while (left_to_read > 0) + { + int read_bytes = left_to_read; + if (file_offset + read_bytes > file_iter->size) + read_bytes = file_iter->size - offset; + + assert(read_bytes > 0); + + in.read(buf + buf_pos, read_bytes); + + assert(read_bytes == in.gcount()); + + left_to_read -= read_bytes; + buf_pos += read_bytes; + assert(buf_pos >= 0); + file_offset += read_bytes; + + if (left_to_read > 0) { - if (boost::filesystem::exists(path)) - { - if (!boost::filesystem::is_directory(path)) - { - std::string msg = "Cannot create directory, the file \""; - msg += path.native_file_string(); - msg += "\" is in the way."; - throw file_allocation_failed(msg.c_str()); - } - } - else - { - boost::filesystem::create_directories(path); - } - created_directories.insert(i->path); - } + ++file_iter; + fs::path path = m_save_path /file_iter->path / file_iter->filename; - // allocate the file. - // fill it with zeros - path /= i->filename; - if (boost::filesystem::exists(path)) - { - // the file exists, make sure it isn't a directory - if (boost::filesystem::is_directory(path)) - { - std::string msg = "Cannot create file, the directory \""; - msg += path.native_file_string(); - msg += "\" is in the way."; - throw file_allocation_failed(msg.c_str()); - } -// std::cout << "creating file: '" << path.native_file_string() << "'\n"; - std::ifstream f(path.native_file_string().c_str(), std::ios_base::binary); - f.seekg(0, std::ios_base::end); - int filesize = f.tellg(); - if (filesize != i->size) - { - - // TODO: recover by padding the file with 0 - std::string msg = "The file \""; - msg += path.native_file_string(); - msg += "\" has the wrong size."; - throw file_allocation_failed(msg.c_str()); - } - resume = true; - } - else - { - // The file doesn't exist, create it and fill it with zeros - std::ofstream f(path.native_file_string().c_str(), std::ios_base::binary); - entry::integer_type left_to_write = i->size; - while(left_to_write >= chunksize) - { - f.write(zeros, chunksize); - // TODO: Check if disk is full - left_to_write -= chunksize; - progress += chunksize; - - boost::mutex::scoped_lock l(mutex); - data->progress = static_cast(progress) / total_bytes; - if (data->abort) return; - } - // TODO: Check if disk is full - if (left_to_write > 0) f.write(zeros, left_to_write); - progress += left_to_write; - - boost::mutex::scoped_lock l(mutex); - data->progress = static_cast(progress) / total_bytes; - if (data->abort) return; + file_offset = 0; + in.close(); + in.clear(); + in.open(path, std::ios_base::binary); } } - // we have to check which pieces we have and which we don't have - if (resume) + return result; +} + +void libtorrent::storage::write(const char* buf, int slot, size_type offset, size_type size) +{ + namespace fs = boost::filesystem; + + size_type start = slot * m_torrent_file->piece_length() + offset; + + // find the file iterator and file offset + size_type file_offset = start; + std::vector::const_iterator file_iter; + + for (file_iter = m_torrent_file->begin_files();;) { - int missing = 0; -// std::cout << "checking existing files\n"; + if (file_offset < file_iter->size) + break; - int num_pieces = m_torrent_file->num_pieces(); - - progress = 0; - piece_file f; - for (unsigned int i = 0; i < num_pieces; ++i) - { - f.open(this, i, piece_file::in); - if (!verify_piece(f)) missing++; - -// std::cout << i+1 << " / " << m_torrent_file->num_pieces() << " missing: " << missing << "\r"; - - progress += m_torrent_file->piece_size(i); - boost::mutex::scoped_lock l(mutex); - data->progress = static_cast(progress) / total_bytes; - if (data->abort) return; - } -// std::cout << "\n"; + file_offset -= file_iter->size; + ++file_iter; } + fs::ofstream out( + m_save_path / file_iter->path / file_iter->filename + , std::ios_base::in | std::ios_base::binary + ); + + assert(file_offset < file_iter->size); + + out.seekp(std::ios_base::beg, file_offset); + + size_type left_to_write = size; + size_type slot_size = m_torrent_file->piece_size(slot); + + if (offset + left_to_write > slot_size) + left_to_write = slot_size - offset; + + assert(left_to_write >= 0); + + int buf_pos = 0; + + while (left_to_write > 0) + { + int write_bytes = left_to_write; + if (file_offset + write_bytes > file_iter->size) + { + assert(file_iter->size > file_offset); + write_bytes = file_iter->size - file_offset; + } + + assert(buf_pos >= 0); + assert(write_bytes > 0); + out.write(buf + buf_pos, write_bytes); + + left_to_write -= write_bytes; + buf_pos += write_bytes; + assert(buf_pos >= 0); + file_offset += write_bytes; + assert(file_offset <= file_iter->size); + + if (left_to_write > 0) + { + ++file_iter; + + assert(file_iter != m_torrent_file->end_files()); + + fs::path path = m_save_path / file_iter->path / file_iter->filename; + + file_offset = 0; + out.close(); + out.clear(); + out.open(path, std::ios_base::in | std::ios_base::binary); + } + } } */ -#ifdef NO_THREAD_SAFE_PIECE_FILE - #undef NO_THREAD_SAFE_PIECE_FILE -#endif + +// -- new storage abstraction ----------------------------------------------- + +namespace fs = boost::filesystem; + +namespace libtorrent { + + storage::storage(const torrent_info& info, const fs::path& path) + : m_info(info) + , m_save_path(path) + {} + + storage::size_type storage::read( + char* buf + , int slot + , size_type offset + , size_type size) + { + size_type start = slot * m_info->piece_length() + offset; + + // find the file iterator and file offset + size_type file_offset = start; + std::vector::const_iterator file_iter; + + for (file_iter = m_info->begin_files();;) + { + if (file_offset < file_iter->size) + break; + + file_offset -= file_iter->size; + ++file_iter; + } + + fs::ifstream in( + m_save_path / file_iter->path / file_iter->filename + , std::ios_base::binary + ); + + assert(file_offset < file_iter->size); + + in.seekg(std::ios_base::beg, file_offset); + + size_type left_to_read = size; + size_type slot_size = m_info->piece_size(slot); + + if (offset + left_to_read > slot_size) + left_to_read = slot_size - offset; + + assert(left_to_read >= 0); + + int result = left_to_read; + int buf_pos = 0; + + while (left_to_read > 0) + { + int read_bytes = left_to_read; + if (file_offset + read_bytes > file_iter->size) + read_bytes = file_iter->size - offset; + + assert(read_bytes > 0); + + in.read(buf + buf_pos, read_bytes); + + assert(read_bytes == in.gcount()); + + left_to_read -= read_bytes; + buf_pos += read_bytes; + assert(buf_pos >= 0); + file_offset += read_bytes; + + if (left_to_read > 0) + { + ++file_iter; + fs::path path = m_save_path / file_iter->path / file_iter->filename; + + file_offset = 0; + in.close(); + in.clear(); + in.open(path, std::ios_base::binary); + } + } + + return result; + } + + void storage::write(const char* buf, int slot, size_type offset, size_type size) + { + size_type start = slot * m_info->piece_length() + offset; + + // find the file iterator and file offset + size_type file_offset = start; + std::vector::const_iterator file_iter; + + for (file_iter = m_info->begin_files();;) + { + if (file_offset < file_iter->size) + break; + + file_offset -= file_iter->size; + ++file_iter; + } + + fs::ofstream out( + m_save_path / file_iter->path / file_iter->filename + , std::ios_base::in | std::ios_base::binary + ); + + assert(file_offset < file_iter->size); + + out.seekp(std::ios_base::beg, file_offset); + + size_type left_to_write = size; + size_type slot_size = m_info->piece_size(slot); + + if (offset + left_to_write > slot_size) + left_to_write = slot_size - offset; + + assert(left_to_write >= 0); + + int buf_pos = 0; + + // TODO + // handle case when we can't write size bytes. + while (left_to_write > 0) + { + int write_bytes = left_to_write; + if (file_offset + write_bytes > file_iter->size) + { + assert(file_iter->size > file_offset); + write_bytes = file_iter->size - file_offset; + } + + assert(buf_pos >= 0); + assert(write_bytes > 0); + out.write(buf + buf_pos, write_bytes); + + left_to_write -= write_bytes; + buf_pos += write_bytes; + assert(buf_pos >= 0); + file_offset += write_bytes; + assert(file_offset <= file_iter->size); + + if (left_to_write > 0) + { + ++file_iter; + + assert(file_iter != m_info->end_files()); + + fs::path path = m_save_path / file_iter->path / file_iter->filename; + + file_offset = 0; + out.close(); + out.clear(); + out.open(path, std::ios_base::in | std::ios_base::binary); + } + } + } + + piece_manager::piece_manager( + const torrent_info& info + , const fs::path& save_path) + : m_storage(info, save_path) + , m_info(info) + { + } + + size_type piece_manager::read(char* buf, int piece_index, size_type offset, size_type size) + { + assert(m_piece_to_slot[piece_index] >= 0); + int slot = m_piece_to_slot[piece_index]; + return m_storage.read(buf, slot, offset, size); + } + + void piece_manager::write(const char* buf, int piece_index, size_type offset, size_type size) + { + int slot = slot_for_piece(piece_index); + m_storage.write(buf, slot, offset, size); + } + + void piece_manager::check_pieces(boost::mutex& mutex, detail::piece_checker_data& data) + { + // synchronization ------------------------------------------------------ + boost::recursive_mutex::scoped_lock lock(m_mutex); + // ---------------------------------------------------------------------- + + // free up some memory + std::vector( + m_info.num_pieces(), false + ).swap(m_have_piece); + + std::vector( + m_info.num_pieces(), -1 + ).swap(m_allocated_pieces); + + std::vector( + m_info.num_pieces(), false + ).swap(m_locked_pieces); + + std::vector( + m_info.num_pieces(), -1 + ).swap(m_slot_to_piece); + + std::vector().swap(m_free_blocks); + std::vector().swap(m_free_pieces); + + m_bytes_left = m_info.total_size(); + + const std::size_t piece_size = m_info.piece_length(); + const std::size_t last_piece_size = m_info.piece_size( + m_info.num_pieces() - 1); + + bool changed_file = true; + fs::ifstream in; + + std::vector piece_data(m_info.piece_length()); + std::size_t piece_offset = 0; + + std::size_t current_piece = 0; + std::size_t bytes_to_read = piece_size; + std::size_t bytes_current_read = 0; + std::size_t seek_into_next = 0; + entry::integer_type filesize = 0; + entry::integer_type start_of_read = 0; + entry::integer_type start_of_file = 0; + + { + boost::mutex::scoped_lock lock(mutex); + data->progress = 0.f; + } + + for (torrent_info::file_iterator file_iter = m_info.begin_files(), + end_iter = m_info.end_files(); + file_iter != end_iter;) + { + { + boost::mutex::scoped_lock lock(mutex); + + data->progress = (float)current_piece / m_info.num_pieces(); + if (data->abort) + return; + } + + assert(current_piece < m_info.num_pieces()); + + fs::path path(m_save_path / file_iter->path); + + // if the path doesn't exist, create the + // entire directory tree + if (!fs::exists(path)) + fs::create_directories(path); + + path /= file_iter->filename; + + if (changed_file) + { + in.close(); + in.clear(); + in.open(path, std::ios_base::binary); + + changed_file = false; + + bytes_current_read = seek_into_next; + + if (!in) + { + filesize = 0; + } + else + { + in.seekg(0, std::ios_base::end); + filesize = in.tellg(); + in.seekg(seek_into_next, std::ios_base::beg); + } + } + + // we are at the start of a new piece + // so we store the start of the piece + if (bytes_to_read == m_info.piece_size(current_piece)) + start_of_read = current_piece * piece_size; + + std::size_t bytes_read = 0; + + if (filesize > 0) + { + in.read(&piece_data[piece_offset], bytes_to_read); + bytes_read = in.gcount(); + } + + bytes_current_read += bytes_read; + bytes_to_read -= bytes_read; + + assert(bytes_to_read >= 0); + + // bytes left to read, go on with next file + if (bytes_to_read > 0) + { + if (bytes_current_read != file_iter->size) + { + entry::integer_type pos; + entry::integer_type file_end = start_of_file + file_iter->size; + + for (pos = start_of_read; pos < file_end; + pos += piece_size) + { + m_unallocated_slots.push_back(current_piece); + ++current_piece; + } + + seek_into_next = pos - file_end; + bytes_to_read = piece_size; + piece_offset = 0; + } + else + { + seek_into_next = 0; + piece_offset += bytes_read; + } + + changed_file = true; + start_of_file += file_iter->size; + ++file_iter; + continue; + } + + // we need to take special actions if this is + // the last piece, since that piece might actually + // be smaller than piece_size. + + lazy_hash large_digest(&piece_data[0], piece_size); + lazy_hash small_digest(&piece_data[0], last_piece_size); + + const lazy_hash* digest[2] = { + &large_digest, &small_digest + }; + + bool found = false; + + for (int i = 0; i < m_info.num_pieces(); ++i) + { + if (m_have_piece[i]) + continue; + + const sha1_hash& hash = digest[ + i == m_info.num_pieces() - 1]->get(); + + if (equal_hash()(hash, m_info.hash_for_piece(i))) + { + m_bytes_left -= m_info.piece_size(i); + + m_piece_to_slot[i] = current_piece; + m_slot_to_piece[current_piece] = i; + m_have_piece[i] = true; + found = true; + break; + } + } + + if (!found) + { + m_slot_to_piece[current_piece] = -2; + + entry::integer_type last_pos = + m_info.total_size() - + m_info.piece_size( + m_info.num_pieces() - 1); + + // treat the last slot as unallocated space. + // this means that when we get to the last + // slot we are either allocating space for + // the last piece, or the last piece has already + // been allocated + if (current_piece == m_info.num_pieces() - 1) + m_unallocated_slots.push_back(current_piece); + else + m_free_slots.push_back(current_piece); + } + + // done with piece, move on to next + piece_offset = 0; + ++current_piece; + + bytes_to_read = m_info.piece_size(current_piece); + } + + std::cout << " m_free_slots: " << m_free_slots.size() << "\n"; + std::cout << " m_unallocated_slots: " << m_unallocated_slots.size() << "\n"; + std::cout << " num pieces: " << m_info.num_pieces() << "\n"; + + std::cout << " have_pieces: "; + print_bitmask(m_have_piece); + std::cout << "\n"; + std::cout << std::count(m_have_piece.begin(), m_have_piece.end(), true) << "\n"; + + check_invariant(); + } + + int piece_manager::slot_for_piece(int piece_index) + { + // synchronization ------------------------------------------------------ + boost::recursive_mutex::scoped_lock lock(m_mutex); + // ---------------------------------------------------------------------- + + assert(piece_index >= 0 && piece_index < m_piece_to_slot.size()); + assert(m_piece_to_slot.size() == m_slot_to_piece.size()); + + int slot_index = m_piece_to_slot[piece_index]; + + if (slot_index != -1) + { + assert(slot_index >= 0); + assert(slot_index < m_slot_to_piece.size()); + return slot_index; + } + + if (m_free_slots.empty()) + { + allocate_slots(5); + assert(!m_free_slots.empty()); + } + + std::vector::iterator iter( + std::find( + m_free_slots.begin() + , m_free_slots.end() + , piece_index)); + + if (iter == m_free_slots.end()) + { + iter = m_free_slots.end() - 1; + + // special case to make sure we don't use the last slot + // when we shouldn't, since it's smaller than ordinary slots + if (*iter == m_info.num_pieces() - 1 && piece_index != *iter) + { + if (m_free_slots.size() == 1) + allocate_slots(5); + assert(m_free_slots.size() > 1); + // TODO: assumes that all allocated slots + // are put at the end of the free_slots vector + iter = m_free_slots.end() - 1; + } + } + + slot_index = *iter; + m_free_pieces.erase(iter); + + assert(m_slot_to_piece[slot_index] == -2); + + m_slot_to_piece[slot_index] = piece_index; + m_piece_to_slot[piece_index] = slot_index; + + // there is another piece already assigned to + // the slot we are interested in, swap positions + if (m_slot_to_piece[piece_index] >= 0) + { + std::vector buf(m_info.piece_length()); + m_storage.read(&buf[0], piece_index, m_info.piece_length(), 0); + m_storage.write(&buf[0], slot_index, m_info.piece_length(), 0); + + std::swap( + m_slot_to_piece[piece_index] + , m_slot_to_piece[slot_index]); + + std::swap( + m_piece_to_slot[piece_index] + , m_piece_to_slot[slot_index]); + + slot_index = piece_index; + } + + return slot_index; + } + + void piece_manager::allocate_slots(int num_slots) + { + // synchronization ------------------------------------------------------ + boost::recursive_mutex::scoped_lock lock(m_mutex); + // ---------------------------------------------------------------------- + + namespace fs = boost::filesystem; + + std::cout << "allocating pieces...\n"; + + std::vector::iterator iter + = m_unallocated_slots.begin(); + std::vector::iterator end_iter + = m_unallocated_slots.end(); + + const size_type piece_size = m_info.piece_length(); + + std::vector zeros(piece_size, 0); + +// int last_piece_index = -1; + + for (int i = 0; i < num; ++i, ++iter) + { + if (iter == end_iter) + break; + + int pos = *iter; + int piece_pos = pos; +/* + const bool last_piece = (pos == m_info.num_pieces() - 1); + + if (last_piece) + last_piece_index = i; +*/ + + int new_free_slot = pos; + + if (m_piece_to_slot[pos] != -1) + { + assert(m_piece_to_slot[pos] >= 0); + m_storage.read(&zeros[0], m_piece_to_slot[pos], 0, m_info.piece_size(pos)); + new_free_slot = m_piece_to_slot[pos]; + m_slot_to_piece[pos] = pos; + m_piece_to_slot[pos] = pos; + } + + m_slot_to_piece[new_free_slot] = -2; + m_free_slots.push_back(new_free_slot); + + m_storage.write(&zeros[0], pos, 0, m_info.piece_size(pos)); +/* + torrent_info::file_iterator file_iter; + + for (file_iter = m_torrent_file->begin_files(); + pos > file_iter->size; ++file_iter) + { + pos -= file_iter->size; + } + + entry::integer_type bytes_left = last_piece + ? m_torrent_file->piece_size(m_torrent_file->num_pieces() - 1) + : piece_size; + + while (bytes_left > 0) + { + fs::path path(m_save_path / file_iter->path / file_iter->filename); + + fs::ofstream out; + + if (fs::exists(path)) + out.open(path, std::ios_base::binary | std::ios_base::in); + else + out.open(path, std::ios_base::binary); + + out.seekp(pos, std::ios_base::beg); + + assert((entry::integer_type)out.tellp() == pos); + + entry::integer_type bytes_to_write = bytes_left; + + if (pos + bytes_to_write >= file_iter->size) + { + bytes_to_write = file_iter->size - pos; + } + + out.write(&zeros[0], bytes_to_write); + + bytes_left -= bytes_to_write; + ++file_iter; + pos = 0; + } +*/ + m_slot_to_piece[piece_pos / piece_size] = -2; + } + + m_unallocated_slots.erase(m_unallocated_slots.begin(), iter); + +/* // move last slot to the end + if (last_piece_index != -1) + std::swap(m_free_pieces[last_piece_index], m_free_pieces.front()); +*/ + } + + void storage::check_invariant() const + { + // synchronization ------------------------------------------------------ + boost::recursive_mutex::scoped_lock lock(m_mutex); + // ---------------------------------------------------------------------- + + for (int i = 0; i < m_info.num_pieces(); ++i) + { + if (m_allocated_pieces[i] != m_info.piece_length() * i) + assert(m_slot_to_piece[i] < 0); + } + } + + +} // namespace libtorrent #endif diff --git a/src/torrent.cpp b/src/torrent.cpp index b5d826ce2..96c0300b0 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -495,7 +495,7 @@ namespace libtorrent m_picker.files_checked(m_storage.pieces()); #ifndef NDEBUG m_picker.integrity_check(this); -#endif +#endif } void torrent::second_tick()