start chipping away at removing disk_io_job from the disk_interface

This commit is contained in:
arvidn 2016-11-20 23:58:48 -05:00 committed by Arvid Norberg
parent 8011ab57be
commit fc9dabe1ab
11 changed files with 222 additions and 151 deletions

View File

@ -60,14 +60,14 @@ namespace libtorrent
}; };
virtual void async_read(storage_interface* storage, peer_request const& r virtual void async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester , std::function<void(aux::block_cache_reference ref, char* block
, int flags = 0) = 0; , int flags, storage_error const& se)> handler, void* requester, int flags = 0) = 0;
virtual void async_write(storage_interface* storage, peer_request const& r virtual void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer , disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler , std::function<void(storage_error const&)> handler
, int flags = 0) = 0; , int flags = 0) = 0;
virtual void async_hash(storage_interface* storage, int piece, int flags virtual void async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester) = 0; , std::function<void(int, int, sha1_hash const&, storage_error const&)> handler, void* requester) = 0;
virtual void async_move_storage(storage_interface* storage, std::string const& p, int flags virtual void async_move_storage(storage_interface* storage, std::string const& p, int flags
, std::function<void(disk_io_job const*)> handler) = 0; , std::function<void(disk_io_job const*)> handler) = 0;
virtual void async_release_files(storage_interface* storage virtual void async_release_files(storage_interface* storage

View File

@ -35,7 +35,13 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/error_code.hpp" #include "libtorrent/error_code.hpp"
#include "libtorrent/tailqueue.hpp" #include "libtorrent/tailqueue.hpp"
#include "libtorrent/peer_request.hpp"
#include "libtorrent/aux_/block_cache_reference.hpp" #include "libtorrent/aux_/block_cache_reference.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/aux_/disable_warnings_push.hpp"
#include <boost/variant/variant.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"
#include <string> #include <string>
#include <vector> #include <vector>
@ -68,6 +74,8 @@ namespace libtorrent
disk_io_job(disk_io_job const&) = delete; disk_io_job(disk_io_job const&) = delete;
disk_io_job& operator=(disk_io_job const&) = delete; disk_io_job& operator=(disk_io_job const&) = delete;
void call_callback();
enum action_t enum action_t
{ {
read read
@ -144,7 +152,14 @@ namespace libtorrent
std::shared_ptr<storage_interface> storage; std::shared_ptr<storage_interface> storage;
// this is called when operation completes // this is called when operation completes
std::function<void(disk_io_job const*)> callback;
using read_handler = std::function<void(aux::block_cache_reference ref
, char* block, int flags, storage_error const& se)>;
using write_handler = std::function<void(storage_error const&)>;
using hash_handler = std::function<void(int, int, sha1_hash const&, storage_error const&)>;
using generic_handler = std::function<void(disk_io_job const*)>;
boost::variant<read_handler, write_handler, hash_handler, generic_handler> callback;
// the error code from the file operation // the error code from the file operation
// on error, this also contains the path of the // on error, this also contains the path of the

View File

@ -292,14 +292,14 @@ namespace libtorrent
void abort(bool wait); void abort(bool wait);
void async_read(storage_interface* storage, peer_request const& r void async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester , std::function<void(aux::block_cache_reference ref, char* block
, int flags = 0) override; , int flags, storage_error const& se)> handler, void* requester, int flags = 0) override;
void async_write(storage_interface* storage, peer_request const& r void async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer , disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler , std::function<void(storage_error const&)> handler
, int flags = 0) override; , int flags = 0) override;
void async_hash(storage_interface* storage, int piece, int flags void async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester) override; , std::function<void(int, int, sha1_hash const&, storage_error const&)> handler, void* requester) override;
void async_move_storage(storage_interface* storage, std::string const& p, int flags void async_move_storage(storage_interface* storage, std::string const& p, int flags
, std::function<void(disk_io_job const*)> handler) override; , std::function<void(disk_io_job const*)> handler) override;
void async_release_files(storage_interface* storage void async_release_files(storage_interface* storage

View File

@ -760,11 +760,13 @@ namespace libtorrent
void do_update_interest(); void do_update_interest();
void fill_send_buffer(); void fill_send_buffer();
void on_disk_read_complete(disk_io_job const* j, peer_request r void on_disk_read_complete(aux::block_cache_reference ref
, char* disk_block, int flags, storage_error const& error, peer_request r
, time_point issue_time); , time_point issue_time);
void on_disk_write_complete(disk_io_job const* j void on_disk_write_complete(storage_error const& error
, peer_request r, std::shared_ptr<torrent> t); , peer_request r, std::shared_ptr<torrent> t);
void on_seed_mode_hashed(disk_io_job const* j); void on_seed_mode_hashed(int status, int piece
, sha1_hash const& piece_hash, storage_error const& error);
int request_timeout() const; int request_timeout() const;
void check_graceful_pause(); void check_graceful_pause();

View File

@ -353,7 +353,8 @@ namespace libtorrent
void on_resume_data_checked(disk_io_job const* j); void on_resume_data_checked(disk_io_job const* j);
void on_force_recheck(disk_io_job const* j); void on_force_recheck(disk_io_job const* j);
void on_piece_hashed(disk_io_job const* j); void on_piece_hashed(int status, int piece, sha1_hash const& piece_hash
, storage_error const& error);
void files_checked(); void files_checked();
void start_checking(); void start_checking();
@ -378,7 +379,7 @@ namespace libtorrent
enum flags_t { overwrite_existing = 1 }; enum flags_t { overwrite_existing = 1 };
void add_piece(int piece, char const* data, int flags = 0); void add_piece(int piece, char const* data, int flags = 0);
void on_disk_write_complete(disk_io_job const* j void on_disk_write_complete(storage_error const& error
, peer_request p); , peer_request p);
void on_disk_tick_done(disk_io_job const* j); void on_disk_tick_done(disk_io_job const* j);
@ -391,8 +392,9 @@ namespace libtorrent
error_code error; error_code error;
}; };
void read_piece(int piece); void read_piece(int piece);
void on_disk_read_complete(disk_io_job const* j, peer_request r void on_disk_read_complete(aux::block_cache_reference ref
, std::shared_ptr<read_piece_struct> rp); , char* block, int flags, storage_error const& se
, peer_request r, std::shared_ptr<read_piece_struct> rp);
storage_mode_t storage_mode() const; storage_mode_t storage_mode() const;
storage_interface* get_storage(); storage_interface* get_storage();
@ -867,7 +869,8 @@ namespace libtorrent
void resume_download(); void resume_download();
void verify_piece(int piece); void verify_piece(int piece);
void on_piece_verified(disk_io_job const* j); void on_piece_verified(int const status, int const piece
, sha1_hash const& piece_hash, storage_error const& error);
// this is called whenever a peer in this swarm becomes interesting // this is called whenever a peer in this swarm becomes interesting
// it is responsible for issuing a block request, if appropriate // it is responsible for issuing a block request, if appropriate

View File

@ -158,34 +158,42 @@ namespace libtorrent
} }
} }
void on_hash(disk_io_job const* j, create_torrent* t struct hash_state
, std::shared_ptr<storage_interface> storage, disk_io_thread* iothread
, int* piece_counter, int* completed_piece
, std::function<void(int)> const* f, error_code* ec)
{ {
if (j->ret != 0) create_torrent& ct;
std::shared_ptr<storage_interface> storage;
disk_io_thread& iothread;
int piece_counter;
int completed_piece;
std::function<void(int)> const& f;
error_code& ec;
};
void on_hash(int const status, int const piece, sha1_hash const& piece_hash
, storage_error const& error, hash_state* st)
{
if (status != 0)
{ {
// on error // on error
*ec = j->error.ec; st->ec = error.ec;
iothread->abort(true); st->iothread.abort(true);
return; return;
} }
t->set_hash(j->piece, sha1_hash(j->d.piece_hash)); st->ct.set_hash(piece, sha1_hash(piece_hash));
(*f)(*completed_piece); st->f(st->completed_piece);
++(*completed_piece); ++st->completed_piece;
if (*piece_counter < t->num_pieces()) if (st->piece_counter < st->ct.num_pieces())
{ {
iothread->async_hash(storage.get(), *piece_counter st->iothread.async_hash(st->storage.get(), st->piece_counter
, disk_io_job::sequential_access , disk_io_job::sequential_access
, std::bind(&on_hash, _1, t, storage, iothread , std::bind(&on_hash, _1, _2, _3, _4, st), nullptr);
, piece_counter, completed_piece, f, ec), nullptr); ++st->piece_counter;
++(*piece_counter);
} }
else else
{ {
iothread->abort(true); st->iothread.abort(true);
} }
iothread->submit_jobs(); st->iothread.submit_jobs();
} }
} // anonymous namespace } // anonymous namespace
@ -283,18 +291,16 @@ namespace libtorrent
alert_manager dummy2(0, 0); alert_manager dummy2(0, 0);
disk_thread.set_settings(&sett, dummy2); disk_thread.set_settings(&sett, dummy2);
int piece_counter = 0;
int completed_piece = 0;
int piece_read_ahead = 15 * 1024 * 1024 / t.piece_length(); int piece_read_ahead = 15 * 1024 * 1024 / t.piece_length();
if (piece_read_ahead < 1) piece_read_ahead = 1; if (piece_read_ahead < 1) piece_read_ahead = 1;
hash_state st = { t, storage, disk_thread, 0, 0, f, ec };
for (int i = 0; i < piece_read_ahead; ++i) for (int i = 0; i < piece_read_ahead; ++i)
{ {
disk_thread.async_hash(storage.get(), i, disk_io_job::sequential_access disk_thread.async_hash(storage.get(), i, disk_io_job::sequential_access
, std::bind(&on_hash, _1, &t, storage, &disk_thread , std::bind(&on_hash, _1, _2, _3, _4, &st), nullptr);
, &piece_counter, &completed_piece, &f, &ec), nullptr); ++st.piece_counter;
++piece_counter; if (st.piece_counter >= t.num_pieces()) break;
if (piece_counter >= t.num_pieces()) break;
} }
disk_thread.submit_jobs(); disk_thread.submit_jobs();
ios.run(ec); ios.run(ec);

View File

@ -35,6 +35,40 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent namespace libtorrent
{ {
namespace {
struct caller_visitor : boost::static_visitor<>
{
explicit caller_visitor(disk_io_job& j) : m_job(j) {}
void operator()(disk_io_job::read_handler& h) const
{
if (!h) return;
h(m_job.d.io.ref, m_job.buffer.disk_block, m_job.flags, m_job.error);
}
void operator()(disk_io_job::write_handler& h) const
{
if (!h) return;
h(m_job.error);
}
void operator()(disk_io_job::hash_handler& h) const
{
if (!h) return;
h(m_job.ret, m_job.piece, sha1_hash(m_job.d.piece_hash), m_job.error);
}
void operator()(disk_io_job::generic_handler& h) const
{
if (!h) return;
h(&m_job);
}
private:
disk_io_job& m_job;
};
}
disk_io_job::disk_io_job() disk_io_job::disk_io_job()
: piece(0) : piece(0)
, action(read) , action(read)
@ -53,6 +87,11 @@ namespace libtorrent
free(buffer.string); free(buffer.string);
} }
void disk_io_job::call_callback()
{
boost::apply_visitor(caller_visitor(*this), callback);
}
bool disk_io_job::completed(cached_piece_entry const* pe, int block_size) bool disk_io_job::completed(cached_piece_entry const* pe, int block_size)
{ {
if (action != write) return false; if (action != write) return false;

View File

@ -1509,8 +1509,8 @@ namespace libtorrent
} }
void disk_io_thread::async_read(storage_interface* storage, peer_request const& r void disk_io_thread::async_read(storage_interface* storage, peer_request const& r
, std::function<void(disk_io_job const*)> handler, void* requester , std::function<void(aux::block_cache_reference ref, char* block
, int const flags) , int flags, storage_error const& se)> handler, void* requester, int const flags)
{ {
TORRENT_ASSERT(r.length <= m_disk_cache.block_size()); TORRENT_ASSERT(r.length <= m_disk_cache.block_size());
TORRENT_ASSERT(r.length <= 16 * 1024); TORRENT_ASSERT(r.length <= 16 * 1024);
@ -1534,7 +1534,7 @@ namespace libtorrent
switch (ret) switch (ret)
{ {
case 0: case 0:
if (j->callback) j->callback(j); j->call_callback();
free_job(j); free_job(j);
break; break;
case 1: case 1:
@ -1621,7 +1621,7 @@ namespace libtorrent
void disk_io_thread::async_write(storage_interface* storage, peer_request const& r void disk_io_thread::async_write(storage_interface* storage, peer_request const& r
, disk_buffer_holder buffer , disk_buffer_holder buffer
, std::function<void(disk_io_job const*)> handler , std::function<void(storage_error const&)> handler
, int const flags) , int const flags)
{ {
TORRENT_ASSERT(r.length <= m_disk_cache.block_size()); TORRENT_ASSERT(r.length <= m_disk_cache.block_size());
@ -1723,7 +1723,7 @@ namespace libtorrent
} }
void disk_io_thread::async_hash(storage_interface* storage, int piece, int flags void disk_io_thread::async_hash(storage_interface* storage, int piece, int flags
, std::function<void(disk_io_job const*)> handler, void* requester) , std::function<void(int, int, sha1_hash const&, storage_error const&)> handler, void* requester)
{ {
disk_io_job* j = allocate_job(disk_io_job::hash); disk_io_job* j = allocate_job(disk_io_job::hash);
j->storage = storage->shared_from_this(); j->storage = storage->shared_from_this();
@ -1752,7 +1752,7 @@ namespace libtorrent
#endif #endif
l.unlock(); l.unlock();
if (j->callback) j->callback(j); j->call_callback();
free_job(j); free_job(j);
return; return;
} }
@ -1903,7 +1903,7 @@ namespace libtorrent
if (m_abort) if (m_abort)
{ {
j->error.ec = boost::asio::error::operation_aborted; j->error.ec = boost::asio::error::operation_aborted;
if (j->callback) j->callback(j); j->call_callback();
free_job(j); free_job(j);
return; return;
} }
@ -3373,7 +3373,7 @@ namespace libtorrent
#if TORRENT_USE_ASSERTS #if TORRENT_USE_ASSERTS
j->callback_called = true; j->callback_called = true;
#endif #endif
if (j->callback) j->callback(j); j->call_callback();
to_delete[cnt++] = j; to_delete[cnt++] = j;
j = next; j = next;
if (cnt == to_delete.size()) if (cnt == to_delete.size())

View File

@ -2961,15 +2961,15 @@ namespace libtorrent
disconnect(errors::torrent_paused, op_bittorrent); disconnect(errors::torrent_paused, op_bittorrent);
} }
void peer_connection::on_disk_write_complete(disk_io_job const* j void peer_connection::on_disk_write_complete(storage_error const& error
, peer_request p, std::shared_ptr<torrent> t) , peer_request p, std::shared_ptr<torrent> t)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
if (should_log(peer_log_alert::info)) if (should_log(peer_log_alert::info))
{ {
peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE_COMPLETE", "ret: %d piece: %d s: %x l: %x e: %s" peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE_COMPLETE", "piece: %d s: %x l: %x e: %s"
, j->ret, p.piece, p.start, p.length, j->error.ec.message().c_str()); , p.piece, p.start, p.length, error.ec.message().c_str());
} }
#endif #endif
@ -2991,7 +2991,7 @@ namespace libtorrent
if (!t) if (!t)
{ {
disconnect(j->error.ec, op_file_write); disconnect(error.ec, op_file_write);
return; return;
} }
@ -3001,12 +3001,12 @@ namespace libtorrent
piece_block const block_finished(p.piece, p.start / t->block_size()); piece_block const block_finished(p.piece, p.start / t->block_size());
if (j->ret < 0) if (error)
{ {
// we failed to write j->piece to disk tell the piece picker // we failed to write the piece to disk tell the piece picker
// this will block any other peer from issuing requests // this will block any other peer from issuing requests
// to this piece, until we've cleared it. // to this piece, until we've cleared it.
if (j->error.ec == boost::asio::error::operation_aborted) if (error.ec == boost::asio::error::operation_aborted)
{ {
if (t->has_picker()) if (t->has_picker())
t->picker().mark_as_canceled(block_finished, nullptr); t->picker().mark_as_canceled(block_finished, nullptr);
@ -3024,7 +3024,7 @@ namespace libtorrent
// when this returns, all outstanding jobs to the // when this returns, all outstanding jobs to the
// piece are done, and we can restore it, allowing // piece are done, and we can restore it, allowing
// new requests to it // new requests to it
m_disk_thread.async_clear_piece(&t->storage(), j->piece m_disk_thread.async_clear_piece(&t->storage(), p.piece
, std::bind(&torrent::on_piece_fail_sync, t, _1, block_finished)); , std::bind(&torrent::on_piece_fail_sync, t, _1, block_finished));
} }
else else
@ -3033,24 +3033,22 @@ namespace libtorrent
// exit this function early, no need to keep the picker // exit this function early, no need to keep the picker
// state up-to-date, right? // state up-to-date, right?
disk_io_job sj; disk_io_job sj;
sj.piece = j->piece; sj.piece = p.piece;
t->on_piece_fail_sync(&sj, block_finished); t->on_piece_fail_sync(&sj, block_finished);
} }
} }
t->update_gauge(); t->update_gauge();
// handle_disk_error may disconnect us // handle_disk_error may disconnect us
t->handle_disk_error("write", j->error, this, torrent::disk_class::write); t->handle_disk_error("write", error, this, torrent::disk_class::write);
return; return;
} }
TORRENT_ASSERT(j->ret == p.length);
if (!t->has_picker()) return; if (!t->has_picker()) return;
piece_picker& picker = t->picker(); piece_picker& picker = t->picker();
TORRENT_ASSERT(p.piece == j->piece); TORRENT_ASSERT(p.piece == p.piece);
TORRENT_ASSERT(p.start == j->d.io.offset); TORRENT_ASSERT(p.start == p.start);
TORRENT_ASSERT(picker.num_peers(block_finished) == 0); TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
// std::fprintf(stderr, "peer_connection mark_as_finished peer: %p piece: %d block: %d\n" // std::fprintf(stderr, "peer_connection mark_as_finished peer: %p piece: %d block: %d\n"
@ -5114,8 +5112,8 @@ namespace libtorrent
// this means we're in seed mode and we haven't yet // this means we're in seed mode and we haven't yet
// verified this piece (r.piece) // verified this piece (r.piece)
m_disk_thread.async_hash(&t->storage(), r.piece, 0 m_disk_thread.async_hash(&t->storage(), r.piece, 0
, std::bind(&peer_connection::on_seed_mode_hashed, self(), _1) , std::bind(&peer_connection::on_seed_mode_hashed, self()
, this); , _1, _2, _3, _4), this);
t->verifying(r.piece); t->verifying(r.piece);
continue; continue;
} }
@ -5151,7 +5149,7 @@ namespace libtorrent
m_disk_thread.async_read(&t->storage(), r m_disk_thread.async_read(&t->storage(), r
, std::bind(&peer_connection::on_disk_read_complete , std::bind(&peer_connection::on_disk_read_complete
, self(), _1, r, clock_type::now()), this); , self(), _1, _2, _3, _4, r, clock_type::now()), this);
} }
m_last_sent_payload = clock_type::now(); m_last_sent_payload = clock_type::now();
m_requests.erase(m_requests.begin() + i); m_requests.erase(m_requests.begin() + i);
@ -5168,7 +5166,8 @@ namespace libtorrent
// this is called when a previously unchecked piece has been // this is called when a previously unchecked piece has been
// checked, while in seed-mode // checked, while in seed-mode
void peer_connection::on_seed_mode_hashed(disk_io_job const* j) void peer_connection::on_seed_mode_hashed(int const
, int const piece, sha1_hash const& piece_hash, storage_error const& error)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK; INVARIANT_CHECK;
@ -5180,32 +5179,32 @@ namespace libtorrent
if (!t || t->is_aborted()) return; if (!t || t->is_aborted()) return;
if (j->error) if (error)
{ {
t->handle_disk_error("hash", j->error, this); t->handle_disk_error("hash", error, this);
t->leave_seed_mode(false); t->leave_seed_mode(false);
return; return;
} }
// we're using the piece hashes here, we need the torrent to be loaded // we're using the piece hashes here, we need the torrent to be loaded
if (!m_settings.get_bool(settings_pack::disable_hash_checks) if (!m_settings.get_bool(settings_pack::disable_hash_checks)
&& sha1_hash(j->d.piece_hash) != t->torrent_file().hash_for_piece(j->piece)) && piece_hash != t->torrent_file().hash_for_piece(piece))
{ {
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH" peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH"
, "piece: %d failed", j->piece); , "piece: %d failed", piece);
#endif #endif
t->leave_seed_mode(false); t->leave_seed_mode(false);
} }
else else
{ {
TORRENT_ASSERT(t->verifying_piece(j->piece)); TORRENT_ASSERT(t->verifying_piece(piece));
if (t->seed_mode()) t->verified(j->piece); if (t->seed_mode()) t->verified(piece);
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH" peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH"
, "piece: %d passed", j->piece); , "piece: %d passed", piece);
#endif #endif
if (t) if (t)
{ {
@ -5219,7 +5218,8 @@ namespace libtorrent
fill_send_buffer(); fill_send_buffer();
} }
void peer_connection::on_disk_read_complete(disk_io_job const* j void peer_connection::on_disk_read_complete(aux::block_cache_reference ref
, char* disk_block, int const flags, storage_error const& error
, peer_request r, time_point issue_time) , peer_request r, time_point issue_time)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@ -5233,35 +5233,35 @@ namespace libtorrent
if (should_log(peer_log_alert::info)) if (should_log(peer_log_alert::info))
{ {
peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE" peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE"
, "ret: %d piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us" , "piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us"
, j->ret, r.piece, r.start, r.length , r.piece, r.start, r.length
, static_cast<void*>(j->buffer.disk_block) , static_cast<void*>(disk_block)
, (j->flags & disk_io_job::cache_hit ? "cache hit" : "cache miss") , (flags & disk_io_job::cache_hit ? "cache hit" : "cache miss")
, j->error.ec.message().c_str(), disk_rtt); , error.ec.message().c_str(), disk_rtt);
} }
#endif #endif
m_reading_bytes -= r.length; m_reading_bytes -= r.length;
std::shared_ptr<torrent> t = m_torrent.lock(); std::shared_ptr<torrent> t = m_torrent.lock();
if (j->ret < 0) if (error)
{ {
if (!t) if (!t)
{ {
disconnect(j->error.ec, op_file_read); disconnect(error.ec, op_file_read);
return; return;
} }
TORRENT_ASSERT(j->buffer.disk_block == nullptr); TORRENT_ASSERT(disk_block == nullptr);
write_dont_have(r.piece); write_dont_have(r.piece);
write_reject_request(r); write_reject_request(r);
if (t->alerts().should_post<file_error_alert>()) if (t->alerts().should_post<file_error_alert>())
t->alerts().emplace_alert<file_error_alert>(j->error.ec t->alerts().emplace_alert<file_error_alert>(error.ec
, t->resolve_filename(j->error.file) , t->resolve_filename(error.file)
, j->error.operation_str(), t->get_handle()); , error.operation_str(), t->get_handle());
++m_disk_read_failures; ++m_disk_read_failures;
if (m_disk_read_failures > 100) disconnect(j->error.ec, op_file_read); if (m_disk_read_failures > 100) disconnect(error.ec, op_file_read);
return; return;
} }
@ -5270,12 +5270,10 @@ namespace libtorrent
// block, the peer is still useful // block, the peer is still useful
m_disk_read_failures = 0; m_disk_read_failures = 0;
TORRENT_ASSERT(j->ret == r.length);
// even if we're disconnecting, we need to free this block // even if we're disconnecting, we need to free this block
// otherwise the disk thread will hang, waiting for the network // otherwise the disk thread will hang, waiting for the network
// thread to be done with it // thread to be done with it
disk_buffer_holder buffer(m_allocator, j->d.io.ref, j->buffer.disk_block); disk_buffer_holder buffer(m_allocator, ref, disk_block);
if (t && m_settings.get_int(settings_pack::suggest_mode) if (t && m_settings.get_int(settings_pack::suggest_mode)
== settings_pack::suggest_read_cache) == settings_pack::suggest_read_cache)
@ -5290,14 +5288,7 @@ namespace libtorrent
if (!t) if (!t)
{ {
disconnect(j->error.ec, op_file_read); disconnect(error.ec, op_file_read);
return;
}
if (j->ret != r.length)
{
// handle_disk_error may disconnect us
t->handle_disk_error("read", j->error, this);
return; return;
} }
@ -5312,7 +5303,7 @@ namespace libtorrent
// if it's rare enough to make it into the suggested piece // if it's rare enough to make it into the suggested piece
// push another piece out // push another piece out
if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache
&& (j->flags & disk_io_job::cache_hit) == 0) && (flags & disk_io_job::cache_hit) == 0)
{ {
t->add_suggest_piece(r.piece); t->add_suggest_piece(r.piece);
} }

View File

@ -95,7 +95,7 @@ namespace
{ {
m_torrent.session().disk_thread().async_read(&m_torrent.storage() m_torrent.session().disk_thread().async_read(&m_torrent.storage()
, r, std::bind(&smart_ban_plugin::on_read_ok_block , r, std::bind(&smart_ban_plugin::on_read_ok_block
, shared_from_this(), *i, i->second.peer->address(), _1) , shared_from_this(), *i, i->second.peer->address(), _1, _2, _3, _4)
, reinterpret_cast<void*>(1)); , reinterpret_cast<void*>(1));
m_block_hashes.erase(i++); m_block_hashes.erase(i++);
} }
@ -152,7 +152,7 @@ namespace
// block read will have been deleted by the time it gets back to the network thread // block read will have been deleted by the time it gets back to the network thread
m_torrent.session().disk_thread().async_read(&m_torrent.storage(), r m_torrent.session().disk_thread().async_read(&m_torrent.storage(), r
, std::bind(&smart_ban_plugin::on_read_failed_block , std::bind(&smart_ban_plugin::on_read_failed_block
, shared_from_this(), pb, (*i)->address(), _1) , shared_from_this(), pb, (*i)->address(), _1, _2, _3, _4)
, reinterpret_cast<torrent_peer*>(1) , reinterpret_cast<torrent_peer*>(1)
, disk_io_job::force_copy); , disk_io_job::force_copy);
} }
@ -175,17 +175,21 @@ namespace
sha1_hash digest; sha1_hash digest;
}; };
void on_read_failed_block(piece_block b, address a, disk_io_job const* j) void on_read_failed_block(piece_block b, address a
, aux::block_cache_reference ref, char* disk_block, int
, storage_error const& error)
{ {
TORRENT_ASSERT(m_torrent.session().is_single_thread()); TORRENT_ASSERT(m_torrent.session().is_single_thread());
disk_buffer_holder buffer(m_torrent.session(), j->d.io.ref, j->buffer.disk_block); disk_buffer_holder buffer(m_torrent.session(), ref, disk_block);
// ignore read errors // ignore read errors
if (j->ret != j->d.io.buffer_size) return; if (error) return;
int const size = m_torrent.torrent_file().piece_size(b.piece_index);
hasher h; hasher h;
h.update(j->buffer.disk_block, j->d.io.buffer_size); h.update(disk_block, size);
h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt)); h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt));
std::pair<peer_list::iterator, peer_list::iterator> const range std::pair<peer_list::iterator, peer_list::iterator> const range
@ -257,17 +261,21 @@ namespace
#endif #endif
} }
void on_read_ok_block(std::pair<piece_block, block_entry> b, address a, disk_io_job const* j) void on_read_ok_block(std::pair<piece_block, block_entry> b, address a
, aux::block_cache_reference ref, char* disk_block, int
, storage_error const& error)
{ {
TORRENT_ASSERT(m_torrent.session().is_single_thread()); TORRENT_ASSERT(m_torrent.session().is_single_thread());
disk_buffer_holder buffer(m_torrent.session(), j->d.io.ref, j->buffer.disk_block); disk_buffer_holder buffer(m_torrent.session(), ref, disk_block);
// ignore read errors // ignore read errors
if (j->ret != j->d.io.buffer_size) return; if (error) return;
int const size = m_torrent.torrent_file().piece_size(b.first.piece_index);
hasher h; hasher h;
h.update(j->buffer.disk_block, j->d.io.buffer_size); h.update(disk_block, size);
h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt)); h.update(reinterpret_cast<char const*>(&m_salt), sizeof(m_salt));
sha1_hash const ok_digest = h.final(); sha1_hash const ok_digest = h.final();

View File

@ -860,7 +860,7 @@ namespace libtorrent
r.length = (std::min)(piece_size - r.start, block_size()); r.length = (std::min)(piece_size - r.start, block_size());
m_ses.disk_thread().async_read(&storage(), r m_ses.disk_thread().async_read(&storage(), r
, std::bind(&torrent::on_disk_read_complete , std::bind(&torrent::on_disk_read_complete
, shared_from_this(), _1, r, rp), reinterpret_cast<void*>(1)); , shared_from_this(), _1, _2, _3, _4, r, rp), reinterpret_cast<void*>(1));
} }
} }
@ -1128,24 +1128,25 @@ namespace libtorrent
} }
catch (...) { handle_exception(); } catch (...) { handle_exception(); }
void torrent::on_disk_read_complete(disk_io_job const* j, peer_request r void torrent::on_disk_read_complete(aux::block_cache_reference ref
, std::shared_ptr<read_piece_struct> rp) try , char* block, int, storage_error const& se
, peer_request r, std::shared_ptr<read_piece_struct> rp) try
{ {
// hold a reference until this function returns // hold a reference until this function returns
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
disk_buffer_holder buffer(m_ses, j->d.io.ref, j->buffer.disk_block); disk_buffer_holder buffer(m_ses, ref, block);
--rp->blocks_left; --rp->blocks_left;
if (j->ret != r.length) if (se)
{ {
rp->fail = true; rp->fail = true;
rp->error = j->error.ec; rp->error = se.ec;
handle_disk_error("read", j->error); handle_disk_error("read", se);
} }
else else
{ {
std::memcpy(rp->piece_data.get() + r.start, j->buffer.disk_block, r.length); std::memcpy(rp->piece_data.get() + r.start, block, r.length);
} }
if (rp->blocks_left == 0) if (rp->blocks_left == 0)
@ -1218,7 +1219,7 @@ namespace libtorrent
// TODO: 3 there's some duplication between this function and // TODO: 3 there's some duplication between this function and
// peer_connection::incoming_piece(). is there a way to merge something? // peer_connection::incoming_piece(). is there a way to merge something?
void torrent::add_piece(int piece, char const* data, int flags) void torrent::add_piece(int piece, char const* data, int const flags)
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(piece >= 0 && piece < m_torrent_file->num_pieces()); TORRENT_ASSERT(piece >= 0 && piece < m_torrent_file->num_pieces());
@ -1281,7 +1282,7 @@ namespace libtorrent
picker().dec_refcount(piece, nullptr); picker().dec_refcount(piece, nullptr);
} }
void torrent::on_disk_write_complete(disk_io_job const* j void torrent::on_disk_write_complete(storage_error const& error
, peer_request p) try , peer_request p) try
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@ -1295,9 +1296,9 @@ namespace libtorrent
if (m_abort) return; if (m_abort) return;
piece_block block_finished(p.piece, p.start / block_size()); piece_block block_finished(p.piece, p.start / block_size());
if (j->ret == -1) if (error)
{ {
handle_disk_error("write", j->error); handle_disk_error("write", error);
return; return;
} }
@ -2256,7 +2257,7 @@ namespace libtorrent
m_ses.disk_thread().async_hash(m_storage.get(), m_checking_piece++ m_ses.disk_thread().async_hash(m_storage.get(), m_checking_piece++
, disk_io_job::sequential_access | disk_io_job::volatile_read , disk_io_job::sequential_access | disk_io_job::volatile_read
, std::bind(&torrent::on_piece_hashed , std::bind(&torrent::on_piece_hashed
, shared_from_this(), _1), reinterpret_cast<void*>(1)); , shared_from_this(), _1, _2, _3, _4), reinterpret_cast<void*>(1));
if (m_checking_piece >= m_torrent_file->num_pieces()) break; if (m_checking_piece >= m_torrent_file->num_pieces()) break;
} }
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
@ -2266,14 +2267,15 @@ namespace libtorrent
// This is only used for checking of torrents. i.e. force-recheck or initial checking // This is only used for checking of torrents. i.e. force-recheck or initial checking
// of existing files // of existing files
void torrent::on_piece_hashed(disk_io_job const* j) try void torrent::on_piece_hashed(int const status, int const piece
, sha1_hash const& piece_hash, storage_error const& error) try
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK; INVARIANT_CHECK;
if (m_abort) return; if (m_abort) return;
if (j->ret == disk_interface::disk_check_aborted) if (status == disk_interface::disk_check_aborted)
{ {
m_checking_piece = 0; m_checking_piece = 0;
m_num_checked_pieces = 0; m_num_checked_pieces = 0;
@ -2288,21 +2290,21 @@ namespace libtorrent
++m_num_checked_pieces; ++m_num_checked_pieces;
if (j->ret < 0) if (status < 0)
{ {
if (j->error.ec == boost::system::errc::no_such_file_or_directory if (error.ec == boost::system::errc::no_such_file_or_directory
|| j->error.ec == boost::asio::error::eof || error.ec == boost::asio::error::eof
#ifdef TORRENT_WINDOWS #ifdef TORRENT_WINDOWS
|| j->error.ec == error_code(ERROR_HANDLE_EOF, system_category()) || error.ec == error_code(ERROR_HANDLE_EOF, system_category())
#endif #endif
) )
{ {
TORRENT_ASSERT(j->error.file >= 0); TORRENT_ASSERT(error.file >= 0);
// skip this file by updating m_checking_piece to the first piece following it // skip this file by updating m_checking_piece to the first piece following it
file_storage const& st = m_torrent_file->files(); file_storage const& st = m_torrent_file->files();
std::uint64_t file_size = st.file_size(j->error.file); std::uint64_t file_size = st.file_size(error.file);
int last = st.map_file(j->error.file, file_size, 0).piece; int last = st.map_file(error.file, file_size, 0).piece;
if (m_checking_piece < last) if (m_checking_piece < last)
{ {
int diff = last - m_checking_piece; int diff = last - m_checking_piece;
@ -2315,16 +2317,19 @@ namespace libtorrent
m_checking_piece = 0; m_checking_piece = 0;
m_num_checked_pieces = 0; m_num_checked_pieces = 0;
if (m_ses.alerts().should_post<file_error_alert>()) if (m_ses.alerts().should_post<file_error_alert>())
m_ses.alerts().emplace_alert<file_error_alert>(j->error.ec, m_ses.alerts().emplace_alert<file_error_alert>(error.ec,
resolve_filename(j->error.file), j->error.operation_str(), get_handle()); resolve_filename(error.file), error.operation_str(), get_handle());
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
if (should_log()) if (should_log())
debug_log("on_piece_hashed, fatal disk error: (%d) %s", j->error.ec.value(), j->error.ec.message().c_str()); {
debug_log("on_piece_hashed, fatal disk error: (%d) %s", error.ec.value()
, error.ec.message().c_str());
}
#endif #endif
auto_managed(false); auto_managed(false);
pause(); pause();
set_error(j->error.ec, j->error.file); set_error(error.ec, error.file);
// recalculate auto-managed torrents sooner // recalculate auto-managed torrents sooner
// in order to start checking the next torrent // in order to start checking the next torrent
@ -2336,21 +2341,21 @@ namespace libtorrent
m_progress_ppm = std::int64_t(m_num_checked_pieces) * 1000000 / torrent_file().num_pieces(); m_progress_ppm = std::int64_t(m_num_checked_pieces) * 1000000 / torrent_file().num_pieces();
if (settings().get_bool(settings_pack::disable_hash_checks) if (settings().get_bool(settings_pack::disable_hash_checks)
|| sha1_hash(j->d.piece_hash) == m_torrent_file->hash_for_piece(j->piece)) || piece_hash == m_torrent_file->hash_for_piece(piece))
{ {
if (has_picker() || !m_have_all) if (has_picker() || !m_have_all)
{ {
need_picker(); need_picker();
m_picker->we_have(j->piece); m_picker->we_have(piece);
update_gauge(); update_gauge();
} }
we_have(j->piece); we_have(piece);
} }
else else
{ {
// if the hash failed, remove it from the cache // if the hash failed, remove it from the cache
if (m_storage) if (m_storage)
m_ses.disk_thread().clear_piece(m_storage.get(), j->piece); m_ses.disk_thread().clear_piece(m_storage.get(), piece);
} }
if (m_num_checked_pieces < m_torrent_file->num_pieces()) if (m_num_checked_pieces < m_torrent_file->num_pieces())
@ -2383,7 +2388,7 @@ namespace libtorrent
m_ses.disk_thread().async_hash(m_storage.get(), m_checking_piece++ m_ses.disk_thread().async_hash(m_storage.get(), m_checking_piece++
, disk_io_job::sequential_access | disk_io_job::volatile_read , disk_io_job::sequential_access | disk_io_job::volatile_read
, std::bind(&torrent::on_piece_hashed , std::bind(&torrent::on_piece_hashed
, shared_from_this(), _1), reinterpret_cast<void*>(1)); , shared_from_this(), _1, _2, _3, _4), reinterpret_cast<void*>(1));
#ifndef TORRENT_DISABLE_LOGGING #ifndef TORRENT_DISABLE_LOGGING
debug_log("on_piece_hashed, m_checking_piece: %d", m_checking_piece); debug_log("on_piece_hashed, m_checking_piece: %d", m_checking_piece);
#endif #endif
@ -3658,24 +3663,25 @@ namespace libtorrent
TORRENT_ASSERT(st.total_done >= st.total_wanted_done); TORRENT_ASSERT(st.total_done >= st.total_wanted_done);
} }
void torrent::on_piece_verified(disk_io_job const* j) try void torrent::on_piece_verified(int const status, int const piece
, sha1_hash const& piece_hash, storage_error const& error) try
{ {
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
if (m_abort) return; if (m_abort) return;
int ret = j->ret; int ret = status;
if (settings().get_bool(settings_pack::disable_hash_checks)) if (settings().get_bool(settings_pack::disable_hash_checks))
{ {
ret = 0; ret = 0;
} }
else if (ret == -1) else if (ret == -1)
{ {
handle_disk_error("piece_verified", j->error); handle_disk_error("piece_verified", error);
} }
else else
{ {
if (sha1_hash(j->d.piece_hash) != m_torrent_file->hash_for_piece(j->piece)) if (sha1_hash(piece_hash) != m_torrent_file->hash_for_piece(piece))
ret = -2; ret = -2;
} }
@ -3687,10 +3693,10 @@ namespace libtorrent
if (should_log()) if (should_log())
{ {
debug_log("*** PIECE_FINISHED [ p: %d | chk: %s | size: %d ]" debug_log("*** PIECE_FINISHED [ p: %d | chk: %s | size: %d ]"
, j->piece, ((ret == 0) , piece, ((ret == 0)
?"passed":ret == -1 ?"passed":ret == -1
?"disk failed":"failed") ?"disk failed":"failed")
, m_torrent_file->piece_size(j->piece)); , m_torrent_file->piece_size(piece));
} }
#endif #endif
TORRENT_ASSERT(valid_metadata()); TORRENT_ASSERT(valid_metadata());
@ -3702,7 +3708,7 @@ namespace libtorrent
need_picker(); need_picker();
TORRENT_ASSERT(!m_picker->have_piece(j->piece)); TORRENT_ASSERT(!m_picker->have_piece(piece));
state_updated(); state_updated();
@ -3712,21 +3718,21 @@ namespace libtorrent
// called, and the piece is no longer finished. // called, and the piece is no longer finished.
// in this case, we have to ignore the fact that // in this case, we have to ignore the fact that
// it passed the check // it passed the check
if (!m_picker->is_piece_finished(j->piece)) return; if (!m_picker->is_piece_finished(piece)) return;
if (ret == 0) if (ret == 0)
{ {
// the following call may cause picker to become invalid // the following call may cause picker to become invalid
// in case we just became a seed // in case we just became a seed
piece_passed(j->piece); piece_passed(piece);
// if we're in seed mode, we just acquired this piece // if we're in seed mode, we just acquired this piece
// mark it as verified // mark it as verified
if (m_seed_mode) verified(j->piece); if (m_seed_mode) verified(piece);
} }
else if (ret == -2) else if (ret == -2)
{ {
// piece_failed() will restore the piece // piece_failed() will restore the piece
piece_failed(j->piece); piece_failed(piece);
} }
else else
{ {
@ -10257,9 +10263,10 @@ namespace libtorrent
TORRENT_ASSERT(m_storage.get()); TORRENT_ASSERT(m_storage.get());
m_ses.disk_thread().async_hash(m_storage.get(), piece, 0 m_ses.disk_thread().async_hash(m_storage.get(), piece, 0
, std::bind(&torrent::on_piece_verified, shared_from_this(), _1) , std::bind(&torrent::on_piece_verified, shared_from_this(), _1, _2, _3, _4)
, reinterpret_cast<void*>(1)); , reinterpret_cast<void*>(1));
} }
announce_entry* torrent::find_tracker(std::string const& url) announce_entry* torrent::find_tracker(std::string const& url)
{ {
auto i = std::find_if(m_trackers.begin(), m_trackers.end() auto i = std::find_if(m_trackers.begin(), m_trackers.end()