diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 8fb33f2a7..b3c205b56 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -160,6 +160,7 @@ nobase_include_HEADERS = \ tommath_superclass.h \ \ aux_/alert_manager_variadic_emplace.hpp \ + aux_/array_view.hpp \ aux_/allocating_handler.hpp \ aux_/bind_to_device.hpp \ aux_/cpuid.hpp \ diff --git a/include/libtorrent/aux_/array_view.hpp b/include/libtorrent/aux_/array_view.hpp new file mode 100644 index 000000000..5a04a98da --- /dev/null +++ b/include/libtorrent/aux_/array_view.hpp @@ -0,0 +1,76 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_ARRAY_VIEW_HPP_INCLUDED +#define TORRENT_ARRAY_VIEW_HPP_INCLUDED + +#include + +namespace libtorrent { namespace aux { + + template + struct array_view + { + array_view() : m_ptr(NULL), m_len(0) {} + array_view(T* p, int l) : m_ptr(p), m_len(l) {} + + template + explicit array_view(boost::array& arr) + : m_ptr(arr.data()), m_len(arr.size()) {} + + template + explicit array_view(T (&arr)[N]) + : m_ptr(&arr[0]), m_len(N) {} + + explicit array_view(std::vector& vec) + : m_ptr(vec.data()), m_len(vec.size()) {} + + size_t size() const { return m_len; } + T* data() const { return m_ptr; } + T* begin() const { return m_ptr; } + T* end() const { return m_ptr + m_len; } + + T& operator[](int idx) + { + TORRENT_ASSERT(idx >= 0); + TORRENT_ASSERT(idx < m_len); + return m_ptr[idx]; + } + + private: + T* m_ptr; + size_t m_len; + }; +}} + +#endif + diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c05bb2abd..0423d4cc4 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -126,17 +126,21 @@ namespace libtorrent { listen_socket_t() : tcp_external_port(0) + , udp_external_port(0) , ssl(false) + , udp_write_blocked(false) { tcp_port_mapping[0] = -1; tcp_port_mapping[1] = -1; + udp_port_mapping[0] = -1; + udp_port_mapping[1] = -1; } // this is typically empty but can be set // to the WAN IP address of NAT-PMP or UPnP router address external_address; - // this is a cached local endpoint for the listen socket + // this is a cached local endpoint for the listen TCP socket tcp::endpoint local_endpoint; // this is typically set to the same as the local @@ -147,15 +151,27 @@ namespace libtorrent // to be published to peers, since this is the port // the client is reachable through. int tcp_external_port; + int udp_external_port; // 0 is natpmp 1 is upnp int tcp_port_mapping[2]; + int udp_port_mapping[2]; // set to true if this is an SSL listen socket bool ssl; - // the actual socket + // this is true when the udp socket send() has failed with EAGAIN or + // EWOULDBLOCK. i.e. we're currently waiting for the socket to become + // writeable again. Once it is, we'll set it to false and notify the utp + // socket manager + bool udp_write_blocked; + + // the actual sockets (TCP listen socket and UDP socket) + // An entry does not necessarily have a UDP or TCP socket. One of these + // pointers may be null! + // TODO: 3 make these unique_ptr<> boost::shared_ptr sock; + boost::shared_ptr udp_sock; }; namespace aux @@ -175,7 +191,6 @@ namespace libtorrent : session_interface , dht::dht_observer , boost::noncopyable - , udp_socket_observer , uncork_interface , single_threaded { @@ -184,9 +199,6 @@ namespace libtorrent // maximum length of query names which can be registered by extensions enum { max_dht_query_length = 15 }; -#ifdef TORRENT_DEBUG -// friend class ::libtorrent::peer_connection; -#endif #if TORRENT_USE_INVARIANT_CHECKS friend class libtorrent::invariant_access; #endif @@ -245,9 +257,6 @@ namespace libtorrent // need the initial push to connect peers void prioritize_connections(boost::weak_ptr t) TORRENT_OVERRIDE; - // if we are listening on an IPv6 interface - // this will return one of the IPv6 addresses on this - // machine, otherwise just an empty endpoint tcp::endpoint get_ipv6_interface() const TORRENT_OVERRIDE; tcp::endpoint get_ipv4_interface() const TORRENT_OVERRIDE; @@ -354,8 +363,6 @@ namespace libtorrent , std::vector
const& addresses, int port); #endif - void maybe_update_udp_mapping(int nat, bool ssl, int local_port, int external_port); - #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) torrent const* find_encrypted_torrent( sha1_hash const& info_hash, sha1_hash const& xor_mask) TORRENT_OVERRIDE; @@ -522,7 +529,15 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT bool is_dht_running() const { return (m_dht.get() != NULL); } - int external_udp_port() const TORRENT_OVERRIDE { return m_external_udp_port; } + int external_udp_port() const TORRENT_OVERRIDE + { + for (std::list::const_iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (i->udp_sock) return i->udp_external_port; + } + return -1; + } #endif #if TORRENT_USE_I2P @@ -1042,20 +1057,27 @@ namespace libtorrent int m_outstanding_router_lookups; #endif - bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size) TORRENT_OVERRIDE; + void send_udp_packet_hostname(char const* hostname + , int port + , array_view p + , error_code& ec + , int flags); - // see m_external_listen_port. This is the same - // but for the udp port used by the DHT. - // TODO: 3 once udp sockets are part of m_listen_sockets, remove this - int m_external_udp_port; + void send_udp_packet(bool ssl + , udp::endpoint const& ep + , array_view p + , error_code& ec + , int flags); + + void on_udp_writeable(boost::weak_ptr s, error_code const& ec); + + void on_udp_packet(boost::weak_ptr const& s + , bool ssl, error_code const& ec); - udp_socket m_udp_socket; libtorrent::utp_socket_manager m_utp_socket_manager; #ifdef TORRENT_USE_OPENSSL // used for uTP connectons over SSL - udp_socket m_ssl_udp_socket; libtorrent::utp_socket_manager m_ssl_utp_socket_manager; #endif @@ -1068,18 +1090,17 @@ namespace libtorrent boost::shared_ptr m_upnp; boost::shared_ptr m_lsd; - // TODO: 3 once the udp socket is in listen_socket_t, these should - // move in there too - // 0 is natpmp 1 is upnp - int m_udp_mapping[2]; -#ifdef TORRENT_USE_OPENSSL - int m_ssl_udp_mapping[2]; -#endif - // mask is a bitmask of which protocols to remap on: // 1: NAT-PMP // 2: UPnP - void remap_ports(boost::uint32_t mask, listen_socket_t& s); + // TODO: 3 perhaps this function should move into listen_socket_t + enum remap_port_mask_t + { + remap_natpmp = 1, + remap_upnp = 2, + remap_natpmp_and_upnp = 3 + }; + void remap_ports(remap_port_mask_t mask, listen_socket_t& s); // the timer used to fire the tick deadline_timer m_timer; diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 460a39151..f162e9a4e 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -249,9 +249,8 @@ namespace libtorrent { namespace aux virtual void prioritize_connections(boost::weak_ptr t) = 0; - // TODO: 3 these should go away! - virtual tcp::endpoint get_ipv6_interface() const = 0; virtual tcp::endpoint get_ipv4_interface() const = 0; + virtual tcp::endpoint get_ipv6_interface() const = 0; virtual void trigger_auto_manage() = 0; diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index d3e35cf76..f0be60acd 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/thread.hpp" #include "libtorrent/deadline_timer.hpp" +#include "libtorrent/aux_/array_view.hpp" namespace libtorrent { @@ -68,10 +69,14 @@ namespace libtorrent { namespace dht struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL : udp_socket_interface - , udp_socket_observer , boost::enable_shared_from_this { - dht_tracker(dht_observer* observer, udp_socket& sock + typedef boost::function, error_code&, int)> send_fun_t; + + dht_tracker(dht_observer* observer + , io_service& ios + , send_fun_t const& send_fun , dht_settings const& settings, counters& cnt , dht_storage_constructor_type storage_constructor , entry const& state); @@ -129,10 +134,8 @@ namespace libtorrent { namespace dht , std::vector& requests); void update_stats_counters(counters& c) const; - // translate bittorrent kademlia message into the generic kademlia message - // used by the library - virtual bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size); + void incoming_error(error_code const& ec, udp::endpoint const&); + bool incoming_packet(udp::endpoint const&, char const* buf, int size); private: @@ -154,7 +157,7 @@ namespace libtorrent { namespace dht counters& m_counters; node m_dht; - udp_socket& m_sock; + send_fun_t m_send_fun; dht_logger* m_log; std::vector m_send_buf; diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index 49cd84e77..910ef7aec 100644 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #ifdef TORRENT_USE_OPENSSL #include @@ -62,8 +63,9 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer.hpp" // peer_entry #include "libtorrent/deadline_timer.hpp" #include "libtorrent/union_endpoint.hpp" -#include "libtorrent/udp_socket.hpp" // for udp_socket_observer #include "libtorrent/io_service.hpp" +#include "libtorrent/thread.hpp" +#include "libtorrent/aux_/array_view.hpp" namespace libtorrent { @@ -272,8 +274,7 @@ namespace libtorrent int m_completion_timeout; - typedef mutex mutex_t; - mutable mutex_t m_mutex; + mutable mutex m_mutex; // used for timeouts // this is set when the request has been sent @@ -314,10 +315,9 @@ namespace libtorrent address const& bind_interface() const { return m_req.bind_ip; } void sent_bytes(int bytes); void received_bytes(int bytes); - virtual bool on_receive(error_code const&, udp::endpoint const& + virtual bool on_receive(udp::endpoint const& , char const* /* buf */, int /* size */) { return false; } - virtual bool on_receive_hostname(error_code const& - , char const* /* hostname */ + virtual bool on_receive_hostname(char const* /* hostname */ , char const* /* buf */, int /* size */) { return false; } boost::shared_ptr shared_from_this() @@ -341,12 +341,19 @@ namespace libtorrent }; class TORRENT_EXTRA_EXPORT tracker_manager TORRENT_FINAL - : public udp_socket_observer - , boost::noncopyable + : boost::noncopyable { public: - tracker_manager(udp_socket& sock + typedef boost::function + , error_code&, int)> send_fun_t; + typedef boost::function + , error_code&, int)> send_fun_hostname_t; + + tracker_manager(send_fun_t const& send_fun + , send_fun_hostname_t const& send_fun_hostname , counters& stats_counters , resolver_interface& resolver , aux::session_settings const& sett @@ -370,26 +377,31 @@ namespace libtorrent void sent_bytes(int bytes); void received_bytes(int bytes); - virtual bool incoming_packet(error_code const& e, udp::endpoint const& ep - , char const* buf, int size) TORRENT_OVERRIDE; + void incoming_error(error_code const& ec, udp::endpoint const& ep); + bool incoming_packet(udp::endpoint const& ep, char const* buf, int size); // this is only used for SOCKS packets, since // they may be addressed to hostname - virtual bool incoming_packet(error_code const& e, char const* hostname - , char const* buf, int size) TORRENT_OVERRIDE; + // TODO: 3 make sure the udp_socket supports passing on string-hostnames + // too, and that this function is used + bool incoming_packet(char const* hostname, char const* buf, int size); void update_transaction_id( boost::shared_ptr c , boost::uint64_t tid); aux::session_settings const& settings() const { return m_settings; } - udp_socket& get_udp_socket() { return m_udp_socket; } resolver_interface& host_resolver() { return m_host_resolver; } + void send_hostname(char const* hostname, int port, aux::array_view p + , error_code& ec, int flags = 0); + + void send(udp::endpoint const& ep, aux::array_view p + , error_code& ec, int flags = 0); + private: - typedef mutex mutex_t; - mutable mutex_t m_mutex; + mutable mutex m_mutex; // maps transactionid to the udp_tracker_connection // TODO: this should be unique_ptr in the future @@ -400,7 +412,8 @@ namespace libtorrent typedef std::vector > http_conns_t; http_conns_t m_http_conns; - class udp_socket& m_udp_socket; + send_fun_t m_send_fun; + send_fun_hostname_t m_send_fun_hostname; resolver_interface& m_host_resolver; aux::session_settings const& m_settings; counters& m_stats_counters; diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 60ba212c5..f3338ce3f 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -41,30 +41,12 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/thread.hpp" #include "libtorrent/deadline_timer.hpp" #include "libtorrent/debug.hpp" +#include "libtorrent/aux_/array_view.hpp" #include "libtorrent/aux_/allocating_handler.hpp" -#include - namespace libtorrent { - struct TORRENT_EXTRA_EXPORT udp_socket_observer - { - // return true if the packet was handled (it won't be - // propagated to the next observer) - virtual bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size) = 0; - virtual bool incoming_packet(error_code const& /* ec */ - , char const* /* hostname */, char const* /* buf */, int /* size */) { return false; } - - // called when the socket becomes writeable, after having - // failed with EWOULDBLOCK - virtual void writable() {} - - // called every time the socket is drained of packets - virtual void socket_drained() {} - protected: - ~udp_socket_observer() {} - }; + struct socks5; class TORRENT_EXTRA_EXPORT udp_socket : single_threaded { @@ -76,19 +58,38 @@ namespace libtorrent peer_connection = 1 , tracker_connection = 2 , dont_queue = 4 + , dont_fragment = 8 }; bool is_open() const { return m_abort == false; } - io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } + io_service& get_io_service() { return m_socket.get_io_service(); } - void subscribe(udp_socket_observer* o); - void unsubscribe(udp_socket_observer* o); + template + void async_read(Handler h) + { + m_socket.async_receive(null_buffers(), h); + } + + template + void async_write(Handler h) + { + m_socket.async_send(null_buffers(), h); + } + + struct packet + { + aux::array_view data; + udp::endpoint from; + error_code error; + }; + + int read(aux::array_view pkts, error_code& ec); // this is only valid when using a socks5 proxy - void send_hostname(char const* hostname, int port, char const* p - , int len, error_code& ec, int flags = 0); + void send_hostname(char const* hostname, int port, aux::array_view p + , error_code& ec, int flags = 0); - void send(udp::endpoint const& ep, char const* p, int len + void send(udp::endpoint const& ep, aux::array_view p , error_code& ec, int flags = 0); void bind(udp::endpoint const& ep, error_code& ec); void close(); @@ -99,13 +100,8 @@ namespace libtorrent void set_force_proxy(bool f) { m_force_proxy = f; } bool is_closed() const { return m_abort; } - tcp::endpoint local_endpoint(error_code& ec) const - { - udp::endpoint ep = m_ipv4_sock.local_endpoint(ec); - return tcp::endpoint(ep.address(), ep.port()); - } - - void set_buf_size(int s); + udp::endpoint local_endpoint(error_code& ec) const + { return m_socket.local_endpoint(ec); } typedef udp::socket::receive_buffer_size receive_buffer_size; typedef udp::socket::send_buffer_size send_buffer_size; @@ -113,201 +109,51 @@ namespace libtorrent template void get_option(SocketOption const& opt, error_code& ec) { -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) == IPPROTO_IPV6) - m_ipv6_sock.get_option(opt, ec); - else -#endif - m_ipv4_sock.get_option(opt, ec); + m_socket.get_option(opt, ec); } template void set_option(SocketOption const& opt, error_code& ec) { - if (opt.level(udp::v4()) != IPPROTO_IPV6) - m_ipv4_sock.set_option(opt, ec); -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) != IPPROTO_IP) - m_ipv6_sock.set_option(opt, ec); -#endif + m_socket.set_option(opt, ec); } template void get_option(SocketOption& opt, error_code& ec) { -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) == IPPROTO_IPV6) - m_ipv6_sock.get_option(opt, ec); - else -#endif - m_ipv4_sock.get_option(opt, ec); + m_socket.get_option(opt, ec); } - udp::endpoint proxy_addr() const { return m_proxy_addr; } - private: - struct queued_packet - { - queued_packet() - : hostname(NULL) - , flags(0) - {} - - udp::endpoint ep; - char* hostname; - buffer buf; - int flags; - }; - - // number of outstanding UDP socket operations - // using the UDP socket buffer - int num_outstanding() const - { - return m_v4_outstanding -#if TORRENT_USE_IPV6 - + m_v6_outstanding -#endif - ; - } - // non-copyable udp_socket(udp_socket const&); udp_socket& operator=(udp_socket const&); - void close_impl(); + void wrap(udp::endpoint const& ep, aux::array_view p, error_code& ec, int flags); + void wrap(char const* hostname, int port, aux::array_view p, error_code& ec, int flags); + bool unwrap(udp::endpoint& from, aux::array_view& buf); - // observers on this udp socket - std::vector m_observers; - std::vector m_added_observers; + udp::socket m_socket; - template - aux::allocating_handler - make_read_handler4(Handler const& handler) - { - return aux::allocating_handler( - handler, m_v4_read_handler_storage - ); - } - -#if TORRENT_USE_IPV6 - template - aux::allocating_handler - make_read_handler6(Handler const& handler) - { - return aux::allocating_handler( - handler, m_v6_read_handler_storage - ); - } -#endif - - // this is true while iterating over the observers - // vector, invoking observer hooks. We may not - // add new observers during this time, since it - // may invalidate the iterator. If this is true, - // instead add new observers to m_added_observers - // and they will be added later - bool m_observers_locked; - - void call_handler(error_code const& ec, udp::endpoint const& ep - , char const* buf, int size); - void call_handler(error_code const& ec, const char* host - , char const* buf, int size); - void call_drained_handler(); - void call_writable_handler(); - - void on_writable(error_code const& ec, udp::socket* s); - - void setup_read(udp::socket* s); - void on_read(error_code const& ec, udp::socket* s); - void on_read_impl(udp::endpoint const& ep - , error_code const& e, std::size_t bytes_transferred); - void on_name_lookup(error_code const& e, tcp::resolver::iterator i); - void on_connect_timeout(error_code const& ec); - void on_connected(error_code const& ec); - void handshake1(error_code const& e); - void handshake2(error_code const& e); - void handshake3(error_code const& e); - void handshake4(error_code const& e); - void socks_forward_udp(); - void connect1(error_code const& e); - void connect2(error_code const& e); - void hung_up(error_code const& e); - - void drain_queue(); - - void wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec); - void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); - void unwrap(error_code const& e, char const* buf, int size); - - udp::socket m_ipv4_sock; - aux::handler_storage m_v4_read_handler_storage; - deadline_timer m_timer; - int m_buf_size; - - // if the buffer size is attempted - // to be changed while the buffer is - // being used, this member is set to - // the desired size, and it's resized - // later - int m_new_buf_size; + // TODO: 2 this should probably be a scoped_ptr<> or unique_ptr + // with a hard coded size + int const m_buf_size; char* m_buf; -#if TORRENT_USE_IPV6 - udp::socket m_ipv6_sock; - aux::handler_storage m_v6_read_handler_storage; -#endif - boost::uint16_t m_bind_port; - boost::uint8_t m_v4_outstanding; - boost::uint8_t m_restart_v4; -#if TORRENT_USE_IPV6 - boost::uint8_t m_v6_outstanding; - boost::uint8_t m_restart_v6; -#endif - tcp::socket m_socks5_sock; aux::proxy_settings m_proxy_settings; - tcp::resolver m_resolver; - char m_tmp_buf[270]; - bool m_queue_packets; - bool m_tunnel_packets; - bool m_force_proxy; - bool m_abort; - // this is the endpoint the proxy server lives at. - // when performing a UDP associate, we get another - // endpoint (presumably on the same IP) where we're - // supposed to send UDP packets. - udp::endpoint m_proxy_addr; + boost::shared_ptr m_socks5_connection; - // this is where UDP packets that are to be forwarded - // are sent. The result from UDP ASSOCIATE is stored - // in here. - udp::endpoint m_udp_proxy_addr; - - // while we're connecting to the proxy - // we have to queue the packets, we'll flush - // them once we're connected - std::deque m_queue; - - // counts the number of outstanding async - // operations hanging on this socket - int m_outstanding_ops; - -#if TORRENT_USE_IPV6 - bool m_v6_write_subscribed:1; -#endif - bool m_v4_write_subscribed:1; + // TODO: 3 add a unit test for force-proxy + bool m_force_proxy:1; + bool m_abort:1; #if TORRENT_USE_ASSERTS bool m_started; int m_magic; - int m_outstanding_when_aborted; - int m_outstanding_connect; - int m_outstanding_timeout; - int m_outstanding_resolve; - int m_outstanding_socks; #endif }; } diff --git a/include/libtorrent/udp_tracker_connection.hpp b/include/libtorrent/udp_tracker_connection.hpp index fe41b886e..02f621a69 100644 --- a/include/libtorrent/udp_tracker_connection.hpp +++ b/include/libtorrent/udp_tracker_connection.hpp @@ -94,10 +94,8 @@ namespace libtorrent void timeout(error_code const& error); void start_announce(); - bool on_receive(error_code const& e, udp::endpoint const& ep - , char const* buf, int size); - bool on_receive_hostname(error_code const& e, char const* hostname - , char const* buf, int size); + bool on_receive(udp::endpoint const& ep, char const* buf, int size); + bool on_receive_hostname(char const* hostname, char const* buf, int size); bool on_connect_response(char const* buf, int size); bool on_announce_response(char const* buf, int size); bool on_scrape_response(char const* buf, int size); diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index ea7298072..ce387db4f 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -39,36 +39,51 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/session_status.hpp" #include "libtorrent/enum_net.hpp" #include "libtorrent/aux_/session_settings.hpp" +#include "libtorrent/aux_/array_view.hpp" + +#include "libtorrent/aux_/disable_warnings_push.hpp" + +#include + +#include "libtorrent/aux_/disable_warnings_pop.hpp" namespace libtorrent { - class udp_socket; class utp_stream; struct utp_socket_impl; struct counters; - typedef boost::function const&)> incoming_utp_callback_t; - - struct utp_socket_manager TORRENT_FINAL : udp_socket_observer + struct utp_socket_manager TORRENT_FINAL { - utp_socket_manager(aux::session_settings const& sett, udp_socket& s - , counters& cnt, void* ssl_context, incoming_utp_callback_t cb); + typedef boost::function + , error_code&, int)> send_fun_t; + + typedef boost::function const&)> + incoming_utp_callback_t; + + utp_socket_manager(send_fun_t const& send_fun + , incoming_utp_callback_t const& cb + , io_service& ios + , aux::session_settings const& sett + , counters& cnt, void* ssl_context + ); ~utp_socket_manager(); // return false if this is not a uTP packet - virtual bool incoming_packet(error_code const& ec, udp::endpoint const& ep - , char const* p, int size) TORRENT_OVERRIDE; - virtual bool incoming_packet(error_code const&, char const*, char const*, int) TORRENT_OVERRIDE - { return false; } - virtual void writable() TORRENT_OVERRIDE; + bool incoming_packet(udp::endpoint const& ep, char const* p, int size); - virtual void socket_drained() TORRENT_OVERRIDE; + // if the UDP socket failed with an EAGAIN or EWOULDBLOCK, this will be + // called once the socket is writeable again + void writable(); + + // when the upper layer has drained the underlying UDP socket, this is + // called, and uTP sockets will send their ACKs. This ensures ACKs at + // least coalese packets returned during the same wakeup + void socket_drained(); void tick(time_point now); - tcp::endpoint local_endpoint(address const& remote, error_code& ec) const; - int local_port(error_code& ec) const; - // flags for send_packet enum { dont_fragment = 1 }; void send_packet(udp::endpoint const& ep, char const* p, int len @@ -89,7 +104,6 @@ namespace libtorrent int loss_multiplier() const { return m_sett.get_int(settings_pack::utp_loss_multiplier); } void mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu); - void set_sock_buf(int size); int num_sockets() const { return m_utp_sockets.size(); } void defer_ack(utp_socket_impl* s); @@ -114,7 +128,7 @@ namespace libtorrent // explicitly disallow assignment, to silence msvc warning utp_socket_manager& operator=(utp_socket_manager const&); - udp_socket& m_sock; + send_fun_t m_send_fun; incoming_utp_callback_t m_cb; // replace with a hash-map @@ -165,6 +179,8 @@ namespace libtorrent // stats counters counters& m_counters; + io_service& m_ios; + boost::array m_restrict_mtu; int m_mtu_idx; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp index fdf1be94c..458a864ee 100644 --- a/simulation/test_dht_rate_limit.cpp +++ b/simulation/test_dht_rate_limit.cpp @@ -30,8 +30,6 @@ POSSIBILITY OF SUCH DAMAGE. */ -#if !defined TORRENT_DISABLE_DHT - #include "test.hpp" #include "simulator/simulator.hpp" @@ -41,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/performance_counters.hpp" #include "libtorrent/entry.hpp" #include "libtorrent/session_settings.hpp" +#include "libtorrent/aux_/array_view.hpp" #include "libtorrent/kademlia/dht_observer.hpp" #include @@ -50,6 +49,8 @@ using namespace libtorrent; namespace lt = libtorrent; using namespace sim; +#if !defined TORRENT_DISABLE_DHT + struct obs : dht::dht_observer { virtual void set_external_address(address const& addr @@ -100,8 +101,22 @@ TORRENT_TEST(dht_rate_limit) counters cnt; entry state; boost::shared_ptr dht = boost::make_shared( - &o, sock, dhtsett, cnt, dht::dht_default_storage_constructor, state); - sock.subscribe(dht.get()); + &o, boost::ref(dht_ios), boost::bind(&udp_socket::send, &sock, _1, _2, _3, _4) + , dhtsett, cnt, dht::dht_default_storage_constructor, state); + + bool stop = false; + std::function on_read + = [&](error_code const& ec, size_t bytes) + { + if (ec) return; + udp_socket::packet p; + error_code err; + int const num = sock.read(lt::aux::array_view(&p, 1), err); + if (num) dht->incoming_packet(p.from, p.data.data(), p.data.size()); + if (stop || err) return; + sock.async_read(on_read); + }; + sock.async_read(on_read); // sender int num_packets_sent = 0; @@ -120,7 +135,7 @@ TORRENT_TEST(dht_rate_limit) timer.async_wait([&](error_code const& ec) { dht->stop(); - sock.unsubscribe(dht.get()); + stop = true; sender_sock.close(); sock.close(); }); diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 529d0d516..8b3ce2f08 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -90,21 +90,22 @@ namespace libtorrent { namespace dht // class that puts the networking and the kademlia node in a single // unit and connecting them together. dht_tracker::dht_tracker(dht_observer* observer - , udp_socket& sock + , io_service& ios + , send_fun_t const& send_fun , dht_settings const& settings , counters& cnt , dht_storage_constructor_type storage_constructor , entry const& state) : m_counters(cnt) , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) - , m_sock(sock) + , m_send_fun(send_fun) , m_log(observer) - , m_key_refresh_timer(sock.get_io_service()) - , m_connection_timer(sock.get_io_service()) - , m_refresh_timer(sock.get_io_service()) + , m_key_refresh_timer(ios) + , m_connection_timer(ios) + , m_refresh_timer(ios) , m_settings(settings) , m_abort(false) - , m_host_resolver(sock.get_io_service()) + , m_host_resolver(ios) , m_send_quota(settings.upload_rate_limit) , m_last_tick(aux::time_now()) { @@ -277,29 +278,26 @@ namespace libtorrent { namespace dht m_dht.direct_request(ep, e, f); } - // translate bittorrent kademlia message into the generice kademlia message - // used by the library - bool dht_tracker::incoming_packet(error_code const& ec - , udp::endpoint const& ep, char const* buf, int size) + void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep) { - if (ec) - { - if (ec == boost::asio::error::connection_refused - || ec == boost::asio::error::connection_reset - || ec == boost::asio::error::connection_aborted + if (ec == boost::asio::error::connection_refused + || ec == boost::asio::error::connection_reset + || ec == boost::asio::error::connection_aborted #ifdef WIN32 - || ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) - || ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) - || ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) - || ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) + || ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) + || ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) + || ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) + || ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) #endif - ) - { - m_dht.unreachable(ep); - } - return false; + ) + { + m_dht.unreachable(ep); } + } + bool dht_tracker::incoming_packet( udp::endpoint const& ep + , char const* buf, int size) + { if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false; // remove this line/check once the DHT supports IPv6 if (!ep.address().is_v4()) return false; @@ -316,7 +314,7 @@ namespace libtorrent { namespace dht // these are class A networks not available to the public // if we receive messages from here, that seems suspicious - boost::uint8_t class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 + static boost::uint8_t const class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 , 26, 28, 29, 30, 33, 34, 48, 51, 56 }; int num = sizeof(class_a)/sizeof(class_a[0]); @@ -451,7 +449,7 @@ namespace libtorrent { namespace dht m_send_quota -= m_send_buf.size(); error_code ec; - m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, 0); + m_send_fun(addr, aux::array_view(&m_send_buf[0], m_send_buf.size()), ec, 0); if (ec) { m_counters.inc_stats_counter(counters::dht_messages_out_dropped); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 8b8bb4005..3b14b77f6 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -374,7 +374,11 @@ namespace aux { , m_global_class(0) , m_tcp_peer_class(0) , m_local_peer_class(0) - , m_tracker_manager(m_udp_socket, m_stats_counters, m_host_resolver + , m_tracker_manager( + boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::bind(&session_impl::send_udp_packet_hostname, this, _1, _2, _3, _4, _5) + , m_stats_counters + , m_host_resolver , m_settings #if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS , *this @@ -413,15 +417,18 @@ namespace aux { , m_dht_interval_update_torrents(0) , m_outstanding_router_lookups(0) #endif - , m_external_udp_port(0) - , m_udp_socket(m_io_service) - , m_utp_socket_manager(m_settings, m_udp_socket, m_stats_counters, NULL - , boost::bind(&session_impl::incoming_connection, this, _1)) + , m_utp_socket_manager( + boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::bind(&session_impl::incoming_connection, this, _1) + , m_io_service + , m_settings, m_stats_counters, NULL) #ifdef TORRENT_USE_OPENSSL - , m_ssl_udp_socket(m_io_service) - , m_ssl_utp_socket_manager(m_settings, m_ssl_udp_socket, m_stats_counters - , &m_ssl_ctx - , boost::bind(&session_impl::on_incoming_utp_ssl, this, _1)) + , m_ssl_utp_socket_manager( + boost::bind(&session_impl::send_udp_packet, this, true, _1, _2, _3, _4) + , boost::bind(&session_impl::on_incoming_utp_ssl, this, _1) + , m_io_service + , m_settings, m_stats_counters + , &m_ssl_ctx) #endif , m_boost_connections(0) , m_timer(m_io_service) @@ -444,18 +451,6 @@ namespace aux { #if TORRENT_USE_ASSERTS m_posting_torrent_updates = false; #endif - - m_udp_socket.subscribe(&m_utp_socket_manager); - m_udp_socket.subscribe(this); - m_udp_socket.subscribe(&m_tracker_manager); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.subscribe(&m_ssl_utp_socket_manager); - m_ssl_udp_socket.subscribe(this); -#endif - - error_code ec; - TORRENT_ASSERT_VAL(!ec, ec); } // This function is called by the creating thread, not in the message loop's @@ -489,12 +484,6 @@ namespace aux { m_next_dht_torrent = m_torrents.begin(); #endif m_next_lsd_torrent = m_torrents.begin(); - m_udp_mapping[0] = -1; - m_udp_mapping[1] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[0] = -1; - m_ssl_udp_mapping[1] = -1; -#endif m_global_class = m_classes.new_peer_class("global"); m_tcp_peer_class = m_classes.new_peer_class("tcp"); @@ -1046,10 +1035,19 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - i->sock->close(ec); - TORRENT_ASSERT(!ec); + if (i->sock) + { + i->sock->close(ec); + TORRENT_ASSERT(!ec); + } + + // TODO: 3 closing the udp sockets here means that + // the uTP connections cannot be closed gracefully + if (i->udp_sock) + { + i->udp_sock->close(); + } } - m_listen_sockets.clear(); if (m_socks_listen_socket && m_socks_listen_socket->is_open()) { m_socks_listen_socket->close(ec); @@ -1113,12 +1111,6 @@ namespace aux { m_download_rate.close(); m_upload_rate.close(); - m_udp_socket.close(); - m_external_udp_port = 0; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.close(); -#endif - // it's OK to detach the threads here. The disk_io_thread // has an internal counter and won't release the network // thread until they're all dead (via m_work). @@ -1192,7 +1184,7 @@ namespace aux { template void set_socket_buffer_size(Socket& s, session_settings const& sett, error_code& ec) { - int snd_size = sett.get_int(settings_pack::send_socket_buffer_size); + int const snd_size = sett.get_int(settings_pack::send_socket_buffer_size); if (snd_size) { typename Socket::send_buffer_size prev_option; @@ -1209,7 +1201,7 @@ namespace aux { } } } - int recv_size = sett.get_int(settings_pack::recv_socket_buffer_size); + int const recv_size = sett.get_int(settings_pack::recv_socket_buffer_size); if (recv_size) { typename Socket::receive_buffer_size prev_option; @@ -1292,6 +1284,8 @@ namespace aux { } #endif +//TODO: should there be an option to announce once per listen interface? + m_tracker_manager.queue_request(get_io_service(), req, c); } @@ -1648,24 +1642,29 @@ namespace aux { } #endif - // TODO: 2 remove this function + // TODO: 3 try to remove these functions. They are misleading and not very + // useful. Anything using these should probably be fixed to do something more + // multi-homed friendly tcp::endpoint session_impl::get_ipv6_interface() const { +#if TORRENT_USE_IPV6 for (std::list::const_iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - if (i->local_endpoint.address().is_v6()) return i->local_endpoint; + if (!i->local_endpoint.address().is_v6()) continue; + return tcp::endpoint(i->local_endpoint.address(), i->tcp_external_port); } +#endif return tcp::endpoint(); } - // TODO: 2 remove this function tcp::endpoint session_impl::get_ipv4_interface() const { for (std::list::const_iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - if (i->local_endpoint.address().is_v4()) return i->local_endpoint; + if (!i->local_endpoint.address().is_v4()) continue; + return tcp::endpoint(i->local_endpoint.address(), i->tcp_external_port); } return tcp::endpoint(); } @@ -1689,83 +1688,201 @@ namespace aux { = (flags & open_ssl_socket) ? listen_failed_alert::tcp_ssl : listen_failed_alert::tcp; - ret.sock.reset(new tcp::acceptor(m_io_service)); - ret.sock->open(bind_ep.protocol(), ec); - last_op = listen_failed_alert::open; - if (ec) + + // if we're in force-proxy mode, don't open TCP listen sockets. We cannot + // accept connections on our local machine in this case. + // TODO: 3 the logic in this if-block should be factored out into a + // separate function. At least most of it + if (!m_settings.get_bool(settings_pack::force_proxy)) { + ret.sock = boost::make_shared(boost::ref(m_io_service)); + ret.sock->open(bind_ep.protocol(), ec); + last_op = listen_failed_alert::open; + if (ec) + { #ifndef TORRENT_DISABLE_LOGGING - session_log("failed to open socket: %s" - , ec.message().c_str()); + session_log("failed to open socket: %s" + , ec.message().c_str()); #endif - if (m_alerts.should_post()) - m_alerts.emplace_alert(device, bind_ep, last_op - , ec, sock_type); - return ret; - } + if (m_alerts.should_post()) + m_alerts.emplace_alert(device, bind_ep, last_op + , ec, sock_type); + return ret; + } #ifdef TORRENT_WINDOWS - { - // this is best-effort. ignore errors - error_code err; - ret.sock->set_option(exclusive_address_use(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) { - session_log("failed enable exclusive address use on listen socket: %s" - , err.message().c_str()); - } + // this is best-effort. ignore errors + error_code err; + ret.sock->set_option(exclusive_address_use(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable exclusive address use on listen socket: %s" + , err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING - } + } #endif // TORRENT_WINDOWS - { - // this is best-effort. ignore errors - error_code err; - ret.sock->set_option(tcp::acceptor::reuse_address(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) { - session_log("failed enable reuse-address on listen socket: %s" - , err.message().c_str()); - } + // this is best-effort. ignore errors + error_code err; + ret.sock->set_option(tcp::acceptor::reuse_address(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable reuse-address on listen socket: %s" + , err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING - } + } #if TORRENT_USE_IPV6 - if (bind_ep.address().is_v6()) - { - error_code err; // ignore errors here - ret.sock->set_option(boost::asio::ip::v6_only(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) + if (bind_ep.address().is_v6()) { - session_log("failed enable v6 only on listen socket: %s" - , err.message().c_str()); - } + error_code err; // ignore errors here + ret.sock->set_option(boost::asio::ip::v6_only(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable v6 only on listen socket: %s" + , err.message().c_str()); + } #endif // LOGGING #ifdef TORRENT_WINDOWS - // enable Teredo on windows - ret.sock->set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); + // enable Teredo on windows + ret.sock->set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); #ifndef TORRENT_DISABLE_LOGGING - if (err) - { - session_log("failed enable IPv6 unrestricted protection level on " - "listen socket: %s", err.message().c_str()); - } + if (err) + { + session_log("failed enable IPv6 unrestricted protection level on " + "listen socket: %s", err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING #endif // TORRENT_WINDOWS - } + } #endif // TORRENT_USE_IPV6 + if (!device.empty()) + { + // we have an actual device we're interested in listening on, if we + // have SO_BINDTODEVICE functionality, use it now. +#if TORRENT_HAS_BINDTODEVICE + ret.sock->set_option(bind_to_device(device.c_str()), ec); + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("bind to device failed (device: %s): %s" + , device.c_str(), ec.message().c_str()); +#endif // TORRENT_DISABLE_LOGGING + + last_op = listen_failed_alert::bind_to_device; + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } +#endif + } + + ret.sock->bind(bind_ep, ec); + last_op = listen_failed_alert::bind; + + while (ec == error_code(error::address_in_use) && retries > 0) + { + TORRENT_ASSERT_VAL(ec, ec); +#ifndef TORRENT_DISABLE_LOGGING + error_code ignore; + session_log("failed to bind listen socket to: %s on device: %s :" + " [%s] (%d) %s (retries: %d)" + , print_endpoint(bind_ep).c_str() + , device.c_str() + , ec.category().name(), ec.value(), ec.message().c_str() + , retries); +#endif + ec.clear(); + --retries; + bind_ep.port(bind_ep.port() + 1); + ret.sock->bind(bind_ep, ec); + } + + if (ec == error_code(error::address_in_use) + && !(flags & listen_no_system_port)) + { + // instead of giving up, try let the OS pick a port + bind_ep.port(0); + ec.clear(); + ret.sock->bind(bind_ep, ec); + last_op = listen_failed_alert::bind; + } + + if (ec) + { + // not even that worked, give up + +#ifndef TORRENT_DISABLE_LOGGING + error_code ignore; + session_log("failed to bind listen socket to: %s on device: %s :" + " [%s] (%d) %s (giving up)" + , print_endpoint(bind_ep).c_str() + , device.c_str() + , ec.category().name(), ec.value(), ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + ret.sock.reset(); + return ret; + } + ret.local_endpoint = ret.sock->local_endpoint(ec); + last_op = listen_failed_alert::get_socket_name; + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("get_sockname failed on listen socket: %s" + , ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } + ret.tcp_external_port = ret.local_endpoint.port(); + TORRENT_ASSERT(ret.tcp_external_port == bind_ep.port() + || bind_ep.port() == 0); + + ret.sock->listen(m_settings.get_int(settings_pack::listen_queue_size), ec); + last_op = listen_failed_alert::listen; + + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("cannot listen on interface \"%s\": %s" + , device.c_str(), ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } + } // force-proxy mode + + ret.udp_sock = boost::make_shared(boost::ref(m_io_service)); +#if TORRENT_HAS_BINDTODEVICE if (!device.empty()) { - // we have an actual device we're interested in listening on, if we - // have SO_BINDTODEVICE functionality, use it now. -#if TORRENT_HAS_BINDTODEVICE - ret.sock->set_option(bind_to_device(device.c_str()), ec); + ret.udp_sock->set_option(bind_to_device(device.c_str()), ec); if (ec) { #ifndef TORRENT_DISABLE_LOGGING @@ -1781,98 +1898,51 @@ namespace aux { } return ret; } -#endif } +#endif + ret.udp_sock->bind(udp::endpoint(bind_ep.address(), bind_ep.port()) + , ec); - ret.sock->bind(bind_ep, ec); last_op = listen_failed_alert::bind; - - while (ec == error_code(error::address_in_use) && retries > 0) - { - TORRENT_ASSERT_VAL(ec, ec); -#ifndef TORRENT_DISABLE_LOGGING - error_code ignore; - session_log("failed to bind listen socket to: %s on device: %s :" - " [%s] (%d) %s (retries: %d)" - , print_endpoint(bind_ep).c_str() - , device.c_str() - , ec.category().name(), ec.value(), ec.message().c_str() - , retries); -#endif - ec.clear(); - --retries; - bind_ep.port(bind_ep.port() + 1); - ret.sock->bind(bind_ep, ec); - } - - if (ec == error_code(error::address_in_use) - && !(flags & listen_no_system_port)) - { - // instead of giving up, try let the OS pick a port - bind_ep.port(0); - ec.clear(); - ret.sock->bind(bind_ep, ec); - last_op = listen_failed_alert::bind; - } - - if (ec) - { - // not even that worked, give up - -#ifndef TORRENT_DISABLE_LOGGING - error_code ignore; - session_log("failed to bind listen socket to: %s on device: %s :" - " [%s] (%d) %s (giving up)" - , print_endpoint(bind_ep).c_str() - , device.c_str() - , ec.category().name(), ec.value(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } - ret.sock.reset(); - return ret; - } - ret.local_endpoint = ret.sock->local_endpoint(ec); - last_op = listen_failed_alert::get_socket_name; if (ec) { #ifndef TORRENT_DISABLE_LOGGING - session_log("get_sockname failed on listen socket: %s" - , ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } - return ret; - } - ret.tcp_external_port = ret.local_endpoint.port(); - - ret.sock->listen(m_settings.get_int(settings_pack::listen_queue_size), ec); - last_op = listen_failed_alert::listen; - - if (ec) - { -#ifndef TORRENT_DISABLE_LOGGING - session_log("cannot listen on interface \"%s\": %s" + session_log("failed to open UDP socket: %s: %s" , device.c_str(), ec.message().c_str()); #endif + + listen_failed_alert::socket_type_t const udp_sock_type + = (flags & open_ssl_socket) + ? listen_failed_alert::utp_ssl + : listen_failed_alert::udp; + if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } + m_alerts.emplace_alert(device + , bind_ep, last_op, ec, udp_sock_type); + return ret; } + ret.udp_external_port = ret.udp_sock->local_port(); + + error_code err; + set_socket_buffer_size(*ret.udp_sock, m_settings, err); + if (err) + { + if (m_alerts.should_post()) + m_alerts.emplace_alert(ret.udp_sock->local_endpoint(ec), err); + } + + ret.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + + // TODO: 2 use a handler allocator here + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); + ret.udp_sock->async_read(boost::bind(&session_impl::on_udp_packet + , this, boost::weak_ptr(ret.udp_sock), ret.ssl, _1)); #ifndef TORRENT_DISABLE_LOGGING - session_log(" listening on: %s TCP port: %d" - , print_endpoint(bind_ep).c_str() - , ret.tcp_external_port); + session_log(" listening on: %s TCP port: %d UDP port: %d" + , bind_ep.address().to_string().c_str() + , ret.tcp_external_port, ret.udp_external_port); #endif return ret; } @@ -1898,7 +1968,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - i->sock->close(ec); + if (i->sock) i->sock->close(ec); + if (i->udp_sock) i->udp_sock->close(); } m_listen_sockets.clear(); @@ -1920,10 +1991,10 @@ namespace aux { // First, check to see if it's an IP address error_code err; - address adr = address::from_string(device.c_str(), err); + address const adr = address::from_string(device.c_str(), err); if (!err) { - listen_socket_t s = setup_listener("", tcp::endpoint(adr, port) + listen_socket_t const s = setup_listener("", tcp::endpoint(adr, port) , flags | (ssl ? open_ssl_socket : 0), ec); if (!ec && s.sock) @@ -1936,6 +2007,7 @@ namespace aux { // this is the case where device names a network device. We need to // enumerate all IPs associated with this device + // TODO: 3 only run this once, not every turn through the loop std::vector ifs = enum_net_interfaces(m_io_service, ec); if (ec) { @@ -1959,7 +2031,7 @@ namespace aux { // connecting to) if (device != ifs[k].name) continue; - listen_socket_t s = setup_listener(device + listen_socket_t const s = setup_listener(device , tcp::endpoint(ifs[k].interface_address, port) , flags | (ssl ? open_ssl_socket : 0), ec); @@ -1979,167 +2051,46 @@ namespace aux { return; } - // TODO: 3 this loop should be entirely merged with the one above and the - // udp sockets should be opened in parallel with the TCP ones, being held - // by listen_socket_t. - // until the UDP sockets fully honor the listen_interfaces setting, just - // create the two sockets based on the first matching (ssl vs. non-ssl) - // TCP socket -#ifdef TORRENT_USE_OPENSSL - bool created_ssl_udp_socket = false; -#endif - bool created_udp_socket = false; - for (std::list::const_iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) + // now, send out listen_succeeded_alert for the listen sockets we are + // listening on + if (m_alerts.should_post()) { - listen_socket_t const& s = *i; - -#ifdef TORRENT_USE_OPENSSL - if (!created_ssl_udp_socket && s.ssl) + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) { - int retries = m_settings.get_int(settings_pack::max_retry_port_bind); - udp::endpoint bind_ep(s.local_endpoint.address(), s.local_endpoint.port()); - do + error_code err; + if (i->sock) { - ec.clear(); - m_ssl_udp_socket.bind(bind_ep, ec); - if (ec) + tcp::endpoint const tcp_ep = i->sock->local_endpoint(err); + if (!err) { -#ifndef TORRENT_DISABLE_LOGGING - session_log("SSL: cannot bind to UDP interface \"%s\": %s" - , print_endpoint(bind_ep).c_str(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - error_code err; - m_alerts.emplace_alert(bind_ep.address().to_string(err) - , tcp::endpoint(bind_ep.address(), bind_ep.port()) - , listen_failed_alert::bind, ec, listen_failed_alert::utp_ssl); - } - --retries; - bind_ep.port(bind_ep.port() + 1); - } - else - { - created_ssl_udp_socket = true; - maybe_update_udp_mapping(0, true, bind_ep.port(), bind_ep.port()); - maybe_update_udp_mapping(1, true, bind_ep.port(), bind_ep.port()); - } - } while (ec == error_code(error::address_in_use) && retries > 0); - } -#endif // TORRENT_USE_OPENSSL + listen_succeeded_alert::socket_type_t const socket_type + = i->ssl + ? listen_succeeded_alert::tcp_ssl + : listen_succeeded_alert::tcp; - if (!created_udp_socket && !s.ssl) - { - int retries = m_settings.get_int(settings_pack::max_retry_port_bind); - udp::endpoint bind_ep(s.local_endpoint.address(), s.local_endpoint.port()); - do + m_alerts.emplace_alert( + tcp_ep , socket_type); + } + } + + if (i->udp_sock) { - ec.clear(); - m_udp_socket.bind(bind_ep, ec); - if (ec) + udp::endpoint const udp_ep = i->udp_sock->local_endpoint(err); + if (!err && i->udp_sock->is_open()) { -#ifndef TORRENT_DISABLE_LOGGING - session_log("cannot bind to UDP interface \"%s\": %s" - , print_endpoint(bind_ep).c_str(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - error_code err; - m_alerts.emplace_alert(bind_ep.address().to_string(err) - , tcp::endpoint(bind_ep.address(), bind_ep.port()) - , listen_failed_alert::bind - , ec, listen_failed_alert::udp); - } - --retries; - bind_ep.port(bind_ep.port() + 1); + listen_succeeded_alert::socket_type_t const socket_type + = i->ssl + ? listen_succeeded_alert::utp_ssl + : listen_succeeded_alert::udp; + + m_alerts.emplace_alert( + tcp::endpoint(udp_ep.address(), udp_ep.port()), socket_type); } - else - { - created_udp_socket = true; - m_external_udp_port = m_udp_socket.local_port(); - maybe_update_udp_mapping(0, false, bind_ep.port(), bind_ep.port()); - maybe_update_udp_mapping(1, false, bind_ep.port(), bind_ep.port()); - } - } while (ec == error_code(error::address_in_use) && retries > 0); + } } } - // if we did not end up opening a udp socket, make sure we close any - // previous one -#ifdef TORRENT_USE_OPENSSL - if (!created_ssl_udp_socket) - { - m_ssl_udp_socket.close(); - - // if there are mappings for the SSL socket, delete them now - if (m_ssl_udp_mapping[0] != -1 && m_natpmp) - { - m_natpmp->delete_mapping(m_ssl_udp_mapping[0]); - m_ssl_udp_mapping[0] = -1; - } - if (m_ssl_udp_mapping[1] != -1 && m_upnp) - { - m_upnp->delete_mapping(m_ssl_udp_mapping[1]); - m_ssl_udp_mapping[1] = -1; - } - } -#endif - if (!created_udp_socket) - { - m_udp_socket.close(); - - // if there are mappings for the socket, delete them now - if (m_udp_mapping[0] != -1 && m_natpmp) - { - m_natpmp->delete_mapping(m_udp_mapping[0]); - m_udp_mapping[0] = -1; - } - if (m_udp_mapping[1] != -1 && m_upnp) - { - m_upnp->delete_mapping(m_udp_mapping[1]); - m_udp_mapping[1] = -1; - } - } - - // we made it! now post all the listen_succeeded_alerts - - for (std::list::iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) - { - listen_succeeded_alert::socket_type_t const socket_type = i->ssl - ? listen_succeeded_alert::tcp_ssl - : listen_succeeded_alert::tcp; - - if (!m_alerts.should_post()) continue; - - error_code error; - tcp::endpoint bind_ep = i->sock->local_endpoint(error); - if (error) continue; - - m_alerts.emplace_alert(bind_ep, socket_type); - } - -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code err; - if (m_alerts.should_post()) - m_alerts.emplace_alert( - m_ssl_udp_socket.local_endpoint(err) - , listen_succeeded_alert::utp_ssl); - } -#endif - - if (m_udp_socket.is_open()) - { - error_code err; - if (m_alerts.should_post()) - m_alerts.emplace_alert( - m_udp_socket.local_endpoint(err) - , listen_succeeded_alert::udp); - } - if (m_settings.get_int(settings_pack::peer_tos) != 0) { update_peer_tos(); @@ -2147,19 +2098,12 @@ namespace aux { ec.clear(); - set_socket_buffer_size(m_udp_socket, m_settings, ec); - if (ec) - { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } - // initiate accepting on the listen sockets for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - async_accept(i->sock, i->ssl); - remap_ports(3, *i); + if (i->sock) async_accept(i->sock, i->ssl); + remap_ports(remap_natpmp_and_upnp, *i); } open_new_incoming_socks_connection(); @@ -2168,20 +2112,36 @@ namespace aux { #endif } - // TODO: 3 add an enum for the mask parameter here - void session_impl::remap_ports(boost::uint32_t mask, listen_socket_t& s) - { - if ((mask & 1) && m_natpmp) + namespace { + template + void map_port(MapProtocol& m, ProtoType protocol, EndpointType const& ep + , int& map_handle) { - if (s.tcp_port_mapping[0] != -1) m_natpmp->delete_mapping(s.tcp_port_mapping[0]); - s.tcp_port_mapping[0] = m_natpmp->add_mapping(natpmp::tcp - , s.local_endpoint.port(), s.local_endpoint.port()); + if (map_handle != -1) m.delete_mapping(map_handle); + map_handle = -1; + + // only update this mapping if we actually have a socket listening + if (ep.address() != address()) + map_handle = m.add_mapping(protocol, ep.port(), ep.port()); } - if ((mask & 2) && m_upnp) + } + + void session_impl::remap_ports(remap_port_mask_t const mask + , listen_socket_t& s) + { + error_code ec; + tcp::endpoint const tcp_ep = s.sock ? s.sock->local_endpoint(ec) : tcp::endpoint(); + udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->local_endpoint(ec) : udp::endpoint(); + + if ((mask & remap_natpmp) && m_natpmp) { - if (s.tcp_port_mapping[1] != -1) m_upnp->delete_mapping(s.tcp_port_mapping[1]); - s.tcp_port_mapping[1] = m_upnp->add_mapping(upnp::tcp - , s.local_endpoint.port(), s.local_endpoint.port()); + map_port(*m_natpmp, natpmp::tcp, tcp_ep, s.tcp_port_mapping[0]); + map_port(*m_natpmp, natpmp::udp, udp_ep, s.udp_port_mapping[0]); + } + if ((mask & remap_upnp) && m_upnp) + { + map_port(*m_upnp, upnp::tcp, tcp_ep, s.tcp_port_mapping[1]); + map_port(*m_upnp, upnp::udp, udp_ep, s.udp_port_mapping[1]); } } @@ -2306,23 +2266,238 @@ namespace aux { } #endif - bool session_impl::incoming_packet(error_code const& ec - , udp::endpoint const& ep, char const*, int) + void session_impl::send_udp_packet_hostname(char const* hostname + , int const port + , array_view p + , error_code& ec + , int const flags) { - m_stats_counters.inc_stats_counter(counters::on_udp_counter); + // for now, just pick the first socket with a matching address family + // TODO: 3 for proper multi-homed support, we may want to do something + // else here. Probably let the caller decide which interface to send over + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (!i->udp_sock) continue; + if (i->ssl) continue; + i->udp_sock->send_hostname(hostname, port, p, ec, flags); + + if ((ec == error::would_block + || ec == error::try_again) + && !i->udp_write_blocked) + { + i->udp_write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + i->udp_sock->async_write(boost::bind(&session_impl::on_udp_writeable + , this, boost::weak_ptr(i->udp_sock), _1)); + } + return; + } + ec = boost::asio::error::operation_not_supported; + } + + void session_impl::send_udp_packet(bool const ssl + , udp::endpoint const& ep + , array_view p + , error_code& ec + , int const flags) + { + // for now, just pick the first socket with a matching address family + // TODO: 3 for proper multi-homed support, we may want to do something + // else here. Probably let the caller decide which interface to send over + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (i->ssl != ssl) continue; + if (!i->udp_sock) continue; + if (i->local_endpoint.address().is_v4() != ep.address().is_v4()) + continue; + + i->udp_sock->send(ep, p, ec, flags); + + if ((ec == error::would_block + || ec == error::try_again) + && !i->udp_write_blocked) + { + i->udp_write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + i->udp_sock->async_write(boost::bind(&session_impl::on_udp_writeable + , this, boost::weak_ptr(i->udp_sock), _1)); + } + return; + } + ec = boost::asio::error::operation_not_supported; + } + + void session_impl::on_udp_writeable(boost::weak_ptr s, error_code const& ec) + { + COMPLETE_ASYNC("session_impl::on_udp_writeable"); + if (ec) return; + + boost::shared_ptr sock = s.lock(); + if (!sock) return; + + std::list::iterator i = std::find_if( + m_listen_sockets.begin(), m_listen_sockets.end() + , boost::bind(&listen_socket_t::udp_sock, _1) == sock); + + if (i == m_listen_sockets.end()) return; + + i->udp_write_blocked = false; + + // notify the utp socket manager it can start sending on the socket again + struct utp_socket_manager& mgr = +#ifdef TORRENT_USE_OPENSSL + i->ssl ? m_ssl_utp_socket_manager : +#endif + m_utp_socket_manager; + + mgr.writable(); + } + + + void session_impl::on_udp_packet(boost::weak_ptr const& socket + , bool const ssl, error_code const& ec) + { + COMPLETE_ASYNC("session_impl::on_udp_packet"); if (ec) { + boost::shared_ptr s = socket.lock(); + udp::endpoint ep; + error_code best_effort; + if (s) ep = s->local_endpoint(best_effort); + // don't bubble up operation aborted errors to the user if (ec != boost::asio::error::operation_aborted + && ec != boost::asio::error::bad_descriptor && m_alerts.should_post()) + { m_alerts.emplace_alert(ep, ec); + } #ifndef TORRENT_DISABLE_LOGGING - session_log("UDP socket error: (%d) %s", ec.value(), ec.message().c_str()); + session_log("UDP error: %s (%d) %s" + , print_endpoint(ep).c_str(), ec.value(), ec.message().c_str()); #endif + return; } - return false; + + m_stats_counters.inc_stats_counter(counters::on_udp_counter); + + boost::shared_ptr s = socket.lock(); + if (!s) return; + + struct utp_socket_manager& mgr = +#ifdef TORRENT_USE_OPENSSL + ssl ? m_ssl_utp_socket_manager : +#endif + m_utp_socket_manager; + + for (;;) + { + boost::array p; + error_code err; + int const num_packets = s->read(array_view(p), err); + + for (int i = 0; i < num_packets; ++i) + { + udp_socket::packet& packet = p[i]; + + if (packet.error) + { + // TODO: 3 it would be neat if the utp socket manager would + // handle ICMP errors too + +#ifndef TORRENT_DISABLE_DHT + if (m_dht) + m_dht->incoming_error(packet.error, packet.from); +#endif + + m_tracker_manager.incoming_error(packet.error, packet.from); + continue; + } + + char* buf = packet.data.data(); + int const len = packet.data.size(); + + // give the uTP socket manager first dis on the packet. Presumably + // the majority of packets are uTP packets. + if (!mgr.incoming_packet(packet.from, buf, len)) + { + // if it wasn't a uTP packet, try the other users of the UDP + // socket + bool handled = false; +#ifndef TORRENT_DISABLE_DHT + if (m_dht && len > 20 && buf[0] == 'd' && buf[len-1] == 'e') + { + handled = m_dht->incoming_packet(packet.from, buf, len); + } +#endif + + if (!handled) + { + m_tracker_manager.incoming_packet(packet.from, buf, len); + } + } + } + + if (err == error::would_block || err == error::try_again) + { + // there are no more packets on the socket + break; + } + + if (err) + { + error_code best_effort; + udp::endpoint ep = s->local_endpoint(best_effort); + + if (err != boost::asio::error::operation_aborted + && m_alerts.should_post()) + m_alerts.emplace_alert(ep, err); + +#ifndef TORRENT_DISABLE_LOGGING + session_log("UDP error: %s (%d) %s" + , print_endpoint(ep).c_str(), ec.value(), ec.message().c_str()); +#endif + + // any error other than these ones are considered fatal errors, and + // we won't read from the socket again + if (err != boost::asio::error::host_unreachable + && err != boost::asio::error::fault + && err != boost::asio::error::connection_reset + && err != boost::asio::error::connection_refused + && err != boost::asio::error::connection_aborted + && err != boost::asio::error::operation_aborted + && err != boost::asio::error::network_reset + && err != boost::asio::error::network_unreachable +#ifdef WIN32 + // ERROR_MORE_DATA means the same thing as EMSGSIZE + && err != error_code(ERROR_MORE_DATA, system_category()) + && err != error_code(ERROR_HOST_UNREACHABLE, system_category()) + && err != error_code(ERROR_PORT_UNREACHABLE, system_category()) + && err != error_code(ERROR_RETRY, system_category()) + && err != error_code(ERROR_NETWORK_UNREACHABLE, system_category()) + && err != error_code(ERROR_CONNECTION_REFUSED, system_category()) + && err != error_code(ERROR_CONNECTION_ABORTED, system_category()) +#endif + && err != boost::asio::error::message_size) + { + // fatal errors. Don't try to read from this socket again + mgr.socket_drained(); + return; + } + // non-fatal UDP errors get here, we should re-issue the read. + continue; + } + } + + mgr.socket_drained(); + + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); + s->async_read(boost::bind(&session_impl::on_udp_packet + , this, socket, ssl, _1)); } void session_impl::async_accept(boost::shared_ptr const& listener, bool ssl) @@ -2360,7 +2535,8 @@ namespace aux { } void session_impl::on_accept_connection(shared_ptr const& s - , weak_ptr listen_socket, error_code const& e, bool ssl) + , weak_ptr listen_socket, error_code const& e + , bool const ssl) { COMPLETE_ASYNC("session_impl::on_accept_connection"); m_stats_counters.inc_stats_counter(counters::on_accept_counter); @@ -2971,11 +3147,21 @@ namespace aux { if (m_abort) { if (m_utp_socket_manager.num_sockets() == 0 +#ifdef TORRENT_USE_OPENSSL + && m_ssl_utp_socket_manager.num_sockets() == 0 +#endif && m_undead_peers.empty()) + { return; + } #if defined TORRENT_ASIO_DEBUGGING - fprintf(stderr, "uTP sockets left: %d undead-peers left: %d\n" + fprintf(stderr, "uTP sockets: %d ssl-uTP sockets: %d undead-peers left: %d\n" , m_utp_socket_manager.num_sockets() +#ifdef TORRENT_USE_OPENSSL + , m_ssl_utp_socket_manager.num_sockets() +#else + , 0 +#endif , int(m_undead_peers.size())); #endif } @@ -5170,11 +5356,11 @@ namespace aux { // in case we just set a socks proxy, we might have to // open the socks incoming connection if (!m_socks_listen_socket) open_new_incoming_socks_connection(); - m_udp_socket.set_proxy_settings(proxy()); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.set_proxy_settings(proxy()); -#endif + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + i->udp_sock->set_proxy_settings(proxy()); + } } void session_impl::update_upnp() @@ -5281,9 +5467,6 @@ namespace aux { { if (i->ssl) return i->tcp_external_port; } - - if (m_ssl_udp_socket.is_open()) - return m_ssl_udp_socket.local_port(); #endif return 0; } @@ -5338,6 +5521,11 @@ namespace aux { { return ls.tcp_port_mapping[transport] == mapping; } + + bool find_udp_port_mapping(int transport, int mapping, listen_socket_t const& ls) + { + return ls.udp_port_mapping[transport] == mapping; + } } // transport is 0 for NAT-PMP and 1 for UPnP @@ -5354,16 +5542,6 @@ namespace aux { , map_transport, ec); } - if (mapping == m_udp_mapping[map_transport] && port != 0) - { - m_external_udp_port = port; - if (m_alerts.should_post()) - m_alerts.emplace_alert(mapping, port - , map_transport, protocol == natpmp::udp - ? portmap_alert::udp : portmap_alert::tcp); - return; - } - // look through our listen sockets to see if this mapping is for one of // them (it could also be a user mapping) @@ -5371,6 +5549,14 @@ namespace aux { = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end() , boost::bind(find_tcp_port_mapping, map_transport, mapping, _1)); + bool tcp = true; + if (ls == m_listen_sockets.end()) + { + ls = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end() + , boost::bind(find_udp_port_mapping, map_transport, mapping, _1)); + tcp = false; + } + if (ls != m_listen_sockets.end()) { if (ip != address()) @@ -5381,7 +5567,8 @@ namespace aux { } ls->external_address = ip; - ls->tcp_external_port = port; + if (tcp) ls->tcp_external_port = port; + else ls->udp_external_port = port; } if (!ec && m_alerts.should_post()) @@ -5542,8 +5729,11 @@ namespace aux { // postpone starting the DHT if we're still resolving the DHT router if (m_outstanding_router_lookups > 0) return; - m_dht = boost::make_shared(static_cast(this) - , boost::ref(m_udp_socket), boost::cref(m_dht_settings) + m_dht = boost::make_shared( + static_cast(this) + , boost::ref(m_io_service) + , boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::cref(m_dht_settings) , boost::ref(m_stats_counters) , m_dht_storage_constructor , startup_state); @@ -5562,14 +5752,11 @@ namespace aux { m_dht_nodes.clear(); m_dht->start(startup_state, boost::bind(&on_bootstrap, boost::ref(m_alerts))); - - m_udp_socket.subscribe(m_dht.get()); } void session_impl::stop_dht() { if (!m_dht) return; - m_udp_socket.unsubscribe(m_dht.get()); m_dht->stop(); m_dht.reset(); } @@ -5784,50 +5971,6 @@ namespace aux { #endif - void session_impl::maybe_update_udp_mapping(int const nat, bool const ssl - , int const local_port, int const external_port) - { - int local, external, protocol; -#ifdef TORRENT_USE_OPENSSL - int* mapping = ssl ? m_ssl_udp_mapping : m_udp_mapping; -#else - TORRENT_UNUSED(ssl); - int* mapping = m_udp_mapping; -#endif - if (nat == 0 && m_natpmp) - { - if (mapping[nat] != -1) - { - if (m_natpmp->get_mapping(mapping[nat], local, external, protocol)) - { - // we already have a mapping. If it's the same, don't do anything - if (local == local_port && external == external_port && protocol == natpmp::udp) - return; - } - m_natpmp->delete_mapping(mapping[nat]); - } - mapping[nat] = m_natpmp->add_mapping(natpmp::udp - , local_port, external_port); - return; - } - else if (nat == 1 && m_upnp) - { - if (mapping[nat] != -1) - { - if (m_upnp->get_mapping(mapping[nat], local, external, protocol)) - { - // we already have a mapping. If it's the same, don't do anything - if (local == local_port && external == external_port && protocol == natpmp::udp) - return; - } - m_upnp->delete_mapping(mapping[nat]); - } - mapping[nat] = m_upnp->add_mapping(upnp::udp - , local_port, external_port); - return; - } - } - #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) void session_impl::add_obfuscated_hash(sha1_hash const& obfuscated , boost::weak_ptr const& t) @@ -5846,15 +5989,6 @@ namespace aux { // this is not allowed to be the network thread! // TORRENT_ASSERT(is_not_thread()); - m_udp_socket.unsubscribe(this); - m_udp_socket.unsubscribe(&m_utp_socket_manager); - m_udp_socket.unsubscribe(&m_tracker_manager); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.unsubscribe(this); - m_ssl_udp_socket.unsubscribe(&m_ssl_utp_socket_manager); -#endif - TORRENT_ASSERT(m_torrents.empty()); TORRENT_ASSERT(m_connections.empty()); @@ -5976,33 +6110,47 @@ namespace aux { } #endif + + namespace { + template + void set_tos(Socket& s, int v, error_code& ec) + { +#if TORRENT_USE_IPV6 + if (s.local_endpoint(ec).address().is_v6()) + s.set_option(traffic_class(v), ec); + else if (!ec) +#endif + s.set_option(type_of_service(v), ec); + } + } + // TODO: 2 this should be factored into the udp socket, so we only have the // code once void session_impl::update_peer_tos() { - error_code ec; - -#if TORRENT_USE_IPV6 && defined IPV6_TCLASS - if (m_udp_socket.local_endpoint(ec).address().is_v6()) - m_udp_socket.set_option(traffic_class(m_settings.get_int(settings_pack::peer_tos)), ec); - else -#endif - m_udp_socket.set_option(type_of_service(m_settings.get_int(settings_pack::peer_tos)), ec); - -#ifdef TORRENT_USE_OPENSSL -#if TORRENT_USE_IPV6 && defined IPV6_TCLASS - if (m_ssl_udp_socket.local_endpoint(ec).address().is_v6()) - m_ssl_udp_socket.set_option(traffic_class(m_settings.get_int(settings_pack::peer_tos)), ec); - else -#endif - m_ssl_udp_socket.set_option(type_of_service(m_settings.get_int(settings_pack::peer_tos)), ec); -#endif + int const tos = m_settings.get_int(settings_pack::peer_tos); + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + error_code ec; + set_tos(*i->sock, tos, ec); #ifndef TORRENT_DISABLE_LOGGING - session_log(">>> SET_TOS [ udp_socket tos: %x e: %s ]" - , m_settings.get_int(settings_pack::peer_tos) - , ec.message().c_str()); + error_code err; + session_log(">>> SET_TOS [ tcp (%s %d) tos: %x e: %s ]" + , i->sock->local_endpoint(err).address().to_string().c_str() + , i->sock->local_endpoint(err).port(), tos, ec.message().c_str()); #endif + ec.clear(); + set_tos(*i->udp_sock, tos, ec); + +#ifndef TORRENT_DISABLE_LOGGING + session_log(">>> SET_TOS [ udp (%s %d) tos: %x e: %s ]" + , i->udp_sock->local_endpoint(err).address().to_string().c_str() + , i->udp_sock->local_port() + , tos, ec.message().c_str()); +#endif + } } void session_impl::update_user_agent() @@ -6182,22 +6330,32 @@ namespace aux { void session_impl::update_socket_buffer_size() { - error_code ec; - set_socket_buffer_size(m_udp_socket, m_settings, ec); - if (ec) + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } - -#ifdef TORRENT_USE_OPENSSL - set_socket_buffer_size(m_ssl_udp_socket, m_settings, ec); - if (ec) - { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } + error_code ec; + set_socket_buffer_size(*i->udp_sock, m_settings, ec); +#ifndef TORRENT_DISABLE_LOGGING + if (ec) + { + error_code err; + session_log("socket buffer size [ udp %s %d]: (%d) %s" + , i->udp_sock->local_endpoint(err).address().to_string(err).c_str() + , i->udp_sock->local_port(), ec.value(), ec.message().c_str()); + } #endif + ec.clear(); + set_socket_buffer_size(*i->sock, m_settings, ec); +#ifndef TORRENT_DISABLE_LOGGING + if (ec) + { + error_code err; + session_log("socket buffer size [ udp %s %d]: (%d) %s" + , i->sock->local_endpoint(err).address().to_string(err).c_str() + , i->sock->local_endpoint(err).port(), ec.value(), ec.message().c_str()); + } +#endif + } } void session_impl::update_dht_announce_interval() @@ -6241,10 +6399,19 @@ namespace aux { void session_impl::update_force_proxy() { - m_udp_socket.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); -#endif + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + i->udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + + // close the TCP listen sockets + if (i->sock) + { + error_code ec; + i->sock->close(ec); + i->sock.reset(); + } + } if (!m_settings.get_bool(settings_pack::force_proxy)) return; @@ -6256,12 +6423,6 @@ namespace aux { #ifndef TORRENT_DISABLE_DHT stop_dht(); #endif - // close the listen sockets - error_code ec; - for (std::list::iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) - i->sock->close(ec); - m_listen_sockets.clear(); } #ifndef TORRENT_NO_DEPRECATE @@ -6522,33 +6683,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - remap_ports(1, *i); + remap_ports(remap_natpmp, *i); } - - // TODO: 3 once UDP sockets are part of m_listen_sockets, this is not - // necesarry! - if (m_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_udp_mapping[0] != -1) m_natpmp->delete_mapping(m_udp_mapping[0]); - m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp - , ep.port(), ep.port()); - } - } -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_ssl_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_ssl_udp_mapping[0] != -1) m_natpmp->delete_mapping(m_ssl_udp_mapping[0]); - m_ssl_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp - , ep.port(), ep.port()); - } - } -#endif return m_natpmp.get(); } @@ -6573,33 +6709,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - remap_ports(1, *i); + remap_ports(remap_upnp, *i); } - - // TODO: 3 once the UDP sockets are part of m_listen_sockets this won't be - // necessary! - if (m_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_udp_mapping[1] != -1) m_upnp->delete_mapping(m_udp_mapping[1]); - m_udp_mapping[1] = m_upnp->add_mapping(upnp::udp - , ep.port(), ep.port()); - } - } -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_ssl_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_ssl_udp_mapping[1] != -1) m_upnp->delete_mapping(m_ssl_udp_mapping[1]); - m_ssl_udp_mapping[1] = m_upnp->add_mapping(upnp::udp - , ep.port(), ep.port()); - } - } -#endif return m_upnp.get(); } @@ -6636,12 +6747,9 @@ namespace aux { , end(m_listen_sockets.end()); i != end; ++i) { i->tcp_port_mapping[0] = -1; + i->udp_port_mapping[0] = -1; } - m_udp_mapping[0] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[0] = -1; -#endif m_natpmp.reset(); } @@ -6654,11 +6762,8 @@ namespace aux { , end(m_listen_sockets.end()); i != end; ++i) { i->tcp_port_mapping[1] = -1; + i->udp_port_mapping[1] = -1; } - m_udp_mapping[1] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[1] = -1; -#endif m_upnp.reset(); } diff --git a/src/torrent.cpp b/src/torrent.cpp index fa7ec17a8..6d73233dd 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -3410,7 +3410,7 @@ namespace libtorrent // the tracker did resolve to a different type of address, so announce // to that as well - // TODO 2: there's a bug when removing a torrent or shutting down the session, + // TODO 3: there's a bug when removing a torrent or shutting down the session, // where the second announce is skipped (in this case, the one to the IPv6 // name). This should be fixed by generalizing the tracker list structure to // separate the IPv6 and IPv4 addresses as conceptually separate trackers, diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 61d4599a1..bc114659e 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -60,6 +60,8 @@ namespace namespace libtorrent { + using namespace libtorrent::aux; + timeout_handler::timeout_handler(io_service& ios) : m_completion_timeout(0) , m_start_time(clock_type::now()) @@ -191,9 +193,8 @@ namespace libtorrent m_man.remove_request(this); } - // TODO: 2 some of these arguments could probably be moved to the - // tracker request itself. like the ip_filter and settings - tracker_manager::tracker_manager(class udp_socket& sock + tracker_manager::tracker_manager(send_fun_t const& send_fun + , send_fun_hostname_t const& send_fun_hostname , counters& stats_counters , resolver_interface& resolver , aux::session_settings const& sett @@ -201,7 +202,8 @@ namespace libtorrent , aux::session_logger& ses #endif ) - : m_udp_socket(sock) + : m_send_fun(send_fun) + , m_send_fun_hostname(send_fun_hostname) , m_host_resolver(resolver) , m_settings(sett) , m_stats_counters(stats_counters) @@ -231,7 +233,7 @@ namespace libtorrent void tracker_manager::remove_request(tracker_connection const* c) { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); http_conns_t::iterator i = std::find_if(m_http_conns.begin() , m_http_conns.end() @@ -266,7 +268,7 @@ namespace libtorrent , tracker_request req , boost::weak_ptr c) { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(req.num_want >= 0); TORRENT_ASSERT(!m_abort || req.event == tracker_request::stopped); if (m_abort && req.event != tracker_request::stopped) return; @@ -309,8 +311,8 @@ namespace libtorrent , "", 0)); } - bool tracker_manager::incoming_packet(error_code const& e - , udp::endpoint const& ep, char const* buf, int size) + bool tracker_manager::incoming_packet(udp::endpoint const& ep + , char const* buf, int size) { // ignore packets smaller than 8 bytes if (size < 8) @@ -343,11 +345,19 @@ namespace libtorrent boost::shared_ptr p = i->second; // on_receive() may remove the tracker connection from the list - return p->on_receive(e, ep, buf, size); + return p->on_receive(ep, buf, size); } - bool tracker_manager::incoming_packet(error_code const& e - , char const* hostname, char const* buf, int size) + void tracker_manager::incoming_error(error_code const& ec + , udp::endpoint const& ep) + { + TORRENT_UNUSED(ec); + TORRENT_UNUSED(ep); + // TODO: 2 implement + } + + bool tracker_manager::incoming_packet(char const* hostname + , char const* buf, int size) { // ignore packets smaller than 8 bytes if (size < 16) return false; @@ -374,13 +384,26 @@ namespace libtorrent boost::shared_ptr p = i->second; // on_receive() may remove the tracker connection from the list - return p->on_receive_hostname(e, hostname, buf, size); + return p->on_receive_hostname(hostname, buf, size); + } + + void tracker_manager::send_hostname(char const* hostname, int const port + , array_view p, error_code& ec, int const flags) + { + m_send_fun_hostname(hostname, port, p, ec, flags); + } + + void tracker_manager::send(udp::endpoint const& ep + , array_view p + , error_code& ec, int const flags) + { + m_send_fun(ep, p, ec, flags); } void tracker_manager::abort_all_requests(bool all) { // removes all connections except 'event=stopped'-requests - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); m_abort = true; http_conns_t close_http_connections; @@ -434,13 +457,13 @@ namespace libtorrent bool tracker_manager::empty() const { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); return m_http_conns.empty() && m_udp_conns.empty(); } int tracker_manager::num_requests() const { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); return m_http_conns.size() + m_udp_conns.size(); } } diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 75cbd6b89..373538eef 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -44,6 +44,9 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/debug.hpp" #include + +#include "libtorrent/aux_/disable_warnings_push.hpp" + #include #include #include @@ -51,84 +54,193 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -using namespace libtorrent; +#include "libtorrent/aux_/disable_warnings_pop.hpp" + +#if defined TORRENT_ASIO_DEBUGGING +#include "libtorrent/debug.hpp" +#endif + +namespace libtorrent { + +using namespace libtorrent::aux; + +// this class hold the state of the SOCKS5 connection to maintain the UDP +// ASSOCIATE tunnel. It's instantiated on the heap for two reasons: +// +// 1. since its asynchronous functions may refer to it after the udp_socket has +// been destructed, it needs to be held by a shared_ptr +// 2. since using a sokcs proxy is assumed to be a less common case, it makes +// the common case cheaper by not allocating this space unconditionally +struct socks5 : boost::enable_shared_from_this +{ + socks5(io_service& ios) + : m_socks5_sock(ios) + , m_resolver(ios) + , m_timer(ios) + , m_abort(false) + , m_active(false) + {} + + void start(aux::proxy_settings const& ps); + void close(); + + bool active() const { return m_active; } + udp::endpoint target() const { return m_udp_proxy_addr; } + +private: + + boost::shared_ptr self() { return shared_from_this(); } + + void on_name_lookup(error_code const& e, tcp::resolver::iterator i); + void on_connect_timeout(error_code const& ec); + void on_connected(error_code const& ec); + void handshake1(error_code const& e); + void handshake2(error_code const& e); + void handshake3(error_code const& e); + void handshake4(error_code const& e); + void socks_forward_udp(); + void connect1(error_code const& e); + void connect2(error_code const& e); + void hung_up(error_code const& e); + + tcp::socket m_socks5_sock; + tcp::resolver m_resolver; + deadline_timer m_timer; + char m_tmp_buf[270]; + + aux::proxy_settings m_proxy_settings; + + // this is the endpoint the proxy server lives at. + // when performing a UDP associate, we get another + // endpoint (presumably on the same IP) where we're + // supposed to send UDP packets. + udp::endpoint m_proxy_addr; + + // this is where UDP packets that are to be forwarded + // are sent. The result from UDP ASSOCIATE is stored + // in here. + udp::endpoint m_udp_proxy_addr; + + // set to true when we've been asked to shut down + bool m_abort; + + // set to true once the tunnel is established + bool m_active; +}; + +#ifdef TORRENT_HAS_DONT_FRAGMENT +struct set_dont_frag +{ + set_dont_frag(udp::socket& sock, bool const df) + : m_socket(sock) + , m_df(df) + { + if (!m_df) return; + error_code ignore_errors; + m_socket.set_option(libtorrent::dont_fragment(true), ignore_errors); + TORRENT_ASSERT_VAL(!ignore_errors, ignore_errors.message()); + } + + ~set_dont_frag() + { + if (!m_df) return; + error_code ignore_errors; + m_socket.set_option(libtorrent::dont_fragment(false), ignore_errors); + TORRENT_ASSERT_VAL(!ignore_errors, ignore_errors.message()); + } + +private: + udp::socket& m_socket; + bool const m_df; +}; +#else +struct set_dont_frag +{ set_dont_frag(udp::socket&, int) {} }; +#endif udp_socket::udp_socket(io_service& ios) - : m_observers_locked(false) - , m_ipv4_sock(ios) - , m_timer(ios) - , m_buf_size(0) - , m_new_buf_size(0) - , m_buf(0) -#if TORRENT_USE_IPV6 - , m_ipv6_sock(ios) -#endif + : m_socket(ios) + , m_buf_size(1500) + , m_buf(NULL) , m_bind_port(0) - , m_v4_outstanding(0) - , m_restart_v4(0) -#if TORRENT_USE_IPV6 - , m_v6_outstanding(0) - , m_restart_v6(false) -#endif - , m_socks5_sock(ios) - , m_resolver(ios) - , m_queue_packets(false) - , m_tunnel_packets(false) , m_force_proxy(false) , m_abort(true) - , m_outstanding_ops(0) -#if TORRENT_USE_IPV6 - , m_v6_write_subscribed(false) -#endif - , m_v4_write_subscribed(false) { -#if TORRENT_USE_ASSERTS - m_magic = 0x1337; - m_started = false; - m_outstanding_when_aborted = -1; - m_outstanding_connect = 0; - m_outstanding_timeout = 0; - m_outstanding_resolve = 0; - m_outstanding_socks = 0; -#endif - - m_buf_size = 2048; - m_new_buf_size = m_buf_size; m_buf = static_cast(malloc(m_buf_size)); } udp_socket::~udp_socket() { free(m_buf); -#if TORRENT_USE_IPV6 - TORRENT_ASSERT_VAL(m_v6_outstanding == 0, m_v6_outstanding); -#endif - TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding); - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(m_observers_locked == false); -#if TORRENT_USE_ASSERTS - m_magic = 0; -#endif - TORRENT_ASSERT(m_outstanding_ops == 0); } -#if TORRENT_USE_ASSERTS - #define CHECK_MAGIC check_magic_ cm_(m_magic) - struct check_magic_ - { - check_magic_(int& m_): m(m_) { TORRENT_ASSERT(m == 0x1337); } - ~check_magic_() { TORRENT_ASSERT(m == 0x1337); } - int& m; - }; -#else - #define CHECK_MAGIC do {} TORRENT_WHILE_0 -#endif - -void udp_socket::send_hostname(char const* hostname, int port - , char const* p, int len, error_code& ec, int flags) +int udp_socket::read(array_view pkts, error_code& ec) { - CHECK_MAGIC; + int const num = pkts.size(); + int ret = 0; + packet p; + while (ret < num) + { + int const len = m_socket.receive_from(boost::asio::buffer(m_buf, m_buf_size) + , p.from, 0, ec); + + if (ec == error::would_block + || ec == error::try_again + || ec == error::operation_aborted + || ec == error::bad_descriptor) + { + return ret; + } + + if (ec == error::interrupted) + { + continue; + } + + if (ec) + { + // SOCKS5 cannot wrap ICMP errors. And even if it could, they certainly + // would not arrive as unwrapped (regular) ICMP errors. If we're using + // a proxy we must ignore these + if (m_force_proxy + || (m_socks5_connection + && m_socks5_connection->active())) continue; + + p.error = ec; + p.data = array_view(); + } + else + { + p.data = array_view(m_buf, len); + + // support packets coming from the SOCKS5 proxy + if (m_socks5_connection && m_socks5_connection->active()) + { + // if the source IP doesn't match the proxy's, ignore the packet + if (p.from != m_socks5_connection->target()) continue; + if (!unwrap(p.from, p.data)) continue; + } + // block incoming packets that aren't coming via the proxy + // if force proxy mode is enabled + else if (m_force_proxy) continue; + } + + pkts[ret] = p; + ++ret; + + // we only have a single buffer for now, so we can only return a + // single packet. In the future though, we could attempt to drain + // the socket here, or maybe even use recvmmsg() + break; + } + + return ret; +} + +void udp_socket::send_hostname(char const* hostname, int const port + , array_view p, error_code& ec, int const flags) +{ TORRENT_ASSERT(is_single_thread()); // if the sockets are closed, the udp_socket is closing too @@ -138,39 +250,28 @@ void udp_socket::send_hostname(char const* hostname, int port return; } - if (m_tunnel_packets) + if (m_socks5_connection && m_socks5_connection->active()) { // send udp packets through SOCKS5 server - wrap(hostname, port, p, len, ec); + wrap(hostname, port, p, ec, flags); return; } - // this function is only supported when we're using a proxy - if (!m_queue_packets && !m_force_proxy) + if (m_force_proxy) { - address target = address::from_string(hostname, ec); - if (!ec) send(udp::endpoint(target, port), p, len, ec, 0); + ec = error_code(boost::system::errc::permission_denied, generic_category()); return; } - if (m_queue.size() > 1000 || (flags & dont_queue)) return; - - m_queue.push_back(queued_packet()); - queued_packet& qp = m_queue.back(); - qp.ep.port(port); - + // the overload that takes a hostname is really only supported when we're + // using a proxy address target = address::from_string(hostname, ec); - if (ec) qp.ep.address(target); - else qp.hostname = allocate_string_copy(hostname); - qp.buf.insert(qp.buf.begin(), p, p + len); - qp.flags = 0; + if (!ec) send(udp::endpoint(target, port), p, ec, flags); } -void udp_socket::send(udp::endpoint const& ep, char const* p, int len - , error_code& ec, int flags) +void udp_socket::send(udp::endpoint const& ep, array_view p + , error_code& ec, int const flags) { - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); // if the sockets are closed, the udp_socket is closing too @@ -186,401 +287,26 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len || (flags & (tracker_connection | peer_connection)) == 0 ; - if (allow_proxy) + if (allow_proxy && m_socks5_connection && m_socks5_connection->active()) { - if (m_tunnel_packets) - { - // send udp packets through SOCKS5 server - wrap(ep, p, len, ec); - return; - } - - if (m_queue_packets) - { - if (m_queue.size() > 1000 || (flags & dont_queue)) return; - - m_queue.push_back(queued_packet()); - queued_packet& qp = m_queue.back(); - qp.ep = ep; - qp.hostname = 0; - qp.flags = flags; - qp.buf.insert(qp.buf.begin(), p, p + len); - return; - } + // send udp packets through SOCKS5 server + wrap(ep, p, ec, flags); + return; } if (m_force_proxy) return; -#if TORRENT_USE_IPV6 - if (ep.address().is_v6() && m_ipv6_sock.is_open()) - m_ipv6_sock.send_to(boost::asio::buffer(p, len), ep, 0, ec); - else -#endif - m_ipv4_sock.send_to(boost::asio::buffer(p, len), ep, 0, ec); + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && ep.protocol() == udp::v4()); - if (ec == error::would_block || ec == error::try_again) - { -#if TORRENT_USE_IPV6 - if (ep.address().is_v6() && m_ipv6_sock.is_open()) - { - if (!m_v6_write_subscribed) - { - m_ipv6_sock.async_send(null_buffers() - , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv6_sock)); - m_v6_write_subscribed = true; - } - } - else -#endif - { - if (!m_v4_write_subscribed) - { - m_ipv4_sock.async_send(null_buffers() - , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv4_sock)); - m_v4_write_subscribed = true; - } - } - } + m_socket.send_to(boost::asio::buffer(p.data(), p.size()), ep, 0, ec); } -void udp_socket::on_writable(error_code const& ec, udp::socket* s) +void udp_socket::wrap(udp::endpoint const& ep, array_view p + , error_code& ec, int const flags) { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - m_v6_write_subscribed = false; - else -#else - TORRENT_UNUSED(s); -#endif - m_v4_write_subscribed = false; - - if (ec == boost::asio::error::operation_aborted) return; - - call_writable_handler(); -} - -// called whenever the socket is readable -void udp_socket::on_read(error_code const& ec, udp::socket* s) -{ - COMPLETE_ASYNC("udp_socket::on_read"); - - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(is_single_thread()); - -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - TORRENT_ASSERT(m_v6_outstanding > 0); - --m_v6_outstanding; - } - else -#else - TORRENT_UNUSED(s); -#endif - { - TORRENT_ASSERT(m_v4_outstanding > 0); - --m_v4_outstanding; - } - - if (ec == boost::asio::error::operation_aborted) - { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - if (m_restart_v6) { - --m_restart_v6; - setup_read(s); - } - } - else -#endif - { - if (m_restart_v4) { - --m_restart_v4; - setup_read(s); - } - } - return; - } - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; - - for (;;) - { - error_code err; - udp::endpoint ep; - size_t bytes_transferred = s->receive_from(boost::asio::buffer(m_buf, m_buf_size), ep, 0, err); - - // TODO: it would be nice to detect this on posix systems also -#ifdef TORRENT_WINDOWS - if ((err == error_code(ERROR_MORE_DATA, system_category()) - || err == error_code(WSAEMSGSIZE, system_category())) - && m_buf_size < 65536) - { - // if this function fails to allocate memory, m_buf_size - // is set to 0. In that case, don't issue the async_read(). - set_buf_size(m_buf_size * 2); - if (m_buf_size == 0) return; - continue; - } -#endif - - if (err == boost::asio::error::would_block || err == boost::asio::error::try_again) break; - on_read_impl(ep, err, bytes_transferred); - - // found on iOS, socket will be disconnected when app goes backgroud. try to reopen it. - if (err == boost::asio::error::not_connected || err == boost::asio::error::bad_descriptor) - { - ep = s->local_endpoint(err); - if (!err) { - bind(ep, err); - } - return; - } - } - call_drained_handler(); - setup_read(s); -} - -void udp_socket::call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size) -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - bool ret = false; - TORRENT_TRY { - ret = (*i)->incoming_packet(ec, ep, buf, size); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - if (ret) break; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_handler(error_code const& ec, const char* host, char const* buf, int size) -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - bool ret = false; - TORRENT_TRY { - ret = (*i)->incoming_packet(ec, host, buf, size); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - if (ret) break; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_drained_handler() -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - TORRENT_TRY { - (*i)->socket_drained(); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_writable_handler() -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - TORRENT_TRY { - (*i)->writable(); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::subscribe(udp_socket_observer* o) -{ - TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end()); - if (m_observers_locked) - m_added_observers.push_back(o); - else - m_observers.push_back(o); -} - -void udp_socket::unsubscribe(udp_socket_observer* o) -{ - std::vector::iterator i = std::find(m_observers.begin(), m_observers.end(), o); - if (i == m_observers.end()) return; - if (m_observers_locked) - *i = NULL; - else - m_observers.erase(i); -} - -void udp_socket::on_read_impl(udp::endpoint const& ep - , error_code const& e, std::size_t bytes_transferred) -{ - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(is_single_thread()); - - if (e) - { - call_handler(e, ep, 0, 0); - - // don't stop listening on recoverable errors - if (e != boost::asio::error::host_unreachable - && e != boost::asio::error::fault - && e != boost::asio::error::connection_reset - && e != boost::asio::error::connection_refused - && e != boost::asio::error::connection_aborted - && e != boost::asio::error::operation_aborted - && e != boost::asio::error::network_reset - && e != boost::asio::error::network_unreachable -#ifdef WIN32 - // ERROR_MORE_DATA means the same thing as EMSGSIZE - && e != error_code(ERROR_MORE_DATA, system_category()) - && e != error_code(ERROR_HOST_UNREACHABLE, system_category()) - && e != error_code(ERROR_PORT_UNREACHABLE, system_category()) - && e != error_code(ERROR_RETRY, system_category()) - && e != error_code(ERROR_NETWORK_UNREACHABLE, system_category()) - && e != error_code(ERROR_CONNECTION_REFUSED, system_category()) - && e != error_code(ERROR_CONNECTION_ABORTED, system_category()) -#endif - && e != boost::asio::error::message_size) - { - return; - } - - if (m_abort) - { - close_impl(); - return; - } - - return; - } - - TORRENT_TRY { - - if (m_tunnel_packets) - { - // if the source IP doesn't match the proxy's, ignore the packet - if (ep == m_udp_proxy_addr) - unwrap(e, m_buf, bytes_transferred); - } - else if (!m_force_proxy) // block incoming packets that aren't coming via the proxy - { - call_handler(e, ep, m_buf, bytes_transferred); - } - - } TORRENT_CATCH (std::exception&) {} -} - -void udp_socket::setup_read(udp::socket* s) -{ - if (m_abort) - { - close_impl(); - return; - } - -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - if (m_v6_outstanding) - { - ++m_restart_v6; - m_ipv6_sock.cancel(); - return; - } - ++m_v6_outstanding; - } - else -#endif - { - if (m_v4_outstanding) - { - ++m_restart_v4; - m_ipv4_sock.cancel(); - return; - } - ++m_v4_outstanding; - } - - ADD_OUTSTANDING_ASYNC("udp_socket::on_read"); - - udp::endpoint ep; - TORRENT_TRY - { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - s->async_receive_from(null_buffers() - , ep, make_read_handler6(boost::bind(&udp_socket::on_read, this, _1, s))); - } - else -#endif - { - s->async_receive_from(null_buffers() - , ep, make_read_handler4(boost::bind(&udp_socket::on_read, this, _1, s))); - } - } - TORRENT_CATCH(boost::system::system_error& e) - { -#ifdef BOOST_NO_EXCEPTIONS - // dummy - error_code ec; - boost::system::system_error e(ec); -#endif - get_io_service().post(boost::bind(&udp_socket::on_read - , this, e.code(), s)); - } -} - -void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec) -{ - CHECK_MAGIC; + TORRENT_UNUSED(flags); using namespace libtorrent::detail; char header[25]; @@ -593,21 +319,19 @@ void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_cod boost::array iovec; iovec[0] = boost::asio::const_buffer(header, h - header); - iovec[1] = boost::asio::const_buffer(p, len); + iovec[1] = boost::asio::const_buffer(p.data(), p.size()); -#if TORRENT_USE_IPV6 - if (m_udp_proxy_addr.address().is_v4() && m_ipv4_sock.is_open()) -#endif - m_ipv4_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); -#if TORRENT_USE_IPV6 - else - m_ipv6_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); -#endif + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && ep.protocol() == udp::v4()); + + m_socket.send_to(iovec, m_socks5_connection->target(), 0, ec); } -void udp_socket::wrap(char const* hostname, int port, char const* p, int len, error_code& ec) +void udp_socket::wrap(char const* hostname, int const port, array_view p + , error_code& ec, int const flags) { - CHECK_MAGIC; + TORRENT_UNUSED(flags); using namespace libtorrent::detail; char header[270]; @@ -624,57 +348,61 @@ void udp_socket::wrap(char const* hostname, int port, char const* p, int len, er boost::array iovec; iovec[0] = boost::asio::const_buffer(header, h - header); - iovec[1] = boost::asio::const_buffer(p, len); + iovec[1] = boost::asio::const_buffer(p.data(), p.size()); -#if TORRENT_USE_IPV6 - if (m_udp_proxy_addr.address().is_v6() && m_ipv6_sock.is_open()) - m_ipv6_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); - else -#endif - m_ipv4_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && m_socket.local_endpoint(ec).protocol() == udp::v4()); + + m_socket.send_to(iovec, m_socks5_connection->target(), 0, ec); } // unwrap the UDP packet from the SOCKS5 header -void udp_socket::unwrap(error_code const& e, char const* buf, int size) +// buf is an in-out parameter. It will be updated +// return false if the packet should be ignored. It's not a valid Socks5 UDP +// forwarded packet +bool udp_socket::unwrap(udp::endpoint& from, array_view& buf) { - CHECK_MAGIC; using namespace libtorrent::detail; // the minimum socks5 header size - if (size <= 10) return; + int const size = buf.size(); + if (size <= 10) return false; - char const* p = buf; + char* p = buf.data(); p += 2; // reserved - int frag = read_uint8(p); + int const frag = read_uint8(p); // fragmentation is not supported - if (frag != 0) return; + if (frag != 0) return false; - udp::endpoint sender; - - int atyp = read_uint8(p); + int const atyp = read_uint8(p); if (atyp == 1) { // IPv4 - sender = read_v4_endpoint(p); + from = read_v4_endpoint(p); } #if TORRENT_USE_IPV6 else if (atyp == 4) { // IPv6 - sender = read_v6_endpoint(p); + from = read_v6_endpoint(p); } #endif else { - int len = read_uint8(p); - if (len > (buf + size) - p) return; + int const len = read_uint8(p); + if (len > buf.end() - p) return false; std::string hostname(p, p + len); + error_code ec; + address addr = address::from_string(hostname, ec); + // we only support "hostnames" that are a dotted decimal IP + if (ec) return false; p += len; - call_handler(e, hostname.c_str(), p, size - (p - buf)); - return; + from = udp::endpoint(addr, read_uint16(p)); } - call_handler(e, sender, p, size - (p - buf)); + buf = array_view(p, size - (p - buf.data())); + return true; } #if !defined BOOST_ASIO_ENABLE_CANCELIO && defined TORRENT_WINDOWS @@ -684,355 +412,154 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size) void udp_socket::close() { TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(m_magic == 0x1337); error_code ec; - m_ipv4_sock.close(ec); + m_socket.close(ec); TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); -#if TORRENT_USE_IPV6 - m_ipv6_sock.close(ec); - TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); -#endif - m_socks5_sock.close(ec); - TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); - m_resolver.cancel(); - m_timer.cancel(); + if (m_socks5_connection) + { + m_socks5_connection->close(); + m_socks5_connection.reset(); + } m_abort = true; - -#if TORRENT_USE_ASSERTS - m_outstanding_when_aborted = num_outstanding(); -#endif -} - -void udp_socket::set_buf_size(int s) -{ - TORRENT_ASSERT(is_single_thread()); - - if (m_observers_locked) - { - // we can't actually reallocate the buffer while - // it's being used by the observers, we have to - // do that once we're done iterating over them - m_new_buf_size = s; - return; - } - - if (s == m_buf_size) return; - - bool no_mem = false; - char* tmp = static_cast(realloc(m_buf, s)); - if (tmp != 0) - { - m_buf = tmp; - m_buf_size = s; - m_new_buf_size = s; - } - else - { - no_mem = true; - } - - if (no_mem) - { - free(m_buf); - m_buf = 0; - m_buf_size = 0; - m_new_buf_size = 0; - udp::endpoint ep; - call_handler(error::no_memory, ep, 0, 0); - close(); - } - - int size = m_buf_size; - - // don't shrink the size of the receive buffer - error_code ec; - boost::asio::socket_base::receive_buffer_size recv_size; - m_ipv4_sock.get_option(recv_size, ec); - if (!ec) size = (std::max)(recv_size.value(), size); -#if TORRENT_USE_IPV6 - m_ipv6_sock.get_option(recv_size, ec); - if (!ec) size = (std::max)(recv_size.value(), size); -#endif - - error_code ignore_errors; - // set the internal buffer sizes as well - m_ipv4_sock.set_option(boost::asio::socket_base::receive_buffer_size(size) - , ignore_errors); -#if TORRENT_USE_IPV6 - m_ipv6_sock.set_option(boost::asio::socket_base::receive_buffer_size(size) - , ignore_errors); -#endif } void udp_socket::bind(udp::endpoint const& ep, error_code& ec) { - CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); m_abort = false; - if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec); -#if TORRENT_USE_IPV6 - if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec); -#endif + if (m_socket.is_open()) m_socket.close(ec); ec.clear(); if (ep.address().is_v4()) { - m_ipv4_sock.open(udp::v4(), ec); + m_socket.open(udp::v4(), ec); if (ec) return; - - // this is best-effort. ignore errors - error_code err; -#ifdef TORRENT_WINDOWS - m_ipv4_sock.set_option(exclusive_address_use(true), err); -#endif - m_ipv4_sock.set_option(boost::asio::socket_base::reuse_address(true), err); - - m_ipv4_sock.bind(ep, ec); - if (ec) return; - udp::socket::non_blocking_io ioc(true); - m_ipv4_sock.io_control(ioc, ec); - if (ec) return; - setup_read(&m_ipv4_sock); } - #if TORRENT_USE_IPV6 - // TODO: 2 the udp_socket should really just be a single socket, and the - // session should support having more than one, just like with TCP sockets - // for now, just make bind failures non-fatal - if (supports_ipv6() && (ep.address().is_v6() || is_any(ep.address()))) + else if (ep.address().is_v6()) { - udp::endpoint ep6 = ep; - if (is_any(ep.address())) ep6.address(address_v6::any()); - m_ipv6_sock.open(udp::v6(), ec); + m_socket.open(udp::v6(), ec); if (ec) return; - - // this is best-effort. ignore errors error_code err; -#ifdef TORRENT_WINDOWS - m_ipv6_sock.set_option(exclusive_address_use(true), err); -#endif - m_ipv6_sock.set_option(boost::asio::socket_base::reuse_address(true), err); - m_ipv6_sock.set_option(boost::asio::ip::v6_only(true), err); + m_socket.set_option(boost::asio::ip::v6_only(true), err); - m_ipv6_sock.bind(ep6, ec); - if (ec != error_code(boost::system::errc::address_not_available - , boost::system::generic_category())) - { - if (ec) return; - udp::socket::non_blocking_io ioc(true); - m_ipv6_sock.io_control(ioc, ec); - if (ec) return; - setup_read(&m_ipv6_sock); - } - else - { - ec.clear(); - } +#ifdef TORRENT_WINDOWS + // enable Teredo on windows + m_socket.set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); +#endif // TORRENT_WINDOWS } #endif -#if TORRENT_USE_ASSERTS - m_started = true; + + // this is best-effort. ignore errors + error_code err; +#ifdef TORRENT_WINDOWS + m_socket.set_option(exclusive_address_use(true), err); #endif + m_socket.set_option(boost::asio::socket_base::reuse_address(true), err); + + m_socket.bind(ep, ec); + if (ec) return; + udp::socket::non_blocking_io ioc(true); + m_socket.io_control(ioc, ec); + if (ec) return; + m_bind_port = ep.port(); } void udp_socket::set_proxy_settings(aux::proxy_settings const& ps) { - CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); error_code ec; - m_socks5_sock.close(ec); - m_tunnel_packets = false; + if (m_socks5_connection) + { + m_socks5_connection->close(); + m_socks5_connection.reset(); + } m_proxy_settings = ps; - if (m_abort) - { - close_impl(); - return; - } + if (m_abort) return; if (ps.type == settings_pack::socks5 || ps.type == settings_pack::socks5_pw) { - m_queue_packets = true; // connect to socks5 server and open up the UDP tunnel - // TODO: use the system resolver_interface here - tcp::resolver::query q(ps.hostname, to_string(ps.port).elems); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_resolve; -#endif - ADD_OUTSTANDING_ASYNC("udp_socket::on_name_lookup"); - m_resolver.async_resolve(q, boost::bind( - &udp_socket::on_name_lookup, this, _1, _2)); + m_socks5_connection = boost::make_shared(boost::ref(m_socket.get_io_service())); + m_socks5_connection->start(ps); } } -void udp_socket::close_impl() +// ===================== SOCKS 5 ========================= + +void socks5::start(aux::proxy_settings const& ps) { - if (m_outstanding_ops == 0) - { - error_code ec; - m_ipv4_sock.close(ec); -#if TORRENT_USE_IPV6 - m_ipv6_sock.close(ec); -#endif - m_socks5_sock.close(ec); - } + m_proxy_settings = ps; + + // TODO: use the system resolver_interface here + tcp::resolver::query q(ps.hostname, to_string(ps.port).elems); + ADD_OUTSTANDING_ASYNC("socks5::on_name_lookup"); + m_resolver.async_resolve(q, boost::bind( + &socks5::on_name_lookup, self(), _1, _2)); } -void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i) +void socks5::on_name_lookup(error_code const& e, tcp::resolver::iterator i) { - COMPLETE_ASYNC("udp_socket::on_name_lookup"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_resolve > 0); - --m_outstanding_resolve; -#endif + COMPLETE_ASYNC("socks5::on_name_lookup"); - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; + if (m_abort) return; if (e == boost::asio::error::operation_aborted) return; - TORRENT_ASSERT(is_single_thread()); - - if (e) - { - if (m_force_proxy) - { - call_handler(e, udp::endpoint(), 0, 0); - } - else - { - // if we can't connect to the proxy, and - // we're not in privacy mode, try to just - // not use a proxy - m_proxy_settings = aux::proxy_settings(); - m_tunnel_packets = false; - } - - drain_queue(); - return; - } + if (e) return; m_proxy_addr.address(i->endpoint().address()); m_proxy_addr.port(i->endpoint().port()); - ADD_OUTSTANDING_ASYNC("udp_socket::on_connected"); - error_code ec; m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec); // enable keepalives m_socks5_sock.set_option(boost::asio::socket_base::keep_alive(true), ec); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_connect; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_connected"); m_socks5_sock.async_connect(tcp::endpoint(m_proxy_addr.address(), m_proxy_addr.port()) - , boost::bind(&udp_socket::on_connected, this, _1)); + , boost::bind(&socks5::on_connected, self(), _1)); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_timeout; -#endif - - ADD_OUTSTANDING_ASYNC("udp_socket::on_connect_timeout"); + ADD_OUTSTANDING_ASYNC("socks5::on_connect_timeout"); m_timer.expires_from_now(seconds(10)); - m_timer.async_wait(boost::bind(&udp_socket::on_connect_timeout - , this, _1)); + m_timer.async_wait(boost::bind(&socks5::on_connect_timeout + , self(), _1)); } -void udp_socket::on_connect_timeout(error_code const& ec) +void socks5::on_connect_timeout(error_code const& ec) { - COMPLETE_ASYNC("udp_socket::on_connect_timeout"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_timeout > 0); - --m_outstanding_timeout; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); + COMPLETE_ASYNC("socks5::on_connect_timeout"); if (ec == boost::asio::error::operation_aborted) return; - m_queue_packets = false; - - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); + if (m_abort) return; error_code ignore; m_socks5_sock.close(ignore); } -void udp_socket::on_connected(error_code const& e) +void socks5::on_connected(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_connected"); - - TORRENT_ASSERT(is_single_thread()); - -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_connect > 0); - --m_outstanding_connect; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - CHECK_MAGIC; + COMPLETE_ASYNC("socks5::on_connected"); m_timer.cancel(); if (e == boost::asio::error::operation_aborted) return; - if (m_abort) - { - close_impl(); - return; - } + if (m_abort) return; - if (e) - { - // we failed to connect to the proxy, if we don't have force_proxy set, - // drain the queue over the UDP socket - if (!m_force_proxy) - { - drain_queue(); - } - - call_handler(e, udp::endpoint(), 0, 0); - return; - } + // we failed to connect to the proxy + if (e) return; using namespace libtorrent::detail; @@ -1052,81 +579,31 @@ void udp_socket::on_connected(error_code const& e) write_uint8(2, p); // username/password } TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake1"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake1"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::handshake1, this, _1)); + , boost::bind(&socks5::handshake1, self(), _1)); } -void udp_socket::handshake1(error_code const& e) +void socks5::handshake1(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake1"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::on_handshake1"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake2"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake2"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 2) - , boost::bind(&udp_socket::handshake2, this, _1)); + , boost::bind(&socks5::handshake2, self(), _1)); } -void udp_socket::handshake2(error_code const& e) +void socks5::handshake2(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake2"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; + COMPLETE_ASYNC("socks5::on_handshake2"); + if (m_abort) return; - if (e) - { - drain_queue(); - return; - } + if (e) return; using namespace libtorrent::detail; - TORRENT_ASSERT(is_single_thread()); - char* p = &m_tmp_buf[0]; int version = read_uint8(p); int method = read_uint8(p); @@ -1135,7 +612,6 @@ void udp_socket::handshake2(error_code const& e) { error_code ec; m_socks5_sock.close(ec); - drain_queue(); return; } @@ -1149,7 +625,6 @@ void udp_socket::handshake2(error_code const& e) { error_code ec; m_socks5_sock.close(ec); - drain_queue(); return; } @@ -1163,85 +638,34 @@ void udp_socket::handshake2(error_code const& e) write_uint8(uint8_t(m_proxy_settings.password.size()), p); write_string(m_proxy_settings.password, p); TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake3"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake3"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::handshake3, this, _1)); + , boost::bind(&socks5::handshake3, self(), _1)); } else { - drain_queue(); error_code ec; m_socks5_sock.close(ec); return; } } -void udp_socket::handshake3(error_code const& e) +void socks5::handshake3(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake3"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::on_handshake3"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake4"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake4"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 2) - , boost::bind(&udp_socket::handshake4, this, _1)); + , boost::bind(&socks5::handshake4, self(), _1)); } -void udp_socket::handshake4(error_code const& e) +void socks5::handshake4(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake4"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } - - TORRENT_ASSERT(is_single_thread()); + COMPLETE_ASYNC("socks5::on_handshake4"); + if (m_abort) return; + if (e) return; using namespace libtorrent::detail; @@ -1249,18 +673,13 @@ void udp_socket::handshake4(error_code const& e) int version = read_uint8(p); int status = read_uint8(p); - if (version != 1 || status != 0) - { - drain_queue(); - return; - } + if (version != 1 || status != 0) return; socks_forward_udp(/*l*/); } -void udp_socket::socks_forward_udp() +void socks5::socks_forward_udp() { - CHECK_MAGIC; using namespace libtorrent::detail; // send SOCKS5 UDP command @@ -1273,78 +692,28 @@ void udp_socket::socks_forward_udp() write_uint32(0, p); // 0.0.0.0 write_uint16(0, p); // :0 TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::connect1"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::connect1"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::connect1, this, _1)); + , boost::bind(&socks5::connect1, self(), _1)); } -void udp_socket::connect1(error_code const& e) +void socks5::connect1(error_code const& e) { - COMPLETE_ASYNC("udp_socket::connect1"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::connect1"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::connect2"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::connect2"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 10) - , boost::bind(&udp_socket::connect2, this, _1)); + , boost::bind(&socks5::connect2, self(), _1)); } -void udp_socket::connect2(error_code const& e) +void socks5::connect2(error_code const& e) { - COMPLETE_ASYNC("udp_socket::connect2"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); + COMPLETE_ASYNC("socks5::connect2"); - if (m_abort) - { - m_queue.clear(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } - - TORRENT_ASSERT(is_single_thread()); + if (m_abort) return; + if (e) return; using namespace libtorrent::detail; @@ -1354,11 +723,7 @@ void udp_socket::connect2(error_code const& e) ++p; // RESERVED int atyp = read_uint8(p); // address type - if (version != 5 || status != 0) - { - drain_queue(); - return; - } + if (version != 5 || status != 0) return; if (atyp == 1) { @@ -1368,70 +733,38 @@ void udp_socket::connect2(error_code const& e) else { // in this case we need to read more data from the socket + // no IPv6 support for UDP socks5 TORRENT_ASSERT(false); - drain_queue(); return; } - m_tunnel_packets = true; - drain_queue(); + // we're done! + m_active = true; - ADD_OUTSTANDING_ASYNC("udp_socket::hung_up"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::hung_up"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 10) - , boost::bind(&udp_socket::hung_up, this, _1)); + , boost::bind(&socks5::hung_up, self(), _1)); } -void udp_socket::hung_up(error_code const& e) +void socks5::hung_up(error_code const& e) { - COMPLETE_ASYNC("udp_socket::hung_up"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); + COMPLETE_ASYNC("socks5::hung_up"); + m_active = false; if (e == boost::asio::error::operation_aborted || m_abort) return; // the socks connection was closed, re-open it - set_proxy_settings(m_proxy_settings); + start(m_proxy_settings); } -void udp_socket::drain_queue() +void socks5::close() { - m_queue_packets = false; - - // forward all packets that were put in the queue - while (!m_queue.empty()) - { - queued_packet const& p = m_queue.front(); - error_code ec; - if (p.hostname) - { - udp_socket::send_hostname(p.hostname, p.ep.port(), &p.buf[0] - , p.buf.size(), ec, p.flags | dont_queue); - free(p.hostname); - } - else - { - udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec, p.flags | dont_queue); - } - m_queue.pop_front(); - } + m_abort = true; + error_code ec; + m_socks5_sock.close(ec); + m_resolver.cancel(); + m_timer.cancel(); +} + } diff --git a/src/udp_tracker_connection.cpp b/src/udp_tracker_connection.cpp index 7922cbac3..66beb3c69 100644 --- a/src/udp_tracker_connection.cpp +++ b/src/udp_tracker_connection.cpp @@ -319,19 +319,19 @@ namespace libtorrent tracker_connection::close(); } - bool udp_tracker_connection::on_receive_hostname(error_code const& e - , char const* hostname, char const* buf, int size) + bool udp_tracker_connection::on_receive_hostname(char const* hostname + , char const* buf, int size) { TORRENT_UNUSED(hostname); // just ignore the hostname this came from, pretend that // it's from the same endpoint we sent it to (i.e. the same // port). We have so many other ways of confirming this packet // comes from the tracker anyway, so it's not a big deal - return on_receive(e, m_target, buf, size); + return on_receive(m_target, buf, size); } - bool udp_tracker_connection::on_receive(error_code const& e - , udp::endpoint const& ep, char const* buf, int size) + bool udp_tracker_connection::on_receive(udp::endpoint const& ep + , char const* buf, int const size) { #ifndef TORRENT_DISABLE_LOGGING boost::shared_ptr cb = requester(); @@ -369,8 +369,6 @@ namespace libtorrent return false; } - if (e) fail(e); - #ifndef TORRENT_DISABLE_LOGGING if (cb) cb->debug_log("<== UDP_TRACKER_PACKET [ size: %d ]", size); #endif @@ -499,13 +497,13 @@ namespace libtorrent error_code ec; if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str() - , m_target.port(), buf, 16, ec + m_man.send_hostname(m_hostname.c_str() + , m_target.port(), aux::array_view(buf, 16), ec , udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, 16, ec + m_man.send(m_target, aux::array_view(buf, 16), ec , udp_socket::tracker_connection); } @@ -563,12 +561,12 @@ namespace libtorrent error_code ec; if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str(), m_target.port() - , buf, sizeof(buf), ec, udp_socket::tracker_connection); + m_man.send_hostname(m_hostname.c_str(), m_target.port() + , aux::array_view(buf), ec, udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, sizeof(buf), ec + m_man.send(m_target, aux::array_view(buf), ec , udp_socket::tracker_connection); } m_state = action_scrape; @@ -762,13 +760,13 @@ namespace libtorrent if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str() - , m_target.port(), buf, out - buf, ec + m_man.send_hostname(m_hostname.c_str() + , m_target.port(), aux::array_view(buf, out - buf), ec , udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, out - buf, ec + m_man.send(m_target, aux::array_view(buf, out - buf), ec , udp_socket::tracker_connection); } m_state = action_announce; diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index c7b104493..17580c1ec 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -40,18 +40,22 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/random.hpp" #include "libtorrent/performance_counters.hpp" #include "libtorrent/aux_/time.hpp" // for aux::time_now() +#include "libtorrent/aux_/array_view.hpp" // #define TORRENT_DEBUG_MTU 1135 namespace libtorrent { + using namespace libtorrent::aux; - utp_socket_manager::utp_socket_manager(aux::session_settings const& sett - , udp_socket& s + utp_socket_manager::utp_socket_manager( + send_fun_t const& send_fun + , incoming_utp_callback_t const& cb + , io_service& ios + , aux::session_settings const& sett , counters& cnt - , void* ssl_context - , incoming_utp_callback_t cb) - : m_sock(s) + , void* ssl_context) + : m_send_fun(send_fun) , m_cb(cb) , m_last_socket(0) , m_new_connection(-1) @@ -60,6 +64,7 @@ namespace libtorrent , m_last_if_update(min_time()) , m_sock_buf_size(0) , m_counters(cnt) + , m_ios(ios) , m_mtu_idx(0) , m_ssl_context(ssl_context) { @@ -94,7 +99,6 @@ namespace libtorrent void utp_socket_manager::mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu) { - int mtu = 0; if (is_teredo(addr)) mtu = TORRENT_TEREDO_MTU; else mtu = TORRENT_ETHERNET_MTU; @@ -118,13 +122,12 @@ namespace libtorrent mtu -= TORRENT_UDP_HEADER; - if (m_sock.get_proxy_settings().type == settings_pack::socks5 - || m_sock.get_proxy_settings().type == settings_pack::socks5_pw) + if (m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5 + || m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5_pw) { // this is for the IP layer - address proxy_addr = m_sock.proxy_addr().address(); - if (proxy_addr.is_v4()) mtu -= TORRENT_IPV4_HEADER; - else mtu -= TORRENT_IPV6_HEADER; + // assume the proxy is running over IPv4 + mtu -= TORRENT_IPV4_HEADER; // this is for the SOCKS layer mtu -= TORRENT_SOCKS5_HEADER; @@ -148,102 +151,20 @@ namespace libtorrent #if !defined TORRENT_HAS_DONT_FRAGMENT && !defined TORRENT_DEBUG_MTU TORRENT_UNUSED(flags); #endif - if (!m_sock.is_open()) - { - ec = boost::asio::error::operation_aborted; - return; - } #ifdef TORRENT_DEBUG_MTU // drop packets that exceed the debug MTU if ((flags & dont_fragment) && len > TORRENT_DEBUG_MTU) return; #endif -#ifdef TORRENT_HAS_DONT_FRAGMENT - error_code tmp; - if (flags & utp_socket_manager::dont_fragment) - { - m_sock.set_option(libtorrent::dont_fragment(true), tmp); - TORRENT_ASSERT_VAL(!tmp, tmp.message()); - } -#endif - m_sock.send(ep, p, len, ec, udp_socket::peer_connection); -#ifdef TORRENT_HAS_DONT_FRAGMENT - if (flags & utp_socket_manager::dont_fragment) - { - m_sock.set_option(libtorrent::dont_fragment(false), tmp); - TORRENT_ASSERT_VAL(!tmp, tmp.message()); - } -#endif + m_send_fun(ep, array_view(p, len), ec + , ((flags & dont_fragment) ? udp_socket::dont_fragment : 0) + | udp_socket::peer_connection); } - int utp_socket_manager::local_port(error_code& ec) const + bool utp_socket_manager::incoming_packet(udp::endpoint const& ep + , char const* p, int const size) { - return m_sock.local_endpoint(ec).port(); - } - - tcp::endpoint utp_socket_manager::local_endpoint(address const& remote, error_code& ec) const - { - tcp::endpoint socket_ep = m_sock.local_endpoint(ec); - - // first enumerate the routes in the routing table - if (aux::time_now() - seconds(60) > m_last_route_update) - { - m_last_route_update = aux::time_now(); - error_code err; - m_routes = enum_routes(m_sock.get_io_service(), err); - if (err) return socket_ep; - } - - if (m_routes.empty()) return socket_ep; - // then find the best match - ip_route* best = &m_routes[0]; - for (std::vector::iterator i = m_routes.begin() - , end(m_routes.end()); i != end; ++i) - { - if (is_any(i->destination) && i->destination.is_v4() == remote.is_v4()) - { - best = &*i; - break; - } - - if (match_addr_mask(remote, i->destination, i->netmask)) - { - best = &*i; - break; - } - } - - // best now tells us which interface we would send over - // for this target. Now figure out what the local address - // is for that interface - - if (aux::time_now() - seconds(60) > m_last_if_update) - { - m_last_if_update = aux::time_now(); - error_code err; - m_interfaces = enum_net_interfaces(m_sock.get_io_service(), err); - if (err) return socket_ep; - } - - for (std::vector::iterator i = m_interfaces.begin() - , end(m_interfaces.end()); i != end; ++i) - { - if (i->interface_address.is_v4() != remote.is_v4()) - continue; - - if (strcmp(best->name, i->name) == 0) - return tcp::endpoint(i->interface_address, socket_ep.port()); - } - return socket_ep; - } - - bool utp_socket_manager::incoming_packet(error_code const& ec, udp::endpoint const& ep - , char const* p, int size) - { - // TODO: 2 we may want to take ec into account here. possibly close - // connections quicker - TORRENT_UNUSED(ec); // UTP_LOGV("incoming packet size:%d\n", size); if (size < int(sizeof(utp_header))) return false; @@ -294,14 +215,14 @@ namespace libtorrent // UTP_LOGV("not found, new connection id:%d\n", m_new_connection); - boost::shared_ptr c(new (std::nothrow) socket_type(m_sock.get_io_service())); + boost::shared_ptr c(new (std::nothrow) socket_type(m_ios)); if (!c) return false; TORRENT_ASSERT(m_new_connection == -1); // create the new socket with this ID m_new_connection = id; - instantiate_connection(m_sock.get_io_service(), aux::proxy_settings(), *c + instantiate_connection(m_ios, aux::proxy_settings(), *c , m_ssl_context, this, true, false); @@ -397,27 +318,6 @@ namespace libtorrent m_utp_sockets.erase(i); } - void utp_socket_manager::set_sock_buf(int size) - { - if (size < m_sock_buf_size) return; - m_sock.set_buf_size(size); - error_code ec; - // add more socket buffer storage on the lower level socket - // to avoid dropping packets because of a full receive buffer - // while processing a packet - - // only update the buffer size if it's bigger than - // what we already have - udp::socket::receive_buffer_size recv_buf_size_opt; - m_sock.get_option(recv_buf_size_opt, ec); - if (recv_buf_size_opt.value() < size * 10) - { - m_sock.set_option(udp::socket::receive_buffer_size(size * 10), ec); - m_sock.set_option(udp::socket::send_buffer_size(size * 3), ec); - } - m_sock_buf_size = size; - } - void utp_socket_manager::inc_stats_counter(int counter, int delta) { TORRENT_ASSERT((counter >= counters::utp_packet_loss diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index bf2b1f0df..808d48dec 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -460,9 +460,6 @@ public: // the address of the remote endpoint address m_remote_address; - // the local address - address m_local_address; - // the send and receive buffers // maps packet sequence numbers packet_buffer m_inbuf; @@ -887,9 +884,8 @@ utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const if (m_impl == 0 || m_impl->m_sm == 0) { ec = boost::asio::error::not_connected; - return endpoint_type(); } - return tcp::endpoint(m_impl->m_local_address, m_impl->m_sm->local_port(ec)); + return endpoint_type(); } utp_stream::~utp_stream() @@ -1192,9 +1188,6 @@ void utp_stream::do_connect(tcp::endpoint const& ep) m_impl->m_connect_handler = true; - error_code ec; - m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address(); - if (m_impl->test_socket_state()) return; m_impl->send_syn(); } @@ -3089,9 +3082,6 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size m_remote_address = ep.address(); m_port = ep.port(); - error_code ec; - m_local_address = m_sm->local_endpoint(m_remote_address, ec).address(); - m_ack_nr = ph->seq_nr; m_seq_nr = random() & 0xffff; m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK; diff --git a/test/test_fast_extension.cpp b/test/test_fast_extension.cpp index e8e1aecf4..3f8212f2f 100644 --- a/test/test_fast_extension.cpp +++ b/test/test_fast_extension.cpp @@ -431,7 +431,9 @@ boost::shared_ptr setup_peer(tcp::socket& s, sha1_hash& ih // wait for the torrent to be ready wait_for_downloading(*ses, "ses"); - s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), ses->listen_port()), ec); + int const port = ses->listen_port(); + fprintf(stderr, "listen port: %d\n", port); + s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), port), ec); if (ec) TEST_ERROR(ec.message()); print_session_log(*ses);