diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 336d4d676..391f1c24b 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -978,7 +978,12 @@ namespace libtorrent time_point m_created; boost::int64_t session_time() const TORRENT_OVERRIDE - { return total_seconds(aux::time_now() - m_created); } + { + // +1 is here to make it possible to distinguish uninitialized (to + // 0) timestamps and timestamps of things that happend during the + // first second after the session was constructed + return total_seconds(aux::time_now() - m_created) + 1; + } time_point m_last_tick; time_point m_last_second_tick; @@ -1169,15 +1174,19 @@ namespace libtorrent typedef std::list > ses_extension_list_t; ses_extension_list_t m_ses_extensions; - // std::string could be used for the query names if only all common implementations used SSO - // *glares at gcc* - struct extention_dht_query + // the union of all session extensions' implemented_features(). This is + // used to exclude callbacks to the session extensions. + boost::uint32_t m_session_extension_features; + + // std::string could be used for the query names if only all common + // implementations used SSO *glares at gcc* + struct extension_dht_query { boost::uint8_t query_len; boost::array query; dht_extension_handler_t handler; }; - typedef std::vector m_extension_dht_queries_t; + typedef std::vector m_extension_dht_queries_t; m_extension_dht_queries_t m_extension_dht_queries; #endif diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index d86247ab5..97b9d1077 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -94,7 +94,7 @@ namespace libtorrent }; ~bt_peer_connection(); - + #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) bool supports_encryption() const { return m_encrypted; } diff --git a/include/libtorrent/extensions.hpp b/include/libtorrent/extensions.hpp index 66b775e49..6f7f23b8c 100644 --- a/include/libtorrent/extensions.hpp +++ b/include/libtorrent/extensions.hpp @@ -208,6 +208,27 @@ namespace libtorrent // hidden virtual ~plugin() {} + // these are flags that can be returned by implemented_features() + // indicating which callbacks this plugin is interested in + enum feature_flags_t + { + // include this bit if your plugin needs to alter the order of the + // optimistic unchoke of peers. i.e. have the on_optimistic_unchoke() + // callback be called. + optimistic_unchoke_feature = 1, + + // include this bit if your plugin needs to have on_tick() called + tick_feature = 2 + }; + + // This function is expected to return a bitmask indicating which features + // this plugin implements. Some callbacks on this object may not be called + // unless the corresponding feature flag is returned here. Note that + // callbacks may still be called even if the corresponding feature is not + // specified in the return value here. See feature_flags_t for possible + // flags to return. + virtual boost::uint32_t implemented_features() { return 0; } + // this is called by the session every time a new torrent is added. // The ``torrent*`` points to the internal torrent object created // for the new torrent. The ``void*`` is the userdata pointer as @@ -237,12 +258,13 @@ namespace libtorrent // called once per second virtual void on_tick() {} - // called when choosing peers to optimistically unchoke - // peer's will be unchoked in the order they appear in the given - // vector which is initially sorted by when they were last - // optimistically unchoked. - // if the plugin returns true then the ordering provided will be - // used and no other plugin will be allowed to change it. + // called when choosing peers to optimisticallly unchoke. peer's will be + // unchoked in the order they appear in the given vector. if + // the plugin returns true then the ordering provided will be used and no + // other plugin will be allowed to change it. If your plugin expects this + // to be called, make sure to include the flag + // ``optimistic_unchoke_feature`` in the return value from + // implemented_features(). virtual bool on_optimistic_unchoke(std::vector& /* peers */) { return false; } diff --git a/include/libtorrent/peer_connection_handle.hpp b/include/libtorrent/peer_connection_handle.hpp index 3037c6938..a2b8d5c1a 100644 --- a/include/libtorrent/peer_connection_handle.hpp +++ b/include/libtorrent/peer_connection_handle.hpp @@ -50,6 +50,7 @@ struct crypto_plugin; typedef boost::system::error_code error_code; +// hidden struct TORRENT_EXPORT peer_connection_handle { peer_connection_handle(boost::weak_ptr impl) @@ -113,11 +114,11 @@ struct TORRENT_EXPORT peer_connection_handle time_point time_of_last_unchoke() const; bool operator==(peer_connection_handle const& o) const - { return m_connection.lock() == o.m_connection.lock(); } + { return !(m_connection < o.m_connection) && !(o.m_connection < m_connection); } bool operator!=(peer_connection_handle const& o) const - { return m_connection.lock() != o.m_connection.lock(); } + { return m_connection < o.m_connection || o.m_connection < m_connection; } bool operator<(peer_connection_handle const& o) const - { return m_connection.lock() < o.m_connection.lock(); } + { return m_connection < o.m_connection; } boost::shared_ptr native_handle() const { diff --git a/simulation/Jamfile b/simulation/Jamfile index 3fe4f3b0a..4e99da0f9 100644 --- a/simulation/Jamfile +++ b/simulation/Jamfile @@ -23,6 +23,7 @@ project ; alias libtorrent-sims : + [ run test_optimistic_unchoke.cpp ] [ run test_transfer.cpp ] [ run test_http_connection.cpp ] [ run test_auto_manage.cpp ] diff --git a/simulation/setup_swarm.cpp b/simulation/setup_swarm.cpp index 2fd58cb37..193e69eba 100644 --- a/simulation/setup_swarm.cpp +++ b/simulation/setup_swarm.cpp @@ -236,7 +236,7 @@ void setup_swarm(int num_nodes , std::function on_alert , std::function terminate) { - asio::io_service ios(sim, addr("0.0.0.0")); + asio::io_service ios(sim); lt::time_point start_time(lt::clock_type::now()); std::vector > nodes; diff --git a/simulation/test_optimistic_unchoke.cpp b/simulation/test_optimistic_unchoke.cpp new file mode 100644 index 000000000..7f7206e53 --- /dev/null +++ b/simulation/test_optimistic_unchoke.cpp @@ -0,0 +1,174 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "setup_swarm.hpp" +#include "test.hpp" +#include "create_torrent.hpp" +#include "bittorrent_peer.hpp" +#include "settings.hpp" +#include "print_alerts.hpp" + +#include "libtorrent/alert.hpp" +#include "libtorrent/alert_types.hpp" +#include "libtorrent/session.hpp" +#include "libtorrent/session_stats.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/torrent_info.hpp" +#include "libtorrent/deadline_timer.hpp" + +#include +#include +#include +#include + +struct choke_state +{ + choke_state() : unchoke_duration(lt::seconds(0)), choked(true) {} + lt::time_duration unchoke_duration; + lt::time_point last_unchoke; + bool choked; +}; + +TORRENT_TEST(optimistic_unchoke) +{ + int const num_nodes = 20; + lt::time_duration const test_duration = libtorrent::seconds(1201); + + dsl_config network_cfg; + sim::simulation sim{network_cfg}; + + io_service ios(sim, addr("50.1.0.0")); + lt::time_point start_time(lt::clock_type::now()); + + libtorrent::add_torrent_params atp = create_torrent(0); + atp.flags &= ~add_torrent_params::flag_auto_managed; + atp.flags &= ~add_torrent_params::flag_paused; + + lt::settings_pack pack = settings(); + // only allow an optimistic unchoke slot + pack.set_int(settings_pack::unchoke_slots_limit, 1); + pack.set_int(settings_pack::num_optimistic_unchoke_slots, 1); + + std::vector peer_choke_state(num_nodes); + + session_proxy proxy; + + boost::shared_ptr ses = boost::make_shared( + boost::ref(pack), boost::ref(ios)); + ses->async_add_torrent(atp); + + std::vector > io_service; + std::vector > peers; + + ses->set_alert_notify([&]() { + // this function is called inside libtorrent and we cannot perform work + // immediately in it. We have to notify the outside to pull all the alerts + ios.post(boost::bind(&print_alerts, ses.get(), start_time)); + }); + + lt::deadline_timer timer(ios); + timer.expires_from_now(libtorrent::seconds(2)); + timer.async_wait([&](error_code const& ec) + { + for (int i = 0; i < num_nodes; ++i) + { + // create a new io_service + char ep[30]; + snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff); + io_service.push_back(boost::make_shared( + boost::ref(sim), addr(ep))); + peers.push_back(boost::make_shared(boost::ref(*io_service.back()) + , [&,i](int msg, char const* bug, int len) + { + choke_state& cs = peer_choke_state[i]; + if (msg == 0) + { + // choke + if (!cs.choked) + { + cs.choked = true; + cs.unchoke_duration += lt::clock_type::now() - cs.last_unchoke; + } + } + else if (msg == 1) + { + // unchoke + if (cs.choked) + { + cs.choked = false; + cs.last_unchoke = lt::clock_type::now(); + } + } + else + { + return; + } + + char const* msg_str[] = {"choke", "unchoke"}; + + lt::time_duration d = lt::clock_type::now() - start_time; + boost::uint32_t millis = lt::duration_cast(d).count(); + printf("\x1b[35m%4d.%03d: [%d] %s (%d ms)\x1b[0m\n" + , millis / 1000, millis % 1000, i, msg_str[msg] + , int(lt::duration_cast(cs.unchoke_duration).count())); + } + , *atp.ti + , tcp::endpoint(addr("50.1.0.0"), 6881) + , peer_conn::idle)); + } + }); + + lt::deadline_timer end_timer(ios); + timer.expires_from_now(test_duration); + timer.async_wait([&](error_code const& ec) + { + for (auto& p : peers) + { + p->abort(); + } + proxy = ses->abort(); + ses.reset(); + }); + + sim.run(); + + boost::int64_t const duration_ms = lt::duration_cast(test_duration).count(); + boost::int64_t const average_unchoke_time = duration_ms / num_nodes; + printf("EXPECT: %" PRId64 " ms\n", average_unchoke_time); + for (auto const& cs : peer_choke_state) + { + boost::int64_t unchoke_duration = lt::duration_cast(cs.unchoke_duration).count(); + printf("%" PRId64 " ms\n", unchoke_duration); + TEST_CHECK(std::abs(unchoke_duration - average_unchoke_time) < 1000); + } +} + diff --git a/src/metadata_transfer.cpp b/src/metadata_transfer.cpp index 153a90483..d67148379 100644 --- a/src/metadata_transfer.cpp +++ b/src/metadata_transfer.cpp @@ -219,7 +219,7 @@ namespace libtorrent { namespace m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); } - void on_piece_pass(int) + void on_piece_pass(int) TORRENT_OVERRIDE { // if we became a seed, copy the metadata from // the torrent before it is deallocated diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 32eb5cac2..ac991159c 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1658,8 +1658,8 @@ namespace libtorrent m_peer_interested = true; if (is_disconnecting()) return; - - // if the peer is ready to download stuff, it must have metadata + + // if the peer is ready to download stuff, it must have metadata m_has_metadata = true; disconnect_if_redundant(); @@ -4619,10 +4619,14 @@ namespace libtorrent return; } - int download_rate = statistics().download_payload_rate(); +#ifndef TORRENT_DISABLE_LOGGING + int const previous_queue_size = m_desired_queue_size; +#endif + + int const download_rate = statistics().download_payload_rate(); // the desired download queue size - const int queue_time = m_settings.get_int(settings_pack::request_queue_time); + int const queue_time = m_settings.get_int(settings_pack::request_queue_time); // when we're in slow-start mode we increase the desired queue size every // time we receive a piece, no need to adjust it here (other than @@ -4636,7 +4640,7 @@ namespace libtorrent // the block size doesn't have to be 16. So we first query the // torrent for it boost::shared_ptr t = m_torrent.lock(); - const int block_size = t->block_size(); + int const block_size = t->block_size(); TORRENT_ASSERT(block_size > 0); @@ -4649,10 +4653,13 @@ namespace libtorrent m_desired_queue_size = min_request_queue; #ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" - , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" - , m_desired_queue_size, m_max_out_request_queue - , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); + if (previous_queue_size != m_desired_queue_size) + { + peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" + , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" + , m_desired_queue_size, m_max_out_request_queue + , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); + } #endif } @@ -5092,7 +5099,7 @@ namespace libtorrent bool sent_a_piece = false; boost::shared_ptr t = m_torrent.lock(); - if (!t || t->is_aborted()) return; + if (!t || t->is_aborted() || m_requests.empty()) return; // only add new piece-chunks if the send buffer is small enough // otherwise there will be no end to how large it will be! @@ -6681,9 +6688,10 @@ namespace libtorrent TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size())); // in share mode we don't close redundant connections - if (m_settings.get_bool(settings_pack::close_redundant_connections) && !t->share_mode()) + if (m_settings.get_bool(settings_pack::close_redundant_connections) + && !t->share_mode()) { - bool ok_to_disconnect = + bool const ok_to_disconnect = can_disconnect(error_code(errors::upload_upload_connection)) || can_disconnect(error_code(errors::uninteresting_upload_peer)) || can_disconnect(error_code(errors::too_many_requests_when_choked)) diff --git a/src/session_impl.cpp b/src/session_impl.cpp index cd1bbe2b0..62752d9f0 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -430,6 +430,9 @@ namespace aux { , m_download_connect_attempts(0) , m_next_scrape_torrent(0) , m_tick_residual(0) +#ifndef TORRENT_DISABLE_EXTENSIONS + , m_session_extension_features(0) +#endif , m_deferred_submit_disk_jobs(false) , m_pending_auto_manage(false) , m_need_auto_manage(false) @@ -921,6 +924,7 @@ namespace aux { boost::shared_ptr p(new session_plugin_wrapper(ext)); m_ses_extensions.push_back(p); + m_session_extension_features |= p->implemented_features(); } void session_impl::add_ses_extension(boost::shared_ptr ext) @@ -931,6 +935,7 @@ namespace aux { m_ses_extensions.push_back(ext); m_alerts.add_extension(ext); ext->added(session_handle(this)); + m_session_extension_features |= ext->implemented_features(); // get any DHT queries the plugin would like to handle // and record them in m_extension_dht_queries for lookup @@ -942,7 +947,7 @@ namespace aux { { TORRENT_ASSERT(e->first.size() <= max_dht_query_length); if (e->first.size() > max_dht_query_length) continue; - extention_dht_query registration; + extension_dht_query registration; registration.query_len = e->first.size(); std::copy(e->first.begin(), e->first.end(), registration.query.begin()); registration.handler = e->second; @@ -2021,7 +2026,7 @@ retry: m_listen_interface.address().to_string() , m_listen_interface.port() , listen_failed_alert::bind - , ec, listen_failed_alert::udp); + , ec, listen_failed_alert::tcp); return; } @@ -3027,8 +3032,8 @@ retry: m_last_second_tick = now; m_tick_residual += tick_interval_ms - 1000; - boost::int64_t session_time = total_seconds(now - m_created); - if (session_time > 65000) + boost::int64_t const stime = session_time(); + if (stime > 65000) { // we're getting close to the point where our timestamps // in torrent_peer are wrapping. We need to step all counters back @@ -3047,12 +3052,15 @@ retry: } #ifndef TORRENT_DISABLE_EXTENSIONS - for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin() - , end(m_ses_extensions.end()); i != end; ++i) + if (m_session_extension_features & plugin::tick_feature) { - TORRENT_TRY { - (*i)->on_tick(); - } TORRENT_CATCH(std::exception&) {} + for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin() + , end(m_ses_extensions.end()); i != end; ++i) + { + TORRENT_TRY { + (*i)->on_tick(); + } TORRENT_CATCH(std::exception&) {} + } } #endif @@ -3764,24 +3772,32 @@ retry: } namespace { - struct last_optimistic_unchoke_cmp + bool last_optimistic_unchoke_cmp(torrent_peer const* const l + , torrent_peer const* const r) { - bool operator()(peer_connection_handle const& l - , peer_connection_handle const& r) - { - return l.native_handle()->peer_info_struct()->last_optimistically_unchoked - < r.native_handle()->peer_info_struct()->last_optimistically_unchoked; - } - }; + return l->last_optimistically_unchoked + < r->last_optimistically_unchoked; + } } void session_impl::recalculate_optimistic_unchoke_slots() { + INVARIANT_CHECK; + TORRENT_ASSERT(is_single_thread()); if (m_stats_counters[counters::num_unchoke_slots] == 0) return; - std::vector opt_unchoke; + std::vector opt_unchoke; + // collect the currently optimistically unchoked peers here, so we can + // choke them when we've found new optimistic unchoke candidates. + std::vector prev_opt_unchoke; + + // TODO: 3 it would probably make sense to have a separate list of peers + // that are eligible for optimistic unchoke, similar to the torrents + // perhaps this could even iterate over the pool allocators of + // torrent_peer objects. It could probably be done in a single pass and + // collect the n best candidates for (connection_map::iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { @@ -3790,90 +3806,139 @@ retry: torrent_peer* pi = p->peer_info_struct(); if (!pi) continue; if (pi->web_seed) continue; - torrent* t = p->associated_torrent().lock().get(); - if (!t) continue; - if (t->is_paused()) continue; if (pi->optimistically_unchoked) { - TORRENT_ASSERT(!p->is_choked()); - opt_unchoke.push_back(peer_connection_handle(*i)); + prev_opt_unchoke.push_back(pi); } + torrent* t = p->associated_torrent().lock().get(); + if (!t) continue; + + // TODO: 3 peers should know whether their torrent is paused or not, + // instead of having to ask it over and over again + if (t->is_paused()) continue; + if (!p->is_connecting() && !p->is_disconnecting() && p->is_peer_interested() && t->free_upload_slots() - && p->is_choked() + && (p->is_choked() || pi->optimistically_unchoked) && !p->ignore_unchoke_slots() && t->valid_metadata()) { - opt_unchoke.push_back(peer_connection_handle(*i)); + opt_unchoke.push_back(pi); } } // find the peers that has been waiting the longest to be optimistically // unchoked - // avoid having a bias towards peers that happen to be sorted first - std::random_shuffle(opt_unchoke.begin(), opt_unchoke.end(), randint); + int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); + int const allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots]; + if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5); + if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke = + int(opt_unchoke.size()); - // sort all candidates based on when they were last optimistically - // unchoked. - std::sort(opt_unchoke.begin(), opt_unchoke.end(), last_optimistic_unchoke_cmp()); + // find the n best optimistic unchoke candidates + std::partial_sort(opt_unchoke.begin() + , opt_unchoke.begin() + num_opt_unchoke + , opt_unchoke.end(), &last_optimistic_unchoke_cmp); #ifndef TORRENT_DISABLE_EXTENSIONS - for (ses_extension_list_t::iterator i = m_ses_extensions.begin() - , end(m_ses_extensions.end()); i != end; ++i) + if (m_session_extension_features & plugin::optimistic_unchoke_feature) { - if ((*i)->on_optimistic_unchoke(opt_unchoke)) - break; + // if there is an extension that wants to reorder the optimistic + // unchoke peers, first convert the vector into one containing + // peer_connection_handles, since that's the exported API + std::vector peers; + peers.reserve(opt_unchoke.size()); + for (std::vector::iterator i = opt_unchoke.begin() + , end(opt_unchoke.end()); i != end; ++i) + { + peers.push_back(peer_connection_handle(static_cast((*i)->connection)->self())); + } + for (ses_extension_list_t::iterator i = m_ses_extensions.begin() + , end(m_ses_extensions.end()); i != end; ++i) + { + if ((*i)->on_optimistic_unchoke(peers)) + break; + } + // then convert back to the internal torrent_peer pointers + opt_unchoke.clear(); + for (std::vector::iterator i = peers.begin() + , end(peers.end()); i != end; ++i) + { + opt_unchoke.push_back(i->native_handle()->peer_info_struct()); + } } #endif - int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); - int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots]; - if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5); - // unchoke the first num_opt_unchoke peers in the candidate set // and make sure that the others are choked - for (std::vector::iterator i = opt_unchoke.begin() - , end(opt_unchoke.end()); i != end; ++i) + std::vector::iterator opt_unchoke_end = opt_unchoke.begin() + + num_opt_unchoke; + + for (std::vector::iterator i = opt_unchoke.begin(); + i != opt_unchoke_end; ++i) { - torrent_peer* pi = i->native_handle()->peer_info_struct(); - if (num_opt_unchoke > 0) + torrent_peer* pi = *i; + peer_connection* p = static_cast(pi->connection); + if (pi->optimistically_unchoked) { - --num_opt_unchoke; - if (!pi->optimistically_unchoked) - { - peer_connection* p = static_cast(pi->connection); - torrent* t = p->associated_torrent().lock().get(); - bool ret = t->unchoke_peer(*p, true); - if (ret) - { - pi->optimistically_unchoked = true; - m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); - pi->last_optimistically_unchoked = boost::uint16_t(session_time()); - } - else - { - // we failed to unchoke it, increment the count again - ++num_opt_unchoke; - } - } +#ifndef TORRENT_DISABLE_LOGGING + p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE" + , "already unchoked | session-time: %d" + , pi->last_optimistically_unchoked); +#endif + TORRENT_ASSERT(!pi->connection->is_choked()); + // remove this peer from prev_opt_unchoke, to prevent us from + // choking it later. This peer gets another round of optimistic + // unchoke + std::vector::iterator existing = + std::find(prev_opt_unchoke.begin(), prev_opt_unchoke.end(), pi); + TORRENT_ASSERT(existing != prev_opt_unchoke.end()); + prev_opt_unchoke.erase(existing); } else { - if (pi->optimistically_unchoked) + TORRENT_ASSERT(p->is_choked()); + boost::shared_ptr t = p->associated_torrent().lock(); + bool ret = t->unchoke_peer(*p, true); + TORRENT_ASSERT(ret); + if (ret) { - peer_connection* p = static_cast(pi->connection); - torrent* t = p->associated_torrent().lock().get(); - pi->optimistically_unchoked = false; - m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); - t->choke_peer(*p); + pi->optimistically_unchoked = true; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); + pi->last_optimistically_unchoked = boost::uint16_t(session_time()); +#ifndef TORRENT_DISABLE_LOGGING + p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE" + , "session-time: %d", pi->last_optimistically_unchoked); +#endif } } } + + // now, choke all the previous optimistically unchoked peers + for (std::vector::iterator i = prev_opt_unchoke.begin() + , end(prev_opt_unchoke.end()); i != end; ++i) + { + torrent_peer* pi = *i; + TORRENT_ASSERT(pi->optimistically_unchoked); + peer_connection* p = static_cast(pi->connection); + boost::shared_ptr t = p->associated_torrent().lock(); + pi->optimistically_unchoked = false; + m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); + t->choke_peer(*p); + } + + // if we have too many unchoked peers now, we need to trigger the regular + // choking logic to choke some + if (m_stats_counters[counters::num_unchoke_slots] + < m_stats_counters[counters::num_peers_up_unchoked_all]) + { + m_unchoke_time_scaler = 0; + } } void session_impl::try_connect_more_peers() @@ -4027,6 +4092,8 @@ retry: // build list of all peers that are // unchokable. + // TODO: 3 there should be a pre-calculated list of all peers eligible for + // unchoking std::vector peers; for (connection_map::iterator i = m_connections.begin(); i != m_connections.end();) @@ -4088,7 +4155,7 @@ retry: , performance_alert::bittyrant_with_no_uplimit); } - int allowed_upload_slots = unchoke_sort(peers, max_upload_rate + int const allowed_upload_slots = unchoke_sort(peers, max_upload_rate , unchoke_interval, m_settings); m_stats_counters.set_value(counters::num_unchoke_slots , allowed_upload_slots); @@ -4097,7 +4164,8 @@ retry: session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d " "eligible-peers: %d" " max_upload_rate: %d" - " allowed-slots: %d ]", int(m_connections.size()) + " allowed-slots: %d ]" + , int(m_connections.size()) , int(peers.size()) , max_upload_rate , allowed_upload_slots); @@ -4105,9 +4173,7 @@ retry: int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_upload_slots / 5); - - // reserve some upload slots for optimistic unchokes - int unchoke_set_size = allowed_upload_slots; + int unchoke_set_size = allowed_upload_slots - num_opt_unchoke; // go through all the peers and unchoke the first ones and choke // all the other ones. diff --git a/src/torrent_peer.cpp b/src/torrent_peer.cpp index a90933e88..705541cf2 100644 --- a/src/torrent_peer.cpp +++ b/src/torrent_peer.cpp @@ -284,7 +284,7 @@ namespace libtorrent return ""; } #endif - + libtorrent::address torrent_peer::address() const { #if TORRENT_USE_IPV6 diff --git a/src/ut_metadata.cpp b/src/ut_metadata.cpp index 25a534372..19c66b302 100644 --- a/src/ut_metadata.cpp +++ b/src/ut_metadata.cpp @@ -167,7 +167,7 @@ namespace libtorrent { namespace m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); } */ - void on_piece_pass(int) + void on_piece_pass(int) TORRENT_OVERRIDE { // if we became a seed, copy the metadata from // the torrent before it is deallocated diff --git a/test/Jamfile b/test/Jamfile index 89ab73a42..e96abf4f3 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -56,6 +56,8 @@ lib libtorrent_test dht_server.cpp udp_tracker.cpp peer_server.cpp + bittorrent_peer.cpp + print_alerts.cpp web_seed_suite.cpp swarm_suite.cpp test_utils.cpp diff --git a/test/bittorrent_peer.cpp b/test/bittorrent_peer.cpp new file mode 100644 index 000000000..6d9954ce2 --- /dev/null +++ b/test/bittorrent_peer.cpp @@ -0,0 +1,562 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/socket.hpp" +#include "libtorrent/sha1_hash.hpp" +#include "libtorrent/address.hpp" +#include "libtorrent/assert.hpp" +#include "bittorrent_peer.hpp" +#include "libtorrent/torrent_info.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/io.hpp" +#include +#include + +using namespace libtorrent; + +peer_conn::peer_conn(io_service& ios + , boost::function on_msg + , torrent_info const& ti + , tcp::endpoint const& ep + , peer_mode_t mode) + : s(ios) + , m_mode(mode) + , m_ti(ti) + , read_pos(0) + , m_on_msg(on_msg) + , state(handshaking) + , choked(true) + , current_piece(-1) + , m_current_piece_is_allowed(false) + , block(0) + , m_blocks_per_piece((m_ti.piece_length() + 0x3fff) / 0x4000) + , outstanding_requests(0) + , fast_extension(false) + , blocks_received(0) + , blocks_sent(0) + , start_time(clock_type::now()) + , endpoint(ep) + , restarting(false) +{ + pieces.reserve(m_ti.num_pieces()); + start_conn(); +} + +void peer_conn::start_conn() +{ + restarting = false; + s.async_connect(endpoint, boost::bind(&peer_conn::on_connect, this, _1)); +} + +void peer_conn::on_connect(error_code const& ec) +{ + if (ec) + { + close("ERROR CONNECT: %s", ec); + return; + } + + char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04" + " " // space for info-hash + "aaaaaaaaaaaaaaaaaaaa" // peer-id + "\0\0\0\x01\x02"; // interested + char* h = (char*)malloc(sizeof(handshake)); + memcpy(h, handshake, sizeof(handshake)); + std::memcpy(h + 28, m_ti.info_hash().data(), 20); + std::generate(h + 48, h + 68, &rand); + // for seeds, don't send the interested message + boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1) + - (m_mode == uploader ? 5 : 0)) + , boost::bind(&peer_conn::on_handshake, this, h, _1, _2)); +} + +void peer_conn::on_handshake(char* h, error_code const& ec, size_t bytes_transferred) +{ + free(h); + if (ec) + { + close("ERROR SEND HANDSHAKE: %s", ec); + return; + } + + // read handshake + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68) + , boost::bind(&peer_conn::on_handshake2, this, _1, _2)); +} + +void peer_conn::on_handshake2(error_code const& ec, size_t bytes_transferred) +{ + if (ec) + { + close("ERROR READ HANDSHAKE: %s", ec); + return; + } + + // buffer is the full 68 byte handshake + // look at the extension bits + + fast_extension = ((char*)buffer)[27] & 4; + + if (m_mode == uploader) + { + write_have_all(); + } + else + { + work_download(); + } +} + +void peer_conn::write_have_all() +{ + using namespace libtorrent::detail; + + if (fast_extension) + { + char* ptr = write_buf_proto; + // have_all + write_uint32(1, ptr); + write_uint8(0xe, ptr); + // unchoke + write_uint32(1, ptr); + write_uint8(1, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto) + , boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + } + else + { + // bitfield + int len = (m_ti.num_pieces() + 7) / 8; + char* ptr = (char*)buffer; + write_uint32(len + 1, ptr); + write_uint8(5, ptr); + memset(ptr, 255, len); + ptr += len; + // unchoke + write_uint32(1, ptr); + write_uint8(1, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10) + , boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + } +} + +void peer_conn::on_have_all_sent(error_code const& ec, size_t bytes_transferred) +{ + if (ec) + { + close("ERROR SEND HAVE ALL: %s", ec); + return; + } + + // read message + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); +} + +bool peer_conn::write_request() +{ + using namespace libtorrent::detail; + + // if we're choked (and there are no allowed-fast pieces left) + if (choked && allowed_fast.empty() && !m_current_piece_is_allowed) return false; + + // if there are no pieces left to request + if (pieces.empty() && suggested_pieces.empty() && current_piece == -1) return false; + + if (current_piece == -1) + { + // pick a new piece + if (choked && allowed_fast.size() > 0) + { + current_piece = allowed_fast.front(); + allowed_fast.erase(allowed_fast.begin()); + m_current_piece_is_allowed = true; + } + else if (suggested_pieces.size() > 0) + { + current_piece = suggested_pieces.front(); + suggested_pieces.erase(suggested_pieces.begin()); + m_current_piece_is_allowed = false; + } + else if (pieces.size() > 0) + { + current_piece = pieces.front(); + pieces.erase(pieces.begin()); + m_current_piece_is_allowed = false; + } + else + { + TORRENT_ASSERT(false); + } + } + char msg[] = "\0\0\0\xd\x06" + " " // piece + " " // offset + " "; // length + char* m = (char*)malloc(sizeof(msg)); + memcpy(m, msg, sizeof(msg)); + char* ptr = m + 5; + write_uint32(current_piece, ptr); + write_uint32(block * 16 * 1024, ptr); + write_uint32(16 * 1024, ptr); + error_code ec; + boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1) + , boost::bind(&peer_conn::on_req_sent, this, m, _1, _2)); + + ++outstanding_requests; + ++block; + if (block == m_blocks_per_piece) + { + block = 0; + current_piece = -1; + m_current_piece_is_allowed = false; + } + return true; +} + +void peer_conn::on_req_sent(char* m, error_code const& ec, size_t bytes_transferred) +{ + free(m); + if (ec) + { + close("ERROR SEND REQUEST: %s", ec); + return; + } + + work_download(); +} + +void peer_conn::close(char const* fmt, error_code const& ec) +{ + end_time = clock_type::now(); + char tmp[1024]; + snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str()); + int time = total_milliseconds(end_time - start_time); + if (time == 0) time = 1; + float up = (boost::int64_t(blocks_sent) * 0x4000) / time / 1000.f; + float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f; + error_code e; + + char ep_str[200]; + address const& addr = s.local_endpoint(e).address(); +#if TORRENT_USE_IPV6 + if (addr.is_v6()) + snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str() + , s.local_endpoint(e).port()); + else +#endif + snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str() + , s.local_endpoint(e).port()); + printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n" + , tmp, ep_str, blocks_sent, blocks_received, time, up, down); +} + +void peer_conn::work_download() +{ + if (pieces.empty() + && suggested_pieces.empty() + && current_piece == -1 + && outstanding_requests == 0 + && blocks_received >= m_ti.num_pieces() * m_blocks_per_piece) + { + close("COMPLETED DOWNLOAD", error_code()); + return; + } + + // send requests + if (outstanding_requests < 40) + { + if (write_request()) return; + } + + // read message + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); +} + +void peer_conn::on_msg_length(error_code const& ec, size_t bytes_transferred) +{ + using namespace libtorrent::detail; + + if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor) + && restarting) + { + start_conn(); + return; + } + + if (ec) + { + close("ERROR RECEIVE MESSAGE PREFIX: %s", ec); + return; + } + char* ptr = (char*)buffer; + unsigned int length = read_uint32(ptr); + if (length > sizeof(buffer)) + { + fprintf(stderr, "len: %d\n", length); + close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code()); + return; + } + if (length == 0) + { + // keep-alive messate. read another length prefix + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + } + else + { + boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length) + , boost::bind(&peer_conn::on_message, this, _1, _2)); + } +} + +void peer_conn::on_message(error_code const& ec, size_t bytes_transferred) +{ + using namespace libtorrent::detail; + + if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor) + && restarting) + { + start_conn(); + return; + } + + if (ec) + { + close("ERROR RECEIVE MESSAGE: %s", ec); + return; + } + char* ptr = (char*)buffer; + int msg = read_uint8(ptr); + + m_on_msg(msg, ptr, bytes_transferred); + + switch (m_mode) + { + case peer_conn::uploader: + if (msg == 6) + { + if (bytes_transferred != 13) + { + close("REQUEST packet has invalid size", error_code()); + return; + } + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + int length = detail::read_int32(ptr); + write_piece(piece, start, length); + } + else if (msg == 3) // not-interested + { + close("DONE", error_code()); + return; + } + else + { + // read another message + boost::asio::async_read(s, boost::asio::buffer(buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + } + break; + case peer_conn::downloader: + if (msg == 0xe) // have_all + { + // build a list of all pieces and request them all! + pieces.resize(m_ti.num_pieces()); + for (int i = 0; i < int(pieces.size()); ++i) + pieces[i] = i; + std::random_shuffle(pieces.begin(), pieces.end()); + } + else if (msg == 4) // have + { + int piece = detail::read_int32(ptr); + if (pieces.empty()) pieces.push_back(piece); + else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece); + } + else if (msg == 5) // bitfield + { + pieces.reserve(m_ti.num_pieces()); + int piece = 0; + for (int i = 0; i < int(bytes_transferred); ++i) + { + int mask = 0x80; + for (int k = 0; k < 8; ++k) + { + if (piece > m_ti.num_pieces()) break; + if (*ptr & mask) pieces.push_back(piece); + mask >>= 1; + ++piece; + } + ++ptr; + } + std::random_shuffle(pieces.begin(), pieces.end()); + } + else if (msg == 7) // piece + { +/* + if (verify_downloads) + { + int piece = read_uint32(ptr); + int start = read_uint32(ptr); + int size = bytes_transferred - 9; + verify_piece(piece, start, ptr, size); + } +*/ + ++blocks_received; + --outstanding_requests; + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + + if (int((start + bytes_transferred) / 0x4000) == m_blocks_per_piece) + { + write_have(piece); + return; + } + } + else if (msg == 13) // suggest + { + int piece = detail::read_int32(ptr); + std::vector::iterator i = std::find(pieces.begin(), pieces.end(), piece); + if (i != pieces.end()) + { + pieces.erase(i); + suggested_pieces.push_back(piece); + } + } + else if (msg == 16) // reject request + { + int piece = detail::read_int32(ptr); + int start = detail::read_int32(ptr); + int length = detail::read_int32(ptr); + + // put it back! + if (current_piece != piece) + { + if (pieces.empty() || pieces.back() != piece) + pieces.push_back(piece); + } + else + { + block = (std::min)(start / 0x4000, block); + if (block == 0) + { + pieces.push_back(current_piece); + current_piece = -1; + m_current_piece_is_allowed = false; + } + } + --outstanding_requests; + fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n", piece, start, length); + } + else if (msg == 0) // choke + { + choked = true; + } + else if (msg == 1) // unchoke + { + choked = false; + } + else if (msg == 17) // allowed_fast + { + int piece = detail::read_int32(ptr); + std::vector::iterator i = std::find(pieces.begin(), pieces.end(), piece); + if (i != pieces.end()) + { + pieces.erase(i); + allowed_fast.push_back(piece); + } + } + work_download(); + break; + case peer_conn::idle: + // read another message + boost::asio::async_read(s, boost::asio::buffer(buffer, 4) + , boost::bind(&peer_conn::on_msg_length, this, _1, _2)); + break; + } +} +/* +bool peer_conn::verify_piece(int piece, int start, char const* ptr, int size) +{ + boost::uint32_t* buf = (boost::uint32_t*)ptr; + boost::uint32_t fill = (piece << 8) | ((start / 0x4000) & 0xff); + for (int i = 0; i < size / 4; ++i) + { + if (buf[i] != fill) + { + fprintf(stderr, "received invalid block. piece %d block %d\n", piece, start / 0x4000); + exit(1); + return false; + } + } + return true; +} +*/ +void peer_conn::write_piece(int piece, int start, int length) +{ + using namespace libtorrent::detail; + +// generate_block(write_buffer, piece, start, length); + + char* ptr = write_buf_proto; + write_uint32(9 + length, ptr); + TORRENT_ASSERT(length == 0x4000); + write_uint8(7, ptr); + write_uint32(piece, ptr); + write_uint32(start, ptr); + boost::array vec; + vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto); + vec[1] = boost::asio::buffer(write_buffer, length); + boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); + ++blocks_sent; +} + +void peer_conn::write_have(int piece) +{ + using namespace libtorrent::detail; + + char* ptr = write_buf_proto; + write_uint32(5, ptr); + write_uint8(4, ptr); + write_uint32(piece, ptr); + boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); +} + +void peer_conn::abort() +{ + error_code ec; + s.close(ec); +} + diff --git a/test/bittorrent_peer.hpp b/test/bittorrent_peer.hpp new file mode 100644 index 000000000..481f966cd --- /dev/null +++ b/test/bittorrent_peer.hpp @@ -0,0 +1,118 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef BITTORRENT_PEER_HPP +#define BITTORRENT_PEER_HPP + +#include "libtorrent/socket.hpp" +#include "libtorrent/sha1_hash.hpp" +#include "libtorrent/io_service.hpp" +#include "libtorrent/time.hpp" +#include "libtorrent/address.hpp" +#include "libtorrent/torrent_info.hpp" +#include "test.hpp" // for EXPORT +#include + +using namespace libtorrent; + +struct EXPORT peer_conn +{ + enum peer_mode_t + { uploader, downloader, idle }; + + peer_conn(io_service& ios + , boost::function on_msg + , libtorrent::torrent_info const& ti + , libtorrent::tcp::endpoint const& ep + , peer_mode_t mode); + + void start_conn(); + + void on_connect(error_code const& ec); + void on_handshake(char* h, error_code const& ec, size_t bytes_transferred); + void on_handshake2(error_code const& ec, size_t bytes_transferred); + void write_have_all(); + void on_have_all_sent(error_code const& ec, size_t bytes_transferred); + bool write_request(); + void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred); + void close(char const* fmt, error_code const& ec); + void work_download(); + void on_msg_length(error_code const& ec, size_t bytes_transferred); + void on_message(error_code const& ec, size_t bytes_transferred); + bool verify_piece(int piece, int start, char const* ptr, int size); + void write_piece(int piece, int start, int length); + void write_have(int piece); + + void abort(); + +private: + + tcp::socket s; + char write_buf_proto[100]; + boost::uint32_t write_buffer[17*1024/4]; + boost::uint32_t buffer[17*1024/4]; + + peer_mode_t m_mode; + torrent_info const& m_ti; + + int read_pos; + + boost::function m_on_msg; + + enum state_t + { + handshaking, + sending_request, + receiving_message + }; + int state; + std::vector pieces; + std::vector suggested_pieces; + std::vector allowed_fast; + bool choked; + int current_piece; // the piece we're currently requesting blocks from + bool m_current_piece_is_allowed; + int block; + int const m_blocks_per_piece; + int outstanding_requests; + // if this is true, this connection is a seed + bool fast_extension; + int blocks_received; + int blocks_sent; + time_point start_time; + time_point end_time; + tcp::endpoint endpoint; + bool restarting; +}; + +#endif + diff --git a/test/print_alerts.cpp b/test/print_alerts.cpp new file mode 100644 index 000000000..6d183f107 --- /dev/null +++ b/test/print_alerts.cpp @@ -0,0 +1,73 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "print_alerts.hpp" +#include "libtorrent/time.hpp" +#include "libtorrent/session.hpp" +#include "libtorrent/alert_types.hpp" +#include "print_alerts.hpp" + +void print_alerts(libtorrent::session* ses, libtorrent::time_point start_time) +{ + using namespace libtorrent; + namespace lt = libtorrent; + + if (ses == NULL) return; + + std::vector alerts; + ses->pop_alerts(&alerts); + + for (std::vector::iterator i = alerts.begin() + , end(alerts.end()); i != end; ++i) + { + alert* a = *i; +#ifndef TORRENT_DISABLE_LOGGING + if (peer_log_alert* pla = alert_cast(a)) + { + // in order to keep down the amount of logging, just log actual peer + // messages + if (pla->direction != peer_log_alert::incoming_message + && pla->direction != peer_log_alert::outgoing_message) + { + continue; + } + } +#endif + lt::time_duration d = a->timestamp() - start_time; + boost::uint32_t millis = lt::duration_cast(d).count(); + printf("%4d.%03d: %-25s %s\n", millis / 1000, millis % 1000 + , a->what() + , a->message().c_str()); + } + +} + diff --git a/test/print_alerts.hpp b/test/print_alerts.hpp new file mode 100644 index 000000000..1c95b7e93 --- /dev/null +++ b/test/print_alerts.hpp @@ -0,0 +1,43 @@ +/* + +Copyright (c) 2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef PRINT_ALERTS_HPP +#define PRINT_ALERTS_HPP + +#include "libtorrent/time.hpp" +#include "libtorrent/session.hpp" +#include "test.hpp" // for EXPORT + +void EXPORT print_alerts(libtorrent::session* ses, libtorrent::time_point start_time); + +#endif +