From 24e4c197c9a4e2868fe972bbe0543352182c999e Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sun, 14 Dec 2003 05:56:12 +0000 Subject: [PATCH] *** empty log message *** --- docs/index.html | 17 +- docs/index.rst | 19 +- examples/client_test.cpp | 31 ++- include/libtorrent/peer_connection.hpp | 12 +- include/libtorrent/policy.hpp | 44 +++-- include/libtorrent/socket.hpp | 1 + include/libtorrent/stat.hpp | 4 - include/libtorrent/torrent.hpp | 18 +- include/libtorrent/torrent_handle.hpp | 6 + src/peer_connection.cpp | 36 ++-- src/policy.cpp | 259 +++++++++++++++++++++---- src/session.cpp | 142 ++++++++++++-- src/storage.cpp | 22 ++- src/torrent.cpp | 36 ++-- src/torrent_handle.cpp | 29 +++ 15 files changed, 512 insertions(+), 164 deletions(-) diff --git a/docs/index.html b/docs/index.html index 9ab1124c9..4f4ab5ce3 100755 --- a/docs/index.html +++ b/docs/index.html @@ -90,17 +90,19 @@ The current state includes the following features:

thread-safe library interface. (i.e. There's no way for the user to cause a deadlock).
  • can limit the upload bandwidth usage
  • piece-wise file allocation
  • +
  • upload rate limit, balanced depending on download speed and upload bandwidth
  • Functions that are yet to be implemented:

    libtorrent is portable at least among windows, macosx, and UNIX-systems. It uses boost.thread, @@ -407,6 +409,8 @@ struct torrent_handle boost::filsystem::path save_path() const; + void set_max_uploads(int max_uploads); + sha1_hash info_hash() const; bool operator==(const torrent_handle&) const; @@ -416,10 +420,12 @@ struct torrent_handle

    The default constructor will initialize the handle to an invalid state. Which means you cannot perform any operation on it, unless you first assign it a valid handle. If you try to perform -any operation they will simply return.

    +any operation on an uninitialized handle, it will throw invalid_handle.

    save_path() returns the path that was given to add_torrent() when this torrent was started.

    info_hash() returns the info hash for the torrent.

    +

    set_max_uploads() sets the maximum number of peers that's unchoked at the same time on this +torrent. If you set this to -1, there will be no limit.

    status()

    status() will return a structure with information about the status of this @@ -904,6 +910,7 @@ int main(int argc, char* argv[])

    Aknowledgements

    Written by Arvid Norberg and Daniel Wallin. Copyright (c) 2003

    +

    Contributions by Magnus Jonsson

    Project is hosted by sourceforge.

    sf_logo

    diff --git a/docs/index.rst b/docs/index.rst index 4344cc47b..2c23b2dd1 100755 --- a/docs/index.rst +++ b/docs/index.rst @@ -41,18 +41,20 @@ The current state includes the following features: thread-safe library interface. (i.e. There's no way for the user to cause a deadlock). * can limit the upload bandwidth usage * piece-wise file allocation + * upload rate limit, balanced depending on download speed and upload bandwidth __ http://home.elp.rr.com/tur/multitracker-spec.txt .. _Azureus: http://azureus.sourceforge.net Functions that are yet to be implemented: - * optimistic unchoke - * choke/unchoke algorithm - * Snubbing + * more generous optimistic unchoke + * better choke/unchoke algorithm * fast resume + * number of connections limit + * better handling of peers that send bad data + * ip-filters * file-level piece priority - * a good upload speed cap (the one currently used don't balance loads between peers) libtorrent is portable at least among windows, macosx, and UNIX-systems. It uses boost.thread, boost.filesystem boost.date_time and various other boost libraries and zlib. @@ -426,6 +428,8 @@ Its declaration looks like this:: boost::filsystem::path save_path() const; + void set_max_uploads(int max_uploads); + sha1_hash info_hash() const; bool operator==(const torrent_handle&) const; @@ -435,13 +439,16 @@ Its declaration looks like this:: The default constructor will initialize the handle to an invalid state. Which means you cannot perform any operation on it, unless you first assign it a valid handle. If you try to perform -any operation they will simply return. +any operation on an uninitialized handle, it will throw ``invalid_handle``. ``save_path()`` returns the path that was given to ``add_torrent()`` when this torrent was started. ``info_hash()`` returns the info hash for the torrent. +``set_max_uploads()`` sets the maximum number of peers that's unchoked at the same time on this +torrent. If you set this to -1, there will be no limit. + status() ~~~~~~~~ @@ -990,6 +997,8 @@ Aknowledgements Written by Arvid Norberg and Daniel Wallin. Copyright (c) 2003 +Contributions by Magnus Jonsson + Project is hosted by sourceforge. |sf_logo|__ diff --git a/examples/client_test.cpp b/examples/client_test.cpp index 2fcbbb6b3..b4207de4c 100755 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -36,7 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -//#include +#include #include "libtorrent/entry.hpp" #include "libtorrent/bencode.hpp" @@ -75,7 +75,7 @@ void clear() HANDLE h = GetStdHandle(STD_OUTPUT_HANDLE); COORD c = {0, 0}; DWORD n; - FillConsoleOutputCharacter(h, ' ', 80 * 80, c, &n); + FillConsoleOutputCharacter(h, ' ', 120 * 80, c, &n); } #else @@ -185,7 +185,7 @@ int main(int argc, char* argv[]) session s(6881, "E\x1"); // limit upload rate to 100 kB/s -// s.set_upload_rate_limit(100 * 1024); + s.set_upload_rate_limit(100 * 1024); s.set_http_settings(settings); for (int i = 0; i < argc-1; ++i) @@ -198,6 +198,7 @@ int main(int argc, char* argv[]) torrent_info t(e); t.print(std::cout); handles.push_back(s.add_torrent(t, "")); + handles.back().set_max_uploads(20); } catch (std::exception& e) { @@ -246,15 +247,7 @@ int main(int argc, char* argv[]) int total_down = s.total_download; int total_up = s.total_upload; int num_peers = peers.size(); -/* - std::cout << boost::format("%f%% p:%d d:(%s) %s/s u:(%s) %s/s\n") - % (s.progress*100) - % num_peers - % add_suffix(total_down) - % add_suffix(down) - % add_suffix(total_up) - % add_suffix(up); -*/ + out.precision(4); out.width(5); out.fill(' '); @@ -276,8 +269,7 @@ int main(int argc, char* argv[]) << "diff: " << add_suffix(total_down - total_up) << "\n"; boost::posix_time::time_duration t = s.next_announce; -// std::cout << "next announce: " << boost::posix_time::to_simple_string(t) << "\n"; - out << "next announce: " << t.hours() << ":" << t.minutes() << ":" << t.seconds() << "\n"; + out << "next announce: " << boost::posix_time::to_simple_string(t) << "\n"; for (std::vector::iterator i = peers.begin(); i != peers.end(); @@ -288,7 +280,7 @@ int main(int argc, char* argv[]) << "u: " << add_suffix(i->up_speed) << "/s " << "(" << add_suffix(i->total_upload) << ") " << "df: " << add_suffix((int)i->total_download - (int)i->total_upload) << " " - << "l: " << add_suffix(i->upload_ceiling) << "/s " + << "l: " << add_suffix(i->upload_limit) << "/s " << "f: " << static_cast((i->flags & peer_info::interesting)?"I":"_") << static_cast((i->flags & peer_info::choked)?"C":"_") @@ -305,13 +297,12 @@ int main(int argc, char* argv[]) if (progress * 20 > j) out << "#"; else out << "-"; } + out << "\n"; } - - out << "\n"; } out << "___________________________________\n"; -/* + i->get_download_queue(queue); for (std::vector::iterator i = queue.begin(); i != queue.end(); @@ -324,13 +315,13 @@ int main(int argc, char* argv[]) { if (i->finished_blocks[j]) out << "#"; else if (i->requested_blocks[j]) out << "="; - else out << "-"; + else out << "."; } out << "|\n"; } out << "___________________________________\n"; -*/ + } clear(); diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index b7aa43870..3af1c2366 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -59,7 +59,17 @@ POSSIBILITY OF SUCH DAMAGE. // TODO: maybe there should be some kind of // per-torrent free-upload counter. All free // download we get is put in there and increases -// the amount of free upload we give. +// the amount of free upload we give. The free upload +// could be distributed to the interest peers +// depending on amount we have downloaded from +// the peer and depending on the share ratio. +// there's no point in giving free upload to +// peers we can trade with. Maybe the free upload +// only should be given to those we are not interested +// in? + +// TODO: the interested flag has to be updated when we +// get pieces. namespace libtorrent { diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index bfb129889..08d0258c0 100755 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -36,14 +36,11 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#include #include #include "libtorrent/peer.hpp" #include "libtorrent/piece_picker.hpp" -// TODO: should be able to close connections with too low bandwidth to save memory - namespace libtorrent { @@ -63,7 +60,7 @@ namespace libtorrent // called when an incoming connection is accepted // return false if the connection closed - bool new_connection(const boost::weak_ptr& c); + bool new_connection(peer_connection& c); // this is called once for every peer we get from // the tracker @@ -79,7 +76,7 @@ namespace libtorrent // the peer has got at least one interesting piece void peer_is_interesting(peer_connection& c); - void piece_finished(peer_connection& c, int index, bool successfully_verified); + void piece_finished(int index, bool successfully_verified); void block_finished(peer_connection& c, piece_block b); @@ -95,23 +92,22 @@ namespace libtorrent // the peer is not interested in our pieces void not_interested(peer_connection& c); + void set_max_uploads(int num_unchoked); + #ifndef NDEBUG bool has_connection(const peer_connection* p); + + void check_invariant(); #endif private: struct peer { - peer(const peer_id& pid) - : id(pid) - , last_optimistically_unchoked(boost::posix_time::second_clock::local_time()) - , connected(boost::posix_time::second_clock::local_time()) - , optimistic_unchokes(0) - , prev_amount_upload(0) - , prev_amount_download(0) - , banned(false) - {} + peer(const peer_id& pid); + + int total_download() const; + int total_upload() const; bool operator==(const peer_id& pid) const { return id == pid; } @@ -130,10 +126,6 @@ namespace libtorrent // or disconnected if it isn't connected right now boost::posix_time::ptime connected; - // the number of optimistic unchokes this peer has - // been given - int optimistic_unchokes; - // this is the accumulated amount of // uploaded and downloaded data to this // peer. It only accounts for what was @@ -151,9 +143,13 @@ namespace libtorrent // if the peer is connected now, this // will refer to a valid peer_connection - boost::weak_ptr connection; + peer_connection* connection; }; + bool unchoke_one_peer(); + peer* find_choke_candidate(); + peer* find_unchoke_candidate(); + // a functor that identifies peers that have disconnected and that // are too old for still being saved. struct old_disconnected_peer @@ -162,7 +158,7 @@ namespace libtorrent { using namespace boost::posix_time; - return p.connection.expired() + return p.connection == 0 && second_clock::local_time() - p.connected > seconds(5*60); } }; @@ -173,6 +169,14 @@ namespace libtorrent int m_num_peers; torrent* m_torrent; + // the total number of unchoked peers at + // any given time. If set to -1, it's unlimited. + // must be 2 or higher otherwise. + int m_max_uploads; + + // the number of unchoked peers + // at any given time + int m_num_unchoked; }; } diff --git a/include/libtorrent/socket.hpp b/include/libtorrent/socket.hpp index b741e4c1a..493d74e6e 100755 --- a/include/libtorrent/socket.hpp +++ b/include/libtorrent/socket.hpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. // TODO: remove the dependency of // platform specific headers here. +// sockaddr_in is hard to get rid of in a nice way #if defined(_WIN32) #include diff --git a/include/libtorrent/stat.hpp b/include/libtorrent/stat.hpp index 67a48742b..ae54ca628 100755 --- a/include/libtorrent/stat.hpp +++ b/include/libtorrent/stat.hpp @@ -75,10 +75,6 @@ namespace libtorrent m_total_upload_protocol += s.m_uploaded_protocol; } - // TODO: these function should take two arguments - // to be able to count both total data sent and also - // count only the actual payload (not counting the - // protocol chatter) void received_bytes(int bytes_payload, int bytes_protocol) { m_downloaded += bytes_payload; diff --git a/include/libtorrent/torrent.hpp b/include/libtorrent/torrent.hpp index e059844ab..867e236f2 100755 --- a/include/libtorrent/torrent.hpp +++ b/include/libtorrent/torrent.hpp @@ -63,18 +63,6 @@ namespace libtorrent struct session_impl; } - // TODO: each torrent should have a status value that - // reflects what's happening to it - // TODO: There should be a maximum number of peers that - // is maintained (if someone disconnects, try to connect to - // anotherone). There should also be a candidate slot where a - // new peer is tried for one minute, and if it has better ownload - // speed than one of the peers currently connected, it will be - // replaced to maximize bandwidth usage. It wil also have to - // depend on how many and which pieces the peers have. - // TODO: In debug mode all pieces that are sent should be checked. - - // a torrent is a class that holds information // for a specific download. It updates itself against // the tracker @@ -88,7 +76,8 @@ namespace libtorrent detail::session_impl& ses , const torrent_info& torrent_file , const boost::filesystem::path& save_path); - ~torrent() {} + + ~torrent(); void abort() { m_abort = true; m_event = event_stopped; } bool is_aborted() const { return m_abort; } @@ -115,7 +104,7 @@ namespace libtorrent torrent_status status() const; - boost::weak_ptr connect_to_peer( + peer_connection& connect_to_peer( const address& a , const peer_id& id); @@ -248,6 +237,7 @@ namespace libtorrent #ifndef NDEBUG virtual void debug_log(const std::string& line); + void check_invariant(); #endif private: diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index 884dae8c5..d1e51e43a 100755 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -115,8 +115,14 @@ namespace libtorrent // to finish all pieces currently in the pipeline, and then // abort the torrent. + // TODO: add finish_file_allocation, which will force the + // torrent to allocate storage for all pieces. + boost::filesystem::path save_path() const; + // -1 means unlimited unchokes + void set_max_uploads(int max_uploads); + const sha1_hash& info_hash() const { return m_info_hash; } diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 1b7206e26..7044eafd6 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -174,8 +174,7 @@ libtorrent::peer_connection::~peer_connection() void libtorrent::peer_connection::set_send_quota(int num_bytes) { - assert(num_bytes <= m_send_quota_limit); - assert(num_bytes >= 0); + assert(num_bytes <= m_send_quota_limit || m_send_quota_limit == -1); if (num_bytes > m_send_quota_limit) num_bytes = m_send_quota_limit; m_send_quota = num_bytes; @@ -438,12 +437,17 @@ bool libtorrent::peer_connection::dispatch_message(int received) r.piece = read_int(&m_recv_buffer[1]); r.start = read_int(&m_recv_buffer[5]); r.length = read_int(&m_recv_buffer[9]); - m_requests.push_back(r); if (!m_choked) { + m_requests.push_back(r); send_buffer_updated(); } + else + { + // ignoring request since we have + // choked this peer + } #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; @@ -575,7 +579,7 @@ bool libtorrent::peer_connection::dispatch_message(int received) { m_torrent->piece_failed(index); } - m_torrent->get_policy().piece_finished(*this, index, verified); + m_torrent->get_policy().piece_finished(index, verified); } break; } @@ -688,9 +692,6 @@ void libtorrent::peer_connection::request_block(piece_block block) std::size_t start_offset = m_send_buffer.size(); m_send_buffer.resize(start_offset + 17); - // TODO: add a timeout to disconnect peer if we don't get any piece messages when - // we have requested. - std::copy(buf, buf + 5, m_send_buffer.begin()+start_offset); start_offset +=5; @@ -741,6 +742,7 @@ void libtorrent::peer_connection::choke() #ifndef NDEBUG (*m_logger) << m_socket->sender().as_string() << " ==> CHOKE\n"; #endif + m_requests.clear(); send_buffer_updated(); } @@ -804,19 +806,15 @@ void libtorrent::peer_connection::second_tick() // client has sent us. This is the mean to // maintain a 1:1 share ratio with all peers. - // TODO: make sure the rate is able to rise if - // both peers uses this technique! It could be - // enough to just have a constant positive bias - // of the send_quota_limit int diff = static_cast(m_statistics.total_download()) - static_cast(m_statistics.total_upload()); - if (diff > m_torrent->torrent_file().piece_length()) + if (diff > 2*m_torrent->block_size()) { // if we have downloaded more than one piece more // than we have uploaded, have an unlimited // upload rate - m_send_quota = -1; + m_send_quota_limit = -1; } else { @@ -824,18 +822,20 @@ void libtorrent::peer_connection::second_tick() // upload rate of 10 kB/s more than we dowlload // if we have uploaded too much, send with a rate of // 10 kB/s less than we receive - if (diff > -32*1024) + int bias = 0; + if (diff > -2*m_torrent->block_size()) { - m_send_quota_limit = m_statistics.download_rate() * 1.5; + bias = m_statistics.download_rate() * .5; + if (bias < 10*1024) bias = 10*1024; } else { - m_send_quota_limit = m_statistics.download_rate() * .5; + bias = -m_statistics.download_rate() * .5; } + m_send_quota_limit = m_statistics.download_rate() + bias; // the maximum send_quota given our download rate from this peer - if (m_send_quota_limit < 500) m_send_quota_limit = 500; + if (m_send_quota_limit < 256) m_send_quota_limit = 256; } - assert(m_send_quota_limit >= 500 || m_send_quota_limit == -1); } // -------------------------- diff --git a/src/policy.cpp b/src/policy.cpp index 0f0e6268c..0ab42c743 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -49,7 +49,11 @@ namespace { // we try to maintain 4 requested blocks in the download // queue - request_queue = 16 + request_queue = 16, + + // the amount of free upload allowed before + // the peer is choked + free_upload_amount = 4 * 16 * 1024 }; @@ -197,18 +201,83 @@ namespace libtorrent void peer_connection::request_piece(int index); const std::vector& peer_connection::download_queue(); - TODO: implement a limit of the number of unchoked peers. - TODO: implement some kind of limit of the number of sockets opened, to use for systems where a user has a limited number - of open file descriptors + of open file descriptors. and for windows which has a buggy tcp-stack. */ - policy::policy(torrent* t) : m_num_peers(0) , m_torrent(t) + , m_max_uploads(-1) + , m_num_unchoked(0) {} + // finds the peer that has the worst download rate + // and returns it. May return 0 if all peers are + // choked. + policy::peer* policy::find_choke_candidate() + { + peer* worst_peer = 0; + int min_weight = std::numeric_limits::max(); + + for (std::vector::iterator i = m_peers.begin(); + i != m_peers.end(); + ++i) + { + peer_connection* c = i->connection; + + if (c == 0) continue; + if (c->is_choked()) continue; + // if the peer isn't interested, just choke it + if (!c->is_peer_interested()) + return &(*i); + + int diff = i->total_download() + - i->total_upload(); + + int weight = c->statistics().download_rate() * 10 + + diff + + (c->has_peer_choked()?-10:10)*1024; + + if (weight > min_weight) continue; + + min_weight = weight; + worst_peer = &(*i); + continue; + } + return worst_peer; + } + + policy::peer* policy::find_unchoke_candidate() + { + // if all of our peers are unchoked, there's + // no left to unchoke + if (m_num_unchoked == m_torrent->num_peers()) + return 0; + + using namespace boost::posix_time; + using namespace boost::gregorian; + + peer* unchoke_peer = 0; + ptime min_time(date(9999,Jan,1)); + + for (std::vector::iterator i = m_peers.begin(); + i != m_peers.end(); + ++i) + { + peer_connection* c = i->connection; + if (c == 0) continue; + if (!c->is_choked()) continue; + if (!c->is_peer_interested()) continue; + if (i->total_download() - i->total_upload() + < -free_upload_amount) continue; + if (i->last_optimistically_unchoked > min_time) continue; + + min_time = i->last_optimistically_unchoked; + unchoke_peer = &(*i); + } + return unchoke_peer; + } void policy::pulse() { @@ -221,34 +290,67 @@ namespace libtorrent , old_disconnected_peer()) , m_peers.end()); - // choke peers that have leeched too much without giving anything back - for (std::vector::iterator i = m_peers.begin(); i != m_peers.end(); ++i) + if (m_max_uploads != -1) { - boost::shared_ptr c = i->connection.lock(); - if (c.get() == 0) continue; - - int downloaded = i->prev_amount_download + c->statistics().total_download(); - int uploaded = i->prev_amount_upload + c->statistics().total_upload(); - - if (uploaded - downloaded > m_torrent->torrent_file().piece_length() - && !c->is_choked()) + // make sure we don't have too many + // unchoked peers + while (m_num_unchoked > m_max_uploads) { - // if we have uploaded more than a piece for free, choke peer and - // wait until we catch up with our download. - c->choke(); + peer* p = find_choke_candidate(); + assert(p); + p->connection->choke(); + --m_num_unchoked; } - else if (uploaded - downloaded <= m_torrent->block_size() - && c->is_choked() && c->is_interesting()) - { - // TODO: if we're not interested in this peer - // we should only unchoke it if it' its turn - // to be optimistically unchoked. - // we have catched up. We have now shared the same amount - // to eachother. Unchoke this peer. - c->unchoke(); + // optimistic unchoke. trade the 'worst' + // unchoked peer with one of the choked + assert(m_num_unchoked <= m_torrent->num_peers()); + peer* p = find_choke_candidate(); + if (p) + { + p->connection->choke(); + --m_num_unchoked; + unchoke_one_peer(); } + + // make sure we have enough + // unchoked peers + while (m_num_unchoked < m_max_uploads && unchoke_one_peer()); } + else + { + // choke peers that have leeched too much without giving anything back + for (std::vector::iterator i = m_peers.begin(); + i != m_peers.end(); + ++i) + { + peer_connection* c = i->connection; + if (c == 0) continue; + + int downloaded = i->total_download(); + int uploaded = i->total_upload(); + + if (downloaded - uploaded < -free_upload_amount + && !c->is_choked()) + { + // if we have uploaded more than a piece for free, choke peer and + // wait until we catch up with our download. + c->choke(); + } + else if (downloaded - uploaded > -free_upload_amount + && c->is_choked() && c->is_peer_interested()) + { + // we have catched up. We have now shared the same amount + // to eachother. Unchoke this peer. + c->unchoke(); + } + } + + } + +#ifndef NDEBUG + check_invariant(); +#endif } void policy::ban_peer(const peer_connection& c) @@ -259,30 +361,29 @@ namespace libtorrent i->banned = true; } - bool policy::new_connection(const boost::weak_ptr& c) + bool policy::new_connection(peer_connection& c) { - boost::shared_ptr con = c.lock(); - assert(con.get() != 0); - if (con.get() == 0) return false; - std::vector::iterator i - = std::find(m_peers.begin(), m_peers.end(), con->get_peer_id()); + = std::find(m_peers.begin(), m_peers.end(), c.get_peer_id()); if (i == m_peers.end()) { + using namespace boost::posix_time; + using namespace boost::gregorian; + // we don't have ny info about this peer. // add a new entry - peer p(con->get_peer_id()); + peer p(c.get_peer_id()); m_peers.push_back(p); i = m_peers.end()-1; } else { - assert(i->connection.expired()); + assert(i->connection == 0); if (i->banned) return false; } i->connected = boost::posix_time::second_clock::local_time(); - i->connection = c; + i->connection = &c; return true; } @@ -293,13 +394,16 @@ namespace libtorrent std::vector::iterator i = std::find(m_peers.begin(), m_peers.end(), id); if (i == m_peers.end()) { + using namespace boost::posix_time; + using namespace boost::gregorian; + // we don't have ny info about this peer. // add a new entry peer p(id); m_peers.push_back(p); i = m_peers.end()-1; } - else if (!i->connection.expired()) + else if (!i->connection == 0) { // this means we're already connected // to this peer. don't connect to @@ -310,7 +414,7 @@ namespace libtorrent if (i->banned) return; i->connected = boost::posix_time::second_clock::local_time(); - i->connection = m_torrent->connect_to_peer(remote, id); + i->connection = &m_torrent->connect_to_peer(remote, id); } catch(network_error&) {} @@ -324,8 +428,7 @@ namespace libtorrent { } - // TODO: the peer_connection argument here should be removed. - void policy::piece_finished(peer_connection& c, int index, bool successfully_verified) + void policy::piece_finished(int index, bool successfully_verified) { // TODO: if verification failed, mark the peers that were involved // in some way @@ -348,15 +451,33 @@ namespace libtorrent } } + // called when a peer is interested in us void policy::interested(peer_connection& c) { // if we're interested in the peer, we unchoke it // and hopes it will unchoke us too - if (c.is_interesting()) c.unchoke(); - } +/* if (c.is_interesting()) + { + c.unchoke(); + ++m_num_unchoked; + } +*/ } + // called when a peer is no longer interested in us void policy::not_interested(peer_connection& c) { + c.not_interested(); + } + + bool policy::unchoke_one_peer() + { + peer* p = find_unchoke_candidate(); + if (p == 0) return false; + + p->connection->unchoke(); + p->last_optimistically_unchoked = boost::posix_time::second_clock::local_time(); + ++m_num_unchoked; + return true; } // this is called whenever a peer connection is closed @@ -370,6 +491,18 @@ namespace libtorrent i->connected = boost::posix_time::second_clock::local_time(); i->prev_amount_download += c.statistics().total_download(); i->prev_amount_upload += c.statistics().total_upload(); + if (!i->connection->is_choked() && !m_torrent->is_aborted()) + { + --m_num_unchoked; + unchoke_one_peer(); + } + i->connection = 0; + } + + void policy::set_max_uploads(int max_uploads) + { + assert(max_uploads > 1 || max_uploads == -1); + m_max_uploads = max_uploads; } void policy::peer_is_interesting(peer_connection& c) @@ -384,5 +517,47 @@ namespace libtorrent { return std::find(m_peers.begin(), m_peers.end(), p->get_peer_id()) != m_peers.end(); } + + void policy::check_invariant() + { + assert(m_max_uploads >= 2 || m_max_uploads == -1); + int actual_unchoked = 0; + for (std::vector::iterator i = m_peers.begin(); + i != m_peers.end(); + ++i) + { + if (!i->connection) continue; + if (!i->connection->is_choked()) actual_unchoked++; + } + assert(actual_unchoked <= m_max_uploads || m_max_uploads == -1); + } #endif + + policy::peer::peer(const peer_id& pid) + : id(pid) + , last_optimistically_unchoked( + boost::gregorian::date(1970,boost::gregorian::Jan,1)) + , connected(boost::posix_time::second_clock::local_time()) + , prev_amount_upload(0) + , prev_amount_download(0) + , banned(false) + {} + + int policy::peer::total_download() const + { + if (connection != 0) + return connection->statistics().total_download() + + prev_amount_download; + else + return prev_amount_download; + } + + int policy::peer::total_upload() const + { + if (connection != 0) + return connection->statistics().total_upload() + + prev_amount_upload; + else + return prev_amount_upload; + } } diff --git a/src/session.cpp b/src/session.cpp index 26f244af6..21dc89972 100755 --- a/src/session.cpp +++ b/src/session.cpp @@ -1,6 +1,6 @@ /* -Copyright (c) 2003, Arvid Norberg +Copyright (c) 2003, Arvid Norberg, Magnus Jonsson All rights reserved. Redistribution and use in source and binary forms, with or without @@ -63,6 +63,41 @@ namespace std namespace { + + // This struct is used by control_upload_rates() below. It keeps + // track how much bandwidth has been allocated to each connection + // and other relevant information to assist in the allocation process. + struct connection_info + { + libtorrent::peer_connection* p; // which peer_connection this info refers to + int allocated_quota; // bandwidth allocated to this peer connection + int quota_limit; // bandwidth limit + int estimated_upload_capacity; // estimated channel bandwidth + + bool operator < (const connection_info &other) const + { + return estimated_upload_capacity < other.estimated_upload_capacity; + } + + int give(int amount) + { + + // if amount > 0, try to add amount to the allocated quota. + // if amount < 0, try to subtract abs(amount) from the allocated quota + // + // Quota will not go above quota_limit or below 0. This means that + // not all the amount given or taken may be accepted. + // + // return value: how much quota was actually added (or subtracted if negative). + + int old_quota=allocated_quota; + allocated_quota+=amount; + allocated_quota=std::min(allocated_quota,quota_limit); + allocated_quota=std::max(0,allocated_quota); + return allocated_quota-old_quota; + } + }; + // adjusts the upload rates of every peer connection // to make sure the sum of all send quotas equals // the given upload_limit. An upload limit of -1 means @@ -92,21 +127,105 @@ namespace } return; } + else + { + // There's an upload limit, so we need to distribute the available + // upload bandwidth among the peer_connections fairly, but not + // wastefully. - // TODO: upload limit support is currently broken - assert(false); + // For each peer_connection, keep some local data about their + // quota limit and estimated upload capacity, and how much quota + // has been allocated to them. + std::vector peer_info; + + for (detail::session_impl::connection_map::iterator i = connections.begin(); + i != connections.end(); + ++i) + { + peer_connection& p = *i->second; + connection_info pi; + + pi.p=&p; + pi.allocated_quota=0; // we haven't given it any bandwith yet + pi.quota_limit=p.send_quota_limit(); + + pi.estimated_upload_capacity= + p.has_data() ? std::max(10,(int)p.statistics().upload_rate()*11/10) + // If there's no data to send, upload capacity is practically 0. + // Here we set it to 1 though, because otherwise it will not be able + // to accept any quota at all, which may upset quota_limit balances. + : 1; + + peer_info.push_back(pi); + } + + // Sum all peer_connections' quota limit to get the total quota limit. + + int sum_total_of_quota_limits=0; + for(int i=0;i0); + + for(int i=0;iset_send_quota(peer_info[i].allocated_quota); + } #ifndef NDEBUG - int sum = 0; + { + int sum_quota = 0; + int sum_quota_limit = 0; for (detail::session_impl::connection_map::iterator i = connections.begin(); i != connections.end(); ++i) { peer_connection& p = *i->second; - sum += p.send_quota(); + sum_quota += p.send_quota(); + + sum_quota_limit += p.send_quota_limit(); + } + assert(abs(sum_quota - std::min(upload_limit,sum_quota_limit)) < 10); } - assert(sum == upload_limit); #endif } } @@ -324,9 +443,6 @@ namespace libtorrent #ifndef NDEBUG (*m_logger) << s->sender().as_string() << " <== INCOMING CONNECTION\n"; #endif - // TODO: the send buffer size should be controllable from the outside -// s->set_send_bufsize(2048); - // TODO: filter ip:s boost::shared_ptr c( @@ -485,14 +601,6 @@ namespace libtorrent ++i; } // distribute the maximum upload rate among the peers - // TODO: implement an intelligent algorithm that - // will shift bandwidth from the peers that can't - // utilize all their assigned bandwidth to the peers - // that actually can maintain the upload rate. - // This should probably be done by accumulating the - // left-over bandwidth to next second. Since the - // the sockets consumes its data in rather big chunks. - control_upload_rates(m_upload_rate, m_connections); diff --git a/src/storage.cpp b/src/storage.cpp index 5e307caab..6443f93db 100755 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -105,6 +105,10 @@ namespace { } +// TODO: implement fast resume. i.e. the possibility to +// supply additional information about which pieces are +// assigned to which slots. + namespace libtorrent { struct thread_safe_storage @@ -608,8 +612,7 @@ namespace libtorrent { for (int i = current_slot; i < m_info.num_pieces(); ++i) { - if (pieces[i]) - continue; + if (pieces[i] && i != current_slot) continue; const sha1_hash& hash = digest[ i == m_info.num_pieces() - 1]->get(); @@ -620,11 +623,20 @@ namespace libtorrent { if (found_piece != -1) { + if (pieces[found_piece]) + { + assert(m_piece_to_slot[found_piece] != -1); + m_slot_to_piece[m_piece_to_slot[found_piece]] = -2; + m_free_slots.push_back(m_piece_to_slot[found_piece]); + } + else + { m_bytes_left -= m_info.piece_size(found_piece); + } - m_piece_to_slot[found_piece] = current_slot; - m_slot_to_piece[current_slot] = found_piece; - pieces[found_piece] = true; + m_piece_to_slot[found_piece] = current_slot; + m_slot_to_piece[current_slot] = found_piece; + pieces[found_piece] = true; } else { diff --git a/src/torrent.cpp b/src/torrent.cpp index 0705c0909..4f4f37c1b 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -168,6 +168,11 @@ namespace libtorrent m_have_pieces.resize(torrent_file.num_pieces(), false); } + torrent::~torrent() + { + if (m_ses.m_abort) m_abort = true; + } + void torrent::tracker_response(const entry& e) { std::vector peer_list; @@ -279,15 +284,14 @@ namespace libtorrent ++i) { if (std::find(downloaders.begin(), downloaders.end(), (*i)->get_peer_id()) - != downloaders.end()) + == downloaders.end()) continue; + + (*i)->received_invalid_data(); + if ((*i)->trust_points() <= -5) { - (*i)->received_invalid_data(); - if ((*i)->trust_points() <= -5) - { - // we don't trust this peer anymore - // ban it. - m_policy->ban_peer(*(*i)); - } + // we don't trust this peer anymore + // ban it. + m_policy->ban_peer(*(*i)); } } @@ -422,11 +426,9 @@ namespace libtorrent #endif } - boost::weak_ptr torrent::connect_to_peer(const address& a, const peer_id& id) + peer_connection& torrent::connect_to_peer(const address& a, const peer_id& id) { boost::shared_ptr s(new socket(socket::tcp, false)); - // TODO: the send buffer size should be controllable from the outside -// s->set_send_bufsize(2048); s->connect(a); boost::shared_ptr c(new peer_connection( m_ses @@ -449,7 +451,7 @@ namespace libtorrent m_ses.m_selector.monitor_readability(s); m_ses.m_selector.monitor_errors(s); // std::cout << "connecting to: " << a.as_string() << ":" << a.port() << "\n"; - return c; + return *c; } void torrent::attach_peer(peer_connection* p) @@ -460,7 +462,7 @@ namespace libtorrent = m_ses.m_connections.find(p->get_socket()); assert(i != m_ses.m_connections.end()); - if (!m_policy->new_connection(i->second)) throw network_error(0); + if (!m_policy->new_connection(*i->second)) throw network_error(0); } void torrent::close_all_connections() @@ -523,6 +525,14 @@ namespace libtorrent #endif } +#ifndef NDEBUG + void torrent::check_invariant() + { + assert(m_num_pieces + == std::count(m_have_pieces.begin(), m_have_pieces.end(), true)); + } +#endif + void torrent::second_tick() { m_time_scaler++; diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 19a82e0b2..f0fd57ac8 100755 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -63,6 +63,35 @@ namespace std namespace libtorrent { + + void torrent_handle::set_max_uploads(int max_uploads) + { + if (m_ses == 0) throw invalid_handle(); + + assert(m_chk != 0); + { + boost::mutex::scoped_lock l(m_ses->m_mutex); + torrent* t = m_ses->find_torrent(m_info_hash); + if (t != 0) + { + t->get_policy().set_max_uploads(max_uploads); + return; + } + } + + + { + boost::mutex::scoped_lock l(m_chk->m_mutex); + + detail::piece_checker_data* d = m_chk->find_torrent(m_info_hash); + if (d != 0) + { + d->torrent_ptr->get_policy().set_max_uploads(max_uploads); + return; + } + } + throw invalid_handle(); + } torrent_status torrent_handle::status() const {