fixed bug in udp_socket where there would be two outstanding async read operations on the socket

This commit is contained in:
Arvid Norberg 2011-02-05 21:19:33 +00:00
parent 4c0c322387
commit 5690444178
2 changed files with 105 additions and 37 deletions

View File

@ -189,8 +189,11 @@ namespace libtorrent
bool m_reallocate_buffer6; bool m_reallocate_buffer6;
#endif #endif
int m_bind_port; boost::uint16_t m_bind_port;
char m_outstanding; boost::uint8_t m_v4_outstanding;
#if TORRENT_USE_IPV6
boost::uint8_t m_v6_outstanding;
#endif
tcp::socket m_socks5_sock; tcp::socket m_socks5_sock;
int m_connection_ticket; int m_connection_ticket;

View File

@ -69,7 +69,10 @@ udp_socket::udp_socket(asio::io_service& ios
, m_reallocate_buffer6(false) , m_reallocate_buffer6(false)
#endif #endif
, m_bind_port(0) , m_bind_port(0)
, m_outstanding(0) , m_v4_outstanding(0)
#if TORRENT_USE_IPV6
, m_v6_outstanding(0)
#endif
, m_socks5_sock(ios) , m_socks5_sock(ios)
, m_connection_ticket(-1) , m_connection_ticket(-1)
, m_cc(cc) , m_cc(cc)
@ -93,6 +96,13 @@ udp_socket::udp_socket(asio::io_service& ios
m_v6_buf_size = 1600; m_v6_buf_size = 1600;
m_v6_buf = (char*)malloc(m_v6_buf_size); m_v6_buf = (char*)malloc(m_v6_buf_size);
#endif #endif
#ifdef TORRENT_DEBUG
m_v4_outstanding = 0;
#if TORRENT_USE_IPV6
m_v6_outstanding = 0;
#endif
#endif
} }
udp_socket::~udp_socket() udp_socket::~udp_socket()
@ -100,11 +110,12 @@ udp_socket::~udp_socket()
free(m_v4_buf); free(m_v4_buf);
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
free(m_v6_buf); free(m_v6_buf);
TORRENT_ASSERT_VAL(m_v6_outstanding == 0, m_v6_outstanding);
#endif #endif
#ifdef TORRENT_DEBUG TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding);
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(!m_callback || !m_started); TORRENT_ASSERT(!m_callback || !m_started);
TORRENT_ASSERT_VAL(m_outstanding == 0, m_outstanding); #ifdef TORRENT_DEBUG
m_magic = 0; m_magic = 0;
#endif #endif
} }
@ -196,20 +207,37 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len
void udp_socket::maybe_realloc_buffers(int which) void udp_socket::maybe_realloc_buffers(int which)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
bool no_mem = false;
if (m_reallocate_buffer4 && (which & 1)) if (m_reallocate_buffer4 && (which & 1))
{ {
free(m_v4_buf); TORRENT_ASSERT(m_v4_outstanding == 0);
m_v4_buf = (char*)malloc(m_v4_buf_size); void* tmp = realloc(m_v4_buf, m_v4_buf_size);
if (tmp != 0) m_v4_buf = (char*)tmp;
else no_mem = true;
m_reallocate_buffer4 = false; m_reallocate_buffer4 = false;
} }
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
if (m_reallocate_buffer6 && (which & 2)) if (m_reallocate_buffer6 && (which & 2))
{ {
free(m_v6_buf); TORRENT_ASSERT(m_v6_outstanding == 0);
m_v6_buf = (char*)malloc(m_v6_buf_size); void* tmp = realloc(m_v6_buf, m_v6_buf_size);
if (tmp != 0) m_v6_buf = (char*)tmp;
else no_mem = true;
m_reallocate_buffer6 = false; m_reallocate_buffer6 = false;
} }
#endif #endif
if (no_mem)
{
free(m_v4_buf);
m_v4_buf_size = 0;
#if TORRENT_USE_IPV6
free(m_v6_buf);
m_v6_buf_size = 0;
#endif
if (m_callback) m_callback(error::no_memory, m_v4_ep, 0, 0);
close();
}
} }
void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred) void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred)
@ -220,12 +248,22 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_outstanding > 0); #if TORRENT_USE_IPV6
--m_outstanding; if (s == &m_ipv6_sock)
if (e == asio::error::operation_aborted || m_abort)
{ {
if (m_outstanding == 0) TORRENT_ASSERT(m_v6_outstanding > 0);
--m_v6_outstanding;
}
else
#endif
{
TORRENT_ASSERT(m_v4_outstanding > 0);
--m_v4_outstanding;
}
if (m_abort)
{
if (m_v4_outstanding + m_v6_outstanding == 0)
{ {
// "this" may be destructed in the callback // "this" may be destructed in the callback
callback_t tmp = m_callback; callback_t tmp = m_callback;
@ -266,9 +304,10 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
&& e != asio::error::connection_reset && e != asio::error::connection_reset
&& e != asio::error::connection_refused && e != asio::error::connection_refused
&& e != asio::error::connection_aborted && e != asio::error::connection_aborted
&& e != asio::error::connection_aborted
&& e != asio::error::message_size) && e != asio::error::message_size)
{ {
if (m_outstanding == 0) if (m_v4_outstanding + m_v6_outstanding == 0)
{ {
// "this" may be destructed in the callback // "this" may be destructed in the callback
callback_t tmp = m_callback; callback_t tmp = m_callback;
@ -286,14 +325,21 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
#endif #endif
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
if (s == &m_ipv6_sock) if (s == &m_ipv6_sock)
{
TORRENT_ASSERT(m_v6_outstanding == 0);
++m_v6_outstanding;
s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
, m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); , m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
}
else else
#endif #endif
{
TORRENT_ASSERT(m_v4_outstanding == 0);
++m_v4_outstanding;
s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
, m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); , m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
}
++m_outstanding;
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
m_started = true; m_started = true;
#endif #endif
@ -329,6 +375,8 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_read"); add_outstanding_async("udp_socket::on_read");
#endif #endif
TORRENT_ASSERT(m_v6_outstanding == 0);
++m_v6_outstanding;
s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
, m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); , m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
} }
@ -362,10 +410,11 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_read"); add_outstanding_async("udp_socket::on_read");
#endif #endif
TORRENT_ASSERT(m_v4_outstanding == 0);
++m_v4_outstanding;
s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
, m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2)); , m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
} }
++m_outstanding;
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
m_started = true; m_started = true;
@ -509,7 +558,7 @@ void udp_socket::close()
m_connection_ticket = -1; m_connection_ticket = -1;
} }
if (m_outstanding == 0) if (m_v4_outstanding + m_v6_outstanding == 0)
{ {
// "this" may be destructed in the callback // "this" may be destructed in the callback
callback_t tmp = m_callback; callback_t tmp = m_callback;
@ -552,12 +601,16 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
if (ec) return; if (ec) return;
m_ipv4_sock.bind(ep, ec); m_ipv4_sock.bind(ep, ec);
if (ec) return; if (ec) return;
if (m_v4_outstanding == 0)
{
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_read"); add_outstanding_async("udp_socket::on_read");
#endif #endif
m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) ++m_v4_outstanding;
, m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock, _1, _2)); m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
++m_outstanding; , m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock
, _1, _2));
}
} }
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
else else
@ -566,12 +619,17 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
if (ec) return; if (ec) return;
m_ipv6_sock.bind(ep, ec); m_ipv6_sock.bind(ep, ec);
if (ec) return; if (ec) return;
if (m_v6_outstanding == 0)
{
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("udp_socket::on_read"); add_outstanding_async("udp_socket::on_read");
#endif #endif
m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size) TORRENT_ASSERT(m_v6_outstanding == 0);
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock, _1, _2)); ++m_v6_outstanding;
++m_outstanding; m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock
, _1, _2));
}
} }
#endif #endif
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
@ -604,12 +662,13 @@ void udp_socket::bind(int port)
add_outstanding_async("udp_socket::on_read"); add_outstanding_async("udp_socket::on_read");
#endif #endif
m_ipv4_sock.bind(udp::endpoint(address_v4::any(), port), ec); m_ipv4_sock.bind(udp::endpoint(address_v4::any(), port), ec);
m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size) if (m_v4_outstanding == 0)
, m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock, _1, _2)); {
++m_outstanding; ++m_v4_outstanding;
#ifdef TORRENT_DEBUG m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
m_started = true; , m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock
#endif , _1, _2));
}
} }
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
m_ipv6_sock.open(udp::v6(), ec); m_ipv6_sock.open(udp::v6(), ec);
@ -620,13 +679,19 @@ void udp_socket::bind(int port)
#endif #endif
m_ipv6_sock.set_option(v6only(true), ec); m_ipv6_sock.set_option(v6only(true), ec);
m_ipv6_sock.bind(udp::endpoint(address_v6::any(), port), ec); m_ipv6_sock.bind(udp::endpoint(address_v6::any(), port), ec);
m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock, _1, _2)); if (m_v6_outstanding == 0)
++m_outstanding; {
++m_v6_outstanding;
m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock
, _1, _2));
}
}
#endif // TORRENT_USE_IPV6
#ifdef TORRENT_DEBUG #ifdef TORRENT_DEBUG
m_started = true; m_started = true;
#endif
}
#endif #endif
m_bind_port = port; m_bind_port = port;
} }