separate the main thread and the io_service from session_impl. The io_service object is now simply run() in the main thread, all initialization is done by posting messages to it. This generalizes session_impl to some degree, enables future expansion to run in multiple threads (although, the peers and torrents don't support this). This patch also makes it possible to pass in a third party io_service to the session, but at this point that's mostly useful for tests, as it's not well supported

This commit is contained in:
Arvid Norberg 2015-06-03 03:04:44 +00:00
parent 5cc701df54
commit 0351326add
4 changed files with 155 additions and 199 deletions

View File

@ -179,10 +179,9 @@ namespace libtorrent
typedef std::map<sha1_hash, boost::shared_ptr<torrent> > torrent_map; typedef std::map<sha1_hash, boost::shared_ptr<torrent> > torrent_map;
#endif #endif
session_impl(); session_impl(io_service& ios);
virtual ~session_impl(); virtual ~session_impl();
void init();
void start_session(settings_pack const& pack); void start_session(settings_pack const& pack);
void set_load_function(user_load_function_t fun) void set_load_function(user_load_function_t fun)
@ -205,10 +204,7 @@ namespace libtorrent
bool m_posting_torrent_updates; bool m_posting_torrent_updates;
#endif #endif
void main_thread();
void open_listen_port(); void open_listen_port();
void init_settings();
torrent_peer_allocator_interface* get_peer_allocator() { return &m_peer_allocator; } torrent_peer_allocator_interface* get_peer_allocator() { return &m_peer_allocator; }
@ -608,7 +604,10 @@ namespace libtorrent
peer_class_pool m_classes; peer_class_pool m_classes;
// TODO: 2 fix this void init(boost::shared_ptr<settings_pack> pack);
// TODO: 3 fix this. all this should not be public. start by making
// everything that doesn't have to be public private.
public: public:
void submit_disk_jobs(); void submit_disk_jobs();
@ -679,10 +678,7 @@ namespace libtorrent
boost::pool<> m_send_buffers; boost::pool<> m_send_buffers;
#endif #endif
// this is where all active sockets are stored. io_service& m_io_service;
// the selector can sleep while there's no activity on
// them
mutable io_service m_io_service;
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
// this is a generic SSL context used when talking to // this is a generic SSL context used when talking to
@ -1185,13 +1181,6 @@ namespace libtorrent
// into fewer network writes, saving CPU and possibly // into fewer network writes, saving CPU and possibly
// ending up sending larger network packets // ending up sending larger network packets
std::vector<peer_connection*> m_delayed_uncorks; std::vector<peer_connection*> m_delayed_uncorks;
// the main working thread
boost::scoped_ptr<thread> m_thread;
#if TORRENT_USE_ASSERTS && defined BOOST_HAS_PTHREADS
pthread_t m_network_thread;
#endif
}; };
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING

View File

@ -54,9 +54,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_class.hpp" #include "libtorrent/peer_class.hpp"
#include "libtorrent/peer_class_type_filter.hpp" #include "libtorrent/peer_class_type_filter.hpp"
#include "libtorrent/build_config.hpp" #include "libtorrent/build_config.hpp"
#include "libtorrent/settings_pack.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/storage.hpp" #include "libtorrent/storage.hpp"
#include "libtorrent/session_settings.hpp" #include "libtorrent/session_settings.hpp"
#include "libtorrent/thread.hpp"
#ifndef TORRENT_NO_DEPRECATE #ifndef TORRENT_NO_DEPRECATE
#include "libtorrent/rss.hpp" #include "libtorrent/rss.hpp"
@ -143,9 +146,18 @@ namespace libtorrent
// default constructor, does not refer to any session // default constructor, does not refer to any session
// implementation object. // implementation object.
session_proxy() {} session_proxy() {}
~session_proxy();
private: private:
session_proxy(boost::shared_ptr<aux::session_impl> impl) session_proxy(
: m_impl(impl) {} boost::shared_ptr<io_service> ios
, boost::shared_ptr<thread> t
, boost::shared_ptr<aux::session_impl> impl)
: m_io_service(ios)
, m_thread(t)
, m_impl(impl)
{}
boost::shared_ptr<io_service> m_io_service;
boost::shared_ptr<thread> m_thread;
boost::shared_ptr<aux::session_impl> m_impl; boost::shared_ptr<aux::session_impl> m_impl;
}; };
@ -165,26 +177,47 @@ namespace libtorrent
{ {
public: public:
// If the fingerprint in the first overload is omited, the client will get // Constructs the session obects which acts as the container of torrents.
// a default fingerprint stating the version of libtorrent. The // It provides configuration options across torrents (such as rate limits,
// fingerprint is a short string that will be used in the peer-id to // disk cache, ip filter etc.). In order to avoid a race condition between
// identify the client and the client's version. For more details see the // starting the session and configuring it, you can pass in a
// fingerprint class. // settings_pack object. Its settings will take effect before the session
// starts up.
// //
// The flags parameter can be used to start default features (upnp & // The ``flags`` parameter can be used to start default features (upnp &
// nat-pmp) and default plugins (ut_metadata, ut_pex and smart_ban). The // nat-pmp) and default plugins (ut_metadata, ut_pex and smart_ban). The
// default is to start those features. If you do not want them to start, // default is to start those features. If you do not want them to start,
// pass 0 as the flags parameter. // pass 0 as the flags parameter.
//
// The ``alert_mask`` is the same mask that you would send to
// set_alert_mask().
session(settings_pack const& pack session(settings_pack const& pack
, int flags = start_default_features | add_default_plugins) , int flags = start_default_features | add_default_plugins)
{ {
TORRENT_CFG(); TORRENT_CFG();
start(flags, pack); start(flags, pack, NULL);
} }
// overload of the constructor that takes an external io_service to run
// the session object on. This is primarily useful for tests that may want
// to run multiple sessions on a single io_service, or low resource
// systems where additional threads are expensive and sharing an
// io_service with other events is fine.
//
// .. warning::
// The session object does not cleanly terminate with an external
// io_service. The io_service::run() call _must_ have returned before
// it's safe to destruct the session. Which means you *MUST* call
// session::abort() and save the session_proxy first, then destruct the
// session object, then sync withthe io_service, then destruct the
// session_proxy object.
session(settings_pack const& pack
, io_service& ios
, int flags = start_default_features | add_default_plugins)
{
TORRENT_CFG();
start(flags, pack, &ios);
}
#ifndef TORRENT_NO_DEPRECATE
TORRENT_DEPRECATED
session(fingerprint const& print = fingerprint("LT" session(fingerprint const& print = fingerprint("LT"
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR, 0, 0) , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR, 0, 0)
, int flags = start_default_features | add_default_plugins , int flags = start_default_features | add_default_plugins
@ -192,9 +225,6 @@ namespace libtorrent
{ {
TORRENT_CFG(); TORRENT_CFG();
settings_pack pack; settings_pack pack;
// TODO: 2 the two second constructors here should probably
// be deprecated in favor of the more generic one that just
// takes a settings_pack and a string
pack.set_int(settings_pack::alert_mask, alert_mask); pack.set_int(settings_pack::alert_mask, alert_mask);
pack.set_str(settings_pack::peer_fingerprint, print.to_string()); pack.set_str(settings_pack::peer_fingerprint, print.to_string());
if ((flags & start_default_features) == 0) if ((flags & start_default_features) == 0)
@ -205,8 +235,10 @@ namespace libtorrent
pack.set_bool(settings_pack::enable_dht, false); pack.set_bool(settings_pack::enable_dht, false);
} }
start(flags, pack); start(flags, pack, NULL);
} }
TORRENT_DEPRECATED
session(fingerprint const& print session(fingerprint const& print
, std::pair<int, int> listen_port_range , std::pair<int, int> listen_port_range
, char const* listen_interface = "0.0.0.0" , char const* listen_interface = "0.0.0.0"
@ -232,8 +264,9 @@ namespace libtorrent
pack.set_bool(settings_pack::enable_lsd, false); pack.set_bool(settings_pack::enable_lsd, false);
pack.set_bool(settings_pack::enable_dht, false); pack.set_bool(settings_pack::enable_dht, false);
} }
start(flags, pack); start(flags, pack, NULL);
} }
#endif // TORRENT_NO_DEPRECATE
// The destructor of session will notify all trackers that our torrents // The destructor of session will notify all trackers that our torrents
// have been shut down. If some trackers are down, they will time out. // have been shut down. If some trackers are down, they will time out.
@ -432,7 +465,7 @@ namespace libtorrent
// session_proxy(); // session_proxy();
// ~session_proxy() // ~session_proxy()
// }; // };
session_proxy abort() { return session_proxy(m_impl); } session_proxy abort() { return session_proxy(m_io_service, m_thread, m_impl); }
// Pausing the session has the same effect as pausing every torrent in // Pausing the session has the same effect as pausing every torrent in
// it, except that torrents will not be resumed by the auto-manage // it, except that torrents will not be resumed by the auto-manage
@ -1218,10 +1251,12 @@ namespace libtorrent
private: private:
void start(int flags, settings_pack const& pack); void start(int flags, settings_pack const& pack, io_service* ios);
// data shared between the main thread // data shared between the main thread
// and the working thread // and the working thread
boost::shared_ptr<io_service> m_io_service;
boost::shared_ptr<thread> m_thread;
boost::shared_ptr<aux::session_impl> m_impl; boost::shared_ptr<aux::session_impl> m_impl;
}; };

View File

@ -389,7 +389,7 @@ namespace libtorrent
{ throw; } { throw; }
#endif #endif
void session::start(int flags, settings_pack const& pack) void session::start(int flags, settings_pack const& pack, io_service* ios)
{ {
#if defined _MSC_VER && defined TORRENT_DEBUG #if defined _MSC_VER && defined TORRENT_DEBUG
// workaround for microsofts // workaround for microsofts
@ -398,7 +398,17 @@ namespace libtorrent
::_set_se_translator(straight_to_debugger); ::_set_se_translator(straight_to_debugger);
#endif #endif
m_impl.reset(new session_impl()); bool internal_executor = ios == NULL;
if (internal_executor)
{
// the user did not provide an executor, we have to use our own
m_io_service = boost::make_shared<io_service>();
ios = m_io_service.get();
}
m_impl = boost::make_shared<session_impl>(boost::ref(*ios));
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
if (flags & add_default_plugins) if (flags & add_default_plugins)
{ {
@ -409,6 +419,13 @@ namespace libtorrent
#endif #endif
m_impl->start_session(pack); m_impl->start_session(pack);
if (internal_executor)
{
// start a thread for the message pump
m_thread = boost::make_shared<thread>(boost::bind(&io_service::run
, m_io_service.get()));
}
} }
session::~session() session::~session()
@ -416,13 +433,10 @@ namespace libtorrent
aux::dump_call_profile(); aux::dump_call_profile();
TORRENT_ASSERT(m_impl); TORRENT_ASSERT(m_impl);
// if there is at least one destruction-proxy TORRENT_ASYNC_CALL(abort);
// abort the session and let the destructor
// of the proxy to syncronize if (m_thread && m_thread.unique())
if (!m_impl.unique()) m_thread->join();
{
TORRENT_ASYNC_CALL(abort);
}
} }
void session::save_state(entry& e, boost::uint32_t flags) const void session::save_state(entry& e, boost::uint32_t flags) const
@ -1278,5 +1292,10 @@ namespace libtorrent
session_settings::~session_settings() {} session_settings::~session_settings() {}
#endif // TORRENT_NO_DEPRECATE #endif // TORRENT_NO_DEPRECATE
session_proxy::~session_proxy()
{
if (m_thread && m_thread.unique())
m_thread->join();
}
} }

View File

@ -336,13 +336,13 @@ namespace aux {
} }
#endif #endif
session_impl::session_impl() session_impl::session_impl(io_service& ios)
: :
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
m_send_buffers(send_buffer_size()) m_send_buffers(send_buffer_size())
, ,
#endif #endif
m_io_service() m_io_service(ios)
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
, m_ssl_ctx(m_io_service, asio::ssl::context::sslv23) , m_ssl_ctx(m_io_service, asio::ssl::context::sslv23)
#endif #endif
@ -548,18 +548,54 @@ namespace aux {
#endif #endif
boost::shared_ptr<settings_pack> copy = boost::make_shared<settings_pack>(pack); boost::shared_ptr<settings_pack> copy = boost::make_shared<settings_pack>(pack);
m_io_service.post(boost::bind(&session_impl::apply_settings_pack, this, copy)); m_io_service.post(boost::bind(&session_impl::init, this, copy));
// call update_* after settings set initialized
m_io_service.post(boost::bind(&session_impl::init_settings, this));
#ifndef TORRENT_DISABLE_LOGGING
session_log(" spawning network thread");
#endif
m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this)));
} }
void session_impl::init_settings() void session_impl::init(boost::shared_ptr<settings_pack> pack)
{ {
// this is a debug facility
// see single_threaded in debug.hpp
thread_started();
TORRENT_ASSERT(is_single_thread());
#ifndef TORRENT_DISABLE_LOGGING
session_log(" *** session thread init");
#endif
// this is where we should set up all async operations. This
// is called from within the network thread as opposed to the
// constructor which is called from the main thread
#if defined TORRENT_ASIO_DEBUGGING
async_inc_threads();
add_outstanding_async("session_impl::on_tick");
#endif
error_code ec;
m_io_service.post(boost::bind(&session_impl::on_tick, this, ec));
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("session_impl::on_lsd_announce");
#endif
int delay = (std::max)(m_settings.get_int(settings_pack::local_service_announce_interval)
/ (std::max)(int(m_torrents.size()), 1), 1);
m_lsd_announce_timer.expires_from_now(seconds(delay), ec);
m_lsd_announce_timer.async_wait(
boost::bind(&session_impl::on_lsd_announce, this, _1));
TORRENT_ASSERT(!ec);
#ifndef TORRENT_DISABLE_DHT
update_dht_announce_interval();
#endif
#ifndef TORRENT_DISABLE_LOGGING
session_log(" done starting session");
#endif
apply_settings_pack(pack);
// call update_* after settings set initialized
#ifndef TORRENT_NO_DEPRECATE #ifndef TORRENT_NO_DEPRECATE
update_local_download_rate(); update_local_download_rate();
update_local_upload_rate(); update_local_upload_rate();
@ -641,42 +677,6 @@ namespace aux {
} }
} }
void session_impl::init()
{
#ifndef TORRENT_DISABLE_LOGGING
session_log(" *** session thread init");
#endif
// this is where we should set up all async operations. This
// is called from within the network thread as opposed to the
// constructor which is called from the main thread
#if defined TORRENT_ASIO_DEBUGGING
async_inc_threads();
add_outstanding_async("session_impl::on_tick");
#endif
error_code ec;
m_io_service.post(boost::bind(&session_impl::on_tick, this, ec));
#if defined TORRENT_ASIO_DEBUGGING
add_outstanding_async("session_impl::on_lsd_announce");
#endif
int delay = (std::max)(m_settings.get_int(settings_pack::local_service_announce_interval)
/ (std::max)(int(m_torrents.size()), 1), 1);
m_lsd_announce_timer.expires_from_now(seconds(delay), ec);
m_lsd_announce_timer.async_wait(
boost::bind(&session_impl::on_lsd_announce, this, _1));
TORRENT_ASSERT(!ec);
#ifndef TORRENT_DISABLE_DHT
update_dht_announce_interval();
#endif
#ifndef TORRENT_DISABLE_LOGGING
session_log(" done starting session");
#endif
}
void session_impl::save_state(entry* eptr, boost::uint32_t flags) const void session_impl::save_state(entry* eptr, boost::uint32_t flags) const
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@ -1075,6 +1075,9 @@ namespace aux {
// has an internal counter and won't release the network // has an internal counter and won't release the network
// thread until they're all dead (via m_work). // thread until they're all dead (via m_work).
m_disk_thread.set_num_threads(0, false); m_disk_thread.set_num_threads(0, false);
// now it's OK for the network thread to exit
m_work.reset();
} }
bool session_impl::has_connection(peer_connection* p) const bool session_impl::has_connection(peer_connection* p) const
@ -3988,88 +3991,6 @@ retry:
m_delayed_uncorks.clear(); m_delayed_uncorks.clear();
} }
#if defined _MSC_VER && defined TORRENT_DEBUG
static void straight_to_debugger(unsigned int, _EXCEPTION_POINTERS*)
{ throw; }
#endif
void session_impl::main_thread()
{
#if defined _MSC_VER && defined TORRENT_DEBUG
// workaround for microsofts
// hardware exceptions that makes
// it hard to debug stuff
::_set_se_translator(straight_to_debugger);
#endif
// this is a debug facility
// see single_threaded in debug.hpp
thread_started();
TORRENT_ASSERT(is_single_thread());
// initialize async operations
init();
bool stop_loop = false;
while (!stop_loop)
{
error_code ec;
m_io_service.run(ec);
if (ec)
{
#ifdef TORRENT_DEBUG
fprintf(stderr, "%s\n", ec.message().c_str());
std::string err = ec.message();
#endif
TORRENT_ASSERT(false);
}
m_io_service.reset();
stop_loop = m_abort;
}
#ifndef TORRENT_DISABLE_LOGGING
session_log(" locking mutex");
#endif
/*
#ifdef TORRENT_DEBUG
for (torrent_map::iterator i = m_torrents.begin();
i != m_torrents.end(); ++i)
{
TORRENT_ASSERT(i->second->num_peers() == 0);
}
#endif
*/
#ifndef TORRENT_DISABLE_LOGGING
session_log(" cleaning up torrents");
#endif
// clear the torrent LRU (probably not strictly necessary)
list_node* i = m_torrent_lru.get_all();
#if TORRENT_USE_ASSERTS
// clear the prev and next pointers in all torrents
// to avoid the assert when destructing them
while (i)
{
list_node* tmp = i;
i = i->next;
tmp->next = NULL;
tmp->prev= NULL;
}
#else
TORRENT_UNUSED(i);
#endif
m_torrents.clear();
TORRENT_ASSERT(m_torrents.empty());
TORRENT_ASSERT(m_connections.empty());
#if TORRENT_USE_ASSERTS && defined BOOST_HAS_PTHREADS
m_network_thread = 0;
#endif
}
boost::shared_ptr<torrent> session_impl::delay_load_torrent(sha1_hash const& info_hash boost::shared_ptr<torrent> session_impl::delay_load_torrent(sha1_hash const& info_hash
, peer_connection* pc) , peer_connection* pc)
{ {
@ -5623,11 +5544,6 @@ retry:
// this is not allowed to be the network thread! // this is not allowed to be the network thread!
TORRENT_ASSERT(is_not_thread()); TORRENT_ASSERT(is_not_thread());
m_io_service.post(boost::bind(&session_impl::abort, this));
// now it's OK for the network thread to exit
m_work.reset();
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
int counter = 0; int counter = 0;
while (log_async()) while (log_async())
@ -5640,12 +5556,8 @@ retry:
async_dec_threads(); async_dec_threads();
fprintf(stderr, "\n\nEXPECTS NO MORE ASYNC OPS\n\n\n"); fprintf(stderr, "\n\nEXPECTS NO MORE ASYNC OPS\n\n\n");
// m_io_service.post(boost::bind(&io_service::stop, &m_io_service));
#endif #endif
if (m_thread) m_thread->join();
m_udp_socket.unsubscribe(this); m_udp_socket.unsubscribe(this);
m_udp_socket.unsubscribe(&m_utp_socket_manager); m_udp_socket.unsubscribe(&m_utp_socket_manager);
m_udp_socket.unsubscribe(&m_tracker_manager); m_udp_socket.unsubscribe(&m_tracker_manager);
@ -5684,6 +5596,22 @@ retry:
fclose(f); fclose(f);
} }
#endif #endif
// clear the torrent LRU. We do this to avoid having the torrent
// destructor assert because it's still linked into the lru list
#if TORRENT_USE_ASSERTS
list_node* i = m_torrent_lru.get_all();
// clear the prev and next pointers in all torrents
// to avoid the assert when destructing them
while (i)
{
list_node* tmp = i;
i = i->next;
tmp->next = NULL;
tmp->prev = NULL;
}
#endif
} }
#ifndef TORRENT_NO_DEPRECATE #ifndef TORRENT_NO_DEPRECATE
@ -5944,9 +5872,6 @@ retry:
m_pending_auto_manage = true; m_pending_auto_manage = true;
m_need_auto_manage = true; m_need_auto_manage = true;
// if we haven't started yet, don't actually trigger this
if (!m_thread) return;
m_io_service.post(boost::bind(&session_impl::on_trigger_auto_manage, this)); m_io_service.post(boost::bind(&session_impl::on_trigger_auto_manage, this));
} }
@ -5998,15 +5923,6 @@ retry:
m_dht_interval_update_torrents = m_torrents.size(); m_dht_interval_update_torrents = m_torrents.size();
// if we haven't started yet, don't actually trigger this
if (!m_thread)
{
#ifndef TORRENT_DISABLE_LOGGING
session_log("not starting DHT announce timer: thread not running yet");
#endif
return;
}
if (m_abort) if (m_abort)
{ {
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
@ -6044,9 +5960,6 @@ retry:
if (!m_settings.get_bool(settings_pack::force_proxy)) return; if (!m_settings.get_bool(settings_pack::force_proxy)) return;
// if we haven't started yet, don't actually trigger this
if (!m_thread) return;
// enable force_proxy mode. We don't want to accept any incoming // enable force_proxy mode. We don't want to accept any incoming
// connections, except through a proxy. // connections, except through a proxy.
stop_lsd(); stop_lsd();