diff --git a/ChangeLog b/ChangeLog index faf58cd89..0244991cc 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ + * optimize download queue management * deprecated (undocumented) file:// urls, added torrent_file_path alternative * add limit for number of web seed connections * added support for retrieval of DHT live nodes diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 0b7225aa9..c6ff89633 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -2068,28 +2068,26 @@ int main(int argc, char* argv[]) ses.pause(); std::printf("saving resume data\n"); - std::vector temp; - ses.get_torrent_status(&temp, &yes, 0); - for (std::vector::iterator i = temp.begin(); - i != temp.end(); ++i) - { - torrent_status& st = *i; - if (!st.handle.is_valid()) - { - std::printf(" skipping, invalid handle\n"); - continue; - } - if (!st.has_metadata) - { - std::printf(" skipping %s, no metadata\n", st.name.c_str()); - continue; - } - if (!st.need_save_resume) - { - std::printf(" skipping %s, resume file up-to-date\n", st.name.c_str()); - continue; - } + // get all the torrent handles that we need to save resume data for + std::vector temp; + ses.get_torrent_status(&temp, [](torrent_status const& st) + { + if (!st.handle.is_valid()) return false; + if (!st.has_metadata) return false; + if (!st.need_save_resume) return false; + return true; + }, 0); + + // sort them by queue position. We want to remove them from the back of the + // queue, so save higher queue positions first + std::sort(temp.begin(), temp.end(), [](torrent_status const& lhs, torrent_status const& rhs) + { + return lhs.queue_position > rhs.queue_position; + }); + + for (auto const& st : temp) + { // save_resume_data will generate an alert when it's done st.handle.save_resume_data(); ++num_outstanding_resume_data; diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index cff9114de..3f2e7d21c 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -273,6 +273,7 @@ namespace libtorrent // this is set while the session is building the // torrent status update message bool m_posting_torrent_updates = false; + bool verify_queue_position(torrent const* t, int pos) override; #endif void on_exception(std::exception const& e) override; @@ -822,6 +823,10 @@ namespace libtorrent tracker_manager m_tracker_manager; torrent_map m_torrents; + // all torrents that are downloading or queued, + // ordered by their queue position + aux::vector m_download_queue; + #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // this maps obfuscated hashes to torrents. It's only // used when encryption is enabled @@ -873,12 +878,6 @@ namespace libtorrent // the peer id that is generated at the start of the session peer_id m_peer_id; - // this is the highest queue position of any torrent - // in this session. queue positions are packed (i.e. there - // are no gaps). If there are no torrents with queue positions - // this is -1. - int m_max_queue_pos = -1; - // the key is an id that is used to identify the // client with the tracker only. It is randomized // at startup diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 04987b65b..63145a26c 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -317,6 +317,11 @@ namespace libtorrent { namespace aux virtual counters& stats_counters() = 0; virtual void received_buffer(int size) = 0; virtual void sent_buffer(int size) = 0; + +#if TORRENT_USE_ASSERTS + virtual bool verify_queue_position(torrent const*, int) = 0; +#endif + protected: ~session_interface() {} }; diff --git a/include/libtorrent/session_handle.hpp b/include/libtorrent/session_handle.hpp index 30c6e390f..cae8398c9 100644 --- a/include/libtorrent/session_handle.hpp +++ b/include/libtorrent/session_handle.hpp @@ -771,6 +771,13 @@ namespace libtorrent // there is no guarantee that adding the same torrent immediately after // it was removed will not throw a system_error exception. Once // the torrent is deleted, a torrent_deleted_alert is posted. + // + // Note that when a queued or downloading torrent is removed, its position + // in the download queue is vacated and avery subsequent torrent in the + // queue has their queue positions updated. This can potentially cause a + // large state_update to be posted. When removing all torrents, it is + // adviced to remove them from the back of the queue, to minimize the + // shifting. void remove_torrent(const torrent_handle& h, int options = 0); #ifndef TORRENT_NO_DEPRECATE diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 9c2fcbc37..5abc53a55 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -317,7 +317,7 @@ namespace libtorrent public: torrent(aux::session_interface& ses, int block_size - , int seq, bool session_paused, add_torrent_params const& p + , bool session_paused, add_torrent_params const& p , sha1_hash const& info_hash); ~torrent(); @@ -340,6 +340,7 @@ namespace libtorrent { TORRENT_ASSERT(m_added == true); m_added = false; + set_queue_position(-1); // make sure we decrement the gauge counter for this torrent update_gauge(); } @@ -450,7 +451,12 @@ namespace libtorrent void set_queue_position(int p); int queue_position() const { return m_sequence_number; } // used internally - void set_queue_position_impl(int p) { m_sequence_number = p; } + void set_queue_position_impl(int const p) + { + if (m_sequence_number == p) return; + m_sequence_number = p; + state_updated(); + } void second_tick(int tick_interval_ms); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 32a6567f7..dbdad3d58 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -2988,6 +2988,11 @@ namespace aux { if (pe.second->has_peer(p)) return true; return false; } + + bool session_impl::verify_queue_position(torrent const* t, int pos) + { + return m_download_queue.end_index() > pos && m_download_queue[pos] == t; + } #endif void session_impl::sent_bytes(int bytes_payload, int bytes_protocol) @@ -4315,76 +4320,61 @@ namespace aux { void session_impl::set_queue_position(torrent* me, int p) { - // TODO: Maybe the queue position should be maintained as a vector of - // torrent pointers. Maybe this logic could be simplified - if (p >= 0 && me->queue_position() == -1) + int const current_pos = me->queue_position(); + if (current_pos == p) return; + + if (p >= 0 && current_pos == -1) { - for (auto& i : m_torrents) + // we're inserting the torrent into the download queue + int const last = m_download_queue.end_index(); + if (p >= last) { - torrent* t = i.second.get(); - if (t->queue_position() >= p) - { - t->set_queue_position_impl(t->queue_position()+1); - t->state_updated(); - } - if (t->queue_position() >= p) t->set_queue_position_impl(t->queue_position()+1); + m_download_queue.push_back(me); + me->set_queue_position_impl(last); + return; + } + + m_download_queue.insert(m_download_queue.begin() + p, me); + for (int i = p; i < m_download_queue.end_index(); ++i) + { + m_download_queue[i]->set_queue_position_impl(i); } - ++m_max_queue_pos; - me->set_queue_position_impl((std::min)(m_max_queue_pos, p)); } else if (p < 0) { - TORRENT_ASSERT(me->queue_position() >= 0); + // we're removing the torrent from the download queue + TORRENT_ASSERT(current_pos >= 0); TORRENT_ASSERT(p == -1); - for (auto& i : m_torrents) + TORRENT_ASSERT(m_download_queue[current_pos] == me); + m_download_queue.erase(m_download_queue.begin() + current_pos); + me->set_queue_position_impl(-1); + for (int i = current_pos; i < m_download_queue.end_index(); ++i) { - torrent* t = i.second.get(); - if (t == me) continue; - if (t->queue_position() == -1) continue; - if (t->queue_position() >= me->queue_position()) - { - t->set_queue_position_impl(t->queue_position()-1); - t->state_updated(); - } + m_download_queue[i]->set_queue_position_impl(i); } - --m_max_queue_pos; - me->set_queue_position_impl(p); } - else if (p < me->queue_position()) + else if (p < current_pos) { - for (auto& i : m_torrents) + // we're moving the torrent up the queue + torrent* tmp = me; + for (int i = p; i <= current_pos; ++i) { - torrent* t = i.second.get(); - if (t == me) continue; - if (t->queue_position() == -1) continue; - if (t->queue_position() >= p - && t->queue_position() < me->queue_position()) - { - t->set_queue_position_impl(t->queue_position()+1); - t->state_updated(); - } + std::swap(m_download_queue[i], tmp); + m_download_queue[i]->set_queue_position_impl(i); } - me->set_queue_position_impl(p); + TORRENT_ASSERT(tmp == me); } - else if (p > me->queue_position()) + else if (p > current_pos) { - for (auto& i : m_torrents) + // we're moving the torrent down the queue + p = std::min(p, m_download_queue.end_index() - 1); + for (int i = current_pos; i < p; ++i) { - torrent* t = i.second.get(); - int pos = t->queue_position(); - if (t == me) continue; - if (pos == -1) continue; - - if (pos <= p - && pos > me->queue_position() - && pos != -1) - { - t->set_queue_position_impl(t->queue_position()-1); - t->state_updated(); - } - + m_download_queue[i] = m_download_queue[i + 1]; + m_download_queue[i]->set_queue_position_impl(i); } - me->set_queue_position_impl((std::min)(m_max_queue_pos, p)); + m_download_queue[p] = me; + me->set_queue_position_impl(p); } trigger_auto_manage(); @@ -4904,11 +4894,10 @@ namespace aux { return std::make_pair(ptr_t(), false); } - int queue_pos = ++m_max_queue_pos; - torrent_ptr = std::make_shared(*this - , 16 * 1024, queue_pos, m_paused + , 16 * 1024, m_paused , params, params.info_hash); + torrent_ptr->set_queue_position(m_download_queue.end_index()); return std::make_pair(torrent_ptr, true); } @@ -5027,7 +5016,6 @@ namespace aux { remove_torrent_impl(tptr, options); tptr->abort(); - tptr->set_queue_position(-1); } void session_impl::remove_torrent_impl(std::shared_ptr tptr @@ -5884,7 +5872,6 @@ namespace aux { { // this is not allowed to be the network thread! // TORRENT_ASSERT(is_not_thread()); - // TODO: asserts that no outstanding async operations are still in flight #if defined TORRENT_ASIO_DEBUGGING @@ -6794,6 +6781,13 @@ namespace aux { { TORRENT_ASSERT(i->m_links[l].in_list()); } + + int idx = 0; + for (auto t : m_download_queue) + { + TORRENT_ASSERT(t->queue_position() == idx); + ++idx; + } } int const num_gauges = counters::num_error_torrents - counters::num_checking_torrents + 1; diff --git a/src/torrent.cpp b/src/torrent.cpp index 7c79d12d8..ca7e3992f 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -163,7 +163,6 @@ namespace libtorrent torrent::torrent( aux::session_interface& ses , int const block_size - , int const seq , bool const session_paused , add_torrent_params const& p , sha1_hash const& info_hash) @@ -181,7 +180,7 @@ namespace libtorrent , m_storage_constructor(p.storage) , m_info_hash(info_hash) , m_error_file(torrent_status::error_file_none) - , m_sequence_number(seq) + , m_sequence_number(-1) , m_announce_to_trackers((p.flags & add_torrent_params::flag_paused) == 0) , m_announce_to_lsd((p.flags & add_torrent_params::flag_paused) == 0) , m_has_incoming(false) @@ -7520,6 +7519,9 @@ namespace libtorrent TORRENT_ASSERT(current_stats_state() == int(m_current_gauge_state + counters::num_checking_torrents) || m_current_gauge_state == no_gauge_state); + TORRENT_ASSERT(m_sequence_number == -1 + || m_ses.verify_queue_position(this, m_sequence_number)); + for (auto const& i : m_time_critical_pieces) { TORRENT_ASSERT(!is_seed()); @@ -7808,7 +7810,8 @@ namespace libtorrent TORRENT_ASSERT((p == -1) == is_finished() || (!m_auto_managed && p == -1) - || (m_abort && p == -1)); + || (m_abort && p == -1) + || (!m_added && p == -1)); if (p == m_sequence_number) return; TORRENT_ASSERT(p >= -1);