From 60c119c52d5b00d4f02f7315757736c1b5d1da7a Mon Sep 17 00:00:00 2001 From: arvidn Date: Sun, 1 May 2016 02:21:30 -0400 Subject: [PATCH] remove mutex in natpmp --- include/libtorrent/aux_/session_impl.hpp | 2 +- include/libtorrent/natpmp.hpp | 27 +++-- src/natpmp.cpp | 123 ++++++++++++----------- 3 files changed, 75 insertions(+), 77 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index e5c287bd3..e690be89d 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -101,7 +101,7 @@ namespace libtorrent struct plugin; struct upnp; - class natpmp; + struct natpmp; class lsd; class torrent; class alert; diff --git a/include/libtorrent/natpmp.hpp b/include/libtorrent/natpmp.hpp index ea8a74579..88d5d7d9b 100644 --- a/include/libtorrent/natpmp.hpp +++ b/include/libtorrent/natpmp.hpp @@ -39,8 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/error_code.hpp" #include "libtorrent/deadline_timer.hpp" #include "libtorrent/time.hpp" - -#include +#include "libtorrent/debug.hpp" #include "libtorrent/aux_/disable_warnings_push.hpp" @@ -59,9 +58,10 @@ namespace libtorrent typedef boost::function portmap_callback_t; typedef boost::function log_callback_t; -class natpmp : public boost::enable_shared_from_this +struct natpmp + : boost::enable_shared_from_this + , single_threaded { -public: natpmp(io_service& ios, portmap_callback_t const& cb , log_callback_t const& lcb); @@ -80,19 +80,19 @@ private: boost::shared_ptr self() { return shared_from_this(); } - void update_mapping(int i, std::unique_lock& l); - void send_map_request(int i, std::unique_lock& l); - void send_get_ip_address_request(std::unique_lock& l); + void update_mapping(int i); + void send_map_request(int i); + void send_get_ip_address_request(); void resend_request(int i, error_code const& e); void on_reply(error_code const& e , std::size_t bytes_transferred); - void try_next_mapping(int i, std::unique_lock& l); - void update_expiration_timer(std::unique_lock& l); + void try_next_mapping(int i); + void update_expiration_timer(); void mapping_expired(error_code const& e, int i); - void close_impl(std::unique_lock& l); + void close_impl(); - void log(char const* msg, std::unique_lock& l); - void disable(error_code const& ec, std::unique_lock& l); + void log(char const* msg); + void disable(error_code const& ec); struct mapping_t { @@ -173,9 +173,6 @@ private: bool m_disabled; bool m_abort; - - // TODO:3 is this object really acceessed from multiple threads? - mutable std::mutex m_mutex; }; } diff --git a/src/natpmp.cpp b/src/natpmp.cpp index 035fd5d7a..a2e2f33fb 100644 --- a/src/natpmp.cpp +++ b/src/natpmp.cpp @@ -79,11 +79,12 @@ natpmp::natpmp(io_service& ios // for this array not to be reallocated, by passing // around pointers to its elements. so reserve size for now m_mappings.reserve(10); + TORRENT_ASSERT(is_single_thread()); } void natpmp::start() { - std::unique_lock l(m_mutex); + TORRENT_ASSERT(is_single_thread()); error_code ec; address gateway = get_default_gateway(m_socket.get_io_service(), ec); @@ -92,8 +93,8 @@ void natpmp::start() char msg[200]; snprintf(msg, sizeof(msg), "failed to find default route: %s" , convert_from_native(ec.message()).c_str()); - log(msg, l); - disable(ec, l); + log(msg); + disable(ec); return; } @@ -106,25 +107,25 @@ void natpmp::start() char msg[200]; snprintf(msg, sizeof(msg), "found router at: %s" , print_address(m_nat_endpoint.address()).c_str()); - log(msg, l); + log(msg); m_socket.open(udp::v4(), ec); if (ec) { - disable(ec, l); + disable(ec); return; } m_socket.bind(udp::endpoint(address_v4::any(), 0), ec); if (ec) { - disable(ec, l); + disable(ec); return; } ADD_OUTSTANDING_ASYNC("natpmp::on_reply"); m_socket.async_receive_from(boost::asio::buffer(&m_response_buffer, 16) , m_remote, boost::bind(&natpmp::on_reply, self(), _1, _2)); - send_get_ip_address_request(l); + send_get_ip_address_request(); for (std::vector::iterator i = m_mappings.begin() , end(m_mappings.end()); i != end; ++i) @@ -133,19 +134,20 @@ void natpmp::start() || i->action != mapping_t::action_none) continue; i->action = mapping_t::action_add; - update_mapping(i - m_mappings.begin(), l); + update_mapping(i - m_mappings.begin()); } } -void natpmp::send_get_ip_address_request(std::unique_lock& l) +void natpmp::send_get_ip_address_request() { + TORRENT_ASSERT(is_single_thread()); using namespace libtorrent::detail; char buf[2]; char* out = buf; write_uint8(0, out); // NAT-PMP version write_uint8(0, out); // public IP address request opcode - log("==> get public IP address", l); + log("==> get public IP address"); error_code ec; m_socket.send_to(boost::asio::buffer(buf, sizeof(buf)), m_nat_endpoint, 0, ec); @@ -153,7 +155,7 @@ void natpmp::send_get_ip_address_request(std::unique_lock& l) bool natpmp::get_mapping(int index, int& local_port, int& external_port, int& protocol) const { - std::unique_lock l(m_mutex); + TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(index < int(m_mappings.size()) && index >= 0); if (index >= int(m_mappings.size()) || index < 0) return false; @@ -165,15 +167,15 @@ bool natpmp::get_mapping(int index, int& local_port, int& external_port, int& pr return true; } -void natpmp::log(char const* msg, std::unique_lock& l) +void natpmp::log(char const* msg) { - l.unlock(); + TORRENT_ASSERT(is_single_thread()); m_log_callback(msg); - l.lock(); } -void natpmp::disable(error_code const& ec, std::unique_lock& l) +void natpmp::disable(error_code const& ec) { + TORRENT_ASSERT(is_single_thread()); m_disabled = true; for (std::vector::iterator i = m_mappings.begin() @@ -183,16 +185,14 @@ void natpmp::disable(error_code const& ec, std::unique_lock& l) int const proto = i->protocol; i->protocol = none; int index = i - m_mappings.begin(); - l.unlock(); m_callback(index, address(), 0, proto, ec); - l.lock(); } - close_impl(l); + close_impl(); } void natpmp::delete_mapping(int index) { - std::unique_lock l(m_mutex); + TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(index < int(m_mappings.size()) && index >= 0); if (index >= int(m_mappings.size()) || index < 0) return; @@ -207,12 +207,12 @@ void natpmp::delete_mapping(int index) } m.action = mapping_t::action_delete; - update_mapping(index, l); + update_mapping(index); } int natpmp::add_mapping(protocol_type p, int external_port, int local_port) { - std::unique_lock l(m_mutex); + TORRENT_ASSERT(is_single_thread()); if (m_disabled) return -1; @@ -245,12 +245,13 @@ int natpmp::add_mapping(protocol_type p, int external_port, int local_port) } #endif - update_mapping(mapping_index, l); + update_mapping(mapping_index); return mapping_index; } -void natpmp::try_next_mapping(int i, std::unique_lock& l) +void natpmp::try_next_mapping(int i) { + TORRENT_ASSERT(is_single_thread()); #ifdef NATPMP_LOG time_point now = aux::time_now(); for (std::vector::iterator m = m_mappings.begin() @@ -267,7 +268,7 @@ void natpmp::try_next_mapping(int i, std::unique_lock& l) #endif if (i < int(m_mappings.size()) - 1) { - update_mapping(i + 1, l); + update_mapping(i + 1); return; } @@ -293,11 +294,12 @@ void natpmp::try_next_mapping(int i, std::unique_lock& l) std::cout << " updating " << (m - m_mappings.begin()) << std::endl; #endif - update_mapping(m - m_mappings.begin(), l); + update_mapping(m - m_mappings.begin()); } -void natpmp::update_mapping(int i, std::unique_lock& l) +void natpmp::update_mapping(int i) { + TORRENT_ASSERT(is_single_thread()); if (i == int(m_mappings.size())) { if (m_abort) @@ -316,7 +318,7 @@ void natpmp::update_mapping(int i, std::unique_lock& l) if (m.action == mapping_t::action_none || m.protocol == none) { - try_next_mapping(i, l); + try_next_mapping(i); return; } @@ -325,12 +327,13 @@ void natpmp::update_mapping(int i, std::unique_lock& l) // the socket is not currently in use // send out a mapping request m_retry_count = 0; - send_map_request(i, l); + send_map_request(i); } } -void natpmp::send_map_request(int i, std::unique_lock& l) +void natpmp::send_map_request(int i) { + TORRENT_ASSERT(is_single_thread()); using namespace libtorrent::detail; TORRENT_ASSERT(m_currently_mapping == -1 @@ -354,7 +357,7 @@ void natpmp::send_map_request(int i, std::unique_lock& l) , i, m.action == mapping_t::action_add ? "add" : "delete" , m.protocol == udp ? "udp" : "tcp" , m.local_port, m.external_port, ttl); - log(msg, l); + log(msg); error_code ec; m_socket.send_to(boost::asio::buffer(buf, sizeof(buf)), m_nat_endpoint, 0, ec); @@ -367,7 +370,7 @@ void natpmp::send_map_request(int i, std::unique_lock& l) // immediately m_currently_mapping = -1; m.action = mapping_t::action_none; - try_next_mapping(i, l); + try_next_mapping(i); } else { @@ -381,9 +384,9 @@ void natpmp::send_map_request(int i, std::unique_lock& l) void natpmp::resend_request(int i, error_code const& e) { + TORRENT_ASSERT(is_single_thread()); COMPLETE_ASYNC("natpmp::resend_request"); if (e) return; - std::unique_lock l(m_mutex); if (m_currently_mapping != i) return; // if we're shutting down, don't retry, just move on @@ -394,16 +397,16 @@ void natpmp::resend_request(int i, error_code const& e) m_mappings[i].action = mapping_t::action_none; // try again in two hours m_mappings[i].expires = aux::time_now() + hours(2); - try_next_mapping(i, l); + try_next_mapping(i); return; } - send_map_request(i, l); + send_map_request(i); } void natpmp::on_reply(error_code const& e , std::size_t bytes_transferred) { - std::unique_lock l(m_mutex); + TORRENT_ASSERT(is_single_thread()); COMPLETE_ASYNC("natpmp::on_reply"); @@ -413,7 +416,7 @@ void natpmp::on_reply(error_code const& e char msg[200]; snprintf(msg, sizeof(msg), "error on receiving reply: %s" , convert_from_native(e.message()).c_str()); - log(msg, l); + log(msg); return; } @@ -430,7 +433,7 @@ void natpmp::on_reply(error_code const& e /* if ((random() % 2) == 0) { - log(" simulating drop", l); + log(" simulating drop"); return; } */ @@ -439,7 +442,7 @@ void natpmp::on_reply(error_code const& e char msg[200]; snprintf(msg, sizeof(msg), "received packet from wrong IP: %s" , print_endpoint(m_remote).c_str()); - log(msg, l); + log(msg); return; } @@ -450,7 +453,7 @@ void natpmp::on_reply(error_code const& e { char msg[200]; snprintf(msg, sizeof(msg), "received packet of invalid size: %d", int(bytes_transferred)); - log(msg, l); + log(msg); return; } @@ -467,7 +470,7 @@ void natpmp::on_reply(error_code const& e char msg[200]; snprintf(msg, sizeof(msg), "<== public IP address [ %s ]", print_address(m_external_ip).c_str()); - log(msg, l); + log(msg); return; } @@ -476,7 +479,7 @@ void natpmp::on_reply(error_code const& e { char msg[200]; snprintf(msg, sizeof(msg), "received packet of invalid size: %d", int(bytes_transferred)); - log(msg, l); + log(msg); return; } @@ -498,7 +501,7 @@ void natpmp::on_reply(error_code const& e { snprintf(msg + num_chars, sizeof(msg) - num_chars, "unexpected version: %u" , version); - log(msg, l); + log(msg); } mapping_t* m = 0; @@ -518,12 +521,12 @@ void natpmp::on_reply(error_code const& e if (m == 0) { snprintf(msg + num_chars, sizeof(msg) - num_chars, " not found in map table"); - log(msg, l); + log(msg); return; } m->outstanding_request = false; - log(msg, l); + log(msg); if (public_port == 0 || lifetime == 0) { @@ -552,18 +555,14 @@ void natpmp::on_reply(error_code const& e m->expires = aux::time_now() + hours(2); int const proto = m->protocol; - l.unlock(); m_callback(index, address(), 0, proto , error_code(ev, get_libtorrent_category())); - l.lock(); } else if (m->action == mapping_t::action_add) { int const proto = m->protocol; - l.unlock(); m_callback(index, m_external_ip, m->external_port, proto , error_code(errors::no_error, get_libtorrent_category())); - l.lock(); } if (m_abort) return; @@ -571,12 +570,13 @@ void natpmp::on_reply(error_code const& e m_currently_mapping = -1; m->action = mapping_t::action_none; m_send_timer.cancel(ec); - update_expiration_timer(l); - try_next_mapping(index, l); + update_expiration_timer(); + try_next_mapping(index); } -void natpmp::update_expiration_timer(std::unique_lock& l) +void natpmp::update_expiration_timer() { + TORRENT_ASSERT(is_single_thread()); if (m_abort) return; time_point now = aux::time_now() + milliseconds(100); @@ -606,10 +606,10 @@ void natpmp::update_expiration_timer(std::unique_lock& l) { char msg[200]; snprintf(msg, sizeof(msg), "mapping %u expired", index); - log(msg, l); + log(msg); i->action = mapping_t::action_add; if (m_next_refresh == index) m_next_refresh = -1; - update_mapping(index, l); + update_mapping(index); } else if (i->expires < min_expire) { @@ -641,27 +641,28 @@ void natpmp::update_expiration_timer(std::unique_lock& l) void natpmp::mapping_expired(error_code const& e, int i) { + TORRENT_ASSERT(is_single_thread()); COMPLETE_ASYNC("natpmp::mapping_expired"); if (e) return; - std::unique_lock l(m_mutex); char msg[200]; snprintf(msg, sizeof(msg), "mapping %u expired", i); - log(msg, l); + log(msg); m_mappings[i].action = mapping_t::action_add; if (m_next_refresh == i) m_next_refresh = -1; - update_mapping(i, l); + update_mapping(i); } void natpmp::close() { - std::unique_lock l(m_mutex); - close_impl(l); + TORRENT_ASSERT(is_single_thread()); + close_impl(); } -void natpmp::close_impl(std::unique_lock& l) +void natpmp::close_impl() { + TORRENT_ASSERT(is_single_thread()); m_abort = true; - log("closing", l); + log("closing"); #ifdef NATPMP_LOG std::cout << time_now_string() << " close" << std::endl; time_point now = aux::time_now(); @@ -685,6 +686,6 @@ void natpmp::close_impl(std::unique_lock& l) error_code ec; m_refresh_timer.cancel(ec); m_currently_mapping = -1; - update_mapping(0, l); + update_mapping(0); }