From 918dce134135a21f016967be0f7ca25d6bacf03f Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 30 Apr 2012 05:39:35 +0000 Subject: [PATCH] back-ported part of the alert_dispatcher patch for the DHT --- include/libtorrent/Makefile.am | 1 + include/libtorrent/alert.hpp | 5 +++ include/libtorrent/alert_dispatcher.hpp | 50 +++++++++++++++++++++ include/libtorrent/aux_/session_impl.hpp | 9 +++- include/libtorrent/kademlia/dht_tracker.hpp | 8 ++-- include/libtorrent/kademlia/node.hpp | 18 +++++--- include/libtorrent/kademlia/rpc_manager.hpp | 10 ++--- src/kademlia/dht_tracker.cpp | 28 +----------- src/kademlia/node.cpp | 32 +++++++------ src/kademlia/rpc_manager.cpp | 14 +++--- src/session_impl.cpp | 8 ++++ test/test_dht.cpp | 28 +++++++++--- 12 files changed, 139 insertions(+), 72 deletions(-) create mode 100644 include/libtorrent/alert_dispatcher.hpp diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index de342e85c..f95f22048 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -8,6 +8,7 @@ nobase_include_HEADERS = \ address.hpp \ add_torrent_params.hpp \ alert.hpp \ + alert_dispatcher.hpp \ alert_types.hpp \ alloca.hpp \ allocator.hpp \ diff --git a/include/libtorrent/alert.hpp b/include/libtorrent/alert.hpp index bb4baeea5..3b4ccb8c9 100644 --- a/include/libtorrent/alert.hpp +++ b/include/libtorrent/alert.hpp @@ -146,6 +146,11 @@ namespace libtorrent { return (m_alert_mask & T::static_category) != 0; } + bool should_post(alert const* a) const + { + return m_alert_mask & a->category(); + } + alert const* wait_for_alert(time_duration max_wait); void set_alert_mask(boost::uint32_t m) diff --git a/include/libtorrent/alert_dispatcher.hpp b/include/libtorrent/alert_dispatcher.hpp new file mode 100644 index 000000000..82c0f4f6b --- /dev/null +++ b/include/libtorrent/alert_dispatcher.hpp @@ -0,0 +1,50 @@ +/* + +Copyright (c) 2012, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_ALERT_DISPATCHER_HPP_INCLUDED +#define TORRENT_ALERT_DISPATCHER_HPP_INCLUDED + +namespace libtorrent +{ + struct alert; + + struct alert_dispatcher + { + // return true if the alert was swallowed (i.e. + // ownership was taken over). In this case, the + // alert will not be passed on to any one else + virtual bool post_alert(alert* a) = 0; + }; +} + +#endif + diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 20497a878..a5efd9d46 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -85,6 +85,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/utp_socket_manager.hpp" #include "libtorrent/bloom_filter.hpp" #include "libtorrent/rss.hpp" +#include "libtorrent/alert_dispatcher.hpp" #if TORRENT_COMPLETE_TYPES_REQUIRED #include "libtorrent/peer_connection.hpp" @@ -181,7 +182,10 @@ namespace libtorrent // this is the link between the main thread and the // thread started to run the main downloader loop - struct TORRENT_EXTRA_EXPORT session_impl: boost::noncopyable, initialize_timer + struct TORRENT_EXTRA_EXPORT session_impl + : alert_dispatcher + , boost::noncopyable + , initialize_timer , boost::enable_shared_from_this { #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING @@ -525,6 +529,9 @@ namespace libtorrent // private: + // implements alert_dispatcher + virtual bool post_alert(alert* a); + void update_connections_limit(); void update_unchoke_limit(); void update_rate_settings(); diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index afe51aa11..8edad64e8 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -71,11 +71,11 @@ namespace libtorrent { namespace dht TORRENT_EXTRA_EXPORT void intrusive_ptr_add_ref(dht_tracker const*); TORRENT_EXTRA_EXPORT void intrusive_ptr_release(dht_tracker const*); - struct dht_tracker + struct dht_tracker : udp_socket_interface { friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*); - friend bool send_callback(void* userdata, entry& e, udp::endpoint const& addr, int flags); + dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state = 0); @@ -112,10 +112,10 @@ namespace libtorrent { namespace dht void refresh_timeout(error_code const& e); void tick(error_code const& e); - bool send_packet(libtorrent::entry& e, udp::endpoint const& addr, int send_flags); + // implements udp_socket_interface + virtual bool send_packet(libtorrent::entry& e, udp::endpoint const& addr, int send_flags); node_impl m_dht; - libtorrent::aux::session_impl& m_ses; rate_limited_udp_socket& m_sock; std::vector m_send_buf; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 45bbbe660..ea3502e4f 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -57,6 +57,7 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { class alert_manager; + struct alert_dispatcher; } namespace libtorrent { namespace dht @@ -173,7 +174,12 @@ struct count_peers count += t.second.peers.size(); } }; - + +struct udp_socket_interface +{ + virtual bool send_packet(entry& e, udp::endpoint const& addr, int flags) = 0; +}; + class TORRENT_EXTRA_EXPORT node_impl : boost::noncopyable { typedef std::map table_t; @@ -183,10 +189,9 @@ typedef std::map dht_mutable_table_t; public: typedef boost::function3 external_ip_fun; - node_impl(libtorrent::alert_manager& alerts - , bool (*f)(void*, entry&, udp::endpoint const&, int) + node_impl(alert_dispatcher* alert_disp, udp_socket_interface* sock , dht_settings const& settings, node_id nid, address const& external_address - , external_ip_fun ext_ip, void* userdata); + , external_ip_fun ext_ip); virtual ~node_impl() {} @@ -298,9 +303,8 @@ private: // secret random numbers used to create write tokens int m_secret[2]; - libtorrent::alert_manager& m_alerts; - bool (*m_send)(void*, entry&, udp::endpoint const&, int); - void* m_userdata; + alert_dispatcher* m_post_alert; + udp_socket_interface* m_sock; }; diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 92c430e0e..5da1c82c9 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -56,6 +56,8 @@ namespace libtorrent { namespace dht TORRENT_DECLARE_LOG(rpc); #endif +struct udp_socket_interface; + struct null_observer : public observer { null_observer(boost::intrusive_ptr const& a @@ -68,12 +70,11 @@ class routing_table; class TORRENT_EXTRA_EXPORT rpc_manager { public: - typedef bool (*send_fun)(void* userdata, entry&, udp::endpoint const&, int); typedef boost::function3 external_ip_fun; rpc_manager(node_id const& our_id - , routing_table& table, send_fun const& sf - , void* userdata, external_ip_fun ext_ip); + , routing_table& table, udp_socket_interface* sock + , external_ip_fun ext_ip); ~rpc_manager(); void unreachable(udp::endpoint const& ep); @@ -109,8 +110,7 @@ private: typedef std::list transactions_t; transactions_t m_transactions; - send_fun m_send; - void* m_userdata; + udp_socket_interface* m_sock; node_id m_our_id; routing_table& m_table; ptime m_timer; diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 2e2011326..29f7e8093 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -198,21 +198,13 @@ namespace libtorrent { namespace dht return node_id(node_id(nid->string().c_str())); } - bool send_callback(void* userdata, entry& e, udp::endpoint const& addr, int flags) - { - dht_tracker* self = (dht_tracker*)userdata; - return self->send_packet(e, addr, flags); - } - // class that puts the networking and the kademlia node in a single // unit and connecting them together. dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state) - : m_dht(ses.m_alerts, &send_callback, settings, extract_node_id(state) + : m_dht(&ses, this, settings, extract_node_id(state) , ses.external_address() - , boost::bind(&aux::session_impl::set_external_address, &ses, _1, _2, _3) - , this) - , m_ses(ses) + , boost::bind(&aux::session_impl::set_external_address, &ses, _1, _2, _3)) , m_sock(sock) , m_last_new_key(time_now() - minutes(key_refresh)) , m_timer(sock.get_io_service()) @@ -255,7 +247,6 @@ namespace libtorrent { namespace dht void dht_tracker::start(entry const& bootstrap) { - TORRENT_ASSERT(m_ses.is_network_thread()); std::vector initial_nodes; if (bootstrap.type() == entry::dictionary_t) @@ -281,7 +272,6 @@ namespace libtorrent { namespace dht void dht_tracker::stop() { - TORRENT_ASSERT(m_ses.is_network_thread()); m_abort = true; error_code ec; m_timer.cancel(ec); @@ -292,13 +282,11 @@ namespace libtorrent { namespace dht void dht_tracker::dht_status(session_status& s) { - TORRENT_ASSERT(m_ses.is_network_thread()); m_dht.status(s); } void dht_tracker::network_stats(int& sent, int& received) { - TORRENT_ASSERT(m_ses.is_network_thread()); sent = m_sent_bytes; received = m_received_bytes; m_sent_bytes = 0; @@ -307,7 +295,6 @@ namespace libtorrent { namespace dht void dht_tracker::connection_timeout(error_code const& e) { - TORRENT_ASSERT(m_ses.is_network_thread()); if (e || m_abort) return; time_duration d = m_dht.connection_timeout(); @@ -318,7 +305,6 @@ namespace libtorrent { namespace dht void dht_tracker::refresh_timeout(error_code const& e) { - TORRENT_ASSERT(m_ses.is_network_thread()); if (e || m_abort) return; m_dht.tick(); @@ -330,7 +316,6 @@ namespace libtorrent { namespace dht void dht_tracker::tick(error_code const& e) { - TORRENT_ASSERT(m_ses.is_network_thread()); if (e || m_abort) return; error_code ec; @@ -433,14 +418,12 @@ namespace libtorrent { namespace dht void dht_tracker::announce(sha1_hash const& ih, int listen_port, bool seed , boost::function const&)> f) { - TORRENT_ASSERT(m_ses.is_network_thread()); m_dht.announce(ih, listen_port, seed, f); } void dht_tracker::on_unreachable(udp::endpoint const& ep) { - TORRENT_ASSERT(m_ses.is_network_thread()); m_dht.unreachable(ep); } @@ -448,7 +431,6 @@ namespace libtorrent { namespace dht // used by the library void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred) { - TORRENT_ASSERT(m_ses.is_network_thread()); // account for IP and UDP overhead m_received_bytes += bytes_transferred + (ep.address().is_v6() ? 48 : 28); @@ -557,7 +539,6 @@ namespace libtorrent { namespace dht entry dht_tracker::state() const { - TORRENT_ASSERT(m_ses.is_network_thread()); entry ret(entry::dictionary_t); { entry nodes(entry::list_t); @@ -582,13 +563,11 @@ namespace libtorrent { namespace dht void dht_tracker::add_node(udp::endpoint node) { - TORRENT_ASSERT(m_ses.is_network_thread()); m_dht.add_node(node); } void dht_tracker::add_node(std::pair const& node) { - TORRENT_ASSERT(m_ses.is_network_thread()); char port[7]; snprintf(port, sizeof(port), "%d", node.second); udp::resolver::query q(node.first, port); @@ -599,20 +578,17 @@ namespace libtorrent { namespace dht void dht_tracker::on_name_lookup(error_code const& e , udp::resolver::iterator host) { - TORRENT_ASSERT(m_ses.is_network_thread()); if (e || host == udp::resolver::iterator()) return; add_node(host->endpoint()); } void dht_tracker::add_router_node(udp::endpoint const& node) { - TORRENT_ASSERT(m_ses.is_network_thread()); m_dht.add_router_node(node); } bool dht_tracker::send_packet(libtorrent::entry& e, udp::endpoint const& addr, int send_flags) { - TORRENT_ASSERT(m_ses.is_network_thread()); using libtorrent::bencode; using libtorrent::entry; diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 98f6eb719..0d7388e45 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -88,18 +88,17 @@ void purge_peers(std::set& peers) void nop() {} -node_impl::node_impl(libtorrent::alert_manager& alerts - , bool (*f)(void*, entry&, udp::endpoint const&, int) +node_impl::node_impl(alert_dispatcher* alert_disp + , udp_socket_interface* sock , dht_settings const& settings, node_id nid, address const& external_address - , external_ip_fun ext_ip, void* userdata) + , external_ip_fun ext_ip) : m_settings(settings) , m_id(nid == (node_id::min)() || !verify_id(nid, external_address) ? generate_id(external_address) : nid) , m_table(m_id, 8, settings) - , m_rpc(m_id, m_table, f, userdata, ext_ip) + , m_rpc(m_id, m_table, sock, ext_ip) , m_last_tracker_tick(time_now()) - , m_alerts(alerts) - , m_send(f) - , m_userdata(userdata) + , m_post_alert(alert_disp) + , m_sock(sock) { m_secret[0] = random(); m_secret[1] = std::rand(); @@ -210,7 +209,7 @@ void node_impl::incoming(msg const& m) { entry e; incoming_error(e, "missing 'y' entry"); - m_send(m_userdata, e, m.addr, 0); + m_sock->send_packet(e, m.addr, 0); return; } @@ -230,7 +229,7 @@ void node_impl::incoming(msg const& m) TORRENT_ASSERT(m.message.dict_find_string_value("y") == "q"); entry e; incoming_request(m, e); - m_send(m_userdata, e, m.addr, 0); + m_sock->send_packet(e, m.addr, 0); break; } case 'e': @@ -398,8 +397,11 @@ void node_impl::status(session_status& s) void node_impl::lookup_peers(sha1_hash const& info_hash, int prefix, entry& reply , bool noseed, bool scrape) const { - if (m_alerts.should_post()) - m_alerts.post_alert(dht_get_peers_alert(info_hash)); + if (m_post_alert) + { + alert* a = new dht_get_peers_alert(info_hash); + if (!m_post_alert->post_alert(a)) delete a; + } table_t::const_iterator i = m_map.lower_bound(info_hash); if (i == m_map.end()) return; @@ -722,9 +724,11 @@ void node_impl::incoming_request(msg const& m, entry& e) sha1_hash info_hash(msg_keys[0]->string_ptr()); - if (m_alerts.should_post()) - m_alerts.post_alert(dht_announce_alert( - m.addr.address(), port, info_hash)); + if (m_post_alert) + { + alert* a = new dht_announce_alert(m.addr.address(), port, info_hash); + if (!m_post_alert->post_alert(a)) delete a; + } if (!verify_token(msg_keys[2]->string_value(), msg_keys[0]->string_ptr(), m.addr)) { diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 8b434ce6d..6fd3475bb 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -160,12 +160,10 @@ enum { observer_size = max3< }; rpc_manager::rpc_manager(node_id const& our_id - , routing_table& table, send_fun const& sf - , void* userdata + , routing_table& table, udp_socket_interface* sock , external_ip_fun ext_ip) : m_pool_allocator(observer_size, 10) - , m_send(sf) - , m_userdata(userdata) + , m_sock(sock) , m_our_id(our_id) , m_table(table) , m_timer(time_now()) @@ -310,7 +308,7 @@ bool rpc_manager::incoming(msg const& m, node_id* id) #endif entry e; incoming_error(e, "invalid transaction id"); - m_send(m_userdata, e, m.addr, 0); + m_sock->send_packet(e, m.addr, 0); return false; } @@ -325,7 +323,7 @@ bool rpc_manager::incoming(msg const& m, node_id* id) { entry e; incoming_error(e, "missing 'r' key"); - m_send(m_userdata, e, m.addr, 0); + m_sock->send_packet(e, m.addr, 0); return false; } @@ -334,7 +332,7 @@ bool rpc_manager::incoming(msg const& m, node_id* id) { entry e; incoming_error(e, "missing 'id' key"); - m_send(m_userdata, e, m.addr, 0); + m_sock->send_packet(e, m.addr, 0); return false; } @@ -468,7 +466,7 @@ bool rpc_manager::invoke(entry& e, udp::endpoint target_addr << e["q"].string() << " -> " << target_addr; #endif - if (m_send(m_userdata, e, target_addr, 1)) + if (m_sock->send_packet(e, target_addr, 1)) { m_transactions.push_back(o); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 77c785c67..59d38d4e5 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -2888,6 +2888,14 @@ namespace aux { if (m_next_disk_peer == m_connections.end()) m_next_disk_peer = m_connections.begin(); } + // implements alert_dispatcher + bool session_impl::post_alert(alert* a) + { + if (!m_alerts.should_post(a)) return false; + m_alerts.post_alert_ptr(a); + return true; + } + void session_impl::set_peer_id(peer_id const& id) { m_peer_id = id; diff --git a/test/test_dht.cpp b/test/test_dht.cpp index a0f70e8d1..0fa7e3661 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket_io.hpp" // for hash_address #include "libtorrent/rsa.hpp" // for generate_rsa_keys and sign_rsa #include "libtorrent/broadcast_socket.hpp" // for supports_ipv6 +#include "libtorrent/alert_dispatcher.hpp" #include #include "test.hpp" @@ -47,11 +48,14 @@ using namespace libtorrent::dht; std::list > g_responses; -bool our_send(void* user, entry& msg, udp::endpoint const& ep, int flags) +struct mock_socket : udp_socket_interface { - g_responses.push_back(std::make_pair(ep, msg)); - return true; -} + bool send_packet(entry& msg, udp::endpoint const& ep, int flags) + { + g_responses.push_back(std::make_pair(ep, msg)); + return true; + } +}; address rand_v4() { @@ -258,15 +262,25 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps void nop(address, int, address) {} +struct print_alert : alert_dispatcher +{ + virtual bool post_alert(alert* a) + { + fprintf(stderr, "ALERT: %s\n", a->message().c_str()); + delete a; + return true; + } +}; + int test_main() { - io_service ios; - alert_manager al(ios, 100); dht_settings sett; sett.max_torrents = 4; sett.max_dht_items = 4; address ext = address::from_string("236.0.0.1"); - dht::node_impl node(al, &our_send, sett, node_id(0), ext, boost::bind(nop, _1, _2, _3), 0); + mock_socket s; + print_alert ad; + dht::node_impl node(&ad, &s, sett, node_id(0), ext, boost::bind(nop, _1, _2, _3)); // DHT should be running on port 48199 now lazy_entry response;