receive buffer optimization. added receive_buffer_size and used_receive_buffer to peer_info. changed plugin api to make use of new disk_buffer_holder type

This commit is contained in:
Arvid Norberg 2008-04-10 10:03:23 +00:00
parent 5f35d170b0
commit 093d912e9a
25 changed files with 503 additions and 86 deletions

View File

@ -244,6 +244,7 @@ SOURCES =
alert alert
assert assert
connection_queue connection_queue
disk_buffer_holder
entry entry
escape_string escape_string
gzip gzip

View File

@ -285,6 +285,16 @@ used.</li>
</ul> </ul>
</td> </td>
</tr> </tr>
<tr><td><tt class="docutils literal"><span class="pre">pool-allocators</span></tt></td>
<td><ul class="first last simple">
<li><tt class="docutils literal"><span class="pre">on</span></tt> - default, uses pool allocators for send
buffers.</li>
<li><tt class="docutils literal"><span class="pre">off</span></tt> - uses <tt class="docutils literal"><span class="pre">malloc()</span></tt> and <tt class="docutils literal"><span class="pre">free()</span></tt>
instead. Might be useful to debug buffer issues
with tools like electric fence or libgmalloc.</li>
</ul>
</td>
</tr>
<tr><td><tt class="docutils literal"><span class="pre">link</span></tt></td> <tr><td><tt class="docutils literal"><span class="pre">link</span></tt></td>
<td><ul class="first last simple"> <td><ul class="first last simple">
<li><tt class="docutils literal"><span class="pre">static</span></tt> - builds libtorrent as a static <li><tt class="docutils literal"><span class="pre">static</span></tt> - builds libtorrent as a static
@ -576,6 +586,9 @@ GCC) both little-endian and big-endian versions
will be built and the correct code will be will be built and the correct code will be
chosen at run-time.</td> chosen at run-time.</td>
</tr> </tr>
<tr><td><tt class="docutils literal"><span class="pre">TORRENT_DISABLE_POOL_ALLOCATOR</span></tt></td>
<td>Disables use of <tt class="docutils literal"><span class="pre">boost::pool&lt;&gt;</span></tt>.</td>
</tr>
<tr><td><tt class="docutils literal"><span class="pre">TORRENT_LINKING_SHARED</span></tt></td> <tr><td><tt class="docutils literal"><span class="pre">TORRENT_LINKING_SHARED</span></tt></td>
<td>If this is defined when including the <td>If this is defined when including the
libtorrent headers, the classes and functions libtorrent headers, the classes and functions

View File

@ -16,28 +16,29 @@
<col class="docinfo-content" /> <col class="docinfo-content" />
<tbody valign="top"> <tbody valign="top">
<tr><th class="docinfo-name">Author:</th> <tr><th class="docinfo-name">Author:</th>
<td>Arvid Norberg, <a class="last reference" href="mailto:arvid&#64;rasterbar.com">arvid&#64;rasterbar.com</a></td></tr> <td>Arvid Norberg, <a class="last reference external" href="mailto:arvid&#64;rasterbar.com">arvid&#64;rasterbar.com</a></td></tr>
</tbody> </tbody>
</table> </table>
<div class="section"> <div class="section" id="libtorrent-plugins">
<h1><a id="libtorrent-plugins" name="libtorrent-plugins">libtorrent plugins</a></h1> <h1>libtorrent plugins</h1>
<div class="contents topic" id="table-of-contents"> <div class="contents topic" id="contents">
<p class="topic-title first"><a id="contents" name="contents">Contents</a></p> <p class="topic-title first">Contents</p>
<ul class="simple"> <ul class="simple">
<li><a class="reference" href="#libtorrent-plugins" id="id1" name="id1">libtorrent plugins</a><ul> <li><a class="reference internal" href="#libtorrent-plugins" id="id1">libtorrent plugins</a><ul>
<li><a class="reference" href="#a-word-of-caution" id="id2" name="id2">a word of caution</a></li> <li><a class="reference internal" href="#a-word-of-caution" id="id2">a word of caution</a></li>
</ul> </ul>
</li> </li>
<li><a class="reference" href="#plugin-interface" id="id3" name="id3">plugin interface</a></li> <li><a class="reference internal" href="#plugin-interface" id="id3">plugin interface</a></li>
<li><a class="reference" href="#torrent-plugin" id="id4" name="id4">torrent_plugin</a><ul> <li><a class="reference internal" href="#torrent-plugin" id="id4">torrent_plugin</a><ul>
<li><a class="reference" href="#new-connection" id="id5" name="id5">new_connection()</a></li> <li><a class="reference internal" href="#new-connection" id="id5">new_connection()</a></li>
<li><a class="reference" href="#on-piece-pass-on-piece-fail" id="id6" name="id6">on_piece_pass() on_piece_fail()</a></li> <li><a class="reference internal" href="#on-piece-pass-on-piece-fail" id="id6">on_piece_pass() on_piece_fail()</a></li>
<li><a class="reference" href="#tick" id="id7" name="id7">tick()</a></li> <li><a class="reference internal" href="#tick" id="id7">tick()</a></li>
<li><a class="reference" href="#on-pause-on-resume" id="id8" name="id8">on_pause() on_resume()</a></li> <li><a class="reference internal" href="#on-pause-on-resume" id="id8">on_pause() on_resume()</a></li>
<li><a class="reference" href="#on-files-checked" id="id9" name="id9">on_files_checked()</a></li> <li><a class="reference internal" href="#on-files-checked" id="id9">on_files_checked()</a></li>
</ul> </ul>
</li> </li>
<li><a class="reference" href="#peer-plugin" id="id10" name="id10">peer_plugin</a></li> <li><a class="reference internal" href="#peer-plugin" id="id10">peer_plugin</a></li>
<li><a class="reference internal" href="#disk-buffer-holder" id="id11">disk_buffer_holder</a></li>
</ul> </ul>
</div> </div>
<p>libtorrent has a plugin interface for implementing extensions to the protocol. <p>libtorrent has a plugin interface for implementing extensions to the protocol.
@ -47,13 +48,13 @@ to fit a particular (closed) network.</p>
<p>In short, the plugin interface makes it possible to:</p> <p>In short, the plugin interface makes it possible to:</p>
<ul class="simple"> <ul class="simple">
<li>register extension messages (sent in the extension handshake), see <li>register extension messages (sent in the extension handshake), see
<a class="reference" href="extension_protocol.html">extensions</a>.</li> <a class="reference external" href="extension_protocol.html">extensions</a>.</li>
<li>add data and parse data from the extension handshake.</li> <li>add data and parse data from the extension handshake.</li>
<li>send extension messages and standard bittorrent messages.</li> <li>send extension messages and standard bittorrent messages.</li>
<li>override or block the handling of standard bittorrent messages.</li> <li>override or block the handling of standard bittorrent messages.</li>
</ul> </ul>
<div class="section"> <div class="section" id="a-word-of-caution">
<h2><a id="a-word-of-caution" name="a-word-of-caution">a word of caution</a></h2> <h2>a word of caution</h2>
<p>Writing your own plugin is a very easy way to introduce serious bugs such as <p>Writing your own plugin is a very easy way to introduce serious bugs such as
dead locks and race conditions. Since a plugin has access to internal dead locks and race conditions. Since a plugin has access to internal
structures it is also quite easy to sabotage libtorrent's operation.</p> structures it is also quite easy to sabotage libtorrent's operation.</p>
@ -64,12 +65,12 @@ thread, you cannot use any of the member functions on the internal structures
in libtorrent, since those require the mutex to be locked. Futhermore, you would in libtorrent, since those require the mutex to be locked. Futhermore, you would
also need to have a mutex on your own shared data within the plugin, to make also need to have a mutex on your own shared data within the plugin, to make
sure it is not accessed at the same time from the libtorrent thread (through a sure it is not accessed at the same time from the libtorrent thread (through a
callback). See <a class="reference" href="http://www.boost.org/doc/html/mutex.html">boost thread's mutex</a>. If you need to send out a message from callback). See <a class="reference external" href="http://www.boost.org/doc/html/mutex.html">boost thread's mutex</a>. If you need to send out a message from
another thread, use an internal queue, and do the actual sending in <tt class="docutils literal"><span class="pre">tick()</span></tt>.</p> another thread, use an internal queue, and do the actual sending in <tt class="docutils literal"><span class="pre">tick()</span></tt>.</p>
</div> </div>
</div> </div>
<div class="section"> <div class="section" id="plugin-interface">
<h1><a id="plugin-interface" name="plugin-interface">plugin interface</a></h1> <h1>plugin interface</h1>
<p>The plugin interface consists of two base classes that the plugin may <p>The plugin interface consists of two base classes that the plugin may
implement. These are called <tt class="docutils literal"><span class="pre">torrent_plugin</span></tt> and <tt class="docutils literal"><span class="pre">peer_plugin</span></tt>. They are implement. These are called <tt class="docutils literal"><span class="pre">torrent_plugin</span></tt> and <tt class="docutils literal"><span class="pre">peer_plugin</span></tt>. They are
both found in the <tt class="docutils literal"><span class="pre">&lt;libtorrent/extensions.hpp&gt;</span></tt> header.</p> both found in the <tt class="docutils literal"><span class="pre">&lt;libtorrent/extensions.hpp&gt;</span></tt> header.</p>
@ -92,8 +93,8 @@ for this torrent. If it is a valid pointer (to a class inheriting
<tt class="docutils literal"><span class="pre">torrent_plugin</span></tt>), it will be associated with this torrent and callbacks <tt class="docutils literal"><span class="pre">torrent_plugin</span></tt>), it will be associated with this torrent and callbacks
will be made on torrent events.</p> will be made on torrent events.</p>
</div> </div>
<div class="section"> <div class="section" id="torrent-plugin">
<h1><a id="torrent-plugin" name="torrent-plugin">torrent_plugin</a></h1> <h1>torrent_plugin</h1>
<p>The synopsis for <tt class="docutils literal"><span class="pre">torrent_plugin</span></tt> follows:</p> <p>The synopsis for <tt class="docutils literal"><span class="pre">torrent_plugin</span></tt> follows:</p>
<pre class="literal-block"> <pre class="literal-block">
struct torrent_plugin struct torrent_plugin
@ -115,8 +116,8 @@ struct torrent_plugin
<p>This is the base class for a torrent_plugin. Your derived class is (if added <p>This is the base class for a torrent_plugin. Your derived class is (if added
as an extension) instantiated for each torrent in the session. The callback as an extension) instantiated for each torrent in the session. The callback
hook functions are defined as follows.</p> hook functions are defined as follows.</p>
<div class="section"> <div class="section" id="new-connection">
<h2><a id="new-connection" name="new-connection">new_connection()</a></h2> <h2>new_connection()</h2>
<pre class="literal-block"> <pre class="literal-block">
boost::shared_ptr&lt;peer_plugin&gt; new_connection(peer_connection*); boost::shared_ptr&lt;peer_plugin&gt; new_connection(peer_connection*);
</pre> </pre>
@ -133,8 +134,8 @@ held by the torrent object. So, it is generally a good idea to not keep a
use <tt class="docutils literal"><span class="pre">weak_ptr</span></tt>.</p> use <tt class="docutils literal"><span class="pre">weak_ptr</span></tt>.</p>
<p>If this function throws an exception, the connection will be closed.</p> <p>If this function throws an exception, the connection will be closed.</p>
</div> </div>
<div class="section"> <div class="section" id="on-piece-pass-on-piece-fail">
<h2><a id="on-piece-pass-on-piece-fail" name="on-piece-pass-on-piece-fail">on_piece_pass() on_piece_fail()</a></h2> <h2>on_piece_pass() on_piece_fail()</h2>
<pre class="literal-block"> <pre class="literal-block">
void on_piece_pass(int index); void on_piece_pass(int index);
void on_piece_failed(int index); void on_piece_failed(int index);
@ -144,16 +145,16 @@ check, respectively. The <tt class="docutils literal"><span class="pre">index</s
It is possible to access the list of peers that participated in sending the It is possible to access the list of peers that participated in sending the
piece through the <tt class="docutils literal"><span class="pre">torrent</span></tt> and the <tt class="docutils literal"><span class="pre">piece_picker</span></tt>.</p> piece through the <tt class="docutils literal"><span class="pre">torrent</span></tt> and the <tt class="docutils literal"><span class="pre">piece_picker</span></tt>.</p>
</div> </div>
<div class="section"> <div class="section" id="tick">
<h2><a id="tick" name="tick">tick()</a></h2> <h2>tick()</h2>
<pre class="literal-block"> <pre class="literal-block">
void tick(); void tick();
</pre> </pre>
<p>This hook is called approximately once per second. It is a way of making it <p>This hook is called approximately once per second. It is a way of making it
easy for plugins to do timed events, for sending messages or whatever.</p> easy for plugins to do timed events, for sending messages or whatever.</p>
</div> </div>
<div class="section"> <div class="section" id="on-pause-on-resume">
<h2><a id="on-pause-on-resume" name="on-pause-on-resume">on_pause() on_resume()</a></h2> <h2>on_pause() on_resume()</h2>
<pre class="literal-block"> <pre class="literal-block">
bool on_pause(); bool on_pause();
bool on_resume(); bool on_resume();
@ -169,8 +170,8 @@ handler it will recurse back into your handler, so in order to invoke the
standard handler, you have to keep your own state on whether you want standard standard handler, you have to keep your own state on whether you want standard
behavior or overridden behavior.</p> behavior or overridden behavior.</p>
</div> </div>
<div class="section"> <div class="section" id="on-files-checked">
<h2><a id="on-files-checked" name="on-files-checked">on_files_checked()</a></h2> <h2>on_files_checked()</h2>
<pre class="literal-block"> <pre class="literal-block">
void on_files_checked(); void on_files_checked();
</pre> </pre>
@ -180,8 +181,8 @@ checked. If there are no files to check, this function is called immediately.</p
can start downloading.</p> can start downloading.</p>
</div> </div>
</div> </div>
<div class="section"> <div class="section" id="peer-plugin">
<h1><a id="peer-plugin" name="peer-plugin">peer_plugin</a></h1> <h1>peer_plugin</h1>
<pre class="literal-block"> <pre class="literal-block">
struct peer_plugin struct peer_plugin
{ {
@ -201,7 +202,7 @@ struct peer_plugin
virtual bool on_have_none(); virtual bool on_have_none();
virtual bool on_allowed_fast(int index); virtual bool on_allowed_fast(int index);
virtual bool on_request(peer_request const&amp; req); virtual bool on_request(peer_request const&amp; req);
virtual bool on_piece(peer_request const&amp; piece, char const* data); virtual bool on_piece(peer_request const&amp; piece, disk_buffer_holder&amp; buffer);
virtual bool on_cancel(peer_request const&amp; req); virtual bool on_cancel(peer_request const&amp; req);
virtual bool on_reject(peer_request const&amp; req); virtual bool on_reject(peer_request const&amp; req);
virtual bool on_suggest(int index); virtual bool on_suggest(int index);
@ -218,6 +219,24 @@ struct peer_plugin
}; };
</pre> </pre>
</div> </div>
<div class="section" id="disk-buffer-holder">
<h1>disk_buffer_holder</h1>
<pre class="literal-block">
struct disk_buffer_holder
{
disk_buffer_holder(aux::session_impl&amp; s, char* b);
~disk_buffer_holder();
char* release();
char* buffer();
};
</pre>
<p>The disk buffer holder acts like a <tt class="docutils literal"><span class="pre">scoped_ptr</span></tt> that frees a disk buffer
when it's destructed, unless it's released. <tt class="docutils literal"><span class="pre">release</span></tt> returns the disk
buffer and transferres ownership and responsibility to free it to the caller.</p>
<p>A disk buffer is freed by passing it to <tt class="docutils literal"><span class="pre">session_impl::free_disk_buffer()</span></tt>.</p>
<p><tt class="docutils literal"><span class="pre">buffer()</span></tt> returns the pointer without transferring responsibility. If
this buffer has been released, <tt class="docutils literal"><span class="pre">buffer()</span></tt> will return 0.</p>
</div>
</div> </div>
</body> </body>
</html> </html>

View File

@ -200,7 +200,7 @@ peer_plugin
virtual bool on_have_none(); virtual bool on_have_none();
virtual bool on_allowed_fast(int index); virtual bool on_allowed_fast(int index);
virtual bool on_request(peer_request const& req); virtual bool on_request(peer_request const& req);
virtual bool on_piece(peer_request const& piece, char const* data); virtual bool on_piece(peer_request const& piece, disk_buffer_holder& buffer);
virtual bool on_cancel(peer_request const& req); virtual bool on_cancel(peer_request const& req);
virtual bool on_reject(peer_request const& req); virtual bool on_reject(peer_request const& req);
virtual bool on_suggest(int index); virtual bool on_suggest(int index);
@ -216,3 +216,25 @@ peer_plugin
virtual bool write_request(peer_request const& r); virtual bool write_request(peer_request const& r);
}; };
disk_buffer_holder
==================
::
struct disk_buffer_holder
{
disk_buffer_holder(aux::session_impl& s, char* b);
~disk_buffer_holder();
char* release();
char* buffer();
};
The disk buffer holder acts like a ``scoped_ptr`` that frees a disk buffer
when it's destructed, unless it's released. ``release`` returns the disk
buffer and transferres ownership and responsibility to free it to the caller.
A disk buffer is freed by passing it to ``session_impl::free_disk_buffer()``.
``buffer()`` returns the pointer without transferring responsibility. If
this buffer has been released, ``buffer()`` will return 0.

View File

@ -2429,6 +2429,9 @@ struct peer_info
int send_buffer_size; int send_buffer_size;
int used_send_buffer; int used_send_buffer;
int receive_buffer_size;
int used_receive_buffer;
int num_hashfails; int num_hashfails;
char country[2]; char country[2];
@ -2633,6 +2636,8 @@ receive. -1 means it's unlimited.</p>
to this peer and since any transfer occurred with this peer, respectively.</p> to this peer and since any transfer occurred with this peer, respectively.</p>
<p><tt class="docutils literal"><span class="pre">send_buffer_size</span></tt> and <tt class="docutils literal"><span class="pre">used_send_buffer</span></tt> is the number of bytes allocated <p><tt class="docutils literal"><span class="pre">send_buffer_size</span></tt> and <tt class="docutils literal"><span class="pre">used_send_buffer</span></tt> is the number of bytes allocated
and used for the peer's send buffer, respectively.</p> and used for the peer's send buffer, respectively.</p>
<p><tt class="docutils literal"><span class="pre">receive_buffer_size</span></tt> and <tt class="docutils literal"><span class="pre">used_receive_buffer</span></tt> are the number of bytes
allocated and used as receive buffer, respectively.</p>
<p><tt class="docutils literal"><span class="pre">num_hashfails</span></tt> is the number of pieces this peer has participated in <p><tt class="docutils literal"><span class="pre">num_hashfails</span></tt> is the number of pieces this peer has participated in
sending us that turned out to fail the hash check.</p> sending us that turned out to fail the hash check.</p>
<p><tt class="docutils literal"><span class="pre">country</span></tt> is the two letter <a class="reference external" href="http://www.iso.org/iso/en/prods-services/iso3166ma/02iso-3166-code-lists/list-en1.html">ISO 3166 country code</a> for the country the peer <p><tt class="docutils literal"><span class="pre">country</span></tt> is the two letter <a class="reference external" href="http://www.iso.org/iso/en/prods-services/iso3166ma/02iso-3166-code-lists/list-en1.html">ISO 3166 country code</a> for the country the peer

View File

@ -2423,6 +2423,9 @@ It contains the following fields::
int send_buffer_size; int send_buffer_size;
int used_send_buffer; int used_send_buffer;
int receive_buffer_size;
int used_receive_buffer;
int num_hashfails; int num_hashfails;
char country[2]; char country[2];
@ -2605,6 +2608,9 @@ to this peer and since any transfer occurred with this peer, respectively.
``send_buffer_size`` and ``used_send_buffer`` is the number of bytes allocated ``send_buffer_size`` and ``used_send_buffer`` is the number of bytes allocated
and used for the peer's send buffer, respectively. and used for the peer's send buffer, respectively.
``receive_buffer_size`` and ``used_receive_buffer`` are the number of bytes
allocated and used as receive buffer, respectively.
``num_hashfails`` is the number of pieces this peer has participated in ``num_hashfails`` is the number of pieces this peer has participated in
sending us that turned out to fail the hash check. sending us that turned out to fail the hash check.

View File

@ -348,7 +348,7 @@ void print_peer_info(std::ostream& out, std::vector<libtorrent::peer_info> const
#endif #endif
out << "down (total | peak ) up (total | peak ) sent-req recv flags source "; out << "down (total | peak ) up (total | peak ) sent-req recv flags source ";
if (print_fails) out << "fail hshf "; if (print_fails) out << "fail hshf ";
if (print_send_bufs) out << "sndb quota "; if (print_send_bufs) out << "sndb quota rcvb ";
if (print_timers) out << "inactive wait "; if (print_timers) out << "inactive wait ";
out << "disk rtt "; out << "disk rtt ";
if (print_block) out << "block-progress "; if (print_block) out << "block-progress ";
@ -427,7 +427,8 @@ void print_peer_info(std::ostream& out, std::vector<libtorrent::peer_info> const
if (print_send_bufs) if (print_send_bufs)
{ {
out << to_string(i->used_send_buffer, 6) << " ("<< add_suffix(i->send_buffer_size) << ") " out << to_string(i->used_send_buffer, 6) << " ("<< add_suffix(i->send_buffer_size) << ") "
<< to_string(i->send_quota, 5) << " "; << to_string(i->send_quota, 5) << " "
<< to_string(i->used_receive_buffer, 6) << " ("<< add_suffix(i->receive_buffer_size) << ") ";
} }
if (print_timers) if (print_timers)
{ {

View File

@ -10,6 +10,7 @@ libtorrent/buffer.hpp \
libtorrent/connection_queue.hpp \ libtorrent/connection_queue.hpp \
libtorrent/config.hpp \ libtorrent/config.hpp \
libtorrent/debug.hpp \ libtorrent/debug.hpp \
libtorrent/disk_buffer_holder.hpp \
libtorrent/disk_io_thread.hpp \ libtorrent/disk_io_thread.hpp \
libtorrent/entry.hpp \ libtorrent/entry.hpp \
libtorrent/enum_net.hpp \ libtorrent/enum_net.hpp \

View File

@ -334,6 +334,8 @@ namespace libtorrent
std::pair<char*, int> allocate_buffer(int size); std::pair<char*, int> allocate_buffer(int size);
void free_buffer(char* buf, int size); void free_buffer(char* buf, int size);
char* allocate_disk_buffer();
void free_disk_buffer(char* buf); void free_disk_buffer(char* buf);
void set_external_address(address const& ip); void set_external_address(address const& ip);

View File

@ -210,7 +210,7 @@ namespace libtorrent
void write_cancel(peer_request const& r); void write_cancel(peer_request const& r);
void write_bitfield(std::vector<bool> const& bitfield); void write_bitfield(std::vector<bool> const& bitfield);
void write_have(int index); void write_have(int index);
void write_piece(peer_request const& r, char* buffer); void write_piece(peer_request const& r, disk_buffer_holder& buffer);
void write_handshake(); void write_handshake();
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
void write_extensions(); void write_extensions();

View File

@ -44,6 +44,11 @@ class buffer
public: public:
struct interval struct interval
{ {
interval()
: begin(0)
, end(0)
{}
interval(char* begin, char* end) interval(char* begin, char* end)
: begin(begin) : begin(begin)
, end(end) , end(end)

View File

@ -0,0 +1,59 @@
/*
Copyright (c) 2008, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_DISK_BUFFER_HOLDER_HPP_INCLUDED
#define TORRENT_DISK_BUFFER_HOLDER_HPP_INCLUDED
#include "libtorrent/config.hpp"
namespace libtorrent
{
namespace aux { class session_impl; }
struct TORRENT_EXPORT disk_buffer_holder
{
disk_buffer_holder(aux::session_impl& ses, char* buf)
: m_ses(ses), m_buf(buf) {}
~disk_buffer_holder();
char* release();
char* buffer() const { return m_buf; }
private:
aux::session_impl& m_ses;
char* m_buf;
};
}
#endif

View File

@ -174,6 +174,10 @@ namespace libtorrent
void operator()(); void operator()();
#ifndef NDEBUG
bool is_disk_buffer(char* buffer) const;
#endif
char* allocate_buffer(); char* allocate_buffer();
void free_buffer(char* buf); void free_buffer(char* buf);

View File

@ -56,6 +56,7 @@ namespace libtorrent
struct peer_request; struct peer_request;
class peer_connection; class peer_connection;
class entry; class entry;
struct disk_buffer_holder;
struct TORRENT_EXPORT torrent_plugin struct TORRENT_EXPORT torrent_plugin
{ {
@ -143,7 +144,7 @@ namespace libtorrent
virtual bool on_request(peer_request const& req) virtual bool on_request(peer_request const& req)
{ return false; } { return false; }
virtual bool on_piece(peer_request const& piece, char const* data) virtual bool on_piece(peer_request const& piece, disk_buffer_holder& data)
{ return false; } { return false; }
virtual bool on_cancel(peer_request const& req) virtual bool on_cancel(peer_request const& req)

View File

@ -75,6 +75,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/intrusive_ptr_base.hpp" #include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/assert.hpp" #include "libtorrent/assert.hpp"
#include "libtorrent/chained_buffer.hpp" #include "libtorrent/chained_buffer.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
namespace libtorrent namespace libtorrent
{ {
@ -321,6 +322,7 @@ namespace libtorrent
void incoming_have(int piece_index); void incoming_have(int piece_index);
void incoming_bitfield(std::vector<bool> const& bitfield); void incoming_bitfield(std::vector<bool> const& bitfield);
void incoming_request(peer_request const& r); void incoming_request(peer_request const& r);
void incoming_piece(peer_request const& p, disk_buffer_holder& data);
void incoming_piece(peer_request const& p, char const* data); void incoming_piece(peer_request const& p, char const* data);
void incoming_piece_fragment(); void incoming_piece_fragment();
void incoming_cancel(peer_request const& r); void incoming_cancel(peer_request const& r);
@ -439,7 +441,7 @@ namespace libtorrent
virtual void write_cancel(peer_request const& r) = 0; virtual void write_cancel(peer_request const& r) = 0;
virtual void write_have(int index) = 0; virtual void write_have(int index) = 0;
virtual void write_keepalive() = 0; virtual void write_keepalive() = 0;
virtual void write_piece(peer_request const& r, char* buffer) = 0; virtual void write_piece(peer_request const& r, disk_buffer_holder& buffer) = 0;
virtual void write_reject_request(peer_request const& r) = 0; virtual void write_reject_request(peer_request const& r) = 0;
virtual void write_allow_fast(int piece) = 0; virtual void write_allow_fast(int piece) = 0;
@ -455,10 +457,14 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
buffer::interval wr_recv_buffer() buffer::interval wr_recv_buffer()
{ {
TORRENT_ASSERT(m_disk_recv_buffer == 0);
TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
if (m_recv_buffer.empty()) return buffer::interval(0,0); if (m_recv_buffer.empty()) return buffer::interval(0,0);
return buffer::interval(&m_recv_buffer[0] return buffer::interval(&m_recv_buffer[0]
, &m_recv_buffer[0] + m_recv_pos); , &m_recv_buffer[0] + m_recv_pos);
} }
std::pair<buffer::interval, buffer::interval> wr_recv_buffers(int bytes);
#endif #endif
buffer::const_interval receive_buffer() const buffer::const_interval receive_buffer() const
@ -468,8 +474,10 @@ namespace libtorrent
, &m_recv_buffer[0] + m_recv_pos); , &m_recv_buffer[0] + m_recv_pos);
} }
bool allocate_disk_receive_buffer(int disk_buffer_size);
char* release_disk_receive_buffer();
bool has_disk_receive_buffer() const { return m_disk_recv_buffer; }
void cut_receive_buffer(int size, int packet_size); void cut_receive_buffer(int size, int packet_size);
void reset_recv_buffer(int packet_size); void reset_recv_buffer(int packet_size);
void setup_receive(); void setup_receive();
@ -555,6 +563,13 @@ namespace libtorrent
int m_recv_pos; int m_recv_pos;
buffer m_recv_buffer; buffer m_recv_buffer;
// if this peer is receiving a piece, this
// points to a disk buffer that the data is
// read into. This eliminates a memcopy from
// the receive buffer into the disk buffer
int m_disk_recv_buffer_size;
char* m_disk_recv_buffer;
chained_buffer m_send_buffer; chained_buffer m_send_buffer;
// the number of bytes we are currently reading // the number of bytes we are currently reading

View File

@ -111,6 +111,9 @@ namespace libtorrent
// the number bytes that's actually used of the send buffer // the number bytes that's actually used of the send buffer
int used_send_buffer; int used_send_buffer;
int receive_buffer_size;
int used_receive_buffer;
// the number of failed hashes for this peer // the number of failed hashes for this peer
int num_hashfails; int num_hashfails;

View File

@ -71,6 +71,7 @@ namespace libtorrent
class session; class session;
struct file_pool; struct file_pool;
struct disk_io_job; struct disk_io_job;
struct disk_buffer_holder;
enum storage_mode_t enum storage_mode_t
{ {
@ -216,7 +217,7 @@ namespace libtorrent
void async_write( void async_write(
peer_request const& r peer_request const& r
, char const* buffer , disk_buffer_holder& buffer
, boost::function<void(int, disk_io_job const&)> const& f); , boost::function<void(int, disk_io_job const&)> const& f);
void async_hash(int piece, boost::function<void(int, disk_io_job const&)> const& f); void async_hash(int piece, boost::function<void(int, disk_io_job const&)> const& f);

View File

@ -122,7 +122,7 @@ namespace libtorrent
void write_cancel(peer_request const& r) void write_cancel(peer_request const& r)
{ incoming_reject_request(r); } { incoming_reject_request(r); }
void write_have(int index) {} void write_have(int index) {}
void write_piece(peer_request const& r, char* buffer) { TORRENT_ASSERT(false); } void write_piece(peer_request const& r, disk_buffer_holder& buffer) { TORRENT_ASSERT(false); }
void write_keepalive() {} void write_keepalive() {}
void on_connected(); void on_connected();
void write_reject_request(peer_request const&) {} void write_reject_request(peer_request const&) {}

View File

@ -23,7 +23,7 @@ alert.cpp identify_client.cpp ip_filter.cpp file.cpp metadata_transfer.cpp \
logger.cpp file_pool.cpp ut_pex.cpp lsd.cpp upnp.cpp instantiate_connection.cpp \ logger.cpp file_pool.cpp ut_pex.cpp lsd.cpp upnp.cpp instantiate_connection.cpp \
socks5_stream.cpp socks4_stream.cpp http_stream.cpp connection_queue.cpp \ socks5_stream.cpp socks4_stream.cpp http_stream.cpp connection_queue.cpp \
disk_io_thread.cpp ut_metadata.cpp magnet_uri.cpp udp_socket.cpp smart_ban.cpp \ disk_io_thread.cpp ut_metadata.cpp magnet_uri.cpp udp_socket.cpp smart_ban.cpp \
http_parser.cpp gzip.cpp $(kademlia_sources) http_parser.cpp gzip.cpp disk_buffer_holder.cpp $(kademlia_sources)
noinst_HEADERS = \ noinst_HEADERS = \
$(top_srcdir)/include/libtorrent/alert.hpp \ $(top_srcdir)/include/libtorrent/alert.hpp \

View File

@ -993,6 +993,14 @@ namespace libtorrent
buffer::const_interval recv_buffer = receive_buffer(); buffer::const_interval recv_buffer = receive_buffer();
int recv_pos = recv_buffer.end - recv_buffer.begin; int recv_pos = recv_buffer.end - recv_buffer.begin;
if (recv_pos == 1)
{
TORRENT_ASSERT(!has_disk_receive_buffer());
if (!allocate_disk_receive_buffer(packet_size() - 9))
return;
}
TORRENT_ASSERT(has_disk_receive_buffer());
// classify the received data as protocol chatter // classify the received data as protocol chatter
// or data payload for the statistics // or data payload for the statistics
if (recv_pos <= 9) if (recv_pos <= 9)
@ -1021,7 +1029,8 @@ namespace libtorrent
p.start = detail::read_int32(ptr); p.start = detail::read_int32(ptr);
p.length = packet_size() - 9; p.length = packet_size() - 9;
incoming_piece(p, recv_buffer.begin + 9); disk_buffer_holder holder(m_ses, release_disk_receive_buffer());
incoming_piece(p, holder);
} }
// ----------------------------- // -----------------------------
@ -1322,6 +1331,7 @@ namespace libtorrent
buffer::const_interval recv_buffer = receive_buffer(); buffer::const_interval recv_buffer = receive_buffer();
TORRENT_ASSERT(recv_buffer.left() >= 1);
int packet_type = recv_buffer[0]; int packet_type = recv_buffer[0];
if (packet_type < 0 if (packet_type < 0
|| packet_type >= num_supported_messages || packet_type >= num_supported_messages
@ -1643,7 +1653,7 @@ namespace libtorrent
send_buffer(msg, sizeof(msg)); send_buffer(msg, sizeof(msg));
} }
void bt_peer_connection::write_piece(peer_request const& r, char* buffer) void bt_peer_connection::write_piece(peer_request const& r, disk_buffer_holder& buffer)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -1661,9 +1671,10 @@ namespace libtorrent
detail::write_int32(r.start, ptr); detail::write_int32(r.start, ptr);
send_buffer(msg, sizeof(msg)); send_buffer(msg, sizeof(msg));
append_send_buffer(buffer, r.length append_send_buffer(buffer.buffer(), r.length
, boost::bind(&session_impl::free_disk_buffer , boost::bind(&session_impl::free_disk_buffer
, boost::ref(m_ses), _1)); , boost::ref(m_ses), _1));
buffer.release();
m_payloads.push_back(range(send_buffer_size() - r.length, r.length)); m_payloads.push_back(range(send_buffer_size() - r.length, r.length));
setup_send(); setup_send();
@ -1711,8 +1722,9 @@ namespace libtorrent
TORRENT_ASSERT(in_handshake() || !m_rc4_encrypted || m_encrypted); TORRENT_ASSERT(in_handshake() || !m_rc4_encrypted || m_encrypted);
if (m_rc4_encrypted && m_encrypted) if (m_rc4_encrypted && m_encrypted)
{ {
buffer::interval wr_buf = wr_recv_buffer(); std::pair<buffer::interval, buffer::interval> wr_buf = wr_recv_buffers(bytes_transferred);
m_RC4_handler->decrypt((wr_buf.end - bytes_transferred), bytes_transferred); m_RC4_handler->decrypt(wr_buf.first.begin, wr_buf.first.left());
if (wr_buf.second.left()) m_RC4_handler->decrypt(wr_buf.second.begin, wr_buf.second.left());
} }
#endif #endif
@ -1724,10 +1736,10 @@ namespace libtorrent
// for outgoing // for outgoing
if (m_state == read_pe_dhkey) if (m_state == read_pe_dhkey)
{ {
assert (!m_encrypted); TORRENT_ASSERT(!m_encrypted);
assert (!m_rc4_encrypted); TORRENT_ASSERT(!m_rc4_encrypted);
assert (packet_size() == dh_key_len); TORRENT_ASSERT(packet_size() == dh_key_len);
assert (recv_buffer == receive_buffer()); TORRENT_ASSERT(recv_buffer == receive_buffer());
if (!packet_finished()) return; if (!packet_finished()) return;
@ -2232,7 +2244,7 @@ namespace libtorrent
if (m_state == read_protocol_identifier) if (m_state == read_protocol_identifier)
{ {
assert (packet_size() == 20); TORRENT_ASSERT(packet_size() == 20);
if (!packet_finished()) return; if (!packet_finished()) return;
recv_buffer = receive_buffer(); recv_buffer = receive_buffer();
@ -2263,14 +2275,14 @@ namespace libtorrent
return; return;
} }
assert ((!is_local() && m_encrypted) || is_local()); TORRENT_ASSERT((!is_local() && m_encrypted) || is_local());
#endif // #ifndef TORRENT_DISABLE_ENCRYPTION #endif // #ifndef TORRENT_DISABLE_ENCRYPTION
disconnect("incorrect protocol identifier"); disconnect("incorrect protocol identifier");
return; return;
} }
#ifndef TORRENT_DISABLE_ENCRYPTION #ifndef TORRENT_DISABLE_ENCRYPTION
assert (m_state != read_pe_dhkey); TORRENT_ASSERT(m_state != read_pe_dhkey);
if (!is_local() && if (!is_local() &&
(m_ses.get_pe_settings().in_enc_policy == pe_settings::forced) && (m_ses.get_pe_settings().in_enc_policy == pe_settings::forced) &&
@ -2494,7 +2506,7 @@ namespace libtorrent
#endif #endif
m_state = read_packet_size; m_state = read_packet_size;
reset_recv_buffer(4); reset_recv_buffer(5);
if (t->valid_metadata()) if (t->valid_metadata())
{ {
write_bitfield(t->pieces()); write_bitfield(t->pieces());
@ -2512,11 +2524,12 @@ namespace libtorrent
if (m_state == read_packet_size) if (m_state == read_packet_size)
{ {
// Make sure this is not fallen though into // Make sure this is not fallen though into
assert (recv_buffer == receive_buffer()); TORRENT_ASSERT(recv_buffer == receive_buffer());
if (!t) return; if (!t) return;
m_statistics.received_bytes(0, bytes_transferred); m_statistics.received_bytes(0, bytes_transferred);
if (!packet_finished()) return;
if (recv_buffer.left() < 4) return;
const char* ptr = recv_buffer.begin; const char* ptr = recv_buffer.begin;
int packet_size = detail::read_int32(ptr); int packet_size = detail::read_int32(ptr);
@ -2536,15 +2549,19 @@ namespace libtorrent
incoming_keepalive(); incoming_keepalive();
// keepalive message // keepalive message
m_state = read_packet_size; m_state = read_packet_size;
reset_recv_buffer(4); cut_receive_buffer(4, 4);
return;
} }
else else
{ {
if (recv_buffer.left() < 5) return;
m_state = read_packet; m_state = read_packet;
reset_recv_buffer(packet_size); cut_receive_buffer(4, packet_size);
bytes_transferred = 1;
recv_buffer = receive_buffer();
TORRENT_ASSERT(recv_buffer.left() == 1);
} }
TORRENT_ASSERT(!packet_finished());
return;
} }
if (m_state == read_packet) if (m_state == read_packet)
@ -2554,7 +2571,7 @@ namespace libtorrent
if (dispatch_message(bytes_transferred)) if (dispatch_message(bytes_transferred))
{ {
m_state = read_packet_size; m_state = read_packet_size;
reset_recv_buffer(4); reset_recv_buffer(5);
} }
TORRENT_ASSERT(!packet_finished()); TORRENT_ASSERT(!packet_finished());
return; return;

View File

@ -0,0 +1,50 @@
/*
Copyright (c) 2008, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/aux_/session_impl.hpp"
namespace libtorrent
{
char* disk_buffer_holder::release()
{
char* ret = m_buf;
m_buf = 0;
return ret;
}
disk_buffer_holder::~disk_buffer_holder()
{
if (m_buf) m_ses.free_disk_buffer(m_buf);
}
}

View File

@ -678,6 +678,18 @@ namespace libtorrent
m_signal.notify_all(); m_signal.notify_all();
} }
#ifndef NDEBUG
bool disk_io_thread::is_disk_buffer(char* buffer) const
{
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return true;
#else
mutex_t::scoped_lock l(m_mutex);
return m_pool.is_from(buffer);
#endif
}
#endif
char* disk_io_thread::allocate_buffer() char* disk_io_thread::allocate_buffer()
{ {
mutex_t::scoped_lock l(m_mutex); mutex_t::scoped_lock l(m_mutex);

View File

@ -59,7 +59,6 @@ using libtorrent::aux::session_impl;
namespace libtorrent namespace libtorrent
{ {
// outbound connection // outbound connection
peer_connection::peer_connection( peer_connection::peer_connection(
session_impl& ses session_impl& ses
@ -81,6 +80,8 @@ namespace libtorrent
, m_last_unchoke(min_time()) , m_last_unchoke(min_time())
, m_packet_size(0) , m_packet_size(0)
, m_recv_pos(0) , m_recv_pos(0)
, m_disk_recv_buffer_size(0)
, m_disk_recv_buffer(0)
, m_reading_bytes(0) , m_reading_bytes(0)
, m_last_receive(time_now()) , m_last_receive(time_now())
, m_last_sent(time_now()) , m_last_sent(time_now())
@ -167,6 +168,8 @@ namespace libtorrent
, m_last_unchoke(min_time()) , m_last_unchoke(min_time())
, m_packet_size(0) , m_packet_size(0)
, m_recv_pos(0) , m_recv_pos(0)
, m_disk_recv_buffer_size(0)
, m_disk_recv_buffer(0)
, m_reading_bytes(0) , m_reading_bytes(0)
, m_last_receive(time_now()) , m_last_receive(time_now())
, m_last_sent(time_now()) , m_last_sent(time_now())
@ -451,6 +454,13 @@ namespace libtorrent
TORRENT_ASSERT(!m_in_constructor); TORRENT_ASSERT(!m_in_constructor);
TORRENT_ASSERT(m_disconnecting); TORRENT_ASSERT(m_disconnecting);
if (m_disk_recv_buffer)
{
m_ses.free_disk_buffer(m_disk_recv_buffer);
m_disk_recv_buffer = 0;
m_disk_recv_buffer_size = 0;
}
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
if (m_logger) if (m_logger)
{ {
@ -1368,12 +1378,27 @@ namespace libtorrent
// ----------------------------- // -----------------------------
void peer_connection::incoming_piece(peer_request const& p, char const* data) void peer_connection::incoming_piece(peer_request const& p, char const* data)
{
char* buffer = m_ses.allocate_disk_buffer();
if (buffer == 0)
{
disconnect("out of memory");
return;
}
disk_buffer_holder holder(m_ses, buffer);
incoming_piece(p, holder);
}
void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT(t); TORRENT_ASSERT(t);
TORRENT_ASSERT(m_disk_recv_buffer == 0);
TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin() for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i) , end(m_extensions.end()); i != end; ++i)
@ -2079,7 +2104,6 @@ namespace libtorrent
m_last_piece = time_now(); m_last_piece = time_now();
} }
void peer_connection::timed_out() void peer_connection::timed_out()
{ {
TORRENT_ASSERT(m_connecting); TORRENT_ASSERT(m_connecting);
@ -2301,10 +2325,51 @@ namespace libtorrent
p.remote_dl_rate = m_remote_dl_rate; p.remote_dl_rate = m_remote_dl_rate;
p.send_buffer_size = m_send_buffer.capacity(); p.send_buffer_size = m_send_buffer.capacity();
p.used_send_buffer = m_send_buffer.size(); p.used_send_buffer = m_send_buffer.size();
p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size;
p.used_receive_buffer = m_recv_pos;
p.write_state = m_channel_state[upload_channel]; p.write_state = m_channel_state[upload_channel];
p.read_state = m_channel_state[download_channel]; p.read_state = m_channel_state[download_channel];
} }
// allocates a disk buffer of size 'disk_buffer_size' and replaces the
// end of the current receive buffer with it. i.e. the receive pos
// must be <= packet_size - disk_buffer_size
// the disk buffer can be accessed through release_disk_receive_buffer()
// when it is queried, the responsibility to free it is transferred
// to the caller
bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
{
INVARIANT_CHECK;
TORRENT_ASSERT(m_packet_size > 0);
TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size);
TORRENT_ASSERT(m_disk_recv_buffer == 0);
TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
if (disk_buffer_size > 16 * 1024)
{
disconnect("invalid piece size");
return false;
}
m_disk_recv_buffer = m_ses.allocate_disk_buffer();
if (m_disk_recv_buffer == 0)
{
disconnect("out of memory");
return false;
}
m_disk_recv_buffer_size = disk_buffer_size;
return true;
}
char* peer_connection::release_disk_receive_buffer()
{
char* ret = m_disk_recv_buffer;
m_disk_recv_buffer = 0;
m_disk_recv_buffer_size = 0;
return ret;
}
void peer_connection::cut_receive_buffer(int size, int packet_size) void peer_connection::cut_receive_buffer(int size, int packet_size)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -2324,7 +2389,6 @@ namespace libtorrent
#endif #endif
m_packet_size = packet_size; m_packet_size = packet_size;
if (m_packet_size >= m_recv_pos) m_recv_buffer.resize(m_packet_size);
} }
void peer_connection::second_tick(float tick_interval) void peer_connection::second_tick(float tick_interval)
@ -2542,9 +2606,10 @@ namespace libtorrent
m_reading_bytes -= r.length; m_reading_bytes -= r.length;
disk_buffer_holder buffer(m_ses, j.buffer);
if (ret != r.length || m_torrent.expired()) if (ret != r.length || m_torrent.expired())
{ {
if (j.buffer) m_ses.free_disk_buffer(j.buffer);
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t) if (!t)
{ {
@ -2572,7 +2637,7 @@ namespace libtorrent
<< " | l: " << r.length << " ]\n"; << " | l: " << r.length << " ]\n";
#endif #endif
write_piece(r, j.buffer); write_piece(r, buffer);
setup_send(); setup_send();
} }
@ -2735,11 +2800,84 @@ namespace libtorrent
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n"; (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
#endif #endif
m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
, max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2)); int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
if (int(m_recv_buffer.size()) < regular_buffer_size)
m_recv_buffer.resize(regular_buffer_size);
if (m_disk_recv_buffer == 0 || regular_buffer_size >= m_recv_pos + max_receive)
{
// only receive into regular buffer
TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
, max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
}
else if (m_recv_pos >= regular_buffer_size)
{
// only receive into disk buffer
TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
m_socket->async_read_some(asio::buffer(m_disk_recv_buffer + m_recv_pos - regular_buffer_size
, max_receive)
, bind(&peer_connection::on_receive_data, self(), _1, _2));
}
else
{
// receive into both regular and disk buffer
TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
TORRENT_ASSERT(max_receive - regular_buffer_size
+ m_recv_pos <= m_disk_recv_buffer_size);
boost::array<asio::mutable_buffer, 2> vec;
vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
, regular_buffer_size - m_recv_pos);
vec[1] = asio::buffer(m_disk_recv_buffer
, max_receive - regular_buffer_size + m_recv_pos);
m_socket->async_read_some(vec, bind(&peer_connection::on_receive_data
, self(), _1, _2));
}
m_channel_state[download_channel] = peer_info::bw_network; m_channel_state[download_channel] = peer_info::bw_network;
} }
#ifndef TORRENT_DISABLE_ENCRYPTION
// returns the last 'bytes' from the receive buffer
std::pair<buffer::interval, buffer::interval> peer_connection::wr_recv_buffers(int bytes)
{
TORRENT_ASSERT(bytes <= m_recv_pos);
std::pair<buffer::interval, buffer::interval> vec;
int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
TORRENT_ASSERT(regular_buffer_size >= 0);
if (m_disk_recv_buffer == 0 || regular_buffer_size >= m_recv_pos)
{
vec.first = buffer::interval(&m_recv_buffer[0]
+ m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_pos);
vec.second = buffer::interval(0,0);
}
else if (m_recv_pos - bytes >= regular_buffer_size)
{
vec.first = buffer::interval(m_disk_recv_buffer + m_recv_pos
- regular_buffer_size - bytes, m_disk_recv_buffer + m_recv_pos
- regular_buffer_size);
vec.second = buffer::interval(0,0);
}
else
{
TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size);
TORRENT_ASSERT(m_recv_pos > regular_buffer_size);
vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_pos - bytes
, &m_recv_buffer[0] + regular_buffer_size);
vec.second = buffer::interval(m_disk_recv_buffer
, m_disk_recv_buffer + m_recv_pos - regular_buffer_size);
}
TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes);
return vec;
}
#endif
void peer_connection::reset_recv_buffer(int packet_size) void peer_connection::reset_recv_buffer(int packet_size)
{ {
TORRENT_ASSERT(packet_size > 0); TORRENT_ASSERT(packet_size > 0);
@ -2750,8 +2888,6 @@ namespace libtorrent
} }
m_recv_pos = 0; m_recv_pos = 0;
m_packet_size = packet_size; m_packet_size = packet_size;
if (int(m_recv_buffer.size()) < m_packet_size)
m_recv_buffer.resize(m_packet_size);
} }
void peer_connection::send_buffer(char const* buf, int size) void peer_connection::send_buffer(char const* buf, int size)
@ -2861,6 +2997,7 @@ namespace libtorrent
return; return;
} }
int max_receive = 0;
do do
{ {
#ifdef TORRENT_VERBOSE_LOGGING #ifdef TORRENT_VERBOSE_LOGGING
@ -2877,7 +3014,8 @@ namespace libtorrent
m_last_receive = time_now(); m_last_receive = time_now();
m_recv_pos += bytes_transferred; m_recv_pos += bytes_transferred;
TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size())); TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()
+ m_disk_recv_buffer_size));
on_receive(error, bytes_transferred); on_receive(error, bytes_transferred);
@ -2890,21 +3028,57 @@ namespace libtorrent
buffer(m_packet_size).swap(m_recv_buffer); buffer(m_packet_size).swap(m_recv_buffer);
} }
int max_receive = m_packet_size - m_recv_pos; max_receive = m_packet_size - m_recv_pos;
int quota_left = m_bandwidth_limit[download_channel].quota_left(); int quota_left = m_bandwidth_limit[download_channel].quota_left();
if (!m_ignore_bandwidth_limits && max_receive > quota_left) if (!m_ignore_bandwidth_limits && max_receive > quota_left)
max_receive = quota_left; max_receive = quota_left;
if (max_receive == 0) break; if (max_receive == 0) break;
int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
if (int(m_recv_buffer.size()) < regular_buffer_size)
m_recv_buffer.resize(regular_buffer_size);
asio::error_code ec; asio::error_code ec;
bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos] if (m_disk_recv_buffer == 0 || regular_buffer_size >= m_recv_pos + max_receive)
, max_receive), ec); {
// only receive into regular buffer
TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
, max_receive), ec);
}
else if (m_recv_pos >= regular_buffer_size)
{
// only receive into disk buffer
TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer
+ m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
- m_recv_pos, max_receive)), ec);
}
else
{
// receive into both regular and disk buffer
TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
TORRENT_ASSERT(max_receive - regular_buffer_size
+ m_recv_pos <= m_disk_recv_buffer_size);
boost::array<asio::mutable_buffer, 2> vec;
vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
, regular_buffer_size - m_recv_pos);
vec[1] = asio::buffer(m_disk_recv_buffer
, (std::min)(m_disk_recv_buffer_size
, max_receive - regular_buffer_size + m_recv_pos));
bytes_transferred = m_socket->read_some(vec, ec);
}
if (ec && ec != asio::error::would_block) if (ec && ec != asio::error::would_block)
{ {
disconnect(ec.message().c_str()); disconnect(ec.message().c_str());
return; return;
} }
if (ec == asio::error::would_block) break;
} }
while (bytes_transferred > 0); while (bytes_transferred > 0);
@ -3109,6 +3283,8 @@ namespace libtorrent
#ifndef NDEBUG #ifndef NDEBUG
void peer_connection::check_invariant() const void peer_connection::check_invariant() const
{ {
TORRENT_ASSERT((m_disk_recv_buffer != 0) == (m_disk_recv_buffer_size > 0));
boost::shared_ptr<torrent> t = m_torrent.lock(); boost::shared_ptr<torrent> t = m_torrent.lock();
if (m_disconnecting) if (m_disconnecting)
{ {

View File

@ -2215,6 +2215,11 @@ namespace aux {
{ {
m_disk_thread.free_buffer(buf); m_disk_thread.free_buffer(buf);
} }
char* session_impl::allocate_disk_buffer()
{
return m_disk_thread.allocate_buffer();
}
std::pair<char*, int> session_impl::allocate_buffer(int size) std::pair<char*, int> session_impl::allocate_buffer(int size)
{ {

View File

@ -67,6 +67,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/invariant_check.hpp" #include "libtorrent/invariant_check.hpp"
#include "libtorrent/file_pool.hpp" #include "libtorrent/file_pool.hpp"
#include "libtorrent/aux_/session_impl.hpp" #include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#ifndef NDEBUG #ifndef NDEBUG
#include <ios> #include <ios>
@ -1243,10 +1244,12 @@ namespace libtorrent
void piece_manager::async_write( void piece_manager::async_write(
peer_request const& r peer_request const& r
, char const* buffer , disk_buffer_holder& buffer
, boost::function<void(int, disk_io_job const&)> const& handler) , boost::function<void(int, disk_io_job const&)> const& handler)
{ {
TORRENT_ASSERT(r.length <= 16 * 1024); TORRENT_ASSERT(r.length <= 16 * 1024);
// the buffer needs to be allocated through the io_thread
TORRENT_ASSERT(m_io_thread.is_disk_buffer(buffer.buffer()));
disk_io_job j; disk_io_job j;
j.storage = this; j.storage = this;
@ -1254,13 +1257,9 @@ namespace libtorrent
j.piece = r.piece; j.piece = r.piece;
j.offset = r.start; j.offset = r.start;
j.buffer_size = r.length; j.buffer_size = r.length;
j.buffer = m_io_thread.allocate_buffer(); j.buffer = buffer.buffer();
#ifndef BOOST_NO_EXCEPTIONS
if (j.buffer == 0) throw file_error("out of memory");
// TODO: return error code instead of throwing
#endif
std::memcpy(j.buffer, buffer, j.buffer_size);
m_io_thread.add_job(j, handler); m_io_thread.add_job(j, handler);
buffer.release();
} }
void piece_manager::async_hash(int piece void piece_manager::async_hash(int piece