From b8829144613df7176e6b72212614629b56848bbd Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 18 Jun 2010 04:43:20 +0000 Subject: [PATCH] refactored socket reading code to not use async. operations unnecessarily --- ChangeLog | 2 + include/libtorrent/natpmp.hpp | 2 +- include/libtorrent/peer_connection.hpp | 5 + src/connection_queue.cpp | 5 +- src/natpmp.cpp | 20 +++- src/peer_connection.cpp | 150 ++++++++++++++----------- src/tracker_manager.cpp | 4 +- 7 files changed, 110 insertions(+), 78 deletions(-) diff --git a/ChangeLog b/ChangeLog index f3aca1c97..e851fc7c0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -33,6 +33,8 @@ incoming connection * added more detailed instrumentation of the disk I/O thread + * refactored socket reading code to not use async. operations unnecessarily + * some timer optimizations * removed the reuse-address flag on the listen socket * fixed bug where local peer discovery and DHT wouldn't be announced to without trackers * fixed bug in bdecoder when decoding invalid messages diff --git a/include/libtorrent/natpmp.hpp b/include/libtorrent/natpmp.hpp index 35775117f..c8f34f78b 100644 --- a/include/libtorrent/natpmp.hpp +++ b/include/libtorrent/natpmp.hpp @@ -79,7 +79,7 @@ private: void on_reply(error_code const& e , std::size_t bytes_transferred); void try_next_mapping(int i, mutex::scoped_lock& l); - void update_expiration_timer(); + void update_expiration_timer(mutex::scoped_lock& l); void mapping_expired(error_code const& e, int i); void close_impl(mutex::scoped_lock& l); diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 7b543b8bf..9481e3068 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -560,6 +560,9 @@ namespace libtorrent protected: + enum sync_t { read_async, read_sync }; + size_t try_read(sync_t s, error_code& ec); + virtual void get_specific_peer_info(peer_info& p) const = 0; virtual void write_choke() = 0; @@ -637,6 +640,8 @@ namespace libtorrent , std::size_t bytes_transferred); void on_receive_data(error_code const& error , std::size_t bytes_transferred); + void on_receive_data_nolock(error_code const& error + , std::size_t bytes_transferred); // this is the limit on the number of outstanding requests // we have to this peer. This is initialized to the settings diff --git a/src/connection_queue.cpp b/src/connection_queue.cpp index 6f4474b60..95f3a214a 100644 --- a/src/connection_queue.cpp +++ b/src/connection_queue.cpp @@ -165,6 +165,7 @@ namespace libtorrent i != m_queue.end(); ++i) { if (i->connecting) ++num_connecting; + else TORRENT_ASSERT(i->expires == max_time()); } TORRENT_ASSERT(num_connecting == m_num_connecting); } @@ -198,7 +199,7 @@ namespace libtorrent while (i != m_queue.end()) { TORRENT_ASSERT(i->connecting == false); - ptime expire = time_now() + i->timeout; + ptime expire = time_now_hires() + i->timeout; if (m_num_connecting == 0) { error_code ec; @@ -262,7 +263,7 @@ namespace libtorrent if (e) return; ptime next_expire = max_time(); - ptime now = time_now(); + ptime now = time_now_hires() + milliseconds(100); std::list timed_out; for (std::list::iterator i = m_queue.begin(); !m_queue.empty() && i != m_queue.end();) diff --git a/src/natpmp.cpp b/src/natpmp.cpp index 72fb48a80..de6a5f856 100644 --- a/src/natpmp.cpp +++ b/src/natpmp.cpp @@ -477,15 +477,15 @@ void natpmp::on_reply(error_code const& e m_currently_mapping = -1; m->action = mapping_t::action_none; m_send_timer.cancel(ec); - update_expiration_timer(); + update_expiration_timer(l); try_next_mapping(index, l); } -void natpmp::update_expiration_timer() +void natpmp::update_expiration_timer(mutex::scoped_lock& l) { if (m_abort) return; - ptime now = time_now(); + ptime now = time_now() + milliseconds(100); /* #if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING) m_log << time_now_string() << "update_expiration_timer " << std::endl; @@ -509,10 +509,20 @@ void natpmp::update_expiration_timer() { if (i->protocol == none || i->action != mapping_t::action_none) continue; - if (i->expires < min_expire) + int index = i - m_mappings.begin(); + if (i->expires < now) + { + char msg[200]; + snprintf(msg, sizeof(msg), "mapping %u expired", index); + log(msg, l); + i->action = mapping_t::action_add; + if (m_next_refresh == index) m_next_refresh = -1; + update_mapping(index, l); + } + else if (i->expires < min_expire) { min_expire = i->expires; - min_index = i - m_mappings.begin(); + min_index = index; } } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 85fa7143d..b1e70c6c7 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -3732,6 +3732,7 @@ namespace libtorrent } if (!m_download_queue.empty() + && m_quota[download_channel] > 0 && now > m_requested + seconds(m_ses.settings().request_timeout + m_timeout_extend)) { @@ -3816,6 +3817,7 @@ namespace libtorrent if (piece_timeout < rate_limit_timeout) piece_timeout = rate_limit_timeout; if (!m_download_queue.empty() + && m_quota[download_channel] > 0 && now - m_last_piece > seconds(piece_timeout + m_timeout_extend)) { // this peer isn't sending the pieces we've @@ -4367,6 +4369,22 @@ namespace libtorrent return; } + error_code ec; + size_t bytes_transferred = try_read(read_sync, ec); + + if (ec == asio::error::would_block) + { + try_read(read_async, ec); + } + else + { + m_channel_state[download_channel] = peer_info::bw_network; + on_receive_data_nolock(ec, bytes_transferred); + } + } + + size_t peer_connection::try_read(sync_t s, error_code& ec) + { TORRENT_ASSERT(m_packet_size > 0); int max_receive = m_packet_size - m_recv_pos; TORRENT_ASSERT(max_receive >= 0); @@ -4378,40 +4396,42 @@ namespace libtorrent if (max_receive > quota_left) max_receive = quota_left; - if (max_receive == 0) return; + if (max_receive == 0) + { + ec = asio::error::would_block; + return 0; + } TORRENT_ASSERT(m_recv_pos >= 0); TORRENT_ASSERT(m_packet_size > 0); - TORRENT_ASSERT(can_read()); -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n"; -#endif + + if (!can_read()) + { + ec = asio::error::would_block; + return 0; + } int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size; if (int(m_recv_buffer.size()) < regular_buffer_size) m_recv_buffer.resize(regular_buffer_size); + boost::array vec; + int num_bufs = 0; if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive) { // only receive into regular buffer TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size())); - m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos] - , max_receive) - , make_read_handler( - boost::bind(&peer_connection::on_receive_data, self(), _1, _2) - )); + vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos], max_receive); + num_bufs = 1; } else if (m_recv_pos >= regular_buffer_size) { // only receive into disk buffer TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0); TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size); - m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size - , max_receive) - , make_read_handler( - boost::bind(&peer_connection::on_receive_data, self(), _1, _2) - )); + vec[0] = asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size, max_receive); + num_bufs = 1; } else { @@ -4421,16 +4441,49 @@ namespace libtorrent TORRENT_ASSERT(max_receive - regular_buffer_size + m_recv_pos <= m_disk_recv_buffer_size); - boost::array vec; vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos] , regular_buffer_size - m_recv_pos); vec[1] = asio::buffer(m_disk_recv_buffer.get() , max_receive - regular_buffer_size + m_recv_pos); - m_socket->async_read_some( - vec, make_read_handler( - boost::bind(&peer_connection::on_receive_data, self(), _1, _2))); + num_bufs = 2; } - m_channel_state[download_channel] = peer_info::bw_network; + + if (s == read_async) + { + m_channel_state[download_channel] = peer_info::bw_network; +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n"; +#endif + + if (num_bufs == 1) + { + m_socket->async_read_some( + asio::mutable_buffers_1(vec[0]), make_read_handler( + boost::bind(&peer_connection::on_receive_data, self(), _1, _2))); + } + else + { + m_socket->async_read_some( + vec, make_read_handler( + boost::bind(&peer_connection::on_receive_data, self(), _1, _2))); + } + return 0; + } + + size_t ret = 0; + if (num_bufs == 1) + { + ret = m_socket->read_some(asio::mutable_buffers_1(vec[0]), ec); + } + else + { + ret = m_socket->read_some(vec, ec); + } + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** SYNC_READ [ max: " << max_receive << " ret: " << ret << " e: " << ec.message() << " ]\n"; +#endif + return ret; } #ifndef TORRENT_DISABLE_ENCRYPTION @@ -4573,6 +4626,12 @@ namespace libtorrent , std::size_t bytes_transferred) { mutex::scoped_lock l(m_ses.m_mutex); + on_receive_data_nolock(error, bytes_transferred); + } + + void peer_connection::on_receive_data_nolock(const error_code& error + , std::size_t bytes_transferred) + { INVARIANT_CHECK; @@ -4651,54 +4710,9 @@ namespace libtorrent } if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; - max_receive = m_packet_size - m_recv_pos; - TORRENT_ASSERT(max_receive >= 0); - if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos) - max_receive = m_soft_packet_size - m_recv_pos; - int quota_left = m_quota[download_channel]; - if (max_receive > quota_left) - max_receive = quota_left; - if (max_receive == 0) break; - - int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size; - - if (int(m_recv_buffer.size()) < regular_buffer_size) - m_recv_buffer.resize(regular_buffer_size); - - error_code ec; - if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive) - { - // only receive into regular buffer - TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size())); - bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos] - , max_receive), ec); - } - else if (m_recv_pos >= regular_buffer_size) - { - // only receive into disk buffer - TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0); - TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size); - bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get() - + m_recv_pos - regular_buffer_size, (std::min)(m_packet_size - - m_recv_pos, max_receive)), ec); - } - else - { - // receive into both regular and disk buffer - TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size); - TORRENT_ASSERT(m_recv_pos < regular_buffer_size); - TORRENT_ASSERT(max_receive - regular_buffer_size - + m_recv_pos <= m_disk_recv_buffer_size); - - boost::array vec; - vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos] - , regular_buffer_size - m_recv_pos); - vec[1] = asio::buffer(m_disk_recv_buffer.get() - , (std::min)(m_disk_recv_buffer_size - , max_receive - regular_buffer_size + m_recv_pos)); - bytes_transferred = m_socket->read_some(vec, ec); - } + error_code ec; + bytes_transferred = try_read(read_sync, ec); if (ec && ec != asio::error::would_block) { m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); @@ -4717,7 +4731,7 @@ namespace libtorrent } m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); - setup_receive(); + setup_receive(); } bool peer_connection::can_write() const diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 4ece3850d..33cf0eb93 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -105,9 +105,9 @@ namespace libtorrent time_duration completion_timeout = now - m_start_time; if (m_read_timeout - < total_seconds(receive_timeout) + <= total_seconds(receive_timeout) || m_completion_timeout - < total_seconds(completion_timeout)) + <= total_seconds(completion_timeout)) { on_timeout(); return;