Merge pull request #178 from thomas-yuan/put

Make dht_put_alert more accurate.
This commit is contained in:
Arvid Norberg 2015-11-21 02:30:45 -05:00
commit 42b129b9d5
18 changed files with 528 additions and 281 deletions

View File

@ -127,6 +127,7 @@ set(kademlia_sources
refresh
rpc_manager
find_data
put_data
node_id
routing_table
traversal_algorithm

View File

@ -706,6 +706,7 @@ KADEMLIA_SOURCES =
get_peers
item
get_item
put_data
;
ED25519_SOURCES =

View File

@ -187,6 +187,7 @@ nobase_include_HEADERS = \
kademlia/direct_request.hpp \
kademlia/dos_blocker.hpp \
kademlia/find_data.hpp \
kademlia/put_data.hpp \
kademlia/msg.hpp \
kademlia/node.hpp \
kademlia/node_entry.hpp \

View File

@ -2045,11 +2045,12 @@ namespace libtorrent
struct TORRENT_EXPORT dht_put_alert: alert
{
// internal
dht_put_alert(aux::stack_allocator& alloc, sha1_hash const& t);
dht_put_alert(aux::stack_allocator& alloc, sha1_hash const& t, int n);
dht_put_alert(aux::stack_allocator& alloc, boost::array<char, 32> key
, boost::array<char, 64> sig
, std::string s
, boost::uint64_t sequence_number);
, boost::uint64_t sequence_number
, int n);
TORRENT_DEFINE_ALERT(dht_put_alert, 76)
@ -2066,6 +2067,12 @@ namespace libtorrent
boost::array<char, 64> signature;
std::string salt;
boost::uint64_t seq;
// DHT put operation usually writes item to k nodes, maybe the node
// is stale so no response, or the node doesn't support 'put', or the
// token for write is out of date, etc. num_success is the number of
// successful responses we got from the puts.
int num_success;
};
// this alert is used to report errors in the i2p SAM connection

View File

@ -101,11 +101,18 @@ namespace libtorrent { namespace dht
, boost::function<void(item const&, bool)> cb
, std::string salt = std::string());
// for immutable_item.
// the callback function will be called when put operation is done.
// the int parameter indicates the success numbers of put operation.
void put_item(entry data
, boost::function<void()> cb);
, boost::function<void(int)> cb);
// for mutable_item.
// the data_cb will be called when we get authoritative mutable_item,
// the cb is same as put immutable_item.
void put_item(char const* key
, boost::function<void(item&)> cb, std::string salt = std::string());
, boost::function<void(item&, int)> cb
, boost::function<void(item&)> data_cb, std::string salt = std::string());
// send an arbitrary DHT request directly to a node
void direct_request(udp::endpoint ep, entry& e

View File

@ -43,7 +43,7 @@ namespace libtorrent { namespace dht
class get_item : public find_data
{
public:
typedef boost::function<bool(item&, bool)> data_callback;
typedef boost::function<void(item&, bool)> data_callback;
void got_data(bdecode_node const& v,
char const* pk,
@ -53,13 +53,15 @@ public:
// for immutable itms
get_item(node& dht_node
, node_id target
, data_callback const& dcallback);
, data_callback const& dcallback
, nodes_callback const& ncallback);
// for mutable items
get_item(node& dht_node
, char const* pk
, std::string const& salt
, data_callback const& dcallback);
, data_callback const& dcallback
, nodes_callback const& ncallback);
virtual char const* name() const;
@ -68,11 +70,10 @@ protected:
virtual bool invoke(observer_ptr o);
virtual void done();
void put(std::vector<std::pair<node_entry, std::string> > const& v);
data_callback m_data_callback;
item m_data;
std::string m_salt;
bool m_immutable;
};
class get_item_observer : public find_data_observer

View File

@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/put_data.hpp>
#include <libtorrent/kademlia/item.hpp>
#include <libtorrent/io.hpp>
@ -176,8 +177,13 @@ public:
void direct_request(udp::endpoint ep, entry& e
, boost::function<void(msg const&)> f);
void get_item(sha1_hash const& target, boost::function<bool(item&, bool)> f);
void get_item(char const* pk, std::string const& salt, boost::function<bool(item&, bool)> f);
void get_item(sha1_hash const& target, boost::function<void(item&)> f);
void get_item(char const* pk, std::string const& salt, boost::function<void(item&, bool)> f);
void put_item(sha1_hash const& target, entry& data, boost::function<void(int)> f);
void put_item(char const* pk, std::string const& salt
, boost::function<void(item&, int)> f
, boost::function<void(item&)> data_cb);
bool verify_token(std::string const& token, char const* info_hash
, udp::endpoint const& addr) const;

View File

@ -0,0 +1,94 @@
/*
Copyright (c) 2006-2015, Arvid Norberg, Thomas Yuan
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_PUT_DATA_HPP
#define TORRENT_PUT_DATA_HPP
#include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/item.hpp>
#include "libtorrent/aux_/disable_warnings_push.hpp"
#include <boost/function/function1.hpp>
#include <boost/function/function2.hpp>
#include <vector>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
namespace libtorrent { namespace dht
{
struct msg;
class node;
struct put_data: traversal_algorithm
{
typedef boost::function<void(item&, int)> put_callback;
put_data(node& node, put_callback const& callback);
virtual char const* name() const;
void set_data(item const& data) { m_data = data; }
void set_targets(std::vector<std::pair<node_entry, std::string> > const& targets);
protected:
virtual void done();
virtual bool invoke(observer_ptr o);
put_callback m_put_callback;
item m_data;
bool m_done;
};
struct put_data_observer : traversal_observer
{
put_data_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, udp::endpoint const& ep, node_id const& id, std::string const& token)
: traversal_observer(algorithm, ep, id)
, m_token(token)
{
}
virtual void reply(msg const&) { done(); }
std::string m_token;
};
} } // namespace libtorrent::dht
#endif // TORRENT_PUT_DATA_HPP

View File

@ -7,6 +7,7 @@ KADEMLIA_SOURCES = \
kademlia/dht_storage.cpp \
kademlia/dht_tracker.cpp \
kademlia/find_data.cpp \
kademlia/put_data.cpp \
kademlia/node.cpp \
kademlia/node_entry.cpp \
kademlia/node_id.cpp \

View File

@ -1472,21 +1472,24 @@ namespace libtorrent {
return msg;
}
dht_put_alert::dht_put_alert(aux::stack_allocator&, sha1_hash const& t)
dht_put_alert::dht_put_alert(aux::stack_allocator&, sha1_hash const& t, int n)
: target(t)
, seq(0)
, num_success(n)
{}
dht_put_alert::dht_put_alert(aux::stack_allocator&
, boost::array<char, 32> key
, boost::array<char, 64> sig
, std::string s
, boost::uint64_t sequence_number)
, boost::uint64_t sequence_number
, int n)
: target(0)
, public_key(key)
, signature(sig)
, salt(s)
, seq(sequence_number)
, num_success(n)
{}
std::string dht_put_alert::message() const
@ -1494,7 +1497,8 @@ namespace libtorrent {
char msg[1050];
if (target.is_all_zeros())
{
snprintf(msg, sizeof(msg), "DHT put complete (key=%s sig=%s salt=%s seq=%" PRId64 ")"
snprintf(msg, sizeof(msg), "DHT put complete (success=%d key=%s sig=%s salt=%s seq=%" PRId64 ")"
, num_success
, to_hex(std::string(&public_key[0], 32)).c_str()
, to_hex(std::string(&signature[0], 64)).c_str()
, salt.c_str()
@ -1502,7 +1506,8 @@ namespace libtorrent {
return msg;
}
snprintf(msg, sizeof(msg), "DHT put complete (hash=%s)"
snprintf(msg, sizeof(msg), "DHT put commplete (success=%d hash=%s)"
, num_success
, to_hex(target.to_string()).c_str());
return msg;
}

View File

@ -239,53 +239,10 @@ namespace libtorrent { namespace dht
m_dht.announce(ih, listen_port, flags, f);
}
namespace {
// these functions provide a slightly higher level
// interface to the get/put functionality in the DHT
bool get_immutable_item_callback(item& it, boost::function<void(item const&)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(!it.is_mutable());
f(it);
return false;
}
bool get_mutable_item_callback(item& it, bool authoritative, boost::function<void(item const&, bool)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(it.is_mutable());
f(it, authoritative);
return false;
}
bool put_immutable_item_callback(item& it, boost::function<void()> f
, entry data)
{
TORRENT_ASSERT(!it.is_mutable());
it.assign(data);
// TODO: ideally this function would be called when the
// put completes
f();
return true;
}
bool put_mutable_item_callback(item& it, bool authoritative, boost::function<void(item&)> cb)
{
if (authoritative) {
cb(it);
}
return true;
}
} // anonymous namespace
void dht_tracker::get_item(sha1_hash const& target
, boost::function<void(item const&)> cb)
{
m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, cb));
m_dht.get_item(target, cb);
}
// key is a 32-byte binary string, the public key to look up.
@ -294,26 +251,25 @@ namespace libtorrent { namespace dht
, boost::function<void(item const&, bool)> cb
, std::string salt)
{
m_dht.get_item(key, salt, boost::bind(&get_mutable_item_callback, _1, _2, cb));
m_dht.get_item(key, salt, cb);
}
void dht_tracker::put_item(entry data
, boost::function<void()> cb)
, boost::function<void(int)> cb)
{
std::string flat_data;
bencode(std::back_inserter(flat_data), data);
sha1_hash target = item_target_id(
std::pair<char const*, int>(flat_data.c_str(), flat_data.size()));
m_dht.get_item(target, boost::bind(&put_immutable_item_callback
, _1, cb, data));
m_dht.put_item(target, data, cb);
}
void dht_tracker::put_item(char const* key
, boost::function<void(item&)> cb, std::string salt)
, boost::function<void(item&, int)> cb
, boost::function<void(item&)> data_cb, std::string salt)
{
m_dht.get_item(key, salt, boost::bind(&put_mutable_item_callback
, _1, _2, cb));
m_dht.put_item(key, salt, cb, data_cb);
}
void dht_tracker::direct_request(udp::endpoint ep, entry& e

View File

@ -50,78 +50,64 @@ void get_item::got_data(bdecode_node const& v,
char const* sig)
{
// we received data!
// if no data_callback, we needn't care about the data we get.
// only put_immutable_item no data_callback
if (!m_data_callback) return;
std::pair<char const*, int> salt(m_salt.c_str(), int(m_salt.size()));
sha1_hash incoming_target;
if (pk)
incoming_target = item_target_id(salt, pk);
else
incoming_target = item_target_id(v.data_section());
if (incoming_target != m_target) return;
if (pk && sig)
// for get_immutable_item
if (m_immutable)
{
// this is mutable data. If it passes the signature
// check, remember it. Just keep the version with
// the highest sequence number.
if (m_data.empty() || m_data.seq() < seq)
{
if (!m_data.assign(v, salt, seq, pk, sig))
return;
// If m_data isn't empty, we should have post alert.
if (!m_data.empty()) return;
// for get_item, we should call callback when we get data,
// even if the date is not authoritative, we can update later.
// so caller can get response ASAP without waitting transaction
// time-out (15 seconds).
// for put_item, the callback function will do nothing
// if the data is non-authoritative.
// we can just ignore the return value here since for mutable
// data, we always need the transaction done.
m_data_callback(m_data, false);
}
}
else if (m_data.empty())
{
// this is the first time we receive data,
// and it's immutable
sha1_hash incoming_target = item_target_id(v.data_section());
if (incoming_target != m_target) return;
m_data.assign(v);
bool put_requested = m_data_callback(m_data, true);
// if we intend to put, we need to keep going
// until we find the closest nodes, since those
// are the ones we're putting to
if (put_requested)
{
#if TORRENT_USE_ASSERTS
std::vector<char> buffer;
bencode(std::back_inserter(buffer), m_data.value());
TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final());
#endif
// There can only be one true immutable item with a given id
// Now that we've got it and the user doesn't want to do a put
// there's no point in continuing to query other nodes
m_data_callback(m_data, true);
done();
// this function is called when we're done, passing
// in all relevant nodes we received data from close
// to the target.
m_nodes_callback = boost::bind(&get_item::put, this, _1);
}
else
{
// There can only be one true immutable item with a given id
// Now that we've got it and the user doesn't want to do a put
// there's no point in continuing to query other nodes
done();
}
return;
}
// immutalbe data should has been handled before this line, only mutable
// data can reach here, which means pk and sig must be valid.
if (!pk || !sig) return;
std::pair<char const*, int> salt(m_salt.c_str(), int(m_salt.size()));
sha1_hash incoming_target = item_target_id(salt, pk);
if (incoming_target != m_target) return;
// this is mutable data. If it passes the signature
// check, remember it. Just keep the version with
// the highest sequence number.
if (m_data.empty() || m_data.seq() < seq)
{
if (!m_data.assign(v, salt, seq, pk, sig))
return;
// for get_item, we should call callback when we get data,
// even if the date is not authoritative, we can update later.
// so caller can get response ASAP without waitting transaction
// time-out (15 seconds).
// for put_item, the callback function will do nothing
// if the data is non-authoritative.
m_data_callback(m_data, false);
}
}
get_item::get_item(
node& dht_node
, node_id target
, data_callback const& dcallback)
: find_data(dht_node, target, nodes_callback())
, data_callback const& dcallback
, nodes_callback const& ncallback)
: find_data(dht_node, target, ncallback)
, m_data_callback(dcallback)
, m_immutable(true)
{
}
@ -129,12 +115,14 @@ get_item::get_item(
node& dht_node
, char const* pk
, std::string const& salt
, data_callback const& dcallback)
, data_callback const& dcallback
, nodes_callback const& ncallback)
: find_data(dht_node, item_target_id(
std::make_pair(salt.c_str(), int(salt.size())), pk)
, nodes_callback())
, ncallback)
, m_data_callback(dcallback)
, m_data(pk, salt)
, m_immutable(false)
{
}
@ -170,92 +158,28 @@ bool get_item::invoke(observer_ptr o)
void get_item::done()
{
// no data_callback for immutable item put
if (!m_data_callback) return find_data::done();
if (m_data.is_mutable() || m_data.empty())
{
// for mutable data, now we have authoritative data since
// we've heard from everyone, to be sure we got the
// latest version of the data (i.e. highest sequence number)
bool put_requested = m_data_callback(m_data, true);
if (put_requested)
{
m_data_callback(m_data, true);
#if TORRENT_USE_ASSERTS
if (m_data.is_mutable())
{
TORRENT_ASSERT(m_target
== item_target_id(std::pair<char const*, int>(m_data.salt().c_str()
, m_data.salt().size())
, m_data.pk().data()));
}
else
{
std::vector<char> buffer;
bencode(std::back_inserter(buffer), m_data.value());
TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final());
}
#endif
// this function is called when we're done, passing
// in all relevant nodes we received data from close
// to the target.
m_nodes_callback = boost::bind(&get_item::put, this, _1);
}
}
find_data::done();
}
// this function sends a put message to the nodes
// closest to the target. Those nodes are passed in
// as the v argument
void get_item::put(std::vector<std::pair<node_entry, std::string> > const& v)
{
#ifndef TORRENT_DISABLE_LOGGING
// TODO: 3 it would be nice to not have to spend so much time rendering
// the bencoded dict if logging is disabled
get_node().observer()->log(dht_logger::traversal, "[%p] sending put "
"[ seq: %" PRId64 " nodes: %d ]"
, static_cast<void*>(this), (m_data.is_mutable() ? m_data.seq() : -1)
, int(v.size()));
#endif
// create a dummy traversal_algorithm
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(m_node, (node_id::min)()));
// store on the first k nodes
for (std::vector<std::pair<node_entry, std::string> >::const_iterator i = v.begin()
, end(v.end()); i != end; ++i)
{
#ifndef TORRENT_DISABLE_LOGGING
get_node().observer()->log(dht_logger::traversal, "[%p] put-distance: %d"
, static_cast<void*>(this), 160 - distance_exp(m_target, i->first.id));
#endif
void* ptr = m_node.m_rpc.allocate_observer();
if (ptr == 0) return;
// TODO: 3 we don't support CAS errors here! we need a custom observer
observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id));
#if TORRENT_USE_ASSERTS
o->m_in_constructor = false;
#endif
entry e;
e["y"] = "q";
e["q"] = "put";
entry& a = e["a"];
a["v"] = m_data.value();
a["token"] = i->second;
if (m_data.is_mutable())
{
a["k"] = std::string(m_data.pk().data(), item_pk_len);
a["seq"] = m_data.seq();
a["sig"] = std::string(m_data.sig().data(), item_sig_len);
if (!m_data.salt().empty())
{
a["salt"] = m_data.salt();
}
TORRENT_ASSERT(m_target
== item_target_id(std::pair<char const*, int>(m_data.salt().c_str()
, m_data.salt().size())
, m_data.pk().data()));
}
m_node.m_rpc.invoke(e, i->first.ep(), o);
#endif
}
find_data::done();
}
void get_item_observer::reply(msg const& m)

View File

@ -315,7 +315,6 @@ namespace
// create a dummy traversal_algorithm
boost::intrusive_ptr<traversal_algorithm> algo(
new traversal_algorithm(node, (node_id::min)()));
// store on the first k nodes
for (std::vector<std::pair<node_entry, std::string> >::const_iterator i = v.begin()
, end(v.end()); i != end; ++i)
@ -424,7 +423,7 @@ void node::direct_request(udp::endpoint ep, entry& e
}
void node::get_item(sha1_hash const& target
, boost::function<bool(item&, bool)> f)
, boost::function<void(item&)> f)
{
#ifndef TORRENT_DISABLE_LOGGING
if (m_observer)
@ -437,12 +436,12 @@ void node::get_item(sha1_hash const& target
#endif
boost::intrusive_ptr<dht::get_item> ta;
ta.reset(new dht::get_item(*this, target, f));
ta.reset(new dht::get_item(*this, target, boost::bind(f, _1), find_data::nodes_callback()));
ta->start();
}
void node::get_item(char const* pk, std::string const& salt
, boost::function<bool(item&, bool)> f)
, boost::function<void(item&, bool)> f)
{
#ifndef TORRENT_DISABLE_LOGGING
if (m_observer)
@ -454,7 +453,77 @@ void node::get_item(char const* pk, std::string const& salt
#endif
boost::intrusive_ptr<dht::get_item> ta;
ta.reset(new dht::get_item(*this, pk, salt, f));
ta.reset(new dht::get_item(*this, pk, salt, f, find_data::nodes_callback()));
ta->start();
}
namespace {
void put(std::vector<std::pair<node_entry, std::string> > const& nodes
, boost::intrusive_ptr<dht::put_data> ta)
{
ta->set_targets(nodes);
ta->start();
}
void put_data_cb(item& i, bool auth
, boost::intrusive_ptr<put_data> ta
, boost::function<void(item&)> f)
{
// call data_callback only when we got authoritative data.
if (auth)
{
f(i);
ta->set_data(i);
}
}
} // namespace
void node::put_item(sha1_hash const& target, entry& data, boost::function<void(int)> f)
{
#ifndef TORRENT_DISABLE_LOGGING
if (m_observer)
{
char hex_target[41];
to_hex(target.data(), 20, hex_target);
m_observer->log(dht_logger::node, "starting get for [ hash: %s ]"
, hex_target);
}
#endif
item i;
i.assign(data);
boost::intrusive_ptr<dht::put_data> put_ta;
put_ta.reset(new dht::put_data(*this, boost::bind(f, _2)));
put_ta->set_data(i);
boost::intrusive_ptr<dht::get_item> ta;
ta.reset(new dht::get_item(*this, target, get_item::data_callback(),
boost::bind(&put, _1, put_ta)));
ta->start();
}
void node::put_item(char const* pk, std::string const& salt
, boost::function<void(item&, int)> f
, boost::function<void(item&)> data_cb)
{
#ifndef TORRENT_DISABLE_LOGGING
if (m_observer)
{
char hex_key[65];
to_hex(pk, 32, hex_key);
m_observer->log(dht_logger::node, "starting get for [ key: %s ]", hex_key);
}
#endif
boost::intrusive_ptr<dht::put_data> put_ta;
put_ta.reset(new dht::put_data(*this, f));
boost::intrusive_ptr<dht::get_item> ta;
ta.reset(new dht::get_item(*this, pk, salt
, boost::bind(&put_data_cb, _1, _2, put_ta, data_cb)
, boost::bind(&put, _1, put_ta)));
ta->start();
}

112
src/kademlia/put_data.cpp Normal file
View File

@ -0,0 +1,112 @@
/*
Copyright (c) 2006-2015, Arvid Norberg & Daniel Wallin
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <libtorrent/kademlia/put_data.hpp>
#include <libtorrent/kademlia/dht_observer.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/io.hpp>
namespace libtorrent { namespace dht
{
put_data::put_data(node& dht_node, put_callback const& callback)
: traversal_algorithm(dht_node, (node_id::min)())
, m_put_callback(callback)
, m_done(false)
{
}
char const* put_data::name() const { return "put_data"; }
void put_data::set_targets(std::vector<std::pair<node_entry, std::string> > const& targets)
{
for (std::vector<std::pair<node_entry, std::string> >::const_iterator i = targets.begin()
, end(targets.end()); i != end; ++i)
{
void* ptr = m_node.m_rpc.allocate_observer();
if (ptr == 0) return;
observer_ptr o(new (ptr) put_data_observer(this, i->first.ep()
, i->first.id, i->second));
#if defined TORRENT_DEBUG || defined TORRENT_RELEASE_ASSERTS
o->m_in_constructor = false;
#endif
m_results.push_back(o);
}
}
void put_data::done()
{
if (m_invoke_count != 0) return;
m_done = true;
#ifndef TORRENT_DISABLE_LOGGING
get_node().observer()->log(dht_logger::traversal, "[%p] %s DONE, response %d, timeout %d"
, static_cast<void*>(this), name(), m_responses, m_timeouts);
#endif
m_put_callback(m_data, m_responses);
traversal_algorithm::done();
}
bool put_data::invoke(observer_ptr o)
{
if (m_done)
{
m_invoke_count = -1;
return false;
}
put_data_observer* po = static_cast<put_data_observer*>(o.get());
entry e;
e["y"] = "q";
e["q"] = "put";
entry& a = e["a"];
a["v"] = m_data.value();
a["token"] = po->m_token;
if (m_data.is_mutable())
{
a["k"] = std::string(m_data.pk().data(), item_pk_len);
a["seq"] = m_data.seq();
a["sig"] = std::string(m_data.sig().data(), item_sig_len);
if (!m_data.salt().empty())
{
a["salt"] = m_data.salt();
}
}
return m_node.m_rpc.invoke(e, o->target_ep(), o);
}
} } // namespace libtorrent::dht

View File

@ -46,6 +46,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/put_data.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
@ -158,7 +159,7 @@ void observer::set_id(node_id const& id)
enum { observer_size = max3<
sizeof(find_data_observer)
, sizeof(announce_observer)
, sizeof(null_observer)
, sizeof(put_data_observer)
>::value
};

View File

@ -5648,13 +5648,24 @@ retry:
namespace {
void on_dht_put_immutable_item(alert_manager& alerts, sha1_hash target)
void on_dht_put_immutable_item(alert_manager& alerts, sha1_hash target, int num)
{
if (alerts.should_post<dht_put_alert>())
alerts.emplace_alert<dht_put_alert>(target);
alerts.emplace_alert<dht_put_alert>(target, num);
}
void put_mutable_callback(alert_manager& alerts, dht::item& i
void on_dht_put_mutable_item(alert_manager& alerts, dht::item& i, int num)
{
boost::array<char, 64> sig = i.sig();
boost::array<char, 32> pk = i.pk();
boost::uint64_t seq = i.seq();
std::string salt = i.salt();
if (alerts.should_post<dht_put_alert>())
alerts.emplace_alert<dht_put_alert>(pk, sig, salt, seq, num);
}
void put_mutable_callback(dht::item& i
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb)
{
@ -5665,9 +5676,6 @@ retry:
std::string salt = i.salt();
cb(value, sig, seq, salt);
i.assign(value, salt, seq, pk.data(), sig.data());
if (alerts.should_post<dht_put_alert>())
alerts.emplace_alert<dht_put_alert>(pk, sig, salt, seq);
}
void on_dht_get_peers(alert_manager& alerts, sha1_hash info_hash, std::vector<tcp::endpoint> const& peers)
@ -5690,7 +5698,7 @@ retry:
{
if (!m_dht) return;
m_dht->put_item(data, boost::bind(&on_dht_put_immutable_item, boost::ref(m_alerts)
, target));
, target, _1));
}
void session_impl::dht_put_mutable_item(boost::array<char, 32> key
@ -5699,8 +5707,9 @@ retry:
, std::string salt)
{
if (!m_dht) return;
m_dht->put_item(key.data(), boost::bind(&put_mutable_callback
, boost::ref(m_alerts), _1, cb), salt);
m_dht->put_item(key.data(),
boost::bind(&on_dht_put_mutable_item, boost::ref(m_alerts), _1, _2),
boost::bind(&put_mutable_callback, _1, cb), salt);
}
void session_impl::dht_get_peers(sha1_hash const& info_hash)

View File

@ -481,19 +481,37 @@ std::vector<dht::item> g_got_items;
dht::item g_put_item;
int g_put_count;
bool get_item_cb(dht::item& i, bool a)
void get_mutable_item_cb(dht::item& i, bool a)
{
// only count authoritative data
if (!a) return false;
if (!a) return;
if (!i.empty())
g_got_items.push_back(i);
if (!g_put_item.empty())
{
i = g_put_item;
g_put_count++;
return true;
}
return false;
}
void put_mutable_item_data_cb(dht::item& i)
{
if (!i.empty())
g_got_items.push_back(i);
TEST_CHECK(!g_put_item.empty());
i = g_put_item;
g_put_count++;
}
void put_mutable_item_cb(dht::item&, int num, int expect)
{
TEST_EQUAL(num, expect);
}
void get_immutable_item_cb(dht::item& i)
{
if (!i.empty())
g_got_items.push_back(i);
}
void put_immutable_item_cb(int num, int expect)
{
TEST_EQUAL(num, expect);
}
struct obs : dht::dht_observer
@ -1633,7 +1651,7 @@ TORRENT_TEST(dht)
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
node.get_item(items[0].target, get_item_cb);
node.get_item(items[0].target, get_immutable_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
@ -1679,8 +1697,7 @@ TORRENT_TEST(dht)
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
sha1_hash target = hasher(public_key, item_pk_len).final();
node.get_item(target, get_item_cb);
node.get_item(public_key, std::string(), get_mutable_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
@ -1693,7 +1710,6 @@ TORRENT_TEST(dht)
{
TEST_EQUAL(parsed[0].string_value(), "q");
TEST_EQUAL(parsed[2].string_value(), "get");
TEST_EQUAL(parsed[5].string_value(), target.to_string());
if (parsed[0].string_value() != "q" || parsed[2].string_value() != "get") break;
}
else
@ -1754,24 +1770,39 @@ TORRENT_TEST(dht)
// immutable put
g_sent_packets.clear();
do
for (int loop = 0; loop < 9; loop++)
{
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
enum { num_test_nodes = 2 };
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) };
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
, node_entry(items[1].target, udp::endpoint(address_v4::from_string("2.2.2.2"), 1232))
, node_entry(items[2].target, udp::endpoint(address_v4::from_string("3.3.3.3"), 1233))
, node_entry(items[3].target, udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(items[4].target, udp::endpoint(address_v4::from_string("5.5.5.5"), 1235))
, node_entry(items[5].target, udp::endpoint(address_v4::from_string("6.6.6.6"), 1236))
, node_entry(items[6].target, udp::endpoint(address_v4::from_string("7.7.7.7"), 1237))
, node_entry(items[7].target, udp::endpoint(address_v4::from_string("8.8.8.8"), 1238)) };
for (int i = 0; i < num_test_nodes; ++i)
node.m_table.add_node(nodes[i]);
g_put_item.assign(items[0].ent);
node.get_item(items[0].target, get_item_cb);
entry put_data;
put_data = "Hello world";
std::string flat_data;
bencode(std::back_inserter(flat_data), put_data);
sha1_hash target = item_target_id(
std::pair<char const*, int>(flat_data.c_str(), flat_data.size()));
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
node.put_item(target, put_data, boost::bind(&put_immutable_item_cb, _1, loop));
for (int i = 0; i < num_test_nodes; ++i)
TEST_EQUAL(g_sent_packets.size(), 8);
if (g_sent_packets.size() != 8) break;
for (int i = 0; i < 8; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
@ -1788,18 +1819,19 @@ TORRENT_TEST(dht)
}
char t[10];
snprintf(t, sizeof(t), "%02d", i);
send_dht_response(node, response, nodes[i].ep()
, msg_args().token(t).port(1234).nid(nodes[i].id));
msg_args args;
args.token(t).port(1234).nid(nodes[i].id).nodes(nodes_t(1, nodes[i]));
send_dht_response(node, response, nodes[i].ep(), args);
g_sent_packets.erase(packet);
}
TEST_EQUAL(g_put_count, 1);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
TEST_EQUAL(g_sent_packets.size(), 8);
if (g_sent_packets.size() != 8) break;
itemv.second = bencode(buffer, items[0].ent);
itemv.second = bencode(buffer, put_data);
for (int i = 0; i < num_test_nodes; ++i)
for (int i = 0; i < 8; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
@ -1812,13 +1844,14 @@ TORRENT_TEST(dht)
{
TEST_EQUAL(parsed[0].string_value(), "q");
TEST_EQUAL(parsed[2].string_value(), "put");
std::pair<const char*, int> v = parsed[6].data_section();
TEST_EQUAL(v.second, itemv.second);
TEST_CHECK(memcmp(v.first, itemv.first, itemv.second) == 0);
std::pair<const char*, int>v = parsed[6].data_section();
TEST_EQUAL(std::string(v.first, v.second), flat_data);
char t[10];
snprintf(t, sizeof(t), "%02d", i);
TEST_EQUAL(parsed[5].string_value(), t);
if (parsed[0].string_value() != "q" || parsed[2].string_value() != "put") continue;
if (i < loop) send_dht_response(node, response, nodes[i].ep());
}
else
{
@ -1827,63 +1860,75 @@ TORRENT_TEST(dht)
continue;
}
}
sett.search_branching = old_branching;
g_sent_packets.clear();
g_put_item.clear();
g_put_count = 0;
} while (false);
};
// mutable put
g_sent_packets.clear();
do
for (int loop = 0; loop < 9; loop++)
{
// set the branching factor to k to make this a little easier
int old_branching = sett.search_branching;
sett.search_branching = 8;
dht::node node(&s, sett, (node_id::min)(), &observer, cnt);
enum { num_test_nodes = 2 };
enum { num_test_nodes = 8 };
node_entry nodes[num_test_nodes] =
{ node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) };
{ node_entry(items[0].target, udp::endpoint(address_v4::from_string("1.1.1.1"), 1231))
, node_entry(items[1].target, udp::endpoint(address_v4::from_string("2.2.2.2"), 1232))
, node_entry(items[2].target, udp::endpoint(address_v4::from_string("3.3.3.3"), 1233))
, node_entry(items[3].target, udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(items[4].target, udp::endpoint(address_v4::from_string("5.5.5.5"), 1235))
, node_entry(items[5].target, udp::endpoint(address_v4::from_string("6.6.6.6"), 1236))
, node_entry(items[6].target, udp::endpoint(address_v4::from_string("7.7.7.7"), 1237))
, node_entry(items[7].target, udp::endpoint(address_v4::from_string("8.8.8.8"), 1238)) };
for (int i = 0; i < num_test_nodes; ++i)
node.m_table.add_node(nodes[i]);
sha1_hash target = hasher(public_key, item_pk_len).final();
g_put_item.assign(items[0].ent, empty_salt, seq, public_key, private_key);
std::string sig(g_put_item.sig().data(), item_sig_len);
node.get_item(target, get_item_cb);
node.put_item(public_key, std::string()
, boost::bind(&put_mutable_item_cb, _1, _2, loop)
, put_mutable_item_data_cb);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
TEST_EQUAL(g_sent_packets.size(), 8);
if (g_sent_packets.size() != 8) break;
for (int i = 0; i < num_test_nodes; ++i)
for (int i = 0; i < 8; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
if (packet == g_sent_packets.end()) continue;
lazy_from_entry(packet->second, response);
ret = verify_message(response, get_item_desc, parsed, 6
, error_string, sizeof(error_string));
ret = verify_message(response, get_item_desc, parsed, 6, error_string
, sizeof(error_string));
if (!ret)
{
fprintf(stderr, " invalid mutable put request: %s\n", print_entry(response).c_str());
fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
continue;
}
char t[10];
snprintf(t, sizeof(t), "%02d", i);
send_dht_response(node, response, nodes[i].ep()
, msg_args().token(t).port(1234).nid(nodes[i].id));
msg_args args;
args.token(t).port(1234).nid(nodes[i].id).nodes(nodes_t(1, nodes[i]));
send_dht_response(node, response, nodes[i].ep(), args);
g_sent_packets.erase(packet);
}
TEST_EQUAL(g_put_count, 1);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
TEST_EQUAL(g_sent_packets.size(), 8);
if (g_sent_packets.size() != 8) break;
itemv.second = bencode(buffer, items[0].ent);
for (int i = 0; i < num_test_nodes; ++i)
for (int i = 0; i < 8; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
@ -1906,6 +1951,8 @@ TORRENT_TEST(dht)
snprintf(t, sizeof(t), "%02d", i);
TEST_EQUAL(parsed[9].string_value(), t);
if (parsed[0].string_value() != "q" || parsed[2].string_value() != "put") continue;
if (i < loop) send_dht_response(node, response, nodes[i].ep());
}
else
{
@ -1914,12 +1961,11 @@ TORRENT_TEST(dht)
continue;
}
}
sett.search_branching = old_branching;
g_sent_packets.clear();
g_put_item.clear();
g_put_count = 0;
} while (false);
}
// verify that done() is only invoked once
// See PR 252
@ -1952,10 +1998,11 @@ TORRENT_TEST(dht)
for (int i = 0; i < 8; ++i)
node.m_table.add_node(nodes[i]);
// kick off a mutable get request
// kick off a mutable put request
g_put_item.assign(items[0].ent, empty_salt, seq, public_key, private_key);
node.get_item(target, get_item_cb);
node.put_item(public_key, std::string()
, boost::bind(&put_mutable_item_cb, _1, _2, 0)
, put_mutable_item_data_cb);
TEST_EQUAL(g_sent_packets.size(), 8);
if (g_sent_packets.size() != 8) break;
@ -2308,7 +2355,7 @@ TORRENT_TEST(read_only_node)
bdecode_node request;
sha1_hash target = generate_next();
node.get_item(target, get_item_cb);
node.get_item(target, get_immutable_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
TEST_EQUAL(g_sent_packets.front().first, initial_node);
@ -2342,7 +2389,7 @@ TORRENT_TEST(read_only_node)
g_sent_packets.clear();
target = generate_next();
node.get_item(target, get_item_cb);
node.get_item(target, get_immutable_item_cb);
// since we have 2 nodes, we should have two packets.
TEST_EQUAL(g_sent_packets.size(), 2);

View File

@ -320,7 +320,9 @@ int main(int argc, char* argv[])
printf("PUT %s\n", to_hex(target.to_string()).c_str());
wait_for_alert(s, dht_put_alert::alert_type);
alert* a = wait_for_alert(s, dht_put_alert::alert_type);
dht_put_alert* pa = alert_cast<dht_put_alert>(a);
printf("%s\n", pa->message().c_str());
}
else if (strcmp(argv[0], "mput") == 0)
{
@ -353,10 +355,12 @@ int main(int argc, char* argv[])
s.dht_put_item(public_key, boost::bind(&put_string, _1, _2, _3, _4
, public_key.data(), private_key.data(), argv[0]));
printf("public key: %s\n", to_hex(std::string(public_key.data()
printf("MPUT publick key: %s\n", to_hex(std::string(public_key.data()
, public_key.size())).c_str());
wait_for_alert(s, dht_put_alert::alert_type);
alert* a = wait_for_alert(s, dht_put_alert::alert_type);
dht_put_alert* pa = alert_cast<dht_put_alert>(a);
printf("%s\n", pa->message().c_str());
}
else if (strcmp(argv[0], "mget") == 0)
{
@ -380,6 +384,7 @@ int main(int argc, char* argv[])
bootstrap(s);
s.dht_get_item(public_key);
printf("MGET %s\n", argv[0]);
bool authoritative = false;