Merge pull request #430 from arvidn/optimistic-unchoke-1.1

optimize the optimistic unchoke logic-1.1
This commit is contained in:
Arvid Norberg 2016-02-01 23:56:17 -05:00
commit 5cee0cb5f2
17 changed files with 1181 additions and 102 deletions

View File

@ -978,7 +978,12 @@ namespace libtorrent
time_point m_created; time_point m_created;
boost::int64_t session_time() const TORRENT_OVERRIDE boost::int64_t session_time() const TORRENT_OVERRIDE
{ return total_seconds(aux::time_now() - m_created); } {
// +1 is here to make it possible to distinguish uninitialized (to
// 0) timestamps and timestamps of things that happend during the
// first second after the session was constructed
return total_seconds(aux::time_now() - m_created) + 1;
}
time_point m_last_tick; time_point m_last_tick;
time_point m_last_second_tick; time_point m_last_second_tick;
@ -1169,15 +1174,19 @@ namespace libtorrent
typedef std::list<boost::shared_ptr<plugin> > ses_extension_list_t; typedef std::list<boost::shared_ptr<plugin> > ses_extension_list_t;
ses_extension_list_t m_ses_extensions; ses_extension_list_t m_ses_extensions;
// std::string could be used for the query names if only all common implementations used SSO // the union of all session extensions' implemented_features(). This is
// *glares at gcc* // used to exclude callbacks to the session extensions.
struct extention_dht_query boost::uint32_t m_session_extension_features;
// std::string could be used for the query names if only all common
// implementations used SSO *glares at gcc*
struct extension_dht_query
{ {
boost::uint8_t query_len; boost::uint8_t query_len;
boost::array<char, max_dht_query_length> query; boost::array<char, max_dht_query_length> query;
dht_extension_handler_t handler; dht_extension_handler_t handler;
}; };
typedef std::vector<extention_dht_query> m_extension_dht_queries_t; typedef std::vector<extension_dht_query> m_extension_dht_queries_t;
m_extension_dht_queries_t m_extension_dht_queries; m_extension_dht_queries_t m_extension_dht_queries;
#endif #endif

View File

@ -94,7 +94,7 @@ namespace libtorrent
}; };
~bt_peer_connection(); ~bt_peer_connection();
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
bool supports_encryption() const bool supports_encryption() const
{ return m_encrypted; } { return m_encrypted; }

View File

@ -208,6 +208,27 @@ namespace libtorrent
// hidden // hidden
virtual ~plugin() {} virtual ~plugin() {}
// these are flags that can be returned by implemented_features()
// indicating which callbacks this plugin is interested in
enum feature_flags_t
{
// include this bit if your plugin needs to alter the order of the
// optimistic unchoke of peers. i.e. have the on_optimistic_unchoke()
// callback be called.
optimistic_unchoke_feature = 1,
// include this bit if your plugin needs to have on_tick() called
tick_feature = 2
};
// This function is expected to return a bitmask indicating which features
// this plugin implements. Some callbacks on this object may not be called
// unless the corresponding feature flag is returned here. Note that
// callbacks may still be called even if the corresponding feature is not
// specified in the return value here. See feature_flags_t for possible
// flags to return.
virtual boost::uint32_t implemented_features() { return 0; }
// this is called by the session every time a new torrent is added. // this is called by the session every time a new torrent is added.
// The ``torrent*`` points to the internal torrent object created // The ``torrent*`` points to the internal torrent object created
// for the new torrent. The ``void*`` is the userdata pointer as // for the new torrent. The ``void*`` is the userdata pointer as
@ -237,12 +258,13 @@ namespace libtorrent
// called once per second // called once per second
virtual void on_tick() {} virtual void on_tick() {}
// called when choosing peers to optimistically unchoke // called when choosing peers to optimisticallly unchoke. peer's will be
// peer's will be unchoked in the order they appear in the given // unchoked in the order they appear in the given vector. if
// vector which is initially sorted by when they were last // the plugin returns true then the ordering provided will be used and no
// optimistically unchoked. // other plugin will be allowed to change it. If your plugin expects this
// if the plugin returns true then the ordering provided will be // to be called, make sure to include the flag
// used and no other plugin will be allowed to change it. // ``optimistic_unchoke_feature`` in the return value from
// implemented_features().
virtual bool on_optimistic_unchoke(std::vector<peer_connection_handle>& /* peers */) virtual bool on_optimistic_unchoke(std::vector<peer_connection_handle>& /* peers */)
{ return false; } { return false; }

View File

@ -50,6 +50,7 @@ struct crypto_plugin;
typedef boost::system::error_code error_code; typedef boost::system::error_code error_code;
// hidden
struct TORRENT_EXPORT peer_connection_handle struct TORRENT_EXPORT peer_connection_handle
{ {
peer_connection_handle(boost::weak_ptr<peer_connection> impl) peer_connection_handle(boost::weak_ptr<peer_connection> impl)
@ -113,11 +114,11 @@ struct TORRENT_EXPORT peer_connection_handle
time_point time_of_last_unchoke() const; time_point time_of_last_unchoke() const;
bool operator==(peer_connection_handle const& o) const bool operator==(peer_connection_handle const& o) const
{ return m_connection.lock() == o.m_connection.lock(); } { return !(m_connection < o.m_connection) && !(o.m_connection < m_connection); }
bool operator!=(peer_connection_handle const& o) const bool operator!=(peer_connection_handle const& o) const
{ return m_connection.lock() != o.m_connection.lock(); } { return m_connection < o.m_connection || o.m_connection < m_connection; }
bool operator<(peer_connection_handle const& o) const bool operator<(peer_connection_handle const& o) const
{ return m_connection.lock() < o.m_connection.lock(); } { return m_connection < o.m_connection; }
boost::shared_ptr<peer_connection> native_handle() const boost::shared_ptr<peer_connection> native_handle() const
{ {

View File

@ -23,6 +23,7 @@ project
; ;
alias libtorrent-sims : alias libtorrent-sims :
[ run test_optimistic_unchoke.cpp ]
[ run test_transfer.cpp ] [ run test_transfer.cpp ]
[ run test_http_connection.cpp ] [ run test_http_connection.cpp ]
[ run test_auto_manage.cpp ] [ run test_auto_manage.cpp ]

View File

@ -236,7 +236,7 @@ void setup_swarm(int num_nodes
, std::function<void(lt::alert const*, lt::session&)> on_alert , std::function<void(lt::alert const*, lt::session&)> on_alert
, std::function<int(int, lt::session&)> terminate) , std::function<int(int, lt::session&)> terminate)
{ {
asio::io_service ios(sim, addr("0.0.0.0")); asio::io_service ios(sim);
lt::time_point start_time(lt::clock_type::now()); lt::time_point start_time(lt::clock_type::now());
std::vector<boost::shared_ptr<lt::session> > nodes; std::vector<boost::shared_ptr<lt::session> > nodes;

View File

@ -0,0 +1,174 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "setup_swarm.hpp"
#include "test.hpp"
#include "create_torrent.hpp"
#include "bittorrent_peer.hpp"
#include "settings.hpp"
#include "print_alerts.hpp"
#include "libtorrent/alert.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/session_stats.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/deadline_timer.hpp"
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/ref.hpp>
struct choke_state
{
choke_state() : unchoke_duration(lt::seconds(0)), choked(true) {}
lt::time_duration unchoke_duration;
lt::time_point last_unchoke;
bool choked;
};
TORRENT_TEST(optimistic_unchoke)
{
int const num_nodes = 20;
lt::time_duration const test_duration = libtorrent::seconds(1201);
dsl_config network_cfg;
sim::simulation sim{network_cfg};
io_service ios(sim, addr("50.1.0.0"));
lt::time_point start_time(lt::clock_type::now());
libtorrent::add_torrent_params atp = create_torrent(0);
atp.flags &= ~add_torrent_params::flag_auto_managed;
atp.flags &= ~add_torrent_params::flag_paused;
lt::settings_pack pack = settings();
// only allow an optimistic unchoke slot
pack.set_int(settings_pack::unchoke_slots_limit, 1);
pack.set_int(settings_pack::num_optimistic_unchoke_slots, 1);
std::vector<choke_state> peer_choke_state(num_nodes);
session_proxy proxy;
boost::shared_ptr<lt::session> ses = boost::make_shared<lt::session>(
boost::ref(pack), boost::ref(ios));
ses->async_add_torrent(atp);
std::vector<boost::shared_ptr<sim::asio::io_service> > io_service;
std::vector<boost::shared_ptr<peer_conn> > peers;
ses->set_alert_notify([&]() {
// this function is called inside libtorrent and we cannot perform work
// immediately in it. We have to notify the outside to pull all the alerts
ios.post(boost::bind(&print_alerts, ses.get(), start_time));
});
lt::deadline_timer timer(ios);
timer.expires_from_now(libtorrent::seconds(2));
timer.async_wait([&](error_code const& ec)
{
for (int i = 0; i < num_nodes; ++i)
{
// create a new io_service
char ep[30];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff);
io_service.push_back(boost::make_shared<sim::asio::io_service>(
boost::ref(sim), addr(ep)));
peers.push_back(boost::make_shared<peer_conn>(boost::ref(*io_service.back())
, [&,i](int msg, char const* bug, int len)
{
choke_state& cs = peer_choke_state[i];
if (msg == 0)
{
// choke
if (!cs.choked)
{
cs.choked = true;
cs.unchoke_duration += lt::clock_type::now() - cs.last_unchoke;
}
}
else if (msg == 1)
{
// unchoke
if (cs.choked)
{
cs.choked = false;
cs.last_unchoke = lt::clock_type::now();
}
}
else
{
return;
}
char const* msg_str[] = {"choke", "unchoke"};
lt::time_duration d = lt::clock_type::now() - start_time;
boost::uint32_t millis = lt::duration_cast<lt::milliseconds>(d).count();
printf("\x1b[35m%4d.%03d: [%d] %s (%d ms)\x1b[0m\n"
, millis / 1000, millis % 1000, i, msg_str[msg]
, int(lt::duration_cast<lt::milliseconds>(cs.unchoke_duration).count()));
}
, *atp.ti
, tcp::endpoint(addr("50.1.0.0"), 6881)
, peer_conn::idle));
}
});
lt::deadline_timer end_timer(ios);
timer.expires_from_now(test_duration);
timer.async_wait([&](error_code const& ec)
{
for (auto& p : peers)
{
p->abort();
}
proxy = ses->abort();
ses.reset();
});
sim.run();
boost::int64_t const duration_ms = lt::duration_cast<lt::milliseconds>(test_duration).count();
boost::int64_t const average_unchoke_time = duration_ms / num_nodes;
printf("EXPECT: %" PRId64 " ms\n", average_unchoke_time);
for (auto const& cs : peer_choke_state)
{
boost::int64_t unchoke_duration = lt::duration_cast<lt::milliseconds>(cs.unchoke_duration).count();
printf("%" PRId64 " ms\n", unchoke_duration);
TEST_CHECK(std::abs(unchoke_duration - average_unchoke_time) < 1000);
}
}

View File

@ -219,7 +219,7 @@ namespace libtorrent { namespace
m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size);
} }
void on_piece_pass(int) void on_piece_pass(int) TORRENT_OVERRIDE
{ {
// if we became a seed, copy the metadata from // if we became a seed, copy the metadata from
// the torrent before it is deallocated // the torrent before it is deallocated

View File

@ -1658,8 +1658,8 @@ namespace libtorrent
m_peer_interested = true; m_peer_interested = true;
if (is_disconnecting()) return; if (is_disconnecting()) return;
// if the peer is ready to download stuff, it must have metadata // if the peer is ready to download stuff, it must have metadata
m_has_metadata = true; m_has_metadata = true;
disconnect_if_redundant(); disconnect_if_redundant();
@ -4619,10 +4619,14 @@ namespace libtorrent
return; return;
} }
int download_rate = statistics().download_payload_rate(); #ifndef TORRENT_DISABLE_LOGGING
int const previous_queue_size = m_desired_queue_size;
#endif
int const download_rate = statistics().download_payload_rate();
// the desired download queue size // the desired download queue size
const int queue_time = m_settings.get_int(settings_pack::request_queue_time); int const queue_time = m_settings.get_int(settings_pack::request_queue_time);
// when we're in slow-start mode we increase the desired queue size every // when we're in slow-start mode we increase the desired queue size every
// time we receive a piece, no need to adjust it here (other than // time we receive a piece, no need to adjust it here (other than
@ -4636,7 +4640,7 @@ namespace libtorrent
// the block size doesn't have to be 16. So we first query the // the block size doesn't have to be 16. So we first query the
// torrent for it // torrent for it
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
const int block_size = t->block_size(); int const block_size = t->block_size();
TORRENT_ASSERT(block_size > 0); TORRENT_ASSERT(block_size > 0);
@ -4649,10 +4653,13 @@ namespace libtorrent
m_desired_queue_size = min_request_queue; m_desired_queue_size = min_request_queue;
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" if (previous_queue_size != m_desired_queue_size)
, "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" {
, m_desired_queue_size, m_max_out_request_queue peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE"
, download_rate, queue_time, int(m_snubbed), int(m_slow_start)); , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d"
, m_desired_queue_size, m_max_out_request_queue
, download_rate, queue_time, int(m_snubbed), int(m_slow_start));
}
#endif #endif
} }
@ -5092,7 +5099,7 @@ namespace libtorrent
bool sent_a_piece = false; bool sent_a_piece = false;
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t || t->is_aborted()) return; if (!t || t->is_aborted() || m_requests.empty()) return;
// only add new piece-chunks if the send buffer is small enough // only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be! // otherwise there will be no end to how large it will be!
@ -6681,9 +6688,10 @@ namespace libtorrent
TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size())); TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size()));
// in share mode we don't close redundant connections // in share mode we don't close redundant connections
if (m_settings.get_bool(settings_pack::close_redundant_connections) && !t->share_mode()) if (m_settings.get_bool(settings_pack::close_redundant_connections)
&& !t->share_mode())
{ {
bool ok_to_disconnect = bool const ok_to_disconnect =
can_disconnect(error_code(errors::upload_upload_connection)) can_disconnect(error_code(errors::upload_upload_connection))
|| can_disconnect(error_code(errors::uninteresting_upload_peer)) || can_disconnect(error_code(errors::uninteresting_upload_peer))
|| can_disconnect(error_code(errors::too_many_requests_when_choked)) || can_disconnect(error_code(errors::too_many_requests_when_choked))

View File

@ -430,6 +430,9 @@ namespace aux {
, m_download_connect_attempts(0) , m_download_connect_attempts(0)
, m_next_scrape_torrent(0) , m_next_scrape_torrent(0)
, m_tick_residual(0) , m_tick_residual(0)
#ifndef TORRENT_DISABLE_EXTENSIONS
, m_session_extension_features(0)
#endif
, m_deferred_submit_disk_jobs(false) , m_deferred_submit_disk_jobs(false)
, m_pending_auto_manage(false) , m_pending_auto_manage(false)
, m_need_auto_manage(false) , m_need_auto_manage(false)
@ -921,6 +924,7 @@ namespace aux {
boost::shared_ptr<plugin> p(new session_plugin_wrapper(ext)); boost::shared_ptr<plugin> p(new session_plugin_wrapper(ext));
m_ses_extensions.push_back(p); m_ses_extensions.push_back(p);
m_session_extension_features |= p->implemented_features();
} }
void session_impl::add_ses_extension(boost::shared_ptr<plugin> ext) void session_impl::add_ses_extension(boost::shared_ptr<plugin> ext)
@ -931,6 +935,7 @@ namespace aux {
m_ses_extensions.push_back(ext); m_ses_extensions.push_back(ext);
m_alerts.add_extension(ext); m_alerts.add_extension(ext);
ext->added(session_handle(this)); ext->added(session_handle(this));
m_session_extension_features |= ext->implemented_features();
// get any DHT queries the plugin would like to handle // get any DHT queries the plugin would like to handle
// and record them in m_extension_dht_queries for lookup // and record them in m_extension_dht_queries for lookup
@ -942,7 +947,7 @@ namespace aux {
{ {
TORRENT_ASSERT(e->first.size() <= max_dht_query_length); TORRENT_ASSERT(e->first.size() <= max_dht_query_length);
if (e->first.size() > max_dht_query_length) continue; if (e->first.size() > max_dht_query_length) continue;
extention_dht_query registration; extension_dht_query registration;
registration.query_len = e->first.size(); registration.query_len = e->first.size();
std::copy(e->first.begin(), e->first.end(), registration.query.begin()); std::copy(e->first.begin(), e->first.end(), registration.query.begin());
registration.handler = e->second; registration.handler = e->second;
@ -2021,7 +2026,7 @@ retry:
m_listen_interface.address().to_string() m_listen_interface.address().to_string()
, m_listen_interface.port() , m_listen_interface.port()
, listen_failed_alert::bind , listen_failed_alert::bind
, ec, listen_failed_alert::udp); , ec, listen_failed_alert::tcp);
return; return;
} }
@ -3027,8 +3032,8 @@ retry:
m_last_second_tick = now; m_last_second_tick = now;
m_tick_residual += tick_interval_ms - 1000; m_tick_residual += tick_interval_ms - 1000;
boost::int64_t session_time = total_seconds(now - m_created); boost::int64_t const stime = session_time();
if (session_time > 65000) if (stime > 65000)
{ {
// we're getting close to the point where our timestamps // we're getting close to the point where our timestamps
// in torrent_peer are wrapping. We need to step all counters back // in torrent_peer are wrapping. We need to step all counters back
@ -3047,12 +3052,15 @@ retry:
} }
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin() if (m_session_extension_features & plugin::tick_feature)
, end(m_ses_extensions.end()); i != end; ++i)
{ {
TORRENT_TRY { for (ses_extension_list_t::const_iterator i = m_ses_extensions.begin()
(*i)->on_tick(); , end(m_ses_extensions.end()); i != end; ++i)
} TORRENT_CATCH(std::exception&) {} {
TORRENT_TRY {
(*i)->on_tick();
} TORRENT_CATCH(std::exception&) {}
}
} }
#endif #endif
@ -3764,24 +3772,32 @@ retry:
} }
namespace { namespace {
struct last_optimistic_unchoke_cmp bool last_optimistic_unchoke_cmp(torrent_peer const* const l
, torrent_peer const* const r)
{ {
bool operator()(peer_connection_handle const& l return l->last_optimistically_unchoked
, peer_connection_handle const& r) < r->last_optimistically_unchoked;
{ }
return l.native_handle()->peer_info_struct()->last_optimistically_unchoked
< r.native_handle()->peer_info_struct()->last_optimistically_unchoked;
}
};
} }
void session_impl::recalculate_optimistic_unchoke_slots() void session_impl::recalculate_optimistic_unchoke_slots()
{ {
INVARIANT_CHECK;
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
if (m_stats_counters[counters::num_unchoke_slots] == 0) return; if (m_stats_counters[counters::num_unchoke_slots] == 0) return;
std::vector<peer_connection_handle> opt_unchoke; std::vector<torrent_peer*> opt_unchoke;
// collect the currently optimistically unchoked peers here, so we can
// choke them when we've found new optimistic unchoke candidates.
std::vector<torrent_peer*> prev_opt_unchoke;
// TODO: 3 it would probably make sense to have a separate list of peers
// that are eligible for optimistic unchoke, similar to the torrents
// perhaps this could even iterate over the pool allocators of
// torrent_peer objects. It could probably be done in a single pass and
// collect the n best candidates
for (connection_map::iterator i = m_connections.begin() for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i) , end(m_connections.end()); i != end; ++i)
{ {
@ -3790,90 +3806,139 @@ retry:
torrent_peer* pi = p->peer_info_struct(); torrent_peer* pi = p->peer_info_struct();
if (!pi) continue; if (!pi) continue;
if (pi->web_seed) continue; if (pi->web_seed) continue;
torrent* t = p->associated_torrent().lock().get();
if (!t) continue;
if (t->is_paused()) continue;
if (pi->optimistically_unchoked) if (pi->optimistically_unchoked)
{ {
TORRENT_ASSERT(!p->is_choked()); prev_opt_unchoke.push_back(pi);
opt_unchoke.push_back(peer_connection_handle(*i));
} }
torrent* t = p->associated_torrent().lock().get();
if (!t) continue;
// TODO: 3 peers should know whether their torrent is paused or not,
// instead of having to ask it over and over again
if (t->is_paused()) continue;
if (!p->is_connecting() if (!p->is_connecting()
&& !p->is_disconnecting() && !p->is_disconnecting()
&& p->is_peer_interested() && p->is_peer_interested()
&& t->free_upload_slots() && t->free_upload_slots()
&& p->is_choked() && (p->is_choked() || pi->optimistically_unchoked)
&& !p->ignore_unchoke_slots() && !p->ignore_unchoke_slots()
&& t->valid_metadata()) && t->valid_metadata())
{ {
opt_unchoke.push_back(peer_connection_handle(*i)); opt_unchoke.push_back(pi);
} }
} }
// find the peers that has been waiting the longest to be optimistically // find the peers that has been waiting the longest to be optimistically
// unchoked // unchoked
// avoid having a bias towards peers that happen to be sorted first int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
std::random_shuffle(opt_unchoke.begin(), opt_unchoke.end(), randint); int const allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots];
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5);
if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke =
int(opt_unchoke.size());
// sort all candidates based on when they were last optimistically // find the n best optimistic unchoke candidates
// unchoked. std::partial_sort(opt_unchoke.begin()
std::sort(opt_unchoke.begin(), opt_unchoke.end(), last_optimistic_unchoke_cmp()); , opt_unchoke.begin() + num_opt_unchoke
, opt_unchoke.end(), &last_optimistic_unchoke_cmp);
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
for (ses_extension_list_t::iterator i = m_ses_extensions.begin() if (m_session_extension_features & plugin::optimistic_unchoke_feature)
, end(m_ses_extensions.end()); i != end; ++i)
{ {
if ((*i)->on_optimistic_unchoke(opt_unchoke)) // if there is an extension that wants to reorder the optimistic
break; // unchoke peers, first convert the vector into one containing
// peer_connection_handles, since that's the exported API
std::vector<peer_connection_handle> peers;
peers.reserve(opt_unchoke.size());
for (std::vector<torrent_peer*>::iterator i = opt_unchoke.begin()
, end(opt_unchoke.end()); i != end; ++i)
{
peers.push_back(peer_connection_handle(static_cast<peer_connection*>((*i)->connection)->self()));
}
for (ses_extension_list_t::iterator i = m_ses_extensions.begin()
, end(m_ses_extensions.end()); i != end; ++i)
{
if ((*i)->on_optimistic_unchoke(peers))
break;
}
// then convert back to the internal torrent_peer pointers
opt_unchoke.clear();
for (std::vector<peer_connection_handle>::iterator i = peers.begin()
, end(peers.end()); i != end; ++i)
{
opt_unchoke.push_back(i->native_handle()->peer_info_struct());
}
} }
#endif #endif
int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots];
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5);
// unchoke the first num_opt_unchoke peers in the candidate set // unchoke the first num_opt_unchoke peers in the candidate set
// and make sure that the others are choked // and make sure that the others are choked
for (std::vector<peer_connection_handle>::iterator i = opt_unchoke.begin() std::vector<torrent_peer*>::iterator opt_unchoke_end = opt_unchoke.begin()
, end(opt_unchoke.end()); i != end; ++i) + num_opt_unchoke;
for (std::vector<torrent_peer*>::iterator i = opt_unchoke.begin();
i != opt_unchoke_end; ++i)
{ {
torrent_peer* pi = i->native_handle()->peer_info_struct(); torrent_peer* pi = *i;
if (num_opt_unchoke > 0) peer_connection* p = static_cast<peer_connection*>(pi->connection);
if (pi->optimistically_unchoked)
{ {
--num_opt_unchoke; #ifndef TORRENT_DISABLE_LOGGING
if (!pi->optimistically_unchoked) p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
{ , "already unchoked | session-time: %d"
peer_connection* p = static_cast<peer_connection*>(pi->connection); , pi->last_optimistically_unchoked);
torrent* t = p->associated_torrent().lock().get(); #endif
bool ret = t->unchoke_peer(*p, true); TORRENT_ASSERT(!pi->connection->is_choked());
if (ret) // remove this peer from prev_opt_unchoke, to prevent us from
{ // choking it later. This peer gets another round of optimistic
pi->optimistically_unchoked = true; // unchoke
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic); std::vector<torrent_peer*>::iterator existing =
pi->last_optimistically_unchoked = boost::uint16_t(session_time()); std::find(prev_opt_unchoke.begin(), prev_opt_unchoke.end(), pi);
} TORRENT_ASSERT(existing != prev_opt_unchoke.end());
else prev_opt_unchoke.erase(existing);
{
// we failed to unchoke it, increment the count again
++num_opt_unchoke;
}
}
} }
else else
{ {
if (pi->optimistically_unchoked) TORRENT_ASSERT(p->is_choked());
boost::shared_ptr<torrent> t = p->associated_torrent().lock();
bool ret = t->unchoke_peer(*p, true);
TORRENT_ASSERT(ret);
if (ret)
{ {
peer_connection* p = static_cast<peer_connection*>(pi->connection); pi->optimistically_unchoked = true;
torrent* t = p->associated_torrent().lock().get(); m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
pi->optimistically_unchoked = false; pi->last_optimistically_unchoked = boost::uint16_t(session_time());
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); #ifndef TORRENT_DISABLE_LOGGING
t->choke_peer(*p); p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
, "session-time: %d", pi->last_optimistically_unchoked);
#endif
} }
} }
} }
// now, choke all the previous optimistically unchoked peers
for (std::vector<torrent_peer*>::iterator i = prev_opt_unchoke.begin()
, end(prev_opt_unchoke.end()); i != end; ++i)
{
torrent_peer* pi = *i;
TORRENT_ASSERT(pi->optimistically_unchoked);
peer_connection* p = static_cast<peer_connection*>(pi->connection);
boost::shared_ptr<torrent> t = p->associated_torrent().lock();
pi->optimistically_unchoked = false;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->choke_peer(*p);
}
// if we have too many unchoked peers now, we need to trigger the regular
// choking logic to choke some
if (m_stats_counters[counters::num_unchoke_slots]
< m_stats_counters[counters::num_peers_up_unchoked_all])
{
m_unchoke_time_scaler = 0;
}
} }
void session_impl::try_connect_more_peers() void session_impl::try_connect_more_peers()
@ -4027,6 +4092,8 @@ retry:
// build list of all peers that are // build list of all peers that are
// unchokable. // unchokable.
// TODO: 3 there should be a pre-calculated list of all peers eligible for
// unchoking
std::vector<peer_connection*> peers; std::vector<peer_connection*> peers;
for (connection_map::iterator i = m_connections.begin(); for (connection_map::iterator i = m_connections.begin();
i != m_connections.end();) i != m_connections.end();)
@ -4088,7 +4155,7 @@ retry:
, performance_alert::bittyrant_with_no_uplimit); , performance_alert::bittyrant_with_no_uplimit);
} }
int allowed_upload_slots = unchoke_sort(peers, max_upload_rate int const allowed_upload_slots = unchoke_sort(peers, max_upload_rate
, unchoke_interval, m_settings); , unchoke_interval, m_settings);
m_stats_counters.set_value(counters::num_unchoke_slots m_stats_counters.set_value(counters::num_unchoke_slots
, allowed_upload_slots); , allowed_upload_slots);
@ -4097,7 +4164,8 @@ retry:
session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d " session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d "
"eligible-peers: %d" "eligible-peers: %d"
" max_upload_rate: %d" " max_upload_rate: %d"
" allowed-slots: %d ]", int(m_connections.size()) " allowed-slots: %d ]"
, int(m_connections.size())
, int(peers.size()) , int(peers.size())
, max_upload_rate , max_upload_rate
, allowed_upload_slots); , allowed_upload_slots);
@ -4105,9 +4173,7 @@ retry:
int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots); int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_upload_slots / 5); if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_upload_slots / 5);
int unchoke_set_size = allowed_upload_slots - num_opt_unchoke;
// reserve some upload slots for optimistic unchokes
int unchoke_set_size = allowed_upload_slots;
// go through all the peers and unchoke the first ones and choke // go through all the peers and unchoke the first ones and choke
// all the other ones. // all the other ones.

View File

@ -284,7 +284,7 @@ namespace libtorrent
return ""; return "";
} }
#endif #endif
libtorrent::address torrent_peer::address() const libtorrent::address torrent_peer::address() const
{ {
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6

View File

@ -167,7 +167,7 @@ namespace libtorrent { namespace
m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size); m_torrent.set_progress_ppm(boost::int64_t(m_metadata_progress) * 1000000 / m_metadata_size);
} }
*/ */
void on_piece_pass(int) void on_piece_pass(int) TORRENT_OVERRIDE
{ {
// if we became a seed, copy the metadata from // if we became a seed, copy the metadata from
// the torrent before it is deallocated // the torrent before it is deallocated

View File

@ -56,6 +56,8 @@ lib libtorrent_test
dht_server.cpp dht_server.cpp
udp_tracker.cpp udp_tracker.cpp
peer_server.cpp peer_server.cpp
bittorrent_peer.cpp
print_alerts.cpp
web_seed_suite.cpp web_seed_suite.cpp
swarm_suite.cpp swarm_suite.cpp
test_utils.cpp test_utils.cpp

562
test/bittorrent_peer.cpp Normal file
View File

@ -0,0 +1,562 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/socket.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/assert.hpp"
#include "bittorrent_peer.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/io.hpp"
#include <cstdlib>
#include <boost/bind.hpp>
using namespace libtorrent;
peer_conn::peer_conn(io_service& ios
, boost::function<void(int, char const*, int)> on_msg
, torrent_info const& ti
, tcp::endpoint const& ep
, peer_mode_t mode)
: s(ios)
, m_mode(mode)
, m_ti(ti)
, read_pos(0)
, m_on_msg(on_msg)
, state(handshaking)
, choked(true)
, current_piece(-1)
, m_current_piece_is_allowed(false)
, block(0)
, m_blocks_per_piece((m_ti.piece_length() + 0x3fff) / 0x4000)
, outstanding_requests(0)
, fast_extension(false)
, blocks_received(0)
, blocks_sent(0)
, start_time(clock_type::now())
, endpoint(ep)
, restarting(false)
{
pieces.reserve(m_ti.num_pieces());
start_conn();
}
void peer_conn::start_conn()
{
restarting = false;
s.async_connect(endpoint, boost::bind(&peer_conn::on_connect, this, _1));
}
void peer_conn::on_connect(error_code const& ec)
{
if (ec)
{
close("ERROR CONNECT: %s", ec);
return;
}
char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04"
" " // space for info-hash
"aaaaaaaaaaaaaaaaaaaa" // peer-id
"\0\0\0\x01\x02"; // interested
char* h = (char*)malloc(sizeof(handshake));
memcpy(h, handshake, sizeof(handshake));
std::memcpy(h + 28, m_ti.info_hash().data(), 20);
std::generate(h + 48, h + 68, &rand);
// for seeds, don't send the interested message
boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1)
- (m_mode == uploader ? 5 : 0))
, boost::bind(&peer_conn::on_handshake, this, h, _1, _2));
}
void peer_conn::on_handshake(char* h, error_code const& ec, size_t bytes_transferred)
{
free(h);
if (ec)
{
close("ERROR SEND HANDSHAKE: %s", ec);
return;
}
// read handshake
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68)
, boost::bind(&peer_conn::on_handshake2, this, _1, _2));
}
void peer_conn::on_handshake2(error_code const& ec, size_t bytes_transferred)
{
if (ec)
{
close("ERROR READ HANDSHAKE: %s", ec);
return;
}
// buffer is the full 68 byte handshake
// look at the extension bits
fast_extension = ((char*)buffer)[27] & 4;
if (m_mode == uploader)
{
write_have_all();
}
else
{
work_download();
}
}
void peer_conn::write_have_all()
{
using namespace libtorrent::detail;
if (fast_extension)
{
char* ptr = write_buf_proto;
// have_all
write_uint32(1, ptr);
write_uint8(0xe, ptr);
// unchoke
write_uint32(1, ptr);
write_uint8(1, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto)
, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
else
{
// bitfield
int len = (m_ti.num_pieces() + 7) / 8;
char* ptr = (char*)buffer;
write_uint32(len + 1, ptr);
write_uint8(5, ptr);
memset(ptr, 255, len);
ptr += len;
// unchoke
write_uint32(1, ptr);
write_uint8(1, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10)
, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
}
void peer_conn::on_have_all_sent(error_code const& ec, size_t bytes_transferred)
{
if (ec)
{
close("ERROR SEND HAVE ALL: %s", ec);
return;
}
// read message
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
bool peer_conn::write_request()
{
using namespace libtorrent::detail;
// if we're choked (and there are no allowed-fast pieces left)
if (choked && allowed_fast.empty() && !m_current_piece_is_allowed) return false;
// if there are no pieces left to request
if (pieces.empty() && suggested_pieces.empty() && current_piece == -1) return false;
if (current_piece == -1)
{
// pick a new piece
if (choked && allowed_fast.size() > 0)
{
current_piece = allowed_fast.front();
allowed_fast.erase(allowed_fast.begin());
m_current_piece_is_allowed = true;
}
else if (suggested_pieces.size() > 0)
{
current_piece = suggested_pieces.front();
suggested_pieces.erase(suggested_pieces.begin());
m_current_piece_is_allowed = false;
}
else if (pieces.size() > 0)
{
current_piece = pieces.front();
pieces.erase(pieces.begin());
m_current_piece_is_allowed = false;
}
else
{
TORRENT_ASSERT(false);
}
}
char msg[] = "\0\0\0\xd\x06"
" " // piece
" " // offset
" "; // length
char* m = (char*)malloc(sizeof(msg));
memcpy(m, msg, sizeof(msg));
char* ptr = m + 5;
write_uint32(current_piece, ptr);
write_uint32(block * 16 * 1024, ptr);
write_uint32(16 * 1024, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1)
, boost::bind(&peer_conn::on_req_sent, this, m, _1, _2));
++outstanding_requests;
++block;
if (block == m_blocks_per_piece)
{
block = 0;
current_piece = -1;
m_current_piece_is_allowed = false;
}
return true;
}
void peer_conn::on_req_sent(char* m, error_code const& ec, size_t bytes_transferred)
{
free(m);
if (ec)
{
close("ERROR SEND REQUEST: %s", ec);
return;
}
work_download();
}
void peer_conn::close(char const* fmt, error_code const& ec)
{
end_time = clock_type::now();
char tmp[1024];
snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str());
int time = total_milliseconds(end_time - start_time);
if (time == 0) time = 1;
float up = (boost::int64_t(blocks_sent) * 0x4000) / time / 1000.f;
float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f;
error_code e;
char ep_str[200];
address const& addr = s.local_endpoint(e).address();
#if TORRENT_USE_IPV6
if (addr.is_v6())
snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str()
, s.local_endpoint(e).port());
else
#endif
snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str()
, s.local_endpoint(e).port());
printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n"
, tmp, ep_str, blocks_sent, blocks_received, time, up, down);
}
void peer_conn::work_download()
{
if (pieces.empty()
&& suggested_pieces.empty()
&& current_piece == -1
&& outstanding_requests == 0
&& blocks_received >= m_ti.num_pieces() * m_blocks_per_piece)
{
close("COMPLETED DOWNLOAD", error_code());
return;
}
// send requests
if (outstanding_requests < 40)
{
if (write_request()) return;
}
// read message
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
void peer_conn::on_msg_length(error_code const& ec, size_t bytes_transferred)
{
using namespace libtorrent::detail;
if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
&& restarting)
{
start_conn();
return;
}
if (ec)
{
close("ERROR RECEIVE MESSAGE PREFIX: %s", ec);
return;
}
char* ptr = (char*)buffer;
unsigned int length = read_uint32(ptr);
if (length > sizeof(buffer))
{
fprintf(stderr, "len: %d\n", length);
close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code());
return;
}
if (length == 0)
{
// keep-alive messate. read another length prefix
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
else
{
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length)
, boost::bind(&peer_conn::on_message, this, _1, _2));
}
}
void peer_conn::on_message(error_code const& ec, size_t bytes_transferred)
{
using namespace libtorrent::detail;
if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
&& restarting)
{
start_conn();
return;
}
if (ec)
{
close("ERROR RECEIVE MESSAGE: %s", ec);
return;
}
char* ptr = (char*)buffer;
int msg = read_uint8(ptr);
m_on_msg(msg, ptr, bytes_transferred);
switch (m_mode)
{
case peer_conn::uploader:
if (msg == 6)
{
if (bytes_transferred != 13)
{
close("REQUEST packet has invalid size", error_code());
return;
}
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
int length = detail::read_int32(ptr);
write_piece(piece, start, length);
}
else if (msg == 3) // not-interested
{
close("DONE", error_code());
return;
}
else
{
// read another message
boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
break;
case peer_conn::downloader:
if (msg == 0xe) // have_all
{
// build a list of all pieces and request them all!
pieces.resize(m_ti.num_pieces());
for (int i = 0; i < int(pieces.size()); ++i)
pieces[i] = i;
std::random_shuffle(pieces.begin(), pieces.end());
}
else if (msg == 4) // have
{
int piece = detail::read_int32(ptr);
if (pieces.empty()) pieces.push_back(piece);
else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece);
}
else if (msg == 5) // bitfield
{
pieces.reserve(m_ti.num_pieces());
int piece = 0;
for (int i = 0; i < int(bytes_transferred); ++i)
{
int mask = 0x80;
for (int k = 0; k < 8; ++k)
{
if (piece > m_ti.num_pieces()) break;
if (*ptr & mask) pieces.push_back(piece);
mask >>= 1;
++piece;
}
++ptr;
}
std::random_shuffle(pieces.begin(), pieces.end());
}
else if (msg == 7) // piece
{
/*
if (verify_downloads)
{
int piece = read_uint32(ptr);
int start = read_uint32(ptr);
int size = bytes_transferred - 9;
verify_piece(piece, start, ptr, size);
}
*/
++blocks_received;
--outstanding_requests;
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
if (int((start + bytes_transferred) / 0x4000) == m_blocks_per_piece)
{
write_have(piece);
return;
}
}
else if (msg == 13) // suggest
{
int piece = detail::read_int32(ptr);
std::vector<int>::iterator i = std::find(pieces.begin(), pieces.end(), piece);
if (i != pieces.end())
{
pieces.erase(i);
suggested_pieces.push_back(piece);
}
}
else if (msg == 16) // reject request
{
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
int length = detail::read_int32(ptr);
// put it back!
if (current_piece != piece)
{
if (pieces.empty() || pieces.back() != piece)
pieces.push_back(piece);
}
else
{
block = (std::min)(start / 0x4000, block);
if (block == 0)
{
pieces.push_back(current_piece);
current_piece = -1;
m_current_piece_is_allowed = false;
}
}
--outstanding_requests;
fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n", piece, start, length);
}
else if (msg == 0) // choke
{
choked = true;
}
else if (msg == 1) // unchoke
{
choked = false;
}
else if (msg == 17) // allowed_fast
{
int piece = detail::read_int32(ptr);
std::vector<int>::iterator i = std::find(pieces.begin(), pieces.end(), piece);
if (i != pieces.end())
{
pieces.erase(i);
allowed_fast.push_back(piece);
}
}
work_download();
break;
case peer_conn::idle:
// read another message
boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
break;
}
}
/*
bool peer_conn::verify_piece(int piece, int start, char const* ptr, int size)
{
boost::uint32_t* buf = (boost::uint32_t*)ptr;
boost::uint32_t fill = (piece << 8) | ((start / 0x4000) & 0xff);
for (int i = 0; i < size / 4; ++i)
{
if (buf[i] != fill)
{
fprintf(stderr, "received invalid block. piece %d block %d\n", piece, start / 0x4000);
exit(1);
return false;
}
}
return true;
}
*/
void peer_conn::write_piece(int piece, int start, int length)
{
using namespace libtorrent::detail;
// generate_block(write_buffer, piece, start, length);
char* ptr = write_buf_proto;
write_uint32(9 + length, ptr);
TORRENT_ASSERT(length == 0x4000);
write_uint8(7, ptr);
write_uint32(piece, ptr);
write_uint32(start, ptr);
boost::array<boost::asio::const_buffer, 2> vec;
vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto);
vec[1] = boost::asio::buffer(write_buffer, length);
boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
++blocks_sent;
}
void peer_conn::write_have(int piece)
{
using namespace libtorrent::detail;
char* ptr = write_buf_proto;
write_uint32(5, ptr);
write_uint8(4, ptr);
write_uint32(piece, ptr);
boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
void peer_conn::abort()
{
error_code ec;
s.close(ec);
}

118
test/bittorrent_peer.hpp Normal file
View File

@ -0,0 +1,118 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef BITTORRENT_PEER_HPP
#define BITTORRENT_PEER_HPP
#include "libtorrent/socket.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/torrent_info.hpp"
#include "test.hpp" // for EXPORT
#include <boost/function.hpp>
using namespace libtorrent;
struct EXPORT peer_conn
{
enum peer_mode_t
{ uploader, downloader, idle };
peer_conn(io_service& ios
, boost::function<void(int, char const*, int)> on_msg
, libtorrent::torrent_info const& ti
, libtorrent::tcp::endpoint const& ep
, peer_mode_t mode);
void start_conn();
void on_connect(error_code const& ec);
void on_handshake(char* h, error_code const& ec, size_t bytes_transferred);
void on_handshake2(error_code const& ec, size_t bytes_transferred);
void write_have_all();
void on_have_all_sent(error_code const& ec, size_t bytes_transferred);
bool write_request();
void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred);
void close(char const* fmt, error_code const& ec);
void work_download();
void on_msg_length(error_code const& ec, size_t bytes_transferred);
void on_message(error_code const& ec, size_t bytes_transferred);
bool verify_piece(int piece, int start, char const* ptr, int size);
void write_piece(int piece, int start, int length);
void write_have(int piece);
void abort();
private:
tcp::socket s;
char write_buf_proto[100];
boost::uint32_t write_buffer[17*1024/4];
boost::uint32_t buffer[17*1024/4];
peer_mode_t m_mode;
torrent_info const& m_ti;
int read_pos;
boost::function<void(int, char const*, int)> m_on_msg;
enum state_t
{
handshaking,
sending_request,
receiving_message
};
int state;
std::vector<int> pieces;
std::vector<int> suggested_pieces;
std::vector<int> allowed_fast;
bool choked;
int current_piece; // the piece we're currently requesting blocks from
bool m_current_piece_is_allowed;
int block;
int const m_blocks_per_piece;
int outstanding_requests;
// if this is true, this connection is a seed
bool fast_extension;
int blocks_received;
int blocks_sent;
time_point start_time;
time_point end_time;
tcp::endpoint endpoint;
bool restarting;
};
#endif

73
test/print_alerts.cpp Normal file
View File

@ -0,0 +1,73 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "print_alerts.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/alert_types.hpp"
#include "print_alerts.hpp"
void print_alerts(libtorrent::session* ses, libtorrent::time_point start_time)
{
using namespace libtorrent;
namespace lt = libtorrent;
if (ses == NULL) return;
std::vector<lt::alert*> alerts;
ses->pop_alerts(&alerts);
for (std::vector<lt::alert*>::iterator i = alerts.begin()
, end(alerts.end()); i != end; ++i)
{
alert* a = *i;
#ifndef TORRENT_DISABLE_LOGGING
if (peer_log_alert* pla = alert_cast<peer_log_alert>(a))
{
// in order to keep down the amount of logging, just log actual peer
// messages
if (pla->direction != peer_log_alert::incoming_message
&& pla->direction != peer_log_alert::outgoing_message)
{
continue;
}
}
#endif
lt::time_duration d = a->timestamp() - start_time;
boost::uint32_t millis = lt::duration_cast<lt::milliseconds>(d).count();
printf("%4d.%03d: %-25s %s\n", millis / 1000, millis % 1000
, a->what()
, a->message().c_str());
}
}

43
test/print_alerts.hpp Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef PRINT_ALERTS_HPP
#define PRINT_ALERTS_HPP
#include "libtorrent/time.hpp"
#include "libtorrent/session.hpp"
#include "test.hpp" // for EXPORT
void EXPORT print_alerts(libtorrent::session* ses, libtorrent::time_point start_time);
#endif