1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
/* Message-marking helpers.  _ (msgid) expands to a gettext call on msgid.
   N_ (msgid) expands to msgid unchanged. */
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
/* Stand-in definition of psql_open_reader.  Both parameters are marked
   UNUSED and the visible body only reports, through msg, that support for
   reading postgres databases was not compiled into this installation.
   NOTE(review): presumably this variant is selected by a build-time
   conditional, and its return statement exists; neither is visible in
   this view. */
46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
48 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
62 /* PostgreSQL type OIDs.  These macros must have the same values as in
   catalog/pg_type.h from the PostgreSQL source. */
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
/* Forward declarations of the two callbacks defined later in this file,
   followed by the casereader class that refers to them.  Only the
   psql_casereader_destroy entry of the initializer is visible in this
   view. */
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
89 static const struct casereader_class psql_casereader_class =
92 psql_casereader_destroy,
103 bool integer_datetimes;
105 double postgres_epoch;
107 struct caseproto *proto;
108 struct dictionary *dict;
110 /* An array of variable pointers, which maps psql column numbers into
112 struct variable **vmap;
115 struct string fetch_cmd;
/* Forward declaration: set_value is called by psql_casereader_read and
   returns a case pointer built from the reader state in R. */
120 static struct ccase *set_value (struct psql_reader *r);
/* First of two definitions of data_to_native in this file.  It walks LEN
   bytes of IN_ and OUT_ as unsigned char buffers.
   NOTE(review): the loop body is not visible in this view, and presumably
   a preprocessor conditional (also not visible) selects between this
   definition and the byte-reversing one that follows. */
126 data_to_native (const void *in_, void *out_, int len)
129 const unsigned char *in = in_;
130 unsigned char *out = out_;
131 for (i = 0 ; i < len ; ++i)
/* Second definition of data_to_native: copies LEN bytes from IN_ to OUT_
   in reverse order, so that byte i of the input lands at position
   len - i - 1 of the output.  The two buffers are accessed as
   unsigned char arrays. */
136 data_to_native (const void *in_, void *out_, int len)
139 const unsigned char *in = in_;
140 unsigned char *out = out_;
141 for (i = 0 ; i < len ; ++i)
142 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): converts sizeof (OUT) bytes found at *IN into OUT
   by calling data_to_native.  IN is a pointer to the read pointer and OUT
   is an lvalue whose size decides how many bytes are converted.
   NOTE(review): the remainder of the macro is not visible in this view;
   callers issue several GET_VALUE calls in a row on the same pointer, so
   it presumably also advances *IN by that size. */
147 #define GET_VALUE(IN, OUT) do { \
148 size_t sz = sizeof (OUT); \
149 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging aid: prints the L bytes at X twice with printf, first each
   byte as two hex digits followed by a space, then each byte through the
   %c conversion followed by a space.
   NOTE(review): lines between the visible statements are missing from
   this view, so any filtering of non-printable bytes or line breaks
   cannot be confirmed here. */
156 dump (const unsigned char *x, int l)
160 for (i = 0; i < l ; ++i)
162 printf ("%02x ", x[i]);
167 for (i = 0; i < l ; ++i)
170 printf ("%c ", x[i]);
/* Creates a new variable in the dictionary of R.

   R               reader whose dictionary and column map are updated.
   FMT             format applied as both formats of the new variable.
   WIDTH           width passed to dict_create_var.
   SUGGESTED_NAME  starting point for the name; it is made unique within
                   the dictionary by dict_make_unique_var_name.
   COL             psql column number associated with the variable.

   The visible code grows r->vmap to hold COL + 1 entries and records that
   count in r->vmapsize.
   NOTE(review): callers in this file pass COL = -1 for the auxiliary
   zone and months variables; presumably a guard that is not visible in
   this view skips the vmap update in that case, since otherwise the
   resize would request zero bytes.  The statement that stores the
   variable in the map and the return statement are also not visible. */
179 static struct variable *
180 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
181 int width, const char *suggested_name, int col)
183 unsigned long int vx = 0;
184 struct variable *var;
187 name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
188 var = dict_create_var (r->dict, name, width);
191 var_set_both_formats (var, fmt);
195 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
198 r->vmapsize = col + 1;
/* Refills the result cache of R: executes the stored fetch command on the
   connection and keeps the result in r->res.  The visible test treats a
   status other than PGRES_TUPLES_OK, or a result with fewer than one
   tuple, as the special case.
   NOTE(review): the handling of any previous r->res, the reset of the
   tuple index and the return values are not visible in this view; the
   caller negates the result, so it presumably returns false on that
   special case. */
209 reload_cache (struct psql_reader *r)
214 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
216 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Opens a case reader over the result of the SQL query in INFO and stores
   a newly created dictionary in *DICT.

   Visible steps, in order:
     - connect with PQconnectdb using info->conninfo and check the status;
     - read the server_version and integer_datetimes parameters;
     - refuse an unencrypted connection unless info->allow_clear is set;
     - compute the calendar offset of 1 January 2000 as r->postgres_epoch;
     - pick the client encoding, or the default encoding when the client
       encoding is zero, and create the dictionary with it;
     - begin a read-only transaction and declare a binary cursor named
       pspp over info->sql;
     - run a count query over info->sql to obtain the number of cases;
     - fetch the first row and create one variable per field, choosing a
       format and width from the field type OID and the value length;
     - move the cursor back by one row, set up the fetch command and the
       case prototype, and create a sequential case reader.
   The last visible statement destroys R through psql_casereader_destroy.

   NOTE(review), to confirm against the complete source because lines are
   missing from this view:
     - no check of the sscanf result on the version string is visible, and
       PQparameterStatus may return NULL;
     - the case count is parsed with atol, which cannot report errors;
     - the unsupported OID warning formats an Oid with %d, while libpq
       declares Oid as an unsigned type;
     - qres is assigned by several PQexec calls and no PQclear of it is
       visible here. */
228 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
231 int n_fields, n_tuples;
232 PGresult *qres = NULL;
233 casenumber n_cases = CASENUMBER_MAX;
234 const char *encoding;
236 struct psql_reader *r = XZALLOC (struct psql_reader);
237 struct string query ;
/* Open the connection described by info->conninfo. */
239 r->conn = PQconnectdb (info->conninfo);
242 msg (ME, _("Memory error whilst opening psql source"));
246 if (PQstatus (r->conn) != CONNECTION_OK)
248 msg (ME, _("Error opening psql source: %s."),
249 PQerrorMessage (r->conn));
/* Read the version string that the server reported for this connection. */
256 const char *vers = PQparameterStatus (r->conn, "server_version");
258 sscanf (vers, "%d", &ver_num);
263 _("Postgres server is version %s."
264 " Reading from versions earlier than 8.0 is not supported."),
/* integer_datetimes is true when the server reports the parameter as on,
   compared without regard to case. */
272 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
274 r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
/* A NULL result from PQgetssl is treated as an unencrypted connection. */
278 if (PQgetssl (r->conn) == NULL)
281 if (! info->allow_clear)
283 msg (ME, _("Connection is unencrypted, "
284 "but unencrypted connections have not been permitted."));
/* Calendar offset of 1 January 2000, used later when converting dates. */
289 r->postgres_epoch = calendar_gregorian_to_offset (
290 2000, 1, 1, settings_get_fmt_settings (), NULL);
293 const int enc = PQclientEncoding (r->conn);
295 /* According to section 22.2 of the Postgresql manual
296 a value of zero (SQL_ASCII) indicates
297 "a declaration of ignorance about the encoding".
298 Accordingly, we use the default encoding
299 if we find this value.
301 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
303 /* Create the dictionary and populate it */
304 *dict = r->dict = dict_create (encoding);
/* Build the statement that opens a read-only transaction and declares a
   binary cursor named pspp over the user query. */
307 const int version = PQserverVersion (r->conn);
308 ds_init_empty (&query);
310 Versions before 9.1 don't have the REPEATABLE READ isolation level.
311 However according to <a12321aabb@gmail.com> if the server is in the
312 "hot standby" mode then SERIALIZABLE won't work.
314 ds_put_c_format (&query,
315 "BEGIN READ ONLY ISOLATION LEVEL %s; "
316 "DECLARE pspp BINARY CURSOR FOR ",
317 (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ");
319 ds_put_substring (&query, info->sql.ss);
321 qres = PQexec (r->conn, ds_cstr (&query));
323 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
325 msg (ME, _("Error from psql source: %s."),
326 PQresultErrorMessage (qres));
333 /* Now use the count() function to find the total number of cases
334 that this query returns.
335 Doing this incurs some overhead. The server has to iterate every
336 case in order to find this number. However, it's performed on the
337 server side, and in all except the most huge databases the extra
338 overhead will be worth the effort.
339 On the other hand, most PSPP functions don't need to know this.
340 The GUI is the notable exception.
342 ds_init_cstr (&query, "SELECT count (*) FROM (");
343 ds_put_substring (&query, info->sql.ss);
344 ds_put_cstr (&query, ") stupid_sql_standard");
346 qres = PQexec (r->conn, ds_cstr (&query));
348 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
350 msg (ME, _("Error from psql source: %s."),
351 PQresultErrorMessage (qres));
354 n_cases = atol (PQgetvalue (qres, 0, 0));
357 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
358 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
360 msg (ME, _("Error from psql source: %s."),
361 PQresultErrorMessage (qres));
365 n_tuples = PQntuples (qres);
366 n_fields = PQnfields (qres);
372 for (i = 0 ; i < n_fields ; ++i)
374 struct variable *var;
375 struct fmt_spec fmt = { .type = FMT_F, .w = 8, .d = 2 };
376 Oid type = PQftype (qres, i);
380 /* If there are no data then make a finger in the air
381 guess at the contents */
383 length = PQgetlength (qres, 0, i);
385 length = PSQL_DEFAULT_WIDTH;
399 fmt.type = FMT_DOLLAR;
403 width = length > 0 ? length : 1;
411 width = (info->str_width == -1) ?
412 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
418 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
423 fmt.type = FMT_DTIME;
443 fmt.type = FMT_DATETIME;
455 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
457 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
463 if (width == 0 && fmt_is_string (fmt.type))
464 fmt.w = width = PSQL_DEFAULT_WIDTH;
467 var = create_var (r, &fmt, width, PQfname (qres, i), i);
468 if (type == NUMERICOID && n_tuples > 0)
470 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
472 int16_t n_digits, weight, dscale;
475 GET_VALUE (&vptr, n_digits);
476 GET_VALUE (&vptr, weight);
477 GET_VALUE (&vptr, sign);
478 GET_VALUE (&vptr, dscale);
482 fmt.w = fmt_max_output_width (fmt.type) ;
483 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
484 var_set_both_formats (var, &fmt);
487 /* Timezones need an extra variable */
493 ds_init_cstr (&name, var_get_name (var));
494 ds_put_cstr (&name, "-zone");
499 create_var (r, &fmt, 0, ds_cstr (&name), -1);
508 ds_init_cstr (&name, var_get_name (var));
509 ds_put_cstr (&name, "-months");
514 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* Step the cursor back over the row fetched above for type discovery. */
525 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
526 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
/* Rows requested per fetch: info->bsize, or 4096 when bsize is -1. */
533 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
535 ds_init_empty (&r->fetch_cmd);
536 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
539 r->proto = caseproto_ref (dict_get_proto (*dict));
541 return casereader_create_sequential
545 &psql_casereader_class, r);
/* Placed after the return above, so presumably reached through an error
   label that is not visible in this view. */
550 psql_casereader_destroy (NULL, r);
/* Releases the resources held by the reader state R_.  READER is unused.
   The visible statements destroy the fetch command string, clear the
   cached result when one is present, and drop the reference to the case
   prototype.
   NOTE(review): closing the connection and freeing the column map and
   the structure itself are not visible in this view. */
556 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
558 struct psql_reader *r = r_;
562 ds_destroy (&r->fetch_cmd);
564 if (r->res) PQclear (r->res);
566 caseproto_unref (r->proto);
/* Read callback of the case reader.  READER is unused; R_ is the reader
   state.  When no result is cached, or the tuple index has reached the
   cache size, the cache is reloaded first.  The case itself is produced
   by set_value.
   NOTE(review): the action taken when reload_cache reports failure is
   not visible in this view. */
573 static struct ccase *
574 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
576 struct psql_reader *r = r_;
578 if (NULL == r->res || r->tuple >= r->cache_size)
580 if (! reload_cache (r))
584 return set_value (r);
587 static struct ccase *
588 set_value (struct psql_reader *r)
596 n_vars = PQnfields (r->res);
598 if (r->tuple >= PQntuples (r->res))
601 c = case_create (r->proto);
602 case_set_missing (c);
605 for (i = 0 ; i < n_vars ; ++i)
607 Oid type = PQftype (r->res, i);
608 const struct variable *v = r->vmap[i];
609 union value *val = case_data_rw (c, v);
611 union value *val1 = NULL;
618 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_n_vars (r->dict))
620 const struct variable *v1 = NULL;
621 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
623 val1 = case_data_rw (c, v1);
631 if (PQgetisnull (r->res, r->tuple, i))
633 value_set_missing (val, var_get_width (v));
648 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
649 int length = PQgetlength (r->res, r->tuple, i);
651 int var_width = var_get_width (v);
657 GET_VALUE (&vptr, x);
666 GET_VALUE (&vptr, x);
674 GET_VALUE (&vptr, x);
682 GET_VALUE (&vptr, x);
690 GET_VALUE (&vptr, n);
698 GET_VALUE (&vptr, n);
705 /* Postgres 8.3 uses 64 bits.
706 Earlier versions use 32 */
712 GET_VALUE (&vptr, x);
719 GET_VALUE (&vptr, x);
732 if (r->integer_datetimes)
739 GET_VALUE (&vptr, things);
740 GET_VALUE (&vptr, us);
741 GET_VALUE (&vptr, days);
742 GET_VALUE (&vptr, months);
744 val->f = us / 1000000.0;
745 val->f += days * 24 * 3600;
751 uint32_t days, months;
754 GET_VALUE (&vptr, seconds);
755 GET_VALUE (&vptr, days);
756 GET_VALUE (&vptr, months);
759 val->f += days * 24 * 3600;
770 GET_VALUE (&vptr, x);
772 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
778 if (r->integer_datetimes)
781 GET_VALUE (&vptr, x);
782 val->f = x / 1000000.0;
787 GET_VALUE (&vptr, x);
796 if (r->integer_datetimes)
801 GET_VALUE (&vptr, x);
802 val->f = x / 1000000.0;
808 GET_VALUE (&vptr, x);
812 GET_VALUE (&vptr, zone);
813 val1->f = zone / 3600.0;
820 if (r->integer_datetimes)
824 GET_VALUE (&vptr, x);
828 val->f = (x + r->postgres_epoch * 24 * 3600);
834 GET_VALUE (&vptr, x);
836 val->f = (x + r->postgres_epoch * 24 * 3600);
844 memcpy (val->s, vptr, MIN (length, var_width));
851 int16_t n_digits, weight, dscale;
854 GET_VALUE (&vptr, n_digits);
855 GET_VALUE (&vptr, weight);
856 GET_VALUE (&vptr, sign);
857 GET_VALUE (&vptr, dscale);
864 fmt.w = fmt_max_output_width (fmt.type) ;
865 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
866 var_set_both_formats (v, &fmt);
870 for (i = 0 ; i < n_digits; ++i)
873 GET_VALUE (&vptr, x);
874 f += x * pow (10000, weight--);