optimize send buffer allocation and deallocation by not allowing requiring contiguous buffers

This commit is contained in:
Arvid Norberg 2011-05-19 02:41:28 +00:00
parent 24f72b2045
commit 88fa0b9393
10 changed files with 181 additions and 352 deletions

View File

@ -408,8 +408,8 @@ namespace libtorrent
m_total_failed_bytes += b; m_total_failed_bytes += b;
} }
std::pair<char*, int> allocate_buffer(int size); char* allocate_buffer();
void free_buffer(char* buf, int size); void free_buffer(char* buf);
char* allocate_disk_buffer(char const* category); char* allocate_disk_buffer(char const* category);
void free_disk_buffer(char* buf); void free_disk_buffer(char* buf);
@ -516,7 +516,6 @@ namespace libtorrent
// buffers from. // buffers from.
boost::pool<> m_send_buffers; boost::pool<> m_send_buffers;
#endif #endif
mutex m_send_buffer_mutex;
// the file pool that all storages in this session's // the file pool that all storages in this session's
// torrents uses. It sets a limit on the number of // torrents uses. It sets a limit on the number of

View File

@ -278,7 +278,7 @@ namespace libtorrent
void write_pe3_sync(); void write_pe3_sync();
void write_pe4_sync(int crypto_select); void write_pe4_sync(int crypto_select);
void write_pe_vc_cryptofield(buffer::interval& write_buf void write_pe_vc_cryptofield(char* write_buf, int len
, int crypto_field, int pad_size); , int crypto_field, int pad_size);
// stream key (info hash of attached torrent) // stream key (info hash of attached torrent)
@ -293,29 +293,18 @@ public:
// peer_connection functions of the same names // peer_connection functions of the same names
virtual void append_const_send_buffer(char const* buffer, int size); virtual void append_const_send_buffer(char const* buffer, int size);
void send_buffer(char const* buf, int size, int flags = 0); void send_buffer(char const* buf, int size, int flags = 0);
buffer::interval allocate_send_buffer(int size);
template <class Destructor> template <class Destructor>
void append_send_buffer(char* buffer, int size, Destructor const& destructor) void append_send_buffer(char* buffer, int size, Destructor const& destructor)
{ {
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
if (m_rc4_encrypted) if (m_rc4_encrypted)
{
TORRENT_ASSERT(send_buffer_size() == m_encrypted_bytes);
m_RC4_handler->encrypt(buffer, size); m_RC4_handler->encrypt(buffer, size);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_encrypted_bytes += size;
TORRENT_ASSERT(m_encrypted_bytes == send_buffer_size() + size);
#endif
}
#endif #endif
peer_connection::append_send_buffer(buffer, size, destructor, true); peer_connection::append_send_buffer(buffer, size, destructor, true);
} }
void setup_send();
private: private:
void encrypt_pending_buffer();
// Returns offset at which bytestream (src, src + src_size) // Returns offset at which bytestream (src, src + src_size)
// matches bytestream(target, target + target_size). // matches bytestream(target, target + target_size).
// If no sync found, return -1 // If no sync found, return -1
@ -420,10 +409,6 @@ private:
// the maximum number of bytes // the maximum number of bytes
int m_sync_bytes_read; int m_sync_bytes_read;
// hold information about latest allocated send buffer
// need to check for non zero (begin, end) for operations with this
buffer::interval m_enc_send_buffer;
// initialized during write_pe1_2_dhkey, and destroyed on // initialized during write_pe1_2_dhkey, and destroyed on
// creation of m_RC4_handler. Cannot reinitialize once // creation of m_RC4_handler. Cannot reinitialize once
// initialized. // initialized.
@ -453,12 +438,6 @@ private:
bool m_in_constructor; bool m_in_constructor;
bool m_sent_handshake; bool m_sent_handshake;
// the number of bytes in the send buffer
// that have been encrypted (only used for
// encrypted connections)
public:
int m_encrypted_bytes;
#endif #endif
}; };

View File

@ -123,12 +123,12 @@ namespace libtorrent
// tries to copy the given buffer to the end of the // tries to copy the given buffer to the end of the
// last chained buffer. If there's not enough room // last chained buffer. If there's not enough room
// it returns false // it returns false
bool append(char const* buf, int s) char* append(char const* buf, int s)
{ {
char* insert = allocate_appendix(s); char* insert = allocate_appendix(s);
if (insert == 0) return false; if (insert == 0) return 0;
memcpy(insert, buf, s); memcpy(insert, buf, s);
return true; return insert;
} }
// tries to allocate memory from the end // tries to allocate memory from the end

View File

@ -536,8 +536,8 @@ namespace libtorrent
// these functions are virtual to let bt_peer_connection hook into them // these functions are virtual to let bt_peer_connection hook into them
// and encrypt the content // and encrypt the content
enum message_type_flags { message_type_request = 1 }; enum message_type_flags { message_type_request = 1 };
virtual void send_buffer(char const* begin, int size, int flags = 0); virtual void send_buffer(char const* begin, int size, int flags = 0
virtual buffer::interval allocate_send_buffer(int size); , void (*fun)(char*, int, void*) = 0, void* userdata = 0);
virtual void setup_send(); virtual void setup_send();
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS

View File

@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/escape_string.hpp" #include "libtorrent/escape_string.hpp"
#include "libtorrent/peer_info.hpp" #include "libtorrent/peer_info.hpp"
#include "libtorrent/random.hpp" #include "libtorrent/random.hpp"
#include "libtorrent/alloca.hpp"
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
#include "libtorrent/pe_crypto.hpp" #include "libtorrent/pe_crypto.hpp"
@ -110,7 +111,6 @@ namespace libtorrent
, m_encrypted(false) , m_encrypted(false)
, m_rc4_encrypted(false) , m_rc4_encrypted(false)
, m_sync_bytes_read(0) , m_sync_bytes_read(0)
, m_enc_send_buffer(0, 0)
#endif #endif
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
, m_sent_bitfield(false) , m_sent_bitfield(false)
@ -124,7 +124,6 @@ namespace libtorrent
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_in_constructor = false; m_in_constructor = false;
m_encrypted_bytes = 0;
#endif #endif
memset(m_reserved_bits, 0, sizeof(m_reserved_bits)); memset(m_reserved_bits, 0, sizeof(m_reserved_bits));
} }
@ -148,7 +147,6 @@ namespace libtorrent
, m_encrypted(false) , m_encrypted(false)
, m_rc4_encrypted(false) , m_rc4_encrypted(false)
, m_sync_bytes_read(0) , m_sync_bytes_read(0)
, m_enc_send_buffer(0, 0)
#endif #endif
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
, m_sent_bitfield(false) , m_sent_bitfield(false)
@ -176,7 +174,6 @@ namespace libtorrent
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_in_constructor = false; m_in_constructor = false;
m_encrypted_bytes = 0;
#endif #endif
memset(m_reserved_bits, 0, sizeof(m_reserved_bits)); memset(m_reserved_bits, 0, sizeof(m_reserved_bits));
} }
@ -434,23 +431,15 @@ namespace libtorrent
peer_log(" pad size: %d", pad_size); peer_log(" pad size: %d", pad_size);
#endif #endif
buffer::interval send_buf = allocate_send_buffer(dh_key_len + pad_size); char msg[dh_key_len + 512];
if (send_buf.begin == 0) char* ptr = msg;
{ int buf_size = dh_key_len + pad_size;
disconnect(errors::no_memory);
return;
}
std::copy(m_dh_key_exchange->get_local_key(), memcpy(ptr, m_dh_key_exchange->get_local_key(), dh_key_len);
m_dh_key_exchange->get_local_key() + dh_key_len, ptr += dh_key_len;
send_buf.begin);
std::generate(send_buf.begin + dh_key_len, send_buf.end, std::rand); std::generate(ptr, ptr + pad_size, random);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS send_buffer(msg, buf_size);
m_encrypted_bytes += send_buf.left();
TORRENT_ASSERT(m_encrypted_bytes <= send_buffer_size());
#endif
setup_send();
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
peer_log(" sent DH key"); peer_log(" sent DH key");
@ -475,12 +464,9 @@ namespace libtorrent
int pad_size = random() % 512; int pad_size = random() % 512;
TORRENT_ASSERT(!m_rc4_encrypted || send_buffer_size() == m_encrypted_bytes);
// synchash,skeyhash,vc,crypto_provide,len(pad),pad,len(ia) // synchash,skeyhash,vc,crypto_provide,len(pad),pad,len(ia)
buffer::interval send_buf = char msg[20 + 20 + 8 + 4 + 2 + 512 + 2];
allocate_send_buffer(20 + 20 + 8 + 4 + 2 + pad_size + 2); char* ptr = msg;
if (send_buf.begin == 0) return; // out of memory
// sync hash (hash('req1',S)) // sync hash (hash('req1',S))
h.reset(); h.reset();
@ -488,8 +474,8 @@ namespace libtorrent
h.update(secret, dh_key_len); h.update(secret, dh_key_len);
sha1_hash sync_hash = h.final(); sha1_hash sync_hash = h.final();
std::copy(sync_hash.begin(), sync_hash.end(), send_buf.begin); memcpy(ptr, &sync_hash[0], 20);
send_buf.begin += 20; ptr += 20;
// stream key obfuscated hash [ hash('req2',SKEY) xor hash('req3',S) ] // stream key obfuscated hash [ hash('req2',SKEY) xor hash('req3',S) ]
h.reset(); h.reset();
@ -503,16 +489,15 @@ namespace libtorrent
sha1_hash obfsc_hash = h.final(); sha1_hash obfsc_hash = h.final();
obfsc_hash ^= streamkey_hash; obfsc_hash ^= streamkey_hash;
std::copy(obfsc_hash.begin(), obfsc_hash.end(), send_buf.begin); memcpy(ptr, &obfsc_hash[0], 20);
send_buf.begin += 20; ptr += 20;
// Discard DH key exchange data, setup RC4 keys // Discard DH key exchange data, setup RC4 keys
init_pe_RC4_handler(secret, info_hash); init_pe_RC4_handler(secret, info_hash);
m_dh_key_exchange.reset(); // secret should be invalid at this point m_dh_key_exchange.reset(); // secret should be invalid at this point
// write the verification constant and crypto field // write the verification constant and crypto field
TORRENT_ASSERT(send_buf.left() == 8 + 4 + 2 + pad_size + 2); int encrypt_size = sizeof(msg) - 512 + pad_size - 40;
int encrypt_size = send_buf.left();
int crypto_provide = 0; int crypto_provide = 0;
pe_settings::enc_level const& allowed_enc_level = m_ses.get_pe_settings().allowed_enc_level; pe_settings::enc_level const& allowed_enc_level = m_ses.get_pe_settings().allowed_enc_level;
@ -530,14 +515,9 @@ namespace libtorrent
, level[allowed_enc_level]); , level[allowed_enc_level]);
#endif #endif
write_pe_vc_cryptofield(send_buf, crypto_provide, pad_size); write_pe_vc_cryptofield(ptr, encrypt_size, crypto_provide, pad_size);
m_RC4_handler->encrypt(send_buf.end - encrypt_size, encrypt_size); m_RC4_handler->encrypt(ptr, encrypt_size);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS send_buffer(msg, sizeof(msg) - 512 + pad_size);
m_encrypted_bytes = send_buffer_size();
#endif
TORRENT_ASSERT(send_buf.begin == send_buf.end);
setup_send();
} }
void bt_peer_connection::write_pe4_sync(int crypto_select) void bt_peer_connection::write_pe4_sync(int crypto_select)
@ -552,20 +532,12 @@ namespace libtorrent
int pad_size = random() % 512; int pad_size = random() % 512;
TORRENT_ASSERT(!m_rc4_encrypted || send_buffer_size() == m_encrypted_bytes);
const int buf_size = 8 + 4 + 2 + pad_size; const int buf_size = 8 + 4 + 2 + pad_size;
buffer::interval send_buf = allocate_send_buffer(buf_size); char msg[512 + 8 + 4 + 2];
if (send_buf.begin == 0) return; // out of memory write_pe_vc_cryptofield(msg, sizeof(msg), crypto_select, pad_size);
write_pe_vc_cryptofield(send_buf, crypto_select, pad_size);
m_RC4_handler->encrypt(send_buf.end - buf_size, buf_size); m_RC4_handler->encrypt(msg, buf_size);
TORRENT_ASSERT(send_buffer_size() - buf_size == m_encrypted_bytes); send_buffer(msg, buf_size);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_encrypted_bytes += buf_size;
TORRENT_ASSERT(m_encrypted_bytes <= send_buffer_size());
#endif
setup_send();
// encryption method has been negotiated // encryption method has been negotiated
if (crypto_select == 0x02) if (crypto_select == 0x02)
@ -579,36 +551,34 @@ namespace libtorrent
#endif #endif
} }
void bt_peer_connection::write_pe_vc_cryptofield(buffer::interval& write_buf void bt_peer_connection::write_pe_vc_cryptofield(char* write_buf, int len
, int crypto_field, int pad_size) , int crypto_field, int pad_size)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
TORRENT_ASSERT(crypto_field <= 0x03 && crypto_field > 0); TORRENT_ASSERT(crypto_field <= 0x03 && crypto_field > 0);
// vc,crypto_field,len(pad),pad, (len(ia)) // vc,crypto_field,len(pad),pad, (len(ia))
TORRENT_ASSERT( (write_buf.left() == 8+4+2+pad_size+2 && is_local()) || TORRENT_ASSERT((len >= 8+4+2+pad_size+2 && is_local())
(write_buf.left() == 8+4+2+pad_size && !is_local()) ); || (len >= 8+4+2+pad_size && !is_local()));
TORRENT_ASSERT(!m_sent_handshake); TORRENT_ASSERT(!m_sent_handshake);
// encrypt(vc, crypto_provide/select, len(Pad), len(IA)) // encrypt(vc, crypto_provide/select, len(Pad), len(IA))
// len(pad) is zero for now, len(IA) only for outgoing connections // len(pad) is zero for now, len(IA) only for outgoing connections
// vc // vc
std::fill(write_buf.begin, write_buf.begin + 8, 0); memset(write_buf, 0, 8);
write_buf.begin += 8; write_buf += 8;
detail::write_uint32(crypto_field, write_buf.begin); detail::write_uint32(crypto_field, write_buf);
detail::write_uint16(pad_size, write_buf.begin); // len (pad) detail::write_uint16(pad_size, write_buf); // len (pad)
// fill pad with zeroes // fill pad with zeroes
std::generate(write_buf.begin, write_buf.begin + pad_size, &std::rand); std::generate(write_buf, write_buf + pad_size, &random);
write_buf.begin += pad_size; write_buf += pad_size;
// append len(ia) if we are initiating // append len(ia) if we are initiating
if (is_local()) if (is_local())
detail::write_uint16(handshake_len, write_buf.begin); // len(IA) detail::write_uint16(handshake_len, write_buf); // len(IA)
TORRENT_ASSERT(write_buf.begin == write_buf.end);
} }
void bt_peer_connection::init_pe_RC4_handler(char const* secret, sha1_hash const& stream_key) void bt_peer_connection::init_pe_RC4_handler(char const* secret, sha1_hash const& stream_key)
@ -656,13 +626,14 @@ namespace libtorrent
void bt_peer_connection::append_const_send_buffer(char const* buffer, int size) void bt_peer_connection::append_const_send_buffer(char const* buffer, int size)
{ {
TORRENT_ASSERT(!m_rc4_encrypted || send_buffer_size() == m_encrypted_bytes);
// if we're encrypting this buffer, we need to make a copy
// since we'll mutate it
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
if (m_encrypted && m_rc4_encrypted) if (m_encrypted && m_rc4_encrypted)
{ {
send_buffer(buffer, size); // if we're encrypting this buffer, we need to make a copy
// since we'll mutate it
char* buf = (char*)malloc(size);
memcpy(buf, buffer, size);
bt_peer_connection::append_send_buffer(buf, size, boost::bind(&::free, _1));
} }
else else
#endif #endif
@ -671,72 +642,28 @@ namespace libtorrent
} }
} }
void encrypt(char* buf, int len, void* userdata)
{
RC4_handler* rc4 = (RC4_handler*)userdata;
rc4->encrypt(buf, len);
}
void bt_peer_connection::send_buffer(char const* buf, int size, int flags) void bt_peer_connection::send_buffer(char const* buf, int size, int flags)
{ {
TORRENT_ASSERT(buf); TORRENT_ASSERT(buf);
TORRENT_ASSERT(size > 0); TORRENT_ASSERT(size > 0);
void* userdata = 0;
void (*fun)(char*, int, void*) = 0;
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
encrypt_pending_buffer();
if (m_encrypted && m_rc4_encrypted) if (m_encrypted && m_rc4_encrypted)
{ {
TORRENT_ASSERT(!m_rc4_encrypted || send_buffer_size() == m_encrypted_bytes); fun = encrypt;
m_RC4_handler->encrypt(const_cast<char*>(buf), size); userdata = m_RC4_handler.get();
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_encrypted_bytes += size;
#endif
} }
#endif #endif
peer_connection::send_buffer(buf, size, flags); peer_connection::send_buffer(buf, size, flags, fun, userdata);
}
buffer::interval bt_peer_connection::allocate_send_buffer(int size)
{
#ifndef TORRENT_DISABLE_ENCRYPTION
encrypt_pending_buffer();
if (m_encrypted && m_rc4_encrypted)
{
TORRENT_ASSERT(m_enc_send_buffer.left() == 0);
m_enc_send_buffer = peer_connection::allocate_send_buffer(size);
return m_enc_send_buffer;
}
else
#endif
{
buffer::interval i = peer_connection::allocate_send_buffer(size);
return i;
}
}
#ifndef TORRENT_DISABLE_ENCRYPTION
void bt_peer_connection::encrypt_pending_buffer()
{
if (m_encrypted && m_rc4_encrypted && m_enc_send_buffer.left())
{
TORRENT_ASSERT(m_enc_send_buffer.begin);
TORRENT_ASSERT(m_enc_send_buffer.end);
TORRENT_ASSERT(m_RC4_handler);
TORRENT_ASSERT(send_buffer_size() - m_enc_send_buffer.left() == m_encrypted_bytes);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_encrypted_bytes += m_enc_send_buffer.left();
TORRENT_ASSERT(m_encrypted_bytes <= send_buffer_size());
#endif
m_RC4_handler->encrypt(m_enc_send_buffer.begin, m_enc_send_buffer.left());
m_enc_send_buffer.end = m_enc_send_buffer.begin;
}
}
#endif
void bt_peer_connection::setup_send()
{
#ifndef TORRENT_DISABLE_ENCRYPTION
encrypt_pending_buffer();
TORRENT_ASSERT(!m_encrypted || !m_rc4_encrypted || m_encrypted_bytes == send_buffer_size());
#endif
peer_connection::setup_send();
} }
int bt_peer_connection::get_syncoffset(char const* src, int src_size, int bt_peer_connection::get_syncoffset(char const* src, int src_size,
@ -801,37 +728,31 @@ namespace libtorrent
const char version_string[] = "BitTorrent protocol"; const char version_string[] = "BitTorrent protocol";
const int string_len = sizeof(version_string)-1; const int string_len = sizeof(version_string)-1;
buffer::interval i = allocate_send_buffer(1 + string_len + 8 + 20 + 20); char handshake[1 + string_len + 8 + 20 + 20];
if (i.begin == 0) return; // out of memory char* ptr = handshake;
// length of version string // length of version string
*i.begin = string_len; detail::write_uint8(string_len, ptr);
++i.begin; // protocol identifier
memcpy(ptr, version_string, string_len);
// version string itself ptr += string_len;
std::copy(
version_string
, version_string + string_len
, i.begin);
i.begin += string_len;
// 8 zeroes // 8 zeroes
std::fill(i.begin, i.begin + 8, 0); memset(ptr, 0, 8);
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
// indicate that we support the DHT messages // indicate that we support the DHT messages
*(i.begin + 7) |= 0x01; *(ptr + 7) |= 0x01;
#endif #endif
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
// we support extensions // we support extensions
*(i.begin + 5) |= 0x10; *(ptr + 5) |= 0x10;
#endif #endif
// we support merkle torrents // we support merkle torrents
*(i.begin + 5) |= 0x08; *(ptr + 5) |= 0x08;
// we support FAST extension // we support FAST extension
*(i.begin + 7) |= 0x04; *(ptr + 7) |= 0x04;
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
std::string bitmask; std::string bitmask;
@ -839,31 +760,27 @@ namespace libtorrent
{ {
for (int j = 0; j < 8; ++j) for (int j = 0; j < 8; ++j)
{ {
if (i.begin[k] & (0x80 >> j)) bitmask += '1'; if (ptr[k] & (0x80 >> j)) bitmask += '1';
else bitmask += '0'; else bitmask += '0';
} }
} }
peer_log(">>> EXTENSION_BITS [ %s ]", bitmask.c_str()); peer_log(">>> EXTENSION_BITS [ %s ]", bitmask.c_str());
#endif #endif
i.begin += 8; ptr += 8;
// info hash // info hash
sha1_hash const& ih = t->torrent_file().info_hash(); sha1_hash const& ih = t->torrent_file().info_hash();
std::copy(ih.begin(), ih.end(), i.begin); memcpy(ptr, &ih[0], 20);
i.begin += 20; ptr += 20;
// peer id // peer id
std::copy( memcpy(ptr, &m_ses.get_peer_id()[0], 20);
m_ses.get_peer_id().begin() ptr += 20;
, m_ses.get_peer_id().end()
, i.begin);
i.begin += 20;
TORRENT_ASSERT(i.begin == i.end);
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
peer_log("==> HANDSHAKE [ ih: %s ]", to_hex(ih.to_string()).c_str()); peer_log("==> HANDSHAKE [ ih: %s ]", to_hex(ih.to_string()).c_str());
#endif #endif
setup_send(); send_buffer(handshake, sizeof(handshake));
} }
boost::optional<piece_block_progress> bt_peer_connection::downloading_piece_progress() const boost::optional<piece_block_progress> bt_peer_connection::downloading_piece_progress() const
@ -2060,40 +1977,39 @@ namespace libtorrent
const int packet_size = (num_pieces + 7) / 8 + 5; const int packet_size = (num_pieces + 7) / 8 + 5;
buffer::interval i = allocate_send_buffer(packet_size); char* msg = TORRENT_ALLOCA(char, packet_size);
if (i.begin == 0) return; // out of memory if (msg == 0) return; // out of memory
unsigned char* ptr = (unsigned char*)msg;
detail::write_int32(packet_size - 4, i.begin); detail::write_int32(packet_size - 4, ptr);
detail::write_uint8(msg_bitfield, i.begin); detail::write_uint8(msg_bitfield, ptr);
if (t->is_seed()) if (t->is_seed())
{ {
memset(i.begin, 0xff, packet_size - 6); memset(ptr, 0xff, packet_size - 6);
// Clear trailing bits // Clear trailing bits
unsigned char *p = ((unsigned char *)i.begin) + packet_size - 6; unsigned char *p = ((unsigned char *)msg) + packet_size - 1;
*p = (0xff << ((8 - (num_pieces & 7)) & 7)) & 0xff; *p = (0xff << ((8 - (num_pieces & 7)) & 7)) & 0xff;
} }
else else
{ {
memset(i.begin, 0, packet_size - 5); memset(ptr, 0, packet_size - 5);
piece_picker const& p = t->picker(); piece_picker const& p = t->picker();
int mask = 0x80; int mask = 0x80;
unsigned char* byte = (unsigned char*)i.begin;
for (int i = 0; i < num_pieces; ++i) for (int i = 0; i < num_pieces; ++i)
{ {
if (p.have_piece(i)) *byte |= mask; if (p.have_piece(i)) *ptr |= mask;
mask >>= 1; mask >>= 1;
if (mask == 0) if (mask == 0)
{ {
mask = 0x80; mask = 0x80;
++byte; ++ptr;
} }
} }
} }
for (int c = 0; c < num_lazy_pieces; ++c) for (int c = 0; c < num_lazy_pieces; ++c)
i.begin[lazy_pieces[c] / 8] &= ~(0x80 >> (lazy_pieces[c] & 7)); msg[5 + lazy_pieces[c] / 8] &= ~(0x80 >> (lazy_pieces[c] & 7));
TORRENT_ASSERT(i.end - i.begin == (num_pieces + 7) / 8);
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
@ -2101,7 +2017,7 @@ namespace libtorrent
bitfield_string.resize(num_pieces); bitfield_string.resize(num_pieces);
for (int k = 0; k < num_pieces; ++k) for (int k = 0; k < num_pieces; ++k)
{ {
if (i.begin[k / 8] & (0x80 >> (k % 8))) bitfield_string[k] = '1'; if (msg[5 + k / 8] & (0x80 >> (k % 8))) bitfield_string[k] = '1';
else bitfield_string[k] = '0'; else bitfield_string[k] = '0';
} }
peer_log("==> BITFIELD [ %s ]", bitfield_string.c_str()); peer_log("==> BITFIELD [ %s ]", bitfield_string.c_str());
@ -2110,7 +2026,7 @@ namespace libtorrent
m_sent_bitfield = true; m_sent_bitfield = true;
#endif #endif
setup_send(); send_buffer(msg, packet_size);
if (num_lazy_pieces > 0) if (num_lazy_pieces > 0)
{ {
@ -2219,30 +2135,25 @@ namespace libtorrent
} }
#endif #endif
std::vector<char> msg; std::vector<char> dict_msg;
bencode(std::back_inserter(msg), handshake); bencode(std::back_inserter(dict_msg), handshake);
// make room for message char msg[6];
buffer::interval i = allocate_send_buffer(6 + msg.size()); char* ptr = msg;
if (i.begin == 0) return; // out of memory
// write the length of the message // write the length of the message
detail::write_int32((int)msg.size() + 2, i.begin); detail::write_int32((int)dict_msg.size() + 2, ptr);
detail::write_uint8(msg_extended, i.begin); detail::write_uint8(msg_extended, ptr);
// signal handshake message // signal handshake message
detail::write_uint8(0, i.begin); detail::write_uint8(0, ptr);
send_buffer(msg, sizeof(msg));
std::copy(msg.begin(), msg.end(), i.begin); send_buffer(&dict_msg[0], dict_msg.size());
i.begin += msg.size();
TORRENT_ASSERT(i.begin == i.end);
#if defined TORRENT_VERBOSE_LOGGING && TORRENT_USE_IOSTREAM #if defined TORRENT_VERBOSE_LOGGING && TORRENT_USE_IOSTREAM
std::stringstream handshake_str; std::stringstream handshake_str;
handshake.print(handshake_str); handshake.print(handshake_str);
peer_log("==> EXTENDED HANDSHAKE: %s", handshake_str.str().c_str()); peer_log("==> EXTENDED HANDSHAKE: %s", handshake_str.str().c_str());
#endif #endif
setup_send();
} }
#endif #endif
@ -2449,8 +2360,6 @@ namespace libtorrent
peer_log("*** received DH key"); peer_log("*** received DH key");
#endif #endif
TORRENT_ASSERT(!m_rc4_encrypted || send_buffer_size() == m_encrypted_bytes);
// PadA/B can be a max of 512 bytes, and 20 bytes more for // PadA/B can be a max of 512 bytes, and 20 bytes more for
// the sync hash (if incoming), or 8 bytes more for the // the sync hash (if incoming), or 8 bytes more for the
// encrypted verification constant (if outgoing). Instead // encrypted verification constant (if outgoing). Instead
@ -2791,9 +2700,6 @@ namespace libtorrent
return; return;
} }
m_rc4_encrypted = true; m_rc4_encrypted = true;
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_encrypted_bytes = send_buffer_size();
#endif
} }
else if (crypto_field == 0x01) else if (crypto_field == 0x01)
{ {
@ -3366,22 +3272,6 @@ namespace libtorrent
std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero)
, m_payloads.end()); , m_payloads.end());
#if (defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS) && !defined TORRENT_DISABLE_ENCRYPTION
if (m_encrypted_bytes > 0)
{
if (m_rc4_encrypted)
{
m_encrypted_bytes -= bytes_transferred;
TORRENT_ASSERT(m_encrypted_bytes >= 0);
}
else
{
m_encrypted_bytes -= (std::min)(int(bytes_transferred), m_encrypted_bytes);
}
TORRENT_ASSERT(m_encrypted_bytes >= 0);
}
#endif
TORRENT_ASSERT(amount_payload <= (int)bytes_transferred); TORRENT_ASSERT(amount_payload <= (int)bytes_transferred);
m_statistics.sent_bytes(amount_payload, bytes_transferred - amount_payload); m_statistics.sent_bytes(amount_payload, bytes_transferred - amount_payload);

View File

@ -263,15 +263,14 @@ namespace libtorrent { namespace
std::vector<char> const& tex_msg = m_tp.get_lt_tex_msg(); std::vector<char> const& tex_msg = m_tp.get_lt_tex_msg();
buffer::interval i = m_pc.allocate_send_buffer(6 + tex_msg.size()); char msg[6];
char* ptr = msg;
detail::write_uint32(1 + 1 + tex_msg.size(), i.begin); detail::write_uint32(1 + 1 + tex_msg.size(), ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
std::copy(tex_msg.begin(), tex_msg.end(), i.begin); m_pc.send_buffer(msg, sizeof(msg));
i.begin += tex_msg.size(); m_pc.send_buffer(&tex_msg[0], tex_msg.size());
TORRENT_ASSERT(i.begin == i.end);
m_pc.setup_send(); m_pc.setup_send();
} }
@ -303,15 +302,14 @@ namespace libtorrent { namespace
(*m_pc.m_logger) << log_line.str(); (*m_pc.m_logger) << log_line.str();
#endif #endif
buffer::interval i = m_pc.allocate_send_buffer(6 + tex_msg.size()); char msg[6];
char* ptr = msg;
detail::write_uint32(1 + 1 + tex_msg.size(), i.begin); detail::write_uint32(1 + 1 + tex_msg.size(), ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
std::copy(tex_msg.begin(), tex_msg.end(), i.begin); m_pc.send_buffer(msg, sizeof(msg));
i.begin += tex_msg.size(); m_pc.send_buffer(&tex_msg[0], tex_msg.size());
TORRENT_ASSERT(i.begin == i.end);
m_pc.setup_send(); m_pc.setup_send();
} }

View File

@ -293,16 +293,17 @@ namespace libtorrent { namespace
<< " ==> METADATA_REQUEST [ start: " << start << " | size: " << size << " ]\n"; << " ==> METADATA_REQUEST [ start: " << start << " | size: " << size << " ]\n";
#endif #endif
buffer::interval i = m_pc.allocate_send_buffer(9); char msg[9];
char* ptr = msg;
detail::write_uint32(1 + 1 + 3, i.begin); detail::write_uint32(1 + 1 + 3, ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
// means 'request data' // means 'request data'
detail::write_uint8(0, i.begin); detail::write_uint8(0, ptr);
detail::write_uint8(start, i.begin); detail::write_uint8(start, ptr);
detail::write_uint8(size - 1, i.begin); detail::write_uint8(size - 1, ptr);
TORRENT_ASSERT(i.begin == i.end); m_pc.send_buffer(msg, sizeof(msg));
m_pc.setup_send(); m_pc.setup_send();
} }
@ -322,9 +323,8 @@ namespace libtorrent { namespace
std::pair<int, int> offset std::pair<int, int> offset
= req_to_offset(req, (int)m_tp.metadata().left()); = req_to_offset(req, (int)m_tp.metadata().left());
// TODO: don't allocate send buffer for the metadata part char msg[15];
// just tag it on as a separate buffer like ut_metadata char* ptr = msg;
buffer::interval i = m_pc.allocate_send_buffer(15 + offset.second);
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
(*m_pc.m_logger) << time_now_string() (*m_pc.m_logger) << time_now_string()
@ -335,18 +335,16 @@ namespace libtorrent { namespace
<< " ]\n"; << " ]\n";
#endif #endif
// yes, we have metadata, send it // yes, we have metadata, send it
detail::write_uint32(11 + offset.second, i.begin); detail::write_uint32(11 + offset.second, ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
// means 'data packet' // means 'data packet'
detail::write_uint8(1, i.begin); detail::write_uint8(1, ptr);
detail::write_uint32((int)m_tp.metadata().left(), i.begin); detail::write_uint32((int)m_tp.metadata().left(), ptr);
detail::write_uint32(offset.first, i.begin); detail::write_uint32(offset.first, ptr);
m_pc.send_buffer(msg, sizeof(msg));
char const* metadata = m_tp.metadata().begin; char const* metadata = m_tp.metadata().begin;
std::copy(metadata + offset.first m_pc.append_const_send_buffer(metadata + offset.first, offset.second);
, metadata + offset.first + offset.second, i.begin);
i.begin += offset.second;
TORRENT_ASSERT(i.begin == i.end);
} }
else else
{ {
@ -354,15 +352,17 @@ namespace libtorrent { namespace
(*m_pc.m_logger) << time_now_string() (*m_pc.m_logger) << time_now_string()
<< " ==> DONT HAVE METADATA\n"; << " ==> DONT HAVE METADATA\n";
#endif #endif
buffer::interval i = m_pc.allocate_send_buffer(4 + 3); char msg[4+3];
char* ptr = msg;
// we don't have the metadata, reply with // we don't have the metadata, reply with
// don't have-message // don't have-message
detail::write_uint32(1 + 2, i.begin); detail::write_uint32(1 + 2, ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
// means 'have no data' // means 'have no data'
detail::write_uint8(2, i.begin); detail::write_uint8(2, ptr);
TORRENT_ASSERT(i.begin == i.end); m_pc.send_buffer(msg, sizeof(msg));
} }
m_pc.setup_send(); m_pc.setup_send();
} }

View File

@ -4854,7 +4854,8 @@ namespace libtorrent
#endif #endif
} }
void peer_connection::send_buffer(char const* buf, int size, int flags) void peer_connection::send_buffer(char const* buf, int size, int flags
, void (*fun)(char*, int, void*), void* userdata)
{ {
if (flags == message_type_request) if (flags == message_type_request)
m_requests_in_buffer.push_back(m_send_buffer.size() + size); m_requests_in_buffer.push_back(m_send_buffer.size() + size);
@ -4863,7 +4864,9 @@ namespace libtorrent
if (free_space > size) free_space = size; if (free_space > size) free_space = size;
if (free_space > 0) if (free_space > 0)
{ {
m_send_buffer.append(buf, free_space); char* dst = m_send_buffer.append(buf, free_space);
TORRENT_ASSERT(dst != 0);
if (fun) fun(dst, free_space, userdata);
size -= free_space; size -= free_space;
buf += free_space; buf += free_space;
#if defined TORRENT_STATS && defined TORRENT_DISK_STATS #if defined TORRENT_STATS && defined TORRENT_DISK_STATS
@ -4874,56 +4877,30 @@ namespace libtorrent
} }
if (size <= 0) return; if (size <= 0) return;
std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
if (buffer.first == 0)
{
disconnect(errors::no_memory);
return;
}
TORRENT_ASSERT(buffer.second >= size);
std::memcpy(buffer.first, buf, size);
m_send_buffer.append_buffer(buffer.first, buffer.second, size
, boost::bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
#if defined TORRENT_STATS && defined TORRENT_DISK_STATS #if defined TORRENT_STATS && defined TORRENT_DISK_STATS
m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl; m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
m_ses.log_buffer_usage(); m_ses.log_buffer_usage();
#endif #endif
setup_send(); int i = 0;
} while (size > 0)
// TODO: change this interface to automatically call setup_send() when the
// return value is destructed
buffer::interval peer_connection::allocate_send_buffer(int size)
{
TORRENT_ASSERT(size > 0);
char* insert = m_send_buffer.allocate_appendix(size);
if (insert == 0)
{ {
std::pair<char*, int> buffer = m_ses.allocate_buffer(size); char* chain_buf = m_ses.allocate_buffer();
if (buffer.first == 0) if (chain_buf == 0)
{ {
disconnect(errors::no_memory); disconnect(errors::no_memory);
return buffer::interval(0, 0); return;
} }
TORRENT_ASSERT(buffer.second >= size);
m_send_buffer.append_buffer(buffer.first, buffer.second, size int buf_size = (std::min)(int(aux::session_impl::send_buffer_size), size);
, boost::bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second)); memcpy(chain_buf, buf, buf_size);
buffer::interval ret(buffer.first, buffer.first + size); if (fun) fun(chain_buf, buf_size, userdata);
#if defined TORRENT_STATS && defined TORRENT_DISK_STATS buf += buf_size;
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl; size -= buf_size;
m_ses.log_buffer_usage(); m_send_buffer.append_buffer(chain_buf, aux::session_impl::send_buffer_size, buf_size
#endif , boost::bind(&session_impl::free_buffer, boost::ref(m_ses), _1));
return ret; ++i;
}
else
{
#if defined TORRENT_STATS && defined TORRENT_DISK_STATS
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
m_ses.log_buffer_usage();
#endif
buffer::interval ret(insert, insert + size);
return ret;
} }
setup_send();
} }
template<class T> template<class T>

View File

@ -5159,25 +5159,21 @@ namespace aux {
return m_disk_thread.allocate_buffer(category); return m_disk_thread.allocate_buffer(category);
} }
std::pair<char*, int> session_impl::allocate_buffer(int size) char* session_impl::allocate_buffer()
{ {
TORRENT_ASSERT(size > 0); TORRENT_ASSERT(is_network_thread());
int num_buffers = (size + send_buffer_size - 1) / send_buffer_size;
TORRENT_ASSERT(num_buffers > 0);
mutex::scoped_lock l(m_send_buffer_mutex);
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
TORRENT_ASSERT(m_buffer_allocations >= 0); TORRENT_ASSERT(m_buffer_allocations >= 0);
m_buffer_allocations += num_buffers; m_buffer_allocations++;
m_buffer_usage_logger << log_time() << " protocol_buffer: " m_buffer_usage_logger << log_time() << " protocol_buffer: "
<< (m_buffer_allocations * send_buffer_size) << std::endl; << (m_buffer_allocations * send_buffer_size) << std::endl;
#endif #endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
int num_bytes = num_buffers * send_buffer_size; int num_bytes = send_buffer_size;
return std::make_pair((char*)malloc(num_bytes), num_bytes); return (char*)malloc(num_bytes);
#else #else
return std::make_pair((char*)m_send_buffers.ordered_malloc(num_buffers) return (char*)m_send_buffers.malloc();
, num_buffers * send_buffer_size);
#endif #endif
} }
@ -5202,16 +5198,12 @@ namespace aux {
} }
#endif #endif
void session_impl::free_buffer(char* buf, int size) void session_impl::free_buffer(char* buf)
{ {
TORRENT_ASSERT(size > 0); TORRENT_ASSERT(is_network_thread());
TORRENT_ASSERT(size % send_buffer_size == 0);
int num_buffers = size / send_buffer_size;
TORRENT_ASSERT(num_buffers > 0);
mutex::scoped_lock l(m_send_buffer_mutex);
#ifdef TORRENT_DISK_STATS #ifdef TORRENT_DISK_STATS
m_buffer_allocations -= num_buffers; m_buffer_allocations--;
TORRENT_ASSERT(m_buffer_allocations >= 0); TORRENT_ASSERT(m_buffer_allocations >= 0);
m_buffer_usage_logger << log_time() << " protocol_buffer: " m_buffer_usage_logger << log_time() << " protocol_buffer: "
<< (m_buffer_allocations * send_buffer_size) << std::endl; << (m_buffer_allocations * send_buffer_size) << std::endl;
@ -5219,7 +5211,7 @@ namespace aux {
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
free(buf); free(buf);
#else #else
m_send_buffers.ordered_free(buf, num_buffers); m_send_buffers.free(buf);
#endif #endif
} }

View File

@ -435,15 +435,14 @@ namespace libtorrent { namespace
std::vector<char> const& pex_msg = m_tp.get_ut_pex_msg(); std::vector<char> const& pex_msg = m_tp.get_ut_pex_msg();
buffer::interval i = m_pc.allocate_send_buffer(6 + pex_msg.size()); char msg[6];
char* ptr = msg;
detail::write_uint32(1 + 1 + pex_msg.size(), i.begin); detail::write_uint32(1 + 1 + pex_msg.size(), ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
std::copy(pex_msg.begin(), pex_msg.end(), i.begin); m_pc.send_buffer(msg, sizeof(msg));
i.begin += pex_msg.size(); m_pc.send_buffer(&pex_msg[0], pex_msg.size());
TORRENT_ASSERT(i.begin == i.end);
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
lazy_entry m; lazy_entry m;
@ -464,8 +463,6 @@ namespace libtorrent { namespace
m_pc.peer_log("==> PEX_DIFF [ dropped: %d added: %d msg_size: %d ]" m_pc.peer_log("==> PEX_DIFF [ dropped: %d added: %d msg_size: %d ]"
, num_dropped, num_added, int(pex_msg.size())); , num_dropped, num_added, int(pex_msg.size()));
#endif #endif
m_pc.setup_send();
} }
void send_ut_peer_list() void send_ut_peer_list()
@ -528,21 +525,18 @@ namespace libtorrent { namespace
std::vector<char> pex_msg; std::vector<char> pex_msg;
bencode(std::back_inserter(pex_msg), pex); bencode(std::back_inserter(pex_msg), pex);
buffer::interval i = m_pc.allocate_send_buffer(6 + pex_msg.size()); char msg[6];
char* ptr = msg;
detail::write_uint32(1 + 1 + pex_msg.size(), i.begin); detail::write_uint32(1 + 1 + pex_msg.size(), ptr);
detail::write_uint8(bt_peer_connection::msg_extended, i.begin); detail::write_uint8(bt_peer_connection::msg_extended, ptr);
detail::write_uint8(m_message_index, i.begin); detail::write_uint8(m_message_index, ptr);
std::copy(pex_msg.begin(), pex_msg.end(), i.begin); m_pc.send_buffer(msg, sizeof(msg));
i.begin += pex_msg.size(); m_pc.send_buffer(&pex_msg[0], pex_msg.size());
TORRENT_ASSERT(i.begin == i.end);
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
m_pc.peer_log("==> PEX_FULL [ added: %d msg_size: %d ]", num_added, int(pex_msg.size())); m_pc.peer_log("==> PEX_FULL [ added: %d msg_size: %d ]", num_added, int(pex_msg.size()));
#endif #endif
m_pc.setup_send();
} }
torrent& m_torrent; torrent& m_torrent;