diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 7d9b89d61..391f1c24b 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -978,7 +978,12 @@ namespace libtorrent time_point m_created; boost::int64_t session_time() const TORRENT_OVERRIDE - { return total_seconds(aux::time_now() - m_created); } + { + // +1 is here to make it possible to distinguish uninitialized (to + // 0) timestamps and timestamps of things that happend during the + // first second after the session was constructed + return total_seconds(aux::time_now() - m_created) + 1; + } time_point m_last_tick; time_point m_last_second_tick; diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index d86247ab5..97b9d1077 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -94,7 +94,7 @@ namespace libtorrent }; ~bt_peer_connection(); - + #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) bool supports_encryption() const { return m_encrypted; } diff --git a/simulation/Jamfile b/simulation/Jamfile index 3fe4f3b0a..4e99da0f9 100644 --- a/simulation/Jamfile +++ b/simulation/Jamfile @@ -23,6 +23,7 @@ project ; alias libtorrent-sims : + [ run test_optimistic_unchoke.cpp ] [ run test_transfer.cpp ] [ run test_http_connection.cpp ] [ run test_auto_manage.cpp ] diff --git a/simulation/setup_swarm.cpp b/simulation/setup_swarm.cpp index 2fd58cb37..193e69eba 100644 --- a/simulation/setup_swarm.cpp +++ b/simulation/setup_swarm.cpp @@ -236,7 +236,7 @@ void setup_swarm(int num_nodes , std::function on_alert , std::function terminate) { - asio::io_service ios(sim, addr("0.0.0.0")); + asio::io_service ios(sim); lt::time_point start_time(lt::clock_type::now()); std::vector > nodes; diff --git a/simulation/test_optimistic_unchoke.cpp b/simulation/test_optimistic_unchoke.cpp new file mode 100644 index 000000000..7f7206e53 --- /dev/null +++ b/simulation/test_optimistic_unchoke.cpp @@ -0,0 +1,174 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "setup_swarm.hpp" +#include "test.hpp" +#include "create_torrent.hpp" +#include "bittorrent_peer.hpp" +#include "settings.hpp" +#include "print_alerts.hpp" + +#include "libtorrent/alert.hpp" +#include "libtorrent/alert_types.hpp" +#include "libtorrent/session.hpp" +#include "libtorrent/session_stats.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/torrent_info.hpp" +#include "libtorrent/deadline_timer.hpp" + +#include +#include +#include +#include + +struct choke_state +{ + choke_state() : unchoke_duration(lt::seconds(0)), choked(true) {} + lt::time_duration unchoke_duration; + lt::time_point last_unchoke; + bool choked; +}; + +TORRENT_TEST(optimistic_unchoke) +{ + int const num_nodes = 20; + lt::time_duration const test_duration = libtorrent::seconds(1201); + + dsl_config network_cfg; + sim::simulation sim{network_cfg}; + + io_service ios(sim, addr("50.1.0.0")); + lt::time_point start_time(lt::clock_type::now()); + + libtorrent::add_torrent_params atp = create_torrent(0); + atp.flags &= ~add_torrent_params::flag_auto_managed; + atp.flags &= ~add_torrent_params::flag_paused; + + lt::settings_pack pack = settings(); + // only allow an optimistic unchoke slot + pack.set_int(settings_pack::unchoke_slots_limit, 1); + pack.set_int(settings_pack::num_optimistic_unchoke_slots, 1); + + std::vector peer_choke_state(num_nodes); + + session_proxy proxy; + + boost::shared_ptr ses = boost::make_shared( + boost::ref(pack), boost::ref(ios)); + ses->async_add_torrent(atp); + + std::vector > io_service; + std::vector > peers; + + ses->set_alert_notify([&]() { + // this function is called inside libtorrent and we cannot perform work + // immediately in it. We have to notify the outside to pull all the alerts + ios.post(boost::bind(&print_alerts, ses.get(), start_time)); + }); + + lt::deadline_timer timer(ios); + timer.expires_from_now(libtorrent::seconds(2)); + timer.async_wait([&](error_code const& ec) + { + for (int i = 0; i < num_nodes; ++i) + { + // create a new io_service + char ep[30]; + snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff); + io_service.push_back(boost::make_shared( + boost::ref(sim), addr(ep))); + peers.push_back(boost::make_shared(boost::ref(*io_service.back()) + , [&,i](int msg, char const* bug, int len) + { + choke_state& cs = peer_choke_state[i]; + if (msg == 0) + { + // choke + if (!cs.choked) + { + cs.choked = true; + cs.unchoke_duration += lt::clock_type::now() - cs.last_unchoke; + } + } + else if (msg == 1) + { + // unchoke + if (cs.choked) + { + cs.choked = false; + cs.last_unchoke = lt::clock_type::now(); + } + } + else + { + return; + } + + char const* msg_str[] = {"choke", "unchoke"}; + + lt::time_duration d = lt::clock_type::now() - start_time; + boost::uint32_t millis = lt::duration_cast(d).count(); + printf("\x1b[35m%4d.%03d: [%d] %s (%d ms)\x1b[0m\n" + , millis / 1000, millis % 1000, i, msg_str[msg] + , int(lt::duration_cast(cs.unchoke_duration).count())); + } + , *atp.ti + , tcp::endpoint(addr("50.1.0.0"), 6881) + , peer_conn::idle)); + } + }); + + lt::deadline_timer end_timer(ios); + timer.expires_from_now(test_duration); + timer.async_wait([&](error_code const& ec) + { + for (auto& p : peers) + { + p->abort(); + } + proxy = ses->abort(); + ses.reset(); + }); + + sim.run(); + + boost::int64_t const duration_ms = lt::duration_cast(test_duration).count(); + boost::int64_t const average_unchoke_time = duration_ms / num_nodes; + printf("EXPECT: %" PRId64 " ms\n", average_unchoke_time); + for (auto const& cs : peer_choke_state) + { + boost::int64_t unchoke_duration = lt::duration_cast(cs.unchoke_duration).count(); + printf("%" PRId64 " ms\n", unchoke_duration); + TEST_CHECK(std::abs(unchoke_duration - average_unchoke_time) < 1000); + } +} + diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 32eb5cac2..ac991159c 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1658,8 +1658,8 @@ namespace libtorrent m_peer_interested = true; if (is_disconnecting()) return; - - // if the peer is ready to download stuff, it must have metadata + + // if the peer is ready to download stuff, it must have metadata m_has_metadata = true; disconnect_if_redundant(); @@ -4619,10 +4619,14 @@ namespace libtorrent return; } - int download_rate = statistics().download_payload_rate(); +#ifndef TORRENT_DISABLE_LOGGING + int const previous_queue_size = m_desired_queue_size; +#endif + + int const download_rate = statistics().download_payload_rate(); // the desired download queue size - const int queue_time = m_settings.get_int(settings_pack::request_queue_time); + int const queue_time = m_settings.get_int(settings_pack::request_queue_time); // when we're in slow-start mode we increase the desired queue size every // time we receive a piece, no need to adjust it here (other than @@ -4636,7 +4640,7 @@ namespace libtorrent // the block size doesn't have to be 16. So we first query the // torrent for it boost::shared_ptr t = m_torrent.lock(); - const int block_size = t->block_size(); + int const block_size = t->block_size(); TORRENT_ASSERT(block_size > 0); @@ -4649,10 +4653,13 @@ namespace libtorrent m_desired_queue_size = min_request_queue; #ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" - , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" - , m_desired_queue_size, m_max_out_request_queue - , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); + if (previous_queue_size != m_desired_queue_size) + { + peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" + , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" + , m_desired_queue_size, m_max_out_request_queue + , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); + } #endif } @@ -5092,7 +5099,7 @@ namespace libtorrent bool sent_a_piece = false; boost::shared_ptr t = m_torrent.lock(); - if (!t || t->is_aborted()) return; + if (!t || t->is_aborted() || m_requests.empty()) return; // only add new piece-chunks if the send buffer is small enough // otherwise there will be no end to how large it will be! @@ -6681,9 +6688,10 @@ namespace libtorrent TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size())); // in share mode we don't close redundant connections - if (m_settings.get_bool(settings_pack::close_redundant_connections) && !t->share_mode()) + if (m_settings.get_bool(settings_pack::close_redundant_connections) + && !t->share_mode()) { - bool ok_to_disconnect = + bool const ok_to_disconnect = can_disconnect(error_code(errors::upload_upload_connection)) || can_disconnect(error_code(errors::uninteresting_upload_peer)) || can_disconnect(error_code(errors::too_many_requests_when_choked)) diff --git a/src/session_impl.cpp b/src/session_impl.cpp index af46b81d6..8cbc86953 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -3032,8 +3032,8 @@ retry: m_last_second_tick = now; m_tick_residual += tick_interval_ms - 1000; - boost::int64_t session_time = total_seconds(now - m_created); - if (session_time > 65000) + boost::int64_t const stime = session_time(); + if (stime > 65000) { // we're getting close to the point where our timestamps // in torrent_peer are wrapping. We need to step all counters back @@ -3835,7 +3835,7 @@ retry: // unchoked int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); - int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots]; + int const allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots]; if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5); if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke = int(opt_unchoke.size()); @@ -3845,7 +3845,6 @@ retry: , opt_unchoke.begin() + num_opt_unchoke , opt_unchoke.end(), &last_optimistic_unchoke_cmp); - #ifndef TORRENT_DISABLE_EXTENSIONS if (m_session_extension_features & plugin::optimistic_unchoke_feature) { @@ -3884,8 +3883,14 @@ retry: i != opt_unchoke_end; ++i) { torrent_peer* pi = *i; + peer_connection* p = static_cast(pi->connection); if (pi->optimistically_unchoked) { +#ifndef TORRENT_DISABLE_LOGGING + p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE" + , "already unchoked | session-time: %d" + , pi->last_optimistically_unchoked); +#endif TORRENT_ASSERT(!pi->connection->is_choked()); // remove this peer from prev_opt_unchoke, to prevent us from // choking it later. This peer gets another round of optimistic @@ -3897,7 +3902,6 @@ retry: } else { - peer_connection* p = static_cast(pi->connection); TORRENT_ASSERT(p->is_choked()); boost::shared_ptr t = p->associated_torrent().lock(); bool ret = t->unchoke_peer(*p, true); @@ -3907,6 +3911,10 @@ retry: pi->optimistically_unchoked = true; m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); pi->last_optimistically_unchoked = boost::uint16_t(session_time()); +#ifndef TORRENT_DISABLE_LOGGING + p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE" + , "session-time: %d", pi->last_optimistically_unchoked); +#endif } } } @@ -3923,6 +3931,14 @@ retry: m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); t->choke_peer(*p); } + + // if we have too many unchoked peers now, we need to trigger the regular + // choking logic to choke some + if (m_stats_counters[counters::num_unchoke_slots] + < m_stats_counters[counters::num_peers_up_unchoked_all]) + { + m_unchoke_time_scaler = 0; + } } void session_impl::try_connect_more_peers() @@ -4076,6 +4092,8 @@ retry: // build list of all peers that are // unchokable. + // TODO: 3 there should be a pre-calculated list of all peers eligible for + // unchoking std::vector peers; for (connection_map::iterator i = m_connections.begin(); i != m_connections.end();) @@ -4137,7 +4155,7 @@ retry: , performance_alert::bittyrant_with_no_uplimit); } - int allowed_upload_slots = unchoke_sort(peers, max_upload_rate + int const allowed_upload_slots = unchoke_sort(peers, max_upload_rate , unchoke_interval, m_settings); m_stats_counters.set_value(counters::num_unchoke_slots , allowed_upload_slots); @@ -4146,7 +4164,8 @@ retry: session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d " "eligible-peers: %d" " max_upload_rate: %d" - " allowed-slots: %d ]", int(m_connections.size()) + " allowed-slots: %d ]" + , int(m_connections.size()) , int(peers.size()) , max_upload_rate , allowed_upload_slots); @@ -4154,9 +4173,7 @@ retry: int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_upload_slots / 5); - - // reserve some upload slots for optimistic unchokes - int unchoke_set_size = allowed_upload_slots; + int unchoke_set_size = allowed_upload_slots - num_opt_unchoke; // go through all the peers and unchoke the first ones and choke // all the other ones. diff --git a/test/Jamfile b/test/Jamfile index 89ab73a42..e96abf4f3 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -56,6 +56,8 @@ lib libtorrent_test dht_server.cpp udp_tracker.cpp peer_server.cpp + bittorrent_peer.cpp + print_alerts.cpp web_seed_suite.cpp swarm_suite.cpp test_utils.cpp diff --git a/test/bittorrent_peer.cpp b/test/bittorrent_peer.cpp new file mode 100644 index 000000000..6d9954ce2 --- /dev/null +++ b/test/bittorrent_peer.cpp @@ -0,0 +1,562 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/socket.hpp" +#include "libtorrent/sha1_hash.hpp" +#include "libtorrent/address.hpp" +#include "libtorrent/assert.hpp" +#include "bittorrent_peer.hpp" +#include "libtorrent/torrent_info.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/io.hpp" +#include +#include + +using namespace libtorrent; + +peer_conn::peer_conn(io_service& ios + , boost::function on_msg + , torrent_info const& ti + , tcp::endpoint const& ep + , peer_mode_t mode) + : s(ios) + , m_mode(mode) + , m_ti(ti) + , read_pos(0) + , m_on_msg(on_msg) + , state(handshaking) + , choked(true) + , current_piece(-1) + , m_current_piece_is_allowed(false) + , block(0) + , m_blocks_per_piece((m_ti.piece_length() + 0x3fff) / 0x4000) + , outstanding_requests(0) + , fast_extension(false) + , blocks_received(0) + , blocks_sent(0) + , start_time(clock_type::now()) + , endpoint(ep) + , restarting(false) +{ + pieces.reserve(m_ti.num_pieces()); + start_conn(); +} + +void peer_conn::start_conn() +{ + restarting = false; + s.async_connect(endpoint, boost::bind(&peer_conn::on_connect, this, _1)); +} + +void peer_conn::on_connect(error_code const& ec) +{ + if (ec) + { + close("ERROR CONNECT: %s", ec); + return; + } + + char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04" + " " // space for info-hash + "aaaaaaaaaaaaaaaaaaaa" // peer-id + "\0\0\0\x01\x02"; // interested + char* h = (char*)malloc(sizeof(handshake)); + memcpy(h, handshake, sizeof(handshake)); + std::memcpy(h + 28, m_ti.info_hash().data(), 20); + std::generate(h + 48, h + 68, &rand); + // for seeds, don't send the interested message + boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1) + - (m_mode == uploader ? 5 : 0)) + , boost::bind(&peer_conn::on_handshake, this, h, _1, _2)); +} + +void peer_conn::on_handshake(char* h, error_code const& ec, size_t bytes_transferred) +{ + free(h); + if (ec) + { + close("ERROR SEND HANDSHAKE: %s", ec); + return; + } + + // read handshake + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68) + , boost::bind(&peer_conn::on_handshake2, this, _1, _2)); +} + +void peer_conn::on_handshake2(error_code const& ec, size_t bytes_transferred) +{ + if (ec) + { + close("ERROR READ HANDSHAKE: %s", ec); + return; + } + + // buffer is the full 68 byte handshake + // look at the extension bits + + fast_extension = ((char*)buffer)[27] & 4; + + if (m_mode == uploader) + { + write_have_all(); + } + else + { + work_download(); + } +} + +void peer_conn::write_have_all() +{ + using namespace libtorrent::detail; + + if (fast_extension) + { + char* ptr = write_buf_proto; + // have_all + write_uint32(1, ptr); + write_uint8(0xe, ptr); + // unchoke + write_uint32(1, ptr); + write_uint8(1, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto) + , boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + } + else + { + // bitfield + int len = (m_ti.num_pieces() + 7) / 8; + char* ptr = (char*)buffer; + write_uint32(len + 1, ptr); + write_uint8(5, ptr); + memset(ptr, 255, len); + ptr += len; + // unchoke + write_uint32(1, ptr); + write_uint8(1, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10) + , boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + } +} + +void peer_conn::on_have_all_sent(error_code const& ec, size_t bytes_transferred) +{ + if (ec) + { + close("ERROR SEND HAVE ALL: %s", ec); + return; + } + + // read message + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); +} + +bool peer_conn::write_request() +{ + using namespace libtorrent::detail; + + // if we're choked (and there are no allowed-fast pieces left) + if (choked && allowed_fast.empty() && !m_current_piece_is_allowed) return false; + + // if there are no pieces left to request + if (pieces.empty() && suggested_pieces.empty() && current_piece == -1) return false; + + if (current_piece == -1) + { + // pick a new piece + if (choked && allowed_fast.size() > 0) + { + current_piece = allowed_fast.front(); + allowed_fast.erase(allowed_fast.begin()); + m_current_piece_is_allowed = true; + } + else if (suggested_pieces.size() > 0) + { + current_piece = suggested_pieces.front(); + suggested_pieces.erase(suggested_pieces.begin()); + m_current_piece_is_allowed = false; + } + else if (pieces.size() > 0) + { + current_piece = pieces.front(); + pieces.erase(pieces.begin()); + m_current_piece_is_allowed = false; + } + else + { + TORRENT_ASSERT(false); + } + } + char msg[] = "\0\0\0\xd\x06" + " " // piece + " " // offset + " "; // length + char* m = (char*)malloc(sizeof(msg)); + memcpy(m, msg, sizeof(msg)); + char* ptr = m + 5; + write_uint32(current_piece, ptr); + write_uint32(block * 16 * 1024, ptr); + write_uint32(16 * 1024, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1) + , boost::bind(&peer_conn::on_req_sent, this, m, _1, _2)); + + ++outstanding_requests; + ++block; + if (block == m_blocks_per_piece) + { + block = 0; + current_piece = -1; + m_current_piece_is_allowed = false; + } + return true; +} + +void peer_conn::on_req_sent(char* m, error_code const& ec, size_t bytes_transferred) +{ + free(m); + if (ec) + { + close("ERROR SEND REQUEST: %s", ec); + return; + } + + work_download(); +} + +void peer_conn::close(char const* fmt, error_code const& ec) +{ + end_time = clock_type::now(); + char tmp[1024]; + snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str()); + int time = total_milliseconds(end_time - start_time); + if (time == 0) time = 1; + float up = (boost::int64_t(blocks_sent) * 0x4000) / time / 1000.f; + float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f; + error_code e; + + char ep_str[200]; + address const& addr = s.local_endpoint(e).address(); +#if TORRENT_USE_IPV6 + if (addr.is_v6()) + snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str() + , s.local_endpoint(e).port()); + else +#endif + snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str() + , s.local_endpoint(e).port()); + printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n" + , tmp, ep_str, blocks_sent, blocks_received, time, up, down); +} + +void peer_conn::work_download() +{ + if (pieces.empty() + && suggested_pieces.empty() + && current_piece == -1 + && outstanding_requests == 0 + && blocks_received >= m_ti.num_pieces() * m_blocks_per_piece) + { + close("COMPLETED DOWNLOAD", error_code()); + return; + } + + // send requests + if (outstanding_requests < 40) + { + if (write_request()) return; + } + + // read message + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); +} + +void peer_conn::on_msg_length(error_code const& ec, size_t bytes_transferred) +{ + using namespace libtorrent::detail; + + if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor) + && restarting) + { + start_conn(); + return; + } + + if (ec) + { + close("ERROR RECEIVE MESSAGE PREFIX: %s", ec); + return; + } + char* ptr = (char*)buffer; + unsigned int length = read_uint32(ptr); + if (length > sizeof(buffer)) + { + fprintf(stderr, "len: %d\n", length); + close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code()); + return; + } + if (length == 0) + { + // keep-alive messate. read another length prefix + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + } + else + { + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length) + , boost::bind(&peer_conn::on_message, this, _1, _2)); + } +} + +void peer_conn::on_message(error_code const& ec, size_t bytes_transferred) +{ + using namespace libtorrent::detail; + + if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor) + && restarting) + { + start_conn(); + return; + } + + if (ec) + { + close("ERROR RECEIVE MESSAGE: %s", ec); + return; + } + char* ptr = (char*)buffer; + int msg = read_uint8(ptr); + + m_on_msg(msg, ptr, bytes_transferred); + + switch (m_mode) + { + case peer_conn::uploader: + if (msg == 6) + { + if (bytes_transferred != 13) + { + close("REQUEST packet has invalid size", error_code()); + return; + } + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + int length = detail::read_int32(ptr); + write_piece(piece, start, length); + } + else if (msg == 3) // not-interested + { + close("DONE", error_code()); + return; + } + else + { + // read another message + boost::asio::async_read(s, boost::asio::buffer(buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + } + break; + case peer_conn::downloader: + if (msg == 0xe) // have_all + { + // build a list of all pieces and request them all! + pieces.resize(m_ti.num_pieces()); + for (int i = 0; i < int(pieces.size()); ++i) + pieces[i] = i; + std::random_shuffle(pieces.begin(), pieces.end()); + } + else if (msg == 4) // have + { + int piece = detail::read_int32(ptr); + if (pieces.empty()) pieces.push_back(piece); + else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece); + } + else if (msg == 5) // bitfield + { + pieces.reserve(m_ti.num_pieces()); + int piece = 0; + for (int i = 0; i < int(bytes_transferred); ++i) + { + int mask = 0x80; + for (int k = 0; k < 8; ++k) + { + if (piece > m_ti.num_pieces()) break; + if (*ptr & mask) pieces.push_back(piece); + mask >>= 1; + ++piece; + } + ++ptr; + } + std::random_shuffle(pieces.begin(), pieces.end()); + } + else if (msg == 7) // piece + { +/* + if (verify_downloads) + { + int piece = read_uint32(ptr); + int start = read_uint32(ptr); + int size = bytes_transferred - 9; + verify_piece(piece, start, ptr, size); + } +*/ + ++blocks_received; + --outstanding_requests; + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + + if (int((start + bytes_transferred) / 0x4000) == m_blocks_per_piece) + { + write_have(piece); + return; + } + } + else if (msg == 13) // suggest + { + int piece = detail::read_int32(ptr); + std::vector::iterator i = std::find(pieces.begin(), pieces.end(), piece); + if (i != pieces.end()) + { + pieces.erase(i); + suggested_pieces.push_back(piece); + } + } + else if (msg == 16) // reject request + { + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + int length = detail::read_int32(ptr); + + // put it back! + if (current_piece != piece) + { + if (pieces.empty() || pieces.back() != piece) + pieces.push_back(piece); + } + else + { + block = (std::min)(start / 0x4000, block); + if (block == 0) + { + pieces.push_back(current_piece); + current_piece = -1; + m_current_piece_is_allowed = false; + } + } + --outstanding_requests; + fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n", piece, start, length); + } + else if (msg == 0) // choke + { + choked = true; + } + else if (msg == 1) // unchoke + { + choked = false; + } + else if (msg == 17) // allowed_fast + { + int piece = detail::read_int32(ptr); + std::vector::iterator i = std::find(pieces.begin(), pieces.end(), piece); + if (i != pieces.end()) + { + pieces.erase(i); + allowed_fast.push_back(piece); + } + } + work_download(); + break; + case peer_conn::idle: + // read another message + boost::asio::async_read(s, boost::asio::buffer(buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + break; + } +} +/* +bool peer_conn::verify_piece(int piece, int start, char const* ptr, int size) +{ + boost::uint32_t* buf = (boost::uint32_t*)ptr; + boost::uint32_t fill = (piece << 8) | ((start / 0x4000) & 0xff); + for (int i = 0; i < size / 4; ++i) + { + if (buf[i] != fill) + { + fprintf(stderr, "received invalid block. piece %d block %d\n", piece, start / 0x4000); + exit(1); + return false; + } + } + return true; +} +*/ +void peer_conn::write_piece(int piece, int start, int length) +{ + using namespace libtorrent::detail; + +// generate_block(write_buffer, piece, start, length); + + char* ptr = write_buf_proto; + write_uint32(9 + length, ptr); + TORRENT_ASSERT(length == 0x4000); + write_uint8(7, ptr); + write_uint32(piece, ptr); + write_uint32(start, ptr); + boost::array vec; + vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto); + vec[1] = boost::asio::buffer(write_buffer, length); + boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + ++blocks_sent; +} + +void peer_conn::write_have(int piece) +{ + using namespace libtorrent::detail; + + char* ptr = write_buf_proto; + write_uint32(5, ptr); + write_uint8(4, ptr); + write_uint32(piece, ptr); + boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); +} + +void peer_conn::abort() +{ + error_code ec; + s.close(ec); +} + diff --git a/test/bittorrent_peer.hpp b/test/bittorrent_peer.hpp new file mode 100644 index 000000000..481f966cd --- /dev/null +++ b/test/bittorrent_peer.hpp @@ -0,0 +1,118 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef BITTORRENT_PEER_HPP +#define BITTORRENT_PEER_HPP + +#include "libtorrent/socket.hpp" +#include "libtorrent/sha1_hash.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/time.hpp" +#include "libtorrent/address.hpp" +#include "libtorrent/torrent_info.hpp" +#include "test.hpp" // for EXPORT +#include + +using namespace libtorrent; + +struct EXPORT peer_conn +{ + enum peer_mode_t + { uploader, downloader, idle }; + + peer_conn(io_service& ios + , boost::function on_msg + , libtorrent::torrent_info const& ti + , libtorrent::tcp::endpoint const& ep + , peer_mode_t mode); + + void start_conn(); + + void on_connect(error_code const& ec); + void on_handshake(char* h, error_code const& ec, size_t bytes_transferred); + void on_handshake2(error_code const& ec, size_t bytes_transferred); + void write_have_all(); + void on_have_all_sent(error_code const& ec, size_t bytes_transferred); + bool write_request(); + void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred); + void close(char const* fmt, error_code const& ec); + void work_download(); + void on_msg_length(error_code const& ec, size_t bytes_transferred); + void on_message(error_code const& ec, size_t bytes_transferred); + bool verify_piece(int piece, int start, char const* ptr, int size); + void write_piece(int piece, int start, int length); + void write_have(int piece); + + void abort(); + +private: + + tcp::socket s; + char write_buf_proto[100]; + boost::uint32_t write_buffer[17*1024/4]; + boost::uint32_t buffer[17*1024/4]; + + peer_mode_t m_mode; + torrent_info const& m_ti; + + int read_pos; + + boost::function m_on_msg; + + enum state_t + { + handshaking, + sending_request, + receiving_message + }; + int state; + std::vector pieces; + std::vector suggested_pieces; + std::vector allowed_fast; + bool choked; + int current_piece; // the piece we're currently requesting blocks from + bool m_current_piece_is_allowed; + int block; + int const m_blocks_per_piece; + int outstanding_requests; + // if this is true, this connection is a seed + bool fast_extension; + int blocks_received; + int blocks_sent; + time_point start_time; + time_point end_time; + tcp::endpoint endpoint; + bool restarting; +}; + +#endif + diff --git a/test/print_alerts.cpp b/test/print_alerts.cpp new file mode 100644 index 000000000..6d183f107 --- /dev/null +++ b/test/print_alerts.cpp @@ -0,0 +1,73 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "print_alerts.hpp" +#include "libtorrent/time.hpp" +#include "libtorrent/session.hpp" +#include "libtorrent/alert_types.hpp" +#include "print_alerts.hpp" + +void print_alerts(libtorrent::session* ses, libtorrent::time_point start_time) +{ + using namespace libtorrent; + namespace lt = libtorrent; + + if (ses == NULL) return; + + std::vector alerts; + ses->pop_alerts(&alerts); + + for (std::vector::iterator i = alerts.begin() + , end(alerts.end()); i != end; ++i) + { + alert* a = *i; +#ifndef TORRENT_DISABLE_LOGGING + if (peer_log_alert* pla = alert_cast(a)) + { + // in order to keep down the amount of logging, just log actual peer + // messages + if (pla->direction != peer_log_alert::incoming_message + && pla->direction != peer_log_alert::outgoing_message) + { + continue; + } + } +#endif + lt::time_duration d = a->timestamp() - start_time; + boost::uint32_t millis = lt::duration_cast(d).count(); + printf("%4d.%03d: %-25s %s\n", millis / 1000, millis % 1000 + , a->what() + , a->message().c_str()); + } + +} + diff --git a/test/print_alerts.hpp b/test/print_alerts.hpp new file mode 100644 index 000000000..1c95b7e93 --- /dev/null +++ b/test/print_alerts.hpp @@ -0,0 +1,43 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef PRINT_ALERTS_HPP +#define PRINT_ALERTS_HPP + +#include "libtorrent/time.hpp" +#include "libtorrent/session.hpp" +#include "test.hpp" // for EXPORT + +void EXPORT print_alerts(libtorrent::session* ses, libtorrent::time_point start_time); + +#endif +