1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
26 #include "psql-reader.h"
32 #include <libpspp/misc.h>
33 #include <libpspp/str.h>
38 #define _(msgid) gettext (msgid)
39 #define N_(msgid) (msgid)
44 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
46 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
57 /* Default width of string variables. */
58 #define PSQL_DEFAULT_WIDTH 8
60 /* These macros must be the same as in catalog/pg_type.h from the PostgreSQL source. */
73 #define BPCHAROID 1042
74 #define VARCHAROID 1043
77 #define TIMESTAMPOID 1114
78 #define TIMESTAMPTZOID 1184
79 #define INTERVALOID 1186
80 #define TIMETZOID 1266
81 #define NUMERICOID 1700
83 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
85 static struct ccase *psql_casereader_read (struct casereader *, void *);
87 static const struct casereader_class psql_casereader_class =
90 psql_casereader_destroy,
101 bool integer_datetimes;
103 double postgres_epoch;
105 struct caseproto *proto;
106 struct dictionary *dict;
108 /* An array of variable pointers, which maps psql column numbers into
110 struct variable **vmap;
113 struct string fetch_cmd;
118 static struct ccase *set_value (struct psql_reader *r);
124 data_to_native (const void *in_, void *out_, int len)
127 const unsigned char *in = in_;
128 unsigned char *out = out_;
129 for (i = 0 ; i < len ; ++i )
134 data_to_native (const void *in_, void *out_, int len)
137 const unsigned char *in = in_;
138 unsigned char *out = out_;
139 for (i = 0 ; i < len ; ++i )
140 out[len - i - 1] = in[i];
145 #define GET_VALUE(IN, OUT) do { \
146 size_t sz = sizeof (OUT); \
147 data_to_native (*(IN), &(OUT), sz) ; \
154 dump (const unsigned char *x, int l)
158 for (i = 0; i < l ; ++i)
160 printf ("%02x ", x[i]);
165 for (i = 0; i < l ; ++i)
168 printf ("%c ", x[i]);
177 static struct variable *
178 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
179 int width, const char *suggested_name, int col)
181 unsigned long int vx = 0;
182 struct variable *var;
183 char name[VAR_NAME_LEN + 1];
185 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
187 msg (ME, _("Cannot create variable name from %s"), suggested_name);
191 var = dict_create_var (r->dict, name, width);
192 var_set_both_formats (var, fmt);
196 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
199 r->vmapsize = col + 1;
210 reload_cache (struct psql_reader *r)
215 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
217 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
229 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
232 int n_fields, n_tuples;
233 PGresult *qres = NULL;
234 casenumber n_cases = CASENUMBER_MAX;
236 struct psql_reader *r = xzalloc (sizeof *r);
237 struct string query ;
239 r->conn = PQconnectdb (info->conninfo);
240 if ( NULL == r->conn)
242 msg (ME, _("Memory error whilst opening psql source"));
246 if ( PQstatus (r->conn) != CONNECTION_OK )
248 msg (ME, _("Error opening psql source: %s."),
249 PQerrorMessage (r->conn));
256 const char *vers = PQparameterStatus (r->conn, "server_version");
258 sscanf (vers, "%d", &ver_num);
263 _("Postgres server is version %s."
264 " Reading from versions earlier than 8.0 is not supported."),
272 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
274 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
278 if ( PQgetssl (r->conn) == NULL)
281 if (! info->allow_clear)
283 msg (ME, _("Connection is unencrypted, "
284 "but unencrypted connections have not been permitted."));
290 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
293 /* Create the dictionary and populate it */
294 *dict = r->dict = dict_create ();
297 const int enc = PQclientEncoding (r->conn);
299 /* According to section 22.2 of the Postgresql manual
300 a value of zero (SQL_ASCII) indicates
301 "a declaration of ignorance about the encoding".
302 Accordingly, we don't set the dictionary's encoding
303 if we find this value.
306 dict_set_encoding (r->dict, pg_encoding_to_char (enc));
310 select count (*) from (select * from medium) stupid_sql_standard;
312 ds_init_cstr (&query,
313 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
314 "DECLARE pspp BINARY CURSOR FOR ");
316 ds_put_substring (&query, info->sql.ss);
318 qres = PQexec (r->conn, ds_cstr (&query));
320 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
322 msg (ME, _("Error from psql source: %s."),
323 PQresultErrorMessage (qres));
330 /* Now use the count() function to find the total number of cases
331 that this query returns.
332 Doing this incurs some overhead. The server has to iterate every
333 case in order to find this number. However, it's performed on the
334 server side, and in all but the very largest databases the extra
335 overhead will be worth the effort.
336 On the other hand, most PSPP functions don't need to know this.
337 The GUI is the notable exception.
339 ds_init_cstr (&query, "SELECT count (*) FROM (");
340 ds_put_substring (&query, info->sql.ss);
341 ds_put_cstr (&query, ") stupid_sql_standard");
343 qres = PQexec (r->conn, ds_cstr (&query));
345 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
347 msg (ME, _("Error from psql source: %s."),
348 PQresultErrorMessage (qres));
351 n_cases = atol (PQgetvalue (qres, 0, 0));
354 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
355 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
357 msg (ME, _("Error from psql source: %s."),
358 PQresultErrorMessage (qres));
362 n_tuples = PQntuples (qres);
363 n_fields = PQnfields (qres);
369 for (i = 0 ; i < n_fields ; ++i )
371 struct variable *var;
372 struct fmt_spec fmt = {FMT_F, 8, 2};
373 Oid type = PQftype (qres, i);
377 /* If there are no data, then make a rough
378 guess at the contents. */
380 length = PQgetlength (qres, 0, i);
382 length = PSQL_DEFAULT_WIDTH;
396 fmt.type = FMT_DOLLAR;
400 width = length > 0 ? length : 1;
408 width = (info->str_width == -1) ?
409 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
415 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
420 fmt.type = FMT_DTIME;
440 fmt.type = FMT_DATETIME;
452 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
454 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
460 if ( width == 0 && fmt_is_string (fmt.type))
461 fmt.w = width = PSQL_DEFAULT_WIDTH;
464 var = create_var (r, &fmt, width, PQfname (qres, i), i);
465 if ( type == NUMERICOID && n_tuples > 0)
467 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
469 int16_t n_digits, weight, dscale;
472 GET_VALUE (&vptr, n_digits);
473 GET_VALUE (&vptr, weight);
474 GET_VALUE (&vptr, sign);
475 GET_VALUE (&vptr, dscale);
479 fmt.w = fmt_max_output_width (fmt.type) ;
480 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
481 var_set_both_formats (var, &fmt);
484 /* Timezones need an extra variable */
490 ds_init_cstr (&name, var_get_name (var));
491 ds_put_cstr (&name, "-zone");
496 create_var (r, &fmt, 0, ds_cstr (&name), -1);
505 ds_init_cstr (&name, var_get_name (var));
506 ds_put_cstr (&name, "-months");
511 create_var (r, &fmt, 0, ds_cstr (&name), -1);
522 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
523 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
530 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
532 ds_init_empty (&r->fetch_cmd);
533 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
536 r->proto = caseproto_ref (dict_get_proto (*dict));
538 return casereader_create_sequential
542 &psql_casereader_class, r);
545 dict_destroy (*dict);
547 psql_casereader_destroy (NULL, r);
553 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
555 struct psql_reader *r = r_;
559 ds_destroy (&r->fetch_cmd);
561 if (r->res) PQclear (r->res);
563 caseproto_unref (r->proto);
570 static struct ccase *
571 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
573 struct psql_reader *r = r_;
575 if ( NULL == r->res || r->tuple >= r->cache_size)
577 if ( ! reload_cache (r) )
581 return set_value (r);
584 static struct ccase *
585 set_value (struct psql_reader *r)
593 n_vars = PQnfields (r->res);
595 if ( r->tuple >= PQntuples (r->res))
598 c = case_create (r->proto);
599 case_set_missing (c);
602 for (i = 0 ; i < n_vars ; ++i )
604 Oid type = PQftype (r->res, i);
605 const struct variable *v = r->vmap[i];
606 union value *val = case_data_rw (c, v);
608 union value *val1 = NULL;
615 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
617 const struct variable *v1 = NULL;
618 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
620 val1 = case_data_rw (c, v1);
628 if (PQgetisnull (r->res, r->tuple, i))
630 value_set_missing (val, var_get_width (v));
645 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
646 int length = PQgetlength (r->res, r->tuple, i);
648 int var_width = var_get_width (v);
654 GET_VALUE (&vptr, x);
663 GET_VALUE (&vptr, x);
671 GET_VALUE (&vptr, x);
679 GET_VALUE (&vptr, x);
687 GET_VALUE (&vptr, n);
695 GET_VALUE (&vptr, n);
702 /* Postgres 8.3 uses 64 bits.
703 Earlier versions use 32 */
709 GET_VALUE (&vptr, x);
716 GET_VALUE (&vptr, x);
729 if ( r->integer_datetimes )
736 GET_VALUE (&vptr, things);
737 GET_VALUE (&vptr, us);
738 GET_VALUE (&vptr, days);
739 GET_VALUE (&vptr, months);
741 val->f = us / 1000000.0;
742 val->f += days * 24 * 3600;
748 uint32_t days, months;
751 GET_VALUE (&vptr, seconds);
752 GET_VALUE (&vptr, days);
753 GET_VALUE (&vptr, months);
756 val->f += days * 24 * 3600;
767 GET_VALUE (&vptr, x);
769 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
775 if ( r->integer_datetimes)
778 GET_VALUE (&vptr, x);
779 val->f = x / 1000000.0;
784 GET_VALUE (&vptr, x);
793 if ( r->integer_datetimes)
798 GET_VALUE (&vptr, x);
799 val->f = x / 1000000.0;
805 GET_VALUE (&vptr, x);
809 GET_VALUE (&vptr, zone);
810 val1->f = zone / 3600.0;
817 if ( r->integer_datetimes)
821 GET_VALUE (&vptr, x);
825 val->f = (x + r->postgres_epoch * 24 * 3600 );
831 GET_VALUE (&vptr, x);
833 val->f = (x + r->postgres_epoch * 24 * 3600 );
841 memcpy (value_str_rw (val, var_width), (char *) vptr,
842 MIN (length, var_width));
849 int16_t n_digits, weight, dscale;
852 GET_VALUE (&vptr, n_digits);
853 GET_VALUE (&vptr, weight);
854 GET_VALUE (&vptr, sign);
855 GET_VALUE (&vptr, dscale);
862 fmt.w = fmt_max_output_width (fmt.type) ;
863 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
864 var_set_both_formats (v, &fmt);
868 for (i = 0 ; i < n_digits; ++i)
871 GET_VALUE (&vptr, x);
872 f += x * pow (10000, weight--);