diff --git a/include/libtorrent/alert_types.hpp b/include/libtorrent/alert_types.hpp index 70df1de84..4ea045b61 100644 --- a/include/libtorrent/alert_types.hpp +++ b/include/libtorrent/alert_types.hpp @@ -2382,12 +2382,27 @@ namespace libtorrent int m_peers_idx; }; + struct TORRENT_EXPORT dht_direct_response_alert: alert + { + dht_direct_response_alert(aux::stack_allocator& alloc, void* userdata + , udp::endpoint const& addr, entry const& response); + + TORRENT_DEFINE_ALERT(dht_direct_response_alert, 88) + + static const int static_category = alert::dht_notification; + virtual std::string message() const; + + void* userdata; + udp::endpoint addr; + entry response; + }; + #undef TORRENT_DEFINE_ALERT_IMPL #undef TORRENT_DEFINE_ALERT #undef TORRENT_DEFINE_ALERT_PRIO #undef TORRENT_CLONE - enum { num_alert_types = 88 }; + enum { num_alert_types = 89 }; } diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index ba4ebec02..11c5729e5 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -120,6 +120,9 @@ namespace libtorrent struct bencode_map_entry; + typedef boost::function dht_extension_handler_t; + struct listen_socket_t { listen_socket_t(): external_port(0), ssl(false) {} @@ -167,6 +170,8 @@ namespace libtorrent { // the size of each allocation that is chained in the send buffer enum { send_buffer_size_impl = 128 }; + // maximum length of query names which can be registered by extensions + enum { max_dht_query_length = 15 }; #ifdef TORRENT_DEBUG // friend class ::libtorrent::peer_connection; @@ -317,6 +322,8 @@ namespace libtorrent void dht_get_peers(sha1_hash const& info_hash); void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0); + void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata); + #ifndef TORRENT_NO_DEPRECATE entry dht_state() const; #endif @@ -573,6 +580,9 @@ namespace libtorrent virtual void log_packet(message_direction_t dir, char const* pkt, int len , udp::endpoint node) TORRENT_OVERRIDE; + virtual bool on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response); + void set_external_address(address const& ip , int source_type, address const& source); virtual external_ip const& external_address() const TORRENT_OVERRIDE; @@ -1135,6 +1145,17 @@ namespace libtorrent // this is a list to allow extensions to potentially remove themselves. typedef std::list > ses_extension_list_t; ses_extension_list_t m_ses_extensions; + + // std::string could be used for the query names if only all common implementations used SSO + // *glares at gcc* + struct extention_dht_query + { + uint8_t query_len; + boost::array query; + dht_extension_handler_t handler; + }; + typedef std::vector m_extension_dht_queries_t; + m_extension_dht_queries_t m_extension_dht_queries; #endif // if this function is set, it indicates that torrents are allowed diff --git a/include/libtorrent/extensions.hpp b/include/libtorrent/extensions.hpp index 275540241..6f6b84399 100644 --- a/include/libtorrent/extensions.hpp +++ b/include/libtorrent/extensions.hpp @@ -192,6 +192,11 @@ namespace libtorrent struct peer_connection_handle; struct torrent_handle; + typedef boost::function dht_extension_handler_t; + + typedef std::vector > dht_extensions_t; + // this is the base class for a session plugin. One primary feature // is that it is notified of all torrents that are added to the session, // and can add its own torrent_plugins. @@ -214,6 +219,10 @@ namespace libtorrent // called when plugin is added to a session virtual void added(session_handle) {} + // called after a plugin is added + // allows the plugin to register DHT requests it would like to handle + virtual void register_dht_extensions(dht_extensions_t&) {} + // called when an alert is posted alerts that are filtered are not posted virtual void on_alert(alert const*) {} diff --git a/include/libtorrent/kademlia/dht_observer.hpp b/include/libtorrent/kademlia/dht_observer.hpp index 949bbe735..ba9c9d389 100644 --- a/include/libtorrent/kademlia/dht_observer.hpp +++ b/include/libtorrent/kademlia/dht_observer.hpp @@ -73,6 +73,8 @@ namespace libtorrent { namespace dht virtual void outgoing_get_peers(sha1_hash const& target , sha1_hash const& sent_target, udp::endpoint const& ep) = 0; virtual void announce(sha1_hash const& ih, address const& addr, int port) = 0; + virtual bool on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response) = 0; protected: ~dht_observer() {} diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index dd30a36db..ac09bd369 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -106,6 +106,10 @@ namespace libtorrent { namespace dht void put_item(char const* key , boost::function cb, std::string salt = std::string()); + // send an arbitrary DHT request directly to a node + void direct_request(boost::asio::ip::udp::endpoint ep, entry& e + , boost::function f); + #ifndef TORRENT_NO_DEPRECATE void dht_status(session_status& s); #endif diff --git a/include/libtorrent/kademlia/direct_request.hpp b/include/libtorrent/kademlia/direct_request.hpp new file mode 100644 index 000000000..f9e24e238 --- /dev/null +++ b/include/libtorrent/kademlia/direct_request.hpp @@ -0,0 +1,101 @@ +/* + +Copyright (c) 2014, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef LIBTORRENT_DIRECT_REQUEST_HPP +#define LIBTORRENT_DIRECT_REQUEST_HPP + +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +struct direct_trasversal : traversal_algorithm +{ + direct_trasversal(node& node + , node_id target + , boost::function cb) + : traversal_algorithm(node, target) + , m_cb(cb) + {} + + virtual char const* name() const { return "direct_trasversal"; } + + void invoke_cb(msg const& m) + { + if (!m_cb.empty()) + { + m_cb(m); + m_cb.clear(); + done(); + } + } + +protected: + boost::function m_cb; +}; + +struct direct_observer : observer +{ + direct_observer(boost::intrusive_ptr const& algo + , udp::endpoint const& ep, node_id const& id) + : observer(algo, ep, id) + {} + + virtual void reply(msg const& m) + { + flags |= flag_done; + static_cast(algorithm())->invoke_cb(m); + } + + virtual void timeout() + { + if (flags & flag_done) return; + flags |= flag_done; + bdecode_node e; + if (flags & flag_ipv6_address) + { + msg m(e, target_ep()); + static_cast(algorithm())->invoke_cb(m); + } + else + { + msg m(e, target_ep()); + static_cast(algorithm())->invoke_cb(m); + } + } +}; + +} } // namespace libtorrent::dht + +#endif diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index d97d1c8c4..179738a50 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -245,6 +245,9 @@ public: void announce(sha1_hash const& info_hash, int listen_port, int flags , boost::function const&)> f); + void direct_request(boost::asio::ip::udp::endpoint ep, entry& e + , boost::function f); + void get_item(sha1_hash const& target, boost::function f); void get_item(char const* pk, std::string const& salt, boost::function f); diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index 68722b330..67928a352 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -96,7 +96,7 @@ struct observer : boost::noncopyable // this is called when no reply has been received within // some timeout - void timeout(); + virtual void timeout(); // if this is called the destructor should // not invoke any new messages, and should diff --git a/include/libtorrent/session_handle.hpp b/include/libtorrent/session_handle.hpp index 3c9cb2a32..9728af7ae 100644 --- a/include/libtorrent/session_handle.hpp +++ b/include/libtorrent/session_handle.hpp @@ -431,6 +431,8 @@ namespace libtorrent void dht_get_peers(sha1_hash const& info_hash); void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0); + void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata); + #ifndef TORRENT_NO_DEPRECATE // deprecated in 0.15 // use save_state and load_state instead diff --git a/src/alert.cpp b/src/alert.cpp index aa9b8f268..f08f40b8c 100644 --- a/src/alert.cpp +++ b/src/alert.cpp @@ -1831,5 +1831,20 @@ namespace libtorrent { } } + dht_direct_response_alert::dht_direct_response_alert( + aux::stack_allocator&, void* userdata + , udp::endpoint const& addr, entry const& response) + : userdata(userdata), addr(addr), response(response) + {} + + std::string dht_direct_response_alert::message() const + { + char msg[1050]; + snprintf(msg, sizeof(msg), "DHT direct response (address=%s) [ %s ]" + , addr.address().to_string().c_str() + , response.to_string().c_str()); + return msg; + } + } // namespace libtorrent diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 83db75609..e3646492f 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -311,6 +311,12 @@ namespace libtorrent { namespace dht , _1, _2, cb)); } + void dht_tracker::direct_request(boost::asio::ip::udp::endpoint ep, entry& e + , boost::function f) + { + m_dht.direct_request(ep, e, f); + } + // translate bittorrent kademlia message into the generice kademlia message // used by the library bool dht_tracker::incoming_packet(error_code const& ec diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 7df76a0c3..e964f3819 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -58,6 +58,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/routing_table.hpp" #include "libtorrent/kademlia/node.hpp" #include "libtorrent/kademlia/dht_observer.hpp" +#include "libtorrent/kademlia/direct_request.hpp" #include "libtorrent/kademlia/refresh.hpp" #include "libtorrent/kademlia/get_peers.hpp" @@ -411,6 +412,22 @@ void node::announce(sha1_hash const& info_hash, int listen_port, int flags , listen_port, info_hash, flags), flags & node::flag_seed); } +void node::direct_request(boost::asio::ip::udp::endpoint ep, entry& e + , boost::function f) +{ + // not really a traversal + boost::intrusive_ptr algo( + new direct_trasversal(*this, (node_id::min)(), f)); + + void* ptr = m_rpc.allocate_observer(); + if (ptr == 0) return; + observer_ptr o(new (ptr) direct_observer(algo, ep, (node_id::min)())); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + m_rpc.invoke(e, ep, o); +} + void node::get_item(sha1_hash const& target , boost::function f) { @@ -868,9 +885,6 @@ void node::incoming_request(msg const& m, entry& e) e["ip"] = endpoint_to_bytes(m.addr); - char const* query = top_level[0].string_ptr(); - int query_len = top_level[0].string_length(); - bdecode_node arg_ent = top_level[2]; bool read_only = top_level[1] && top_level[1].int_value() != 0; node_id id(top_level[3].string_ptr()); @@ -893,6 +907,12 @@ void node::incoming_request(msg const& m, entry& e) // mirror back the other node's external port reply["p"] = m.addr.port(); + char const* query = top_level[0].string_ptr(); + int query_len = top_level[0].string_length(); + + if (m_observer && m_observer->on_dht_request(query, query_len, m, e)) + return; + if (query_len == 4 && memcmp(query, "ping", 4) == 0) { m_counters.inc_stats_counter(counters::dht_ping_in); diff --git a/src/session_handle.cpp b/src/session_handle.cpp index b06ff13a5..24a947e92 100644 --- a/src/session_handle.cpp +++ b/src/session_handle.cpp @@ -396,6 +396,13 @@ namespace libtorrent #endif } + void session_handle::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata) + { +#ifndef TORRENT_DISABLE_DHT + TORRENT_ASYNC_CALL3(dht_direct_request, ep, e, userdata); +#endif + } + #ifndef TORRENT_NO_DEPRECATE entry session_handle::dht_state() const { diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 50268fbfb..5ab9d7c43 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -894,7 +894,22 @@ namespace aux { m_ses_extensions.push_back(ext); m_alerts.add_extension(ext); ext->added(session_handle(this)); + + dht_extensions_t dht_ext; + ext->register_dht_extensions(dht_ext); + for (dht_extensions_t::iterator e = dht_ext.begin(); + e != dht_ext.end(); ++e) + { + TORRENT_ASSERT(e->first.size() <= max_dht_query_length); + if (e->first.size() > max_dht_query_length) continue; + extention_dht_query registration; + registration.query_len = e->first.size(); + std::copy(e->first.begin(), e->first.end(), registration.query.begin()); + registration.handler = e->second; + m_extension_dht_queries.push_back(registration); + } } + #endif // TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_NO_DEPRECATE @@ -5532,6 +5547,13 @@ retry: alerts.emplace_alert(info_hash, peers); } + void on_direct_response(alert_manager& alerts, void* userdata, dht::msg const& msg) + { + entry e; + e = msg.message; + alerts.emplace_alert(userdata, msg.addr, e); + } + } // anonymous namespace void session_impl::dht_put_immutable_item(entry data, sha1_hash target) @@ -5563,6 +5585,12 @@ retry: m_dht->announce(info_hash, port, flags, boost::bind(&on_dht_get_peers, boost::ref(m_alerts), info_hash, _1)); } + void session_impl::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata) + { + if (!m_dht) return; + m_dht->direct_request(ep, e, boost::bind(&on_direct_response, boost::ref(m_alerts), userdata, _1)); + } + #endif void session_impl::maybe_update_udp_mapping(int nat, int local_port, int external_port) @@ -6496,6 +6524,24 @@ retry: m_alerts.emplace_alert(pkt, len, d, node); } + bool session_impl::on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response) + { +#ifndef TORRENT_DISABLE_EXTENSIONS + if (query_len > max_dht_query_length) return false; + + for (m_extension_dht_queries_t::iterator i = m_extension_dht_queries.begin(); + i != m_extension_dht_queries.end(); ++i) + { + if (query_len == i->query_len + && memcmp(i->query.data(), query, query_len) == 0 + && i->handler(request.addr, request.message, response)) + return true; + } +#endif + return false; + } + void session_impl::set_external_address(address const& ip , int source_type, address const& source) { diff --git a/test/test_dht.cpp b/test/test_dht.cpp index b1a438ce6..cca52086d 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -438,6 +438,8 @@ struct obs : dht::dht_observer virtual void log(dht_logger::module_t l, char const* fmt, ...) TORRENT_OVERRIDE {} virtual void log_packet(message_direction_t dir, char const* pkt, int len , udp::endpoint node) TORRENT_OVERRIDE {} + virtual bool on_dht_request(char const* query, int query_len + , dht::msg const& request, entry& response) TORRENT_OVERRIDE { return false; } }; // TODO: test obfuscated_get_peers