add sim test for optimistic unchoke round-robin distribution

This commit is contained in:
arvidn 2016-01-30 21:33:47 -05:00 committed by arvidn
parent f2ce2284da
commit 88b7e3768f
12 changed files with 1028 additions and 25 deletions

View File

@ -978,7 +978,12 @@ namespace libtorrent
time_point m_created;
boost::int64_t session_time() const TORRENT_OVERRIDE
{ return total_seconds(aux::time_now() - m_created); }
{
// +1 is here to make it possible to distinguish uninitialized (to
// 0) timestamps and timestamps of things that happend during the
// first second after the session was constructed
return total_seconds(aux::time_now() - m_created) + 1;
}
time_point m_last_tick;
time_point m_last_second_tick;

View File

@ -94,7 +94,7 @@ namespace libtorrent
};
~bt_peer_connection();
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
bool supports_encryption() const
{ return m_encrypted; }

View File

@ -23,6 +23,7 @@ project
;
alias libtorrent-sims :
[ run test_optimistic_unchoke.cpp ]
[ run test_transfer.cpp ]
[ run test_http_connection.cpp ]
[ run test_auto_manage.cpp ]

View File

@ -236,7 +236,7 @@ void setup_swarm(int num_nodes
, std::function<void(lt::alert const*, lt::session&)> on_alert
, std::function<int(int, lt::session&)> terminate)
{
asio::io_service ios(sim, addr("0.0.0.0"));
asio::io_service ios(sim);
lt::time_point start_time(lt::clock_type::now());
std::vector<boost::shared_ptr<lt::session> > nodes;

View File

@ -0,0 +1,174 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "setup_swarm.hpp"
#include "test.hpp"
#include "create_torrent.hpp"
#include "bittorrent_peer.hpp"
#include "settings.hpp"
#include "print_alerts.hpp"
#include "libtorrent/alert.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/session_stats.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/deadline_timer.hpp"
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/ref.hpp>
struct choke_state
{
choke_state() : unchoke_duration(lt::seconds(0)), choked(true) {}
lt::time_duration unchoke_duration;
lt::time_point last_unchoke;
bool choked;
};
TORRENT_TEST(optimistic_unchoke)
{
int const num_nodes = 20;
lt::time_duration const test_duration = libtorrent::seconds(1201);
dsl_config network_cfg;
sim::simulation sim{network_cfg};
io_service ios(sim, addr("50.1.0.0"));
lt::time_point start_time(lt::clock_type::now());
libtorrent::add_torrent_params atp = create_torrent(0);
atp.flags &= ~add_torrent_params::flag_auto_managed;
atp.flags &= ~add_torrent_params::flag_paused;
lt::settings_pack pack = settings();
// only allow an optimistic unchoke slot
pack.set_int(settings_pack::unchoke_slots_limit, 1);
pack.set_int(settings_pack::num_optimistic_unchoke_slots, 1);
std::vector<choke_state> peer_choke_state(num_nodes);
session_proxy proxy;
boost::shared_ptr<lt::session> ses = boost::make_shared<lt::session>(
boost::ref(pack), boost::ref(ios));
ses->async_add_torrent(atp);
std::vector<boost::shared_ptr<sim::asio::io_service> > io_service;
std::vector<boost::shared_ptr<peer_conn> > peers;
ses->set_alert_notify([&]() {
// this function is called inside libtorrent and we cannot perform work
// immediately in it. We have to notify the outside to pull all the alerts
ios.post(boost::bind(&print_alerts, ses.get(), start_time));
});
lt::deadline_timer timer(ios);
timer.expires_from_now(libtorrent::seconds(2));
timer.async_wait([&](error_code const& ec)
{
for (int i = 0; i < num_nodes; ++i)
{
// create a new io_service
char ep[30];
snprintf(ep, sizeof(ep), "50.0.%d.%d", (i + 1) >> 8, (i + 1) & 0xff);
io_service.push_back(boost::make_shared<sim::asio::io_service>(
boost::ref(sim), addr(ep)));
peers.push_back(boost::make_shared<peer_conn>(boost::ref(*io_service.back())
, [&,i](int msg, char const* bug, int len)
{
choke_state& cs = peer_choke_state[i];
if (msg == 0)
{
// choke
if (!cs.choked)
{
cs.choked = true;
cs.unchoke_duration += lt::clock_type::now() - cs.last_unchoke;
}
}
else if (msg == 1)
{
// unchoke
if (cs.choked)
{
cs.choked = false;
cs.last_unchoke = lt::clock_type::now();
}
}
else
{
return;
}
char const* msg_str[] = {"choke", "unchoke"};
lt::time_duration d = lt::clock_type::now() - start_time;
boost::uint32_t millis = lt::duration_cast<lt::milliseconds>(d).count();
printf("\x1b[35m%4d.%03d: [%d] %s (%d ms)\x1b[0m\n"
, millis / 1000, millis % 1000, i, msg_str[msg]
, int(lt::duration_cast<lt::milliseconds>(cs.unchoke_duration).count()));
}
, *atp.ti
, tcp::endpoint(addr("50.1.0.0"), 6881)
, peer_conn::idle));
}
});
lt::deadline_timer end_timer(ios);
timer.expires_from_now(test_duration);
timer.async_wait([&](error_code const& ec)
{
for (auto& p : peers)
{
p->abort();
}
proxy = ses->abort();
ses.reset();
});
sim.run();
boost::int64_t const duration_ms = lt::duration_cast<lt::milliseconds>(test_duration).count();
boost::int64_t const average_unchoke_time = duration_ms / num_nodes;
printf("EXPECT: %" PRId64 " ms\n", average_unchoke_time);
for (auto const& cs : peer_choke_state)
{
boost::int64_t unchoke_duration = lt::duration_cast<lt::milliseconds>(cs.unchoke_duration).count();
printf("%" PRId64 " ms\n", unchoke_duration);
TEST_CHECK(std::abs(unchoke_duration - average_unchoke_time) < 1000);
}
}

View File

@ -1658,8 +1658,8 @@ namespace libtorrent
m_peer_interested = true;
if (is_disconnecting()) return;
// if the peer is ready to download stuff, it must have metadata
// if the peer is ready to download stuff, it must have metadata
m_has_metadata = true;
disconnect_if_redundant();
@ -4619,10 +4619,14 @@ namespace libtorrent
return;
}
int download_rate = statistics().download_payload_rate();
#ifndef TORRENT_DISABLE_LOGGING
int const previous_queue_size = m_desired_queue_size;
#endif
int const download_rate = statistics().download_payload_rate();
// the desired download queue size
const int queue_time = m_settings.get_int(settings_pack::request_queue_time);
int const queue_time = m_settings.get_int(settings_pack::request_queue_time);
// when we're in slow-start mode we increase the desired queue size every
// time we receive a piece, no need to adjust it here (other than
@ -4636,7 +4640,7 @@ namespace libtorrent
// the block size doesn't have to be 16. So we first query the
// torrent for it
boost::shared_ptr<torrent> t = m_torrent.lock();
const int block_size = t->block_size();
int const block_size = t->block_size();
TORRENT_ASSERT(block_size > 0);
@ -4649,10 +4653,13 @@ namespace libtorrent
m_desired_queue_size = min_request_queue;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE"
, "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d"
, m_desired_queue_size, m_max_out_request_queue
, download_rate, queue_time, int(m_snubbed), int(m_slow_start));
if (previous_queue_size != m_desired_queue_size)
{
peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE"
, "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d"
, m_desired_queue_size, m_max_out_request_queue
, download_rate, queue_time, int(m_snubbed), int(m_slow_start));
}
#endif
}
@ -5092,7 +5099,7 @@ namespace libtorrent
bool sent_a_piece = false;
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t || t->is_aborted()) return;
if (!t || t->is_aborted() || m_requests.empty()) return;
// only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be!
@ -6681,9 +6688,10 @@ namespace libtorrent
TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size()));
// in share mode we don't close redundant connections
if (m_settings.get_bool(settings_pack::close_redundant_connections) && !t->share_mode())
if (m_settings.get_bool(settings_pack::close_redundant_connections)
&& !t->share_mode())
{
bool ok_to_disconnect =
bool const ok_to_disconnect =
can_disconnect(error_code(errors::upload_upload_connection))
|| can_disconnect(error_code(errors::uninteresting_upload_peer))
|| can_disconnect(error_code(errors::too_many_requests_when_choked))

View File

@ -3032,8 +3032,8 @@ retry:
m_last_second_tick = now;
m_tick_residual += tick_interval_ms - 1000;
boost::int64_t session_time = total_seconds(now - m_created);
if (session_time > 65000)
boost::int64_t const stime = session_time();
if (stime > 65000)
{
// we're getting close to the point where our timestamps
// in torrent_peer are wrapping. We need to step all counters back
@ -3835,7 +3835,7 @@ retry:
// unchoked
int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
int allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots];
int const allowed_unchoke_slots = m_stats_counters[counters::num_unchoke_slots];
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_unchoke_slots / 5);
if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke =
int(opt_unchoke.size());
@ -3845,7 +3845,6 @@ retry:
, opt_unchoke.begin() + num_opt_unchoke
, opt_unchoke.end(), &last_optimistic_unchoke_cmp);
#ifndef TORRENT_DISABLE_EXTENSIONS
if (m_session_extension_features & plugin::optimistic_unchoke_feature)
{
@ -3884,8 +3883,14 @@ retry:
i != opt_unchoke_end; ++i)
{
torrent_peer* pi = *i;
peer_connection* p = static_cast<peer_connection*>(pi->connection);
if (pi->optimistically_unchoked)
{
#ifndef TORRENT_DISABLE_LOGGING
p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
, "already unchoked | session-time: %d"
, pi->last_optimistically_unchoked);
#endif
TORRENT_ASSERT(!pi->connection->is_choked());
// remove this peer from prev_opt_unchoke, to prevent us from
// choking it later. This peer gets another round of optimistic
@ -3897,7 +3902,6 @@ retry:
}
else
{
peer_connection* p = static_cast<peer_connection*>(pi->connection);
TORRENT_ASSERT(p->is_choked());
boost::shared_ptr<torrent> t = p->associated_torrent().lock();
bool ret = t->unchoke_peer(*p, true);
@ -3907,6 +3911,10 @@ retry:
pi->optimistically_unchoked = true;
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
pi->last_optimistically_unchoked = boost::uint16_t(session_time());
#ifndef TORRENT_DISABLE_LOGGING
p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
, "session-time: %d", pi->last_optimistically_unchoked);
#endif
}
}
}
@ -3923,6 +3931,14 @@ retry:
m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
t->choke_peer(*p);
}
// if we have too many unchoked peers now, we need to trigger the regular
// choking logic to choke some
if (m_stats_counters[counters::num_unchoke_slots]
< m_stats_counters[counters::num_peers_up_unchoked_all])
{
m_unchoke_time_scaler = 0;
}
}
void session_impl::try_connect_more_peers()
@ -4076,6 +4092,8 @@ retry:
// build list of all peers that are
// unchokable.
// TODO: 3 there should be a pre-calculated list of all peers eligible for
// unchoking
std::vector<peer_connection*> peers;
for (connection_map::iterator i = m_connections.begin();
i != m_connections.end();)
@ -4137,7 +4155,7 @@ retry:
, performance_alert::bittyrant_with_no_uplimit);
}
int allowed_upload_slots = unchoke_sort(peers, max_upload_rate
int const allowed_upload_slots = unchoke_sort(peers, max_upload_rate
, unchoke_interval, m_settings);
m_stats_counters.set_value(counters::num_unchoke_slots
, allowed_upload_slots);
@ -4146,7 +4164,8 @@ retry:
session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d "
"eligible-peers: %d"
" max_upload_rate: %d"
" allowed-slots: %d ]", int(m_connections.size())
" allowed-slots: %d ]"
, int(m_connections.size())
, int(peers.size())
, max_upload_rate
, allowed_upload_slots);
@ -4154,9 +4173,7 @@ retry:
int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
if (num_opt_unchoke == 0) num_opt_unchoke = (std::max)(1, allowed_upload_slots / 5);
// reserve some upload slots for optimistic unchokes
int unchoke_set_size = allowed_upload_slots;
int unchoke_set_size = allowed_upload_slots - num_opt_unchoke;
// go through all the peers and unchoke the first ones and choke
// all the other ones.

View File

@ -56,6 +56,8 @@ lib libtorrent_test
dht_server.cpp
udp_tracker.cpp
peer_server.cpp
bittorrent_peer.cpp
print_alerts.cpp
web_seed_suite.cpp
swarm_suite.cpp
test_utils.cpp

562
test/bittorrent_peer.cpp Normal file
View File

@ -0,0 +1,562 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/socket.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/assert.hpp"
#include "bittorrent_peer.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/io.hpp"
#include <cstdlib>
#include <boost/bind.hpp>
using namespace libtorrent;
peer_conn::peer_conn(io_service& ios
, boost::function<void(int, char const*, int)> on_msg
, torrent_info const& ti
, tcp::endpoint const& ep
, peer_mode_t mode)
: s(ios)
, m_mode(mode)
, m_ti(ti)
, read_pos(0)
, m_on_msg(on_msg)
, state(handshaking)
, choked(true)
, current_piece(-1)
, m_current_piece_is_allowed(false)
, block(0)
, m_blocks_per_piece((m_ti.piece_length() + 0x3fff) / 0x4000)
, outstanding_requests(0)
, fast_extension(false)
, blocks_received(0)
, blocks_sent(0)
, start_time(clock_type::now())
, endpoint(ep)
, restarting(false)
{
pieces.reserve(m_ti.num_pieces());
start_conn();
}
void peer_conn::start_conn()
{
restarting = false;
s.async_connect(endpoint, boost::bind(&peer_conn::on_connect, this, _1));
}
void peer_conn::on_connect(error_code const& ec)
{
if (ec)
{
close("ERROR CONNECT: %s", ec);
return;
}
char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04"
" " // space for info-hash
"aaaaaaaaaaaaaaaaaaaa" // peer-id
"\0\0\0\x01\x02"; // interested
char* h = (char*)malloc(sizeof(handshake));
memcpy(h, handshake, sizeof(handshake));
std::memcpy(h + 28, m_ti.info_hash().data(), 20);
std::generate(h + 48, h + 68, &rand);
// for seeds, don't send the interested message
boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1)
- (m_mode == uploader ? 5 : 0))
, boost::bind(&peer_conn::on_handshake, this, h, _1, _2));
}
void peer_conn::on_handshake(char* h, error_code const& ec, size_t bytes_transferred)
{
free(h);
if (ec)
{
close("ERROR SEND HANDSHAKE: %s", ec);
return;
}
// read handshake
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68)
, boost::bind(&peer_conn::on_handshake2, this, _1, _2));
}
void peer_conn::on_handshake2(error_code const& ec, size_t bytes_transferred)
{
if (ec)
{
close("ERROR READ HANDSHAKE: %s", ec);
return;
}
// buffer is the full 68 byte handshake
// look at the extension bits
fast_extension = ((char*)buffer)[27] & 4;
if (m_mode == uploader)
{
write_have_all();
}
else
{
work_download();
}
}
void peer_conn::write_have_all()
{
using namespace libtorrent::detail;
if (fast_extension)
{
char* ptr = write_buf_proto;
// have_all
write_uint32(1, ptr);
write_uint8(0xe, ptr);
// unchoke
write_uint32(1, ptr);
write_uint8(1, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto)
, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
else
{
// bitfield
int len = (m_ti.num_pieces() + 7) / 8;
char* ptr = (char*)buffer;
write_uint32(len + 1, ptr);
write_uint8(5, ptr);
memset(ptr, 255, len);
ptr += len;
// unchoke
write_uint32(1, ptr);
write_uint8(1, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10)
, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
}
void peer_conn::on_have_all_sent(error_code const& ec, size_t bytes_transferred)
{
if (ec)
{
close("ERROR SEND HAVE ALL: %s", ec);
return;
}
// read message
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
bool peer_conn::write_request()
{
using namespace libtorrent::detail;
// if we're choked (and there are no allowed-fast pieces left)
if (choked && allowed_fast.empty() && !m_current_piece_is_allowed) return false;
// if there are no pieces left to request
if (pieces.empty() && suggested_pieces.empty() && current_piece == -1) return false;
if (current_piece == -1)
{
// pick a new piece
if (choked && allowed_fast.size() > 0)
{
current_piece = allowed_fast.front();
allowed_fast.erase(allowed_fast.begin());
m_current_piece_is_allowed = true;
}
else if (suggested_pieces.size() > 0)
{
current_piece = suggested_pieces.front();
suggested_pieces.erase(suggested_pieces.begin());
m_current_piece_is_allowed = false;
}
else if (pieces.size() > 0)
{
current_piece = pieces.front();
pieces.erase(pieces.begin());
m_current_piece_is_allowed = false;
}
else
{
TORRENT_ASSERT(false);
}
}
char msg[] = "\0\0\0\xd\x06"
" " // piece
" " // offset
" "; // length
char* m = (char*)malloc(sizeof(msg));
memcpy(m, msg, sizeof(msg));
char* ptr = m + 5;
write_uint32(current_piece, ptr);
write_uint32(block * 16 * 1024, ptr);
write_uint32(16 * 1024, ptr);
error_code ec;
boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1)
, boost::bind(&peer_conn::on_req_sent, this, m, _1, _2));
++outstanding_requests;
++block;
if (block == m_blocks_per_piece)
{
block = 0;
current_piece = -1;
m_current_piece_is_allowed = false;
}
return true;
}
void peer_conn::on_req_sent(char* m, error_code const& ec, size_t bytes_transferred)
{
free(m);
if (ec)
{
close("ERROR SEND REQUEST: %s", ec);
return;
}
work_download();
}
void peer_conn::close(char const* fmt, error_code const& ec)
{
end_time = clock_type::now();
char tmp[1024];
snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str());
int time = total_milliseconds(end_time - start_time);
if (time == 0) time = 1;
float up = (boost::int64_t(blocks_sent) * 0x4000) / time / 1000.f;
float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f;
error_code e;
char ep_str[200];
address const& addr = s.local_endpoint(e).address();
#if TORRENT_USE_IPV6
if (addr.is_v6())
snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str()
, s.local_endpoint(e).port());
else
#endif
snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str()
, s.local_endpoint(e).port());
printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n"
, tmp, ep_str, blocks_sent, blocks_received, time, up, down);
}
void peer_conn::work_download()
{
if (pieces.empty()
&& suggested_pieces.empty()
&& current_piece == -1
&& outstanding_requests == 0
&& blocks_received >= m_ti.num_pieces() * m_blocks_per_piece)
{
close("COMPLETED DOWNLOAD", error_code());
return;
}
// send requests
if (outstanding_requests < 40)
{
if (write_request()) return;
}
// read message
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
void peer_conn::on_msg_length(error_code const& ec, size_t bytes_transferred)
{
using namespace libtorrent::detail;
if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
&& restarting)
{
start_conn();
return;
}
if (ec)
{
close("ERROR RECEIVE MESSAGE PREFIX: %s", ec);
return;
}
char* ptr = (char*)buffer;
unsigned int length = read_uint32(ptr);
if (length > sizeof(buffer))
{
fprintf(stderr, "len: %d\n", length);
close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code());
return;
}
if (length == 0)
{
// keep-alive messate. read another length prefix
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
else
{
boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length)
, boost::bind(&peer_conn::on_message, this, _1, _2));
}
}
void peer_conn::on_message(error_code const& ec, size_t bytes_transferred)
{
using namespace libtorrent::detail;
if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
&& restarting)
{
start_conn();
return;
}
if (ec)
{
close("ERROR RECEIVE MESSAGE: %s", ec);
return;
}
char* ptr = (char*)buffer;
int msg = read_uint8(ptr);
m_on_msg(msg, ptr, bytes_transferred);
switch (m_mode)
{
case peer_conn::uploader:
if (msg == 6)
{
if (bytes_transferred != 13)
{
close("REQUEST packet has invalid size", error_code());
return;
}
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
int length = detail::read_int32(ptr);
write_piece(piece, start, length);
}
else if (msg == 3) // not-interested
{
close("DONE", error_code());
return;
}
else
{
// read another message
boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
}
break;
case peer_conn::downloader:
if (msg == 0xe) // have_all
{
// build a list of all pieces and request them all!
pieces.resize(m_ti.num_pieces());
for (int i = 0; i < int(pieces.size()); ++i)
pieces[i] = i;
std::random_shuffle(pieces.begin(), pieces.end());
}
else if (msg == 4) // have
{
int piece = detail::read_int32(ptr);
if (pieces.empty()) pieces.push_back(piece);
else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece);
}
else if (msg == 5) // bitfield
{
pieces.reserve(m_ti.num_pieces());
int piece = 0;
for (int i = 0; i < int(bytes_transferred); ++i)
{
int mask = 0x80;
for (int k = 0; k < 8; ++k)
{
if (piece > m_ti.num_pieces()) break;
if (*ptr & mask) pieces.push_back(piece);
mask >>= 1;
++piece;
}
++ptr;
}
std::random_shuffle(pieces.begin(), pieces.end());
}
else if (msg == 7) // piece
{
/*
if (verify_downloads)
{
int piece = read_uint32(ptr);
int start = read_uint32(ptr);
int size = bytes_transferred - 9;
verify_piece(piece, start, ptr, size);
}
*/
++blocks_received;
--outstanding_requests;
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
if (int((start + bytes_transferred) / 0x4000) == m_blocks_per_piece)
{
write_have(piece);
return;
}
}
else if (msg == 13) // suggest
{
int piece = detail::read_int32(ptr);
std::vector<int>::iterator i = std::find(pieces.begin(), pieces.end(), piece);
if (i != pieces.end())
{
pieces.erase(i);
suggested_pieces.push_back(piece);
}
}
else if (msg == 16) // reject request
{
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
int length = detail::read_int32(ptr);
// put it back!
if (current_piece != piece)
{
if (pieces.empty() || pieces.back() != piece)
pieces.push_back(piece);
}
else
{
block = (std::min)(start / 0x4000, block);
if (block == 0)
{
pieces.push_back(current_piece);
current_piece = -1;
m_current_piece_is_allowed = false;
}
}
--outstanding_requests;
fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n", piece, start, length);
}
else if (msg == 0) // choke
{
choked = true;
}
else if (msg == 1) // unchoke
{
choked = false;
}
else if (msg == 17) // allowed_fast
{
int piece = detail::read_int32(ptr);
std::vector<int>::iterator i = std::find(pieces.begin(), pieces.end(), piece);
if (i != pieces.end())
{
pieces.erase(i);
allowed_fast.push_back(piece);
}
}
work_download();
break;
case peer_conn::idle:
// read another message
boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
, boost::bind(&peer_conn::on_msg_length, this, _1, _2));
break;
}
}
/*
bool peer_conn::verify_piece(int piece, int start, char const* ptr, int size)
{
boost::uint32_t* buf = (boost::uint32_t*)ptr;
boost::uint32_t fill = (piece << 8) | ((start / 0x4000) & 0xff);
for (int i = 0; i < size / 4; ++i)
{
if (buf[i] != fill)
{
fprintf(stderr, "received invalid block. piece %d block %d\n", piece, start / 0x4000);
exit(1);
return false;
}
}
return true;
}
*/
void peer_conn::write_piece(int piece, int start, int length)
{
using namespace libtorrent::detail;
// generate_block(write_buffer, piece, start, length);
char* ptr = write_buf_proto;
write_uint32(9 + length, ptr);
TORRENT_ASSERT(length == 0x4000);
write_uint8(7, ptr);
write_uint32(piece, ptr);
write_uint32(start, ptr);
boost::array<boost::asio::const_buffer, 2> vec;
vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto);
vec[1] = boost::asio::buffer(write_buffer, length);
boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
++blocks_sent;
}
void peer_conn::write_have(int piece)
{
using namespace libtorrent::detail;
char* ptr = write_buf_proto;
write_uint32(5, ptr);
write_uint8(4, ptr);
write_uint32(piece, ptr);
boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
void peer_conn::abort()
{
error_code ec;
s.close(ec);
}

118
test/bittorrent_peer.hpp Normal file
View File

@ -0,0 +1,118 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef BITTORRENT_PEER_HPP
#define BITTORRENT_PEER_HPP
#include "libtorrent/socket.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/io_service.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/address.hpp"
#include "libtorrent/torrent_info.hpp"
#include "test.hpp" // for EXPORT
#include <boost/function.hpp>
using namespace libtorrent;
struct EXPORT peer_conn
{
enum peer_mode_t
{ uploader, downloader, idle };
peer_conn(io_service& ios
, boost::function<void(int, char const*, int)> on_msg
, libtorrent::torrent_info const& ti
, libtorrent::tcp::endpoint const& ep
, peer_mode_t mode);
void start_conn();
void on_connect(error_code const& ec);
void on_handshake(char* h, error_code const& ec, size_t bytes_transferred);
void on_handshake2(error_code const& ec, size_t bytes_transferred);
void write_have_all();
void on_have_all_sent(error_code const& ec, size_t bytes_transferred);
bool write_request();
void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred);
void close(char const* fmt, error_code const& ec);
void work_download();
void on_msg_length(error_code const& ec, size_t bytes_transferred);
void on_message(error_code const& ec, size_t bytes_transferred);
bool verify_piece(int piece, int start, char const* ptr, int size);
void write_piece(int piece, int start, int length);
void write_have(int piece);
void abort();
private:
tcp::socket s;
char write_buf_proto[100];
boost::uint32_t write_buffer[17*1024/4];
boost::uint32_t buffer[17*1024/4];
peer_mode_t m_mode;
torrent_info const& m_ti;
int read_pos;
boost::function<void(int, char const*, int)> m_on_msg;
enum state_t
{
handshaking,
sending_request,
receiving_message
};
int state;
std::vector<int> pieces;
std::vector<int> suggested_pieces;
std::vector<int> allowed_fast;
bool choked;
int current_piece; // the piece we're currently requesting blocks from
bool m_current_piece_is_allowed;
int block;
int const m_blocks_per_piece;
int outstanding_requests;
// if this is true, this connection is a seed
bool fast_extension;
int blocks_received;
int blocks_sent;
time_point start_time;
time_point end_time;
tcp::endpoint endpoint;
bool restarting;
};
#endif

73
test/print_alerts.cpp Normal file
View File

@ -0,0 +1,73 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "print_alerts.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/alert_types.hpp"
#include "print_alerts.hpp"
void print_alerts(libtorrent::session* ses, libtorrent::time_point start_time)
{
using namespace libtorrent;
namespace lt = libtorrent;
if (ses == NULL) return;
std::vector<lt::alert*> alerts;
ses->pop_alerts(&alerts);
for (std::vector<lt::alert*>::iterator i = alerts.begin()
, end(alerts.end()); i != end; ++i)
{
alert* a = *i;
#ifndef TORRENT_DISABLE_LOGGING
if (peer_log_alert* pla = alert_cast<peer_log_alert>(a))
{
// in order to keep down the amount of logging, just log actual peer
// messages
if (pla->direction != peer_log_alert::incoming_message
&& pla->direction != peer_log_alert::outgoing_message)
{
continue;
}
}
#endif
lt::time_duration d = a->timestamp() - start_time;
boost::uint32_t millis = lt::duration_cast<lt::milliseconds>(d).count();
printf("%4d.%03d: %-25s %s\n", millis / 1000, millis % 1000
, a->what()
, a->message().c_str());
}
}

43
test/print_alerts.hpp Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef PRINT_ALERTS_HPP
#define PRINT_ALERTS_HPP
#include "libtorrent/time.hpp"
#include "libtorrent/session.hpp"
#include "test.hpp" // for EXPORT
void EXPORT print_alerts(libtorrent::session* ses, libtorrent::time_point start_time);
#endif