support sending suggest messages based on what's in the read cache. support 'explicit read cache'

This commit is contained in:
Arvid Norberg 2010-01-15 16:45:42 +00:00
parent ef41d38c34
commit c9b594fde1
21 changed files with 460 additions and 64 deletions

View File

@ -1,3 +1,6 @@
* support 'explicit read cache' which keeps a specific set of pieces
in the read cache, without implicitly caching other pieces
* support sending suggest messages based on what's in the read cache
* clear sparse flag on files that complete on windows
* support retry-after header for web seeds
* replaced boost.filesystem with custom functions

View File

@ -3638,6 +3638,9 @@ session_settings
int num_want;
int initial_picker_threshold;
int allowed_fast_set_size;
enum { no_piece_suggestions = 0, suggest_read_cache = 1 };
int suggest_mode;
int max_queued_disk_bytes;
int handshake_timeout;
bool use_dht_as_fallback;
@ -3651,6 +3654,8 @@ session_settings
int cache_buffer_chunk_size;
int cache_expiry;
bool use_read_cache;
bool explicit_read_cache;
int explicit_cache_interval;
bool disk_io_no_buffer;
std::pair<int, int> outgoing_ports;
char peer_tos;
@ -3856,6 +3861,15 @@ in rarest first order.
``allowed_fast_set_size`` is the number of pieces we allow peers to download
from us without being unchoked.
``suggest_mode`` controls whether or not libtorrent will send out suggest
messages to create a bias of its peers to request certain pieces. The modes
are:
* ``no_piece_suggestsions`` which is the default and will not send out suggest
messages.
* ``suggest_read_cache`` which will send out suggest messages for the most
recent pieces that are in the read cache.
``max_queued_disk_bytes`` is the number maximum number of bytes, to be
written to disk, that can wait in the disk I/O thread queue. This queue
is only for waiting for the disk I/O thread to receive the job and either
@ -3927,6 +3941,21 @@ in the write cache, to when it's forcefully flushed to disk. Default is 60 secon
``use_read_cache``, is set to true (default), the disk cache is also used to
cache pieces read from disk. Blocks for writing pieces takes presedence.
``explicit_read_cache`` defaults to 0. If set to something greater than 0, the
disk read cache will not be evicted by cache misses and will explicitly be
controlled based on the rarity of pieces. Rare pieces are more likely to be
cached. This would typically be used together with ``suggest_mode`` set to
``suggest_read_cache``. The value is the number of pieces to keep in the read
cache. If the actual read cache can't fit as many, it will essentially be clamped.
``explicit_cache_interval`` is the number of seconds in between each refresh of
a part of the explicit read cache. Torrents take turns in refreshing and this
is the time in between each torrent refresh. Refreshing a torrent's explicit
read cache means scanning all pieces and picking a random set of the rarest ones.
There is an affinity to pick pieces that are already in the cache, so that
subsequent refreshes only swaps in pieces that are rarer than whatever is in
the cache at the time.
``disk_io_no_buffer`` defaults to true. When set to true, files are preferred
to be opened in unbuffered mode. This helps the operating system from growing
its file cache indefinitely. Currently only files whose offset in the torrent

View File

@ -221,6 +221,27 @@ In order to increase the possibility of read cache hits, set the
``session_settings::cache_expiry`` to a large number. This won't degrade anything as
long as the client is only seeding, and not downloading any torrents.
In order to increase the disk cache hit rate, you can enable suggest messages based on
what's in the read cache. To do this, set ``session_settings::suggest_mode`` to
``session_settings::suggest_read_cache``. This will send suggest messages to peers
for the most recently used pieces in the read cache. This is especially useful if you
also enable explicit read cache, by settings ``session_settings::explicit_read_cache``
to the number of pieces to keep in the cache. The explicit read cache will make the
disk read cache stick, and not be evicted by cache misses. The explicit read cache
will automatically pull in the rarest pieces in the read cache.
Assuming that you seed much more data than you can keep in the cache, to a large
numbers of peers (so that the read cache wouldn't be useful anyway), this may be a
good idea.
When peers first connect, libtorrent will send them a number of allow-fast messages,
which lets the peers download certain pieces even when they are choked, since peers
are choked by default, this often triggers immediate requests for those pieces. In the
case of using explicit read cache and suggesting those pieces, allowing fast pieces
should be disabled, to not systematically trigger requests for pieces that are not cached
for all peers. You can turn off allow-fast by settings ``session_settings::allowed_fast_set_size``
to 0.
send buffer low watermark
-------------------------

View File

@ -627,6 +627,15 @@ namespace libtorrent
// torrents.
int m_auto_scrape_time_scaler;
// the index of the torrent that we'll
// refresh the next time
int m_next_explicit_cache_torrent;
// this is a counter of the number of seconds until
// the next time the read cache is rotated, if we're
// using an explicit read read cache.
int m_cache_rotation_timer;
// statistics gathered from all torrents.
stat m_stat;

View File

@ -218,6 +218,7 @@ namespace libtorrent
void write_have_none();
void write_reject_request(peer_request const&);
void write_allow_fast(int piece);
void write_suggest(int piece);
void on_connected();
void on_metadata();
@ -354,6 +355,10 @@ private:
{ return r.start < 0; }
std::vector<range> m_payloads;
// we have suggested these pieces to the peer
// don't suggest it again
bitfield m_sent_suggested_pieces;
#ifndef TORRENT_DISABLE_EXTENSIONS
// the message ID for upload only message
// 0 if not supported

View File

@ -92,6 +92,7 @@ namespace libtorrent
, abort_torrent
, update_settings
, read_and_hash
, cache_piece
, finalize_file
};
@ -348,10 +349,11 @@ namespace libtorrent
int read_into_piece(cached_piece_entry& p, int start_block
, int options, int num_blocks, mutex::scoped_lock& l);
int cache_read_block(disk_io_job const& j, mutex::scoped_lock& l);
int cache_read_piece(disk_io_job const& j, mutex::scoped_lock& l);
int free_piece(cached_piece_entry& p, mutex::scoped_lock& l);
int try_read_from_cache(disk_io_job const& j);
int read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h);
int cache_piece(disk_io_job const& j, cache_t::iterator& p
, bool& hit, int options, mutex::scoped_lock& l);
// this mutex only protects m_jobs, m_queue_buffer_size
// and m_abort

View File

@ -121,6 +121,7 @@ namespace libtorrent
void on_connected();
void write_reject_request(peer_request const&) {}
void write_allow_fast(int) {}
void write_suggest(int piece) {}
#ifdef TORRENT_DEBUG
void check_invariant() const;

View File

@ -438,6 +438,7 @@ namespace libtorrent
bool send_unchoke();
void send_interested();
void send_not_interested();
void send_suggest(int piece);
void snub_peer();
@ -560,6 +561,7 @@ namespace libtorrent
virtual void write_have(int index) = 0;
virtual void write_keepalive() = 0;
virtual void write_piece(peer_request const& r, disk_buffer_holder& buffer) = 0;
virtual void write_suggest(int piece) = 0;
virtual void write_reject_request(peer_request const& r) = 0;
virtual void write_allow_fast(int piece) = 0;
@ -1015,6 +1017,9 @@ namespace libtorrent
// if this is set to true, the client will not
// pick any pieces from this peer
bool m_no_download:1;
// set to true when we've sent the first round of suggests
bool m_sent_suggests:1;
template <std::size_t Size>
struct handler_storage

View File

@ -114,6 +114,7 @@ namespace libtorrent
, num_want(200)
, initial_picker_threshold(4)
, allowed_fast_set_size(10)
, suggest_mode(no_piece_suggestions)
, max_queued_disk_bytes(256 * 1024)
, handshake_timeout(10)
#ifndef TORRENT_DISABLE_DHT
@ -129,6 +130,8 @@ namespace libtorrent
, cache_buffer_chunk_size(16)
, cache_expiry(60)
, use_read_cache(true)
, explicit_read_cache(0)
, explicit_cache_interval(30)
, disk_io_write_mode(0)
, disk_io_read_mode(0)
, coalesce_reads(false)
@ -338,6 +341,13 @@ namespace libtorrent
// that supports the fast extensions
int allowed_fast_set_size;
// this determines which pieces will be suggested to peers
// suggest read cache will make libtorrent suggest pieces
// that are fresh in the disk read cache, to potentially
// lower disk access and increase the cache hit ratio
enum { no_piece_suggestions = 0, suggest_read_cache = 1 };
int suggest_mode;
// the maximum number of bytes a connection may have
// pending in the disk write queue before its download
// rate is being throttled. This prevents fast downloads
@ -419,6 +429,15 @@ namespace libtorrent
// cache for caching blocks read from disk too
bool use_read_cache;
// don't implicitly cache pieces in the read cache,
// only cache pieces that are explicitly asked to be
// cached.
bool explicit_read_cache;
// the number of seconds between refreshes of
// explicit caches
int explicit_cache_interval;
enum io_buffer_mode_t
{
enable_os_cache = 0,

View File

@ -228,6 +228,10 @@ namespace libtorrent
, boost::function<void(int, disk_io_job const&)> const& handler
, int priority = 0);
void async_cache(int piece
, boost::function<void(int, disk_io_job const&)> const& handler
, int priority = 0);
void async_write(
peer_request const& r
, disk_buffer_holder& buffer

View File

@ -176,6 +176,7 @@ namespace libtorrent
void add_piece(int piece, char const* data, int flags = 0);
void on_disk_write_complete(int ret, disk_io_job const& j
, peer_request p);
void on_disk_cache_complete(int ret, disk_io_job const& j);
struct read_piece_struct
{
@ -379,6 +380,8 @@ namespace libtorrent
void get_peer_info(std::vector<peer_info>& v);
void get_download_queue(std::vector<partial_piece_info>& queue);
void refresh_explicit_cache(int cache_size);
// --------------------------------------------
// TRACKER MANAGEMENT
@ -440,6 +443,8 @@ namespace libtorrent
void update_sparse_piece_prio(int piece, int cursor, int reverse_cursor);
void get_suggested_pieces(std::vector<int>& s) const;
bool super_seeding() const
{ return m_super_seeding; }
@ -1017,7 +1022,7 @@ namespace libtorrent
// connection attempt. See:
// http://www.ecs.umass.edu/ece/wolf/courses/ECE697J/papers/DRR.pdf
// The quanta assigned to each torrent depends on the torrents
// priority, whether it's seed and the number of connected
// priority, whether it's a seed and the number of connected
// peers it has. This has the effect that some torrents
// will have more connection attempts than other. Each
// connection attempt costs 100 points from the deficit

View File

@ -35,7 +35,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <string>
#include <vector>
//#include <iosfwd>
#ifdef _MSC_VER
#pragma warning(push, 1)

View File

@ -119,6 +119,7 @@ namespace libtorrent
void on_connected();
void write_reject_request(peer_request const&) {}
void write_allow_fast(int) {}
void write_suggest(int piece) {}
#ifdef TORRENT_DEBUG
void check_invariant() const;

View File

@ -339,6 +339,29 @@ namespace libtorrent
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_suggest(int piece)
{
INVARIANT_CHECK;
TORRENT_ASSERT(m_sent_handshake && m_sent_bitfield);
TORRENT_ASSERT(associated_torrent().lock()->valid_metadata());
TORRENT_ASSERT(m_supports_fast);
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
if (m_sent_suggested_pieces.empty())
m_sent_suggested_pieces.resize(t->torrent_file().num_pieces(), false);
if (m_sent_suggested_pieces[piece]) return;
m_sent_suggested_pieces.set_bit(piece);
char msg[] = {0,0,0,5, msg_suggest_piece, 0, 0, 0, 0};
char* ptr = msg + 5;
detail::write_int32(piece, ptr);
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::get_specific_peer_info(peer_info& p) const
{
TORRENT_ASSERT(!associated_torrent().expired());

View File

@ -449,6 +449,8 @@ namespace libtorrent
flush_and_remove(i, l);
}
if (m_settings.explicit_read_cache) return;
// flush read cache
for (;;)
{
@ -819,7 +821,7 @@ namespace libtorrent
TORRENT_ASSERT(piece_offset <= piece_size);
// this is a block that is already allocated
// free it an allocate a new one
// free it and allocate a new one
if (p.blocks[i].buf)
{
free_buffer(p.blocks[i].buf);
@ -925,35 +927,6 @@ namespace libtorrent
TORRENT_ASSERT(ret == buffer_size);
return ret;
}
// returns -1 on read error, -2 on out of memory error or the number of bytes read
// this function ignores the cache size limit, it will read the entire
// piece regardless of the offset in j
// this is used for seed-mode, where we need to read the entire piece to calculate
// the hash
int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
if (in_use() + blocks_in_piece > m_settings.cache_size)
flush_cache_blocks(l, in_use() + blocks_in_piece - m_settings.cache_size, m_read_pieces.end());
cached_piece_entry p;
p.piece = j.piece;
p.storage = j.storage;
p.last_use = time_now();
p.num_blocks = 0;
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
if (!p.blocks) return -1;
int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l);
if (ret >= 0) m_read_pieces.push_back(p);
return ret;
}
// returns -1 on read error, -2 if there isn't any space in the cache
// or the number of bytes read
@ -961,6 +934,10 @@ namespace libtorrent
{
INVARIANT_CHECK;
// this function will create a new cached_piece_entry
// and requires that it doesn't already exist
TORRENT_ASSERT(find_cached_piece(m_read_pieces, j, l) == m_read_pieces.end());
int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
@ -972,9 +949,11 @@ namespace libtorrent
blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size);
if (in_use() + blocks_to_read > m_settings.cache_size)
if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size
, m_read_pieces.end(), dont_flush_write_blocks) == 0)
{
int clear = in_use() + blocks_to_read - m_settings.cache_size;
if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear)
return -2;
}
cached_piece_entry p;
p.piece = j.piece;
@ -984,7 +963,7 @@ namespace libtorrent
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
if (!p.blocks) return -1;
int ret = read_into_piece(p, start_block, 0, blocks_to_read, l);
if (ret >= 0) m_read_pieces.push_back(p);
return ret;
@ -1058,16 +1037,17 @@ namespace libtorrent
}
#endif
int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h)
// reads the full piece specified by j into the read cache
// returns the iterator to it and whether or not it already
// was in the cache (hit).
int disk_io_thread::cache_piece(disk_io_job const& j, cache_t::iterator& p
, bool& hit, int options, mutex::scoped_lock& l)
{
TORRENT_ASSERT(j.buffer);
INVARIANT_CHECK;
mutex::scoped_lock l(m_piece_mutex);
cache_t::iterator p
= find_cached_piece(m_read_pieces, j, l);
p = find_cached_piece(m_read_pieces, j, l);
bool hit = true;
hit = true;
int ret = 0;
int piece_size = j.storage->info()->piece_size(j.piece);
@ -1075,29 +1055,54 @@ namespace libtorrent
if (p != m_read_pieces.end() && p->num_blocks != blocks_in_piece)
{
INVARIANT_CHECK;
// we have the piece in the cache, but not all of the blocks
ret = read_into_piece(*p, 0, ignore_cache_size, blocks_in_piece, l);
ret = read_into_piece(*p, 0, options, blocks_in_piece, l);
hit = false;
if (ret < 0) return ret;
TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece);
TORRENT_ASSERT(p->storage == j.storage);
}
// if the piece cannot be found in the cache,
// read the whole piece starting at the block
// we got a request for.
if (p == m_read_pieces.end())
else if (p == m_read_pieces.end())
{
ret = cache_read_piece(j, l);
INVARIANT_CHECK;
// if the piece cannot be found in the cache,
// read the whole piece starting at the block
// we got a request for.
cached_piece_entry pe;
pe.piece = j.piece;
pe.storage = j.storage;
pe.last_use = time_now();
pe.num_blocks = 0;
pe.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
if (!pe.blocks) return -1;
ret = read_into_piece(pe, 0, options, INT_MAX, l);
if (ret >= 0) m_read_pieces.push_back(pe);
hit = false;
if (ret < 0) return ret;
p = m_read_pieces.end();
--p;
TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece);
TORRENT_ASSERT(p->storage == j.storage);
}
p->last_use = time_now();
TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece);
TORRENT_ASSERT(p->storage == j.storage);
return ret;
}
// cache the entire piece and hash it
int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h)
{
TORRENT_ASSERT(j.buffer);
mutex::scoped_lock l(m_piece_mutex);
cache_t::iterator p;
bool hit;
int ret = cache_piece(j, p, hit, ignore_cache_size, l);
if (ret < 0) return ret;
int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
hasher ctx;
@ -1115,8 +1120,12 @@ namespace libtorrent
// if read cache is disabled or we exceeded the
// limit, remove this piece from the cache
// also, if the piece wasn't in the cache when
// the function was called, and we're using an
// explicit read cache, remove it again
if (in_use() >= m_settings.cache_size
|| !m_settings.use_read_cache)
|| !m_settings.use_read_cache
|| (m_settings.explicit_read_cache && !hit))
{
TORRENT_ASSERT(!m_read_pieces.empty());
TORRENT_ASSERT(p->piece == j.piece);
@ -1178,6 +1187,10 @@ namespace libtorrent
// boundaries
if (p->blocks[start_block].buf == 0)
{
// if we use an explicit read cache, pretend there's no
// space to force hitting disk without caching anything
if (m_settings.explicit_read_cache) return -2;
int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
int end_block = start_block;
@ -1188,10 +1201,14 @@ namespace libtorrent
+ m_cache_stats.read_cache_size - in_use())/2, 3));
blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size);
blocks_to_read = (std::max)(blocks_to_read, min_blocks_to_read);
// if we don't have enough space for the new piece, try flushing something else
if (in_use() + blocks_to_read > m_settings.cache_size)
if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size
, p, dont_flush_write_blocks) == 0)
{
int clear = in_use() + blocks_to_read - m_settings.cache_size;
if (flush_cache_blocks(l, clear, m_read_pieces.end(), dont_flush_write_blocks) < clear)
return -2;
}
int ret = read_into_piece(*p, block, 0, blocks_to_read, l);
hit = false;
@ -1235,6 +1252,13 @@ namespace libtorrent
// we got a request for.
if (p == m_read_pieces.end())
{
// if we use an explicit read cache and we
// couldn't find the block in the cache,
// pretend that there's not enough space
// to cache it, to force the read operation
// go go straight to disk
if (m_settings.explicit_read_cache) return -2;
ret = cache_read_block(j, l);
hit = false;
if (ret < 0) return ret;
@ -1338,6 +1362,7 @@ namespace libtorrent
, 0 // abort_torrent
, cancel_on_abort // update_settings
, read_operation + cancel_on_abort // read_and_hash
, read_operation + cancel_on_abort // cache_piece
, 0 // finalize_file
};
@ -1821,6 +1846,29 @@ namespace libtorrent
break;
}
case disk_io_job::cache_piece:
{
mutex::scoped_lock l(m_piece_mutex);
if (test_error(j))
{
ret = -1;
break;
}
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " cache " << j.piece << std::endl;
#endif
INVARIANT_CHECK;
TORRENT_ASSERT(j.buffer == 0);
cache_t::iterator p;
bool hit;
ret = cache_piece(j, p, hit, 0, l);
if (ret == -2) ret = -1;
if (ret < 0) test_error(j);
break;
}
case disk_io_job::hash:
{
#ifdef TORRENT_DISK_STATS

View File

@ -145,6 +145,7 @@ namespace libtorrent
, m_snubbed(false)
, m_bitfield_received(false)
, m_no_download(false)
, m_sent_suggests(false)
#ifdef TORRENT_DEBUG
, m_in_constructor(true)
, m_disconnect_started(false)
@ -274,6 +275,7 @@ namespace libtorrent
, m_snubbed(false)
, m_bitfield_received(false)
, m_no_download(false)
, m_sent_suggests(false)
#ifdef TORRENT_DEBUG
, m_in_constructor(true)
, m_disconnect_started(false)
@ -779,7 +781,7 @@ namespace libtorrent
// dont announce during handshake
if (in_handshake()) return;
// remove suggested pieces that we have
// remove suggested pieces once we have them
std::vector<int>::iterator i = std::find(
m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
@ -2842,6 +2844,18 @@ namespace libtorrent
if (!m_choked) return false;
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t->ready_for_connections()) return false;
if (!m_sent_suggests)
{
std::vector<int> ret;
t->get_suggested_pieces(ret);
for (std::vector<int>::iterator i = ret.begin()
, end(ret.end()); i != end; ++i)
send_suggest(*i);
m_sent_suggests = true;
}
m_last_unchoke = time_now();
write_unchoke();
m_choked = false;
@ -2890,6 +2904,20 @@ namespace libtorrent
disconnect_if_redundant();
}
void peer_connection::send_suggest(int piece)
{
if (m_connecting) return;
if (in_handshake()) return;
// don't suggest a piece that the peer already has
// don't suggest anything to a peer that isn't interested
if (has_piece(piece)
|| !m_peer_interested)
return;
write_suggest(piece);
}
void peer_connection::send_block_requests()
{
INVARIANT_CHECK;

View File

@ -2057,7 +2057,7 @@ namespace libtorrent
block_info const& info = i->info[block.block_index];
return info.num_peers;
}
void piece_picker::get_availability(std::vector<int>& avail) const
{
TORRENT_ASSERT(m_seeds >= 0);

View File

@ -169,8 +169,8 @@ namespace libtorrent
// the same NAT
set.allow_multiple_connections_per_ip = true;
// use 512 MB of cache
set.cache_size = 32768;
// use 1 GB of cache
set.cache_size = 32768 * 2;
set.use_read_cache = true;
set.cache_buffer_chunk_size = 128;
set.read_cache_line_size = 512;
@ -178,6 +178,17 @@ namespace libtorrent
// one hour expiration
set.cache_expiry = 60 * 60;
// flush write cache based on largest contiguous block
set.disk_cache_algorithm = session_settings::largest_contiguous;
// explicitly cache rare pieces
set.explicit_read_cache = true;
// prevent fast pieces to interfere with suggested pieces
// since we unchoke everyone, we don't need fast pieces anyway
set.allowed_fast_set_size = 0;
// suggest pieces in the read cache for higher cache hit rate
set.suggest_mode = session_settings::suggest_read_cache;
set.close_redundant_connections = true;
set.max_rejects = 10;

View File

@ -219,6 +219,7 @@ namespace aux {
TORRENT_SETTING(integer, num_want)
TORRENT_SETTING(integer, initial_picker_threshold)
TORRENT_SETTING(integer, allowed_fast_set_size)
TORRENT_SETTING(integer, suggest_mode)
TORRENT_SETTING(integer, max_queued_disk_bytes)
TORRENT_SETTING(integer, handshake_timeout)
#ifndef TORRENT_DISABLE_DHT
@ -234,6 +235,7 @@ namespace aux {
TORRENT_SETTING(integer, cache_buffer_chunk_size)
TORRENT_SETTING(integer, cache_expiry)
TORRENT_SETTING(boolean, use_read_cache)
TORRENT_SETTING(boolean, explicit_read_cache)
TORRENT_SETTING(integer, disk_io_write_mode)
TORRENT_SETTING(integer, disk_io_read_mode)
TORRENT_SETTING(boolean, coalesce_reads)
@ -425,6 +427,8 @@ namespace aux {
, m_optimistic_unchoke_time_scaler(0)
, m_disconnect_time_scaler(90)
, m_auto_scrape_time_scaler(180)
, m_next_explicit_cache_torrent(0)
, m_cache_rotation_timer(0)
, m_incoming_connection(false)
, m_created(time_now_hires())
, m_last_tick(m_created)
@ -1008,6 +1012,7 @@ namespace aux {
|| m_settings.coalesce_reads != s.coalesce_reads
|| m_settings.max_queued_disk_bytes != s.max_queued_disk_bytes
|| m_settings.disable_hash_checks != s.disable_hash_checks
|| m_settings.explicit_read_cache != s.explicit_read_cache
#ifndef TORRENT_DISABLE_MLOCK
|| m_settings.lock_disk_cache != s.lock_disk_cache
#endif
@ -1873,6 +1878,40 @@ namespace aux {
}
}
// --------------------------------------------------------------
// refresh explicit disk read cache
// --------------------------------------------------------------
--m_cache_rotation_timer;
if (m_settings.explicit_read_cache
&& m_cache_rotation_timer <= 0)
{
m_cache_rotation_timer = m_settings.explicit_cache_interval;
torrent_map::iterator least_recently_refreshed = m_torrents.begin();
if (m_next_explicit_cache_torrent >= m_torrents.size())
m_next_explicit_cache_torrent = 0;
std::advance(least_recently_refreshed, m_next_explicit_cache_torrent);
// how many blocks does this torrent get?
int cache_size = (std::max)(0, m_settings.cache_size * 9 / 10);
if (m_connections.empty())
{
// if we don't have any connections at all, split the
// cache evenly across all torrents
cache_size = cache_size / m_torrents.size();
}
else
{
cache_size = cache_size * least_recently_refreshed->second->num_peers()
/ m_connections.size();
}
least_recently_refreshed->second->refresh_explicit_cache(cache_size);
++m_next_explicit_cache_torrent;
}
// --------------------------------------------------------------
// connect new peers
// --------------------------------------------------------------

View File

@ -1546,6 +1546,21 @@ ret:
#endif
}
void piece_manager::async_cache(int piece
, boost::function<void(int, disk_io_job const&)> const& handler
, int priority)
{
disk_io_job j;
j.storage = this;
j.action = disk_io_job::cache_piece;
j.piece = piece;
j.offset = 0;
j.buffer_size = 0;
j.buffer = 0;
j.priority = priority;
m_io_thread.add_job(j, handler);
}
void piece_manager::async_read(
peer_request const& r
, boost::function<void(int, disk_io_job const&)> const& handler

View File

@ -245,6 +245,7 @@ namespace libtorrent
, m_deficit_counter(0)
, m_sequence_number(seq)
, m_last_working_tracker(-1)
, m_failed_trackers(0)
, m_time_scaler(0)
, m_priority(0)
, m_abort(false)
@ -628,6 +629,14 @@ namespace libtorrent
, p.piece, _1));
}
}
void torrent::on_disk_cache_complete(int ret, disk_io_job const& j)
{
// suggest this piece to all peers
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
(*i)->send_suggest(j.piece);
}
bool torrent::add_merkle_nodes(std::map<int, sha1_hash> const& nodes, int piece)
{
@ -5479,6 +5488,126 @@ namespace libtorrent
m_stat.second_tick(tick_interval_ms);
}
void torrent::refresh_explicit_cache(int cache_size)
{
if (!ready_for_connections()) return;
// rotate the cached pieces
// add blocks_per_piece / 2 in order to round to closest whole piece
int blocks_per_piece = m_torrent_file->piece_length() / m_block_size;
int num_cache_pieces = (cache_size + blocks_per_piece / 2) / blocks_per_piece;
if (num_cache_pieces > m_torrent_file->num_pieces())
num_cache_pieces = m_torrent_file->num_pieces();
std::vector<int> avail_vec;
if (has_picker())
{
m_picker->get_availability(avail_vec);
}
else
{
// we don't keep track of availability, do it the expensive way
// do a linear search from the first piece
for (int i = 0; i < m_torrent_file->num_pieces(); ++i)
{
int availability = 0;
if (!have_piece(i))
{
avail_vec.push_back(INT_MAX);
continue;
}
for (const_peer_iterator j = this->begin(); j != this->end(); ++j)
if ((*j)->has_piece(i)) ++availability;
avail_vec.push_back(availability);
}
}
// now pick the num_cache_pieces rarest pieces from avail_vec
std::vector<std::pair<int, int> > pieces(m_torrent_file->num_pieces());
for (int i = 0; i < m_torrent_file->num_pieces(); ++i)
{
pieces[i].second = i;
if (!have_piece(i)) pieces[i].first = INT_MAX;
else pieces[i].first = avail_vec[i];
}
// decrease the availability of the pieces that are
// already in the read cache, to move them closer to
// the beginning of the pieces list, and more likely
// to be included in this round of cache pieces
std::vector<cached_piece_info> ret;
m_ses.m_disk_thread.get_cache_info(info_hash(), ret);
// remove write cache entries
ret.erase(std::remove_if(ret.begin(), ret.end()
, boost::bind(&cached_piece_info::kind, _1) == cached_piece_info::write_cache)
, ret.end());
for (std::vector<cached_piece_info>::iterator i = ret.begin()
, end(ret.end()); i != end; ++i)
{
--pieces[i->piece].first;
}
std::random_shuffle(pieces.begin(), pieces.end());
std::stable_sort(pieces.begin(), pieces.end()
, boost::bind(&std::pair<int, int>::first, _1) <
boost::bind(&std::pair<int, int>::first, _2));
avail_vec.clear();
for (int i = 0; i < num_cache_pieces; ++i)
{
if (pieces[i].first == INT_MAX) break;
avail_vec.push_back(pieces[i].second);
}
if (!avail_vec.empty())
{
// the number of pieces to cache for this torrent is proportional
// the number of peers it has, divided by the total number of peers.
// Each peer gets an equal share of the cache
avail_vec.resize((std::min)(num_cache_pieces, int(avail_vec.size())));
for (std::vector<int>::iterator i = avail_vec.begin()
, end(avail_vec.end()); i != end; ++i)
filesystem().async_cache(*i, bind(&torrent::on_disk_cache_complete
, shared_from_this(), _1, _2));
}
}
void torrent::get_suggested_pieces(std::vector<int>& s) const
{
if (m_ses.m_settings.suggest_mode == session_settings::no_piece_suggestions)
{
s.clear();
return;
}
std::vector<cached_piece_info> ret;
m_ses.m_disk_thread.get_cache_info(info_hash(), ret);
ptime now = time_now();
// remove write cache entries
ret.erase(std::remove_if(ret.begin(), ret.end()
, boost::bind(&cached_piece_info::kind, _1) == cached_piece_info::write_cache)
, ret.end());
// sort by how new the cached entry is, new pieces first
std::sort(ret.begin(), ret.end()
, boost::bind(&cached_piece_info::last_use, _1)
< boost::bind(&cached_piece_info::last_use, _2));
// cut off the oldest pieces that we don't want to suggest
// if we have an explicit cache, it's much more likely to
// stick around, so we should suggest all pieces
int num_pieces_to_suggest = int(ret.size());
if (!m_ses.m_settings.explicit_read_cache)
num_pieces_to_suggest = (std::max)(1, int(ret.size() / 2));
ret.resize(num_pieces_to_suggest);
std::transform(ret.begin(), ret.end(), std::back_inserter(s)
, boost::bind(&cached_piece_info::piece, _1));
}
void torrent::add_stats(stat const& s)
{
// these stats are propagated to the session