Merge pull request #69 from ssiloti/direct-dht-req

Direct dht reqests
This commit is contained in:
Arvid Norberg 2015-08-11 23:32:25 -04:00
commit 36cf20b19f
17 changed files with 313 additions and 5 deletions

View File

@ -178,6 +178,7 @@ nobase_include_HEADERS = \
\
kademlia/dht_tracker.hpp \
kademlia/dht_observer.hpp \
kademlia/direct_request.hpp \
kademlia/dos_blocker.hpp \
kademlia/find_data.hpp \
kademlia/msg.hpp \

View File

@ -2382,12 +2382,39 @@ namespace libtorrent
int m_peers_idx;
};
// This is posted exactly once for every call to session_handle::dht_direct_request.
// If the request failed, response() will return a default constructed bdecode_node.
struct TORRENT_EXPORT dht_direct_response_alert: alert
{
dht_direct_response_alert(aux::stack_allocator& alloc, void* userdata
, udp::endpoint const& addr, bdecode_node const& response);
// for when there was a timeout so we don't have a response
dht_direct_response_alert(aux::stack_allocator& alloc, void* userdata
, udp::endpoint const& addr);
TORRENT_DEFINE_ALERT(dht_direct_response_alert, 88)
static const int static_category = alert::dht_notification;
virtual std::string message() const;
void* userdata;
udp::endpoint addr;
bdecode_node response() const;
private:
aux::stack_allocator& m_alloc;
int m_response_idx;
int m_response_size;
};
#undef TORRENT_DEFINE_ALERT_IMPL
#undef TORRENT_DEFINE_ALERT
#undef TORRENT_DEFINE_ALERT_PRIO
#undef TORRENT_CLONE
enum { num_alert_types = 88 };
enum { num_alert_types = 89 };
}

View File

@ -120,6 +120,9 @@ namespace libtorrent
struct bencode_map_entry;
typedef boost::function<bool(udp::endpoint const& source
, bdecode_node const& request, entry& response)> dht_extension_handler_t;
struct listen_socket_t
{
listen_socket_t(): external_port(0), ssl(false) {}
@ -167,6 +170,8 @@ namespace libtorrent
{
// the size of each allocation that is chained in the send buffer
enum { send_buffer_size_impl = 128 };
// maximum length of query names which can be registered by extensions
enum { max_dht_query_length = 15 };
#ifdef TORRENT_DEBUG
// friend class ::libtorrent::peer_connection;
@ -317,6 +322,8 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata);
#ifndef TORRENT_NO_DEPRECATE
entry dht_state() const;
#endif
@ -573,6 +580,9 @@ namespace libtorrent
virtual void log_packet(message_direction_t dir, char const* pkt, int len
, udp::endpoint node) TORRENT_OVERRIDE;
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response);
void set_external_address(address const& ip
, int source_type, address const& source);
virtual external_ip const& external_address() const TORRENT_OVERRIDE;
@ -1135,6 +1145,17 @@ namespace libtorrent
// this is a list to allow extensions to potentially remove themselves.
typedef std::list<boost::shared_ptr<plugin> > ses_extension_list_t;
ses_extension_list_t m_ses_extensions;
// std::string could be used for the query names if only all common implementations used SSO
// *glares at gcc*
struct extention_dht_query
{
uint8_t query_len;
boost::array<char, max_dht_query_length> query;
dht_extension_handler_t handler;
};
typedef std::vector<extention_dht_query> m_extension_dht_queries_t;
m_extension_dht_queries_t m_extension_dht_queries;
#endif
// if this function is set, it indicates that torrents are allowed

View File

@ -192,6 +192,14 @@ namespace libtorrent
struct peer_connection_handle;
struct torrent_handle;
// Functions of this type are called to handle incoming DHT requests
typedef boost::function<bool(udp::endpoint const& source
, bdecode_node const& request, entry& response)> dht_extension_handler_t;
// Map of query strings to handlers. Note that query strings are limited to 15 bytes.
// see max_dht_query_length
typedef std::vector<std::pair<std::string, dht_extension_handler_t> > dht_extensions_t;
// this is the base class for a session plugin. One primary feature
// is that it is notified of all torrents that are added to the session,
// and can add its own torrent_plugins.
@ -214,6 +222,10 @@ namespace libtorrent
// called when plugin is added to a session
virtual void added(session_handle) {}
// called after a plugin is added
// allows the plugin to register DHT requests it would like to handle
virtual void register_dht_extensions(dht_extensions_t&) {}
// called when an alert is posted alerts that are filtered are not posted
virtual void on_alert(alert const*) {}

View File

@ -73,6 +73,8 @@ namespace libtorrent { namespace dht
virtual void outgoing_get_peers(sha1_hash const& target
, sha1_hash const& sent_target, udp::endpoint const& ep) = 0;
virtual void announce(sha1_hash const& ih, address const& addr, int port) = 0;
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response) = 0;
protected:
~dht_observer() {}

View File

@ -106,6 +106,10 @@ namespace libtorrent { namespace dht
void put_item(char const* key
, boost::function<void(item&)> cb, std::string salt = std::string());
// send an arbitrary DHT request directly to a node
void direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f);
#ifndef TORRENT_NO_DEPRECATE
void dht_status(session_status& s);
#endif

View File

@ -0,0 +1,101 @@
/*
Copyright (c) 2014, Steven Siloti
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef LIBTORRENT_DIRECT_REQUEST_HPP
#define LIBTORRENT_DIRECT_REQUEST_HPP
#include <boost/function/function1.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/kademlia/traversal_algorithm.hpp>
namespace libtorrent { namespace dht
{
struct direct_traversal : traversal_algorithm
{
direct_traversal(node& node
, node_id target
, boost::function<void(msg const&)> cb)
: traversal_algorithm(node, target)
, m_cb(cb)
{}
virtual char const* name() const { return "direct_traversal"; }
void invoke_cb(msg const& m)
{
if (!m_cb.empty())
{
m_cb(m);
m_cb.clear();
done();
}
}
protected:
boost::function<void(msg const&)> m_cb;
};
struct direct_observer : observer
{
direct_observer(boost::intrusive_ptr<traversal_algorithm> const& algo
, udp::endpoint const& ep, node_id const& id)
: observer(algo, ep, id)
{}
virtual void reply(msg const& m)
{
flags |= flag_done;
static_cast<direct_traversal*>(algorithm())->invoke_cb(m);
}
virtual void timeout()
{
if (flags & flag_done) return;
flags |= flag_done;
bdecode_node e;
if (flags & flag_ipv6_address)
{
msg m(e, target_ep());
static_cast<direct_traversal*>(algorithm())->invoke_cb(m);
}
else
{
msg m(e, target_ep());
static_cast<direct_traversal*>(algorithm())->invoke_cb(m);
}
}
};
} } // namespace libtorrent::dht
#endif

View File

@ -245,6 +245,9 @@ public:
void announce(sha1_hash const& info_hash, int listen_port, int flags
, boost::function<void(std::vector<tcp::endpoint> const&)> f);
void direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f);
void get_item(sha1_hash const& target, boost::function<bool(item&, bool)> f);
void get_item(char const* pk, std::string const& salt, boost::function<bool(item&, bool)> f);

View File

@ -96,7 +96,7 @@ struct observer : boost::noncopyable
// this is called when no reply has been received within
// some timeout
void timeout();
virtual void timeout();
// if this is called the destructor should
// not invoke any new messages, and should

View File

@ -108,6 +108,13 @@ struct TORRENT_EXPORT peer_connection_handle
time_t last_seen_complete() const;
time_point time_of_last_unchoke() const;
bool operator==(peer_connection_handle const& o) const
{ return m_connection.lock() == o.m_connection.lock(); }
bool operator!=(peer_connection_handle const& o) const
{ return m_connection.lock() != o.m_connection.lock(); }
bool operator<(peer_connection_handle const& o) const
{ return m_connection.lock() < o.m_connection.lock(); }
boost::shared_ptr<peer_connection> native_handle() const
{
return m_connection.lock();

View File

@ -431,6 +431,14 @@ namespace libtorrent
void dht_get_peers(sha1_hash const& info_hash);
void dht_announce(sha1_hash const& info_hash, int port = 0, int flags = 0);
// Send an arbitrary DHT request directly to the specified endpoint. This
// function is intended for use by plugins. Whan a response is received
// or the request times out, a dht_direct_response_alert will be posted
// with the response (if any) and the userdata pointer passed in here.
// Since this alert is a reponse to an explicit call, it will always be
// posted, regardless of the alert mask.
void dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata);
#ifndef TORRENT_NO_DEPRECATE
// deprecated in 0.15
// use save_state and load_state instead

View File

@ -1831,5 +1831,42 @@ namespace libtorrent {
}
}
dht_direct_response_alert::dht_direct_response_alert(
aux::stack_allocator& alloc, void* userdata
, udp::endpoint const& addr, bdecode_node const& response)
: userdata(userdata), addr(addr), m_alloc(alloc)
, m_response_idx(alloc.copy_buffer(response.data_section().first, response.data_section().second))
, m_response_size(response.data_section().second)
{}
dht_direct_response_alert::dht_direct_response_alert(
aux::stack_allocator& alloc
, void* userdata
, udp::endpoint const& addr)
: userdata(userdata), addr(addr), m_alloc(alloc)
, m_response_idx(-1), m_response_size(0)
{}
std::string dht_direct_response_alert::message() const
{
char msg[1050];
snprintf(msg, sizeof(msg), "DHT direct response (address=%s) [ %s ]"
, addr.address().to_string().c_str()
, m_response_size ? std::string(m_alloc.ptr(m_response_idx), m_response_size).c_str() : "");
return msg;
}
bdecode_node dht_direct_response_alert::response() const
{
if (m_response_size == 0) return bdecode_node();
char const* start = m_alloc.ptr(m_response_idx);
char const* end = start + m_response_size;
error_code ec;
bdecode_node ret;
bdecode(start, end, ret, ec);
TORRENT_ASSERT(!ec);
return ret;
}
} // namespace libtorrent

View File

@ -311,6 +311,12 @@ namespace libtorrent { namespace dht
, _1, _2, cb));
}
void dht_tracker::direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
m_dht.direct_request(ep, e, f);
}
// translate bittorrent kademlia message into the generice kademlia message
// used by the library
bool dht_tracker::incoming_packet(error_code const& ec

View File

@ -58,6 +58,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/routing_table.hpp"
#include "libtorrent/kademlia/node.hpp"
#include "libtorrent/kademlia/dht_observer.hpp"
#include "libtorrent/kademlia/direct_request.hpp"
#include "libtorrent/kademlia/refresh.hpp"
#include "libtorrent/kademlia/get_peers.hpp"
@ -411,6 +412,22 @@ void node::announce(sha1_hash const& info_hash, int listen_port, int flags
, listen_port, info_hash, flags), flags & node::flag_seed);
}
void node::direct_request(boost::asio::ip::udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f)
{
// not really a traversal
boost::intrusive_ptr<direct_traversal> algo(
new direct_traversal(*this, (node_id::min)(), f));
void* ptr = m_rpc.allocate_observer();
if (ptr == 0) return;
observer_ptr o(new (ptr) direct_observer(algo, ep, (node_id::min)()));
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
m_rpc.invoke(e, ep, o);
}
void node::get_item(sha1_hash const& target
, boost::function<bool(item&, bool)> f)
{
@ -868,9 +885,6 @@ void node::incoming_request(msg const& m, entry& e)
e["ip"] = endpoint_to_bytes(m.addr);
char const* query = top_level[0].string_ptr();
int query_len = top_level[0].string_length();
bdecode_node arg_ent = top_level[2];
bool read_only = top_level[1] && top_level[1].int_value() != 0;
node_id id(top_level[3].string_ptr());
@ -893,6 +907,12 @@ void node::incoming_request(msg const& m, entry& e)
// mirror back the other node's external port
reply["p"] = m.addr.port();
char const* query = top_level[0].string_ptr();
int query_len = top_level[0].string_length();
if (m_observer && m_observer->on_dht_request(query, query_len, m, e))
return;
if (query_len == 4 && memcmp(query, "ping", 4) == 0)
{
m_counters.inc_stats_counter(counters::dht_ping_in);

View File

@ -396,6 +396,13 @@ namespace libtorrent
#endif
}
void session_handle::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry const& e, void* userdata)
{
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL3(dht_direct_request, ep, e, userdata);
#endif
}
#ifndef TORRENT_NO_DEPRECATE
entry session_handle::dht_state() const
{

View File

@ -894,7 +894,25 @@ namespace aux {
m_ses_extensions.push_back(ext);
m_alerts.add_extension(ext);
ext->added(session_handle(this));
// get any DHT queries the plugin would like to handle
// and record them in m_extension_dht_queries for lookup
// later
dht_extensions_t dht_ext;
ext->register_dht_extensions(dht_ext);
for (dht_extensions_t::iterator e = dht_ext.begin();
e != dht_ext.end(); ++e)
{
TORRENT_ASSERT(e->first.size() <= max_dht_query_length);
if (e->first.size() > max_dht_query_length) continue;
extention_dht_query registration;
registration.query_len = e->first.size();
std::copy(e->first.begin(), e->first.end(), registration.query.begin());
registration.handler = e->second;
m_extension_dht_queries.push_back(registration);
}
}
#endif // TORRENT_DISABLE_EXTENSIONS
#ifndef TORRENT_NO_DEPRECATE
@ -5532,6 +5550,14 @@ retry:
alerts.emplace_alert<dht_get_peers_reply_alert>(info_hash, peers);
}
void on_direct_response(alert_manager& alerts, void* userdata, dht::msg const& msg)
{
if (msg.message.type() == bdecode_node::none_t)
alerts.emplace_alert<dht_direct_response_alert>(userdata, msg.addr);
else
alerts.emplace_alert<dht_direct_response_alert>(userdata, msg.addr, msg.message);
}
} // anonymous namespace
void session_impl::dht_put_immutable_item(entry data, sha1_hash target)
@ -5563,6 +5589,12 @@ retry:
m_dht->announce(info_hash, port, flags, boost::bind(&on_dht_get_peers, boost::ref(m_alerts), info_hash, _1));
}
void session_impl::dht_direct_request(boost::asio::ip::udp::endpoint ep, entry& e, void* userdata)
{
if (!m_dht) return;
m_dht->direct_request(ep, e, boost::bind(&on_direct_response, boost::ref(m_alerts), userdata, _1));
}
#endif
void session_impl::maybe_update_udp_mapping(int nat, int local_port, int external_port)
@ -6496,6 +6528,24 @@ retry:
m_alerts.emplace_alert<dht_pkt_alert>(pkt, len, d, node);
}
bool session_impl::on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response)
{
#ifndef TORRENT_DISABLE_EXTENSIONS
if (query_len > max_dht_query_length) return false;
for (m_extension_dht_queries_t::iterator i = m_extension_dht_queries.begin();
i != m_extension_dht_queries.end(); ++i)
{
if (query_len == i->query_len
&& memcmp(i->query.data(), query, query_len) == 0
&& i->handler(request.addr, request.message, response))
return true;
}
#endif
return false;
}
void session_impl::set_external_address(address const& ip
, int source_type, address const& source)
{

View File

@ -438,6 +438,8 @@ struct obs : dht::dht_observer
virtual void log(dht_logger::module_t l, char const* fmt, ...) TORRENT_OVERRIDE {}
virtual void log_packet(message_direction_t dir, char const* pkt, int len
, udp::endpoint node) TORRENT_OVERRIDE {}
virtual bool on_dht_request(char const* query, int query_len
, dht::msg const& request, entry& response) TORRENT_OVERRIDE { return false; }
};
// TODO: test obfuscated_get_peers