This patch does: (#633)

1. simplifies and regularizies creation of listen sockets based on the listen
   interfaces setting.
2. simplifies and improves the behavior of UDP sockets, which are now explicitly
   opened per listen interface
3. transitions udp tracker, DHT and uTP socket manager over to using the new
   udp sockets
4. greatly simplified udp_socket to only wrap a single underlying socket (as
   opposed to one IPv4 and IPv6 socket)
5. improved behavior of bind-to-device
6. introduce an array_view type to make udp packet passing code simpler
7. simplify and make setting of DF flag more robust
8. simplify and regularize port mapping of listen sockets
This commit is contained in:
Arvid Norberg 2016-04-24 15:26:28 -04:00
parent 97bdb4ba25
commit 7251575d98
19 changed files with 1357 additions and 2020 deletions

View File

@ -160,6 +160,7 @@ nobase_include_HEADERS = \
tommath_superclass.h \ tommath_superclass.h \
\ \
aux_/alert_manager_variadic_emplace.hpp \ aux_/alert_manager_variadic_emplace.hpp \
aux_/array_view.hpp \
aux_/allocating_handler.hpp \ aux_/allocating_handler.hpp \
aux_/bind_to_device.hpp \ aux_/bind_to_device.hpp \
aux_/cpuid.hpp \ aux_/cpuid.hpp \

View File

@ -0,0 +1,76 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_ARRAY_VIEW_HPP_INCLUDED
#define TORRENT_ARRAY_VIEW_HPP_INCLUDED
#include <vector>
namespace libtorrent { namespace aux {
template <typename T>
struct array_view
{
array_view() : m_ptr(NULL), m_len(0) {}
array_view(T* p, int l) : m_ptr(p), m_len(l) {}
template <size_t N>
explicit array_view(boost::array<T, N>& arr)
: m_ptr(arr.data()), m_len(arr.size()) {}
template <size_t N>
explicit array_view(T (&arr)[N])
: m_ptr(&arr[0]), m_len(N) {}
explicit array_view(std::vector<T>& vec)
: m_ptr(vec.data()), m_len(vec.size()) {}
size_t size() const { return m_len; }
T* data() const { return m_ptr; }
T* begin() const { return m_ptr; }
T* end() const { return m_ptr + m_len; }
T& operator[](int idx)
{
TORRENT_ASSERT(idx >= 0);
TORRENT_ASSERT(idx < m_len);
return m_ptr[idx];
}
private:
T* m_ptr;
size_t m_len;
};
}}
#endif

View File

@ -126,17 +126,21 @@ namespace libtorrent
{ {
listen_socket_t() listen_socket_t()
: tcp_external_port(0) : tcp_external_port(0)
, udp_external_port(0)
, ssl(false) , ssl(false)
, udp_write_blocked(false)
{ {
tcp_port_mapping[0] = -1; tcp_port_mapping[0] = -1;
tcp_port_mapping[1] = -1; tcp_port_mapping[1] = -1;
udp_port_mapping[0] = -1;
udp_port_mapping[1] = -1;
} }
// this is typically empty but can be set // this is typically empty but can be set
// to the WAN IP address of NAT-PMP or UPnP router // to the WAN IP address of NAT-PMP or UPnP router
address external_address; address external_address;
// this is a cached local endpoint for the listen socket // this is a cached local endpoint for the listen TCP socket
tcp::endpoint local_endpoint; tcp::endpoint local_endpoint;
// this is typically set to the same as the local // this is typically set to the same as the local
@ -147,15 +151,27 @@ namespace libtorrent
// to be published to peers, since this is the port // to be published to peers, since this is the port
// the client is reachable through. // the client is reachable through.
int tcp_external_port; int tcp_external_port;
int udp_external_port;
// 0 is natpmp 1 is upnp // 0 is natpmp 1 is upnp
int tcp_port_mapping[2]; int tcp_port_mapping[2];
int udp_port_mapping[2];
// set to true if this is an SSL listen socket // set to true if this is an SSL listen socket
bool ssl; bool ssl;
// the actual socket // this is true when the udp socket send() has failed with EAGAIN or
// EWOULDBLOCK. i.e. we're currently waiting for the socket to become
// writeable again. Once it is, we'll set it to false and notify the utp
// socket manager
bool udp_write_blocked;
// the actual sockets (TCP listen socket and UDP socket)
// An entry does not necessarily have a UDP or TCP socket. One of these
// pointers may be null!
// TODO: 3 make these unique_ptr<>
boost::shared_ptr<tcp::acceptor> sock; boost::shared_ptr<tcp::acceptor> sock;
boost::shared_ptr<udp_socket> udp_sock;
}; };
namespace aux namespace aux
@ -175,7 +191,6 @@ namespace libtorrent
: session_interface : session_interface
, dht::dht_observer , dht::dht_observer
, boost::noncopyable , boost::noncopyable
, udp_socket_observer
, uncork_interface , uncork_interface
, single_threaded , single_threaded
{ {
@ -184,9 +199,6 @@ namespace libtorrent
// maximum length of query names which can be registered by extensions // maximum length of query names which can be registered by extensions
enum { max_dht_query_length = 15 }; enum { max_dht_query_length = 15 };
#ifdef TORRENT_DEBUG
// friend class ::libtorrent::peer_connection;
#endif
#if TORRENT_USE_INVARIANT_CHECKS #if TORRENT_USE_INVARIANT_CHECKS
friend class libtorrent::invariant_access; friend class libtorrent::invariant_access;
#endif #endif
@ -245,9 +257,6 @@ namespace libtorrent
// need the initial push to connect peers // need the initial push to connect peers
void prioritize_connections(boost::weak_ptr<torrent> t) TORRENT_OVERRIDE; void prioritize_connections(boost::weak_ptr<torrent> t) TORRENT_OVERRIDE;
// if we are listening on an IPv6 interface
// this will return one of the IPv6 addresses on this
// machine, otherwise just an empty endpoint
tcp::endpoint get_ipv6_interface() const TORRENT_OVERRIDE; tcp::endpoint get_ipv6_interface() const TORRENT_OVERRIDE;
tcp::endpoint get_ipv4_interface() const TORRENT_OVERRIDE; tcp::endpoint get_ipv4_interface() const TORRENT_OVERRIDE;
@ -354,8 +363,6 @@ namespace libtorrent
, std::vector<address> const& addresses, int port); , std::vector<address> const& addresses, int port);
#endif #endif
void maybe_update_udp_mapping(int nat, bool ssl, int local_port, int external_port);
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
torrent const* find_encrypted_torrent( torrent const* find_encrypted_torrent(
sha1_hash const& info_hash, sha1_hash const& xor_mask) TORRENT_OVERRIDE; sha1_hash const& info_hash, sha1_hash const& xor_mask) TORRENT_OVERRIDE;
@ -522,7 +529,15 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
bool is_dht_running() const { return (m_dht.get() != NULL); } bool is_dht_running() const { return (m_dht.get() != NULL); }
int external_udp_port() const TORRENT_OVERRIDE { return m_external_udp_port; } int external_udp_port() const TORRENT_OVERRIDE
{
for (std::list<listen_socket_t>::const_iterator i = m_listen_sockets.begin()
, end(m_listen_sockets.end()); i != end; ++i)
{
if (i->udp_sock) return i->udp_external_port;
}
return -1;
}
#endif #endif
#if TORRENT_USE_I2P #if TORRENT_USE_I2P
@ -1042,20 +1057,27 @@ namespace libtorrent
int m_outstanding_router_lookups; int m_outstanding_router_lookups;
#endif #endif
bool incoming_packet(error_code const& ec void send_udp_packet_hostname(char const* hostname
, udp::endpoint const&, char const* buf, int size) TORRENT_OVERRIDE; , int port
, array_view<char const> p
, error_code& ec
, int flags);
// see m_external_listen_port. This is the same void send_udp_packet(bool ssl
// but for the udp port used by the DHT. , udp::endpoint const& ep
// TODO: 3 once udp sockets are part of m_listen_sockets, remove this , array_view<char const> p
int m_external_udp_port; , error_code& ec
, int flags);
void on_udp_writeable(boost::weak_ptr<udp_socket> s, error_code const& ec);
void on_udp_packet(boost::weak_ptr<udp_socket> const& s
, bool ssl, error_code const& ec);
udp_socket m_udp_socket;
libtorrent::utp_socket_manager m_utp_socket_manager; libtorrent::utp_socket_manager m_utp_socket_manager;
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
// used for uTP connectons over SSL // used for uTP connectons over SSL
udp_socket m_ssl_udp_socket;
libtorrent::utp_socket_manager m_ssl_utp_socket_manager; libtorrent::utp_socket_manager m_ssl_utp_socket_manager;
#endif #endif
@ -1068,18 +1090,17 @@ namespace libtorrent
boost::shared_ptr<upnp> m_upnp; boost::shared_ptr<upnp> m_upnp;
boost::shared_ptr<lsd> m_lsd; boost::shared_ptr<lsd> m_lsd;
// TODO: 3 once the udp socket is in listen_socket_t, these should
// move in there too
// 0 is natpmp 1 is upnp
int m_udp_mapping[2];
#ifdef TORRENT_USE_OPENSSL
int m_ssl_udp_mapping[2];
#endif
// mask is a bitmask of which protocols to remap on: // mask is a bitmask of which protocols to remap on:
// 1: NAT-PMP // 1: NAT-PMP
// 2: UPnP // 2: UPnP
void remap_ports(boost::uint32_t mask, listen_socket_t& s); // TODO: 3 perhaps this function should move into listen_socket_t
enum remap_port_mask_t
{
remap_natpmp = 1,
remap_upnp = 2,
remap_natpmp_and_upnp = 3
};
void remap_ports(remap_port_mask_t mask, listen_socket_t& s);
// the timer used to fire the tick // the timer used to fire the tick
deadline_timer m_timer; deadline_timer m_timer;

View File

@ -249,9 +249,8 @@ namespace libtorrent { namespace aux
virtual void prioritize_connections(boost::weak_ptr<torrent> t) = 0; virtual void prioritize_connections(boost::weak_ptr<torrent> t) = 0;
// TODO: 3 these should go away!
virtual tcp::endpoint get_ipv6_interface() const = 0;
virtual tcp::endpoint get_ipv4_interface() const = 0; virtual tcp::endpoint get_ipv4_interface() const = 0;
virtual tcp::endpoint get_ipv6_interface() const = 0;
virtual void trigger_auto_manage() = 0; virtual void trigger_auto_manage() = 0;

View File

@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp" #include "libtorrent/socket.hpp"
#include "libtorrent/thread.hpp" #include "libtorrent/thread.hpp"
#include "libtorrent/deadline_timer.hpp" #include "libtorrent/deadline_timer.hpp"
#include "libtorrent/aux_/array_view.hpp"
namespace libtorrent namespace libtorrent
{ {
@ -68,10 +69,14 @@ namespace libtorrent { namespace dht
struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL struct TORRENT_EXTRA_EXPORT dht_tracker TORRENT_FINAL
: udp_socket_interface : udp_socket_interface
, udp_socket_observer
, boost::enable_shared_from_this<dht_tracker> , boost::enable_shared_from_this<dht_tracker>
{ {
dht_tracker(dht_observer* observer, udp_socket& sock typedef boost::function<void(udp::endpoint const&
, aux::array_view<char const>, error_code&, int)> send_fun_t;
dht_tracker(dht_observer* observer
, io_service& ios
, send_fun_t const& send_fun
, dht_settings const& settings, counters& cnt , dht_settings const& settings, counters& cnt
, dht_storage_constructor_type storage_constructor , dht_storage_constructor_type storage_constructor
, entry const& state); , entry const& state);
@ -129,10 +134,8 @@ namespace libtorrent { namespace dht
, std::vector<dht_lookup>& requests); , std::vector<dht_lookup>& requests);
void update_stats_counters(counters& c) const; void update_stats_counters(counters& c) const;
// translate bittorrent kademlia message into the generic kademlia message void incoming_error(error_code const& ec, udp::endpoint const&);
// used by the library bool incoming_packet(udp::endpoint const&, char const* buf, int size);
virtual bool incoming_packet(error_code const& ec
, udp::endpoint const&, char const* buf, int size);
private: private:
@ -154,7 +157,7 @@ namespace libtorrent { namespace dht
counters& m_counters; counters& m_counters;
node m_dht; node m_dht;
udp_socket& m_sock; send_fun_t m_send_fun;
dht_logger* m_log; dht_logger* m_log;
std::vector<char> m_send_buf; std::vector<char> m_send_buf;

View File

@ -49,6 +49,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/weak_ptr.hpp> #include <boost/weak_ptr.hpp>
#include <boost/tuple/tuple.hpp> #include <boost/tuple/tuple.hpp>
#include <boost/unordered_map.hpp> #include <boost/unordered_map.hpp>
#include <boost/function.hpp>
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
#include <boost/asio/ssl/context.hpp> #include <boost/asio/ssl/context.hpp>
@ -62,8 +63,9 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer.hpp" // peer_entry #include "libtorrent/peer.hpp" // peer_entry
#include "libtorrent/deadline_timer.hpp" #include "libtorrent/deadline_timer.hpp"
#include "libtorrent/union_endpoint.hpp" #include "libtorrent/union_endpoint.hpp"
#include "libtorrent/udp_socket.hpp" // for udp_socket_observer
#include "libtorrent/io_service.hpp" #include "libtorrent/io_service.hpp"
#include "libtorrent/thread.hpp"
#include "libtorrent/aux_/array_view.hpp"
namespace libtorrent namespace libtorrent
{ {
@ -272,8 +274,7 @@ namespace libtorrent
int m_completion_timeout; int m_completion_timeout;
typedef mutex mutex_t; mutable mutex m_mutex;
mutable mutex_t m_mutex;
// used for timeouts // used for timeouts
// this is set when the request has been sent // this is set when the request has been sent
@ -314,10 +315,9 @@ namespace libtorrent
address const& bind_interface() const { return m_req.bind_ip; } address const& bind_interface() const { return m_req.bind_ip; }
void sent_bytes(int bytes); void sent_bytes(int bytes);
void received_bytes(int bytes); void received_bytes(int bytes);
virtual bool on_receive(error_code const&, udp::endpoint const& virtual bool on_receive(udp::endpoint const&
, char const* /* buf */, int /* size */) { return false; } , char const* /* buf */, int /* size */) { return false; }
virtual bool on_receive_hostname(error_code const& virtual bool on_receive_hostname(char const* /* hostname */
, char const* /* hostname */
, char const* /* buf */, int /* size */) { return false; } , char const* /* buf */, int /* size */) { return false; }
boost::shared_ptr<tracker_connection> shared_from_this() boost::shared_ptr<tracker_connection> shared_from_this()
@ -341,12 +341,19 @@ namespace libtorrent
}; };
class TORRENT_EXTRA_EXPORT tracker_manager TORRENT_FINAL class TORRENT_EXTRA_EXPORT tracker_manager TORRENT_FINAL
: public udp_socket_observer : boost::noncopyable
, boost::noncopyable
{ {
public: public:
tracker_manager(udp_socket& sock typedef boost::function<void(udp::endpoint const&
, aux::array_view<char const>
, error_code&, int)> send_fun_t;
typedef boost::function<void(char const*, int
, aux::array_view<char const>
, error_code&, int)> send_fun_hostname_t;
tracker_manager(send_fun_t const& send_fun
, send_fun_hostname_t const& send_fun_hostname
, counters& stats_counters , counters& stats_counters
, resolver_interface& resolver , resolver_interface& resolver
, aux::session_settings const& sett , aux::session_settings const& sett
@ -370,26 +377,31 @@ namespace libtorrent
void sent_bytes(int bytes); void sent_bytes(int bytes);
void received_bytes(int bytes); void received_bytes(int bytes);
virtual bool incoming_packet(error_code const& e, udp::endpoint const& ep void incoming_error(error_code const& ec, udp::endpoint const& ep);
, char const* buf, int size) TORRENT_OVERRIDE; bool incoming_packet(udp::endpoint const& ep, char const* buf, int size);
// this is only used for SOCKS packets, since // this is only used for SOCKS packets, since
// they may be addressed to hostname // they may be addressed to hostname
virtual bool incoming_packet(error_code const& e, char const* hostname // TODO: 3 make sure the udp_socket supports passing on string-hostnames
, char const* buf, int size) TORRENT_OVERRIDE; // too, and that this function is used
bool incoming_packet(char const* hostname, char const* buf, int size);
void update_transaction_id( void update_transaction_id(
boost::shared_ptr<udp_tracker_connection> c boost::shared_ptr<udp_tracker_connection> c
, boost::uint64_t tid); , boost::uint64_t tid);
aux::session_settings const& settings() const { return m_settings; } aux::session_settings const& settings() const { return m_settings; }
udp_socket& get_udp_socket() { return m_udp_socket; }
resolver_interface& host_resolver() { return m_host_resolver; } resolver_interface& host_resolver() { return m_host_resolver; }
void send_hostname(char const* hostname, int port, aux::array_view<char const> p
, error_code& ec, int flags = 0);
void send(udp::endpoint const& ep, aux::array_view<char const> p
, error_code& ec, int flags = 0);
private: private:
typedef mutex mutex_t; mutable mutex m_mutex;
mutable mutex_t m_mutex;
// maps transactionid to the udp_tracker_connection // maps transactionid to the udp_tracker_connection
// TODO: this should be unique_ptr in the future // TODO: this should be unique_ptr in the future
@ -400,7 +412,8 @@ namespace libtorrent
typedef std::vector<boost::shared_ptr<http_tracker_connection> > http_conns_t; typedef std::vector<boost::shared_ptr<http_tracker_connection> > http_conns_t;
http_conns_t m_http_conns; http_conns_t m_http_conns;
class udp_socket& m_udp_socket; send_fun_t m_send_fun;
send_fun_hostname_t m_send_fun_hostname;
resolver_interface& m_host_resolver; resolver_interface& m_host_resolver;
aux::session_settings const& m_settings; aux::session_settings const& m_settings;
counters& m_stats_counters; counters& m_stats_counters;

View File

@ -41,30 +41,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/thread.hpp" #include "libtorrent/thread.hpp"
#include "libtorrent/deadline_timer.hpp" #include "libtorrent/deadline_timer.hpp"
#include "libtorrent/debug.hpp" #include "libtorrent/debug.hpp"
#include "libtorrent/aux_/array_view.hpp"
#include "libtorrent/aux_/allocating_handler.hpp" #include "libtorrent/aux_/allocating_handler.hpp"
#include <deque>
namespace libtorrent namespace libtorrent
{ {
struct TORRENT_EXTRA_EXPORT udp_socket_observer struct socks5;
{
// return true if the packet was handled (it won't be
// propagated to the next observer)
virtual bool incoming_packet(error_code const& ec
, udp::endpoint const&, char const* buf, int size) = 0;
virtual bool incoming_packet(error_code const& /* ec */
, char const* /* hostname */, char const* /* buf */, int /* size */) { return false; }
// called when the socket becomes writeable, after having
// failed with EWOULDBLOCK
virtual void writable() {}
// called every time the socket is drained of packets
virtual void socket_drained() {}
protected:
~udp_socket_observer() {}
};
class TORRENT_EXTRA_EXPORT udp_socket : single_threaded class TORRENT_EXTRA_EXPORT udp_socket : single_threaded
{ {
@ -76,19 +58,38 @@ namespace libtorrent
peer_connection = 1 peer_connection = 1
, tracker_connection = 2 , tracker_connection = 2
, dont_queue = 4 , dont_queue = 4
, dont_fragment = 8
}; };
bool is_open() const { return m_abort == false; } bool is_open() const { return m_abort == false; }
io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } io_service& get_io_service() { return m_socket.get_io_service(); }
void subscribe(udp_socket_observer* o); template <typename Handler>
void unsubscribe(udp_socket_observer* o); void async_read(Handler h)
{
m_socket.async_receive(null_buffers(), h);
}
template <typename Handler>
void async_write(Handler h)
{
m_socket.async_send(null_buffers(), h);
}
struct packet
{
aux::array_view<char> data;
udp::endpoint from;
error_code error;
};
int read(aux::array_view<packet> pkts, error_code& ec);
// this is only valid when using a socks5 proxy // this is only valid when using a socks5 proxy
void send_hostname(char const* hostname, int port, char const* p void send_hostname(char const* hostname, int port, aux::array_view<char const> p
, int len, error_code& ec, int flags = 0); , error_code& ec, int flags = 0);
void send(udp::endpoint const& ep, char const* p, int len void send(udp::endpoint const& ep, aux::array_view<char const> p
, error_code& ec, int flags = 0); , error_code& ec, int flags = 0);
void bind(udp::endpoint const& ep, error_code& ec); void bind(udp::endpoint const& ep, error_code& ec);
void close(); void close();
@ -99,13 +100,8 @@ namespace libtorrent
void set_force_proxy(bool f) { m_force_proxy = f; } void set_force_proxy(bool f) { m_force_proxy = f; }
bool is_closed() const { return m_abort; } bool is_closed() const { return m_abort; }
tcp::endpoint local_endpoint(error_code& ec) const udp::endpoint local_endpoint(error_code& ec) const
{ { return m_socket.local_endpoint(ec); }
udp::endpoint ep = m_ipv4_sock.local_endpoint(ec);
return tcp::endpoint(ep.address(), ep.port());
}
void set_buf_size(int s);
typedef udp::socket::receive_buffer_size receive_buffer_size; typedef udp::socket::receive_buffer_size receive_buffer_size;
typedef udp::socket::send_buffer_size send_buffer_size; typedef udp::socket::send_buffer_size send_buffer_size;
@ -113,201 +109,51 @@ namespace libtorrent
template <class SocketOption> template <class SocketOption>
void get_option(SocketOption const& opt, error_code& ec) void get_option(SocketOption const& opt, error_code& ec)
{ {
#if TORRENT_USE_IPV6 m_socket.get_option(opt, ec);
if (opt.level(udp::v6()) == IPPROTO_IPV6)
m_ipv6_sock.get_option(opt, ec);
else
#endif
m_ipv4_sock.get_option(opt, ec);
} }
template <class SocketOption> template <class SocketOption>
void set_option(SocketOption const& opt, error_code& ec) void set_option(SocketOption const& opt, error_code& ec)
{ {
if (opt.level(udp::v4()) != IPPROTO_IPV6) m_socket.set_option(opt, ec);
m_ipv4_sock.set_option(opt, ec);
#if TORRENT_USE_IPV6
if (opt.level(udp::v6()) != IPPROTO_IP)
m_ipv6_sock.set_option(opt, ec);
#endif
} }
template <class SocketOption> template <class SocketOption>
void get_option(SocketOption& opt, error_code& ec) void get_option(SocketOption& opt, error_code& ec)
{ {
#if TORRENT_USE_IPV6 m_socket.get_option(opt, ec);
if (opt.level(udp::v6()) == IPPROTO_IPV6)
m_ipv6_sock.get_option(opt, ec);
else
#endif
m_ipv4_sock.get_option(opt, ec);
} }
udp::endpoint proxy_addr() const { return m_proxy_addr; }
private: private:
struct queued_packet
{
queued_packet()
: hostname(NULL)
, flags(0)
{}
udp::endpoint ep;
char* hostname;
buffer buf;
int flags;
};
// number of outstanding UDP socket operations
// using the UDP socket buffer
int num_outstanding() const
{
return m_v4_outstanding
#if TORRENT_USE_IPV6
+ m_v6_outstanding
#endif
;
}
// non-copyable // non-copyable
udp_socket(udp_socket const&); udp_socket(udp_socket const&);
udp_socket& operator=(udp_socket const&); udp_socket& operator=(udp_socket const&);
void close_impl(); void wrap(udp::endpoint const& ep, aux::array_view<char const> p, error_code& ec, int flags);
void wrap(char const* hostname, int port, aux::array_view<char const> p, error_code& ec, int flags);
bool unwrap(udp::endpoint& from, aux::array_view<char>& buf);
// observers on this udp socket udp::socket m_socket;
std::vector<udp_socket_observer*> m_observers;
std::vector<udp_socket_observer*> m_added_observers;
template <class Handler> // TODO: 2 this should probably be a scoped_ptr<> or unique_ptr
aux::allocating_handler<Handler, TORRENT_READ_HANDLER_MAX_SIZE> // with a hard coded size
make_read_handler4(Handler const& handler) int const m_buf_size;
{
return aux::allocating_handler<Handler, TORRENT_READ_HANDLER_MAX_SIZE>(
handler, m_v4_read_handler_storage
);
}
#if TORRENT_USE_IPV6
template <class Handler>
aux::allocating_handler<Handler, TORRENT_READ_HANDLER_MAX_SIZE>
make_read_handler6(Handler const& handler)
{
return aux::allocating_handler<Handler, TORRENT_READ_HANDLER_MAX_SIZE>(
handler, m_v6_read_handler_storage
);
}
#endif
// this is true while iterating over the observers
// vector, invoking observer hooks. We may not
// add new observers during this time, since it
// may invalidate the iterator. If this is true,
// instead add new observers to m_added_observers
// and they will be added later
bool m_observers_locked;
void call_handler(error_code const& ec, udp::endpoint const& ep
, char const* buf, int size);
void call_handler(error_code const& ec, const char* host
, char const* buf, int size);
void call_drained_handler();
void call_writable_handler();
void on_writable(error_code const& ec, udp::socket* s);
void setup_read(udp::socket* s);
void on_read(error_code const& ec, udp::socket* s);
void on_read_impl(udp::endpoint const& ep
, error_code const& e, std::size_t bytes_transferred);
void on_name_lookup(error_code const& e, tcp::resolver::iterator i);
void on_connect_timeout(error_code const& ec);
void on_connected(error_code const& ec);
void handshake1(error_code const& e);
void handshake2(error_code const& e);
void handshake3(error_code const& e);
void handshake4(error_code const& e);
void socks_forward_udp();
void connect1(error_code const& e);
void connect2(error_code const& e);
void hung_up(error_code const& e);
void drain_queue();
void wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec);
void wrap(char const* hostname, int port, char const* p, int len, error_code& ec);
void unwrap(error_code const& e, char const* buf, int size);
udp::socket m_ipv4_sock;
aux::handler_storage<TORRENT_READ_HANDLER_MAX_SIZE> m_v4_read_handler_storage;
deadline_timer m_timer;
int m_buf_size;
// if the buffer size is attempted
// to be changed while the buffer is
// being used, this member is set to
// the desired size, and it's resized
// later
int m_new_buf_size;
char* m_buf; char* m_buf;
#if TORRENT_USE_IPV6
udp::socket m_ipv6_sock;
aux::handler_storage<TORRENT_READ_HANDLER_MAX_SIZE> m_v6_read_handler_storage;
#endif
boost::uint16_t m_bind_port; boost::uint16_t m_bind_port;
boost::uint8_t m_v4_outstanding;
boost::uint8_t m_restart_v4;
#if TORRENT_USE_IPV6
boost::uint8_t m_v6_outstanding;
boost::uint8_t m_restart_v6;
#endif
tcp::socket m_socks5_sock;
aux::proxy_settings m_proxy_settings; aux::proxy_settings m_proxy_settings;
tcp::resolver m_resolver;
char m_tmp_buf[270];
bool m_queue_packets;
bool m_tunnel_packets;
bool m_force_proxy;
bool m_abort;
// this is the endpoint the proxy server lives at. boost::shared_ptr<socks5> m_socks5_connection;
// when performing a UDP associate, we get another
// endpoint (presumably on the same IP) where we're
// supposed to send UDP packets.
udp::endpoint m_proxy_addr;
// this is where UDP packets that are to be forwarded // TODO: 3 add a unit test for force-proxy
// are sent. The result from UDP ASSOCIATE is stored bool m_force_proxy:1;
// in here. bool m_abort:1;
udp::endpoint m_udp_proxy_addr;
// while we're connecting to the proxy
// we have to queue the packets, we'll flush
// them once we're connected
std::deque<queued_packet> m_queue;
// counts the number of outstanding async
// operations hanging on this socket
int m_outstanding_ops;
#if TORRENT_USE_IPV6
bool m_v6_write_subscribed:1;
#endif
bool m_v4_write_subscribed:1;
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
bool m_started; bool m_started;
int m_magic; int m_magic;
int m_outstanding_when_aborted;
int m_outstanding_connect;
int m_outstanding_timeout;
int m_outstanding_resolve;
int m_outstanding_socks;
#endif #endif
}; };
} }

View File

@ -94,10 +94,8 @@ namespace libtorrent
void timeout(error_code const& error); void timeout(error_code const& error);
void start_announce(); void start_announce();
bool on_receive(error_code const& e, udp::endpoint const& ep bool on_receive(udp::endpoint const& ep, char const* buf, int size);
, char const* buf, int size); bool on_receive_hostname(char const* hostname, char const* buf, int size);
bool on_receive_hostname(error_code const& e, char const* hostname
, char const* buf, int size);
bool on_connect_response(char const* buf, int size); bool on_connect_response(char const* buf, int size);
bool on_announce_response(char const* buf, int size); bool on_announce_response(char const* buf, int size);
bool on_scrape_response(char const* buf, int size); bool on_scrape_response(char const* buf, int size);

View File

@ -39,36 +39,51 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/session_status.hpp" #include "libtorrent/session_status.hpp"
#include "libtorrent/enum_net.hpp" #include "libtorrent/enum_net.hpp"
#include "libtorrent/aux_/session_settings.hpp" #include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/aux_/array_view.hpp"
#include "libtorrent/aux_/disable_warnings_push.hpp"
#include <boost/function.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
namespace libtorrent namespace libtorrent
{ {
class udp_socket;
class utp_stream; class utp_stream;
struct utp_socket_impl; struct utp_socket_impl;
struct counters; struct counters;
typedef boost::function<void(boost::shared_ptr<socket_type> const&)> incoming_utp_callback_t; struct utp_socket_manager TORRENT_FINAL
struct utp_socket_manager TORRENT_FINAL : udp_socket_observer
{ {
utp_socket_manager(aux::session_settings const& sett, udp_socket& s typedef boost::function<void(udp::endpoint const&
, counters& cnt, void* ssl_context, incoming_utp_callback_t cb); , aux::array_view<char const>
, error_code&, int)> send_fun_t;
typedef boost::function<void(boost::shared_ptr<socket_type> const&)>
incoming_utp_callback_t;
utp_socket_manager(send_fun_t const& send_fun
, incoming_utp_callback_t const& cb
, io_service& ios
, aux::session_settings const& sett
, counters& cnt, void* ssl_context
);
~utp_socket_manager(); ~utp_socket_manager();
// return false if this is not a uTP packet // return false if this is not a uTP packet
virtual bool incoming_packet(error_code const& ec, udp::endpoint const& ep bool incoming_packet(udp::endpoint const& ep, char const* p, int size);
, char const* p, int size) TORRENT_OVERRIDE;
virtual bool incoming_packet(error_code const&, char const*, char const*, int) TORRENT_OVERRIDE
{ return false; }
virtual void writable() TORRENT_OVERRIDE;
virtual void socket_drained() TORRENT_OVERRIDE; // if the UDP socket failed with an EAGAIN or EWOULDBLOCK, this will be
// called once the socket is writeable again
void writable();
// when the upper layer has drained the underlying UDP socket, this is
// called, and uTP sockets will send their ACKs. This ensures ACKs at
// least coalese packets returned during the same wakeup
void socket_drained();
void tick(time_point now); void tick(time_point now);
tcp::endpoint local_endpoint(address const& remote, error_code& ec) const;
int local_port(error_code& ec) const;
// flags for send_packet // flags for send_packet
enum { dont_fragment = 1 }; enum { dont_fragment = 1 };
void send_packet(udp::endpoint const& ep, char const* p, int len void send_packet(udp::endpoint const& ep, char const* p, int len
@ -89,7 +104,6 @@ namespace libtorrent
int loss_multiplier() const { return m_sett.get_int(settings_pack::utp_loss_multiplier); } int loss_multiplier() const { return m_sett.get_int(settings_pack::utp_loss_multiplier); }
void mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu); void mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu);
void set_sock_buf(int size);
int num_sockets() const { return m_utp_sockets.size(); } int num_sockets() const { return m_utp_sockets.size(); }
void defer_ack(utp_socket_impl* s); void defer_ack(utp_socket_impl* s);
@ -114,7 +128,7 @@ namespace libtorrent
// explicitly disallow assignment, to silence msvc warning // explicitly disallow assignment, to silence msvc warning
utp_socket_manager& operator=(utp_socket_manager const&); utp_socket_manager& operator=(utp_socket_manager const&);
udp_socket& m_sock; send_fun_t m_send_fun;
incoming_utp_callback_t m_cb; incoming_utp_callback_t m_cb;
// replace with a hash-map // replace with a hash-map
@ -165,6 +179,8 @@ namespace libtorrent
// stats counters // stats counters
counters& m_counters; counters& m_counters;
io_service& m_ios;
boost::array<int, 3> m_restrict_mtu; boost::array<int, 3> m_restrict_mtu;
int m_mtu_idx; int m_mtu_idx;

View File

@ -30,8 +30,6 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#if !defined TORRENT_DISABLE_DHT
#include "test.hpp" #include "test.hpp"
#include "simulator/simulator.hpp" #include "simulator/simulator.hpp"
@ -41,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/performance_counters.hpp" #include "libtorrent/performance_counters.hpp"
#include "libtorrent/entry.hpp" #include "libtorrent/entry.hpp"
#include "libtorrent/session_settings.hpp" #include "libtorrent/session_settings.hpp"
#include "libtorrent/aux_/array_view.hpp"
#include "libtorrent/kademlia/dht_observer.hpp" #include "libtorrent/kademlia/dht_observer.hpp"
#include <functional> #include <functional>
@ -50,6 +49,8 @@ using namespace libtorrent;
namespace lt = libtorrent; namespace lt = libtorrent;
using namespace sim; using namespace sim;
#if !defined TORRENT_DISABLE_DHT
struct obs : dht::dht_observer struct obs : dht::dht_observer
{ {
virtual void set_external_address(address const& addr virtual void set_external_address(address const& addr
@ -100,8 +101,22 @@ TORRENT_TEST(dht_rate_limit)
counters cnt; counters cnt;
entry state; entry state;
boost::shared_ptr<lt::dht::dht_tracker> dht = boost::make_shared<lt::dht::dht_tracker>( boost::shared_ptr<lt::dht::dht_tracker> dht = boost::make_shared<lt::dht::dht_tracker>(
&o, sock, dhtsett, cnt, dht::dht_default_storage_constructor, state); &o, boost::ref(dht_ios), boost::bind(&udp_socket::send, &sock, _1, _2, _3, _4)
sock.subscribe(dht.get()); , dhtsett, cnt, dht::dht_default_storage_constructor, state);
bool stop = false;
std::function<void(error_code const&, size_t)> on_read
= [&](error_code const& ec, size_t bytes)
{
if (ec) return;
udp_socket::packet p;
error_code err;
int const num = sock.read(lt::aux::array_view<udp_socket::packet>(&p, 1), err);
if (num) dht->incoming_packet(p.from, p.data.data(), p.data.size());
if (stop || err) return;
sock.async_read(on_read);
};
sock.async_read(on_read);
// sender // sender
int num_packets_sent = 0; int num_packets_sent = 0;
@ -120,7 +135,7 @@ TORRENT_TEST(dht_rate_limit)
timer.async_wait([&](error_code const& ec) timer.async_wait([&](error_code const& ec)
{ {
dht->stop(); dht->stop();
sock.unsubscribe(dht.get()); stop = true;
sender_sock.close(); sender_sock.close();
sock.close(); sock.close();
}); });

View File

@ -90,21 +90,22 @@ namespace libtorrent { namespace dht
// class that puts the networking and the kademlia node in a single // class that puts the networking and the kademlia node in a single
// unit and connecting them together. // unit and connecting them together.
dht_tracker::dht_tracker(dht_observer* observer dht_tracker::dht_tracker(dht_observer* observer
, udp_socket& sock , io_service& ios
, send_fun_t const& send_fun
, dht_settings const& settings , dht_settings const& settings
, counters& cnt , counters& cnt
, dht_storage_constructor_type storage_constructor , dht_storage_constructor_type storage_constructor
, entry const& state) , entry const& state)
: m_counters(cnt) : m_counters(cnt)
, m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor) , m_dht(this, settings, extract_node_id(state), observer, cnt, storage_constructor)
, m_sock(sock) , m_send_fun(send_fun)
, m_log(observer) , m_log(observer)
, m_key_refresh_timer(sock.get_io_service()) , m_key_refresh_timer(ios)
, m_connection_timer(sock.get_io_service()) , m_connection_timer(ios)
, m_refresh_timer(sock.get_io_service()) , m_refresh_timer(ios)
, m_settings(settings) , m_settings(settings)
, m_abort(false) , m_abort(false)
, m_host_resolver(sock.get_io_service()) , m_host_resolver(ios)
, m_send_quota(settings.upload_rate_limit) , m_send_quota(settings.upload_rate_limit)
, m_last_tick(aux::time_now()) , m_last_tick(aux::time_now())
{ {
@ -277,29 +278,26 @@ namespace libtorrent { namespace dht
m_dht.direct_request(ep, e, f); m_dht.direct_request(ep, e, f);
} }
// translate bittorrent kademlia message into the generice kademlia message void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep)
// used by the library
bool dht_tracker::incoming_packet(error_code const& ec
, udp::endpoint const& ep, char const* buf, int size)
{ {
if (ec) if (ec == boost::asio::error::connection_refused
{ || ec == boost::asio::error::connection_reset
if (ec == boost::asio::error::connection_refused || ec == boost::asio::error::connection_aborted
|| ec == boost::asio::error::connection_reset
|| ec == boost::asio::error::connection_aborted
#ifdef WIN32 #ifdef WIN32
|| ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) || ec == error_code(ERROR_HOST_UNREACHABLE, system_category())
|| ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) || ec == error_code(ERROR_PORT_UNREACHABLE, system_category())
|| ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) || ec == error_code(ERROR_CONNECTION_REFUSED, system_category())
|| ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) || ec == error_code(ERROR_CONNECTION_ABORTED, system_category())
#endif #endif
) )
{ {
m_dht.unreachable(ep); m_dht.unreachable(ep);
}
return false;
} }
}
bool dht_tracker::incoming_packet( udp::endpoint const& ep
, char const* buf, int size)
{
if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false; if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false;
// remove this line/check once the DHT supports IPv6 // remove this line/check once the DHT supports IPv6
if (!ep.address().is_v4()) return false; if (!ep.address().is_v4()) return false;
@ -316,7 +314,7 @@ namespace libtorrent { namespace dht
// these are class A networks not available to the public // these are class A networks not available to the public
// if we receive messages from here, that seems suspicious // if we receive messages from here, that seems suspicious
boost::uint8_t class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 static boost::uint8_t const class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25
, 26, 28, 29, 30, 33, 34, 48, 51, 56 }; , 26, 28, 29, 30, 33, 34, 48, 51, 56 };
int num = sizeof(class_a)/sizeof(class_a[0]); int num = sizeof(class_a)/sizeof(class_a[0]);
@ -451,7 +449,7 @@ namespace libtorrent { namespace dht
m_send_quota -= m_send_buf.size(); m_send_quota -= m_send_buf.size();
error_code ec; error_code ec;
m_sock.send(addr, &m_send_buf[0], int(m_send_buf.size()), ec, 0); m_send_fun(addr, aux::array_view<char const>(&m_send_buf[0], m_send_buf.size()), ec, 0);
if (ec) if (ec)
{ {
m_counters.inc_stats_counter(counters::dht_messages_out_dropped); m_counters.inc_stats_counter(counters::dht_messages_out_dropped);

File diff suppressed because it is too large Load Diff

View File

@ -3410,7 +3410,7 @@ namespace libtorrent
// the tracker did resolve to a different type of address, so announce // the tracker did resolve to a different type of address, so announce
// to that as well // to that as well
// TODO 2: there's a bug when removing a torrent or shutting down the session, // TODO 3: there's a bug when removing a torrent or shutting down the session,
// where the second announce is skipped (in this case, the one to the IPv6 // where the second announce is skipped (in this case, the one to the IPv6
// name). This should be fixed by generalizing the tracker list structure to // name). This should be fixed by generalizing the tracker list structure to
// separate the IPv6 and IPv4 addresses as conceptually separate trackers, // separate the IPv6 and IPv4 addresses as conceptually separate trackers,

View File

@ -60,6 +60,8 @@ namespace
namespace libtorrent namespace libtorrent
{ {
using namespace libtorrent::aux;
timeout_handler::timeout_handler(io_service& ios) timeout_handler::timeout_handler(io_service& ios)
: m_completion_timeout(0) : m_completion_timeout(0)
, m_start_time(clock_type::now()) , m_start_time(clock_type::now())
@ -191,9 +193,8 @@ namespace libtorrent
m_man.remove_request(this); m_man.remove_request(this);
} }
// TODO: 2 some of these arguments could probably be moved to the tracker_manager::tracker_manager(send_fun_t const& send_fun
// tracker request itself. like the ip_filter and settings , send_fun_hostname_t const& send_fun_hostname
tracker_manager::tracker_manager(class udp_socket& sock
, counters& stats_counters , counters& stats_counters
, resolver_interface& resolver , resolver_interface& resolver
, aux::session_settings const& sett , aux::session_settings const& sett
@ -201,7 +202,8 @@ namespace libtorrent
, aux::session_logger& ses , aux::session_logger& ses
#endif #endif
) )
: m_udp_socket(sock) : m_send_fun(send_fun)
, m_send_fun_hostname(send_fun_hostname)
, m_host_resolver(resolver) , m_host_resolver(resolver)
, m_settings(sett) , m_settings(sett)
, m_stats_counters(stats_counters) , m_stats_counters(stats_counters)
@ -231,7 +233,7 @@ namespace libtorrent
void tracker_manager::remove_request(tracker_connection const* c) void tracker_manager::remove_request(tracker_connection const* c)
{ {
mutex_t::scoped_lock l(m_mutex); mutex::scoped_lock l(m_mutex);
http_conns_t::iterator i = std::find_if(m_http_conns.begin() http_conns_t::iterator i = std::find_if(m_http_conns.begin()
, m_http_conns.end() , m_http_conns.end()
@ -266,7 +268,7 @@ namespace libtorrent
, tracker_request req , tracker_request req
, boost::weak_ptr<request_callback> c) , boost::weak_ptr<request_callback> c)
{ {
mutex_t::scoped_lock l(m_mutex); mutex::scoped_lock l(m_mutex);
TORRENT_ASSERT(req.num_want >= 0); TORRENT_ASSERT(req.num_want >= 0);
TORRENT_ASSERT(!m_abort || req.event == tracker_request::stopped); TORRENT_ASSERT(!m_abort || req.event == tracker_request::stopped);
if (m_abort && req.event != tracker_request::stopped) return; if (m_abort && req.event != tracker_request::stopped) return;
@ -309,8 +311,8 @@ namespace libtorrent
, "", 0)); , "", 0));
} }
bool tracker_manager::incoming_packet(error_code const& e bool tracker_manager::incoming_packet(udp::endpoint const& ep
, udp::endpoint const& ep, char const* buf, int size) , char const* buf, int size)
{ {
// ignore packets smaller than 8 bytes // ignore packets smaller than 8 bytes
if (size < 8) if (size < 8)
@ -343,11 +345,19 @@ namespace libtorrent
boost::shared_ptr<tracker_connection> p = i->second; boost::shared_ptr<tracker_connection> p = i->second;
// on_receive() may remove the tracker connection from the list // on_receive() may remove the tracker connection from the list
return p->on_receive(e, ep, buf, size); return p->on_receive(ep, buf, size);
} }
bool tracker_manager::incoming_packet(error_code const& e void tracker_manager::incoming_error(error_code const& ec
, char const* hostname, char const* buf, int size) , udp::endpoint const& ep)
{
TORRENT_UNUSED(ec);
TORRENT_UNUSED(ep);
// TODO: 2 implement
}
bool tracker_manager::incoming_packet(char const* hostname
, char const* buf, int size)
{ {
// ignore packets smaller than 8 bytes // ignore packets smaller than 8 bytes
if (size < 16) return false; if (size < 16) return false;
@ -374,13 +384,26 @@ namespace libtorrent
boost::shared_ptr<tracker_connection> p = i->second; boost::shared_ptr<tracker_connection> p = i->second;
// on_receive() may remove the tracker connection from the list // on_receive() may remove the tracker connection from the list
return p->on_receive_hostname(e, hostname, buf, size); return p->on_receive_hostname(hostname, buf, size);
}
void tracker_manager::send_hostname(char const* hostname, int const port
, array_view<char const> p, error_code& ec, int const flags)
{
m_send_fun_hostname(hostname, port, p, ec, flags);
}
void tracker_manager::send(udp::endpoint const& ep
, array_view<char const> p
, error_code& ec, int const flags)
{
m_send_fun(ep, p, ec, flags);
} }
void tracker_manager::abort_all_requests(bool all) void tracker_manager::abort_all_requests(bool all)
{ {
// removes all connections except 'event=stopped'-requests // removes all connections except 'event=stopped'-requests
mutex_t::scoped_lock l(m_mutex); mutex::scoped_lock l(m_mutex);
m_abort = true; m_abort = true;
http_conns_t close_http_connections; http_conns_t close_http_connections;
@ -434,13 +457,13 @@ namespace libtorrent
bool tracker_manager::empty() const bool tracker_manager::empty() const
{ {
mutex_t::scoped_lock l(m_mutex); mutex::scoped_lock l(m_mutex);
return m_http_conns.empty() && m_udp_conns.empty(); return m_http_conns.empty() && m_udp_conns.empty();
} }
int tracker_manager::num_requests() const int tracker_manager::num_requests() const
{ {
mutex_t::scoped_lock l(m_mutex); mutex::scoped_lock l(m_mutex);
return m_http_conns.size() + m_udp_conns.size(); return m_http_conns.size() + m_udp_conns.size();
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -319,19 +319,19 @@ namespace libtorrent
tracker_connection::close(); tracker_connection::close();
} }
bool udp_tracker_connection::on_receive_hostname(error_code const& e bool udp_tracker_connection::on_receive_hostname(char const* hostname
, char const* hostname, char const* buf, int size) , char const* buf, int size)
{ {
TORRENT_UNUSED(hostname); TORRENT_UNUSED(hostname);
// just ignore the hostname this came from, pretend that // just ignore the hostname this came from, pretend that
// it's from the same endpoint we sent it to (i.e. the same // it's from the same endpoint we sent it to (i.e. the same
// port). We have so many other ways of confirming this packet // port). We have so many other ways of confirming this packet
// comes from the tracker anyway, so it's not a big deal // comes from the tracker anyway, so it's not a big deal
return on_receive(e, m_target, buf, size); return on_receive(m_target, buf, size);
} }
bool udp_tracker_connection::on_receive(error_code const& e bool udp_tracker_connection::on_receive(udp::endpoint const& ep
, udp::endpoint const& ep, char const* buf, int size) , char const* buf, int const size)
{ {
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
boost::shared_ptr<request_callback> cb = requester(); boost::shared_ptr<request_callback> cb = requester();
@ -369,8 +369,6 @@ namespace libtorrent
return false; return false;
} }
if (e) fail(e);
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
if (cb) cb->debug_log("<== UDP_TRACKER_PACKET [ size: %d ]", size); if (cb) cb->debug_log("<== UDP_TRACKER_PACKET [ size: %d ]", size);
#endif #endif
@ -499,13 +497,13 @@ namespace libtorrent
error_code ec; error_code ec;
if (!m_hostname.empty()) if (!m_hostname.empty())
{ {
m_man.get_udp_socket().send_hostname(m_hostname.c_str() m_man.send_hostname(m_hostname.c_str()
, m_target.port(), buf, 16, ec , m_target.port(), aux::array_view<char const>(buf, 16), ec
, udp_socket::tracker_connection); , udp_socket::tracker_connection);
} }
else else
{ {
m_man.get_udp_socket().send(m_target, buf, 16, ec m_man.send(m_target, aux::array_view<char const>(buf, 16), ec
, udp_socket::tracker_connection); , udp_socket::tracker_connection);
} }
@ -563,12 +561,12 @@ namespace libtorrent
error_code ec; error_code ec;
if (!m_hostname.empty()) if (!m_hostname.empty())
{ {
m_man.get_udp_socket().send_hostname(m_hostname.c_str(), m_target.port() m_man.send_hostname(m_hostname.c_str(), m_target.port()
, buf, sizeof(buf), ec, udp_socket::tracker_connection); , aux::array_view<char const>(buf), ec, udp_socket::tracker_connection);
} }
else else
{ {
m_man.get_udp_socket().send(m_target, buf, sizeof(buf), ec m_man.send(m_target, aux::array_view<char const>(buf), ec
, udp_socket::tracker_connection); , udp_socket::tracker_connection);
} }
m_state = action_scrape; m_state = action_scrape;
@ -762,13 +760,13 @@ namespace libtorrent
if (!m_hostname.empty()) if (!m_hostname.empty())
{ {
m_man.get_udp_socket().send_hostname(m_hostname.c_str() m_man.send_hostname(m_hostname.c_str()
, m_target.port(), buf, out - buf, ec , m_target.port(), aux::array_view<char const>(buf, out - buf), ec
, udp_socket::tracker_connection); , udp_socket::tracker_connection);
} }
else else
{ {
m_man.get_udp_socket().send(m_target, buf, out - buf, ec m_man.send(m_target, aux::array_view<char const>(buf, out - buf), ec
, udp_socket::tracker_connection); , udp_socket::tracker_connection);
} }
m_state = action_announce; m_state = action_announce;

View File

@ -40,18 +40,22 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/random.hpp" #include "libtorrent/random.hpp"
#include "libtorrent/performance_counters.hpp" #include "libtorrent/performance_counters.hpp"
#include "libtorrent/aux_/time.hpp" // for aux::time_now() #include "libtorrent/aux_/time.hpp" // for aux::time_now()
#include "libtorrent/aux_/array_view.hpp"
// #define TORRENT_DEBUG_MTU 1135 // #define TORRENT_DEBUG_MTU 1135
namespace libtorrent namespace libtorrent
{ {
using namespace libtorrent::aux;
utp_socket_manager::utp_socket_manager(aux::session_settings const& sett utp_socket_manager::utp_socket_manager(
, udp_socket& s send_fun_t const& send_fun
, incoming_utp_callback_t const& cb
, io_service& ios
, aux::session_settings const& sett
, counters& cnt , counters& cnt
, void* ssl_context , void* ssl_context)
, incoming_utp_callback_t cb) : m_send_fun(send_fun)
: m_sock(s)
, m_cb(cb) , m_cb(cb)
, m_last_socket(0) , m_last_socket(0)
, m_new_connection(-1) , m_new_connection(-1)
@ -60,6 +64,7 @@ namespace libtorrent
, m_last_if_update(min_time()) , m_last_if_update(min_time())
, m_sock_buf_size(0) , m_sock_buf_size(0)
, m_counters(cnt) , m_counters(cnt)
, m_ios(ios)
, m_mtu_idx(0) , m_mtu_idx(0)
, m_ssl_context(ssl_context) , m_ssl_context(ssl_context)
{ {
@ -94,7 +99,6 @@ namespace libtorrent
void utp_socket_manager::mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu) void utp_socket_manager::mtu_for_dest(address const& addr, int& link_mtu, int& utp_mtu)
{ {
int mtu = 0; int mtu = 0;
if (is_teredo(addr)) mtu = TORRENT_TEREDO_MTU; if (is_teredo(addr)) mtu = TORRENT_TEREDO_MTU;
else mtu = TORRENT_ETHERNET_MTU; else mtu = TORRENT_ETHERNET_MTU;
@ -118,13 +122,12 @@ namespace libtorrent
mtu -= TORRENT_UDP_HEADER; mtu -= TORRENT_UDP_HEADER;
if (m_sock.get_proxy_settings().type == settings_pack::socks5 if (m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5
|| m_sock.get_proxy_settings().type == settings_pack::socks5_pw) || m_sett.get_int(settings_pack::proxy_type) == settings_pack::socks5_pw)
{ {
// this is for the IP layer // this is for the IP layer
address proxy_addr = m_sock.proxy_addr().address(); // assume the proxy is running over IPv4
if (proxy_addr.is_v4()) mtu -= TORRENT_IPV4_HEADER; mtu -= TORRENT_IPV4_HEADER;
else mtu -= TORRENT_IPV6_HEADER;
// this is for the SOCKS layer // this is for the SOCKS layer
mtu -= TORRENT_SOCKS5_HEADER; mtu -= TORRENT_SOCKS5_HEADER;
@ -148,102 +151,20 @@ namespace libtorrent
#if !defined TORRENT_HAS_DONT_FRAGMENT && !defined TORRENT_DEBUG_MTU #if !defined TORRENT_HAS_DONT_FRAGMENT && !defined TORRENT_DEBUG_MTU
TORRENT_UNUSED(flags); TORRENT_UNUSED(flags);
#endif #endif
if (!m_sock.is_open())
{
ec = boost::asio::error::operation_aborted;
return;
}
#ifdef TORRENT_DEBUG_MTU #ifdef TORRENT_DEBUG_MTU
// drop packets that exceed the debug MTU // drop packets that exceed the debug MTU
if ((flags & dont_fragment) && len > TORRENT_DEBUG_MTU) return; if ((flags & dont_fragment) && len > TORRENT_DEBUG_MTU) return;
#endif #endif
#ifdef TORRENT_HAS_DONT_FRAGMENT m_send_fun(ep, array_view<char const>(p, len), ec
error_code tmp; , ((flags & dont_fragment) ? udp_socket::dont_fragment : 0)
if (flags & utp_socket_manager::dont_fragment) | udp_socket::peer_connection);
{
m_sock.set_option(libtorrent::dont_fragment(true), tmp);
TORRENT_ASSERT_VAL(!tmp, tmp.message());
}
#endif
m_sock.send(ep, p, len, ec, udp_socket::peer_connection);
#ifdef TORRENT_HAS_DONT_FRAGMENT
if (flags & utp_socket_manager::dont_fragment)
{
m_sock.set_option(libtorrent::dont_fragment(false), tmp);
TORRENT_ASSERT_VAL(!tmp, tmp.message());
}
#endif
} }
int utp_socket_manager::local_port(error_code& ec) const bool utp_socket_manager::incoming_packet(udp::endpoint const& ep
, char const* p, int const size)
{ {
return m_sock.local_endpoint(ec).port();
}
tcp::endpoint utp_socket_manager::local_endpoint(address const& remote, error_code& ec) const
{
tcp::endpoint socket_ep = m_sock.local_endpoint(ec);
// first enumerate the routes in the routing table
if (aux::time_now() - seconds(60) > m_last_route_update)
{
m_last_route_update = aux::time_now();
error_code err;
m_routes = enum_routes(m_sock.get_io_service(), err);
if (err) return socket_ep;
}
if (m_routes.empty()) return socket_ep;
// then find the best match
ip_route* best = &m_routes[0];
for (std::vector<ip_route>::iterator i = m_routes.begin()
, end(m_routes.end()); i != end; ++i)
{
if (is_any(i->destination) && i->destination.is_v4() == remote.is_v4())
{
best = &*i;
break;
}
if (match_addr_mask(remote, i->destination, i->netmask))
{
best = &*i;
break;
}
}
// best now tells us which interface we would send over
// for this target. Now figure out what the local address
// is for that interface
if (aux::time_now() - seconds(60) > m_last_if_update)
{
m_last_if_update = aux::time_now();
error_code err;
m_interfaces = enum_net_interfaces(m_sock.get_io_service(), err);
if (err) return socket_ep;
}
for (std::vector<ip_interface>::iterator i = m_interfaces.begin()
, end(m_interfaces.end()); i != end; ++i)
{
if (i->interface_address.is_v4() != remote.is_v4())
continue;
if (strcmp(best->name, i->name) == 0)
return tcp::endpoint(i->interface_address, socket_ep.port());
}
return socket_ep;
}
bool utp_socket_manager::incoming_packet(error_code const& ec, udp::endpoint const& ep
, char const* p, int size)
{
// TODO: 2 we may want to take ec into account here. possibly close
// connections quicker
TORRENT_UNUSED(ec);
// UTP_LOGV("incoming packet size:%d\n", size); // UTP_LOGV("incoming packet size:%d\n", size);
if (size < int(sizeof(utp_header))) return false; if (size < int(sizeof(utp_header))) return false;
@ -294,14 +215,14 @@ namespace libtorrent
// UTP_LOGV("not found, new connection id:%d\n", m_new_connection); // UTP_LOGV("not found, new connection id:%d\n", m_new_connection);
boost::shared_ptr<socket_type> c(new (std::nothrow) socket_type(m_sock.get_io_service())); boost::shared_ptr<socket_type> c(new (std::nothrow) socket_type(m_ios));
if (!c) return false; if (!c) return false;
TORRENT_ASSERT(m_new_connection == -1); TORRENT_ASSERT(m_new_connection == -1);
// create the new socket with this ID // create the new socket with this ID
m_new_connection = id; m_new_connection = id;
instantiate_connection(m_sock.get_io_service(), aux::proxy_settings(), *c instantiate_connection(m_ios, aux::proxy_settings(), *c
, m_ssl_context, this, true, false); , m_ssl_context, this, true, false);
@ -397,27 +318,6 @@ namespace libtorrent
m_utp_sockets.erase(i); m_utp_sockets.erase(i);
} }
void utp_socket_manager::set_sock_buf(int size)
{
if (size < m_sock_buf_size) return;
m_sock.set_buf_size(size);
error_code ec;
// add more socket buffer storage on the lower level socket
// to avoid dropping packets because of a full receive buffer
// while processing a packet
// only update the buffer size if it's bigger than
// what we already have
udp::socket::receive_buffer_size recv_buf_size_opt;
m_sock.get_option(recv_buf_size_opt, ec);
if (recv_buf_size_opt.value() < size * 10)
{
m_sock.set_option(udp::socket::receive_buffer_size(size * 10), ec);
m_sock.set_option(udp::socket::send_buffer_size(size * 3), ec);
}
m_sock_buf_size = size;
}
void utp_socket_manager::inc_stats_counter(int counter, int delta) void utp_socket_manager::inc_stats_counter(int counter, int delta)
{ {
TORRENT_ASSERT((counter >= counters::utp_packet_loss TORRENT_ASSERT((counter >= counters::utp_packet_loss

View File

@ -460,9 +460,6 @@ public:
// the address of the remote endpoint // the address of the remote endpoint
address m_remote_address; address m_remote_address;
// the local address
address m_local_address;
// the send and receive buffers // the send and receive buffers
// maps packet sequence numbers // maps packet sequence numbers
packet_buffer<packet> m_inbuf; packet_buffer<packet> m_inbuf;
@ -887,9 +884,8 @@ utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const
if (m_impl == 0 || m_impl->m_sm == 0) if (m_impl == 0 || m_impl->m_sm == 0)
{ {
ec = boost::asio::error::not_connected; ec = boost::asio::error::not_connected;
return endpoint_type();
} }
return tcp::endpoint(m_impl->m_local_address, m_impl->m_sm->local_port(ec)); return endpoint_type();
} }
utp_stream::~utp_stream() utp_stream::~utp_stream()
@ -1192,9 +1188,6 @@ void utp_stream::do_connect(tcp::endpoint const& ep)
m_impl->m_connect_handler = true; m_impl->m_connect_handler = true;
error_code ec;
m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address();
if (m_impl->test_socket_state()) return; if (m_impl->test_socket_state()) return;
m_impl->send_syn(); m_impl->send_syn();
} }
@ -3089,9 +3082,6 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
m_remote_address = ep.address(); m_remote_address = ep.address();
m_port = ep.port(); m_port = ep.port();
error_code ec;
m_local_address = m_sm->local_endpoint(m_remote_address, ec).address();
m_ack_nr = ph->seq_nr; m_ack_nr = ph->seq_nr;
m_seq_nr = random() & 0xffff; m_seq_nr = random() & 0xffff;
m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK; m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK;

View File

@ -431,7 +431,9 @@ boost::shared_ptr<torrent_info> setup_peer(tcp::socket& s, sha1_hash& ih
// wait for the torrent to be ready // wait for the torrent to be ready
wait_for_downloading(*ses, "ses"); wait_for_downloading(*ses, "ses");
s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), ses->listen_port()), ec); int const port = ses->listen_port();
fprintf(stderr, "listen port: %d\n", port);
s.connect(tcp::endpoint(address::from_string("127.0.0.1", ec), port), ec);
if (ec) TEST_ERROR(ec.message()); if (ec) TEST_ERROR(ec.message());
print_session_log(*ses); print_session_log(*ses);