From f0134c1b11d0d83ed5389b96f14f13c074c15bea Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 10 Jun 2009 08:30:55 +0000 Subject: [PATCH] improved disk error handling and expanded use of error_code in error reporting --- ChangeLog | 3 + docs/manual.rst | 30 ++-- include/libtorrent/alert_types.hpp | 18 ++- include/libtorrent/aux_/session_impl.hpp | 1 + include/libtorrent/disk_io_thread.hpp | 39 +++-- include/libtorrent/peer_connection.hpp | 4 +- include/libtorrent/peer_info.hpp | 7 +- include/libtorrent/piece_picker.hpp | 2 +- include/libtorrent/session_settings.hpp | 9 +- include/libtorrent/storage.hpp | 2 + src/disk_io_thread.cpp | 183 +++++++++++++---------- src/peer_connection.cpp | 62 ++++---- src/piece_picker.cpp | 64 +++++--- src/session.cpp | 2 +- src/session_impl.cpp | 22 ++- src/storage.cpp | 17 +-- src/torrent.cpp | 83 +++++----- test/test_storage.cpp | 4 +- test/test_web_seed.cpp | 65 ++++---- 19 files changed, 356 insertions(+), 261 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8ee14789a..4684525a7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ + * improved disk error handling and expanded use of error_code in + error reporting. added a bandwidth state, bw_disk, when waiting + for the disk io thread to catch up writing buffers * improved read cache memory efficiency * added another cache flush algorithm to write the largest contiguous blocks instead of the least recently used diff --git a/docs/manual.rst b/docs/manual.rst index 8db2b6e3a..0dee5a47e 100644 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -3014,7 +3014,7 @@ It contains the following fields:: int source; - enum bw_state { bw_idle, bw_torrent, bw_global, bw_network }; + enum bw_state { bw_idle, bw_limit, bw_network, bw_disk }; char read_state; char write_state; @@ -3172,13 +3172,7 @@ defines as follows: | | send or receive data. | | | | +------------------------+--------------------------------------------------------+ -| ``bw_torrent`` | The peer is waiting for the torrent to receive | -| | bandwidth quota in order to forward the bandwidth | -| | request to the global manager. | -| | | -+------------------------+--------------------------------------------------------+ -| ``bw_global`` | The peer is waiting for the global bandwidth manager | -| | to receive more quota in order to handle the request. | +| ``bw_limit`` | The peer is waiting for the rate limiter. | | | | +------------------------+--------------------------------------------------------+ | ``bw_network`` | The peer has quota and is currently waiting for a | @@ -3187,6 +3181,10 @@ defines as follows: | | limits. | | | | +------------------------+--------------------------------------------------------+ +| ``bw_disk`` | The peer is waiting for the disk I/O thread to catch | +| | up writing buffers to disk before downloading more. | +| | | ++------------------------+--------------------------------------------------------+ The ``ip`` field is the IP-address to this peer. The type is an asio endpoint. For more info, see the asio_ documentation. @@ -3386,7 +3384,7 @@ session_settings int num_want; int initial_picker_threshold; int allowed_fast_set_size; - int max_outstanding_disk_bytes_per_connection; + int max_queued_disk_bytes; int handshake_timeout; bool use_dht_as_fallback; bool free_torrent_hashes; @@ -3595,10 +3593,12 @@ in rarest first order. ``allowed_fast_set_size`` is the number of pieces we allow peers to download from us without being unchoked. -``max_outstanding_disk_bytes_per_connection`` is the number of bytes each -connection is allowed to have waiting in the disk I/O queue before it is -throttled back. This limit is meant to stop fast internet connections to -queue up bufferes indefinitely on slow hard-drives or storage. +``max_queued_disk_bytes`` is the number maximum number of bytes, to be +written to disk, that can wait in the disk I/O thread queue. This queue +is only for waiting for the disk I/O thread to receive the job and either +write it to disk or insert it in the write cache. When this limit is reached, +the peer connections will stop reading data from their sockets, until the disk +thread catches up. Setting this too low will severly limit your download rate. ``handshake_timeout`` specifies the number of seconds we allow a peer to delay responding to a protocol handshake. If no response is received within @@ -4687,7 +4687,7 @@ generated and the torrent is paused. ``file`` is the path to the file that was accessed when the error occurred. -``msg`` is the error message received from the OS. +``error`` is the error code describing the error. :: @@ -4695,7 +4695,7 @@ generated and the torrent is paused. { // ... std::string file; - std::string msg; + error_code error; }; file_renamed_alert diff --git a/include/libtorrent/alert_types.hpp b/include/libtorrent/alert_types.hpp index a8406b977..447429ae8 100644 --- a/include/libtorrent/alert_types.hpp +++ b/include/libtorrent/alert_types.hpp @@ -958,15 +958,23 @@ namespace libtorrent { file_error_alert( std::string const& f - , const torrent_handle& h - , const std::string& msg_) + , torrent_handle const& h + , error_code const& e) : torrent_alert(h) , file(f) - , msg(msg_) - {} + , error(e) + { +#ifndef TORRENT_NO_DEPRECATE + msg = error.message(); +#endif + } std::string file; + error_code error; + +#ifndef TORRENT_NO_DEPRECATE std::string msg; +#endif virtual std::auto_ptr clone() const { return std::auto_ptr(new file_error_alert(*this)); } @@ -978,7 +986,7 @@ namespace libtorrent virtual std::string message() const { return torrent_alert::message() + " file (" + file + ") error: " - + msg; + + error.message(); } }; diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index c025925ff..f23f21aad 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -590,6 +590,7 @@ namespace libtorrent // NAT or not. bool m_incoming_connection; + void on_disk_queue(); void on_tick(error_code const& e); int auto_manage_torrents(std::vector& list diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index c8f10b032..15d3c422f 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -73,8 +73,6 @@ namespace libtorrent , piece(0) , offset(0) , priority(0) - , error_piece(-1) - , error_op(-1) {} enum action_t @@ -122,12 +120,6 @@ namespace libtorrent // the error code from the file operation error_code error; - // the piece the error occurred on - int error_piece; - - // the operation that failed (only read or write) - int error_op; - // this is called when operation completes boost::function callback; }; @@ -244,7 +236,9 @@ namespace libtorrent // of disk io jobs struct disk_io_thread : disk_buffer_pool { - disk_io_thread(io_service& ios, int block_size = 16 * 1024); + disk_io_thread(io_service& ios + , boost::function const& queue_callback + , int block_size = 16 * 1024); ~disk_io_thread(); void join(); @@ -255,6 +249,8 @@ namespace libtorrent , boost::function const& f = boost::function()); + int queued_write_bytes() const; + // keep track of the number of bytes in the job queue // at any given time. i.e. the sum of all buffer_size. // this is used to slow down the download global download @@ -277,6 +273,17 @@ namespace libtorrent std::ofstream m_disk_access_log; #endif + struct cached_block_entry + { + cached_block_entry(): buf(0) {} + // the buffer pointer (this is a disk_pool buffer) + // or 0 + char* buf; + + // callback for when this block is flushed to disk + boost::function callback; + }; + struct cached_piece_entry { int piece; @@ -287,7 +294,7 @@ namespace libtorrent // the number of blocks in the cache for this piece int num_blocks; // the pointers to the block data - boost::shared_array blocks; + boost::shared_array blocks; }; typedef boost::recursive_mutex mutex_t; @@ -308,13 +315,17 @@ namespace libtorrent // write cache operations enum options_t { dont_flush_write_blocks = 1, ignore_cache_size = 2 }; - int flush_cache_blocks(mutex_t::scoped_lock& l, int blocks, cache_t::iterator ignore, int options = 0); + int flush_cache_blocks(mutex_t::scoped_lock& l + , int blocks, cache_t::iterator ignore + , int options = 0); void flush_expired_pieces(); int flush_and_remove(cache_t::iterator i, mutex_t::scoped_lock& l); int flush_contiguous_blocks(disk_io_thread::cache_t::iterator e , mutex_t::scoped_lock& l, int lower_limit = 0); int flush_range(cache_t::iterator i, int start, int end, mutex_t::scoped_lock& l); - int cache_block(disk_io_job& j, mutex_t::scoped_lock& l); + int cache_block(disk_io_job& j + , boost::function& handler + , mutex_t::scoped_lock& l); // read cache operations int clear_oldest_read_piece(int num_blocks, cache_t::iterator ignore @@ -351,12 +362,16 @@ namespace libtorrent // exceed m_cache_size cache_status m_cache_stats; + int m_queued_write_bytes; + #ifdef TORRENT_DISK_STATS std::ofstream m_log; #endif io_service& m_ios; + boost::function m_queue_callback; + // this keeps the io_service::run() call blocked from // returning. When shutting down, it's possible that // the event queue is drained before the disk_io_thread diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 22068e7f8..929a51ee7 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -519,6 +519,8 @@ namespace libtorrent size_type downloaded_since_unchoke() const { return m_statistics.total_payload_download() - m_downloaded_at_last_unchoke; } + void setup_receive(); + protected: virtual void get_specific_peer_info(peer_info& p) const = 0; @@ -571,8 +573,6 @@ namespace libtorrent void reset_recv_buffer(int packet_size); void set_soft_packet_size(int size) { m_soft_packet_size = size; } - void setup_receive(); - void attach_to_torrent(sha1_hash const& ih); bool verify_piece(peer_request const& p) const; diff --git a/include/libtorrent/peer_info.hpp b/include/libtorrent/peer_info.hpp index ad8160298..3ddbf6180 100644 --- a/include/libtorrent/peer_info.hpp +++ b/include/libtorrent/peer_info.hpp @@ -80,11 +80,12 @@ namespace libtorrent int source; // bw_idle: the channel is not used - // bw_torrent: the channel is waiting for torrent quota - // bw_global: the channel is waiting for global quota + // bw_limit: the channel is waiting for quota // bw_network: the channel is waiting for an async write // for read operation to complete - enum bw_state { bw_idle, bw_limit, bw_network }; + // bw_disk: the peer is waiting for the disk io thread + // to catch up + enum bw_state { bw_idle, bw_limit, bw_network, bw_disk }; #ifndef TORRENT_NO_DEPRECATE enum bw_state_deprecated { bw_torrent = bw_limit, bw_global = bw_limit }; #endif diff --git a/include/libtorrent/piece_picker.hpp b/include/libtorrent/piece_picker.hpp index ece78544a..b216d0c8d 100644 --- a/include/libtorrent/piece_picker.hpp +++ b/include/libtorrent/piece_picker.hpp @@ -275,7 +275,7 @@ namespace libtorrent , piece_state_t s); void mark_as_writing(piece_block block, void* peer); void mark_as_finished(piece_block block, void* peer); - void write_failed(int piece); + void write_failed(piece_block block); int num_peers(piece_block block) const; // returns information about the given piece diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index 63eef5a8e..775f297b8 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -112,7 +112,7 @@ namespace libtorrent , num_want(200) , initial_picker_threshold(4) , allowed_fast_set_size(10) - , max_outstanding_disk_bytes_per_connection(64 * 1024) + , max_queued_disk_bytes(256 * 1024) , handshake_timeout(10) #ifndef TORRENT_DISABLE_DHT , use_dht_as_fallback(false) @@ -334,8 +334,11 @@ namespace libtorrent // rate is being throttled. This prevents fast downloads // to slow medias to allocate more and more memory // indefinitely. This should be set to at least 16 kB - // to not completely disrupt normal downloads. - int max_outstanding_disk_bytes_per_connection; + // to not completely disrupt normal downloads. If it's + // set to 0, you will be starving the disk thread and + // nothing will be written to disk. + // this is a per session setting. + int max_queued_disk_bytes; // the number of seconds to wait for a handshake // response from a peer. If no response is received diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index b71242c1a..4ac899152 100644 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -224,6 +224,8 @@ namespace libtorrent void async_check_files(boost::function const& handler); + int queued_bytes() const; + void async_rename_file(int index, std::string const& name , boost::function const& handler); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 6c84c725a..82f43d050 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -268,13 +268,17 @@ namespace libtorrent // ------- disk_io_thread ------ - disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size) + disk_io_thread::disk_io_thread(asio::io_service& ios + , boost::function const& queue_callback + , int block_size) : disk_buffer_pool(block_size) , m_abort(false) , m_waiting_to_shutdown(false) , m_queue_buffer_size(0) , m_last_file_check(time_now_hires()) + , m_queued_write_bytes(0) , m_ios(ios) + , m_queue_callback(queue_callback) , m_work(io_service::work(m_ios)) , m_disk_io_thread(boost::ref(*this)) { @@ -322,7 +326,7 @@ namespace libtorrent int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size; info.blocks.resize(blocks_in_piece); for (int b = 0; b < blocks_in_piece; ++b) - if (i->blocks[b]) info.blocks[b] = true; + if (i->blocks[b].buf) info.blocks[b] = true; ret.push_back(info); } for (cache_t::const_iterator i = m_read_pieces.begin() @@ -337,7 +341,7 @@ namespace libtorrent int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size; info.blocks.resize(blocks_in_piece); for (int b = 0; b < blocks_in_piece; ++b) - if (i->blocks[b]) info.blocks[b] = true; + if (i->blocks[b].buf) info.blocks[b] = true; ret.push_back(info); } } @@ -466,10 +470,10 @@ namespace libtorrent for (int i = 0; i < blocks_in_piece; ++i) { - if (p.blocks[i] == 0) continue; - free_buffer(p.blocks[i]); + if (p.blocks[i].buf == 0) continue; + free_buffer(p.blocks[i].buf); ++ret; - p.blocks[i] = 0; + p.blocks[i].buf = 0; --p.num_blocks; --m_cache_stats.cache_size; --m_cache_stats.read_cache_size; @@ -507,10 +511,10 @@ namespace libtorrent while (num_blocks) { - while (i->blocks[start] == 0 && start <= end) ++start; + while (i->blocks[start].buf == 0 && start <= end) ++start; if (start > end) break; - free_buffer(i->blocks[start]); - i->blocks[start] = 0; + free_buffer(i->blocks[start].buf); + i->blocks[start].buf = 0; ++blocks; --i->num_blocks; --m_cache_stats.cache_size; @@ -518,10 +522,10 @@ namespace libtorrent --num_blocks; if (!num_blocks) break; - while (i->blocks[end] == 0 && start <= end) --end; + while (i->blocks[end].buf == 0 && start <= end) --end; if (start > end) break; - free_buffer(i->blocks[end]); - i->blocks[end] = 0; + free_buffer(i->blocks[end].buf); + i->blocks[end].buf = 0; ++blocks; --i->num_blocks; --m_cache_stats.cache_size; @@ -543,7 +547,7 @@ namespace libtorrent int blocks_in_piece = (b.storage->info()->piece_size(b.piece) + 16 * 1024 - 1) / (16 * 1024); for (int i = 0; i < blocks_in_piece; ++i) { - if (b.blocks[i]) ++current; + if (b.blocks[i].buf) ++current; else { if (current > ret) ret = current; @@ -566,7 +570,7 @@ namespace libtorrent + m_block_size - 1) / m_block_size; for (int i = 0; i < blocks_in_piece; ++i) { - if (e->blocks[i]) ++current; + if (e->blocks[i].buf) ++current; else { if (current > len) @@ -673,7 +677,7 @@ namespace libtorrent end = (std::min)(end, blocks_in_piece); for (int i = start; i <= end; ++i) { - if (i == end || p.blocks[i] == 0) + if (i == end || p.blocks[i].buf == 0) { if (buffer_size == 0) continue; @@ -704,13 +708,13 @@ namespace libtorrent TORRENT_ASSERT(offset + block_size > 0); if (!buf) { - iov[iov_counter].iov_base = p.blocks[i]; + iov[iov_counter].iov_base = p.blocks[i].buf; iov[iov_counter].iov_len = block_size; ++iov_counter; } else { - std::memcpy(buf.get() + offset, p.blocks[i], block_size); + std::memcpy(buf.get() + offset, p.blocks[i].buf, block_size); offset += m_block_size; } buffer_size += block_size; @@ -721,11 +725,22 @@ namespace libtorrent } int ret = 0; + disk_io_job j; + j.storage = p.storage; + j.action = disk_io_job::write; + j.buffer = 0; + j.piece = p.piece; + test_error(j); for (int i = start; i < end; ++i) { - if (p.blocks[i] == 0) continue; - free_buffer(p.blocks[i]); - p.blocks[i] = 0; + if (p.blocks[i].buf == 0) continue; + j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size); + int result = j.error ? -1 : j.buffer_size; + j.offset = i * m_block_size; + free_buffer(p.blocks[i].buf); + post_callback(p.blocks[i].callback, j, result); + p.blocks[i].callback.clear(); + p.blocks[i].buf = 0; ++ret; } @@ -733,36 +748,41 @@ namespace libtorrent // std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl; #ifdef TORRENT_DEBUG for (int i = start; i < end; ++i) - TORRENT_ASSERT(p.blocks[i] == 0); + TORRENT_ASSERT(p.blocks[i].buf == 0); #endif return ret; } // returns -1 on failure - int disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l) + int disk_io_thread::cache_block(disk_io_job& j + , boost::function& handler + , mutex_t::scoped_lock& l) { INVARIANT_CHECK; TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end()); TORRENT_ASSERT((j.offset & (m_block_size-1)) == 0); cached_piece_entry p; + int piece_size = j.storage->info()->piece_size(j.piece); + int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; + // there's no point in caching the piece if + // there's only one block in it + if (blocks_in_piece <= 1) return -1; + #ifdef TORRENT_DISK_STATS rename_buffer(j.buffer, "write cache"); #endif - int piece_size = j.storage->info()->piece_size(j.piece); - int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; - p.piece = j.piece; p.storage = j.storage; p.last_use = time_now(); p.num_blocks = 1; - p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]); + p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); if (!p.blocks) return -1; - std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*)); int block = j.offset / m_block_size; // std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl; - p.blocks[block] = j.buffer; + p.blocks[block].buf = j.buffer; + p.blocks[block].callback.swap(handler); ++m_cache_stats.cache_size; m_pieces.push_back(p); return 0; @@ -785,11 +805,11 @@ namespace libtorrent // this is a block that is already allocated // stop allocating and don't read more than // what we've allocated now - if (p.blocks[i]) break; - p.blocks[i] = allocate_buffer("read cache"); + if (p.blocks[i].buf) break; + p.blocks[i].buf = allocate_buffer("read cache"); // the allocation failed, break - if (p.blocks[i] == 0) break; + if (p.blocks[i].buf == 0) break; ++p.num_blocks; ++m_cache_stats.cache_size; ++m_cache_stats.read_cache_size; @@ -835,17 +855,17 @@ namespace libtorrent for (int i = start_block; i < end_block; ++i) { int block_size = (std::min)(piece_size - piece_offset, m_block_size); - if (p.blocks[i] == 0) break; + if (p.blocks[i].buf == 0) break; TORRENT_ASSERT(offset <= buffer_size); TORRENT_ASSERT(piece_offset <= piece_size); TORRENT_ASSERT(offset + block_size <= buffer_size); if (buf) { - std::memcpy(p.blocks[i], buf.get() + offset, block_size); + std::memcpy(p.blocks[i].buf, buf.get() + offset, block_size); } else { - iov[iov_counter].iov_base = p.blocks[i]; + iov[iov_counter].iov_base = p.blocks[i].buf; iov[iov_counter].iov_len = block_size; ++iov_counter; } @@ -894,9 +914,8 @@ namespace libtorrent p.storage = j.storage; p.last_use = time_now(); p.num_blocks = 0; - p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]); + p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); if (!p.blocks) return -1; - std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*)); int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l); if (ret < 0) @@ -933,9 +952,8 @@ namespace libtorrent p.storage = j.storage; p.last_use = time_now(); p.num_blocks = 0; - p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]); + p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); if (!p.blocks) return -1; - std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*)); int ret = read_into_piece(p, start_block, 0, blocks_to_read, l); if (ret < 0) @@ -962,10 +980,10 @@ namespace libtorrent int blocks = 0; for (int k = 0; k < blocks_in_piece; ++k) { - if (p.blocks[k]) + if (p.blocks[k].buf) { #ifndef TORRENT_DISABLE_POOL_ALLOCATOR - TORRENT_ASSERT(is_disk_buffer(p.blocks[k])); + TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf)); #endif ++blocks; } @@ -986,10 +1004,10 @@ namespace libtorrent int blocks = 0; for (int k = 0; k < blocks_in_piece; ++k) { - if (p.blocks[k]) + if (p.blocks[k].buf) { #ifndef TORRENT_DISABLE_POOL_ALLOCATOR - TORRENT_ASSERT(is_disk_buffer(p.blocks[k])); + TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf)); #endif ++blocks; } @@ -1048,8 +1066,8 @@ namespace libtorrent for (int i = 0; i < blocks_in_piece; ++i) { - TORRENT_ASSERT(p->blocks[i]); - ctx.update((char const*)p->blocks[i], (std::min)(piece_size, m_block_size)); + TORRENT_ASSERT(p->blocks[i].buf); + ctx.update((char const*)p->blocks[i].buf, (std::min)(piece_size, m_block_size)); piece_size -= m_block_size; } h = ctx.final(); @@ -1092,17 +1110,17 @@ namespace libtorrent int min_blocks_to_read = block_offset > 0 ? 2 : 1; TORRENT_ASSERT(size <= m_block_size); int start_block = block; - if (p->blocks[start_block] != 0 && min_blocks_to_read > 1) + if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1) ++start_block; // if block_offset > 0, we need to read two blocks, and then // copy parts of both, because it's not aligned to the block // boundaries - if (p->blocks[start_block] == 0) + if (p->blocks[start_block].buf == 0) { int piece_size = j.storage->info()->piece_size(j.piece); int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; int end_block = start_block; - while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block; + while (end_block < blocks_in_piece && p->blocks[end_block].buf == 0) ++end_block; int blocks_to_read = end_block - block; blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size @@ -1118,17 +1136,17 @@ namespace libtorrent hit = false; if (ret < 0) return ret; if (ret < size + block_offset) return -2; - TORRENT_ASSERT(p->blocks[block]); + TORRENT_ASSERT(p->blocks[block].buf); } p->last_use = time_now(); while (size > 0) { - TORRENT_ASSERT(p->blocks[block]); + TORRENT_ASSERT(p->blocks[block].buf); int to_copy = (std::min)(m_block_size - block_offset, size); std::memcpy(j.buffer + buffer_offset - , p->blocks[block] + block_offset + , p->blocks[block].buf + block_offset , to_copy); size -= to_copy; block_offset = 0; @@ -1177,6 +1195,12 @@ namespace libtorrent return ret; } + int disk_io_thread::queued_write_bytes() const + { + mutex_t::scoped_lock l(m_queue_mutex); + return m_queued_write_bytes; + } + void disk_io_thread::add_job(disk_io_job const& j , boost::function const& f) { @@ -1212,6 +1236,7 @@ namespace libtorrent } else if (j.action == disk_io_job::write) { + m_queued_write_bytes += j.buffer_size; for (; i != m_jobs.rend(); ++i) { if (*i < j) @@ -1245,17 +1270,14 @@ namespace libtorrent if (ec) { j.buffer = 0; - j.str = ec.message(); + j.str.clear(); j.error = ec; j.error_file = j.storage->error_file(); - j.error_piece = j.storage->last_piece(); - j.error_op = j.storage->last_operation(); - j.storage->clear_error(); #ifdef TORRENT_DEBUG - std::cout << "ERROR: '" << j.str << "' while " - << (j.error_op == disk_io_job::read?"reading ":"writing ") + std::cout << "ERROR: '" << ec.message() << " in " << j.error_file << std::endl; #endif + j.storage->clear_error(); return true; } return false; @@ -1324,6 +1346,16 @@ namespace libtorrent m_queue_buffer_size -= j.buffer_size; jl.unlock(); + if (m_queue_buffer_size + j.buffer_size >= m_settings.max_queued_disk_bytes + && m_queue_buffer_size < m_settings.max_queued_disk_bytes + && m_queue_callback) + { + // we just dropped below the high watermark of number of bytes + // queued for writing to the disk. Notify the session so that it + // can trigger all the connections waiting for this event + m_ios.post(m_queue_callback); + } + flush_expired_pieces(); int ret = 0; @@ -1425,9 +1457,7 @@ namespace libtorrent #else j.error = asio::error::no_memory; #endif - j.error_piece = j.piece; - j.error_op = disk_io_job::read; - j.str = j.error.message(); + j.str.clear(); break; } @@ -1452,9 +1482,7 @@ namespace libtorrent { j.storage->mark_failed(j.piece); j.error = error_code(errors::failed_hash_check, libtorrent_category); - j.str = j.error.message(); - j.error_piece = j.storage->last_piece(); - j.error_op = disk_io_job::read; + j.str.clear(); j.buffer = 0; break; } @@ -1489,9 +1517,7 @@ namespace libtorrent #else j.error = asio::error::no_memory; #endif - j.error_piece = j.piece; - j.error_op = disk_io_job::read; - j.str = j.error.message(); + j.str.clear(); break; } @@ -1522,9 +1548,7 @@ namespace libtorrent j.buffer = 0; j.error = error_code(errors::file_too_short, libtorrent_category); j.error_file.clear(); - j.str = j.error.message(); - j.error_piece = j.storage->last_piece(); - j.error_op = disk_io_job::read; + j.str.clear(); ret = -1; break; } @@ -1539,11 +1563,7 @@ namespace libtorrent } case disk_io_job::write: { - if (test_error(j)) - { - ret = -1; - break; - } + m_queued_write_bytes -= j.buffer_size; #ifdef TORRENT_DISK_STATS m_log << log_time() << " write " << j.buffer_size << std::endl; #endif @@ -1560,13 +1580,14 @@ namespace libtorrent TORRENT_ASSERT(j.buffer_size <= m_block_size); if (p != m_pieces.end()) { - TORRENT_ASSERT(p->blocks[block] == 0); - if (p->blocks[block]) + TORRENT_ASSERT(p->blocks[block].buf == 0); + if (p->blocks[block].buf) { - free_buffer(p->blocks[block]); + free_buffer(p->blocks[block].buf); --p->num_blocks; } - p->blocks[block] = j.buffer; + p->blocks[block].buf = j.buffer; + p->blocks[block].callback.swap(handler); #ifdef TORRENT_DISK_STATS rename_buffer(j.buffer, "write cache"); #endif @@ -1579,7 +1600,7 @@ namespace libtorrent } else { - if (cache_block(j, l) < 0) + if (cache_block(j, handler, l) < 0) { file::iovec_t iov = {j.buffer, j.buffer_size}; ret = j.storage->write_impl(&iov, j.piece, j.offset, 1); @@ -1723,9 +1744,9 @@ namespace libtorrent int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size; for (int j = 0; j < blocks_in_piece; ++j) { - if (k->blocks[j] == 0) continue; - free_buffer(k->blocks[j]); - k->blocks[j] = 0; + if (k->blocks[j].buf == 0) continue; + free_buffer(k->blocks[j].buf); + k->blocks[j].buf = 0; --m_cache_stats.cache_size; } } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index c59326242..3c77058cd 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1871,7 +1871,8 @@ namespace libtorrent "s: " << p.start << " | " "l: " << p.length << " | " "ds: " << statistics().download_rate() << " | " - "qs: " << int(m_desired_queue_size) << " ]\n"; + "qs: " << int(m_desired_queue_size) << " | " + "q: " << int(m_download_queue.size()) << " ]\n"; #endif if (p.length == 0) @@ -2019,7 +2020,8 @@ namespace libtorrent TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle); m_download_queue.erase(b); - if (m_outstanding_writing_bytes > m_ses.settings().max_outstanding_disk_bytes_per_connection + if (t->filesystem().queued_bytes() >= m_ses.settings().max_queued_disk_bytes + && m_ses.settings().max_queued_disk_bytes && t->alerts().should_post()) { t->alerts().post_alert(performance_alert(t->get_handle() @@ -2052,6 +2054,19 @@ namespace libtorrent && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS t->check_invariant(); #endif + + // did we just finish the piece? + // this means all blocks are either written + // to disk or are in the disk write cache + if (picker.is_piece_finished(p.piece)) + { +#ifdef TORRENT_DEBUG + check_postcondition post_checker2_(t, false); +#endif + t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t + , p.piece, _1)); + } + request_a_block(*t, *this); send_block_requests(); } @@ -2104,24 +2119,6 @@ namespace libtorrent } if (t->is_aborted()) return; - - // did we just finish the piece? - if (picker.is_piece_finished(p.piece)) - { -#ifdef TORRENT_DEBUG - check_postcondition post_checker2_(t, false); -#endif - t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t - , p.piece, _1)); - } - - if (!t->is_seed() && !m_torrent.expired()) - { - // this is a free function defined in policy.cpp - request_a_block(*t, *this); - send_block_requests(); - } - } // ----------------------------- @@ -3696,7 +3693,8 @@ namespace libtorrent { INVARIANT_CHECK; - if (m_channel_state[download_channel] != peer_info::bw_idle) return; + if (m_channel_state[download_channel] != peer_info::bw_idle + && m_channel_state[download_channel] != peer_info::bw_disk) return; shared_ptr t = m_torrent.lock(); @@ -3732,10 +3730,19 @@ namespace libtorrent (*m_logger) << time_now_string() << " *** CANNOT READ [" " quota: " << m_quota[download_channel] << " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") << - " outstanding: " << m_outstanding_writing_bytes << - " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection << + " queue-size: " << ((t && t->get_storage())?t->filesystem().queued_bytes():0) << + " queue-limit: " << m_ses.settings().max_queued_disk_bytes << + " disconnecting: " << (m_disconnecting?"yes":"no") << " ]\n"; #endif + if (m_ses.settings().max_queued_disk_bytes > 0 + && t && t->get_storage() + && t->filesystem().queued_bytes() >= m_ses.settings().max_queued_disk_bytes) + m_channel_state[download_channel] = peer_info::bw_disk; + + // if we block reading, waiting for the disk, we will wake up + // by the disk_io_thread posting a message every time it drops + // from being at or exceeding the limit down to below the limit return; } @@ -4091,11 +4098,16 @@ namespace libtorrent bool peer_connection::can_read() const { + boost::shared_ptr t = m_torrent.lock(); + bool ret = (m_quota[download_channel] > 0 || m_ignore_bandwidth_limits) && !m_connecting - && m_outstanding_writing_bytes <= - m_ses.settings().max_outstanding_disk_bytes_per_connection; + && (m_ses.settings().max_queued_disk_bytes == 0 + || !t + || t->get_storage() == 0 + || t->filesystem().queued_bytes() < m_ses.settings().max_queued_disk_bytes) + && !m_disconnecting; return ret; } diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index 7666fad42..cd32ca099 100644 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -196,6 +196,7 @@ namespace libtorrent std::copy(other->info, other->info + m_blocks_per_piece, i->info); other->info = i->info; } + m_piece_map[i->index].downloading = false; m_downloads.erase(i); } @@ -792,18 +793,17 @@ namespace libtorrent TORRENT_ASSERT(i != m_downloads.end()); #ifdef TORRENT_DEBUG - int num_blocks = blocks_in_piece(i->index); - for (int k = 0; k < num_blocks; ++k) - { - TORRENT_ASSERT(i->info[k].state == block_info::state_finished); - TORRENT_ASSERT(i->info[k].num_peers == 0); - } + int num_blocks = blocks_in_piece(i->index); + for (int k = 0; k < num_blocks; ++k) + { + TORRENT_ASSERT(i->info[k].state == block_info::state_finished); + TORRENT_ASSERT(i->info[k].num_peers == 0); + } #endif - erase_download_piece(i); piece_pos& p = m_piece_map[index]; int prev_priority = p.priority(this); - p.downloading = 0; + erase_download_piece(i); int new_priority = p.priority(this); if (new_priority == prev_priority) return; @@ -1074,7 +1074,6 @@ namespace libtorrent , has_index(index)); TORRENT_ASSERT(i != m_downloads.end()); erase_download_piece(i); - p.downloading = 0; } TORRENT_ASSERT(std::find_if(m_downloads.begin(), m_downloads.end() @@ -1907,16 +1906,17 @@ namespace libtorrent TORRENT_ASSERT(i != m_downloads.end()); TORRENT_ASSERT((int)i->finished <= m_blocks_per_piece); int max_blocks = blocks_in_piece(index); - if ((int)i->finished < max_blocks) return false; + if (int(i->finished) + int(i->writing) < max_blocks) return false; + TORRENT_ASSERT(int(i->finished) + int(i->writing) == max_blocks); #ifdef TORRENT_DEBUG for (int k = 0; k < max_blocks; ++k) { - TORRENT_ASSERT(i->info[k].state == block_info::state_finished); + TORRENT_ASSERT(i->info[k].state == block_info::state_finished + || i->info[k].state == block_info::state_writing); } #endif - TORRENT_ASSERT((int)i->finished == max_blocks); return true; } @@ -2117,15 +2117,46 @@ namespace libtorrent } } - void piece_picker::write_failed(int piece) + void piece_picker::write_failed(piece_block block) { TORRENT_PIECE_PICKER_INVARIANT_CHECK; std::vector::iterator i - = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(piece)); + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); TORRENT_ASSERT(i != m_downloads.end()); + if (i == m_downloads.end()) return; - erase_download_piece(i); + block_info& info = i->info[block.block_index]; + TORRENT_ASSERT(info.state == block_info::state_writing); + TORRENT_ASSERT(info.num_peers == 0); + + TORRENT_ASSERT(i->writing > 0); + TORRENT_ASSERT(info.state == block_info::state_writing); + + if (info.state == block_info::state_finished) return; + if (info.state == block_info::state_writing) --i->writing; + + info.peer = 0; + + info.state = block_info::state_none; + + if (i->finished + i->writing + i->requested == 0) + { + piece_pos& p = m_piece_map[block.piece_index]; + int prev_priority = p.priority(this); + erase_download_piece(i); + int new_priority = p.priority(this); + TORRENT_ASSERT(new_priority >= 0); + + if (m_dirty) return; + if (new_priority == prev_priority) return; + if (prev_priority == -1) add(p.index); + else update(prev_priority, p.index); + } + else + { + sort_piece(i); + } } void piece_picker::mark_as_finished(piece_block block, void* peer) @@ -2283,12 +2314,11 @@ namespace libtorrent // that's being downloaded, remove it from the list if (i->requested + i->finished + i->writing == 0) { - erase_download_piece(i); piece_pos& p = m_piece_map[block.piece_index]; int prev_prio = p.priority(this); TORRENT_ASSERT(prev_prio < int(m_priority_boundries.size()) || m_dirty); - p.downloading = 0; + erase_download_piece(i); if (!m_dirty) { int prio = p.priority(this); diff --git a/src/session.cpp b/src/session.cpp index 248ce89a0..a508788ca 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -141,7 +141,7 @@ namespace libtorrent // whenever a peer has downloaded one block, write // it to disk, and don't read anything from the // socket until the disk write is complete - set.max_outstanding_disk_bytes_per_connection = 0; + set.max_queued_disk_bytes = 1; // don't keep track of all upnp devices, keep // the device list small diff --git a/src/session_impl.cpp b/src/session_impl.cpp index a932b3f57..7728fcb1d 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -183,7 +183,7 @@ namespace aux { , m_files(40) , m_io_service() , m_alerts(m_io_service) - , m_disk_thread(m_io_service) + , m_disk_thread(m_io_service, boost::bind(&session_impl::on_disk_queue, this)) , m_half_open(m_io_service) , m_download_rate(peer_connection::download_channel) #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT @@ -646,6 +646,7 @@ namespace aux { || m_settings.write_cache_line_size != s.write_cache_line_size || m_settings.coalesce_writes != s.coalesce_writes || m_settings.coalesce_reads != s.coalesce_reads + || m_settings.max_queued_disk_bytes != s.max_queued_disk_bytes #ifndef TORRENT_DISABLE_MLOCK || m_settings.lock_disk_cache != s.lock_disk_cache #endif @@ -1150,6 +1151,25 @@ namespace aux { return port; } + // this function is called from the disk-io thread + // when the disk queue is low enough to post new + // write jobs to it. It will go through all peer + // connections that are blocked on the disk and + // wake them up + void session_impl::on_disk_queue() + { + session_impl::mutex_t::scoped_lock l(m_mutex); + + for (connection_map::iterator i = m_connections.begin() + , end(m_connections.end()); i != end; ++i) + { + if ((*i)->m_channel_state[peer_connection::download_channel] + != peer_info::bw_disk) continue; + + (*i)->setup_receive(); + } + } + void session_impl::on_tick(error_code const& e) { session_impl::mutex_t::scoped_lock l(m_mutex); diff --git a/src/storage.cpp b/src/storage.cpp index 96412e4a5..88912be7a 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -481,7 +481,6 @@ namespace libtorrent TORRENT_ASSERT(!error()); int num_read = 0; int slot_size = piece_size - ph.offset; - m_last_op = disk_io_job::read; if (slot_size > 0) { int block_size = 16 * 1024; @@ -520,6 +519,7 @@ namespace libtorrent ph.h.update((char const*)bufs[i].iov_base, bufs[i].iov_len); small_piece_size -= bufs[i].iov_len; } + ph.offset += bufs[i].iov_len; m_storage->disk_pool()->free_buffer((char*)bufs[i].iov_base); } } @@ -1486,7 +1486,6 @@ ret: , m_scratch_buffer2(io, 0) , m_scratch_piece(-1) , m_last_piece(-1) - , m_last_op(-1) , m_storage_constructor(sc) , m_io_thread(io) , m_torrent(torrent) @@ -1628,6 +1627,11 @@ ret: #endif } + int piece_manager::queued_bytes() const + { + return m_io_thread.queued_write_bytes(); + } + void piece_manager::async_write( peer_request const& r , disk_buffer_holder& buffer @@ -1749,7 +1753,6 @@ ret: TORRENT_ASSERT(offset >= 0); TORRENT_ASSERT(num_bufs > 0); m_last_piece = piece_index; - m_last_op = disk_io_job::read; int slot = slot_for(piece_index); return m_storage->readv(bufs, slot, offset, num_bufs); } @@ -1770,7 +1773,6 @@ ret: file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, num_bufs); std::copy(bufs, bufs + num_bufs, iov); m_last_piece = piece_index; - m_last_op = disk_io_job::write; int slot = allocate_slot_for_piece(piece_index); int ret = m_storage->writev(bufs, slot, offset, num_bufs); // only save the partial hash if the write succeeds @@ -2344,7 +2346,6 @@ ret: // the slot where this piece belongs is // free. Just move the piece there. m_last_piece = piece; - m_last_op = disk_io_job::write; m_storage->move_slot(m_current_slot, piece); if (m_storage->error()) return -1; @@ -2572,7 +2573,6 @@ ret: bool ret = false; m_last_piece = piece_index; - m_last_op = disk_io_job::write; if (other_piece >= 0) ret |= m_storage->swap_slots(other_slot, m_current_slot); else @@ -2612,7 +2612,6 @@ ret: } m_last_piece = other_piece; - m_last_op = disk_io_job::write; if (ret) return skip_file(); TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned @@ -2652,7 +2651,6 @@ ret: TORRENT_ASSERT(piece_index == slot1); m_last_piece = piece_index; - m_last_op = disk_io_job::write; m_storage->swap_slots(m_current_slot, slot1); TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned @@ -2700,7 +2698,6 @@ ret: } m_last_piece = piece_index; - m_last_op = disk_io_job::write; if (ret) return skip_file(); TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned @@ -2847,7 +2844,6 @@ ret: , m_piece_to_slot[piece_at_our_slot]); m_last_piece = piece_index; - m_last_op = disk_io_job::write; m_storage->move_slot(piece_index, slot_index); TORRENT_ASSERT(m_slot_to_piece[piece_index] == piece_index); @@ -2892,7 +2888,6 @@ ret: if (m_piece_to_slot[pos] != has_no_slot) { m_last_piece = pos; - m_last_op = disk_io_job::write; new_free_slot = m_piece_to_slot[pos]; m_storage->move_slot(new_free_slot, pos); m_slot_to_piece[pos] = pos; diff --git a/src/torrent.cpp b/src/torrent.cpp index ac14847aa..95320a07e 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -345,11 +345,23 @@ namespace libtorrent if (!j.error) return; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING - (*m_ses.m_logger) << "disk error: '" << j.str << "' while " - << (j.error_op == disk_io_job::read?"reading ":"writing ") - << " piece " << j.error_piece << " in file " << j.error_file << "\n"; + (*m_ses.m_logger) << "disk error: '" << j.error.message() + << " in file " << j.error_file + << " in torrent " << torrent_file().name() + << "\n"; #endif + TORRENT_ASSERT(j.piece >= 0); + + piece_block block_finished(j.piece, j.offset / block_size()); + + if (j.action == disk_io_job::write + || j.action == disk_io_job::hash) + { + // we failed to write j.piece to disk tell the piece picker + if (has_picker() && j.piece >= 0) picker().write_failed(block_finished); + } + if (j.error == #if BOOST_VERSION >= 103500 error_code(boost::system::errc::not_enough_memory, get_posix_category()) @@ -359,23 +371,15 @@ namespace libtorrent ) { if (alerts().should_post()) - alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str)); + alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.error)); if (c) c->disconnect("no memory"); return; } - TORRENT_ASSERT(j.error_piece >= 0); - - if (j.error_op == disk_io_job::write) - { - // we failed to write j.error_piece to disk - // tell the piece picker - if (has_picker() && j.error_piece >= 0) picker().write_failed(j.error_piece); - } // notify the user of the error if (alerts().should_post()) - alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str)); + alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.error)); // put the torrent in an error-state set_error(j.error, j.error_file); @@ -658,23 +662,10 @@ namespace libtorrent if (ret == piece_manager::fatal_disk_error) { - if (m_ses.m_alerts.should_post()) - { - m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str)); - } -#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING - (*m_ses.m_logger) << time_now_string() << ": fatal disk error [" - " error: " << j.str << - " torrent: " << torrent_file().name() << - " ]\n"; -#endif - set_error(j.error, j.error_file); - pause(); + handle_disk_error(j); set_state(torrent_status::queued_for_checking); - std::vector().swap(m_resume_data); lazy_entry().swap(m_resume_entry); - return; } @@ -886,12 +877,13 @@ namespace libtorrent m_picker->init(m_torrent_file->piece_length() / m_block_size , int((m_torrent_file->total_size()+m_block_size-1)/m_block_size)); // assume that we don't have anything + TORRENT_ASSERT(m_picker->num_have() == 0); m_files_checked = false; set_state(torrent_status::checking_resume_data); m_policy.recalculate_connect_candidates(); - if (m_auto_managed) + if (m_auto_managed && !is_finished()) set_queue_position((std::numeric_limits::max)()); std::vector().swap(m_resume_data); @@ -907,18 +899,7 @@ namespace libtorrent if (ret == piece_manager::fatal_disk_error) { - if (m_ses.m_alerts.should_post()) - { - m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str)); - } -#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING - (*m_ses.m_logger) << time_now_string() << ": fatal disk error [" - " error: " << j.str << - " torrent: " << torrent_file().name() << - " ]\n"; -#endif - set_error(j.error, j.error_file); - pause(); + handle_disk_error(j); return; } if (ret == 0) @@ -959,7 +940,7 @@ namespace libtorrent { if (m_ses.m_alerts.should_post()) { - m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str)); + m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.error)); } #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING (*m_ses.m_logger) << time_now_string() << ": fatal disk error [" @@ -1684,6 +1665,14 @@ namespace libtorrent TORRENT_ASSERT(valid_metadata()); + // even though the piece passed the hash-check + // it might still have failed being written to disk + // if so, piece_picker::write_failed() has been + // called, and the piece is no longer finished. + // in this case, we have to ignore the fact that + // it passed the check + if (!m_picker->is_piece_finished(index)) return; + if (passed_hash_check == 0) { // the following call may cause picker to become invalid @@ -1991,7 +1980,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - (*(*i)->m_logger) << "*** ABORTING TORRENT\n"; + (*(*i)->m_logger) << time_now_string() << " *** ABORTING TORRENT\n"; } #endif @@ -3991,6 +3980,8 @@ namespace libtorrent std::for_each(seeds.begin(), seeds.end() , bind(&peer_connection::disconnect, _1, "torrent finished, disconnecting seed", 0)); + if (m_abort) return; + m_policy.recalculate_connect_candidates(); TORRENT_ASSERT(m_storage); @@ -5198,13 +5189,7 @@ namespace libtorrent // -1: disk failure // -2: hash check failed - if (ret == -1) - { - if (alerts().should_post()) - alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str)); - set_error(j.error, j.error_file); - pause(); - } + if (ret == -1) handle_disk_error(j); f(ret); } diff --git a/test/test_storage.cpp b/test/test_storage.cpp index 6aaaccb43..2baed30cc 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -195,7 +195,7 @@ void run_storage_tests(boost::intrusive_ptr info { file_pool fp; libtorrent::asio::io_service ios; - disk_io_thread io(ios); + disk_io_thread io(ios, boost::function()); boost::shared_ptr dummy(new int); boost::intrusive_ptr pm = new piece_manager(dummy, info , test_path, fp, io, default_storage_constructor, storage_mode); @@ -396,7 +396,7 @@ void test_check_files(path const& test_path file_pool fp; libtorrent::asio::io_service ios; - disk_io_thread io(ios); + disk_io_thread io(ios, boost::function()); boost::shared_ptr dummy(new int); boost::intrusive_ptr pm = new piece_manager(dummy, info , test_path, fp, io, default_storage_constructor, storage_mode); diff --git a/test/test_web_seed.cpp b/test/test_web_seed.cpp index 685036aa2..a17c08646 100644 --- a/test/test_web_seed.cpp +++ b/test/test_web_seed.cpp @@ -55,12 +55,12 @@ void test_transfer(boost::intrusive_ptr torrent_file, int proxy) session ses(fingerprint(" ", 0,0,0,0), 0); session_settings settings; settings.ignore_limits_on_local_network = false; - settings.max_outstanding_disk_bytes_per_connection = 256 * 1024; + settings.max_queued_disk_bytes = 256 * 1024; ses.set_settings(settings); ses.set_alert_mask(~alert::progress_notification); ses.listen_on(std::make_pair(51000, 52000)); ses.set_download_rate_limit(torrent_file->total_size() / 10); - remove_all("./tmp1"); + remove_all("./tmp2_web_seed"); char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"}; @@ -78,7 +78,7 @@ void test_transfer(boost::intrusive_ptr torrent_file, int proxy) ses.set_web_seed_proxy(ps); } - torrent_handle th = ses.add_torrent(*torrent_file, "./tmp1"); + torrent_handle th = ses.add_torrent(*torrent_file, "./tmp2_web_seed"); std::vector empty; th.replace_trackers(empty); @@ -94,12 +94,20 @@ void test_transfer(boost::intrusive_ptr torrent_file, int proxy) session_status ss = ses.status(); rate_sum += s.download_payload_rate; ses_rate_sum += ss.payload_download_rate; + + cache_status cs = ses.get_cache_status(); + if (cs.blocks_read < 1) cs.blocks_read = 1; + if (cs.blocks_written < 1) cs.blocks_written = 1; + std::cerr << (s.progress * 100.f) << " %" << " torrent rate: " << (s.download_rate / 1000.f) << " kB/s" << " session rate: " << (ss.download_rate / 1000.f) << " kB/s" << " session total: " << ss.total_payload_download << " torrent total: " << s.total_payload_download << " rate sum:" << ses_rate_sum + << " cache: " << cs.cache_size + << " rcache: " << cs.read_cache_size + << " buffers: " << cs.total_used_buffers << std::endl; print_alerts(ses, "ses"); @@ -126,8 +134,8 @@ void test_transfer(boost::intrusive_ptr torrent_file, int proxy) if (proxy) stop_proxy(8002); - TEST_CHECK(exists("./tmp1" / torrent_file->file_at(0).path)); - remove_all("./tmp1"); + TEST_CHECK(exists("./tmp2_web_seed" / torrent_file->file_at(0).path)); + remove_all("./tmp2_web_seed"); } int test_main() @@ -136,54 +144,45 @@ int test_main() using namespace boost::filesystem; try { - create_directory("test_torrent_dir"); + create_directory("./tmp1_web_seed"); + } catch (std::exception&) {} + + try { + create_directory("./tmp1_web_seed/test_torrent_dir"); } catch (std::exception&) {} char random_data[300000]; - std::srand(std::time(0)); + std::srand(10); +// memset(random_data, 1, sizeof(random_data)); std::generate(random_data, random_data + sizeof(random_data), &std::rand); - std::ofstream("./test_torrent_dir/test1").write(random_data, 35); - std::ofstream("./test_torrent_dir/test2").write(random_data, 16536 - 35); - std::ofstream("./test_torrent_dir/test3").write(random_data, 16536); - std::ofstream("./test_torrent_dir/test4").write(random_data, 17); - std::ofstream("./test_torrent_dir/test5").write(random_data, 16536); - std::ofstream("./test_torrent_dir/test6").write(random_data, 300000); - std::ofstream("./test_torrent_dir/test7").write(random_data, 300000); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test1").write(random_data, 35); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test2").write(random_data, 16536 - 35); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test3").write(random_data, 16536); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test4").write(random_data, 17); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test5").write(random_data, 16536); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test6").write(random_data, 300000); + std::ofstream("./tmp1_web_seed/test_torrent_dir/test7").write(random_data, 300000); file_storage fs; - add_files(fs, path("test_torrent_dir")); + add_files(fs, path("./tmp1_web_seed/test_torrent_dir")); libtorrent::create_torrent t(fs, 16 * 1024); - t.add_url_seed("http://127.0.0.1:8000/"); + t.add_url_seed("http://127.0.0.1:8000/tmp1_web_seed"); start_web_server(8000); // calculate the hash for all pieces - int num = t.num_pieces(); - char* buf = page_aligned_allocator::malloc(t.piece_length()); - - file_pool fp; - boost::scoped_ptr s(default_storage_constructor( - fs, ".", fp)); - - for (int i = 0; i < num; ++i) - { - s->read(buf, i, 0, fs.piece_size(i)); - hasher h(buf, fs.piece_size(i)); - t.set_hash(i, h.final()); - } - + set_piece_hashes(t, "./tmp1_web_seed"); boost::intrusive_ptr torrent_file(new torrent_info(t.generate())); for (int i = 0; i < 6; ++i) test_transfer(torrent_file, i); - torrent_file->rename_file(0, "./test_torrent_dir/renamed_test1"); + torrent_file->rename_file(0, "./tmp2_web_seed/test_torrent_dir/renamed_test1"); test_transfer(torrent_file, 0); stop_web_server(8000); - remove_all("./test_torrent_dir"); - page_aligned_allocator::free(buf); + remove_all("./tmp1_web_seed"); return 0; }