From 09a1023eb3c3dcade0c18315c389f107c2c2cc16 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 18 Feb 2010 04:37:02 +0000 Subject: [PATCH] fixed issue with UDP over SOCKS5. Added a udp tracker to the unit test to make sure it works. Added tracker tests for all proxies --- ChangeLog | 1 + include/libtorrent/udp_socket.hpp | 20 +- include/libtorrent/udp_tracker_connection.hpp | 1 + src/udp_socket.cpp | 29 ++- src/udp_tracker_connection.cpp | 6 + test/setup_transfer.cpp | 175 +++++++++++++++++- test/setup_transfer.hpp | 6 +- test/test_transfer.cpp | 104 ++++++++--- test/test_web_seed.cpp | 2 +- 9 files changed, 300 insertions(+), 44 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2750c7961..2fbc58e4d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -117,6 +117,7 @@ release 0.14.9 connected multiple times * fixed MinGW support * fixed DHT bootstrapping issue + * fixed UDP over SOCKS5 issue release 0.14.8 diff --git a/include/libtorrent/udp_socket.hpp b/include/libtorrent/udp_socket.hpp index 545d46821..b2f76e14f 100644 --- a/include/libtorrent/udp_socket.hpp +++ b/include/libtorrent/udp_socket.hpp @@ -78,6 +78,14 @@ namespace libtorrent bool is_closed() const { return m_abort; } + protected: + + struct queued_packet + { + udp::endpoint ep; + buffer buf; + }; + private: callback_t m_callback; @@ -91,7 +99,7 @@ namespace libtorrent void handshake2(error_code const& e); void handshake3(error_code const& e); void handshake4(error_code const& e); - void socks_forward_udp(); + void socks_forward_udp(mutex::scoped_lock& l); void connect1(error_code const& e); void connect2(error_code const& e); @@ -119,9 +127,14 @@ namespace libtorrent connection_queue& m_cc; tcp::resolver m_resolver; char m_tmp_buf[100]; + bool m_queue_packets; bool m_tunnel_packets; bool m_abort; udp::endpoint m_proxy_addr; + // while we're connecting to the proxy + // we have to queue the packets, we'll flush + // them once we're connected + std::list m_queue; #ifdef TORRENT_DEBUG bool m_started; int m_magic; @@ -137,11 +150,6 @@ namespace libtorrent void close(); private: - struct queued_packet - { - udp::endpoint ep; - buffer buf; - }; void on_tick(error_code const& e); deadline_timer m_timer; diff --git a/include/libtorrent/udp_tracker_connection.hpp b/include/libtorrent/udp_tracker_connection.hpp index 87cb888d0..822c4f0eb 100644 --- a/include/libtorrent/udp_tracker_connection.hpp +++ b/include/libtorrent/udp_tracker_connection.hpp @@ -124,6 +124,7 @@ namespace libtorrent }; static std::map m_connection_cache; + static mutex m_cache_mutex; action_t m_state; }; diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 65e040a2d..9cd10810e 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -60,6 +60,7 @@ udp_socket::udp_socket(asio::io_service& ios, udp_socket::callback_t const& c , m_connection_ticket(-1) , m_cc(cc) , m_resolver(ios) + , m_queue_packets(false) , m_tunnel_packets(false) , m_abort(false) { @@ -107,6 +108,15 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_cod return; } + if (m_queue_packets) + { + m_queue.push_back(queued_packet()); + queued_packet& qp = m_queue.back(); + qp.ep = ep; + qp.buf.insert(qp.buf.begin(), p, p + len); + return; + } + #if TORRENT_USE_IPV6 if (ep.address().is_v4() && m_ipv4_sock.is_open()) #endif @@ -452,6 +462,7 @@ void udp_socket::set_proxy_settings(proxy_settings const& ps) if (ps.type == proxy_settings::socks5 || ps.type == proxy_settings::socks5_pw) { + m_queue_packets = true; // connect to socks5 server and open up the UDP tunnel tcp::resolver::query q(ps.hostname, to_string(ps.port).elems); m_resolver.async_resolve(q, boost::bind( @@ -553,7 +564,7 @@ void udp_socket::handshake2(error_code const& e) if (method == 0) { - socks_forward_udp(); + socks_forward_udp(l); } else if (method == 2) { @@ -609,16 +620,14 @@ void udp_socket::handshake4(error_code const& e) if (version != 1) return; if (status != 0) return; - socks_forward_udp(); + socks_forward_udp(l); } -void udp_socket::socks_forward_udp() +void udp_socket::socks_forward_udp(mutex::scoped_lock& l) { CHECK_MAGIC; using namespace libtorrent::detail; - mutex::scoped_lock l(m_mutex); - // send SOCKS5 UDP command char* p = &m_tmp_buf[0]; write_uint8(5, p); // SOCKS VERSION 5 @@ -673,6 +682,16 @@ void udp_socket::connect2(error_code const& e) } m_tunnel_packets = true; + m_queue_packets = false; + + // forward all packets that were put in the queue + while (!m_queue.empty()) + { + queued_packet const& p = m_queue.front(); + error_code ec; + udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec); + m_queue.pop_front(); + } } rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios diff --git a/src/udp_tracker_connection.cpp b/src/udp_tracker_connection.cpp index 6f6d5dd5d..e8efa2851 100644 --- a/src/udp_tracker_connection.cpp +++ b/src/udp_tracker_connection.cpp @@ -62,6 +62,8 @@ namespace libtorrent std::map udp_tracker_connection::m_connection_cache; + mutex udp_tracker_connection::m_cache_mutex; + udp_tracker_connection::udp_tracker_connection( io_service& ios , connection_queue& cc @@ -195,6 +197,7 @@ namespace libtorrent return; } + mutex::scoped_lock l(m_cache_mutex); std::map::iterator cc = m_connection_cache.find(m_target.address()); if (cc != m_connection_cache.end()) @@ -212,6 +215,7 @@ namespace libtorrent // if it expired, remove it from the cache m_connection_cache.erase(cc); } + l.unlock(); send_udp_connect(); } @@ -328,6 +332,8 @@ namespace libtorrent m_transaction_id = 0; m_attempts = 0; boost::uint64_t connection_id = detail::read_int64(buf); + + mutex::scoped_lock l(m_cache_mutex); connection_cache_entry& cce = m_connection_cache[m_target.address()]; cce.connection_id = connection_id; cce.expires = time_now() + seconds(m_ses.m_settings.udp_tracker_token_expiry); diff --git a/test/setup_transfer.cpp b/test/setup_transfer.cpp index ebf4c81a8..df471232a 100644 --- a/test/setup_transfer.cpp +++ b/test/setup_transfer.cpp @@ -168,7 +168,8 @@ boost::intrusive_ptr clone_ptr(boost::intrusive_ptr const& ptr) return boost::intrusive_ptr(new T(*ptr)); } -boost::intrusive_ptr create_torrent(std::ostream* file, int piece_size, int num_pieces) +boost::intrusive_ptr create_torrent(std::ostream* file, int piece_size + , int num_pieces, bool add_tracker) { char const* tracker_url = "http://non-existent-name.com/announce"; // excercise the path when encountering invalid urls @@ -179,9 +180,12 @@ boost::intrusive_ptr create_torrent(std::ostream* file, int piece_ int total_size = piece_size * num_pieces; fs.add_file("temporary", total_size); libtorrent::create_torrent t(fs, piece_size); - t.add_tracker(tracker_url); - t.add_tracker(invalid_tracker_url); - t.add_tracker(invalid_tracker_protocol); + if (add_tracker) + { + t.add_tracker(tracker_url); + t.add_tracker(invalid_tracker_url); + t.add_tracker(invalid_tracker_protocol); + } std::vector piece(piece_size); for (int i = 0; i < int(piece.size()); ++i) @@ -221,7 +225,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 assert(ses1); assert(ses2); - session_settings sess_set; + session_settings sess_set = ses1->settings(); sess_set.allow_multiple_connections_per_ip = true; sess_set.ignore_limits_on_local_network = false; ses1->set_settings(sess_set); @@ -251,7 +255,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 error_code ec; create_directory("./tmp1" + suffix, ec); std::ofstream file(("./tmp1" + suffix + "/temporary").c_str()); - t = ::create_torrent(&file, piece_size, 19); + t = ::create_torrent(&file, piece_size, 19, true); file.close(); if (clear_files) { @@ -335,6 +339,151 @@ setup_transfer(session* ses1, session* ses2, session* ses3 return boost::make_tuple(tor1, tor2, tor3); } +boost::asio::io_service* tracker_ios = 0; +boost::shared_ptr tracker_server; +libtorrent::mutex tracker_lock; +libtorrent::condition tracker_initialized; + +bool udp_failed = false; + +void stop_tracker() +{ + if (tracker_server && tracker_ios) + { + tracker_ios->stop(); + tracker_server->join(); + tracker_server.reset(); + tracker_ios = 0; + } +} + +void udp_tracker_thread(int* port); + +int start_tracker() +{ + stop_tracker(); + + { + mutex::scoped_lock l(tracker_lock); + tracker_initialized.clear(l); + } + + int port = 0; + + tracker_server.reset(new libtorrent::thread(boost::bind(&udp_tracker_thread, &port))); + + { + mutex::scoped_lock l(tracker_lock); + tracker_initialized.wait(l); + } + test_sleep(100); + return port; +} + +void on_udp_receive(error_code const& ec, size_t bytes_transferred, udp::endpoint const* from, char* buffer, udp::socket* sock) +{ + if (ec) + { + fprintf(stderr, "UDP tracker, read failed: %s\n", ec.message().c_str()); + return; + } + + udp_failed = false; + + if (bytes_transferred < 16) + { + fprintf(stderr, "UDP message too short\n"); + return; + } + char* ptr = buffer; + boost::uint64_t connection_id = detail::read_uint64(ptr); + boost::uint32_t action = detail::read_uint32(ptr); + boost::uint32_t transaction_id = detail::read_uint32(ptr); + + error_code e; + + switch (action) + { + case 0: // connect + + ptr = buffer; + detail::write_uint32(0, ptr); // action = connect + detail::write_uint32(transaction_id, ptr); // transaction_id + detail::write_uint64(10, ptr); // connection_id + sock->send_to(asio::buffer(buffer, 16), *from, 0, e); + break; + + case 1: // announce + + ptr = buffer; + detail::write_uint32(1, ptr); // action = announce + detail::write_uint32(transaction_id, ptr); // transaction_id + detail::write_uint32(1800, ptr); // interval + detail::write_uint32(1, ptr); // incomplete + detail::write_uint32(1, ptr); // complete + // 0 peers + sock->send_to(asio::buffer(buffer, 20), *from, 0, e); + break; + default: // ignore scrapes + break; + } +} + +void udp_tracker_thread(int* port) +{ + io_service ios; + udp::socket acceptor(ios); + error_code ec; + acceptor.open(udp::v4(), ec); + if (ec) + { + fprintf(stderr, "Error opening listen UDP socket: %s\n", ec.message().c_str()); + mutex::scoped_lock l(tracker_lock); + tracker_initialized.signal(l); + return; + } + acceptor.bind(udp::endpoint(address_v4::any(), 0), ec); + if (ec) + { + fprintf(stderr, "Error binding UDP socket to port 0: %s\n", ec.message().c_str()); + mutex::scoped_lock l(tracker_lock); + tracker_initialized.signal(l); + return; + } + *port = acceptor.local_endpoint().port(); + + tracker_ios = &ios; + + fprintf(stderr, "UDP tracker initialized on port %d\n", *port); + + { + mutex::scoped_lock l(tracker_lock); + tracker_initialized.signal(l); + } + + char buffer[2000]; + + for (;;) + { + error_code ec; + udp::endpoint from; + udp_failed = true; + acceptor.async_receive_from( + asio::buffer(buffer, sizeof(buffer)), from, boost::bind( + &on_udp_receive, _1, _2, &from, &buffer[0], &acceptor)); + ios.run_one(ec); + if (udp_failed) return; + + if (ec) + { + fprintf(stderr, "Error receiving on UDP socket: %s\n", ec.message().c_str()); + mutex::scoped_lock l(tracker_lock); + tracker_initialized.signal(l); + return; + } + ios.reset(); + } +} boost::asio::io_service* web_ios = 0; boost::shared_ptr web_server; @@ -557,6 +706,20 @@ void web_server_thread(int* port, bool ssl) break; } + if (path.substr(0, 9) == "/announce") + { + entry announce; + announce["interval"] = 1800; + announce["complete"] = 1; + announce["incomplete"] = 1; + announce["peers"].string(); + std::vector buf; + bencode(std::back_inserter(buf), announce); + + send_response(s, ec, 200, "OK", 0, buf.size()); + write(s, boost::asio::buffer(&buf[0], buf.size()), boost::asio::transfer_all(), ec); + } + // fprintf(stderr, ">> serving file %s\n", path.c_str()); std::vector file_buf; // remove the / from the path diff --git a/test/setup_transfer.hpp b/test/setup_transfer.hpp index 106753670..1f07b8e88 100644 --- a/test/setup_transfer.hpp +++ b/test/setup_transfer.hpp @@ -50,7 +50,8 @@ bool print_alerts(libtorrent::session& ses, char const* name void test_sleep(int millisec); -boost::intrusive_ptr create_torrent(std::ostream* file = 0, int piece_size = 16 * 1024, int num_pieces = 13); +boost::intrusive_ptr create_torrent(std::ostream* file = 0 + , int piece_size = 16 * 1024, int num_pieces = 13, bool add_tracker = true); boost::tuple #include @@ -92,7 +93,7 @@ void test_rate() << std::endl; if (tor2.is_seed()) break; - test_sleep(1000); + test_sleep(100); } TEST_CHECK(tor2.is_seed()); @@ -213,8 +214,22 @@ storage_interface* test_storage_constructor(file_storage const& fs return new test_storage(fs, path, fp); } -void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) +int tracker_responses = 0; + +bool on_alert(alert* a) { + if (alert_cast(a)) + ++tracker_responses; + return false; +} + +void test_transfer(int proxy_type, bool test_disk_full = false, bool test_allowed_fast = false) +{ + + char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"}; + + fprintf(stderr, "\n\n ==== TESTING %s proxy ====\n\n\n", test_name[proxy_type]); + // in case the previous run was terminated error_code ec; remove_all("./tmp1_transfer", ec); @@ -225,14 +240,37 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) session ses1(fingerprint("LT", 0, 1, 0, 0), std::make_pair(48075, 49000), "0.0.0.0", 0); session ses2(fingerprint("LT", 0, 1, 0, 0), std::make_pair(49075, 50000), "0.0.0.0", 0); + int proxy_port = (rand() % 30000) + 10000; + if (proxy_type) + { + start_proxy(proxy_port, proxy_type); + proxy_settings ps; + ps.hostname = "127.0.0.1"; + ps.port = proxy_port; + ps.username = "testuser"; + ps.password = "testpass"; + ps.type = (proxy_settings::proxy_type)proxy_type; + ses1.set_tracker_proxy(ps); + ses2.set_tracker_proxy(ps); + } + + session_settings sett; + if (test_allowed_fast) { - session_settings sett; sett.allowed_fast_set_size = 2000; ses1.set_max_uploads(0); - ses1.set_settings(sett); } + sett.min_reconnect_time = 1; + sett.announce_to_all_trackers = true; + sett.announce_to_all_tiers = true; + // make sure we announce to both http and udp trackers + sett.prefer_udp_trackers = false; + + ses1.set_settings(sett); + ses2.set_settings(sett); + #ifndef TORRENT_DISABLE_ENCRYPTION pe_settings pes; pes.out_enc_policy = pe_settings::forced; @@ -246,20 +284,25 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) create_directory("./tmp1_transfer", ec); std::ofstream file("./tmp1_transfer/temporary"); - boost::intrusive_ptr t = ::create_torrent(&file, 16 * 1024); + boost::intrusive_ptr t = ::create_torrent(&file, 16 * 1024, 13, false); file.close(); + int udp_tracker_port = start_tracker(); + int tracker_port = start_web_server(); + + char tracker_url[200]; + snprintf(tracker_url, sizeof(tracker_url), "http://127.0.0.1:%d/announce", tracker_port); + t->add_tracker(tracker_url); + + snprintf(tracker_url, sizeof(tracker_url), "udp://127.0.0.1:%d/announce", udp_tracker_port); + t->add_tracker(tracker_url); + add_torrent_params addp(&test_storage_constructor); // test using piece sizes smaller than 16kB boost::tie(tor1, tor2, ignore) = setup_transfer(&ses1, &ses2, 0 , true, false, true, "_transfer", 8 * 1024, &t, false, test_disk_full?&addp:0); - session_settings settings = ses1.settings(); - settings.min_reconnect_time = 1; - ses1.set_settings(settings); - ses2.set_settings(settings); - // set half of the pieces to priority 0 int num_pieces = tor2.get_torrent_info().num_pieces(); std::vector priorities(num_pieces, 1); @@ -273,16 +316,18 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) ses2.set_alert_mask(alert::all_categories & ~alert::progress_notification); ses1.set_alert_dispatch(&print_alert); - ses2.set_download_rate_limit(tor2.get_torrent_info().piece_length() / 2); + ses2.set_download_rate_limit(tor2.get_torrent_info().piece_length() * 5); // also test to move the storage of the downloader and the uploader // to make sure it can handle switching paths bool test_move_storage = false; - for (int i = 0; i < 30; ++i) + tracker_responses = 0; + + for (int i = 0; i < 50; ++i) { - print_alerts(ses1, "ses1"); - print_alerts(ses2, "ses2"); + print_alerts(ses1, "ses1", true, true, true, on_alert); + print_alerts(ses2, "ses2", true, true, true, on_alert); torrent_status st1 = tor1.status(); torrent_status st2 = tor2.status(); @@ -323,9 +368,12 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) TEST_CHECK(st2.state == torrent_status::downloading || (test_disk_full && !st2.error.empty())); - test_sleep(1000); + test_sleep(100); } + // 1 announce per tracker to start + TEST_EQUAL(tracker_responses, 2); + TEST_CHECK(!tor2.is_seed()); TEST_CHECK(tor2.is_finished()); if (tor2.is_finished()) @@ -334,9 +382,9 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) std::cerr << "force recheck" << std::endl; tor2.force_recheck(); - for (int i = 0; i < 10; ++i) + for (int i = 0; i < 50; ++i) { - test_sleep(1000); + test_sleep(100); print_alerts(ses2, "ses2"); torrent_status st2 = tor2.status(); std::cerr << "\033[0m" << int(st2.progress * 100) << "% " << std::endl; @@ -361,11 +409,11 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) { std::auto_ptr holder = ses2.pop_alert(); std::cerr << "ses2: " << a->message() << std::endl; - if (dynamic_cast(a)) break; + if (alert_cast(a)) break; a = ses2.wait_for_alert(seconds(10)); } - std::vector tr; + std::vector tr = tor2.trackers(); tr.push_back(announce_entry("http://test.com/announce")); tor2.replace_trackers(tr); tr.clear(); @@ -378,10 +426,10 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) { std::auto_ptr holder = ses2.pop_alert(); std::cerr << "ses2: " << a->message() << std::endl; - if (dynamic_cast(a)) + if (alert_cast(a)) { bencode(std::back_inserter(resume_data) - , *dynamic_cast(a)->resume_data); + , *alert_cast(a)->resume_data); break; } a = ses2.wait_for_alert(seconds(10)); @@ -458,10 +506,14 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false) TEST_CHECK(st1.state == torrent_status::seeding); TEST_CHECK(st2.state == torrent_status::downloading); - test_sleep(1000); + test_sleep(100); } TEST_CHECK(tor2.is_seed()); + + stop_tracker(); + stop_web_server(); + if (proxy_type) stop_proxy(proxy_port); } int test_main() @@ -473,13 +525,15 @@ int test_main() test_rate(); #endif - test_transfer(); + // test with all kinds of proxies + for (int i = 0; i < 6; ++i) + test_transfer(i); // test with a (simulated) full disk - test_transfer(true); + test_transfer(0, true); // test allowed fast - test_transfer(false, true); + test_transfer(0, false, true); error_code ec; remove_all("./tmp1_transfer", ec); diff --git a/test/test_web_seed.cpp b/test/test_web_seed.cpp index e04f89924..58c1683dc 100644 --- a/test/test_web_seed.cpp +++ b/test/test_web_seed.cpp @@ -62,7 +62,7 @@ void test_transfer(boost::intrusive_ptr torrent_file, int proxy, i char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"}; - std::cerr << " ==== TESTING " << test_name[proxy] << " proxy ====" << std::endl; + fprintf(stderr, "\n\n ==== TESTING %s proxy ====\n\n\n", test_name[proxy]); if (proxy) {