make aux::session_settings thread-safe. this allows all threads to share the same settings object, which has low contention and is rarely updated

This commit is contained in:
arvidn 2019-08-29 14:50:51 +02:00 committed by Arvid Norberg
parent c0c4c2083a
commit a3440e54bb
10 changed files with 192 additions and 106 deletions

View File

@ -40,19 +40,14 @@ POSSIBILITY OF SUCH DAMAGE.
#include <string>
#include <array>
#include <bitset>
#include <mutex>
#include <functional>
namespace libtorrent {
namespace aux {
TORRENT_EXTRA_EXPORT void initialize_default_settings(aux::session_settings& s);
}
namespace libtorrent { namespace aux {
struct TORRENT_EXTRA_EXPORT session_settings
struct TORRENT_EXTRA_EXPORT session_settings_single_thread
{
friend TORRENT_EXTRA_EXPORT void libtorrent::save_settings_to_dict(
aux::session_settings const& s, entry::dictionary_type& sett);
void set_str(int name, std::string value)
{ set<std::string>(m_strings, name, std::move(value), settings_pack::string_type_base); }
void set_int(int name, int value)
@ -67,14 +62,13 @@ namespace libtorrent { namespace aux {
bool get_bool(int name) const
{ return get<bool>(m_bools, name, settings_pack::bool_type_base); }
session_settings();
explicit session_settings(settings_pack const&);
session_settings_single_thread();
private:
template <typename T, typename Container>
void set(Container& c, int const name, T val
, int const type) const
, int const type)
{
TORRENT_ASSERT((name & settings_pack::type_mask) == type);
if ((name & settings_pack::type_mask) != type) return;
@ -99,6 +93,57 @@ namespace libtorrent { namespace aux {
std::bitset<settings_pack::num_bool_settings> m_bools;
};
} }
struct TORRENT_EXTRA_EXPORT session_settings
{
void set_str(int name, std::string value)
{
std::unique_lock<std::mutex> l(m_mutex);
return m_store.set_str(name, std::move(value));
}
void set_int(int name, int value)
{
std::unique_lock<std::mutex> l(m_mutex);
m_store.set_int(name, value);
}
void set_bool(int name, bool value)
{
std::unique_lock<std::mutex> l(m_mutex);
m_store.set_bool(name, value);
}
std::string const& get_str(int name) const
{
std::unique_lock<std::mutex> l(m_mutex);
return m_store.get_str(name);
}
int get_int(int name) const
{
std::unique_lock<std::mutex> l(m_mutex);
return m_store.get_int(name);
}
bool get_bool(int name) const
{
std::unique_lock<std::mutex> l(m_mutex);
return m_store.get_bool(name);
}
session_settings();
explicit session_settings(settings_pack const&);
void bulk_set(std::function<void(session_settings_single_thread&)>);
void bulk_get(std::function<void(session_settings_single_thread const&)>) const;
private:
session_settings_single_thread m_store;
mutable std::mutex m_mutex;
};
}
}
namespace libtorrent {
TORRENT_EXTRA_EXPORT void initialize_default_settings(aux::session_settings_single_thread& s);
}
#endif

View File

@ -279,7 +279,7 @@ namespace aux { struct block_cache_reference; }
, disk_interface
, buffer_allocator_interface
{
disk_io_thread(io_service& ios, counters& cnt);
disk_io_thread(io_service& ios, aux::session_settings const&, counters&);
#if TORRENT_USE_ASSERTS
~disk_io_thread();
#endif
@ -290,7 +290,7 @@ namespace aux { struct block_cache_reference; }
hasher_thread_divisor = 4
};
void set_settings(settings_pack const* sett);
void settings_updated();
void abort(bool wait);
@ -517,7 +517,7 @@ namespace aux { struct block_cache_reference; }
job_queue m_hash_io_jobs;
disk_io_thread_pool m_hash_threads;
aux::session_settings m_settings;
aux::session_settings const& m_settings;
// the last time we expired write blocks from the cache
time_point m_last_cache_expiry = min_time();

View File

@ -56,7 +56,11 @@ POSSIBILITY OF SUCH DAMAGE.
//
namespace libtorrent {
namespace aux { struct session_impl; struct session_settings; }
namespace aux {
struct session_impl;
struct session_settings;
struct session_settings_single_thread;
}
struct settings_pack;
struct bdecode_node;
@ -65,6 +69,9 @@ namespace libtorrent {
TORRENT_EXTRA_EXPORT void save_settings_to_dict(aux::session_settings const& s, entry::dictionary_type& sett);
TORRENT_EXTRA_EXPORT void apply_pack(settings_pack const* pack, aux::session_settings& sett
, aux::session_impl* ses = nullptr);
TORRENT_EXTRA_EXPORT void apply_pack_impl(settings_pack const* pack
, aux::session_settings_single_thread& sett
, std::vector<void(aux::session_impl::*)()>* callbacks = nullptr);
TORRENT_EXTRA_EXPORT void run_all_updates(aux::session_impl& ses);
TORRENT_EXPORT int setting_by_name(string_view name);
@ -84,7 +91,9 @@ namespace libtorrent {
//
struct TORRENT_EXPORT settings_pack
{
friend TORRENT_EXTRA_EXPORT void apply_pack(settings_pack const* pack, aux::session_settings& sett, aux::session_impl* ses);
friend TORRENT_EXTRA_EXPORT void apply_pack_impl(settings_pack const*
, aux::session_settings_single_thread&
, std::vector<void(aux::session_impl::*)()>*);
settings_pack() = default;
settings_pack(settings_pack const&) = default;

View File

@ -277,7 +277,7 @@ namespace libtorrent {
virtual ~storage_interface() {}
// initialized in disk_io_thread::perform_async_job
aux::session_settings* m_settings = nullptr;
aux::session_settings const* m_settings = nullptr;
storage_index_t storage_index() const { return m_storage_index; }
void set_storage_index(storage_index_t st) { m_storage_index = st; }

View File

@ -294,7 +294,14 @@ namespace {
}
counters cnt;
disk_io_thread disk_thread(ios, cnt);
aux::session_settings sett;
sett.set_int(settings_pack::cache_size, 0);
int const num_threads = disk_io_thread::hasher_thread_divisor - 1;
int const jobs_per_thread = 4;
sett.set_int(settings_pack::aio_threads, num_threads);
disk_io_thread disk_thread(ios, sett, cnt);
disk_aborter da(disk_thread);
aux::vector<download_priority_t, file_index_t> priorities;
@ -311,14 +318,6 @@ namespace {
storage_holder storage = disk_thread.new_torrent(default_storage_constructor
, params, std::shared_ptr<void>());
settings_pack sett;
sett.set_int(settings_pack::cache_size, 0);
int const num_threads = disk_io_thread::hasher_thread_divisor - 1;
int const jobs_per_thread = 4;
sett.set_int(settings_pack::aio_threads, num_threads);
disk_thread.set_settings(&sett);
int const piece_read_ahead = std::max(num_threads * jobs_per_thread
, default_block_size / t.piece_length());

View File

@ -207,16 +207,17 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
// ------- disk_io_thread ------
disk_io_thread::disk_io_thread(io_service& ios, counters& cnt)
disk_io_thread::disk_io_thread(io_service& ios, aux::session_settings const& sett, counters& cnt)
: m_generic_io_jobs(*this)
, m_generic_threads(m_generic_io_jobs, ios)
, m_hash_io_jobs(*this)
, m_hash_threads(m_hash_io_jobs, ios)
, m_settings(sett)
, m_disk_cache(ios, std::bind(&disk_io_thread::trigger_cache_trim, this))
, m_stats_counters(cnt)
, m_ios(ios)
{
m_disk_cache.set_settings(m_settings);
settings_updated();
}
storage_interface* disk_io_thread::get_torrent(storage_index_t const storage)
@ -333,11 +334,10 @@ constexpr disk_job_flags_t disk_interface::cache_hit;
}
}
void disk_io_thread::set_settings(settings_pack const* pack)
void disk_io_thread::settings_updated()
{
TORRENT_ASSERT(m_magic == 0x1337);
std::unique_lock<std::mutex> l(m_cache_mutex);
apply_pack(pack, m_settings);
m_disk_cache.set_settings(m_settings);
m_file_pool.resize(m_settings.get_int(settings_pack::file_pool_size));

View File

@ -404,7 +404,7 @@ namespace aux {
#endif
, m_alerts(m_settings.get_int(settings_pack::alert_queue_size)
, alert_category_t{static_cast<unsigned int>(m_settings.get_int(settings_pack::alert_mask))})
, m_disk_thread(m_io_service, m_stats_counters)
, m_disk_thread(m_io_service, m_settings, m_stats_counters)
, m_download_rate(peer_connection::download_channel)
, m_upload_rate(peer_connection::upload_channel)
, m_host_resolver(m_io_service)
@ -447,7 +447,6 @@ namespace aux {
, m_lsd_announce_timer(m_io_service)
, m_close_file_timer(m_io_service)
{
m_disk_thread.set_settings(&pack);
}
template <typename Fun, typename... Args>
@ -671,21 +670,24 @@ namespace aux {
settings = e->dict_find_dict("proxy");
if (settings)
{
bdecode_node val;
val = settings.dict_find_int("port");
if (val) m_settings.set_int(settings_pack::proxy_port, int(val.int_value()));
val = settings.dict_find_int("type");
if (val) m_settings.set_int(settings_pack::proxy_type, int(val.int_value()));
val = settings.dict_find_int("proxy_hostnames");
if (val) m_settings.set_bool(settings_pack::proxy_hostnames, val.int_value() != 0);
val = settings.dict_find_int("proxy_peer_connections");
if (val) m_settings.set_bool(settings_pack::proxy_peer_connections, val.int_value() != 0);
val = settings.dict_find_string("hostname");
if (val) m_settings.set_str(settings_pack::proxy_hostname, val.string_value().to_string());
val = settings.dict_find_string("password");
if (val) m_settings.set_str(settings_pack::proxy_password, val.string_value().to_string());
val = settings.dict_find_string("username");
if (val) m_settings.set_str(settings_pack::proxy_username, val.string_value().to_string());
m_settings.bulk_set([&settings](session_settings_single_thread& s)
{
bdecode_node val;
val = settings.dict_find_int("port");
if (val) s.set_int(settings_pack::proxy_port, int(val.int_value()));
val = settings.dict_find_int("type");
if (val) s.set_int(settings_pack::proxy_type, int(val.int_value()));
val = settings.dict_find_int("proxy_hostnames");
if (val) s.set_bool(settings_pack::proxy_hostnames, val.int_value() != 0);
val = settings.dict_find_int("proxy_peer_connections");
if (val) s.set_bool(settings_pack::proxy_peer_connections, val.int_value() != 0);
val = settings.dict_find_string("hostname");
if (val) s.set_str(settings_pack::proxy_hostname, val.string_value().to_string());
val = settings.dict_find_string("password");
if (val) s.set_str(settings_pack::proxy_password, val.string_value().to_string());
val = settings.dict_find_string("username");
if (val) s.set_str(settings_pack::proxy_username, val.string_value().to_string());
});
need_update_proxy = true;
}
}
@ -693,15 +695,18 @@ namespace aux {
settings = e->dict_find_dict("encryption");
if (settings)
{
bdecode_node val;
val = settings.dict_find_int("prefer_rc4");
if (val) m_settings.set_bool(settings_pack::prefer_rc4, val.int_value() != 0);
val = settings.dict_find_int("out_enc_policy");
if (val) m_settings.set_int(settings_pack::out_enc_policy, int(val.int_value()));
val = settings.dict_find_int("in_enc_policy");
if (val) m_settings.set_int(settings_pack::in_enc_policy, int(val.int_value()));
val = settings.dict_find_int("allowed_enc_level");
if (val) m_settings.set_int(settings_pack::allowed_enc_level, int(val.int_value()));
m_settings.bulk_set([&settings](session_settings_single_thread& s)
{
bdecode_node val;
val = settings.dict_find_int("prefer_rc4");
if (val) s.set_bool(settings_pack::prefer_rc4, val.int_value() != 0);
val = settings.dict_find_int("out_enc_policy");
if (val) s.set_int(settings_pack::out_enc_policy, int(val.int_value()));
val = settings.dict_find_int("in_enc_policy");
if (val) s.set_int(settings_pack::in_enc_policy, int(val.int_value()));
val = settings.dict_find_int("allowed_enc_level");
if (val) s.set_int(settings_pack::allowed_enc_level, int(val.int_value()));
});
}
#endif
@ -1288,7 +1293,7 @@ namespace aux {
#endif
apply_pack(&pack, m_settings, this);
m_disk_thread.set_settings(&pack);
m_disk_thread.settings_updated();
if (!reopen_listen_port)
{

View File

@ -33,17 +33,33 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/settings_pack.hpp"
#include <functional>
namespace libtorrent { namespace aux {
session_settings::session_settings()
{
initialize_default_settings(*this);
}
session_settings::session_settings() = default;
session_settings::session_settings(settings_pack const& p)
{
initialize_default_settings(*this);
apply_pack(&p, *this);
apply_pack_impl(&p, m_store);
}
void session_settings::bulk_set(std::function<void(session_settings_single_thread&)> f)
{
std::unique_lock<std::mutex> l(m_mutex);
f(m_store);
}
void session_settings::bulk_get(std::function<void(session_settings_single_thread const&)> f) const
{
std::unique_lock<std::mutex> l(m_mutex);
f(m_store);
}
session_settings_single_thread::session_settings_single_thread()
{
initialize_default_settings(*this);
}
} }

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/settings_pack.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/aux_/array.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include <algorithm>
@ -408,7 +409,7 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
for (int k = 0; k < int_settings.end_index(); ++k)
{
if (key != int_settings[k].name) continue;
pack.set_int(settings_pack::int_type_base + k, int(val.int_value()));
pack.set_int(settings_pack::int_type_base | k, int(val.int_value()));
found = true;
break;
}
@ -416,7 +417,7 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
for (int k = 0; k < bool_settings.end_index(); ++k)
{
if (key != bool_settings[k].name) continue;
pack.set_bool(settings_pack::bool_type_base + k, val.int_value() != 0);
pack.set_bool(settings_pack::bool_type_base | k, val.int_value() != 0);
break;
}
}
@ -436,26 +437,29 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
return pack;
}
void save_settings_to_dict(aux::session_settings const& s, entry::dictionary_type& sett)
void save_settings_to_dict(aux::session_settings const& sett, entry::dictionary_type& out)
{
sett.bulk_get([&out](aux::session_settings_single_thread const& s)
{
// loop over all settings that differ from default
for (int i = 0; i < settings_pack::num_string_settings; ++i)
{
if (ensure_string(str_settings[i].default_value) == s.m_strings[std::size_t(i)]) continue;
sett[str_settings[i].name] = s.m_strings[std::size_t(i)];
}
for (int i = 0; i < settings_pack::num_string_settings; ++i)
{
if (ensure_string(str_settings[i].default_value) == s.get_str(i | settings_pack::string_type_base)) continue;
out[str_settings[i].name] = s.get_str(i | settings_pack::string_type_base);
}
for (int i = 0; i < settings_pack::num_int_settings; ++i)
{
if (int_settings[i].default_value == s.m_ints[std::size_t(i)]) continue;
sett[int_settings[i].name] = s.m_ints[std::size_t(i)];
}
for (int i = 0; i < settings_pack::num_int_settings; ++i)
{
if (int_settings[i].default_value == s.get_int(i | settings_pack::int_type_base)) continue;
out[int_settings[i].name] = s.get_int(i | settings_pack::int_type_base);
}
for (int i = 0; i < settings_pack::num_bool_settings; ++i)
{
if (bool_settings[i].default_value == s.m_bools[std::size_t(i)]) continue;
sett[bool_settings[i].name] = s.m_bools[std::size_t(i)];
}
for (int i = 0; i < settings_pack::num_bool_settings; ++i)
{
if (bool_settings[i].default_value == s.get_bool(i | settings_pack::bool_type_base)) continue;
out[bool_settings[i].name] = s.get_bool(i | settings_pack::bool_type_base);
}
});
}
void run_all_updates(aux::session_impl& ses)
@ -480,24 +484,24 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
}
}
void initialize_default_settings(aux::session_settings& s)
void initialize_default_settings(aux::session_settings_single_thread& s)
{
for (int i = 0; i < settings_pack::num_string_settings; ++i)
{
if (str_settings[i].default_value == nullptr) continue;
s.set_str(settings_pack::string_type_base + i, str_settings[i].default_value);
s.set_str(settings_pack::string_type_base | i, str_settings[i].default_value);
TORRENT_ASSERT(s.get_str(settings_pack::string_type_base + i) == str_settings[i].default_value);
}
for (int i = 0; i < settings_pack::num_int_settings; ++i)
{
s.set_int(settings_pack::int_type_base + i, int_settings[i].default_value);
s.set_int(settings_pack::int_type_base | i, int_settings[i].default_value);
TORRENT_ASSERT(s.get_int(settings_pack::int_type_base + i) == int_settings[i].default_value);
}
for (int i = 0; i < settings_pack::num_bool_settings; ++i)
{
s.set_bool(settings_pack::bool_type_base + i, bool_settings[i].default_value);
s.set_bool(settings_pack::bool_type_base | i, bool_settings[i].default_value);
TORRENT_ASSERT(s.get_bool(settings_pack::bool_type_base + i) == bool_settings[i].default_value);
}
}
@ -530,6 +534,22 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
using fun_t = void (aux::session_impl::*)();
std::vector<fun_t> callbacks;
sett.bulk_set([&](aux::session_settings_single_thread& s)
{
apply_pack_impl(pack, s, ses ? &callbacks : nullptr);
});
// call the callbacks once all the settings have been applied, and
// only once per callback
for (auto const& f : callbacks)
{
(ses->*f)();
}
}
void apply_pack_impl(settings_pack const* pack, aux::session_settings_single_thread& sett
, std::vector<void(aux::session_impl::*)()>* callbacks)
{
for (auto const& p : pack->m_strings)
{
// disregard setting indices that are not string types
@ -548,9 +568,9 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
sett.set_str(p.first, p.second);
str_setting_entry_t const& sa = str_settings[index];
if (sa.fun && ses
&& std::find(callbacks.begin(), callbacks.end(), sa.fun) == callbacks.end())
callbacks.push_back(sa.fun);
if (sa.fun && callbacks
&& std::find(callbacks->begin(), callbacks->end(), sa.fun) == callbacks->end())
callbacks->push_back(sa.fun);
}
for (auto const& p : pack->m_ints)
@ -570,9 +590,9 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
sett.set_int(p.first, p.second);
int_setting_entry_t const& sa = int_settings[index];
if (sa.fun && ses
&& std::find(callbacks.begin(), callbacks.end(), sa.fun) == callbacks.end())
callbacks.push_back(sa.fun);
if (sa.fun && callbacks
&& std::find(callbacks->begin(), callbacks->end(), sa.fun) == callbacks->end())
callbacks->push_back(sa.fun);
}
for (auto const& p : pack->m_bools)
@ -592,16 +612,9 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
sett.set_bool(p.first, p.second);
bool_setting_entry_t const& sa = bool_settings[index];
if (sa.fun && ses
&& std::find(callbacks.begin(), callbacks.end(), sa.fun) == callbacks.end())
callbacks.push_back(sa.fun);
}
// call the callbacks once all the settings have been applied, and
// only once per callback
for (auto const& f : callbacks)
{
(ses->*f)();
if (sa.fun && callbacks
&& std::find(callbacks->begin(), callbacks->end(), sa.fun) == callbacks->end())
callbacks->push_back(sa.fun);
}
}

View File

@ -487,10 +487,9 @@ void test_check_files(std::string const& test_path
boost::asio::io_service ios;
counters cnt;
disk_io_thread io(ios, cnt);
settings_pack sett;
aux::session_settings sett;
sett.set_int(settings_pack::aio_threads, 1);
io.set_settings(&sett);
disk_io_thread io(ios, sett, cnt);
disk_buffer_pool dp(ios, std::bind(&nop));