fixed issue with UDP over SOCKS5. Added a udp tracker to the unit test to make sure it works. Added tracker tests for all proxies

This commit is contained in:
Arvid Norberg 2010-02-18 04:37:02 +00:00
parent 9197081618
commit 09a1023eb3
9 changed files with 300 additions and 44 deletions

View File

@ -117,6 +117,7 @@ release 0.14.9
connected multiple times
* fixed MinGW support
* fixed DHT bootstrapping issue
* fixed UDP over SOCKS5 issue
release 0.14.8

View File

@ -78,6 +78,14 @@ namespace libtorrent
bool is_closed() const { return m_abort; }
protected:
struct queued_packet
{
udp::endpoint ep;
buffer buf;
};
private:
callback_t m_callback;
@ -91,7 +99,7 @@ namespace libtorrent
void handshake2(error_code const& e);
void handshake3(error_code const& e);
void handshake4(error_code const& e);
void socks_forward_udp();
void socks_forward_udp(mutex::scoped_lock& l);
void connect1(error_code const& e);
void connect2(error_code const& e);
@ -119,9 +127,14 @@ namespace libtorrent
connection_queue& m_cc;
tcp::resolver m_resolver;
char m_tmp_buf[100];
bool m_queue_packets;
bool m_tunnel_packets;
bool m_abort;
udp::endpoint m_proxy_addr;
// while we're connecting to the proxy
// we have to queue the packets, we'll flush
// them once we're connected
std::list<queued_packet> m_queue;
#ifdef TORRENT_DEBUG
bool m_started;
int m_magic;
@ -137,11 +150,6 @@ namespace libtorrent
void close();
private:
struct queued_packet
{
udp::endpoint ep;
buffer buf;
};
void on_tick(error_code const& e);
deadline_timer m_timer;

View File

@ -124,6 +124,7 @@ namespace libtorrent
};
static std::map<address, connection_cache_entry> m_connection_cache;
static mutex m_cache_mutex;
action_t m_state;
};

View File

@ -60,6 +60,7 @@ udp_socket::udp_socket(asio::io_service& ios, udp_socket::callback_t const& c
, m_connection_ticket(-1)
, m_cc(cc)
, m_resolver(ios)
, m_queue_packets(false)
, m_tunnel_packets(false)
, m_abort(false)
{
@ -107,6 +108,15 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_cod
return;
}
if (m_queue_packets)
{
m_queue.push_back(queued_packet());
queued_packet& qp = m_queue.back();
qp.ep = ep;
qp.buf.insert(qp.buf.begin(), p, p + len);
return;
}
#if TORRENT_USE_IPV6
if (ep.address().is_v4() && m_ipv4_sock.is_open())
#endif
@ -452,6 +462,7 @@ void udp_socket::set_proxy_settings(proxy_settings const& ps)
if (ps.type == proxy_settings::socks5
|| ps.type == proxy_settings::socks5_pw)
{
m_queue_packets = true;
// connect to socks5 server and open up the UDP tunnel
tcp::resolver::query q(ps.hostname, to_string(ps.port).elems);
m_resolver.async_resolve(q, boost::bind(
@ -553,7 +564,7 @@ void udp_socket::handshake2(error_code const& e)
if (method == 0)
{
socks_forward_udp();
socks_forward_udp(l);
}
else if (method == 2)
{
@ -609,16 +620,14 @@ void udp_socket::handshake4(error_code const& e)
if (version != 1) return;
if (status != 0) return;
socks_forward_udp();
socks_forward_udp(l);
}
void udp_socket::socks_forward_udp()
void udp_socket::socks_forward_udp(mutex::scoped_lock& l)
{
CHECK_MAGIC;
using namespace libtorrent::detail;
mutex::scoped_lock l(m_mutex);
// send SOCKS5 UDP command
char* p = &m_tmp_buf[0];
write_uint8(5, p); // SOCKS VERSION 5
@ -673,6 +682,16 @@ void udp_socket::connect2(error_code const& e)
}
m_tunnel_packets = true;
m_queue_packets = false;
// forward all packets that were put in the queue
while (!m_queue.empty())
{
queued_packet const& p = m_queue.front();
error_code ec;
udp_socket::send(p.ep, &p.buf[0], p.buf.size(), ec);
m_queue.pop_front();
}
}
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios

View File

@ -62,6 +62,8 @@ namespace libtorrent
std::map<address, udp_tracker_connection::connection_cache_entry>
udp_tracker_connection::m_connection_cache;
mutex udp_tracker_connection::m_cache_mutex;
udp_tracker_connection::udp_tracker_connection(
io_service& ios
, connection_queue& cc
@ -195,6 +197,7 @@ namespace libtorrent
return;
}
mutex::scoped_lock l(m_cache_mutex);
std::map<address, connection_cache_entry>::iterator cc
= m_connection_cache.find(m_target.address());
if (cc != m_connection_cache.end())
@ -212,6 +215,7 @@ namespace libtorrent
// if it expired, remove it from the cache
m_connection_cache.erase(cc);
}
l.unlock();
send_udp_connect();
}
@ -328,6 +332,8 @@ namespace libtorrent
m_transaction_id = 0;
m_attempts = 0;
boost::uint64_t connection_id = detail::read_int64(buf);
mutex::scoped_lock l(m_cache_mutex);
connection_cache_entry& cce = m_connection_cache[m_target.address()];
cce.connection_id = connection_id;
cce.expires = time_now() + seconds(m_ses.m_settings.udp_tracker_token_expiry);

View File

@ -168,7 +168,8 @@ boost::intrusive_ptr<T> clone_ptr(boost::intrusive_ptr<T> const& ptr)
return boost::intrusive_ptr<T>(new T(*ptr));
}
boost::intrusive_ptr<torrent_info> create_torrent(std::ostream* file, int piece_size, int num_pieces)
boost::intrusive_ptr<torrent_info> create_torrent(std::ostream* file, int piece_size
, int num_pieces, bool add_tracker)
{
char const* tracker_url = "http://non-existent-name.com/announce";
// excercise the path when encountering invalid urls
@ -179,9 +180,12 @@ boost::intrusive_ptr<torrent_info> create_torrent(std::ostream* file, int piece_
int total_size = piece_size * num_pieces;
fs.add_file("temporary", total_size);
libtorrent::create_torrent t(fs, piece_size);
t.add_tracker(tracker_url);
t.add_tracker(invalid_tracker_url);
t.add_tracker(invalid_tracker_protocol);
if (add_tracker)
{
t.add_tracker(tracker_url);
t.add_tracker(invalid_tracker_url);
t.add_tracker(invalid_tracker_protocol);
}
std::vector<char> piece(piece_size);
for (int i = 0; i < int(piece.size()); ++i)
@ -221,7 +225,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3
assert(ses1);
assert(ses2);
session_settings sess_set;
session_settings sess_set = ses1->settings();
sess_set.allow_multiple_connections_per_ip = true;
sess_set.ignore_limits_on_local_network = false;
ses1->set_settings(sess_set);
@ -251,7 +255,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3
error_code ec;
create_directory("./tmp1" + suffix, ec);
std::ofstream file(("./tmp1" + suffix + "/temporary").c_str());
t = ::create_torrent(&file, piece_size, 19);
t = ::create_torrent(&file, piece_size, 19, true);
file.close();
if (clear_files)
{
@ -335,6 +339,151 @@ setup_transfer(session* ses1, session* ses2, session* ses3
return boost::make_tuple(tor1, tor2, tor3);
}
boost::asio::io_service* tracker_ios = 0;
boost::shared_ptr<libtorrent::thread> tracker_server;
libtorrent::mutex tracker_lock;
libtorrent::condition tracker_initialized;
bool udp_failed = false;
void stop_tracker()
{
if (tracker_server && tracker_ios)
{
tracker_ios->stop();
tracker_server->join();
tracker_server.reset();
tracker_ios = 0;
}
}
void udp_tracker_thread(int* port);
int start_tracker()
{
stop_tracker();
{
mutex::scoped_lock l(tracker_lock);
tracker_initialized.clear(l);
}
int port = 0;
tracker_server.reset(new libtorrent::thread(boost::bind(&udp_tracker_thread, &port)));
{
mutex::scoped_lock l(tracker_lock);
tracker_initialized.wait(l);
}
test_sleep(100);
return port;
}
void on_udp_receive(error_code const& ec, size_t bytes_transferred, udp::endpoint const* from, char* buffer, udp::socket* sock)
{
if (ec)
{
fprintf(stderr, "UDP tracker, read failed: %s\n", ec.message().c_str());
return;
}
udp_failed = false;
if (bytes_transferred < 16)
{
fprintf(stderr, "UDP message too short\n");
return;
}
char* ptr = buffer;
boost::uint64_t connection_id = detail::read_uint64(ptr);
boost::uint32_t action = detail::read_uint32(ptr);
boost::uint32_t transaction_id = detail::read_uint32(ptr);
error_code e;
switch (action)
{
case 0: // connect
ptr = buffer;
detail::write_uint32(0, ptr); // action = connect
detail::write_uint32(transaction_id, ptr); // transaction_id
detail::write_uint64(10, ptr); // connection_id
sock->send_to(asio::buffer(buffer, 16), *from, 0, e);
break;
case 1: // announce
ptr = buffer;
detail::write_uint32(1, ptr); // action = announce
detail::write_uint32(transaction_id, ptr); // transaction_id
detail::write_uint32(1800, ptr); // interval
detail::write_uint32(1, ptr); // incomplete
detail::write_uint32(1, ptr); // complete
// 0 peers
sock->send_to(asio::buffer(buffer, 20), *from, 0, e);
break;
default: // ignore scrapes
break;
}
}
void udp_tracker_thread(int* port)
{
io_service ios;
udp::socket acceptor(ios);
error_code ec;
acceptor.open(udp::v4(), ec);
if (ec)
{
fprintf(stderr, "Error opening listen UDP socket: %s\n", ec.message().c_str());
mutex::scoped_lock l(tracker_lock);
tracker_initialized.signal(l);
return;
}
acceptor.bind(udp::endpoint(address_v4::any(), 0), ec);
if (ec)
{
fprintf(stderr, "Error binding UDP socket to port 0: %s\n", ec.message().c_str());
mutex::scoped_lock l(tracker_lock);
tracker_initialized.signal(l);
return;
}
*port = acceptor.local_endpoint().port();
tracker_ios = &ios;
fprintf(stderr, "UDP tracker initialized on port %d\n", *port);
{
mutex::scoped_lock l(tracker_lock);
tracker_initialized.signal(l);
}
char buffer[2000];
for (;;)
{
error_code ec;
udp::endpoint from;
udp_failed = true;
acceptor.async_receive_from(
asio::buffer(buffer, sizeof(buffer)), from, boost::bind(
&on_udp_receive, _1, _2, &from, &buffer[0], &acceptor));
ios.run_one(ec);
if (udp_failed) return;
if (ec)
{
fprintf(stderr, "Error receiving on UDP socket: %s\n", ec.message().c_str());
mutex::scoped_lock l(tracker_lock);
tracker_initialized.signal(l);
return;
}
ios.reset();
}
}
boost::asio::io_service* web_ios = 0;
boost::shared_ptr<libtorrent::thread> web_server;
@ -557,6 +706,20 @@ void web_server_thread(int* port, bool ssl)
break;
}
if (path.substr(0, 9) == "/announce")
{
entry announce;
announce["interval"] = 1800;
announce["complete"] = 1;
announce["incomplete"] = 1;
announce["peers"].string();
std::vector<char> buf;
bencode(std::back_inserter(buf), announce);
send_response(s, ec, 200, "OK", 0, buf.size());
write(s, boost::asio::buffer(&buf[0], buf.size()), boost::asio::transfer_all(), ec);
}
// fprintf(stderr, ">> serving file %s\n", path.c_str());
std::vector<char> file_buf;
// remove the / from the path

View File

@ -50,7 +50,8 @@ bool print_alerts(libtorrent::session& ses, char const* name
void test_sleep(int millisec);
boost::intrusive_ptr<libtorrent::torrent_info> create_torrent(std::ostream* file = 0, int piece_size = 16 * 1024, int num_pieces = 13);
boost::intrusive_ptr<libtorrent::torrent_info> create_torrent(std::ostream* file = 0
, int piece_size = 16 * 1024, int num_pieces = 13, bool add_tracker = true);
boost::tuple<libtorrent::torrent_handle
, libtorrent::torrent_handle
@ -66,5 +67,8 @@ void stop_web_server();
void start_proxy(int port, int type);
void stop_proxy(int port);
void stop_tracker();
int start_tracker();
#endif

View File

@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/bencode.hpp"
#include "libtorrent/thread.hpp"
#include "libtorrent/time.hpp"
#include "libtorrent/file.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/bind.hpp>
@ -92,7 +93,7 @@ void test_rate()
<< std::endl;
if (tor2.is_seed()) break;
test_sleep(1000);
test_sleep(100);
}
TEST_CHECK(tor2.is_seed());
@ -213,8 +214,22 @@ storage_interface* test_storage_constructor(file_storage const& fs
return new test_storage(fs, path, fp);
}
void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
int tracker_responses = 0;
bool on_alert(alert* a)
{
if (alert_cast<tracker_reply_alert>(a))
++tracker_responses;
return false;
}
void test_transfer(int proxy_type, bool test_disk_full = false, bool test_allowed_fast = false)
{
char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"};
fprintf(stderr, "\n\n ==== TESTING %s proxy ====\n\n\n", test_name[proxy_type]);
// in case the previous run was terminated
error_code ec;
remove_all("./tmp1_transfer", ec);
@ -225,14 +240,37 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
session ses1(fingerprint("LT", 0, 1, 0, 0), std::make_pair(48075, 49000), "0.0.0.0", 0);
session ses2(fingerprint("LT", 0, 1, 0, 0), std::make_pair(49075, 50000), "0.0.0.0", 0);
int proxy_port = (rand() % 30000) + 10000;
if (proxy_type)
{
start_proxy(proxy_port, proxy_type);
proxy_settings ps;
ps.hostname = "127.0.0.1";
ps.port = proxy_port;
ps.username = "testuser";
ps.password = "testpass";
ps.type = (proxy_settings::proxy_type)proxy_type;
ses1.set_tracker_proxy(ps);
ses2.set_tracker_proxy(ps);
}
session_settings sett;
if (test_allowed_fast)
{
session_settings sett;
sett.allowed_fast_set_size = 2000;
ses1.set_max_uploads(0);
ses1.set_settings(sett);
}
sett.min_reconnect_time = 1;
sett.announce_to_all_trackers = true;
sett.announce_to_all_tiers = true;
// make sure we announce to both http and udp trackers
sett.prefer_udp_trackers = false;
ses1.set_settings(sett);
ses2.set_settings(sett);
#ifndef TORRENT_DISABLE_ENCRYPTION
pe_settings pes;
pes.out_enc_policy = pe_settings::forced;
@ -246,20 +284,25 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
create_directory("./tmp1_transfer", ec);
std::ofstream file("./tmp1_transfer/temporary");
boost::intrusive_ptr<torrent_info> t = ::create_torrent(&file, 16 * 1024);
boost::intrusive_ptr<torrent_info> t = ::create_torrent(&file, 16 * 1024, 13, false);
file.close();
int udp_tracker_port = start_tracker();
int tracker_port = start_web_server();
char tracker_url[200];
snprintf(tracker_url, sizeof(tracker_url), "http://127.0.0.1:%d/announce", tracker_port);
t->add_tracker(tracker_url);
snprintf(tracker_url, sizeof(tracker_url), "udp://127.0.0.1:%d/announce", udp_tracker_port);
t->add_tracker(tracker_url);
add_torrent_params addp(&test_storage_constructor);
// test using piece sizes smaller than 16kB
boost::tie(tor1, tor2, ignore) = setup_transfer(&ses1, &ses2, 0
, true, false, true, "_transfer", 8 * 1024, &t, false, test_disk_full?&addp:0);
session_settings settings = ses1.settings();
settings.min_reconnect_time = 1;
ses1.set_settings(settings);
ses2.set_settings(settings);
// set half of the pieces to priority 0
int num_pieces = tor2.get_torrent_info().num_pieces();
std::vector<int> priorities(num_pieces, 1);
@ -273,16 +316,18 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
ses2.set_alert_mask(alert::all_categories & ~alert::progress_notification);
ses1.set_alert_dispatch(&print_alert);
ses2.set_download_rate_limit(tor2.get_torrent_info().piece_length() / 2);
ses2.set_download_rate_limit(tor2.get_torrent_info().piece_length() * 5);
// also test to move the storage of the downloader and the uploader
// to make sure it can handle switching paths
bool test_move_storage = false;
for (int i = 0; i < 30; ++i)
tracker_responses = 0;
for (int i = 0; i < 50; ++i)
{
print_alerts(ses1, "ses1");
print_alerts(ses2, "ses2");
print_alerts(ses1, "ses1", true, true, true, on_alert);
print_alerts(ses2, "ses2", true, true, true, on_alert);
torrent_status st1 = tor1.status();
torrent_status st2 = tor2.status();
@ -323,9 +368,12 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
TEST_CHECK(st2.state == torrent_status::downloading
|| (test_disk_full && !st2.error.empty()));
test_sleep(1000);
test_sleep(100);
}
// 1 announce per tracker to start
TEST_EQUAL(tracker_responses, 2);
TEST_CHECK(!tor2.is_seed());
TEST_CHECK(tor2.is_finished());
if (tor2.is_finished())
@ -334,9 +382,9 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
std::cerr << "force recheck" << std::endl;
tor2.force_recheck();
for (int i = 0; i < 10; ++i)
for (int i = 0; i < 50; ++i)
{
test_sleep(1000);
test_sleep(100);
print_alerts(ses2, "ses2");
torrent_status st2 = tor2.status();
std::cerr << "\033[0m" << int(st2.progress * 100) << "% " << std::endl;
@ -361,11 +409,11 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
{
std::auto_ptr<alert> holder = ses2.pop_alert();
std::cerr << "ses2: " << a->message() << std::endl;
if (dynamic_cast<torrent_paused_alert const*>(a)) break;
if (alert_cast<torrent_paused_alert>(a)) break;
a = ses2.wait_for_alert(seconds(10));
}
std::vector<announce_entry> tr;
std::vector<announce_entry> tr = tor2.trackers();
tr.push_back(announce_entry("http://test.com/announce"));
tor2.replace_trackers(tr);
tr.clear();
@ -378,10 +426,10 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
{
std::auto_ptr<alert> holder = ses2.pop_alert();
std::cerr << "ses2: " << a->message() << std::endl;
if (dynamic_cast<save_resume_data_alert const*>(a))
if (alert_cast<save_resume_data_alert>(a))
{
bencode(std::back_inserter(resume_data)
, *dynamic_cast<save_resume_data_alert const*>(a)->resume_data);
, *alert_cast<save_resume_data_alert>(a)->resume_data);
break;
}
a = ses2.wait_for_alert(seconds(10));
@ -458,10 +506,14 @@ void test_transfer(bool test_disk_full = false, bool test_allowed_fast = false)
TEST_CHECK(st1.state == torrent_status::seeding);
TEST_CHECK(st2.state == torrent_status::downloading);
test_sleep(1000);
test_sleep(100);
}
TEST_CHECK(tor2.is_seed());
stop_tracker();
stop_web_server();
if (proxy_type) stop_proxy(proxy_port);
}
int test_main()
@ -473,13 +525,15 @@ int test_main()
test_rate();
#endif
test_transfer();
// test with all kinds of proxies
for (int i = 0; i < 6; ++i)
test_transfer(i);
// test with a (simulated) full disk
test_transfer(true);
test_transfer(0, true);
// test allowed fast
test_transfer(false, true);
test_transfer(0, false, true);
error_code ec;
remove_all("./tmp1_transfer", ec);

View File

@ -62,7 +62,7 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy, i
char const* test_name[] = {"no", "SOCKS4", "SOCKS5", "SOCKS5 password", "HTTP", "HTTP password"};
std::cerr << " ==== TESTING " << test_name[proxy] << " proxy ====" << std::endl;
fprintf(stderr, "\n\n ==== TESTING %s proxy ====\n\n\n", test_name[proxy]);
if (proxy)
{