1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
/* Variant of psql_open_reader used when support for reading postgres
   databases was not compiled in: it only reports an error message.
   Neither INFO nor DICT is used. */
46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
48 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
62 /* These macros must be the same as in catalog/pg_type.h from the postgres source.
   They are the column type OIDs that the value returned by PQftype() is
   compared against. */
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
/* Casereader callbacks for this reader, declared ahead of the class table
   that refers to them.  In both, the void pointer is the struct psql_reader. */
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
/* Class table handed to casereader_create_sequential() by psql_open_reader(). */
89 static const struct casereader_class psql_casereader_class =
92 psql_casereader_destroy,
/* True when the server reports its integer_datetimes parameter as on
   (set in psql_open_reader); selects how time values are decoded. */
103 bool integer_datetimes;
/* Result of calendar_gregorian_to_offset (2000, 1, 1, NULL); added to
   date and timestamp values read from the server. */
105 double postgres_epoch;
/* Case prototype obtained from the dictionary; each case is created from it. */
107 struct caseproto *proto;
/* Dictionary created by psql_open_reader and also returned to its caller. */
108 struct dictionary *dict;
110 /* An array of variable pointers, which maps psql column numbers into
112 struct variable **vmap;
115 struct string fetch_cmd;
/* Forward declaration; psql_casereader_read() returns the case that this
   function builds from the cached query result in R. */
120 static struct ccase *set_value (struct psql_reader *r);
/* One of two alternative definitions of data_to_native(): walks the LEN
   bytes of IN_ and OUT_ as unsigned char.
   NOTE(review): presumably this is the variant for hosts whose byte order
   already matches the wire format and copies each byte unchanged; confirm
   against the enclosing preprocessor conditional. */
126 data_to_native (const void *in_, void *out_, int len)
129 const unsigned char *in = in_;
130 unsigned char *out = out_;
131 for (i = 0 ; i < len ; ++i )
/* Byte-swapping definition of data_to_native(): copies the LEN bytes at IN_
   to OUT_ in reverse order, so that the first input byte becomes the last
   output byte.  GET_VALUE uses it to decode fields of binary query results.
   IN_ and OUT_ must not overlap, since bytes are written as they are read. */
136 data_to_native (const void *in_, void *out_, int len)
139 const unsigned char *in = in_;
140 unsigned char *out = out_;
141 for (i = 0 ; i < len ; ++i )
142 out[len - i - 1] = in[i];
/* Decodes one field: converts sizeof (OUT) bytes found at *IN into the
   object OUT by way of data_to_native().  IN is a pointer to the read
   pointer, OUT is an lvalue whose type fixes the number of bytes read.
   NOTE(review): callers invoke this repeatedly on the same pointer to read
   consecutive fields, so the macro is expected to advance *IN by that size
   as well; confirm. */
147 #define GET_VALUE(IN, OUT) do { \
148 size_t sz = sizeof (OUT); \
149 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging aid: prints the L bytes at X to stdout twice, first each byte
   as two hex digits and then each byte as a character. */
156 dump (const unsigned char *x, int l)
160 for (i = 0; i < l ; ++i)
162 printf ("%02x ", x[i]);
167 for (i = 0; i < l ; ++i)
170 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in the dictionary of R, named after
   SUGGESTED_NAME (made unique within the dictionary), and applies FMT to it
   with var_set_both_formats().

   COL is the psql column number the variable belongs to: the vmap array is
   resized to COL + 1 entries and vmapsize is set to COL + 1.
   NOTE(review): psql_open_reader passes COL == -1 for the auxiliary zone and
   months variables; confirm that the vmap update is skipped in that case,
   since COL + 1 would otherwise request a zero-sized array.

   psql_open_reader uses the return value as the newly created variable. */
179 static struct variable *
180 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
181 int width, const char *suggested_name, int col)
183 unsigned long int vx = 0;
184 struct variable *var;
187 name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
188 var = dict_create_var (r->dict, name, width);
191 var_set_both_formats (var, fmt);
195 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
198 r->vmapsize = col + 1;
/* Executes the fetch command stored in R on its connection and keeps the
   result in R->res.  A result whose status is not PGRES_TUPLES_OK, or that
   holds no tuples, is handled separately from the normal case;
   psql_casereader_read() tests the return value for truth. */
209 reload_cache (struct psql_reader *r)
/* Fetch the next batch of rows from the cursor. */
214 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
216 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Opens a casereader over the rows produced by the SQL query in INFO, and
   stores a newly created dictionary describing those rows in *DICT.

   Steps performed by the code below:
     - connect with INFO->conninfo and report connection failures;
     - check the server version (the message states that versions earlier
       than 8.0 are not supported) and read the integer_datetimes setting;
     - report an error for a connection without SSL unless INFO->allow_clear;
     - create the dictionary with the client encoding, or with the default
       encoding when the client encoding is zero;
     - begin a read only transaction and declare the binary cursor pspp for
       INFO->sql;
     - run a count query over INFO->sql to learn the number of cases;
     - fetch the first row and create one variable per result column from
       its type OID and length, plus an auxiliary variable with a -zone or
       -months name suffix where needed;
     - move the cursor back by one row so that reading starts at the first;
     - prepare the fetch command using INFO->bsize rows per fetch, or 4096
       when INFO->bsize is -1.

   On success the value of casereader_create_sequential() is returned.  The
   error path releases the reader with psql_casereader_destroy(). */
228 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
231 int n_fields, n_tuples;
232 PGresult *qres = NULL;
233 casenumber n_cases = CASENUMBER_MAX;
234 const char *encoding;
236 struct psql_reader *r = xzalloc (sizeof *r);
237 struct string query ;
/* Connect using the connection string supplied by the caller. */
239 r->conn = PQconnectdb (info->conninfo);
240 if ( NULL == r->conn)
242 msg (ME, _("Memory error whilst opening psql source"));
246 if ( PQstatus (r->conn) != CONNECTION_OK )
248 msg (ME, _("Error opening psql source: %s."),
249 PQerrorMessage (r->conn));
/* Server version check.
   NOTE(review): PQparameterStatus() can return NULL for an unknown
   parameter and the sscanf() result is not examined; confirm that both
   cases are handled before ver_num is used. */
256 const char *vers = PQparameterStatus (r->conn, "server_version");
258 sscanf (vers, "%d", &ver_num);
263 _("Postgres server is version %s."
264 " Reading from versions earlier than 8.0 is not supported."),
/* Remember whether the server reports integer_datetimes as on; set_value
   picks the decoding of time values from this flag. */
272 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
274 r->integer_datetimes = ( 0 == c_strcasecmp (dt, "on"));
/* PQgetssl() returning NULL means the connection is not using SSL. */
278 if ( PQgetssl (r->conn) == NULL)
281 if (! info->allow_clear)
283 msg (ME, _("Connection is unencrypted, "
284 "but unencrypted connections have not been permitted."));
/* Offset of 1 January 2000, added to date and timestamp values in set_value. */
289 r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
/* Pick the character encoding for the new dictionary. */
292 const int enc = PQclientEncoding (r->conn);
294 /* According to section 22.2 of the Postgresql manual
295 a value of zero (SQL_ASCII) indicates
296 "a declaration of ignorance about the encoding".
297 Accordingly, we use the default encoding
298 if we find this value.
300 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
302 /* Create the dictionary and populate it */
303 *dict = r->dict = dict_create (encoding);
306 const int version = PQserverVersion (r->conn);
307 ds_init_empty (&query);
309 Versions before 9.1 don't have the REPEATABLE READ isolation level.
310 However according to <a12321aabb@gmail.com> if the server is in the
311 "hot standby" mode then SERIALIZABLE won't work.
313 ds_put_c_format (&query,
314 "BEGIN READ ONLY ISOLATION LEVEL %s; "
315 "DECLARE pspp BINARY CURSOR FOR ",
316 (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ");
318 ds_put_substring (&query, info->sql.ss);
320 qres = PQexec (r->conn, ds_cstr (&query));
322 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
324 msg (ME, _("Error from psql source: %s."),
325 PQresultErrorMessage (qres));
332 /* Now use the count() function to find the total number of cases
333 that this query returns.
334 Doing this incurs some overhead. The server has to iterate every
335 case in order to find this number. However, it's performed on the
336 server side, and in all except the most huge databases the extra
337 overhead will be worth the effort.
338 On the other hand, most PSPP functions don't need to know this.
339 The GUI is the notable exception.
341 ds_init_cstr (&query, "SELECT count (*) FROM (");
342 ds_put_substring (&query, info->sql.ss);
343 ds_put_cstr (&query, ") stupid_sql_standard");
345 qres = PQexec (r->conn, ds_cstr (&query));
347 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
349 msg (ME, _("Error from psql source: %s."),
350 PQresultErrorMessage (qres));
353 n_cases = atol (PQgetvalue (qres, 0, 0));
356 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
357 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
359 msg (ME, _("Error from psql source: %s."),
360 PQresultErrorMessage (qres));
364 n_tuples = PQntuples (qres);
365 n_fields = PQnfields (qres);
371 for (i = 0 ; i < n_fields ; ++i )
373 struct variable *var;
374 struct fmt_spec fmt = {FMT_F, 8, 2};
375 Oid type = PQftype (qres, i);
379 /* If there are no data then make a finger in the air
380 guess at the contents */
382 length = PQgetlength (qres, 0, i);
384 length = PSQL_DEFAULT_WIDTH;
398 fmt.type = FMT_DOLLAR;
402 width = length > 0 ? length : 1;
410 width = (info->str_width == -1) ?
411 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
417 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
422 fmt.type = FMT_DTIME;
442 fmt.type = FMT_DATETIME;
454 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
456 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
462 if ( width == 0 && fmt_is_string (fmt.type))
463 fmt.w = width = PSQL_DEFAULT_WIDTH;
466 var = create_var (r, &fmt, width, PQfname (qres, i), i);
467 if ( type == NUMERICOID && n_tuples > 0)
469 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
471 int16_t n_digits, weight, dscale;
474 GET_VALUE (&vptr, n_digits);
475 GET_VALUE (&vptr, weight);
476 GET_VALUE (&vptr, sign);
477 GET_VALUE (&vptr, dscale);
481 fmt.w = fmt_max_output_width (fmt.type) ;
482 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
483 var_set_both_formats (var, &fmt);
486 /* Timezones need an extra variable */
492 ds_init_cstr (&name, var_get_name (var));
493 ds_put_cstr (&name, "-zone");
498 create_var (r, &fmt, 0, ds_cstr (&name), -1);
507 ds_init_cstr (&name, var_get_name (var));
508 ds_put_cstr (&name, "-months");
513 create_var (r, &fmt, 0, ds_cstr (&name), -1);
524 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
525 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
532 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
534 ds_init_empty (&r->fetch_cmd);
535 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
538 r->proto = caseproto_ref (dict_get_proto (*dict));
540 return casereader_create_sequential
544 &psql_casereader_class, r);
549 psql_casereader_destroy (NULL, r);
/* Casereader destroy callback; psql_open_reader() also calls it directly,
   with a null READER, on its error path.  R_ is the struct psql_reader.
   Among the resources released are the fetch command string, the cached
   query result (when there is one) and the reference to the case prototype. */
555 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
557 struct psql_reader *r = r_;
561 ds_destroy (&r->fetch_cmd);
/* No result is cached until the first successful fetch. */
563 if (r->res) PQclear (r->res);
565 caseproto_unref (r->proto);
/* Casereader read callback.  R_ is the struct psql_reader.

   When no result is cached yet, or the tuple index has reached the cache
   size, reload_cache() is asked for the next batch of rows.  The case
   returned is the one built by set_value(). */
572 static struct ccase *
573 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
575 struct psql_reader *r = r_;
/* Refill when nothing is cached or the current batch is used up. */
577 if ( NULL == r->res || r->tuple >= r->cache_size)
579 if ( ! reload_cache (r) )
583 return set_value (r);
586 static struct ccase *
587 set_value (struct psql_reader *r)
595 n_vars = PQnfields (r->res);
597 if ( r->tuple >= PQntuples (r->res))
600 c = case_create (r->proto);
601 case_set_missing (c);
604 for (i = 0 ; i < n_vars ; ++i )
606 Oid type = PQftype (r->res, i);
607 const struct variable *v = r->vmap[i];
608 union value *val = case_data_rw (c, v);
610 union value *val1 = NULL;
617 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
619 const struct variable *v1 = NULL;
620 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
622 val1 = case_data_rw (c, v1);
630 if (PQgetisnull (r->res, r->tuple, i))
632 value_set_missing (val, var_get_width (v));
647 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
648 int length = PQgetlength (r->res, r->tuple, i);
650 int var_width = var_get_width (v);
656 GET_VALUE (&vptr, x);
665 GET_VALUE (&vptr, x);
673 GET_VALUE (&vptr, x);
681 GET_VALUE (&vptr, x);
689 GET_VALUE (&vptr, n);
697 GET_VALUE (&vptr, n);
704 /* Postgres 8.3 uses 64 bits.
705 Earlier versions use 32 */
711 GET_VALUE (&vptr, x);
718 GET_VALUE (&vptr, x);
731 if ( r->integer_datetimes )
738 GET_VALUE (&vptr, things);
739 GET_VALUE (&vptr, us);
740 GET_VALUE (&vptr, days);
741 GET_VALUE (&vptr, months);
743 val->f = us / 1000000.0;
744 val->f += days * 24 * 3600;
750 uint32_t days, months;
753 GET_VALUE (&vptr, seconds);
754 GET_VALUE (&vptr, days);
755 GET_VALUE (&vptr, months);
758 val->f += days * 24 * 3600;
769 GET_VALUE (&vptr, x);
771 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
777 if ( r->integer_datetimes)
780 GET_VALUE (&vptr, x);
781 val->f = x / 1000000.0;
786 GET_VALUE (&vptr, x);
795 if ( r->integer_datetimes)
800 GET_VALUE (&vptr, x);
801 val->f = x / 1000000.0;
807 GET_VALUE (&vptr, x);
811 GET_VALUE (&vptr, zone);
812 val1->f = zone / 3600.0;
819 if ( r->integer_datetimes)
823 GET_VALUE (&vptr, x);
827 val->f = (x + r->postgres_epoch * 24 * 3600 );
833 GET_VALUE (&vptr, x);
835 val->f = (x + r->postgres_epoch * 24 * 3600 );
843 memcpy (val->s, vptr, MIN (length, var_width));
850 int16_t n_digits, weight, dscale;
853 GET_VALUE (&vptr, n_digits);
854 GET_VALUE (&vptr, weight);
855 GET_VALUE (&vptr, sign);
856 GET_VALUE (&vptr, dscale);
863 fmt.w = fmt_max_output_width (fmt.type) ;
864 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
865 var_set_both_formats (v, &fmt);
869 for (i = 0 ; i < n_digits; ++i)
872 GET_VALUE (&vptr, x);
873 f += x * pow (10000, weight--);