remove old search code from DHT. modify announce-item to fit with more recent proposal of get/put. Only immutable entries for now

This commit is contained in:
Arvid Norberg 2011-05-23 05:07:52 +00:00
parent 58d723012a
commit 6830eb10a9
4 changed files with 89 additions and 415 deletions

View File

@ -93,8 +93,8 @@ struct key_desc_t
};
};
bool TORRENT_EXPORT verify_message(lazy_entry const* msg, key_desc_t const desc[], lazy_entry const* ret[]
, int size , char* error, int error_size);
bool TORRENT_EXPORT verify_message(lazy_entry const* msg, key_desc_t const desc[]
, lazy_entry const* ret[], int size , char* error, int error_size);
// this is the entry for every peer
// the timestamp is there to make it possible
@ -113,64 +113,21 @@ struct torrent_entry
std::set<peer_entry> peers;
};
struct feed_item
struct dht_immutable_item
{
feed_item() : sequence_number(0), num_announcers(0) {}
enum { list_head, list_item } type;
size_type sequence_number;
std::string name;
unsigned char signature[64];
entry item;
ptime last_seen;
dht_immutable_item() : value(0), num_announcers(0) {}
// malloced space for the actual value
char* value;
// this counts the number of IPs we have seen
// announcing this item, this is used to determine
// popularity if we reach the limit of items to store
bloom_filter<8> ips;
bloom_filter<128> ips;
// the last time we heard about this
ptime last_seen;
// number of IPs in the bloom filter
int num_announcers;
};
// this is the entry for a torrent that has been published
// in the DHT.
struct TORRENT_EXPORT search_torrent_entry
{
search_torrent_entry(): total_tag_points(0), total_name_points(0) {}
// the tags of the torrent. The key of
// this entry is the sha-1 hash of one of
// these tags. The counter is the number of
// times a tag has been included in a publish
// call. The counters are periodically
// decremented by a factor, so that the
// popularity ratio between the tags is
// maintained. The decrement is rounded down.
std::map<std::string, int> tags;
// this is the sum of all values in the tags
// map. It is only an optimization to avoid
// recalculating it constantly
int total_tag_points;
// the name of the torrent
std::map<std::string, int> name;
int total_name_points;
// increase the popularity counters for this torrent
void publish(std::string const& name, char const* in_tags[], int num_tags);
// return a score of how well this torrent matches
// the given set of tags. Each word in the string
// (separated by a space) is considered a tag.
// tags with 2 letters or fewer are ignored
int match(char const* tags[], int num_tags) const;
// this is called once every hour, and will
// decrement the popularity counters of the
// tags. Returns true if this entry should
// be deleted
bool tick();
void get_name(std::string& t) const;
void get_tags(std::string& t) const;
// size of malloced space pointed to by value
int size;
};
inline bool operator<(peer_entry const& lhs, peer_entry const& rhs)
@ -207,8 +164,7 @@ struct count_peers
class node_impl : boost::noncopyable
{
typedef std::map<node_id, torrent_entry> table_t;
typedef std::map<node_id, feed_item> feed_table_t;
typedef std::map<std::pair<node_id, sha1_hash>, search_torrent_entry> search_table_t;
typedef std::map<node_id, dht_immutable_item> dht_immutable_table_t;
public:
typedef boost::function3<void, address, int, address> external_ip_fun;
@ -320,8 +276,7 @@ public:
private:
table_t m_map;
feed_table_t m_feeds;
search_table_t m_search_map;
dht_immutable_table_t m_immutable_table;
ptime m_last_tracker_tick;

View File

@ -1087,8 +1087,8 @@ namespace libtorrent
, service_port(0)
#endif
, max_fail_count(20)
, max_torrents(3000)
, max_feed_items(3000)
, max_torrents(2000)
, max_dht_items(700)
, max_torrent_search_reply(20)
, restrict_routing_ips(true)
, restrict_search_ips(true)
@ -1115,8 +1115,8 @@ namespace libtorrent
// this is the max number of torrents the DHT will track
int max_torrents;
// max number of feed items the DHT will store
int max_feed_items;
// max number of items the DHT will store
int max_dht_items;
// the max number of torrents to return in a
// torrent search query to the DHT

View File

@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/function/function1.hpp>
#include "libtorrent/io.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/alert.hpp"
@ -58,96 +59,6 @@ void incoming_error(entry& e, char const* msg);
using detail::write_endpoint;
// Scores this torrent against the given tags. Every tag that is present in
// the tags map adds its share of total_tag_points, expressed in percent and
// rounded down; tags that are not present add nothing.
int search_torrent_entry::match(char const* in_tags[], int num_tags) const
{
	typedef std::map<std::string, int>::const_iterator tag_iter;

	int score = 0;
	int idx = 0;
	while (idx < num_tags)
	{
		tag_iter const hit = tags.find(in_tags[idx]);
		++idx;
		if (hit != tags.end())
		{
			// weigh the score by how popular this tag is in this torrent
			score += 100 * hit->second / total_tag_points;
		}
	}
	return score;
}
// Scales every tag counter and every name counter down to two thirds of
// its value (integer division, so rounded down), erases the counters that
// are no longer positive and stores the new sums in total_tag_points and
// total_name_points. Returns true when total_tag_points ends up at zero.
bool search_torrent_entry::tick()
{
	typedef std::map<std::string, int>::iterator counter_iter;

	int tag_total = 0;
	counter_iter tag_it = tags.begin();
	while (tag_it != tags.end())
	{
		int const decayed = (tag_it->second * 2) / 3;
		tag_it->second = decayed;
		tag_total += decayed;
		if (decayed > 0)
			++tag_it;
		else
			tags.erase(tag_it++); // advance before the node goes away
	}
	total_tag_points = tag_total;

	int name_total = 0;
	counter_iter name_it = name.begin();
	while (name_it != name.end())
	{
		int const decayed = (name_it->second * 2) / 3;
		name_it->second = decayed;
		name_total += decayed;
		if (decayed > 0)
			++name_it;
		else
			name.erase(name_it++); // advance before the node goes away
	}
	total_name_points = name_total;

	return total_tag_points == 0;
}
// Records one more publish of this torrent: bumps the counter of each of
// the num_tags tags in in_tags and the counter of torrent_name, keeping
// total_tag_points and total_name_points in step with the maps.
void search_torrent_entry::publish(std::string const& torrent_name, char const* in_tags[]
	, int num_tags)
{
	int idx = 0;
	while (idx < num_tags)
	{
		// operator[] value-initializes a missing counter to 0, so a new
		// tag ends up at 1 and an already known tag is incremented
		++tags[in_tags[idx]];
		++total_tag_points;
		++idx;
		// TODO: limit the number of tags
	}

	++name[torrent_name];
	++total_name_points;
	// TODO: limit the number of names
}
// Writes the name with the highest counter in the name map into t. When
// several names share the highest counter, the one that comes first in
// map order is chosen. If no name has been recorded, t is set to the
// empty string.
void search_torrent_entry::get_name(std::string& t) const
{
	typedef std::map<std::string, int>::const_iterator name_iter;

	// with an empty map begin() == end(), and dereferencing it below
	// would be undefined behaviour
	if (name.empty())
	{
		t.clear();
		return;
	}

	name_iter best = name.begin();
	for (name_iter i = name.begin(), end(name.end()); i != end; ++i)
	{
		// strictly greater, so the earliest of equally popular names wins
		if (i->second > best->second) best = i;
	}
	t = best->first;
}
// Appends all keys of the tags map to t, in map order, separated by a
// single space. Whatever t already holds is kept; nothing is inserted
// between that existing content and the first tag.
void search_torrent_entry::get_tags(std::string& t) const
{
	typedef std::map<std::string, int>::const_iterator tag_iter;

	bool need_separator = false;
	for (tag_iter it = tags.begin(); it != tags.end(); ++it)
	{
		if (need_separator) t += " ";
		need_separator = true;
		t += it->first;
	}
}
// Only compiled when _MSC_VER is defined: a file-local rand() that
// forwards to std::rand() and narrows the result to char.
// NOTE(review): the reason MSVC needs this wrapper is not visible here --
// presumably it lets rand be used where a char generator is expected; confirm.
#ifdef _MSC_VER
namespace
{
char rand() { return (char)std::rand(); }
}
#endif
// TODO: configurable?
enum { announce_interval = 30 };
@ -435,14 +346,16 @@ time_duration node_impl::connection_timeout()
if (now - m_last_tracker_tick < minutes(2)) return d;
m_last_tracker_tick = now;
for (feed_table_t::iterator i = m_feeds.begin(); i != m_feeds.end();)
for (dht_immutable_table_t::iterator i = m_immutable_table.begin();
i != m_immutable_table.end();)
{
if (i->second.last_seen + minutes(60) > now)
{
++i;
continue;
}
m_feeds.erase(i++);
free(i->second.value);
m_immutable_table.erase(i++);
}
// look through all peers and see if any have timed out
@ -481,53 +394,6 @@ void node_impl::status(session_status& s)
}
}
// Looks up the published torrents stored under `target` and writes the
// best matches for `tags` to reply["values"].
//
// Every m_search_map entry whose key starts with `target` is scored with
// search_torrent_entry::match() against the tags split out of `tags`. The
// entries are sorted by descending score and at most
// m_settings.max_torrent_search_reply of them are written. Each written
// element is a list of three strings: name, tags, info-hash.
//
// Returns false, leaving reply untouched, when nothing is stored under
// `target`; returns true otherwise.
bool node_impl::lookup_torrents(sha1_hash const& target
	, entry& reply, char* tags) const
{
//	if (m_alerts.should_post<dht_find_torrents_alert>())
//		m_alerts.post_alert(dht_find_torrents_alert(info_hash));

	typedef search_table_t::const_iterator table_iter;
	typedef std::pair<int, table_iter> scored_t;

	// the map is keyed on (target, info-hash), so all entries for this
	// target lie between (target, min) and (target, max)
	table_iter const range_begin
		= m_search_map.lower_bound(std::make_pair(target, (sha1_hash::min)()));
	table_iter const range_end
		= m_search_map.upper_bound(std::make_pair(target, (sha1_hash::max)()));
	if (range_begin == range_end) return false;

	// split_string() is handed a mutable buffer, so work on a copy
	// NOTE(review): presumably it tokenizes in place -- confirm
	std::string tag_buf(tags);
	char const* tag_ptrs[20];
	int tag_count = 0;
	tag_count = split_string(tag_ptrs, 20, &tag_buf[0]);

	std::vector<scored_t> ranked;
	for (table_iter it = range_begin; it != range_end; ++it)
	{
		int const score = it->second.match(tag_ptrs, tag_count);
		ranked.push_back(scored_t(score, it));
	}

	// highest score first
	std::sort(ranked.begin(), ranked.end()
		, boost::bind(&scored_t::first, _1) > boost::bind(&scored_t::first, _2));

	int const limit = (std::min)((int)ranked.size(), m_settings.max_torrent_search_reply);
	entry::list_type& values = reply["values"].list();
	for (int idx = 0; idx < limit; ++idx)
	{
		table_iter const hit = ranked[idx].second;

		values.push_back(entry());
		entry::list_type& fields = values.back().list();

		// push name
		fields.push_back(entry());
		hit->second.get_name(fields.back().string());

		// push tags
		fields.push_back(entry());
		hit->second.get_tags(fields.back().string());

		// push info-hash
		fields.push_back(entry());
		fields.back().string() = hit->first.second.to_string();
	}
	return true;
}
void node_impl::lookup_peers(sha1_hash const& info_hash, int prefix, entry& reply
, bool noseed, bool scrape) const
{
@ -654,10 +520,11 @@ bool verify_message(lazy_entry const* msg, key_desc_t const desc[], lazy_entry c
// fprintf(stderr, "looking for %s in %s\n", k.name, print_entry(*msg).c_str());
ret[i] = msg->dict_find(k.name);
if (ret[i] && ret[i]->type() != k.type) ret[i] = 0;
// none_t means any type
if (ret[i] && ret[i]->type() != k.type && k.type != lazy_entry::none_t) ret[i] = 0;
if (ret[i] == 0 && (k.flags & key_desc_t::optional) == 0)
{
// the key was not found, and it's not an optiona key
// the key was not found, and it's not an optional key
snprintf(error, error_size, "missing '%s' key", k.name);
return false;
}
@ -917,14 +784,14 @@ void node_impl::incoming_request(msg const& m, entry& e)
++g_announces;
#endif
}
/*
else if (strcmp(query, "find_torrent") == 0)
else if (strcmp(query, "put") == 0)
{
key_desc_t msg_desc[] = {
{"target", lazy_entry::string_t, 20, 0},
{"tags", lazy_entry::string_t, 0, 0},
const static key_desc_t msg_desc[] = {
{"token", lazy_entry::string_t, 0, 0},
{"v", lazy_entry::none_t, 0, 0},
};
// attempt to parse the message
lazy_entry const* msg_keys[2];
if (!verify_message(arg_ent, msg_desc, msg_keys, 2, error_string, sizeof(error_string)))
{
@ -932,134 +799,53 @@ void node_impl::incoming_request(msg const& m, entry& e)
return;
}
reply["token"] = generate_token(m.addr, msg_keys[0]->string_ptr());
sha1_hash target(msg_keys[0]->string_ptr());
nodes_t n;
// always return nodes as well as torrents
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
lookup_torrents(target, reply, (char*)msg_keys[1]->string_cstr());
}
*/
else if (strcmp(query, "announce_item") == 0)
{
feed_item add_item;
const static key_desc_t msg_desc[] = {
{"target", lazy_entry::string_t, 20, 0},
{"token", lazy_entry::string_t, 0, 0},
{"sig", lazy_entry::string_t, sizeof(add_item.signature), 0},
{"head", lazy_entry::dict_t, 0, key_desc_t::optional | key_desc_t::parse_children},
{"n", lazy_entry::string_t, 0, 0},
{"key", lazy_entry::string_t, 64, 0},
{"seq", lazy_entry::int_t, 0, 0},
{"next", lazy_entry::string_t, 20, key_desc_t::last_child | key_desc_t::size_divisible},
{"item", lazy_entry::dict_t, 0, key_desc_t::optional | key_desc_t::parse_children},
{"key", lazy_entry::string_t, 64, 0},
{"next", lazy_entry::string_t, 20, key_desc_t::last_child | key_desc_t::size_divisible},
};
// attempt to parse the message
lazy_entry const* msg_keys[11];
if (!verify_message(arg_ent, msg_desc, msg_keys, 11, error_string, sizeof(error_string)))
{
incoming_error(e, error_string);
return;
}
sha1_hash target(msg_keys[0]->string_ptr());
// verify the write-token
if (!verify_token(msg_keys[1]->string_value(), msg_keys[0]->string_ptr(), m.addr))
{
incoming_error(e, "invalid token");
return;
}
sha1_hash expected_target;
sha1_hash item_hash;
std::pair<char const*, int> buf;
if (msg_keys[3])
{
// we found the "head" entry
add_item.type = feed_item::list_head;
add_item.item = *msg_keys[3];
add_item.name = msg_keys[4]->string_value();
add_item.sequence_number = msg_keys[6]->int_value();
buf = msg_keys[3]->data_section();
item_hash = hasher(buf.first, buf.second).final();
hasher h;
h.update(add_item.name);
h.update((const char*)msg_keys[5]->string_ptr(), msg_keys[5]->string_length());
expected_target = h.final();
}
else if (msg_keys[8])
{
// we found the "item" entry
add_item.type = feed_item::list_item;
add_item.item = *msg_keys[8];
buf = msg_keys[8]->data_section();
item_hash = hasher(buf.first, buf.second).final();
expected_target = item_hash;
}
else
{
incoming_error(e, "missing head or item");
return;
}
if (buf.second > 1024)
// pointer and length to the whole entry
std::pair<char const*, int> buf = msg_keys[1]->data_section();
if (buf.second > 767 || buf.second <= 0)
{
incoming_error(e, "message too big");
return;
}
// verify that the key matches the target
if (expected_target != target)
sha1_hash target = hasher(buf.first, buf.second).final();
// verify the write-token. tokens are only valid to write to
// specific target hashes. it must match the one we got a "get" for
if (!verify_token(msg_keys[0]->string_value(), (char const*)&target[0], m.addr))
{
incoming_error(e, "invalid target");
incoming_error(e, "invalid token");
return;
}
memcpy(add_item.signature, msg_keys[2]->string_ptr(), sizeof(add_item.signature));
// #error verify signature by comparing it to item_hash
m_table.node_seen(id, m.addr);
feed_table_t::iterator i = m_feeds.find(target);
if (i == m_feeds.end())
dht_immutable_table_t::iterator i = m_immutable_table.find(target);
if (i == m_immutable_table.end())
{
// make sure we don't add too many items
if (int(m_feeds.size()) >= m_settings.max_feed_items)
if (int(m_immutable_table.size()) >= m_settings.max_dht_items)
{
// delete the least important one (i.e. the one
// the fewest peers are announcing)
feed_table_t::iterator j = std::min_element(m_feeds.begin(), m_feeds.end()
, boost::bind(&feed_item::num_announcers
, boost::bind(&feed_table_t::value_type::second, _1)));
TORRENT_ASSERT(j != m_feeds.end());
// std::cerr << " removing: " << i->second.item << std::endl;
m_feeds.erase(j);
dht_immutable_table_t::iterator j = std::min_element(m_immutable_table.begin()
, m_immutable_table.end()
, boost::bind(&dht_immutable_item::num_announcers
, boost::bind(&dht_immutable_table_t::value_type::second, _1)));
TORRENT_ASSERT(j != m_immutable_table.end());
m_immutable_table.erase(j);
}
boost::tie(i, boost::tuples::ignore) = m_feeds.insert(std::make_pair(target, add_item));
dht_immutable_item to_add;
to_add.value = (char*)malloc(buf.second);
to_add.size = buf.second;
memcpy(to_add.value, buf.first, buf.second);
boost::tie(i, boost::tuples::ignore) = m_immutable_table.insert(
std::make_pair(target, to_add));
}
feed_item& f = i->second;
if (f.type != add_item.type) return;
dht_immutable_item& f = i->second;
m_table.node_seen(id, m.addr);
f.last_seen = time_now();
if (add_item.sequence_number > f.sequence_number)
{
f.item.swap(add_item.item);
f.name.swap(add_item.name);
f.sequence_number = add_item.sequence_number;
memcpy(f.signature, add_item.signature, sizeof(f.signature));
}
// maybe increase num_announcers if we haven't seen this IP before
sha1_hash iphash;
@ -1070,17 +856,18 @@ void node_impl::incoming_request(msg const& m, entry& e)
++f.num_announcers;
}
}
else if (strcmp(query, "get_item") == 0)
else if (strcmp(query, "get") == 0)
{
key_desc_t msg_desc[] = {
{"target", lazy_entry::string_t, 20, 0},
{"key", lazy_entry::string_t, 64, 0},
{"n", lazy_entry::string_t, 0, key_desc_t::optional},
{"k", lazy_entry::string_t, 0, key_desc_t::optional},
};
// k is not used for now
// attempt to parse the message
lazy_entry const* msg_keys[3];
if (!verify_message(arg_ent, msg_desc, msg_keys, 3, error_string, sizeof(error_string)))
lazy_entry const* msg_keys[2];
if (!verify_message(arg_ent, msg_desc, msg_keys, 2, error_string, sizeof(error_string)))
{
incoming_error(e, error_string);
return;
@ -1088,21 +875,6 @@ void node_impl::incoming_request(msg const& m, entry& e)
sha1_hash target(msg_keys[0]->string_ptr());
// verify that the key matches the target
// we can only do this for list heads, where
// we have the name.
if (msg_keys[2])
{
hasher h;
h.update(msg_keys[2]->string_ptr(), msg_keys[2]->string_length());
h.update(msg_keys[1]->string_ptr(), msg_keys[1]->string_length());
if (h.final() != target)
{
incoming_error(e, "invalid target");
return;
}
}
reply["token"] = generate_token(m.addr, msg_keys[0]->string_ptr());
nodes_t n;
@ -1110,69 +882,13 @@ void node_impl::incoming_request(msg const& m, entry& e)
m_table.find_node(target, n, 0);
write_nodes_entry(reply, n);
feed_table_t::iterator i = m_feeds.find(target);
if (i != m_feeds.end())
dht_immutable_table_t::iterator i = m_immutable_table.find(target);
if (i != m_immutable_table.end())
{
feed_item const& f = i->second;
if (f.type == feed_item::list_head)
reply["head"] = f.item;
else
reply["item"] = f.item;
reply["sig"] = std::string((char*)f.signature, sizeof(f.signature));
dht_immutable_item const& f = i->second;
reply["v"] = bdecode(f.value, f.value + f.size);
}
}
/*
else if (strcmp(query, "announce_torrent") == 0)
{
key_desc_t msg_desc[] = {
{"target", lazy_entry::string_t, 20, 0},
{"info_hash", lazy_entry::string_t, 20, 0},
{"name", lazy_entry::string_t, 0, 0},
{"tags", lazy_entry::string_t, 0, 0},
{"token", lazy_entry::string_t, 0, 0},
};
lazy_entry const* msg_keys[5];
if (!verify_message(arg_ent, msg_desc, msg_keys, 5, error_string, sizeof(error_string)))
{
incoming_error(e, error_string);
return;
}
// if (m_alerts.should_post<dht_announce_torrent_alert>())
// m_alerts.post_alert(dht_announce_torrent_alert(
// m.addr.address(), name, tags, info_hash));
if (!verify_token(msg_keys[4]->string_value(), msg_keys[0]->string_ptr(), m.addr))
{
incoming_error(e, "invalid token in announce");
return;
}
sha1_hash target(msg_keys[0]->string_ptr());
sha1_hash info_hash(msg_keys[1]->string_ptr());
// the token was correct. That means this
// node is not spoofing its address. So, let
// the table get a chance to add it.
m_table.node_seen(id, m.addr);
search_table_t::iterator i = m_search_map.find(std::make_pair(target, info_hash));
if (i == m_search_map.end())
{
boost::tie(i, boost::tuples::ignore)
= m_search_map.insert(std::make_pair(std::make_pair(target, info_hash)
, search_torrent_entry()));
}
char const* in_tags[20];
int num_tags = 0;
num_tags = split_string(in_tags, 20, (char*)msg_keys[3]->string_cstr());
i->second.publish(msg_keys[2]->string_value(), in_tags, num_tags);
}
*/
else
{
// if we don't recognize the message but there's a

View File

@ -75,7 +75,7 @@ static const std::string no;
void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep
, lazy_entry* reply, char const* t = "10", char const* info_hash = 0
, char const* name = 0, std::string const token = std::string(), int port = 0
, std::string const target = std::string(), entry const* item = 0
, std::string const target = std::string(), entry const* value = 0
, std::string const id = std::string()
, bool scrape = false, bool seed = false)
{
@ -93,7 +93,7 @@ void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep
if (!token.empty()) a["token"] = token;
if (port) a["port"] = port;
if (!target.empty()) a["target"] = target;
if (item) a["item"] = *item;
if (value) a["v"] = *value;
if (scrape) a["scrape"] = 1;
if (seed) a["seed"] = 1;
char msg_buf[1500];
@ -157,7 +157,7 @@ void announce_items(node_impl& node, udp::endpoint const* eps
{
if ((i % items[j].num_peers) == 0) continue;
lazy_entry response;
send_dht_msg(node, "get_item", eps[i], &response, "10", 0
send_dht_msg(node, "get", eps[i], &response, "10", 0
, 0, no, 0, items[j].target.to_string());
key_desc_t desc[] =
@ -181,7 +181,7 @@ void announce_items(node_impl& node, udp::endpoint const* eps
}
else
{
fprintf(stderr, " invalid get_item response: %s\n", error_string);
fprintf(stderr, " invalid get response: %s\n", error_string);
TEST_ERROR(error_string);
}
@ -193,10 +193,9 @@ void announce_items(node_impl& node, udp::endpoint const* eps
TEST_EQUAL(addr, eps[i].address());
}
send_dht_msg(node, "announce_item", eps[i], &response, "10", 0
send_dht_msg(node, "put", eps[i], &response, "10", 0
, 0, tokens[i], 0, items[j].target.to_string(), &items[j].ent
, std::string("0123456789012345678901234567890123456789012345678901234567890123"));
, ids[i].to_string());
key_desc_t desc2[] =
{
@ -211,7 +210,7 @@ void announce_items(node_impl& node, udp::endpoint const* eps
}
else
{
fprintf(stderr, " invalid announce_item response: %s\n", error_string);
fprintf(stderr, " invalid put response: %s\n", error_string);
TEST_ERROR(error_string);
}
}
@ -221,13 +220,14 @@ void announce_items(node_impl& node, udp::endpoint const* eps
for (int j = 0; j < num_items; ++j)
{
lazy_entry response;
send_dht_msg(node, "get_item", eps[0], &response, "10", 0
, 0, no, 0, items[j].target.to_string());
send_dht_msg(node, "get", eps[0], &response, "10", 0
, 0, no, 0, items[j].target.to_string(), 0
, ids[0].to_string());
key_desc_t desc[] =
{
{ "r", lazy_entry::dict_t, 0, key_desc_t::parse_children },
{ "item", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{ "v", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{ "A", lazy_entry::string_t, 1, 0},
{ "B", lazy_entry::string_t, 1, 0},
{ "num_peers", lazy_entry::int_t, 0, key_desc_t::last_child},
@ -238,7 +238,6 @@ void announce_items(node_impl& node, udp::endpoint const* eps
lazy_entry const* parsed[7];
char error_string[200];
fprintf(stderr, "msg: %s\n", print_entry(response).c_str());
int ret = verify_message(&response, desc, parsed, 7, error_string, sizeof(error_string));
if (ret)
{
@ -247,6 +246,10 @@ void announce_items(node_impl& node, udp::endpoint const* eps
TEST_EQUAL(parsed[3]->string_value(), "b");
items_num.insert(items_num.begin(), parsed[4]->int_value());
}
else
{
fprintf(stderr, "unexpected msg: %s\n", print_entry(response).c_str());
}
}
TEST_EQUAL(items_num.size(), 4);
@ -266,7 +269,7 @@ int test_main()
alert_manager al(ios, 100);
dht_settings sett;
sett.max_torrents = 4;
sett.max_feed_items = 4;
sett.max_dht_items = 4;
address ext = address::from_string("236.0.0.1");
dht::node_impl node(al, &our_send, sett, node_id(0), ext, boost::bind(nop, _1, _2, _3), 0);
@ -467,8 +470,8 @@ int test_main()
response.clear();
// ====== announce_item ======
/*
// ====== put ======
udp::endpoint eps[1000];
node_id ids[1000];
@ -494,7 +497,7 @@ int test_main()
items[i].gen();
announce_items(node, eps, ids, items, sizeof(items)/sizeof(items[0]));
*/
return 0;
}