added mutex to udp_socket and posts the callback instead of calling it directly

This commit is contained in:
Arvid Norberg 2008-09-19 17:31:16 +00:00
parent 656ff6d5f2
commit 67f1242836
2 changed files with 45 additions and 6 deletions

View File

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector> #include <vector>
#include <boost/function.hpp> #include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
namespace libtorrent namespace libtorrent
{ {
@ -83,6 +84,9 @@ namespace libtorrent
void wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec); void wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec);
void unwrap(error_code const& e, char const* buf, int size); void unwrap(error_code const& e, char const* buf, int size);
typedef boost::mutex mutex_t;
mutable mutex_t m_mutex;
udp::socket m_ipv4_sock; udp::socket m_ipv4_sock;
udp::socket m_ipv6_sock; udp::socket m_ipv6_sock;
udp::endpoint m_v4_ep; udp::endpoint m_v4_ep;

View File

@ -30,6 +30,8 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_cod
{ {
if (ec == asio::error::operation_aborted) return; if (ec == asio::error::operation_aborted) return;
mutex_t::scoped_lock l(m_mutex);
if (m_tunnel_packets) if (m_tunnel_packets)
{ {
// send udp packets through SOCKS5 server // send udp packets through SOCKS5 server
@ -47,6 +49,8 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
{ {
if (e == asio::error::operation_aborted) return; if (e == asio::error::operation_aborted) return;
mutex_t::scoped_lock l(m_mutex);
if (!m_callback) return; if (!m_callback) return;
if (e) if (e)
@ -55,9 +59,9 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
try { try {
#endif #endif
if (s == &m_ipv4_sock) if (s == &m_ipv4_sock)
m_callback(e, m_v4_ep, 0, 0); get_io_service().post(boost::bind(m_callback, e, m_v4_ep, (char*)0, 0));
else else
m_callback(e, m_v6_ep, 0, 0); get_io_service().post(boost::bind(m_callback, e, m_v6_ep, (char*)0, 0));
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
#endif #endif
@ -90,7 +94,7 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_tunnel_packets && m_v4_ep == m_proxy_addr) if (m_tunnel_packets && m_v4_ep == m_proxy_addr)
unwrap(e, m_v4_buf, bytes_transferred); unwrap(e, m_v4_buf, bytes_transferred);
else else
m_callback(e, m_v4_ep, m_v4_buf, bytes_transferred); get_io_service().post(boost::bind(m_callback, e, m_v4_ep, m_v4_buf, bytes_transferred));
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
@ -107,7 +111,7 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_tunnel_packets && m_v6_ep == m_proxy_addr) if (m_tunnel_packets && m_v6_ep == m_proxy_addr)
unwrap(e, m_v6_buf, bytes_transferred); unwrap(e, m_v6_buf, bytes_transferred);
else else
m_callback(e, m_v6_ep, m_v6_buf, bytes_transferred); get_io_service().post(boost::bind(m_callback, e, m_v6_ep, m_v6_buf, bytes_transferred));
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
@ -173,11 +177,13 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size)
return; return;
} }
m_callback(e, sender, p, size - (p - buf)); get_io_service().post(boost::bind(m_callback, e, sender, p, size - (p - buf)));
} }
void udp_socket::close() void udp_socket::close()
{ {
mutex_t::scoped_lock l(m_mutex);
error_code ec; error_code ec;
m_ipv4_sock.close(ec); m_ipv4_sock.close(ec);
m_ipv6_sock.close(ec); m_ipv6_sock.close(ec);
@ -192,6 +198,8 @@ void udp_socket::close()
void udp_socket::bind(udp::endpoint const& ep, error_code& ec) void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
{ {
mutex_t::scoped_lock l(m_mutex);
if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec); if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec);
if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec); if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec);
@ -218,6 +226,8 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
void udp_socket::bind(int port) void udp_socket::bind(int port)
{ {
mutex_t::scoped_lock l(m_mutex);
error_code ec; error_code ec;
if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec); if (m_ipv4_sock.is_open()) m_ipv4_sock.close(ec);
@ -243,6 +253,8 @@ void udp_socket::bind(int port)
void udp_socket::set_proxy_settings(proxy_settings const& ps) void udp_socket::set_proxy_settings(proxy_settings const& ps)
{ {
mutex_t::scoped_lock l(m_mutex);
error_code ec; error_code ec;
m_socks5_sock.close(ec); m_socks5_sock.close(ec);
m_tunnel_packets = false; m_tunnel_packets = false;
@ -263,6 +275,9 @@ void udp_socket::set_proxy_settings(proxy_settings const& ps)
void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i) void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
m_proxy_addr.address(i->endpoint().address()); m_proxy_addr.address(i->endpoint().address());
m_proxy_addr.port(i->endpoint().port()); m_proxy_addr.port(i->endpoint().port());
m_cc.enqueue(boost::bind(&udp_socket::on_connect, this, _1) m_cc.enqueue(boost::bind(&udp_socket::on_connect, this, _1)
@ -271,6 +286,8 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
void udp_socket::on_timeout() void udp_socket::on_timeout()
{ {
mutex_t::scoped_lock l(m_mutex);
error_code ec; error_code ec;
m_socks5_sock.close(ec); m_socks5_sock.close(ec);
m_connection_ticket = -1; m_connection_ticket = -1;
@ -278,6 +295,8 @@ void udp_socket::on_timeout()
void udp_socket::on_connect(int ticket) void udp_socket::on_connect(int ticket)
{ {
mutex_t::scoped_lock l(m_mutex);
m_connection_ticket = ticket; m_connection_ticket = ticket;
error_code ec; error_code ec;
m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec); m_socks5_sock.open(m_proxy_addr.address().is_v4()?tcp::v4():tcp::v6(), ec);
@ -291,6 +310,8 @@ void udp_socket::on_connected(error_code const& e)
m_connection_ticket = -1; m_connection_ticket = -1;
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
using namespace libtorrent::detail; using namespace libtorrent::detail;
// send SOCKS5 authentication methods // send SOCKS5 authentication methods
@ -316,6 +337,8 @@ void udp_socket::handshake1(error_code const& e)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2)
, boost::bind(&udp_socket::handshake2, this, _1)); , boost::bind(&udp_socket::handshake2, this, _1));
} }
@ -326,6 +349,8 @@ void udp_socket::handshake2(error_code const& e)
using namespace libtorrent::detail; using namespace libtorrent::detail;
mutex_t::scoped_lock l(m_mutex);
char* p = &m_tmp_buf[0]; char* p = &m_tmp_buf[0];
int version = read_uint8(p); int version = read_uint8(p);
int method = read_uint8(p); int method = read_uint8(p);
@ -367,6 +392,8 @@ void udp_socket::handshake3(error_code const& e)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2)
, boost::bind(&udp_socket::handshake4, this, _1)); , boost::bind(&udp_socket::handshake4, this, _1));
} }
@ -375,6 +402,8 @@ void udp_socket::handshake4(error_code const& e)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
using namespace libtorrent::detail; using namespace libtorrent::detail;
char* p = &m_tmp_buf[0]; char* p = &m_tmp_buf[0];
@ -391,6 +420,8 @@ void udp_socket::socks_forward_udp()
{ {
using namespace libtorrent::detail; using namespace libtorrent::detail;
mutex_t::scoped_lock l(m_mutex);
// send SOCKS5 UDP command // send SOCKS5 UDP command
char* p = &m_tmp_buf[0]; char* p = &m_tmp_buf[0];
write_uint8(5, p); // SOCKS VERSION 5 write_uint8(5, p); // SOCKS VERSION 5
@ -408,6 +439,8 @@ void udp_socket::connect1(error_code const& e)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 10) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 10)
, boost::bind(&udp_socket::connect2, this, _1)); , boost::bind(&udp_socket::connect2, this, _1));
} }
@ -415,7 +448,9 @@ void udp_socket::connect1(error_code const& e)
void udp_socket::connect2(error_code const& e) void udp_socket::connect2(error_code const& e)
{ {
if (e) return; if (e) return;
mutex_t::scoped_lock l(m_mutex);
using namespace libtorrent::detail; using namespace libtorrent::detail;
char* p = &m_tmp_buf[0]; char* p = &m_tmp_buf[0];