modernize disk_buffer_holder to be move-only (#824)

modernize disk_buffer_holder to be move-only
This commit is contained in:
Arvid Norberg 2016-06-16 08:24:41 -04:00 committed by GitHub
parent 2fed4103f8
commit 4d927f4029
13 changed files with 97 additions and 79 deletions

View File

@ -576,8 +576,8 @@ namespace libtorrent
// implements buffer_allocator_interface // implements buffer_allocator_interface
void free_disk_buffer(char* buf) override; void free_disk_buffer(char* buf) override;
char* allocate_disk_buffer(char const* category) override; disk_buffer_holder allocate_disk_buffer(char const* category) override;
char* allocate_disk_buffer(bool& exceeded disk_buffer_holder allocate_disk_buffer(bool& exceeded
, boost::shared_ptr<disk_observer> o , boost::shared_ptr<disk_observer> o
, char const* category) override; , char const* category) override;
void reclaim_block(block_cache_reference ref) override; void reclaim_block(block_cache_reference ref) override;

View File

@ -219,7 +219,7 @@ namespace libtorrent
void write_bitfield() override; void write_bitfield() override;
void write_have(int index) override; void write_have(int index) override;
void write_dont_have(int index) override; void write_dont_have(int index) override;
void write_piece(peer_request const& r, disk_buffer_holder& buffer) override; void write_piece(peer_request const& r, disk_buffer_holder buffer) override;
void write_keepalive() override; void write_keepalive() override;
void write_handshake(); void write_handshake();
#ifndef TORRENT_DISABLE_EXTENSIONS #ifndef TORRENT_DISABLE_EXTENSIONS

View File

@ -49,13 +49,14 @@ namespace libtorrent
{ {
struct disk_io_thread; struct disk_io_thread;
struct disk_observer; struct disk_observer;
struct disk_buffer_holder;
struct TORRENT_EXTRA_EXPORT buffer_allocator_interface struct TORRENT_EXTRA_EXPORT buffer_allocator_interface
{ {
virtual char* allocate_disk_buffer(char const* category) = 0;
virtual void free_disk_buffer(char* b) = 0; virtual void free_disk_buffer(char* b) = 0;
virtual void reclaim_block(block_cache_reference ref) = 0; virtual void reclaim_block(block_cache_reference ref) = 0;
virtual char* allocate_disk_buffer(bool& exceeded virtual disk_buffer_holder allocate_disk_buffer(char const* category) = 0;
virtual disk_buffer_holder allocate_disk_buffer(bool& exceeded
, boost::shared_ptr<disk_observer> o , boost::shared_ptr<disk_observer> o
, char const* category) = 0; , char const* category) = 0;
protected: protected:
@ -66,15 +67,19 @@ namespace libtorrent
// when it's destructed, unless it's released. ``release`` returns the disk // when it's destructed, unless it's released. ``release`` returns the disk
// buffer and transfers ownership and responsibility to free it to the caller. // buffer and transfers ownership and responsibility to free it to the caller.
// //
// A disk buffer is freed by passing it to ``session_impl::free_disk_buffer()``. // ``get()`` returns the pointer without transferring ownership. If
// // this buffer has been released, ``get()`` will return nullptr.
// ``get()`` returns the pointer without transferring responsibility. If
// this buffer has been released, ``buffer()`` will return 0.
struct TORRENT_EXPORT disk_buffer_holder struct TORRENT_EXPORT disk_buffer_holder
{ {
// internal // internal
disk_buffer_holder(buffer_allocator_interface& alloc, char* buf); disk_buffer_holder(buffer_allocator_interface& alloc, char* buf);
disk_buffer_holder& operator=(disk_buffer_holder&&);
disk_buffer_holder(disk_buffer_holder&&);
disk_buffer_holder& operator=(disk_buffer_holder const&) = delete;
disk_buffer_holder(disk_buffer_holder const&) = delete;
// construct a buffer holder that will free the held buffer // construct a buffer holder that will free the held buffer
// using a disk buffer pool directly (there's only one // using a disk buffer pool directly (there's only one
// disk_buffer_pool per session) // disk_buffer_pool per session)
@ -82,7 +87,7 @@ namespace libtorrent
// frees any unreleased disk buffer held by this object // frees any unreleased disk buffer held by this object
~disk_buffer_holder(); ~disk_buffer_holder();
// return the held disk buffer and clear it from the // return the held disk buffer and clear it from the
// holder. The responsibility to free it is passed on // holder. The responsibility to free it is passed on
// to the caller // to the caller
@ -100,25 +105,20 @@ namespace libtorrent
// swap pointers of two disk buffer holders. // swap pointers of two disk buffer holders.
void swap(disk_buffer_holder& h) void swap(disk_buffer_holder& h)
{ {
TORRENT_ASSERT(&h.m_allocator == &m_allocator); TORRENT_ASSERT(h.m_allocator == m_allocator);
std::swap(h.m_buf, m_buf); std::swap(h.m_buf, m_buf);
std::swap(h.m_ref, m_ref); std::swap(h.m_ref, m_ref);
} }
block_cache_reference ref() const { return m_ref; } block_cache_reference ref() const { return m_ref; }
typedef char* (disk_buffer_holder::*unspecified_bool_type)(); // implicitly convertible to true if the object is currently holding a
// buffer
// internal explicit operator bool() const { return m_buf != nullptr; }
operator unspecified_bool_type() const
{ return m_buf == 0? 0: &disk_buffer_holder::release; }
private: private:
// non-copyable
disk_buffer_holder& operator=(disk_buffer_holder const&);
disk_buffer_holder(disk_buffer_holder const*);
buffer_allocator_interface& m_allocator; buffer_allocator_interface* m_allocator;
char* m_buf; char* m_buf;
block_cache_reference m_ref; block_cache_reference m_ref;
}; };

View File

@ -56,7 +56,7 @@ namespace libtorrent
, boost::function<void(disk_io_job const*)> const& handler, void* requester , boost::function<void(disk_io_job const*)> const& handler, void* requester
, int flags = 0) = 0; , int flags = 0) = 0;
virtual void async_write(piece_manager* storage, peer_request const& r virtual void async_write(piece_manager* storage, peer_request const& r
, disk_buffer_holder& buffer , disk_buffer_holder buffer
, boost::function<void(disk_io_job const*)> const& handler , boost::function<void(disk_io_job const*)> const& handler
, int flags = 0) = 0; , int flags = 0) = 0;
virtual void async_hash(piece_manager* storage, int piece, int flags virtual void async_hash(piece_manager* storage, int piece, int flags

View File

@ -303,7 +303,7 @@ namespace libtorrent
, boost::function<void(disk_io_job const*)> const& handler, void* requester , boost::function<void(disk_io_job const*)> const& handler, void* requester
, int flags = 0) override; , int flags = 0) override;
void async_write(piece_manager* storage, peer_request const& r void async_write(piece_manager* storage, peer_request const& r
, disk_buffer_holder& buffer , disk_buffer_holder buffer
, boost::function<void(disk_io_job const*)> const& handler , boost::function<void(disk_io_job const*)> const& handler
, int flags = 0) override; , int flags = 0) override;
void async_hash(piece_manager* storage, int piece, int flags void async_hash(piece_manager* storage, int piece, int flags
@ -354,14 +354,14 @@ namespace libtorrent
// implements buffer_allocator_interface // implements buffer_allocator_interface
void reclaim_block(block_cache_reference ref) override; void reclaim_block(block_cache_reference ref) override;
void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); } void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); }
char* allocate_disk_buffer(char const* category) override disk_buffer_holder allocate_disk_buffer(char const* category) override
{ {
bool exceed = false; bool exceed = false;
return allocate_disk_buffer(exceed, boost::shared_ptr<disk_observer>(), category); return allocate_disk_buffer(exceed, boost::shared_ptr<disk_observer>(), category);
} }
void trigger_cache_trim(); void trigger_cache_trim();
char* allocate_disk_buffer(bool& exceeded, boost::shared_ptr<disk_observer> o disk_buffer_holder allocate_disk_buffer(bool& exceeded, boost::shared_ptr<disk_observer> o
, char const* category) override; , char const* category) override;
bool exceeded_cache_use() const bool exceeded_cache_use() const

View File

@ -574,7 +574,7 @@ namespace libtorrent
void incoming_dont_have(int piece_index); void incoming_dont_have(int piece_index);
void incoming_bitfield(bitfield const& bits); void incoming_bitfield(bitfield const& bits);
void incoming_request(peer_request const& r); void incoming_request(peer_request const& r);
void incoming_piece(peer_request const& p, disk_buffer_holder& data); void incoming_piece(peer_request const& p, disk_buffer_holder data);
void incoming_piece(peer_request const& p, char const* data); void incoming_piece(peer_request const& p, char const* data);
void incoming_piece_fragment(int bytes); void incoming_piece_fragment(int bytes);
void start_receive_piece(peer_request const& r); void start_receive_piece(peer_request const& r);
@ -730,7 +730,7 @@ namespace libtorrent
virtual void write_have(int index) = 0; virtual void write_have(int index) = 0;
virtual void write_dont_have(int index) = 0; virtual void write_dont_have(int index) = 0;
virtual void write_keepalive() = 0; virtual void write_keepalive() = 0;
virtual void write_piece(peer_request const& r, disk_buffer_holder& buffer) = 0; virtual void write_piece(peer_request const& r, disk_buffer_holder buffer) = 0;
virtual void write_suggest(int piece) = 0; virtual void write_suggest(int piece) = 0;
virtual void write_bitfield() = 0; virtual void write_bitfield() = 0;

View File

@ -108,7 +108,7 @@ namespace libtorrent
void write_cancel(peer_request const&) {} void write_cancel(peer_request const&) {}
void write_have(int) {} void write_have(int) {}
void write_dont_have(int) {} void write_dont_have(int) {}
void write_piece(peer_request const&, disk_buffer_holder&) void write_piece(peer_request const&, disk_buffer_holder)
{ TORRENT_ASSERT_FAIL(); } { TORRENT_ASSERT_FAIL(); }
void write_keepalive() {} void write_keepalive() {}
void on_connected(); void on_connected();

View File

@ -2472,7 +2472,7 @@ namespace libtorrent
} // anonymous namespace } // anonymous namespace
void bt_peer_connection::write_piece(peer_request const& r, disk_buffer_holder& buffer) void bt_peer_connection::write_piece(peer_request const& r, disk_buffer_holder buffer)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -2532,7 +2532,7 @@ namespace libtorrent
send_buffer(msg, 13); send_buffer(msg, 13);
} }
if (buffer.ref().storage == 0) if (buffer.ref().storage == nullptr)
{ {
append_send_buffer(buffer.get(), r.length append_send_buffer(buffer.get(), r.length
, &buffer_free_disk_buf, &m_allocator); , &buffer_free_disk_buf, &m_allocator);

View File

@ -38,33 +38,48 @@ namespace libtorrent
{ {
disk_buffer_holder::disk_buffer_holder(buffer_allocator_interface& alloc, char* buf) disk_buffer_holder::disk_buffer_holder(buffer_allocator_interface& alloc, char* buf)
: m_allocator(alloc), m_buf(buf) : m_allocator(&alloc), m_buf(buf)
{ {
m_ref.storage = 0; m_ref.storage = nullptr;
m_ref.piece = -1; m_ref.piece = -1;
m_ref.block = -1; m_ref.block = -1;
} }
disk_buffer_holder::disk_buffer_holder(buffer_allocator_interface& alloc, disk_io_job const& j) disk_buffer_holder& disk_buffer_holder::operator=(disk_buffer_holder&& h)
: m_allocator(alloc), m_buf(j.buffer.disk_block), m_ref(j.d.io.ref)
{ {
TORRENT_ASSERT(m_ref.storage == 0 || m_ref.piece >= 0); disk_buffer_holder(std::move(h)).swap(*this);
TORRENT_ASSERT(m_ref.storage == 0 || m_ref.block >= 0); return *this;
TORRENT_ASSERT(m_ref.storage == 0 || m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces()); }
TORRENT_ASSERT(m_ref.storage == 0 || m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000);
disk_buffer_holder::disk_buffer_holder(disk_buffer_holder&& h)
: m_allocator(h.m_allocator), m_buf(h.m_buf), m_ref(h.m_ref)
{
// we own this buffer now
h.release();
}
disk_buffer_holder::disk_buffer_holder(buffer_allocator_interface& alloc, disk_io_job const& j)
: m_allocator(&alloc), m_buf(j.buffer.disk_block), m_ref(j.d.io.ref)
{
TORRENT_ASSERT(m_ref.storage == nullptr || m_ref.piece >= 0);
TORRENT_ASSERT(m_ref.storage == nullptr || m_ref.block >= 0);
TORRENT_ASSERT(m_ref.storage == nullptr
|| m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces());
TORRENT_ASSERT(m_ref.storage == nullptr
|| m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000);
TORRENT_ASSERT(j.action != disk_io_job::rename_file); TORRENT_ASSERT(j.action != disk_io_job::rename_file);
TORRENT_ASSERT(j.action != disk_io_job::move_storage); TORRENT_ASSERT(j.action != disk_io_job::move_storage);
} }
void disk_buffer_holder::reset(disk_io_job const& j) void disk_buffer_holder::reset(disk_io_job const& j)
{ {
if (m_ref.storage) m_allocator.reclaim_block(m_ref); if (m_ref.storage) m_allocator->reclaim_block(m_ref);
else if (m_buf) m_allocator.free_disk_buffer(m_buf); else if (m_buf) m_allocator->free_disk_buffer(m_buf);
m_buf = j.buffer.disk_block; m_buf = j.buffer.disk_block;
m_ref = j.d.io.ref; m_ref = j.d.io.ref;
TORRENT_ASSERT(m_ref.piece >= 0); TORRENT_ASSERT(m_ref.piece >= 0);
TORRENT_ASSERT(m_ref.storage != 0); TORRENT_ASSERT(m_ref.storage != nullptr);
TORRENT_ASSERT(m_ref.block >= 0); TORRENT_ASSERT(m_ref.block >= 0);
TORRENT_ASSERT(m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces()); TORRENT_ASSERT(m_ref.piece < static_cast<piece_manager*>(m_ref.storage)->files()->num_pieces());
TORRENT_ASSERT(m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000); TORRENT_ASSERT(m_ref.block <= static_cast<piece_manager*>(m_ref.storage)->files()->piece_length() / 0x4000);
@ -72,19 +87,19 @@ namespace libtorrent
TORRENT_ASSERT(j.action != disk_io_job::move_storage); TORRENT_ASSERT(j.action != disk_io_job::move_storage);
} }
void disk_buffer_holder::reset(char* buf) void disk_buffer_holder::reset(char* const buf)
{ {
if (m_ref.storage) m_allocator.reclaim_block(m_ref); if (m_ref.storage) m_allocator->reclaim_block(m_ref);
else if (m_buf) m_allocator.free_disk_buffer(m_buf); else if (m_buf) m_allocator->free_disk_buffer(m_buf);
m_buf = buf; m_buf = buf;
m_ref.storage = 0; m_ref.storage = nullptr;
} }
char* disk_buffer_holder::release() char* disk_buffer_holder::release()
{ {
char* ret = m_buf; char* ret = m_buf;
m_buf = 0; m_buf = nullptr;
m_ref.storage = 0; m_ref.storage = nullptr;
return ret; return ret;
} }

View File

@ -1590,7 +1590,7 @@ namespace libtorrent
} }
void disk_io_thread::async_write(piece_manager* storage, peer_request const& r void disk_io_thread::async_write(piece_manager* storage, peer_request const& r
, disk_buffer_holder& buffer , disk_buffer_holder buffer
, boost::function<void(disk_io_job const*)> const& handler , boost::function<void(disk_io_job const*)> const& handler
, int flags) , int flags)
{ {
@ -1672,24 +1672,28 @@ namespace libtorrent
// if the buffer was successfully added to the cache // if the buffer was successfully added to the cache
// our holder should no longer own it // our holder should no longer own it
if (dpe) buffer.release(); if (dpe)
if (dpe && dpe->outstanding_flush == 0)
{ {
dpe->outstanding_flush = 1; buffer.release();
l.unlock();
// the block and write job were successfully inserted if (dpe->outstanding_flush == 0)
// into the cache. Now, see if we should trigger a flush {
j = allocate_job(disk_io_job::flush_hashed); dpe->outstanding_flush = 1;
j->storage = storage->shared_from_this(); l.unlock();
j->piece = r.piece;
j->flags = flags; // the block and write job were successfully inserted
add_job(j); // into the cache. Now, see if we should trigger a flush
j = allocate_job(disk_io_job::flush_hashed);
j->storage = storage->shared_from_this();
j->piece = r.piece;
j->flags = flags;
add_job(j);
}
// if we added the block (regardless of whether we also
// issued a flush job or not), we're done.
return;
} }
// if we added the block (regardless of whether we also
// issued a flush job or not), we're done.
if (dpe) return;
l.unlock(); l.unlock();
add_job(j); add_job(j);
@ -3330,12 +3334,12 @@ namespace libtorrent
submit_jobs(); submit_jobs();
} }
char* disk_io_thread::allocate_disk_buffer(bool& exceeded disk_buffer_holder disk_io_thread::allocate_disk_buffer(bool& exceeded
, boost::shared_ptr<disk_observer> o , boost::shared_ptr<disk_observer> o
, char const* category) , char const* category)
{ {
char* ret = m_disk_cache.allocate_buffer(exceeded, o, category); char* ret = m_disk_cache.allocate_buffer(exceeded, o, category);
return ret; return disk_buffer_holder(*this, ret);
} }
void disk_io_thread::add_completed_job(disk_io_job* j) void disk_io_thread::add_completed_job(disk_io_job* j)

View File

@ -2611,9 +2611,10 @@ namespace libtorrent
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
bool exceeded = false; bool exceeded = false;
char* buffer = m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer"); disk_buffer_holder buffer
= m_allocator.allocate_disk_buffer(exceeded, self(), "receive buffer");
if (buffer == 0) if (!buffer)
{ {
disconnect(errors::no_memory, op_alloc_recvbuf); disconnect(errors::no_memory, op_alloc_recvbuf);
return; return;
@ -2635,12 +2636,11 @@ namespace libtorrent
#endif #endif
} }
disk_buffer_holder holder(m_allocator, buffer); std::memcpy(buffer.get(), data, p.length);
std::memcpy(buffer, data, p.length); incoming_piece(p, std::move(buffer));
incoming_piece(p, holder);
} }
void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data) void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder data)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK; INVARIANT_CHECK;
@ -2849,7 +2849,7 @@ namespace libtorrent
return; return;
} }
t->inc_refcount("async_write"); t->inc_refcount("async_write");
m_disk_thread.async_write(&t->storage(), p, data m_disk_thread.async_write(&t->storage(), p, std::move(data)
, std::bind(&peer_connection::on_disk_write_complete , std::bind(&peer_connection::on_disk_write_complete
, self(), _1, p, t)); , self(), _1, p, t));
@ -5359,7 +5359,7 @@ namespace libtorrent
{ {
t->add_suggest_piece(r.piece); t->add_suggest_piece(r.piece);
} }
write_piece(r, buffer); write_piece(r, std::move(buffer));
} }
void peer_connection::assign_bandwidth(int channel, int amount) void peer_connection::assign_bandwidth(int channel, int amount)

View File

@ -6675,7 +6675,7 @@ namespace aux {
m_disk_thread.reclaim_block(ref); m_disk_thread.reclaim_block(ref);
} }
char* session_impl::allocate_disk_buffer(char const* category) disk_buffer_holder session_impl::allocate_disk_buffer(char const* category)
{ {
return m_disk_thread.allocate_disk_buffer(category); return m_disk_thread.allocate_disk_buffer(category);
} }
@ -6685,7 +6685,7 @@ namespace aux {
m_disk_thread.free_disk_buffer(buf); m_disk_thread.free_disk_buffer(buf);
} }
char* session_impl::allocate_disk_buffer(bool& exceeded disk_buffer_holder session_impl::allocate_disk_buffer(bool& exceeded
, boost::shared_ptr<disk_observer> o , boost::shared_ptr<disk_observer> o
, char const* category) , char const* category)
{ {

View File

@ -1299,15 +1299,14 @@ namespace libtorrent
continue; continue;
p.length = (std::min)(piece_size - p.start, int(block_size())); p.length = (std::min)(piece_size - p.start, int(block_size()));
char* buffer = m_ses.allocate_disk_buffer("add piece"); disk_buffer_holder buffer = m_ses.allocate_disk_buffer("add piece");
// out of memory // out of memory
if (buffer == 0) if (!buffer)
{ {
picker().dec_refcount(piece, 0); picker().dec_refcount(piece, 0);
return; return;
} }
disk_buffer_holder holder(m_ses, buffer); std::memcpy(buffer.get(), data + p.start, p.length);
std::memcpy(buffer, data + p.start, p.length);
if (!need_loaded()) if (!need_loaded())
{ {
@ -1316,7 +1315,7 @@ namespace libtorrent
return; return;
} }
inc_refcount("add_piece"); inc_refcount("add_piece");
m_ses.disk_thread().async_write(&storage(), p, holder m_ses.disk_thread().async_write(&storage(), p, std::move(buffer)
, std::bind(&torrent::on_disk_write_complete , std::bind(&torrent::on_disk_write_complete
, shared_from_this(), _1, p)); , shared_from_this(), _1, p));
piece_block block(piece, i); piece_block block(piece, i);