From 7f890239c47657b7ae934b1c553056baf56de87e Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 14 Sep 2005 19:33:16 +0000 Subject: [PATCH] Added new send buffer class to avoid unecessary copying of outgoing data. --- COPYING | 2 +- Jamfile | 1 + examples/make_torrent.cpp | 4 +- include/libtorrent/buffer.hpp | 336 +++++++++++++++++++++++++ include/libtorrent/peer_connection.hpp | 4 +- src/peer_connection.cpp | 243 +++++++++--------- test/Jamfile | 1 + test/test_buffer.cpp | 165 ++++++++++++ test/test_storage.cpp | 9 - 9 files changed, 634 insertions(+), 131 deletions(-) create mode 100644 include/libtorrent/buffer.hpp create mode 100644 test/test_buffer.cpp diff --git a/COPYING b/COPYING index 832fa103a..a446aa68a 100644 --- a/COPYING +++ b/COPYING @@ -10,7 +10,7 @@ are met: * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of the author nor the names of its + * Neither the name of Rasterbar Software nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. diff --git a/Jamfile b/Jamfile index 36648304b..575e141f9 100755 --- a/Jamfile +++ b/Jamfile @@ -28,6 +28,7 @@ project torrent release:NDEBUG BOOST_ALL_NO_LIB _FILE_OFFSET_BITS=64 + BOOST_THREAD_USE_LIB /boost/thread//boost_thread/static /boost/filesystem//boost_filesystem/static /boost/date_time//boost_date_time/static diff --git a/examples/make_torrent.cpp b/examples/make_torrent.cpp index 723c03558..de01f4e9b 100755 --- a/examples/make_torrent.cpp +++ b/examples/make_torrent.cpp @@ -87,8 +87,8 @@ int main(int argc, char* argv[]) try { torrent_info t; - path full_path = initial_path() / path(argv[3]); - ofstream out(initial_path() / path(argv[1]), std::ios_base::binary); + path full_path = complete(path(argv[3])); + ofstream out(complete(path(argv[1])), std::ios_base::binary); int piece_size = 256 * 1024; char const* creator_str = "libtorrent"; diff --git a/include/libtorrent/buffer.hpp b/include/libtorrent/buffer.hpp new file mode 100644 index 000000000..e85fd3163 --- /dev/null +++ b/include/libtorrent/buffer.hpp @@ -0,0 +1,336 @@ +/* +Copyright (c) 2003 - 2005, Arvid Norberg, Daniel Wallin +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of Rasterbar Software nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef LIBTORRENT_BUFFER_HPP +#define LIBTORRENT_BUFFER_HPP + +#include + +namespace libtorrent { + +class buffer +{ +public: + struct interval + { + interval(char* begin, char* end) + : begin(begin) + , end(end) + {} + + char* begin; + char* end; + }; + + struct const_interval + { + const_interval(char const* begin, char const* end) + : begin(begin) + , end(end) + {} + + char const* begin; + char const* end; + }; + + typedef std::pair interval_type; + + buffer(std::size_t n = 0); + ~buffer(); + + interval allocate(std::size_t n); + void insert(char const* first, char const* last); + void erase(std::size_t n); + std::size_t size() const; + std::size_t capacity() const; + void reserve(std::size_t n); + interval_type data() const; + bool empty() const; + + std::size_t space_left() const; + + char const* raw_data() const + { + return m_first; + } + +private: + char* m_first; + char* m_last; + char* m_write_cursor; + char* m_read_cursor; + char* m_read_end; + bool m_empty; +}; + +inline buffer::buffer(std::size_t n) + : m_first((char*)::operator new(n)) + , m_last(m_first + n) + , m_write_cursor(m_first) + , m_read_cursor(m_first) + , m_read_end(m_last) + , m_empty(true) +{} + +inline buffer::~buffer() +{ + ::operator delete (m_first); +} + +inline buffer::interval buffer::allocate(std::size_t n) +{ + assert(m_read_cursor <= m_read_end || m_empty); + + if (m_read_cursor < m_write_cursor || m_empty) + { + // ..R***W.. + if (m_last - m_write_cursor >= (std::ptrdiff_t)n) + { + interval ret(m_write_cursor, m_write_cursor + n); + m_write_cursor += n; + m_read_end = m_write_cursor; + assert(m_read_cursor <= m_read_end); + if (n) m_empty = false; + return ret; + } + + if (m_read_cursor - m_first >= (std::ptrdiff_t)n) + { + m_read_end = m_write_cursor; + interval ret(m_first, m_first + n); + m_write_cursor = m_first + n; + assert(m_read_cursor <= m_read_end); + if (n) m_empty = false; + return ret; + } + + reserve(capacity() + n - (m_last - m_write_cursor)); + assert(m_last - m_write_cursor >= (std::ptrdiff_t)n); + interval ret(m_write_cursor, m_write_cursor + n); + m_write_cursor += n; + m_read_end = m_write_cursor; + if (n) m_empty = false; + assert(m_read_cursor <= m_read_end); + return ret; + + } + //**W...R** + if (m_read_cursor - m_write_cursor >= (std::ptrdiff_t)n) + { + interval ret(m_write_cursor, m_write_cursor + n); + m_write_cursor += n; + if (n) m_empty = false; + return ret; + } + reserve(capacity() + n - (m_read_cursor - m_write_cursor)); + assert(m_read_cursor - m_write_cursor >= (std::ptrdiff_t)n); + interval ret(m_write_cursor, m_write_cursor + n); + m_write_cursor += n; + if (n) m_empty = false; + return ret; +} + +inline void buffer::insert(char const* first, char const* last) +{ + std::size_t n = last - first; + + if (space_left() < n) + { + reserve(capacity() + n); + } + + m_empty = false; + + char const* end = (m_last - m_write_cursor) < (std::ptrdiff_t)n ? + m_last : m_write_cursor + n; + + std::size_t copied = end - m_write_cursor; + std::memcpy(m_write_cursor, first, copied); + + m_write_cursor += copied; + if (m_write_cursor > m_read_end) m_read_end = m_write_cursor; + first += copied; + n -= copied; + + if (n == 0) return; + + if (m_write_cursor == m_last) m_write_cursor = m_first; + + memcpy(m_write_cursor, first, n); + m_write_cursor += n; +} + +inline void buffer::erase(std::size_t n) +{ + assert(!m_empty); +#ifndef NDEBUG + int prev_size = size(); +#endif + assert(m_read_cursor <= m_read_end); + m_read_cursor += n; + if (m_read_cursor > m_read_end) + { + m_read_cursor = m_first + (m_read_cursor - m_read_end); + assert(m_read_cursor <= m_write_cursor); + } + + m_empty = m_read_cursor == m_write_cursor; + + assert(prev_size - n == size()); +} + +inline std::size_t buffer::size() const +{ + // ...R***W. + if (m_read_cursor < m_write_cursor) + { + return m_write_cursor - m_read_cursor; + } + // ***W..R* + else + { + if (m_empty) return 0; + return (m_write_cursor - m_first) + (m_read_end - m_read_cursor); + } +} + +inline std::size_t buffer::capacity() const +{ + return m_last - m_first; +} + +inline void buffer::reserve(std::size_t size) +{ + std::size_t n = (std::size_t)(capacity() * 1.f); + if (n < size) n = size; + + char* buf = (char*)::operator new(n); + char* old = m_first; + + if (m_read_cursor < m_write_cursor) + { + // ...R***W.<>. + std::memcpy( + buf + (m_read_cursor - m_first) + , m_read_cursor + , m_write_cursor - m_read_cursor + ); + + m_write_cursor = buf + (m_write_cursor - m_first); + m_read_cursor = buf + (m_read_cursor - m_first); + m_read_end = 0; + m_first = buf; + m_last = buf + n; + } + else + { + // **W..<>.R** + std::size_t skip = n - (m_last - m_first); + + std::memcpy(buf, m_first, m_write_cursor - m_first); + std::memcpy( + buf + (m_read_cursor - m_first) + skip + , m_read_cursor + , m_last - m_read_cursor + ); + + m_write_cursor = buf + (m_write_cursor - m_first); + + if (!m_empty) + { + m_read_cursor = buf + (m_read_cursor - m_first) + skip; + m_read_end = buf + (m_read_end - m_first) + skip; + } + else + { + m_read_cursor = m_write_cursor; + m_read_end = m_write_cursor; + } + + m_first = buf; + m_last = buf + n; + } + + ::operator delete (old); +} + +inline buffer::interval_type buffer::data() const +{ + // ...R***W. + if (m_read_cursor < m_write_cursor) + { + return interval_type( + const_interval(m_read_cursor, m_write_cursor) + , const_interval(m_last, m_last) + ); + } + // **W...R** + else + { + if (m_read_cursor == m_read_end) + { + return interval_type( + const_interval(m_first, m_write_cursor) + , const_interval(m_last, m_last)); + } + assert(m_read_cursor <= m_read_end || m_empty); + return interval_type( + const_interval(m_read_cursor, m_read_end) + , const_interval(m_first, m_write_cursor) + ); + } +} + +inline bool buffer::empty() const +{ + return m_empty; +} + +inline std::size_t buffer::space_left() const +{ + if (m_empty) return m_last - m_first; + + // ...R***W. + if (m_read_cursor < m_write_cursor) + { + return (m_last - m_write_cursor) + (m_read_cursor - m_first); + } + // ***W..R* + else + { + return m_read_cursor - m_write_cursor; + } +} + +} + +#endif // LIBTORRENT_BUFFER_HPP + diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index e9bd27455..90d17e3d5 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -54,6 +54,7 @@ POSSIBILITY OF SUCH DAMAGE. #pragma warning(pop) #endif +#include "libtorrent/buffer.hpp" #include "libtorrent/socket.hpp" #include "libtorrent/peer_id.hpp" #include "libtorrent/storage.hpp" @@ -337,7 +338,8 @@ namespace libtorrent // this is the buffer where data that is // to be sent is stored until it gets // consumed by send() - std::vector m_send_buffer; +// std::vector m_send_buffer; + buffer m_send_buffer; // this is a queue of ranges that describes // where in the send buffer actual payload diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 7083c18d1..ca72bf0b0 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -454,43 +454,46 @@ namespace libtorrent // add handshake to the send buffer const char version_string[] = "BitTorrent protocol"; const int string_len = sizeof(version_string)-1; - int pos = 1; - m_send_buffer.resize(1 + string_len + 8 + 20 + 20); + m_send_buffer.reserve(1 + string_len + 8 + 20 + 20); + buffer::interval i = m_send_buffer.allocate(1 + string_len + 8 + 20 + 20); // length of version string - m_send_buffer[0] = string_len; + *i.begin = string_len; + ++i.begin; // version string itself std::copy( version_string - , version_string+string_len - , m_send_buffer.begin()+pos); - pos += string_len; + , version_string + string_len + , i.begin); + i.begin += string_len; // 8 zeroes std::fill( - m_send_buffer.begin() + pos - , m_send_buffer.begin() + pos + 8 + i.begin + , i.begin + 8 , 0); // indicate that we support the extension protocol if (m_ses.extensions_enabled()) - m_send_buffer[pos+7] = 0x01; - pos += 8; + *(i.begin + 7) = 0x01; + i.begin += 8; // info hash std::copy( m_torrent->torrent_file().info_hash().begin() , m_torrent->torrent_file().info_hash().end() - , m_send_buffer.begin() + pos); - pos += 20; + , i.begin); + i.begin += 20; // peer id std::copy( m_ses.get_peer_id().begin() , m_ses.get_peer_id().end() - , m_send_buffer.begin() + pos); + , i.begin); + i.begin += 20; + assert(i.begin == i.end); #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; @@ -1447,11 +1450,11 @@ namespace libtorrent m_torrent->picker().abort_download(block); - std::deque::iterator i + std::deque::iterator it = std::find(m_download_queue.begin(), m_download_queue.end(), block); - assert(i != m_download_queue.end()); + assert(it != m_download_queue.end()); - m_download_queue.erase(i); + m_download_queue.erase(it); int block_offset = block.block_index * m_torrent->block_size(); @@ -1463,20 +1466,19 @@ namespace libtorrent char buf[] = {0,0,0,13, msg_cancel}; - std::size_t start_offset = m_send_buffer.size(); - m_send_buffer.resize(start_offset + 17); + buffer::interval i = m_send_buffer.allocate(17); - std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); - start_offset += 5; - - char* ptr = &m_send_buffer[start_offset]; + std::copy(buf, buf + 5, i.begin); + i.begin += 5; // index - detail::write_int32(block.piece_index, ptr); + detail::write_int32(block.piece_index, i.begin); // begin - detail::write_int32(block_offset, ptr); + detail::write_int32(block_offset, i.begin); // length - detail::write_int32(block_size, ptr); + detail::write_int32(block_size, i.begin); + + assert(i.begin == i.end); #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; @@ -1512,21 +1514,20 @@ namespace libtorrent char buf[] = {0,0,0,13, msg_request}; - std::size_t start_offset = m_send_buffer.size(); - m_send_buffer.resize(start_offset + 17); + buffer::interval i = m_send_buffer.allocate(17); - std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); + std::copy(buf, buf + 5, i.begin); + i.begin += 5; - char* ptr = &m_send_buffer[start_offset+5]; // index - detail::write_int32(block.piece_index, ptr); - + detail::write_int32(block.piece_index, i.begin); // begin - detail::write_int32(block_offset, ptr); - + detail::write_int32(block_offset, i.begin); // length - detail::write_int32(block_size, ptr); + detail::write_int32(block_size, i.begin); + assert(i.begin == i.end); + #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; (*m_logger) << to_simple_string(second_clock::universal_time()) @@ -1558,34 +1559,38 @@ namespace libtorrent // abort if the peer doesn't support the metadata extension if (!supports_extension(extended_metadata_message)) return; - std::back_insert_iterator > ptr(m_send_buffer); - if (m_torrent->valid_metadata()) { std::pair offset = req_to_offset(req, (int)m_torrent->metadata().size()); + buffer::interval i = m_send_buffer.allocate(18 + offset.second); + // yes, we have metadata, send it - detail::write_uint32(5 + 9 + offset.second, ptr); - detail::write_uint8(msg_extended, ptr); - detail::write_int32(m_extension_messages[extended_metadata_message], ptr); + detail::write_uint32(5 + 9 + offset.second, i.begin); + detail::write_uint8(msg_extended, i.begin); + detail::write_int32(m_extension_messages[extended_metadata_message], i.begin); // means 'data packet' - detail::write_uint8(1, ptr); - detail::write_uint32((int)m_torrent->metadata().size(), ptr); - detail::write_uint32(offset.first, ptr); + detail::write_uint8(1, i.begin); + detail::write_uint32((int)m_torrent->metadata().size(), i.begin); + detail::write_uint32(offset.first, i.begin); std::vector const& metadata = m_torrent->metadata(); std::copy(metadata.begin() + offset.first - , metadata.begin() + offset.first + offset.second, ptr); + , metadata.begin() + offset.first + offset.second, i.begin); + i.begin += offset.second; + assert(i.begin == i.end); } else { + buffer::interval i = m_send_buffer.allocate(10); // we don't have the metadata, reply with // don't have-message - detail::write_uint32(1 + 4 + 1, ptr); - detail::write_uint8(msg_extended, ptr); - detail::write_int32(m_extension_messages[extended_metadata_message], ptr); + detail::write_uint32(1 + 4 + 1, i.begin); + detail::write_uint8(msg_extended, i.begin); + detail::write_int32(m_extension_messages[extended_metadata_message], i.begin); // means 'have no data' - detail::write_uint8(2, ptr); + detail::write_uint8(2, i.begin); + assert(i.begin == i.end); } send_buffer_updated(); } @@ -1612,15 +1617,16 @@ namespace libtorrent << " size: " << req.second << " ]\n"; #endif - std::back_insert_iterator > ptr(m_send_buffer); + buffer::interval i = m_send_buffer.allocate(12); - detail::write_uint32(1 + 4 + 3, ptr); - detail::write_uint8(msg_extended, ptr); - detail::write_int32(m_extension_messages[extended_metadata_message], ptr); + detail::write_uint32(1 + 4 + 3, i.begin); + detail::write_uint8(msg_extended, i.begin); + detail::write_int32(m_extension_messages[extended_metadata_message], i.begin); // means 'request data' - detail::write_uint8(0, ptr); - detail::write_uint8(start, ptr); - detail::write_uint8(size - 1, ptr); + detail::write_uint8(0, i.begin); + detail::write_uint8(start, i.begin); + detail::write_uint8(size - 1, i.begin); + assert(i.begin == i.end); send_buffer_updated(); } @@ -1636,12 +1642,15 @@ namespace libtorrent std::vector message; bencode(std::back_inserter(message), e); - std::back_insert_iterator > ptr(m_send_buffer); - detail::write_uint32(1 + 4 + (int)message.size(), ptr); - detail::write_uint8(msg_extended, ptr); - detail::write_int32(m_extension_messages[extended_chat_message], ptr); - std::copy(message.begin(), message.end(), ptr); + buffer::interval i = m_send_buffer.allocate(message.size() + 9); + + detail::write_uint32(1 + 4 + (int)message.size(), i.begin); + detail::write_uint8(msg_extended, i.begin); + detail::write_int32(m_extension_messages[extended_chat_message], i.begin); + std::copy(message.begin(), message.end(), i.begin); + i.begin += message.size(); + assert(i.begin == i.end); send_buffer_updated(); } @@ -1664,19 +1673,19 @@ namespace libtorrent (*m_logger) << "\n"; #endif const int packet_size = ((int)m_have_piece.size() + 7) / 8 + 5; - const int old_size = (int)m_send_buffer.size(); - m_send_buffer.resize(old_size + packet_size); + + buffer::interval i = m_send_buffer.allocate(packet_size); - char* ptr = &m_send_buffer[old_size]; - detail::write_int32(packet_size - 4, ptr); - detail::write_uint8(msg_bitfield, ptr); + detail::write_int32(packet_size - 4, i.begin); + detail::write_uint8(msg_bitfield, i.begin); - std::fill(m_send_buffer.begin() + old_size + 5, m_send_buffer.end(), 0); - for (int i = 0; i < (int)m_have_piece.size(); ++i) + std::fill(i.begin, i.end, 0); + for (int c = 0; c < (int)m_have_piece.size(); ++c) { - if (m_torrent->have_piece(i)) - m_send_buffer[old_size + 5 + (i>>3)] |= 1 << (7 - (i&7)); + if (m_torrent->have_piece(c)) + i.begin[c >> 3] |= 1 << (7 - (c & 7)); } + assert(i.end - i.begin == ((int)m_have_piece.size() + 7) / 8); send_buffer_updated(); } @@ -1701,17 +1710,20 @@ namespace libtorrent extension_list[extension_names[i]] = i; } - // make room for message size - const int msg_size_pos = (int)m_send_buffer.size(); - m_send_buffer.resize(msg_size_pos + 4); + std::vector msg; + bencode(std::back_inserter(msg), extension_list); - m_send_buffer.push_back(msg_extension_list); - - bencode(std::back_inserter(m_send_buffer), extension_list); + // make room for message + buffer::interval i = m_send_buffer.allocate(5 + msg.size()); // write the length of the message - char* ptr = &m_send_buffer[msg_size_pos]; - detail::write_int32((int)m_send_buffer.size() - msg_size_pos - 4, ptr); + detail::write_int32((int)msg.size() + 1, i.begin); + + detail::write_uint8(msg_extension_list, i.begin); + + std::copy(msg.begin(), msg.end(), i.begin); + i.begin += msg.size(); + assert(i.begin == i.end); send_buffer_updated(); } @@ -1722,7 +1734,7 @@ namespace libtorrent if (m_choked) return; char msg[] = {0,0,0,1,msg_choke}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_send_buffer.insert(msg, msg + sizeof(msg)); m_choked = true; #ifdef TORRENT_VERBOSE_LOGGING @@ -1754,7 +1766,7 @@ namespace libtorrent if (!m_choked) return; char msg[] = {0,0,0,1,msg_unchoke}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_send_buffer.insert(msg, msg + sizeof(msg)); m_choked = false; #ifdef TORRENT_VERBOSE_LOGGING @@ -1771,7 +1783,7 @@ namespace libtorrent if (m_interesting) return; char msg[] = {0,0,0,1,msg_interested}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_send_buffer.insert(msg, msg + sizeof(msg)); m_interesting = true; #ifdef TORRENT_VERBOSE_LOGGING @@ -1788,7 +1800,7 @@ namespace libtorrent if (!m_interesting) return; char msg[] = {0,0,0,1,msg_not_interested}; - m_send_buffer.insert(m_send_buffer.end(), msg, msg+sizeof(msg)); + m_send_buffer.insert(msg, msg + sizeof(msg)); m_interesting = false; m_became_uninteresting = second_clock::universal_time(); @@ -1814,9 +1826,9 @@ namespace libtorrent const int packet_size = 9; char msg[packet_size] = {0,0,0,5,msg_have}; - char* ptr = msg+5; + char* ptr = msg + 5; detail::write_int32(index, ptr); - m_send_buffer.insert(m_send_buffer.end(), msg, msg + packet_size); + m_send_buffer.insert(msg, msg + packet_size); #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; @@ -2306,30 +2318,28 @@ namespace libtorrent assert(r.start + r.length <= m_torrent->torrent_file().piece_size(r.piece)); assert(r.length > 0 && r.start >= 0); -#ifndef NDEBUG -// assert(m_torrent->verify_piece(r.piece) && "internal error"); -#endif - const int send_buffer_offset = (int)m_send_buffer.size(); const int packet_size = 4 + 5 + 4 + r.length; - m_send_buffer.resize(send_buffer_offset + packet_size); - char* ptr = &m_send_buffer[send_buffer_offset]; - detail::write_int32(packet_size-4, ptr); - *ptr = msg_piece; ++ptr; - detail::write_int32(r.piece, ptr); - detail::write_int32(r.start, ptr); + + buffer::interval i = m_send_buffer.allocate(packet_size); + + detail::write_int32(packet_size-4, i.begin); + detail::write_uint8(msg_piece, i.begin); + detail::write_int32(r.piece, i.begin); + detail::write_int32(r.start, i.begin); m_torrent->filesystem().read( - &m_send_buffer[send_buffer_offset+13] + i.begin , r.piece , r.start , r.length); + assert(i.begin + r.length == i.end); #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; (*m_logger) << to_simple_string(second_clock::universal_time()) << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; #endif - m_payloads.push_back(range(send_buffer_offset+13, r.length)); + m_payloads.push_back(range(m_send_buffer.size() - r.length, r.length)); m_requests.erase(m_requests.begin()); if (m_requests.empty() @@ -2350,8 +2360,7 @@ namespace libtorrent { assert(m_torrent->valid_metadata()); for (std::vector::iterator i = m_announce_queue.begin(); - i != m_announce_queue.end(); - ++i) + i != m_announce_queue.end(); ++i) { send_have(*i); } @@ -2368,22 +2377,34 @@ namespace libtorrent assert(amount_to_send > 0); + buffer::interval_type send_buffer = m_send_buffer.data(); // we have data that's scheduled for sending - int sent = m_socket->send( - &m_send_buffer[0] - , amount_to_send); + int to_send = std::min(send_buffer.first.end - send_buffer.first.begin + , m_ul_bandwidth_quota.left()); + int sent = m_socket->send(send_buffer.first.begin, to_send); + if (sent == send_buffer.first.end - send_buffer.first.end + && send_buffer.second.begin != send_buffer.second.end + && to_send > sent) + { + to_send -= sent; + to_send = std::min(to_send, send_buffer.second.end + - send_buffer.second.begin); + int ret = m_socket->send(send_buffer.second.begin, to_send); + if (ret > 0) sent += ret; + } + if (sent > 0) { m_ul_bandwidth_quota.used += sent; + m_send_buffer.erase(sent); // manage the payload markers int amount_payload = 0; if (!m_payloads.empty()) { for (std::deque::iterator i = m_payloads.begin(); - i != m_payloads.end(); - ++i) + i != m_payloads.end(); ++i) { i->start -= sent; if (i->start < 0) @@ -2401,6 +2422,7 @@ namespace libtorrent } } } + // TODO: move the erasing into the loop above // remove all payload ranges that has been sent m_payloads.erase( std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero) @@ -2408,20 +2430,6 @@ namespace libtorrent assert(amount_payload <= sent); m_statistics.sent_bytes(amount_payload, sent - amount_payload); - - // empty the entire buffer at once or if - // only a part of the buffer could be sent - // remove the part that was sent from the buffer - if (sent == (int)m_send_buffer.size()) - { - m_send_buffer.clear(); - } - else - { - m_send_buffer.erase( - m_send_buffer.begin() - , m_send_buffer.begin() + sent); - } } else { @@ -2496,7 +2504,7 @@ namespace libtorrent if (m_announce_queue.empty()) { char noop[] = {0,0,0,0}; - m_send_buffer.insert(m_send_buffer.end(), noop, noop+4); + m_send_buffer.insert(noop, noop + 4); m_last_sent = second_clock::universal_time(); #ifdef TORRENT_VERBOSE_LOGGING using namespace boost::posix_time; @@ -2507,8 +2515,7 @@ namespace libtorrent else { for (std::vector::iterator i = m_announce_queue.begin(); - i != m_announce_queue.end(); - ++i) + i != m_announce_queue.end(); ++i) { send_have(*i); } diff --git a/test/Jamfile b/test/Jamfile index c8c82c982..4138e9d1d 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -8,6 +8,7 @@ project ; test-suite libtorrent : + [ run test_buffer.cpp ] [ run test_storage.cpp ] [ run test_piece_picker.cpp ] # [ run test_entry.cpp ] diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp new file mode 100644 index 000000000..6f6360544 --- /dev/null +++ b/test/test_buffer.cpp @@ -0,0 +1,165 @@ +/* + Copyright (c) 2003 - 2005, Arvid Norberg, Daniel Wallin + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of Rasterbar Software nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include +#include +#include + +#include "libtorrent/buffer.hpp" + +#include "test.hpp" + +using libtorrent::buffer; +/* +template +T const& min_(T const& x, T const& y) +{ + return x < y ? x : y; +} + +void test_speed() +{ + buffer b; + + char data[32]; + + srand(0); + + boost::timer t; + + int const iterations = 5000000; + int const step = iterations / 20; + + for (int i = 0; i < iterations; ++i) + { + int x = rand(); + + if (i % step == 0) std::cerr << "."; + + std::size_t n = rand() % 32; + n = 32; + + if (x % 2) + { + b.insert(data, data + n); + } + else + { + b.erase(min_(b.size(), n)); + } + } + + float t1 = t.elapsed(); + std::cerr << "buffer elapsed: " << t.elapsed() << "\n"; + + std::vector v; + + srand(0); + t.restart(); + + for (int i = 0; i < iterations; ++i) + { + int x = rand(); + + if (i % step == 0) std::cerr << "."; + + std::size_t n = rand() % 32; + n = 32; + + if (x % 2) + { + v.insert(v.end(), data, data + n); + } + else + { + v.erase(v.begin(), v.begin() + min_(v.size(), n)); + } + } + + float t2 = t.elapsed(); + std::cerr << "std::vector elapsed: " << t.elapsed() << "\n"; + + assert(t1 < t2); +} +*/ + +int test_main() +{ + char data[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + buffer b; + + TEST_CHECK(b.size() == 0); + TEST_CHECK(b.capacity() == 0); + TEST_CHECK(b.empty()); + + buffer::interval i = b.allocate(1); + memcpy(i.begin, data, 1); + + TEST_CHECK(b.size() == 1); + TEST_CHECK(b.capacity() >= 1); + + i = b.allocate(4); + memcpy(i.begin, data + 1, 4); + TEST_CHECK(b.size() == 5); + TEST_CHECK(b.capacity() >= 5); + + i = b.allocate(4); + memcpy(i.begin, data + 5, 4); + TEST_CHECK(b.size() == 9); + TEST_CHECK(b.capacity() >= 9); + + TEST_CHECK(!b.empty()); + + buffer::interval_type read_data = b.data(); + TEST_CHECK(std::equal(read_data.first.begin, read_data.first.end, data)); + + b.erase(5); + + TEST_CHECK(b.space_left() == 5); + + i = b.allocate(3); + memcpy(i.begin, data, 3); + TEST_CHECK(b.space_left() == 2); + TEST_CHECK(b.size() == 7); + + read_data = b.data(); + TEST_CHECK(std::equal(read_data.first.begin, read_data.first.end, data + 5)); + TEST_CHECK(std::equal(read_data.second.begin, read_data.second.end, data)); + + b.erase(7); + + TEST_CHECK(b.empty()); + return 0; +} + diff --git a/test/test_storage.cpp b/test/test_storage.cpp index a5fe4fe2c..e52eea5b6 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -9,15 +9,6 @@ #include "test.hpp" -#define _FILE_OFFSET_BITS 64 -#include -#include -#include -#include -#include - - - using namespace libtorrent; using namespace boost::filesystem;