From 8c3f9571de0bd5c68f9537c2b1cdd09b8f6a7415 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 9 Dec 2007 04:15:24 +0000 Subject: [PATCH] socks5 support for udp messages --- Jamfile | 1 + include/Makefile.am | 1 + include/libtorrent/aux_/session_impl.hpp | 9 +- include/libtorrent/kademlia/dht_tracker.hpp | 23 +- include/libtorrent/udp_socket.hpp | 104 ++++++ src/Makefile.am | 2 +- src/kademlia/dht_tracker.cpp | 99 ++--- src/session_impl.cpp | 27 +- src/udp_socket.cpp | 377 ++++++++++++++++++++ 9 files changed, 549 insertions(+), 94 deletions(-) create mode 100644 include/libtorrent/udp_socket.hpp create mode 100644 src/udp_socket.cpp diff --git a/Jamfile b/Jamfile index 96ad7b9e4..eccdeb552 100755 --- a/Jamfile +++ b/Jamfile @@ -234,6 +234,7 @@ SOURCES = udp_tracker_connection sha1 metadata_transfer + udp_socket upnp ut_pex ut_metadata diff --git a/include/Makefile.am b/include/Makefile.am index 7fe59a4fd..8aa322dbd 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -60,6 +60,7 @@ libtorrent/torrent_handle.hpp \ libtorrent/torrent_info.hpp \ libtorrent/tracker_manager.hpp \ libtorrent/udp_tracker_connection.hpp \ +libtorrent/udp_socket.hpp \ libtorrent/utf8.hpp \ libtorrent/upnp.hpp \ libtorrent/xml_parse.hpp \ diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index cf627c70b..3f0a714c6 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -334,7 +334,10 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT void set_dht_proxy(proxy_settings const& s) - { m_dht_proxy = s; } + { + m_dht_proxy = s; + m_dht_socket.set_proxy_settings(s); + } proxy_settings const& dht_proxy() const { return m_dht_proxy; } #endif @@ -542,6 +545,10 @@ namespace libtorrent // see m_external_listen_port. This is the same // but for the udp port used by the DHT. int m_external_udp_port; + + udp_socket m_dht_socket; + + void on_receive_udp(udp::endpoint const& ep, char const* buf, int len); #endif #ifndef TORRENT_DISABLE_ENCRYPTION diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index c032c19d1..21a3c2f28 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/packet_iterator.hpp" #include "libtorrent/session_settings.hpp" #include "libtorrent/session_status.hpp" +#include "libtorrent/udp_socket.hpp" namespace libtorrent { namespace dht { @@ -69,16 +70,14 @@ namespace libtorrent { namespace dht { friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*); - dht_tracker(asio::io_service& ios, dht_settings const& settings - , asio::ip::address listen_interface, entry const& bootstrap); + dht_tracker(udp_socket& sock, dht_settings const& settings + , entry const& bootstrap); void stop(); void add_node(udp::endpoint node); void add_node(std::pair const& node); void add_router_node(std::pair const& node); - void rebind(asio::ip::address listen_interface, int listen_port); - entry state() const; void announce(sha1_hash const& ih, int listen_port @@ -87,6 +86,10 @@ namespace libtorrent { namespace dht void dht_status(session_status& s); + // translate bittorrent kademlia message into the generic kademlia message + // used by the library + void on_receive(udp::endpoint const& ep, char const* pkt, int size); + private: boost::intrusive_ptr self() @@ -100,22 +103,12 @@ namespace libtorrent { namespace dht void refresh_timeout(asio::error_code const& e); void tick(asio::error_code const& e); - // translate bittorrent kademlia message into the generic kademlia message - // used by the library - void on_receive(asio::error_code const& error, size_t bytes_transferred); void on_bootstrap(); void send_packet(msg const& m); - asio::strand m_strand; - asio::ip::udp::socket m_socket; - node_impl m_dht; + udp_socket& m_sock; - // this is the index of the receive buffer we are currently receiving to - // the other buffer is the one containing the last message - int m_buffer; - std::vector m_in_buf[2]; - udp::endpoint m_remote_endpoint[2]; std::vector m_send_buf; ptime m_last_new_key; diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp new file mode 100644 index 000000000..2fd3143d1 --- /dev/null +++ b/include/libtorrent/udp_socket.hpp @@ -0,0 +1,104 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_UDP_SOCKET_HPP_INCLUDED +#define TORRENT_UDP_SOCKET_HPP_INCLUDED + +#include "libtorrent/socket.hpp" +#include "libtorrent/session_settings.hpp" + +#include +#include + +namespace libtorrent +{ + class connection_queue; + + class udp_socket + { + public: + typedef boost::function callback_t; + + udp_socket(asio::io_service& ios, callback_t const& c, connection_queue& cc); + + bool is_open() const { return m_ipv4_sock.is_open() || m_ipv6_sock.is_open(); } + asio::io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } + + void send(udp::endpoint const& ep, char const* p, int len); + void bind(int port); + void close(); + int local_port() const { return m_bind_port; } + + void set_proxy_settings(proxy_settings const& ps); + proxy_settings const& get_proxy_settings() { return m_proxy_settings; } + + private: + + callback_t m_callback; + + void on_read(udp::socket* sock, asio::error_code const& e, std::size_t bytes_transferred); + void on_name_lookup(asio::error_code const& e, tcp::resolver::iterator i); + void on_timeout(); + void on_connect(int ticket); + void on_connected(asio::error_code const& ec); + void handshake1(asio::error_code const& e); + void handshake2(asio::error_code const& e); + void handshake3(asio::error_code const& e); + void handshake4(asio::error_code const& e); + void socks_forward_udp(); + void connect1(asio::error_code const& e); + void connect2(asio::error_code const& e); + + void wrap(udp::endpoint const& ep, char const* p, int len); + void unwrap(char const* buf, int size); + + udp::socket m_ipv4_sock; + udp::socket m_ipv6_sock; + udp::endpoint m_v4_ep; + udp::endpoint m_v6_ep; + char m_v4_buf[1600]; + char m_v6_buf[1600]; + int m_bind_port; + + tcp::socket m_socks5_sock; + int m_connection_ticket; + proxy_settings m_proxy_settings; + connection_queue& m_cc; + tcp::resolver m_resolver; + char m_tmp_buf[100]; + bool m_tunnel_packets; + udp::endpoint m_proxy_addr; + }; +} + +#endif + diff --git a/src/Makefile.am b/src/Makefile.am index 1e9e9f923..5f8fe560a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -22,7 +22,7 @@ http_tracker_connection.cpp udp_tracker_connection.cpp \ alert.cpp identify_client.cpp ip_filter.cpp file.cpp metadata_transfer.cpp \ logger.cpp file_pool.cpp ut_pex.cpp lsd.cpp upnp.cpp instantiate_connection.cpp \ socks5_stream.cpp socks4_stream.cpp http_stream.cpp connection_queue.cpp \ -disk_io_thread.cpp ut_metadata.cpp magnet_uri.cpp \ +disk_io_thread.cpp ut_metadata.cpp magnet_uri.cpp udp_socket.cpp \ $(kademlia_sources) noinst_HEADERS = \ diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 1a074bc20..cdddceefe 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -145,26 +145,22 @@ namespace libtorrent { namespace dht // class that puts the networking and the kademlia node in a single // unit and connecting them together. - dht_tracker::dht_tracker(asio::io_service& ios, dht_settings const& settings - , asio::ip::address listen_interface, entry const& bootstrap) - : m_strand(ios) - , m_socket(ios, udp::endpoint(listen_interface, settings.service_port)) - , m_dht(bind(&dht_tracker::send_packet, this, _1), settings + dht_tracker::dht_tracker(udp_socket& sock, dht_settings const& settings + , entry const& bootstrap) + : m_dht(bind(&dht_tracker::send_packet, this, _1), settings , read_id(bootstrap)) - , m_buffer(0) + , m_sock(sock) , m_last_new_key(time_now() - minutes(key_refresh)) - , m_timer(ios) - , m_connection_timer(ios) - , m_refresh_timer(ios) + , m_timer(sock.get_io_service()) + , m_connection_timer(sock.get_io_service()) + , m_refresh_timer(sock.get_io_service()) , m_settings(settings) , m_refresh_bucket(160) - , m_host_resolver(ios) + , m_host_resolver(sock.get_io_service()) , m_refs(0) { using boost::bind; - m_in_buf[0].resize(1000); - m_in_buf[1].resize(1000); #ifdef TORRENT_DHT_VERBOSE_LOGGING m_counter = 0; std::fill_n(m_replies_bytes_sent, 5, 0); @@ -202,18 +198,15 @@ namespace libtorrent { namespace dht } catch (std::exception&) {} } - m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0] - , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer] - , m_strand.wrap(bind(&dht_tracker::on_receive, self(), _1, _2))); m_timer.expires_from_now(seconds(1)); - m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, self(), _1))); + m_timer.async_wait(bind(&dht_tracker::tick, self(), _1)); m_connection_timer.expires_from_now(seconds(10)); - m_connection_timer.async_wait(m_strand.wrap( - bind(&dht_tracker::connection_timeout, self(), _1))); + m_connection_timer.async_wait( + bind(&dht_tracker::connection_timeout, self(), _1)); m_refresh_timer.expires_from_now(seconds(5)); - m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1))); + m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, self(), _1)); m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self())); } @@ -223,7 +216,6 @@ namespace libtorrent { namespace dht m_timer.cancel(); m_connection_timer.cancel(); m_refresh_timer.cancel(); - m_socket.close(); m_host_resolver.cancel(); } @@ -238,10 +230,9 @@ namespace libtorrent { namespace dht try { if (e) return; - if (!m_socket.is_open()) return; time_duration d = m_dht.connection_timeout(); m_connection_timer.expires_from_now(d); - m_connection_timer.async_wait(m_strand.wrap(bind(&dht_tracker::connection_timeout, self(), _1))); + m_connection_timer.async_wait(bind(&dht_tracker::connection_timeout, self(), _1)); } catch (std::exception& exc) { @@ -256,35 +247,22 @@ namespace libtorrent { namespace dht try { if (e) return; - if (!m_socket.is_open()) return; time_duration d = m_dht.refresh_timeout(); m_refresh_timer.expires_from_now(d); - m_refresh_timer.async_wait(m_strand.wrap( - bind(&dht_tracker::refresh_timeout, self(), _1))); + m_refresh_timer.async_wait( + bind(&dht_tracker::refresh_timeout, self(), _1)); } catch (std::exception&) { TORRENT_ASSERT(false); }; - void dht_tracker::rebind(asio::ip::address listen_interface, int listen_port) - { - m_socket.close(); - udp::endpoint ep(listen_interface, listen_port); - m_socket.open(ep.protocol()); - m_socket.bind(ep); - m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0] - , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer] - , m_strand.wrap(bind(&dht_tracker::on_receive, self(), _1, _2))); - } - void dht_tracker::tick(asio::error_code const& e) try { if (e) return; - if (!m_socket.is_open()) return; m_timer.expires_from_now(minutes(tick_period)); - m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, self(), _1))); + m_timer.async_wait(bind(&dht_tracker::tick, self(), _1)); ptime now = time_now(); if (now - m_last_new_key > minutes(key_refresh)) @@ -391,26 +369,15 @@ namespace libtorrent { namespace dht // translate bittorrent kademlia message into the generice kademlia message // used by the library - void dht_tracker::on_receive(asio::error_code const& error, size_t bytes_transferred) + void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred) try { - if (error == asio::error::operation_aborted) return; - if (!m_socket.is_open()) return; - - int current_buffer = m_buffer; - m_buffer = (m_buffer + 1) & 1; - m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0] - , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer] - , m_strand.wrap(bind(&dht_tracker::on_receive, self(), _1, _2))); - - if (error) return; - node_ban_entry* match = 0; node_ban_entry* min = m_ban_nodes; ptime now = time_now(); for (node_ban_entry* i = m_ban_nodes; i < m_ban_nodes + num_ban_nodes; ++i) { - if (i->src == m_remote_endpoint[current_buffer]) + if (i->src == ep) { match = i; break; @@ -429,8 +396,7 @@ namespace libtorrent { namespace dht if (match->count == 20) { TORRENT_LOG(dht_tracker) << time_now_string() << " BANNING PEER [ ip: " - << m_remote_endpoint[current_buffer] << " | " - "time: " << total_seconds((now - match->limit) + seconds(5)) + << ep << " | time: " << total_milliseconds((now - match->limit) + seconds(5)) / 1000.f << " | count: " << match->count << " ]"; } #endif @@ -450,7 +416,7 @@ namespace libtorrent { namespace dht { min->count = 1; min->limit = now + seconds(5); - min->src = m_remote_endpoint[current_buffer]; + min->src = ep; } #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -465,17 +431,16 @@ namespace libtorrent { namespace dht TORRENT_ASSERT(bytes_transferred > 0); - entry e = bdecode(m_in_buf[current_buffer].begin() - , m_in_buf[current_buffer].end()); + entry e = bdecode(buf, buf + bytes_transferred); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(dht_tracker) << time_now_string() << " RECEIVED [" - << m_remote_endpoint[current_buffer] << "]:"; + << ep << "]:"; #endif libtorrent::dht::msg m; m.message_id = 0; - m.addr = m_remote_endpoint[current_buffer]; + m.addr = ep; m.transaction_id = e["t"].string(); #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -712,9 +677,7 @@ namespace libtorrent { namespace dht catch (std::exception& e) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - int current_buffer = (m_buffer + 1) & 1; - std::string msg(m_in_buf[current_buffer].begin() - , m_in_buf[current_buffer].begin() + bytes_transferred); + std::string msg(buf, buf + bytes_transferred); TORRENT_LOG(dht_tracker) << "invalid incoming packet: " << e.what() << "\n" << msg << "\n"; #endif @@ -764,15 +727,14 @@ namespace libtorrent { namespace dht void dht_tracker::add_node(std::pair const& node) { udp::resolver::query q(node.first, lexical_cast(node.second)); - m_host_resolver.async_resolve(q, m_strand.wrap( - bind(&dht_tracker::on_name_lookup, self(), _1, _2))); + m_host_resolver.async_resolve(q, + bind(&dht_tracker::on_name_lookup, self(), _1, _2)); } void dht_tracker::on_name_lookup(asio::error_code const& e , udp::resolver::iterator host) try { if (e || host == udp::resolver::iterator()) return; - if (!m_socket.is_open()) return; add_node(host->endpoint()); } catch (std::exception&) @@ -783,15 +745,14 @@ namespace libtorrent { namespace dht void dht_tracker::add_router_node(std::pair const& node) { udp::resolver::query q(node.first, lexical_cast(node.second)); - m_host_resolver.async_resolve(q, m_strand.wrap( - bind(&dht_tracker::on_router_name_lookup, self(), _1, _2))); + m_host_resolver.async_resolve(q, + bind(&dht_tracker::on_router_name_lookup, self(), _1, _2)); } void dht_tracker::on_router_name_lookup(asio::error_code const& e , udp::resolver::iterator host) try { if (e || host == udp::resolver::iterator()) return; - if (!m_socket.is_open()) return; m_dht.add_router_node(host->endpoint()); } catch (std::exception&) @@ -989,9 +950,7 @@ namespace libtorrent { namespace dht m_send_buf.clear(); bencode(std::back_inserter(m_send_buf), e); asio::error_code ec; - m_socket.send_to(asio::buffer(&m_send_buf[0] - , (int)m_send_buf.size()), m.addr, 0, ec); - if (ec) return; + m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size()); #ifdef TORRENT_DHT_VERBOSE_LOGGING m_total_out_bytes += m_send_buf.size(); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index e28a71978..193291964 100755 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -571,6 +571,8 @@ namespace detail #ifndef TORRENT_DISABLE_DHT , m_dht_same_port(true) , m_external_udp_port(0) + , m_dht_socket(m_io_service, bind(&session_impl::on_receive_udp, this, _1, _2, _3) + , m_half_open) #endif , m_timer(m_io_service) , m_next_connect_torrent(0) @@ -680,6 +682,7 @@ namespace detail if (m_natpmp) m_natpmp->close(); #ifndef TORRENT_DISABLE_DHT if (m_dht) m_dht->stop(); + m_dht_socket.close(); #endif m_timer.cancel(); @@ -981,6 +984,15 @@ namespace detail } } + void session_impl::on_receive_udp(udp::endpoint const& ep, char const* buf, int len) + { + if (len > 20 && *buf == 'd' && m_dht) + { + // this is probably a dht message + m_dht->on_receive(ep, buf, len); + } + } + void session_impl::async_accept(boost::shared_ptr const& listener) { shared_ptr c(new socket_type); @@ -1898,8 +1910,7 @@ namespace detail if (m_dht_same_port) m_dht_settings.service_port = new_interface.port(); // the listen interface changed, rebind the dht listen socket as well - m_dht->rebind(new_interface.address() - , m_dht_settings.service_port); + m_dht_socket.bind(m_dht_settings.service_port); if (m_natpmp.get()) m_natpmp->set_mappings(0, m_dht_settings.service_port); if (m_upnp.get()) @@ -2062,9 +2073,11 @@ namespace detail m_natpmp->set_mappings(0, m_dht_settings.service_port); if (m_upnp.get()) m_upnp->set_mappings(0, m_dht_settings.service_port); - m_dht = new dht::dht_tracker(m_io_service - , m_dht_settings, m_listen_interface.address() - , startup_state); + m_dht = new dht::dht_tracker(m_dht_socket, m_dht_settings, startup_state); + if (!m_dht_socket.is_open() || m_dht_socket.local_port() != m_dht_settings.service_port) + { + m_dht_socket.bind(m_dht_settings.service_port); + } } void session_impl::stop_dht() @@ -2089,8 +2102,8 @@ namespace detail && settings.service_port != m_dht_settings.service_port && m_dht) { - m_dht->rebind(m_listen_interface.address() - , settings.service_port); + m_dht_socket.bind(settings.service_port); + if (m_natpmp.get()) m_natpmp->set_mappings(0, m_dht_settings.service_port); if (m_upnp.get()) diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp new file mode 100644 index 000000000..3a8b75ea6 --- /dev/null +++ b/src/udp_socket.cpp @@ -0,0 +1,377 @@ +#include "libtorrent/udp_socket.hpp" +#include "libtorrent/connection_queue.hpp" +#include +#include +#include +#include +#include + +using namespace libtorrent; + +udp_socket::udp_socket(asio::io_service& ios, udp_socket::callback_t const& c + , connection_queue& cc) + : m_callback(c) + , m_ipv4_sock(ios) + , m_ipv6_sock(ios) + , m_bind_port(0) + , m_socks5_sock(ios) + , m_connection_ticket(-1) + , m_cc(cc) + , m_resolver(ios) + , m_tunnel_packets(false) +{ +} + +void udp_socket::send(udp::endpoint const& ep, char const* p, int len) +{ + if (m_tunnel_packets) + { + // send udp packets through SOCKS5 server + wrap(ep, p, len); + return; + } + + asio::error_code ec; + if (ep.address().is_v4() && m_ipv4_sock.is_open()) + m_ipv4_sock.send_to(asio::buffer(p, len), ep, 0, ec); + else + m_ipv6_sock.send_to(asio::buffer(p, len), ep, 0, ec); +} + +void udp_socket::on_read(udp::socket* s, asio::error_code const& e, std::size_t bytes_transferred) +{ + if (e) return; + if (!m_callback) return; + + if (s == &m_ipv4_sock) + { +#ifndef BOOST_NO_EXCEPTIONS + try { +#endif + + if (m_tunnel_packets && m_v4_ep == m_proxy_addr) + unwrap(m_v4_buf, bytes_transferred); + else + m_callback(m_v4_ep, m_v4_buf, bytes_transferred); + +#ifndef BOOST_NO_EXCEPTIONS + } catch(std::exception&) {} +#endif + s->async_receive_from(asio::buffer(m_v4_buf, sizeof(m_v4_buf)) + , m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); + } + else + { +#ifndef BOOST_NO_EXCEPTIONS + try { +#endif + + if (m_tunnel_packets && m_v6_ep == m_proxy_addr) + unwrap(m_v6_buf, bytes_transferred); + else + m_callback(m_v6_ep, m_v6_buf, bytes_transferred); + +#ifndef BOOST_NO_EXCEPTIONS + } catch(std::exception&) {} +#endif + s->async_receive_from(asio::buffer(m_v6_buf, sizeof(m_v6_buf)) + , m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); + } +} + +void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len) +{ + using namespace libtorrent::detail; + + char header[20]; + char* h = header; + + write_uint16(0, h); // reserved + write_uint8(0, h); // fragment + write_uint8(ep.address().is_v4()?1:4, h); // atyp + write_address(ep.address(), h); + write_uint16(ep.port(), h); + + boost::array iovec; + iovec[0] = asio::const_buffer(header, h - header); + iovec[1] = asio::const_buffer(p, len); + + asio::error_code ec; + if (m_proxy_addr.address().is_v4() && m_ipv4_sock.is_open()) + m_ipv4_sock.send_to(iovec, m_proxy_addr, 0, ec); + else + m_ipv6_sock.send_to(iovec, m_proxy_addr, 0, ec); +} + +// unwrap the UDP packet from the SOCKS5 header +void udp_socket::unwrap(char const* buf, int size) +{ + using namespace libtorrent::detail; + + // the minimum socks5 header size + if (size <= 10) return; + + char const* p = buf; + p += 2; // reserved + int frag = read_uint8(p); + // fragmentation is not supported + if (frag != 0) return; + + udp::endpoint sender; + + int atyp = read_uint8(p); + if (atyp == 1) + { + // IPv4 + sender.address(address_v4(read_uint32(p))); + sender.port(read_uint16(p)); + } + else if (atyp == 4) + { + // IPv6 + TORRENT_ASSERT(false && "not supported yet"); + + } + else + { + // domain name not supported + return; + } + + m_callback(sender, p, size - (p - buf)); +} + +void udp_socket::close() +{ + m_ipv4_sock.close(); + m_ipv6_sock.close(); + m_socks5_sock.close(); + m_callback.clear(); + if (m_connection_ticket >= 0) + { + m_cc.done(m_connection_ticket); + m_connection_ticket = -1; + } +} + +void udp_socket::bind(int port) +{ + asio::error_code ec; + + if (m_ipv4_sock.is_open()) m_ipv4_sock.close(); + if (m_ipv6_sock.is_open()) m_ipv6_sock.close(); + + m_ipv4_sock.open(udp::v4(), ec); + if (!ec) + { + m_ipv4_sock.bind(udp::endpoint(address_v4::any(), port), ec); + m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, sizeof(m_v4_buf)) + , m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock, _1, _2)); + } + m_ipv6_sock.open(udp::v6(), ec); + if (!ec) + { + m_ipv6_sock.set_option(v6only(true), ec); + m_ipv6_sock.bind(udp::endpoint(address_v6::any(), port), ec); + m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, sizeof(m_v6_buf)) + , m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock, _1, _2)); + } + m_bind_port = port; +} + +void udp_socket::set_proxy_settings(proxy_settings const& ps) +{ + m_socks5_sock.close(); + m_tunnel_packets = false; + + m_proxy_settings = ps; + + if (ps.type == proxy_settings::socks5 + || ps.type == proxy_settings::socks5_pw) + { + // connect to socks5 server and open up the UDP tunnel + tcp::resolver::query q(ps.hostname + , boost::lexical_cast(ps.port)); + m_resolver.async_resolve(q, boost::bind( + &udp_socket::on_name_lookup, this, _1, _2)); + } +} + +void udp_socket::on_name_lookup(asio::error_code const& e, tcp::resolver::iterator i) +{ + if (e) return; + m_proxy_addr.address(i->endpoint().address()); + m_proxy_addr.port(i->endpoint().port()); + m_cc.enqueue(boost::bind(&udp_socket::on_connect, this, _1) + , boost::bind(&udp_socket::on_timeout, this), seconds(10)); +} + +void udp_socket::on_timeout() +{ + m_socks5_sock.close(); + m_connection_ticket = -1; +} + +void udp_socket::on_connect(int ticket) +{ + m_connection_ticket = ticket; + asio::error_code ec; + m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec); + m_socks5_sock.async_connect(tcp::endpoint(m_proxy_addr.address(), m_proxy_addr.port()) + , boost::bind(&udp_socket::on_connected, this, _1)); +} + +void udp_socket::on_connected(asio::error_code const& e) +{ + m_cc.done(m_connection_ticket); + m_connection_ticket = -1; + if (e) return; + + using namespace libtorrent::detail; + + // send SOCKS5 authentication methods + char* p = &m_tmp_buf[0]; + write_uint8(5, p); // SOCKS VERSION 5 + if (m_proxy_settings.username.empty() + || m_proxy_settings.type == proxy_settings::socks5) + { + write_uint8(1, p); // 1 authentication method (no auth) + write_uint8(0, p); // no authentication + } + else + { + write_uint8(2, p); // 2 authentication methods + write_uint8(0, p); // no authentication + write_uint8(2, p); // username/password + } + asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) + , boost::bind(&udp_socket::handshake1, this, _1)); +} + +void udp_socket::handshake1(asio::error_code const& e) +{ + if (e) return; + + asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) + , boost::bind(&udp_socket::handshake2, this, _1)); +} + +void udp_socket::handshake2(asio::error_code const& e) +{ + if (e) return; + + using namespace libtorrent::detail; + + char* p = &m_tmp_buf[0]; + int version = read_uint8(p); + int method = read_uint8(p); + + if (version < 5) return; + + if (method == 0) + { + socks_forward_udp(); + } + else if (method == 2) + { + if (m_proxy_settings.username.empty()) + { + m_socks5_sock.close(); + return; + } + + // start sub-negotiation + char* p = &m_tmp_buf[0]; + write_uint8(1, p); + write_uint8(m_proxy_settings.username.size(), p); + write_string(m_proxy_settings.username, p); + write_uint8(m_proxy_settings.password.size(), p); + write_string(m_proxy_settings.password, p); + asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) + , boost::bind(&udp_socket::handshake3, this, _1)); + } + else + { + m_socks5_sock.close(); + return; + } +} + +void udp_socket::handshake3(asio::error_code const& e) +{ + if (e) return; + + asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) + , boost::bind(&udp_socket::handshake4, this, _1)); +} + +void udp_socket::handshake4(asio::error_code const& e) +{ + if (e) return; + + using namespace libtorrent::detail; + + char* p = &m_tmp_buf[0]; + int version = read_uint8(p); + int status = read_uint8(p); + + if (version != 1) return; + if (status != 0) return; + + socks_forward_udp(); +} + +void udp_socket::socks_forward_udp() +{ + using namespace libtorrent::detail; + + // send SOCKS5 UDP command + char* p = &m_tmp_buf[0]; + write_uint8(5, p); // SOCKS VERSION 5 + write_uint8(3, p); // UDP ASSOCIATE command + write_uint8(0, p); // reserved + write_uint8(0, p); // ATYP IPv4 + write_uint32(0, p); // IP any + write_uint16(m_bind_port, p); + + asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) + , boost::bind(&udp_socket::connect1, this, _1)); +} + +void udp_socket::connect1(asio::error_code const& e) +{ + if (e) return; + + asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 10) + , boost::bind(&udp_socket::connect2, this, _1)); +} + +void udp_socket::connect2(asio::error_code const& e) +{ + if (e) return; + + using namespace libtorrent::detail; + + char* p = &m_tmp_buf[0]; + int version = read_uint8(p); // VERSION + int status = read_uint8(p); // STATUS + read_uint8(p); // RESERVED + int atyp = read_uint8(p); // address type + + if (version != 5) return; + if (status != 0) return; + + if (atyp == 1) + { + m_proxy_addr.address(address_v4(read_uint32(p))); + m_proxy_addr.port(read_uint16(p)); + } + else + { + // in this case we need to read more data from the socket + TORRENT_ASSERT(false && "not implemented yet!"); + } + + m_tunnel_packets = true; +} +