From 63252959938fb79ec40a4c7279ca643e0470f908 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 4 Nov 2003 23:27:06 +0000 Subject: [PATCH] *** empty log message *** --- docs/index.html | 5 + examples/client_test.cpp | 17 ++- include/libtorrent/peer_connection.hpp | 45 +++++- include/libtorrent/peer_id.hpp | 2 +- include/libtorrent/piece_picker.hpp | 10 +- include/libtorrent/torrent.hpp | 20 +-- include/libtorrent/torrent_handle.hpp | 11 +- src/peer_connection.cpp | 186 +++++++++++++++++++++---- src/piece_picker.cpp | 72 +++++++++- src/policy.cpp | 91 ++++++++++-- src/session.cpp | 54 ++----- src/storage.cpp | 10 +- src/torrent.cpp | 32 ++--- src/torrent_handle.cpp | 3 + 14 files changed, 428 insertions(+), 130 deletions(-) diff --git a/docs/index.html b/docs/index.html index 2ce241c0e..d6a711c49 100755 --- a/docs/index.html +++ b/docs/index.html @@ -518,6 +518,7 @@ struct torrent_status state_t state; float progress; + boost::posix_time::time_duration next_announce; std::size_t total_download; std::size_t total_upload; }; @@ -577,6 +578,10 @@ current task is in the state member, it will be one of the following: +

+next_announce is the time until the torrent will announce itself to the tracker. +

+

total_download and total_upload is the number of bytes downloaded and uploaded to all peers, accumulated. diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 8d24534bd..c0a7e31a3 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +//#include #include "libtorrent/entry.hpp" #include "libtorrent/bencode.hpp" @@ -115,6 +116,16 @@ bool sleep_and_input(char* c) return false; } +void set_cursor(int x, int y) +{ + std::cout << "\033[" << y << ";" << x << "H"; +} + +void clear() +{ + std::cout << "\033[2J"; +} + #endif std::string add_suffix(float val) @@ -237,6 +248,10 @@ int main(int argc, char* argv[]) << add_suffix(total_down) << ") " << add_suffix(down) << "/s u:(" << add_suffix(total_up) << ") " << add_suffix(up) << "/s\n"; + boost::posix_time::time_duration t = s.next_announce; +// std::cout << "next announce: " << boost::posix_time::to_simple_string(t) << "\n"; + std::cout << "next announce: " << t.hours() << ":" << t.minutes() << ":" << t.seconds() << "\n"; + i->get_download_queue(queue); for (std::vector::iterator i = queue.begin(); i != queue.end(); @@ -245,7 +260,7 @@ int main(int argc, char* argv[]) std::cout << i->piece_index << ": "; for (int j = 0; j < i->blocks_in_piece; ++j) { - if (i->finished_blocks[j]) std::cout << "#"; + if (i->finished_blocks[j]) std::cout << "+"; else if (i->requested_blocks[j]) std::cout << "-"; else std::cout << "."; } diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 4bc2e6fbb..646084fb5 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -81,17 +81,29 @@ namespace libtorrent // this is the constructor where the we are teh active part. The peer_conenction // should handshake and verify that the other end has the correct id - peer_connection(detail::session_impl* ses, torrent* t, boost::shared_ptr s, const peer_id& p); + peer_connection( + detail::session_impl* ses + , selector& sel + , torrent* t + , boost::shared_ptr s + , const peer_id& p); // with this constructor we have been contacted and we still don't know which torrent the // connection belongs to - peer_connection(detail::session_impl* ses, boost::shared_ptr s); + peer_connection( + detail::session_impl* ses + , selector& sel + , boost::shared_ptr s); ~peer_connection(); // this adds an announcement in the announcement queue // it will let the peer know that we have the given piece - void announce_piece(int index) { m_announce_queue.push_back(index); } + void announce_piece(int index) + { + m_announce_queue.push_back(index); + send_buffer_updated(); + } // called from the main loop when this connection has any // work to do. @@ -122,6 +134,7 @@ namespace libtorrent void interested(); void not_interested(); void request_block(piece_block block); + void cancel_block(piece_block block); bool is_interesting() const throw() { return m_interesting; } bool has_choked() const throw() { return m_choked; } @@ -152,6 +165,7 @@ namespace libtorrent private: bool dispatch_message(); + void send_buffer_updated(); void send_bitfield(); void send_have(int index); @@ -200,6 +214,7 @@ namespace libtorrent boost::posix_time::ptime m_last_receive; boost::posix_time::ptime m_last_sent; + selector& m_selector; boost::shared_ptr m_socket; torrent* m_torrent; detail::session_impl* m_ses; @@ -207,6 +222,15 @@ namespace libtorrent // and false if we got an incomming connection bool m_active; + // this is true as long as this peer's + // socket is added to the selector to + // monitor writability. Each time we do + // something that generates data to be + // sent to this peer, we check this and + // if it's not added to the selector we + // add it. + bool m_added_to_selector; + // remote peer's id peer_id m_peer_id; @@ -242,6 +266,21 @@ namespace libtorrent stat m_statistics; }; + // this is called each time this peer generates some + // data to be sent. It will add this socket to + // the writibility monitor in the selector. + inline void peer_connection::send_buffer_updated() + { + if (!has_data()) return; + + if (!m_added_to_selector) + { + m_selector.monitor_writability(m_socket); + m_added_to_selector = true; + } + assert(m_added_to_selector); + assert(m_selector.is_writability_monitored(m_socket)); + } } #endif // TORRENT_PEER_CONNECTION_HPP_INCLUDED diff --git a/include/libtorrent/peer_id.hpp b/include/libtorrent/peer_id.hpp index c9c15a616..a13840c0f 100755 --- a/include/libtorrent/peer_id.hpp +++ b/include/libtorrent/peer_id.hpp @@ -92,7 +92,7 @@ namespace libtorrent os << std::hex << std::setw(2) << std::setfill('0') << static_cast(*i); } - os << std::dec << std::cout << std::setfill(' '); + os << std::dec << std::setfill(' '); return os; } diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp index e368b2489..8aa4cf90b 100755 --- a/include/libtorrent/piece_picker.hpp +++ b/include/libtorrent/piece_picker.hpp @@ -54,6 +54,12 @@ namespace libtorrent {} int piece_index; int block_index; + bool operator==(const piece_block& b) const + { return piece_index == b.piece_index && block_index == b.block_index; } + + bool operator!=(const piece_block& b) const + { return piece_index != b.piece_index || block_index != b.block_index; } + }; class piece_picker @@ -133,9 +139,11 @@ namespace libtorrent // or if it already has been successfully downlloaded bool is_downloading(piece_block block) const; + bool is_finished(piece_block block) const; + // marks this piece-block as queued for downloading void mark_as_downloading(piece_block block, const peer_id& peer); - void mark_as_finished(piece_block block); + void mark_as_finished(piece_block block, const peer_id& peer); // if a piece had a hash-failure, it must be restured and // made available for redownloading diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index d93fb453e..14a5e3b32 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -152,8 +152,14 @@ namespace libtorrent // the given peer_id (should be kept at max 1) int num_connections(const peer_id& id) const; - std::vector::const_iterator begin() const { return m_connections.begin(); } - std::vector::const_iterator end() const { return m_connections.end(); } + typedef std::vector::iterator peer_iterator; + typedef std::vector::const_iterator peer_const_iterator; + + peer_const_iterator begin() const { return m_connections.begin(); } + peer_const_iterator end() const { return m_connections.end(); } + + peer_iterator begin() { return m_connections.begin(); } + peer_iterator end() { return m_connections.end(); } // -------------------------------------------- @@ -165,13 +171,13 @@ namespace libtorrent void tracker_request_timed_out() { - std::cout << "TRACKER TIMED OUT\n"; + std::cerr << "TRACKER TIMED OUT\n"; try_next_tracker(); } void tracker_request_error(const char* str) { - std::cout << "TRACKER ERROR: " << str << "\n"; + std::cerr << "TRACKER ERROR: " << str << "\n"; try_next_tracker(); } @@ -179,6 +185,8 @@ namespace libtorrent // to the tracker std::string generate_tracker_request(int port); + boost::posix_time::ptime next_announce() const + { return m_next_request; } // -------------------------------------------- // PIECE MANAGEMENT @@ -216,10 +224,6 @@ namespace libtorrent logger* spawn_logger(const char* title); #endif - // the number of blocks downloaded - // that hasn't been verified yet - int m_unverified_blocks; - private: void try_next_tracker(); diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index a20e2f7c6..fe979cec5 100755 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define TORRENT_TORRENT_HANDLE_HPP_INCLUDED #include +#include #include "libtorrent/peer_id.hpp" #include "libtorrent/peer_info.hpp" @@ -50,7 +51,8 @@ namespace libtorrent struct duplicate_torrent: std::exception { - virtual const char* what() const { return "torrent already exists in session"; } + virtual const char* what() const throw() + { return "torrent already exists in session"; } }; struct torrent_status @@ -66,6 +68,7 @@ namespace libtorrent state_t state; float progress; + boost::posix_time::time_duration next_announce; std::size_t total_download; std::size_t total_upload; }; @@ -84,17 +87,15 @@ namespace libtorrent struct torrent_handle { friend class session; - torrent_handle(): m_ses(0) {} - void get_peer_info(std::vector& v); void abort(); + void get_peer_info(std::vector& v); torrent_status status() const; - void get_download_queue(std::vector& queue) const; - // TODO: add a 'time to next announce' query. + // TODO: add force reannounce private: diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index e75926633..b79236708 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -71,17 +71,24 @@ namespace } -libtorrent::peer_connection::peer_connection(detail::session_impl* ses, torrent* t, boost::shared_ptr s, const peer_id& p) +libtorrent::peer_connection::peer_connection( + detail::session_impl* ses + , selector& sel + , torrent* t + , boost::shared_ptr s + , const peer_id& p) : m_state(read_protocol_length) , m_timeout(120) , m_packet_size(1) , m_recv_pos(0) , m_last_receive(std::time(0)) , m_last_sent(std::time(0)) + , m_selector(sel) , m_socket(s) , m_torrent(t) , m_ses(ses) , m_active(true) + , m_added_to_selector(false) , m_peer_id(p) , m_peer_interested(false) , m_peer_choked(true) @@ -107,17 +114,22 @@ libtorrent::peer_connection::peer_connection(detail::session_impl* ses, torrent* send_bitfield(); } -libtorrent::peer_connection::peer_connection(detail::session_impl* ses, boost::shared_ptr s) +libtorrent::peer_connection::peer_connection( + detail::session_impl* ses + , selector& sel + , boost::shared_ptr s) : m_state(read_protocol_length) , m_timeout(120) , m_packet_size(1) , m_recv_pos(0) , m_last_receive(std::time(0)) , m_last_sent(std::time(0)) + , m_selector(sel) , m_socket(s) , m_torrent(0) , m_ses(ses) , m_active(false) + , m_added_to_selector(false) , m_peer_id() , m_peer_interested(false) , m_peer_choked(true) @@ -161,6 +173,7 @@ void libtorrent::peer_connection::send_handshake() (*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n"; #endif + send_buffer_updated(); } bool libtorrent::peer_connection::dispatch_message() @@ -305,6 +318,8 @@ bool libtorrent::peer_connection::dispatch_message() r.length = read_int(&m_recv_buffer[9]); m_requests.push_back(r); + send_buffer_updated(); + #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif @@ -351,7 +366,7 @@ bool libtorrent::peer_connection::dispatch_message() #endif return false; } - +/* piece_block req = m_download_queue.front(); if (req.piece_index != index) { @@ -368,31 +383,48 @@ bool libtorrent::peer_connection::dispatch_message() #endif return false; } - - m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::out, offset); - +*/ #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n"; #endif - m_receiving_piece.write(&m_recv_buffer[9], len); m_torrent->downloaded_bytes(len); piece_picker& picker = m_torrent->picker(); piece_block block_finished(index, offset / m_torrent->block_size()); - picker.mark_as_finished(block_finished); - // pop the request that just finished - // from the download queue - m_download_queue.erase(m_download_queue.begin()); - m_torrent->m_unverified_blocks++; + std::vector::iterator b + = std::find( + m_download_queue.begin() + , m_download_queue.end() + , block_finished); + + if (b != m_download_queue.end()) + { + // pop the request that just finished + // from the download queue + m_download_queue.erase(b); + } + else + { + // TODO: cancel the block from the + // peer that has taken over it. + } + + if (picker.is_finished(block_finished)) break; + + m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::out, offset); + m_receiving_piece.write(&m_recv_buffer[9], len); + m_receiving_piece.close(); + + picker.mark_as_finished(block_finished, m_peer_id); // did we just finish the piece? if (picker.is_piece_finished(index)) { - m_torrent->m_unverified_blocks -= picker.blocks_in_piece(index); - + m_receiving_piece.open(m_torrent->filesystem(), index, piece_file::in); bool verified = m_torrent->filesystem()->verify_piece(m_receiving_piece); + m_receiving_piece.close(); if (verified) { m_torrent->announce_piece(index); @@ -438,10 +470,15 @@ bool libtorrent::peer_connection::dispatch_message() m_requests.erase(i); } + if (!has_data() && m_added_to_selector) + { + m_added_to_selector = false; + m_selector.remove_writable(m_socket); + } + #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif - m_requests.clear(); break; } } @@ -449,6 +486,55 @@ bool libtorrent::peer_connection::dispatch_message() return true; } +void libtorrent::peer_connection::cancel_block(piece_block block) +{ + assert(block.piece_index >= 0); + assert(block.piece_index < m_torrent->torrent_file().num_pieces()); + assert(m_torrent->picker().is_downloading(block)); + + m_torrent->picker().abort_download(block); + + std::vector::iterator i + = std::find(m_download_queue.begin(), m_download_queue.end(), block); + assert(i != m_download_queue.end()); + + m_download_queue.erase(i); + + + int block_offset = block.block_index * m_torrent->block_size(); + int block_size + = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, + m_torrent->block_size()); + assert(block_size > 0); + assert(block_size <= m_torrent->block_size()); + + char buf[] = {0,0,0,13, msg_cancel}; + + std::size_t start_offset = m_send_buffer.size(); + m_send_buffer.resize(start_offset + 17); + + std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); + start_offset +=5; + + // index + write_int(block.piece_index, &m_send_buffer[start_offset]); + start_offset += 4; + + // begin + write_int(block_offset, &m_send_buffer[start_offset]); + start_offset += 4; + + // length + write_int(block_size, &m_send_buffer[start_offset]); + start_offset += 4; +#if defined(TORRENT_VERBOSE_LOGGING) + (*m_logger) << m_socket->sender().as_string() << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n"; +#endif + assert(start_offset == m_send_buffer.size()); + + send_buffer_updated(); +} + void libtorrent::peer_connection::request_block(piece_block block) { assert(block.piece_index >= 0); @@ -493,6 +579,7 @@ void libtorrent::peer_connection::request_block(piece_block block) #endif assert(start_offset == m_send_buffer.size()); + send_buffer_updated(); } void libtorrent::peer_connection::send_bitfield() @@ -511,6 +598,7 @@ void libtorrent::peer_connection::send_bitfield() if (m_torrent->have_piece(i)) m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); } + send_buffer_updated(); } void libtorrent::peer_connection::choke() @@ -522,6 +610,7 @@ void libtorrent::peer_connection::choke() #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; #endif + send_buffer_updated(); } void libtorrent::peer_connection::unchoke() @@ -533,6 +622,7 @@ void libtorrent::peer_connection::unchoke() #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n"; #endif + send_buffer_updated(); } void libtorrent::peer_connection::interested() @@ -544,6 +634,7 @@ void libtorrent::peer_connection::interested() #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n"; #endif + send_buffer_updated(); } void libtorrent::peer_connection::not_interested() @@ -555,6 +646,7 @@ void libtorrent::peer_connection::not_interested() #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n"; #endif + send_buffer_updated(); } void libtorrent::peer_connection::send_have(int index) @@ -565,6 +657,7 @@ void libtorrent::peer_connection::send_have(int index) #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n"; #endif + send_buffer_updated(); } @@ -784,7 +877,7 @@ bool libtorrent::peer_connection::has_data() const throw() { // if we have requests or pending data to be sent or announcements to be made // we want to send data - return !m_requests.empty() || !m_send_buffer.empty() || !m_announce_queue.empty(); + return (!m_requests.empty() && !m_choked) || !m_send_buffer.empty() || !m_announce_queue.empty(); } // -------------------------- @@ -794,11 +887,17 @@ bool libtorrent::peer_connection::has_data() const throw() // throws exception when the client should be disconnected void libtorrent::peer_connection::send_data() { + assert(m_socket->is_writable()); assert(has_data()); - // only add new piece-chunks if the send buffer is empty + // only add new piece-chunks if the send buffer is small enough // otherwise there will be no end to how large it will be! - if (!m_requests.empty() && m_send_buffer.empty() && m_peer_interested && !m_choked) + // TODO: make ths a bit better. Don't always read the entire + // requested block. Have a limit of how much of the requested + // block is actually read at a time. + while (!m_requests.empty() + && (m_send_buffer.size() < m_torrent->block_size()) + && !m_choked) { peer_request& r = m_requests.front(); @@ -840,23 +939,25 @@ void libtorrent::peer_connection::send_data() m_sending_piece.read(&m_send_buffer[13], r.length); #if defined(TORRENT_VERBOSE_LOGGING) - (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " | dest: " << m_socket->sender().as_string() << " ]\n"; + (*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif // let the torrent keep track of how much we have uploaded m_torrent->uploaded_bytes(r.length); - m_requests.erase(m_requests.begin()); } else { #if defined(TORRENT_VERBOSE_LOGGING) - (*m_logger) << m_socket->sender().as_string() << " *** WARNING [ illegal piece request ]\n"; + (*m_logger) << m_socket->sender().as_string() << " *** WARNING [ illegal piece request idx: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif } + m_requests.erase(m_requests.begin()); } if (!m_announce_queue.empty()) { - for (std::vector::iterator i = m_announce_queue.begin(); i != m_announce_queue.end(); ++i) + for (std::vector::iterator i = m_announce_queue.begin(); + i != m_announce_queue.end(); + ++i) { // (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n"; send_have(*i); @@ -867,12 +968,15 @@ void libtorrent::peer_connection::send_data() // send the actual buffer if (!m_send_buffer.empty()) { - // we have data that's scheduled for sending - std::size_t sent = m_socket->send(&m_send_buffer[0], m_send_buffer.size()); -#if defined(TORRENT_VERBOSE_LOGGING) + // we have data that's scheduled for sending + int sent = m_socket->send( + &m_send_buffer[0] + , m_send_buffer.size()); + + #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n"; -#endif + #endif if (sent > 0) { @@ -882,14 +986,40 @@ void libtorrent::peer_connection::send_data() // only a part of the buffer could be sent // remove the part that was sent from the buffer if (sent == m_send_buffer.size()) + { m_send_buffer.clear(); + } else - m_send_buffer.erase(m_send_buffer.begin(), m_send_buffer.begin() + sent); + { + m_send_buffer.erase( + m_send_buffer.begin() + , m_send_buffer.begin() + sent); + } + } + else + { + assert(sent == -1); + throw network_error(m_socket->last_error()); } m_last_sent = boost::posix_time::second_clock::local_time(); } + assert(m_added_to_selector); + if (!has_data()) + { + m_selector.remove_writable(m_socket); + m_added_to_selector = false; + } +#ifndef NDEBUG + else + { + if (m_socket->is_writable()) + { + std::cout << "ERROR\n"; + } + } +#endif } @@ -905,6 +1035,6 @@ void libtorrent::peer_connection::keep_alive() #if defined(TORRENT_VERBOSE_LOGGING) (*m_logger) << m_socket->sender().as_string() << " ==> NOP\n"; #endif - + send_buffer_updated(); } } diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index d24e5d7ec..f19c7da81 100755 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -424,7 +424,7 @@ namespace libtorrent int piece_blocks = std::min(blocks_in_piece(*i), num_blocks); for (int j = 0; j < piece_blocks; ++j) { - interesting_blocks.push_back(piece_block(*i, 0)); + interesting_blocks.push_back(piece_block(*i, j)); } num_blocks -= piece_blocks; if (num_blocks == 0) return num_blocks; @@ -442,6 +442,8 @@ namespace libtorrent for (int j = 0; j < num_blocks_in_piece; ++j) { + if (p->finished_blocks[j] == 1) continue; + interesting_blocks.push_back(piece_block(*i, j)); if (p->requested_blocks[j] == 0) { @@ -483,13 +485,27 @@ namespace libtorrent return i->requested_blocks[block.block_index]; } + bool piece_picker::is_finished(piece_block block) const + { + assert(block.piece_index < m_piece_map.size()); + assert(block.block_index < max_blocks_per_piece); + + if (m_piece_map[block.piece_index].index == 0xffffff) return true; + if (m_piece_map[block.piece_index].downloading == 0) return false; + std::vector::const_iterator i + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); + assert(i != m_downloads.end()); + return i->finished_blocks[block.block_index]; + } + + void piece_picker::mark_as_downloading(piece_block block, const peer_id& peer) { #ifndef NDEBUG integrity_check(); #endif assert(block.piece_index < m_piece_map.size()); - assert(block.block_index < max_blocks_per_piece); + assert(block.block_index < blocks_in_piece(block.piece_index)); piece_pos& p = m_piece_map[block.piece_index]; if (p.downloading == 0) @@ -517,27 +533,67 @@ namespace libtorrent #endif } - void piece_picker::mark_as_finished(piece_block block) + void piece_picker::mark_as_finished(piece_block block, const peer_id& peer) { #ifndef NDEBUG integrity_check(); #endif assert(block.piece_index < m_piece_map.size()); - assert(block.block_index < max_blocks_per_piece); + assert(block.block_index < blocks_in_piece(block.piece_index)); + + piece_pos& p = m_piece_map[block.piece_index]; + if (p.downloading == 0) + { + p.downloading = 1; + move(false, p.peer_count, p.index); + + downloading_piece dp; + dp.index = block.piece_index; + dp.requested_blocks[block.block_index] = 1; + dp.finished_blocks[block.block_index] = 1; + dp.info[block.block_index].peer = peer; + m_downloads.push_back(dp); + } + else + { + std::vector::iterator i + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); + assert(i != m_downloads.end()); + i->info[block.block_index].peer = peer; + i->requested_blocks[block.block_index] = 1; + i->finished_blocks[block.block_index] = 1; + } +#ifndef NDEBUG + integrity_check(); +#endif + } +/* + void piece_picker::mark_as_finished(piece_block block, const peer_id& peer) + { +#ifndef NDEBUG + integrity_check(); +#endif + assert(block.piece_index < m_piece_map.size()); + assert(block.block_index < blocks_in_piece(block.piece_index)); assert(m_piece_map[block.piece_index].downloading == 1); std::vector::iterator i = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); - assert(i->requested_blocks[block.block_index] == 1); i->finished_blocks[block.block_index] = 1; + // the block may have been requested, then cancled + // and requested by a peer that disconnects + // that way we can actually receive the piece + // without the requested bit is set. + i->requested_blocks[block.block_index] = 1; i->info[block.block_index].num_downloads++; + i->info[block.block_index].peer = peer; #ifndef NDEBUG integrity_check(); #endif } - +*/ void piece_picker::get_downloaders(std::vector& d, int index) { std::vector::iterator i @@ -569,6 +625,10 @@ namespace libtorrent std::vector::iterator i = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); + + if (i->finished_blocks[block.block_index]) return; + + assert(block.block_index < blocks_in_piece(block.piece_index)); assert(i->requested_blocks[block.block_index] == 1); // clear this block as being downloaded diff --git a/src/policy.cpp b/src/policy.cpp index 130646dd5..333846f89 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -43,20 +43,54 @@ namespace { // we try to maintain 4 requested blocks in the download // queue - request_queue = 4 + request_queue = 16 }; using namespace libtorrent; + + + // TODO: replace these two functions with std::find_first_of + template + bool has_intersection(It1 start1, It1 end1, It2 start2, It2 end2) + { + for (;start1 != end1; ++start1) + for (;start2 != end2; ++start2) + if (*start1 == *start2) return true; + return false; + } + + piece_block find_first_common(const std::vector& queue, + const std::vector& busy) + { + for (std::vector::const_reverse_iterator i + = queue.rbegin(); + i != queue.rend(); + ++i) + { + for (std::vector::const_iterator j + = busy.begin(); + j != busy.end(); + ++j) + { + if ((*j) == (*i)) return *i; + } + } + assert(false); + } + void request_a_block(torrent& t, peer_connection& c) { + int num_requests = request_queue - c.download_queue().size(); + + // if our request queue is already full, we + // don't have to make any new requests yet + if (num_requests <= 0) return; + piece_picker& p = t.picker(); std::vector interesting_pieces; interesting_pieces.reserve(100); - int num_requests = request_queue - c.download_queue().size(); - if (num_requests <= 0) num_requests = 1; - // picks the interesting pieces from this peer // the integer is the number of pieces that // should be guaranteed to be available for download @@ -85,12 +119,53 @@ namespace // ok, we found a piece that's not being downloaded // by somebody else. request it from this peer c.request_block(*i); - num_requests++; - if (num_requests >= request_queue) return; + num_requests--; + if (num_requests <= 0) return; } - // TODO: compare this peer's bandwidth against the - // ones downloading these pieces (busy_pieces) + if (busy_pieces.empty()) return; + + // first look for blocks that are just queued + // and not actually sent to us yet + // (then we can cancel those and request them + // from this peer instead) + + peer_connection* peer = 0; + float down_speed = 0.f; + // find the peer with the lowest download + // speed that also has a piece thatt this + // peer could send us + for (torrent::peer_iterator i = t.begin(); + i != t.end(); + ++i) + { + const std::vector& queue = (*i)->download_queue(); + if ((*i)->statistics().down_peak() > down_speed + && has_intersection(busy_pieces.begin(), + busy_pieces.end(), + queue.begin(), + queue.end())) + { + peer = *i; + down_speed = (*i)->statistics().down_peak(); + } + } + + assert(peer != 0); + + // this peer doesn't have a faster connection than the + // slowest peer. Don't take over any blocks + if (c.statistics().down_peak() <= down_speed) return; + + // find a suitable block to take over from this peer + piece_block block = find_first_common(peer->download_queue(), busy_pieces); + peer->cancel_block(block); + c.request_block(block); + + // the one we interrupted may need to request a new piece + request_a_block(t, *peer); + + num_requests--; } } diff --git a/src/session.cpp b/src/session.cpp index ab2fa41d3..1304316f7 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -111,9 +111,6 @@ namespace libtorrent boost::mutex::scoped_lock l(m_mutex); if (!t->abort) { -#ifndef NDEBUG - std::cout << "adding torrent to session!\n"; -#endif boost::mutex::scoped_lock l(m_ses->m_mutex); m_ses->m_torrents.insert( @@ -123,7 +120,7 @@ namespace libtorrent catch(...) { #ifndef NDEBUG - std::cout << "error while checking files\n"; + std::cerr << "error while checking files\n"; #endif } @@ -199,7 +196,8 @@ namespace libtorrent } catch(network_error&) { - if (m_listen_port > max_port) throw; + if (m_listen_port > max_port) + throw; m_listen_port++; continue; } @@ -230,29 +228,15 @@ namespace libtorrent std::vector > writable_clients; std::vector > error_clients; boost::posix_time::ptime timer = boost::posix_time::second_clock::local_time(); -#ifdef TORRENT_DEBUG_SOCKETS - int num_loops = 0; -#endif + for(;;) { -#ifndef NDEBUG - std::clock_t time__ = std::clock(); -#endif - - // if nothing happens within 500000 microseconds (0.5 seconds) // do the loop anyway to check if anything else has changed // (*m_logger) << "sleeping\n"; m_selector.wait(500000, readable_clients, writable_clients, error_clients); boost::mutex::scoped_lock l(m_mutex); -#ifdef TORRENT_DEBUG_SOCKETS - num_loops++; -#endif - - - assert(readable_clients.size() + writable_clients.size() + error_clients.size() > 0 - || (std::clock() - time__) > CLOCKS_PER_SEC / 3); // +1 for the listen socket assert(m_selector.count_read_monitors() == m_connections.size() + 1); @@ -301,7 +285,7 @@ namespace libtorrent // s->set_send_bufsize(2048); // TODO: add some possibility to filter IP:s - boost::shared_ptr c(new peer_connection(this, s)); + boost::shared_ptr c(new peer_connection(this, m_selector, s)); m_connections.insert(std::make_pair(s, c)); m_selector.monitor_readability(s); m_selector.monitor_errors(s); @@ -356,11 +340,6 @@ namespace libtorrent assert(p->second->has_data()); // (*m_logger) << "writable: " << p->first->sender().as_string() << "\n"; p->second->send_data(); - // if the peer doesn't have - // any data left to send, remove it - // from the writabilty monitor - if (!p->second->has_data()) - m_selector.remove_writable(p->first); } catch(network_error&) { @@ -396,8 +375,9 @@ namespace libtorrent i != m_connections.end(); ++i) { - if (m_selector.is_writability_monitored(i->first)) - assert(i->second->has_data()); + assert(i->second->has_data() == m_selector.is_writability_monitored(i->first)); +// if (m_selector.is_writability_monitored(i->first)) +// assert(i->second->has_data()); } #endif @@ -450,17 +430,6 @@ namespace libtorrent // THE SECTION BELOW IS EXECUTED ONCE EVERY SECOND // ************************ - -#ifdef TORRENT_DEBUG_SOCKETS - std::cout << "\nloops: " << num_loops << "\n"; - if (num_loops > 1300) - { - int i = 0; - } - num_loops = 0; -#endif - - // do the second_tick() on each connection // this will update their statistics (download and upload speeds) // also purge sockets that have timed out @@ -482,9 +451,6 @@ namespace libtorrent } j->second->keep_alive(); - - if (j->second->has_data() && !m_selector.is_writability_monitored(j->first)) - m_selector.monitor_writability(j->first); } // check each torrent for abortion or @@ -538,11 +504,11 @@ namespace libtorrent } catch(const std::exception& e) { - std::cout << e.what() << "\n"; + std::cerr << e.what() << "\n"; } catch(...) { - std::cout << "error\n"; + std::cerr << "error\n"; } diff --git a/src/storage.cpp b/src/storage.cpp index fff10d991..654e19483 100755 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -286,6 +286,7 @@ void libtorrent::piece_file::seek_forward(int step) bool libtorrent::storage::verify_piece(piece_file& file) { int index = file.index(); + assert(index >= 0 && index < m_have_piece.size()); if (m_have_piece[index]) return true; std::vector buffer(m_torrent_file->piece_size(index)); @@ -353,7 +354,6 @@ void libtorrent::storage::initialize_pieces(torrent* t, */ - // we don't know of any piece we have right now. Initialize // it to say we don't have anything and fill it in later on. m_have_piece.resize(m_torrent_file->num_pieces()); @@ -382,10 +382,6 @@ void libtorrent::storage::initialize_pieces(torrent* t, char zeros[chunksize]; std::fill(zeros, zeros+chunksize, 0); -#ifndef NDEBUG - std::cout << "allocating files\n"; -#endif - // remember which directories we have created, so // we don't have to ask the filesystem all the time std::set created_directories; @@ -491,10 +487,6 @@ void libtorrent::storage::initialize_pieces(torrent* t, // std::cout << "\n"; } -#ifndef NDEBUG - std::cout << "allocation/checking DONE!\n"; -#endif - } /* // reads the piece with the given index from disk diff --git a/src/torrent.cpp b/src/torrent.cpp index 109cd76f1..49dda7920 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -143,7 +143,6 @@ namespace libtorrent , m_bytes_uploaded(0) , m_bytes_downloaded(0) , m_torrent_file(torrent_file) - , m_unverified_blocks(0) , m_next_request(boost::posix_time::second_clock::local_time()) , m_duration(1800) , m_policy(new policy(this)) @@ -172,6 +171,7 @@ namespace libtorrent // connect to random peers from the list std::random_shuffle(peer_list.begin(), peer_list.end()); + std::cout << "interval: " << m_duration << "\n"; std::cout << "peers:\n"; for (std::vector::const_iterator i = peer_list.begin(); @@ -179,15 +179,8 @@ namespace libtorrent ++i) { std::cout << " " << std::setfill(' ') << std::setw(16) << i->ip - << " " << std::setw(5) << std::dec << i->port << " "; - for (const unsigned char* j = i->id.begin(); - j != i->id.end(); - ++j) - { - std::cout << std::hex << std::setw(2) << std::setfill('0') - << static_cast(*j); - } - std::cout << std::dec << " " << extract_fingerprint(i->id) << "\n"; + << " " << std::setw(5) << std::dec << i->port << " " + << i->id << " " << extract_fingerprint(i->id) << "\n"; } std::cout << std::setfill(' '); @@ -332,7 +325,7 @@ namespace libtorrent if (p->has_piece(i)) peer_lost(i); } - std::cout << p->get_socket()->sender().as_string() << " *** DISCONNECT\n"; +// std::cout << p->get_socket()->sender().as_string() << " *** DISCONNECT\n"; m_policy->connection_closed(*p); m_connections.erase(i); @@ -348,14 +341,18 @@ namespace libtorrent // TODO: the send buffer size should be controllable from the outside // s->set_send_bufsize(2048); s->connect(a); - boost::shared_ptr c(new peer_connection(m_ses, this, s, id)); + boost::shared_ptr c(new peer_connection( + m_ses + , m_ses->m_selector + , this + , s + , id)); detail::session_impl::connection_map::iterator p = m_ses->m_connections.insert(std::make_pair(s, c)).first; attach_peer(boost::get_pointer(p->second)); - m_ses->m_selector.monitor_writability(s); m_ses->m_selector.monitor_readability(s); m_ses->m_selector.monitor_errors(s); - std::cout << "connecting to: " << a.as_string() << ":" << a.port() << "\n"; +// std::cout << "connecting to: " << a.as_string() << ":" << a.port() << "\n"; } void torrent::close_all_connections() @@ -415,7 +412,7 @@ namespace libtorrent int blocks_per_piece = m_torrent_file.piece_length() / m_block_size; - assert(m_unverified_blocks == m_picker.unverified_blocks()); + int unverified_blocks = m_picker.unverified_blocks(); int blocks_we_have = num_pieces * blocks_per_piece; const int last_piece = m_torrent_file.num_pieces()-1; @@ -428,9 +425,12 @@ namespace libtorrent // TODO: Implement total download and total_upload st.total_download = 0; st.total_upload = 0; - st.progress = (blocks_we_have + m_unverified_blocks) + st.progress = (blocks_we_have + unverified_blocks) / static_cast(total_blocks); + st.next_announce = next_announce() + - boost::posix_time::second_clock::local_time(); + if (num_pieces == p.size()) st.state = torrent_status::seeding; else diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 9bc1a25d3..da2aa218f 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -72,6 +72,7 @@ namespace libtorrent st.total_download = 0; st.progress = 0.f; st.state = torrent_status::invalid_handle; + st.next_announce = boost::posix_time::time_duration(); return st; } @@ -95,6 +96,7 @@ namespace libtorrent else st.state = torrent_status::queued_for_checking; st.progress = d->progress; + st.next_announce = boost::posix_time::time_duration(); return st; } } @@ -104,6 +106,7 @@ namespace libtorrent st.total_download = 0; st.progress = 0.f; st.state = torrent_status::invalid_handle; + st.next_announce = boost::posix_time::time_duration(); return st; }