diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index f7875a15d..8aee8716c 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -188,6 +188,7 @@ namespace libtorrent , dht::dht_observer , boost::noncopyable , initialize_timer + , udp_socket_observer , boost::enable_shared_from_this { #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING @@ -884,13 +885,8 @@ namespace libtorrent deadline_timer m_dht_announce_timer; #endif - void on_receive_udp(error_code const& e - , udp::endpoint const& ep, char const* buf, int len); - - void on_receive_udp_hostname(error_code const& e - , char const* hostname, char const* buf, int len); - - void on_udp_socket_drained(); + bool incoming_packet(error_code const& ec + , udp::endpoint const&, char const* buf, int size); // see m_external_listen_port. This is the same // but for the udp port used by the DHT. diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 8edad64e8..b7228985a 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -71,11 +71,12 @@ namespace libtorrent { namespace dht TORRENT_EXTRA_EXPORT void intrusive_ptr_add_ref(dht_tracker const*); TORRENT_EXTRA_EXPORT void intrusive_ptr_release(dht_tracker const*); - struct dht_tracker : udp_socket_interface + struct dht_tracker : udp_socket_interface, udp_socket_observer { friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*); + // TODO: take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state = 0); @@ -96,8 +97,8 @@ namespace libtorrent { namespace dht // translate bittorrent kademlia message into the generic kademlia message // used by the library - void on_receive(udp::endpoint const& ep, char const* pkt, int size); - void on_unreachable(udp::endpoint const& ep); + virtual bool incoming_packet(error_code const& ec + , udp::endpoint const&, char const* buf, int size); private: diff --git a/include/libtorrent/tracker_manager.hpp b/include/libtorrent/tracker_manager.hpp index 1f9e4faff..470116f9a 100644 --- a/include/libtorrent/tracker_manager.hpp +++ b/include/libtorrent/tracker_manager.hpp @@ -63,6 +63,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/intrusive_ptr_base.hpp" #include "libtorrent/size_type.hpp" #include "libtorrent/union_endpoint.hpp" +#include "libtorrent/udp_socket.hpp" // for udp_socket_observer #ifdef TORRENT_USE_OPENSSL #include #endif @@ -263,7 +264,7 @@ namespace libtorrent const tracker_request m_req; }; - class TORRENT_EXTRA_EXPORT tracker_manager: boost::noncopyable + class TORRENT_EXTRA_EXPORT tracker_manager: public udp_socket_observer, boost::noncopyable { public: @@ -289,11 +290,13 @@ namespace libtorrent void sent_bytes(int bytes); void received_bytes(int bytes); - bool incoming_udp(error_code const& e, udp::endpoint const& ep, char const* buf, int size); + virtual bool incoming_packet(error_code const& e, udp::endpoint const& ep + , char const* buf, int size); // this is only used for SOCKS packets, since // they may be addressed to hostname - bool incoming_udp(error_code const& e, char const* hostname, char const* buf, int size); + virtual bool incoming_packet(error_code const& e, char const* hostname + , char const* buf, int size); private: diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 1be888720..7579e8fcb 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -42,25 +42,28 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/deadline_timer.hpp" #include -#include -#include namespace libtorrent { class connection_queue; + struct udp_socket_observer + { + // return true if the packet was handled (it won't be + // propagated to the next observer) + virtual bool incoming_packet(error_code const& ec + , udp::endpoint const&, char const* buf, int size) = 0; + virtual bool incoming_packet(error_code const& ec + , char const* hostname, char const* buf, int size) { return false; } + + // called every time the socket is drained of packets + virtual void socket_drained() {} + }; + class udp_socket { public: - // TODO: instead of these callbacks, support observers - typedef boost::function callback_t; - typedef boost::function callback2_t; - typedef boost::function drain_callback_t; - - udp_socket(io_service& ios, callback_t const& c, callback2_t const& c2 - , drain_callback_t const& dc, connection_queue& cc); + udp_socket(io_service& ios, connection_queue& cc); ~udp_socket(); enum flags_t { dont_drop = 1, peer_connection = 2 }; @@ -75,6 +78,9 @@ namespace libtorrent } io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } + void subscribe(udp_socket_observer* o); + void unsubscribe(udp_socket_observer* o); + // this is only valid when using a socks5 proxy void send_hostname(char const* hostname, int port, char const* p, int len, error_code& ec); @@ -140,15 +146,11 @@ namespace libtorrent public: #endif - // callback for regular incoming packets - callback_t m_callback; + std::vector m_observers; - // callback for proxied incoming packets with a domain - // name as source - callback2_t m_callback2; - - // called every time we drain the udp sockets - drain_callback_t m_drained_callback; + void call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size); + void call_handler(error_code const& ec, const char* host, char const* buf, int size); + void call_drained_handler(); void setup_read(udp::socket* s); void on_read(udp::socket* s); @@ -171,8 +173,6 @@ namespace libtorrent void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); void unwrap(error_code const& e, char const* buf, int size); - bool maybe_clear_callback(); - #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined BOOST_HAS_PTHREADS mutable pthread_t m_thread; @@ -230,8 +230,7 @@ namespace libtorrent struct rate_limited_udp_socket : public udp_socket { - rate_limited_udp_socket(io_service& ios, callback_t const& c - , callback2_t const& c2, drain_callback_t const& dc, connection_queue& cc); + rate_limited_udp_socket(io_service& ios, connection_queue& cc); void set_rate_limit(int limit) { m_rate_limit = limit; } bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; } bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0); diff --git a/include/libtorrent/utp_socket_manager.hpp b/include/libtorrent/utp_socket_manager.hpp index d25e499f5..94aa024e1 100644 --- a/include/libtorrent/utp_socket_manager.hpp +++ b/include/libtorrent/utp_socket_manager.hpp @@ -47,7 +47,7 @@ namespace libtorrent typedef boost::function const&)> incoming_utp_callback_t; - struct utp_socket_manager + struct utp_socket_manager : udp_socket_observer { utp_socket_manager(session_settings const& sett, udp_socket& s, incoming_utp_callback_t cb); ~utp_socket_manager(); @@ -55,8 +55,12 @@ namespace libtorrent void get_status(utp_status& s) const; // return false if this is not a uTP packet - bool incoming_packet(char const* p, int size, udp::endpoint const& ep); - void socket_drained(); + virtual bool incoming_packet(error_code const& ec, udp::endpoint const& ep + , char const* p, int size); + virtual bool incoming_packet(error_code const& ec, char const* host, char const* p, int size) + { return false; } + + virtual void socket_drained(); void tick(ptime now); diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index e2ef8bc76..a2ba36a37 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -421,17 +421,26 @@ namespace libtorrent { namespace dht } - void dht_tracker::on_unreachable(udp::endpoint const& ep) - { - m_dht.unreachable(ep); - } - // translate bittorrent kademlia message into the generice kademlia message // used by the library - void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred) + bool dht_tracker::incoming_packet(error_code const& ec + , udp::endpoint const& ep, char const* buf, int size) { + if (ec) + { + if (ec == asio::error::connection_refused + || ec == asio::error::connection_reset + || ec == asio::error::connection_aborted) + { + m_dht.unreachable(ep); + } + return false; + } + + if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false; + // account for IP and UDP overhead - m_received_bytes += bytes_transferred + (ep.address().is_v6() ? 48 : 28); + m_received_bytes += size + (ep.address().is_v6() ? 48 : 28); node_ban_entry* match = 0; node_ban_entry* min = m_ban_nodes; @@ -464,7 +473,7 @@ namespace libtorrent { namespace dht // we've received 20 messages in less than 5 seconds from // this node. Ignore it until it's silent for 5 minutes match->limit = now + minutes(5); - return; + return true; } // we got 50 messages from this peer, but it was in @@ -488,19 +497,19 @@ namespace libtorrent { namespace dht using libtorrent::entry; using libtorrent::bdecode; - TORRENT_ASSERT(bytes_transferred > 0); + TORRENT_ASSERT(size > 0); lazy_entry e; int pos; - error_code ec; - int ret = lazy_bdecode(buf, buf + bytes_transferred, e, ec, &pos, 10, 500); + error_code err; + int ret = lazy_bdecode(buf, buf + size, e, err, &pos, 10, 500); if (ret != 0) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: " - << ec.message() << " pos: " << pos; + << err.message() << " pos: " << pos; #endif - return; + return false; } libtorrent::dht::msg m(e, ep); @@ -516,7 +525,7 @@ namespace libtorrent { namespace dht // entry r; // libtorrent::dht::incoming_error(r, "message is not a dictionary"); // send_packet(r, ep, 0); - return; + return false; } #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -525,6 +534,7 @@ namespace libtorrent { namespace dht #endif m_dht.incoming(m); + return true; } void add_node_fun(void* userdata, node_entry const& e) diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 9690189f0..d7801fddd 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -641,11 +641,7 @@ namespace aux { , m_dht_announce_timer(m_io_service) #endif , m_external_udp_port(0) - , m_udp_socket(m_io_service - , boost::bind(&session_impl::on_receive_udp, this, _1, _2, _3, _4) - , boost::bind(&session_impl::on_receive_udp_hostname, this, _1, _2, _3, _4) - , boost::bind(&session_impl::on_udp_socket_drained, this) - , m_half_open) + , m_udp_socket(m_io_service, m_half_open) , m_utp_socket_manager(m_settings, m_udp_socket , boost::bind(&session_impl::incoming_connection, this, _1)) , m_boost_connections(0) @@ -670,6 +666,10 @@ namespace aux { memset(m_redundant_bytes, 0, sizeof(m_redundant_bytes)); m_udp_socket.set_rate_limit(m_settings.dht_upload_rate_limit); + m_udp_socket.subscribe(&m_tracker_manager); + m_udp_socket.subscribe(&m_utp_socket_manager); + m_udp_socket.subscribe(this); + m_disk_queues[0] = 0; m_disk_queues[1] = 0; @@ -2448,65 +2448,20 @@ namespace aux { } #endif - void session_impl::on_receive_udp(error_code const& e - , udp::endpoint const& ep, char const* buf, int len) + bool session_impl::incoming_packet(error_code const& ec + , udp::endpoint const& ep, char const* buf, int size) { #ifdef TORRENT_STATS ++m_num_messages[on_udp_counter]; #endif - if (e) - { - if (e == asio::error::connection_refused - || e == asio::error::connection_reset - || e == asio::error::connection_aborted) - { -#ifndef TORRENT_DISABLE_DHT - if (m_dht) m_dht->on_unreachable(ep); -#endif - if (m_tracker_manager.incoming_udp(e, ep, buf, len)) - m_stat.received_tracker_bytes(len + 28); - } + if (ec) + { // don't bubble up operation aborted errors to the user - if (e != asio::error::operation_aborted + if (ec != asio::error::operation_aborted && m_alerts.should_post()) - m_alerts.post_alert(udp_error_alert(ep, e)); - return; + m_alerts.post_alert(udp_error_alert(ep, ec)); } - -#ifndef TORRENT_DISABLE_DHT - if (len > 20 && *buf == 'd' && buf[len-1] == 'e' && m_dht) - { - // this is probably a dht message - m_dht->on_receive(ep, buf, len); - return; - } -#endif - - if (m_utp_socket_manager.incoming_packet(buf, len, ep)) - return; - - // maybe it's a udp tracker response - if (m_tracker_manager.incoming_udp(e, ep, buf, len)) - m_stat.received_tracker_bytes(len + 28); - } - - void session_impl::on_receive_udp_hostname(error_code const& e - , char const* hostname, char const* buf, int len) - { - // it's probably a udp tracker response - if (m_tracker_manager.incoming_udp(e, hostname, buf, len)) - { - m_stat.received_tracker_bytes(len + 28); - } - } - - // this is called every time all packets have been read from - // the udp socket. The utp_socket_manager uses this event to - // trigger a flush of deferred ACKs - void session_impl::on_udp_socket_drained() - { - m_utp_socket_manager.socket_drained(); } void session_impl::async_accept(boost::shared_ptr const& listener, bool ssl) @@ -5405,6 +5360,8 @@ namespace aux { m_dht->start(startup_state); + m_udp_socket.subscribe(m_dht.get()); + // announce all torrents we have to the DHT for (torrent_map::const_iterator i = m_torrents.begin() , end(m_torrents.end()); i != end; ++i) @@ -5416,6 +5373,7 @@ namespace aux { void session_impl::stop_dht() { if (!m_dht) return; + m_udp_socket.unsubscribe(m_dht.get()); m_dht->stop(); m_dht = 0; } @@ -5543,6 +5501,10 @@ namespace aux { if (m_thread) m_thread->join(); + m_udp_socket.unsubscribe(this); + m_udp_socket.unsubscribe(&m_utp_socket_manager); + m_udp_socket.unsubscribe(&m_tracker_manager); + TORRENT_ASSERT(m_torrents.empty()); TORRENT_ASSERT(m_connections.empty()); TORRENT_ASSERT(m_connections.empty()); diff --git a/src/tracker_manager.cpp b/src/tracker_manager.cpp index 622dce3e1..b5e22bb05 100644 --- a/src/tracker_manager.cpp +++ b/src/tracker_manager.cpp @@ -274,9 +274,10 @@ namespace libtorrent con->start(); } - bool tracker_manager::incoming_udp(error_code const& e + bool tracker_manager::incoming_packet(error_code const& e , udp::endpoint const& ep, char const* buf, int size) { + // m_ses.m_stat.received_tracker_bytes(len + 28); for (tracker_connections_t::iterator i = m_connections.begin(); i != m_connections.end();) { @@ -288,9 +289,10 @@ namespace libtorrent return false; } - bool tracker_manager::incoming_udp(error_code const& e + bool tracker_manager::incoming_packet(error_code const& e , char const* hostname, char const* buf, int size) { + // m_ses.m_stat.received_tracker_bytes(len + 28); for (tracker_connections_t::iterator i = m_connections.begin(); i != m_connections.end();) { diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 57bedb16b..4cebfd921 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -53,14 +53,8 @@ POSSIBILITY OF SUCH DAMAGE. using namespace libtorrent; udp_socket::udp_socket(asio::io_service& ios - , udp_socket::callback_t const& c - , udp_socket::callback2_t const& c2 - , udp_socket::drain_callback_t const& dc , connection_queue& cc) - : m_callback(c) - , m_callback2(c2) - , m_drained_callback(dc) - , m_ipv4_sock(ios) + : m_ipv4_sock(ios) , m_buf_size(0) , m_buf(0) #if TORRENT_USE_IPV6 @@ -101,7 +95,6 @@ udp_socket::~udp_socket() #endif TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding); TORRENT_ASSERT(m_magic == 0x1337); - TORRENT_ASSERT(!m_callback || !m_started); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS m_magic = 0; #endif @@ -150,21 +143,6 @@ void udp_socket::send_hostname(char const* hostname, int port qp.flags = 0; } -bool udp_socket::maybe_clear_callback() -{ - if (m_outstanding_ops + m_v4_outstanding -#if TORRENT_USE_IPV6 - + m_v6_outstanding -#endif - == 0) - { - // "this" may be destructed in the callback - m_callback.clear(); - return true; - } - return false; -} - void udp_socket::send(udp::endpoint const& ep, char const* p, int len , error_code& ec, int flags) { @@ -230,14 +208,9 @@ void udp_socket::on_read(udp::socket* s) --m_v4_outstanding; } - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; - if (!m_callback) return; for (;;) { @@ -247,10 +220,62 @@ void udp_socket::on_read(udp::socket* s) if (ec == asio::error::would_block) break; on_read_impl(s, ep, ec, bytes_transferred); } - if (m_drained_callback) m_drained_callback(); + call_drained_handler(); setup_read(s); } +void udp_socket::call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size) +{ + for (std::vector::iterator i = m_observers.begin() + , end(m_observers.end()); i != end; ++i) + { + TORRENT_TRY { + + if ((*i)->incoming_packet(ec, ep, buf, size)) + break; + + } TORRENT_CATCH (std::exception&) {} + } +} + +void udp_socket::call_handler(error_code const& ec, const char* host, char const* buf, int size) +{ + for (std::vector::iterator i = m_observers.begin() + , end(m_observers.end()); i != end; ++i) + { + TORRENT_TRY { + + if ((*i)->incoming_packet(ec, host, buf, size)) + break; + + } TORRENT_CATCH (std::exception&) {} + } +} + +void udp_socket::subscribe(udp_socket_observer* o) +{ + TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end()); + m_observers.push_back(o); +} + +void udp_socket::unsubscribe(udp_socket_observer* o) +{ + std::vector::iterator i = std::find(m_observers.begin(), m_observers.end(), o); + if (i == m_observers.end()) return; + m_observers.erase(i); +} + +void udp_socket::call_drained_handler() +{ + for (std::vector::iterator i = m_observers.begin() + , end(m_observers.end()); i != end; ++i) + { + TORRENT_TRY { + (*i)->socket_drained(); + } TORRENT_CATCH (std::exception&) {} + } +} + void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep , error_code const& e, std::size_t bytes_transferred) { @@ -259,16 +284,7 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep if (e) { - TORRENT_TRY { - -#if TORRENT_USE_IPV6 - if (s == &m_ipv6_sock) - m_callback(e, ep, 0, 0); - else -#endif - m_callback(e, ep, 0, 0); - - } TORRENT_CATCH (std::exception&) {} + call_handler(e, ep, 0, 0); // don't stop listening on recoverable errors if (e != asio::error::host_unreachable @@ -283,7 +299,6 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep #endif && e != asio::error::message_size) { - maybe_clear_callback(); return; } @@ -302,7 +317,7 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep } else { - m_callback(e, ep, m_buf, bytes_transferred); + call_handler(e, ep, m_buf, bytes_transferred); } } TORRENT_CATCH (std::exception&) {} @@ -421,11 +436,11 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size) if (len > (buf + size) - p) return; std::string hostname(p, p + len); p += len; - m_callback2(e, hostname.c_str(), p, size - (p - buf)); + call_handler(e, hostname.c_str(), p, size - (p - buf)); return; } - m_callback(e, sender, p, size - (p - buf)); + call_handler(e, sender, p, size - (p - buf)); } #if !defined BOOST_ASIO_ENABLE_CANCELIO && defined TORRENT_WINDOWS @@ -472,14 +487,9 @@ void udp_socket::close() // ops counter for that TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; } - maybe_clear_callback(); } void udp_socket::set_buf_size(int s) @@ -503,7 +513,7 @@ void udp_socket::set_buf_size(int s) m_buf = 0; m_buf_size = 0; udp::endpoint ep; - if (m_callback) m_callback(error::no_memory, ep, 0, 0); + call_handler(error::no_memory, ep, 0, 0); close(); } } @@ -625,11 +635,7 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i) TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e == asio::error::operation_aborted) return; @@ -638,9 +644,7 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i) if (e) { - TORRENT_TRY { - if (m_callback) m_callback(e, udp::endpoint(), 0, 0); - } TORRENT_CATCH (std::exception&) {} + call_handler(e, udp::endpoint(), 0, 0); return; } @@ -662,11 +666,7 @@ void udp_socket::on_timeout() { TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); @@ -680,11 +680,6 @@ void udp_socket::on_connect(int ticket) TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } CHECK_MAGIC; if (m_abort) return; @@ -715,11 +710,7 @@ void udp_socket::on_connected(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; @@ -734,17 +725,11 @@ void udp_socket::on_connected(error_code const& e) // ops counter for that TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; if (e) { - TORRENT_TRY { - if (m_callback) m_callback(e, udp::endpoint(), 0, 0); - } TORRENT_CATCH (std::exception&) {} + call_handler(e, udp::endpoint(), 0, 0); return; } @@ -781,11 +766,7 @@ void udp_socket::handshake1(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e) return; @@ -807,11 +788,7 @@ void udp_socket::handshake2(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e) return; @@ -869,11 +846,7 @@ void udp_socket::handshake3(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e) return; @@ -895,11 +868,7 @@ void udp_socket::handshake4(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e) return; @@ -958,11 +927,7 @@ void udp_socket::connect1(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; if (e) return; @@ -987,7 +952,6 @@ void udp_socket::connect2(error_code const& e) if (m_abort) { m_queue.clear(); - maybe_clear_callback(); return; } @@ -1062,11 +1026,7 @@ void udp_socket::hung_up(error_code const& e) #endif TORRENT_ASSERT(m_outstanding_ops > 0); --m_outstanding_ops; - if (m_abort) - { - maybe_clear_callback(); - return; - } + if (m_abort) return; CHECK_MAGIC; TORRENT_ASSERT(is_single_thread()); @@ -1078,11 +1038,8 @@ void udp_socket::hung_up(error_code const& e) } rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios - , callback_t const& c - , callback2_t const& c2 - , drain_callback_t const& dc , connection_queue& cc) - : udp_socket(ios, c, c2, dc, cc) + : udp_socket(ios, cc) , m_timer(ios) , m_queue_size_limit(200) , m_rate_limit(4000) diff --git a/src/utp_socket_manager.cpp b/src/utp_socket_manager.cpp index 3b5a6efd1..729056ddb 100644 --- a/src/utp_socket_manager.cpp +++ b/src/utp_socket_manager.cpp @@ -199,7 +199,8 @@ namespace libtorrent return m_sock.local_endpoint(ec); } - bool utp_socket_manager::incoming_packet(char const* p, int size, udp::endpoint const& ep) + bool utp_socket_manager::incoming_packet(error_code const& ec, udp::endpoint const& ep + , char const* p, int size) { // UTP_LOGV("incoming packet size:%d\n", size);