fixed race when adding router nodes to the dht (router nodes should be added before it's started)

This commit is contained in:
Arvid Norberg 2008-09-02 06:37:40 +00:00
parent 0074b10b88
commit c88071ae10
7 changed files with 48 additions and 36 deletions

View File

@ -888,6 +888,13 @@ int main(int ac, char* av[])
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
settings.use_dht_as_fallback = false; settings.use_dht_as_fallback = false;
ses.add_dht_router(std::make_pair(
std::string("router.bittorrent.com"), 6881));
ses.add_dht_router(std::make_pair(
std::string("router.utorrent.com"), 6881));
ses.add_dht_router(std::make_pair(
std::string("router.bitcomet.com"), 6881));
boost::filesystem::ifstream dht_state_file(".dht_state" boost::filesystem::ifstream dht_state_file(".dht_state"
, std::ios_base::binary); , std::ios_base::binary);
dht_state_file.unsetf(std::ios_base::skipws); dht_state_file.unsetf(std::ios_base::skipws);
@ -896,12 +903,6 @@ int main(int ac, char* av[])
std::istream_iterator<char>(dht_state_file) std::istream_iterator<char>(dht_state_file)
, std::istream_iterator<char>()); , std::istream_iterator<char>());
ses.start_dht(dht_state); ses.start_dht(dht_state);
ses.add_dht_router(std::make_pair(std::string("router.bittorrent.com")
, 6881));
ses.add_dht_router(std::make_pair(std::string("router.utorrent.com")
, 6881));
ses.add_dht_router(std::make_pair(std::string("router.bitcomet.com")
, 6881));
#endif #endif
// look for ipfilter.dat // look for ipfilter.dat

View File

@ -549,6 +549,10 @@ namespace libtorrent
udp_socket m_dht_socket; udp_socket m_dht_socket;
// these are used when starting the DHT
// (and bootstrapping it), and then erased
std::list<std::pair<std::string, int> > m_dht_router_nodes;
void on_receive_udp(error_code const& e void on_receive_udp(error_code const& e
, udp::endpoint const& ep, char const* buf, int len); , udp::endpoint const& ep, char const* buf, int len);
#endif #endif

View File

@ -71,8 +71,9 @@ namespace libtorrent { namespace dht
{ {
friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_add_ref(dht_tracker const*);
friend void intrusive_ptr_release(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*);
dht_tracker(udp_socket& sock, dht_settings const& settings dht_tracker(udp_socket& sock, dht_settings const& settings);
, entry const& bootstrap);
void start(entry const& bootstrap);
void stop(); void stop();
void add_node(udp::endpoint node); void add_node(udp::endpoint node);

View File

@ -161,7 +161,7 @@ class node_impl : boost::noncopyable
typedef std::map<node_id, torrent_entry> table_t; typedef std::map<node_id, torrent_entry> table_t;
public: public:
node_impl(boost::function<void(msg const&)> const& f node_impl(boost::function<void(msg const&)> const& f
, dht_settings const& settings, boost::optional<node_id> node_id); , dht_settings const& settings);
virtual ~node_impl() {} virtual ~node_impl() {}
@ -186,7 +186,9 @@ public:
typedef table_t::iterator data_iterator; typedef table_t::iterator data_iterator;
void set_node_id(node_id const& nid) { m_id = nid; }
node_id const& nid() const { return m_id; } node_id const& nid() const { return m_id; }
boost::tuple<int, int> size() const{ return m_table.size(); } boost::tuple<int, int> size() const{ return m_table.size(); }
size_type num_global_nodes() const size_type num_global_nodes() const
{ return m_table.num_global_nodes(); } { return m_table.num_global_nodes(); }

View File

@ -83,21 +83,6 @@ namespace
} }
}; };
boost::optional<node_id> read_id(libtorrent::entry const& d)
{
using namespace libtorrent;
using libtorrent::dht::node_id;
if (d.type() != entry::dictionary_t) return boost::optional<node_id>();
entry const* nid = d.find_key("node-id");
if (!nid
|| nid->type() != entry::string_t
|| nid->string().length() != 40)
return boost::optional<node_id>();
return boost::optional<node_id>(
boost::lexical_cast<node_id>(nid->string()));
}
template <class EndpointType> template <class EndpointType>
void read_endpoint_list(libtorrent::entry const* n, std::vector<EndpointType>& epl) void read_endpoint_list(libtorrent::entry const* n, std::vector<EndpointType>& epl)
{ {
@ -142,10 +127,8 @@ namespace libtorrent { namespace dht
// class that puts the networking and the kademlia node in a single // class that puts the networking and the kademlia node in a single
// unit and connecting them together. // unit and connecting them together.
dht_tracker::dht_tracker(udp_socket& sock, dht_settings const& settings dht_tracker::dht_tracker(udp_socket& sock, dht_settings const& settings)
, entry const& bootstrap) : m_dht(bind(&dht_tracker::send_packet, this, _1), settings)
: m_dht(bind(&dht_tracker::send_packet, this, _1), settings
, read_id(bootstrap))
, m_sock(sock) , m_sock(sock)
, m_last_new_key(time_now() - minutes(key_refresh)) , m_last_new_key(time_now() - minutes(key_refresh))
, m_timer(sock.get_io_service()) , m_timer(sock.get_io_service())
@ -185,6 +168,10 @@ namespace libtorrent { namespace dht
// dht_tracker_log.enable(false); // dht_tracker_log.enable(false);
#endif #endif
}
void dht_tracker::start(entry const& bootstrap)
{
std::vector<udp::endpoint> initial_nodes; std::vector<udp::endpoint> initial_nodes;
if (bootstrap.type() == entry::dictionary_t) if (bootstrap.type() == entry::dictionary_t)
@ -194,6 +181,12 @@ namespace libtorrent { namespace dht
if (entry const* nodes = bootstrap.find_key("nodes")) if (entry const* nodes = bootstrap.find_key("nodes"))
read_endpoint_list<udp::endpoint>(nodes, initial_nodes); read_endpoint_list<udp::endpoint>(nodes, initial_nodes);
} catch (std::exception&) {} } catch (std::exception&) {}
entry const* nid = bootstrap.find_key("node-id");
if (nid
&& nid->type() == entry::string_t
&& nid->string().length() == 40)
m_dht.set_node_id(boost::lexical_cast<node_id>(nid->string()));
} }
m_timer.expires_from_now(seconds(1)); m_timer.expires_from_now(seconds(1));
@ -465,6 +458,8 @@ namespace libtorrent { namespace dht
m.transaction_id = e["t"].string(); m.transaction_id = e["t"].string();
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " t: " << to_hex(m.transaction_id);
try try
{ {
entry const* ver = e.find_key("v"); entry const* ver = e.find_key("v");
@ -512,8 +507,7 @@ namespace libtorrent { namespace dht
if (msg_type == "r") if (msg_type == "r")
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " r: " << messages::ids[m.message_id] log_line << " r: " << messages::ids[m.message_id];
<< " t: " << to_hex(m.transaction_id);
#endif #endif
m.reply = true; m.reply = true;
@ -616,7 +610,7 @@ namespace libtorrent { namespace dht
if (target.size() != 20) throw std::runtime_error("invalid size of target id"); if (target.size() != 20) throw std::runtime_error("invalid size of target id");
std::copy(target.begin(), target.end(), m.info_hash.begin()); std::copy(target.begin(), target.end(), m.info_hash.begin());
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
log_line << " t: " << boost::lexical_cast<std::string>(m.info_hash); log_line << " target: " << boost::lexical_cast<std::string>(m.info_hash);
#endif #endif
m.message_id = libtorrent::dht::messages::find_node; m.message_id = libtorrent::dht::messages::find_node;

View File

@ -91,9 +91,9 @@ void purge_peers(std::set<peer_entry>& peers)
void nop() {} void nop() {}
node_impl::node_impl(boost::function<void(msg const&)> const& f node_impl::node_impl(boost::function<void(msg const&)> const& f
, dht_settings const& settings, boost::optional<node_id> node_id) , dht_settings const& settings)
: m_settings(settings) : m_settings(settings)
, m_id(node_id ? *node_id : generate_id()) , m_id(generate_id())
, m_table(m_id, 8, settings) , m_table(m_id, 8, settings)
, m_rpc(bind(&node_impl::incoming_request, this, _1) , m_rpc(bind(&node_impl::incoming_request, this, _1)
, m_id, m_table, f) , m_id, m_table, f)

View File

@ -2047,11 +2047,20 @@ namespace aux {
, m_dht_settings.service_port , m_dht_settings.service_port
, m_dht_settings.service_port); , m_dht_settings.service_port);
} }
m_dht = new dht::dht_tracker(m_dht_socket, m_dht_settings, startup_state); m_dht = new dht::dht_tracker(m_dht_socket, m_dht_settings);
if (!m_dht_socket.is_open() || m_dht_socket.local_port() != m_dht_settings.service_port) if (!m_dht_socket.is_open() || m_dht_socket.local_port() != m_dht_settings.service_port)
{ {
m_dht_socket.bind(m_dht_settings.service_port); m_dht_socket.bind(m_dht_settings.service_port);
} }
for (std::list<std::pair<std::string, int> >::iterator i = m_dht_router_nodes.begin()
, end(m_dht_router_nodes.end()); i != end; ++i)
{
m_dht->add_router_node(*i);
}
std::list<std::pair<std::string, int> >().swap(m_dht_router_nodes);
m_dht->start(startup_state);
} }
void session_impl::stop_dht() void session_impl::stop_dht()
@ -2115,9 +2124,10 @@ namespace aux {
void session_impl::add_dht_router(std::pair<std::string, int> const& node) void session_impl::add_dht_router(std::pair<std::string, int> const& node)
{ {
TORRENT_ASSERT(m_dht); // router nodes should be added before the DHT is started (and bootstrapped)
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_mutex);
m_dht->add_router_node(node); if (m_dht) m_dht->add_router_node(node);
else m_dht_router_nodes.push_back(node);
} }
#endif #endif