From 194ad410dccf477547e526ac918e9973fbee6831 Mon Sep 17 00:00:00 2001 From: Thomas Yuan Date: Tue, 22 Sep 2015 14:10:57 -0400 Subject: [PATCH] Make dht_put_alert more accurate. --- CMakeLists.txt | 1 + Jamfile | 1 + include/libtorrent/Makefile.am | 1 + include/libtorrent/alert_types.hpp | 10 +- include/libtorrent/kademlia/dht_tracker.hpp | 11 +- include/libtorrent/kademlia/get_item.hpp | 10 +- include/libtorrent/kademlia/node.hpp | 10 +- include/libtorrent/kademlia/put_data.hpp | 94 ++++++++++++++ src/Makefile.am | 1 + src/alert.cpp | 13 +- src/kademlia/dht_tracker.cpp | 58 +-------- src/kademlia/get_item.cpp | 135 +++++--------------- src/kademlia/node.cpp | 79 +++++++++++- src/kademlia/put_data.cpp | 112 ++++++++++++++++ src/kademlia/rpc_manager.cpp | 3 +- src/session_impl.cpp | 27 ++-- test/test_dht.cpp | 53 +++++--- tools/dht_put.cpp | 11 +- 18 files changed, 422 insertions(+), 208 deletions(-) create mode 100644 include/libtorrent/kademlia/put_data.hpp create mode 100644 src/kademlia/put_data.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 563ca0076..89be2cda1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,7 @@ set(kademlia_sources refresh rpc_manager find_data + put_data node_id routing_table traversal_algorithm diff --git a/Jamfile b/Jamfile index d2aa29225..d513f3ecd 100644 --- a/Jamfile +++ b/Jamfile @@ -706,6 +706,7 @@ KADEMLIA_SOURCES = get_peers item get_item + put_data ; ED25519_SOURCES = diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index bfe774039..b6918ae8e 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -187,6 +187,7 @@ nobase_include_HEADERS = \ kademlia/direct_request.hpp \ kademlia/dos_blocker.hpp \ kademlia/find_data.hpp \ + kademlia/put_data.hpp \ kademlia/msg.hpp \ kademlia/node.hpp \ kademlia/node_entry.hpp \ diff --git a/include/libtorrent/alert_types.hpp b/include/libtorrent/alert_types.hpp index 0a2e131ae..7d16e9fa8 100644 --- a/include/libtorrent/alert_types.hpp +++ b/include/libtorrent/alert_types.hpp @@ -2045,11 +2045,12 @@ namespace libtorrent struct TORRENT_EXPORT dht_put_alert: alert { // internal - dht_put_alert(aux::stack_allocator& alloc, sha1_hash const& t); + dht_put_alert(aux::stack_allocator& alloc, sha1_hash const& t, int n); dht_put_alert(aux::stack_allocator& alloc, boost::array key , boost::array sig , std::string s - , boost::uint64_t sequence_number); + , boost::uint64_t sequence_number + , int n); TORRENT_DEFINE_ALERT(dht_put_alert, 76) @@ -2066,6 +2067,11 @@ namespace libtorrent boost::array signature; std::string salt; boost::uint64_t seq; + + // DHT put operation usually writes item to k nodes, maybe the node + // is stale so no response, or the node doesn't support 'put', or the + // token for write is out of date, etc. + int num_success; }; // this alert is used to report errors in the i2p SAM connection diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 004901b30..1224484e8 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -101,11 +101,18 @@ namespace libtorrent { namespace dht , boost::function cb , std::string salt = std::string()); + // for immutable_item. + // the callback function will be called when put operation is done. + // the bool parameter indicates the put operation success or failed. void put_item(entry data - , boost::function cb); + , boost::function cb); + // for mutable_item. + // the data_cb will be called when we get authoritative mutable_item, + // the cb is same as put immutable_item. void put_item(char const* key - , boost::function cb, std::string salt = std::string()); + , boost::function cb + , boost::function data_cb, std::string salt = std::string()); // send an arbitrary DHT request directly to a node void direct_request(udp::endpoint ep, entry& e diff --git a/include/libtorrent/kademlia/get_item.hpp b/include/libtorrent/kademlia/get_item.hpp index 3a52b6c9c..f23258f68 100644 --- a/include/libtorrent/kademlia/get_item.hpp +++ b/include/libtorrent/kademlia/get_item.hpp @@ -43,7 +43,7 @@ namespace libtorrent { namespace dht class get_item : public find_data { public: - typedef boost::function data_callback; + typedef boost::function data_callback; void got_data(bdecode_node const& v, char const* pk, @@ -53,13 +53,15 @@ public: // for immutable itms get_item(node& dht_node , node_id target - , data_callback const& dcallback); + , data_callback const& dcallback + , nodes_callback const& ncallback); // for mutable items get_item(node& dht_node , char const* pk , std::string const& salt - , data_callback const& dcallback); + , data_callback const& dcallback + , nodes_callback const& ncallback); virtual char const* name() const; @@ -68,8 +70,6 @@ protected: virtual bool invoke(observer_ptr o); virtual void done(); - void put(std::vector > const& v); - data_callback m_data_callback; item m_data; std::string m_salt; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 3f26a4d12..40e1b2cba 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -176,8 +177,13 @@ public: void direct_request(udp::endpoint ep, entry& e , boost::function f); - void get_item(sha1_hash const& target, boost::function f); - void get_item(char const* pk, std::string const& salt, boost::function f); + void get_item(sha1_hash const& target, boost::function f); + void get_item(char const* pk, std::string const& salt, boost::function f); + + void put_item(sha1_hash const& target, entry& data, boost::function f); + void put_item(char const* pk, std::string const& salt + , boost::function f + , boost::function data_cb); bool verify_token(std::string const& token, char const* info_hash , udp::endpoint const& addr) const; diff --git a/include/libtorrent/kademlia/put_data.hpp b/include/libtorrent/kademlia/put_data.hpp new file mode 100644 index 000000000..23f65b76b --- /dev/null +++ b/include/libtorrent/kademlia/put_data.hpp @@ -0,0 +1,94 @@ +/* + +Copyright (c) 2006-2015, Arvid Norberg, Thomas Yuan +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_PUT_DATA_HPP +#define TORRENT_PUT_DATA_HPP + +#include +#include +#include +#include + +#include "libtorrent/aux_/disable_warnings_push.hpp" + +#include +#include +#include + +#include "libtorrent/aux_/disable_warnings_pop.hpp" + +namespace libtorrent { namespace dht +{ +struct msg; +class node; + +struct put_data: traversal_algorithm +{ + typedef boost::function put_callback; + + put_data(node& node, put_callback const& callback); + + virtual char const* name() const; + + void set_data(item const& data) { m_data = data; } + + void set_targets(std::vector > const& targets); + +protected: + + virtual void done(); + virtual bool invoke(observer_ptr o); + + put_callback m_put_callback; + item m_data; + bool m_done; +}; + +struct put_data_observer : traversal_observer +{ + put_data_observer( + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id, std::string const& token) + : traversal_observer(algorithm, ep, id) + , m_token(token) + { + } + + virtual void reply(msg const&) { done(); } + + std::string m_token; +}; + +} } // namespace libtorrent::dht + +#endif // TORRENT_PUT_DATA_HPP + diff --git a/src/Makefile.am b/src/Makefile.am index ca77b0396..a3a523f02 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,6 +7,7 @@ KADEMLIA_SOURCES = \ kademlia/dht_storage.cpp \ kademlia/dht_tracker.cpp \ kademlia/find_data.cpp \ + kademlia/put_data.cpp \ kademlia/node.cpp \ kademlia/node_entry.cpp \ kademlia/node_id.cpp \ diff --git a/src/alert.cpp b/src/alert.cpp index 78f7448b5..ac9651be3 100644 --- a/src/alert.cpp +++ b/src/alert.cpp @@ -1472,21 +1472,24 @@ namespace libtorrent { return msg; } - dht_put_alert::dht_put_alert(aux::stack_allocator&, sha1_hash const& t) + dht_put_alert::dht_put_alert(aux::stack_allocator&, sha1_hash const& t, int n) : target(t) , seq(0) + , num_success(n) {} dht_put_alert::dht_put_alert(aux::stack_allocator& , boost::array key , boost::array sig , std::string s - , boost::uint64_t sequence_number) + , boost::uint64_t sequence_number + , int n) : target(0) , public_key(key) , signature(sig) , salt(s) , seq(sequence_number) + , num_success(n) {} std::string dht_put_alert::message() const @@ -1494,7 +1497,8 @@ namespace libtorrent { char msg[1050]; if (target.is_all_zeros()) { - snprintf(msg, sizeof(msg), "DHT put complete (key=%s sig=%s salt=%s seq=%" PRId64 ")" + snprintf(msg, sizeof(msg), "DHT put complete (success=%d key=%s sig=%s salt=%s seq=%" PRId64 ")" + , num_success , to_hex(std::string(&public_key[0], 32)).c_str() , to_hex(std::string(&signature[0], 64)).c_str() , salt.c_str() @@ -1502,7 +1506,8 @@ namespace libtorrent { return msg; } - snprintf(msg, sizeof(msg), "DHT put complete (hash=%s)" + snprintf(msg, sizeof(msg), "DHT put commplete (success=%d hash=%s)" + , num_success , to_hex(target.to_string()).c_str()); return msg; } diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 0b0826bbd..b20cc3652 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -239,53 +239,10 @@ namespace libtorrent { namespace dht m_dht.announce(ih, listen_port, flags, f); } - namespace { - - // these functions provide a slightly higher level - // interface to the get/put functionality in the DHT - bool get_immutable_item_callback(item& it, boost::function f) - { - // the reason to wrap here is to control the return value - // since it controls whether we re-put the content - TORRENT_ASSERT(!it.is_mutable()); - f(it); - return false; - } - - bool get_mutable_item_callback(item& it, bool authoritative, boost::function f) - { - // the reason to wrap here is to control the return value - // since it controls whether we re-put the content - TORRENT_ASSERT(it.is_mutable()); - f(it, authoritative); - return false; - } - - bool put_immutable_item_callback(item& it, boost::function f - , entry data) - { - TORRENT_ASSERT(!it.is_mutable()); - it.assign(data); - // TODO: ideally this function would be called when the - // put completes - f(); - return true; - } - - bool put_mutable_item_callback(item& it, bool authoritative, boost::function cb) - { - if (authoritative) { - cb(it); - } - return true; - } - - } // anonymous namespace - void dht_tracker::get_item(sha1_hash const& target , boost::function cb) { - m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, cb)); + m_dht.get_item(target, cb); } // key is a 32-byte binary string, the public key to look up. @@ -294,26 +251,25 @@ namespace libtorrent { namespace dht , boost::function cb , std::string salt) { - m_dht.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, cb)); + m_dht.get_item(key, salt, cb); } void dht_tracker::put_item(entry data - , boost::function cb) + , boost::function cb) { std::string flat_data; bencode(std::back_inserter(flat_data), data); sha1_hash target = item_target_id( std::pair(flat_data.c_str(), flat_data.size())); - m_dht.get_item(target, boost::bind(&put_immutable_item_callback - , _1, cb, data)); + m_dht.put_item(target, data, cb); } void dht_tracker::put_item(char const* key - , boost::function cb, std::string salt) + , boost::function cb + , boost::function data_cb, std::string salt) { - m_dht.get_item(key, salt, boost::bind(&put_mutable_item_callback - , _1, _2, cb)); + m_dht.put_item(key, salt, cb, data_cb); } void dht_tracker::direct_request(udp::endpoint ep, entry& e diff --git a/src/kademlia/get_item.cpp b/src/kademlia/get_item.cpp index dd433ac33..a9089573f 100644 --- a/src/kademlia/get_item.cpp +++ b/src/kademlia/get_item.cpp @@ -51,6 +51,9 @@ void get_item::got_data(bdecode_node const& v, { // we received data! + // for put_immutable_item, there is no data_callback. + if (!m_data_callback) return; + std::pair salt(m_salt.c_str(), int(m_salt.size())); sha1_hash incoming_target; @@ -88,39 +91,27 @@ void get_item::got_data(bdecode_node const& v, // and it's immutable m_data.assign(v); - bool put_requested = m_data_callback(m_data, true); - // if we intend to put, we need to keep going - // until we find the closest nodes, since those - // are the ones we're putting to - if (put_requested) - { + // There can only be one true immutable item with a given id + // Now that we've got it and the user doesn't want to do a put + // there's no point in continuing to query other nodes + m_data_callback(m_data, true); + abort(); + #if TORRENT_USE_ASSERTS - std::vector buffer; - bencode(std::back_inserter(buffer), m_data.value()); - TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final()); + std::vector buffer; + bencode(std::back_inserter(buffer), m_data.value()); + TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final()); #endif - - // this function is called when we're done, passing - // in all relevant nodes we received data from close - // to the target. - m_nodes_callback = boost::bind(&get_item::put, this, _1); - } - else - { - // There can only be one true immutable item with a given id - // Now that we've got it and the user doesn't want to do a put - // there's no point in continuing to query other nodes - done(); - } - } + } } get_item::get_item( node& dht_node , node_id target - , data_callback const& dcallback) - : find_data(dht_node, target, nodes_callback()) + , data_callback const& dcallback + , nodes_callback const& ncallback) + : find_data(dht_node, target, ncallback) , m_data_callback(dcallback) { } @@ -129,10 +120,11 @@ get_item::get_item( node& dht_node , char const* pk , std::string const& salt - , data_callback const& dcallback) + , data_callback const& dcallback + , nodes_callback const& ncallback) : find_data(dht_node, item_target_id( std::make_pair(salt.c_str(), int(salt.size())), pk) - , nodes_callback()) + , ncallback) , m_data_callback(dcallback) , m_data(pk, salt) { @@ -170,92 +162,29 @@ bool get_item::invoke(observer_ptr o) void get_item::done() { + // For immutable_put, we only need nodes_callback, the m_data_callback + // shouldn't be set. + if (!m_data_callback) return find_data::done(); + if (m_data.is_mutable() || m_data.empty()) { // for mutable data, now we have authoritative data since // we've heard from everyone, to be sure we got the // latest version of the data (i.e. highest sequence number) - bool put_requested = m_data_callback(m_data, true); - if (put_requested) - { + m_data_callback(m_data, true); + #if TORRENT_USE_ASSERTS - if (m_data.is_mutable()) - { - TORRENT_ASSERT(m_target - == item_target_id(std::pair(m_data.salt().c_str() - , m_data.salt().size()) - , m_data.pk().data())); - } - else - { - std::vector buffer; - bencode(std::back_inserter(buffer), m_data.value()); - TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final()); - } -#endif - - // this function is called when we're done, passing - // in all relevant nodes we received data from close - // to the target. - m_nodes_callback = boost::bind(&get_item::put, this, _1); - } - } - find_data::done(); -} - -// this function sends a put message to the nodes -// closest to the target. Those nodes are passed in -// as the v argument -void get_item::put(std::vector > const& v) -{ -#ifndef TORRENT_DISABLE_LOGGING - // TODO: 3 it would be nice to not have to spend so much time rendering - // the bencoded dict if logging is disabled - get_node().observer()->log(dht_logger::traversal, "[%p] sending put " - "[ seq: %" PRId64 " nodes: %d ]" - , static_cast(this), (m_data.is_mutable() ? m_data.seq() : -1) - , int(v.size())); -#endif - - // create a dummy traversal_algorithm - boost::intrusive_ptr algo( - new traversal_algorithm(m_node, (node_id::min)())); - - // store on the first k nodes - for (std::vector >::const_iterator i = v.begin() - , end(v.end()); i != end; ++i) - { -#ifndef TORRENT_DISABLE_LOGGING - get_node().observer()->log(dht_logger::traversal, "[%p] put-distance: %d" - , static_cast(this), 160 - distance_exp(m_target, i->first.id)); -#endif - - void* ptr = m_node.m_rpc.allocate_observer(); - if (ptr == 0) return; - - // TODO: 3 we don't support CAS errors here! we need a custom observer - observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id)); -#if TORRENT_USE_ASSERTS - o->m_in_constructor = false; -#endif - entry e; - e["y"] = "q"; - e["q"] = "put"; - entry& a = e["a"]; - a["v"] = m_data.value(); - a["token"] = i->second; if (m_data.is_mutable()) { - a["k"] = std::string(m_data.pk().data(), item_pk_len); - a["seq"] = m_data.seq(); - a["sig"] = std::string(m_data.sig().data(), item_sig_len); - if (!m_data.salt().empty()) - { - a["salt"] = m_data.salt(); - } + TORRENT_ASSERT(m_target + == item_target_id(std::pair(m_data.salt().c_str() + , m_data.salt().size()) + , m_data.pk().data())); } - m_node.m_rpc.invoke(e, i->first.ep(), o); +#endif } + + find_data::done(); } void get_item_observer::reply(msg const& m) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 6fce3c431..51b38024e 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -315,7 +315,6 @@ namespace // create a dummy traversal_algorithm boost::intrusive_ptr algo( new traversal_algorithm(node, (node_id::min)())); - // store on the first k nodes for (std::vector >::const_iterator i = v.begin() , end(v.end()); i != end; ++i) @@ -424,7 +423,7 @@ void node::direct_request(udp::endpoint ep, entry& e } void node::get_item(sha1_hash const& target - , boost::function f) + , boost::function f) { #ifndef TORRENT_DISABLE_LOGGING if (m_observer) @@ -437,12 +436,12 @@ void node::get_item(sha1_hash const& target #endif boost::intrusive_ptr ta; - ta.reset(new dht::get_item(*this, target, f)); + ta.reset(new dht::get_item(*this, target, boost::bind(f, _1), find_data::nodes_callback())); ta->start(); } void node::get_item(char const* pk, std::string const& salt - , boost::function f) + , boost::function f) { #ifndef TORRENT_DISABLE_LOGGING if (m_observer) @@ -454,7 +453,77 @@ void node::get_item(char const* pk, std::string const& salt #endif boost::intrusive_ptr ta; - ta.reset(new dht::get_item(*this, pk, salt, f)); + ta.reset(new dht::get_item(*this, pk, salt, f, find_data::nodes_callback())); + ta->start(); +} + +namespace { + +void put(std::vector > const& nodes + , boost::intrusive_ptr ta) +{ + ta->set_targets(nodes); + ta->start(); +} + +void put_data_cb(item& i, bool auth + , boost::intrusive_ptr ta + , boost::function f) +{ + // call data_callback only when we got authoritative data. + if (auth) + { + f(i); + ta->set_data(i); + } +} + +} // namespace + +void node::put_item(sha1_hash const& target, entry& data, boost::function f) +{ +#ifndef TORRENT_DISABLE_LOGGING + if (m_observer) + { + char hex_target[41]; + to_hex(reinterpret_cast(&target[0]), 20, hex_target); + m_observer->log(dht_logger::node, "starting get for [ hash: %s ]" + , hex_target); + } +#endif + + item i; + i.assign(data); + boost::intrusive_ptr put_ta; + put_ta.reset(new dht::put_data(*this, boost::bind(f, _2))); + put_ta->set_data(i); + + boost::intrusive_ptr ta; + ta.reset(new dht::get_item(*this, target, get_item::data_callback(), + boost::bind(&put, _1, put_ta))); + ta->start(); +} + +void node::put_item(char const* pk, std::string const& salt + , boost::function f + , boost::function data_cb) +{ + #ifndef TORRENT_DISABLE_LOGGING + if (m_observer) + { + char hex_key[65]; + to_hex(pk, 32, hex_key); + m_observer->log(dht_logger::node, "starting get for [ key: %s ]", hex_key); + } + #endif + + boost::intrusive_ptr put_ta; + put_ta.reset(new dht::put_data(*this, f)); + + boost::intrusive_ptr ta; + ta.reset(new dht::get_item(*this, pk, salt + , boost::bind(&put_data_cb, _1, _2, put_ta, data_cb) + , boost::bind(&put, _1, put_ta))); ta->start(); } diff --git a/src/kademlia/put_data.cpp b/src/kademlia/put_data.cpp new file mode 100644 index 000000000..f23f09673 --- /dev/null +++ b/src/kademlia/put_data.cpp @@ -0,0 +1,112 @@ +/* + +Copyright (c) 2006-2015, Arvid Norberg & Daniel Wallin +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +put_data::put_data(node& dht_node, put_callback const& callback) + : traversal_algorithm(dht_node, (node_id::min)()) + , m_put_callback(callback) + , m_done(false) +{ +} + +char const* put_data::name() const { return "put_data"; } + +void put_data::set_targets(std::vector > const& targets) +{ + for (std::vector >::const_iterator i = targets.begin() + , end(targets.end()); i != end; ++i) + { + void* ptr = m_node.m_rpc.allocate_observer(); + if (ptr == 0) return; + + observer_ptr o(new (ptr) put_data_observer(this, i->first.ep() + , i->first.id, i->second)); + + #if defined TORRENT_DEBUG || defined TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; + #endif + m_results.push_back(o); + } +} + +void put_data::done() +{ + if (m_invoke_count != 0) return; + m_done = true; + +#ifndef TORRENT_DISABLE_LOGGING + get_node().observer()->log(dht_logger::traversal, "[%p] %s DONE, response %d, timeout %d" + , static_cast(this), name(), m_responses, m_timeouts); +#endif + + m_put_callback(m_data, m_responses); + traversal_algorithm::done(); +} + +bool put_data::invoke(observer_ptr o) +{ + if (m_done) + { + m_invoke_count = -1; + return false; + } + put_data_observer* po = static_cast(o.get()); + + entry e; + e["y"] = "q"; + e["q"] = "put"; + entry& a = e["a"]; + a["v"] = m_data.value(); + a["token"] = po->m_token; + if (m_data.is_mutable()) + { + a["k"] = std::string(m_data.pk().data(), item_pk_len); + a["seq"] = m_data.seq(); + a["sig"] = std::string(m_data.sig().data(), item_sig_len); + if (!m_data.salt().empty()) + { + a["salt"] = m_data.salt(); + } + } + + return m_node.m_rpc.invoke(e, o->target_ep(), o); +} + +} } // namespace libtorrent::dht + diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index a97ad0836..b36e1b45c 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -46,6 +46,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include #include @@ -158,7 +159,7 @@ void observer::set_id(node_id const& id) enum { observer_size = max3< sizeof(find_data_observer) , sizeof(announce_observer) - , sizeof(null_observer) + , sizeof(put_data_observer) >::value }; diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 451e2a024..c970c131a 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -5648,13 +5648,24 @@ retry: namespace { - void on_dht_put_immutable_item(alert_manager& alerts, sha1_hash target) + void on_dht_put_immutable_item(alert_manager& alerts, sha1_hash target, int num) { if (alerts.should_post()) - alerts.emplace_alert(target); + alerts.emplace_alert(target, num); } - void put_mutable_callback(alert_manager& alerts, dht::item& i + void on_dht_put_mutable_item(alert_manager& alerts, dht::item& i, int num) + { + boost::array sig = i.sig(); + boost::array pk = i.pk(); + boost::uint64_t seq = i.seq(); + std::string salt = i.salt(); + + if (alerts.should_post()) + alerts.emplace_alert(pk, sig, salt, seq, num); + } + + void put_mutable_callback(dht::item& i , boost::function& , boost::uint64_t&, std::string const&)> cb) { @@ -5665,9 +5676,6 @@ retry: std::string salt = i.salt(); cb(value, sig, seq, salt); i.assign(value, salt, seq, pk.data(), sig.data()); - - if (alerts.should_post()) - alerts.emplace_alert(pk, sig, salt, seq); } void on_dht_get_peers(alert_manager& alerts, sha1_hash info_hash, std::vector const& peers) @@ -5690,7 +5698,7 @@ retry: { if (!m_dht) return; m_dht->put_item(data, boost::bind(&on_dht_put_immutable_item, boost::ref(m_alerts) - , target)); + , target, _1)); } void session_impl::dht_put_mutable_item(boost::array key @@ -5699,8 +5707,9 @@ retry: , std::string salt) { if (!m_dht) return; - m_dht->put_item(key.data(), boost::bind(&put_mutable_callback - , boost::ref(m_alerts), _1, cb), salt); + m_dht->put_item(key.data(), + boost::bind(&on_dht_put_mutable_item, boost::ref(m_alerts), _1, _2), + boost::bind(&put_mutable_callback, _1, cb), salt); } void session_impl::dht_get_peers(sha1_hash const& info_hash) diff --git a/test/test_dht.cpp b/test/test_dht.cpp index e3e5d6f8d..9a4c9e95f 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -481,19 +481,35 @@ std::vector g_got_items; dht::item g_put_item; int g_put_count; -bool get_item_cb(dht::item& i, bool a) +void get_mutable_item_cb(dht::item& i, bool a) { - // only count authoritative data - if (!a) return false; + if (!a) return; if (!i.empty()) g_got_items.push_back(i); - if (!g_put_item.empty()) - { - i = g_put_item; - g_put_count++; - return true; - } - return false; +} + +void put_mutable_item_data_cb(dht::item& i) +{ + if (!i.empty()) + g_got_items.push_back(i); + + TEST_CHECK(!g_put_item.empty()); + i = g_put_item; + g_put_count++; +} + +void put_mutable_item_cb(dht::item&, bool) +{ +} + +void get_immutable_item_cb(dht::item& i) +{ + if (!i.empty()) + g_got_items.push_back(i); +} + +void put_immutable_item_cb(bool) +{ } struct obs : dht::dht_observer @@ -1633,7 +1649,7 @@ TORRENT_TEST(dht) udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); node.m_table.add_node(initial_node); - node.get_item(items[0].target, get_item_cb); + node.get_item(items[0].target, get_immutable_item_cb); TEST_EQUAL(g_sent_packets.size(), 1); if (g_sent_packets.empty()) break; @@ -1679,8 +1695,7 @@ TORRENT_TEST(dht) udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); node.m_table.add_node(initial_node); - sha1_hash target = hasher(public_key, item_pk_len).final(); - node.get_item(target, get_item_cb); + node.get_item(public_key, std::string(), get_mutable_item_cb); TEST_EQUAL(g_sent_packets.size(), 1); if (g_sent_packets.empty()) break; @@ -1693,7 +1708,6 @@ TORRENT_TEST(dht) { TEST_EQUAL(parsed[0].string_value(), "q"); TEST_EQUAL(parsed[2].string_value(), "get"); - TEST_EQUAL(parsed[5].string_value(), target.to_string()); if (parsed[0].string_value() != "q" || parsed[2].string_value() != "get") break; } else @@ -1765,8 +1779,7 @@ TORRENT_TEST(dht) for (int i = 0; i < num_test_nodes; ++i) node.m_table.add_node(nodes[i]); - g_put_item.assign(items[0].ent); - node.get_item(items[0].target, get_item_cb); + node.put_item(items[0].target, items[0].ent, put_immutable_item_cb); TEST_EQUAL(g_sent_packets.size(), num_test_nodes); if (g_sent_packets.size() != num_test_nodes) break; @@ -1793,7 +1806,6 @@ TORRENT_TEST(dht) g_sent_packets.erase(packet); } - TEST_EQUAL(g_put_count, 1); TEST_EQUAL(g_sent_packets.size(), num_test_nodes); if (g_sent_packets.size() != num_test_nodes) break; @@ -1847,10 +1859,9 @@ TORRENT_TEST(dht) for (int i = 0; i < num_test_nodes; ++i) node.m_table.add_node(nodes[i]); - sha1_hash target = hasher(public_key, item_pk_len).final(); g_put_item.assign(items[0].ent, empty_salt, seq, public_key, private_key); std::string sig(g_put_item.sig().data(), item_sig_len); - node.get_item(target, get_item_cb); + node.put_item(public_key, std::string(), put_mutable_item_cb, put_mutable_item_data_cb); TEST_EQUAL(g_sent_packets.size(), num_test_nodes); if (g_sent_packets.size() != num_test_nodes) break; @@ -2308,7 +2319,7 @@ TORRENT_TEST(read_only_node) bdecode_node request; sha1_hash target = generate_next(); - node.get_item(target, get_item_cb); + node.get_item(target, get_immutable_item_cb); TEST_EQUAL(g_sent_packets.size(), 1); TEST_EQUAL(g_sent_packets.front().first, initial_node); @@ -2342,7 +2353,7 @@ TORRENT_TEST(read_only_node) g_sent_packets.clear(); target = generate_next(); - node.get_item(target, get_item_cb); + node.get_item(target, get_immutable_item_cb); // since we have 2 nodes, we should have two packets. TEST_EQUAL(g_sent_packets.size(), 2); diff --git a/tools/dht_put.cpp b/tools/dht_put.cpp index dac6eb8a6..4c2152e12 100644 --- a/tools/dht_put.cpp +++ b/tools/dht_put.cpp @@ -320,7 +320,9 @@ int main(int argc, char* argv[]) printf("PUT %s\n", to_hex(target.to_string()).c_str()); - wait_for_alert(s, dht_put_alert::alert_type); + alert* a = wait_for_alert(s, dht_put_alert::alert_type); + dht_put_alert* pa = alert_cast(a); + printf("%s\n", pa->message().c_str()); } else if (strcmp(argv[0], "mput") == 0) { @@ -353,10 +355,12 @@ int main(int argc, char* argv[]) s.dht_put_item(public_key, boost::bind(&put_string, _1, _2, _3, _4 , public_key.data(), private_key.data(), argv[0])); - printf("public key: %s\n", to_hex(std::string(public_key.data() + printf("MPUT publick key: %s\n", to_hex(std::string(public_key.data() , public_key.size())).c_str()); - wait_for_alert(s, dht_put_alert::alert_type); + alert* a = wait_for_alert(s, dht_put_alert::alert_type); + dht_put_alert* pa = alert_cast(a); + printf("%s\n", pa->message().c_str()); } else if (strcmp(argv[0], "mget") == 0) { @@ -380,6 +384,7 @@ int main(int argc, char* argv[]) bootstrap(s); s.dht_get_item(public_key); + printf("MGET %s\n", argv[0]); bool authoritative = false;