simplify the buffer_allocator_interface

This commit is contained in:
arvidn 2016-12-25 23:38:05 -08:00 committed by Arvid Norberg
parent cda715c152
commit 192ef4962b
9 changed files with 59 additions and 94 deletions

View File

@ -622,10 +622,6 @@ namespace libtorrent
// implements buffer_allocator_interface
void free_disk_buffer(char* buf) override;
disk_buffer_holder allocate_disk_buffer(char const* category) override;
disk_buffer_holder allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o
, char const* category) override;
void reclaim_blocks(span<block_cache_reference> refs) override;
void do_reclaim_blocks();

View File

@ -50,10 +50,6 @@ namespace libtorrent
{
virtual void free_disk_buffer(char* b) = 0;
virtual void reclaim_blocks(span<aux::block_cache_reference> refs) = 0;
virtual disk_buffer_holder allocate_disk_buffer(char const* category) = 0;
virtual disk_buffer_holder allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o
, char const* category) = 0;
protected:
~buffer_allocator_interface() {}
};

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/bdecode.hpp"
#include <string>
#include <memory>
#include "libtorrent/units.hpp"
#include "libtorrent/aux_/vector.hpp"
@ -77,8 +78,8 @@ namespace libtorrent
virtual void async_read(storage_interface* storage, peer_request const& r
, std::function<void(aux::block_cache_reference ref, char* block
, int flags, storage_error const& se)> handler, void* requester, std::uint8_t flags = 0) = 0;
virtual void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
virtual bool async_write(storage_interface* storage, peer_request const& r
, char const* buf, std::shared_ptr<disk_observer> o
, std::function<void(storage_error const&)> handler
, std::uint8_t flags = 0) = 0;
virtual void async_hash(storage_interface* storage, piece_index_t piece, std::uint8_t flags

View File

@ -294,8 +294,8 @@ namespace libtorrent
void async_read(storage_interface* storage, peer_request const& r
, std::function<void(aux::block_cache_reference ref, char* block
, int flags, storage_error const& se)> handler, void* requester, std::uint8_t flags = 0) override;
void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
bool async_write(storage_interface* storage, peer_request const& r
, char const* buf, std::shared_ptr<disk_observer> o
, std::function<void(storage_error const&)> handler
, std::uint8_t flags = 0) override;
void async_hash(storage_interface* storage, piece_index_t piece, std::uint8_t flags
@ -332,16 +332,7 @@ namespace libtorrent
// implements buffer_allocator_interface
void reclaim_blocks(span<aux::block_cache_reference> ref) override;
void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); }
disk_buffer_holder allocate_disk_buffer(char const* category) override
{
bool exceed = false;
return allocate_disk_buffer(exceed, std::shared_ptr<disk_observer>(), category);
}
void trigger_cache_trim();
disk_buffer_holder allocate_disk_buffer(bool& exceeded, std::shared_ptr<disk_observer> o
, char const* category) override;
void update_stats_counters(counters& c) const override;
void get_cache_info(cache_status* ret, bool no_pieces = true
, storage_interface const* storage = 0) const override;

View File

@ -550,7 +550,6 @@ namespace libtorrent
void incoming_dont_have(piece_index_t piece_index);
void incoming_bitfield(typed_bitfield<piece_index_t> const& bits);
void incoming_request(peer_request const& r);
void incoming_piece(peer_request const& p, disk_buffer_holder data);
void incoming_piece(peer_request const& p, char const* data);
void incoming_piece_fragment(int bytes);
void start_receive_piece(peer_request const& r);

View File

@ -1625,14 +1625,19 @@ namespace libtorrent
return 1;
}
void disk_io_thread::async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer
bool disk_io_thread::async_write(storage_interface* storage, peer_request const& r
, char const* buf, std::shared_ptr<disk_observer> o
, std::function<void(storage_error const&)> handler
, std::uint8_t const flags)
{
TORRENT_ASSERT(r.length <= m_disk_cache.block_size());
TORRENT_ASSERT(r.length <= 16 * 1024);
bool exceeded = false;
disk_buffer_holder buffer(*this, m_disk_cache.allocate_buffer(exceeded, o, "receive buffer"));
if (!buffer) throw std::bad_alloc();
std::memcpy(buffer.get(), buf, r.length);
disk_io_job* j = allocate_job(disk_io_job::write);
j->storage = storage->shared_from_this();
j->piece = r.piece;
@ -1690,7 +1695,7 @@ namespace libtorrent
// make the holder give up ownership of the buffer
// since the job was successfully queued up
buffer.release();
return;
return exceeded;
}
std::unique_lock<std::mutex> l(m_cache_mutex);
@ -1720,12 +1725,13 @@ namespace libtorrent
// if we added the block (regardless of whether we also
// issued a flush job or not), we're done.
return;
return exceeded;
}
l.unlock();
add_job(j);
buffer.release();
return exceeded;
}
void disk_io_thread::async_hash(storage_interface* storage
@ -3176,14 +3182,6 @@ namespace libtorrent
submit_jobs();
}
disk_buffer_holder disk_io_thread::allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o
, char const* category)
{
char* ret = m_disk_cache.allocate_buffer(exceeded, o, category);
return disk_buffer_holder(*this, ret);
}
void disk_io_thread::add_completed_jobs(jobqueue_t& jobs)
{
jobqueue_t new_completed_jobs;

View File

@ -2588,39 +2588,6 @@ namespace libtorrent
// -----------------------------
void peer_connection::incoming_piece(peer_request const& p, char const* data)
{
TORRENT_ASSERT(is_single_thread());
bool exceeded = false;
disk_buffer_holder buffer
= m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer");
if (!buffer)
{
disconnect(errors::no_memory, op_alloc_recvbuf);
return;
}
// every peer is entitled to have two disk blocks allocated at any given
// time, regardless of whether the cache size is exceeded or not. If this
// was not the case, when the cache size setting is very small, most peers
// would be blocked most of the time, because the disk cache would
// continuously be in exceeded state. Only rarely would it actually drop
// down to 0 and unblock all peers.
if (exceeded && m_outstanding_writing_bytes > 0)
{
if ((m_channel_state[download_channel] & peer_info::bw_disk) == 0)
m_counters.inc_stats_counter(counters::num_peers_down_disk);
m_channel_state[download_channel] |= peer_info::bw_disk;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark");
#endif
}
std::memcpy(buffer.get(), data, p.length);
incoming_piece(p, std::move(buffer));
}
void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder data)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
@ -2636,7 +2603,7 @@ namespace libtorrent
if (m_remote.address().is_v4()
&& (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
{
data.get()[0] = ~data.get()[0];
data[0] = ~data[0];
}
#endif
@ -2654,7 +2621,7 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_EXTENSIONS
for (auto const& e : m_extensions)
{
if (e->on_piece(p, {data.get(), size_t(p.length)}))
if (e->on_piece(p, {data, size_t(p.length)}))
{
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_received_in_piece == p.length);
@ -2828,10 +2795,26 @@ namespace libtorrent
if (t->is_deleted()) return;
m_disk_thread.async_write(&t->storage(), p, std::move(data)
bool const exceeded = m_disk_thread.async_write(&t->storage(), p, data, self()
, std::bind(&peer_connection::on_disk_write_complete
, self(), _1, p, t));
// every peer is entitled to have two disk blocks allocated at any given
// time, regardless of whether the cache size is exceeded or not. If this
// was not the case, when the cache size setting is very small, most peers
// would be blocked most of the time, because the disk cache would
// continuously be in exceeded state. Only rarely would it actually drop
// down to 0 and unblock all peers.
if (exceeded && m_outstanding_writing_bytes > 0)
{
if ((m_channel_state[download_channel] & peer_info::bw_disk) == 0)
m_counters.inc_stats_counter(counters::num_peers_down_disk);
m_channel_state[download_channel] |= peer_info::bw_disk;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark");
#endif
}
std::int64_t const write_queue_size = m_counters.inc_stats_counter(
counters::queued_write_bytes, p.length);
m_outstanding_writing_bytes += p.length;

View File

@ -6683,23 +6683,11 @@ namespace aux {
m_blocks_to_reclaim.clear();
}
disk_buffer_holder session_impl::allocate_disk_buffer(char const* category)
{
return m_disk_thread.allocate_disk_buffer(category);
}
void session_impl::free_disk_buffer(char* buf)
{
m_disk_thread.free_disk_buffer(buf);
}
disk_buffer_holder session_impl::allocate_disk_buffer(bool& exceeded
, std::shared_ptr<disk_observer> o
, char const* category)
{
return m_disk_thread.allocate_disk_buffer(exceeded, o, category);
}
char* session_impl::allocate_buffer()
{
TORRENT_ASSERT(is_single_thread());

View File

@ -1213,6 +1213,28 @@ namespace libtorrent
}
}
struct piece_refcount
{
piece_refcount(piece_picker& p, piece_index_t piece)
: m_picker(p)
, m_piece(piece)
{
m_picker.inc_refcount(m_piece, nullptr);
}
piece_refcount(piece_refcount const&) = delete;
piece_refcount& operator=(piece_refcount const&) = delete;
~piece_refcount()
{
m_picker.dec_refcount(m_piece, nullptr);
}
private:
piece_picker& m_picker;
piece_index_t m_piece;
};
// TODO: 3 there's some duplication between this function and
// peer_connection::incoming_piece(). is there a way to merge something?
void torrent::add_piece(piece_index_t const piece, char const* data, int const flags)
@ -1235,25 +1257,17 @@ namespace libtorrent
peer_request p;
p.piece = piece;
p.start = 0;
picker().inc_refcount(piece, nullptr);
piece_refcount refcount{picker(), piece};
for (int i = 0; i < blocks_in_piece; ++i, p.start += block_size())
{
if (picker().is_finished(piece_block(piece, i))
&& (flags & torrent::overwrite_existing) == 0)
continue;
p.length = (std::min)(piece_size - p.start, int(block_size()));
disk_buffer_holder buffer = m_ses.allocate_disk_buffer("add piece");
// out of memory
if (!buffer)
{
picker().dec_refcount(piece, nullptr);
return;
}
std::memcpy(buffer.get(), data + p.start, p.length);
p.length = std::min(piece_size - p.start, int(block_size()));
m_stats_counters.inc_stats_counter(counters::queued_write_bytes, p.length);
m_ses.disk_thread().async_write(&storage(), p, std::move(buffer)
m_ses.disk_thread().async_write(&storage(), p, data + p.start, nullptr
, std::bind(&torrent::on_disk_write_complete
, shared_from_this(), _1, p));
@ -1274,7 +1288,6 @@ namespace libtorrent
verify_piece(p.piece);
}
}
picker().dec_refcount(piece, nullptr);
}
void torrent::on_disk_write_complete(storage_error const& error