From 0351326addd729de7b72a329794282a677176f0d Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 3 Jun 2015 03:04:44 +0000 Subject: [PATCH] separate the main thread and the io_service from session_impl. The io_service object is now simply run() in the main thread, all initialization is done by posting messages to it. This generalizes session_impl to some degree, enables future expansion to run in multiple threads (although, the peers and torrents don't support this). This patch also makes it possible to pass in a third party io_service to the session, but at this point that's mostly useful for tests, as it's not well supported --- include/libtorrent/aux_/session_impl.hpp | 23 +-- include/libtorrent/session.hpp | 75 +++++--- src/session.cpp | 37 +++- src/session_impl.cpp | 219 +++++++---------------- 4 files changed, 155 insertions(+), 199 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index a35ffe448..0898dec00 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -179,10 +179,9 @@ namespace libtorrent typedef std::map > torrent_map; #endif - session_impl(); + session_impl(io_service& ios); virtual ~session_impl(); - void init(); void start_session(settings_pack const& pack); void set_load_function(user_load_function_t fun) @@ -205,10 +204,7 @@ namespace libtorrent bool m_posting_torrent_updates; #endif - void main_thread(); - void open_listen_port(); - void init_settings(); torrent_peer_allocator_interface* get_peer_allocator() { return &m_peer_allocator; } @@ -608,7 +604,10 @@ namespace libtorrent peer_class_pool m_classes; - // TODO: 2 fix this + void init(boost::shared_ptr pack); + + // TODO: 3 fix this. all this should not be public. start by making + // everything that doesn't have to be public private. public: void submit_disk_jobs(); @@ -679,10 +678,7 @@ namespace libtorrent boost::pool<> m_send_buffers; #endif - // this is where all active sockets are stored. - // the selector can sleep while there's no activity on - // them - mutable io_service m_io_service; + io_service& m_io_service; #ifdef TORRENT_USE_OPENSSL // this is a generic SSL context used when talking to @@ -1185,13 +1181,6 @@ namespace libtorrent // into fewer network writes, saving CPU and possibly // ending up sending larger network packets std::vector m_delayed_uncorks; - - // the main working thread - boost::scoped_ptr m_thread; - -#if TORRENT_USE_ASSERTS && defined BOOST_HAS_PTHREADS - pthread_t m_network_thread; -#endif }; #ifndef TORRENT_DISABLE_LOGGING diff --git a/include/libtorrent/session.hpp b/include/libtorrent/session.hpp index eb8f5c68b..eba76ea88 100644 --- a/include/libtorrent/session.hpp +++ b/include/libtorrent/session.hpp @@ -54,9 +54,12 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_class.hpp" #include "libtorrent/peer_class_type_filter.hpp" #include "libtorrent/build_config.hpp" +#include "libtorrent/settings_pack.hpp" +#include "libtorrent/io_service.hpp" #include "libtorrent/storage.hpp" #include "libtorrent/session_settings.hpp" +#include "libtorrent/thread.hpp" #ifndef TORRENT_NO_DEPRECATE #include "libtorrent/rss.hpp" @@ -143,9 +146,18 @@ namespace libtorrent // default constructor, does not refer to any session // implementation object. session_proxy() {} + ~session_proxy(); private: - session_proxy(boost::shared_ptr impl) - : m_impl(impl) {} + session_proxy( + boost::shared_ptr ios + , boost::shared_ptr t + , boost::shared_ptr impl) + : m_io_service(ios) + , m_thread(t) + , m_impl(impl) + {} + boost::shared_ptr m_io_service; + boost::shared_ptr m_thread; boost::shared_ptr m_impl; }; @@ -165,26 +177,47 @@ namespace libtorrent { public: - // If the fingerprint in the first overload is omited, the client will get - // a default fingerprint stating the version of libtorrent. The - // fingerprint is a short string that will be used in the peer-id to - // identify the client and the client's version. For more details see the - // fingerprint class. + // Constructs the session obects which acts as the container of torrents. + // It provides configuration options across torrents (such as rate limits, + // disk cache, ip filter etc.). In order to avoid a race condition between + // starting the session and configuring it, you can pass in a + // settings_pack object. Its settings will take effect before the session + // starts up. // - // The flags parameter can be used to start default features (upnp & + // The ``flags`` parameter can be used to start default features (upnp & // nat-pmp) and default plugins (ut_metadata, ut_pex and smart_ban). The // default is to start those features. If you do not want them to start, // pass 0 as the flags parameter. - // - // The ``alert_mask`` is the same mask that you would send to - // set_alert_mask(). - session(settings_pack const& pack , int flags = start_default_features | add_default_plugins) { TORRENT_CFG(); - start(flags, pack); + start(flags, pack, NULL); } + + // overload of the constructor that takes an external io_service to run + // the session object on. This is primarily useful for tests that may want + // to run multiple sessions on a single io_service, or low resource + // systems where additional threads are expensive and sharing an + // io_service with other events is fine. + // + // .. warning:: + // The session object does not cleanly terminate with an external + // io_service. The io_service::run() call _must_ have returned before + // it's safe to destruct the session. Which means you *MUST* call + // session::abort() and save the session_proxy first, then destruct the + // session object, then sync withthe io_service, then destruct the + // session_proxy object. + session(settings_pack const& pack + , io_service& ios + , int flags = start_default_features | add_default_plugins) + { + TORRENT_CFG(); + start(flags, pack, &ios); + } + +#ifndef TORRENT_NO_DEPRECATE + TORRENT_DEPRECATED session(fingerprint const& print = fingerprint("LT" , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR, 0, 0) , int flags = start_default_features | add_default_plugins @@ -192,9 +225,6 @@ namespace libtorrent { TORRENT_CFG(); settings_pack pack; - // TODO: 2 the two second constructors here should probably - // be deprecated in favor of the more generic one that just - // takes a settings_pack and a string pack.set_int(settings_pack::alert_mask, alert_mask); pack.set_str(settings_pack::peer_fingerprint, print.to_string()); if ((flags & start_default_features) == 0) @@ -205,8 +235,10 @@ namespace libtorrent pack.set_bool(settings_pack::enable_dht, false); } - start(flags, pack); + start(flags, pack, NULL); } + + TORRENT_DEPRECATED session(fingerprint const& print , std::pair listen_port_range , char const* listen_interface = "0.0.0.0" @@ -232,8 +264,9 @@ namespace libtorrent pack.set_bool(settings_pack::enable_lsd, false); pack.set_bool(settings_pack::enable_dht, false); } - start(flags, pack); + start(flags, pack, NULL); } +#endif // TORRENT_NO_DEPRECATE // The destructor of session will notify all trackers that our torrents // have been shut down. If some trackers are down, they will time out. @@ -432,7 +465,7 @@ namespace libtorrent // session_proxy(); // ~session_proxy() // }; - session_proxy abort() { return session_proxy(m_impl); } + session_proxy abort() { return session_proxy(m_io_service, m_thread, m_impl); } // Pausing the session has the same effect as pausing every torrent in // it, except that torrents will not be resumed by the auto-manage @@ -1218,10 +1251,12 @@ namespace libtorrent private: - void start(int flags, settings_pack const& pack); + void start(int flags, settings_pack const& pack, io_service* ios); // data shared between the main thread // and the working thread + boost::shared_ptr m_io_service; + boost::shared_ptr m_thread; boost::shared_ptr m_impl; }; diff --git a/src/session.cpp b/src/session.cpp index f1ffe7eb1..d48ff43fa 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -389,7 +389,7 @@ namespace libtorrent { throw; } #endif - void session::start(int flags, settings_pack const& pack) + void session::start(int flags, settings_pack const& pack, io_service* ios) { #if defined _MSC_VER && defined TORRENT_DEBUG // workaround for microsofts @@ -398,7 +398,17 @@ namespace libtorrent ::_set_se_translator(straight_to_debugger); #endif - m_impl.reset(new session_impl()); + bool internal_executor = ios == NULL; + + if (internal_executor) + { + // the user did not provide an executor, we have to use our own + m_io_service = boost::make_shared(); + ios = m_io_service.get(); + } + + m_impl = boost::make_shared(boost::ref(*ios)); + #ifndef TORRENT_DISABLE_EXTENSIONS if (flags & add_default_plugins) { @@ -409,6 +419,13 @@ namespace libtorrent #endif m_impl->start_session(pack); + + if (internal_executor) + { + // start a thread for the message pump + m_thread = boost::make_shared(boost::bind(&io_service::run + , m_io_service.get())); + } } session::~session() @@ -416,13 +433,10 @@ namespace libtorrent aux::dump_call_profile(); TORRENT_ASSERT(m_impl); - // if there is at least one destruction-proxy - // abort the session and let the destructor - // of the proxy to syncronize - if (!m_impl.unique()) - { - TORRENT_ASYNC_CALL(abort); - } + TORRENT_ASYNC_CALL(abort); + + if (m_thread && m_thread.unique()) + m_thread->join(); } void session::save_state(entry& e, boost::uint32_t flags) const @@ -1278,5 +1292,10 @@ namespace libtorrent session_settings::~session_settings() {} #endif // TORRENT_NO_DEPRECATE + session_proxy::~session_proxy() + { + if (m_thread && m_thread.unique()) + m_thread->join(); + } } diff --git a/src/session_impl.cpp b/src/session_impl.cpp index b920a1fe1..a9fc81f1d 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -336,13 +336,13 @@ namespace aux { } #endif - session_impl::session_impl() + session_impl::session_impl(io_service& ios) : #ifndef TORRENT_DISABLE_POOL_ALLOCATOR m_send_buffers(send_buffer_size()) , #endif - m_io_service() + m_io_service(ios) #ifdef TORRENT_USE_OPENSSL , m_ssl_ctx(m_io_service, asio::ssl::context::sslv23) #endif @@ -548,18 +548,54 @@ namespace aux { #endif boost::shared_ptr copy = boost::make_shared(pack); - m_io_service.post(boost::bind(&session_impl::apply_settings_pack, this, copy)); - // call update_* after settings set initialized - m_io_service.post(boost::bind(&session_impl::init_settings, this)); - -#ifndef TORRENT_DISABLE_LOGGING - session_log(" spawning network thread"); -#endif - m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this))); + m_io_service.post(boost::bind(&session_impl::init, this, copy)); } - void session_impl::init_settings() + void session_impl::init(boost::shared_ptr pack) { + // this is a debug facility + // see single_threaded in debug.hpp + thread_started(); + + TORRENT_ASSERT(is_single_thread()); + +#ifndef TORRENT_DISABLE_LOGGING + session_log(" *** session thread init"); +#endif + + // this is where we should set up all async operations. This + // is called from within the network thread as opposed to the + // constructor which is called from the main thread + +#if defined TORRENT_ASIO_DEBUGGING + async_inc_threads(); + add_outstanding_async("session_impl::on_tick"); +#endif + error_code ec; + m_io_service.post(boost::bind(&session_impl::on_tick, this, ec)); + +#if defined TORRENT_ASIO_DEBUGGING + add_outstanding_async("session_impl::on_lsd_announce"); +#endif + int delay = (std::max)(m_settings.get_int(settings_pack::local_service_announce_interval) + / (std::max)(int(m_torrents.size()), 1), 1); + m_lsd_announce_timer.expires_from_now(seconds(delay), ec); + m_lsd_announce_timer.async_wait( + boost::bind(&session_impl::on_lsd_announce, this, _1)); + TORRENT_ASSERT(!ec); + +#ifndef TORRENT_DISABLE_DHT + update_dht_announce_interval(); +#endif + +#ifndef TORRENT_DISABLE_LOGGING + session_log(" done starting session"); +#endif + + apply_settings_pack(pack); + + // call update_* after settings set initialized + #ifndef TORRENT_NO_DEPRECATE update_local_download_rate(); update_local_upload_rate(); @@ -641,42 +677,6 @@ namespace aux { } } - void session_impl::init() - { -#ifndef TORRENT_DISABLE_LOGGING - session_log(" *** session thread init"); -#endif - - // this is where we should set up all async operations. This - // is called from within the network thread as opposed to the - // constructor which is called from the main thread - -#if defined TORRENT_ASIO_DEBUGGING - async_inc_threads(); - add_outstanding_async("session_impl::on_tick"); -#endif - error_code ec; - m_io_service.post(boost::bind(&session_impl::on_tick, this, ec)); - -#if defined TORRENT_ASIO_DEBUGGING - add_outstanding_async("session_impl::on_lsd_announce"); -#endif - int delay = (std::max)(m_settings.get_int(settings_pack::local_service_announce_interval) - / (std::max)(int(m_torrents.size()), 1), 1); - m_lsd_announce_timer.expires_from_now(seconds(delay), ec); - m_lsd_announce_timer.async_wait( - boost::bind(&session_impl::on_lsd_announce, this, _1)); - TORRENT_ASSERT(!ec); - -#ifndef TORRENT_DISABLE_DHT - update_dht_announce_interval(); -#endif - -#ifndef TORRENT_DISABLE_LOGGING - session_log(" done starting session"); -#endif - } - void session_impl::save_state(entry* eptr, boost::uint32_t flags) const { TORRENT_ASSERT(is_single_thread()); @@ -1075,6 +1075,9 @@ namespace aux { // has an internal counter and won't release the network // thread until they're all dead (via m_work). m_disk_thread.set_num_threads(0, false); + + // now it's OK for the network thread to exit + m_work.reset(); } bool session_impl::has_connection(peer_connection* p) const @@ -3988,88 +3991,6 @@ retry: m_delayed_uncorks.clear(); } -#if defined _MSC_VER && defined TORRENT_DEBUG - static void straight_to_debugger(unsigned int, _EXCEPTION_POINTERS*) - { throw; } -#endif - - void session_impl::main_thread() - { -#if defined _MSC_VER && defined TORRENT_DEBUG - // workaround for microsofts - // hardware exceptions that makes - // it hard to debug stuff - ::_set_se_translator(straight_to_debugger); -#endif - // this is a debug facility - // see single_threaded in debug.hpp - thread_started(); - - TORRENT_ASSERT(is_single_thread()); - - // initialize async operations - init(); - - bool stop_loop = false; - while (!stop_loop) - { - error_code ec; - m_io_service.run(ec); - if (ec) - { -#ifdef TORRENT_DEBUG - fprintf(stderr, "%s\n", ec.message().c_str()); - std::string err = ec.message(); -#endif - TORRENT_ASSERT(false); - } - m_io_service.reset(); - - stop_loop = m_abort; - } - -#ifndef TORRENT_DISABLE_LOGGING - session_log(" locking mutex"); -#endif - -/* -#ifdef TORRENT_DEBUG - for (torrent_map::iterator i = m_torrents.begin(); - i != m_torrents.end(); ++i) - { - TORRENT_ASSERT(i->second->num_peers() == 0); - } -#endif -*/ -#ifndef TORRENT_DISABLE_LOGGING - session_log(" cleaning up torrents"); -#endif - - // clear the torrent LRU (probably not strictly necessary) - list_node* i = m_torrent_lru.get_all(); -#if TORRENT_USE_ASSERTS - // clear the prev and next pointers in all torrents - // to avoid the assert when destructing them - while (i) - { - list_node* tmp = i; - i = i->next; - tmp->next = NULL; - tmp->prev= NULL; - } -#else - TORRENT_UNUSED(i); -#endif - m_torrents.clear(); - - TORRENT_ASSERT(m_torrents.empty()); - TORRENT_ASSERT(m_connections.empty()); - -#if TORRENT_USE_ASSERTS && defined BOOST_HAS_PTHREADS - m_network_thread = 0; -#endif - } - boost::shared_ptr session_impl::delay_load_torrent(sha1_hash const& info_hash , peer_connection* pc) { @@ -5623,11 +5544,6 @@ retry: // this is not allowed to be the network thread! TORRENT_ASSERT(is_not_thread()); - m_io_service.post(boost::bind(&session_impl::abort, this)); - - // now it's OK for the network thread to exit - m_work.reset(); - #if defined TORRENT_ASIO_DEBUGGING int counter = 0; while (log_async()) @@ -5640,12 +5556,8 @@ retry: async_dec_threads(); fprintf(stderr, "\n\nEXPECTS NO MORE ASYNC OPS\n\n\n"); - -// m_io_service.post(boost::bind(&io_service::stop, &m_io_service)); #endif - if (m_thread) m_thread->join(); - m_udp_socket.unsubscribe(this); m_udp_socket.unsubscribe(&m_utp_socket_manager); m_udp_socket.unsubscribe(&m_tracker_manager); @@ -5684,6 +5596,22 @@ retry: fclose(f); } #endif + + // clear the torrent LRU. We do this to avoid having the torrent + // destructor assert because it's still linked into the lru list +#if TORRENT_USE_ASSERTS + list_node* i = m_torrent_lru.get_all(); + // clear the prev and next pointers in all torrents + // to avoid the assert when destructing them + while (i) + { + list_node* tmp = i; + i = i->next; + tmp->next = NULL; + tmp->prev = NULL; + } +#endif + } #ifndef TORRENT_NO_DEPRECATE @@ -5944,9 +5872,6 @@ retry: m_pending_auto_manage = true; m_need_auto_manage = true; - // if we haven't started yet, don't actually trigger this - if (!m_thread) return; - m_io_service.post(boost::bind(&session_impl::on_trigger_auto_manage, this)); } @@ -5998,15 +5923,6 @@ retry: m_dht_interval_update_torrents = m_torrents.size(); - // if we haven't started yet, don't actually trigger this - if (!m_thread) - { -#ifndef TORRENT_DISABLE_LOGGING - session_log("not starting DHT announce timer: thread not running yet"); -#endif - return; - } - if (m_abort) { #ifndef TORRENT_DISABLE_LOGGING @@ -6044,9 +5960,6 @@ retry: if (!m_settings.get_bool(settings_pack::force_proxy)) return; - // if we haven't started yet, don't actually trigger this - if (!m_thread) return; - // enable force_proxy mode. We don't want to accept any incoming // connections, except through a proxy. stop_lsd();