expose new DHT put/get functionality in the public session API

This commit is contained in:
Arvid Norberg 2014-02-24 00:31:13 +00:00
parent bfdb445890
commit 1188ec2dcd
12 changed files with 446 additions and 22 deletions

View File

@ -1943,7 +1943,66 @@ namespace libtorrent
op_t operation;
};
// this alert is posted as a response to a call to session::get_item(),
// specifically the overload for looking up immutable items in the DHT.
struct TORRENT_EXPORT dht_immutable_item_alert: alert
{
dht_immutable_item_alert(sha1_hash const& t, entry const& i)
: target(t), item(i) {}
TORRENT_DEFINE_ALERT(dht_immutable_item_alert);
const static int static_category = alert::error_notification
| alert::dht_notification;
virtual std::string message() const;
virtual bool discardable() const { return false; }
// the target hash of the immutable item. This must
// match the sha-1 hash of the bencoded form of ``item``.
sha1_hash target;
// the data for this item
entry item;
};
// this alert is posted as a response to a call to session::get_item(),
// specifically the overload for looking up mutable items in the DHT.
struct TORRENT_EXPORT dht_mutable_item_alert: alert
{
dht_mutable_item_alert(boost::array<char, 32> k
, boost::array<char, 64> sig
, boost::uint64_t sequence
, std::string const& s
, entry const& i)
: key(k), signature(sig), seq(sequence), salt(s), item(i) {}
TORRENT_DEFINE_ALERT(dht_mutable_item_alert);
const static int static_category = alert::error_notification
| alert::dht_notification;
virtual std::string message() const;
virtual bool discardable() const { return false; }
// the public key that was looked up
boost::array<char, 32> key;
// the signature of the data. This is not the signature of the
// plain encoded form of the item, but it includes the sequence number
// and possibly the hash as well. See the dht_store document for more
// information. This is primarily useful for echoing back in a store
// request.
boost::array<char, 64> signature;
// the sequence number of this item
boost::uint64_t seq;
// the salf, if any, used to lookup and store this item. If no
// salt was used, this is an empty string
std::string salt;
// the data for this item
entry item;
};
#undef TORRENT_DEFINE_ALERT
}

View File

@ -119,6 +119,7 @@ namespace libtorrent
namespace dht
{
struct dht_tracker;
class item;
}
struct bencode_map_entry;
@ -313,6 +314,22 @@ namespace libtorrent
// the DHT, to get the initial peers quickly
void prioritize_dht(boost::weak_ptr<torrent> t);
void get_immutable_callback(sha1_hash target
, dht::item const& i);
void get_mutable_callback(dht::item const& i);
void dht_get_immutable_item(sha1_hash const& target);
void dht_get_mutable_item(boost::array<char, 32> key
, std::string salt = std::string());
void dht_put_item(entry data);
void dht_put_mutable_item(boost::array<char, 32> key
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb
, std::string salt = std::string());
#ifndef TORRENT_NO_DEPRECATE
entry dht_state() const;
#endif

View File

@ -93,6 +93,21 @@ namespace libtorrent { namespace dht
void announce(sha1_hash const& ih, int listen_port, int flags
, boost::function<void(std::vector<tcp::endpoint> const&)> f);
void get_item(sha1_hash const& target
, boost::function<void(item const&)> cb);
// key is a 32-byte binary string, the public key to look up.
// the salt is optional
void get_item(char const* key
, boost::function<void(item const&)> cb
, std::string salt = std::string());
void put_item(entry data
, boost::function<void()> cb);
void put_item(char const* key
, boost::function<void(item&)> cb, std::string salt = std::string());
void dht_status(session_status& s);
void network_stats(int& sent, int& received);

View File

@ -38,10 +38,14 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/entry.hpp>
#include <vector>
#include <exception>
#include <boost/array.hpp>
namespace libtorrent { namespace dht
{
// calculate the target hash for an item. Either v must be specified,
// which is the content. That's for immutable items. For mutable items,
// instead specify the salt and the public key (pk).
sha1_hash TORRENT_EXTRA_EXPORT item_target_id(
std::pair<char const*, int> v
, std::pair<char const*, int> salt
@ -54,7 +58,16 @@ bool TORRENT_EXTRA_EXPORT verify_mutable_item(
char const* pk,
char const* sig);
void TORRENT_EXTRA_EXPORT sign_mutable_item(
// TODO: since this is a public function, it should probably be moved
// out of this header and into one with other public functions.
// given a byte range ``v`` and an optional byte range ``salt``, a
// sequence number, public key ``pk`` (must be 32 bytes) and a secret key
// ``sk`` (must be 64 bytes), this function produces a signature which
// is written into a 64 byte buffer pointed to by ``sig``. The caller
// is responsible for allocating the destination buffer that's passed in
// as the ``sig`` argument. Typically it would be allocated on the stack.
void TORRENT_EXPORT sign_mutable_item(
std::pair<char const*, int> v,
std::pair<char const*, int> salt,
boost::uint64_t seq,
@ -93,18 +106,20 @@ public:
void assign(entry const& v)
{
assign(v, std::pair<char const*, int>(static_cast<char const*>(NULL), 0), 0, NULL, NULL);
assign(v, std::pair<char const*, int>(static_cast<char const*>(NULL)
, 0), 0, NULL, NULL);
}
void assign(entry const& v
, std::pair<char const*, int> salt
void assign(entry const& v, std::pair<char const*, int> salt
, boost::uint64_t seq, char const* pk, char const* sk);
void assign(lazy_entry const* v)
{
assign(v, std::pair<char const*, int>(static_cast<char const*>(NULL), 0), 0, NULL, NULL);
assign(v, std::pair<char const*, int>(static_cast<char const*>(NULL)
, 0), 0, NULL, NULL);
}
bool assign(lazy_entry const* v
, std::pair<char const*, int> salt
bool assign(lazy_entry const* v, std::pair<char const*, int> salt
, boost::uint64_t seq, char const* pk, char const* sig);
void assign(entry const& v, std::string salt, boost::uint64_t seq
, char const* pk, char const* sig);
void clear() { m_value = entry(); }
bool empty() const { return m_value.type() == entry::undefined_t; }
@ -114,15 +129,18 @@ public:
sha1_hash cas();
entry const& value() const { return m_value; }
char const* pk() { TORRENT_ASSERT(m_mutable); return m_pk; }
char const* sig() { TORRENT_ASSERT(m_mutable); return m_sig; }
boost::uint64_t seq() { TORRENT_ASSERT(m_mutable); return m_seq; }
boost::array<char, item_pk_len> const& pk() const
{ TORRENT_ASSERT(m_mutable); return m_pk; }
boost::array<char, item_sig_len> const& sig() const
{ TORRENT_ASSERT(m_mutable); return m_sig; }
boost::uint64_t seq() const { TORRENT_ASSERT(m_mutable); return m_seq; }
std::string const& salt() const { return m_salt; }
private:
entry m_value;
std::string m_salt;
char m_pk[item_pk_len];
char m_sig[item_sig_len];
boost::array<char, item_pk_len> m_pk;
boost::array<char, item_sig_len> m_sig;
boost::uint64_t m_seq;
bool m_mutable;
};

View File

@ -502,6 +502,68 @@ namespace libtorrent
void add_dht_node(std::pair<std::string, int> const& node);
void add_dht_router(std::pair<std::string, int> const& node);
// query the DHT for an immutable item at the ``target`` hash.
// the result is posted as a dht_immutable_item_alert.
void dht_get_item(sha1_hash const& target);
// query the DHT for a mutable item under the public key ``key``.
// this is an ed25519 key. ``salt`` is optional and may be left
// as an empty string if no salt is to be used.
// if the item is found in the DHT, a dht_mutable_item_alert is
// posted.
void dht_get_item(boost::array<char, 32> key
, std::string salt = std::string());
// store the given bencoded data as an immutable item in the DHT.
// the returned hash is the key that is to be used to look the item
// up agan. It's just the sha-1 hash of the bencoded form of the
// structure.
sha1_hash dht_put_item(entry data);
// store an immutable item. The ``key`` is the public key the blob is
// to be stored under. The optional ``salt`` argument is a string that
// is to be mixed in with the key when determining where in the DHT
// the value is to be stored. The callback function is called from within
// the libtorrent network thread once we've found where to store the blob,
// possibly with the current value stored under the key.
// The values passed to the callback functions are:
//
// entry& value
// the current value stored under the key (may be empty). Also expected
// to be set to the value to be stored by the function.
//
// boost::array<char,64>& signature
// the signature authenticating the current value. This may be zeroes
// if there is currently no value stored. The functon is expected to
// fill in this buffer with the signature of the new value to store.
// To generate the signature, you may want to use the
// ``sign_mutable_item`` function.
//
// boost::uint64_t& seq
// current sequence number. May be zero if there is no current value.
// The function is expected to set this to the new sequence number of
// the value that is to be stored. Sequence numbers must be monotonically
// increasing. Attempting to overwrite a value with a lower or equal
// sequence number will fail, even if the signature is correct.
//
// std::string const& salt
// this is the salt that was used for this put call.
//
// Since the callback function ``cb`` is called from within libtorrent,
// it is critical to not perform any blocking operations. Ideally not
// even locking a mutex. Pass any data required for this function along
// with the function object's context and make the function entirely
// self-contained. The only reason data blobs' values are computed
// via a function instead of just passing in the new value is to avoid
// race conditions. If you want to *update* the value in the DHT, you
// must first retrieve it, then modify it, then write it back. The way
// the DHT works, it is natural to always do a lookup before storing and
// calling the callback in between is convenient.
void dht_put_item(boost::array<char, 32> key
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb
, std::string salt = std::string());
#ifndef TORRENT_NO_DEPRECATE
// deprecated in 0.15
// use save_state and load_state instead

View File

@ -579,5 +579,26 @@ namespace libtorrent {
return msg;
}
std::string dht_immutable_item_alert::message() const
{
char msg[1050];
snprintf(msg, sizeof(msg), "DHT immutable item %s [ %s ]"
, to_hex(target.to_string()).c_str()
, item.to_string().c_str());
return msg;
}
std::string dht_mutable_item_alert::message() const
{
char msg[1050];
snprintf(msg, sizeof(msg), "DHT mutable item (key=%s salt=%s seq=%" PRId64 ") [ %s ]"
, to_hex(std::string(&key[0], 32)).c_str()
, salt.c_str()
, seq
, item.to_string().c_str());
return msg;
}
} // namespace libtorrent

View File

@ -422,6 +422,87 @@ namespace libtorrent { namespace dht
m_dht.announce(ih, listen_port, flags, f);
}
// these functions provide a slightly higher level
// interface to the get/put functionality in the DHT
bool get_immutable_item_callback(item& it, boost::function<void(item const&)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(!it.is_mutable());
f(it);
return false;
}
bool get_mutable_item_callback(item& it, boost::function<void(item const&)> f)
{
// the reason to wrap here is to control the return value
// since it controls whether we re-put the content
TORRENT_ASSERT(it.is_mutable());
f(it);
return false;
}
bool put_immutable_item_callback(item& it, boost::function<void()> f
, entry data)
{
TORRENT_ASSERT(!it.is_mutable());
it.assign(data);
// TODO: ideally this function would be called when the
// put completes
f();
return true;
}
bool put_mutable_item_callback(item& it, boost::function<void(item&)> cb)
{
cb(it);
return true;
}
void dht_tracker::get_item(sha1_hash const& target
, boost::function<void(item const&)> cb)
{
m_dht.get_item(target, boost::bind(&get_immutable_item_callback, _1, cb));
}
// key is a 32-byte binary string, the public key to look up.
// the salt is optional
void dht_tracker::get_item(char const* key
, boost::function<void(item const&)> cb
, std::string salt)
{
sha1_hash target = item_target_id(
std::pair<char const*, int>(NULL, 0)
, std::pair<char const*, int>(salt.c_str(), salt.size())
, key);
m_dht.get_item(target, boost::bind(&get_mutable_item_callback, _1, cb));
}
void dht_tracker::put_item(entry data
, boost::function<void()> cb)
{
std::string flat_data;
bencode(std::back_inserter(flat_data), data);
sha1_hash target = item_target_id(
std::pair<char const*, int>(flat_data.c_str(), flat_data.size())
, std::pair<char const*, int>(NULL, 0), NULL);
m_dht.get_item(target, boost::bind(&put_immutable_item_callback
, _1, cb, data));
}
void dht_tracker::put_item(char const* key
, boost::function<void(item&)> cb, std::string salt)
{
sha1_hash target = item_target_id(
std::pair<char const*, int>(NULL, 0)
, std::pair<char const*, int>(salt.c_str(), salt.size())
, key);
m_dht.get_item(target, boost::bind(&put_mutable_item_callback
, _1, cb));
}
// translate bittorrent kademlia message into the generice kademlia message
// used by the library

View File

@ -47,6 +47,8 @@ void get_item::got_data(lazy_entry const* v,
boost::uint64_t seq,
char const* sig)
{
// we received data!
std::pair<char const*, int> salt(m_salt.c_str(), m_salt.size());
sha1_hash incoming_target = item_target_id(v->data_section(), salt, pk);
@ -54,6 +56,9 @@ void get_item::got_data(lazy_entry const* v,
if (pk && sig)
{
// this is mutable data. If it passes the signature
// check, remember it. Just keep the version with
// the highest sequence number.
if (m_data.empty() || m_data.seq() < seq)
{
if (!m_data.assign(v, salt, seq, pk, sig))
@ -62,8 +67,15 @@ void get_item::got_data(lazy_entry const* v,
}
else if (m_data.empty())
{
// this is the first time we receive data,
// and it's immutable
m_data.assign(v);
bool put_requested = m_data_callback(m_data);
// if we intend to put, we need to keep going
// until we find the closest nodes, since those
// are the ones we're putting to
if (put_requested)
{
#if TORRENT_USE_ASSERTS
@ -71,6 +83,10 @@ void get_item::got_data(lazy_entry const* v,
bencode(std::back_inserter(buffer), m_data.value());
TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final());
#endif
// this function is called when we're done, passing
// in all relevant nodes we received data from close
// to the target.
m_nodes_callback = boost::bind(&get_item::put, this, _1);
}
else
@ -126,13 +142,17 @@ void get_item::done()
{
if (m_data.is_mutable() || m_data.empty())
{
// for mutable data, we only call the callback at the end,
// when we've heard from everyone, to be sure we got the
// latest version of the data (i.e. highest sequence number)
bool put_requested = m_data_callback(m_data);
if (put_requested)
{
#if TORRENT_USE_ASSERTS
if (m_data.is_mutable())
{
TORRENT_ASSERT(m_target == hasher(m_data.pk(), item_pk_len).final());
TORRENT_ASSERT(m_target == hasher(m_data.pk().data(),
item_pk_len).final());
}
else
{
@ -141,12 +161,19 @@ void get_item::done()
TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final());
}
#endif
// this function is called when we're done, passing
// in all relevant nodes we received data from close
// to the target.
m_nodes_callback = boost::bind(&get_item::put, this, _1);
}
}
find_data::done();
}
// this function sends a put message to the nodes
// closest to the target. Those nodes are passed in
// as the v argument
void get_item::put(std::vector<std::pair<node_entry, std::string> > const& v)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -181,9 +208,9 @@ void get_item::put(std::vector<std::pair<node_entry, std::string> > const& v)
a["token"] = i->second;
if (m_data.is_mutable())
{
a["k"] = std::string(m_data.pk(), item_pk_len);
a["k"] = std::string(m_data.pk().data(), item_pk_len);
a["seq"] = m_data.seq();
a["sig"] = std::string(m_data.sig(), item_sig_len);
a["sig"] = std::string(m_data.sig().data(), item_sig_len);
}
m_node.m_rpc.invoke(e, i->first.ep(), o);
}

View File

@ -120,6 +120,12 @@ bool verify_mutable_item(
(unsigned char const*)pk) == 1;
}
// given the bencoded buffer ``v``, the salt (which is optional and may have
// a length of zero to be omitted), sequence number ``seq``, public key (32
// bytes ed25519 key) ``pk`` and a secret/private key ``sk`` (64 bytes ed25519
// key) a signature ``sig`` is produced. The ``sig`` pointer must point to
// at least 64 bytes of available space. This space is where the signature is
// written.
void sign_mutable_item(
std::pair<char const*, int> v,
std::pair<char const*, int> salt,
@ -178,8 +184,8 @@ void item::assign(entry const& v, std::pair<char const*, int> salt
int bsize = bencode(buffer, v);
TORRENT_ASSERT(bsize <= 1000);
sign_mutable_item(std::make_pair(buffer, bsize)
, salt, seq, pk, sk, m_sig);
memcpy(m_pk, pk, item_pk_len);
, salt, seq, pk, sk, m_sig.c_array());
memcpy(m_pk.c_array(), pk, item_pk_len);
m_seq = seq;
m_mutable = true;
}
@ -196,10 +202,12 @@ bool item::assign(lazy_entry const* v
{
if (!verify_mutable_item(v->data_section(), salt, seq, pk, sig))
return false;
memcpy(m_pk, pk, item_pk_len);
memcpy(m_sig, sig, item_sig_len);
memcpy(m_pk.c_array(), pk, item_pk_len);
memcpy(m_sig.c_array(), sig, item_sig_len);
if (salt.second > 0)
m_salt.assign(salt.first, salt.second);
else
m_salt.clear();
m_seq = seq;
m_mutable = true;
}
@ -210,6 +218,17 @@ bool item::assign(lazy_entry const* v
return true;
}
void item::assign(entry const& v, std::string salt, boost::uint64_t seq
, char const* pk, char const* sig)
{
memcpy(m_pk.c_array(), pk, item_pk_len);
memcpy(m_sig.c_array(), sig, item_sig_len);
m_salt = salt;
m_seq = seq;
m_mutable = true;
m_value = v;
}
sha1_hash item::cas()
{
TORRENT_ASSERT(m_mutable);

View File

@ -332,6 +332,9 @@ namespace libtorrent
#define TORRENT_ASYNC_CALL2(x, a1, a2) \
m_impl->m_io_service.dispatch(boost::bind(&session_impl:: x, m_impl.get(), a1, a2))
#define TORRENT_ASYNC_CALL3(x, a1, a2, a3) \
m_impl->m_io_service.dispatch(boost::bind(&session_impl:: x, m_impl.get(), a1, a2, a3))
#define TORRENT_WAIT \
mutex::scoped_lock l(m_impl->mut); \
while (!done) { m_impl->cond.wait(l); };
@ -877,6 +880,43 @@ namespace libtorrent
#endif
}
void session::dht_get_item(sha1_hash const& target)
{
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL1(dht_get_immutable_item, target);
#endif
}
void session::dht_get_item(boost::array<char, 32> key
, std::string salt)
{
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL2(dht_get_mutable_item, key, salt);
#endif
}
sha1_hash session::dht_put_item(entry data)
{
std::vector<char> buf;
bencode(std::back_inserter(buf), data);
sha1_hash ret = hasher(&buf[0], buf.size()).final();
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL1(dht_put_item, data);
#endif
return ret;
}
void session::dht_put_item(boost::array<char, 32> key
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb
, std::string salt)
{
#ifndef TORRENT_DISABLE_DHT
TORRENT_ASYNC_CALL3(dht_put_mutable_item, key, cb, salt);
#endif
}
void session::set_pe_settings(pe_settings const& settings)
{
#ifndef TORRENT_DISABLE_ENCRYPTION

View File

@ -106,6 +106,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/item.hpp>
#endif // TORRENT_DISABLE_DHT
#include "libtorrent/http_tracker_connection.hpp"
@ -5771,6 +5772,70 @@ retry:
++host;
}
}
// callback for dht_immutable_get
void session_impl::get_immutable_callback(sha1_hash target
, dht::item const& i)
{
TORRENT_ASSERT(!i.is_mutable());
m_alerts.post_alert(dht_immutable_item_alert(target, i.value()));
}
void session_impl::dht_get_immutable_item(sha1_hash const& target)
{
if (!m_dht) return;
m_dht->get_item(target, boost::bind(&session_impl::get_immutable_callback
, this, target, _1));
}
// callback for dht_mutable_get
void session_impl::get_mutable_callback(dht::item const& i)
{
TORRENT_ASSERT(i.is_mutable());
m_alerts.post_alert(dht_mutable_item_alert(i.pk(), i.sig(), i.seq()
, i.salt(), i.value()));
}
// key is a 32-byte binary string, the public key to look up.
// the salt is optional
void session_impl::dht_get_mutable_item(boost::array<char, 32> key
, std::string salt)
{
if (!m_dht) return;
m_dht->get_item(key.data(), boost::bind(&session_impl::get_mutable_callback
, this, _1), salt);
}
void nop() {}
void session_impl::dht_put_item(entry data)
{
if (!m_dht) return;
m_dht->put_item(data, boost::bind(&nop));
}
void put_mutable_callback(dht::item& i
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb)
{
entry value = i.value();
boost::array<char, 64> sig = i.sig();
boost::array<char, 32> pk = i.pk();
boost::uint64_t seq = i.seq();
std::string salt = i.salt();
cb(value, sig, seq, salt);
i.assign(value, salt, seq, pk.data(), sig.data());
}
void session_impl::dht_put_mutable_item(boost::array<char, 32> key
, boost::function<void(entry&, boost::array<char,64>&
, boost::uint64_t&, std::string const&)> cb
, std::string salt)
{
if (!m_dht) return;
m_dht->put_item(key.data(), boost::bind(&put_mutable_callback, _1, cb), salt);
}
#endif
void session_impl::maybe_update_udp_mapping(int nat, int local_port, int external_port)

View File

@ -1635,8 +1635,8 @@ int test_main()
if (g_got_items.empty()) break;
TEST_EQUAL(g_got_items.front().value(), items[0].ent);
TEST_CHECK(memcmp(g_got_items.front().pk(), public_key, item_pk_len) == 0);
TEST_CHECK(memcmp(g_got_items.front().sig(), signature, item_sig_len) == 0);
TEST_CHECK(memcmp(g_got_items.front().pk().data(), public_key, item_pk_len) == 0);
TEST_CHECK(memcmp(g_got_items.front().sig().data(), signature, item_sig_len) == 0);
TEST_EQUAL(g_got_items.front().seq(), seq);
g_got_items.clear();
@ -1761,7 +1761,7 @@ int test_main()
sha1_hash target = hasher(public_key, item_pk_len).final();
g_put_item.assign(items[0].ent, empty_salt, seq, public_key, private_key);
std::string sig(g_put_item.sig(), item_sig_len);
std::string sig(g_put_item.sig().data(), item_sig_len);
node.get_item(target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);