simplify and clean up utp_stream. fix missing error check in read handler in peer_connection

This commit is contained in:
Arvid Norberg 2015-02-15 00:25:56 +00:00
parent 43021f5f22
commit a831a78fd5
5 changed files with 97 additions and 53 deletions

View File

@ -51,6 +51,7 @@ namespace libtorrent
connreset_peers, connreset_peers,
connrefused_peers, connrefused_peers,
connaborted_peers, connaborted_peers,
notconnected_peers,
perm_peers, perm_peers,
buffer_peers, buffer_peers,
unreachable_peers, unreachable_peers,

View File

@ -232,19 +232,16 @@ public:
static void on_write(void* self, size_t bytes_transferred, error_code const& ec, bool kill); static void on_write(void* self, size_t bytes_transferred, error_code const& ec, bool kill);
static void on_connect(void* self, error_code const& ec, bool kill); static void on_connect(void* self, error_code const& ec, bool kill);
typedef void(*handler_t)(void*, size_t, error_code const&, bool);
typedef void(*connect_handler_t)(void*, error_code const&, bool);
void add_read_buffer(void* buf, size_t len); void add_read_buffer(void* buf, size_t len);
void set_read_handler(handler_t h); void issue_read();
void add_write_buffer(void const* buf, size_t len); void add_write_buffer(void const* buf, size_t len);
void set_write_handler(handler_t h); void issue_write();
size_t read_some(bool clear_buffers); size_t read_some(bool clear_buffers);
int send_delay() const; int send_delay() const;
int recv_delay() const; int recv_delay() const;
void do_connect(tcp::endpoint const& ep, connect_handler_t h); void do_connect(tcp::endpoint const& ep);
endpoint_type local_endpoint() const endpoint_type local_endpoint() const
{ {
@ -283,7 +280,7 @@ public:
} }
m_connect_handler = handler; m_connect_handler = handler;
do_connect(endpoint, &utp_stream::on_connect); do_connect(endpoint);
} }
template <class Mutable_Buffers, class Handler> template <class Mutable_Buffers, class Handler>
@ -320,7 +317,7 @@ public:
} }
m_read_handler = handler; m_read_handler = handler;
set_read_handler(&utp_stream::on_read); issue_read();
} }
template <class Handler> template <class Handler>
@ -335,11 +332,12 @@ public:
TORRENT_ASSERT(!m_read_handler); TORRENT_ASSERT(!m_read_handler);
if (m_read_handler) if (m_read_handler)
{ {
TORRENT_ASSERT(false); // we should never do this!
m_io_service.post(boost::bind<void>(handler, asio::error::operation_not_supported, 0)); m_io_service.post(boost::bind<void>(handler, asio::error::operation_not_supported, 0));
return; return;
} }
m_read_handler = handler; m_read_handler = handler;
set_read_handler(&utp_stream::on_read); issue_read();
} }
void do_async_connect(endpoint_type const& ep void do_async_connect(endpoint_type const& ep
@ -452,7 +450,7 @@ public:
return; return;
} }
m_write_handler = handler; m_write_handler = handler;
set_write_handler(&utp_stream::on_write); issue_write();
} }
//private: //private:

View File

@ -3952,18 +3952,33 @@ namespace libtorrent
m_counters.inc_stats_counter(counters::disconnected_peers); m_counters.inc_stats_counter(counters::disconnected_peers);
if (error == 2) m_counters.inc_stats_counter(counters::error_peers); if (error == 2) m_counters.inc_stats_counter(counters::error_peers);
if (ec == error::connection_reset) m_counters.inc_stats_counter(counters::connreset_peers);
else if (ec == error::eof) m_counters.inc_stats_counter(counters::eof_peers); if (ec == error::connection_reset)
else if (ec == error::connection_refused) m_counters.inc_stats_counter(counters::connrefused_peers); m_counters.inc_stats_counter(counters::connreset_peers);
else if (ec == error::connection_aborted) m_counters.inc_stats_counter(counters::connaborted_peers); else if (ec == error::eof)
else if (ec == error::no_permission) m_counters.inc_stats_counter(counters::perm_peers); m_counters.inc_stats_counter(counters::eof_peers);
else if (ec == error::no_buffer_space) m_counters.inc_stats_counter(counters::buffer_peers); else if (ec == error::connection_refused)
else if (ec == error::host_unreachable) m_counters.inc_stats_counter(counters::unreachable_peers); m_counters.inc_stats_counter(counters::connrefused_peers);
else if (ec == error::broken_pipe) m_counters.inc_stats_counter(counters::broken_pipe_peers); else if (ec == error::connection_aborted)
else if (ec == error::address_in_use) m_counters.inc_stats_counter(counters::addrinuse_peers); m_counters.inc_stats_counter(counters::connaborted_peers);
else if (ec == error::access_denied) m_counters.inc_stats_counter(counters::no_access_peers); else if (ec == error::not_connected)
else if (ec == error::invalid_argument) m_counters.inc_stats_counter(counters::invalid_arg_peers); m_counters.inc_stats_counter(counters::notconnected_peers);
else if (ec == error::operation_aborted) m_counters.inc_stats_counter(counters::aborted_peers); else if (ec == error::no_permission)
m_counters.inc_stats_counter(counters::perm_peers);
else if (ec == error::no_buffer_space)
m_counters.inc_stats_counter(counters::buffer_peers);
else if (ec == error::host_unreachable)
m_counters.inc_stats_counter(counters::unreachable_peers);
else if (ec == error::broken_pipe)
m_counters.inc_stats_counter(counters::broken_pipe_peers);
else if (ec == error::address_in_use)
m_counters.inc_stats_counter(counters::addrinuse_peers);
else if (ec == error::access_denied)
m_counters.inc_stats_counter(counters::no_access_peers);
else if (ec == error::invalid_argument)
m_counters.inc_stats_counter(counters::invalid_arg_peers);
else if (ec == error::operation_aborted)
m_counters.inc_stats_counter(counters::aborted_peers);
else if (ec == error_code(errors::upload_upload_connection) else if (ec == error_code(errors::upload_upload_connection)
|| ec == error_code(errors::uninteresting_upload_peer) || ec == error_code(errors::uninteresting_upload_peer)
|| ec == error_code(errors::torrent_aborted) || ec == error_code(errors::torrent_aborted)
@ -5683,6 +5698,17 @@ namespace libtorrent
// we can read from the socket, and then determine how much there // we can read from the socket, and then determine how much there
// is to read. // is to read.
if (error)
{
#if defined TORRENT_LOGGING
peer_log("*** ERROR [ in peer_connection::on_receive_data_nb error: %s ]"
, error.message().c_str());
#endif
on_receive(error, bytes_transferred);
disconnect(error, op_sock_read);
return;
}
error_code ec; error_code ec;
std::size_t buffer_size = m_socket->available(ec); std::size_t buffer_size = m_socket->available(ec);
if (ec) if (ec)
@ -5822,7 +5848,7 @@ namespace libtorrent
if (error) if (error)
{ {
#if defined TORRENT_LOGGING #if defined TORRENT_LOGGING
peer_log("*** ERROR [ in peer_connection::on_receive_data error: %s ]" peer_log("*** ERROR [ in peer_connection::on_receive_data_impl error: %s ]"
, error.message().c_str()); , error.message().c_str());
#endif #endif
trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6()); trancieve_ip_packet(bytes_in_loop, m_remote.address().is_v6());

View File

@ -60,6 +60,7 @@ namespace libtorrent
METRIC(peer, connreset_peers) METRIC(peer, connreset_peers)
METRIC(peer, connrefused_peers) METRIC(peer, connrefused_peers)
METRIC(peer, connaborted_peers) METRIC(peer, connaborted_peers)
METRIC(peer, notconnected_peers)
METRIC(peer, perm_peers) METRIC(peer, perm_peers)
METRIC(peer, buffer_peers) METRIC(peer, buffer_peers)
METRIC(peer, unreachable_peers) METRIC(peer, unreachable_peers)

View File

@ -230,9 +230,9 @@ struct utp_socket_impl
: m_sm(sm) : m_sm(sm)
, m_userdata(userdata) , m_userdata(userdata)
, m_nagle_packet(NULL) , m_nagle_packet(NULL)
, m_read_handler(0) , m_read_handler(false)
, m_write_handler(0) , m_write_handler(false)
, m_connect_handler(0) , m_connect_handler(false)
, m_remote_address() , m_remote_address()
, m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout())) , m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout()))
, m_last_history_step(time_now_hires()) , m_last_history_step(time_now_hires())
@ -413,11 +413,11 @@ public:
// async operation initiated // async operation initiated
error_code m_error; error_code m_error;
// these are the callbacks made into the utp_stream object // these indicate whether or not there is an outstanding read/write or
// on read/write/connect events // connect operation. i.e. is there upper layer subscribed to these events.
utp_stream::handler_t m_read_handler; bool m_read_handler;
utp_stream::handler_t m_write_handler; bool m_write_handler;
utp_stream::connect_handler_t m_connect_handler; bool m_connect_handler;
// the address of the remote endpoint // the address of the remote endpoint
address m_remote_address; address m_remote_address;
@ -871,11 +871,15 @@ void utp_stream::on_read(void* self, size_t bytes_transferred, error_code const&
TORRENT_ASSERT(bytes_transferred > 0 || ec || s->m_impl->m_null_buffers); TORRENT_ASSERT(bytes_transferred > 0 || ec || s->m_impl->m_null_buffers);
s->m_io_service.post(boost::bind<void>(s->m_read_handler, ec, bytes_transferred)); s->m_io_service.post(boost::bind<void>(s->m_read_handler, ec, bytes_transferred));
s->m_read_handler.clear(); s->m_read_handler.clear();
// boost::function2<void, error_code const&, std::size_t> tmp;
// tmp.swap(s->m_read_handler);
if (kill && s->m_impl) if (kill && s->m_impl)
{ {
TORRENT_ASSERT(ec);
detach_utp_impl(s->m_impl); detach_utp_impl(s->m_impl);
s->m_impl = 0; s->m_impl = 0;
} }
// tmp(ec, bytes_transferred);
} }
void utp_stream::on_write(void* self, size_t bytes_transferred, error_code const& ec, bool kill) void utp_stream::on_write(void* self, size_t bytes_transferred, error_code const& ec, bool kill)
@ -889,11 +893,15 @@ void utp_stream::on_write(void* self, size_t bytes_transferred, error_code const
TORRENT_ASSERT(bytes_transferred > 0 || ec); TORRENT_ASSERT(bytes_transferred > 0 || ec);
s->m_io_service.post(boost::bind<void>(s->m_write_handler, ec, bytes_transferred)); s->m_io_service.post(boost::bind<void>(s->m_write_handler, ec, bytes_transferred));
s->m_write_handler.clear(); s->m_write_handler.clear();
// boost::function2<void, error_code const&, std::size_t> tmp;
// tmp.swap(s->m_read_handler);
if (kill && s->m_impl) if (kill && s->m_impl)
{ {
TORRENT_ASSERT(ec);
detach_utp_impl(s->m_impl); detach_utp_impl(s->m_impl);
s->m_impl = 0; s->m_impl = 0;
} }
// tmp(ec, bytes_transferred);
} }
void utp_stream::on_connect(void* self, error_code const& ec, bool kill) void utp_stream::on_connect(void* self, error_code const& ec, bool kill)
@ -907,12 +915,15 @@ void utp_stream::on_connect(void* self, error_code const& ec, bool kill)
TORRENT_ASSERT(s->m_connect_handler); TORRENT_ASSERT(s->m_connect_handler);
s->m_io_service.post(boost::bind<void>(s->m_connect_handler, ec)); s->m_io_service.post(boost::bind<void>(s->m_connect_handler, ec));
s->m_connect_handler.clear(); s->m_connect_handler.clear();
// boost::function1<void, error_code const&> tmp;
// s->m_connect_handler.swap(tmp);
if (kill && s->m_impl) if (kill && s->m_impl)
{ {
TORRENT_ASSERT(ec); TORRENT_ASSERT(ec);
detach_utp_impl(s->m_impl); detach_utp_impl(s->m_impl);
s->m_impl = 0; s->m_impl = 0;
} }
// tmp(ec);
} }
void utp_stream::add_read_buffer(void* buf, size_t len) void utp_stream::add_read_buffer(void* buf, size_t len)
@ -968,13 +979,14 @@ void utp_stream::add_write_buffer(void const* buf, size_t len)
// do is to copy any data stored in m_receive_buffer into the user // do is to copy any data stored in m_receive_buffer into the user
// provided buffer. This might be enough to in turn trigger the read // provided buffer. This might be enough to in turn trigger the read
// handler immediately. // handler immediately.
void utp_stream::set_read_handler(handler_t h) void utp_stream::issue_read()
{ {
TORRENT_ASSERT(m_impl->m_userdata); TORRENT_ASSERT(m_impl->m_userdata);
TORRENT_ASSERT(!m_impl->m_read_handler);
m_impl->m_null_buffers = m_impl->m_read_buffer_size == 0; m_impl->m_null_buffers = m_impl->m_read_buffer_size == 0;
m_impl->m_read_handler = h; m_impl->m_read_handler = true;
if (m_impl->test_socket_state()) return; if (m_impl->test_socket_state()) return;
UTP_LOGV("%8p: new read handler. %d bytes in buffer\n" UTP_LOGV("%8p: new read handler. %d bytes in buffer\n"
@ -1075,15 +1087,16 @@ size_t utp_stream::read_some(bool clear_buffers)
// this is called when all user provided write buffers have been // this is called when all user provided write buffers have been
// added. Start trying to send packets with the payload immediately. // added. Start trying to send packets with the payload immediately.
void utp_stream::set_write_handler(handler_t h) void utp_stream::issue_write()
{ {
UTP_LOGV("%8p: new write handler. %d bytes to write\n" UTP_LOGV("%8p: new write handler. %d bytes to write\n"
, m_impl, m_impl->m_write_buffer_size); , m_impl, m_impl->m_write_buffer_size);
TORRENT_ASSERT(m_impl->m_write_buffer_size > 0); TORRENT_ASSERT(m_impl->m_write_buffer_size > 0);
TORRENT_ASSERT(m_impl->m_write_handler == false);
TORRENT_ASSERT(m_impl->m_userdata); TORRENT_ASSERT(m_impl->m_userdata);
m_impl->m_write_handler = h;
m_impl->m_write_handler = true;
m_impl->m_written = 0; m_impl->m_written = 0;
if (m_impl->test_socket_state()) return; if (m_impl->test_socket_state()) return;
@ -1097,15 +1110,16 @@ void utp_stream::set_write_handler(handler_t h)
if (m_impl) m_impl->maybe_trigger_send_callback(); if (m_impl) m_impl->maybe_trigger_send_callback();
} }
void utp_stream::do_connect(tcp::endpoint const& ep, utp_stream::connect_handler_t handler) void utp_stream::do_connect(tcp::endpoint const& ep)
{ {
int link_mtu, utp_mtu; int link_mtu, utp_mtu;
m_impl->m_sm->mtu_for_dest(ep.address(), link_mtu, utp_mtu); m_impl->m_sm->mtu_for_dest(ep.address(), link_mtu, utp_mtu);
m_impl->init_mtu(link_mtu, utp_mtu); m_impl->init_mtu(link_mtu, utp_mtu);
TORRENT_ASSERT(m_impl->m_connect_handler == 0); TORRENT_ASSERT(m_impl->m_connect_handler == false);
m_impl->m_remote_address = ep.address(); m_impl->m_remote_address = ep.address();
m_impl->m_port = ep.port(); m_impl->m_port = ep.port();
m_impl->m_connect_handler = handler;
m_impl->m_connect_handler = true;
error_code ec; error_code ec;
m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address(); m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address();
@ -1183,15 +1197,15 @@ void utp_socket_impl::maybe_trigger_receive_callback()
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
if (m_read_handler == 0) return; if (m_read_handler == false) return;
// nothing has been read or there's no outstanding read operation // nothing has been read or there's no outstanding read operation
if (m_null_buffers && m_receive_buffer_size == 0) return; if (m_null_buffers && m_receive_buffer_size == 0) return;
else if (!m_null_buffers && m_read == 0) return; else if (!m_null_buffers && m_read == 0) return;
UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read); UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read);
m_read_handler(m_userdata, m_read, m_error, false); m_read_handler = false;
m_read_handler = 0; utp_stream::on_read(m_userdata, m_read, m_error, false);
m_read = 0; m_read = 0;
m_read_buffer_size = 0; m_read_buffer_size = 0;
m_read_buffer.clear(); m_read_buffer.clear();
@ -1202,12 +1216,12 @@ void utp_socket_impl::maybe_trigger_send_callback()
INVARIANT_CHECK; INVARIANT_CHECK;
// nothing has been written or there's no outstanding write operation // nothing has been written or there's no outstanding write operation
if (m_written == 0 || m_write_handler == 0) return; if (m_written == 0 || m_write_handler == false) return;
UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written); UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written);
m_write_handler(m_userdata, m_written, m_error, false); m_write_handler = false;
m_write_handler = 0; utp_stream::on_write(m_userdata, m_written, m_error, false);
m_written = 0; m_written = 0;
m_write_buffer_size = 0; m_write_buffer_size = 0;
m_write_buffer.clear(); m_write_buffer.clear();
@ -2351,12 +2365,16 @@ bool utp_socket_impl::cancel_handlers(error_code const& ec, bool kill)
// calling the callbacks with m_userdata being 0 will just crash // calling the callbacks with m_userdata being 0 will just crash
TORRENT_ASSERT((ret && bool(m_userdata)) || !ret); TORRENT_ASSERT((ret && bool(m_userdata)) || !ret);
if (m_read_handler) m_read_handler(m_userdata, 0, ec, kill); bool read = m_read_handler;
m_read_handler = 0; bool write = m_write_handler;
if (m_write_handler) m_write_handler(m_userdata, 0, ec, kill); bool connect = m_connect_handler;
m_write_handler = 0; m_read_handler = false;
if (m_connect_handler) m_connect_handler(m_userdata, ec, kill); m_write_handler = false;
m_connect_handler = 0; m_connect_handler = false;
if (read) utp_stream::on_read(m_userdata, 0, ec, kill);
if (write) utp_stream::on_write(m_userdata, 0, ec, kill);
if (connect) utp_stream::on_connect(m_userdata, ec, kill);
return ret; return ret;
} }
@ -2986,9 +3004,9 @@ bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
if (m_connect_handler) if (m_connect_handler)
{ {
UTP_LOGV("%8p: calling connect handler\n", this); UTP_LOGV("%8p: calling connect handler\n", this);
m_connect_handler(m_userdata, m_error, false); m_connect_handler = false;
utp_stream::on_connect(m_userdata, m_error, false);
} }
m_connect_handler = 0;
// fall through // fall through
} }
case UTP_STATE_CONNECTED: case UTP_STATE_CONNECTED: