From 96aa1f162b1b5ccb20c76efbda0691171c3feb97 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sat, 30 Jun 2012 15:30:38 +0000 Subject: [PATCH] fix for udp_socket observers and some dht warning fixes --- include/libtorrent/aux_/session_impl.hpp | 2 +- include/libtorrent/kademlia/dht_tracker.hpp | 1 + include/libtorrent/udp_socket.hpp | 11 ++- src/kademlia/dht_tracker.cpp | 2 + src/udp_socket.cpp | 77 ++++++++++++--------- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 8aee8716c..fbd46e229 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -218,7 +218,7 @@ namespace libtorrent , std::string const& logpath #endif ); - ~session_impl(); + virtual ~session_impl(); void init(); void start_session(); diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index b7228985a..d50801fff 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -79,6 +79,7 @@ namespace libtorrent { namespace dht // TODO: take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state = 0); + virtual ~dht_tracker(); void start(entry const& bootstrap); void stop(); diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 24dcd7fb8..f64a1f9f0 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -146,7 +146,17 @@ namespace libtorrent public: #endif + // observers on this udp socket std::vector m_observers; + std::vector m_added_observers; + + // this is true while iterating over the observers + // vector, invoking observer hooks. We may not + // add new observers during this time, since it + // may invalidate the iterator. If this is true, + // instead add new observers to m_added_observers + // and they will be added later + bool m_observers_locked; void call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size); void call_handler(error_code const& ec, const char* host, char const* buf, int size); @@ -174,7 +184,6 @@ namespace libtorrent void unwrap(error_code const& e, char const* buf, int size); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS - bool m_observers_locked; // TODO: move this debug facility into a base class. It's used in a lot of places #if defined BOOST_HAS_PTHREADS diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index a2ba36a37..3edb65e57 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -241,6 +241,8 @@ namespace libtorrent { namespace dht #endif } + dht_tracker::~dht_tracker() {} + // defined in node.cpp extern void nop(); diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 3ec391488..f360c4b2e 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -54,7 +54,8 @@ using namespace libtorrent; udp_socket::udp_socket(asio::io_service& ios , connection_queue& cc) - : m_ipv4_sock(ios) + : m_observers_locked(false) + , m_ipv4_sock(ios) , m_buf_size(0) , m_buf(0) #if TORRENT_USE_IPV6 @@ -75,7 +76,6 @@ udp_socket::udp_socket(asio::io_service& ios , m_outstanding_ops(0) { #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS - m_observers_locked = false; m_magic = 0x1337; m_started = false; m_outstanding_when_aborted = -1; @@ -228,74 +228,85 @@ void udp_socket::on_read(udp::socket* s) void udp_socket::call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size) { -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = true; -#endif - for (std::vector::iterator i = m_observers.begin() - , end(m_observers.end()); i != end; ++i) + for (std::vector::iterator i = m_observers.begin(); + i != m_observers.end();) { + bool ret = false; TORRENT_TRY { - - if ((*i)->incoming_packet(ec, ep, buf, size)) - break; - + ret = (*i)->incoming_packet(ec, ep, buf, size); } TORRENT_CATCH (std::exception&) {} + if (*i == NULL) i = m_observers.erase(i); + else ++i; + if (ret) break; + } + if (!m_added_observers.empty()) + { + m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); + m_added_observers.clear(); } -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = false; -#endif } void udp_socket::call_handler(error_code const& ec, const char* host, char const* buf, int size) { -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = true; -#endif - for (std::vector::iterator i = m_observers.begin() - , end(m_observers.end()); i != end; ++i) + for (std::vector::iterator i = m_observers.begin(); + i != m_observers.end();) { + bool ret = false; TORRENT_TRY { - - if ((*i)->incoming_packet(ec, host, buf, size)) - break; - + ret = (*i)->incoming_packet(ec, host, buf, size); } TORRENT_CATCH (std::exception&) {} + if (*i == NULL) i = m_observers.erase(i); + else ++i; + if (ret) break; + } + if (!m_added_observers.empty()) + { + m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); + m_added_observers.clear(); } -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = false; -#endif } void udp_socket::call_drained_handler() { -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = true; -#endif - for (std::vector::iterator i = m_observers.begin() - , end(m_observers.end()); i != end; ++i) + for (std::vector::iterator i = m_observers.begin(); + i != m_observers.end();) { TORRENT_TRY { - (*i)->socket_drained(); + (*i)->socket_drained(); } TORRENT_CATCH (std::exception&) {} + if (*i == NULL) i = m_observers.erase(i); + else ++i; + } + if (!m_added_observers.empty()) + { + m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end()); + m_added_observers.clear(); } -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_observers_locked = false; -#endif } void udp_socket::subscribe(udp_socket_observer* o) { - TORRENT_ASSERT(m_observers_locked == false); TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end()); - m_observers.push_back(o); + if (m_observers_locked) + m_added_observers.push_back(o); + else + m_observers.push_back(o); } void udp_socket::unsubscribe(udp_socket_observer* o) { - TORRENT_ASSERT(m_observers_locked == false); std::vector::iterator i = std::find(m_observers.begin(), m_observers.end(), o); if (i == m_observers.end()) return; - m_observers.erase(i); + if (m_observers_locked) + *i = NULL; + else + m_observers.erase(i); } void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep