premiere-libtorrent/src/peer_connection.cpp

1663 lines
44 KiB
C++
Raw Normal View History

/*
Copyright (c) 2003, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <iostream>
#include <iomanip>
#include <vector>
2004-01-12 21:31:27 +01:00
#include <limits>
#include "libtorrent/peer_connection.hpp"
#include "libtorrent/session.hpp"
2003-12-22 08:14:35 +01:00
#include "libtorrent/identify_client.hpp"
2004-01-04 05:29:13 +01:00
#include "libtorrent/entry.hpp"
#include "libtorrent/bencode.hpp"
2004-01-18 20:12:18 +01:00
#include "libtorrent/alert_types.hpp"
2003-11-09 19:17:09 +01:00
#if defined(_MSC_VER)
#define for if (false) {} else for
#endif
#define VERBOSE
2004-01-04 05:29:13 +01:00
namespace libtorrent
{
2004-01-04 05:29:13 +01:00
// Names of the protocol extensions that are looked up in a peer's
// extensions-message. on_extension_list() indexes this array with
// 0 .. num_supported_extensions-1.
const char* peer_connection::extension_names[] = { "chat" };
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
// Dispatch table used by dispatch_message(): the first byte of a received
// packet (the message id) is the index into this array. A 0 entry means
// there is no handler for that id, and dispatch_message() rejects it
// with a protocol_error.
const peer_connection::message_handler peer_connection::m_message_handler[] =
{
&peer_connection::on_choke, // id 0
&peer_connection::on_unchoke, // id 1
&peer_connection::on_interested, // id 2
&peer_connection::on_not_interested, // id 3
&peer_connection::on_have, // id 4
&peer_connection::on_bitfield, // id 5
&peer_connection::on_request, // id 6
&peer_connection::on_piece, // id 7
&peer_connection::on_cancel, // id 8
// ids 9 - 19 have no handler
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
&peer_connection::on_extension_list, // id 20
&peer_connection::on_extended // id 21
};
2004-01-04 05:29:13 +01:00
// Creates a connection that is attached to the torrent `t` from the start.
// Our handshake is queued immediately, followed by our bitfield, and the
// receive state is set up to read the first byte of the remote handshake.
//
// ses: the session this connection belongs to
// sel: the selector the socket is registered with
// t:   the torrent this connection belongs to (must not be 0)
// s:   the (non-blocking) socket to communicate over
peer_connection::peer_connection(
	detail::session_impl& ses
	, selector& sel
	, torrent* t
	, boost::shared_ptr<libtorrent::socket> s)
	: m_state(read_protocol_length)
	, m_timeout(120)
	, m_packet_size(1)
	, m_recv_pos(0)
	, m_last_receive(boost::gregorian::date(std::time(0)))
	, m_last_sent(boost::gregorian::date(std::time(0)))
	, m_selector(sel)
	, m_socket(s)
	, m_torrent(t)
	, m_attached_to_torrent(true)
	, m_ses(ses)
	, m_active(true)
	, m_added_to_selector(false)
	, m_peer_interested(false)
	, m_peer_choked(true)
	, m_interesting(false)
	, m_choked(true)
	, m_supports_extensions(false)
	, m_free_upload(0)
	, m_send_quota(100)
	, m_send_quota_left(100)
	, m_send_quota_limit(100)
	, m_trust_points(0)
	, m_num_invalid_requests(0)
	, m_last_piece(boost::gregorian::date(std::time(0)))
	, m_last_piece_time(boost::posix_time::seconds(0))
	, m_disconnecting(false)
{
	assert(!m_socket->is_blocking());
	assert(m_torrent != 0);

#ifndef NDEBUG
	// the log is named after the remote address
	m_logger = m_ses.create_log(s->sender().as_string().c_str());
#endif

	// start out with an all-zero peer-id
	std::fill(m_peer_id.begin(), m_peer_id.end(), 0);

	// mark every extension as unsupported (-1), since we don't
	// know yet which extensions the other end supports
	for (int i = 0; i < num_supported_extensions; ++i)
		m_extension_messages[i] = -1;

	send_handshake();

	// the first thing to read from the other side is the
	// one-byte protocol-name length of its handshake
	m_recv_buffer.resize(1);

	// until told otherwise, assume the other end has no pieces
	const int num_pieces = m_torrent->torrent_file().num_pieces();
	m_have_piece.resize(num_pieces);
	std::fill(m_have_piece.begin(), m_have_piece.end(), false);

	send_bitfield();
}
2003-12-07 06:53:04 +01:00
2004-01-04 05:29:13 +01:00
// Creates a connection that is not attached to any torrent yet. Which
// torrent the connecting peer wants is only known once its handshake has
// been read, so nothing is sent from here; the receive state is set up to
// read the first byte of the remote handshake.
//
// ses: the session this connection belongs to
// sel: the selector the socket is registered with
// s:   the (non-blocking) socket to communicate over
peer_connection::peer_connection(
	detail::session_impl& ses
	, selector& sel
	, boost::shared_ptr<libtorrent::socket> s)
	: m_state(read_protocol_length)
	, m_timeout(120)
	, m_packet_size(1)
	, m_recv_pos(0)
	, m_last_receive(boost::gregorian::date(std::time(0)))
	, m_last_sent(boost::gregorian::date(std::time(0)))
	, m_selector(sel)
	, m_socket(s)
	, m_torrent(0)
	, m_attached_to_torrent(false)
	, m_ses(ses)
	, m_active(false)
	, m_added_to_selector(false)
	, m_peer_id()
	, m_peer_interested(false)
	, m_peer_choked(true)
	, m_interesting(false)
	, m_choked(true)
	, m_supports_extensions(false)
	, m_free_upload(0)
	, m_send_quota(100)
	, m_send_quota_left(100)
	, m_send_quota_limit(100)
	, m_trust_points(0)
	, m_num_invalid_requests(0)
	, m_last_piece(boost::gregorian::date(std::time(0)))
	, m_last_piece_time(boost::posix_time::seconds(0))
	, m_disconnecting(false)
{
	assert(!m_socket->is_blocking());

	// start out with an all-zero peer-id
	std::fill(m_peer_id.begin(), m_peer_id.end(), 0);

#ifndef NDEBUG
	// the log is named after the remote address
	m_logger = m_ses.create_log(s->sender().as_string().c_str());
#endif

	// mark every extension as unsupported (-1), since we don't
	// know yet which extensions the other end supports
	for (int i = 0; i < num_supported_extensions; ++i)
		m_extension_messages[i] = -1;

	// we are not attached to any torrent yet. We have to wait for
	// the handshake to see which torrent the connector wants to
	// connect to, so start by reading the first byte of it.
	m_recv_buffer.resize(1);
}
2004-01-04 05:29:13 +01:00
// Unregisters the socket from the selector and, if this connection was
// attached to a torrent, removes itself from that torrent's peers.
peer_connection::~peer_connection()
{
	m_selector.remove(m_socket);

	// never got attached to a torrent; nothing more to undo
	if (!m_attached_to_torrent) return;

	assert(m_torrent != 0);
	m_torrent->remove_peer(this);
}
2004-01-04 05:29:13 +01:00
void peer_connection::set_send_quota(int num_bytes)
{
assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1);
if (num_bytes > m_send_quota_limit && m_send_quota_limit!=-1) num_bytes = m_send_quota_limit;
2004-01-04 05:29:13 +01:00
m_send_quota = num_bytes;
m_send_quota_left = num_bytes;
send_buffer_updated();
}
2004-01-04 05:29:13 +01:00
void peer_connection::send_handshake()
{
assert(m_send_buffer.size() == 0);
// add handshake to the send buffer
const char version_string[] = "BitTorrent protocol";
const int string_len = sizeof(version_string)-1;
int pos = 1;
m_send_buffer.resize(1 + string_len + 8 + 20 + 20);
// length of version string
m_send_buffer[0] = string_len;
// version string itself
std::copy(
version_string
, version_string+string_len
, m_send_buffer.begin()+pos);
pos += string_len;
// 8 zeroes
std::fill(
m_send_buffer.begin() + pos
, m_send_buffer.begin() + pos + 8
, 0);
// indicate that we support the extension protocol
2004-01-14 02:19:30 +01:00
// curently disabled
// m_send_buffer[pos] = 0x80;
2004-01-04 05:29:13 +01:00
pos += 8;
// info hash
std::copy(
m_torrent->torrent_file().info_hash().begin()
, m_torrent->torrent_file().info_hash().end()
, m_send_buffer.begin() + pos);
pos += 20;
// peer id
std::copy(
m_ses.get_peer_id().begin()
, m_ses.get_peer_id().end()
, m_send_buffer.begin() + pos);
2003-12-07 06:53:04 +01:00
2004-01-04 05:29:13 +01:00
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> HANDSHAKE\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
2004-01-08 14:03:38 +01:00
// Verifies that a piece message / request is valid (within a valid range)
// and that it could correspond to a request generated by libtorrent.
//
// A request is accepted when:
//  * the piece index is within the torrent
//  * start is non-negative and a multiple of the block size
//  * the block lies entirely within the piece
//  * the length is exactly one block, or it is shorter than a block
//    and is the very last block of the last piece
//
// The fields of p may come straight off the wire, so every range check is
// written as a subtraction of already validated values. 'p.start + p.length'
// could overflow a signed int for hostile input.
bool peer_connection::verify_piece(const peer_request& p) const
{
	const int num_pieces = m_torrent->torrent_file().num_pieces();
	if (p.piece < 0 || p.piece >= num_pieces) return false;
	if (p.length <= 0 || p.start < 0) return false;

	const int block_size = m_torrent->block_size();
	// the piece index is known to be valid here
	const int piece_size
		= static_cast<int>(m_torrent->torrent_file().piece_size(p.piece));

	// the block has to fit inside the piece
	if (p.length > piece_size) return false;
	// the largest offset at which a block of this length still fits
	const int last_valid_start = piece_size - p.length;
	if (p.start > last_valid_start) return false;

	// blocks always start on a block boundary
	if (p.start % block_size != 0) return false;

	if (p.length == block_size) return true;

	// a short block is only legal at the very end of the last piece
	return p.length < block_size
		&& p.piece == num_pieces - 1
		&& p.start == last_valid_start;
}
2004-01-04 05:29:13 +01:00
// Returns the progress of the block currently being received from this
// peer. The returned optional is empty if we are not in the middle of a
// 'piece' message whose 9 byte header (message id, piece index, offset)
// has been received, or if that header does not pass verify_piece().
boost::optional<piece_block_progress> peer_connection::downloading_piece() const
{
	typedef boost::optional<piece_block_progress> result_type;

	// are we currently receiving a 'piece' message?
	const bool receiving_piece = m_state == read_packet
		&& m_recv_pos >= 9
		&& m_recv_buffer[0] == msg_piece;
	if (!receiving_piece) return result_type();

	const char* ptr = &m_recv_buffer[1];
	peer_request req;
	req.piece = detail::read_int(ptr);
	req.start = detail::read_int(ptr);
	// everything after the 9 byte header is block data
	req.length = m_packet_size - 9;

	// is any of the piece message header data invalid?
	if (!verify_piece(req)) return result_type();

	piece_block_progress progress;
	progress.piece_index = req.piece;
	progress.block_index = req.start / m_torrent->block_size();
	progress.bytes_downloaded = m_recv_pos - 9;
	progress.full_block_bytes = req.length;
	return result_type(progress);
}
2003-12-07 06:53:04 +01:00
2004-01-05 00:51:54 +01:00
// message handlers
2004-01-05 00:51:54 +01:00
// -----------------------------
// ----------- CHOKE -----------
// -----------------------------
void peer_connection::on_choke(int received)
{
if (m_packet_size != 1)
throw protocol_error("'choke' message size != 1");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== CHOKE\n";
2004-01-05 00:51:54 +01:00
#endif
m_peer_choked = true;
m_torrent->get_policy().choked(*this);
// remove all pieces from this peers download queue and
// remove the 'downloading' flag from piece_picker.
for (std::deque<piece_block>::iterator i = m_download_queue.begin();
i != m_download_queue.end();
++i)
2004-01-04 06:28:24 +01:00
{
2004-01-05 00:51:54 +01:00
m_torrent->picker().abort_download(*i);
2004-01-04 06:28:24 +01:00
}
2004-01-05 00:51:54 +01:00
m_download_queue.clear();
#ifndef NDEBUG
// m_torrent->picker().integrity_check(m_torrent);
#endif
}
2004-01-04 06:28:24 +01:00
2004-01-05 00:51:54 +01:00
// -----------------------------
// ---------- UNCHOKE ----------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_unchoke(int received)
{
if (m_packet_size != 1)
throw protocol_error("'unchoke' message size != 1");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2003-12-07 06:53:04 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== UNCHOKE\n";
2004-01-05 00:51:54 +01:00
#endif
m_peer_choked = false;
m_torrent->get_policy().unchoked(*this);
}
2004-01-05 00:51:54 +01:00
// -----------------------------
// -------- INTERESTED ---------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_interested(int received)
{
if (m_packet_size != 1)
throw protocol_error("'interested' message size != 1");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " <== INTERESTED\n";
2004-01-05 00:51:54 +01:00
#endif
m_peer_interested = true;
m_torrent->get_policy().interested(*this);
}
2003-12-07 06:53:04 +01:00
2004-01-05 00:51:54 +01:00
// -----------------------------
// ------ NOT INTERESTED -------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_not_interested(int received)
{
if (m_packet_size != 1)
throw protocol_error("'not interested' message size != 1");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2004-01-05 00:51:54 +01:00
// clear the request queue if the client isn't interested
m_requests.clear();
2003-12-18 04:30:41 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== NOT_INTERESTED\n";
2004-01-05 00:51:54 +01:00
#endif
m_peer_interested = false;
m_torrent->get_policy().not_interested(*this);
}
2004-01-05 00:51:54 +01:00
// -----------------------------
// ----------- HAVE ------------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_have(int received)
{
if (m_packet_size != 5)
throw protocol_error("'have' message size != 5");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2003-12-07 06:53:04 +01:00
2004-01-05 00:51:54 +01:00
const char* ptr = &m_recv_buffer[1];
int index = detail::read_int(ptr);
// if we got an invalid message, abort
if (index >= m_have_piece.size() || index < 0)
throw protocol_error("have message with higher index than the number of pieces");
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== HAVE [ piece: " << index << "]\n";
2004-01-05 00:51:54 +01:00
#endif
if (m_have_piece[index])
{
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " oops.. we already knew that: " << index << "\n";
2004-01-05 00:51:54 +01:00
#endif
}
else
{
m_have_piece[index] = true;
m_torrent->peer_has(index);
if (!m_torrent->have_piece(index) && !is_interesting())
m_torrent->get_policy().peer_is_interesting(*this);
2004-01-12 21:31:27 +01:00
if (m_torrent->is_seed() && is_seed())
{
throw protocol_error("seed to seed connection redundant, disconnecting");
}
2004-01-05 00:51:54 +01:00
}
}
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
// -----------------------------
// --------- BITFIELD ----------
// -----------------------------
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
void peer_connection::on_bitfield(int received)
{
if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8)
throw protocol_error("bitfield with invalid size");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " <== BITFIELD\n";
2004-01-05 00:51:54 +01:00
#endif
// build a vector of all pieces
std::vector<int> piece_list;
for (std::size_t i = 0; i < m_have_piece.size(); ++i)
{
bool have = m_recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7)));
if (have && !m_have_piece[i])
{
m_have_piece[i] = true;
piece_list.push_back(i);
}
else if (!have && m_have_piece[i])
{
2004-01-05 00:51:54 +01:00
m_have_piece[i] = false;
m_torrent->peer_lost(i);
}
}
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
// shuffle the piece list
std::random_shuffle(piece_list.begin(), piece_list.end());
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
// let the torrent know which pieces the
// peer has, in a shuffled order
bool interesting = false;
for (std::vector<int>::iterator i = piece_list.begin();
i != piece_list.end();
++i)
{
int index = *i;
m_torrent->peer_has(index);
if (!m_torrent->have_piece(index))
interesting = true;
}
2004-01-04 05:29:13 +01:00
2004-01-08 14:03:38 +01:00
if (piece_list.size() == m_have_piece.size())
2004-01-05 00:51:54 +01:00
{
#ifndef NDEBUG
2004-01-08 14:03:38 +01:00
(*m_logger) << " *** THIS IS A SEED ***\n";
2004-01-05 00:51:54 +01:00
#endif
2004-01-08 14:03:38 +01:00
// if we're a seed too, disconnect
if (m_torrent->is_seed())
{
#ifndef NDEBUG
(*m_logger) << " we're also a seed, disconnecting\n";
#endif
2004-01-12 21:31:27 +01:00
throw protocol_error("seed to seed connection redundant, disconnecting");
2004-01-08 14:03:38 +01:00
}
2004-01-05 00:51:54 +01:00
}
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
if (interesting) m_torrent->get_policy().peer_is_interesting(*this);
}
2003-12-17 20:03:23 +01:00
2004-01-05 00:51:54 +01:00
// -----------------------------
// ---------- REQUEST ----------
// -----------------------------
2003-12-17 20:03:23 +01:00
2004-01-05 00:51:54 +01:00
void peer_connection::on_request(int received)
{
if (m_packet_size != 13)
throw protocol_error("'request' message size != 13");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2003-12-17 20:03:23 +01:00
2004-01-05 00:51:54 +01:00
peer_request r;
const char* ptr = &m_recv_buffer[1];
r.piece = detail::read_int(ptr);
r.start = detail::read_int(ptr);
r.length = detail::read_int(ptr);
// make sure this request
// is legal and taht the peer
// is not choked
if (r.piece >= 0
&& r.piece < m_torrent->torrent_file().num_pieces()
2004-01-12 21:31:27 +01:00
&& m_torrent->have_piece(r.piece)
2004-01-05 00:51:54 +01:00
&& r.start >= 0
&& r.start < m_torrent->torrent_file().piece_size(r.piece)
&& r.length > 0
2004-01-05 02:30:34 +01:00
&& r.length + r.start <= m_torrent->torrent_file().piece_size(r.piece)
2004-01-05 00:51:54 +01:00
&& m_peer_interested)
{
// if we have choked the client
// ignore the request
2004-01-05 02:30:34 +01:00
if (m_choked)
return;
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
m_requests.push_back(r);
send_buffer_updated();
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
2004-01-05 00:51:54 +01:00
#endif
}
else
{
2004-01-05 02:30:34 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== INVALID_REQUEST [ "
2004-01-05 02:30:34 +01:00
"piece: " << r.piece << " | "
"s: " << r.start << " | "
"l: " << r.length << " | "
"i: " << m_peer_interested << " | "
"t: " << (int)m_torrent->torrent_file().piece_size(r.piece) << " | "
"n: " << m_torrent->torrent_file().num_pieces() << " ]\n";
#endif
2004-01-12 21:31:27 +01:00
++m_num_invalid_requests;
2004-01-08 14:03:38 +01:00
if (m_torrent->alerts().should_post(alert::debug))
{
m_torrent->alerts().post_alert(invalid_request_alert(
r
, m_torrent->get_handle()
, m_peer_id
, "peer sent an illegal request, ignoring"));
}
2004-01-05 00:51:54 +01:00
}
}
2004-01-05 00:51:54 +01:00
// -----------------------------
// ----------- PIECE -----------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_piece(int received)
{
2004-01-12 21:31:27 +01:00
if (m_recv_pos - received <= 9)
{
m_last_piece = boost::posix_time::second_clock::local_time();
}
2004-01-08 14:03:38 +01:00
// classify the received data as protocol chatter
// or data payload for the statistics
2004-01-05 00:51:54 +01:00
if (m_recv_pos <= 9)
// only received protocol data
m_statistics.received_bytes(0, received);
else if (m_recv_pos - received >= 9)
// only received payload data
m_statistics.received_bytes(received, 0);
else
{
// received a bit of both
assert(m_recv_pos - received < 9);
assert(m_recv_pos > 9);
assert(9 - (m_recv_pos - received) <= 9);
m_statistics.received_bytes(
m_recv_pos - 9
, 9 - (m_recv_pos - received));
}
2004-01-05 00:51:54 +01:00
if (m_recv_pos < m_packet_size) return;
2004-01-12 21:31:27 +01:00
m_last_piece_time = m_last_piece
- boost::posix_time::second_clock::local_time();
2004-01-05 00:51:54 +01:00
const char* ptr = &m_recv_buffer[1];
2004-01-08 14:03:38 +01:00
peer_request p;
p.piece = detail::read_int(ptr);
p.start = detail::read_int(ptr);
p.length = m_packet_size - 9;
2003-12-07 06:53:04 +01:00
2004-01-08 14:03:38 +01:00
if (!verify_piece(p))
2004-01-05 00:51:54 +01:00
{
#ifndef NDEBUG
2004-01-08 14:03:38 +01:00
(*m_logger) << " <== INVALID_PIECE [ piece: " << p.piece << " | "
"start: " << p.start << " | "
"length: " << p.length << " ]\n";
2004-01-05 00:51:54 +01:00
#endif
2004-01-08 14:03:38 +01:00
throw protocol_error("invalid piece packet");
2004-01-05 00:51:54 +01:00
}
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
for (std::deque<piece_block>::iterator i = m_download_queue.begin();
i != m_download_queue.end();
++i)
{
if (i->piece_index == p.piece
&& i->block_index == p.start / m_torrent->block_size())
break;
2004-01-13 04:08:59 +01:00
(*m_logger) << " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
"b: " << i->block_index << " ] ***\n";
2004-01-12 21:31:27 +01:00
}
(*m_logger) << " <== PIECE [ piece: " << p.piece << " | "
"b: " << p.start / m_torrent->block_size() << " | "
2004-01-08 14:03:38 +01:00
"s: " << p.start << " | "
"l: " << p.length << " ]\n";
2004-01-05 00:51:54 +01:00
#endif
2004-01-05 00:51:54 +01:00
piece_picker& picker = m_torrent->picker();
2004-01-08 14:03:38 +01:00
piece_block block_finished(p.piece, p.start / m_torrent->block_size());
2004-01-04 05:29:13 +01:00
2004-01-13 04:08:59 +01:00
// if the block we got is already finished, then ignore it
if (picker.is_finished(block_finished)) return;
2004-01-05 00:51:54 +01:00
std::deque<piece_block>::iterator b
= std::find(
m_download_queue.begin()
, m_download_queue.end()
, block_finished);
2003-12-08 22:59:48 +01:00
2004-01-05 00:51:54 +01:00
if (b != m_download_queue.end())
{
// pop the request that just finished
// from the download queue
m_download_queue.erase(b);
}
else
{
2004-01-13 04:08:59 +01:00
// cancel the block from the
2004-01-05 00:51:54 +01:00
// peer that has taken over it.
2004-01-13 04:08:59 +01:00
boost::optional<address> peer = m_torrent->picker().get_downloader(block_finished);
if (peer)
{
peer_connection* pc = m_torrent->connection_for(*peer);
if (pc && pc != this)
{
pc->send_cancel(block_finished);
}
}
else
{
if (m_torrent->alerts().should_post(alert::debug))
{
m_torrent->alerts().post_alert(
peer_error_alert(
m_peer_id
, "got a block that was not requested"));
}
#ifndef NDEBUG
(*m_logger) << " *** The block we just got was not requested ***\n";
#endif
}
2004-01-05 00:51:54 +01:00
}
2003-12-08 22:59:48 +01:00
2004-01-08 14:03:38 +01:00
m_torrent->filesystem().write(&m_recv_buffer[9], p.piece, p.start, p.length);
2004-01-19 20:36:55 +01:00
bool was_seed = m_torrent->is_seed();
2004-01-18 11:22:18 +01:00
2004-01-12 21:31:27 +01:00
picker.mark_as_finished(block_finished, m_socket->sender());
2004-01-05 00:51:54 +01:00
m_torrent->get_policy().block_finished(*this, block_finished);
2004-01-05 00:51:54 +01:00
// did we just finish the piece?
2004-01-08 14:03:38 +01:00
if (picker.is_piece_finished(p.piece))
2004-01-05 00:51:54 +01:00
{
2004-01-08 14:03:38 +01:00
bool verified = m_torrent->verify_piece(p.piece);
2004-01-05 00:51:54 +01:00
if (verified)
{
2004-01-08 14:03:38 +01:00
m_torrent->announce_piece(p.piece);
2004-01-05 00:51:54 +01:00
}
else
{
2004-01-08 14:03:38 +01:00
m_torrent->piece_failed(p.piece);
2004-01-05 00:51:54 +01:00
}
2004-01-08 14:03:38 +01:00
m_torrent->get_policy().piece_finished(p.piece, verified);
2004-01-18 11:22:18 +01:00
2004-01-19 20:36:55 +01:00
if (!was_seed && m_torrent->is_seed())
2004-01-18 11:22:18 +01:00
{
assert(verified);
2004-01-19 20:36:55 +01:00
if (m_torrent->alerts().should_post(alert::info))
2004-01-18 20:12:18 +01:00
{
m_torrent->alerts().post_alert(torrent_finished_alert(
m_torrent->get_handle()
, "torrent is finished downloading"));
}
2004-01-20 12:01:50 +01:00
m_torrent->disconnect_seeds();
2004-01-18 11:22:18 +01:00
}
2004-01-05 00:51:54 +01:00
}
}
2004-01-05 00:51:54 +01:00
// -----------------------------
// ---------- CANCEL -----------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_cancel(int received)
{
if (m_packet_size != 13)
throw protocol_error("'cancel' message size != 13");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2003-11-05 00:27:06 +01:00
2004-01-05 00:51:54 +01:00
peer_request r;
const char* ptr = &m_recv_buffer[1];
r.piece = detail::read_int(ptr);
r.start = detail::read_int(ptr);
r.length = detail::read_int(ptr);
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
std::deque<peer_request>::iterator i
= std::find(m_requests.begin(), m_requests.end(), r);
if (i != m_requests.end())
{
m_requests.erase(i);
}
2003-11-05 00:27:06 +01:00
2004-01-05 00:51:54 +01:00
if (!has_data() && m_added_to_selector)
{
m_added_to_selector = false;
m_selector.remove_writable(m_socket);
}
2003-11-05 00:27:06 +01:00
2004-01-05 00:51:54 +01:00
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
2004-01-05 00:51:54 +01:00
#endif
}
2003-11-05 00:27:06 +01:00
2004-01-05 00:51:54 +01:00
// -----------------------------
// ------ EXTENSION LIST -------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_extension_list(int received)
{
if (m_packet_size > 100 * 1024)
{
// too big extension message, abort
throw protocol_error("'extensions' message size > 100kB");
}
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return;
2003-12-01 22:27:27 +01:00
2004-01-05 00:51:54 +01:00
try
{
entry e = bdecode(m_recv_buffer.begin()+1, m_recv_buffer.end());
entry::dictionary_type& extensions = e.dict();
for (int i = 0; i < num_supported_extensions; ++i)
{
entry::dictionary_type::iterator f =
extensions.find(extension_names[i]);
if (f != extensions.end())
{
2004-01-05 00:51:54 +01:00
m_extension_messages[i] = f->second.integer();
}
2004-01-05 00:51:54 +01:00
}
2004-01-12 04:05:10 +01:00
#ifndef NDEBUG
(*m_logger) << "supported extensions:\n";
for (entry::dictionary_type::const_iterator i = extensions.begin();
i != extensions.end();
++i)
{
(*m_logger) << i->first << "\n";
}
#endif
2004-01-05 00:51:54 +01:00
}
catch(invalid_encoding& e)
{
throw protocol_error("'extensions' packet contains invalid bencoding");
}
catch(type_error& e)
{
throw protocol_error("'extensions' packet contains incorrect types");
}
}
2004-01-05 00:51:54 +01:00
// -----------------------------
// --------- EXTENDED ----------
// -----------------------------
2004-01-05 00:51:54 +01:00
void peer_connection::on_extended(int received)
{
2004-01-07 01:48:02 +01:00
m_statistics.received_bytes(0, received);
if (m_packet_size < 5)
throw protocol_error("'extended' message smaller than 5 bytes");
if (m_torrent == 0)
throw protocol_error("'extended' message sent before proper handshake");
if (m_recv_pos < 5) return;
const char* ptr = &m_recv_buffer[1];
int extended_id = detail::read_int(ptr);
switch (extended_id)
{
case extended_chat_message:
{
if (m_packet_size > 2 * 1024)
throw protocol_error("CHAT message larger than 2 kB");
2004-01-07 13:07:16 +01:00
if (m_recv_pos < m_packet_size) return;
2004-01-07 01:48:02 +01:00
try
{
entry d = bdecode(m_recv_buffer.begin()+5, m_recv_buffer.end());
entry::dictionary_type::const_iterator i = d.dict().find("msg");
if (i == d.dict().end())
throw protocol_error("CHAT message did not contain any 'msg'");
const std::string& str = i->second.string();
if (m_torrent->alerts().should_post(alert::critical))
{
m_torrent->alerts()
.post_alert(chat_message_alert(m_torrent->get_handle(), m_peer_id, str));
}
}
catch (invalid_encoding& e)
{
throw protocol_error("invalid bencoding in CHAT message");
}
catch (type_error& e)
{
throw protocol_error("invalid types in bencoded CHAT message");
}
return;
}
default:
throw protocol_error("unknown extended message id");
};
2004-01-05 00:51:54 +01:00
}
2003-12-07 06:53:04 +01:00
2004-01-04 05:29:13 +01:00
2004-01-04 05:29:13 +01:00
2004-01-05 00:51:54 +01:00
2004-01-20 12:01:50 +01:00
void peer_connection::disconnect()
{
2004-01-20 23:59:21 +01:00
assert(m_disconnecting == false);
2004-01-20 12:01:50 +01:00
detail::session_impl::connection_map::iterator i = m_ses.m_connections.find(m_socket);
2004-01-20 23:59:21 +01:00
m_disconnecting = true;
2004-01-20 12:01:50 +01:00
assert(i != m_ses.m_connections.end());
assert(std::find(m_ses.m_disconnect_peer.begin(), m_ses.m_disconnect_peer.end(), i) == m_ses.m_disconnect_peer.end());
m_ses.m_disconnect_peer.push_back(i);
}
2004-01-05 00:51:54 +01:00
bool peer_connection::dispatch_message(int received)
{
assert(m_recv_pos >= received);
assert(m_recv_pos > 0);
2004-01-07 01:48:02 +01:00
assert(m_torrent);
2004-01-05 00:51:54 +01:00
int packet_type = m_recv_buffer[0];
if (packet_type < 0
|| packet_type >= num_supported_messages
|| m_message_handler[packet_type] == 0)
{
2004-01-04 05:35:25 +01:00
throw protocol_error("unknown message id");
}
2004-01-05 00:51:54 +01:00
assert(m_message_handler[packet_type] != 0);
// call the correct handler for this packet type
(this->*m_message_handler[packet_type])(received);
if (m_recv_pos < m_packet_size) return false;
2004-01-04 05:29:13 +01:00
assert(m_recv_pos == m_packet_size);
return true;
}
2004-01-12 21:31:27 +01:00
// Cancels an outstanding block request: the download is aborted in the
// piece picker, the block is removed from our download queue and a
// 'cancel' message (4 byte length prefix, message id, piece index,
// offset, length) is appended to the send buffer.
// The block must currently be marked as downloading and be present in
// our download queue (asserted).
void peer_connection::send_cancel(piece_block block)
{
	assert(block.piece_index >= 0);
	assert(block.piece_index < m_torrent->torrent_file().num_pieces());
	assert(m_torrent->picker().is_downloading(block));

	m_torrent->picker().abort_download(block);

	std::deque<piece_block>::iterator queued
		= std::find(m_download_queue.begin(), m_download_queue.end(), block);
	assert(queued != m_download_queue.end());
	m_download_queue.erase(queued);

	// the block is a full block, unless it is cut short by the end of the piece
	const int block_offset = block.block_index * m_torrent->block_size();
	const int block_size = std::min(
		(int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset
		, m_torrent->block_size());
	assert(block_size > 0);
	assert(block_size <= m_torrent->block_size());

	// length prefix (13) and message id
	char header[] = {0,0,0,13, msg_cancel};

	// make room for the header (5 bytes) and three integers (12 bytes)
	const std::size_t msg_begin = m_send_buffer.size();
	m_send_buffer.resize(msg_begin + 17);

	std::copy(header, header + 5, m_send_buffer.begin() + msg_begin);

	char* ptr = &m_send_buffer[msg_begin + 5];
	detail::write_int(block.piece_index, ptr); // index
	detail::write_int(block_offset, ptr); // begin
	detail::write_int(block_size, ptr); // length

#ifndef NDEBUG
	(*m_logger) << " ==> CANCEL [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
#endif

	send_buffer_updated();
}
2003-11-05 00:27:06 +01:00
2004-01-12 21:31:27 +01:00
// Requests a block from the peer: the block is marked as downloading
// (from this peer's address) in the piece picker, appended to our
// download queue, and a 'request' message (4 byte length prefix, message
// id, piece index, offset, length) is appended to the send buffer.
// The block must not already be marked as downloading (asserted).
void peer_connection::send_request(piece_block block)
{
	assert(block.piece_index >= 0);
	assert(block.piece_index < m_torrent->torrent_file().num_pieces());
	assert(!m_torrent->picker().is_downloading(block));

	m_torrent->picker().mark_as_downloading(block, m_socket->sender());
	m_download_queue.push_back(block);

	// the block is a full block, unless it is cut short by the end of the piece
	const int block_offset = block.block_index * m_torrent->block_size();
	const int block_size = std::min(
		(int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset
		, m_torrent->block_size());
	assert(block_size > 0);
	assert(block_size <= m_torrent->block_size());

	// length prefix (13) and message id
	char header[] = {0,0,0,13, msg_request};

	// make room for the header (5 bytes) and three integers (12 bytes)
	const std::size_t msg_begin = m_send_buffer.size();
	m_send_buffer.resize(msg_begin + 17);

	std::copy(header, header + 5, m_send_buffer.begin() + msg_begin);

	char* ptr = &m_send_buffer[msg_begin + 5];
	detail::write_int(block.piece_index, ptr); // index
	detail::write_int(block_offset, ptr); // begin
	detail::write_int(block_size, ptr); // length

#ifndef NDEBUG
	(*m_logger) << " ==> REQUEST [ "
		"piece: " << block.piece_index << " | "
		"b: " << block.block_index << " | "
		"s: " << block_offset << " | "
		"l: " << block_size << " ]\n";

	// what we ask for has to pass the same check
	// that is applied to the blocks we receive
	peer_request sanity_check;
	sanity_check.piece = block.piece_index;
	sanity_check.start = block_offset;
	sanity_check.length = block_size;
	assert(verify_piece(sanity_check));
#endif

	send_buffer_updated();
}
2004-01-07 01:48:02 +01:00
void peer_connection::send_chat_message(const std::string& msg)
{
assert(msg.length() <= 1 * 1024);
2004-01-07 13:07:16 +01:00
if (m_extension_messages[extended_chat_message] == -1) return;
2004-01-07 01:48:02 +01:00
entry e(entry::dictionary_t);
e.dict()["msg"] = msg;
std::vector<char> message;
bencode(std::back_inserter(message), e);
std::back_insert_iterator<std::vector<char> > ptr(m_send_buffer);
detail::write_uint(1 + 4 + message.size(), ptr);
detail::write_uchar(msg_extended, ptr);
detail::write_int(m_extension_messages[extended_chat_message], ptr);
std::copy(message.begin(), message.end(), ptr);
send_buffer_updated();
}
2004-01-04 05:29:13 +01:00
void peer_connection::send_bitfield()
{
2004-01-04 05:29:13 +01:00
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> BITFIELD\n";
2004-01-04 05:29:13 +01:00
#endif
const int packet_size = (m_have_piece.size() + 7) / 8 + 5;
const int old_size = m_send_buffer.size();
m_send_buffer.resize(old_size + packet_size);
char* ptr = &m_send_buffer[old_size];
detail::write_int(packet_size - 4, ptr);
m_send_buffer[old_size+4] = msg_bitfield;
std::fill(m_send_buffer.begin()+old_size+5, m_send_buffer.end(), 0);
for (std::size_t i = 0; i < m_have_piece.size(); ++i)
{
if (m_torrent->have_piece(i))
m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7));
}
send_buffer_updated();
}
2004-01-04 05:29:13 +01:00
void peer_connection::send_extensions()
{
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> EXTENSIONS\n";
2004-01-04 05:29:13 +01:00
#endif
assert(m_supports_extensions);
2004-01-04 05:29:13 +01:00
entry extension_list(entry::dictionary_t);
2004-01-04 05:29:13 +01:00
for (int i = 0; i < num_supported_extensions; ++i)
{
2004-01-07 01:48:02 +01:00
extension_list.dict()[extension_names[i]] = i;
2004-01-04 05:29:13 +01:00
}
2004-01-04 05:29:13 +01:00
// make room for message size
const int msg_size_pos = m_send_buffer.size();
m_send_buffer.resize(msg_size_pos + 4);
2004-01-05 00:51:54 +01:00
m_send_buffer.push_back(msg_extension_list);
2004-01-04 06:28:24 +01:00
2004-01-04 05:29:13 +01:00
bencode(std::back_inserter(m_send_buffer), extension_list);
// write the length of the message
char* ptr = &m_send_buffer[msg_size_pos];
2004-01-04 06:28:24 +01:00
detail::write_int(m_send_buffer.size() - msg_size_pos - 4, ptr);
2004-01-04 05:29:13 +01:00
send_buffer_updated();
}
2004-01-12 21:31:27 +01:00
void peer_connection::send_choke()
2004-01-04 05:29:13 +01:00
{
if (m_choked) return;
char msg[] = {0,0,0,1,msg_choke};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_choked = true;
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> CHOKE\n";
2004-01-04 05:29:13 +01:00
#endif
2004-01-12 21:31:27 +01:00
m_num_invalid_requests = 0;
2004-01-04 05:29:13 +01:00
m_requests.clear();
send_buffer_updated();
}
2004-01-12 21:31:27 +01:00
void peer_connection::send_unchoke()
2004-01-04 05:29:13 +01:00
{
if (!m_choked) return;
char msg[] = {0,0,0,1,msg_unchoke};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_choked = false;
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> UNCHOKE\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
2004-01-12 21:31:27 +01:00
void peer_connection::send_interested()
2004-01-04 05:29:13 +01:00
{
if (m_interesting) return;
char msg[] = {0,0,0,1,msg_interested};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_interesting = true;
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> INTERESTED\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
2004-01-12 21:31:27 +01:00
void peer_connection::send_not_interested()
2004-01-04 05:29:13 +01:00
{
if (!m_interesting) return;
char msg[] = {0,0,0,1,msg_not_interested};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_interesting = false;
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> NOT_INTERESTED\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
void peer_connection::send_have(int index)
{
2004-01-08 14:03:38 +01:00
// optimization, don't send have messages
// to peers that already have the piece
if (m_have_piece[index]) return;
2004-01-04 05:29:13 +01:00
const int packet_size = 9;
char msg[packet_size] = {0,0,0,5,msg_have};
char* ptr = msg+5;
detail::write_int(index, ptr);
m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size);
#ifndef NDEBUG
2004-01-12 21:31:27 +01:00
(*m_logger) << " ==> HAVE [ piece: " << index << " ]\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
2004-01-12 04:05:10 +01:00
int peer_connection::share_diff() const
{
float ratio = m_torrent->ratio();
// if we have an infinite ratio, just say we have downloaded
// much more than we have uploaded. And we'll keep uploading.
2004-01-12 21:31:27 +01:00
if (ratio == 0.f) return std::numeric_limits<int>::max();
2004-01-12 04:05:10 +01:00
return m_free_upload
2004-01-12 21:31:27 +01:00
+ static_cast<int>(m_statistics.total_payload_download() * ratio)
2004-01-12 04:05:10 +01:00
- m_statistics.total_payload_upload();
}
2004-01-04 05:29:13 +01:00
void peer_connection::second_tick()
{
m_statistics.second_tick();
m_send_quota_left = m_send_quota;
if (m_send_quota > 0) send_buffer_updated();
2003-12-07 06:53:04 +01:00
2004-01-04 05:29:13 +01:00
// If the client sends more data
// we send it data faster, otherwise, slower.
// It will also depend on how much data the
// client has sent us. This is the mean to
2004-01-14 13:53:17 +01:00
// maintain the share ratio given by m_ratio
// with all peers.
2003-12-07 06:53:04 +01:00
2004-01-04 05:29:13 +01:00
int diff = share_diff();
2003-12-07 06:53:04 +01:00
2004-01-14 13:19:51 +01:00
enum { block_limit=2 }; // how many blocks difference is considered unfair
2004-01-14 12:46:26 +01:00
2004-01-14 13:19:51 +01:00
if (diff > block_limit*m_torrent->block_size() || m_torrent->is_seed())
2003-12-09 19:09:34 +01:00
{
2004-01-04 05:29:13 +01:00
// if we have downloaded more than one piece more
2004-01-05 00:51:54 +01:00
// than we have uploaded OR if we are a seed
// have an unlimited upload rate
2004-01-04 05:29:13 +01:00
m_send_quota_limit = -1;
2003-12-09 19:09:34 +01:00
}
else
{
2004-01-12 04:05:10 +01:00
float ratio = m_torrent->ratio();
2004-01-04 05:29:13 +01:00
// if we have downloaded too much, response with an
// upload rate of 10 kB/s more than we dowlload
// if we have uploaded too much, send with a rate of
// 10 kB/s less than we receive
int bias = 0;
2004-01-14 13:19:51 +01:00
if (diff > -block_limit*m_torrent->block_size())
2004-01-04 05:29:13 +01:00
{
2004-01-12 21:31:27 +01:00
bias = static_cast<int>(m_statistics.download_rate() * ratio) / 2;
2004-01-04 05:29:13 +01:00
if (bias < 10*1024) bias = 10*1024;
}
else
{
2004-01-12 21:31:27 +01:00
bias = -static_cast<int>(m_statistics.download_rate() * ratio) / 2;
2004-01-04 05:29:13 +01:00
}
2004-01-12 21:31:27 +01:00
m_send_quota_limit = static_cast<int>(m_statistics.download_rate()) + bias;
2004-01-12 04:05:10 +01:00
2004-01-04 05:29:13 +01:00
// the maximum send_quota given our download rate from this peer
if (m_send_quota_limit < 256) m_send_quota_limit = 256;
2004-01-12 04:05:10 +01:00
// if the peer has been choked, send tha current piece
// as fast as possible
if (is_choked()) m_send_quota_limit = -1;
2003-12-09 19:09:34 +01:00
}
2003-12-08 02:37:30 +01:00
}
2004-01-04 05:29:13 +01:00
// --------------------------
// RECEIVE DATA
// --------------------------
2004-01-04 05:29:13 +01:00
// throws exception when the client should be disconnected
void peer_connection::receive_data()
{
2004-01-04 05:29:13 +01:00
assert(!m_socket->is_blocking());
assert(m_packet_size > 0);
2004-01-04 06:53:01 +01:00
assert(m_socket->is_readable());
2004-01-04 05:29:13 +01:00
for(;;)
2003-10-30 00:28:09 +01:00
{
2004-01-04 05:29:13 +01:00
assert(m_packet_size > 0);
int received = m_socket->receive(&m_recv_buffer[m_recv_pos], m_packet_size - m_recv_pos);
2004-01-04 05:29:13 +01:00
// connection closed
if (received == 0)
{
2004-01-05 02:30:34 +01:00
throw protocol_error("connection closed by remote host");
2004-01-04 05:29:13 +01:00
}
2003-10-30 00:28:09 +01:00
2004-01-04 05:29:13 +01:00
// an error
if (received < 0)
{
// would_block means that no data was ready to be received
// returns to exit the loop
if (m_socket->last_error() == socket::would_block)
return;
2004-01-04 05:29:13 +01:00
// the connection was closed
throw network_error(m_socket->last_error());
}
2004-01-04 05:29:13 +01:00
if (received > 0)
2003-10-30 00:28:09 +01:00
{
2004-01-04 05:29:13 +01:00
m_last_receive = boost::posix_time::second_clock::local_time();
2004-01-04 05:29:13 +01:00
m_recv_pos += received;
2003-11-09 19:17:09 +01:00
2004-01-04 05:29:13 +01:00
switch(m_state)
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
case read_protocol_length:
2003-10-30 00:28:09 +01:00
2003-12-07 06:53:04 +01:00
m_statistics.received_bytes(0, received);
2003-12-08 22:59:48 +01:00
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
2004-01-04 05:29:13 +01:00
m_packet_size = reinterpret_cast<unsigned char&>(m_recv_buffer[0]);
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " protocol length: " << m_packet_size << "\n";
2004-01-04 05:29:13 +01:00
#endif
m_state = read_protocol_string;
m_recv_buffer.resize(m_packet_size);
m_recv_pos = 0;
2004-01-05 02:30:34 +01:00
if (m_packet_size != 19)
2003-10-30 00:28:09 +01:00
{
2004-01-04 05:29:13 +01:00
#ifndef NDEBUG
(*m_logger) << "incorrect protocol length\n";
#endif
2004-01-05 02:30:34 +01:00
std::stringstream s;
s << "received incorrect protocol length ("
<< m_packet_size
<< ") should be 19.";
throw protocol_error(s.str());
}
2004-01-04 05:29:13 +01:00
break;
2003-10-30 00:28:09 +01:00
2004-01-04 05:29:13 +01:00
case read_protocol_string:
{
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n";
2004-01-04 05:29:13 +01:00
#endif
const char protocol_string[] = "BitTorrent protocol";
const int protocol_len = sizeof(protocol_string) - 1;
if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string))
{
#ifndef NDEBUG
(*m_logger) << "incorrect protocol name\n";
#endif
2004-01-05 02:30:34 +01:00
std::stringstream s;
s << "got invalid protocol name: '"
<< std::string(m_recv_buffer.begin(), m_recv_buffer.end())
<< "'";
throw protocol_error(s.str());
2004-01-04 05:29:13 +01:00
}
m_state = read_info_hash;
m_packet_size = 28;
m_recv_pos = 0;
m_recv_buffer.resize(28);
}
break;
2003-10-30 00:28:09 +01:00
2003-12-08 22:59:48 +01:00
2004-01-04 05:29:13 +01:00
case read_info_hash:
{
2004-01-04 05:29:13 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
// ok, now we have got enough of the handshake. Is this connection
// attached to a torrent?
2003-12-08 22:59:48 +01:00
// TODO: if the protocol is to be extended
// these 8 bytes would be used to describe the
// extensions available on the other side
2004-01-14 02:19:30 +01:00
// currently disabled
// if (m_recv_buffer[0] & 0x80)
// {
// m_supports_extensions = true;
// }
2003-12-08 22:59:48 +01:00
if (m_torrent == 0)
{
2004-01-04 05:29:13 +01:00
// now, we have to see if there's a torrent with the
// info_hash we got from the peer
sha1_hash info_hash;
std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin());
m_torrent = m_ses.find_torrent(info_hash);
if (m_torrent == 0)
{
// we couldn't find the torrent!
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " couldn't find a torrent with the given info_hash\n";
2004-01-04 05:29:13 +01:00
#endif
2004-01-05 02:30:34 +01:00
throw protocol_error("got info-hash that is not in our session");
2004-01-04 05:29:13 +01:00
}
2004-01-04 05:29:13 +01:00
// assume the other end has no pieces
m_have_piece.resize(m_torrent->torrent_file().num_pieces());
std::fill(m_have_piece.begin(), m_have_piece.end(), false);
// yes, we found the torrent
// reply with our handshake
std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin());
send_handshake();
send_bitfield();
}
else
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
// verify info hash
if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin()))
{
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " received invalid info_hash\n";
2004-01-04 05:29:13 +01:00
#endif
2004-01-05 02:30:34 +01:00
throw protocol_error("invalid info-hash in handshake");
2004-01-04 05:29:13 +01:00
}
}
2004-01-04 05:29:13 +01:00
if (m_supports_extensions) send_extensions();
2004-01-04 05:29:13 +01:00
m_state = read_peer_id;
m_packet_size = 20;
m_recv_pos = 0;
m_recv_buffer.resize(20);
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " info_hash received\n";
2004-01-04 05:29:13 +01:00
#endif
break;
}
2003-12-07 06:53:04 +01:00
2003-12-08 22:59:48 +01:00
2004-01-04 05:29:13 +01:00
case read_peer_id:
2003-12-22 08:14:35 +01:00
{
2004-01-04 05:29:13 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
2003-12-22 08:14:35 +01:00
2004-01-04 05:29:13 +01:00
#ifndef NDEBUG
{
2004-01-04 05:29:13 +01:00
peer_id tmp;
std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)tmp.begin());
std::stringstream s;
s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n";
(*m_logger) << s.str();
}
2004-01-04 05:29:13 +01:00
#endif
2003-12-08 22:59:48 +01:00
2004-01-08 18:03:04 +01:00
if (!m_active)
2004-01-04 05:29:13 +01:00
{
// check to make sure we don't have another connection with the same
// info_hash and peer_id. If we do. close this connection.
std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin());
2004-01-04 05:29:13 +01:00
m_attached_to_torrent = true;
m_torrent->attach_peer(this);
assert(m_torrent->get_policy().has_connection(this));
}
2003-12-08 22:59:48 +01:00
m_state = read_packet_size;
m_packet_size = 4;
2004-01-04 05:29:13 +01:00
m_recv_pos = 0;
m_recv_buffer.resize(4);
break;
2003-12-08 22:59:48 +01:00
}
2004-01-04 05:29:13 +01:00
case read_packet_size:
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
// convert from big endian to native byte order
const char* ptr = &m_recv_buffer[0];
m_packet_size = detail::read_int(ptr);
// don't accept packets larger than 1 MB
if (m_packet_size > 1024*1024 || m_packet_size < 0)
{
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " packet too large (packet_size > 1 Megabyte), abort\n";
2004-01-04 05:29:13 +01:00
#endif
// packet too large
2004-01-05 02:30:34 +01:00
throw protocol_error("packet > 1 MB");
2004-01-04 05:29:13 +01:00
}
if (m_packet_size == 0)
{
// keepalive message
m_state = read_packet_size;
m_packet_size = 4;
}
else
{
m_state = read_packet;
m_recv_buffer.resize(m_packet_size);
}
2003-10-30 00:28:09 +01:00
m_recv_pos = 0;
assert(m_packet_size > 0);
2004-01-04 05:29:13 +01:00
break;
}
case read_packet:
if (dispatch_message(received))
{
m_state = read_packet_size;
m_packet_size = 4;
m_recv_buffer.resize(4);
m_recv_pos = 0;
assert(m_packet_size > 0);
}
break;
}
}
}
2004-01-04 05:29:13 +01:00
assert(m_packet_size > 0);
}
2004-01-12 21:31:27 +01:00
bool peer_connection::has_data() const
2004-01-04 05:29:13 +01:00
{
// if we have requests or pending data to be sent or announcements to be made
// we want to send data
return ((!m_requests.empty() && !m_choked)
|| !m_send_buffer.empty()
|| !m_announce_queue.empty())
&& m_send_quota_left != 0;
}
// --------------------------
// SEND DATA
// --------------------------
// throws exception when the client should be disconnected
void peer_connection::send_data()
{
2004-01-04 05:29:13 +01:00
assert(m_socket->is_writable());
assert(has_data());
// only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be!
// TODO: make this a bit better. Don't always read the entire
// requested block. Have a limit of how much of the requested
// block is actually read at a time.
while (!m_requests.empty()
&& (m_send_buffer.size() < m_torrent->block_size())
&& !m_choked)
{
2004-01-04 05:29:13 +01:00
peer_request& r = m_requests.front();
2004-01-12 21:31:27 +01:00
assert(r.piece >= 0 && r.piece < m_have_piece.size() && m_torrent && m_torrent->have_piece(r.piece));
assert(r.start + r.length <= m_torrent->torrent_file().piece_size(r.piece));
assert(r.length > 0 && r.start >= 0);
2004-01-12 21:31:27 +01:00
#ifndef NDEBUG
// assert(m_torrent->verify_piece(r.piece) && "internal error");
2004-01-12 21:31:27 +01:00
#endif
const int send_buffer_offset = m_send_buffer.size();
const int packet_size = 4 + 5 + 4 + r.length;
m_send_buffer.resize(send_buffer_offset + packet_size);
char* ptr = &m_send_buffer[send_buffer_offset];
detail::write_int(packet_size-4, ptr);
*ptr = msg_piece; ++ptr;
detail::write_int(r.piece, ptr);
detail::write_int(r.start, ptr);
m_torrent->filesystem().read(
&m_send_buffer[send_buffer_offset+13]
, r.piece
, r.start
, r.length);
#ifndef NDEBUG
(*m_logger) << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
2004-01-04 05:29:13 +01:00
2004-01-12 21:31:27 +01:00
m_payloads.push_back(range(send_buffer_offset+13, r.length));
m_requests.erase(m_requests.begin());
if (m_requests.empty()
&& m_num_invalid_requests > 0
&& is_peer_interested()
&& !is_seed())
2003-12-17 17:37:20 +01:00
{
2004-01-12 21:31:27 +01:00
// this will make the peer clear
// its download queue and re-request
// pieces. Hopefully it will not
// send invalid requests then
send_choke();
send_unchoke();
2003-12-17 17:37:20 +01:00
}
}
2004-01-04 05:29:13 +01:00
if (!m_announce_queue.empty())
{
2004-01-04 05:29:13 +01:00
for (std::vector<int>::iterator i = m_announce_queue.begin();
i != m_announce_queue.end();
++i)
{
send_have(*i);
}
m_announce_queue.clear();
}
2004-01-04 05:29:13 +01:00
assert(m_send_quota_left != 0);
2003-11-09 19:17:09 +01:00
2004-01-04 05:29:13 +01:00
// send the actual buffer
if (!m_send_buffer.empty())
{
2003-11-05 00:27:06 +01:00
2004-01-04 05:29:13 +01:00
int amount_to_send = m_send_buffer.size();
assert(m_send_quota_left != 0);
if (m_send_quota_left > 0)
amount_to_send = std::min(m_send_quota_left, amount_to_send);
2004-01-17 21:04:19 +01:00
2004-01-04 05:29:13 +01:00
// we have data that's scheduled for sending
int sent = m_socket->send(
&m_send_buffer[0]
, amount_to_send);
2004-01-04 05:29:13 +01:00
if (sent > 0)
2003-11-09 19:17:09 +01:00
{
2004-01-04 05:29:13 +01:00
if (m_send_quota_left != -1)
{
assert(m_send_quota_left >= sent);
m_send_quota_left -= sent;
}
2004-01-17 21:04:19 +01:00
// manage the payload markers
2004-01-04 05:29:13 +01:00
int amount_payload = 0;
if (!m_payloads.empty())
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
for (std::deque<range>::iterator i = m_payloads.begin();
i != m_payloads.end();
++i)
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
i->start -= sent;
if (i->start < 0)
2003-12-08 22:59:48 +01:00
{
2004-01-04 05:29:13 +01:00
if (i->start + i->length <= 0)
{
amount_payload += i->length;
}
else
{
amount_payload += -i->start;
i->length -= -i->start;
i->start = 0;
}
2003-12-08 22:59:48 +01:00
}
}
}
2004-01-04 05:29:13 +01:00
// remove all payload ranges that has been sent
m_payloads.erase(
std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero)
, m_payloads.end());
assert(amount_payload <= sent);
m_statistics.sent_bytes(amount_payload, sent - amount_payload);
// empty the entire buffer at once or if
// only a part of the buffer could be sent
// remove the part that was sent from the buffer
if (sent == m_send_buffer.size())
{
m_send_buffer.clear();
}
else
{
m_send_buffer.erase(
m_send_buffer.begin()
, m_send_buffer.begin() + sent);
}
2003-11-05 00:27:06 +01:00
}
else
2003-11-05 00:27:06 +01:00
{
2004-01-04 05:29:13 +01:00
assert(sent == -1);
throw network_error(m_socket->last_error());
2003-11-05 00:27:06 +01:00
}
2004-01-04 05:29:13 +01:00
m_last_sent = boost::posix_time::second_clock::local_time();
}
2004-01-04 05:29:13 +01:00
assert(m_added_to_selector);
send_buffer_updated();
2004-01-17 21:04:19 +01:00
/*
2004-01-04 05:29:13 +01:00
#ifndef NDEBUG
if (has_data())
2003-11-05 00:27:06 +01:00
{
2004-01-04 05:29:13 +01:00
if (m_socket->is_writable())
{
2004-01-15 20:32:03 +01:00
std::cout << "ERROR, not good\n";
2004-01-04 05:29:13 +01:00
}
2003-11-05 00:27:06 +01:00
}
2004-01-04 05:29:13 +01:00
#endif
2004-01-17 21:04:19 +01:00
*/
2003-11-05 00:27:06 +01:00
}
2004-01-04 05:29:13 +01:00
void peer_connection::keep_alive()
{
2004-01-04 05:29:13 +01:00
boost::posix_time::time_duration d;
d = boost::posix_time::second_clock::local_time() - m_last_sent;
if (d.seconds() > m_timeout / 2)
{
char noop[] = {0,0,0,0};
m_send_buffer.insert(m_send_buffer.end(), noop, noop+4);
m_last_sent = boost::posix_time::second_clock::local_time();
#ifndef NDEBUG
2004-01-05 02:30:34 +01:00
(*m_logger) << " ==> NOP\n";
2004-01-04 05:29:13 +01:00
#endif
send_buffer_updated();
}
}
2004-01-12 21:31:27 +01:00
// Returns true if the peer has every piece, i.e. no entry in
// m_have_piece is false. The search stops at the first missing
// piece instead of counting all of them. An empty m_have_piece
// counts as a seed.
bool peer_connection::is_seed() const
{
	return std::find(m_have_piece.begin(), m_have_piece.end(), false)
		== m_have_piece.end();
}
2004-01-04 06:53:01 +01:00
}