optimized the read callback to loop and exhaust the read buffer from the socket instead of making another async read call. #43

This commit is contained in:
Arvid Norberg 2007-05-10 01:50:11 +00:00
parent a5847956e1
commit f39185bb21
5 changed files with 212 additions and 20 deletions

View File

@ -44,6 +44,30 @@ public:
m_sock.async_read_some(buffers, handler);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec)
{
return m_sock.read_some(buffers, ec);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers)
{
return m_sock.read_some(buffers);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc)
{
m_sock.io_control(ioc);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc, asio::error_code& ec)
{
m_sock.io_control(ioc, ec);
}
template <class Const_Buffers, class Handler>
void async_write_some(Const_Buffers const& buffers, Handler const& handler)
{

View File

@ -41,6 +41,30 @@ public:
m_sock.async_read_some(buffers, handler);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec)
{
return m_sock.read_some(buffers, ec);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers)
{
return m_sock.read_some(buffers);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc)
{
m_sock.io_control(ioc);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc, asio::error_code& ec)
{
m_sock.io_control(ioc, ec);
}
template <class Const_Buffers, class Handler>
void async_write_some(Const_Buffers const& buffers, Handler const& handler)
{

View File

@ -43,6 +43,45 @@ namespace aux
{}
};
// -------------- io_control -----------
template<class IO_Control_Command>
struct io_control_visitor_ec: boost::static_visitor<>
{
io_control_visitor_ec(IO_Control_Command& ioc, asio::error_code& ec)
: ioc(ioc), ec(ec) {}
template <class T>
void operator()(T* p) const
{
p->io_control(ioc, ec);
}
void operator()(boost::blank) const
{}
IO_Control_Command& ioc;
asio::error_code& ec;
};
template<class IO_Control_Command>
struct io_control_visitor
: boost::static_visitor<>
{
io_control_visitor(IO_Control_Command& ioc)
: ioc(ioc) {}
template <class T>
void operator()(T* p) const
{
p->io_control(ioc);
}
void operator()(boost::blank) const
{}
IO_Control_Command& ioc;
};
// -------------- async_connect -----------
template <class EndpointType, class Handler>
@ -294,6 +333,46 @@ namespace aux
Handler const& handler;
};
// -------------- read_some -----------
template <class Mutable_Buffers>
struct read_some_visitor
: boost::static_visitor<std::size_t>
{
read_some_visitor(Mutable_Buffers const& buffers)
: buffers(buffers)
{}
template <class T>
std::size_t operator()(T* p) const
{ return p->read_some(buffers); }
std::size_t operator()(boost::blank) const
{ return 0; }
Mutable_Buffers const& buffers;
};
template <class Mutable_Buffers>
struct read_some_visitor_ec
: boost::static_visitor<std::size_t>
{
read_some_visitor_ec(Mutable_Buffers const& buffers, asio::error_code& ec)
: buffers(buffers)
, ec(ec)
{}
template <class T>
std::size_t operator()(T* p) const
{ return p->read_some(buffers, ec); }
std::size_t operator()(boost::blank) const
{ return 0; }
Mutable_Buffers const& buffers;
asio::error_code& ec;
};
// -------------- async_write_some -----------
template <class Const_Buffers, class Handler>
@ -452,6 +531,26 @@ public:
boost::apply_visitor(aux::delete_visitor(), m_variant);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec)
{
assert(instantiated());
return boost::apply_visitor(
aux::read_some_visitor_ec<Mutable_Buffers>(buffers, ec)
, m_variant
);
}
template <class Mutable_Buffers>
std::size_t read_some(Mutable_Buffers const& buffers)
{
assert(instantiated());
return boost::apply_visitor(
aux::read_some_visitor<Mutable_Buffers>(buffers)
, m_variant
);
}
template <class Mutable_Buffers, class Handler>
void async_read_some(Mutable_Buffers const& buffers, Handler const& handler)
{
@ -481,6 +580,25 @@ public:
);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc)
{
assert(instantiated());
boost::apply_visitor(
aux::io_control_visitor<IO_Control_Command>(ioc), m_variant
);
}
template <class IO_Control_Command>
void io_control(IO_Control_Command& ioc, asio::error_code& ec)
{
assert(instantiated());
boost::apply_visitor(
aux::io_control_visitor_ec<IO_Control_Command>(ioc, ec)
, m_variant
);
}
void bind(endpoint_type const& endpoint)
{
assert(instantiated());

View File

@ -74,6 +74,7 @@ namespace libtorrent
delete c;
}
// outbound connection
peer_connection::peer_connection(
session_impl& ses
, boost::weak_ptr<torrent> tor
@ -147,6 +148,7 @@ namespace libtorrent
init();
}
// incoming connection
peer_connection::peer_connection(
session_impl& ses
, boost::shared_ptr<socket_type> s
@ -198,6 +200,8 @@ namespace libtorrent
, m_in_constructor(true)
#endif
{
tcp::socket::non_blocking_io ioc(true);
m_socket->io_control(ioc);
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
std::fill(m_country, m_country + 2, 0);
#endif
@ -2004,7 +2008,6 @@ namespace libtorrent
&& !m_connecting
&& t)
{
assert(t);
if (m_bandwidth_limit[download_channel].max_assignable() > 0)
{
#ifdef TORRENT_VERBOSE_LOGGING
@ -2076,7 +2079,6 @@ namespace libtorrent
bool m_cond;
};
// --------------------------
// RECEIVE DATA
// --------------------------
@ -2091,8 +2093,6 @@ namespace libtorrent
assert(m_reading);
m_reading = false;
// correct the dl quota usage, if not all of the buffer was actually read
m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
if (error)
{
@ -2103,28 +2103,46 @@ namespace libtorrent
throw std::runtime_error(error.message());
}
if (m_disconnecting) return;
assert(m_packet_size > 0);
assert(bytes_transferred > 0);
m_last_receive = time_now();
m_recv_pos += bytes_transferred;
assert(m_recv_pos <= int(m_recv_buffer.size()));
do
{
INVARIANT_CHECK;
on_receive(error, bytes_transferred);
}
// correct the dl quota usage, if not all of the buffer was actually read
m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
assert(m_packet_size > 0);
if (m_disconnecting) return;
if (m_peer_choked
&& m_recv_pos == 0
&& (m_recv_buffer.capacity() - m_packet_size) > 128)
{
std::vector<char>(m_packet_size).swap(m_recv_buffer);
assert(m_packet_size > 0);
assert(bytes_transferred > 0);
m_last_receive = time_now();
m_recv_pos += bytes_transferred;
assert(m_recv_pos <= int(m_recv_buffer.size()));
{
INVARIANT_CHECK;
on_receive(error, bytes_transferred);
}
assert(m_packet_size > 0);
if (m_peer_choked
&& m_recv_pos == 0
&& (m_recv_buffer.capacity() - m_packet_size) > 128)
{
std::vector<char>(m_packet_size).swap(m_recv_buffer);
}
if (m_bandwidth_limit[download_channel].quota_left() == 0) break;
int max_receive = std::min(
m_bandwidth_limit[download_channel].quota_left()
, m_packet_size - m_recv_pos);
asio::error_code ec;
bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
, max_receive), ec);
if (ec && ec != asio::error::would_block)
throw asio::system_error(ec);
}
while (bytes_transferred > 0);
setup_receive();
}
@ -2195,6 +2213,11 @@ namespace libtorrent
m_queued = false;
assert(m_connecting);
m_socket->open(t->get_interface().protocol());
// set the socket to non-blocking, so that we can
// read the entire buffer on each read event we get
tcp::socket::non_blocking_io ioc(true);
m_socket->io_control(ioc);
m_socket->bind(t->get_interface());
m_socket->async_connect(m_remote
, bind(&peer_connection::on_connection_complete, self(), _1));

View File

@ -1557,6 +1557,9 @@ namespace libtorrent
}
catch (std::exception& exc)
{
#ifndef NDEBUG
std::cerr << exc.what() << std::endl;
#endif
assert(false);
};