simplify interaction with disk_io_thread by removing the uncork interface (#1271)

simplify the interaction with the disk_io_thread by removing the uncork interface. This should be turned into a transparent part of peer_connection instead and remove cork logic from peer_connection
This commit is contained in:
Arvid Norberg 2016-10-30 18:21:07 -04:00 committed by GitHub
parent d1e916ec9e
commit 8daa200d11
11 changed files with 42 additions and 172 deletions

View File

@ -142,7 +142,6 @@ nobase_include_HEADERS = \
torrent_status.hpp \
udp_socket.hpp \
udp_tracker_connection.hpp \
uncork_interface.hpp \
union_endpoint.hpp \
upnp.hpp \
utp_socket_manager.hpp \

View File

@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/config.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/aux_/session_interface.hpp"
#include "libtorrent/uncork_interface.hpp"
#include "libtorrent/linked_list.hpp"
#include "libtorrent/torrent_peer.hpp"
#include "libtorrent/torrent_peer_allocator.hpp"
@ -192,7 +191,6 @@ namespace libtorrent
, aux::portmap_callback
, aux::lsd_callback
, boost::noncopyable
, uncork_interface
, single_threaded
, aux::error_handler_interface
{
@ -638,14 +636,6 @@ namespace libtorrent
mutable std::mutex mut;
mutable std::condition_variable cond;
// cork a peer and schedule a delayed uncork
// does nothing if the peer is already corked
void cork_burst(peer_connection* p) override;
// uncork all peers added to the delayed uncork queue
// implements uncork_interface
virtual void do_delayed_uncork() override;
// implements session_interface
virtual tcp::endpoint bind_outgoing_socket(socket_type& s, address
const& remote_address, error_code& ec) const override;
@ -1210,14 +1200,6 @@ namespace libtorrent
// is true if the session is paused
bool m_paused = false;
// this is a list of peer connections who have been
// corked (i.e. their network socket) and needs to be
// uncorked at the end of the burst of events. This is
// here to coalesce the effects of bursts of events
// into fewer network writes, saving CPU and possibly
// ending up sending larger network packets
std::vector<peer_connection*> m_delayed_uncorks;
};
#ifndef TORRENT_DISABLE_LOGGING

View File

@ -191,10 +191,6 @@ namespace libtorrent { namespace aux
virtual peer_id const& get_peer_id() const = 0;
// cork a peer and schedule a delayed uncork
// does nothing if the peer is already corked
virtual void cork_burst(peer_connection* p) = 0;
virtual void close_connection(peer_connection* p, error_code const& ec) = 0;
virtual int num_connections() const = 0;

View File

@ -279,7 +279,6 @@ namespace libtorrent
{
disk_io_thread(io_service& ios
, counters& cnt
, void* userdata
, int block_size = 16 * 1024);
~disk_io_thread();
@ -419,7 +418,7 @@ namespace libtorrent
int do_tick(disk_io_job* j, jobqueue_t& completed_jobs);
int do_resolve_links(disk_io_job* j, jobqueue_t& completed_jobs);
void call_job_handlers(void* userdata);
void call_job_handlers();
private:

View File

@ -63,6 +63,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/debug.hpp"
#include "libtorrent/span.hpp"
#include "libtorrent/piece_block.hpp"
#include "libtorrent/peer_info.hpp"
#include <ctime>
#include <algorithm>
@ -76,7 +77,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
class torrent;
struct peer_info;
struct disk_io_job;
struct disk_interface;
struct torrent_peer;
@ -166,7 +166,6 @@ namespace libtorrent
, m_snubbed(false)
, m_interesting(false)
, m_choked(true)
, m_corked(false)
, m_ignore_stats(false)
{}
@ -230,13 +229,6 @@ namespace libtorrent
// we have choked the upload to the peer
bool m_choked:1;
// when this is set, the peer_connection socket is
// corked, similar to the linux TCP feature TCP_CORK.
// we won't send anything to the actual socket, just
// buffer messages up in the application layer send
// buffer, and send it once we're uncorked.
bool m_corked:1;
// when this is set, the transfer stats for this connection
// is not included in the torrent or session stats
bool m_ignore_stats:1;
@ -263,6 +255,7 @@ namespace libtorrent
{
friend class invariant_access;
friend class torrent;
friend struct cork;
public:
void on_exception(std::exception const& e) override;
@ -635,10 +628,6 @@ namespace libtorrent
void send_buffer(char const* begin, int size, int flags = 0);
void setup_send();
void cork_socket() { TORRENT_ASSERT(!m_corked); m_corked = true; }
bool is_corked() const { return m_corked; }
void uncork_socket();
void append_send_buffer(char* buffer, int size
, chained_buffer::free_buffer_fun destructor = &nop
, void* userdata = nullptr, block_cache_reference ref
@ -1218,16 +1207,27 @@ namespace libtorrent
{
explicit cork(peer_connection& p): m_pc(p)
{
if (m_pc.is_corked()) return;
m_pc.cork_socket();
if (m_pc.m_channel_state[peer_connection::upload_channel] & peer_info::bw_network)
return;
// pretend that there's an outstanding send operation already, to
// prevent future calls to setup_send() from actually causing an
// asyc_send() to be issued.
m_pc.m_channel_state[peer_connection::upload_channel] |= peer_info::bw_network;
m_need_uncork = true;
}
~cork() { if (m_need_uncork) m_pc.uncork_socket(); }
cork(cork const&) = delete;
cork& operator=(cork const&) = delete;
~cork()
{
if (!m_need_uncork) return;
m_pc.m_channel_state[peer_connection::upload_channel] &= ~peer_info::bw_network;
m_pc.setup_send();
}
private:
peer_connection& m_pc;
bool m_need_uncork = false;
cork& operator=(cork const&);
};
}

View File

@ -1,58 +0,0 @@
/*
Copyright (c) 2012-2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_UNCORK_INTERFACE_HPP
#define TORRENT_UNCORK_INTERFACE_HPP
#include "libtorrent/export.hpp"
namespace libtorrent
{
// the uncork interface is used by the disk_io_thread
// to indicate that it has called all the disk job handlers
// in the current batch. The intention is for the peer
// connections to be able to not issue any sends on their
// sockets until they have received all the disk jobs
// that are ready first. This makes the networking more
// efficient since it can send larger buffers down to the
// kernel per system call.
// uncorking refers to releasing the "cork" in the peers
// preventing them to issue sends
struct TORRENT_EXTRA_EXPORT uncork_interface
{
virtual void do_delayed_uncork() = 0;
protected:
~uncork_interface() {}
};
}
#endif

View File

@ -265,7 +265,7 @@ namespace libtorrent
// dummy torrent object pointer
std::shared_ptr<char> dummy;
counters cnt;
disk_io_thread disk_thread(ios, cnt, nullptr);
disk_io_thread disk_thread(ios, cnt);
disk_thread.set_num_threads(1);
storage_params params;

View File

@ -46,7 +46,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/disk_buffer_pool.hpp"
#include "libtorrent/disk_io_job.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/uncork_interface.hpp"
#include "libtorrent/performance_counters.hpp"
#include "libtorrent/alert_manager.hpp"
#include "libtorrent/debug.hpp"
@ -179,13 +178,11 @@ namespace libtorrent
disk_io_thread::disk_io_thread(io_service& ios
, counters& cnt
, void* userdata
, int const block_size)
: m_generic_io_jobs(*this, generic_thread)
, m_generic_threads(m_generic_io_jobs, ios)
, m_hash_io_jobs(*this, hasher_thread)
, m_hash_threads(m_hash_io_jobs, ios)
, m_userdata(userdata)
, m_last_file_check(clock_type::now())
, m_disk_cache(block_size, ios, std::bind(&disk_io_thread::trigger_cache_trim, this))
, m_stats_counters(cnt)
@ -3518,16 +3515,13 @@ namespace libtorrent
// we take this lock just to make the logging prettier (non-interleaved)
DLOG("posting job handlers (%d)\n", m_completed_jobs.size());
m_ios.post(std::bind(&disk_io_thread::call_job_handlers, this, m_userdata));
m_ios.post(std::bind(&disk_io_thread::call_job_handlers, this));
m_job_completions_in_flight = true;
}
}
// This is run in the network thread
// TODO: 2 it would be nice to get rid of m_userdata and just have a function
// object to pass all the job completions to. It could in turn be responsible
// for posting them to the correct io_service
void disk_io_thread::call_job_handlers(void* userdata)
void disk_io_thread::call_job_handlers()
{
std::unique_lock<std::mutex> l(m_completed_jobs_mutex);
@ -3539,7 +3533,6 @@ namespace libtorrent
disk_io_job* j = m_completed_jobs.get_all();
l.unlock();
uncork_interface* uncork = static_cast<uncork_interface*>(userdata);
std::array<disk_io_job*, 64> to_delete;
int cnt = 0;
@ -3564,10 +3557,6 @@ namespace libtorrent
}
if (cnt > 0) free_jobs(to_delete.data(), cnt);
// uncork all peers who received a disk event. This is
// to coalesce all the socket writes caused by the events.
if (uncork) uncork->do_delayed_uncork();
}
#if TORRENT_USE_INVARIANT_CHECKS

View File

@ -2986,10 +2986,6 @@ namespace libtorrent
m_channel_state[download_channel] &= ~peer_info::bw_disk;
}
// flush send buffer at the end of
// this burst of disk events
// m_ses.cork_burst(this);
INVARIANT_CHECK;
if (!t)
@ -5258,10 +5254,6 @@ namespace libtorrent
if (m_disconnecting) return;
// flush send buffer at the end of
// this burst of disk events
// m_ses.cork_burst(this);
if (!t)
{
disconnect(j->error.ec, op_file_read);
@ -5423,14 +5415,6 @@ namespace libtorrent
return ret;
}
void peer_connection::uncork_socket()
{
TORRENT_ASSERT(is_single_thread());
if (!m_corked) return;
m_corked = false;
setup_send();
}
void peer_connection::setup_send()
{
TORRENT_ASSERT(is_single_thread());
@ -5439,7 +5423,16 @@ namespace libtorrent
// we may want to request more quota at this point
request_bandwidth(upload_channel);
if (m_channel_state[upload_channel] & peer_info::bw_network) return;
// if we already have an outstanding send operation, don't issue another
// one, instead accrue more send buffer to coalesce for the next write
if (m_channel_state[upload_channel] & peer_info::bw_network)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d"
, int(m_send_buffer.size()));
#endif
return;
}
if (m_send_barrier == 0)
{
@ -5470,7 +5463,6 @@ namespace libtorrent
}
int const quota_left = m_quota[upload_channel];
if (m_send_buffer.empty()
&& m_reading_bytes > 0
&& quota_left > 0)
@ -5542,24 +5534,13 @@ namespace libtorrent
return;
}
// send the actual buffer
int amount_to_send = m_send_buffer.size();
if (amount_to_send > quota_left)
amount_to_send = quota_left;
if (amount_to_send > m_send_barrier)
amount_to_send = m_send_barrier;
int const amount_to_send = std::min({
int(m_send_buffer.size())
, quota_left
, m_send_barrier});
TORRENT_ASSERT(amount_to_send > 0);
if (m_corked)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d"
, amount_to_send);
#endif
return;
}
TORRENT_ASSERT((m_channel_state[upload_channel] & peer_info::bw_network) == 0);
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::outgoing, "ASYNC_WRITE", "bytes: %d", amount_to_send);
@ -5816,11 +5797,11 @@ namespace libtorrent
// to keep the object alive through the exit check
std::shared_ptr<peer_connection> me(self());
TORRENT_ASSERT(bytes_transferred > 0);
// flush the send buffer at the end of this function
cork _c(*this);
TORRENT_ASSERT(bytes_transferred > 0);
// if we received exactly as many bytes as we provided a receive buffer
// for. There most likely are more bytes to read, and we should grow our
// receive buffer.
@ -6230,8 +6211,8 @@ namespace libtorrent
m_last_sent = now;
#if TORRENT_USE_ASSERTS
std::int64_t cur_payload_ul = m_statistics.last_payload_uploaded();
std::int64_t cur_protocol_ul = m_statistics.last_protocol_uploaded();
std::int64_t const cur_payload_ul = m_statistics.last_payload_uploaded();
std::int64_t const cur_protocol_ul = m_statistics.last_protocol_uploaded();
#endif
on_sent(error, bytes_transferred);
#if TORRENT_USE_ASSERTS

View File

@ -398,8 +398,7 @@ namespace aux {
, m_ssl_ctx(m_io_service, boost::asio::ssl::context::sslv23)
#endif
, m_alerts(m_settings.get_int(settings_pack::alert_queue_size), alert::all_categories)
, m_disk_thread(m_io_service, m_stats_counters
, static_cast<uncork_interface*>(this))
, m_disk_thread(m_io_service, m_stats_counters)
, m_download_rate(peer_connection::download_channel)
, m_upload_rate(peer_connection::upload_channel)
, m_tracker_manager(
@ -4141,23 +4140,6 @@ namespace aux {
}
}
void session_impl::cork_burst(peer_connection* p)
{
TORRENT_ASSERT(is_single_thread());
if (p->is_corked()) return;
p->cork_socket();
m_delayed_uncorks.push_back(p);
}
void session_impl::do_delayed_uncork()
{
m_stats_counters.inc_stats_counter(counters::on_disk_counter);
TORRENT_ASSERT(is_single_thread());
for (peer_connection* p : m_delayed_uncorks)
p->uncork_socket();
m_delayed_uncorks.clear();
}
std::shared_ptr<torrent> session_impl::delay_load_torrent(sha1_hash const& info_hash
, peer_connection* pc)
{

View File

@ -450,7 +450,7 @@ void test_check_files(std::string const& test_path
file_pool fp;
boost::asio::io_service ios;
counters cnt;
disk_io_thread io(ios, cnt, nullptr);
disk_io_thread io(ios, cnt);
io.set_num_threads(1);
disk_buffer_pool dp(16 * 1024, ios, std::bind(&nop));
storage_params p;