remove udp_socket mutex

This commit is contained in:
Arvid Norberg 2010-09-25 21:39:27 +00:00
parent e6d400084e
commit 554e329f57
3 changed files with 39 additions and 39 deletions

View File

@ -116,7 +116,7 @@ namespace libtorrent
void handshake2(error_code const& e); void handshake2(error_code const& e);
void handshake3(error_code const& e); void handshake3(error_code const& e);
void handshake4(error_code const& e); void handshake4(error_code const& e);
void socks_forward_udp(mutex::scoped_lock& l); void socks_forward_udp();
void connect1(error_code const& e); void connect1(error_code const& e);
void connect2(error_code const& e); void connect2(error_code const& e);
void hung_up(error_code const& e); void hung_up(error_code const& e);
@ -125,7 +125,20 @@ namespace libtorrent
void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); void wrap(char const* hostname, int port, char const* p, int len, error_code& ec);
void unwrap(error_code const& e, char const* buf, int size); void unwrap(error_code const& e, char const* buf, int size);
mutable mutex m_mutex; #ifdef TORRENT_DEBUG
#if defined BOOST_HAS_PTHREADS
mutable pthread_t m_thread;
#endif
bool is_single_thread() const
{
#if defined BOOST_HAS_PTHREADS
if (m_thread == 0)
m_thread = pthread_self();
return m_thread == pthread_self();
#endif
return true;
}
#endif
udp::socket m_ipv4_sock; udp::socket m_ipv4_sock;
udp::endpoint m_v4_ep; udp::endpoint m_v4_ep;

View File

@ -425,7 +425,7 @@ namespace libtorrent
int piece_manager::hash_for_slot(int slot, partial_hash& ph, int piece_size int piece_manager::hash_for_slot(int slot, partial_hash& ph, int piece_size
, int small_piece_size, sha1_hash* small_hash) , int small_piece_size, sha1_hash* small_hash)
{ {
TORRENT_ASSERT(!error()); TORRENT_ASSERT_VAL(!error(), error());
int num_read = 0; int num_read = 0;
int slot_size = piece_size - ph.offset; int slot_size = piece_size - ph.offset;
if (slot_size > 0) if (slot_size > 0)

View File

@ -72,6 +72,9 @@ udp_socket::udp_socket(asio::io_service& ios
m_magic = 0x1337; m_magic = 0x1337;
m_started = false; m_started = false;
m_outstanding_when_aborted = -1; m_outstanding_when_aborted = -1;
#if defined BOOST_HAS_PTHREADS
m_thread = 0;
#endif
#endif #endif
} }
@ -164,7 +167,7 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_cod
void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred) void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred)
{ {
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_outstanding > 0); TORRENT_ASSERT(m_outstanding > 0);
--m_outstanding; --m_outstanding;
@ -174,10 +177,8 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_outstanding == 0) if (m_outstanding == 0)
{ {
// "this" may be destructed in the callback // "this" may be destructed in the callback
// that's why we need to unlock
callback_t tmp = m_callback; callback_t tmp = m_callback;
m_callback.clear(); m_callback.clear();
l.unlock();
} }
return; return;
} }
@ -187,7 +188,6 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (e) if (e)
{ {
l.unlock();
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
try { try {
#endif #endif
@ -204,8 +204,6 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
#endif #endif
l.lock();
// don't stop listening on recoverable errors // don't stop listening on recoverable errors
if (e != asio::error::host_unreachable if (e != asio::error::host_unreachable
&& e != asio::error::fault && e != asio::error::fault
@ -217,10 +215,8 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_outstanding == 0) if (m_outstanding == 0)
{ {
// "this" may be destructed in the callback // "this" may be destructed in the callback
// that's why we need to unlock
callback_t tmp = m_callback; callback_t tmp = m_callback;
m_callback.clear(); m_callback.clear();
l.unlock();
} }
return; return;
} }
@ -256,17 +252,14 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_tunnel_packets) if (m_tunnel_packets)
{ {
l.unlock();
// if the source IP doesn't match the proxy's, ignore the packet // if the source IP doesn't match the proxy's, ignore the packet
if (m_v4_ep == m_proxy_addr) if (m_v4_ep == m_proxy_addr)
unwrap(e, m_v4_buf, bytes_transferred); unwrap(e, m_v4_buf, bytes_transferred);
} }
else else
{ {
l.unlock();
m_callback(e, m_v4_ep, m_v4_buf, bytes_transferred); m_callback(e, m_v4_ep, m_v4_buf, bytes_transferred);
} }
l.lock();
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
@ -286,21 +279,18 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
if (m_tunnel_packets) if (m_tunnel_packets)
{ {
l.unlock();
// if the source IP doesn't match the proxy's, ignore the packet // if the source IP doesn't match the proxy's, ignore the packet
if (m_v6_ep == m_proxy_addr) if (m_v6_ep == m_proxy_addr)
unwrap(e, m_v6_buf, bytes_transferred); unwrap(e, m_v6_buf, bytes_transferred);
} }
else else
{ {
l.unlock();
m_callback(e, m_v6_ep, m_v6_buf, bytes_transferred); m_callback(e, m_v6_ep, m_v6_buf, bytes_transferred);
} }
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
} catch(std::exception&) {} } catch(std::exception&) {}
#endif #endif
l.lock();
if (m_abort) return; if (m_abort) return;
@ -417,7 +407,7 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size)
void udp_socket::close() void udp_socket::close()
{ {
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
error_code ec; error_code ec;
@ -446,14 +436,13 @@ void udp_socket::close()
// "this" may be destructed in the callback // "this" may be destructed in the callback
callback_t tmp = m_callback; callback_t tmp = m_callback;
m_callback.clear(); m_callback.clear();
l.unlock();
} }
} }
void udp_socket::bind(udp::endpoint const& ep, error_code& ec) void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_abort == false); TORRENT_ASSERT(m_abort == false);
if (m_abort) return; if (m_abort) return;
@ -494,7 +483,7 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
void udp_socket::bind(int port) void udp_socket::bind(int port)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_abort == false); TORRENT_ASSERT(m_abort == false);
if (m_abort) return; if (m_abort) return;
@ -537,7 +526,7 @@ void udp_socket::bind(int port)
void udp_socket::set_proxy_settings(proxy_settings const& ps) void udp_socket::set_proxy_settings(proxy_settings const& ps)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
error_code ec; error_code ec;
m_socks5_sock.close(ec); m_socks5_sock.close(ec);
@ -563,11 +552,11 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
if (e) return; if (e) return;
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
m_proxy_addr.address(i->endpoint().address()); m_proxy_addr.address(i->endpoint().address());
m_proxy_addr.port(i->endpoint().port()); m_proxy_addr.port(i->endpoint().port());
l.unlock(); // on_connect may be called from within this thread // on_connect may be called from within this thread
m_cc.enqueue(boost::bind(&udp_socket::on_connect, this, _1) m_cc.enqueue(boost::bind(&udp_socket::on_connect, this, _1)
, boost::bind(&udp_socket::on_timeout, this), seconds(10)); , boost::bind(&udp_socket::on_timeout, this), seconds(10));
} }
@ -575,7 +564,7 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
void udp_socket::on_timeout() void udp_socket::on_timeout()
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
error_code ec; error_code ec;
m_socks5_sock.close(ec); m_socks5_sock.close(ec);
@ -585,7 +574,7 @@ void udp_socket::on_timeout()
void udp_socket::on_connect(int ticket) void udp_socket::on_connect(int ticket)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
if (m_abort) return; if (m_abort) return;
@ -600,7 +589,7 @@ void udp_socket::on_connected(error_code const& e)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
m_cc.done(m_connection_ticket); m_cc.done(m_connection_ticket);
m_connection_ticket = -1; m_connection_ticket = -1;
if (e) return; if (e) return;
@ -632,7 +621,7 @@ void udp_socket::handshake1(error_code const& e)
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2)
, boost::bind(&udp_socket::handshake2, this, _1)); , boost::bind(&udp_socket::handshake2, this, _1));
@ -645,7 +634,7 @@ void udp_socket::handshake2(error_code const& e)
using namespace libtorrent::detail; using namespace libtorrent::detail;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
char* p = &m_tmp_buf[0]; char* p = &m_tmp_buf[0];
int version = read_uint8(p); int version = read_uint8(p);
@ -655,7 +644,7 @@ void udp_socket::handshake2(error_code const& e)
if (method == 0) if (method == 0)
{ {
socks_forward_udp(l); socks_forward_udp(/*l*/);
} }
else if (method == 2) else if (method == 2)
{ {
@ -690,7 +679,7 @@ void udp_socket::handshake3(error_code const& e)
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 2)
, boost::bind(&udp_socket::handshake4, this, _1)); , boost::bind(&udp_socket::handshake4, this, _1));
@ -701,7 +690,7 @@ void udp_socket::handshake4(error_code const& e)
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
using namespace libtorrent::detail; using namespace libtorrent::detail;
@ -712,10 +701,10 @@ void udp_socket::handshake4(error_code const& e)
if (version != 1) return; if (version != 1) return;
if (status != 0) return; if (status != 0) return;
socks_forward_udp(l); socks_forward_udp(/*l*/);
} }
void udp_socket::socks_forward_udp(mutex::scoped_lock& l) void udp_socket::socks_forward_udp()
{ {
CHECK_MAGIC; CHECK_MAGIC;
using namespace libtorrent::detail; using namespace libtorrent::detail;
@ -749,7 +738,7 @@ void udp_socket::connect1(error_code const& e)
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 10) asio::async_read(m_socks5_sock, asio::buffer(m_tmp_buf, 10)
, boost::bind(&udp_socket::connect2, this, _1)); , boost::bind(&udp_socket::connect2, this, _1));
@ -760,7 +749,7 @@ void udp_socket::connect2(error_code const& e)
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
using namespace libtorrent::detail; using namespace libtorrent::detail;
@ -811,12 +800,10 @@ void udp_socket::connect2(error_code const& e)
void udp_socket::hung_up(error_code const& e) void udp_socket::hung_up(error_code const& e)
{ {
CHECK_MAGIC; CHECK_MAGIC;
mutex::scoped_lock l(m_mutex); TORRENT_ASSERT(is_single_thread());
if (e == asio::error::operation_aborted || m_abort) return; if (e == asio::error::operation_aborted || m_abort) return;
l.unlock();
// the socks connection was closed, re-open it // the socks connection was closed, re-open it
set_proxy_settings(m_proxy_settings); set_proxy_settings(m_proxy_settings);
} }