1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
31 #include <libpspp/str.h>
/* Translation helpers: _() passes MSGID through gettext (), while N_()
   expands to MSGID unchanged. */
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
/* Definition of psql_open_reader whose only visible statement reports that
   Postgres support was not compiled in.
   NOTE(review): a second definition with the same signature appears later in
   this file, so the two are presumably selected by a preprocessor
   conditional that is not visible here -- confirm. */
40 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
42 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
53 /* These macros must be the same as in catalog/pg_type.h from the postgres source */
/* Postgres type OIDs.  The column type reported by PQftype () is compared
   against these constants (for example NUMERICOID) to decide how a column
   is handled. */
66 #define BPCHAROID 1042
67 #define VARCHAROID 1043
70 #define TIMESTAMPOID 1114
71 #define TIMESTAMPTZOID 1184
72 #define INTERVALOID 1186
73 #define TIMETZOID 1266
74 #define NUMERICOID 1700
/* Forward declarations of the casereader callbacks, followed by the
   casereader_class table that psql_open_reader passes to
   casereader_create_sequential (). */
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
78 static bool psql_casereader_read (struct casereader *, void *,
81 static const struct casereader_class psql_casereader_class =
84 psql_casereader_destroy,
/* Reader state fields.  The enclosing struct declaration is not visible in
   this view; elsewhere in the file these are accessed through a
   struct psql_reader pointer. */
/* Set by psql_open_reader to whether the server's "integer_datetimes"
   parameter compares equal to "on", ignoring case. */
95 bool integer_datetimes;
/* Added to day counts (or, scaled by 24 * 3600, to second counts) when
   date and timestamp values are converted.
   NOTE(review): presumably the day offset of 2000-01-01 obtained from
   calendar_gregorian_to_offset -- the assignment is not visible, confirm. */
97 double postgres_epoch;
/* Dictionary created by psql_open_reader and also handed back to its
   caller through *dict. */
100 struct dictionary *dict;
102 /* An array of variable pointers, which maps psql column numbers into
/* Indexed by result column number; create_var grows it and records the
   entry count in vmapsize. */
104 struct variable **vmap;
/* Command text that reload_cache executes to fetch further rows. */
107 struct string fetch_cmd;
/* Forward declaration; set_value is called from psql_casereader_read. */
112 static bool set_value (struct psql_reader *r,
/* First of two definitions of data_to_native visible in this file.  Only
   the loop header of this one is visible, not the statement it controls.
   NOTE(review): the two definitions are presumably alternatives selected by
   a byte-order preprocessor test that is not visible here -- confirm. */
119 data_to_native (const void *in_, void *out_, int len)
122 const unsigned char *in = in_;
123 unsigned char *out = out_;
124 for (i = 0 ; i < len ; ++i )
/* Copies LEN bytes from IN_ to OUT_ in reverse order: input byte I is
   stored at output index LEN - I - 1.  Both buffers are accessed as
   unsigned char. */
129 data_to_native (const void *in_, void *out_, int len)
132 const unsigned char *in = in_;
133 unsigned char *out = out_;
134 for (i = 0 ; i < len ; ++i )
135 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): converts the sizeof (OUT) bytes found at *IN into
   OUT using data_to_native.  IN is therefore a pointer to the read
   position.  The remainder of the macro is not visible in this view. */
140 #define GET_VALUE(IN, OUT) do { \
141 size_t sz = sizeof (OUT); \
142 data_to_native (*(IN), &(OUT), sz) ; \
/* Prints the L bytes at X twice with printf: first each byte as a
   two-digit hexadecimal value, then each byte with the %c conversion.
   Presumably a debugging aid. */
149 dump (const unsigned char *x, int l)
153 for (i = 0; i < l ; ++i)
155 printf ("%02x ", x[i]);
160 for (i = 0; i < l ; ++i)
163 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in R->dict.  The name is obtained
   by passing SUGGESTED_NAME through dict_make_unique_var_name, FMT is
   applied with var_set_both_formats, and R->vmap is resized to hold
   COL + 1 entries.
   NOTE(review): psql_open_reader passes COL == -1 for its "-zone" and
   "-months" companion variables; any guard around the vmap update is not
   visible in this view -- confirm one exists. */
172 static struct variable *
173 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
174 int width, const char *suggested_name, int col)
176 unsigned long int vx = 0;
177 struct variable *var;
178 char name[VAR_NAME_LEN + 1];
/* The running value count is increased before the name check below, so it
   grows even when no unique name can be made. */
180 r->value_cnt += value_cnt_from_width (width);
182 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
184 msg (ME, _("Cannot create variable name from %s"), suggested_name);
188 var = dict_create_var (r->dict, name, width);
189 var_set_both_formats (var, fmt);
/* Make room for entry COL and record the new table size. */
193 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
196 r->vmapsize = col + 1;
/* Executes R->fetch_cmd on R->conn and stores the result in R->res.  The
   visible check treats the result as unusable when its status is not
   PGRES_TUPLES_OK or when it holds fewer than one tuple.
   NOTE(review): whether the previous R->res is released before being
   overwritten is not visible in this view -- confirm. */
207 reload_cache (struct psql_reader *r)
212 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
214 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Connects to the database described by INFO->conninfo, declares a binary
   cursor over the query in INFO->sql, builds a dictionary from the columns
   of the first row and returns the result of casereader_create_sequential.
   The new dictionary is also stored in *DICT.

   Visible steps, in order:
   - connect and check the connection status;
   - read the "server_version" and "integer_datetimes" server parameters;
   - report an error when the connection has no SSL and INFO->allow_clear
     is not set;
   - begin a read-only serializable transaction and declare cursor "pspp";
   - run SELECT count (...) over the query and store the result in n_cases;
   - FETCH FIRST and create one variable per result column, with extra
     "-zone" and "-months" variables for some columns;
   - MOVE BACKWARD 1 and build the FETCH FORWARD command later executed by
     reload_cache.

   NOTE(review): the PQparameterStatus () results (vers, dt) reach sscanf
   and strcasecmp with no visible NULL check -- confirm the server always
   reports these parameters.
   NOTE(review): the row count is parsed with atol (), which cannot report
   a malformed or out-of-range value.
   NOTE(review): no PQclear () of the successive qres results is visible in
   this view -- confirm they are released. */
226 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
229 int n_fields, n_tuples;
230 PGresult *qres = NULL;
231 casenumber n_cases = CASENUMBER_MAX;
233 struct psql_reader *r = xzalloc (sizeof *r);
234 struct string query ;
/* Open the connection; a null result is reported as a memory error. */
236 r->conn = PQconnectdb (info->conninfo);
237 if ( NULL == r->conn)
239 msg (ME, _("Memory error whilst opening psql source"));
243 if ( PQstatus (r->conn) != CONNECTION_OK )
245 msg (ME, _("Error opening psql source: %s."),
246 PQerrorMessage (r->conn));
253 const char *vers = PQparameterStatus (r->conn, "server_version");
255 sscanf (vers, "%d", &ver_num);
260 _("Postgres server is version %s."
261 " Reading from versions earlier than 8.0 is not supported."),
269 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
271 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
275 if ( PQgetssl (r->conn) == NULL)
278 if (! info->allow_clear)
280 msg (ME, _("Connection is unencrypted, "
281 "but unencrypted connections have not been permitted."));
287 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
290 /* Create the dictionary and populate it */
291 *dict = r->dict = dict_create ();
294 select count (*) from (select * from medium) stupid_sql_standard;
/* The transaction start and the cursor declaration are sent as a single
   command string, with the caller's query appended to it. */
297 ds_init_cstr (&query,
298 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
299 "DECLARE pspp BINARY CURSOR FOR ");
301 ds_put_substring (&query, info->sql.ss);
303 qres = PQexec (r->conn, ds_cstr (&query));
305 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
307 msg (ME, _("Error from psql source: %s."),
308 PQresultErrorMessage (qres));
315 /* Now use the count() function to find the total number of cases
316 that this query returns.
317 Doing this incurs some overhead. The server has to iterate every
318 case in order to find this number. However, it's performed on the
319 server side, and in all except the most huge databases the extra
320 overhead will be worth the effort.
321 On the other hand, most PSPP functions don't need to know this.
322 The GUI is the notable exception.
324 ds_init_cstr (&query, "SELECT count (*) FROM (");
325 ds_put_substring (&query, info->sql.ss);
326 ds_put_cstr (&query, ") stupid_sql_standard");
328 qres = PQexec (r->conn, ds_cstr (&query));
330 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
332 msg (ME, _("Error from psql source: %s."),
333 PQresultErrorMessage (qres));
336 n_cases = atol (PQgetvalue (qres, 0, 0));
339 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
340 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
342 msg (ME, _("Error from psql source: %s."),
343 PQresultErrorMessage (qres));
347 n_tuples = PQntuples (qres);
348 n_fields = PQnfields (qres);
354 for (i = 0 ; i < n_fields ; ++i )
356 struct variable *var;
357 struct fmt_spec fmt = {FMT_F, 8, 2};
358 Oid type = PQftype (qres, i);
362 /* If there are no data then make a finger in the air
363 guess at the contents */
365 length = PQgetlength (qres, 0, i);
367 length = MAX_SHORT_STRING;
381 fmt.type = FMT_DOLLAR;
385 width = length > 0 ? length : 1;
393 width = (info->str_width == -1) ?
394 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
400 width = length > 0 ? length : MAX_SHORT_STRING;
405 fmt.type = FMT_DTIME;
425 fmt.type = FMT_DATETIME;
437 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
439 width = length > 0 ? length : MAX_SHORT_STRING;
445 if ( width == 0 && fmt_is_string (fmt.type))
446 fmt.w = width = MAX_SHORT_STRING;
449 var = create_var (r, &fmt, width, PQfname (qres, i), i);
/* For NUMERIC columns with at least one row, the leading fields of the
   first row's value are decoded so that dscale can set the number of
   decimals in the variable's format. */
450 if ( type == NUMERICOID && n_tuples > 0)
452 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
454 int16_t n_digits, weight, dscale;
457 GET_VALUE (&vptr, n_digits);
458 GET_VALUE (&vptr, weight);
459 GET_VALUE (&vptr, sign);
460 GET_VALUE (&vptr, dscale);
464 fmt.w = fmt_max_output_width (fmt.type) ;
465 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
466 var_set_both_formats (var, &fmt);
469 /* Timezones need an extra variable */
475 ds_init_cstr (&name, var_get_name (var));
476 ds_put_cstr (&name, "-zone");
481 create_var (r, &fmt, 0, ds_cstr (&name), -1);
490 ds_init_cstr (&name, var_get_name (var));
491 ds_put_cstr (&name, "-months");
496 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* Step the cursor back over the row fetched by FETCH FIRST above,
   presumably so that later fetches start from the first row -- confirm. */
507 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
508 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
/* Rows requested per FETCH: INFO->bsize, or 4096 when bsize is -1. */
515 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
517 ds_init_empty (&r->fetch_cmd);
518 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
522 return casereader_create_sequential
526 &psql_casereader_class, r);
/* Cleanup: destroy the dictionary and the reader state. */
529 dict_destroy (*dict);
531 psql_casereader_destroy (NULL, r);
/* Releases the reader state passed as R_: the visible statements destroy
   the fetch command string and clear the cached result when there is one.
   READER is unused; psql_open_reader calls this directly with a null
   READER.
   NOTE(review): psql_open_reader only initialises fetch_cmd near its end,
   while the state itself comes zero-filled from xzalloc.  If this function
   can run before that point, confirm ds_destroy accepts a zero-filled
   string. */
537 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
539 struct psql_reader *r = r_;
543 ds_destroy (&r->fetch_cmd);
545 if (r->res) PQclear (r->res);
/* Casereader read callback.  When there is no cached result yet, or the
   tuple index has reached cache_size, reload_cache is called first; the
   handling of a failed reload is not visible in this view.  The return
   value is that of set_value. */
554 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
557 struct psql_reader *r = r_;
559 if ( NULL == r->res || r->tuple >= r->cache_size)
561 if ( ! reload_cache (r) )
565 return set_value (r, cc);
569 set_value (struct psql_reader *r,
577 n_vars = PQnfields (r->res);
579 if ( r->tuple >= PQntuples (r->res))
582 case_create (c, r->value_cnt);
583 memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
586 for (i = 0 ; i < n_vars ; ++i )
588 Oid type = PQftype (r->res, i);
589 const struct variable *v = r->vmap[i];
590 union value *val = case_data_rw (c, v);
592 union value *val1 = NULL;
599 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
601 const struct variable *v1 = NULL;
602 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
604 val1 = case_data_rw (c, v1);
612 if (PQgetisnull (r->res, r->tuple, i))
614 value_set_missing (val, var_get_width (v));
629 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
630 int length = PQgetlength (r->res, r->tuple, i);
632 int var_width = var_get_width (v);
638 GET_VALUE (&vptr, x);
647 GET_VALUE (&vptr, x);
655 GET_VALUE (&vptr, x);
663 GET_VALUE (&vptr, x);
671 GET_VALUE (&vptr, n);
679 GET_VALUE (&vptr, n);
686 /* Postgres 8.3 uses 64 bits.
687 Earlier versions use 32 */
693 GET_VALUE (&vptr, x);
700 GET_VALUE (&vptr, x);
713 if ( r->integer_datetimes )
720 GET_VALUE (&vptr, things);
721 GET_VALUE (&vptr, us);
722 GET_VALUE (&vptr, days);
723 GET_VALUE (&vptr, months);
725 val->f = us / 1000000.0;
726 val->f += days * 24 * 3600;
732 uint32_t days, months;
735 GET_VALUE (&vptr, seconds);
736 GET_VALUE (&vptr, days);
737 GET_VALUE (&vptr, months);
740 val->f += days * 24 * 3600;
751 GET_VALUE (&vptr, x);
753 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
759 if ( r->integer_datetimes)
762 GET_VALUE (&vptr, x);
763 val->f = x / 1000000.0;
768 GET_VALUE (&vptr, x);
777 if ( r->integer_datetimes)
782 GET_VALUE (&vptr, x);
783 val->f = x / 1000000.0;
789 GET_VALUE (&vptr, x);
793 GET_VALUE (&vptr, zone);
794 val1->f = zone / 3600.0;
801 if ( r->integer_datetimes)
805 GET_VALUE (&vptr, x);
809 val->f = (x + r->postgres_epoch * 24 * 3600 );
815 GET_VALUE (&vptr, x);
817 val->f = (x + r->postgres_epoch * 24 * 3600 );
825 memcpy (val->s, (char *) vptr, MIN (length, var_width));
832 int16_t n_digits, weight, dscale;
835 GET_VALUE (&vptr, n_digits);
836 GET_VALUE (&vptr, weight);
837 GET_VALUE (&vptr, sign);
838 GET_VALUE (&vptr, dscale);
845 fmt.w = fmt_max_output_width (fmt.type) ;
846 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
847 var_set_both_formats (v, &fmt);
851 for (i = 0 ; i < n_digits; ++i)
854 GET_VALUE (&vptr, x);
855 f += x * pow (10000, weight--);