first implementation of set_piece_deadline

This commit is contained in:
Arvid Norberg 2009-03-17 09:34:44 +00:00
parent 1cdfe26630
commit 31054ef069
18 changed files with 651 additions and 129 deletions

View File

@ -1789,6 +1789,9 @@ Its declaration looks like this::
void resolve_countries(bool r);
bool resolve_countries() const;
enum deadline_flags { alert_when_available = 1 };
void set_piece_deadline(int index, time_duration deadline, int flags = 0) const;
void piece_priority(int index, int priority) const;
int piece_priority(int index) const;
void prioritize_pieces(std::vector<int> const& pieces) const;
@ -1837,6 +1840,29 @@ it will throw ``invalid_handle``.
Since the torrents are processed by a background thread, there is no
guarantee that a handle will remain valid between two calls.
set_piece_deadline()
--------------------
::
enum deadline_flags { alert_when_available = 1 };
void set_piece_deadline(int index, time_duration deadline, int flags = 0) const;
This function sets or resets the deadline associated with a specific piece
index (``index``). libtorrent will attempt to download this entire piece before
the deadline expires. This is not necessarily possible, but pieces with a more
recent deadline will always be prioritized over pieces with a deadline further
ahead in time. The deadline (and flags) of a piece can be changed by calling this
function again.
The ``flags`` parameter can be used to ask libtorrent to send an alert once the
piece has been downloaded, by passing ``alert_when_available``. When set, the
read_piece_alert_ alert will be delivered, with the piece data, when it's downloaded.
If the piece is already downloaded when this call is made, nothing happens, unless
the ``alert_when_available`` flag is set, in which case it will do the same thing
as calling read_piece_ for ``index``.
piece_priority() prioritize_pieces() piece_priorities()
-------------------------------------------------------

View File

@ -328,7 +328,7 @@ void print_peer_info(std::ostream& out, std::vector<libtorrent::peer_info> const
#endif
out << "down (total | peak ) up (total | peak ) sent-req recv flags source ";
if (print_fails) out << "fail hshf ";
if (print_send_bufs) out << "rq sndb quota rcvb ";
if (print_send_bufs) out << "rq sndb quota rcvb q-bytes ";
if (print_timers) out << "inactive wait timeout q-time ";
out << "disk rtt ";
if (print_block) out << "block-progress ";
@ -412,7 +412,8 @@ void print_peer_info(std::ostream& out, std::vector<libtorrent::peer_info> const
out << to_string(i->requests_in_buffer, 2) << " "
<< to_string(i->used_send_buffer, 6) << " ("<< add_suffix(i->send_buffer_size) << ") "
<< to_string(i->send_quota, 5) << " "
<< to_string(i->used_receive_buffer, 6) << " ("<< add_suffix(i->receive_buffer_size) << ") ";
<< to_string(i->used_receive_buffer, 6) << " ("<< add_suffix(i->receive_buffer_size) << ") "
<< to_string(i->queue_bytes, 6) << " ";
}
if (print_timers)
{

View File

@ -68,6 +68,11 @@ public:
struct const_interval
{
const_interval(interval const& i)
: begin(i.begin)
, end(i.end)
{}
const_interval(char const* begin, char const* end)
: begin(begin)
, end(end)

View File

@ -94,10 +94,22 @@ namespace libtorrent
struct pending_block
{
pending_block(piece_block const& b): skipped(0), block(b) {}
int skipped;
pending_block(piece_block const& b)
: skipped(0), not_wanted(false), timed_out(false), block(b) {}
// the number of times the request
// has been skipped by out of order blocks
boost::uint16_t skipped;
// if any of these are set to true, this block
// is not allocated
// in the piece picker anymore, and open for
// other peers to pick. This may be caused by
// it either timing out or being received
// unexpectedly from the peer
bool not_wanted:1;
bool timed_out:1;
piece_block block;
};
@ -242,8 +254,8 @@ namespace libtorrent
void set_pid(const peer_id& pid) { m_peer_id = pid; }
bool has_piece(int i) const;
std::deque<pending_block> const& download_queue() const;
std::deque<piece_block> const& request_queue() const;
std::vector<pending_block> const& download_queue() const;
std::vector<piece_block> const& request_queue() const;
std::deque<peer_request> const& upload_queue() const;
// estimate of how long it will take until we have
@ -380,7 +392,8 @@ namespace libtorrent
void incoming_request(peer_request const& r);
void incoming_piece(peer_request const& p, disk_buffer_holder& data);
void incoming_piece(peer_request const& p, char const* data);
void incoming_piece_fragment();
void incoming_piece_fragment(int bytes);
void start_receive_piece(peer_request const& r);
void incoming_cancel(peer_request const& r);
void incoming_dht_port(int listen_port);
@ -400,8 +413,12 @@ namespace libtorrent
void snub_peer();
bool can_request_time_critical() const;
void make_time_critical(piece_block const& block);
// adds a block to the request queue
void add_request(piece_block const& b);
void add_request(piece_block const& b, bool time_critical = false);
// removes a block from the request queue or download queue
// sends a cancel message if appropriate
// refills the request queue, and possibly ignoring pieces requested
@ -702,11 +719,11 @@ namespace libtorrent
// the blocks we have reserved in the piece
// picker and will request from this peer.
std::deque<piece_block> m_request_queue;
std::vector<piece_block> m_request_queue;
// the queue of blocks we have requested
// from this peer
std::deque<pending_block> m_download_queue;
std::vector<pending_block> m_download_queue;
// the pieces we will send to the peer
// if requested (regardless of choke state)
@ -724,6 +741,22 @@ namespace libtorrent
// the piece requests
std::vector<int> m_requests_in_buffer;
// the number of bytes that the other
// end has to send us in order to respond
// to all outstanding piece requests we
// have sent to it
int m_outstanding_bytes;
// the number of time critical requests
// queued up in the m_request_queue that
// soon will be committed to the download
// queue. This is included in download_queue_time()
// so that it can be used while adding more
// requests and take the previous requests
// into account without submitting it all
// immediately
int m_queued_time_critical;
// the number of pieces this peer
// has. Must be the same as
// std::count(m_have_piece.begin(),
@ -929,6 +962,7 @@ namespace libtorrent
bool m_in_constructor:1;
bool m_disconnect_started:1;
bool m_initialized:1;
int m_received_in_piece;
#endif
};
}

View File

@ -110,6 +110,7 @@ namespace libtorrent
// the time until all blocks in the request
// queue will be d
time_duration download_queue_time;
int queue_bytes;
// the number of seconds until the current
// pending request times out

View File

@ -219,6 +219,8 @@ namespace libtorrent
// returns a list of all torrents in this session
std::vector<torrent_handle> get_torrents() const;
io_service& get_io_service();
// returns an invalid handle in case the torrent doesn't exist
torrent_handle find_torrent(sha1_hash const& info_hash) const;

View File

@ -263,6 +263,7 @@ namespace libtorrent
void prioritize_files(std::vector<int> const& files);
void file_priorities(std::vector<int>&) const;
void set_piece_deadline(int piece, time_duration t, int flags);
void update_piece_priorities();
torrent_status status() const;
@ -855,6 +856,36 @@ namespace libtorrent
std::vector<announce_entry> m_trackers;
// this is an index into m_trackers
struct time_critical_piece
{
// when this piece was first requested
ptime first_requested;
// when this piece was last requested
ptime last_requested;
// by what time we want this piece
ptime deadline;
// 1 = send alert with piece data when available
int flags;
// how many peers it's been requested from
int peers;
// the piece index
int piece;
bool operator<(time_critical_piece const& rhs) const
{ return deadline < rhs.deadline; }
};
void remove_time_critical_piece(int piece, bool finished = false);
void remove_time_critical_pieces(std::vector<int> const& priority);
void request_time_critical_pieces();
// this list is sorted by time_critical_piece::deadline
std::list<time_critical_piece> m_time_critical_pieces;
// the average time it takes to download one time critical piece
time_duration m_average_piece_time;
// the average piece download time deviation
time_duration m_piece_time_deviation;
// the number of bytes that has been
// downloaded that failed the hash-test
size_type m_total_failed_bytes;

View File

@ -325,6 +325,9 @@ namespace libtorrent
void get_peer_info(std::vector<peer_info>& v) const;
torrent_status status() const;
void get_download_queue(std::vector<partial_piece_info>& queue) const;
enum deadline_flags { alert_when_available = 1 };
void set_piece_deadline(int index, time_duration deadline, int flags = 0) const;
#ifndef TORRENT_NO_DEPRECATE
// fills the specified vector with the download progress [0, 1]

View File

@ -833,7 +833,7 @@ namespace libtorrent
buffer::const_interval recv_buffer = receive_buffer();
// are we currently receiving a 'piece' message?
if (m_state != read_packet
|| recv_buffer.left() < 9
|| recv_buffer.left() <= 9
|| recv_buffer[0] != msg_piece)
return boost::optional<piece_block_progress>();
@ -1126,12 +1126,18 @@ namespace libtorrent
TORRENT_ASSERT(has_disk_receive_buffer() || packet_size() == 9);
// classify the received data as protocol chatter
// or data payload for the statistics
int piece_bytes = 0;
if (recv_pos <= 9)
{
// only received protocol data
m_statistics.received_bytes(0, received);
}
else if (recv_pos - received >= 9)
{
// only received payload data
m_statistics.received_bytes(received, 0);
piece_bytes = received;
}
else
{
// received a bit of both
@ -1141,6 +1147,7 @@ namespace libtorrent
m_statistics.received_bytes(
recv_pos - 9
, 9 - (recv_pos - received));
piece_bytes = recv_pos - 9;
}
const char* ptr = recv_buffer.begin + 1;
@ -1168,12 +1175,14 @@ namespace libtorrent
if (recv_pos - received < header_size && recv_pos >= header_size)
{
// begin_receive_piece(p)
// call this once, the first time the entire header
// has been received
start_receive_piece(p);
}
TORRENT_ASSERT(has_disk_receive_buffer() || packet_size() == header_size);
incoming_piece_fragment();
incoming_piece_fragment(piece_bytes);
if (is_disconnecting()) return;
if (!packet_finished()) return;

View File

@ -264,8 +264,6 @@ namespace libtorrent
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
incoming_piece_fragment();
for (;;)
{
buffer::const_interval recv_buffer = receive_buffer();
@ -385,6 +383,7 @@ namespace libtorrent
m_body_start = m_parser.body_start();
m_response_left -= payload;
m_statistics.received_bytes(payload, 0);
incoming_piece_fragment(payload);
}
else
{
@ -392,6 +391,7 @@ namespace libtorrent
if (payload > m_response_left) payload = m_response_left;
if (payload > front_request.length) payload = front_request.length;
m_statistics.received_bytes(payload, 0);
incoming_piece_fragment(payload);
m_response_left -= payload;
}
recv_buffer.begin += m_body_start;

View File

@ -95,6 +95,8 @@ namespace libtorrent
, m_socket(s)
, m_remote(endp)
, m_torrent(tor)
, m_outstanding_bytes(0)
, m_queued_time_critical(0)
, m_num_pieces(0)
, m_timeout(m_ses.settings().peer_timeout)
, m_packet_size(0)
@ -141,6 +143,7 @@ namespace libtorrent
, m_in_constructor(true)
, m_disconnect_started(false)
, m_initialized(false)
, m_received_in_piece(0)
#endif
{
m_channel_state[upload_channel] = peer_info::bw_idle;
@ -208,6 +211,8 @@ namespace libtorrent
, m_disk_recv_buffer(ses, 0)
, m_socket(s)
, m_remote(endp)
, m_outstanding_bytes(0)
, m_queued_time_critical(0)
, m_num_pieces(0)
, m_timeout(m_ses.settings().peer_timeout)
, m_packet_size(0)
@ -254,6 +259,7 @@ namespace libtorrent
, m_in_constructor(true)
, m_disconnect_started(false)
, m_initialized(false)
, m_received_in_piece(0)
#endif
{
m_channel_state[upload_channel] = peer_info::bw_idle;
@ -722,12 +728,12 @@ namespace libtorrent
return m_have_piece[i];
}
std::deque<piece_block> const& peer_connection::request_queue() const
std::vector<piece_block> const& peer_connection::request_queue() const
{
return m_request_queue;
}
std::deque<pending_block> const& peer_connection::download_queue() const
std::vector<pending_block> const& peer_connection::download_queue() const
{
return m_download_queue;
}
@ -739,34 +745,13 @@ namespace libtorrent
time_duration peer_connection::download_queue_time(int extra_bytes) const
{
boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t);
boost::optional<piece_block_progress> p = downloading_piece_progress();
size_type queue_bytes = extra_bytes;
bool in_download_queue = false;
for (std::deque<pending_block>::const_iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i)
{
// piece packets have 13 bytes overhead
queue_bytes += t->block_size() + 13;
if (p && p->piece_index == i->block.piece_index)
{
queue_bytes -= p->bytes_downloaded + 13;
in_download_queue = true;
}
}
if (p && !in_download_queue)
queue_bytes += p->full_block_bytes - p->bytes_downloaded;
// queue bytes now contain the number of bytes we have to receive
// from this peer before all of the requested blocks complete
int rate = m_statistics.transfer_rate(stat::download_payload)
+ m_statistics.transfer_rate(stat::download_protocol);
// avoid division by zero
if (rate < 50) rate = 50;
return seconds(queue_bytes / rate);
boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t);
return seconds((m_outstanding_bytes + m_queued_time_critical * t->block_size()) / rate);
}
void peer_connection::add_stat(size_type downloaded, size_type uploaded)
@ -853,7 +838,7 @@ namespace libtorrent
return p.piece >= 0
&& p.piece < t->torrent_file().num_pieces()
&& p.length > 0
&& p.length >= 0
&& p.start >= 0
&& (p.length == t->block_size()
|| (p.length < t->block_size()
@ -983,11 +968,9 @@ namespace libtorrent
if (!t->is_seed())
{
piece_picker& p = t->picker();
for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
for (std::vector<piece_block>::const_iterator i = m_request_queue.begin()
, end(m_request_queue.end()); i != end; ++i)
{
// since this piece was skipped, clear it and allow it to
// be requested from other peers
p.abort_download(*i);
}
}
@ -1024,7 +1007,7 @@ namespace libtorrent
if (is_disconnecting()) return;
std::deque<pending_block>::iterator i = std::find_if(
std::vector<pending_block>::iterator i = std::find_if(
m_download_queue.begin(), m_download_queue.end()
, bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
, t->block_size()));
@ -1034,22 +1017,26 @@ namespace libtorrent
<< " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
piece_block b(-1, 0);
if (i != m_download_queue.end())
{
b = i->block;
piece_block b = i->block;
bool remove_from_picker = !i->timed_out && !i->not_wanted;
m_download_queue.erase(i);
m_outstanding_bytes -= r.length;
// if the peer is in parole mode, keep the request
if (peer_info_struct() && peer_info_struct()->on_parole)
{
m_request_queue.push_front(b);
m_request_queue.insert(m_request_queue.begin(), b);
}
else if (!t->is_seed())
else if (!t->is_seed() && remove_from_picker)
{
piece_picker& p = t->picker();
p.abort_download(b);
}
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
}
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
else
@ -1678,9 +1665,70 @@ namespace libtorrent
}
}
void peer_connection::incoming_piece_fragment()
void peer_connection::incoming_piece_fragment(int bytes)
{
m_last_piece = time_now();
m_outstanding_bytes -= bytes;
#ifdef TORRENT_DEBUG
m_received_in_piece += bytes;
#endif
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
}
void peer_connection::start_receive_piece(peer_request const& r)
{
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
#ifdef TORRENT_DEBUG
buffer::const_interval recv_buffer = receive_buffer();
int recv_pos = recv_buffer.end - recv_buffer.begin;
TORRENT_ASSERT(recv_pos >= 9);
#endif
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
piece_block b(r.piece, r.start / t->block_size());
if (!verify_piece(r))
{
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_logger) << time_now_string()
<< " <== INVALID_PIECE [ piece: " << p.piece << " | "
"start: " << p.start << " | "
"length: " << p.length << " ]\n";
#endif
disconnect("got invalid piece packet", 2);
return;
}
bool in_req_queue = false;
for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i)
{
if (i->block != b) continue;
in_req_queue = true;
break;
}
// if this is not in the request queue, we have to
// assume our outstanding bytes includes this piece too
if (!in_req_queue)
{
if (t->alerts().should_post<unwanted_block_alert>())
{
t->alerts().post_alert(unwanted_block_alert(t->get_handle(), m_remote
, m_peer_id, b.block_index, b.piece_index));
}
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_logger) << " *** The block we just got was not in the "
"request queue ***\n";
#endif
m_download_queue.insert(m_download_queue.begin(), b);
m_download_queue.front().not_wanted = true;
m_outstanding_bytes += r.length;
}
}
#ifdef TORRENT_DEBUG
@ -1759,7 +1807,13 @@ namespace libtorrent
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
if ((*i)->on_piece(p, data)) return;
if ((*i)->on_piece(p, data))
{
#ifdef TORRENT_DEBUG
m_received_in_piece = 0;
#endif
return;
}
}
#endif
if (is_disconnecting()) return;
@ -1792,22 +1846,14 @@ namespace libtorrent
return;
}
if (!verify_piece(p))
{
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_logger) << time_now_string()
<< " <== INVALID_PIECE [ piece: " << p.piece << " | "
"start: " << p.start << " | "
"length: " << p.length << " ]\n";
#endif
disconnect("got invalid piece packet", 2);
return;
}
// if we're already seeding, don't bother,
// just ignore it
if (t->is_seed())
{
#ifdef TORRENT_DEBUG
m_received_in_piece = 0;
#endif
if (!m_download_queue.empty()) m_download_queue.erase(m_download_queue.begin());
t->add_redundant_bytes(p.length);
return;
}
@ -1823,7 +1869,7 @@ namespace libtorrent
TORRENT_ASSERT(p.length == t->block_size()
|| p.length == t->torrent_file().total_size() % t->block_size());
std::deque<pending_block>::iterator b
std::vector<pending_block>::iterator b
= std::find_if(
m_download_queue.begin()
, m_download_queue.end()
@ -1839,12 +1885,14 @@ namespace libtorrent
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_logger) << " *** The block we just got was not in the "
"request queue ***\n";
#endif
#ifdef TORRENT_DEBUG
m_received_in_piece = 0;
#endif
t->add_redundant_bytes(p.length);
request_a_block(*t, *this);
send_block_requests();
return;
}
#ifdef TORRENT_DEBUG
pending_block pending_b = *b;
#endif
@ -1864,22 +1912,33 @@ namespace libtorrent
// if the number of times a block is skipped by out of order
// blocks exceeds the size of the outstanding queue, assume that
// the other end dropped the request.
if (qe.skipped > m_desired_queue_size)
if (qe.skipped > m_desired_queue_size * 2)
{
if (m_ses.m_alerts.should_post<request_dropped_alert>())
m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
, remote(), pid(), qe.block.block_index, qe.block.piece_index));
picker.abort_download(qe.block);
if (!qe.timed_out && !qe.not_wanted)
picker.abort_download(qe.block);
TORRENT_ASSERT(m_download_queue.begin() + i != b);
m_download_queue.erase(m_download_queue.begin() + i);
--i;
--block_index;
m_outstanding_bytes -= t->block_size();
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
}
}
TORRENT_ASSERT(int(m_download_queue.size()) > block_index + 1);
b = m_download_queue.begin() + (block_index + 1);
TORRENT_ASSERT(b->block == pending_b.block);
#ifdef TORRENT_DEBUG
TORRENT_ASSERT(m_received_in_piece == p.length);
m_received_in_piece = 0;
#endif
// if the block we got is already finished, then ignore it
if (picker.is_downloaded(block_finished))
{
@ -2249,7 +2308,30 @@ namespace libtorrent
return m_allowed_fast;
}
void peer_connection::add_request(piece_block const& block)
bool peer_connection::can_request_time_critical() const
{
if (has_peer_choked() || !is_interesting()) return false;
if (m_desired_queue_size * 2 <
- (int)m_download_queue.size()
- (int)m_request_queue.size()) return false;
if (on_parole()) return false;
return true;
}
void peer_connection::make_time_critical(piece_block const& block)
{
std::vector<piece_block>::iterator rit = std::find(m_request_queue.begin()
, m_request_queue.end(), block);
if (rit == m_request_queue.end()) return;
// ignore it if it's already time critical
if (rit - m_request_queue.begin() < m_queued_time_critical) return;
piece_block b = *rit;
m_request_queue.erase(rit);
m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b);
++m_queued_time_critical;
}
void peer_connection::add_request(piece_block const& block, bool time_critical)
{
// INVARIANT_CHECK;
@ -2296,7 +2378,16 @@ namespace libtorrent
remote(), pid(), speedmsg, block.block_index, block.piece_index));
}
m_request_queue.push_back(block);
if (time_critical)
{
m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical
, block);
++m_queued_time_critical;
}
else
{
m_request_queue.push_back(block);
}
}
void peer_connection::cancel_request(piece_block const& block)
@ -2318,11 +2409,11 @@ namespace libtorrent
// cancelled, then just ignore the cancel.
if (!t->picker().is_requested(block)) return;
std::deque<pending_block>::iterator it
std::vector<pending_block>::iterator it
= std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
if (it == m_download_queue.end())
{
std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
std::vector<piece_block>::iterator rit = std::find(m_request_queue.begin()
, m_request_queue.end(), block);
// when a multi block is received, it is cancelled
@ -2460,7 +2551,8 @@ namespace libtorrent
bool empty_download_queue = m_download_queue.empty();
while (!m_request_queue.empty()
&& (int)m_download_queue.size() < m_desired_queue_size)
&& ((int)m_download_queue.size() < m_desired_queue_size
|| m_queued_time_critical > 0))
{
piece_block block = m_request_queue.front();
@ -2475,7 +2567,8 @@ namespace libtorrent
r.start = block_offset;
r.length = block_size;
m_request_queue.pop_front();
m_request_queue.erase(m_request_queue.begin());
if (m_queued_time_critical) --m_queued_time_critical;
if (t->is_seed()) continue;
// this can happen if a block times out, is re-requested and
// then arrives "unexpectedly"
@ -2483,6 +2576,10 @@ namespace libtorrent
continue;
m_download_queue.push_back(block);
m_outstanding_bytes += block_size;
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
/*
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
@ -2506,8 +2603,9 @@ namespace libtorrent
!= block.piece_index * blocks_per_piece + block.block_index + 1)
break;
block = m_request_queue.front();
m_request_queue.pop_front();
m_request_queue.erase(m_request_queue.begin());
m_download_queue.push_back(block);
if (m_queued_time_critical) --m_queued_time_critical;
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
@ -2523,6 +2621,10 @@ namespace libtorrent
TORRENT_ASSERT(block_size <= t->block_size());
r.length += block_size;
m_outstanding_bytes += block_size;
#if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_DEBUG
check_invariant();
#endif
}
}
@ -2646,7 +2748,8 @@ namespace libtorrent
while (!m_download_queue.empty())
{
picker.abort_download(m_download_queue.back().block);
pending_block& qe = m_download_queue.back();
if (!qe.timed_out && !qe.not_wanted) picker.abort_download(qe.block);
m_download_queue.pop_back();
}
while (!m_request_queue.empty())
@ -2758,6 +2861,7 @@ namespace libtorrent
#endif
p.download_queue_time = download_queue_time();
p.queue_bytes = m_outstanding_bytes;
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
p.country[0] = m_country[0];
@ -3221,14 +3325,16 @@ namespace libtorrent
else
{
TORRENT_ASSERT(!m_download_queue.empty());
r = m_download_queue.back().block;
pending_block& qe = m_download_queue.back();
if (!qe.timed_out && !qe.not_wanted)
r = qe.block;
// only time out a request if it blocks the piece
// from being completed (i.e. no free blocks to
// request from it)
piece_picker::downloading_piece p;
picker.piece_info(r.piece_index, p);
int free_blocks = picker.blocks_in_piece(r.piece_index)
picker.piece_info(qe.block.piece_index, p);
int free_blocks = picker.blocks_in_piece(qe.block.piece_index)
- p.finished - p.writing - p.requested;
if (free_blocks > 0)
{
@ -3239,9 +3345,9 @@ namespace libtorrent
if (m_ses.m_alerts.should_post<block_timeout_alert>())
{
m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
, remote(), pid(), r.block_index, r.piece_index));
, remote(), pid(), qe.block.block_index, qe.block.piece_index));
}
m_download_queue.pop_back();
qe.timed_out = true;
}
if (!m_download_queue.empty() || !m_request_queue.empty())
m_timeout_extend += m_ses.settings().request_timeout;
@ -4086,6 +4192,41 @@ namespace libtorrent
TORRENT_ASSERT(m_ses.has_peer((peer_connection*)this));
}
if (t && !m_disconnecting)
{
boost::optional<piece_block_progress> p = t?downloading_piece_progress():boost::optional<piece_block_progress>();
torrent_info const& ti = t->torrent_file();
// if the piece is fully downloaded, we might have popped it from the
// download queue already
int outstanding_bytes = 0;
bool in_download_queue = false;
int block_size = t->block_size();
piece_block last_block(ti.num_pieces()-1, (ti.total_size() % ti.piece_length())
/ block_size);
int last_block_size = t->torrent_file().total_size() & (block_size-1);
if (last_block_size == 0) last_block_size = block_size;
for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
, end(m_download_queue.end()); i != end; ++i)
{
if (p && i->block == piece_block(p->piece_index, p->block_index))
{
in_download_queue = true;
outstanding_bytes += p->full_block_bytes - m_received_in_piece;
}
else if (i->block == last_block)
{
outstanding_bytes += last_block_size;
}
else
{
outstanding_bytes += block_size;
}
}
//if (p && p->bytes_downloaded < p->full_block_bytes) TORRENT_ASSERT(in_download_queue);
TORRENT_ASSERT(m_outstanding_bytes == outstanding_bytes);
}
/*
// this assertion correct most of the time, but sometimes right when the
// limit is changed it might break
@ -4169,18 +4310,21 @@ namespace libtorrent
TORRENT_ASSERT(m_ses.has_peer(*i));
#endif
peer_connection const& p = *(*i);
for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
for (std::vector<piece_block>::const_iterator i = p.request_queue().begin()
, end(p.request_queue().end()); i != end; ++i)
++num_requests[*i];
for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
for (std::vector<pending_block>::const_iterator i = p.download_queue().begin()
, end(p.download_queue().end()); i != end; ++i)
++num_requests[i->block];
if (!i->not_wanted && !i->timed_out) ++num_requests[i->block];
}
for (std::map<piece_block, int>::iterator i = num_requests.begin()
, end(num_requests.end()); i != end; ++i)
{
if (!t->picker().is_downloaded(i->first))
TORRENT_ASSERT(t->picker().num_peers(i->first) == i->second);
piece_block b = i->first;
int count = i->second;
int picker_count = t->picker().num_peers(b);
if (!t->picker().is_downloaded(b))
TORRENT_ASSERT(picker_count == count);
}
}
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS

View File

@ -2059,32 +2059,55 @@ namespace libtorrent
TORRENT_ASSERT(block.piece_index < (int)m_piece_map.size());
TORRENT_ASSERT(block.block_index < blocks_in_piece(block.piece_index));
TORRENT_ASSERT(m_piece_map[block.piece_index].downloading);
std::vector<downloading_piece>::iterator i
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
TORRENT_ASSERT(i != m_downloads.end());
block_info& info = i->info[block.block_index];
info.peer = peer;
TORRENT_ASSERT(info.state == block_info::state_requested);
if (info.state == block_info::state_requested) --i->requested;
TORRENT_ASSERT(i->requested >= 0);
TORRENT_ASSERT(info.state != block_info::state_writing);
++i->writing;
info.state = block_info::state_writing;
// all other requests for this block should have been
// cancelled now
info.num_peers = 0;
if (i->requested == 0)
piece_pos& p = m_piece_map[block.piece_index];
if (p.downloading == 0)
{
// there are no blocks requested in this piece.
// remove the fast/slow state from it
i->state = none;
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
TORRENT_PIECE_PICKER_INVARIANT_CHECK;
#endif
int prio = p.priority(this);
TORRENT_ASSERT(prio < int(m_priority_boundries.size())
|| m_dirty);
TORRENT_ASSERT(prio >= 0);
p.downloading = 1;
if (prio >= 0 && !m_dirty) update(prio, p.index);
downloading_piece& dp = add_download_piece();
dp.index = block.piece_index;
dp.state = none;
block_info& info = dp.info[block.block_index];
info.state = block_info::state_writing;
info.peer = peer;
info.num_peers = 0;
dp.writing = 1;
sort_piece(m_downloads.end()-1);
}
else
{
std::vector<downloading_piece>::iterator i
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
TORRENT_ASSERT(i != m_downloads.end());
block_info& info = i->info[block.block_index];
info.peer = peer;
if (info.state == block_info::state_requested) --i->requested;
TORRENT_ASSERT(i->requested >= 0);
TORRENT_ASSERT(info.state != block_info::state_writing);
++i->writing;
info.state = block_info::state_writing;
// all other requests for this block should have been
// cancelled now
info.num_peers = 0;
if (i->requested == 0)
{
// there are no blocks requested in this piece.
// remove the fast/slow state from it
i->state = none;
}
sort_piece(i);
}
sort_piece(i);
}
void piece_picker::write_failed(piece_block block)

View File

@ -271,8 +271,8 @@ namespace libtorrent
(*c.m_logger) << time_now_string() << " PIECE_PICKER [ php: " << prefer_whole_pieces
<< " picked: " << interesting_pieces.size() << " ]\n";
#endif
std::deque<pending_block> const& dq = c.download_queue();
std::deque<piece_block> const& rq = c.request_queue();
std::vector<pending_block> const& dq = c.download_queue();
std::vector<piece_block> const& rq = c.request_queue();
for (std::vector<piece_block>::iterator i = interesting_pieces.begin();
i != interesting_pieces.end(); ++i)
{
@ -292,6 +292,12 @@ namespace libtorrent
}
TORRENT_ASSERT(p.num_peers(*i) == 0);
// don't request pieces we already have in our request queue
if (std::find_if(dq.begin(), dq.end(), has_block(*i)) != dq.end()
|| std::find(rq.begin(), rq.end(), *i) != rq.end())
continue;
// ok, we found a piece that's not being downloaded
// by somebody else. request it from this peer
// and return

View File

@ -283,6 +283,11 @@ namespace libtorrent
return m_impl->get_peer_id();
}
io_service& session::get_io_service()
{
return m_impl->m_io_service;
}
void session::set_key(int key)
{
m_impl->set_key(key);

View File

@ -151,6 +151,8 @@ namespace libtorrent
#endif
, m_ses(ses)
, m_trackers(m_torrent_file->trackers())
, m_average_piece_time(0)
, m_piece_time_deviation(0)
, m_total_failed_bytes(0)
, m_total_redundant_bytes(0)
, m_padding(0)
@ -1598,6 +1600,8 @@ namespace libtorrent
, index));
}
remove_time_critical_piece(index, true);
bool was_finished = m_picker->num_filtered() + num_have()
== torrent_file().num_pieces();
@ -1818,16 +1822,16 @@ namespace libtorrent
i != m_connections.end(); ++i)
{
peer_connection* p = *i;
std::deque<pending_block> const& dq = p->download_queue();
std::deque<piece_block> const& rq = p->request_queue();
for (std::deque<pending_block>::const_iterator k = dq.begin()
std::vector<pending_block> const& dq = p->download_queue();
std::vector<piece_block> const& rq = p->request_queue();
for (std::vector<pending_block>::const_iterator k = dq.begin()
, end(dq.end()); k != end; ++k)
{
if (k->block.piece_index != index) continue;
m_picker->mark_as_downloading(k->block, p->peer_info_struct()
, (piece_picker::piece_state_t)p->peer_speed());
}
for (std::deque<piece_block>::const_iterator k = rq.begin()
for (std::vector<piece_block>::const_iterator k = rq.begin()
, end(rq.end()); k != end; ++k)
{
if (k->piece_index != index) continue;
@ -2025,6 +2029,100 @@ namespace libtorrent
return m_username + ":" + m_password;
}
void torrent::set_piece_deadline(int piece, time_duration t, int flags)
{
ptime deadline = time_now() + t;
if (is_seed() || m_picker->have_piece(piece))
{
if (flags & torrent_handle::alert_when_available)
read_piece(piece);
return;
}
for (std::list<time_critical_piece>::iterator i = m_time_critical_pieces.begin()
, end(m_time_critical_pieces.end()); i != end; ++i)
{
if (i->piece != piece) continue;
i->deadline = deadline;
i->flags = flags;
// resort i since deadline might have changed
while (boost::next(i) != m_time_critical_pieces.end() && i->deadline > boost::next(i)->deadline)
{
std::iter_swap(i, boost::next(i));
++i;
}
while (i != m_time_critical_pieces.begin() && i->deadline < boost::prior(i)->deadline)
{
std::iter_swap(i, boost::next(i));
--i;
}
return;
}
time_critical_piece p;
p.first_requested = min_time();
p.last_requested = min_time();
p.flags = flags;
p.deadline = deadline;
p.peers = 0;
p.piece = piece;
std::list<time_critical_piece>::iterator i = std::upper_bound(m_time_critical_pieces.begin()
, m_time_critical_pieces.end(), p);
m_time_critical_pieces.insert(i, p);
}
void torrent::remove_time_critical_piece(int piece, bool finished)
{
for (std::list<time_critical_piece>::iterator i = m_time_critical_pieces.begin()
, end(m_time_critical_pieces.end()); i != end; ++i)
{
if (i->piece != piece) continue;
if (finished)
{
if (i->flags & torrent_handle::alert_when_available)
{
read_piece(i->piece);
}
// update the average download time and average
// download time deviation
time_duration dl_time = time_now() - i->first_requested;
if (m_average_piece_time == seconds(0))
{
m_average_piece_time = dl_time;
}
else
{
time_duration diff = time_duration(dl_time.diff - m_average_piece_time.diff);
if (m_piece_time_deviation == seconds(0)) m_piece_time_deviation = diff;
else m_piece_time_deviation = m_piece_time_deviation * 0.6f + diff * 0.4;
m_average_piece_time = m_average_piece_time * 0.6f + dl_time * 0.4f;
}
}
m_time_critical_pieces.erase(i);
return;
}
}
// remove time critical pieces where priority is 0
void torrent::remove_time_critical_pieces(std::vector<int> const& priority)
{
for (std::list<time_critical_piece>::iterator i = m_time_critical_pieces.begin();
i != m_time_critical_pieces.end();)
{
if (priority[i->piece] == 0)
{
i = m_time_critical_pieces.erase(i);
continue;
}
++i;
}
}
void torrent::piece_availability(std::vector<int>& avail) const
{
INVARIANT_CHECK;
@ -2054,7 +2152,12 @@ namespace libtorrent
bool was_finished = is_finished();
bool filter_updated = m_picker->set_piece_priority(index, priority);
TORRENT_ASSERT(num_have() >= m_picker->num_have_filtered());
if (filter_updated) update_peer_interest(was_finished);
if (filter_updated)
{
update_peer_interest(was_finished);
if (priority == 0) remove_time_critical_piece(index);
}
}
int torrent::piece_priority(int index) const
@ -2093,7 +2196,11 @@ namespace libtorrent
filter_updated |= m_picker->set_piece_priority(index, *i);
TORRENT_ASSERT(num_have() >= m_picker->num_have_filtered());
}
if (filter_updated) update_peer_interest(was_finished);
if (filter_updated)
{
update_peer_interest(was_finished);
remove_time_critical_pieces(pieces);
}
}
void torrent::piece_priorities(std::vector<int>& pieces) const
@ -4087,12 +4194,12 @@ namespace libtorrent
TORRENT_ASSERT(m_ses.has_peer(*i));
#endif
peer_connection const& p = *(*i);
for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
for (std::vector<piece_block>::const_iterator i = p.request_queue().begin()
, end(p.request_queue().end()); i != end; ++i)
++num_requests[*i];
for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
for (std::vector<pending_block>::const_iterator i = p.download_queue().begin()
, end(p.download_queue().end()); i != end; ++i)
++num_requests[i->block];
if (!i->not_wanted && !i->timed_out) ++num_requests[i->block];
if (!p.is_choked() && !p.ignore_unchoke_slots()) ++num_uploads;
torrent* associated_torrent = p.associated_torrent().lock().get();
if (associated_torrent != this)
@ -4105,8 +4212,11 @@ namespace libtorrent
for (std::map<piece_block, int>::iterator i = num_requests.begin()
, end(num_requests.end()); i != end; ++i)
{
if (!m_picker->is_downloaded(i->first))
TORRENT_ASSERT(m_picker->num_peers(i->first) == i->second);
piece_block b = i->first;
int count = i->second;
int picker_count = m_picker->num_peers(b);
if (!m_picker->is_downloaded(b))
TORRENT_ASSERT(picker_count == count);
}
TORRENT_ASSERT(num_have() >= m_picker->num_have_filtered());
}
@ -4697,9 +4807,17 @@ namespace libtorrent
if (is_seed()) m_seeding_time += since_last_tick;
m_active_time += since_last_tick;
ptime now = time_now();
// ---- TIME CRITICAL PIECES ----
if (!m_time_critical_pieces.empty())
{
request_time_critical_pieces();
}
// ---- WEB SEEDS ----
ptime now = time_now();
// re-insert urls that are to be retried into the m_web_seeds
typedef std::map<web_seed_entry, ptime>::iterator iter_t;
for (iter_t i = m_web_seeds_next_retry.begin(); i != m_web_seeds_next_retry.end();)
@ -4793,6 +4911,105 @@ namespace libtorrent
}
}
void torrent::request_time_critical_pieces()
{
// build a list of peers and sort it by download_queue_time
std::vector<peer_connection*> peers;
peers.reserve(m_connections.size());
std::remove_copy_if(m_connections.begin(), m_connections.end()
, std::back_inserter(peers), !boost::bind(&peer_connection::can_request_time_critical, _1));
std::sort(peers.begin(), peers.end()
, boost::bind(&peer_connection::download_queue_time, _1, 16*1024)
< boost::bind(&peer_connection::download_queue_time, _2, 16*1024));
std::set<peer_connection*> peers_with_requests;
std::vector<piece_block> interesting_blocks;
std::vector<piece_block> backup1;
std::vector<piece_block> backup2;
std::vector<int> ignore;
ptime now = time_now();
for (std::list<time_critical_piece>::iterator i = m_time_critical_pieces.begin()
, end(m_time_critical_pieces.end()); i != end; ++i)
{
if (i != m_time_critical_pieces.begin() && i->deadline > now
+ m_average_piece_time + 4 * m_piece_time_deviation)
{
// don't request pieces whose deadline is too far in the future
break;
}
// loop until every block has been requested from
do
{
// pick the peer with the lowest download_queue_time that has i->piece
std::vector<peer_connection*>::iterator p = std::find_if(peers.begin(), peers.end()
, boost::bind(&peer_connection::has_piece, _1, i->piece));
if (p == peers.end()) break;
peer_connection& c = **p;
interesting_blocks.clear();
backup1.clear();
backup2.clear();
m_picker->add_blocks(i->piece, c.get_bitfield(), interesting_blocks
, backup1, backup2, 1, 0, c.peer_info_struct()
, ignore, piece_picker::fast, 0);
std::vector<piece_block> const& rq = c.request_queue();
bool added_request = false;
if (!interesting_blocks.empty() && std::find(rq.begin(), rq.end()
, interesting_blocks.front()) != rq.end())
{
c.make_time_critical(interesting_blocks.front());
added_request = true;
}
else if (!interesting_blocks.empty())
{
c.add_request(interesting_blocks.front(), true);
added_request = true;
}
// TODO: if there's been long enough since we requested something
// from this piece, request one of the backup blocks (the one with
// the least number of requests to it) and update the last request
// timestamp
if (added_request)
{
peers_with_requests.insert(peers_with_requests.begin(), &c);
if (i->first_requested == min_time()) i->first_requested = now;
if (!c.can_request_time_critical())
{
peers.erase(p);
}
else
{
// resort p, since it will have a higher download_queue_time now
while (p != peers.end()-1 && (*p)->download_queue_time() > (*(p+1))->download_queue_time())
{
std::iter_swap(p, p+1);
++p;
}
}
}
} while (!interesting_blocks.empty());
}
// commit all the time critical requests
for (std::set<peer_connection*>::iterator i = peers_with_requests.begin()
, end(peers_with_requests.end()); i != end; ++i)
{
(*i)->send_block_requests();
}
}
std::set<std::string> torrent::web_seeds(web_seed_entry::type_t type) const
{
std::set<std::string> ret;

View File

@ -708,5 +708,11 @@ namespace libtorrent
TORRENT_FORWARD(get_download_queue(queue));
}
void torrent_handle::set_piece_deadline(int index, time_duration deadline, int flags) const
{
INVARIANT_CHECK;
TORRENT_FORWARD(set_piece_deadline(index, deadline, flags));
}
}

View File

@ -337,8 +337,6 @@ namespace libtorrent
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
incoming_piece_fragment();
for (;;)
{
buffer::const_interval recv_buffer = receive_buffer();
@ -522,6 +520,7 @@ namespace libtorrent
int left_in_response = range_end - range_start - m_range_pos;
int payload_transferred = (std::min)(left_in_response, int(bytes_transferred));
m_statistics.received_bytes(payload_transferred, 0);
incoming_piece_fragment(payload_transferred);
bytes_transferred -= payload_transferred;
m_range_pos += payload_transferred;;
if (m_range_pos > range_end - range_start) m_range_pos = range_end - range_start;

View File

@ -44,7 +44,7 @@ POSSIBILITY OF SUCH DAMAGE.
using boost::filesystem::remove_all;
using boost::filesystem::exists;
void test_swarm(bool super_seeding = false, bool strict = false, bool seed_mode = false)
void test_swarm(bool super_seeding = false, bool strict = false, bool seed_mode = false, bool time_critical = false)
{
using namespace libtorrent;
@ -96,7 +96,14 @@ void test_swarm(bool super_seeding = false, bool strict = false, bool seed_mode
p.seed_mode = seed_mode;
// test using piece sizes smaller than 16kB
boost::tie(tor1, tor2, tor3) = setup_transfer(&ses1, &ses2, &ses3, true
, false, true, "_swarm", 8 * 1024, 0, super_seeding, &p);
, false, true, "_swarm", 8 * 1024, 0, super_seeding, &p);
if (time_critical)
{
tor2.set_piece_deadline(2, seconds(0));
tor2.set_piece_deadline(5, seconds(1));
tor2.set_piece_deadline(8, seconds(2));
}
float sum_dl_rate2 = 0.f;
float sum_dl_rate3 = 0.f;
@ -203,6 +210,9 @@ int test_main()
using namespace libtorrent;
using namespace boost::filesystem;
// with time critical pieces
test_swarm(false, false, false, true);
// with seed mode
test_swarm(false, false, true);