attempt at fixing shutdown hang caused by connection_queue
This commit is contained in:
parent
db33922684
commit
0657690b1c
|
@ -67,6 +67,22 @@ public:
|
||||||
int limit() const;
|
int limit() const;
|
||||||
void close();
|
void close();
|
||||||
int size() const { return m_queue.size(); }
|
int size() const { return m_queue.size(); }
|
||||||
|
int num_connecting() const { return m_num_connecting; }
|
||||||
|
#if defined TORRENT_ASIO_DEBUGGING
|
||||||
|
float next_timeout() const { return total_milliseconds(m_timer.expires_at() - time_now_hires()) / 1000.f; }
|
||||||
|
float max_timeout() const
|
||||||
|
{
|
||||||
|
ptime max_timeout = min_time();
|
||||||
|
for (std::list<entry>::const_iterator i = m_queue.begin()
|
||||||
|
, end(m_queue.end()); i != end; ++i)
|
||||||
|
{
|
||||||
|
if (!i->connecting) continue;
|
||||||
|
if (i->expires > max_timeout) max_timeout = i->expires;
|
||||||
|
}
|
||||||
|
if (max_timeout == min_time()) return 0.f;
|
||||||
|
return total_milliseconds(max_timeout - time_now_hires()) / 1000.f;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef TORRENT_DEBUG
|
#ifdef TORRENT_DEBUG
|
||||||
void check_invariant() const;
|
void check_invariant() const;
|
||||||
|
@ -85,6 +101,9 @@ private:
|
||||||
entry(): connecting(false), ticket(0), expires(max_time()), priority(0) {}
|
entry(): connecting(false), ticket(0), expires(max_time()), priority(0) {}
|
||||||
// called when the connection is initiated
|
// called when the connection is initiated
|
||||||
// this is when the timeout countdown starts
|
// this is when the timeout countdown starts
|
||||||
|
// TODO: if we don't actually need the connection queue
|
||||||
|
// to hold ownership of objects, replace these boost functions
|
||||||
|
// with pointer to a pure virtual interface class
|
||||||
boost::function<void(int)> on_connect;
|
boost::function<void(int)> on_connect;
|
||||||
// called if done hasn't been called within the timeout
|
// called if done hasn't been called within the timeout
|
||||||
// or if the connection queue aborts. This means there
|
// or if the connection queue aborts. This means there
|
||||||
|
@ -100,6 +119,9 @@ private:
|
||||||
int priority;
|
int priority;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// TODO: split this into a queue and connecting map. The key for the map
|
||||||
|
// is the ticket. Most field in entry would only be necessary for the
|
||||||
|
// connecting map.
|
||||||
std::list<entry> m_queue;
|
std::list<entry> m_queue;
|
||||||
|
|
||||||
// the next ticket id a connection will be given
|
// the next ticket id a connection will be given
|
||||||
|
@ -108,6 +130,9 @@ private:
|
||||||
int m_half_open_limit;
|
int m_half_open_limit;
|
||||||
bool m_abort;
|
bool m_abort;
|
||||||
|
|
||||||
|
// the number of outstanding timers
|
||||||
|
int m_num_timers;
|
||||||
|
|
||||||
deadline_timer m_timer;
|
deadline_timer m_timer;
|
||||||
|
|
||||||
mutable mutex_t m_mutex;
|
mutable mutex_t m_mutex;
|
||||||
|
|
|
@ -49,6 +49,7 @@ namespace libtorrent
|
||||||
, m_num_connecting(0)
|
, m_num_connecting(0)
|
||||||
, m_half_open_limit(0)
|
, m_half_open_limit(0)
|
||||||
, m_abort(false)
|
, m_abort(false)
|
||||||
|
, m_num_timers(0)
|
||||||
, m_timer(ios)
|
, m_timer(ios)
|
||||||
#ifdef TORRENT_DEBUG
|
#ifdef TORRENT_DEBUG
|
||||||
, m_in_timeout_function(false)
|
, m_in_timeout_function(false)
|
||||||
|
@ -155,7 +156,7 @@ namespace libtorrent
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
TORRENT_TRY {
|
TORRENT_TRY {
|
||||||
e.on_timeout();
|
e.on_connect(-1);
|
||||||
} TORRENT_CATCH(std::exception&) {}
|
} TORRENT_CATCH(std::exception&) {}
|
||||||
tmp.pop_front();
|
tmp.pop_front();
|
||||||
}
|
}
|
||||||
|
@ -227,6 +228,7 @@ namespace libtorrent
|
||||||
error_code ec;
|
error_code ec;
|
||||||
m_timer.expires_at(expire, ec);
|
m_timer.expires_at(expire, ec);
|
||||||
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
|
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
|
||||||
|
++m_num_timers;
|
||||||
}
|
}
|
||||||
i->connecting = true;
|
i->connecting = true;
|
||||||
++m_num_connecting;
|
++m_num_connecting;
|
||||||
|
@ -275,6 +277,7 @@ namespace libtorrent
|
||||||
complete_async("connection_queue::on_timeout");
|
complete_async("connection_queue::on_timeout");
|
||||||
#endif
|
#endif
|
||||||
mutex_t::scoped_lock l(m_mutex);
|
mutex_t::scoped_lock l(m_mutex);
|
||||||
|
--m_num_timers;
|
||||||
|
|
||||||
INVARIANT_CHECK;
|
INVARIANT_CHECK;
|
||||||
#ifdef TORRENT_DEBUG
|
#ifdef TORRENT_DEBUG
|
||||||
|
@ -282,7 +285,7 @@ namespace libtorrent
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
TORRENT_ASSERT(!e || e == error::operation_aborted);
|
TORRENT_ASSERT(!e || e == error::operation_aborted);
|
||||||
if (e) return;
|
if (e && m_num_connecting == 0 && m_num_timers > 0) return;
|
||||||
|
|
||||||
ptime next_expire = max_time();
|
ptime next_expire = max_time();
|
||||||
ptime now = time_now_hires() + milliseconds(100);
|
ptime now = time_now_hires() + milliseconds(100);
|
||||||
|
@ -327,6 +330,7 @@ namespace libtorrent
|
||||||
error_code ec;
|
error_code ec;
|
||||||
m_timer.expires_at(next_expire, ec);
|
m_timer.expires_at(next_expire, ec);
|
||||||
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
|
m_timer.async_wait(boost::bind(&connection_queue::on_timeout, this, _1));
|
||||||
|
++m_num_timers;
|
||||||
}
|
}
|
||||||
try_connect(l);
|
try_connect(l);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,7 @@ http_connection::http_connection(io_service& ios, connection_queue& cc
|
||||||
|
|
||||||
http_connection::~http_connection()
|
http_connection::~http_connection()
|
||||||
{
|
{
|
||||||
|
TORRENT_ASSERT(m_connection_ticket == -1);
|
||||||
#ifdef TORRENT_USE_OPENSSL
|
#ifdef TORRENT_USE_OPENSSL
|
||||||
if (m_own_ssl_context) delete m_ssl_ctx;
|
if (m_own_ssl_context) delete m_ssl_ctx;
|
||||||
#endif
|
#endif
|
||||||
|
@ -378,23 +379,14 @@ void http_connection::start(std::string const& hostname, std::string const& port
|
||||||
|
|
||||||
void http_connection::on_connect_timeout()
|
void http_connection::on_connect_timeout()
|
||||||
{
|
{
|
||||||
if (m_connection_ticket > -1) m_cc.done(m_connection_ticket);
|
TORRENT_ASSERT(m_connection_ticket > -1);
|
||||||
m_connection_ticket = -1;
|
|
||||||
|
|
||||||
// keep ourselves alive even if the callback function
|
// keep ourselves alive even if the callback function
|
||||||
// deletes this object
|
// deletes this object
|
||||||
boost::shared_ptr<http_connection> me(shared_from_this());
|
boost::shared_ptr<http_connection> me(shared_from_this());
|
||||||
|
|
||||||
if (!m_endpoints.empty())
|
error_code ec;
|
||||||
{
|
m_sock.close(ec);
|
||||||
error_code ec;
|
|
||||||
m_sock.close(ec);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback(asio::error::timed_out);
|
|
||||||
close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_connection::on_timeout(boost::weak_ptr<http_connection> p
|
void http_connection::on_timeout(boost::weak_ptr<http_connection> p
|
||||||
|
@ -550,6 +542,12 @@ void http_connection::queue_connect()
|
||||||
|
|
||||||
void http_connection::connect(int ticket, tcp::endpoint target_address)
|
void http_connection::connect(int ticket, tcp::endpoint target_address)
|
||||||
{
|
{
|
||||||
|
if (ticket == -1)
|
||||||
|
{
|
||||||
|
close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_connection_ticket = ticket;
|
m_connection_ticket = ticket;
|
||||||
if (m_proxy.proxy_hostnames
|
if (m_proxy.proxy_hostnames
|
||||||
&& (m_proxy.type == proxy_settings::socks5
|
&& (m_proxy.type == proxy_settings::socks5
|
||||||
|
|
|
@ -5245,6 +5245,12 @@ namespace libtorrent
|
||||||
(*m_ses.m_logger) << time_now_string() << " ON_CONNECT: " << print_endpoint(m_remote) << "\n";
|
(*m_ses.m_logger) << time_now_string() << " ON_CONNECT: " << print_endpoint(m_remote) << "\n";
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (ticket == -1)
|
||||||
|
{
|
||||||
|
disconnect(asio::error::operation_aborted);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_connection_ticket = ticket;
|
m_connection_ticket = ticket;
|
||||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||||
|
|
||||||
|
|
|
@ -5495,7 +5495,9 @@ namespace aux {
|
||||||
{
|
{
|
||||||
sleep(1000);
|
sleep(1000);
|
||||||
++counter;
|
++counter;
|
||||||
printf("\n==== Waiting to shut down: %d ==== conn-queue: %d\n\n", counter, m_half_open.size());
|
printf("\n==== Waiting to shut down: %d ==== conn-queue: %d connecting: %d timeout (next: %f max: %f)\n\n"
|
||||||
|
, counter, m_half_open.size(), m_half_open.num_connecting(), m_half_open.next_timeout()
|
||||||
|
, m_half_open.max_timeout());
|
||||||
}
|
}
|
||||||
async_dec_threads();
|
async_dec_threads();
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -776,6 +776,11 @@ void udp_socket::on_connect(int ticket)
|
||||||
|
|
||||||
if (m_abort) return;
|
if (m_abort) return;
|
||||||
if (is_closed()) return;
|
if (is_closed()) return;
|
||||||
|
if (ticket == -1)
|
||||||
|
{
|
||||||
|
close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
#if defined TORRENT_ASIO_DEBUGGING
|
#if defined TORRENT_ASIO_DEBUGGING
|
||||||
add_outstanding_async("udp_socket::on_connected");
|
add_outstanding_async("udp_socket::on_connected");
|
||||||
|
|
Loading…
Reference in New Issue