refactored socket reading code to not use async. operations unnecessarily

This commit is contained in:
Arvid Norberg 2010-06-18 04:43:20 +00:00
parent 26c4a819c8
commit b882914461
7 changed files with 110 additions and 78 deletions

View File

@ -33,6 +33,8 @@
incoming connection incoming connection
* added more detailed instrumentation of the disk I/O thread * added more detailed instrumentation of the disk I/O thread
* refactored socket reading code to not use async. operations unnecessarily
* some timer optimizations
* removed the reuse-address flag on the listen socket * removed the reuse-address flag on the listen socket
* fixed bug where local peer discovery and DHT wouldn't be announced to without trackers * fixed bug where local peer discovery and DHT wouldn't be announced to without trackers
* fixed bug in bdecoder when decoding invalid messages * fixed bug in bdecoder when decoding invalid messages

View File

@ -79,7 +79,7 @@ private:
void on_reply(error_code const& e void on_reply(error_code const& e
, std::size_t bytes_transferred); , std::size_t bytes_transferred);
void try_next_mapping(int i, mutex::scoped_lock& l); void try_next_mapping(int i, mutex::scoped_lock& l);
void update_expiration_timer(); void update_expiration_timer(mutex::scoped_lock& l);
void mapping_expired(error_code const& e, int i); void mapping_expired(error_code const& e, int i);
void close_impl(mutex::scoped_lock& l); void close_impl(mutex::scoped_lock& l);

View File

@ -560,6 +560,9 @@ namespace libtorrent
protected: protected:
enum sync_t { read_async, read_sync };
size_t try_read(sync_t s, error_code& ec);
virtual void get_specific_peer_info(peer_info& p) const = 0; virtual void get_specific_peer_info(peer_info& p) const = 0;
virtual void write_choke() = 0; virtual void write_choke() = 0;
@ -637,6 +640,8 @@ namespace libtorrent
, std::size_t bytes_transferred); , std::size_t bytes_transferred);
void on_receive_data(error_code const& error void on_receive_data(error_code const& error
, std::size_t bytes_transferred); , std::size_t bytes_transferred);
void on_receive_data_nolock(error_code const& error
, std::size_t bytes_transferred);
// this is the limit on the number of outstanding requests // this is the limit on the number of outstanding requests
// we have to this peer. This is initialized to the settings // we have to this peer. This is initialized to the settings

View File

@ -165,6 +165,7 @@ namespace libtorrent
i != m_queue.end(); ++i) i != m_queue.end(); ++i)
{ {
if (i->connecting) ++num_connecting; if (i->connecting) ++num_connecting;
else TORRENT_ASSERT(i->expires == max_time());
} }
TORRENT_ASSERT(num_connecting == m_num_connecting); TORRENT_ASSERT(num_connecting == m_num_connecting);
} }
@ -198,7 +199,7 @@ namespace libtorrent
while (i != m_queue.end()) while (i != m_queue.end())
{ {
TORRENT_ASSERT(i->connecting == false); TORRENT_ASSERT(i->connecting == false);
ptime expire = time_now() + i->timeout; ptime expire = time_now_hires() + i->timeout;
if (m_num_connecting == 0) if (m_num_connecting == 0)
{ {
error_code ec; error_code ec;
@ -262,7 +263,7 @@ namespace libtorrent
if (e) return; if (e) return;
ptime next_expire = max_time(); ptime next_expire = max_time();
ptime now = time_now(); ptime now = time_now_hires() + milliseconds(100);
std::list<entry> timed_out; std::list<entry> timed_out;
for (std::list<entry>::iterator i = m_queue.begin(); for (std::list<entry>::iterator i = m_queue.begin();
!m_queue.empty() && i != m_queue.end();) !m_queue.empty() && i != m_queue.end();)

View File

@ -477,15 +477,15 @@ void natpmp::on_reply(error_code const& e
m_currently_mapping = -1; m_currently_mapping = -1;
m->action = mapping_t::action_none; m->action = mapping_t::action_none;
m_send_timer.cancel(ec); m_send_timer.cancel(ec);
update_expiration_timer(); update_expiration_timer(l);
try_next_mapping(index, l); try_next_mapping(index, l);
} }
void natpmp::update_expiration_timer() void natpmp::update_expiration_timer(mutex::scoped_lock& l)
{ {
if (m_abort) return; if (m_abort) return;
ptime now = time_now(); ptime now = time_now() + milliseconds(100);
/* /*
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING) #if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string() << "update_expiration_timer " << std::endl; m_log << time_now_string() << "update_expiration_timer " << std::endl;
@ -509,10 +509,20 @@ void natpmp::update_expiration_timer()
{ {
if (i->protocol == none if (i->protocol == none
|| i->action != mapping_t::action_none) continue; || i->action != mapping_t::action_none) continue;
if (i->expires < min_expire) int index = i - m_mappings.begin();
if (i->expires < now)
{
char msg[200];
snprintf(msg, sizeof(msg), "mapping %u expired", index);
log(msg, l);
i->action = mapping_t::action_add;
if (m_next_refresh == index) m_next_refresh = -1;
update_mapping(index, l);
}
else if (i->expires < min_expire)
{ {
min_expire = i->expires; min_expire = i->expires;
min_index = i - m_mappings.begin(); min_index = index;
} }
} }

View File

@ -3732,6 +3732,7 @@ namespace libtorrent
} }
if (!m_download_queue.empty() if (!m_download_queue.empty()
&& m_quota[download_channel] > 0
&& now > m_requested + seconds(m_ses.settings().request_timeout && now > m_requested + seconds(m_ses.settings().request_timeout
+ m_timeout_extend)) + m_timeout_extend))
{ {
@ -3816,6 +3817,7 @@ namespace libtorrent
if (piece_timeout < rate_limit_timeout) piece_timeout = rate_limit_timeout; if (piece_timeout < rate_limit_timeout) piece_timeout = rate_limit_timeout;
if (!m_download_queue.empty() if (!m_download_queue.empty()
&& m_quota[download_channel] > 0
&& now - m_last_piece > seconds(piece_timeout + m_timeout_extend)) && now - m_last_piece > seconds(piece_timeout + m_timeout_extend))
{ {
// this peer isn't sending the pieces we've // this peer isn't sending the pieces we've
@ -4367,6 +4369,22 @@ namespace libtorrent
return; return;
} }
error_code ec;
size_t bytes_transferred = try_read(read_sync, ec);
if (ec == asio::error::would_block)
{
try_read(read_async, ec);
}
else
{
m_channel_state[download_channel] = peer_info::bw_network;
on_receive_data_nolock(ec, bytes_transferred);
}
}
size_t peer_connection::try_read(sync_t s, error_code& ec)
{
TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(m_packet_size > 0);
int max_receive = m_packet_size - m_recv_pos; int max_receive = m_packet_size - m_recv_pos;
TORRENT_ASSERT(max_receive >= 0); TORRENT_ASSERT(max_receive >= 0);
@ -4378,40 +4396,42 @@ namespace libtorrent
if (max_receive > quota_left) if (max_receive > quota_left)
max_receive = quota_left; max_receive = quota_left;
if (max_receive == 0) return; if (max_receive == 0)
{
ec = asio::error::would_block;
return 0;
}
TORRENT_ASSERT(m_recv_pos >= 0); TORRENT_ASSERT(m_recv_pos >= 0);
TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(m_packet_size > 0);
TORRENT_ASSERT(can_read());
#ifdef TORRENT_VERBOSE_LOGGING if (!can_read())
(*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n"; {
#endif ec = asio::error::would_block;
return 0;
}
int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size; int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
if (int(m_recv_buffer.size()) < regular_buffer_size) if (int(m_recv_buffer.size()) < regular_buffer_size)
m_recv_buffer.resize(regular_buffer_size); m_recv_buffer.resize(regular_buffer_size);
boost::array<asio::mutable_buffer, 2> vec;
int num_bufs = 0;
if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive) if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
{ {
// only receive into regular buffer // only receive into regular buffer
TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size())); TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos] vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos], max_receive);
, max_receive) num_bufs = 1;
, make_read_handler(
boost::bind(&peer_connection::on_receive_data, self(), _1, _2)
));
} }
else if (m_recv_pos >= regular_buffer_size) else if (m_recv_pos >= regular_buffer_size)
{ {
// only receive into disk buffer // only receive into disk buffer
TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0); TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size); TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size vec[0] = asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size, max_receive);
, max_receive) num_bufs = 1;
, make_read_handler(
boost::bind(&peer_connection::on_receive_data, self(), _1, _2)
));
} }
else else
{ {
@ -4421,16 +4441,49 @@ namespace libtorrent
TORRENT_ASSERT(max_receive - regular_buffer_size TORRENT_ASSERT(max_receive - regular_buffer_size
+ m_recv_pos <= m_disk_recv_buffer_size); + m_recv_pos <= m_disk_recv_buffer_size);
boost::array<asio::mutable_buffer, 2> vec;
vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos] vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
, regular_buffer_size - m_recv_pos); , regular_buffer_size - m_recv_pos);
vec[1] = asio::buffer(m_disk_recv_buffer.get() vec[1] = asio::buffer(m_disk_recv_buffer.get()
, max_receive - regular_buffer_size + m_recv_pos); , max_receive - regular_buffer_size + m_recv_pos);
m_socket->async_read_some( num_bufs = 2;
vec, make_read_handler(
boost::bind(&peer_connection::on_receive_data, self(), _1, _2)));
} }
m_channel_state[download_channel] = peer_info::bw_network;
if (s == read_async)
{
m_channel_state[download_channel] = peer_info::bw_network;
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
#endif
if (num_bufs == 1)
{
m_socket->async_read_some(
asio::mutable_buffers_1(vec[0]), make_read_handler(
boost::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
else
{
m_socket->async_read_some(
vec, make_read_handler(
boost::bind(&peer_connection::on_receive_data, self(), _1, _2)));
}
return 0;
}
size_t ret = 0;
if (num_bufs == 1)
{
ret = m_socket->read_some(asio::mutable_buffers_1(vec[0]), ec);
}
else
{
ret = m_socket->read_some(vec, ec);
}
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** SYNC_READ [ max: " << max_receive << " ret: " << ret << " e: " << ec.message() << " ]\n";
#endif
return ret;
} }
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
@ -4573,6 +4626,12 @@ namespace libtorrent
, std::size_t bytes_transferred) , std::size_t bytes_transferred)
{ {
mutex::scoped_lock l(m_ses.m_mutex); mutex::scoped_lock l(m_ses.m_mutex);
on_receive_data_nolock(error, bytes_transferred);
}
void peer_connection::on_receive_data_nolock(const error_code& error
, std::size_t bytes_transferred)
{
INVARIANT_CHECK; INVARIANT_CHECK;
@ -4651,54 +4710,9 @@ namespace libtorrent
} }
if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0;
max_receive = m_packet_size - m_recv_pos;
TORRENT_ASSERT(max_receive >= 0);
if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos)
max_receive = m_soft_packet_size - m_recv_pos;
int quota_left = m_quota[download_channel];
if (max_receive > quota_left)
max_receive = quota_left;
if (max_receive == 0) break; error_code ec;
bytes_transferred = try_read(read_sync, ec);
int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
if (int(m_recv_buffer.size()) < regular_buffer_size)
m_recv_buffer.resize(regular_buffer_size);
error_code ec;
if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
{
// only receive into regular buffer
TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
, max_receive), ec);
}
else if (m_recv_pos >= regular_buffer_size)
{
// only receive into disk buffer
TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get()
+ m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
- m_recv_pos, max_receive)), ec);
}
else
{
// receive into both regular and disk buffer
TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
TORRENT_ASSERT(max_receive - regular_buffer_size
+ m_recv_pos <= m_disk_recv_buffer_size);
boost::array<asio::mutable_buffer, 2> vec;
vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
, regular_buffer_size - m_recv_pos);
vec[1] = asio::buffer(m_disk_recv_buffer.get()
, (std::min)(m_disk_recv_buffer_size
, max_receive - regular_buffer_size + m_recv_pos));
bytes_transferred = m_socket->read_some(vec, ec);
}
if (ec && ec != asio::error::would_block) if (ec && ec != asio::error::would_block)
{ {
m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
@ -4717,7 +4731,7 @@ namespace libtorrent
} }
m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); m_statistics.trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());
setup_receive(); setup_receive();
} }
bool peer_connection::can_write() const bool peer_connection::can_write() const

View File

@ -105,9 +105,9 @@ namespace libtorrent
time_duration completion_timeout = now - m_start_time; time_duration completion_timeout = now - m_start_time;
if (m_read_timeout if (m_read_timeout
< total_seconds(receive_timeout) <= total_seconds(receive_timeout)
|| m_completion_timeout || m_completion_timeout
< total_seconds(completion_timeout)) <= total_seconds(completion_timeout))
{ {
on_timeout(); on_timeout();
return; return;