From f14c84b01e1b37938277b4c65d6fc67719694222 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Mon, 29 Jan 2007 07:39:33 +0000 Subject: [PATCH] fixes in DHT to make it work when the number of transaction slots are full (very rare case). Added a feature to (optionally) resolve the countries of peers and updated the docs (see peer_info::country). --- ChangeLog | 2 + docs/manual.html | 276 +++++++-------- docs/manual.rst | 47 +-- examples/client_test.cpp | 15 +- include/libtorrent/bandwidth_manager.hpp | 6 + include/libtorrent/bt_peer_connection.hpp | 2 + include/libtorrent/kademlia/refresh.hpp | 4 +- include/libtorrent/kademlia/rpc_manager.hpp | 11 +- .../kademlia/traversal_algorithm.hpp | 3 +- include/libtorrent/peer_connection.hpp | 18 + include/libtorrent/peer_info.hpp | 6 + include/libtorrent/torrent.hpp | 20 +- include/libtorrent/torrent_handle.hpp | 3 + include/libtorrent/web_peer_connection.hpp | 3 +- src/bandwidth_manager.cpp | 13 +- src/bt_peer_connection.cpp | 13 +- src/kademlia/closest_nodes.cpp | 19 +- src/kademlia/find_data.cpp | 16 + src/kademlia/node.cpp | 3 + src/kademlia/refresh.cpp | 74 +++- src/kademlia/rpc_manager.cpp | 123 ++++--- src/kademlia/traversal_algorithm.cpp | 46 ++- src/peer_connection.cpp | 2 + src/torrent.cpp | 318 ++++++++++++++++++ src/torrent_handle.cpp | 16 + src/web_peer_connection.cpp | 8 + 26 files changed, 823 insertions(+), 244 deletions(-) diff --git a/ChangeLog b/ChangeLog index d155b87a3..438f50287 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,5 @@ + * Added the possibility to have libtorrent resolve the countries of + the peers in torrents. * Improved the bandwidth limiter (it now implements a leaky bucket). * Improved the HTTP seed downloader to report accurate progress. * Added more client peer-id signatures to be recognized. diff --git a/docs/manual.html b/docs/manual.html index 379a64038..c5fb04aab 100755 --- a/docs/manual.html +++ b/docs/manual.html @@ -24,143 +24,144 @@

Table of contents

@@ -1247,6 +1248,9 @@ struct torrent_handle bool is_paused() const; bool is_seed() const; + void resolve_countries(bool r); + bool resolve_countries() const; + void filter_piece(int index, bool filter) const; void filter_pieces(std::vector<bool> const& bitmask) const; bool is_piece_filtered(int index) const; @@ -1423,6 +1427,19 @@ is currently paused. Torrents may be paused automatically if there is a file err or something similar. See file_error_alert.

+

resolve_countries()

+
+
+void resolve_countries(bool r);
+bool resolve_countries() const;
+
+
+

Sets or gets the flag that derermines if countries should be resolved for the peers of this +torrent. It defaults to false. If it is set to true, the peer_info structure for the peers +in this torrent will have their country member set. See peer_info for more information +on how to interpret this field.

+
+

is_seed()

@@ -1837,6 +1854,8 @@ struct peer_info
         int upload_limit;
         int download_limit;
 
+        char country[2];
+
         size_type load_balancing;
 
         int download_queue_length;
@@ -1925,6 +1944,12 @@ peer every second. It may be -1 if there's no local limit on the peer. The globa
 limit and the torrent limit is always enforced anyway.

download_limit is the number of bytes per second this peer is allowed to receive. -1 means it's unlimited.

+

country is the two letter ISO 3166 country code for the country the peer +is connected from. If the country hasn't been resolved yet, both chars are set +to 0. If the resolution failed for some reason, the field is set to "--". If the +resolution service returns an invalid country code, it is set to "!!". +The countries.nerd.dk service is used to look up countries. This field will +remain set to 0 unless the torrent is set to resolve countries, see resolve_countries().

load_balancing is a measurement of the balancing of free download (that we get) and free upload that we give. Every peer gets a certain amount of free upload, but this member says how much extra free upload this peer has got. If it is a negative @@ -2088,7 +2113,7 @@ public:

-

ip_filter()

+

ip_filter()

 ip_filter()
@@ -2975,25 +3000,6 @@ length-prefix, message-id nor extension-id).

handshake, it may be incompatible with future versions of the mainline bittorrent client.

These are the extensions that are currently implemented.

-

metadata from peers

Extension name: "LT_metadata"

diff --git a/docs/manual.rst b/docs/manual.rst index 70735f10d..efa2757ca 100755 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -1194,6 +1194,9 @@ Its declaration looks like this:: bool is_paused() const; bool is_seed() const; + void resolve_countries(bool r); + bool resolve_countries() const; + void filter_piece(int index, bool filter) const; void filter_pieces(std::vector const& bitmask) const; bool is_piece_filtered(int index) const; @@ -1379,6 +1382,19 @@ all potential (not connected) peers. You can use ``is_paused()`` to determine if is currently paused. Torrents may be paused automatically if there is a file error (e.g. disk full) or something similar. See file_error_alert_. +resolve_countries() +------------------- + + :: + + void resolve_countries(bool r); + bool resolve_countries() const; + +Sets or gets the flag that derermines if countries should be resolved for the peers of this +torrent. It defaults to false. If it is set to true, the peer_info_ structure for the peers +in this torrent will have their ``country`` member set. See peer_info_ for more information +on how to interpret this field. + is_seed() --------- @@ -1820,6 +1836,8 @@ It contains the following fields:: int upload_limit; int download_limit; + char country[2]; + size_type load_balancing; int download_queue_length; @@ -1906,6 +1924,15 @@ limit and the torrent limit is always enforced anyway. ``download_limit`` is the number of bytes per second this peer is allowed to receive. -1 means it's unlimited. +``country`` is the two letter `ISO 3166 country code`__ for the country the peer +is connected from. If the country hasn't been resolved yet, both chars are set +to 0. If the resolution failed for some reason, the field is set to "--". If the +resolution service returns an invalid country code, it is set to "!!". +The ``countries.nerd.dk`` service is used to look up countries. This field will +remain set to 0 unless the torrent is set to resolve countries, see `resolve_countries()`_. + +__ http://www.iso.org/iso/en/prods-services/iso3166ma/02iso-3166-code-lists/list-en1.html + ``load_balancing`` is a measurement of the balancing of free download (that we get) and free upload that we give. Every peer gets a certain amount of free upload, but this member says how much *extra* free upload this peer has got. If it is a negative @@ -3066,26 +3093,6 @@ bittorrent client. These are the extensions that are currently implemented. -.. chat messages - ------------- - - Extension name: "chat" - - The payload in the packet is a bencoded dictionary with any - combination of the following entries: - - +----------+--------------------------------------------------------+ - | "msg" | This is a string that contains a message that | - | | should be displayed to the user. | - +----------+--------------------------------------------------------+ - | "ctrl" | This is a control string that can tell a client that | - | | it is ignored (to make the user aware of that) and | - | | it can also tell a client that it is no longer ignored.| - | | These notifications are encoded as the strings: | - | | "ignored" and "not ignored". | - | | Any unrecognized strings should be ignored. | - +----------+--------------------------------------------------------+ - metadata from peers ------------------- diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 876919ea8..94d085246 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -243,8 +243,9 @@ int peer_index(libtorrent::tcp::endpoint addr, std::vector const& peers) { using namespace libtorrent; - - out << " down (total) up (total) q r flags block progress client \n"; +#ifndef ANSI_TERMINAL_COLORS + out << " down (total) up (total) q r flags block progress country client \n"; +#endif for (std::vector::const_iterator i = peers.begin(); i != peers.end(); ++i) @@ -274,6 +275,15 @@ void print_peer_info(std::ostream& out, std::vector const out << progress_bar(0.f, 15); } + if (i->country[0] == 0) + { + out << " .."; + } + else + { + out << " " << i->country[0] << i->country[1]; + } + if (i->flags & peer_info::handshake) { out << esc("31") << " waiting for handshake" << esc("0") << "\n"; @@ -350,6 +360,7 @@ void add_torrent(libtorrent::session& ses h.set_max_uploads(-1); h.set_ratio(preferred_ratio); h.set_sequenced_download_threshold(15); + h.resolve_countries(true); } catch (std::exception&) {}; diff --git a/include/libtorrent/bandwidth_manager.hpp b/include/libtorrent/bandwidth_manager.hpp index bbf45e3f3..9b9258d5b 100644 --- a/include/libtorrent/bandwidth_manager.hpp +++ b/include/libtorrent/bandwidth_manager.hpp @@ -39,6 +39,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include namespace pt = boost::posix_time; @@ -157,12 +158,14 @@ struct bandwidth_manager void throttle(int limit) { + mutex_t::scoped_lock l(m_mutex); assert(limit >= 0); m_limit = limit; } int throttle() const { + mutex_t::scoped_lock l(m_mutex); return m_limit; } @@ -181,6 +184,9 @@ private: void on_history_expire(asio::error_code const& e); void hand_out_bandwidth(); + typedef boost::mutex mutex_t; + mutable mutex_t m_mutex; + // the io_service used for the timer io_service& m_ios; diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index 78ac398ae..f68a5d1fa 100755 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -130,6 +130,8 @@ namespace libtorrent , std::size_t bytes_transferred); virtual void get_peer_info(peer_info& p) const; + virtual bool in_handshake() const; + #ifndef TORRENT_DISABLE_EXTENSIONS bool support_extensions() const { return m_supports_extensions; } diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index f47b80462..7231b26f2 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -69,7 +69,7 @@ public: ); void ping_reply(node_id id); - void ping_timeout(node_id id); + void ping_timeout(node_id id, bool prevent_request = false); private: template @@ -88,7 +88,7 @@ private: void done(); void invoke(node_id const& id, udp::endpoint addr); - void invoke_pings_or_finish(); + void invoke_pings_or_finish(bool prevent_request = false); int m_max_active_pings; int m_active_pings; diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 5aa850e95..2603071fc 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -128,6 +128,12 @@ struct observer : boost::noncopyable // this is called when no reply has been received within // some timeout virtual void timeout() = 0; + + // if this is called the destructor should + // not invoke any new messages, and should + // only clean up. It means the rpc-manager + // is being destructed + virtual void abort() = 0; udp::endpoint target_addr; boost::posix_time::ptime sent; @@ -162,7 +168,8 @@ public: private: enum { max_transactions = 2048 }; - unsigned int new_transaction_id(); + + unsigned int new_transaction_id(boost::shared_ptr o); void update_oldest_transaction_id(); boost::uint32_t calc_connection_id(udp::endpoint addr); @@ -170,6 +177,7 @@ private: typedef boost::array, max_transactions> transactions_t; transactions_t m_transactions; + std::vector > m_aborted_transactions; // this is the next transaction id to be used int m_next_transaction_id; @@ -185,6 +193,7 @@ private: routing_table& m_table; boost::posix_time::ptime m_timer; node_id m_random_number; + bool m_destructing; }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index a99a9c6ea..6fa647ba4 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -58,7 +58,7 @@ class traversal_algorithm : boost::noncopyable public: void traverse(node_id const& id, udp::endpoint addr); void finished(node_id const& id); - void failed(node_id const& id); + void failed(node_id const& id, bool prevent_request = false); virtual ~traversal_algorithm() {} protected: @@ -73,7 +73,6 @@ protected: , InIt end ); - void add_request(node_id const& id, udp::endpoint addr); void add_requests(); void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags); diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 44c290d53..d4677546d 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -308,6 +308,10 @@ namespace libtorrent virtual void get_peer_info(peer_info& p) const = 0; + // is true until we can be sure that the other end + // speaks our protocol (be it bittorrent or http). + virtual bool in_handshake() const = 0; + // returns the block currently being // downloaded. And the progress of that // block. If the peer isn't downloading @@ -326,6 +330,14 @@ namespace libtorrent buffer::interval allocate_send_buffer(int size); void setup_send(); + void set_country(char const* c) + { + assert(strlen(c) == 2); + m_country[0] = c[0]; + m_country[1] = c[1]; + } + bool has_country() const { return m_country[0] != 0; } + protected: virtual void write_choke() = 0; @@ -414,6 +426,12 @@ namespace libtorrent extension_list_t m_extensions; #endif + // in case the session settings is set + // to resolve countries, this is set to + // the two character country code this + // peer resides in. + char m_country[2]; + private: void fill_send_buffer(); diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index 7a6e50834..c293fd345 100755 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -69,6 +69,12 @@ namespace libtorrent bool seed; // true if this is a seed int upload_limit; int download_limit; + + // in case the session settings is set + // to resolve countries, this is set to + // the two character country code this + // peer resides in. + char country[2]; size_type load_balancing; diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index efde693a4..4bdfab0e7 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -190,6 +190,11 @@ namespace libtorrent float ratio() const { return m_ratio; } + void resolve_countries(bool r) + { m_resolve_countries = r; } + + bool resolving_countries() const { return m_resolve_countries; } + // -------------------------------------------- // BANDWIDTH MANAGEMENT @@ -242,6 +247,7 @@ namespace libtorrent peer_iterator begin() { return m_connections.begin(); } peer_iterator end() { return m_connections.end(); } + void resolve_peer_country(boost::intrusive_ptr const& p) const; // -------------------------------------------- // TRACKER MANAGEMENT @@ -457,6 +463,8 @@ namespace libtorrent void try_next_tracker(); int prioritize_tracker(int tracker_index); + void on_country_lookup(asio::error_code const& error, tcp::resolver::iterator i + , boost::intrusive_ptr p) const; torrent_info m_torrent_file; @@ -516,8 +524,18 @@ namespace libtorrent std::set m_resolving_web_seeds; // used to resolve the names of web seeds - tcp::resolver m_host_resolver; + mutable tcp::resolver m_host_resolver; + // this is true while there is a country + // resolution in progress. To avoid flodding + // the DNS request queue, only one ip is reolved + // at a time. + mutable bool m_resolving_country; + + // this is true if the user has enabled + // country resolution in this torrent + bool m_resolve_countries; + #ifndef TORRENT_DISABLE_DHT static void on_dht_announce_response_disp(boost::weak_ptr t , std::vector const& peers); diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index 9a57775ec..92dbdf34b 100755 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -243,6 +243,9 @@ namespace libtorrent bool is_paused() const; void pause() const; void resume() const; + + void resolve_countries(bool r); + bool resolve_countries() const; // marks the piece with the given index as filtered // it will not be downloaded diff --git a/include/libtorrent/web_peer_connection.hpp b/include/libtorrent/web_peer_connection.hpp index cef9bd2cf..b1fcbbea2 100755 --- a/include/libtorrent/web_peer_connection.hpp +++ b/include/libtorrent/web_peer_connection.hpp @@ -112,7 +112,8 @@ namespace libtorrent std::string const& url() const { return m_url; } virtual void get_peer_info(peer_info& p) const; - + virtual bool in_handshake() const; + // the following functions appends messages // to the send buffer void write_choke() {} diff --git a/src/bandwidth_manager.cpp b/src/bandwidth_manager.cpp index 94ca22eff..77b0c3de1 100644 --- a/src/bandwidth_manager.cpp +++ b/src/bandwidth_manager.cpp @@ -60,6 +60,9 @@ namespace libtorrent void bandwidth_manager::request_bandwidth(intrusive_ptr peer) { INVARIANT_CHECK; + + mutex_t::scoped_lock l(m_mutex); + // make sure this peer isn't already in line // waiting for bandwidth #ifndef NDEBUG @@ -115,6 +118,8 @@ namespace libtorrent if (e) return; + mutex_t::scoped_lock l(m_mutex); + #if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING // (*m_ses->m_logger) << "bw expire [" << m_channel << "]\n"; #endif @@ -187,6 +192,11 @@ namespace libtorrent // send. If the peer was added to the queue while the data was // still being sent, max_assignable may have been > 0 at that time. int max_assignable = peer->max_assignable_bandwidth(m_channel); + if (max_assignable == 0) + { + t->expire_bandwidth(m_channel, -1); + continue; + } // so, hand out max_assignable, but no more than // the available bandwidth (amount) and no more @@ -194,8 +204,9 @@ namespace libtorrent int single_amount = std::min(amount , std::min(bandwidth_block_size_limit , max_assignable)); + assert(single_amount > 0); amount -= single_amount; - if (single_amount > 0) peer->assign_bandwidth(m_channel, single_amount); + peer->assign_bandwidth(m_channel, single_amount); t->assign_bandwidth(m_channel, single_amount); add_history_entry(history_entry(peer, t, single_amount, now + window_size)); } diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index ab9f9ec8c..51e0ce2ae 100755 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -182,6 +182,9 @@ namespace libtorrent p.payload_up_speed = statistics().upload_payload_rate(); p.pid = pid(); p.ip = remote(); + + p.country[0] = m_country[0]; + p.country[1] = m_country[1]; p.total_download = statistics().total_payload_download(); p.total_upload = statistics().total_payload_upload(); @@ -234,6 +237,11 @@ namespace libtorrent p.client = m_client_version; p.connection_type = peer_info::standard_bittorrent; } + + bool bt_peer_connection::in_handshake() const + { + return m_state < read_packet_size; + } void bt_peer_connection::write_handshake() { @@ -1146,8 +1154,9 @@ namespace libtorrent , recv_buffer.end) << "'\n"; #endif const char protocol_string[] = "BitTorrent protocol"; - if (!std::equal(recv_buffer.begin, recv_buffer.end - , protocol_string)) + if (recv_buffer.end - recv_buffer.begin != 19 + || !std::equal(recv_buffer.begin, recv_buffer.end + , protocol_string)) { const char cmd[] = "version"; if (recv_buffer.end - recv_buffer.begin == 7 && std::equal( diff --git a/src/kademlia/closest_nodes.cpp b/src/kademlia/closest_nodes.cpp index 8d0ccea87..e8bb9781c 100644 --- a/src/kademlia/closest_nodes.cpp +++ b/src/kademlia/closest_nodes.cpp @@ -47,12 +47,12 @@ public: closest_nodes_observer( boost::intrusive_ptr const& algorithm , node_id self - , node_id target - ) + , node_id target) : m_algorithm(algorithm) , m_target(target) , m_self(self) {} + ~closest_nodes_observer(); void send(msg& p) { @@ -61,6 +61,7 @@ public: void timeout(); void reply(msg const&); + void abort() { m_algorithm = 0; } private: boost::intrusive_ptr m_algorithm; @@ -68,8 +69,19 @@ private: node_id const m_self; }; +closest_nodes_observer::~closest_nodes_observer() +{ + if (m_algorithm) m_algorithm->failed(m_self, true); +} + void closest_nodes_observer::reply(msg const& in) { + if (!m_algorithm) + { + assert(false); + return; + } + if (!in.nodes.empty()) { for (msg::nodes_t::const_iterator i = in.nodes.begin() @@ -79,11 +91,14 @@ void closest_nodes_observer::reply(msg const& in) } } m_algorithm->finished(m_self); + m_algorithm = 0; } void closest_nodes_observer::timeout() { + if (!m_algorithm) return; m_algorithm->failed(m_self); + m_algorithm = 0; } diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index e1e09925b..9fe787807 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -51,6 +51,7 @@ public: , m_target(target) , m_self(self) {} + ~find_data_observer(); void send(msg& m) { @@ -61,6 +62,7 @@ public: void timeout(); void reply(msg const&); + void abort() { m_algorithm = 0; } private: boost::intrusive_ptr m_algorithm; @@ -68,8 +70,19 @@ private: node_id const m_self; }; +find_data_observer::~find_data_observer() +{ + if (m_algorithm) m_algorithm->failed(m_self); +} + void find_data_observer::reply(msg const& m) { + if (!m_algorithm) + { + assert(false); + return; + } + if (!m.peers.empty()) { m_algorithm->got_data(&m); @@ -83,11 +96,14 @@ void find_data_observer::reply(msg const& m) } } m_algorithm->finished(m_self); + m_algorithm = 0; } void find_data_observer::timeout() { + if (!m_algorithm) return; m_algorithm->failed(m_self); + m_algorithm = 0; } diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 455d96977..07da958bb 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -278,6 +278,7 @@ namespace void timeout() {} void reply(msg const&) {} + void abort() {} private: sha1_hash m_info_hash; @@ -311,6 +312,7 @@ namespace new announce_observer(m_info_hash, m_listen_port, r.write_token))); m_fun(r.peers, m_info_hash); } + void abort() {} private: sha1_hash m_info_hash; @@ -344,6 +346,7 @@ namespace virtual void reply(msg const&) {} virtual void timeout() {} virtual void send(msg&) {} + virtual void abort() {} }; } diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 37300d16e..ccd753de9 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -64,6 +64,7 @@ public: , m_self(self) , m_algorithm(algorithm) {} + ~refresh_observer(); void send(msg& m) { @@ -72,6 +73,8 @@ public: void timeout(); void reply(msg const& m); + void abort() { m_algorithm = 0; } + private: node_id const m_target; @@ -79,8 +82,15 @@ private: boost::intrusive_ptr m_algorithm; }; +refresh_observer::~refresh_observer() +{ + if (m_algorithm) m_algorithm->failed(m_self, true); +} + void refresh_observer::reply(msg const& in) { + if (!m_algorithm) return; + if (!in.nodes.empty()) { for (msg::nodes_t::const_iterator i = in.nodes.begin() @@ -90,11 +100,14 @@ void refresh_observer::reply(msg const& in) } } m_algorithm->finished(m_self); + m_algorithm = 0; } void refresh_observer::timeout() { + if (!m_algorithm) return; m_algorithm->failed(m_self); + m_algorithm = 0; } class ping_observer : public observer @@ -107,24 +120,37 @@ public: : m_self(self) , m_algorithm(algorithm) {} + ~ping_observer(); void send(msg& p) {} void timeout(); void reply(msg const& m); + void abort() { m_algorithm = 0; } + private: node_id const m_self; boost::intrusive_ptr m_algorithm; }; +ping_observer::~ping_observer() +{ + if (m_algorithm) m_algorithm->ping_timeout(m_self, true); +} + void ping_observer::reply(msg const& m) { + if (!m_algorithm) return; + m_algorithm->ping_reply(m_self); + m_algorithm = 0; } void ping_observer::timeout() { + if (!m_algorithm) return; m_algorithm->ping_timeout(m_self); + m_algorithm = 0; } void refresh::invoke(node_id const& nid, udp::endpoint addr) @@ -152,32 +178,44 @@ void refresh::ping_reply(node_id nid) invoke_pings_or_finish(); } -void refresh::ping_timeout(node_id nid) +void refresh::ping_timeout(node_id nid, bool prevent_request) { m_active_pings--; - invoke_pings_or_finish(); + invoke_pings_or_finish(prevent_request); } -void refresh::invoke_pings_or_finish() +void refresh::invoke_pings_or_finish(bool prevent_request) { - while (m_active_pings < m_max_active_pings) + if (prevent_request) { - if (m_leftover_nodes_iterator == m_results.end()) break; - - result const& node = *m_leftover_nodes_iterator; - - // Skip initial nodes - if (node.flags & result::initial) + --m_max_active_pings; + if (m_max_active_pings <= 0) + m_max_active_pings = 1; + } + else + { + while (m_active_pings < m_max_active_pings) { - ++m_leftover_nodes_iterator; - continue; + if (m_leftover_nodes_iterator == m_results.end()) break; + + result const& node = *m_leftover_nodes_iterator; + + // Skip initial nodes + if (node.flags & result::initial) + { + ++m_leftover_nodes_iterator; + continue; + } + + try + { + observer_ptr p(new ping_observer(this, node.id)); + m_rpc.invoke(messages::ping, node.addr, p); + ++m_active_pings; + ++m_leftover_nodes_iterator; + } + catch (std::exception& e) {} } - - observer_ptr p(new ping_observer(this, node.id)); - - m_rpc.invoke(messages::ping, node.addr, p); - ++m_active_pings; - ++m_leftover_nodes_iterator; } if (m_active_pings == 0) diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index 3883b04ea..d0e0f0c54 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -72,15 +72,25 @@ rpc_manager::rpc_manager(fun const& f, node_id const& our_id , m_table(table) , m_timer(boost::posix_time::microsec_clock::universal_time()) , m_random_number(generate_id()) + , m_destructing(false) { std::srand(time(0)); } rpc_manager::~rpc_manager() { + m_destructing = true; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Destructing"; #endif + std::for_each(m_aborted_transactions.begin(), m_aborted_transactions.end() + , bind(&observer::abort, _1)); + + for (transactions_t::iterator i = m_transactions.begin() + , end(m_transactions.end()); i != end; ++i) + { + if (*i) (*i)->abort(); + } } #ifndef NDEBUG @@ -104,6 +114,8 @@ bool rpc_manager::incoming(msg const& m) { INVARIANT_CHECK; + if (m_destructing) return false; + if (m.reply) { // if we don't have the transaction id in our @@ -195,6 +207,8 @@ time_duration rpc_manager::tick() if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); + std::vector > timeouts; + for (;m_next_transaction_id != m_oldest_transaction_id; m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) { @@ -215,24 +229,39 @@ time_duration rpc_manager::tick() try { m_transactions[m_oldest_transaction_id].reset(); - o->timeout(); + timeouts.push_back(o); } catch (std::exception) {} } + + check_invariant(); + + std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1)); + timeouts.clear(); + + // clear the aborted transactions, will likely + // generate new requests. We need to swap, since the + // destrutors may add more observers to the m_aborted_transactions + std::vector >().swap(m_aborted_transactions); return milliseconds(timeout_ms); } -unsigned int rpc_manager::new_transaction_id() +unsigned int rpc_manager::new_transaction_id(shared_ptr o) { INVARIANT_CHECK; unsigned int tid = m_next_transaction_id; m_next_transaction_id = (m_next_transaction_id + 1) % max_transactions; -// boost::shared_ptr o = m_transactions[m_next_transaction_id]; if (m_transactions[m_next_transaction_id]) { + // moving the observer into the set of aborted transactions + // it will prevent it from spawning new requests right now, + // since that would break the invariant + m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]); m_transactions[m_next_transaction_id].reset(); assert(m_oldest_transaction_id == m_next_transaction_id); } + assert(!m_transactions[tid]); + m_transactions[tid] = o; if (m_oldest_transaction_id == m_next_transaction_id) { m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions; @@ -243,21 +272,6 @@ unsigned int rpc_manager::new_transaction_id() update_oldest_transaction_id(); } -#ifndef NDEBUG - assert(!m_transactions[m_next_transaction_id]); - for (int i = (m_next_transaction_id + 1) % max_transactions; - i != m_oldest_transaction_id; i = (i + 1) % max_transactions) - { - assert(!m_transactions[i]); - } -#endif - -// hopefully this wouldn't happen, but unfortunately, the -// traversal algorithm will simply fail in case its connections -// are overwritten. If timeout() is called, it will likely spawn -// another connection, which in turn will close the next one -// and so on. -// if (o) o->timeout(); return tid; } @@ -280,33 +294,48 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr { INVARIANT_CHECK; + if (m_destructing) + { + o->abort(); + return; + } + msg m; m.message_id = message_id; m.reply = false; m.id = m_our_id; m.addr = target_addr; - int tid = new_transaction_id(); - m.transaction_id.clear(); - std::back_insert_iterator out(m.transaction_id); - io::write_uint16(tid, out); - - o->send(m); + assert(!m_transactions[m_next_transaction_id]); + try + { + m.transaction_id.clear(); + std::back_insert_iterator out(m.transaction_id); + io::write_uint16(m_next_transaction_id, out); + + o->send(m); - m_transactions[tid] = o; - o->sent = boost::posix_time::microsec_clock::universal_time(); - o->target_addr = target_addr; + o->sent = boost::posix_time::microsec_clock::universal_time(); + o->target_addr = target_addr; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id] - << " -> " << target_addr; -#endif - m_send(m); + #ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id] + << " -> " << target_addr; + #endif + m_send(m); + new_transaction_id(o); + } + catch (std::exception&) + { + assert(false); + } } void rpc_manager::reply(msg& m, msg const& reply_to) { INVARIANT_CHECK; + if (m_destructing) return; + if (m.message_id != messages::error) m.message_id = reply_to.message_id; m.addr = reply_to.addr; @@ -325,6 +354,7 @@ namespace virtual void reply(msg const&) {} virtual void timeout() {} virtual void send(msg&) {} + void abort() {} }; } @@ -332,6 +362,8 @@ void rpc_manager::reply_with_ping(msg& m, msg const& reply_to) { INVARIANT_CHECK; + if (m_destructing) return; + if (m.message_id != messages::error) m.message_id = reply_to.message_id; m.addr = reply_to.addr; @@ -340,17 +372,24 @@ void rpc_manager::reply_with_ping(msg& m, msg const& reply_to) m.id = m_our_id; m.transaction_id = reply_to.transaction_id; - int ptid = new_transaction_id(); - m.ping_transaction_id.clear(); - std::back_insert_iterator out(m.ping_transaction_id); - io::write_uint16(ptid, out); + try + { + m.ping_transaction_id.clear(); + std::back_insert_iterator out(m.ping_transaction_id); + io::write_uint16(m_next_transaction_id, out); - boost::shared_ptr o(new dummy_observer); - m_transactions[ptid] = o; - o->sent = boost::posix_time::microsec_clock::universal_time(); - o->target_addr = m.addr; - - m_send(m); + boost::shared_ptr o(new dummy_observer); + assert(!m_transactions[m_next_transaction_id]); + o->sent = boost::posix_time::microsec_clock::universal_time(); + o->target_addr = m.addr; + + m_send(m); + new_transaction_id(o); + } + catch (std::exception& e) + { + assert(false); + } } diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index 88c62f836..1efe76e77 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -65,6 +65,9 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig if (i == m_results.end() || i->id != id) { + assert(std::find_if(m_results.begin(), m_results.end() + , bind(std::equal_to() + , bind(&result::id, _1), id)) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(traversal) << "adding result: " << id << " " << addr; #endif @@ -84,7 +87,10 @@ void traversal_algorithm::finished(node_id const& id) if (m_invoke_count == 0) done(); } -void traversal_algorithm::failed(node_id const& id) +// prevent request means that the total number of requests has +// overflown. This query failed because it was the oldest one. +// So, if this is true, don't make another request +void traversal_algorithm::failed(node_id const& id, bool prevent_request) { m_invoke_count--; @@ -100,23 +106,28 @@ void traversal_algorithm::failed(node_id const& id) assert(i != m_results.end()); - assert(i->flags & result::queried); - m_failed.insert(i->addr); + if (i != m_results.end()) + { + assert(i->flags & result::queried); + m_failed.insert(i->addr); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; + TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; #endif - m_results.erase(i); - m_table.node_failed(id); + m_results.erase(i); + } + if (prevent_request) + { + --m_branch_factor; + if (m_branch_factor <= 0) m_branch_factor = 1; + } + else + { + m_table.node_failed(id); + } add_requests(); if (m_invoke_count == 0) done(); } -void traversal_algorithm::add_request(node_id const& id, udp::endpoint addr) -{ - invoke(id, addr); - ++m_invoke_count; -} - namespace { bool bitwise_nand(unsigned char lhs, unsigned char rhs) @@ -128,7 +139,7 @@ namespace void traversal_algorithm::add_requests() { while (m_invoke_count < m_branch_factor) - { + { // Find the first node that hasn't already been queried. // TODO: Better heuristic std::vector::iterator i = std::find_if( @@ -146,8 +157,13 @@ void traversal_algorithm::add_requests() if (i == last_iterator()) break; - add_request(i->id, i->addr); - i->flags |= result::queried; + try + { + invoke(i->id, i->addr); + ++m_invoke_count; + i->flags |= result::queried; + } + catch (std::exception& e) {} } } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index c1487320e..9ba93d174 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -123,6 +123,7 @@ namespace libtorrent , m_in_constructor(true) #endif { + std::fill(m_country, m_country + 2, 0); #ifdef TORRENT_VERBOSE_LOGGING m_logger = m_ses.create_log(m_remote.address().to_string() + "_" + boost::lexical_cast(m_remote.port()), m_ses.listen_port()); @@ -185,6 +186,7 @@ namespace libtorrent , m_in_constructor(true) #endif { + std::fill(m_country, m_country + 2, 0); m_remote = m_socket->remote_endpoint(); #ifdef TORRENT_VERBOSE_LOGGING diff --git a/src/torrent.cpp b/src/torrent.cpp index 697d175a4..a31f73d32 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -239,6 +239,8 @@ namespace libtorrent , m_complete(-1) , m_incomplete(-1) , m_host_resolver(ses.m_io_service) + , m_resolving_country(false) + , m_resolve_countries(false) #ifndef TORRENT_DISABLE_DHT , m_dht_announce_timer(ses.m_io_service) #endif @@ -326,6 +328,8 @@ namespace libtorrent , m_complete(-1) , m_incomplete(-1) , m_host_resolver(ses.m_io_service) + , m_resolving_country(false) + , m_resolve_countries(false) #ifndef TORRENT_DISABLE_DHT , m_dht_announce_timer(ses.m_io_service) #endif @@ -1187,6 +1191,320 @@ namespace libtorrent } + void torrent::resolve_peer_country(boost::intrusive_ptr const& p) const + { + if (m_resolving_country + || p->has_country() + || p->is_connecting() + || p->is_queued() + || p->in_handshake()) return; + + m_resolving_country = true; + tcp::resolver::query q(boost::lexical_cast(p->remote().address()) + + ".zz.countries.nerd.dk", "0"); + m_host_resolver.async_resolve(q, m_ses.m_strand.wrap( + bind(&torrent::on_country_lookup, shared_from_this(), _1, _2, p))); + } + + namespace + { + typedef std::pair country_entry; + + bool compare_first(country_entry const& lhs, country_entry const& rhs) + { + return lhs.first < rhs.first; + } + } + + void torrent::on_country_lookup(asio::error_code const& error, tcp::resolver::iterator i + , intrusive_ptr p) const + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + + INVARIANT_CHECK; + + m_resolving_country = false; + + // must be ordered in increasing order + country_entry country_map[] = + { + country_entry( 4, "AF") + , country_entry( 8, "AL") + , country_entry( 10, "AQ") + , country_entry( 12, "DZ") + , country_entry( 16, "AS") + , country_entry( 20, "AD") + , country_entry( 24, "AO") + , country_entry( 28, "AG") + , country_entry( 31, "AZ") + , country_entry( 32, "AR") + , country_entry( 36, "AU") + , country_entry( 40, "AT") + , country_entry( 44, "BS") + , country_entry( 48, "BH") + , country_entry( 50, "BD") + , country_entry( 51, "AM") + , country_entry( 52, "BB") + , country_entry( 56, "BE") + , country_entry( 60, "BM") + , country_entry( 64, "BT") + , country_entry( 68, "BO") + , country_entry( 70, "BA") + , country_entry( 72, "BW") + , country_entry( 74, "BV") + , country_entry( 76, "BR") + , country_entry( 84, "BZ") + , country_entry( 86, "IO") + , country_entry( 90, "SB") + , country_entry( 92, "VG") + , country_entry( 96, "BN") + , country_entry(100, "BG") + , country_entry(104, "MM") + , country_entry(108, "BI") + , country_entry(112, "BY") + , country_entry(116, "KH") + , country_entry(120, "CM") + , country_entry(124, "CA") + , country_entry(132, "CV") + , country_entry(136, "KY") + , country_entry(140, "CF") + , country_entry(144, "LK") + , country_entry(148, "TD") + , country_entry(152, "CL") + , country_entry(156, "CN") + , country_entry(158, "TW") + , country_entry(162, "CX") + , country_entry(166, "CC") + , country_entry(170, "CO") + , country_entry(174, "KM") + , country_entry(175, "YT") + , country_entry(178, "CG") + , country_entry(180, "CD") + , country_entry(184, "CK") + , country_entry(188, "CR") + , country_entry(191, "HR") + , country_entry(192, "CU") + , country_entry(203, "CZ") + , country_entry(204, "BJ") + , country_entry(208, "DK") + , country_entry(212, "DM") + , country_entry(214, "DO") + , country_entry(218, "EC") + , country_entry(222, "SV") + , country_entry(226, "GQ") + , country_entry(231, "ET") + , country_entry(232, "ER") + , country_entry(233, "EE") + , country_entry(234, "FO") + , country_entry(238, "FK") + , country_entry(239, "GS") + , country_entry(242, "FJ") + , country_entry(246, "FI") + , country_entry(248, "AX") + , country_entry(250, "FR") + , country_entry(254, "GF") + , country_entry(258, "PF") + , country_entry(260, "TF") + , country_entry(262, "DJ") + , country_entry(266, "GA") + , country_entry(268, "GE") + , country_entry(270, "GM") + , country_entry(275, "PS") + , country_entry(276, "DE") + , country_entry(288, "GH") + , country_entry(292, "GI") + , country_entry(296, "KI") + , country_entry(300, "GR") + , country_entry(304, "GL") + , country_entry(308, "GD") + , country_entry(312, "GP") + , country_entry(316, "GU") + , country_entry(320, "GT") + , country_entry(324, "GN") + , country_entry(328, "GY") + , country_entry(332, "HT") + , country_entry(334, "HM") + , country_entry(336, "VA") + , country_entry(340, "HN") + , country_entry(344, "HK") + , country_entry(348, "HU") + , country_entry(352, "IS") + , country_entry(356, "IN") + , country_entry(360, "ID") + , country_entry(364, "IR") + , country_entry(368, "IQ") + , country_entry(372, "IE") + , country_entry(376, "IL") + , country_entry(380, "IT") + , country_entry(384, "CI") + , country_entry(388, "JM") + , country_entry(392, "JP") + , country_entry(398, "KZ") + , country_entry(400, "JO") + , country_entry(404, "KE") + , country_entry(408, "KP") + , country_entry(410, "KR") + , country_entry(414, "KW") + , country_entry(417, "KG") + , country_entry(418, "LA") + , country_entry(422, "LB") + , country_entry(426, "LS") + , country_entry(428, "LV") + , country_entry(430, "LR") + , country_entry(434, "LY") + , country_entry(438, "LI") + , country_entry(440, "LT") + , country_entry(442, "LU") + , country_entry(446, "MO") + , country_entry(450, "MG") + , country_entry(454, "MW") + , country_entry(458, "MY") + , country_entry(462, "MV") + , country_entry(466, "ML") + , country_entry(470, "MT") + , country_entry(474, "MQ") + , country_entry(478, "MR") + , country_entry(480, "MU") + , country_entry(484, "MX") + , country_entry(492, "MC") + , country_entry(496, "MN") + , country_entry(498, "MD") + , country_entry(500, "MS") + , country_entry(504, "MA") + , country_entry(508, "MZ") + , country_entry(512, "OM") + , country_entry(516, "NA") + , country_entry(520, "NR") + , country_entry(524, "NP") + , country_entry(528, "NL") + , country_entry(530, "AN") + , country_entry(533, "AW") + , country_entry(540, "NC") + , country_entry(548, "VU") + , country_entry(554, "NZ") + , country_entry(558, "NI") + , country_entry(562, "NE") + , country_entry(566, "NG") + , country_entry(570, "NU") + , country_entry(574, "NF") + , country_entry(578, "NO") + , country_entry(580, "MP") + , country_entry(581, "UM") + , country_entry(583, "FM") + , country_entry(584, "MH") + , country_entry(585, "PW") + , country_entry(586, "PK") + , country_entry(591, "PA") + , country_entry(598, "PG") + , country_entry(600, "PY") + , country_entry(604, "PE") + , country_entry(608, "PH") + , country_entry(612, "PN") + , country_entry(616, "PL") + , country_entry(620, "PT") + , country_entry(624, "GW") + , country_entry(626, "TL") + , country_entry(630, "PR") + , country_entry(634, "QA") + , country_entry(634, "QA") + , country_entry(638, "RE") + , country_entry(642, "RO") + , country_entry(643, "RU") + , country_entry(646, "RW") + , country_entry(654, "SH") + , country_entry(659, "KN") + , country_entry(660, "AI") + , country_entry(662, "LC") + , country_entry(666, "PM") + , country_entry(670, "VC") + , country_entry(674, "SM") + , country_entry(678, "ST") + , country_entry(682, "SA") + , country_entry(686, "SN") + , country_entry(690, "SC") + , country_entry(694, "SL") + , country_entry(702, "SG") + , country_entry(703, "SK") + , country_entry(704, "VN") + , country_entry(705, "SI") + , country_entry(706, "SO") + , country_entry(710, "ZA") + , country_entry(716, "ZW") + , country_entry(724, "ES") + , country_entry(732, "EH") + , country_entry(736, "SD") + , country_entry(740, "SR") + , country_entry(744, "SJ") + , country_entry(748, "SZ") + , country_entry(752, "SE") + , country_entry(756, "CH") + , country_entry(760, "SY") + , country_entry(762, "TJ") + , country_entry(764, "TH") + , country_entry(768, "TG") + , country_entry(772, "TK") + , country_entry(776, "TO") + , country_entry(780, "TT") + , country_entry(784, "AE") + , country_entry(788, "TN") + , country_entry(792, "TR") + , country_entry(795, "TM") + , country_entry(796, "TC") + , country_entry(798, "TV") + , country_entry(800, "UG") + , country_entry(804, "UA") + , country_entry(807, "MK") + , country_entry(818, "EG") + , country_entry(826, "GB") + , country_entry(834, "TZ") + , country_entry(840, "US") + , country_entry(850, "VI") + , country_entry(854, "BF") + , country_entry(858, "UY") + , country_entry(860, "UZ") + , country_entry(862, "VE") + , country_entry(876, "WF") + , country_entry(882, "WS") + , country_entry(887, "YE") + , country_entry(891, "CS") + , country_entry(894, "ZM") + }; + + if (error || i == tcp::resolver::iterator()) + { + // this is used to indicate that we shouldn't + // try to resolve it again + p->set_country("--"); + return; + } + + while (i != tcp::resolver::iterator() + && !i->endpoint().address().is_v4()) ++i; + if (i != tcp::resolver::iterator()) + { + // country is an ISO 3166 country code + int country = i->endpoint().address().to_v4().to_ulong() & 0xffff; + + // look up the country code in the map + const int size = sizeof(country_map)/sizeof(country_map[0]); + country_entry* i = + std::lower_bound(country_map, country_map + size + , country_entry(country, ""), &compare_first); + if (i == country_map + size + || i->first != country) + { + // unknown country! + p->set_country("!!"); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << "IP " << p->remote().address() << " was mapped to unknown country: " << country << "\n"; +#endif + return; + } + + p->set_country(i->second); + } + } + void torrent::on_name_lookup(asio::error_code const& e, tcp::resolver::iterator host , std::string url) try { diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 96eb54d91..dcc46fd47 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -632,6 +632,20 @@ namespace libtorrent , bind(&torrent::set_ratio, _1, ratio)); } + void torrent_handle::resolve_countries(bool r) + { + INVARIANT_CHECK; + call_member(m_ses, m_chk, m_info_hash + , bind(&torrent::resolve_countries, _1, r)); + } + + bool torrent_handle::resolve_countries() const + { + INVARIANT_CHECK; + return call_member(m_ses, m_chk, m_info_hash + , bind(&torrent::resolving_countries, _1)); + } + void torrent_handle::get_peer_info(std::vector& v) const { INVARIANT_CHECK; @@ -657,6 +671,8 @@ namespace libtorrent peer_info& p = v.back(); peer->get_peer_info(p); + if (t->resolving_countries()) + t->resolve_peer_country(intrusive_ptr(peer)); } } diff --git a/src/web_peer_connection.cpp b/src/web_peer_connection.cpp index b0a2548ba..bb9d20991 100755 --- a/src/web_peer_connection.cpp +++ b/src/web_peer_connection.cpp @@ -455,6 +455,9 @@ namespace libtorrent p.payload_up_speed = statistics().upload_payload_rate(); p.pid = pid(); p.ip = remote(); + + p.country[0] = m_country[0]; + p.country[1] = m_country[1]; p.total_download = statistics().total_payload_download(); p.total_upload = statistics().total_payload_upload(); @@ -507,6 +510,11 @@ namespace libtorrent p.connection_type = peer_info::web_seed; } + bool web_peer_connection::in_handshake() const + { + return m_server_string.empty(); + } + // throws exception when the client should be disconnected void web_peer_connection::on_sent(asio::error_code const& error , std::size_t bytes_transferred)