From 644d3aa66c5227e348a87d6ff3319f8bdc333b77 Mon Sep 17 00:00:00 2001
From: Arvid Norberg
Date: Sun, 13 Apr 2008 21:26:57 +0000
Subject: [PATCH] disk IO thread error handling

---
 src/disk_io_thread.cpp | 445 +++++++++++++++++++++--------------------
 src/torrent.cpp        |  13 +-
 2 files changed, 239 insertions(+), 219 deletions(-)

diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp
index ad2530c04..ecb18b6de 100644
--- a/src/disk_io_thread.cpp
+++ b/src/disk_io_thread.cpp
@@ -770,133 +770,132 @@ namespace libtorrent
 #ifndef BOOST_NO_EXCEPTIONS
 		try {
 #endif
-			std::string const& error_string = j.storage->error();
-			if (!error_string.empty())
-			{
-#ifndef NDEBUG
-				std::cout << "ERROR: '" << error_string << "' " << j.error_file << std::endl;
-#endif
-				j.str = error_string;
-				j.error_file = j.storage->error_file();
-				j.storage->clear_error();
-				ret = -1;
-			}
-			else
-			{
-				switch (j.action)
-				{
-					case disk_io_job::read:
-					{
-#ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " read " << j.buffer_size << std::endl;
-#endif
-						mutex_t::scoped_lock l(m_mutex);
-						INVARIANT_CHECK;
-						TORRENT_ASSERT(j.buffer == 0);
-						j.buffer = allocate_buffer();
-						TORRENT_ASSERT(j.buffer_size <= m_block_size);
-						if (j.buffer == 0)
-						{
-							ret = -1;
-							j.str = "out of memory";
-							break;
-						}
-						disk_buffer_holder read_holder(*this, j.buffer);
-						ret = try_read_from_cache(j, l);
-
-						// -2 means there's no space in the read cache
-						// or that the read cache is disabled
-						if (ret == -1)
+			switch (j.action)
+			{
+				case disk_io_job::read:
+				{
+					std::string const& error_string = j.storage->error();
+					if (!error_string.empty())
+					{
+#ifndef NDEBUG
+						std::cout << "ERROR: '" << error_string << "' " << j.error_file << std::endl;
+#endif
+						j.str = error_string;
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
+						ret = -1;
+						break;
+					}
+#ifdef TORRENT_DISK_STATS
+					m_log << log_time() << " read " << j.buffer_size << std::endl;
+#endif
+					mutex_t::scoped_lock l(m_mutex);
+					INVARIANT_CHECK;
+					TORRENT_ASSERT(j.buffer == 0);
+					j.buffer = allocate_buffer();
+					TORRENT_ASSERT(j.buffer_size <= m_block_size);
+					if (j.buffer == 0)
+					{
+						ret = -1;
+						j.str = "out of memory";
+						break;
+					}
+
+					disk_buffer_holder read_holder(*this, j.buffer);
+					ret = try_read_from_cache(j, l);
+
+					// -2 means there's no space in the read cache
+					// or that the read cache is disabled
+					if (ret == -1)
+					{
+						j.buffer = 0;
+						j.str = j.storage->error();
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
+						break;
+					}
+					else if (ret == -2)
+					{
+						l.unlock();
+						ret = j.storage->read_impl(j.buffer, j.piece, j.offset
+							, j.buffer_size);
+						if (ret < 0)
 						{
-							j.buffer = 0;
 							j.str = j.storage->error();
 							j.error_file = j.storage->error_file();
 							j.storage->clear_error();
 							break;
 						}
-						else if (ret == -2)
-						{
-							l.unlock();
-							ret = j.storage->read_impl(j.buffer, j.piece, j.offset
-								, j.buffer_size);
-							if (ret < 0)
-							{
-								j.str = j.storage->error();
-								j.error_file = j.storage->error_file();
-								j.storage->clear_error();
-								break;
-							}
-							l.lock();
-							++m_cache_stats.blocks_read;
-						}
-						read_holder.release();
+						l.lock();
+						++m_cache_stats.blocks_read;
+					}
+					read_holder.release();
+					break;
+				}
+				case disk_io_job::write:
+				{
+					std::string const& error_string = j.storage->error();
+					if (!error_string.empty())
+					{
+#ifndef NDEBUG
+						std::cout << "ERROR: '" << error_string << "' " << j.error_file << std::endl;
+#endif
+						j.str = error_string;
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
+						ret = -1;
 						break;
 					}
-					case disk_io_job::write:
-					{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " write " << j.buffer_size << std::endl;
+					m_log << log_time() << " write " << j.buffer_size << std::endl;
 #endif
-						mutex_t::scoped_lock l(m_mutex);
-						INVARIANT_CHECK;
-						cache_t::iterator p
-							= find_cached_piece(m_pieces, j, l);
-						int block = j.offset / m_block_size;
-						TORRENT_ASSERT(j.buffer);
-						TORRENT_ASSERT(j.buffer_size <= m_block_size);
-						if (p != m_pieces.end())
+					mutex_t::scoped_lock l(m_mutex);
+					INVARIANT_CHECK;
+					cache_t::iterator p
+						= find_cached_piece(m_pieces, j, l);
+					int block = j.offset / m_block_size;
+					TORRENT_ASSERT(j.buffer);
+					TORRENT_ASSERT(j.buffer_size <= m_block_size);
+					if (p != m_pieces.end())
+					{
+						TORRENT_ASSERT(p->blocks[block] == 0);
+						if (p->blocks[block])
 						{
-							TORRENT_ASSERT(p->blocks[block] == 0);
-							if (p->blocks[block])
-							{
-								free_buffer(p->blocks[block]);
-								--p->num_blocks;
-							}
-							p->blocks[block] = j.buffer;
-							++m_cache_stats.cache_size;
-							++p->num_blocks;
-							p->last_use = time_now();
+							free_buffer(p->blocks[block]);
+							--p->num_blocks;
 						}
-						else
-						{
-							cache_block(j, l);
-						}
-						// we've now inserted the buffer
-						// in the cache, we should not
-						// free it at the end
-						holder.release();
-						if (m_cache_stats.cache_size >= m_cache_size)
-							flush_oldest_piece(l);
-						break;
+						p->blocks[block] = j.buffer;
+						++m_cache_stats.cache_size;
+						++p->num_blocks;
+						p->last_use = time_now();
 					}
-					case disk_io_job::hash:
+					else
 					{
+						cache_block(j, l);
+					}
+					// we've now inserted the buffer
+					// in the cache, we should not
+					// free it at the end
+					holder.release();
+					if (m_cache_stats.cache_size >= m_cache_size)
+						flush_oldest_piece(l);
+					break;
+				}
+				case disk_io_job::hash:
+				{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " hash" << std::endl;
+					m_log << log_time() << " hash" << std::endl;
 #endif
-						mutex_t::scoped_lock l(m_mutex);
+					mutex_t::scoped_lock l(m_mutex);
 
-						INVARIANT_CHECK;
+					INVARIANT_CHECK;
 
-						cache_t::iterator i
-							= find_cached_piece(m_pieces, j, l);
-						if (i != m_pieces.end())
-						{
-							flush_and_remove(i, l);
-							std::string const& e = j.storage->error();
-							if (!e.empty())
-							{
-								j.str = e;
-								j.error_file = j.storage->error_file();
-								ret = -1;
-								j.storage->clear_error();
-								j.storage->mark_failed(j.piece);
-								break;
-							}
-						}
-						l.unlock();
-						sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
+					cache_t::iterator i
+						= find_cached_piece(m_pieces, j, l);
+					if (i != m_pieces.end())
+					{
+						flush_and_remove(i, l);
 						std::string const& e = j.storage->error();
 						if (!e.empty())
 						{
@@ -907,151 +906,163 @@ namespace libtorrent
 							j.storage->mark_failed(j.piece);
 							break;
 						}
-						ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
-						if (ret == -2) j.storage->mark_failed(j.piece);
+					}
+					l.unlock();
+					sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
+					std::string const& e = j.storage->error();
+					if (!e.empty())
+					{
+						j.str = e;
+						j.error_file = j.storage->error_file();
+						ret = -1;
+						j.storage->clear_error();
+						j.storage->mark_failed(j.piece);
 						break;
 					}
-					case disk_io_job::move_storage:
-					{
+					ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
+					if (ret == -2) j.storage->mark_failed(j.piece);
+					break;
+				}
+				case disk_io_job::move_storage:
+				{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " move" << std::endl;
+					m_log << log_time() << " move" << std::endl;
 #endif
-						TORRENT_ASSERT(j.buffer == 0);
-						ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
-						if (ret != 0)
-						{
-							j.str = j.storage->error();
-							j.error_file = j.storage->error_file();
-							j.storage->clear_error();
-							break;
-						}
-						j.str = j.storage->save_path().string();
+					TORRENT_ASSERT(j.buffer == 0);
+					ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
+					if (ret != 0)
+					{
+						j.str = j.storage->error();
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
 						break;
 					}
-					case disk_io_job::release_files:
-					{
+					j.str = j.storage->save_path().string();
+					break;
+				}
+				case disk_io_job::release_files:
+				{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " release" << std::endl;
+					m_log << log_time() << " release" << std::endl;
 #endif
-						TORRENT_ASSERT(j.buffer == 0);
-						mutex_t::scoped_lock l(m_mutex);
+					TORRENT_ASSERT(j.buffer == 0);
+					mutex_t::scoped_lock l(m_mutex);
 
-						INVARIANT_CHECK;
+					INVARIANT_CHECK;
 
-						for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
+					for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
+					{
+						if (i->storage == j.storage)
 						{
-							if (i->storage == j.storage)
-							{
-								flush(i, l);
-								i = m_pieces.erase(i);
-							}
-							else
-							{
-								++i;
-							}
+							flush(i, l);
+							i = m_pieces.erase(i);
 						}
+						else
+						{
+							++i;
+						}
+					}
 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
-						m_pool.release_memory();
+					m_pool.release_memory();
 #endif
-						l.unlock();
-						ret = j.storage->release_files_impl();
-						if (ret != 0)
-						{
-							j.str = j.storage->error();
-							j.error_file = j.storage->error_file();
-							j.storage->clear_error();
-						}
-						break;
-					}
-					case disk_io_job::delete_files:
+					l.unlock();
+					ret = j.storage->release_files_impl();
+					if (ret != 0)
 					{
+						j.str = j.storage->error();
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
+					}
+					break;
+				}
+				case disk_io_job::delete_files:
+				{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " delete" << std::endl;
+					m_log << log_time() << " delete" << std::endl;
 #endif
-						TORRENT_ASSERT(j.buffer == 0);
-						mutex_t::scoped_lock l(m_mutex);
+					TORRENT_ASSERT(j.buffer == 0);
+					mutex_t::scoped_lock l(m_mutex);
 
-						INVARIANT_CHECK;
+					INVARIANT_CHECK;
 
-						cache_t::iterator i = std::remove_if(
-							m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
+					cache_t::iterator i = std::remove_if(
+						m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
 
-						for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
+					for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
+					{
+						torrent_info const& ti = *k->storage->info();
+						int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
+						for (int j = 0; j < blocks_in_piece; ++j)
 						{
-							torrent_info const& ti = *k->storage->info();
-							int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
-							for (int j = 0; j < blocks_in_piece; ++j)
-							{
-								if (k->blocks[j] == 0) continue;
-								free_buffer(k->blocks[j], l);
-								k->blocks[j] = 0;
-							}
+							if (k->blocks[j] == 0) continue;
+							free_buffer(k->blocks[j], l);
+							k->blocks[j] = 0;
 						}
-						m_pieces.erase(i, m_pieces.end());
+					}
+					m_pieces.erase(i, m_pieces.end());
 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
-						m_pool.release_memory();
+					m_pool.release_memory();
 #endif
-						l.unlock();
-						ret = j.storage->delete_files_impl();
-						if (ret != 0)
-						{
-							j.str = j.storage->error();
-							j.error_file = j.storage->error_file();
-							j.storage->clear_error();
-						}
-						break;
-					}
-					case disk_io_job::check_fastresume:
+					l.unlock();
+					ret = j.storage->delete_files_impl();
+					if (ret != 0)
 					{
-#ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " check fastresume" << std::endl;
-#endif
-						entry const* rd = (entry const*)j.buffer;
-						TORRENT_ASSERT(rd != 0);
-						ret = j.storage->check_fastresume(*rd, j.str);
-						break;
+						j.str = j.storage->error();
+						j.error_file = j.storage->error_file();
+						j.storage->clear_error();
 					}
-					case disk_io_job::check_files:
-					{
+					break;
+				}
+				case disk_io_job::check_fastresume:
+				{
 #ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " check files" << std::endl;
+					m_log << log_time() << " check fastresume" << std::endl;
 #endif
-						int piece_size = j.storage->info()->piece_length();
-						for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
-						{
-							ret = j.storage->check_files(j.piece, j.offset, j.str);
+					entry const* rd = (entry const*)j.buffer;
+					TORRENT_ASSERT(rd != 0);
+					ret = j.storage->check_fastresume(*rd, j.str);
+					break;
+				}
+				case disk_io_job::check_files:
+				{
+#ifdef TORRENT_DISK_STATS
+					m_log << log_time() << " check files" << std::endl;
+#endif
+					int piece_size = j.storage->info()->piece_length();
+					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
+					{
+						ret = j.storage->check_files(j.piece, j.offset, j.str);
 #ifndef BOOST_NO_EXCEPTIONS
-							try {
+						try {
 #endif
-								TORRENT_ASSERT(handler);
-								if (handler && ret == piece_manager::need_full_check)
-									m_ios.post(bind(handler, ret, j));
+							TORRENT_ASSERT(handler);
+							if (handler && ret == piece_manager::need_full_check)
+								m_ios.post(bind(handler, ret, j));
 #ifndef BOOST_NO_EXCEPTIONS
-							} catch (std::exception&) {}
+						} catch (std::exception&) {}
 #endif
-							if (ret != piece_manager::need_full_check) break;
-						}
-						// if the check is not done, add it at the end of the job queue
-						if (ret == piece_manager::need_full_check)
-						{
-							mutex_t::scoped_lock l(m_mutex);
-							m_jobs.push_back(j);
-							m_jobs.back().callback.swap(handler);
-							continue;
-						}
-						break;
+						if (ret != piece_manager::need_full_check) break;
 					}
-					case disk_io_job::save_resume_data:
+					// if the check is not done, add it at the end of the job queue
+					if (ret == piece_manager::need_full_check)
 					{
-#ifdef TORRENT_DISK_STATS
-						m_log << log_time() << " save resume data" << std::endl;
-#endif
-						j.resume_data.reset(new entry(entry::dictionary_t));
-						j.storage->write_resume_data(*j.resume_data);
-						ret = 0;
-						break;
+						mutex_t::scoped_lock l(m_mutex);
+						m_jobs.push_back(j);
+						m_jobs.back().callback.swap(handler);
+						continue;
 					}
+					break;
+				}
+				case disk_io_job::save_resume_data:
+				{
+#ifdef TORRENT_DISK_STATS
+					m_log << log_time() << " save resume data" << std::endl;
+#endif
+					j.resume_data.reset(new entry(entry::dictionary_t));
+					j.storage->write_resume_data(*j.resume_data);
+					ret = 0;
+					break;
 				}
 			}
 #ifndef BOOST_NO_EXCEPTIONS
diff --git a/src/torrent.cpp b/src/torrent.cpp
index 324569920..8afc9e990 100755
--- a/src/torrent.cpp
+++ b/src/torrent.cpp
@@ -1391,9 +1391,18 @@ namespace libtorrent
 		if (alerts().should_post(alert::warning))
 		{
-			write_resume_data(*j.resume_data);
+			char const* msg;
+			if (j.resume_data)
+			{
+				write_resume_data(*j.resume_data);
+				msg = "resume data generated";
+			}
+			else
+			{
+				msg = j.str.c_str();
+			}
 			alerts().post_alert(save_resume_data_alert(j.resume_data
-				, get_handle(), "resume data generated"));
+				, get_handle(), msg));
 		}
 	}
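
The pattern this patch applies inside each disk_io_job case is the same: check whether the storage object has a pending error, copy the error message and the offending file name into the job, clear the error on the storage, and fail the job with ret = -1. The sketch below is a minimal, self-contained illustration of that error-propagation idea; the types and names (storage_stub, disk_job, check_storage_error) are hypothetical stand-ins for illustration only, not libtorrent's actual interfaces.

#include <iostream>
#include <string>

struct storage_stub
{
	std::string error;       // last error message; empty means no error
	std::string error_file;  // file the error refers to
	void clear_error() { error.clear(); error_file.clear(); }
};

struct disk_job
{
	storage_stub* storage = 0;
	std::string str;         // error message handed back to the caller
	std::string error_file;  // file associated with the failure
};

// mirrors the check placed at the top of each job case in the patch:
// copy the pending error into the job, clear it on the storage, and
// report failure so the job's completion handler sees ret == -1
int check_storage_error(disk_job& j)
{
	if (j.storage->error.empty()) return 0;
	j.str = j.storage->error;
	j.error_file = j.storage->error_file;
	j.storage->clear_error();
	return -1;
}

int main()
{
	storage_stub s;
	s.error = "no space left on device";
	s.error_file = "/downloads/file.bin";

	disk_job j;
	j.storage = &s;

	int ret = check_storage_error(j);
	if (ret == -1)
		std::cout << "job failed: " << j.str << " (" << j.error_file << ")\n";
	return 0;
}

The torrent.cpp change follows the same principle on the receiving side: when the save_resume_data job carries no resume data, the posted alert falls back to the job's error string instead of unconditionally reporting "resume data generated".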