1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
31 #include <libpspp/str.h>
/* Message-translation helpers.  _() passes MSGID to gettext; N_() expands
   to MSGID unchanged, the usual marker for strings that are translated
   later. */
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
/* psql_open_reader variant for builds without PostgreSQL support: it
   ignores both arguments and reports, via msg (ME, ...), that reading
   postgres databases was not compiled into this installation. */
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
42 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
53 /* These macros must be the same as in catalog/pg_type.h from the postgres source */
/* Built-in PostgreSQL type OIDs.  Column types returned by PQftype () are
   compared against these values (e.g. NUMERICOID) when choosing formats
   and decoding binary values. */
66 #define BPCHAROID 1042
67 #define VARCHAROID 1043
70 #define TIMESTAMPOID 1114
71 #define TIMESTAMPTZOID 1184
72 #define INTERVALOID 1186
73 #define TIMETZOID 1266
74 #define NUMERICOID 1700
/* Casereader callbacks.  psql_open_reader passes psql_casereader_class,
   together with the reader state, to casereader_create_sequential. */
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
80 static const struct casereader_class psql_casereader_class =
83 psql_casereader_destroy,
/* Reader state (struct psql_reader), handed to the casereader callbacks
   as R_.
   INTEGER_DATETIMES: true if the server reports integer_datetimes as "on";
   set_value then decodes binary date/time values as integer microsecond
   counts. */
94 bool integer_datetimes;
/* Offset of 2000-01-01 (PostgreSQL's date epoch) in PSPP's calendar,
   presumably the result of calendar_gregorian_to_offset in
   psql_open_reader; added to day counts read from the server. */
96 double postgres_epoch;
/* Dictionary created by psql_open_reader: one variable per result column,
   plus "-zone"/"-months" companion variables for some column types. */
99 struct dictionary *dict;
101 /* An array of variable pointers, which maps psql column numbers into
103 struct variable **vmap;
106 struct string fetch_cmd;
/* Builds the next case from row R->tuple of the cached result R->res;
   defined below. */
111 static struct ccase *set_value (struct psql_reader *r);
/* Converts a LEN-byte value at IN_, in PostgreSQL's big-endian (network
   order) binary representation, to host order and stores it at OUT_.
   This is the first of two definitions; the second one reverses the bytes.
   NOTE(review): presumably this definition is compiled for big-endian
   hosts and copies the bytes unchanged; confirm against its loop body and
   the enclosing preprocessor conditional. */
117 data_to_native (const void *in_, void *out_, int len)
120 const unsigned char *in = in_;
121 unsigned char *out = out_;
122 for (i = 0 ; i < len ; ++i )
/* Converts a LEN-byte big-endian (network order) value at IN_ to host
   order at OUT_ by copying its bytes in reverse order.  Presumably the
   definition used on little-endian hosts. */
127 data_to_native (const void *in_, void *out_, int len)
130 const unsigned char *in = in_;
131 unsigned char *out = out_;
132 for (i = 0 ; i < len ; ++i )
133 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): IN is the address of a byte pointer into a binary
   PostgreSQL value.  Converts the next sizeof (OUT) bytes at *IN to host
   order with data_to_native and stores them in OUT.  Callers invoke it
   repeatedly on the same pointer to read consecutive fields, so the macro
   presumably also advances *IN by sizeof (OUT); confirm. */
138 #define GET_VALUE(IN, OUT) do { \
139 size_t sz = sizeof (OUT); \
140 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging helper: prints the L bytes at X to stdout, first as two-digit
   hex values and then as characters. */
147 dump (const unsigned char *x, int l)
151 for (i = 0; i < l ; ++i)
153 printf ("%02x ", x[i]);
158 for (i = 0; i < l ; ++i)
161 printf ("%c ", x[i]);
/* Creates a variable of width WIDTH in R->dict and returns it.  The
   variable is named SUGGESTED_NAME or a unique variant of it, and both of
   its formats are set to FMT.  R->value_cnt grows by the number of case
   values the variable needs.
   COL is the psql column the variable holds, and R->vmap is extended to
   cover that column.  psql_open_reader passes -1 for the "-zone" and
   "-months" companion variables, which have no column of their own.
   Presumably the vmap update is skipped for them, since otherwise a size
   of col + 1 == 0 would be requested; confirm.
   NOTE(review): R->value_cnt is increased before the name is checked, so
   it stays increased if dict_make_unique_var_name fails. */
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172 int width, const char *suggested_name, int col)
174 unsigned long int vx = 0;
175 struct variable *var;
176 char name[VAR_NAME_LEN + 1];
178 r->value_cnt += value_cnt_from_width (width);
180 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
182 msg (ME, _("Cannot create variable name from %s"), suggested_name);
186 var = dict_create_var (r->dict, name, width);
187 var_set_both_formats (var, fmt);
/* Grow the column-to-variable map so that index COL is valid. */
191 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194 r->vmapsize = col + 1;
/* Fetches the next batch of rows from the "pspp" cursor by running
   R->fetch_cmd, and keeps the result in R->res.  A result that is not a
   tuple set, or that holds no rows, is treated as failure.
   psql_casereader_read presumably takes that failure as the end of the
   data.
   NOTE(review): confirm that the previous R->res is cleared and R->tuple
   is reset before the new batch is stored. */
205 reload_cache (struct psql_reader *r)
210 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
212 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Connects to the PostgreSQL server described by INFO->conninfo and opens
   INFO->sql as a binary cursor named "pspp" inside a read-only,
   serializable transaction.  The server must be version 8.0 or later, and
   the connection must be encrypted unless INFO->allow_clear is set.
   A new dictionary, stored in *DICT and in the reader state, gets one
   variable per result column.  Types come from the column OIDs; widths
   come from the first row where one exists.  The return value is a
   sequential casereader that fetches rows INFO->bsize at a time (4096 if
   bsize is -1).
   Errors are reported with msg (); the error path destroys *DICT and the
   reader state. */
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 int n_fields, n_tuples;
228 PGresult *qres = NULL;
229 casenumber n_cases = CASENUMBER_MAX;
231 struct psql_reader *r = xzalloc (sizeof *r);
232 struct string query ;
234 r->conn = PQconnectdb (info->conninfo);
235 if ( NULL == r->conn)
237 msg (ME, _("Memory error whilst opening psql source"));
241 if ( PQstatus (r->conn) != CONNECTION_OK )
243 msg (ME, _("Error opening psql source: %s."),
244 PQerrorMessage (r->conn));
/* NOTE(review): PQparameterStatus returns NULL for a setting the server
   has not reported.  Confirm that VERS and DT are non-NULL before they
   reach sscanf and strcasecmp, and that sscanf's result is checked. */
251 const char *vers = PQparameterStatus (r->conn, "server_version");
253 sscanf (vers, "%d", &ver_num);
258 _("Postgres server is version %s."
259 " Reading from versions earlier than 8.0 is not supported."),
267 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
269 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
/* PQgetssl returns NULL when the connection is not using SSL.  Such a
   connection is rejected unless INFO->allow_clear permits it. */
273 if ( PQgetssl (r->conn) == NULL)
276 if (! info->allow_clear)
278 msg (ME, _("Connection is unencrypted, "
279 "but unencrypted connections have not been permitted."));
/* PostgreSQL's binary date and timestamp values count from 2000-01-01,
   so find that date's offset in PSPP's calendar. */
285 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288 /* Create the dictionary and populate it */
289 *dict = r->dict = dict_create ();
292 select count (*) from (select * from medium) stupid_sql_standard;
/* Open the user's query as a binary cursor in a read-only, serializable
   transaction, so that rows can be fetched in batches. */
295 ds_init_cstr (&query,
296 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
297 "DECLARE pspp BINARY CURSOR FOR ");
299 ds_put_substring (&query, info->sql.ss);
301 qres = PQexec (r->conn, ds_cstr (&query));
303 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
305 msg (ME, _("Error from psql source: %s."),
306 PQresultErrorMessage (qres));
313 /* Now use the count() function to find the total number of cases
314 that this query returns.
315 Doing this incurs some overhead. The server has to iterate every
316 case in order to find this number. However, it's performed on the
317 server side, and in all except the most huge databases the extra
318 overhead will be worth the effort.
319 On the other hand, most PSPP functions don't need to know this.
320 The GUI is the notable exception.
322 ds_init_cstr (&query, "SELECT count (*) FROM (");
323 ds_put_substring (&query, info->sql.ss);
324 ds_put_cstr (&query, ") stupid_sql_standard");
326 qres = PQexec (r->conn, ds_cstr (&query));
328 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
330 msg (ME, _("Error from psql source: %s."),
331 PQresultErrorMessage (qres));
334 n_cases = atol (PQgetvalue (qres, 0, 0));
337 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
338 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
340 msg (ME, _("Error from psql source: %s."),
341 PQresultErrorMessage (qres));
345 n_tuples = PQntuples (qres);
346 n_fields = PQnfields (qres);
352 for (i = 0 ; i < n_fields ; ++i )
354 struct variable *var;
355 struct fmt_spec fmt = {FMT_F, 8, 2};
356 Oid type = PQftype (qres, i);
360 /* If there are no data then make a finger in the air
361 guess at the contents */
363 length = PQgetlength (qres, 0, i);
365 length = MAX_SHORT_STRING;
379 fmt.type = FMT_DOLLAR;
383 width = length > 0 ? length : 1;
391 width = (info->str_width == -1) ?
392 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
398 width = length > 0 ? length : MAX_SHORT_STRING;
403 fmt.type = FMT_DTIME;
423 fmt.type = FMT_DATETIME;
435 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
437 width = length > 0 ? length : MAX_SHORT_STRING;
/* A string format must not be left with zero width. */
443 if ( width == 0 && fmt_is_string (fmt.type))
444 fmt.w = width = MAX_SHORT_STRING;
447 var = create_var (r, &fmt, width, PQfname (qres, i), i);
/* For NUMERIC columns, read the header of the first row's binary value
   (digit count, weight, sign, display scale).  Its display scale becomes
   the number of decimals, capped at what the widest output allows. */
448 if ( type == NUMERICOID && n_tuples > 0)
450 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
452 int16_t n_digits, weight, dscale;
455 GET_VALUE (&vptr, n_digits);
456 GET_VALUE (&vptr, weight);
457 GET_VALUE (&vptr, sign);
458 GET_VALUE (&vptr, dscale);
462 fmt.w = fmt_max_output_width (fmt.type) ;
463 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
464 var_set_both_formats (var, &fmt);
467 /* Timezones need an extra variable */
473 ds_init_cstr (&name, var_get_name (var));
474 ds_put_cstr (&name, "-zone");
479 create_var (r, &fmt, 0, ds_cstr (&name), -1);
488 ds_init_cstr (&name, var_get_name (var));
489 ds_put_cstr (&name, "-months");
494 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* FETCH FIRST consumed the first row only so the columns could be
   inspected.  Step the cursor back so that the casereader's first
   FETCH FORWARD returns that row again. */
505 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
506 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
/* Batch size for FETCH FORWARD: INFO->bsize, or 4096 when it is -1. */
513 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
515 ds_init_empty (&r->fetch_cmd);
516 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
520 return casereader_create_sequential
524 &psql_casereader_class, r);
/* Error path: discard the dictionary and the reader state. */
527 dict_destroy (*dict);
529 psql_casereader_destroy (NULL, r);
/* Casereader destroy callback.  R_ is the struct psql_reader.  READER is
   unused; psql_open_reader also calls this directly, with NULL, on its
   error path.  Destroys the fetch command string and clears any cached
   result.
   NOTE(review): presumably it also closes R->conn and frees R; confirm. */
535 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
537 struct psql_reader *r = r_;
541 ds_destroy (&r->fetch_cmd);
543 if (r->res) PQclear (r->res);
/* Casereader read callback.  R_ is the struct psql_reader.
   The next batch is fetched with reload_cache when no batch has been
   fetched yet, or when R->tuple has reached R->cache_size (the end of a
   full batch).  Presumably no case is returned if that fetch fails.
   Otherwise set_value builds the case from the current row. */
551 static struct ccase *
552 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
554 struct psql_reader *r = r_;
556 if ( NULL == r->res || r->tuple >= r->cache_size)
558 if ( ! reload_cache (r) )
562 return set_value (r);
565 static struct ccase *
566 set_value (struct psql_reader *r)
574 n_vars = PQnfields (r->res);
576 if ( r->tuple >= PQntuples (r->res))
579 c = case_create (r->value_cnt);
580 memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
583 for (i = 0 ; i < n_vars ; ++i )
585 Oid type = PQftype (r->res, i);
586 const struct variable *v = r->vmap[i];
587 union value *val = case_data_rw (c, v);
589 union value *val1 = NULL;
596 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
598 const struct variable *v1 = NULL;
599 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
601 val1 = case_data_rw (c, v1);
609 if (PQgetisnull (r->res, r->tuple, i))
611 value_set_missing (val, var_get_width (v));
626 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
627 int length = PQgetlength (r->res, r->tuple, i);
629 int var_width = var_get_width (v);
635 GET_VALUE (&vptr, x);
644 GET_VALUE (&vptr, x);
652 GET_VALUE (&vptr, x);
660 GET_VALUE (&vptr, x);
668 GET_VALUE (&vptr, n);
676 GET_VALUE (&vptr, n);
683 /* Postgres 8.3 uses 64 bits.
684 Earlier versions use 32 */
690 GET_VALUE (&vptr, x);
697 GET_VALUE (&vptr, x);
710 if ( r->integer_datetimes )
717 GET_VALUE (&vptr, things);
718 GET_VALUE (&vptr, us);
719 GET_VALUE (&vptr, days);
720 GET_VALUE (&vptr, months);
722 val->f = us / 1000000.0;
723 val->f += days * 24 * 3600;
729 uint32_t days, months;
732 GET_VALUE (&vptr, seconds);
733 GET_VALUE (&vptr, days);
734 GET_VALUE (&vptr, months);
737 val->f += days * 24 * 3600;
748 GET_VALUE (&vptr, x);
750 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
756 if ( r->integer_datetimes)
759 GET_VALUE (&vptr, x);
760 val->f = x / 1000000.0;
765 GET_VALUE (&vptr, x);
774 if ( r->integer_datetimes)
779 GET_VALUE (&vptr, x);
780 val->f = x / 1000000.0;
786 GET_VALUE (&vptr, x);
790 GET_VALUE (&vptr, zone);
791 val1->f = zone / 3600.0;
798 if ( r->integer_datetimes)
802 GET_VALUE (&vptr, x);
806 val->f = (x + r->postgres_epoch * 24 * 3600 );
812 GET_VALUE (&vptr, x);
814 val->f = (x + r->postgres_epoch * 24 * 3600 );
822 memcpy (val->s, (char *) vptr, MIN (length, var_width));
829 int16_t n_digits, weight, dscale;
832 GET_VALUE (&vptr, n_digits);
833 GET_VALUE (&vptr, weight);
834 GET_VALUE (&vptr, sign);
835 GET_VALUE (&vptr, dscale);
842 fmt.w = fmt_max_output_width (fmt.type) ;
843 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
844 var_set_both_formats (v, &fmt);
848 for (i = 0 ; i < n_digits; ++i)
851 GET_VALUE (&vptr, x);
852 f += x * pow (10000, weight--);