premiere-libtorrent/src/session.cpp

745 lines
18 KiB
C++
Raw Normal View History

/*
Copyright (c) 2003, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <ctime>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <iterator>
#include <algorithm>
#include <set>
#include <cctype>
2003-10-27 16:43:33 +01:00
#include <algorithm>
#include <boost/lexical_cast.hpp>
#include <boost/filesystem/convenience.hpp>
2003-11-28 18:29:27 +01:00
#include <boost/filesystem/exception.hpp>
#include "libtorrent/peer_id.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/url_handler.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/entry.hpp"
#include "libtorrent/session.hpp"
// Compatibility shim for Microsoft compilers reporting _MSC_VER below 1300:
// pulls the global C library functions srand and isprint into namespace std
// so that the std::srand / std::isprint calls further down in this file
// resolve on those compilers.
#if defined(_MSC_VER) && _MSC_VER < 1300
namespace std
{
using ::srand;
using ::isprint;
};
#endif
2003-12-07 06:53:04 +01:00
namespace
{
// adjusts the upload rates of every peer connection
// to make sure the sum of all send quotas equals
// the given upload_limit. An upload limit of -1 means
// unlimited upload rate, but the rates of each peer
// has to be set anyway, since it depends on the download
// rate from the peer.
void control_upload_rates(
int upload_limit
, libtorrent::detail::session_impl::connection_map connections)
{
using namespace libtorrent;
if (connections.empty()) return;
assert(upload_limit != 0);
if (upload_limit == -1)
{
for (detail::session_impl::connection_map::iterator i = connections.begin();
i != connections.end();
++i)
{
// there's no limit, set the quota to max
// allowed
peer_connection& p = *i->second;
p.set_send_quota(p.send_quota_limit());
}
return;
}
2003-12-08 02:37:30 +01:00
// TODO: upload limit support is currently broken
2003-12-07 06:53:04 +01:00
assert(false);
#ifndef NDEBUG
int sum = 0;
for (detail::session_impl::connection_map::iterator i = connections.begin();
i != connections.end();
++i)
{
peer_connection& p = *i->second;
sum += p.send_quota();
}
assert(sum == upload_limit);
#endif
}
}
2003-10-26 18:35:23 +01:00
namespace libtorrent
{
namespace detail
{
2003-10-31 05:02:51 +01:00
// Entry point of the checker thread.
//
// Repeatedly takes the torrent at the front of m_torrents, runs
// check_files() on it and, unless the job was aborted in the meantime,
// inserts the torrent into the session's torrent map. The job is then
// removed from the queue. The function returns as soon as m_abort is
// observed while waiting for work.
void checker_impl::operator()()
{
    eh_initializer();
    for (;;)
    {
        piece_checker_data* t;
        {
            boost::mutex::scoped_lock l(m_mutex);

            // if the job queue is empty and we shouldn't abort,
            // wait for a signal. The predicate is re-tested in a
            // loop, because a condition wait may return without
            // anybody having signalled it (spurious wakeup); falling
            // through with an empty queue would call front() on an
            // empty deque.
            while (m_torrents.empty() && !m_abort)
                m_cond.wait(l);

            if (m_abort) return;

            assert(!m_torrents.empty());

            t = &m_torrents.front();
            if (t->abort)
            {
                // the job was cancelled before we got to it
                m_torrents.pop_front();
                continue;
            }
        }

        try
        {
            assert(t != 0);
            t->torrent_ptr->check_files(*t, m_mutex);

            // lock the checker first, then the session,
            // to add the new torrent
            boost::mutex::scoped_lock l(m_mutex);
            if (!t->abort)
            {
                boost::mutex::scoped_lock session_lock(m_ses->m_mutex);
                m_ses->m_torrents.insert(
                    std::make_pair(t->info_hash, t->torrent_ptr));
            }
        }
        catch(const std::exception& e)
        {
#ifndef NDEBUG
            std::cerr << "error while checking files: " << e.what() << "\n";
#endif
        }
        catch(...)
        {
#ifndef NDEBUG
            std::cerr << "error while checking files\n";
#endif
        }

        // remove ourself from the 'checking'-list
        // (we're no longer in the checking state)
        boost::mutex::scoped_lock l(m_mutex);
        m_torrents.pop_front();
    }
}
// Linear search of the checker queue for the job whose info-hash
// equals info_hash. Returns a pointer to the queue entry, or 0 if no
// entry matches. The function takes no lock itself.
detail::piece_checker_data* checker_impl::find_torrent(const sha1_hash& info_hash)
{
    typedef std::deque<piece_checker_data>::iterator queue_iter;

    queue_iter pos = m_torrents.begin();
    while (pos != m_torrents.end())
    {
        if (pos->info_hash == info_hash) break;
        ++pos;
    }

    if (pos == m_torrents.end()) return 0;
    return &(*pos);
}
// Constructs the session state and generates this client's peer id.
//
// listen_port: the first port the session will try to listen on.
// cl_fprint: the client fingerprint; at most its first 7 characters
//     are used.
//
// Peer id layout: the (truncated) fingerprint, zero padding up to
// byte 12, then random bytes up to the end of m_peer_id.
session_impl::session_impl(int listen_port,
    const std::string& cl_fprint)
    : m_abort(false)
    , m_tracker_manager(m_settings)
    , m_listen_port(listen_port)
    , m_upload_rate(-1)
{
    // ---- generate a peer id ----

    std::srand(std::time(0));

    // len1: number of fingerprint characters used
    // len2: number of zero bytes padding the fingerprint to 12 bytes
    const int len1 = std::min(cl_fprint.length(), (std::size_t)7);
    const int len2 = 12 - len1;
    assert(len1 + len2 == 12);

    // the client's fingerprint. Only len1 characters may be copied:
    // copying len2 characters would read past the end of a
    // fingerprint shorter than len2.
    std::copy(cl_fprint.begin(), cl_fprint.begin() + len1, m_peer_id.begin());

    // the zeros
    std::fill(m_peer_id.begin() + len1, m_peer_id.begin() + len1 + len2, 0);

    // the random number
    for (unsigned char* i = m_peer_id.begin() + len1 + len2;
        i != m_peer_id.end();
        ++i)
    {
        *i = rand();
    }
}
2003-10-31 05:02:51 +01:00
// Entry point of the main session thread.
//
// 1. Opens the listen socket, advancing m_listen_port by one each time
//    listen() throws, and rethrowing once the port has moved past
//    max_port (the initial port + 9).
// 2. Runs the network loop: waits on the selector (at most 0.5 s),
//    lets readable sockets receive, writable sockets send, and drops
//    sockets that reported an error or whose connection threw.
// 3. Roughly once per second: distributes upload quotas, times out or
//    keeps alive every connection, removes aborted torrents, queues
//    tracker requests and ticks the tracker manager.
// 4. When m_abort is set: aborts all torrents, queues their final
//    tracker requests, drops all connections and torrents, leaves the
//    loop and keeps ticking the tracker manager until all requests
//    have been sent.
//
// In debug builds (NDEBUG not defined) the whole body is wrapped in a
// try block that prints any escaping exception to std::cerr.
void session_impl::operator()()
{
    eh_initializer();
#ifndef NDEBUG
    m_logger = create_log("main session");

    try
    {
#endif

    boost::shared_ptr<socket> listener(new socket(socket::tcp, false));
    int max_port = m_listen_port + 9;

    // create listener socket
    for(;;)
    {
        try
        {
            listener->listen(m_listen_port, 5);
        }
        catch(std::exception&)
        {
            // out of ports to try: give up and propagate the error
            if (m_listen_port > max_port)
                throw;
            m_listen_port++;
            continue;
        }
        break;
    }

#ifndef NDEBUG
    (*m_logger) << "listening on port: " << m_listen_port << "\n";
#endif
    m_selector.monitor_readability(listener);
    m_selector.monitor_errors(listener);

    std::vector<boost::shared_ptr<socket> > readable_clients;
    std::vector<boost::shared_ptr<socket> > writable_clients;
    std::vector<boost::shared_ptr<socket> > error_clients;
    boost::posix_time::ptime timer = boost::posix_time::second_clock::local_time();

#ifndef NDEBUG
    int loops_per_second = 0;
#endif
    for(;;)
    {
#ifndef NDEBUG
        assert_invariant();
        loops_per_second++;
#endif

        // if nothing happens within 500000 microseconds (0.5 seconds)
        // do the loop anyway to check if anything else has changed
        // << "sleeping\n";
        m_selector.wait(500000, readable_clients, writable_clients, error_clients);

        boost::mutex::scoped_lock l(m_mutex);

        // +1 for the listen socket
        assert(m_selector.count_read_monitors() == m_connections.size() + 1);

        if (m_abort)
        {
            m_tracker_manager.abort_all_requests();
            for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i =
                    m_torrents.begin();
                i != m_torrents.end();
                ++i)
            {
                i->second->abort();
                m_tracker_manager.queue_request(i->second->generate_tracker_request(m_listen_port));
            }
            m_connections.clear();
            m_torrents.clear();
            break;
        }

#ifndef NDEBUG
        assert_invariant();
#endif

        // ************************
        // RECEIVE SOCKETS
        // ************************

        // let the readable clients receive data
        for (std::vector<boost::shared_ptr<socket> >::iterator i = readable_clients.begin();
            i != readable_clients.end();
            ++i)
        {
            // special case for listener socket
            if (*i == listener)
            {
                boost::shared_ptr<libtorrent::socket> s = (*i)->accept();
                if (s)
                {
                    // only touch the socket once we know accept()
                    // actually returned one
                    s->set_blocking(false);

                    // we got a connection request!
#ifndef NDEBUG
                    (*m_logger) << s->sender().as_string() << " <== INCOMING CONNECTION\n";
#endif
                    // TODO: the send buffer size should be controllable from the outside
                    // s->set_send_bufsize(2048);

                    // TODO: filter ip:s

                    boost::shared_ptr<peer_connection> c(
                        new peer_connection(*this, m_selector, s));

                    // with an upload limit in effect, a new peer starts
                    // without any send quota
                    if (m_upload_rate != -1) c->set_send_quota(0);

                    m_connections.insert(std::make_pair(s, c));
                    m_selector.monitor_readability(s);
                    m_selector.monitor_errors(s);
                }
                continue;
            }

            connection_map::iterator p = m_connections.find(*i);
            if(p == m_connections.end())
            {
                m_selector.remove(*i);
            }
            else
            {
                try
                {
                    // (*m_logger) << "readable: " << p->first->sender().as_string() << "\n";
                    p->second->receive_data();
                }
                catch(std::exception&)
                {
                    // the connection wants to disconnect for some reason, remove it
                    // from the connection-list
                    m_selector.remove(*i);
                    m_connections.erase(p);
                }
            }
        }

#ifndef NDEBUG
        assert_invariant();
#endif

        // ************************
        // SEND SOCKETS
        // ************************

        // let the writable clients send data
        for (std::vector<boost::shared_ptr<socket> >::iterator i
                = writable_clients.begin();
            i != writable_clients.end();
            ++i)
        {
            connection_map::iterator p = m_connections.find(*i);
            // the connection may have been disconnected in the receive phase
            if (p == m_connections.end())
            {
                m_selector.remove(*i);
            }
            else
            {
                try
                {
                    assert(m_selector.is_writability_monitored(p->first));
                    assert(p->second->has_data());
                    p->second->send_data();
                }
                catch(std::exception&)
                {
                    // the connection wants to disconnect for some reason, remove it
                    // from the connection-list
                    m_selector.remove(*i);
                    m_connections.erase(p);
                }
            }
        }

        // ************************
        // ERROR SOCKETS
        // ************************

        // disconnect the one we couldn't connect to
        for (std::vector<boost::shared_ptr<socket> >::iterator i = error_clients.begin();
            i != error_clients.end();
            ++i)
        {
            connection_map::iterator p = m_connections.find(*i);

            m_selector.remove(*i);
            // the connection may have been disconnected in the receive or send phase
            if (p != m_connections.end()) m_connections.erase(p);
        }

#ifndef NDEBUG
        assert_invariant();
#endif

        // compare the whole elapsed duration against one second.
        // time_duration::seconds() only yields the seconds field of
        // the normalized duration, which is 0 again whenever a whole
        // number of minutes has elapsed.
        boost::posix_time::time_duration d = boost::posix_time::second_clock::local_time() - timer;
        if (d < boost::posix_time::seconds(1)) continue;
        timer = boost::posix_time::second_clock::local_time();

        // ************************
        // THE SECTION BELOW IS EXECUTED ONCE EVERY SECOND
        // ************************

#ifndef NDEBUG
        // std::cout << "\n\nloops: " << loops_per_second << "\n";
        loops_per_second = 0;
#endif

        // distribute the maximum upload rate among the peers
        // TODO: implement an intelligent algorithm that
        // will shift bandwidth from the peers that can't
        // utilize all their assigned bandwidth to the peers
        // that actually can maintain the upload rate.
        // This should probably be done by accumulating the
        // left-over bandwidth to next second, since the
        // sockets consume their data in rather big chunks.
        control_upload_rates(m_upload_rate, m_connections);

        // purge connections that have timed out and
        // keep the remaining ones alive
        for (connection_map::iterator i = m_connections.begin();
            i != m_connections.end();)
        {
            // advance before a possible erase so that the loop
            // iterator never refers to an erased element
            connection_map::iterator j = i;
            ++i;
            // if this socket has timed out
            // close it.
            if (j->second->has_timed_out())
            {
                m_selector.remove(j->first);
                m_connections.erase(j);
                continue;
            }

            j->second->keep_alive();
        }

        // check each torrent for abortion or
        // tracker updates
        for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
                = m_torrents.begin();
            i != m_torrents.end();)
        {
            if (i->second->is_aborted())
            {
                m_tracker_manager.queue_request(
                    i->second->generate_tracker_request(m_listen_port));
                i->second->close_all_connections();
#ifndef NDEBUG
                sha1_hash i_hash = i->second->torrent_file().info_hash();
#endif
                std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator j = i;
                ++i;
                m_torrents.erase(j);
                assert(m_torrents.find(i_hash) == m_torrents.end());
                continue;
            }
            else if (i->second->should_request())
            {
                m_tracker_manager.queue_request(
                    i->second->generate_tracker_request(m_listen_port),
                    boost::get_pointer(i->second));
            }

            i->second->second_tick();
            ++i;
        }
        m_tracker_manager.tick();

#ifndef NDEBUG
        (*m_logger) << "peers: " << m_connections.size() << " \n";
        for (connection_map::iterator i = m_connections.begin();
            i != m_connections.end();
            ++i)
        {
            (*m_logger) << "h: " << i->first->sender().as_string()
                << " | down: " << i->second->statistics().download_rate()
                << " b/s | up: " << i->second->statistics().upload_rate()
                << " b/s \n";
        }
#endif
    }

    // the loop has been left because of an abort; keep the tracker
    // manager running until the queued requests have been sent
    while (!m_tracker_manager.send_finished())
    {
        m_tracker_manager.tick();

        // sleep for 0.1 seconds between ticks
        // NOTE(review): nsec is not normalized after the addition;
        // confirm that boost::thread::sleep accepts nsec >= 1e9.
        boost::xtime t;
        boost::xtime_get(&t, boost::TIME_UTC);
        t.nsec += 100000000;
        boost::thread::sleep(t);
    }

#ifndef NDEBUG
    }
    catch(std::bad_cast& e)
    {
        std::cerr << e.what() << "\n";
    }
    catch(std::exception& e)
    {
        std::cerr << e.what() << "\n";
    }
    catch(...)
    {
        std::cerr << "error!\n";
    }
#endif
}
2003-10-27 16:43:33 +01:00
// the return value from this function is valid only as long as the
// session is locked!
2003-10-31 05:02:51 +01:00
torrent* session_impl::find_torrent(const sha1_hash& info_hash)
{
2003-10-27 16:43:33 +01:00
std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
= m_torrents.find(info_hash);
if (i != m_torrents.end()) return boost::get_pointer(i->second);
2003-10-30 00:28:09 +01:00
return 0;
}
2003-10-27 16:43:33 +01:00
2003-11-09 19:17:09 +01:00
#ifndef NDEBUG
// Creates the logger for the given log name (debug builds only).
// The file name is built as "libtorrent_log_" + name + ".log".
// With TORRENT_VERBOSE_LOGGING defined a file_logger writing to that
// file is returned, otherwise a null_logger.
boost::shared_ptr<logger> session_impl::create_log(std::string name)
{
    name.insert(0, "libtorrent_log_");
    name += ".log";

    // current options are file_logger and cout_logger
#if defined(TORRENT_VERBOSE_LOGGING)
    boost::shared_ptr<logger> log(new file_logger(name.c_str()));
#else
    boost::shared_ptr<logger> log(new null_logger());
#endif
    return log;
}
#endif
#ifndef NDEBUG
void session_impl::assert_invariant()
{
for (connection_map::iterator i = m_connections.begin();
i != m_connections.end();
++i)
{
assert(i->second->has_data() == m_selector.is_writability_monitored(i->first));
if (i->second->associated_torrent())
{
assert(i->second->associated_torrent()
->get_policy().has_connection(boost::get_pointer(i->second)));
}
}
}
#endif
}
// Builds the session: the implementation object is created from the
// listen port and client fingerprint, the checker is handed a pointer
// to it, and m_thread / m_checker_thread are constructed from
// references to the implementation and the checker respectively.
session::session(int listen_port, const std::string& fingerprint)
    : m_impl(listen_port, fingerprint)
    , m_checker_impl(&m_impl)
    , m_thread(boost::ref(m_impl))
    , m_checker_thread(boost::ref(m_checker_impl))
{
#ifndef NDEBUG
    // sanity check: a reference to the implementation must be
    // storable as a non-empty nullary function object
    boost::function0<void> impl_callable = boost::ref(m_impl);
    assert(!impl_callable.empty());
#endif
}
2003-11-26 15:54:56 +01:00
// Queues a torrent for file checking and returns a handle to it.
//
// ti: the torrent's metadata.
// save_path: where the torrent's files are stored.
//
// Throws duplicate_torrent if a torrent with the same info-hash is
// already active in the session or waiting in the checker queue.
//
// TODO: add a check to see if filenames are accepted on the
// current platform.
torrent_handle session::add_torrent(
    const torrent_info& ti
    , const boost::filesystem::path& save_path)
{
    // is the torrent already active? (checked under the session lock)
    {
        boost::mutex::scoped_lock session_lock(m_impl.m_mutex);
        if (m_impl.find_torrent(ti.info_hash()) != 0)
            throw duplicate_torrent();
    }

    // is the torrent currently being checked?
    // (checked under the checker lock)
    {
        boost::mutex::scoped_lock checker_lock(m_checker_impl.m_mutex);
        if (m_checker_impl.find_torrent(ti.info_hash()) != 0)
            throw duplicate_torrent();
    }

    // build the torrent and the job description for the checker
    // thread before the queue is touched
    boost::shared_ptr<torrent> new_torrent(new torrent(m_impl, ti, save_path));

    detail::piece_checker_data job;
    job.torrent_ptr = new_torrent;
    job.save_path = save_path;
    job.info_hash = ti.info_hash();

    // with the checker locked, enqueue the job and wake the
    // checker thread up
    boost::mutex::scoped_lock checker_lock(m_checker_impl.m_mutex);
    m_checker_impl.m_torrents.push_back(job);
    m_checker_impl.m_cond.notify_one();

    return torrent_handle(&m_impl, &m_checker_impl, ti.info_hash());
}
2003-11-08 03:16:26 +01:00
void session::remove_torrent(const torrent_handle& h)
{
if (h.m_ses != &m_impl) return;
2003-11-28 18:29:27 +01:00
assert(h.m_chk == &m_checker_impl);
{
boost::mutex::scoped_lock l(m_impl.m_mutex);
torrent* t = m_impl.find_torrent(h.m_info_hash);
if (t != 0)
{
t->abort();
return;
}
}
{
boost::mutex::scoped_lock l(m_checker_impl.m_mutex);
detail::piece_checker_data* d = m_checker_impl.find_torrent(h.m_info_hash);
if (d != 0)
{
d->abort = true;
return;
}
}
2003-11-08 03:16:26 +01:00
}
// Replaces the session's HTTP settings with a copy of s.
// The assignment is made while holding the session mutex.
void session::set_http_settings(const http_settings& s)
{
    boost::mutex::scoped_lock session_lock(m_impl.m_mutex);

    m_impl.m_settings = s;
}
// Shuts the session down: raises the abort flag of the main loop and
// of the checker (each under its own mutex), cancels the job at the
// front of the checker queue, wakes the checker thread up and then
// joins both threads.
session::~session()
{
    // ask the main session loop to stop
    {
        boost::mutex::scoped_lock session_lock(m_impl.m_mutex);
        m_impl.m_abort = true;
    }

    // ask the checker thread to stop
    {
        boost::mutex::scoped_lock checker_lock(m_checker_impl.m_mutex);

        // abort the checker thread
        m_checker_impl.m_abort = true;

        // abort the currently checking torrent
        if (!m_checker_impl.m_torrents.empty())
            m_checker_impl.m_torrents.front().abort = true;

        // the checker may be blocked waiting for work
        m_checker_impl.m_cond.notify_one();
    }

    m_thread.join();
    m_checker_thread.join();
}
2003-11-09 19:17:09 +01:00
void session::set_upload_rate_limit(int bytes_per_second)
{
2003-11-10 14:15:41 +01:00
assert(bytes_per_second > 0 || bytes_per_second == -1);
2003-11-09 19:17:09 +01:00
boost::mutex::scoped_lock l(m_impl.m_mutex);
m_impl.m_upload_rate = bytes_per_second;
if (m_impl.m_upload_rate != -1 || !m_impl.m_connections.empty())
return;
for (detail::session_impl::connection_map::iterator i
= m_impl.m_connections.begin();
i != m_impl.m_connections.end();)
{
i->second->set_send_quota(-1);
}
}
2003-11-29 17:34:07 +01:00
// Hands the caller whatever the session's alert container returns
// from get(); ownership is transferred through the auto_ptr.
std::auto_ptr<alert> session::pop_alert()
{
    std::auto_ptr<alert> next(m_impl.m_alerts.get());
    return next;
}
// TODO: document
2003-10-31 05:02:51 +01:00
// TODO: if the first 4 charachters are printable
// maybe they should be considered a fingerprint?
std::string extract_fingerprint(const peer_id& p)
{
std::string ret;
const unsigned char* c = p.begin();
while (c != p.end() && *c != 0)
{
2003-10-28 02:20:50 +01:00
if (std::isprint(*c))
ret += *c;
else if (*c <= 9)
ret += '0'+ *c;
else
return std::string();
++c;
}
if (c == p.end()) return std::string();
return ret;
}
}