cleanup. changed the connection map into a set of peer_connections. moved the policy object into the torrent (as opposed to being a pointer). Fixes issues with multiple peers on the same IP. Reduces some lookups.

This commit is contained in:
Arvid Norberg 2007-10-31 09:48:20 +00:00
parent 6aeefaf513
commit 1a280e31fa
12 changed files with 237 additions and 291 deletions

View File

@ -178,9 +178,7 @@ namespace libtorrent
#endif
friend struct checker_impl;
friend class invariant_access;
typedef std::map<boost::shared_ptr<socket_type>
, boost::intrusive_ptr<peer_connection> >
connection_map;
typedef std::set<boost::intrusive_ptr<peer_connection> > connection_map;
typedef std::map<sha1_hash, boost::shared_ptr<torrent> > torrent_map;
session_impl(
@ -190,7 +188,8 @@ namespace libtorrent
~session_impl();
#ifndef TORRENT_DISABLE_EXTENSIONS
void add_extension(boost::function<boost::shared_ptr<torrent_plugin>(torrent*, void*)> ext);
void add_extension(boost::function<boost::shared_ptr<torrent_plugin>(
torrent*, void*)> ext);
#endif
void operator()();
@ -214,7 +213,7 @@ namespace libtorrent
peer_id const& get_peer_id() const { return m_peer_id; }
void close_connection(boost::intrusive_ptr<peer_connection> const& p);
void connection_failed(boost::shared_ptr<socket_type> const& s
void connection_failed(boost::intrusive_ptr<peer_connection> const& p
, tcp::endpoint const& a, char const* message);
void set_settings(session_settings const& s);
@ -342,8 +341,8 @@ namespace libtorrent
for (connection_map::const_iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
send_buffer_capacity += i->second->send_buffer_capacity();
used_send_buffer += i->second->send_buffer_size();
send_buffer_capacity += (*i)->send_buffer_capacity();
used_send_buffer += (*i)->send_buffer_size();
}
TORRENT_ASSERT(send_buffer_capacity >= used_send_buffer);
m_buffer_usage_logger << log_time() << " send_buffer_size: " << send_buffer_capacity << std::endl;
@ -386,7 +385,9 @@ namespace libtorrent
// handles disk io requests asynchronously
// peers have pointers into the disk buffer
// pool, and must be destructed before this
// object.
// object. The disk thread relies on the file
// pool object, and must be destructed before
// m_files.
disk_io_thread m_disk_thread;
// this is where all active sockets are stored.

View File

@ -85,6 +85,7 @@ namespace libtorrent
// the tracker, pex, lsd or dht.
policy::peer* peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
, int source, char flags);
void update_peer_port(int port, policy::peer* p, int src);
// called when an incoming connection is accepted
void new_connection(peer_connection& c);
@ -219,6 +220,8 @@ namespace libtorrent
typedef std::multimap<address, peer>::const_iterator const_iterator;
iterator begin_peer() { return m_peers.begin(); }
iterator end_peer() { return m_peers.end(); }
const_iterator begin_peer() const { return m_peers.begin(); }
const_iterator end_peer() const { return m_peers.end(); }
bool connect_one_peer();
bool disconnect_one_peer();

View File

@ -171,7 +171,7 @@ namespace libtorrent
boost::tuples::tuple<size_type, size_type> bytes_done() const;
size_type quantized_bytes_done() const;
void ip_filter_updated() { m_policy->ip_filter_updated(); }
void ip_filter_updated() { m_policy.ip_filter_updated(); }
void pause();
void resume();
@ -204,7 +204,7 @@ namespace libtorrent
tcp::endpoint const& get_interface() const { return m_net_interface; }
void connect_to_url_seed(std::string const& url);
peer_connection* connect_to_peer(policy::peer* peerinfo);
bool connect_to_peer(policy::peer* peerinfo) throw();
void set_ratio(float ratio)
{ TORRENT_ASSERT(ratio >= 0.0f); m_ratio = ratio; }
@ -276,31 +276,12 @@ namespace libtorrent
bool want_more_peers() const;
bool try_connect_peer();
peer_connection* connection_for(tcp::endpoint const& a)
{
peer_iterator i = m_connections.find(a);
if (i == m_connections.end()) return 0;
return i->second;
}
#ifndef NDEBUG
void connection_for(address const& a, std::vector<peer_connection*>& pc)
{
for (peer_iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
if (i->first.address() == a) pc.push_back(i->second);
}
return;
}
#endif
// the number of peers that belong to this torrent
int num_peers() const { return (int)m_connections.size(); }
int num_seeds() const;
typedef std::map<tcp::endpoint, peer_connection*>::iterator peer_iterator;
typedef std::map<tcp::endpoint, peer_connection*>::const_iterator const_peer_iterator;
typedef std::set<peer_connection*>::iterator peer_iterator;
typedef std::set<peer_connection*>::const_iterator const_peer_iterator;
const_peer_iterator begin() const { return m_connections.begin(); }
const_peer_iterator end() const { return m_connections.end(); }
@ -495,11 +476,7 @@ namespace libtorrent
{
return m_picker.get() != 0;
}
policy& get_policy()
{
TORRENT_ASSERT(m_policy);
return *m_policy;
}
policy& get_policy() { return m_policy; }
piece_manager& filesystem();
torrent_info const& torrent_file() const
{ return *m_torrent_file; }
@ -550,7 +527,7 @@ namespace libtorrent
// to the checker thread for initial checking
// of the storage.
void set_metadata(entry const&);
private:
void on_files_deleted(int ret, disk_io_job const& j);
@ -633,7 +610,7 @@ namespace libtorrent
#ifndef NDEBUG
public:
#endif
std::map<tcp::endpoint, peer_connection*> m_connections;
std::set<peer_connection*> m_connections;
#ifndef NDEBUG
private:
#endif
@ -689,8 +666,6 @@ namespace libtorrent
// -----------------------------
boost::shared_ptr<policy> m_policy;
// a back reference to the session
// this torrent belongs to.
aux::session_impl& m_ses;
@ -802,6 +777,8 @@ namespace libtorrent
// total_done - m_initial_done <= total_payload_download
size_type m_initial_done;
#endif
policy m_policy;
};
inline ptime torrent::next_announce() const

View File

@ -1208,11 +1208,11 @@ namespace libtorrent
// there is supposed to be a remote listen port
if (entry* listen_port = root.find_key("p"))
{
if (listen_port->type() == entry::int_t)
if (listen_port->type() == entry::int_t
&& peer_info_struct() != 0)
{
tcp::endpoint adr(remote().address()
, (unsigned short)listen_port->integer());
t->get_policy().peer_from_tracker(adr, pid(), peer_info::incoming, 0);
t->get_policy().update_peer_port(listen_port->integer()
, peer_info_struct(), peer_info::incoming);
}
}
// there should be a version too

View File

@ -344,6 +344,7 @@ namespace libtorrent
#ifndef NDEBUG
m_current.storage = 0;
m_current.callback.clear();
#endif
if (j.buffer && free_buffer)

View File

@ -504,11 +504,10 @@ namespace libtorrent { namespace
// extension and that has metadata
int peers = 0;
#ifndef TORRENT_DISABLE_EXTENSIONS
typedef std::map<tcp::endpoint, peer_connection*> conn_map;
for (conn_map::iterator i = m_torrent.begin()
for (torrent::peer_iterator i = m_torrent.begin()
, end(m_torrent.end()); i != end; ++i)
{
bt_peer_connection* c = dynamic_cast<bt_peer_connection*>(i->second);
bt_peer_connection* c = dynamic_cast<bt_peer_connection*>(*i);
if (c == 0) continue;
metadata_peer_plugin* p
= c->supports_extension<metadata_peer_plugin>();

View File

@ -390,7 +390,6 @@ namespace libtorrent
TORRENT_ASSERT(m_peer_info->connection == 0);
boost::shared_ptr<torrent> t = m_torrent.lock();
if (t) TORRENT_ASSERT(t->connection_for(remote()) != this);
#endif
}
@ -1394,7 +1393,7 @@ namespace libtorrent
if (!t)
{
m_ses.connection_failed(m_socket, remote(), j.str.c_str());
m_ses.connection_failed(self(), remote(), j.str.c_str());
return;
}
@ -1944,7 +1943,7 @@ namespace libtorrent
(*m_ses.m_logger) << "CONNECTION TIMED OUT: " << m_remote.address().to_string()
<< "\n";
#endif
m_ses.connection_failed(m_socket, m_remote, "timed out");
m_ses.connection_failed(self(), m_remote, "timed out");
}
void peer_connection::disconnect()
@ -2290,7 +2289,7 @@ namespace libtorrent
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << "**ERROR**: " << e.what() << "\n";
#endif
m_ses.connection_failed(m_socket, remote(), e.what());
m_ses.connection_failed(self(), remote(), e.what());
}
}
@ -2340,7 +2339,7 @@ namespace libtorrent
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t)
{
m_ses.connection_failed(m_socket, remote(), j.str.c_str());
m_ses.connection_failed(self(), remote(), j.str.c_str());
return;
}
@ -2676,7 +2675,7 @@ namespace libtorrent
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t)
{
m_ses.connection_failed(m_socket, remote(), e.what());
m_ses.connection_failed(self(), remote(), e.what());
return;
}
@ -2691,14 +2690,14 @@ namespace libtorrent
catch (std::exception& e)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), e.what());
m_ses.connection_failed(self(), remote(), e.what());
}
catch (...)
{
// all exceptions should derive from std::exception
TORRENT_ASSERT(false);
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
m_ses.connection_failed(self(), remote(), "connection failed for unknown reason");
}
bool peer_connection::can_write() const
@ -2779,7 +2778,7 @@ namespace libtorrent
(*m_ses.m_logger) << "CONNECTION FAILED: " << m_remote.address().to_string()
<< ": " << e.message() << "\n";
#endif
m_ses.connection_failed(m_socket, m_remote, e.message().c_str());
m_ses.connection_failed(self(), m_remote, e.message().c_str());
return;
}
@ -2799,14 +2798,14 @@ namespace libtorrent
catch (std::exception& ex)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), ex.what());
m_ses.connection_failed(self(), remote(), ex.what());
}
catch (...)
{
// all exceptions should derive from std::exception
TORRENT_ASSERT(false);
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
m_ses.connection_failed(self(), remote(), "connection failed for unkown reason");
}
// --------------------------
@ -2856,14 +2855,14 @@ namespace libtorrent
catch (std::exception& e)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), e.what());
m_ses.connection_failed(self(), remote(), e.what());
}
catch (...)
{
// all exceptions should derive from std::exception
TORRENT_ASSERT(false);
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
m_ses.connection_failed(self(), remote(), "connection failed for unknown reason");
}
@ -2880,26 +2879,18 @@ namespace libtorrent
}
boost::shared_ptr<torrent> t = m_torrent.lock();
if (!t)
if (!t) return;
if (m_peer_info)
{
typedef session_impl::torrent_map torrent_map;
torrent_map& m = m_ses.m_torrents;
for (torrent_map::iterator i = m.begin(), end(m.end()); i != end; ++i)
policy::const_iterator i;
for (i = t->get_policy().begin_peer();
i != t->get_policy().end_peer(); ++i)
{
torrent& t = *i->second;
TORRENT_ASSERT(t.connection_for(m_remote) != this);
if (&i->second == m_peer_info) break;
}
return;
TORRENT_ASSERT(i != t->get_policy().end_peer());
}
TORRENT_ASSERT(t->connection_for(remote()) != 0 || m_in_constructor);
if (!m_in_constructor && t->connection_for(remote()) != this
&& !m_ses.settings().allow_multiple_connections_per_ip)
{
TORRENT_ASSERT(false);
}
if (t->has_picker() && !t->is_aborted())
{
// make sure that pieces that have completed the download

View File

@ -82,13 +82,13 @@ namespace
// want to trade it's surplus uploads for downloads itself
// (and we should not consider it free). If the share diff is
// negative, there's no free download to get from this peer.
size_type diff = i->second->share_diff();
size_type diff = (*i)->share_diff();
TORRENT_ASSERT(diff < (std::numeric_limits<size_type>::max)());
if (i->second->is_peer_interested() || diff <= 0)
if ((*i)->is_peer_interested() || diff <= 0)
continue;
TORRENT_ASSERT(diff > 0);
i->second->add_free_upload(-diff);
(*i)->add_free_upload(-diff);
accumulator += diff;
TORRENT_ASSERT(accumulator > 0);
}
@ -109,10 +109,10 @@ namespace
size_type total_diff = 0;
for (torrent::peer_iterator i = start; i != end; ++i)
{
size_type d = i->second->share_diff();
size_type d = (*i)->share_diff();
TORRENT_ASSERT(d < (std::numeric_limits<size_type>::max)());
total_diff += d;
if (!i->second->is_peer_interested() || i->second->share_diff() >= 0) continue;
if (!(*i)->is_peer_interested() || (*i)->share_diff() >= 0) continue;
++num_peers;
}
@ -130,7 +130,7 @@ namespace
for (torrent::peer_iterator i = start; i != end; ++i)
{
peer_connection* p = i->second;
peer_connection* p = *i;
if (!p->is_peer_interested() || p->share_diff() >= 0) continue;
p->add_free_upload(upload_share);
free_upload -= upload_share;
@ -904,7 +904,7 @@ namespace libtorrent
{
TORRENT_ASSERT(!c.is_local());
INVARIANT_CHECK;
// INVARIANT_CHECK;
// if the connection comes from the tracker,
// it's probably just a NAT-check. Ignore the
@ -932,10 +932,11 @@ namespace libtorrent
if (m_torrent->settings().allow_multiple_connections_per_ip)
{
i = std::find_if(
m_peers.begin()
, m_peers.end()
, match_peer_connection(c));
tcp::endpoint remote = c.remote();
std::pair<iterator, iterator> range = m_peers.equal_range(remote.address());
i = std::find_if(range.first, range.second, match_peer_endpoint(remote));
if (i == range.second) i = m_peers.end();
}
else
{
@ -977,8 +978,6 @@ namespace libtorrent
i = m_peers.insert(std::make_pair(c.remote().address(), p));
}
TORRENT_ASSERT(m_torrent->connection_for(c.remote()) == &c);
c.set_peer_info(&i->second);
TORRENT_ASSERT(i->second.connection == 0);
c.add_stat(i->second.prev_amount_download, i->second.prev_amount_upload);
@ -991,7 +990,38 @@ namespace libtorrent
// m_last_optimistic_disconnect = time_now();
}
policy::peer* policy::peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
void policy::update_peer_port(int port, policy::peer* p, int src)
{
TORRENT_ASSERT(p != 0);
if (p->ip.port() == port) return;
if (m_torrent->settings().allow_multiple_connections_per_ip)
{
tcp::endpoint remote(p->ip.address(), port);
std::pair<iterator, iterator> range = m_peers.equal_range(remote.address());
iterator i = std::find_if(range.first, range.second
, match_peer_endpoint(remote));
if (i != m_peers.end())
{
policy::peer& pp = i->second;
if (pp.connection)
{
throw protocol_error("duplicate connection");
}
if (m_torrent->has_picker())
m_torrent->picker().clear_peer(&i->second);
m_peers.erase(i);
}
}
else
{
TORRENT_ASSERT(m_peers.count(p->ip.address()) == 1);
}
p->ip.port(port);
p->source |= src;
}
policy::peer* policy::peer_from_tracker(tcp::endpoint const& remote, peer_id const& pid
, int src, char flags)
{
// too expensive
@ -1022,9 +1052,7 @@ namespace libtorrent
{
std::pair<iterator, iterator> range = m_peers.equal_range(remote.address());
i = std::find_if(range.first, range.second, match_peer_endpoint(remote));
if (i == range.second)
i = std::find_if(m_peers.begin(), m_peers.end(), match_peer_id(pid));
if (i == range.second) i = m_peers.end();
}
else
{
@ -1066,9 +1094,6 @@ namespace libtorrent
{
i->second.type = peer::connectable;
// in case we got the ip from a remote connection, port is
// not known, so save it. Client may also have changed port
// for some reason.
i->second.ip = remote;
i->second.source |= src;
@ -1281,10 +1306,7 @@ namespace libtorrent
try
{
p->second.connected = time_now();
p->second.connection = m_torrent->connect_to_peer(&p->second);
TORRENT_ASSERT(p->second.connection == m_torrent->connection_for(p->second.ip));
if (p->second.connection == 0)
if (!m_torrent->connect_to_peer(&p->second))
{
++p->second.failcount;
return false;
@ -1396,6 +1418,7 @@ namespace libtorrent
void policy::check_invariant() const
{
if (m_torrent->is_aborted()) return;
int connected_peers = 0;
int total_connections = 0;
@ -1414,20 +1437,14 @@ namespace libtorrent
{
TORRENT_ASSERT(unique_test.count(p.ip) == 0);
unique_test.insert(p.ip);
TORRENT_ASSERT(i->first == p.ip.address());
// TORRENT_ASSERT(p.connection == 0 || p.ip == p.connection->remote());
}
++total_connections;
if (!p.connection)
{
continue;
}
if (!m_torrent->settings().allow_multiple_connections_per_ip)
{
std::vector<peer_connection*> conns;
m_torrent->connection_for(p.ip.address(), conns);
TORRENT_ASSERT(std::find_if(conns.begin(), conns.end()
, boost::bind(std::equal_to<peer_connection*>(), _1, p.connection))
!= conns.end());
}
if (p.optimistically_unchoked)
{
TORRENT_ASSERT(p.connection);
@ -1444,10 +1461,10 @@ namespace libtorrent
for (torrent::const_peer_iterator i = m_torrent->begin();
i != m_torrent->end(); ++i)
{
if (i->second->is_disconnecting()) continue;
if ((*i)->is_disconnecting()) continue;
// ignore web_peer_connections since they are not managed
// by the policy class
if (dynamic_cast<web_peer_connection*>(i->second)) continue;
if (dynamic_cast<web_peer_connection*>(*i)) continue;
++num_torrent_peers;
}

View File

@ -700,7 +700,7 @@ namespace detail
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
i->second->disconnect();
(*i)->disconnect();
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
@ -1039,7 +1039,7 @@ namespace detail
c->m_in_constructor = false;
#endif
m_connections.insert(std::make_pair(s, c));
m_connections.insert(c);
}
catch (std::exception& exc)
{
@ -1048,7 +1048,7 @@ namespace detail
#endif
};
void session_impl::connection_failed(boost::shared_ptr<socket_type> const& s
void session_impl::connection_failed(boost::intrusive_ptr<peer_connection> const& peer
, tcp::endpoint const& a, char const* message)
#ifndef NDEBUG
try
@ -1059,7 +1059,7 @@ namespace detail
// too expensive
// INVARIANT_CHECK;
connection_map::iterator p = m_connections.find(s);
connection_map::iterator p = m_connections.find(peer);
// the connection may have been disconnected in the receive or send phase
if (p == m_connections.end()) return;
@ -1068,15 +1068,15 @@ namespace detail
m_alerts.post_alert(
peer_error_alert(
a
, p->second->pid()
, (*p)->pid()
, message));
}
#if defined(TORRENT_VERBOSE_LOGGING)
(*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n";
(*(*p)->m_logger) << "*** CONNECTION FAILED " << message << "\n";
#endif
p->second->set_failed();
p->second->disconnect();
(*p)->set_failed();
(*p)->disconnect();
}
#ifndef NDEBUG
catch (...)
@ -1093,10 +1093,10 @@ namespace detail
// INVARIANT_CHECK;
TORRENT_ASSERT(p->is_disconnecting());
connection_map::iterator i = m_connections.find(p->get_socket());
connection_map::iterator i = m_connections.find(p);
if (i != m_connections.end())
{
if (!i->second->is_choked()) --m_num_unchoked;
if (!(*i)->is_choked()) --m_num_unchoked;
m_connections.erase(i);
}
}
@ -1155,7 +1155,7 @@ namespace detail
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
if (i->second->is_connecting())
if ((*i)->is_connecting())
++num_half_open;
else
++num_complete_connections;
@ -1243,7 +1243,7 @@ namespace detail
++i;
// if this socket has timed out
// close it.
peer_connection& c = *j->second;
peer_connection& c = *j->get();
if (c.has_timed_out())
{
if (m_alerts.should_post(alert::debug))
@ -1310,7 +1310,7 @@ namespace detail
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
peer_connection* p = i->second.get();
peer_connection* p = i->get();
torrent* t = p->associated_torrent().lock().get();
if (!p->peer_info_struct()
|| t == 0
@ -1320,7 +1320,7 @@ namespace detail
|| (p->share_diff() < -free_upload_amount
&& !t->is_seed()))
{
if (!i->second->is_choked() && t)
if (!(*i)->is_choked() && t)
{
policy::peer* pi = p->peer_info_struct();
if (pi && pi->optimistically_unchoked)
@ -1329,11 +1329,11 @@ namespace detail
// force a new optimistic unchoke
m_optimistic_unchoke_time_scaler = 0;
}
t->choke_peer(*i->second);
t->choke_peer(*(*i));
}
continue;
}
peers.push_back(i->second.get());
peers.push_back(i->get());
}
// sort the peers that are eligible for unchoke by download rate and secondary
@ -1405,7 +1405,7 @@ namespace detail
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
peer_connection* p = i->second.get();
peer_connection* p = i->get();
TORRENT_ASSERT(p);
policy::peer* pi = p->peer_info_struct();
if (!pi) continue;
@ -1436,21 +1436,21 @@ namespace detail
{
if (current_optimistic_unchoke != m_connections.end())
{
torrent* t = current_optimistic_unchoke->second->associated_torrent().lock().get();
torrent* t = (*current_optimistic_unchoke)->associated_torrent().lock().get();
TORRENT_ASSERT(t);
current_optimistic_unchoke->second->peer_info_struct()->optimistically_unchoked = false;
t->choke_peer(*current_optimistic_unchoke->second);
(*current_optimistic_unchoke)->peer_info_struct()->optimistically_unchoked = false;
t->choke_peer(*current_optimistic_unchoke->get());
}
else
{
++m_num_unchoked;
}
torrent* t = optimistic_unchoke_candidate->second->associated_torrent().lock().get();
torrent* t = (*optimistic_unchoke_candidate)->associated_torrent().lock().get();
TORRENT_ASSERT(t);
bool ret = t->unchoke_peer(*optimistic_unchoke_candidate->second);
bool ret = t->unchoke_peer(*optimistic_unchoke_candidate->get());
TORRENT_ASSERT(ret);
optimistic_unchoke_candidate->second->peer_info_struct()->optimistically_unchoked = true;
(*optimistic_unchoke_candidate)->peer_info_struct()->optimistically_unchoked = true;
}
}
}
@ -1593,7 +1593,7 @@ namespace detail
(*m_logger) << time_now_string() << " cleaning up connections\n";
#endif
while (!m_connections.empty())
m_connections.begin()->second->disconnect();
(*m_connections.begin())->disconnect();
#ifndef NDEBUG
for (torrent_map::iterator i = m_torrents.begin();
@ -2403,19 +2403,20 @@ namespace detail
for (connection_map::const_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
TORRENT_ASSERT(i->second);
boost::shared_ptr<torrent> t = i->second->associated_torrent().lock();
TORRENT_ASSERT(*i);
boost::shared_ptr<torrent> t = (*i)->associated_torrent().lock();
if (!i->second->is_choked()) ++unchokes;
if (i->second->peer_info_struct()
&& i->second->peer_info_struct()->optimistically_unchoked)
peer_connection* p = i->get();
if (!p->is_choked()) ++unchokes;
if (p->peer_info_struct()
&& p->peer_info_struct()->optimistically_unchoked)
{
++num_optimistic;
TORRENT_ASSERT(!i->second->is_choked());
TORRENT_ASSERT(!p->is_choked());
}
if (t && i->second->peer_info_struct())
if (t && p->peer_info_struct())
{
TORRENT_ASSERT(t->get_policy().has_connection(boost::get_pointer(i->second)));
TORRENT_ASSERT(t->get_policy().has_connection(p));
}
}
TORRENT_ASSERT(num_optimistic == 0 || num_optimistic == 1);

View File

@ -116,11 +116,11 @@ namespace
, tor(t)
{ TORRENT_ASSERT(t != 0); }
bool operator()(const session_impl::connection_map::value_type& c) const
bool operator()(session_impl::connection_map::value_type const& c) const
{
tcp::endpoint sender = c.first->remote_endpoint();
tcp::endpoint const& sender = c->remote();
if (sender.address() != ip.address()) return false;
if (tor != c.second->associated_torrent().lock().get()) return false;
if (tor != c->associated_torrent().lock().get()) return false;
return true;
}
@ -132,9 +132,9 @@ namespace
{
peer_by_id(const peer_id& i): pid(i) {}
bool operator()(const std::pair<tcp::endpoint, peer_connection*>& p) const
bool operator()(session_impl::connection_map::value_type const& p) const
{
if (p.second->pid() != pid) return false;
if (p->pid() != pid) return false;
// have a special case for all zeros. We can have any number
// of peers with that pid, since it's used to indicate no pid.
if (std::count(pid.begin(), pid.end(), 0) == 20) return false;
@ -178,7 +178,6 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_DHT
, m_last_dht_announce(time_now() - minutes(15))
#endif
, m_policy()
, m_ses(ses)
, m_checker(checker)
, m_picker(0)
@ -203,8 +202,8 @@ namespace libtorrent
, m_max_uploads((std::numeric_limits<int>::max)())
, m_num_uploads(0)
, m_max_connections((std::numeric_limits<int>::max)())
, m_policy(this)
{
m_policy.reset(new policy(this));
}
torrent::torrent(
@ -239,7 +238,6 @@ namespace libtorrent
#ifndef TORRENT_DISABLE_DHT
, m_last_dht_announce(time_now() - minutes(15))
#endif
, m_policy()
, m_ses(ses)
, m_checker(checker)
, m_picker(0)
@ -263,6 +261,7 @@ namespace libtorrent
, m_max_uploads((std::numeric_limits<int>::max)())
, m_num_uploads(0)
, m_max_connections((std::numeric_limits<int>::max)())
, m_policy(this)
{
INVARIANT_CHECK;
@ -273,8 +272,6 @@ namespace libtorrent
m_trackers.push_back(announce_entry(tracker_url));
m_torrent_file->add_tracker(tracker_url);
}
m_policy.reset(new policy(this));
}
void torrent::start()
@ -317,7 +314,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
(*i->second->m_logger) << "*** DESTRUCTING TORRENT\n";
(*(*i)->m_logger) << "*** DESTRUCTING TORRENT\n";
}
#endif
@ -566,7 +563,7 @@ namespace libtorrent
continue;
}
m_policy->peer_from_tracker(a, i->pid, peer_info::tracker, 0);
m_policy.peer_from_tracker(a, i->pid, peer_info::tracker, 0);
}
catch (std::exception&)
{
@ -614,7 +611,7 @@ namespace libtorrent
return;
}
m_policy->peer_from_tracker(*host, pid, peer_info::tracker, 0);
m_policy.peer_from_tracker(*host, pid, peer_info::tracker, 0);
}
catch (std::exception&)
{};
@ -744,7 +741,7 @@ namespace libtorrent
std::map<piece_block, int> downloading_piece;
for (const_peer_iterator i = begin(); i != end(); ++i)
{
peer_connection* pc = i->second;
peer_connection* pc = *i;
boost::optional<piece_block_progress> p
= pc->downloading_piece_progress();
if (p)
@ -885,7 +882,7 @@ namespace libtorrent
{
#endif
m_policy->piece_finished(index, passed_hash_check);
m_policy.piece_finished(index, passed_hash_check);
#ifndef NDEBUG
}
@ -925,7 +922,7 @@ namespace libtorrent
// resets the download queue. So, we cannot do the
// invariant check here since it assumes:
// (total_done == m_torrent_file->total_size()) => is_seed()
// INVARIANT_CHECK;
INVARIANT_CHECK;
TORRENT_ASSERT(m_storage);
TORRENT_ASSERT(m_storage->refcount() > 0);
@ -964,17 +961,6 @@ namespace libtorrent
{
policy::peer* p = static_cast<policy::peer*>(*i);
if (p == 0) continue;
#ifndef NDEBUG
if (!settings().allow_multiple_connections_per_ip)
{
std::vector<peer_connection*> conns;
connection_for(p->ip.address(), conns);
TORRENT_ASSERT(p->connection == 0
|| std::find_if(conns.begin(), conns.end()
, boost::bind(std::equal_to<peer_connection*>(), _1, p->connection))
!= conns.end());
}
#endif
if (p->connection) p->connection->received_invalid_data(index);
// either, we have received too many failed hashes
@ -1039,7 +1025,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
(*i->second->m_logger) << "*** ABORTING TORRENT\n";
(*(*i)->m_logger) << "*** ABORTING TORRENT\n";
}
#endif
@ -1108,7 +1094,7 @@ namespace libtorrent
m_picker->we_have(index);
for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i)
try { i->second->announce_piece(index); } catch (std::exception&) {}
try { (*i)->announce_piece(index); } catch (std::exception&) {}
for (std::set<void*>::iterator i = peers.begin()
, end(peers.end()); i != end; ++i)
@ -1279,7 +1265,7 @@ namespace libtorrent
void torrent::update_peer_interest()
{
for (peer_iterator i = begin(); i != end(); ++i)
i->second->update_interest();
(*i)->update_interest();
}
void torrent::filter_piece(int index, bool filter)
@ -1461,7 +1447,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
i->second->cancel_request(block);
(*i)->cancel_request(block);
}
}
@ -1471,7 +1457,7 @@ namespace libtorrent
TORRENT_ASSERT(p != 0);
peer_iterator i = m_connections.find(p->remote());
peer_iterator i = m_connections.find(p);
if (i == m_connections.end())
{
TORRENT_ASSERT(false);
@ -1509,8 +1495,9 @@ namespace libtorrent
if (!p->is_choked())
--m_num_uploads;
m_policy->connection_closed(*p);
m_policy.connection_closed(*p);
p->set_peer_info(0);
TORRENT_ASSERT(i != m_connections.end());
m_connections.erase(i);
}
catch (std::exception& e)
@ -1663,14 +1650,6 @@ namespace libtorrent
return;
}
peer_iterator conn = m_connections.find(a);
if (conn != m_connections.end())
{
if (dynamic_cast<web_peer_connection*>(conn->second) == 0
|| conn->second->is_disconnecting()) conn->second->disconnect();
else return;
}
boost::shared_ptr<socket_type> s(new socket_type);
bool ret = instantiate_connection(m_ses.m_io_service, m_ses.web_seed_proxy(), *s);
@ -1702,10 +1681,8 @@ namespace libtorrent
try
{
// add the newly connected peer to this torrent's peer list
TORRENT_ASSERT(m_connections.find(a) == m_connections.end());
m_connections.insert(
std::make_pair(a, boost::get_pointer(c)));
m_ses.m_connections.insert(std::make_pair(s, c));
m_connections.insert(boost::get_pointer(c));
m_ses.m_connections.insert(c);
m_ses.m_half_open.enqueue(
bind(&peer_connection::connect, c, _1)
@ -1721,7 +1698,7 @@ namespace libtorrent
// TODO: post an error alert!
// std::map<tcp::endpoint, peer_connection*>::iterator i = m_connections.find(a);
// if (i != m_connections.end()) m_connections.erase(i);
m_ses.connection_failed(s, a, e.what());
m_ses.connection_failed(c, a, e.what());
c->disconnect();
}
}
@ -1866,19 +1843,19 @@ namespace libtorrent
}
#endif
peer_connection* torrent::connect_to_peer(policy::peer* peerinfo)
bool torrent::connect_to_peer(policy::peer* peerinfo) throw()
{
INVARIANT_CHECK;
TORRENT_ASSERT(peerinfo);
TORRENT_ASSERT(peerinfo->connection == 0);
peerinfo->connected = time_now();
#ifndef NDEBUG
// this asserts that we don't have duplicates in the policy's peer list
peer_iterator i_ = m_connections.find(peerinfo->ip);
peer_iterator i_ = std::find_if(m_connections.begin(), m_connections.end()
, bind(&peer_connection::remote, _1) == peerinfo->ip);
TORRENT_ASSERT(i_ == m_connections.end()
|| i_->second->is_disconnecting()
|| dynamic_cast<bt_peer_connection*>(i_->second) == 0
|| m_ses.settings().allow_multiple_connections_per_ip);
|| dynamic_cast<bt_peer_connection*>(*i_) == 0);
#endif
TORRENT_ASSERT(want_more_peers());
@ -1899,23 +1876,20 @@ namespace libtorrent
c->m_in_constructor = false;
#endif
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
boost::shared_ptr<peer_plugin> pp((*i)->new_connection(c.get()));
if (pp) c->add_extension(pp);
}
#endif
try
{
TORRENT_ASSERT(m_connections.find(a) == m_connections.end());
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
boost::shared_ptr<peer_plugin> pp((*i)->new_connection(c.get()));
if (pp) c->add_extension(pp);
}
#endif
// add the newly connected peer to this torrent's peer list
m_connections.insert(
std::make_pair(a, boost::get_pointer(c)));
m_ses.m_connections.insert(std::make_pair(s, c));
m_connections.insert(boost::get_pointer(c));
m_ses.m_connections.insert(c);
int timeout = settings().peer_connect_timeout;
if (peerinfo) timeout += 3 * peerinfo->failcount;
@ -1927,16 +1901,15 @@ namespace libtorrent
}
catch (std::exception& e)
{
TORRENT_ASSERT(false);
// TODO: post an error alert!
std::map<tcp::endpoint, peer_connection*>::iterator i = m_connections.find(a);
std::set<peer_connection*>::iterator i
= m_connections.find(boost::get_pointer(c));
if (i != m_connections.end()) m_connections.erase(i);
m_ses.connection_failed(s, a, e.what());
m_ses.connection_failed(c, a, e.what());
c->disconnect();
throw;
return false;
}
if (c->is_disconnecting()) throw protocol_error("failed to connect");
return c.get();
peerinfo->connection = c.get();
return true;
}
void torrent::set_metadata(entry const& metadata)
@ -1980,25 +1953,7 @@ namespace libtorrent
TORRENT_ASSERT(p != 0);
TORRENT_ASSERT(!p->is_local());
std::map<tcp::endpoint, peer_connection*>::iterator c
= m_connections.find(p->remote());
if (c != m_connections.end())
{
TORRENT_ASSERT(p != c->second);
// we already have a peer_connection to this ip.
// It may currently be waiting for completing a
// connection attempt that might fail. So,
// prioritize this current connection since
// it has already succeeded.
if (!c->second->is_connecting())
{
throw protocol_error("already connected to peer");
}
c->second->disconnect();
}
if (m_ses.m_connections.find(p->get_socket())
== m_ses.m_connections.end())
if (m_ses.m_connections.find(p) == m_ses.m_connections.end())
{
throw protocol_error("peer is not properly constructed");
}
@ -2008,9 +1963,8 @@ namespace libtorrent
throw protocol_error("session is closing");
}
TORRENT_ASSERT(m_connections.find(p->remote()) == m_connections.end());
peer_iterator ci = m_connections.insert(
std::make_pair(p->remote(), p)).first;
TORRENT_ASSERT(m_connections.find(p) == m_connections.end());
peer_iterator ci = m_connections.insert(p).first;
try
{
// if new_connection throws, we have to remove the
@ -2024,9 +1978,9 @@ namespace libtorrent
if (pp) p->add_extension(pp);
}
#endif
TORRENT_ASSERT(connection_for(p->remote()) == p);
TORRENT_ASSERT(ci->second == p);
m_policy->new_connection(*ci->second);
TORRENT_ASSERT(m_connections.find(p) == ci);
TORRENT_ASSERT(*ci == p);
m_policy.new_connection(**ci);
}
catch (std::exception& e)
{
@ -2036,7 +1990,7 @@ namespace libtorrent
TORRENT_ASSERT(p->remote() == p->get_socket()->remote_endpoint());
#ifndef NDEBUG
m_policy->check_invariant();
m_policy.check_invariant();
#endif
}
@ -2055,19 +2009,19 @@ namespace libtorrent
while (!m_connections.empty())
{
peer_connection& p = *m_connections.begin()->second;
TORRENT_ASSERT(p.associated_torrent().lock().get() == this);
peer_connection* p = *m_connections.begin();
TORRENT_ASSERT(p->associated_torrent().lock().get() == this);
#if defined(TORRENT_VERBOSE_LOGGING)
if (m_abort)
(*p.m_logger) << "*** CLOSING CONNECTION 'aborting'\n";
(*p->m_logger) << "*** CLOSING CONNECTION 'aborting'\n";
else
(*p.m_logger) << "*** CLOSING CONNECTION 'pausing'\n";
(*p->m_logger) << "*** CLOSING CONNECTION 'pausing'\n";
#endif
#ifndef NDEBUG
std::size_t size = m_connections.size();
#endif
p.disconnect();
p->disconnect();
TORRENT_ASSERT(m_connections.size() <= size);
}
}
@ -2158,13 +2112,14 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
TORRENT_ASSERT(i->second->associated_torrent().lock().get() == this);
if (i->second->is_seed())
peer_connection* p = *i;
TORRENT_ASSERT(p->associated_torrent().lock().get() == this);
if (p->is_seed())
{
#if defined(TORRENT_VERBOSE_LOGGING)
(*i->second->m_logger) << "*** SEED, CLOSING CONNECTION\n";
(*p->m_logger) << "*** SEED, CLOSING CONNECTION\n";
#endif
seeds.push_back(i->second);
seeds.push_back(p);
}
}
std::for_each(seeds.begin(), seeds.end()
@ -2365,23 +2320,21 @@ namespace libtorrent
m_connections_initialized = true;
// all peer connections have to initialize themselves now that the metadata
// is available
typedef std::map<tcp::endpoint, peer_connection*> conn_map;
for (conn_map::iterator i = m_connections.begin()
for (torrent::peer_iterator i = m_connections.begin()
, end(m_connections.end()); i != end;)
{
try
{
i->second->on_metadata();
i->second->init();
(*i)->on_metadata();
(*i)->init();
++i;
}
catch (std::exception& e)
{
// the connection failed, close it
conn_map::iterator j = i;
torrent::peer_iterator j = i;
++j;
m_ses.connection_failed(i->second->get_socket()
, i->first, e.what());
m_ses.connection_failed(*i, (*i)->remote(), e.what());
i = j;
}
}
@ -2452,7 +2405,7 @@ namespace libtorrent
std::map<piece_block, int> num_requests;
for (const_peer_iterator i = begin(); i != end(); ++i)
{
peer_connection const& p = *i->second;
peer_connection const& p = *(*i);
for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
, end(p.request_queue().end()); i != end; ++i)
++num_requests[*i];
@ -2484,12 +2437,12 @@ namespace libtorrent
TORRENT_ASSERT(m_abort || m_have_pieces.empty());
}
/* for (policy::const_iterator i = m_policy->begin_peer()
, end(m_policy->end_peer()); i != end; ++i)
for (policy::const_iterator i = m_policy.begin_peer()
, end(m_policy.end_peer()); i != end; ++i)
{
TORRENT_ASSERT(i->connection == const_cast<torrent*>(this)->connection_for(i->ip));
TORRENT_ASSERT(i->second.ip.address() == i->first);
}
*/
size_type total_done = quantized_bytes_done();
if (m_torrent_file->is_valid())
{
@ -2572,17 +2525,19 @@ namespace libtorrent
void torrent::set_peer_upload_limit(tcp::endpoint ip, int limit)
{
TORRENT_ASSERT(limit >= -1);
peer_connection* p = connection_for(ip);
if (p == 0) return;
p->set_upload_limit(limit);
peer_iterator i = std::find_if(m_connections.begin(), m_connections.end()
, bind(&peer_connection::remote, _1) == ip);
if (i == m_connections.end()) return;
(*i)->set_upload_limit(limit);
}
void torrent::set_peer_download_limit(tcp::endpoint ip, int limit)
{
TORRENT_ASSERT(limit >= -1);
peer_connection* p = connection_for(ip);
if (p == 0) return;
p->set_download_limit(limit);
peer_iterator i = std::find_if(m_connections.begin(), m_connections.end()
, bind(&peer_connection::remote, _1) == ip);
if (i == m_connections.end()) return;
(*i)->set_download_limit(limit);
}
void torrent::set_upload_limit(int limit)
@ -2621,7 +2576,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
(*i->second->m_logger) << "*** DELETING FILES IN TORRENT\n";
(*(*i)->m_logger) << "*** DELETING FILES IN TORRENT\n";
}
#endif
@ -2656,7 +2611,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
(*i->second->m_logger) << "*** PAUSING TORRENT\n";
(*(*i)->m_logger) << "*** PAUSING TORRENT\n";
}
#endif
@ -2730,7 +2685,7 @@ namespace libtorrent
i != m_connections.end(); ++i)
{
web_peer_connection* p
= dynamic_cast<web_peer_connection*>(i->second);
= dynamic_cast<web_peer_connection*>(*i);
if (!p) continue;
web_seeds.insert(p->url());
}
@ -2753,7 +2708,7 @@ namespace libtorrent
for (peer_iterator i = m_connections.begin();
i != m_connections.end();)
{
peer_connection* p = i->second;
peer_connection* p = *i;
++i;
m_stat += p->statistics();
// updates the peer connection's ul/dl bandwidth
@ -2778,14 +2733,14 @@ namespace libtorrent
if (m_time_scaler <= 0)
{
m_time_scaler = 10;
m_policy->pulse();
m_policy.pulse();
}
}
bool torrent::try_connect_peer()
{
TORRENT_ASSERT(want_more_peers());
return m_policy->connect_one_peer();
return m_policy.connect_one_peer();
}
void torrent::async_verify_piece(int piece_index, boost::function<void(bool)> const& f)
@ -2869,8 +2824,7 @@ namespace libtorrent
torrent_status st;
st.num_peers = (int)std::count_if(m_connections.begin(), m_connections.end(),
!boost::bind(&peer_connection::is_connecting
, boost::bind(&std::map<tcp::endpoint,peer_connection*>::value_type::second, _1)));
!boost::bind(&peer_connection::is_connecting, _1));
st.storage_mode = m_storage_mode;
@ -2997,9 +2951,7 @@ namespace libtorrent
INVARIANT_CHECK;
return (int)std::count_if(m_connections.begin(), m_connections.end()
, boost::bind(&peer_connection::is_seed
, boost::bind(&std::map<tcp::endpoint
, peer_connection*>::value_type::second, _1)));
, boost::bind(&peer_connection::is_seed, _1));
}
void torrent::tracker_request_timed_out(

View File

@ -908,7 +908,7 @@ namespace libtorrent
for (torrent::const_peer_iterator i = t->begin();
i != t->end(); ++i)
{
peer_connection* peer = i->second;
peer_connection* peer = *i;
// incoming peers that haven't finished the handshake should
// not be included in this list

View File

@ -113,18 +113,20 @@ namespace libtorrent { namespace
for (torrent::peer_iterator i = m_torrent.begin()
, end(m_torrent.end()); i != end; ++i)
{
if (!send_peer(*i->second)) continue;
peer_connection* peer = *i;
if (!send_peer(*peer)) continue;
m_old_peers.insert(i->first);
tcp::endpoint const& remote = peer->remote();
m_old_peers.insert(remote);
std::set<tcp::endpoint>::iterator di = dropped.find(i->first);
std::set<tcp::endpoint>::iterator di = dropped.find(remote);
if (di == dropped.end())
{
// don't write too big of a package
if (num_added >= max_peer_entries) break;
// only send proper bittorrent peers
bt_peer_connection* p = dynamic_cast<bt_peer_connection*>(i->second);
bt_peer_connection* p = dynamic_cast<bt_peer_connection*>(peer);
if (!p) continue;
// no supported flags to set yet
@ -135,14 +137,14 @@ namespace libtorrent { namespace
flags |= p->supports_encryption() ? 1 : 0;
#endif
// i->first was added since the last time
if (i->first.address().is_v4())
if (remote.address().is_v4())
{
detail::write_endpoint(i->first, pla_out);
detail::write_endpoint(remote, pla_out);
detail::write_uint8(flags, plf_out);
}
else
{
detail::write_endpoint(i->first, pla6_out);
detail::write_endpoint(remote, pla6_out);
detail::write_uint8(flags, plf6_out);
}
++num_added;
@ -156,7 +158,7 @@ namespace libtorrent { namespace
}
for (std::set<tcp::endpoint>::const_iterator i = dropped.begin()
, end(dropped.end());i != end; ++i)
, end(dropped.end()); i != end; ++i)
{
if (i->address().is_v4())
detail::write_endpoint(*i, pld_out);
@ -327,13 +329,14 @@ namespace libtorrent { namespace
for (torrent::peer_iterator i = m_torrent.begin()
, end(m_torrent.end()); i != end; ++i)
{
if (!send_peer(*i->second)) continue;
peer_connection* peer = *i;
if (!send_peer(*peer)) continue;
// don't write too big of a package
if (num_added >= max_peer_entries) break;
// only send proper bittorrent peers
bt_peer_connection* p = dynamic_cast<bt_peer_connection*>(i->second);
bt_peer_connection* p = dynamic_cast<bt_peer_connection*>(peer);
if (!p) continue;
// no supported flags to set yet
@ -343,15 +346,16 @@ namespace libtorrent { namespace
#ifndef TORRENT_DISABLE_ENCRYPTION
flags |= p->supports_encryption() ? 1 : 0;
#endif
tcp::endpoint const& remote = peer->remote();
// i->first was added since the last time
if (i->first.address().is_v4())
if (remote.address().is_v4())
{
detail::write_endpoint(i->first, pla_out);
detail::write_endpoint(remote, pla_out);
detail::write_uint8(flags, plf_out);
}
else
{
detail::write_endpoint(i->first, pla6_out);
detail::write_endpoint(remote, pla6_out);
detail::write_uint8(flags, plf6_out);
}
++num_added;