From c9b594fde1fc18f063947e5e5bf47cdc4a38b966 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Fri, 15 Jan 2010 16:45:42 +0000 Subject: [PATCH] support sending suggest messages based on what's in the read cache. support 'explicit read cache' --- ChangeLog | 3 + docs/manual.rst | 29 ++++ docs/tuning.rst | 21 +++ include/libtorrent/aux_/session_impl.hpp | 9 ++ include/libtorrent/bt_peer_connection.hpp | 5 + include/libtorrent/disk_io_thread.hpp | 4 +- include/libtorrent/http_seed_connection.hpp | 1 + include/libtorrent/peer_connection.hpp | 5 + include/libtorrent/session_settings.hpp | 19 +++ include/libtorrent/storage.hpp | 4 + include/libtorrent/torrent.hpp | 7 +- include/libtorrent/torrent_info.hpp | 1 - include/libtorrent/web_peer_connection.hpp | 1 + src/bt_peer_connection.cpp | 23 +++ src/disk_io_thread.cpp | 162 +++++++++++++------- src/peer_connection.cpp | 30 +++- src/piece_picker.cpp | 2 +- src/session.cpp | 15 +- src/session_impl.cpp | 39 +++++ src/storage.cpp | 15 ++ src/torrent.cpp | 129 ++++++++++++++++ 21 files changed, 460 insertions(+), 64 deletions(-) diff --git a/ChangeLog b/ChangeLog index a76f6cfea..212953304 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ + * support 'explicit read cache' which keeps a specific set of pieces + in the read cache, without implicitly caching other pieces + * support sending suggest messages based on what's in the read cache * clear sparse flag on files that complete on windows * support retry-after header for web seeds * replaced boost.filesystem with custom functions diff --git a/docs/manual.rst b/docs/manual.rst index 2a1461301..0e2ae5f2d 100644 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -3638,6 +3638,9 @@ session_settings int num_want; int initial_picker_threshold; int allowed_fast_set_size; + + enum { no_piece_suggestions = 0, suggest_read_cache = 1 }; + int suggest_mode; int max_queued_disk_bytes; int handshake_timeout; bool use_dht_as_fallback; @@ -3651,6 +3654,8 @@ session_settings int cache_buffer_chunk_size; int cache_expiry; bool use_read_cache; + bool explicit_read_cache; + int explicit_cache_interval; bool disk_io_no_buffer; std::pair outgoing_ports; char peer_tos; @@ -3856,6 +3861,15 @@ in rarest first order. ``allowed_fast_set_size`` is the number of pieces we allow peers to download from us without being unchoked. +``suggest_mode`` controls whether or not libtorrent will send out suggest +messages to create a bias of its peers to request certain pieces. The modes +are: + +* ``no_piece_suggestsions`` which is the default and will not send out suggest + messages. +* ``suggest_read_cache`` which will send out suggest messages for the most + recent pieces that are in the read cache. + ``max_queued_disk_bytes`` is the number maximum number of bytes, to be written to disk, that can wait in the disk I/O thread queue. This queue is only for waiting for the disk I/O thread to receive the job and either @@ -3927,6 +3941,21 @@ in the write cache, to when it's forcefully flushed to disk. Default is 60 secon ``use_read_cache``, is set to true (default), the disk cache is also used to cache pieces read from disk. Blocks for writing pieces takes presedence. +``explicit_read_cache`` defaults to 0. If set to something greater than 0, the +disk read cache will not be evicted by cache misses and will explicitly be +controlled based on the rarity of pieces. Rare pieces are more likely to be +cached. This would typically be used together with ``suggest_mode`` set to +``suggest_read_cache``. The value is the number of pieces to keep in the read +cache. If the actual read cache can't fit as many, it will essentially be clamped. + +``explicit_cache_interval`` is the number of seconds in between each refresh of +a part of the explicit read cache. Torrents take turns in refreshing and this +is the time in between each torrent refresh. Refreshing a torrent's explicit +read cache means scanning all pieces and picking a random set of the rarest ones. +There is an affinity to pick pieces that are already in the cache, so that +subsequent refreshes only swaps in pieces that are rarer than whatever is in +the cache at the time. + ``disk_io_no_buffer`` defaults to true. When set to true, files are preferred to be opened in unbuffered mode. This helps the operating system from growing its file cache indefinitely. Currently only files whose offset in the torrent diff --git a/docs/tuning.rst b/docs/tuning.rst index e814064f5..810c220d1 100644 --- a/docs/tuning.rst +++ b/docs/tuning.rst @@ -221,6 +221,27 @@ In order to increase the possibility of read cache hits, set the ``session_settings::cache_expiry`` to a large number. This won't degrade anything as long as the client is only seeding, and not downloading any torrents. +In order to increase the disk cache hit rate, you can enable suggest messages based on +what's in the read cache. To do this, set ``session_settings::suggest_mode`` to +``session_settings::suggest_read_cache``. This will send suggest messages to peers +for the most recently used pieces in the read cache. This is especially useful if you +also enable explicit read cache, by settings ``session_settings::explicit_read_cache`` +to the number of pieces to keep in the cache. The explicit read cache will make the +disk read cache stick, and not be evicted by cache misses. The explicit read cache +will automatically pull in the rarest pieces in the read cache. + +Assuming that you seed much more data than you can keep in the cache, to a large +numbers of peers (so that the read cache wouldn't be useful anyway), this may be a +good idea. + +When peers first connect, libtorrent will send them a number of allow-fast messages, +which lets the peers download certain pieces even when they are choked, since peers +are choked by default, this often triggers immediate requests for those pieces. In the +case of using explicit read cache and suggesting those pieces, allowing fast pieces +should be disabled, to not systematically trigger requests for pieces that are not cached +for all peers. You can turn off allow-fast by settings ``session_settings::allowed_fast_set_size`` +to 0. + send buffer low watermark ------------------------- diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index 51c40b972..9a3c84164 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -627,6 +627,15 @@ namespace libtorrent // torrents. int m_auto_scrape_time_scaler; + // the index of the torrent that we'll + // refresh the next time + int m_next_explicit_cache_torrent; + + // this is a counter of the number of seconds until + // the next time the read cache is rotated, if we're + // using an explicit read read cache. + int m_cache_rotation_timer; + // statistics gathered from all torrents. stat m_stat; diff --git a/include/libtorrent/bt_peer_connection.hpp b/include/libtorrent/bt_peer_connection.hpp index 1de9d8fd7..bea368fef 100644 --- a/include/libtorrent/bt_peer_connection.hpp +++ b/include/libtorrent/bt_peer_connection.hpp @@ -218,6 +218,7 @@ namespace libtorrent void write_have_none(); void write_reject_request(peer_request const&); void write_allow_fast(int piece); + void write_suggest(int piece); void on_connected(); void on_metadata(); @@ -354,6 +355,10 @@ private: { return r.start < 0; } std::vector m_payloads; + // we have suggested these pieces to the peer + // don't suggest it again + bitfield m_sent_suggested_pieces; + #ifndef TORRENT_DISABLE_EXTENSIONS // the message ID for upload only message // 0 if not supported diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index 6115710b9..4169e1527 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -92,6 +92,7 @@ namespace libtorrent , abort_torrent , update_settings , read_and_hash + , cache_piece , finalize_file }; @@ -348,10 +349,11 @@ namespace libtorrent int read_into_piece(cached_piece_entry& p, int start_block , int options, int num_blocks, mutex::scoped_lock& l); int cache_read_block(disk_io_job const& j, mutex::scoped_lock& l); - int cache_read_piece(disk_io_job const& j, mutex::scoped_lock& l); int free_piece(cached_piece_entry& p, mutex::scoped_lock& l); int try_read_from_cache(disk_io_job const& j); int read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h); + int cache_piece(disk_io_job const& j, cache_t::iterator& p + , bool& hit, int options, mutex::scoped_lock& l); // this mutex only protects m_jobs, m_queue_buffer_size // and m_abort diff --git a/include/libtorrent/http_seed_connection.hpp b/include/libtorrent/http_seed_connection.hpp index 330f801a9..2c6f0f741 100644 --- a/include/libtorrent/http_seed_connection.hpp +++ b/include/libtorrent/http_seed_connection.hpp @@ -121,6 +121,7 @@ namespace libtorrent void on_connected(); void write_reject_request(peer_request const&) {} void write_allow_fast(int) {} + void write_suggest(int piece) {} #ifdef TORRENT_DEBUG void check_invariant() const; diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 9322c5c84..e62ffa400 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -438,6 +438,7 @@ namespace libtorrent bool send_unchoke(); void send_interested(); void send_not_interested(); + void send_suggest(int piece); void snub_peer(); @@ -560,6 +561,7 @@ namespace libtorrent virtual void write_have(int index) = 0; virtual void write_keepalive() = 0; virtual void write_piece(peer_request const& r, disk_buffer_holder& buffer) = 0; + virtual void write_suggest(int piece) = 0; virtual void write_reject_request(peer_request const& r) = 0; virtual void write_allow_fast(int piece) = 0; @@ -1015,6 +1017,9 @@ namespace libtorrent // if this is set to true, the client will not // pick any pieces from this peer bool m_no_download:1; + + // set to true when we've sent the first round of suggests + bool m_sent_suggests:1; template struct handler_storage diff --git a/include/libtorrent/session_settings.hpp b/include/libtorrent/session_settings.hpp index de391d2bc..41d2c6d45 100644 --- a/include/libtorrent/session_settings.hpp +++ b/include/libtorrent/session_settings.hpp @@ -114,6 +114,7 @@ namespace libtorrent , num_want(200) , initial_picker_threshold(4) , allowed_fast_set_size(10) + , suggest_mode(no_piece_suggestions) , max_queued_disk_bytes(256 * 1024) , handshake_timeout(10) #ifndef TORRENT_DISABLE_DHT @@ -129,6 +130,8 @@ namespace libtorrent , cache_buffer_chunk_size(16) , cache_expiry(60) , use_read_cache(true) + , explicit_read_cache(0) + , explicit_cache_interval(30) , disk_io_write_mode(0) , disk_io_read_mode(0) , coalesce_reads(false) @@ -338,6 +341,13 @@ namespace libtorrent // that supports the fast extensions int allowed_fast_set_size; + // this determines which pieces will be suggested to peers + // suggest read cache will make libtorrent suggest pieces + // that are fresh in the disk read cache, to potentially + // lower disk access and increase the cache hit ratio + enum { no_piece_suggestions = 0, suggest_read_cache = 1 }; + int suggest_mode; + // the maximum number of bytes a connection may have // pending in the disk write queue before its download // rate is being throttled. This prevents fast downloads @@ -419,6 +429,15 @@ namespace libtorrent // cache for caching blocks read from disk too bool use_read_cache; + // don't implicitly cache pieces in the read cache, + // only cache pieces that are explicitly asked to be + // cached. + bool explicit_read_cache; + + // the number of seconds between refreshes of + // explicit caches + int explicit_cache_interval; + enum io_buffer_mode_t { enable_os_cache = 0, diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index e1a1463c9..6e24a8b23 100644 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -228,6 +228,10 @@ namespace libtorrent , boost::function const& handler , int priority = 0); + void async_cache(int piece + , boost::function const& handler + , int priority = 0); + void async_write( peer_request const& r , disk_buffer_holder& buffer diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 34979fc46..48f608214 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -176,6 +176,7 @@ namespace libtorrent void add_piece(int piece, char const* data, int flags = 0); void on_disk_write_complete(int ret, disk_io_job const& j , peer_request p); + void on_disk_cache_complete(int ret, disk_io_job const& j); struct read_piece_struct { @@ -379,6 +380,8 @@ namespace libtorrent void get_peer_info(std::vector& v); void get_download_queue(std::vector& queue); + void refresh_explicit_cache(int cache_size); + // -------------------------------------------- // TRACKER MANAGEMENT @@ -440,6 +443,8 @@ namespace libtorrent void update_sparse_piece_prio(int piece, int cursor, int reverse_cursor); + void get_suggested_pieces(std::vector& s) const; + bool super_seeding() const { return m_super_seeding; } @@ -1017,7 +1022,7 @@ namespace libtorrent // connection attempt. See: // http://www.ecs.umass.edu/ece/wolf/courses/ECE697J/papers/DRR.pdf // The quanta assigned to each torrent depends on the torrents - // priority, whether it's seed and the number of connected + // priority, whether it's a seed and the number of connected // peers it has. This has the effect that some torrents // will have more connection attempts than other. Each // connection attempt costs 100 points from the deficit diff --git a/include/libtorrent/torrent_info.hpp b/include/libtorrent/torrent_info.hpp index eae74e225..bb84053ff 100644 --- a/include/libtorrent/torrent_info.hpp +++ b/include/libtorrent/torrent_info.hpp @@ -35,7 +35,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -//#include #ifdef _MSC_VER #pragma warning(push, 1) diff --git a/include/libtorrent/web_peer_connection.hpp b/include/libtorrent/web_peer_connection.hpp index 4e3124342..3fc9a5a0a 100644 --- a/include/libtorrent/web_peer_connection.hpp +++ b/include/libtorrent/web_peer_connection.hpp @@ -119,6 +119,7 @@ namespace libtorrent void on_connected(); void write_reject_request(peer_request const&) {} void write_allow_fast(int) {} + void write_suggest(int piece) {} #ifdef TORRENT_DEBUG void check_invariant() const; diff --git a/src/bt_peer_connection.cpp b/src/bt_peer_connection.cpp index 30de48692..c96dbe83e 100644 --- a/src/bt_peer_connection.cpp +++ b/src/bt_peer_connection.cpp @@ -339,6 +339,29 @@ namespace libtorrent send_buffer(msg, sizeof(msg)); } + void bt_peer_connection::write_suggest(int piece) + { + INVARIANT_CHECK; + + TORRENT_ASSERT(m_sent_handshake && m_sent_bitfield); + TORRENT_ASSERT(associated_torrent().lock()->valid_metadata()); + TORRENT_ASSERT(m_supports_fast); + + boost::shared_ptr t = associated_torrent().lock(); + TORRENT_ASSERT(t); + + if (m_sent_suggested_pieces.empty()) + m_sent_suggested_pieces.resize(t->torrent_file().num_pieces(), false); + + if (m_sent_suggested_pieces[piece]) return; + m_sent_suggested_pieces.set_bit(piece); + + char msg[] = {0,0,0,5, msg_suggest_piece, 0, 0, 0, 0}; + char* ptr = msg + 5; + detail::write_int32(piece, ptr); + send_buffer(msg, sizeof(msg)); + } + void bt_peer_connection::get_specific_peer_info(peer_info& p) const { TORRENT_ASSERT(!associated_torrent().expired()); diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 6d2cffe00..b30c800b6 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -449,6 +449,8 @@ namespace libtorrent flush_and_remove(i, l); } + if (m_settings.explicit_read_cache) return; + // flush read cache for (;;) { @@ -819,7 +821,7 @@ namespace libtorrent TORRENT_ASSERT(piece_offset <= piece_size); // this is a block that is already allocated - // free it an allocate a new one + // free it and allocate a new one if (p.blocks[i].buf) { free_buffer(p.blocks[i].buf); @@ -925,35 +927,6 @@ namespace libtorrent TORRENT_ASSERT(ret == buffer_size); return ret; } - - // returns -1 on read error, -2 on out of memory error or the number of bytes read - // this function ignores the cache size limit, it will read the entire - // piece regardless of the offset in j - // this is used for seed-mode, where we need to read the entire piece to calculate - // the hash - int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex::scoped_lock& l) - { - INVARIANT_CHECK; - - int piece_size = j.storage->info()->piece_size(j.piece); - int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; - - if (in_use() + blocks_in_piece > m_settings.cache_size) - flush_cache_blocks(l, in_use() + blocks_in_piece - m_settings.cache_size, m_read_pieces.end()); - - cached_piece_entry p; - p.piece = j.piece; - p.storage = j.storage; - p.last_use = time_now(); - p.num_blocks = 0; - p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); - if (!p.blocks) return -1; - int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l); - - if (ret >= 0) m_read_pieces.push_back(p); - - return ret; - } // returns -1 on read error, -2 if there isn't any space in the cache // or the number of bytes read @@ -961,6 +934,10 @@ namespace libtorrent { INVARIANT_CHECK; + // this function will create a new cached_piece_entry + // and requires that it doesn't already exist + TORRENT_ASSERT(find_cached_piece(m_read_pieces, j, l) == m_read_pieces.end()); + int piece_size = j.storage->info()->piece_size(j.piece); int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; @@ -972,9 +949,11 @@ namespace libtorrent blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size); if (in_use() + blocks_to_read > m_settings.cache_size) - if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size - , m_read_pieces.end(), dont_flush_write_blocks) == 0) + { + int clear = in_use() + blocks_to_read - m_settings.cache_size; + if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear) return -2; + } cached_piece_entry p; p.piece = j.piece; @@ -984,7 +963,7 @@ namespace libtorrent p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); if (!p.blocks) return -1; int ret = read_into_piece(p, start_block, 0, blocks_to_read, l); - + if (ret >= 0) m_read_pieces.push_back(p); return ret; @@ -1058,16 +1037,17 @@ namespace libtorrent } #endif - int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h) + // reads the full piece specified by j into the read cache + // returns the iterator to it and whether or not it already + // was in the cache (hit). + int disk_io_thread::cache_piece(disk_io_job const& j, cache_t::iterator& p + , bool& hit, int options, mutex::scoped_lock& l) { - TORRENT_ASSERT(j.buffer); + INVARIANT_CHECK; - mutex::scoped_lock l(m_piece_mutex); - - cache_t::iterator p - = find_cached_piece(m_read_pieces, j, l); + p = find_cached_piece(m_read_pieces, j, l); - bool hit = true; + hit = true; int ret = 0; int piece_size = j.storage->info()->piece_size(j.piece); @@ -1075,29 +1055,54 @@ namespace libtorrent if (p != m_read_pieces.end() && p->num_blocks != blocks_in_piece) { + INVARIANT_CHECK; // we have the piece in the cache, but not all of the blocks - ret = read_into_piece(*p, 0, ignore_cache_size, blocks_in_piece, l); + ret = read_into_piece(*p, 0, options, blocks_in_piece, l); hit = false; if (ret < 0) return ret; - TORRENT_ASSERT(!m_read_pieces.empty()); - TORRENT_ASSERT(p->piece == j.piece); - TORRENT_ASSERT(p->storage == j.storage); } - - // if the piece cannot be found in the cache, - // read the whole piece starting at the block - // we got a request for. - if (p == m_read_pieces.end()) + else if (p == m_read_pieces.end()) { - ret = cache_read_piece(j, l); + INVARIANT_CHECK; + // if the piece cannot be found in the cache, + // read the whole piece starting at the block + // we got a request for. + + cached_piece_entry pe; + pe.piece = j.piece; + pe.storage = j.storage; + pe.last_use = time_now(); + pe.num_blocks = 0; + pe.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]); + if (!pe.blocks) return -1; + ret = read_into_piece(pe, 0, options, INT_MAX, l); + + if (ret >= 0) m_read_pieces.push_back(pe); hit = false; - if (ret < 0) return ret; p = m_read_pieces.end(); --p; - TORRENT_ASSERT(!m_read_pieces.empty()); - TORRENT_ASSERT(p->piece == j.piece); - TORRENT_ASSERT(p->storage == j.storage); } + p->last_use = time_now(); + TORRENT_ASSERT(!m_read_pieces.empty()); + TORRENT_ASSERT(p->piece == j.piece); + TORRENT_ASSERT(p->storage == j.storage); + return ret; + } + + // cache the entire piece and hash it + int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h) + { + TORRENT_ASSERT(j.buffer); + + mutex::scoped_lock l(m_piece_mutex); + + cache_t::iterator p; + bool hit; + int ret = cache_piece(j, p, hit, ignore_cache_size, l); + if (ret < 0) return ret; + + int piece_size = j.storage->info()->piece_size(j.piece); + int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; hasher ctx; @@ -1115,8 +1120,12 @@ namespace libtorrent // if read cache is disabled or we exceeded the // limit, remove this piece from the cache + // also, if the piece wasn't in the cache when + // the function was called, and we're using an + // explicit read cache, remove it again if (in_use() >= m_settings.cache_size - || !m_settings.use_read_cache) + || !m_settings.use_read_cache + || (m_settings.explicit_read_cache && !hit)) { TORRENT_ASSERT(!m_read_pieces.empty()); TORRENT_ASSERT(p->piece == j.piece); @@ -1178,6 +1187,10 @@ namespace libtorrent // boundaries if (p->blocks[start_block].buf == 0) { + // if we use an explicit read cache, pretend there's no + // space to force hitting disk without caching anything + if (m_settings.explicit_read_cache) return -2; + int piece_size = j.storage->info()->piece_size(j.piece); int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size; int end_block = start_block; @@ -1188,10 +1201,14 @@ namespace libtorrent + m_cache_stats.read_cache_size - in_use())/2, 3)); blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size); blocks_to_read = (std::max)(blocks_to_read, min_blocks_to_read); + + // if we don't have enough space for the new piece, try flushing something else if (in_use() + blocks_to_read > m_settings.cache_size) - if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size - , p, dont_flush_write_blocks) == 0) + { + int clear = in_use() + blocks_to_read - m_settings.cache_size; + if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear) return -2; + } int ret = read_into_piece(*p, block, 0, blocks_to_read, l); hit = false; @@ -1235,6 +1252,13 @@ namespace libtorrent // we got a request for. if (p == m_read_pieces.end()) { + // if we use an explicit read cache and we + // couldn't find the block in the cache, + // pretend that there's not enough space + // to cache it, to force the read operation + // go go straight to disk + if (m_settings.explicit_read_cache) return -2; + ret = cache_read_block(j, l); hit = false; if (ret < 0) return ret; @@ -1338,6 +1362,7 @@ namespace libtorrent , 0 // abort_torrent , cancel_on_abort // update_settings , read_operation + cancel_on_abort // read_and_hash + , read_operation + cancel_on_abort // cache_piece , 0 // finalize_file }; @@ -1821,6 +1846,29 @@ namespace libtorrent break; } + case disk_io_job::cache_piece: + { + mutex::scoped_lock l(m_piece_mutex); + + if (test_error(j)) + { + ret = -1; + break; + } +#ifdef TORRENT_DISK_STATS + m_log << log_time() << " cache " << j.piece << std::endl; +#endif + INVARIANT_CHECK; + TORRENT_ASSERT(j.buffer == 0); + + cache_t::iterator p; + bool hit; + ret = cache_piece(j, p, hit, 0, l); + if (ret == -2) ret = -1; + + if (ret < 0) test_error(j); + break; + } case disk_io_job::hash: { #ifdef TORRENT_DISK_STATS diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 55a6bb444..7497fc7c5 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -145,6 +145,7 @@ namespace libtorrent , m_snubbed(false) , m_bitfield_received(false) , m_no_download(false) + , m_sent_suggests(false) #ifdef TORRENT_DEBUG , m_in_constructor(true) , m_disconnect_started(false) @@ -274,6 +275,7 @@ namespace libtorrent , m_snubbed(false) , m_bitfield_received(false) , m_no_download(false) + , m_sent_suggests(false) #ifdef TORRENT_DEBUG , m_in_constructor(true) , m_disconnect_started(false) @@ -779,7 +781,7 @@ namespace libtorrent // dont announce during handshake if (in_handshake()) return; - // remove suggested pieces that we have + // remove suggested pieces once we have them std::vector::iterator i = std::find( m_suggested_pieces.begin(), m_suggested_pieces.end(), index); if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i); @@ -2842,6 +2844,18 @@ namespace libtorrent if (!m_choked) return false; boost::shared_ptr t = m_torrent.lock(); if (!t->ready_for_connections()) return false; + + if (!m_sent_suggests) + { + std::vector ret; + t->get_suggested_pieces(ret); + for (std::vector::iterator i = ret.begin() + , end(ret.end()); i != end; ++i) + send_suggest(*i); + + m_sent_suggests = true; + } + m_last_unchoke = time_now(); write_unchoke(); m_choked = false; @@ -2890,6 +2904,20 @@ namespace libtorrent disconnect_if_redundant(); } + void peer_connection::send_suggest(int piece) + { + if (m_connecting) return; + if (in_handshake()) return; + + // don't suggest a piece that the peer already has + // don't suggest anything to a peer that isn't interested + if (has_piece(piece) + || !m_peer_interested) + return; + + write_suggest(piece); + } + void peer_connection::send_block_requests() { INVARIANT_CHECK; diff --git a/src/piece_picker.cpp b/src/piece_picker.cpp index 3ab3dbca8..b50f2ccbe 100644 --- a/src/piece_picker.cpp +++ b/src/piece_picker.cpp @@ -2057,7 +2057,7 @@ namespace libtorrent block_info const& info = i->info[block.block_index]; return info.num_peers; } - + void piece_picker::get_availability(std::vector& avail) const { TORRENT_ASSERT(m_seeds >= 0); diff --git a/src/session.cpp b/src/session.cpp index 4eeffdcbb..70c57597c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -169,8 +169,8 @@ namespace libtorrent // the same NAT set.allow_multiple_connections_per_ip = true; - // use 512 MB of cache - set.cache_size = 32768; + // use 1 GB of cache + set.cache_size = 32768 * 2; set.use_read_cache = true; set.cache_buffer_chunk_size = 128; set.read_cache_line_size = 512; @@ -178,6 +178,17 @@ namespace libtorrent // one hour expiration set.cache_expiry = 60 * 60; + // flush write cache based on largest contiguous block + set.disk_cache_algorithm = session_settings::largest_contiguous; + + // explicitly cache rare pieces + set.explicit_read_cache = true; + // prevent fast pieces to interfere with suggested pieces + // since we unchoke everyone, we don't need fast pieces anyway + set.allowed_fast_set_size = 0; + // suggest pieces in the read cache for higher cache hit rate + set.suggest_mode = session_settings::suggest_read_cache; + set.close_redundant_connections = true; set.max_rejects = 10; diff --git a/src/session_impl.cpp b/src/session_impl.cpp index f5aa0db33..baf371cdc 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -219,6 +219,7 @@ namespace aux { TORRENT_SETTING(integer, num_want) TORRENT_SETTING(integer, initial_picker_threshold) TORRENT_SETTING(integer, allowed_fast_set_size) + TORRENT_SETTING(integer, suggest_mode) TORRENT_SETTING(integer, max_queued_disk_bytes) TORRENT_SETTING(integer, handshake_timeout) #ifndef TORRENT_DISABLE_DHT @@ -234,6 +235,7 @@ namespace aux { TORRENT_SETTING(integer, cache_buffer_chunk_size) TORRENT_SETTING(integer, cache_expiry) TORRENT_SETTING(boolean, use_read_cache) + TORRENT_SETTING(boolean, explicit_read_cache) TORRENT_SETTING(integer, disk_io_write_mode) TORRENT_SETTING(integer, disk_io_read_mode) TORRENT_SETTING(boolean, coalesce_reads) @@ -425,6 +427,8 @@ namespace aux { , m_optimistic_unchoke_time_scaler(0) , m_disconnect_time_scaler(90) , m_auto_scrape_time_scaler(180) + , m_next_explicit_cache_torrent(0) + , m_cache_rotation_timer(0) , m_incoming_connection(false) , m_created(time_now_hires()) , m_last_tick(m_created) @@ -1008,6 +1012,7 @@ namespace aux { || m_settings.coalesce_reads != s.coalesce_reads || m_settings.max_queued_disk_bytes != s.max_queued_disk_bytes || m_settings.disable_hash_checks != s.disable_hash_checks + || m_settings.explicit_read_cache != s.explicit_read_cache #ifndef TORRENT_DISABLE_MLOCK || m_settings.lock_disk_cache != s.lock_disk_cache #endif @@ -1873,6 +1878,40 @@ namespace aux { } } + // -------------------------------------------------------------- + // refresh explicit disk read cache + // -------------------------------------------------------------- + --m_cache_rotation_timer; + if (m_settings.explicit_read_cache + && m_cache_rotation_timer <= 0) + { + m_cache_rotation_timer = m_settings.explicit_cache_interval; + + torrent_map::iterator least_recently_refreshed = m_torrents.begin(); + if (m_next_explicit_cache_torrent >= m_torrents.size()) + m_next_explicit_cache_torrent = 0; + + std::advance(least_recently_refreshed, m_next_explicit_cache_torrent); + + // how many blocks does this torrent get? + int cache_size = (std::max)(0, m_settings.cache_size * 9 / 10); + + if (m_connections.empty()) + { + // if we don't have any connections at all, split the + // cache evenly across all torrents + cache_size = cache_size / m_torrents.size(); + } + else + { + cache_size = cache_size * least_recently_refreshed->second->num_peers() + / m_connections.size(); + } + + least_recently_refreshed->second->refresh_explicit_cache(cache_size); + ++m_next_explicit_cache_torrent; + } + // -------------------------------------------------------------- // connect new peers // -------------------------------------------------------------- diff --git a/src/storage.cpp b/src/storage.cpp index ef70378b6..005c07eb0 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -1546,6 +1546,21 @@ ret: #endif } + void piece_manager::async_cache(int piece + , boost::function const& handler + , int priority) + { + disk_io_job j; + j.storage = this; + j.action = disk_io_job::cache_piece; + j.piece = piece; + j.offset = 0; + j.buffer_size = 0; + j.buffer = 0; + j.priority = priority; + m_io_thread.add_job(j, handler); + } + void piece_manager::async_read( peer_request const& r , boost::function const& handler diff --git a/src/torrent.cpp b/src/torrent.cpp index 7be496adf..25867ee7f 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -245,6 +245,7 @@ namespace libtorrent , m_deficit_counter(0) , m_sequence_number(seq) , m_last_working_tracker(-1) + , m_failed_trackers(0) , m_time_scaler(0) , m_priority(0) , m_abort(false) @@ -628,6 +629,14 @@ namespace libtorrent , p.piece, _1)); } } + + void torrent::on_disk_cache_complete(int ret, disk_io_job const& j) + { + // suggest this piece to all peers + for (peer_iterator i = m_connections.begin(); + i != m_connections.end(); ++i) + (*i)->send_suggest(j.piece); + } bool torrent::add_merkle_nodes(std::map const& nodes, int piece) { @@ -5479,6 +5488,126 @@ namespace libtorrent m_stat.second_tick(tick_interval_ms); } + void torrent::refresh_explicit_cache(int cache_size) + { + if (!ready_for_connections()) return; + // rotate the cached pieces + + // add blocks_per_piece / 2 in order to round to closest whole piece + int blocks_per_piece = m_torrent_file->piece_length() / m_block_size; + int num_cache_pieces = (cache_size + blocks_per_piece / 2) / blocks_per_piece; + if (num_cache_pieces > m_torrent_file->num_pieces()) + num_cache_pieces = m_torrent_file->num_pieces(); + + std::vector avail_vec; + if (has_picker()) + { + m_picker->get_availability(avail_vec); + } + else + { + // we don't keep track of availability, do it the expensive way + // do a linear search from the first piece + for (int i = 0; i < m_torrent_file->num_pieces(); ++i) + { + int availability = 0; + if (!have_piece(i)) + { + avail_vec.push_back(INT_MAX); + continue; + } + + for (const_peer_iterator j = this->begin(); j != this->end(); ++j) + if ((*j)->has_piece(i)) ++availability; + avail_vec.push_back(availability); + } + } + + // now pick the num_cache_pieces rarest pieces from avail_vec + std::vector > pieces(m_torrent_file->num_pieces()); + for (int i = 0; i < m_torrent_file->num_pieces(); ++i) + { + pieces[i].second = i; + if (!have_piece(i)) pieces[i].first = INT_MAX; + else pieces[i].first = avail_vec[i]; + } + + // decrease the availability of the pieces that are + // already in the read cache, to move them closer to + // the beginning of the pieces list, and more likely + // to be included in this round of cache pieces + std::vector ret; + m_ses.m_disk_thread.get_cache_info(info_hash(), ret); + // remove write cache entries + ret.erase(std::remove_if(ret.begin(), ret.end() + , boost::bind(&cached_piece_info::kind, _1) == cached_piece_info::write_cache) + , ret.end()); + for (std::vector::iterator i = ret.begin() + , end(ret.end()); i != end; ++i) + { + --pieces[i->piece].first; + } + + std::random_shuffle(pieces.begin(), pieces.end()); + std::stable_sort(pieces.begin(), pieces.end() + , boost::bind(&std::pair::first, _1) < + boost::bind(&std::pair::first, _2)); + avail_vec.clear(); + for (int i = 0; i < num_cache_pieces; ++i) + { + if (pieces[i].first == INT_MAX) break; + avail_vec.push_back(pieces[i].second); + } + + if (!avail_vec.empty()) + { + // the number of pieces to cache for this torrent is proportional + // the number of peers it has, divided by the total number of peers. + // Each peer gets an equal share of the cache + + avail_vec.resize((std::min)(num_cache_pieces, int(avail_vec.size()))); + + for (std::vector::iterator i = avail_vec.begin() + , end(avail_vec.end()); i != end; ++i) + filesystem().async_cache(*i, bind(&torrent::on_disk_cache_complete + , shared_from_this(), _1, _2)); + } + } + + void torrent::get_suggested_pieces(std::vector& s) const + { + if (m_ses.m_settings.suggest_mode == session_settings::no_piece_suggestions) + { + s.clear(); + return; + } + + std::vector ret; + m_ses.m_disk_thread.get_cache_info(info_hash(), ret); + ptime now = time_now(); + + // remove write cache entries + ret.erase(std::remove_if(ret.begin(), ret.end() + , boost::bind(&cached_piece_info::kind, _1) == cached_piece_info::write_cache) + , ret.end()); + + // sort by how new the cached entry is, new pieces first + std::sort(ret.begin(), ret.end() + , boost::bind(&cached_piece_info::last_use, _1) + < boost::bind(&cached_piece_info::last_use, _2)); + + // cut off the oldest pieces that we don't want to suggest + // if we have an explicit cache, it's much more likely to + // stick around, so we should suggest all pieces + int num_pieces_to_suggest = int(ret.size()); + if (!m_ses.m_settings.explicit_read_cache) + num_pieces_to_suggest = (std::max)(1, int(ret.size() / 2)); + ret.resize(num_pieces_to_suggest); + + std::transform(ret.begin(), ret.end(), std::back_inserter(s) + , boost::bind(&cached_piece_info::piece, _1)); + } + void torrent::add_stats(stat const& s) { // these stats are propagated to the session