From 6faa11f7abe08bc4085276a52d4065dbef81b421 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 11 May 2007 18:40:22 +0000 Subject: [PATCH] optimized web_peer_connection to use less memory and replaced std::copy with std::memmove in peer_connection --- include/libtorrent/web_peer_connection.hpp | 8 + src/peer_connection.cpp | 9 +- src/web_peer_connection.cpp | 178 +++++++++++++-------- 3 files changed, 123 insertions(+), 72 deletions(-) diff --git a/include/libtorrent/web_peer_connection.hpp b/include/libtorrent/web_peer_connection.hpp index db87d7459..569ea0b61 100755 --- a/include/libtorrent/web_peer_connection.hpp +++ b/include/libtorrent/web_peer_connection.hpp @@ -163,6 +163,14 @@ namespace libtorrent std::vector m_piece; // the mapping of the data in the m_piece buffer peer_request m_intermediate_piece; + + // the number of bytes into the receive buffer where + // current read cursor is. + int m_body_start; + // the number of bytes received in the current HTTP + // response. used to know where in the buffer the + // next response starts + int m_received_body; }; } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 4336dcc6a..916aaeba0 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1666,11 +1666,12 @@ namespace libtorrent INVARIANT_CHECK; assert(packet_size > 0); - assert((int)m_recv_buffer.size() >= size); - // TODO: replace with memmov - std::copy(m_recv_buffer.begin() + size, m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.begin()); - + assert(int(m_recv_buffer.size()) >= size); + assert(int(m_recv_buffer.size()) >= m_recv_pos); assert(m_recv_pos >= size); + + std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size); + m_recv_pos -= size; #ifndef NDEBUG diff --git a/src/web_peer_connection.cpp b/src/web_peer_connection.cpp index 6d7e9163e..a341b7b50 100755 --- a/src/web_peer_connection.cpp +++ b/src/web_peer_connection.cpp @@ -107,17 +107,31 @@ namespace libtorrent boost::optional web_peer_connection::downloading_piece_progress() const { - if (!m_parser.header_finished() || m_requests.empty()) + if (m_requests.empty()) return boost::optional(); boost::shared_ptr t = associated_torrent().lock(); assert(t); - buffer::const_interval http_body = m_parser.get_body(); piece_block_progress ret; ret.piece_index = m_requests.front().piece; - ret.bytes_downloaded = http_body.left() % t->block_size(); + if (!m_piece.empty()) + { + ret.bytes_downloaded = int(m_piece.size()); + } + else + { + if (!m_parser.header_finished()) + { + ret.bytes_downloaded = 0; + } + else + { + int receive_buffer_size = receive_buffer().left() - m_parser.body_start(); + ret.bytes_downloaded = receive_buffer_size % t->block_size(); + } + } ret.block_index = (m_requests.front().start + ret.bytes_downloaded) / t->block_size(); ret.full_block_bytes = t->block_size(); const int last_piece = t->torrent_file().num_pieces() - 1; @@ -137,8 +151,8 @@ namespace libtorrent t->torrent_file().num_pieces(), true)); // it is always possible to request pieces incoming_unchoke(); - - reset_recv_buffer(t->torrent_file().piece_length() + 1024 * 2); + + reset_recv_buffer(t->block_size() + 1024); } void web_peer_connection::write_request(peer_request const& r) @@ -301,36 +315,45 @@ namespace libtorrent int payload; int protocol; bool header_finished = m_parser.header_finished(); - boost::tie(payload, protocol) = m_parser.incoming(recv_buffer); - m_statistics.received_bytes(payload, protocol); - - assert(recv_buffer.left() <= packet_size()); - assert (recv_buffer.left() < packet_size() - || m_parser.finished()); - - // this means the entire status line hasn't been received yet - if (m_parser.status_code() == -1) break; - - // if the status code is not one of the accepted ones, abort - if (m_parser.status_code() != 206 // partial content - && m_parser.status_code() != 200 // OK - && !(m_parser.status_code() >= 300 // redirect - && m_parser.status_code() < 400)) + if (!header_finished) { - // we should not try this server again. - t->remove_url_seed(m_url); - std::string error_msg = boost::lexical_cast(m_parser.status_code()) - + " " + m_parser.message(); - if (m_ses.m_alerts.should_post(alert::warning)) - { - session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.m_alerts.post_alert(url_seed_alert(t->get_handle(), url() - , error_msg)); - } - throw std::runtime_error(error_msg); - } + boost::tie(payload, protocol) = m_parser.incoming(recv_buffer); + m_statistics.received_bytes(payload, protocol); - if (!m_parser.header_finished()) break; + assert(recv_buffer.left() == 0 || *recv_buffer.begin == 'H'); + + assert(recv_buffer.left() <= packet_size()); + + // this means the entire status line hasn't been received yet + if (m_parser.status_code() == -1) break; + + // if the status code is not one of the accepted ones, abort + if (m_parser.status_code() != 206 // partial content + && m_parser.status_code() != 200 // OK + && !(m_parser.status_code() >= 300 // redirect + && m_parser.status_code() < 400)) + { + // we should not try this server again. + t->remove_url_seed(m_url); + std::string error_msg = boost::lexical_cast(m_parser.status_code()) + + " " + m_parser.message(); + if (m_ses.m_alerts.should_post(alert::warning)) + { + session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + m_ses.m_alerts.post_alert(url_seed_alert(t->get_handle(), url() + , error_msg)); + } + throw std::runtime_error(error_msg); + } + if (!m_parser.header_finished()) break; + + m_body_start = m_parser.body_start(); + m_received_body = 0; + } + else + { + m_statistics.received_bytes(bytes_transferred, 0); + } // we just completed reading the header if (!header_finished) @@ -385,9 +408,11 @@ namespace libtorrent m_server_string += ")"; } + m_body_start = m_parser.body_start(); + m_received_body = 0; } - buffer::const_interval http_body = m_parser.get_body(); + recv_buffer.begin += m_body_start; size_type range_start; size_type range_end; @@ -428,19 +453,17 @@ namespace libtorrent , range_end - range_start); peer_request front_request = m_requests.front(); + if (in_range.piece != front_request.piece || in_range.start > front_request.start + int(m_piece.size())) { throw std::runtime_error("invalid range in HTTP response"); } - front_request = m_requests.front(); - // skip the http header and the blocks we've already read. The // http_body.begin is now in sync with the request at the front // of the request queue assert(in_range.start - int(m_piece.size()) <= front_request.start); - http_body.begin += front_request.start - in_range.start + int(m_piece.size()); // the http response body consists of 3 parts // 1. the middle of a block or the ending of a block @@ -464,40 +487,53 @@ namespace libtorrent // m_piece as buffer. int piece_size = int(m_piece.size()); - int copy_size = std::min(front_request.length - piece_size - , http_body.left()); + int copy_size = std::min(std::min(front_request.length - piece_size + , recv_buffer.left()), int(range_end - range_start - m_received_body)); m_piece.resize(piece_size + copy_size); - std::memcpy(&m_piece[0] + m_piece.size(), http_body.begin, copy_size); + assert(copy_size > 0); + std::memcpy(&m_piece[0] + piece_size, recv_buffer.begin, copy_size); assert(int(m_piece.size()) <= front_request.length); - http_body.begin += copy_size; - if (int(m_piece.size()) < front_request.length) - return; + recv_buffer.begin += copy_size; + m_received_body += copy_size; + m_body_start += copy_size; + assert(m_received_body <= range_end - range_start); + assert(int(m_piece.size()) <= front_request.length); + if (int(m_piece.size()) == front_request.length) + { + // each call to incoming_piece() may result in us becoming + // a seed. If we become a seed, all seeds we're connected to + // will be disconnected, including this web seed. We need to + // check for the disconnect condition after the call. - // each call to incoming_piece() may result in us becoming - // a seed. If we become a seed, all seeds we're connected to - // will be disconnected, including this web seed. We need to - // check for the disconnect condition after the call. - - m_requests.pop_front(); - incoming_piece(front_request, &m_piece[0]); - if (associated_torrent().expired()) return; - m_piece.clear(); + m_requests.pop_front(); + incoming_piece(front_request, &m_piece[0]); + if (associated_torrent().expired()) return; + cut_receive_buffer(m_body_start, t->block_size() + 1024); + m_body_start = 0; + recv_buffer = receive_buffer(); + assert(m_received_body <= range_end - range_start); + m_piece.clear(); + assert(m_piece.empty()); + } } // report all received blocks to the bittorrent engine while (!m_requests.empty() && range_contains(in_range, m_requests.front()) - && http_body.left() >= m_requests.front().length) + && recv_buffer.left() >= m_requests.front().length) { peer_request r = m_requests.front(); m_requests.pop_front(); - assert(http_body.begin == recv_buffer.begin + m_parser.body_start() - + r.start - in_range.start); - assert(http_body.left() >= r.length); + assert(recv_buffer.left() >= r.length); - incoming_piece(r, http_body.begin); + incoming_piece(r, recv_buffer.begin); if (associated_torrent().expired()) return; - http_body.begin += r.length; + m_received_body += r.length; + assert(receive_buffer().begin + m_body_start == recv_buffer.begin); + assert(m_received_body <= range_end - range_start); + cut_receive_buffer(r.length + m_body_start, t->block_size() + 1024); + m_body_start = 0; + recv_buffer = receive_buffer(); } if (!m_requests.empty()) @@ -506,25 +542,31 @@ namespace libtorrent > m_requests.front().start + int(m_piece.size()); if (in_range.start + in_range.length < m_requests.front().start + m_requests.front().length - && m_parser.finished()) + && (m_received_body + recv_buffer.left() >= range_end - range_start)) { int piece_size = int(m_piece.size()); - int copy_size = std::min(m_requests.front().length - piece_size - , http_body.left()); + int copy_size = std::min(std::min(m_requests.front().length - piece_size + , recv_buffer.left()), int(range_end - range_start - m_received_body)); + assert(copy_size >= 0); m_piece.resize(piece_size + copy_size); - std::memcpy(&m_piece[0] + piece_size, http_body.begin, copy_size); - http_body.begin += copy_size; + std::memcpy(&m_piece[0] + piece_size, recv_buffer.begin, copy_size); + recv_buffer.begin += copy_size; + m_received_body += copy_size; + m_body_start += copy_size; + assert(m_received_body <= range_end - range_start); } } - if (m_parser.finished()) + assert(m_received_body <= range_end - range_start); + if (m_received_body == range_end - range_start) { + cut_receive_buffer(recv_buffer.begin - receive_buffer().begin + , t->block_size() + 1024); + recv_buffer = receive_buffer(); m_file_requests.pop_front(); - assert(http_body.left() == 0); m_parser.reset(); - assert(recv_buffer.end == http_body.end || *http_body.end == 'H'); - cut_receive_buffer(http_body.end - recv_buffer.begin - , t->torrent_file().piece_length() + 1024 * 2); + m_body_start = 0; + m_received_body = 0; continue; } break;