/* PSPP - a program for statistical analysis.
- Copyright (C) 2008 Free Software Foundation, Inc.
+ Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
#include <config.h>
-#include <data/casereader-provider.h>
-#include <libpspp/message.h>
-#include <gl/xalloc.h>
-#include <data/dictionary.h>
+#include "data/psql-reader.h"
+
+#include <inttypes.h>
+#include <math.h>
#include <stdlib.h>
-#include "psql-reader.h"
-#include "variable.h"
-#include "format.h"
-#include "calendar.h"
+#include "data/calendar.h"
+#include "data/casereader-provider.h"
+#include "data/dictionary.h"
+#include "data/format.h"
+#include "data/variable.h"
+#include "libpspp/i18n.h"
+#include "libpspp/message.h"
+#include "libpspp/misc.h"
+#include "libpspp/str.h"
-#include <inttypes.h>
+#include "gl/c-strcase.h"
+#include "gl/minmax.h"
+#include "gl/xalloc.h"
#include "gettext.h"
#define _(msgid) gettext (msgid)
#if !PSQL_SUPPORT
struct casereader *
-psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
+psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
{
msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
#include <libpq-fe.h>
+/* Default width of string variables. */
+#define PSQL_DEFAULT_WIDTH 8
+
/* These macros must be the same as in catalog/pg_types.h from the postgres source */
#define BOOLOID 16
#define BYTEAOID 17
static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
-static bool psql_casereader_read (struct casereader *, void *,
- struct ccase *);
+static struct ccase *psql_casereader_read (struct casereader *, void *);
static const struct casereader_class psql_casereader_class =
{
struct psql_reader
{
PGconn *conn;
+ PGresult *res;
+ int tuple;
bool integer_datetimes;
double postgres_epoch;
- size_t value_cnt;
+ struct caseproto *proto;
struct dictionary *dict;
- bool used_first_case;
- struct ccase first_case;
-
/* An array of ints, which maps psql column numbers into
- pspp variable numbers */
- int *vmap;
+ pspp variables */
+ struct variable **vmap;
size_t vmapsize;
+
+ struct string fetch_cmd;
+ int cache_size;
};
-static void set_value (const struct psql_reader *r,
- PGresult *res, struct ccase *c);
+static struct ccase *set_value (struct psql_reader *r);
int width, const char *suggested_name, int col)
{
unsigned long int vx = 0;
- int vidx;
struct variable *var;
- char name[VAR_NAME_LEN + 1];
-
- r->value_cnt += value_cnt_from_width (width);
-
- if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
- {
- msg (ME, _("Cannot create variable name from %s"), suggested_name);
- return NULL;
- }
+ char *name;
+ name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
var = dict_create_var (r->dict, name, width);
- var_set_both_formats (var, fmt);
+ free (name);
- vidx = var_get_dict_index (var);
+ var_set_both_formats (var, fmt);
if ( col != -1)
{
- r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (int));
+ r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
- r->vmap[col] = vidx;
+ r->vmap[col] = var;
r->vmapsize = col + 1;
}
return var;
}
+
+
+
+/* Fill the cache */
+static bool
+reload_cache (struct psql_reader *r)
+{
+ PQclear (r->res);
+ r->tuple = 0;
+
+ r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
+
+ if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
+ {
+ PQclear (r->res);
+ r->res = NULL;
+ return false;
+ }
+
+ return true;
+}
+
+
struct casereader *
psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
{
int i;
- int n_fields;
- PGresult *res = NULL;
+ int n_fields, n_tuples;
+ PGresult *qres = NULL;
+ casenumber n_cases = CASENUMBER_MAX;
+ const char *encoding;
struct psql_reader *r = xzalloc (sizeof *r);
struct string query ;
-
r->conn = PQconnectdb (info->conninfo);
if ( NULL == r->conn)
{
}
{
- int v1;
+ int ver_num;
const char *vers = PQparameterStatus (r->conn, "server_version");
- sscanf (vers, "%d", &v1);
+ sscanf (vers, "%d", &ver_num);
- if ( v1 < 8)
+ if ( ver_num < 8)
{
msg (ME,
_("Postgres server is version %s."
{
const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
- r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
+ r->integer_datetimes = ( 0 == c_strcasecmp (dt, "on"));
}
#if USE_SSL
}
}
- r->postgres_epoch =
- calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
+ r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
+ {
+ const int enc = PQclientEncoding (r->conn);
+
+ /* According to section 22.2 of the Postgresql manual
+ a value of zero (SQL_ASCII) indicates
+ "a declaration of ignorance about the encoding".
+ Accordingly, we use the default encoding
+ if we find this value.
+ */
+ encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
+
+ /* Create the dictionary and populate it */
+ *dict = r->dict = dict_create (encoding);
+ }
- /* Create the dictionary and populate it */
- *dict = r->dict = dict_create ();
+ /*
+ select count (*) from (select * from medium) stupid_sql_standard;
+ */
+ ds_init_cstr (&query,
+ "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
+ "DECLARE pspp BINARY CURSOR FOR ");
- ds_init_cstr (&query, "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; DECLARE pspp BINARY CURSOR FOR ");
ds_put_substring (&query, info->sql.ss);
- res = PQexec (r->conn, ds_cstr (&query));
+ qres = PQexec (r->conn, ds_cstr (&query));
ds_destroy (&query);
- if ( PQresultStatus (res) != PGRES_COMMAND_OK )
+ if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
{
msg (ME, _("Error from psql source: %s."),
- PQresultErrorMessage (res));
+ PQresultErrorMessage (qres));
goto error;
}
- PQclear (res);
+ PQclear (qres);
+
+
+ /* Now use the count() function to find the total number of cases
+ that this query returns.
+ Doing this incurs some overhead. The server has to iterate every
+ case in order to find this number. However, it's performed on the
+ server side, and in all except the most huge databases the extra
+ overhead will be worth the effort.
+ On the other hand, most PSPP functions don't need to know this.
+ The GUI is the notable exception.
+ */
+ ds_init_cstr (&query, "SELECT count (*) FROM (");
+ ds_put_substring (&query, info->sql.ss);
+ ds_put_cstr (&query, ") stupid_sql_standard");
- res = PQexec (r->conn, "FETCH FIRST FROM pspp");
- if ( PQresultStatus (res) != PGRES_TUPLES_OK )
+ qres = PQexec (r->conn, ds_cstr (&query));
+ ds_destroy (&query);
+ if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
{
msg (ME, _("Error from psql source: %s."),
- PQresultErrorMessage (res));
+ PQresultErrorMessage (qres));
goto error;
}
+ n_cases = atol (PQgetvalue (qres, 0, 0));
+ PQclear (qres);
- n_fields = PQnfields (res);
+ qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
+ if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
+ {
+ msg (ME, _("Error from psql source: %s."),
+ PQresultErrorMessage (qres));
+ goto error;
+ }
- r->value_cnt = 0;
+ n_tuples = PQntuples (qres);
+ n_fields = PQnfields (qres);
+
+ r->proto = NULL;
r->vmap = NULL;
r->vmapsize = 0;
{
struct variable *var;
struct fmt_spec fmt = {FMT_F, 8, 2};
- Oid type = PQftype (res, i);
+ Oid type = PQftype (qres, i);
int width = 0;
- int length = PQgetlength (res, 0, i);
+ int length ;
+
+ /* If there are no data then make a finger in the air
+ guess at the contents */
+ if ( n_tuples > 0 )
+ length = PQgetlength (qres, 0, i);
+ else
+ length = PSQL_DEFAULT_WIDTH;
switch (type)
{
case BPCHAROID:
fmt.type = FMT_A;
width = (info->str_width == -1) ?
- ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
+ ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
fmt.w = width;
fmt.d = 0;
break;
case BYTEAOID:
fmt.type = FMT_AHEX;
- width = length > 0 ? length : MAX_SHORT_STRING;
+ width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
fmt.w = width * 2;
fmt.d = 0;
break;
default:
msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
fmt.type = FMT_A;
- width = length > 0 ? length : MAX_SHORT_STRING;
+ width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
fmt.w = width ;
fmt.d = 0;
-
break;
}
- var = create_var (r, &fmt, width, PQfname (res, i), i);
+ if ( width == 0 && fmt_is_string (fmt.type))
+ fmt.w = width = PSQL_DEFAULT_WIDTH;
+
+
+ var = create_var (r, &fmt, width, PQfname (qres, i), i);
+ if ( type == NUMERICOID && n_tuples > 0)
+ {
+ const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
+ struct fmt_spec fmt;
+ int16_t n_digits, weight, dscale;
+ uint16_t sign;
+
+ GET_VALUE (&vptr, n_digits);
+ GET_VALUE (&vptr, weight);
+ GET_VALUE (&vptr, sign);
+ GET_VALUE (&vptr, dscale);
+
+ fmt.d = dscale;
+ fmt.type = FMT_E;
+ fmt.w = fmt_max_output_width (fmt.type) ;
+ fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
+ var_set_both_formats (var, &fmt);
+ }
/* Timezones need an extra variable */
switch (type)
default:
break;
}
-
}
- /* Create the first case, and cache it */
- r->used_first_case = false;
+ PQclear (qres);
+ qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
+ if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
+ {
+ PQclear (qres);
+ goto error;
+ }
+ PQclear (qres);
- case_create (&r->first_case, r->value_cnt);
- memset (case_data_rw_idx (&r->first_case, 0)->s,
- ' ', MAX_SHORT_STRING * r->value_cnt);
+ r->cache_size = info->bsize != -1 ? info->bsize: 4096;
- set_value (r, res, &r->first_case);
+ ds_init_empty (&r->fetch_cmd);
+ ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
- PQclear (res);
+ reload_cache (r);
+ r->proto = caseproto_ref (dict_get_proto (*dict));
return casereader_create_sequential
(NULL,
- r->value_cnt,
- CASENUMBER_MAX,
+ r->proto,
+ n_cases,
&psql_casereader_class, r);
error:
- PQclear (res);
dict_destroy (*dict);
psql_casereader_destroy (NULL, r);
if (r == NULL)
return ;
+ ds_destroy (&r->fetch_cmd);
free (r->vmap);
+ if (r->res) PQclear (r->res);
PQfinish (r->conn);
+ caseproto_unref (r->proto);
free (r);
}
-static bool
-psql_casereader_read (struct casereader *reader UNUSED, void *r_,
- struct ccase *cc)
-{
- PGresult *res;
+
+static struct ccase *
+psql_casereader_read (struct casereader *reader UNUSED, void *r_)
+{
struct psql_reader *r = r_;
- if ( !r->used_first_case )
+ if ( NULL == r->res || r->tuple >= r->cache_size)
{
- *cc = r->first_case;
- r->used_first_case = true;
- return true;
+ if ( ! reload_cache (r) )
+ return false;
}
- case_create (cc, r->value_cnt);
- memset (case_data_rw_idx (cc, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
+ return set_value (r);
+}
- res = PQexec (r->conn, "FETCH NEXT FROM pspp");
- if ( PQresultStatus (res) != PGRES_TUPLES_OK || PQntuples (res) < 1)
- {
- PQclear (res);
- case_destroy (cc);
- return false;
- }
+static struct ccase *
+set_value (struct psql_reader *r)
+{
+ struct ccase *c;
+ int n_vars;
+ int i;
- set_value (r, res, cc);
+ assert (r->res);
- PQclear (res);
+ n_vars = PQnfields (r->res);
- return true;
-}
+ if ( r->tuple >= PQntuples (r->res))
+ return NULL;
+
+ c = case_create (r->proto);
+ case_set_missing (c);
-static void
-set_value (const struct psql_reader *r,
- PGresult *res, struct ccase *c)
-{
- int i;
- int n_vars = PQnfields (res);
for (i = 0 ; i < n_vars ; ++i )
{
- Oid type = PQftype (res, i);
- struct variable *v = dict_get_var (r->dict, r->vmap[i]);
+ Oid type = PQftype (r->res, i);
+ const struct variable *v = r->vmap[i];
union value *val = case_data_rw (c, v);
- const struct variable *v1 = NULL;
+
union value *val1 = NULL;
- if (i < r->vmapsize && r->vmap[i] + 1 < dict_get_var_cnt (r->dict))
+ switch (type)
{
- v1 = dict_get_var (r->dict, r->vmap[i] + 1);
+ case INTERVALOID:
+ case TIMESTAMPTZOID:
+ case TIMETZOID:
+ if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
+ {
+ const struct variable *v1 = NULL;
+ v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
- val1 = case_data_rw (c, v1);
+ val1 = case_data_rw (c, v1);
+ }
+ break;
+ default:
+ break;
}
- if (PQgetisnull (res, 0, i))
+ if (PQgetisnull (r->res, r->tuple, i))
{
value_set_missing (val, var_get_width (v));
}
else
{
- const uint8_t *vptr = (const uint8_t *) PQgetvalue (res, 0, i);
- int length = PQgetlength (res, 0, i);
+ const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
+ int length = PQgetlength (r->res, r->tuple, i);
int var_width = var_get_width (v);
switch (type)
case CASHOID:
{
- int32_t x;
- GET_VALUE (&vptr, x);
- val->f = x / 100.0;
+ /* Postgres 8.3 uses 64 bits.
+ Earlier versions use 32 */
+ switch (length)
+ {
+ case 8:
+ {
+ int64_t x;
+ GET_VALUE (&vptr, x);
+ val->f = x / 100.0;
+ }
+ break;
+ case 4:
+ {
+ int32_t x;
+ GET_VALUE (&vptr, x);
+ val->f = x / 100.0;
+ }
+ break;
+ default:
+ val->f = SYSMIS;
+ break;
+ }
}
break;
case VARCHAROID:
case BPCHAROID:
case BYTEAOID:
- memcpy (val->s, (char *) vptr, MIN (length, var_width));
+ memcpy (value_str_rw (val, var_width), vptr,
+ MIN (length, var_width));
break;
case NUMERICOID:
GET_VALUE (&vptr, sign);
GET_VALUE (&vptr, dscale);
+#if 0
{
struct fmt_spec fmt;
fmt.d = dscale;
fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
var_set_both_formats (v, &fmt);
}
+#endif
for (i = 0 ; i < n_digits; ++i)
{
}
}
}
+
+ r->tuple++;
+
+ return c;
}
#endif