wininet: Rewrite chunked stream to store full stream state and allow non-blocking reads crossing chunk boundries.

Signed-off-by: Jacek Caban <jacek@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Jacek Caban 2016-08-01 16:17:00 +02:00 committed by Alexandre Julliard
parent 4fe8b0cc1c
commit 6e0c842dfa
1 changed files with 131 additions and 149 deletions

View File

@ -404,7 +404,15 @@ typedef struct {
DWORD buf_size;
DWORD buf_pos;
DWORD chunk_size;
BOOL end_of_data;
enum {
CHUNKED_STREAM_STATE_READING_CHUNK_SIZE,
CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_SIZE,
CHUNKED_STREAM_STATE_READING_CHUNK,
CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_DATA,
CHUNKED_STREAM_STATE_DISCARD_EOL_AT_END,
CHUNKED_STREAM_STATE_END_OF_STREAM
} state;
} chunked_stream_t;
static inline void destroy_data_stream(data_stream_t *stream)
@ -2695,109 +2703,12 @@ static const data_stream_vtbl_t netconn_stream_vtbl = {
netconn_destroy
};
/* read some more data into the read buffer (the read section must be held) */
static DWORD read_more_chunked_data(chunked_stream_t *stream, http_request_t *req, int maxlen)
static char next_chunked_data_char(chunked_stream_t *stream)
{
DWORD res;
int len;
assert(stream->buf_size);
assert(!stream->end_of_data);
if (stream->buf_pos)
{
/* move existing data to the start of the buffer */
if(stream->buf_size)
memmove(stream->buf, stream->buf + stream->buf_pos, stream->buf_size);
stream->buf_pos = 0;
}
if (maxlen == -1) maxlen = sizeof(stream->buf);
res = NETCON_recv( req->netconn, stream->buf + stream->buf_size,
maxlen - stream->buf_size, TRUE, &len );
if(res == ERROR_SUCCESS)
stream->buf_size += len;
return res;
}
/* remove some amount of data from the read buffer (the read section must be held) */
static void remove_chunked_data(chunked_stream_t *stream, int count)
{
if (!(stream->buf_size -= count)) stream->buf_pos = 0;
else stream->buf_pos += count;
}
/* discard data contents until we reach end of line (the read section must be held) */
static DWORD discard_chunked_eol(chunked_stream_t *stream, http_request_t *req)
{
DWORD res;
do
{
BYTE *eol = memchr(stream->buf + stream->buf_pos, '\n', stream->buf_size);
if (eol)
{
remove_chunked_data(stream, (eol + 1) - (stream->buf + stream->buf_pos));
break;
}
stream->buf_pos = stream->buf_size = 0; /* discard everything */
if ((res = read_more_chunked_data(stream, req, -1)) != ERROR_SUCCESS) return res;
} while (stream->buf_size);
return ERROR_SUCCESS;
}
/* read the size of the next chunk (the read section must be held) */
static DWORD start_next_chunk(chunked_stream_t *stream, http_request_t *req)
{
DWORD chunk_size = 0, res;
assert(!stream->chunk_size || stream->chunk_size == ~0u);
if (stream->end_of_data) return ERROR_SUCCESS;
/* read terminator for the previous chunk */
if(!stream->chunk_size && (res = discard_chunked_eol(stream, req)) != ERROR_SUCCESS)
return res;
for (;;)
{
while (stream->buf_size)
{
char ch = stream->buf[stream->buf_pos];
if (ch >= '0' && ch <= '9') chunk_size = chunk_size * 16 + ch - '0';
else if (ch >= 'a' && ch <= 'f') chunk_size = chunk_size * 16 + ch - 'a' + 10;
else if (ch >= 'A' && ch <= 'F') chunk_size = chunk_size * 16 + ch - 'A' + 10;
else if (ch == ';' || ch == '\r' || ch == '\n')
{
TRACE( "reading %u byte chunk\n", chunk_size );
stream->chunk_size = chunk_size;
if (req->contentLength == ~0u) req->contentLength = chunk_size;
else req->contentLength += chunk_size;
/* eat the rest of this line */
if ((res = discard_chunked_eol(stream, req)) != ERROR_SUCCESS)
return res;
/* if there's chunk data, return now */
if (chunk_size) return ERROR_SUCCESS;
/* otherwise, eat the terminator for this chunk */
if ((res = discard_chunked_eol(stream, req)) != ERROR_SUCCESS)
return res;
stream->end_of_data = TRUE;
return ERROR_SUCCESS;
}
remove_chunked_data(stream, 1);
}
if ((res = read_more_chunked_data(stream, req, -1)) != ERROR_SUCCESS) return res;
if (!stream->buf_size)
{
stream->chunk_size = 0;
return ERROR_SUCCESS;
}
}
stream->buf_size--;
return stream->buf[stream->buf_pos++];
}
static DWORD chunked_get_avail_data(data_stream_t *stream, http_request_t *req)
@ -2809,77 +2720,148 @@ static DWORD chunked_get_avail_data(data_stream_t *stream, http_request_t *req)
static BOOL chunked_end_of_data(data_stream_t *stream, http_request_t *req)
{
chunked_stream_t *chunked_stream = (chunked_stream_t*)stream;
return chunked_stream->end_of_data;
return chunked_stream->state == CHUNKED_STREAM_STATE_END_OF_STREAM;
}
static DWORD chunked_read(data_stream_t *stream, http_request_t *req, BYTE *buf, DWORD size,
DWORD *read, blocking_mode_t blocking_mode)
{
chunked_stream_t *chunked_stream = (chunked_stream_t*)stream;
DWORD read_bytes = 0, ret_read = 0, res = ERROR_SUCCESS;
DWORD ret_read = 0, res = ERROR_SUCCESS;
BOOL continue_read = TRUE;
int read_bytes;
char ch;
if(!chunked_stream->chunk_size || chunked_stream->chunk_size == ~0u) {
res = start_next_chunk(chunked_stream, req);
if(res != ERROR_SUCCESS)
return res;
}
do {
TRACE("state %d\n", chunked_stream->state);
while(size && chunked_stream->chunk_size && !chunked_stream->end_of_data) {
if(chunked_stream->buf_size) {
read_bytes = min(size, min(chunked_stream->buf_size, chunked_stream->chunk_size));
/* this could block */
if(blocking_mode == BLOCKING_DISALLOW && read_bytes == chunked_stream->chunk_size)
/* Ensure that we have data in the buffer for states that need it. */
if(!chunked_stream->buf_size) {
switch(chunked_stream->state) {
case CHUNKED_STREAM_STATE_READING_CHUNK_SIZE:
case CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_SIZE:
case CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_DATA:
case CHUNKED_STREAM_STATE_DISCARD_EOL_AT_END:
chunked_stream->buf_pos = 0;
res = NETCON_recv(req->netconn, chunked_stream->buf, sizeof(chunked_stream->buf), blocking_mode != BLOCKING_DISALLOW, &read_bytes);
if(res == ERROR_SUCCESS && read_bytes) {
chunked_stream->buf_size += read_bytes;
}else if(res == WSAEWOULDBLOCK) {
continue_read = FALSE;
continue;
}else {
chunked_stream->state = CHUNKED_STREAM_STATE_END_OF_STREAM;
}
break;
default:
break;
}
}
memcpy(buf+ret_read, chunked_stream->buf+chunked_stream->buf_pos, read_bytes);
remove_chunked_data(chunked_stream, read_bytes);
}else {
switch(chunked_stream->state) {
case CHUNKED_STREAM_STATE_READING_CHUNK_SIZE:
ch = next_chunked_data_char(chunked_stream);
if(ch >= '0' && ch <= '9') {
chunked_stream->chunk_size = chunked_stream->chunk_size * 16 + ch - '0';
}else if(ch >= 'a' && ch <= 'f') {
chunked_stream->chunk_size = chunked_stream->chunk_size * 16 + ch - 'a' + 10;
}else if (ch >= 'A' && ch <= 'F') {
chunked_stream->chunk_size = chunked_stream->chunk_size * 16 + ch - 'A' + 10;
}else if (ch == ';' || ch == '\r' || ch == '\n') {
TRACE("reading %u byte chunk\n", chunked_stream->chunk_size);
chunked_stream->buf_size++;
chunked_stream->buf_pos--;
if(req->contentLength == ~0u) req->contentLength = chunked_stream->chunk_size;
else req->contentLength += chunked_stream->chunk_size;
chunked_stream->state = CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_SIZE;
}
break;
case CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_SIZE:
ch = next_chunked_data_char(chunked_stream);
if(ch == '\n')
chunked_stream->state = chunked_stream->chunk_size
? CHUNKED_STREAM_STATE_READING_CHUNK
: CHUNKED_STREAM_STATE_DISCARD_EOL_AT_END;
else if(ch != '\r')
WARN("unexpected char '%c'\n", ch);
break;
case CHUNKED_STREAM_STATE_READING_CHUNK:
assert(chunked_stream->chunk_size);
if(!size) {
continue_read = FALSE;
break;
}
read_bytes = min(size, chunked_stream->chunk_size);
if(blocking_mode == BLOCKING_DISALLOW) {
DWORD avail;
if(chunked_stream->buf_size) {
if(read_bytes > chunked_stream->buf_size)
read_bytes = chunked_stream->buf_size;
if(!is_valid_netconn(req->netconn) || !NETCON_query_data_available(req->netconn, &avail) || !avail)
memcpy(buf+ret_read, chunked_stream->buf+chunked_stream->buf_pos, read_bytes);
chunked_stream->buf_pos += read_bytes;
chunked_stream->buf_size -= read_bytes;
}else {
res = NETCON_recv(req->netconn, (char*)buf+ret_read, read_bytes,
blocking_mode != BLOCKING_DISALLOW, (int*)&read_bytes);
if(res != ERROR_SUCCESS) {
continue_read = FALSE;
break;
if(read_bytes > avail)
read_bytes = avail;
}
/* this could block */
if(read_bytes == chunked_stream->chunk_size)
break;
if(!read_bytes) {
chunked_stream->state = CHUNKED_STREAM_STATE_END_OF_STREAM;
continue;
}
}
res = NETCON_recv(req->netconn, (char *)buf+ret_read, read_bytes, TRUE, (int*)&read_bytes);
if(res != ERROR_SUCCESS)
break;
chunked_stream->chunk_size -= read_bytes;
size -= read_bytes;
ret_read += read_bytes;
if(!chunked_stream->chunk_size)
chunked_stream->state = CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_DATA;
if(blocking_mode == BLOCKING_ALLOW)
blocking_mode = BLOCKING_DISALLOW;
break;
case CHUNKED_STREAM_STATE_DISCARD_EOL_AFTER_DATA:
ch = next_chunked_data_char(chunked_stream);
if(ch == '\n')
chunked_stream->state = CHUNKED_STREAM_STATE_READING_CHUNK_SIZE;
else if(ch != '\r')
WARN("unexpected char '%c'\n", ch);
break;
case CHUNKED_STREAM_STATE_DISCARD_EOL_AT_END:
ch = next_chunked_data_char(chunked_stream);
if(ch == '\n')
chunked_stream->state = CHUNKED_STREAM_STATE_END_OF_STREAM;
else if(ch != '\r')
WARN("unexpected char '%c'\n", ch);
break;
case CHUNKED_STREAM_STATE_END_OF_STREAM:
continue_read = FALSE;
break;
}
} while(continue_read);
chunked_stream->chunk_size -= read_bytes;
size -= read_bytes;
ret_read += read_bytes;
if(size && !chunked_stream->chunk_size) {
assert(blocking_mode != BLOCKING_DISALLOW);
res = start_next_chunk(chunked_stream, req);
if(res != ERROR_SUCCESS)
break;
}
if(ret_read)
res = ERROR_SUCCESS;
if(res != ERROR_SUCCESS && res != WSAEWOULDBLOCK)
return res;
if(blocking_mode == BLOCKING_ALLOW)
blocking_mode = BLOCKING_DISALLOW;
}
TRACE("read %u bytes\n", ret_read);
TRACE("read %d bytes\n", ret_read);
*read = ret_read;
return res;
return ERROR_SUCCESS;
}
static BOOL chunked_drain_content(data_stream_t *stream, http_request_t *req)
{
chunked_stream_t *chunked_stream = (chunked_stream_t*)stream;
remove_chunked_data(chunked_stream, chunked_stream->buf_size);
return chunked_stream->end_of_data;
return chunked_stream->state == CHUNKED_STREAM_STATE_END_OF_STREAM;
}
static void chunked_destroy(data_stream_t *stream)
@ -2928,8 +2910,8 @@ static DWORD set_content_length(http_request_t *request)
chunked_stream->data_stream.vtbl = &chunked_stream_vtbl;
chunked_stream->buf_size = chunked_stream->buf_pos = 0;
chunked_stream->chunk_size = ~0u;
chunked_stream->end_of_data = FALSE;
chunked_stream->chunk_size = 0;
chunked_stream->state = CHUNKED_STREAM_STATE_READING_CHUNK_SIZE;
if(request->read_size) {
memcpy(chunked_stream->buf, request->read_buf+request->read_pos, request->read_size);