diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 433b9ef3c..24e2377c7 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -142,7 +142,6 @@ nobase_include_HEADERS = \ torrent_status.hpp \ udp_socket.hpp \ udp_tracker_connection.hpp \ - uncork_interface.hpp \ union_endpoint.hpp \ upnp.hpp \ utp_socket_manager.hpp \ diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index e3b6bc004..d52aaf529 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/config.hpp" #include "libtorrent/aux_/session_settings.hpp" #include "libtorrent/aux_/session_interface.hpp" -#include "libtorrent/uncork_interface.hpp" #include "libtorrent/linked_list.hpp" #include "libtorrent/torrent_peer.hpp" #include "libtorrent/torrent_peer_allocator.hpp" @@ -192,7 +191,6 @@ namespace libtorrent , aux::portmap_callback , aux::lsd_callback , boost::noncopyable - , uncork_interface , single_threaded , aux::error_handler_interface { @@ -638,14 +636,6 @@ namespace libtorrent mutable std::mutex mut; mutable std::condition_variable cond; - // cork a peer and schedule a delayed uncork - // does nothing if the peer is already corked - void cork_burst(peer_connection* p) override; - - // uncork all peers added to the delayed uncork queue - // implements uncork_interface - virtual void do_delayed_uncork() override; - // implements session_interface virtual tcp::endpoint bind_outgoing_socket(socket_type& s, address const& remote_address, error_code& ec) const override; @@ -1210,14 +1200,6 @@ namespace libtorrent // is true if the session is paused bool m_paused = false; - - // this is a list of peer connections who have been - // corked (i.e. their network socket) and needs to be - // uncorked at the end of the burst of events. This is - // here to coalesce the effects of bursts of events - // into fewer network writes, saving CPU and possibly - // ending up sending larger network packets - std::vector m_delayed_uncorks; }; #ifndef TORRENT_DISABLE_LOGGING diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 241b169e9..752132b1d 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -191,10 +191,6 @@ namespace libtorrent { namespace aux virtual peer_id const& get_peer_id() const = 0; - // cork a peer and schedule a delayed uncork - // does nothing if the peer is already corked - virtual void cork_burst(peer_connection* p) = 0; - virtual void close_connection(peer_connection* p, error_code const& ec) = 0; virtual int num_connections() const = 0; diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index a1c76fbb2..1e83518ed 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -279,7 +279,6 @@ namespace libtorrent { disk_io_thread(io_service& ios , counters& cnt - , void* userdata , int block_size = 16 * 1024); ~disk_io_thread(); @@ -419,7 +418,7 @@ namespace libtorrent int do_tick(disk_io_job* j, jobqueue_t& completed_jobs); int do_resolve_links(disk_io_job* j, jobqueue_t& completed_jobs); - void call_job_handlers(void* userdata); + void call_job_handlers(); private: diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 0e3976584..a34dff3b4 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -63,6 +63,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/debug.hpp" #include "libtorrent/span.hpp" #include "libtorrent/piece_block.hpp" +#include "libtorrent/peer_info.hpp" #include #include @@ -76,7 +77,6 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { class torrent; - struct peer_info; struct disk_io_job; struct disk_interface; struct torrent_peer; @@ -166,7 +166,6 @@ namespace libtorrent , m_snubbed(false) , m_interesting(false) , m_choked(true) - , m_corked(false) , m_ignore_stats(false) {} @@ -230,13 +229,6 @@ namespace libtorrent // we have choked the upload to the peer bool m_choked:1; - // when this is set, the peer_connection socket is - // corked, similar to the linux TCP feature TCP_CORK. - // we won't send anything to the actual socket, just - // buffer messages up in the application layer send - // buffer, and send it once we're uncorked. - bool m_corked:1; - // when this is set, the transfer stats for this connection // is not included in the torrent or session stats bool m_ignore_stats:1; @@ -263,6 +255,7 @@ namespace libtorrent { friend class invariant_access; friend class torrent; + friend struct cork; public: void on_exception(std::exception const& e) override; @@ -635,10 +628,6 @@ namespace libtorrent void send_buffer(char const* begin, int size, int flags = 0); void setup_send(); - void cork_socket() { TORRENT_ASSERT(!m_corked); m_corked = true; } - bool is_corked() const { return m_corked; } - void uncork_socket(); - void append_send_buffer(char* buffer, int size , chained_buffer::free_buffer_fun destructor = &nop , void* userdata = nullptr, block_cache_reference ref @@ -1218,16 +1207,27 @@ namespace libtorrent { explicit cork(peer_connection& p): m_pc(p) { - if (m_pc.is_corked()) return; - m_pc.cork_socket(); + if (m_pc.m_channel_state[peer_connection::upload_channel] & peer_info::bw_network) + return; + + // pretend that there's an outstanding send operation already, to + // prevent future calls to setup_send() from actually causing an + // asyc_send() to be issued. + m_pc.m_channel_state[peer_connection::upload_channel] |= peer_info::bw_network; m_need_uncork = true; } - ~cork() { if (m_need_uncork) m_pc.uncork_socket(); } + cork(cork const&) = delete; + cork& operator=(cork const&) = delete; + + ~cork() + { + if (!m_need_uncork) return; + m_pc.m_channel_state[peer_connection::upload_channel] &= ~peer_info::bw_network; + m_pc.setup_send(); + } private: peer_connection& m_pc; bool m_need_uncork = false; - - cork& operator=(cork const&); }; } diff --git a/include/libtorrent/uncork_interface.hpp b/include/libtorrent/uncork_interface.hpp deleted file mode 100644 index 4ab5dd569..000000000 --- a/include/libtorrent/uncork_interface.hpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - -Copyright (c) 2012-2016, Arvid Norberg -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - * Neither the name of the author nor the names of its - contributors may be used to endorse or promote products derived - from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -*/ - -#ifndef TORRENT_UNCORK_INTERFACE_HPP -#define TORRENT_UNCORK_INTERFACE_HPP - -#include "libtorrent/export.hpp" - -namespace libtorrent -{ - // the uncork interface is used by the disk_io_thread - // to indicate that it has called all the disk job handlers - // in the current batch. The intention is for the peer - // connections to be able to not issue any sends on their - // sockets until they have received all the disk jobs - // that are ready first. This makes the networking more - // efficient since it can send larger buffers down to the - // kernel per system call. - // uncorking refers to releasing the "cork" in the peers - // preventing them to issue sends - struct TORRENT_EXTRA_EXPORT uncork_interface - { - virtual void do_delayed_uncork() = 0; - protected: - ~uncork_interface() {} - }; -} - -#endif diff --git a/src/create_torrent.cpp b/src/create_torrent.cpp index 0e306bd47..0018f0ae5 100644 --- a/src/create_torrent.cpp +++ b/src/create_torrent.cpp @@ -265,7 +265,7 @@ namespace libtorrent // dummy torrent object pointer std::shared_ptr dummy; counters cnt; - disk_io_thread disk_thread(ios, cnt, nullptr); + disk_io_thread disk_thread(ios, cnt); disk_thread.set_num_threads(1); storage_params params; diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index df17300b0..50f796f7f 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -46,7 +46,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/disk_buffer_pool.hpp" #include "libtorrent/disk_io_job.hpp" #include "libtorrent/alert_types.hpp" -#include "libtorrent/uncork_interface.hpp" #include "libtorrent/performance_counters.hpp" #include "libtorrent/alert_manager.hpp" #include "libtorrent/debug.hpp" @@ -179,13 +178,11 @@ namespace libtorrent disk_io_thread::disk_io_thread(io_service& ios , counters& cnt - , void* userdata , int const block_size) : m_generic_io_jobs(*this, generic_thread) , m_generic_threads(m_generic_io_jobs, ios) , m_hash_io_jobs(*this, hasher_thread) , m_hash_threads(m_hash_io_jobs, ios) - , m_userdata(userdata) , m_last_file_check(clock_type::now()) , m_disk_cache(block_size, ios, std::bind(&disk_io_thread::trigger_cache_trim, this)) , m_stats_counters(cnt) @@ -3518,16 +3515,13 @@ namespace libtorrent // we take this lock just to make the logging prettier (non-interleaved) DLOG("posting job handlers (%d)\n", m_completed_jobs.size()); - m_ios.post(std::bind(&disk_io_thread::call_job_handlers, this, m_userdata)); + m_ios.post(std::bind(&disk_io_thread::call_job_handlers, this)); m_job_completions_in_flight = true; } } // This is run in the network thread - // TODO: 2 it would be nice to get rid of m_userdata and just have a function - // object to pass all the job completions to. It could in turn be responsible - // for posting them to the correct io_service - void disk_io_thread::call_job_handlers(void* userdata) + void disk_io_thread::call_job_handlers() { std::unique_lock l(m_completed_jobs_mutex); @@ -3539,7 +3533,6 @@ namespace libtorrent disk_io_job* j = m_completed_jobs.get_all(); l.unlock(); - uncork_interface* uncork = static_cast(userdata); std::array to_delete; int cnt = 0; @@ -3564,10 +3557,6 @@ namespace libtorrent } if (cnt > 0) free_jobs(to_delete.data(), cnt); - - // uncork all peers who received a disk event. This is - // to coalesce all the socket writes caused by the events. - if (uncork) uncork->do_delayed_uncork(); } #if TORRENT_USE_INVARIANT_CHECKS diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 4e485e822..f26381ea1 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -2986,10 +2986,6 @@ namespace libtorrent m_channel_state[download_channel] &= ~peer_info::bw_disk; } - // flush send buffer at the end of - // this burst of disk events -// m_ses.cork_burst(this); - INVARIANT_CHECK; if (!t) @@ -5258,10 +5254,6 @@ namespace libtorrent if (m_disconnecting) return; - // flush send buffer at the end of - // this burst of disk events -// m_ses.cork_burst(this); - if (!t) { disconnect(j->error.ec, op_file_read); @@ -5423,14 +5415,6 @@ namespace libtorrent return ret; } - void peer_connection::uncork_socket() - { - TORRENT_ASSERT(is_single_thread()); - if (!m_corked) return; - m_corked = false; - setup_send(); - } - void peer_connection::setup_send() { TORRENT_ASSERT(is_single_thread()); @@ -5439,7 +5423,16 @@ namespace libtorrent // we may want to request more quota at this point request_bandwidth(upload_channel); - if (m_channel_state[upload_channel] & peer_info::bw_network) return; + // if we already have an outstanding send operation, don't issue another + // one, instead accrue more send buffer to coalesce for the next write + if (m_channel_state[upload_channel] & peer_info::bw_network) + { +#ifndef TORRENT_DISABLE_LOGGING + peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d" + , int(m_send_buffer.size())); +#endif + return; + } if (m_send_barrier == 0) { @@ -5470,7 +5463,6 @@ namespace libtorrent } int const quota_left = m_quota[upload_channel]; - if (m_send_buffer.empty() && m_reading_bytes > 0 && quota_left > 0) @@ -5542,24 +5534,13 @@ namespace libtorrent return; } - // send the actual buffer - int amount_to_send = m_send_buffer.size(); - if (amount_to_send > quota_left) - amount_to_send = quota_left; - if (amount_to_send > m_send_barrier) - amount_to_send = m_send_barrier; + int const amount_to_send = std::min({ + int(m_send_buffer.size()) + , quota_left + , m_send_barrier}); TORRENT_ASSERT(amount_to_send > 0); - if (m_corked) - { -#ifndef TORRENT_DISABLE_LOGGING - peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d" - , amount_to_send); -#endif - return; - } - TORRENT_ASSERT((m_channel_state[upload_channel] & peer_info::bw_network) == 0); #ifndef TORRENT_DISABLE_LOGGING peer_log(peer_log_alert::outgoing, "ASYNC_WRITE", "bytes: %d", amount_to_send); @@ -5816,11 +5797,11 @@ namespace libtorrent // to keep the object alive through the exit check std::shared_ptr me(self()); + TORRENT_ASSERT(bytes_transferred > 0); + // flush the send buffer at the end of this function cork _c(*this); - TORRENT_ASSERT(bytes_transferred > 0); - // if we received exactly as many bytes as we provided a receive buffer // for. There most likely are more bytes to read, and we should grow our // receive buffer. @@ -6230,8 +6211,8 @@ namespace libtorrent m_last_sent = now; #if TORRENT_USE_ASSERTS - std::int64_t cur_payload_ul = m_statistics.last_payload_uploaded(); - std::int64_t cur_protocol_ul = m_statistics.last_protocol_uploaded(); + std::int64_t const cur_payload_ul = m_statistics.last_payload_uploaded(); + std::int64_t const cur_protocol_ul = m_statistics.last_protocol_uploaded(); #endif on_sent(error, bytes_transferred); #if TORRENT_USE_ASSERTS diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 1eda77224..3e9ba01d2 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -398,8 +398,7 @@ namespace aux { , m_ssl_ctx(m_io_service, boost::asio::ssl::context::sslv23) #endif , m_alerts(m_settings.get_int(settings_pack::alert_queue_size), alert::all_categories) - , m_disk_thread(m_io_service, m_stats_counters - , static_cast(this)) + , m_disk_thread(m_io_service, m_stats_counters) , m_download_rate(peer_connection::download_channel) , m_upload_rate(peer_connection::upload_channel) , m_tracker_manager( @@ -4141,23 +4140,6 @@ namespace aux { } } - void session_impl::cork_burst(peer_connection* p) - { - TORRENT_ASSERT(is_single_thread()); - if (p->is_corked()) return; - p->cork_socket(); - m_delayed_uncorks.push_back(p); - } - - void session_impl::do_delayed_uncork() - { - m_stats_counters.inc_stats_counter(counters::on_disk_counter); - TORRENT_ASSERT(is_single_thread()); - for (peer_connection* p : m_delayed_uncorks) - p->uncork_socket(); - m_delayed_uncorks.clear(); - } - std::shared_ptr session_impl::delay_load_torrent(sha1_hash const& info_hash , peer_connection* pc) { diff --git a/test/test_storage.cpp b/test/test_storage.cpp index 970dc78d4..2f5448417 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -450,7 +450,7 @@ void test_check_files(std::string const& test_path file_pool fp; boost::asio::io_service ios; counters cnt; - disk_io_thread io(ios, cnt, nullptr); + disk_io_thread io(ios, cnt); io.set_num_threads(1); disk_buffer_pool dp(16 * 1024, ios, std::bind(&nop)); storage_params p;