From 7251575d98a49df420832792063ba796fb500889 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 24 Apr 2016 15:26:28 -0400 Subject: [PATCH] This patch does: (#633) 1. simplifies and regularizies creation of listen sockets based on the listen interfaces setting. 2. simplifies and improves the behavior of UDP sockets, which are now explicitly opened per listen interface 3. transitions udp tracker, DHT and uTP socket manager over to using the new udp sockets 4. greatly simplified udp_socket to only wrap a single underlying socket (as opposed to one IPv4 and IPv6 socket) 5. improved behavior of bind-to-device 6. introduce an array_view type to make udp packet passing code simpler 7. simplify and make setting of DF flag more robust 8. simplify and regularize port mapping of listen sockets --- include/libtorrent/Makefile.am | 1 + include/libtorrent/aux_/array_view.hpp | 76 + include/libtorrent/aux_/session_impl.hpp | 79 +- include/libtorrent/aux_/session_interface.hpp | 3 +- include/libtorrent/kademlia/dht_tracker.hpp | 17 +- include/libtorrent/tracker_manager.hpp | 47 +- include/libtorrent/udp_socket.hpp | 240 +-- include/libtorrent/udp_tracker_connection.hpp | 6 +- include/libtorrent/utp_socket_manager.hpp | 50 +- simulation/test_dht_rate_limit.cpp | 25 +- src/kademlia/dht_tracker.cpp | 50 +- src/session_impl.cpp | 1175 +++++++------- src/torrent.cpp | 2 +- src/tracker_manager.cpp | 53 +- src/udp_socket.cpp | 1365 +++++------------ src/udp_tracker_connection.cpp | 30 +- src/utp_socket_manager.cpp | 142 +- src/utp_stream.cpp | 12 +- test/test_fast_extension.cpp | 4 +- 19 files changed, 1357 insertions(+), 2020 deletions(-) create mode 100644 include/libtorrent/aux_/array_view.hpp diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 8fb33f2a7..b3c205b56 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -160,6 +160,7 @@ nobase_include_HEADERS = \ tommath_superclass.h \ \ aux_/alert_manager_variadic_emplace.hpp \ + aux_/array_view.hpp \ aux_/allocating_handler.hpp \ aux_/bind_to_device.hpp \ aux_/cpuid.hpp \ diff --git a/include/libtorrent/aux_/array_view.hpp b/include/libtorrent/aux_/array_view.hpp new file mode 100644 index 000000000..5a04a98da --- /dev/null +++ b/include/libtorrent/aux_/array_view.hpp @@ -0,0 +1,76 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_ARRAY_VIEW_HPP_INCLUDED +#define TORRENT_ARRAY_VIEW_HPP_INCLUDED + +#include + +namespace libtorrent { namespace aux { + + template + struct array_view + { + array_view() : m_ptr(NULL), m_len(0) {} + array_view(T* p, int l) : m_ptr(p), m_len(l) {} + + template + explicit array_view(boost::array& arr) + : m_ptr(arr.data()), m_len(arr.size()) {} + + template + explicit array_view(T (&arr)[N]) + : m_ptr(&arr[0]), m_len(N) {} + + explicit array_view(std::vector& vec) + : m_ptr(vec.data()), m_len(vec.size()) {} + + size_t size() const { return m_len; } + T* data() const { return m_ptr; } + T* begin() const { return m_ptr; } + T* end() const { return m_ptr + m_len; } + + T& operator[](int idx) + { + TORRENT_ASSERT(idx >= 0); + TORRENT_ASSERT(idx < m_len); + return m_ptr[idx]; + } + + private: + T* m_ptr; + size_t m_len; + }; +}} + +#endif + diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c05bb2abd..0423d4cc4 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -126,17 +126,21 @@ namespace libtorrent { listen_socket_t() : tcp_external_port(0) + , udp_external_port(0) , ssl(false) + , udp_write_blocked(false) { tcp_port_mapping[0] = -1; tcp_port_mapping[1] = -1; + udp_port_mapping[0] = -1; + udp_port_mapping[1] = -1; } // this is typically empty but can be set // to the WAN IP address of NAT-PMP or UPnP router address external_address; - // this is a cached local endpoint for the listen socket + // this is a cached local endpoint for the listen TCP socket tcp::endpoint local_endpoint; // this is typically set to the same as the local @@ -147,15 +151,27 @@ namespace libtorrent // to be published to peers, since this is the port // the client is reachable through. int tcp_external_port; + int udp_external_port; // 0 is natpmp 1 is upnp int tcp_port_mapping[2]; + int udp_port_mapping[2]; // set to true if this is an SSL listen socket bool ssl; - // the actual socket + // this is true when the udp socket send() has failed with EAGAIN or + // EWOULDBLOCK. i.e. we're currently waiting for the socket to become + // writeable again. Once it is, we'll set it to false and notify the utp + // socket manager + bool udp_write_blocked; + + // the actual sockets (TCP listen socket and UDP socket) + // An entry does not necessarily have a UDP or TCP socket. One of these + // pointers may be null! + // TODO: 3 make these unique_ptr<> boost::shared_ptr sock; + boost::shared_ptr udp_sock; }; namespace aux @@ -175,7 +191,6 @@ namespace libtorrent : session_interface , dht::dht_observer , boost::noncopyable - , udp_socket_observer , uncork_interface , single_threaded { @@ -184,9 +199,6 @@ namespace libtorrent // maximum length of query names which can be registered by extensions enum { max_dht_query_length = 15 }; -#ifdef TORRENT_DEBUG -// friend class ::libtorrent::peer_connection; -#endif #if TORRENT_USE_INVARIANT_CHECKS friend class libtorrent::invariant_access; #endif @@ -245,9 +257,6 @@ namespace libtorrent // need the initial push to connect peers void prioritize_connections(boost::weak_ptr t) TORRENT_OVERRIDE; - // if we are listening on an IPv6 interface - // this will return one of the IPv6 addresses on this - // machine, otherwise just an empty endpoint tcp::endpoint get_ipv6_interface() const TORRENT_OVERRIDE; tcp::endpoint get_ipv4_interface() const TORRENT_OVERRIDE; @@ -354,8 +363,6 @@ namespace libtorrent , std::vector
const& addresses, int port); #endif - void maybe_update_udp_mapping(int nat, bool ssl, int local_port, int external_port); - #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) torrent const* find_encrypted_torrent( sha1_hash const& info_hash, sha1_hash const& xor_mask) TORRENT_OVERRIDE; @@ -522,7 +529,15 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT bool is_dht_running() const { return (m_dht.get() != NULL); } - int external_udp_port() const TORRENT_OVERRIDE { return m_external_udp_port; } + int external_udp_port() const TORRENT_OVERRIDE + { + for (std::list::const_iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (i->udp_sock) return i->udp_external_port; + } + return -1; + } #endif #if TORRENT_USE_I2P @@ -1042,20 +1057,27 @@ namespace libtorrent int m_outstanding_router_lookups; #endif - bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size) TORRENT_OVERRIDE; + void send_udp_packet_hostname(char const* hostname + , int port + , array_view p + , error_code& ec + , int flags); - // see m_external_listen_port. This is the same - // but for the udp port used by the DHT. - // TODO: 3 once udp sockets are part of m_listen_sockets, remove this - int m_external_udp_port; + void send_udp_packet(bool ssl + , udp::endpoint const& ep + , array_view p + , error_code& ec + , int flags); + + void on_udp_writeable(boost::weak_ptr s, error_code const& ec); + + void on_udp_packet(boost::weak_ptr const& s + , bool ssl, error_code const& ec); - udp_socket m_udp_socket; libtorrent::utp_socket_manager m_utp_socket_manager; #ifdef TORRENT_USE_OPENSSL // used for uTP connectons over SSL - udp_socket m_ssl_udp_socket; libtorrent::utp_socket_manager m_ssl_utp_socket_manager; #endif @@ -1068,18 +1090,17 @@ namespace libtorrent boost::shared_ptr m_upnp; boost::shared_ptr m_lsd; - // TODO: 3 once the udp socket is in listen_socket_t, these should - // move in there too - // 0 is natpmp 1 is upnp - int m_udp_mapping[2]; -#ifdef TORRENT_USE_OPENSSL - int m_ssl_udp_mapping[2]; -#endif - // mask is a bitmask of which protocols to remap on: // 1: NAT-PMP // 2: UPnP - void remap_ports(boost::uint32_t mask, listen_socket_t& s); + // TODO: 3 perhaps this function should move into listen_socket_t + enum remap_port_mask_t + { + remap_natpmp = 1, + remap_upnp = 2, + remap_natpmp_and_upnp = 3 + }; + void remap_ports(remap_port_mask_t mask, listen_socket_t& s); // the timer used to fire the tick deadline_timer m_timer; diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 460a39151..f162e9a4e 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -249,9 +249,8 @@ namespace libtorrent { namespace aux virtual void prioritize_connections(boost::weak_ptr t) = 0; - // TODO: 3 these should go away! - virtual tcp::endpoint get_ipv6_interface() const = 0; virtual tcp::endpoint get_ipv4_interface() const = 0; + virtual tcp::endpoint get_ipv6_interface() const = 0; virtual void trigger_auto_manage() = 0; diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index d3e35cf76..f0be60acd 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/thread.hpp" #include "libtorrent/deadline_timer.hpp" +#include "libtorrent/aux_/array_view.hpp" namespace libtorrent { @@ -68,10 +69,14 @@ namespace libtorrent { namespace dht struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL : udp_socket_interface - , udp_socket_observer , boost::enable_shared_from_this { - dht_tracker(dht_observer* observer, udp_socket& sock + typedef boost::function, error_code&, int)> send_fun_t; + + dht_tracker(dht_observer* observer + , io_service& ios + , send_fun_t const& send_fun , dht_settings const& settings, counters& cnt , dht_storage_constructor_type storage_constructor , entry const& state); @@ -129,10 +134,8 @@ namespace libtorrent { namespace dht , std::vector& requests); void update_stats_counters(counters& c) const; - // translate bittorrent kademlia message into the generic kademlia message - // used by the library - virtual bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size); + void incoming_error(error_code const& ec, udp::endpoint const&); + bool incoming_packet(udp::endpoint const&, char const* buf, int size); private: @@ -154,7 +157,7 @@ namespace libtorrent { namespace dht counters& m_counters; node m_dht; - udp_socket& m_sock; + send_fun_t m_send_fun; dht_logger* m_log; std::vector m_send_buf; diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index 49cd84e77..910ef7aec 100644 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #ifdef TORRENT_USE_OPENSSL #include @@ -62,8 +63,9 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer.hpp" // peer_entry #include "libtorrent/deadline_timer.hpp" #include "libtorrent/union_endpoint.hpp" -#include "libtorrent/udp_socket.hpp" // for udp_socket_observer #include "libtorrent/io_service.hpp" +#include "libtorrent/thread.hpp" +#include "libtorrent/aux_/array_view.hpp" namespace libtorrent { @@ -272,8 +274,7 @@ namespace libtorrent int m_completion_timeout; - typedef mutex mutex_t; - mutable mutex_t m_mutex; + mutable mutex m_mutex; // used for timeouts // this is set when the request has been sent @@ -314,10 +315,9 @@ namespace libtorrent address const& bind_interface() const { return m_req.bind_ip; } void sent_bytes(int bytes); void received_bytes(int bytes); - virtual bool on_receive(error_code const&, udp::endpoint const& + virtual bool on_receive(udp::endpoint const& , char const* /* buf */, int /* size */) { return false; } - virtual bool on_receive_hostname(error_code const& - , char const* /* hostname */ + virtual bool on_receive_hostname(char const* /* hostname */ , char const* /* buf */, int /* size */) { return false; } boost::shared_ptr shared_from_this() @@ -341,12 +341,19 @@ namespace libtorrent }; class TORRENT_EXTRA_EXPORT tracker_manager TORRENT_FINAL - : public udp_socket_observer - , boost::noncopyable + : boost::noncopyable { public: - tracker_manager(udp_socket& sock + typedef boost::function + , error_code&, int)> send_fun_t; + typedef boost::function + , error_code&, int)> send_fun_hostname_t; + + tracker_manager(send_fun_t const& send_fun + , send_fun_hostname_t const& send_fun_hostname , counters& stats_counters , resolver_interface& resolver , aux::session_settings const& sett @@ -370,26 +377,31 @@ namespace libtorrent void sent_bytes(int bytes); void received_bytes(int bytes); - virtual bool incoming_packet(error_code const& e, udp::endpoint const& ep - , char const* buf, int size) TORRENT_OVERRIDE; + void incoming_error(error_code const& ec, udp::endpoint const& ep); + bool incoming_packet(udp::endpoint const& ep, char const* buf, int size); // this is only used for SOCKS packets, since // they may be addressed to hostname - virtual bool incoming_packet(error_code const& e, char const* hostname - , char const* buf, int size) TORRENT_OVERRIDE; + // TODO: 3 make sure the udp_socket supports passing on string-hostnames + // too, and that this function is used + bool incoming_packet(char const* hostname, char const* buf, int size); void update_transaction_id( boost::shared_ptr c , boost::uint64_t tid); aux::session_settings const& settings() const { return m_settings; } - udp_socket& get_udp_socket() { return m_udp_socket; } resolver_interface& host_resolver() { return m_host_resolver; } + void send_hostname(char const* hostname, int port, aux::array_view p + , error_code& ec, int flags = 0); + + void send(udp::endpoint const& ep, aux::array_view p + , error_code& ec, int flags = 0); + private: - typedef mutex mutex_t; - mutable mutex_t m_mutex; + mutable mutex m_mutex; // maps transactionid to the udp_tracker_connection // TODO: this should be unique_ptr in the future @@ -400,7 +412,8 @@ namespace libtorrent typedef std::vector > http_conns_t; http_conns_t m_http_conns; - class udp_socket& m_udp_socket; + send_fun_t m_send_fun; + send_fun_hostname_t m_send_fun_hostname; resolver_interface& m_host_resolver; aux::session_settings const& m_settings; counters& m_stats_counters; diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 60ba212c5..f3338ce3f 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -41,30 +41,12 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/thread.hpp" #include "libtorrent/deadline_timer.hpp" #include "libtorrent/debug.hpp" +#include "libtorrent/aux_/array_view.hpp" #include "libtorrent/aux_/allocating_handler.hpp" -#include - namespace libtorrent { - struct TORRENT_EXTRA_EXPORT udp_socket_observer - { - // return true if the packet was handled (it won't be - // propagated to the next observer) - virtual bool incoming_packet(error_code const& ec - , udp::endpoint const&, char const* buf, int size) = 0; - virtual bool incoming_packet(error_code const& /* ec */ - , char const* /* hostname */, char const* /* buf */, int /* size */) { return false; } - - // called when the socket becomes writeable, after having - // failed with EWOULDBLOCK - virtual void writable() {} - - // called every time the socket is drained of packets - virtual void socket_drained() {} - protected: - ~udp_socket_observer() {} - }; + struct socks5; class TORRENT_EXTRA_EXPORT udp_socket : single_threaded { @@ -76,19 +58,38 @@ namespace libtorrent peer_connection = 1 , tracker_connection = 2 , dont_queue = 4 + , dont_fragment = 8 }; bool is_open() const { return m_abort == false; } - io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } + io_service& get_io_service() { return m_socket.get_io_service(); } - void subscribe(udp_socket_observer* o); - void unsubscribe(udp_socket_observer* o); + template + void async_read(Handler h) + { + m_socket.async_receive(null_buffers(), h); + } + + template + void async_write(Handler h) + { + m_socket.async_send(null_buffers(), h); + } + + struct packet + { + aux::array_view data; + udp::endpoint from; + error_code error; + }; + + int read(aux::array_view pkts, error_code& ec); // this is only valid when using a socks5 proxy - void send_hostname(char const* hostname, int port, char const* p - , int len, error_code& ec, int flags = 0); + void send_hostname(char const* hostname, int port, aux::array_view p + , error_code& ec, int flags = 0); - void send(udp::endpoint const& ep, char const* p, int len + void send(udp::endpoint const& ep, aux::array_view p , error_code& ec, int flags = 0); void bind(udp::endpoint const& ep, error_code& ec); void close(); @@ -99,13 +100,8 @@ namespace libtorrent void set_force_proxy(bool f) { m_force_proxy = f; } bool is_closed() const { return m_abort; } - tcp::endpoint local_endpoint(error_code& ec) const - { - udp::endpoint ep = m_ipv4_sock.local_endpoint(ec); - return tcp::endpoint(ep.address(), ep.port()); - } - - void set_buf_size(int s); + udp::endpoint local_endpoint(error_code& ec) const + { return m_socket.local_endpoint(ec); } typedef udp::socket::receive_buffer_size receive_buffer_size; typedef udp::socket::send_buffer_size send_buffer_size; @@ -113,201 +109,51 @@ namespace libtorrent template void get_option(SocketOption const& opt, error_code& ec) { -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) == IPPROTO_IPV6) - m_ipv6_sock.get_option(opt, ec); - else -#endif - m_ipv4_sock.get_option(opt, ec); + m_socket.get_option(opt, ec); } template void set_option(SocketOption const& opt, error_code& ec) { - if (opt.level(udp::v4()) != IPPROTO_IPV6) - m_ipv4_sock.set_option(opt, ec); -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) != IPPROTO_IP) - m_ipv6_sock.set_option(opt, ec); -#endif + m_socket.set_option(opt, ec); } template void get_option(SocketOption& opt, error_code& ec) { -#if TORRENT_USE_IPV6 - if (opt.level(udp::v6()) == IPPROTO_IPV6) - m_ipv6_sock.get_option(opt, ec); - else -#endif - m_ipv4_sock.get_option(opt, ec); + m_socket.get_option(opt, ec); } - udp::endpoint proxy_addr() const { return m_proxy_addr; } - private: - struct queued_packet - { - queued_packet() - : hostname(NULL) - , flags(0) - {} - - udp::endpoint ep; - char* hostname; - buffer buf; - int flags; - }; - - // number of outstanding UDP socket operations - // using the UDP socket buffer - int num_outstanding() const - { - return m_v4_outstanding -#if TORRENT_USE_IPV6 - + m_v6_outstanding -#endif - ; - } - // non-copyable udp_socket(udp_socket const&); udp_socket& operator=(udp_socket const&); - void close_impl(); + void wrap(udp::endpoint const& ep, aux::array_view p, error_code& ec, int flags); + void wrap(char const* hostname, int port, aux::array_view p, error_code& ec, int flags); + bool unwrap(udp::endpoint& from, aux::array_view& buf); - // observers on this udp socket - std::vector m_observers; - std::vector m_added_observers; + udp::socket m_socket; - template - aux::allocating_handler - make_read_handler4(Handler const& handler) - { - return aux::allocating_handler( - handler, m_v4_read_handler_storage - ); - } - -#if TORRENT_USE_IPV6 - template - aux::allocating_handler - make_read_handler6(Handler const& handler) - { - return aux::allocating_handler( - handler, m_v6_read_handler_storage - ); - } -#endif - - // this is true while iterating over the observers - // vector, invoking observer hooks. We may not - // add new observers during this time, since it - // may invalidate the iterator. If this is true, - // instead add new observers to m_added_observers - // and they will be added later - bool m_observers_locked; - - void call_handler(error_code const& ec, udp::endpoint const& ep - , char const* buf, int size); - void call_handler(error_code const& ec, const char* host - , char const* buf, int size); - void call_drained_handler(); - void call_writable_handler(); - - void on_writable(error_code const& ec, udp::socket* s); - - void setup_read(udp::socket* s); - void on_read(error_code const& ec, udp::socket* s); - void on_read_impl(udp::endpoint const& ep - , error_code const& e, std::size_t bytes_transferred); - void on_name_lookup(error_code const& e, tcp::resolver::iterator i); - void on_connect_timeout(error_code const& ec); - void on_connected(error_code const& ec); - void handshake1(error_code const& e); - void handshake2(error_code const& e); - void handshake3(error_code const& e); - void handshake4(error_code const& e); - void socks_forward_udp(); - void connect1(error_code const& e); - void connect2(error_code const& e); - void hung_up(error_code const& e); - - void drain_queue(); - - void wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec); - void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); - void unwrap(error_code const& e, char const* buf, int size); - - udp::socket m_ipv4_sock; - aux::handler_storage m_v4_read_handler_storage; - deadline_timer m_timer; - int m_buf_size; - - // if the buffer size is attempted - // to be changed while the buffer is - // being used, this member is set to - // the desired size, and it's resized - // later - int m_new_buf_size; + // TODO: 2 this should probably be a scoped_ptr<> or unique_ptr + // with a hard coded size + int const m_buf_size; char* m_buf; -#if TORRENT_USE_IPV6 - udp::socket m_ipv6_sock; - aux::handler_storage m_v6_read_handler_storage; -#endif - boost::uint16_t m_bind_port; - boost::uint8_t m_v4_outstanding; - boost::uint8_t m_restart_v4; -#if TORRENT_USE_IPV6 - boost::uint8_t m_v6_outstanding; - boost::uint8_t m_restart_v6; -#endif - tcp::socket m_socks5_sock; aux::proxy_settings m_proxy_settings; - tcp::resolver m_resolver; - char m_tmp_buf[270]; - bool m_queue_packets; - bool m_tunnel_packets; - bool m_force_proxy; - bool m_abort; - // this is the endpoint the proxy server lives at. - // when performing a UDP associate, we get another - // endpoint (presumably on the same IP) where we're - // supposed to send UDP packets. - udp::endpoint m_proxy_addr; + boost::shared_ptr m_socks5_connection; - // this is where UDP packets that are to be forwarded - // are sent. The result from UDP ASSOCIATE is stored - // in here. - udp::endpoint m_udp_proxy_addr; - - // while we're connecting to the proxy - // we have to queue the packets, we'll flush - // them once we're connected - std::deque m_queue; - - // counts the number of outstanding async - // operations hanging on this socket - int m_outstanding_ops; - -#if TORRENT_USE_IPV6 - bool m_v6_write_subscribed:1; -#endif - bool m_v4_write_subscribed:1; + // TODO: 3 add a unit test for force-proxy + bool m_force_proxy:1; + bool m_abort:1; #if TORRENT_USE_ASSERTS bool m_started; int m_magic; - int m_outstanding_when_aborted; - int m_outstanding_connect; - int m_outstanding_timeout; - int m_outstanding_resolve; - int m_outstanding_socks; #endif }; } diff --git a/include/libtorrent/udp_tracker_connection.hpp b/include/libtorrent/udp_tracker_connection.hpp index fe41b886e..02f621a69 100644 --- a/include/libtorrent/udp_tracker_connection.hpp +++ b/include/libtorrent/udp_tracker_connection.hpp @@ -94,10 +94,8 @@ namespace libtorrent void timeout(error_code const& error); void start_announce(); - bool on_receive(error_code const& e, udp::endpoint const& ep - , char const* buf, int size); - bool on_receive_hostname(error_code const& e, char const* hostname - , char const* buf, int size); + bool on_receive(udp::endpoint const& ep, char const* buf, int size); + bool on_receive_hostname(char const* hostname, char const* buf, int size); bool on_connect_response(char const* buf, int size); bool on_announce_response(char const* buf, int size); bool on_scrape_response(char const* buf, int size); diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index ea7298072..ce387db4f 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -39,36 +39,51 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/session_status.hpp" #include "libtorrent/enum_net.hpp" #include "libtorrent/aux_/session_settings.hpp" +#include "libtorrent/aux_/array_view.hpp" + +#include "libtorrent/aux_/disable_warnings_push.hpp" + +#include + +#include "libtorrent/aux_/disable_warnings_pop.hpp" namespace libtorrent { - class udp_socket; class utp_stream; struct utp_socket_impl; struct counters; - typedef boost::function const&)> incoming_utp_callback_t; - - struct utp_socket_manager TORRENT_FINAL : udp_socket_observer + struct utp_socket_manager TORRENT_FINAL { - utp_socket_manager(aux::session_settings const& sett, udp_socket& s - , counters& cnt, void* ssl_context, incoming_utp_callback_t cb); + typedef boost::function + , error_code&, int)> send_fun_t; + + typedef boost::function const&)> + incoming_utp_callback_t; + + utp_socket_manager(send_fun_t const& send_fun + , incoming_utp_callback_t const& cb + , io_service& ios + , aux::session_settings const& sett + , counters& cnt, void* ssl_context + ); ~utp_socket_manager(); // return false if this is not a uTP packet - virtual bool incoming_packet(error_code const& ec, udp::endpoint const& ep - , char const* p, int size) TORRENT_OVERRIDE; - virtual bool incoming_packet(error_code const&, char const*, char const*, int) TORRENT_OVERRIDE - { return false; } - virtual void writable() TORRENT_OVERRIDE; + bool incoming_packet(udp::endpoint const& ep, char const* p, int size); - virtual void socket_drained() TORRENT_OVERRIDE; + // if the UDP socket failed with an EAGAIN or EWOULDBLOCK, this will be + // called once the socket is writeable again + void writable(); + + // when the upper layer has drained the underlying UDP socket, this is + // called, and uTP sockets will send their ACKs. This ensures ACKs at + // least coalese packets returned during the same wakeup + void socket_drained(); void tick(time_point now); - tcp::endpoint local_endpoint(address const& remote, error_code& ec) const; - int local_port(error_code& ec) const; - // flags for send_packet enum { dont_fragment = 1 }; void send_packet(udp::endpoint const& ep, char const* p, int len @@ -89,7 +104,6 @@ namespace libtorrent int loss_multiplier() const { return m_sett.get_int(settings_pack::utp_loss_multiplier); } void mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu); - void set_sock_buf(int size); int num_sockets() const { return m_utp_sockets.size(); } void defer_ack(utp_socket_impl* s); @@ -114,7 +128,7 @@ namespace libtorrent // explicitly disallow assignment, to silence msvc warning utp_socket_manager& operator=(utp_socket_manager const&); - udp_socket& m_sock; + send_fun_t m_send_fun; incoming_utp_callback_t m_cb; // replace with a hash-map @@ -165,6 +179,8 @@ namespace libtorrent // stats counters counters& m_counters; + io_service& m_ios; + boost::array m_restrict_mtu; int m_mtu_idx; diff --git a/simulation/test_dht_rate_limit.cpp b/simulation/test_dht_rate_limit.cpp index fdf1be94c..458a864ee 100644 --- a/simulation/test_dht_rate_limit.cpp +++ b/simulation/test_dht_rate_limit.cpp @@ -30,8 +30,6 @@ POSSIBILITY OF SUCH DAMAGE. */ -#if !defined TORRENT_DISABLE_DHT - #include "test.hpp" #include "simulator/simulator.hpp" @@ -41,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/performance_counters.hpp" #include "libtorrent/entry.hpp" #include "libtorrent/session_settings.hpp" +#include "libtorrent/aux_/array_view.hpp" #include "libtorrent/kademlia/dht_observer.hpp" #include @@ -50,6 +49,8 @@ using namespace libtorrent; namespace lt = libtorrent; using namespace sim; +#if !defined TORRENT_DISABLE_DHT + struct obs : dht::dht_observer { virtual void set_external_address(address const& addr @@ -100,8 +101,22 @@ TORRENT_TEST(dht_rate_limit) counters cnt; entry state; boost::shared_ptr dht = boost::make_shared( - &o, sock, dhtsett, cnt, dht::dht_default_storage_constructor, state); - sock.subscribe(dht.get()); + &o, boost::ref(dht_ios), boost::bind(&udp_socket::send, &sock, _1, _2, _3, _4) + , dhtsett, cnt, dht::dht_default_storage_constructor, state); + + bool stop = false; + std::function on_read + = [&](error_code const& ec, size_t bytes) + { + if (ec) return; + udp_socket::packet p; + error_code err; + int const num = sock.read(lt::aux::array_view(&p, 1), err); + if (num) dht->incoming_packet(p.from, p.data.data(), p.data.size()); + if (stop || err) return; + sock.async_read(on_read); + }; + sock.async_read(on_read); // sender int num_packets_sent = 0; @@ -120,7 +135,7 @@ TORRENT_TEST(dht_rate_limit) timer.async_wait([&](error_code const& ec) { dht->stop(); - sock.unsubscribe(dht.get()); + stop = true; sender_sock.close(); sock.close(); }); diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 529d0d516..8b3ce2f08 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -90,21 +90,22 @@ namespace libtorrent { namespace dht // class that puts the networking and the kademlia node in a single // unit and connecting them together. dht_tracker::dht_tracker(dht_observer* observer - , udp_socket& sock + , io_service& ios + , send_fun_t const& send_fun , dht_settings const& settings , counters& cnt , dht_storage_constructor_type storage_constructor , entry const& state) : m_counters(cnt) , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) - , m_sock(sock) + , m_send_fun(send_fun) , m_log(observer) - , m_key_refresh_timer(sock.get_io_service()) - , m_connection_timer(sock.get_io_service()) - , m_refresh_timer(sock.get_io_service()) + , m_key_refresh_timer(ios) + , m_connection_timer(ios) + , m_refresh_timer(ios) , m_settings(settings) , m_abort(false) - , m_host_resolver(sock.get_io_service()) + , m_host_resolver(ios) , m_send_quota(settings.upload_rate_limit) , m_last_tick(aux::time_now()) { @@ -277,29 +278,26 @@ namespace libtorrent { namespace dht m_dht.direct_request(ep, e, f); } - // translate bittorrent kademlia message into the generice kademlia message - // used by the library - bool dht_tracker::incoming_packet(error_code const& ec - , udp::endpoint const& ep, char const* buf, int size) + void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep) { - if (ec) - { - if (ec == boost::asio::error::connection_refused - || ec == boost::asio::error::connection_reset - || ec == boost::asio::error::connection_aborted + if (ec == boost::asio::error::connection_refused + || ec == boost::asio::error::connection_reset + || ec == boost::asio::error::connection_aborted #ifdef WIN32 - || ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) - || ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) - || ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) - || ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) + || ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) + || ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) + || ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) + || ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) #endif - ) - { - m_dht.unreachable(ep); - } - return false; + ) + { + m_dht.unreachable(ep); } + } + bool dht_tracker::incoming_packet( udp::endpoint const& ep + , char const* buf, int size) + { if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false; // remove this line/check once the DHT supports IPv6 if (!ep.address().is_v4()) return false; @@ -316,7 +314,7 @@ namespace libtorrent { namespace dht // these are class A networks not available to the public // if we receive messages from here, that seems suspicious - boost::uint8_t class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 + static boost::uint8_t const class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 , 26, 28, 29, 30, 33, 34, 48, 51, 56 }; int num = sizeof(class_a)/sizeof(class_a[0]); @@ -451,7 +449,7 @@ namespace libtorrent { namespace dht m_send_quota -= m_send_buf.size(); error_code ec; - m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, 0); + m_send_fun(addr, aux::array_view(&m_send_buf[0], m_send_buf.size()), ec, 0); if (ec) { m_counters.inc_stats_counter(counters::dht_messages_out_dropped); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 8b8bb4005..3b14b77f6 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -374,7 +374,11 @@ namespace aux { , m_global_class(0) , m_tcp_peer_class(0) , m_local_peer_class(0) - , m_tracker_manager(m_udp_socket, m_stats_counters, m_host_resolver + , m_tracker_manager( + boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::bind(&session_impl::send_udp_packet_hostname, this, _1, _2, _3, _4, _5) + , m_stats_counters + , m_host_resolver , m_settings #if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS , *this @@ -413,15 +417,18 @@ namespace aux { , m_dht_interval_update_torrents(0) , m_outstanding_router_lookups(0) #endif - , m_external_udp_port(0) - , m_udp_socket(m_io_service) - , m_utp_socket_manager(m_settings, m_udp_socket, m_stats_counters, NULL - , boost::bind(&session_impl::incoming_connection, this, _1)) + , m_utp_socket_manager( + boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::bind(&session_impl::incoming_connection, this, _1) + , m_io_service + , m_settings, m_stats_counters, NULL) #ifdef TORRENT_USE_OPENSSL - , m_ssl_udp_socket(m_io_service) - , m_ssl_utp_socket_manager(m_settings, m_ssl_udp_socket, m_stats_counters - , &m_ssl_ctx - , boost::bind(&session_impl::on_incoming_utp_ssl, this, _1)) + , m_ssl_utp_socket_manager( + boost::bind(&session_impl::send_udp_packet, this, true, _1, _2, _3, _4) + , boost::bind(&session_impl::on_incoming_utp_ssl, this, _1) + , m_io_service + , m_settings, m_stats_counters + , &m_ssl_ctx) #endif , m_boost_connections(0) , m_timer(m_io_service) @@ -444,18 +451,6 @@ namespace aux { #if TORRENT_USE_ASSERTS m_posting_torrent_updates = false; #endif - - m_udp_socket.subscribe(&m_utp_socket_manager); - m_udp_socket.subscribe(this); - m_udp_socket.subscribe(&m_tracker_manager); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.subscribe(&m_ssl_utp_socket_manager); - m_ssl_udp_socket.subscribe(this); -#endif - - error_code ec; - TORRENT_ASSERT_VAL(!ec, ec); } // This function is called by the creating thread, not in the message loop's @@ -489,12 +484,6 @@ namespace aux { m_next_dht_torrent = m_torrents.begin(); #endif m_next_lsd_torrent = m_torrents.begin(); - m_udp_mapping[0] = -1; - m_udp_mapping[1] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[0] = -1; - m_ssl_udp_mapping[1] = -1; -#endif m_global_class = m_classes.new_peer_class("global"); m_tcp_peer_class = m_classes.new_peer_class("tcp"); @@ -1046,10 +1035,19 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - i->sock->close(ec); - TORRENT_ASSERT(!ec); + if (i->sock) + { + i->sock->close(ec); + TORRENT_ASSERT(!ec); + } + + // TODO: 3 closing the udp sockets here means that + // the uTP connections cannot be closed gracefully + if (i->udp_sock) + { + i->udp_sock->close(); + } } - m_listen_sockets.clear(); if (m_socks_listen_socket && m_socks_listen_socket->is_open()) { m_socks_listen_socket->close(ec); @@ -1113,12 +1111,6 @@ namespace aux { m_download_rate.close(); m_upload_rate.close(); - m_udp_socket.close(); - m_external_udp_port = 0; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.close(); -#endif - // it's OK to detach the threads here. The disk_io_thread // has an internal counter and won't release the network // thread until they're all dead (via m_work). @@ -1192,7 +1184,7 @@ namespace aux { template void set_socket_buffer_size(Socket& s, session_settings const& sett, error_code& ec) { - int snd_size = sett.get_int(settings_pack::send_socket_buffer_size); + int const snd_size = sett.get_int(settings_pack::send_socket_buffer_size); if (snd_size) { typename Socket::send_buffer_size prev_option; @@ -1209,7 +1201,7 @@ namespace aux { } } } - int recv_size = sett.get_int(settings_pack::recv_socket_buffer_size); + int const recv_size = sett.get_int(settings_pack::recv_socket_buffer_size); if (recv_size) { typename Socket::receive_buffer_size prev_option; @@ -1292,6 +1284,8 @@ namespace aux { } #endif +//TODO: should there be an option to announce once per listen interface? + m_tracker_manager.queue_request(get_io_service(), req, c); } @@ -1648,24 +1642,29 @@ namespace aux { } #endif - // TODO: 2 remove this function + // TODO: 3 try to remove these functions. They are misleading and not very + // useful. Anything using these should probably be fixed to do something more + // multi-homed friendly tcp::endpoint session_impl::get_ipv6_interface() const { +#if TORRENT_USE_IPV6 for (std::list::const_iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - if (i->local_endpoint.address().is_v6()) return i->local_endpoint; + if (!i->local_endpoint.address().is_v6()) continue; + return tcp::endpoint(i->local_endpoint.address(), i->tcp_external_port); } +#endif return tcp::endpoint(); } - // TODO: 2 remove this function tcp::endpoint session_impl::get_ipv4_interface() const { for (std::list::const_iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - if (i->local_endpoint.address().is_v4()) return i->local_endpoint; + if (!i->local_endpoint.address().is_v4()) continue; + return tcp::endpoint(i->local_endpoint.address(), i->tcp_external_port); } return tcp::endpoint(); } @@ -1689,83 +1688,201 @@ namespace aux { = (flags & open_ssl_socket) ? listen_failed_alert::tcp_ssl : listen_failed_alert::tcp; - ret.sock.reset(new tcp::acceptor(m_io_service)); - ret.sock->open(bind_ep.protocol(), ec); - last_op = listen_failed_alert::open; - if (ec) + + // if we're in force-proxy mode, don't open TCP listen sockets. We cannot + // accept connections on our local machine in this case. + // TODO: 3 the logic in this if-block should be factored out into a + // separate function. At least most of it + if (!m_settings.get_bool(settings_pack::force_proxy)) { + ret.sock = boost::make_shared(boost::ref(m_io_service)); + ret.sock->open(bind_ep.protocol(), ec); + last_op = listen_failed_alert::open; + if (ec) + { #ifndef TORRENT_DISABLE_LOGGING - session_log("failed to open socket: %s" - , ec.message().c_str()); + session_log("failed to open socket: %s" + , ec.message().c_str()); #endif - if (m_alerts.should_post()) - m_alerts.emplace_alert(device, bind_ep, last_op - , ec, sock_type); - return ret; - } + if (m_alerts.should_post()) + m_alerts.emplace_alert(device, bind_ep, last_op + , ec, sock_type); + return ret; + } #ifdef TORRENT_WINDOWS - { - // this is best-effort. ignore errors - error_code err; - ret.sock->set_option(exclusive_address_use(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) { - session_log("failed enable exclusive address use on listen socket: %s" - , err.message().c_str()); - } + // this is best-effort. ignore errors + error_code err; + ret.sock->set_option(exclusive_address_use(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable exclusive address use on listen socket: %s" + , err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING - } + } #endif // TORRENT_WINDOWS - { - // this is best-effort. ignore errors - error_code err; - ret.sock->set_option(tcp::acceptor::reuse_address(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) { - session_log("failed enable reuse-address on listen socket: %s" - , err.message().c_str()); - } + // this is best-effort. ignore errors + error_code err; + ret.sock->set_option(tcp::acceptor::reuse_address(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable reuse-address on listen socket: %s" + , err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING - } + } #if TORRENT_USE_IPV6 - if (bind_ep.address().is_v6()) - { - error_code err; // ignore errors here - ret.sock->set_option(boost::asio::ip::v6_only(true), err); -#ifndef TORRENT_DISABLE_LOGGING - if (err) + if (bind_ep.address().is_v6()) { - session_log("failed enable v6 only on listen socket: %s" - , err.message().c_str()); - } + error_code err; // ignore errors here + ret.sock->set_option(boost::asio::ip::v6_only(true), err); +#ifndef TORRENT_DISABLE_LOGGING + if (err) + { + session_log("failed enable v6 only on listen socket: %s" + , err.message().c_str()); + } #endif // LOGGING #ifdef TORRENT_WINDOWS - // enable Teredo on windows - ret.sock->set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); + // enable Teredo on windows + ret.sock->set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); #ifndef TORRENT_DISABLE_LOGGING - if (err) - { - session_log("failed enable IPv6 unrestricted protection level on " - "listen socket: %s", err.message().c_str()); - } + if (err) + { + session_log("failed enable IPv6 unrestricted protection level on " + "listen socket: %s", err.message().c_str()); + } #endif // TORRENT_DISABLE_LOGGING #endif // TORRENT_WINDOWS - } + } #endif // TORRENT_USE_IPV6 + if (!device.empty()) + { + // we have an actual device we're interested in listening on, if we + // have SO_BINDTODEVICE functionality, use it now. +#if TORRENT_HAS_BINDTODEVICE + ret.sock->set_option(bind_to_device(device.c_str()), ec); + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("bind to device failed (device: %s): %s" + , device.c_str(), ec.message().c_str()); +#endif // TORRENT_DISABLE_LOGGING + + last_op = listen_failed_alert::bind_to_device; + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } +#endif + } + + ret.sock->bind(bind_ep, ec); + last_op = listen_failed_alert::bind; + + while (ec == error_code(error::address_in_use) && retries > 0) + { + TORRENT_ASSERT_VAL(ec, ec); +#ifndef TORRENT_DISABLE_LOGGING + error_code ignore; + session_log("failed to bind listen socket to: %s on device: %s :" + " [%s] (%d) %s (retries: %d)" + , print_endpoint(bind_ep).c_str() + , device.c_str() + , ec.category().name(), ec.value(), ec.message().c_str() + , retries); +#endif + ec.clear(); + --retries; + bind_ep.port(bind_ep.port() + 1); + ret.sock->bind(bind_ep, ec); + } + + if (ec == error_code(error::address_in_use) + && !(flags & listen_no_system_port)) + { + // instead of giving up, try let the OS pick a port + bind_ep.port(0); + ec.clear(); + ret.sock->bind(bind_ep, ec); + last_op = listen_failed_alert::bind; + } + + if (ec) + { + // not even that worked, give up + +#ifndef TORRENT_DISABLE_LOGGING + error_code ignore; + session_log("failed to bind listen socket to: %s on device: %s :" + " [%s] (%d) %s (giving up)" + , print_endpoint(bind_ep).c_str() + , device.c_str() + , ec.category().name(), ec.value(), ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + ret.sock.reset(); + return ret; + } + ret.local_endpoint = ret.sock->local_endpoint(ec); + last_op = listen_failed_alert::get_socket_name; + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("get_sockname failed on listen socket: %s" + , ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } + ret.tcp_external_port = ret.local_endpoint.port(); + TORRENT_ASSERT(ret.tcp_external_port == bind_ep.port() + || bind_ep.port() == 0); + + ret.sock->listen(m_settings.get_int(settings_pack::listen_queue_size), ec); + last_op = listen_failed_alert::listen; + + if (ec) + { +#ifndef TORRENT_DISABLE_LOGGING + session_log("cannot listen on interface \"%s\": %s" + , device.c_str(), ec.message().c_str()); +#endif + if (m_alerts.should_post()) + { + m_alerts.emplace_alert(device, bind_ep + , last_op, ec, sock_type); + } + return ret; + } + } // force-proxy mode + + ret.udp_sock = boost::make_shared(boost::ref(m_io_service)); +#if TORRENT_HAS_BINDTODEVICE if (!device.empty()) { - // we have an actual device we're interested in listening on, if we - // have SO_BINDTODEVICE functionality, use it now. -#if TORRENT_HAS_BINDTODEVICE - ret.sock->set_option(bind_to_device(device.c_str()), ec); + ret.udp_sock->set_option(bind_to_device(device.c_str()), ec); if (ec) { #ifndef TORRENT_DISABLE_LOGGING @@ -1781,98 +1898,51 @@ namespace aux { } return ret; } -#endif } +#endif + ret.udp_sock->bind(udp::endpoint(bind_ep.address(), bind_ep.port()) + , ec); - ret.sock->bind(bind_ep, ec); last_op = listen_failed_alert::bind; - - while (ec == error_code(error::address_in_use) && retries > 0) - { - TORRENT_ASSERT_VAL(ec, ec); -#ifndef TORRENT_DISABLE_LOGGING - error_code ignore; - session_log("failed to bind listen socket to: %s on device: %s :" - " [%s] (%d) %s (retries: %d)" - , print_endpoint(bind_ep).c_str() - , device.c_str() - , ec.category().name(), ec.value(), ec.message().c_str() - , retries); -#endif - ec.clear(); - --retries; - bind_ep.port(bind_ep.port() + 1); - ret.sock->bind(bind_ep, ec); - } - - if (ec == error_code(error::address_in_use) - && !(flags & listen_no_system_port)) - { - // instead of giving up, try let the OS pick a port - bind_ep.port(0); - ec.clear(); - ret.sock->bind(bind_ep, ec); - last_op = listen_failed_alert::bind; - } - - if (ec) - { - // not even that worked, give up - -#ifndef TORRENT_DISABLE_LOGGING - error_code ignore; - session_log("failed to bind listen socket to: %s on device: %s :" - " [%s] (%d) %s (giving up)" - , print_endpoint(bind_ep).c_str() - , device.c_str() - , ec.category().name(), ec.value(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } - ret.sock.reset(); - return ret; - } - ret.local_endpoint = ret.sock->local_endpoint(ec); - last_op = listen_failed_alert::get_socket_name; if (ec) { #ifndef TORRENT_DISABLE_LOGGING - session_log("get_sockname failed on listen socket: %s" - , ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } - return ret; - } - ret.tcp_external_port = ret.local_endpoint.port(); - - ret.sock->listen(m_settings.get_int(settings_pack::listen_queue_size), ec); - last_op = listen_failed_alert::listen; - - if (ec) - { -#ifndef TORRENT_DISABLE_LOGGING - session_log("cannot listen on interface \"%s\": %s" + session_log("failed to open UDP socket: %s: %s" , device.c_str(), ec.message().c_str()); #endif + + listen_failed_alert::socket_type_t const udp_sock_type + = (flags & open_ssl_socket) + ? listen_failed_alert::utp_ssl + : listen_failed_alert::udp; + if (m_alerts.should_post()) - { - m_alerts.emplace_alert(device, bind_ep - , last_op, ec, sock_type); - } + m_alerts.emplace_alert(device + , bind_ep, last_op, ec, udp_sock_type); + return ret; } + ret.udp_external_port = ret.udp_sock->local_port(); + + error_code err; + set_socket_buffer_size(*ret.udp_sock, m_settings, err); + if (err) + { + if (m_alerts.should_post()) + m_alerts.emplace_alert(ret.udp_sock->local_endpoint(ec), err); + } + + ret.udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + + // TODO: 2 use a handler allocator here + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); + ret.udp_sock->async_read(boost::bind(&session_impl::on_udp_packet + , this, boost::weak_ptr(ret.udp_sock), ret.ssl, _1)); #ifndef TORRENT_DISABLE_LOGGING - session_log(" listening on: %s TCP port: %d" - , print_endpoint(bind_ep).c_str() - , ret.tcp_external_port); + session_log(" listening on: %s TCP port: %d UDP port: %d" + , bind_ep.address().to_string().c_str() + , ret.tcp_external_port, ret.udp_external_port); #endif return ret; } @@ -1898,7 +1968,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - i->sock->close(ec); + if (i->sock) i->sock->close(ec); + if (i->udp_sock) i->udp_sock->close(); } m_listen_sockets.clear(); @@ -1920,10 +1991,10 @@ namespace aux { // First, check to see if it's an IP address error_code err; - address adr = address::from_string(device.c_str(), err); + address const adr = address::from_string(device.c_str(), err); if (!err) { - listen_socket_t s = setup_listener("", tcp::endpoint(adr, port) + listen_socket_t const s = setup_listener("", tcp::endpoint(adr, port) , flags | (ssl ? open_ssl_socket : 0), ec); if (!ec && s.sock) @@ -1936,6 +2007,7 @@ namespace aux { // this is the case where device names a network device. We need to // enumerate all IPs associated with this device + // TODO: 3 only run this once, not every turn through the loop std::vector ifs = enum_net_interfaces(m_io_service, ec); if (ec) { @@ -1959,7 +2031,7 @@ namespace aux { // connecting to) if (device != ifs[k].name) continue; - listen_socket_t s = setup_listener(device + listen_socket_t const s = setup_listener(device , tcp::endpoint(ifs[k].interface_address, port) , flags | (ssl ? open_ssl_socket : 0), ec); @@ -1979,167 +2051,46 @@ namespace aux { return; } - // TODO: 3 this loop should be entirely merged with the one above and the - // udp sockets should be opened in parallel with the TCP ones, being held - // by listen_socket_t. - // until the UDP sockets fully honor the listen_interfaces setting, just - // create the two sockets based on the first matching (ssl vs. non-ssl) - // TCP socket -#ifdef TORRENT_USE_OPENSSL - bool created_ssl_udp_socket = false; -#endif - bool created_udp_socket = false; - for (std::list::const_iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) + // now, send out listen_succeeded_alert for the listen sockets we are + // listening on + if (m_alerts.should_post()) { - listen_socket_t const& s = *i; - -#ifdef TORRENT_USE_OPENSSL - if (!created_ssl_udp_socket && s.ssl) + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) { - int retries = m_settings.get_int(settings_pack::max_retry_port_bind); - udp::endpoint bind_ep(s.local_endpoint.address(), s.local_endpoint.port()); - do + error_code err; + if (i->sock) { - ec.clear(); - m_ssl_udp_socket.bind(bind_ep, ec); - if (ec) + tcp::endpoint const tcp_ep = i->sock->local_endpoint(err); + if (!err) { -#ifndef TORRENT_DISABLE_LOGGING - session_log("SSL: cannot bind to UDP interface \"%s\": %s" - , print_endpoint(bind_ep).c_str(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - error_code err; - m_alerts.emplace_alert(bind_ep.address().to_string(err) - , tcp::endpoint(bind_ep.address(), bind_ep.port()) - , listen_failed_alert::bind, ec, listen_failed_alert::utp_ssl); - } - --retries; - bind_ep.port(bind_ep.port() + 1); - } - else - { - created_ssl_udp_socket = true; - maybe_update_udp_mapping(0, true, bind_ep.port(), bind_ep.port()); - maybe_update_udp_mapping(1, true, bind_ep.port(), bind_ep.port()); - } - } while (ec == error_code(error::address_in_use) && retries > 0); - } -#endif // TORRENT_USE_OPENSSL + listen_succeeded_alert::socket_type_t const socket_type + = i->ssl + ? listen_succeeded_alert::tcp_ssl + : listen_succeeded_alert::tcp; - if (!created_udp_socket && !s.ssl) - { - int retries = m_settings.get_int(settings_pack::max_retry_port_bind); - udp::endpoint bind_ep(s.local_endpoint.address(), s.local_endpoint.port()); - do + m_alerts.emplace_alert( + tcp_ep , socket_type); + } + } + + if (i->udp_sock) { - ec.clear(); - m_udp_socket.bind(bind_ep, ec); - if (ec) + udp::endpoint const udp_ep = i->udp_sock->local_endpoint(err); + if (!err && i->udp_sock->is_open()) { -#ifndef TORRENT_DISABLE_LOGGING - session_log("cannot bind to UDP interface \"%s\": %s" - , print_endpoint(bind_ep).c_str(), ec.message().c_str()); -#endif - if (m_alerts.should_post()) - { - error_code err; - m_alerts.emplace_alert(bind_ep.address().to_string(err) - , tcp::endpoint(bind_ep.address(), bind_ep.port()) - , listen_failed_alert::bind - , ec, listen_failed_alert::udp); - } - --retries; - bind_ep.port(bind_ep.port() + 1); + listen_succeeded_alert::socket_type_t const socket_type + = i->ssl + ? listen_succeeded_alert::utp_ssl + : listen_succeeded_alert::udp; + + m_alerts.emplace_alert( + tcp::endpoint(udp_ep.address(), udp_ep.port()), socket_type); } - else - { - created_udp_socket = true; - m_external_udp_port = m_udp_socket.local_port(); - maybe_update_udp_mapping(0, false, bind_ep.port(), bind_ep.port()); - maybe_update_udp_mapping(1, false, bind_ep.port(), bind_ep.port()); - } - } while (ec == error_code(error::address_in_use) && retries > 0); + } } } - // if we did not end up opening a udp socket, make sure we close any - // previous one -#ifdef TORRENT_USE_OPENSSL - if (!created_ssl_udp_socket) - { - m_ssl_udp_socket.close(); - - // if there are mappings for the SSL socket, delete them now - if (m_ssl_udp_mapping[0] != -1 && m_natpmp) - { - m_natpmp->delete_mapping(m_ssl_udp_mapping[0]); - m_ssl_udp_mapping[0] = -1; - } - if (m_ssl_udp_mapping[1] != -1 && m_upnp) - { - m_upnp->delete_mapping(m_ssl_udp_mapping[1]); - m_ssl_udp_mapping[1] = -1; - } - } -#endif - if (!created_udp_socket) - { - m_udp_socket.close(); - - // if there are mappings for the socket, delete them now - if (m_udp_mapping[0] != -1 && m_natpmp) - { - m_natpmp->delete_mapping(m_udp_mapping[0]); - m_udp_mapping[0] = -1; - } - if (m_udp_mapping[1] != -1 && m_upnp) - { - m_upnp->delete_mapping(m_udp_mapping[1]); - m_udp_mapping[1] = -1; - } - } - - // we made it! now post all the listen_succeeded_alerts - - for (std::list::iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) - { - listen_succeeded_alert::socket_type_t const socket_type = i->ssl - ? listen_succeeded_alert::tcp_ssl - : listen_succeeded_alert::tcp; - - if (!m_alerts.should_post()) continue; - - error_code error; - tcp::endpoint bind_ep = i->sock->local_endpoint(error); - if (error) continue; - - m_alerts.emplace_alert(bind_ep, socket_type); - } - -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code err; - if (m_alerts.should_post()) - m_alerts.emplace_alert( - m_ssl_udp_socket.local_endpoint(err) - , listen_succeeded_alert::utp_ssl); - } -#endif - - if (m_udp_socket.is_open()) - { - error_code err; - if (m_alerts.should_post()) - m_alerts.emplace_alert( - m_udp_socket.local_endpoint(err) - , listen_succeeded_alert::udp); - } - if (m_settings.get_int(settings_pack::peer_tos) != 0) { update_peer_tos(); @@ -2147,19 +2098,12 @@ namespace aux { ec.clear(); - set_socket_buffer_size(m_udp_socket, m_settings, ec); - if (ec) - { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } - // initiate accepting on the listen sockets for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - async_accept(i->sock, i->ssl); - remap_ports(3, *i); + if (i->sock) async_accept(i->sock, i->ssl); + remap_ports(remap_natpmp_and_upnp, *i); } open_new_incoming_socks_connection(); @@ -2168,20 +2112,36 @@ namespace aux { #endif } - // TODO: 3 add an enum for the mask parameter here - void session_impl::remap_ports(boost::uint32_t mask, listen_socket_t& s) - { - if ((mask & 1) && m_natpmp) + namespace { + template + void map_port(MapProtocol& m, ProtoType protocol, EndpointType const& ep + , int& map_handle) { - if (s.tcp_port_mapping[0] != -1) m_natpmp->delete_mapping(s.tcp_port_mapping[0]); - s.tcp_port_mapping[0] = m_natpmp->add_mapping(natpmp::tcp - , s.local_endpoint.port(), s.local_endpoint.port()); + if (map_handle != -1) m.delete_mapping(map_handle); + map_handle = -1; + + // only update this mapping if we actually have a socket listening + if (ep.address() != address()) + map_handle = m.add_mapping(protocol, ep.port(), ep.port()); } - if ((mask & 2) && m_upnp) + } + + void session_impl::remap_ports(remap_port_mask_t const mask + , listen_socket_t& s) + { + error_code ec; + tcp::endpoint const tcp_ep = s.sock ? s.sock->local_endpoint(ec) : tcp::endpoint(); + udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->local_endpoint(ec) : udp::endpoint(); + + if ((mask & remap_natpmp) && m_natpmp) { - if (s.tcp_port_mapping[1] != -1) m_upnp->delete_mapping(s.tcp_port_mapping[1]); - s.tcp_port_mapping[1] = m_upnp->add_mapping(upnp::tcp - , s.local_endpoint.port(), s.local_endpoint.port()); + map_port(*m_natpmp, natpmp::tcp, tcp_ep, s.tcp_port_mapping[0]); + map_port(*m_natpmp, natpmp::udp, udp_ep, s.udp_port_mapping[0]); + } + if ((mask & remap_upnp) && m_upnp) + { + map_port(*m_upnp, upnp::tcp, tcp_ep, s.tcp_port_mapping[1]); + map_port(*m_upnp, upnp::udp, udp_ep, s.udp_port_mapping[1]); } } @@ -2306,23 +2266,238 @@ namespace aux { } #endif - bool session_impl::incoming_packet(error_code const& ec - , udp::endpoint const& ep, char const*, int) + void session_impl::send_udp_packet_hostname(char const* hostname + , int const port + , array_view p + , error_code& ec + , int const flags) { - m_stats_counters.inc_stats_counter(counters::on_udp_counter); + // for now, just pick the first socket with a matching address family + // TODO: 3 for proper multi-homed support, we may want to do something + // else here. Probably let the caller decide which interface to send over + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (!i->udp_sock) continue; + if (i->ssl) continue; + i->udp_sock->send_hostname(hostname, port, p, ec, flags); + + if ((ec == error::would_block + || ec == error::try_again) + && !i->udp_write_blocked) + { + i->udp_write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + i->udp_sock->async_write(boost::bind(&session_impl::on_udp_writeable + , this, boost::weak_ptr(i->udp_sock), _1)); + } + return; + } + ec = boost::asio::error::operation_not_supported; + } + + void session_impl::send_udp_packet(bool const ssl + , udp::endpoint const& ep + , array_view p + , error_code& ec + , int const flags) + { + // for now, just pick the first socket with a matching address family + // TODO: 3 for proper multi-homed support, we may want to do something + // else here. Probably let the caller decide which interface to send over + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + if (i->ssl != ssl) continue; + if (!i->udp_sock) continue; + if (i->local_endpoint.address().is_v4() != ep.address().is_v4()) + continue; + + i->udp_sock->send(ep, p, ec, flags); + + if ((ec == error::would_block + || ec == error::try_again) + && !i->udp_write_blocked) + { + i->udp_write_blocked = true; + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable"); + i->udp_sock->async_write(boost::bind(&session_impl::on_udp_writeable + , this, boost::weak_ptr(i->udp_sock), _1)); + } + return; + } + ec = boost::asio::error::operation_not_supported; + } + + void session_impl::on_udp_writeable(boost::weak_ptr s, error_code const& ec) + { + COMPLETE_ASYNC("session_impl::on_udp_writeable"); + if (ec) return; + + boost::shared_ptr sock = s.lock(); + if (!sock) return; + + std::list::iterator i = std::find_if( + m_listen_sockets.begin(), m_listen_sockets.end() + , boost::bind(&listen_socket_t::udp_sock, _1) == sock); + + if (i == m_listen_sockets.end()) return; + + i->udp_write_blocked = false; + + // notify the utp socket manager it can start sending on the socket again + struct utp_socket_manager& mgr = +#ifdef TORRENT_USE_OPENSSL + i->ssl ? m_ssl_utp_socket_manager : +#endif + m_utp_socket_manager; + + mgr.writable(); + } + + + void session_impl::on_udp_packet(boost::weak_ptr const& socket + , bool const ssl, error_code const& ec) + { + COMPLETE_ASYNC("session_impl::on_udp_packet"); if (ec) { + boost::shared_ptr s = socket.lock(); + udp::endpoint ep; + error_code best_effort; + if (s) ep = s->local_endpoint(best_effort); + // don't bubble up operation aborted errors to the user if (ec != boost::asio::error::operation_aborted + && ec != boost::asio::error::bad_descriptor && m_alerts.should_post()) + { m_alerts.emplace_alert(ep, ec); + } #ifndef TORRENT_DISABLE_LOGGING - session_log("UDP socket error: (%d) %s", ec.value(), ec.message().c_str()); + session_log("UDP error: %s (%d) %s" + , print_endpoint(ep).c_str(), ec.value(), ec.message().c_str()); #endif + return; } - return false; + + m_stats_counters.inc_stats_counter(counters::on_udp_counter); + + boost::shared_ptr s = socket.lock(); + if (!s) return; + + struct utp_socket_manager& mgr = +#ifdef TORRENT_USE_OPENSSL + ssl ? m_ssl_utp_socket_manager : +#endif + m_utp_socket_manager; + + for (;;) + { + boost::array p; + error_code err; + int const num_packets = s->read(array_view(p), err); + + for (int i = 0; i < num_packets; ++i) + { + udp_socket::packet& packet = p[i]; + + if (packet.error) + { + // TODO: 3 it would be neat if the utp socket manager would + // handle ICMP errors too + +#ifndef TORRENT_DISABLE_DHT + if (m_dht) + m_dht->incoming_error(packet.error, packet.from); +#endif + + m_tracker_manager.incoming_error(packet.error, packet.from); + continue; + } + + char* buf = packet.data.data(); + int const len = packet.data.size(); + + // give the uTP socket manager first dis on the packet. Presumably + // the majority of packets are uTP packets. + if (!mgr.incoming_packet(packet.from, buf, len)) + { + // if it wasn't a uTP packet, try the other users of the UDP + // socket + bool handled = false; +#ifndef TORRENT_DISABLE_DHT + if (m_dht && len > 20 && buf[0] == 'd' && buf[len-1] == 'e') + { + handled = m_dht->incoming_packet(packet.from, buf, len); + } +#endif + + if (!handled) + { + m_tracker_manager.incoming_packet(packet.from, buf, len); + } + } + } + + if (err == error::would_block || err == error::try_again) + { + // there are no more packets on the socket + break; + } + + if (err) + { + error_code best_effort; + udp::endpoint ep = s->local_endpoint(best_effort); + + if (err != boost::asio::error::operation_aborted + && m_alerts.should_post()) + m_alerts.emplace_alert(ep, err); + +#ifndef TORRENT_DISABLE_LOGGING + session_log("UDP error: %s (%d) %s" + , print_endpoint(ep).c_str(), ec.value(), ec.message().c_str()); +#endif + + // any error other than these ones are considered fatal errors, and + // we won't read from the socket again + if (err != boost::asio::error::host_unreachable + && err != boost::asio::error::fault + && err != boost::asio::error::connection_reset + && err != boost::asio::error::connection_refused + && err != boost::asio::error::connection_aborted + && err != boost::asio::error::operation_aborted + && err != boost::asio::error::network_reset + && err != boost::asio::error::network_unreachable +#ifdef WIN32 + // ERROR_MORE_DATA means the same thing as EMSGSIZE + && err != error_code(ERROR_MORE_DATA, system_category()) + && err != error_code(ERROR_HOST_UNREACHABLE, system_category()) + && err != error_code(ERROR_PORT_UNREACHABLE, system_category()) + && err != error_code(ERROR_RETRY, system_category()) + && err != error_code(ERROR_NETWORK_UNREACHABLE, system_category()) + && err != error_code(ERROR_CONNECTION_REFUSED, system_category()) + && err != error_code(ERROR_CONNECTION_ABORTED, system_category()) +#endif + && err != boost::asio::error::message_size) + { + // fatal errors. Don't try to read from this socket again + mgr.socket_drained(); + return; + } + // non-fatal UDP errors get here, we should re-issue the read. + continue; + } + } + + mgr.socket_drained(); + + ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet"); + s->async_read(boost::bind(&session_impl::on_udp_packet + , this, socket, ssl, _1)); } void session_impl::async_accept(boost::shared_ptr const& listener, bool ssl) @@ -2360,7 +2535,8 @@ namespace aux { } void session_impl::on_accept_connection(shared_ptr const& s - , weak_ptr listen_socket, error_code const& e, bool ssl) + , weak_ptr listen_socket, error_code const& e + , bool const ssl) { COMPLETE_ASYNC("session_impl::on_accept_connection"); m_stats_counters.inc_stats_counter(counters::on_accept_counter); @@ -2971,11 +3147,21 @@ namespace aux { if (m_abort) { if (m_utp_socket_manager.num_sockets() == 0 +#ifdef TORRENT_USE_OPENSSL + && m_ssl_utp_socket_manager.num_sockets() == 0 +#endif && m_undead_peers.empty()) + { return; + } #if defined TORRENT_ASIO_DEBUGGING - fprintf(stderr, "uTP sockets left: %d undead-peers left: %d\n" + fprintf(stderr, "uTP sockets: %d ssl-uTP sockets: %d undead-peers left: %d\n" , m_utp_socket_manager.num_sockets() +#ifdef TORRENT_USE_OPENSSL + , m_ssl_utp_socket_manager.num_sockets() +#else + , 0 +#endif , int(m_undead_peers.size())); #endif } @@ -5170,11 +5356,11 @@ namespace aux { // in case we just set a socks proxy, we might have to // open the socks incoming connection if (!m_socks_listen_socket) open_new_incoming_socks_connection(); - m_udp_socket.set_proxy_settings(proxy()); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.set_proxy_settings(proxy()); -#endif + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + i->udp_sock->set_proxy_settings(proxy()); + } } void session_impl::update_upnp() @@ -5281,9 +5467,6 @@ namespace aux { { if (i->ssl) return i->tcp_external_port; } - - if (m_ssl_udp_socket.is_open()) - return m_ssl_udp_socket.local_port(); #endif return 0; } @@ -5338,6 +5521,11 @@ namespace aux { { return ls.tcp_port_mapping[transport] == mapping; } + + bool find_udp_port_mapping(int transport, int mapping, listen_socket_t const& ls) + { + return ls.udp_port_mapping[transport] == mapping; + } } // transport is 0 for NAT-PMP and 1 for UPnP @@ -5354,16 +5542,6 @@ namespace aux { , map_transport, ec); } - if (mapping == m_udp_mapping[map_transport] && port != 0) - { - m_external_udp_port = port; - if (m_alerts.should_post()) - m_alerts.emplace_alert(mapping, port - , map_transport, protocol == natpmp::udp - ? portmap_alert::udp : portmap_alert::tcp); - return; - } - // look through our listen sockets to see if this mapping is for one of // them (it could also be a user mapping) @@ -5371,6 +5549,14 @@ namespace aux { = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end() , boost::bind(find_tcp_port_mapping, map_transport, mapping, _1)); + bool tcp = true; + if (ls == m_listen_sockets.end()) + { + ls = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end() + , boost::bind(find_udp_port_mapping, map_transport, mapping, _1)); + tcp = false; + } + if (ls != m_listen_sockets.end()) { if (ip != address()) @@ -5381,7 +5567,8 @@ namespace aux { } ls->external_address = ip; - ls->tcp_external_port = port; + if (tcp) ls->tcp_external_port = port; + else ls->udp_external_port = port; } if (!ec && m_alerts.should_post()) @@ -5542,8 +5729,11 @@ namespace aux { // postpone starting the DHT if we're still resolving the DHT router if (m_outstanding_router_lookups > 0) return; - m_dht = boost::make_shared(static_cast(this) - , boost::ref(m_udp_socket), boost::cref(m_dht_settings) + m_dht = boost::make_shared( + static_cast(this) + , boost::ref(m_io_service) + , boost::bind(&session_impl::send_udp_packet, this, false, _1, _2, _3, _4) + , boost::cref(m_dht_settings) , boost::ref(m_stats_counters) , m_dht_storage_constructor , startup_state); @@ -5562,14 +5752,11 @@ namespace aux { m_dht_nodes.clear(); m_dht->start(startup_state, boost::bind(&on_bootstrap, boost::ref(m_alerts))); - - m_udp_socket.subscribe(m_dht.get()); } void session_impl::stop_dht() { if (!m_dht) return; - m_udp_socket.unsubscribe(m_dht.get()); m_dht->stop(); m_dht.reset(); } @@ -5784,50 +5971,6 @@ namespace aux { #endif - void session_impl::maybe_update_udp_mapping(int const nat, bool const ssl - , int const local_port, int const external_port) - { - int local, external, protocol; -#ifdef TORRENT_USE_OPENSSL - int* mapping = ssl ? m_ssl_udp_mapping : m_udp_mapping; -#else - TORRENT_UNUSED(ssl); - int* mapping = m_udp_mapping; -#endif - if (nat == 0 && m_natpmp) - { - if (mapping[nat] != -1) - { - if (m_natpmp->get_mapping(mapping[nat], local, external, protocol)) - { - // we already have a mapping. If it's the same, don't do anything - if (local == local_port && external == external_port && protocol == natpmp::udp) - return; - } - m_natpmp->delete_mapping(mapping[nat]); - } - mapping[nat] = m_natpmp->add_mapping(natpmp::udp - , local_port, external_port); - return; - } - else if (nat == 1 && m_upnp) - { - if (mapping[nat] != -1) - { - if (m_upnp->get_mapping(mapping[nat], local, external, protocol)) - { - // we already have a mapping. If it's the same, don't do anything - if (local == local_port && external == external_port && protocol == natpmp::udp) - return; - } - m_upnp->delete_mapping(mapping[nat]); - } - mapping[nat] = m_upnp->add_mapping(upnp::udp - , local_port, external_port); - return; - } - } - #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) void session_impl::add_obfuscated_hash(sha1_hash const& obfuscated , boost::weak_ptr const& t) @@ -5846,15 +5989,6 @@ namespace aux { // this is not allowed to be the network thread! // TORRENT_ASSERT(is_not_thread()); - m_udp_socket.unsubscribe(this); - m_udp_socket.unsubscribe(&m_utp_socket_manager); - m_udp_socket.unsubscribe(&m_tracker_manager); - -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.unsubscribe(this); - m_ssl_udp_socket.unsubscribe(&m_ssl_utp_socket_manager); -#endif - TORRENT_ASSERT(m_torrents.empty()); TORRENT_ASSERT(m_connections.empty()); @@ -5976,33 +6110,47 @@ namespace aux { } #endif + + namespace { + template + void set_tos(Socket& s, int v, error_code& ec) + { +#if TORRENT_USE_IPV6 + if (s.local_endpoint(ec).address().is_v6()) + s.set_option(traffic_class(v), ec); + else if (!ec) +#endif + s.set_option(type_of_service(v), ec); + } + } + // TODO: 2 this should be factored into the udp socket, so we only have the // code once void session_impl::update_peer_tos() { - error_code ec; - -#if TORRENT_USE_IPV6 && defined IPV6_TCLASS - if (m_udp_socket.local_endpoint(ec).address().is_v6()) - m_udp_socket.set_option(traffic_class(m_settings.get_int(settings_pack::peer_tos)), ec); - else -#endif - m_udp_socket.set_option(type_of_service(m_settings.get_int(settings_pack::peer_tos)), ec); - -#ifdef TORRENT_USE_OPENSSL -#if TORRENT_USE_IPV6 && defined IPV6_TCLASS - if (m_ssl_udp_socket.local_endpoint(ec).address().is_v6()) - m_ssl_udp_socket.set_option(traffic_class(m_settings.get_int(settings_pack::peer_tos)), ec); - else -#endif - m_ssl_udp_socket.set_option(type_of_service(m_settings.get_int(settings_pack::peer_tos)), ec); -#endif + int const tos = m_settings.get_int(settings_pack::peer_tos); + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + error_code ec; + set_tos(*i->sock, tos, ec); #ifndef TORRENT_DISABLE_LOGGING - session_log(">>> SET_TOS [ udp_socket tos: %x e: %s ]" - , m_settings.get_int(settings_pack::peer_tos) - , ec.message().c_str()); + error_code err; + session_log(">>> SET_TOS [ tcp (%s %d) tos: %x e: %s ]" + , i->sock->local_endpoint(err).address().to_string().c_str() + , i->sock->local_endpoint(err).port(), tos, ec.message().c_str()); #endif + ec.clear(); + set_tos(*i->udp_sock, tos, ec); + +#ifndef TORRENT_DISABLE_LOGGING + session_log(">>> SET_TOS [ udp (%s %d) tos: %x e: %s ]" + , i->udp_sock->local_endpoint(err).address().to_string().c_str() + , i->udp_sock->local_port() + , tos, ec.message().c_str()); +#endif + } } void session_impl::update_user_agent() @@ -6182,22 +6330,32 @@ namespace aux { void session_impl::update_socket_buffer_size() { - error_code ec; - set_socket_buffer_size(m_udp_socket, m_settings, ec); - if (ec) + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } - -#ifdef TORRENT_USE_OPENSSL - set_socket_buffer_size(m_ssl_udp_socket, m_settings, ec); - if (ec) - { - if (m_alerts.should_post()) - m_alerts.emplace_alert(udp::endpoint(), ec); - } + error_code ec; + set_socket_buffer_size(*i->udp_sock, m_settings, ec); +#ifndef TORRENT_DISABLE_LOGGING + if (ec) + { + error_code err; + session_log("socket buffer size [ udp %s %d]: (%d) %s" + , i->udp_sock->local_endpoint(err).address().to_string(err).c_str() + , i->udp_sock->local_port(), ec.value(), ec.message().c_str()); + } #endif + ec.clear(); + set_socket_buffer_size(*i->sock, m_settings, ec); +#ifndef TORRENT_DISABLE_LOGGING + if (ec) + { + error_code err; + session_log("socket buffer size [ udp %s %d]: (%d) %s" + , i->sock->local_endpoint(err).address().to_string(err).c_str() + , i->sock->local_endpoint(err).port(), ec.value(), ec.message().c_str()); + } +#endif + } } void session_impl::update_dht_announce_interval() @@ -6241,10 +6399,19 @@ namespace aux { void session_impl::update_force_proxy() { - m_udp_socket.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_socket.set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); -#endif + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + i->udp_sock->set_force_proxy(m_settings.get_bool(settings_pack::force_proxy)); + + // close the TCP listen sockets + if (i->sock) + { + error_code ec; + i->sock->close(ec); + i->sock.reset(); + } + } if (!m_settings.get_bool(settings_pack::force_proxy)) return; @@ -6256,12 +6423,6 @@ namespace aux { #ifndef TORRENT_DISABLE_DHT stop_dht(); #endif - // close the listen sockets - error_code ec; - for (std::list::iterator i = m_listen_sockets.begin() - , end(m_listen_sockets.end()); i != end; ++i) - i->sock->close(ec); - m_listen_sockets.clear(); } #ifndef TORRENT_NO_DEPRECATE @@ -6522,33 +6683,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - remap_ports(1, *i); + remap_ports(remap_natpmp, *i); } - - // TODO: 3 once UDP sockets are part of m_listen_sockets, this is not - // necesarry! - if (m_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_udp_mapping[0] != -1) m_natpmp->delete_mapping(m_udp_mapping[0]); - m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp - , ep.port(), ep.port()); - } - } -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_ssl_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_ssl_udp_mapping[0] != -1) m_natpmp->delete_mapping(m_ssl_udp_mapping[0]); - m_ssl_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp - , ep.port(), ep.port()); - } - } -#endif return m_natpmp.get(); } @@ -6573,33 +6709,8 @@ namespace aux { for (std::list::iterator i = m_listen_sockets.begin() , end(m_listen_sockets.end()); i != end; ++i) { - remap_ports(1, *i); + remap_ports(remap_upnp, *i); } - - // TODO: 3 once the UDP sockets are part of m_listen_sockets this won't be - // necessary! - if (m_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_udp_mapping[1] != -1) m_upnp->delete_mapping(m_udp_mapping[1]); - m_udp_mapping[1] = m_upnp->add_mapping(upnp::udp - , ep.port(), ep.port()); - } - } -#ifdef TORRENT_USE_OPENSSL - if (m_ssl_udp_socket.is_open()) - { - error_code ec; - tcp::endpoint ep = m_ssl_udp_socket.local_endpoint(ec); - if (!ec) { - if (m_ssl_udp_mapping[1] != -1) m_upnp->delete_mapping(m_ssl_udp_mapping[1]); - m_ssl_udp_mapping[1] = m_upnp->add_mapping(upnp::udp - , ep.port(), ep.port()); - } - } -#endif return m_upnp.get(); } @@ -6636,12 +6747,9 @@ namespace aux { , end(m_listen_sockets.end()); i != end; ++i) { i->tcp_port_mapping[0] = -1; + i->udp_port_mapping[0] = -1; } - m_udp_mapping[0] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[0] = -1; -#endif m_natpmp.reset(); } @@ -6654,11 +6762,8 @@ namespace aux { , end(m_listen_sockets.end()); i != end; ++i) { i->tcp_port_mapping[1] = -1; + i->udp_port_mapping[1] = -1; } - m_udp_mapping[1] = -1; -#ifdef TORRENT_USE_OPENSSL - m_ssl_udp_mapping[1] = -1; -#endif m_upnp.reset(); } diff --git a/src/torrent.cpp b/src/torrent.cpp index fa7ec17a8..6d73233dd 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -3410,7 +3410,7 @@ namespace libtorrent // the tracker did resolve to a different type of address, so announce // to that as well - // TODO 2: there's a bug when removing a torrent or shutting down the session, + // TODO 3: there's a bug when removing a torrent or shutting down the session, // where the second announce is skipped (in this case, the one to the IPv6 // name). This should be fixed by generalizing the tracker list structure to // separate the IPv6 and IPv4 addresses as conceptually separate trackers, diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 61d4599a1..bc114659e 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -60,6 +60,8 @@ namespace namespace libtorrent { + using namespace libtorrent::aux; + timeout_handler::timeout_handler(io_service& ios) : m_completion_timeout(0) , m_start_time(clock_type::now()) @@ -191,9 +193,8 @@ namespace libtorrent m_man.remove_request(this); } - // TODO: 2 some of these arguments could probably be moved to the - // tracker request itself. like the ip_filter and settings - tracker_manager::tracker_manager(class udp_socket& sock + tracker_manager::tracker_manager(send_fun_t const& send_fun + , send_fun_hostname_t const& send_fun_hostname , counters& stats_counters , resolver_interface& resolver , aux::session_settings const& sett @@ -201,7 +202,8 @@ namespace libtorrent , aux::session_logger& ses #endif ) - : m_udp_socket(sock) + : m_send_fun(send_fun) + , m_send_fun_hostname(send_fun_hostname) , m_host_resolver(resolver) , m_settings(sett) , m_stats_counters(stats_counters) @@ -231,7 +233,7 @@ namespace libtorrent void tracker_manager::remove_request(tracker_connection const* c) { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); http_conns_t::iterator i = std::find_if(m_http_conns.begin() , m_http_conns.end() @@ -266,7 +268,7 @@ namespace libtorrent , tracker_request req , boost::weak_ptr c) { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(req.num_want >= 0); TORRENT_ASSERT(!m_abort || req.event == tracker_request::stopped); if (m_abort && req.event != tracker_request::stopped) return; @@ -309,8 +311,8 @@ namespace libtorrent , "", 0)); } - bool tracker_manager::incoming_packet(error_code const& e - , udp::endpoint const& ep, char const* buf, int size) + bool tracker_manager::incoming_packet(udp::endpoint const& ep + , char const* buf, int size) { // ignore packets smaller than 8 bytes if (size < 8) @@ -343,11 +345,19 @@ namespace libtorrent boost::shared_ptr p = i->second; // on_receive() may remove the tracker connection from the list - return p->on_receive(e, ep, buf, size); + return p->on_receive(ep, buf, size); } - bool tracker_manager::incoming_packet(error_code const& e - , char const* hostname, char const* buf, int size) + void tracker_manager::incoming_error(error_code const& ec + , udp::endpoint const& ep) + { + TORRENT_UNUSED(ec); + TORRENT_UNUSED(ep); + // TODO: 2 implement + } + + bool tracker_manager::incoming_packet(char const* hostname + , char const* buf, int size) { // ignore packets smaller than 8 bytes if (size < 16) return false; @@ -374,13 +384,26 @@ namespace libtorrent boost::shared_ptr p = i->second; // on_receive() may remove the tracker connection from the list - return p->on_receive_hostname(e, hostname, buf, size); + return p->on_receive_hostname(hostname, buf, size); + } + + void tracker_manager::send_hostname(char const* hostname, int const port + , array_view p, error_code& ec, int const flags) + { + m_send_fun_hostname(hostname, port, p, ec, flags); + } + + void tracker_manager::send(udp::endpoint const& ep + , array_view p + , error_code& ec, int const flags) + { + m_send_fun(ep, p, ec, flags); } void tracker_manager::abort_all_requests(bool all) { // removes all connections except 'event=stopped'-requests - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); m_abort = true; http_conns_t close_http_connections; @@ -434,13 +457,13 @@ namespace libtorrent bool tracker_manager::empty() const { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); return m_http_conns.empty() && m_udp_conns.empty(); } int tracker_manager::num_requests() const { - mutex_t::scoped_lock l(m_mutex); + mutex::scoped_lock l(m_mutex); return m_http_conns.size() + m_udp_conns.size(); } } diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 75cbd6b89..373538eef 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -44,6 +44,9 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/debug.hpp" #include + +#include "libtorrent/aux_/disable_warnings_push.hpp" + #include #include #include @@ -51,84 +54,193 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -using namespace libtorrent; +#include "libtorrent/aux_/disable_warnings_pop.hpp" + +#if defined TORRENT_ASIO_DEBUGGING +#include "libtorrent/debug.hpp" +#endif + +namespace libtorrent { + +using namespace libtorrent::aux; + +// this class hold the state of the SOCKS5 connection to maintain the UDP +// ASSOCIATE tunnel. It's instantiated on the heap for two reasons: +// +// 1. since its asynchronous functions may refer to it after the udp_socket has +// been destructed, it needs to be held by a shared_ptr +// 2. since using a sokcs proxy is assumed to be a less common case, it makes +// the common case cheaper by not allocating this space unconditionally +struct socks5 : boost::enable_shared_from_this +{ + socks5(io_service& ios) + : m_socks5_sock(ios) + , m_resolver(ios) + , m_timer(ios) + , m_abort(false) + , m_active(false) + {} + + void start(aux::proxy_settings const& ps); + void close(); + + bool active() const { return m_active; } + udp::endpoint target() const { return m_udp_proxy_addr; } + +private: + + boost::shared_ptr self() { return shared_from_this(); } + + void on_name_lookup(error_code const& e, tcp::resolver::iterator i); + void on_connect_timeout(error_code const& ec); + void on_connected(error_code const& ec); + void handshake1(error_code const& e); + void handshake2(error_code const& e); + void handshake3(error_code const& e); + void handshake4(error_code const& e); + void socks_forward_udp(); + void connect1(error_code const& e); + void connect2(error_code const& e); + void hung_up(error_code const& e); + + tcp::socket m_socks5_sock; + tcp::resolver m_resolver; + deadline_timer m_timer; + char m_tmp_buf[270]; + + aux::proxy_settings m_proxy_settings; + + // this is the endpoint the proxy server lives at. + // when performing a UDP associate, we get another + // endpoint (presumably on the same IP) where we're + // supposed to send UDP packets. + udp::endpoint m_proxy_addr; + + // this is where UDP packets that are to be forwarded + // are sent. The result from UDP ASSOCIATE is stored + // in here. + udp::endpoint m_udp_proxy_addr; + + // set to true when we've been asked to shut down + bool m_abort; + + // set to true once the tunnel is established + bool m_active; +}; + +#ifdef TORRENT_HAS_DONT_FRAGMENT +struct set_dont_frag +{ + set_dont_frag(udp::socket& sock, bool const df) + : m_socket(sock) + , m_df(df) + { + if (!m_df) return; + error_code ignore_errors; + m_socket.set_option(libtorrent::dont_fragment(true), ignore_errors); + TORRENT_ASSERT_VAL(!ignore_errors, ignore_errors.message()); + } + + ~set_dont_frag() + { + if (!m_df) return; + error_code ignore_errors; + m_socket.set_option(libtorrent::dont_fragment(false), ignore_errors); + TORRENT_ASSERT_VAL(!ignore_errors, ignore_errors.message()); + } + +private: + udp::socket& m_socket; + bool const m_df; +}; +#else +struct set_dont_frag +{ set_dont_frag(udp::socket&, int) {} }; +#endif udp_socket::udp_socket(io_service& ios) - : m_observers_locked(false) - , m_ipv4_sock(ios) - , m_timer(ios) - , m_buf_size(0) - , m_new_buf_size(0) - , m_buf(0) -#if TORRENT_USE_IPV6 - , m_ipv6_sock(ios) -#endif + : m_socket(ios) + , m_buf_size(1500) + , m_buf(NULL) , m_bind_port(0) - , m_v4_outstanding(0) - , m_restart_v4(0) -#if TORRENT_USE_IPV6 - , m_v6_outstanding(0) - , m_restart_v6(false) -#endif - , m_socks5_sock(ios) - , m_resolver(ios) - , m_queue_packets(false) - , m_tunnel_packets(false) , m_force_proxy(false) , m_abort(true) - , m_outstanding_ops(0) -#if TORRENT_USE_IPV6 - , m_v6_write_subscribed(false) -#endif - , m_v4_write_subscribed(false) { -#if TORRENT_USE_ASSERTS - m_magic = 0x1337; - m_started = false; - m_outstanding_when_aborted = -1; - m_outstanding_connect = 0; - m_outstanding_timeout = 0; - m_outstanding_resolve = 0; - m_outstanding_socks = 0; -#endif - - m_buf_size = 2048; - m_new_buf_size = m_buf_size; m_buf = static_cast(malloc(m_buf_size)); } udp_socket::~udp_socket() { free(m_buf); -#if TORRENT_USE_IPV6 - TORRENT_ASSERT_VAL(m_v6_outstanding == 0, m_v6_outstanding); -#endif - TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding); - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(m_observers_locked == false); -#if TORRENT_USE_ASSERTS - m_magic = 0; -#endif - TORRENT_ASSERT(m_outstanding_ops == 0); } -#if TORRENT_USE_ASSERTS - #define CHECK_MAGIC check_magic_ cm_(m_magic) - struct check_magic_ - { - check_magic_(int& m_): m(m_) { TORRENT_ASSERT(m == 0x1337); } - ~check_magic_() { TORRENT_ASSERT(m == 0x1337); } - int& m; - }; -#else - #define CHECK_MAGIC do {} TORRENT_WHILE_0 -#endif - -void udp_socket::send_hostname(char const* hostname, int port - , char const* p, int len, error_code& ec, int flags) +int udp_socket::read(array_view pkts, error_code& ec) { - CHECK_MAGIC; + int const num = pkts.size(); + int ret = 0; + packet p; + while (ret < num) + { + int const len = m_socket.receive_from(boost::asio::buffer(m_buf, m_buf_size) + , p.from, 0, ec); + + if (ec == error::would_block + || ec == error::try_again + || ec == error::operation_aborted + || ec == error::bad_descriptor) + { + return ret; + } + + if (ec == error::interrupted) + { + continue; + } + + if (ec) + { + // SOCKS5 cannot wrap ICMP errors. And even if it could, they certainly + // would not arrive as unwrapped (regular) ICMP errors. If we're using + // a proxy we must ignore these + if (m_force_proxy + || (m_socks5_connection + && m_socks5_connection->active())) continue; + + p.error = ec; + p.data = array_view(); + } + else + { + p.data = array_view(m_buf, len); + + // support packets coming from the SOCKS5 proxy + if (m_socks5_connection && m_socks5_connection->active()) + { + // if the source IP doesn't match the proxy's, ignore the packet + if (p.from != m_socks5_connection->target()) continue; + if (!unwrap(p.from, p.data)) continue; + } + // block incoming packets that aren't coming via the proxy + // if force proxy mode is enabled + else if (m_force_proxy) continue; + } + + pkts[ret] = p; + ++ret; + + // we only have a single buffer for now, so we can only return a + // single packet. In the future though, we could attempt to drain + // the socket here, or maybe even use recvmmsg() + break; + } + + return ret; +} + +void udp_socket::send_hostname(char const* hostname, int const port + , array_view p, error_code& ec, int const flags) +{ TORRENT_ASSERT(is_single_thread()); // if the sockets are closed, the udp_socket is closing too @@ -138,39 +250,28 @@ void udp_socket::send_hostname(char const* hostname, int port return; } - if (m_tunnel_packets) + if (m_socks5_connection && m_socks5_connection->active()) { // send udp packets through SOCKS5 server - wrap(hostname, port, p, len, ec); + wrap(hostname, port, p, ec, flags); return; } - // this function is only supported when we're using a proxy - if (!m_queue_packets && !m_force_proxy) + if (m_force_proxy) { - address target = address::from_string(hostname, ec); - if (!ec) send(udp::endpoint(target, port), p, len, ec, 0); + ec = error_code(boost::system::errc::permission_denied, generic_category()); return; } - if (m_queue.size() > 1000 || (flags & dont_queue)) return; - - m_queue.push_back(queued_packet()); - queued_packet& qp = m_queue.back(); - qp.ep.port(port); - + // the overload that takes a hostname is really only supported when we're + // using a proxy address target = address::from_string(hostname, ec); - if (ec) qp.ep.address(target); - else qp.hostname = allocate_string_copy(hostname); - qp.buf.insert(qp.buf.begin(), p, p + len); - qp.flags = 0; + if (!ec) send(udp::endpoint(target, port), p, ec, flags); } -void udp_socket::send(udp::endpoint const& ep, char const* p, int len - , error_code& ec, int flags) +void udp_socket::send(udp::endpoint const& ep, array_view p + , error_code& ec, int const flags) { - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); // if the sockets are closed, the udp_socket is closing too @@ -186,401 +287,26 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len || (flags & (tracker_connection | peer_connection)) == 0 ; - if (allow_proxy) + if (allow_proxy && m_socks5_connection && m_socks5_connection->active()) { - if (m_tunnel_packets) - { - // send udp packets through SOCKS5 server - wrap(ep, p, len, ec); - return; - } - - if (m_queue_packets) - { - if (m_queue.size() > 1000 || (flags & dont_queue)) return; - - m_queue.push_back(queued_packet()); - queued_packet& qp = m_queue.back(); - qp.ep = ep; - qp.hostname = 0; - qp.flags = flags; - qp.buf.insert(qp.buf.begin(), p, p + len); - return; - } + // send udp packets through SOCKS5 server + wrap(ep, p, ec, flags); + return; } if (m_force_proxy) return; -#if TORRENT_USE_IPV6 - if (ep.address().is_v6() && m_ipv6_sock.is_open()) - m_ipv6_sock.send_to(boost::asio::buffer(p, len), ep, 0, ec); - else -#endif - m_ipv4_sock.send_to(boost::asio::buffer(p, len), ep, 0, ec); + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && ep.protocol() == udp::v4()); - if (ec == error::would_block || ec == error::try_again) - { -#if TORRENT_USE_IPV6 - if (ep.address().is_v6() && m_ipv6_sock.is_open()) - { - if (!m_v6_write_subscribed) - { - m_ipv6_sock.async_send(null_buffers() - , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv6_sock)); - m_v6_write_subscribed = true; - } - } - else -#endif - { - if (!m_v4_write_subscribed) - { - m_ipv4_sock.async_send(null_buffers() - , boost::bind(&udp_socket::on_writable, this, _1, &m_ipv4_sock)); - m_v4_write_subscribed = true; - } - } - } + m_socket.send_to(boost::asio::buffer(p.data(), p.size()), ep, 0, ec); } -void udp_socket::on_writable(error_code const& ec, udp::socket* s) +void udp_socket::wrap(udp::endpoint const& ep, array_view p + , error_code& ec, int const flags) { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - m_v6_write_subscribed = false; - else -#else - TORRENT_UNUSED(s); -#endif - m_v4_write_subscribed = false; - - if (ec == boost::asio::error::operation_aborted) return; - - call_writable_handler(); -} - -// called whenever the socket is readable -void udp_socket::on_read(error_code const& ec, udp::socket* s) -{ - COMPLETE_ASYNC("udp_socket::on_read"); - - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(is_single_thread()); - -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - TORRENT_ASSERT(m_v6_outstanding > 0); - --m_v6_outstanding; - } - else -#else - TORRENT_UNUSED(s); -#endif - { - TORRENT_ASSERT(m_v4_outstanding > 0); - --m_v4_outstanding; - } - - if (ec == boost::asio::error::operation_aborted) - { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - if (m_restart_v6) { - --m_restart_v6; - setup_read(s); - } - } - else -#endif - { - if (m_restart_v4) { - --m_restart_v4; - setup_read(s); - } - } - return; - } - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; - - for (;;) - { - error_code err; - udp::endpoint ep; - size_t bytes_transferred = s->receive_from(boost::asio::buffer(m_buf, m_buf_size), ep, 0, err); - - // TODO: it would be nice to detect this on posix systems also -#ifdef TORRENT_WINDOWS - if ((err == error_code(ERROR_MORE_DATA, system_category()) - || err == error_code(WSAEMSGSIZE, system_category())) - && m_buf_size < 65536) - { - // if this function fails to allocate memory, m_buf_size - // is set to 0. In that case, don't issue the async_read(). - set_buf_size(m_buf_size * 2); - if (m_buf_size == 0) return; - continue; - } -#endif - - if (err == boost::asio::error::would_block || err == boost::asio::error::try_again) break; - on_read_impl(ep, err, bytes_transferred); - - // found on iOS, socket will be disconnected when app goes backgroud. try to reopen it. - if (err == boost::asio::error::not_connected || err == boost::asio::error::bad_descriptor) - { - ep = s->local_endpoint(err); - if (!err) { - bind(ep, err); - } - return; - } - } - call_drained_handler(); - setup_read(s); -} - -void udp_socket::call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size) -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - bool ret = false; - TORRENT_TRY { - ret = (*i)->incoming_packet(ec, ep, buf, size); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - if (ret) break; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_handler(error_code const& ec, const char* host, char const* buf, int size) -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - bool ret = false; - TORRENT_TRY { - ret = (*i)->incoming_packet(ec, host, buf, size); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - if (ret) break; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_drained_handler() -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - TORRENT_TRY { - (*i)->socket_drained(); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::call_writable_handler() -{ - m_observers_locked = true; - for (std::vector::iterator i = m_observers.begin(); - i != m_observers.end();) - { - TORRENT_TRY { - (*i)->writable(); - } TORRENT_CATCH (std::exception&) {} - if (*i == NULL) i = m_observers.erase(i); - else ++i; - } - if (!m_added_observers.empty()) - { - m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); - m_added_observers.clear(); - } - m_observers_locked = false; - if (m_new_buf_size != m_buf_size) - set_buf_size(m_new_buf_size); -} - -void udp_socket::subscribe(udp_socket_observer* o) -{ - TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end()); - if (m_observers_locked) - m_added_observers.push_back(o); - else - m_observers.push_back(o); -} - -void udp_socket::unsubscribe(udp_socket_observer* o) -{ - std::vector::iterator i = std::find(m_observers.begin(), m_observers.end(), o); - if (i == m_observers.end()) return; - if (m_observers_locked) - *i = NULL; - else - m_observers.erase(i); -} - -void udp_socket::on_read_impl(udp::endpoint const& ep - , error_code const& e, std::size_t bytes_transferred) -{ - TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(is_single_thread()); - - if (e) - { - call_handler(e, ep, 0, 0); - - // don't stop listening on recoverable errors - if (e != boost::asio::error::host_unreachable - && e != boost::asio::error::fault - && e != boost::asio::error::connection_reset - && e != boost::asio::error::connection_refused - && e != boost::asio::error::connection_aborted - && e != boost::asio::error::operation_aborted - && e != boost::asio::error::network_reset - && e != boost::asio::error::network_unreachable -#ifdef WIN32 - // ERROR_MORE_DATA means the same thing as EMSGSIZE - && e != error_code(ERROR_MORE_DATA, system_category()) - && e != error_code(ERROR_HOST_UNREACHABLE, system_category()) - && e != error_code(ERROR_PORT_UNREACHABLE, system_category()) - && e != error_code(ERROR_RETRY, system_category()) - && e != error_code(ERROR_NETWORK_UNREACHABLE, system_category()) - && e != error_code(ERROR_CONNECTION_REFUSED, system_category()) - && e != error_code(ERROR_CONNECTION_ABORTED, system_category()) -#endif - && e != boost::asio::error::message_size) - { - return; - } - - if (m_abort) - { - close_impl(); - return; - } - - return; - } - - TORRENT_TRY { - - if (m_tunnel_packets) - { - // if the source IP doesn't match the proxy's, ignore the packet - if (ep == m_udp_proxy_addr) - unwrap(e, m_buf, bytes_transferred); - } - else if (!m_force_proxy) // block incoming packets that aren't coming via the proxy - { - call_handler(e, ep, m_buf, bytes_transferred); - } - - } TORRENT_CATCH (std::exception&) {} -} - -void udp_socket::setup_read(udp::socket* s) -{ - if (m_abort) - { - close_impl(); - return; - } - -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - if (m_v6_outstanding) - { - ++m_restart_v6; - m_ipv6_sock.cancel(); - return; - } - ++m_v6_outstanding; - } - else -#endif - { - if (m_v4_outstanding) - { - ++m_restart_v4; - m_ipv4_sock.cancel(); - return; - } - ++m_v4_outstanding; - } - - ADD_OUTSTANDING_ASYNC("udp_socket::on_read"); - - udp::endpoint ep; - TORRENT_TRY - { -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - { - s->async_receive_from(null_buffers() - , ep, make_read_handler6(boost::bind(&udp_socket::on_read, this, _1, s))); - } - else -#endif - { - s->async_receive_from(null_buffers() - , ep, make_read_handler4(boost::bind(&udp_socket::on_read, this, _1, s))); - } - } - TORRENT_CATCH(boost::system::system_error& e) - { -#ifdef BOOST_NO_EXCEPTIONS - // dummy - error_code ec; - boost::system::system_error e(ec); -#endif - get_io_service().post(boost::bind(&udp_socket::on_read - , this, e.code(), s)); - } -} - -void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec) -{ - CHECK_MAGIC; + TORRENT_UNUSED(flags); using namespace libtorrent::detail; char header[25]; @@ -593,21 +319,19 @@ void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_cod boost::array iovec; iovec[0] = boost::asio::const_buffer(header, h - header); - iovec[1] = boost::asio::const_buffer(p, len); + iovec[1] = boost::asio::const_buffer(p.data(), p.size()); -#if TORRENT_USE_IPV6 - if (m_udp_proxy_addr.address().is_v4() && m_ipv4_sock.is_open()) -#endif - m_ipv4_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); -#if TORRENT_USE_IPV6 - else - m_ipv6_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); -#endif + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && ep.protocol() == udp::v4()); + + m_socket.send_to(iovec, m_socks5_connection->target(), 0, ec); } -void udp_socket::wrap(char const* hostname, int port, char const* p, int len, error_code& ec) +void udp_socket::wrap(char const* hostname, int const port, array_view p + , error_code& ec, int const flags) { - CHECK_MAGIC; + TORRENT_UNUSED(flags); using namespace libtorrent::detail; char header[270]; @@ -624,57 +348,61 @@ void udp_socket::wrap(char const* hostname, int port, char const* p, int len, er boost::array iovec; iovec[0] = boost::asio::const_buffer(header, h - header); - iovec[1] = boost::asio::const_buffer(p, len); + iovec[1] = boost::asio::const_buffer(p.data(), p.size()); -#if TORRENT_USE_IPV6 - if (m_udp_proxy_addr.address().is_v6() && m_ipv6_sock.is_open()) - m_ipv6_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); - else -#endif - m_ipv4_sock.send_to(iovec, m_udp_proxy_addr, 0, ec); + // set the DF flag for the socket and clear it again in the destructor + set_dont_frag df(m_socket, (flags & dont_fragment) != 0 + && m_socket.local_endpoint(ec).protocol() == udp::v4()); + + m_socket.send_to(iovec, m_socks5_connection->target(), 0, ec); } // unwrap the UDP packet from the SOCKS5 header -void udp_socket::unwrap(error_code const& e, char const* buf, int size) +// buf is an in-out parameter. It will be updated +// return false if the packet should be ignored. It's not a valid Socks5 UDP +// forwarded packet +bool udp_socket::unwrap(udp::endpoint& from, array_view& buf) { - CHECK_MAGIC; using namespace libtorrent::detail; // the minimum socks5 header size - if (size <= 10) return; + int const size = buf.size(); + if (size <= 10) return false; - char const* p = buf; + char* p = buf.data(); p += 2; // reserved - int frag = read_uint8(p); + int const frag = read_uint8(p); // fragmentation is not supported - if (frag != 0) return; + if (frag != 0) return false; - udp::endpoint sender; - - int atyp = read_uint8(p); + int const atyp = read_uint8(p); if (atyp == 1) { // IPv4 - sender = read_v4_endpoint(p); + from = read_v4_endpoint(p); } #if TORRENT_USE_IPV6 else if (atyp == 4) { // IPv6 - sender = read_v6_endpoint(p); + from = read_v6_endpoint(p); } #endif else { - int len = read_uint8(p); - if (len > (buf + size) - p) return; + int const len = read_uint8(p); + if (len > buf.end() - p) return false; std::string hostname(p, p + len); + error_code ec; + address addr = address::from_string(hostname, ec); + // we only support "hostnames" that are a dotted decimal IP + if (ec) return false; p += len; - call_handler(e, hostname.c_str(), p, size - (p - buf)); - return; + from = udp::endpoint(addr, read_uint16(p)); } - call_handler(e, sender, p, size - (p - buf)); + buf = array_view(p, size - (p - buf.data())); + return true; } #if !defined BOOST_ASIO_ENABLE_CANCELIO && defined TORRENT_WINDOWS @@ -684,355 +412,154 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size) void udp_socket::close() { TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(m_magic == 0x1337); error_code ec; - m_ipv4_sock.close(ec); + m_socket.close(ec); TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); -#if TORRENT_USE_IPV6 - m_ipv6_sock.close(ec); - TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); -#endif - m_socks5_sock.close(ec); - TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); - m_resolver.cancel(); - m_timer.cancel(); + if (m_socks5_connection) + { + m_socks5_connection->close(); + m_socks5_connection.reset(); + } m_abort = true; - -#if TORRENT_USE_ASSERTS - m_outstanding_when_aborted = num_outstanding(); -#endif -} - -void udp_socket::set_buf_size(int s) -{ - TORRENT_ASSERT(is_single_thread()); - - if (m_observers_locked) - { - // we can't actually reallocate the buffer while - // it's being used by the observers, we have to - // do that once we're done iterating over them - m_new_buf_size = s; - return; - } - - if (s == m_buf_size) return; - - bool no_mem = false; - char* tmp = static_cast(realloc(m_buf, s)); - if (tmp != 0) - { - m_buf = tmp; - m_buf_size = s; - m_new_buf_size = s; - } - else - { - no_mem = true; - } - - if (no_mem) - { - free(m_buf); - m_buf = 0; - m_buf_size = 0; - m_new_buf_size = 0; - udp::endpoint ep; - call_handler(error::no_memory, ep, 0, 0); - close(); - } - - int size = m_buf_size; - - // don't shrink the size of the receive buffer - error_code ec; - boost::asio::socket_base::receive_buffer_size recv_size; - m_ipv4_sock.get_option(recv_size, ec); - if (!ec) size = (std::max)(recv_size.value(), size); -#if TORRENT_USE_IPV6 - m_ipv6_sock.get_option(recv_size, ec); - if (!ec) size = (std::max)(recv_size.value(), size); -#endif - - error_code ignore_errors; - // set the internal buffer sizes as well - m_ipv4_sock.set_option(boost::asio::socket_base::receive_buffer_size(size) - , ignore_errors); -#if TORRENT_USE_IPV6 - m_ipv6_sock.set_option(boost::asio::socket_base::receive_buffer_size(size) - , ignore_errors); -#endif } void udp_socket::bind(udp::endpoint const& ep, error_code& ec) { - CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); m_abort = false; - if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec); -#if TORRENT_USE_IPV6 - if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec); -#endif + if (m_socket.is_open()) m_socket.close(ec); ec.clear(); if (ep.address().is_v4()) { - m_ipv4_sock.open(udp::v4(), ec); + m_socket.open(udp::v4(), ec); if (ec) return; - - // this is best-effort. ignore errors - error_code err; -#ifdef TORRENT_WINDOWS - m_ipv4_sock.set_option(exclusive_address_use(true), err); -#endif - m_ipv4_sock.set_option(boost::asio::socket_base::reuse_address(true), err); - - m_ipv4_sock.bind(ep, ec); - if (ec) return; - udp::socket::non_blocking_io ioc(true); - m_ipv4_sock.io_control(ioc, ec); - if (ec) return; - setup_read(&m_ipv4_sock); } - #if TORRENT_USE_IPV6 - // TODO: 2 the udp_socket should really just be a single socket, and the - // session should support having more than one, just like with TCP sockets - // for now, just make bind failures non-fatal - if (supports_ipv6() && (ep.address().is_v6() || is_any(ep.address()))) + else if (ep.address().is_v6()) { - udp::endpoint ep6 = ep; - if (is_any(ep.address())) ep6.address(address_v6::any()); - m_ipv6_sock.open(udp::v6(), ec); + m_socket.open(udp::v6(), ec); if (ec) return; - - // this is best-effort. ignore errors error_code err; -#ifdef TORRENT_WINDOWS - m_ipv6_sock.set_option(exclusive_address_use(true), err); -#endif - m_ipv6_sock.set_option(boost::asio::socket_base::reuse_address(true), err); - m_ipv6_sock.set_option(boost::asio::ip::v6_only(true), err); + m_socket.set_option(boost::asio::ip::v6_only(true), err); - m_ipv6_sock.bind(ep6, ec); - if (ec != error_code(boost::system::errc::address_not_available - , boost::system::generic_category())) - { - if (ec) return; - udp::socket::non_blocking_io ioc(true); - m_ipv6_sock.io_control(ioc, ec); - if (ec) return; - setup_read(&m_ipv6_sock); - } - else - { - ec.clear(); - } +#ifdef TORRENT_WINDOWS + // enable Teredo on windows + m_socket.set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err); +#endif // TORRENT_WINDOWS } #endif -#if TORRENT_USE_ASSERTS - m_started = true; + + // this is best-effort. ignore errors + error_code err; +#ifdef TORRENT_WINDOWS + m_socket.set_option(exclusive_address_use(true), err); #endif + m_socket.set_option(boost::asio::socket_base::reuse_address(true), err); + + m_socket.bind(ep, ec); + if (ec) return; + udp::socket::non_blocking_io ioc(true); + m_socket.io_control(ioc, ec); + if (ec) return; + m_bind_port = ep.port(); } void udp_socket::set_proxy_settings(aux::proxy_settings const& ps) { - CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); error_code ec; - m_socks5_sock.close(ec); - m_tunnel_packets = false; + if (m_socks5_connection) + { + m_socks5_connection->close(); + m_socks5_connection.reset(); + } m_proxy_settings = ps; - if (m_abort) - { - close_impl(); - return; - } + if (m_abort) return; if (ps.type == settings_pack::socks5 || ps.type == settings_pack::socks5_pw) { - m_queue_packets = true; // connect to socks5 server and open up the UDP tunnel - // TODO: use the system resolver_interface here - tcp::resolver::query q(ps.hostname, to_string(ps.port).elems); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_resolve; -#endif - ADD_OUTSTANDING_ASYNC("udp_socket::on_name_lookup"); - m_resolver.async_resolve(q, boost::bind( - &udp_socket::on_name_lookup, this, _1, _2)); + m_socks5_connection = boost::make_shared(boost::ref(m_socket.get_io_service())); + m_socks5_connection->start(ps); } } -void udp_socket::close_impl() +// ===================== SOCKS 5 ========================= + +void socks5::start(aux::proxy_settings const& ps) { - if (m_outstanding_ops == 0) - { - error_code ec; - m_ipv4_sock.close(ec); -#if TORRENT_USE_IPV6 - m_ipv6_sock.close(ec); -#endif - m_socks5_sock.close(ec); - } + m_proxy_settings = ps; + + // TODO: use the system resolver_interface here + tcp::resolver::query q(ps.hostname, to_string(ps.port).elems); + ADD_OUTSTANDING_ASYNC("socks5::on_name_lookup"); + m_resolver.async_resolve(q, boost::bind( + &socks5::on_name_lookup, self(), _1, _2)); } -void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i) +void socks5::on_name_lookup(error_code const& e, tcp::resolver::iterator i) { - COMPLETE_ASYNC("udp_socket::on_name_lookup"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_resolve > 0); - --m_outstanding_resolve; -#endif + COMPLETE_ASYNC("socks5::on_name_lookup"); - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; + if (m_abort) return; if (e == boost::asio::error::operation_aborted) return; - TORRENT_ASSERT(is_single_thread()); - - if (e) - { - if (m_force_proxy) - { - call_handler(e, udp::endpoint(), 0, 0); - } - else - { - // if we can't connect to the proxy, and - // we're not in privacy mode, try to just - // not use a proxy - m_proxy_settings = aux::proxy_settings(); - m_tunnel_packets = false; - } - - drain_queue(); - return; - } + if (e) return; m_proxy_addr.address(i->endpoint().address()); m_proxy_addr.port(i->endpoint().port()); - ADD_OUTSTANDING_ASYNC("udp_socket::on_connected"); - error_code ec; m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec); // enable keepalives m_socks5_sock.set_option(boost::asio::socket_base::keep_alive(true), ec); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_connect; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_connected"); m_socks5_sock.async_connect(tcp::endpoint(m_proxy_addr.address(), m_proxy_addr.port()) - , boost::bind(&udp_socket::on_connected, this, _1)); + , boost::bind(&socks5::on_connected, self(), _1)); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_timeout; -#endif - - ADD_OUTSTANDING_ASYNC("udp_socket::on_connect_timeout"); + ADD_OUTSTANDING_ASYNC("socks5::on_connect_timeout"); m_timer.expires_from_now(seconds(10)); - m_timer.async_wait(boost::bind(&udp_socket::on_connect_timeout - , this, _1)); + m_timer.async_wait(boost::bind(&socks5::on_connect_timeout + , self(), _1)); } -void udp_socket::on_connect_timeout(error_code const& ec) +void socks5::on_connect_timeout(error_code const& ec) { - COMPLETE_ASYNC("udp_socket::on_connect_timeout"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_timeout > 0); - --m_outstanding_timeout; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); + COMPLETE_ASYNC("socks5::on_connect_timeout"); if (ec == boost::asio::error::operation_aborted) return; - m_queue_packets = false; - - if (m_abort) - { - close_impl(); - return; - } - - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); + if (m_abort) return; error_code ignore; m_socks5_sock.close(ignore); } -void udp_socket::on_connected(error_code const& e) +void socks5::on_connected(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_connected"); - - TORRENT_ASSERT(is_single_thread()); - -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_connect > 0); - --m_outstanding_connect; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - CHECK_MAGIC; + COMPLETE_ASYNC("socks5::on_connected"); m_timer.cancel(); if (e == boost::asio::error::operation_aborted) return; - if (m_abort) - { - close_impl(); - return; - } + if (m_abort) return; - if (e) - { - // we failed to connect to the proxy, if we don't have force_proxy set, - // drain the queue over the UDP socket - if (!m_force_proxy) - { - drain_queue(); - } - - call_handler(e, udp::endpoint(), 0, 0); - return; - } + // we failed to connect to the proxy + if (e) return; using namespace libtorrent::detail; @@ -1052,81 +579,31 @@ void udp_socket::on_connected(error_code const& e) write_uint8(2, p); // username/password } TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake1"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake1"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::handshake1, this, _1)); + , boost::bind(&socks5::handshake1, self(), _1)); } -void udp_socket::handshake1(error_code const& e) +void socks5::handshake1(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake1"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::on_handshake1"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake2"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake2"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 2) - , boost::bind(&udp_socket::handshake2, this, _1)); + , boost::bind(&socks5::handshake2, self(), _1)); } -void udp_socket::handshake2(error_code const& e) +void socks5::handshake2(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake2"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; + COMPLETE_ASYNC("socks5::on_handshake2"); + if (m_abort) return; - if (e) - { - drain_queue(); - return; - } + if (e) return; using namespace libtorrent::detail; - TORRENT_ASSERT(is_single_thread()); - char* p = &m_tmp_buf[0]; int version = read_uint8(p); int method = read_uint8(p); @@ -1135,7 +612,6 @@ void udp_socket::handshake2(error_code const& e) { error_code ec; m_socks5_sock.close(ec); - drain_queue(); return; } @@ -1149,7 +625,6 @@ void udp_socket::handshake2(error_code const& e) { error_code ec; m_socks5_sock.close(ec); - drain_queue(); return; } @@ -1163,85 +638,34 @@ void udp_socket::handshake2(error_code const& e) write_uint8(uint8_t(m_proxy_settings.password.size()), p); write_string(m_proxy_settings.password, p); TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake3"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake3"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::handshake3, this, _1)); + , boost::bind(&socks5::handshake3, self(), _1)); } else { - drain_queue(); error_code ec; m_socks5_sock.close(ec); return; } } -void udp_socket::handshake3(error_code const& e) +void socks5::handshake3(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake3"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::on_handshake3"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::on_handshake4"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::on_handshake4"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 2) - , boost::bind(&udp_socket::handshake4, this, _1)); + , boost::bind(&socks5::handshake4, self(), _1)); } -void udp_socket::handshake4(error_code const& e) +void socks5::handshake4(error_code const& e) { - COMPLETE_ASYNC("udp_socket::on_handshake4"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } - - TORRENT_ASSERT(is_single_thread()); + COMPLETE_ASYNC("socks5::on_handshake4"); + if (m_abort) return; + if (e) return; using namespace libtorrent::detail; @@ -1249,18 +673,13 @@ void udp_socket::handshake4(error_code const& e) int version = read_uint8(p); int status = read_uint8(p); - if (version != 1 || status != 0) - { - drain_queue(); - return; - } + if (version != 1 || status != 0) return; socks_forward_udp(/*l*/); } -void udp_socket::socks_forward_udp() +void socks5::socks_forward_udp() { - CHECK_MAGIC; using namespace libtorrent::detail; // send SOCKS5 UDP command @@ -1273,78 +692,28 @@ void udp_socket::socks_forward_udp() write_uint32(0, p); // 0.0.0.0 write_uint16(0, p); // :0 TORRENT_ASSERT_VAL(p - m_tmp_buf < int(sizeof(m_tmp_buf)), (p - m_tmp_buf)); - ADD_OUTSTANDING_ASYNC("udp_socket::connect1"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::connect1"); boost::asio::async_write(m_socks5_sock, boost::asio::buffer(m_tmp_buf, p - m_tmp_buf) - , boost::bind(&udp_socket::connect1, this, _1)); + , boost::bind(&socks5::connect1, self(), _1)); } -void udp_socket::connect1(error_code const& e) +void socks5::connect1(error_code const& e) { - COMPLETE_ASYNC("udp_socket::connect1"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } + COMPLETE_ASYNC("socks5::connect1"); + if (m_abort) return; + if (e) return; - TORRENT_ASSERT(is_single_thread()); - - ADD_OUTSTANDING_ASYNC("udp_socket::connect2"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::connect2"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 10) - , boost::bind(&udp_socket::connect2, this, _1)); + , boost::bind(&socks5::connect2, self(), _1)); } -void udp_socket::connect2(error_code const& e) +void socks5::connect2(error_code const& e) { - COMPLETE_ASYNC("udp_socket::connect2"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); + COMPLETE_ASYNC("socks5::connect2"); - if (m_abort) - { - m_queue.clear(); - return; - } - CHECK_MAGIC; - if (e) - { - drain_queue(); - return; - } - - TORRENT_ASSERT(is_single_thread()); + if (m_abort) return; + if (e) return; using namespace libtorrent::detail; @@ -1354,11 +723,7 @@ void udp_socket::connect2(error_code const& e) ++p; // RESERVED int atyp = read_uint8(p); // address type - if (version != 5 || status != 0) - { - drain_queue(); - return; - } + if (version != 5 || status != 0) return; if (atyp == 1) { @@ -1368,70 +733,38 @@ void udp_socket::connect2(error_code const& e) else { // in this case we need to read more data from the socket + // no IPv6 support for UDP socks5 TORRENT_ASSERT(false); - drain_queue(); return; } - m_tunnel_packets = true; - drain_queue(); + // we're done! + m_active = true; - ADD_OUTSTANDING_ASYNC("udp_socket::hung_up"); - ++m_outstanding_ops; -#if TORRENT_USE_ASSERTS - ++m_outstanding_socks; -#endif + ADD_OUTSTANDING_ASYNC("socks5::hung_up"); boost::asio::async_read(m_socks5_sock, boost::asio::buffer(m_tmp_buf, 10) - , boost::bind(&udp_socket::hung_up, this, _1)); + , boost::bind(&socks5::hung_up, self(), _1)); } -void udp_socket::hung_up(error_code const& e) +void socks5::hung_up(error_code const& e) { - COMPLETE_ASYNC("udp_socket::hung_up"); -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(m_outstanding_socks > 0); - --m_outstanding_socks; -#endif - TORRENT_ASSERT(m_outstanding_ops > 0); - --m_outstanding_ops; - TORRENT_ASSERT(m_outstanding_ops == m_outstanding_connect - + m_outstanding_timeout - + m_outstanding_resolve - + m_outstanding_socks); - if (m_abort) - { - close_impl(); - return; - } - CHECK_MAGIC; - TORRENT_ASSERT(is_single_thread()); + COMPLETE_ASYNC("socks5::hung_up"); + m_active = false; if (e == boost::asio::error::operation_aborted || m_abort) return; // the socks connection was closed, re-open it - set_proxy_settings(m_proxy_settings); + start(m_proxy_settings); } -void udp_socket::drain_queue() +void socks5::close() { - m_queue_packets = false; - - // forward all packets that were put in the queue - while (!m_queue.empty()) - { - queued_packet const& p = m_queue.front(); - error_code ec; - if (p.hostname) - { - udp_socket::send_hostname(p.hostname, p.ep.port(), &p.buf[0] - , p.buf.size(), ec, p.flags | dont_queue); - free(p.hostname); - } - else - { - udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec, p.flags | dont_queue); - } - m_queue.pop_front(); - } + m_abort = true; + error_code ec; + m_socks5_sock.close(ec); + m_resolver.cancel(); + m_timer.cancel(); +} + } diff --git a/src/udp_tracker_connection.cpp b/src/udp_tracker_connection.cpp index 7922cbac3..66beb3c69 100644 --- a/src/udp_tracker_connection.cpp +++ b/src/udp_tracker_connection.cpp @@ -319,19 +319,19 @@ namespace libtorrent tracker_connection::close(); } - bool udp_tracker_connection::on_receive_hostname(error_code const& e - , char const* hostname, char const* buf, int size) + bool udp_tracker_connection::on_receive_hostname(char const* hostname + , char const* buf, int size) { TORRENT_UNUSED(hostname); // just ignore the hostname this came from, pretend that // it's from the same endpoint we sent it to (i.e. the same // port). We have so many other ways of confirming this packet // comes from the tracker anyway, so it's not a big deal - return on_receive(e, m_target, buf, size); + return on_receive(m_target, buf, size); } - bool udp_tracker_connection::on_receive(error_code const& e - , udp::endpoint const& ep, char const* buf, int size) + bool udp_tracker_connection::on_receive(udp::endpoint const& ep + , char const* buf, int const size) { #ifndef TORRENT_DISABLE_LOGGING boost::shared_ptr cb = requester(); @@ -369,8 +369,6 @@ namespace libtorrent return false; } - if (e) fail(e); - #ifndef TORRENT_DISABLE_LOGGING if (cb) cb->debug_log("<== UDP_TRACKER_PACKET [ size: %d ]", size); #endif @@ -499,13 +497,13 @@ namespace libtorrent error_code ec; if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str() - , m_target.port(), buf, 16, ec + m_man.send_hostname(m_hostname.c_str() + , m_target.port(), aux::array_view(buf, 16), ec , udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, 16, ec + m_man.send(m_target, aux::array_view(buf, 16), ec , udp_socket::tracker_connection); } @@ -563,12 +561,12 @@ namespace libtorrent error_code ec; if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str(), m_target.port() - , buf, sizeof(buf), ec, udp_socket::tracker_connection); + m_man.send_hostname(m_hostname.c_str(), m_target.port() + , aux::array_view(buf), ec, udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, sizeof(buf), ec + m_man.send(m_target, aux::array_view(buf), ec , udp_socket::tracker_connection); } m_state = action_scrape; @@ -762,13 +760,13 @@ namespace libtorrent if (!m_hostname.empty()) { - m_man.get_udp_socket().send_hostname(m_hostname.c_str() - , m_target.port(), buf, out - buf, ec + m_man.send_hostname(m_hostname.c_str() + , m_target.port(), aux::array_view(buf, out - buf), ec , udp_socket::tracker_connection); } else { - m_man.get_udp_socket().send(m_target, buf, out - buf, ec + m_man.send(m_target, aux::array_view(buf, out - buf), ec , udp_socket::tracker_connection); } m_state = action_announce; diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index c7b104493..17580c1ec 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -40,18 +40,22 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/random.hpp" #include "libtorrent/performance_counters.hpp" #include "libtorrent/aux_/time.hpp" // for aux::time_now() +#include "libtorrent/aux_/array_view.hpp" // #define TORRENT_DEBUG_MTU 1135 namespace libtorrent { + using namespace libtorrent::aux; - utp_socket_manager::utp_socket_manager(aux::session_settings const& sett - , udp_socket& s + utp_socket_manager::utp_socket_manager( + send_fun_t const& send_fun + , incoming_utp_callback_t const& cb + , io_service& ios + , aux::session_settings const& sett , counters& cnt - , void* ssl_context - , incoming_utp_callback_t cb) - : m_sock(s) + , void* ssl_context) + : m_send_fun(send_fun) , m_cb(cb) , m_last_socket(0) , m_new_connection(-1) @@ -60,6 +64,7 @@ namespace libtorrent , m_last_if_update(min_time()) , m_sock_buf_size(0) , m_counters(cnt) + , m_ios(ios) , m_mtu_idx(0) , m_ssl_context(ssl_context) { @@ -94,7 +99,6 @@ namespace libtorrent void utp_socket_manager::mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu) { - int mtu = 0; if (is_teredo(addr)) mtu = TORRENT_TEREDO_MTU; else mtu = TORRENT_ETHERNET_MTU; @@ -118,13 +122,12 @@ namespace libtorrent mtu -= TORRENT_UDP_HEADER; - if (m_sock.get_proxy_settings().type == settings_pack::socks5 - || m_sock.get_proxy_settings().type == settings_pack::socks5_pw) + if (m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5 + || m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5_pw) { // this is for the IP layer - address proxy_addr = m_sock.proxy_addr().address(); - if (proxy_addr.is_v4()) mtu -= TORRENT_IPV4_HEADER; - else mtu -= TORRENT_IPV6_HEADER; + // assume the proxy is running over IPv4 + mtu -= TORRENT_IPV4_HEADER; // this is for the SOCKS layer mtu -= TORRENT_SOCKS5_HEADER; @@ -148,102 +151,20 @@ namespace libtorrent #if !defined TORRENT_HAS_DONT_FRAGMENT && !defined TORRENT_DEBUG_MTU TORRENT_UNUSED(flags); #endif - if (!m_sock.is_open()) - { - ec = boost::asio::error::operation_aborted; - return; - } #ifdef TORRENT_DEBUG_MTU // drop packets that exceed the debug MTU if ((flags & dont_fragment) && len > TORRENT_DEBUG_MTU) return; #endif -#ifdef TORRENT_HAS_DONT_FRAGMENT - error_code tmp; - if (flags & utp_socket_manager::dont_fragment) - { - m_sock.set_option(libtorrent::dont_fragment(true), tmp); - TORRENT_ASSERT_VAL(!tmp, tmp.message()); - } -#endif - m_sock.send(ep, p, len, ec, udp_socket::peer_connection); -#ifdef TORRENT_HAS_DONT_FRAGMENT - if (flags & utp_socket_manager::dont_fragment) - { - m_sock.set_option(libtorrent::dont_fragment(false), tmp); - TORRENT_ASSERT_VAL(!tmp, tmp.message()); - } -#endif + m_send_fun(ep, array_view(p, len), ec + , ((flags & dont_fragment) ? udp_socket::dont_fragment : 0) + | udp_socket::peer_connection); } - int utp_socket_manager::local_port(error_code& ec) const + bool utp_socket_manager::incoming_packet(udp::endpoint const& ep + , char const* p, int const size) { - return m_sock.local_endpoint(ec).port(); - } - - tcp::endpoint utp_socket_manager::local_endpoint(address const& remote, error_code& ec) const - { - tcp::endpoint socket_ep = m_sock.local_endpoint(ec); - - // first enumerate the routes in the routing table - if (aux::time_now() - seconds(60) > m_last_route_update) - { - m_last_route_update = aux::time_now(); - error_code err; - m_routes = enum_routes(m_sock.get_io_service(), err); - if (err) return socket_ep; - } - - if (m_routes.empty()) return socket_ep; - // then find the best match - ip_route* best = &m_routes[0]; - for (std::vector::iterator i = m_routes.begin() - , end(m_routes.end()); i != end; ++i) - { - if (is_any(i->destination) && i->destination.is_v4() == remote.is_v4()) - { - best = &*i; - break; - } - - if (match_addr_mask(remote, i->destination, i->netmask)) - { - best = &*i; - break; - } - } - - // best now tells us which interface we would send over - // for this target. Now figure out what the local address - // is for that interface - - if (aux::time_now() - seconds(60) > m_last_if_update) - { - m_last_if_update = aux::time_now(); - error_code err; - m_interfaces = enum_net_interfaces(m_sock.get_io_service(), err); - if (err) return socket_ep; - } - - for (std::vector::iterator i = m_interfaces.begin() - , end(m_interfaces.end()); i != end; ++i) - { - if (i->interface_address.is_v4() != remote.is_v4()) - continue; - - if (strcmp(best->name, i->name) == 0) - return tcp::endpoint(i->interface_address, socket_ep.port()); - } - return socket_ep; - } - - bool utp_socket_manager::incoming_packet(error_code const& ec, udp::endpoint const& ep - , char const* p, int size) - { - // TODO: 2 we may want to take ec into account here. possibly close - // connections quicker - TORRENT_UNUSED(ec); // UTP_LOGV("incoming packet size:%d\n", size); if (size < int(sizeof(utp_header))) return false; @@ -294,14 +215,14 @@ namespace libtorrent // UTP_LOGV("not found, new connection id:%d\n", m_new_connection); - boost::shared_ptr c(new (std::nothrow) socket_type(m_sock.get_io_service())); + boost::shared_ptr c(new (std::nothrow) socket_type(m_ios)); if (!c) return false; TORRENT_ASSERT(m_new_connection == -1); // create the new socket with this ID m_new_connection = id; - instantiate_connection(m_sock.get_io_service(), aux::proxy_settings(), *c + instantiate_connection(m_ios, aux::proxy_settings(), *c , m_ssl_context, this, true, false); @@ -397,27 +318,6 @@ namespace libtorrent m_utp_sockets.erase(i); } - void utp_socket_manager::set_sock_buf(int size) - { - if (size < m_sock_buf_size) return; - m_sock.set_buf_size(size); - error_code ec; - // add more socket buffer storage on the lower level socket - // to avoid dropping packets because of a full receive buffer - // while processing a packet - - // only update the buffer size if it's bigger than - // what we already have - udp::socket::receive_buffer_size recv_buf_size_opt; - m_sock.get_option(recv_buf_size_opt, ec); - if (recv_buf_size_opt.value() < size * 10) - { - m_sock.set_option(udp::socket::receive_buffer_size(size * 10), ec); - m_sock.set_option(udp::socket::send_buffer_size(size * 3), ec); - } - m_sock_buf_size = size; - } - void utp_socket_manager::inc_stats_counter(int counter, int delta) { TORRENT_ASSERT((counter >= counters::utp_packet_loss diff --git a/src/utp_stream.cpp b/src/utp_stream.cpp index bf2b1f0df..808d48dec 100644 --- a/src/utp_stream.cpp +++ b/src/utp_stream.cpp @@ -460,9 +460,6 @@ public: // the address of the remote endpoint address m_remote_address; - // the local address - address m_local_address; - // the send and receive buffers // maps packet sequence numbers packet_buffer m_inbuf; @@ -887,9 +884,8 @@ utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const if (m_impl == 0 || m_impl->m_sm == 0) { ec = boost::asio::error::not_connected; - return endpoint_type(); } - return tcp::endpoint(m_impl->m_local_address, m_impl->m_sm->local_port(ec)); + return endpoint_type(); } utp_stream::~utp_stream() @@ -1192,9 +1188,6 @@ void utp_stream::do_connect(tcp::endpoint const& ep) m_impl->m_connect_handler = true; - error_code ec; - m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address(); - if (m_impl->test_socket_state()) return; m_impl->send_syn(); } @@ -3089,9 +3082,6 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size m_remote_address = ep.address(); m_port = ep.port(); - error_code ec; - m_local_address = m_sm->local_endpoint(m_remote_address, ec).address(); - m_ack_nr = ph->seq_nr; m_seq_nr = random() & 0xffff; m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK; diff --git a/test/test_fast_extension.cpp b/test/test_fast_extension.cpp index e8e1aecf4..3f8212f2f 100644 --- a/test/test_fast_extension.cpp +++ b/test/test_fast_extension.cpp @@ -431,7 +431,9 @@ boost::shared_ptr setup_peer(tcp::socket& s, sha1_hash& ih // wait for the torrent to be ready wait_for_downloading(*ses, "ses"); - s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), ses->listen_port()), ec); + int const port = ses->listen_port(); + fprintf(stderr, "listen port: %d\n", port); + s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), port), ec); if (ec) TEST_ERROR(ec.message()); print_session_log(*ses);