premiere-libtorrent/src/peer_connection.cpp

1317 lines
35 KiB
C++
Raw Normal View History

/*
Copyright (c) 2003, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <iostream>
#include <iomanip>
#include <vector>
#include "libtorrent/peer_connection.hpp"
#include "libtorrent/session.hpp"
2003-12-22 08:14:35 +01:00
#include "libtorrent/identify_client.hpp"
2003-11-09 19:17:09 +01:00
#if defined(_MSC_VER)
#define for if (false) {} else for
#endif
#define VERBOSE
using namespace libtorrent;
2003-11-05 00:27:06 +01:00
// Creates a connection that is already bound to the torrent 't'.
// The handshake and our bitfield are queued for sending right away.
//
// ses - the session this connection belongs to (used for our own
//       peer id and, in debug builds, for creating the log)
// sel - the selector that monitors the socket
// t   - the torrent this peer belongs to, must not be null
// s   - the socket, must be non-blocking
// p   - the peer id we expect the other end to present
//       (it is compared against the received one once the
//       handshake has been read)
libtorrent::peer_connection::peer_connection(
	detail::session_impl& ses
	, selector& sel
	, torrent* t
	, boost::shared_ptr<libtorrent::socket> s
	, const peer_id& p)
	: m_state(read_protocol_length)
	, m_timeout(120)
	, m_packet_size(1)
	, m_recv_pos(0)
	// start both activity timestamps at the current time. Building
	// a gregorian::date from std::time(0) would interpret the number
	// of seconds since the epoch as a day number.
	, m_last_receive(boost::posix_time::second_clock::local_time())
	, m_last_sent(boost::posix_time::second_clock::local_time())
	, m_selector(sel)
	, m_socket(s)
	, m_torrent(t)
	, m_attached_to_torrent(true)
	, m_ses(ses)
	, m_active(true)
	, m_added_to_selector(false)
	, m_peer_id(p)
	, m_peer_interested(false)
	, m_peer_choked(true)
	, m_interesting(false)
	, m_choked(true)
	, m_free_upload(0)
	, m_send_quota(100)
	, m_send_quota_left(100)
	, m_send_quota_limit(100)
	, m_trust_points(0)
{
	assert(!m_socket->is_blocking());
	assert(m_torrent != 0);

#ifndef NDEBUG
	m_logger = m_ses.create_log(s->sender().as_string().c_str());
#endif

	send_handshake();

	// start in the state where we are trying to read the
	// handshake from the other side (a single length byte)
	m_recv_buffer.resize(1);

	// assume the other end has no pieces
	m_have_piece.resize(m_torrent->torrent_file().num_pieces());
	std::fill(m_have_piece.begin(), m_have_piece.end(), false);

	send_bitfield();
}
2003-11-05 00:27:06 +01:00
// Creates a connection that is not yet bound to any torrent.
// Nothing is sent until the remote handshake has told us which
// torrent (info-hash) the other end wants to talk about.
//
// ses - the session this connection belongs to
// sel - the selector that monitors the socket
// s   - the socket, must be non-blocking
libtorrent::peer_connection::peer_connection(
	detail::session_impl& ses
	, selector& sel
	, boost::shared_ptr<libtorrent::socket> s)
	: m_state(read_protocol_length)
	, m_timeout(120)
	, m_packet_size(1)
	, m_recv_pos(0)
	// start both activity timestamps at the current time. Building
	// a gregorian::date from std::time(0) would interpret the number
	// of seconds since the epoch as a day number.
	, m_last_receive(boost::posix_time::second_clock::local_time())
	, m_last_sent(boost::posix_time::second_clock::local_time())
	, m_selector(sel)
	, m_socket(s)
	, m_torrent(0)
	, m_attached_to_torrent(false)
	, m_ses(ses)
	, m_active(false)
	, m_added_to_selector(false)
	, m_peer_id()
	, m_peer_interested(false)
	, m_peer_choked(true)
	, m_interesting(false)
	, m_choked(true)
	, m_free_upload(0)
	, m_send_quota(100)
	, m_send_quota_left(100)
	, m_send_quota_limit(100)
	, m_trust_points(0)
{
	assert(!m_socket->is_blocking());

#ifndef NDEBUG
	m_logger = m_ses.create_log(s->sender().as_string().c_str());
#endif

	// we are not attached to any torrent yet.
	// we have to wait for the handshake to see
	// which torrent the connector wants to connect to

	// start in the state where we are trying to read the
	// handshake from the other side (a single length byte)
	m_recv_buffer.resize(1);
}
// Unregisters the socket from the selector and, if this connection
// was attached to a torrent, removes it from that torrent's peers.
libtorrent::peer_connection::~peer_connection()
{
	m_selector.remove(m_socket);

	// connections that never got attached to a torrent
	// have nothing more to clean up
	if (!m_attached_to_torrent) return;

	assert(m_torrent != 0);
	m_torrent->remove_peer(this);
}
2003-11-09 19:17:09 +01:00
void libtorrent::peer_connection::set_send_quota(int num_bytes)
{
2003-12-14 06:56:12 +01:00
assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1);
if (num_bytes > m_send_quota_limit && m_send_quota_limit!=-1) num_bytes = m_send_quota_limit;
2003-12-07 06:53:04 +01:00
2003-11-09 19:17:09 +01:00
m_send_quota = num_bytes;
m_send_quota_left = num_bytes;
send_buffer_updated();
}
void libtorrent::peer_connection::send_handshake()
{
assert(m_send_buffer.size() == 0);
// add handshake to the send buffer
2003-11-09 19:17:09 +01:00
const char version_string[] = "BitTorrent protocol";
const int string_len = sizeof(version_string)-1;
int pos = 1;
m_send_buffer.resize(1 + string_len + 8 + 20 + 20);
// length of version string
m_send_buffer[0] = string_len;
// version string itself
std::copy(
version_string
, version_string+string_len
, m_send_buffer.begin()+pos);
pos += string_len;
// 8 zeroes
std::fill(
m_send_buffer.begin() + pos
, m_send_buffer.begin() + pos + 8
, 0);
pos += 8;
// info hash
std::copy(
m_torrent->torrent_file().info_hash().begin()
, m_torrent->torrent_file().info_hash().end()
, m_send_buffer.begin() + pos);
pos += 20;
// peer id
std::copy(
2003-12-07 06:53:04 +01:00
m_ses.get_peer_id().begin()
, m_ses.get_peer_id().end()
2003-11-09 19:17:09 +01:00
, m_send_buffer.begin() + pos);
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> HANDSHAKE\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
2003-12-09 19:09:34 +01:00
// Reports the progress of the block that is currently being
// received from this peer.
//
// Returns an empty optional unless we are in the middle of (or have
// just completed) receiving a 'piece' message whose 9 byte header
// (message id, piece index, offset) has arrived and is valid.
// Otherwise returns the piece index, the block index, the number of
// payload bytes received so far and the total payload size.
boost::optional<piece_block_progress> libtorrent::peer_connection::downloading_piece() const
{
	// are we currently receiving a 'piece' message?
	if (m_state != read_packet
		|| m_recv_pos < 9
		|| m_recv_buffer[0] != msg_piece)
		return boost::optional<piece_block_progress>();

	const char* ptr = &m_recv_buffer[1];
	int piece_index = detail::read_int(ptr);
	int offset = detail::read_int(ptr);
	int len = m_packet_size - 9;

	// is any of the piece message header data invalid?
	// 'offset' comes straight from the peer, so the range check is
	// done by subtraction. 'offset + len' could overflow.
	// TODO: make sure that len is == block_size or less only
	// if it's the last block.
	if (piece_index < 0
		|| piece_index >= m_torrent->torrent_file().num_pieces()
		|| offset < 0
		|| offset > m_torrent->torrent_file().piece_size(piece_index)
		|| len > m_torrent->torrent_file().piece_size(piece_index) - offset
		|| offset % m_torrent->block_size() != 0)
		return boost::optional<piece_block_progress>();

	piece_block_progress p;
	p.piece_index = piece_index;
	p.block_index = offset / m_torrent->block_size();
	p.bytes_downloaded = m_recv_pos - 9;
	p.full_block_bytes = len;

	return boost::optional<piece_block_progress>(p);
}
2003-12-08 22:59:48 +01:00
bool libtorrent::peer_connection::dispatch_message(int received)
{
2003-12-08 22:59:48 +01:00
assert(m_recv_pos >= received);
assert(m_recv_pos > 0);
int packet_type = m_recv_buffer[0];
2003-12-08 22:59:48 +01:00
if (packet_type > msg_cancel || packet_type < msg_choke)
2003-12-01 22:27:27 +01:00
throw protocol_error("unknown message id");
switch (packet_type)
{
// *************** CHOKE ***************
case msg_choke:
2003-12-08 22:59:48 +01:00
if (m_packet_size != 1)
throw protocol_error("'choke' message size != 1");
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== CHOKE\n";
#endif
m_peer_choked = true;
m_torrent->get_policy().choked(*this);
// remove all pieces from this peers download queue and
// remove the 'downloading' flag from piece_picker.
2003-12-08 22:59:48 +01:00
for (std::deque<piece_block>::iterator i = m_download_queue.begin();
i != m_download_queue.end();
++i)
{
m_torrent->picker().abort_download(*i);
}
m_download_queue.clear();
#ifndef NDEBUG
2003-12-14 23:55:32 +01:00
// m_torrent->picker().integrity_check(m_torrent);
#endif
break;
// *************** UNCHOKE ***************
case msg_unchoke:
2003-12-07 06:53:04 +01:00
if (m_packet_size != 1)
throw protocol_error("'unchoke' message size != 1");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== UNCHOKE\n";
#endif
m_peer_choked = false;
m_torrent->get_policy().unchoked(*this);
break;
// *************** INTERESTED ***************
case msg_interested:
2003-12-07 06:53:04 +01:00
if (m_packet_size != 1)
throw protocol_error("'interested' message size != 1");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== INTERESTED\n";
#endif
m_peer_interested = true;
m_torrent->get_policy().interested(*this);
break;
// *************** NOT INTERESTED ***************
case msg_not_interested:
2003-12-07 06:53:04 +01:00
if (m_packet_size != 1)
throw protocol_error("'not interested' message size != 1");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2003-12-17 22:21:09 +01:00
// clear the request queue if the client isn't interested
m_requests.clear();
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== NOT_INTERESTED\n";
#endif
m_peer_interested = false;
m_torrent->get_policy().not_interested(*this);
break;
// *************** HAVE ***************
case msg_have:
{
2003-12-07 06:53:04 +01:00
if (m_packet_size != 5)
throw protocol_error("'have' message size != 5");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2004-01-02 21:46:24 +01:00
const char* ptr = &m_recv_buffer[1];
int index = detail::read_int(ptr);
// if we got an invalid message, abort
2004-01-02 21:46:24 +01:00
if (index >= m_have_piece.size() || index < 0)
2003-12-01 22:27:27 +01:00
throw protocol_error("have message with higher index than the number of pieces");
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== HAVE [ piece: " << index << "]\n";
#endif
if (m_have_piece[index])
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " oops.. we already knew that: " << index << "\n";
#endif
}
else
{
m_have_piece[index] = true;
2003-12-18 04:30:41 +01:00
m_torrent->peer_has(index);
if (!m_torrent->have_piece(index) && !is_interesting())
m_torrent->get_policy().peer_is_interesting(*this);
}
break;
}
// *************** BITFIELD ***************
case msg_bitfield:
{
if (m_packet_size - 1 != (m_have_piece.size() + 7) / 8)
2003-12-01 22:27:27 +01:00
throw protocol_error("bitfield with invalid size");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== BITFIELD\n";
#endif
2003-12-17 20:03:23 +01:00
// build a vector of all pieces
std::vector<int> piece_list;
for (std::size_t i = 0; i < m_have_piece.size(); ++i)
{
bool have = m_recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7)));
if (have && !m_have_piece[i])
{
m_have_piece[i] = true;
2003-12-17 20:03:23 +01:00
piece_list.push_back(i);
}
else if (!have && m_have_piece[i])
{
m_have_piece[i] = false;
m_torrent->peer_lost(i);
}
}
2003-12-17 20:03:23 +01:00
// shuffle the piece list
std::random_shuffle(piece_list.begin(), piece_list.end());
// let the torrent know which pieces the
// peer has, in a shuffled order
bool interesting = false;
for (std::vector<int>::iterator i = piece_list.begin();
i != piece_list.end();
++i)
{
int index = *i;
2003-12-18 04:30:41 +01:00
m_torrent->peer_has(index);
if (!m_torrent->have_piece(index))
2003-12-17 20:03:23 +01:00
interesting = true;
}
2003-12-18 04:30:41 +01:00
if (piece_list.empty())
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " *** THIS IS A SEED ***\n";
#endif
}
if (interesting) m_torrent->get_policy().peer_is_interesting(*this);
break;
}
// *************** REQUEST ***************
case msg_request:
{
2003-12-07 06:53:04 +01:00
if (m_packet_size != 13)
throw protocol_error("'request' message size != 13");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
peer_request r;
2004-01-02 21:46:24 +01:00
const char* ptr = &m_recv_buffer[1];
r.piece = detail::read_int(ptr);
r.start = detail::read_int(ptr);
r.length = detail::read_int(ptr);
2003-12-17 17:37:20 +01:00
// make sure this request
// is legal and taht the peer
// is not choked
if (r.piece >= 0
&& r.piece < m_torrent->torrent_file().num_pieces()
&& r.start >= 0
&& r.start < m_torrent->torrent_file().piece_size(r.piece)
&& r.length > 0
&& r.length + r.start < m_torrent->torrent_file().piece_size(r.piece)
2003-12-17 22:21:09 +01:00
&& !m_choked
&& m_peer_interested)
2003-11-08 03:16:26 +01:00
{
2003-12-14 06:56:12 +01:00
m_requests.push_back(r);
2003-11-08 03:16:26 +01:00
send_buffer_updated();
2003-12-17 17:37:20 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
2003-11-08 03:16:26 +01:00
}
2003-12-14 06:56:12 +01:00
else
{
2003-12-17 17:37:20 +01:00
// TODO: log this illegal request
// if the only error is that the
// peer is choked, it may not be a
// mistake
2003-12-14 06:56:12 +01:00
}
2003-11-05 00:27:06 +01:00
break;
}
// *************** PIECE ***************
case msg_piece:
{
2003-12-08 22:59:48 +01:00
if (m_recv_pos <= 9)
// only received protocol data
m_statistics.received_bytes(0, received);
else if (m_recv_pos - received >= 9)
// only received payload data
m_statistics.received_bytes(received, 0);
else
{
// received a bit of both
assert(m_recv_pos - received < 9);
assert(m_recv_pos > 9);
assert(9 - (m_recv_pos - received) <= 9);
m_statistics.received_bytes(
m_recv_pos - 9
, 9 - (m_recv_pos - received));
}
if (m_recv_pos < m_packet_size) return false;
2004-01-02 21:46:24 +01:00
const char* ptr = &m_recv_buffer[1];
int index = detail::read_int(ptr);
if (index < 0 || index >= m_torrent->torrent_file().num_pieces())
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " piece index invalid\n";
#endif
2003-12-01 22:27:27 +01:00
throw protocol_error("invalid piece index in piece message");
}
2004-01-02 21:46:24 +01:00
int offset = detail::read_int(ptr);
int len = m_packet_size - 9;
if (offset < 0)
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " offset < 0\n";
#endif
2003-12-01 22:27:27 +01:00
throw protocol_error("offset < 0 in piece message");
}
if (offset + len > m_torrent->torrent_file().piece_size(index))
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " piece packet contains more data than the piece size\n";
#endif
2003-12-01 22:27:27 +01:00
throw protocol_error("piece message contains more data than the piece size");
}
2003-12-09 19:09:34 +01:00
// TODO: make sure that len is == block_size or less only
// if its's the last block.
if (offset % m_torrent->block_size() != 0)
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " piece packet contains unaligned offset\n";
#endif
2003-12-01 22:27:27 +01:00
throw protocol_error("piece message contains unaligned offset");
}
2003-11-05 00:27:06 +01:00
/*
piece_block req = m_download_queue.front();
if (req.piece_index != index)
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested index\n";
#endif
return false;
}
if (req.block_index != offset / m_torrent->block_size())
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " piece packet contains unrequested offset\n";
#endif
return false;
}
2003-11-05 00:27:06 +01:00
*/
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== PIECE [ piece: " << index << " | s: " << offset << " | l: " << len << " ]\n";
#endif
piece_picker& picker = m_torrent->picker();
piece_block block_finished(index, offset / m_torrent->block_size());
2003-12-08 22:59:48 +01:00
std::deque<piece_block>::iterator b
2003-11-05 00:27:06 +01:00
= std::find(
m_download_queue.begin()
, m_download_queue.end()
, block_finished);
if (b != m_download_queue.end())
{
// pop the request that just finished
// from the download queue
m_download_queue.erase(b);
}
else
{
// TODO: cancel the block from the
// peer that has taken over it.
}
if (picker.is_finished(block_finished)) break;
2003-12-07 06:53:04 +01:00
m_torrent->filesystem().write(&m_recv_buffer[9], index, offset, len);
2003-11-05 00:27:06 +01:00
picker.mark_as_finished(block_finished, m_peer_id);
2003-12-01 22:27:27 +01:00
m_torrent->get_policy().block_finished(*this, block_finished);
// did we just finish the piece?
if (picker.is_piece_finished(index))
{
2003-12-07 06:53:04 +01:00
bool verified = m_torrent->verify_piece(index);
if (verified)
{
m_torrent->announce_piece(index);
}
else
{
2003-12-01 22:27:27 +01:00
m_torrent->piece_failed(index);
}
2003-12-14 06:56:12 +01:00
m_torrent->get_policy().piece_finished(index, verified);
}
break;
}
// *************** CANCEL ***************
case msg_cancel:
{
2003-12-07 06:53:04 +01:00
if (m_packet_size != 13)
throw protocol_error("'cancel' message size != 13");
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) return false;
2003-12-07 06:53:04 +01:00
peer_request r;
2004-01-02 21:46:24 +01:00
const char* ptr = &m_recv_buffer[1];
r.piece = detail::read_int(ptr);
r.start = detail::read_int(ptr);
r.length = detail::read_int(ptr);
2003-12-08 22:59:48 +01:00
std::deque<peer_request>::iterator i
= std::find(m_requests.begin(), m_requests.end(), r);
if (i != m_requests.end())
{
m_requests.erase(i);
}
2003-11-05 00:27:06 +01:00
if (!has_data() && m_added_to_selector)
{
m_added_to_selector = false;
m_selector.remove_writable(m_socket);
}
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
break;
}
}
2003-12-08 22:59:48 +01:00
assert(m_recv_pos == m_packet_size);
return true;
}
2003-11-05 00:27:06 +01:00
// Cancels an outstanding block request. The block is released in
// the piece picker, removed from our download queue and a 'cancel'
// message for it is appended to the send buffer.
//
// block - the block to cancel. It is expected to be marked as
//         downloading and to be present in our download queue.
void libtorrent::peer_connection::cancel_block(piece_block block)
{
	assert(block.piece_index >= 0);
	assert(block.piece_index < m_torrent->torrent_file().num_pieces());
	assert(m_torrent->picker().is_downloading(block));

	m_torrent->picker().abort_download(block);

	std::deque<piece_block>::iterator i
		= std::find(m_download_queue.begin(), m_download_queue.end(), block);
	assert(i != m_download_queue.end());

	// erasing end() is undefined behavior, so don't rely
	// on the assert alone in release builds
	if (i != m_download_queue.end())
		m_download_queue.erase(i);

	// the last block of a piece may be smaller than the block size
	int block_offset = block.block_index * m_torrent->block_size();
	int block_size
		= std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset,
		m_torrent->block_size());
	assert(block_size > 0);
	assert(block_size <= m_torrent->block_size());

	// 4 byte length prefix (= 13) and the message id,
	// followed by three 4 byte integers
	char buf[] = {0,0,0,13, msg_cancel};

	std::size_t start_offset = m_send_buffer.size();
	m_send_buffer.resize(start_offset + 17);

	std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset);
	start_offset += 5;

	char* ptr = &m_send_buffer[start_offset];

	// index
	detail::write_int(block.piece_index, ptr);

	// begin
	detail::write_int(block_offset, ptr);

	// length
	detail::write_int(block_size, ptr);

#ifndef NDEBUG
	(*m_logger) << m_socket->sender().as_string() << " ==> CANCEL  [ piece: " << block.piece_index << " | s: " << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
#endif

	send_buffer_updated();
}
// Requests a block from the peer. The block is marked as being
// downloaded (by this peer) in the piece picker, appended to our
// download queue and a 'request' message for it is appended to
// the send buffer.
//
// block - the block to request. It must not already be marked
//         as downloading.
void libtorrent::peer_connection::request_block(piece_block block)
{
	assert(block.piece_index >= 0);
	assert(block.piece_index < m_torrent->torrent_file().num_pieces());
	assert(!m_torrent->picker().is_downloading(block));

	m_torrent->picker().mark_as_downloading(block, m_peer_id);
	m_download_queue.push_back(block);

	// where in the piece the block starts and how many bytes it
	// covers (the last block of a piece may be a short one)
	int begin = block.block_index * m_torrent->block_size();
	int length
		= std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-begin,
		m_torrent->block_size());
	assert(length > 0);
	assert(length <= m_torrent->block_size());

	// 4 byte length prefix (= 13) and the message id
	char header[] = {0,0,0,13, msg_request};

	// make room for the header and the three 4 byte integers
	std::size_t msg_start = m_send_buffer.size();
	m_send_buffer.resize(msg_start + 17);
	std::copy(header, header + 5, m_send_buffer.begin()+msg_start);

	// piece index, offset within the piece and length, in that order
	char* out = &m_send_buffer[msg_start+5];
	detail::write_int(block.piece_index, out);
	detail::write_int(begin, out);
	detail::write_int(length, out);

#ifndef NDEBUG
	(*m_logger) << m_socket->sender().as_string() << " ==> REQUEST [ piece: " << block.piece_index << " | s: " << begin << " | l: " << length << " | " << block.block_index << " ]\n";
#endif

	send_buffer_updated();
}
void libtorrent::peer_connection::send_bitfield()
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> BITFIELD\n";
#endif
const int packet_size = (m_have_piece.size() + 7) / 8 + 5;
const int old_size = m_send_buffer.size();
m_send_buffer.resize(old_size + packet_size);
2004-01-02 21:46:24 +01:00
char* ptr = &m_send_buffer[old_size];
detail::write_int(packet_size - 4, ptr);
m_send_buffer[old_size+4] = msg_bitfield;
std::fill(m_send_buffer.begin()+old_size+5, m_send_buffer.end(), 0);
for (std::size_t i = 0; i < m_have_piece.size(); ++i)
{
if (m_torrent->have_piece(i))
m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7));
}
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
void libtorrent::peer_connection::choke()
{
if (m_choked) return;
char msg[] = {0,0,0,1,msg_choke};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_choked = true;
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n";
#endif
2003-12-14 06:56:12 +01:00
m_requests.clear();
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
void libtorrent::peer_connection::unchoke()
{
if (!m_choked) return;
char msg[] = {0,0,0,1,msg_unchoke};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_choked = false;
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> UNCHOKE\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
void libtorrent::peer_connection::interested()
{
if (m_interesting) return;
char msg[] = {0,0,0,1,msg_interested};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_interesting = true;
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> INTERESTED\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
void libtorrent::peer_connection::not_interested()
{
if (!m_interesting) return;
char msg[] = {0,0,0,1,msg_not_interested};
m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg));
m_interesting = false;
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> NOT_INTERESTED\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
void libtorrent::peer_connection::send_have(int index)
{
2003-12-07 06:53:04 +01:00
const int packet_size = 9;
char msg[packet_size] = {0,0,0,5,msg_have};
2004-01-02 21:46:24 +01:00
char* ptr = msg+5;
detail::write_int(index, ptr);
2003-12-07 06:53:04 +01:00
m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size);
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> HAVE [ piece: " << index << " ]\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
2003-12-07 06:53:04 +01:00
void libtorrent::peer_connection::second_tick()
{
m_statistics.second_tick();
m_send_quota_left = m_send_quota;
if (m_send_quota > 0) send_buffer_updated();
// If the client sends more data
// we send it data faster, otherwise, slower.
// It will also depend on how much data the
// client has sent us. This is the mean to
// maintain a 1:1 share ratio with all peers.
2003-12-14 23:55:32 +01:00
int diff = share_diff();
2003-12-07 06:53:04 +01:00
2003-12-14 06:56:12 +01:00
if (diff > 2*m_torrent->block_size())
2003-12-08 02:37:30 +01:00
{
// if we have downloaded more than one piece more
// than we have uploaded, have an unlimited
// upload rate
2003-12-14 06:56:12 +01:00
m_send_quota_limit = -1;
2003-12-08 02:37:30 +01:00
}
else
{
// if we have downloaded too much, response with an
// upload rate of 10 kB/s more than we dowlload
// if we have uploaded too much, send with a rate of
// 10 kB/s less than we receive
2003-12-14 06:56:12 +01:00
int bias = 0;
if (diff > -2*m_torrent->block_size())
2003-12-09 19:09:34 +01:00
{
2003-12-22 08:14:35 +01:00
bias = m_statistics.download_rate() / 2;
2003-12-14 06:56:12 +01:00
if (bias < 10*1024) bias = 10*1024;
2003-12-09 19:09:34 +01:00
}
else
{
2003-12-22 08:14:35 +01:00
bias = -m_statistics.download_rate() / 2;
2003-12-09 19:09:34 +01:00
}
2003-12-14 06:56:12 +01:00
m_send_quota_limit = m_statistics.download_rate() + bias;
2003-12-08 02:37:30 +01:00
// the maximum send_quota given our download rate from this peer
2003-12-14 06:56:12 +01:00
if (m_send_quota_limit < 256) m_send_quota_limit = 256;
2003-12-08 02:37:30 +01:00
}
2003-12-07 06:53:04 +01:00
}
// --------------------------
// RECEIVE DATA
// --------------------------
// throws exception when the client should be disconnected
void libtorrent::peer_connection::receive_data()
{
assert(!m_socket->is_blocking());
assert(m_packet_size > 0);
2003-10-30 00:28:09 +01:00
for(;;)
{
assert(m_packet_size > 0);
2003-10-30 00:28:09 +01:00
int received = m_socket->receive(&m_recv_buffer[m_recv_pos], m_packet_size - m_recv_pos);
2003-10-30 00:28:09 +01:00
// connection closed
if (received == 0)
{
throw network_error(0);
}
2003-10-30 00:28:09 +01:00
// an error
if (received < 0)
{
2003-11-09 19:17:09 +01:00
// would_block means that no data was ready to be received
2003-12-08 22:59:48 +01:00
// returns to exit the loop
if (m_socket->last_error() == socket::would_block)
return;
2003-10-30 00:28:09 +01:00
// the connection was closed
2003-11-09 19:17:09 +01:00
throw network_error(m_socket->last_error());
2003-10-30 00:28:09 +01:00
}
if (received > 0)
{
2003-10-30 00:28:09 +01:00
m_last_receive = boost::posix_time::second_clock::local_time();
2003-10-30 00:28:09 +01:00
m_recv_pos += received;
2003-12-08 22:59:48 +01:00
switch(m_state)
2003-10-30 00:28:09 +01:00
{
2003-12-08 22:59:48 +01:00
case read_protocol_length:
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
2003-12-08 22:59:48 +01:00
m_packet_size = reinterpret_cast<unsigned char&>(m_recv_buffer[0]);
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " protocol length: " << m_packet_size << "\n";
#endif
m_state = read_protocol_string;
m_recv_buffer.resize(m_packet_size);
m_recv_pos = 0;
2003-11-09 19:17:09 +01:00
2003-12-08 22:59:48 +01:00
if (m_packet_size == 0)
{
#ifndef NDEBUG
(*m_logger) << "incorrect protocol length\n";
#endif
throw network_error(0);
}
break;
2003-10-30 00:28:09 +01:00
2003-12-08 22:59:48 +01:00
case read_protocol_string:
{
2003-12-07 06:53:04 +01:00
m_statistics.received_bytes(0, received);
2003-12-08 22:59:48 +01:00
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " protocol: '" << std::string(m_recv_buffer.begin(), m_recv_buffer.end()) << "'\n";
#endif
const char protocol_string[] = "BitTorrent protocol";
const int protocol_len = sizeof(protocol_string) - 1;
if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), protocol_string))
2003-10-30 00:28:09 +01:00
{
2003-12-08 22:59:48 +01:00
#ifndef NDEBUG
(*m_logger) << "incorrect protocol name\n";
#endif
throw network_error(0);
}
2003-12-08 22:59:48 +01:00
m_state = read_info_hash;
m_packet_size = 28;
2003-10-30 00:28:09 +01:00
m_recv_pos = 0;
2003-12-08 22:59:48 +01:00
m_recv_buffer.resize(28);
}
2003-12-08 22:59:48 +01:00
break;
2003-10-30 00:28:09 +01:00
2003-12-08 22:59:48 +01:00
case read_info_hash:
{
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
// ok, now we have got enough of the handshake. Is this connection
// attached to a torrent?
if (m_torrent == 0)
{
2003-12-08 22:59:48 +01:00
// TODO: if the protocol is to be extended
// these 8 bytes would be used to describe the
// extensions available on the other side
// now, we have to see if there's a torrent with the
// info_hash we got from the peer
sha1_hash info_hash;
std::copy(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (char*)info_hash.begin());
m_torrent = m_ses.find_torrent(info_hash);
if (m_torrent == 0)
{
2003-12-08 22:59:48 +01:00
// we couldn't find the torrent!
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " couldn't find a torrent with the given info_hash\n";
#endif
throw network_error(0);
2003-10-30 00:28:09 +01:00
}
2003-12-08 22:59:48 +01:00
// assume the other end has no pieces
m_have_piece.resize(m_torrent->torrent_file().num_pieces());
std::fill(m_have_piece.begin(), m_have_piece.end(), false);
2003-12-08 22:59:48 +01:00
// yes, we found the torrent
// reply with our handshake
std::copy(m_recv_buffer.begin()+28, m_recv_buffer.begin() + 48, (char*)m_peer_id.begin());
send_handshake();
send_bitfield();
}
else
{
// verify info hash
if (!std::equal(m_recv_buffer.begin()+8, m_recv_buffer.begin() + 28, (const char*)m_torrent->torrent_file().info_hash().begin()))
{
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " received invalid info_hash\n";
#endif
throw network_error(0);
}
2003-10-30 00:28:09 +01:00
}
2003-12-08 22:59:48 +01:00
m_state = read_peer_id;
m_packet_size = 20;
m_recv_pos = 0;
m_recv_buffer.resize(20);
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " info_hash received\n";
#endif
break;
}
2003-12-07 06:53:04 +01:00
2003-12-08 22:59:48 +01:00
case read_peer_id:
{
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
2003-12-22 08:14:35 +01:00
#ifndef NDEBUG
{
peer_id tmp;
std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)tmp.begin());
std::stringstream s;
s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n";
(*m_logger) << s.str();
}
#endif
2003-12-08 22:59:48 +01:00
if (m_active)
{
// verify peer_id
// TODO: It seems like the original client ignores to check the peer id
// can that be correct?
if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (const char*)m_peer_id.begin()))
{
2003-12-08 22:59:48 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " invalid peer_id (it doesn't equal the one from the tracker)\n";
#endif
throw network_error(0);
}
2003-12-08 22:59:48 +01:00
}
else
{
// check to make sure we don't have another connection with the same
// info_hash and peer_id. If we do. close this connection.
std::copy(m_recv_buffer.begin(), m_recv_buffer.begin() + 20, (char*)m_peer_id.begin());
if (m_torrent->has_peer(m_peer_id))
2003-10-30 00:28:09 +01:00
{
2003-12-08 22:59:48 +01:00
#ifndef NDEBUG
2003-12-22 08:14:35 +01:00
(*m_logger) << " duplicate connection, closing\n";
2003-12-08 22:59:48 +01:00
#endif
throw network_error(0);
2003-10-30 00:28:09 +01:00
}
2003-12-08 22:59:48 +01:00
m_attached_to_torrent = true;
m_torrent->attach_peer(this);
assert(m_torrent->get_policy().has_connection(this));
}
m_state = read_packet_size;
m_packet_size = 4;
m_recv_pos = 0;
m_recv_buffer.resize(4);
2003-12-22 08:14:35 +01:00
2003-12-08 22:59:48 +01:00
break;
}
2003-12-08 22:59:48 +01:00
case read_packet_size:
2004-01-02 21:46:24 +01:00
{
2003-12-08 22:59:48 +01:00
m_statistics.received_bytes(0, received);
if (m_recv_pos < m_packet_size) break;
assert(m_recv_pos == m_packet_size);
2003-12-08 22:59:48 +01:00
// convert from big endian to native byte order
2004-01-02 21:46:24 +01:00
const char* ptr = &m_recv_buffer[0];
m_packet_size = detail::read_int(ptr);
2003-12-08 22:59:48 +01:00
// don't accept packets larger than 1 MB
if (m_packet_size > 1024*1024 || m_packet_size < 0)
{
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " packet too large (packet_size > 1 Megabyte), abort\n";
#endif
// packet too large
throw network_error(0);
}
if (m_packet_size == 0)
{
// keepalive message
m_state = read_packet_size;
m_packet_size = 4;
}
else
{
m_state = read_packet;
m_recv_buffer.resize(m_packet_size);
}
m_recv_pos = 0;
assert(m_packet_size > 0);
break;
2004-01-02 21:46:24 +01:00
}
2003-12-08 22:59:48 +01:00
case read_packet:
if (dispatch_message(received))
{
m_state = read_packet_size;
m_packet_size = 4;
2003-10-30 00:28:09 +01:00
m_recv_buffer.resize(4);
m_recv_pos = 0;
assert(m_packet_size > 0);
}
2003-12-08 22:59:48 +01:00
break;
}
}
}
assert(m_packet_size > 0);
}
bool libtorrent::peer_connection::has_data() const throw()
{
// if we have requests or pending data to be sent or announcements to be made
// we want to send data
2003-11-09 19:17:09 +01:00
return ((!m_requests.empty() && !m_choked)
|| !m_send_buffer.empty()
|| !m_announce_queue.empty())
&& m_send_quota_left != 0;
}
// --------------------------
// SEND DATA
// --------------------------
// throws exception when the client should be disconnected
void libtorrent::peer_connection::send_data()
{
2003-11-05 00:27:06 +01:00
assert(m_socket->is_writable());
assert(has_data());
2003-11-05 00:27:06 +01:00
// only add new piece-chunks if the send buffer is small enough
// otherwise there will be no end to how large it will be!
2003-11-10 14:15:41 +01:00
// TODO: make this a bit better. Don't always read the entire
2003-11-05 00:27:06 +01:00
// requested block. Have a limit of how much of the requested
// block is actually read at a time.
while (!m_requests.empty()
&& (m_send_buffer.size() < m_torrent->block_size())
&& !m_choked)
{
peer_request& r = m_requests.front();
if (r.piece >= 0 && r.piece < m_have_piece.size() && m_torrent && m_torrent->have_piece(r.piece))
{
// make sure the request is ok
if (r.start + r.length > m_torrent->torrent_file().piece_size(r.piece))
{
// NOT OK! disconnect
throw network_error(0);
}
2003-12-17 17:37:20 +01:00
if (r.length <= 0 || r.start < 0)
{
// NOT OK! disconnect
throw network_error(0);
}
#ifndef NDEBUG
2003-12-07 06:53:04 +01:00
assert(m_torrent->verify_piece(r.piece) && "internal error");
#endif
2003-11-08 03:16:26 +01:00
const int send_buffer_offset = m_send_buffer.size();
const int packet_size = 4 + 5 + 4 + r.length;
2003-11-08 03:16:26 +01:00
m_send_buffer.resize(send_buffer_offset + packet_size);
2004-01-02 21:46:24 +01:00
char* ptr = &m_send_buffer[send_buffer_offset];
detail::write_int(packet_size-4, ptr);
*ptr = msg_piece; ++ptr;
detail::write_int(r.piece, ptr);
detail::write_int(r.start, ptr);
2003-12-07 06:53:04 +01:00
m_torrent->filesystem().read(
&m_send_buffer[send_buffer_offset+13]
, r.piece
, r.start
, r.length);
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
2003-11-08 03:16:26 +01:00
(*m_logger) << m_socket->sender().as_string() << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
2003-12-08 22:59:48 +01:00
m_payloads.push_back(range(send_buffer_offset+13, r.length));
}
else
{
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
2003-11-08 03:16:26 +01:00
(*m_logger) << m_socket->sender().as_string()
<< " *** WARNING [ illegal piece request idx: " << r.piece
<< " | s: " << r.start
<< " | l: " << r.length
<< " | max_piece: " << m_have_piece.size()
<< " | torrent: " << (m_torrent != 0)
<< " | have: " << m_torrent->have_piece(r.piece)
<< " ]\n";
#endif
}
2003-11-05 00:27:06 +01:00
m_requests.erase(m_requests.begin());
}
if (!m_announce_queue.empty())
{
2003-11-05 00:27:06 +01:00
for (std::vector<int>::iterator i = m_announce_queue.begin();
i != m_announce_queue.end();
++i)
{
// (*m_logger) << "have piece: " << *i << " sent to: " << m_socket->sender().as_string() << "\n";
send_have(*i);
}
m_announce_queue.clear();
}
2003-11-09 19:17:09 +01:00
assert(m_send_quota_left != 0);
// send the actual buffer
if (!m_send_buffer.empty())
{
2003-11-05 00:27:06 +01:00
2003-11-09 19:17:09 +01:00
int amount_to_send = m_send_buffer.size();
assert(m_send_quota_left != 0);
if (m_send_quota_left > 0)
amount_to_send = std::min(m_send_quota_left, amount_to_send);
// we have data that's scheduled for sending
2003-11-05 00:27:06 +01:00
int sent = m_socket->send(
&m_send_buffer[0]
2003-11-09 19:17:09 +01:00
, amount_to_send);
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
2003-12-22 08:14:35 +01:00
// (*m_logger) << m_socket->sender().as_string() << " ==> SENT [ length: " << sent << " ]\n";
2003-11-05 00:27:06 +01:00
#endif
if (sent > 0)
{
2003-11-09 19:17:09 +01:00
if (m_send_quota_left != -1)
{
assert(m_send_quota_left >= sent);
m_send_quota_left -= sent;
}
2003-12-08 22:59:48 +01:00
int amount_payload = 0;
if (!m_payloads.empty())
{
for (std::deque<range>::iterator i = m_payloads.begin();
i != m_payloads.end();
++i)
{
i->start -= sent;
if (i->start < 0)
{
if (i->start + i->length <= 0)
{
amount_payload += i->length;
}
else
{
amount_payload += -i->start;
i->length -= -i->start;
i->start = 0;
}
}
}
}
// remove all payload ranges that has been sent
m_payloads.erase(
std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero)
, m_payloads.end());
assert(amount_payload <= sent);
m_statistics.sent_bytes(amount_payload, sent - amount_payload);
// empty the entire buffer at once or if
// only a part of the buffer could be sent
// remove the part that was sent from the buffer
if (sent == m_send_buffer.size())
2003-11-05 00:27:06 +01:00
{
m_send_buffer.clear();
2003-11-05 00:27:06 +01:00
}
else
2003-11-05 00:27:06 +01:00
{
m_send_buffer.erase(
m_send_buffer.begin()
, m_send_buffer.begin() + sent);
}
}
else
{
assert(sent == -1);
throw network_error(m_socket->last_error());
}
m_last_sent = boost::posix_time::second_clock::local_time();
}
2003-11-05 00:27:06 +01:00
assert(m_added_to_selector);
2003-11-09 19:17:09 +01:00
send_buffer_updated();
2003-11-05 00:27:06 +01:00
#ifndef NDEBUG
2003-11-09 19:17:09 +01:00
if (has_data())
2003-11-05 00:27:06 +01:00
{
if (m_socket->is_writable())
{
std::cout << "ERROR\n";
}
}
#endif
}
void libtorrent::peer_connection::keep_alive()
{
boost::posix_time::time_duration d;
d = boost::posix_time::second_clock::local_time() - m_last_sent;
if (d.seconds() > m_timeout / 2)
{
char noop[] = {0,0,0,0};
m_send_buffer.insert(m_send_buffer.end(), noop, noop+4);
m_last_sent = boost::posix_time::second_clock::local_time();
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
(*m_logger) << m_socket->sender().as_string() << " ==> NOP\n";
#endif
2003-11-05 00:27:06 +01:00
send_buffer_updated();
}
}