removed checker thread

This commit is contained in:
Arvid Norberg 2008-03-08 06:06:31 +00:00
parent 1a0f8d5cd5
commit f53cfa7eeb
12 changed files with 776 additions and 1375 deletions

View File

@ -94,73 +94,6 @@ namespace libtorrent
{
struct session_impl;
// this data is shared between the main thread and the
// thread that initialize pieces
struct piece_checker_data
{
piece_checker_data()
: processing(false), progress(0.f), abort(false) {}
boost::shared_ptr<torrent> torrent_ptr;
fs::path save_path;
sha1_hash info_hash;
void parse_resume_data(
const entry& rd
, const torrent_info& info
, std::string& error);
std::vector<int> piece_map;
std::vector<piece_picker::downloading_piece> unfinished_pieces;
std::vector<piece_picker::block_info> block_info;
std::vector<tcp::endpoint> peers;
std::vector<tcp::endpoint> banned_peers;
entry resume_data;
// this is true if this torrent is being processed (checked)
// if it is not being processed, then it can be removed from
// the queue without problems, otherwise the abort flag has
// to be set.
bool processing;
// is filled in by storage::initialize_pieces()
// and represents the progress. It should be a
// value in the range [0, 1]
float progress;
// abort defaults to false and is typically
// filled in by torrent_handle when the user
// aborts the torrent
bool abort;
};
struct checker_impl: boost::noncopyable
{
checker_impl(session_impl& s): m_ses(s), m_abort(false) {}
void operator()();
piece_checker_data* find_torrent(const sha1_hash& info_hash);
void remove_torrent(sha1_hash const& info_hash, int options);
#ifndef NDEBUG
void check_invariant() const;
#endif
// when the files has been checked
// the torrent is added to the session
session_impl& m_ses;
mutable boost::mutex m_mutex;
boost::condition m_cond;
// a list of all torrents that are currently in queue
// or checking their files
std::deque<boost::shared_ptr<piece_checker_data> > m_torrents;
std::deque<boost::shared_ptr<piece_checker_data> > m_processing;
bool m_abort;
};
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
struct tracker_logger;
#endif
@ -284,6 +217,9 @@ namespace libtorrent
std::vector<torrent_handle> get_torrents();
void check_torrent(boost::shared_ptr<torrent> const& t);
void done_checking(boost::shared_ptr<torrent> const& t);
void set_severity_level(alert::severity_t s);
std::auto_ptr<alert> pop_alert();
@ -435,6 +371,7 @@ namespace libtorrent
tracker_manager m_tracker_manager;
torrent_map m_torrents;
std::list<boost::shared_ptr<torrent> > m_queued_for_checking;
// this maps sockets to their peer_connection
// object. It is the complete list of all connected
@ -625,16 +562,8 @@ namespace libtorrent
extension_list_t m_extensions;
#endif
// data shared between the main thread
// and the checker thread
checker_impl m_checker_impl;
// the main working thread
boost::scoped_ptr<boost::thread> m_thread;
// the thread that calls initialize_pieces()
// on all torrents before they start downloading
boost::scoped_ptr<boost::thread> m_checker_thread;
};
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING

View File

@ -77,6 +77,8 @@ namespace libtorrent
, move_storage
, release_files
, delete_files
, check_fastresume
, check_files
};
action_t action;
@ -154,10 +156,6 @@ namespace libtorrent
, boost::function<void(int, disk_io_job const&)> const& f
= boost::function<void(int, disk_io_job const&)>());
#ifndef NDEBUG
disk_io_job find_job(boost::intrusive_ptr<piece_manager> s
, int action, int piece) const;
#endif
// keep track of the number of bytes in the job queue
// at any given time. i.e. the sum of all buffer_size.
// this is used to slow down the download global download
@ -268,9 +266,6 @@ namespace libtorrent
// number of bytes per block. The BitTorrent
// protocol defines the block size to 16 KiB.
int m_block_size;
#ifndef NDEBUG
disk_io_job m_current;
#endif
#ifdef TORRENT_DISK_STATS
std::ofstream m_log;

View File

@ -138,10 +138,7 @@ namespace libtorrent
// the vector tells which pieces we already have
// and which we don't have.
void files_checked(
std::vector<bool> const& pieces
, std::vector<downloading_piece> const& unfinished
, std::vector<int>& verify_pieces);
void init(std::vector<bool> const& pieces);
// increases the peer count for the given piece
// (is used when a HAVE message is received)

View File

@ -119,12 +119,12 @@ namespace libtorrent
// if allocate_files is true.
// allocate_files is true if allocation mode
// is set to full and sparse files are supported
// false return value indicates an error
virtual bool initialize(bool allocate_files) = 0;
// negative return value indicates an error
virtual size_type read(char* buf, int slot, int offset, int size) = 0;
// may throw file_error if storage for slot hasn't been allocated
// negative return value indicates an error
virtual size_type write(const char* buf, int slot, int offset, int size) = 0;
@ -194,47 +194,22 @@ namespace libtorrent
, fs::path const& path
, file_pool& fp
, disk_io_thread& io
, storage_constructor_type sc);
, storage_constructor_type sc
, storage_mode_t sm);
~piece_manager();
torrent_info const* info() const { return m_info.get(); }
bool check_fastresume(aux::piece_checker_data& d
, std::vector<bool>& pieces, int& num_pieces, storage_mode_t storage_mode
, std::string& error_msg);
std::pair<bool, float> check_files(std::vector<bool>& pieces
, int& num_pieces, boost::recursive_mutex& mutex, bool& error);
// frees a buffer that was returned from a read operation
void free_buffer(char* buf);
void write_resume_data(entry& rd) const
{ m_storage->write_resume_data(rd); }
void write_resume_data(entry& rd, std::vector<bool> const& have) const;
bool verify_resume_data(entry const& rd, std::string& error)
{
#ifndef NDEBUG
m_resume_data_verified = true;
#endif
return m_storage->verify_resume_data(rd, error);
}
void async_check_fastresume(entry const* resume_data
, boost::function<void(int, disk_io_job const&)> const& handler);
bool is_allocating() const
{ return m_state == state_expand_pieces; }
void mark_failed(int index);
std::string const& error() const { return m_storage->error(); }
void clear_error() { m_storage->clear_error(); }
unsigned long piece_crc(
int slot_index
, int block_size
, piece_picker::block_info const* bi);
int slot_for(int piece) const;
int piece_for(int slot) const;
void async_check_files(boost::function<void(int, disk_io_job const&)> const& handler);
void async_read(
peer_request const& r
@ -249,8 +224,6 @@ namespace libtorrent
void async_hash(int piece, boost::function<void(int, disk_io_job const&)> const& f);
fs::path save_path() const;
void async_release_files(
boost::function<void(int, disk_io_job const&)> const& handler
= boost::function<void(int, disk_io_job const&)>());
@ -262,12 +235,44 @@ namespace libtorrent
void async_move_storage(fs::path const& p
, boost::function<void(int, disk_io_job const&)> const& handler);
// fills the vector that maps all allocated
// slots to the piece that is stored (or
// partially stored) there. -2 is the index
// of unassigned pieces and -1 is unallocated
void export_piece_map(std::vector<int>& pieces
, std::vector<bool> const& have) const;
enum return_t
{
// return values from check_fastresume and check_files
no_error = 0,
need_full_check = -1,
fatal_disk_error = -2,
};
private:
fs::path save_path() const;
bool verify_resume_data(entry const& rd, std::string& error)
{ return m_storage->verify_resume_data(rd, error); }
bool is_allocating() const
{ return m_state == state_expand_pieces; }
void mark_failed(int index);
std::string const& error() const { return m_storage->error(); }
void clear_error() { m_storage->clear_error(); }
int slot_for(int piece) const;
int piece_for(int slot) const;
// helper functions for check_dastresume
int check_no_fastresume(std::string& error);
int check_init_storage(std::string& error);
// if error is set and return value is 'no_error' or 'need_full_check'
// the error message indicates that the fast resume data was rejected
// if 'fatal_disk_error' is returned, the error message indicates what
// when wrong in the disk access
int check_fastresume(entry const& rd, std::string& error);
// this function returns true if the checking is complete
int check_files(int& current_slot, int& have_piece, std::string& error);
bool compact_allocation() const
{ return m_storage_mode == storage_mode_compact; }
@ -276,18 +281,8 @@ namespace libtorrent
std::string name() const { return m_info->name(); }
#endif
private:
bool allocate_slots(int num_slots, bool abort_on_disk = false);
int identify_data(
const std::vector<char>& piece_data
, int current_slot
, std::vector<bool>& have_pieces
, int& num_pieces
, const std::multimap<sha1_hash, int>& hash_to_piece
, boost::recursive_mutex& mutex);
size_type read_impl(
char* buf
, int piece_index
@ -300,8 +295,10 @@ namespace libtorrent
, int offset
, int size);
bool check_one_piece(std::vector<bool>& pieces, int& num_pieces
, boost::recursive_mutex& mutex);
bool check_one_piece(int& have_piece);
int identify_data(
const std::vector<char>& piece_data
, int current_slot);
void switch_to_full_mode();
sha1_hash hash_for_piece_impl(int piece);
@ -357,8 +354,6 @@ namespace libtorrent
state_none,
// the file checking is complete
state_finished,
// creating the directories
state_create_files,
// checking the files
state_full_check,
// move pieces to their final position
@ -403,9 +398,6 @@ namespace libtorrent
// the piece_manager destructs. This is because
// the torrent_info object is owned by the torrent.
boost::shared_ptr<void> m_torrent;
#ifndef NDEBUG
bool m_resume_data_verified;
#endif
};
}

View File

@ -98,20 +98,19 @@ namespace libtorrent
torrent(
aux::session_impl& ses
, aux::checker_impl& checker
, boost::intrusive_ptr<torrent_info> tf
, fs::path const& save_path
, tcp::endpoint const& net_interface
, storage_mode_t m_storage_mode
, int block_size
, storage_constructor_type sc
, bool paused);
, bool paused
, entry const& resume_data);
// used with metadata-less torrents
// (the metadata is downloaded from the peers)
torrent(
aux::session_impl& ses
, aux::checker_impl& checker
, char const* tracker_url
, sha1_hash const& info_hash
, char const* name
@ -120,7 +119,8 @@ namespace libtorrent
, storage_mode_t m_storage_mode
, int block_size
, storage_constructor_type sc
, bool paused);
, bool paused
, entry const& resume_data);
~torrent();
@ -142,6 +142,12 @@ namespace libtorrent
// it will initialize the storage and the piece-picker
void init();
void on_resume_data_checked(int ret, disk_io_job const& j);
void on_piece_checked(int ret, disk_io_job const& j);
void files_checked();
void start_checking();
storage_mode_t storage_mode() const { return m_storage_mode; }
// this will flag the torrent as aborted. The main
// loop in session_impl will check for this state
// on all torrents once every second, and take
@ -149,19 +155,12 @@ namespace libtorrent
void abort();
bool is_aborted() const { return m_abort; }
// returns true if this torrent is being allocated
// by the checker thread.
bool is_allocating() const;
session_settings const& settings() const;
aux::session_impl& session() { return m_ses; }
void set_sequential_download(bool sd);
bool verify_resume_data(entry const& rd, std::string& error)
{ TORRENT_ASSERT(m_storage); return m_storage->verify_resume_data(rd, error); }
void second_tick(stat& accumulator, float tick_interval);
// debug purpose only
@ -169,11 +168,6 @@ namespace libtorrent
std::string name() const;
bool check_fastresume(aux::piece_checker_data&);
std::pair<bool, float> check_files(bool& error);
void files_checked(std::vector<piece_picker::downloading_piece> const&
unfinished_pieces);
stat statistics() const { return m_stat; }
size_type bytes_left() const;
boost::tuples::tuple<size_type, size_type> bytes_done() const;
@ -705,7 +699,6 @@ namespace libtorrent
// a back reference to the session
// this torrent belongs to.
aux::session_impl& m_ses;
aux::checker_impl& m_checker;
boost::scoped_ptr<piece_picker> m_picker;
@ -768,6 +761,12 @@ namespace libtorrent
// determines the storage state for this torrent.
storage_mode_t m_storage_mode;
// the state of this torrent (queued, checking, downloading)
torrent_status::state_t m_state;
float m_progress;
entry m_resume_data;
// defaults to 16 kiB, but can be set by the user
// when creating the torrent
const int m_default_block_size;

View File

@ -278,7 +278,7 @@ namespace libtorrent
friend struct aux::session_impl;
friend class torrent;
torrent_handle(): m_ses(0), m_chk(0), m_info_hash(0) {}
torrent_handle(): m_ses(0), m_info_hash(0) {}
void get_peer_info(std::vector<peer_info>& v) const;
bool send_chat_message(tcp::endpoint ip, std::string message) const;
@ -418,15 +418,12 @@ namespace libtorrent
private:
torrent_handle(aux::session_impl* s,
aux::checker_impl* c,
const sha1_hash& h)
torrent_handle(aux::session_impl* s
, const sha1_hash& h)
: m_ses(s)
, m_chk(c)
, m_info_hash(h)
{
TORRENT_ASSERT(m_ses != 0);
TORRENT_ASSERT(m_chk != 0);
}
#ifndef NDEBUG
@ -434,7 +431,6 @@ namespace libtorrent
#endif
aux::session_impl* m_ses;
aux::checker_impl* m_chk;
sha1_hash m_info_hash;
};

View File

@ -73,31 +73,6 @@ namespace libtorrent
TORRENT_ASSERT(m_abort == true);
}
#ifndef NDEBUG
disk_io_job disk_io_thread::find_job(boost::intrusive_ptr<piece_manager> s
, int action, int piece) const
{
mutex_t::scoped_lock l(m_mutex);
for (std::list<disk_io_job>::const_iterator i = m_jobs.begin();
i != m_jobs.end(); ++i)
{
if (i->storage != s)
continue;
if ((i->action == action || action == -1) && i->piece == piece)
return *i;
}
if ((m_current.action == action || action == -1)
&& m_current.piece == piece)
return m_current;
disk_io_job ret;
ret.action = (disk_io_job::action_t)-1;
ret.piece = -1;
return ret;
}
#endif
void disk_io_thread::join()
{
mutex_t::scoped_lock l(m_mutex);
@ -735,19 +710,14 @@ namespace libtorrent
m_log << log_time() << " idle" << std::endl;
#endif
mutex_t::scoped_lock l(m_mutex);
#ifndef NDEBUG
m_current.action = (disk_io_job::action_t)-1;
m_current.piece = -1;
#endif
while (m_jobs.empty() && !m_abort)
m_signal.wait(l);
if (m_abort && m_jobs.empty()) return;
boost::function<void(int, disk_io_job const&)> handler;
handler.swap(m_jobs.front().callback);
#ifndef NDEBUG
m_current = m_jobs.front();
#endif
disk_io_job j = m_jobs.front();
m_jobs.pop_front();
m_queue_buffer_size -= j.buffer_size;
@ -876,8 +846,8 @@ namespace libtorrent
j.storage->clear_error();
break;
}
j.str.resize(20);
std::memcpy(&j.str[0], &h[0], 20);
ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-1;
if (ret == -1) j.storage->mark_failed(j.piece);
break;
}
case disk_io_job::move_storage:
@ -950,6 +920,47 @@ namespace libtorrent
}
break;
}
case disk_io_job::check_fastresume:
{
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " check fastresume" << std::endl;
#endif
entry const* rd = (entry const*)j.buffer;
TORRENT_ASSERT(rd != 0);
ret = j.storage->check_fastresume(*rd, j.str);
break;
}
case disk_io_job::check_files:
{
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " check files" << std::endl;
#endif
int piece_size = j.storage->info()->piece_length();
for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
{
ret = j.storage->check_files(j.piece, j.offset, j.str);
#ifndef BOOST_NO_EXCEPTIONS
try {
#endif
TORRENT_ASSERT(handler);
if (handler && ret == piece_manager::need_full_check)
m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception&) {}
#endif
if (ret != piece_manager::need_full_check) break;
}
// if the check is not done, add it at the end of the job queue
if (ret == piece_manager::need_full_check)
{
mutex_t::scoped_lock l(m_mutex);
m_jobs.push_back(j);
m_jobs.back().callback.swap(handler);
continue;
}
break;
}
}
}
#ifndef BOOST_NO_EXCEPTIONS
@ -973,12 +984,6 @@ namespace libtorrent
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception&) {}
#endif
#ifndef NDEBUG
m_current.storage = 0;
m_current.callback.clear();
#endif
}
TORRENT_ASSERT(false);
}

View File

@ -122,10 +122,7 @@ namespace libtorrent
}
// pieces is a bitmask with the pieces we have
void piece_picker::files_checked(
std::vector<bool> const& pieces
, std::vector<downloading_piece> const& unfinished
, std::vector<int>& verify_pieces)
void piece_picker::init(std::vector<bool> const& pieces)
{
TORRENT_PIECE_PICKER_INVARIANT_CHECK;
#ifndef NDEBUG
@ -153,25 +150,6 @@ namespace libtorrent
p.index = 0;
}
}
// if we have fast resume info
// use it
if (!unfinished.empty())
{
for (std::vector<downloading_piece>::const_iterator i
= unfinished.begin(); i != unfinished.end(); ++i)
{
for (int j = 0; j < m_blocks_per_piece; ++j)
{
if (i->info[j].state == block_info::state_finished)
mark_as_finished(piece_block(i->index, j), 0);
}
if (is_piece_finished(i->index))
{
verify_pieces.push_back(i->index);
}
}
}
}
void piece_picker::piece_info(int index, piece_picker::downloading_piece& st) const

View File

@ -116,362 +116,8 @@ namespace detail
}
} namespace aux {
// This is the checker thread
// it is looping in an infinite loop
// until the session is aborted. It will
// normally just block in a wait() call,
// waiting for a signal from session that
// there's a new torrent to check.
void checker_impl::operator()()
{
eh_initializer();
// if we're currently performing a full file check,
// this is the torrent being processed
boost::shared_ptr<piece_checker_data> processing;
boost::shared_ptr<piece_checker_data> t;
for (;;)
{
// temporary torrent used while checking fastresume data
t.reset();
{
boost::mutex::scoped_lock l(m_mutex);
INVARIANT_CHECK;
// if the job queue is empty and
// we shouldn't abort
// wait for a signal
while (m_torrents.empty() && !m_abort && !processing)
m_cond.wait(l);
if (m_abort)
{
// no lock is needed here, because the main thread
// has already been shut down by now
processing.reset();
t.reset();
std::for_each(m_torrents.begin(), m_torrents.end()
, boost::bind(&torrent::abort
, boost::bind(&shared_ptr<torrent>::get
, boost::bind(&piece_checker_data::torrent_ptr, _1))));
m_torrents.clear();
std::for_each(m_processing.begin(), m_processing.end()
, boost::bind(&torrent::abort
, boost::bind(&shared_ptr<torrent>::get
, boost::bind(&piece_checker_data::torrent_ptr, _1))));
m_processing.clear();
return;
}
if (!m_torrents.empty())
{
t = m_torrents.front();
if (t->abort)
{
// make sure the locking order is
// consistent to avoid dead locks
// we need to lock the session because closing
// torrents assume to have access to it
l.unlock();
session_impl::mutex_t::scoped_lock l2(m_ses.m_mutex);
l.lock();
t->torrent_ptr->abort();
m_torrents.pop_front();
continue;
}
}
}
if (t)
{
std::string error_msg;
t->parse_resume_data(t->resume_data, t->torrent_ptr->torrent_file()
, error_msg);
// lock the session to add the new torrent
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
{
m_ses.m_alerts.post_alert(fastresume_rejected_alert(
t->torrent_ptr->get_handle()
, error_msg));
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_ses.m_logger) << "fastresume data for "
<< t->torrent_ptr->torrent_file().name() << " rejected: "
<< error_msg << "\n";
#endif
}
mutex::scoped_lock l2(m_mutex);
if (m_torrents.empty() || m_torrents.front() != t)
{
// this means the torrent was removed right after it was
// added. Abort the checking.
t.reset();
continue;
}
// clear the resume data now that it has been used
// (the fast resume data is now parsed and stored in t)
t->resume_data = entry();
bool up_to_date = t->torrent_ptr->check_fastresume(*t);
if (up_to_date)
{
INVARIANT_CHECK;
TORRENT_ASSERT(!m_torrents.empty());
TORRENT_ASSERT(m_torrents.front() == t);
t->torrent_ptr->files_checked(t->unfinished_pieces);
m_torrents.pop_front();
// we cannot add the torrent if the session is aborted.
if (!m_ses.is_aborted())
{
m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
if (m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_checked_alert(
processing->torrent_ptr->get_handle()
, "torrent finished checking"));
}
if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
t->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = t->peers.begin();
i != t->peers.end(); ++i)
{
t->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
}
for (std::vector<tcp::endpoint>::const_iterator i = t->banned_peers.begin();
i != t->banned_peers.end(); ++i)
{
policy::peer* p = t->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
if (p) p->banned = true;
}
}
else
{
t->torrent_ptr->abort();
}
t.reset();
continue;
}
l.unlock();
// move the torrent from
// m_torrents to m_processing
TORRENT_ASSERT(m_torrents.front() == t);
m_torrents.pop_front();
m_processing.push_back(t);
if (!processing)
{
processing = t;
processing->processing = true;
t.reset();
}
}
if (!processing) continue;
TORRENT_ASSERT(processing);
bool finished = false;
bool error = false;
float progress = 0.f;
boost::tie(finished, progress) = processing->torrent_ptr->check_files(error);
if (error)
{
// This will happen if the storage fails to initialize
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
mutex::scoped_lock l2(m_mutex);
if (!m_processing.empty()
&& m_processing.front() == processing)
m_processing.pop_front();
processing.reset();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
continue;
}
{
mutex::scoped_lock l2(m_mutex);
INVARIANT_CHECK;
processing->progress = progress;
if (processing->abort)
{
TORRENT_ASSERT(!m_processing.empty());
TORRENT_ASSERT(m_processing.front() == processing);
m_processing.pop_front();
// make sure the lock order is correct
l2.unlock();
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
l2.lock();
processing->torrent_ptr->abort();
processing.reset();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
continue;
}
}
if (finished)
{
// lock the session to add the new torrent
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
mutex::scoped_lock l2(m_mutex);
INVARIANT_CHECK;
TORRENT_ASSERT(!m_processing.empty());
TORRENT_ASSERT(m_processing.front() == processing);
// TODO: factor out the adding of torrents to the session
// and to the checker thread to avoid duplicating the
// check for abortion.
if (!m_ses.is_aborted())
{
processing->torrent_ptr->files_checked(processing->unfinished_pieces);
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
if (m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_checked_alert(
processing->torrent_ptr->get_handle()
, "torrent finished checking"));
}
if (processing->torrent_ptr->is_seed()
&& m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
processing->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = processing->peers.begin();
i != processing->peers.end(); ++i)
{
processing->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
}
for (std::vector<tcp::endpoint>::const_iterator i = processing->banned_peers.begin();
i != processing->banned_peers.end(); ++i)
{
policy::peer* p = processing->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
if (p) p->banned = true;
}
}
else
{
processing->torrent_ptr->abort();
}
processing.reset();
m_processing.pop_front();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
}
}
}
aux::piece_checker_data* checker_impl::find_torrent(sha1_hash const& info_hash)
{
INVARIANT_CHECK;
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_torrents.begin(); i != m_torrents.end(); ++i)
{
if ((*i)->info_hash == info_hash) return i->get();
}
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_processing.begin(); i != m_processing.end(); ++i)
{
if ((*i)->info_hash == info_hash) return i->get();
}
return 0;
}
void checker_impl::remove_torrent(sha1_hash const& info_hash, int options)
{
INVARIANT_CHECK;
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_torrents.begin(); i != m_torrents.end(); ++i)
{
if ((*i)->info_hash == info_hash)
{
TORRENT_ASSERT((*i)->processing == false);
if (options & session::delete_files)
(*i)->torrent_ptr->delete_files();
m_torrents.erase(i);
return;
}
}
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_processing.begin(); i != m_processing.end(); ++i)
{
if ((*i)->info_hash == info_hash)
{
TORRENT_ASSERT((*i)->processing == false);
if (options & session::delete_files)
(*i)->torrent_ptr->delete_files();
m_processing.erase(i);
return;
}
}
TORRENT_ASSERT(false);
}
#ifndef NDEBUG
void checker_impl::check_invariant() const
{
for (std::deque<boost::shared_ptr<piece_checker_data> >::const_iterator i
= m_torrents.begin(); i != m_torrents.end(); ++i)
{
TORRENT_ASSERT(*i);
TORRENT_ASSERT((*i)->torrent_ptr);
}
for (std::deque<boost::shared_ptr<piece_checker_data> >::const_iterator i
= m_processing.begin(); i != m_processing.end(); ++i)
{
TORRENT_ASSERT(*i);
TORRENT_ASSERT((*i)->torrent_ptr);
}
}
#endif
namespace aux {
struct seed_random_generator
{
@ -524,7 +170,6 @@ namespace detail
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
, m_logpath(logpath)
#endif
, m_checker_impl(*this)
{
#ifdef WIN32
// windows XP has a limit on the number of
@ -596,7 +241,6 @@ namespace detail
bind(&session_impl::second_tick, this, _1));
m_thread.reset(new boost::thread(boost::ref(*this)));
m_checker_thread.reset(new boost::thread(boost::ref(m_checker_impl)));
}
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -711,10 +355,6 @@ namespace detail
m_download_channel.close();
m_upload_channel.close();
mutex::scoped_lock l2(m_checker_impl.m_mutex);
// abort the checker thread
m_checker_impl.m_abort = true;
}
void session_impl::set_port_filter(port_filter const& f)
@ -1077,25 +717,6 @@ namespace detail
#endif
};
/*
namespace
{
struct compare_peer_ptr
{
bool operator()(peer_connection const* lhs
, intrusive_ptr<peer_connection const> const& rhs)
{
return lhs < rhs.get();
}
bool operator()(intrusive_ptr<peer_connection const> const& lhs
, peer_connection const* rhs)
{
return lhs.get() < rhs;
}
};
}
*/
void session_impl::close_connection(peer_connection const* p
, char const* message)
{
@ -1647,40 +1268,21 @@ namespace detail
std::vector<torrent_handle> session_impl::get_torrents()
{
mutex_t::scoped_lock l(m_mutex);
mutex::scoped_lock l2(m_checker_impl.m_mutex);
std::vector<torrent_handle> ret;
for (std::deque<boost::shared_ptr<aux::piece_checker_data> >::iterator i
= m_checker_impl.m_torrents.begin()
, end(m_checker_impl.m_torrents.end()); i != end; ++i)
{
if ((*i)->abort) continue;
ret.push_back(torrent_handle(this, &m_checker_impl
, (*i)->info_hash));
}
for (std::deque<boost::shared_ptr<aux::piece_checker_data> >::iterator i
= m_checker_impl.m_processing.begin()
, end(m_checker_impl.m_processing.end()); i != end; ++i)
{
if ((*i)->abort) continue;
ret.push_back(torrent_handle(this, &m_checker_impl
, (*i)->info_hash));
}
for (session_impl::torrent_map::iterator i
= m_torrents.begin(), end(m_torrents.end());
i != end; ++i)
{
if (i->second->is_aborted()) continue;
ret.push_back(torrent_handle(this, &m_checker_impl
, i->first));
ret.push_back(torrent_handle(this, i->first));
}
return ret;
}
torrent_handle session_impl::find_torrent_handle(sha1_hash const& info_hash)
{
return torrent_handle(this, &m_checker_impl, info_hash);
return torrent_handle(this, info_hash);
}
torrent_handle session_impl::add_torrent(
@ -1699,7 +1301,6 @@ namespace detail
// lock the session and the checker thread (the order is important!)
mutex_t::scoped_lock l(m_mutex);
mutex::scoped_lock l2(m_checker_impl.m_mutex);
// INVARIANT_CHECK;
@ -1710,17 +1311,13 @@ namespace detail
if (!find_torrent(ti->info_hash()).expired())
throw duplicate_torrent();
// is the torrent currently being checked?
if (m_checker_impl.find_torrent(ti->info_hash()))
throw duplicate_torrent();
// create the torrent and the data associated with
// the checker thread and store it before starting
// the thread
boost::shared_ptr<torrent> torrent_ptr(
new torrent(*this, m_checker_impl, ti, save_path
new torrent(*this, ti, save_path
, m_listen_interface, storage_mode, 16 * 1024
, sc, paused));
, sc, paused, resume_data));
torrent_ptr->start();
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -1732,13 +1329,6 @@ namespace detail
}
#endif
boost::shared_ptr<aux::piece_checker_data> d(
new aux::piece_checker_data);
d->torrent_ptr = torrent_ptr;
d->save_path = save_path;
d->info_hash = ti->info_hash();
d->resume_data = resume_data;
#ifndef TORRENT_DISABLE_DHT
if (m_dht)
{
@ -1750,13 +1340,23 @@ namespace detail
}
#endif
// add the torrent to the queue to be checked
m_checker_impl.m_torrents.push_back(d);
// and notify the thread that it got another
// job in its queue
m_checker_impl.m_cond.notify_one();
m_torrents.insert(std::make_pair(ti->info_hash(), torrent_ptr));
return torrent_handle(this, &m_checker_impl, ti->info_hash());
return torrent_handle(this, ti->info_hash());
}
void session_impl::check_torrent(boost::shared_ptr<torrent> const& t)
{
if (m_queued_for_checking.empty()) t->start_checking();
m_queued_for_checking.push_back(t);
}
void session_impl::done_checking(boost::shared_ptr<torrent> const& t)
{
TORRENT_ASSERT(m_queued_for_checking.front() == t);
m_queued_for_checking.pop_front();
if (!m_queued_for_checking.empty())
m_queued_for_checking.front()->start_checking();
}
torrent_handle session_impl::add_torrent(
@ -1764,7 +1364,7 @@ namespace detail
, sha1_hash const& info_hash
, char const* name
, fs::path const& save_path
, entry const&
, entry const& resume_data
, storage_mode_t storage_mode
, storage_constructor_type sc
, bool paused
@ -1773,14 +1373,6 @@ namespace detail
// TODO: support resume data in this case
TORRENT_ASSERT(!save_path.empty());
{
// lock the checker_thread
mutex::scoped_lock l(m_checker_impl.m_mutex);
// is the torrent currently being checked?
if (m_checker_impl.find_torrent(info_hash))
throw duplicate_torrent();
}
// lock the session
session_impl::mutex_t::scoped_lock l(m_mutex);
@ -1789,7 +1381,11 @@ namespace detail
// is the torrent already active?
if (!find_torrent(info_hash).expired())
#ifndef BOOST_NO_EXCEPTIONS
throw duplicate_torrent();
#else
return torrent_handle();
#endif
// you cannot add new torrents to a session that is closing down
TORRENT_ASSERT(!is_aborted());
@ -1798,9 +1394,9 @@ namespace detail
// the checker thread and store it before starting
// the thread
boost::shared_ptr<torrent> torrent_ptr(
new torrent(*this, m_checker_impl, tracker_url, info_hash, name
new torrent(*this, tracker_url, info_hash, name
, save_path, m_listen_interface, storage_mode, 16 * 1024
, sc, paused));
, sc, paused, resume_data));
torrent_ptr->start();
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -1812,16 +1408,14 @@ namespace detail
}
#endif
m_torrents.insert(
std::make_pair(info_hash, torrent_ptr)).first;
m_torrents.insert(std::make_pair(info_hash, torrent_ptr));
return torrent_handle(this, &m_checker_impl, info_hash);
return torrent_handle(this, info_hash);
}
void session_impl::remove_torrent(const torrent_handle& h, int options)
{
if (h.m_ses != this) return;
TORRENT_ASSERT(h.m_chk == &m_checker_impl || h.m_chk == 0);
TORRENT_ASSERT(h.m_ses != 0);
mutex_t::scoped_lock l(m_mutex);
@ -1871,19 +1465,6 @@ namespace detail
TORRENT_ASSERT(m_torrents.find(i_hash) == m_torrents.end());
return;
}
if (h.m_chk)
{
mutex::scoped_lock l(m_checker_impl.m_mutex);
aux::piece_checker_data* d = m_checker_impl.find_torrent(h.m_info_hash);
if (d != 0)
{
if (d->processing) d->abort = true;
else m_checker_impl.remove_torrent(h.m_info_hash, options);
return;
}
}
}
bool session_impl::listen_on(
@ -2182,31 +1763,6 @@ namespace detail
TORRENT_ASSERT(m_torrents.empty());
// it's important that the main thread is closed completely before
// the checker thread is terminated. Because all the connections
// have to be closed and removed from the torrents before they
// can be destructed. (because the weak pointers in the
// peer_connections will be invalidated when the torrents are
// destructed and then the invariant will be broken).
{
mutex::scoped_lock l(m_checker_impl.m_mutex);
// abort the checker thread
m_checker_impl.m_abort = true;
// abort the currently checking torrent
if (!m_checker_impl.m_torrents.empty())
{
m_checker_impl.m_torrents.front()->abort = true;
}
m_checker_impl.m_cond.notify_one();
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << time_now_string() << " waiting for checker thread\n";
#endif
m_checker_thread->join();
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << time_now_string() << " waiting for disk io thread\n";
#endif
@ -2476,210 +2032,5 @@ namespace detail
}
#endif
void piece_checker_data::parse_resume_data(
const entry& resume_data
, const torrent_info& info
, std::string& error)
{
// if we don't have any resume data, return
if (resume_data.type() == entry::undefined_t) return;
entry rd = resume_data;
try
{
if (rd["file-format"].string() != "libtorrent resume file")
{
error = "missing file format tag";
return;
}
if (rd["file-version"].integer() > 1)
{
error = "incompatible file version "
+ boost::lexical_cast<std::string>(rd["file-version"].integer());
return;
}
// verify info_hash
sha1_hash hash = rd["info-hash"].string();
if (hash != info.info_hash())
{
error = "mismatching info-hash: " + boost::lexical_cast<std::string>(hash);
return;
}
// the peers
if (entry* peers_entry = rd.find_key("peers"))
{
entry::list_type& peer_list = peers_entry->list();
std::vector<tcp::endpoint> tmp_peers;
tmp_peers.reserve(peer_list.size());
for (entry::list_type::iterator i = peer_list.begin();
i != peer_list.end(); ++i)
{
tcp::endpoint a(
address::from_string((*i)["ip"].string())
, (unsigned short)(*i)["port"].integer());
tmp_peers.push_back(a);
}
peers.swap(tmp_peers);
}
if (entry* banned_peers_entry = rd.find_key("banned_peers"))
{
entry::list_type& peer_list = banned_peers_entry->list();
std::vector<tcp::endpoint> tmp_peers;
tmp_peers.reserve(peer_list.size());
for (entry::list_type::iterator i = peer_list.begin();
i != peer_list.end(); ++i)
{
tcp::endpoint a(
address::from_string((*i)["ip"].string())
, (unsigned short)(*i)["port"].integer());
tmp_peers.push_back(a);
}
banned_peers.swap(tmp_peers);
}
// read piece map
const entry::list_type& slots = rd["slots"].list();
if ((int)slots.size() > info.num_pieces())
{
error = "file has more slots than torrent (slots: "
+ boost::lexical_cast<std::string>(slots.size()) + " size: "
+ boost::lexical_cast<std::string>(info.num_pieces()) + " )";
return;
}
std::vector<int> tmp_pieces;
tmp_pieces.reserve(slots.size());
for (entry::list_type::const_iterator i = slots.begin();
i != slots.end(); ++i)
{
int index = (int)i->integer();
if (index >= info.num_pieces() || index < -2)
{
error = "too high index number in slot map (index: "
+ boost::lexical_cast<std::string>(index) + " size: "
+ boost::lexical_cast<std::string>(info.num_pieces()) + ")";
return;
}
tmp_pieces.push_back(index);
}
// only bother to check the partial pieces if we have the same block size
// as in the fast resume data. If the blocksize has changed, then throw
// away all partial pieces.
std::vector<piece_picker::downloading_piece> tmp_unfinished;
int num_blocks_per_piece = (int)rd["blocks per piece"].integer();
if (num_blocks_per_piece == info.piece_length() / torrent_ptr->block_size())
{
// the unfinished pieces
entry::list_type& unfinished = rd["unfinished"].list();
int unfinished_size = int(unfinished.size());
block_info.resize(num_blocks_per_piece * unfinished_size);
tmp_unfinished.reserve(unfinished_size);
int index = 0;
for (entry::list_type::iterator i = unfinished.begin();
i != unfinished.end(); ++i, ++index)
{
piece_picker::downloading_piece p;
p.info = &block_info[index * num_blocks_per_piece];
p.index = (int)(*i)["piece"].integer();
if (p.index < 0 || p.index >= info.num_pieces())
{
error = "invalid piece index in unfinished piece list (index: "
+ boost::lexical_cast<std::string>(p.index) + " size: "
+ boost::lexical_cast<std::string>(info.num_pieces()) + ")";
return;
}
const std::string& bitmask = (*i)["bitmask"].string();
const int num_bitmask_bytes = (std::max)(num_blocks_per_piece / 8, 1);
if ((int)bitmask.size() != num_bitmask_bytes)
{
error = "invalid size of bitmask (" + boost::lexical_cast<std::string>(bitmask.size()) + ")";
return;
}
for (int j = 0; j < num_bitmask_bytes; ++j)
{
unsigned char bits = bitmask[j];
int num_bits = (std::min)(num_blocks_per_piece - j*8, 8);
for (int k = 0; k < num_bits; ++k)
{
const int bit = j * 8 + k;
if (bits & (1 << k))
{
p.info[bit].state = piece_picker::block_info::state_finished;
++p.finished;
}
}
}
if (p.finished == 0) continue;
std::vector<int>::iterator slot_iter
= std::find(tmp_pieces.begin(), tmp_pieces.end(), p.index);
if (slot_iter == tmp_pieces.end())
{
// this piece is marked as unfinished
// but doesn't have any storage
error = "piece " + boost::lexical_cast<std::string>(p.index) + " is "
"marked as unfinished, but doesn't have any storage";
return;
}
TORRENT_ASSERT(*slot_iter == p.index);
int slot_index = static_cast<int>(slot_iter - tmp_pieces.begin());
const entry* ad = i->find_key("adler32");
if (ad && ad->type() == entry::int_t)
{
unsigned long adler
= torrent_ptr->filesystem().piece_crc(
slot_index
, torrent_ptr->block_size()
, p.info);
// crc's didn't match, don't use the resume data
if (ad->integer() != entry::integer_type(adler))
{
error = "checksum mismatch on piece "
+ boost::lexical_cast<std::string>(p.index);
return;
}
}
tmp_unfinished.push_back(p);
}
}
if (!torrent_ptr->verify_resume_data(rd, error))
return;
piece_map.swap(tmp_pieces);
unfinished_pieces.swap(tmp_unfinished);
}
catch (invalid_encoding&)
{
return;
}
catch (type_error&)
{
return;
}
catch (file_error&)
{
return;
}
}
}}

View File

@ -462,7 +462,12 @@ namespace libtorrent
#endif
file(m_save_path / file_iter->path, file::out);
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception&) {}
}
catch (std::exception& e)
{
m_error = e.what();
return true;
}
#endif
continue;
}
@ -480,7 +485,12 @@ namespace libtorrent
f->set_size(file_iter->size);
}
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception&) {}
}
catch (std::exception& e)
{
m_error = e.what();
return true;
}
#endif
}
// close files that were opened in write mode
@ -1094,9 +1104,10 @@ namespace libtorrent
, fs::path const& save_path
, file_pool& fp
, disk_io_thread& io
, storage_constructor_type sc)
, storage_constructor_type sc
, storage_mode_t sm)
: m_storage(sc(ti, save_path, fp))
, m_storage_mode(storage_mode_sparse)
, m_storage_mode(sm)
, m_info(ti)
, m_save_path(complete(save_path))
, m_state(state_none)
@ -1107,9 +1118,6 @@ namespace libtorrent
, m_io_thread(io)
, m_torrent(torrent)
{
#ifndef NDEBUG
m_resume_data_verified = false;
#endif
}
piece_manager::~piece_manager()
@ -1149,6 +1157,26 @@ namespace libtorrent
m_io_thread.add_job(j, handler);
}
void piece_manager::async_check_fastresume(entry const* resume_data
, boost::function<void(int, disk_io_job const&)> const& handler)
{
TORRENT_ASSERT(resume_data != 0);
disk_io_job j;
j.storage = this;
j.action = disk_io_job::check_fastresume;
j.buffer = (char*)resume_data;
m_io_thread.add_job(j, handler);
}
void piece_manager::async_check_files(
boost::function<void(int, disk_io_job const&)> const& handler)
{
disk_io_job j;
j.storage = this;
j.action = disk_io_job::check_files;
m_io_thread.add_job(j, handler);
}
void piece_manager::async_read(
peer_request const& r
, boost::function<void(int, disk_io_job const&)> const& handler
@ -1238,45 +1266,44 @@ namespace libtorrent
return false;
}
void piece_manager::export_piece_map(
std::vector<int>& p, std::vector<bool> const& have) const
void piece_manager::write_resume_data(entry& rd
, std::vector<bool> const& have) const
{
boost::recursive_mutex::scoped_lock lock(m_mutex);
INVARIANT_CHECK;
m_storage->write_resume_data(rd);
entry::list_type& slots = rd["slots"].list();
if (m_storage_mode == storage_mode_compact)
{
p.clear();
p.reserve(m_info->num_pieces());
slots.clear();
std::vector<int>::const_reverse_iterator last;
for (last = m_slot_to_piece.rbegin();
last != m_slot_to_piece.rend(); ++last)
{
if (*last != unallocated && have[*last]) break;
if (*last != unallocated) break;
}
for (std::vector<int>::const_iterator i =
m_slot_to_piece.begin();
i != last.base(); ++i)
{
p.push_back((*i >= 0 && have[*i]) ? *i : unassigned);
slots.push_back((*i >= 0) ? *i : unassigned);
}
}
else
{
p.reserve(m_info->num_pieces());
for (int i = 0; i < m_info->num_pieces(); ++i)
{
p.push_back(have[i] ? i : unassigned);
slots.push_back(have[i] ? i : unassigned);
}
}
}
void piece_manager::mark_failed(int piece_index)
{
boost::recursive_mutex::scoped_lock lock(m_mutex);
INVARIANT_CHECK;
if (m_storage_mode != storage_mode_compact) return;
@ -1290,43 +1317,6 @@ namespace libtorrent
m_free_slots.push_back(slot_index);
}
unsigned long piece_manager::piece_crc(
int slot_index
, int block_size
, piece_picker::block_info const* bi)
{
TORRENT_ASSERT(slot_index >= 0);
TORRENT_ASSERT(slot_index < m_info->num_pieces());
TORRENT_ASSERT(block_size > 0);
adler32_crc crc;
std::vector<char> buf(block_size);
int num_blocks = static_cast<int>(m_info->piece_size(slot_index)) / block_size;
int last_block_size = static_cast<int>(m_info->piece_size(slot_index)) % block_size;
if (last_block_size == 0) last_block_size = block_size;
for (int i = 0; i < num_blocks-1; ++i)
{
if (bi[i].state != piece_picker::block_info::state_finished) continue;
m_storage->read(
&buf[0]
, slot_index
, i * block_size
, block_size);
crc.update(&buf[0], block_size);
}
if (num_blocks > 0 && bi[num_blocks - 1].state == piece_picker::block_info::state_finished)
{
m_storage->read(
&buf[0]
, slot_index
, block_size * (num_blocks - 1)
, last_block_size);
crc.update(&buf[0], last_block_size);
}
return crc.final();
}
size_type piece_manager::read_impl(
char* buf
, int piece_index
@ -1382,16 +1372,10 @@ namespace libtorrent
int piece_manager::identify_data(
const std::vector<char>& piece_data
, int current_slot
, std::vector<bool>& have_pieces
, int& num_pieces
, const std::multimap<sha1_hash, int>& hash_to_piece
, boost::recursive_mutex& mutex)
, int current_slot)
{
// INVARIANT_CHECK;
TORRENT_ASSERT((int)have_pieces.size() == m_info->num_pieces());
const int piece_size = static_cast<int>(m_info->piece_length());
const int last_piece_size = static_cast<int>(m_info->piece_size(
m_info->num_pieces() - 1));
@ -1421,8 +1405,8 @@ namespace libtorrent
map_iter end2;
// makes the lookups for the small digest and the large digest
boost::tie(begin1, end1) = hash_to_piece.equal_range(small_hash);
boost::tie(begin2, end2) = hash_to_piece.equal_range(large_hash);
boost::tie(begin1, end1) = m_hash_to_piece.equal_range(small_hash);
boost::tie(begin2, end2) = m_hash_to_piece.equal_range(large_hash);
// copy all potential piece indices into this vector
std::vector<int> matching_pieces;
@ -1448,15 +1432,11 @@ namespace libtorrent
// we will assume that the piece is in the right place
const int piece_index = current_slot;
// lock because we're writing to have_pieces
boost::recursive_mutex::scoped_lock l(mutex);
if (have_pieces[piece_index])
int other_slot = m_piece_to_slot[piece_index];
if (other_slot >= 0)
{
// we have already found a piece with
// this index.
int other_slot = m_piece_to_slot[piece_index];
TORRENT_ASSERT(other_slot >= 0);
// take one of the other matching pieces
// that hasn't already been assigned
@ -1464,18 +1444,15 @@ namespace libtorrent
for (std::vector<int>::iterator i = matching_pieces.begin();
i != matching_pieces.end(); ++i)
{
if (have_pieces[*i] || *i == piece_index) continue;
if (m_piece_to_slot[*i] >= 0 || *i == piece_index) continue;
other_piece = *i;
break;
}
if (other_piece >= 0)
{
// replace the old slot with 'other_piece'
TORRENT_ASSERT(have_pieces[other_piece] == false);
have_pieces[other_piece] = true;
m_slot_to_piece[other_slot] = other_piece;
m_piece_to_slot[other_piece] = other_slot;
++num_pieces;
}
else
{
@ -1491,19 +1468,9 @@ namespace libtorrent
TORRENT_ASSERT(m_piece_to_slot[piece_index] != current_slot);
TORRENT_ASSERT(m_piece_to_slot[piece_index] >= 0);
m_piece_to_slot[piece_index] = has_no_slot;
#ifndef NDEBUG
// to make the assert happy, a few lines down
have_pieces[piece_index] = false;
#endif
}
else
{
++num_pieces;
}
TORRENT_ASSERT(have_pieces[piece_index] == false);
TORRENT_ASSERT(m_piece_to_slot[piece_index] == has_no_slot);
have_pieces[piece_index] = true;
return piece_index;
}
@ -1514,21 +1481,14 @@ namespace libtorrent
for (std::vector<int>::iterator i = matching_pieces.begin();
i != matching_pieces.end(); ++i)
{
if (have_pieces[*i]) continue;
if (m_piece_to_slot[*i] >= 0) continue;
free_piece = *i;
break;
}
if (free_piece >= 0)
{
// lock because we're writing to have_pieces
boost::recursive_mutex::scoped_lock l(mutex);
TORRENT_ASSERT(have_pieces[free_piece] == false);
TORRENT_ASSERT(m_piece_to_slot[free_piece] == has_no_slot);
have_pieces[free_piece] = true;
++num_pieces;
return free_piece;
}
else
@ -1538,15 +1498,92 @@ namespace libtorrent
}
}
int piece_manager::check_no_fastresume(std::string& error)
{
torrent_info::file_iterator i = m_info->begin_files(true);
torrent_info::file_iterator end = m_info->end_files(true);
for (; i != end; ++i)
{
bool file_exists = false;
fs::path f = m_save_path / i->path;
#ifndef BOOST_NO_EXCEPTIONS
try
{
#endif
#if defined(_WIN32) && defined(UNICODE) && BOOST_VERSION < 103400
file_exists = exists_win(f);
#elif defined(_WIN32) && defined(UNICODE)
fs::wpath wf = safe_convert(f.string());
file_exists = exists(wf);
#else
file_exists = exists(f);
#endif
#ifndef BOOST_NO_EXCEPTIONS
}
catch (std::exception& e)
{
error = f.string();
error += ": ";
error += e.what();
return fatal_disk_error;
}
#endif
if (file_exists && i->size > 0)
{
m_state = state_full_check;
m_piece_to_slot.clear();
m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot);
m_slot_to_piece.clear();
m_slot_to_piece.resize(m_info->num_pieces(), unallocated);
return need_full_check;
}
}
if (m_storage_mode == storage_mode_compact)
{
// in compact mode without checking, we need to
// populate the unallocated list
TORRENT_ASSERT(m_unallocated_slots.empty());
for (int i = 0, end(m_info->num_pieces()); i < end; ++i)
m_unallocated_slots.push_back(i);
m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot);
m_slot_to_piece.resize(m_info->num_pieces(), unallocated);
}
return check_init_storage(error);
}
int piece_manager::check_init_storage(std::string& error)
{
if (m_storage->initialize(m_storage_mode == storage_mode_allocate))
{
error = m_storage->error();
m_storage->clear_error();
return fatal_disk_error;
}
m_state = state_finished;
buffer().swap(m_scratch_buffer);
buffer().swap(m_scratch_buffer2);
if (m_storage_mode != storage_mode_compact)
{
// if no piece is out of place
// since we're in full allocation mode, we can
// forget the piece allocation tables
std::vector<int>().swap(m_piece_to_slot);
std::vector<int>().swap(m_slot_to_piece);
std::vector<int>().swap(m_free_slots);
std::vector<int>().swap(m_unallocated_slots);
}
return no_error;
}
// check if the fastresume data is up to date
// if it is, use it and return true. If it
// isn't return false and the full check
// will be run
bool piece_manager::check_fastresume(
aux::piece_checker_data& data
, std::vector<bool>& pieces
, int& num_pieces, storage_mode_t storage_mode
, std::string& error_msg)
int piece_manager::check_fastresume(
entry const& rd, std::string& error)
{
boost::recursive_mutex::scoped_lock lock(m_mutex);
@ -1554,7 +1591,144 @@ namespace libtorrent
TORRENT_ASSERT(m_info->piece_length() > 0);
m_storage_mode = storage_mode;
// if we don't have any resume data, return
if (rd.type() == entry::undefined_t) return check_no_fastresume(error);
if (rd.type() != entry::dictionary_t)
{
error = "invalid fastresume data (not a bencoded dictionary)";
return check_no_fastresume(error);
}
entry const* file_format = rd.find_key("file-format");
if (file_format == 0 || file_format->type() != entry::string_t)
{
error = "missing file format tag";
return check_no_fastresume(error);
}
if (file_format->string() != "libtorrent resume file")
{
error = "invalid file format tag";
return check_no_fastresume(error);
}
entry const* info_hash = rd.find_key("info-hash");
if (info_hash == 0 || info_hash->type() != entry::string_t)
{
error = "missing info-hash";
return check_no_fastresume(error);
}
if (sha1_hash(info_hash->string()) != m_info->info_hash())
{
error = "mismatching info-hash";
return check_no_fastresume(error);
}
int block_size = (std::min)(16 * 1024, m_info->piece_length());
entry const* blocks_per_piece_ent = rd.find_key("blocks per piece");
if (blocks_per_piece_ent != 0
&& blocks_per_piece_ent->type() == entry::int_t
&& blocks_per_piece_ent->integer() != m_info->piece_length() / block_size)
{
error = "invalid 'blocks per piece' entry";
return check_no_fastresume(error);
}
storage_mode_t storage_mode = storage_mode_compact;
entry const* allocation = rd.find_key("allocation");
if (allocation != 0
&& allocation->type() == entry::string_t
&& allocation->string() != "compact")
storage_mode = storage_mode_sparse;
// read piece map
entry const* slots = rd.find_key("slots");
if (slots == 0 || slots->type() != entry::list_t)
{
error = "missing slot list";
return check_no_fastresume(error);
}
if ((int)slots->list().size() > m_info->num_pieces())
{
error = "file has more slots than torrent (slots: "
+ boost::lexical_cast<std::string>(slots->list().size()) + " size: "
+ boost::lexical_cast<std::string>(m_info->num_pieces()) + " )";
return check_no_fastresume(error);
}
// assume no piece is out of place (i.e. in a slot
// other than the one it should be in)
bool out_of_place = false;
if (storage_mode == storage_mode_compact)
{
int num_pieces = int(m_info->num_pieces());
m_slot_to_piece.resize(num_pieces, unallocated);
m_piece_to_slot.resize(num_pieces, has_no_slot);
int slot = 0;
for (entry::list_type::const_iterator i = slots->list().begin();
i != slots->list().end(); ++i, ++slot)
{
if (i->type() != entry::int_t)
{
error = "invalid entry type in slot list";
return check_no_fastresume(error);
}
int index = int(i->integer());
if (index >= num_pieces || index < -2)
{
error = "too high index number in slot map (index: "
+ boost::lexical_cast<std::string>(index) + " size: "
+ boost::lexical_cast<std::string>(num_pieces) + ")";
return check_no_fastresume(error);
}
if (index >= 0)
{
m_slot_to_piece[slot] = index;
m_piece_to_slot[index] = slot;
if (slot != index) out_of_place = true;
}
else if (index == unassigned)
{
if (m_storage_mode == storage_mode_compact)
m_free_slots.push_back(slot);
}
else
{
TORRENT_ASSERT(index == unallocated);
if (m_storage_mode == storage_mode_compact)
m_unallocated_slots.push_back(slot);
}
}
}
else
{
int slot = 0;
for (entry::list_type::const_iterator i = slots->list().begin();
i != slots->list().end(); ++i, ++slot)
{
if (i->type() != entry::int_t)
{
error = "invalid entry type in slot list";
return check_no_fastresume(error);
}
int index = int(i->integer());
if (index != slot)
{
error = "invalid slot index";
return check_no_fastresume(error);
}
}
}
if (!m_storage->verify_resume_data(rd, error))
return check_no_fastresume(error);
// This will corrupt the storage
// use while debugging to find
@ -1562,123 +1736,44 @@ namespace libtorrent
// by check_pieces.
// m_storage->shuffle();
m_piece_to_slot.resize(m_info->num_pieces(), has_no_slot);
m_slot_to_piece.resize(m_info->num_pieces(), unallocated);
if (m_storage_mode == storage_mode_compact)
{
if (m_unallocated_slots.empty()) switch_to_full_mode();
}
else
{
TORRENT_ASSERT(m_free_slots.empty());
TORRENT_ASSERT(m_unallocated_slots.empty());
// assume no piece is out of place (i.e. in a slot
// other than the one it should be in)
bool out_of_place = false;
pieces.clear();
pieces.resize(m_info->num_pieces(), false);
num_pieces = 0;
// if we have fast-resume info
// use it instead of doing the actual checking
if (!data.piece_map.empty()
&& int(data.piece_map.size()) <= m_info->num_pieces())
{
TORRENT_ASSERT(m_resume_data_verified);
for (int i = 0; i < (int)data.piece_map.size(); ++i)
{
m_slot_to_piece[i] = data.piece_map[i];
if (data.piece_map[i] >= 0)
{
if (data.piece_map[i] != i) out_of_place = true;
m_piece_to_slot[data.piece_map[i]] = i;
int found_piece = data.piece_map[i];
// if the piece is not in the unfinished list
// we have all of it
if (std::find_if(
data.unfinished_pieces.begin()
, data.unfinished_pieces.end()
, piece_picker::has_index(found_piece))
== data.unfinished_pieces.end())
{
++num_pieces;
pieces[found_piece] = true;
}
}
else if (data.piece_map[i] == unassigned)
{
if (m_storage_mode == storage_mode_compact)
m_free_slots.push_back(i);
}
else
{
TORRENT_ASSERT(data.piece_map[i] == unallocated);
if (m_storage_mode == storage_mode_compact)
m_unallocated_slots.push_back(i);
}
}
if (m_storage_mode == storage_mode_compact)
{
m_unallocated_slots.reserve(int(m_info->num_pieces() - data.piece_map.size()));
for (int i = (int)data.piece_map.size(); i < (int)m_info->num_pieces(); ++i)
{
m_unallocated_slots.push_back(i);
}
if (m_unallocated_slots.empty())
{
switch_to_full_mode();
}
}
else
{
if (!out_of_place)
{
// if no piece is out of place
// since we're in full allocation mode, we can
// forget the piece allocation tables
std::vector<int>().swap(m_piece_to_slot);
std::vector<int>().swap(m_slot_to_piece);
m_state = state_create_files;
return false;
}
else
if (out_of_place)
{
// in this case we're in full allocation mode, but
// we're resuming a compact allocated storage
m_state = state_expand_pieces;
m_current_slot = 0;
error_msg = "pieces needs to be reordered";
return false;
error = "pieces needs to be reordered";
return need_full_check;
}
}
m_state = state_create_files;
return false;
}
m_state = state_full_check;
return false;
return check_init_storage(error);
}
/*
state chart:
check_fastresume()
| |
| v
check_fastresume() ----------+
|
| | |
| v v
| +------------+ +---------------+
| | full_check |-->| expand_pieses |
| +------------+ +---------------+
| | |
| v |
| +--------------+ |
+->| create_files | <------+
+->| finished | <------+
+--------------+
|
v
+----------+
| finished |
+----------+
*/
@ -1688,23 +1783,12 @@ namespace libtorrent
// the second return value is the progress the
// file check is at. 0 is nothing done, and 1
// is finished
std::pair<bool, float> piece_manager::check_files(
std::vector<bool>& pieces, int& num_pieces, boost::recursive_mutex& mutex
, bool& error)
int piece_manager::check_files(int& current_slot, int& have_piece, std::string& error)
{
#ifndef NDEBUG
boost::recursive_mutex::scoped_lock l_(mutex);
TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true));
l_.unlock();
#endif
if (m_state == state_create_files)
{
m_storage->initialize(m_storage_mode == storage_mode_allocate);
m_state = state_finished;
return std::make_pair(true, 1.f);
}
TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_info->num_pieces());
current_slot = m_current_slot;
have_piece = -1;
if (m_state == state_expand_pieces)
{
INVARIANT_CHECK;
@ -1724,8 +1808,9 @@ namespace libtorrent
if (m_storage->read(&m_scratch_buffer2[0], piece, 0, piece_size)
!= piece_size)
{
error = true;
return std::make_pair(true, (float)m_current_slot / m_info->num_pieces());
error = m_storage->error();
m_storage->clear_error();
return fatal_disk_error;
}
m_scratch_piece = other_piece;
m_piece_to_slot[other_piece] = unassigned;
@ -1736,8 +1821,9 @@ namespace libtorrent
int piece_size = m_info->piece_size(piece);
if (m_storage->write(&m_scratch_buffer[0], piece, 0, piece_size) != piece_size)
{
error = true;
return std::make_pair(true, (float)m_current_slot / m_info->num_pieces());
error = m_storage->error();
m_storage->clear_error();
return fatal_disk_error;
}
m_piece_to_slot[piece] = piece;
m_slot_to_piece[piece] = piece;
@ -1745,7 +1831,7 @@ namespace libtorrent
if (other_piece >= 0)
m_scratch_buffer.swap(m_scratch_buffer2);
return std::make_pair(false, (float)m_current_slot / m_info->num_pieces());
return need_full_check;
}
while (m_current_slot < m_info->num_pieces()
@ -1757,15 +1843,7 @@ namespace libtorrent
if (m_current_slot == m_info->num_pieces())
{
m_state = state_create_files;
buffer().swap(m_scratch_buffer);
buffer().swap(m_scratch_buffer2);
if (m_storage_mode != storage_mode_compact)
{
std::vector<int>().swap(m_piece_to_slot);
std::vector<int>().swap(m_slot_to_piece);
}
return std::make_pair(false, 1.f);
return check_init_storage(error);
}
int piece = m_slot_to_piece[m_current_slot];
@ -1782,8 +1860,9 @@ namespace libtorrent
int piece_size = m_info->piece_size(other_piece);
if (m_storage->read(&m_scratch_buffer[0], piece, 0, piece_size) != piece_size)
{
error = true;
return std::make_pair(false, (float)m_current_slot / m_info->num_pieces());
error = m_storage->error();
m_storage->clear_error();
return fatal_disk_error;
}
m_scratch_piece = other_piece;
m_piece_to_slot[other_piece] = unassigned;
@ -1796,12 +1875,12 @@ namespace libtorrent
m_slot_to_piece[m_current_slot] = unassigned;
m_slot_to_piece[piece] = piece;
return std::make_pair(false, (float)m_current_slot / m_info->num_pieces());
return need_full_check;
}
TORRENT_ASSERT(m_state == state_full_check);
bool skip = check_one_piece(pieces, num_pieces, mutex);
bool skip = check_one_piece(have_piece);
if (skip)
{
@ -1837,6 +1916,7 @@ namespace libtorrent
}
++m_current_slot;
current_slot = m_current_slot;
if (m_current_slot >= m_info->num_pieces())
{
@ -1856,8 +1936,7 @@ namespace libtorrent
std::vector<int>().swap(m_piece_to_slot);
std::vector<int>().swap(m_slot_to_piece);
m_state = state_create_files;
return std::make_pair(false, 1.f);
return check_init_storage(error);
}
else
{
@ -1865,47 +1944,37 @@ namespace libtorrent
// we're resuming a compact allocated storage
m_state = state_expand_pieces;
m_current_slot = 0;
return std::make_pair(false, 0.f);
current_slot = m_current_slot;
return need_full_check;
}
}
else if (m_unallocated_slots.empty())
{
switch_to_full_mode();
}
m_state = state_create_files;
#ifndef NDEBUG
boost::recursive_mutex::scoped_lock l(mutex);
TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true));
#endif
return std::make_pair(false, 1.f);
return check_init_storage(error);
}
return need_full_check;
}
TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true));
return std::make_pair(false, (float)m_current_slot / m_info->num_pieces());
}
bool piece_manager::check_one_piece(std::vector<bool>& pieces, int& num_pieces
, boost::recursive_mutex& mutex)
bool piece_manager::check_one_piece(int& have_piece)
{
// ------------------------
// DO THE FULL CHECK
// ------------------------
TORRENT_ASSERT(int(m_piece_to_slot.size()) == m_info->num_pieces());
TORRENT_ASSERT(int(m_slot_to_piece.size()) == m_info->num_pieces());
TORRENT_ASSERT(have_piece == -1);
// initialization for the full check
if (m_hash_to_piece.empty())
{
for (int i = 0; i < m_info->num_pieces(); ++i)
{
m_hash_to_piece.insert(std::make_pair(m_info->hash_for_piece(i), i));
}
boost::recursive_mutex::scoped_lock l(mutex);
std::fill(pieces.begin(), pieces.end(), false);
num_pieces = 0;
}
m_piece_data.resize(m_info->piece_length());
m_piece_data.resize(int(m_info->piece_length()));
int piece_size = m_info->piece_size(m_current_slot);
int num_read = m_storage->read(&m_piece_data[0]
, m_current_slot, 0, piece_size);
@ -1914,14 +1983,14 @@ namespace libtorrent
if (num_read != piece_size)
return true;
int piece_index = identify_data(m_piece_data, m_current_slot
, pieces, num_pieces, m_hash_to_piece, mutex);
int piece_index = identify_data(m_piece_data, m_current_slot);
if (piece_index >= 0) have_piece = piece_index;
if (piece_index != m_current_slot
&& piece_index >= 0)
m_out_of_place = true;
TORRENT_ASSERT(num_pieces == std::count(pieces.begin(), pieces.end(), true));
TORRENT_ASSERT(piece_index == unassigned || piece_index >= 0);
const bool this_should_move = piece_index >= 0 && m_slot_to_piece[piece_index] != unallocated;
@ -2148,7 +2217,7 @@ namespace libtorrent
if (m_storage_mode != storage_mode_compact) return piece_index;
// INVARIANT_CHECK;
INVARIANT_CHECK;
TORRENT_ASSERT(piece_index >= 0);
TORRENT_ASSERT(piece_index < (int)m_piece_to_slot.size());
@ -2263,7 +2332,7 @@ namespace libtorrent
boost::recursive_mutex::scoped_lock lock(m_mutex);
TORRENT_ASSERT(num_slots > 0);
// INVARIANT_CHECK;
INVARIANT_CHECK;
TORRENT_ASSERT(!m_unallocated_slots.empty());
TORRENT_ASSERT(m_storage_mode == storage_mode_compact);

View File

@ -150,14 +150,14 @@ namespace libtorrent
torrent::torrent(
session_impl& ses
, aux::checker_impl& checker
, boost::intrusive_ptr<torrent_info> tf
, fs::path const& save_path
, tcp::endpoint const& net_interface
, storage_mode_t storage_mode
, int block_size
, storage_constructor_type sc
, bool paused)
, bool paused
, entry const& resume_data)
: m_torrent_file(tf)
, m_abort(false)
, m_paused(paused)
@ -179,7 +179,6 @@ namespace libtorrent
, m_last_dht_announce(time_now() - minutes(15))
#endif
, m_ses(ses)
, m_checker(checker)
, m_picker(0)
, m_trackers(m_torrent_file->trackers())
, m_last_working_tracker(-1)
@ -195,6 +194,9 @@ namespace libtorrent
, m_net_interface(net_interface.address(), 0)
, m_save_path(complete(save_path))
, m_storage_mode(storage_mode)
, m_state(torrent_status::queued_for_checking)
, m_progress(0.f)
, m_resume_data(resume_data)
, m_default_block_size(block_size)
, m_connections_initialized(true)
, m_settings(ses.settings())
@ -211,7 +213,6 @@ namespace libtorrent
torrent::torrent(
session_impl& ses
, aux::checker_impl& checker
, char const* tracker_url
, sha1_hash const& info_hash
, char const* name
@ -220,7 +221,8 @@ namespace libtorrent
, storage_mode_t storage_mode
, int block_size
, storage_constructor_type sc
, bool paused)
, bool paused
, entry const& resume_data)
: m_torrent_file(new torrent_info(info_hash))
, m_abort(false)
, m_paused(paused)
@ -242,7 +244,6 @@ namespace libtorrent
, m_last_dht_announce(time_now() - minutes(15))
#endif
, m_ses(ses)
, m_checker(checker)
, m_picker(0)
, m_last_working_tracker(-1)
, m_currently_trying_tracker(0)
@ -257,6 +258,9 @@ namespace libtorrent
, m_net_interface(net_interface.address(), 0)
, m_save_path(complete(save_path))
, m_storage_mode(storage_mode)
, m_state(torrent_status::queued_for_checking)
, m_progress(0.f)
, m_resume_data(resume_data)
, m_default_block_size(block_size)
, m_connections_initialized(false)
, m_settings(ses.settings())
@ -399,7 +403,8 @@ namespace libtorrent
// the shared_from_this() will create an intentional
// cycle of ownership, se the hpp file for description.
m_owning_storage = new piece_manager(shared_from_this(), m_torrent_file
, m_save_path, m_ses.m_files, m_ses.m_disk_thread, m_storage_constructor);
, m_save_path, m_ses.m_files, m_ses.m_disk_thread, m_storage_constructor
, m_storage_mode);
m_storage = m_owning_storage.get();
m_block_size = calculate_block_size(*m_torrent_file, m_default_block_size);
m_picker.reset(new piece_picker(
@ -409,6 +414,233 @@ namespace libtorrent
std::vector<std::string> const& url_seeds = m_torrent_file->url_seeds();
std::copy(url_seeds.begin(), url_seeds.end(), std::inserter(m_web_seeds
, m_web_seeds.begin()));
m_state = torrent_status::queued_for_checking;
m_storage->async_check_fastresume(&m_resume_data
, bind(&torrent::on_resume_data_checked
, shared_from_this(), _1, _2));
}
void torrent::on_resume_data_checked(int ret, disk_io_job const& j)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
if (ret == piece_manager::fatal_disk_error)
{
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
" error: " << j.str <<
" torrent: " << torrent_file().name() <<
" ]\n";
#endif
}
std::fill(m_have_pieces.begin(), m_have_pieces.end(), false);
m_num_pieces = 0;
pause();
return;
}
// parse out "peers" from the resume data and add them to the peer list
entry const* peers_entry = m_resume_data.find_key("peers");
if (peers_entry && peers_entry->type() == entry::list_t)
{
peer_id id;
std::fill(id.begin(), id.end(), 0);
entry::list_type const& peer_list = peers_entry->list();
for (entry::list_type::const_iterator i = peer_list.begin();
i != peer_list.end(); ++i)
{
if (i->type() != entry::dictionary_t) continue;
entry const* ip = i->find_key("ip");
entry const* port = i->find_key("port");
if (ip == 0 || port == 0
|| ip->type() != entry::string_t
|| port->type() != entry::int_t)
continue;
tcp::endpoint a(
address::from_string(ip->string())
, (unsigned short)port->integer());
m_policy.peer_from_tracker(a, id, peer_info::resume_data, 0);
}
}
// parse out "banned_peers" and add them as banned
entry const* banned_peers_entry = m_resume_data.find_key("banned_peers");
if (banned_peers_entry != 0 && banned_peers_entry->type() == entry::list_t)
{
peer_id id;
std::fill(id.begin(), id.end(), 0);
entry::list_type const& peer_list = banned_peers_entry->list();
for (entry::list_type::const_iterator i = peer_list.begin();
i != peer_list.end(); ++i)
{
if (i->type() != entry::dictionary_t) continue;
entry const* ip = i->find_key("ip");
entry const* port = i->find_key("port");
if (ip == 0 || port == 0
|| ip->type() != entry::string_t
|| port->type() != entry::int_t)
continue;
tcp::endpoint a(
address::from_string(ip->string())
, (unsigned short)port->integer());
policy::peer* p = m_policy.peer_from_tracker(a, id, peer_info::resume_data, 0);
if (p) p->banned = true;
}
}
bool fastresume_rejected = !j.str.empty();
if (fastresume_rejected && m_ses.m_alerts.should_post(alert::warning))
{
m_ses.m_alerts.post_alert(fastresume_rejected_alert(get_handle(), j.str));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << "fastresume data for "
<< torrent_file().name() << " rejected: "
<< j.str << "\n";
#endif
}
if (ret == 0)
{
// there are either no files for this torrent
// or the resume_data was accepted
m_num_pieces = 0;
std::fill(m_have_pieces.begin(), m_have_pieces.end(), false);
if (!fastresume_rejected)
{
// parse slots
entry const* slots_ent = m_resume_data.find_key("slots");
if (slots_ent != 0 && slots_ent->type() == entry::list_t)
{
entry::list_type const& slots = slots_ent->list();
for (entry::list_type::const_iterator i = slots.begin();
i != slots.end(); ++i)
{
if (i->type() != entry::int_t) continue;
int piece_index = int(i->integer());
if (piece_index < 0 || piece_index >= torrent_file().num_pieces())
continue;
m_have_pieces[piece_index] = true;
++m_num_pieces;
}
}
// parse unfinished pieces
int num_blocks_per_piece =
static_cast<int>(torrent_file().piece_length()) / block_size();
entry const* unfinished_ent = m_resume_data.find_key("unfinished");
if (unfinished_ent != 0 && unfinished_ent->type() == entry::list_t)
{
entry::list_type const& unfinished = unfinished_ent->list();
int index = 0;
for (entry::list_type::const_iterator i = unfinished.begin();
i != unfinished.end(); ++i, ++index)
{
if (i->type() != entry::dictionary_t) continue;
entry const* piece = i->find_key("piece");
if (piece == 0 || piece->type() != entry::int_t) continue;
int piece_index = int(piece->integer());
if (piece_index < 0 || piece_index >= torrent_file().num_pieces())
continue;
// if this assert is hit, the resume data file was corrupt
TORRENT_ASSERT(m_have_pieces[piece_index]);
m_have_pieces[piece_index] = false;
--m_num_pieces;
entry const* bitmask_ent = i->find_key("bitmask");
if (bitmask_ent == 0 || bitmask_ent->type() != entry::string_t) break;
std::string const& bitmask = bitmask_ent->string();
const int num_bitmask_bytes = (std::max)(num_blocks_per_piece / 8, 1);
if ((int)bitmask.size() != num_bitmask_bytes) continue;
for (int j = 0; j < num_bitmask_bytes; ++j)
{
unsigned char bits = bitmask[j];
int num_bits = (std::min)(num_blocks_per_piece - j*8, 8);
for (int k = 0; k < num_bits; ++k)
{
const int bit = j * 8 + k;
if (bits & (1 << k))
{
m_picker->mark_as_finished(piece_block(piece_index, bit), 0);
if (m_picker->is_piece_finished(piece_index))
async_verify_piece(piece_index, bind(&torrent::piece_finished
, shared_from_this(), piece_index, _1));
}
}
}
}
}
}
files_checked();
}
else
{
// either the fastresume data was rejected or there are
// some files
m_ses.check_torrent(shared_from_this());
}
}
void torrent::start_checking()
{
m_state = torrent_status::checking_files;
m_storage->async_check_files(bind(
&torrent::on_piece_checked
, shared_from_this(), _1, _2));
}
void torrent::on_piece_checked(int ret, disk_io_job const& j)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
if (ret == piece_manager::fatal_disk_error)
{
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
" error: " << j.str <<
" torrent: " << torrent_file().name() <<
" ]\n";
#endif
}
std::fill(m_have_pieces.begin(), m_have_pieces.end(), false);
m_num_pieces = 0;
pause();
m_ses.done_checking(shared_from_this());
return;
}
m_progress = j.piece / float(torrent_file().num_pieces());
if (j.offset >= 0 && !m_have_pieces[j.offset])
{
m_have_pieces[j.offset] = true;
++m_num_pieces;
}
// we're not done checking yet
// this handler will be called repeatedly until
// we're done, or encounter a failure
if (ret == piece_manager::need_full_check) return;
m_ses.done_checking(shared_from_this());
files_checked();
}
void torrent::use_interface(const char* net_interface)
@ -1102,7 +1334,6 @@ namespace libtorrent
// that has sent the least number of pieces
m_picker->restore_piece(index);
TORRENT_ASSERT(m_storage);
m_storage->mark_failed(index);
TORRENT_ASSERT(m_have_pieces[index] == false);
@ -2151,31 +2382,14 @@ namespace libtorrent
return false;
}
init();
boost::mutex::scoped_lock(m_checker.m_mutex);
boost::shared_ptr<aux::piece_checker_data> d(
new aux::piece_checker_data);
d->torrent_ptr = shared_from_this();
d->save_path = m_save_path;
d->info_hash = m_torrent_file->info_hash();
// add the torrent to the queue to be checked
m_checker.m_torrents.push_back(d);
typedef session_impl::torrent_map torrent_map;
torrent_map::iterator i = m_ses.m_torrents.find(
m_torrent_file->info_hash());
TORRENT_ASSERT(i != m_ses.m_torrents.end());
m_ses.m_torrents.erase(i);
// and notify the thread that it got another
// job in its queue
m_checker.m_cond.notify_one();
if (m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(metadata_received_alert(
get_handle(), "metadata successfully received from swarm"));
}
init();
return true;
}
@ -2186,6 +2400,13 @@ namespace libtorrent
TORRENT_ASSERT(p != 0);
TORRENT_ASSERT(!p->is_local());
if (m_state == torrent_status::queued_for_checking
|| m_state == torrent_status::checking_files)
{
p->disconnect("torrent is not ready to accept peers");
return;
}
if (m_ses.m_connections.find(p) == m_ses.m_connections.end())
{
p->disconnect("peer is not properly constructed");
@ -2246,7 +2467,9 @@ namespace libtorrent
{
return int(m_connections.size()) < m_max_connections
&& m_ses.m_half_open.free_slots()
&& !m_paused;
&& !m_paused
&& m_state != torrent_status::checking_files
&& m_state != torrent_status::queued_for_checking;
}
void torrent::disconnect_all()
@ -2380,6 +2603,8 @@ namespace libtorrent
, "torrent has finished downloading"));
}
m_state = torrent_status::finished;
// disconnect all seeds
// TODO: should disconnect all peers that have the pieces we have
// not just seeds
@ -2414,6 +2639,7 @@ namespace libtorrent
// make the next tracker request
// be a completed-event
m_event = tracker_request::completed;
m_state = torrent_status::seeding;
force_tracker_request();
}
@ -2478,10 +2704,10 @@ namespace libtorrent
}
bool torrent::check_fastresume(aux::piece_checker_data& data)
/*
void torrent::check_fastresume()
{
INVARIANT_CHECK;
TORRENT_ASSERT(valid_metadata());
bool done = true;
try
@ -2492,16 +2718,6 @@ namespace libtorrent
done = m_storage->check_fastresume(data, m_have_pieces, m_num_pieces
, m_storage_mode, error_msg);
if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
{
m_ses.m_alerts.post_alert(fastresume_rejected_alert(
get_handle(), error_msg));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << "fastresume data for "
<< torrent_file().name() << " rejected: "
<< error_msg << "\n";
#endif
}
}
catch (std::exception& e)
{
@ -2551,30 +2767,21 @@ namespace libtorrent
return progress;
}
void torrent::files_checked(std::vector<piece_picker::downloading_piece> const&
unfinished_pieces)
*/
void torrent::files_checked()
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
TORRENT_ASSERT(m_torrent_file->is_valid());
INVARIANT_CHECK;
m_state = torrent_status::connecting_to_tracker;
if (!is_seed())
{
// this is filled in with pieces that needs to be checked
// against its hashes.
std::vector<int> verify_pieces;
m_picker->files_checked(m_have_pieces, unfinished_pieces, verify_pieces);
m_picker->init(m_have_pieces);
if (m_sequential_download)
picker().sequential_download(m_sequential_download);
while (!verify_pieces.empty())
{
int piece = verify_pieces.back();
verify_pieces.pop_back();
async_verify_piece(piece, bind(&torrent::piece_finished
, shared_from_this(), piece, _1));
}
}
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -2593,6 +2800,7 @@ namespace libtorrent
if (is_seed())
{
m_state = torrent_status::seeding;
m_picker.reset();
if (m_ses.settings().free_torrent_hashes)
m_torrent_file->seed_free();
@ -2634,9 +2842,6 @@ namespace libtorrent
fs::path torrent::save_path() const
{
if (m_owning_storage.get())
return m_owning_storage->save_path();
else
return m_save_path;
}
@ -2674,7 +2879,7 @@ namespace libtorrent
torrent_handle torrent::get_handle() const
{
return torrent_handle(&m_ses, &m_checker, m_torrent_file->info_hash());
return torrent_handle(&m_ses, m_torrent_file->info_hash());
}
session_settings const& torrent::settings() const
@ -2779,17 +2984,6 @@ namespace libtorrent
complete = false;
break;
}
// this is no longer valid since the completion event
// may be queued in the io service
/*
if (complete && m_files_checked)
{
disk_io_job ret = m_ses.m_disk_thread.find_job(
m_owning_storage, -1, i->index);
TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
TORRENT_ASSERT(ret.piece == i->index);
}
*/
}
}
@ -3100,9 +3294,8 @@ namespace libtorrent
void torrent::on_piece_verified(int ret, disk_io_job const& j
, boost::function<void(bool)> f)
{
sha1_hash h(j.str);
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
f(m_torrent_file->hash_for_piece(j.piece) == h);
f(ret == 0);
}
const tcp::endpoint& torrent::current_tracker() const
@ -3110,9 +3303,6 @@ namespace libtorrent
return m_tracker_address;
}
bool torrent::is_allocating() const
{ return m_owning_storage.get() && m_owning_storage->is_allocating(); }
void torrent::file_progress(std::vector<float>& fp) const
{
TORRENT_ASSERT(valid_metadata());
@ -3217,6 +3407,8 @@ namespace libtorrent
st.connections_limit = m_max_connections;
// if we don't have any metadata, stop here
st.state = m_state;
if (!valid_metadata())
{
if (m_got_tracker_response == false)
@ -3224,14 +3416,8 @@ namespace libtorrent
else
st.state = torrent_status::downloading_metadata;
// TODO: add a progress member to the torrent that will be used in this case
// and that may be set by a plugin
// if (m_metadata_size == 0) st.progress = 0.f;
// else st.progress = (std::min)(1.f, m_metadata_progress / (float)m_metadata_size);
st.progress = 0.f;
st.progress = m_progress;
st.block_size = 0;
return st;
}
@ -3258,31 +3444,14 @@ namespace libtorrent
TORRENT_ASSERT(st.total_wanted >= st.total_wanted_done);
if (st.total_wanted == 0) st.progress = 1.f;
if (m_state == torrent_status::checking_files)
st.progress = m_progress;
else if (st.total_wanted == 0) st.progress = 1.f;
else st.progress = st.total_wanted_done
/ static_cast<double>(st.total_wanted);
st.pieces = &m_have_pieces;
st.num_pieces = m_num_pieces;
if (m_got_tracker_response == false)
{
st.state = torrent_status::connecting_to_tracker;
}
else if (is_seed())
{
TORRENT_ASSERT(st.total_done == m_torrent_file->total_size());
st.state = torrent_status::seeding;
}
else if (st.total_wanted_done == st.total_wanted)
{
st.state = torrent_status::finished;
}
else
{
st.state = torrent_status::downloading;
}
st.num_seeds = num_seeds();
if (m_picker.get())
st.distributed_copies = m_picker->distributed_copies();

View File

@ -82,58 +82,46 @@ using libtorrent::aux::session_impl;
#define TORRENT_FORWARD(call) \
if (m_ses == 0) return; \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) return; \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) return; \
t->call
#define TORRENT_FORWARD_RETURN(call, def) \
if (m_ses == 0) return def; \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) return def; \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) return def; \
return t->call
#define TORRENT_FORWARD_RETURN2(call, def) \
if (m_ses == 0) return def; \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) return def; \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) return def; \
t->call
#else
#define TORRENT_FORWARD(call) \
if (m_ses == 0) throw_invalid_handle(); \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) throw_invalid_handle(); \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) throw_invalid_handle(); \
t->call
#define TORRENT_FORWARD_RETURN(call, def) \
if (m_ses == 0) throw_invalid_handle(); \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) return def; \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) return def; \
return t->call
#define TORRENT_FORWARD_RETURN2(call, def) \
if (m_ses == 0) throw_invalid_handle(); \
TORRENT_ASSERT(m_chk); \
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); \
mutex::scoped_lock l2(m_chk->m_mutex); \
torrent* t = find_torrent(m_ses, m_chk, m_info_hash); \
if (t == 0) return def; \
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock(); \
if (!t) return def; \
t->call
#endif
@ -150,26 +138,12 @@ namespace libtorrent
throw invalid_handle();
}
#endif
torrent* find_torrent(
session_impl* ses
, aux::checker_impl* chk
, sha1_hash const& hash)
{
aux::piece_checker_data* d = chk->find_torrent(hash);
if (d != 0) return d->torrent_ptr.get();
boost::shared_ptr<torrent> t = ses->find_torrent(hash).lock();
if (t) return t.get();
return 0;
}
}
#ifndef NDEBUG
void torrent_handle::check_invariant() const
{
TORRENT_ASSERT((m_ses == 0 && m_chk == 0) || (m_ses != 0 && m_chk != 0));
}
#endif
@ -302,30 +276,8 @@ namespace libtorrent
#else
throw_invalid_handle();
#endif
TORRENT_ASSERT(m_chk);
session_impl::mutex_t::scoped_lock l(m_ses->m_mutex);
mutex::scoped_lock l2(m_chk->m_mutex);
aux::piece_checker_data* d = m_chk->find_torrent(m_info_hash);
if (d != 0)
{
torrent_status st = d->torrent_ptr->status();
if (d->processing)
{
if (d->torrent_ptr->is_allocating())
st.state = torrent_status::allocating;
else
st.state = torrent_status::checking_files;
}
else
st.state = torrent_status::queued_for_checking;
st.progress = d->progress;
st.paused = d->torrent_ptr->is_paused();
return st;
}
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock();
if (t) return t->status();
@ -464,11 +416,9 @@ namespace libtorrent
#else
if (m_ses == 0) throw_invalid_handle();
#endif
TORRENT_ASSERT(m_chk);
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex);
mutex::scoped_lock l2(m_chk->m_mutex);
torrent* t = find_torrent(m_ses, m_chk, m_info_hash);
if (t == 0 || !t->valid_metadata())
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock();
if (!t || !t->valid_metadata())
#ifdef BOOST_NO_EXCEPTIONS
return empty;
#else
@ -481,10 +431,8 @@ namespace libtorrent
{
INVARIANT_CHECK;
if (m_ses == 0) return false;
TORRENT_ASSERT(m_chk);
session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex);
mutex::scoped_lock l2(m_chk->m_mutex);
torrent* t = find_torrent(m_ses, m_chk, m_info_hash);
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock();
return t;
}
@ -498,12 +446,8 @@ namespace libtorrent
#else
throw_invalid_handle();
#endif
TORRENT_ASSERT(m_chk);
session_impl::mutex_t::scoped_lock l(m_ses->m_mutex);
mutex::scoped_lock l2(m_chk->m_mutex);
torrent* t = find_torrent(m_ses, m_chk, m_info_hash);
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock();
if (!t || !t->valid_metadata())
#ifdef BOOST_NO_EXCEPTIONS
return entry();
@ -518,7 +462,9 @@ namespace libtorrent
ret["file-format"] = "libtorrent resume file";
ret["file-version"] = 1;
ret["allocation"] = t->filesystem().compact_allocation()?"compact":"full";
storage_mode_t sm = t->storage_mode();
ret["allocation"] = sm == storage_mode_sparse?"sparse"
:sm == storage_mode_allocate?"full":"compact";
const sha1_hash& info_hash = t->torrent_file().info_hash();
ret["info-hash"] = std::string((char*)info_hash.begin(), (char*)info_hash.end());
@ -569,25 +515,13 @@ namespace libtorrent
TORRENT_ASSERT(bits == 8 || j == num_bitmask_bytes - 1);
}
piece_struct["bitmask"] = bitmask;
/*
TORRENT_ASSERT(t->filesystem().slot_for(i->index) >= 0);
unsigned long adler
= t->filesystem().piece_crc(
t->filesystem().slot_for(i->index)
, t->block_size()
, i->info);
piece_struct["adler32"] = adler;
*/
// push the struct onto the unfinished-piece list
up.push_back(piece_struct);
}
}
std::vector<int> piece_index;
t->filesystem().export_piece_map(piece_index, have_pieces);
entry::list_type& slots = ret["slots"].list();
std::copy(piece_index.begin(), piece_index.end(), std::back_inserter(slots));
t->filesystem().write_resume_data(ret, have_pieces);
// write local peers
@ -632,8 +566,6 @@ namespace libtorrent
peer_list.push_back(peer);
}
t->filesystem().write_resume_data(ret);
return ret;
}
@ -654,27 +586,16 @@ namespace libtorrent
#else
throw_invalid_handle();
#endif
TORRENT_ASSERT(m_chk);
session_impl::mutex_t::scoped_lock l(m_ses->m_mutex);
boost::shared_ptr<torrent> t = m_ses->find_torrent(m_info_hash).lock();
if (!t)
{
// the torrent is being checked. Add the peer to its
// peer list. The entries in there will be connected
// once the checking is complete.
mutex::scoped_lock l2(m_chk->m_mutex);
aux::piece_checker_data* d = m_chk->find_torrent(m_info_hash);
if (d == 0)
#ifdef BOOST_NO_EXCEPTIONS
return;
#else
throw_invalid_handle();
#endif
d->peers.push_back(adr);
return;
}
peer_id id;