From 49bd69cad48a104ddc1a2d54f0fe7239056f698e Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sat, 5 May 2007 00:29:33 +0000 Subject: [PATCH] introduced a proper half open TCP connection limit. Also exposed the connection queue to let clients use the same connection limiter as libtorrent. UPnP connections and tracker connection are now also limited as well as peer connections and web seeds --- Jamfile | 1 + docs/manual.rst | 7 + examples/client_test.cpp | 2 +- include/Makefile.am | 1 + include/libtorrent/aux_/session_impl.hpp | 26 +-- include/libtorrent/connection_queue.hpp | 96 +++++++++ include/libtorrent/http_connection.hpp | 10 +- .../libtorrent/http_tracker_connection.hpp | 6 + include/libtorrent/peer_connection.hpp | 9 +- include/libtorrent/policy.hpp | 3 +- include/libtorrent/session.hpp | 4 +- include/libtorrent/session_settings.hpp | 6 + include/libtorrent/torrent.hpp | 1 + include/libtorrent/tracker_manager.hpp | 2 + include/libtorrent/upnp.hpp | 8 +- src/Makefile.am | 3 +- src/connection_queue.cpp | 163 ++++++++++++++ src/http_connection.cpp | 42 +++- src/http_tracker_connection.cpp | 19 +- src/peer_connection.cpp | 24 ++- src/policy.cpp | 8 +- src/session.cpp | 4 + src/session_impl.cpp | 202 +++++------------- src/torrent.cpp | 40 ++-- src/tracker_manager.cpp | 2 + src/upnp.cpp | 15 +- 26 files changed, 488 insertions(+), 216 deletions(-) create mode 100644 include/libtorrent/connection_queue.hpp create mode 100644 src/connection_queue.cpp diff --git a/Jamfile b/Jamfile index cebb21dd3..b44630581 100755 --- a/Jamfile +++ b/Jamfile @@ -41,6 +41,7 @@ SOURCES = allocate_resources alert bandwidth_manager + connection_queue entry escape_string http_connection diff --git a/docs/manual.rst b/docs/manual.rst index dec329b30..6bce207b1 100644 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -2175,6 +2175,7 @@ that will be sent to the tracker. The user-agent is a good way to identify your bool allow_multiple_connections_per_ip; int max_failcount; int min_reconnect_time; + int peer_connect_timeout; bool use_dht_as_fallback; }; @@ -2265,6 +2266,12 @@ decremented by one, allowing another try. ``min_reconnect_time`` is the time to wait between connection attempts. If the peer fails, the time is multiplied by fail counter. +``peer_connect_timeout`` the number of seconds to wait after a connection +attempt is initiated to a peer until it is considered as having timed out. +The default is 10 seconds. This setting is especially important in case +the number of half-open connections are limited, since stale half-open +connection may delay the connection of other peers considerably. + ``use_dht_as_fallback`` determines how the DHT is used. If this is true (which it is by default), the DHT will only be used for torrents where all trackers in its tracker list has failed. Either by an explicit error diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 2fb491ac3..6b0fbe588 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -258,7 +258,7 @@ void print_peer_info(std::ostream& out, std::vector const for (std::vector::const_iterator i = peers.begin(); i != peers.end(); ++i) { - if (i->flags & (peer_info::handshake | peer_info::connecting | peer_info::queued)) + if (i->flags & (peer_info::handshake)) continue; out.fill(' '); diff --git a/include/Makefile.am b/include/Makefile.am index f5cd6ff04..2d886203e 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -4,6 +4,7 @@ libtorrent/allocate_resources.hpp \ libtorrent/bandwidth_manager.hpp \ libtorrent/bencode.hpp \ libtorrent/buffer.hpp \ +libtorrent/connection_queue.hpp \ libtorrent/config.hpp \ libtorrent/debug.hpp \ libtorrent/entry.hpp \ diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 51908287a..57e4e6638 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -81,6 +81,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/upnp.hpp" #include "libtorrent/lsd.hpp" #include "libtorrent/socket_type.hpp" +#include "libtorrent/connection_queue.hpp" namespace libtorrent { @@ -171,8 +172,6 @@ namespace libtorrent , boost::intrusive_ptr > connection_map; typedef std::map > torrent_map; - typedef std::deque > - connection_queue; session_impl( std::pair listen_port_range @@ -199,13 +198,7 @@ namespace libtorrent boost::weak_ptr find_torrent(const sha1_hash& info_hash); peer_id const& get_peer_id() const { return m_peer_id; } - // this will see if there are any pending connection attempts - // and in that case initiate new connections until the limit - // is reached. - void process_connection_queue(); - void close_connection(boost::intrusive_ptr const& p); - void connection_completed(boost::intrusive_ptr const& p); void connection_failed(boost::shared_ptr const& s , tcp::endpoint const& a, char const* message); @@ -335,13 +328,7 @@ namespace libtorrent // this is a list of half-open tcp connections // (only outgoing connections) - connection_map m_half_open; - - // this is a queue of pending outgoing connections. If the - // list of half-open connections is full (given the global - // limit), new outgoing connections are put on this queue, - // waiting for one slot in the half-open queue to open up. - connection_queue m_connection_queue; + connection_queue m_half_open; // filters incoming connections ip_filter m_ip_filter; @@ -393,9 +380,6 @@ namespace libtorrent int m_max_uploads; int m_max_connections; - // the number of simultaneous half-open tcp - // connections libtorrent will have. - int m_half_open_limit; // statistics gathered from all torrents. stat m_stat; @@ -433,6 +417,11 @@ namespace libtorrent // the timer used to fire the second_tick deadline_timer m_timer; + + // the index of the torrent that will be offered to + // connect to a peer next time second_tick is called. + // This implements a round robin. + int m_next_connect_torrent; #ifndef NDEBUG void check_invariant(const char *place = 0); #endif @@ -448,6 +437,7 @@ namespace libtorrent // logger used to write bandwidth usage statistics boost::shared_ptr m_stats_logger; int m_second_counter; + public: boost::shared_ptr m_logger; private: diff --git a/include/libtorrent/connection_queue.hpp b/include/libtorrent/connection_queue.hpp new file mode 100644 index 000000000..05a8a61fe --- /dev/null +++ b/include/libtorrent/connection_queue.hpp @@ -0,0 +1,96 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_CONNECTION_QUEUE +#define TORRENT_CONNECTION_QUEUE + +#include +#include +#include +#include "libtorrent/socket.hpp" +#include "libtorrent/time.hpp" + +namespace libtorrent +{ + +class connection_queue : public boost::noncopyable +{ +public: + connection_queue(io_service& ios); + + bool free_slots() const; + + void enqueue(boost::function const& on_connect + , boost::function const& on_timeout + , time_duration timeout); + void done(int ticket); + void limit(int limit); + int limit() const; + +#ifndef NDEBUG + + void check_invariant() const; + +#endif + +private: + + void try_connect(); + void on_timeout(asio::error_code const& e); + + struct entry + { + entry(): connecting(false), ticket(0), expires(max_time()) {} + // called when the connection is initiated + boost::function on_connect; + // called if done hasn't been called within the timeout + boost::function on_timeout; + bool connecting; + int ticket; + ptime expires; + time_duration timeout; + }; + + std::list m_queue; + + // the next ticket id a connection will be given + int m_next_ticket; + int m_num_connecting; + int m_half_open_limit; + + deadline_timer m_timer; +}; + +} + +#endif + diff --git a/include/libtorrent/http_connection.hpp b/include/libtorrent/http_connection.hpp index 05c3f878e..409213857 100644 --- a/include/libtorrent/http_connection.hpp +++ b/include/libtorrent/http_connection.hpp @@ -57,7 +57,8 @@ typedef boost::function, boost::noncopyable { - http_connection(asio::io_service& ios, http_handler handler, bool bottled = true) + http_connection(asio::io_service& ios, connection_queue& cc + , http_handler handler, bool bottled = true) : m_sock(ios) , m_read_pos(0) , m_resolver(ios) @@ -71,6 +72,8 @@ struct http_connection : boost::enable_shared_from_this, boost: , m_limiter_timer_active(false) , m_limiter_timer(ios) , m_redirect(true) + , m_connection_ticket(-1) + , m_cc(cc) { assert(!m_handler.empty()); } @@ -93,6 +96,8 @@ private: void on_resolve(asio::error_code const& e , tcp::resolver::iterator i); + void connect(int ticket, tcp::endpoint target_address); + void on_connect_timeout(); void on_connect(asio::error_code const& e /* , tcp::resolver::iterator i*/); void on_write(asio::error_code const& e); @@ -139,6 +144,9 @@ private: // if set to true, the connection should handle // HTTP redirects. bool m_redirect; + + int m_connection_ticket; + connection_queue& m_cc; }; } diff --git a/include/libtorrent/http_tracker_connection.hpp b/include/libtorrent/http_tracker_connection.hpp index dcae78e9a..35d529504 100755 --- a/include/libtorrent/http_tracker_connection.hpp +++ b/include/libtorrent/http_tracker_connection.hpp @@ -60,6 +60,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/config.hpp" #include "libtorrent/buffer.hpp" #include "libtorrent/socket_type.hpp" +#include "libtorrent/connection_queue.hpp" namespace libtorrent { @@ -115,6 +116,7 @@ namespace libtorrent http_tracker_connection( asio::strand& str + , connection_queue& cc , tracker_manager& man , tracker_request const& req , std::string const& hostname @@ -138,6 +140,7 @@ namespace libtorrent , std::string const& request); void name_lookup(asio::error_code const& error, tcp::resolver::iterator i); + void connect(int ticket, tcp::endpoint target_address); void connected(asio::error_code const& error); void sent(asio::error_code const& error); void receive(asio::error_code const& error @@ -164,6 +167,9 @@ namespace libtorrent std::string m_password; bool m_timed_out; + + int m_connection_ticket; + connection_queue& m_cc; }; } diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 52eb47335..cbe8c1478 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -224,6 +224,7 @@ namespace libtorrent std::vector const& get_bitfield() const; + void timed_out(); // this will cause this peer_connection to be disconnected. void disconnect(); bool is_disconnecting() const { return m_disconnecting; } @@ -246,7 +247,7 @@ namespace libtorrent // initiate the tcp connection. This may be postponed until // the library isn't using up the limitation of half-open // tcp connections. - void connect(); + void connect(int ticket); // This is called for every peer right after the upload // bandwidth has been distributed among them @@ -661,6 +662,12 @@ namespace libtorrent // it allows some variance without changing // back and forth between states peer_speed_t m_speed; + + // the ticket id from the connection queue. + // This is used to identify the connection + // so that it can be removed from the queue + // once the connection completes + int m_connection_ticket; #ifndef NDEBUG public: bool m_in_constructor; diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index ff4ad47e6..80661b079 100755 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -188,6 +188,8 @@ namespace libtorrent iterator begin_peer() { return m_peers.begin(); } iterator end_peer() { return m_peers.end(); } + bool connect_one_peer(); + private: bool unchoke_one_peer(); @@ -203,7 +205,6 @@ namespace libtorrent iterator find_seed_unchoke_candidate(); bool connect_peer(iterator p); - bool connect_one_peer(); bool disconnect_one_peer(); iterator find_disconnect_candidate(); iterator find_connect_candidate(); diff --git a/include/libtorrent/session.hpp b/include/libtorrent/session.hpp index e7feb7b26..ddc05b092 100755 --- a/include/libtorrent/session.hpp +++ b/include/libtorrent/session.hpp @@ -72,7 +72,7 @@ namespace libtorrent struct torrent_plugin; class torrent; class ip_filter; - + class connection_queue; namespace aux { @@ -244,6 +244,8 @@ namespace libtorrent std::auto_ptr pop_alert(); void set_severity_level(alert::severity_t s); + connection_queue& get_connection_queue(); + // Resource management used for global limits. resource_request m_ul_bandwidth_quota; resource_request m_dl_bandwidth_quota; diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index a933ff5f3..71b9d1e71 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -97,6 +97,7 @@ namespace libtorrent , allow_multiple_connections_per_ip(false) , max_failcount(3) , min_reconnect_time(60) + , peer_connect_timeout(10) #ifndef TORRENT_DISABLE_DHT , use_dht_as_fallback(true) #endif @@ -196,6 +197,11 @@ namespace libtorrent // this time is multiplied with the failcount. int min_reconnect_time; + // this is the timeout for a connection attempt. If + // the connect does not succeed within this time, the + // connection is dropped. The time is specified in seconds. + int peer_connect_timeout; + #ifndef TORRENT_DISABLE_DHT // while this is true, the dht will note be used unless the // tracker is online diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index d7e74ff19..8fcdcfae9 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -250,6 +250,7 @@ namespace libtorrent void remove_peer(peer_connection* p); bool want_more_peers() const; + void try_connect_peer(); peer_connection* connection_for(tcp::endpoint const& a) { diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index b51e07c49..7e9783311 100755 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -61,6 +61,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer.hpp" #include "libtorrent/config.hpp" #include "libtorrent/time.hpp" +#include "libtorrent/connection_queue.hpp" namespace libtorrent { @@ -227,6 +228,7 @@ namespace libtorrent void queue_request( asio::strand& str + , connection_queue& cc , tracker_request r , std::string const& auth , address bind_infc diff --git a/include/libtorrent/upnp.hpp b/include/libtorrent/upnp.hpp index 824c6d0c8..acf234d20 100644 --- a/include/libtorrent/upnp.hpp +++ b/include/libtorrent/upnp.hpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/http_connection.hpp" +#include "libtorrent/connection_queue.hpp" #include #include @@ -58,8 +59,9 @@ typedef boost::function portmap_callback_t; class upnp : boost::noncopyable { public: - upnp(io_service& ios, address const& listen_interface - , std::string const& user_agent, portmap_callback_t const& cb); + upnp(io_service& ios, connection_queue& cc + , address const& listen_interface, std::string const& user_agent + , portmap_callback_t const& cb); ~upnp(); void rebind(address const& listen_interface); @@ -203,6 +205,8 @@ private: bool m_disabled; bool m_closing; + connection_queue& m_cc; + #ifdef TORRENT_UPNP_LOGGING std::ofstream m_log; #endif diff --git a/src/Makefile.am b/src/Makefile.am index 2b0fedd2a..7790a80b1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -21,7 +21,7 @@ torrent_info.cpp tracker_manager.cpp http_connection.cpp \ http_tracker_connection.cpp udp_tracker_connection.cpp \ alert.cpp identify_client.cpp ip_filter.cpp file.cpp metadata_transfer.cpp \ logger.cpp file_pool.cpp ut_pex.cpp lsd.cpp upnp.cpp instantiate_connection.cpp \ -socks5_stream.cpp http_stream.cpp $(kademlia_sources) +socks5_stream.cpp http_stream.cpp connection_queue.cpp $(kademlia_sources) noinst_HEADERS = \ $(top_srcdir)/include/libtorrent/alert.hpp \ @@ -32,6 +32,7 @@ $(top_srcdir)/include/libtorrent/aux_/session_impl.hpp \ $(top_srcdir)/include/libtorrent/bandwidth_manager.hpp \ $(top_srcdir)/include/libtorrent/bencode.hpp \ $(top_srcdir)/include/libtorrent/buffer.hpp \ +$(top_srcdir)/include/libtorrent/connection_queue.hpp \ $(top_srcdir)/include/libtorrent/debug.hpp \ $(top_srcdir)/include/libtorrent/entry.hpp \ $(top_srcdir)/include/libtorrent/escape_string.hpp \ diff --git a/src/connection_queue.cpp b/src/connection_queue.cpp new file mode 100644 index 000000000..45e27b5e0 --- /dev/null +++ b/src/connection_queue.cpp @@ -0,0 +1,163 @@ + +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include "libtorrent/invariant_check.hpp" +#include "libtorrent/connection_queue.hpp" + +namespace libtorrent +{ + + connection_queue::connection_queue(io_service& ios): m_next_ticket(0) + , m_num_connecting(0) + , m_half_open_limit(0) + , m_timer(ios) + {} + + bool connection_queue::free_slots() const + { return m_num_connecting < m_half_open_limit || m_half_open_limit <= 0; } + + void connection_queue::enqueue(boost::function const& on_connect + , boost::function const& on_timeout + , time_duration timeout) + { + INVARIANT_CHECK; + + m_queue.push_back(entry()); + entry& e = m_queue.back(); + e.on_connect = on_connect; + e.on_timeout = on_timeout; + e.ticket = m_next_ticket; + e.timeout = timeout; + ++m_next_ticket; + try_connect(); + } + + void connection_queue::done(int ticket) + { + INVARIANT_CHECK; + + std::list::iterator i = std::find_if(m_queue.begin() + , m_queue.end(), boost::bind(&entry::ticket, _1) == ticket); + if (i == m_queue.end()) + { + // this might not be here in case on_timeout calls remove + return; + } + if (i->connecting) --m_num_connecting; + m_queue.erase(i); + try_connect(); + } + + void connection_queue::limit(int limit) + { m_half_open_limit = limit; } + + int connection_queue::limit() const + { return m_half_open_limit; } + +#ifndef NDEBUG + + void connection_queue::check_invariant() const + { + int num_connecting = 0; + for (std::list::const_iterator i = m_queue.begin(); + i != m_queue.end(); ++i) + { + if (i->connecting) ++num_connecting; + } + assert(num_connecting == m_num_connecting); + } + +#endif + + void connection_queue::try_connect() + { + INVARIANT_CHECK; + + if (!free_slots() || m_queue.empty()) + return; + + std::list::iterator i = std::find_if(m_queue.begin() + , m_queue.end(), boost::bind(&entry::connecting, _1) == false); + while (i != m_queue.end()) + { + ptime expire = time_now() + i->timeout; + if (m_num_connecting == 0) + { + m_timer.expires_at(expire); + m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1)); + } + i->connecting = true; + ++m_num_connecting; + i->expires = expire; + try { i->on_connect(i->ticket); } catch (std::exception&) {} + + if (!free_slots()) break; + i = std::find_if(i, m_queue.end(), boost::bind(&entry::connecting, _1) == false); + } + } + + void connection_queue::on_timeout(asio::error_code const& e) + { + INVARIANT_CHECK; + + assert(!e || e == asio::error::operation_aborted); + if (e) return; + + ptime next_expire = max_time(); + ptime now = time_now(); + for (std::list::iterator i = m_queue.begin(); + i != m_queue.end();) + { + if (i->connecting && i->expires < now) + { + boost::function on_timeout = i->on_timeout; + i = m_queue.erase(i); + --m_num_connecting; + try { on_timeout(); } catch (std::exception&) {} + continue; + } + if (i->expires < next_expire) + next_expire = i->expires; + ++i; + } + if (next_expire < max_time()) + { + m_timer.expires_at(next_expire); + m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1)); + } + try_connect(); + } + +} + diff --git a/src/http_connection.cpp b/src/http_connection.cpp index 602942591..197b3029c 100644 --- a/src/http_connection.cpp +++ b/src/http_connection.cpp @@ -86,12 +86,27 @@ void http_connection::start(std::string const& hostname, std::string const& port } } +void http_connection::on_connect_timeout() +{ + if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); + m_connection_ticket = -1; + + if (m_bottled && m_called) return; + m_called = true; + m_handler(asio::error::timed_out, m_parser, 0, 0); + close(); +} + void http_connection::on_timeout(boost::weak_ptr p , asio::error_code const& e) { - if (e == asio::error::operation_aborted) return; boost::shared_ptr c = p.lock(); if (!c) return; + if (c->m_connection_ticket > -1) c->m_cc.done(c->m_connection_ticket); + c->m_connection_ticket = -1; + + if (e == asio::error::operation_aborted) return; + if (c->m_bottled && c->m_called) return; if (c->m_last_receive + c->m_timeout < time_now()) @@ -112,6 +127,9 @@ void http_connection::close() m_sock.close(); m_hostname.clear(); m_port.clear(); + + if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); + m_connection_ticket = -1; } void http_connection::on_resolve(asio::error_code const& e @@ -126,7 +144,15 @@ void http_connection::on_resolve(asio::error_code const& e return; } assert(i != tcp::resolver::iterator()); - m_sock.async_connect(*i, boost::bind(&http_connection::on_connect + m_cc.enqueue(bind(&http_connection::connect, shared_from_this(), _1, *i) + , bind(&http_connection::on_connect_timeout, shared_from_this()) + , m_timeout); +} + +void http_connection::connect(int ticket, tcp::endpoint target_address) +{ + m_connection_ticket = ticket; + m_sock.async_connect(target_address, boost::bind(&http_connection::on_connect , shared_from_this(), _1/*, ++i*/)); } @@ -143,8 +169,9 @@ void http_connection::on_connect(asio::error_code const& e { // The connection failed. Try the next endpoint in the list. m_sock.close(); - m_sock.async_connect(*i, bind(&http_connection::on_connect - , shared_from_this(), _1, ++i)); + m_cc.enqueue(bind(&http_connection::connect, shared_from_this(), _1, *i) + , bind(&http_connection::on_connect_timeout, shared_from_this()) + , m_timeout); } */ else { @@ -241,11 +268,8 @@ void http_connection::on_read(asio::error_code const& e } m_limiter_timer_active = false; - m_timer.cancel(); - m_limiter_timer.cancel(); - m_sock.close(); - m_hostname.clear(); - m_port.clear(); + close(); + get(url, m_timeout); return; } diff --git a/src/http_tracker_connection.cpp b/src/http_tracker_connection.cpp index c0ed90147..3fc88c24e 100755 --- a/src/http_tracker_connection.cpp +++ b/src/http_tracker_connection.cpp @@ -278,6 +278,7 @@ namespace libtorrent http_tracker_connection::http_tracker_connection( asio::strand& str + , connection_queue& cc , tracker_manager& man , tracker_request const& req , std::string const& hostname @@ -299,6 +300,8 @@ namespace libtorrent , m_proxy(ps) , m_password(auth) , m_timed_out(false) + , m_connection_ticket(-1) + , m_cc(cc) { m_send_buffer.assign("GET "); @@ -476,6 +479,8 @@ namespace libtorrent m_timed_out = true; m_socket.reset(); m_name_lookup.cancel(); + if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); + m_connection_ticket = -1; fail_timeout(); } @@ -537,15 +542,25 @@ namespace libtorrent m_socket->open(target_address.protocol()); m_socket->bind(tcp::endpoint(bind_interface(), 0)); - m_socket->async_connect(target_address, bind(&http_tracker_connection::connected, self(), _1)); + m_cc.enqueue(bind(&http_tracker_connection::connect, self(), _1, target_address) + , bind(&http_tracker_connection::on_timeout, self()) + , seconds(m_settings.tracker_receive_timeout)); } catch (std::exception& e) { fail(-1, e.what()); }; + void http_tracker_connection::connect(int ticket, tcp::endpoint target_address) + { + m_connection_ticket = ticket; + m_socket->async_connect(target_address, bind(&http_tracker_connection::connected, self(), _1)); + } + void http_tracker_connection::connected(asio::error_code const& error) try { + if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); + m_connection_ticket = -1; if (error == asio::error::operation_aborted) return; if (m_timed_out) return; if (error) @@ -708,7 +723,7 @@ namespace libtorrent req.url = location; - m_man.queue_request(m_strand, req + m_man.queue_request(m_strand, m_cc, req , m_password, bind_interface(), m_requester); close(); return; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index acb8b4896..33154ab7c 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -125,6 +125,7 @@ namespace libtorrent , m_download_limit(resource_request::inf) , m_peer_info(peerinfo) , m_speed(slow) + , m_connection_ticket(-1) #ifndef NDEBUG , m_in_constructor(true) #endif @@ -1568,6 +1569,16 @@ namespace libtorrent try { s->close(); } catch (std::exception& e) {} } + void peer_connection::timed_out() + { + if (m_peer_info) ++m_peer_info->failcount; +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "CONNECTION TIMED OUT: " << m_remote.address().to_string() + << "\n"; +#endif + m_ses.connection_failed(m_socket, m_remote, "timed out"); + } + void peer_connection::disconnect() { boost::intrusive_ptr me(this); @@ -1576,6 +1587,9 @@ namespace libtorrent if (m_disconnecting) return; m_disconnecting = true; + if (m_connecting) + m_ses.m_half_open.done(m_connection_ticket); + m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket)); boost::shared_ptr t = m_torrent.lock(); @@ -2166,7 +2180,7 @@ namespace libtorrent && !m_connecting; } - void peer_connection::connect() + void peer_connection::connect(int ticket) { INVARIANT_CHECK; @@ -2174,6 +2188,7 @@ namespace libtorrent (*m_ses.m_logger) << "CONNECTING: " << m_remote.address().to_string() << "\n"; #endif + m_connection_ticket = ticket; boost::shared_ptr t = m_torrent.lock(); assert(t); @@ -2197,6 +2212,11 @@ namespace libtorrent INVARIANT_CHECK; + if (m_disconnecting) return; + + m_connecting = false; + m_ses.m_half_open.done(m_connection_ticket); + if (e) { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) @@ -2216,8 +2236,6 @@ namespace libtorrent (*m_ses.m_logger) << "COMPLETED: " << m_remote.address().to_string() << "\n"; #endif - m_ses.connection_completed(self()); - m_connecting = false; on_connected(); setup_send(); setup_receive(); diff --git a/src/policy.cpp b/src/policy.cpp index a0302c0cb..a0df5325a 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -661,13 +661,14 @@ namespace libtorrent if (m_torrent->is_paused()) return; + ptime now = time_now(); // remove old disconnected peers from the list for (iterator i = m_peers.begin(); i != m_peers.end();) { // this timeout has to be customizable! if (i->connection == 0 && i->connected != min_time() - && time_now() - i->connected > minutes(120)) + && now - i->connected > minutes(120)) { m_peers.erase(i++); } @@ -724,9 +725,6 @@ namespace libtorrent } } - while (m_torrent->want_more_peers()) - if (!connect_one_peer()) break; - // ------------------------ // upload shift // ------------------------ @@ -967,7 +965,7 @@ namespace libtorrent { INVARIANT_CHECK; - // just ignore the obviously invalid entries from the tracker + // just ignore the obviously invalid entries if(remote.address() == address() || remote.port() == 0) return; diff --git a/src/session.cpp b/src/session.cpp index f33e3b474..229f251c6 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -360,5 +360,9 @@ namespace libtorrent m_impl->set_severity_level(s); } + connection_queue& session::get_connection_queue() + { + return m_impl->m_half_open; + } } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 517559592..253ddfa6f 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -480,13 +480,13 @@ namespace libtorrent { namespace detail , m_dl_bandwidth_manager(m_io_service, peer_connection::download_channel) , m_ul_bandwidth_manager(m_io_service, peer_connection::upload_channel) , m_tracker_manager(m_settings, m_tracker_proxy) + , m_half_open(m_io_service) , m_listen_port_range(listen_port_range) , m_listen_interface(address::from_string(listen_interface), listen_port_range.first) , m_external_listen_port(0) , m_abort(false) , m_max_uploads(-1) , m_max_connections(-1) - , m_half_open_limit(-1) , m_incoming_connection(false) , m_files(40) , m_last_tick(time_now()) @@ -496,12 +496,13 @@ namespace libtorrent { namespace detail #endif , m_natpmp(m_io_service, m_listen_interface.address() , bind(&session_impl::on_port_mapping, this, _1, _2, _3)) - , m_upnp(m_io_service, m_listen_interface.address() + , m_upnp(m_io_service, m_half_open, m_listen_interface.address() , m_settings.user_agent , bind(&session_impl::on_port_mapping, this, _1, _2, _3)) , m_lsd(m_io_service, m_listen_interface.address() , bind(&session_impl::on_lsd_peer, this, _1, _2)) , m_timer(m_io_service) + , m_next_connect_torrent(0) , m_checker_impl(*this) { @@ -698,35 +699,6 @@ namespace libtorrent { namespace detail if (m_listen_socket) async_accept(); } - void session_impl::process_connection_queue() - { - while (!m_connection_queue.empty()) - { - if ((int)m_half_open.size() >= m_half_open_limit - && m_half_open_limit > 0) - return; - - connection_queue::value_type c = m_connection_queue.front(); - - try - { - m_connection_queue.pop_front(); - assert(c->associated_torrent().lock().get()); - c->connect(); - m_half_open.insert(std::make_pair(c->get_socket(), c)); - } - catch (std::exception& e) - { - c->disconnect(); - -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << "connect failed [" << c->remote() << "]: " - << e.what() << "\n"; -#endif - } - } - } - void session_impl::async_accept() { shared_ptr c(new socket_type(m_io_service)); @@ -808,46 +780,21 @@ namespace libtorrent { namespace detail connection_map::iterator p = m_connections.find(s); // the connection may have been disconnected in the receive or send phase - if (p != m_connections.end()) + if (p == m_connections.end()) return; + if (m_alerts.should_post(alert::debug)) { - if (m_alerts.should_post(alert::debug)) - { - m_alerts.post_alert( - peer_error_alert( - a - , p->second->pid() - , message)); - } + m_alerts.post_alert( + peer_error_alert( + a + , p->second->pid() + , message)); + } #if defined(TORRENT_VERBOSE_LOGGING) - (*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n"; + (*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n"; #endif - p->second->set_failed(); - p->second->disconnect(); - } - else - { - // the error was not in one of the connected - // conenctions. Look among the half-open ones. - p = m_half_open.find(s); - if (p != m_half_open.end()) - { - if (m_alerts.should_post(alert::debug)) - { - m_alerts.post_alert( - peer_error_alert( - a - , p->second->pid() - , message)); - } -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << "CLOSED: " << a.address().to_string() - << " " << message << "\n"; -#endif - p->second->set_failed(); - p->second->disconnect(); - } - } + p->second->set_failed(); + p->second->disconnect(); } #ifndef NDEBUG catch (...) @@ -861,43 +808,9 @@ namespace libtorrent { namespace detail mutex_t::scoped_lock l(m_mutex); assert(p->is_disconnecting()); - - if (p->is_connecting()) - { - assert(p->is_local()); - assert(m_connections.find(p->get_socket()) == m_connections.end()); - // Since this peer is still connecting, will not be - // in the list of completed connections. - connection_map::iterator i = m_half_open.find(p->get_socket()); - if (i == m_half_open.end()) - { - // this connection is not in the half-open list, so it - // has to be in the queue, waiting to be connected. - connection_queue::iterator j = std::find( - m_connection_queue.begin(), m_connection_queue.end(), p); - - // if this connection was closed while being connected - // it has been removed from the connection queue and - // not yet put into the half-open queue. - if (j != m_connection_queue.end()) - m_connection_queue.erase(j); - } - else - { - m_half_open.erase(i); - process_connection_queue(); - } - } - else - { - assert(m_half_open.find(p->get_socket()) == m_half_open.end()); - assert(std::find(m_connection_queue.begin() - , m_connection_queue.end(), p) == m_connection_queue.end()); - connection_map::iterator i = m_connections.find(p->get_socket()); -// assert (i != m_connections.end()); - if (i != m_connections.end()) - m_connections.erase(i); - } + connection_map::iterator i = m_connections.find(p->get_socket()); + if (i != m_connections.end()) + m_connections.erase(i); } void session_impl::set_peer_id(peer_id const& id) @@ -933,7 +846,34 @@ namespace libtorrent { namespace detail m_timer.expires_from_now(seconds(1)); m_timer.async_wait(m_strand.wrap( bind(&session_impl::second_tick, this, _1))); - + + // let torrents connect to peers if they want to + // if there are any torrents and any free slots + if (!m_torrents.empty() && m_half_open.free_slots()) + { + torrent_map::iterator next_connect_torrent = m_torrents.begin(); + if (m_next_connect_torrent < int(m_torrents.size())) + std::advance(next_connect_torrent, m_next_connect_torrent); + else + m_next_connect_torrent = 0; + torrent_map::iterator i = next_connect_torrent; + do + { + torrent& t = *i->second; + if (t.want_more_peers()) + t.try_connect_peer(); + ++m_next_connect_torrent; + if (!m_half_open.free_slots()) break; + ++i; + if (i == m_torrents.end()) + { + assert(m_next_connect_torrent == int(m_torrents.size())); + i = m_torrents.begin(); + m_next_connect_torrent = 0; + } + } while (i != next_connect_torrent); + } + // do the second_tick() on each connection // this will update their statistics (download and upload speeds) // also purge sockets that have timed out @@ -972,8 +912,8 @@ namespace libtorrent { namespace detail // check each torrent for tracker updates // TODO: do this in a timer-event in each torrent instead - for (std::map >::iterator i - = m_torrents.begin(); i != m_torrents.end();) + for (torrent_map::iterator i = m_torrents.begin(); + i != m_torrents.end();) { torrent& t = *i->second; assert(!t.is_aborted()); @@ -982,8 +922,8 @@ namespace libtorrent { namespace detail tracker_request req = t.generate_tracker_request(); req.listen_port = m_external_listen_port; req.key = m_key; - m_tracker_manager.queue_request(m_strand, req, t.tracker_login() - , m_listen_interface.address(), i->second); + m_tracker_manager.queue_request(m_strand, m_half_open, req + , t.tracker_login(), m_listen_interface.address(), i->second); if (m_alerts.should_post(alert::info)) { @@ -1033,7 +973,7 @@ namespace libtorrent { namespace detail assert(false); #endif }; // msvc 7.1 seems to require this - +/* void session_impl::connection_completed( boost::intrusive_ptr const& p) try { @@ -1055,7 +995,7 @@ namespace libtorrent { namespace detail assert(false); #endif }; - +*/ void session_impl::operator()() { eh_initializer(); @@ -1115,9 +1055,11 @@ namespace libtorrent { namespace detail #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) boost::shared_ptr tl(new tracker_logger(*this)); m_tracker_loggers.push_back(tl); - m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address(), tl); + m_tracker_manager.queue_request(m_strand, m_half_open, req, login + , m_listen_interface.address(), tl); #else - m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address()); + m_tracker_manager.queue_request(m_strand, m_half_open, req, login + , m_listen_interface.address()); #endif } } @@ -1141,11 +1083,6 @@ namespace libtorrent { namespace detail assert(m_abort); m_abort = true; - m_connection_queue.clear(); - - while (!m_half_open.empty()) - m_half_open.begin()->second->disconnect(); - while (!m_connections.empty()) m_connections.begin()->second->disconnect(); @@ -1412,10 +1349,10 @@ namespace libtorrent { namespace detail #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) boost::shared_ptr tl(new tracker_logger(*this)); m_tracker_loggers.push_back(tl); - m_tracker_manager.queue_request(m_strand, req + m_tracker_manager.queue_request(m_strand, m_half_open, req , t.tracker_login(), m_listen_interface.address(), tl); #else - m_tracker_manager.queue_request(m_strand, req + m_tracker_manager.queue_request(m_strand, m_half_open, req , t.tracker_login(), m_listen_interface.address()); #endif @@ -1775,7 +1712,7 @@ namespace libtorrent { namespace detail assert(limit > 0 || limit == -1); mutex_t::scoped_lock l(m_mutex); - m_half_open_limit = limit; + m_half_open.limit(limit); } void session_impl::set_upload_rate_limit(int bytes_per_second) @@ -1801,7 +1738,7 @@ namespace libtorrent { namespace detail int session_impl::num_connections() const { mutex_t::scoped_lock l(m_mutex); - return m_connections.size() + m_half_open.size(); + return m_connections.size(); } @@ -1835,31 +1772,10 @@ namespace libtorrent { namespace detail void session_impl::check_invariant(const char *place) { assert(place); - - for (connection_map::iterator i = m_half_open.begin(); - i != m_half_open.end(); ++i) - { - assert(i->second->is_connecting()); - } - for (connection_map::iterator i = m_connections.begin(); i != m_connections.end(); ++i) { assert(i->second); - assert(!i->second->is_connecting()); - if (i->second->is_connecting()) - { - std::ofstream error_log("error.log", std::ios_base::app); - boost::intrusive_ptr p = i->second; - error_log << "peer_connection::is_connecting() " << p->is_connecting() << "\n"; - error_log << "peer_connection::can_write() " << p->can_write() << "\n"; - error_log << "peer_connection::can_read() " << p->can_read() << "\n"; - error_log << "peer_connection::get_peer_id " << p->pid() << "\n"; - error_log << "place: " << place << "\n"; - error_log.flush(); - assert(false); - } - boost::shared_ptr t = i->second->associated_torrent().lock(); if (t) diff --git a/src/torrent.cpp b/src/torrent.cpp index 678ab9f8b..8ff1cfb7a 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -1533,22 +1533,17 @@ namespace libtorrent try { - m_ses.m_connection_queue.push_back(c); - assert(m_connections.find(a) == m_connections.end()); -#ifndef NDEBUG - m_policy->check_invariant(); -#endif // add the newly connected peer to this torrent's peer list m_connections.insert( std::make_pair(a, boost::get_pointer(c))); + m_ses.m_connections.insert(std::make_pair(s, c)); -#ifndef NDEBUG - m_policy->check_invariant(); -#endif - - m_ses.process_connection_queue(); + m_ses.m_half_open.enqueue( + bind(&peer_connection::connect, c, _1) + , bind(&peer_connection::timed_out, c) + , seconds(settings().peer_connect_timeout)); } catch (std::exception& e) { @@ -1925,20 +1920,15 @@ namespace libtorrent try { - m_ses.m_connection_queue.push_back(c); - -#ifndef NDEBUG - m_policy->check_invariant(); -#endif // add the newly connected peer to this torrent's peer list m_connections.insert( std::make_pair(a, boost::get_pointer(c))); + m_ses.m_connections.insert(std::make_pair(s, c)); -#ifndef NDEBUG - m_policy->check_invariant(); -#endif - - m_ses.process_connection_queue(); + m_ses.m_half_open.enqueue( + bind(&peer_connection::connect, c, _1) + , bind(&peer_connection::timed_out, c) + , seconds(settings().peer_connect_timeout)); } catch (std::exception& e) { @@ -2048,11 +2038,9 @@ namespace libtorrent bool torrent::want_more_peers() const { return int(m_connections.size()) < m_connections_quota.given - && (int(m_ses.m_half_open.size()) < m_ses.m_half_open_limit - || m_ses.m_half_open_limit <= 0); + && m_ses.m_half_open.free_slots(); } - void torrent::disconnect_all() { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); @@ -2662,6 +2650,12 @@ namespace libtorrent m_stat.second_tick(tick_interval); } + void torrent::try_connect_peer() + { + assert(want_more_peers()); + m_policy->connect_one_peer(); + } + void torrent::distribute_resources(float tick_interval) { INVARIANT_CHECK; diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index da5333b89..df69d9098 100755 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -481,6 +481,7 @@ namespace libtorrent void tracker_manager::queue_request( asio::strand& str + , connection_queue& cc , tracker_request req , std::string const& auth , address bind_infc @@ -507,6 +508,7 @@ namespace libtorrent { con = new http_tracker_connection( str + , cc , *this , req , hostname diff --git a/src/upnp.cpp b/src/upnp.cpp index f8402e1c4..7c3aa968c 100644 --- a/src/upnp.cpp +++ b/src/upnp.cpp @@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/io.hpp" #include "libtorrent/http_tracker_connection.hpp" #include "libtorrent/xml_parse.hpp" +#include "libtorrent/connection_queue.hpp" + #include #include #include @@ -50,8 +52,9 @@ using namespace libtorrent; address_v4 upnp::upnp_multicast_address; udp::endpoint upnp::upnp_multicast_endpoint; -upnp::upnp(io_service& ios, address const& listen_interface - , std::string const& user_agent, portmap_callback_t const& cb) +upnp::upnp(io_service& ios, connection_queue& cc + , address const& listen_interface, std::string const& user_agent + , portmap_callback_t const& cb) : m_udp_local_port(0) , m_tcp_local_port(0) , m_user_agent(user_agent) @@ -63,6 +66,7 @@ upnp::upnp(io_service& ios, address const& listen_interface , m_strand(ios) , m_disabled(false) , m_closing(false) + , m_cc(cc) { // UPnP multicast address and port upnp_multicast_address = address_v4::from_string("239.255.255.250"); @@ -232,7 +236,8 @@ void upnp::resend_request(asio::error_code const& e) // ask for it rootdevice& d = const_cast(*i); d.upnp_connection.reset(new http_connection(m_socket.io_service() - , m_strand.wrap(bind(&upnp::on_upnp_xml, this, _1, _2, boost::ref(d))))); + , m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, this, _1, _2 + , boost::ref(d))))); d.upnp_connection->get(d.url); } } @@ -390,7 +395,7 @@ void upnp::map_port(rootdevice& d, int i) d.mapping[i].need_update = false; assert(!d.upnp_connection); d.upnp_connection.reset(new http_connection(m_socket.io_service() - , m_strand.wrap(bind(&upnp::on_upnp_map_response, this, _1, _2 + , m_cc, m_strand.wrap(bind(&upnp::on_upnp_map_response, this, _1, _2 , boost::ref(d), i)))); std::string soap_action = "AddPortMapping"; @@ -436,7 +441,7 @@ void upnp::unmap_port(rootdevice& d, int i) return; } d.upnp_connection.reset(new http_connection(m_socket.io_service() - , m_strand.wrap(bind(&upnp::on_upnp_unmap_response, this, _1, _2 + , m_cc, m_strand.wrap(bind(&upnp::on_upnp_unmap_response, this, _1, _2 , boost::ref(d), i)))); std::string soap_action = "DeletePortMapping";