improved disk error handling and expanded use of error_code in error reporting
This commit is contained in:
parent
3fa0f7636b
commit
f0134c1b11
|
@ -1,3 +1,6 @@
|
|||
* improved disk error handling and expanded use of error_code in
|
||||
error reporting. added a bandwidth state, bw_disk, when waiting
|
||||
for the disk io thread to catch up writing buffers
|
||||
* improved read cache memory efficiency
|
||||
* added another cache flush algorithm to write the largest
|
||||
contiguous blocks instead of the least recently used
|
||||
|
|
|
@ -3014,7 +3014,7 @@ It contains the following fields::
|
|||
|
||||
int source;
|
||||
|
||||
enum bw_state { bw_idle, bw_torrent, bw_global, bw_network };
|
||||
enum bw_state { bw_idle, bw_limit, bw_network, bw_disk };
|
||||
|
||||
char read_state;
|
||||
char write_state;
|
||||
|
@ -3172,13 +3172,7 @@ defines as follows:
|
|||
| | send or receive data. |
|
||||
| | |
|
||||
+------------------------+--------------------------------------------------------+
|
||||
| ``bw_torrent`` | The peer is waiting for the torrent to receive |
|
||||
| | bandwidth quota in order to forward the bandwidth |
|
||||
| | request to the global manager. |
|
||||
| | |
|
||||
+------------------------+--------------------------------------------------------+
|
||||
| ``bw_global`` | The peer is waiting for the global bandwidth manager |
|
||||
| | to receive more quota in order to handle the request. |
|
||||
| ``bw_limit`` | The peer is waiting for the rate limiter. |
|
||||
| | |
|
||||
+------------------------+--------------------------------------------------------+
|
||||
| ``bw_network`` | The peer has quota and is currently waiting for a |
|
||||
|
@ -3187,6 +3181,10 @@ defines as follows:
|
|||
| | limits. |
|
||||
| | |
|
||||
+------------------------+--------------------------------------------------------+
|
||||
| ``bw_disk`` | The peer is waiting for the disk I/O thread to catch |
|
||||
| | up writing buffers to disk before downloading more. |
|
||||
| | |
|
||||
+------------------------+--------------------------------------------------------+
|
||||
|
||||
The ``ip`` field is the IP-address to this peer. The type is an asio endpoint. For
|
||||
more info, see the asio_ documentation.
|
||||
|
@ -3386,7 +3384,7 @@ session_settings
|
|||
int num_want;
|
||||
int initial_picker_threshold;
|
||||
int allowed_fast_set_size;
|
||||
int max_outstanding_disk_bytes_per_connection;
|
||||
int max_queued_disk_bytes;
|
||||
int handshake_timeout;
|
||||
bool use_dht_as_fallback;
|
||||
bool free_torrent_hashes;
|
||||
|
@ -3595,10 +3593,12 @@ in rarest first order.
|
|||
``allowed_fast_set_size`` is the number of pieces we allow peers to download
|
||||
from us without being unchoked.
|
||||
|
||||
``max_outstanding_disk_bytes_per_connection`` is the number of bytes each
|
||||
connection is allowed to have waiting in the disk I/O queue before it is
|
||||
throttled back. This limit is meant to stop fast internet connections to
|
||||
queue up bufferes indefinitely on slow hard-drives or storage.
|
||||
``max_queued_disk_bytes`` is the number maximum number of bytes, to be
|
||||
written to disk, that can wait in the disk I/O thread queue. This queue
|
||||
is only for waiting for the disk I/O thread to receive the job and either
|
||||
write it to disk or insert it in the write cache. When this limit is reached,
|
||||
the peer connections will stop reading data from their sockets, until the disk
|
||||
thread catches up. Setting this too low will severly limit your download rate.
|
||||
|
||||
``handshake_timeout`` specifies the number of seconds we allow a peer to
|
||||
delay responding to a protocol handshake. If no response is received within
|
||||
|
@ -4687,7 +4687,7 @@ generated and the torrent is paused.
|
|||
|
||||
``file`` is the path to the file that was accessed when the error occurred.
|
||||
|
||||
``msg`` is the error message received from the OS.
|
||||
``error`` is the error code describing the error.
|
||||
|
||||
::
|
||||
|
||||
|
@ -4695,7 +4695,7 @@ generated and the torrent is paused.
|
|||
{
|
||||
// ...
|
||||
std::string file;
|
||||
std::string msg;
|
||||
error_code error;
|
||||
};
|
||||
|
||||
file_renamed_alert
|
||||
|
|
|
@ -958,15 +958,23 @@ namespace libtorrent
|
|||
{
|
||||
file_error_alert(
|
||||
std::string const& f
|
||||
, const torrent_handle& h
|
||||
, const std::string& msg_)
|
||||
, torrent_handle const& h
|
||||
, error_code const& e)
|
||||
: torrent_alert(h)
|
||||
, file(f)
|
||||
, msg(msg_)
|
||||
{}
|
||||
, error(e)
|
||||
{
|
||||
#ifndef TORRENT_NO_DEPRECATE
|
||||
msg = error.message();
|
||||
#endif
|
||||
}
|
||||
|
||||
std::string file;
|
||||
error_code error;
|
||||
|
||||
#ifndef TORRENT_NO_DEPRECATE
|
||||
std::string msg;
|
||||
#endif
|
||||
|
||||
virtual std::auto_ptr<alert> clone() const
|
||||
{ return std::auto_ptr<alert>(new file_error_alert(*this)); }
|
||||
|
@ -978,7 +986,7 @@ namespace libtorrent
|
|||
virtual std::string message() const
|
||||
{
|
||||
return torrent_alert::message() + " file (" + file + ") error: "
|
||||
+ msg;
|
||||
+ error.message();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -590,6 +590,7 @@ namespace libtorrent
|
|||
// NAT or not.
|
||||
bool m_incoming_connection;
|
||||
|
||||
void on_disk_queue();
|
||||
void on_tick(error_code const& e);
|
||||
|
||||
int auto_manage_torrents(std::vector<torrent*>& list
|
||||
|
|
|
@ -73,8 +73,6 @@ namespace libtorrent
|
|||
, piece(0)
|
||||
, offset(0)
|
||||
, priority(0)
|
||||
, error_piece(-1)
|
||||
, error_op(-1)
|
||||
{}
|
||||
|
||||
enum action_t
|
||||
|
@ -122,12 +120,6 @@ namespace libtorrent
|
|||
// the error code from the file operation
|
||||
error_code error;
|
||||
|
||||
// the piece the error occurred on
|
||||
int error_piece;
|
||||
|
||||
// the operation that failed (only read or write)
|
||||
int error_op;
|
||||
|
||||
// this is called when operation completes
|
||||
boost::function<void(int, disk_io_job const&)> callback;
|
||||
};
|
||||
|
@ -244,7 +236,9 @@ namespace libtorrent
|
|||
// of disk io jobs
|
||||
struct disk_io_thread : disk_buffer_pool
|
||||
{
|
||||
disk_io_thread(io_service& ios, int block_size = 16 * 1024);
|
||||
disk_io_thread(io_service& ios
|
||||
, boost::function<void()> const& queue_callback
|
||||
, int block_size = 16 * 1024);
|
||||
~disk_io_thread();
|
||||
|
||||
void join();
|
||||
|
@ -255,6 +249,8 @@ namespace libtorrent
|
|||
, boost::function<void(int, disk_io_job const&)> const& f
|
||||
= boost::function<void(int, disk_io_job const&)>());
|
||||
|
||||
int queued_write_bytes() const;
|
||||
|
||||
// keep track of the number of bytes in the job queue
|
||||
// at any given time. i.e. the sum of all buffer_size.
|
||||
// this is used to slow down the download global download
|
||||
|
@ -277,6 +273,17 @@ namespace libtorrent
|
|||
std::ofstream m_disk_access_log;
|
||||
#endif
|
||||
|
||||
struct cached_block_entry
|
||||
{
|
||||
cached_block_entry(): buf(0) {}
|
||||
// the buffer pointer (this is a disk_pool buffer)
|
||||
// or 0
|
||||
char* buf;
|
||||
|
||||
// callback for when this block is flushed to disk
|
||||
boost::function<void(int, disk_io_job const&)> callback;
|
||||
};
|
||||
|
||||
struct cached_piece_entry
|
||||
{
|
||||
int piece;
|
||||
|
@ -287,7 +294,7 @@ namespace libtorrent
|
|||
// the number of blocks in the cache for this piece
|
||||
int num_blocks;
|
||||
// the pointers to the block data
|
||||
boost::shared_array<char*> blocks;
|
||||
boost::shared_array<cached_block_entry> blocks;
|
||||
};
|
||||
|
||||
typedef boost::recursive_mutex mutex_t;
|
||||
|
@ -308,13 +315,17 @@ namespace libtorrent
|
|||
|
||||
// write cache operations
|
||||
enum options_t { dont_flush_write_blocks = 1, ignore_cache_size = 2 };
|
||||
int flush_cache_blocks(mutex_t::scoped_lock& l, int blocks, cache_t::iterator ignore, int options = 0);
|
||||
int flush_cache_blocks(mutex_t::scoped_lock& l
|
||||
, int blocks, cache_t::iterator ignore
|
||||
, int options = 0);
|
||||
void flush_expired_pieces();
|
||||
int flush_and_remove(cache_t::iterator i, mutex_t::scoped_lock& l);
|
||||
int flush_contiguous_blocks(disk_io_thread::cache_t::iterator e
|
||||
, mutex_t::scoped_lock& l, int lower_limit = 0);
|
||||
int flush_range(cache_t::iterator i, int start, int end, mutex_t::scoped_lock& l);
|
||||
int cache_block(disk_io_job& j, mutex_t::scoped_lock& l);
|
||||
int cache_block(disk_io_job& j
|
||||
, boost::function<void(int,disk_io_job const&)>& handler
|
||||
, mutex_t::scoped_lock& l);
|
||||
|
||||
// read cache operations
|
||||
int clear_oldest_read_piece(int num_blocks, cache_t::iterator ignore
|
||||
|
@ -351,12 +362,16 @@ namespace libtorrent
|
|||
// exceed m_cache_size
|
||||
cache_status m_cache_stats;
|
||||
|
||||
int m_queued_write_bytes;
|
||||
|
||||
#ifdef TORRENT_DISK_STATS
|
||||
std::ofstream m_log;
|
||||
#endif
|
||||
|
||||
io_service& m_ios;
|
||||
|
||||
boost::function<void()> m_queue_callback;
|
||||
|
||||
// this keeps the io_service::run() call blocked from
|
||||
// returning. When shutting down, it's possible that
|
||||
// the event queue is drained before the disk_io_thread
|
||||
|
|
|
@ -519,6 +519,8 @@ namespace libtorrent
|
|||
size_type downloaded_since_unchoke() const
|
||||
{ return m_statistics.total_payload_download() - m_downloaded_at_last_unchoke; }
|
||||
|
||||
void setup_receive();
|
||||
|
||||
protected:
|
||||
|
||||
virtual void get_specific_peer_info(peer_info& p) const = 0;
|
||||
|
@ -571,8 +573,6 @@ namespace libtorrent
|
|||
void reset_recv_buffer(int packet_size);
|
||||
void set_soft_packet_size(int size) { m_soft_packet_size = size; }
|
||||
|
||||
void setup_receive();
|
||||
|
||||
void attach_to_torrent(sha1_hash const& ih);
|
||||
|
||||
bool verify_piece(peer_request const& p) const;
|
||||
|
|
|
@ -80,11 +80,12 @@ namespace libtorrent
|
|||
int source;
|
||||
|
||||
// bw_idle: the channel is not used
|
||||
// bw_torrent: the channel is waiting for torrent quota
|
||||
// bw_global: the channel is waiting for global quota
|
||||
// bw_limit: the channel is waiting for quota
|
||||
// bw_network: the channel is waiting for an async write
|
||||
// for read operation to complete
|
||||
enum bw_state { bw_idle, bw_limit, bw_network };
|
||||
// bw_disk: the peer is waiting for the disk io thread
|
||||
// to catch up
|
||||
enum bw_state { bw_idle, bw_limit, bw_network, bw_disk };
|
||||
#ifndef TORRENT_NO_DEPRECATE
|
||||
enum bw_state_deprecated { bw_torrent = bw_limit, bw_global = bw_limit };
|
||||
#endif
|
||||
|
|
|
@ -275,7 +275,7 @@ namespace libtorrent
|
|||
, piece_state_t s);
|
||||
void mark_as_writing(piece_block block, void* peer);
|
||||
void mark_as_finished(piece_block block, void* peer);
|
||||
void write_failed(int piece);
|
||||
void write_failed(piece_block block);
|
||||
int num_peers(piece_block block) const;
|
||||
|
||||
// returns information about the given piece
|
||||
|
|
|
@ -112,7 +112,7 @@ namespace libtorrent
|
|||
, num_want(200)
|
||||
, initial_picker_threshold(4)
|
||||
, allowed_fast_set_size(10)
|
||||
, max_outstanding_disk_bytes_per_connection(64 * 1024)
|
||||
, max_queued_disk_bytes(256 * 1024)
|
||||
, handshake_timeout(10)
|
||||
#ifndef TORRENT_DISABLE_DHT
|
||||
, use_dht_as_fallback(false)
|
||||
|
@ -334,8 +334,11 @@ namespace libtorrent
|
|||
// rate is being throttled. This prevents fast downloads
|
||||
// to slow medias to allocate more and more memory
|
||||
// indefinitely. This should be set to at least 16 kB
|
||||
// to not completely disrupt normal downloads.
|
||||
int max_outstanding_disk_bytes_per_connection;
|
||||
// to not completely disrupt normal downloads. If it's
|
||||
// set to 0, you will be starving the disk thread and
|
||||
// nothing will be written to disk.
|
||||
// this is a per session setting.
|
||||
int max_queued_disk_bytes;
|
||||
|
||||
// the number of seconds to wait for a handshake
|
||||
// response from a peer. If no response is received
|
||||
|
|
|
@ -224,6 +224,8 @@ namespace libtorrent
|
|||
|
||||
void async_check_files(boost::function<void(int, disk_io_job const&)> const& handler);
|
||||
|
||||
int queued_bytes() const;
|
||||
|
||||
void async_rename_file(int index, std::string const& name
|
||||
, boost::function<void(int, disk_io_job const&)> const& handler);
|
||||
|
||||
|
|
|
@ -268,13 +268,17 @@ namespace libtorrent
|
|||
// ------- disk_io_thread ------
|
||||
|
||||
|
||||
disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
|
||||
disk_io_thread::disk_io_thread(asio::io_service& ios
|
||||
, boost::function<void()> const& queue_callback
|
||||
, int block_size)
|
||||
: disk_buffer_pool(block_size)
|
||||
, m_abort(false)
|
||||
, m_waiting_to_shutdown(false)
|
||||
, m_queue_buffer_size(0)
|
||||
, m_last_file_check(time_now_hires())
|
||||
, m_queued_write_bytes(0)
|
||||
, m_ios(ios)
|
||||
, m_queue_callback(queue_callback)
|
||||
, m_work(io_service::work(m_ios))
|
||||
, m_disk_io_thread(boost::ref(*this))
|
||||
{
|
||||
|
@ -322,7 +326,7 @@ namespace libtorrent
|
|||
int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
|
||||
info.blocks.resize(blocks_in_piece);
|
||||
for (int b = 0; b < blocks_in_piece; ++b)
|
||||
if (i->blocks[b]) info.blocks[b] = true;
|
||||
if (i->blocks[b].buf) info.blocks[b] = true;
|
||||
ret.push_back(info);
|
||||
}
|
||||
for (cache_t::const_iterator i = m_read_pieces.begin()
|
||||
|
@ -337,7 +341,7 @@ namespace libtorrent
|
|||
int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
|
||||
info.blocks.resize(blocks_in_piece);
|
||||
for (int b = 0; b < blocks_in_piece; ++b)
|
||||
if (i->blocks[b]) info.blocks[b] = true;
|
||||
if (i->blocks[b].buf) info.blocks[b] = true;
|
||||
ret.push_back(info);
|
||||
}
|
||||
}
|
||||
|
@ -466,10 +470,10 @@ namespace libtorrent
|
|||
|
||||
for (int i = 0; i < blocks_in_piece; ++i)
|
||||
{
|
||||
if (p.blocks[i] == 0) continue;
|
||||
free_buffer(p.blocks[i]);
|
||||
if (p.blocks[i].buf == 0) continue;
|
||||
free_buffer(p.blocks[i].buf);
|
||||
++ret;
|
||||
p.blocks[i] = 0;
|
||||
p.blocks[i].buf = 0;
|
||||
--p.num_blocks;
|
||||
--m_cache_stats.cache_size;
|
||||
--m_cache_stats.read_cache_size;
|
||||
|
@ -507,10 +511,10 @@ namespace libtorrent
|
|||
|
||||
while (num_blocks)
|
||||
{
|
||||
while (i->blocks[start] == 0 && start <= end) ++start;
|
||||
while (i->blocks[start].buf == 0 && start <= end) ++start;
|
||||
if (start > end) break;
|
||||
free_buffer(i->blocks[start]);
|
||||
i->blocks[start] = 0;
|
||||
free_buffer(i->blocks[start].buf);
|
||||
i->blocks[start].buf = 0;
|
||||
++blocks;
|
||||
--i->num_blocks;
|
||||
--m_cache_stats.cache_size;
|
||||
|
@ -518,10 +522,10 @@ namespace libtorrent
|
|||
--num_blocks;
|
||||
if (!num_blocks) break;
|
||||
|
||||
while (i->blocks[end] == 0 && start <= end) --end;
|
||||
while (i->blocks[end].buf == 0 && start <= end) --end;
|
||||
if (start > end) break;
|
||||
free_buffer(i->blocks[end]);
|
||||
i->blocks[end] = 0;
|
||||
free_buffer(i->blocks[end].buf);
|
||||
i->blocks[end].buf = 0;
|
||||
++blocks;
|
||||
--i->num_blocks;
|
||||
--m_cache_stats.cache_size;
|
||||
|
@ -543,7 +547,7 @@ namespace libtorrent
|
|||
int blocks_in_piece = (b.storage->info()->piece_size(b.piece) + 16 * 1024 - 1) / (16 * 1024);
|
||||
for (int i = 0; i < blocks_in_piece; ++i)
|
||||
{
|
||||
if (b.blocks[i]) ++current;
|
||||
if (b.blocks[i].buf) ++current;
|
||||
else
|
||||
{
|
||||
if (current > ret) ret = current;
|
||||
|
@ -566,7 +570,7 @@ namespace libtorrent
|
|||
+ m_block_size - 1) / m_block_size;
|
||||
for (int i = 0; i < blocks_in_piece; ++i)
|
||||
{
|
||||
if (e->blocks[i]) ++current;
|
||||
if (e->blocks[i].buf) ++current;
|
||||
else
|
||||
{
|
||||
if (current > len)
|
||||
|
@ -673,7 +677,7 @@ namespace libtorrent
|
|||
end = (std::min)(end, blocks_in_piece);
|
||||
for (int i = start; i <= end; ++i)
|
||||
{
|
||||
if (i == end || p.blocks[i] == 0)
|
||||
if (i == end || p.blocks[i].buf == 0)
|
||||
{
|
||||
if (buffer_size == 0) continue;
|
||||
|
||||
|
@ -704,13 +708,13 @@ namespace libtorrent
|
|||
TORRENT_ASSERT(offset + block_size > 0);
|
||||
if (!buf)
|
||||
{
|
||||
iov[iov_counter].iov_base = p.blocks[i];
|
||||
iov[iov_counter].iov_base = p.blocks[i].buf;
|
||||
iov[iov_counter].iov_len = block_size;
|
||||
++iov_counter;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::memcpy(buf.get() + offset, p.blocks[i], block_size);
|
||||
std::memcpy(buf.get() + offset, p.blocks[i].buf, block_size);
|
||||
offset += m_block_size;
|
||||
}
|
||||
buffer_size += block_size;
|
||||
|
@ -721,11 +725,22 @@ namespace libtorrent
|
|||
}
|
||||
|
||||
int ret = 0;
|
||||
disk_io_job j;
|
||||
j.storage = p.storage;
|
||||
j.action = disk_io_job::write;
|
||||
j.buffer = 0;
|
||||
j.piece = p.piece;
|
||||
test_error(j);
|
||||
for (int i = start; i < end; ++i)
|
||||
{
|
||||
if (p.blocks[i] == 0) continue;
|
||||
free_buffer(p.blocks[i]);
|
||||
p.blocks[i] = 0;
|
||||
if (p.blocks[i].buf == 0) continue;
|
||||
j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size);
|
||||
int result = j.error ? -1 : j.buffer_size;
|
||||
j.offset = i * m_block_size;
|
||||
free_buffer(p.blocks[i].buf);
|
||||
post_callback(p.blocks[i].callback, j, result);
|
||||
p.blocks[i].callback.clear();
|
||||
p.blocks[i].buf = 0;
|
||||
++ret;
|
||||
}
|
||||
|
||||
|
@ -733,36 +748,41 @@ namespace libtorrent
|
|||
// std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
|
||||
#ifdef TORRENT_DEBUG
|
||||
for (int i = start; i < end; ++i)
|
||||
TORRENT_ASSERT(p.blocks[i] == 0);
|
||||
TORRENT_ASSERT(p.blocks[i].buf == 0);
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
// returns -1 on failure
|
||||
int disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
|
||||
int disk_io_thread::cache_block(disk_io_job& j
|
||||
, boost::function<void(int,disk_io_job const&)>& handler
|
||||
, mutex_t::scoped_lock& l)
|
||||
{
|
||||
INVARIANT_CHECK;
|
||||
TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
|
||||
TORRENT_ASSERT((j.offset & (m_block_size-1)) == 0);
|
||||
cached_piece_entry p;
|
||||
|
||||
int piece_size = j.storage->info()->piece_size(j.piece);
|
||||
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
|
||||
// there's no point in caching the piece if
|
||||
// there's only one block in it
|
||||
if (blocks_in_piece <= 1) return -1;
|
||||
|
||||
#ifdef TORRENT_DISK_STATS
|
||||
rename_buffer(j.buffer, "write cache");
|
||||
#endif
|
||||
|
||||
int piece_size = j.storage->info()->piece_size(j.piece);
|
||||
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
|
||||
|
||||
p.piece = j.piece;
|
||||
p.storage = j.storage;
|
||||
p.last_use = time_now();
|
||||
p.num_blocks = 1;
|
||||
p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]);
|
||||
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
|
||||
if (!p.blocks) return -1;
|
||||
std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
|
||||
int block = j.offset / m_block_size;
|
||||
// std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
|
||||
p.blocks[block] = j.buffer;
|
||||
p.blocks[block].buf = j.buffer;
|
||||
p.blocks[block].callback.swap(handler);
|
||||
++m_cache_stats.cache_size;
|
||||
m_pieces.push_back(p);
|
||||
return 0;
|
||||
|
@ -785,11 +805,11 @@ namespace libtorrent
|
|||
// this is a block that is already allocated
|
||||
// stop allocating and don't read more than
|
||||
// what we've allocated now
|
||||
if (p.blocks[i]) break;
|
||||
p.blocks[i] = allocate_buffer("read cache");
|
||||
if (p.blocks[i].buf) break;
|
||||
p.blocks[i].buf = allocate_buffer("read cache");
|
||||
|
||||
// the allocation failed, break
|
||||
if (p.blocks[i] == 0) break;
|
||||
if (p.blocks[i].buf == 0) break;
|
||||
++p.num_blocks;
|
||||
++m_cache_stats.cache_size;
|
||||
++m_cache_stats.read_cache_size;
|
||||
|
@ -835,17 +855,17 @@ namespace libtorrent
|
|||
for (int i = start_block; i < end_block; ++i)
|
||||
{
|
||||
int block_size = (std::min)(piece_size - piece_offset, m_block_size);
|
||||
if (p.blocks[i] == 0) break;
|
||||
if (p.blocks[i].buf == 0) break;
|
||||
TORRENT_ASSERT(offset <= buffer_size);
|
||||
TORRENT_ASSERT(piece_offset <= piece_size);
|
||||
TORRENT_ASSERT(offset + block_size <= buffer_size);
|
||||
if (buf)
|
||||
{
|
||||
std::memcpy(p.blocks[i], buf.get() + offset, block_size);
|
||||
std::memcpy(p.blocks[i].buf, buf.get() + offset, block_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
iov[iov_counter].iov_base = p.blocks[i];
|
||||
iov[iov_counter].iov_base = p.blocks[i].buf;
|
||||
iov[iov_counter].iov_len = block_size;
|
||||
++iov_counter;
|
||||
}
|
||||
|
@ -894,9 +914,8 @@ namespace libtorrent
|
|||
p.storage = j.storage;
|
||||
p.last_use = time_now();
|
||||
p.num_blocks = 0;
|
||||
p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]);
|
||||
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
|
||||
if (!p.blocks) return -1;
|
||||
std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
|
||||
int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l);
|
||||
|
||||
if (ret < 0)
|
||||
|
@ -933,9 +952,8 @@ namespace libtorrent
|
|||
p.storage = j.storage;
|
||||
p.last_use = time_now();
|
||||
p.num_blocks = 0;
|
||||
p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]);
|
||||
p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
|
||||
if (!p.blocks) return -1;
|
||||
std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
|
||||
int ret = read_into_piece(p, start_block, 0, blocks_to_read, l);
|
||||
|
||||
if (ret < 0)
|
||||
|
@ -962,10 +980,10 @@ namespace libtorrent
|
|||
int blocks = 0;
|
||||
for (int k = 0; k < blocks_in_piece; ++k)
|
||||
{
|
||||
if (p.blocks[k])
|
||||
if (p.blocks[k].buf)
|
||||
{
|
||||
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
|
||||
TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
|
||||
TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf));
|
||||
#endif
|
||||
++blocks;
|
||||
}
|
||||
|
@ -986,10 +1004,10 @@ namespace libtorrent
|
|||
int blocks = 0;
|
||||
for (int k = 0; k < blocks_in_piece; ++k)
|
||||
{
|
||||
if (p.blocks[k])
|
||||
if (p.blocks[k].buf)
|
||||
{
|
||||
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
|
||||
TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
|
||||
TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf));
|
||||
#endif
|
||||
++blocks;
|
||||
}
|
||||
|
@ -1048,8 +1066,8 @@ namespace libtorrent
|
|||
|
||||
for (int i = 0; i < blocks_in_piece; ++i)
|
||||
{
|
||||
TORRENT_ASSERT(p->blocks[i]);
|
||||
ctx.update((char const*)p->blocks[i], (std::min)(piece_size, m_block_size));
|
||||
TORRENT_ASSERT(p->blocks[i].buf);
|
||||
ctx.update((char const*)p->blocks[i].buf, (std::min)(piece_size, m_block_size));
|
||||
piece_size -= m_block_size;
|
||||
}
|
||||
h = ctx.final();
|
||||
|
@ -1092,17 +1110,17 @@ namespace libtorrent
|
|||
int min_blocks_to_read = block_offset > 0 ? 2 : 1;
|
||||
TORRENT_ASSERT(size <= m_block_size);
|
||||
int start_block = block;
|
||||
if (p->blocks[start_block] != 0 && min_blocks_to_read > 1)
|
||||
if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1)
|
||||
++start_block;
|
||||
// if block_offset > 0, we need to read two blocks, and then
|
||||
// copy parts of both, because it's not aligned to the block
|
||||
// boundaries
|
||||
if (p->blocks[start_block] == 0)
|
||||
if (p->blocks[start_block].buf == 0)
|
||||
{
|
||||
int piece_size = j.storage->info()->piece_size(j.piece);
|
||||
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
|
||||
int end_block = start_block;
|
||||
while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
|
||||
while (end_block < blocks_in_piece && p->blocks[end_block].buf == 0) ++end_block;
|
||||
|
||||
int blocks_to_read = end_block - block;
|
||||
blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size
|
||||
|
@ -1118,17 +1136,17 @@ namespace libtorrent
|
|||
hit = false;
|
||||
if (ret < 0) return ret;
|
||||
if (ret < size + block_offset) return -2;
|
||||
TORRENT_ASSERT(p->blocks[block]);
|
||||
TORRENT_ASSERT(p->blocks[block].buf);
|
||||
}
|
||||
|
||||
p->last_use = time_now();
|
||||
while (size > 0)
|
||||
{
|
||||
TORRENT_ASSERT(p->blocks[block]);
|
||||
TORRENT_ASSERT(p->blocks[block].buf);
|
||||
int to_copy = (std::min)(m_block_size
|
||||
- block_offset, size);
|
||||
std::memcpy(j.buffer + buffer_offset
|
||||
, p->blocks[block] + block_offset
|
||||
, p->blocks[block].buf + block_offset
|
||||
, to_copy);
|
||||
size -= to_copy;
|
||||
block_offset = 0;
|
||||
|
@ -1177,6 +1195,12 @@ namespace libtorrent
|
|||
return ret;
|
||||
}
|
||||
|
||||
int disk_io_thread::queued_write_bytes() const
|
||||
{
|
||||
mutex_t::scoped_lock l(m_queue_mutex);
|
||||
return m_queued_write_bytes;
|
||||
}
|
||||
|
||||
void disk_io_thread::add_job(disk_io_job const& j
|
||||
, boost::function<void(int, disk_io_job const&)> const& f)
|
||||
{
|
||||
|
@ -1212,6 +1236,7 @@ namespace libtorrent
|
|||
}
|
||||
else if (j.action == disk_io_job::write)
|
||||
{
|
||||
m_queued_write_bytes += j.buffer_size;
|
||||
for (; i != m_jobs.rend(); ++i)
|
||||
{
|
||||
if (*i < j)
|
||||
|
@ -1245,17 +1270,14 @@ namespace libtorrent
|
|||
if (ec)
|
||||
{
|
||||
j.buffer = 0;
|
||||
j.str = ec.message();
|
||||
j.str.clear();
|
||||
j.error = ec;
|
||||
j.error_file = j.storage->error_file();
|
||||
j.error_piece = j.storage->last_piece();
|
||||
j.error_op = j.storage->last_operation();
|
||||
j.storage->clear_error();
|
||||
#ifdef TORRENT_DEBUG
|
||||
std::cout << "ERROR: '" << j.str << "' while "
|
||||
<< (j.error_op == disk_io_job::read?"reading ":"writing ")
|
||||
std::cout << "ERROR: '" << ec.message() << " in "
|
||||
<< j.error_file << std::endl;
|
||||
#endif
|
||||
j.storage->clear_error();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -1324,6 +1346,16 @@ namespace libtorrent
|
|||
m_queue_buffer_size -= j.buffer_size;
|
||||
jl.unlock();
|
||||
|
||||
if (m_queue_buffer_size + j.buffer_size >= m_settings.max_queued_disk_bytes
|
||||
&& m_queue_buffer_size < m_settings.max_queued_disk_bytes
|
||||
&& m_queue_callback)
|
||||
{
|
||||
// we just dropped below the high watermark of number of bytes
|
||||
// queued for writing to the disk. Notify the session so that it
|
||||
// can trigger all the connections waiting for this event
|
||||
m_ios.post(m_queue_callback);
|
||||
}
|
||||
|
||||
flush_expired_pieces();
|
||||
|
||||
int ret = 0;
|
||||
|
@ -1425,9 +1457,7 @@ namespace libtorrent
|
|||
#else
|
||||
j.error = asio::error::no_memory;
|
||||
#endif
|
||||
j.error_piece = j.piece;
|
||||
j.error_op = disk_io_job::read;
|
||||
j.str = j.error.message();
|
||||
j.str.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1452,9 +1482,7 @@ namespace libtorrent
|
|||
{
|
||||
j.storage->mark_failed(j.piece);
|
||||
j.error = error_code(errors::failed_hash_check, libtorrent_category);
|
||||
j.str = j.error.message();
|
||||
j.error_piece = j.storage->last_piece();
|
||||
j.error_op = disk_io_job::read;
|
||||
j.str.clear();
|
||||
j.buffer = 0;
|
||||
break;
|
||||
}
|
||||
|
@ -1489,9 +1517,7 @@ namespace libtorrent
|
|||
#else
|
||||
j.error = asio::error::no_memory;
|
||||
#endif
|
||||
j.error_piece = j.piece;
|
||||
j.error_op = disk_io_job::read;
|
||||
j.str = j.error.message();
|
||||
j.str.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1522,9 +1548,7 @@ namespace libtorrent
|
|||
j.buffer = 0;
|
||||
j.error = error_code(errors::file_too_short, libtorrent_category);
|
||||
j.error_file.clear();
|
||||
j.str = j.error.message();
|
||||
j.error_piece = j.storage->last_piece();
|
||||
j.error_op = disk_io_job::read;
|
||||
j.str.clear();
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
|
@ -1539,11 +1563,7 @@ namespace libtorrent
|
|||
}
|
||||
case disk_io_job::write:
|
||||
{
|
||||
if (test_error(j))
|
||||
{
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
m_queued_write_bytes -= j.buffer_size;
|
||||
#ifdef TORRENT_DISK_STATS
|
||||
m_log << log_time() << " write " << j.buffer_size << std::endl;
|
||||
#endif
|
||||
|
@ -1560,13 +1580,14 @@ namespace libtorrent
|
|||
TORRENT_ASSERT(j.buffer_size <= m_block_size);
|
||||
if (p != m_pieces.end())
|
||||
{
|
||||
TORRENT_ASSERT(p->blocks[block] == 0);
|
||||
if (p->blocks[block])
|
||||
TORRENT_ASSERT(p->blocks[block].buf == 0);
|
||||
if (p->blocks[block].buf)
|
||||
{
|
||||
free_buffer(p->blocks[block]);
|
||||
free_buffer(p->blocks[block].buf);
|
||||
--p->num_blocks;
|
||||
}
|
||||
p->blocks[block] = j.buffer;
|
||||
p->blocks[block].buf = j.buffer;
|
||||
p->blocks[block].callback.swap(handler);
|
||||
#ifdef TORRENT_DISK_STATS
|
||||
rename_buffer(j.buffer, "write cache");
|
||||
#endif
|
||||
|
@ -1579,7 +1600,7 @@ namespace libtorrent
|
|||
}
|
||||
else
|
||||
{
|
||||
if (cache_block(j, l) < 0)
|
||||
if (cache_block(j, handler, l) < 0)
|
||||
{
|
||||
file::iovec_t iov = {j.buffer, j.buffer_size};
|
||||
ret = j.storage->write_impl(&iov, j.piece, j.offset, 1);
|
||||
|
@ -1723,9 +1744,9 @@ namespace libtorrent
|
|||
int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
|
||||
for (int j = 0; j < blocks_in_piece; ++j)
|
||||
{
|
||||
if (k->blocks[j] == 0) continue;
|
||||
free_buffer(k->blocks[j]);
|
||||
k->blocks[j] = 0;
|
||||
if (k->blocks[j].buf == 0) continue;
|
||||
free_buffer(k->blocks[j].buf);
|
||||
k->blocks[j].buf = 0;
|
||||
--m_cache_stats.cache_size;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1871,7 +1871,8 @@ namespace libtorrent
|
|||
"s: " << p.start << " | "
|
||||
"l: " << p.length << " | "
|
||||
"ds: " << statistics().download_rate() << " | "
|
||||
"qs: " << int(m_desired_queue_size) << " ]\n";
|
||||
"qs: " << int(m_desired_queue_size) << " | "
|
||||
"q: " << int(m_download_queue.size()) << " ]\n";
|
||||
#endif
|
||||
|
||||
if (p.length == 0)
|
||||
|
@ -2019,7 +2020,8 @@ namespace libtorrent
|
|||
TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
|
||||
m_download_queue.erase(b);
|
||||
|
||||
if (m_outstanding_writing_bytes > m_ses.settings().max_outstanding_disk_bytes_per_connection
|
||||
if (t->filesystem().queued_bytes() >= m_ses.settings().max_queued_disk_bytes
|
||||
&& m_ses.settings().max_queued_disk_bytes
|
||||
&& t->alerts().should_post<performance_alert>())
|
||||
{
|
||||
t->alerts().post_alert(performance_alert(t->get_handle()
|
||||
|
@ -2052,6 +2054,19 @@ namespace libtorrent
|
|||
&& defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
|
||||
t->check_invariant();
|
||||
#endif
|
||||
|
||||
// did we just finish the piece?
|
||||
// this means all blocks are either written
|
||||
// to disk or are in the disk write cache
|
||||
if (picker.is_piece_finished(p.piece))
|
||||
{
|
||||
#ifdef TORRENT_DEBUG
|
||||
check_postcondition post_checker2_(t, false);
|
||||
#endif
|
||||
t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
|
||||
, p.piece, _1));
|
||||
}
|
||||
|
||||
request_a_block(*t, *this);
|
||||
send_block_requests();
|
||||
}
|
||||
|
@ -2104,24 +2119,6 @@ namespace libtorrent
|
|||
}
|
||||
|
||||
if (t->is_aborted()) return;
|
||||
|
||||
// did we just finish the piece?
|
||||
if (picker.is_piece_finished(p.piece))
|
||||
{
|
||||
#ifdef TORRENT_DEBUG
|
||||
check_postcondition post_checker2_(t, false);
|
||||
#endif
|
||||
t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
|
||||
, p.piece, _1));
|
||||
}
|
||||
|
||||
if (!t->is_seed() && !m_torrent.expired())
|
||||
{
|
||||
// this is a free function defined in policy.cpp
|
||||
request_a_block(*t, *this);
|
||||
send_block_requests();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
|
@ -3696,7 +3693,8 @@ namespace libtorrent
|
|||
{
|
||||
INVARIANT_CHECK;
|
||||
|
||||
if (m_channel_state[download_channel] != peer_info::bw_idle) return;
|
||||
if (m_channel_state[download_channel] != peer_info::bw_idle
|
||||
&& m_channel_state[download_channel] != peer_info::bw_disk) return;
|
||||
|
||||
shared_ptr<torrent> t = m_torrent.lock();
|
||||
|
||||
|
@ -3732,10 +3730,19 @@ namespace libtorrent
|
|||
(*m_logger) << time_now_string() << " *** CANNOT READ ["
|
||||
" quota: " << m_quota[download_channel] <<
|
||||
" ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
|
||||
" outstanding: " << m_outstanding_writing_bytes <<
|
||||
" outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
|
||||
" queue-size: " << ((t && t->get_storage())?t->filesystem().queued_bytes():0) <<
|
||||
" queue-limit: " << m_ses.settings().max_queued_disk_bytes <<
|
||||
" disconnecting: " << (m_disconnecting?"yes":"no") <<
|
||||
" ]\n";
|
||||
#endif
|
||||
if (m_ses.settings().max_queued_disk_bytes > 0
|
||||
&& t && t->get_storage()
|
||||
&& t->filesystem().queued_bytes() >= m_ses.settings().max_queued_disk_bytes)
|
||||
m_channel_state[download_channel] = peer_info::bw_disk;
|
||||
|
||||
// if we block reading, waiting for the disk, we will wake up
|
||||
// by the disk_io_thread posting a message every time it drops
|
||||
// from being at or exceeding the limit down to below the limit
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -4091,11 +4098,16 @@ namespace libtorrent
|
|||
|
||||
bool peer_connection::can_read() const
|
||||
{
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
|
||||
bool ret = (m_quota[download_channel] > 0
|
||||
|| m_ignore_bandwidth_limits)
|
||||
&& !m_connecting
|
||||
&& m_outstanding_writing_bytes <=
|
||||
m_ses.settings().max_outstanding_disk_bytes_per_connection;
|
||||
&& (m_ses.settings().max_queued_disk_bytes == 0
|
||||
|| !t
|
||||
|| t->get_storage() == 0
|
||||
|| t->filesystem().queued_bytes() < m_ses.settings().max_queued_disk_bytes)
|
||||
&& !m_disconnecting;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -196,6 +196,7 @@ namespace libtorrent
|
|||
std::copy(other->info, other->info + m_blocks_per_piece, i->info);
|
||||
other->info = i->info;
|
||||
}
|
||||
m_piece_map[i->index].downloading = false;
|
||||
m_downloads.erase(i);
|
||||
}
|
||||
|
||||
|
@ -792,18 +793,17 @@ namespace libtorrent
|
|||
|
||||
TORRENT_ASSERT(i != m_downloads.end());
|
||||
#ifdef TORRENT_DEBUG
|
||||
int num_blocks = blocks_in_piece(i->index);
|
||||
for (int k = 0; k < num_blocks; ++k)
|
||||
{
|
||||
TORRENT_ASSERT(i->info[k].state == block_info::state_finished);
|
||||
TORRENT_ASSERT(i->info[k].num_peers == 0);
|
||||
}
|
||||
int num_blocks = blocks_in_piece(i->index);
|
||||
for (int k = 0; k < num_blocks; ++k)
|
||||
{
|
||||
TORRENT_ASSERT(i->info[k].state == block_info::state_finished);
|
||||
TORRENT_ASSERT(i->info[k].num_peers == 0);
|
||||
}
|
||||
#endif
|
||||
erase_download_piece(i);
|
||||
|
||||
piece_pos& p = m_piece_map[index];
|
||||
int prev_priority = p.priority(this);
|
||||
p.downloading = 0;
|
||||
erase_download_piece(i);
|
||||
int new_priority = p.priority(this);
|
||||
|
||||
if (new_priority == prev_priority) return;
|
||||
|
@ -1074,7 +1074,6 @@ namespace libtorrent
|
|||
, has_index(index));
|
||||
TORRENT_ASSERT(i != m_downloads.end());
|
||||
erase_download_piece(i);
|
||||
p.downloading = 0;
|
||||
}
|
||||
|
||||
TORRENT_ASSERT(std::find_if(m_downloads.begin(), m_downloads.end()
|
||||
|
@ -1907,16 +1906,17 @@ namespace libtorrent
|
|||
TORRENT_ASSERT(i != m_downloads.end());
|
||||
TORRENT_ASSERT((int)i->finished <= m_blocks_per_piece);
|
||||
int max_blocks = blocks_in_piece(index);
|
||||
if ((int)i->finished < max_blocks) return false;
|
||||
if (int(i->finished) + int(i->writing) < max_blocks) return false;
|
||||
TORRENT_ASSERT(int(i->finished) + int(i->writing) == max_blocks);
|
||||
|
||||
#ifdef TORRENT_DEBUG
|
||||
for (int k = 0; k < max_blocks; ++k)
|
||||
{
|
||||
TORRENT_ASSERT(i->info[k].state == block_info::state_finished);
|
||||
TORRENT_ASSERT(i->info[k].state == block_info::state_finished
|
||||
|| i->info[k].state == block_info::state_writing);
|
||||
}
|
||||
#endif
|
||||
|
||||
TORRENT_ASSERT((int)i->finished == max_blocks);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2117,15 +2117,46 @@ namespace libtorrent
|
|||
}
|
||||
}
|
||||
|
||||
void piece_picker::write_failed(int piece)
|
||||
void piece_picker::write_failed(piece_block block)
|
||||
{
|
||||
TORRENT_PIECE_PICKER_INVARIANT_CHECK;
|
||||
|
||||
std::vector<downloading_piece>::iterator i
|
||||
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(piece));
|
||||
= std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index));
|
||||
TORRENT_ASSERT(i != m_downloads.end());
|
||||
if (i == m_downloads.end()) return;
|
||||
|
||||
erase_download_piece(i);
|
||||
block_info& info = i->info[block.block_index];
|
||||
TORRENT_ASSERT(info.state == block_info::state_writing);
|
||||
TORRENT_ASSERT(info.num_peers == 0);
|
||||
|
||||
TORRENT_ASSERT(i->writing > 0);
|
||||
TORRENT_ASSERT(info.state == block_info::state_writing);
|
||||
|
||||
if (info.state == block_info::state_finished) return;
|
||||
if (info.state == block_info::state_writing) --i->writing;
|
||||
|
||||
info.peer = 0;
|
||||
|
||||
info.state = block_info::state_none;
|
||||
|
||||
if (i->finished + i->writing + i->requested == 0)
|
||||
{
|
||||
piece_pos& p = m_piece_map[block.piece_index];
|
||||
int prev_priority = p.priority(this);
|
||||
erase_download_piece(i);
|
||||
int new_priority = p.priority(this);
|
||||
TORRENT_ASSERT(new_priority >= 0);
|
||||
|
||||
if (m_dirty) return;
|
||||
if (new_priority == prev_priority) return;
|
||||
if (prev_priority == -1) add(p.index);
|
||||
else update(prev_priority, p.index);
|
||||
}
|
||||
else
|
||||
{
|
||||
sort_piece(i);
|
||||
}
|
||||
}
|
||||
|
||||
void piece_picker::mark_as_finished(piece_block block, void* peer)
|
||||
|
@ -2283,12 +2314,11 @@ namespace libtorrent
|
|||
// that's being downloaded, remove it from the list
|
||||
if (i->requested + i->finished + i->writing == 0)
|
||||
{
|
||||
erase_download_piece(i);
|
||||
piece_pos& p = m_piece_map[block.piece_index];
|
||||
int prev_prio = p.priority(this);
|
||||
TORRENT_ASSERT(prev_prio < int(m_priority_boundries.size())
|
||||
|| m_dirty);
|
||||
p.downloading = 0;
|
||||
erase_download_piece(i);
|
||||
if (!m_dirty)
|
||||
{
|
||||
int prio = p.priority(this);
|
||||
|
|
|
@ -141,7 +141,7 @@ namespace libtorrent
|
|||
// whenever a peer has downloaded one block, write
|
||||
// it to disk, and don't read anything from the
|
||||
// socket until the disk write is complete
|
||||
set.max_outstanding_disk_bytes_per_connection = 0;
|
||||
set.max_queued_disk_bytes = 1;
|
||||
|
||||
// don't keep track of all upnp devices, keep
|
||||
// the device list small
|
||||
|
|
|
@ -183,7 +183,7 @@ namespace aux {
|
|||
, m_files(40)
|
||||
, m_io_service()
|
||||
, m_alerts(m_io_service)
|
||||
, m_disk_thread(m_io_service)
|
||||
, m_disk_thread(m_io_service, boost::bind(&session_impl::on_disk_queue, this))
|
||||
, m_half_open(m_io_service)
|
||||
, m_download_rate(peer_connection::download_channel)
|
||||
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
|
||||
|
@ -646,6 +646,7 @@ namespace aux {
|
|||
|| m_settings.write_cache_line_size != s.write_cache_line_size
|
||||
|| m_settings.coalesce_writes != s.coalesce_writes
|
||||
|| m_settings.coalesce_reads != s.coalesce_reads
|
||||
|| m_settings.max_queued_disk_bytes != s.max_queued_disk_bytes
|
||||
#ifndef TORRENT_DISABLE_MLOCK
|
||||
|| m_settings.lock_disk_cache != s.lock_disk_cache
|
||||
#endif
|
||||
|
@ -1150,6 +1151,25 @@ namespace aux {
|
|||
return port;
|
||||
}
|
||||
|
||||
// this function is called from the disk-io thread
|
||||
// when the disk queue is low enough to post new
|
||||
// write jobs to it. It will go through all peer
|
||||
// connections that are blocked on the disk and
|
||||
// wake them up
|
||||
void session_impl::on_disk_queue()
|
||||
{
|
||||
session_impl::mutex_t::scoped_lock l(m_mutex);
|
||||
|
||||
for (connection_map::iterator i = m_connections.begin()
|
||||
, end(m_connections.end()); i != end; ++i)
|
||||
{
|
||||
if ((*i)->m_channel_state[peer_connection::download_channel]
|
||||
!= peer_info::bw_disk) continue;
|
||||
|
||||
(*i)->setup_receive();
|
||||
}
|
||||
}
|
||||
|
||||
void session_impl::on_tick(error_code const& e)
|
||||
{
|
||||
session_impl::mutex_t::scoped_lock l(m_mutex);
|
||||
|
|
|
@ -481,7 +481,6 @@ namespace libtorrent
|
|||
TORRENT_ASSERT(!error());
|
||||
int num_read = 0;
|
||||
int slot_size = piece_size - ph.offset;
|
||||
m_last_op = disk_io_job::read;
|
||||
if (slot_size > 0)
|
||||
{
|
||||
int block_size = 16 * 1024;
|
||||
|
@ -520,6 +519,7 @@ namespace libtorrent
|
|||
ph.h.update((char const*)bufs[i].iov_base, bufs[i].iov_len);
|
||||
small_piece_size -= bufs[i].iov_len;
|
||||
}
|
||||
ph.offset += bufs[i].iov_len;
|
||||
m_storage->disk_pool()->free_buffer((char*)bufs[i].iov_base);
|
||||
}
|
||||
}
|
||||
|
@ -1486,7 +1486,6 @@ ret:
|
|||
, m_scratch_buffer2(io, 0)
|
||||
, m_scratch_piece(-1)
|
||||
, m_last_piece(-1)
|
||||
, m_last_op(-1)
|
||||
, m_storage_constructor(sc)
|
||||
, m_io_thread(io)
|
||||
, m_torrent(torrent)
|
||||
|
@ -1628,6 +1627,11 @@ ret:
|
|||
#endif
|
||||
}
|
||||
|
||||
int piece_manager::queued_bytes() const
|
||||
{
|
||||
return m_io_thread.queued_write_bytes();
|
||||
}
|
||||
|
||||
void piece_manager::async_write(
|
||||
peer_request const& r
|
||||
, disk_buffer_holder& buffer
|
||||
|
@ -1749,7 +1753,6 @@ ret:
|
|||
TORRENT_ASSERT(offset >= 0);
|
||||
TORRENT_ASSERT(num_bufs > 0);
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::read;
|
||||
int slot = slot_for(piece_index);
|
||||
return m_storage->readv(bufs, slot, offset, num_bufs);
|
||||
}
|
||||
|
@ -1770,7 +1773,6 @@ ret:
|
|||
file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, num_bufs);
|
||||
std::copy(bufs, bufs + num_bufs, iov);
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::write;
|
||||
int slot = allocate_slot_for_piece(piece_index);
|
||||
int ret = m_storage->writev(bufs, slot, offset, num_bufs);
|
||||
// only save the partial hash if the write succeeds
|
||||
|
@ -2344,7 +2346,6 @@ ret:
|
|||
// the slot where this piece belongs is
|
||||
// free. Just move the piece there.
|
||||
m_last_piece = piece;
|
||||
m_last_op = disk_io_job::write;
|
||||
m_storage->move_slot(m_current_slot, piece);
|
||||
if (m_storage->error()) return -1;
|
||||
|
||||
|
@ -2572,7 +2573,6 @@ ret:
|
|||
|
||||
bool ret = false;
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::write;
|
||||
if (other_piece >= 0)
|
||||
ret |= m_storage->swap_slots(other_slot, m_current_slot);
|
||||
else
|
||||
|
@ -2612,7 +2612,6 @@ ret:
|
|||
|
||||
}
|
||||
m_last_piece = other_piece;
|
||||
m_last_op = disk_io_job::write;
|
||||
if (ret) return skip_file();
|
||||
|
||||
TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned
|
||||
|
@ -2652,7 +2651,6 @@ ret:
|
|||
TORRENT_ASSERT(piece_index == slot1);
|
||||
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::write;
|
||||
m_storage->swap_slots(m_current_slot, slot1);
|
||||
|
||||
TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned
|
||||
|
@ -2700,7 +2698,6 @@ ret:
|
|||
}
|
||||
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::write;
|
||||
if (ret) return skip_file();
|
||||
|
||||
TORRENT_ASSERT(m_slot_to_piece[m_current_slot] == unassigned
|
||||
|
@ -2847,7 +2844,6 @@ ret:
|
|||
, m_piece_to_slot[piece_at_our_slot]);
|
||||
|
||||
m_last_piece = piece_index;
|
||||
m_last_op = disk_io_job::write;
|
||||
m_storage->move_slot(piece_index, slot_index);
|
||||
|
||||
TORRENT_ASSERT(m_slot_to_piece[piece_index] == piece_index);
|
||||
|
@ -2892,7 +2888,6 @@ ret:
|
|||
if (m_piece_to_slot[pos] != has_no_slot)
|
||||
{
|
||||
m_last_piece = pos;
|
||||
m_last_op = disk_io_job::write;
|
||||
new_free_slot = m_piece_to_slot[pos];
|
||||
m_storage->move_slot(new_free_slot, pos);
|
||||
m_slot_to_piece[pos] = pos;
|
||||
|
|
|
@ -345,11 +345,23 @@ namespace libtorrent
|
|||
if (!j.error) return;
|
||||
|
||||
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
|
||||
(*m_ses.m_logger) << "disk error: '" << j.str << "' while "
|
||||
<< (j.error_op == disk_io_job::read?"reading ":"writing ")
|
||||
<< " piece " << j.error_piece << " in file " << j.error_file << "\n";
|
||||
(*m_ses.m_logger) << "disk error: '" << j.error.message()
|
||||
<< " in file " << j.error_file
|
||||
<< " in torrent " << torrent_file().name()
|
||||
<< "\n";
|
||||
#endif
|
||||
|
||||
TORRENT_ASSERT(j.piece >= 0);
|
||||
|
||||
piece_block block_finished(j.piece, j.offset / block_size());
|
||||
|
||||
if (j.action == disk_io_job::write
|
||||
|| j.action == disk_io_job::hash)
|
||||
{
|
||||
// we failed to write j.piece to disk tell the piece picker
|
||||
if (has_picker() && j.piece >= 0) picker().write_failed(block_finished);
|
||||
}
|
||||
|
||||
if (j.error ==
|
||||
#if BOOST_VERSION >= 103500
|
||||
error_code(boost::system::errc::not_enough_memory, get_posix_category())
|
||||
|
@ -359,23 +371,15 @@ namespace libtorrent
|
|||
)
|
||||
{
|
||||
if (alerts().should_post<file_error_alert>())
|
||||
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.error));
|
||||
if (c) c->disconnect("no memory");
|
||||
return;
|
||||
}
|
||||
|
||||
TORRENT_ASSERT(j.error_piece >= 0);
|
||||
|
||||
if (j.error_op == disk_io_job::write)
|
||||
{
|
||||
// we failed to write j.error_piece to disk
|
||||
// tell the piece picker
|
||||
if (has_picker() && j.error_piece >= 0) picker().write_failed(j.error_piece);
|
||||
}
|
||||
|
||||
// notify the user of the error
|
||||
if (alerts().should_post<file_error_alert>())
|
||||
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.error));
|
||||
|
||||
// put the torrent in an error-state
|
||||
set_error(j.error, j.error_file);
|
||||
|
@ -658,23 +662,10 @@ namespace libtorrent
|
|||
|
||||
if (ret == piece_manager::fatal_disk_error)
|
||||
{
|
||||
if (m_ses.m_alerts.should_post<file_error_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
}
|
||||
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
|
||||
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
|
||||
" error: " << j.str <<
|
||||
" torrent: " << torrent_file().name() <<
|
||||
" ]\n";
|
||||
#endif
|
||||
set_error(j.error, j.error_file);
|
||||
pause();
|
||||
handle_disk_error(j);
|
||||
set_state(torrent_status::queued_for_checking);
|
||||
|
||||
std::vector<char>().swap(m_resume_data);
|
||||
lazy_entry().swap(m_resume_entry);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -886,12 +877,13 @@ namespace libtorrent
|
|||
m_picker->init(m_torrent_file->piece_length() / m_block_size
|
||||
, int((m_torrent_file->total_size()+m_block_size-1)/m_block_size));
|
||||
// assume that we don't have anything
|
||||
TORRENT_ASSERT(m_picker->num_have() == 0);
|
||||
m_files_checked = false;
|
||||
set_state(torrent_status::checking_resume_data);
|
||||
|
||||
m_policy.recalculate_connect_candidates();
|
||||
|
||||
if (m_auto_managed)
|
||||
if (m_auto_managed && !is_finished())
|
||||
set_queue_position((std::numeric_limits<int>::max)());
|
||||
|
||||
std::vector<char>().swap(m_resume_data);
|
||||
|
@ -907,18 +899,7 @@ namespace libtorrent
|
|||
|
||||
if (ret == piece_manager::fatal_disk_error)
|
||||
{
|
||||
if (m_ses.m_alerts.should_post<file_error_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
}
|
||||
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
|
||||
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
|
||||
" error: " << j.str <<
|
||||
" torrent: " << torrent_file().name() <<
|
||||
" ]\n";
|
||||
#endif
|
||||
set_error(j.error, j.error_file);
|
||||
pause();
|
||||
handle_disk_error(j);
|
||||
return;
|
||||
}
|
||||
if (ret == 0)
|
||||
|
@ -959,7 +940,7 @@ namespace libtorrent
|
|||
{
|
||||
if (m_ses.m_alerts.should_post<file_error_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.error));
|
||||
}
|
||||
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
|
||||
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
|
||||
|
@ -1684,6 +1665,14 @@ namespace libtorrent
|
|||
|
||||
TORRENT_ASSERT(valid_metadata());
|
||||
|
||||
// even though the piece passed the hash-check
|
||||
// it might still have failed being written to disk
|
||||
// if so, piece_picker::write_failed() has been
|
||||
// called, and the piece is no longer finished.
|
||||
// in this case, we have to ignore the fact that
|
||||
// it passed the check
|
||||
if (!m_picker->is_piece_finished(index)) return;
|
||||
|
||||
if (passed_hash_check == 0)
|
||||
{
|
||||
// the following call may cause picker to become invalid
|
||||
|
@ -1991,7 +1980,7 @@ namespace libtorrent
|
|||
for (peer_iterator i = m_connections.begin();
|
||||
i != m_connections.end(); ++i)
|
||||
{
|
||||
(*(*i)->m_logger) << "*** ABORTING TORRENT\n";
|
||||
(*(*i)->m_logger) << time_now_string() << " *** ABORTING TORRENT\n";
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -3991,6 +3980,8 @@ namespace libtorrent
|
|||
std::for_each(seeds.begin(), seeds.end()
|
||||
, bind(&peer_connection::disconnect, _1, "torrent finished, disconnecting seed", 0));
|
||||
|
||||
if (m_abort) return;
|
||||
|
||||
m_policy.recalculate_connect_candidates();
|
||||
|
||||
TORRENT_ASSERT(m_storage);
|
||||
|
@ -5198,13 +5189,7 @@ namespace libtorrent
|
|||
// -1: disk failure
|
||||
// -2: hash check failed
|
||||
|
||||
if (ret == -1)
|
||||
{
|
||||
if (alerts().should_post<file_error_alert>())
|
||||
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str));
|
||||
set_error(j.error, j.error_file);
|
||||
pause();
|
||||
}
|
||||
if (ret == -1) handle_disk_error(j);
|
||||
f(ret);
|
||||
}
|
||||
|
||||
|
|
|
@ -195,7 +195,7 @@ void run_storage_tests(boost::intrusive_ptr<torrent_info> info
|
|||
{
|
||||
file_pool fp;
|
||||
libtorrent::asio::io_service ios;
|
||||
disk_io_thread io(ios);
|
||||
disk_io_thread io(ios, boost::function<void()>());
|
||||
boost::shared_ptr<int> dummy(new int);
|
||||
boost::intrusive_ptr<piece_manager> pm = new piece_manager(dummy, info
|
||||
, test_path, fp, io, default_storage_constructor, storage_mode);
|
||||
|
@ -396,7 +396,7 @@ void test_check_files(path const& test_path
|
|||
|
||||
file_pool fp;
|
||||
libtorrent::asio::io_service ios;
|
||||
disk_io_thread io(ios);
|
||||
disk_io_thread io(ios, boost::function<void()>());
|
||||
boost::shared_ptr<int> dummy(new int);
|
||||
boost::intrusive_ptr<piece_manager> pm = new piece_manager(dummy, info
|
||||
, test_path, fp, io, default_storage_constructor, storage_mode);
|
||||
|
|
|
@ -55,12 +55,12 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy)
|
|||
session ses(fingerprint(" ", 0,0,0,0), 0);
|
||||
session_settings settings;
|
||||
settings.ignore_limits_on_local_network = false;
|
||||
settings.max_outstanding_disk_bytes_per_connection = 256 * 1024;
|
||||
settings.max_queued_disk_bytes = 256 * 1024;
|
||||
ses.set_settings(settings);
|
||||
ses.set_alert_mask(~alert::progress_notification);
|
||||
ses.listen_on(std::make_pair(51000, 52000));
|
||||
ses.set_download_rate_limit(torrent_file->total_size() / 10);
|
||||
remove_all("./tmp1");
|
||||
remove_all("./tmp2_web_seed");
|
||||
|
||||
char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"};
|
||||
|
||||
|
@ -78,7 +78,7 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy)
|
|||
ses.set_web_seed_proxy(ps);
|
||||
}
|
||||
|
||||
torrent_handle th = ses.add_torrent(*torrent_file, "./tmp1");
|
||||
torrent_handle th = ses.add_torrent(*torrent_file, "./tmp2_web_seed");
|
||||
|
||||
std::vector<announce_entry> empty;
|
||||
th.replace_trackers(empty);
|
||||
|
@ -94,12 +94,20 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy)
|
|||
session_status ss = ses.status();
|
||||
rate_sum += s.download_payload_rate;
|
||||
ses_rate_sum += ss.payload_download_rate;
|
||||
|
||||
cache_status cs = ses.get_cache_status();
|
||||
if (cs.blocks_read < 1) cs.blocks_read = 1;
|
||||
if (cs.blocks_written < 1) cs.blocks_written = 1;
|
||||
|
||||
std::cerr << (s.progress * 100.f) << " %"
|
||||
<< " torrent rate: " << (s.download_rate / 1000.f) << " kB/s"
|
||||
<< " session rate: " << (ss.download_rate / 1000.f) << " kB/s"
|
||||
<< " session total: " << ss.total_payload_download
|
||||
<< " torrent total: " << s.total_payload_download
|
||||
<< " rate sum:" << ses_rate_sum
|
||||
<< " cache: " << cs.cache_size
|
||||
<< " rcache: " << cs.read_cache_size
|
||||
<< " buffers: " << cs.total_used_buffers
|
||||
<< std::endl;
|
||||
|
||||
print_alerts(ses, "ses");
|
||||
|
@ -126,8 +134,8 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy)
|
|||
|
||||
if (proxy) stop_proxy(8002);
|
||||
|
||||
TEST_CHECK(exists("./tmp1" / torrent_file->file_at(0).path));
|
||||
remove_all("./tmp1");
|
||||
TEST_CHECK(exists("./tmp2_web_seed" / torrent_file->file_at(0).path));
|
||||
remove_all("./tmp2_web_seed");
|
||||
}
|
||||
|
||||
int test_main()
|
||||
|
@ -136,54 +144,45 @@ int test_main()
|
|||
using namespace boost::filesystem;
|
||||
|
||||
try {
|
||||
create_directory("test_torrent_dir");
|
||||
create_directory("./tmp1_web_seed");
|
||||
} catch (std::exception&) {}
|
||||
|
||||
try {
|
||||
create_directory("./tmp1_web_seed/test_torrent_dir");
|
||||
} catch (std::exception&) {}
|
||||
|
||||
char random_data[300000];
|
||||
std::srand(std::time(0));
|
||||
std::srand(10);
|
||||
// memset(random_data, 1, sizeof(random_data));
|
||||
std::generate(random_data, random_data + sizeof(random_data), &std::rand);
|
||||
std::ofstream("./test_torrent_dir/test1").write(random_data, 35);
|
||||
std::ofstream("./test_torrent_dir/test2").write(random_data, 16536 - 35);
|
||||
std::ofstream("./test_torrent_dir/test3").write(random_data, 16536);
|
||||
std::ofstream("./test_torrent_dir/test4").write(random_data, 17);
|
||||
std::ofstream("./test_torrent_dir/test5").write(random_data, 16536);
|
||||
std::ofstream("./test_torrent_dir/test6").write(random_data, 300000);
|
||||
std::ofstream("./test_torrent_dir/test7").write(random_data, 300000);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test1").write(random_data, 35);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test2").write(random_data, 16536 - 35);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test3").write(random_data, 16536);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test4").write(random_data, 17);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test5").write(random_data, 16536);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test6").write(random_data, 300000);
|
||||
std::ofstream("./tmp1_web_seed/test_torrent_dir/test7").write(random_data, 300000);
|
||||
|
||||
file_storage fs;
|
||||
add_files(fs, path("test_torrent_dir"));
|
||||
add_files(fs, path("./tmp1_web_seed/test_torrent_dir"));
|
||||
|
||||
libtorrent::create_torrent t(fs, 16 * 1024);
|
||||
t.add_url_seed("http://127.0.0.1:8000/");
|
||||
t.add_url_seed("http://127.0.0.1:8000/tmp1_web_seed");
|
||||
|
||||
start_web_server(8000);
|
||||
|
||||
// calculate the hash for all pieces
|
||||
int num = t.num_pieces();
|
||||
char* buf = page_aligned_allocator::malloc(t.piece_length());
|
||||
|
||||
file_pool fp;
|
||||
boost::scoped_ptr<storage_interface> s(default_storage_constructor(
|
||||
fs, ".", fp));
|
||||
|
||||
for (int i = 0; i < num; ++i)
|
||||
{
|
||||
s->read(buf, i, 0, fs.piece_size(i));
|
||||
hasher h(buf, fs.piece_size(i));
|
||||
t.set_hash(i, h.final());
|
||||
}
|
||||
|
||||
set_piece_hashes(t, "./tmp1_web_seed");
|
||||
boost::intrusive_ptr<torrent_info> torrent_file(new torrent_info(t.generate()));
|
||||
|
||||
for (int i = 0; i < 6; ++i)
|
||||
test_transfer(torrent_file, i);
|
||||
|
||||
torrent_file->rename_file(0, "./test_torrent_dir/renamed_test1");
|
||||
torrent_file->rename_file(0, "./tmp2_web_seed/test_torrent_dir/renamed_test1");
|
||||
test_transfer(torrent_file, 0);
|
||||
|
||||
stop_web_server(8000);
|
||||
remove_all("./test_torrent_dir");
|
||||
page_aligned_allocator::free(buf);
|
||||
remove_all("./tmp1_web_seed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue