diff --git a/ChangeLog b/ChangeLog index 515ea6777..a5b40fa08 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ + * added the ability to limit the number of simultaneous half-open + TCP connections. Flags in peer_info has been added. + +release 0.9.1 + * strings that are invalid utf-8 strings are now decoded with the local codepage on windows * added the ability to build libtorrent both as a shared library diff --git a/docs/manual.html b/docs/manual.html index 900408fca..6af8f3a14 100755 --- a/docs/manual.html +++ b/docs/manual.html @@ -1,41 +1,14 @@ - - - + + + -Rasterbar Software | Products | libtorrent - - - - - - + + +libtorrent manual + + - -
-
- -
- - -
-

libtorrent manual

@@ -1800,7 +1773,9 @@ struct peer_info remote_interested = 0x4, remote_choked = 0x8, supports_extensions = 0x10, - local_connection = 0x20 + local_connection = 0x20, + connecting = 0x40, + queued = 0x80 }; unsigned int flags; address ip; @@ -1858,6 +1833,15 @@ listen port open, and that port is the same as in the peer connection was opened by this peer connecting to us. + + + + + +
connectingThe connection is in a half-open state (i.e. it is +being connected).
queuedThe connection is currently queued for a connection +attempt. This may happen if there is a limit set on +the number of half-open TCP connections.

The ip field is the IP-address to this peer. Its type is a wrapper around the @@ -3266,13 +3250,5 @@ scripts.

sf_logo

-
- - - - - diff --git a/docs/manual.rst b/docs/manual.rst index 0652f4b50..20f2db306 100755 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -1744,7 +1744,9 @@ It contains the following fields:: remote_interested = 0x4, remote_choked = 0x8, supports_extensions = 0x10, - local_connection = 0x20 + local_connection = 0x20, + connecting = 0x40, + queued = 0x80 }; unsigned int flags; address ip; @@ -1792,6 +1794,13 @@ any combination of the enums above. The following table describes each flag: | | peer connection was opened by this peer connecting to | | | us. | +-------------------------+-------------------------------------------------------+ +| ``connecting`` | The connection is in a half-open state (i.e. it is | +| | being connected). | ++-------------------------+-------------------------------------------------------+ +| ``queued`` | The connection is currently queued for a connection | +| | attempt. This may happen if there is a limit set on | +| | the number of half-open TCP connections. | ++-------------------------+-------------------------------------------------------+ __ extension_protocol.html diff --git a/docs/udp_tracker_protocol.html b/docs/udp_tracker_protocol.html index d51831766..dc3c28829 100644 --- a/docs/udp_tracker_protocol.html +++ b/docs/udp_tracker_protocol.html @@ -1,41 +1,14 @@ - - - + + + -Rasterbar Software | Products | libtorrent - - - - - - + + +Bittorrent udp-tracker protocol extension + + - -
-
- -
- - -
-

Bittorrent udp-tracker protocol extension

@@ -529,13 +502,5 @@ from the 20 bytes hash calculated.

Protocol designed by Olaf van der Spek

- - - - - - diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 2bb75420c..b0a2e2e34 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -273,7 +273,18 @@ void print_peer_info(std::ostream& out, std::vector const out << progress_bar(0.f, 15); } - out << " " << identify_client(i->id) << "\n"; + if (i->flags & peer_info::connecting) + { + out << esc("31") << " connecting to peer" << esc("0") << "\n"; + } + else if (i->flags & peer_info::queued) + { + out << esc("33") << " queued" << esc("0") << "\n"; + } + else + { + out << " " << identify_client(i->id) << "\n"; + } } } @@ -403,6 +414,7 @@ int main(int ac, char* av[]) float preferred_ratio; int download_limit; int upload_limit; + int half_open_limit; std::string save_path_str; std::string log_level; std::string ip_filter_file; @@ -447,6 +459,8 @@ int main(int ac, char* av[]) ("poll-interval,t", po::value(&poll_interval)->default_value(2) , "if a directory is being monitored, this is the interval (given " "in seconds) between two refreshes of the directory listing") + ("half-open-limit,a", po::value(&half_open_limit)->default_value(-1) + , "Sets the maximum number of simultaneous half-open tcp connections") ; po::positional_options_description p; @@ -466,6 +480,7 @@ int main(int ac, char* av[]) if (download_limit <= 0) download_limit = -1; if (upload_limit <= 0) upload_limit = -1; if (poll_interval < 2) poll_interval = 2; + if (half_open_limit < 1) half_open_limit = -1; if (!monitor_dir.empty() && !exists(monitor_dir)) { std::cerr << "The monitor directory doesn't exist: " << monitor_dir.string() << std::endl; @@ -501,6 +516,7 @@ int main(int ac, char* av[]) handles_t handles; session ses; + ses.set_max_half_open_connections(half_open_limit); ses.set_download_rate_limit(download_limit); ses.set_upload_rate_limit(upload_limit); ses.listen_on(std::make_pair(listen_port, listen_port + 10)); @@ -662,7 +678,7 @@ int main(int ac, char* av[]) if (torrent_finished_alert* p = dynamic_cast(a.get())) { // limit the bandwidth for all seeding torrents - p->handle.set_max_connections(10); + p->handle.set_max_connections(60); //p->handle.set_max_uploads(5); //p->handle.set_upload_limit(10000); @@ -755,7 +771,8 @@ int main(int ac, char* av[]) out << "info-hash: " << h.info_hash() << "\n"; boost::posix_time::time_duration t = s.next_announce; - out << "next announce: " << esc("37") << boost::posix_time::to_simple_string(t) << esc("0") << "\n"; + out << "next announce: " << esc("37") << + boost::posix_time::to_simple_string(t) << esc("0") << "\n"; out << "tracker: " << s.current_tracker << "\n"; } diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 0f3b65f97..c422fee60 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -86,7 +86,9 @@ namespace libtorrent protocol_error(const std::string& msg): std::runtime_error(msg) {}; }; - class TORRENT_EXPORT peer_connection: public boost::noncopyable + class TORRENT_EXPORT peer_connection + : public boost::noncopyable + , public boost::enable_shared_from_this { friend class invariant_access; public: @@ -98,7 +100,8 @@ namespace libtorrent detail::session_impl& ses , selector& sel , torrent* t - , boost::shared_ptr s); + , boost::shared_ptr s + , address const& remote); // with this constructor we have been contacted and we still don't // know which torrent the connection belongs to @@ -123,7 +126,7 @@ namespace libtorrent // work to do. void send_data(); void receive_data(); - + // tells if this connection has data it want to send // and has enough upload bandwidth quota left to send it. bool can_write() const; @@ -171,6 +174,7 @@ namespace libtorrent void second_tick(); boost::shared_ptr get_socket() const { return m_socket; } + address const& remote() const { return m_remote; } const peer_id& get_peer_id() const { return m_peer_id; } const std::vector& get_bitfield() const; @@ -182,6 +186,27 @@ namespace libtorrent void disconnect(); bool is_disconnecting() const { return m_disconnecting; } + // this is called when the connection attempt has succeeded + // and the peer_connection is supposed to set m_connecting + // to false, and stop monitor writability + void connection_complete(); + + // returns true if this connection is still waiting to + // finish the connection attempt + bool is_connecting() const { return m_connecting; } + + // returns true if the socket of this peer hasn't been + // attempted to connect yet (i.e. it's queued for + // connection attempt). + bool is_queued() const { return m_queued; } + + // called when it's time for this peer_conncetion to actually + // initiate the tcp connection. This may be postponed until + // the library isn't using up the limitation of half-open + // tcp connections. + void connect(); + + // This is called for every peer right after the upload // bandwidth has been distributed among them // It will reset the used bandwidth to 0 and @@ -376,6 +401,7 @@ namespace libtorrent // peer's socket from the writability monitor list. selector& m_selector; boost::shared_ptr m_socket; + address m_remote; // this is the torrent this connection is // associated with. If the connection is an @@ -529,8 +555,9 @@ namespace libtorrent // this was the request we sent std::pair m_last_metadata_request; - // this is true until this socket has received - // data for the first time. While connecting + // this is true until this socket has become + // writable for the first time (i.e. the + // connection completed). While connecting // the timeout will not be triggered. This is // because windows XP SP2 may delay connection // attempts, which means that the connection @@ -538,6 +565,11 @@ namespace libtorrent // time out is reached. bool m_connecting; + // This is true until connect is called on the + // peer_connection's socket. It is false on incoming + // connections. + bool m_queued; + // the number of bytes of metadata we have received // so far from this per, only counting the current // request. Any previously finished requests diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index ae6e31d0b..05d20459f 100755 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -51,7 +51,9 @@ namespace libtorrent remote_interested = 0x4, remote_choked = 0x8, supports_extensions = 0x10, - local_connection = 0x20 + local_connection = 0x20, + connecting = 0x40, + queued = 0x80 }; unsigned int flags; address ip; diff --git a/include/libtorrent/session.hpp b/include/libtorrent/session.hpp index 21cd2d18e..7af375f96 100755 --- a/include/libtorrent/session.hpp +++ b/include/libtorrent/session.hpp @@ -172,8 +172,11 @@ namespace libtorrent friend class invariant_access; // TODO: maybe this should be changed to a sorted vector // using lower_bound? - typedef std::map, boost::shared_ptr > connection_map; + typedef std::map, boost::shared_ptr > + connection_map; typedef std::map > torrent_map; + typedef std::deque > + connection_queue; session_impl( std::pair listen_port_range @@ -193,16 +196,31 @@ namespace libtorrent tracker_manager m_tracker_manager; torrent_map m_torrents; + // this will see if there are any pending connection attempts + // and in that case initiate new connections until the limit + // is reached. + void process_connection_queue(); + // this maps sockets to their peer_connection // object. It is the complete list of all connected // peers. connection_map m_connections; + + // this is a list of half-open tcp connections + // (only outgoing connections) + connection_map m_half_open; + + // this is a queue of pending outgoing connections. If the + // list of half-open connections is full (given the global + // limit), new outgoing connections are put on this queue, + // waiting for one slot in the half-open queue to open up. + connection_queue m_connection_queue; // this is a list of iterators into the m_connections map // that should be disconnected as soon as possible. // It is used to delay disconnections to avoid troubles // in loops that iterate over them. - std::vector m_disconnect_peer; + std::vector > m_disconnect_peer; // filters incomming connections ip_filter m_ip_filter; @@ -253,6 +271,9 @@ namespace libtorrent int m_download_rate; int m_max_uploads; int m_max_connections; + // the number of simultaneous half-open tcp + // connections libtorrent will have. + int m_half_open_limit; // statistics gathered from all torrents. stat m_stat; @@ -379,6 +400,7 @@ namespace libtorrent void set_download_rate_limit(int bytes_per_second); void set_max_uploads(int limit); void set_max_connections(int limit); + void set_max_half_open_connections(int limit); std::auto_ptr pop_alert(); void set_severity_level(alert::severity_t s); diff --git a/include/libtorrent/socket.hpp b/include/libtorrent/socket.hpp index babd1004b..386798c11 100755 --- a/include/libtorrent/socket.hpp +++ b/include/libtorrent/socket.hpp @@ -133,7 +133,7 @@ namespace libtorrent void set_blocking(bool blocking); bool is_blocking() { return m_blocking; } - const address& sender() const { return m_sender; } + const address& sender() const { assert(m_sender != address()); return m_sender; } address name() const; void listen(libtorrent::address const& iface, int queue); diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index f928d5aa5..b2087ef94 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -176,6 +176,7 @@ namespace libtorrent torrent_status status() const; void use_interface(const char* net_interface); + address const& interface() const { return m_net_interface; } peer_connection& connect_to_peer(const address& a); void set_ratio(float ratio) diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 4a97e8f04..a2ec43f43 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -77,7 +77,8 @@ namespace libtorrent detail::session_impl& ses , selector& sel , torrent* t - , boost::shared_ptr s) + , boost::shared_ptr s + , address const& remote) : #ifndef NDEBUG m_last_choke(boost::posix_time::second_clock::universal_time() @@ -92,11 +93,12 @@ namespace libtorrent , m_last_sent(second_clock::universal_time()) , m_selector(sel) , m_socket(s) + , m_remote(remote) , m_torrent(t) , m_attached_to_torrent(true) , m_ses(ses) , m_active(true) - , m_writability_monitored(false) + , m_writability_monitored(true) , m_readability_monitored(true) , m_peer_interested(false) , m_peer_choked(true) @@ -119,9 +121,11 @@ namespace libtorrent boost::gregorian::date(1970, boost::date_time::Jan, 1) , boost::posix_time::seconds(0)) , m_waiting_metadata_request(false) - , m_connecting(false) + , m_connecting(true) + , m_queued(true) , m_metadata_progress(0) { + m_selector.monitor_writability(m_socket); INVARIANT_CHECK; // these numbers are used the first second of connection. @@ -158,8 +162,8 @@ namespace libtorrent assert(m_torrent != 0); #ifdef TORRENT_VERBOSE_LOGGING - m_logger = m_ses.create_log(s->sender().as_string() + "_" - + boost::lexical_cast(s->sender().port)); + m_logger = m_ses.create_log(m_remote.as_string() + "_" + + boost::lexical_cast(m_remote.port)); (*m_logger) << "*** OUTGOING CONNECTION\n"; #endif @@ -202,6 +206,7 @@ namespace libtorrent , m_last_sent(second_clock::universal_time()) , m_selector(sel) , m_socket(s) + , m_remote(s->sender()) , m_torrent(0) , m_attached_to_torrent(false) , m_ses(ses) @@ -230,7 +235,8 @@ namespace libtorrent boost::gregorian::date(1970, boost::date_time::Jan, 1) , boost::posix_time::seconds(0)) , m_waiting_metadata_request(false) - , m_connecting(true) + , m_connecting(false) + , m_queued(false) , m_metadata_progress(0) { INVARIANT_CHECK; @@ -274,8 +280,9 @@ namespace libtorrent std::fill(m_peer_id.begin(), m_peer_id.end(), 0); #ifdef TORRENT_VERBOSE_LOGGING - m_logger = m_ses.create_log(s->sender().as_string() + "_" - + boost::lexical_cast(s->sender().port)); + assert(m_socket->sender() == remote()); + m_logger = m_ses.create_log(remote().as_string() + "_" + + boost::lexical_cast(remote().port)); (*m_logger) << "*** INCOMING CONNECTION\n"; #endif @@ -374,7 +381,7 @@ namespace libtorrent if (m_torrent) { torrent::peer_iterator i = m_torrent->m_connections.find( - get_socket()->sender()); + remote()); assert(i == m_torrent->m_connections.end()); } } @@ -939,7 +946,7 @@ namespace libtorrent m_torrent->alerts().post_alert(invalid_request_alert( r , m_torrent->get_handle() - , m_socket->sender() + , m_remote , m_peer_id , "peer sent an illegal request, ignoring")); } @@ -1058,7 +1065,7 @@ namespace libtorrent { m_torrent->alerts().post_alert( peer_error_alert( - m_socket->sender() + m_remote , m_peer_id , "got a block that was not requested")); } @@ -1077,7 +1084,7 @@ namespace libtorrent bool was_finished = picker.num_filtered() + m_torrent->num_pieces() == m_torrent->torrent_file().num_pieces(); - picker.mark_as_finished(block_finished, m_socket->sender()); + picker.mark_as_finished(block_finished, m_remote); m_torrent->get_policy().block_finished(*this, block_finished); @@ -1297,7 +1304,7 @@ namespace libtorrent m_torrent->alerts().post_alert( chat_message_alert( m_torrent->get_handle() - , m_socket->sender(), str)); + , m_remote, str)); } } @@ -1435,7 +1442,7 @@ namespace libtorrent << "<== LISTEN_PORT [ port: " << port << " ]\n"; #endif - address adr = m_socket->sender(); + address adr = m_remote; adr.port = port; m_torrent->get_policy().peer_from_tracker(adr, m_peer_id); } @@ -1450,14 +1457,17 @@ namespace libtorrent void peer_connection::disconnect() { if (m_disconnecting) return; - detail::session_impl::connection_map::iterator i - = m_ses.m_connections.find(m_socket); + + assert((m_ses.m_connections.find(m_socket) != m_ses.m_connections.end()) + == !m_connecting); + m_disconnecting = true; - assert(i != m_ses.m_connections.end()); + assert(std::find(m_ses.m_disconnect_peer.begin() - , m_ses.m_disconnect_peer.end(), i) + , m_ses.m_disconnect_peer.end(), shared_from_this()) == m_ses.m_disconnect_peer.end()); - m_ses.m_disconnect_peer.push_back(i); + + m_ses.m_disconnect_peer.push_back(shared_from_this()); } bool peer_connection::dispatch_message(int received) @@ -1634,7 +1644,7 @@ namespace libtorrent assert(block.block_index < m_torrent->torrent_file().piece_size(block.piece_index)); assert(!m_torrent->picker().is_downloading(block)); - m_torrent->picker().mark_as_downloading(block, m_socket->sender()); + m_torrent->picker().mark_as_downloading(block, m_remote); m_request_queue.push_back(block); send_block_requests(); } @@ -2141,7 +2151,6 @@ namespace libtorrent if (received > 0) { - m_connecting = false; m_last_receive = second_clock::universal_time(); m_recv_pos += received; @@ -2426,6 +2435,46 @@ namespace libtorrent return m_dl_bandwidth_quota.left() > 0; } + void peer_connection::connect() + { + INVARIANT_CHECK; + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "CONNECT: " << m_remote.as_string() << "\n"; +#endif + + m_queued = false; + assert(m_connecting); + assert(associated_torrent()); + m_socket->connect(m_remote, associated_torrent()->interface()); + + if (m_torrent->alerts().should_post(alert::debug)) + { + m_torrent->alerts().post_alert(peer_error_alert( + m_remote, m_peer_id, "connecting to peer")); + } + } + + void peer_connection::connection_complete() + { + INVARIANT_CHECK; + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "COMPLETED: " << m_remote.as_string() << "\n"; +#endif + + m_connecting = false; + // this means the connection just succeeded + +// assert(!can_write()); +// assert(m_writability_monitored); + if (!can_write() && m_writability_monitored) + { + m_writability_monitored = false; + m_selector.remove_writable(m_socket); + } + } + // -------------------------- // SEND DATA // -------------------------- @@ -2435,6 +2484,7 @@ namespace libtorrent { INVARIANT_CHECK; + assert(!m_connecting); assert(!m_disconnecting); assert(m_socket->is_writable()); assert(can_write()); @@ -2585,7 +2635,7 @@ namespace libtorrent #ifndef NDEBUG void peer_connection::check_invariant() const { - assert(can_write() == m_selector.is_writability_monitored(m_socket)); + assert((can_write() || m_connecting) == m_selector.is_writability_monitored(m_socket)); /* assert(m_num_pieces == std::count( diff --git a/src/policy.cpp b/src/policy.cpp index 3143f9b45..1733a3f19 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -116,8 +116,9 @@ namespace // the number of blocks we want, but it will try to make the picked // blocks be from whole pieces, possibly by returning more blocks // than we requested. + assert(c.remote() == c.get_socket()->sender()); p.pick_pieces(c.get_bitfield(), interesting_pieces - , num_requests, prefer_whole_pieces, c.get_socket()->sender()); + , num_requests, prefer_whole_pieces, c.remote()); // this vector is filled with the interesting pieces // that some other peer is currently downloading @@ -866,14 +867,15 @@ namespace libtorrent // TODO: only allow _one_ connection to use this // override at a time + assert(c.remote() == c.get_socket()->sender()); if (m_torrent->num_peers() >= m_torrent->m_connections_quota.given - && c.get_socket()->sender().ip() != m_torrent->current_tracker().ip()) + && c.remote().ip() != m_torrent->current_tracker().ip()) { throw protocol_error("too many connections, refusing incoming connection"); // cause a disconnect } #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - if (c.get_socket()->sender().ip() == m_torrent->current_tracker().ip()) + if (c.remote().ip() == m_torrent->current_tracker().ip()) { m_torrent->debug_log("overriding connection limit for tracker NAT-check"); } @@ -882,7 +884,7 @@ namespace libtorrent std::vector::iterator i = std::find_if( m_peers.begin() , m_peers.end() - , match_peer_ip(c.get_socket()->sender())); + , match_peer_ip(c.remote())); if (i == m_peers.end()) { @@ -892,8 +894,10 @@ namespace libtorrent // we don't have ny info about this peer. // add a new entry - peer p(c.get_socket()->sender(), peer::not_connectable); + assert(c.remote() == c.get_socket()->sender()); + peer p(c.remote(), peer::not_connectable); m_peers.push_back(p); + check_invariant(); i = m_peers.end()-1; } else @@ -909,6 +913,7 @@ namespace libtorrent i->prev_amount_download = 0; i->prev_amount_upload = 0; i->connection = &c; + assert(i->connection); i->connected = second_clock::universal_time(); m_last_optimistic_disconnect = second_clock::universal_time(); } @@ -939,7 +944,7 @@ namespace libtorrent m_peers.push_back(p); // the iterator is invalid // because of the push_back() - i = m_peers.end()-1; + i = m_peers.end() - 1; } else { @@ -1157,7 +1162,9 @@ namespace libtorrent { try { + assert(!p->connection); p->connection = &m_torrent->connect_to_peer(p->id); + assert(p->connection); p->connection->add_stat(p->prev_amount_download, p->prev_amount_upload); p->prev_amount_download = 0; p->prev_amount_upload = 0; @@ -1166,8 +1173,11 @@ namespace libtorrent second_clock::universal_time(); return true; } - catch (network_error&) + catch (network_error& e) { + // TODO: This path needs testing! + std::string msg = e.what(); + assert(false); // TODO: remove the peer // m_peers.erase(std::find(m_peers.begin(), m_peers.end(), p)); } @@ -1253,10 +1263,11 @@ namespace libtorrent bool policy::has_connection(const peer_connection* c) { assert(c); + assert(c->remote() == c->get_socket()->sender()); return std::find_if( m_peers.begin() , m_peers.end() - , match_peer_ip(c->get_socket()->sender())) != m_peers.end(); + , match_peer_ip(c->remote())) != m_peers.end(); } void policy::check_invariant() const @@ -1264,10 +1275,17 @@ namespace libtorrent if (m_torrent->is_aborted()) return; int actual_unchoked = 0; int connected_peers = 0; + + int total_connections = 0; + int nonempty_connections = 0; + + for (std::vector::const_iterator i = m_peers.begin(); i != m_peers.end(); ++i) { + ++total_connections; if (!i->connection) continue; + ++nonempty_connections; if (!i->connection->is_disconnecting()) ++connected_peers; if (!i->connection->is_choked()) ++actual_unchoked; @@ -1291,6 +1309,7 @@ namespace libtorrent // When there's an outgoing connection, it will first // be added to the torrent and then to the policy. // that's why the two second cases are in there. + assert(connected_peers == num_torrent_peers || (connected_peers == num_torrent_peers + 1 && connected_peers > 0) diff --git a/src/session.cpp b/src/session.cpp index 731add3d5..1fe84e768 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -393,6 +393,7 @@ namespace libtorrent { namespace detail , m_download_rate(-1) , m_max_uploads(-1) , m_max_connections(-1) + , m_half_open_limit(-1) , m_incoming_connection(false) { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) @@ -441,7 +442,34 @@ namespace libtorrent { namespace detail { while (!m_disconnect_peer.empty()) { - m_connections.erase(m_disconnect_peer.back()); + boost::shared_ptr& p = m_disconnect_peer.back(); + assert(p->is_disconnecting()); + if (p->is_connecting()) + { + // Since this peer is still connecting, will not be + // in the list of completed connections. + connection_map::iterator i = m_half_open.find(p->get_socket()); + if (i == m_half_open.end()) + { + // this connection is not in the half-open list, so it + // has to be in the queue, waiting to be connected. + connection_queue::iterator j = std::find( + m_connection_queue.begin(), m_connection_queue.end(), p); + + assert(j != m_connection_queue.end()); + if (j != m_connection_queue.end()) m_connection_queue.erase(j); + } + else + { + m_half_open.erase(i); + } + } + else + { + connection_map::iterator i = m_connections.find(p->get_socket()); + assert(i != m_connections.end()); + if (i != m_connections.end()) m_connections.erase(i); + } m_disconnect_peer.pop_back(); } } @@ -514,6 +542,21 @@ namespace libtorrent { namespace detail } } + void session_impl::process_connection_queue() + { + while (!m_connection_queue.empty()) + { + if ((int)m_half_open.size() >= m_half_open_limit + && m_half_open_limit > 0) + return; + + connection_queue::value_type& c = m_connection_queue.front(); + m_half_open.insert(std::make_pair(c->get_socket(), c)); + assert(c->associated_torrent()); + c->connect(); + m_connection_queue.pop_front(); + } + } void session_impl::operator()() { @@ -605,7 +648,24 @@ namespace libtorrent { namespace detail // the connection may have been disconnected in the receive phase if (p == m_connections.end()) { - m_selector.remove(*i); + // if we didn't find the socket among the + // connected connections, look among the + // half-open connections to see if some of + // them have finished. + p = m_half_open.find(*i); + + if (p == m_half_open.end()) + { + m_selector.remove(*i); + } + else + { + p->second->connection_complete(); + assert(!p->second->is_connecting()); + m_connections.insert(*p); + m_half_open.erase(p); + process_connection_queue(); + } } else { @@ -703,6 +763,7 @@ namespace libtorrent { namespace detail boost::shared_ptr c( new peer_connection(*this, m_selector, s)); + assert(!c->is_connecting()); m_connections.insert(std::make_pair(s, c)); m_selector.monitor_readability(s); m_selector.monitor_errors(s); @@ -773,6 +834,9 @@ namespace libtorrent { namespace detail i != error_clients.end(); ++i) { connection_map::iterator p = m_connections.find(*i); + + m_selector.remove(*i); + // the connection may have been disconnected in the receive or send phase if (p != m_connections.end()) { if (m_alerts.should_post(alert::debug)) @@ -783,12 +847,7 @@ namespace libtorrent { namespace detail , p->second->id() , "connection closed")); } - } - m_selector.remove(*i); - // the connection may have been disconnected in the receive or send phase - if (p != m_connections.end()) - { #if defined(TORRENT_VERBOSE_LOGGING) (*p->second->m_logger) << "*** CONNECTION EXCEPTION\n"; #endif @@ -809,6 +868,32 @@ namespace libtorrent { namespace detail assert(m_listen_socket.unique()); m_listen_socket.reset(); } + else + { + // the error was not in one of the connected + // conenctions. Look among the half-open ones. + p = m_half_open.find(*i); + if (p != m_half_open.end()) + { + if (m_alerts.should_post(alert::debug)) + { + m_alerts.post_alert( + peer_error_alert( + p->first->sender() + , p->second->id() + , "connection attempt failed")); + // TODO: TEMP! + #warning TEMP! + std::ofstream log("connect.log", std::ios::app); + log << boost::posix_time::microsec_clock::universal_time() << " FAILED: " + << (*i)->sender().as_string() << std::endl; + + + p->second->set_failed(); + m_half_open.erase(p); + } + } + } } #ifndef NDEBUG @@ -1003,17 +1088,25 @@ namespace libtorrent { namespace detail { assert(place); + for (connection_map::iterator i = m_half_open.begin(); + i != m_half_open.end(); ++i) + { + assert(i->second->is_connecting()); + } + for (connection_map::iterator i = m_connections.begin(); i != m_connections.end(); ++i) { assert(i->second); - if (i->second->can_write() != m_selector.is_writability_monitored(i->first) + if (i->second->is_connecting() + || i->second->can_write() != m_selector.is_writability_monitored(i->first) || i->second->can_read() != m_selector.is_readability_monitored(i->first)) { std::ofstream error_log("error.log", std::ios_base::app); boost::shared_ptr p = i->second; error_log << "selector::is_writability_monitored() " << m_selector.is_writability_monitored(i->first) << "\n"; error_log << "selector::is_readability_monitored() " << m_selector.is_readability_monitored(i->first) << "\n"; + error_log << "peer_connection::is_connecting() " << p->is_connecting() << "\n"; error_log << "peer_connection::can_write() " << p->can_write() << "\n"; error_log << "peer_connection::can_read() " << p->can_read() << "\n"; error_log << "peer_connection::ul_quota_left " << p->m_ul_bandwidth_quota.left() << "\n"; @@ -1406,6 +1499,13 @@ namespace libtorrent m_impl.m_max_connections = limit; } + void session::set_max_half_open_connections(int limit) + { + assert(limit > 0 || limit == -1); + boost::mutex::scoped_lock l(m_impl.m_mutex); + m_impl.m_half_open_limit = limit; + } + void session::set_upload_rate_limit(int bytes_per_second) { assert(bytes_per_second > 0 || bytes_per_second == -1); diff --git a/src/torrent.cpp b/src/torrent.cpp index 37a9e62e4..e1b3cbe75 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -857,7 +857,7 @@ namespace libtorrent { assert(p != 0); - peer_iterator i = m_connections.find(p->get_socket()->sender()); + peer_iterator i = m_connections.find(p->remote()); assert(i != m_connections.end()); if (ready_for_connections()) @@ -903,25 +903,20 @@ namespace libtorrent peer_connection& torrent::connect_to_peer(const address& a) { boost::shared_ptr s(new socket(socket::tcp, false)); - s->connect(a, m_net_interface); boost::shared_ptr c(new peer_connection( - m_ses, m_ses.m_selector, this, s)); + m_ses, m_ses.m_selector, this, s, a)); - detail::session_impl::connection_map::iterator p = - m_ses.m_connections.insert(std::make_pair(s, c)).first; + m_ses.m_connection_queue.push_back(c); - // add the newly connected peer to this torrent's peer list - assert(m_connections.find(p->second->get_socket()->sender()) - == m_connections.end()); + assert(m_connections.find(a) == m_connections.end()); #ifndef NDEBUG m_policy->check_invariant(); #endif + // add the newly connected peer to this torrent's peer list m_connections.insert( - std::make_pair( - p->second->get_socket()->sender() - , boost::get_pointer(p->second))); + std::make_pair(a, boost::get_pointer(c))); #ifndef NDEBUG m_policy->check_invariant(); @@ -929,13 +924,14 @@ namespace libtorrent m_ses.m_selector.monitor_readability(s); m_ses.m_selector.monitor_errors(s); + m_ses.process_connection_queue(); return *c; } void torrent::attach_peer(peer_connection* p) { assert(p != 0); - assert(m_connections.find(p->get_socket()->sender()) == m_connections.end()); + assert(m_connections.find(p->remote()) == m_connections.end()); assert(!p->is_local()); detail::session_impl::connection_map::iterator i @@ -950,7 +946,8 @@ namespace libtorrent // connection list. m_policy->new_connection(*i->second); - m_connections.insert(std::make_pair(p->get_socket()->sender(), p)); + assert(p->remote() == p->get_socket()->sender()); + m_connections.insert(std::make_pair(p->remote(), p)); #ifndef NDEBUG m_policy->check_invariant(); diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 632110aeb..89d864db3 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -450,7 +450,7 @@ namespace libtorrent // but still connectable if (!i->second->is_local()) continue; - address ip = i->second->get_socket()->sender(); + address ip = i->second->remote(); entry peer(entry::dictionary_t); peer["ip"] = ip.as_string(); peer["port"] = ip.port; @@ -578,7 +578,7 @@ namespace libtorrent p.payload_down_speed = statistics.download_payload_rate(); p.payload_up_speed = statistics.upload_payload_rate(); p.id = peer->get_peer_id(); - p.ip = peer->get_socket()->sender(); + p.ip = peer->remote(); p.total_download = statistics.total_payload_download(); p.total_upload = statistics.total_payload_upload(); @@ -621,7 +621,9 @@ namespace libtorrent if (peer->has_peer_choked()) p.flags |= peer_info::remote_choked; if (peer->support_extensions()) p.flags |= peer_info::supports_extensions; if (peer->is_local()) p.flags |= peer_info::local_connection; - + if (peer->is_connecting() && !peer->is_queued()) p.flags |= peer_info::connecting; + if (peer->is_queued()) p.flags |= peer_info::queued; + p.pieces = peer->get_bitfield(); p.seed = peer->is_seed(); }