msi: Optimise WHERE operations.

Optimise WHERE operations on matching against strings by adding a new
function find_matching_rows to the views, which eliminates the need to
examine every record when executing a query. Implement this function
in the table using a hash table on the ID stored in the data.
This commit is contained in:
Robert Shearman 2006-03-18 16:12:15 +00:00 committed by Alexandre Julliard
parent d62aa01ec0
commit 9f487ba1de
9 changed files with 276 additions and 10 deletions

View File

@ -182,6 +182,14 @@ static UINT DELETE_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT DELETE_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
return ERROR_FUNCTION_FAILED;
}
MSIVIEWOPS delete_ops =
{
@ -194,7 +202,8 @@ MSIVIEWOPS delete_ops =
DELETE_get_dimensions,
DELETE_get_column_info,
DELETE_modify,
DELETE_delete
DELETE_delete,
DELETE_find_matching_rows
};
UINT DELETE_CreateView( MSIDATABASE *db, MSIVIEW **view, MSIVIEW *table )

View File

@ -246,6 +246,27 @@ static UINT DISTINCT_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT DISTINCT_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
MSIDISTINCTVIEW *dv = (MSIDISTINCTVIEW*)view;
UINT r;
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
if( !dv->table )
return ERROR_FUNCTION_FAILED;
r = dv->table->ops->find_matching_rows( dv->table, col, val, row, handle );
if( *row > dv->row_count )
return ERROR_NO_MORE_ITEMS;
*row = dv->translation[ *row ];
return r;
}
MSIVIEWOPS distinct_ops =
{
@ -258,7 +279,8 @@ MSIVIEWOPS distinct_ops =
DISTINCT_get_dimensions,
DISTINCT_get_column_info,
DISTINCT_modify,
DISTINCT_delete
DISTINCT_delete,
DISTINCT_find_matching_rows,
};
UINT DISTINCT_CreateView( MSIDATABASE *db, MSIVIEW **view, MSIVIEW *table )

View File

@ -214,6 +214,14 @@ static UINT INSERT_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT INSERT_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
return ERROR_FUNCTION_FAILED;
}
MSIVIEWOPS insert_ops =
{
@ -226,7 +234,8 @@ MSIVIEWOPS insert_ops =
INSERT_get_dimensions,
INSERT_get_column_info,
INSERT_modify,
INSERT_delete
INSERT_delete,
INSERT_find_matching_rows
};
UINT INSERT_CreateView( MSIDATABASE *db, MSIVIEW **view, LPWSTR table,

View File

@ -100,6 +100,8 @@ typedef struct tagMSIRECORD
MSIFIELD fields[1]; /* nb. array size is count+1 */
} MSIRECORD;
typedef void *MSIITERHANDLE;
typedef struct tagMSIVIEWOPS
{
/*
@ -114,7 +116,7 @@ typedef struct tagMSIVIEWOPS
UINT (*fetch_int)( struct tagMSIVIEW *, UINT row, UINT col, UINT *val );
/*
* fetch_int - reads one integer from {row,col} in the table
* fetch_stream - gets a stream from {row,col} in the table
*
* This function is similar to fetch_int, except fetches a
* stream instead of an integer.
@ -170,6 +172,18 @@ typedef struct tagMSIVIEWOPS
*/
UINT (*delete)( struct tagMSIVIEW * );
/*
* find_matching_rows - iterates through rows that match a value
*
* If the column type is a string then a string ID should be passed in.
* If the value to be looked up is an integer then no transformation of
* the input value is required, except if the column is a string, in which
* case a string ID should be passed in.
* The handle is an input/output parameter that keeps track of the current
* position in the iteration. It must be initialised to zero before the
* first call and continued to be passed in to subsequent calls.
*/
UINT (*find_matching_rows)( struct tagMSIVIEW *, UINT col, UINT val, UINT *row, MSIITERHANDLE *handle );
} MSIVIEWOPS;
struct tagMSIVIEW

View File

@ -249,6 +249,24 @@ static UINT ORDER_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT ORDER_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
MSIORDERVIEW *ov = (MSIORDERVIEW*)view;
UINT r;
TRACE("%p, %d, %u, %p\n", ov, col, val, *handle);
if( !ov->table )
return ERROR_FUNCTION_FAILED;
r = ov->table->ops->find_matching_rows( ov->table, col, val, row, handle );
*row = ov->reorder[ *row ];
return r;
}
MSIVIEWOPS order_ops =
{
@ -261,7 +279,8 @@ MSIVIEWOPS order_ops =
ORDER_get_dimensions,
ORDER_get_column_info,
ORDER_modify,
ORDER_delete
ORDER_delete,
ORDER_find_matching_rows
};
static UINT ORDER_AddColumn( MSIORDERVIEW *ov, LPCWSTR name )

View File

@ -196,6 +196,24 @@ static UINT SELECT_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT SELECT_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
MSISELECTVIEW *sv = (MSISELECTVIEW*)view;
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
if( !sv->table )
return ERROR_FUNCTION_FAILED;
if( (col==0) || (col>sv->num_cols) )
return ERROR_FUNCTION_FAILED;
col = sv->cols[ col - 1 ];
return sv->table->ops->find_matching_rows( sv->table, col, val, row, handle );
}
MSIVIEWOPS select_ops =
{
@ -208,7 +226,8 @@ MSIVIEWOPS select_ops =
SELECT_get_dimensions,
SELECT_get_column_info,
SELECT_modify,
SELECT_delete
SELECT_delete,
SELECT_find_matching_rows
};
static UINT SELECT_AddColumn( MSISELECTVIEW *sv, LPCWSTR name )

View File

@ -39,6 +39,15 @@
WINE_DEFAULT_DEBUG_CHANNEL(msidb);
#define MSITABLE_HASH_TABLE_SIZE 37
typedef struct tagMSICOLUMNHASHENTRY
{
struct tagMSICOLUMNHASHENTRY *next;
UINT value;
UINT row;
} MSICOLUMNHASHENTRY;
typedef struct tagMSICOLUMNINFO
{
LPCWSTR tablename;
@ -46,6 +55,7 @@ typedef struct tagMSICOLUMNINFO
LPCWSTR colname;
UINT type;
UINT offset;
MSICOLUMNHASHENTRY **hash_table;
} MSICOLUMNINFO;
struct tagMSITABLE
@ -884,6 +894,7 @@ static void msi_free_colinfo( MSICOLUMNINFO *colinfo, UINT count )
{
msi_free( (LPWSTR) colinfo[i].tablename );
msi_free( (LPWSTR) colinfo[i].colname );
msi_free( colinfo[i].hash_table );
}
}
@ -931,7 +942,7 @@ static UINT get_tablecolumns( MSIDATABASE *db,
return r;
}
TRACE("Table id is %d\n", table_id);
TRACE("Table id is %d, row count is %d\n", table_id, table->row_count);
count = table->row_count;
for( i=0; i<count; i++ )
@ -945,6 +956,7 @@ static UINT get_tablecolumns( MSIDATABASE *db,
colinfo[n].number = table->data[ i ][ 1 ] - (1<<15);
colinfo[n].colname = MSI_makestring( db, id );
colinfo[n].type = table->data[ i ] [ 3 ] ^ 0x8000;
colinfo[n].hash_table = NULL;
/* this assumes that columns are in order in the table */
if( n )
colinfo[n].offset = colinfo[n-1].offset
@ -1389,6 +1401,98 @@ static UINT TABLE_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT TABLE_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
MSITABLEVIEW *tv = (MSITABLEVIEW*)view;
const MSICOLUMNHASHENTRY *entry;
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
if( !tv->table )
return ERROR_INVALID_PARAMETER;
if( (col==0) || (col > tv->num_cols) )
return ERROR_INVALID_PARAMETER;
if( !tv->columns[col-1].hash_table )
{
UINT i;
UINT num_rows = tv->table->row_count;
MSICOLUMNHASHENTRY **hash_table;
MSICOLUMNHASHENTRY *new_entry;
if( tv->columns[col-1].offset >= tv->row_size )
{
ERR("Stuffed up %d >= %d\n", tv->columns[col-1].offset, tv->row_size );
ERR("%p %p\n", tv, tv->columns );
return ERROR_FUNCTION_FAILED;
}
/* allocate contiguous memory for the table and its entries so we
* don't have to do an expensive cleanup */
hash_table = msi_alloc(MSITABLE_HASH_TABLE_SIZE * sizeof(MSICOLUMNHASHENTRY*) +
num_rows * sizeof(MSICOLUMNHASHENTRY));
if (!hash_table)
return ERROR_OUTOFMEMORY;
memset(hash_table, 0, MSITABLE_HASH_TABLE_SIZE * sizeof(MSICOLUMNHASHENTRY*));
tv->columns[col-1].hash_table = hash_table;
new_entry = (MSICOLUMNHASHENTRY *)(hash_table + MSITABLE_HASH_TABLE_SIZE);
for (i = 0; i < num_rows; i++, new_entry++)
{
UINT row_value, n;
UINT offset = i + (tv->columns[col-1].offset/2) * num_rows;
n = bytes_per_column( &tv->columns[col-1] );
switch( n )
{
case 4:
offset = tv->columns[col-1].offset/2;
row_value = tv->table->data[i][offset] +
(tv->table->data[i][offset + 1] << 16);
break;
case 2:
offset = tv->columns[col-1].offset/2;
row_value = tv->table->data[i][offset];
break;
default:
ERR("oops! what is %d bytes per column?\n", n );
continue;
}
new_entry->next = NULL;
new_entry->value = row_value;
new_entry->row = i;
if (hash_table[row_value % MSITABLE_HASH_TABLE_SIZE])
{
MSICOLUMNHASHENTRY *prev_entry = hash_table[row_value % MSITABLE_HASH_TABLE_SIZE];
while (prev_entry->next)
prev_entry = prev_entry->next;
prev_entry->next = new_entry;
}
else
hash_table[row_value % MSITABLE_HASH_TABLE_SIZE] = new_entry;
}
}
if( !*handle )
entry = tv->columns[col-1].hash_table[val % MSITABLE_HASH_TABLE_SIZE];
else
entry = ((const MSICOLUMNHASHENTRY *)*handle)->next;
while (entry && entry->value != val)
entry = entry->next;
*handle = (MSIITERHANDLE)entry;
if (!entry)
return ERROR_NO_MORE_ITEMS;
*row = entry->row;
return ERROR_SUCCESS;
}
MSIVIEWOPS table_ops =
{
@ -1401,7 +1505,8 @@ MSIVIEWOPS table_ops =
TABLE_get_dimensions,
TABLE_get_column_info,
TABLE_modify,
TABLE_delete
TABLE_delete,
TABLE_find_matching_rows
};
UINT TABLE_CreateView( MSIDATABASE *db, LPCWSTR name, MSIVIEW **view )

View File

@ -177,6 +177,13 @@ static UINT UPDATE_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT UPDATE_find_matching_rows( struct tagMSIVIEW *view, UINT col, UINT val, UINT *row, MSIITERHANDLE *handle )
{
TRACE("%p %d %d %p\n", view, col, val, *handle );
return ERROR_FUNCTION_FAILED;
}
static MSIVIEWOPS update_ops =
{
@ -189,7 +196,8 @@ static MSIVIEWOPS update_ops =
UPDATE_get_dimensions,
UPDATE_get_column_info,
UPDATE_modify,
UPDATE_delete
UPDATE_delete,
UPDATE_find_matching_rows
};
UINT UPDATE_CreateView( MSIDATABASE *db, MSIVIEW **view, LPWSTR table,

View File

@ -248,6 +248,45 @@ static UINT WHERE_execute( struct tagMSIVIEW *view, MSIRECORD *record )
return ERROR_FUNCTION_FAILED;
wv->row_count = 0;
if (wv->cond->type == EXPR_STRCMP)
{
MSIITERHANDLE handle = NULL;
UINT row, value, col;
struct expr *col_cond = wv->cond->u.expr.left;
struct expr *val_cond = wv->cond->u.expr.right;
/* swap conditionals */
if (col_cond->type != EXPR_COL_NUMBER_STRING)
{
val_cond = wv->cond->u.expr.left;
col_cond = wv->cond->u.expr.right;
}
if ((col_cond->type == EXPR_COL_NUMBER_STRING) && (val_cond->type == EXPR_SVAL))
{
col = col_cond->u.col_number;
r = msi_string2idW(wv->db->strings, val_cond->u.sval, &value);
if (r != ERROR_SUCCESS)
{
TRACE("no id for %s, assuming it doesn't exist in the table\n", debugstr_w(wv->cond->u.expr.right->u.sval));
return ERROR_SUCCESS;
}
do
{
r = table->ops->find_matching_rows(table, col, value, &row, &handle);
if (r == ERROR_SUCCESS)
wv->reorder[ wv->row_count ++ ] = row;
} while (r == ERROR_SUCCESS);
if (r == ERROR_NO_MORE_ITEMS)
return ERROR_SUCCESS;
else
return r;
}
/* else fallback to slow case */
}
for( i=0; i<count; i++ )
{
val = 0;
@ -341,6 +380,27 @@ static UINT WHERE_delete( struct tagMSIVIEW *view )
return ERROR_SUCCESS;
}
static UINT WHERE_find_matching_rows( struct tagMSIVIEW *view, UINT col,
UINT val, UINT *row, MSIITERHANDLE *handle )
{
MSIWHEREVIEW *wv = (MSIWHEREVIEW*)view;
UINT r;
TRACE("%p, %d, %u, %p\n", view, col, val, *handle);
if( !wv->table )
return ERROR_FUNCTION_FAILED;
r = wv->table->ops->find_matching_rows( wv->table, col, val, row, handle );
if( *row > wv->row_count )
return ERROR_NO_MORE_ITEMS;
*row = wv->reorder[ *row ];
return r;
}
MSIVIEWOPS where_ops =
{
@ -353,7 +413,8 @@ MSIVIEWOPS where_ops =
WHERE_get_dimensions,
WHERE_get_column_info,
WHERE_modify,
WHERE_delete
WHERE_delete,
WHERE_find_matching_rows
};
static UINT WHERE_VerifyCondition( MSIDATABASE *db, MSIVIEW *table, struct expr *cond,