1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
// Fallback psql_open_reader for builds without PostgreSQL support: it only
// reports (msg (ME, ...)) that postgres support was not compiled in.
// NOTE(review): its return statement is not among these lines; presumably
// it returns NULL -- confirm.
46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
48 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
62 /* These macros must be the same as in catalog/pg_type.h (catalog/pg_type_d.h in PostgreSQL 11 and later) from the postgres source */
// These values are compared against the column type OIDs reported by
// PQftype (), so they must match the server's catalog exactly.
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
// Casereader callbacks (defined later in this file) and the class that
// bundles them for casereader_create_sequential.
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
89 static const struct casereader_class psql_casereader_class =
92 psql_casereader_destroy,
// Members of struct psql_reader, the per-reader state:
// True when the server's integer_datetimes setting is "on".
103 bool integer_datetimes;
// Offset, in days, of 2000-01-01 (the PostgreSQL epoch) in PSPP's calendar.
105 double postgres_epoch;
// Reference to the dictionary's case prototype, used to create cases.
107 struct caseproto *proto;
// Dictionary with one variable per result column (plus auxiliary ones).
108 struct dictionary *dict;
110 /* An array of variable pointers, which maps psql column numbers into
112 struct variable **vmap;
// Command used to fetch the next batch of rows ("FETCH FORWARD n FROM pspp").
115 struct string fetch_cmd;
// Converts the current row of the cached result into a new case.
120 static struct ccase *set_value (struct psql_reader *r);
// Copies LEN bytes from IN_ to OUT_, converting from the big-endian
// (network) byte order of PostgreSQL's binary transfer format to host
// order.  Two definitions follow; presumably a preprocessor test on host
// endianness selects one of them -- confirm.
// First variant: presumably for big-endian hosts, copying bytes unchanged.
// NOTE(review): its loop body is not among these lines -- confirm.
126 data_to_native (const void *in_, void *out_, int len)
129 const unsigned char *in = in_;
130 unsigned char *out = out_;
131 for (i = 0 ; i < len ; ++i)
// Second variant: reverses the byte order (presumably little-endian hosts).
136 data_to_native (const void *in_, void *out_, int len)
139 const unsigned char *in = in_;
140 unsigned char *out = out_;
141 for (i = 0 ; i < len ; ++i)
142 out[len - i - 1] = in[i];
// Reads sizeof (OUT) bytes from the buffer that *(IN) points into and
// stores them, converted to host byte order, in the lvalue OUT.  IN is the
// address of the read cursor (callers pass &vptr).
// NOTE(review): successive GET_VALUE calls read successive fields, so the
// macro must also advance *(IN) by sz; that line is not among these -- confirm.
147 #define GET_VALUE(IN, OUT) do { \
148 size_t sz = sizeof (OUT); \
149 data_to_native (*(IN), &(OUT), sz) ; \
156 dump (const unsigned char *x, int l)
// Debugging aid: prints the L bytes at X in hex ("%02x "), then prints
// them a second time with "%c".
160 for (i = 0; i < l ; ++i)
162 printf ("%02x ", x[i]);
167 for (i = 0; i < l ; ++i)
170 printf ("%c ", x[i]);
// Adds to R->dict a variable named after SUGGESTED_NAME (made unique within
// the dictionary), with width WIDTH and print and write format FMT, and
// records it in R->vmap as the variable for psql column COL.
// Returns the new variable.
// NOTE(review): callers also pass COL == -1 for the auxiliary "-zone" and
// "-months" variables.  With COL == -1 the vmap resize below would shrink
// vmap to zero entries, so presumably a guard on a line not shown here
// skips it -- confirm.
179 static struct variable *
180 create_var (struct psql_reader *r, struct fmt_spec fmt,
181 int width, const char *suggested_name, int col)
183 unsigned long int vx = 0;
184 struct variable *var;
187 name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
188 var = dict_create_var (r->dict, name, width);
191 var_set_both_formats (var, fmt);
// Grow the column map so that index COL is valid.
195 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
198 r->vmapsize = col + 1;
// Fetches the next batch of rows from cursor "pspp" into R->res by running
// R->fetch_cmd.  A result that is not PGRES_TUPLES_OK, or that has no rows,
// is treated as end of data; callers test the return value with "!".
// NOTE(review): clearing the previous R->res and resetting R->tuple are
// not among these lines -- confirm they happen before the PQexec.
209 reload_cache (struct psql_reader *r)
214 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
216 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
// Connects using INFO->conninfo and runs the query through a binary cursor
// named "pspp" inside a read-only transaction.  Builds a dictionary with one
// variable per result column (plus auxiliary "-zone"/"-months" variables for
// some column types) and stores it in *DICT.  Returns a sequential
// casereader over the rows.  Errors are reported with msg (ME, ...), and the
// error path tears down the partially built reader.
228 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
231 int n_fields, n_tuples;
232 PGresult *qres = NULL;
233 casenumber n_cases = CASENUMBER_MAX;
234 const char *encoding;
236 struct psql_reader *r = XZALLOC (struct psql_reader);
238 r->conn = PQconnectdb (info->conninfo);
241 msg (ME, _("Memory error whilst opening psql source"));
245 if (PQstatus (r->conn) != CONNECTION_OK)
247 msg (ME, _("Error opening psql source: %s."),
248 PQerrorMessage (r->conn));
// Reading from servers older than 8.0 is refused (see the message below).
// NOTE(review): sscanf's result is not checked; if VERS is NULL or does not
// start with a number, ver_num may be used uninitialised -- confirm.
255 const char *vers = PQparameterStatus (r->conn, "server_version");
257 sscanf (vers, "%d", &ver_num);
262 _("Postgres server is version %s."
263 " Reading from versions earlier than 8.0 is not supported."),
// set_value decodes binary date/time values differently depending on
// whether the server sends them as integer microseconds
// (integer_datetimes = on) or, presumably, as floating-point seconds.
// NOTE(review): PQparameterStatus returns NULL for a parameter the server
// did not report; confirm DT is checked before c_strcasecmp.
271 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
273 r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
// Refuse unencrypted connections unless INFO->allow_clear permits them.
277 if (PQgetssl (r->conn) == NULL)
280 if (! info->allow_clear)
282 msg (ME, _("Connection is unencrypted, "
283 "but unencrypted connections have not been permitted."));
// PostgreSQL binary dates and timestamps count from 2000-01-01; record that
// date's offset, in days, in PSPP's calendar.
288 r->postgres_epoch = calendar_gregorian_to_offset (
289 2000, 1, 1, settings_get_fmt_settings (), NULL);
292 const int enc = PQclientEncoding (r->conn);
294 /* According to the "Character Set Support" section of the PostgreSQL manual
295 a value of zero (SQL_ASCII) indicates
296 "a declaration of ignorance about the encoding".
297 Accordingly, we use the default encoding
298 if we find this value.
300 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
302 /* Create the dictionary and populate it */
303 *dict = r->dict = dict_create (encoding);
306 const int version = PQserverVersion (r->conn);
308 Versions before 9.1 don't have the REPEATABLE READ isolation level.
309 However according to <a12321aabb@gmail.com> if the server is in the
310 "hot standby" mode then SERIALIZABLE won't work.
// Start a read-only transaction and declare the binary cursor "pspp"
// over the query.
312 char *query = xasprintf (
313 "BEGIN READ ONLY ISOLATION LEVEL %s; "
314 "DECLARE pspp BINARY CURSOR FOR %s",
315 (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ",
317 qres = PQexec (r->conn, query);
320 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
322 msg (ME, _("Error from psql source: %s."),
323 PQresultErrorMessage (qres));
330 /* Now use the count() function to find the total number of cases
331 that this query returns.
332 Doing this incurs some overhead. The server has to iterate every
333 case in order to find this number. However, it's performed on the
334 server side, and in all except the most huge databases the extra
335 overhead will be worth the effort.
336 On the other hand, most PSPP functions don't need to know this.
337 The GUI is the notable exception.
339 query = xasprintf ("SELECT count (*) FROM (%s) stupid_sql_standard",
341 qres = PQexec (r->conn, query);
344 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
346 msg (ME, _("Error from psql source: %s."),
347 PQresultErrorMessage (qres));
350 n_cases = atol (PQgetvalue (qres, 0, 0));
// Fetch the first row so that each column's type, name and length can be
// inspected to choose a variable width and format.
353 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
354 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
356 msg (ME, _("Error from psql source: %s."),
357 PQresultErrorMessage (qres));
361 n_tuples = PQntuples (qres);
362 n_fields = PQnfields (qres);
// Create one variable per result column.
368 for (i = 0 ; i < n_fields ; ++i)
370 struct variable *var;
371 struct fmt_spec fmt = { .type = FMT_F, .w = 8, .d = 2 };
372 Oid type = PQftype (qres, i);
376 /* If there are no data then make a finger in the air
377 guess at the contents */
379 length = PQgetlength (qres, 0, i);
381 length = PSQL_DEFAULT_WIDTH;
395 fmt.type = FMT_DOLLAR;
399 width = length > 0 ? length : 1;
// An INFO->str_width of -1 means: round the first row's length up to a
// multiple of PSQL_DEFAULT_WIDTH.
407 width = (info->str_width == -1) ?
408 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
414 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
419 fmt.type = FMT_DTIME;
439 fmt.type = FMT_DATETIME;
451 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
453 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
// Never create a zero-width string variable.
459 if (width == 0 && fmt_is_string (fmt.type))
460 fmt.w = width = PSQL_DEFAULT_WIDTH;
463 var = create_var (r, fmt, width, PQfname (qres, i), i);
// For NUMERIC columns, use the first row's display scale (dscale) as the
// number of decimal places, capped at what the format can display.
464 if (type == NUMERICOID && n_tuples > 0)
466 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
468 int16_t n_digits, weight, dscale;
471 GET_VALUE (&vptr, n_digits);
472 GET_VALUE (&vptr, weight);
473 GET_VALUE (&vptr, sign);
474 GET_VALUE (&vptr, dscale);
478 fmt.w = fmt_max_output_width (fmt.type) ;
479 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
480 var_set_both_formats (var, fmt);
483 /* Timezones need an extra variable */
489 ds_init_cstr (&name, var_get_name (var));
490 ds_put_cstr (&name, "-zone");
495 create_var (r, fmt, 0, ds_cstr (&name), -1);
// Presumably for INTERVAL columns: an extra "-months" variable holds the
// months component.
504 ds_init_cstr (&name, var_get_name (var));
505 ds_put_cstr (&name, "-months");
510 create_var (r, fmt, 0, ds_cstr (&name), -1);
// Step back over the row fetched above so that reading starts at row 1.
521 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
522 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
// Rows are fetched in batches of cache_size rows (INFO->bsize, or 4096
// when bsize is -1).
529 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
531 ds_init_empty (&r->fetch_cmd);
532 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
// The reader holds its own reference to the dictionary's case prototype.
535 r->proto = caseproto_ref (dict_get_proto (*dict));
537 return casereader_create_sequential
541 &psql_casereader_class, r);
// Error path: tear down whatever has been set up so far.
546 psql_casereader_destroy (NULL, r);
// Casereader "destroy" callback: releases the fetch command string, the
// cached PGresult and the case-prototype reference held by reader R_.
// It is also called directly, with READER NULL, from psql_open_reader's
// error path, possibly before fetch_cmd and proto are set up.  It therefore
// relies on R having been zero-allocated (XZALLOC).
// NOTE(review): confirm that ds_destroy and caseproto_unref accept that
// zero state.  Also confirm that closing r->conn (PQfinish) and freeing R
// happen; those lines are not shown.
552 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
554 struct psql_reader *r = r_;
558 ds_destroy (&r->fetch_cmd);
560 if (r->res) PQclear (r->res);
562 caseproto_unref (r->proto);
// Casereader "read" callback: returns the next case from reader R_.
// The batch of rows in r->res is refilled via reload_cache when no batch
// has been fetched yet, or when r->tuple has reached cache_size.
// NOTE(review): when the refill fails, the early return (not shown)
// presumably yields NULL to signal end of data -- confirm.
569 static struct ccase *
570 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
572 struct psql_reader *r = r_;
574 if (NULL == r->res || r->tuple >= r->cache_size)
576 if (! reload_cache (r))
580 return set_value (r);
583 static struct ccase *
584 set_value (struct psql_reader *r)
592 n_vars = PQnfields (r->res);
594 if (r->tuple >= PQntuples (r->res))
597 c = case_create (r->proto);
598 case_set_missing (c);
601 for (i = 0 ; i < n_vars ; ++i)
603 Oid type = PQftype (r->res, i);
604 const struct variable *v = r->vmap[i];
605 union value *val = case_data_rw (c, v);
607 union value *val1 = NULL;
614 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_n_vars (r->dict))
616 const struct variable *v1 = NULL;
617 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
619 val1 = case_data_rw (c, v1);
627 if (PQgetisnull (r->res, r->tuple, i))
629 value_set_missing (val, var_get_width (v));
644 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
645 int length = PQgetlength (r->res, r->tuple, i);
647 int var_width = var_get_width (v);
653 GET_VALUE (&vptr, x);
662 GET_VALUE (&vptr, x);
670 GET_VALUE (&vptr, x);
678 GET_VALUE (&vptr, x);
686 GET_VALUE (&vptr, n);
694 GET_VALUE (&vptr, n);
701 /* Postgres 8.3 uses 64 bits.
702 Earlier versions use 32 */
708 GET_VALUE (&vptr, x);
715 GET_VALUE (&vptr, x);
728 if (r->integer_datetimes)
735 GET_VALUE (&vptr, things);
736 GET_VALUE (&vptr, us);
737 GET_VALUE (&vptr, days);
738 GET_VALUE (&vptr, months);
740 val->f = us / 1000000.0;
741 val->f += days * 24 * 3600;
747 uint32_t days, months;
750 GET_VALUE (&vptr, seconds);
751 GET_VALUE (&vptr, days);
752 GET_VALUE (&vptr, months);
755 val->f += days * 24 * 3600;
766 GET_VALUE (&vptr, x);
768 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
774 if (r->integer_datetimes)
777 GET_VALUE (&vptr, x);
778 val->f = x / 1000000.0;
783 GET_VALUE (&vptr, x);
792 if (r->integer_datetimes)
797 GET_VALUE (&vptr, x);
798 val->f = x / 1000000.0;
804 GET_VALUE (&vptr, x);
808 GET_VALUE (&vptr, zone);
809 val1->f = zone / 3600.0;
816 if (r->integer_datetimes)
820 GET_VALUE (&vptr, x);
824 val->f = (x + r->postgres_epoch * 24 * 3600);
830 GET_VALUE (&vptr, x);
832 val->f = (x + r->postgres_epoch * 24 * 3600);
840 memcpy (val->s, vptr, MIN (length, var_width));
847 int16_t n_digits, weight, dscale;
850 GET_VALUE (&vptr, n_digits);
851 GET_VALUE (&vptr, weight);
852 GET_VALUE (&vptr, sign);
853 GET_VALUE (&vptr, dscale);
860 fmt.w = fmt_max_output_width (fmt.type) ;
861 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
862 var_set_both_formats (v, &fmt);
866 for (i = 0 ; i < n_digits; ++i)
869 GET_VALUE (&vptr, x);
870 f += x * pow (10000, weight--);