diff --git a/CMakeLists.txt b/CMakeLists.txt index b7e8cf04c..41dd8735c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ set(sources peer_list puff random + receive_buffer request_blocks resolver rss diff --git a/Jamfile b/Jamfile index 9fc7971c9..a969a2619 100755 --- a/Jamfile +++ b/Jamfile @@ -549,6 +549,7 @@ SOURCES = proxy_base puff random + receive_buffer rss session session_impl diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 7e4536b6b..894ed06d4 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1112,7 +1112,7 @@ int main(int argc, char* argv[]) " share ratio rather than downloading\n" " -K enable piece suggestions of read cache\n" " -r connect to specified peer\n" -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) " -e force encrypted bittorrent connections\n" #endif "\n QUEING OPTIONS\n" @@ -1288,7 +1288,7 @@ int main(int argc, char* argv[]) --i; break; case 'l': settings.set_int(settings_pack::listen_queue_size, atoi(arg)); break; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) case 'e': { settings.set_int(settings_pack::out_enc_policy, settings_pack::pe_forced); diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index a94b3978b..f2e4977e9 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -347,7 +347,7 @@ namespace libtorrent void maybe_update_udp_mapping(int nat, int local_port, int external_port); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) torrent const* find_encrypted_torrent( sha1_hash const& info_hash, sha1_hash const& xor_mask); @@ -740,7 +740,7 @@ namespace libtorrent tracker_manager m_tracker_manager; torrent_map m_torrents; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // this maps obfuscated hashes to torrents. It's only // used when encryption is enabled torrent_map m_obfuscated_torrents; diff --git a/include/libtorrent/aux_/session_interface.hpp b/include/libtorrent/aux_/session_interface.hpp index 05217af14..2beb0636c 100644 --- a/include/libtorrent/aux_/session_interface.hpp +++ b/include/libtorrent/aux_/session_interface.hpp @@ -278,7 +278,7 @@ namespace libtorrent { namespace aux virtual boost::asio::ssl::context* ssl_ctx() = 0 ; #endif -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) virtual torrent const* find_encrypted_torrent( sha1_hash const& info_hash, sha1_hash const& xor_mask) = 0; virtual void add_obfuscated_hash(sha1_hash const& obfuscated diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index ab09833a2..2dc1d3964 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -104,11 +104,14 @@ namespace libtorrent ~bt_peer_connection(); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) bool supports_encryption() const { return m_encrypted; } bool rc4_encrypted() const { return m_rc4_encrypted; } + + void switch_send_crypto(boost::shared_ptr crypto); + void switch_recv_crypto(boost::shared_ptr crypto); #endif virtual int type() const { return peer_connection::bittorrent_connection; } @@ -161,9 +164,15 @@ namespace libtorrent , std::size_t bytes_transferred); void on_receive(error_code const& error , std::size_t bytes_transferred); + void on_receive_impl(std::size_t bytes_transferred); + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + virtual int hit_send_barrier(std::vector& iovec); +#endif virtual void get_specific_peer_info(peer_info& p) const; virtual bool in_handshake() const; + bool packet_finished() const { return m_recv_buffer.packet_finished(); } #ifndef TORRENT_DISABLE_EXTENSIONS bool supports_holepunch() const { return m_holepunch_id != 0; } @@ -260,7 +269,7 @@ namespace libtorrent // will be invalid. boost::optional downloading_piece_progress() const; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // if (is_local()), we are 'a' otherwise 'b' // @@ -287,6 +296,9 @@ namespace libtorrent // If no sync found, return -1 int get_syncoffset(char const* src, int src_size , char const* target, int target_size) const; + + // helper to cut down on boilerplate + void rc4_decrypt(char* pos, int len); #endif public: @@ -299,19 +311,11 @@ public: , void* userdata = NULL, block_cache_reference ref = block_cache_reference()); - virtual void send_buffer(char const* begin, int size, int flags = 0 - , void (*fun)(char*, int, void*) = 0, void* userdata = 0); - - virtual void append_send_buffer(char* buffer, int size - , chained_buffer::free_buffer_fun destructor = &nop - , void* userdata = NULL, block_cache_reference ref - = block_cache_reference(), bool encrypted = false); - private: enum state_t { -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) read_pe_dhkey = 0, read_pe_syncvc, read_pe_synchash, @@ -332,7 +336,7 @@ private: read_packet }; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) enum { handshake_len = 68, @@ -360,7 +364,7 @@ private: // and can send bittorrent messages bool m_sent_handshake:1; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // this is set to true after the encryption method has been // succesfully negotiated (either plaintext or rc4), to signal // automatic encryption/decryption. @@ -368,6 +372,8 @@ private: // true if rc4, false if plaintext bool m_rc4_encrypted:1; + + crypto_receive_buffer m_recv_buffer; #endif std::string m_client_version; @@ -395,17 +401,21 @@ private: std::vector m_payloads; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // initialized during write_pe1_2_dhkey, and destroyed on // creation of m_enc_handler. Cannot reinitialize once // initialized. boost::scoped_ptr m_dh_key_exchange; + // used during an encrypted handshake then moved + // into m_enc_handler if rc4 encryption is negotiated + // otherwise it is destroyed when the handshake completes + boost::shared_ptr m_rc4; + // if encryption is negotiated, this is used for - // encryption/decryption during the entire session. Destroyed - // if plaintext is selected - boost::scoped_ptr m_enc_handler; - + // encryption/decryption during the entire session. + encryption_handler m_enc_handler; + // (outgoing only) synchronize verification constant with // remote peer, this will hold rc4_decrypt(vc). Destroyed // after the sync step. @@ -415,11 +425,11 @@ private: // the sync hash (hash("req1",secret)). Destroyed after the // sync step. boost::scoped_ptr m_sync_hash; -#endif // #ifndef TORRENT_DISABLE_ENCRYPTION +#endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) static const message_handler m_message_handler[num_supported_messages]; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // used to disconnect peer if sync points are not found within // the maximum number of bytes int m_sync_bytes_read; @@ -444,7 +454,7 @@ private: #endif #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS - bool m_in_constructor; + bool m_in_constructor; #endif }; diff --git a/include/libtorrent/chained_buffer.hpp b/include/libtorrent/chained_buffer.hpp index e508c4e69..5c651b30c 100644 --- a/include/libtorrent/chained_buffer.hpp +++ b/include/libtorrent/chained_buffer.hpp @@ -88,6 +88,10 @@ namespace libtorrent , free_buffer_fun destructor, void* userdata , block_cache_reference ref = block_cache_reference()); + void prepend_buffer(char* buffer, int s, int used_size + , free_buffer_fun destructor, void* userdata + , block_cache_reference ref = block_cache_reference()); + // returns the number of bytes available at the // end of the last chained buffer. int space_in_last_buffer(); @@ -106,9 +110,13 @@ namespace libtorrent void clear(); + void build_mutable_iovec(int bytes, std::vector& vec); + ~chained_buffer(); private: + template + void build_vec(int bytes, std::vector& vec); // this is the list of all the buffers we want to // send diff --git a/include/libtorrent/extensions.hpp b/include/libtorrent/extensions.hpp index 8e7f0cf9b..574a22831 100644 --- a/include/libtorrent/extensions.hpp +++ b/include/libtorrent/extensions.hpp @@ -450,6 +450,34 @@ namespace libtorrent virtual bool write_request(peer_request const&) { return false; } }; + struct TORRENT_EXPORT crypto_plugin + { + // hidden + virtual ~crypto_plugin() {} + + virtual void set_incoming_key(unsigned char const* key, int len) = 0; + virtual void set_outgoing_key(unsigned char const* key, int len) = 0; + + // encrypted the provided buffers and returns the number of bytes which + // are now ready to be sent to the lower layer. This must be at least + // as large as the number of bytes passed in and may be larger if there + // is additional data to be inserted at the head of the send buffer. + // The additional data is retrived from the passed in vector. The + // vector must be cleared if no additional data is to be inserted. + virtual int encrypt(std::vector& /*send_vec*/) = 0; + + // decrypt the provided buffers. + // consume is set to the number of bytes which should be trimmed from the + // head of the buffers, default is 0 + // + // produce is set to the number of bytes of payload which are now ready to + // be sent to the upper layer. default is the number of bytes passed in receive_vec + // + // packet_size is set to the minimum number of bytes which must be read to + // advance the next step of decryption. default is 0 + virtual void decrypt(std::vector& /*receive_vec*/ + , int& /* consume */, int& /*produce*/, int& /*packet_size*/) = 0; + }; } #endif diff --git a/include/libtorrent/kademlia/dht_observer.hpp b/include/libtorrent/kademlia/dht_observer.hpp index 02fb347ce..2145797f1 100644 --- a/include/libtorrent/kademlia/dht_observer.hpp +++ b/include/libtorrent/kademlia/dht_observer.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define DHT_OBSERVER_HPP #include "libtorrent/address.hpp" +#include "libtorrent/kademlia/msg.hpp" namespace libtorrent { namespace dht { diff --git a/include/libtorrent/pe_crypto.hpp b/include/libtorrent/pe_crypto.hpp index fbd2b2a28..b9163b757 100644 --- a/include/libtorrent/pe_crypto.hpp +++ b/include/libtorrent/pe_crypto.hpp @@ -30,7 +30,7 @@ POSSIBILITY OF SUCH DAMAGE. */ -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #ifndef TORRENT_PE_CRYPTO_HPP_INCLUDED #define TORRENT_PE_CRYPTO_HPP_INCLUDED @@ -52,7 +52,12 @@ void TORRENT_EXTRA_EXPORT rc4_init(const unsigned char* in, unsigned long len, r unsigned long TORRENT_EXTRA_EXPORT rc4_encrypt(unsigned char *out, unsigned long outlen, rc4 *state); #endif +#include +#include + +#include "libtorrent/receive_buffer.hpp" #include "libtorrent/peer_id.hpp" // For sha1_hash +#include "libtorrent/extensions.hpp" #include "libtorrent/assert.hpp" namespace libtorrent @@ -87,14 +92,39 @@ namespace libtorrent struct encryption_handler { - virtual void set_incoming_key(unsigned char const* key, int len) = 0; - virtual void set_outgoing_key(unsigned char const* key, int len) = 0; - virtual void encrypt(char* pos, int len) = 0; - virtual void decrypt(char* pos, int len) = 0; - virtual ~encryption_handler() {} + int encrypt(std::vector& iovec); + int decrypt(crypto_receive_buffer& recv_buffer, std::size_t& bytes_transferred); + + bool switch_send_crypto(boost::shared_ptr crypto + , int pending_encryption); + + void switch_recv_crypto(boost::shared_ptr crypto + , crypto_receive_buffer& recv_buffer); + + bool is_send_plaintext() const + { + return m_send_barriers.empty() || m_send_barriers.back().next != INT_MAX; + } + + bool is_recv_plaintext() const + { + return m_dec_handler.get() == NULL; + } + + private: + struct barrier + { + barrier(boost::shared_ptr plugin, int next) + : enc_handler(plugin), next(next) {} + boost::shared_ptr enc_handler; + // number of bytes to next barrier + int next; + }; + std::list m_send_barriers; + boost::shared_ptr m_dec_handler; }; - struct rc4_handler : encryption_handler + struct TORRENT_EXTRA_EXPORT rc4_handler : crypto_plugin { public: // Input longkeys must be 20 bytes @@ -108,39 +138,8 @@ namespace libtorrent #endif }; - void set_incoming_key(unsigned char const* key, int len) - { - m_decrypt = true; -#ifdef TORRENT_USE_GCRYPT - gcry_cipher_close(m_rc4_incoming); - gcry_cipher_open(&m_rc4_incoming, GCRY_CIPHER_ARCFOUR, GCRY_CIPHER_MODE_STREAM, 0); - gcry_cipher_setkey(m_rc4_incoming, key, len); -#elif defined TORRENT_USE_OPENSSL - RC4_set_key(&m_remote_key, len, key); -#else - rc4_init(key, len, &m_rc4_incoming); -#endif - // Discard first 1024 bytes - char buf[1024]; - decrypt(buf, 1024); - } - - void set_outgoing_key(unsigned char const* key, int len) - { - m_encrypt = true; -#ifdef TORRENT_USE_GCRYPT - gcry_cipher_close(m_rc4_outgoing); - gcry_cipher_open(&m_rc4_outgoing, GCRY_CIPHER_ARCFOUR, GCRY_CIPHER_MODE_STREAM, 0); - gcry_cipher_setkey(m_rc4_outgoing, key, len); -#elif defined TORRENT_USE_OPENSSL - RC4_set_key(&m_local_key, len, key); -#else - rc4_init(key, len, &m_rc4_outgoing); -#endif - // Discard first 1024 bytes - char buf[1024]; - encrypt(buf, 1024); - } + void set_incoming_key(unsigned char const* key, int len); + void set_outgoing_key(unsigned char const* key, int len); ~rc4_handler() { @@ -150,37 +149,11 @@ namespace libtorrent #endif }; - void encrypt(char* pos, int len) - { - if (!m_encrypt) return; - - TORRENT_ASSERT(len >= 0); - TORRENT_ASSERT(pos); - -#ifdef TORRENT_USE_GCRYPT - gcry_cipher_encrypt(m_rc4_outgoing, pos, len, 0, 0); -#elif defined TORRENT_USE_OPENSSL - RC4(&m_local_key, len, (const unsigned char*)pos, (unsigned char*)pos); -#else - rc4_encrypt((unsigned char*)pos, len, &m_rc4_outgoing); -#endif - } - - void decrypt(char* pos, int len) - { - if (!m_decrypt) return; - - TORRENT_ASSERT(len >= 0); - TORRENT_ASSERT(pos); - -#ifdef TORRENT_USE_GCRYPT - gcry_cipher_decrypt(m_rc4_incoming, pos, len, 0, 0); -#elif defined TORRENT_USE_OPENSSL - RC4(&m_remote_key, len, (const unsigned char*)pos, (unsigned char*)pos); -#else - rc4_encrypt((unsigned char*)pos, len, &m_rc4_incoming); -#endif - } + int encrypt(std::vector& buf); + void decrypt(std::vector& buf + , int& consume + , int& produce + , int& packet_size); private: #ifdef TORRENT_USE_GCRYPT diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index e5bad7fbd..25eda5bd6 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -85,6 +85,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/piece_picker.hpp" // for piece_block #include "libtorrent/socket.hpp" // for tcp::endpoint #include "libtorrent/io_service_fwd.hpp" +#include "libtorrent/receive_buffer.hpp" namespace libtorrent { @@ -198,8 +199,6 @@ namespace libtorrent , m_choked(true) , m_corked(false) , m_ignore_stats(false) - , m_recv_pos(0) - , m_packet_size(0) {} protected: @@ -272,15 +271,6 @@ namespace libtorrent // when this is set, the transfer stats for this connection // is not included in the torrent or session stats bool m_ignore_stats:1; - - // the byte offset in m_recv_buffer that we have - // are passing on to the upper layer. This is - // always <= m_recv_end - int m_recv_pos:24; - - // the size (in bytes) of the bittorrent message - // we're currently receiving - int m_packet_size; }; class TORRENT_EXTRA_EXPORT peer_connection @@ -680,21 +670,18 @@ namespace libtorrent return boost::optional(); } - // these functions are virtual to let bt_peer_connection hook into them - // and encrypt the content enum message_type_flags { message_type_request = 1 }; - virtual void send_buffer(char const* begin, int size, int flags = 0 - , void (*fun)(char*, int, void*) = 0, void* userdata = 0); - virtual void setup_send(); + void send_buffer(char const* begin, int size, int flags = 0); + void setup_send(); void cork_socket() { TORRENT_ASSERT(!m_corked); m_corked = true; } bool is_corked() const { return m_corked; } void uncork_socket(); - virtual void append_send_buffer(char* buffer, int size + void append_send_buffer(char* buffer, int size , chained_buffer::free_buffer_fun destructor = &nop , void* userdata = NULL, block_cache_reference ref - = block_cache_reference(), bool encrypted = false); + = block_cache_reference()); virtual void append_const_send_buffer(char const* buffer, int size , chained_buffer::free_buffer_fun destructor = &nop @@ -719,13 +706,6 @@ namespace libtorrent int send_buffer_capacity() const { return m_send_buffer.capacity(); } - int packet_size() const { return m_packet_size; } - - bool packet_finished() const - { return m_packet_size <= m_recv_pos; } - - int receive_pos() const { return m_recv_pos; } - void max_out_request_queue(int s) { m_max_out_request_queue = s; } int max_out_request_queue() const @@ -802,43 +782,9 @@ namespace libtorrent virtual void on_sent(error_code const& error , std::size_t bytes_transferred) = 0; -#ifndef TORRENT_DISABLE_ENCRYPTION - buffer::interval wr_recv_buffer() - { - if (m_recv_buffer.empty()) - { - TORRENT_ASSERT(m_recv_pos == 0); - return buffer::interval(0,0); - } - TORRENT_ASSERT(!m_disk_recv_buffer); - TORRENT_ASSERT(m_disk_recv_buffer_size == 0); - int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size())); - return buffer::interval(&m_recv_buffer[0] + m_recv_start - , &m_recv_buffer[0] + m_recv_start + rcv_pos); - } - - std::pair wr_recv_buffers(int bytes); -#endif - - buffer::const_interval receive_buffer() const - { - if (m_recv_buffer.empty()) - { - TORRENT_ASSERT(m_recv_pos == 0); - return buffer::interval(0,0); - } - int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size())); - return buffer::const_interval(&m_recv_buffer[0] + m_recv_start - , &m_recv_buffer[0] + m_recv_start + rcv_pos); - } + virtual int hit_send_barrier(std::vector& iovec) { return INT_MAX; } bool allocate_disk_receive_buffer(int disk_buffer_size); - char* release_disk_receive_buffer(); - bool has_disk_receive_buffer() const { return m_disk_recv_buffer; } - void cut_receive_buffer(int size, int packet_size, int offset = 0); - void reset_recv_buffer(int packet_size); - void normalize_receive_buffer(); - void set_soft_packet_size(int size) { m_soft_packet_size = size; } // if allow_encrypted is false, and the torrent 'ih' turns out // to be an encrypted torrent (AES-256 encrypted) the peer will @@ -865,6 +811,14 @@ namespace libtorrent void receive_data_impl(error_code const& error , std::size_t bytes_transferred, int read_loops); + void set_send_barrier(int bytes) + { + TORRENT_ASSERT(bytes == INT_MAX || bytes <= send_buffer_size()); + m_send_barrier = bytes; + } + + int get_send_barrier() const { return m_send_barrier; } + virtual int timeout() const; private: @@ -913,8 +867,7 @@ namespace libtorrent char m_channel_state[2]; protected: - - buffer m_recv_buffer; + receive_buffer m_recv_buffer; // number of bytes this peer can send and receive int m_quota[2]; @@ -922,9 +875,6 @@ namespace libtorrent // the blocks we have reserved in the piece // picker and will request from this peer. std::vector m_request_queue; - - // the start of the logical receive buffer - int m_recv_start:24; // this is the limit on the number of outstanding requests // we have to this peer. This is initialized to the settings @@ -1036,12 +986,6 @@ namespace libtorrent // for the round-robin unchoke algorithm. size_type m_uploaded_at_last_unchoke; - // some messages needs to be read from the socket - // buffer in multiple stages. This soft packet - // size limits the read size between message handler - // dispatch. Ignored when set to 0 - int m_soft_packet_size; - // the number of bytes that the other // end has to send us in order to respond // to all outstanding piece requests we @@ -1068,12 +1012,6 @@ namespace libtorrent handler_storage m_read_handler_storage; handler_storage m_write_handler_storage; - // if this peer is receiving a piece, this - // points to a disk buffer that the data is - // read into. This eliminates a memcopy from - // the receive buffer into the disk buffer - disk_buffer_holder m_disk_recv_buffer; - // we have suggested these pieces to the peer // don't suggest it again bitfield m_sent_suggested_pieces; @@ -1118,14 +1056,13 @@ namespace libtorrent // keeps track of the current quotas bandwidth_channel m_bandwidth_channel[num_channels]; - private: + protected: // statistics about upload and download speeds // and total amount of uploads and downloads for // this peer // TODO: factor this out into its own class with a virtual interface // torrent and session should implement this interface stat m_statistics; - protected: // if the timeout is extended for the outstanding // requests, this is the number of seconds it was @@ -1146,39 +1083,6 @@ namespace libtorrent // immediately int m_queued_time_critical; - // the number of valid, received bytes in m_recv_buffer - int m_recv_end:24; - -//#error 1 byte - - // recv_buf.begin (start of actual receive buffer) - // | - // | m_recv_start (logical start of current - // | | receive buffer, as perceived by upper layers) - // | | - // | | m_recv_pos (number of bytes consumed - // | | | by upper layer, from logical receive buffer) - // | | | - // | x---------x - // | | | recv_buf.end (end of actual receive buffer) - // | | | | - // v v v v - // *------==========--------- - // ^ - // | - // | - // ------------------->x m_recv_end (end of received data, - // beyond this point is garbage) - // m_recv_buffer - - // when not using contiguous receive buffers, there - // may be a disk_recv_buffer in the mix as well. Whenever - // m_disk_recv_buffer_size > 0 (and presumably also - // m_disk_recv_buffer != NULL) the disk buffer is imagined - // to be appended to the receive buffer right after m_recv_end. - - int m_disk_recv_buffer_size; - // the number of bytes we are currently reading // from disk, that will be added to the send // buffer as soon as they complete @@ -1228,6 +1132,9 @@ namespace libtorrent // us int m_est_reciprocation_rate; + // stop sending data after this many bytes, INT_MAX = inf + int m_send_barrier; + // the number of request we should queue up // at the remote end. boost::uint16_t m_desired_queue_size; diff --git a/include/libtorrent/torrent_peer.hpp b/include/libtorrent/torrent_peer.hpp index 554fc7d67..58c83766f 100644 --- a/include/libtorrent/torrent_peer.hpp +++ b/include/libtorrent/torrent_peer.hpp @@ -142,7 +142,7 @@ namespace libtorrent // from peer_info. unsigned source:6; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // Hints encryption support of torrent_peer. Only effective // for and when the outgoing encryption policy // allows both encrypted and non encrypted diff --git a/src/Makefile.am b/src/Makefile.am index 46686cc6b..e1191d9b8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -98,6 +98,7 @@ libtorrent_rasterbar_la_SOURCES = \ peer_list.cpp \ puff.cpp \ random.cpp \ + receive_buffer.cpp \ request_blocks.cpp \ resolver.cpp \ rss.cpp \ diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 5cea62b65..1ac92052d 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -59,7 +59,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket_type.hpp" #include "libtorrent/performance_counters.hpp" // for counters -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #include "libtorrent/pe_crypto.hpp" #include "libtorrent/hasher.hpp" #endif @@ -104,12 +104,13 @@ namespace libtorrent , m_supports_fast(false) , m_sent_bitfield(false) , m_sent_handshake(false) -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) , m_encrypted(false) , m_rc4_encrypted(false) + , m_recv_buffer(peer_connection::m_recv_buffer) #endif , m_our_peer_id(pid) -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) , m_sync_bytes_read(0) #endif #ifndef TORRENT_DISABLE_EXTENSIONS @@ -142,7 +143,7 @@ namespace libtorrent // start in the state where we are trying to read the // handshake from the other side - reset_recv_buffer(20); + m_recv_buffer.reset(20); setup_receive(); } @@ -150,6 +151,19 @@ namespace libtorrent { } +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + void bt_peer_connection::switch_send_crypto(boost::shared_ptr crypto) + { + if (m_enc_handler.switch_send_crypto(crypto, send_buffer_size() - get_send_barrier())) + set_send_barrier(send_buffer_size()); + } + + void bt_peer_connection::switch_recv_crypto(boost::shared_ptr crypto) + { + m_enc_handler.switch_recv_crypto(crypto, m_recv_buffer); + } +#endif + void bt_peer_connection::on_connected() { if (is_disconnecting()) return; @@ -170,7 +184,7 @@ namespace libtorrent // packet, or at least back-to-back packets cork c_(*this); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) boost::uint8_t out_enc_policy = m_settings.get_int(settings_pack::out_enc_policy); @@ -190,7 +204,7 @@ namespace libtorrent if (is_disconnecting()) return; m_state = read_pe_dhkey; - reset_recv_buffer(dh_key_len); + m_recv_buffer.reset(dh_key_len); setup_receive(); } else if (out_enc_policy == settings_pack::pe_enabled) @@ -212,7 +226,7 @@ namespace libtorrent write_pe1_2_dhkey(); if (is_disconnecting()) return; m_state = read_pe_dhkey; - reset_recv_buffer(dh_key_len); + m_recv_buffer.reset(dh_key_len); setup_receive(); } else // pi->pe_support == false @@ -222,7 +236,7 @@ namespace libtorrent pi->pe_support = true; write_handshake(); - reset_recv_buffer(20); + m_recv_buffer.reset(20); setup_receive(); } } @@ -233,7 +247,7 @@ namespace libtorrent // start in the state where we are trying to read the // handshake from the other side - reset_recv_buffer(20); + m_recv_buffer.reset(20); setup_receive(); } } @@ -389,7 +403,7 @@ namespace libtorrent if (is_utp(*get_socket())) p.flags |= peer_info::utp_socket; if (is_ssl(*get_socket())) p.flags |= peer_info::ssl_socket; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) if (m_encrypted) { p.flags |= m_rc4_encrypted @@ -411,7 +425,7 @@ namespace libtorrent return m_state < read_packet_size; } -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) void bt_peer_connection::write_pe1_2_dhkey() { @@ -521,7 +535,9 @@ namespace libtorrent #endif write_pe_vc_cryptofield(ptr, encrypt_size, crypto_provide, pad_size); - m_enc_handler->encrypt(ptr, encrypt_size); + std::vector vec; + vec.push_back(asio::mutable_buffer(ptr, encrypt_size)); + m_rc4->encrypt(vec); send_buffer(msg, sizeof(msg) - 512 + pad_size); } @@ -541,11 +557,13 @@ namespace libtorrent char msg[512 + 8 + 4 + 2]; write_pe_vc_cryptofield(msg, sizeof(msg), crypto_select, pad_size); - m_enc_handler->encrypt(msg, buf_size); + std::vector vec; + vec.push_back(asio::mutable_buffer(msg, buf_size)); + m_rc4->encrypt(vec); send_buffer(msg, buf_size); // encryption method has been negotiated - if (crypto_select == 0x02) + if (crypto_select == 0x02) m_rc4_encrypted = true; else // 0x01 m_rc4_encrypted = false; @@ -616,12 +634,12 @@ namespace libtorrent h.update((char const*)stream_key.begin(), 20); const sha1_hash remote_key = h.final(); - TORRENT_ASSERT(!m_enc_handler.get()); - m_enc_handler.reset(new (std::nothrow) rc4_handler); - m_enc_handler->set_incoming_key(&remote_key[0], 20); - m_enc_handler->set_outgoing_key(&local_key[0], 20); + TORRENT_ASSERT(!m_rc4.get()); + m_rc4 = boost::make_shared(); + m_rc4->set_incoming_key(&remote_key[0], 20); + m_rc4->set_outgoing_key(&local_key[0], 20); - if (!m_enc_handler) + if (!m_rc4) { disconnect(errors::no_memory, op_encryption); return; @@ -676,7 +694,17 @@ namespace libtorrent // no complete sync return -1; } -#endif // #ifndef TORRENT_DISABLE_ENCRYPTION + + void bt_peer_connection::rc4_decrypt(char* pos, int len) + { + std::vector vec; + vec.push_back(asio::mutable_buffer(pos, len)); + int consume = 0; + int produce = len; + int packet_size = 0; + m_rc4->decrypt(vec, consume, produce, packet_size); + } +#endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) void regular_c_free(char* buf, void* /* userdata */ , block_cache_reference /* ref */) @@ -688,14 +716,14 @@ namespace libtorrent , chained_buffer::free_buffer_fun destructor, void* userdata , block_cache_reference ref) { -#ifndef TORRENT_DISABLE_ENCRYPTION - if (m_encrypted && m_rc4_encrypted) +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + if (!m_enc_handler.is_send_plaintext()) { // if we're encrypting this buffer, we need to make a copy // since we'll mutate it char* buf = (char*)malloc(size); memcpy(buf, buffer, size); - bt_peer_connection::append_send_buffer(buf, size, ®ular_c_free, NULL); + append_send_buffer(buf, size, ®ular_c_free, NULL); destructor((char*)buffer, userdata, ref); } else @@ -706,48 +734,6 @@ namespace libtorrent } } - void bt_peer_connection::append_send_buffer(char* buffer, int size - , chained_buffer::free_buffer_fun destructor, void* userdata - , block_cache_reference ref, bool encrypted) - { - TORRENT_ASSERT(encrypted == false); -#ifndef TORRENT_DISABLE_ENCRYPTION - if (m_rc4_encrypted) - m_enc_handler->encrypt(buffer, size); -#endif - peer_connection::append_send_buffer(buffer, size, destructor - , userdata, ref, true); - } - -#ifndef TORRENT_DISABLE_ENCRYPTION - void encrypt(char* buf, int len, void* userdata) - { - rc4_handler* rc4 = (rc4_handler*)userdata; - rc4->encrypt(buf, len); - } -#endif - - void bt_peer_connection::send_buffer(char const* buf, int size, int flags - , void (*f)(char*, int, void*), void* ud) - { - TORRENT_ASSERT(f == 0); - TORRENT_ASSERT(ud == 0); - TORRENT_ASSERT(buf); - TORRENT_ASSERT(size > 0); - - void* userdata = 0; - void (*fun)(char*, int, void*) = 0; -#ifndef TORRENT_DISABLE_ENCRYPTION - if (m_encrypted && m_rc4_encrypted) - { - fun = encrypt; - userdata = m_enc_handler.get(); - } -#endif - - peer_connection::send_buffer(buf, size, flags, fun, userdata); - } - void bt_peer_connection::write_handshake(bool plain_handshake) { INVARIANT_CHECK; @@ -867,7 +853,7 @@ namespace libtorrent boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); // are we currently receiving a 'piece' message? if (m_state != read_packet || recv_buffer.left() <= 9 @@ -878,7 +864,7 @@ namespace libtorrent peer_request r; r.piece = detail::read_int32(ptr); r.start = detail::read_int32(ptr); - r.length = packet_size() - 9; + r.length = m_recv_buffer.packet_size() - 9; // is any of the piece message header data invalid? if (!verify_piece(r)) @@ -921,12 +907,12 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 1) + if (m_recv_buffer.packet_size() != 1) { disconnect(errors::invalid_choke, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; incoming_choke(); if (is_disconnecting()) return; @@ -971,12 +957,12 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 1) + if (m_recv_buffer.packet_size() != 1) { disconnect(errors::invalid_unchoke, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; incoming_unchoke(); } @@ -991,12 +977,12 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 1) + if (m_recv_buffer.packet_size() != 1) { disconnect(errors::invalid_interested, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; incoming_interested(); } @@ -1011,12 +997,12 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 1) + if (m_recv_buffer.packet_size() != 1) { disconnect(errors::invalid_not_interested, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; incoming_not_interested(); } @@ -1031,14 +1017,14 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 5) + if (m_recv_buffer.packet_size() != 5) { disconnect(errors::invalid_have, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; int index = detail::read_int32(ptr); @@ -1063,19 +1049,19 @@ namespace libtorrent // if we don't have the metedata, we cannot // verify the bitfield size if (t->valid_metadata() - && packet_size() - 1 != (t->torrent_file().num_pieces() + 7) / 8) + && m_recv_buffer.packet_size() - 1 != (t->torrent_file().num_pieces() + 7) / 8) { disconnect(errors::invalid_bitfield_size, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); bitfield bits; bits.assign((char*)recv_buffer.begin + 1 - , t->valid_metadata()?get_bitfield().size():(packet_size()-1)*8); + , t->valid_metadata()?get_bitfield().size():(m_recv_buffer.packet_size()-1)*8); incoming_bitfield(bits); } @@ -1090,14 +1076,14 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 13) + if (m_recv_buffer.packet_size() != 13) { disconnect(errors::invalid_request, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); peer_request r; const char* ptr = recv_buffer.begin + 1; @@ -1118,8 +1104,8 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); - buffer::const_interval recv_buffer = receive_buffer(); - int recv_pos = receive_pos(); // recv_buffer.end - recv_buffer.begin; + buffer::const_interval recv_buffer = m_recv_buffer.get(); + int recv_pos = m_recv_buffer.pos(); // recv_buffer.end - recv_buffer.begin; boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); @@ -1128,7 +1114,7 @@ namespace libtorrent { if (recv_pos == 1) { - set_soft_packet_size(13); + m_recv_buffer.set_soft_packet_size(13); received_bytes(0, received); return; } @@ -1145,22 +1131,23 @@ namespace libtorrent // and we can allocate the disk buffer and receive // into it - if (list_size > packet_size() - 13) + if (list_size > m_recv_buffer.packet_size() - 13) { disconnect(errors::invalid_hash_list, op_bittorrent, 2); return; } - if (packet_size() - 13 - list_size > t->block_size()) + if (m_recv_buffer.packet_size() - 13 - list_size > t->block_size()) { disconnect(errors::packet_too_large, op_bittorrent, 2); return; } - TORRENT_ASSERT(!has_disk_receive_buffer()); - if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer)) + m_recv_buffer.assert_no_disk_buffer(); + if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer) && + m_recv_buffer.can_recv_contiguous(m_recv_buffer.packet_size() - 13 - list_size)) { - if (!allocate_disk_receive_buffer(packet_size() - 13 - list_size)) + if (!allocate_disk_receive_buffer(m_recv_buffer.packet_size() - 13 - list_size)) { received_bytes(0, received); return; @@ -1172,17 +1159,18 @@ namespace libtorrent { if (recv_pos == 1) { - TORRENT_ASSERT(!has_disk_receive_buffer()); + m_recv_buffer.assert_no_disk_buffer(); - if (packet_size() - 9 > t->block_size()) + if (m_recv_buffer.packet_size() - 9 > t->block_size()) { disconnect(errors::packet_too_large, op_bittorrent, 2); return; } - if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer)) + if (!m_settings.get_bool(settings_pack::contiguous_recv_buffer) && + m_recv_buffer.can_recv_contiguous(m_recv_buffer.packet_size() - 9)) { - if (!allocate_disk_receive_buffer(packet_size() - 9)) + if (!allocate_disk_receive_buffer(m_recv_buffer.packet_size() - 9)) { received_bytes(0, received); return; @@ -1190,7 +1178,7 @@ namespace libtorrent } } } - TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || has_disk_receive_buffer() || packet_size() == 9); + TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || m_recv_buffer.has_disk_buffer() || m_recv_buffer.packet_size() == 9); // classify the received data as protocol chatter // or data payload for the statistics int piece_bytes = 0; @@ -1209,12 +1197,12 @@ namespace libtorrent if (merkle) { list_size = detail::read_int32(ptr); - p.length = packet_size() - list_size - header_size; + p.length = m_recv_buffer.packet_size() - list_size - header_size; header_size += list_size; } else { - p.length = packet_size() - header_size; + p.length = m_recv_buffer.packet_size() - header_size; } } @@ -1256,10 +1244,10 @@ namespace libtorrent if (is_disconnecting()) return; } - TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || has_disk_receive_buffer() || packet_size() == header_size); + TORRENT_ASSERT(m_settings.get_bool(settings_pack::contiguous_recv_buffer) || m_recv_buffer.has_disk_buffer() || m_recv_buffer.packet_size() == header_size); incoming_piece_fragment(piece_bytes); - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; if (merkle && list_size > 0) { @@ -1303,7 +1291,7 @@ namespace libtorrent } } - char* disk_buffer = release_disk_receive_buffer(); + char* disk_buffer = m_recv_buffer.release_disk_buffer(); if (disk_buffer) { disk_buffer_holder holder(m_allocator, disk_buffer); @@ -1325,14 +1313,14 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 13) + if (m_recv_buffer.packet_size() != 13) { disconnect(errors::invalid_cancel, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); peer_request r; const char* ptr = recv_buffer.begin + 1; @@ -1353,14 +1341,14 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() != 3) + if (m_recv_buffer.packet_size() != 3) { disconnect(errors::invalid_dht_port, op_bittorrent, 2); return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; int listen_port = detail::read_uint16(ptr); @@ -1388,9 +1376,9 @@ namespace libtorrent return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; int piece = detail::read_uint32(ptr); @@ -1434,9 +1422,9 @@ namespace libtorrent return; } - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); peer_request r; const char* ptr = recv_buffer.begin + 1; @@ -1458,8 +1446,8 @@ namespace libtorrent return; } - if (!packet_finished()) return; - buffer::const_interval recv_buffer = receive_buffer(); + if (!m_recv_buffer.packet_finished()) return; + buffer::const_interval recv_buffer = m_recv_buffer.get(); const char* ptr = recv_buffer.begin + 1; int index = detail::read_int32(ptr); @@ -1475,14 +1463,14 @@ namespace libtorrent { INVARIANT_CHECK; - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; // we can't accept holepunch messages from peers // that don't support the holepunch extension // because we wouldn't be able to respond if (m_holepunch_id == 0) return; - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); TORRENT_ASSERT(*recv_buffer.begin == msg_extended); ++recv_buffer.begin; TORRENT_ASSERT(*recv_buffer.begin == holepunch_msg); @@ -1671,7 +1659,7 @@ namespace libtorrent TORRENT_ASSERT(received >= 0); received_bytes(0, received); - if (packet_size() < 2) + if (m_recv_buffer.packet_size() < 2) { disconnect(errors::invalid_extended, op_bittorrent, 2); return; @@ -1683,7 +1671,7 @@ namespace libtorrent return; } - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); if (recv_buffer.left() < 2) return; TORRENT_ASSERT(*recv_buffer.begin == msg_extended); @@ -1700,11 +1688,11 @@ namespace libtorrent if (extended_id == upload_only_msg) { - if (!packet_finished()) return; - if (packet_size() != 3) + if (!m_recv_buffer.packet_finished()) return; + if (m_recv_buffer.packet_size() != 3) { #ifdef TORRENT_VERBOSE_LOGGING - peer_log("<== UPLOAD_ONLY [ ERROR: unexpected packet size: %d ]", packet_size()); + peer_log("<== UPLOAD_ONLY [ ERROR: unexpected packet size: %d ]", m_recv_buffer.packet_size()); #endif return; } @@ -1718,11 +1706,11 @@ namespace libtorrent if (extended_id == share_mode_msg) { - if (!packet_finished()) return; - if (packet_size() != 3) + if (!m_recv_buffer.packet_finished()) return; + if (m_recv_buffer.packet_size() != 3) { #ifdef TORRENT_VERBOSE_LOGGING - peer_log("<== SHARE_MODE [ ERROR: unexpected packet size: %d ]", packet_size()); + peer_log("<== SHARE_MODE [ ERROR: unexpected packet size: %d ]", m_recv_buffer.packet_size()); #endif return; } @@ -1736,7 +1724,7 @@ namespace libtorrent if (extended_id == holepunch_msg) { - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; #ifdef TORRENT_VERBOSE_LOGGING peer_log("<== HOLEPUNCH"); #endif @@ -1746,11 +1734,11 @@ namespace libtorrent if (extended_id == dont_have_msg) { - if (!packet_finished()) return; - if (packet_size() != 6) + if (!m_recv_buffer.packet_finished()) return; + if (m_recv_buffer.packet_size() != 6) { #ifdef TORRENT_VERBOSE_LOGGING - peer_log("<== DONT_HAVE [ ERROR: unexpected packet size: %d ]", packet_size()); + peer_log("<== DONT_HAVE [ ERROR: unexpected packet size: %d ]", m_recv_buffer.packet_size()); #endif return; } @@ -1760,15 +1748,15 @@ namespace libtorrent } #ifdef TORRENT_VERBOSE_LOGGING - if (packet_finished()) + if (m_recv_buffer.packet_finished()) peer_log("<== EXTENSION MESSAGE [ msg: %d size: %d ]" - , extended_id, packet_size()); + , extended_id, m_recv_buffer.packet_size()); #endif for (extension_list_t::iterator i = m_extensions.begin() , end(m_extensions.end()); i != end; ++i) { - if ((*i)->on_extended(packet_size() - 2, extended_id + if ((*i)->on_extended(m_recv_buffer.packet_size() - 2, extended_id , recv_buffer)) return; } @@ -1779,12 +1767,12 @@ namespace libtorrent void bt_peer_connection::on_extended_handshake() { - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); lazy_entry root; error_code ec; @@ -1901,7 +1889,7 @@ namespace libtorrent return false; } - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); TORRENT_ASSERT(recv_buffer.left() >= 1); int packet_type = (unsigned char)recv_buffer[0]; @@ -1917,10 +1905,10 @@ namespace libtorrent for (extension_list_t::iterator i = m_extensions.begin() , end(m_extensions.end()); i != end; ++i) { - if ((*i)->on_unknown_message(packet_size(), packet_type + if ((*i)->on_unknown_message(m_recv_buffer.packet_size(), packet_type , buffer::const_interval(recv_buffer.begin+1 , recv_buffer.end))) - return packet_finished(); + return m_recv_buffer.packet_finished(); } #endif @@ -1929,7 +1917,7 @@ namespace libtorrent // break in debug builds to allow investigation // TORRENT_ASSERT(false); disconnect(errors::invalid_message, op_bittorrent); - return packet_finished(); + return m_recv_buffer.packet_finished(); } TORRENT_ASSERT(m_message_handler[packet_type] != 0); @@ -1949,7 +1937,7 @@ namespace libtorrent TORRENT_ASSERT(stats_diff == received); #endif - bool finished = packet_finished(); + bool finished = m_recv_buffer.packet_finished(); if (finished) { @@ -2130,7 +2118,7 @@ namespace libtorrent int lazy_piece = 0; if (t->is_seed() && m_settings.get_bool(settings_pack::lazy_bitfields) -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) && !m_encrypted #endif ) @@ -2277,7 +2265,7 @@ namespace libtorrent && !t->share_mode() && !t->super_seeding() && (!m_settings.get_bool(settings_pack::lazy_bitfields) -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) || m_encrypted #endif )) @@ -2534,21 +2522,56 @@ namespace libtorrent // packet, or at least back-to-back packets cork c_(*this); - boost::shared_ptr t = associated_torrent().lock(); - -#ifndef TORRENT_DISABLE_ENCRYPTION - TORRENT_ASSERT(in_handshake() || !m_rc4_encrypted || m_encrypted); - if (m_rc4_encrypted && m_encrypted) +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + if (!m_enc_handler.is_recv_plaintext()) { - std::pair wr_buf = wr_recv_buffers(bytes_transferred); - m_enc_handler->decrypt(wr_buf.first.begin, wr_buf.first.left()); - if (wr_buf.second.left()) m_enc_handler->decrypt(wr_buf.second.begin, wr_buf.second.left()); + int consumed = m_enc_handler.decrypt(m_recv_buffer, bytes_transferred); + #ifdef TORRENT_VERBOSE_LOGGING + if (consumed + bytes_transferred > 0) + peer_log("<== decrypted block [ s = %d ]", consumed + bytes_transferred); + #endif + if (bytes_transferred == SIZE_MAX) + { + disconnect(errors::parse_failed, op_encryption); + return; + } + received_bytes(0, consumed); + + int sub_transferred = 0; + while (bytes_transferred > 0 && + ((sub_transferred = m_recv_buffer.advance_pos(bytes_transferred)) > 0)) + { + #if TORRENT_USE_ASSERTS + size_type cur_payload_dl = m_statistics.last_payload_downloaded(); + size_type cur_protocol_dl = m_statistics.last_protocol_downloaded(); + #endif + on_receive_impl(sub_transferred); + bytes_transferred -= sub_transferred; + TORRENT_ASSERT(sub_transferred > 0); + + #if TORRENT_USE_ASSERTS + TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0); + TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0); + size_type stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl + + m_statistics.last_protocol_downloaded() - cur_protocol_dl; + TORRENT_ASSERT(stats_diff == int(sub_transferred)); + #endif + + if (m_disconnecting) return; + } } + else #endif + on_receive_impl(bytes_transferred); + } - buffer::const_interval recv_buffer = receive_buffer(); + void bt_peer_connection::on_receive_impl(std::size_t bytes_transferred) + { + boost::shared_ptr t = associated_torrent().lock(); -#ifndef TORRENT_DISABLE_ENCRYPTION + buffer::const_interval recv_buffer = m_recv_buffer.get(); + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // m_state is set to read_pe_dhkey in initial state // (read_protocol_identifier) for incoming, or in constructor // for outgoing @@ -2558,10 +2581,10 @@ namespace libtorrent TORRENT_ASSERT(!m_encrypted); TORRENT_ASSERT(!m_rc4_encrypted); - TORRENT_ASSERT(packet_size() == dh_key_len); - TORRENT_ASSERT(recv_buffer == receive_buffer()); + TORRENT_ASSERT(m_recv_buffer.packet_size() == dh_key_len); + TORRENT_ASSERT(recv_buffer == m_recv_buffer.get()); - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; // write our dh public key. m_dh_key_exchange is // initialized in write_pe1_2_dhkey() @@ -2578,7 +2601,7 @@ namespace libtorrent #ifdef TORRENT_VERBOSE_LOGGING peer_log("*** received DH key"); #endif - + // PadA/B can be a max of 512 bytes, and 20 bytes more for // the sync hash (if incoming), or 8 bytes more for the // encrypted verification constant (if outgoing). Instead @@ -2594,24 +2617,22 @@ namespace libtorrent // initial payload is the standard handshake, this is // always rc4 if sent here. m_rc4_encrypted is flagged // again according to peer selection. - m_rc4_encrypted = true; - m_encrypted = true; + switch_send_crypto(m_rc4); write_handshake(true); - m_rc4_encrypted = false; - m_encrypted = false; + switch_send_crypto(boost::shared_ptr()); // vc,crypto_select,len(pad),pad, encrypt(handshake) // 8+4+2+0+handshake_len - reset_recv_buffer(8+4+2+0+handshake_len); + m_recv_buffer.reset(8+4+2+0+handshake_len); } else { // already written dh key m_state = read_pe_synchash; // synchash,skeyhash,vc,crypto_provide,len(pad),pad,encrypt(handshake) - reset_recv_buffer(20+20+8+4+2+0+handshake_len); + m_recv_buffer.reset(20+20+8+4+2+0+handshake_len); } - TORRENT_ASSERT(!packet_finished()); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); return; } @@ -2621,13 +2642,13 @@ namespace libtorrent TORRENT_ASSERT(!m_encrypted); TORRENT_ASSERT(!m_rc4_encrypted); TORRENT_ASSERT(!is_outgoing()); - TORRENT_ASSERT(recv_buffer == receive_buffer()); - - if (recv_buffer.left() < 20) + TORRENT_ASSERT(recv_buffer == m_recv_buffer.get()); + + if (recv_buffer.left() < 20) { received_bytes(0, bytes_transferred); - if (packet_finished()) + if (m_recv_buffer.packet_finished()) disconnect(errors::sync_hash_not_found, op_bittorrent, 1); return; } @@ -2666,10 +2687,10 @@ namespace libtorrent return; } - cut_receive_buffer(bytes_processed, (std::min)(packet_size() + m_recv_buffer.cut(bytes_processed, (std::min)(m_recv_buffer.packet_size() , (512+20) - m_sync_bytes_read)); - TORRENT_ASSERT(!packet_finished()); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); return; } // found complete sync @@ -2687,7 +2708,7 @@ namespace libtorrent TORRENT_ASSERT(transferred_used <= int(bytes_transferred)); received_bytes(0, transferred_used); bytes_transferred -= transferred_used; - cut_receive_buffer(bytes_processed, 28); + m_recv_buffer.cut(bytes_processed, 28); } } @@ -2699,13 +2720,13 @@ namespace libtorrent TORRENT_ASSERT(!m_encrypted); TORRENT_ASSERT(!m_rc4_encrypted); TORRENT_ASSERT(!is_outgoing()); - TORRENT_ASSERT(packet_size() == 28); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 28); - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; if (is_disconnecting()) return; TORRENT_ASSERT(!is_disconnecting()); - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); TORRENT_ASSERT(!is_disconnecting()); @@ -2730,15 +2751,15 @@ namespace libtorrent #endif } - if (!m_enc_handler.get()) + if (!m_rc4.get()) { disconnect(errors::invalid_info_hash, op_bittorrent, 1); return; } // verify constant - buffer::interval wr_recv_buf = wr_recv_buffer(); - m_enc_handler->decrypt(wr_recv_buf.begin + 20, 8); + buffer::interval wr_recv_buf = m_recv_buffer.mutable_buffer(); + rc4_decrypt(wr_recv_buf.begin + 20, 8); wr_recv_buf.begin += 28; const char sh_vc[] = {0,0,0,0, 0,0,0,0}; @@ -2752,7 +2773,7 @@ namespace libtorrent peer_log("*** verification constant found"); #endif m_state = read_pe_cryptofield; - reset_recv_buffer(4 + 2); + m_recv_buffer.reset(4 + 2); } // cannot fall through into @@ -2761,12 +2782,12 @@ namespace libtorrent TORRENT_ASSERT(is_outgoing()); TORRENT_ASSERT(!m_encrypted); TORRENT_ASSERT(!m_rc4_encrypted); - TORRENT_ASSERT(recv_buffer == receive_buffer()); + TORRENT_ASSERT(recv_buffer == m_recv_buffer.get()); if (recv_buffer.left() < 8) { received_bytes(0, bytes_transferred); - if (packet_finished()) + if (m_recv_buffer.packet_finished()) disconnect(errors::invalid_encryption_constant, op_encryption, 2); return; } @@ -2783,7 +2804,7 @@ namespace libtorrent return; } std::fill(m_sync_vc.get(), m_sync_vc.get() + 8, 0); - m_enc_handler->decrypt(m_sync_vc.get(), 8); + rc4_decrypt(m_sync_vc.get(), 8); } TORRENT_ASSERT(m_sync_vc.get()); @@ -2803,10 +2824,10 @@ namespace libtorrent return; } - cut_receive_buffer(bytes_processed, (std::min)(packet_size() + m_recv_buffer.cut(bytes_processed, (std::min)(m_recv_buffer.packet_size() , (512+8) - m_sync_bytes_read)); - TORRENT_ASSERT(!packet_finished()); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); } // found complete sync else @@ -2821,7 +2842,7 @@ namespace libtorrent received_bytes(0, transferred_used); bytes_transferred -= transferred_used; - cut_receive_buffer(bytes_processed, 4 + 2); + m_recv_buffer.cut(bytes_processed, 4 + 2); // delete verification constant m_sync_vc.reset(); @@ -2834,16 +2855,16 @@ namespace libtorrent { TORRENT_ASSERT(!m_encrypted); TORRENT_ASSERT(!m_rc4_encrypted); - TORRENT_ASSERT(packet_size() == 4+2); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 4+2); received_bytes(0, bytes_transferred); bytes_transferred = 0; - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - buffer::interval wr_buf = wr_recv_buffer(); - m_enc_handler->decrypt(wr_buf.begin, packet_size()); + buffer::interval wr_buf = m_recv_buffer.mutable_buffer(); + rc4_decrypt(wr_buf.begin, m_recv_buffer.packet_size()); - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); int crypto_field = detail::read_int32(recv_buffer.begin); @@ -2918,16 +2939,21 @@ namespace libtorrent m_state = read_pe_pad; if (!is_outgoing()) - reset_recv_buffer(len_pad + 2); // len(IA) at the end of pad + m_recv_buffer.reset(len_pad + 2); // len(IA) at the end of pad else { if (len_pad == 0) { m_encrypted = true; + if (m_rc4_encrypted) + { + switch_send_crypto(m_rc4); + switch_recv_crypto(m_rc4); + } m_state = init_bt_handshake; } else - reset_recv_buffer(len_pad); + m_recv_buffer.reset(len_pad); } } @@ -2936,15 +2962,15 @@ namespace libtorrent TORRENT_ASSERT(!m_encrypted); received_bytes(0, bytes_transferred); bytes_transferred = 0; - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; - int pad_size = is_outgoing() ? packet_size() : packet_size() - 2; + int pad_size = is_outgoing() ? m_recv_buffer.packet_size() : m_recv_buffer.packet_size() - 2; - buffer::interval wr_buf = wr_recv_buffer(); - m_enc_handler->decrypt(wr_buf.begin, packet_size()); + buffer::interval wr_buf = m_recv_buffer.mutable_buffer(); + rc4_decrypt(wr_buf.begin, m_recv_buffer.packet_size()); + + recv_buffer = m_recv_buffer.get(); - recv_buffer = receive_buffer(); - if (!is_outgoing()) { recv_buffer.begin += pad_size; @@ -2963,18 +2989,28 @@ namespace libtorrent { // everything after this is Encrypt2 m_encrypted = true; + if (m_rc4_encrypted) + { + switch_send_crypto(m_rc4); + switch_recv_crypto(m_rc4); + } m_state = init_bt_handshake; } else { m_state = read_pe_ia; - reset_recv_buffer(len_ia); + m_recv_buffer.reset(len_ia); } } else // is_outgoing() { // everything that arrives after this is Encrypt2 m_encrypted = true; + if (m_rc4_encrypted) + { + switch_send_crypto(m_rc4); + switch_recv_crypto(m_rc4); + } m_state = init_bt_handshake; } } @@ -2986,29 +3022,27 @@ namespace libtorrent TORRENT_ASSERT(!is_outgoing()); TORRENT_ASSERT(!m_encrypted); - if (!packet_finished()) return; + if (!m_recv_buffer.packet_finished()) return; // ia is always rc4, so decrypt it - buffer::interval wr_buf = wr_recv_buffer(); - m_enc_handler->decrypt(wr_buf.begin, packet_size()); + buffer::interval wr_buf = m_recv_buffer.mutable_buffer(); + rc4_decrypt(wr_buf.begin, m_recv_buffer.packet_size()); #ifdef TORRENT_VERBOSE_LOGGING - peer_log("*** decrypted ia : %d bytes", packet_size()); + peer_log("*** decrypted ia : %d bytes", m_recv_buffer.packet_size()); #endif - if (!m_rc4_encrypted) - { - m_enc_handler.reset(); -#ifdef TORRENT_VERBOSE_LOGGING - peer_log("*** destroyed rc4 keys"); -#endif - } - // everything that arrives after this is encrypted m_encrypted = true; + if (m_rc4_encrypted) + { + switch_send_crypto(m_rc4); + switch_recv_crypto(m_rc4); + } + m_rc4.reset(); m_state = read_protocol_identifier; - cut_receive_buffer(0, 20); + m_recv_buffer.cut(0, 20); } if (m_state == init_bt_handshake) @@ -3039,24 +3073,19 @@ namespace libtorrent // decrypt remaining received bytes if (m_rc4_encrypted) { - buffer::interval wr_buf = wr_recv_buffer(); - wr_buf.begin += packet_size(); - m_enc_handler->decrypt(wr_buf.begin, wr_buf.left()); + buffer::interval wr_buf = m_recv_buffer.mutable_buffer(); + wr_buf.begin += m_recv_buffer.packet_size(); + rc4_decrypt(wr_buf.begin, wr_buf.left()); + #ifdef TORRENT_VERBOSE_LOGGING peer_log("*** decrypted remaining %d bytes", wr_buf.left()); #endif } - else // !m_rc4_encrypted - { - m_enc_handler.reset(); -#ifdef TORRENT_VERBOSE_LOGGING - peer_log("*** destroyed encryption handler"); -#endif - } + m_rc4.reset(); // payload stream, start with 20 handshake bytes m_state = read_protocol_identifier; - reset_recv_buffer(20); + m_recv_buffer.reset(20); // encrypted portion of handshake completed, toggle // peer_info pe_support flag back to true @@ -3069,19 +3098,18 @@ namespace libtorrent pi->pe_support = true; } - } -#endif // #ifndef TORRENT_DISABLE_ENCRYPTION +#endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) if (m_state == read_protocol_identifier) { received_bytes(0, bytes_transferred); bytes_transferred = 0; - TORRENT_ASSERT(packet_size() == 20); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 20); - if (!packet_finished()) return; - recv_buffer = receive_buffer(); + if (!m_recv_buffer.packet_finished()) return; + recv_buffer = m_recv_buffer.get(); int packet_size = recv_buffer[0]; const char protocol_string[] = "\x13" "BitTorrent protocol"; @@ -3089,7 +3117,7 @@ namespace libtorrent if (packet_size != 19 || memcmp(recv_buffer.begin, protocol_string, 20) != 0) { -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #ifdef TORRENT_VERBOSE_LOGGING peer_log("*** unrecognized protocol header"); #endif @@ -3127,8 +3155,8 @@ namespace libtorrent peer_log("*** attempting encrypted connection"); #endif m_state = read_pe_dhkey; - cut_receive_buffer(0, dh_key_len); - TORRENT_ASSERT(!packet_finished()); + m_recv_buffer.cut(0, dh_key_len); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); return; #else disconnect(errors::invalid_info_hash, op_bittorrent, 1); @@ -3137,7 +3165,7 @@ namespace libtorrent } else { -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) TORRENT_ASSERT(m_state != read_pe_dhkey); if (!is_outgoing() @@ -3157,7 +3185,7 @@ namespace libtorrent } m_state = read_info_hash; - reset_recv_buffer(28); + m_recv_buffer.reset(28); } // fall through @@ -3165,10 +3193,10 @@ namespace libtorrent { received_bytes(0, bytes_transferred); bytes_transferred = 0; - TORRENT_ASSERT(packet_size() == 28); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 28); - if (!packet_finished()) return; - recv_buffer = receive_buffer(); + if (!m_recv_buffer.packet_finished()) return; + recv_buffer = m_recv_buffer.get(); #ifdef TORRENT_VERBOSE_LOGGING @@ -3212,7 +3240,7 @@ namespace libtorrent std::copy(recv_buffer.begin + 8, recv_buffer.begin + 28 , (char*)info_hash.begin()); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) bool allow_encrypted = m_encrypted && m_rc4_encrypted; #else bool allow_encrypted = true; @@ -3250,7 +3278,7 @@ namespace libtorrent if (is_disconnecting()) return; m_state = read_peer_id; - reset_recv_buffer(20); + m_recv_buffer.reset(20); } // fall through @@ -3260,15 +3288,15 @@ namespace libtorrent received_bytes(0, bytes_transferred); // bytes_transferred = 0; t = associated_torrent().lock(); - if (!t) + if (!t) { - TORRENT_ASSERT(!packet_finished()); // TODO + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); // TODO return; } - TORRENT_ASSERT(packet_size() == 20); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 20); - if (!packet_finished()) return; - recv_buffer = receive_buffer(); + if (!m_recv_buffer.packet_finished()) return; + recv_buffer = m_recv_buffer.get(); #ifdef TORRENT_VERBOSE_LOGGING { @@ -3288,7 +3316,7 @@ namespace libtorrent #endif peer_id pid; std::copy(recv_buffer.begin, recv_buffer.begin + 20, (char*)pid.begin()); - + if (t->settings().get_bool(settings_pack::allow_multiple_connections_per_ip)) { // now, let's see if this connection should be closed @@ -3359,7 +3387,7 @@ namespace libtorrent if (peer_info_struct()) t->clear_failcount(peer_info_struct()); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // Toggle pe_support back to false if this is a // standard successful connection if (is_outgoing() && !m_encrypted && @@ -3374,9 +3402,9 @@ namespace libtorrent #endif m_state = read_packet_size; - reset_recv_buffer(5); + m_recv_buffer.reset(5); - TORRENT_ASSERT(!packet_finished()); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); return; } @@ -3384,8 +3412,8 @@ namespace libtorrent if (m_state == read_packet_size) { // Make sure this is not fallen though into - TORRENT_ASSERT(recv_buffer == receive_buffer()); - TORRENT_ASSERT(packet_size() == 5); + TORRENT_ASSERT(recv_buffer == m_recv_buffer.get()); + TORRENT_ASSERT(m_recv_buffer.packet_size() == 5); if (!t) return; @@ -3420,21 +3448,21 @@ namespace libtorrent if (is_disconnecting()) return; // keepalive message m_state = read_packet_size; - cut_receive_buffer(4, 5); + m_recv_buffer.cut(4, 5); return; } if (recv_buffer.left() < 5) return; m_state = read_packet; - cut_receive_buffer(4, packet_size); - recv_buffer = receive_buffer(); + m_recv_buffer.cut(4, packet_size); + recv_buffer = m_recv_buffer.get(); TORRENT_ASSERT(recv_buffer.left() == 1); TORRENT_ASSERT(bytes_transferred == 1); } if (m_state == read_packet) { - TORRENT_ASSERT(recv_buffer == receive_buffer()); + TORRENT_ASSERT(recv_buffer == m_recv_buffer.get()); if (!t) { received_bytes(0, bytes_transferred); @@ -3448,7 +3476,7 @@ namespace libtorrent if (dispatch_message(bytes_transferred)) { m_state = read_packet_size; - reset_recv_buffer(5); + m_recv_buffer.reset(5); } #ifdef TORRENT_DEBUG TORRENT_ASSERT(statistics().last_payload_downloaded() - cur_payload_dl >= 0); @@ -3457,12 +3485,24 @@ namespace libtorrent statistics().last_protocol_downloaded() - cur_protocol_dl; TORRENT_ASSERT(stats_diff == size_type(bytes_transferred)); #endif - TORRENT_ASSERT(!packet_finished()); + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); return; } - - TORRENT_ASSERT(!packet_finished()); - } + + TORRENT_ASSERT(!m_recv_buffer.packet_finished()); + } + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) + int bt_peer_connection::hit_send_barrier(std::vector& iovec) + { + int next_barrier = m_enc_handler.encrypt(iovec); +#ifdef TORRENT_VERBOSE_LOGGING + if (next_barrier != 0) + peer_log("==> encrypted block [ s = %d ]", next_barrier); +#endif + return next_barrier; + } +#endif // -------------------------- // SEND DATA @@ -3529,11 +3569,11 @@ namespace libtorrent { boost::shared_ptr t = associated_torrent().lock(); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) TORRENT_ASSERT( (bool(m_state != read_pe_dhkey) || m_dh_key_exchange.get()) || !is_outgoing()); - TORRENT_ASSERT(!m_rc4_encrypted || m_enc_handler.get()); + TORRENT_ASSERT(!m_rc4_encrypted || (!m_encrypted && m_rc4) || (m_encrypted && !m_enc_handler.is_send_plaintext())); #endif if (!in_handshake()) { diff --git a/src/chained_buffer.cpp b/src/chained_buffer.cpp index c3171d745..62b161f6c 100644 --- a/src/chained_buffer.cpp +++ b/src/chained_buffer.cpp @@ -85,6 +85,26 @@ namespace libtorrent TORRENT_ASSERT(m_bytes <= m_capacity); } + void chained_buffer::prepend_buffer(char* buffer, int s, int used_size + , free_buffer_fun destructor, void* userdata + , block_cache_reference ref) + { + TORRENT_ASSERT(s >= used_size); + buffer_t b; + b.buf = buffer; + b.size = s; + b.start = buffer; + b.used_size = used_size; + b.free_fun = destructor; + b.userdata = userdata; + b.ref = ref; + m_vec.push_front(b); + + m_bytes += used_size; + m_capacity += s; + TORRENT_ASSERT(m_bytes <= m_capacity); + } + // returns the number of bytes available at the // end of the last chained buffer. int chained_buffer::space_in_last_buffer() @@ -127,21 +147,31 @@ namespace libtorrent { TORRENT_ASSERT(is_single_thread()); m_tmp_vec.clear(); + build_vec(to_send, m_tmp_vec); + return m_tmp_vec; + } + void chained_buffer::build_mutable_iovec(int bytes, std::vector &vec) + { + build_vec(bytes, vec); + } + + template + void chained_buffer::build_vec(int bytes, std::vector &vec) + { for (std::deque::iterator i = m_vec.begin() - , end(m_vec.end()); to_send > 0 && i != end; ++i) + , end(m_vec.end()); bytes > 0 && i != end; ++i) { - if (i->used_size > to_send) + if (i->used_size > bytes) { - TORRENT_ASSERT(to_send > 0); - m_tmp_vec.push_back(asio::const_buffer(i->start, to_send)); + TORRENT_ASSERT(bytes > 0); + vec.push_back(Buffer(i->start, bytes)); break; } TORRENT_ASSERT(i->used_size > 0); - m_tmp_vec.push_back(asio::const_buffer(i->start, i->used_size)); - to_send -= i->used_size; + vec.push_back(Buffer(i->start, i->used_size)); + bytes -= i->used_size; } - return m_tmp_vec; } void chained_buffer::clear() diff --git a/src/http_seed_connection.cpp b/src/http_seed_connection.cpp index ff8f9287b..0a1f8c423 100644 --- a/src/http_seed_connection.cpp +++ b/src/http_seed_connection.cpp @@ -120,7 +120,7 @@ namespace libtorrent } else { - int receive_buffer_size = receive_buffer().left() - m_parser.body_start(); + int receive_buffer_size = m_recv_buffer.get().left() - m_parser.body_start(); // TODO: 1 in chunked encoding mode, this assert won't hold. // the chunk headers should be subtracted from the receive_buffer_size TORRENT_ASSERT_VAL(receive_buffer_size <= t->block_size(), receive_buffer_size); @@ -225,7 +225,7 @@ namespace libtorrent for (;;) { - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); if (bytes_transferred == 0) break; TORRENT_ASSERT(recv_buffer.left() > 0); @@ -262,7 +262,7 @@ namespace libtorrent TORRENT_ASSERT(recv_buffer.left() == 0 || *recv_buffer.begin == 'H'); - TORRENT_ASSERT(recv_buffer.left() <= packet_size()); + TORRENT_ASSERT(recv_buffer.left() <= m_recv_buffer.packet_size()); // this means the entire status line hasn't been received yet if (m_parser.status_code() == -1) @@ -387,15 +387,15 @@ namespace libtorrent TORRENT_ASSERT(chunk_size != 0 || chunk_start.left() <= header_size || chunk_start.begin[header_size] == 'H'); // cut out the chunk header from the receive buffer TORRENT_ASSERT(m_chunk_pos + m_body_start < INT_MAX); - cut_receive_buffer(header_size, t->block_size() + 1024, int(m_chunk_pos + m_body_start)); - recv_buffer = receive_buffer(); + m_recv_buffer.cut(header_size, t->block_size() + 1024, int(m_chunk_pos + m_body_start)); + recv_buffer = m_recv_buffer.get(); recv_buffer.begin += m_body_start; m_chunk_pos += chunk_size; if (chunk_size == 0) { - TORRENT_ASSERT(receive_buffer().left() < m_chunk_pos + m_body_start + 1 - || receive_buffer()[int(m_chunk_pos + m_body_start)] == 'H' - || (m_parser.chunked_encoding() && receive_buffer()[int(m_chunk_pos + m_body_start)] == '\r')); + TORRENT_ASSERT(m_recv_buffer.get().left() < m_chunk_pos + m_body_start + 1 + || m_recv_buffer.get()[int(m_chunk_pos + m_body_start)] == 'H' + || (m_parser.chunked_encoding() && m_recv_buffer.get()[int(m_chunk_pos + m_body_start)] == '\r')); m_chunk_pos = -1; } } @@ -440,11 +440,11 @@ namespace libtorrent if (associated_torrent().expired()) return; int size_to_cut = m_body_start + front_request.length; - TORRENT_ASSERT(receive_buffer().left() < size_to_cut + 1 - || receive_buffer()[size_to_cut] == 'H' - || (m_parser.chunked_encoding() && receive_buffer()[size_to_cut] == '\r')); + TORRENT_ASSERT(m_recv_buffer.get().left() < size_to_cut + 1 + || m_recv_buffer.get()[size_to_cut] == 'H' + || (m_parser.chunked_encoding() && m_recv_buffer.get()[size_to_cut] == '\r')); - cut_receive_buffer(size_to_cut, t->block_size() + 1024); + m_recv_buffer.cut(size_to_cut, t->block_size() + 1024); if (m_response_left == 0) m_chunk_pos = 0; else m_chunk_pos -= front_request.length; bytes_transferred -= payload; diff --git a/src/http_tracker_connection.cpp b/src/http_tracker_connection.cpp index 0c1638a00..390772b0b 100644 --- a/src/http_tracker_connection.cpp +++ b/src/http_tracker_connection.cpp @@ -157,7 +157,7 @@ namespace libtorrent , (tracker_req().event != tracker_request::none) ? event_string[tracker_req().event - 1] : "" , tracker_req().num_want); url += str; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) if (settings.get_int(settings_pack::in_enc_policy) != settings_pack::pe_disabled && settings.get_bool(settings_pack::announce_crypto_support)) url += "&supportcrypto=1"; diff --git a/src/pe_crypto.cpp b/src/pe_crypto.cpp index d758b5a95..a12af2f6b 100644 --- a/src/pe_crypto.cpp +++ b/src/pe_crypto.cpp @@ -1,6 +1,6 @@ /* -Copyright (c) 2007-2014, Un Shyam & Arvid Norberg +Copyright (c) 2007-2014, Un Shyam, Arvid Norberg, Steven Siloti All rights reserved. Redistribution and use in source and binary forms, with or without @@ -30,7 +30,7 @@ POSSIBILITY OF SUCH DAMAGE. */ -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #include #include @@ -301,6 +301,230 @@ get_out: return ret; } + int encryption_handler::encrypt(std::vector& iovec) + { + TORRENT_ASSERT(!m_send_barriers.empty()); + TORRENT_ASSERT(m_send_barriers.front().enc_handler); + + int to_process = m_send_barriers.front().next; + + if (to_process != INT_MAX) + { + for (std::vector::iterator i = iovec.begin(); + to_process >= 0; ++i) + { + if (to_process == 0) + { + iovec.erase(i, iovec.end()); + break; + } + else if (to_process < asio::buffer_size(*i)) + { + *i = asio::mutable_buffer(asio::buffer_cast(*i), to_process); + iovec.erase(++i, iovec.end()); + to_process = 0; + break; + } + to_process -= asio::buffer_size(*i); + } + TORRENT_ASSERT(to_process == 0); + } + +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + to_process = 0; + for (std::vector::iterator i = iovec.begin(); + i != iovec.end(); ++i) + to_process += asio::buffer_size(*i); +#endif + + int next_barrier = 0; + if (iovec.empty() || (next_barrier = m_send_barriers.front().enc_handler->encrypt(iovec))) + { + if (m_send_barriers.front().next != INT_MAX) + { + if (m_send_barriers.size() == 1) + // transitioning back to plaintext + next_barrier = INT_MAX; + m_send_barriers.pop_front(); + } + +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + if (next_barrier != INT_MAX) + { + int overhead = 0; + for (std::vector::iterator i = iovec.begin(); + i != iovec.end(); ++i) + overhead += asio::buffer_size(*i); + TORRENT_ASSERT(overhead + to_process == next_barrier); + } +#endif + } + else + { + iovec.clear(); + } + return next_barrier; + } + + int encryption_handler::decrypt(crypto_receive_buffer& recv_buffer, std::size_t& bytes_transferred) + { + TORRENT_ASSERT(!is_recv_plaintext()); + int consume = 0; + if (recv_buffer.crypto_packet_finished()) + { + std::vector wr_buf; + recv_buffer.mutable_buffers(wr_buf, bytes_transferred); + int packet_size = 0; + int produce = bytes_transferred; + m_dec_handler->decrypt(wr_buf, consume, produce, packet_size); + TORRENT_ASSERT(packet_size || produce); + TORRENT_ASSERT(packet_size >= 0); + bytes_transferred = produce; + if (packet_size) + recv_buffer.crypto_cut(consume, packet_size); + } + else + bytes_transferred = 0; + return consume; + } + + bool encryption_handler::switch_send_crypto(boost::shared_ptr crypto + , int pending_encryption) + { + bool place_barrier = false; + if (!m_send_barriers.empty()) + { + std::list::iterator end = m_send_barriers.end(); --end; + for (std::list::iterator b = m_send_barriers.begin(); + b != end; ++b) + pending_encryption -= b->next; + TORRENT_ASSERT(pending_encryption >= 0); + m_send_barriers.back().next = pending_encryption; + } + else if (crypto) + place_barrier = true; + + if (crypto) + m_send_barriers.push_back(barrier(crypto, INT_MAX)); + + return place_barrier; + } + + void encryption_handler::switch_recv_crypto(boost::shared_ptr crypto + , crypto_receive_buffer& recv_buffer) + { + m_dec_handler = crypto; + int packet_size = 0; + if (crypto) + { + int consume = 0; + int produce = 0; + std::vector wr_buf; + crypto->decrypt(wr_buf, consume, produce, packet_size); + TORRENT_ASSERT(wr_buf.empty()); + TORRENT_ASSERT(consume == 0); + TORRENT_ASSERT(produce == 0); + } + recv_buffer.crypto_reset(packet_size); + } + + void rc4_handler::set_incoming_key(unsigned char const* key, int len) + { + m_decrypt = true; +#ifdef TORRENT_USE_GCRYPT + gcry_cipher_close(m_rc4_incoming); + gcry_cipher_open(&m_rc4_incoming, GCRY_CIPHER_ARCFOUR, GCRY_CIPHER_MODE_STREAM, 0); + gcry_cipher_setkey(m_rc4_incoming, key, len); +#elif defined TORRENT_USE_OPENSSL + RC4_set_key(&m_remote_key, len, key); +#else + rc4_init(key, len, &m_rc4_incoming); +#endif + // Discard first 1024 bytes + char buf[1024]; + std::vector vec(1, boost::asio::mutable_buffer(buf, 1024)); + int consume = 0; + int produce = 0; + int packet_size = 0; + decrypt(vec, consume, produce, packet_size); + } + + void rc4_handler::set_outgoing_key(unsigned char const* key, int len) + { + m_encrypt = true; +#ifdef TORRENT_USE_GCRYPT + gcry_cipher_close(m_rc4_outgoing); + gcry_cipher_open(&m_rc4_outgoing, GCRY_CIPHER_ARCFOUR, GCRY_CIPHER_MODE_STREAM, 0); + gcry_cipher_setkey(m_rc4_outgoing, key, len); +#elif defined TORRENT_USE_OPENSSL + RC4_set_key(&m_local_key, len, key); +#else + rc4_init(key, len, &m_rc4_outgoing); +#endif + // Discard first 1024 bytes + char buf[1024]; + std::vector vec(1, boost::asio::mutable_buffer(buf, 1024)); + encrypt(vec); + } + + int rc4_handler::encrypt(std::vector& buf) + { + if (!m_encrypt) return 0; + if (buf.empty()) return 0; + + int bytes_processed = 0; + for (std::vector::iterator i = buf.begin(); + i != buf.end(); ++i) + { + char* pos = boost::asio::buffer_cast(*i); + int len = boost::asio::buffer_size(*i); + + TORRENT_ASSERT(len >= 0); + TORRENT_ASSERT(pos); + + bytes_processed += len; +#ifdef TORRENT_USE_GCRYPT + gcry_cipher_encrypt(m_rc4_outgoing, pos, len, 0, 0); +#elif defined TORRENT_USE_OPENSSL + RC4(&m_local_key, len, (const unsigned char*)pos, (unsigned char*)pos); +#else + rc4_encrypt((unsigned char*)pos, len, &m_rc4_outgoing); +#endif + } + buf.clear(); + return bytes_processed; + } + + void rc4_handler::decrypt(std::vector& buf + , int& consume + , int& produce + , int& packet_size) + { + if (!m_decrypt) return; + + int bytes_processed = 0; + for (std::vector::iterator i = buf.begin(); + i != buf.end(); ++i) + { + char* pos = boost::asio::buffer_cast(*i); + int len = boost::asio::buffer_size(*i); + + TORRENT_ASSERT(len >= 0); + TORRENT_ASSERT(pos); + + bytes_processed += len; +#ifdef TORRENT_USE_GCRYPT + gcry_cipher_decrypt(m_rc4_incoming, pos, len, 0, 0); +#elif defined TORRENT_USE_OPENSSL + RC4(&m_remote_key, len, (const unsigned char*)pos, (unsigned char*)pos); +#else + rc4_encrypt((unsigned char*)pos, len, &m_rc4_incoming); +#endif + } + buf.clear(); + produce = bytes_processed; + } + } // namespace libtorrent #if !defined TORRENT_USE_OPENSSL && !defined TORRENT_USE_GCRYPT @@ -369,5 +593,5 @@ unsigned long rc4_encrypt(unsigned char *out, unsigned long outlen, rc4 *state) #endif -#endif // #ifndef TORRENT_DISABLE_ENCRYPTION +#endif // #if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 301fa21f9..66a410105 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -91,11 +91,6 @@ namespace libtorrent min_request_queue = 2, }; - int round_up8(int v) - { - return ((v & 7) == 0) ? v : v + (8 - (v & 7)); - } - #if defined TORRENT_REQUEST_LOGGING void write_request_log(FILE* f, sha1_hash const& ih , peer_connection* p, peer_request const& r) @@ -142,7 +137,7 @@ namespace libtorrent , m_peer_info(pack.peerinfo) , m_counters(*pack.stats_counters) , m_num_pieces(0) - , m_recv_start(0) + , m_recv_buffer(*pack.allocator) , m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue)) , m_remote(pack.endp) , m_disk_thread(*pack.disk_thread) @@ -165,16 +160,12 @@ namespace libtorrent , m_downloaded_at_last_round(0) , m_uploaded_at_last_round(0) , m_uploaded_at_last_unchoke(0) - , m_soft_packet_size(0) , m_outstanding_bytes(0) - , m_disk_recv_buffer(*pack.allocator, 0) , m_last_seen_complete(0) , m_receiving_block(piece_block::invalid) , m_timeout_extend(0) , m_extension_outstanding_bytes(0) , m_queued_time_critical(0) - , m_recv_end(0) - , m_disk_recv_buffer_size(0) , m_reading_bytes(0) , m_picker_options(0) , m_num_invalid_requests(0) @@ -183,6 +174,7 @@ namespace libtorrent , m_outstanding_writing_bytes(0) , m_download_rate_peak(0) , m_upload_rate_peak(0) + , m_send_barrier(INT_MAX) , m_desired_queue_size(2) , m_speed(slow) , m_prefer_whole_pieces(0) @@ -527,9 +519,9 @@ namespace libtorrent TORRENT_ASSERT(is_single_thread()); if (!m_logger) return; - va_list v; + va_list v; va_start(v, fmt); - + char usr[400]; vsnprintf(usr, sizeof(usr), fmt, v); va_end(v); @@ -820,8 +812,6 @@ namespace libtorrent m_connecting = false; } - m_disk_recv_buffer_size = 0; - #ifndef TORRENT_DISABLE_EXTENSIONS m_extensions.clear(); #endif @@ -2427,7 +2417,7 @@ namespace libtorrent check_invariant(); #endif #if TORRENT_USE_ASSERTS - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); int recv_pos = recv_buffer.end - recv_buffer.begin; TORRENT_ASSERT(recv_pos >= 9); #endif @@ -2579,8 +2569,7 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); - TORRENT_ASSERT(!m_disk_recv_buffer); - TORRENT_ASSERT(m_disk_recv_buffer_size == 0); + m_recv_buffer.assert_no_disk_buffer(); // we're not receiving any block right now m_receiving_block = piece_block::invalid; @@ -3990,8 +3979,7 @@ namespace libtorrent // make sure we free up all send buffers that are owned // by the disk thread m_send_buffer.clear(); - m_disk_recv_buffer.reset(); - m_disk_recv_buffer_size = 0; + m_recv_buffer.free_disk_buffer(); } // we cannot do this in a constructor @@ -4056,7 +4044,7 @@ namespace libtorrent if (m_outgoing) m_counters.inc_stats_counter(counters::error_outgoing_peers); else m_counters.inc_stats_counter(counters::error_incoming_peers); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) if (type() == bittorrent_connection) { bt_peer_connection* bt = static_cast(this); @@ -4329,8 +4317,8 @@ namespace libtorrent p.remote_dl_rate = m_remote_dl_rate; p.send_buffer_size = m_send_buffer.capacity(); p.used_send_buffer = m_send_buffer.size(); - p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size; - p.used_receive_buffer = m_recv_pos; + p.receive_buffer_size = m_recv_buffer.capacity(); + p.used_receive_buffer = m_recv_buffer.pos(); p.write_state = m_channel_state[upload_channel]; p.read_state = m_channel_state[download_channel]; @@ -4367,10 +4355,8 @@ namespace libtorrent TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; - TORRENT_ASSERT(m_packet_size > 0); - TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size); - TORRENT_ASSERT(!m_disk_recv_buffer); - TORRENT_ASSERT(m_disk_recv_buffer_size == 0); + m_recv_buffer.assert_no_disk_buffer(); + TORRENT_ASSERT(m_recv_buffer.pos() <= m_recv_buffer.packet_size() - disk_buffer_size); TORRENT_ASSERT(disk_buffer_size <= 16 * 1024); if (disk_buffer_size == 0) return true; @@ -4382,13 +4368,15 @@ namespace libtorrent } // first free the old buffer - m_disk_recv_buffer.reset(); + m_recv_buffer.free_disk_buffer(); // then allocate a new one bool exceeded = false; - m_disk_recv_buffer.reset(m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer")); + m_recv_buffer.assign_disk_buffer( + m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer") + , disk_buffer_size); - if (!m_disk_recv_buffer) + if (!m_recv_buffer.has_disk_buffer()) { disconnect(errors::no_memory, op_alloc_recvbuf); return false; @@ -4404,84 +4392,9 @@ namespace libtorrent m_channel_state[download_channel] |= peer_info::bw_disk; } - m_disk_recv_buffer_size = disk_buffer_size; return true; } - char* peer_connection::release_disk_receive_buffer() - { - TORRENT_ASSERT(is_single_thread()); - if (!m_disk_recv_buffer) return 0; - - TORRENT_ASSERT(m_disk_recv_buffer_size <= m_recv_end); - TORRENT_ASSERT(m_recv_start <= m_recv_end - m_disk_recv_buffer_size); - m_recv_end -= m_disk_recv_buffer_size; - m_disk_recv_buffer_size = 0; - return m_disk_recv_buffer.release(); - } - - // size = the packet size to remove from the receive buffer - // packet_size = the next packet size to receive in the buffer - // offset = the offset into the receive buffer where to remove `size` bytes - void peer_connection::cut_receive_buffer(int size, int packet_size, int offset) - { - TORRENT_ASSERT(is_single_thread()); - INVARIANT_CHECK; - - TORRENT_ASSERT(packet_size > 0); - TORRENT_ASSERT(int(m_recv_buffer.size()) >= size); - TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos); - TORRENT_ASSERT(m_recv_pos >= size + offset); - TORRENT_ASSERT(offset >= 0); - TORRENT_ASSERT(m_recv_buffer.size() >= m_recv_end); - TORRENT_ASSERT(m_recv_start <= m_recv_end); - TORRENT_ASSERT(size >= 0); - - if (offset > 0) - { - TORRENT_ASSERT(m_recv_start - size <= m_recv_end); - - if (size > 0) - std::memmove(&m_recv_buffer[0] + m_recv_start + offset - , &m_recv_buffer[0] + m_recv_start + offset + size - , m_recv_end - m_recv_start - size - offset); - - m_recv_pos -= size; - m_recv_end -= size; - -#ifdef TORRENT_DEBUG - std::fill(m_recv_buffer.begin() + m_recv_end, m_recv_buffer.end(), 0xcc); -#endif - } - else - { - TORRENT_ASSERT(m_recv_start + size <= m_recv_end); - m_recv_start += size; - m_recv_pos -= size; - } - - m_packet_size = packet_size; - } - - // the purpose of this function is to free up and cut off all messages - // in the receive buffer that have been parsed and processed. - void peer_connection::normalize_receive_buffer() - { - TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(m_recv_end >= m_recv_start); - if (m_recv_start == 0) return; - - if (m_recv_end > m_recv_start) - std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + m_recv_start, m_recv_end - m_recv_start); - - m_recv_end -= m_recv_start; - m_recv_start = 0; - -#ifdef TORRENT_DEBUG - std::fill(m_recv_buffer.begin() + m_recv_end, m_recv_buffer.end(), 0xcc); -#endif - } - void peer_connection::superseed_piece(int replace_piece, int new_piece) { TORRENT_ASSERT(is_single_thread()); @@ -5262,7 +5175,7 @@ namespace libtorrent if (channel == download_channel) { return (std::max)((std::max)(m_outstanding_bytes - , m_packet_size - m_recv_pos) + 30 + , m_recv_buffer.packet_bytes_remaining()) + 30 , int(boost::int64_t(m_statistics.download_rate()) * 2 / (1000 / m_settings.get_int(settings_pack::tick_interval)))); } @@ -5364,7 +5277,24 @@ namespace libtorrent if (m_channel_state[upload_channel] & peer_info::bw_network) return; - if (m_quota[upload_channel] == 0 + if (m_send_barrier == 0) + { + std::vector vec; + m_send_buffer.build_mutable_iovec(m_send_buffer.size(), vec); + int next_barrier = hit_send_barrier(vec); + for (std::vector::reverse_iterator i = vec.rbegin(); + i != vec.rend(); ++i) + { + m_send_buffer.prepend_buffer(asio::buffer_cast(*i) + , asio::buffer_size(*i) + , asio::buffer_size(*i) + , &nop + , NULL); + } + set_send_barrier(next_barrier); + } + + if ((m_quota[upload_channel] == 0 || m_send_barrier == 0) && !m_send_buffer.empty() && !m_connecting) { @@ -5441,6 +5371,8 @@ namespace libtorrent int amount_to_send = m_send_buffer.size(); if (amount_to_send > quota_left) amount_to_send = quota_left; + if (amount_to_send > m_send_barrier) + amount_to_send = m_send_barrier; TORRENT_ASSERT(amount_to_send > 0); @@ -5503,11 +5435,9 @@ namespace libtorrent TORRENT_ASSERT(is_single_thread()); INVARIANT_CHECK; - TORRENT_ASSERT(!m_disk_recv_buffer); TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_disk); - m_disk_recv_buffer.reset(buffer); - m_disk_recv_buffer_size = buffer_size; + m_recv_buffer.assign_disk_buffer(buffer, buffer_size); m_counters.inc_stats_counter(counters::num_peers_down_disk, -1); m_channel_state[download_channel] &= ~peer_info::bw_disk; @@ -5572,7 +5502,7 @@ namespace libtorrent return 0; } - int max_receive = m_packet_size - m_recv_pos; + int max_receive = m_recv_buffer.max_receive(); boost::array vec; int num_bufs = 0; @@ -5580,7 +5510,7 @@ namespace libtorrent // outstanding requests. When we're likely to receive pieces, we'll // save more time from avoiding copying data from the socket if ((m_settings.get_bool(settings_pack::contiguous_recv_buffer) - || m_download_queue.empty()) && !m_disk_recv_buffer) + || m_download_queue.empty()) && !m_recv_buffer.has_disk_buffer()) { if (s == read_sync) { @@ -5602,12 +5532,8 @@ namespace libtorrent return 0; } - TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(max_receive >= 0); - if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; - if (m_soft_packet_size && max_receive > m_soft_packet_size - m_recv_pos) - max_receive = m_soft_packet_size - m_recv_pos; int quota_left = m_quota[download_channel]; if (max_receive > quota_left) max_receive = quota_left; @@ -5618,43 +5544,7 @@ namespace libtorrent return 0; } - TORRENT_ASSERT(m_recv_pos >= 0); - TORRENT_ASSERT(m_packet_size > 0); - - int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size; - - if (int(m_recv_buffer.size()) < regular_buffer_size) - m_recv_buffer.resize(round_up8(regular_buffer_size)); - - if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive) - { - // only receive into regular buffer - TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size())); - vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos], max_receive); - num_bufs = 1; - } - else if (m_recv_pos >= regular_buffer_size) - { - // only receive into disk buffer - TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0); - TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size); - vec[0] = asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size, max_receive); - num_bufs = 1; - } - else - { - // receive into both regular and disk buffer - TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size); - TORRENT_ASSERT(m_recv_pos < regular_buffer_size); - TORRENT_ASSERT(max_receive - regular_buffer_size - + m_recv_pos <= m_disk_recv_buffer_size); - - vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos] - , regular_buffer_size - m_recv_pos); - vec[1] = asio::buffer(m_disk_recv_buffer.get() - , max_receive - regular_buffer_size + m_recv_pos); - num_bufs = 2; - } + num_bufs = m_recv_buffer.reserve(vec, max_receive); if (s == read_async) { @@ -5672,12 +5562,15 @@ namespace libtorrent { if (num_bufs == 1) { + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); m_socket->async_read_some( asio::mutable_buffers_1(vec[0]), make_read_handler( boost::bind(&peer_connection::on_receive_data, self(), _1, _2))); } else { + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) + + boost::asio::buffer_size(vec[1])> 0); m_socket->async_read_some( vec, make_read_handler( boost::bind(&peer_connection::on_receive_data, self(), _1, _2))); @@ -5690,11 +5583,14 @@ namespace libtorrent j.peer = self(); if (num_bufs == 1) { + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); j.recv_buf = asio::buffer_cast(vec[0]); j.buf_size = asio::buffer_size(vec[0]); } else { + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) + + boost::asio::buffer_size(vec[1])> 0); j.read_vec = vec; } m_ses.post_socket_job(j); @@ -5720,71 +5616,11 @@ namespace libtorrent return ret; } -#ifndef TORRENT_DISABLE_ENCRYPTION - - // returns the last 'bytes' from the receive buffer - std::pair peer_connection::wr_recv_buffers(int bytes) - { - TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(bytes <= m_recv_pos); - - std::pair vec; - int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size; - TORRENT_ASSERT(regular_buffer_size >= 0); - if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos) - { - vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_start - + m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_start + m_recv_pos); - vec.second = buffer::interval(0,0); - } - else if (m_recv_pos - bytes >= regular_buffer_size) - { - vec.first = buffer::interval(m_disk_recv_buffer.get() + m_recv_pos - - regular_buffer_size - bytes, m_disk_recv_buffer.get() + m_recv_pos - - regular_buffer_size); - vec.second = buffer::interval(0,0); - } - else - { - TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size); - TORRENT_ASSERT(m_recv_pos > regular_buffer_size); - vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_start + m_recv_pos - bytes - , &m_recv_buffer[0] + m_recv_start + regular_buffer_size); - vec.second = buffer::interval(m_disk_recv_buffer.get() - , m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size); - } - TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes); - return vec; - } -#endif - - void peer_connection::reset_recv_buffer(int packet_size) - { - TORRENT_ASSERT(is_single_thread()); - TORRENT_ASSERT(m_recv_buffer.size() >= m_recv_end); - TORRENT_ASSERT(packet_size > 0); - if (m_recv_end > m_packet_size) - { - cut_receive_buffer(m_packet_size, packet_size); - return; - } - - m_recv_pos = 0; - m_recv_start = 0; - m_recv_end = 0; - m_packet_size = packet_size; - } - void peer_connection::append_send_buffer(char* buffer, int size , chained_buffer::free_buffer_fun destructor, void* userdata - , block_cache_reference ref, bool encrypted) + , block_cache_reference ref) { TORRENT_ASSERT(is_single_thread()); - // bittorrent connections should never use this function, since - // they might be encrypted and this would circumvent the actual - // encryption. bt_peer_connection overrides this function with - // its own version. - TORRENT_ASSERT(encrypted || type() != bittorrent_connection); m_send_buffer.append_buffer(buffer, size, size, destructor , userdata, ref); } @@ -5804,8 +5640,7 @@ namespace libtorrent ses->free_buffer(buffer); } - void peer_connection::send_buffer(char const* buf, int size, int flags - , void (*fun)(char*, int, void*), void* userdata) + void peer_connection::send_buffer(char const* buf, int size, int flags) { TORRENT_ASSERT(is_single_thread()); int free_space = m_send_buffer.space_in_last_buffer(); @@ -5814,7 +5649,6 @@ namespace libtorrent { char* dst = m_send_buffer.append(buf, free_space); TORRENT_ASSERT(dst != 0); - if (fun) fun(dst, free_space, userdata); size -= free_space; buf += free_space; } @@ -5833,7 +5667,6 @@ namespace libtorrent const int alloc_buf_size = m_ses.send_buffer_size(); int buf_size = (std::min)(alloc_buf_size, size); memcpy(chain_buf, buf, buf_size); - if (fun) fun(chain_buf, buf_size, userdata); buf += buf_size; size -= buf_size; m_send_buffer.append_buffer(chain_buf, alloc_buf_size, buf_size @@ -5914,14 +5747,13 @@ namespace libtorrent if (buffer_size > 2097152) buffer_size = 2097152; - m_recv_buffer.resize(m_recv_pos + buffer_size); - TORRENT_ASSERT(m_recv_start == 0); + asio::mutable_buffer buffer = m_recv_buffer.reserve(buffer_size); + TORRENT_ASSERT(m_recv_buffer.normalized()); // utp sockets aren't thread safe... if (is_utp(*m_socket)) { - bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos] - , buffer_size), ec); + bytes_transferred = m_socket->read_some(asio::mutable_buffers_1(buffer), ec); if (ec) { @@ -5945,8 +5777,8 @@ namespace libtorrent #endif socket_job j; j.type = socket_job::read_job; - j.recv_buf = &m_recv_buffer[m_recv_pos]; - j.buf_size = buffer_size; + j.recv_buf = asio::buffer_cast(buffer); + j.buf_size = asio::buffer_size(buffer); j.peer = self(); m_ses.post_socket_job(j); return; @@ -5979,6 +5811,8 @@ namespace libtorrent // function. TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); + TORRENT_ASSERT(bytes_transferred > 0 || error); + receive_data_impl(error, bytes_transferred, 10); } @@ -6019,6 +5853,8 @@ namespace libtorrent return; } + TORRENT_ASSERT(bytes_transferred > 0); + m_counters.inc_stats_counter(counters::on_read_counter); m_ses.received_buffer(bytes_transferred); @@ -6046,30 +5882,25 @@ namespace libtorrent return; } - TORRENT_ASSERT(m_packet_size > 0); TORRENT_ASSERT(bytes_transferred > 0); - - m_recv_end += bytes_transferred; - TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size() - + m_disk_recv_buffer_size)); + m_recv_buffer.received(bytes_transferred); int bytes = bytes_transferred; int sub_transferred = 0; do { INVARIANT_CHECK; -#if TORRENT_USE_ASSERTS +// TODO: The stats checks can not be honored when authenticated encryption is in use +// because we may have encrypted data which we cannot authenticate yet +#if 0 size_type cur_payload_dl = m_statistics.last_payload_downloaded(); size_type cur_protocol_dl = m_statistics.last_protocol_downloaded(); #endif - int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; - int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; - sub_transferred = (std::min)(bytes, limit); - m_recv_pos += sub_transferred; + sub_transferred = m_recv_buffer.advance_pos(bytes); on_receive(error, sub_transferred); bytes -= sub_transferred; TORRENT_ASSERT(sub_transferred > 0); -#if TORRENT_USE_ASSERTS +#if 0 TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0); TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0); size_type stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl + @@ -6080,22 +5911,16 @@ namespace libtorrent } while (bytes > 0 && sub_transferred > 0); - normalize_receive_buffer(); + m_recv_buffer.normalize(); - TORRENT_ASSERT(m_recv_pos == m_recv_end); + TORRENT_ASSERT(m_recv_buffer.pos_at_end()); + TORRENT_ASSERT(m_recv_buffer.packet_size() > 0); - TORRENT_ASSERT(m_packet_size > 0); - - if (m_peer_choked - && m_recv_pos == 0 - && (m_recv_buffer.capacity() - m_packet_size) > 128) + if (m_peer_choked) { - // round up to an even 8 bytes since that's the RC4 blocksize - buffer(round_up8(m_packet_size)).swap(m_recv_buffer); + m_recv_buffer.clamp_size(); } - if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; - if (num_loops > read_loops) break; error_code ec; @@ -6137,6 +5962,7 @@ namespace libtorrent // we want to send data return !m_send_buffer.empty() && m_quota[upload_channel] > 0 + && (m_send_barrier > 0) && !m_connecting; } @@ -6388,6 +6214,9 @@ namespace libtorrent trancieve_ip_packet(bytes_transferred, m_remote.address().is_v6()); + if (m_send_barrier != INT_MAX) + m_send_barrier -= bytes_transferred; + #ifdef TORRENT_VERBOSE_LOGGING peer_log(">>> wrote %d bytes", int(bytes_transferred)); #endif @@ -6405,7 +6234,7 @@ namespace libtorrent // make sure we free up all send buffers that are owned // by the disk thread m_send_buffer.clear(); - m_disk_recv_buffer.reset(); + m_recv_buffer.free_disk_buffer(); return; } @@ -6448,10 +6277,9 @@ namespace libtorrent TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(m_in_use == 1337); TORRENT_ASSERT(m_queued_time_critical <= int(m_request_queue.size())); - TORRENT_ASSERT(m_recv_end >= m_recv_start); TORRENT_ASSERT(m_accept_fast.size() == m_accept_fast_piece_cnt.size()); - TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0)); + m_recv_buffer.check_invariant(); for (int i = 0; i < 2; ++i) { diff --git a/src/peer_list.cpp b/src/peer_list.cpp index 376acbaf0..7f58770f9 100644 --- a/src/peer_list.cpp +++ b/src/peer_list.cpp @@ -915,7 +915,7 @@ namespace libtorrent if (m_round_robin >= iter - m_peers.begin()) ++m_round_robin; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) if (flags & flag_encryption) p->pe_support = true; #endif if (flags & flag_seed) diff --git a/src/receive_buffer.cpp b/src/receive_buffer.cpp new file mode 100644 index 000000000..28584bc76 --- /dev/null +++ b/src/receive_buffer.cpp @@ -0,0 +1,400 @@ +/* + +Copyright (c) 2014, Arvid Norberg, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include + +namespace libtorrent { + +int receive_buffer::max_receive() +{ + int max = packet_bytes_remaining(); + if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; + if (m_soft_packet_size && max > m_soft_packet_size - m_recv_pos) + max = m_soft_packet_size - m_recv_pos; + return max; +} + +boost::asio::mutable_buffer receive_buffer::reserve(int size) +{ + TORRENT_ASSERT(size > 0); + TORRENT_ASSERT(!m_disk_recv_buffer); + m_recv_buffer.resize(m_recv_pos + size); + return boost::asio::buffer(&m_recv_buffer[m_recv_pos], size); +} + +int receive_buffer::reserve(boost::array& vec, int size) +{ + TORRENT_ASSERT(size > 0); + TORRENT_ASSERT(m_recv_pos >= 0); + TORRENT_ASSERT(m_packet_size > 0); + + int num_bufs; + int regular_buf_size = regular_buffer_size(); + + if (int(m_recv_buffer.size()) < regular_buf_size) + m_recv_buffer.resize(round_up8(regular_buf_size)); + + if (!m_disk_recv_buffer || regular_buf_size >= m_recv_pos + size) + { + // only receive into regular buffer + TORRENT_ASSERT(m_recv_pos + size <= int(m_recv_buffer.size())); + vec[0] = boost::asio::buffer(&m_recv_buffer[m_recv_pos], size); + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); + num_bufs = 1; + } + else if (m_recv_pos >= regular_buf_size) + { + // only receive into disk buffer + TORRENT_ASSERT(m_recv_pos - regular_buf_size >= 0); + TORRENT_ASSERT(m_recv_pos - regular_buf_size + size <= m_disk_recv_buffer_size); + vec[0] = boost::asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buf_size, size); + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) > 0); + num_bufs = 1; + } + else + { + // receive into both regular and disk buffer + TORRENT_ASSERT(size + m_recv_pos > regular_buf_size); + TORRENT_ASSERT(m_recv_pos < regular_buf_size); + TORRENT_ASSERT(size - regular_buf_size + + m_recv_pos <= m_disk_recv_buffer_size); + + vec[0] = boost::asio::buffer(&m_recv_buffer[m_recv_pos] + , regular_buf_size - m_recv_pos); + vec[1] = boost::asio::buffer(m_disk_recv_buffer.get() + , size - regular_buf_size + m_recv_pos); + TORRENT_ASSERT(boost::asio::buffer_size(vec[0]) + + boost::asio::buffer_size(vec[1])> 0); + num_bufs = 2; + } + + return num_bufs; +} + +int receive_buffer::advance_pos(int bytes) +{ + int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; + int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; + int sub_transferred = (std::min)(bytes, limit); + m_recv_pos += sub_transferred; + if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; + return sub_transferred; +} + +void receive_buffer::clamp_size() +{ + if (m_recv_pos == 0 + && (m_recv_buffer.capacity() - m_packet_size) > 128) + { + // round up to an even 8 bytes since that's the RC4 blocksize + buffer(round_up8(m_packet_size)).swap(m_recv_buffer); + } +} + +// size = the packet size to remove from the receive buffer +// packet_size = the next packet size to receive in the buffer +// offset = the offset into the receive buffer where to remove `size` bytes +void receive_buffer::cut(int size, int packet_size, int offset) +{ + TORRENT_ASSERT(packet_size > 0); + TORRENT_ASSERT(int(m_recv_buffer.size()) >= size); + TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos); + TORRENT_ASSERT(m_recv_pos >= size + offset); + TORRENT_ASSERT(offset >= 0); + TORRENT_ASSERT(m_recv_buffer.size() >= m_recv_end); + TORRENT_ASSERT(m_recv_start <= m_recv_end); + TORRENT_ASSERT(size >= 0); + + if (offset > 0) + { + TORRENT_ASSERT(m_recv_start - size <= m_recv_end); + + if (size > 0) + std::memmove(&m_recv_buffer[0] + m_recv_start + offset + , &m_recv_buffer[0] + m_recv_start + offset + size + , m_recv_end - m_recv_start - size - offset); + + m_recv_pos -= size; + m_recv_end -= size; + +#ifdef TORRENT_DEBUG + std::fill(m_recv_buffer.begin() + m_recv_end, m_recv_buffer.end(), 0xcc); +#endif + } + else + { + TORRENT_ASSERT(m_recv_start + size <= m_recv_end); + m_recv_start += size; + m_recv_pos -= size; + } + + m_packet_size = packet_size; +} + +buffer::const_interval receive_buffer::get() const +{ + if (m_recv_buffer.empty()) + { + TORRENT_ASSERT(m_recv_pos == 0); + return buffer::interval(0,0); + } + int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size())); + return buffer::const_interval(&m_recv_buffer[0] + m_recv_start + , &m_recv_buffer[0] + m_recv_start + rcv_pos); +} + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) +buffer::interval receive_buffer::mutable_buffer() +{ + if (m_recv_buffer.empty()) + { + TORRENT_ASSERT(m_recv_pos == 0); + return buffer::interval(0,0); + } + TORRENT_ASSERT(!m_disk_recv_buffer); + TORRENT_ASSERT(m_disk_recv_buffer_size == 0); + int rcv_pos = (std::min)(m_recv_pos, int(m_recv_buffer.size())); + return buffer::interval(&m_recv_buffer[0] + m_recv_start + , &m_recv_buffer[0] + m_recv_start + rcv_pos); +} + +void receive_buffer::mutable_buffers(std::vector& vec, int bytes) +{ + using namespace boost; + TORRENT_ASSERT(bytes <= m_recv_pos); + + int regular_buf_size = regular_buffer_size(); + TORRENT_ASSERT(regular_buf_size >= 0); + if (!m_disk_recv_buffer || regular_buf_size >= m_recv_pos) + { + vec.push_back(asio::mutable_buffer(&m_recv_buffer[0] + m_recv_start + + m_recv_pos - bytes, bytes)); + } + else if (m_recv_pos - bytes >= regular_buf_size) + { + vec.push_back(asio::mutable_buffer(m_disk_recv_buffer.get() + m_recv_pos + - regular_buf_size - bytes, bytes)); + } + else + { + TORRENT_ASSERT(m_recv_pos - bytes < regular_buf_size); + TORRENT_ASSERT(m_recv_pos > regular_buf_size); + vec.push_back(asio::mutable_buffer(&m_recv_buffer[0] + m_recv_start + m_recv_pos - bytes + , regular_buf_size - (m_recv_start + m_recv_pos - bytes))); + vec.push_back(asio::mutable_buffer(m_disk_recv_buffer.get() + , m_recv_pos - regular_buf_size)); + } + +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + int vec_bytes = 0; + for (std::vector::iterator i = vec.begin(); + i != vec.end(); ++i) + vec_bytes += boost::asio::buffer_size(*i); + TORRENT_ASSERT(vec_bytes == bytes); +#endif +} +#endif + +void receive_buffer::assign_disk_buffer(char* buffer, int size) +{ + TORRENT_ASSERT(m_packet_size > 0); + assert_no_disk_buffer(); + m_disk_recv_buffer.reset(buffer); + if (m_disk_recv_buffer) + m_disk_recv_buffer_size = size; +} + +char* receive_buffer::release_disk_buffer() +{ + if (!m_disk_recv_buffer) return 0; + + TORRENT_ASSERT(m_disk_recv_buffer_size <= m_recv_end); + TORRENT_ASSERT(m_recv_start <= m_recv_end - m_disk_recv_buffer_size); + m_recv_end -= m_disk_recv_buffer_size; + m_disk_recv_buffer_size = 0; + return m_disk_recv_buffer.release(); +} + +// the purpose of this function is to free up and cut off all messages +// in the receive buffer that have been parsed and processed. +void receive_buffer::normalize() +{ + TORRENT_ASSERT(m_recv_end >= m_recv_start); + if (m_recv_start == 0) return; + + if (m_recv_end > m_recv_start) + std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + m_recv_start, m_recv_end - m_recv_start); + + m_recv_end -= m_recv_start; + m_recv_start = 0; + +#ifdef TORRENT_DEBUG + std::fill(m_recv_buffer.begin() + m_recv_end, m_recv_buffer.end(), 0xcc); +#endif +} + +void receive_buffer::reset(int packet_size) +{ + TORRENT_ASSERT(m_recv_buffer.size() >= m_recv_end); + TORRENT_ASSERT(packet_size > 0); + if (m_recv_end > m_packet_size) + { + cut(m_packet_size, packet_size); + return; + } + + m_recv_pos = 0; + m_recv_start = 0; + m_recv_end = 0; + m_packet_size = packet_size; +} + +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) +bool crypto_receive_buffer::packet_finished() const +{ + if (m_recv_pos == INT_MAX) + return m_connection_buffer.packet_finished(); + else + return m_packet_size <= m_recv_pos; +} + +int crypto_receive_buffer::packet_size() const +{ + if (m_recv_pos == INT_MAX) + return m_connection_buffer.packet_size(); + else + return m_packet_size; +} + +int crypto_receive_buffer::pos() const +{ + if (m_recv_pos == INT_MAX) + return m_connection_buffer.pos(); + else + return m_recv_pos; +} + +void crypto_receive_buffer::cut(int size, int packet_size, int offset) +{ + if (m_recv_pos != INT_MAX) + { + TORRENT_ASSERT(size <= m_recv_pos); + m_packet_size = packet_size; + packet_size = m_connection_buffer.packet_size() - size; + m_recv_pos -= size; + } + m_connection_buffer.cut(size, packet_size, offset); +} + +void crypto_receive_buffer::reset(int packet_size) +{ + if (m_recv_pos != INT_MAX) + { + if (m_connection_buffer.m_recv_end > m_packet_size) + { + cut(m_packet_size, packet_size); + return; + } + m_packet_size = packet_size; + packet_size = m_connection_buffer.packet_size() - m_recv_pos; + m_recv_pos = 0; + } + m_connection_buffer.reset(packet_size); +} + +void crypto_receive_buffer::crypto_reset(int packet_size) +{ + TORRENT_ASSERT(packet_finished()); + TORRENT_ASSERT(crypto_packet_finished()); + TORRENT_ASSERT(m_recv_pos == INT_MAX || m_recv_pos == m_connection_buffer.pos()); + TORRENT_ASSERT(m_recv_pos == INT_MAX || m_connection_buffer.pos_at_end()); + TORRENT_ASSERT(!m_connection_buffer.has_disk_buffer()); + + if (packet_size == 0) + { + if (m_recv_pos != INT_MAX) + m_connection_buffer.cut(0, m_packet_size); + m_recv_pos = INT_MAX; + } + else + { + if (m_recv_pos == INT_MAX) + m_packet_size = m_connection_buffer.packet_size(); + m_recv_pos = m_connection_buffer.pos(); + TORRENT_ASSERT(m_recv_pos >= 0); + m_connection_buffer.cut(0, m_recv_pos + packet_size); + } +} + +void crypto_receive_buffer::set_soft_packet_size(int size) +{ + if (m_recv_pos == INT_MAX) + m_connection_buffer.set_soft_packet_size(size); + else + m_soft_packet_size = size; +} + +int crypto_receive_buffer::advance_pos(int bytes) +{ + if (m_recv_pos == INT_MAX) return bytes; + + int packet_size = m_soft_packet_size ? m_soft_packet_size : m_packet_size; + int limit = packet_size > m_recv_pos ? packet_size - m_recv_pos : packet_size; + int sub_transferred = (std::min)(bytes, limit); + m_recv_pos += sub_transferred; + m_connection_buffer.cut(0, m_connection_buffer.packet_size() + sub_transferred); + if (m_recv_pos >= m_soft_packet_size) m_soft_packet_size = 0; + return sub_transferred; +} + +buffer::const_interval crypto_receive_buffer::get() const +{ + buffer::const_interval recv_buffer = m_connection_buffer.get(); + if (m_recv_pos < m_connection_buffer.pos()) + recv_buffer.end = recv_buffer.begin + m_recv_pos; + return recv_buffer; +} + +void crypto_receive_buffer::mutable_buffers( + std::vector& vec + , std::size_t bytes_transfered) +{ + int pending_decryption = bytes_transfered; + if (m_recv_pos != INT_MAX) + { + pending_decryption = m_connection_buffer.packet_size() - m_recv_pos; + } + m_connection_buffer.mutable_buffers(vec, pending_decryption); +} +#endif // TORRENT_DISABLE_ENCRYPTION + +} // namespace libtorrent diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 02b8c82b3..eba1d9c01 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -4199,7 +4199,7 @@ retry: trigger_auto_manage(); } -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) torrent const* session_impl::find_encrypted_torrent(sha1_hash const& info_hash , sha1_hash const& xor_mask) { @@ -4692,7 +4692,7 @@ retry: TORRENT_ASSERT(m_torrents.size() >= m_torrent_lru.size()); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) hasher h; h.update("req2", 4); h.update((char*)&(*ih)[0], 20); @@ -4904,7 +4904,7 @@ retry: TORRENT_ASSERT(m_torrents.size() >= m_torrent_lru.size()); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) hasher h; h.update("req2", 4); h.update((char*)&tptr->info_hash()[0], 20); @@ -5527,13 +5527,13 @@ retry: } } -#if !defined TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) void session_impl::add_obfuscated_hash(sha1_hash const& obfuscated , boost::weak_ptr const& t) { m_obfuscated_torrents.insert(std::make_pair(obfuscated, t.lock())); } -#endif // TORRENT_DISABLE_ENCRYPTION +#endif // !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) bool session_impl::is_listening() const { diff --git a/src/torrent.cpp b/src/torrent.cpp index 6b2ca46df..7150ce167 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -518,7 +518,7 @@ namespace libtorrent } m_trackers.swap(new_trackers); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) hasher h; h.update("req2", 4); h.update((char*)&m_torrent_file->info_hash()[0], 20); @@ -667,7 +667,7 @@ namespace libtorrent } m_trackers.swap(new_trackers); -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) hasher h; h.update("req2", 4); h.update((char*)&m_torrent_file->info_hash()[0], 20); diff --git a/src/torrent_peer.cpp b/src/torrent_peer.cpp index 2a99df41e..c07efebb7 100644 --- a/src/torrent_peer.cpp +++ b/src/torrent_peer.cpp @@ -145,7 +145,7 @@ namespace libtorrent , fast_reconnects(0) , trust_points(0) , source(src) -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) // assume no support in order to // prefer opening non-encrypyed // connections. If it fails, we'll diff --git a/src/ut_pex.cpp b/src/ut_pex.cpp index 02886c0fe..3ce2027ff 100644 --- a/src/ut_pex.cpp +++ b/src/ut_pex.cpp @@ -179,7 +179,7 @@ namespace libtorrent { namespace // used as a rendezvous point in case direct // connections to the peer fail int flags = p->is_seed() ? 2 : 0; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) flags |= p->supports_encryption() ? 1 : 0; #endif flags |= is_utp(*p->get_socket()) ? 4 : 0; @@ -565,7 +565,7 @@ namespace libtorrent { namespace // used as a rendezvous point in case direct // connections to the peer fail int flags = p->is_seed() ? 2 : 0; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) flags |= p->supports_encryption() ? 1 : 0; #endif flags |= is_utp(*p->get_socket()) ? 4 : 0; diff --git a/src/web_connection_base.cpp b/src/web_connection_base.cpp index 0ce1bdd26..d4c536433 100644 --- a/src/web_connection_base.cpp +++ b/src/web_connection_base.cpp @@ -124,7 +124,7 @@ namespace libtorrent // it is always possible to request pieces incoming_unchoke(); - reset_recv_buffer(t->block_size() + 1024); + m_recv_buffer.reset(t->block_size() + 1024); } void web_connection_base::add_headers(std::string& request diff --git a/src/web_peer_connection.cpp b/src/web_peer_connection.cpp index 29d4e84e2..435f6103c 100644 --- a/src/web_peer_connection.cpp +++ b/src/web_peer_connection.cpp @@ -376,7 +376,7 @@ void web_peer_connection::write_request(peer_request const& r) // in case the first file on this series of requests is a padfile // we need to handle it right now, and pretend that we got a response // with zeros. - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); handle_padfile(recv_buffer); if (associated_torrent().expired()) return; @@ -412,16 +412,16 @@ bool web_peer_connection::maybe_harvest_block() boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); incoming_piece(front_request, &m_piece[0]); m_requests.pop_front(); if (associated_torrent().expired()) return false; TORRENT_ASSERT(m_block_pos >= front_request.length); m_block_pos -= front_request.length; - cut_receive_buffer(m_body_start, t->block_size() + request_size_overhead); + m_recv_buffer.cut(m_body_start, t->block_size() + request_size_overhead); m_body_start = 0; - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); // TORRENT_ASSERT(m_received_body <= range_end - range_start); m_piece.clear(); TORRENT_ASSERT(m_piece.empty()); @@ -510,7 +510,7 @@ void web_peer_connection::on_receive(error_code const& error == dl_target); #endif - buffer::const_interval recv_buffer = receive_buffer(); + buffer::const_interval recv_buffer = m_recv_buffer.get(); int payload; int protocol; @@ -540,7 +540,7 @@ void web_peer_connection::on_receive(error_code const& error TORRENT_ASSERT(recv_buffer.left() == 0 || *recv_buffer.begin == 'H'); - TORRENT_ASSERT(recv_buffer.left() <= packet_size()); + TORRENT_ASSERT(recv_buffer.left() <= m_recv_buffer.packet_size()); // this means the entire status line hasn't been received yet if (m_parser.status_code() == -1) @@ -800,8 +800,8 @@ void web_peer_connection::on_receive(error_code const& error TORRENT_ASSERT(chunk_size != 0 || chunk_start.left() <= header_size || chunk_start.begin[header_size] == 'H'); // cut out the chunk header from the receive buffer TORRENT_ASSERT(m_body_start + m_chunk_pos < INT_MAX); - cut_receive_buffer(header_size, t->block_size() + request_size_overhead, int(m_body_start + m_chunk_pos)); - recv_buffer = receive_buffer(); + m_recv_buffer.cut(header_size, t->block_size() + request_size_overhead, int(m_body_start + m_chunk_pos)); + recv_buffer = m_recv_buffer.get(); recv_buffer.begin += m_body_start; m_chunk_pos += chunk_size; if (chunk_size == 0) @@ -925,7 +925,7 @@ void web_peer_connection::on_receive(error_code const& error } if (maybe_harvest_block()) - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); if (associated_torrent().expired()) return; } @@ -945,16 +945,16 @@ void web_peer_connection::on_receive(error_code const& error TORRENT_ASSERT(m_block_pos >= r.length); m_block_pos -= r.length; m_received_body += r.length; - TORRENT_ASSERT(receive_buffer().begin + m_body_start == recv_buffer.begin); + TORRENT_ASSERT(m_recv_buffer.get().begin + m_body_start == recv_buffer.begin); TORRENT_ASSERT(m_received_body <= range_end - range_start); - cut_receive_buffer(m_body_start + r.length, t->block_size() + request_size_overhead); + m_recv_buffer.cut(m_body_start + r.length, t->block_size() + request_size_overhead); if (m_chunk_pos > 0) { TORRENT_ASSERT(m_chunk_pos >= r.length); m_chunk_pos -= r.length; } m_body_start = 0; - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); } if (!m_requests.empty()) @@ -989,18 +989,18 @@ void web_peer_connection::on_receive(error_code const& error if (m_received_body == range_end - range_start && (!m_parser.chunked_encoding() || m_chunk_pos == -1)) { - int size_to_cut = recv_buffer.begin - receive_buffer().begin; + int size_to_cut = recv_buffer.begin - m_recv_buffer.get().begin; - TORRENT_ASSERT(receive_buffer().left() < size_to_cut + 1 - || receive_buffer()[size_to_cut] == 'H'); + TORRENT_ASSERT(m_recv_buffer.get().left() < size_to_cut + 1 + || m_recv_buffer.get()[size_to_cut] == 'H'); - cut_receive_buffer(size_to_cut, t->block_size() + request_size_overhead); + m_recv_buffer.cut(size_to_cut, t->block_size() + request_size_overhead); if (m_chunk_pos > 0) { TORRENT_ASSERT(m_chunk_pos >= size_to_cut); m_chunk_pos -= size_to_cut; } - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); m_file_requests.pop_front(); m_parser.reset(); m_body_start = 0; @@ -1072,7 +1072,7 @@ void web_peer_connection::on_receive(error_code const& error incoming_piece_fragment(pad_size); if (maybe_harvest_block()) - recv_buffer = receive_buffer(); + recv_buffer = m_recv_buffer.get(); if (associated_torrent().expired()) return; } } diff --git a/test/test_pe_crypto.cpp b/test/test_pe_crypto.cpp index d3e79b05d..43a512501 100644 --- a/test/test_pe_crypto.cpp +++ b/test/test_pe_crypto.cpp @@ -41,7 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "setup_transfer.hpp" #include "test.hpp" -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) char const* pe_policy(boost::uint8_t policy) { @@ -137,7 +137,7 @@ void test_transfer(libtorrent::settings_pack::enc_policy policy remove_all("tmp3_pe", ec); } -void test_enc_handler(libtorrent::encryption_handler* a, libtorrent::encryption_handler* b) +void test_enc_handler(libtorrent::crypto_plugin* a, libtorrent::crypto_plugin* b) { #ifdef TORRENT_USE_VALGRIND const int repcount = 10; @@ -153,15 +153,37 @@ void test_enc_handler(libtorrent::encryption_handler* a, libtorrent::encryption_ std::generate(buf, buf + buf_len, &std::rand); std::memcpy(cmp_buf, buf, buf_len); - a->encrypt(buf, buf_len); + using namespace boost::asio; + std::vector iovec; + iovec.push_back(mutable_buffer(buf, buf_len)); + a->encrypt(iovec); TEST_CHECK(!std::equal(buf, buf + buf_len, cmp_buf)); - b->decrypt(buf, buf_len); + TEST_CHECK(iovec.empty()); + int consume = 0; + int produce = buf_len; + int packet_size = 0; + iovec.push_back(mutable_buffer(buf, buf_len)); + b->decrypt(iovec, consume, produce, packet_size); TEST_CHECK(std::equal(buf, buf + buf_len, cmp_buf)); + TEST_CHECK(iovec.empty()); + TEST_EQUAL(consume, 0); + TEST_EQUAL(produce, buf_len); + TEST_EQUAL(packet_size, 0); - b->encrypt(buf, buf_len); + iovec.push_back(mutable_buffer(buf, buf_len)); + b->encrypt(iovec); TEST_CHECK(!std::equal(buf, buf + buf_len, cmp_buf)); - a->decrypt(buf, buf_len); + TEST_CHECK(iovec.empty()); + consume = 0; + produce = buf_len; + packet_size = 0; + iovec.push_back(mutable_buffer(buf, buf_len)); + a->decrypt(iovec, consume, produce, packet_size); TEST_CHECK(std::equal(buf, buf + buf_len, cmp_buf)); + TEST_CHECK(iovec.empty()); + TEST_EQUAL(consume, 0); + TEST_EQUAL(produce, buf_len); + TEST_EQUAL(packet_size, 0); delete[] buf; delete[] cmp_buf; @@ -174,7 +196,7 @@ int test_main() { using namespace libtorrent; -#ifndef TORRENT_DISABLE_ENCRYPTION +#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS) #ifdef TORRENT_USE_VALGRIND const int repcount = 10;