From d03540e3b3e218543b376c3587831dfe29b86b28 Mon Sep 17 00:00:00 2001 From: arvidn Date: Sat, 21 Jan 2017 18:36:51 -0500 Subject: [PATCH] move disk_job_fence to its own file --- CMakeLists.txt | 1 + Jamfile | 1 + include/libtorrent/Makefile.am | 1 + include/libtorrent/aux_/disk_job_fence.hpp | 117 ++++++++++ include/libtorrent/storage.hpp | 68 +----- src/Makefile.am | 1 + src/disk_io_thread.cpp | 4 +- src/disk_job_fence.cpp | 237 +++++++++++++++++++++ src/storage.cpp | 214 ------------------- test/test_fence.cpp | 8 +- 10 files changed, 367 insertions(+), 285 deletions(-) create mode 100644 include/libtorrent/aux_/disk_job_fence.hpp create mode 100644 src/disk_job_fence.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index bc708b4d1..c945c5a0c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,6 +106,7 @@ set(sources file_pool lsd disk_io_job + disk_job_fence disk_job_pool disk_buffer_pool disk_io_thread diff --git a/Jamfile b/Jamfile index 5d68f5e7f..6a4f792e0 100644 --- a/Jamfile +++ b/Jamfile @@ -567,6 +567,7 @@ SOURCES = disk_io_job disk_io_thread disk_io_thread_pool + disk_job_fence disk_job_pool entry error_code diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index a95115d8e..9cdd2020e 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -163,6 +163,7 @@ nobase_include_HEADERS = \ aux_/cpuid.hpp \ aux_/disable_warnings_push.hpp \ aux_/disable_warnings_pop.hpp \ + aux_/disk_job_fence.hpp \ aux_/dev_random.hpp \ aux_/deque.hpp \ aux_/escape_string.hpp \ diff --git a/include/libtorrent/aux_/disk_job_fence.hpp b/include/libtorrent/aux_/disk_job_fence.hpp new file mode 100644 index 000000000..dc286db41 --- /dev/null +++ b/include/libtorrent/aux_/disk_job_fence.hpp @@ -0,0 +1,117 @@ +/* + +Copyright (c) 2003-2016, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef TORRENT_DISK_JOB_FENCE_HPP_INCLUDE +#define TORRENT_DISK_JOB_FENCE_HPP_INCLUDE + +#include "libtorrent/config.hpp" +#include "libtorrent/tailqueue.hpp" + +#include +#include + +namespace libtorrent { + +struct disk_io_job; +struct counters; + +namespace aux +{ + // implements the disk I/O job fence used by the storage_interface + // to provide to the disk thread. Whenever a disk job needs + // exclusive access to the storage for that torrent, it raises + // the fence, blocking all new jobs, until there are no longer + // any outstanding jobs on the torrent, then the fence is lowered + // and it can be performed, along with the backlog of jobs that + // accrued while the fence was up + struct TORRENT_EXPORT disk_job_fence + { + disk_job_fence(); + ~disk_job_fence() + { + TORRENT_ASSERT(int(m_outstanding_jobs) == 0); + TORRENT_ASSERT(m_blocked_jobs.size() == 0); + } + + // returns one of the fence_* enums. + // if there are no outstanding jobs on the + // storage, fence_post_fence is returned, the flush job is expected + // to be discarded by the caller. + // fence_post_flush is returned if the fence job was blocked and queued, + // but the flush job should be posted (i.e. put on the job queue) + // fence_post_none if both the fence and the flush jobs were queued. + enum { fence_post_fence = 0, fence_post_flush = 1, fence_post_none = 2 }; + int raise_fence(disk_io_job* fence_job, disk_io_job* flush_job + , counters& cnt); + bool has_fence() const; + + // called whenever a job completes and is posted back to the + // main network thread. the tailqueue of jobs will have the + // backed-up jobs prepended to it in case this resulted in the + // fence being lowered. + int job_complete(disk_io_job* j, tailqueue& job_queue); + int num_outstanding_jobs() const { return m_outstanding_jobs; } + + // if there is a fence up, returns true and adds the job + // to the queue of blocked jobs + bool is_blocked(disk_io_job* j); + + // the number of blocked jobs + int num_blocked() const; + + private: + // when > 0, this storage is blocked for new async + // operations until all outstanding jobs have completed. + // at that point, the m_blocked_jobs are issued + // the count is the number of fence job currently in the queue + int m_has_fence = 0; + + // when there's a fence up, jobs are queued up in here + // until the fence is lowered + tailqueue m_blocked_jobs; + + // the number of disk_io_job objects there are, belonging + // to this torrent, currently pending, hanging off of + // cached_piece_entry objects. This is used to determine + // when the fence can be lowered + std::atomic m_outstanding_jobs{0}; + + // must be held when accessing m_has_fence and + // m_blocked_jobs + mutable std::mutex m_mutex; + }; + + +}} + +#endif + diff --git a/include/libtorrent/storage.hpp b/include/libtorrent/storage.hpp index e707e66a4..cc9691514 100644 --- a/include/libtorrent/storage.hpp +++ b/include/libtorrent/storage.hpp @@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include "libtorrent/aux_/disk_job_fence.hpp" #include "libtorrent/piece_picker.hpp" #include "libtorrent/peer_request.hpp" #include "libtorrent/file.hpp" @@ -55,7 +56,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/performance_counters.hpp" #include "libtorrent/span.hpp" #include "libtorrent/tailqueue.hpp" -#include "libtorrent/disk_io_job.hpp" #include "libtorrent/aux_/vector.hpp" // OVERVIEW @@ -150,70 +150,6 @@ namespace libtorrent struct disk_io_thread; - // implements the disk I/O job fence used by the storage_interface - // to provide to the disk thread. Whenever a disk job needs - // exclusive access to the storage for that torrent, it raises - // the fence, blocking all new jobs, until there are no longer - // any outstanding jobs on the torrent, then the fence is lowered - // and it can be performed, along with the backlog of jobs that - // accrued while the fence was up - struct TORRENT_EXPORT disk_job_fence - { - disk_job_fence(); - ~disk_job_fence() - { - TORRENT_ASSERT(int(m_outstanding_jobs) == 0); - TORRENT_ASSERT(m_blocked_jobs.size() == 0); - } - - // returns one of the fence_* enums. - // if there are no outstanding jobs on the - // storage, fence_post_fence is returned, the flush job is expected - // to be discarded by the caller. - // fence_post_flush is returned if the fence job was blocked and queued, - // but the flush job should be posted (i.e. put on the job queue) - // fence_post_none if both the fence and the flush jobs were queued. - enum { fence_post_fence = 0, fence_post_flush = 1, fence_post_none = 2 }; - int raise_fence(disk_io_job* fence_job, disk_io_job* flush_job - , counters& cnt); - bool has_fence() const; - - // called whenever a job completes and is posted back to the - // main network thread. the tailqueue of jobs will have the - // backed-up jobs prepended to it in case this resulted in the - // fence being lowered. - int job_complete(disk_io_job* j, tailqueue& job_queue); - int num_outstanding_jobs() const { return m_outstanding_jobs; } - - // if there is a fence up, returns true and adds the job - // to the queue of blocked jobs - bool is_blocked(disk_io_job* j); - - // the number of blocked jobs - int num_blocked() const; - - private: - // when > 0, this storage is blocked for new async - // operations until all outstanding jobs have completed. - // at that point, the m_blocked_jobs are issued - // the count is the number of fence job currently in the queue - int m_has_fence = 0; - - // when there's a fence up, jobs are queued up in here - // until the fence is lowered - tailqueue m_blocked_jobs; - - // the number of disk_io_job objects there are, belonging - // to this torrent, currently pending, hanging off of - // cached_piece_entry objects. This is used to determine - // when the fence can be lowered - std::atomic m_outstanding_jobs{0}; - - // must be held when accessing m_has_fence and - // m_blocked_jobs - mutable std::mutex m_mutex; - }; - // this class keeps track of which pieces, belonging to // a specific storage, are in the cache right now. It's // used for quickly being able to evict all pieces for a @@ -258,7 +194,7 @@ namespace libtorrent // struct TORRENT_EXPORT storage_interface : public std::enable_shared_from_this - , public disk_job_fence + , public aux::disk_job_fence , public storage_piece_set , boost::noncopyable { diff --git a/src/Makefile.am b/src/Makefile.am index 89bf4b71e..a35e961bf 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,6 +67,7 @@ libtorrent_rasterbar_la_SOURCES = \ disk_io_job.cpp \ disk_io_thread.cpp \ disk_io_thread_pool.cpp \ + disk_job_fence.cpp \ disk_job_pool.cpp \ entry.cpp \ enum_net.cpp \ diff --git a/src/disk_io_thread.cpp b/src/disk_io_thread.cpp index 0bccdbd42..4524659fc 100644 --- a/src/disk_io_thread.cpp +++ b/src/disk_io_thread.cpp @@ -2850,7 +2850,7 @@ namespace libtorrent fj->storage = j->storage; int ret = j->storage->raise_fence(j, fj, m_stats_counters); - if (ret == disk_job_fence::fence_post_fence) + if (ret == aux::disk_job_fence::fence_post_fence) { std::unique_lock l(m_job_mutex); TORRENT_ASSERT((j->flags & disk_io_job::in_progress) || !j->storage); @@ -2867,7 +2867,7 @@ namespace libtorrent return; } - if (ret == disk_job_fence::fence_post_flush) + if (ret == aux::disk_job_fence::fence_post_flush) { // now, we have to make sure that all outstanding jobs on this // storage actually get flushed, in order for the fence job to diff --git a/src/disk_job_fence.cpp b/src/disk_job_fence.cpp new file mode 100644 index 000000000..4f57ef855 --- /dev/null +++ b/src/disk_job_fence.cpp @@ -0,0 +1,237 @@ +/* + +Copyright (c) 2003-2016, Arvid Norberg, Daniel Wallin +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + + +#include "libtorrent/aux_/disk_job_fence.hpp" +#include "libtorrent/disk_io_job.hpp" +#include "libtorrent/performance_counters.hpp" + +#define DEBUG_STORAGE 0 + +#if DEBUG_STORAGE +#define DLOG(...) std::fprintf(__VA_ARGS__) +#else +#define DLOG(...) do {} while (false) +#endif + +namespace libtorrent { namespace aux +{ + disk_job_fence::disk_job_fence() {} + + int disk_job_fence::job_complete(disk_io_job* j, tailqueue& jobs) + { + std::lock_guard l(m_mutex); + + TORRENT_ASSERT(j->flags & disk_io_job::in_progress); + j->flags &= ~disk_io_job::in_progress; + + TORRENT_ASSERT(m_outstanding_jobs > 0); + --m_outstanding_jobs; + if (j->flags & disk_io_job::fence) + { + // a fence job just completed. Make sure the fence logic + // works by asserting m_outstanding_jobs is in fact 0 now + TORRENT_ASSERT(m_outstanding_jobs == 0); + + // the fence can now be lowered + --m_has_fence; + + // now we need to post all jobs that have been queued up + // while this fence was up. However, if there's another fence + // in the queue, stop there and raise the fence again + int ret = 0; + while (m_blocked_jobs.size()) + { + disk_io_job *bj = static_cast(m_blocked_jobs.pop_front()); + if (bj->flags & disk_io_job::fence) + { + // we encountered another fence. We cannot post anymore + // jobs from the blocked jobs queue. We have to go back + // into a raised fence mode and wait for all current jobs + // to complete. The exception is that if there are no jobs + // executing currently, we should add the fence job. + if (m_outstanding_jobs == 0 && jobs.empty()) + { + TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); + bj->flags |= disk_io_job::in_progress; + ++m_outstanding_jobs; + ++ret; +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(bj->blocked); + bj->blocked = false; +#endif + jobs.push_back(bj); + } + else + { + // put the fence job back in the blocked queue + m_blocked_jobs.push_front(bj); + } + return ret; + } + TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); + bj->flags |= disk_io_job::in_progress; + + ++m_outstanding_jobs; + ++ret; +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(bj->blocked); + bj->blocked = false; +#endif + jobs.push_back(bj); + } + return ret; + } + + // there are still outstanding jobs, even if we have a + // fence, it's not time to lower it yet + // also, if we don't have a fence, we're done + if (m_outstanding_jobs > 0 || m_has_fence == 0) return 0; + + // there's a fence raised, and no outstanding operations. + // it means we can execute the fence job right now. + TORRENT_ASSERT(m_blocked_jobs.size() > 0); + + // this is the fence job + disk_io_job *bj = static_cast(m_blocked_jobs.pop_front()); + TORRENT_ASSERT(bj->flags & disk_io_job::fence); + + TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); + bj->flags |= disk_io_job::in_progress; + + ++m_outstanding_jobs; +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(bj->blocked); + bj->blocked = false; +#endif + // prioritize fence jobs since they're blocking other jobs + jobs.push_front(bj); + return 1; + } + + bool disk_job_fence::is_blocked(disk_io_job* j) + { + std::lock_guard l(m_mutex); + DLOG(stderr, "[%p] is_blocked: fence: %d num_outstanding: %d\n" + , static_cast(this), m_has_fence, int(m_outstanding_jobs)); + + // if this is the job that raised the fence, don't block it + // ignore fence can only ignore one fence. If there are several, + // this job still needs to get queued up + if (m_has_fence == 0) + { + TORRENT_ASSERT((j->flags & disk_io_job::in_progress) == 0); + j->flags |= disk_io_job::in_progress; + ++m_outstanding_jobs; + return false; + } + + m_blocked_jobs.push_back(j); + +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(j->blocked == false); + j->blocked = true; +#endif + + return true; + } + + bool disk_job_fence::has_fence() const + { + std::lock_guard l(m_mutex); + return m_has_fence != 0; + } + + int disk_job_fence::num_blocked() const + { + std::lock_guard l(m_mutex); + return m_blocked_jobs.size(); + } + + // j is the fence job. It must have exclusive access to the storage + // fj is the flush job. If the job j is queued, we need to issue + // this job + int disk_job_fence::raise_fence(disk_io_job* j, disk_io_job* fj + , counters& cnt) + { + TORRENT_ASSERT((j->flags & disk_io_job::fence) == 0); + j->flags |= disk_io_job::fence; + + std::lock_guard l(m_mutex); + + DLOG(stderr, "[%p] raise_fence: fence: %d num_outstanding: %d\n" + , static_cast(this), m_has_fence, int(m_outstanding_jobs)); + + if (m_has_fence == 0 && m_outstanding_jobs == 0) + { + ++m_has_fence; + DLOG(stderr, "[%p] raise_fence: need posting\n" + , static_cast(this)); + + // the job j is expected to be put on the job queue + // after this, without being passed through is_blocked() + // that's why we're accounting for it here + + // fj is expected to be discarded by the caller + j->flags |= disk_io_job::in_progress; + ++m_outstanding_jobs; + return fence_post_fence; + } + + ++m_has_fence; + if (m_has_fence > 1) + { +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(fj->blocked == false); + fj->blocked = true; +#endif + m_blocked_jobs.push_back(fj); + cnt.inc_stats_counter(counters::blocked_disk_jobs); + } + else + { + // in this case, fj is expected to be put on the job queue + fj->flags |= disk_io_job::in_progress; + ++m_outstanding_jobs; + } +#if TORRENT_USE_ASSERTS + TORRENT_ASSERT(j->blocked == false); + j->blocked = true; +#endif + m_blocked_jobs.push_back(j); + cnt.inc_stats_counter(counters::blocked_disk_jobs); + + return m_has_fence > 1 ? fence_post_none : fence_post_flush; + } + +}} + diff --git a/src/storage.cpp b/src/storage.cpp index 5f8255383..1e14091eb 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -77,39 +77,14 @@ POSSIBILITY OF SUCH DAMAGE. //#define TORRENT_PARTIAL_HASH_LOG -#define DEBUG_STORAGE 0 #define DEBUG_DELETE_FILES 0 -#if __cplusplus >= 201103L || defined __clang__ - -#if DEBUG_STORAGE -#define DLOG(...) std::fprintf(__VA_ARGS__) -#else -#define DLOG(...) do {} while (false) -#endif - #if DEBUG_DELETE_FILES #define DFLOG(...) std::fprintf(__VA_ARGS__) #else #define DFLOG(...) do {} while (false) #endif -#else - -#if DEBUG_STORAGE -#define DLOG fprintf -#else -#define DLOG TORRENT_WHILE_0 fprintf -#endif - -#if DEBUG_DELETE_FILES -#define DFLOG fprintf -#else -#define DFLOG TORRENT_WHILE_0 fprintf -#endif - -#endif // cplusplus - namespace libtorrent { void clear_bufs(span bufs) @@ -898,193 +873,4 @@ namespace libtorrent #endif } - // ====== disk_job_fence implementation ======== - - disk_job_fence::disk_job_fence() {} - - int disk_job_fence::job_complete(disk_io_job* j, tailqueue& jobs) - { - std::lock_guard l(m_mutex); - - TORRENT_ASSERT(j->flags & disk_io_job::in_progress); - j->flags &= ~disk_io_job::in_progress; - - TORRENT_ASSERT(m_outstanding_jobs > 0); - --m_outstanding_jobs; - if (j->flags & disk_io_job::fence) - { - // a fence job just completed. Make sure the fence logic - // works by asserting m_outstanding_jobs is in fact 0 now - TORRENT_ASSERT(m_outstanding_jobs == 0); - - // the fence can now be lowered - --m_has_fence; - - // now we need to post all jobs that have been queued up - // while this fence was up. However, if there's another fence - // in the queue, stop there and raise the fence again - int ret = 0; - while (m_blocked_jobs.size()) - { - disk_io_job *bj = static_cast(m_blocked_jobs.pop_front()); - if (bj->flags & disk_io_job::fence) - { - // we encountered another fence. We cannot post anymore - // jobs from the blocked jobs queue. We have to go back - // into a raised fence mode and wait for all current jobs - // to complete. The exception is that if there are no jobs - // executing currently, we should add the fence job. - if (m_outstanding_jobs == 0 && jobs.empty()) - { - TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); - bj->flags |= disk_io_job::in_progress; - ++m_outstanding_jobs; - ++ret; -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(bj->blocked); - bj->blocked = false; -#endif - jobs.push_back(bj); - } - else - { - // put the fence job back in the blocked queue - m_blocked_jobs.push_front(bj); - } - return ret; - } - TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); - bj->flags |= disk_io_job::in_progress; - - ++m_outstanding_jobs; - ++ret; -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(bj->blocked); - bj->blocked = false; -#endif - jobs.push_back(bj); - } - return ret; - } - - // there are still outstanding jobs, even if we have a - // fence, it's not time to lower it yet - // also, if we don't have a fence, we're done - if (m_outstanding_jobs > 0 || m_has_fence == 0) return 0; - - // there's a fence raised, and no outstanding operations. - // it means we can execute the fence job right now. - TORRENT_ASSERT(m_blocked_jobs.size() > 0); - - // this is the fence job - disk_io_job *bj = static_cast(m_blocked_jobs.pop_front()); - TORRENT_ASSERT(bj->flags & disk_io_job::fence); - - TORRENT_ASSERT((bj->flags & disk_io_job::in_progress) == 0); - bj->flags |= disk_io_job::in_progress; - - ++m_outstanding_jobs; -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(bj->blocked); - bj->blocked = false; -#endif - // prioritize fence jobs since they're blocking other jobs - jobs.push_front(bj); - return 1; - } - - bool disk_job_fence::is_blocked(disk_io_job* j) - { - std::lock_guard l(m_mutex); - DLOG(stderr, "[%p] is_blocked: fence: %d num_outstanding: %d\n" - , static_cast(this), m_has_fence, int(m_outstanding_jobs)); - - // if this is the job that raised the fence, don't block it - // ignore fence can only ignore one fence. If there are several, - // this job still needs to get queued up - if (m_has_fence == 0) - { - TORRENT_ASSERT((j->flags & disk_io_job::in_progress) == 0); - j->flags |= disk_io_job::in_progress; - ++m_outstanding_jobs; - return false; - } - - m_blocked_jobs.push_back(j); - -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(j->blocked == false); - j->blocked = true; -#endif - - return true; - } - - bool disk_job_fence::has_fence() const - { - std::lock_guard l(m_mutex); - return m_has_fence != 0; - } - - int disk_job_fence::num_blocked() const - { - std::lock_guard l(m_mutex); - return m_blocked_jobs.size(); - } - - // j is the fence job. It must have exclusive access to the storage - // fj is the flush job. If the job j is queued, we need to issue - // this job - int disk_job_fence::raise_fence(disk_io_job* j, disk_io_job* fj - , counters& cnt) - { - TORRENT_ASSERT((j->flags & disk_io_job::fence) == 0); - j->flags |= disk_io_job::fence; - - std::lock_guard l(m_mutex); - - DLOG(stderr, "[%p] raise_fence: fence: %d num_outstanding: %d\n" - , static_cast(this), m_has_fence, int(m_outstanding_jobs)); - - if (m_has_fence == 0 && m_outstanding_jobs == 0) - { - ++m_has_fence; - DLOG(stderr, "[%p] raise_fence: need posting\n" - , static_cast(this)); - - // the job j is expected to be put on the job queue - // after this, without being passed through is_blocked() - // that's why we're accounting for it here - - // fj is expected to be discarded by the caller - j->flags |= disk_io_job::in_progress; - ++m_outstanding_jobs; - return fence_post_fence; - } - - ++m_has_fence; - if (m_has_fence > 1) - { -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(fj->blocked == false); - fj->blocked = true; -#endif - m_blocked_jobs.push_back(fj); - cnt.inc_stats_counter(counters::blocked_disk_jobs); - } - else - { - // in this case, fj is expected to be put on the job queue - fj->flags |= disk_io_job::in_progress; - ++m_outstanding_jobs; - } -#if TORRENT_USE_ASSERTS - TORRENT_ASSERT(j->blocked == false); - j->blocked = true; -#endif - m_blocked_jobs.push_back(j); - cnt.inc_stats_counter(counters::blocked_disk_jobs); - - return m_has_fence > 1 ? fence_post_none : fence_post_flush; - } } // namespace libtorrent diff --git a/test/test_fence.cpp b/test/test_fence.cpp index 4f0f9998e..f1f4ce80d 100644 --- a/test/test_fence.cpp +++ b/test/test_fence.cpp @@ -6,9 +6,11 @@ using namespace libtorrent; +using libtorrent::aux::disk_job_fence; + TORRENT_TEST(empty_fence) { - libtorrent::disk_job_fence fence; + disk_job_fence fence; counters cnt; disk_io_job test_job[10]; @@ -46,7 +48,7 @@ TORRENT_TEST(empty_fence) TORRENT_TEST(job_fence) { counters cnt; - libtorrent::disk_job_fence fence; + disk_job_fence fence; disk_io_job test_job[10]; @@ -120,7 +122,7 @@ TORRENT_TEST(job_fence) TORRENT_TEST(double_fence) { counters cnt; - libtorrent::disk_job_fence fence; + disk_job_fence fence; disk_io_job test_job[10];