From 327af3a69e85fc13fd5549d87d0344bc13f44428 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 9 Jun 2016 08:02:41 -0400 Subject: [PATCH] optimize allow-fast logic (#800) optimize allow-fast logic --- ChangeLog | 1 + include/libtorrent/bt_peer_connection.hpp | 4 + simulation/Jamfile | 1 + simulation/create_torrent.cpp | 5 +- simulation/create_torrent.hpp | 3 +- simulation/fake_peer.hpp | 70 +++++++- simulation/test_fast_extensions.cpp | 206 ++++++++++++++++++++++ simulation/test_swarm.cpp | 1 - src/bt_peer_connection.cpp | 19 +- src/peer_connection.cpp | 28 +-- 10 files changed, 311 insertions(+), 27 deletions(-) create mode 100644 simulation/test_fast_extensions.cpp diff --git a/ChangeLog b/ChangeLog index 4349f8b19..b15d224f1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -21,6 +21,7 @@ 1.1.1 release + * optimize allow-fast logic * fix issue where FAST extension messages were not used during handshake * fixed crash on invalid input in http_parser * upgraded to libtommath 1.0 diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index 17998a038..9c6d667c1 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -351,6 +351,10 @@ private: // and can send bittorrent messages bool m_sent_handshake:1; + // set to true once we send the allowed-fast messages. This is + // only done once per connection + bool m_sent_allowed_fast:1; + #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // this is set to true after the encryption method has been // successfully negotiated (either plaintext or rc4), to signal diff --git a/simulation/Jamfile b/simulation/Jamfile index 744ca5762..7448c966f 100644 --- a/simulation/Jamfile +++ b/simulation/Jamfile @@ -43,5 +43,6 @@ alias libtorrent-sims : [ run test_tracker.cpp ] [ run test_ip_filter.cpp ] [ run test_dht_rate_limit.cpp ] + [ run test_fast_extensions.cpp ] ; diff --git a/simulation/create_torrent.cpp b/simulation/create_torrent.cpp index f5a8a07c7..2e3887c01 100644 --- a/simulation/create_torrent.cpp +++ b/simulation/create_torrent.cpp @@ -46,7 +46,8 @@ std::string save_path(int idx) return path; } -lt::add_torrent_params create_torrent(int idx, bool seed) +lt::add_torrent_params create_torrent(int const idx, bool const seed + , int const num_pieces) { // TODO: if we want non-seeding torrents, that could be a bit cheaper to // create @@ -60,7 +61,7 @@ lt::add_torrent_params create_torrent(int idx, bool seed) if (ec) std::fprintf(stderr, "failed to create directory: \"%s\": %s\n" , path.c_str(), ec.message().c_str()); std::ofstream file(lt::combine_path(path, name).c_str()); - params.ti = ::create_torrent(&file, name, 0x4000, 9 + idx, false); + params.ti = ::create_torrent(&file, name, 0x4000, num_pieces + idx, false); file.close(); // by setting the save path to a dummy path, it won't be seeding diff --git a/simulation/create_torrent.hpp b/simulation/create_torrent.hpp index ebb9bb870..e8785f0db 100644 --- a/simulation/create_torrent.hpp +++ b/simulation/create_torrent.hpp @@ -37,7 +37,8 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/add_torrent_params.hpp" std::string save_path(int idx); -libtorrent::add_torrent_params create_torrent(int idx, bool seed = true); +libtorrent::add_torrent_params create_torrent(int idx, bool seed = true + , int num_pieces = 9); #endif diff --git a/simulation/fake_peer.hpp b/simulation/fake_peer.hpp index ba9cb0230..4291e9ff5 100644 --- a/simulation/fake_peer.hpp +++ b/simulation/fake_peer.hpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include // for snprintf +#include #include "test.hpp" #include "simulator/simulator.hpp" #include "libtorrent/session.hpp" @@ -43,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/torrent_handle.hpp" #include "libtorrent/sha1_hash.hpp" #include "libtorrent/torrent_info.hpp" +#include "libtorrent/io.hpp" using namespace sim; @@ -100,21 +102,56 @@ struct fake_peer bool connected() const { return m_connected; } bool disconnected() const { return m_disconnected; } + void send_interested() + { + m_send_buffer.resize(m_send_buffer.size() + 5); + char* ptr = m_send_buffer.data() + m_send_buffer.size() - 5; + + lt::detail::write_uint32(1, ptr); + lt::detail::write_uint8(2, ptr); + } + + void send_bitfield(std::vector const& pieces) + { + int const bytes = (pieces.size() + 7) / 8; + m_send_buffer.resize(m_send_buffer.size() + 5 + bytes); + char* ptr = m_send_buffer.data() + m_send_buffer.size() - 5 - bytes; + + lt::detail::write_uint32(1 + bytes, ptr); + lt::detail::write_uint8(5, ptr); + + boost::uint8_t b = 0; + int cnt = 7; + for (std::vector::const_iterator i = pieces.begin() + , end(pieces.end()); i != end; ++i) + { + if (*i) b |= 1 << cnt; + --cnt; + if (cnt < 0) + { + lt::detail::write_uint8(b, ptr); + b = 0; + cnt = 7; + } + } + lt::detail::write_uint8(b, ptr); + } + private: - void write_handshake(boost::system::error_code const& ec, lt::sha1_hash ih) + void write_handshake(boost::system::error_code const& ec + , lt::sha1_hash ih) { using namespace std::placeholders; - asio::ip::tcp::endpoint ep = m_socket.remote_endpoint(); + asio::ip::tcp::endpoint const ep = m_out_socket.remote_endpoint(); std::printf("fake_peer::connect (%s) -> (%d) %s\n" , lt::print_endpoint(ep).c_str(), ec.value() , ec.message().c_str()); static char const handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04" " " // space for info-hash - "aaaaaaaaaaaaaaaaaaaa" // peer-id - "\0\0\0\x01\x02"; // interested + "aaaaaaaaaaaaaaaaaaaa"; // peer-id int const len = sizeof(handshake) - 1; memcpy(m_out_buffer, handshake, len); memcpy(&m_out_buffer[28], ih.data(), 20); @@ -125,8 +162,17 @@ private: std::printf("fake_peer::write_handshake(%s) -> (%d) %s\n" , lt::print_endpoint(ep).c_str(), ec.value() , ec.message().c_str()); - asio::async_read(m_socket, asio::mutable_buffers_1(&m_out_buffer[0], 68) - , std::bind(&fake_peer::read_handshake, this, _1, _2)); + if (!m_send_buffer.empty()) + { + asio::async_write(m_socket, asio::const_buffers_1( + m_send_buffer.data(), m_send_buffer.size()) + , std::bind(&fake_peer::write_send_buffer, this, _1, _2)); + } + else + { + asio::async_read(m_socket, asio::mutable_buffers_1(&m_out_buffer[0], 68) + , std::bind(&fake_peer::read_handshake, this, _1, _2)); + } }); } @@ -186,6 +232,16 @@ private: , std::bind(&fake_peer::on_read, this, _1, _2)); } + void write_send_buffer(boost::system::error_code const& ec + , size_t bytes_transferred) + { + printf("fake_peer::write_send_buffer() -> (%d) %s\n" + , ec.value(), ec.message().c_str()); + + asio::async_read(m_socket, asio::mutable_buffers_1(&m_out_buffer[0], 68) + , std::bind(&fake_peer::read_handshake, this, _1, _2)); + } + char m_out_buffer[300]; asio::io_service m_ios; @@ -202,6 +258,8 @@ private: // set to true if this peer has been disconnected by the other end bool m_disconnected; + + std::vector m_send_buffer; }; inline void add_fake_peer(lt::torrent_handle& h, int const i) diff --git a/simulation/test_fast_extensions.cpp b/simulation/test_fast_extensions.cpp new file mode 100644 index 000000000..ae3350e22 --- /dev/null +++ b/simulation/test_fast_extensions.cpp @@ -0,0 +1,206 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "test.hpp" +#include "utils.hpp" +#include "libtorrent/alert.hpp" +#include "libtorrent/alert_types.hpp" +#include "libtorrent/session.hpp" +#include "libtorrent/add_torrent_params.hpp" +#include "create_torrent.hpp" +#include "settings.hpp" +#include "fake_peer.hpp" +#include "setup_transfer.hpp" // for ep() +#include "simulator/utils.hpp" + +namespace lt = libtorrent; + +template +void run_fake_peer_test( + lt::add_torrent_params params + , Sett const& sett + , Alert const& alert) +{ + sim::default_config cfg; + sim::simulation sim{cfg}; + + sim::asio::io_service ios(sim, lt::address_v4::from_string("50.0.0.1")); + lt::session_proxy zombie; + + // setup settings pack to use for the session (customization point) + lt::settings_pack pack = settings(); + pack.set_str(lt::settings_pack::listen_interfaces, "0.0.0.0:6881"); + sett(pack); + // create session + std::shared_ptr ses = std::make_shared(pack, ios); + + fake_peer p1(sim, "60.0.0.0"); + + params.flags &= ~lt::add_torrent_params::flag_auto_managed; + params.flags &= ~lt::add_torrent_params::flag_paused; + ses->async_add_torrent(params); + + // the alert notification function is called from within libtorrent's + // context. It's not OK to talk to libtorrent in there, post it back out and + // then ask for alerts. + print_alerts(*ses, [&](lt::session& ses, lt::alert const* a) { + alert(ses, a, p1); + }); + + sim::timer t(sim, lt::seconds(1) + , [&](boost::system::error_code const& ec) + { + ses->set_alert_notify([]{}); + // shut down + zombie = ses->abort(); + + p1.close(); + + ses.reset(); + }); + + sim.run(); +} + +// make sure we consistently send the same allow-fast pieces, regardless +// of which pieces the peer has. +TORRENT_TEST(allow_fast) +{ + std::set allowed_fast; + + int const num_pieces = 50; + lt::add_torrent_params params = create_torrent(0, false, num_pieces); + std::vector bitfield(num_pieces, false); + + for (int i = 0; i < num_pieces + 1; ++i) + { + // just for this one session, to check for duplicates + std::set local_allowed_fast; + + run_fake_peer_test(params, [] (lt::settings_pack& pack) { + pack.set_int(lt::settings_pack::allowed_fast_set_size, 13); + } + , [&] (lt::session& ses, lt::alert const* a, fake_peer& p1) + { + if (auto at = lt::alert_cast(a)) + { + lt::torrent_handle h = at->handle; + p1.connect_to(ep("50.0.0.1", 6881) + , h.torrent_file()->info_hash()); + p1.send_bitfield(bitfield); + p1.send_interested(); + } + else if (auto l = lt::alert_cast(a)) + { + if (strcmp(l->event_type, "ALLOWED_FAST") != 0) return; + + int const piece = atoi(l->msg()); + // make sure we don't get the same allowed piece more than once + TEST_EQUAL(local_allowed_fast.count(piece), 0); + + // build the union of all allow-fast pieces we've received, across + // simulations. + allowed_fast.insert(piece); + local_allowed_fast.insert(piece); + + // make sure this is a valid piece + TEST_CHECK(piece < num_pieces); + TEST_CHECK(piece >= 0); + // and make sure it's not one of the pieces we have + // because that would be redundant + TEST_EQUAL(bitfield[piece], false); + } + }); + + // i goes from [0, mum_pieces + 1) to cover the have-none and have-all + // cases. After the last iteration, we can't add another piece. + if (i < int(bitfield.size())) + bitfield[i] = true; + } + + // we should never have sent any other pieces than the 13 designated for this + // peer's IP. + TEST_EQUAL(int(allowed_fast.size()), 13); +} + +// This tests a worst case scenario of allow-fast configuration where we must +// verify that libtorrent correctly aborts before satisfying the settings +// (because doing so would be too expensive) +// +// we have a torrent with a lot of pieces, and we want to send that many minus +// one allow-fast pieces. The way allow-fast pieces are computed is by hashing +// the peer's IP modulus the number of pieces. To actually compute which pieces +// to send (or which one piece _not_ to send) we would have to work hard through +// a lot of duplicates. This test makes sure we don't, and abort well before +// then +TORRENT_TEST(allow_fast_stress) +{ + std::set allowed_fast; + + int const num_pieces = 50000; + lt::add_torrent_params params = create_torrent(0, false, num_pieces); + + run_fake_peer_test(params, [&] (lt::settings_pack& pack) { + pack.set_int(lt::settings_pack::allowed_fast_set_size, num_pieces - 1); + } + , [&] (lt::session& ses, lt::alert const* a, fake_peer& p1) + { + if (auto at = lt::alert_cast(a)) + { + lt::torrent_handle h = at->handle; + p1.connect_to(ep("50.0.0.1", 6881) + , h.torrent_file()->info_hash()); + p1.send_interested(); + } + else if (auto l = lt::alert_cast(a)) + { + if (strcmp(l->event_type, "ALLOWED_FAST") != 0) return; + + int const piece = atoi(l->msg()); + + // make sure we don't get the same allowed piece more than once + TEST_EQUAL(allowed_fast.count(piece), 0); + + // build the union of all allow-fast pieces we've received, across + // simulations. + allowed_fast.insert(piece); + + // make sure this is a valid piece + TEST_CHECK(piece < num_pieces); + TEST_CHECK(piece >= 0); + } + }); + + std::printf("received %d allowed fast, out of %d configured ones\n" + , int(allowed_fast.size()), num_pieces - 1); + TEST_CHECK(int(allowed_fast.size()) < num_pieces / 80); +} diff --git a/simulation/test_swarm.cpp b/simulation/test_swarm.cpp index 7fcf96f39..f7a657d14 100644 --- a/simulation/test_swarm.cpp +++ b/simulation/test_swarm.cpp @@ -392,7 +392,6 @@ TORRENT_TEST(delete_partfile) // outgoing connections // TODO: add test that makes sure a torrent in graceful pause mode won't accept // incoming connections -// TODO: test allow-fast // TODO: test the different storage allocation modes // TODO: test contiguous buffers diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 131bd2485..bca188ab0 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -168,6 +168,7 @@ namespace libtorrent , m_supports_fast(false) , m_sent_bitfield(false) , m_sent_handshake(false) + , m_sent_allowed_fast(false) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) , m_encrypted(false) , m_rc4_encrypted(false) @@ -424,6 +425,10 @@ namespace libtorrent if (!m_supports_fast) return; +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::outgoing_message, "ALLOWED_FAST", "%d", piece); +#endif + TORRENT_ASSERT(m_sent_handshake); TORRENT_ASSERT(m_sent_bitfield); TORRENT_ASSERT(associated_torrent().lock()->valid_metadata()); @@ -1009,6 +1014,15 @@ namespace libtorrent } if (!m_recv_buffer.packet_finished()) return; + // we defer sending the allowed set until the peer says it's interested in + // us. This saves some bandwidth and allows us to omit messages for pieces + // that the peer already has + if (!m_sent_allowed_fast && m_supports_fast) + { + m_sent_allowed_fast = true; + send_allowed_set(); + } + incoming_interested(); } @@ -2140,13 +2154,11 @@ namespace libtorrent else if (m_supports_fast && t->is_seed() && !m_settings.get_bool(settings_pack::lazy_bitfields)) { write_have_all(); - send_allowed_set(); return; } else if (m_supports_fast && t->num_have() == 0) { write_have_none(); - send_allowed_set(); return; } else if (t->num_have() == 0) @@ -2261,9 +2273,6 @@ namespace libtorrent } // TODO: if we're finished, send upload_only message } - - if (m_supports_fast) - send_allowed_set(); } #ifndef TORRENT_DISABLE_EXTENSIONS diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 9eb06a6a8..48f2e83e7 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -590,9 +590,6 @@ namespace libtorrent // that the peer already has if (has_piece(i)) continue; -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::outgoing_message, "ALLOWED_FAST", "%d", i); -#endif write_allow_fast(i); TORRENT_ASSERT(std::find(m_accept_fast.begin() , m_accept_fast.end(), i) @@ -625,18 +622,26 @@ namespace libtorrent x.append(t->torrent_file().info_hash().data(), 20); sha1_hash hash = hasher(x.c_str(), int(x.size())).final(); + int attempts = 0; + int loops = 0; for (;;) { - char* p = hash.data(); - for (int i = 0; i < 5; ++i) + char const* p = hash.data(); + for (int i = 0; i < hash.size / sizeof(boost::uint32_t); ++i) { - int piece = detail::read_uint32(p) % num_pieces; + ++loops; + int const piece = detail::read_uint32(p) % num_pieces; if (std::find(m_accept_fast.begin(), m_accept_fast.end(), piece) - == m_accept_fast.end()) + != m_accept_fast.end()) + { + // this is our safety-net to make sure this loop terminates, even + // under the worst conditions + if (++loops > 500) return; + continue; + } + + if (!has_piece(piece)) { -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::outgoing_message, "ALLOWED_FAST", "%d", piece); -#endif write_allow_fast(piece); if (m_accept_fast.empty()) { @@ -645,9 +650,8 @@ namespace libtorrent } m_accept_fast.push_back(piece); m_accept_fast_piece_cnt.push_back(0); - if (int(m_accept_fast.size()) >= num_allowed_pieces - || int(m_accept_fast.size()) == num_pieces) return; } + if (++attempts >= num_allowed_pieces) return; } hash = hasher(hash.data(), 20).final(); }