simplified and optimized the DHT implementation

This commit is contained in:
Arvid Norberg 2009-09-20 00:23:36 +00:00
parent 8b15d6b850
commit 54cce9da60
25 changed files with 851 additions and 1420 deletions

View File

@ -66,7 +66,6 @@ set(sources
# -- kademlia --
set(kademlia_sources
closest_nodes
dht_tracker
node
refresh

View File

@ -403,7 +403,6 @@ SOURCES =
;
KADEMLIA_SOURCES =
closest_nodes
dht_tracker
node
refresh

View File

@ -99,7 +99,6 @@ nobase_include_HEADERS = \
extensions/ut_metadata.hpp \
extensions/ut_pex.hpp \
\
kademlia/closest_nodes.hpp \
kademlia/dht_tracker.hpp \
kademlia/find_data.hpp \
kademlia/logging.hpp \

View File

@ -1,104 +0,0 @@
/*
Copyright (c) 2006, Arvid Norberg & Daniel Wallin
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef CLOSEST_NODES_050323_HPP
#define CLOSEST_NODES_050323_HPP
#include <vector>
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/function.hpp>
namespace libtorrent { namespace dht
{
class rpc_manager;
// -------- closest nodes -----------
class closest_nodes : public traversal_algorithm
{
public:
typedef boost::function<
void(std::vector<node_entry> const&)
> done_callback;
closest_nodes(
node_impl& node
, node_id target
, done_callback const& callback
);
virtual char const* name() const { return "closest nodes"; }
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
done_callback m_done_callback;
};
class closest_nodes_observer : public observer
{
public:
closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_self(self)
{}
~closest_nodes_observer();
void send(msg& p)
{
p.info_hash = m_algorithm->target();
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_self;
};
} } // namespace libtorrent::dht
#endif // CLOSEST_NODES_050323_HPP

View File

@ -77,6 +77,7 @@ namespace libtorrent { namespace dht
{
friend void intrusive_ptr_add_ref(dht_tracker const*);
friend void intrusive_ptr_release(dht_tracker const*);
friend void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags);
dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
, dht_settings const& settings, entry const* state = 0);
@ -113,8 +114,8 @@ namespace libtorrent { namespace dht
void refresh_timeout(error_code const& e);
void tick(error_code const& e);
void on_bootstrap();
void send_packet(msg const& m);
void on_bootstrap(std::vector<std::pair<node_entry, std::string> > const&);
void send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags);
void incoming_error(char const* msg, lazy_entry const& e, udp::endpoint const& ep);
@ -170,13 +171,6 @@ namespace libtorrent { namespace dht
int m_failed_announces;
int m_total_message_input;
int m_az_message_input;
int m_ut_message_input;
int m_lt_message_input;
int m_mp_message_input;
int m_gr_message_input;
int m_mo_message_input;
int m_total_in_bytes;
int m_total_out_bytes;

View File

@ -56,13 +56,14 @@ class node_impl;
// -------- find data -----------
//TODO: rename this to find_peers
class find_data : public traversal_algorithm
{
public:
typedef boost::function<void(std::vector<tcp::endpoint> const&)> data_callback;
typedef boost::function<void(std::vector<std::pair<node_entry, std::string> > const&)> nodes_callback;
void got_data(msg const* m);
void got_peers(std::vector<tcp::endpoint> const& peers);
void got_write_token(node_id const& n, std::string const& write_token)
{ m_write_tokens[n] = write_token; }
@ -71,12 +72,16 @@ public:
, nodes_callback const& ncallback);
virtual char const* name() const { return "get_peers"; }
node_id const target() const { return m_target; }
protected:
void done();
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
virtual void invoke(node_id const& id, udp::endpoint addr);
data_callback m_data_callback;
nodes_callback m_nodes_callback;
@ -96,20 +101,21 @@ public:
, m_self(self)
{}
~find_data_observer();
void send(msg& m)
{
m.reply = false;
m.message_id = messages::get_peers;
m.info_hash = m_algorithm->target();
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
// with verbose logging, we log the size and
// offset of this structs members, so we need
// access to all of them
#ifndef TORRENT_DHT_VERBOSE_LOGGING
private:
#endif
boost::intrusive_ptr<find_data> m_algorithm;
// the node this observer sent a message to
// this is used to mark the result in the right
// node when we get a response
// TODO: replace this with the observer this-pointer
node_id const m_self;
};

View File

@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <string>
#include <libtorrent/kademlia/node_id.hpp>
#include "libtorrent/lazy_entry.hpp"
#if BOOST_VERSION < 103500
#include <asio/ip/udp.hpp>
#else
@ -45,7 +46,7 @@ namespace libtorrent {
namespace dht {
typedef std::vector<char> packet_t;
/*
namespace messages
{
enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 };
@ -74,10 +75,8 @@ struct msg
// the message.
udp::endpoint addr;
// if this is a nodes response, these are the nodes
typedef std::vector<node_entry> nodes_t;
nodes_t nodes;
typedef std::vector<tcp::endpoint> peers_t;
peers_t peers;
// similar to transaction_id but for write operations.
@ -94,7 +93,20 @@ struct msg
int error_code;
std::string error_msg;
};
*/
typedef std::vector<node_entry> nodes_t;
typedef std::vector<tcp::endpoint> peers_t;
struct msg
{
// the message
lazy_entry const& message;
// the address of the process sending or receiving
// the message.
udp::endpoint addr;
};
} }

View File

@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/io.hpp>
#include <libtorrent/session_settings.hpp>
@ -108,13 +109,6 @@ public:
, m_token(write_token)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
m.write_token = m_token;
}
void timeout() {}
void reply(msg const&) {}
void abort() {}
@ -129,16 +123,16 @@ class node_impl : boost::noncopyable
{
typedef std::map<node_id, torrent_entry> table_t;
public:
node_impl(libtorrent::aux::session_impl& ses, boost::function<void(msg const&)> const& f
, dht_settings const& settings, boost::optional<node_id> nid);
node_impl(libtorrent::aux::session_impl& ses
, void (*f)(void*, entry const&, udp::endpoint const&, int)
, dht_settings const& settings, boost::optional<node_id> nid
, void* userdata);
virtual ~node_impl() {}
void refresh(node_id const& id, boost::function0<void> f);
void refresh(node_id const& id, find_data::nodes_callback const& f);
void bootstrap(std::vector<udp::endpoint> const& nodes
, boost::function0<void> f);
void find_node(node_id const& id, boost::function<
void(std::vector<node_entry> const&)> f);
, find_data::nodes_callback const& f);
void add_router_node(udp::endpoint router);
void unreachable(udp::endpoint const& ep);
@ -173,8 +167,10 @@ public:
void announce(sha1_hash const& info_hash, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const&)> f);
bool verify_token(msg const& m);
std::string generate_token(msg const& m);
bool verify_token(std::string const& token, char const* info_hash
, udp::endpoint const& addr);
std::string generate_token(udp::endpoint const& addr, char const* info_hash);
// the returned time is the delay until connection_timeout()
// should be called again the next time
@ -212,7 +208,7 @@ protected:
// is called when a find data request is received. Should
// return false if the data is not stored on this node. If
// the data is stored, it should be serialized into 'data'.
bool on_find(msg const& m, std::vector<tcp::endpoint>& peers) const;
bool on_find(sha1_hash const& info_hash, std::vector<tcp::endpoint>& peers) const;
// this is called when a store request is received. The data
// is store-parameters and the data to be stored.
@ -233,7 +229,7 @@ private:
// since it might have references to it
std::set<traversal_algorithm*> m_running_requests;
void incoming_request(msg const& h);
void incoming_request(msg const& h, entry& e);
node_id m_id;
@ -250,6 +246,8 @@ private:
int m_secret[2];
libtorrent::aux::session_impl& m_ses;
void (*m_send)(void*, entry const&, udp::endpoint const&, int);
void* m_userdata;
};

View File

@ -79,6 +79,7 @@ struct node_entry
udp::endpoint ep() const { return udp::endpoint(addr, port); }
bool confirmed() const { return timeout_count == 0; }
// TODO: replace with a union of address_v4 and address_v6
address addr;
boost::uint16_t port;
// the number of times this node has failed to

View File

@ -48,15 +48,26 @@ struct msg;
TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
TORRENT_EXPORT void intrusive_ptr_release(observer const*);
// intended struct layout (on 32 bit architectures)
// offset size alignment field
// 0 8 8 sent
// 8 8 4 m_refs
// 16 4 4 pool_allocator
// 20 16 4 m_addr
// 36 2 2 m_port
// 38 1 1 m_is_v6, m_in_constructor
// 39 1 1 <padding>
// 40
struct observer : boost::noncopyable
{
friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*);
friend TORRENT_EXPORT void intrusive_ptr_release(observer const*);
observer(boost::pool<>& p)
: sent(time_now())
, pool_allocator(p)
: m_sent()
, m_refs(0)
, pool_allocator(p)
{
#ifdef TORRENT_DEBUG
m_in_constructor = true;
@ -68,10 +79,6 @@ struct observer : boost::noncopyable
TORRENT_ASSERT(!m_in_constructor);
}
// these two callbacks lets the observer add
// information to the message before it's sent
virtual void send(msg& m) = 0;
// this is called when a reply is received
virtual void reply(msg const& m) = 0;
@ -85,21 +92,38 @@ struct observer : boost::noncopyable
// is being destructed
virtual void abort() = 0;
#if TORRENT_USE_IPV6
address target_addr;
#else
address_v4 target_addr;
#endif
boost::uint16_t port;
udp::endpoint target_ep() const { return udp::endpoint(target_addr, port); }
ptime sent;
#ifdef TORRENT_DEBUG
bool m_in_constructor;
#endif
ptime sent() const { return m_sent; }
void set_target(udp::endpoint const& ep);
address target_addr() const;
udp::endpoint target_ep() const;
// with verbose logging, we log the size and
// offset of this structs members, so we need
// access to all of them
#ifndef TORRENT_DHT_VERBOSE_LOGGING
private:
boost::pool<>& pool_allocator;
#endif
ptime m_sent;
// reference counter for intrusive_ptr
mutable boost::detail::atomic_count m_refs;
boost::pool<>& pool_allocator;
union addr_t
{
#if TORRENT_USE_IPV6
address_v6::bytes_type v6;
#endif
address_v4::bytes_type v4;
} m_addr;
boost::uint16_t m_port;
bool m_is_v6:1;
#ifdef TORRENT_DEBUG
bool m_in_constructor:1;
#endif
};
typedef boost::intrusive_ptr<observer> observer_ptr;
@ -107,3 +131,4 @@ typedef boost::intrusive_ptr<observer> observer_ptr;
} }
#endif

View File

@ -33,119 +33,33 @@ POSSIBILITY OF SUCH DAMAGE.
#ifndef REFRESH_050324_HPP
#define REFRESH_050324_HPP
#include <vector>
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <boost/function.hpp>
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(refresh);
#endif
class routing_table;
class rpc_manager;
class refresh : public traversal_algorithm
class refresh : public find_data
{
public:
typedef std::vector<node_entry>::iterator InIt;
typedef boost::function<void()> done_callback;
typedef find_data::nodes_callback done_callback;
void ping_reply(node_id id);
void ping_timeout(node_id id, bool prevent_request = false);
refresh(node_impl& node, node_id target, InIt first, InIt last
refresh(node_impl& node, node_id target
, done_callback const& callback);
virtual char const* name() const { return "refresh"; }
virtual char const* name() const;
private:
void done();
void invoke(node_id const& id, udp::endpoint addr);
void invoke_pings_or_finish(bool prevent_request = false);
int m_max_active_pings;
int m_active_pings;
done_callback m_done_callback;
std::vector<result>::iterator m_leftover_nodes_iterator;
};
class refresh_observer : public observer
{
public:
refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_self(self)
{}
~refresh_observer();
void send(msg& m)
{
m.info_hash = m_algorithm->target();
}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<refresh> m_algorithm;
node_id const m_self;
};
class ping_observer : public observer
{
public:
ping_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_self(self)
, m_algorithm(algorithm)
{}
~ping_observer();
void send(msg& p) {}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
inline refresh::refresh(
node_impl& node
, node_id target
, refresh::InIt first
, refresh::InIt last
, done_callback const& callback)
: traversal_algorithm(node, target, first, last)
, m_max_active_pings(10)
, m_active_pings(0)
, m_done_callback(callback)
{
boost::intrusive_ptr<refresh> self(this);
add_requests();
}
} } // namespace libtorrent::dht
#endif // REFRESH_050324_HPP

View File

@ -65,7 +65,6 @@ struct null_observer : public observer
null_observer(boost::pool<>& allocator): observer(allocator) {}
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
void abort() {}
};
@ -74,11 +73,11 @@ class routing_table;
class rpc_manager
{
public:
typedef boost::function1<void, msg const&> fun;
typedef boost::function1<void, msg const&> send_fun;
typedef void (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int);
rpc_manager(fun const& incoming_fun, node_id const& our_id
, routing_table& table, send_fun const& sf);
rpc_manager(node_id const& our_id
, routing_table& table, send_fun const& sf
, void* userdata);
~rpc_manager();
void unreachable(udp::endpoint const& ep);
@ -87,10 +86,10 @@ public:
bool incoming(msg const&);
time_duration tick();
void invoke(int message_id, udp::endpoint target
void invoke(entry& e, udp::endpoint target
, observer_ptr o);
void reply(msg& m);
void add_our_id(entry& e);
#ifdef TORRENT_DEBUG
size_t allocation_size() const;
@ -124,8 +123,8 @@ private:
// waiting for to time out
int m_oldest_transaction_id;
fun m_incoming;
send_fun m_send;
void* m_userdata;
node_id m_our_id;
routing_table& m_table;
ptime m_timer;

View File

@ -67,21 +67,11 @@ public:
void status(dht_lookup& l);
virtual char const* name() const { return "traversal_algorithm"; }
virtual void start();
node_id const& target() const { return m_target; }
protected:
template<class InIt>
traversal_algorithm(node_impl& node, node_id target, InIt start, InIt end);
void add_requests();
void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags);
void add_router_entries();
void init();
virtual void done() = 0;
virtual void invoke(node_id const& id, udp::endpoint addr) = 0;
struct result
{
@ -89,11 +79,33 @@ protected:
: id(id), addr(addr), flags(f) {}
node_id id;
// TODO: replace with union of address_v4 and address_v6 and a port
udp::endpoint addr;
enum { queried = 1, initial = 2, no_id = 4 };
unsigned char flags;
};
protected:
traversal_algorithm::traversal_algorithm(
node_impl& node
, node_id target)
: m_ref_count(0)
, m_node(node)
, m_target(target)
, m_invoke_count(0)
, m_branch_factor(3)
, m_responses(0)
, m_timeouts(0)
{}
void add_requests();
void add_router_entries();
void init();
virtual void done() = 0;
virtual void invoke(node_id const& id, udp::endpoint addr) = 0;
std::vector<result>::iterator last_iterator();
friend void intrusive_ptr_add_ref(traversal_algorithm* p)
@ -119,33 +131,6 @@ protected:
int m_timeouts;
};
template<class InIt>
traversal_algorithm::traversal_algorithm(
node_impl& node
, node_id target
, InIt start // <- nodes to initiate traversal with
, InIt end)
: m_ref_count(0)
, m_node(node)
, m_target(target)
, m_invoke_count(0)
, m_branch_factor(3)
, m_responses(0)
, m_timeouts(0)
{
using boost::bind;
for (InIt i = start; i != end; ++i)
{
add_entry(i->id, udp::endpoint(i->addr, i->port), result::initial);
}
// in case the routing table is empty, use the
// router nodes in the table
if (start == end) add_router_entries();
init();
}
} } // namespace libtorrent::dht
#endif // TRAVERSAL_ALGORITHM_050324_HPP

View File

@ -230,7 +230,7 @@ namespace libtorrent
char const* m_end;
};
TORRENT_EXPORT std::string print_entry(lazy_entry const& e);
TORRENT_EXPORT std::string print_entry(lazy_entry const& e, bool single_line = false);
#if TORRENT_USE_IOSTREAM
TORRENT_EXPORT std::ostream& operator<<(std::ostream& os, lazy_entry const& e);
#endif

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
#include "libtorrent/io.hpp"
#include "libtorrent/error_code.hpp"
#include "libtorrent/lazy_entry.hpp"
#include <string>
namespace libtorrent
@ -109,6 +110,27 @@ namespace libtorrent
return Endpoint(addr, port);
}
#endif
template <class EndpointType>
void read_endpoint_list(libtorrent::lazy_entry const* n, std::vector<EndpointType>& epl)
{
using namespace libtorrent;
if (n->type() != lazy_entry::list_t) return;
for (int i = 0; i < n->list_size(); ++i)
{
lazy_entry const* e = n->list_at(i);
if (e->type() != lazy_entry::string_t) return;
if (e->string_length() < 6) continue;
char const* in = e->string_ptr();
if (e->string_length() == 6)
epl.push_back(read_v4_endpoint<EndpointType>(in));
#if TORRENT_USE_IPV6
else if (e->string_length() == 18)
epl.push_back(read_v6_endpoint<EndpointType>(in));
#endif
}
}
}

View File

@ -2,7 +2,6 @@ lib_LTLIBRARIES = libtorrent-rasterbar.la
if ENABLE_DHT
KADEMLIA_SOURCES = \
kademlia/closest_nodes.cpp \
kademlia/dht_tracker.cpp \
kademlia/find_data.cpp \
kademlia/node.cpp \

View File

@ -1,120 +0,0 @@
/*
Copyright (c) 2006, Arvid Norberg & Daniel Wallin
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node.hpp>
#include "libtorrent/assert.hpp"
namespace libtorrent { namespace dht
{
closest_nodes_observer::~closest_nodes_observer()
{
if (m_algorithm) m_algorithm->failed(m_self, true);
}
void closest_nodes_observer::reply(msg const& in)
{
if (!m_algorithm)
{
TORRENT_ASSERT(false);
return;
}
if (!in.nodes.empty())
{
for (msg::nodes_t::const_iterator i = in.nodes.begin()
, end(in.nodes.end()); i != end; ++i)
{
m_algorithm->traverse(i->id, i->ep());
}
}
m_algorithm->finished(m_self);
m_algorithm = 0;
}
void closest_nodes_observer::timeout()
{
if (!m_algorithm) return;
m_algorithm->failed(m_self);
m_algorithm = 0;
}
closest_nodes::closest_nodes(
node_impl& node
, node_id target
, done_callback const& callback)
: traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end())
, m_done_callback(callback)
{
boost::intrusive_ptr<closest_nodes> self(this);
add_requests();
}
void closest_nodes::invoke(node_id const& id, udp::endpoint addr)
{
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(closest_nodes_observer));
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
done();
return;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) closest_nodes_observer(this, id));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_node.m_rpc.invoke(messages::find_node, addr, o);
}
void closest_nodes::done()
{
std::vector<node_entry> results;
int num_results = m_node.m_table.bucket_size();
for (std::vector<result>::iterator i = m_results.begin()
, end(m_results.end()); i != end && num_results > 0; ++i)
{
if (i->flags & result::no_id) continue;
if ((i->flags & result::queried) == 0) continue;
results.push_back(node_entry(i->id, i->addr));
--num_results;
}
m_done_callback(results);
}
} } // namespace libtorrent::dht

View File

@ -62,7 +62,6 @@ using libtorrent::dht::node_impl;
using libtorrent::dht::node_id;
using libtorrent::dht::packet_t;
using libtorrent::dht::msg;
namespace messages = libtorrent::dht::messages;
using namespace libtorrent::detail;
enum
@ -85,26 +84,6 @@ namespace
}
};
template <class EndpointType>
void read_endpoint_list(libtorrent::lazy_entry const* n, std::vector<EndpointType>& epl)
{
using namespace libtorrent;
if (n->type() != lazy_entry::list_t) return;
for (int i = 0; i < n->list_size(); ++i)
{
lazy_entry const* e = n->list_at(i);
if (e->type() != lazy_entry::string_t) return;
if (e->string_length() < 6) continue;
char const* in = e->string_ptr();
if (e->string_length() == 6)
epl.push_back(read_v4_endpoint<EndpointType>(in));
#if TORRENT_USE_IPV6
else if (e->string_length() == 18)
epl.push_back(read_v6_endpoint<EndpointType>(in));
#endif
}
}
template <class EndpointType>
void read_endpoint_list(libtorrent::entry const* n, std::vector<EndpointType>& epl)
{
@ -132,6 +111,16 @@ namespace
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
int g_az_message_input = 0;
int g_ut_message_input = 0;
int g_lt_message_input = 0;
int g_mp_message_input = 0;
int g_gr_message_input = 0;
int g_mo_message_input = 0;
int g_unknown_message_input = 0;
#endif
void intrusive_ptr_add_ref(dht_tracker const* c)
{
TORRENT_ASSERT(c != 0);
@ -147,6 +136,55 @@ namespace libtorrent { namespace dht
delete c;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string parse_dht_client(lazy_entry const& e)
{
lazy_entry const* ver = e.dict_find_string("v");
if (!ver) return "generic";
std::string const& client = ver->string_value();
if (client.size() < 2)
{
++g_unknown_message_input;
return client;
}
else if (std::equal(client.begin(), client.begin() + 2, "Az"))
{
++g_az_message_input;
return "Azureus";
}
else if (std::equal(client.begin(), client.begin() + 2, "UT"))
{
++g_ut_message_input;
return "uTorrent";
}
else if (std::equal(client.begin(), client.begin() + 2, "LT"))
{
++g_lt_message_input;
return "libtorrent";
}
else if (std::equal(client.begin(), client.begin() + 2, "MP"))
{
++g_mp_message_input;
return "MooPolice";
}
else if (std::equal(client.begin(), client.begin() + 2, "GR"))
{
++g_gr_message_input;
return "GetRight";
}
else if (std::equal(client.begin(), client.begin() + 2, "MO"))
{
++g_mo_message_input;
return "Mono Torrent";
}
else
{
++g_unknown_message_input;
return client;
}
}
#endif
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DEFINE_LOG(dht_tracker)
#endif
@ -168,11 +206,17 @@ namespace libtorrent { namespace dht
return boost::optional<node_id>(node_id(nid->string().c_str()));
}
void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags)
{
dht_tracker* self = (dht_tracker*)userdata;
self->send_packet(e, addr, flags);
}
// class that puts the networking and the kademlia node in a single
// unit and connecting them together.
dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
, dht_settings const& settings, entry const* state)
: m_dht(ses, bind(&dht_tracker::send_packet, this, _1), settings, extract_node_id(state))
: m_dht(ses, &send_callback, settings, extract_node_id(state), this)
, m_ses(ses)
, m_sock(sock)
, m_last_new_key(time_now() - minutes(key_refresh))
@ -198,19 +242,13 @@ namespace libtorrent { namespace dht
m_announces = 0;
m_failed_announces = 0;
m_total_message_input = 0;
m_az_message_input = 0;
m_ut_message_input = 0;
m_lt_message_input = 0;
m_mp_message_input = 0;
m_gr_message_input = 0;
m_mo_message_input = 0;
m_total_in_bytes = 0;
m_total_out_bytes = 0;
m_queries_out_bytes = 0;
// turns on and off individual components' logging
rpc_log().enable(false);
// rpc_log().enable(false);
// node_log().enable(false);
traversal_log().enable(false);
// dht_tracker_log.enable(false);
@ -219,21 +257,6 @@ namespace libtorrent { namespace dht
#endif
}
void dht_tracker::incoming_error(char const* message, lazy_entry const& e, udp::endpoint const& ep)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "ERROR: '" << message << "' " << e;
#endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = message;
reply.addr = ep;
reply.transaction_id = "";
send_packet(reply);
}
void dht_tracker::start(entry const& bootstrap)
{
std::vector<udp::endpoint> initial_nodes;
@ -262,8 +285,7 @@ namespace libtorrent { namespace dht
m_refresh_timer.expires_from_now(seconds(5), ec);
m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, self(), _1));
m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self()));
m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self(), _1));
}
void dht_tracker::stop()
@ -387,12 +409,12 @@ namespace libtorrent { namespace dht
<< "\t" << m_announces / float(tick_period)
<< "\t" << m_failed_announces / float(tick_period)
<< "\t" << (m_total_message_input / float(tick_period))
<< "\t" << (m_az_message_input / float(tick_period))
<< "\t" << (m_ut_message_input / float(tick_period))
<< "\t" << (m_lt_message_input / float(tick_period))
<< "\t" << (m_mp_message_input / float(tick_period))
<< "\t" << (m_gr_message_input / float(tick_period))
<< "\t" << (m_mo_message_input / float(tick_period))
<< "\t" << (g_az_message_input / float(tick_period))
<< "\t" << (g_ut_message_input / float(tick_period))
<< "\t" << (g_lt_message_input / float(tick_period))
<< "\t" << (g_mp_message_input / float(tick_period))
<< "\t" << (g_gr_message_input / float(tick_period))
<< "\t" << (g_mo_message_input / float(tick_period))
<< "\t" << (m_total_in_bytes / float(tick_period*60))
<< "\t" << (m_total_out_bytes / float(tick_period*60))
<< "\t" << (m_queries_out_bytes / float(tick_period*60))
@ -405,9 +427,13 @@ namespace libtorrent { namespace dht
m_announces = 0;
m_failed_announces = 0;
m_total_message_input = 0;
m_az_message_input = 0;
m_ut_message_input = 0;
m_lt_message_input = 0;
g_az_message_input = 0;
g_ut_message_input = 0;
g_lt_message_input = 0;
g_mp_message_input = 0;
g_gr_message_input = 0;
g_mo_message_input = 0;
g_unknown_message_input = 0;
m_total_in_bytes = 0;
m_total_out_bytes = 0;
m_queries_out_bytes = 0;
@ -459,7 +485,7 @@ namespace libtorrent { namespace dht
#ifdef TORRENT_DHT_VERBOSE_LOGGING
if (match->count == 20)
{
TORRENT_LOG(dht_tracker) << time_now_string() << " BANNING PEER [ ip: "
TORRENT_LOG(dht_tracker) << " BANNING PEER [ ip: "
<< ep << " time: " << total_milliseconds((now - match->limit) + seconds(5)) / 1000.f
<< " count: " << match->count << " ]";
}
@ -493,385 +519,35 @@ namespace libtorrent { namespace dht
TORRENT_ASSERT(bytes_transferred > 0);
extern void incoming_error(entry& e, char const* msg);
lazy_entry e;
int ret = lazy_bdecode(buf, buf + bytes_transferred, e);
if (ret != 0)
{
incoming_error("invalid bencoding", e, ep);
TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: Invalid bencoding";
return;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::stringstream log_line;
log_line << "RECEIVED ["
" ip: " << ep;
#endif
libtorrent::dht::msg m = {e, ep};
if (e.type() != lazy_entry::dict_t)
{
incoming_error("message is not a dictionary", e, ep);
return;
}
libtorrent::dht::msg m;
m.message_id = 0;
m.addr = ep;
lazy_entry const* transaction = e.dict_find_string("t");
if (!transaction)
{
incoming_error("missing or invalid transaction id", e, ep);
return;
}
m.transaction_id = transaction->string_value();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
lazy_entry const* ver = e.dict_find_string("v");
if (!ver)
{
log_line << " c: generic";
}
else
{
std::string const& client = ver->string_value();
if (client.size() < 2)
{
log_line << " c: " << client;
}
else if (std::equal(client.begin(), client.begin() + 2, "Az"))
{
++m_az_message_input;
log_line << " c: Azureus";
}
else if (std::equal(client.begin(), client.begin() + 2, "UT"))
{
++m_ut_message_input;
log_line << " c: uTorrent";
}
else if (std::equal(client.begin(), client.begin() + 2, "LT"))
{
++m_lt_message_input;
log_line << " c: libtorrent";
}
else if (std::equal(client.begin(), client.begin() + 2, "MP"))
{
++m_mp_message_input;
log_line << " c: MooPolice";
}
else if (std::equal(client.begin(), client.begin() + 2, "GR"))
{
++m_gr_message_input;
log_line << " c: GetRight";
}
else if (std::equal(client.begin(), client.begin() + 2, "MO"))
{
++m_mo_message_input;
log_line << " c: Mono Torrent";
}
else
{
log_line << " c: " << client;
}
}
TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: not a dictionary: "
<< print_entry(e, true);
#endif
lazy_entry const* y = e.dict_find_string("y");
if (!y || y->string_length() < 1)
{
incoming_error("missing or invalid message type", e, ep);
return;
}
char msg_type = *y->string_ptr();
if (msg_type == 'r')
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " t: " << to_hex(m.transaction_id);
#endif
m.reply = true;
lazy_entry const* r = e.dict_find_dict("r");
if (!r)
{
incoming_error("missing or invalid reply dict", e, ep);
return;
}
lazy_entry const* id = r->dict_find_string("id");
if (!id)
{
incoming_error("missing or invalid id", e, ep);
return;
}
if (id->string_length() != 20)
{
incoming_error("invalid node id (not 20 bytes)", e, ep);
return;
}
std::copy(id->string_ptr(), id->string_ptr()
+ id->string_length(), m.id.begin());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " id: " << m.id;
#endif
lazy_entry const* n = r->dict_find_list("values");
if (n)
{
m.peers.clear();
if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t)
{
// assume it's mainline format
char const* peers = n->list_at(0)->string_ptr();
char const* end = peers + n->list_at(0)->string_length();
while (end - peers >= 6)
m.peers.push_back(read_v4_endpoint<tcp::endpoint>(peers));
}
else
{
// assume it's uTorrent/libtorrent format
read_endpoint_list<tcp::endpoint>(n, m.peers);
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " p: " << m.peers.size();
#endif
}
m.nodes.clear();
n = r->dict_find_string("nodes");
if (n)
{
char const* nodes = n->string_ptr();
char const* end = nodes + n->string_length();
while (end - nodes >= 26)
{
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
m.nodes.push_back(libtorrent::dht::node_entry(
id, read_v4_endpoint<udp::endpoint>(nodes)));
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " n: " << m.nodes.size();
#endif
}
n = r->dict_find_list("nodes2");
if (n)
{
for (int i = 0; i < n->list_size(); ++i)
{
lazy_entry const* p = n->list_at(0);
if (p->type() != lazy_entry::string_t) continue;
if (p->string_length() < 6 + 20) continue;
char const* in = p->string_ptr();
node_id id;
std::copy(in, in + 20, id.begin());
in += 20;
if (p->string_length() == 6 + 20)
m.nodes.push_back(libtorrent::dht::node_entry(
id, read_v4_endpoint<udp::endpoint>(in)));
#if TORRENT_USE_IPV6
else if (p->string_length() == 18 + 20)
m.nodes.push_back(libtorrent::dht::node_entry(
id, read_v6_endpoint<udp::endpoint>(in)));
#endif
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " n2: " << m.nodes.size();
#endif
}
lazy_entry const* token = r->dict_find_string("token");
if (token)
{
m.write_token = token->string_value();
TORRENT_ASSERT(m.write_token.size() == token->string_length());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(m.write_token);
#endif
}
}
else if (msg_type == 'q')
{
m.reply = false;
lazy_entry const* a = e.dict_find_dict("a");
if (!a)
{
incoming_error("missing or invalid argument dictionary", e, ep);
return;
}
lazy_entry const* id = a->dict_find_string("id");
if (!id)
{
incoming_error("missing or invalid node id", e, ep);
return;
}
if (id->string_length() != 20)
{
incoming_error("invalid node id (not 20 bytes)", e, ep);
return;
}
std::copy(id->string_ptr(), id->string_ptr()
+ id->string_length(), m.id.begin());
lazy_entry const* q = e.dict_find_string("q");
if (!q)
{
incoming_error("invalid or missing query string", e, ep);
return;
}
std::string request_kind = q->string_value();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " q: " << request_kind;
#endif
if (request_kind == "ping")
{
m.message_id = libtorrent::dht::messages::ping;
}
else if (request_kind == "find_node")
{
lazy_entry const* target = a->dict_find_string("target");
if (!target)
{
incoming_error("missing or invalid target", e, ep);
return;
}
if (target->string_length() != 20)
{
incoming_error("invalid target (not 20 bytes)", e, ep);
return;
}
std::copy(target->string_ptr(), target->string_ptr()
+ target->string_length(), m.info_hash.begin());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " t: " << boost::lexical_cast<std::string>(m.info_hash);
#endif
m.message_id = libtorrent::dht::messages::find_node;
}
else if (request_kind == "get_peers")
{
lazy_entry const* info_hash = a->dict_find_string("info_hash");
if (!info_hash)
{
incoming_error("missing or invalid info_hash", e, ep);
return;
}
if (info_hash->string_length() != 20)
{
incoming_error("invalid info_hash (not 20 bytes)", e, ep);
return;
}
std::copy(info_hash->string_ptr(), info_hash->string_ptr()
+ info_hash->string_length(), m.info_hash.begin());
m.message_id = libtorrent::dht::messages::get_peers;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash);
#endif
}
else if (request_kind == "announce_peer")
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
++m_announces;
#endif
lazy_entry const* info_hash = a->dict_find_string("info_hash");
if (!info_hash)
{
incoming_error("missing or invalid info_hash", e, ep);
return;
}
if (info_hash->string_length() != 20)
{
incoming_error("invalid info_hash (not 20 bytes)", e, ep);
return;
}
std::copy(info_hash->string_ptr(), info_hash->string_ptr()
+ info_hash->string_length(), m.info_hash.begin());
m.port = a->dict_find_int_value("port", -1);
if (m.port == -1)
{
incoming_error("missing or invalid port in announce_peer message", e, ep);
return;
}
lazy_entry const* token = a->dict_find_string("token");
if (!token)
{
incoming_error("missing or invalid token in announce peer", e, ep);
return;
}
m.write_token = token->string_value();
m.message_id = libtorrent::dht::messages::announce_peer;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(m.write_token);
log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash);
log_line << " p: " << m.port;
if (!m_dht.verify_token(m))
++m_failed_announces;
#endif
}
else
{
incoming_error("unknown query", e, ep);
return;
}
}
else if (msg_type == 'e')
{
m.message_id = messages::error;
m.error_code = 0;
lazy_entry const* list = e.dict_find_list("e");
if (!list)
{
list = e.dict_find_string("e");
if (!list)
{
incoming_error("missing or invalid 'e' in error message", e, ep);
return;
}
m.error_msg = list->string_value();
}
else
{
if (list->list_size() > 0 && list->list_at(0)->type() == lazy_entry::int_t)
m.error_code = list->list_at(0)->int_value();
if (list->list_size() > 1 && list->list_at(1)->type() == lazy_entry::string_t)
m.error_msg = list->list_at(1)->string_value();
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << log_line.str() << " ]";
TORRENT_LOG(dht_tracker) << "ERROR: incoming error: " << m.error_code
<< " " << m.error_msg;
#endif
return;
}
else
{
incoming_error("unknown message", e, ep);
entry r;
incoming_error(r, "message is not a dictionary");
send_packet(r, ep, 0);
return;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << log_line.str() << " ]";
// TORRENT_LOG(dht_tracker) << std::string(buf, buf + bytes_transferred);
if (!m.reply)
{
++m_queries_received[m.message_id];
m_queries_bytes_received[m.message_id] += int(bytes_transferred);
}
parse_dht_client(e);
TORRENT_LOG(dht_tracker) << "<== " << ep << " " << print_entry(e, true);
#endif
TORRENT_ASSERT(m.message_id != messages::error);
m_dht.incoming(m);
}
@ -945,217 +621,52 @@ namespace libtorrent { namespace dht
m_dht.add_router_node(host->endpoint());
}
void dht_tracker::on_bootstrap()
void dht_tracker::on_bootstrap(std::vector<std::pair<node_entry, std::string> > const&)
{}
namespace
{
void write_nodes_entry(entry& r, libtorrent::dht::msg const& m)
{
bool ipv6_nodes = false;
entry& n = r["nodes"];
std::back_insert_iterator<std::string> out(n.string());
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
if (!i->addr.is_v4())
{
ipv6_nodes = true;
continue;
}
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(udp::endpoint(i->addr, i->port), out);
}
if (ipv6_nodes)
{
entry& p = r["nodes2"];
std::string endpoint;
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
if (!i->addr.is_v6()) continue;
endpoint.resize(18 + 20);
std::string::iterator out = endpoint.begin();
std::copy(i->id.begin(), i->id.end(), out);
out += 20;
write_endpoint(udp::endpoint(i->addr, i->port), out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint));
}
}
}
}
void dht_tracker::send_packet(msg const& m)
void dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags)
{
using libtorrent::bencode;
using libtorrent::entry;
int send_flags = 0;
entry e(entry::dictionary_t);
TORRENT_ASSERT(!m.transaction_id.empty() || m.message_id == messages::error);
e["t"] = m.transaction_id;
static char const version_str[] = {'L', 'T'
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR};
e["v"] = std::string(version_str, version_str + 4);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::stringstream log_line;
log_line << "SENDING [ ip: " << m.addr
<< " t: " << to_hex(m.transaction_id);
#endif
if (m.message_id == messages::error)
{
TORRENT_ASSERT(m.reply);
e["y"] = "e";
entry error_list(entry::list_t);
TORRENT_ASSERT(m.error_code > 200 && m.error_code <= 204);
error_list.list().push_back(entry(m.error_code));
error_list.list().push_back(entry(m.error_msg));
e["e"] = error_list;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " err: " << m.error_code
<< " msg: " << m.error_msg;
#endif
}
else if (m.reply)
{
e["y"] = "r";
e["r"] = entry(entry::dictionary_t);
entry& r = e["r"];
r["id"] = std::string((char*)m.id.begin(), (char*)m.id.end());
if (!m.write_token.empty())
{
r["token"] = m.write_token;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(m.write_token);
#endif
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " r: " << messages::ids[m.message_id]
<< " id: " << m.id;
#endif
switch (m.message_id)
{
case messages::ping:
break;
case messages::find_node:
{
write_nodes_entry(r, m);
break;
}
case messages::get_peers:
{
write_nodes_entry(r, m);
if (!m.peers.empty())
{
r["values"] = entry(entry::list_t);
entry& p = r["values"];
std::string endpoint;
for (msg::peers_t::const_iterator i = m.peers.begin()
, end(m.peers.end()); i != end; ++i)
{
endpoint.resize(18);
std::string::iterator out = endpoint.begin();
write_endpoint(*i, out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint));
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " values: " << m.peers.size();
#endif
}
break;
}
case messages::announce_peer:
break;
break;
}
}
else
{
// set bit 1 of send_flags to indicate that
// this packet should not be dropped by the
// rate limiter.
e["y"] = "q";
e["a"] = entry(entry::dictionary_t);
entry& a = e["a"];
a["id"] = std::string((char*)m.id.begin(), (char*)m.id.end());
TORRENT_ASSERT(m.message_id <= messages::error);
e["q"] = messages::ids[m.message_id];
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " q: " << messages::ids[m.message_id]
<< " id: " << m.id;
#endif
switch (m.message_id)
{
case messages::find_node:
{
send_flags = 1;
a["target"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " target: " << boost::lexical_cast<std::string>(m.info_hash);
#endif
break;
}
case messages::get_peers:
{
send_flags = 1;
a["info_hash"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ih: " << boost::lexical_cast<std::string>(m.info_hash);
#endif
break;
}
case messages::announce_peer:
send_flags = 1;
a["port"] = m.port;
a["info_hash"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end());
a["token"] = m.write_token;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " port: " << m.port
<< " ih: " << boost::lexical_cast<std::string>(m.info_hash)
<< " token: " << to_hex(m.write_token);
#endif
break;
default: break;
}
}
m_send_buf.clear();
bencode(std::back_inserter(m_send_buf), e);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::stringstream log_line;
lazy_entry print;
int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print);
TORRENT_ASSERT(ret == 0);
log_line << print_entry(print, true);
#endif
error_code ec;
if (m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags))
if (m_sock.send(addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags))
{
// account for IP and UDP overhead
m_sent_bytes += m_send_buf.size() + (m.addr.address().is_v6() ? 48 : 28);
m_sent_bytes += m_send_buf.size() + (addr.address().is_v6() ? 48 : 28);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size();
if (m.reply)
if (e["y"].string() == "r")
{
++m_replies_sent[m.message_id];
m_replies_bytes_sent[m.message_id] += int(m_send_buf.size());
// TODO: fix this stats logging
// ++m_replies_sent[e["r"]];
// m_replies_bytes_sent[e["r"]] += int(m_send_buf.size());
}
else
else if (e["y"].string() == "q")
{
m_queries_out_bytes += m_send_buf.size();
}
TORRENT_LOG(dht_tracker) << "==> " << addr << " " << log_line.str();
#endif
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << log_line.str() << " ]";
// TORRENT_LOG(dht_tracker) << std::string(m_send_buf.begin(), m_send_buf.end());
else
{
TORRENT_LOG(dht_tracker) << "==> " << addr << " DROPPED " << log_line.str();
}
#endif
}

View File

@ -38,10 +38,20 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/io.hpp>
#include <libtorrent/socket.hpp>
#include <libtorrent/socket_io.hpp>
#include <vector>
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(dht_tracker);
#endif
using detail::read_v4_endpoint;
using detail::read_v6_endpoint;
using detail::read_endpoint_list;
find_data_observer::~find_data_observer()
{
if (m_algorithm) m_algorithm->failed(m_self);
@ -55,22 +65,116 @@ void find_data_observer::reply(msg const& m)
return;
}
if (!m.write_token.empty())
m_algorithm->got_write_token(m.id, m.write_token);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::stringstream log_line;
log_line << " incoming get_peer response [ ";
#endif
if (!m.peers.empty())
m_algorithm->got_data(&m);
if (!m.nodes.empty())
lazy_entry const* r = m.message.dict_find_dict("r");
if (!r)
{
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " missing response dict";
#endif
return;
}
lazy_entry const* id = r->dict_find_string("id");
if (!id || id->string_length() != 20)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " invalid id in response";
#endif
return;
}
lazy_entry const* token = r->dict_find_string("token");
if (token)
{
m_algorithm->got_write_token(node_id(id->string_ptr()), token->string_value());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " token: " << to_hex(token->string_value());
#endif
}
// look for peers
lazy_entry const* n = r->dict_find_list("values");
if (n)
{
std::vector<tcp::endpoint> peer_list;
if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t)
{
m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port));
// assume it's mainline format
char const* peers = n->list_at(0)->string_ptr();
char const* end = peers + n->list_at(0)->string_length();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " p: " << ((end - peers) / 6);
#endif
while (end - peers >= 6)
peer_list.push_back(read_v4_endpoint<tcp::endpoint>(peers));
}
else
{
// assume it's uTorrent/libtorrent format
read_endpoint_list<tcp::endpoint>(n, peer_list);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " p: " << n->list_size();
#endif
}
m_algorithm->got_peers(peer_list);
}
// look for nodes
n = r->dict_find_string("nodes");
if (n)
{
std::vector<node_entry> node_list;
char const* nodes = n->string_ptr();
char const* end = nodes + n->string_length();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " nodes: " << ((end - nodes) / 26);
#endif
while (end - nodes >= 26)
{
node_id id;
std::copy(nodes, nodes + 20, id.begin());
nodes += 20;
m_algorithm->traverse(id, read_v4_endpoint<udp::endpoint>(nodes));
}
}
n = r->dict_find_list("nodes2");
if (n)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " nodes2: " << n->list_size();
#endif
for (int i = 0; i < n->list_size(); ++i)
{
lazy_entry const* p = n->list_at(0);
if (p->type() != lazy_entry::string_t) continue;
if (p->string_length() < 6 + 20) continue;
char const* in = p->string_ptr();
node_id id;
std::copy(in, in + 20, id.begin());
in += 20;
if (p->string_length() == 6 + 20)
m_algorithm->traverse(id, read_v4_endpoint<udp::endpoint>(in));
#if TORRENT_USE_IPV6
else if (p->string_length() == 18 + 20)
m_algorithm->traverse(id, read_v6_endpoint<udp::endpoint>(in));
#endif
}
}
m_algorithm->finished(m_self);
m_algorithm = 0;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " ]";
TORRENT_LOG(dht_tracker) << log_line.str();
#endif
}
void find_data_observer::timeout()
@ -86,14 +190,17 @@ find_data::find_data(
, node_id target
, data_callback const& dcallback
, nodes_callback const& ncallback)
: traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end())
: traversal_algorithm(node, target)
, m_data_callback(dcallback)
, m_nodes_callback(ncallback)
, m_target(target)
, m_done(false)
{
boost::intrusive_ptr<find_data> self(this);
add_requests();
for (routing_table::const_iterator i = node.m_table.begin()
, end(node.m_table.end()); i != end; ++i)
{
add_entry(i->id, i->ep(), result::initial);
}
}
void find_data::invoke(node_id const& id, udp::endpoint addr)
@ -116,12 +223,17 @@ void find_data::invoke(node_id const& id, udp::endpoint addr)
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_node.m_rpc.invoke(messages::get_peers, addr, o);
entry e;
e["y"] = "q";
e["q"] = "get_peers";
entry& a = e["a"];
a["info_hash"] = id.to_string();
m_node.m_rpc.invoke(e, addr, o);
}
void find_data::got_data(msg const* m)
void find_data::got_peers(std::vector<tcp::endpoint> const& peers)
{
m_data_callback(m->peers);
m_data_callback(peers);
}
void find_data::done()

View File

@ -42,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/hasher.hpp"
#include "libtorrent/random_sample.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/kademlia/rpc_manager.hpp"
@ -49,7 +50,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/node.hpp"
#include "libtorrent/kademlia/refresh.hpp"
#include "libtorrent/kademlia/closest_nodes.hpp"
#include "libtorrent/kademlia/find_data.hpp"
using boost::bind;
@ -57,6 +57,8 @@ using boost::bind;
namespace libtorrent { namespace dht
{
using detail::write_endpoint;
#ifdef _MSC_VER
namespace
{
@ -93,24 +95,26 @@ void purge_peers(std::set<peer_entry>& peers)
void nop() {}
node_impl::node_impl(libtorrent::aux::session_impl& ses
, boost::function<void(msg const&)> const& f
, void (*f)(void*, entry const&, udp::endpoint const&, int)
, dht_settings const& settings
, boost::optional<node_id> nid)
, boost::optional<node_id> nid
, void* userdata)
: m_settings(settings)
, m_id(nid ? *nid : generate_id())
, m_table(m_id, 8, settings)
, m_rpc(bind(&node_impl::incoming_request, this, _1)
, m_id, m_table, f)
, m_rpc(m_id, m_table, f, userdata)
, m_last_tracker_tick(time_now())
, m_ses(ses)
, m_send(f)
, m_userdata(userdata)
{
m_secret[0] = std::rand();
m_secret[1] = std::rand();
}
bool node_impl::verify_token(msg const& m)
bool node_impl::verify_token(std::string const& token, char const* info_hash
, udp::endpoint const& addr)
{
std::string const& token = m.write_token;
if (token.length() != 4)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -121,11 +125,11 @@ bool node_impl::verify_token(msg const& m)
hasher h1;
error_code ec;
std::string address = m.addr.address().to_string(ec);
std::string address = addr.address().to_string(ec);
if (ec) return false;
h1.update(&address[0], address.length());
h1.update((char*)&m_secret[0], sizeof(m_secret[0]));
h1.update((char*)&m.info_hash[0], sha1_hash::size);
h1.update((char*)info_hash, sha1_hash::size);
sha1_hash h = h1.final();
if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
@ -134,24 +138,24 @@ bool node_impl::verify_token(msg const& m)
hasher h2;
h2.update(&address[0], address.length());
h2.update((char*)&m_secret[1], sizeof(m_secret[1]));
h2.update((char*)&m.info_hash[0], sha1_hash::size);
h2.update((char*)info_hash, sha1_hash::size);
h = h2.final();
if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
return true;
return false;
}
std::string node_impl::generate_token(msg const& m)
std::string node_impl::generate_token(udp::endpoint const& addr, char const* info_hash)
{
std::string token;
token.resize(4);
hasher h;
error_code ec;
std::string address = m.addr.address().to_string(ec);
std::string address = addr.address().to_string(ec);
TORRENT_ASSERT(!ec);
h.update(&address[0], address.length());
h.update((char*)&m_secret[0], sizeof(m_secret[0]));
h.update((char*)&m.info_hash[0], sha1_hash::size);
h.update(info_hash, sha1_hash::size);
sha1_hash hash = h.final();
std::copy(hash.begin(), hash.begin() + 4, (signed char*)&token[0]);
@ -159,40 +163,30 @@ std::string node_impl::generate_token(msg const& m)
}
void node_impl::refresh(node_id const& id
, boost::function0<void> f)
, find_data::nodes_callback const& f)
{
// use the 'bucket size' closest nodes
// to start the refresh with
std::vector<node_entry> start;
start.reserve(m_table.bucket_size());
m_table.find_node(id, start, routing_table::include_failed);
new dht::refresh(*this, id, start.begin(), start.end(), f);
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, id, f));
r->start();
}
void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
, boost::function0<void> f)
, find_data::nodes_callback const& f)
{
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "bootrapping: " << nodes.size();
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id, f));
for (std::vector<udp::endpoint>::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
TORRENT_LOG(node) << " " << *i;
#endif
*/
std::vector<node_entry> start;
start.reserve(nodes.size());
std::copy(nodes.begin(), nodes.end(), std::back_inserter(start));
new dht::refresh(*this, m_id, start.begin(), start.end(), f);
{
r->add_entry(node_id(0), *i, traversal_algorithm::result::initial);
}
r->start();
}
void node_impl::refresh()
{
std::vector<node_entry> start;
start.reserve(m_table.size().get<0>());
std::copy(m_table.begin(), m_table.end(), std::back_inserter(start));
new dht::refresh(*this, m_id, start.begin(), start.end(), bind(&nop));
boost::intrusive_ptr<dht::refresh> r(new dht::refresh(*this, m_id, boost::bind(&nop)));
r->start();
}
int node_impl::bucket_size(int bucket)
@ -234,11 +228,8 @@ void node_impl::refresh_bucket(int bucket)
TORRENT_ASSERT(distance_exp(m_id, target) == bucket);
std::vector<node_entry> start;
start.reserve(m_table.bucket_size());
m_table.find_node(target, start, routing_table::include_failed);
new dht::refresh(*this, target, start.begin(), start.end(), bind(&nop));
boost::intrusive_ptr<dht::refresh> ta(new dht::refresh(*this, target, bind(&nop)));
ta->start();
m_table.touch_bucket(bucket);
}
@ -249,9 +240,46 @@ void node_impl::unreachable(udp::endpoint const& ep)
void node_impl::incoming(msg const& m)
{
if (m_rpc.incoming(m))
extern void incoming_error(entry& e, char const* msg);
// is this a reply?
lazy_entry const* y_ent = m.message.dict_find_string("y");
if (!y_ent || y_ent->string_length() == 0)
{
refresh();
entry e;
incoming_error(e, "missing 'y' entry");
m_send(m_userdata, e, m.addr, 0);
return;
}
char y = *(y_ent->string_ptr());
switch (y)
{
case 'r':
{
if (m_rpc.incoming(m)) refresh();
break;
}
case 'q':
{
TORRENT_ASSERT(m.message.dict_find_string_value("y") == "q");
entry e;
incoming_request(m, e);
m_send(m_userdata, e, m.addr, 0);
break;
}
case 'e':
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
lazy_entry const* err = m.message.dict_find_list("e");
if (err && err->list_size() >= 2)
{
TORRENT_LOG(node) << "INCOMING ERROR: " << err->list_string_value_at(1);
}
#endif
break;
}
}
}
@ -282,7 +310,13 @@ namespace
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
rpc.invoke(messages::announce_peer, i->first.ep(), o);
entry e;
e["y"] = "q";
e["q"] = "announce_peer";
entry& a = e["a"];
a["port"] = listen_port;
a["token"] = i->second;
rpc.invoke(e, i->first.ep(), o);
}
}
}
@ -306,7 +340,10 @@ void node_impl::add_node(udp::endpoint node)
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_rpc.invoke(messages::ping, node, o);
entry e;
e["y"] = "q";
e["q"] = "ping";
m_rpc.invoke(e, node, o);
}
void node_impl::announce(sha1_hash const& info_hash, int listen_port
@ -317,8 +354,10 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
#endif
// search for nodes with ids close to id or with peers
// for info-hash id. then send announce_peer to them.
new find_data(*this, info_hash, f, boost::bind(&announce_fun, _1, boost::ref(m_rpc)
, listen_port, info_hash));
boost::intrusive_ptr<find_data> ta(new find_data(*this, info_hash, f
, boost::bind(&announce_fun, _1, boost::ref(m_rpc)
, listen_port, info_hash)));
ta->start();
}
time_duration node_impl::refresh_timeout()
@ -388,30 +427,6 @@ time_duration node_impl::connection_timeout()
void node_impl::on_announce(msg const& m, msg& reply)
{
if (m_ses.m_alerts.should_post<dht_announce_alert>())
m_ses.m_alerts.post_alert(dht_announce_alert(
m.addr.address(), m.port, m.info_hash));
if (!verify_token(m))
{
reply.message_id = messages::error;
reply.error_code = 203;
reply.error_msg = "Incorrect token in announce_peer";
return;
}
// the token was correct. That means this
// node is not spoofing its address. So, let
// the table get a chance to add it.
m_table.node_seen(m.id, m.addr);
torrent_entry& v = m_map[m.info_hash];
peer_entry e;
e.addr = tcp::endpoint(m.addr.address(), m.port);
e.added = time_now();
std::set<peer_entry>::iterator i = v.peers.find(e);
if (i != v.peers.end()) v.peers.erase(i++);
v.peers.insert(i, e);
}
namespace
@ -438,12 +453,12 @@ void node_impl::status(session_status& s)
}
}
bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
bool node_impl::on_find(sha1_hash const& info_hash, std::vector<tcp::endpoint>& peers) const
{
if (m_ses.m_alerts.should_post<dht_get_peers_alert>())
m_ses.m_alerts.post_alert(dht_get_peers_alert(m.info_hash));
m_ses.m_alerts.post_alert(dht_get_peers_alert(info_hash));
table_t::const_iterator i = m_map.find(m.info_hash);
table_t::const_iterator i = m_map.find(info_hash);
if (i == m_map.end()) return false;
torrent_entry const& v = i->second;
@ -454,75 +469,206 @@ bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint)
, boost::make_transform_iterator(v.peers.end(), &get_endpoint)
, std::back_inserter(peers), num);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<tcp::endpoint>::iterator i = peers.begin()
, end(peers.end()); i != end; ++i)
{
TORRENT_LOG(node) << " " << *i;
}
#endif
*/
return true;
}
void node_impl::incoming_request(msg const& m)
namespace
{
msg reply;
reply.message_id = m.message_id;
reply.addr = m.addr;
reply.reply = true;
reply.transaction_id = m.transaction_id;
switch (m.message_id)
void write_nodes_entry(entry& r, nodes_t const& nodes)
{
case messages::ping:
break;
case messages::get_peers:
bool ipv6_nodes = false;
entry& n = r["nodes"];
std::back_insert_iterator<std::string> out(n.string());
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
reply.info_hash = m.info_hash;
reply.write_token = generate_token(m);
on_find(m, reply.peers);
// always return nodes as well as peers
m_table.find_node(m.info_hash, reply.nodes, 0);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
if (!i->addr.is_v4())
{
TORRENT_LOG(node) << " " << i->id << " " << i->ep();
ipv6_nodes = true;
continue;
}
#endif
*/
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(udp::endpoint(i->addr, i->port), out);
}
break;
case messages::find_node:
if (ipv6_nodes)
{
reply.info_hash = m.info_hash;
m_table.find_node(m.info_hash, reply.nodes, 0);
/*
#ifdef TORRENT_DHT_VERBOSE_LOGGING
for (std::vector<node_entry>::iterator i = reply.nodes.begin()
, end(reply.nodes.end()); i != end; ++i)
entry& p = r["nodes2"];
std::string endpoint;
for (nodes_t::const_iterator i = nodes.begin()
, end(nodes.end()); i != end; ++i)
{
TORRENT_LOG(node) << " " << i->id << " " << i->ep();
if (!i->addr.is_v6()) continue;
endpoint.resize(18 + 20);
std::string::iterator out = endpoint.begin();
std::copy(i->id.begin(), i->id.end(), out);
out += 20;
write_endpoint(udp::endpoint(i->addr, i->port), out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint));
}
#endif
*/
}
break;
case messages::announce_peer:
on_announce(m, reply);
break;
default:
TORRENT_ASSERT(false);
};
}
}
m_table.heard_about(m.id, m.addr);
m_rpc.reply(reply);
void incoming_error(entry& e, char const* msg)
{
e["y"] = "e";
entry::list_type& l = e["e"].list();
l.push_back(entry(203));
l.push_back(entry(msg));
}
// build response
void node_impl::incoming_request(msg const& m, entry& e)
{
e = entry(entry::dictionary_t);
e["y"] = "r";
e["t"] = m.message.dict_find_string_value("t");
lazy_entry const* query_ent = m.message.dict_find_string("q");
if (query_ent == 0)
{
incoming_error(e, "missing 'q' key");
return;
}
char const* query = query_ent->string_cstr();
lazy_entry const* arg_ent = m.message.dict_find_dict("a");
if (arg_ent == 0)
{
incoming_error(e, "missing 'a' key");
return;
}
lazy_entry const* node_id_ent = arg_ent->dict_find_string("id");
if (node_id_ent == 0 || node_id_ent->string_length() != 20)
{
incoming_error(e, "missing 'id' key");
return;
}
node_id id(node_id_ent->string_ptr());
m_table.heard_about(id, m.addr);
entry& reply = e["r"];
m_rpc.add_our_id(reply);
if (strcmp(query, "ping") == 0)
{
// we already have 't' and 'id' in the response
// no more left to add
}
else if (strcmp(query, "get_peers") == 0)
{
lazy_entry const* info_hash_ent = arg_ent->dict_find_string("info_hash");
if (info_hash_ent == 0 || info_hash_ent->string_length() != 20)
{
incoming_error(e, "missing 'info-hash' key");
return;
}
reply["token"] = generate_token(m.addr, info_hash_ent->string_ptr());
sha1_hash info_hash(info_hash_ent->string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(info_hash, n, 0);
write_nodes_entry(reply, n);
peers_t p;
on_find(info_hash, p);
if (!p.empty())
{
entry::list_type& pe = reply["values"].list();
std::string endpoint;
for (peers_t::const_iterator i = p.begin()
, end(p.end()); i != end; ++i)
{
endpoint.resize(18);
std::string::iterator out = endpoint.begin();
write_endpoint(*i, out);
endpoint.resize(out - endpoint.begin());
pe.push_back(entry(endpoint));
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << " values: " << p.size();
#endif
}
}
else if (strcmp(query, "find_node") == 0)
{
lazy_entry const* target_ent = arg_ent->dict_find_string("target");
if (target_ent == 0 || target_ent->string_length() != 20)
{
incoming_error(e, "missing 'target' key");
return;
}
sha1_hash target(target_ent->string_ptr());
nodes_t n;
// always return nodes as well as peers
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
}
else if (strcmp(query, "announce_peer") == 0)
{
lazy_entry const* info_hash_ent = arg_ent->dict_find_string("info_hash");
if (info_hash_ent == 0 || info_hash_ent->string_length() != 20)
{
incoming_error(e, "missing 'info-hash' key");
return;
}
int port = arg_ent->dict_find_int_value("port", -1);
if (port < 0 || port >= 65536)
{
incoming_error(e, "invalid 'port' in announce");
return;
}
sha1_hash info_hash(info_hash_ent->string_ptr());
if (m_ses.m_alerts.should_post<dht_announce_alert>())
m_ses.m_alerts.post_alert(dht_announce_alert(
m.addr.address(), port, info_hash));
lazy_entry const* token = arg_ent->dict_find_string("token");
if (!token)
{
incoming_error(e, "missing 'token' key in announce");
return;
}
if (!verify_token(token->string_value(), info_hash_ent->string_ptr(), m.addr))
{
incoming_error(e, "invalid token in announce");
return;
}
// the token was correct. That means this
// node is not spoofing its address. So, let
// the table get a chance to add it.
m_table.node_seen(id, m.addr);
torrent_entry& v = m_map[info_hash];
peer_entry e;
e.addr = tcp::endpoint(m.addr.address(), port);
e.added = time_now();
std::set<peer_entry>::iterator i = v.peers.find(e);
if (i != v.peers.end()) v.peers.erase(i++);
v.peers.insert(i, e);
}
else
{
incoming_error(e, "unknown message");
return;
}
}
} } // namespace libtorrent::dht

View File

@ -33,76 +33,30 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp>
#include <boost/bind.hpp>
using boost::bind;
namespace libtorrent { namespace dht
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DEFINE_LOG(refresh)
#endif
refresh_observer::~refresh_observer()
refresh::refresh(
node_impl& node
, node_id target
, done_callback const& callback)
: find_data(node, target, find_data::data_callback(), callback)
{
if (m_algorithm) m_algorithm->failed(m_self, true);
}
void refresh_observer::reply(msg const& in)
char const* refresh::name() const
{
if (!m_algorithm) return;
if (!in.nodes.empty())
{
for (msg::nodes_t::const_iterator i = in.nodes.begin()
, end(in.nodes.end()); i != end; ++i)
{
m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port));
}
}
m_algorithm->finished(m_self);
m_algorithm = 0;
}
void refresh_observer::timeout()
{
if (!m_algorithm) return;
m_algorithm->failed(m_self);
m_algorithm = 0;
}
ping_observer::~ping_observer()
{
if (m_algorithm) m_algorithm->ping_timeout(m_self, true);
}
void ping_observer::reply(msg const& m)
{
if (!m_algorithm) return;
m_algorithm->ping_reply(m_self);
m_algorithm = 0;
}
void ping_observer::timeout()
{
if (!m_algorithm) return;
m_algorithm->ping_timeout(m_self);
m_algorithm = 0;
return "refresh";
}
void refresh::invoke(node_id const& nid, udp::endpoint addr)
{
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(refresh_observer));
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0)
{
@ -110,84 +64,16 @@ void refresh::invoke(node_id const& nid, udp::endpoint addr)
return;
}
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) refresh_observer( this, nid));
observer_ptr o(new (ptr) find_data_observer(this, nid));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_node.m_rpc.invoke(messages::find_node, addr, o);
}
void refresh::done()
{
int max_results = m_node.m_table.bucket_size();
m_leftover_nodes_iterator = (int)m_results.size() > max_results ?
m_results.begin() + max_results : m_results.end();
invoke_pings_or_finish();
}
void refresh::ping_reply(node_id nid)
{
m_active_pings--;
invoke_pings_or_finish();
}
void refresh::ping_timeout(node_id nid, bool prevent_request)
{
m_active_pings--;
invoke_pings_or_finish(prevent_request);
}
void refresh::invoke_pings_or_finish(bool prevent_request)
{
if (prevent_request)
{
--m_max_active_pings;
if (m_max_active_pings <= 0)
m_max_active_pings = 1;
}
else
{
while (m_active_pings < m_max_active_pings)
{
if (m_leftover_nodes_iterator == m_results.end()) break;
result const& node = *m_leftover_nodes_iterator;
// Skip initial nodes
if (node.flags & result::initial)
{
++m_leftover_nodes_iterator;
continue;
}
#ifndef BOOST_NO_EXCEPTIONS
try
{
#endif
TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(ping_observer));
void* ptr = m_node.m_rpc.allocator().malloc();
if (ptr == 0) return;
m_node.m_rpc.allocator().set_next_size(10);
observer_ptr o(new (ptr) ping_observer(this, node.id));
#ifdef TORRENT_DEBUG
o->m_in_constructor = false;
#endif
m_node.m_rpc.invoke(messages::ping, node.addr, o);
++m_active_pings;
++m_leftover_nodes_iterator;
#ifndef BOOST_NO_EXCEPTIONS
}
catch (std::exception& e) {}
#endif
}
}
if (m_active_pings == 0)
{
m_done_callback();
}
entry e;
e["y"] = "q";
e["q"] = "find_node";
entry& a = e["a"];
a["target"] = target().to_string();
m_node.m_rpc.invoke(e, addr, o);
}
} } // namespace libtorrent::dht

View File

@ -47,7 +47,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
@ -87,14 +86,49 @@ void intrusive_ptr_release(observer const* o)
}
}
void observer::set_target(udp::endpoint const& ep)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
// use high resolution timers for logging
m_sent = time_now_hires();
#else
m_sent = time_now();
#endif
m_port = ep.port();
#if TORRENT_USE_IPV6
if (ep.address().is_v6())
{
m_is_v6 = true;
m_addr.v6 = ep.address().to_v6().to_bytes();
}
else
#endif
{
m_is_v6 = false;
m_addr.v4 = ep.address().to_v4().to_bytes();
}
}
address observer::target_addr() const
{
if (m_is_v6)
return address_v6(m_addr.v6);
else
return address_v4(m_addr.v4);
}
udp::endpoint observer::target_ep() const
{
return udp::endpoint(target_addr(), m_port);
}
node_id generate_id();
typedef mpl::vector<
closest_nodes_observer
, find_data_observer
find_data_observer
, announce_observer
, refresh_observer
, ping_observer
, null_observer
> observer_types;
@ -102,13 +136,14 @@ typedef mpl::max_element<
mpl::transform_view<observer_types, mpl::sizeof_<mpl::_1> >
>::type max_observer_type_iter;
rpc_manager::rpc_manager(fun const& f, node_id const& our_id
, routing_table& table, send_fun const& sf)
rpc_manager::rpc_manager(node_id const& our_id
, routing_table& table, send_fun const& sf
, void* userdata)
: m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type), 10)
, m_next_transaction_id(std::rand() % max_transactions)
, m_oldest_transaction_id(m_next_transaction_id)
, m_incoming(f)
, m_send(sf)
, m_userdata(userdata)
, m_our_id(our_id)
, m_table(table)
, m_timer(time_now())
@ -119,12 +154,23 @@ rpc_manager::rpc_manager(fun const& f, node_id const& our_id
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Constructing";
TORRENT_LOG(rpc) << " closest_nodes_observer: " << sizeof(closest_nodes_observer);
TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer);
TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer);
TORRENT_LOG(rpc) << " refresh_observer: " << sizeof(refresh_observer);
TORRENT_LOG(rpc) << " ping_observer: " << sizeof(ping_observer);
TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer);
#define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y
TORRENT_LOG(rpc) << " observer: " << sizeof(observer);
PRINT_OFFSETOF(observer, pool_allocator);
PRINT_OFFSETOF(observer, m_sent);
PRINT_OFFSETOF(observer, m_refs);
PRINT_OFFSETOF(observer, m_addr);
PRINT_OFFSETOF(observer, m_port);
TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer);
PRINT_OFFSETOF(find_data_observer, m_algorithm);
PRINT_OFFSETOF(find_data_observer, m_self);
#undef PRINT_OFFSETOF
#endif
}
@ -201,96 +247,91 @@ void rpc_manager::unreachable(udp::endpoint const& ep)
}
}
// defined in node.cpp
void incoming_error(entry& e, char const* msg);
bool rpc_manager::incoming(msg const& m)
{
INVARIANT_CHECK;
if (m_destructing) return false;
if (m.reply)
// we only deal with replies, not queries
TORRENT_ASSERT(m.message.dict_find_string_value("y") == "r");
// if we don't have the transaction id in our
// request list, ignore the packet
std::string transaction_id = m.message.dict_find_string_value("t");
std::string::const_iterator i = transaction_id.begin();
int tid = transaction_id.size() != 2 ? -1 : io::read_uint16(i);
observer_ptr o;
if (tid >= (int)m_transactions.size() || tid < 0)
{
// if we don't have the transaction id in our
// request list, ignore the packet
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
<< transaction_id.size() << " from " << m.addr;
#endif
entry e;
incoming_error(e, "invalid transaction id");
m_send(m_userdata, e, m.addr, 0);
return false;
}
if (m.transaction_id.size() < 2)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
<< m.transaction_id.size() << " from " << m.addr;
#endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id, size "
+ boost::lexical_cast<std::string>(m.transaction_id.size());
reply.addr = m.addr;
reply.transaction_id = "";
m_send(reply);
return false;
}
std::string::const_iterator i = m.transaction_id.begin();
int tid = io::read_uint16(i);
o = m_transactions[tid];
if (tid >= (int)m_transactions.size()
|| tid < 0)
{
if (!o)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with invalid transaction id: "
<< tid << " from " << m.addr;
#endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id";
reply.addr = m.addr;
reply.transaction_id = "";
m_send(reply);
return false;
}
observer_ptr o = m_transactions[tid];
if (!o)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
<< tid << " from " << m.addr << " (possibly timed out)";
#endif
return false;
}
if (m.addr.address() != o->target_addr)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
<< tid << " from " << m.addr << " expected: " << o->target_addr;
#endif
return false;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::ofstream reply_stats("round_trip_ms.log", std::ios::app);
reply_stats << m.addr << "\t" << total_milliseconds(time_now() - o->sent)
<< std::endl;
#endif
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with transaction id: "
TORRENT_LOG(rpc) << "Reply to a timed out request "
<< tid << " from " << m.addr;
#endif
o->reply(m);
m_transactions[tid] = 0;
return m_table.node_seen(m.id, m.addr);
return false;
}
else
if (m.addr.address() != o->target_addr())
{
TORRENT_ASSERT(m.message_id != messages::error);
// this is an incoming request
m_incoming(m);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
<< tid << " from " << m.addr << " expected: " << o->target_addr();
#endif
return false;
}
return false;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::ofstream reply_stats("round_trip_ms.log", std::ios::app);
reply_stats << m.addr << "\t" << total_milliseconds(time_now_hires() - o->sent())
<< std::endl;
#endif
lazy_entry const* ret_ent = m.message.dict_find_dict("r");
if (ret_ent == 0)
{
entry e;
incoming_error(e, "missing 'r' key");
m_send(m_userdata, e, m.addr, 0);
return false;
}
lazy_entry const* node_id_ent = ret_ent->dict_find_string("id");
if (node_id_ent == 0 || node_id_ent->string_length() != 20)
{
entry e;
incoming_error(e, "missing 'id' key");
m_send(m_userdata, e, m.addr, 0);
return false;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with transaction id: "
<< tid << " from " << m.addr;
#endif
o->reply(m);
m_transactions[tid] = 0;
return m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr);
}
time_duration rpc_manager::tick()
@ -316,7 +357,7 @@ time_duration rpc_manager::tick()
observer_ptr o = m_transactions[m_oldest_transaction_id];
if (!o) continue;
time_duration diff = o->sent + milliseconds(timeout_ms) - time_now();
time_duration diff = o->sent() + milliseconds(timeout_ms) - time_now();
if (diff > seconds(0))
{
if (diff < seconds(1))
@ -372,7 +413,7 @@ unsigned int rpc_manager::new_transaction_id(observer_ptr o)
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: "
<< m_next_transaction_id << " sent to " << o->target_ep()
<< " " << total_seconds(time_now() - o->sent) << " seconds ago";
<< " " << total_seconds(time_now() - o->sent()) << " seconds ago";
#endif
m_transactions[m_next_transaction_id] = 0;
TORRENT_ASSERT(m_oldest_transaction_id == m_next_transaction_id);
@ -406,7 +447,12 @@ void rpc_manager::update_oldest_transaction_id()
}
}
void rpc_manager::invoke(int message_id, udp::endpoint target_addr
void rpc_manager::add_our_id(entry& e)
{
e["id"] = m_our_id.to_string();
}
void rpc_manager::invoke(entry& e, udp::endpoint target_addr
, observer_ptr o)
{
INVARIANT_CHECK;
@ -417,11 +463,10 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
return;
}
msg m;
m.message_id = message_id;
m.reply = false;
m.id = m_our_id;
m.addr = target_addr;
e["y"] = "q";
entry& a = e["a"];
add_our_id(a);
TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
#ifdef TORRENT_DEBUG
int potential_new_id = m_next_transaction_id;
@ -430,25 +475,18 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
try
{
#endif
m.transaction_id.clear();
std::back_insert_iterator<std::string> out(m.transaction_id);
std::string transaction_id;
transaction_id.resize(2);
char* out = &transaction_id[0];
io::write_uint16(m_next_transaction_id, out);
e["t"] = transaction_id;
o->send(m);
o->sent = time_now();
#if TORRENT_USE_IPV6
o->target_addr = target_addr.address();
#else
o->target_addr = target_addr.address().to_v4();
#endif
o->port = target_addr.port();
o->set_target(target_addr);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id]
<< " -> " << target_addr;
TORRENT_LOG(rpc) << "Invoking " << e["q"].string() << " -> " << target_addr;
#endif
m_send(m);
m_send(m_userdata, e, target_addr, 1);
new_transaction_id(o);
#ifndef BOOST_NO_EXCEPTIONS
}
@ -460,7 +498,7 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
}
#endif
}
/*
void rpc_manager::reply(msg& m)
{
INVARIANT_CHECK;
@ -472,6 +510,6 @@ void rpc_manager::reply(msg& m)
m_send(m);
}
*/
} } // namespace libtorrent::dht

View File

@ -82,6 +82,16 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
}
}
void traversal_algorithm::start()
{
add_requests();
// in case the routing table is empty, use the
// router nodes in the table
if (m_results.empty()) add_router_entries();
init();
}
boost::pool<>& traversal_algorithm::allocator() const
{
return m_node.m_rpc.allocator();

View File

@ -372,7 +372,7 @@ namespace libtorrent
}
#endif // TORRENT_USE_IOSTREAM
std::string print_entry(lazy_entry const& e)
std::string print_entry(lazy_entry const& e, bool single_line)
{
std::string ret;
switch (e.type())
@ -420,7 +420,9 @@ namespace libtorrent
|| (e.list_at(0)->type() == lazy_entry::string_t
&& (e.list_at(0)->string_length() < 10
|| e.list_size() < 2)
&& e.list_size() < 5));
&& e.list_size() < 5))
|| single_line;
if (!one_liner) ret += "\n";
for (int i = 0; i < e.list_size(); ++i)
{
@ -435,12 +437,13 @@ namespace libtorrent
case lazy_entry::dict_t:
{
ret += "{";
bool one_liner = (e.dict_size() == 0
bool one_liner = ((e.dict_size() == 0
|| e.dict_at(0).second->type() == lazy_entry::int_t
|| (e.dict_at(0).second->type() == lazy_entry::string_t
&& e.dict_at(0).second->string_length() < 30)
|| e.dict_at(0).first.size() < 10)
&& e.dict_size() < 5;
&& e.dict_size() < 5)
|| single_line;
if (!one_liner) ret += "\n";
for (int i = 0; i < e.dict_size(); ++i)

View File

@ -289,11 +289,8 @@ namespace aux {
PRINT_SIZEOF(policy::ipv6_peer)
#endif
PRINT_SIZEOF(dht::closest_nodes_observer)
PRINT_SIZEOF(dht::find_data_observer)
PRINT_SIZEOF(dht::announce_observer)
PRINT_SIZEOF(dht::refresh_observer)
PRINT_SIZEOF(dht::ping_observer)
PRINT_SIZEOF(dht::null_observer)
#undef PRINT_OFFSETOF
#undef PRINT_SIZEOF