enable arbitrary direct dht requests and extension dht requests

If you're wondering why I used an alert for handling responses but not requests,
it's because the former was easy to do that way but the latter would have
involved some major refactoring or kludging. The DHT node is written with the
(IMO reasonable) assumption that all responses will be generated immediately, so
there was no way to easily accommodate getting a response asynchronously via
an alert.
This commit is contained in:
Steven Siloti 2014-02-16 21:56:49 -08:00
parent a29e64965c
commit 86fd8c3048
15 changed files with 258 additions and 5 deletions

View File

@ -2382,12 +2382,27 @@ namespace libtorrent
int m_peers_idx;
};
struct TORRENT_EXPORT dht_direct_response_alert: alert
{
dht_direct_response_alert(aux::stack_allocator& alloc, void* userdata
, udp::endpoint const& addr, entry const& response);
TORRENT_DEFINE_ALERT(dht_direct_response_alert, 88)
static const int static_category = alert::dht_notification;
virtual std::string message() const;
void* userdata;
udp::endpoint addr;
entry response;
};
#undef TORRENT_DEFINE_ALERT_IMPL
#undef TORRENT_DEFINE_ALERT
#undef TORRENT_DEFINE_ALERT_PRIO
#undef TORRENT_CLONE
enum { num_alert_types = 88 };
enum { num_alert_types = 89 };
}

View File

@ -120,6 +120,9 @@ namespace libtorrent
struct bencode_map_entry;
typedef boost::function<bool(udp::endpoint const& source
, bdecode_node const& request, entry& response)> dht_extension_handler_t;
struct listen_socket_t
{
listen_socket_t(): external_port(0), ssl(false) {}
@ -167,6 +170,8 @@ namespace libtorrent
{
// the size of each allocation that is chained in the send buffer
enum { send_buffer_size_impl = 128 };
// maximum length of query names which can be registered by extensions
enum { max_dht_query_length = 15 };
#ifdef TORRENT_DEBUG
// friend class ::libtorrent::peer_connection;
@ -317,6 +322,8 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata);
#ifndef TORRENT_NO_DEPRECATE
entry dht_state() const;
#endif
@ -573,6 +580,9 @@ namespace libtorrent
virtual void log_packet(message_direction_t dir, char const* pkt, int len
, udp::endpoint node) TORRENT_OVERRIDE;
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response);
void set_external_address(address const& ip
, int source_type, address const& source);
virtual external_ip const& external_address() const TORRENT_OVERRIDE;
@ -1135,6 +1145,17 @@ namespace libtorrent
// this is a list to allow extensions to potentially remove themselves.
typedef std::list<boost::shared_ptr<plugin> > ses_extension_list_t;
ses_extension_list_t m_ses_extensions;
// std::string could be used for the query names if only all common implementations used SSO
// *glares at gcc*
struct extention_dht_query
{
uint8_t query_len;
boost::array<char, max_dht_query_length> query;
dht_extension_handler_t handler;
};
typedef std::vector<extention_dht_query> m_extension_dht_queries_t;
m_extension_dht_queries_t m_extension_dht_queries;
#endif
// if this function is set, it indicates that torrents are allowed

View File

@ -192,6 +192,11 @@ namespace libtorrent
struct peer_connection_handle;
struct torrent_handle;
typedef boost::function<bool(udp::endpoint const& source
, bdecode_node const& request, entry& response)> dht_extension_handler_t;
typedef std::vector<std::pair<std::string, dht_extension_handler_t> > dht_extensions_t;
// this is the base class for a session plugin. One primary feature
// is that it is notified of all torrents that are added to the session,
// and can add its own torrent_plugins.
@ -214,6 +219,10 @@ namespace libtorrent
// called when plugin is added to a session
virtual void added(session_handle) {}
// called after a plugin is added
// allows the plugin to register DHT requests it would like to handle
virtual void register_dht_extensions(dht_extensions_t&) {}
// called when an alert is posted alerts that are filtered are not posted
virtual void on_alert(alert const*) {}

View File

@ -73,6 +73,8 @@ namespace libtorrent { namespace dht
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) = 0;
virtual void announce(sha1_hash const& ih, address const& addr, int port) = 0;
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response) = 0;
protected:
~dht_observer() {}

View File

@ -106,6 +106,10 @@ namespace libtorrent { namespace dht
void put_item(char const* key
, boost::function<void(item&)> cb, std::string salt = std::string());
// send an arbitrary DHT request directly to a node
void direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f);
#ifndef TORRENT_NO_DEPRECATE
void dht_status(session_status& s);
#endif

View File

@ -0,0 +1,101 @@
/*
Copyright (c) 2014, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef LIBTORRENT_DIRECT_REQUEST_HPP
#define LIBTORRENT_DIRECT_REQUEST_HPP
#include <boost/function/function1.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/kademlia/traversal_algorithm.hpp>
namespace libtorrent { namespace dht
{
struct direct_trasversal : traversal_algorithm
{
direct_trasversal(node& node
, node_id target
, boost::function<void(msg const&)> cb)
: traversal_algorithm(node, target)
, m_cb(cb)
{}
virtual char const* name() const { return "direct_trasversal"; }
void invoke_cb(msg const& m)
{
if (!m_cb.empty())
{
m_cb(m);
m_cb.clear();
done();
}
}
protected:
boost::function<void(msg const&)> m_cb;
};
struct direct_observer : observer
{
direct_observer(boost::intrusive_ptr<traversal_algorithm> const& algo
, udp::endpoint const& ep, node_id const& id)
: observer(algo, ep, id)
{}
virtual void reply(msg const& m)
{
flags |= flag_done;
static_cast<direct_trasversal*>(algorithm())->invoke_cb(m);
}
virtual void timeout()
{
if (flags & flag_done) return;
flags |= flag_done;
bdecode_node e;
if (flags & flag_ipv6_address)
{
msg m(e, target_ep());
static_cast<direct_trasversal*>(algorithm())->invoke_cb(m);
}
else
{
msg m(e, target_ep());
static_cast<direct_trasversal*>(algorithm())->invoke_cb(m);
}
}
};
} } // namespace libtorrent::dht
#endif

View File

@ -245,6 +245,9 @@ public:
void announce(sha1_hash const& info_hash, int listen_port, int flags
, boost::function<void(std::vector<tcp::endpoint> const&)> f);
void direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f);
void get_item(sha1_hash const& target, boost::function<bool(item&, bool)> f);
void get_item(char const* pk, std::string const& salt, boost::function<bool(item&, bool)> f);

View File

@ -96,7 +96,7 @@ struct observer : boost::noncopyable
// this is called when no reply has been received within
// some timeout
void timeout();
virtual void timeout();
// if this is called the destructor should
// not invoke any new messages, and should

View File

@ -431,6 +431,8 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata);
#ifndef TORRENT_NO_DEPRECATE
// deprecated in 0.15
// use save_state and load_state instead

View File

@ -1831,5 +1831,20 @@ namespace libtorrent {
}
}
dht_direct_response_alert::dht_direct_response_alert(
aux::stack_allocator&, void* userdata
, udp::endpoint const& addr, entry const& response)
: userdata(userdata), addr(addr), response(response)
{}
std::string dht_direct_response_alert::message() const
{
char msg[1050];
snprintf(msg, sizeof(msg), "DHT direct response (address=%s) [ %s ]"
, addr.address().to_string().c_str()
, response.to_string().c_str());
return msg;
}
} // namespace libtorrent

View File

@ -311,6 +311,12 @@ namespace libtorrent { namespace dht
, _1, _2, cb));
}
void dht_tracker::direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
m_dht.direct_request(ep, e, f);
}
// translate bittorrent kademlia message into the generice kademlia message
// used by the library
bool dht_tracker::incoming_packet(error_code const& ec

View File

@ -58,6 +58,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/routing_table.hpp"
#include "libtorrent/kademlia/node.hpp"
#include "libtorrent/kademlia/dht_observer.hpp"
#include "libtorrent/kademlia/direct_request.hpp"
#include "libtorrent/kademlia/refresh.hpp"
#include "libtorrent/kademlia/get_peers.hpp"
@ -411,6 +412,22 @@ void node::announce(sha1_hash const& info_hash, int listen_port, int flags
, listen_port, info_hash, flags), flags & node::flag_seed);
}
void node::direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
// not really a traversal
boost::intrusive_ptr<direct_trasversal> algo(
new direct_trasversal(*this, (node_id::min)(), f));
void* ptr = m_rpc.allocate_observer();
if (ptr == 0) return;
observer_ptr o(new (ptr) direct_observer(algo, ep, (node_id::min)()));
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
m_rpc.invoke(e, ep, o);
}
void node::get_item(sha1_hash const& target
, boost::function<bool(item&, bool)> f)
{
@ -868,9 +885,6 @@ void node::incoming_request(msg const& m, entry& e)
e["ip"] = endpoint_to_bytes(m.addr);
char const* query = top_level[0].string_ptr();
int query_len = top_level[0].string_length();
bdecode_node arg_ent = top_level[2];
bool read_only = top_level[1] && top_level[1].int_value() != 0;
node_id id(top_level[3].string_ptr());
@ -893,6 +907,12 @@ void node::incoming_request(msg const& m, entry& e)
// mirror back the other node's external port
reply["p"] = m.addr.port();
char const* query = top_level[0].string_ptr();
int query_len = top_level[0].string_length();
if (m_observer && m_observer->on_dht_request(query, query_len, m, e))
return;
if (query_len == 4 && memcmp(query, "ping", 4) == 0)
{
m_counters.inc_stats_counter(counters::dht_ping_in);

View File

@ -396,6 +396,13 @@ namespace libtorrent
#endif
}
void session_handle::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata)
{
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL3(dht_direct_request, ep, e, userdata);
#endif
}
#ifndef TORRENT_NO_DEPRECATE
entry session_handle::dht_state() const
{

View File

@ -894,7 +894,22 @@ namespace aux {
m_ses_extensions.push_back(ext);
m_alerts.add_extension(ext);
ext->added(session_handle(this));
dht_extensions_t dht_ext;
ext->register_dht_extensions(dht_ext);
for (dht_extensions_t::iterator e = dht_ext.begin();
e != dht_ext.end(); ++e)
{
TORRENT_ASSERT(e->first.size() <= max_dht_query_length);
if (e->first.size() > max_dht_query_length) continue;
extention_dht_query registration;
registration.query_len = e->first.size();
std::copy(e->first.begin(), e->first.end(), registration.query.begin());
registration.handler = e->second;
m_extension_dht_queries.push_back(registration);
}
}
#endif // TORRENT_DISABLE_EXTENSIONS
#ifndef TORRENT_NO_DEPRECATE
@ -5532,6 +5547,13 @@ retry:
alerts.emplace_alert<dht_get_peers_reply_alert>(info_hash, peers);
}
void on_direct_response(alert_manager& alerts, void* userdata, dht::msg const& msg)
{
entry e;
e = msg.message;
alerts.emplace_alert<dht_direct_response_alert>(userdata, msg.addr, e);
}
} // anonymous namespace
void session_impl::dht_put_immutable_item(entry data, sha1_hash target)
@ -5563,6 +5585,12 @@ retry:
m_dht->announce(info_hash, port, flags, boost::bind(&on_dht_get_peers, boost::ref(m_alerts), info_hash, _1));
}
void session_impl::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata)
{
if (!m_dht) return;
m_dht->direct_request(ep, e, boost::bind(&on_direct_response, boost::ref(m_alerts), userdata, _1));
}
#endif
void session_impl::maybe_update_udp_mapping(int nat, int local_port, int external_port)
@ -6496,6 +6524,24 @@ retry:
m_alerts.emplace_alert<dht_pkt_alert>(pkt, len, d, node);
}
bool session_impl::on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response)
{
#ifndef TORRENT_DISABLE_EXTENSIONS
if (query_len > max_dht_query_length) return false;
for (m_extension_dht_queries_t::iterator i = m_extension_dht_queries.begin();
i != m_extension_dht_queries.end(); ++i)
{
if (query_len == i->query_len
&& memcmp(i->query.data(), query, query_len) == 0
&& i->handler(request.addr, request.message, response))
return true;
}
#endif
return false;
}
void session_impl::set_external_address(address const& ip
, int source_type, address const& source)
{

View File

@ -438,6 +438,8 @@ struct obs : dht::dht_observer
virtual void log(dht_logger::module_t l, char const* fmt, ...) TORRENT_OVERRIDE {}
virtual void log_packet(message_direction_t dir, char const* pkt, int len
, udp::endpoint node) TORRENT_OVERRIDE {}
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response) TORRENT_OVERRIDE { return false; }
};
// TODO: test obfuscated_get_peers