diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 337bf3e5d..50df2a2be 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -1367,6 +1367,8 @@ int main(int argc, char* argv[]) settings.disk_cache_algorithm = session_settings::avoid_readback; settings.volatile_read_cache = false; + settings.max_allowed_in_request_queue = 1000; + settings.send_buffer_watermark = 10 * 1024 * 1024; ses.set_settings(settings); for (std::vector::iterator i = magnet_links.begin() diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 1bffa09bc..c14043c4b 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -538,11 +538,14 @@ namespace libtorrent // these functions are virtual to let bt_peer_connection hook into them // and encrypt the content - enum message_type_flags { message_type_request = 1, cork_message = 2 }; + enum message_type_flags { message_type_request = 1 }; virtual void send_buffer(char const* begin, int size, int flags = 0 , void (*fun)(char*, int, void*) = 0, void* userdata = 0); virtual void setup_send(); + void cork_socket() { TORRENT_ASSERT(!m_corked); m_corked = true; } + void uncork_socket(); + #ifdef TORRENT_DISK_STATS void log_buffer_usage(char* buffer, int size, char const* label); #endif @@ -1138,6 +1141,13 @@ namespace libtorrent // when this is set, the transfer stats for this connection // is not included in the torrent or session stats bool m_ignore_stats:1; + + // when this is set, the peer_connection socket is + // corked, similar to the linux TCP feature TCP_CORK. + // we won't send anything to the actual socket, just + // buffer messages up in the application layer send + // buffer, and send it once we're uncorked. + bool m_corked:1; template struct handler_storage @@ -1232,6 +1242,14 @@ namespace libtorrent int m_received_in_piece; #endif }; + + struct cork + { + cork(peer_connection& p): m_pc(p) { m_pc.cork_socket(); } + ~cork() { m_pc.uncork_socket(); } + peer_connection& m_pc; + }; + } #endif // TORRENT_PEER_CONNECTION_HPP_INCLUDED diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 1869d3c55..c22e2655f 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -2339,12 +2339,12 @@ namespace libtorrent char* ptr = msg; detail::write_int32(r.length + 1 + 4 + 4 + 4 + piece_list_buf.size(), ptr); - send_buffer(msg, 17, cork_message); - send_buffer(&piece_list_buf[0], piece_list_buf.size(), cork_message); + send_buffer(msg, 17); + send_buffer(&piece_list_buf[0], piece_list_buf.size()); } else { - send_buffer(msg, 13, cork_message); + send_buffer(msg, 13); } append_send_buffer(buffer.get(), r.length diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 628e3e98a..49211dc62 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -162,6 +162,7 @@ namespace libtorrent , m_sent_suggests(false) , m_holepunch_mode(false) , m_ignore_stats(false) + , m_corked(false) #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS , m_in_constructor(true) , m_disconnect_started(false) @@ -312,6 +313,7 @@ namespace libtorrent , m_sent_suggests(false) , m_holepunch_mode(false) , m_ignore_stats(false) + , m_corked(false) #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS , m_in_constructor(true) , m_disconnect_started(false) @@ -2550,6 +2552,11 @@ namespace libtorrent #endif TORRENT_ASSERT(m_ses.is_network_thread()); + // flush send buffer at the end of this scope + // TODO: peers should really be corked/uncorked outside of + // all completed disk operations + cork _c(*this); + INVARIANT_CHECK; m_outstanding_writing_bytes -= p.length; @@ -4405,6 +4412,11 @@ namespace libtorrent void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r) { + // flush send buffer at the end of this scope + // TODO: peers should really be corked/uncorked outside of + // all completed disk operations + cork _c(*this); + #ifdef TORRENT_STATS ++m_ses.m_num_messages[aux::session_impl::on_disk_read_counter]; #endif @@ -4570,6 +4582,13 @@ namespace libtorrent , priority , bwc1, bwc2, bwc3, bwc4); } + void peer_connection::uncork_socket() + { + if (!m_corked) return; + m_corked = false; + setup_send(); + } + void peer_connection::setup_send() { if (m_channel_state[upload_channel] != peer_info::bw_idle @@ -4676,6 +4695,14 @@ namespace libtorrent TORRENT_ASSERT(amount_to_send > 0); + if (m_corked) + { +#ifdef TORRENT_VERBOSE_LOGGING + peer_log(">>> CORKED WRITE [ bytes: %d ]", amount_to_send); +#endif + return; + } + #ifdef TORRENT_VERBOSE_LOGGING peer_log(">>> ASYNC_WRITE [ bytes: %d ]", amount_to_send); #endif @@ -5004,7 +5031,7 @@ namespace libtorrent , boost::bind(&session_impl::free_buffer, boost::ref(m_ses), _1)); ++i; } - if ((flags & cork_message) == 0) setup_send(); + setup_send(); } template @@ -5042,6 +5069,9 @@ namespace libtorrent // to keep the object alive through the exit check boost::intrusive_ptr me(self()); + // flush the send buffer at the end of this function + cork _c(*this); + INVARIANT_CHECK; #ifdef TORRENT_VERBOSE_LOGGING