From 137d2d63f5be96e24a74fe28a66794beb3e0b7fb Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 23 Jul 2009 04:38:52 +0000 Subject: [PATCH] cleaned up policy. Moved logic from policy to torrent --- examples/client_test.cpp | 20 +- include/libtorrent/aux_/session_impl.hpp | 1 + include/libtorrent/peer_connection.hpp | 2 +- include/libtorrent/policy.hpp | 13 -- include/libtorrent/torrent.hpp | 8 +- src/peer_connection.cpp | 97 ++++++++-- src/policy.cpp | 233 ----------------------- src/session_impl.cpp | 9 + src/torrent.cpp | 119 +++++++++++- 9 files changed, 234 insertions(+), 268 deletions(-) diff --git a/examples/client_test.cpp b/examples/client_test.cpp index ea7ee481e..48a808183 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -552,6 +552,8 @@ void add_torrent(libtorrent::session& ses p.auto_managed = true; torrent_handle h = ses.add_torrent(p, ec); + h.connect_peer(tcp::endpoint(address::from_string("10.0.1.4"), 6881)); + handles.insert(std::make_pair( monitored_dir?std::string(torrent):std::string(), h)); @@ -767,6 +769,8 @@ int main(int argc, char* argv[]) //settings.announce_to_all_trackers = true; settings.optimize_hashing_for_speed = false; settings.disk_cache_algorithm = session_settings::largest_contiguous; + settings.max_queued_disk_bytes = 1; + int refresh_delay = 1; @@ -960,6 +964,17 @@ int main(int argc, char* argv[]) ses.set_settings(settings); + ip_filter filter; + filter.add_rule( + address_v4::from_string("0.0.0.0") + , address_v4::from_string("255.255.255.255") + , ip_filter::blocked); + filter.add_rule( + address_v4::from_string("10.0.1.4") + , address_v4::from_string("10.0.1.4") + , 0); + ses.set_ip_filter(filter); + // main loop std::vector peers; std::vector queue; @@ -1347,7 +1362,7 @@ int main(int argc, char* argv[]) out += str; snprintf(str, sizeof(str), "==== waste: %s fail: %s unchoked: %d / %d " - "bw queues: %8d (%d) | %8d (%d) cache: w: %"PRId64"%% r: %lld%% size: %s (%s) / %s ===\n" + "bw queues: %8d (%d) | %8d (%d) cache: w: %"PRId64"%% r: %lld%% size: %s (%s) / %s dq: %"PRId64" ===\n" , add_suffix(sess_stat.total_redundant_bytes).c_str() , add_suffix(sess_stat.total_failed_bytes).c_str() , sess_stat.num_unchoked, sess_stat.allowed_upload_slots @@ -1359,7 +1374,8 @@ int main(int argc, char* argv[]) , cs.blocks_read_hit * 100 / cs.blocks_read , add_suffix(cs.cache_size * 16 * 1024).c_str() , add_suffix(cs.read_cache_size * 16 * 1024).c_str() - , add_suffix(cs.total_used_buffers * 16 * 1024).c_str()); + , add_suffix(cs.total_used_buffers * 16 * 1024).c_str() + , cs.queued_bytes); out += str; snprintf(str, sizeof(str), "==== optimistic unchoke: %d unchoke counter: %d ====\n" diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index be16bcf40..0bfa85ecc 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -252,6 +252,7 @@ namespace libtorrent { return m_connections.size(); } void unchoke_peer(peer_connection& c); + void choke_peer(peer_connection& c); session_status status() const; void set_peer_id(peer_id const& id); diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 0ca53b5d6..1c0174c24 100644 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -415,7 +415,7 @@ namespace libtorrent // the following functions appends messages // to the send buffer - void send_choke(); + bool send_choke(); bool send_unchoke(); void send_interested(); void send_not_interested(); diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index 545aca538..4e7b98a37 100644 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -93,15 +93,6 @@ namespace libtorrent // the peer has got at least one interesting piece void peer_is_interesting(peer_connection& c); - // the peer unchoked us - void unchoked(peer_connection& c); - - // the peer is interested in our pieces - void interested(peer_connection& c); - - // the peer is not interested in our pieces - void not_interested(peer_connection& c); - void ip_filter_updated(); void set_seed(policy::peer* p, bool s); @@ -355,10 +346,6 @@ namespace libtorrent torrent* m_torrent; - // free download we have got that hasn't - // been distributed yet. - size_type m_available_free_upload; - // The number of peers in our peer list // that are connect candidates. i.e. they're // not already connected and they have not diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index 9aeb38278..ae76b0a05 100644 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -321,7 +321,7 @@ namespace libtorrent bool free_upload_slots() const { return m_num_uploads < m_max_uploads; } - void choke_peer(peer_connection& c); + bool choke_peer(peer_connection& c); bool unchoke_peer(peer_connection& c); // used by peer_connection to attach itself to a torrent @@ -621,6 +621,8 @@ namespace libtorrent // -------------------------------------------- // RESOURCE MANAGEMENT + void add_free_upload(int diff) { m_available_free_upload += diff; } + void set_peer_upload_limit(tcp::endpoint ip, int limit); void set_peer_download_limit(tcp::endpoint ip, int limit); @@ -909,6 +911,10 @@ namespace libtorrent // m_num_verified = m_verified.count() int m_num_verified; + // free download we have got that hasn't + // been distributed yet. + size_type m_available_free_upload; + // determines the storage state for this torrent. storage_mode_t m_storage_mode; diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 59832318e..0f10fce64 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -1176,7 +1176,11 @@ namespace libtorrent m_peer_choked = false; if (is_disconnecting()) return; - t->get_policy().unchoked(*this); + if (is_interesting()) + { + request_a_block(*t, *this); + send_block_requests(); + } } // ----------------------------- @@ -1203,8 +1207,55 @@ namespace libtorrent #endif m_peer_interested = true; if (is_disconnecting()) return; - if (ignore_unchoke_slots()) send_unchoke(); - t->get_policy().interested(*this); + + if (is_choked()) + { + if (ignore_unchoke_slots()) + { + // if this peer is expempted from the choker + // just unchoke it immediately + send_unchoke(); + } + else if (m_ses.num_uploads() < m_ses.max_uploads() + && !ignore_unchoke_slots() + && (t->ratio() == 0 + || share_diff() >= size_type(-free_upload_amount) + || t->is_finished())) + { + // if the peer is choked and we have upload slots left, + // then unchoke it. Another condition that has to be met + // is that the torrent doesn't keep track of the individual + // up/down ratio for each peer (ratio == 0) or (if it does + // keep track) this particular connection isn't a leecher. + // If the peer was choked because it was leeching, don't + // unchoke it again. + // The exception to this last condition is if we're a seed. + // In that case we don't care if people are leeching, they + // can't pay for their downloads anyway. + m_ses.unchoke_peer(*this); + } +#if defined TORRENT_VERBOSE_LOGGING + else + { + std::string reason; + if (m_ses.num_uploads() >= m_ses.max_uploads()) + { + (*m_logger) << time_now_string() << " DID NOT UNCHOKE [ " + "the number of uploads (" << m_ses.num_uploads() << + ") is more than or equal to the limit (" + << m_ses.max_uploads() << ") ]\n"; + } + else + { + (*m_logger) << time_now_string() << " DID NOT UNCHOKE [ " + "the share ratio (" << share_diff() << + ") is <= free_upload_amount (" << int(free_upload_amount) + << ") and we are not seeding and the ratio (" + << t->ratio() << ")is non-zero"; + } + } +#endif + } } // ----------------------------- @@ -1234,20 +1285,37 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); TORRENT_ASSERT(t); - if (!is_choked() && !ignore_unchoke_slots()) + if (!is_choked()) { - if (m_peer_info && m_peer_info->optimistically_unchoked) + if (ignore_unchoke_slots()) { - m_peer_info->optimistically_unchoked = false; - m_ses.m_optimistic_unchoke_time_scaler = 0; + send_choke(); + } + else + { + if (m_peer_info && m_peer_info->optimistically_unchoked) + { + m_peer_info->optimistically_unchoked = false; + m_ses.m_optimistic_unchoke_time_scaler = 0; + } + m_ses.choke_peer(*this); + m_ses.m_unchoke_time_scaler = 0; } - t->choke_peer(*this); - --m_ses.m_num_unchoked; - m_ses.m_unchoke_time_scaler = 0; } - if (ignore_unchoke_slots()) send_unchoke(); - t->get_policy().not_interested(*this); + if (t->ratio() != 0.f) + { + TORRENT_ASSERT(share_diff() < (std::numeric_limits::max)()); + size_type diff = share_diff(); + if (diff > 0 && is_seed()) + { + // the peer is a seed and has sent + // us more than we have sent it back. + // consider the download as free download + t->add_free_upload(diff); + add_free_upload(-diff); + } + } if (t->super_seeding() && m_superseed_piece != -1) { @@ -2568,13 +2636,13 @@ namespace libtorrent write_cancel(r); } - void peer_connection::send_choke() + bool peer_connection::send_choke() { INVARIANT_CHECK; TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked); - if (m_choked) return; + if (m_choked) return false; write_choke(); m_choked = true; @@ -2610,6 +2678,7 @@ namespace libtorrent #endif i = m_requests.erase(i); } + return true; } bool peer_connection::send_unchoke() diff --git a/src/policy.cpp b/src/policy.cpp index 50d081951..36166255b 100644 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -70,73 +70,6 @@ namespace { using namespace libtorrent; - size_type collect_free_download( - torrent::peer_iterator start - , torrent::peer_iterator end) - { - size_type accumulator = 0; - for (torrent::peer_iterator i = start; i != end; ++i) - { - // if the peer is interested in us, it means it may - // want to trade it's surplus uploads for downloads itself - // (and we should not consider it free). If the share diff is - // negative, there's no free download to get from this peer. - size_type diff = (*i)->share_diff(); - TORRENT_ASSERT(diff < (std::numeric_limits::max)()); - if ((*i)->is_peer_interested() || diff <= 0) - continue; - - TORRENT_ASSERT(diff > 0); - (*i)->add_free_upload(-diff); - accumulator += diff; - TORRENT_ASSERT(accumulator > 0); - } - TORRENT_ASSERT(accumulator >= 0); - return accumulator; - } - - - // returns the amount of free upload left after - // it has been distributed to the peers - size_type distribute_free_upload( - torrent::peer_iterator start - , torrent::peer_iterator end - , size_type free_upload) - { - if (free_upload <= 0) return free_upload; - int num_peers = 0; - size_type total_diff = 0; - for (torrent::peer_iterator i = start; i != end; ++i) - { - size_type d = (*i)->share_diff(); - TORRENT_ASSERT(d < (std::numeric_limits::max)()); - total_diff += d; - if (!(*i)->is_peer_interested() || (*i)->share_diff() >= 0) continue; - ++num_peers; - } - - if (num_peers == 0) return free_upload; - size_type upload_share; - if (total_diff >= 0) - { - upload_share = (std::min)(free_upload, total_diff) / num_peers; - } - else - { - upload_share = (free_upload + total_diff) / num_peers; - } - if (upload_share < 0) return free_upload; - - for (torrent::peer_iterator i = start; i != end; ++i) - { - peer_connection* p = *i; - if (!p->is_peer_interested() || p->share_diff() >= 0) continue; - p->add_free_upload(upload_share); - free_upload -= upload_share; - } - return free_upload; - } - struct match_peer_endpoint { match_peer_endpoint(tcp::endpoint const& ep) @@ -347,7 +280,6 @@ namespace libtorrent policy::policy(torrent* t) : m_round_robin(0) , m_torrent(t) - , m_available_free_upload(0) , m_num_connect_candidates(0) , m_num_seeds(0) , m_finished(false) @@ -639,37 +571,6 @@ namespace libtorrent { INVARIANT_CHECK; - // ------------------------ - // upload shift - // ------------------------ - - // this part will shift downloads - // from peers that are seeds and peers - // that don't want to download from us - // to peers that cannot upload anything - // to us. The shifting will make sure - // that the torrent's share ratio - // will be maintained - - // if the share ratio is 0 (infinite) - // m_available_free_upload isn't used - // because it isn't necessary - if (m_torrent->ratio() != 0.f) - { - // accumulate all the free download we get - // and add it to the available free upload - m_available_free_upload - += collect_free_download( - m_torrent->begin() - , m_torrent->end()); - - // distribute the free upload among the peers - m_available_free_upload = distribute_free_upload( - m_torrent->begin() - , m_torrent->end() - , m_available_free_upload); - } - erase_peers(); } @@ -1121,124 +1022,6 @@ namespace libtorrent return i; } - // this is called when we are unchoked by a peer - // i.e. a peer lets us know that we will receive - // data from now on - void policy::unchoked(peer_connection& c) - { - INVARIANT_CHECK; - if (c.is_interesting()) - { - request_a_block(*m_torrent, c); - c.send_block_requests(); - } - } - - // called when a peer is interested in us - void policy::interested(peer_connection& c) - { - INVARIANT_CHECK; - - TORRENT_ASSERT(std::find_if(m_peers.begin(), m_peers.end() - , bind(&peer::connection, _1) == &c) != m_peers.end()); - - aux::session_impl& ses = m_torrent->session(); - - // if the peer is choked and we have upload slots left, - // then unchoke it. Another condition that has to be met - // is that the torrent doesn't keep track of the individual - // up/down ratio for each peer (ratio == 0) or (if it does - // keep track) this particular connection isn't a leecher. - // If the peer was choked because it was leeching, don't - // unchoke it again. - // The exception to this last condition is if we're a seed. - // In that case we don't care if people are leeching, they - // can't pay for their downloads anyway. - if (c.is_choked() - && ses.num_uploads() < ses.max_uploads() - && !c.ignore_unchoke_slots() - && (m_torrent->ratio() == 0 - || c.share_diff() >= size_type(-free_upload_amount) - || m_torrent->is_finished())) - { - ses.unchoke_peer(c); - } -#if defined TORRENT_VERBOSE_LOGGING - else if (c.is_choked()) - { - std::string reason; - if (ses.num_uploads() >= ses.max_uploads()) - { - reason = "the number of uploads (" - + boost::lexical_cast(ses.num_uploads()) - + ") is more than or equal to the limit (" - + boost::lexical_cast(ses.max_uploads()) - + ")"; - } - else - { - reason = "the share ratio (" - + boost::lexical_cast(c.share_diff()) - + ") is <= free_upload_amount (" - + boost::lexical_cast(int(free_upload_amount)) - + ") and we are not seeding and the ratio (" - + boost::lexical_cast(m_torrent->ratio()) - + ")is non-zero"; - } - (*c.m_logger) << time_now_string() << " DID NOT UNCHOKE [ " << reason << " ]\n"; - } -#endif - } - - // called when a peer is no longer interested in us - void policy::not_interested(peer_connection& c) - { - INVARIANT_CHECK; - - if (m_torrent->ratio() != 0.f) - { - TORRENT_ASSERT(c.share_diff() < (std::numeric_limits::max)()); - size_type diff = c.share_diff(); - if (diff > 0 && c.is_seed()) - { - // the peer is a seed and has sent - // us more than we have sent it back. - // consider the download as free download - m_available_free_upload += diff; - c.add_free_upload(-diff); - } - } - } -/* - bool policy::unchoke_one_peer() - { - INVARIANT_CHECK; - - iterator p = find_unchoke_candidate(); - if (p == m_peers.end()) return false; - TORRENT_ASSERT(p->connection); - TORRENT_ASSERT(!p->connection->is_disconnecting()); - - TORRENT_ASSERT(p->connection->is_choked()); - p->connection->send_unchoke(); - p->last_optimistically_unchoked = time_now(); - ++m_num_unchoked; - return true; - } - - void policy::choke_one_peer() - { - INVARIANT_CHECK; - - iterator p = find_choke_candidate(); - if (p == m_peers.end()) return; - TORRENT_ASSERT(p->connection); - TORRENT_ASSERT(!p->connection->is_disconnecting()); - TORRENT_ASSERT(!p->connection->is_choked()); - p->connection->send_choke(); - --m_num_unchoked; - } -*/ bool policy::connect_one_peer(int session_time) { INVARIANT_CHECK; @@ -1306,20 +1089,6 @@ namespace libtorrent if (is_connect_candidate(*p, m_finished)) ++m_num_connect_candidates; - // if the share ratio is 0 (infinite), the - // m_available_free_upload isn't used, - // because it isn't necessary. - if (m_torrent->ratio() != 0.f) - { - TORRENT_ASSERT(c.associated_torrent().lock().get() == m_torrent); - TORRENT_ASSERT(c.share_diff() < (std::numeric_limits::max)()); - m_available_free_upload += c.share_diff(); - } - TORRENT_ASSERT(p->prev_amount_upload == 0); - TORRENT_ASSERT(p->prev_amount_download == 0); - p->prev_amount_download += c.statistics().total_payload_download(); - p->prev_amount_upload += c.statistics().total_payload_upload(); - // if we're already a seed, it's not as important // to keep all the possibly stale peers // if we're not a seed, but we have too many peers @@ -1427,8 +1196,6 @@ namespace libtorrent { continue; } - TORRENT_ASSERT(p.prev_amount_upload == 0); - TORRENT_ASSERT(p.prev_amount_download == 0); if (p.optimistically_unchoked) { TORRENT_ASSERT(p.connection); diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 4edb7cf9d..35f3da98c 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -1136,6 +1136,15 @@ namespace aux { ++m_num_unchoked; } + void session_impl::choke_peer(peer_connection& c) + { + TORRENT_ASSERT(!c.ignore_unchoke_slots()); + torrent* t = c.associated_torrent().lock().get(); + TORRENT_ASSERT(t); + if (t->choke_peer(c)) + --m_num_unchoked; + } + int session_impl::next_port() { std::pair const& out_ports = m_settings.outgoing_ports; diff --git a/src/torrent.cpp b/src/torrent.cpp index 1a8e3c8bb..4932d2809 100644 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -91,6 +91,71 @@ using libtorrent::aux::session_impl; namespace { + size_type collect_free_download( + torrent::peer_iterator start + , torrent::peer_iterator end) + { + size_type accumulator = 0; + for (torrent::peer_iterator i = start; i != end; ++i) + { + // if the peer is interested in us, it means it may + // want to trade it's surplus uploads for downloads itself + // (and we should not consider it free). If the share diff is + // negative, there's no free download to get from this peer. + size_type diff = (*i)->share_diff(); + TORRENT_ASSERT(diff < (std::numeric_limits::max)()); + if ((*i)->is_peer_interested() || diff <= 0) + continue; + + TORRENT_ASSERT(diff > 0); + (*i)->add_free_upload(-diff); + accumulator += diff; + TORRENT_ASSERT(accumulator > 0); + } + TORRENT_ASSERT(accumulator >= 0); + return accumulator; + } + + // returns the amount of free upload left after + // it has been distributed to the peers + size_type distribute_free_upload( + torrent::peer_iterator start + , torrent::peer_iterator end + , size_type free_upload) + { + if (free_upload <= 0) return free_upload; + int num_peers = 0; + size_type total_diff = 0; + for (torrent::peer_iterator i = start; i != end; ++i) + { + size_type d = (*i)->share_diff(); + TORRENT_ASSERT(d < (std::numeric_limits::max)()); + total_diff += d; + if (!(*i)->is_peer_interested() || (*i)->share_diff() >= 0) continue; + ++num_peers; + } + + if (num_peers == 0) return free_upload; + size_type upload_share; + if (total_diff >= 0) + { + upload_share = (std::min)(free_upload, total_diff) / num_peers; + } + else + { + upload_share = (free_upload + total_diff) / num_peers; + } + if (upload_share < 0) return free_upload; + + for (torrent::peer_iterator i = start; i != end; ++i) + { + peer_connection* p = *i; + if (!p->is_peer_interested() || p->share_diff() >= 0) continue; + p->add_free_upload(upload_share); + free_upload -= upload_share; + } + return free_upload; + } struct find_peer_by_ip { @@ -163,6 +228,7 @@ namespace libtorrent , m_net_interface(net_interface.address(), 0) , m_save_path(complete(p.save_path)) , m_num_verified(0) + , m_available_free_upload(0) , m_storage_mode(p.storage_mode) , m_state(torrent_status::checking_resume_data) , m_settings(ses.settings()) @@ -2753,15 +2819,16 @@ namespace libtorrent if (!m_trackers.empty()) start_announcing(); } - void torrent::choke_peer(peer_connection& c) + bool torrent::choke_peer(peer_connection& c) { INVARIANT_CHECK; TORRENT_ASSERT(!c.is_choked()); TORRENT_ASSERT(!c.ignore_unchoke_slots()); TORRENT_ASSERT(m_num_uploads > 0); - c.send_choke(); + if (!c.send_choke()) return false; --m_num_uploads; + return true; } bool torrent::unchoke_peer(peer_connection& c) @@ -2828,9 +2895,25 @@ namespace libtorrent m_ses.m_unchoke_time_scaler = 0; } - if (p->peer_info_struct() && p->peer_info_struct()->optimistically_unchoked) + policy::peer* pp = p->peer_info_struct(); + if (pp) { - m_ses.m_optimistic_unchoke_time_scaler = 0; + if (pp->optimistically_unchoked) + m_ses.m_optimistic_unchoke_time_scaler = 0; + + // if the share ratio is 0 (infinite), the + // m_available_free_upload isn't used, + // because it isn't necessary. + if (ratio() != 0.f) + { + TORRENT_ASSERT(p->associated_torrent().lock().get() == this); + TORRENT_ASSERT(p->share_diff() < (std::numeric_limits::max)()); + m_available_free_upload += p->share_diff(); + } + TORRENT_ASSERT(pp->prev_amount_upload == 0); + TORRENT_ASSERT(pp->prev_amount_download == 0); + pp->prev_amount_download += p->statistics().total_payload_download(); + pp->prev_amount_upload += p->statistics().total_payload_upload(); } m_policy.connection_closed(*p, m_ses.session_time()); @@ -5054,6 +5137,34 @@ namespace libtorrent for (int i = start; i < end; ++i) update_sparse_piece_prio(i, start, end); } + + // ------------------------ + // upload shift + // ------------------------ + + // this part will shift downloads + // from peers that are seeds and peers + // that don't want to download from us + // to peers that cannot upload anything + // to us. The shifting will make sure + // that the torrent's share ratio + // will be maintained + + // if the share ratio is 0 (infinite) + // m_available_free_upload isn't used + // because it isn't necessary + if (ratio() != 0.f) + { + // accumulate all the free download we get + // and add it to the available free upload + m_available_free_upload += collect_free_download( + this->begin(), this->end()); + + // distribute the free upload among the peers + m_available_free_upload = distribute_free_upload( + this->begin(), this->end(), m_available_free_upload); + } + m_policy.pulse(); }