fixed bug in web_peer_connection which could cause a hang when downloading from web servers

This commit is contained in:
Arvid Norberg 2010-06-22 18:09:04 +00:00
parent 996c68dfa7
commit 06190d8920
6 changed files with 108 additions and 45 deletions

View File

@ -33,6 +33,8 @@
incoming connection incoming connection
* added more detailed instrumentation of the disk I/O thread * added more detailed instrumentation of the disk I/O thread
* fixed bug in web_peer_connection which could cause a hang when downloading
from web servers
* fixed bug in metadata extensions combined with encryption * fixed bug in metadata extensions combined with encryption
* refactored socket reading code to not use async. operations unnecessarily * refactored socket reading code to not use async. operations unnecessarily
* some timer optimizations * some timer optimizations

View File

@ -5022,7 +5022,6 @@ namespace libtorrent
TORRENT_ASSERT(m_outstanding_bytes >= 0); TORRENT_ASSERT(m_outstanding_bytes >= 0);
if (t && t->valid_metadata() && !m_disconnecting) if (t && t->valid_metadata() && !m_disconnecting)
{ {
boost::optional<piece_block_progress> p = t?downloading_piece_progress():boost::optional<piece_block_progress>();
torrent_info const& ti = t->torrent_file(); torrent_info const& ti = t->torrent_file();
// if the piece is fully downloaded, we might have popped it from the // if the piece is fully downloaded, we might have popped it from the
// download queue already // download queue already
@ -5038,10 +5037,11 @@ namespace libtorrent
TORRENT_ASSERT(i->block.piece_index <= last_block.piece_index); TORRENT_ASSERT(i->block.piece_index <= last_block.piece_index);
TORRENT_ASSERT(i->block.piece_index < last_block.piece_index TORRENT_ASSERT(i->block.piece_index < last_block.piece_index
|| i->block.block_index <= last_block.block_index); || i->block.block_index <= last_block.block_index);
if (p && i->block == piece_block(p->piece_index, p->block_index)) if (m_received_in_piece && i == m_download_queue.begin())
{ {
in_download_queue = true; in_download_queue = true;
outstanding_bytes += p->full_block_bytes - m_received_in_piece; TORRENT_ASSERT(t->to_req(i->block).length >= m_received_in_piece);
outstanding_bytes += t->to_req(i->block).length - m_received_in_piece;
} }
else else
{ {

View File

@ -2173,7 +2173,9 @@ namespace libtorrent
{ {
if (!m_torrent_file->files().at(file_index).pad_file) if (!m_torrent_file->files().at(file_index).pad_file)
{ {
filesystem().async_finalize_file(file_index); if (m_owning_storage.get())
m_storage->async_finalize_file(file_index);
if (m_ses.m_alerts.should_post<piece_finished_alert>()) if (m_ses.m_alerts.should_post<piece_finished_alert>())
{ {
// this file just completed, post alert // this file just completed, post alert

View File

@ -119,8 +119,15 @@ namespace libtorrent
void web_peer_connection::disconnect(error_code const& ec, int error) void web_peer_connection::disconnect(error_code const& ec, int error)
{ {
boost::shared_ptr<torrent> t = associated_torrent().lock(); boost::shared_ptr<torrent> t = associated_torrent().lock();
if (t && m_block_pos)
t->add_redundant_bytes(m_block_pos);
peer_connection::disconnect(ec, error); peer_connection::disconnect(ec, error);
if (t) t->disconnect_web_seed(this); if (t)
{
t->disconnect_web_seed(this);
}
} }
boost::optional<piece_block_progress> boost::optional<piece_block_progress>
@ -135,7 +142,7 @@ namespace libtorrent
piece_block_progress ret; piece_block_progress ret;
ret.piece_index = m_requests.front().piece; ret.piece_index = m_requests.front().piece;
ret.bytes_downloaded = m_block_pos; ret.bytes_downloaded = m_block_pos % t->block_size();
if (m_block_pos) if (m_block_pos)
ret.block_index = (m_requests.front().start + m_block_pos - 1) / t->block_size(); ret.block_index = (m_requests.front().start + m_block_pos - 1) / t->block_size();
else else
@ -547,18 +554,22 @@ namespace libtorrent
peer_request front_request = m_requests.front(); peer_request front_request = m_requests.front();
TORRENT_ASSERT(m_block_pos >= 0); TORRENT_ASSERT(m_block_pos >= 0);
if (m_block_pos + payload_transferred > front_request.length)
payload_transferred = front_request.length - m_block_pos;
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << "*** payload_transferred: " << payload_transferred
<< " [" << front_request.piece << ":" << front_request.start
<< " = " << front_request.length << "]\n";
#endif
m_statistics.received_bytes(payload_transferred, 0); m_statistics.received_bytes(payload_transferred, 0);
incoming_piece_fragment(payload_transferred);
bytes_transferred -= payload_transferred; bytes_transferred -= payload_transferred;
m_range_pos += payload_transferred; m_range_pos += payload_transferred;
m_block_pos += payload_transferred; m_block_pos += payload_transferred;
if (m_range_pos > range_end - range_start) m_range_pos = range_end - range_start; if (m_range_pos > range_end - range_start) m_range_pos = range_end - range_start;
// std::cerr << "REQUESTS: m_requests: " << m_requests.size() #if 0
// << " file_requests: " << m_file_requests.size() << std::endl; std::cerr << "REQUESTS: m_requests: " << m_requests.size()
<< " file_requests: " << m_file_requests.size() << std::endl;
#endif
int file_index = m_file_requests.front(); int file_index = m_file_requests.front();
peer_request in_range = info.orig_files().map_file(file_index, range_start peer_request in_range = info.orig_files().map_file(file_index, range_start
@ -567,13 +578,13 @@ namespace libtorrent
size_type rs = size_type(in_range.piece) * info.piece_length() + in_range.start; size_type rs = size_type(in_range.piece) * info.piece_length() + in_range.start;
size_type re = rs + in_range.length; size_type re = rs + in_range.length;
size_type fs = size_type(front_request.piece) * info.piece_length() + front_request.start; size_type fs = size_type(front_request.piece) * info.piece_length() + front_request.start;
/* #if 0
size_type fe = fs + front_request.length; size_type fe = fs + front_request.length;
std::cerr << "RANGE: r = (" << rs << ", " << re << " ) " std::cerr << "RANGE: r = (" << rs << ", " << re << " ) "
"f = (" << fs << ", " << fe << ") " "f = (" << fs << ", " << fe << ") "
"file_index = " << file_index << " received_body = " << m_received_body << std::endl; "file_index = " << file_index << " received_body = " << m_received_body << std::endl;
*/ #endif
// the http response body consists of 3 parts // the http response body consists of 3 parts
// 1. the middle of a block or the ending of a block // 1. the middle of a block or the ending of a block
@ -585,6 +596,8 @@ namespace libtorrent
if (!range_overlaps_request) if (!range_overlaps_request)
{ {
incoming_piece_fragment((std::min)(payload_transferred
, front_request.length - m_block_pos));
m_statistics.received_bytes(0, bytes_transferred); m_statistics.received_bytes(0, bytes_transferred);
// this means the end of the incoming request ends _before_ the // this means the end of the incoming request ends _before_ the
// first expected byte (fs + m_piece.size()) // first expected byte (fs + m_piece.size())
@ -616,6 +629,7 @@ namespace libtorrent
m_body_start += copy_size; m_body_start += copy_size;
TORRENT_ASSERT(m_received_body <= range_end - range_start); TORRENT_ASSERT(m_received_body <= range_end - range_start);
TORRENT_ASSERT(int(m_piece.size()) <= front_request.length); TORRENT_ASSERT(int(m_piece.size()) <= front_request.length);
incoming_piece_fragment(copy_size);
if (int(m_piece.size()) == front_request.length) if (int(m_piece.size()) == front_request.length)
{ {
// each call to incoming_piece() may result in us becoming // each call to incoming_piece() may result in us becoming
@ -626,8 +640,7 @@ namespace libtorrent
incoming_piece(front_request, &m_piece[0]); incoming_piece(front_request, &m_piece[0]);
m_requests.pop_front(); m_requests.pop_front();
if (associated_torrent().expired()) return; if (associated_torrent().expired()) return;
TORRENT_ASSERT(m_block_pos == front_request.length); m_block_pos -= front_request.length;
m_block_pos = 0;
cut_receive_buffer(m_body_start, t->block_size() + 1024); cut_receive_buffer(m_body_start, t->block_size() + 1024);
m_body_start = 0; m_body_start = 0;
recv_buffer = receive_buffer(); recv_buffer = receive_buffer();
@ -645,11 +658,11 @@ namespace libtorrent
peer_request r = m_requests.front(); peer_request r = m_requests.front();
TORRENT_ASSERT(recv_buffer.left() >= r.length); TORRENT_ASSERT(recv_buffer.left() >= r.length);
incoming_piece_fragment(r.length);
incoming_piece(r, recv_buffer.begin); incoming_piece(r, recv_buffer.begin);
m_requests.pop_front(); m_requests.pop_front();
if (associated_torrent().expired()) return; if (associated_torrent().expired()) return;
TORRENT_ASSERT(m_block_pos == front_request.length); m_block_pos -= r.length;
m_block_pos = 0;
m_received_body += r.length; m_received_body += r.length;
TORRENT_ASSERT(receive_buffer().begin + m_body_start == recv_buffer.begin); TORRENT_ASSERT(receive_buffer().begin + m_body_start == recv_buffer.begin);
TORRENT_ASSERT(m_received_body <= range_end - range_start); TORRENT_ASSERT(m_received_body <= range_end - range_start);
@ -695,6 +708,7 @@ namespace libtorrent
continue; continue;
} }
if (bytes_transferred == 0) break; if (bytes_transferred == 0) break;
TORRENT_ASSERT(payload_transferred > 0);
} }
TORRENT_ASSERT(bytes_transferred == 0); TORRENT_ASSERT(bytes_transferred == 0);
} }

View File

@ -613,25 +613,33 @@ void web_server_thread(int* port, bool ssl)
char buf[10000]; char buf[10000];
int len = 0; int len = 0;
int offset = 0; int offset = 0;
bool connection_close = false;
stream_socket s(ios); stream_socket s(ios);
for (;;) for (;;)
{ {
s.close(ec); if (connection_close)
len = 0;
offset = 0;
accept_done = false;
acceptor.async_accept(s, &on_accept);
ios.reset();
ios.run_one();
if (!accept_done)
{ {
fprintf(stderr, "accept failed\n"); s.close(ec);
return; connection_close = false;
} }
if (!s.is_open()) continue; if (!s.is_open())
{
len = 0;
offset = 0;
accept_done = false;
acceptor.async_accept(s, &on_accept);
ios.reset();
ios.run_one();
if (!accept_done)
{
fprintf(stderr, "accept failed\n");
return;
}
if (!s.is_open()) continue;
}
http_parser p; http_parser p;
bool failed = false; bool failed = false;
@ -663,9 +671,9 @@ void web_server_thread(int* port, bool ssl)
break; break;
} }
len += received; len += received;
p.incoming(buffer::const_interval(buf + offset, buf + len), error); p.incoming(buffer::const_interval(buf + offset, buf + len), error);
TEST_CHECK(error == false); TEST_CHECK(error == false);
if (error) if (error)
{ {
@ -674,11 +682,27 @@ void web_server_thread(int* port, bool ssl)
break; break;
} }
} }
std::string connection = p.header("connection");
std::string via = p.header("via");
// The delegate proxy doesn't say connection close, but it expects it to be closed
// the Via: header is an indicator of delegate making the request
if (connection == "close" || !via.empty())
{
connection_close = true;
}
// fprintf(stderr, "%s", std::string(buf + offset, p.body_start()).c_str()); // fprintf(stderr, "%s", std::string(buf + offset, p.body_start()).c_str());
if (failed) break; if (failed)
{
s.close(ec);
break;
}
offset += p.body_start() + p.content_length(); offset += p.body_start() + p.content_length();
// fprintf(stderr, "offset: %d len: %d\n", offset, len);
if (p.method() != "get" && p.method() != "post") if (p.method() != "get" && p.method() != "post")
{ {
@ -770,6 +794,9 @@ void web_server_thread(int* port, bool ssl)
write(s, boost::asio::buffer(&file_buf[0], file_buf.size()), boost::asio::transfer_all(), ec); write(s, boost::asio::buffer(&file_buf[0], file_buf.size()), boost::asio::transfer_all(), ec);
} }
// fprintf(stderr, "%d bytes left in receive buffer. offset: %d\n", len - offset, offset); // fprintf(stderr, "%d bytes left in receive buffer. offset: %d\n", len - offset, offset);
memmove(buf, buf + offset, len - offset);
len -= offset;
offset = 0;
} while (offset < len); } while (offset < len);
} }
fprintf(stderr, "exiting web server thread\n"); fprintf(stderr, "exiting web server thread\n");

View File

@ -92,7 +92,7 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy, i
cache_status cs; cache_status cs;
for (int i = 0; i < 10; ++i) for (int i = 0; i < 30; ++i)
{ {
torrent_status s = th.status(); torrent_status s = th.status();
session_status ss = ses.status(); session_status ss = ses.status();
@ -118,10 +118,10 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy, i
if (th.is_seed()/* && ss.download_rate == 0.f*/) if (th.is_seed()/* && ss.download_rate == 0.f*/)
{ {
TEST_CHECK(th.status().total_payload_download == total_size); TEST_EQUAL(th.status().total_payload_download, total_size);
// we need to sleep here a bit to let the session sync with the torrent stats // we need to sleep here a bit to let the session sync with the torrent stats
test_sleep(1000); test_sleep(1000);
TEST_CHECK(ses.status().total_payload_download == total_size); TEST_EQUAL(ses.status().total_payload_download, total_size);
break; break;
} }
test_sleep(500); test_sleep(500);
@ -135,6 +135,7 @@ void test_transfer(boost::intrusive_ptr<torrent_info> torrent_file, int proxy, i
<< " session_rate_sum: " << ses_rate_sum << " session_rate_sum: " << ses_rate_sum
<< " session total download: " << ses.status().total_payload_download << " session total download: " << ses.status().total_payload_download
<< " torrent total download: " << th.status().total_payload_download << " torrent total download: " << th.status().total_payload_download
<< " redundant: " << th.status().total_redundant_bytes
<< std::endl; << std::endl;
// the rates for each second should sum up to the total, with a 10% error margin // the rates for each second should sum up to the total, with a 10% error margin
@ -156,24 +157,41 @@ int test_main()
error_code ec; error_code ec;
create_directories("./tmp1_web_seed/test_torrent_dir", ec); create_directories("./tmp1_web_seed/test_torrent_dir", ec);
int file_sizes[] =
{ 5, 16 - 5, 16, 17, 10, 30, 30, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
,1,1,1,1,1,1,13,65,34,75,2,3,4,5,23,9,43,4,43,6, 4};
char random_data[300000]; char random_data[300000];
std::srand(10); std::srand(10);
// memset(random_data, 1, sizeof(random_data)); for (int i = 0; i != sizeof(file_sizes)/sizeof(file_sizes[0]); ++i)
std::generate(random_data, random_data + sizeof(random_data), &std::rand); {
std::ofstream("./tmp1_web_seed/test_torrent_dir/test1").write(random_data, 35); std::generate(random_data, random_data + sizeof(random_data), &std::rand);
std::ofstream("./tmp1_web_seed/test_torrent_dir/test2").write(random_data, 16536 - 35); char filename[200];
std::ofstream("./tmp1_web_seed/test_torrent_dir/test3").write(random_data, 16536); snprintf(filename, sizeof(filename), "./tmp1_web_seed/test_torrent_dir/test%d", i);
std::ofstream("./tmp1_web_seed/test_torrent_dir/test4").write(random_data, 17); error_code ec;
std::ofstream("./tmp1_web_seed/test_torrent_dir/test5").write(random_data, 16536); file out(filename, file::write_only, ec);
std::ofstream("./tmp1_web_seed/test_torrent_dir/test6").write(random_data, 300000); TEST_CHECK(!ec);
std::ofstream("./tmp1_web_seed/test_torrent_dir/test7").write(random_data, 300000); if (ec)
{
fprintf(stderr, "ERROR opening file '%s': %s\n", filename, ec.message().c_str());
return 1;
}
file::iovec_t b = { random_data, file_sizes[i]};
out.writev(0, &b, 1, ec);
TEST_CHECK(!ec);
if (ec)
{
fprintf(stderr, "ERROR writing file '%s': %s\n", filename, ec.message().c_str());
return 1;
}
}
file_storage fs; file_storage fs;
add_files(fs, "./tmp1_web_seed/test_torrent_dir"); add_files(fs, "./tmp1_web_seed/test_torrent_dir");
int port = start_web_server(); int port = start_web_server();
libtorrent::create_torrent t(fs, 16 * 1024); libtorrent::create_torrent t(fs, 16);
char tmp[512]; char tmp[512];
snprintf(tmp, sizeof(tmp), "http://127.0.0.1:%d/tmp1_web_seed", port); snprintf(tmp, sizeof(tmp), "http://127.0.0.1:%d/tmp1_web_seed", port);
t.add_url_seed(tmp); t.add_url_seed(tmp);