From 726f89ca99d726be54eaab5c7ae0ba37e928ddcf Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Sat, 10 Sep 2011 05:36:38 +0000 Subject: [PATCH] (partially) fix test_web_seed --- test/setup_transfer.cpp | 190 +++++++++++++++++++++++++++++++--------- test/test_web_seed.cpp | 6 +- 2 files changed, 154 insertions(+), 42 deletions(-) diff --git a/test/setup_transfer.cpp b/test/setup_transfer.cpp index ae6cc9c46..654b5a211 100644 --- a/test/setup_transfer.cpp +++ b/test/setup_transfer.cpp @@ -55,6 +55,10 @@ POSSIBILITY OF SUCH DAMAGE. #include #endif +#define DEBUG_WEB_SERVER 0 + +#define DLOG if (DEBUG_WEB_SERVER) fprintf + using namespace libtorrent; bool tests_failure = false; @@ -147,7 +151,11 @@ void stop_proxy(int port) { char buf[100]; snprintf(buf, sizeof(buf), "delegated -P%d -Fkill", port); - system(buf); + int ret = system(buf); + if (ret == 0) + { + perror("system"); + } } void start_proxy(int port, int proxy_type) @@ -546,15 +554,23 @@ boost::asio::io_service* web_ios = 0; boost::shared_ptr web_server; libtorrent::mutex web_lock; libtorrent::event web_initialized; +bool stop_thread = false; + +static void terminate_web_thread() +{ + stop_thread = true; + web_ios->stop(); + web_ios = 0; +} void stop_web_server() { if (web_server && web_ios) { - web_ios->post(boost::bind(&io_service::stop, web_ios)); + fprintf(stderr, "stopping web server thread\n"); + web_ios->post(&terminate_web_thread); web_server->join(); web_server.reset(); - web_ios = 0; } } @@ -564,6 +580,8 @@ int start_web_server(bool ssl, bool chunked_encoding) { stop_web_server(); + stop_thread = false; + { libtorrent::mutex::scoped_lock l(web_lock); web_initialized.clear(l); @@ -605,7 +623,7 @@ void send_response(socket_type& s, error_code& ec , int len) { char msg[600]; - int pkt_len = snprintf(msg, sizeof(msg), "HTTP/1.0 %d %s\r\n" + int pkt_len = snprintf(msg, sizeof(msg), "HTTP/1.1 %d %s\r\n" "content-length: %d\r\n" "%s" "%s" @@ -617,24 +635,15 @@ void send_response(socket_type& s, error_code& ec , extra_header[1] , extra_header[2] , extra_header[3]); -// fprintf(stderr, ">> %s\n", msg); + DLOG(stderr, ">> %s\n", msg); write(s, boost::asio::buffer(msg, pkt_len), boost::asio::transfer_all(), ec); + if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str()); } -bool accept_done = false; - -void on_accept(error_code const& ec) +void on_accept(error_code& accept_ec, error_code const& ec, bool* done) { - if (ec) - { - fprintf(stderr, "Error accepting socket: %s\n", ec.message().c_str()); - accept_done = false; - } - else - { -// fprintf(stderr, "accepting connection\n"); - accept_done = true; - } + accept_ec = ec; + *done = true; } void send_content(socket_type& s, char const* file, int size, bool chunked) @@ -661,6 +670,7 @@ void send_content(socket_type& s, char const* file, int size, bool chunked) bufs[1] = asio::const_buffer(file, chunk_size); } write(s, bufs, boost::asio::transfer_all(), ec); + if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str()); size -= chunk_size; file += chunk_size; chunk_size *= 2; @@ -669,9 +679,23 @@ void send_content(socket_type& s, char const* file, int size, bool chunked) else { write(s, boost::asio::buffer(file, size), boost::asio::transfer_all(), ec); + if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str()); } } +void on_read(error_code const& ec, size_t bytes_transferred, size_t* bt, error_code* e, bool* done) +{ + *bt = bytes_transferred; + *e = ec; + *done = true; +} + +void on_read_timeout(error_code const& ec, bool* timed_out) +{ + if (ec) return; + *timed_out = true; +} + void web_server_thread(int* port, bool ssl, bool chunked) { io_service ios; @@ -727,7 +751,7 @@ void web_server_thread(int* port, bool ssl, bool chunked) socket_type s(ios); void* ctx = 0; #ifdef TORRENT_USE_OPENSSL - boost::asio::ssl::context ssl_ctx(ios, boost::asio::ssl::context::sslv2_server); + boost::asio::ssl::context ssl_ctx(ios, boost::asio::ssl::context::sslv23_server); ssl_ctx.use_certificate_chain_file("server.pem"); ssl_ctx.use_private_key_file("server.pem", asio::ssl::context::pem); ssl_ctx.set_verify_mode(boost::asio::ssl::context::verify_none); @@ -741,8 +765,19 @@ void web_server_thread(int* port, bool ssl, bool chunked) { if (connection_close) { -// fprintf(stderr, "closing connection\n"); + error_code ec; +#ifdef TORRENT_USE_OPENSSL + if (ssl) + { + DLOG(stderr, "shutting down SSL connection\n"); + s.get >()->shutdown(ec); + if (ec) fprintf(stderr, "SSL shutdown failed: %s\n", ec.message().c_str()); + ec.clear(); + } +#endif + DLOG(stderr, "closing connection\n"); s.close(ec); + if (ec) fprintf(stderr, "close failed: %s\n", ec.message().c_str()); connection_close = false; } @@ -752,28 +787,53 @@ void web_server_thread(int* port, bool ssl, bool chunked) offset = 0; error_code ec; + instantiate_connection(ios, p, s, ctx); stream_socket* sock; #ifdef TORRENT_USE_OPENSSL - if (ssl) sock = &s.get >()->next_layer().next_layer(); + if (ssl) sock = &s.get >()->next_layer(); else #endif sock = s.get(); - accept_done = false; - acceptor.async_accept(*sock, &on_accept); - ios.reset(); - ios.run_one(); - if (!accept_done) + bool accept_done = false; + DLOG(stderr, "waiting for incoming connection\n"); + acceptor.async_accept(*sock, boost::bind(&on_accept, boost::ref(ec), _1, &accept_done)); + while (!accept_done) + { + error_code e; + ios.reset(); + if (stop_thread || ios.run_one(e) == 0) + { + fprintf(stderr, "io_service stopped: %s\n", e.message().c_str()); + break; + } + } + if (stop_thread) break; + + if (ec) { fprintf(stderr, "accept failed: %s\n", ec.message().c_str()); return; } -// fprintf(stderr, "accepting incoming connection\n"); - if (!s.is_open()) continue; + DLOG(stderr, "accepting incoming connection\n"); + if (!s.is_open()) + { + fprintf(stderr, "incoming connection closed\n"); + continue; + } #ifdef TORRENT_USE_OPENSSL if (ssl) - s.get >()->next_layer().handshake(asio::ssl::stream_base::server); + { + DLOG(stderr, "SSL handshake\n"); + s.get >()->accept_handshake(ec); + if (ec) + { + fprintf(stderr, "SSL handshake failed: %s\n", ec.message().c_str()); + connection_close = true; + continue; + } + } #endif } @@ -801,16 +861,45 @@ void web_server_thread(int* port, bool ssl, bool chunked) while (!p.finished()) { TORRENT_ASSERT(len < int(sizeof(buf))); - size_t received = s.read_some(boost::asio::buffer(&buf[len] - , sizeof(buf) - len), ec); + size_t received = 0; + bool done = false; + bool timed_out = false; + s.async_read_some(boost::asio::buffer(&buf[len] + , sizeof(buf) - len), boost::bind(&on_read, _1, _2, &received, &ec, &done)); + deadline_timer timer(ios); + timer.expires_at(time_now() + seconds(2)); + timer.async_wait(boost::bind(&on_read_timeout, _1, &timed_out)); + + while (!done && !timed_out) + { + error_code e; + ios.reset(); + if (stop_thread || ios.run_one(e) == 0) + { + fprintf(stderr, "io_service stopped: %s\n", e.message().c_str()); + break; + } + } + if (timed_out) + { + fprintf(stderr, "read timed out, closing connection\n"); + failed = true; + break; + } // fprintf(stderr, "read: %d\n", int(received)); if (ec || received <= 0) { - fprintf(stderr, "read failed: %s received: %d\n", ec.message().c_str(), int(received)); + fprintf(stderr, "read failed: \"%s\" (%s) received: %d\n" + , ec.message().c_str(), ec.category().name(), int(received)); failed = true; break; } + + timer.cancel(ec); + if (ec) + fprintf(stderr, "timer.cancel failed: %s\n", ec.message().c_str()); + len += received; p.incoming(buffer::const_interval(buf + offset, buf + len), error); @@ -831,15 +920,21 @@ void web_server_thread(int* port, bool ssl, bool chunked) // the Via: header is an indicator of delegate making the request if (connection == "close" || !via.empty()) { -// fprintf(stderr, "got connection close\n"); + DLOG(stderr, "*** got connection close\n"); + connection_close = true; + } + + if (p.protocol() == "HTTP/1.0") + { + DLOG(stderr, "*** HTTP/1.0, closing connection when done\n"); connection_close = true; } -// fprintf(stderr, "%s", std::string(buf + offset, p.body_start()).c_str()); + DLOG(stderr, "%s", std::string(buf + offset, p.body_start()).c_str()); if (failed) { - fprintf(stderr, "connection failed\n"); + fprintf(stderr, "*** connection failed\n"); connection_close = true; break; } @@ -849,7 +944,8 @@ void web_server_thread(int* port, bool ssl, bool chunked) if (p.method() != "get" && p.method() != "post") { - fprintf(stderr, "incorrect method: %s\n", p.method().c_str()); + fprintf(stderr, "*** incorrect method: %s\n", p.method().c_str()); + connection_close = true; break; } @@ -888,11 +984,14 @@ void web_server_thread(int* port, bool ssl, bool chunked) send_response(s, ec, 200, "OK", extra_header, buf.size()); write(s, boost::asio::buffer(&buf[0], buf.size()), boost::asio::transfer_all(), ec); + if (ec) + fprintf(stderr, "*** send failed: %s\n", ec.message().c_str()); } if (path.substr(0, 6) == "/seed?") { char const* piece = strstr(path.c_str(), "&piece="); + if (piece == 0) piece = strstr(path.c_str(), "?piece="); if (piece == 0) { fprintf(stderr, "invalid web seed request: %s\n", path.c_str()); @@ -900,6 +999,7 @@ void web_server_thread(int* port, bool ssl, bool chunked) } boost::uint64_t idx = atoi(piece + 7); char const* range = strstr(path.c_str(), "&ranges="); + if (range == 0) range = strstr(path.c_str(), "?ranges="); int range_end = 0; int range_start = 0; if (range) @@ -917,7 +1017,7 @@ void web_server_thread(int* port, bool ssl, bool chunked) { range_start = 0; // assume piece size of 64kiB - range_end = 64*1024-1; + range_end = 64*1024+1; } int size = range_end - range_start + 1; @@ -932,10 +1032,16 @@ void web_server_thread(int* port, bool ssl, bool chunked) continue; } send_response(s, ec, 200, "OK", extra_header, size); -// fprintf(stderr, "sending %d bytes of payload [%d, %d)\n" -// , size, int(off), int(off + size)); + DLOG(stderr, "sending %d bytes of payload [%d, %d) piece: %d\n" + , size, int(off), int(off + size), int(idx)); write(s, boost::asio::buffer(&file_buf[0] + off, size) , boost::asio::transfer_all(), ec); + if (ec) + fprintf(stderr, "*** send failed: %s\n", ec.message().c_str()); + else + { + DLOG(stderr, "*** done\n"); + } memmove(buf, buf + offset, len - offset); len -= offset; @@ -987,7 +1093,7 @@ void web_server_thread(int* port, bool ssl, bool chunked) { send_content(s, &file_buf[0] + start, end - start + 1, chunked); } -// fprintf(stderr, "send %d bytes of payload\n", end - start + 1); + DLOG(stderr, "send %d bytes of payload\n", end - start + 1); } else { @@ -995,12 +1101,14 @@ void web_server_thread(int* port, bool ssl, bool chunked) if (!file_buf.empty()) send_content(s, &file_buf[0], file_buf.size(), chunked); } -// fprintf(stderr, "%d bytes left in receive buffer. offset: %d\n", len - offset, offset); + DLOG(stderr, "%d bytes left in receive buffer. offset: %d\n", len - offset, offset); memmove(buf, buf + offset, len - offset); len -= offset; offset = 0; } while (offset < len); } + + web_ios = 0; fprintf(stderr, "exiting web server thread\n"); } diff --git a/test/test_web_seed.cpp b/test/test_web_seed.cpp index 61d3515a4..e74024afb 100644 --- a/test/test_web_seed.cpp +++ b/test/test_web_seed.cpp @@ -239,9 +239,13 @@ int run_suite(char const* protocol, bool test_url_seed, bool chunked_encoding) } else { - snprintf(tmp, sizeof(tmp), "http://127.0.0.1:%d/seed", port); + snprintf(tmp, sizeof(tmp), "%s://127.0.0.1:%d/seed", protocol, port); t.add_http_seed(tmp); } + fprintf(stderr, "testing: %s\n", tmp); + +// for (int i = 0; i < 1000; ++i) sleep(1000); + // calculate the hash for all pieces set_piece_hashes(t, "./tmp1_web_seed", ec);