disk IO thread error handling

Arvid Norberg 2008-04-13 21:26:57 +00:00
parent 2b55d7ebe5
commit 644d3aa66c
2 changed files with 239 additions and 219 deletions

@@ -770,133 +770,132 @@ namespace libtorrent
#ifndef BOOST_NO_EXCEPTIONS
		try {
#endif
			switch (j.action)
			{
				case disk_io_job::read:
				{
					std::string const& error_string = j.storage->error();
					if (!error_string.empty())
					{
#ifndef NDEBUG
						std::cout << "ERROR: '" << error_string << "' " << j.error_file << std::endl;
#endif
						j.str = error_string;
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
					mutex_t::scoped_lock l(m_mutex);
					INVARIANT_CHECK;
					TORRENT_ASSERT(j.buffer == 0);
					j.buffer = allocate_buffer();
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (j.buffer == 0)
					{
						ret = -1;
						j.str = "out of memory";
						break;
					}
					disk_buffer_holder read_holder(*this, j.buffer);
					ret = try_read_from_cache(j, l);
					// -2 means there's no space in the read cache
					// or that the read cache is disabled
					if (ret == -1)
					{
						j.buffer = 0;
						j.str = j.storage->error();
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
						break;
					}
					else if (ret == -2)
					{
						l.unlock();
						ret = j.storage->read_impl(j.buffer, j.piece, j.offset
							, j.buffer_size);
						if (ret < 0)
						{
							j.str = j.storage->error();
							j.error_file = j.storage->error_file();
							j.storage->clear_error();
							break;
						}
						l.lock();
						++m_cache_stats.blocks_read;
					}
					read_holder.release();
					break;
				}
				case disk_io_job::write:
				{
					std::string const& error_string = j.storage->error();
					if (!error_string.empty())
					{
#ifndef NDEBUG
						std::cout << "ERROR: '" << error_string << "' " << j.error_file << std::endl;
#endif
						j.str = error_string;
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
					mutex_t::scoped_lock l(m_mutex);
					INVARIANT_CHECK;
					cache_t::iterator p
						= find_cached_piece(m_pieces, j, l);
					int block = j.offset / m_block_size;
					TORRENT_ASSERT(j.buffer);
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (p != m_pieces.end())
					{
						TORRENT_ASSERT(p->blocks[block] == 0);
						if (p->blocks[block])
						{
							free_buffer(p->blocks[block]);
							--p->num_blocks;
						}
						p->blocks[block] = j.buffer;
						++m_cache_stats.cache_size;
						++p->num_blocks;
						p->last_use = time_now();
					}
					else
					{
						cache_block(j, l);
					}
					// we've now inserted the buffer
					// in the cache, we should not
					// free it at the end
					holder.release();
					if (m_cache_stats.cache_size >= m_cache_size)
						flush_oldest_piece(l);
					break;
				}
				case disk_io_job::hash:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " hash" << std::endl;
#endif
					mutex_t::scoped_lock l(m_mutex);
					INVARIANT_CHECK;
					cache_t::iterator i
						= find_cached_piece(m_pieces, j, l);
					if (i != m_pieces.end())
					{
						flush_and_remove(i, l);
						std::string const& e = j.storage->error();
						if (!e.empty())
						{
@@ -907,151 +906,163 @@ namespace libtorrent
							j.storage->mark_failed(j.piece);
							break;
						}
					}
					l.unlock();
					sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
					std::string const& e = j.storage->error();
					if (!e.empty())
					{
						j.str = e;
						j.error_file = j.storage->error_file();
						ret = -1;
						j.storage->clear_error();
						j.storage->mark_failed(j.piece);
						break;
					}
					ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
					if (ret == -2) j.storage->mark_failed(j.piece);
					break;
				}
				case disk_io_job::move_storage:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " move" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);
					ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
					if (ret != 0)
					{
						j.str = j.storage->error();
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
						break;
					}
					j.str = j.storage->save_path().string();
					break;
				}
				case disk_io_job::release_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " release" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);
					mutex_t::scoped_lock l(m_mutex);
					INVARIANT_CHECK;
					for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
					{
						if (i->storage == j.storage)
						{
							flush(i, l);
							i = m_pieces.erase(i);
						}
						else
						{
							++i;
						}
					}
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					m_pool.release_memory();
#endif
					l.unlock();
					ret = j.storage->release_files_impl();
					if (ret != 0)
					{
						j.str = j.storage->error();
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
					}
					break;
				}
				case disk_io_job::delete_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " delete" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);
					mutex_t::scoped_lock l(m_mutex);
					INVARIANT_CHECK;
					cache_t::iterator i = std::remove_if(
						m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
					for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
					{
						torrent_info const& ti = *k->storage->info();
						int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
						for (int j = 0; j < blocks_in_piece; ++j)
						{
							if (k->blocks[j] == 0) continue;
							free_buffer(k->blocks[j], l);
							k->blocks[j] = 0;
						}
					}
					m_pieces.erase(i, m_pieces.end());
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					m_pool.release_memory();
#endif
					l.unlock();
					ret = j.storage->delete_files_impl();
					if (ret != 0)
					{
						j.str = j.storage->error();
						j.error_file = j.storage->error_file();
						j.storage->clear_error();
					}
					break;
				}
				case disk_io_job::check_fastresume:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check fastresume" << std::endl;
#endif
					entry const* rd = (entry const*)j.buffer;
					TORRENT_ASSERT(rd != 0);
					ret = j.storage->check_fastresume(*rd, j.str);
					break;
				}
				case disk_io_job::check_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check files" << std::endl;
#endif
					int piece_size = j.storage->info()->piece_length();
					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
					{
						ret = j.storage->check_files(j.piece, j.offset, j.str);
#ifndef BOOST_NO_EXCEPTIONS
						try {
#endif
							TORRENT_ASSERT(handler);
							if (handler && ret == piece_manager::need_full_check)
								m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
						} catch (std::exception&) {}
#endif
						if (ret != piece_manager::need_full_check) break;
					}
					// if the check is not done, add it at the end of the job queue
					if (ret == piece_manager::need_full_check)
					{
						mutex_t::scoped_lock l(m_mutex);
						m_jobs.push_back(j);
						m_jobs.back().callback.swap(handler);
						continue;
					}
					break;
				}
				case disk_io_job::save_resume_data:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " save resume data" << std::endl;
#endif
					j.resume_data.reset(new entry(entry::dictionary_t));
					j.storage->write_resume_data(*j.resume_data);
					ret = 0;
					break;
				}
			}
#ifndef BOOST_NO_EXCEPTIONS
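
Each case above follows the same error-handling pattern: read the pending error off the storage, copy the message and file name into the job, clear the error on the storage, and fail only that job with ret = -1. Below is a minimal standalone sketch of that pattern; fake_storage, fake_job and handle_job are simplified stand-ins for illustration, not libtorrent's actual storage and disk_io_job types.

// sketch of the per-job error propagation used in the disk IO thread
#include <iostream>
#include <string>

struct fake_storage
{
	std::string err;
	std::string err_file;
	std::string const& error() const { return err; }
	std::string const& error_file() const { return err_file; }
	void clear_error() { err.clear(); err_file.clear(); }
};

struct fake_job
{
	fake_storage* storage;
	std::string str;        // error message handed back to the caller
	std::string error_file; // file the error occurred in
};

// returns 0 on success, -1 if the storage had a pending error
int handle_job(fake_job& j)
{
	std::string const& error_string = j.storage->error();
	if (!error_string.empty())
	{
		// copy the error into the job, clear it on the storage,
		// and fail this job only
		j.str = error_string;
		j.error_file = j.storage->error_file();
		j.storage->clear_error();
		return -1;
	}
	// ... perform the actual disk operation here ...
	return 0;
}

int main()
{
	fake_storage s;
	s.err = "no space left on device";
	s.err_file = "/tmp/example.dat";
	fake_job j;
	j.storage = &s;
	int ret = handle_job(j);
	std::cout << ret << ": '" << j.str << "' in " << j.error_file << std::endl;
}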

@@ -1391,9 +1391,18 @@ namespace libtorrent
		if (alerts().should_post(alert::warning))
		{
			char const* msg;
			if (j.resume_data)
			{
				write_resume_data(*j.resume_data);
				msg = "resume data generated";
			}
			else
			{
				msg = j.str.c_str();
			}
			alerts().post_alert(save_resume_data_alert(j.resume_data
				, get_handle(), msg));
		}
	}
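
The hunk above changes the save_resume_data completion path so the alert carries the storage error string when no resume data could be generated. A small sketch of that message selection follows, assuming a simplified resume_job struct and alert_message helper rather than the real disk_io_job and save_resume_data_alert.

// sketch: pick the alert message based on whether resume data exists
#include <iostream>
#include <memory>
#include <string>

struct entry {}; // stand-in for the bencoded resume data

struct resume_job
{
	std::shared_ptr<entry> resume_data; // null when generation failed
	std::string str;                    // error message from the disk thread
};

char const* alert_message(resume_job const& j)
{
	// if resume data was produced, report success;
	// otherwise forward the error string carried by the job
	return j.resume_data ? "resume data generated" : j.str.c_str();
}

int main()
{
	resume_job ok;
	ok.resume_data.reset(new entry);
	resume_job failed;
	failed.str = "disk read error";
	std::cout << alert_message(ok) << "\n" << alert_message(failed) << std::endl;
}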