fix error handling by exercising code paths where memory allocations fail (#1221)

fix error handling by exercising code paths where memory allocations fail
This commit is contained in:
Arvid Norberg 2016-10-19 01:18:05 -04:00 committed by GitHub
parent 85fe06659e
commit 524f7b1c27
20 changed files with 582 additions and 150 deletions

View File

@ -179,6 +179,7 @@ nobase_include_HEADERS = \
aux_/portmap.hpp \
aux_/lsd.hpp \
aux_/has_block.hpp \
aux_/scope_end.hpp \
\
extensions/smart_ban.hpp \
extensions/ut_metadata.hpp \

View File

@ -0,0 +1,64 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_SCOPE_END_HPP_INCLUDED
#define TORRENT_SCOPE_END_HPP_INCLUDED
#include <utility>
namespace libtorrent { namespace aux {
template <typename Fun>
struct scope_end_impl
{
explicit scope_end_impl(Fun f) : m_fun(std::move(f)) {}
~scope_end_impl() { if (m_armed) m_fun(); }
// movable
scope_end_impl(scope_end_impl&&) = default;
scope_end_impl& operator=(scope_end_impl&&) = default;
// non-copyable
scope_end_impl(scope_end_impl const&) = delete;
scope_end_impl& operator=(scope_end_impl const&) = delete;
void disarm() { m_armed = false; }
private:
Fun m_fun;
bool m_armed = true;
};
template <typename Fun>
scope_end_impl<Fun> scope_end(Fun f) { return scope_end_impl<Fun>(std::move(f)); }
}}
#endif

View File

@ -257,6 +257,9 @@ namespace libtorrent
// indicates which LRU list this piece is chained into
enum cache_state_t
{
// not added to the cache
none,
// this is the LRU list for pieces with dirty blocks
write_lru,
@ -314,18 +317,18 @@ namespace libtorrent
// ---- 32 bit boundary ---
// the sum of all refcounts in all blocks
std::uint32_t refcount;
std::uint32_t refcount = 0;
#if TORRENT_USE_ASSERTS
// the number of times this piece has finished hashing
int hash_passes;
int hash_passes = 0;
// this is a debug facility to keep a log
// of which operations have been run on this piece
std::vector<piece_log_t> piece_log;
bool in_storage;
bool in_use;
bool in_storage = false;
bool in_use = true;
#endif
};

View File

@ -136,6 +136,7 @@ namespace libtorrent
private:
void check_buffer_level(std::unique_lock<std::mutex>& l);
void remove_buffer_in_use(char* buf);
mutable std::mutex m_pool_mutex;

View File

@ -618,6 +618,12 @@ namespace libtorrent
std::mutex m_completed_jobs_mutex;
jobqueue_t m_completed_jobs;
// this is protected by the completed_jobs_mutex. It's true whenever
// there's a call_job_handlers message in-flight to the network thread. We
// only ever keep one such message in flight at a time, and coalesce
// completion callbacks in m_completed jobs
bool m_job_completions_in_flight = false;
// these are blocks that have been returned by the main thread
// but they haven't been freed yet. This is used to batch
// reclaiming of blocks, to only need one std::mutex lock per cycle

View File

@ -530,7 +530,8 @@ namespace libtorrent
partfile_read,
partfile_write,
check_resume,
hard_link
hard_link,
exception
};
// Returns a string literal representing the file operation
@ -543,7 +544,7 @@ namespace libtorrent
"", "stat", "mkdir", "open", "rename", "remove", "copy"
, "read", "write", "fallocate", "allocate cache piece"
, "partfile move", "partfile read", "partfile write"
, "check resume", "hard_link"
, "check resume", "hard_link", "exception"
};
return ops[operation];
}

View File

@ -27,7 +27,23 @@ namespace libtorrent
template<class T>
void check_invariant(T const& x)
{
invariant_access::check_invariant(x);
#ifndef BOOST_NO_EXCEPTIONS
try
{
invariant_access::check_invariant(x);
}
catch (std::exception const& err)
{
std::fprintf(stderr, "invariant_check failed with exception: %s"
, err.what());
}
catch (...)
{
std::fprintf(stderr, "invariant_check failed with exception");
}
#else
invariant_access::check_invariant(x);
#endif
}
struct invariant_checker {};
@ -38,14 +54,7 @@ namespace libtorrent
explicit invariant_checker_impl(T const& self_)
: self(self_)
{
TORRENT_TRY
{
check_invariant(self);
}
TORRENT_CATCH_ALL
{
TORRENT_ASSERT_FAIL();
}
check_invariant(self);
}
invariant_checker_impl(invariant_checker_impl const& rhs)
@ -53,14 +62,7 @@ namespace libtorrent
~invariant_checker_impl()
{
TORRENT_TRY
{
check_invariant(self);
}
TORRENT_CATCH_ALL
{
TORRENT_ASSERT_FAIL();
}
check_invariant(self);
}
T const& self;

View File

@ -52,7 +52,7 @@ namespace libtorrent
{
if (index == -1) return;
TORRENT_ASSERT(index >= 0 && index < int(list.size()));
int last = int(list.size()) - 1;
int const last = int(list.size()) - 1;
if (index < last)
{
list[last]->m_links[link_index].index = index;
@ -67,8 +67,8 @@ namespace libtorrent
{
if (index >= 0) return;
TORRENT_ASSERT(index == -1);
index = int(list.size());
list.push_back(self);
index = int(list.size()) - 1;
}
};
}

View File

@ -299,6 +299,21 @@ namespace libtorrent
// starts the announce timer
void start(add_torrent_params const& p);
void added()
{
TORRENT_ASSERT(m_added == false);
m_added = true;
update_gauge();
}
void removed()
{
TORRENT_ASSERT(m_added == true);
m_added = false;
// make sure we decrement the gauge counter for this torrent
update_gauge();
}
void start_download_url();
// returns which stats gauge this torrent currently
@ -1083,6 +1098,8 @@ namespace libtorrent
}
void add_suggest_piece(int index);
enum { no_gauge_state = 0xf };
private:
void ip_filter_updated();
@ -1403,6 +1420,10 @@ namespace libtorrent
// is is disabled while paused and checking files
bool m_announcing:1;
// this is true when the torrent has been added to the session. Before
// then, it isn't included in the counters (session_stats)
bool m_added:1;
// this is > 0 while the tracker deadline timer
// is in use. i.e. one or more trackers are waiting
// for a reannounce
@ -1580,7 +1601,6 @@ namespace libtorrent
// slots.
bool m_auto_managed:1;
enum { no_gauge_state = 0xf };
// the current stats gauge this torrent counts against
std::uint32_t m_current_gauge_state:4;

View File

@ -258,7 +258,7 @@ namespace libtorrent
bool cancelled() const { return m_abort; }
virtual void on_timeout(error_code const& ec) = 0;
virtual ~timeout_handler() {}
virtual ~timeout_handler();
io_service& get_io_service() { return m_timeout.get_io_service(); }
@ -266,7 +266,7 @@ namespace libtorrent
void timeout_callback(error_code const&);
int m_completion_timeout;
int m_completion_timeout = 0;
// used for timeouts
// this is set when the request has been sent
@ -278,9 +278,12 @@ namespace libtorrent
// the asio async operation
deadline_timer m_timeout;
int m_read_timeout;
int m_read_timeout = 0;
bool m_abort;
bool m_abort = false;
#if TORRENT_USE_ASSERTS
int m_outstanding_timer_wait = 0;
#endif
};
// TODO: 2 this class probably doesn't need to have virtual functions.
@ -402,11 +405,10 @@ namespace libtorrent
resolver_interface& m_host_resolver;
aux::session_settings const& m_settings;
counters& m_stats_counters;
bool m_abort = false;
#if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS
aux::session_logger& m_ses;
#endif
bool m_abort;
};
}

View File

@ -52,3 +52,6 @@ alias libtorrent-sims :
[ run test_save_resume.cpp ]
;
run test_error_handling.cpp ;
explicit test_error_handling ;

View File

@ -0,0 +1,188 @@
/*
Copyright (c) 2016, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <array>
#include "test.hpp"
#include "create_torrent.hpp"
#include "settings.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/session_stats.hpp"
#include "libtorrent/settings_pack.hpp"
#include "libtorrent/ip_filter.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/aux_/proxy_settings.hpp"
#include "libtorrent/settings_pack.hpp"
#include "simulator/simulator.hpp"
#include "simulator/socks_server.hpp"
#include "simulator/utils.hpp"
#include "setup_swarm.hpp"
#include "utils.hpp"
#include "setup_transfer.hpp" // for addr()
using namespace sim;
namespace lt = libtorrent;
std::string make_ep_string(char const* address, bool const is_v6
, char const* port)
{
std::string ret;
if (is_v6) ret += '[';
ret += address;
if (is_v6) ret += ']';
ret += ':';
ret += port;
return ret;
}
template <typename HandleAlerts, typename Test>
void run_test(HandleAlerts const& on_alert, Test const& test)
{
using namespace libtorrent;
using asio::ip::address;
address const peer0 = addr("50.0.0.1");
address const peer1 = addr("50.0.0.2");
// setup the simulation
sim::default_config network_cfg;
sim::simulation sim{network_cfg};
sim::asio::io_service ios0 { sim, peer0 };
sim::asio::io_service ios1 { sim, peer1 };
lt::session_proxy zombie[2];
// setup settings pack to use for the session (customization point)
lt::settings_pack pack = settings();
// disable utp by default
pack.set_bool(settings_pack::enable_outgoing_utp, false);
pack.set_bool(settings_pack::enable_incoming_utp, false);
// disable encryption by default
pack.set_bool(settings_pack::prefer_rc4, false);
pack.set_int(settings_pack::in_enc_policy, settings_pack::pe_disabled);
pack.set_int(settings_pack::out_enc_policy, settings_pack::pe_disabled);
pack.set_int(settings_pack::allowed_enc_level, settings_pack::pe_plaintext);
pack.set_str(settings_pack::listen_interfaces, peer0.to_string() + ":6881");
// create session
std::shared_ptr<lt::session> ses[2];
ses[0] = std::make_shared<lt::session>(pack, ios0);
pack.set_str(settings_pack::listen_interfaces, peer1.to_string() + ":6881");
ses[1] = std::make_shared<lt::session>(pack, ios1);
// only monitor alerts for session 0 (the downloader)
print_alerts(*ses[0], [=](lt::session& ses, lt::alert const* a) {
if (auto ta = alert_cast<lt::torrent_added_alert>(a))
{
ta->handle.connect_peer(lt::tcp::endpoint(peer1, 6881));
}
on_alert(ses, a);
});
print_alerts(*ses[1]);
// the first peer is a downloader, the second peer is a seed
lt::add_torrent_params params = create_torrent(1);
params.flags &= ~lt::add_torrent_params::flag_auto_managed;
params.flags &= ~lt::add_torrent_params::flag_paused;
params.save_path = save_path(0);
ses[0]->async_add_torrent(params);
params.save_path = save_path(1);
ses[1]->async_add_torrent(params);
sim::timer t(sim, lt::seconds(60), [&](boost::system::error_code const& ec)
{
test(ses);
// shut down
int idx = 0;
for (auto& s : ses)
{
zombie[idx++] = s->abort();
s.reset();
}
});
sim.run();
}
int g_alloc_counter = 1000000;
void* operator new(std::size_t sz)
{
if (--g_alloc_counter == 0)
{
throw std::bad_alloc();
}
return std::malloc(sz);
}
void operator delete(void* ptr) noexcept
{
std::free(ptr);
}
TORRENT_TEST(no_proxy_tcp)
{
for (int i = 0; i < 3000; ++i)
{
std::printf("\n\n === ROUND %d ===\n\n", i);
try
{
g_alloc_counter = 100 + i;
using namespace libtorrent;
run_test(
[](lt::session&, lt::alert const*) {},
[](std::shared_ptr<lt::session> ses[2]) {}
);
}
catch (std::bad_alloc const&)
{
// this is kind of expected
}
catch (...)
{
TEST_ERROR("session constructor terminated with unexpected exception. round: "
+ std::to_string(i));
}
// if we didn't fail any allocations this run, there's no need to
// continue, we won't exercise any new code paths
if (g_alloc_counter > 0) break;
}
}

View File

@ -126,15 +126,22 @@ void print_alerts(lt::session& ses
ses.set_alert_notify([&ses,start_time,on_alert] {
ses.get_io_service().post([&ses,start_time,on_alert] {
std::vector<lt::alert*> alerts;
ses.pop_alerts(&alerts);
try {
std::vector<lt::alert*> alerts;
ses.pop_alerts(&alerts);
for (lt::alert const* a : alerts)
{
std::printf("%-3d [0] %s\n", int(lt::duration_cast<lt::seconds>(a->timestamp()
- start_time).count()), a->message().c_str());
// call the user handler
on_alert(ses, a);
for (lt::alert const* a : alerts)
{
std::printf("%-3d [0] %s\n", int(lt::duration_cast<lt::seconds>(a->timestamp()
- start_time).count()), a->message().c_str());
// call the user handler
on_alert(ses, a);
}
} catch (std::exception const& e) {
std::printf("print alerts: ERROR failed with exception: %s"
, e.what());
} catch (...) {
std::printf("print alerts: ERROR failed with (unknown) exception");
}
} ); } );
}

View File

@ -320,17 +320,11 @@ cached_piece_entry::cached_piece_entry()
, hashing_done(0)
, marked_for_deletion(false)
, need_readback(false)
, cache_state(read_lru1)
, cache_state(none)
, piece_refcount(0)
, outstanding_flush(0)
, outstanding_read(0)
, pinned(0)
, refcount(0)
#if TORRENT_USE_ASSERTS
, hash_passes(0)
, in_storage(false)
, in_use(true)
#endif
{}
cached_piece_entry::~cached_piece_entry()
@ -339,13 +333,16 @@ cached_piece_entry::~cached_piece_entry()
TORRENT_ASSERT(jobs.size() == 0);
TORRENT_ASSERT(read_jobs.size() == 0);
#if TORRENT_USE_ASSERTS
for (int i = 0; i < blocks_in_piece; ++i)
if (blocks)
{
TORRENT_ASSERT(blocks[i].buf == nullptr);
TORRENT_ASSERT(!blocks[i].pending);
TORRENT_ASSERT(blocks[i].refcount == 0);
TORRENT_ASSERT(blocks[i].hashing_count == 0);
TORRENT_ASSERT(blocks[i].flushing_count == 0);
for (int i = 0; i < blocks_in_piece; ++i)
{
TORRENT_ASSERT(blocks[i].buf == nullptr);
TORRENT_ASSERT(!blocks[i].pending);
TORRENT_ASSERT(blocks[i].refcount == 0);
TORRENT_ASSERT(blocks[i].hashing_count == 0);
TORRENT_ASSERT(blocks[i].flushing_count == 0);
}
}
in_use = false;
#endif
@ -646,14 +643,14 @@ cached_piece_entry* block_cache::allocate_piece(disk_io_job const* j, int cache_
pe.storage = j->storage;
pe.expire = aux::time_now();
pe.blocks_in_piece = blocks_in_piece;
pe.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
pe.cache_state = cache_state;
pe.last_requester = j->requester;
TORRENT_PIECE_ASSERT(pe.blocks, &pe);
if (!pe.blocks) return nullptr;
pe.last_requester = j->requester;
p = const_cast<cached_piece_entry*>(&*m_pieces.insert(pe).first);
j->storage->add_piece(p);
p->cache_state = cache_state;
TORRENT_PIECE_ASSERT(p->cache_state < cached_piece_entry::num_lrus, p);
linked_list<cached_piece_entry>* lru_list = &m_lru[p->cache_state];
@ -1667,7 +1664,7 @@ void block_cache::check_invariant() const
}
std::unordered_set<char*> buffers;
for (auto const& p :m_pieces)
for (auto const& p : m_pieces)
{
TORRENT_PIECE_ASSERT(p.blocks, &p);
@ -1677,7 +1674,7 @@ void block_cache::check_invariant() const
int num_pending = 0;
int num_refcount = 0;
bool in_storage = p.storage->has_piece(&p);
bool const in_storage = p.storage->has_piece(&p);
switch (p.cache_state)
{
case cached_piece_entry::write_lru:

View File

@ -232,7 +232,12 @@ namespace libtorrent
// we need to roll back and free all the buffers
// we've already allocated
for (int j = 0; j < i; ++j)
free_buffer_impl(static_cast<char*>(iov[j].iov_base), l);
{
char* buf = static_cast<char*>(iov[j].iov_base);
TORRENT_ASSERT(is_disk_buffer(buf, l));
free_buffer_impl(buf, l);
remove_buffer_in_use(buf);
}
return -1;
}
}
@ -244,7 +249,12 @@ namespace libtorrent
// TODO: perhaps we should sort the buffers here?
std::unique_lock<std::mutex> l(m_pool_mutex);
for (int i = 0; i < iov_len; ++i)
free_buffer_impl(static_cast<char*>(iov[i].iov_base), l);
{
char* buf = static_cast<char*>(iov[i].iov_base);
TORRENT_ASSERT(is_disk_buffer(buf, l));
free_buffer_impl(buf, l);
remove_buffer_in_use(buf);
}
check_buffer_level(l);
}
@ -304,12 +314,21 @@ namespace libtorrent
}
}
++m_in_use;
#if TORRENT_USE_INVARIANT_CHECKS
TORRENT_ASSERT(m_buffers_in_use.count(ret) == 0);
m_buffers_in_use.insert(ret);
try
{
TORRENT_ASSERT(m_buffers_in_use.count(ret) == 0);
m_buffers_in_use.insert(ret);
}
catch (...)
{
free_buffer_impl(ret, l);
return nullptr;
}
#endif
++m_in_use;
if (m_in_use >= m_low_watermark + (m_max_use - m_low_watermark)
/ 2 && !m_exceeded_max_size)
{
@ -331,8 +350,9 @@ namespace libtorrent
for (; bufvec != end; ++bufvec)
{
char* buf = *bufvec;
TORRENT_ASSERT(buf);
TORRENT_ASSERT(is_disk_buffer(buf, l));
free_buffer_impl(buf, l);
remove_buffer_in_use(buf);
}
check_buffer_level(l);
@ -341,7 +361,9 @@ namespace libtorrent
void disk_buffer_pool::free_buffer(char* buf)
{
std::unique_lock<std::mutex> l(m_pool_mutex);
TORRENT_ASSERT(is_disk_buffer(buf, l));
free_buffer_impl(buf, l);
remove_buffer_in_use(buf);
check_buffer_level(l);
}
@ -514,12 +536,21 @@ namespace libtorrent
#endif
}
void disk_buffer_pool::remove_buffer_in_use(char* buf)
{
TORRENT_UNUSED(buf);
#if TORRENT_USE_INVARIANT_CHECKS
std::set<char*>::iterator i = m_buffers_in_use.find(buf);
TORRENT_ASSERT(i != m_buffers_in_use.end());
m_buffers_in_use.erase(i);
#endif
}
void disk_buffer_pool::free_buffer_impl(char* buf, std::unique_lock<std::mutex>& l)
{
TORRENT_ASSERT(buf);
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(m_settings_set);
TORRENT_ASSERT(is_disk_buffer(buf, l));
TORRENT_ASSERT(l.owns_lock());
TORRENT_UNUSED(l);
@ -558,12 +589,6 @@ namespace libtorrent
#endif // TORRENT_DISABLE_POOL_ALLOCATOR
}
#if TORRENT_USE_INVARIANT_CHECKS
std::set<char*>::iterator i = m_buffers_in_use.find(buf);
TORRENT_ASSERT(i != m_buffers_in_use.end());
m_buffers_in_use.erase(i);
#endif
--m_in_use;
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR

View File

@ -125,6 +125,53 @@ namespace libtorrent
return ret;
}
struct piece_refcount_holder
{
explicit piece_refcount_holder(cached_piece_entry* p) : m_pe(p)
{ ++m_pe->piece_refcount; }
~piece_refcount_holder()
{
if (!m_executed)
{
TORRENT_PIECE_ASSERT(m_pe->piece_refcount > 0, m_pe);
--m_pe->piece_refcount;
}
}
piece_refcount_holder(piece_refcount_holder const&) = delete;
piece_refcount_holder& operator=(piece_refcount_holder const&) = delete;
void release()
{
TORRENT_ASSERT(!m_executed);
m_executed = true;
TORRENT_PIECE_ASSERT(m_pe->piece_refcount > 0, m_pe);
--m_pe->piece_refcount;
}
private:
cached_piece_entry* m_pe;
bool m_executed = false;
};
template <typename Lock>
struct scoped_unlocker_impl
{
explicit scoped_unlocker_impl(Lock& l) : m_lock(&l) { m_lock->unlock(); }
~scoped_unlocker_impl() { if (m_lock) m_lock->lock(); }
scoped_unlocker_impl(scoped_unlocker_impl&& rhs) : m_lock(rhs.m_lock)
{ rhs.m_lock = nullptr; }
scoped_unlocker_impl& operator=(scoped_unlocker_impl&& rhs)
{
if (m_lock) m_lock->lock();
m_lock = rhs.m_lock;
rhs.m_lock = nullptr;
}
private:
Lock* m_lock;
};
template <typename Lock>
scoped_unlocker_impl<Lock> scoped_unlock(Lock& l)
{ return scoped_unlocker_impl<Lock>(l); }
} // anonymous namespace
// ------- disk_io_thread ------
@ -451,12 +498,13 @@ namespace libtorrent
return 0;
}
l.unlock();
storage_error error;
flush_iovec(first_piece, iov, flushing, iov_len, error);
l.lock();
{
// unlock while we're performing the actual disk I/O
// then lock again
auto unlock = scoped_unlock(l);
flush_iovec(first_piece, iov, flushing, iov_len, error);
}
block_start = 0;
for (int i = 0; i < cont_pieces; ++i)
@ -703,17 +751,15 @@ namespace libtorrent
#if TORRENT_USE_ASSERTS
pe->piece_log.push_back(piece_log_t(piece_log_t::flush_range, -1));
#endif
++pe->piece_refcount;
l.unlock();
storage_error error;
flush_iovec(pe, iov, flushing, iov_len, error);
{
piece_refcount_holder refcount_holder(pe);
auto unlocker = scoped_unlock(l);
l.lock();
flush_iovec(pe, iov, flushing, iov_len, error);
}
TORRENT_PIECE_ASSERT(pe->piece_refcount > 0, pe);
--pe->piece_refcount;
iovec_flushed(pe, flushing, iov_len, 0, error, completed_jobs);
// if the cache is under high pressure, we need to evict
@ -1068,7 +1114,30 @@ namespace libtorrent
m_stats_counters.inc_stats_counter(counters::num_running_disk_jobs, 1);
// call disk function
int ret = (this->*(job_functions[j->action]))(j, completed_jobs);
// TODO: in the future, propagate exceptions back to the handlers
int ret = 0;
try
{
ret = (this->*(job_functions[j->action]))(j, completed_jobs);
}
catch (boost::system::system_error const& err)
{
ret = -1;
j->error.ec = err.code();
j->error.operation = storage_error::exception;
}
catch (std::bad_alloc const&)
{
ret = -1;
j->error.ec = errors::no_memory;
j->error.operation = storage_error::exception;
}
catch (std::exception const&)
{
ret = -1;
j->error.ec = boost::asio::error::fault;
j->error.operation = storage_error::exception;
}
// note that -2 errors are OK
TORRENT_ASSERT(ret != -1 || (j->error.ec && j->error.operation != 0));
@ -2158,9 +2227,10 @@ namespace libtorrent
m_disk_cache.cache_hit(pe, j->requester, (j->flags & disk_io_job::volatile_read) != 0);
TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
++pe->piece_refcount;
kick_hasher(pe, l);
--pe->piece_refcount;
{
piece_refcount_holder h(pe);
kick_hasher(pe, l);
}
TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1 || pe->cache_state == cached_piece_entry::read_lru2, pe);
@ -2215,7 +2285,8 @@ namespace libtorrent
TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1
|| pe->cache_state == cached_piece_entry::read_lru2, pe);
++pe->piece_refcount;
piece_refcount_holder refcount_holder(pe);
if (pe->hash == nullptr)
{
@ -2277,14 +2348,12 @@ namespace libtorrent
if (iov.iov_base == nullptr)
{
l.lock();
// TODO: introduce a holder class that automatically increments
// and decrements the piece_refcount
// decrement the refcounts of the blocks we just hashed
for (int k = 0; k < num_locked_blocks; ++k)
m_disk_cache.dec_block_refcount(pe, locked_blocks[k], block_cache::ref_hashing);
--pe->piece_refcount;
refcount_holder.release();
pe->hashing = false;
delete pe->hash;
pe->hash = nullptr;
@ -2352,7 +2421,7 @@ namespace libtorrent
for (int i = 0; i < num_locked_blocks; ++i)
m_disk_cache.dec_block_refcount(pe, locked_blocks[i], block_cache::ref_hashing);
--pe->piece_refcount;
refcount_holder.release();
pe->hashing = 0;
@ -2502,7 +2571,8 @@ namespace libtorrent
#endif
TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1
|| pe->cache_state == cached_piece_entry::read_lru2, pe);
++pe->piece_refcount;
piece_refcount_holder refcount_holder(pe);
int block_size = m_disk_cache.block_size();
int piece_size = j->storage->files()->piece_size(j->piece);
@ -2526,8 +2596,7 @@ namespace libtorrent
if (iov.iov_base == nullptr)
{
//#error introduce a holder class that automatically increments and decrements the piece_refcount
--pe->piece_refcount;
refcount_holder.release();
m_disk_cache.maybe_free_piece(pe);
j->error.ec = errors::no_memory;
j->error.operation = storage_error::alloc_cache_piece;
@ -2565,7 +2634,7 @@ namespace libtorrent
m_disk_cache.insert_blocks(pe, i, &iov, 1, j);
}
--pe->piece_refcount;
refcount_holder.release();
m_disk_cache.maybe_free_piece(pe);
return 0;
}
@ -2757,7 +2826,8 @@ namespace libtorrent
#endif
TORRENT_PIECE_ASSERT(pe->cache_state <= cached_piece_entry::read_lru1
|| pe->cache_state == cached_piece_entry::read_lru2, pe);
++pe->piece_refcount;
piece_refcount_holder refcount_holder(pe);
if (!pe->hashing_done)
{
@ -2782,7 +2852,7 @@ namespace libtorrent
TORRENT_ASSERT(l.owns_lock());
--pe->piece_refcount;
refcount_holder.release();
m_disk_cache.maybe_free_piece(pe);
@ -3318,10 +3388,11 @@ namespace libtorrent
#endif
}
#if DEBUG_DISK_THREAD
if (ret) DLOG("unblocked %d jobs (%d left)\n", ret
, int(m_stats_counters[counters::blocked_disk_jobs]) - ret);
#endif
if (ret)
{
DLOG("unblocked %d jobs (%d left)\n", ret
, int(m_stats_counters[counters::blocked_disk_jobs]) - ret);
}
m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs, -ret);
TORRENT_ASSERT(int(m_stats_counters[counters::blocked_disk_jobs]) >= 0);
@ -3414,9 +3485,10 @@ namespace libtorrent
}
l_.unlock();
std::unique_lock<std::mutex> l(m_job_mutex);
m_generic_io_jobs.m_queued_jobs.append(other_jobs);
l.unlock();
{
std::lock_guard<std::mutex> l(m_job_mutex);
m_generic_io_jobs.m_queued_jobs.append(other_jobs);
}
while (flush_jobs.size() > 0)
{
@ -3424,25 +3496,23 @@ namespace libtorrent
add_job(j, false);
}
l.lock();
m_generic_io_jobs.m_job_cond.notify_all();
m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
l.unlock();
{
std::lock_guard<std::mutex> l(m_job_mutex);
m_generic_io_jobs.m_job_cond.notify_all();
m_generic_threads.job_queued(m_generic_io_jobs.m_queued_jobs.size());
}
}
std::unique_lock<std::mutex> l(m_completed_jobs_mutex);
bool const need_post = m_completed_jobs.size() == 0;
std::lock_guard<std::mutex> l(m_completed_jobs_mutex);
m_completed_jobs.append(jobs);
l.unlock();
if (need_post)
if (!m_job_completions_in_flight)
{
#if DEBUG_DISK_THREAD
// we take this lock just to make the logging prettier (non-interleaved)
DLOG("posting job handlers (%d)\n", m_completed_jobs.size());
#endif
m_ios.post(std::bind(&disk_io_thread::call_job_handlers, this, m_userdata));
m_job_completions_in_flight = true;
}
}
@ -3454,9 +3524,10 @@ namespace libtorrent
{
std::unique_lock<std::mutex> l(m_completed_jobs_mutex);
#if DEBUG_DISK_THREAD
DLOG("call_job_handlers (%d)\n", m_completed_jobs.size());
#endif
TORRENT_ASSERT(m_job_completions_in_flight);
m_job_completions_in_flight = false;
int const num_jobs = m_completed_jobs.size();
disk_io_job* j = m_completed_jobs.get_all();

View File

@ -98,6 +98,7 @@ const rlim_t rlim_infinity = RLIM_INFINITY;
#include "libtorrent/platform_util.hpp"
#include "libtorrent/aux_/bind_to_device.hpp"
#include "libtorrent/hex.hpp" // to_hex, from_hex
#include "libtorrent/aux_/scope_end.hpp"
#ifndef TORRENT_DISABLE_LOGGING
@ -4536,21 +4537,24 @@ namespace aux {
void session_impl::async_add_torrent(add_torrent_params* params)
{
std::unique_ptr<add_torrent_params> holder(params);
if (string_begins_no_case("file://", params->url.c_str()) && !params->ti)
{
m_disk_thread.async_load_torrent(params
, std::bind(&session_impl::on_async_load_torrent, this, _1));
holder.release();
return;
}
error_code ec;
add_torrent(*params, ec);
delete params;
}
void session_impl::on_async_load_torrent(disk_io_job const* j)
{
add_torrent_params* params = reinterpret_cast<add_torrent_params*>(j->requester);
std::unique_ptr<add_torrent_params> holder(params);
if (j->error.ec)
{
m_alerts.emplace_alert<add_torrent_alert>(torrent_handle()
@ -4564,8 +4568,6 @@ namespace aux {
error_code ec;
add_torrent(*params, ec);
}
delete params;
}
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -4587,6 +4589,11 @@ namespace aux {
// params is updated by add_torrent_impl()
add_torrent_params params = p;
std::shared_ptr<torrent> torrent_ptr;
// in case there's an error, make sure to abort the torrent before leaving
// the scope
auto abort_torrent = aux::scope_end([&]{ if (torrent_ptr) torrent_ptr->abort(); });
bool added;
std::tie(torrent_ptr, added) = add_torrent_impl(params, ec);
@ -4634,7 +4641,7 @@ namespace aux {
if (m_next_dht_torrent != m_torrents.end())
next_dht = m_next_dht_torrent->first;
#endif
float load_factor = m_torrents.load_factor();
float const load_factor = m_torrents.load_factor();
m_torrents.insert(std::make_pair(params.info_hash, torrent_ptr));
@ -4647,6 +4654,10 @@ namespace aux {
m_obfuscated_torrents.insert(std::make_pair(h.final(), torrent_ptr));
#endif
// once we successfully add the torrent, we can disarm the abort action
abort_torrent.disarm();
torrent_ptr->added();
// if this insert made the hash grow, the iterators became invalid
// we need to reset them
if (m_torrents.load_factor() < load_factor)
@ -4991,6 +5002,7 @@ namespace aux {
++m_next_lsd_torrent;
m_torrents.erase(i);
tptr->removed();
#if !defined(TORRENT_DISABLE_ENCRYPTION) && !defined(TORRENT_DISABLE_EXTENSIONS)
static char const req2[4] = {'r', 'e', 'q', '2'};
@ -5755,8 +5767,7 @@ namespace aux {
// this is not allowed to be the network thread!
// TORRENT_ASSERT(is_not_thread());
TORRENT_ASSERT(m_torrents.empty());
TORRENT_ASSERT(m_connections.empty());
// TODO: asserts that no outstanding async operations are still in flight
#if defined TORRENT_ASIO_DEBUGGING
FILE* f = fopen("wakeups.log", "w+");
@ -6665,8 +6676,9 @@ namespace aux {
}
}
int torrent_state_gauges[counters::num_error_torrents - counters::num_checking_torrents + 1];
memset(torrent_state_gauges, 0, sizeof(torrent_state_gauges));
int const num_gauges = counters::num_error_torrents - counters::num_checking_torrents + 1;
std::array<int, num_gauges> torrent_state_gauges;
torrent_state_gauges.fill(0);
#if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
@ -6676,15 +6688,18 @@ namespace aux {
int num_active_downloading = 0;
int num_active_finished = 0;
int total_downloaders = 0;
for (torrent_map::const_iterator i = m_torrents.begin()
, end(m_torrents.end()); i != end; ++i)
for (auto const& tor : m_torrents)
{
std::shared_ptr<torrent> t = i->second;
std::shared_ptr<torrent> const& t = tor.second;
if (t->want_peers_download()) ++num_active_downloading;
if (t->want_peers_finished()) ++num_active_finished;
TORRENT_ASSERT(!(t->want_peers_download() && t->want_peers_finished()));
++torrent_state_gauges[t->current_stats_state() - counters::num_checking_torrents];
int const state = t->current_stats_state() - counters::num_checking_torrents;
if (state != torrent::no_gauge_state)
{
++torrent_state_gauges[state];
}
int pos = t->queue_position();
if (pos < 0)

View File

@ -189,6 +189,7 @@ namespace libtorrent
, m_files_checked(false)
, m_storage_mode(p.storage_mode)
, m_announcing(false)
, m_added(false)
, m_active_time(0)
, m_finished_time(0)
, m_sequential_download(false)
@ -499,7 +500,8 @@ namespace libtorrent
int torrent::current_stats_state() const
{
if (m_abort) return counters::num_checking_torrents + no_gauge_state;
if (m_abort || !m_added)
return counters::num_checking_torrents + no_gauge_state;
if (has_error()) return counters::num_error_torrents;
if (m_paused || m_graceful_pause_mode)
@ -521,7 +523,7 @@ namespace libtorrent
void torrent::update_gauge()
{
int new_gauge_state = current_stats_state() - counters::num_checking_torrents;
int const new_gauge_state = current_stats_state() - counters::num_checking_torrents;
TORRENT_ASSERT(new_gauge_state >= 0);
TORRENT_ASSERT(new_gauge_state <= no_gauge_state);
@ -790,7 +792,8 @@ namespace libtorrent
torrent::~torrent()
{
TORRENT_ASSERT(m_abort);
// TODO: 3 assert there are no outstanding async operations on this
// torrent
#if TORRENT_USE_ASSERTS
for (int i = 0; i < aux::session_interface::num_torrent_lists; ++i)
@ -815,7 +818,6 @@ namespace libtorrent
// network thread cannot be maintained
TORRENT_ASSERT(m_peer_class == 0);
TORRENT_ASSERT(m_abort);
TORRENT_ASSERT(m_connections.empty());
if (!m_connections.empty())
disconnect_all(errors::torrent_aborted, op_bittorrent);
@ -1892,15 +1894,15 @@ namespace libtorrent
}
#endif // TORRENT_DISABLE_MUTABLE_TORRENTS
m_ses.disk_thread().async_check_files(
m_storage.get(), m_add_torrent_params ? m_add_torrent_params.get() : nullptr
, links, std::bind(&torrent::on_resume_data_checked
, shared_from_this(), _1));
// async_check_files will gut links
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_check_files == false);
m_outstanding_check_files = true;
#endif
m_ses.disk_thread().async_check_files(
m_storage.get(), m_add_torrent_params ? m_add_torrent_params.get() : nullptr
, links, std::bind(&torrent::on_resume_data_checked
, shared_from_this(), _1));
#ifndef TORRENT_DISABLE_LOGGING
debug_log("init, async_check_files");
#endif
@ -4287,7 +4289,6 @@ namespace libtorrent
void torrent::abort()
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;
if (m_abort) return;

View File

@ -70,8 +70,14 @@ namespace libtorrent
void torrent_handle::async_call(Fun f, Args&&... a) const
{
std::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT_PRECOND(t);
if (!t) return;
if (!t)
{
#ifndef BOOST_NO_EXCEPTIONS
throw_invalid_handle();
#else
std::terminate();
#endif
}
session_impl& ses = static_cast<session_impl&>(t->session());
ses.get_io_service().dispatch([=,&ses] ()
{
@ -98,8 +104,14 @@ namespace libtorrent
void torrent_handle::sync_call(Fun f, Args&&... a) const
{
std::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT_PRECOND(t);
if (!t) return;
if (!t)
{
#ifndef BOOST_NO_EXCEPTIONS
throw_invalid_handle();
#else
std::terminate();
#endif
}
session_impl& ses = static_cast<session_impl&>(t->session());
// this is the flag to indicate the call has completed
@ -130,9 +142,12 @@ namespace libtorrent
Ret torrent_handle::sync_call_ret(Ret def, Fun f, Args&&... a) const
{
std::shared_ptr<torrent> t = m_torrent.lock();
TORRENT_ASSERT_PRECOND(t);
Ret r = def;
#ifndef BOOST_NO_EXCEPTIONS
if (!t) throw_invalid_handle();
#else
if (!t) return r;
#endif
session_impl& ses = static_cast<session_impl&>(t->session());
// this is the flag to indicate the call has completed

View File

@ -60,14 +60,16 @@ namespace libtorrent
using namespace libtorrent::aux;
timeout_handler::timeout_handler(io_service& ios)
: m_completion_timeout(0)
, m_start_time(clock_type::now())
: m_start_time(clock_type::now())
, m_read_time(m_start_time)
, m_timeout(ios)
, m_read_timeout(0)
, m_abort(false)
{}
timeout_handler::~timeout_handler()
{
TORRENT_ASSERT(m_outstanding_timer_wait == 0);
}
void timeout_handler::set_timeout(int completion_timeout, int read_timeout)
{
m_completion_timeout = completion_timeout;
@ -92,6 +94,9 @@ namespace libtorrent
m_timeout.expires_at(m_read_time + seconds(timeout), ec);
m_timeout.async_wait(std::bind(
&timeout_handler::timeout_callback, shared_from_this(), _1));
#if TORRENT_USE_ASSERTS
++m_outstanding_timer_wait;
#endif
}
void timeout_handler::restart_read_timeout()
@ -110,6 +115,10 @@ namespace libtorrent
void timeout_handler::timeout_callback(error_code const& error)
{
COMPLETE_ASYNC("timeout_handler::timeout_callback");
#if TORRENT_USE_ASSERTS
TORRENT_ASSERT(m_outstanding_timer_wait > 0);
--m_outstanding_timer_wait;
#endif
if (m_abort) return;
time_point now = clock_type::now();
@ -139,6 +148,9 @@ namespace libtorrent
m_timeout.expires_at(m_read_time + seconds(timeout), ec);
m_timeout.async_wait(
std::bind(&timeout_handler::timeout_callback, shared_from_this(), _1));
#if TORRENT_USE_ASSERTS
++m_outstanding_timer_wait;
#endif
}
tracker_connection::tracker_connection(
@ -207,12 +219,10 @@ namespace libtorrent
#if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS
, m_ses(ses)
#endif
, m_abort(false)
{}
tracker_manager::~tracker_manager()
{
TORRENT_ASSERT(m_abort);
abort_all_requests(true);
}