land branch to remove half-open connection limit / connection queue

This commit is contained in:
Arvid Norberg 2014-10-03 20:56:57 +00:00
parent 65473fa783
commit ceccc2a483
41 changed files with 250 additions and 1410 deletions

View File

@ -16,7 +16,6 @@ set(sources
block_cache
bloom_filter
chained_buffer
connection_queue
create_torrent
disk_buffer_holder
entry

View File

@ -518,7 +518,6 @@ SOURCES =
block_cache
bloom_filter
chained_buffer
connection_queue
crc32c
create_torrent
disk_buffer_holder

View File

@ -27,7 +27,6 @@ nobase_include_HEADERS = \
chained_buffer.hpp \
config.hpp \
connection_interface.hpp \
connection_queue.hpp \
ConvertUTF.h \
copy_ptr.hpp \
cpuid.hpp \

View File

@ -78,7 +78,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/file_pool.hpp"
#include "libtorrent/bandwidth_manager.hpp"
#include "libtorrent/socket_type.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/assert.hpp"
@ -317,8 +316,7 @@ namespace libtorrent
peer_id const& get_peer_id() const { return m_peer_id; }
void close_connection(peer_connection* p, error_code const& ec
, bool cancel_in_cq);
void close_connection(peer_connection* p, error_code const& ec);
#ifndef TORRENT_NO_DEPRECATE
void set_settings(libtorrent::session_settings const& s);
@ -472,18 +470,13 @@ namespace libtorrent
void set_local_upload_rate_limit(int bytes_per_second);
void set_download_rate_limit(int bytes_per_second);
void set_upload_rate_limit(int bytes_per_second);
void set_max_half_open_connections(int limit);
void set_max_connections(int limit);
void set_max_uploads(int limit);
int max_connections() const;
int max_uploads() const;
int max_half_open_connections() const;
#endif
bool half_open_done(int ticket)
{ return m_half_open.done(ticket); }
bandwidth_manager* get_bandwidth_manager(int channel);
int upload_rate_limit(peer_class_t c) const;
@ -638,7 +631,6 @@ namespace libtorrent
std::vector<block_info>& block_info_storage() { return m_block_info_storage; }
connection_queue& half_open() { return m_half_open; }
libtorrent::utp_socket_manager* utp_socket_manager() { return &m_utp_socket_manager; }
void inc_boost_connections() { ++m_boost_connections; }
@ -685,7 +677,6 @@ namespace libtorrent
void update_dht_announce_interval();
void update_anonymous_mode();
void update_force_proxy();
void update_half_open();
void update_download_rate();
void update_upload_rate();
void update_connections_limit();
@ -747,12 +738,6 @@ namespace libtorrent
// to distribute its cost to multiple threads
std::vector<boost::shared_ptr<network_thread_pool> > m_net_thread_pool;
// this is a list of half-open tcp connections
// (only outgoing connections)
// this has to be one of the last
// members to be destructed
connection_queue m_half_open;
// the bandwidth manager is responsible for
// handing out bandwidth to connections that
// asks for it, it can also throttle the

View File

@ -78,7 +78,6 @@ namespace libtorrent
struct disk_interface;
struct tracker_request;
struct request_callback;
class connection_queue;
struct utp_socket_manager;
struct socket_type;
struct block_info;
@ -166,7 +165,7 @@ namespace libtorrent { namespace aux
// does nothing if the peer is already corked
virtual void cork_burst(peer_connection* p) = 0;
virtual void close_connection(peer_connection* p, error_code const& ec, bool cancel_with_cq) = 0;
virtual void close_connection(peer_connection* p, error_code const& ec) = 0;
virtual int num_connections() const = 0;
virtual char* allocate_buffer() = 0;
@ -234,9 +233,6 @@ namespace libtorrent { namespace aux
virtual void sent_syn(bool ipv6) = 0;
virtual void received_synack(bool ipv6) = 0;
// half-open
virtual bool half_open_done(int ticket) = 0;
virtual int peak_up_rate() const = 0;
enum torrent_list_index
@ -268,7 +264,6 @@ namespace libtorrent { namespace aux
virtual bool has_lsd() const = 0;
virtual void announce_lsd(sha1_hash const& ih, int port, bool broadcast = false) = 0;
virtual connection_queue& half_open() = 0;
virtual libtorrent::utp_socket_manager* utp_socket_manager() = 0;
virtual void inc_boost_connections() = 0;
virtual void setup_socket_buffers(socket_type& s) = 0;

View File

@ -1,55 +0,0 @@
/*
Copyright (c) 2012-2013, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_CONNECTION_INTERFACE_HPP
#define TORRENT_CONNECTION_INTERFACE_HPP
namespace libtorrent
{
struct connection_interface
{
// called when the connection may be initiated
// this is when the timeout countdown starts
virtual void on_allow_connect(int ticket) = 0;
// called if done() hasn't been called within the timeout
// or if the connection queue aborts. This means there
// are 3 different interleaves of these function calls:
// 1. on_connect
// 2. on_connect, on_timeout
// 3. on_timeout
virtual void on_connect_timeout() = 0;
};
}
#endif

View File

@ -1,140 +0,0 @@
/*
Copyright (c) 2007-2014, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_CONNECTION_QUEUE_HPP
#define TORRENT_CONNECTION_QUEUE_HPP
#include <vector>
#include <map>
#include <boost/noncopyable.hpp>
#include "libtorrent/io_service.hpp"
#include "libtorrent/error_code.hpp"
#include "libtorrent/deadline_timer.hpp"
#ifdef TORRENT_CONNECTION_LOGGING
#include <fstream>
#endif
#include "libtorrent/thread.hpp"
#include "libtorrent/debug.hpp"
namespace libtorrent
{
struct connection_interface;
class TORRENT_EXTRA_EXPORT connection_queue
: public boost::noncopyable
, single_threaded
{
public:
connection_queue(io_service& ios);
// if there are no free slots, returns the negative
// number of queued up connections
int free_slots() const;
void enqueue(connection_interface* conn
, time_duration timeout, int priority = 0);
bool cancel(connection_interface* conn);
bool done(int ticket);
void limit(int limit);
int limit() const;
void close();
int size() const { return m_queue.size(); }
int num_connecting() const { return int(m_connecting.size()); }
#if defined TORRENT_ASIO_DEBUGGING
float next_timeout() const { return total_milliseconds(m_timer.expires_at() - time_now_hires()) / 1000.f; }
float max_timeout() const
{
ptime max_timeout = min_time();
for (std::map<int, connect_entry>::const_iterator i = m_connecting.begin()
, end(m_connecting.end()); i != end; ++i)
{
if (i->second.expires > max_timeout) max_timeout = i->second.expires;
}
if (max_timeout == min_time()) return 0.f;
return total_milliseconds(max_timeout - time_now_hires()) / 1000.f;
}
#endif
#if TORRENT_USE_INVARIANT_CHECKS
void check_invariant() const;
#endif
private:
void try_connect();
void on_timeout(error_code const& e);
void on_try_connect();
struct queue_entry
{
queue_entry(): conn(0), priority(0) {}
connection_interface* conn;
time_duration timeout;
boost::int32_t ticket;
bool connecting;
boost::uint8_t priority;
};
struct connect_entry
{
connect_entry(): conn(0), expires(max_time()), priority(0) {}
connection_interface* conn;
ptime expires;
int priority;
};
std::vector<queue_entry> m_queue;
std::map<int, connect_entry> m_connecting;
// the next ticket id a connection will be given
int m_next_ticket;
int m_half_open_limit;
// the number of outstanding timers
int m_num_timers;
deadline_timer m_timer;
#ifdef TORRENT_DEBUG
bool m_in_timeout_function;
#endif
#ifdef TORRENT_CONNECTION_LOGGING
std::ofstream m_log;
#endif
};
}
#endif

View File

@ -50,7 +50,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/assert.hpp"
#include "libtorrent/socket_type.hpp"
#include "libtorrent/session_settings.hpp"
#include "libtorrent/connection_interface.hpp"
#include "libtorrent/i2p_stream.hpp"
@ -62,7 +61,6 @@ namespace libtorrent
{
struct http_connection;
class connection_queue;
struct resolver_interface;
const int default_max_bottled_buffer_size = 2*1024*1024;
@ -77,12 +75,10 @@ typedef boost::function<void(http_connection&, std::vector<tcp::endpoint>&)> htt
// when bottled, the last two arguments to the handler
// will always be 0
struct TORRENT_EXTRA_EXPORT http_connection
: connection_interface
, boost::enable_shared_from_this<http_connection>
: boost::enable_shared_from_this<http_connection>
, boost::noncopyable
{
http_connection(io_service& ios
, connection_queue& cc
, resolver_interface& resolver
, http_handler const& handler
, bool bottled = true
@ -136,9 +132,7 @@ private:
#endif
void on_resolve(error_code const& e
, std::vector<address> const& addresses);
void queue_connect();
void on_allow_connect(int ticket);
void on_connect_timeout();
void connect();
void on_connect(error_code const& e);
void on_write(error_code const& e);
void on_read(error_code const& e, std::size_t bytes_transferred);
@ -156,11 +150,6 @@ private:
std::vector<tcp::endpoint> m_endpoints;
// used to keep us alive when queued in the connection_queue
boost::shared_ptr<http_connection> m_self_reference;
connection_queue& m_cc;
#ifdef TORRENT_USE_OPENSSL
asio::ssl::context* m_ssl_ctx;
bool m_own_ssl_context;
@ -201,8 +190,6 @@ private:
// the number of redirects to follow (in sequence)
int m_redirects;
int m_connection_ticket;
// maximum size of bottled buffer
int m_max_bottled_buffer_size;
@ -236,8 +223,6 @@ private:
// 0 and continue to hand out quota at that time.
bool m_limiter_timer_active;
bool m_queued_for_connection;
// true if the connection is using ssl
bool m_ssl;

View File

@ -58,7 +58,6 @@ namespace libtorrent
struct http_connection;
class entry;
class http_parser;
class connection_queue;
namespace aux { struct session_impl; struct session_settings; }
class TORRENT_EXTRA_EXPORT http_tracker_connection
@ -69,7 +68,6 @@ namespace libtorrent
http_tracker_connection(
io_service& ios
, connection_queue& cc
, tracker_manager& man
, tracker_request const& req
, boost::weak_ptr<request_callback> c
@ -99,7 +97,6 @@ namespace libtorrent
boost::shared_ptr<http_connection> m_tracker_connection;
aux::session_impl& m_ses;
address m_tracker_ip;
connection_queue& m_cc;
io_service& m_ios;
#if TORRENT_USE_I2P
i2p_connection* m_i2p_conn;

View File

@ -81,7 +81,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_class_set.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/disk_observer.hpp"
#include "libtorrent/connection_interface.hpp"
#include "libtorrent/peer_connection_interface.hpp"
#include "libtorrent/piece_picker.hpp" // for piece_block
#include "libtorrent/socket.hpp" // for tcp::endpoint
@ -289,7 +288,6 @@ namespace libtorrent
, public bandwidth_socket
, public peer_class_set
, public disk_observer
, public connection_interface
, public peer_connection_interface
, public boost::enable_shared_from_this<peer_connection>
{
@ -525,25 +523,6 @@ namespace libtorrent
// finish the connection attempt
bool is_connecting() const { return m_connecting; }
// returns true if the socket of this peer hasn't been
// attempted to connect yet (i.e. it's queued for
// connection attempt).
bool is_queued() const { return m_queued; }
// returns true if this peer has successfully completed its connection
// attempt to the remote end.
bool is_connected() const { return m_connected; }
// called when it's time for this peer_conncetion to actually
// initiate the tcp connection. This may be postponed until
// the library isn't using up the limitation of half-open
// tcp connections.
// implements connection_interface
void on_allow_connect(int ticket);
// implements connection_interface. Called by the connection_queue
void on_connect_timeout();
// This is called for every peer right after the upload
// bandwidth has been distributed among them
// It will reset the used bandwidth to 0.
@ -1218,12 +1197,6 @@ namespace libtorrent
// by sending choke, unchoke.
int m_num_invalid_requests;
// the ticket id from the connection queue.
// This is used to identify the connection
// so that it can be removed from the queue
// once the connection completes
int m_connection_ticket;
// if [0] is -1, superseeding is not active. If it is >= 0
// this is the piece that is available to this peer. Only
// these two pieces can be downloaded from us by this peer.
@ -1317,11 +1290,6 @@ namespace libtorrent
// succeeded. i.e. the TCP 3-way handshake
bool m_connected:1;
// This is true until connect is called on the
// peer_connection's socket. It is false on incoming
// connections.
bool m_queued:1;
// if this is true, the blocks picked by the piece
// picker will be merged before passed to the
// request function. i.e. subsequent blocks are
@ -1374,12 +1342,6 @@ namespace libtorrent
// otherwise.
bool m_has_metadata:1;
// this is true while this connection is queued
// in the connection_queue. We may not destruct
// the connection while it is, since it's not
// held by an owning pointer, just a plain one
bool m_queued_for_connection:1;
// this is set to true if this peer was accepted exceeding
// the connection limit. It means it has to disconnect
// itself, or some other peer, as soon as it's completed

View File

@ -116,10 +116,15 @@ namespace libtorrent
// being connected).
connecting = 0x80,
#ifndef TORRENT_NO_DEPRECATE
// The connection is currently queued for a connection
// attempt. This may happen if there is a limit set on
// the number of half-open TCP connections.
queued = 0x100,
#else
// hidden
deprecated__ = 0x100,
#endif
// The peer has participated in a piece that failed the
// hash check, and is now "on parole", which means we're

View File

@ -1181,9 +1181,6 @@ namespace libtorrent
// a pipe or an eventfd.
void set_alert_dispatch(boost::function<void(std::auto_ptr<alert>)> const& fun);
// internal
connection_queue& get_connection_queue();
#ifndef TORRENT_NO_DEPRECATE
// Starts and stops Local Service Discovery. This service will broadcast
// the infohashes of all the non-private torrents on the local network to

View File

@ -1233,6 +1233,7 @@ namespace libtorrent
// ``choking_algorithm`` is set to.
unchoke_slots_limit,
#ifndef TORRENT_NO_DEPRECATE
// ``half_open_limit`` sets the maximum number of half-open connections
// libtorrent will have when connecting to peers. A half-open connection is one
// where connect() has been called, but the connection still hasn't been established
@ -1243,6 +1244,9 @@ namespace libtorrent
// limiting the number of simultaneous connection attempts, peers will be put in
// a queue waiting for their turn to get connected.
half_open_limit,
#else
deprecated5,
#endif
// ``connections_limit`` sets a global limit on the number of connections
// opened. The number of connections is set to a hard minimum of at least two per
@ -1290,7 +1294,7 @@ namespace libtorrent
#ifndef TORRENT_NO_DEPRECATE
utp_delayed_ack,
#else
deprecated5,
deprecated6,
#endif
utp_loss_multiplier,

View File

@ -59,7 +59,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_id.hpp"
#include "libtorrent/peer.hpp" // peer_entry
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/size_type.hpp"
#include "libtorrent/union_endpoint.hpp"
@ -319,7 +318,6 @@ namespace libtorrent
void queue_request(
io_service& ios
, connection_queue& cc
, tracker_request r
, std::string const& auth
, boost::weak_ptr<request_callback> c

View File

@ -39,7 +39,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/session_settings.hpp"
#include "libtorrent/buffer.hpp"
#include "libtorrent/thread.hpp"
#include "libtorrent/connection_interface.hpp"
#include "libtorrent/deadline_timer.hpp"
#include "libtorrent/debug.hpp"
@ -47,8 +46,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
class connection_queue;
struct udp_socket_observer
{
// return true if the packet was handled (it won't be
@ -66,10 +63,10 @@ namespace libtorrent
virtual void socket_drained() {}
};
class udp_socket : connection_interface, single_threaded
class udp_socket : single_threaded
{
public:
udp_socket(io_service& ios, connection_queue& cc);
udp_socket(io_service& ios);
~udp_socket();
enum flags_t { dont_drop = 1, peer_connection = 2, dont_queue = 4 };
@ -185,9 +182,8 @@ namespace libtorrent
void on_read_impl(udp::socket* sock, udp::endpoint const& ep
, error_code const& e, std::size_t bytes_transferred);
void on_name_lookup(error_code const& e, tcp::resolver::iterator i);
void on_connect_timeout();
void on_allow_connect(int ticket);
void on_connected(error_code const& ec, int ticket);
void on_connect_timeout(error_code const& ec);
void on_connected(error_code const& ec);
void handshake1(error_code const& e);
void handshake2(error_code const& e);
void handshake3(error_code const& e);
@ -204,6 +200,7 @@ namespace libtorrent
void unwrap(error_code const& e, char const* buf, int size);
udp::socket m_ipv4_sock;
deadline_timer m_timer;
int m_buf_size;
// if the buffer size is attempted
@ -225,9 +222,7 @@ namespace libtorrent
#endif
tcp::socket m_socks5_sock;
int m_connection_ticket;
proxy_settings m_proxy_settings;
connection_queue& m_cc;
tcp::resolver m_resolver;
char m_tmp_buf[270];
bool m_queue_packets;
@ -267,16 +262,13 @@ namespace libtorrent
int m_outstanding_connect;
int m_outstanding_timeout;
int m_outstanding_resolve;
int m_outstanding_connect_queue;
int m_outstanding_socks;
char timeout_stack[2000];
#endif
};
struct rate_limited_udp_socket : public udp_socket
{
rate_limited_udp_socket(io_service& ios, connection_queue& cc);
rate_limited_udp_socket(io_service& ios);
void set_rate_limit(int limit) { m_rate_limit = limit; }
bool send(udp::endpoint const& ep, char const* p, int len
, error_code& ec, int flags = 0);

View File

@ -68,7 +68,6 @@ namespace libtorrent
udp_tracker_connection(
io_service& ios
, connection_queue& cc
, tracker_manager& man
, tracker_request const& req
, boost::weak_ptr<request_callback> c

View File

@ -37,7 +37,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/error_code.hpp"
#include "libtorrent/broadcast_socket.hpp"
#include "libtorrent/http_connection.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/thread.hpp"
#include "libtorrent/deadline_timer.hpp"
@ -114,7 +113,7 @@ typedef boost::function<void(char const*)> log_callback_t;
class TORRENT_EXTRA_EXPORT upnp : public intrusive_ptr_base<upnp>
{
public:
upnp(io_service& ios, connection_queue& cc
upnp(io_service& ios
, address const& listen_interface, std::string const& user_agent
, portmap_callback_t const& cb, log_callback_t const& lcb
, bool ignore_nonrouters, void* state = 0);
@ -370,8 +369,6 @@ private:
bool m_closing;
bool m_ignore_non_routers;
connection_queue& m_cc;
mutex m_mutex;
std::string m_model;

View File

@ -33,7 +33,6 @@ POSSIBILITY OF SUCH DAMAGE.
#ifndef TORRENT_UTP_STREAM_HPP_INCLUDED
#define TORRENT_UTP_STREAM_HPP_INCLUDED
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/proxy_base.hpp"
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/io.hpp"

View File

@ -400,8 +400,7 @@ namespace libtorrent
if (!is_connecting() && in_handshake())
p.flags |= peer_info::handshake;
if (is_connecting() && !is_queued()) p.flags |= peer_info::connecting;
if (is_queued()) p.flags |= peer_info::queued;
if (is_connecting()) p.flags |= peer_info::connecting;
p.client = m_client_version;
p.connection_type = peer_info::standard_bittorrent;

View File

@ -1,340 +0,0 @@
/*
Copyright (c) 2007-2014, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/config.hpp"
#include "libtorrent/invariant_check.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/error_code.hpp"
#include "libtorrent/error.hpp"
#include "libtorrent/connection_interface.hpp"
#include <boost/bind.hpp>
#include <algorithm>
#if defined TORRENT_ASIO_DEBUGGING
#include "libtorrent/debug.hpp"
#endif
namespace libtorrent
{
connection_queue::connection_queue(io_service& ios): m_next_ticket(0)
, m_half_open_limit(0)
, m_num_timers(0)
, m_timer(ios)
#ifdef TORRENT_DEBUG
, m_in_timeout_function(false)
#endif
{
#ifdef TORRENT_CONNECTION_LOGGING
m_log.open("connection_queue.log");
#endif
}
int connection_queue::free_slots() const
{
TORRENT_ASSERT(is_single_thread());
return m_half_open_limit == 0 ? (std::numeric_limits<int>::max)()
: m_half_open_limit - m_queue.size();
}
void connection_queue::enqueue(connection_interface* conn
, time_duration timeout, int priority)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
TORRENT_ASSERT(priority >= 0);
TORRENT_ASSERT(priority < 3);
queue_entry e;
e.priority = priority;
e.conn = conn;
e.timeout = timeout;
if (priority <= 0)
{
m_queue.push_back(e);
}
else // priority > 0
{
m_queue.insert(m_queue.begin(), e);
}
if (num_connecting() < m_half_open_limit
|| m_half_open_limit == 0)
m_timer.get_io_service().post(boost::bind(
&connection_queue::on_try_connect, this));
}
bool connection_queue::cancel(connection_interface* conn)
{
std::vector<queue_entry>::iterator i = std::find_if(
m_queue.begin(), m_queue.end(), boost::bind(&queue_entry::conn, _1) == conn);
if (i == m_queue.end())
{
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
// assert the connection is not in the connecting list
for (std::map<int, connect_entry>::iterator i = m_connecting.begin();
i != m_connecting.end(); ++i)
{
TORRENT_ASSERT(i->second.conn != conn);
}
#endif
return false;
}
m_queue.erase(i);
return true;
}
bool connection_queue::done(int ticket)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
std::map<int, connect_entry>::iterator i = m_connecting.find(ticket);
// this might not be here in case on_timeout calls remove
if (i == m_connecting.end()) return false;
m_connecting.erase(i);
if (num_connecting() < m_half_open_limit
|| m_half_open_limit == 0)
m_timer.get_io_service().post(boost::bind(
&connection_queue::on_try_connect, this));
return true;
}
void connection_queue::close()
{
TORRENT_ASSERT(is_single_thread());
error_code ec;
if (num_connecting() == 0) m_timer.cancel(ec);
std::vector<queue_entry> tmp;
tmp.swap(m_queue);
while (!tmp.empty())
{
queue_entry& e = tmp.front();
if (e.priority > 1)
{
m_queue.push_back(e);
tmp.erase(tmp.begin());
continue;
}
TORRENT_TRY {
e.conn->on_allow_connect(-1);
} TORRENT_CATCH(std::exception&) {}
tmp.erase(tmp.begin());
}
std::vector<std::pair<int, connect_entry> > tmp2;
for (std::map<int, connect_entry>::iterator i = m_connecting.begin();
i != m_connecting.end();)
{
if (i->second.priority <= 1)
{
tmp2.push_back(*i);
m_connecting.erase(i++);
}
else
{
++i;
}
}
while (!tmp2.empty())
{
std::pair<int, connect_entry>& e = tmp2.back();
TORRENT_TRY {
e.second.conn->on_connect_timeout();
} TORRENT_CATCH(std::exception&) {}
tmp2.erase(tmp2.end()-1);
}
}
void connection_queue::limit(int limit)
{
TORRENT_ASSERT(limit >= 0);
m_half_open_limit = limit;
}
int connection_queue::limit() const
{ return m_half_open_limit; }
#if TORRENT_USE_INVARIANT_CHECKS
void connection_queue::check_invariant() const
{
}
#endif
void connection_queue::try_connect()
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
#ifdef TORRENT_CONNECTION_LOGGING
m_log << log_time() << " " << free_slots() << std::endl;
#endif
if (num_connecting() >= m_half_open_limit
&& m_half_open_limit > 0) return;
if (m_queue.empty() && m_connecting.empty())
{
error_code ec;
m_timer.cancel(ec);
return;
}
// all entries are connecting, no need to look for new ones
if (m_queue.empty()) return;
while (!m_queue.empty())
{
if (num_connecting() >= m_half_open_limit
&& m_half_open_limit > 0) break;
queue_entry e = m_queue.front();
m_queue.erase(m_queue.begin());
ptime expire = time_now_hires() + e.timeout;
if (num_connecting() == 0)
{
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("connection_queue::on_timeout");
#endif
error_code ec;
m_timer.expires_at(expire, ec);
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
++m_num_timers;
}
connect_entry ce;
ce.conn = e.conn;
ce.priority = e.priority;
ce.expires = time_now_hires() + e.timeout;
int ticket = ++m_next_ticket;
m_connecting.insert(std::make_pair(ticket, ce));
TORRENT_TRY {
ce.conn->on_allow_connect(ticket);
} TORRENT_CATCH(std::exception&) {}
#ifdef TORRENT_CONNECTION_LOGGING
m_log << log_time() << " " << free_slots() << std::endl;
#endif
}
}
#ifdef TORRENT_DEBUG
struct function_guard
{
function_guard(bool& v): val(v) { TORRENT_ASSERT(!val); val = true; }
~function_guard() { val = false; }
bool& val;
};
#endif
void connection_queue::on_timeout(error_code const& e)
{
#if defined TORRENT_ASIO_DEBUGGING
complete_async("connection_queue::on_timeout");
#endif
--m_num_timers;
INVARIANT_CHECK;
#ifdef TORRENT_DEBUG
function_guard guard_(m_in_timeout_function);
#endif
TORRENT_ASSERT(!e || e == error::operation_aborted);
// if there was an error, it's most likely operation aborted,
// we should just quit. However, in case there are still connections
// in connecting state, and there are no other timer invocations
// we need to stick around still.
if (e && (num_connecting() == 0 || m_num_timers > 0)) return;
ptime next_expire = max_time();
ptime now = time_now_hires() + milliseconds(100);
std::vector<connect_entry> timed_out;
for (std::map<int, connect_entry>::iterator i = m_connecting.begin();
!m_connecting.empty() && i != m_connecting.end(); ++i)
{
if (i->second.expires < now)
{
timed_out.push_back(i->second);
continue;
}
if (i->second.expires < next_expire)
next_expire = i->second.expires;
}
for (std::vector<connect_entry>::iterator i = timed_out.begin()
, end(timed_out.end()); i != end; ++i)
{
TORRENT_TRY {
i->conn->on_connect_timeout();
} TORRENT_CATCH(std::exception&) {}
}
if (next_expire < max_time())
{
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("connection_queue::on_timeout");
#endif
error_code ec;
m_timer.expires_at(next_expire, ec);
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
++m_num_timers;
}
try_connect();
}
void connection_queue::on_try_connect()
{
try_connect();
}
}

View File

@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/gzip.hpp"
#include "libtorrent/parse_url.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/socket_type.hpp" // for async_shutdown
#include "libtorrent/resolver_interface.hpp"
#include "libtorrent/settings_pack.hpp"
@ -52,7 +51,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent {
http_connection::http_connection(io_service& ios
, connection_queue& cc
, resolver_interface& resolver
, http_handler const& handler
, bool bottled
@ -63,16 +61,16 @@ http_connection::http_connection(io_service& ios
, boost::asio::ssl::context* ssl_ctx
#endif
)
: m_cc(cc)
:
#ifdef TORRENT_USE_OPENSSL
, m_ssl_ctx(ssl_ctx)
, m_own_ssl_context(false)
m_ssl_ctx(ssl_ctx),
m_own_ssl_context(false),
#endif
, m_sock(ios)
m_sock(ios),
#if TORRENT_USE_I2P
, m_i2p_conn(0)
m_i2p_conn(0),
#endif
, m_resolver(resolver)
m_resolver(resolver)
, m_handler(handler)
, m_connect_handler(ch)
, m_filter_handler(fh)
@ -82,7 +80,6 @@ http_connection::http_connection(io_service& ios
, m_start_time(time_now())
, m_read_pos(0)
, m_redirects(5)
, m_connection_ticket(-1)
, m_max_bottled_buffer_size(max_bottled_buffer_size)
, m_rate_limit(0)
, m_download_quota(0)
@ -92,7 +89,6 @@ http_connection::http_connection(io_service& ios
, m_bottled(bottled)
, m_called(false)
, m_limiter_timer_active(false)
, m_queued_for_connection(false)
, m_ssl(false)
, m_abort(false)
{
@ -101,7 +97,6 @@ http_connection::http_connection(io_service& ios
http_connection::~http_connection()
{
TORRENT_ASSERT(m_connection_ticket == -1);
#ifdef TORRENT_USE_OPENSSL
if (m_own_ssl_context) delete m_ssl_ctx;
#endif
@ -242,7 +237,8 @@ void http_connection::start(std::string const& hostname, int port
m_read_timeout = seconds(5);
if (m_read_timeout < timeout / 5) m_read_timeout = timeout / 5;
error_code ec;
m_timer.expires_from_now(m_completion_timeout, ec);
m_timer.expires_from_now((std::min)(
m_read_timeout, m_completion_timeout), ec);
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("http_connection::on_timeout");
#endif
@ -384,14 +380,13 @@ void http_connection::start(std::string const& hostname, int port
m_hostname = hostname;
m_port = port;
m_endpoints.push_back(tcp::endpoint(address(), port));
queue_connect();
connect();
}
else
{
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("http_connection::on_resolve");
#endif
TORRENT_ASSERT(!m_self_reference);
m_endpoints.clear();
m_resolver.async_resolve(hostname, m_resolve_flags
, boost::bind(&http_connection::on_resolve
@ -402,21 +397,6 @@ void http_connection::start(std::string const& hostname, int port
}
}
void http_connection::on_connect_timeout()
{
TORRENT_ASSERT(m_connection_ticket > -1);
TORRENT_ASSERT(!m_queued_for_connection);
// keep ourselves alive even if the callback function
// deletes this object
boost::shared_ptr<http_connection> me(shared_from_this());
error_code ec;
m_sock.close(ec);
m_self_reference.reset();
}
void http_connection::on_timeout(boost::weak_ptr<http_connection> p
, error_code const& e)
{
@ -433,27 +413,26 @@ void http_connection::on_timeout(boost::weak_ptr<http_connection> p
if (c->m_start_time + c->m_completion_timeout < now
|| c->m_last_receive + c->m_read_timeout < now)
{
if (c->m_connection_ticket > -1 && !c->m_endpoints.empty())
if (!c->m_endpoints.empty())
{
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("http_connection::on_timeout");
#endif
error_code ec;
async_shutdown(c->m_sock, c);
c->m_timer.expires_at((std::min)(
c->m_last_receive + c->m_read_timeout
, c->m_start_time + c->m_completion_timeout), ec);
c->m_timer.async_wait(boost::bind(&http_connection::on_timeout, p, _1));
}
else
{
c->callback(asio::error::timed_out);
c->close(true);
return;
}
return;
}
else
{
if (!c->m_sock.is_open()) return;
}
if (!c->m_sock.is_open()) return;
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("http_connection::on_timeout");
#endif
@ -474,15 +453,6 @@ void http_connection::close(bool force)
else
async_shutdown(m_sock, shared_from_this());
if (m_queued_for_connection)
m_cc.cancel(this);
if (m_connection_ticket > -1)
{
m_cc.done(m_connection_ticket);
m_connection_ticket = -1;
}
m_timer.cancel(ec);
m_limiter_timer.cancel(ec);
@ -568,45 +538,24 @@ void http_connection::on_resolve(error_code const& e
== m_bind_addr.is_v4());
#endif
queue_connect();
connect();
}
void http_connection::queue_connect()
void http_connection::connect()
{
TORRENT_ASSERT(!m_endpoints.empty());
m_self_reference = shared_from_this();
m_cc.enqueue(this, m_read_timeout, m_priority);
m_queued_for_connection = true;
}
void http_connection::on_allow_connect(int ticket)
{
TORRENT_ASSERT(m_queued_for_connection);
m_queued_for_connection = false;
boost::shared_ptr<http_connection> me(shared_from_this());
m_self_reference.reset();
#if defined TORRENT_ASIO_DEBUGGING
TORRENT_ASSERT(has_outstanding_async("connection_queue::on_timeout"));
#endif
if (ticket == -1)
{
close();
return;
}
TORRENT_ASSERT(!m_endpoints.empty());
if (m_endpoints.empty())
{
m_cc.done(ticket);
return;
}
if (m_endpoints.empty()) return;
tcp::endpoint target_address = m_endpoints.front();
m_endpoints.erase(m_endpoints.begin());
m_connection_ticket = ticket;
if (m_proxy.proxy_hostnames
&& (m_proxy.type == settings_pack::socks5
|| m_proxy.type == settings_pack::socks5_pw))
@ -638,11 +587,6 @@ void http_connection::on_connect(error_code const& e)
#if defined TORRENT_ASIO_DEBUGGING
complete_async("http_connection::on_connect");
#endif
if (m_connection_ticket >= 0)
{
m_cc.done(m_connection_ticket);
m_connection_ticket = -1;
}
m_last_receive = time_now_hires();
m_start_time = m_last_receive;
@ -660,7 +604,7 @@ void http_connection::on_connect(error_code const& e)
// The connection failed. Try the next endpoint in the list.
error_code ec;
m_sock.close(ec);
queue_connect();
connect();
}
else
{

View File

@ -71,7 +71,6 @@ namespace libtorrent
http_tracker_connection::http_tracker_connection(
io_service& ios
, connection_queue& cc
, tracker_manager& man
, tracker_request const& req
, boost::weak_ptr<request_callback> c
@ -84,7 +83,6 @@ namespace libtorrent
: tracker_connection(man, req, ios, c)
, m_man(man)
, m_ses(ses)
, m_cc(cc)
, m_ios(ios)
#if TORRENT_USE_I2P
, m_i2p_conn(i2p_conn)
@ -206,7 +204,7 @@ namespace libtorrent
}
}
m_tracker_connection.reset(new http_connection(m_ios, m_cc, m_ses.m_host_resolver
m_tracker_connection.reset(new http_connection(m_ios, m_ses.m_host_resolver
, boost::bind(&http_tracker_connection::on_response, self(), _1, _2, _3, _4)
, true, settings.get_int(settings_pack::max_http_recv_buffer_size)
, boost::bind(&http_tracker_connection::on_connect, self(), _1)

View File

@ -176,7 +176,6 @@ namespace libtorrent
, m_reading_bytes(0)
, m_picker_options(0)
, m_num_invalid_requests(0)
, m_connection_ticket(-1)
, m_remote_pieces_dled(0)
, m_remote_dl_rate(0)
, m_outstanding_writing_bytes(0)
@ -192,7 +191,6 @@ namespace libtorrent
, m_fast_reconnect(false)
, m_failed(false)
, m_connected(pack.tor.expired())
, m_queued(!pack.tor.expired())
, m_request_large_blocks(false)
, m_share_mode(false)
, m_upload_only(false)
@ -205,7 +203,6 @@ namespace libtorrent
, m_peer_interested(false)
, m_need_interest_update(false)
, m_has_metadata(true)
, m_queued_for_connection(false)
, m_exceeded_limit(false)
#if TORRENT_USE_ASSERTS
, m_in_constructor(true)
@ -544,10 +541,68 @@ namespace libtorrent
peer_log("*** CLASS [ %s ]", m_ses.peer_classes().at(class_at(i))->label.c_str());
}
#endif
if (t && t->ready_for_connections())
if (!t || !t->ready_for_connections())
return;
init();
error_code ec;
if (!t)
{
init();
TORRENT_ASSERT(!m_connecting);
disconnect(errors::torrent_aborted, op_bittorrent);
return;
}
TORRENT_ASSERT(m_connecting);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
peer_log(">>> OPEN [ protocol: %s ]", (m_remote.address().is_v4()?"IPv4":"IPv6"));
#endif
m_socket->open(m_remote.protocol(), ec);
if (ec)
{
disconnect(ec, op_sock_open);
return;
}
tcp::endpoint bound_ip = m_ses.bind_outgoing_socket(*m_socket
, m_remote.address(), ec);
#if defined TORRENT_VERBOSE_LOGGING
peer_log(">>> BIND [ dst: %s ec: %s ]", print_endpoint(bound_ip).c_str()
, ec.message().c_str());
#endif
if (ec)
{
disconnect(ec, op_sock_bind);
return;
}
#if defined TORRENT_VERBOSE_LOGGING
peer_log(">>> ASYNC_CONNECT [ dst: %s ]", print_endpoint(m_remote).c_str());
#endif
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("peer_connection::on_connection_complete");
#endif
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
t->debug_log("START connect [%p] (%d)", this, int(t->num_peers()));
#endif
m_socket->async_connect(m_remote
, boost::bind(&peer_connection::on_connection_complete, self(), _1));
m_connect = time_now_hires();
sent_syn(m_remote.address().is_v6());
if (t->alerts().should_post<peer_connect_alert>())
{
t->alerts().post_alert(peer_connect_alert(
t->get_handle(), remote(), pid(), m_socket->type()));
}
#if defined TORRENT_VERBOSE_LOGGING
peer_log("*** LOCAL ENDPOINT[ e: %s ]", print_endpoint(m_socket->local_endpoint(ec)).c_str());
#endif
}
void peer_connection::update_interest()
@ -877,7 +932,6 @@ namespace libtorrent
{
m_counters.inc_stats_counter(counters::num_tcp_peers + m_socket->type() - 1, -1);
TORRENT_ASSERT(!m_queued_for_connection);
// INVARIANT_CHECK;
TORRENT_ASSERT(!m_in_constructor);
TORRENT_ASSERT(m_disconnecting);
@ -4005,21 +4059,6 @@ namespace libtorrent
}
}
void peer_connection::on_connect_timeout()
{
TORRENT_ASSERT(is_single_thread());
m_queued_for_connection = false;
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
boost::shared_ptr<torrent> t = m_torrent.lock();
if (t)
{
t->debug_log("END queue peer (timed out) [%p]", this);
}
#endif
connect_failed(errors::timed_out);
}
void peer_connection::connect_failed(error_code const& e)
{
TORRENT_ASSERT(is_single_thread());
@ -4043,12 +4082,6 @@ namespace libtorrent
m_connecting = false;
}
if (m_connection_ticket != -1)
{
if (m_ses.half_open_done(m_connection_ticket))
m_connection_ticket = -1;
}
// a connection attempt using uTP just failed
// mark this peer as not supporting uTP
// we'll never try it again (unless we're trying holepunch)
@ -4226,11 +4259,6 @@ namespace libtorrent
if (t) t->dec_num_connecting();
m_connecting = false;
}
if (m_connection_ticket >= 0)
{
if (m_ses.half_open_done(m_connection_ticket))
m_connection_ticket = -1;
}
torrent_handle handle;
if (t) handle = t->get_handle();
@ -4333,8 +4361,7 @@ namespace libtorrent
async_shutdown(*m_socket, m_socket);
m_ses.close_connection(this, ec, m_queued_for_connection);
m_queued_for_connection = false;
m_ses.close_connection(this, ec);
}
bool peer_connection::ignore_unchoke_slots() const
@ -4733,11 +4760,6 @@ namespace libtorrent
if (!t || m_disconnecting)
{
if (m_connection_ticket != -1)
{
if (m_ses.half_open_done(m_connection_ticket))
m_connection_ticket = -1;
}
TORRENT_ASSERT(t || !m_connecting);
if (m_connecting)
{
@ -4770,6 +4792,7 @@ namespace libtorrent
}
on_tick();
if (is_disconnecting()) return;
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
@ -4785,11 +4808,38 @@ namespace libtorrent
time_duration d;
d = (std::min)(now - m_last_receive, now - m_last_sent);
if (m_connecting)
{
int connect_timeout = m_settings.get_int(settings_pack::peer_connect_timeout);
if (m_peer_info) connect_timeout += 3 * m_peer_info->failcount;
// SSL and i2p handshakes are slow
if (is_ssl(*m_socket))
connect_timeout += 10;
#if TORRENT_USE_I2P
if (is_i2p(*m_socket))
connect_timeout += 20;
#endif
if (d > seconds(connect_timeout)
&& can_disconnect(error_code(errors::timed_out, get_libtorrent_category())))
{
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
peer_log("*** CONNECT FAILED [ waited %d seconds ] ***", int(total_seconds(d)));
#endif
connect_failed(errors::timed_out);
return;
}
}
// if we can't read, it means we're blocked on the rate-limiter
// or the disk, not the peer itself. In this case, don't blame
// the peer and disconnect it
bool may_timeout = (m_channel_state[download_channel] & peer_info::bw_network) != 0;
// TODO: 4 use a deadline_timer for timeouts. Don't rely on second_tick()!
// Hook this up to connect timeout as well
if (may_timeout && d > seconds(timeout()) && !m_connecting && m_reading_bytes == 0
&& can_disconnect(error_code(errors::timed_out_inactivity, get_libtorrent_category())))
{
@ -6290,101 +6340,6 @@ namespace libtorrent
return !m_connecting && !m_disconnecting;
}
void peer_connection::on_allow_connect(int ticket)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_queued_for_connection);
m_queued_for_connection = false;
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
{
boost::shared_ptr<torrent> t = m_torrent.lock();
t->debug_log("END queue peer [%p]", this);
}
#endif
#if TORRENT_USE_ASSERTS
// in case we disconnect here, we need to
// keep the connection alive until the
// exit invariant check is run
boost::shared_ptr<peer_connection> me(self());
#endif
INVARIANT_CHECK;
error_code ec;
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
m_ses.session_log("ON_CONNECT: %s", print_endpoint(m_remote).c_str());
#endif
if (ticket == -1)
{
disconnect(asio::error::operation_aborted, op_bittorrent);
return;
}
m_connection_ticket = ticket;
boost::shared_ptr<torrent> t = m_torrent.lock();
m_queued = false;
if (!t)
{
TORRENT_ASSERT(!m_connecting);
disconnect(errors::torrent_aborted, op_bittorrent);
return;
}
TORRENT_ASSERT(m_connecting);
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
peer_log(">>> OPEN [ protocol: %s ]", (m_remote.address().is_v4()?"IPv4":"IPv6"));
#endif
m_socket->open(m_remote.protocol(), ec);
if (ec)
{
disconnect(ec, op_sock_open);
return;
}
tcp::endpoint bound_ip = m_ses.bind_outgoing_socket(*m_socket
, m_remote.address(), ec);
#if defined TORRENT_VERBOSE_LOGGING
peer_log(">>> BIND [ dst: %s ec: %s ]", print_endpoint(bound_ip).c_str()
, ec.message().c_str());
#endif
if (ec)
{
disconnect(ec, op_sock_bind);
return;
}
#if defined TORRENT_VERBOSE_LOGGING
peer_log(">>> ASYNC_CONNECT [ dst: %s ]", print_endpoint(m_remote).c_str());
#endif
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("peer_connection::on_connection_complete");
#endif
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
t->debug_log("START connect [%p] (%d)", this, int(t->num_peers()));
#endif
m_socket->async_connect(m_remote
, boost::bind(&peer_connection::on_connection_complete, self(), _1));
m_connect = time_now_hires();
sent_syn(m_remote.address().is_v6());
if (t->alerts().should_post<peer_connect_alert>())
{
t->alerts().post_alert(peer_connect_alert(
t->get_handle(), remote(), pid(), m_socket->type()));
}
#if defined TORRENT_VERBOSE_LOGGING
peer_log("*** LOCAL ENDPOINT[ e: %s ]", print_endpoint(m_socket->local_endpoint(ec)).c_str());
#endif
}
void peer_connection::on_connection_complete(error_code const& e)
{
TORRENT_ASSERT(is_single_thread());
@ -6430,11 +6385,6 @@ namespace libtorrent
if (t) t->dec_num_connecting();
m_connecting = false;
}
if (m_connection_ticket != -1)
{
if (m_ses.half_open_done(m_connection_ticket))
m_connection_ticket = -1;
}
TORRENT_ASSERT(!m_connected);
m_connected = true;

View File

@ -584,7 +584,7 @@ int feed::update_feed()
}
boost::shared_ptr<http_connection> feed(
new http_connection(m_ses.m_io_service, m_ses.m_half_open
new http_connection(m_ses.m_io_service
, m_ses.m_host_resolver
, boost::bind(&feed::on_feed, shared_from_this()
, _1, _2, _3, _4)));

View File

@ -1085,16 +1085,6 @@ namespace libtorrent
TORRENT_ASYNC_CALL1(set_max_connections, limit);
}
int session::max_half_open_connections() const
{
return TORRENT_SYNC_CALL_RET(int, max_half_open_connections);
}
void session::set_max_half_open_connections(int limit)
{
TORRENT_ASYNC_CALL1(set_max_half_open_connections, limit);
}
int session::local_upload_rate_limit() const
{
return TORRENT_SYNC_CALL_RET(int, local_upload_rate_limit);
@ -1262,11 +1252,6 @@ namespace libtorrent
TORRENT_ASYNC_CALL1(delete_port_mapping, handle);
}
connection_queue& session::get_connection_queue()
{
return m_impl->m_half_open;
}
#ifndef TORRENT_NO_DEPRECATE
session_settings::session_settings(std::string const& user_agent_)
{

View File

@ -435,7 +435,6 @@ namespace aux {
, m_alerts(m_settings.get_int(settings_pack::alert_queue_size), alert::all_categories)
, m_disk_thread(m_io_service, this, m_stats_counters
, (uncork_interface*)this)
, m_half_open(m_io_service)
, m_download_rate(peer_connection::download_channel)
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
, m_upload_rate(peer_connection::upload_channel, true)
@ -475,7 +474,7 @@ namespace aux {
, m_dht_interval_update_torrents(0)
#endif
, m_external_udp_port(0)
, m_udp_socket(m_io_service, m_half_open)
, m_udp_socket(m_io_service)
// TODO: 4 in order to support SSL over uTP, the utp_socket manager either
// needs to be able to receive packets on multiple ports, or we need to
// peek into the first few bytes the payload stream of a socket to determine
@ -576,79 +575,6 @@ namespace aux {
m_ssl_mapping[0] = -1;
m_ssl_mapping[1] = -1;
#endif
#ifdef WIN32
// windows XP has a limit on the number of
// simultaneous half-open TCP connections
// here's a table:
// windows version half-open connections limit
// --------------------- ---------------------------
// XP sp1 and earlier infinite
// earlier than vista 8
// vista sp1 and earlier 5
// vista sp2 and later infinite
// windows release version number
// ----------------------------------- --------------
// Windows 7 6.1
// Windows Server 2008 R2 6.1
// Windows Server 2008 6.0
// Windows Vista 6.0
// Windows Server 2003 R2 5.2
// Windows Home Server 5.2
// Windows Server 2003 5.2
// Windows XP Professional x64 Edition 5.2
// Windows XP 5.1
// Windows 2000 5.0
OSVERSIONINFOEX osv;
memset(&osv, 0, sizeof(osv));
osv.dwOSVersionInfoSize = sizeof(osv);
GetVersionEx((OSVERSIONINFO*)&osv);
// the low two bytes of windows_version is the actual
// version.
boost::uint32_t windows_version
= ((osv.dwMajorVersion & 0xff) << 16)
| ((osv.dwMinorVersion & 0xff) << 8)
| (osv.wServicePackMajor & 0xff);
// this is the format of windows_version
// xx xx xx
// | | |
// | | + service pack version
// | + minor version
// + major version
// the least significant byte is the major version
// and the most significant one is the minor version
if (windows_version >= 0x060100)
{
// windows 7 and up doesn't have a half-open limit
m_half_open.limit(0);
}
else if (windows_version >= 0x060002)
{
// on vista SP 2 and up, there's no limit
m_half_open.limit(0);
}
else if (windows_version >= 0x060000)
{
// on vista the limit is 5 (in home edition)
m_half_open.limit(4);
}
else if (windows_version >= 0x050102)
{
// on XP SP2 the limit is 10
m_half_open.limit(9);
}
else
{
// before XP SP2, there was no limit
m_half_open.limit(0);
}
m_settings.set_int(settings_pack::half_open_limit, m_half_open.limit());
#endif
m_global_class = m_classes.new_peer_class("global");
m_tcp_peer_class = m_classes.new_peer_class("tcp");
@ -736,7 +662,6 @@ namespace aux {
session_log(" generated peer ID: %s", m_peer_id.to_string().c_str());
#endif
update_half_open();
#ifndef TORRENT_NO_DEPRECATE
update_local_download_rate();
update_local_upload_rate();
@ -1548,12 +1473,6 @@ namespace aux {
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" aborting all connections (%d)", m_connections.size());
#endif
m_half_open.close();
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" connection queue: %d", m_half_open.size());
#endif
// abort all connections
while (!m_connections.empty())
{
@ -1564,14 +1483,6 @@ namespace aux {
TORRENT_ASSERT_VAL(conn == int(m_connections.size()) + 1, conn);
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" connection queue: %d", m_half_open.size());
#endif
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_log(" shutting down connection queue");
#endif
m_download_rate.close();
m_upload_rate.close();
@ -1724,7 +1635,7 @@ namespace aux {
req.ssl_ctx = &m_ssl_ctx;
#endif
if (is_any(req.bind_ip)) req.bind_ip = m_listen_interface.address();
m_tracker_manager.queue_request(get_io_service(), m_half_open, req
m_tracker_manager.queue_request(get_io_service(), req
, login, c);
}
@ -3082,13 +2993,11 @@ retry:
// with the connection queue, and should be cancelled
// TODO: should this function take a shared_ptr instead?
void session_impl::close_connection(peer_connection* p
, error_code const& ec, bool cancel_with_cq)
, error_code const& ec)
{
TORRENT_ASSERT(is_single_thread());
boost::shared_ptr<peer_connection> sp(p->self());
if (cancel_with_cq) m_half_open.cancel(p);
// someone else is holding a reference, it's important that
// it's destructed from the network thread. Make sure the
// last reference is held by the network thread.
@ -3956,7 +3865,6 @@ retry:
STAT_COUNTER(num_downloading_torrents);
STAT_COUNTER(num_seeding_torrents);
STAT_COUNTER(num_peers_connected);
STAT_COUNTER(num_peers_half_open);
STAT_COUNTER(disk_blocks_in_use);
STAT_LOGL(d, num_peers); // total number of known peers
STAT_LOG(d, m_peer_allocator.live_allocations());
@ -4766,15 +4674,9 @@ retry:
// zero connections speeds are allowed, we just won't make any connections
if (max_connections <= 0) return;
// this loop will "hand out" max(connection_speed
// , half_open.free_slots()) to the torrents, in a
// round robin fashion, so that every torrent is
// equally likely to connect to a peer
int free_slots = m_half_open.free_slots();
// if we don't have any free slots, return
if (free_slots <= -m_half_open.limit()) return;
// this loop will "hand out" connection_speed to the torrents, in a round
// robin fashion, so that every torrent is equally likely to connect to a
// peer
// boost connections are connections made by torrent connection
// boost, which are done immediately on a tracker response. These
@ -4796,8 +4698,8 @@ retry:
// TODO: use a lower limit than m_settings.connections_limit
// to allocate the to 10% or so of connection slots for incoming
// connections
int limit = (std::min)(m_settings.get_int(settings_pack::connections_limit)
- num_connections(), free_slots);
int limit = m_settings.get_int(settings_pack::connections_limit)
- num_connections();
// this logic is here to smooth out the number of new connection
// attempts over time, to prevent connecting a large number of
@ -4870,7 +4772,6 @@ retry:
if (t->try_connect_peer())
{
--max_connections;
--free_slots;
steps_since_last_connect = 0;
m_stats_counters.inc_stats_counter(counters::connection_attempts);
}
@ -4888,7 +4789,6 @@ retry:
++steps_since_last_connect;
// if there are no more free connection slots, abort
if (free_slots <= -m_half_open.limit()) break;
if (max_connections == 0) return;
// there are no more torrents that want peers
if (want_peers_download.empty() && want_peers_finished.empty()) break;
@ -6730,9 +6630,8 @@ retry:
{
sleep(1000);
++counter;
printf("\x1b[2J\x1b[0;0H\x1b[33m==== Waiting to shut down: %d ==== conn-queue: %d connecting: %d timeout (next: %f max: %f)\x1b[0m\n\n"
, counter, m_half_open.size(), m_half_open.num_connecting(), m_half_open.next_timeout()
, m_half_open.max_timeout());
printf("\x1b[2J\x1b[0;0H\x1b[33m==== Waiting to shut down: %d ==== \x1b[0m\n\n"
, counter);
}
async_dec_threads();
@ -6749,7 +6648,6 @@ retry:
TORRENT_ASSERT(m_torrents.empty());
TORRENT_ASSERT(m_connections.empty());
TORRENT_ASSERT(m_connections.empty());
#ifdef TORRENT_REQUEST_LOGGING
if (m_request_log) fclose(m_request_log);
@ -6798,11 +6696,6 @@ retry:
return m_settings.get_int(settings_pack::unchoke_slots_limit);
}
int session_impl::max_half_open_connections() const
{
return m_settings.get_int(settings_pack::half_open_limit);
}
void session_impl::set_local_download_rate_limit(int bytes_per_second)
{
settings_pack* p = new settings_pack;
@ -6831,13 +6724,6 @@ retry:
apply_settings_pack(p);
}
void session_impl::set_max_half_open_connections(int limit)
{
settings_pack* p = new settings_pack;
p->set_int(settings_pack::half_open_limit, limit);
apply_settings_pack(p);
}
void session_impl::set_max_connections(int limit)
{
settings_pack* p = new settings_pack;
@ -7152,13 +7038,6 @@ retry:
m_listen_sockets.clear();
}
void session_impl::update_half_open()
{
if (m_settings.get_int(settings_pack::half_open_limit) <= 0)
m_settings.set_int(settings_pack::half_open_limit, (std::numeric_limits<int>::max)());
m_half_open.limit(m_settings.get_int(settings_pack::half_open_limit));
}
#ifndef TORRENT_NO_DEPRECATE
void session_impl::update_local_download_rate()
{
@ -7404,7 +7283,6 @@ retry:
// the upnp constructor may fail and call the callbacks
upnp* u = new (std::nothrow) upnp(m_io_service
, m_half_open
, m_listen_interface.address()
, m_settings.get_str(settings_pack::user_agent)
, boost::bind(&session_impl::on_port_mapping

View File

@ -293,7 +293,7 @@ namespace libtorrent
DEPRECATED_SET(local_download_rate_limit, 0, &session_impl::update_local_download_rate),
SET(dht_upload_rate_limit, 4000, &session_impl::update_dht_upload_rate_limit),
SET(unchoke_slots_limit, 8, &session_impl::update_choking_algorithm),
SET(half_open_limit, 0, &session_impl::update_half_open),
DEPRECATED_SET(half_open_limit, 0, 0),
SET(connections_limit, 200, &session_impl::update_connections_limit),
SET(connections_slack, 10, 0),
SET(utp_target_delay, 100, 0),

View File

@ -786,7 +786,7 @@ namespace libtorrent
TORRENT_ASSERT(!m_url.empty());
TORRENT_ASSERT(!m_torrent_file->is_valid());
boost::shared_ptr<http_connection> conn(
new http_connection(m_ses.get_io_service(), m_ses.half_open()
new http_connection(m_ses.get_io_service()
, m_ses.get_resolver()
, boost::bind(&torrent::on_torrent_download, shared_from_this()
, _1, _2, _3, _4)
@ -3357,10 +3357,9 @@ namespace libtorrent
// this is the first tracker response for this torrent
// instead of waiting one second for session_impl::on_tick()
// to be called, connect to a few peers immediately
int conns = (std::min)((std::min)(
int conns = (std::min)(
m_ses.settings().get_int(settings_pack::torrent_connect_boost)
, m_ses.settings().get_int(settings_pack::connections_limit) - m_ses.num_connections())
, m_ses.half_open().free_slots());
, m_ses.settings().get_int(settings_pack::connections_limit) - m_ses.num_connections());
if (conns > 0) m_need_connect_boost = false;
@ -6328,10 +6327,6 @@ namespace libtorrent
if (c->is_disconnecting()) return;
c->m_queued_for_connection = true;
m_ses.half_open().enqueue(c.get()
, seconds(settings().get_int(settings_pack::peer_connect_timeout)));
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
debug_log("START queue peer [%p] (%d)", c.get(), num_peers());
#endif
@ -6371,7 +6366,6 @@ namespace libtorrent
|| is_local(p->remote().address())
|| p->has_country()
|| p->is_connecting()
|| p->is_queued()
|| p->in_handshake()
|| p->remote().address().is_v6()) return;
@ -7191,9 +7185,6 @@ namespace libtorrent
}
#endif
// extend connect timeout by this many seconds
int timeout_extend = 0;
TORRENT_ASSERT(want_peers() || ignore_limit);
TORRENT_ASSERT(m_ses.num_connections()
< m_ses.settings().get_int(settings_pack::connections_limit) || ignore_limit);
@ -7224,8 +7215,6 @@ namespace libtorrent
s->get<i2p_stream>()->set_destination(static_cast<i2p_peer*>(peerinfo)->destination);
s->get<i2p_stream>()->set_command(i2p_stream::cmd_connect);
s->get<i2p_stream>()->set_session_id(m_ses.i2p_session());
// i2p setups are slow
timeout_extend = 20;
}
else
#endif
@ -7249,12 +7238,11 @@ namespace libtorrent
if (is_ssl_torrent() && m_ses.settings().get_int(settings_pack::ssl_listen) != 0)
{
userdata = m_ssl_ctx.get();
// SSL handshakes are slow
timeout_extend = 10;
}
#endif
bool ret = instantiate_connection(m_ses.get_io_service(), m_ses.proxy(), *s, userdata, sm, true);
bool ret = instantiate_connection(m_ses.get_io_service()
, m_ses.proxy(), *s, userdata, sm, true);
(void)ret;
TORRENT_ASSERT(ret);
@ -7297,55 +7285,43 @@ namespace libtorrent
boost::shared_ptr<peer_connection> c = boost::make_shared<bt_peer_connection>(
boost::cref(pack), m_ses.get_peer_id());
#if TORRENT_USE_ASSERTS
c->m_in_constructor = false;
#endif
c->add_stat(size_type(peerinfo->prev_amount_download) << 10
, size_type(peerinfo->prev_amount_upload) << 10);
peerinfo->prev_amount_download = 0;
peerinfo->prev_amount_upload = 0;
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
TORRENT_TRY {
boost::shared_ptr<peer_plugin> pp((*i)->new_connection(c.get()));
if (pp) c->add_extension(pp);
} TORRENT_CATCH (std::exception&) {}
}
#endif
// add the newly connected peer to this torrent's peer list
sorted_insert(m_connections, boost::get_pointer(c));
m_ses.insert_peer(c);
need_policy();
m_policy->set_connection(peerinfo, c.get());
if (peerinfo->seed)
{
TORRENT_ASSERT(m_num_seeds < 0xffff);
++m_num_seeds;
}
update_want_peers();
update_want_tick();
c->start();
if (c->is_disconnecting()) return false;
int timeout = settings().get_int(settings_pack::peer_connect_timeout);
if (peerinfo) timeout += 3 * peerinfo->failcount;
timeout += timeout_extend;
TORRENT_TRY
{
c->m_queued_for_connection = true;
m_ses.half_open().enqueue(c.get()
, seconds(timeout));
#if defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
debug_log("START queue peer [%p] (%d)", c.get(), num_peers());
#if TORRENT_USE_ASSERTS
c->m_in_constructor = false;
#endif
c->add_stat(size_type(peerinfo->prev_amount_download) << 10
, size_type(peerinfo->prev_amount_upload) << 10);
peerinfo->prev_amount_download = 0;
peerinfo->prev_amount_upload = 0;
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
TORRENT_TRY {
boost::shared_ptr<peer_plugin> pp((*i)->new_connection(c.get()));
if (pp) c->add_extension(pp);
} TORRENT_CATCH (std::exception&) {}
}
#endif
// add the newly connected peer to this torrent's peer list
sorted_insert(m_connections, boost::get_pointer(c));
m_ses.insert_peer(c);
need_policy();
m_policy->set_connection(peerinfo, c.get());
if (peerinfo->seed)
{
TORRENT_ASSERT(m_num_seeds < 0xffff);
++m_num_seeds;
}
update_want_peers();
update_want_tick();
c->start();
if (c->is_disconnecting()) return false;
}
TORRENT_CATCH (std::exception&)
{

View File

@ -223,7 +223,6 @@ namespace libtorrent
void tracker_manager::queue_request(
io_service& ios
, connection_queue& cc
, tracker_request req
, std::string const& auth
, boost::weak_ptr<request_callback> c)
@ -250,7 +249,7 @@ namespace libtorrent
#endif
{
con = new http_tracker_connection(
ios, cc, *this, req, c
ios, *this, req, c
, m_ses, auth
#if TORRENT_USE_I2P
, &m_ses.m_i2p_conn
@ -260,7 +259,7 @@ namespace libtorrent
else if (protocol == "udp")
{
con = new udp_tracker_connection(
ios, cc, *this, req , c, m_ses, m_ses.proxy());
ios, *this, req , c, m_ses, m_ses.proxy());
}
else
{

View File

@ -34,7 +34,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/assert.hpp" // for print_backtrace
#include "libtorrent/socket.hpp"
#include "libtorrent/udp_socket.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/socket_io.hpp"
#include "libtorrent/error.hpp"
#include "libtorrent/string_util.hpp" // for allocate_string_copy
@ -56,10 +55,10 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent;
udp_socket::udp_socket(asio::io_service& ios
, connection_queue& cc)
udp_socket::udp_socket(asio::io_service& ios)
: m_observers_locked(false)
, m_ipv4_sock(ios)
, m_timer(ios)
, m_buf_size(0)
, m_new_buf_size(0)
, m_buf(0)
@ -72,8 +71,6 @@ udp_socket::udp_socket(asio::io_service& ios
, m_v6_outstanding(0)
#endif
, m_socks5_sock(ios)
, m_connection_ticket(-1)
, m_cc(cc)
, m_resolver(ios)
, m_queue_packets(false)
, m_tunnel_packets(false)
@ -89,7 +86,6 @@ udp_socket::udp_socket(asio::io_service& ios
m_magic = 0x1337;
m_started = false;
m_outstanding_when_aborted = -1;
m_outstanding_connect_queue = 0;
m_outstanding_connect = 0;
m_outstanding_timeout = 0;
m_outstanding_resolve = 0;
@ -636,36 +632,12 @@ void udp_socket::close()
m_socks5_sock.close(ec);
TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec);
m_resolver.cancel();
m_timer.cancel();
m_abort = true;
#if TORRENT_USE_ASSERTS
m_outstanding_when_aborted = num_outstanding();
#endif
if (m_connection_ticket >= 0)
{
if (m_cc.done(m_connection_ticket))
m_connection_ticket = -1;
// we just called done, which means on_timeout
// won't be called. Decrement the outstanding
// ops counter for that
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timeout > 0);
--m_outstanding_timeout;
print_backtrace(timeout_stack, sizeof(timeout_stack));
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
}
}
void udp_socket::set_buf_size(int s)
@ -811,7 +783,6 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
@ -842,104 +813,10 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
m_proxy_addr.address(i->endpoint().address());
m_proxy_addr.port(i->endpoint().port());
// on_connect may be called from within this thread
// the semantics for on_connect and on_timeout is
// a bit complicated. See comments in connection_queue.hpp
// for more details. This semantic determines how and
// when m_outstanding_ops may be decremented
// To simplyfy this, it's probably a good idea to
// merge on_connect and on_timeout to a single function
// on_timeout may be called before on_connected
// so increment the outstanding ops
// it may also not be called in case we call
// connection_queue::done first, so be sure to
// decrement if that happens
m_outstanding_ops += 2;
#if TORRENT_USE_ASSERTS
++m_outstanding_timeout;
++m_outstanding_connect_queue;
#endif
m_cc.enqueue(this, seconds(10));
}
void udp_socket::on_connect_timeout()
{
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timeout > 0);
--m_outstanding_timeout;
print_backtrace(timeout_stack, sizeof(timeout_stack));
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
m_queue_packets = false;
if (m_abort) return;
CHECK_MAGIC;
TORRENT_ASSERT(is_single_thread());
error_code ec;
m_socks5_sock.close(ec);
TORRENT_ASSERT(m_cc.done(m_connection_ticket) == false);
m_connection_ticket = -1;
}
void udp_socket::on_allow_connect(int ticket)
{
TORRENT_ASSERT(is_single_thread());
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_connect_queue > 0);
--m_outstanding_connect_queue;
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
CHECK_MAGIC;
if (ticket == -1)
{
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timeout > 0);
--m_outstanding_timeout;
print_backtrace(timeout_stack, sizeof(timeout_stack));
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
close();
return;
}
if (m_abort) return;
if (is_closed()) return;
if (m_connection_ticket != -1)
{
// there's already an outstanding connect. Cancel it.
m_socks5_sock.close();
m_connection_ticket = -1;
}
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_connected");
#endif
m_connection_ticket = ticket;
error_code ec;
m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec);
@ -952,14 +829,55 @@ void udp_socket::on_allow_connect(int ticket)
++m_outstanding_connect;
#endif
m_socks5_sock.async_connect(tcp::endpoint(m_proxy_addr.address(), m_proxy_addr.port())
, boost::bind(&udp_socket::on_connected, this, _1, ticket));
, boost::bind(&udp_socket::on_connected, this, _1));
++m_outstanding_ops;
#if TORRENT_USE_ASSERTS
++m_outstanding_timeout;
#endif
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_connect_timeout");
#endif
m_timer.expires_from_now(seconds(10));
m_timer.async_wait(boost::bind(&udp_socket::on_connect_timeout
, this, _1));
}
void udp_socket::on_connected(error_code const& e, int ticket)
void udp_socket::on_connect_timeout(error_code const& ec)
{
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timeout > 0);
--m_outstanding_timeout;
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_socks);
if (ec == boost::asio::error::operation_aborted) return;
m_queue_packets = false;
if (m_abort) return;
CHECK_MAGIC;
TORRENT_ASSERT(is_single_thread());
error_code ignore;
m_socks5_sock.close(ignore);
}
void udp_socket::on_connected(error_code const& e)
{
#if defined TORRENT_ASIO_DEBUGGING
complete_async("udp_socket::on_connected");
#endif
TORRENT_ASSERT(is_single_thread());
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_connect > 0);
--m_outstanding_connect;
@ -969,42 +887,13 @@ void udp_socket::on_connected(error_code const& e, int ticket)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
CHECK_MAGIC;
TORRENT_ASSERT(is_single_thread());
if (m_cc.done(ticket))
{
// if the tickets mismatch, another connection attempt
// was initiated while waiting for this one to complete.
if (ticket == m_connection_ticket)
m_connection_ticket = -1;
}
// we just called done, which means on_timeout
// won't be called. Decrement the outstanding
// ops counter for that
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timeout > 0);
--m_outstanding_timeout;
print_backtrace(timeout_stack, sizeof(timeout_stack));
#endif
TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops;
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
m_timer.cancel();
if (e == asio::error::operation_aborted) return;
// if ticket != m_connection_ticket, it means m_connection_ticket
// will not have been reset, and it means we are still waiting
// for a connection attempt.
if (m_connection_ticket != -1) return;
if (m_abort) return;
if (e)
@ -1056,7 +945,6 @@ void udp_socket::handshake1(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1093,7 +981,6 @@ void udp_socket::handshake2(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1175,7 +1062,6 @@ void udp_socket::handshake3(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1212,7 +1098,6 @@ void udp_socket::handshake4(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1279,7 +1164,6 @@ void udp_socket::connect1(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1316,7 +1200,6 @@ void udp_socket::connect2(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort)
@ -1388,7 +1271,6 @@ void udp_socket::hung_up(error_code const& e)
TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect
+ m_outstanding_timeout
+ m_outstanding_resolve
+ m_outstanding_connect_queue
+ m_outstanding_socks);
if (m_abort) return;
CHECK_MAGIC;
@ -1423,9 +1305,8 @@ void udp_socket::drain_queue()
}
}
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
, connection_queue& cc)
: udp_socket(ios, cc)
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios)
: udp_socket(ios)
, m_rate_limit(8000)
, m_quota(8000)
, m_last_tick(time_now())

View File

@ -64,7 +64,6 @@ namespace libtorrent
// TODO: 2 it would be nice to not have a dependency on session_impl here
udp_tracker_connection::udp_tracker_connection(
io_service& ios
, connection_queue& cc
, tracker_manager& man
, tracker_request const& req
, boost::weak_ptr<request_callback> c

View File

@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/io.hpp"
#include "libtorrent/parse_url.hpp"
#include "libtorrent/xml_parse.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/enum_net.hpp"
#include "libtorrent/escape_string.hpp"
#include "libtorrent/random.hpp"
@ -70,7 +69,7 @@ namespace upnp_errors
static error_code ec;
// TODO: listen_interface is not used. It's meant to bind the broadcast socket
upnp::upnp(io_service& ios, connection_queue& cc
upnp::upnp(io_service& ios
, address const& listen_interface, std::string const& user_agent
, portmap_callback_t const& cb, log_callback_t const& lcb
, bool ignore_nonrouters, void* state)
@ -88,7 +87,6 @@ upnp::upnp(io_service& ios, connection_queue& cc
, m_disabled(false)
, m_closing(false)
, m_ignore_non_routers(ignore_nonrouters)
, m_cc(cc)
, m_last_if_update(min_time())
{
TORRENT_ASSERT(cb);
@ -312,7 +310,7 @@ void upnp::resend_request(error_code const& ec)
log(msg, l);
if (d.upnp_connection) d.upnp_connection->close();
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_resolver
, m_resolver
, boost::bind(&upnp::on_upnp_xml, self(), _1, _2
, boost::ref(d), _5)));
d.upnp_connection->get(d.url, seconds(30), 1);
@ -633,7 +631,7 @@ void upnp::try_map_upnp(mutex::scoped_lock& l, bool timer)
if (d.upnp_connection) d.upnp_connection->close();
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_resolver
, m_resolver
, boost::bind(&upnp::on_upnp_xml, self(), _1, _2
, boost::ref(d), _5)));
d.upnp_connection->get(d.url, seconds(30), 1);
@ -777,7 +775,7 @@ void upnp::update_map(rootdevice& d, int i, mutex::scoped_lock& l)
if (d.upnp_connection) d.upnp_connection->close();
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_resolver
, m_resolver
, boost::bind(&upnp::on_upnp_map_response, self(), _1, _2
, boost::ref(d), i, _5), true, default_max_bottled_buffer_size
, boost::bind(&upnp::create_port_mapping, self(), _1, boost::ref(d), i)));
@ -789,7 +787,7 @@ void upnp::update_map(rootdevice& d, int i, mutex::scoped_lock& l)
{
if (d.upnp_connection) d.upnp_connection->close();
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_resolver
, m_resolver
, boost::bind(&upnp::on_upnp_unmap_response, self(), _1, _2
, boost::ref(d), i, _5), true, default_max_bottled_buffer_size
, boost::bind(&upnp::delete_port_mapping, self(), boost::ref(d), i)));
@ -1040,7 +1038,7 @@ void upnp::on_upnp_xml(error_code const& e
}
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_resolver
, m_resolver
, boost::bind(&upnp::on_upnp_get_ip_address_response, self(), _1, _2
, boost::ref(d), _5), true, default_max_bottled_buffer_size
, boost::bind(&upnp::get_ip_address, self(), boost::ref(d))));

View File

@ -173,8 +173,7 @@ namespace libtorrent
if (is_choked()) p.flags |= peer_info::choked;
if (!is_connecting() && m_server_string.empty())
p.flags |= peer_info::handshake;
if (is_connecting() && !is_queued()) p.flags |= peer_info::connecting;
if (is_queued()) p.flags |= peer_info::queued;
if (is_connecting()) p.flags |= peer_info::connecting;
p.client = m_server_string;
}

View File

@ -95,7 +95,6 @@ test-suite libtorrent :
[ run test_utf8.cpp ]
[ run test_gzip.cpp ]
[ run test_bitfield.cpp ]
[ run test_connection_queue.cpp ]
[ run test_recheck.cpp ]
[ run test_stat_cache.cpp ]
[ run test_part_file.cpp ]

View File

@ -2,7 +2,6 @@ AUTOMAKE_OPTIONS = subdir-objects
test_programs = \
test_bitfield \
test_connection_queue \
test_torrent_info \
test_recheck \
test_stat_cache \
@ -132,7 +131,6 @@ libtest_la_SOURCES = main.cpp \
web_seed_suite.cpp
test_bitfield_SOURCES = test_bitfield.cpp
test_connection_queue_SOURCES = test_connection_queue.cpp
test_torrent_info_SOURCES = test_torrent_info.cpp
test_recheck_SOURCES = test_recheck.cpp
test_stat_cache_SOURCES = test_stat_cache.cpp

View File

@ -1,144 +0,0 @@
/*
Copyright (c) 2012, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/connection_interface.hpp"
#include <boost/bind.hpp>
using namespace libtorrent;
int concurrent_connections = 0;
int num_queued = 0;
enum test_type_t
{
half_open_test,
timeout_test,
priority_test
};
char const* test_name[] =
{
"half-open", "timeout", "priority"
};
struct test_connection : libtorrent::connection_interface
{
test_connection(io_service& ios, connection_queue& cq, int test_type)
: m_ios(ios), m_cq(cq), m_ticket(-1), m_type(test_type), m_done(false)
{
++num_queued;
m_cq.enqueue(this, milliseconds(100), 0);
}
io_service& m_ios;
connection_queue& m_cq;
int m_ticket;
int m_type;
bool m_done;
void on_allow_connect(int ticket)
{
fprintf(stderr, "%s: [%p] on_allow_connect(%d)\n", test_name[m_type], this, ticket);
--num_queued;
if (ticket < 0) return;
m_ticket = ticket;
if (m_type != timeout_test)
m_ios.post(boost::bind(&test_connection::on_connected, this));
++concurrent_connections;
TEST_CHECK(concurrent_connections <= 5);
}
void on_connect_timeout()
{
fprintf(stderr, "%s: [%p] on_connect_timeout\n", test_name[m_type], this);
TEST_CHECK(m_type == timeout_test);
TEST_CHECK(concurrent_connections <= 5);
--concurrent_connections;
if (m_type == timeout_test) m_done = true;
}
void on_connected()
{
fprintf(stderr, "%s: [%p] on_connected\n", test_name[m_type], this);
TEST_CHECK(m_type != timeout_test);
TEST_CHECK(concurrent_connections <= 5);
--concurrent_connections;
m_cq.done(m_ticket);
if (m_type == half_open_test) m_done = true;
}
virtual ~test_connection()
{
if (!m_done)
{
fprintf(stderr, "%s: failed\n", test_name[m_type]);
TEST_CHECK(m_done);
}
}
};
int test_main()
{
io_service ios;
connection_queue cq(ios);
// test half-open limit
cq.limit(5);
std::vector<test_connection*> conns;
for (int i = 0; i < 20; ++i)
conns.push_back(new test_connection(ios, cq, half_open_test));
ios.run();
TEST_CHECK(concurrent_connections == 0);
TEST_CHECK(num_queued == 0);
ios.reset();
for (int i = 0; i < 20; ++i)
delete conns[i];
conns.clear();
for (int i = 0; i < 5; ++i)
conns.push_back(new test_connection(ios, cq, timeout_test));
ios.run();
for (int i = 0; i < 5; ++i)
delete conns[i];
return 0;
}

View File

@ -33,7 +33,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "test.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/socket_io.hpp" // print_endpoint
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/http_connection.hpp"
#include "libtorrent/resolver.hpp"
#include "setup_transfer.hpp"
@ -45,7 +44,6 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent;
io_service ios;
connection_queue cq(ios);
resolver res(ios);
int connect_handler_called = 0;
@ -118,7 +116,7 @@ void run_test(std::string const& url, int size, int status, int connected
<< " connected: " << connected
<< " error: " << (ec?ec->message():"no error") << std::endl;
boost::shared_ptr<http_connection> h(new http_connection(ios, cq
boost::shared_ptr<http_connection> h(new http_connection(ios
, res, &::http_handler, true, 1024*1024, &::http_connect_handler));
h->get(url, seconds(1), 0, &ps);
ios.reset();

View File

@ -99,7 +99,9 @@ session_proxy test_proxy(settings_pack::proxy_type_t proxy_type, int flags)
sett.set_int(settings_pack::stop_tracker_timeout, 2);
sett.set_int(settings_pack::tracker_completion_timeout, 2);
sett.set_int(settings_pack::tracker_receive_timeout, 2);
#ifndef TORRENT_NO_DEPRECATE
sett.set_int(settings_pack::half_open_limit, 2);
#endif
sett.set_bool(settings_pack::announce_to_all_trackers, true);
sett.set_bool(settings_pack::announce_to_all_tiers, true);
sett.set_bool(settings_pack::force_proxy, flags & force_proxy_mode);

View File

@ -73,7 +73,9 @@ int test_main()
TEST_NAME(contiguous_recv_buffer);
TEST_NAME(choking_algorithm);
TEST_NAME(seeding_piece_quota);
#ifndef TORRENT_NO_DEPRECATE
TEST_NAME(half_open_limit);
#endif
TEST_NAME(peer_turnover_interval);
TEST_NAME(mmap_cache);

View File

@ -218,7 +218,9 @@ int test_main()
lt::session* s = new lt::session(fingerprint("LT", 0, 1, 0, 0), std::make_pair(48875, 49800), "0.0.0.0", 0, alert_mask);
settings_pack pack;
#ifndef TORRENT_NO_DEPRECATE
pack.set_int(settings_pack::half_open_limit, 1);
#endif
pack.set_bool(settings_pack::announce_to_all_trackers, true);
pack.set_bool(settings_pack::announce_to_all_tiers, true);
s->apply_settings(pack);
@ -273,7 +275,9 @@ int test_main()
s = new lt::session(fingerprint("LT", 0, 1, 0, 0), std::make_pair(39775, 39800), "0.0.0.0", 0, alert_mask);
pack.clear();
#ifndef TORRENT_NO_DEPRECATE
pack.set_int(settings_pack::half_open_limit, 1);
#endif
pack.set_bool(settings_pack::announce_to_all_trackers, true);
pack.set_bool(settings_pack::announce_to_all_tiers, false);
pack.set_int(settings_pack::tracker_completion_timeout, 2);

View File

@ -33,7 +33,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/upnp.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/socket_io.hpp" // print_endpoint
#include "libtorrent/connection_queue.hpp"
#include "test.hpp"
#include "setup_transfer.hpp"
#include <fstream>
@ -152,8 +151,7 @@ int run_upnp_test(char const* root_filename, char const* router_model, char cons
std::string user_agent = "test agent";
connection_queue cc(ios);
boost::intrusive_ptr<upnp> upnp_handler = new upnp(ios, cc, address_v4::from_string("127.0.0.1")
boost::intrusive_ptr<upnp> upnp_handler = new upnp(ios, address_v4::from_string("127.0.0.1")
, user_agent, &callback, &log_callback, false);
upnp_handler->discover_device();