X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fpsql-reader.c;h=4a49ba0cf92c413c551811b1ce1a87dd9e64c062;hb=8d6bfdd2a100bf8166b3b0b3f006d46f3e7a59e9;hp=f59bcb6dac8521cba01d026ca5cf1a54a524d367;hpb=da333d7456a56655ebf4ce0e16e6cf468ab1c1af;p=pspp diff --git a/src/data/psql-reader.c b/src/data/psql-reader.c index f59bcb6dac..4a49ba0cf9 100644 --- a/src/data/psql-reader.c +++ b/src/data/psql-reader.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2008 Free Software Foundation, Inc. + Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,18 +16,25 @@ #include -#include -#include -#include -#include +#include "data/psql-reader.h" + +#include +#include #include -#include "psql-reader.h" -#include "variable.h" -#include "format.h" -#include "calendar.h" +#include "data/calendar.h" +#include "data/casereader-provider.h" +#include "data/dictionary.h" +#include "data/format.h" +#include "data/variable.h" +#include "libpspp/i18n.h" +#include "libpspp/message.h" +#include "libpspp/misc.h" +#include "libpspp/str.h" -#include +#include "gl/c-strcase.h" +#include "gl/minmax.h" +#include "gl/xalloc.h" #include "gettext.h" #define _(msgid) gettext (msgid) @@ -36,7 +43,7 @@ #if !PSQL_SUPPORT struct casereader * -psql_open_reader (struct psql_read_info *info, struct dictionary **dict) +psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED) { msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP")); @@ -49,6 +56,9 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) #include +/* Default width of string variables. */ +#define PSQL_DEFAULT_WIDTH 8 + /* These macros must be the same as in catalog/pg_types.h from the postgres source */ #define BOOLOID 16 #define BYTEAOID 17 @@ -74,8 +84,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_); -static bool psql_casereader_read (struct casereader *, void *, - struct ccase *); +static struct ccase *psql_casereader_read (struct casereader *, void *); static const struct casereader_class psql_casereader_class = { @@ -88,26 +97,27 @@ static const struct casereader_class psql_casereader_class = struct psql_reader { PGconn *conn; + PGresult *res; + int tuple; bool integer_datetimes; double postgres_epoch; - size_t value_cnt; + struct caseproto *proto; struct dictionary *dict; - bool used_first_case; - struct ccase first_case; - /* An array of ints, which maps psql column numbers into - pspp variable numbers */ - int *vmap; + pspp variables */ + struct variable **vmap; size_t vmapsize; + + struct string fetch_cmd; + int cache_size; }; -static void set_value (const struct psql_reader *r, - PGresult *res, struct ccase *c); +static struct ccase *set_value (struct psql_reader *r); @@ -171,45 +181,61 @@ create_var (struct psql_reader *r, const struct fmt_spec *fmt, int width, const char *suggested_name, int col) { unsigned long int vx = 0; - int vidx; struct variable *var; - char name[VAR_NAME_LEN + 1]; - - r->value_cnt += value_cnt_from_width (width); - - if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name)) - { - msg (ME, _("Cannot create variable name from %s"), suggested_name); - return NULL; - } + char *name; + name = dict_make_unique_var_name (r->dict, suggested_name, &vx); var = dict_create_var (r->dict, name, width); - var_set_both_formats (var, fmt); + free (name); - vidx = var_get_dict_index (var); + var_set_both_formats (var, fmt); if ( col != -1) { - r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (int)); + r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap)); - r->vmap[col] = vidx; + r->vmap[col] = var; r->vmapsize = col + 1; } return var; } + + + +/* Fill the cache */ +static bool +reload_cache (struct psql_reader *r) +{ + PQclear (r->res); + r->tuple = 0; + + r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd)); + + if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1) + { + PQclear (r->res); + r->res = NULL; + return false; + } + + return true; +} + + struct casereader * psql_open_reader (struct psql_read_info *info, struct dictionary **dict) { int i; - int n_fields; - PGresult *res = NULL; + int n_fields, n_tuples; + PGresult *qres = NULL; + casenumber n_cases = CASENUMBER_MAX; + const char *encoding; struct psql_reader *r = xzalloc (sizeof *r); struct string query ; - r->conn = PQconnectdb (info->conninfo); if ( NULL == r->conn) { @@ -226,12 +252,12 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) } { - int v1; + int ver_num = 0; const char *vers = PQparameterStatus (r->conn, "server_version"); - sscanf (vers, "%d", &v1); + sscanf (vers, "%d", &ver_num); - if ( v1 < 8) + if ( ver_num < 8) { msg (ME, _("Postgres server is version %s." @@ -245,7 +271,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) { const char *dt = PQparameterStatus (r->conn, "integer_datetimes"); - r->integer_datetimes = ( 0 == strcasecmp (dt, "on")); + r->integer_datetimes = ( 0 == c_strcasecmp (dt, "on")); } #if USE_SSL @@ -260,38 +286,85 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) } } - r->postgres_epoch = - calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL); + r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL); + { + const int enc = PQclientEncoding (r->conn); + + /* According to section 22.2 of the Postgresql manual + a value of zero (SQL_ASCII) indicates + "a declaration of ignorance about the encoding". + Accordingly, we use the default encoding + if we find this value. + */ + encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding (); + + /* Create the dictionary and populate it */ + *dict = r->dict = dict_create (encoding); + } - /* Create the dictionary and populate it */ - *dict = r->dict = dict_create (); + const int version = PQserverVersion (r->conn); + ds_init_empty (&query); + /* + Versions before 9.1 don't have the REPEATABLE READ isolation level. + However according to if the server is in the + "hot standby" mode then SERIALIZABLE won't work. + */ + ds_put_c_format (&query, + "BEGIN READ ONLY ISOLATION LEVEL %s; " + "DECLARE pspp BINARY CURSOR FOR ", + (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ"); - ds_init_cstr (&query, "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; DECLARE pspp BINARY CURSOR FOR "); ds_put_substring (&query, info->sql.ss); - res = PQexec (r->conn, ds_cstr (&query)); + qres = PQexec (r->conn, ds_cstr (&query)); ds_destroy (&query); - if ( PQresultStatus (res) != PGRES_COMMAND_OK ) + if ( PQresultStatus (qres) != PGRES_COMMAND_OK ) { msg (ME, _("Error from psql source: %s."), - PQresultErrorMessage (res)); + PQresultErrorMessage (qres)); goto error; } - PQclear (res); + PQclear (qres); + - res = PQexec (r->conn, "FETCH FIRST FROM pspp"); - if ( PQresultStatus (res) != PGRES_TUPLES_OK ) + /* Now use the count() function to find the total number of cases + that this query returns. + Doing this incurs some overhead. The server has to iterate every + case in order to find this number. However, it's performed on the + server side, and in all except the most huge databases the extra + overhead will be worth the effort. + On the other hand, most PSPP functions don't need to know this. + The GUI is the notable exception. + */ + ds_init_cstr (&query, "SELECT count (*) FROM ("); + ds_put_substring (&query, info->sql.ss); + ds_put_cstr (&query, ") stupid_sql_standard"); + + qres = PQexec (r->conn, ds_cstr (&query)); + ds_destroy (&query); + if ( PQresultStatus (qres) != PGRES_TUPLES_OK ) + { + msg (ME, _("Error from psql source: %s."), + PQresultErrorMessage (qres)); + goto error; + } + n_cases = atol (PQgetvalue (qres, 0, 0)); + PQclear (qres); + + qres = PQexec (r->conn, "FETCH FIRST FROM pspp"); + if ( PQresultStatus (qres) != PGRES_TUPLES_OK ) { msg (ME, _("Error from psql source: %s."), - PQresultErrorMessage (res)); + PQresultErrorMessage (qres)); goto error; } - n_fields = PQnfields (res); + n_tuples = PQntuples (qres); + n_fields = PQnfields (qres); - r->value_cnt = 0; + r->proto = NULL; r->vmap = NULL; r->vmapsize = 0; @@ -299,9 +372,16 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) { struct variable *var; struct fmt_spec fmt = {FMT_F, 8, 2}; - Oid type = PQftype (res, i); + Oid type = PQftype (qres, i); int width = 0; - int length = PQgetlength (res, 0, i); + int length ; + + /* If there are no data then make a finger in the air + guess at the contents */ + if ( n_tuples > 0 ) + length = PQgetlength (qres, 0, i); + else + length = PSQL_DEFAULT_WIDTH; switch (type) { @@ -328,13 +408,13 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) case BPCHAROID: fmt.type = FMT_A; width = (info->str_width == -1) ? - ROUND_UP (length, MAX_SHORT_STRING) : info->str_width; + ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width; fmt.w = width; fmt.d = 0; break; case BYTEAOID: fmt.type = FMT_AHEX; - width = length > 0 ? length : MAX_SHORT_STRING; + width = length > 0 ? length : PSQL_DEFAULT_WIDTH; fmt.w = width * 2; fmt.d = 0; break; @@ -373,14 +453,35 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) default: msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type); fmt.type = FMT_A; - width = length > 0 ? length : MAX_SHORT_STRING; + width = length > 0 ? length : PSQL_DEFAULT_WIDTH; fmt.w = width ; fmt.d = 0; - break; } - var = create_var (r, &fmt, width, PQfname (res, i), i); + if ( width == 0 && fmt_is_string (fmt.type)) + fmt.w = width = PSQL_DEFAULT_WIDTH; + + + var = create_var (r, &fmt, width, PQfname (qres, i), i); + if ( type == NUMERICOID && n_tuples > 0) + { + const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i); + struct fmt_spec fmt; + int16_t n_digits, weight, dscale; + uint16_t sign; + + GET_VALUE (&vptr, n_digits); + GET_VALUE (&vptr, weight); + GET_VALUE (&vptr, sign); + GET_VALUE (&vptr, dscale); + + fmt.d = dscale; + fmt.type = FMT_E; + fmt.w = fmt_max_output_width (fmt.type) ; + fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w)); + var_set_both_formats (var, &fmt); + } /* Timezones need an extra variable */ switch (type) @@ -416,30 +517,34 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict) default: break; } - } - /* Create the first case, and cache it */ - r->used_first_case = false; + PQclear (qres); + qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp"); + if ( PQresultStatus (qres) != PGRES_COMMAND_OK) + { + PQclear (qres); + goto error; + } + PQclear (qres); - case_create (&r->first_case, r->value_cnt); - memset (case_data_rw_idx (&r->first_case, 0)->s, - ' ', MAX_SHORT_STRING * r->value_cnt); + r->cache_size = info->bsize != -1 ? info->bsize: 4096; - set_value (r, res, &r->first_case); + ds_init_empty (&r->fetch_cmd); + ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size); - PQclear (res); + reload_cache (r); + r->proto = caseproto_ref (dict_get_proto (*dict)); return casereader_create_sequential (NULL, - r->value_cnt, - CASENUMBER_MAX, + r->proto, + n_cases, &psql_casereader_class, r); error: - PQclear (res); - dict_destroy (*dict); + dict_unref (*dict); psql_casereader_destroy (NULL, r); return NULL; @@ -453,69 +558,76 @@ psql_casereader_destroy (struct casereader *reader UNUSED, void *r_) if (r == NULL) return ; + ds_destroy (&r->fetch_cmd); free (r->vmap); + if (r->res) PQclear (r->res); PQfinish (r->conn); + caseproto_unref (r->proto); free (r); } -static bool -psql_casereader_read (struct casereader *reader UNUSED, void *r_, - struct ccase *cc) -{ - PGresult *res; + +static struct ccase * +psql_casereader_read (struct casereader *reader UNUSED, void *r_) +{ struct psql_reader *r = r_; - if ( !r->used_first_case ) + if ( NULL == r->res || r->tuple >= r->cache_size) { - *cc = r->first_case; - r->used_first_case = true; - return true; + if ( ! reload_cache (r) ) + return false; } - case_create (cc, r->value_cnt); - memset (case_data_rw_idx (cc, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt); + return set_value (r); +} - res = PQexec (r->conn, "FETCH NEXT FROM pspp"); - if ( PQresultStatus (res) != PGRES_TUPLES_OK || PQntuples (res) < 1) - { - PQclear (res); - case_destroy (cc); - return false; - } +static struct ccase * +set_value (struct psql_reader *r) +{ + struct ccase *c; + int n_vars; + int i; - set_value (r, res, cc); + assert (r->res); - PQclear (res); + n_vars = PQnfields (r->res); - return true; -} + if ( r->tuple >= PQntuples (r->res)) + return NULL; + + c = case_create (r->proto); + case_set_missing (c); -static void -set_value (const struct psql_reader *r, - PGresult *res, struct ccase *c) -{ - int i; - int n_vars = PQnfields (res); for (i = 0 ; i < n_vars ; ++i ) { - Oid type = PQftype (res, i); - struct variable *v = dict_get_var (r->dict, r->vmap[i]); + Oid type = PQftype (r->res, i); + const struct variable *v = r->vmap[i]; union value *val = case_data_rw (c, v); - const struct variable *v1 = NULL; + union value *val1 = NULL; - if (i < r->vmapsize && r->vmap[i] + 1 < dict_get_var_cnt (r->dict)) + switch (type) { - v1 = dict_get_var (r->dict, r->vmap[i] + 1); + case INTERVALOID: + case TIMESTAMPTZOID: + case TIMETZOID: + if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict)) + { + const struct variable *v1 = NULL; + v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1); - val1 = case_data_rw (c, v1); + val1 = case_data_rw (c, v1); + } + break; + default: + break; } - if (PQgetisnull (res, 0, i)) + if (PQgetisnull (r->res, r->tuple, i)) { value_set_missing (val, var_get_width (v)); @@ -532,8 +644,8 @@ set_value (const struct psql_reader *r, } else { - const uint8_t *vptr = (const uint8_t *) PQgetvalue (res, 0, i); - int length = PQgetlength (res, 0, i); + const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i); + int length = PQgetlength (r->res, r->tuple, i); int var_width = var_get_width (v); switch (type) @@ -728,7 +840,8 @@ set_value (const struct psql_reader *r, case VARCHAROID: case BPCHAROID: case BYTEAOID: - memcpy (val->s, (char *) vptr, MIN (length, var_width)); + memcpy (value_str_rw (val, var_width), vptr, + MIN (length, var_width)); break; case NUMERICOID: @@ -743,6 +856,7 @@ set_value (const struct psql_reader *r, GET_VALUE (&vptr, sign); GET_VALUE (&vptr, dscale); +#if 0 { struct fmt_spec fmt; fmt.d = dscale; @@ -751,6 +865,7 @@ set_value (const struct psql_reader *r, fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w)); var_set_both_formats (v, &fmt); } +#endif for (i = 0 ; i < n_digits; ++i) { @@ -775,6 +890,10 @@ set_value (const struct psql_reader *r, } } } + + r->tuple++; + + return c; } #endif