optimize download queue management with an explicit queue

This commit is contained in:
arvidn 2017-03-25 18:00:23 -04:00 committed by Arvid Norberg
parent 8e6f417015
commit 7640c0641e
8 changed files with 103 additions and 90 deletions

View File

@ -1,3 +1,4 @@
* optimize download queue management
* deprecated (undocumented) file:// urls, added torrent_file_path alternative * deprecated (undocumented) file:// urls, added torrent_file_path alternative
* add limit for number of web seed connections * add limit for number of web seed connections
* added support for retrieval of DHT live nodes * added support for retrieval of DHT live nodes

View File

@ -2068,28 +2068,26 @@ int main(int argc, char* argv[])
ses.pause(); ses.pause();
std::printf("saving resume data\n"); std::printf("saving resume data\n");
std::vector<torrent_status> temp;
ses.get_torrent_status(&temp, &yes, 0);
for (std::vector<torrent_status>::iterator i = temp.begin();
i != temp.end(); ++i)
{
torrent_status& st = *i;
if (!st.handle.is_valid())
{
std::printf(" skipping, invalid handle\n");
continue;
}
if (!st.has_metadata)
{
std::printf(" skipping %s, no metadata\n", st.name.c_str());
continue;
}
if (!st.need_save_resume)
{
std::printf(" skipping %s, resume file up-to-date\n", st.name.c_str());
continue;
}
// get all the torrent handles that we need to save resume data for
std::vector<torrent_status> temp;
ses.get_torrent_status(&temp, [](torrent_status const& st)
{
if (!st.handle.is_valid()) return false;
if (!st.has_metadata) return false;
if (!st.need_save_resume) return false;
return true;
}, 0);
// sort them by queue position. We want to remove them from the back of the
// queue, so save higher queue positions first
std::sort(temp.begin(), temp.end(), [](torrent_status const& lhs, torrent_status const& rhs)
{
return lhs.queue_position > rhs.queue_position;
});
for (auto const& st : temp)
{
// save_resume_data will generate an alert when it's done // save_resume_data will generate an alert when it's done
st.handle.save_resume_data(); st.handle.save_resume_data();
++num_outstanding_resume_data; ++num_outstanding_resume_data;

View File

@ -273,6 +273,7 @@ namespace libtorrent
// this is set while the session is building the // this is set while the session is building the
// torrent status update message // torrent status update message
bool m_posting_torrent_updates = false; bool m_posting_torrent_updates = false;
bool verify_queue_position(torrent const* t, int pos) override;
#endif #endif
void on_exception(std::exception const& e) override; void on_exception(std::exception const& e) override;
@ -822,6 +823,10 @@ namespace libtorrent
tracker_manager m_tracker_manager; tracker_manager m_tracker_manager;
torrent_map m_torrents; torrent_map m_torrents;
// all torrents that are downloading or queued,
// ordered by their queue position
aux::vector<torrent*> m_download_queue;
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
// this maps obfuscated hashes to torrents. It's only // this maps obfuscated hashes to torrents. It's only
// used when encryption is enabled // used when encryption is enabled
@ -873,12 +878,6 @@ namespace libtorrent
// the peer id that is generated at the start of the session // the peer id that is generated at the start of the session
peer_id m_peer_id; peer_id m_peer_id;
// this is the highest queue position of any torrent
// in this session. queue positions are packed (i.e. there
// are no gaps). If there are no torrents with queue positions
// this is -1.
int m_max_queue_pos = -1;
// the key is an id that is used to identify the // the key is an id that is used to identify the
// client with the tracker only. It is randomized // client with the tracker only. It is randomized
// at startup // at startup

View File

@ -317,6 +317,11 @@ namespace libtorrent { namespace aux
virtual counters& stats_counters() = 0; virtual counters& stats_counters() = 0;
virtual void received_buffer(int size) = 0; virtual void received_buffer(int size) = 0;
virtual void sent_buffer(int size) = 0; virtual void sent_buffer(int size) = 0;
#if TORRENT_USE_ASSERTS
virtual bool verify_queue_position(torrent const*, int) = 0;
#endif
protected: protected:
~session_interface() {} ~session_interface() {}
}; };

View File

@ -771,6 +771,13 @@ namespace libtorrent
// there is no guarantee that adding the same torrent immediately after // there is no guarantee that adding the same torrent immediately after
// it was removed will not throw a system_error exception. Once // it was removed will not throw a system_error exception. Once
// the torrent is deleted, a torrent_deleted_alert is posted. // the torrent is deleted, a torrent_deleted_alert is posted.
//
// Note that when a queued or downloading torrent is removed, its position
// in the download queue is vacated and avery subsequent torrent in the
// queue has their queue positions updated. This can potentially cause a
// large state_update to be posted. When removing all torrents, it is
// adviced to remove them from the back of the queue, to minimize the
// shifting.
void remove_torrent(const torrent_handle& h, int options = 0); void remove_torrent(const torrent_handle& h, int options = 0);
#ifndef TORRENT_NO_DEPRECATE #ifndef TORRENT_NO_DEPRECATE

View File

@ -317,7 +317,7 @@ namespace libtorrent
public: public:
torrent(aux::session_interface& ses, int block_size torrent(aux::session_interface& ses, int block_size
, int seq, bool session_paused, add_torrent_params const& p , bool session_paused, add_torrent_params const& p
, sha1_hash const& info_hash); , sha1_hash const& info_hash);
~torrent(); ~torrent();
@ -340,6 +340,7 @@ namespace libtorrent
{ {
TORRENT_ASSERT(m_added == true); TORRENT_ASSERT(m_added == true);
m_added = false; m_added = false;
set_queue_position(-1);
// make sure we decrement the gauge counter for this torrent // make sure we decrement the gauge counter for this torrent
update_gauge(); update_gauge();
} }
@ -450,7 +451,12 @@ namespace libtorrent
void set_queue_position(int p); void set_queue_position(int p);
int queue_position() const { return m_sequence_number; } int queue_position() const { return m_sequence_number; }
// used internally // used internally
void set_queue_position_impl(int p) { m_sequence_number = p; } void set_queue_position_impl(int const p)
{
if (m_sequence_number == p) return;
m_sequence_number = p;
state_updated();
}
void second_tick(int tick_interval_ms); void second_tick(int tick_interval_ms);

View File

@ -2988,6 +2988,11 @@ namespace aux {
if (pe.second->has_peer(p)) return true; if (pe.second->has_peer(p)) return true;
return false; return false;
} }
bool session_impl::verify_queue_position(torrent const* t, int pos)
{
return m_download_queue.end_index() > pos && m_download_queue[pos] == t;
}
#endif #endif
void session_impl::sent_bytes(int bytes_payload, int bytes_protocol) void session_impl::sent_bytes(int bytes_payload, int bytes_protocol)
@ -4315,76 +4320,61 @@ namespace aux {
void session_impl::set_queue_position(torrent* me, int p) void session_impl::set_queue_position(torrent* me, int p)
{ {
// TODO: Maybe the queue position should be maintained as a vector of int const current_pos = me->queue_position();
// torrent pointers. Maybe this logic could be simplified if (current_pos == p) return;
if (p >= 0 && me->queue_position() == -1)
if (p >= 0 && current_pos == -1)
{ {
for (auto& i : m_torrents) // we're inserting the torrent into the download queue
int const last = m_download_queue.end_index();
if (p >= last)
{ {
torrent* t = i.second.get(); m_download_queue.push_back(me);
if (t->queue_position() >= p) me->set_queue_position_impl(last);
{ return;
t->set_queue_position_impl(t->queue_position()+1); }
t->state_updated();
} m_download_queue.insert(m_download_queue.begin() + p, me);
if (t->queue_position() >= p) t->set_queue_position_impl(t->queue_position()+1); for (int i = p; i < m_download_queue.end_index(); ++i)
{
m_download_queue[i]->set_queue_position_impl(i);
} }
++m_max_queue_pos;
me->set_queue_position_impl((std::min)(m_max_queue_pos, p));
} }
else if (p < 0) else if (p < 0)
{ {
TORRENT_ASSERT(me->queue_position() >= 0); // we're removing the torrent from the download queue
TORRENT_ASSERT(current_pos >= 0);
TORRENT_ASSERT(p == -1); TORRENT_ASSERT(p == -1);
for (auto& i : m_torrents) TORRENT_ASSERT(m_download_queue[current_pos] == me);
m_download_queue.erase(m_download_queue.begin() + current_pos);
me->set_queue_position_impl(-1);
for (int i = current_pos; i < m_download_queue.end_index(); ++i)
{ {
torrent* t = i.second.get(); m_download_queue[i]->set_queue_position_impl(i);
if (t == me) continue;
if (t->queue_position() == -1) continue;
if (t->queue_position() >= me->queue_position())
{
t->set_queue_position_impl(t->queue_position()-1);
t->state_updated();
}
} }
--m_max_queue_pos;
me->set_queue_position_impl(p);
} }
else if (p < me->queue_position()) else if (p < current_pos)
{ {
for (auto& i : m_torrents) // we're moving the torrent up the queue
torrent* tmp = me;
for (int i = p; i <= current_pos; ++i)
{ {
torrent* t = i.second.get(); std::swap(m_download_queue[i], tmp);
if (t == me) continue; m_download_queue[i]->set_queue_position_impl(i);
if (t->queue_position() == -1) continue;
if (t->queue_position() >= p
&& t->queue_position() < me->queue_position())
{
t->set_queue_position_impl(t->queue_position()+1);
t->state_updated();
}
} }
me->set_queue_position_impl(p); TORRENT_ASSERT(tmp == me);
} }
else if (p > me->queue_position()) else if (p > current_pos)
{ {
for (auto& i : m_torrents) // we're moving the torrent down the queue
p = std::min(p, m_download_queue.end_index() - 1);
for (int i = current_pos; i < p; ++i)
{ {
torrent* t = i.second.get(); m_download_queue[i] = m_download_queue[i + 1];
int pos = t->queue_position(); m_download_queue[i]->set_queue_position_impl(i);
if (t == me) continue;
if (pos == -1) continue;
if (pos <= p
&& pos > me->queue_position()
&& pos != -1)
{
t->set_queue_position_impl(t->queue_position()-1);
t->state_updated();
}
} }
me->set_queue_position_impl((std::min)(m_max_queue_pos, p)); m_download_queue[p] = me;
me->set_queue_position_impl(p);
} }
trigger_auto_manage(); trigger_auto_manage();
@ -4904,11 +4894,10 @@ namespace aux {
return std::make_pair(ptr_t(), false); return std::make_pair(ptr_t(), false);
} }
int queue_pos = ++m_max_queue_pos;
torrent_ptr = std::make_shared<torrent>(*this torrent_ptr = std::make_shared<torrent>(*this
, 16 * 1024, queue_pos, m_paused , 16 * 1024, m_paused
, params, params.info_hash); , params, params.info_hash);
torrent_ptr->set_queue_position(m_download_queue.end_index());
return std::make_pair(torrent_ptr, true); return std::make_pair(torrent_ptr, true);
} }
@ -5027,7 +5016,6 @@ namespace aux {
remove_torrent_impl(tptr, options); remove_torrent_impl(tptr, options);
tptr->abort(); tptr->abort();
tptr->set_queue_position(-1);
} }
void session_impl::remove_torrent_impl(std::shared_ptr<torrent> tptr void session_impl::remove_torrent_impl(std::shared_ptr<torrent> tptr
@ -5884,7 +5872,6 @@ namespace aux {
{ {
// this is not allowed to be the network thread! // this is not allowed to be the network thread!
// TORRENT_ASSERT(is_not_thread()); // TORRENT_ASSERT(is_not_thread());
// TODO: asserts that no outstanding async operations are still in flight // TODO: asserts that no outstanding async operations are still in flight
#if defined TORRENT_ASIO_DEBUGGING #if defined TORRENT_ASIO_DEBUGGING
@ -6794,6 +6781,13 @@ namespace aux {
{ {
TORRENT_ASSERT(i->m_links[l].in_list()); TORRENT_ASSERT(i->m_links[l].in_list());
} }
int idx = 0;
for (auto t : m_download_queue)
{
TORRENT_ASSERT(t->queue_position() == idx);
++idx;
}
} }
int const num_gauges = counters::num_error_torrents - counters::num_checking_torrents + 1; int const num_gauges = counters::num_error_torrents - counters::num_checking_torrents + 1;

View File

@ -163,7 +163,6 @@ namespace libtorrent
torrent::torrent( torrent::torrent(
aux::session_interface& ses aux::session_interface& ses
, int const block_size , int const block_size
, int const seq
, bool const session_paused , bool const session_paused
, add_torrent_params const& p , add_torrent_params const& p
, sha1_hash const& info_hash) , sha1_hash const& info_hash)
@ -181,7 +180,7 @@ namespace libtorrent
, m_storage_constructor(p.storage) , m_storage_constructor(p.storage)
, m_info_hash(info_hash) , m_info_hash(info_hash)
, m_error_file(torrent_status::error_file_none) , m_error_file(torrent_status::error_file_none)
, m_sequence_number(seq) , m_sequence_number(-1)
, m_announce_to_trackers((p.flags & add_torrent_params::flag_paused) == 0) , m_announce_to_trackers((p.flags & add_torrent_params::flag_paused) == 0)
, m_announce_to_lsd((p.flags & add_torrent_params::flag_paused) == 0) , m_announce_to_lsd((p.flags & add_torrent_params::flag_paused) == 0)
, m_has_incoming(false) , m_has_incoming(false)
@ -7520,6 +7519,9 @@ namespace libtorrent
TORRENT_ASSERT(current_stats_state() == int(m_current_gauge_state + counters::num_checking_torrents) TORRENT_ASSERT(current_stats_state() == int(m_current_gauge_state + counters::num_checking_torrents)
|| m_current_gauge_state == no_gauge_state); || m_current_gauge_state == no_gauge_state);
TORRENT_ASSERT(m_sequence_number == -1
|| m_ses.verify_queue_position(this, m_sequence_number));
for (auto const& i : m_time_critical_pieces) for (auto const& i : m_time_critical_pieces)
{ {
TORRENT_ASSERT(!is_seed()); TORRENT_ASSERT(!is_seed());
@ -7808,7 +7810,8 @@ namespace libtorrent
TORRENT_ASSERT((p == -1) == is_finished() TORRENT_ASSERT((p == -1) == is_finished()
|| (!m_auto_managed && p == -1) || (!m_auto_managed && p == -1)
|| (m_abort && p == -1)); || (m_abort && p == -1)
|| (!m_added && p == -1));
if (p == m_sequence_number) return; if (p == m_sequence_number) return;
TORRENT_ASSERT(p >= -1); TORRENT_ASSERT(p >= -1);